Source code for ewoksmx.tasks.mx_ccp4.drac_dataset_task

import logging
from abc import abstractmethod
from typing import Any
from typing import Optional

import requests
from pyicat_plus.client.main import IcatClient

from . import utils
from .catalogued_dataset_task import CataloguedDatasetTask
from .drac_dataset_task_inputs import DracDatasetTaskInputs

logger = logging.getLogger(__name__)

# Module-level constants: default endpoints used when creating the ICAT client.
# "host:port" string passed to IcatClient as its (single) ``metadata_urls`` entry.
DEFAULT_INGESTER_URL = "bcu-mq-01:61613"
# Base URL passed to IcatClient as ``icatplus_restricted_url``.
DEFAULT_ICATPLUS_URL = "https://icatplus.esrf.fr"


class DracDatasetTask(
    CataloguedDatasetTask,
    input_model=DracDatasetTaskInputs,
):
    """Task conceived to be run on a single RAW dataset.

    It manages the output destination (through ``utils.FolderManager``), the
    ICAT client used to authenticate and authorize the caller, and the upload
    of the produced folder to the DRAC catalogue.
    """

    @abstractmethod
    def get_task_name(self):
        """Return the task name.

        The task name is used as the folder name in the PROCESSED_DATA folder,
        and as the dataset / workflow name when uploading to the catalogue.

        Example: /data/visitor/PROCESSED_DATA/sample/dataset/{task_name}
        """
        pass

    @abstractmethod
    def is_single_run(self):
        """Return True if the same task can be run only once."""
        pass

    def is_upload_to_catalogue_enabled(self):
        """Return True if the produced data should be uploaded to the catalogue.

        Always True in this base class.
        """
        return True

    def _set_status(self, status):
        """Report ``status`` through the callback as ``{"status": status}``."""
        self._log({"status": status})

    def _log(self, payload, timeout=10.0):
        """Send a JSON payload to the callback URL, if one is configured.

        This is best-effort: every failure is logged and swallowed so that a
        broken callback never interrupts the task.

        Args:
            payload (dict): The JSON-serializable payload to send.
            timeout (float): Seconds to wait for the callback server. Without
                a timeout ``requests`` can block indefinitely on a stalled
                server, hanging the whole task.
        """
        logger.debug(payload)
        if not self.callback:
            return
        try:
            response = requests.put(self.callback, json=payload, timeout=timeout)
            response.raise_for_status()
        except Exception as ex:
            # Deliberately broad: reporting must never make the task fail.
            logger.error("Logging request failed: %s", ex)

    def process(self):
        """Processing entry point; a no-op here (presumably overridden by subclasses)."""
        pass

    def extract_metadata_from(self, datasets, raw_data_path, output_folder):
        """Build the "administrative" metadata needed to upload to DRAC.

        Only the first element of ``datasets`` is read.

        Args:
            datasets (list[dict]): ICAT datasets. The first one must provide
                ``investigation.investigationInstruments[0].instrument.name``,
                ``investigation.name`` and ``sampleName``.
            raw_data_path: Location of the RAW data; only ``raw_data_path[0]``
                is used.
            output_folder (str): Folder holding the processed results.

        Returns:
            dict: With keys ``beamline``, ``proposal``, ``dataset``, ``path``,
            ``raw``, ``metadata`` (empty dict) and ``Sample_name``.

        Raises:
            RuntimeError: If ``datasets`` is None or empty.
            KeyError, IndexError: If the first dataset lacks an expected field.
        """
        if datasets is None:
            raise RuntimeError("Datasets can not be None")
        if len(datasets) == 0:
            raise RuntimeError("No datasets provided")
        first_dataset = datasets[0]
        investigation = first_dataset["investigation"]
        return {
            "beamline": investigation["investigationInstruments"][0]["instrument"][
                "name"
            ],
            "proposal": investigation["name"],
            "dataset": self.get_task_name(),
            "path": output_folder,
            # NOTE(review): assumes raw_data_path is a sequence of paths. If it
            # is a plain string this keeps only its first character — confirm.
            "raw": raw_data_path[0],
            "metadata": {},
            "Sample_name": first_dataset["sampleName"],
        }

    def are_roles_valid(self):
        """Check that the user has the role matching the workflow keywords.

        Not implemented yet: always returns True.
        """
        return True

    def is_in_validity_days_range(self):
        """Check that the run is within the validity days of the workflow.

        Not implemented yet: always returns True.
        """
        return True

    def _get_icat_client(self, token: Optional[str]) -> IcatClient:
        """Lazily create and return the ICAT client.

        ``token`` is only used when the client is first created. Later calls
        return the cached client regardless of ``token``. Reset
        ``self.icatClient`` to None to force a new session.

        Args:
            token (Optional[str]): ICAT session ID used for authentication.

        Returns:
            IcatClient: The cached or newly created client.
        """
        if self.icatClient is None:
            self.icatClient = IcatClient(
                icat_session_id=token,
                metadata_urls=[DEFAULT_INGESTER_URL],
                icatplus_restricted_url=DEFAULT_ICATPLUS_URL,
            )
        return self.icatClient

    def upload_to_catalogue(self):
        """Register ``self.output_folder`` as a processed dataset in DRAC.

        Adds ``input_datasets``, ``Sample_name``, ``Workflow_name`` and
        ``Process_triggering`` to ``self.metadata`` (mutated in place). It then
        reports the upload parameters through the callback and calls
        ``store_dataset`` on the ICAT client.

        ``self.icatClient`` must already exist (for example, created by
        ``authenticate``); it is not created here.
        """
        upload_parameters = self.extract_metadata_from(
            self.datasets, self.raw_data_path, self.output_folder
        )
        self.metadata["input_datasets"] = [upload_parameters["raw"]]
        self.metadata["Sample_name"] = upload_parameters["Sample_name"]
        self.metadata["Workflow_name"] = self.get_task_name()
        self.metadata["Process_triggering"] = "MANUAL"
        self._log(
            {
                "logs": {
                    "upload_parameters": upload_parameters,
                    "metadata": self.metadata,
                }
            }
        )
        self.icatClient.store_dataset(
            beamline=upload_parameters["beamline"],
            proposal=upload_parameters["proposal"],
            dataset=upload_parameters["dataset"],
            path=upload_parameters["path"],
            metadata=self.metadata,
        )

    def authenticate(self, sessionId) -> bool:
        """Validate the user session identified by ``sessionId``.

        A fresh ICAT client is created for ``sessionId``. Its session
        information is stored on the task as ``full_name``, ``user_name``
        and ``lifeTimeMinutes``. The user's role is not checked here
        (see ``are_roles_valid``).

        Args:
            sessionId (str): The ICAT session ID to validate.

        Returns:
            bool: True when the session is valid.

        Raises:
            RuntimeError: If the session has expired (lifeTimeMinutes < 0).
            Exception: Any error from the ICAT client is logged and re-raised.
        """
        try:
            # Drop any cached client. _get_icat_client only uses the token when
            # it creates a client, so this forces the new sessionId to be used.
            self.icatClient = None
            session = self._get_icat_client(sessionId).get_session_information()
            user = session.get("user", {})
            self.full_name = user.get("fullName", "Unknown")
            # NOTE(review): userName is read from the session root, while
            # fullName / lifeTimeMinutes come from session["user"] — confirm.
            self.user_name = session.get("userName", "Unknown")
            self.lifeTimeMinutes = user.get("lifeTimeMinutes", 0)
            logger.debug("Processing launched by user_name=%s", self.user_name)
            logger.debug("Logged in. lifeTimeMinutes=%s", self.lifeTimeMinutes)
            if self.lifeTimeMinutes < 0:
                raise RuntimeError("Session expired")
            return True
        except Exception as e:
            logger.error("Authentication failed: %s", e)
            raise

    def authorize(self, sessionId, datasets):
        """Check that the user may process ``datasets``.

        Checks performed:
            1) at least one dataset is provided;
            2) ICAT returns as many datasets as were requested, i.e. the user
               has access to all of them;
            3) the run is within the validity days
               (``is_in_validity_days_range``).

        Verifying that raw_data_path is the location of one of these datasets
        is not implemented yet.

        Args:
            sessionId (str): ICAT session ID. It is only used if no ICAT client
                is cached yet (see ``_get_icat_client``).
            datasets (list[dict]): Input datasets; each needs an ``id`` key.

        Raises:
            RuntimeError: If any check fails. Other errors (ICAT client
                failures, a dataset without ``id``) are logged and re-raised.
        """
        try:
            dataset_ids = [dataset["id"] for dataset in datasets]
            self._log({"logs": {"dataset_ids": str(dataset_ids)}})
            if not dataset_ids:
                raise RuntimeError("No input datasets provided")
            authorized_datasets = self._get_icat_client(sessionId).get_datasets_by(
                dataset_ids=",".join(map(str, dataset_ids))
            )
            # Presumably ICAT only returns datasets this session can access, so
            # a count mismatch means some input datasets are not authorized.
            if len(authorized_datasets) != len(datasets):
                raise RuntimeError(
                    "The number of authorized datasets does not correspond to the input datasets"
                )
            # TODO: check that raw_data_path is the location of one of the
            # authorized datasets (previously drafted, currently disabled).
            if not self.is_in_validity_days_range():
                raise RuntimeError("dataset is not in the validity days range")
        except Exception as e:
            logger.error("Authorization failed: %s", e)
            raise

    def calculate_output_folder(self):
        """Compute the output folder from the RAW data path and create it.

        Initializes ``self.folderManager`` with the task's run mode and name,
        asks it for the output folder of ``self.raw_data_path``, and makes sure
        that folder exists.

        Returns:
            str: The path to the output folder.
        """
        self.folderManager = utils.FolderManager(
            is_single_run=self.is_single_run(), task_name=self.get_task_name()
        )
        output_folder = self.folderManager.get_output_folder(self.raw_data_path)
        self.folderManager.create_folder_if_not_exists(output_folder)
        return output_folder