Source code for archiver_test.archivers_test_set

"""This module lets you run a collection of tests against
the data in multiple archiver clusters.
"""

import asyncio
import json
import logging
import typing
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path

from dateutil.relativedelta import relativedelta
from epicsarchiver import ArchiveEvent
from rich.console import Console
from rich.table import Table

from archiver_test.details import Detail
from archiver_test.fail_reasons import Reason
from archiver_test.get import archiver_get_version, get_data
from archiver_test.pv_archive_events import (
    DataSetDiff,
    PVArchiveEvents,
    compare_data,
    equal_but_status,
    import_pv_archive_events,
)

# Module-level logger, named after this module so its output can be
# configured and filtered independently of other modules.
LOG: logging.Logger = logging.getLogger(__name__)
@dataclass
[docs] class ArchiversTestSet: """Represents data needed to run a compare test between two archiver clusters. Returns: ArchiversTestSet: Storage of the configuration of a test. """
[docs] msg: str
[docs] pvs: list[str]
[docs] sop: datetime
[docs] eop: datetime
[docs] archiver_old: str
[docs] archiver_new: str
[docs] archiver_old_data: list[Path] | None
[docs] async def export(self, output_folder: Path) -> Path: """Stores the configuration for a test run, with the result. Args: output_folder: Path to store the config. """ test_json_object = json.dumps( [self.__dict__.copy()], indent=4, sort_keys=True, default=str ) current_time = datetime.now(tz=UTC) test_set_name = f"{self.msg}-{current_time.isoformat()}" filename = "".join( x if (x.isalnum() or x in "._-") else "_" for x in test_set_name ) final_path = output_folder.joinpath(f"{filename}.config.json") with open(final_path, "w") as outfile: outfile.write(test_json_object) return final_path
[docs] async def store( self, datasets: dict[str, dict[str, PVArchiveEvents]], output_folder: Path ) -> None: """Store data from running a test for use in future tests, without redownloading the data. Args: datasets: Data from running test_set output_folder: Path of folder to store the output data. """ config_path = await self.export(output_folder) data_filename = "".join( ( self.sop.isoformat(), self.eop.isoformat(), ".", datetime.now(tz=UTC).isoformat(), ) ) store_paths = await asyncio.gather( *[ datasets[arch][data_key].export_data(output_folder, arch, data_filename) for arch in datasets.keys() for data_key in datasets[arch].keys() ] ) LOG.info( "WROTE config file: %s, store files: %s", config_path, [str(p) for p in store_paths], )
[docs] async def load_old_data(self) -> dict[str, PVArchiveEvents]: """Loads old data""" LOG.debug("LOAD old data from: %s", self.archiver_old_data) res: dict[str, PVArchiveEvents] = {} if self.archiver_old_data is None: return res for path in self.archiver_old_data: dataset = await import_pv_archive_events(path) res[dataset.pv_name] = dataset return res
[docs] async def check(self, output_folder: Path | None) -> "TestResult": """Tests a single ArchiverTestSet Args: output_folder: Path where to store the data from the run. """ if self.archiver_old_data: old_data = await self.load_old_data() else: old_data = await get_data(self.pvs, self.sop, self.eop, self.archiver_old) new_data = await get_data(self.pvs, self.sop, self.eop, self.archiver_new) datasets = {self.archiver_new: new_data, self.archiver_old: old_data} results = await compare_data( datasets[self.archiver_old], datasets[self.archiver_new], ) if output_folder: await self.store(datasets, output_folder) old_version = await archiver_get_version(self.archiver_old) new_version = await archiver_get_version(self.archiver_new) return TestResult(self, old_version, new_version, old_data, new_data, results)
def import_test_sets(config: Path) -> list[ArchiversTestSet]:
    """Load test sets from a JSON config file.

    The file is expected to hold a JSON list of objects with the keys
    ``msg``, ``pvs``, ``sop``, ``eop``, ``archiver_old``, ``archiver_new``
    and ``archiver_old_data``, which is the layout written by
    ``ArchiversTestSet.export``.

    Args:
        config: The file path of the config file.

    Returns:
        One ``ArchiversTestSet`` per entry in the file.
    """
    with open(config, "rb") as config_file:
        configs = json.load(config_file)

    test_sets = []
    for ts in configs:
        stored = ts["archiver_old_data"]
        # JSON can only carry the paths as strings; turn them back into Path
        # objects so the field matches its declared ``list[Path] | None``.
        old_data = None if stored is None else [Path(p) for p in stored]
        test_sets.append(
            ArchiversTestSet(
                ts["msg"],
                ts["pvs"],
                datetime.fromisoformat(ts["sop"]),
                datetime.fromisoformat(ts["eop"]),
                ts["archiver_old"],
                ts["archiver_new"],
                old_data,
            )
        )
    return test_sets
