# import pygad
# Standard library imports
import datetime
from typing import Any, Dict, List, Tuple, Union
# Third party imports
import numpy as np
import torch
import torch.nn as nn
from scipy.optimize import Bounds, least_squares, minimize
# Local application imports
import twin4build.core as core
import twin4build.systems as systems
from twin4build.utils.deprecation import deprecate_args
def _min_max_normalize(x, min_val=None, max_val=None):
if min_val is None:
min_val = torch.min(x)
if max_val is None:
max_val = torch.max(x)
return (x - min_val) / (max_val - min_val)
[docs]
class Optimizer:
r"""
A class for optimizing building operation in the twin4build framework.
This class optimizes model inputs (variables) (e.g., setpoints) by minimizing a loss function, using gradient-based or other optimization algorithms.
The optimizer implements soft constraints on model outputs (embedded in the loss function) and hard constraints on variables.
Args:
simulator: The simulator instance for running simulations.
Mathematical Formulation
=======================
The general optimization problem is formulated as:
.. math::
\hat{\boldsymbol{U}} = \underset{\boldsymbol{U} \in \mathcal{U}}{\operatorname{argmin}} \; \mathcal{L}(\boldsymbol{U})
where:
- :math:`\hat{\boldsymbol{U}}` is the optimal control input matrix
- :math:`\boldsymbol{U}` is the control input matrix
- :math:`\mathcal{U} \subseteq \mathbb{R}^{n_u \times n_t}` is the set of feasible control inputs
- :math:`\mathcal{L}(\boldsymbol{U})` is the loss function
Dimensions
----------
- :math:`n_t`: Number of time steps in the simulation period
- :math:`n_u`: Number of control inputs (actuators)
- :math:`n_d`: Number of disturbance inputs (weather, occupancy, etc.)
- :math:`n_y`: Number of system outputs (sensors, performance metrics)
Model Structure
---------------
The building model :math:`\mathcal{M}` is represented as a directed graph where nodes are dynamic components
and edges represent input/output connections as shown in a simple example below.
.. figure:: /_static/optimizer_graph_.png
:alt: System overview showing components and their relationships
:align: center
:width: 80%
The model takes control inputs :math:`\boldsymbol{U} \in \mathbb{R}^{n_u \times n_t}`
(the optimization variables) along with external inputs or disturbances :math:`\boldsymbol{D} \in \mathbb{R}^{n_d \times n_t}`, and produces system outputs for optimization
:math:`\boldsymbol{\hat{Y}} \in \mathbb{R}^{n_y \times n_t}` with timesteps :math:`\boldsymbol{t} \in \mathbb{R}^{n_t}`:
.. math::
\boldsymbol{\hat{Y}} = \mathcal{M}(\boldsymbol{X}, \boldsymbol{t})
where:
.. math::
\boldsymbol{X} = [\boldsymbol{U}, \boldsymbol{D}]
and :math:`\mathcal{M}` represents the complete simulation model. See :class:`~twin4build.simulator.simulator.Simulator`
for detailed explanation of the simulation process.
Loss Function
-------------
The loss function :math:`\mathcal{L}(\boldsymbol{U})` is composed of the following terms:
**Equality Constraints**
.. math::
\mathcal{L}_{eq} = \frac{1}{n_t} \sum_{t=1}^{n_t} \sum_{(j, \boldsymbol{y}) \in \mathcal{C}_{eq}} |\boldsymbol{\hat{Y}}_{j,t} - \boldsymbol{y}_{t}|
where :math:`\mathcal{C}_{eq}` is the set of equality constraints, each element is (output index :math:`j`, desired value :math:`\boldsymbol{y}_{t}`).
**Inequality Constraints**
Upper constraints:
.. math::
\mathcal{L}_{ineq}^{upper} = \frac{1}{n_t} \sum_{t=1}^{n_t} \sum_{(j, \boldsymbol{y}) \in \mathcal{C}_{ineq}^{upper}} k \cdot \text{relu}\left(\boldsymbol{\hat{Y}}_{j,t} - \boldsymbol{y}_{t}\right)
Lower constraints:
.. math::
\mathcal{L}_{ineq}^{lower} = \frac{1}{n_t} \sum_{t=1}^{n_t} \sum_{(j, \boldsymbol{y}) \in \mathcal{C}_{ineq}^{lower}} k \cdot \text{relu}\left(\boldsymbol{y}_{t} - \boldsymbol{\hat{Y}}_{j,t}\right)
where :math:`\mathcal{C}_{ineq}^{upper}` and :math:`\mathcal{C}_{ineq}^{lower}` are the sets of upper and lower inequality constraints, and :math:`k` is a penalty factor.
Combined inequality constraint loss:
.. math::
\mathcal{L}_{ineq} = \mathcal{L}_{ineq}^{upper} + \mathcal{L}_{ineq}^{lower}
**Objective Terms**
.. math::
\mathcal{L}_{obj} = \frac{1}{n_t} \sum_{t=1}^{n_t} \sum_{(j, w) \in \mathcal{O}_{obj}} w \cdot \boldsymbol{\hat{Y}}_{j,t}
where :math:`\mathcal{O}_{obj}` is the set of outputs to minimize or maximize, and :math:`w` is a weight (+1 for minimization, -1 for maximization).
**Total Loss**
.. math::
\mathcal{L}(\boldsymbol{U}) = \mathcal{L}_{eq} + \mathcal{L}_{ineq} + \mathcal{L}_{obj}
See method docstrings for details on the specific loss terms and optimization algorithms.
Examples
--------
Basic optimization with PyTorch method:
>>> import twin4build as tb
>>> import datetime
>>> import pytz
>>>
>>> # Create model and simulator
>>> model = tb.SimulationModel(id="my_model")
>>> simulator = tb.Simulator(model)
>>> optimizer = tb.Optimizer(simulator)
>>>
>>> # Define decision variables (actuators to optimize)
>>> variables = [
... (heater_component, "setpointValue", 18.0, 25.0), # Temperature setpoint bounds
... (ventilation_component, "flowRate", 0.1, 1.0) # Ventilation flow rate bounds
... ]
>>>
>>> # Define objectives (what to optimize)
>>> objectives = [
... (energy_meter, "powerConsumption", "min"), # Minimize energy consumption
... (comfort_sensor, "comfortIndex", "max") # Maximize comfort
... ]
>>>
>>> # Set time period
>>> start = datetime.datetime(2024, 1, 1, tzinfo=pytz.UTC)
>>> end = datetime.datetime(2024, 1, 2, tzinfo=pytz.UTC)
>>> step = 3600
>>>
>>> # Run optimization with PyTorch (default SGD)
>>> optimizer.optimize(
... variables=variables,
... objectives=objectives,
... start_time=start,
... end_time=end,
... step_size=step,
... method="torch",
... options={"lr": 0.1, "iterations": 100}
... )
Advanced PyTorch optimization with scheduler:
>>> # Use Adam optimizer with cosine annealing scheduler
>>> optimizer.optimize(
... variables=variables,
... objectives=objectives,
... start_time=start,
... end_time=end,
... step_size=step,
... method=("torch", "Adam", "ad"),
... options={
... "lr": 0.01,
... "iterations": 200,
... "scheduler_type": "cosine",
... "scheduler_params": {"T_max": 200, "eta_min": 1e-6}
... }
... )
SciPy optimization with constraints:
>>> # Define equality constraints (maintain temperature at specific times)
>>> equality_constraints = [
... (room_temperature, "temperature", 21.0) # Maintain 21°C
... ]
>>>
>>> # Define inequality constraints (comfort bounds)
>>> inequality_constraints = [
... (room_temperature, "temperature", "lower", 20.0), # Not below 20°C
... (room_temperature, "temperature", "upper", 24.0), # Not above 24°C
... (co2_sensor, "concentration", "upper", 1000.0) # CO2 limit
... ]
>>>
>>> # Run SciPy optimization with SLSQP (preferred for constrained problems)
>>> optimizer.optimize(
... variables=variables,
... objectives=objectives,
... eq_cons=equality_constraints,
... ineq_cons=inequality_constraints,
... start_time=start,
... end_time=end,
... step_size=step,
... method=("scipy", "SLSQP", "ad"),
... options={"verbose": 2, "maxiter": 1000}
... )
Alternative SciPy methods:
>>> # Use L-BFGS-B for unconstrained optimization
>>> optimizer.optimize(
... variables=variables,
... objectives=objectives,
... start_time=start,
... end_time=end,
... step_size=step,
... method=("scipy", "L-BFGS-B", "ad"),
... options={"gtol": 1e-8, "maxiter": 500}
... )
>>> # Use trust-region method for difficult constraints
>>> optimizer.optimize(
... variables=variables,
... objectives=objectives,
... eq_cons=equality_constraints,
... ineq_cons=inequality_constraints,
... start_time=start,
... end_time=end,
... step_size=step,
... method=("scipy", "trust-constr", "ad"),
... options={"verbose": 1, "barrier_tol": 1e-8}
... )
Schedule-based constraints:
>>> # Use schedule systems for time-varying constraints
>>> import twin4build.systems as systems
>>>
>>> # Create temperature schedule
>>> temp_schedule = systems.ScheduleSystem(
... id="temp_schedule",
... schedule_filename="temperature_profile.csv"
... )
>>>
>>> # Use schedule as constraint
>>> equality_constraints = [
... (room_temperature, "temperature", temp_schedule)
... ]
>>>
>>> optimizer.optimize(
... variables=variables,
... objectives=objectives,
... eq_cons=equality_constraints,
... start_time=start,
... end_time=end,
... step_size=step,
... method=("scipy", "SLSQP", "ad")
... )
Multi-objective optimization:
>>> # Optimize multiple conflicting objectives
>>> objectives = [
... (energy_meter, "powerConsumption", "min"), # Minimize energy
... (comfort_sensor, "thermalComfort", "max"), # Maximize comfort
... (air_quality_sensor, "iaqIndex", "max"), # Maximize air quality
... ]
>>>
>>> # Use multiple decision variables
>>> variables = [
... (heater_component, "setpointValue", 18.0, 25.0),
... (cooler_component, "setpointValue", 22.0, 28.0),
... (ventilation_component, "flowRate", 0.1, 2.0),
... (window_actuator, "openingDegree", 0.0, 1.0)
... ]
>>>
>>> optimizer.optimize(
... variables=variables,
... objectives=objectives,
... start_time=start,
... end_time=end,
... step_size=step,
... method=("scipy", "SLSQP", "ad"),
... options={"ftol": 1e-9, "maxiter": 2000}
... )
Legacy string format (still supported):
>>> # Simple usage with default settings
>>> optimizer.optimize(
... variables=variables,
... objectives=objectives,
... start_time=start,
... end_time=end,
... step_size=step,
... method="scipy" # Defaults to ("scipy", "SLSQP", "ad")
... )
>>> # PyTorch method with defaults
>>> optimizer.optimize(
... variables=variables,
... objectives=objectives,
... start_time=start,
... end_time=end,
... step_size=step,
... method="torch" # Defaults to ("torch", "SGD", "ad")
... )
"""
def __init__(self, simulator: core.Simulator):
assert isinstance(
simulator, core.Simulator
), "Simulator must be a twin4build.core.Simulator instance"
self.simulator = simulator
def _closure(self):
self.optimizer.zero_grad()
# Apply bounds to decision variables
with torch.no_grad():
for component, output_name, *bounds in self._variables:
if len(bounds) > 0:
lower_bound = bounds[0] if len(bounds) > 0 else float("-inf")
upper_bound = bounds[1] if len(bounds) > 1 else float("inf")
if component.output[output_name].do_normalization:
lower_bound_ = component.output[output_name].normalize(
lower_bound
)
upper_bound_ = component.output[output_name].normalize(
upper_bound
)
# print("==========================")
# print(f"CLAMPED BEFORE: {component.id}.{output_name} to {component.output[output_name].denormalize(component.output[output_name].normalized_history)}")
component.output[output_name].normalized_history.clamp_(
min=lower_bound_, max=upper_bound_
)
# print("==========================")
# print(f"CLAMPED AFTER: {component.id}.{output_name} to {component.output[output_name].denormalize(component.output[output_name].normalized_history)}")
else:
component.output[output_name].history.clamp_(
min=lower_bound, max=upper_bound
)
# Run simulation
self.simulator.simulate(
start_time=self._start_time,
end_time=self._end_time,
step_size=self._stepSize,
show_progress_bar=False,
)
self.loss = 0
k = 100
# Handle equality constraints
if self._eq_cons is not None:
eq_term = 0
for constraint in self._eq_cons:
component, output_name, desired_value = constraint
y = component.output[output_name].history
desired_tensor = self.equality_constraint_values[component, output_name]
y = component.output[output_name].normalize(y)
desired_tensor = component.output[output_name].normalize(desired_tensor)
eq_term += torch.mean(torch.abs(y - desired_tensor))
self.loss += eq_term
# Handle inequality constraints
if self._ineq_cons is not None:
ineq_upper_term = 0
ineq_lower_term = 0
for constraint in self._ineq_cons:
component, output_name, constraint_type, desired_value = constraint
y = component.output[output_name].history
desired_tensor = self.inequality_constraint_values[
(component, output_name, constraint_type)
]
y_norm = component.output[output_name].normalize(y)
desired_tensor_norm = component.output[output_name].normalize(
desired_tensor
)
if constraint_type == "upper":
# Penalize when y > desired_value
constraint_violations = torch.relu(y_norm - desired_tensor_norm)
constraint_term = torch.mean(k * constraint_violations)
ineq_upper_term += constraint_term
elif constraint_type == "lower":
# Penalize when y < desired_value
constraint_violations = torch.relu(desired_tensor_norm - y_norm)
constraint_term = torch.mean(k * constraint_violations)
ineq_lower_term += constraint_term
self.loss += ineq_upper_term + ineq_lower_term
# Handle minimization objectives
if self._objectives is not None:
min_term = 0
for minimize_obj in self._objectives:
component, output_name = minimize_obj
y = component.output[output_name].history
y_norm = component.output[output_name].normalize(y)
# print(f"NORMALIZED MINIMIZE OBJECTIVE BETWEEN: {component.output[output_name]._min_history} and {component.output[output_name]._max_history}")
min_term += torch.mean(y_norm)
self.loss += min_term # Minimize the mean value
# Compute gradients
self.loss.backward()
return self.loss
[docs]
def optimize(
self,
start_time: Union[datetime.datetime, List[datetime.datetime]] = None,
end_time: Union[datetime.datetime, List[datetime.datetime]] = None,
step_size: Union[float, List[float]] = None,
variables: List[Tuple[Any, str, float, float]] = None,
objectives: List[Tuple[Any, str, str]] = None,
eq_cons: List[Tuple[Any, str, Any]] = None,
ineq_cons: List[Tuple[Any, str, str, Any]] = None,
method: Union[str, Tuple[str, str, str]] = "scipy",
options: Dict = None,
**kwargs,
):
"""
Optimize the model using various optimization methods.
Args:
start_time: Start time for simulation
end_time: End time for simulation
step_size: Step size for simulation
variables: List of tuples (component, output_name, lower_bound, upper_bound)
objectives: List of tuples (component, output_name, objective_type)
where objective_type is "min" or "max"
eq_cons: List of tuples (component, output_name, desired_value)
ineq_cons: List of tuples (component, output_name, constraint_type, desired_value)
where constraint_type is "upper" or "lower"
method: Optimization method specification. Can be specified in two formats:
1. String format (legacy):
- "torch": Uses PyTorch-based gradient optimization
- "scipy": Uses SciPy's SLSQP solver with automatic differentiation
2. Tuple format (recommended):
- (library, optimizer, mode) where:
- library: "torch" or "scipy"
- optimizer: The specific optimization algorithm
- mode: "ad" (automatic differentiation) or "fd" (finite difference)
Supported optimizers by library:
PyTorch-based methods (library="torch"):
- "SGD": Stochastic Gradient Descent (default)
- "Adam": Adam optimizer
- "LBFGS": Limited-memory BFGS
- Mode: Always "ad" (automatic differentiation)
SciPy-based methods (library="scipy"):
- "SLSQP": Sequential Least Squares Programming (preferred for most problems)
- "L-BFGS-B": Limited-memory BFGS with bounds
- "TNC": Truncated Newton algorithm with bounds
- "trust-constr": Trust-region constrained optimization
- "trf": Trust Region Reflective (for least-squares problems)
- "dogbox": Dogleg algorithm (for least-squares problems)
- Mode: "ad" (automatic differentiation) or "fd" (finite difference)
Method selection guidelines:
- PyTorch methods: Good for simple optimization problems, easy to configure
- SciPy SLSQP with AD: Preferred for most constrained optimization problems
- SciPy with FD: Use for non-PyTorch models or when AD is not available
Examples:
- ("scipy", "SLSQP", "ad"): Preferred for most constrained optimization problems
- ("torch", "Adam", "ad"): Good for simple unconstrained problems
- ("scipy", "trf", "fd"): For non-PyTorch models with least-squares formulation
- "scipy": Legacy format, defaults to ("scipy", "SLSQP", "ad")
options: Additional options for the chosen method:
For PyTorch methods (library="torch"):
- "lr": Learning rate for optimizer (default: 1.0)
- "iterations": Number of optimization iterations (default: 100)
- "optimizer_type": Type of PyTorch optimizer ("SGD", "Adam", "LBFGS")
- "scheduler_type": Type of learning rate scheduler ("step", "exponential", "cosine", "reduce_on_plateau", None)
- "scheduler_params": Parameters for learning rate scheduler
- For "step": step_size (default: 30), gamma (default: 0.1)
- For "exponential": gamma (default: 0.95)
- For "cosine": T_max (default: 100), eta_min (default: 0)
- For "reduce_on_plateau": mode (default: "min"), factor (default: 0.9), patience (default: 10), threshold (default: 1e-4)
For SciPy methods (library="scipy"):
- "verbose": Verbosity level (0-3)
- "maxiter": Maximum iterations
- "gtol": Gradient tolerance
- "xtol": Parameter tolerance
- "barrier_tol": Barrier tolerance
- "initial_tr_radius": Initial trust region radius
- "initial_constr_penalty": Initial constraint penalty
- Additional method-specific options as supported by SciPy optimizers
"""
deprecated_args = [
"startTime",
"endTime",
"stepSize",
"variables",
"objectives",
]
new_args = ["start_time", "end_time", "step_size", "variables", "objectives"]
position = [1, 2, 3, 4, 5]
value_map = deprecate_args(deprecated_args, new_args, position, kwargs)
start_time = value_map.get("start_time", start_time)
end_time = value_map.get("end_time", end_time)
step_size = value_map.get("step_size", step_size)
variables = value_map.get("variables", variables)
objectives = value_map.get("objectives", objectives)
self._variables = variables or []
self._objectives = objectives or []
self._eq_cons = eq_cons or []
self._ineq_cons = ineq_cons or []
self._start_time = start_time
self._end_time = end_time
self._stepSize = step_size
self._max_values = {}
# Validate input arguments
# Check required simulation parameters
assert start_time is not None, "start_time must be provided"
assert end_time is not None, "end_time must be provided"
assert step_size is not None, "step_size must be provided"
# Check that we have something to optimize
assert (
len(self._variables) > 0
), "No decision variables specified for optimization"
# Check that we have at least one objective (minimize or constraints)
has_objective = (
len(self._objectives) > 0
or len(self._eq_cons) > 0
or len(self._ineq_cons) > 0
)
assert (
has_objective
), "No optimization objectives specified (minimize, eq_cons, or ineq_cons)"
# Validate method
# Define allowed optimization methods
allowed_methods = [
("torch", "SGD", "ad"),
("torch", "Adam", "ad"),
("torch", "LBFGS", "ad"),
("scipy", "SLSQP", "ad"),
("scipy", "L-BFGS-B", "ad"),
("scipy", "TNC", "ad"),
("scipy", "trust-constr", "ad"),
("scipy", "trf", "ad"),
("scipy", "dogbox", "ad"),
("scipy", "trf", "fd"),
("scipy", "dogbox", "fd"),
]
default_methods = [("scipy", "SLSQP", "ad")]
default_mode = (
"ad" # Always choose automatic differentiation mode when ambiguous
)
# Process method specification
if isinstance(method, str):
valid_methods = list(
set([l[0] for l in allowed_methods] + [l[1] for l in allowed_methods])
)
assert (
method in valid_methods
), f"If a string is provided, the \"method\" argument must be one of the following: {', '.join(valid_methods)} - \"{method}\" was provided."
# Try to match with default methods first
matched = False
for t in default_methods:
if t[0] == method:
method = t
matched = True
break
# If no match found, look for candidates
if not matched:
candidates = []
for m in allowed_methods:
if m[1] == method:
candidates.append(m)
if len(candidates) == 1:
method = candidates[0]
elif len(candidates) > 1:
# Choose the one with default mode
for c in candidates:
if c[2] == default_mode:
method = c
break
elif isinstance(method, tuple):
assert (
len(method) == 3
), f'If a tuple is provided, it must contain three elements, corresponding to the library, method, and mode (e.g. ("scipy", "SLSQP", "ad")) - "{method}" was provided.'
assert method[0] in [
l[0] for l in allowed_methods
], f"If a tuple is provided, the first element must be one of the following: {', '.join(list(set([l[0] for l in allowed_methods])))} - \"{method}\" was provided."
assert method[1] in [
l[1] for l in allowed_methods
], f"If a tuple is provided, the second element must be one of the following: {', '.join(list(set([l[1] for l in allowed_methods])))} - \"{method}\" was provided."
assert method[2] in [
l[2] for l in allowed_methods
], f"If a tuple is provided, the third element must be one of the following: {', '.join(list(set([l[2] for l in allowed_methods])))} - \"{method}\" was provided."
# Validate the method tuple
method_ = None
for t in allowed_methods:
if t[0] == method[0] and t[1] == method[1] and t[2] == method[2]:
method_ = t
break
assert (
method_ is not None
), f"The method {method} is not valid. Only the following methods are supported: {', '.join([str(t) for t in allowed_methods])}"
method = method_
else:
raise ValueError(
f'The "method" argument must be a string or a tuple - "{method}" was provided.'
)
# Validate format of decision variables
for i, decision_var in enumerate(self._variables):
assert (
len(decision_var) >= 2
), f"Decision variable at index {i} must have at least component and output_name"
component, output_name, *bounds = decision_var
assert hasattr(
component, "output"
), f"Component {component} at index {i} does not have 'output' attribute"
assert (
output_name in component.output
), f"Output '{output_name}' not found in component {component.id}"
if len(bounds) >= 2:
lower, upper = bounds[0], bounds[1]
assert (
upper > lower
), f"Upper bound ({upper}) must be greater than lower bound ({lower}) for {component.id}.{output_name}"
# Validate format of minimize objectives
for i, min_obj in enumerate(self._objectives):
assert (
len(min_obj) == 3
), f"Minimize objective at index {i} must have component, output_name, and objective_type (min or max)"
component, output_name, objective_type = min_obj
assert hasattr(
component, "output"
), f"Component {component} at index {i} does not have 'output' attribute"
assert (
output_name in component.output
), f"Output '{output_name}' not found in component {component.id}"
# Validate format of equality constraints
for i, eq_constraint in enumerate(self._eq_cons):
assert (
len(eq_constraint) == 3
), f"Equality constraint at index {i} must have component, output_name, and desired_value"
component, output_name, desired_value = eq_constraint
assert hasattr(
component, "output"
), f"Component {component} at index {i} does not have 'output' attribute"
assert (
output_name in component.output
), f"Output '{output_name}' not found in component {component.id}"
# Validate format of inequality constraints
for i, ineq_constraint in enumerate(self._ineq_cons):
assert (
len(ineq_constraint) == 4
), f"Inequality constraint at index {i} must have component, output_name, constraint_type, and desired_value"
component, output_name, constraint_type, desired_value = ineq_constraint
assert hasattr(
component, "output"
), f"Component {component} at index {i} does not have 'output' attribute"
assert (
output_name in component.output
), f"Output '{output_name}' not found in component {component.id}"
assert constraint_type in [
"upper",
"lower",
], f"Constraint type must be 'upper' or 'lower', got '{constraint_type}'"
# Check for conflicting constraints: can't minimize and have equality constraint on same output
if self._objectives and self._eq_cons:
minimize_pairs = {
(component, output_name) for component, output_name in self._objectives
}
equality_pairs = {
(component, output_name) for component, output_name, _ in self._eq_cons
}
conflicting_pairs = minimize_pairs.intersection(equality_pairs)
if conflicting_pairs:
conflict_info = [f"({c.id}, {o})" for c, o in conflicting_pairs]
raise ValueError(
f"Cannot simultaneously minimize and apply equality constraints to the same outputs: {', '.join(conflict_info)}. "
f"These objectives conflict with each other."
)
# Check for decision variables that are also in equality constraints
if self._variables and self._eq_cons:
decision_pairs = {
(component, output_name)
for component, output_name, *_ in self._variables
}
equality_pairs = {
(component, output_name) for component, output_name, _ in self._eq_cons
}
conflicting_pairs = decision_pairs.intersection(equality_pairs)
if conflicting_pairs:
conflict_info = [f"({c.id}, {o})" for c, o in conflicting_pairs]
raise ValueError(
f"Cannot optimize and apply equality constraints to the same outputs: {', '.join(conflict_info)}. "
f"These objectives conflict with each other."
)
# allowed_methods = [("scipy", "trf", "fd"),
# ("scipy", "dogbox", "fd"),
# ("scipy", "trf", "ad"),
# ("scipy", "dogbox", "ad"),
# ("scipy", "L-BFGS-B", "ad"),
# ("scipy", "TNC", "ad"),
# ("scipy", "SLSQP", "ad"),
# ("scipy", "trust-constr", "ad"),
# # ("torch", "Adadelta", "ad"), # Currently, we do not support torch optimizers
# # ("torch", "Adafactor", "ad"),
# # ("torch", "Adagrad", "ad"),
# # ("torch", "Adam", "ad"),
# # ("torch", "AdamW", "ad"),
# # ("torch", "SparseAdam", "ad"),
# # ("torch", "Adamax", "ad"),
# # ("torch", "ASGD", "ad"),
# # ("torch", "LBFGS", "ad"),
# # ("torch", "NAdam", "ad"),
# # ("torch", "RAdam", "ad"),
# # ("torch", "RMSprop", "ad"),
# # ("torch", "Rprop", "ad"),
# # ("torch", "SGD", "ad"),
# ]
# default_none_method = ("scipy", "SLSQP", "ad")
# default_methods = [("scipy", "SLSQP", "ad")]#, ("torch", "SGD", "ad")]
# default_mode = "ad" # Always choose automatic differentiation mode when ambiguous
# Call the appropriate optimization method
if method[0] == "torch":
if options is None:
options = {}
# Extract optimizer type from method tuple
optimizer_type = method[1]
options["optimizer_type"] = optimizer_type
return self._torch_solver(**options)
elif method[0] == "scipy":
if options is None:
options = {}
return self._scipy_solver(method=method, **options)
def _torch_solver(
self,
lr: float = 1.0,
iterations: int = 100,
optimizer_type: str = "SGD",
scheduler_type: str = "step",
scheduler_params: Dict = None,
):
"""
Perform optimization using PyTorch-based gradient optimization.
This method uses PyTorch's automatic differentiation to compute gradients and
applies gradient-based optimization algorithms to minimize the objective function.
It supports various optimizers and learning rate schedulers for fine-tuning
the optimization process.
Args:
lr: Learning rate for optimizer. Controls the step size in gradient descent.
Higher values may converge faster but risk overshooting, while lower
values are more stable but may converge slowly.
iterations: Number of optimization iterations. More iterations generally
lead to better convergence but take longer to compute.
optimizer_type: Type of PyTorch optimizer:
- "SGD": Stochastic Gradient Descent - simple, robust, good for most problems
- "Adam": Adaptive learning rate optimizer - often faster convergence
- "LBFGS": Limited-memory BFGS - good for smooth, well-behaved functions
scheduler_type: Type of learning rate scheduler to adjust learning rate during optimization:
- "step": Decreases learning rate by gamma every step_size iterations
- "exponential": Decreases learning rate exponentially
- "cosine": Uses cosine annealing schedule
- "reduce_on_plateau": Reduces learning rate when loss stops improving
- None: No scheduler, constant learning rate
scheduler_params: Dictionary of parameters for the chosen scheduler:
- For "step": {"step_size": int, "gamma": float}
- For "exponential": {"gamma": float}
- For "cosine": {"T_max": int, "eta_min": float}
- For "reduce_on_plateau": {"mode": str, "factor": float, "patience": int, "threshold": float}
Note:
This method automatically handles gradient computation and parameter updates.
It disables gradients for model parameters and only optimizes the decision variables.
The optimization process is logged with current learning rate and loss values.
"""
# Validate optimization parameters
assert lr > 0, f"Learning rate must be positive, got {lr}"
assert (
iterations > 0
), f"Number of iterations must be positive, got {iterations}"
# Validate scheduler type
valid_scheduler_types = [
"step",
"exponential",
"cosine",
"reduce_on_plateau",
None,
]
assert (
scheduler_type in valid_scheduler_types
), f"Invalid scheduler_type: {scheduler_type}. Must be one of {valid_scheduler_types}"
# Disable gradients for all parameters since we're optimizing inputs.
# It is VERY important to do this before initializing the model.
# Otherwise, the model parameters and state space matrices will have requires_grad=True
# and the backpropagate() call will fail.
for component in self.simulator.model.components.values():
if isinstance(component, nn.Module):
for parameter in component.parameters():
parameter.requires_grad_(False)
# Set before initializing the model
for component, output_name, *bounds in self._variables:
component.output[output_name].do_normalization = True
self.simulator.get_simulation_timesteps(
self._start_time, self._end_time, self._stepSize
)
self.simulator.model.initialize(
start_time=self._start_time,
end_time=self._end_time,
step_size=self._stepSize,
simulator=self.simulator,
)
# Enable gradients only for the inputs we want to optimize
opt_list = []
for component, output_name, *bounds in self._variables:
component.output[output_name].set_requires_grad(True)
if component.output[output_name].do_normalization:
opt_list.append(component.output[output_name].normalized_history)
else:
opt_list.append(component.output[output_name].history)
if optimizer_type == "SGD":
# Initialize optimizer
self.optimizer = torch.optim.SGD(opt_list, lr=lr)
elif optimizer_type == "Adam":
self.optimizer = torch.optim.Adam(opt_list, lr=lr)
elif optimizer_type == "LBFGS":
self.optimizer = torch.optim.LBFGS(
opt_list, lr=lr, line_search_fn=None, history_size=100
)
else:
raise ValueError(
f"Invalid optimizer type: {optimizer_type}. Must be one of {['SGD', 'Adam', 'LBFGS']}"
)
# Initialize scheduler
if scheduler_params is None:
scheduler_params = {}
if scheduler_type == "step":
# StepLR decreases learning rate by gamma every step_size epochs
step_size = scheduler_params.get("step_size", 30)
gamma = scheduler_params.get("gamma", 0.1)
self.scheduler = torch.optim.lr_scheduler.StepLR(
self.optimizer, step_size=step_size, gamma=gamma
)
elif scheduler_type == "exponential":
# ExponentialLR decreases learning rate by gamma every epoch
gamma = scheduler_params.get("gamma", 0.95)
self.scheduler = torch.optim.lr_scheduler.ExponentialLR(
self.optimizer, gamma=gamma
)
elif scheduler_type == "cosine":
# CosineAnnealingLR uses a cosine schedule to decrease learning rate
T_max = scheduler_params.get("T_max", 100)
eta_min = scheduler_params.get("eta_min", 0)
self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
self.optimizer, T_max=T_max, eta_min=eta_min
)
elif scheduler_type == "reduce_on_plateau":
# ReduceLROnPlateau reduces learning rate when a metric has stopped improving
mode = scheduler_params.get("mode", "min")
factor = scheduler_params.get("factor", 0.9)
patience = scheduler_params.get("patience", 10)
threshold = scheduler_params.get("threshold", 1e-4)
self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
self.optimizer,
mode=mode,
factor=factor,
patience=patience,
threshold=threshold,
)
else:
# Default: no scheduler
self.scheduler = None
def _get_constraint_value(component_or_value):
"""Helper function to get constraint value, handling both ScheduleSystem and scalar values"""
if isinstance(component_or_value, (int, float)):
return torch.tensor(component_or_value)
elif isinstance(component_or_value, systems.ScheduleSystem):
component_or_value.initialize(
start_time=self._start_time,
end_time=self._end_time,
step_size=self._stepSize,
simulator=self.simulator,
)
return component_or_value.output["scheduleValue"].history
elif isinstance(component_or_value, torch.Tensor):
return component_or_value
else:
raise ValueError(
f"Invalid constraint value type: {type(component_or_value)}"
)
# Pre-compute all constraint values
self.equality_constraint_values = {}
if self._eq_cons is not None:
for component, output_name, desired_value in self._eq_cons:
self.equality_constraint_values[component, output_name] = (
_get_constraint_value(desired_value)
)
self.inequality_constraint_values = {}
if self._ineq_cons is not None:
for (
component,
output_name,
constraint_type,
desired_value,
) in self._ineq_cons:
self.inequality_constraint_values[
(component, output_name, constraint_type)
] = _get_constraint_value(desired_value)
for i in range(iterations):
# Perform optimization step
self.optimizer.step(self._closure)
# Update learning rate with scheduler
if self.scheduler is not None:
if isinstance(
self.scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau
):
# ReduceLROnPlateau needs the loss value
self.scheduler.step(self.loss)
else:
# Other schedulers just need to be stepped
self.scheduler.step()
# Log current learning rate
current_lr = self.optimizer.param_groups[0]["lr"]
print(f"Current learning rate: {current_lr}")
print(f"Loss at step {i}: {self.loss.detach().item()}")
def _scipy_solver(self, method: tuple = None, **options):
"""
Perform optimization using SciPy's optimization algorithms.
This method uses SciPy's optimization library to solve constrained and unconstrained
optimization problems. It supports both automatic differentiation (AD) and finite
difference (FD) modes for gradient computation. The method automatically handles
constraint formulation and bounds specification.
Args:
method: Tuple of (library, optimizer, mode) specifying the optimization method:
- library: Always "scipy" for this method
- optimizer: The specific optimization algorithm:
- "SLSQP": Sequential Least Squares Programming - preferred for most constrained problems
- "L-BFGS-B": Limited-memory BFGS with bounds - good for unconstrained or bound-constrained problems
- "TNC": Truncated Newton algorithm with bounds - efficient for large-scale problems
- "trust-constr": Trust-region constrained optimization - robust for difficult constraints
- "trf": Trust Region Reflective - specialized for least-squares problems
- "dogbox": Dogleg algorithm - alternative for least-squares problems
- mode: Differentiation mode:
- "ad": Automatic differentiation using PyTorch (recommended)
- "fd": Finite difference (not yet implemented)
**options: Additional options passed to the SciPy optimizer:
- "verbose": Verbosity level (0-3) for optimization output
- "maxiter": Maximum number of iterations
- "gtol": Gradient tolerance for convergence
- "xtol": Parameter tolerance for convergence
- "barrier_tol": Barrier tolerance for interior point methods
- "initial_tr_radius": Initial trust region radius
- "initial_constr_penalty": Initial constraint penalty
- Additional method-specific options as supported by SciPy optimizers
Note:
This method automatically handles the conversion between PyTorch tensors and
NumPy arrays required by SciPy. It uses caching to avoid redundant computations
when the same parameters are evaluated multiple times. The method supports
both equality and inequality constraints through the loss function formulation.
"""
if method is None:
method = ("scipy", "SLSQP", "ad")
for component in self.simulator.model.components.values():
if isinstance(component, nn.Module):
for parameter in component.parameters():
parameter.requires_grad_(False)
# Set before initializing the model
for component, output_name, *bounds in self._variables:
component.output[output_name].do_normalization = True
self.simulator.get_simulation_timesteps(
self._start_time, self._end_time, self._stepSize
)
self.simulator.model.initialize(
start_time=self._start_time,
end_time=self._end_time,
step_size=self._stepSize,
simulator=self.simulator,
)
# Create initial guess vector
x0 = []
bounds_list = []
n_timesteps = len(self.simulator.dateTimeSteps)
# Create flattened vector of size N*M
for t in range(n_timesteps):
for component, output_name, *bounds in self._variables:
component.output[output_name].set_requires_grad(True)
if component.output[output_name].do_normalization:
x0.append(
component.output[output_name].normalized_history[t].item()
)
else:
x0.append(component.output[output_name].history[t].item())
# Set bounds (same for all timesteps for each actuator)
if len(bounds) >= 2:
lower, upper = bounds[0], bounds[1]
if component.output[output_name].do_normalization:
lower = (
component.output[output_name]
.normalize(torch.tensor(lower))
.item()
)
upper = (
component.output[output_name]
.normalize(torch.tensor(upper))
.item()
)
bounds_list.append((lower, upper))
else:
bounds_list.append((None, None))
x0 = np.array(x0)
# Create bounds object for SciPy
if all(b[0] is not None and b[1] is not None for b in bounds_list):
bounds_obj = Bounds(
[b[0] for b in bounds_list], [b[1] for b in bounds_list]
)
else:
bounds_obj = None
# Pre-compute constraint values
def _get_constraint_value(component_or_value):
"""Helper function to get constraint value, handling both ScheduleSystem and scalar values"""
if isinstance(component_or_value, (int, float)):
return torch.tensor(component_or_value)
elif isinstance(component_or_value, systems.ScheduleSystem):
component_or_value.initialize(
start_time=self._start_time,
end_time=self._end_time,
step_size=self._stepSize,
simulator=self.simulator,
)
return component_or_value.output["scheduleValue"].history
elif isinstance(component_or_value, torch.Tensor):
return component_or_value
else:
raise ValueError(
f"Invalid constraint value type: {type(component_or_value)}"
)
self.equality_constraint_values = {}
if self._eq_cons is not None:
for component, output_name, desired_value in self._eq_cons:
self.equality_constraint_values[component, output_name] = (
_get_constraint_value(desired_value)
)
self.inequality_constraint_values = {}
if self._ineq_cons is not None:
for (
component,
output_name,
constraint_type,
desired_value,
) in self._ineq_cons:
self.inequality_constraint_values[
(component, output_name, constraint_type)
] = _get_constraint_value(desired_value)
# Initialize caching variables for AD
self._theta_jac = torch.nan * torch.ones_like(
torch.tensor(x0, dtype=torch.float64)
)
self._theta_hes = torch.nan * torch.ones_like(
torch.tensor(x0, dtype=torch.float64)
)
self._theta_obj = torch.nan * torch.ones_like(
torch.tensor(x0, dtype=torch.float64)
)
# Run optimization based on method
optimizer_name = method[1]
mode = method[2]
if mode == "ad":
# Use automatic differentiation
if optimizer_name in ["trf", "dogbox"]:
# These are least-squares optimizers
least_squares(
self._obj_ad,
x0,
jac=self._jac_ad,
bounds=bounds_obj,
method=optimizer_name,
**options,
)
else:
# These are general optimization algorithms
minimize(
self._obj_ad,
x0,
method=optimizer_name,
jac=self._jac_ad,
bounds=bounds_obj,
options=options,
)
else:
# Use finite difference (not implemented yet for optimizer)
raise NotImplementedError(
"Finite difference mode is not yet implemented for the optimizer. Use automatic differentiation mode."
)
def __obj_ad(self, theta: torch.Tensor) -> torch.Tensor:
"""
Objective function for automatic differentiation.
Args:
theta (torch.Tensor): Flattened parameter vector of size N*M where
N = number of timesteps, M = number of actuators.
Returns:
torch.Tensor: Objective value.
"""
# Reshape theta from flattened vector (N*M) to matrix (N, M)
n_timesteps = len(self.simulator.dateTimeSteps)
n_actuators = len(self._variables)
theta_matrix = theta.reshape(n_timesteps, n_actuators)
# Update decision variables for each timestep using proper initialization
for i, (component, output_name, *bounds) in enumerate(self._variables):
# Extract values for this actuator across all timesteps
values = component.output[output_name].denormalize(theta_matrix[:, i])
# Initialize with the new values
component.output[output_name].initialize(
start_time=self._start_time,
end_time=self._end_time,
step_size=self._stepSize,
simulator=self.simulator,
values=values,
force=True,
)
# Run simulation
self.simulator.simulate(
start_time=self._start_time,
end_time=self._end_time,
step_size=self._stepSize,
show_progress_bar=False,
)
# Compute loss
loss = 0
k = 100
# Handle equality constraints
if self._eq_cons is not None:
for constraint in self._eq_cons:
component, output_name, desired_value = constraint
y = component.output[output_name].history
# print(f"{component.id}.{output_name}.history.grad_fn", y.grad_fn)
desired_tensor = self.equality_constraint_values[component, output_name]
y_norm = component.output[output_name].normalize(y)
desired_tensor_norm = component.output[output_name].normalize(
desired_tensor
)
loss += torch.mean(torch.abs(y_norm - desired_tensor_norm))
# Handle inequality constraints
if self._ineq_cons is not None:
ineq_upper_term = 0
ineq_lower_term = 0
for constraint in self._ineq_cons:
component, output_name, constraint_type, desired_value = constraint
y = component.output[output_name].history
# print(f"{component.id}.{output_name}.history.grad_fn", y.grad_fn)
desired_tensor = self.inequality_constraint_values[
(component, output_name, constraint_type)
]
y_norm = component.output[output_name].normalize(y)
desired_tensor_norm = component.output[output_name].normalize(
desired_tensor
)
if constraint_type == "upper":
# Penalize when y > desired_value
constraint_violations = torch.relu(y_norm - desired_tensor_norm)
ineq_upper_term += torch.mean(k * constraint_violations)
elif constraint_type == "lower":
# Penalize when y < desired_value
constraint_violations = torch.relu(desired_tensor_norm - y_norm)
ineq_lower_term += torch.mean(k * constraint_violations)
loss += ineq_upper_term + ineq_lower_term
# Handle minimization objectives
if self._objectives is not None:
for component, output_name, objective_type in self._objectives:
y = component.output[output_name].history
y_norm = component.output[output_name].normalize(y)
if objective_type == "min":
loss += torch.mean(y_norm)
elif objective_type == "max":
loss += -torch.mean(y_norm)
self.obj = loss
return self.obj
def _obj_ad(self, theta: torch.Tensor) -> torch.Tensor:
"""
Wrapper function for SciPy interface that converts numpy to torch and returns numpy.
Args:
theta (torch.Tensor): Parameter vector.
Returns:
torch.Tensor: Objective value as numpy array.
"""
theta = torch.tensor(theta, dtype=torch.float64)
if torch.equal(theta, self._theta_obj):
return self.obj.detach().numpy()
else:
self._theta_obj = theta
self.obj = self.__obj_ad(theta)
# self._hes_ad(theta) # hes calls jac which calls obj.
return self.obj.detach().numpy()
def __jac_ad(self, theta: torch.Tensor) -> torch.Tensor:
"""
Compute the Jacobian matrix using automatic differentiation.
Args:
theta (torch.Tensor): Parameter vector.
Returns:
torch.Tensor: Jacobian matrix.
"""
self.jac = torch.func.jacrev(self.__obj_ad, argnums=0)(theta)
return self.jac
def _jac_ad(self, theta: torch.Tensor) -> torch.Tensor:
"""
Compute the Jacobian matrix using automatic differentiation.
Args:
theta (torch.Tensor): Parameter vector.
Returns:
torch.Tensor: Jacobian matrix.
"""
theta = torch.tensor(theta, dtype=torch.float64)
if torch.equal(theta, self._theta_jac):
return self.jac.detach().numpy()
else:
self._theta_jac = theta
self.jac = self.__jac_ad(theta)
return self.jac.detach().numpy()
def __hes_ad(self, theta: torch.Tensor) -> torch.Tensor:
"""
Compute the Hessian matrix using automatic differentiation.
Args:
theta (torch.Tensor): Parameter vector.
Returns:
torch.Tensor: Hessian matrix.
"""
self.hes = torch.func.jacfwd(self.__jac_ad, argnums=0)(theta)
return self.hes
def _hes_ad(self, theta: torch.Tensor) -> torch.Tensor:
"""
Compute the Hessian matrix using automatic differentiation.
Args:
theta (torch.Tensor): Parameter vector.
Returns:
torch.Tensor: Hessian matrix.
"""
theta = torch.tensor(theta, dtype=torch.float64)
if torch.equal(theta, self._theta_hes):
return self.hes.detach().numpy()
else:
self._theta_hes = theta
self.hes = self.__hes_ad(theta)
return self.hes.detach().numpy()