Source code for twin4build.systems.building_space.building_space_torch_system

# Standard library imports
import datetime
from typing import Optional

# Third party imports
import torch
import torch.nn as nn

# Local application imports
import twin4build.core as core
import twin4build.utils.types as tps
from twin4build.systems.building_space.building_space_mass_torch_system import (
    BuildingSpaceMassTorchSystem,
)
from twin4build.systems.building_space.building_space_thermal_torch_system import (
    BuildingSpaceThermalTorchSystem,
)
from twin4build.translator.translator import (
    Exact,
    MultiPath,
    Node,
    SignaturePattern,
    SinglePath,
)


[docs] def get_signature_pattern(): """ Get the signature pattern of the FMU component. Returns: SignaturePattern: The signature pattern of the FMU component. """ node0 = Node(cls=core.namespace.S4BLDG.Damper) # supply damper node1 = Node(cls=core.namespace.S4BLDG.Damper) # return damper node2 = Node(cls=core.namespace.S4BLDG.BuildingSpace) node4 = Node(cls=core.namespace.S4BLDG.SpaceHeater) node5 = Node(cls=core.namespace.S4BLDG.Schedule) node6 = Node(cls=core.namespace.S4BLDG.OutdoorEnvironment) node7 = Node( cls=( core.namespace.S4BLDG.Coil, core.namespace.S4BLDG.AirToAirHeatRecovery, core.namespace.S4BLDG.Fan, ) ) node9 = Node(cls=core.namespace.S4BLDG.BuildingSpace) sp = SignaturePattern( semantic_model_=core.ontologies, id="building_space_signature_pattern", ) sp.add_triple( Exact(subject=node0, object=node2, predicate=core.namespace.FSO.suppliesFluidTo) ) sp.add_triple( Exact( subject=node1, object=node2, predicate=core.namespace.FSO.hasFluidReturnedBy ) ) sp.add_triple( Exact( subject=node4, object=node2, predicate=core.namespace.S4BLDG.isContainedIn ) ) sp.add_triple( Exact(subject=node2, object=node5, predicate=core.namespace.SAREF.hasProfile) ) sp.add_triple( Exact(subject=node2, object=node6, predicate=core.namespace.S4SYST.connectedTo) ) sp.add_triple( SinglePath( subject=node0, object=node7, predicate=core.namespace.FSO.hasFluidSuppliedBy ) ) # sp.add_triple(MultiPath(subject=node9, object=node2, predicate=core.namespace.S4SYST.connectedTo)) # TODO: Makes _prune_recursive fail, infinite recursion sp.add_input("supplyAirFlowRate", node0, "airFlowRate") sp.add_input("exhaustAirFlowRate", node1, "airFlowRate") sp.add_input("heatGain", node4, "Power") sp.add_input("numberOfPeople", node5, "scheduleValue") sp.add_input("outdoorTemperature", node6, "outdoorTemperature") sp.add_input("outdoorCO2", node6, "outdoorCo2Concentration") sp.add_input("globalIrradiation", node6, "globalIrradiation") sp.add_input( "supplyAirTemperature", node7, ("outletAirTemperature", "primaryTemperatureOut", "outletAirTemperature"), ) sp.add_input("adjacentZoneTemperature", node9, "indoorTemperature") sp.add_modeled_node(node2) return sp
[docs] def get_signature_pattern_brick(): """ Get the BRICK-only signature pattern of the building space component. Returns: SignaturePattern: The BRICK-only signature pattern of the building space component. """ node0 = Node(cls=core.namespace.BRICK.VAV) # supply damper # node1 = Node(cls=core.namespace.BRICK.VAV) #return damper node2 = Node(cls=core.namespace.BRICK.HVAC_Zone) # building space/room node6 = Node( cls=core.namespace.BRICK.Outside_Air_Temperature_Sensor ) # outdoor temperature sensor node7 = Node(cls=core.namespace.BRICK.AHU) # supply equipment (composite AHU) # node9 = Node(cls=core.namespace.BRICK.Room) #adjacent room sp = SignaturePattern( semantic_model_=core.ontologies, id="building_space_signature_pattern_brick", ) sp.add_triple( Exact(subject=node0, object=node2, predicate=core.namespace.BRICK.feeds) ) # sp.add_triple(Exact(subject=node1, object=node2, predicate=core.namespace.BRICK.isFedBy)) sp.add_triple( Exact(subject=node7, object=node6, predicate=core.namespace.BRICK.hasPoint) ) sp.add_triple( SinglePath(subject=node0, object=node7, predicate=core.namespace.BRICK.isFedBy) ) # sp.add_triple(MultiPath(subject=node9, object=node2, predicate=core.namespace.BRICK.isAdjacentTo)) # TODO: Makes _prune_recursive fail, infinite recursion # Optional # heatGain # numberOfPeople sp.add_input("supplyAirFlowRate", node0, "airFlowRate") # sp.add_input("exhaustAirFlowRate", node1, "airFlowRate") # sp.add_input("numberOfPeople", node5, "measuredValue") sp.add_input("outdoorTemperature", node6, "measuredValue") # sp.add_input("outdoorCO2", node6, "outdoorCo2Concentration") sp.add_input("globalIrradiation", node6, "globalIrradiation") sp.add_input( "supplyAirTemperature", node7, ("outletAirTemperature", "primaryTemperatureOut", "outletAirTemperature"), ) # sp.add_input("adjacentZoneTemperature", node9, "indoorTemperature") sp.add_modeled_node(node2) return sp
[docs] def get_signature_pattern_sensor(): """ Get the signature pattern of the FMU component. Returns: SignaturePattern: The signature pattern of the FMU component. """ node0 = Node(cls=core.namespace.S4BLDG.Damper) # supply damper node1 = Node(cls=core.namespace.S4BLDG.Damper) # return damper node2 = Node(cls=core.namespace.S4BLDG.BuildingSpace) node4 = Node(cls=core.namespace.S4BLDG.SpaceHeater) node5 = Node(cls=core.namespace.S4BLDG.Schedule) # return valve node6 = Node(cls=core.namespace.S4BLDG.OutdoorEnvironment) node7 = Node(cls=core.namespace.SAREF.Sensor) node8 = Node(cls=core.namespace.SAREF.Temperature) node9 = Node(cls=core.namespace.S4BLDG.BuildingSpace) sp = SignaturePattern( semantic_model_=core.ontologies, id="building_space_signature_pattern", ) sp.add_triple( Exact(subject=node0, object=node2, predicate=core.namespace.FSO.suppliesFluidTo) ) sp.add_triple( Exact( subject=node1, object=node2, predicate=core.namespace.FSO.hasFluidReturnedBy ) ) sp.add_triple( Exact( subject=node4, object=node2, predicate=core.namespace.S4BLDG.isContainedIn ) ) sp.add_triple( Exact(subject=node2, object=node5, predicate=core.namespace.SAREF.hasProfile) ) sp.add_triple( Exact(subject=node2, object=node6, predicate=core.namespace.S4SYST.connectedTo) ) sp.add_triple( SinglePath( subject=node0, object=node7, predicate=core.namespace.FSO.hasFluidSuppliedBy ) ) sp.add_triple( Exact(subject=node7, object=node8, predicate=core.namespace.SAREF.observes) ) # sp.add_triple(MultiPath(subject=node9, object=node2, predicate=core.namespace.S4SYST.connectedTo)) # TODO: Makes _prune_recursive fail, infinite recursion sp.add_input("supplyAirFlowRate", node0, "airFlowRate") sp.add_input("exhaustAirFlowRate", node1, "airFlowRate") sp.add_input("heatGain", node4, "Power") sp.add_input("numberOfPeople", node5, "scheduleValue") sp.add_input("outdoorTemperature", node6, "outdoorTemperature") sp.add_input("outdoorCO2", node6, "outdoorCo2Concentration") sp.add_input("globalIrradiation", node6, "globalIrradiation") sp.add_input("supplyAirTemperature", node7, "measuredValue") sp.add_input("adjacentZoneTemperature", node9, "indoorTemperature") sp.add_modeled_node(node2) return sp
[docs] def get_signature_pattern_sensor_brick(): """ Get the BRICK-only signature pattern with sensor for the building space component. Returns: SignaturePattern: The BRICK-only signature pattern with sensor of the building space component. """ node0 = Node(cls=core.namespace.BRICK.Damper) # supply damper node1 = Node(cls=core.namespace.BRICK.Damper) # return damper node2 = Node(cls=core.namespace.BRICK.Room) # building space/room node4 = Node(cls=core.namespace.BRICK.Space_Heater) # space heater node5 = Node(cls=core.namespace.BRICK.Schedule) # occupancy schedule node6 = Node(cls=core.namespace.BRICK.Outside_Air) # outdoor environment node7 = Node( cls=core.namespace.BRICK.Supply_Air_Temperature_Sensor ) # temperature sensor node8 = Node(cls=core.namespace.BRICK.Temperature) # temperature point node9 = Node(cls=core.namespace.BRICK.Room) # adjacent room sp = SignaturePattern( semantic_model_=core.ontologies, id="building_space_signature_pattern_brick", ) sp.add_triple( Exact(subject=node0, object=node2, predicate=core.namespace.BRICK.feeds) ) sp.add_triple( Exact(subject=node1, object=node2, predicate=core.namespace.BRICK.isFedBy) ) sp.add_triple( Exact(subject=node4, object=node2, predicate=core.namespace.BRICK.isPartOf) ) sp.add_triple( Exact(subject=node2, object=node5, predicate=core.namespace.BRICK.hasPoint) ) sp.add_triple( Exact(subject=node2, object=node6, predicate=core.namespace.BRICK.isFedBy) ) sp.add_triple( SinglePath(subject=node0, object=node7, predicate=core.namespace.BRICK.isFedBy) ) sp.add_triple( Exact(subject=node7, object=node8, predicate=core.namespace.BRICK.hasPoint) ) # sp.add_triple(MultiPath(subject=node9, object=node2, predicate=core.namespace.BRICK.isAdjacentTo)) # TODO: Makes _prune_recursive fail, infinite recursion sp.add_input("supplyAirFlowRate", node0, "airFlowRate") sp.add_input("exhaustAirFlowRate", node1, "airFlowRate") sp.add_input("heatGain", node4, "Power") sp.add_input("numberOfPeople", node5, "scheduleValue") sp.add_input("outdoorTemperature", node6, "outdoorTemperature") sp.add_input("outdoorCO2", node6, "outdoorCo2Concentration") sp.add_input("globalIrradiation", node6, "globalIrradiation") sp.add_input("supplyAirTemperature", node7, "measuredValue") sp.add_input("adjacentZoneTemperature", node9, "indoorTemperature") sp.add_modeled_node(node2) return sp
[docs] class BuildingSpaceTorchSystem(core.System, nn.Module): r""" Combined building space model for both thermal (RC) and CO2 (mass balance) dynamics. This class composes BuildingSpaceThermalTorchSystem and BuildingSpaceMassTorchSystem to provide a unified building space model that captures both thermal and air quality dynamics in a building zone. Args: thermal_kwargs: Keyword arguments for BuildingSpaceThermalTorchSystem mass_kwargs: Keyword arguments for BuildingSpaceMassTorchSystem kwargs: Additional keyword arguments (must include 'id') Mathematical Formulation: ========================= See individual component documentation: - BuildingSpaceThermalTorchSystem: RC network thermal dynamics - BuildingSpaceMassTorchSystem: CO2 mass balance dynamics Both models use DiscreteStatespaceSystem for efficient computation and automatic differentiation support. System Composition: The combined model consists of two parallel subsystems: **Thermal Subsystem (BuildingSpaceThermalTorchSystem):** - Models temperature dynamics using RC network - Handles heat transfer between indoor air, walls, and adjacent zones - Includes HVAC thermal effects, solar gains, and occupant heat gains **Mass Balance Subsystem (BuildingSpaceMassTorchSystem):** - Models CO2 concentration dynamics using mass balance equations - Handles ventilation, infiltration, and occupant CO2 generation - Tracks indoor air quality changes Implementation Details: - Both subsystems run in parallel during each simulation step - Input signals are shared between both models where applicable - Each subsystem maintains its own state variables and outputs - The combined model provides unified input/output interfaces - All parameters from both subsystems are available for calibration Combined Input/Output Interface: **Shared Inputs:** - supplyAirFlowRate: Used by both thermal (heating/cooling) and mass (ventilation) - exhaustAirFlowRate: Used by both thermal (heat removal) and mass (CO2 removal) - numberOfPeople: Used by both thermal (heat gain) and mass (CO2 generation) - outdoorTemperature: Used by thermal model - outdoorCO2: Used by mass balance model **Thermal-Only Inputs:** - supplyAirTemperature, globalIrradiation, heatGain - boundaryTemperature, adjacentZoneTemperature **Combined Outputs:** - indoorTemperature: From thermal subsystem - wallTemperature: From thermal subsystem - indoorCO2: From mass balance subsystem """ sp = [ get_signature_pattern(), get_signature_pattern_brick(), get_signature_pattern_sensor(), get_signature_pattern_sensor_brick(), ] def __init__(self, thermal_kwargs: dict = None, mass_kwargs: dict = None, **kwargs): """Initialize the combined building space system.""" if thermal_kwargs is None: thermal_kwargs = {} if mass_kwargs is None: mass_kwargs = {} super().__init__(**kwargs) nn.Module.__init__(self) if "id" not in thermal_kwargs: assert "id" in kwargs, "id is required for thermal model" thermal_kwargs["id"] = kwargs["id"] + "_thermal" if "id" not in mass_kwargs: assert "id" in kwargs, "id is required for mass model" mass_kwargs["id"] = kwargs["id"] + "_mass" assert "id" in kwargs, "id is required for thermal model" self.thermal = BuildingSpaceThermalTorchSystem(**thermal_kwargs) self.mass = BuildingSpaceMassTorchSystem(**mass_kwargs) # Merge input and output dictionaries as private variables self._input = {**self.thermal.input, **self.mass.input} self._output = {**self.thermal.output, **self.mass.output} thermal_parameters = [ "thermal." + s for s in self.thermal._config["parameters"] ] mass_parameters = ["mass." + s for s in self.mass._config["parameters"]] self._config = {"parameters": thermal_parameters + mass_parameters} self.INITIALIZED = False @property def input(self) -> dict: """ Get the input ports of the building space system. Returns: dict: Dictionary containing combined input ports from thermal and mass models """ return self._input @property def output(self) -> dict: """ Get the output ports of the building space system. Returns: dict: Dictionary containing combined output ports from thermal and mass models """ return self._output
[docs] def initialize( self, start_time: datetime.datetime, end_time: datetime.datetime, step_size: int, simulator: core.Simulator, ) -> None: """Initialize the system and its submodels.""" # Initialize I/O for the combined system for input in self.input.values(): input.initialize( start_time=start_time, end_time=end_time, step_size=step_size, simulator=simulator, ) for output in self.output.values(): output.initialize( start_time=start_time, end_time=end_time, step_size=step_size, simulator=simulator, ) # Find if boundary temperature is set as input connection_point = [ cp for cp in self.connects_at if cp.inputPort == "boundaryTemperature" ] n_boundary_temperature = ( len(connection_point[0].connects_system_through) if connection_point else 0 ) n_boundary_temperature = n_boundary_temperature assert ( n_boundary_temperature == 0 or n_boundary_temperature == 1 ), "Maximum one boundary temperature input is allowed" # Find number of adjacent zones connection_point = [ cp for cp in self.connects_at if cp.inputPort == "adjacentZoneTemperature" ] n_adjacent_zones = ( len(connection_point[0].connects_system_through) if connection_point else 0 ) n_adjacent_zones = n_adjacent_zones self.thermal.n_adjacent_zones = n_adjacent_zones self.thermal.n_boundary_temperature = n_boundary_temperature self.thermal.initialize(start_time, end_time, step_size, simulator) self.mass.initialize(start_time, end_time, step_size, simulator) self.INITIALIZED = True
@property def config(self): """Get the system configuration.""" return self._config
[docs] def do_step( self, secondTime: float, dateTime: datetime.datetime, step_size: int, stepIndex: int, ) -> None: """Execute a single simulation step for both submodels.""" # Set inputs for thermal submodel for k in self.thermal.input: self.thermal.input[k].set(self.input[k].get(), stepIndex) # Set inputs for mass submodel for k in self.mass.input: self.mass.input[k].set(self.input[k].get(), stepIndex) self.thermal.do_step(secondTime, dateTime, step_size, stepIndex) self.mass.do_step(secondTime, dateTime, step_size, stepIndex) # Update outputs from both submodels for k in self.thermal.output: self.output[k].set(self.thermal.output[k].get(), stepIndex) for k in self.mass.output: self.output[k].set(self.mass.output[k].get(), stepIndex)