"""Ewoks task ``ewoksmx.tasks.MXPipelineInput``: prepare metadata and a working directory for MX reprocessing pipelines."""

import json
import pathlib
import re
from typing import Optional
from typing import Tuple

from edna2.utils import UtilsPath
from edna2.utils import UtilsSymmetry

from .base_tasks.icat_task import IcatCallbackTask


class MXPipelineInput(
    IcatCallbackTask,
    input_names=["raw_data_path"],
    optional_input_names=[
        "mx_pipeline_name",
        "beamline",
        "icat_metadata",
        "forced_spacegroup",
        "forced_cell",
        "start_image",
        "end_image",
        "anomalous",
        "low_res_limit",
        "high_res_limit",
        "callback",
        "exclude_range",
    ],
    output_names=[
        "metadata",
        "EDNA_proc",
        "autoPROC",
        "XIA2_DIALS",
        "grenades_fastproc",
        "grenades_parallelproc",
    ],
):
    """Prepare an MX reprocessing run and select which pipelines to execute.

    The task:

    * loads the data-collection metadata (``icat_metadata`` input, or
      ``<raw_data_path>/metadata.json``),
    * creates a fresh ``reprocess_<prefix>_run_<N>`` directory under the
      PROCESSED_DATA counterpart of ``raw_data_path``,
    * locates an ``XDS.INP`` file and writes ``DCOLID.txt``,
    * copies the user processing options into the metadata dict,
    * emits one boolean output per supported pipeline, ``True`` when its name
      appears in ``mx_pipeline_name`` (default ``["grenades_fastproc"]``).
    """

    def run(self):
        """Build the ``metadata`` output and the per-pipeline boolean outputs.

        :raises FileNotFoundError: if ``raw_data_path`` does not exist, or no
            ``icat_metadata`` was given and ``metadata.json`` is missing.
        :raises RuntimeError: if no ``XDS.INP`` can be found, or the forced
            space group is unknown while an ICAT callback is configured.
        :raises ValueError: if ``exclude_range`` is malformed.
        """
        raw_data_path = _single_path(self.inputs.raw_data_path)
        processed_data_path = pathlib.Path(
            str(raw_data_path).replace("RAW_DATA", "PROCESSED_DATA")
        )
        if not raw_data_path.exists():
            raise FileNotFoundError(str(raw_data_path))

        # Load metadata
        metadata = self.get_input_value("icat_metadata", None)
        if metadata is None:
            metadata_path = raw_data_path / "metadata.json"
            if not metadata_path.exists():
                raise FileNotFoundError(str(metadata_path))
            with open(metadata_path, encoding="utf-8") as f:
                metadata = json.load(f)
        else:
            # Work on a copy: keys are added below and the caller's input
            # dict must not be mutated.
            metadata = dict(metadata)

        # Beamline and proposal are derived from the RAW_DATA path.
        # NOTE(review): the optional "beamline" input is declared but not read.
        beamline, proposal = UtilsPath.getBeamlineProposal(str(raw_data_path))
        metadata["beamline"] = beamline
        metadata["proposal"] = proposal

        mx_pipeline_name = self.get_input_value(
            "mx_pipeline_name", ["grenades_fastproc"]
        )
        if isinstance(mx_pipeline_name, str):
            # A bare string would otherwise be counted/searched per character.
            mx_pipeline_name = [mx_pipeline_name]
        metadata["no_pipelines"] = len(mx_pipeline_name)

        # Set up processing path
        prefix = metadata["MX_template"].split("_%")[0]
        reprocess_path = _create_reprocess_dir(processed_data_path, prefix)
        metadata["reprocess_path"] = str(reprocess_path)

        # Callback: use get_input_value so an absent optional input is None
        # rather than the framework's missing-data placeholder.
        callback = self.get_input_value("callback", None)
        metadata["icat_callback_url"] = callback
        self.notify_icat({"status": "RUNNING", "outputFolder": str(reprocess_path)})

        metadata["xds_inp_path"] = str(
            _find_xds_inp(raw_data_path, processed_data_path)
        )

        # Create DCOLID.txt file
        dcolid_path = reprocess_path / "DCOLID.txt"
        with open(dcolid_path, "w", encoding="utf-8") as f:
            f.write(f"datacollectionID:{metadata['MX_dataCollectionId']}\n")

        # Check formatting of forced spacegroup; a blank value means "not forced".
        forced_spacegroup = self.get_input_value("forced_spacegroup", None)
        if forced_spacegroup is not None:
            forced_spacegroup = forced_spacegroup.strip()
        short_forced_spacegroup = None
        if forced_spacegroup:
            short_forced_spacegroup = UtilsSymmetry.get_short_space_group_name(
                forced_spacegroup
            )
            # NOTE(review): an unrecognised space group is only fatal when an
            # ICAT callback is configured; otherwise it is dropped (None).
            if short_forced_spacegroup is None and callback is not None:
                error_message = f"Unknown space group '{forced_spacegroup}'"
                self.notify_icat({"logs": {"message": error_message}})
                raise RuntimeError(error_message)
        metadata["forced_spacegroup"] = short_forced_spacegroup

        metadata["forced_cell"] = self.get_input_value("forced_cell", None)
        metadata["start_image"] = self.get_input_value("start_image", None)
        metadata["end_image"] = self.get_input_value("end_image", None)
        metadata["anomalous"] = self.get_input_value("anomalous", True)
        metadata["low_res_limit"] = self.get_input_value("low_res_limit", None)
        metadata["high_res_limit"] = self.get_input_value("high_res_limit", None)

        # Parse exclude ranges into a tuple of (start, end) integer pairs;
        # the key is only set when at least one range was given.
        exclude_range = _parse_exclude_range(
            self.get_input_value("exclude_range", None)
        )
        if exclude_range:
            metadata["exclude_range"] = exclude_range

        metadata["job_id"] = self.job_id

        self.outputs.metadata = metadata
        self.outputs.EDNA_proc = "EDNA_proc" in mx_pipeline_name
        self.outputs.autoPROC = "autoPROC" in mx_pipeline_name
        self.outputs.XIA2_DIALS = "XIA2_DIALS" in mx_pipeline_name
        self.outputs.grenades_fastproc = "grenades_fastproc" in mx_pipeline_name
        self.outputs.grenades_parallelproc = (
            "grenades_parallelproc" in mx_pipeline_name
        )

    @property
    def _icat_callback_url(self) -> Optional[str]:
        """ICAT callback URL from the optional ``callback`` input, or ``None``."""
        return self.get_input_value("callback", None)


