"""Mini-CBF header helpers and CBF sum-image generation (``ewoksmx.utils.cbf_utils``)."""

import re
from datetime import datetime
from pathlib import Path
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union

import h5py
import numpy
from esrf_pathlib import ESRFPath
from fabio import cbfimage

# Unit attached to a header value (e.g. "deg." or "m"); None when the entry has no unit.
MiniCBFUnit = Union[str, None]
# Value of one header entry: a number, a free-text string, a pair of numbers,
# a timestamp, or None (the header spells a missing value as "(nil)").
MiniCBFValue = Union[float, int, str, Tuple[float, float], datetime, None]
# Ordered header: one (key, value, unit) tuple per header line.
MiniCBFHeader = List[Tuple[str, MiniCBFValue, MiniCBFUnit]]
# Header indexed by key: key -> (value, unit).
MiniCBFHeaderDict = Dict[str, Tuple[MiniCBFValue, MiniCBFUnit]]


def deserialize_mini_cbf_header(
    mini_cbf_header: str, as_dict: bool = False
) -> Union[MiniCBFHeader, MiniCBFHeaderDict]:
    """Parse the text of a mini-CBF header into ``(key, value, unit)`` entries.

    Every non-empty line must start with ``#``. Lines are interpreted in this
    order:

    * an empty line becomes ``("", "", None)``;
    * a bare timestamp such as ``2024/Nov/07 10:17:09`` becomes
      ``("", datetime, None)``;
    * ``key = value`` or ``key: value`` keeps the value as a stripped string,
      except ``(nil)`` which becomes ``None``;
    * ``key (a, b) unit`` and ``key a unit x b unit`` become a pair of numbers;
    * ``key number unit`` becomes a number (``int`` when possible, otherwise
      ``float``), optionally signed;
    * anything else is split at the first space into a key and a string value.

    :param mini_cbf_header: header text with LF or CRLF line endings.
    :param as_dict: when true, return ``{key: (value, unit)}``; entries with
        an empty key (blank lines, timestamps) are dropped and a repeated key
        keeps its last entry.
    :returns: the list of ``(key, value, unit)`` tuples in line order, or the
        dictionary described above.
    :raises AssertionError: if a non-empty line does not start with ``#`` or
        if the two units of an ``a unit x b unit`` value differ.
    """
    header: MiniCBFHeader = list()

    # Normalize line endings to \n to handle both \n and \r\n
    lines = mini_cbf_header.replace("\r\n", "\n").splitlines()

    # Matches the key before the number (can include spaces). Lazy on purpose:
    # a greedy key would swallow the sign of a negative number
    # ("Start_angle -90.0" would give the key "Start_angle -").
    key_pattern = r"([^\d]*?)"

    # Matches numbers, including an optional sign and scientific notation
    number_pattern = r"([+-]?\d*\.?\d+(?:[eE][+-]?\d+)?)"

    # Matches units with leading space
    unit_pattern = r"(\s[a-zA-Z\.]+)"

    # Matches numbers with optional units
    number_with_unit_pattern = rf"{number_pattern}{unit_pattern}?"

    # Pattern for "3.0000" or "3.0000 deg."
    numeric_value = re.compile(rf"^{number_with_unit_pattern}$")

    # Pattern for "2024/Nov/07 10:17:09"
    datetime_pattern = re.compile(r"^\d{4}/[A-Za-z]{3}/\d{2}\s\d{2}:\d{2}:\d{2}$")

    # Pattern for "Silicon sensor, thickness 0.001000 m"
    without_separator = re.compile(rf"^{key_pattern}{number_with_unit_pattern}$")

    # Pattern for "Pixel_size 172e-6 m x 172e-6 m"
    tuple_with_x = re.compile(
        rf"^{key_pattern}[:=]?\s*{number_with_unit_pattern}\s*x\s*{number_with_unit_pattern}$"
    )

    # Pattern for "Energy_range (0, 0) eV" or "Beam_xy (1198.07, 1305.20) pixels"
    tuple_with_brackets = re.compile(
        rf"^{key_pattern}[:=]?\s*\({number_pattern}\s*,\s*{number_pattern}\){unit_pattern}?$"
    )

    for line in lines:
        key = ""
        value = None
        unit = None

        # Empty line
        if not line.strip():
            value = ""
            header.append((key, value, unit))
            continue

        # Remove '#' and any leading whitespace
        assert line.startswith("#"), line
        line = line[1:].strip()

        # DateTime format
        if datetime_pattern.match(line):
            value = datetime.strptime(line, "%Y/%b/%d %H:%M:%S")
            header.append((key, value, unit))
            continue

        # Split the key and value: "=" is tried first, then ":"
        match_result = re.match(r"(?P<key>[^=]+)\s*=\s*(?P<value>.*)", line)
        if not match_result:
            match_result = re.match(r"(?P<key>[^:]+)\s*:\s*(?P<value>.*)", line)
        if match_result:
            key = match_result.group("key").strip()
            value = match_result.group("value").strip()
        else:
            key = line.strip()
            value = ""

        # An explicit separator was found: the value is kept as text
        if value:
            if value == "(nil)":
                value = None
            header.append((key, value, unit))
            continue

        # No separator: the value (if any) is still embedded in `key`
        match_result = tuple_with_brackets.match(key)
        if match_result:
            key = match_result.group(1).strip()
            value = (
                _as_number(match_result.group(2)),
                _as_number(match_result.group(3)),
            )
            if match_result.group(4):
                unit = match_result.group(4).strip()
            header.append((key, value, unit))
            continue

        match_result = tuple_with_x.match(key)
        if match_result:
            key = match_result.group(1).strip()
            value = (
                _as_number(match_result.group(2)),
                _as_number(match_result.group(4)),
            )
            if match_result.group(3):
                unit = match_result.group(3).strip()
            if match_result.group(5):
                _unit = match_result.group(5).strip()
                if unit:
                    # Both numbers carry a unit: they must agree
                    assert unit == _unit, (unit, _unit)
                else:
                    unit = _unit
            header.append((key, value, unit))
            continue

        match_result = without_separator.match(key)
        if match_result:
            key = match_result.group(1).strip()
            value = _as_number(match_result.group(2).strip())
            if match_result.group(3):
                unit = match_result.group(3).strip()
            header.append((key, value, unit))
            continue

        # Fallback (e.g. keys that contain digits): split at the first space
        key, _, value = key.partition(" ")
        key = key.strip()
        value = value.strip()
        match_result = numeric_value.match(value)
        if match_result:
            value = _as_number(match_result.group(1).strip())
            if match_result.group(2):
                unit = match_result.group(2).strip()
            header.append((key, value, unit))
            continue

        header.append((key, value, unit))

    if as_dict:
        return {key: (value, unit) for key, value, unit in header if key}
    return header
