"""Archiver Mgmt operations module."""
from __future__ import annotations
import logging
from enum import Enum, auto
from pathlib import Path
from typing import TYPE_CHECKING, Any, Collection, Dict, List, cast
from epicsarchiver.mgmt import archive_files
from epicsarchiver.mgmt.archiver_mgmt_info import (
ArchiverMgmtInfo,
ArchivingStatus,
)
if TYPE_CHECKING:
from epicsarchiver.common import ArchDbrType
# Module-level logger; all mgmt-operation diagnostics go through it.
LOG: logging.Logger = logging.getLogger(__name__)
class Storage(str, Enum):
    """Represents the different storage levels of the archiver appliance.

    NOTE(review): no enum members are visible in this copy — they appear
    to have been stripped by the documentation scrape. The member values
    are passed verbatim as the ``storage`` query parameter of
    ``/appendAndAliasPV``; restore them from the original source.
    """
class PutInfoType(Enum):
    """Represents the different types of put type info.

    Selects how ``put_pv_type_info`` fills the ``createnew`` / ``override``
    query parameters of the ``/putPVTypeInfo`` endpoint.
    """

    # Create a brand-new type info entry for the PV.
    CreateNew = auto()
    # Override the existing type info entry for the PV.
    Override = auto()
# JSON payload exchanged with /getPVTypeInfo and /putPVTypeInfo.
TypeInfo = Dict[str, Collection[str]]

# Single status/description record returned by a mgmt endpoint,
# e.g. {"status": "ok", "desc": "..."}.
OperationResult = Dict[str, str]

