Source code for sofirpy.simulation.simulation

"""This module allows to co-simulate multiple fmus and models written in python."""

from __future__ import annotations

import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal, overload

import numpy as np
import numpy.typing as npt
import pandas as pd
from tqdm import tqdm

import sofirpy.common as co
from sofirpy.simulation.config import BaseSimulationConfig, ExtendedSimulationConfig
from sofirpy.simulation.fmu import Fmu, FmuInitConfig
from sofirpy.simulation.simulation_entity import SimulationEntity


[docs] @dataclass(frozen=True) class System: """System object representing a simulation entity. Args: simulation_entity (SimulationEntity): fmu or python model name (str): name of the system """ simulation_entity: SimulationEntity name: str
[docs] @dataclass(frozen=True) class SystemParameter: """SystemParameter object representing a parameter in a system. Args: system (str): Name of the corresponding system name (str): name of the parameter """ system_name: str name: str
[docs] @dataclass(frozen=True) class Connection: """Representing a connection between two systems. Args: input_point (SystemParameter): SystemParameter object that represents an input of a system output_point (SystemParameter): SystemParameter object that represents an output of a system """ input_point: SystemParameter output_point: SystemParameter
class BaseSimulator: time: float step: int systems: dict[str, System] connections: list[Connection] def __init__( self, fmu_paths: co.FmuPaths | None = None, model_classes: co.SimulationEntityMapping | None = None, connections_config: co.ConnectionsConfig | None = None, init_configs: co.InitConfigs | None = None, ) -> None: config = BaseSimulationConfig( fmu_paths=fmu_paths or {}, custom_model_classes=model_classes or {}, connections=connections_config or {}, init_configs=init_configs or {}, ) config.init_configs, fmu_classes = _extract_fmu_init_configs( config.fmu_paths, config.init_configs ) simulation_entity_mapping = fmu_classes | config.custom_model_classes self.systems = init_systems(simulation_entity_mapping, config.init_configs) self.connections = init_connections(config.connections) self.time = 0 self.step = 0 def do_step(self, time: float, step_size: float) -> None: """Perform a calculation in all systems. Args: time (float): current simulation time step_size (float): step size of the simulation """ for system in self.systems.values(): system.simulation_entity.do_step(time, step_size) def set_systems_inputs(self) -> None: """Set inputs for all systems.""" for connection in self.connections: input_system_name = connection.input_point.system_name input_system = self.systems[input_system_name] input_name = connection.input_point.name output_system_name = connection.output_point.system_name output_system = self.systems[output_system_name] output_name = connection.output_point.name input_value = output_system.simulation_entity.get_parameter_value( output_name, ) input_system.simulation_entity.set_parameter(input_name, input_value) def get_parameter(self, system_name: str, parameter_name: str) -> co.ParameterValue: """Get the value of a parameter in a system. Args: system_name (str): name of the system parameter_name (str): name of the parameter Returns: float: value of the parameter """ system = self.systems[system_name] return system.simulation_entity.get_parameter_value(parameter_name) def set_parameter( self, system_name: str, parameter_name: str, value: co.ParameterValue ) -> None: """Set the value of a parameter in a system. Args: system_name (str): name of the system parameter_name (str): name of the parameter value (float): value to set """ system = self.systems[system_name] system.simulation_entity.set_parameter(parameter_name, value) def conclude_simulation(self) -> None: """Conclude the simulation for all simulation entities.""" for system in self.systems.values(): system.simulation_entity.conclude_simulation()
[docs] class Simulator(BaseSimulator): """Object that performs the simulation.""" def __init__( self, stop_time: float, step_size: float, logging_step_size: float | None = None, fmu_paths: co.FmuPaths | None = None, model_classes: co.SimulationEntityMapping | None = None, connections_config: co.ConnectionsConfig | None = None, init_configs: co.InitConfigs | None = None, parameters_to_log: co.ParametersToLog | None = None, ) -> None: super().__init__(fmu_paths, model_classes, connections_config, init_configs) extended_simulation_config = ExtendedSimulationConfig( system_names=set(self.systems), stop_time=stop_time, step_size=step_size, logging_step_size=logging_step_size or step_size, parameters_to_log=parameters_to_log or {}, ) self.parameters_to_log = init_parameter_list(parameters_to_log or {}) self.stop_time = extended_simulation_config.stop_time self.step_size = extended_simulation_config.step_size self.logging_step_size = extended_simulation_config.logging_step_size self.start_time = extended_simulation_config.start_time
[docs] def simulate( self, ) -> pd.DataFrame: """Simulate the systems. The following steps are performed. 1. A time array is created starting from 0 to the specified stop time. The intervals have the size of the step size. If the last element in the array is greater than the stop time, it is deleted. Advancing in time this way, leads to less numerical errors in comparison than using a while loop and adding the step size in each iteration. 2. The logging multiple is calculated from the logging step size. Since the logging step size needs to be a multiple of the step size, the logging multiple is an integer. Therefore a precise modulo operation inside the simulation loop can be performed. E.g if the step size 1e-3 and the logging step size is 1e-1, the logging multiple will be 100. Therefor every 100 time step will be logged. 3. The numpy results object is initialized. 4. The start values are logged. 5. The simulation loop starts. 5.1 A simulation step is performed. 5.2 All system inputs are set. 5.3 If the time step + 1 is a multiple of the logging multiple, values are logged. 6. The simulation process is concluded. 7. The numpy results object is converted to a pandas DataFrame. Returns: pd.DataFrame: result DataFrame with times series of logged parameters """ logging.info(f"Simulation stop time set to {self.stop_time} seconds.") logging.info(f"Simulation step size set to {self.step_size} seconds.") logging.info( f"Simulation logging step size set to {self.logging_step_size} seconds." ) time_series = self.compute_time_array( self.stop_time, self.step_size, self.start_time ) number_log_steps = int(self.stop_time / self.logging_step_size) + 1 logging_multiple = round(self.logging_step_size / self.step_size) dtypes = self.get_dtypes_of_logged_parameters() # self.results is a structured numpy array self.results = np.zeros(number_log_steps, dtype=dtypes) logging.info("Starting simulation.") self.log_values(time=0, log_step=0) log_step = 1 for time_step, time in enumerate(tqdm(time_series[:-1])): self.do_step(time, self.step_size) self.set_systems_inputs() if ((time_step + 1) % logging_multiple) == 0: self.log_values(time_series[time_step + 1], log_step) log_step += 1 logging.info("Simulation done.") logging.info("Concluding simulation.") self.conclude_simulation() logging.info("Simulation concluded.") return self.convert_to_data_frame(self.results)
[docs] def compute_time_array( self, stop_time: float, step_size: float, start_time: float, ) -> npt.NDArray[np.float64]: """Compute the time array for the simulation. Args: stop_time (float): stop time for the simulation step_size (float): step size for the simulation start_time (float): start time of the simulation. Returns: npt.NDArray[np.float64]: time array """ time_series = np.arange(start_time, stop_time + step_size, step_size) if time_series[-1] > stop_time: return time_series[:-1] return time_series
[docs] def log_values(self, time: float, log_step: int) -> None: """Log parameter values that are set to be logged. Args: time (float): current simulation time log_step (int): current time step """ self.results[log_step][0] = time for i, parameter in enumerate(self.parameters_to_log, start=1): system_name = parameter.system_name system = self.systems[system_name] parameter_name = parameter.name value = system.simulation_entity.get_parameter_value(parameter_name) self.results[log_step][i] = value
[docs] def convert_to_data_frame(self, results: npt.NDArray[np.void]) -> pd.DataFrame: """Covert result numpy array to DataFrame. Args: results (npt.NDArray[np.void]): Results of the simulation. Returns: pd.DataFrame: Results as DataFrame. Columns are named as follows: '<system_name>.<parameter_name>'. """ return pd.DataFrame(results)
[docs] def get_units(self) -> co.Units: """Get a dictionary with units of all logged parameters. Returns: Units: keys: parameter name, values: unit. If the unit can not be obtained it is set to None. """ units = {} for parameter in self.parameters_to_log: system_name = parameter.system_name system = self.systems[system_name] parameter_name = parameter.name unit = system.simulation_entity.get_unit(parameter_name) units[f"{system.name}.{parameter_name}"] = unit return units
[docs] def get_dtypes_of_logged_parameters(self) -> npt.DTypeLike: """Get the dtypes of the logged parameters. Returns: np.dtypes.VoidDType: dtypes of the logged parameters """ dtypes: list[tuple[str, type]] = [("time", np.float64)] for parameter in self.parameters_to_log: system_name = parameter.system_name system = self.systems[system_name] parameter_name = parameter.name dtype = system.simulation_entity.get_dtype_of_parameter(parameter_name) dtypes.append((f"{system.name}.{parameter_name}", dtype)) return np.dtype(dtypes)
@overload def simulate( stop_time: float, step_size: float, fmu_paths: co.FmuPaths | None = ..., model_classes: co.ModelClasses | None = ..., connections_config: co.ConnectionsConfig | None = ..., init_configs: co.InitConfigs | None = ..., parameters_to_log: co.ParametersToLog | None = ..., logging_step_size: float | None = ..., *, get_units: Literal[True], ) -> tuple[pd.DataFrame, co.Units]: ... @overload def simulate( stop_time: float, step_size: float, fmu_paths: co.FmuPaths | None = ..., model_classes: co.ModelClasses | None = ..., connections_config: co.ConnectionsConfig | None = ..., init_configs: co.InitConfigs | None = ..., parameters_to_log: co.ParametersToLog | None = ..., logging_step_size: float | None = ..., *, get_units: Literal[False], ) -> pd.DataFrame: ... @overload def simulate( stop_time: float, step_size: float, fmu_paths: co.FmuPaths | None = ..., model_classes: co.ModelClasses | None = ..., connections_config: co.ConnectionsConfig | None = ..., init_configs: co.InitConfigs | None = ..., parameters_to_log: co.ParametersToLog | None = ..., logging_step_size: float | None = ..., ) -> pd.DataFrame: ...
[docs] def simulate( stop_time: float, step_size: float, fmu_paths: co.FmuPaths | None = None, model_classes: co.ModelClasses | None = None, connections_config: co.ConnectionsConfig | None = None, init_configs: co.InitConfigs | None = None, parameters_to_log: co.ParametersToLog | None = None, logging_step_size: float | None = None, get_units: bool = False, ) -> pd.DataFrame | tuple[pd.DataFrame, co.Units]: """Simulate fmus and models written in python. Any number of python models and fmus can be simulated, but at least one python model or fmu has to be simulated. Args: stop_time (float): stop time for the simulation step_size (float): step size for the simulation fmu_paths (FmuPaths | None, optional): Dictionary which defines which fmu should be simulated. key -> name of the fmu; value -> path to the fmu >>> fmu_paths = { ... "<name of the fmu 1>": <Path to the fmu1>, ... "<name of the fmu 2>": <Path to the fmu2>, ... } Note: The name of the fmus can be chosen arbitrarily, but each name in 'fmu_paths' and 'model_classes' must occur only once. Defaults to None. model_classes (ModelClasses | None, optional): Dictionary which defines which Python Models should be simulated. key -> name of the model; value -> Class of the model. The class that defines the model must inherit from the abstract class SimulationEntity. >>> model_classes = { ... "<name of the model 1>": <class of the model1> ... "<name of the model 2>": <class of the model2> ... } Note: The name of the models can be chosen arbitrarily, but each name in 'fmu_paths' and 'model_classes' must occur only once. Defaults to None. connections_config (ConnectionsConfig | None, optional): Dictionary which defines how the inputs and outputs of the systems (fmu or python model) are connected. key -> name of the system; value -> list of connections >>> connections_config = { ... "<name of the system 1>": [ ... { ... "parameter_name": "<name of the input" ... "parameter of the system>", ... "connect_to_system": "<name of the system the input" ... "parameter should be connected to>", ... "connect_to_external_parameter": "<name of the output" ... "parameter in the" ... "connected system the" ... "input parameter should" ... "be connected to>" ... }, ... { ... "parameter_name": "<name of the input" ... "parameter of the system>", ... "connect_to_system": "<name of the system the input" ... "parameter should be connected to>", ... "connect_to_external_parameter": "<name of the output" ... "parameter in the" ... "connected system the" ... "input parameter should" ... "be connected to>" ... } ... ], ... "<name of the system 2>": [ ... { ... "parameter_name": "<name of the input" ... "parameter of the system>", ... "connect_to_system": "<name of the system the input" ... "parameter should be connected to>", ... "connect_to_external_parameter": "<name of the output" ... "parameter in the" ... "connected system the" ... "input parameter should" ... "be connected to>" ... } ... ] ... } Defaults to None. init_configs (co.InitConfigs | None, optional): Dictionary which defines initial configurations for the systems. Fmus can only have the key 'start_values' for specifying the start values. key -> name of the system; value -> dictionary (key -> config name; value -> config value) >>> init_configs = { ... "<name of system 1>": ... { ... "<name of config 1>": <config value 1>, ... "<name of config 2>", <config value 2> ... }, ... "<name of fmu 1>": ... { ... "start_values": { ... "<name of parameter 1>": (<start value>, unit e.g 'kg.m2'), ... "<name of parameter 2>": <start value> ... } ... } Defaults to None. parameters_to_log (ParametersToLog | None, optional): Dictionary that defines which parameters should be logged. key -> name of the system; value -> list of parameters names to be logged >>> parameters_to_log = { ... "<name of system 1>": ... [ ... "<name of parameter 1>", ... "<name of parameter 2>", ... ], ... "<name of system 2>": ... [ ... "<name of parameter 1>", ... "<name of parameter 2>", ... ] ... } Defaults to None. logging_step_size (float | None, optional): step size for logging. It must be a multiple of the chosen simulation step size. Example: If the simulation step size is set to 1e-3 and logging step size is set to 2e-3, every second time step is logged. Defaults to None. get_units (bool, optional): Determines whether the units of the logged parameter should be returned. Defaults to False. Returns: pd.DataFrame | tuple[pd.DataFrame, co.Units]: Result DataFrame with times series of logged parameters, units of logged parameters. """ logging.basicConfig( format="Simulation::%(levelname)s::%(message)s", level=logging.INFO, force=True, ) simulator = Simulator( fmu_paths=fmu_paths, model_classes=model_classes, connections_config=connections_config, init_configs=init_configs, parameters_to_log=parameters_to_log, stop_time=stop_time, step_size=step_size, logging_step_size=logging_step_size, ) results = simulator.simulate() if get_units: units = simulator.get_units() return results, units return results
def init_systems( simulation_entity_mapping: co.SimulationEntityMapping, init_configs: co.InitConfigs ) -> dict[str, System]: systems: dict[str, System] = {} for system_name, simulation_entity_class in simulation_entity_mapping.items(): init_config = init_configs.get(system_name, {}) system_instance = simulation_entity_class(init_config) system = System(system_instance, system_name) systems[system_name] = system logging.info(f"System '{system_name}' initialized.") return systems
[docs] def init_connections(connections_config: co.ConnectionsConfig) -> list[Connection]: """Initialize all the connections. Args: connections_config (ConnectionsConfig): Defines how all systems are connected. Returns: list[Connection]: List of Connections. """ all_connections: list[Connection] = [] for this_system_name, connections in connections_config.items(): for con in connections: this_parameter_name = con[co.ConnectionKeys.INPUT_PARAMETER.value] this_connection_point = SystemParameter( this_system_name, this_parameter_name, ) other_system_name = con[co.ConnectionKeys.CONNECTED_SYSTEM.value] other_parameter_name = con[co.ConnectionKeys.OUTPUT_PARAMETER.value] other_connection_point = SystemParameter( other_system_name, other_parameter_name, ) connection = Connection(this_connection_point, other_connection_point) all_connections.append(connection) logging.info("Connections initialized.") return all_connections
[docs] def init_parameter_list(parameters_to_log: co.ParametersToLog) -> list[SystemParameter]: """Initialize all parameters that should be logged. Args: parameters_to_log (ParametersToLog): Defines which parameters should be logged. Returns: list[SystemParameter]: List of system parameters that should be logged. """ log: list[SystemParameter] = [] for system_name, parameter_names in parameters_to_log.items(): for parameter_name in parameter_names: parameter_to_log = SystemParameter(system_name, parameter_name) log.append(parameter_to_log) return log
def _extract_fmu_init_configs( fmu_paths: dict[str, Path], init_config: co.InitConfig, ) -> tuple[dict[str, Any], co.SimulationEntityMapping]: fmu_classes: co.SimulationEntityMapping = {} for fmu_name, fmu_path in fmu_paths.items(): fmu_init_config: dict[str, Any] = init_config.get(fmu_name, {}) init_config[fmu_name] = FmuInitConfig( fmu_path=fmu_path, name=fmu_name, start_values=fmu_init_config.get(co.StartValueConfigLabel, {}), ).model_dump() fmu_classes[fmu_name] = Fmu return init_config, fmu_classes