Source code for epicsarchiver.mgmt.archiver_mgmt_operations

"""Archiver Mgmt operations module."""

from __future__ import annotations

import logging
from enum import Enum, auto
from pathlib import Path
from typing import TYPE_CHECKING, Any, Collection, Dict, List, cast

from epicsarchiver.mgmt import archive_files
from epicsarchiver.mgmt.archiver_mgmt_info import (
    ArchiverMgmtInfo,
    ArchivingStatus,
)

if TYPE_CHECKING:
    from epicsarchiver.common import ArchDbrType

[docs] LOG: logging.Logger = logging.getLogger(__name__)
[docs] class Storage(str, Enum): """Represents the different storage levels of the archiver appliance."""
[docs] STS = "STS"
[docs] MTS = "MTS"
[docs] LTS = "LTS"
[docs] class PutInfoType(Enum): """Represents the different types of put type info."""
[docs] Override = auto()
[docs] CreateNew = auto()
[docs] TypeInfo = Dict[str, Collection[str]]
[docs] OperationResult = Dict[str, str]
[docs] OperationResultList = List[OperationResult]
[docs] class ArchiverMgmtOperations(ArchiverMgmtInfo): """Mgmt Operations EPICS Archiver Appliance client. Hold a session to the Archiver Appliance web application and use the mgmt interface. Args: hostname: EPICS Archiver Appliance hostname [default: localhost] port: EPICS Archiver Appliance management port [default: 17665] Examples: .. code-block:: python from epicsarchiver.archiver.mgmt import ArchiverMgmtOperations archappl = ArchiverMgmtOperations("archiver-01.tn.esss.lu.se") print(archappl.version) archappl.archive_pv("PVNAME") """ # EPICS Archiver Appliance documentation of mgmt endpoints: # https://epicsarchiver.readthedocs.io/en/latest/developer/mgmt_scriptables.html
[docs] def archive_pv(self, pv: str, **kwargs: Any) -> OperationResultList: r"""Archive a PV. Args: pv: name of the pv to be achived. Can be a comma separated list of names. **kwargs: optional extra keyword arguments - samplingperiod - samplingmethod - controllingPV - policy - appliance Returns: list of submitted PVs """ params = {"pv": pv} params.update(kwargs) r = self._get("/archivePV", params=params) return cast("OperationResultList", r.json())
[docs] def archive_pvs(self, pvs: OperationResultList) -> OperationResultList: """Archive a list of PVs. Args: pvs: list of PVs (as dict) to archive Returns: list of submitted PVs """ r = self._post("/archivePV", json=pvs) return cast("OperationResultList", r.json())
[docs] def archive_pvs_from_files( self, files: list[str], appliance: str | None = None, ) -> OperationResultList: """Archive PVs from a list of files. Args: files: list of files in CSV format with PVs to archive. appliance: optional appliance to use to archive PVs (in a cluster) Returns: list of submitted PVs """ pvs = archive_files.get_pvs_from_files([Path(f) for f in files], appliance) return self.archive_pvs(pvs)
[docs] def pause_pv(self, pv: str) -> OperationResultList | OperationResult: """Pause the archiving of a PV(s). Args: pv: name of the pv. Can be a GLOB wildcards or a list of comma separated names. Returns: list of submitted PVs """ response = self._get_or_post("/pauseArchivingPV", pv) if "," not in pv: return cast("OperationResult", response) return cast("OperationResultList", response)
[docs] def resume_pv(self, pv: str) -> OperationResultList | OperationResult: """Resume the archiving of a PV(s). Args: pv: name of the pv. Can be a GLOB wildcards or a list of comma separated names. Returns: list of submitted PVs """ response = self._get_or_post("/resumeArchivingPV", pv) if "," not in pv: return cast("OperationResult", response) return cast("OperationResultList", response)
[docs] def abort_pv(self, pv: str) -> list[str]: """Abort any pending requests for archiving this PV. Args: pv: name of the pv. Returns: list of submitted PVs """ r = self._get("/abortArchivingPV", params={"pv": pv}) return cast("List[str]", r.json())
[docs] def add_alias(self, pv: str, alias_name: str) -> None: """Add an alias to a pv. Args: pv: PV to add alias. alias_name: name of alias to add to pv. """ r = self._get("/addAlias", params={"pv": pv, "aliasname": alias_name}) r_json = r.json() if r_json["status"] != "ok": LOG.error("Failed to add alias, response %s", str(r_json)) return LOG.debug(r_json["desc"])
[docs] def delete_pv( self, pv: str, delete_data: bool = False, # noqa: FBT002, FBT001 ) -> list[str]: """Stop archiving the specified PV. The PV needs to be paused first. Args: pv: name of the pv. delete_data: delete the data that has already been recorded. Default to False. Returns: list of submitted PVs """ r = self._get("/deletePV", params={"pv": pv, "delete_data": delete_data}) return cast("List[str]", r.json())
[docs] def rename_pv(self, pv: str, newname: str) -> OperationResult: """Rename this pv to a new name. The PV needs to be paused first. Args: pv (str): name of the pv. newname (str): new name of the pv Returns: OperationResult: Status of action and description. Example: {"status":"ok","desc":"Successfully renamed PV PV1 to PV2"} """ r = self._get("/renamePV", params={"pv": pv, "newname": newname}) return cast("OperationResult", r.json())
[docs] def update_pv( self, pv: str, samplingperiod: float, samplingmethod: str | None = None, ) -> list[str]: """Change the archival parameters for a PV. Args: pv: name of the pv. samplingperiod: the new sampling period in seconds. samplingmethod: the new sampling method [SCAN|MONITOR] Returns: list of submitted PV """ params = {"pv": pv, "samplingperiod": samplingperiod} if samplingmethod: params["samplingmethod"] = samplingmethod r = self._get("/changeArchivalParameters", params=params) return cast("List[str]", r.json())
[docs] def pause_rename_resume_pv(self, pv: str, new: str) -> None: """Pause, rename and resume a PV. Args: pv: name of the pv new: new name of the pv """ result = self.get_archiving_status(pv) if result != ArchivingStatus.BeingArchived: LOG.error("PV %s isn't being archived. Skipping.\n", pv) return result = self.get_archiving_status(new) if result != ArchivingStatus.NotBeingArchived: LOG.error("New PV %s already exists. Skipping.\n", new) return cresult = self.pause_pv(pv) if not check_result(cresult, f"Error while pausing {pv}"): return cresult = self.rename_pv(pv, new) if not check_result(cresult, f"Error while renaming {pv} to {new}"): return cresult = self.resume_pv(new) if not check_result(cresult, f"Error while resuming {new}"): return LOG.debug("PV %s successfully renamed to %s", pv, new)
[docs] def rename_pvs_from_files(self, files: list[str]) -> None: """Rename PVs from a list of files. Each PV will be paused, renamed and resumed Args: files: list of files in CSV format with PVs to rename. """ pvs = archive_files.get_rename_pvs_from_files(files) for current, new in pvs: self.pause_rename_resume_pv(current, new)
[docs] def rename_and_append(self, old: str, new: str, storage: Storage) -> None: """Appends the data for an older PV into a newer PV. The older PV is deleted and an alias mapping the older PV name to the new PV is added. Args: old (str): The name of the older pv. The data for this PV will be appended to the newer PV and then deleted. new (str): The name of the newer pv. storage (Storage): The name of the store to consolidate data before appending. """ pvs = [old, new] for pv in pvs: status = self.get_archiving_status(pv) if status != ArchivingStatus.Paused: LOG.error("PV %s isn't paused. Skipping.\n", pv) return response = self._get( "/appendAndAliasPV", params={"olderpv": old, "newerpv": new, "storage": storage}, ) LOG.debug("/appendAndAliasPV response %s", response.json()) result = cast("OperationResultList", response.json()) if not check_result(result, f"Error while append_and_alias_pv {old}, {new}"): return LOG.debug("PV %s successfully appended and aliased to %s", old, new)
[docs] def change_type(self, pv: str, new_type: ArchDbrType) -> None: """Change the type of a pv to a new type. Args: pv (str): Name of the PV new_type (ArchDbrType): New DBR_TYPE """ LOG.info("Change type of pv %s to %s", pv, new_type) response = self._get( "/changeTypeForPV", params={"pv": pv, "newtype": new_type.name} ) result = cast("OperationResultList", response.json()) if not check_result(result, f"Error while change_type {pv}"): return LOG.debug("PV %s successfully changed type to %s", pv, new_type)
[docs] def put_pv_type_info( self, pv: str, type_info: TypeInfo, put_info_type: PutInfoType ) -> TypeInfo: """Put the type info for a PV. Args: pv (str): Name of the PV type_info (InfoResult): Type info put_info_type (PutInfoType): Whether override or create new Returns: OperationResult: The updated type info """ LOG.info("Put type info for pv %s", pv) params = {"pv": pv} if put_info_type == PutInfoType.CreateNew: params["createnew"] = "true" params["override"] = "false" elif put_info_type == PutInfoType.Override: params["createnew"] = "false" params["override"] = "true" response = self._post( "/putPVTypeInfo", params=params, json=type_info, ) result = cast("TypeInfo", response.json()) LOG.debug("Put type info %s for pv %s", result, pv) return cast("TypeInfo", response.json())
[docs] def check_result( result: OperationResult | OperationResultList, default_message: str | None = None, ) -> bool: """Check a result returned by the Archiver Appliance. Args: result (OperationResult | OperationResultList): Input result type default_message (str | None, optional): Message for the user. Defaults to None. Returns: bool: Return True if the status is ok Return False otherwise and print the default_message or validation value """ if isinstance(result, list): LOG.error( "Method check_result does not support multiple PVs from result %s", result, ) return False status = result.get("status", "nok") if status.lower() != "ok": message = result.get("validation", default_message) LOG.error(message) return False return True