"""OR-Tools CP-SAT solver implementation for LumiX."""
from __future__ import annotations
import time
from typing import Any, Dict, List, Optional, Tuple, Union
try:
from ortools.sat.python import cp_model
except ImportError:
cp_model = None # type: ignore
from ..core.constraints import LXConstraint
from ..core.enums import LXConstraintSense, LXObjectiveSense, LXVarType
from ..core.expressions import LXLinearExpression
from ..core.model import LXModel
from ..core.variables import LXVariable
from ..solution.solution import LXSolution
from ..utils.rational import LXRationalConverter
from .base import LXSolverInterface
from .capabilities import CPSAT_CAPABILITIES
[docs]
class LXCPSATSolver(LXSolverInterface):
"""
OR-Tools CP-SAT (Constraint Programming - Satisfiability) solver implementation for LumiX.
CP-SAT is a constraint programming solver optimized for:
- Scheduling problems
- Routing problems (TSP, VRP)
- Combinatorial optimization
- Integer and boolean decision problems
IMPORTANT: CP-SAT only supports INTEGER and BINARY variables (no continuous variables).
Float coefficients are automatically converted to integers using rational approximation.
Supports:
- Integer variables (bounded or unbounded)
- Binary (boolean) variables
- Linear constraints (≤, ≥, ==)
- Single and indexed variable families
- Multi-model expressions
- Automatic float-to-rational conversion
TODO: Future CP-SAT-specific features:
- AllDifferent constraints for scheduling
- Circuit constraints for routing/TSP
- Table constraints for allowed combinations
- Interval variables for scheduling
- NoOverlap constraints for resource scheduling
- Cumulative constraints for resource capacity
- Boolean logic constraints (OR, AND, XOR, implications)
- Solution pool for multiple optimal solutions
- Warm start via solution hints
- Custom search strategies
- Symmetry breaking
"""
[docs]
def __init__(
self,
enable_rational_conversion: bool = False,
rational_max_denom: int = 10000,
scale_objective: bool = True,
scaling_factor: int = 1000,
) -> None:
"""
Initialize CP-SAT solver.
Args:
enable_rational_conversion: Enable rational conversion and auto-scaling for continuous variables
rational_max_denom: Maximum denominator for float-to-rational conversion
scale_objective: Whether to scale objective coefficients to integers
scaling_factor: Scaling factor for continuous variables (default: 1000)
Higher values = more precision but risk integer overflow
"""
super().__init__(CPSAT_CAPABILITIES)
if cp_model is None:
raise ImportError(
"OR-Tools is not installed. "
"Install it with: pip install ortools"
)
# Internal state
self._model: Optional[cp_model.CpModel] = None
self._variable_map: Dict[str, Union[Any, Dict[Any, Any]]] = {}
self._constraint_list: List[Any] = []
self._rational_converter = LXRationalConverter(max_denominator=rational_max_denom)
self._scale_objective = scale_objective
self._objective_scale: int = 1
# Rational conversion enables auto-scaling for continuous variables
self._enable_rational_conversion = enable_rational_conversion
self._scaling_factor = scaling_factor
self._scaled_variables: set = set() # Names of variables that were scaled
self._variable_scales: Dict[str, int] = {} # Variable name -> scale factor
self._objective_has_scaled_vars: bool = False # Track if objective needs de-scaling
self._objective_coeff_scale: int = 1 # Coefficient scale from rational conversion
[docs]
def build_model(self, model: LXModel) -> cp_model.CpModel:
"""
Build CP-SAT native model from LXModel.
Args:
model: LumiX model to build
Returns:
CP-SAT CpModel instance
Raises:
ValueError: If model contains continuous variables or unsupported features
"""
# Check for continuous variables (not supported by CP-SAT)
continuous_vars = [
var.name for var in model.variables
if var.var_type == LXVarType.CONTINUOUS
]
if continuous_vars:
if not self._enable_rational_conversion:
raise ValueError(
f"CP-SAT does not support continuous variables. "
f"Found continuous variables: {', '.join(continuous_vars)}.\n"
f"Options:\n"
f" 1. Use integer/binary variables only\n"
f" 2. Switch to 'ortools', 'gurobi', or 'cplex' solvers\n"
f" 3. Enable rational conversion: optimizer.enable_rational_conversion()\n"
f" This will scale continuous variables to integers and de-scale solutions."
)
else:
# Rational conversion enabled: warn and proceed with auto-scaling
self._log_scaling_warning(continuous_vars)
# Track which variables will be scaled
self._scaled_variables = set(continuous_vars)
for var_name in continuous_vars:
self._variable_scales[var_name] = self._scaling_factor
# Create CP-SAT model instance
self._model = cp_model.CpModel()
# Reset internal state
self._variable_map = {}
self._constraint_list = []
self._objective_scale = 1
# Build variables
for lx_var in model.variables:
instances = lx_var.get_instances()
if not instances:
# Single variable (not indexed)
self._create_single_variable(lx_var)
else:
# Variable family (indexed by data)
self._create_indexed_variables(lx_var, instances)
# Build constraints
for lx_constraint in model.constraints:
instances = lx_constraint.get_instances()
if not instances:
# Single constraint
self._create_single_constraint(lx_constraint)
else:
# Constraint family (indexed by data)
self._create_indexed_constraints(lx_constraint, instances)
# Set objective
self._set_objective(model)
return self._model
[docs]
def solve(
self,
model: LXModel,
time_limit: Optional[float] = None,
gap_tolerance: Optional[float] = None,
**solver_params: Any,
) -> LXSolution:
"""
Solve optimization model with CP-SAT.
Args:
model: LumiX model to solve
time_limit: Time limit in seconds (None = no limit)
gap_tolerance: Relative optimality gap tolerance (None = 0.0001)
**solver_params: Additional CP-SAT parameters
Examples:
- num_search_workers: Number of parallel workers (default: auto)
- log_search_progress: Enable search logging (default: False)
- max_time_in_seconds: Alternative to time_limit
- cp_model_presolve: Enable presolve (default: True)
- linearization_level: Linearization aggressiveness (0-2)
- cp_model_probing_level: Probing level (0-2)
Returns:
Solution object with results
TODO: Add support for additional features:
- Solution hints (warm start)
- Solution callbacks
- Assumption-based solving
- Multiple solutions enumeration
"""
# Build the model
cpsat_model = self.build_model(model)
# Create solver instance
solver = cp_model.CpSolver()
# Set time limit
if time_limit is not None:
solver.parameters.max_time_in_seconds = time_limit
# Set relative gap tolerance
if gap_tolerance is not None:
solver.parameters.relative_gap_limit = gap_tolerance
# Set additional solver parameters
for param_name, param_value in solver_params.items():
try:
if param_name == "num_search_workers":
solver.parameters.num_search_workers = param_value
elif param_name == "log_search_progress":
solver.parameters.log_search_progress = param_value
elif param_name == "max_time_in_seconds":
solver.parameters.max_time_in_seconds = param_value
elif param_name == "cp_model_presolve":
solver.parameters.cp_model_presolve = param_value
elif param_name == "linearization_level":
solver.parameters.linearization_level = param_value
elif param_name == "cp_model_probing_level":
solver.parameters.cp_model_probing_level = param_value
else:
# Try to set as generic parameter
setattr(solver.parameters, param_name, param_value)
except Exception as e:
self.logger.logger.warning(
f"Failed to set CP-SAT parameter '{param_name}': {e}"
)
# Solve
start_time = time.time()
status = solver.Solve(cpsat_model)
solve_time = time.time() - start_time
# Parse and return solution
solution = self._parse_solution(model, solver, status, solve_time)
return solution
[docs]
def get_solver_model(self) -> cp_model.CpModel:
"""
Get underlying CP-SAT model for advanced usage.
Returns:
CP-SAT CpModel instance
Raises:
RuntimeError: If model hasn't been built yet
Examples:
# Access CP-SAT model for advanced features
cpsat_model = solver.get_solver_model()
# Add custom CP-SAT constraints
cpsat_model.AddAllDifferent([var1, var2, var3])
"""
if self._model is None:
raise RuntimeError(
"Solver model not built yet. Call build_model() first."
)
return self._model
# ==================== PRIVATE HELPER METHODS ====================
def _log_scaling_warning(self, continuous_vars: List[str]) -> None:
"""Log warning about rational conversion auto-scaling."""
self.logger.logger.warning("")
self.logger.logger.warning("╔════════════════════════════════════════════════════════════╗")
self.logger.logger.warning("║ CP-SAT: RATIONAL CONVERSION (AUTO-SCALING ENABLED) ║")
self.logger.logger.warning("╠════════════════════════════════════════════════════════════╣")
self.logger.logger.warning(f"║ Variables scaled: {len(continuous_vars):2d} ║")
self.logger.logger.warning(f"║ Scale factor: {self._scaling_factor:5d} ║")
self.logger.logger.warning(f"║ Rational max denom: {self._rational_converter.max_denominator:5d} ║")
self.logger.logger.warning("║ ║")
self.logger.logger.warning("║ Continuous variables will be converted to integers by ║")
self.logger.logger.warning("║ multiplying by the scale factor. Solution values will be ║")
self.logger.logger.warning("║ automatically de-scaled back to continuous domain. ║")
self.logger.logger.warning("║ ║")
self.logger.logger.warning("║ Note: Small rounding errors may occur due to integer ║")
self.logger.logger.warning("║ approximation. For high-precision needs, use a linear ║")
self.logger.logger.warning("║ solver ('ortools', 'gurobi', or 'cplex') instead. ║")
self.logger.logger.warning("╚════════════════════════════════════════════════════════════╝")
self.logger.logger.warning("")
def _create_single_variable(self, lx_var: LXVariable) -> None:
"""Create single CP-SAT variable (not indexed)."""
model = self._model
assert model is not None
# Get bounds (CP-SAT requires integer bounds)
lb = lx_var.lower_bound if lx_var.lower_bound is not None else cp_model.INT32_MIN
ub = lx_var.upper_bound if lx_var.upper_bound is not None else cp_model.INT32_MAX
# Create variable based on type
if lx_var.var_type == LXVarType.BINARY:
var = model.NewBoolVar(lx_var.name)
elif lx_var.var_type == LXVarType.INTEGER:
# Ensure integer bounds
lb = int(lb)
ub = int(ub)
var = model.NewIntVar(lb, ub, lx_var.name)
elif lx_var.var_type == LXVarType.CONTINUOUS:
if not self._enable_rational_conversion:
raise ValueError(
f"CP-SAT does not support continuous variables. "
f"Variable '{lx_var.name}' is continuous."
)
# Rational conversion enabled: auto-scale continuous to integer
scale = self._variable_scales.get(lx_var.name, self._scaling_factor)
lb_scaled = int(lb * scale) if lb != cp_model.INT32_MIN else cp_model.INT32_MIN
ub_scaled = int(ub * scale) if ub != cp_model.INT32_MAX else cp_model.INT32_MAX
var = model.NewIntVar(lb_scaled, ub_scaled, lx_var.name)
else:
raise ValueError(f"Unknown variable type: {lx_var.var_type}")
# Store in mapping
self._variable_map[lx_var.name] = var
def _create_indexed_variables(
self, lx_var: LXVariable, instances: List[Any]
) -> None:
"""Create indexed family of CP-SAT variables."""
model = self._model
assert model is not None
var_dict: Dict[Any, Any] = {}
for instance in instances:
# Get index for this instance
if lx_var.index_func is not None:
index_key = lx_var.index_func(instance)
elif lx_var._cartesian is not None:
# For cartesian products, build key from dimension key functions
if isinstance(instance, tuple):
# Apply each dimension's key function to its corresponding element
index_key = tuple(
dim.key_func(inst)
for dim, inst in zip(lx_var._cartesian.dimensions, instance)
)
else:
index_key = instance
else:
# For multi-model (tuple instances), use the tuple as key
# This may fail if instances are not hashable
index_key = instance
# Variable name: "varname[index]"
var_name = f"{lx_var.name}[{index_key}]"
# Get bounds (same for all instances for now)
# TODO: Support per-instance bounds via bound functions
lb = lx_var.lower_bound if lx_var.lower_bound is not None else cp_model.INT32_MIN
ub = lx_var.upper_bound if lx_var.upper_bound is not None else cp_model.INT32_MAX
# Create CP-SAT variable based on type
if lx_var.var_type == LXVarType.BINARY:
var = model.NewBoolVar(var_name)
elif lx_var.var_type == LXVarType.INTEGER:
# Ensure integer bounds
lb = int(lb)
ub = int(ub)
var = model.NewIntVar(lb, ub, var_name)
elif lx_var.var_type == LXVarType.CONTINUOUS:
if not self._enable_rational_conversion:
raise ValueError(
f"CP-SAT does not support continuous variables. "
f"Variable '{lx_var.name}' is continuous."
)
# Rational conversion enabled: auto-scale continuous to integer
scale = self._variable_scales.get(lx_var.name, self._scaling_factor)
lb_scaled = int(lb * scale) if lb != cp_model.INT32_MIN else cp_model.INT32_MIN
ub_scaled = int(ub * scale) if ub != cp_model.INT32_MAX else cp_model.INT32_MAX
var = model.NewIntVar(lb_scaled, ub_scaled, var_name)
else:
raise ValueError(f"Unknown variable type: {lx_var.var_type}")
var_dict[index_key] = var
# Store dictionary in mapping
self._variable_map[lx_var.name] = var_dict
def _create_single_constraint(self, lx_constraint: LXConstraint) -> None:
"""Create single CP-SAT constraint."""
model = self._model
assert model is not None
if lx_constraint.lhs is None:
raise ValueError(f"Constraint '{lx_constraint.name}' has no LHS expression")
# Build linear expression with integer coefficients
terms, int_coeffs = self._build_expression(lx_constraint.lhs)
# Get RHS value
if lx_constraint.rhs_value is not None:
rhs = lx_constraint.rhs_value
elif lx_constraint.rhs_func is not None:
# For single constraint with rhs function, call with None
rhs = lx_constraint.rhs_func(None) # type: ignore
else:
raise ValueError(f"Constraint '{lx_constraint.name}' has no RHS value")
# Scale RHS if constraint involves scaled variables
# Original: sum(coeff * var_cont) <= RHS
# Scaled: sum(coeff * var_int / scale) <= RHS
# Multiply both sides by scale: sum(coeff * var_int) <= RHS * scale
rhs_scaled = self._scale_rhs_if_needed(lx_constraint.lhs, rhs)
# Convert RHS to integer
if not isinstance(rhs_scaled, int):
rhs_scaled = int(round(rhs_scaled))
# Build linear expression
if not terms:
# Empty constraint (0 sense rhs)
linear_expr = 0
else:
linear_expr = sum(var * coeff for var, coeff in zip(terms, int_coeffs))
# Create constraint based on sense
if lx_constraint.sense == LXConstraintSense.LE:
ct = model.Add(linear_expr <= rhs_scaled)
elif lx_constraint.sense == LXConstraintSense.GE:
ct = model.Add(linear_expr >= rhs_scaled)
elif lx_constraint.sense == LXConstraintSense.EQ:
ct = model.Add(linear_expr == rhs_scaled)
else:
raise ValueError(f"Unknown constraint sense: {lx_constraint.sense}")
# Set constraint name (for debugging)
ct.Proto().name = lx_constraint.name
self._constraint_list.append(ct)
def _create_indexed_constraints(
self, lx_constraint: LXConstraint, instances: List[Any]
) -> None:
"""Create indexed family of CP-SAT constraints."""
model = self._model
assert model is not None
if lx_constraint.lhs is None:
raise ValueError(f"Constraint '{lx_constraint.name}' has no LHS expression")
for instance in instances:
# Get index for naming
if lx_constraint.index_func is not None:
index_key = lx_constraint.index_func(instance)
else:
index_key = instance
# Constraint name: "constraintname[index]"
ct_name = f"{lx_constraint.name}[{index_key}]"
# Build expression for this instance
terms, int_coeffs = self._build_expression(
lx_constraint.lhs, constraint_instance=instance
)
# Get RHS value for this instance
if lx_constraint.rhs_value is not None:
rhs = lx_constraint.rhs_value
elif lx_constraint.rhs_func is not None:
rhs = lx_constraint.rhs_func(instance)
else:
raise ValueError(f"Constraint '{lx_constraint.name}' has no RHS value")
# Scale RHS if constraint involves scaled variables
rhs_scaled = self._scale_rhs_if_needed(lx_constraint.lhs, rhs)
# Convert RHS to integer
if not isinstance(rhs_scaled, int):
rhs_scaled = int(round(rhs_scaled))
# Build linear expression
if not terms:
# Empty constraint
linear_expr = 0
else:
linear_expr = sum(var * coeff for var, coeff in zip(terms, int_coeffs))
# Create constraint
if lx_constraint.sense == LXConstraintSense.LE:
ct = model.Add(linear_expr <= rhs_scaled)
elif lx_constraint.sense == LXConstraintSense.GE:
ct = model.Add(linear_expr >= rhs_scaled)
elif lx_constraint.sense == LXConstraintSense.EQ:
ct = model.Add(linear_expr == rhs_scaled)
else:
raise ValueError(f"Unknown constraint sense: {lx_constraint.sense}")
# Set constraint name
ct.Proto().name = ct_name
self._constraint_list.append(ct)
def _scale_rhs_if_needed(self, lx_expr: LXLinearExpression, rhs: float) -> float:
"""
Scale RHS if expression contains scaled variables.
When variables are scaled (var_int = var_cont * scale), constraints must be adjusted:
Original: sum(coeff * var_cont) <= RHS
Scaled: sum(coeff * var_int / scale) <= RHS
Multiply both sides by scale: sum(coeff * var_int) <= RHS * scale
Args:
lx_expr: Expression to check
rhs: Original RHS value
Returns:
Scaled RHS if any variables are scaled, otherwise original RHS
"""
# Check if any variables in the expression are scaled
has_scaled_vars = False
# Check regular terms
for var_name in lx_expr.terms.keys():
if var_name in self._scaled_variables:
has_scaled_vars = True
break
# Check multi-model terms
if not has_scaled_vars:
for lx_var, _, _ in lx_expr._multi_terms:
if lx_var.name in self._scaled_variables:
has_scaled_vars = True
break
# If any scaled variables, multiply RHS by the scale factor
if has_scaled_vars:
# Assume uniform scaling for now (all variables have same scale)
scale = self._scaling_factor
return rhs * scale
return rhs
def _build_expression(
self,
lx_expr: LXLinearExpression,
constraint_instance: Optional[Any] = None,
) -> Tuple[List[Any], List[int]]:
"""
Build CP-SAT expression from LXLinearExpression.
Converts float coefficients to integers using rational approximation.
Args:
lx_expr: LumiX linear expression
constraint_instance: Instance for indexed constraints (for multi-model coefficients)
Returns:
Tuple of (CP-SAT variables, integer coefficients)
"""
terms: List[Any] = []
float_coeffs: List[float] = []
# Process regular terms
for var_name, (lx_var, coeff_func, where_func) in lx_expr.terms.items():
solver_vars = self._variable_map[var_name]
if isinstance(solver_vars, dict):
# Indexed variable family
instances = lx_var.get_instances()
for instance in instances:
# Check where clause
if not where_func(instance):
continue
# Get index key
if lx_var.index_func is not None:
index_key = lx_var.index_func(instance)
elif lx_var._cartesian is not None and isinstance(instance, tuple):
# For cartesian products, build key from dimension key functions
index_key = tuple(
dim.key_func(inst)
for dim, inst in zip(lx_var._cartesian.dimensions, instance)
)
else:
index_key = instance
# Get coefficient
if constraint_instance is not None:
try:
coeff = coeff_func(instance, constraint_instance)
except TypeError:
coeff = coeff_func(instance)
else:
coeff = coeff_func(instance)
# Note: For scaled variables, coefficients stay the same
# but RHS will be scaled instead (see _create_single_constraint)
if abs(coeff) > 1e-10: # Skip near-zero coefficients
terms.append(solver_vars[index_key])
float_coeffs.append(coeff)
else:
# Single variable
if constraint_instance is not None:
try:
coeff = coeff_func(constraint_instance)
except TypeError:
coeff = coeff_func(None) # type: ignore
else:
coeff = coeff_func(None) # type: ignore
# Note: Coefficients stay the same for scaled variables
# RHS will be scaled instead
if abs(coeff) > 1e-10:
terms.append(solver_vars)
float_coeffs.append(coeff)
# Process multi-model terms
for lx_var, coeff_func, where_func in lx_expr._multi_terms:
solver_vars = self._variable_map[lx_var.name]
if isinstance(solver_vars, dict):
instances = lx_var.get_instances()
for instance in instances:
# Check where clause
if where_func is not None:
if isinstance(instance, tuple):
if not where_func(*instance):
continue
else:
if not where_func(instance):
continue
# Get coefficient
if isinstance(instance, tuple):
coeff = coeff_func(*instance)
else:
coeff = coeff_func(instance)
# Note: Coefficients stay the same for scaled variables
# RHS will be scaled instead
# Get index key
if lx_var.index_func is not None:
index_key = lx_var.index_func(instance)
elif lx_var._cartesian is not None and isinstance(instance, tuple):
# For cartesian products, build key from dimension key functions
index_key = tuple(
dim.key_func(inst)
for dim, inst in zip(lx_var._cartesian.dimensions, instance)
)
else:
index_key = instance
if abs(coeff) > 1e-10:
terms.append(solver_vars[index_key])
float_coeffs.append(coeff)
# Convert float coefficients to integers
int_coeffs, coeff_scale = self._convert_coefficients_to_integers(float_coeffs)
return terms, int_coeffs
def _convert_coefficients_to_integers(self, coeffs: List[float]) -> Tuple[List[int], int]:
"""
Convert float coefficients to integers using rational approximation.
Args:
coeffs: List of float coefficients
Returns:
Tuple of (integer coefficients, common scale factor)
"""
# Check if all coefficients are already integers
if all(isinstance(c, int) or c.is_integer() for c in coeffs):
return [int(c) for c in coeffs], 1
# Use rational converter for float coefficients
coeff_dict = {i: c for i, c in enumerate(coeffs)}
int_coeff_dict, scale = self._rational_converter.convert_coefficients(coeff_dict)
# Return scaled integer coefficients and scale factor
return [int_coeff_dict[i] for i in range(len(coeffs))], scale
def _set_objective(self, model: LXModel) -> None:
"""Set objective function in CP-SAT model."""
cpsat_model = self._model
assert cpsat_model is not None
if model.objective_expr is None:
# No objective, just feasibility
return
# Check if objective involves scaled variables (for later de-scaling)
self._objective_has_scaled_vars = False
if model.objective_expr:
# Check regular terms
for var_name in model.objective_expr.terms.keys():
if var_name in self._scaled_variables:
self._objective_has_scaled_vars = True
break
# Check multi-model terms
if not self._objective_has_scaled_vars:
for lx_var, _, _ in model.objective_expr._multi_terms:
if lx_var.name in self._scaled_variables:
self._objective_has_scaled_vars = True
break
# Build expression - need to handle scaling manually for objective
terms: List[Any] = []
float_coeffs: List[float] = []
# Collect terms and coefficients from objective expression
for var_name, (lx_var, coeff_func, where_func) in model.objective_expr.terms.items():
solver_vars = self._variable_map[var_name]
if isinstance(solver_vars, dict):
instances = lx_var.get_instances()
for instance in instances:
if not where_func(instance):
continue
if lx_var.index_func is not None:
index_key = lx_var.index_func(instance)
elif lx_var._cartesian is not None and isinstance(instance, tuple):
index_key = tuple(
dim.key_func(inst)
for dim, inst in zip(lx_var._cartesian.dimensions, instance)
)
else:
index_key = instance
coeff = coeff_func(instance)
if abs(coeff) > 1e-10:
terms.append(solver_vars[index_key])
float_coeffs.append(coeff)
else:
coeff = coeff_func(None) # type: ignore
if abs(coeff) > 1e-10:
terms.append(solver_vars)
float_coeffs.append(coeff)
# Handle multi-model terms
for lx_var, coeff_func, where_func in model.objective_expr._multi_terms:
solver_vars = self._variable_map[lx_var.name]
if isinstance(solver_vars, dict):
instances = lx_var.get_instances()
for instance in instances:
if where_func is not None:
if isinstance(instance, tuple):
if not where_func(*instance):
continue
else:
if not where_func(instance):
continue
if isinstance(instance, tuple):
coeff = coeff_func(*instance)
else:
coeff = coeff_func(instance)
if lx_var.index_func is not None:
index_key = lx_var.index_func(instance)
elif lx_var._cartesian is not None and isinstance(instance, tuple):
index_key = tuple(
dim.key_func(inst)
for dim, inst in zip(lx_var._cartesian.dimensions, instance)
)
else:
index_key = instance
if abs(coeff) > 1e-10:
terms.append(solver_vars[index_key])
float_coeffs.append(coeff)
# Convert coefficients to integers and track the scale factor
int_coeffs, self._objective_coeff_scale = self._convert_coefficients_to_integers(float_coeffs)
# Handle constant term (convert to integer)
constant = model.objective_expr.constant
int_constant = int(round(constant)) if constant != 0 else 0
# Build objective expression
if not terms:
# Constant objective only
obj_expr = int_constant
else:
obj_expr = sum(var * coeff for var, coeff in zip(terms, int_coeffs))
if int_constant != 0:
obj_expr += int_constant
# Set sense (maximize or minimize)
if model.objective_sense == LXObjectiveSense.MAXIMIZE:
cpsat_model.Maximize(obj_expr)
else:
cpsat_model.Minimize(obj_expr)
def _parse_solution(
self,
model: LXModel,
solver: cp_model.CpSolver,
status: int,
solve_time: float,
) -> LXSolution:
"""
Parse CP-SAT solution to LXSolution.
Args:
model: Original LumiX model
solver: CP-SAT solver with solution
status: Solver status code
solve_time: Time taken to solve
Returns:
LXSolution object
"""
# Map status codes
status_map = {
cp_model.OPTIMAL: "optimal",
cp_model.FEASIBLE: "feasible",
cp_model.INFEASIBLE: "infeasible",
cp_model.MODEL_INVALID: "model_invalid",
cp_model.UNKNOWN: "unknown",
}
lx_status = status_map.get(status, "unknown")
# Extract objective value
if status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
obj_value = float(solver.ObjectiveValue())
# De-scale objective
# The objective is scaled by:
# 1. Variable scaling (if continuous variables): var_int = var_cont * var_scale
# 2. Coefficient rational conversion: coefficients multiplied by coeff_scale
#
# So: CP-SAT minimizes sum((coeff * coeff_scale) * (var_cont * var_scale))
# = coeff_scale * var_scale * sum(coeff * var_cont)
#
# To get original objective: divide by both scales
if self._objective_has_scaled_vars:
obj_value = obj_value / self._scaling_factor
obj_value = obj_value / self._objective_coeff_scale
else:
obj_value = 0.0
# Extract variable values
variables: Dict[str, Union[float, Dict[Any, float]]] = {}
mapped: Dict[str, Dict[Any, float]] = {}
if status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
for lx_var in model.variables:
solver_vars = self._variable_map[lx_var.name]
if isinstance(solver_vars, dict):
# Indexed variable family
var_values: Dict[Any, float] = {}
mapped_values: Dict[Any, float] = {}
instances = lx_var.get_instances()
for instance in instances:
# Get index key
if lx_var.index_func is not None:
index_key = lx_var.index_func(instance)
elif lx_var._cartesian is not None and isinstance(instance, tuple):
# For cartesian products, build key from dimension key functions
index_key = tuple(
dim.key_func(inst)
for dim, inst in zip(lx_var._cartesian.dimensions, instance)
)
else:
index_key = instance
var = solver_vars[index_key]
value = float(solver.Value(var))
# De-scale if variable was scaled
if lx_var.name in self._scaled_variables:
scale = self._variable_scales[lx_var.name]
value = value / scale
var_values[index_key] = value
mapped_values[index_key] = value
variables[lx_var.name] = var_values
mapped[lx_var.name] = mapped_values
else:
# Single variable
value = float(solver.Value(solver_vars))
# De-scale if variable was scaled
if lx_var.name in self._scaled_variables:
scale = self._variable_scales[lx_var.name]
value = value / scale
variables[lx_var.name] = value
# CP-SAT doesn't provide sensitivity analysis
# TODO: Could compute reduced costs for feasibility/bound analysis
shadow_prices: Dict[str, float] = {}
reduced_costs: Dict[str, float] = {}
# Extract solver statistics
gap: Optional[float] = None
iterations: Optional[int] = None
nodes: Optional[int] = None
# Get best objective bound for gap calculation
if status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
try:
best_bound = solver.BestObjectiveBound()
if obj_value != 0 and abs(obj_value - best_bound) > 1e-10:
gap = abs(obj_value - best_bound) / abs(obj_value)
except Exception:
pass
# CP-SAT provides branch count and conflict count
try:
nodes = solver.NumBranches()
except Exception:
pass
# Create and return solution
return LXSolution(
objective_value=obj_value,
status=lx_status,
solve_time=solve_time,
variables=variables,
mapped=mapped,
shadow_prices=shadow_prices,
reduced_costs=reduced_costs,
gap=gap,
iterations=iterations,
nodes=nodes,
)
__all__ = ["LXCPSATSolver"]