# Presumably the maximum number of characters of a value to display; it is
# not referenced in the visible part of this module -- TODO confirm usage.
MAX_LEN_DISPLAY_VAL_STR = 10
class TestResult(typing.NamedTuple):
    """Outcome of one test-set run, plus helpers that pretty-print it as tables.

    Fields:
        test_set: Configuration the result was produced from.
        old_version: Version reported for the old archiver.
        new_version: Version reported for the new archiver.
        old_data_set: Events from the old archiver, keyed by PV name.
        new_data_set: Events from the new archiver, keyed by PV name.
        comparisons: Comparison of the two data sets, keyed by PV name.
    """

    test_set: "ArchiversTestSet"
    old_version: str
    new_version: str
    old_data_set: dict[str, PVArchiveEvents]
    new_data_set: dict[str, PVArchiveEvents]
    comparisons: dict[str, DataSetDiff]

    def get_reason(self, pv: str) -> Reason:
        """Work out a possible reason why the two archivers hold different data.

        The checks run in a fixed order and the first one that applies wins.

        Args:
            pv: The PV whose comparison failed.

        Returns:
            The first matching reason, or ``Reason.Unknown``.
        """
        if self.check_types(pv):
            return Reason.DifferentType
        if self.check_no_events(pv):
            return Reason.NoEventsTime

        old_details = self.old_data_set[pv].details
        new_details = self.new_data_set[pv].details
        if self.check_not_archiving(old_details, new_details):
            return Reason.NotArchiving

        # Sampling and capacity are only considered when the event rates
        # differ or the old archiver lost more than a quarter of its events.
        sampling_suspect = (not event_rate_same(old_details, new_details)) or (
            float(old_details[Detail.EventsLost])
            > (float(old_details[Detail.TotalEvents]) / 4.0)
        )
        if sampling_suspect:
            if old_details[Detail.Method] != new_details[Detail.Method]:
                return Reason.SamplingMethod
            if old_details[Detail.Period] != new_details[Detail.Period]:
                return Reason.SamplingPeriod
            if int(old_details[Detail.Capacity]) > int(new_details[Detail.Capacity]):
                return Reason.CapacityAdjustment
            if self.check_capacity_full(pv, old_details):
                return Reason.CapacityFull

        if self.check_protocol_difference(pv, old_details, new_details):
            return Reason.ProtocolDifference
        return Reason.Unknown

    def check_protocol_difference(
        self, pv: str, old_details: dict[Detail, str], new_details: dict[Detail, str]
    ) -> bool:
        """Check whether the failure is inherent to the protocols being different.

        Only considered when the ``PVAccess`` detail differs, both archivers
        hold the same number of events and at least one event pair differs.

        Args:
            pv (str): pv name
            old_details (dict[Detail, str]): old details
            new_details (dict[Detail, str]): new details

        Returns:
            bool: result of check
        """
        protocols_differ = old_details[Detail.PVAccess] != new_details[Detail.PVAccess]
        if not protocols_differ:
            return False
        if len(self.old_data_set[pv].data) != len(self.new_data_set[pv].data):
            return False
        differing_pairs = self.comparisons[pv].different
        if len(differing_pairs) == 0:
            return False

        # NOTE(review): this keeps the pairs for which check_equal is true and
        # the method returns True only when no pair is kept. Confirm against
        # equal_but_status that this matches the intent "only the alarm status
        # differs".
        kept_pairs = [pair for pair in differing_pairs if check_equal(pair)]
        LOG.debug(f"events diff val of severity {kept_pairs}")
        return len(kept_pairs) == 0

    def check_capacity_full(self, pv: str, old_details: dict[Detail, str]) -> bool:
        """Check for many lost events combined with similar event counts.

        True when the old archiver lost more than a quarter of its total
        events and the event counts of the two archivers for the test period
        differ by less than 10% of the old archiver's count.

        Args:
            pv (str): pv name
            old_details (dict[Detail, str]): details from old archiver

        Returns:
            bool: if true
        """
        lost = float(old_details[Detail.EventsLost])
        total = float(old_details[Detail.TotalEvents])
        if not lost > total / 4.0:
            return False
        old_count = len(self.old_data_set[pv].data.keys())
        new_count = len(self.new_data_set[pv].data.keys())
        return abs(old_count - new_count) < 0.1 * old_count

    def check_not_archiving(
        self, old_details: dict[Detail, str], new_details: dict[Detail, str]
    ) -> bool:
        """Return True when either archiver's details lack an event rate."""
        both_have_rate = (
            Detail.EventRate in new_details.keys()
            and Detail.EventRate in old_details.keys()
        )
        return not both_have_rate

    def check_no_events(self, pv: str) -> bool:
        """Check whether the only event of either archiver predates the period.

        Args:
            pv: PV to check.

        Returns:
            True when both archivers hold exactly one event and at least one
            of those events is timestamped before the start of the period.
        """
        old_events = self.old_data_set[pv].data
        if len(old_events.keys()) != 1:
            return False
        new_events = self.new_data_set[pv].data
        if len(new_events.keys()) != 1:
            return False

        old_event_time = old_events[next(iter(old_events.keys()))].timestamp
        new_event_time = new_events[next(iter(new_events.keys()))].timestamp
        period_start = self.test_set.sop
        return old_event_time < period_start or new_event_time < period_start

    def check_types(self, pv: str) -> bool:
        """Return True when the two archivers report different DBR types."""
        old_type = self.old_data_set[pv].details[Detail.DBRType]
        new_type = self.new_data_set[pv].details[Detail.DBRType]
        return old_type != new_type

    def result_message(self, pv: str) -> str:
        """Build the pass message, or the failure message with its reason.

        Args:
            pv: PV to check.

        Returns:
            The message, including rich markup for colour.
        """
        if self.comparisons[pv].result:
            return "[bold green]PASS"
        reason = self.get_reason(pv)
        return f"[bold red]FAIL(Reason: {reason.value})"

    def print_pv_result(self, console: Console, pv: str) -> None:
        """Print the summary of the test for a single PV.

        Prints the pass/fail line and a table of properties side by side; on
        failure also prints up to three events of each difference category.

        Args:
            console: Where to print the summary.
            pv: Which PV for the summary.
        """
        console.print(f"PV {pv}: {self.result_message(pv)}")

        # Column headers: the part of each archiver name before the first dot.
        old_label = f"{self.test_set.archiver_old.split('.')[0]}"
        if self.test_set.archiver_old_data:
            old_label += "(stored)"
        new_label = self.test_set.archiver_new.split(".")[0]

        properties = Table(title=f"PV {pv} Properties")
        properties.add_column("Property", justify="left", no_wrap=True)
        properties.add_column(old_label, justify="right")
        properties.add_column(new_label, justify="right")

        old_events = self.old_data_set[pv]
        new_events = self.new_data_set[pv]
        old_details = old_events.details
        new_details = new_events.details
        # Sorted so the rows come out in the same order on every run.
        for prop in sorted(set(old_details.keys()) | set(new_details.keys())):
            old_detail = old_details[prop] if prop in old_details.keys() else ""
            new_detail = new_details[prop] if prop in new_details.keys() else ""
            properties.add_row(prop.value, f"{old_detail}", f"{new_detail}")
        properties.add_row(
            "Test Period Events", f"{old_events.size()}", f"{new_events.size()}"
        )
        properties.add_row("Mean", f"{old_events.avg()}", f"{new_events.avg()}")
        console.print(properties)

        diff = self.comparisons[pv]
        if diff.result:
            return
        console.print("Different Events (up to 3 printed):")
        console.print(f"{old_label}: {list(diff.removed)[:3]}")
        console.print(f"{new_label}: {list(diff.added)[:3]}")
        diff_list_str = [
            f"{truncate_repr(before)} v {truncate_repr(after)}"
            for before, after in diff.different[:3]
        ]
        console.print(f"Different by value: {diff_list_str}")

    def print_result(self) -> None:
        """Print the test result as a collection of summary tables."""
        console = Console()
        console.rule()
        self.print_time_summary(console)
        console.print(f"Test Set: {self.test_set.msg}")
        old_side = f"{self.test_set.archiver_old}:{self.old_version}"
        new_side = f"{self.test_set.archiver_new}:{self.new_version}"
        console.print(f"{old_side} vs {new_side}")

        console.rule("Summary per PV")
        for pv in self.comparisons:
            self.print_pv_result(console, pv)

        console.rule(f"Test {self.test_set.msg}")
        console.print(self.create_summary_table())

    def create_summary_table(self) -> Table:
        """Create a rich Table summarizing the test results.

        One row per PV with the number of events in each archiver, the number
        of differing events, the number of events counted as the same, the
        difference of the means and the test result.

        Returns:
            Table: A table of summary test result information.
        """
        old_label = f"{self.test_set.archiver_old.split('.')[0]}"
        if self.test_set.archiver_old_data:
            old_label += "(stored)"
        new_label = self.test_set.archiver_new.split(".")[0]

        table = Table(title=f"Summary: {self.test_set.msg}")
        table.add_column("PV", justify="left", no_wrap=True)
        table.add_column(f"Events {old_label}", justify="right")
        table.add_column(f"Events {new_label}", justify="right")
        for heading in ("Events diff", "Events same", "Mean diff"):
            table.add_column(heading, justify="right")
        table.add_column("Result", justify="center")

        for pv, diff in self.comparisons.items():
            verdict = self.result_message(pv)
            old_size = self.old_data_set[pv].size()
            new_size = self.new_data_set[pv].size()
            added = len(diff.added)
            removed = len(diff.removed)
            different = len(diff.different)
            # Smallest count left after subtracting each difference category
            # on its own from the matching data set.
            same = min(new_size - added, old_size - removed, old_size - different)
            events_diff = added + removed + different
            table.add_row(
                pv,
                f"{old_size}",
                f"{new_size}",
                f"{events_diff}",
                f"{same}",
                f"{diff.means_diff:.3f}",
                verdict,
            )
        return table

    def print_time_summary(self, console: Console) -> None:
        """Print the start, end and length of the test period.

        Args:
            console (Console): console to print in
        """
        start = self.test_set.sop
        end = self.test_set.eop
        # Express the period as whole microseconds so relativedelta can break
        # it down into larger units.
        microseconds = int((end - start) / timedelta(microseconds=1))
        human_period_diff = " ".join(
            human_readable(relativedelta(microseconds=microseconds))
        )
        console.print(
            f"Time period: {start.isoformat()} to {end.isoformat()} of {human_period_diff}"
        )
