Source code for twin4build.systems.schedule.occupancy.occupancy_system

# Standard library imports
import datetime
from typing import Optional

# Local application imports
import twin4build.core as core
import twin4build.systems as systems
import twin4build.utils.types as tps
from twin4build.translator.translator import Exact, Node, SignaturePattern, SinglePath


[docs] def get_signature_pattern(): node0 = Node(cls=(core.namespace.S4BLDG.Schedule)) node1 = Node(cls=(core.namespace.S4BLDG.BuildingSpace)) node2 = Node(cls=(core.namespace.S4BLDG.Damper)) node3 = Node(cls=(core.namespace.S4BLDG.Damper)) node4 = Node(cls=(core.namespace.SAREF.Co2)) node5 = Node(cls=(core.namespace.SAREF.Sensor)) node6 = Node(cls=(core.namespace.SAREF.Sensor)) node7 = Node(cls=(core.namespace.SAREF.OpeningPosition)) sp = SignaturePattern( semantic_model_=core.ontologies, id="schedule_signature_pattern", ) sp.add_triple( Exact(subject=node1, object=node0, predicate=core.namespace.SAREF.hasProfile) ) sp.add_triple( Exact(subject=node1, object=node4, predicate=core.namespace.SAREF.hasProperty) ) sp.add_triple( Exact(subject=node5, object=node4, predicate=core.namespace.SAREF.observes) ) sp.add_triple( Exact(subject=node2, object=node1, predicate=core.namespace.FSO.suppliesFluidTo) ) sp.add_triple( Exact( subject=node3, object=node1, predicate=core.namespace.FSO.hasFluidReturnedBy ) ) sp.add_triple( Exact(subject=node6, object=node7, predicate=core.namespace.SAREF.observes) ) sp.add_triple( Exact(subject=node2, object=node7, predicate=core.namespace.SAREF.hasProperty) ) sp.add_modeled_node(node0) return sp
[docs] class OccupancySystem(core.System): # sp = [get_signature_pattern()] def __init__(self, **kwargs): super().__init__(**kwargs) self.input = { "supplyAirFlowRate": tps.Scalar(), "exhaustAirFlowRate": tps.Scalar(), "indoorCO2Concentration": tps.Scalar(), } self.output = {"scheduleValue": tps.Scalar()} self.optional_inputs = [ "supplyAirFlowRate", "exhaustAirFlowRate", "indoorCO2Concentration", ] self._config = {"parameters": []} @property def config(self): return self._config
[docs] def initialize( self, start_time: datetime.datetime, end_time: datetime.datetime, step_size: int, simulator: core.Simulator, ) -> None: model = simulator.model (modeled_match_nodes, (component_cls, sp, groups)) = ( model.instance_to_group_map[self] ) space_node = sp.get_node_by_id("<BuildingSpace<SUB>2</SUB>>") # TODO modeled_space = groups[0][space_node] modeled_space = model.sem2sim_map[modeled_space] self.airVolume = modeled_space.airVolume self.outdoorCo2Concentration = modeled_space.C_supply self.infiltration = modeled_space.infiltration self.generationCo2Concentration = modeled_space.CO2_occ_gain self.previous_indoorCO2Concentration = modeled_space.CO2_start self.airMass = self.airVolume * 1.225 supply_damper_node = sp.get_node_by_id("<Damper<SUB>3</SUB>>") # TODO exhaust_damper_node = sp.get_node_by_id("<Damper<SUB>4</SUB>>") # TODO modeled_supply_damper = groups[0][supply_damper_node] modeled_exhaust_damper = groups[0][exhaust_damper_node] modeled_supply_damper = model.sem2sim_map[modeled_supply_damper] modeled_exhaust_damper = model.sem2sim_map[modeled_exhaust_damper] self.do_step_instance_supplyDamper = systems.DamperSystem( **model.get_object_properties(modeled_supply_damper) ) self.do_step_instance_supplyDamper.initialize(start_time, end_time, step_size) self.do_step_instance_exhaustDamper = systems.DamperSystem( **model.get_object_properties(modeled_exhaust_damper) ) self.do_step_instance_exhaustDamper.initialize(start_time, end_time, step_size) damper_position_sensor_node = sp.get_node_by_id("<Sensor<SUB>7</SUB>>") # TODO modeled_damper_position_sensor = groups[0][damper_position_sensor_node] modeled_damper_position_sensor = model.sem2sim_map[ modeled_damper_position_sensor ] filename_damper_position = modeled_damper_position_sensor.filename datecolumn_damper_position = self.datecolumn = ( modeled_damper_position_sensor.datecolumn ) valuecolumn_damper_position = self.valuecolumn = ( modeled_damper_position_sensor.valuecolumn ) co2_sensor_node = sp.get_node_by_id("<Sensor<SUB>6</SUB>>") # TODO modeled_co2_sensor = groups[0][co2_sensor_node] modeled_co2_sensor = model.sem2sim_map[modeled_co2_sensor] filename_co2 = modeled_co2_sensor.filename datecolumn_co2 = self.datecolumn = modeled_co2_sensor.datecolumn valuecolumn_co2 = self.valuecolumn = modeled_co2_sensor.valuecolumn self.do_step_instance_supplyDamperPosition = systems.TimeSeriesInputSystem( id="supplyDamperPosition", filename=filename_damper_position, datecolumn=datecolumn_damper_position, valuecolumn=valuecolumn_damper_position, ) self.do_step_instance_supplyDamperPosition.output = {"value": tps.Scalar()} self.do_step_instance_supplyDamperPosition.initialize( start_time, end_time, step_size, simulator ) self.do_step_instance_exhaustDamperPosition = systems.TimeSeriesInputSystem( id="exhaustDamperPosition", filename=filename_damper_position, datecolumn=datecolumn_damper_position, valuecolumn=valuecolumn_damper_position, ) self.do_step_instance_exhaustDamperPosition.output = {"value": tps.Scalar()} self.do_step_instance_exhaustDamperPosition.initialize( start_time, end_time, step_size, simulator ) self.do_step_instance_indoorCO2Concentration = systems.TimeSeriesInputSystem( id="indoorCO2Concentration", filename=filename_co2, datecolumn=datecolumn_co2, valuecolumn=valuecolumn_co2, ) self.do_step_instance_indoorCO2Concentration.output = {"value": tps.Scalar()} self.do_step_instance_indoorCO2Concentration.initialize( start_time, end_time, step_size, simulator )
[docs] def do_step( self, secondTime: float, dateTime: datetime.datetime, step_size: int, stepIndex: int, ) -> None: self.do_step_instance_supplyDamperPosition.do_step( secondTime=secondTime, dateTime=dateTime, step_size=step_size, stepIndex=stepIndex, ) self.do_step_instance_exhaustDamperPosition.do_step( secondTime=secondTime, dateTime=dateTime, step_size=step_size, stepIndex=stepIndex, ) self.do_step_instance_indoorCO2Concentration.do_step( secondTime=secondTime, dateTime=dateTime, step_size=step_size, stepIndex=stepIndex, ) self.do_step_instance_supplyDamper.input["damperPosition"].set( self.do_step_instance_supplyDamperPosition.output["value"], stepIndex ) self.do_step_instance_exhaustDamper.input["damperPosition"].set( self.do_step_instance_exhaustDamperPosition.output["value"], stepIndex ) self.do_step_instance_supplyDamper.do_step( secondTime=secondTime, dateTime=dateTime, step_size=step_size, stepIndex=stepIndex, ) self.do_step_instance_exhaustDamper.do_step( secondTime=secondTime, dateTime=dateTime, step_size=step_size, stepIndex=stepIndex, ) self.input["supplyAirFlowRate"].set( self.do_step_instance_supplyDamper.output["value"], stepIndex ) self.input["exhaustAirFlowRate"].set( self.do_step_instance_exhaustDamper.output["value"], stepIndex ) self.input["indoorCO2Concentration"].set( self.do_step_instance_indoorCO2Concentration.output["value"], stepIndex ) # Steady state. # self.output["scheduleValue"] = (-self.outdoorCo2Concentration*(self.input["supplyAirFlowRate"] + self.infiltration) + self.input["indoorCO2Concentration"]*(self.input["exhaustAirFlowRate"]+self.infiltration))/(self.generationCo2Concentration*1e+6) # diff equation self.output["scheduleValue"].set( ( self.airMass * ( self.input["indoorCO2Concentration"] - self.previous_indoorCO2Concentration ) / step_size - self.outdoorCo2Concentration * (self.input["supplyAirFlowRate"] + self.infiltration) + self.input["indoorCO2Concentration"] * (self.input["exhaustAirFlowRate"] + self.infiltration) ) / (self.generationCo2Concentration * 1e6), stepIndex, ) if self.output["scheduleValue"] < 0: self.output["scheduleValue"].set(0, stepIndex) self.previous_indoorCO2Concentration = self.input["indoorCO2Concentration"]