import glob
import os
import pathlib
import shutil
from typing import List
from typing import Optional
from typing import Tuple
from edna2.utils import UtilsSlurm
from ewoksmx.template import get_rendered_template
from .base_tasks.slurm_pipeline import PrepareSlurmPipeline
[docs]
class Grenades_fastproc_pipeline(
PrepareSlurmPipeline, input_names=["grenades_fastproc"]
):
"""Prepare the Grenades pipeline.
https://www.esrf.fr/home/UsersAndScience/Experiments/MX/How_to_use_our_beamlines/Run_Your_Experiment/grenades.html
"""
PIPELINE_NAME = "grenades_fastproc"
CALC_CELL_CMD = "cell_from_space_group.pl"
[docs]
def prepare_pipeline(self):
self.grenades_working_dir.mkdir(parents=True, exist_ok=False, mode=0o755)
self._create_xds_inp()
self._modify_xds_inp()
self._create_dcolid_file()
self._create_grenades_fastproc_script()
@property
def grenades_working_dir(self) -> pathlib.Path:
return self.processing_base_dir / self.PIPELINE_NAME
@property
def xds_inp_path(self) -> pathlib.Path:
return self.grenades_working_dir / "XDS.INP"
def _modify_file_path(
self,
file_path: str,
source_dir: str,
target_dir: str,
check_exists: bool,
abs_path: bool = False,
) -> str:
if not os.path.isabs(file_path):
file_path = os.path.join(source_dir, file_path)
file_path = os.path.normpath(file_path)
if check_exists:
if not os.path.exists(file_path):
# Make up for a bug in the XDS file:
# file name has an additional extension ".gz", ".bz2", ...
file_paths = list(glob.glob(file_path + "*"))
if not file_paths:
raise RuntimeError(
f"XDS field has a path that does not exist: {file_path}"
)
file_path = sorted(file_paths)[0]
if abs_path:
return file_path
return os.path.relpath(file_path, target_dir)
[docs]
def copy_xds_inp_file(
self,
source_file_path: str,
target_file_path: str,
abs_field_paths: bool = False,
):
source_xds_dir = os.path.dirname(str(source_file_path))
target_xds_dir = os.path.dirname(str(target_file_path))
with open(source_file_path) as f:
lines = f.readlines()
fields_with_paths = {
"NAME_TEMPLATE_OF_DATA_FRAMES": False,
"X-GEO_CORR": True,
"Y-GEO_CORR": True,
}
new_lines = []
for line in lines:
for field, check_exists in fields_with_paths.items():
if field not in line:
continue
file_path = line.split("=")[1].strip()
new_file_path = self._modify_file_path(
file_path,
source_xds_dir,
target_xds_dir,
check_exists,
abs_path=abs_field_paths,
)
line = line.replace(file_path, new_file_path)
break
new_lines.append(line)
os.makedirs(target_xds_dir, exist_ok=True)
with open(target_file_path, "w") as f:
f.write("".join(new_lines))
def _create_xds_inp(self) -> None:
self.copy_xds_inp_file(self.metadata["xds_inp_path"], self.xds_inp_path)
def _modify_xds_inp(self) -> None:
self.cell_params = None
dectris_neggia_path = None
self.space_group_number = None
raw_directory = self.metadata.get("MX_directory")
exclude_range = self.metadata.get("exclude_range", None)
# Find path to dectris neggia DLL
for library_path in os.environ.get("LD_LIBRARY_PATH", "").split(":"):
lib_path = pathlib.Path(library_path) / "dectris-neggia.so"
if lib_path.is_file():
dectris_neggia_path = lib_path
break
self._modify_xds_inp_file(
raw_directory=raw_directory,
exclude_range=exclude_range,
lib=dectris_neggia_path,
)
forced_spacegroup = self.metadata.get("forced_spacegroup", None)
if forced_spacegroup is not None:
self.notify_icat(
{"step": {"name": self.PIPELINE_NAME, "status": "PREPROCESSING"}}
)
self.space_group_number, self.cell_params = self._get_spg_and_cell_params(
forced_spacegroup
)
if self.space_group_number is None or self.cell_params is None:
raise RuntimeError(
f"Cannot determine cell for space group {forced_spacegroup}"
)
self._modify_xds_inp_file(
space_group_number=self.space_group_number,
cell_params=self.cell_params,
)
def _modify_xds_inp_file(
self,
raw_directory: Optional[str] = None,
space_group_number: Optional[int] = None,
cell_params: Optional[List[float]] = None,
exclude_range: Optional[List[Tuple[int, int]]] = None,
lib: Optional[str] = None,
):
if cell_params is not None:
assert len(cell_params) == 6
with open(self.xds_inp_path) as f:
lines = f.readlines()
new_lines = []
for line in lines:
if (
"SPACE_GROUP_NUMBER" in line
and space_group_number is not None
and cell_params is not None
):
line = f" SPACE_GROUP_NUMBER= {space_group_number}\n"
space_group_number = None
elif "UNIT_CELL_CONSTANTS" in line and cell_params is not None:
line = f" UNIT_CELL_CONSTANTS= {' '.join(map(str, cell_params))}\n"
cell_params = None
if lib is not None and "LIB=" in line:
line = f" LIB= {lib}\n"
if "NAME_TEMPLATE_OF_DATA_FRAMES=" in line and raw_directory is not None:
template = line.split("/")[-1]
frames_path = os.path.join(raw_directory, template)
line = f" NAME_TEMPLATE_OF_DATA_FRAMES= {frames_path}\n"
new_lines.append(line)
if space_group_number is not None:
new_lines.append(f" SPACE_GROUP_NUMBER= {space_group_number}\n")
if cell_params is not None:
line = f" UNIT_CELL_CONSTANTS= {' '.join(map(str, cell_params))}\n"
new_lines.append(line)
if exclude_range is not None:
for begin, end in exclude_range:
new_lines.append(f" EXCLUDE_DATA_RANGE= {begin} {end}\n")
with open(self.xds_inp_path, "w") as f:
f.write("".join(new_lines))
def _get_spg_and_cell_params(
self, space_group_name: str
) -> Tuple[Optional[int], Optional[List[float]]]:
working_dir = self.grenades_working_dir / "cell_params"
working_dir.mkdir()
shutil.copy(self.xds_inp_path, working_dir)
command_line = f"{self.CALC_CELL_CMD} --space_group_name {space_group_name}"
# Submit job to SLURM
(
slurm_script_path,
slurm_id,
stdout_file_path,
stderr_file_path,
) = UtilsSlurm.submit_job_to_slurm(
command_line=command_line,
working_directory=working_dir,
queue="mx",
mem=self.SLURM_MEM,
nodes=1,
core=20,
time=1800,
name="get_spg_and_cell_params",
list_modules=["grenades"],
asynchronous=False,
)
print(working_dir)
# Parse output for space group and cell
spg_number = None
cell_params = None
with open(stdout_file_path) as f:
stdout = f.read()
list_stdout = stdout.split("\n")
for line in list_stdout:
if "LATTICE" in line:
# Convert "LATTICE: 1 77.5 155.8 155.9 60.1 89.9 89.8"
# to space group number and list of floats
list_lattice = line.split(":")[1].split()
spg_number = int(list_lattice[0])
cell_params = list(map(float, list_lattice[1:]))
break
return spg_number, cell_params
@property
def dcloid_file_paths(self) -> Tuple[pathlib.Path]:
return self.grenades_working_dir / "DCOLID.txt", self.icat_dir / "DCOLID.txt"
def _create_dcolid_file(self) -> None:
for path in self.dcloid_file_paths:
with open(path, "w") as f:
f.write(f"datacollectionID:{self.metadata['MX_dataCollectionId']}\n")
@property
def script_file_path(self) -> pathlib.Path:
return self.processing_base_dir / "grenades_fastproc.sh"
def _create_grenades_fastproc_script(self) -> None:
script = get_rendered_template(
"grenades_fastproc.sh", self.script_template_variables()
)
with open(self.script_file_path, mode="w") as f:
f.write(script)
self.script_file_path.chmod(0o755)
[docs]
def script_template_variables(self) -> dict:
anomalous = 1
if "anomalous" in self.metadata and not self.metadata["anomalous"]:
anomalous = 0
return {
"processing_base_dir": self.processing_base_dir,
"grenades_working_dir": self.grenades_working_dir,
"MX_dataCollectionId": self.metadata["MX_dataCollectionId"],
"anomalous": anomalous,
}