Source code for ewoksmx.utils.cbf_utils
import re
from datetime import datetime
from pathlib import Path
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union
import h5py
import numpy
from esrf_pathlib import ESRFPath
from fabio import cbfimage
# Unit attached to a header value (e.g. "deg."); None when the entry has no unit.
MiniCBFUnit = Union[str, None]
# Value of one header entry: a number, a string, a pair of numbers, a date,
# or None.
MiniCBFValue = Union[float, int, str, Tuple[float, float], datetime, None]
# Ordered header representation: one (key, value, unit) tuple per header line.
MiniCBFHeader = List[Tuple[str, MiniCBFValue, MiniCBFUnit]]
# Dictionary header representation: key -> (value, unit).
MiniCBFHeaderDict = Dict[str, Tuple[MiniCBFValue, MiniCBFUnit]]
[docs]
def deserialize_mini_cbf_header(
    mini_cbf_header: str, as_dict: bool = False
) -> Union[MiniCBFHeader, MiniCBFHeaderDict]:
    """Parse the text of a mini-CBF header into ``(key, value, unit)`` entries.

    Every non-empty line must start with ``#``. Lines are interpreted as follows:

    * an empty line becomes ``("", "", None)``;
    * a date such as ``2024/Nov/07 10:17:09`` becomes ``("", datetime, None)``;
    * ``key = value`` and ``key: value`` lines keep the value as a string,
      except ``(nil)`` which becomes ``None``;
    * lines without separator are searched for a pair of numbers
      (``Beam_xy (1198.07, 1305.20) pixels`` or
      ``Pixel_size 172e-6 m x 172e-6 m``) or a single number
      (``Start_angle -10.0000 deg.``), each with an optional unit;
    * any other line is split at the first space into a key and a string value.

    Numbers may carry a leading ``+`` or ``-`` sign; the sign belongs to the
    value, not to the key.

    :param mini_cbf_header: header text with lines separated by LF or CRLF.
    :param as_dict: when true, return a dictionary ``{key: (value, unit)}``
        instead of a list. Entries with an empty key are dropped and the last
        entry wins when a key is repeated.
    :returns: list of ``(key, value, unit)`` tuples in line order, or the
        dictionary form when ``as_dict`` is true.
    :raises AssertionError: when a non-empty line does not start with ``#`` or
        when the two units of an ``a x b`` pair differ.
    """
    header: MiniCBFHeader = list()

    # Normalize line endings to \n to handle both \n and \r\n
    lines = mini_cbf_header.replace("\r\n", "\n").splitlines()

    # Matches the key before the number (can include spaces). The match is lazy
    # so that a sign or a leading dot directly in front of the digits is given
    # to the number instead of being swallowed by the key.
    key_pattern = r"([^\d]*?)"
    # Matches optionally signed numbers, including scientific notation
    number_pattern = r"([+-]?\d*\.?\d+(?:[eE][+-]?\d+)?)"
    # Matches units with leading space
    unit_pattern = r"(\s[a-zA-Z\.]+)"
    # Matches numbers with optional units
    number_with_unit_pattern = rf"{number_pattern}{unit_pattern}?"

    # Pattern for "3.0000" or "3.0000 deg."
    numeric_value = re.compile(rf"^{number_with_unit_pattern}$")
    # Pattern for "2024/Nov/07 10:17:09"
    datetime_pattern = re.compile(r"^\d{4}/[A-Za-z]{3}/\d{2}\s\d{2}:\d{2}:\d{2}$")
    # Pattern for "Silicon sensor, thickness 0.001000 m"
    without_separator = re.compile(rf"^{key_pattern}{number_with_unit_pattern}$")
    # Pattern for "Pixel_size 172e-6 m x 172e-6 m"
    tuple_with_x = re.compile(
        rf"^{key_pattern}[:=]?\s*{number_with_unit_pattern}\s*x\s*{number_with_unit_pattern}$"
    )
    # Pattern for "Energy_range (0, 0) eV" or "Beam_xy (1198.07, 1305.20) pixels"
    tuple_with_brackets = re.compile(
        rf"^{key_pattern}[:=]?\s*\({number_pattern}\s*,\s*{number_pattern}\){unit_pattern}?$"
    )
    # Patterns splitting "key = value" and "key: value"; "=" is tried first.
    equal_separator = re.compile(r"(?P<key>[^=]+)\s*=\s*(?P<value>.*)")
    colon_separator = re.compile(r"(?P<key>[^:]+)\s*:\s*(?P<value>.*)")

    for line in lines:
        key = ""
        value = None
        unit = None

        # Empty line
        if not line.strip():
            value = ""
            header.append((key, value, unit))
            continue

        # Remove '#' and any leading whitespace
        assert line.startswith("#"), line
        line = line[1:].strip()

        # DateTime format
        if datetime_pattern.match(line):
            value = datetime.strptime(line, "%Y/%b/%d %H:%M:%S")
            header.append((key, value, unit))
            continue

        # Split the key and value
        match_result = equal_separator.match(line)
        if not match_result:
            match_result = colon_separator.match(line)
        if match_result:
            key = match_result.group("key").strip()
            value = match_result.group("value").strip()
        else:
            key = line.strip()
            value = ""

        # A value found after an explicit separator is kept as a string,
        # only "(nil)" is translated.
        if value:
            if value == "(nil)":
                value = None
            header.append((key, value, unit))
            continue

        # No separator: the whole line is in `key`, look for numbers in it.
        match_result = tuple_with_brackets.match(key)
        if match_result:
            key = match_result.group(1).strip()
            value = (
                _as_number(match_result.group(2)),
                _as_number(match_result.group(3)),
            )
            if match_result.group(4):
                unit = match_result.group(4).strip()
            header.append((key, value, unit))
            continue

        match_result = tuple_with_x.match(key)
        if match_result:
            key = match_result.group(1).strip()
            value = (
                _as_number(match_result.group(2)),
                _as_number(match_result.group(4)),
            )
            if match_result.group(3):
                unit = match_result.group(3).strip()
            if match_result.group(5):
                _unit = match_result.group(5).strip()
                if unit:
                    # Both numbers of the pair must be in the same unit
                    assert unit == _unit, (unit, _unit)
                else:
                    unit = _unit
            header.append((key, value, unit))
            continue

        match_result = without_separator.match(key)
        if match_result:
            key = match_result.group(1).strip()
            value = _as_number(match_result.group(2).strip())
            if match_result.group(3):
                unit = match_result.group(3).strip()
            header.append((key, value, unit))
            continue

        # Fallback (e.g. keys that contain digits): the first space separates
        # the key from the value.
        key, _, value = key.partition(" ")
        key = key.strip()
        value = value.strip()
        match_result = numeric_value.match(value)
        if match_result:
            value = _as_number(match_result.group(1).strip())
            if match_result.group(2):
                unit = match_result.group(2).strip()
        header.append((key, value, unit))

    if as_dict:
        return {key: (value, unit) for key, value, unit in header if key}
    return header
