Source code for twin4build.utils.data_loaders.load

# Standard library imports
import configparser
import os

# Third party imports
import numpy as np
import pandas as pd
from dateutil.parser import parse
from dateutil.tz import gettz

# Local application imports
from twin4build.utils.mkdir_in_root import mkdir_in_root


[docs] def parseDateStr(s): if s != "": try: return np.datetime64(parse(s)) except ValueError: return np.datetime64("NaT") else: return np.datetime64("NaT")
[docs] def sample_from_df( df, datecolumn=0, valuecolumn=None, step_size=None, start_time=None, end_time=None, resample=True, resample_method="linear", clip=True, tz="Europe/Copenhagen", preserve_order=True, ): r""" Sample and process time series data from a DataFrame with various resampling options. This function processes time series data with support for resampling, timezone conversion, and data clipping. It handles both constant and linear resampling methods. Mathematical Formulation ----------------------- 1. Time Series Resampling: a) Constant Resampling: For each time step :math:`t`: .. math:: y(t) = y(t_{last}) where :math:`t_{last}` is the last available data point before :math:`t` b) Linear Resampling: For each time step :math:`t`: .. math:: y(t) = y(t_1) + \frac{t - t_1}{t_2 - t_1} \cdot (y(t_2) - y(t_1)) where: - :math:`t_1` is the last available data point before :math:`t` - :math:`t_2` is the first available data point after :math:`t` 2. Time Zone Conversion: For a time :math:`t` in timezone :math:`TZ_1`: .. math:: t_{TZ_2} = t_{TZ_1} + \Delta TZ where: - :math:`t_{TZ_2}` is the time in target timezone - :math:`\Delta TZ` is the time difference between timezones 3. Data Clipping: For a time series :math:`y(t)`: .. math:: y_{clipped}(t) = \begin{cases} y(t) & \text{if } t_{start} \leq t < t_{end} \\ \text{undefined} & \text{otherwise} \end{cases} Args: df (pandas.DataFrame): Input DataFrame with time series data datecolumn (int): Column index containing datetime information valuecolumn (int, optional): Column index containing values to process step_size (int, optional): Time step size in seconds for resampling start_time (datetime, optional): Start time for data extraction end_time (datetime, optional): End time for data extraction resample (bool): Whether to resample data to regular intervals resample_method (str): Resampling method ("linear" or "constant") clip (bool): Whether to clip data to specified time range tz (str): Timezone for data processing preserve_order (bool): Whether to preserve original data order Returns: pandas.DataFrame: Processed DataFrame with resampled time series data """ assert datecolumn != valuecolumn, "datecolumn and valuecolumn cannot be the same" df = df.rename(columns={df.columns.to_list()[datecolumn]: "datetime"}) for i, column in enumerate(df.columns.to_list()): if column != "datetime" and valuecolumn is None: df[column] = pd.to_numeric( df[column], errors="coerce" ) # Remove string entries elif i == valuecolumn: df[column] = pd.to_numeric( df[column], errors="coerce" ) # Remove string entries df["datetime"] = pd.to_datetime(df["datetime"]) # ), format=format) if df["datetime"].apply(lambda x: x.tzinfo is not None).any(): has_tz = True df["datetime"] = df["datetime"].apply(lambda x: x.tz_convert("UTC")) else: has_tz = False df = df.set_index(pd.DatetimeIndex(df["datetime"])) df = df.drop(columns=["datetime"]) if preserve_order and has_tz == False: # Detect if dates are reverse diff_seconds = df.index.to_series().diff().dt.total_seconds() frac_neg = np.sum(diff_seconds < 0) / diff_seconds.size if frac_neg >= 0.95: df = df.iloc[::-1] elif frac_neg > 0.05 and frac_neg < 0.95: raise Exception( '"preserve_order" is true, but the datetime order cannot be determined.' ) else: df = df.sort_index() df = df.dropna(how="all") # Check if the first index is timezone aware if df.index[0].tzinfo is None: df = df.tz_localize(gettz(tz), ambiguous="infer", nonexistent="NaT") else: df = df.tz_convert(gettz(tz)) # Duplicate dates can occur either due to measuring/logging malfunctions # or due to change of daylight saving time where an hour occurs twice in fall. df = df.groupby(level=0).mean() if start_time.tzinfo is None: start_time = start_time.astimezone(tz=gettz(tz)) if end_time.tzinfo is None: end_time = end_time.astimezone(tz=gettz(tz)) if resample: allowable_resample_methods = ["constant", "linear"] assert ( resample_method in allowable_resample_methods ), f"resample_method \"{resample_method}\" is not valid. The options are: {', '.join(allowable_resample_methods)}" if resample_method == "constant": df = df.resample(f"{step_size}s", origin=start_time).ffill().bfill() elif resample_method == "linear": oidx = df.index nidx = pd.date_range(start_time, end_time, freq=f"{step_size}s") df = df.reindex(oidx.union(nidx)).interpolate("index").reindex(nidx) if clip: df = df[ (df.index >= start_time) & (df.index < end_time) ] # Exclude end time for similar behavior as normal python slicing return df
[docs] def load_from_spreadsheet( filename, datecolumn=0, valuecolumn=None, step_size=None, start_time=None, end_time=None, resample=True, clip=True, cache=True, cache_root=None, tz="Europe/Copenhagen", preserve_order=True, ): """ This function loads a spead either in .csv or .xlsx format. The datetime should in the first column - timezone-naive inputs are localized as "tz", while timezone-aware inputs are converted to "tz". All data except for datetime column is converted to numeric data. tz: can be "UTC+2", "GMT-8" (no trailing zeros) or timezone name "Europe/Copenhagen" preserve_order: If True, the order of rows in the spreadsheet are important in order to resolve DST when timezone information is not available PRINT THE FOLLOWING TO SEE AVAILABLE NAMES: from dateutil.zoneinfo import getzoneinfofile_stream, ZoneInfoFile print(ZoneInfoFile(getzoneinfofile_stream()).zones.keys()) """ name, file_extension = os.path.splitext(filename) if cache: # Check if file is cached startTime_str = start_time.strftime("%d-%m-%Y %H-%M-%S") endTime_str = end_time.strftime("%d-%m-%Y %H-%M-%S") cached_filename = f"name({os.path.basename(name)})_stepSize({str(step_size)})_start_time({startTime_str})_end_time({endTime_str})_cached.pickle" cached_filename, isfile = mkdir_in_root( folder_list=["generated_files", "cached_data"], filename=cached_filename, root=cache_root, ) if cache and os.path.isfile(cached_filename): df = pd.read_pickle(cached_filename) else: with open(filename, "rb") as filehandler: if file_extension == ".csv": df = pd.read_csv(filehandler, low_memory=False) # , parse_dates=[0]) elif file_extension == ".xlsx": df = pd.read_excel(filehandler) else: raise Exception(f"Invalid file extension: {file_extension}") if valuecolumn is not None: valuename = df.columns[valuecolumn] df = sample_from_df( df, datecolumn, step_size=step_size, start_time=start_time, end_time=end_time, resample=resample, clip=clip, tz=tz, preserve_order=preserve_order, ) if valuecolumn is not None: df = df[valuename] if cache: df.to_pickle(cached_filename) return df
[docs] def load_database_config(config_file=None, section="timescaledb"): """ Load TimescaleDB configuration from file or environment variables. This function loads database configuration from either: 1. An INI configuration file (if provided) 2. Environment variables (as fallback) Configuration Priority: 1. Function parameters (highest priority) 2. INI file configuration 3. Environment variables (lowest priority) Args: config_file (str, optional): Path to INI configuration file. If None, only environment variables are used. section (str, optional): Section name in INI file. Defaults to "timescaledb". Returns: dict: Database configuration dictionary with keys: - host: Database host address - port: Database port number - name: Database name - user: Database username - password: Database password (None if not set) Environment Variables: The following environment variables can be used as fallbacks: - TIMESCALEDB_HOST: Database host (default: "localhost") - TIMESCALEDB_PORT: Database port (default: 5432) - TIMESCALEDB_NAME: Database name (default: "postgres") - TIMESCALEDB_USER: Database username (default: "postgres") - TIMESCALEDB_PASSWORD: Database password (default: None) Example INI file format: [timescaledb] host = localhost port = 5432 name = postgres user = postgres password = mypassword Example: >>> config = load_database_config("database.ini") >>> print(config) {'host': 'localhost', 'port': 5432, 'name': 'postgres', 'user': 'postgres', 'password': 'mypassword'} """ config = { "host": "localhost", "port": 5432, "name": "postgres", "user": "postgres", "password": None, } # Load from INI file if provided if config_file and os.path.exists(config_file): try: parser = configparser.ConfigParser() parser.read(config_file) if parser.has_section(section): if parser.has_option(section, "host"): config["host"] = parser.get(section, "host") if parser.has_option(section, "port"): config["port"] = parser.getint(section, "port") if parser.has_option(section, "name"): config["name"] = parser.get(section, "name") if parser.has_option(section, "user"): config["user"] = parser.get(section, "user") if parser.has_option(section, "password"): config["password"] = parser.get(section, "password") except Exception as e: print(f"Warning: Could not load configuration from {config_file}: {e}") # Override with environment variables config["host"] = os.getenv("TIMESCALEDB_HOST", config["host"]) config["port"] = int(os.getenv("TIMESCALEDB_PORT", str(config["port"]))) config["name"] = os.getenv("TIMESCALEDB_NAME", config["name"]) config["user"] = os.getenv("TIMESCALEDB_USER", config["user"]) config["password"] = os.getenv("TIMESCALEDB_PASSWORD", config["password"]) return config
[docs] def load_from_database( building_name, sensor_name=None, sensor_uuid=None, step_size=None, start_time=None, end_time=None, resample=True, resample_method="linear", clip=True, cache=True, cache_root=None, tz="Europe/Copenhagen", preserve_order=True, config_file=None, section="timescaledb", db_host=None, db_port=None, db_name=None, db_user=None, db_password=None, ): r""" Load time series data from TimescaleDB database for building sensor data. This function connects to a TimescaleDB database and loads sensor data from tables with the naming convention `data_{building_name}`. The database schema should have columns: time (TIMESTAMPTZ), uuid (TEXT), name (TEXT), and value (FLOAT). Mathematical Formulation ----------------------- The database query is formulated as: .. math:: Q(t) = \begin{cases} \{y(t) : \text{name} = s_{name} \land \text{uuid} = s_{uuid}\} & \text{if } s_{name}, s_{uuid} \text{ specified} \\ \{y(t) : \text{name} = s_{name}\} & \text{if only } s_{name} \text{ specified} \\ \{y(t) : \text{uuid} = s_{uuid}\} & \text{if only } s_{uuid} \text{ specified} \\ \{y(t)\} & \text{otherwise (all sensors)} \end{cases} where: - :math:`Q(t)` is the query result at time :math:`t` - :math:`s_{name}` is the sensor name filter - :math:`s_{uuid}` is the sensor UUID filter - :math:`y(t)` represents sensor values at time :math:`t` For multiple sensors, data is transformed from long to wide format: .. math:: \mathbf{Y}(t) = \begin{bmatrix} y_1(t) & y_2(t) & \cdots & y_n(t) \end{bmatrix} where: - :math:`\mathbf{Y}(t)` is the wide-format data matrix at time :math:`t` - :math:`y_i(t)` is the value of sensor :math:`i` at time :math:`t` - :math:`n` is the number of sensors The function supports the same resampling and timezone conversion as the spreadsheet loader, following the mathematical formulations defined in the module docstring. Configuration ------------- Database connection can be configured through multiple methods (in order of priority): 1. Function parameters (highest priority) 2. Configuration file (INI format) 3. Environment variables (lowest priority) Args: building_name (str): Name of the building (e.g., "bldg1", "bldg10"). The function will query the table named `data_{building_name}`. sensor_name (str, optional): Name of the sensor to filter by (e.g., "temperature", "humidity", "CO2"). If provided, only data from sensors with this name will be returned. If None, data from all sensors will be returned. sensor_uuid (str, optional): UUID of the sensor to filter by. If provided, only data from sensors with this UUID will be returned. Can be used alone or in combination with sensor_name. step_size (int, optional): Time step size in seconds for resampling (e.g., 300 for 5-minute intervals). Required if resample=True. Ignored if resample=False. start_time (datetime, optional): Start time for data extraction. If timezone-naive, will be localized to 'tz'. If None, no lower time bound is applied. end_time (datetime, optional): End time for data extraction (exclusive). If timezone-naive, will be localized to 'tz'. If None, no upper time bound is applied. resample (bool, optional): Whether to resample the data to regular time intervals. If True, requires step_size, start_time, and end_time to be provided. Defaults to True. resample_method (str, optional): Resampling method to use when resample=True: - "linear": Linear interpolation between data points - "constant": Forward-fill with backward-fill for gaps Defaults to "linear". clip (bool, optional): Whether to clip data to the specified start_time and end_time range. If False, all available data within the time range will be returned. Defaults to True. cache (bool, optional): Whether to cache the results in pickle files for faster subsequent loads. Cache files are stored in generated_files/cached_data/ directory. Defaults to True. cache_root (str, optional): Root directory for cache files. If None, uses the default Twin4Build cache location. tz (str, optional): Timezone for data processing. Can be timezone name (e.g., "Europe/Copenhagen"), UTC offset (e.g., "UTC+2", "GMT-8"), or "UTC". Defaults to "Europe/Copenhagen". config_file (str, optional): Path to INI configuration file for database settings. If provided, database connection parameters will be loaded from this file. See load_database_config() for file format details. section (str, optional): Section of the INI file to use for database settings. db_host (str, optional): Database host address. Overrides config file and environment variables. db_port (int, optional): Database port number. Overrides config file and environment variables. db_name (str, optional): Database name. Overrides config file and environment variables. db_user (str, optional): Database username. Overrides config file and environment variables. db_password (str, optional): Database password. Overrides config file and environment variables. Returns: pandas.DataFrame: DataFrame with time series data. The index is a DatetimeIndex with timezone information. Columns represent different sensors (if multiple sensors are selected) or a single column named after the sensor (if single sensor). For multiple sensors, the DataFrame will have a wide format with each sensor as a separate column. For a single sensor, the DataFrame will have a single column named after the sensor. Raises: Exception: If the specified table does not exist in the database, if database connection fails, if invalid resample_method is provided, or if required parameters are missing for resampling. Example: Basic usage with configuration file: .. code-block:: python from datetime import datetime, timezone start_time = datetime(2023, 1, 1, 0, 0, 0, tzinfo=timezone.utc) end_time = datetime(2023, 1, 2, 0, 0, 0, tzinfo=timezone.utc) df = load_from_database("bldg1", start_time=start_time, end_time=end_time, config_file="database.ini") Load specific sensor with explicit connection parameters: .. code-block:: python df = load_from_database( building_name="bldg1", sensor_name="temperature", start_time=start_time, end_time=end_time, step_size=300, db_host="192.168.1.100", db_port=5433, db_user="myuser", db_password="mypassword" ) Note: The function requires psycopg2 to be installed for PostgreSQL connectivity. Database tables should follow the naming convention: data_{building_name}. Database schema should have columns: time, uuid, name, value. Timezone handling follows the same logic as load_from_spreadsheet. Caching uses the same mechanism as load_from_spreadsheet for consistency. For large datasets, consider using sensor_name or sensor_uuid filters to reduce memory usage and improve performance. """ # Third party imports import psycopg2 from psycopg2.extras import RealDictCursor # Load database configuration db_config = load_database_config(config_file, section) # Override with function parameters if provided if db_host is not None: db_config["host"] = db_host if db_port is not None: db_config["port"] = db_port if db_name is not None: db_config["name"] = db_name if db_user is not None: db_config["user"] = db_user if db_password is not None: db_config["password"] = db_password # Handle caching if cache: startTime_str = ( start_time.strftime("%d-%m-%Y %H-%M-%S") if start_time else "None" ) endTime_str = end_time.strftime("%d-%m-%Y %H-%M-%S") if end_time else "None" sensor_filter = ( f"sensor_{sensor_name}_{sensor_uuid}" if sensor_name or sensor_uuid else "all_sensors" ) cached_filename = f"db_{building_name}_{sensor_filter}_stepSize({str(step_size)})_start_time({startTime_str})_end_time({endTime_str})_cached.pickle" cached_filename, isfile = mkdir_in_root( folder_list=["generated_files", "cached_data"], filename=cached_filename, root=cache_root, ) if os.path.isfile(cached_filename): return pd.read_pickle(cached_filename) # Build connection string if db_config["password"]: conn_string = f"host={db_config['host']} port={db_config['port']} dbname={db_config['name']} user={db_config['user']} password={db_config['password']}" else: conn_string = f"host={db_config['host']} port={db_config['port']} dbname={db_config['name']} user={db_config['user']}" try: # Connect to database conn = psycopg2.connect(conn_string) cursor = conn.cursor(cursor_factory=RealDictCursor) # Build query table_name = f"data_{building_name}" # Check if table exists cursor.execute( """ SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_name = %s ); """, (table_name,), ) if not cursor.fetchone()["exists"]: raise Exception(f"Table {table_name} does not exist in the database") # Build WHERE clause where_conditions = [] params = [] if sensor_name: where_conditions.append("name = %s") params.append(sensor_name) if sensor_uuid: where_conditions.append("uuid = %s") params.append(sensor_uuid) if start_time: where_conditions.append("time >= %s") params.append(start_time) if end_time: where_conditions.append("time < %s") params.append(end_time) where_clause = " AND ".join(where_conditions) if where_conditions else "1=1" # Execute query query = f""" SELECT time, uuid, name, value FROM {table_name} WHERE {where_clause} ORDER BY time """ print(f"Executing query: {query}") print(f"Query parameters: {params}") cursor.execute(query, params) rows = cursor.fetchall() print(f"Query returned {len(rows)} rows") if len(rows) > 0: print(f"Sample row: {rows[0]}") else: print("No rows returned from query") # Debug: Check what sensor names exist in the database try: cursor.execute(f"SELECT DISTINCT name FROM {table_name} ORDER BY name") existing_sensors = cursor.fetchall() print( f"Available sensors in database: {[row['name'] for row in existing_sensors]}" ) except Exception as e: print(f"Could not check existing sensors: {e}") if "conn" in locals(): conn.close() except Exception as e: if "conn" in locals(): conn.close() print(f"Error loading data from database: {e}") raise if not rows: print(f"No data found for building {building_name}") return pd.DataFrame() # Convert to DataFrame df = pd.DataFrame(rows) # Make the columns the sensor names (if there are multiple sensors) # This is necessary for the sample_from_df function as it expects all columns to be numeric for groupby operations df = df.pivot_table(index="time", columns="name", values="value", aggfunc="mean") df = df.reset_index() # Use the existing sample_from_df function for consistent processing df = sample_from_df( df, datecolumn=0, # datetime is now the index valuecolumn=3, step_size=step_size, start_time=start_time, end_time=end_time, resample=resample, resample_method=resample_method, clip=clip, tz=tz, preserve_order=preserve_order, ) # Cache the result if cache: df.to_pickle(cached_filename) return df