# Per-PV list of operation results, as returned by bulk endpoints.
OperationResultList = List[OperationResult]
class ArchiverMgmtOperations(ArchiverMgmtInfo):
    """Mgmt Operations EPICS Archiver Appliance client.

    Hold a session to the Archiver Appliance web application and use the
    mgmt interface.

    Args:
        hostname: EPICS Archiver Appliance hostname [default: localhost]
        port: EPICS Archiver Appliance management port [default: 17665]

    Examples:
        .. code-block:: python

            from epicsarchiver.archiver.mgmt import ArchiverMgmtOperations

            archappl = ArchiverMgmtOperations("archiver-01.tn.esss.lu.se")
            print(archappl.version)
            archappl.archive_pv("PVNAME")
    """

    # EPICS Archiver Appliance documentation of mgmt endpoints:
    # https://epicsarchiver.readthedocs.io/en/latest/developer/mgmt_scriptables.html

    def archive_pv(self, pv: str, **kwargs: Any) -> OperationResultList:
        r"""Archive a PV.

        Args:
            pv: name of the pv to be achived. Can be a comma separated
                list of names.
            **kwargs: optional extra keyword arguments -
                samplingperiod - samplingmethod - controllingPV - policy
                - appliance

        Returns:
            list of submitted PVs
        """
        params = {"pv": pv}
        # Extra keyword arguments are forwarded verbatim as query parameters.
        params.update(kwargs)
        r = self._get("/archivePV", params=params)
        return cast("OperationResultList", r.json())

    def archive_pvs(self, pvs: OperationResultList) -> OperationResultList:
        """Archive a list of PVs.

        Args:
            pvs: list of PVs (as dict) to archive

        Returns:
            list of submitted PVs
        """
        # Bulk submission uses a POST with the PV dicts as the JSON body.
        r = self._post("/archivePV", json=pvs)
        return cast("OperationResultList", r.json())

    def archive_pvs_from_files(
        self,
        files: list[str],
        appliance: str | None = None,
    ) -> OperationResultList:
        """Archive PVs from a list of files.

        Args:
            files: list of files in CSV format with PVs to archive.
            appliance: optional appliance to use to archive PVs (in a
                cluster)

        Returns:
            list of submitted PVs
        """
        pvs = archive_files.get_pvs_from_files([Path(f) for f in files], appliance)
        return self.archive_pvs(pvs)

    def pause_pv(self, pv: str) -> OperationResultList | OperationResult:
        """Pause the archiving of a PV(s).

        Args:
            pv: name of the pv. Can be a GLOB wildcards or a list of
                comma separated names.

        Returns:
            list of submitted PVs
        """
        response = self._get_or_post("/pauseArchivingPV", pv)
        # A single PV name (no comma) yields one dict; a comma-separated
        # list yields a list of dicts. The casts are static-typing only.
        if "," not in pv:
            return cast("OperationResult", response)
        return cast("OperationResultList", response)

    def resume_pv(self, pv: str) -> OperationResultList | OperationResult:
        """Resume the archiving of a PV(s).

        Args:
            pv: name of the pv. Can be a GLOB wildcards or a list of
                comma separated names.

        Returns:
            list of submitted PVs
        """
        response = self._get_or_post("/resumeArchivingPV", pv)
        # Same single-vs-list shape convention as pause_pv.
        if "," not in pv:
            return cast("OperationResult", response)
        return cast("OperationResultList", response)

    def abort_pv(self, pv: str) -> list[str]:
        """Abort any pending requests for archiving this PV.

        Args:
            pv: name of the pv.

        Returns:
            list of submitted PVs
        """
        r = self._get("/abortArchivingPV", params={"pv": pv})
        return cast("List[str]", r.json())

    def add_alias(self, pv: str, alias_name: str) -> None:
        """Add an alias to a pv.

        Args:
            pv: PV to add alias.
            alias_name: name of alias to add to pv.
        """
        r = self._get("/addAlias", params={"pv": pv, "aliasname": alias_name})
        r_json = r.json()
        if r_json["status"] != "ok":
            LOG.error("Failed to add alias, response %s", str(r_json))
            return
        LOG.debug(r_json["desc"])

    def delete_pv(
        self,
        pv: str,
        delete_data: bool = False,  # noqa: FBT002, FBT001
    ) -> list[str]:
        """Stop archiving the specified PV.

        The PV needs to be paused first.

        Args:
            pv: name of the pv.
            delete_data: delete the data that has already been recorded.
                Default to False.

        Returns:
            list of submitted PVs
        """
        # NOTE(review): the appliance docs name this query parameter
        # "deleteData" (camelCase) and expect "true"/"false"; a Python bool
        # serializes as "True"/"False" here — confirm against the server.
        r = self._get("/deletePV", params={"pv": pv, "delete_data": delete_data})
        return cast("List[str]", r.json())

    def rename_pv(self, pv: str, newname: str) -> OperationResult:
        """Rename this pv to a new name.

        The PV needs to be paused first.

        Args:
            pv (str): name of the pv.
            newname (str): new name of the pv

        Returns:
            OperationResult: Status of action and description. Example:
                {"status":"ok","desc":"Successfully renamed PV PV1 to PV2"}
        """
        r = self._get("/renamePV", params={"pv": pv, "newname": newname})
        return cast("OperationResult", r.json())

    def update_pv(
        self,
        pv: str,
        samplingperiod: float,
        samplingmethod: str | None = None,
    ) -> list[str]:
        """Change the archival parameters for a PV.

        Args:
            pv: name of the pv.
            samplingperiod: the new sampling period in seconds.
            samplingmethod: the new sampling method [SCAN|MONITOR]

        Returns:
            list of submitted PV
        """
        params = {"pv": pv, "samplingperiod": samplingperiod}
        # The sampling method is optional server-side; only send it when set.
        if samplingmethod:
            params["samplingmethod"] = samplingmethod
        r = self._get("/changeArchivalParameters", params=params)
        return cast("List[str]", r.json())

    def pause_rename_resume_pv(self, pv: str, new: str) -> None:
        """Pause, rename and resume a PV.

        Args:
            pv: name of the pv
            new: new name of the pv
        """
        # Preconditions: the source must be archived, the target must not.
        result = self.get_archiving_status(pv)
        if result != ArchivingStatus.BeingArchived:
            LOG.error("PV %s isn't being archived. Skipping.\n", pv)
            return
        result = self.get_archiving_status(new)
        if result != ArchivingStatus.NotBeingArchived:
            LOG.error("New PV %s already exists. Skipping.\n", new)
            return
        # Stop at the first failed step; later steps depend on earlier ones.
        cresult = self.pause_pv(pv)
        if not check_result(cresult, f"Error while pausing {pv}"):
            return
        cresult = self.rename_pv(pv, new)
        if not check_result(cresult, f"Error while renaming {pv} to {new}"):
            return
        cresult = self.resume_pv(new)
        if not check_result(cresult, f"Error while resuming {new}"):
            return
        LOG.debug("PV %s successfully renamed to %s", pv, new)

    def rename_pvs_from_files(self, files: list[str]) -> None:
        """Rename PVs from a list of files.

        Each PV will be paused, renamed and resumed

        Args:
            files: list of files in CSV format with PVs to rename.
        """
        pvs = archive_files.get_rename_pvs_from_files(files)
        for current, new in pvs:
            self.pause_rename_resume_pv(current, new)

    def rename_and_append(self, old: str, new: str, storage: Storage) -> None:
        """Appends the data for an older PV into a newer PV.

        The older PV is deleted and an alias mapping the older PV name to
        the new PV is added.

        Args:
            old (str): The name of the older pv.
                The data for this PV will be appended to the newer PV and then deleted.
            new (str): The name of the newer pv.
            storage (Storage): The name of the store to consolidate data
                before appending.
        """
        # Both PVs must be paused before the appliance will merge them.
        for pv in (old, new):
            status = self.get_archiving_status(pv)
            if status != ArchivingStatus.Paused:
                LOG.error("PV %s isn't paused. Skipping.\n", pv)
                return
        response = self._get(
            "/appendAndAliasPV",
            params={"olderpv": old, "newerpv": new, "storage": storage},
        )
        # Parse the response body once and reuse it for logging and checking.
        result = cast("OperationResultList", response.json())
        LOG.debug("/appendAndAliasPV response %s", result)
        if not check_result(result, f"Error while append_and_alias_pv {old}, {new}"):
            return
        LOG.debug("PV %s successfully appended and aliased to %s", old, new)

    def change_type(self, pv: str, new_type: ArchDbrType) -> None:
        """Change the type of a pv to a new type.

        Args:
            pv (str): Name of the PV
            new_type (ArchDbrType): New DBR_TYPE
        """
        LOG.info("Change type of pv %s to %s", pv, new_type)
        response = self._get(
            "/changeTypeForPV", params={"pv": pv, "newtype": new_type.name}
        )
        result = cast("OperationResultList", response.json())
        if not check_result(result, f"Error while change_type {pv}"):
            return
        LOG.debug("PV %s successfully changed type to %s", pv, new_type)

    def put_pv_type_info(
        self, pv: str, type_info: TypeInfo, put_info_type: PutInfoType
    ) -> TypeInfo:
        """Put the type info for a PV.

        Args:
            pv (str): Name of the PV
            type_info (TypeInfo): Type info
            put_info_type (PutInfoType): Whether override or create new

        Returns:
            TypeInfo: The updated type info
        """
        LOG.info("Put type info for pv %s", pv)
        params = {"pv": pv}
        # The endpoint refuses ambiguous requests: exactly one of
        # createnew/override is enabled depending on the requested mode.
        if put_info_type == PutInfoType.CreateNew:
            params["createnew"] = "true"
            params["override"] = "false"
        elif put_info_type == PutInfoType.Override:
            params["createnew"] = "false"
            params["override"] = "true"
        response = self._post(
            "/putPVTypeInfo",
            params=params,
            json=type_info,
        )
        # Parse the body once; the original parsed response.json() twice.
        result = cast("TypeInfo", response.json())
        LOG.debug("Put type info %s for pv %s", result, pv)
        return result
def check_result(
    result: "OperationResult | OperationResultList",
    default_message: "str | None" = None,
) -> bool:
    """Check a result returned by the Archiver Appliance.

    Args:
        result: A single operation result dict as returned by the mgmt
            endpoints. Lists of results are rejected (logged and False).
        default_message: Message logged on failure when the result
            carries no "validation" text. Defaults to None.

    Returns:
        bool: True if the status is "ok" (case-insensitive).
            Return False otherwise and log the validation value, the
            default_message, or a generic fallback.
    """
    if isinstance(result, list):
        LOG.error(
            "Method check_result does not support multiple PVs from result %s",
            result,
        )
        return False
    # Missing status is treated as a failure.
    status = result.get("status", "nok")
    if status.lower() != "ok":
        message = result.get("validation", default_message)
        if message is None:
            # Avoid logging a bare "None" when no detail is available.
            message = f"Archiver Appliance returned status {status!r}"
        # Lazy %-style arg so a message containing "%" cannot break logging.
        LOG.error("%s", message)
        return False
    return True