Source code for ewoksmx.tasks.mx_pipelines.grenades_fastproc_pipeline

import glob
import os
import pathlib
import shutil
from typing import List
from typing import Optional
from typing import Tuple

from ewoksmx.config.instrument_config import InstrumentConfigRegistry
from ewoksmx.shell_utils.execute_slurm import execute_bash_commands
from ewoksmx.tasks.base_tasks.slurm_pipeline import PrepareSlurmPipeline
from ewoksmx.template import get_rendered_template


class Grenades_fastproc_pipeline(
    PrepareSlurmPipeline, input_names=["grenades_fastproc"]
):
    """Prepare the Grenades pipeline.

    https://www.esrf.fr/home/UsersAndScience/Experiments/MX/How_to_use_our_beamlines/Run_Your_Experiment/grenades.html

    ``prepare_pipeline`` creates the pipeline working directory, copies and
    edits the XDS input file, writes the data-collection-id files and renders
    the shell script for the pipeline.
    """

    PIPELINE_NAME = "grenades_fastproc"
    CFG = InstrumentConfigRegistry.load()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._load_cfg()

    def _load_cfg(self) -> None:
        """Select the configuration of this pipeline for the current beamline.

        The beamline name is read from ``metadata["beamline"]`` and falls back
        to ``"default"`` when the key is absent.
        """
        beamline = self.metadata.get("beamline", "default")
        cfg_all_pipelines = getattr(self.CFG, beamline)
        self.cfg = getattr(cfg_all_pipelines, self.PIPELINE_NAME)

    def prepare_pipeline(self):
        """Create the working directory and every file the pipeline needs.

        Raises:
            FileExistsError: if the working directory already exists
                (``exist_ok=False``).
        """
        # The configuration is re-read here in addition to ``__init__``.
        self._load_cfg()
        self.grenades_working_dir.mkdir(parents=True, exist_ok=False, mode=0o755)
        self._create_xds_inp()
        self._modify_xds_inp()
        self._create_dcolid_file()
        self._create_grenades_fastproc_script()

    @property
    def grenades_working_dir(self) -> pathlib.Path:
        """Directory named after the pipeline inside ``processing_base_dir``."""
        return self.processing_base_dir / self.PIPELINE_NAME

    @property
    def xds_inp_path(self) -> pathlib.Path:
        """Path of the XDS input file inside the working directory."""
        xds_config_filename = self.cfg.xds_config_filename
        return self.grenades_working_dir / xds_config_filename

    def _modify_file_path(
        self,
        file_path: str,
        source_dir: str,
        target_dir: str,
        check_exists: bool,
        abs_path: bool = False,
    ) -> str:
        """Re-anchor a path found in an XDS input file.

        Args:
            file_path: path as written in the XDS file; a relative path is
                interpreted relative to ``source_dir``.
            source_dir: directory of the XDS file the path was read from.
            target_dir: directory of the XDS file the path will be written to.
            check_exists: when true, the resolved path must exist (see below).
            abs_path: return the normalized absolute path instead of a path
                relative to ``target_dir``.

        Returns:
            The normalized path, absolute or relative to ``target_dir``.

        Raises:
            RuntimeError: if ``check_exists`` is true and neither the path nor
                any path starting with it exists.
        """
        if not os.path.isabs(file_path):
            file_path = os.path.join(source_dir, file_path)
        file_path = os.path.normpath(file_path)

        if check_exists and not os.path.exists(file_path):
            # Make up for a bug in the XDS file:
            # file name has an additional extension ".gz", ".bz2", ...
            candidates = glob.glob(file_path + "*")
            if not candidates:
                raise RuntimeError(
                    f"XDS field has a path that does not exist: {file_path}"
                )
            # Lexicographically first match, as ``sorted(...)[0]`` would give.
            file_path = min(candidates)

        if abs_path:
            return file_path
        return os.path.relpath(file_path, target_dir)

    def copy_xds_inp_file(
        self,
        source_file_path: str,
        target_file_path: str,
        abs_field_paths: bool = False,
    ):
        """Copy an XDS input file, fixing the paths it contains.

        Every line that contains one of the field names listed in
        ``cfg.xds_config_fields_with_path`` has the text after its first
        ``=`` treated as a path and rewritten with ``_modify_file_path``.
        Only the first matching field is applied to a line.

        Args:
            source_file_path: XDS input file to read.
            target_file_path: file to write; its directory is created when
                missing.
            abs_field_paths: write absolute paths instead of paths relative to
                the target directory.

        Raises:
            RuntimeError: propagated from ``_modify_file_path`` when a path
                that must exist does not.
        """
        source_xds_dir = os.path.dirname(str(source_file_path))
        target_xds_dir = os.path.dirname(str(target_file_path))

        with open(source_file_path) as f:
            lines = f.readlines()

        fields_with_paths = self.cfg.xds_config_fields_with_path
        new_lines = []
        for line in lines:
            for field, check_exists in fields_with_paths.items():
                if field not in line:
                    continue
                # Split on the first "=" only, so a path that itself contains
                # "=" is kept whole.
                _, separator, value = line.partition("=")
                file_path = value.strip()
                if not separator or not file_path:
                    # No value to rewrite. Replacing an empty string would
                    # insert the new path between every character of the line.
                    break
                new_file_path = self._modify_file_path(
                    file_path,
                    source_xds_dir,
                    target_xds_dir,
                    check_exists,
                    abs_path=abs_field_paths,
                )
                line = line.replace(file_path, new_file_path)
                break
            new_lines.append(line)

        os.makedirs(target_xds_dir, exist_ok=True)
        with open(target_file_path, "w") as f:
            f.write("".join(new_lines))

    def _create_xds_inp(self) -> None:
        """Copy the XDS input file named in the metadata to the working directory."""
        self.copy_xds_inp_file(self.metadata["xds_inp_path"], self.xds_inp_path)

    def _modify_xds_inp(self) -> None:
        """Adapt the copied XDS input file to this processing run.

        Sets the frames directory, the excluded ranges and the HDF5 library
        path. When ``metadata["forced_spacegroup"]`` is given, the space group
        number and cell parameters are computed and written as well.

        Raises:
            RuntimeError: if the space group number or the cell cannot be
                determined for the forced space group.
        """
        self.cell_params = None
        hdf5_lib_path = None
        self.space_group_number = None

        raw_directory = self.metadata.get("MX_directory")
        exclude_range = self.metadata.get("exclude_range", None)

        hdf5_lib_path_cfg = self.cfg.hdf5_lib_path
        if hdf5_lib_path_cfg.exists():
            hdf5_lib_path = str(hdf5_lib_path_cfg)
        else:
            # Fall back to the first LD_LIBRARY_PATH entry that holds the file.
            for library_path in os.environ.get("LD_LIBRARY_PATH", "").split(":"):
                lib_path = pathlib.Path(library_path) / hdf5_lib_path_cfg
                if lib_path.is_file():
                    # Keep the same type (str) as the branch above.
                    hdf5_lib_path = str(lib_path)
                    break

        self._modify_xds_inp_file(
            raw_directory=raw_directory,
            exclude_range=exclude_range,
            lib=hdf5_lib_path,
        )

        forced_spacegroup = self.metadata.get("forced_spacegroup", None)
        if forced_spacegroup is not None:
            self.notify_icat(
                {"step": {"name": self.PIPELINE_NAME, "status": "PREPROCESSING"}}
            )
            self.space_group_number, self.cell_params = self._get_spg_and_cell_params(
                forced_spacegroup
            )
            if self.space_group_number is None or self.cell_params is None:
                raise RuntimeError(
                    f"Cannot determine cell for space group {forced_spacegroup}"
                )
            self._modify_xds_inp_file(
                space_group_number=self.space_group_number,
                cell_params=self.cell_params,
            )

    def _modify_xds_inp_file(
        self,
        raw_directory: Optional[str] = None,
        space_group_number: Optional[int] = None,
        cell_params: Optional[List[float]] = None,
        exclude_range: Optional[List[Tuple[int, int]]] = None,
        lib: Optional[str] = None,
    ):
        """Rewrite fields of the XDS input file in the working directory.

        Field names come from ``cfg.xds_config_fields_to_modify_add``. Existing
        lines are replaced in place; a space group or cell that was not
        replaced in place is appended at the end, and so are all excluded
        ranges.

        Args:
            raw_directory: directory to use for the frames template; the file
                name part of the existing template is kept.
            space_group_number: space group number to write.
            cell_params: the six cell parameters to write.
            exclude_range: ``(begin, end)`` pairs, one appended line each.
            lib: value to write for the library field.

        Raises:
            ValueError: if ``cell_params`` does not hold exactly 6 values.
        """
        if cell_params is not None and len(cell_params) != 6:
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            raise ValueError(
                f"Expected 6 cell parameters, got {len(cell_params)}"
            )

        with open(self.xds_inp_path) as f:
            lines = f.readlines()

        kw = self.cfg.xds_config_fields_to_modify_add
        new_lines = []
        for line in lines:
            # The local values are reset to None once written so that the
            # "append" step after the loop only adds what was not replaced.
            if (
                str(kw["space_group_number"]) in line
                and space_group_number is not None
                and cell_params is not None
            ):
                line = f" {kw['space_group_number']}= {space_group_number}\n"
                space_group_number = None
            elif f"{kw['cell_params']}" in line and cell_params is not None:
                line = f" {kw['cell_params']}= {' '.join(map(str, cell_params))}\n"
                cell_params = None

            if lib is not None and f"{kw['library']}=" in line:
                line = f" {kw['library']}= {lib}\n"

            frames_keyword = f"{kw['frames_path']}="
            if frames_keyword in line and raw_directory is not None:
                # Work on the value only: this drops the trailing newline and
                # still yields the bare template when the value has no "/".
                value = line.split(frames_keyword, 1)[1].strip()
                template = value.split("/")[-1]
                frames_path = os.path.join(raw_directory, template)
                line = f" {kw['frames_path']}= {frames_path}\n"

            new_lines.append(line)

        if space_group_number is not None:
            new_lines.append(f" {kw['space_group_number']}= {space_group_number}\n")
        if cell_params is not None:
            line = f" {kw['cell_params']}= {' '.join(map(str, cell_params))}\n"
            new_lines.append(line)
        if exclude_range is not None:
            for begin, end in exclude_range:
                new_lines.append(f" {kw['exclude_range']}= {begin} {end}\n")

        with open(self.xds_inp_path, "w") as f:
            f.write("".join(new_lines))

    def _get_spg_and_cell_params(
        self, space_group_name: str
    ) -> Tuple[Optional[int], Optional[List[float]]]:
        """Run the cell calculation command and parse its result.

        A sub-directory of the working directory is created, the XDS input
        file is copied into it and ``cfg.calc_cell_cmd`` is executed there.

        Args:
            space_group_name: passed to the command as ``--space_group_name``.

        Returns:
            ``(space group number, cell parameters)`` taken from the first
            stdout line containing ``cfg.stdout_latice_keyword``, or
            ``(None, None)`` when no such line exists.
        """
        cell_refinement_dir_name = self.cfg.cell_refinement_dir_name
        working_dir = self.grenades_working_dir / cell_refinement_dir_name
        working_dir.mkdir()
        shutil.copy(self.xds_inp_path, working_dir)

        shell_commands = [
            "module load grenades",
            f"{self.cfg.calc_cell_cmd} --space_group_name {space_group_name}",
        ]

        # Submit job to SLURM
        bash_result = execute_bash_commands(
            shell_commands=shell_commands,
            working_directory=working_dir,
            stdout=True,
            stderr=True,
            stdmerge=False,
            name="get_spg_and_cell_params",
            parameters={
                "partition": self.cfg.slurm_queue,
                "mem": self.cfg.slurm_mem,
                "nodes": self.cfg.slurm_nodes,
                "cpus-per-task": self.cfg.slurm_core,
                "time": self.cfg.slurm_time,
            },
        )
        bash_result.raise_on_error()
        stdout_file_path = bash_result.stdout_path

        # Parse output for space group and cell
        spg_number = None
        cell_params = None
        with open(stdout_file_path) as f:
            stdout = f.read()
        stdout_lattice_kw = self.cfg.stdout_latice_keyword
        for line in stdout.split("\n"):
            if stdout_lattice_kw in line:
                # Convert "LATTICE: 1 77.5 155.8 155.9 60.1 89.9 89.8"
                # to space group number and list of floats
                list_lattice = line.split(":")[1].split()
                spg_number = int(list_lattice[0])
                cell_params = list(map(float, list_lattice[1:]))
                break
        return spg_number, cell_params

    @property
    def dcloid_file_paths(self) -> Tuple[pathlib.Path, pathlib.Path]:
        """Paths of the data-collection-id file: working directory and ICAT directory."""
        dc_file_name = self.cfg.dcloid_file_name
        return (
            self.grenades_working_dir / dc_file_name,
            self.icat_dir / dc_file_name,
        )

    def _create_dcolid_file(self) -> None:
        """Write the data collection id to every path in ``dcloid_file_paths``."""
        for path in self.dcloid_file_paths:
            with open(path, "w") as f:
                f.write(f"datacollectionID:{self.metadata['MX_dataCollectionId']}\n")

    @property
    def script_file_path(self) -> pathlib.Path:
        """Path of the pipeline shell script inside ``processing_base_dir``."""
        grenades_script_filename = self.cfg.grenades_script_filename
        return self.processing_base_dir / grenades_script_filename

    def _create_grenades_fastproc_script(self) -> None:
        """Render the pipeline script and make it executable.

        The high-resolution template is used when a truthy ``high_res_limit``
        is present in the template variables; otherwise the template named by
        ``cfg.grenades_script_filename`` is used. Both variants are written to
        ``script_file_path``.
        """
        # Compute the variables once and reuse them for the branch decision
        # and for rendering.
        template_variables = self.script_template_variables()
        if template_variables.get("high_res_limit"):
            # NOTE(review): the configuration also provides
            # ``grenades_script_filename_high_res``; the original code read it
            # here but rendered this hard-coded template name. Confirm which
            # one is intended.
            template_name = "grenades_fastproc_res.sh"
        else:
            template_name = self.cfg.grenades_script_filename
        script = get_rendered_template(template_name, template_variables)

        with open(self.script_file_path, mode="w") as f:
            f.write(script)
        self.script_file_path.chmod(0o755)

    def script_template_variables(self) -> dict:
        """Return the variables used to render the pipeline script.

        ``anomalous`` is 1 unless the metadata holds a falsy ``"anomalous"``
        value. ``high_res_limit`` is ``None`` when the metadata has no such
        key; the script generation treats a falsy limit as "not requested".
        """
        anomalous = 1
        if "anomalous" in self.metadata and not self.metadata["anomalous"]:
            anomalous = 0
        return {
            "processing_base_dir": self.processing_base_dir,
            "grenades_working_dir": self.grenades_working_dir,
            "MX_dataCollectionId": self.metadata["MX_dataCollectionId"],
            "anomalous": anomalous,
            "high_res_limit": self.metadata.get("high_res_limit"),
        }