def serialize_mini_cbf_header(
    header: MiniCBFHeader, add_two_points: bool = False
) -> str:
    """Render ``(key, value, unit)`` entries as mini-CBF header text.

    Values are written as follows: ``None`` as ``(nil)``, a ``datetime`` as
    ``YYYY/Mon/DD HH:MM:SS``, a tuple as ``(first, second)`` and anything else
    through ``str``. A non-empty unit is appended after a single space.

    :param header: entries to write, one output line per entry.
    :param add_two_points: when true, append ``:`` to every key that is
        followed by a value and does not already end with ``:``.
    :returns: the header lines joined with CRLF. An entry with neither key nor
        value text yields an empty line; every other line starts with ``# ``.
    """

    def render_value(raw, unit) -> str:
        # Text of the value, followed by its unit when there is one.
        if raw is None:
            text = "(nil)"
        elif isinstance(raw, datetime):
            text = raw.strftime("%Y/%b/%d %H:%M:%S")
        elif isinstance(raw, tuple):
            text = f"({raw[0]}, {raw[1]})"
        else:
            text = str(raw)
        return f"{text} {unit}" if unit else text

    rendered = []
    for key, value, unit in header:
        text = render_value(value, unit)

        if not key:
            # Value-only line (e.g. a timestamp) or a blank line
            rendered.append(f"# {text}" if text else "")
            continue

        if not text:
            # Key-only line: never gets a trailing colon
            rendered.append(f"# {key}")
            continue

        if add_two_points and not key.endswith(":"):
            key = key + ":"
        rendered.append(f"# {key} {text}")

    return "\r\n".join(rendered)
