Source code for ewoksmx.tasks.mx_ccp4.mx_ccp4_cloud_uploader
import re
import subprocess
from ewoksmx.bin import resource_filename
from .drac_dataset_task import DracDatasetTask
from .mx_ccp4_cloud_uploader_inputs import MXCCP4CloudUploaderInputs
[docs]
def create_ccp4_cloud_id(dataset):
    """
    Build a sanitized CCP4 Cloud ID from a dataset's sample and dataset names.

    The ID has the form ``<sampleName>_<name>``, lower-cased, with every
    character outside ``a-z``, ``0-9``, ``-`` and ``_`` replaced by ``_``,
    runs of underscores collapsed and leading/trailing underscores removed.

    Note that the result is derived purely from the two names: two datasets
    sharing the same sample and dataset name get the same ID.

    :param dataset: mapping that may provide the keys ``"name"`` and
        ``"sampleName"``; values are converted with ``str()``.
    :returns: the sanitized ID, never an empty string.
    """

    def _name_or_unknown(key):
        # A missing key, an explicit None and an empty value all fall back to
        # "unknown"; str(None) would otherwise leak a literal "None" into the ID.
        value = dataset.get(key)
        text = "" if value is None else str(value)
        return text or "unknown"

    sample_name = _name_or_unknown("sampleName")
    dataset_name = _name_or_unknown("name")

    raw_id = f"{sample_name}_{dataset_name}".lower()

    # Replace anything that is not alphanumeric, dash or underscore with '_'.
    sanitized_id = re.sub(r"[^a-z0-9_-]", "_", raw_id)
    # Collapse consecutive underscores, e.g. "sample__data" -> "sample_data".
    sanitized_id = re.sub(r"_+", "_", sanitized_id)
    sanitized_id = sanitized_id.strip("_")

    # Names made only of disallowed characters sanitize down to nothing; use
    # the same value as when both names are missing rather than an empty ID.
    return sanitized_id or "unknown_unknown"
# Implement a workflow task
[docs]
class MXCCP4CloudUploader(
    DracDatasetTask,
    input_model=MXCCP4CloudUploaderInputs,
):
    """
    Workflow task that uploads the folder of a single dataset to CCP4 Cloud.

    The upload is delegated to the ``dl_client.js`` script (located through
    ``resource_filename``), which is executed with ``node``.
    """

    def get_task_name(self):
        """Return the name of this task."""
        return "MXCCP4CloudUploader"

    def is_single_run(self):
        """
        Return ``False``.

        NOTE(review): the exact meaning of this flag is defined by
        ``DracDatasetTask`` - confirm against the base class.
        """
        return False

    def is_upload_to_catalogue_enabled(self):
        """
        Return ``False``.

        NOTE(review): presumably disables uploading results to the data
        catalogue - confirm against ``DracDatasetTask``.
        """
        return False

    def process(self, datasets, raw_dataset_folder):
        """
        Upload the folder of the given dataset to CCP4 Cloud.

        Nothing is done unless ``datasets`` contains exactly one dataset.

        :param datasets: sequence of dataset mappings, or ``None``. The single
            dataset must provide a ``"location"`` entry, which is the folder
            handed to the upload client.
        :param raw_dataset_folder: not used by this task.
        :raises subprocess.CalledProcessError: when the upload client exits
            with a non-zero status.
        """
        # Only the single-dataset case is handled; anything else is skipped.
        if datasets is None or len(datasets) != 1:
            return

        dataset = datasets[0]
        # Create an ID for CCP4 Cloud from the dataset
        ccp4_cloud_id = create_ccp4_cloud_id(dataset)
        # We could imagine making more checks here, about size for instance
        single_raw_folder = dataset["location"]
        self._log(
            {
                "logs": {
                    "msg": "About to upload",
                    "folder": single_raw_folder,
                }
            }
        )

        command = [
            "node",
            resource_filename("dl_client.js"),
            "--url",
            "https://data.cloud.ccp4.ac.uk/api",
            "--user",
            self.get_input_value("user", None),
            "--cloudrun_id",
            self.get_input_value("cloudrun_id", None),
            "--source",
            "upload",
            "--id",
            ccp4_cloud_id,
            "upload",
            "--",
            single_raw_folder,
        ]

        # Blocking execution
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as error:
            # The client's output is captured, so without this it would be
            # lost on failure: the exception message only carries the command
            # and the exit status.
            self._log(
                {
                    "logs": {
                        "msg": "Upload failed",
                        "returncode": error.returncode,
                        "stdout": error.stdout,
                        "stderr": error.stderr,
                    }
                }
            )
            raise

        self._log(
            {
                "logs": {
                    "result": str(result),
                }
            }
        )