[docs]
def serialize_mini_cbf_header(
    header: MiniCBFHeader, add_two_points: bool = False
) -> str:
    """Render ``(key, value, unit)`` entries as the text of a mini-CBF header.

    Each entry becomes one line, lines are joined with CRLF:

    * ``None`` is written as ``(nil)``, dates as ``YYYY/Mon/DD HH:MM:SS``,
      tuples as ``(first, second)`` and anything else through ``str``;
    * a non-empty unit is appended after a space;
    * a line is ``# key value``, ``# key`` or ``# value`` depending on which
      parts are non-empty, and an empty string when both are empty.

    :param header: entries to serialize, in output order.
    :param add_two_points: append ``:`` to keys that do not already end with
        one, on lines that have both a key and a value.
    :returns: the header text.
    """

    def render_line(label, content, suffix):
        # Textual form of the value
        if content is None:
            text = "(nil)"
        elif isinstance(content, datetime):
            text = content.strftime("%Y/%b/%d %H:%M:%S")
        elif isinstance(content, tuple):
            text = f"({content[0]}, {content[1]})"
        else:
            text = str(content)
        if suffix:
            text = f"{text} {suffix}"

        # Lines without a key: only the value, or a blank line
        if not label:
            return f"# {text}" if text else ""
        # Key without a value
        if not text:
            return f"# {label}"
        if not label.endswith(":") and add_two_points:
            label = f"{label}:"
        return f"# {label} {text}"

    return "\r\n".join(
        render_line(label, content, suffix) for label, content, suffix in header
    )
[docs]
def modify_mini_cbf_header(mini_cbf_header: str, values: Dict[str, float]) -> str:
    """Overwrite selected values directly in the text of a mini-CBF header.

    The header text is edited in place, one key after the other, which avoids
    the sanitation a deserialize+serialize round trip would apply.

    :param mini_cbf_header: header text to modify.
    :param values: new value for each header key to replace.
    :returns: the header text with all replacements applied.
    """
    updated_header = mini_cbf_header
    for name, new_value in values.items():
        updated_header = _replace_mini_cbf_header_value(
            updated_header, name, new_value
        )
    return updated_header
