Source code for epicsarchiver.statistics.services.archiver_statistics

"""Archiver Statistics Service."""

from __future__ import annotations

import asyncio

from epicsarchiver.common.async_service import ServiceClient
from epicsarchiver.common.base_archiver import mgmt_url
from epicsarchiver.mgmt.archiver_mgmt import ArchiverMgmt
from epicsarchiver.statistics.models.pv_details import DetailEnum, Details
from epicsarchiver.statistics.models.stat_responses import (
    DisconnectedPVsResponse,
    DroppedPVResponse,
    DroppedReason,
    LostConnectionsResponse,
    PausedPVResponse,
    SilentPVsResponse,
    StorageRatesResponse,
)
from epicsarchiver.statistics.models.stats import PVStats


class ArchiverStatistics(ServiceClient):
    """Responses from the reports of the archiver appliance.

    Every ``get_*`` coroutine requests one report through ``_get_json`` and
    converts each entry of the JSON reply into the matching response model.
    """

    def __init__(self, hostname: str = "localhost", port: int = 17665):
        """Create Archiver Appliance object.

        Args:
            hostname (str, optional): hostname of archiver.
                Defaults to "localhost".
            port (int, optional): port number of mgmt interface.
                Defaults to 17665.
        """
        self.hostname = hostname
        super().__init__(mgmt_url(hostname, port))

    @staticmethod
    def _limit_params(limit: int | None) -> dict[str, str] | None:
        """Build the query parameters for an optional ``limit``.

        Args:
            limit (int | None): maximum number of entries to request.

        Returns:
            dict[str, str] | None: ``{"limit": "<n>"}``, or ``None`` when
            ``limit`` is falsy (``None`` or ``0``) so that no ``limit``
            parameter is sent with the request.
        """
        return {"limit": str(limit)} if limit else None

    async def get_pvs_dropped(
        self,
        reason: DroppedReason,
        limit: int | None = 1000,
    ) -> list[DroppedPVResponse]:
        """Gets the pvs ordered by dropped events.

        Args:
            reason (DroppedReason): selects the report; its ``value`` is used
                as the endpoint path of the request.
            limit (int | None, optional): maximum number of entries to
                request. Defaults to 1000.

        Returns:
            list[DroppedPVResponse]: List of responses
        """
        entries = await self._get_json(
            reason.value, params=self._limit_params(limit)
        )
        return [DroppedPVResponse.from_json(entry, reason) for entry in entries]

    async def get_disconnected_pvs(self) -> list[DisconnectedPVsResponse]:
        """Gets the list of disconnected pvs.

        Returns:
            list[DisconnectedPVsResponse]: List of responses
        """
        entries = await self._get_json("/getCurrentlyDisconnectedPVs")
        return [DisconnectedPVsResponse.from_json(entry) for entry in entries]

    async def get_silent_pvs(
        self, limit: int | None = 1000
    ) -> list[SilentPVsResponse]:
        """Gets the list of pvs with no events.

        Args:
            limit (int | None, optional): maximum number of entries to
                request. Defaults to 1000.

        Returns:
            list[SilentPVsResponse]: List of responses
        """
        entries = await self._get_json(
            "/getSilentPVsReport", params=self._limit_params(limit)
        )
        return [SilentPVsResponse.from_json(entry) for entry in entries]

    async def get_lost_connections_pvs(
        self,
        limit: int | None = 1000,
    ) -> list[LostConnectionsResponse]:
        """Gets the list of pvs from the lost connections report.

        Args:
            limit (int | None, optional): maximum number of entries to
                request. Defaults to 1000.

        Returns:
            list[LostConnectionsResponse]: List of responses
        """
        entries = await self._get_json(
            "/getLostConnectionsReport", params=self._limit_params(limit)
        )
        return [LostConnectionsResponse.from_json(entry) for entry in entries]

    async def get_storage_rates(
        self, limit: int | None = 1000
    ) -> list[StorageRatesResponse]:
        """Gets the list of pvs from the storage rate report.

        Args:
            limit (int | None, optional): maximum number of entries to
                request. Defaults to 1000.

        Returns:
            list[StorageRatesResponse]: List of responses
        """
        entries = await self._get_json(
            "/getStorageRateReport", params=self._limit_params(limit)
        )
        return [StorageRatesResponse.from_json(entry) for entry in entries]

    async def get_paused_pvs(self) -> list[PausedPVResponse]:
        """Gets the list of paused pvs.

        Returns:
            list[PausedPVResponse]: List of responses
        """
        entries = await self._get_json("/getPausedPVsReport")
        return [PausedPVResponse.from_json(entry) for entry in entries]

    async def get_pv_details(
        self, pvs: list[str], mb_per_day_min: float = 0
    ) -> dict[str, PVStats]:
        """Return the details of the given PVs.

        One ``/getPVDetails`` request is issued per distinct name and the
        requests are awaited concurrently.

        Args:
            pvs (list[str]): names of the pvs for which the details are to
                be determined.
            mb_per_day_min (float): Minimum MB per day to filter by

        Returns:
            dict[str, PVStats]: statistics of each PV, keyed by the PV name
            reported in its details.
        """
        # The result is keyed by PV name, so a repeated name can only
        # overwrite its own entry; drop repeats (keeping first-seen order)
        # instead of sending the same request several times.
        unique_pvs = list(dict.fromkeys(pvs))
        raw_details = await asyncio.gather(*(
            self._get_json("/getPVDetails", params={"pv": pv})
            for pv in unique_pvs
        ))
        parsed = [Details.from_json(raw) for raw in raw_details]
        # Convert each detail to stat_responses
        stats: dict[str, PVStats] = {}
        for pv_details in parsed:
            name = pv_details[DetailEnum.PVName]
            stats[name] = PVStats(
                name, pv_details.to_base_responses(mb_per_day_min)
            )
        return stats
class ArchiverWrapper:
    """Wrapper around ArchiverStatistics and ArchiverMgmt for Statistics usage."""

    def __init__(self, hostname: str = "localhost", port: int = 17665):
        """Create Archiver Appliance object.

        Args:
            hostname (str, optional): hostname of archiver.
                Defaults to "localhost".
            port (int, optional): port number of mgmt interface.
                Defaults to 17665.
        """
        self.mgmt = ArchiverMgmt(hostname, port)
        # Forward the port as well: both clients are built from the same
        # hostname/port pair, and leaving it out would silently pin the
        # statistics client to the default port.
        self.stats = ArchiverStatistics(hostname, port)

    async def close(self) -> None:
        """Closes any connected sessions."""
        if self.stats:
            await self.stats.close()

    def __repr__(self) -> str:
        """String representation of ArchiverWrapper.

        Returns:
            str: output string
        """
        return f"ArchiverWrapper({self.mgmt.hostname}, {self.mgmt.port})"