"""Archiver Retrieval methods."""
from __future__ import annotations
import datetime
import logging
from typing import TYPE_CHECKING, Any, cast
import pandas as pd
from pytz import UTC
from epicsarchiver.common.base_archiver import BaseArchiverAppliance
from epicsarchiver.common.date_util import (
datetime_from_str,
format_date,
set_timezone_utc,
)
from epicsarchiver.common.validation import (
validate_processor,
validate_pv,
validate_start_end,
)
from epicsarchiver.retrieval.archive_event import ArchiveEvent, dataframe_from_events
from epicsarchiver.retrieval.pb import parse_pb_data
if TYPE_CHECKING:
from requests import Response
from epicsarchiver.retrieval.archiver_retrieval.processor import Processor
# Module-level logger; handlers/levels are configured by the application.
LOG: logging.Logger = logging.getLogger(__name__)

# Retrieval web-app endpoints, appended to the base data-retrieval URL
# reported by the appliance (see ArchiverRetrieval.__init__).
ENDPOINT_GET_DATA = "/data/getData.raw"
ENDPOINT_GET_MATCHING_PVS = "/bpl/getMatchingPVs"
def json_to_dataframe(data: Any) -> pd.DataFrame:
    """Converts json from the archiver.

    Converts to a dataframe with two columns "date" and "val" and the
    index is "date" (a timezone-aware UTC DatetimeIndex built from the
    per-event "secs" and "nanos" fields).

    Args:
        data: json from a json archiver request

    Returns:
        pd.DataFrame
    """
    events_dataframe = pd.DataFrame(data[0]["data"])
    try:
        # Use exact int64 arithmetic: multiplying by the float 1e9 (as a
        # float64) loses nanosecond precision for epoch-scale timestamps
        # (~1.7e18 ns exceeds float64's ~15-16 significant digits).
        total_nanos = (
            events_dataframe["secs"].astype("int64") * 1_000_000_000
            + events_dataframe["nanos"].astype("int64")
        )
        events_dataframe["date"] = pd.to_datetime(total_nanos, unit="ns", utc=True)
    except KeyError:
        # Empty result set: there are no "secs"/"nanos" columns to
        # convert, so return the (empty) frame as-is.
        pass
    else:
        events_dataframe = events_dataframe[["date", "val"]]
        events_dataframe = events_dataframe.set_index("date")
    return events_dataframe
