Source code for ewoksmx.tasks.Grenades_fastproc_pipeline

import glob
import os
import pathlib
import shutil
from typing import List
from typing import Optional
from typing import Tuple

from edna2.utils import UtilsSlurm

from ewoksmx.template import get_rendered_template

from .base_tasks.slurm_pipeline import PrepareSlurmPipeline


[docs] class Grenades_fastproc_pipeline( PrepareSlurmPipeline, input_names=["grenades_fastproc"] ): """Prepare the Grenades pipeline. https://www.esrf.fr/home/UsersAndScience/Experiments/MX/How_to_use_our_beamlines/Run_Your_Experiment/grenades.html """ PIPELINE_NAME = "grenades_fastproc" CALC_CELL_CMD = "cell_from_space_group.pl"
[docs] def prepare_pipeline(self): self.grenades_working_dir.mkdir(parents=True, exist_ok=False, mode=0o755) self._create_xds_inp() self._modify_xds_inp() self._create_dcolid_file() self._create_grenades_fastproc_script()
@property def grenades_working_dir(self) -> pathlib.Path: return self.processing_base_dir / self.PIPELINE_NAME @property def xds_inp_path(self) -> pathlib.Path: return self.grenades_working_dir / "XDS.INP" def _modify_file_path( self, file_path: str, source_dir: str, target_dir: str, check_exists: bool, abs_path: bool = False, ) -> str: if not os.path.isabs(file_path): file_path = os.path.join(source_dir, file_path) file_path = os.path.normpath(file_path) if check_exists: if not os.path.exists(file_path): # Make up for a bug in the XDS file: # file name has an additional extension ".gz", ".bz2", ... file_paths = list(glob.glob(file_path + "*")) if not file_paths: raise RuntimeError( f"XDS field has a path that does not exist: {file_path}" ) file_path = sorted(file_paths)[0] if abs_path: return file_path return os.path.relpath(file_path, target_dir)
[docs] def copy_xds_inp_file( self, source_file_path: str, target_file_path: str, abs_field_paths: bool = False, ): source_xds_dir = os.path.dirname(str(source_file_path)) target_xds_dir = os.path.dirname(str(target_file_path)) with open(source_file_path) as f: lines = f.readlines() fields_with_paths = { "NAME_TEMPLATE_OF_DATA_FRAMES": False, "X-GEO_CORR": True, "Y-GEO_CORR": True, } new_lines = [] for line in lines: for field, check_exists in fields_with_paths.items(): if field not in line: continue file_path = line.split("=")[1].strip() new_file_path = self._modify_file_path( file_path, source_xds_dir, target_xds_dir, check_exists, abs_path=abs_field_paths, ) line = line.replace(file_path, new_file_path) break new_lines.append(line) os.makedirs(target_xds_dir, exist_ok=True) with open(target_file_path, "w") as f: f.write("".join(new_lines))
def _create_xds_inp(self) -> None: self.copy_xds_inp_file(self.metadata["xds_inp_path"], self.xds_inp_path) def _modify_xds_inp(self) -> None: self.cell_params = None dectris_neggia_path = None self.space_group_number = None raw_directory = self.metadata.get("MX_directory") exclude_range = self.metadata.get("exclude_range", None) # Find path to dectris neggia DLL for library_path in os.environ.get("LD_LIBRARY_PATH", "").split(":"): lib_path = pathlib.Path(library_path) / "dectris-neggia.so" if lib_path.is_file(): dectris_neggia_path = lib_path break self._modify_xds_inp_file( raw_directory=raw_directory, exclude_range=exclude_range, lib=dectris_neggia_path, ) forced_spacegroup = self.metadata.get("forced_spacegroup", None) if forced_spacegroup is not None: self.notify_icat( {"step": {"name": self.PIPELINE_NAME, "status": "PREPROCESSING"}} ) self.space_group_number, self.cell_params = self._get_spg_and_cell_params( forced_spacegroup ) if self.space_group_number is None or self.cell_params is None: raise RuntimeError( f"Cannot determine cell for space group {forced_spacegroup}" ) self._modify_xds_inp_file( space_group_number=self.space_group_number, cell_params=self.cell_params, ) def _modify_xds_inp_file( self, raw_directory: Optional[str] = None, space_group_number: Optional[int] = None, cell_params: Optional[List[float]] = None, exclude_range: Optional[List[Tuple[int, int]]] = None, lib: Optional[str] = None, ): if cell_params is not None: assert len(cell_params) == 6 with open(self.xds_inp_path) as f: lines = f.readlines() new_lines = [] for line in lines: if ( "SPACE_GROUP_NUMBER" in line and space_group_number is not None and cell_params is not None ): line = f" SPACE_GROUP_NUMBER= {space_group_number}\n" space_group_number = None elif "UNIT_CELL_CONSTANTS" in line and cell_params is not None: line = f" UNIT_CELL_CONSTANTS= {' '.join(map(str, cell_params))}\n" cell_params = None if lib is not None and "LIB=" in line: line = f" LIB= {lib}\n" if "NAME_TEMPLATE_OF_DATA_FRAMES=" in line and raw_directory is not None: template = line.split("/")[-1] frames_path = os.path.join(raw_directory, template) line = f" NAME_TEMPLATE_OF_DATA_FRAMES= {frames_path}\n" new_lines.append(line) if space_group_number is not None: new_lines.append(f" SPACE_GROUP_NUMBER= {space_group_number}\n") if cell_params is not None: line = f" UNIT_CELL_CONSTANTS= {' '.join(map(str, cell_params))}\n" new_lines.append(line) if exclude_range is not None: for begin, end in exclude_range: new_lines.append(f" EXCLUDE_DATA_RANGE= {begin} {end}\n") with open(self.xds_inp_path, "w") as f: f.write("".join(new_lines)) def _get_spg_and_cell_params( self, space_group_name: str ) -> Tuple[Optional[int], Optional[List[float]]]: working_dir = self.grenades_working_dir / "cell_params" working_dir.mkdir() shutil.copy(self.xds_inp_path, working_dir) command_line = f"{self.CALC_CELL_CMD} --space_group_name {space_group_name}" # Submit job to SLURM ( slurm_script_path, slurm_id, stdout_file_path, stderr_file_path, ) = UtilsSlurm.submit_job_to_slurm( command_line=command_line, working_directory=working_dir, queue="mx", mem=self.SLURM_MEM, nodes=1, core=20, time=1800, name="get_spg_and_cell_params", list_modules=["grenades"], asynchronous=False, ) print(working_dir) # Parse output for space group and cell spg_number = None cell_params = None with open(stdout_file_path) as f: stdout = f.read() list_stdout = stdout.split("\n") for line in list_stdout: if "LATTICE" in line: # Convert "LATTICE: 1 77.5 155.8 155.9 60.1 89.9 89.8" # to space group number and list of floats list_lattice = line.split(":")[1].split() spg_number = int(list_lattice[0]) cell_params = list(map(float, list_lattice[1:])) break return spg_number, cell_params @property def dcloid_file_paths(self) -> Tuple[pathlib.Path]: return self.grenades_working_dir / "DCOLID.txt", self.icat_dir / "DCOLID.txt" def _create_dcolid_file(self) -> None: for path in self.dcloid_file_paths: with open(path, "w") as f: f.write(f"datacollectionID:{self.metadata['MX_dataCollectionId']}\n") @property def script_file_path(self) -> pathlib.Path: return self.processing_base_dir / "grenades_fastproc.sh" def _create_grenades_fastproc_script(self) -> None: script = get_rendered_template( "grenades_fastproc.sh", self.script_template_variables() ) with open(self.script_file_path, mode="w") as f: f.write(script) self.script_file_path.chmod(0o755)
[docs] def script_template_variables(self) -> dict: anomalous = 1 if "anomalous" in self.metadata and not self.metadata["anomalous"]: anomalous = 0 return { "processing_base_dir": self.processing_base_dir, "grenades_working_dir": self.grenades_working_dir, "MX_dataCollectionId": self.metadata["MX_dataCollectionId"], "anomalous": anomalous, }