# Source code for epicsarchiver.retrieval.pb

"""Handle retrieval of data in the Archiver Appliance PB file format.

The file format is described on this page:
https://slacmshankar.github.io/epicsarchiver_docs/pb_pbraw.html

The data can be parsed in the same way whether retrieved using the
Rest API or whether reading files directly from disk. In either
case, it is important to treat the data as binary data - a stream of
bytes. The Google Protobuf library handles converting the stream of
bytes into the objects defined by the EPICSEvent.proto file.

The Archiver Appliance escapes certain characters as described on the
page above, which allows one to deduce the number of events in the
binary file using tools such as wc.

The unescape_bytes() method handles unescaping these characters before
handing the interpretation over to the Google Protobuf library.

"""

from __future__ import annotations

import collections
import logging
import re
from collections import OrderedDict
from pathlib import Path
from typing import TYPE_CHECKING, TypeAlias

import pandas as pd
from attr import dataclass
from google.protobuf.message import DecodeError
from pandas import Timestamp

from epicsarchiver.retrieval import EPICSEvent_pb2 as ee
from epicsarchiver.retrieval.archive_event import (
    ArchiveEvent,
    FieldValue,
    ysn_timestamp,
)

if TYPE_CHECKING:
    from collections.abc import Callable
    from datetime import datetime as pydt

LOG: logging.Logger = logging.getLogger(__name__)

# It is not clear to me why I can't extract this information
# from the compiled protobuf file.
TYPE_MAPPINGS: dict[int, type] = {
    0: ee.ScalarString,
    1: ee.ScalarShort,
    2: ee.ScalarFloat,
    3: ee.ScalarEnum,
    4: ee.ScalarByte,
    5: ee.ScalarInt,
    6: ee.ScalarDouble,
    7: ee.VectorString,
    8: ee.VectorShort,
    9: ee.VectorFloat,
    10: ee.VectorEnum,
    11: ee.VectorChar,
    12: ee.VectorInt,
    13: ee.VectorDouble,
    14: ee.V4GenericBytes,
}
# Reverse lookup: protobuf event class -> numeric AA type code.
INVERSE_TYPE_MAPPINGS: dict[type, int] = {
    cls: numeric for numeric, cls in TYPE_MAPPINGS.items()
}

# Bytes with special meaning in the AA PB file format.
ESC_BYTE = b"\x1b"
NL_BYTE = b"\x0a"
CR_BYTE = b"\x0d"

# The character sequences required to unescape & escape AA pb file format.
# Note that we need to be careful about the ordering here. We must apply them
# in the opposite order when escaping and unescaping. In particular, the
# escape byte needs to be escaped *first* and unescaped *last* in order to
# prevent extra bytes appearing and causing problems. See #59.
PB_REPLACEMENTS_ESCAPING = collections.OrderedDict(
    [
        (ESC_BYTE + b"\x01", ESC_BYTE),
        (ESC_BYTE + b"\x02", NL_BYTE),
        (ESC_BYTE + b"\x03", CR_BYTE),
    ]
)
PB_REPLACEMENTS_UNESCAPING = collections.OrderedDict(
    [
        (ESC_BYTE + b"\x03", CR_BYTE),
        (ESC_BYTE + b"\x02", NL_BYTE),
        (ESC_BYTE + b"\x01", ESC_BYTE),
    ]
)

# Union of all scalar protobuf event classes.
EeScalarEvent: TypeAlias = (
    ee.ScalarString
    | ee.ScalarShort
    | ee.ScalarFloat
    | ee.ScalarEnum
    | ee.ScalarByte
    | ee.ScalarInt
    | ee.ScalarDouble
)
# Union of all vector (waveform) protobuf event classes.
# Annotated as TypeAlias for consistency with EeScalarEvent and EeEvent.
EeVectorEvent: TypeAlias = (
    ee.VectorString
    | ee.VectorShort
    | ee.VectorFloat
    | ee.VectorEnum
    | ee.VectorChar
    | ee.VectorInt
    | ee.VectorDouble
    | ee.V4GenericBytes
)
EeEvent: TypeAlias = EeScalarEvent | EeVectorEvent