class ArchiverRetrieval(BaseArchiverAppliance):
    """Retrieval EPICS Archiver Appliance client.

    Hold a session to the Retrieval Archiver Appliance web application.

    Args:
        hostname: EPICS Archiver Appliance hostname [default: localhost]
        port: EPICS Archiver Appliance management port [default: 17665]

    Examples:
        .. code-block:: python

            from epicsarchiver.archiver.retrieval import ArchiverRetrieval

            archappl = ArchiverRetrieval("archiver-01.tn.esss.lu.se")
            print(archappl.version)
            df = archappl.get_data("my:pv", start="2018-07-04 13:00", end=datetime.utcnow())
    """

    def __init__(self, hostname: str = "localhost", port: int = 17665):
        """Create Archiver Appliance object.

        Args:
            hostname (str, optional): hostname of archiver. Defaults to "localhost".
            port (int, optional): port number of mgmt interface. Defaults to 17665.
        """
        super().__init__(hostname, port)
        # Base URL of the retrieval web application, as reported by the
        # appliance's info endpoint.
        self._data_retrieval_url = self.info["dataRetrievalURL"]
        # Full endpoint URLs used by the methods below.
        self.data_url: str = self._data_retrieval_url + ENDPOINT_GET_DATA
        self.matching_pvs_url: str = (
            self._data_retrieval_url + ENDPOINT_GET_MATCHING_PVS
        )

    def _get_data_raw(
        self,
        pv: str,
        start: datetime.datetime,
        end: datetime.datetime,
    ) -> Response:
        """Retrieve archived data in raw (protocol buffer) form.

        Args:
            pv: name of the pv.
            start: start time as a `datetime.datetime` object.
            end: end time as a `datetime.datetime` object.

        Returns:
            `Response`: streamed HTTP response with protocol-buffer payload.
        """
        # http://slacmshankar.github.io/epicsarchiver_docs/userguide.html
        params = {
            "pv": pv,
            "from": format_date(start),
            "to": format_date(end),
        }
        return self._get(
            self.data_url,
            params=params,
            stream=True,
        )

    def _get_matching_pvs(
        self,
        query: str,
        limit: int,
    ) -> list[str]:
        """Retrieve list of matching pv names for given regex search string.

        Args:
            query (str): A regex search string.
            limit (int): Limit of PV names to return.

        Returns:
            list[str]: List of pv names
        """
        params = {
            "regex": query,
            "limit": str(limit),
        }
        return cast(
            "list[str]",
            self._get(
                self.matching_pvs_url,
                params=params,
                stream=True,
            ).json(),
        )

    def _check_for_pvs_in_time_range(
        self,
        query: list[str],
        start: datetime.datetime | None = None,
        end: datetime.datetime | None = None,
    ) -> list[str]:
        """Check if data recorded during the given time range for each PV in list.

        If both start and end given, return only PVs which recorded data during that
        time range.
        If start given and end not, return PVs which recorded data between start and
        now.
        If end given and start not, return PVs which recorded any data before end.

        Args:
            query (list[str]): List of pvs.
            start (datetime.datetime | None): Start of the time range.
            end (datetime.datetime | None): End of the time range.

        Returns:
            list[str]: List of PV names found.
        """
        if not start and not end:
            # No time constraint: every PV trivially matches.
            return query
        # Add timezone if missing, otherwise convert to UTC.
        start = set_timezone_utc(input_time=start) if start else None
        end = set_timezone_utc(input_time=end) if end else datetime.datetime.now(tz=UTC)
        # Set both ends of the time range in the data query to `end`; the
        # Archiver then returns the most recent event prior to `end`, or an
        # empty result if the PV never recorded before that point.
        all_events = [self.get_events(pv, end, end) for pv in query]
        # Create list of those PVs with at least one event within the
        # specified time range.
        pv_list: list[str] = []
        for events in all_events:
            pv_list.extend(
                event.pv
                for event in events
                if start is None
                or event.pd_timestamp.to_pydatetime(warn=False) >= start
            )
        return pv_list

    def get_events(
        self,
        pv: str,
        start: datetime.datetime,
        end: datetime.datetime,
        processor: Processor | None = None,
    ) -> list[ArchiveEvent]:
        """Retrieve archived data as a list of events.

        Args:
            pv: name of the pv.
            start: start time as a `datetime.datetime` object.
            end: end time as a `datetime.datetime` object.
            processor (Processor | None, optional): Preprocessor
                to use. Defaults to None.

        Returns:
            list[ArchiveEvent]: requested events from the archiver.
        """
        # http://slacmshankar.github.io/epicsarchiver_docs/userguide.html
        validate_pv(pv)
        validate_start_end(start, end)
        validate_processor(processor)
        # A processor is requested by rewriting the PV name (e.g. "mean(pv)").
        pv_request = processor.calc_pv_name(pv) if processor else pv
        r = self._get_data_raw(pv_request, start, end)
        pb_data = r.content
        metadata, events = parse_pb_data(pb_data)
        LOG.debug("Metadata: %s", metadata)
        return events

    def get_data(
        self,
        pv: str,
        start: str | datetime.datetime,
        end: str | datetime.datetime,
        processor: Processor | None = None,
    ) -> pd.DataFrame:
        """Retrieve archived data as a pandas DataFrame.

        Args:
            pv: name of the pv.
            start: start time. Can be a string or `datetime.datetime`
                object.
            end: end time. Can be a string or `datetime.datetime`
                object.
            processor (Processor | None, optional): Preprocessor
                to use. Defaults to None.

        Returns:
            `pandas.DataFrame`
        """
        # http://slacmshankar.github.io/epicsarchiver_docs/userguide.html
        start_time = datetime_from_str(start)
        end_time = datetime_from_str(end)
        events = self.get_events(pv, start_time, end_time, processor)
        # dataframe_from_events already handles an empty event list, so no
        # special-casing is needed here.
        return dataframe_from_events(events)

    def search(
        self,
        query: str,
        *,
        start: datetime.datetime | None = None,
        end: datetime.datetime | None = None,
        limit: int = 500,
    ) -> list[str]:
        """Search for names of PVs matching the given regex search string.

        Optionally specify start and/or end times to only return PVs that recorded data
        in the specified time range.

        Args:
            query (str): A regex search string.
            start (datetime.datetime | None): Start time of the time period.
            end (datetime.datetime | None): End time of the time period.
            limit (int): Limit of PV names to return for each search string given.
                To get all the PV names, (potentially in the millions), set limit to -1.
                [default: 500]

        Returns:
            list[str]: List of PV names found.
        """
        # Limit returned list of PV to those in time range, if supplied.
        return self._check_for_pvs_in_time_range(
            query=self._get_matching_pvs(query, limit),
            start=start,
            end=end,
        )