# Source code for epicsarchiver.retrieval.command

"""Command module."""

from __future__ import annotations

import asyncio
import logging
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Dict, List, Tuple

import click
from dateutil import tz
from pandas import Timestamp
from pytz import UTC
from rich.console import Console
from rich.table import Table

from epicsarchiver.common.command import handle_debug
from epicsarchiver.retrieval.archive_event import ArchiveEvent
from epicsarchiver.retrieval.archiver_retrieval.async_archiver_retrieval import (
    AsyncArchiverRetrieval,
)
from epicsarchiver.retrieval.archiver_retrieval.processor import (
    Processor,
    ProcessorName,
)

if TYPE_CHECKING:
    from epicsarchiver.epicsarchiver import ArchiverAppliance

# Module-level logger for this command module.
LOG: logging.Logger = logging.getLogger(__name__)

# Accepted input formats for --start/--end; click.DateTime tries them in order.
DATE_FORMATS = [
    "%Y-%m-%d",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%dT%H:%M:%S.%f",
    "%Y-%m-%d %H:%M:%S.%f",
]

# Timestamp-aligned events: each entry pairs one timestamp with the events
# (keyed by PV name) that occurred at that instant.
AlignedPVEvents = List[Tuple[Timestamp, Dict[str, ArchiveEvent]]]
@click.command(context_settings={"show_default": True})
@click.option(
    "--debug",
    is_flag=True,
    callback=handle_debug,
    show_default=True,
    help="Turn on debug logging",
)
@click.option(
    "--start",
    "-s",
    # Callable default so "30 seconds ago" is computed at invocation time.
    # A plain string default would be frozen at import time and go stale in
    # any long-lived process that imports this module before running the CLI.
    default=lambda: (datetime.now(tz=UTC) - timedelta(seconds=30)).strftime(
        DATE_FORMATS[2]
    ),
    type=click.DateTime(formats=DATE_FORMATS),
    show_default=False,
    help="Start time of query [default: 30 seconds ago]",
)
@click.option(
    "--end",
    "-e",
    # Callable default for the same reason as --start; strftime already
    # returns a str, so no extra str() wrapper is needed.
    default=lambda: datetime.now(tz=UTC).strftime(DATE_FORMATS[2]),
    type=click.DateTime(formats=DATE_FORMATS),
    show_default=False,
    help="End time of query, [default: now]",
)
@click.option(
    "--processor-name",
    "-p",
    type=click.Choice(
        [processor.name for processor in ProcessorName], case_sensitive=False
    ),
    help="""PreProcessor to use

    \b
    Docs at
    https://epicsarchiver.readthedocs.io/en/latest/user/userguide.html#processing-of-data
    """,
)
@click.option(
    "--bin_size",
    "-b",
    type=int,
    help="Bin size (mostly in seconds) for preprocessor.",
)
@click.argument("pvs", type=str, required=True, nargs=-1)
@click.pass_context
def get(  # noqa: PLR0917, PLR0913
    ctx: click.core.Context,
    pvs: tuple[str],
    start: datetime,
    end: datetime,
    processor_name: str | None,
    bin_size: int | None,
    debug: bool,  # noqa: FBT001, ARG001
) -> None:
    """Print out data from an archiver cluster.

    ARGUMENT
    pvs   What pvs to get data of.

    Example usage:

    .. code-block:: console

        epicsarchiver --hostname archiver-01.example.com get PV_NAME1 PV_NAME2
    """
    archiver: ArchiverAppliance = ctx.obj["archiver"]
    console = Console()
    # Only build a Processor when one was requested on the command line.
    processor = (
        Processor(ProcessorName[processor_name.upper()], bin_size)
        if processor_name
        else None
    )
    LOG.debug("pvs %s", pvs)
    events = asyncio.run(
        _fetch_events(archiver, list(pvs), start, end, processor=processor)
    )
    table_title = _table_title(pvs, start, end, processor)
    # Single-PV queries get a richer per-event table; multi-PV queries get
    # one value column per PV, aligned on timestamps.
    table = (
        _create_multi_table(pvs, table_title, events)
        if len(pvs) > 1
        else _create_singular_table(pvs[0], table_title, events)
    )
    console.print(table)
    ctx.exit(0)
