"""Scenario and What-If analysis for LumiX models."""
from __future__ import annotations
from copy import deepcopy
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, List, Optional, TypeVar, Union
from typing_extensions import Self
from ..core.constraints import LXConstraint
from ..core.model import LXModel
from ..core.variables import LXVariable
from ..solution.solution import LXSolution
from ..solvers.base import LXOptimizer
if TYPE_CHECKING:
    # Only needed for the return annotation of ``LXScenarioAnalyzer.visualize``;
    # that method imports the visualization package lazily at call time, so
    # nothing from it is imported at module import time.
    from ..visualization.scenario import LXScenarioCompare

# Type parameter shared by the generic classes in this module
# (e.g. ``LXScenario[Product]``, ``LXModel[TModel]``).
TModel = TypeVar("TModel")
@dataclass
class LXScenarioModification:
    """
    Represents a single modification to a model parameter.

    A modification identifies its target by type and name, states what kind
    of change to make, and carries the value to use for that change.

    Attributes:
        target_type: Kind of model element to modify.
        target_name: Name of the constraint/variable to modify.
        modification_type: Kind of change to apply to the target.
        value: Value used by the change (new value, increment or factor).
        description: Optional human-readable description of the change.

    Examples:
        Increase capacity by 20%::

            LXScenarioModification(
                target_type="constraint",
                target_name="capacity",
                modification_type="rhs_multiply",
                value=1.2
            )

        Set minimum production to 100::

            LXScenarioModification(
                target_type="constraint",
                target_name="min_production",
                modification_type="rhs_set",
                value=100.0
            )
    """

    target_type: str  # "constraint", "variable", "objective"
    target_name: str  # Name of the constraint/variable
    # Types handled by LXScenarioAnalyzer in this module:
    #   constraints: "rhs_set", "rhs_add", "rhs_multiply"
    #   variables:   "bound_lower", "bound_upper"
    # Other strings (e.g. "bound_set", "coeff_multiply") are not handled there.
    modification_type: str
    # NOTE(review): the annotation admits a callable, but LXScenarioAnalyzer
    # converts the value with float(), so a callable is rejected there.
    # Confirm the intended calling convention before relying on callables.
    value: Union[float, Callable[[Any], float]]
    description: str = ""