# Create a regex pattern that matches any of the keys
RE_ESCAPE_PATTERN = re.compile(
    b"|".join(re.escape(k) for k in PB_REPLACEMENTS_UNESCAPING)
)
def unescape_bytes(byte_seq: bytes) -> bytes:
    """Replace specific sub-sequences in a bytes sequence.

    This escaping is defined as part of the Archiver Appliance raw file
    format: https://slacmshankar.github.io/epicsarchiver_docs/pb_pbraw.html

    Args:
        byte_seq: any byte sequence

    Returns:
        the byte sequence unescaped according to the AA file format rules
    """
    # Undo the replacements one at a time: carriage return and newline
    # first, the escape byte itself last, so that a freshly-restored
    # escape byte can never be misread as the start of another escape
    # sequence (see the ordering note on PB_REPLACEMENTS_UNESCAPING).
    for escaped, plain in (
        (b"\x1b\x03", b"\x0d"),
        (b"\x1b\x02", b"\x0a"),
        (b"\x1b\x01", b"\x1b"),
    ):
        byte_seq = byte_seq.replace(escaped, plain)
    return byte_seq
def escape_bytes(byte_seq: bytes) -> bytes:
    """Replace specific sub-sequences in a bytes sequence.

    This escaping is defined as part of the Archiver Appliance raw file
    format: https://slacmshankar.github.io/epicsarchiver_docs/pb_pbraw.html

    Args:
        byte_seq: any byte sequence

    Returns:
        the byte sequence escaped according to the AA file format rules
    """
    # Escape the escape byte itself first so that the escape markers
    # introduced for newline and carriage return are not double-escaped.
    result = byte_seq.replace(b"\x1b", b"\x1b\x01")
    result = result.replace(b"\x0a", b"\x1b\x02")
    return result.replace(b"\x0d", b"\x1b\x03")
def event_pd_timestamp(
    year: int,
    event: EeEvent,
) -> Timestamp:
    """Convert protobuf event time fields to a pandas Timestamp.

    Args:
        year (int): year of event
        event (EeEvent): input event

    Returns:
        Timestamp: timestamp built from the year plus the event's
            seconds-into-year and nanosecond fields
    """
    # Protobuf events store time as (seconds into year, nanoseconds);
    # ysn_timestamp combines those with the year from the chunk header.
    return ysn_timestamp(year, event.secondsintoyear, event.nano)
def event_timestamp(
    year: int,
    event: EeEvent,
) -> pydt:
    """Converts from protobuf event time format to python datetime.

    Args:
        year (int): year of event
        event (EeEvent): input event

    Returns:
        pydt: Output datetime
    """
    # Build the pandas Timestamp first, then convert to a stdlib datetime.
    stamp = event_pd_timestamp(year, event)
    return stamp.to_pydatetime()
def get_timestamp_from_line_function(
    chunk_info: ee.PayloadInfo,
) -> Callable[[bytes], pydt]:
    """From a unescaped protobuf line create a function to get datetime.

    Args:
        chunk_info (ee.PayloadInfo): Payload info of protobuf file

    Returns:
        Callable[[bytes], pydt]: Function to provide event time
    """
    # Capture the event class and year from the chunk header once; the
    # returned closure then only has to parse each escaped line.
    event_class = TYPE_MAPPINGS[chunk_info.type]
    chunk_year = chunk_info.year  # pylint: disable=no-member

    def timestamp_from_line(line: bytes) -> pydt:
        parsed = event_class()
        parsed.ParseFromString(unescape_bytes(line))
        return event_timestamp(chunk_year, parsed)

    return timestamp_from_line
def _break_up_chunks(
    raw_data: bytes,
) -> OrderedDict[int, tuple[ee.PayloadInfo, list[bytes]]]:
    """Break up raw data into chunks by year.

    Args:
        raw_data: Raw data from file

    Returns:
        collections.OrderedDict: keys are years; values are the chunk's
        PayloadInfo header plus the list of escaped event lines
    """
    # Chunks are separated by a blank line; the first line of each chunk
    # is an escaped PayloadInfo header, the rest are escaped events.
    chunks = [chunk.strip() for chunk in raw_data.split(b"\n\n")]
    LOG.debug("%s chunks in pb file", len(chunks))
    year_chunks: OrderedDict[int, tuple[ee.PayloadInfo, list[bytes]]] = (
        collections.OrderedDict()
    )
    for chunk_index, chunk in enumerate(chunks):
        header, *event_lines = chunk.split(b"\n")
        chunk_info = ee.PayloadInfo()
        chunk_info.ParseFromString(unescape_bytes(header))
        LOG.debug("line 0 bytes: %s", header)
        chunk_year = chunk_info.year  # pylint: disable=no-member
        LOG.debug(
            "Year %s, Chunk Index %s: %s events in chunk",
            chunk_year,
            chunk_index,
            len(event_lines),
        )
        # Several chunks may share a year (e.g. one per file); merge their
        # event lines under a single header.
        if chunk_year in year_chunks:
            year_chunks[chunk_year][1].extend(event_lines)
        else:
            year_chunks[chunk_year] = chunk_info, event_lines
    return year_chunks
