Source code for archiver_test.get

"""Getting and munging data from the archiver."""

import asyncio
import logging
from datetime import datetime

from epicsarchiver import ArchiveEvent, ArchiverAppliance
from requests import HTTPError

from archiver_test.details import Detail
from archiver_test.pv_archive_events import PVArchiveEvents

# Module-level logger shared by every function in this module; it is named
# after the module (``__name__``) so its records can be filtered separately.
LOG: logging.Logger = logging.getLogger(__name__)
async def munge_data(
    pv: str,
    archiver_data: list[ArchiveEvent] | None,
    pv_status: dict[Detail, str],
    means_data: list[ArchiveEvent] | None,
) -> PVArchiveEvents:
    """Bundle the raw archiver results for one pv into a PVArchiveEvents.

    Both event lists are turned into timestamp-keyed dictionaries with
    ``event_list_to_dict`` before being handed to ``PVArchiveEvents``.

    Args:
        pv: name of pv
        archiver_data: list of archiver events, or None
        pv_status: Details of pv from getPVDetails
        means_data: Same as archiver_data but expected to be calculated
            means value events.

    Returns:
        ``PVArchiveEvents`` built from the pv name, its status and the two
        converted event dictionaries.
    """
    return PVArchiveEvents(
        pv,
        pv_status,
        event_list_to_dict(archiver_data),
        event_list_to_dict(means_data),
    )
def event_list_to_dict(a_data: list[ArchiveEvent] | None) -> dict[str, ArchiveEvent]:
    """Index archiver events by the ISO-format string of their timestamp.

    Args:
        a_data: The events to index, or None when there is no data.

    Returns:
        A new dictionary mapping ``event.timestamp.isoformat()`` to the event,
        in the order the events appear in ``a_data``. The result is empty when
        ``a_data`` is None or empty. If several events share a timestamp, the
        last one in the list is the one kept.
    """
    # None and the empty list both mean "no events"; one guard covers both.
    if not a_data:
        return {}
    return {event.timestamp.isoformat(): event for event in a_data}
async def get_details(archiver: ArchiverAppliance, pv: str) -> dict[Detail, str]:
    """Fetch the details of a pv and keep only the entries this package uses.

    Args:
        archiver: The archiver to query
        pv: name of pv

    Returns:
        Mapping from ``Detail`` key to the value reported for it. Entries
        whose name is not one of the recognised labels are dropped. The
        mapping is empty when the request raised ``HTTPError`` (the error is
        logged, not re-raised).
    """
    # Label used in the details listing -> key used in the returned mapping.
    label_to_key = {
        "Are we using PVAccess?": Detail.PVAccess,
        "Sampling method:": Detail.Method,
        "Sampling period:": Detail.Period,
        "How many events lost totally so far?": Detail.EventsLost,
        "Sample buffer capacity": Detail.Capacity,
        "Estimated event rate (events/sec)": Detail.EventRate,
        "How many events so far?": Detail.TotalEvents,
        "Archiver DBR type (from typeinfo):": Detail.DBRType,
    }

    # NOTE(review): get_pv_details is called without ``await``, so it
    # presumably blocks the event loop while the request runs - confirm.
    try:
        raw_details = archiver.get_pv_details(pv)
    except HTTPError as err:
        LOG.error("GET details for %s, %s, HTTPError: %s", pv, archiver.hostname, err)
        raw_details = []

    selected: dict[Detail, str] = {}
    for entry in raw_details:
        label = entry["name"]
        key = label_to_key.get(label)
        if key is None:
            continue
        value = entry["value"]
        if label == "How many events so far?":
            # The total event count is normalised through int() before storing.
            value = str(int(value))
        selected[key] = value
    return selected
async def archiver_get_dataset(
    pv: str, sop: datetime, eop: datetime, archiver: ArchiverAppliance
) -> PVArchiveEvents:
    """Collect events, mean events and details for one pv.

    Two event queries are made over the same period: one for ``pv`` itself
    and one for ``mean_10(pv)``. The pv details are then fetched with
    ``get_details`` and everything is combined by ``munge_data``.

    Args:
        pv: name of the pv.
        sop: start of the period, passed to ``archiver.get_events``.
        eop: end of the period, passed to ``archiver.get_events``.
        archiver: The archiver to query

    Returns:
        The ``PVArchiveEvents`` produced by ``munge_data``.
    """
    raw_events = archiver.get_events(pv, sop, eop)
    mean_query = f"mean_10({pv})"
    mean_events = archiver.get_events(mean_query, sop, eop)

    # Logged only after both queries have returned.
    LOG.info("GET data hostname: %s pv: %s", archiver.hostname, pv)
    LOG.debug(
        "GET data hostname: %s pv: %s, data: %s", archiver.hostname, pv, raw_events
    )

    pv_status = await get_details(archiver, pv)
    dataset = await munge_data(pv, raw_events, pv_status, mean_events)
    return dataset
async def get_data(
    pvs: list[str], sop: datetime, eop: datetime, archiver_url: str
) -> dict[str, PVArchiveEvents]:
    """Retrieve archived data for several pvs from one archiver.

    A single ``ArchiverAppliance`` is created for ``archiver_url`` and shared
    by one ``archiver_get_dataset`` call per pv; the calls are awaited
    together with ``asyncio.gather``.

    Args:
        pvs: names of the pvs to check.
        sop: start of the period.
        eop: end of the period.
        archiver_url: The archiver hostname to query

    Returns:
        Dictionary of the retrieved ``PVArchiveEvents`` keyed by their
        ``pv_name``.
    """
    LOG.debug("GET data hostname: %s, period: %s - %s", archiver_url, sop, eop)
    appliance = ArchiverAppliance(archiver_url)

    # NOTE(review): archiver_get_dataset calls get_events without ``await``,
    # so the gathered requests presumably run one after another - confirm.
    pending = [archiver_get_dataset(pv, sop, eop, appliance) for pv in pvs]
    datasets = await asyncio.gather(*pending)

    pvs_data = {}
    for dataset in datasets:
        pvs_data[dataset.pv_name] = dataset

    LOG.debug(
        "GET data hostname: %s, period: %s - %s is %s", archiver_url, sop, eop, pvs_data
    )
    return pvs_data
async def archiver_get_version(archiver_url: str) -> str:
    """Retrieve the version reported by an archiver.

    Args:
        archiver_url: The archiver hostname to query

    Returns:
        ``archiver.version`` when it is a string. ``"Unknown"`` when it is
        not a string, or when reading it raised ``HTTPError`` (the error is
        logged, not re-raised).
    """
    LOG.debug("GET versions hostname: %s", archiver_url)
    appliance = ArchiverAppliance(archiver_url)
    fallback = "Unknown"
    try:
        # Anything other than a string is treated as "no version available".
        if not isinstance(appliance.version, str):
            return fallback
        return appliance.version
    except HTTPError as err:
        LOG.error("GET versions number for %s, HTTPError: %s", appliance.hostname, err)
        return fallback