Source code for ops_data_client.handle_commands

"""Methods for gathering data from different sources."""

from __future__ import annotations

import logging
from pathlib import Path

import pandas as pd
import polars as pl

from ops_data_client.timing import cycleid
from ops_data_client.utilities.configuration import parse_config_options

LOG: logging.Logger = logging.getLogger(__name__)


def get_data(
    pvs: list[str],
    start: pd.Timestamp | None = None,
    end: pd.Timestamp | None = None,
    time_range_seconds: float = 5,
    number: int = 0,
    arch_processor_name: str = "",
    arch_processor_bin_size: int = 1,
    config_file: str = str(Path.home() / ".ops-config.ini"),
) -> pl.DataFrame:
    """Get data for given PVs from the different sources, constrained by options.

    Args:
        pvs (list): list of PV names
        start (pd.Timestamp): start of time range
        end (pd.Timestamp): end of time range
        time_range_seconds (float): search time range, in seconds
        number (int): number of records to retrieve
        arch_processor_name (str): name of pre-processor to use in Archiver
        arch_processor_bin_size (int): bin size for certain pre-processors
        config_file (str): path to the config file

    Returns:
        pl.DataFrame: combined dataframe containing all found datasets
    """
    # Parse the config file; supplied options take precedence over file values.
    ops_config, pvs, start, end, datasources, timing_datasource = parse_config_options(
        pvs=pvs,
        start=start,
        end=end,
        time_range_seconds=time_range_seconds,
        number=number,
        arch_processor_name=arch_processor_name,
        arch_processor_bin_size=arch_processor_bin_size,
        config_file=config_file,
    )
    if not ops_config or not datasources or not pvs:
        return pl.DataFrame()

    dataframe_list: list[pl.DataFrame] = []

    # Get the "official" cycle-ids that cover the selected time range, plus or
    # minus enough time to contain all cycles stored in an SDS event.
    search_offset_min_s = 15
    search_offset_max_s = 1
    cycle_id_dataframe = cycleid.get_cycle_id_timestamps(
        start=start - pd.Timedelta(seconds=search_offset_min_s),
        end=end + pd.Timedelta(seconds=search_offset_max_s),
        timing_datasource=timing_datasource,
    )

    # Loop over each configured data source.
    for ds in datasources:
        # See which PVs are available in this source.
        pvs_found = ds.find_pvs(
            pvs=pvs,
            start=start,
        )
        # Get a dictionary containing a dataframe for each PV.
        dataframe_dict = ds.get_data(
            pvs=pvs_found,
            start=start,
            end=end,
        )
        # For each PV dataframe, add cycle-ids and cycle-id timestamps.
        dataframe_dict = cycleid.add_cycle_id_timestamp(
            datasource=ds,
            dataframe_dict=dataframe_dict,
            timing_datasource=timing_datasource,
            cycle_id_dataframe=cycle_id_dataframe,
        )
        # Add a source name column to the dataframes when debugging.
        if logging.root.level <= logging.DEBUG:
            dataframe_dict = _add_source_name_column(
                dataframe_dict=dataframe_dict,
                source_name=ds.name,
            )
        # Add the dataframes from this data source to a list of dataframes
        # from all data sources.
        dataframe_list.extend(dataframe_dict.values())

    # Combine the per-PV dataframes from all data sources into a single polars
    # dataframe, aligned and sorted by cycle_id.
    return _combine_dataframes(dataframe_list)
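
# A minimal usage sketch (hypothetical PV names and time window; a real call
# depends on a valid config file and reachable data sources):
#
#   import pandas as pd
#   from ops_data_client.handle_commands import get_data
#
#   frame = get_data(
#       pvs=["EXAMPLE:PV:A", "EXAMPLE:PV:B"],
#       start=pd.Timestamp("2024-01-01 00:00:00"),
#       end=pd.Timestamp("2024-01-01 00:00:05"),
#   )
#   print(frame.head())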

def _add_source_name_column(
    dataframe_dict: dict[str, pl.DataFrame],
    source_name: str = "",
) -> dict[str, pl.DataFrame]:
    """Add a column to all dataframes containing the source name."""
    if not dataframe_dict:
        return {}
    for pv, dataframe in dataframe_dict.items():
        # Add the source name column, then sort by cycle_id.
        dataframe_dict[pv] = dataframe.with_columns(source=pl.lit(source_name)).sort(
            "cycle_id",
        )
    return dataframe_dict


def _combine_dataframes(
    dataframe_list: list[pl.DataFrame],
) -> pl.DataFrame:
    """Combine multiple dataframes and sort by cycle-id."""
    if not dataframe_list:
        return pl.DataFrame()
    # Combine dataframes with a common supertype; missing column values are
    # set to null.
    return pl.concat(dataframe_list, how="diagonal_relaxed").sort([
        "cycle_id",
        "data_ts",
    ])
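
# An illustrative sketch of the ``how="diagonal_relaxed"`` strategy used in
# _combine_dataframes (toy frames, not real PV data): columns missing from one
# frame are filled with nulls, and differing column dtypes are cast to a common
# supertype instead of raising.
#
#   import polars as pl
#
#   a = pl.DataFrame({"cycle_id": [1, 2], "pv_a": [10, 20]})     # pv_a: Int64
#   b = pl.DataFrame({"cycle_id": [2, 3], "pv_b": [0.5, 1.5]})   # pv_b: Float64
#   pl.concat([a, b], how="diagonal_relaxed")
#   # -> columns cycle_id, pv_a, pv_b; pv_a is null in rows that came from b.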