Source code for archiver_test.pv_archive_events

"""Data set module for the Dataset type and related functions."""

from __future__ import annotations

import asyncio
import logging
import pickle
import typing
from dataclasses import dataclass
from enum import Flag
from pathlib import Path
from statistics import mean

import aiofiles
from epicsarchiver import ArchiveEvent

from archiver_test.details import Detail

LOG: logging.Logger = logging.getLogger(__name__)

def equal_but_status(event: ArchiveEvent, other: ArchiveEvent) -> bool:
    """Return whether two events match in value and severity, ignoring status."""
    return event.val == other.val and event.severity == other.severity

class Result(Flag):
    """Result of a test."""

    PASS = True
    FAIL = False

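# ``PVArchiveEvents.compare`` below builds a ``Result`` directly from a boolean,
# which works because ``Flag`` looks members up by value (``True == 1``,
# ``False == 0``). A quick interpreter check:
#
#     >>> Result(True) is Result.PASS
#     True
#     >>> Result(False) is Result.FAIL
#     True
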
class DataSetDiff(typing.NamedTuple):
    """Represent difference information between two datasets."""

    added: set[str]
    removed: set[str]
    different: list[tuple[ArchiveEvent, ArchiveEvent]]
    means_diff: float
    result: Result

@dataclass
class PVArchiveEvents:
    """Representation of a data set of the PV information in an archiver."""

    pv_name: str
    details: dict[Detail, str]
    data: dict[str, ArchiveEvent]
    means: dict[str, ArchiveEvent]

    async def export_data(
        self, output_folder: Path, archiver: str, filename: str
    ) -> Path:
        """Store the dataset in a store folder.

        The file is written to
        ``output_folder/store/archiver/pv_name/filename.pickle``.

        Args:
            output_folder: Path of the folder in which to store data.
            archiver: Hostname of the archiver.
            filename: Name of the output file, usually "sop-eop".
        """
        # Create the output folder if it does not exist yet.
        out_dir = output_folder / "store" / archiver / self.pv_name
        out_dir.mkdir(parents=True, exist_ok=True)
        # Pickle the whole dataset to the output file asynchronously.
        full_path = out_dir / f"{filename}.pickle"
        LOG.info("WRITE file: %s", full_path)
        async with aiofiles.open(full_path, "wb") as outfile:
            await outfile.write(pickle.dumps(self, protocol=pickle.HIGHEST_PROTOCOL))
        return full_path

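    # Usage sketch (illustrative only; ``dataset`` and the host name
    # "archiver01" are hypothetical). From inside a coroutine,
    #
    #     path = await dataset.export_data(Path("./out"), "archiver01", "sop-eop")
    #
    # writes ./out/store/archiver01/<pv_name>/sop-eop.pickle and returns its path.
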
    def size(self) -> int:
        """Return the number of events in the data set."""
        return len(self.data)

    def avg(self) -> float | None:
        """Return the average value over the mean events, or None if empty."""
        if not self.means:
            return None
        return mean(val_to_number(event.val) for event in self.means.values())

    async def avg_mean_diff(self, other: PVArchiveEvents) -> float:
        """Calculate the average difference between the means of two datasets.

        Only timestamps present in both datasets are compared; if there is no
        overlap, 0.0 is returned.

        Args:
            other: Another dataset.
        """
        if not self.means:
            return 0.0
        data_to_mean = [
            abs(
                val_to_number(self.means[timestamp].val)
                - val_to_number(other.means[timestamp].val)
            )
            for timestamp in self.means
            if timestamp in other.means
        ]
        if data_to_mean:
            return mean(data_to_mean)
        return 0.0

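    # For example, with means {t1: 1.0, t2: 2.0} on one side and
    # {t1: 1.5, t3: 9.0} on the other, only t1 overlaps, so the result is
    # mean([abs(1.0 - 1.5)]) == 0.5; t2 and t3 are ignored.
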
    async def compare(self, other: PVArchiveEvents) -> DataSetDiff:
        """Compare the current dataset with another.

        Collects the events added, removed, and different between the two
        sets, and calculates the average difference between the means. The
        result is PASS only if no event was added, removed, or changed.

        Args:
            other: Another dataset.
        """
        removed_elements = self.data.keys() - other.data.keys()
        added_elements = other.data.keys() - self.data.keys()
        different_elements = [
            (self.data[key], other.data[key])
            for key in self.data
            if key in other.data and self.data[key] != other.data[key]
        ]
        LOG.debug(
            "DIFF PV %s, removed: %s, added: %s, different: %s",
            self.pv_name,
            removed_elements,
            added_elements,
            different_elements,
        )
        return DataSetDiff(
            added=added_elements,
            removed=removed_elements,
            different=different_elements,
            means_diff=await self.avg_mean_diff(other),
            result=Result(
                len(added_elements)
                + len(removed_elements)
                + len(different_elements)
                == 0
            ),
        )

def val_to_number(
    val: int | float | str | list[str] | list[int] | list[float] | bytes,
) -> float:
    """Map an event value to a float so values of any type can be compared.

    Numbers are converted directly; strings, bytes and lists are reduced to a
    hash, which preserves equality but not ordering or magnitude.
    """
    if isinstance(val, int | float):
        return float(val)
    if isinstance(val, str | bytes):
        return float(hash(val))
    if isinstance(val, list):
        # Lists are unhashable, so hash an immutable tuple copy instead.
        return float(hash(tuple(val)))
    return 0.0

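# A quick interpreter check of the mapping (hash-derived numbers only support
# equality comparisons, and str/bytes hashes vary between interpreter runs
# unless PYTHONHASHSEED is fixed):
#
#     >>> val_to_number(3)
#     3.0
#     >>> val_to_number("major") == val_to_number("major")
#     True
#     >>> val_to_number([1, 2]) == val_to_number([1, 2])
#     True
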
async def import_pv_archive_events(input_file: Path) -> PVArchiveEvents:
    """Import a previously exported dataset.

    Args:
        input_file: Path of the pickle file to load.
    """
    async with aiofiles.open(input_file, "rb") as data_file:
        data = await data_file.read()
    return typing.cast(PVArchiveEvents, pickle.loads(data))  # noqa: S301

async def compare_data(
    datasets_old: dict[str, PVArchiveEvents],
    datasets_new: dict[str, PVArchiveEvents],
) -> dict[str, DataSetDiff]:
    """Compare two collections of datasets, keyed by PV name.

    Every PV in datasets_old must also be present in datasets_new.

    Args:
        datasets_old: Datasets from the first archiver.
        datasets_new: Datasets from the second archiver.
    """
    gather_data = await asyncio.gather(
        *[datasets_old[pv].compare(datasets_new[pv]) for pv in datasets_old]
    )
    return dict(zip(datasets_old.keys(), gather_data, strict=True))
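
# A minimal end-to-end sketch, assuming two previously exported pickle files
# for the same PV exist (both file paths below are hypothetical):
#
#     async def _demo() -> None:
#         old = await import_pv_archive_events(Path("old/PV1/sop-eop.pickle"))
#         new = await import_pv_archive_events(Path("new/PV1/sop-eop.pickle"))
#         diffs = await compare_data({old.pv_name: old}, {new.pv_name: new})
#         for pv, diff in diffs.items():
#             LOG.info("%s -> %s (means diff: %f)", pv, diff.result, diff.means_diff)
#
#     asyncio.run(_demo())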