Source code for ewoksmx.tasks.Grenades_parallelproc_pipeline

import os
import pathlib
import shlex
from typing import Optional

from .base_tasks.icat_task import IcatCallbackTask


class Grenades_parallelproc_pipeline(
    IcatCallbackTask,
    input_names=["metadata", "grenades_parallelproc"],
    output_names=["slurm_params"],
):
    """Prepare a working directory and launch ``xdsproc.pl`` for grenades_parallelproc.

    Inputs:
        metadata: mapping read for the keys ``reprocess_path``,
            ``xds_inp_path``, ``MX_dataCollectionId``, ``icat_callback_url``
            and ``no_pipelines``.
        grenades_parallelproc: declared as an input but not read by this class.

    Outputs:
        slurm_params: dict with the keys ``pipeline_name``,
            ``icat_callback_url`` and ``no_pipelines``.
    """

    def run(self):
        """Create the working directory, write XDS.INP, run ``xdsproc.pl`` and notify ICAT.

        Raises:
            FileExistsError: if the ``nobackup`` working directory already
                exists (it is created with ``exist_ok=False``).
            KeyError: if ``metadata`` lacks one of the keys listed on the class.
        """
        metadata = self.metadata
        icat_dir = pathlib.Path(metadata["reprocess_path"]) / "grenades_parallelproc"
        grenades_working_dir = icat_dir / "nobackup"
        # exist_ok=False: an already existing working directory is an error.
        grenades_working_dir.mkdir(parents=True, exist_ok=False, mode=0o755)

        # Read XDS.INP file
        xds_inp_path = pathlib.Path(metadata["xds_inp_path"])
        with open(xds_inp_path) as f:
            xds_inp = f.read()

        # Replace "../links" with "../../../links"
        # NOTE(review): presumably needed because the copied file lives in a
        # different directory than the original XDS.INP - confirm.
        xds_inp = xds_inp.replace("../links", "../../../links")
        new_xds_inp_path = grenades_working_dir / "XDS.INP"
        with open(new_xds_inp_path, "w") as f:
            f.write(xds_inp)

        # The command is executed through the shell by os.system, so values
        # coming from metadata are quoted: a path with whitespace or shell
        # metacharacters must stay a single argument. shlex.quote leaves
        # ordinary values (letters, digits, "/", "_", "-", ".") unchanged.
        working_dir_arg = shlex.quote(str(grenades_working_dir))
        data_collection_arg = shlex.quote(str(metadata["MX_dataCollectionId"]))

        command_line = "xdsproc.pl"
        command_line += f" -path {working_dir_arg}"
        command_line += f" -datacollectID {data_collection_arg}"
        command_line += " -mode after"
        command_line += " -slurm"

        # Keep a copy of the exact command next to the processing files.
        with open(grenades_working_dir / "command_line.txt", "w") as f:
            f.write(command_line)

        # NOTE(review): the exit status is not checked, so "FINISHED" is
        # reported below even when the command fails - confirm this is intended.
        os.system(command_line)

        slurm_params = {
            "pipeline_name": "grenades_parallelproc",
            "icat_callback_url": metadata["icat_callback_url"],
            "no_pipelines": metadata["no_pipelines"],
        }
        self.notify_icat(
            {"step": {"name": slurm_params["pipeline_name"], "status": "FINISHED"}}
        )
        self.outputs.slurm_params = slurm_params

    @property
    def _icat_callback_url(self) -> Optional[str]:
        """Return the ICAT callback URL taken from the ``slurm_params`` input."""
        # NOTE(review): ``slurm_params`` is declared as an output of this task,
        # not as an input; verify that ``self.inputs.slurm_params`` is available
        # here, or whether metadata["icat_callback_url"] was intended.
        return self.inputs.slurm_params["icat_callback_url"]