def _event_from_line(
    line: bytes, pv: str, year: int, event_type: int, line_number: int = 0
) -> ArchiveEvent | None:
    """Get an ArchiveEvent from this line.

    Args:
        line: A line of chunks of data
        pv: Name of the PV
        year: Year of interest
        event_type: Need to know the type of the event as key of TYPE_MAPPINGS
        line_number: Line number in the file (used only for error reporting)

    Returns:
        ArchiveEvent: The event, or None if the line could not be decoded
    """
    unescaped = unescape_bytes(line)
    event = TYPE_MAPPINGS[event_type]()
    try:
        event.ParseFromString(unescaped)
    except DecodeError:
        # A corrupt line is logged and skipped rather than aborting the
        # whole file parse.
        LOG.exception(
            "Error parsing line %s with unescaped bytes: %s", line_number, unescaped
        )
        return None
    val = event.val
    if isinstance(
        event,
        ee.VectorDouble
        | ee.VectorEnum
        | ee.VectorFloat
        | ee.VectorInt
        | ee.VectorShort
        | ee.VectorString,
    ):
        # Note purposefully not including all Vectortypes here
        # (VectorChar and V4GenericBytes keep their raw value).
        vector_val = list(val)
        val = vector_val
    return ArchiveEvent(
        pv,
        val,
        event.secondsintoyear,
        year,
        event.nano,
        event.severity,
        event.status,
        [to_field_value(f) for f in event.fieldvalues],
    )
# NOTE(review): decorator comes from `attr` (attrs), not stdlib dataclasses —
# presumably intentional; confirm before swapping to `dataclasses.dataclass`.
@dataclass
class ArchiveEventsMeta:
    """Metadata about the events."""

    # Name of the archived PV this chunk describes.
    pv_name: str
    # String form of the protobuf event class (see TYPE_MAPPINGS).
    pv_type: str
    # Number of elements per event (1 for scalars; waveform length otherwise).
    element_count: int
    # Extra header fields from the chunk's PayloadInfo.
    headers: list[FieldValue]
    # Year the chunk's timestamps are relative to.
    year: int
# Result of parsing PB data: per-year metadata plus the flat list of events.
ArchiveEventsData: TypeAlias = tuple[dict[int, ArchiveEventsMeta], list[ArchiveEvent]]
def parse_pb_data(
    raw_data: bytes,
) -> ArchiveEventsData:
    """Turn raw PB data into an ArchiveData object.

    Args:
        raw_data: The raw data

    Returns:
        An ArchiveData object
    """
    metadata: dict[int, ArchiveEventsMeta] = {}
    events: list[ArchiveEvent] = []
    # Each year contributes one header (chunk_info) and its escaped lines.
    for year, (chunk_info, lines) in _break_up_chunks(raw_data).items():
        parsed = (
            _event_from_line(line, chunk_info.pvname, year, chunk_info.type, number)
            for number, line in enumerate(lines)
        )
        # Undecodable lines come back as None and are dropped.
        events.extend(event for event in parsed if event is not None)
        metadata[year] = metadata_from_chunk_info(year, chunk_info)
    return metadata, events
def metadata_from_chunk_info(
    year: int, chunk_info: ee.PayloadInfo
) -> ArchiveEventsMeta:
    """Convert a chunk info into metadata.

    Args:
        year (int): Year of interest
        chunk_info (ee.PayloadInfo): Input chunk info

    Returns:
        ArchiveEventsMeta: Output metadata
    """
    header_fields = [to_field_value(header) for header in chunk_info.headers]
    return ArchiveEventsMeta(
        pv_name=chunk_info.pvname,
        pv_type=str(TYPE_MAPPINGS[chunk_info.type]),
        element_count=chunk_info.elementCount,
        headers=header_fields,
        year=year,
    )
def get_iso_timestamp_for_event(
    year: int,
    event: EeEvent,
) -> str:
    """Returns an ISO-formatted timestamp string for the given event."""
    stamp = pd.Timestamp(event_timestamp(year, event))
    return stamp.isoformat()
def read_pb_file(filename: str | Path) -> ArchiveEventsData:
    """Read an Archiver Appliance PB file and parse its contents.

    The on-disk bytes are escaped; parse_pb_data handles unescaping.

    Args:
        filename: location of file (string path or Path)

    Returns:
        ArchiveEventsData: per-year metadata and the list of events in
            the file
    """
    return parse_pb_data(Path(filename).read_bytes())
def to_field_value(f: ee.FieldValue) -> FieldValue:
    """From the protobuf variant.

    Args:
        f (ee.FieldValue): protobuf field value

    Returns:
        FieldValue: Basic Field Value
    """
    # Copy the (name, val) pair out of the protobuf message.
    name, value = f.name, f.val
    return FieldValue(name, value)