Source code for twin4build.model.simulation_model.simulation_model

from __future__ import annotations

# Standard library imports
import copy
import datetime
import json
import os
import pickle
import warnings
from typing import Any, Callable, Dict, List, Optional, Tuple, Type

# Third party imports
import numpy as np
import torch
import torch.nn.parameter
from prettytable import PrettyTable
from rdflib import RDF, RDFS, Literal, Namespace

# Local application imports
import twin4build.core as core
import twin4build.estimator.estimator as estimator
import twin4build.systems as systems
import twin4build.utils.types as tps
from twin4build.model.semantic_model.semantic_model import get_short_name
from twin4build.utils.dict_utils import (
    compare_dict_structure,
    flatten_dict,
    merge_dicts,
)
from twin4build.utils.get_object_attributes import get_object_attributes
from twin4build.utils.isnumeric import isnumeric
from twin4build.utils.istype import istype
from twin4build.utils.mkdir_in_root import mkdir_in_root
from twin4build.utils.print_progress import PRINTPROGRESS, PrintProgress
from twin4build.utils.rdelattr import rdelattr
from twin4build.utils.rgetattr import rgetattr
from twin4build.utils.rhasattr import rhasattr
from twin4build.utils.rsetattr import rsetattr
from twin4build.utils.simple_cycle import simple_cycles

INVALID_ID_CHARS = ["_", "-", " ", "(", ")", "[", "]"]