# Default maximum number of characters of an event's text that is displayed.
MAX_LEN_DISPLAY_ARCHIVE_EVENT_STR = 1000


def truncate_repr(
    event: ArchiveEvent, amount: int = MAX_LEN_DISPLAY_ARCHIVE_EVENT_STR
) -> str:
    """Return the string form of ``event``, shortened for display.

    Args:
        event: The event to render.
        amount: Maximum number of characters to keep.

    Returns:
        The full text when it has at most ``amount`` characters, otherwise
        its first ``amount`` characters followed by ``"..."``.
    """
    rendered = str(event)
    if len(rendered) <= amount:
        return rendered
    return f"{rendered[:amount]}..."
def check_equal(pair: tuple[ArchiveEvent, ArchiveEvent]) -> bool:
    """Apply ``equal_but_status`` to the two events of ``pair``.

    Takes the pair as a single argument so it can be used as a predicate
    over a sequence of event pairs.

    Args:
        pair: The two events to compare.

    Returns:
        Whatever ``equal_but_status`` returns for the two events.
    """
    first_event, second_event = pair[0], pair[1]
    return equal_but_status(first_event, second_event)
def human_readable(delta: relativedelta) -> list[str]:
    """Describe ``delta`` as a list of "<amount> <unit>" strings.

    Units run from years down to seconds; units whose amount is zero are
    left out.

    Args:
        delta: The time difference to describe.

    Returns:
        One string per non-zero unit, largest unit first, e.g.
        ``["1 day", "2 hours"]``.
    """
    units = ("years", "months", "days", "hours", "minutes", "seconds")
    parts = []
    for unit in units:
        amount = getattr(delta, unit)
        if not amount:
            continue
        # Pluralise on the magnitude so negative amounts read "-3 days"
        # rather than "-3 day".
        label = unit if abs(amount) > 1 else unit[:-1]
        parts.append("%d %s" % (amount, label))
    return parts
