Source code for ewoksmx.tasks.mx_pipelines.grenades_parallelproc_pipeline

import os
import pathlib
from typing import Optional

from ewoksmx.config.instrument_config import InstrumentConfigRegistry
from ewoksmx.tasks.base_tasks.icat_task import IcatCallbackTask


[docs] class Grenades_parallelproc_pipeline( IcatCallbackTask, input_names=["metadata", "grenades_parallelproc"], output_names=["slurm_params"], ):
[docs] def run(self): metadata = self.metadata main_cfg = InstrumentConfigRegistry.load() beamline = metadata.get("beamline", "default") cfg_all_pipelines = getattr(main_cfg, beamline) cfg = getattr(cfg_all_pipelines, self.PIPELINE_NAME) icat_dir = pathlib.Path(metadata["reprocess_path"]) / cfg.icat_dir_name grenades_working_dir = icat_dir / cfg.process_working_dirname grenades_working_dir.mkdir(parents=True, exist_ok=False, mode=0o755) # Read XDS.INP file xds_inp_path = pathlib.Path(metadata["xds_inp_path"]) with open(xds_inp_path) as f: xds_inp = f.read() # Replace "../links" with "../../../links" xds_inp = xds_inp.replace("../links", "../../../links") new_xds_inp_path = grenades_working_dir / cfg.xds_config_filename with open(new_xds_inp_path, "w") as f: f.write(xds_inp) command_line = cfg.xds_command command_line += f" -path {str(grenades_working_dir)}" command_line += f" -datacollectID {metadata['MX_dataCollectionId']}" command_line += " -mode after" command_line += " -slurm" with open(str(grenades_working_dir / cfg.xds_command_txt_filename), "w") as f: f.write(command_line) os.system(command_line) slurm_params = { "pipeline_name": "grenades_parallelproc", "icat_callback_url": self.metadata["icat_callback_url"], "no_pipelines": self.metadata["no_pipelines"], } self.notify_icat( {"step": {"name": slurm_params["pipeline_name"], "status": "FINISHED"}} ) self.outputs.slurm_params = slurm_params
@property def _icat_callback_url(self) -> Optional[str]: return self.inputs.slurm_params["icat_callback_url"]