diff --git a/src/gdm/distribution/sys_functools.py b/src/gdm/distribution/sys_functools.py index 2153b8f8..e4ee4fe0 100644 --- a/src/gdm/distribution/sys_functools.py +++ b/src/gdm/distribution/sys_functools.py @@ -136,7 +136,8 @@ def _get_load_power_per_phase( denormalized_data = get_time_series_actual_data(ts_data) if user_attr.use_actual: - return [(phase, denormalized_data) for phase in load.phases] + n_phases = len(load.phases) + return [(phase, denormalized_data / n_phases) for phase in load.phases] if metadata.name in {"active_power", "reactive_power"}: return [ @@ -390,171 +391,70 @@ def get_aggregated_load_time_series( ) -def _get_combined_single_time_series_df( - sys: DistributionSystem, - component_type: type, - var_of_interest: set[str], - power_function: Callable, - unit_conversion: dict[str, str], - time_series_type: Type[TimeSeriesData] = SingleTimeSeries, - aggregate_phases: bool = True, - per_phase_function: Callable | None = None, - include_features: bool = False, -) -> pd.DataFrame: - """ - Generalized function for returning combined single time series dataframe for given component type. - - Parameters - ---------- - sys: DistributionSystem - Instance of DistributionSystem. - component_type: type - The type of components to retrieve (e.g., DistributionLoad, DistributionSolar). - var_of_interest: set[str] - Set of variable names of interest. - power_function: callable - Function to compute power data for the component. - unit_conversion: dict[str, str] - Optional dictionary to perform unit conversion on data in pint quantities. - time_series_type: Type[TimeSeriesData] - Type of time series data. Defaults to: SingleTimeSeries - aggregate_phases: bool - If True (default), phases are summed and no ``phase`` column is added. - If False, one row per phase is emitted and a ``phase`` column is added. - Requires ``per_phase_function`` when False. - per_phase_function: Callable | None - Function with the same signature as ``power_function`` that returns - ``list[tuple[Phase, Quantity]]``. Required when ``aggregate_phases=False``. - include_features: bool - If True, columns for each entry in ``metadata.features`` (excluding - ``use_actual``) are added to the output DataFrame. Defaults to False. - Returns - ------- - pd.DataFrame - - Raises - ------ - NoComponentsFoundError - If no components of the specified type are found. - NoTimeSeriesDataFound - If no time series data is found for a component. - TypeError - If time series data is not of type SingleTimeSeries. - TimeSeriesVariableDoesNotExist - If specified variables do not exist for the given component. - """ - dfs = [] - components: list[Component] = list(sys.get_components(component_type)) - if not components: - raise NoComponentsFoundError( - f"No components of type {component_type.__name__} found in {sys.name}" - ) +def _get_timestamps(ts_data: TimeSeriesData) -> list: + """Extract timestamps from SingleTimeSeries or NonSequentialTimeSeries.""" + if isinstance(ts_data, SingleTimeSeries): + return [ + ts_data.initial_timestamp + idx * ts_data.resolution for idx in range(ts_data.length) + ] + return ts_data.timestamps - for component in components: - ts_metadata = sys.list_time_series_metadata(component, time_series_type=time_series_type) - if not ts_metadata: - msg = f"No time series data found for {component=}." - raise NoTimeSeriesDataFound(msg) - - avail_vars = {md.name for md in ts_metadata} +def _convert_power_value(power_data, var: str, unit_conversion: dict[str, str]): + """Apply unit conversion to power data. Returns (value, units) tuple.""" + if var in unit_conversion: + if not isinstance(power_data, Quantity): + msg = f"Unit conversion specified for {var}, but power data is not a pint Quantity." + raise GDMQuantityError(msg) + return power_data.to(unit_conversion[var]).magnitude, unit_conversion[var] + return power_data, power_data.units - if not var_of_interest.issubset(avail_vars): - msg = f"{avail_vars=}. Only {var_of_interest=} is supported for dataframe computation." - raise TimeSeriesVariableDoesNotExist(msg) - for var in var_of_interest & avail_vars: - ts_data: SingleTimeSeries = sys.get_time_series( - owner=component, name=var, time_series_type=time_series_type - ) - metadata = [meta for meta in ts_metadata if meta.name == var][0] - timestamps = [ - ts_data.initial_timestamp + idx * ts_data.resolution - for idx in range(ts_data.length) - ] - features_cols: dict = ( - { - k: [v] * ts_data.length - for k, v in (metadata.features or {}).items() - if k != "use_actual" - } - if include_features - else {} - ) +def _extract_features_cols(metadata: TimeSeriesMetadata, length: int) -> dict: + """Extract feature columns from metadata, excluding use_actual.""" + return {k: [v] * length for k, v in (metadata.features or {}).items() if k != "use_actual"} - if not aggregate_phases and per_phase_function is not None: - phase_power_pairs: list[tuple[Phase, Quantity]] = per_phase_function( - component, ts_data, metadata - ) - for phase, power_data in phase_power_pairs: - if var in unit_conversion and not isinstance(power_data, Quantity): - msg = f"Unit conversion specified for {var}, but power data is not a pint Quantity." - raise GDMQuantityError(msg) - dfs.append( - pd.DataFrame( - { - "timestamp": timestamps, - "name": [var] * ts_data.length, - "component_uuid": [component.uuid] * ts_data.length, - "phase": [phase] * ts_data.length, - "value": ( - power_data.to(unit_conversion[var]).magnitude - if var in unit_conversion - else power_data - ), - "units": [ - unit_conversion[var] - if var in unit_conversion - else power_data.units - ] - * ts_data.length, - **features_cols, - } - ) - ) - else: - power_data = power_function(component, ts_data, metadata) - if var in unit_conversion and not isinstance(power_data, Quantity): - msg = f"Unit conversion specified for {var}, but power data is not a pint Quantity." - raise GDMQuantityError(msg) - dfs.append( - pd.DataFrame( - { - "timestamp": timestamps, - "name": [var] * ts_data.length, - "component_uuid": [component.uuid] * ts_data.length, - "value": ( - power_data.to(unit_conversion[var]).magnitude - if var in unit_conversion - else power_data - ), - "units": [ - unit_conversion[var] - if var in unit_conversion - else power_data.units - ] - * ts_data.length, - **features_cols, - } - ) - ) - return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame() +def _build_power_row_df( + timestamps: list, + var: str, + component_uuid, + length: int, + power_data, + unit_conversion: dict[str, str], + features_cols: dict, + phase: Phase | None = None, +) -> pd.DataFrame: + """Build a DataFrame for one (component, variable, [phase]) time series slice.""" + value, units = _convert_power_value(power_data, var, unit_conversion) + row: dict = { + "timestamp": timestamps, + "name": [var] * length, + "component_uuid": [component_uuid] * length, + } + if phase is not None: + row["phase"] = [phase] * length + row["value"] = value + row["units"] = [units] * length + row.update(features_cols) + return pd.DataFrame(row) -def _get_combined_nonsequential_time_series_df( +def _get_combined_time_series_df( sys: DistributionSystem, component_type: type, var_of_interest: set[str], power_function: Callable, unit_conversion: dict[str, str], - time_series_type: Type[TimeSeriesData] = NonSequentialTimeSeries, + time_series_type: Type[TimeSeriesData] = SingleTimeSeries, aggregate_phases: bool = True, per_phase_function: Callable | None = None, include_features: bool = False, ) -> pd.DataFrame: """ - Generalized function for returning combined nonsequential time series dataframe for given component type. + Generalized function for returning combined time series dataframe for given component type. + + Works with both SingleTimeSeries and NonSequentialTimeSeries. Parameters ---------- @@ -569,7 +469,7 @@ def _get_combined_nonsequential_time_series_df( unit_conversion: dict[str, str] Optional dictionary to perform unit conversion on data in pint quantities. time_series_type: Type[TimeSeriesData] - Type of time series data. Defaults to: NonSequentialTimeSeries + Type of time series data. Defaults to: SingleTimeSeries aggregate_phases: bool If True (default), phases are summed and no ``phase`` column is added. If False, one row per phase is emitted and a ``phase`` column is added. @@ -591,8 +491,6 @@ def _get_combined_nonsequential_time_series_df( If no components of the specified type are found. NoTimeSeriesDataFound If no time series data is found for a component. - TypeError - If time series data is not of type NonSequentialTimeSeries. TimeSeriesVariableDoesNotExist If specified variables do not exist for the given component. """ @@ -617,68 +515,43 @@ def _get_combined_nonsequential_time_series_df( raise TimeSeriesVariableDoesNotExist(msg) for var in var_of_interest & avail_vars: - ts_data: NonSequentialTimeSeries = sys.get_time_series( + ts_data = sys.get_time_series( owner=component, name=var, time_series_type=time_series_type ) metadata = [meta for meta in ts_metadata if meta.name == var][0] - features_cols: dict = ( - { - k: [v] * ts_data.length - for k, v in (metadata.features or {}).items() - if k != "use_actual" - } - if include_features - else {} + timestamps = _get_timestamps(ts_data) + features_cols = ( + _extract_features_cols(metadata, ts_data.length) if include_features else {} ) if not aggregate_phases and per_phase_function is not None: - phase_power_pairs: list[tuple[Phase, Quantity]] = per_phase_function( - component, ts_data, metadata - ) - for phase, power_data in phase_power_pairs: + for phase, power_data in per_phase_function(component, ts_data, metadata): dfs.append( - pd.DataFrame( - { - "timestamp": ts_data.timestamps, - "name": [var] * ts_data.length, - "component_uuid": [component.uuid] * ts_data.length, - "phase": [phase] * ts_data.length, - "value": ( - power_data.to(unit_conversion[var]).magnitude - if var in unit_conversion - else power_data - ), - "units": [ - unit_conversion[var] - if var in unit_conversion - else power_data.units - ] - * ts_data.length, - **features_cols, - } + _build_power_row_df( + timestamps, + var, + component.uuid, + ts_data.length, + power_data, + unit_conversion, + features_cols, + phase=phase, ) ) + elif not aggregate_phases and per_phase_function is None: + msg = "per_phase_function is required when aggregate_phases is False." + raise ValueError(msg) else: power_data = power_function(component, ts_data, metadata) dfs.append( - pd.DataFrame( - { - "timestamp": ts_data.timestamps, - "name": [var] * ts_data.length, - "component_uuid": [component.uuid] * ts_data.length, - "value": ( - power_data.to(unit_conversion[var]).magnitude - if var in unit_conversion - else power_data - ), - "units": [ - unit_conversion[var] - if var in unit_conversion - else power_data.units - ] - * ts_data.length, - **features_cols, - } + _build_power_row_df( + timestamps, + var, + component.uuid, + ts_data.length, + power_data, + unit_conversion, + features_cols, ) ) @@ -712,37 +585,25 @@ def get_combined_load_time_series_df( include_features: bool If True, columns for each entry in ``metadata.features`` (excluding ``use_actual``) are added to the output DataFrame. Defaults to False. + Returns ------- pd.DataFrame """ - if time_series_type.__name__ == "SingleTimeSeries": - return _get_combined_single_time_series_df( - sys=sys, - component_type=DistributionLoad, - var_of_interest=var_of_interest, - power_function=_get_load_power, - unit_conversion=unit_conversion, - time_series_type=time_series_type, - aggregate_phases=aggregate_phases, - per_phase_function=_get_load_power_per_phase, - include_features=include_features, - ) - elif time_series_type.__name__ == "NonSequentialTimeSeries": - return _get_combined_nonsequential_time_series_df( - sys=sys, - component_type=DistributionLoad, - var_of_interest=var_of_interest, - power_function=_get_load_power, - unit_conversion=unit_conversion, - time_series_type=time_series_type, - aggregate_phases=aggregate_phases, - per_phase_function=_get_load_power_per_phase, - include_features=include_features, - ) - else: + if time_series_type.__name__ not in {"SingleTimeSeries", "NonSequentialTimeSeries"}: msg = f"get_combined_load_time_series_df not implemented for {time_series_type.__name__}" raise IncompatibleTimeSeries(msg) + return _get_combined_time_series_df( + sys=sys, + component_type=DistributionLoad, + var_of_interest=var_of_interest, + power_function=_get_load_power, + unit_conversion=unit_conversion, + time_series_type=time_series_type, + aggregate_phases=aggregate_phases, + per_phase_function=_get_load_power_per_phase, + include_features=include_features, + ) def get_combined_solar_time_series_df( @@ -773,47 +634,23 @@ def get_combined_solar_time_series_df( include_features: bool If True, columns for each entry in ``metadata.features`` (excluding ``use_actual``) are added to the output DataFrame. Defaults to False. + Returns ------- pd.DataFrame """ - if time_series_type.__name__ == "SingleTimeSeries": - solar_df = _get_combined_single_time_series_df( - sys=sys, - component_type=DistributionSolar, - var_of_interest=var_of_interest, - power_function=_get_solar_power, - unit_conversion=unit_conversion, - time_series_type=time_series_type, - aggregate_phases=aggregate_phases, - per_phase_function=_get_solar_power_per_phase, - include_features=include_features, - ) - return solar_df.replace("irradiance", "active_power") - elif time_series_type.__name__ == "NonSequentialTimeSeries": - solar_df = _get_combined_nonsequential_time_series_df( - sys=sys, - component_type=DistributionSolar, - var_of_interest=var_of_interest, - power_function=_get_solar_power, - unit_conversion=unit_conversion, - time_series_type=time_series_type, - aggregate_phases=aggregate_phases, - per_phase_function=_get_solar_power_per_phase, - include_features=include_features, - ) - return solar_df.replace("irradiance", "active_power") - else: - msg = f"get_combined_load_time_series_df not implemented for {time_series_type.__name__}" + if time_series_type.__name__ not in {"SingleTimeSeries", "NonSequentialTimeSeries"}: + msg = f"get_combined_solar_time_series_df not implemented for {time_series_type.__name__}" raise IncompatibleTimeSeries(msg) - - -# Backward-compatible aliases for legacy API names. -get_timeseries_actual_data = get_time_series_actual_data -_check_for_timeseries_metadata_consistency = _check_for_time_series_metadata_consistency -_check_for_timeseries_consistency = _check_for_time_series_consistency -get_aggregated_solar_timeseries = get_aggregated_solar_time_series -get_aggregated_battery_timeseries = get_aggregated_battery_time_series -get_aggregated_load_timeseries = get_aggregated_load_time_series -get_combined_load_timeseries_df = get_combined_load_time_series_df -get_combined_solar_timeseries_df = get_combined_solar_time_series_df + solar_df = _get_combined_time_series_df( + sys=sys, + component_type=DistributionSolar, + var_of_interest=var_of_interest, + power_function=_get_solar_power, + unit_conversion=unit_conversion, + time_series_type=time_series_type, + aggregate_phases=aggregate_phases, + per_phase_function=_get_solar_power_per_phase, + include_features=include_features, + ) + return solar_df.replace("irradiance", "active_power")