@dataclass
class LXScenario(Generic[TModel]):
    """
    Type-safe scenario definition for what-if analysis.

    A scenario represents a set of modifications to a base model that allow
    you to explore different business conditions or assumptions. The builder
    methods record modifications in call order and return the scenario itself
    so calls can be chained.

    Attributes:
        name: Name of the scenario.
        modifications: Modifications recorded so far, in insertion order.
        description: Human-readable description of the scenario.

    Examples:
        Create a high-capacity scenario::

            high_capacity = (
                LXScenario[Product]("high_capacity")
                .modify_constraint_rhs("capacity", multiply=1.5)
                .describe("Increase capacity by 50%")
            )

        Create a low-cost scenario::

            low_cost = (
                LXScenario[Product]("low_cost")
                .modify_constraint_rhs("min_production", set_value=50.0)
                .modify_variable_bound("production", lower=10.0)
                .describe("Lower minimum production requirements")
            )

        Create a combined scenario::

            optimistic = (
                LXScenario[Product]("optimistic")
                .modify_constraint_rhs("capacity", multiply=1.3)
                .modify_constraint_rhs("budget", add=10000.0)
                .describe("Optimistic market conditions")
            )
    """

    name: str
    modifications: List[LXScenarioModification] = field(default_factory=list)
    description: str = ""

    def describe(self, description: str) -> Self:
        """
        Add description to scenario.

        Args:
            description: Human-readable description

        Returns:
            Self for chaining
        """
        self.description = description
        return self

    def modify_constraint_rhs(
        self,
        constraint_name: str,
        set_value: Optional[float] = None,
        add: Optional[float] = None,
        multiply: Optional[float] = None,
        description: str = "",
    ) -> Self:
        """
        Modify constraint right-hand side.

        Exactly one of ``set_value``, ``add`` or ``multiply`` must be given.
        To combine operations on the same constraint, chain several calls.

        Args:
            constraint_name: Name of constraint to modify
            set_value: Set RHS to this value
            add: Add this value to RHS
            multiply: Multiply RHS by this factor
            description: Description of modification; a default text is
                generated when empty

        Returns:
            Self for chaining

        Raises:
            ValueError: If none, or more than one, of ``set_value``, ``add``
                and ``multiply`` is specified

        Examples:
            # Set capacity to 1000
            scenario.modify_constraint_rhs("capacity", set_value=1000.0)

            # Increase capacity by 200
            scenario.modify_constraint_rhs("capacity", add=200.0)

            # Increase capacity by 50%
            scenario.modify_constraint_rhs("capacity", multiply=1.5)
        """
        # (modification type, requested value, default description)
        options = (
            ("rhs_set", set_value, f"Set {constraint_name} RHS to {set_value}"),
            ("rhs_add", add, f"Add {add} to {constraint_name} RHS"),
            ("rhs_multiply", multiply, f"Multiply {constraint_name} RHS by {multiply}"),
        )
        requested = [option for option in options if option[1] is not None]
        if not requested:
            raise ValueError("Must specify one of: set_value, add, or multiply")
        if len(requested) > 1:
            # Only one operation can be recorded per call; refusing here
            # avoids silently discarding the extra arguments.
            raise ValueError("Specify exactly one of: set_value, add, or multiply")

        modification_type, value, default_description = requested[0]
        self.modifications.append(
            LXScenarioModification(
                target_type="constraint",
                target_name=constraint_name,
                modification_type=modification_type,
                value=value,
                description=description or default_description,
            )
        )
        return self

    def modify_variable_bound(
        self,
        variable_name: str,
        lower: Optional[float] = None,
        upper: Optional[float] = None,
        description: str = "",
    ) -> Self:
        """
        Modify variable bounds.

        One modification is recorded per bound that is given (lower first,
        then upper). A custom ``description`` is used for each of them.

        Args:
            variable_name: Name of variable to modify
            lower: New lower bound
            upper: New upper bound
            description: Description of modification; a default text is
                generated when empty

        Returns:
            Self for chaining

        Raises:
            ValueError: If neither ``lower`` nor ``upper`` is specified

        Examples:
            # Set lower bound
            scenario.modify_variable_bound("production", lower=100.0)

            # Set both bounds
            scenario.modify_variable_bound("inventory", lower=50.0, upper=500.0)
        """
        if lower is None and upper is None:
            # Mirrors modify_constraint_rhs: a call that would record nothing
            # is reported instead of being silently ignored.
            raise ValueError("Must specify at least one of: lower or upper")

        for side, bound in (("lower", lower), ("upper", upper)):
            if bound is None:
                continue
            self.modifications.append(
                LXScenarioModification(
                    target_type="variable",
                    target_name=variable_name,
                    modification_type=f"bound_{side}",
                    value=bound,
                    description=description
                    or f"Set {variable_name} {side} bound to {bound}",
                )
            )
        return self

    def add_custom_modification(self, modification: LXScenarioModification) -> Self:
        """
        Add custom modification.

        The modification is appended as given, without validation.

        Args:
            modification: Custom modification to add

        Returns:
            Self for chaining
        """
        self.modifications.append(modification)
        return self
