Source code for ewoksmx.tasks.MXPipelineInput
import json
import pathlib
import re
from typing import Optional
from typing import Tuple
from edna2.utils import UtilsPath
from edna2.utils import UtilsSymmetry
from .base_tasks.icat_task import IcatCallbackTask
[docs]
class MXPipelineInput(
    IcatCallbackTask,
    input_names=["raw_data_path"],
    optional_input_names=[
        "mx_pipeline_name",
        "beamline",
        "icat_metadata",
        "forced_spacegroup",
        "forced_cell",
        "start_image",
        "end_image",
        "anomalous",
        "low_res_limit",
        "high_res_limit",
        "callback",
        "exclude_range",
    ],
    output_names=[
        "metadata",
        "EDNA_proc",
        "autoPROC",
        "XIA2_DIALS",
        "grenades_fastproc",
        "grenades_parallelproc",
    ],
):
    """Prepare metadata and a reprocessing directory for MX pipelines.

    Loads the metadata of a raw data collection, creates a new
    ``reprocess_<prefix>_run_<n>`` directory under the matching
    ``PROCESSED_DATA`` path, locates an ``XDS.INP`` file, validates the
    user options and emits one boolean output per pipeline name telling
    whether that pipeline was requested.
    """

    def run(self):
        """Build the ``metadata`` output and the per-pipeline flags.

        :raises FileNotFoundError: the raw data directory does not exist, or
            no ``icat_metadata`` was given and ``metadata.json`` is missing.
        :raises RuntimeError: no ``XDS.INP`` file was found, or the forced
            space group is not recognised.
        :raises ValueError: ``exclude_range`` is malformed.
        """
        raw_data_path = self._get_raw_data_path()
        processed_data_path = pathlib.Path(
            str(raw_data_path).replace("RAW_DATA", "PROCESSED_DATA")
        )
        if not raw_data_path.exists():
            raise FileNotFoundError(str(raw_data_path))

        metadata = self._load_metadata(raw_data_path)

        # NOTE(review): the optional "beamline" input is declared but never
        # read; the beamline is always derived from the raw data path.
        beamline, proposal = UtilsPath.getBeamlineProposal(str(raw_data_path))
        metadata["beamline"] = beamline
        metadata["proposal"] = proposal

        mx_pipeline_name = self.get_input_value(
            "mx_pipeline_name", ["grenades_fastproc"]
        )
        if isinstance(mx_pipeline_name, str):
            # A single name: without wrapping, len() would count characters
            # and the `in` checks below would be substring tests.
            mx_pipeline_name = [mx_pipeline_name]
        metadata["no_pipelines"] = len(mx_pipeline_name)

        # Prefix is the part of the file template before the first "_%".
        prefix = metadata["MX_template"].split("_%")[0]
        reprocess_path = self._create_reprocess_dir(processed_data_path, prefix)
        metadata["reprocess_path"] = str(reprocess_path)

        # Use get_input_value like every other optional input so an unset
        # callback is stored as None.
        metadata["icat_callback_url"] = self.get_input_value("callback", None)
        self.notify_icat({"status": "RUNNING", "outputFolder": str(reprocess_path)})

        metadata["xds_inp_path"] = str(
            self._find_xds_inp(raw_data_path, processed_data_path)
        )

        dcolid_path = reprocess_path / "DCOLID.txt"
        with open(dcolid_path, "w", encoding="utf-8") as f:
            f.write(f"datacollectionID:{metadata['MX_dataCollectionId']}\n")

        metadata["forced_spacegroup"] = self._get_forced_spacegroup()
        metadata["forced_cell"] = self.get_input_value("forced_cell", None)
        metadata["start_image"] = self.get_input_value("start_image", None)
        metadata["end_image"] = self.get_input_value("end_image", None)
        metadata["anomalous"] = self.get_input_value("anomalous", True)
        metadata["low_res_limit"] = self.get_input_value("low_res_limit", None)
        metadata["high_res_limit"] = self.get_input_value("high_res_limit", None)

        # Parsed into a tuple of (start, end) integer pairs; only stored when
        # at least one range was given.
        exclude_range = _parse_exclude_range(
            self.get_input_value("exclude_range", None)
        )
        if exclude_range:
            metadata["exclude_range"] = exclude_range

        metadata["job_id"] = self.job_id
        self.outputs.metadata = metadata
        self.outputs.EDNA_proc = "EDNA_proc" in mx_pipeline_name
        self.outputs.autoPROC = "autoPROC" in mx_pipeline_name
        self.outputs.XIA2_DIALS = "XIA2_DIALS" in mx_pipeline_name
        self.outputs.grenades_fastproc = "grenades_fastproc" in mx_pipeline_name
        self.outputs.grenades_parallelproc = "grenades_parallelproc" in mx_pipeline_name

    def _get_raw_data_path(self) -> pathlib.Path:
        """Return the raw data directory from the ``raw_data_path`` input.

        A sequence input contributes its first item. A single ``str`` or
        path is used as is, instead of being indexed character-wise.
        """
        raw = self.inputs.raw_data_path
        if not isinstance(raw, (str, pathlib.PurePath)):
            raw = raw[0]
        return pathlib.Path(raw)

    def _load_metadata(self, raw_data_path: pathlib.Path) -> dict:
        """Return a mutable copy of the collection metadata.

        Uses the ``icat_metadata`` input when given; otherwise reads
        ``<raw_data_path>/metadata.json``.

        :raises FileNotFoundError: ``metadata.json`` is needed but missing.
        """
        metadata = self.get_input_value("icat_metadata", None)
        if metadata is not None:
            # Copy: run() adds keys and must not mutate the caller's input.
            return dict(metadata)
        metadata_path = raw_data_path / "metadata.json"
        if not metadata_path.exists():
            raise FileNotFoundError(str(metadata_path))
        with open(metadata_path, encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _create_reprocess_dir(
        processed_data_path: pathlib.Path, prefix: str
    ) -> pathlib.Path:
        """Create and return the first free ``reprocess_<prefix>_run_<n>`` dir.

        ``n`` starts at 1. ``mkdir(exist_ok=False)`` is the claim: on
        ``FileExistsError`` (already taken, possibly by a concurrent task) the
        next run number is tried.
        """
        run_number = 1
        while True:
            reprocess_path = processed_data_path / f"reprocess_{prefix}_run_{run_number}"
            try:
                reprocess_path.mkdir(parents=True, exist_ok=False, mode=0o755)
            except FileExistsError:
                run_number += 1
            else:
                return reprocess_path

    @staticmethod
    def _find_xds_inp(
        raw_data_path: pathlib.Path, processed_data_path: pathlib.Path
    ) -> pathlib.Path:
        """Locate an ``XDS.INP`` file.

        Searches ``<raw>/process/xds_*/XDS.INP`` first, then
        ``<processed>/auto*/XDS.INP``. Matches are sorted so the pick is
        deterministic when several exist.

        :raises RuntimeError: no ``XDS.INP`` file was found.
        """
        candidates = sorted(raw_data_path.glob("process/xds_*/XDS.INP"))
        if not candidates:
            # XDS.INP not present in RAW_DATA - look in PROCESSED_DATA
            candidates = sorted(processed_data_path.glob("auto*/XDS.INP"))
        if not candidates:
            raise RuntimeError(
                f"Missing XDS path, processed_data_path: {processed_data_path}"
            )
        return candidates[0]

    def _get_forced_spacegroup(self) -> Optional[str]:
        """Return the short name of the forced space group, or None.

        A missing or blank input means no space group is forced.

        :raises RuntimeError: the space group is not recognised. The error
            is always raised (ICAT is notified first); previously it was
            silently dropped when no callback was configured.
        """
        forced_spacegroup = self.get_input_value("forced_spacegroup", None)
        if forced_spacegroup is None:
            return None
        forced_spacegroup = forced_spacegroup.strip()
        if not forced_spacegroup:
            return None
        short_forced_spacegroup = UtilsSymmetry.get_short_space_group_name(
            forced_spacegroup
        )
        if short_forced_spacegroup is None:
            error_message = f"Unknown space group '{forced_spacegroup}'"
            self.notify_icat({"logs": {"message": error_message}})
            raise RuntimeError(error_message)
        return short_forced_spacegroup

    @property
    def _icat_callback_url(self) -> Optional[str]:
        """Callback URL taken from the ``callback`` input.

        Presumably consumed by ``IcatCallbackTask.notify_icat``; verify in
        the base class.
        """
        return self.inputs.callback
def _parse_exclude_range(exclude_range: Optional[str]) -> Tuple[Tuple[int, int], ...]:
if not exclude_range:
return tuple()
exclude_range = re.sub(r"\s+", "", exclude_range)
if not exclude_range:
return tuple()
pattern = r"^\d+-\d+(,\d+-\d+)*$"
if re.fullmatch(pattern, exclude_range):
return tuple(
(int(start), int(end))
for start, end in re.findall(r"(\d+)-(\d+)", exclude_range)
)
raise ValueError(f"Invalid exclude_range format '{exclude_range}'")