"""Methods for gathering data from different sources."""
from __future__ import annotations
import logging
from pathlib import Path
import pandas as pd
import polars as pl
from ops_data_client.timing import cycleid
from ops_data_client.utilities.configuration import parse_config_options
LOG: logging.Logger = logging.getLogger(__name__)
def get_data(
pvs: list[str],
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
time_range_seconds: float = 5,
number: int = 0,
arch_processor_name: str = "",
arch_processor_bin_size: int = 1,
    config_file: str = str(Path.home() / ".ops-config.ini"),
) -> pl.DataFrame:
"""Get data for given PVs from the different sources, constrained by options.
Args:
pvs (list): List of PV names
number (int): number of records to retrieve
start (pd.Timestamp): start of time range
end (pd.Timestamp): end of time range
time_range_seconds: (float): search time range
arch_processor_name (str): name of pre-processor to use in Archiver
arch_processor_bin_size (int): bin size for certain pre-processors
config_file (str): the config file name path
Return:
dataframe_total (polars dataframe): final output containing all found datasets
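
    Example (illustrative only; the PV name and timestamps below are
    placeholders, and a valid config file must exist):

        >>> import pandas as pd
        >>> frame = get_data(
        ...     pvs=["SEC:SUB:DEV:SIGNAL"],
        ...     start=pd.Timestamp("2024-01-01 00:00:00"),
        ...     end=pd.Timestamp("2024-01-01 00:05:00"),
        ... )
        >>> frame.columns  # expect "cycle_id" and "data_ts" among the columns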
"""
# Parse the config file, use supplied options if given.
ops_config, pvs, start, end, datasources, timing_datasource = parse_config_options(
pvs=pvs,
start=start,
end=end,
time_range_seconds=time_range_seconds,
number=number,
arch_processor_name=arch_processor_name,
arch_processor_bin_size=arch_processor_bin_size,
config_file=config_file,
)
if not ops_config or not datasources or not pvs:
return pl.DataFrame()
dataframe_list: list[pl.DataFrame] = []
# Get the "official" cycle-ids that cover the selected time range, plus or minus
# enough time to contain all cycles stored in an SDS event.
search_offset_min_s = 15
search_offset_max_s = 1
cycle_id_dataframe = cycleid.get_cycle_id_timestamps(
start=start - pd.Timedelta(seconds=search_offset_min_s),
end=end + pd.Timedelta(seconds=search_offset_max_s),
timing_datasource=timing_datasource,
)
# Loop over each configured data source.
for ds in datasources:
        # See which of the requested PVs are available in this source.
pvs_found = ds.find_pvs(
pvs=pvs,
start=start,
)
# Get a dictionary containing a dataframe for each PV.
dataframe_dict = ds.get_data(
pvs=pvs_found,
start=start,
end=end,
)
        # For each PV dataframe, add cycle-ids and cycle-id timestamps.
dataframe_dict = cycleid.add_cycle_id_timestamp(
datasource=ds,
dataframe_dict=dataframe_dict,
timing_datasource=timing_datasource,
cycle_id_dataframe=cycle_id_dataframe,
)
        # Add a source-name column to the dataframes when debug logging is enabled.
        if logging.root.level <= logging.DEBUG:
            dataframe_dict = _add_source_name_column(
                dataframe_dict=dataframe_dict,
                source_name=ds.name,
            )
        # Add the dataframes from this data source to the list of dataframes
        # from all data sources.
dataframe_list.extend(dataframe_dict.values())
    # Combine the per-PV dataframes from all data sources into a single polars
    # dataframe, aligned and sorted by cycle_id and data_ts.
return _combine_dataframes(dataframe_list)
def _add_source_name_column(
dataframe_dict: dict[str, pl.DataFrame],
source_name: str = "",
) -> dict[str, pl.DataFrame]:
"""Add a column to all dataframes containing the source name."""
if not dataframe_dict:
return {}
for pv, dataframe in dataframe_dict.items():
# Add source name column, sort by cycle_id.
dataframe_dict[pv] = dataframe.with_columns(source=pl.lit(source_name)).sort(
"cycle_id",
)
return dataframe_dict
def _combine_dataframes(
dataframe_list: list[pl.DataFrame],
) -> pl.DataFrame:
"""Combine multiple dataframes, sort by cycle-id."""
if not dataframe_list:
return pl.DataFrame()
# Combine dataframes with common supertype, missing column values set to null
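    # Illustrative note: "diagonal" unions the column sets, so frames with
    # columns {cycle_id, data_ts, PV1} and {cycle_id, data_ts, PV2} combine
    # into {cycle_id, data_ts, PV1, PV2} with nulls where a frame lacked a
    # column; "relaxed" additionally casts mismatched but compatible dtypes
    # (e.g. Int32 vs Int64) to a common supertype.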
return pl.concat(dataframe_list, how="diagonal_relaxed").sort([
"cycle_id",
"data_ts",
])
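

if __name__ == "__main__":
    # Minimal usage sketch, not part of the module's API: it assumes a valid
    # config file at ~/.ops-config.ini with reachable data sources; the PV
    # name and timestamps below are placeholders.
    logging.basicConfig(level=logging.INFO)
    example_frame = get_data(
        pvs=["SEC:SUB:DEV:SIGNAL"],
        start=pd.Timestamp("2024-01-01 00:00:00"),
        end=pd.Timestamp("2024-01-01 00:05:00"),
    )
    LOG.info("Retrieved dataframe with shape %s", example_frame.shape)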