def modify_mini_cbf_header(mini_cbf_header: str, values: Dict[str, float]) -> str:
    """Replace values in a mini-CBF header in case sanitation by deserialize+serialize should be avoided.

    Only the value tokens are rewritten; every other character of the header
    text is preserved.

    :param mini_cbf_header: header text to patch.
    :param values: new value for each header key to modify.
    :returns: the patched header text.
    :raises KeyError: if one of the keys does not occur in the header.
    """
    for key, value in values.items():
        mini_cbf_header = _replace_mini_cbf_header_value(mini_cbf_header, key, value)
    return mini_cbf_header


def _as_number(value: str) -> Union[str, int, float]:
    """Convert text to ``int`` if possible, else ``float``, else return it unchanged."""
    stripped = value.strip()
    for convert in (int, float):
        try:
            return convert(stripped)
        except ValueError:
            continue
    return value


def _replace_mini_cbf_header_value(mini_cbf_header: str, key: str, value: float) -> str:
    r"""Replace a value in a mini-CBF header.

    .. code-block:: python

        key = "Angle_increment"
        value = 0.2000
        mini_cbf_header = "...# Angle_increment 0.1000 deg.\r\n..."

    Becomes

    .. code-block:: python

        mini_cbf_header = "...# Angle_increment 0.2000 deg.\r\n..."

    The key must match as a whole word (``Phi`` does not match
    ``Phi_increment``) and may contain spaces. The value is the first run of
    non-whitespace characters after the key and an optional ``:`` or ``=``,
    so a value at the end of a line is handled as well. The new value is
    written with as many decimals as the old one; when the old value has no
    decimal point the new value is written with its default formatting.

    :raises KeyError: if the key is not followed by a value in the header.
    """
    pattern = re.compile(
        rf"(?<!\w){re.escape(key)}(?!\w)(?:[ \t]*[:=][ \t]*|[ \t]+)(\S+)"
    )
    match_result = pattern.search(mini_cbf_header)
    if match_result is None:
        raise KeyError(key)

    start, stop = match_result.span(1)
    old_value = mini_cbf_header[start:stop]

    # Keep the number of characters after the decimal point of the old value
    _, point, decimals = old_value.partition(".")
    if point:
        new_value = f"{value:.{len(decimals)}f}"
    else:
        new_value = f"{value}"

    return mini_cbf_header[:start] + new_value + mini_cbf_header[stop:]