[docs] def _table_title( pvs: tuple[str], start: datetime, end: datetime, processor: Processor | None ) -> str: table_title = f"Period {start} - {end}" if len(pvs) == 1: table_title = pvs[0] + table_title if processor: table_title += f" Processor {processor.processor_name}" if processor.bin_size: table_title += f", {processor.bin_size} seconds" return table_title
def _create_multi_table(
    pvs: tuple[str],
    title: str,
    events: AlignedPVEvents,
) -> Table:
    """Render events for several PVs as one table, one value column per PV.

    Args:
        pvs (tuple[str]): PV names, in column order.
        title (str): Title for the table.
        events (AlignedPVEvents): Timestamp-aligned events per PV.

    Returns:
        Table: rich table with a time column plus one value column per PV.
    """
    table = Table(title=title)
    table.add_column("Time", justify="left")
    for pv in pvs:
        table.add_column(pv + " Value", justify="right")
    for timestamp, events_by_pv in events:
        # A PV with no event at this timestamp renders as an empty cell.
        row_values = [_val_to_str(events_by_pv.get(pv)) for pv in pvs]
        table.add_row(_to_local_timestamp_str(timestamp), *row_values)
    return table
def _to_local_timestamp_str(timestamp: Timestamp) -> str:
    """Format a timestamp in the machine's local timezone.

    Args:
        timestamp (Timestamp): pandas timestamp; assumed timezone-aware
            (``tz_convert`` raises on naive timestamps — TODO confirm
            upstream events are always tz-aware).

    Returns:
        str: The timestamp converted to local time, as a string.
    """
    local_time = timestamp.tz_convert(tz.tzlocal())
    return str(local_time)
[docs] def _val_to_str(event: ArchiveEvent | None) -> str: if event: return str(event.val) return ""
def _create_singular_table(
    pv: str,
    title: str,
    events: AlignedPVEvents,
) -> Table:
    """Render a single PV's events with value, status and severity columns.

    Args:
        pv (str): Name of the PV to display.
        title (str): Title for the table.
        events (AlignedPVEvents): Timestamp-aligned events (only entries
            for *pv* are shown).

    Returns:
        Table: rich table with time/value/status/severity columns.
    """
    table = Table(title=title)
    for header, justify in (
        ("Time", "left"),
        ("Value", "right"),
        ("Status", "right"),
        ("Severity", "right"),
    ):
        table.add_column(header, justify=justify)
    for timestamp, events_by_pv in events:
        event = events_by_pv.get(pv)
        if not event:
            # Timestamps with no event for this PV produce no row at all.
            continue
        table.add_row(
            _to_local_timestamp_str(timestamp),
            str(event.val),
            str(event.status),
            str(event.severity),
        )
    return table
[docs] def _align_events( all_events: dict[str, list[ArchiveEvent]], ) -> AlignedPVEvents: """Align events from multiple PVs by timestamp. Args: all_events (dict[str, list[ArchiveEvent]]): Events per PV with PV name as key Returns: AlignedPVEvents: List of pairs, (timestamp, dict[pv_name, pv_value]) """ data: dict[Timestamp, dict[str, ArchiveEvent]] = {} for pv, events in all_events.items(): for event in events: if event.pd_timestamp not in data: data[event.pd_timestamp] = {} data[event.pd_timestamp][pv] = event return [(timestamp, data[timestamp]) for timestamp in sorted(data.keys())]
async def _fetch_events(
    archiver: ArchiverAppliance,
    pvs: list[str],
    start: datetime,
    end: datetime,
    processor: Processor | None,
) -> AlignedPVEvents:
    """Fetch events for *pvs* from the archiver and align them by timestamp.

    Args:
        archiver (ArchiverAppliance): Appliance whose hostname/port to query.
        pvs (list[str]): PV names to fetch (duplicates collapse via set()).
        start (datetime): Query start time.
        end (datetime): Query end time.
        processor (Processor | None): Optional processor, passed through to
            the retrieval call.

    Returns:
        AlignedPVEvents: Timestamp-aligned events for all requested PVs.
    """
    async with AsyncArchiverRetrieval(archiver.hostname, archiver.port) as retrieval:
        raw_events = await retrieval.get_all_events(
            set(pvs), start, end, processor=processor
        )
        return _align_events(raw_events)