def _as_number(value: str) -> Union[str, int, float]:
try:
try:
return int(value.strip())
except ValueError:
return float(value.strip())
except ValueError:
return value
def _replace_mini_cbf_header_value(mini_cbf_header: str, key: str, value: float) -> str:
"""Replace a value in a mini-CBF header.
.. code-block:: python
key = "Angle_increment"
value = 0.2000
mini_cbf_header = "...# Angle_increment 0.1000 deg.\r\n..."
Becomes
.. code-block:: python
mini_cbf_header = "...# Angle_increment 0.2000 deg.\r\n..."
"""
index_key = mini_cbf_header.find(key)
start = mini_cbf_header.find(" ", index_key) + 1
stop = mini_cbf_header.find(" ", start)
old_value = mini_cbf_header[start:stop]
nb_decimals = len(old_value.split(".")[1])
new_value = "{0:.0{nb_decimals}f}".format(value, nb_decimals=nb_decimals)
return mini_cbf_header[:start] + new_value + mini_cbf_header[stop:]
[docs]
def generate_cbf_sum_images(
    merged_h5, forced_detector_serial=None, output_directory=None
):
    """Write one summed mini-CBF image per dataset of an HDF5 master file.

    Detector and beam metadata, the omega positions and the pixel mask are read
    from ``merged_h5``. The omega positions are split at the large steps found
    by ``compute_sum_frames_indices``. For each resulting range, the dataset
    with the same rank under ``/entry/data/`` (sorted by name) is summed along
    its first axis and written with a mini-CBF header as
    ``<prefix><number>.cbf``, numbered from 0001.

    :param merged_h5: path of the HDF5 master file. The output file prefix is
        the part of its name that precedes ``1_master.h5``.
    :param forced_detector_serial: when truthy, written in the ``Detector:``
        header line instead of the detector number stored in the file.
    :param output_directory: directory for the CBF files; defaults to the
        directory of ``merged_h5``.
    :returns: ``(list_cbf_path, metadata_dict)``: the paths of the written CBF
        files and a dictionary with rounded beam/detector metadata and the
        start/end angle of every range under ``"subwedge"``.
    """
    merged_h5 = Path(merged_h5)
    # Text of the file name in front of "1_master.h5"
    prefix = merged_h5.name.split("1_master.h5")[0]
    if output_directory:
        output_directory = ESRFPath(output_directory)
    else:
        output_directory = ESRFPath(merged_h5.parent)
    url_omega = "/entry/sample/transformations/omega"
    url_omega_range = "/entry/sample/transformations/omega_range_average"
    url_mask = "/entry/instrument/detector/detectorSpecific/pixel_mask"
    with h5py.File(merged_h5, mode="r", locking=False) as fh_in:
        # Metadata shared by the headers of all output images
        detector = fh_in["/entry/instrument/detector/description"][()].decode("utf-8")
        if forced_detector_serial:
            detector_number = forced_detector_serial
        else:
            detector_number = fh_in["/entry/instrument/detector/detector_number"][
                ()
            ].decode("utf-8")
        pixel_size = (
            fh_in["/entry/instrument/detector/x_pixel_size"][()],
            fh_in["/entry/instrument/detector/y_pixel_size"][()],
        )
        thickness = fh_in["/entry/instrument/detector/sensor_thickness"][()]
        exposure_time = fh_in["/entry/instrument/detector/count_time"][()]
        count_cutoff = fh_in[
            "/entry/instrument/detector/detectorSpecific/countrate_correction_count_cutoff"
        ][()]
        wavelength = fh_in["/entry/sample/beam/incident_wavelength"][()]
        detector_distance = fh_in["/entry/instrument/detector/detector_distance"][()]
        beam_xy = (
            fh_in["/entry/instrument/detector/beam_center_x"][()],
            fh_in["/entry/instrument/detector/beam_center_y"][()],
        )
        # (key, value, unit) entries in the form serialize_mini_cbf_header expects.
        # Exposure_time and Exposure_period are both filled with count_time.
        static_header_list = [
            ("", "", None),  # An empty line
            ("Detector:", f"{detector}, {detector_number}", None),
            ("Pixel_size", f"{pixel_size[0]} m x {pixel_size[1]} m", None),
            ("Silicon sensor, thickness", thickness, "m"),
            ("Exposure_time", exposure_time, "s"),
            ("Exposure_period", exposure_time, "s"),
            ("Count_cutoff", count_cutoff, "counts"),
            ("Wavelength", wavelength, "A"),
            ("Detector_distance", detector_distance, "m"),
            ("Beam_xy", beam_xy, "pixels"),
        ]
        omega_array = fh_in[url_omega][()]
        omega_range = fh_in[url_omega_range][()]
        # `indices` are the positions after which omega makes a large step;
        # `angle_steps` (their count) is not used further.
        angle_steps, indices = compute_sum_frames_indices(omega_array, omega_range)
        mask_image = fh_in[url_mask][()]
        # Frame index boundaries of the ranges: range i covers
        # boundaries[i] (included) to boundaries[i + 1] (excluded).
        internal_stop_indices = indices + 1
        boundaries = numpy.concatenate(([0], internal_stop_indices, [len(omega_array)]))
        list_cbf_path = []
        list_subwedges = []
        list_dataset = list(fh_in["/entry/data/"].keys())
        list_dataset.sort()
        for i in range(len(boundaries) - 1):
            idx_0 = boundaries[i]
            idx_1 = boundaries[i + 1]
            # Total rotation covered by the range: number of frames x step
            angle_increment = (idx_1 - idx_0) * omega_range
            axis_start = round(float(omega_array[idx_0]), 1)
            axis_end = round(float(axis_start + angle_increment), 1)
            list_subwedges.append({"axis_start": axis_start, "axis_end": axis_end})
            header_list = create_full_header_list(
                static_header_list, axis_start, angle_increment
            )
            # Range i is taken from the i-th dataset in name order.
            # NOTE(review): assumes one dataset per range and that axis 0 is
            # the frame axis -- confirm against the files this is used on.
            sum_image = numpy.sum(
                fh_in[f"/entry/data/{list_dataset[i]}"][()], axis=0, dtype="int32"
            )
            # Pixels flagged in the mask get a negative marker value:
            # -1 where the mask is 1, -2 where it is larger than 1.
            sum_image[mask_image == 1] = -1
            sum_image[mask_image > 1] = -2
            commented_header_content = serialize_mini_cbf_header(header_list)
            fabio_header = {
                "_array_data.header_convention": "SLS_1.0",
                "_array_data.header_contents": commented_header_content,
            }
            cbf_output_path = output_directory / f"{prefix}{i+1:04d}.cbf"
            cbf_image_fabio = cbfimage.CbfImage(data=sum_image, header=fabio_header)
            cbf_image_fabio.write(cbf_output_path)
            list_cbf_path.append(cbf_output_path)
    # NOTE(review): "detector_type" is hard-coded and does not depend on the
    # detector description read from the file -- confirm this is intended.
    metadata_dict = {
        "master_path": str(merged_h5),
        "detector_type": "pilatus4_4m",
        "beam_position_x": round(float(beam_xy[0]), 1),
        "beam_position_y": round(float(beam_xy[1]), 1),
        "detector_distance": round(float(detector_distance), 4),
        "wavelength": round(float(wavelength), 4),
        "exposure_time": round(float(exposure_time), 5),
        "oscillation_width": round(float(omega_range), 5),
        "subwedge": list_subwedges,
    }
    return list_cbf_path, metadata_dict
[docs]
def compute_sum_frames_indices(omega_array, omega_range):
    """Locate the large steps in a sequence of omega positions.

    A step between two consecutive positions is large when it exceeds five
    times ``omega_range``.

    :param omega_array: omega position of every frame.
    :param omega_range: nominal omega step between consecutive frames.
    :returns: ``(count, positions)`` where ``positions`` holds the index of
        the frame that precedes each large step and ``count`` is their number.
    """
    is_large_step = numpy.diff(omega_array) > omega_range * 5
    step_positions = numpy.where(is_large_step)[0]
    return len(step_positions), step_positions
[docs]
def create_full_header_list(base_list, start_angle, angle_increment):
    """Return a copy of a header list completed with the rotation entries.

    :param base_list: header entries shared by all images; left unmodified.
    :param start_angle: value of the ``Start_angle`` entry, in ``deg.``.
    :param angle_increment: value of the ``Angle_increment`` entry, in ``deg.``.
    :returns: a new list ending with the ``Start_angle`` and
        ``Angle_increment`` entries.
    """
    rotation_entries = (
        ("Start_angle", start_angle, "deg."),
        ("Angle_increment", angle_increment, "deg."),
    )
    completed = base_list.copy()
    completed.extend(rotation_entries)
    return completed