def generate_cbf_sum_images(
    merged_h5, forced_detector_serial=None, output_directory=None
):
    """Write one summed CBF image per group of frames found in a merged HDF5 file.

    The omega positions are split wherever ``compute_sum_frames_indices``
    reports a large step. For the i-th resulting interval, the i-th dataset
    (in sorted name order) under ``/entry/data/`` is summed along its first
    axis and written as ``<prefix><i+1 as 4 digits>.cbf`` with a mini-CBF
    header built from the detector and beam metadata in the file.

    :param merged_h5: path of the merged HDF5 master file. The output prefix
        is the part of its file name before ``1_master.h5``.
    :param forced_detector_serial: when truthy, written in the header instead
        of the detector number stored in the file.
    :param output_directory: directory for the CBF files; defaults to the
        directory that contains ``merged_h5``.
    :returns: ``(list_cbf_path, metadata_dict)`` where ``list_cbf_path`` holds
        the written CBF paths and ``metadata_dict`` holds rounded acquisition
        metadata plus the ``axis_start``/``axis_end`` of every interval.
    """
    merged_h5 = Path(merged_h5)
    prefix = merged_h5.name.split("1_master.h5")[0]
    output_directory = ESRFPath(
        output_directory if output_directory else merged_h5.parent
    )

    with h5py.File(merged_h5, mode="r", locking=False) as fh_in:

        def read(path):
            # Load the full content of one dataset.
            return fh_in[path][()]

        detector = read("/entry/instrument/detector/description").decode("utf-8")
        detector_number = forced_detector_serial or read(
            "/entry/instrument/detector/detector_number"
        ).decode("utf-8")
        pixel_size = (
            read("/entry/instrument/detector/x_pixel_size"),
            read("/entry/instrument/detector/y_pixel_size"),
        )
        thickness = read("/entry/instrument/detector/sensor_thickness")
        exposure_time = read("/entry/instrument/detector/count_time")
        count_cutoff = read(
            "/entry/instrument/detector/detectorSpecific/countrate_correction_count_cutoff"
        )
        wavelength = read("/entry/sample/beam/incident_wavelength")
        detector_distance = read("/entry/instrument/detector/detector_distance")
        beam_xy = (
            read("/entry/instrument/detector/beam_center_x"),
            read("/entry/instrument/detector/beam_center_y"),
        )

        # Header entries shared by all output images
        static_header_list = [
            ("", "", None),  # An empty line
            ("Detector:", f"{detector}, {detector_number}", None),
            ("Pixel_size", f"{pixel_size[0]} m x {pixel_size[1]} m", None),
            ("Silicon sensor, thickness", thickness, "m"),
            ("Exposure_time", exposure_time, "s"),
            ("Exposure_period", exposure_time, "s"),
            ("Count_cutoff", count_cutoff, "counts"),
            ("Wavelength", wavelength, "A"),
            ("Detector_distance", detector_distance, "m"),
            ("Beam_xy", beam_xy, "pixels"),
        ]

        omega_array = read("/entry/sample/transformations/omega")
        omega_range = read("/entry/sample/transformations/omega_range_average")
        _, indices = compute_sum_frames_indices(omega_array, omega_range)
        mask_image = read("/entry/instrument/detector/detectorSpecific/pixel_mask")

        # Interval i covers frames boundaries[i] (inclusive) to boundaries[i + 1] (exclusive)
        boundaries = numpy.concatenate(([0], indices + 1, [len(omega_array)]))

        list_cbf_path = []
        list_subwedges = []
        dataset_names = sorted(fh_in["/entry/data/"].keys())

        for i, (idx_0, idx_1) in enumerate(zip(boundaries[:-1], boundaries[1:])):
            angle_increment = (idx_1 - idx_0) * omega_range
            axis_start = round(float(omega_array[idx_0]), 1)
            axis_end = round(float(axis_start + angle_increment), 1)
            list_subwedges.append({"axis_start": axis_start, "axis_end": axis_end})

            header_list = create_full_header_list(
                static_header_list, axis_start, angle_increment
            )

            sum_image = numpy.sum(
                read(f"/entry/data/{dataset_names[i]}"), axis=0, dtype="int32"
            )
            # Pixels flagged in the mask get negative sentinel values
            sum_image[mask_image == 1] = -1
            sum_image[mask_image > 1] = -2

            fabio_header = {
                "_array_data.header_convention": "SLS_1.0",
                "_array_data.header_contents": serialize_mini_cbf_header(header_list),
            }

            cbf_output_path = output_directory / f"{prefix}{i+1:04d}.cbf"
            cbfimage.CbfImage(data=sum_image, header=fabio_header).write(
                cbf_output_path
            )
            list_cbf_path.append(cbf_output_path)

        metadata_dict = {
            "master_path": str(merged_h5),
            "detector_type": "pilatus4_4m",
            "beam_position_x": round(float(beam_xy[0]), 1),
            "beam_position_y": round(float(beam_xy[1]), 1),
            "detector_distance": round(float(detector_distance), 4),
            "wavelength": round(float(wavelength), 4),
            "exposure_time": round(float(exposure_time), 5),
            "oscillation_width": round(float(omega_range), 5),
            "subwedge": list_subwedges,
        }

    return list_cbf_path, metadata_dict
def compute_sum_frames_indices(omega_array, omega_range):
    """Locate the large steps between consecutive omega values.

    A step counts as large when the difference between two consecutive
    values exceeds five times ``omega_range``.

    :param omega_array: sequence of omega values.
    :param omega_range: reference step size used to build the threshold.
    :returns: ``(count, indices)`` where ``indices`` holds, for every large
        step, the index of the value just before the step, and ``count`` is
        the number of such steps.
    """
    jump_threshold = omega_range * 5
    is_jump = numpy.diff(omega_array) > jump_threshold
    jump_positions = numpy.nonzero(is_jump)[0]
    return len(jump_positions), jump_positions
def create_full_header_list(base_list, start_angle, angle_increment):
    """Return a copy of ``base_list`` with the two angle entries appended.

    :param base_list: list of ``(key, value, unit)`` header entries; it is
        not modified.
    :param start_angle: value stored under ``Start_angle`` (unit ``deg.``).
    :param angle_increment: value stored under ``Angle_increment`` (unit ``deg.``).
    :returns: a new list ending with the ``Start_angle`` and
        ``Angle_increment`` entries, in that order.
    """
    extended = base_list.copy()
    extended.extend(
        [
            ("Start_angle", start_angle, "deg."),
            ("Angle_increment", angle_increment, "deg."),
        ]
    )
    return extended