Source code for ewoksmx.tasks.Grenades_parallelproc_pipeline
import os
import pathlib
from typing import Optional
from .base_tasks.icat_task import IcatCallbackTask
[docs]
class Grenades_parallelproc_pipeline(
IcatCallbackTask,
input_names=["metadata", "grenades_parallelproc"],
output_names=["slurm_params"],
):
[docs]
def run(self):
metadata = self.metadata
icat_dir = pathlib.Path(metadata["reprocess_path"]) / "grenades_parallelproc"
grenades_working_dir = icat_dir / "nobackup"
grenades_working_dir.mkdir(parents=True, exist_ok=False, mode=0o755)
# Read XDS.INP file
xds_inp_path = pathlib.Path(metadata["xds_inp_path"])
with open(xds_inp_path) as f:
xds_inp = f.read()
# Replace "../links" with "../../../links"
xds_inp = xds_inp.replace("../links", "../../../links")
new_xds_inp_path = grenades_working_dir / "XDS.INP"
with open(new_xds_inp_path, "w") as f:
f.write(xds_inp)
command_line = "xdsproc.pl"
command_line += f" -path {str(grenades_working_dir)}"
command_line += f" -datacollectID {metadata['MX_dataCollectionId']}"
command_line += " -mode after"
command_line += " -slurm"
with open(str(grenades_working_dir / "command_line.txt"), "w") as f:
f.write(command_line)
os.system(command_line)
slurm_params = {
"pipeline_name": "grenades_parallelproc",
"icat_callback_url": self.metadata["icat_callback_url"],
"no_pipelines": self.metadata["no_pipelines"],
}
self.notify_icat(
{"step": {"name": slurm_params["pipeline_name"], "status": "FINISHED"}}
)
self.outputs.slurm_params = slurm_params
@property
def _icat_callback_url(self) -> Optional[str]:
return self.inputs.slurm_params["icat_callback_url"]