class LXScenarioAnalyzer(Generic[TModel]):
    """
    Scenario analysis for optimization models.

    Allows running multiple what-if scenarios on a base model and comparing
    the results to understand how different assumptions affect outcomes.
    Each scenario is applied to a deep copy of the base model, so the base
    model passed to the constructor is not modified by the analyzer.

    Results are stored in ``results`` keyed by scenario name. The unmodified
    model is stored under the key ``"baseline"``; a scenario that is itself
    named ``"baseline"`` would share that key.

    Examples:
        Create analyzer and add scenarios::

            analyzer = LXScenarioAnalyzer(base_model, optimizer)

            # Add scenarios
            analyzer.add_scenario(
                LXScenario[Product]("high_capacity")
                .modify_constraint_rhs("capacity", multiply=1.5)
            )
            analyzer.add_scenario(
                LXScenario[Product]("low_capacity")
                .modify_constraint_rhs("capacity", multiply=0.8)
            )

            # Run all scenarios
            results = analyzer.run_all_scenarios()

            # Compare results
            print(analyzer.compare_scenarios())

            # Get specific result
            high_cap_solution = analyzer.get_result("high_capacity")
    """

    def __init__(
        self,
        base_model: LXModel[TModel],
        optimizer: LXOptimizer[TModel],
        include_baseline: bool = True,
    ):
        """
        Initialize scenario analyzer.

        Args:
            base_model: Base optimization model
            optimizer: Optimizer to use for solving scenarios
            include_baseline: Whether to include baseline (unmodified) scenario
        """
        self.base_model = base_model
        self.optimizer = optimizer
        self.include_baseline = include_baseline
        self.scenarios: Dict[str, LXScenario[TModel]] = {}
        self.results: Dict[str, LXSolution[TModel]] = {}

    def add_scenario(self, scenario: LXScenario[TModel]) -> Self:
        """
        Add scenario to analyze.

        A scenario with the same name as an existing one replaces it.

        Args:
            scenario: Scenario to add

        Returns:
            Self for chaining
        """
        self.scenarios[scenario.name] = scenario
        return self

    def add_scenarios(self, *scenarios: LXScenario[TModel]) -> Self:
        """
        Add multiple scenarios.

        Args:
            *scenarios: Scenarios to add

        Returns:
            Self for chaining
        """
        for scenario in scenarios:
            self.add_scenario(scenario)
        return self

    def run_scenario(self, scenario_name: str) -> LXSolution[TModel]:
        """
        Run single scenario.

        The solution is also stored in ``results`` under the scenario name.

        Args:
            scenario_name: Name of scenario to run

        Returns:
            Solution for the scenario

        Raises:
            KeyError: If scenario not found
            ValueError: If a modification targets an unknown constraint or
                variable, or uses an unsupported target/modification type
        """
        if scenario_name not in self.scenarios:
            raise KeyError(f"Scenario '{scenario_name}' not found")

        modified_model = self._apply_scenario(self.scenarios[scenario_name])
        solution = self.optimizer.solve(modified_model)
        self.results[scenario_name] = solution
        return solution

    def run_all_scenarios(self, include_baseline: Optional[bool] = None) -> Dict[str, LXSolution[TModel]]:
        """
        Run all scenarios.

        Args:
            include_baseline: Override include_baseline setting

        Returns:
            Dictionary mapping scenario names to solutions. This is the
            analyzer's own ``results`` dictionary, so it also contains
            results stored by earlier runs.
        """
        if include_baseline is None:
            include_baseline = self.include_baseline

        # Run baseline if requested
        if include_baseline:
            self.results["baseline"] = self.optimizer.solve(self.base_model)

        # Run all scenarios
        for scenario_name in self.scenarios:
            self.run_scenario(scenario_name)
        return self.results

    def get_result(self, scenario_name: str) -> Optional[LXSolution[TModel]]:
        """
        Get result for specific scenario.

        Args:
            scenario_name: Name of scenario

        Returns:
            Solution if available, None otherwise
        """
        return self.results.get(scenario_name)

    def compare_scenarios(
        self,
        scenario_names: Optional[List[str]] = None,
        include_baseline: bool = True,
        sort_by_objective: bool = True,
    ) -> str:
        """
        Compare scenario results.

        Builds a plain-text table with one row per scenario (objective,
        status, change versus baseline) followed by the scenario
        descriptions. Names without a stored result are skipped.

        Args:
            scenario_names: Scenarios to compare (None = all)
            include_baseline: Include baseline in comparison
            sort_by_objective: Sort results by objective value, largest
                first; results without an objective value are listed last

        Returns:
            Formatted comparison report
        """
        if not self.results:
            return "No results available. Run scenarios first."

        # Determine which scenarios to include
        if scenario_names is None:
            compare_names = list(self.results)
        else:
            compare_names = [name for name in scenario_names if name in self.results]
        if not include_baseline:
            compare_names = [name for name in compare_names if name != "baseline"]
        if not compare_names:
            return "No scenarios to compare."

        # Sort by objective if requested
        if sort_by_objective:

            def objective_key(name: str):
                value = self.results[name].objective_value
                # With reverse=True, entries that have a value come first
                # (descending); entries without one sink to the end.
                return (value is not None, 0.0 if value is None else value)

            compare_names.sort(key=objective_key, reverse=True)

        has_baseline = "baseline" in self.results
        baseline_obj = self.results["baseline"].objective_value if has_baseline else None

        # Build comparison report
        lines = ["Scenario Comparison", "=" * 80]

        # Add baseline reference if available
        if has_baseline:
            lines.append(f"Baseline Objective: {self._format_objective(baseline_obj)}")
            lines.append("")

        # Add scenario results
        lines.append(f"{'Scenario':<30s} {'Objective':>15s} {'Status':>12s} {'vs Baseline':>15s}")
        lines.append("-" * 80)
        for name in compare_names:
            solution = self.results[name]
            obj_str = self._format_objective(solution.objective_value)
            status_str = solution.status
            if has_baseline and name != "baseline":
                diff_str = self._format_change(solution.objective_value, baseline_obj)
            else:
                diff_str = "-"
            lines.append(f"{name:<30s} {obj_str:>15s} {status_str:>12s} {diff_str:>15s}")

        # Add scenario descriptions
        lines.append("")
        lines.append("Scenario Descriptions:")
        lines.append("-" * 80)
        for name in compare_names:
            if name == "baseline":
                lines.append(" baseline: Original model without modifications")
            elif name in self.scenarios:
                scenario = self.scenarios[name]
                desc = scenario.description or "No description"
                lines.append(f" {name}: {desc}")
                for mod in scenario.modifications:
                    if mod.description:
                        lines.append(f" • {mod.description}")
        return "\n".join(lines)

    @staticmethod
    def _format_objective(value: Optional[float]) -> str:
        """Format an objective value for the report; ``None`` becomes "N/A"."""
        return "N/A" if value is None else f"{value:,.2f}"

    @staticmethod
    def _format_change(value: Optional[float], baseline: Optional[float]) -> str:
        """Format the percent change of ``value`` relative to ``baseline``."""
        if value is None or baseline is None or baseline == 0:
            return "N/A"
        # Divide by the magnitude of the baseline so that "+" always means
        # the objective increased, even when the baseline is negative.
        diff_pct = (value - baseline) / abs(baseline) * 100
        return f"{diff_pct:+.2f}%"

    def get_best_scenario(self, maximize: bool = True) -> Optional[str]:
        """
        Get name of best scenario by objective value.

        All stored results are considered, including ``"baseline"`` when
        present. Results without an objective value are skipped.

        Args:
            maximize: True if objective is maximization, False for minimization

        Returns:
            Name of best scenario, or None if no result has an objective value
        """
        objectives = {
            name: solution.objective_value
            for name, solution in self.results.items()
            if solution.objective_value is not None
        }
        if not objectives:
            return None
        pick = max if maximize else min
        return pick(objectives, key=objectives.get)

    def sensitivity_to_parameter(
        self,
        parameter_name: str,
        values: List[float],
        modification_type: str = "rhs_multiply",
        target_type: str = "constraint",
    ) -> Dict[float, LXSolution[TModel]]:
        """
        Analyze sensitivity to a single parameter across multiple values.

        For each value a temporary single-modification scenario is applied to
        a copy of the base model and solved. These temporary scenarios are
        not stored in ``scenarios`` or ``results``.

        Args:
            parameter_name: Name of parameter to vary
            values: List of values to test
            modification_type: Type of modification
            target_type: Type of target ("constraint" or "variable")

        Returns:
            Dictionary mapping parameter values to solutions

        Raises:
            ValueError: If the target is not found in the model, or the
                target/modification type combination is not supported

        Examples:
            Analyze sensitivity to capacity multiplier::

                results = analyzer.sensitivity_to_parameter(
                    "capacity",
                    values=[0.8, 0.9, 1.0, 1.1, 1.2, 1.3],
                    modification_type="rhs_multiply"
                )
                for multiplier, solution in results.items():
                    print(f"Capacity × {multiplier}: ${solution.objective_value:,.2f}")
        """
        sensitivity_results: Dict[float, LXSolution[TModel]] = {}
        for value in values:
            # Create temporary scenario
            scenario = LXScenario[TModel](f"{parameter_name}_{value}")
            scenario.modifications.append(
                LXScenarioModification(
                    target_type=target_type,
                    target_name=parameter_name,
                    modification_type=modification_type,
                    value=value,
                )
            )
            # Apply and solve
            modified_model = self._apply_scenario(scenario)
            sensitivity_results[value] = self.optimizer.solve(modified_model)
        return sensitivity_results

    def _apply_scenario(self, scenario: LXScenario[TModel]) -> LXModel[TModel]:
        """
        Apply scenario modifications to create modified model.

        Args:
            scenario: Scenario to apply

        Returns:
            Modified copy of the base model

        Raises:
            ValueError: If a modification has an unknown target type
        """
        modified_model = self._clone_model(self.base_model)
        for mod in scenario.modifications:
            if mod.target_type == "constraint":
                self._apply_constraint_modification(modified_model, mod)
            elif mod.target_type == "variable":
                self._apply_variable_modification(modified_model, mod)
            elif mod.target_type == "objective":
                # Future: objective modifications (currently a no-op)
                continue
            else:
                # A misspelled target type would otherwise yield a scenario
                # that silently equals the baseline.
                raise ValueError(
                    f"Unknown modification target type '{mod.target_type}' "
                    "(expected 'constraint', 'variable' or 'objective')"
                )
        return modified_model

    def _clone_model(self, model: LXModel[TModel]) -> LXModel[TModel]:
        """
        Clone a model (deep copy).

        Args:
            model: Model to clone

        Returns:
            Cloned model
        """
        return deepcopy(model)

    @staticmethod
    def _numeric_value(mod: LXScenarioModification) -> float:
        """Return the modification value as a float, rejecting callables."""
        if callable(mod.value):
            # NOTE(review): the value annotation admits callables, but no
            # calling convention is defined in this module; fail clearly
            # rather than with float()'s generic message.
            raise TypeError(
                f"Callable value for modification of '{mod.target_name}' is not supported"
            )
        return float(mod.value)

    def _apply_constraint_modification(
        self,
        model: LXModel[TModel],
        mod: LXScenarioModification,
    ) -> None:
        """
        Apply constraint modification to model.

        ``rhs_add`` and ``rhs_multiply`` leave the constraint untouched when
        its current ``rhs_value`` is None.

        Args:
            model: Model to modify
            mod: Modification to apply

        Raises:
            ValueError: If the constraint is not found, or the modification
                type is not one of rhs_set / rhs_add / rhs_multiply
        """
        constraint = model.get_constraint(mod.target_name)
        if constraint is None:
            raise ValueError(f"Constraint '{mod.target_name}' not found in model")

        kind = mod.modification_type
        if kind == "rhs_set":
            constraint.rhs_value = self._numeric_value(mod)
        elif kind == "rhs_add":
            if constraint.rhs_value is not None:
                constraint.rhs_value += self._numeric_value(mod)
        elif kind == "rhs_multiply":
            if constraint.rhs_value is not None:
                constraint.rhs_value *= self._numeric_value(mod)
        else:
            raise ValueError(
                f"Unsupported constraint modification type '{kind}' "
                f"for '{mod.target_name}'"
            )

    def _apply_variable_modification(
        self,
        model: LXModel[TModel],
        mod: LXScenarioModification,
    ) -> None:
        """
        Apply variable modification to model.

        Args:
            model: Model to modify
            mod: Modification to apply

        Raises:
            ValueError: If the variable is not found, or the modification
                type is not one of bound_lower / bound_upper
        """
        variable = model.get_variable(mod.target_name)
        if variable is None:
            raise ValueError(f"Variable '{mod.target_name}' not found in model")

        kind = mod.modification_type
        if kind == "bound_lower":
            variable.lower_bound = self._numeric_value(mod)
        elif kind == "bound_upper":
            variable.upper_bound = self._numeric_value(mod)
        else:
            raise ValueError(
                f"Unsupported variable modification type '{kind}' "
                f"for '{mod.target_name}'"
            )

    def visualize(self) -> "LXScenarioCompare[TModel]":
        """
        Create interactive visualization for scenario comparison.

        Requires the visualization extra: pip install lumix-opt[viz]

        Returns:
            LXScenarioCompare instance

        Examples:
            Basic usage::

                analyzer = LXScenarioAnalyzer(model, optimizer)
                analyzer.add_scenario(scenario1)
                analyzer.add_scenario(scenario2)
                analyzer.run_all_scenarios()
                analyzer.visualize().show()

            Comparison chart::

                analyzer.visualize().plot_comparison().show()

            Export to HTML::

                analyzer.visualize().to_html("scenarios.html")
        """
        # Imported lazily so the analyzer works without the optional extra.
        from ..visualization import LXScenarioCompare

        return LXScenarioCompare(self)
# Names exported by ``from <this module> import *``.
__all__ = [
    "LXScenario",
    "LXScenarioModification",
    "LXScenarioAnalyzer",
]