Source code for ewoksmx.tasks.mx_ccp4.drac_dataset_task
import logging
from abc import abstractmethod
from typing import Any
from typing import Optional
import requests
from pyicat_plus.client.main import IcatClient
from . import utils
from .catalogued_dataset_task import CataloguedDatasetTask
from .drac_dataset_task_inputs import DracDatasetTaskInputs
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Defaults handed to IcatClient by DracDatasetTask._get_icat_client:
# ingester endpoint, passed as the only entry of `metadata_urls`.
DEFAULT_INGESTER_URL = "bcu-mq-01:61613"
# ICAT+ server, passed as `icatplus_restricted_url`.
DEFAULT_ICATPLUS_URL = "https://icatplus.esrf.fr"
[docs]
class DracDatasetTask(
CataloguedDatasetTask,
input_model=DracDatasetTaskInputs,
):
"""
Task designed to be run on a single RAW dataset.
It manages the output destination and the initialization (session
authentication and dataset authorization), and can upload the results
to the catalogue.
"""
[docs]
@abstractmethod
def get_task_name(self):
    """
    Return the name identifying this task.

    The name is used as the folder name in the PROCESSED_FOLDER.
    Example: /data/visitor/PROCESSED_DATA/sample/dataset/{task_name}

    Declared abstract: concrete tasks are expected to override it.
    This base body returns None.
    """
    return None
[docs]
@abstractmethod
def is_single_run(self):
    """
    Tell whether the same task can be run only once.

    Declared abstract: concrete tasks are expected to override it and
    return True when the task must not be run more than once.
    This base body returns None.
    """
    return None
[docs]
def is_upload_to_catalogue_enabled(self):
    """
    Tell whether the data will be uploaded to the catalogue.

    The base implementation always answers True.
    """
    upload_enabled = True
    return upload_enabled
def _set_status(self, status):
    """
    Report a status change through ``_log``.

    Args:
        status: Value sent under the "status" key of the log payload.
    """
    status_payload = {"status": status}
    self._log(status_payload)
def _log(self, payload):
    """
    Sends a log payload to the callback URL if provided.

    The payload is always written to the module logger at DEBUG level.
    When ``self.callback`` is set, the payload is additionally sent to it
    as JSON with an HTTP PUT. Delivery is best effort: any failure is
    logged as an error and swallowed, so a broken callback never
    interrupts the task.

    Args:
        payload (dict): The JSON payload to send.
    """
    logger.debug(payload)
    if not self.callback:
        return
    # Seconds to wait for the callback server. Without a timeout, requests
    # blocks indefinitely on an unresponsive server and would hang the task.
    callback_timeout = 10
    try:
        response = requests.put(
            self.callback, json=payload, timeout=callback_timeout
        )
        response.raise_for_status()
    except Exception as ex:
        # Deliberately broad: logging must never make the task fail.
        logger.error(f"Logging request failed: {ex}")
[docs]
def process(self):
    """
    Processing hook.

    The base implementation does nothing and returns None.
    """
    return None
[docs]
def are_roles_valid(self):
    """
    Check that the user has the role that corresponds with the keywords
    of the workflow.

    The base implementation performs no check and always returns True.
    """
    roles_ok = True
    return roles_ok
[docs]
def is_in_validity_days_range(self):
    """
    Check that the run falls within the validity days of the workflow.

    The base implementation performs no check and always returns True.
    ``authorize`` raises when this method returns a falsy value.
    """
    within_range = True
    return within_range
def _get_icat_client(self, token: Optional[str]) -> Any:
    """
    Return the ICAT client, creating it on first use.

    When ``self.icatClient`` is already set, it is returned as is and
    ``token`` is ignored. Otherwise a new ``IcatClient`` is built with the
    given token and the module default URLs, stored on the instance and
    returned.

    Args:
        token (Optional[str]): The ICAT session ID token used for authentication.

    Returns:
        Any: The ICAT client instance stored in ``self.icatClient``.
    """
    # Reuse the cached client when there is one
    if self.icatClient is not None:
        return self.icatClient
    self.icatClient = IcatClient(
        icat_session_id=token,
        metadata_urls=[DEFAULT_INGESTER_URL],
        icatplus_restricted_url=DEFAULT_ICATPLUS_URL,
    )
    return self.icatClient
