Source code for epicsarchiver.statistics.reports.archiver_report

"""Generate a report detailing a list of statistics of Archiver pvs.

Examples:

    .. code-block:: python

        report = ArchiverReport(
            query_limit=1000,
            time_minimum=timedelta(days=100),
            connection_drops_minimum=30,
            config_options=configuration.ConfigOptions("/config_repo", "tn"),
            other_archiver=ArchiverAppliance("other_archiver.example.org"),
            mb_per_day_minimum=1000,
            events_dropped_minimum=1000,
            channelfinder=ChannelFinder("channelfinder.tn.ess.lu.se"),
            ioc_name="AN_IOC_NAME",
        )

        report.print_report(archiver, out_file, verbose=True)

"""

from __future__ import annotations

import asyncio
import csv
import dataclasses
import datetime
import json
import logging
import operator
from dataclasses import dataclass
from datetime import timedelta
from typing import IO, TYPE_CHECKING, Any

import pytz
from rich.console import Console

from epicsarchiver.statistics import configuration
from epicsarchiver.statistics._external_stats import (
    filter_by_ioc,
    get_double_archived,
    get_iocs,
)
from epicsarchiver.statistics.models.stat_responses import (
    UNKNOWN_IOC,
    BaseStatResponse,
    DroppedReason,
    Ioc,
)
from epicsarchiver.statistics.models.stats import PVStats, Stat
from epicsarchiver.statistics.pv_names import get_invalid_names, log_pv_parts_stats
from epicsarchiver.statistics.reports import REPORT_CSV_HEADINGS

if TYPE_CHECKING:
    from collections.abc import Sequence

    from epicsarchiver.statistics.services.archiver_statistics import ArchiverWrapper
    from epicsarchiver.statistics.services.channelfinder import ChannelFinder

LOG: logging.Logger = logging.getLogger(__name__)

def _is_greater_than_time_minimum(
    in_time: datetime.datetime | None,
    time_minimum: timedelta,
) -> bool:
    now = datetime.datetime.now(tz=pytz.utc)
    diff = now - (in_time or datetime.datetime.fromtimestamp(0, tz=pytz.utc))
    return diff > time_minimum

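# Illustrative sketch (hypothetical values): a connection lost 200 days ago
# exceeds a 100-day minimum, and a missing timestamp (None) falls back to the
# Unix epoch, so it exceeds any practical minimum:
#
#     lost_at = datetime.datetime.now(tz=pytz.utc) - timedelta(days=200)
#     _is_greater_than_time_minimum(lost_at, timedelta(days=100))  # True
#     _is_greater_than_time_minimum(None, timedelta(days=100))     # True
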
@dataclass
class ArchiverReport:
    """Configuration for generating the report."""

    query_limit: int | None
    time_minimum: timedelta
    connection_drops_minimum: int
    config_options: configuration.ConfigOptions | None
    other_archiver: ArchiverWrapper | None
    mb_per_day_minimum: float
    events_dropped_minimum: int
    channelfinder: ChannelFinder
    ioc_name: str | None

    async def _get_responses(  # noqa: PLR0911, C901, PLR0912
        self,
        statistic: Stat,
        archiver: ArchiverWrapper,
    ) -> Sequence[BaseStatResponse]:
        """Fetch responses for a single statistic, applying the configured limits.

        Args:
            statistic (Stat): Statistic to fetch
            archiver (ArchiverWrapper): Archiver to request against

        Returns:
            Sequence[BaseStatResponse]: Sequence of statistic responses
        """
        if statistic == Stat.BufferOverflow:
            return [
                f
                for f in await archiver.stats.get_pvs_dropped(
                    DroppedReason.BufferOverflow,
                    limit=self.query_limit,
                )
                if f.events_dropped > self.events_dropped_minimum
            ]
        if statistic == Stat.TypeChange:
            return await archiver.stats.get_pvs_dropped(
                DroppedReason.TypeChange,
                limit=self.query_limit,
            )
        if statistic == Stat.IncorrectTimestamp:
            return [
                f
                for f in await archiver.stats.get_pvs_dropped(
                    DroppedReason.IncorrectTimestamp,
                    limit=self.query_limit,
                )
                if f.events_dropped > self.events_dropped_minimum
            ]
        if statistic == Stat.SlowChanging:
            return [
                f
                for f in await archiver.stats.get_pvs_dropped(
                    DroppedReason.SlowChanging,
                    limit=None,
                )
                if f.events_dropped > self.events_dropped_minimum
            ]
        if statistic == Stat.DisconnectedPVs:
            return [
                ev
                for ev in await archiver.stats.get_disconnected_pvs()
                if _is_greater_than_time_minimum(
                    ev.connection_lost_at,
                    self.time_minimum,
                )
            ]
        if statistic == Stat.SilentPVs:
            silent_pvs = await archiver.stats.get_silent_pvs(limit=self.query_limit)
            return [
                ev
                for ev in silent_pvs
                if _is_greater_than_time_minimum(
                    ev.last_known_event,
                    self.time_minimum,
                )
            ]
        if statistic == Stat.LostConnection:
            return [
                el
                for el in await archiver.stats.get_lost_connections_pvs(
                    limit=self.query_limit,
                )
                if el.lost_connections > self.connection_drops_minimum
            ]
        if statistic == Stat.StorageRates:
            storage_rates = await archiver.stats.get_storage_rates(
                limit=self.query_limit,
            )
            return [r for r in storage_rates if r.mb_per_day > self.mb_per_day_minimum]
        if statistic == Stat.DoubleArchived:
            if self.other_archiver:
                return await get_double_archived(archiver, self.other_archiver)
            return []
        if statistic == Stat.NotConfigured:
            if self.config_options:
                return await configuration.get_not_configured(
                    archiver,
                    self.channelfinder,
                    self.config_options,
                    self.ioc_name,
                )
            return []
        if statistic == Stat.InvalidName:
            return await get_invalid_names(archiver)
        return []

    async def generate_stats(
        self,
        statistic: Stat,
        archiver: ArchiverWrapper,
    ) -> dict[str, BaseStatResponse]:
        """Produce a mapping of PV names to statistic responses.

        Args:
            statistic (Stat): Statistic to generate data from
            archiver (ArchiverWrapper): Archiver to check against

        Returns:
            dict[str, BaseStatResponse]: Dictionary of PV names to statistics
        """
        responses = await self._get_responses(statistic, archiver)
        LOG.info("Found %s responses for %s", len(responses), statistic)
        return {r.pv_name: r for r in responses}

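    # Usage sketch (assumes `report` and `archiver` set up as in the module
    # docstring example): fetch one statistic, keyed by PV name:
    #
    #     silent = asyncio.run(report.generate_stats(Stat.SilentPVs, archiver))
    #     for pv_name, response in silent.items():
    #         LOG.info("%s: %s", pv_name, response)
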
    async def generate(
        self,
        archiver: ArchiverWrapper,
    ) -> dict[Ioc, dict[str, PVStats]]:
        """Generate all the statistics available from the Stat list.

        Args:
            archiver (ArchiverWrapper): Archiver to get statistics from

        Returns:
            dict[Ioc, dict[str, PVStats]]: Dictionary keyed by IOC, then by PV
                name, with the detailed statistics for each PV.
        """
        gather_all_stats = await asyncio.gather(*[
            self.generate_stats(stat, archiver) for stat in Stat
        ])
        # Invert the data from being per stat to per PV
        inverted_data = _invert_data(dict(zip(list(Stat), gather_all_stats)))
        log_pv_parts_stats(set(inverted_data.keys()))
        if self.channelfinder:
            return await _organise_by_ioc(
                inverted_data,
                self.channelfinder,
                ioc_name=self.ioc_name,
            )
        return {UNKNOWN_IOC: inverted_data}

    def print_report(
        self,
        archiver: ArchiverWrapper,
        file: IO[str],
        *,
        verbose: bool = False,
    ) -> None:
        """Print a report about the statistics of PVs in the archiver.

        Args:
            archiver (ArchiverWrapper): Archiver to get statistics from
            file (IO[str]): File to print the report to
            verbose (bool, optional): Verbose output or not. Defaults to False.
        """
        report = asyncio.run(self.generate(archiver))
        if verbose:
            console = Console(file=file)
            console.print(report)
            return
        sum_report = csv_output(report)
        csvwriter = csv.writer(file)
        csvwriter.writerow(REPORT_CSV_HEADINGS)
        for row in sum_report:
            csvwriter.writerow(row)

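    # Usage sketch (hypothetical path): write the CSV summary to disk; pass
    # newline="" so csv.writer controls line endings, or set verbose=True for
    # a rich-rendered dump instead:
    #
    #     with open("report.csv", "w", newline="") as out_file:
    #         report.print_report(archiver, out_file)
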
class _EnhancedJSONEncoder(json.JSONEncoder):
    """JSON encoder that also serialises dataclasses."""

    def default(self, o: Any) -> Any:
        if dataclasses.is_dataclass(o):
            return dataclasses.asdict(o)  # type: ignore[arg-type]
        return super().default(o)

async def _organise_by_ioc(
    inverted_report: dict[str, PVStats],
    channelfinder: ChannelFinder,
    ioc_name: str | None = None,
) -> dict[Ioc, dict[str, PVStats]]:
    if ioc_name:
        iocs = await filter_by_ioc(
            channelfinder,
            ioc_name,
            list(inverted_report.keys()),
        )
    else:
        iocs = await get_iocs(channelfinder, list(inverted_report.keys()))
    LOG.info("IOCS: %s", json.dumps(_iocs_summary(iocs), cls=_EnhancedJSONEncoder))
    return {ioc: {pv: inverted_report[pv] for pv in iocs[ioc]} for ioc in iocs}

def _iocs_summary(iocs: dict[Ioc, list[str]]) -> list[tuple[Ioc, int]]:
    sorted_iocs = [(ioc, len(pvs)) for ioc, pvs in iocs.items()]
    return sorted(sorted_iocs, key=operator.itemgetter(1))

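# Sketch (hypothetical IOCs): the summary is sorted ascending by PV count,
# e.g. {ioc_b: ["PV:1"], ioc_a: ["PV:2", "PV:3"]} yields
# [(ioc_b, 1), (ioc_a, 2)].
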
def csv_output(
    report: dict[Ioc, dict[str, PVStats]],
) -> list[list[str]]:
    """Create rows of strings from the generated data for printing as CSV.

    Rows follow the headings:
    IOC Name, IOC hostname, PV name, Statistic, Statistic Note

    Args:
        report (dict[Ioc, dict[str, PVStats]]): Base input data in the form of
            IOCs mapped to PVs, with their statistics and responses from the
            archiver.

    Returns:
        list[list[str]]: List of CSV rows
    """
    return [
        [ioc.name, ioc.hostname, pv, stat.name, str(stat_note)]
        for ioc, pvs in report.items()
        for pv, issue in pvs.items()
        for stat, stat_note in issue.stats.items()
    ]

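# Each row flattens one (IOC, PV, statistic) triple to match
# REPORT_CSV_HEADINGS, e.g. (hypothetical values):
#
#     ["VAC-IOC-001", "ioc-host.example.org", "VAC:PV:01", "SilentPVs",
#      "<stat note>"]
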
def _invert_data(data: dict[Stat, dict[str, BaseStatResponse]]) -> dict[str, PVStats]:
    """Invert the data from being keyed by statistic to being keyed by PV.

    Args:
        data (dict[Stat, dict[str, BaseStatResponse]]): Input data mapping
            each Stat to a dictionary keyed by PV name

    Returns:
        dict[str, PVStats]: Output keyed by PV name, sorted by name.
    """
    dict_data: dict[str, dict[Stat, BaseStatResponse]] = {}
    for stat, stat_item in data.items():
        for pv in stat_item:
            if pv not in dict_data:
                dict_data[pv] = {}
            dict_data[pv][stat] = stat_item[pv]
    output = {pv: PVStats(pv, dict_data[pv]) for pv in dict_data}
    return dict(sorted(output.items()))
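
# Sketch of the inversion (hypothetical responses resp_a, resp_b, resp_c):
#
#     data = {
#         Stat.SilentPVs: {"PV:A": resp_a},
#         Stat.LostConnection: {"PV:A": resp_b, "PV:B": resp_c},
#     }
#     _invert_data(data)
#     # -> {"PV:A": PVStats("PV:A", {Stat.SilentPVs: resp_a,
#     #                              Stat.LostConnection: resp_b}),
#     #     "PV:B": PVStats("PV:B", {Stat.LostConnection: resp_c})}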