def _single_path(value) -> pathlib.Path:
    """Return *value* as a path; if it is a sequence of paths, use its first element."""
    if isinstance(value, (str, pathlib.PurePath)):
        return pathlib.Path(value)
    return pathlib.Path(value[0])


def _create_reprocess_dir(processed_data_path: pathlib.Path, prefix: str) -> pathlib.Path:
    """Create and return the first free ``reprocess_<prefix>_run_<N>`` directory, N >= 1."""
    run_number = 1
    while True:
        reprocess_path = processed_data_path / f"reprocess_{prefix}_run_{run_number}"
        try:
            # exist_ok=False turns mkdir into the "claim": if anything already
            # occupies this name (dir, file, dangling symlink, concurrent run),
            # move on to the next number.
            reprocess_path.mkdir(parents=True, exist_ok=False, mode=0o755)
        except FileExistsError:
            run_number += 1
        else:
            return reprocess_path


def _find_xds_inp(raw_data_path: pathlib.Path, processed_data_path: pathlib.Path) -> pathlib.Path:
    """Locate XDS.INP: RAW_DATA ``process/xds_*/`` first, then PROCESSED_DATA ``auto*/``.

    Candidates are sorted so the choice does not depend on filesystem order.
    """
    candidates = sorted(raw_data_path.glob("process/xds_*/XDS.INP"))
    if not candidates:
        # XDS.INP not present in RAW_DATA - look in PROCESSED_DATA
        candidates = sorted(processed_data_path.glob("auto*/XDS.INP"))
    if not candidates:
        raise RuntimeError(
            f"Missing XDS path, processed_data_path: {processed_data_path}"
        )
    return candidates[0]
def _parse_exclude_range(exclude_range: Optional[str]) -> Tuple[Tuple[int, int], ...]: if not exclude_range: return tuple() exclude_range = re.sub(r"\s+", "", exclude_range) if not exclude_range: return tuple() pattern = r"^\d+-\d+(,\d+-\d+)*$" if re.fullmatch(pattern, exclude_range): return tuple( (int(start), int(end)) for start, end in re.findall(r"(\d+)-(\d+)", exclude_range) ) raise ValueError(f"Invalid exclude_range format '{exclude_range}'")