epicsarchiver.retrieval.command

Command module.

Attributes

LOG

DATE_FORMATS

AlignedPVEvents

Functions

get(→ None)

Print out data from an archiver cluster.

_table_title(→ str)

_create_multi_table(→ rich.table.Table)

_to_local_timestamp_str(→ str)

_val_to_str(→ str)

_create_singular_table(→ rich.table.Table)

_align_events(→ AlignedPVEvents)

Align events from multiple PVs by timestamp.

_fetch_events(→ AlignedPVEvents)

Module Contents

epicsarchiver.retrieval.command.LOG: logging.Logger
epicsarchiver.retrieval.command.DATE_FORMATS = ['%Y-%m-%d', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%d %H:%M:%S.%f']
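
The entries in DATE_FORMATS are standard `strptime` patterns. This page does not say how the module consumes them, so the following is only a sketch of one plausible use: trying each pattern in order until one parses. The helper name `parse_date` is hypothetical and is not part of epicsarchiver.

```python
from datetime import datetime

# Copied from the DATE_FORMATS attribute documented above.
DATE_FORMATS = [
    "%Y-%m-%d",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%dT%H:%M:%S.%f",
    "%Y-%m-%d %H:%M:%S.%f",
]


def parse_date(text: str) -> datetime:
    """Hypothetical helper: return the first successful strptime parse."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            # This pattern did not match; try the next one.
            continue
    raise ValueError(f"{text!r} matches none of the known date formats")
```

Under this assumption, a date-only string such as `2024-01-02` and a full timestamp such as `2024-01-02 03:04:05.5` would both be accepted.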
epicsarchiver.retrieval.command.AlignedPVEvents
epicsarchiver.retrieval.command.get(ctx: click.core.Context, pvs: tuple[str], start: datetime.datetime, end: datetime.datetime, processor_name: str | None, bin_size: int | None, debug: bool) → None

Print out data from an archiver cluster.

ARGUMENT pvs: The PVs to retrieve data for.

Example usage:

epicsarchiver --hostname archiver-01.example.com get PV_NAME1 PV_NAME2
epicsarchiver.retrieval.command._table_title(pvs: tuple[str], start: datetime.datetime, end: datetime.datetime, processor: epicsarchiver.retrieval.archiver_retrieval.processor.Processor | None) → str
epicsarchiver.retrieval.command._create_multi_table(pvs: tuple[str], title: str, events: AlignedPVEvents) → rich.table.Table
epicsarchiver.retrieval.command._to_local_timestamp_str(timestamp: pandas.Timestamp) → str
epicsarchiver.retrieval.command._val_to_str(event: epicsarchiver.retrieval.archive_event.ArchiveEvent | None) → str
epicsarchiver.retrieval.command._create_singular_table(pv: str, title: str, events: AlignedPVEvents) → rich.table.Table
epicsarchiver.retrieval.command._align_events(all_events: dict[str, list[epicsarchiver.retrieval.archive_event.ArchiveEvent]]) → AlignedPVEvents

Align events from multiple PVs by timestamp.

Parameters:

all_events (dict[str, list[ArchiveEvent]]) – Events per PV with PV name as key

Returns:

List of pairs of the form (timestamp, dict[pv_name, pv_value])

Return type:

AlignedPVEvents
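
To illustrate the alignment described above, here is a minimal sketch, not the library's implementation. It assumes each event exposes a `timestamp` and a value, and that a PV with no event at a given timestamp is represented by `None` (suggested by `_val_to_str` accepting `ArchiveEvent | None`, but not confirmed here). The `Event` class and `align_events` function are hypothetical stand-ins.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for ArchiveEvent; only these fields are assumed."""

    timestamp: datetime
    val: float


def align_events(all_events: dict[str, list[Event]]):
    """Return a list of (timestamp, {pv_name: event or None}) pairs."""
    # Every distinct timestamp seen across all PVs, in chronological order.
    timestamps = sorted(
        {event.timestamp for events in all_events.values() for event in events}
    )
    # Per-PV lookup from timestamp to event, for constant-time access below.
    by_pv = {
        pv: {event.timestamp: event for event in events}
        for pv, events in all_events.items()
    }
    # One row per timestamp; a PV without an event at that time maps to None.
    return [
        (ts, {pv: by_pv[pv].get(ts) for pv in all_events})
        for ts in timestamps
    ]
```

With this shape, each row can be rendered directly as one table line: the timestamp in the first column and one cell per PV, with `None` shown as an empty cell.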

async epicsarchiver.retrieval.command._fetch_events(archiver: epicsarchiver.epicsarchiver.ArchiverAppliance, pvs: list[str], start: datetime.datetime, end: datetime.datetime, processor: epicsarchiver.retrieval.archiver_retrieval.processor.Processor | None) → AlignedPVEvents