"""Getting and munging data from the archiver."""
import asyncio
import logging
from datetime import datetime

from epicsarchiver import ArchiveEvent, ArchiverAppliance
from requests import HTTPError

from archiver_test.details import Detail
from archiver_test.pv_archive_events import PVArchiveEvents
LOG: logging.Logger = logging.getLogger(__name__)
async def munge_data(
    pv: str,
    archiver_data: list[ArchiveEvent] | None,
    pv_status: dict[Detail, str],
    means_data: list[ArchiveEvent] | None,
) -> PVArchiveEvents:
    """Convert data straight from the archiver into an intermediate format that is easier to read.

    Args:
        pv: Name of the PV.
        archiver_data: List of archive events for the PV.
        pv_status: Details of the PV from getPVDetails.
        means_data: Same as archiver_data, but expected to contain calculated
            mean value events.

    Returns:
        A PVArchiveEvents instance combining the raw events, mean events and
        PV details.
    """
    event_dict = event_list_to_dict(archiver_data)
    means_event_dict = event_list_to_dict(means_data)
    return PVArchiveEvents(pv, pv_status, event_dict, means_event_dict)
def event_list_to_dict(a_data: list[ArchiveEvent] | None) -> dict[str, ArchiveEvent]:
    """Convert raw archiver events to a dictionary keyed by ISO format timestamps.

    Args:
        a_data: The raw archiver events, or None if nothing was retrieved.

    Returns:
        A dictionary mapping each event's ISO format timestamp to that event.
    """
    if not a_data:
        # Covers both None and an empty list of events.
        return {}
    return {event.timestamp.isoformat(): event for event in a_data}
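# Illustrative sketch (hypothetical PV name and timestamp): the mapping produced
# by event_list_to_dict is keyed by each event's ISO format timestamp, e.g.
#
#     events = archiver.get_events("EXAMPLE:PV", sop, eop)
#     by_time = event_list_to_dict(events)
#     # by_time ~ {"2024-01-01T00:00:00": ArchiveEvent(...), ...}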
async def get_details(archiver: ArchiverAppliance, pv: str) -> dict[Detail, str]:
    """Get the PV details for a given PV from the given archiver.

    Args:
        archiver: The archiver to query.
        pv: Name of the PV.

    Returns:
        A dictionary of the details of interest, keyed by the Detail enum.
    """
    details = []
    try:
        details = archiver.get_pv_details(pv)
    except HTTPError as err:
        LOG.error("GET details for %s, %s, HTTPError: %s", pv, archiver.hostname, err)
    filtered_dict: dict[Detail, str] = {}
    # Keep only the detail entries of interest, keyed by the Detail enum.
    for detail in details:
        match detail["name"]:
            case "Are we using PVAccess?":
                filtered_dict[Detail.PVAccess] = detail["value"]
            case "Sampling method:":
                filtered_dict[Detail.Method] = detail["value"]
            case "Sampling period:":
                filtered_dict[Detail.Period] = detail["value"]
            case "How many events lost totally so far?":
                filtered_dict[Detail.EventsLost] = detail["value"]
            case "Sample buffer capacity":
                filtered_dict[Detail.Capacity] = detail["value"]
            case "Estimated event rate (events/sec)":
                filtered_dict[Detail.EventRate] = detail["value"]
            case "How many events so far?":
                filtered_dict[Detail.TotalEvents] = str(int(detail["value"]))
            case "Archiver DBR type (from typeinfo):":
                filtered_dict[Detail.DBRType] = detail["value"]
    return filtered_dict
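# Illustrative sketch (hypothetical values): get_pv_details returns a list of
# name/value entries, which get_details reduces to Detail enum keys, e.g.
#
#     [{"name": "Sampling period:", "value": "1.0"},
#      {"name": "How many events so far?", "value": "42"}, ...]
#     -> {Detail.Period: "1.0", Detail.TotalEvents: "42"}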
async def archiver_get_dataset(
    pv: str, sop: datetime, eop: datetime, archiver: ArchiverAppliance
) -> PVArchiveEvents:
    """Retrieve archived data for a PV and convert it to the internal PVArchiveEvents format.

    Args:
        pv: Name of the PV.
        sop: Start of the period to retrieve, as a `datetime.datetime`.
        eop: End of the period to retrieve, as a `datetime.datetime`.
        archiver: The archiver to query.

    Returns:
        A PVArchiveEvents instance for the PV.
    """
    data = archiver.get_events(pv, sop, eop)
    # mean_10(...) asks the archiver for post-processed 10 second mean values.
    means_data = archiver.get_events(f"mean_10({pv})", sop, eop)
    LOG.info("GET data hostname: %s pv: %s", archiver.hostname, pv)
    LOG.debug("GET data hostname: %s pv: %s, data: %s", archiver.hostname, pv, data)
    status = await get_details(archiver, pv)
    return await munge_data(pv, data, status, means_data)
async def get_data(
    pvs: list[str], sop: datetime, eop: datetime, archiver_url: str
) -> dict[str, PVArchiveEvents]:
    """Retrieve archived data for multiple PVs.

    Args:
        pvs: Names of the PVs to check.
        sop: Start of the period to retrieve, as a `datetime.datetime`.
        eop: End of the period to retrieve, as a `datetime.datetime`.
        archiver_url: The archiver hostname to query.

    Returns:
        A dictionary of PVArchiveEvents keyed by PV name.
    """
    LOG.debug("GET data hostname: %s, period: %s - %s", archiver_url, sop, eop)
    archiver = ArchiverAppliance(archiver_url)
    # Gather the per-PV datasets from the same archiver instance.
    gather_data = await asyncio.gather(
        *[archiver_get_dataset(pv, sop, eop, archiver) for pv in pvs]
    )
    pvs_data = {pv_data.pv_name: pv_data for pv_data in gather_data}
    LOG.debug(
        "GET data hostname: %s, period: %s - %s is %s", archiver_url, sop, eop, pvs_data
    )
    return pvs_data
async def archiver_get_version(archiver_url: str) -> str:
    """Retrieve the archiver version.

    Args:
        archiver_url: The archiver hostname to query.

    Returns:
        The version name reported by the archiver, or "Unknown" if it could
        not be determined.
    """
    LOG.debug("GET versions hostname: %s", archiver_url)
    archiver = ArchiverAppliance(archiver_url)
    version = "Unknown"
    try:
        # Read the version once so the archiver is not queried twice.
        reported_version = archiver.version
        if isinstance(reported_version, str):
            version = reported_version
    except HTTPError as err:
        LOG.error("GET version number for %s, HTTPError: %s", archiver.hostname, err)
    return version
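

# Illustrative usage sketch: the archiver hostname, PV names and time window
# below are placeholders for demonstration, not values from this project.
if __name__ == "__main__":
    from datetime import timedelta

    logging.basicConfig(level=logging.INFO)
    example_eop = datetime.now()
    example_sop = example_eop - timedelta(hours=1)
    example_pvs = ["EXAMPLE:PV:ONE", "EXAMPLE:PV:TWO"]  # hypothetical PV names
    results = asyncio.run(
        get_data(example_pvs, example_sop, example_eop, "archiver.example.com")
    )
    for pv_name in results:
        LOG.info("Retrieved archive data for %s", pv_name)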