Source code for twin4build.systems.sensor.sensor_system

# Standard library imports
import datetime
from typing import Any, Dict, List, Optional, Union

# Third party imports
import pandas as pd

# Local application imports
import twin4build.core as core
import twin4build.utils.types as tps
from twin4build.systems.utils.pass_input_to_output import PassInputToOutput
from twin4build.systems.utils.time_series_input_system import TimeSeriesInputSystem
from twin4build.translator.translator import Exact, Node, SignaturePattern, SinglePath


[docs] def get_signature_pattern_input(): node0 = Node(cls=(core.namespace.SAREF.Sensor,)) sp = SignaturePattern( semantic_model_=core.ontologies, id="signature_pattern_input", ) sp.add_modeled_node(node0) return sp
[docs] def get_flow_signature_pattern_after_coil_air_side(): node0 = Node(cls=(core.namespace.SAREF.Sensor,)) node1 = Node(cls=(core.namespace.SAREF.Temperature,)) node2 = Node(cls=(core.namespace.S4BLDG.Coil)) # waterside node3 = Node(cls=(core.namespace.S4BLDG.Coil)) # airside node4 = Node(cls=(core.namespace.S4BLDG.Coil)) # supersystem node5 = Node(cls=core.namespace.S4SYST.System) # before waterside node6 = Node(cls=core.namespace.S4SYST.System) # after waterside node7 = Node(cls=core.namespace.S4SYST.System) # before airside node8 = Node(cls=core.namespace.S4SYST.System) # after airside sp = SignaturePattern( semantic_model_=core.ontologies, id="flow_signature_pattern_after_coil_air_side" ) sp.add_triple( Exact(subject=node0, object=node1, predicate=core.namespace.SAREF.observes) ) sp.add_triple( Exact(subject=node5, object=node2, predicate=core.namespace.FSO.suppliesFluidTo) ) sp.add_triple( Exact(subject=node2, object=node6, predicate=core.namespace.FSO.returnsFluidTo) ) sp.add_triple( Exact(subject=node7, object=node3, predicate=core.namespace.FSO.suppliesFluidTo) ) sp.add_triple( Exact(subject=node3, object=node8, predicate=core.namespace.FSO.suppliesFluidTo) ) sp.add_triple( Exact(subject=node2, object=node4, predicate=core.namespace.S4SYST.subSystemOf) ) sp.add_triple( Exact(subject=node3, object=node4, predicate=core.namespace.S4SYST.subSystemOf) ) sp.add_triple( SinglePath( subject=node3, object=node0, predicate=core.namespace.FSO.suppliesFluidTo ) ) sp.add_input("measuredValue", node4, ("outletAirTemperature")) sp.add_modeled_node(node0) return sp
[docs] def get_flow_signature_pattern_after_coil_air_side_simple(): node0 = Node(cls=(core.namespace.SAREF.Sensor,)) node1 = Node(cls=(core.namespace.SAREF.Temperature,)) node3 = Node(cls=(core.namespace.S4BLDG.Coil)) # airside node4 = Node(cls=(core.namespace.S4BLDG.Coil)) # supersystem sp = SignaturePattern( semantic_model_=core.ontologies, id="flow_signature_pattern_after_coil_air_side_simple", ) sp.add_triple( Exact(subject=node0, object=node1, predicate=core.namespace.SAREF.observes) ) sp.add_triple( Exact(subject=node3, object=node4, predicate=core.namespace.S4SYST.subSystemOf) ) sp.add_triple( SinglePath( subject=node3, object=node0, predicate=core.namespace.FSO.suppliesFluidTo ) ) sp.add_input("measuredValue", node4, ("outletAirTemperature")) sp.add_modeled_node(node0) return sp
[docs] def get_flow_signature_pattern_after_coil_water_side(): node0 = Node(cls=(core.namespace.SAREF.Sensor,)) node1 = Node(cls=(core.namespace.SAREF.Temperature,)) node2 = Node(cls=(core.namespace.S4BLDG.Coil)) # waterside node3 = Node(cls=(core.namespace.S4BLDG.Coil)) # airside node4 = Node(cls=(core.namespace.S4BLDG.Coil)) # supersystem node5 = Node(cls=core.namespace.S4SYST.System) # before waterside node6 = Node(cls=core.namespace.S4SYST.System) # after waterside node7 = Node(cls=core.namespace.S4SYST.System) # before airside node8 = Node(cls=core.namespace.S4SYST.System) # after airside sp = SignaturePattern( semantic_model_=core.ontologies, id="flow_signature_pattern_after_coil_water_side", ) sp.add_triple( Exact(subject=node0, object=node1, predicate=core.namespace.SAREF.observes) ) sp.add_triple( Exact(subject=node5, object=node2, predicate=core.namespace.FSO.suppliesFluidTo) ) sp.add_triple( Exact(subject=node2, object=node6, predicate=core.namespace.FSO.returnsFluidTo) ) sp.add_triple( Exact(subject=node7, object=node3, predicate=core.namespace.FSO.suppliesFluidTo) ) sp.add_triple( Exact(subject=node3, object=node8, predicate=core.namespace.FSO.suppliesFluidTo) ) sp.add_triple( Exact(subject=node2, object=node4, predicate=core.namespace.S4SYST.subSystemOf) ) sp.add_triple( Exact(subject=node3, object=node4, predicate=core.namespace.S4SYST.subSystemOf) ) sp.add_triple( SinglePath( subject=node2, object=node0, predicate=core.namespace.FSO.returnsFluidTo ) ) sp.add_input("measuredValue", node4, ("outletWaterTemperature")) sp.add_modeled_node(node0) return sp
[docs] def get_flow_signature_pattern_before_coil_water_side(): node0 = Node(cls=(core.namespace.SAREF.Sensor,)) node1 = Node(cls=(core.namespace.SAREF.Temperature,)) node2 = Node(cls=(core.namespace.S4BLDG.Coil)) # waterside node3 = Node(cls=(core.namespace.S4BLDG.Coil)) # airside node4 = Node(cls=(core.namespace.S4BLDG.Coil)) # supersystem node6 = Node(cls=core.namespace.S4SYST.System) # after waterside node7 = Node(cls=core.namespace.S4SYST.System) # before airside node8 = Node(cls=core.namespace.S4SYST.System) # after airside sp = SignaturePattern( semantic_model_=core.ontologies, id="flow_signature_pattern_before_coil_water_side", ) sp.add_triple( Exact(subject=node0, object=node1, predicate=core.namespace.SAREF.observes) ) # sp.add_triple(Exact(subject=node5, object=node2, predicate="suppliesFluidTo")) sp.add_triple( Exact(subject=node2, object=node6, predicate=core.namespace.FSO.returnsFluidTo) ) sp.add_triple( Exact(subject=node7, object=node3, predicate=core.namespace.FSO.suppliesFluidTo) ) sp.add_triple( Exact(subject=node3, object=node8, predicate=core.namespace.FSO.suppliesFluidTo) ) sp.add_triple( Exact(subject=node2, object=node4, predicate=core.namespace.S4SYST.subSystemOf) ) sp.add_triple( Exact(subject=node3, object=node4, predicate=core.namespace.S4SYST.subSystemOf) ) sp.add_triple( SinglePath( subject=node2, object=node0, predicate=core.namespace.FSO.hasFluidSuppliedBy ) ) sp.add_input("measuredValue", node4, ("inletWaterTemperature")) sp.add_modeled_node(node0) return sp
# Properties of spaces
[docs] def get_space_temperature_signature_pattern(): node0 = Node(cls=(core.namespace.SAREF.Sensor)) node1 = Node(cls=(core.namespace.SAREF.Temperature)) node2 = Node(cls=(core.namespace.S4BLDG.BuildingSpace)) sp = SignaturePattern( semantic_model_=core.ontologies, id="space_temperature_signature_pattern" ) sp.add_triple( Exact(subject=node0, object=node1, predicate=core.namespace.SAREF.observes) ) sp.add_triple( Exact(subject=node1, object=node2, predicate=core.namespace.SAREF.isPropertyOf) ) sp.add_input("measuredValue", node2, ("indoorTemperature")) sp.add_modeled_node(node0) return sp
# Properties of spaces
[docs] def get_space_co2_signature_pattern(): node0 = Node(cls=(core.namespace.SAREF.Sensor,)) node1 = Node(cls=(core.namespace.SAREF.Co2,)) node2 = Node(cls=(core.namespace.S4BLDG.BuildingSpace,)) sp = SignaturePattern( semantic_model_=core.ontologies, id="space_co2_signature_pattern" ) sp.add_triple( Exact(subject=node0, object=node1, predicate=core.namespace.SAREF.observes) ) sp.add_triple( Exact(subject=node1, object=node2, predicate=core.namespace.SAREF.isPropertyOf) ) sp.add_input("measuredValue", node2, ("indoorCO2")) sp.add_modeled_node(node0) return sp
[docs] def get_position_signature_pattern(): node0 = Node(cls=(core.namespace.SAREF.Sensor,)) node1 = Node(cls=(core.namespace.SAREF.OpeningPosition,)) node2 = Node( cls=( core.namespace.S4BLDG.Valve, core.namespace.S4BLDG.Damper, ) ) node3 = Node(cls=(core.namespace.S4BLDG.Controller)) sp = SignaturePattern( semantic_model_=core.ontologies, id="position_signature_pattern" ) sp.add_triple( Exact(subject=node0, object=node1, predicate=core.namespace.SAREF.observes) ) sp.add_triple( Exact(subject=node1, object=node2, predicate=core.namespace.SAREF.isPropertyOf) ) sp.add_triple( Exact(subject=node3, object=node1, predicate=core.namespace.SAREF.controls) ) sp.add_input("measuredValue", node3, ("inputSignal", "inputSignal")) sp.add_modeled_node(node0) return sp
[docs] def get_temperature_before_air_to_air_supply_side(): node0 = Node(cls=(core.namespace.SAREF.Sensor,)) node1 = Node(cls=(core.namespace.SAREF.Temperature,)) node2 = Node(cls=(core.namespace.S4BLDG.AirToAirHeatRecovery,)) # AirToAirPrimary node9 = Node(cls=(core.namespace.S4BLDG.AirToAirHeatRecovery)) # AirToAirSuper sp = SignaturePattern( semantic_model_=core.ontologies, id="temperature_before_air_to_air_supply_side" ) sp.add_triple( Exact(subject=node0, object=node1, predicate=core.namespace.SAREF.observes) ) sp.add_triple( SinglePath( subject=node2, object=node0, predicate=core.namespace.FSO.hasFluidSuppliedBy ) ) sp.add_triple( Exact(subject=node2, object=node9, predicate=core.namespace.S4SYST.subSystemOf) ) sp.add_input("measuredValue", node2, ("primaryTemperatureIn")) sp.add_modeled_node(node0) return sp
[docs] def get_temperature_before_air_to_air_exhaust_side(): node0 = Node(cls=(core.namespace.SAREF.Sensor,)) node1 = Node(cls=(core.namespace.SAREF.Temperature,)) node2 = Node(cls=(core.namespace.S4BLDG.AirToAirHeatRecovery)) # AirToAirPrimary node9 = Node(cls=(core.namespace.S4BLDG.AirToAirHeatRecovery)) # AirToAirSuper sp = SignaturePattern( semantic_model_=core.ontologies, id="temperature_before_air_to_air_exhaust_side" ) sp.add_triple( Exact(subject=node0, object=node1, predicate=core.namespace.SAREF.observes) ) sp.add_triple( SinglePath( subject=node0, object=node2, predicate=core.namespace.FSO.returnsFluidTo ) ) sp.add_triple( Exact(subject=node2, object=node9, predicate=core.namespace.S4SYST.subSystemOf) ) sp.add_input("measuredValue", node2, ("secondaryTemperatureIn")) sp.add_modeled_node(node0) return sp
[docs] def get_temperature_after_air_to_air_supply_side(): node0 = Node(cls=(core.namespace.SAREF.Sensor,)) node1 = Node(cls=(core.namespace.SAREF.Temperature,)) node2 = Node(cls=(core.namespace.S4BLDG.AirToAirHeatRecovery)) # AirToAirPrimary node9 = Node(cls=(core.namespace.S4BLDG.AirToAirHeatRecovery)) # AirToAirSuper sp = SignaturePattern( semantic_model_=core.ontologies, id="temperature_after_air_to_air_supply_side" ) sp.add_triple( Exact(subject=node0, object=node1, predicate=core.namespace.SAREF.observes) ) sp.add_triple( Exact( subject=node0, object=node2, predicate=core.namespace.FSO.hasFluidSuppliedBy ) ) sp.add_triple( Exact(subject=node2, object=node9, predicate=core.namespace.S4SYST.subSystemOf) ) sp.add_input("measuredValue", node2, ("primaryTemperatureOut")) sp.add_modeled_node(node0) return sp
[docs] def get_temperature_after_air_to_air_exhaust_side(): node0 = Node(cls=(core.namespace.SAREF.Sensor,)) node1 = Node(cls=(core.namespace.SAREF.Temperature,)) node2 = Node(cls=(core.namespace.S4BLDG.AirToAirHeatRecovery)) # AirToAirPrimary node9 = Node(cls=(core.namespace.S4BLDG.AirToAirHeatRecovery)) # AirToAirSuper sp = SignaturePattern( semantic_model_=core.ontologies, id="temperature_after_air_to_air_exhaust_side" ) sp.add_triple( Exact(subject=node0, object=node1, predicate=core.namespace.SAREF.observes) ) sp.add_triple( Exact(subject=node2, object=node0, predicate=core.namespace.FSO.returnsFluidTo) ) sp.add_triple( Exact(subject=node2, object=node9, predicate=core.namespace.S4SYST.subSystemOf) ) sp.add_input("measuredValue", node2, ("secondaryTemperatureOut")) sp.add_modeled_node(node0) return sp
[docs] class SensorSystem(core.System): """A system representing a physical or virtual sensor in the building. This class implements sensor functionality, supporting both physical sensors (reading from time series data) and virtual sensors (computing values from other inputs). It integrates with TimeSeriesInputSystem for data handling. Args: filename: Path to sensor readings file. Defaults to None. df: DataFrame containing readings. Defaults to None. useSpreadsheet: Whether to use a spreadsheet for input. Defaults to False. useDatabase: Whether to use a database for input. Defaults to False. **kwargs: Additional keyword arguments passed to parent class. Note: A sensor must either have connections to other systems (virtual sensor) or have data input through filename/df (physical sensor). """ sp = [ get_temperature_before_air_to_air_supply_side(), get_temperature_before_air_to_air_exhaust_side(), get_temperature_after_air_to_air_supply_side(), get_temperature_after_air_to_air_exhaust_side(), get_signature_pattern_input(), get_flow_signature_pattern_after_coil_air_side(), get_flow_signature_pattern_after_coil_water_side(), get_flow_signature_pattern_before_coil_water_side(), get_space_temperature_signature_pattern(), get_space_co2_signature_pattern(), get_position_signature_pattern(), ] def __init__( self, filename: Optional[str] = None, df: Optional[pd.DataFrame] = None, uuid: Optional[str] = None, name: Optional[str] = None, dbconfig: Optional[Dict[str, Any]] = None, useSpreadsheet: bool = False, useDatabase: bool = False, **kwargs, ) -> None: """Initialize the sensor system. Args: filename: Path to sensor readings file. Defaults to None. df: DataFrame containing readings. Defaults to None. useSpreadsheet: Whether to use a spreadsheet for input. Defaults to False. useDatabase: Whether to use a database for input. Defaults to False. **kwargs: Additional keyword arguments passed to parent class. Note: Either filename/df must be provided for physical sensors, or the sensor must have connections defined for virtual sensors. """ assert ( useSpreadsheet == False or useDatabase == False ), "useSpreadsheet and useDatabase cannot both be True." super().__init__(**kwargs) # Define inputs and outputs as private variables self._input = {"measuredValue": tps.Scalar()} self._output = { "measuredValue": tps.Scalar(0) } # TODO: Not necessary to be a leaf scalar, if the sensor has inputs. Need to implement check in initialize() # Store attributes as private variables self._useSpreadsheet = useSpreadsheet self._useDatabase = useDatabase self._filename = filename self._df = df self._datecolumn = 0 self._valuecolumn = 1 self._uuid = uuid self._name = name self._dbconfig = dbconfig self._is_leaf = None self._physicalSystem = None self._config = { "parameters": ["useSpreadsheet", "useDatabase"], "spreadsheet": ["filename", "datecolumn", "valuecolumn"], "database": ["uuid", "name", "dbconfig"], } @property def config(self): return self._config @property def input(self) -> dict: """ Get the input ports of the sensor system. Returns: dict: Dictionary containing input ports: - "measuredValue": Measured value input for virtual sensors """ return self._input @property def output(self) -> dict: """ Get the output ports of the sensor system. Returns: dict: Dictionary containing output ports: - "measuredValue": Measured value output [units depend on sensor type] """ return self._output @property def filename(self) -> Optional[str]: """ Get the path to sensor readings file. """ return self._filename @filename.setter def filename(self, value: Optional[str]) -> None: """ Set the path to sensor readings file. """ self._filename = value @property def df(self) -> Optional[pd.DataFrame]: """ Get the direct DataFrame input of sensor readings. """ return self._df @df.setter def df(self, value: Optional[pd.DataFrame]) -> None: """ Set the direct DataFrame input of sensor readings. """ self._df = value @property def datecolumn(self) -> int: """ Get the column index for datetime values. """ return self._datecolumn @datecolumn.setter def datecolumn(self, value: int) -> None: """ Set the column index for datetime values. """ self._datecolumn = value @property def valuecolumn(self) -> int: """ Get the column index for sensor readings. """ return self._valuecolumn @valuecolumn.setter def valuecolumn(self, value: int) -> None: """ Set the column index for sensor readings. """ self._valuecolumn = value @property def is_leaf(self) -> bool: """ Get whether the sensor reads from file/DataFrame (True) or is virtual (False). """ return self._is_leaf @is_leaf.setter def is_leaf(self, value: bool) -> None: """ Set whether the sensor reads from file/DataFrame (True) or is virtual (False). """ self._is_leaf = value @property def physicalSystem(self) -> Optional[TimeSeriesInputSystem]: """ Get the data handling system for physical sensors. """ return self._physicalSystem @physicalSystem.setter def physicalSystem(self, value: Optional[TimeSeriesInputSystem]) -> None: """ Set the data handling system for physical sensors. """ self._physicalSystem = value @property def useSpreadsheet(self) -> bool: """ Get whether to use a spreadsheet for input. """ return self._useSpreadsheet @useSpreadsheet.setter def useSpreadsheet(self, value: bool) -> None: """ Set whether to use a spreadsheet for input. """ self._useSpreadsheet = value @property def useDatabase(self) -> bool: """ Get whether to use a database for input. """ return self._useDatabase @useDatabase.setter def useDatabase(self, value: bool) -> None: """ Set whether to use a database for input. """ self._useDatabase = value @property def uuid(self) -> Optional[str]: """ Get the UUID for database operations. """ return self._uuid @uuid.setter def uuid(self, value: Optional[str]) -> None: """ Set the UUID for database operations. """ self._uuid = value @property def name(self) -> Optional[str]: """ Get the name for database operations. """ return self._name @name.setter def name(self, value: Optional[str]) -> None: """ Set the name for database operations. """ self._name = value @property def dbconfig(self) -> Optional[Dict[str, Any]]: """ Get the database configuration parameters. """ return self._dbconfig @dbconfig.setter def dbconfig(self, value: Optional[Dict[str, Any]]) -> None: """ Set the database configuration parameters. """ self._dbconfig = value
[docs] def validate(self, p) -> tuple[bool, bool, bool, bool]: """Validate the sensor system configuration. Checks if the sensor has proper inputs for different operational modes. Args: p: Logging function for validation messages. Returns: tuple[bool, bool, bool, bool]: Validation status for: - Simulator - Estimator - Evaluator - Monitor """ validated_for_simulator = True validated_for_estimator = True validated_for_optimizer = True if len(self.connects_at) == 0 and self.filename is None: message = f"|CLASS: {self.__class__.__name__}|ID: {self.id}|: filename or df must be provided to enable use of Simulator, Estimator, and Optimizer." p(message, plain=True, status="WARNING") validated_for_simulator = False validated_for_estimator = False validated_for_optimizer = False elif len(self.connects_at) > 0 and self.filename is None: message = f"|CLASS: {self.__class__.__name__}|ID: {self.id}|: filename or df must be provided to enable use of Estimator." p(message, plain=True, status="WARNING") validated_for_estimator = False self.is_leaf = len(self.connects_at) == 0 self.output["measuredValue"].is_leaf = self.is_leaf return ( validated_for_simulator, validated_for_estimator, validated_for_optimizer, )
[docs] def validate_connections(self, p) -> bool: validated = True if self.is_leaf and self.useSpreadsheet == False and self.useDatabase == False: message = f"|CLASS: {self.__class__.__name__}|ID: {self.id}|: Missing connections for the following input(s) to enable use of Simulator, Estimator, and Optimizer:" p(message, plain=True, status="[WARNING]") p.add_level() p("measuredValue", plain=True) p.remove_level() validated = False return validated
[docs] def initialize( self, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, step_size: Optional[float] = None, simulator: Optional[Any] = None, ) -> None: """Initialize the sensor system. Sets up the physical or virtual sensor system and initializes the step instance. Args: start_time (Optional[datetime.datetime]): Start time for the simulation. end_time (Optional[datetime.datetime]): End time for the simulation. step_size (Optional[float]): Time step size in seconds. model (Optional[Any]): Model object (not used in this class). """ if ( self.filename is not None or self.df is not None or self.dbconfig is not None ): if self.df is None: assert ( self.useSpreadsheet == True or self.useDatabase == True ), "useSpreadsheet or useDatabase must be True if df is not provided." self.physicalSystem = TimeSeriesInputSystem( id=f"time series input - {self.id}", df=self.df, filename=self.filename, datecolumn=self.datecolumn, valuecolumn=self.valuecolumn, useSpreadsheet=self.useSpreadsheet, useDatabase=self.useDatabase, uuid=self.uuid, name=self.name, dbconfig=self.dbconfig, ) self.physicalSystem.initialize( start_time=start_time, end_time=end_time, step_size=step_size, simulator=simulator, ) else: self.physicalSystem = None assert ( len(self.connects_at) == 0 and self.physicalSystem is None ) == False, f'Sensor object "{self.id}" has no inputs and and holds no data.' if self.is_leaf: self.output["measuredValue"].initialize( start_time=start_time, end_time=end_time, step_size=step_size, simulator=simulator, values=self.physicalSystem.df.values, ) else: self.input["measuredValue"].initialize( start_time=start_time, end_time=end_time, step_size=step_size, simulator=simulator, ) self.output["measuredValue"].initialize( start_time=start_time, end_time=end_time, step_size=step_size, simulator=simulator, )
[docs] def do_step( self, secondTime: Optional[float] = None, dateTime: Optional[datetime.datetime] = None, step_size: Optional[float] = None, stepIndex: Optional[int] = None, ) -> None: """Execute one time step of the sensor system. Updates sensor outputs based on either physical readings or virtual calculations. Args: secondTime (Optional[float]): Current simulation time in seconds. dateTime (Optional[datetime.datetime]): Current simulation datetime. step_size (Optional[float]): Time step size in seconds. """ if self.is_leaf: self.output["measuredValue"].set(stepIndex=stepIndex) else: self.output["measuredValue"].set( self.input["measuredValue"].get(), stepIndex )
[docs] def get_physical_readings( self, start_time: datetime.datetime, end_time: datetime.datetime, step_size: int, simulator: core.Simulator, ) -> pd.DataFrame: """Retrieve physical sensor readings for a specified time period. Args: start_time (Optional[datetime.datetime]): Start time for readings. end_time (Optional[datetime.datetime]): End time for readings. step_size (Optional[float]): Time step size in seconds. Returns: pd.DataFrame: DataFrame containing sensor readings. Raises: AssertionError: If called on a virtual sensor (no physical readings available). """ assert ( self.physicalSystem is not None ), f'Cannot return physical readings for Sensor with id "{self.id}" as the argument "filename" was not provided when the object was initialized.' self.physicalSystem.initialize(start_time, end_time, step_size, simulator) return self.physicalSystem.df