[docs] class SimulationModel: r""" A simulation model for building digital twins. This class manages component collections, connections between components, cycle removal for feedback control loops, and topological sorting to determine optimal execution order for simulation. Args: id: Unique identifier for the model. dir_conf: List of directories to store model-related files. Mathematical Formulation: ========================= The simulation model preparation process involves two main steps: cycle removal and topological sorting to create an executable simulation sequence. Component Dependency Graph: --------------------------- The simulation model can be represented as a directed multigraph :math:`G = (V, E, \iota)` comprising: .. math:: V = \{c_1, c_2, ..., c_n\} .. math:: E = \{e_1, e_2, e_3, ...\} .. math:: \iota: E \rightarrow V \times V where: - :math:`V` is the set of vertices (components) - :math:`E` is the set of edge identifiers (connections between components) - :math:`\iota` is the incidence function mapping edges to vertex pairs - Each edge :math:`e_i \in E` with :math:`\iota(e_i) = (c_j, c_k)` indicates that component :math:`c_j` provides input to component :math:`c_k` - Multiple edges can map to the same vertex pair (multigraph): :math:`\iota(e_i) = \iota(e_j) = (c_p, c_q)` Optimized Cycle Removal Process: -------------------------------- To find the execution order of the components (i.e. the topologically sorted order), we need to remove cycles from the dependency graph first. Such cycles can arise in the simulation model due to different reasons, e.g. modeling of feedback control loops or mutual dependencies between components (model a requires an input from model b and model b requires an input from model a). The optimized cycle removal process uses a greedy algorithm that minimizes the total number of edges removed: 1. **Cycle Detection:** Identify the set of simple cycles :math:`\mathcal{C} = \{C_1, C_2, ..., C_m\}` in the graph where we can write one simple cycle as a sequence of edges :math:`C = ((c_1, c_2), (c_2, c_3), ..., (c_k, c_1))`, i.e. the cycle starts and ends at the same component and can't visit any other component more than once. 2. **Edge Participation Analysis:** For each edge :math:`e \in E`, count its participation in cycles: .. math:: p(e) = |\{C \in \mathcal{C} \; | \; e \in C\}| This gives the number of cycles that edge :math:`e` participates in. 3. **Greedy Edge Selection:** Select the edge that participates in the maximum number of cycles: .. math:: e^* = \underset{e \in E}{\operatorname{argmax}} \; p(e) 4. **Iterative Removal:** Remove the selected edge and repeat until no cycles remain: .. math:: E_{k+1} = E_k \setminus \{e^*_k\} where :math:`e^*_k` is the optimal edge selected at iteration :math:`k`. The process terminates when :math:`G_{final} = (V, E_{final})` is acyclic: .. math:: E_{acyclic} = E_{final}, \quad \mathcal{C}(G_{final}) = \emptyset All removed edges become required initialization connections: .. math:: E_{init} = E \setminus E_{acyclic} This means, for all :math:`(c_i, c_j) \in E_{init}`, :math:`c_j` must have initial values provided. Topological Sorting Process: ----------------------------- After cycle removal, we need to find a topological ordering of the acyclic graph :math:`G_{acyclic} = (V, E_{acyclic})`. A topological ordering is a linear arrangement of vertices :math:`L` such that for every directed edge :math:`(c_i, c_j) \in E_{acyclic}`, component :math:`c_i` appears before component :math:`c_j` in the ordering. In practical terms, this means when executing component :math:`c_j`, all components :math:`c_i` that provides inputs to :math:`c_j` must have already been executed. The goal is to determine an execution sequence: .. math:: L = (c_1, c_2, ..., c_n) And a priority level for each component: .. math:: P = (p_1, p_2, ..., p_n) where: - Each :math:`L_p` contains components that can execute at priority level :math:`p` - Components with the same priority level can execute in parallel (no dependencies between them) All of the above prepares the model for simulation and is done when the :meth:`load` method is called. Attributes: id (str): Unique identifier for the model. components (dict): Dictionary of all components in the model. _execution_order (list): Ordered list of component groups for execution. _flat_execution_order (list): Flattened list of components in execution order. _components_no_cycles (dict): Copy of components with cycles removed. _required_initialization_connections (list): Connections that require initial values. See Also: :class:`twin4build.simulator.simulator.Simulator`: Handles simulation execution using the prepared execution order References: The methodology is based on: "An Ontology-based Innovative Energy Modeling Framework for Scalable and Adaptable Building Digital Twins" by Bjørnskov & Jradi. This class implements the optimized cycle removal and topological sorting procedures. Examples: Basic model setup and preparation: >>> model = SimulationModel(id="building_model") >>> # Create components >>> schedule = tb.ScheduleSystem(id="schedule") >>> damper = tb.DamperTorchSystem(id="damper") >>> # Add components to model >>> model.add_component(schedule) >>> model.add_component(damper) >>> # Connect schedule output to damper input >>> model.add_connection(schedule, damper, "scheduleValue", "damperPosition") >>> # Apply optimized cycle removal and topological sorting during model loading >>> model.load() >>> # Model is now ready for simulation with Simulator class >>> # Execution order and cycle-free structure are prepared with minimal edge removal """ __slots__ = ( "_id", "_components", "_saved_parameters", "_custom_initial_dict", "_execution_order", "_flat_execution_order", "_required_initialization_connections", "_components_no_cycles", "_is_loaded", "_is_validated", "_result", "_validated_for_simulator", "_validated_for_estimator", "_validated_for_optimizer", "_validated_for_monitor", "_dir_conf", "_semantic_model", ) def __str__(self): t = PrettyTable( ["Number of components in simulation model: ", self.count_components()] ) t.add_row( ["Number of connections in simulation model: ", self.count_connections()], divider=True, ) title = f"Model overview id: {self._id}" t.title = title t.add_row(["", ""]) t.add_row(["", ""], divider=True) t.add_row(["id", "Class"], divider=True) unique_class_list = [] for component in self._components.values(): cls = component.__class__ if cls not in unique_class_list: unique_class_list.append(cls) unique_class_list = sorted(unique_class_list, key=lambda x: x.__name__.lower()) for cls in unique_class_list: cs = self.get_component_by_class( self._components, cls, filter=lambda v, class_: v.__class__ is class_ ) n = len(cs) for i, c in enumerate(cs): t.add_row([c.id, cls.__name__], divider=True if i == n - 1 else False) return t.get_string() def __init__(self, id: str, dir_conf: List[str] = None) -> None: """ Initialize the Model instance. Args: id: Unique identifier for the model. dir_conf: List of directories to store model-related files. Raises: AssertionError: If the id is not a string or contains invalid characters. """ self._id = id if dir_conf is None: self._dir_conf = ["generated_files", "models", self._id] else: self._dir_conf = dir_conf assert isinstance(id, str), f'Argument "id" must be of type {str(type(str))}' isvalid = np.array([x.isalnum() or x in INVALID_ID_CHARS for x in id]) np_id = np.array(list(id)) violated_characters = list(np_id[isvalid == False]) assert all( isvalid ), f"The model with id \"{id}\" has an invalid id. The characters \"{', '.join(violated_characters)}\" are not allowed." self._id = id self._components = {} self._saved_parameters = {} self._custom_initial_dict = None self._is_loaded = False self._is_validated = False self._semantic_model = core.SemanticModel( id=self._id, namespaces={ "SIM": core.namespace.SIM, "SAREF": core.namespace.SAREF, "S4BLDG": core.namespace.S4BLDG, "S4SYST": core.namespace.S4SYST, "FSO": core.namespace.FSO, }, dir_conf=self._dir_conf + ["semantic_model"], ) @property def components(self) -> dict: return self._components @property def dir_conf(self) -> List[str]: return self._dir_conf @property def execution_order(self) -> List[str]: return self._execution_order @property def flat_execution_order(self) -> List[str]: return self._flat_execution_order @dir_conf.setter def dir_conf(self, dir_conf: List[str]) -> None: assert isinstance(dir_conf, list) and all( isinstance(x, str) for x in dir_conf ), f"The set value must be of type {list} and contain strings" self._dir_conf = dir_conf
[docs] def get_dir( self, folder_list: List[str] = [], filename: Optional[str] = None ) -> Tuple[str, bool]: """ Get the directory path for storing model-related files. Args: folder_list (List[str]): List of folder names to create. filename (Optional[str]): Name of the file to create. Returns: Tuple[str, bool]: The full path to the directory or file, and a boolean indicating if the file exists. """ folder_list_ = self.dir_conf.copy() folder_list_.extend(folder_list) filename, isfile = mkdir_in_root(folder_list=folder_list_, filename=filename) return filename, isfile
[docs] def add_component( self, component: core.System, components: Dict[str, core.System] = None ) -> None: """ Add a component to the model. Args: component (core.System): The component to add. Raises: AssertionError: If the component is not an instance of core.System. """ assert isinstance( component, core.System ), f'The argument "component" must be of type {core.System.__name__}' if components is None: components = self._components if component.id not in components: components[component.id] = component else: assert ( components[component.id] == component ), f'The component with id "{component.id}" already exists in the model.' if components == self._components: self._update_literals(component)
[docs] def make_pickable(self) -> None: """ Make the model instance pickable by removing unpickable references. This method prepares the Model instance for use with multiprocessing, e.g. in the Estimator class. """ # for c in self._components.values(): # print(f"Making {c.id} pickable") # for k, input_ in c.input.items(): # print(f"Making {k} of pickable") # input_.make_pickable() # for k, output_ in c.output.items(): # print(f"Making {k} of pickable") # output_.make_pickable() self.reset_torch_tensors() fmus = self.get_component_by_class(self._components, systems.fmuSystem) for fmu in fmus: if "fmu" in get_object_attributes(fmu): del fmu.fmu del fmu.fmu_initial_state fmu.INITIALIZED = False
[docs] def reset_torch_tensors(self) -> None: """ Reset all torch.Tensor objects in the model to remove TensorWrapper references. This method iterates through all components and their attributes to find torch.Tensor objects that might contain TensorWrapper (which causes pickling issues). It creates new tensors with the same values but without gradient tracking. This is particularly useful when switching from AD (automatic differentiation) to FD (finite difference) methods in the Estimator, as AD methods create gradient-tracking tensors that cannot be pickled for multiprocessing. """ def reset_tensor(tensor): """ Reset a torch tensor if it contains TensorWrapper or has gradient tracking. Args: tensor: The tensor to check and potentially reset path: Path for debugging purposes Returns: The original tensor or a new tensor without gradient tracking """ assert isinstance( tensor, torch.Tensor ), f"The tensor must be of type {torch.Tensor.__name__}" # First handle special cases if isinstance(tensor, tps.Parameter): tensor = tps.Parameter( tensor.get(), min_value=tensor._min_value, max_value=tensor._max_value, requires_grad=False, ) elif isinstance(tensor, tps.TensorParameter): tensor = tps.TensorParameter( tensor.get(), min_value=tensor._min_value, max_value=tensor._max_value, normalized=False, ) elif isinstance(tensor, torch.Tensor): tensor = ( tensor.detach().clone().requires_grad_(False).type(torch.float64) ) return tensor def reset_object_tensors(obj, obj_path="", visited=None): """ Recursively reset tensors in an object and its attributes. Args: obj: The object to process obj_path: Path for debugging purposes visited: Set of already visited object IDs to prevent infinite recursion """ if obj is None: return # Initialize visited set if not provided if visited is None: visited = set() # Create a unique identifier for this object to prevent infinite recursion obj_id = id(obj) if obj_id in visited: return visited.add(obj_id) # print(f"Current object: {obj_path}") # Handle different types of objects if isinstance(obj, torch.Tensor): # Direct tensor - reset if needed return reset_tensor(obj) elif isinstance(obj, (list, tuple)): # Container - process each element for i, item in enumerate(obj): item_path = f"{obj_path}[{i}]" if isinstance(item, torch.Tensor): new_item = reset_tensor(item) if new_item is not item: obj[i] = new_item else: # Recursively process non-tensor items reset_object_tensors(item, item_path, visited) elif isinstance(obj, dict): # Dictionary - process each value for key, value in obj.items(): value_path = f"{obj_path}.{key}" if isinstance(value, torch.Tensor): new_value = reset_tensor(value) if new_value is not value: obj[key] = new_value else: # Recursively process non-tensor values reset_object_tensors(value, value_path, visited) elif hasattr(obj, "__dict__"): # Object with attributes - process each attribute for attr_name, attr_value in obj.__dict__.items(): attr_path = f"{obj_path}.{attr_name}" if isinstance(attr_value, torch.Tensor): new_value = reset_tensor(attr_value) if new_value is not attr_value: setattr(obj, attr_name, new_value) else: # Recursively process non-tensor attributes reset_object_tensors(attr_value, attr_path, visited) # print("Resetting torch tensors in model components...") # Process each component for comp_id, component in self._components.items(): # print(f"Processing component: {comp_id}") # Reset tensors in the component itself reset_object_tensors(component, f"component.{comp_id}")
# Reset tensors in component properties (input, output, parameters) # for prop_name in ['input', 'output', 'parameters']: # if hasattr(component, prop_name): # prop_value = getattr(component, prop_name) # reset_object_tensors(prop_value, f"component.{comp_id}.{prop_name}") # print("Torch tensor reset complete.")
[docs] def remove_component( self, component: core.System, components: Dict[str, core.System] = None ) -> None: """ Remove a component from the model. Args: component (core.System): The component to remove. """ # Connection to component for connection_point in component.connects_at.copy(): for connection in connection_point.connects_system_through.copy(): self.remove_connection( connection.connects_system, component, connection.outputPort, connection_point.inputPort, ) # Connection from component for connection in component.connected_through.copy(): for connection_point in connection.connects_system_at.copy(): self.remove_connection( component, connection_point.connection_point_of, connection.outputPort, connection_point.inputPort, ) if components is None: components = self._components del components[component.id]
[docs] def add_connection( self, sender_component: core.System, receiver_component: core.System, outputPort: str, inputPort: str, components: Dict[str, core.System] = None, ) -> None: """ Add a connection between two components in the system. Args: sender_component (core.System): The component sending the connection. receiver_component (core.System): The component receiving the connection. outputPort (str): Name of the sender property. inputPort (str): Name of the receiver property. Raises: AssertionError: If property names are invalid for the components. AssertionError: If a connection already exists. """ if components is None: components = self._components self.add_component(sender_component, components=components) self.add_component(receiver_component, components=components) l = [f"'{k}'" for k in list(sender_component.output.keys())] message = f"The property '{outputPort}' is not a valid output for the component '{sender_component.id}' of type '{type(sender_component)}'.\nThe valid output properties are:\n{' '.join(l)}" assert outputPort in ( set(sender_component.output.keys()) | set(sender_component.output.keys()) ), message # Before we joined input and output sets l = [f"'{k}'" for k in list(receiver_component.input.keys())] message = f"The property '{inputPort}' is not a valid input for the component '{receiver_component.id}' of type '{type(receiver_component)}'.\nThe valid input properties are:\n{' '.join(l)}" assert inputPort in receiver_component.input.keys(), message found_connection_point = False # Check if there already is a connectionPoint with the same receiver_property_name for receiver_component_connection_point in receiver_component.connects_at: if receiver_component_connection_point.inputPort == inputPort: found_connection_point = True break found_connection = False # Check if there already is a connection with the same sender_property_name for sender_obj_connection in sender_component.connected_through: if sender_obj_connection.outputPort == outputPort: found_connection = True break if found_connection_point and found_connection: message = f'core.Connection between "{sender_component.id}" and "{receiver_component.id}" with the properties "{outputPort}" and "{inputPort}" already exists.' assert ( receiver_component_connection_point not in sender_obj_connection.connects_system_at ), message if found_connection == False: sender_obj_connection = core.Connection( connects_system=sender_component, outputPort=outputPort ) sender_component.connected_through.append(sender_obj_connection) if found_connection_point == False: receiver_component_connection_point = core.ConnectionPoint( connection_point_of=receiver_component, inputPort=inputPort ) receiver_component.connects_at.append(receiver_component_connection_point) sender_obj_connection.connects_system_at.append( receiver_component_connection_point ) receiver_component_connection_point.connects_system_through.append( sender_obj_connection ) # if sender_obj_connection not in receiver_component_connection_point.connects_system_through else None if components == self._components: sender_component_uri = self._semantic_model.SIM.__getitem__( sender_component.id ) receiver_component_uri = self._semantic_model.SIM.__getitem__( receiver_component.id ) sender_component_class_name = sender_component.__class__.__name__ receiver_component_class_name = receiver_component.__class__.__name__ connection_uri = self._semantic_model.SIM.__getitem__( str(hash(sender_obj_connection)) ) connection_point_uri = self._semantic_model.SIM.__getitem__( str(hash(receiver_component_connection_point)) ) literal_sender_property = Literal( outputPort ) # , datatype=core.namespace.XSD.string) literal_receiver_property = Literal( inputPort ) # , datatype=core.namespace.XSD.string) # Add the class of the components to the semantic model self._semantic_model.graph.add( ( sender_component_uri, RDF.type, core.namespace.SIM.__getitem__(sender_component_class_name), ) ) self._semantic_model.graph.add( ( receiver_component_uri, RDF.type, core.namespace.SIM.__getitem__(receiver_component_class_name), ) ) self._semantic_model.graph.add( ( core.namespace.SIM.__getitem__(sender_component_class_name), RDFS.subClassOf, core.namespace.S4SYST.System, ) ) self._semantic_model.graph.add( ( core.namespace.SIM.__getitem__(receiver_component_class_name), RDFS.subClassOf, core.namespace.S4SYST.System, ) ) # Add the class of the connections and connection points to the semantic model self._semantic_model.graph.add( (connection_uri, RDF.type, core.namespace.S4SYST.Connection) ) self._semantic_model.graph.add( (connection_point_uri, RDF.type, core.namespace.S4SYST.ConnectionPoint) ) # Add the forward connection to the semantic model self._semantic_model.graph.add( ( sender_component_uri, core.namespace.S4SYST.connectedThrough, connection_uri, ) ) self._semantic_model.graph.add( ( connection_uri, core.namespace.S4SYST.connectsSystemAt, connection_point_uri, ) ) self._semantic_model.graph.add( ( connection_point_uri, core.namespace.S4SYST.connectionPointOf, receiver_component_uri, ) ) # Add the reverse connection to the semantic model self._semantic_model.graph.add( ( connection_uri, core.namespace.S4SYST.connectsSystem, sender_component_uri, ) ) self._semantic_model.graph.add( ( connection_point_uri, core.namespace.S4SYST.connectsSystemThrough, connection_uri, ) ) self._semantic_model.graph.add( ( receiver_component_uri, core.namespace.S4SYST.connectsAt, connection_point_uri, ) ) self._semantic_model.graph.add( (connection_uri, core.namespace.SIM.outputPort, literal_sender_property) ) self._semantic_model.graph.add( ( connection_point_uri, core.namespace.SIM.inputPort, literal_receiver_property, ) )
[docs] def remove_connection( self, sender_component: core.System, receiver_component: core.System, outputPort: str, inputPort: str, components: Dict[str, core.System] = None, ) -> None: """ Remove a connection between two components in the system. Args: sender_component (core.System): The component sending the connection. receiver_component (core.System): The component receiving the connection. sender_property_name (str): Name of the sender property. receiver_property_name (str): Name of the receiver property. Raises: ValueError: If the specified connection does not exist. """ if components is None: components = self._components sender_component_connection = None for connection in sender_component.connected_through: if connection.outputPort == outputPort: sender_component_connection = connection break if sender_component_connection is None: raise ValueError( f'The sender component "{sender_component.id}" does not have a connection with the property "{outputPort}"' ) receiver_component_connection_point = None for connection_point in receiver_component.connects_at: if connection_point.inputPort == inputPort: receiver_component_connection_point = connection_point break if receiver_component_connection_point is None: raise ValueError( f'The receiver component "{receiver_component.id}" does not have a connection point with the property "{inputPort}"' ) sender_component_connection.connects_system_at.remove( receiver_component_connection_point ) receiver_component_connection_point.connects_system_through.remove( sender_component_connection ) if len(sender_component_connection.connects_system_at) == 0: sender_component.connected_through.remove(sender_component_connection) sender_component_connection.connects_system = None if len(receiver_component_connection_point.connects_system_through) == 0: receiver_component.connects_at.remove(receiver_component_connection_point) receiver_component_connection_point.connection_point_of = None if components == self._components: sender_component_uri = self._semantic_model.SIM.__getitem__( sender_component.id ) receiver_component_uri = self._semantic_model.SIM.__getitem__( receiver_component.id ) connection_uri = self._semantic_model.SIM.__getitem__( str(hash(sender_component_connection)) ) # self._semantic_model.SIM.__getitem__(sender_component.id + " " + sender_property_name) connection_point_uri = self._semantic_model.SIM.__getitem__( str(hash(receiver_component_connection_point)) ) # self._semantic_model.SIM.__getitem__(receiver_component.id + " " + receiver_property_name) literal_sender_property = list( self._semantic_model.graph.objects( connection_uri, core.namespace.SIM.outputPort ) ) literal_receiver_property = list( self._semantic_model.graph.objects( connection_point_uri, core.namespace.SIM.inputPort ) ) assert ( len(literal_sender_property) == 1 ), "The connection has more than one output port." assert ( len(literal_receiver_property) == 1 ), "The connection has more than one input port." literal_sender_property = literal_sender_property[0] literal_receiver_property = literal_receiver_property[0] # Remove the connections from the semantic model self._semantic_model.graph.remove( ( connection_uri, core.namespace.S4SYST.connectsSystemAt, connection_point_uri, ) ) self._semantic_model.graph.remove( ( connection_point_uri, core.namespace.S4SYST.connectsSystemThrough, connection_uri, ) ) if len(sender_component_connection.connects_system_at) == 0: self._semantic_model.graph.remove( ( sender_component_uri, core.namespace.S4SYST.connectedThrough, connection_uri, ) ) self._semantic_model.graph.remove( ( connection_uri, core.namespace.S4SYST.connectsSystem, sender_component_uri, ) ) self._semantic_model.graph.remove( ( connection_uri, core.namespace.SIM.outputPort, literal_sender_property, ) ) if len(receiver_component_connection_point.connects_system_through) == 0: self._semantic_model.graph.remove( ( receiver_component_uri, core.namespace.S4SYST.connectsAt, connection_point_uri, ) ) self._semantic_model.graph.remove( ( connection_point_uri, core.namespace.S4SYST.connectionPointOf, receiver_component_uri, ) ) self._semantic_model.graph.remove( ( connection_point_uri, core.namespace.SIM.inputPort, literal_receiver_property, ) )
[docs] def count_components(self) -> int: return len(self._components)
[docs] def count_connections(self) -> int: return self._semantic_model.count_triples( s=None, p=core.namespace.S4SYST.connectsSystemAt, o=None )
[docs] def get_object_properties(self, object_: Any) -> Dict: """ Get all properties of an object. Args: object_ (Any): The object to get properties from. Returns: Dict: A dictionary of object properties. """ return {key: value for (key, value) in vars(object_).items()}
[docs] def get_component_by_class( self, dict_: Dict, class_: Type, filter: Optional[Callable] = None ) -> List: """ Get components of a specific class from a dictionary. Args: dict_ (Dict): The dictionary to search. class_ (Type): The class to filter by. filter (Optional[Callable]): Additional filter function. Returns: List: List of components matching the class and filter. """ if filter is None: filter = lambda v, class_: True return [ v for v in dict_.values() if (isinstance(v, class_) and filter(v, class_)) ]
[docs] def set_custom_initial_dict( self, _custom_initial_dict: Dict[str, Dict[str, Any]] ) -> None: """ Set custom initial values for components. Args: _custom_initial_dict (Dict[str, Dict[str, Any]]): Dictionary of custom initial values. Raises: AssertionError: If unknown component IDs are provided. """ np_custom_initial_dict_ids = np.array(list(_custom_initial_dict.keys())) legal_ids = np.array( [dict_id in self._components for dict_id in _custom_initial_dict] ) assert np.all( legal_ids ), f'Unknown component id(s) provided in "_custom_initial_dict": {np_custom_initial_dict_ids[legal_ids==False]}' self._custom_initial_dict = _custom_initial_dict
[docs] def set_initial_values(self, dict_: Dict[str, Any]) -> None: """ Set initial values for all components in the model. """ for component in self._components.values(): # Check that all keys in the dictionary are valid output properties for key in dict_[component.id].keys(): assert ( key in component.output ), f'Invalid output property "{key}" for component "{component.id}"' assert isinstance( dict_[component.id][key], component.output[key].__class__ ), f'Invalid type for output property "{key}" for component "{component.id}"' component.output.update(dict_[component.id])
[docs] def set_parameters_from_array( self, values: List[Any], components: List[core.System], parameter_names: List[str], normalized: List[bool] = None, overwrite: bool = False, save_original: bool = False, ) -> None: """ Set parameters for components from an array. Args: values (List[Any]): List of parameter values. component_list (List[core.System]): List of components to set parameters for. attr_list (List[str]): List of attribute names corresponding to the parameters. Raises: AssertionError: If a component doesn't have the specified attribute. """ if normalized is None: normalized = [False] * len(values) elif isinstance(normalized, bool): normalized = [normalized] * len(values) for i, (v, obj, attr, normalized_) in enumerate( zip(values, components, parameter_names, normalized) ): assert rhasattr( obj, attr ), f'The component with class "{obj.__class__.__name__}" and id "{obj.id}" has no attribute "{attr}".' if v is not None: obj_ = rgetattr(obj, attr) if isinstance( obj_, tps.Parameter ): # Only change underlying data in torch.Parameter if overwrite: if save_original: if ( obj.id not in self._saved_parameters ): # Save the original parameter if we later need to restore it self._saved_parameters[obj.id] = {} self._saved_parameters[obj.id][attr] = obj_ new_param = tps.TensorParameter( v, min_value=obj_.min_value, max_value=obj_.max_value, normalized=normalized_, ) rdelattr(obj, attr) rsetattr(obj, attr, new_param) # new_param.set(v, normalized=normalized_) else: obj_.set(v, normalized=normalized_) elif isinstance(obj_, tps.TensorParameter): obj_.set(v, normalized=normalized_) else: rsetattr(obj, attr, v)
[docs] def restore_parameters(self, keep_values: bool = True) -> None: for obj in self._saved_parameters: for attr in self._saved_parameters[obj]: old_obj = rgetattr(self._components[obj], attr) v = old_obj.get() new_obj = self._saved_parameters[obj][attr] rdelattr(self._components[obj], attr) rsetattr(self._components[obj], attr, new_obj) if keep_values: new_obj.set(v, normalized=False)
[docs] def set_parameters_from_config(self, d: dict, component: core.System): """ Recursively set parameters from a dictionary. """ for key in d.keys(): entry = d[key] cond = isinstance(entry, dict) and all( [rhasattr(component, k) for k in entry.keys()] ) if cond: self.set_parameters_from_config(entry, component) else: self.set_parameters_from_array([entry], [component], [key]) return
[docs] def cache( self, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, step_size: Optional[int] = None, simulator: Optional[core.Simulator] = None, ) -> None: """ Cache data and create folder structure for time series data. Args: start_time (Optional[datetime.datetime]): Start time for caching. end_time (Optional[datetime.datetime]): End time for caching. step_size (Optional[int]): Time step size for caching. """ c = self.get_component_by_class( self._components, ( systems.SensorSystem, systems.OutdoorEnvironmentSystem, systems.TimeSeriesInputSystem, ), ) for component in c: component.initialize( start_time=start_time, end_time=end_time, step_size=step_size, simulator=simulator, )
[docs] def initialize( self, start_time: datetime.datetime, end_time: datetime.datetime, step_size: int, simulator: "core.Simulator", ) -> None: """ Initialize the model for simulation. Args: start_time (datetime.datetime): Start time for the simulation. end_time (datetime.datetime): End time for the simulation. step_size (int): Time step size for the simulation. simulator (core.Simulator): Simulator instance. """ # self.set_initial_values() self.check_for_for_missing_initial_values() for component in self._flat_execution_order: # component.clear_results() # component.initialize(start_time=start_time, # end_time=end_time, # step_size=step_size, # simulator=simulator) for v in component.input.values(): v.reset() for v in component.output.values(): v.reset() # Make the inputs and outputs aware of the execution order. # This is important to ensure that input tps.Vectors have the same order, allowing for instance element-wise operations. for i, connection_point in enumerate(component.connects_at): for j, connection in enumerate( connection_point.connects_system_through ): connected_component = connection.connects_system if isinstance( component.input[connection_point.inputPort], tps.Vector ): if ( component, connected_component, connection.outputPort, connection_point.inputPort, ) in self._translator.E_conn_to_sp_group: sp, groups = self._translator.E_conn_to_sp_group[ ( component, connected_component, connection.outputPort, connection_point.inputPort, ) ] # Find the group of the connected component modeled_match_nodes_ = self._translator.sim2sem_map[ connected_component ] groups_matched = [ g for g in groups if len( modeled_match_nodes_.intersection(set(g.values())) ) > 0 ] assert ( len(groups_matched) == 1 ), "Only one group is allowed for each component." group = groups_matched[0] group_id = id(group) component.input[connection_point.inputPort].update( group_id=group_id ) else: component.input[connection_point.inputPort].update() component.initialize( start_time=start_time, end_time=end_time, step_size=step_size, simulator=simulator, )
[docs] def validate(self) -> None: """ Validate the model by checking IDs and connections. """ PRINTPROGRESS.add_level() ( validated_for_simulator1, validated_for_estimator1, validated_for_optimizer1, ) = self.validate_components() ( validated_for_simulator2, validated_for_estimator2, validated_for_optimizer2, ) = self.validate_ids() ( validated_for_simulator3, validated_for_estimator3, validated_for_optimizer3, ) = self.validate_connections() self._validated_for_simulator = ( validated_for_simulator1 and validated_for_simulator2 and validated_for_simulator3 ) self._validated_for_estimator = ( validated_for_estimator1 and validated_for_estimator2 and validated_for_estimator3 ) self._validated_for_optimizer = ( validated_for_optimizer1 and validated_for_optimizer2 and validated_for_optimizer3 ) self._is_validated = ( self._validated_for_simulator and self._validated_for_estimator and self._validated_for_optimizer ) PRINTPROGRESS( "Validated for Simulator", status="[OK]" if self._validated_for_simulator else "[FAILED]", ) PRINTPROGRESS( "Validated for Estimator", status="[OK]" if self._validated_for_estimator else "[FAILED]", ) PRINTPROGRESS( "Validated for Optimizer", status="[OK]" if self._validated_for_optimizer else "[FAILED]", ) PRINTPROGRESS.remove_level()
# assert validated, "The model is not valid. See the warnings above."
[docs] def validate_components(self) -> None: """ Validate the parameters of all components in the model. Raises: AssertionError: If any component has invalid parameters. """ component_instances = list(self._components.values()) _validated_for_simulator = True _validated_for_estimator = True _validated_for_optimizer = True for component in component_instances: if hasattr(component, "validate"): # Check if component has validate method ( validated_for_simulator_, validated_for_estimator_, validated_for_optimizer_, ) = component.validate(PRINTPROGRESS) _validated_for_simulator = ( _validated_for_simulator and validated_for_simulator_ ) _validated_for_estimator = ( _validated_for_estimator and validated_for_estimator_ ) _validated_for_optimizer = ( _validated_for_optimizer and validated_for_optimizer_ ) else: # Validate parameters config = component.config.copy() parameters = { attr: rgetattr(component, attr) for attr in config["parameters"] } is_none = [k for k, v in parameters.items() if v is None] if any(is_none): message = f"|CLASS: {component.__class__.__name__}|ID: {component.id}|: Missing values for the following parameter(s) to enable use of Simulator, and Optimizer:" PRINTPROGRESS(message, plain=True, status="[WARNING]") PRINTPROGRESS.add_level() for par in is_none: PRINTPROGRESS(par, plain=True, status="") PRINTPROGRESS.remove_level() _validated_for_simulator = False _validated_for_optimizer = False # Validate model definitions for input in component.input.values(): assert isinstance( input, (tps.Scalar, tps.Vector) ), "Only vectors and scalars can be used as input to components" for output in component.output.values(): assert isinstance( output, (tps.Scalar, tps.Vector) ), "Only vectors and scalars can be used as output from components" if len(component.connects_at) == 0: for key in component.output.keys(): output = component.output[key] if isinstance( output, tps.Scalar ): # TODO: Add support for vectors if output.is_leaf == False: message = f'|CLASS: {component.__class__.__name__}|ID: {component.id}|: The output "{key}" is not a leaf scalar. Only leaf scalars can be used as output from components with no inputs.' PRINTPROGRESS(message, plain=True, status="[WARNING]") _validated_for_optimizer = False # assert output.is_leaf, f"|CLASS: {component.__class__.__name__}|ID: {component.id}|: The output \"{key}\" is not a leaf scalar. Only leaf scalars can be used as output from components with no inputs." else: for key in component.output.keys(): output = component.output[key] if isinstance( output, tps.Scalar ): # TODO: Add support for vectors if output.is_leaf: message = f'|CLASS: {component.__class__.__name__}|ID: {component.id}|: The output "{key}" is a leaf scalar. Only non-leaf scalars can be used as output from components with inputs.' PRINTPROGRESS(message, plain=True, status="[WARNING]") _validated_for_optimizer = False # assert output.is_leaf==False, f"|CLASS: {component.__class__.__name__}|ID: {component.id}|: The output \"{key}\" is a leaf scalar. Only non-leaf scalars can be used as output from components with inputs." return ( _validated_for_simulator, _validated_for_estimator, _validated_for_optimizer, )
[docs] def validate_ids(self) -> None: """ Validate the IDs of all components in the model. Raises: AssertionError: If any component has an invalid ID. """ validated = True component_instances = list(self._components.values()) for component in component_instances: isvalid = np.array( [x.isalnum() or x in INVALID_ID_CHARS for x in component.id] ) np_id = np.array(list(component.id)) violated_characters = list(np_id[isvalid == False]) if not all(isvalid): message = f"|CLASS: {component.__class__.__name__}|ID: {component.id}|: Invalid id. The characters \"{', '.join(violated_characters)}\" are not allowed." PRINTPROGRESS(message) validated = False return (validated, validated, validated)
[docs] def validate_connections(self) -> None: """ Validate the connections between components in the model. Raises: AssertionError: If any required connections are missing. """ component_instances = list(self._components.values()) validated = True for component in component_instances: if hasattr( component, "validate_connections" ): # Check if component has validate method validated = component.validate_connections(PRINTPROGRESS) else: if ( len(component.connected_through) == 0 and len(component.connects_at) == 0 ): message = f"|CLASS: {component.__class__.__name__}|ID: {component.id}|: The component is not connected to any other components." PRINTPROGRESS(message, plain=True, status="[WARNING]") input_labels = [cp.inputPort for cp in component.connects_at] first_input = True for req_input_label in component.input.keys(): if ( req_input_label not in input_labels and component.input[req_input_label].optional == False ): if first_input: message = f"|CLASS: {component.__class__.__name__}|ID: {component.id}|: Missing connections for the following input(s) to enable use of Simulator, Estimator, and Optimizer:" PRINTPROGRESS(message, plain=True, status="[WARNING]") first_input = False PRINTPROGRESS.add_level() PRINTPROGRESS(req_input_label, plain=True) validated = False if first_input == False: PRINTPROGRESS.remove_level() return (validated, validated, validated)
def _load_parameters(self, force_config_overwrite: bool = False) -> None: """ Load parameters for all components from configuration files. Args: force_config_overwrite (bool): If True, all parameters are read from the config file. If False, only the parameters that are None are read from the config file. If you want to use the fcn function to set the parameters, you should set force_config_overwrite to False to avoid it being overwritten. """ PRINTPROGRESS.add_level() for component in self._components.values(): assert hasattr( component, "config" ), f'The class "{component.__class__.__name__}" has no "config" attribute.' config_ = component.populate_config() # assert "parameters" in config_, f"The \"config\" attribute of class \"{component.__class__.__name__}\" has no \"parameters\" key." filename, isfile = self.get_dir( folder_list=["model_parameters", component.__class__.__name__], filename=f"{component.id}.json", ) if isfile == False: with open(filename, "w") as f: json.dump(config_, f, indent=4) else: with open(filename) as f: config = json.load(f) comparison_result = compare_dict_structure(config_, config) if not comparison_result["structures_match"]: message = f"|CLASS: {component.__class__.__name__}|ID: {component.id}|: Config structure mismatch." PRINTPROGRESS(message, plain=True, status="[WARNING]") PRINTPROGRESS.add_level() if comparison_result["missing_in_1"]: missing_msg = f"File config has unused parameters: {', '.join(sorted(comparison_result['missing_in_1']))}" PRINTPROGRESS(missing_msg, plain=True, status="[WARNING]") if comparison_result["missing_in_2"]: missing_msg = f"File config is missing the following parameters: {', '.join(sorted(comparison_result['missing_in_2']))}" PRINTPROGRESS(missing_msg, plain=True, status="[WARNING]") PRINTPROGRESS.remove_level() if force_config_overwrite: config_ = merge_dicts(config_, config, prioritize="dict2") else: config_ = merge_dicts( config_, config, prioritize="dict1" ) # Prioritize config_ over config to allow user to change stuff in the fcn function (programatically) self.set_parameters_from_config(config_, component) with open(filename, "w") as f: json.dump(config_, f, indent=4) PRINTPROGRESS.remove_level()
[docs] def load( self, rdf_file: Optional[str] = None, fcn: Optional[Callable] = None, verbose: bool = False, validate_model: bool = True, force_config_overwrite: bool = False, ) -> None: """ Load and set up the model for simulation. Args: rdf_file (Optional[str]): Path to a serialized model. fcn (Optional[Callable]): Custom function to be applied during model loading. verbose (bool): Whether to print verbose output during loading. validate_model (bool): Whether to perform model validation. force_config_overwrite (bool): If True, all parameters are read from the config file. If False, only the parameters that are None are read from the config file. If you want to use the fcn function to set the parameters, you should set force_config_overwrite to False to avoid it being overwritten. """ if verbose: self._load( rdf_file=rdf_file, fcn=fcn, validate_model=validate_model, force_config_overwrite=force_config_overwrite, verbose=verbose, ) else: with warnings.catch_warnings(): warnings.simplefilter("ignore") self._load( rdf_file=rdf_file, fcn=fcn, validate_model=validate_model, force_config_overwrite=force_config_overwrite, verbose=verbose, )
def _load( self, rdf_file: Optional[str] = None, fcn: Optional[Callable] = None, verbose: bool = False, validate_model: bool = True, force_config_overwrite: bool = False, ) -> None: """ Internal method to load and set up the model for simulation. This method is called by load and performs the actual loading process. Args: fcn (Optional[Callable]): Custom function to be applied during model loading. validate_model (bool): Whether to perform model validation. """ if self._is_loaded: warnings.warn("The simulation model is already loaded. Reloading.") self.reset() self._is_loaded = True PRINTPROGRESS("Loading simulation model") PRINTPROGRESS.add_level() if rdf_file is not None: PRINTPROGRESS("Loading model from RDF file") self._load_model_from_rdf(rdf_file) if fcn is not None: assert callable( fcn ), "The function to be applied during model loading is not callable." PRINTPROGRESS("Applying user defined function") fcn(self) PRINTPROGRESS("Removing cycles") self._get_components_no_cycles() PRINTPROGRESS("Determining execution order") self._get_execution_order() PRINTPROGRESS("Loading parameters") self._load_parameters(force_config_overwrite=force_config_overwrite) if validate_model: PRINTPROGRESS("Validating model") self.validate() PRINTPROGRESS.remove_level() if verbose: print(self)
[docs] def set_save_simulation_result(self, flag: bool = True, c: list = None): assert isinstance(flag, bool), "The flag must be a boolean." if c is not None: assert isinstance(c, list), "The c must be a list." for component in c: for input_key in component.input.keys(): if isinstance(component.input[input_key], tps.Scalar): component.input[input_key].log_history = flag for output_key in component.output.keys(): if isinstance(component.output[output_key], tps.Scalar): component.output[output_key].log_history = flag else: for component in self._components.values(): for input_key in component.input.keys(): if isinstance(component.input[input_key], tps.Scalar): component.input[input_key].log_history = flag for output_key in component.output.keys(): if isinstance(component.output[output_key], tps.Scalar): component.output[output_key].log_history = flag
[docs] def reset(self) -> None: """ Reset the model to its initial state. """ # Reset all the dictionaries and lists # self._components = {} ### self._custom_initial_dict = None ### self._execution_order = [] ### self._flat_execution_order = [] ### self._required_initialization_connections = [] ### self._components_no_cycles = {} ### self._saved_parameters = {} ### # Reset the loaded state self._is_loaded = False ### self._is_validated = False ### # Reset any estimation results self._result = None ###
[docs] def get_simple_graph(self, components) -> Dict: """ Get a simple graph representation of the system graph. This is a simplified version of the system graph that drops information about edge labels (Connection and ConnectionPoint pairs). Returns: Dict: The simple graph representation. """ simple_graph = {c: set() for c in components.values()} for component in components.values(): for connection in component.connected_through: for connection_point in connection.connects_system_at: receiver_component = connection_point.connection_point_of # If node component has multiple edges to node receiver_component, we will only add one edge to the simple graph (simple_graph[component] is a set). # Later if this is part of a cycle, we will have to remove all edges between component and receiver_component. simple_graph[component].add(receiver_component) return simple_graph
[docs] def get_simple_cycles(self, components: Dict) -> List[List[core.System]]: """ Get the simple cycles in the system graph. Args: components (Dict): Dictionary of components. Returns: List[List[core.System]]: List of simple cycles. """ G = self.get_simple_graph(components) cycles = simple_cycles(G) return cycles
def _copy_components(self) -> core.System: """ Copy the components of the model. """ _new_components = {} new_to_old_mapping = {} old_to_new_mapping = {} for component in self._components.values(): if component not in old_to_new_mapping: new_component = copy.copy(component) new_component.connected_through = [] new_component.connects_at = [] new_to_old_mapping[new_component] = component old_to_new_mapping[component] = new_component else: new_component = old_to_new_mapping[component] for connection in component.connected_through: for connection_point in connection.connects_system_at: connected_component = connection_point.connection_point_of if connected_component not in old_to_new_mapping: new_connected_component = copy.copy(connected_component) new_connected_component.connected_through = [] new_connected_component.connects_at = [] new_to_old_mapping[new_connected_component] = ( connected_component ) old_to_new_mapping[connected_component] = ( new_connected_component ) else: new_connected_component = old_to_new_mapping[ connected_component ] self.add_connection( new_component, new_connected_component, connection.outputPort, connection_point.inputPort, components=_new_components, ) # _new_components = {k: old_to_new_mapping[v] for k, v in self._components.items()} return _new_components def _get_components_no_cycles(self) -> None: """ Create a dictionary of components without cycles using an improved algorithm that minimizes the number of edges removed. """ self._components_no_cycles = self._copy_components() self._required_initialization_connections = [] # Use the improved cycle removal algorithm self._remove_cycles() def _remove_cycles(self) -> None: """ Remove cycles using an improved algorithm that minimizes edge removal. This algorithm uses multiple strategies: 1. Finds all simple cycles in the simplified graph (once) 2. Counts how many cycles each component-to-component edge participates in 3. Greedily removes edges that break the most cycles 4. Updates cycle list incrementally instead of recalculating 5. Repeats until no cycles remain Note: When an edge (c_from -> c_to) is selected for removal, ALL connections between those components are removed, as per the existing architecture. """ iteration = 0 max_iterations = 1000 # Safety limit to prevent infinite loops # Calculate all cycles once at the beginning cycles = list(self.get_simple_cycles(self._components_no_cycles)) if not cycles: return # No cycles to remove while iteration < max_iterations and cycles: iteration += 1 # Count edge participation in remaining cycles edge_cycle_count = {} for cycle in cycles: for i in range(len(cycle)): c_from = cycle[i] c_to = cycle[ (i + 1) % len(cycle) ] # Next component in cycle (wraps around) # Use simplified edge representation (just component pair) edge_key = (c_from, c_to) if edge_key not in edge_cycle_count: edge_cycle_count[edge_key] = 0 edge_cycle_count[edge_key] += 1 if not edge_cycle_count: break # Find the best edge to remove using multiple criteria best_edge = self._select_best_edge_to_remove(edge_cycle_count) # Remove ALL connections between the selected components c_from, c_to = best_edge self._remove_all_edges_between_components(c_from, c_to) # Update cycles list by removing cycles that contained the removed edge cycles = self._update_cycles_after_edge_removal(cycles, best_edge) if iteration >= max_iterations: print( f"Warning: Cycle removal reached maximum iterations ({max_iterations}). " "There might be remaining cycles." ) def _update_cycles_after_edge_removal(self, cycles, removed_edge): """ Update the cycles list after removing an edge, avoiding full recalculation. Args: cycles: Current list of cycles removed_edge: The edge (c_from, c_to) that was removed Returns: Updated list of cycles with broken cycles removed """ c_from, c_to = removed_edge updated_cycles = [] for cycle in cycles: # Check if this cycle contains the removed edge cycle_broken = False for i in range(len(cycle)): cycle_c_from = cycle[i] cycle_c_to = cycle[(i + 1) % len(cycle)] if cycle_c_from == c_from and cycle_c_to == c_to: cycle_broken = True break # Only keep cycles that don't contain the removed edge if not cycle_broken: updated_cycles.append(cycle) return updated_cycles def _select_best_edge_to_remove(self, edge_cycle_count): """ Select the best edge to remove using multiple criteria. Priority order: 1. Edges that participate in the most cycles 2. Among ties, prefer edges from components with more outgoing connections Args: edge_cycle_count: Dictionary mapping (c_from, c_to) tuples to cycle participation count Returns: The best edge tuple (c_from, c_to) to remove """ # Group edges by cycle participation count (descending) max_cycle_count = max(edge_cycle_count.values()) best_edges = [ edge for edge, count in edge_cycle_count.items() if count == max_cycle_count ] # If multiple edges have the same max count, apply additional criteria if len(best_edges) > 1: # Prefer edges from components with more outgoing connections def edge_priority(edge): c_from, c_to = edge # Higher number of outgoing connections = higher priority for removal outgoing_count = len(c_from.connected_through) return outgoing_count best_edges.sort(key=edge_priority, reverse=True) return best_edges[0] def _remove_all_edges_between_components(self, c_from, c_to): """ Remove ALL connections between two components. This aligns with the existing architecture where the simplified graph collapses multiple edges into one, so removing an edge means removing all connections between those components. Args: c_from: Source component c_to: Target component """ # Find and remove all connections from c_from to c_to connections_to_remove = [] for connection in c_from.connected_through: for connection_point in connection.connects_system_at: if c_to == connection_point.connection_point_of: connections_to_remove.append((connection, connection_point)) # Remove the identified connections for connection, connection_point in connections_to_remove: connection.connects_system_at.remove(connection_point) connection_point.connects_system_through.remove(connection) self._required_initialization_connections.append(connection) # Clean up empty connection point if len(connection_point.connects_system_through) == 0: c_to.connects_at.remove(connection_point) # Clean up empty connection if len(connection.connects_system_at) == 0: c_from.connected_through.remove(connection)
[docs] def load_estimation_result( self, filename: Optional[str] = None, result: Optional[Dict] = None ) -> None: """ Load a chain log from a file or dictionary. Args: filename (Optional[str]): The filename to load the chain log from. result (Optional[Dict]): The chain log dictionary to load. Raises: AssertionError: If invalid arguments are provided. """ if result is not None: assert isinstance(result, dict), "Argument d must be a dictionary" cls_ = result.__class__ self._result = cls_() for key, value in result.items(): if "chain." not in key: self._result[key] = copy.deepcopy(value) else: self._result[key] = value else: assert isinstance(filename, str), "Argument filename must be a string" _, ext = os.path.splitext(filename) if ext == ".pickle": with open(filename, "rb") as handle: self._result = pickle.load(handle) elif ext == ".npz": if "_ls.npz" in filename: d = dict(np.load(filename, allow_pickle=True)) d = { k.replace(".", "_"): v for k, v in d.items() } # For backwards compatibility self._result = estimator.EstimationResult(**d) elif "_mcmc.npz" in filename: d = dict(np.load(filename, allow_pickle=True)) d = { k.replace(".", "_"): v for k, v in d.items() } # For backwards compatibility self._result = estimator.EstimationResult(**d) else: raise Exception( 'The estimation result file is not of a supported type. The file must be a .pickle, .npz file with the name containing "_ls" or "_mcmc".' ) for key, value in self._result.items(): self._result[key] = ( 1 / self._result["chain_betas"] if key == "chain_T" else value ) if self._result[key].size == 1 and ( len(self._result[key].shape) == 0 or len(self._result[key].shape) == 1 ): self._result[key] = value.tolist() elif ( key == "startTime_train" or key == "endTime_train" or key == "stepSize_train" ): self._result[key] = value.tolist() else: raise Exception( f"The estimation result is of type {type(self._result)}. This type is not supported by the model class." ) if isinstance(self._result, estimator.EstimationResult): theta = self._result["result_x"] else: raise Exception( f"The estimation result is of type {type(self._result)}. This type is not supported by the model class." ) flat_components = [ self._components[com_id] for com_id in self._result["component_id"] ] flat_attr_list = self._result["component_attr"] theta_mask = self._result["theta_mask"] theta = theta[theta_mask] self.set_parameters_from_array(theta, flat_components, flat_attr_list)
[docs] def check_for_for_missing_initial_values(self) -> None: """ Check for missing initial values in components. Raises: Exception: If any component is missing an initial value. """ for connection in self._required_initialization_connections: component = connection.connects_system if connection.outputPort not in component.output: raise Exception( f'The component with id: "{component.id}" and class: "{component.__class__.__name__}" is missing an initial value for the output: {connection.outputPort}' ) elif component.output[connection.outputPort].get() is None: raise Exception( f'The component with id: "{component.id}" and class: "{component.__class__.__name__}" is missing an initial value for the output: {connection.outputPort}' )
def _get_execution_order(self) -> None: """ Determine the execution order of components. Raises: AssertionError: If cycles are detected in the model. """ def _flatten(_list: List) -> List: """ Flatten a nested list. Args: _list (List): The nested list to flatten. Returns: List: The flattened list. """ return [item for sublist in _list for item in sublist] def _traverse(self, activeComponents) -> None: """ Traverse the component graph to determine execution order. """ activeComponentsNew = [] component_group = [] for component in activeComponents: component_group.append(component) for connection in component.connected_through: for connection_point in connection.connects_system_at: # connection_point = connection.connects_system_at receiver_component = connection_point.connection_point_of connection_point.connects_system_through.remove(connection) if len(connection_point.connects_system_through) == 0: receiver_component.connects_at.remove(connection_point) if len(receiver_component.connects_at) == 0: activeComponentsNew.append(receiver_component) activeComponents = activeComponentsNew self._execution_order.append(component_group) return activeComponents initComponents = [ v for v in self._components_no_cycles.values() if len(v.connects_at) == 0 ] activeComponents = initComponents self._execution_order = [] while len(activeComponents) > 0: activeComponents = _traverse(self, activeComponents) # Map the execution order from the no cycles component dictionary to the full component dictionary. self._execution_order = [ [self._components[component.id] for component in component_group] for component_group in self._execution_order ] # Map required initialization connections from the no cycles component dictionary to the full component dictionary. self._required_initialization_connections = [ connection for no_cycle_connection in self._required_initialization_connections for connection in self._components[ no_cycle_connection.connects_system.id ].connected_through if connection.outputPort == no_cycle_connection.outputPort ] self._flat_execution_order = _flatten(self._execution_order) assert len(self._flat_execution_order) == len( self._components_no_cycles ), 'Cycles detected in the model. Inspect the generated file "system_graph.png" to see where.' def _update_literals(self, component: core.System = None) -> None: """ Update the literals in the semantic model. """ def _update_literals_for_component(component: core.System) -> None: component_uri = self._semantic_model.SIM.__getitem__(component.id) for key, value in flatten_dict(component.populate_config(), component): if isinstance(value, dict): value_ = json.dumps(value) datatype = core.namespace.RDF.JSON else: value_ = value datatype = None # Check if the property is already in the semantic model literal_property = list( self._semantic_model.graph.objects( component_uri, core.namespace.SIM.__getitem__(key) ) ) if len(literal_property) == 0: # No literal in the semantic model. # Add the literal to the semantic model. literal_property = Literal(value_, datatype=datatype) self._semantic_model.graph.add( ( component_uri, core.namespace.SIM.__getitem__(key), literal_property, ) ) elif len(literal_property) == 1: # There is one literal in the semantic model. literal_property = literal_property[0] # Remove the literal from the semantic model. self._semantic_model.graph.remove( ( component_uri, core.namespace.SIM.__getitem__(key), literal_property, ) ) # Add the new literal to the semantic model. literal_property = Literal(value_, datatype=datatype) self._semantic_model.graph.add( ( component_uri, core.namespace.SIM.__getitem__(key), literal_property, ) ) else: # There are more than one literal in the semantic model. raise Exception( f'The component with id: "{component.id}" has more than one output port.' ) if component is None: for component in self._components.values(): _update_literals_for_component(component) else: _update_literals_for_component(component)
[docs] def serialize(self): """ Serialize the simulation model. """ self._update_literals() self._semantic_model.serialize()
[docs] def visualize(self, query: str = None, literals: bool = True) -> None: """ Visualize the simulation model. """ self._update_literals() if query is None: if literals: query = None else: query = """ CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o . FILTER (?p = s4syst:connectsSystemAt || ?p = s4syst:connectedThrough || ?p = s4syst:connectionPointOf || ?p = sim:inputPort || ?p = sim:outputPort) } """ self._semantic_model.visualize(query)
def _load_model_from_rdf(self, rdf_file: str) -> None: """ Load a complete model (components and connections) from an RDF file. This method reads the RDF file and reconstructs both components and their connections. Args: rdf_file (str): Path to the RDF file to load from """ self._semantic_model = core.SemanticModel( id=self._id, rdf_file=rdf_file, namespaces={"SIM": core.namespace.SIM, "S4SYST": core.namespace.S4SYST}, dir_conf=self._dir_conf + ["semantic_model"], ) # Instantiate components with their attributes for sm_instance in self._semantic_model.get_instances_of_type( core.namespace.S4SYST.System ): t = [t for t in sm_instance.type if t.has_subclasses() == False][0] class_name = t.get_short_name() cls = getattr(systems, class_name) attributes = {} for pred, obj in sm_instance.get_predicate_object_pairs().items(): for obj_ in obj: if obj_.is_literal: literal_value = obj_.uri.value attributes[ get_short_name(pred, self._semantic_model.namespaces) ] = literal_value component = cls(id=sm_instance.get_short_name(), **attributes) # Check if the component already exists self.add_component(component) # Go through all the connections (from - to) and add them to the simulation model for sm_instance in self._semantic_model.get_instances_of_type( core.namespace.S4SYST.System ): component = self._components[sm_instance.get_short_name()] predicate_object_pairs = sm_instance.get_predicate_object_pairs() if ( core.namespace.S4SYST.connectedThrough in predicate_object_pairs ): # You can have a System without connections so we need to check connections = predicate_object_pairs[ core.namespace.S4SYST.connectedThrough ] for connection in connections: predicate_object_pairs_connection = ( connection.get_predicate_object_pairs() ) outputPort = predicate_object_pairs_connection[ core.namespace.SIM.outputPort ][ 0 ].uri.value # There can only be one output port per connection connection_points = predicate_object_pairs_connection[ core.namespace.S4SYST.connectsSystemAt ] for connection_point in connection_points: predicate_object_pairs_connection_point = ( connection_point.get_predicate_object_pairs() ) receiver_component = predicate_object_pairs_connection_point[ core.namespace.S4SYST.connectionPointOf ][ 0 ] # There can only be one connection point per connection inputPort = predicate_object_pairs_connection_point[ core.namespace.SIM.inputPort ][ 0 ].uri.value # There can only be one input port per connection point receiver_component_id = receiver_component.get_short_name() receiver_component = self._components[receiver_component_id] self.add_connection( sender_component=component, receiver_component=receiver_component, outputPort=outputPort, inputPort=inputPort, )
# def test(): # m = SimulationModel(id="testm") # c1 = systems.ScheduleSystem(id="sch") # c2 = systems.SpaceHeaterTorchSystem(id="sh") # m.add_connection(c1, c2, "scheduleValue", "indoorTemperature") # if __name__ == "__main__": # test()