async def sets_check(
    sets: list[ArchiversTestSet], output_folder: Path | None
) -> list[TestResult]:
    """Run the checks of several test sets concurrently.

    Args:
        sets: list of test sets
        output_folder: Where to store any data.

    Returns:
        The results, in the same order as ``sets``.
    """
    pending_checks = (test_set.check(output_folder) for test_set in sets)
    return await asyncio.gather(*pending_checks)
# Largest absolute difference between two event rates that still counts as
# "the same".
ACCEPTABLE_EVENT_RATE_DIFF = 0.1


def event_rate_same(old: dict[Detail, str], new: dict[Detail, str]) -> bool:
    """Decide whether two event rates are effectively the same.

    The rates are the same when they are equal as given, or when both parse
    as numbers that differ by at most ``ACCEPTABLE_EVENT_RATE_DIFF`` in
    either direction.

    Args:
        old: Old archiver details; must contain ``Detail.EventRate``.
        new: New archiver details; must contain ``Detail.EventRate``.

    Returns:
        True when the rates are effectively the same, False otherwise,
        including when either rate is not numeric.
    """
    old_rate = old[Detail.EventRate]
    new_rate = new[Detail.EventRate]
    if old_rate == new_rate:
        return True
    # The detail values are strings, so parse them instead of rejecting every
    # string: only values that are not numbers count as different outright.
    try:
        difference = float(old_rate) - float(new_rate)
    except (TypeError, ValueError):
        return False
    # Absolute value so that a higher new rate is treated like a lower one.
    return abs(difference) <= ACCEPTABLE_EVENT_RATE_DIFF