Source code for ewoksmx.tasks.mx_ccp4.mx_ccp4_cloud_uploader

import re
import subprocess

from ewoksmx.bin import resource_filename

from .drac_dataset_task import DracDatasetTask
from .mx_ccp4_cloud_uploader_inputs import MXCCP4CloudUploaderInputs


def create_ccp4_cloud_id(dataset: dict) -> str:
    """Create a sanitized CCP4 Cloud ID from dataset information.

    The ID is built as ``<sampleName>_<name>``, lower-cased, with every
    character outside ``a-z``, ``0-9``, ``_`` and ``-`` replaced by an
    underscore. Runs of underscores are collapsed and leading/trailing
    underscores are removed.

    Args:
        dataset: Mapping that may carry the keys ``"name"`` and
            ``"sampleName"``. A key that is missing, ``None`` or an empty
            string is replaced by ``"unknown"``.

    Returns:
        The sanitized ID. It is never empty: if sanitizing removes every
        character, ``"unknown"`` is returned instead.
    """
    parts = []
    for key in ("sampleName", "name"):
        value = dataset.get(key)
        # A key that is present but set to None must count as missing;
        # str(None) would otherwise leak the literal text "None" into the ID.
        text = "" if value is None else str(value)
        parts.append(text or "unknown")

    raw_id = "_".join(parts).lower()

    # Replace anything that is not alphanumeric, dash or underscore.
    sanitized_id = re.sub(r"[^a-z0-9_-]", "_", raw_id)
    # Collapse runs of underscores, e.g. "sample__data" -> "sample_data".
    sanitized_id = re.sub(r"_+", "_", sanitized_id)
    sanitized_id = sanitized_id.strip("_")

    # Names made only of rejected characters sanitize to nothing; an empty
    # ID is not usable, so fall back to a placeholder.
    return sanitized_id or "unknown"
# Implement a workflow task
class MXCCP4CloudUploader(
    DracDatasetTask,
    input_model=MXCCP4CloudUploaderInputs,
):
    """Workflow task that uploads one dataset folder to CCP4 Cloud.

    The upload itself is delegated to the bundled ``dl_client.js`` script,
    which is executed with ``node`` as a blocking subprocess.
    """

    def get_task_name(self):
        """Return the fixed name of this task, ``"MXCCP4CloudUploader"``."""
        return "MXCCP4CloudUploader"

    def is_single_run(self):
        """Return ``False``.

        NOTE(review): presumably consulted by ``DracDatasetTask``; the exact
        meaning of the flag is defined there -- confirm.
        """
        return False

    def is_upload_to_catalogue_enabled(self):
        """Return ``False``.

        NOTE(review): presumably disables a catalogue upload step of the
        base class -- confirm against ``DracDatasetTask``.
        """
        return False

    def process(self, datasets, raw_dataset_folder):
        """Upload the folder of a single dataset to CCP4 Cloud.

        Nothing happens unless ``datasets`` holds exactly one entry.

        Args:
            datasets: Sequence of dataset mappings, or ``None``. The single
                entry must provide the key ``"location"`` (folder to upload);
                ``"name"`` and ``"sampleName"`` are used to build the ID.
            raw_dataset_folder: Not used by this implementation.

        Raises:
            subprocess.CalledProcessError: If the ``node`` client exits with
                a non-zero status. Its return code, stdout and stderr are
                logged before the exception is re-raised.
        """
        # Only the exactly-one-dataset case is handled.
        if datasets is None or len(datasets) != 1:
            return

        dataset = datasets[0]
        ccp4_cloud_id = create_ccp4_cloud_id(dataset)
        single_raw_folder = dataset["location"]

        self._log(
            {
                "logs": {
                    "msg": "About to upload",
                    "folder": single_raw_folder,
                }
            }
        )

        # NOTE(review): both inputs default to None here; a None entry in the
        # argument list makes subprocess.run raise TypeError. Presumably the
        # input model makes them mandatory -- confirm.
        command = [
            "node",
            resource_filename("dl_client.js"),
            "--url",
            "https://data.cloud.ccp4.ac.uk/api",
            "--user",
            self.get_input_value("user", None),
            "--cloudrun_id",
            self.get_input_value("cloudrun_id", None),
            "--source",
            "upload",
            "--id",
            ccp4_cloud_id,
            "upload",
            "--",
            single_raw_folder,
        ]

        try:
            # Blocking execution.
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as error:
            # The exception message does not contain the captured output, so
            # record it here; otherwise a failed upload leaves no diagnostics.
            # The command line is deliberately left out of this entry.
            self._log(
                {
                    "logs": {
                        "msg": "Upload failed",
                        "folder": single_raw_folder,
                        "returncode": error.returncode,
                        "stdout": error.stdout,
                        "stderr": error.stderr,
                    }
                }
            )
            raise

        # NOTE(review): str(result) embeds the full argument list, including
        # the cloudrun_id value -- confirm that this is acceptable to log.
        self._log(
            {
                "logs": {
                    "result": str(result),
                }
            }
        )