[docs]
def upload_to_catalogue(self):
    """
    Store the processed dataset in the catalogue.

    The upload parameters are obtained from ``extract_metadata_from`` and
    ``self.metadata`` is completed with the input dataset, the sample name,
    the workflow name (``get_task_name``) and a "MANUAL" triggering flag.
    Both are sent to ``_log`` before ``store_dataset`` is called on
    ``self.icatClient``, which is used as is and not created here.
    """
    # Gather the metadata describing the dataset
    details = self.extract_metadata_from(
        self.datasets, self.raw_data_path, self.output_folder
    )

    self.metadata["input_datasets"] = [details["raw"]]
    self.metadata["Sample_name"] = details["Sample_name"]
    self.metadata["Workflow_name"] = self.get_task_name()
    self.metadata["Process_triggering"] = "MANUAL"

    log_entry = {"upload_parameters": details, "metadata": self.metadata}
    self._log({"logs": log_entry})

    store = self.icatClient.store_dataset
    store(
        beamline=details["beamline"],
        proposal=details["proposal"],
        dataset=details["dataset"],
        path=details["path"],
        metadata=self.metadata,
    )
[docs]
def authenticate(self, sessionId) -> bool:
    """
    Validates the user session based on the provided session ID.

    Any cached ICAT client is discarded first, so the session information
    is fetched with a client built for ``sessionId``. The values read from
    the session are stored on the instance as ``full_name``, ``user_name``
    and ``lifeTimeMinutes``.

    NOTE(review): the user's role is not validated here
    (``are_roles_valid`` is never called) — confirm the caller does it.

    Args:
        sessionId (str): The session ID to validate.

    Returns:
        bool: True when the session is valid.

    Raises:
        RuntimeError: If the session has expired (lifeTimeMinutes < 0).
        Exception: Any other error raised while fetching or reading the
            session information is logged and re-raised unchanged.
    """
    try:
        # _get_icat_client only builds a client when none is cached, so
        # reset the cache to make sure this session ID is the one used.
        self.icatClient = None
        session = self._get_icat_client(sessionId).get_session_information()
        user = session.get("user", {})
        self.full_name = user.get("fullName", "Unknown")
        self.user_name = session.get("userName", "Unknown")
        self.lifeTimeMinutes = user.get("lifeTimeMinutes", 0)
        logger.debug("Processing launched by user_name=%s", self.user_name)
        logger.debug("Logged in. lifeTimeMinutes=%s", self.lifeTimeMinutes)
        if self.lifeTimeMinutes < 0:
            raise RuntimeError("Session expired")
        return True
    except Exception as e:
        logger.error("Authentication failed: %s", e)
        raise
[docs]
def authorize(self, sessionId, datasets):
    """
    Checks that the user may process the given datasets:
    1) at least one dataset is provided
    2) the catalogue returns as many datasets as were requested, i.e. the
       user has access to all of them
    3) the run is within the validity days (``is_in_validity_days_range``)

    Args:
        sessionId (str): The session ID used to query the catalogue.
        datasets (list): Datasets to check; each one is indexed by "id".

    Raises:
        RuntimeError: If no dataset is provided, if the number of
            authorized datasets differs from the number of input datasets,
            or if the run is outside the validity days range.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        requested_ids = [entry["id"] for entry in datasets]
        self._log({"logs": {"dataset_ids": str(requested_ids)}})

        # At least one dataset has to be provided
        if not requested_ids:
            raise RuntimeError("No input datasets provided")

        fetch_datasets = self._get_icat_client(sessionId).get_datasets_by
        granted = fetch_datasets(
            dataset_ids=",".join(str(dataset_id) for dataset_id in requested_ids)
        )

        # Every requested dataset must come back from the catalogue
        if len(granted) != len(datasets):
            raise RuntimeError(
                "The number of authorized datasets does not correspond to the input datasets"
            )

        # NOTE: checking that raw_data_path is one of the authorized
        # dataset locations is currently not performed.

        if not self.is_in_validity_days_range():
            raise RuntimeError("dataset is not in the validity days range")
    except Exception as e:
        logger.error(f"Authorization failed: {e}")
        raise
[docs]
def calculate_output_folder(self):
    """
    Compute the output folder for this task and return its path.

    A ``utils.FolderManager`` is created from ``is_single_run()`` and
    ``get_task_name()`` and stored in ``self.folderManager``. The manager
    derives the output folder from ``self.raw_data_path`` and is asked to
    create it if it does not exist.

    Returns:
        The output folder as returned by ``FolderManager.get_output_folder``.
    """
    # Build the manager from the run mode and the task name
    manager = utils.FolderManager(
        is_single_run=self.is_single_run(), task_name=self.get_task_name()
    )
    self.folderManager = manager

    # Resolve the destination, then make sure it is there
    destination = manager.get_output_folder(self.raw_data_path)
    manager.create_folder_if_not_exists(destination)
    return destination