Source code for ewoksmx.utils.hdf5_utils

import os
import shutil
from pathlib import Path

import h5py
import numpy

from ewoksmx.utils.cbf_utils import compute_sum_frames_indices


def _create_dataset_with_attrs(parent, dataset_path, value, attributes_by_path):
    """Create ``dataset_path`` under ``parent`` and re-apply its gathered attributes."""
    dataset = parent.create_dataset(dataset_path, data=value)
    if dataset_path in attributes_by_path:
        dataset.attrs.update(attributes_by_path[dataset_path])
    return dataset


def aggregate_hdf5_masters_with_vds(list_input_master_path, output_master_path):
    """Aggregate several HDF5 master files into one new master file.

    The output file is created with mode ``"w-"``, so an already existing
    ``output_master_path`` makes ``h5py.File`` fail. For every top-level entry
    of the input masters, the omega related datasets are concatenated in the
    output and each input master's ``/entry/data/data_000001`` is referenced
    from the output ``data`` group through an ``h5py.ExternalLink``.

    When the first input master is not located in the output directory, every
    input master and its ``*_data_000001.h5`` companion are first copied next
    to the output file so that the external links can use bare file names.

    :param list_input_master_path: non-empty sequence of master file paths
        (``str`` or ``pathlib.Path``).
    :param output_master_path: path of the master file to create.
    """
    # Check if the input files are in the same directory as the output file
    input_dir = Path(list_input_master_path[0]).parent
    output_dir = Path(output_master_path).parent
    if input_dir != output_dir:
        # Not the same directory - we must copy the input files to the output
        # directory in order to create correct relative links for the hdf5 files
        copied_master_paths = []
        for input_master_path in list_input_master_path:
            # Entries may be plain strings: coerce before using ``.name``.
            input_master_path = Path(input_master_path)
            new_master_path = output_dir / input_master_path.name
            shutil.copy(input_master_path, new_master_path)
            data_filename = input_master_path.name.replace(
                "_master.h5", "_data_000001.h5"
            )
            # The data file is looked up next to its own master, which is the
            # same location as before whenever all masters share a directory.
            shutil.copy(
                input_master_path.parent / data_filename,
                output_dir / data_filename,
            )
            copied_master_paths.append(new_master_path)
        list_input_master_path = copied_master_paths

    URL_DATA = "/entry/data/data_000001"
    nimages_name = "instrument/detector/detectorSpecific/nimages"
    gathered_dataset_names = (
        nimages_name,
        "sample/goniometer/omega",
        "sample/goniometer/omega_end",
        "sample/goniometer/omega_range_average",
        "sample/goniometer/omega_range_total",
        "sample/transformations/omega",
        "sample/transformations/omega_end",
        "sample/transformations/omega_range_average",
        "sample/transformations/omega_range_total",
    )

    with h5py.File(output_master_path, mode="w-") as fh_out:
        first_master = True
        data_dict = {}  # str(master index) -> master file name holding "data"
        gather_datas = {}  # entry name -> {dataset path: gathered values}
        gathered_attributes = {}  # dataset path -> attributes to restore
        for i, input_filename in enumerate(list_input_master_path):
            with h5py.File(input_filename, mode="r", locking=False) as fh_in:
                if first_master:
                    copy_attributes(fh_in, fh_out)
                for entry_name in fh_in:
                    entry_in = fh_in[entry_name]
                    entry_out = fh_out.require_group(entry_name)
                    if first_master:
                        # Root/entry attributes and the gather buffers are
                        # taken from the first master only.
                        copy_attributes(entry_in, entry_out)
                        gather_datas[entry_name] = {
                            dataset_name: []
                            for dataset_name in gathered_dataset_names
                        }
                    gather_data = gather_datas[entry_name]
                    # create_metadata_links is defined outside this block;
                    # presumably it fills gather_data / gathered_attributes.
                    create_metadata_links(
                        entry_in,
                        entry_out,
                        input_filename,
                        output_master_path,
                        gather_data,
                        gathered_attributes=gathered_attributes,
                    )
                    if "data" in entry_in:
                        data_dict[str(i)] = Path(input_filename).name
            first_master = False

        for entry_name, gather_data in gather_datas.items():
            entry_out = fh_out.require_group(entry_name)
            data_out = entry_out.require_group("data")
            data_out.attrs["NX_class"] = "NXdata"

            for group in ["sample/goniometer", "sample/transformations"]:
                omega = numpy.asarray(gather_data[f"{group}/omega"])
                omega_end = numpy.asarray(gather_data[f"{group}/omega_end"])
                omega_range_average = numpy.asarray(
                    gather_data[f"{group}/omega_range_average"]
                )
                for dataset_path, value in (
                    (f"{group}/omega", omega),
                    (f"{group}/omega_end", omega_end),
                    (f"{group}/omega_range_total", omega_end[-1] - omega[0]),
                    (f"{group}/omega_range_average", omega_range_average[0]),
                ):
                    _create_dataset_with_attrs(
                        entry_out, dataset_path, value, gathered_attributes
                    )

            # From here on ``omega`` and ``omega_range_average`` hold the
            # values of the last group above ("sample/transformations").
            nimages = 0
            index = 0
            for master_name in data_dict.values():
                # NOTE(review): the dataset name is derived from the truncated
                # angular offset, not from an image number - confirm intent.
                index_dataset = int(int(omega[index] - omega[0]) + 1)
                name_dataset = f"data_{index_dataset:06}"
                data_out[name_dataset] = h5py.ExternalLink(master_name, URL_DATA)
                linked_data = data_out[name_dataset]
                frames_nb = linked_data.shape[0]
                nimages += frames_nb
                linked_data.attrs["image_nr_low"] = (
                    int(round((omega[index] - omega[0]) / omega_range_average[0]))
                    + 1
                )
                linked_data.attrs["image_nr_high"] = (
                    int(
                        round(
                            (omega[index + frames_nb - 1] - omega[0])
                            / omega_range_average[0]
                        )
                    )
                    + 1
                )
                index = nimages

            _create_dataset_with_attrs(
                entry_out, nimages_name, nimages, gathered_attributes
            )
def copy_attributes(src, dst):
    """Copy all attributes of ``src`` onto ``dst``.

    Both objects must expose a mapping-like ``attrs`` member. Attributes
    already present on ``dst`` are overwritten when ``src`` defines the same
    name and kept otherwise.
    """
    source_attributes = src.attrs
    dst.attrs.update(source_attributes)
def convertToIntFloatString(stringValue):
    """Convert a string to ``int`` or ``float`` when possible.

    * ``None`` is returned unchanged.
    * A string made only of digits is converted with ``int``.
    * Anything else is tried with ``float``; signed or padded integers such
      as ``"-5"`` are not matched by ``str.isdigit`` and therefore come back
      as floats.
    * A string that cannot be converted is returned as is.

    :param stringValue: the string to convert, or ``None``.
    :returns: an ``int``, a ``float``, the original string, or ``None``.
    """
    if stringValue is None:
        return None
    # ``str.isdigit`` also accepts characters such as "²" that ``int`` rejects,
    # so the int conversion must be allowed to fail and fall through.
    converters = (int, float) if stringValue.isdigit() else (float,)
    for convert in converters:
        try:
            return convert(stringValue)
        except ValueError:
            continue
    return stringValue
def _first_key_is_value(mapping):
    """Tell whether the first key of ``mapping`` is ``"value"`` (False if empty)."""
    return next(iter(mapping), None) == "value"


def removeValueFromDict(inDict):
    """Return a copy of ``inDict`` with ``{"value": x}`` wrappers unwrapped.

    * A dict whose first key is ``"value"`` is replaced by
      ``convertToIntFloatString`` applied to its ``"value"`` entry.
    * Any other dict is processed recursively; an empty dict stays empty.
    * In a list, every dict item is handled the same way, one output element
      per input item. List items that are not dicts are skipped.
    * Every other value is kept unchanged.

    :param inDict: the dictionary to process; it is not modified.
    :returns: a new dictionary.
    """
    newDict = {}
    for key, value in inDict.items():
        if isinstance(value, dict):
            if _first_key_is_value(value):
                newDict[key] = convertToIntFloatString(value["value"])
            else:
                newDict[key] = removeValueFromDict(value)
        elif isinstance(value, list):
            converted_items = []
            for item in value:
                if not isinstance(item, dict):
                    continue
                if _first_key_is_value(item):
                    # Convert this item only: re-walking the whole list here
                    # would emit every element once per wrapper in the list.
                    converted_items.append(convertToIntFloatString(item["value"]))
                else:
                    converted_items.append(removeValueFromDict(item))
            newDict[key] = converted_items
        else:
            newDict[key] = value
    return newDict
def replace_dataset_keep_attrs(h5_dataset, new_data):
    """Replace the content of a dataset while keeping its attributes.

    The dataset is deleted from its parent group, created again under the
    same name from ``new_data``, and the attributes it carried before the
    deletion are written back.

    :param h5_dataset: the dataset to replace.
    :param new_data: the data assigned under the same name in the parent.
    :returns: the newly created dataset, looked up from the parent group.
    """
    attributes_backup = dict(h5_dataset.attrs)
    group = h5_dataset.parent
    dataset_name = os.path.basename(h5_dataset.name)

    # Delete, then assign: the old dataset is removed before the new one is
    # created under the same name.
    del group[dataset_name]
    group[dataset_name] = new_data

    group[dataset_name].attrs.update(attributes_backup)
    return group[dataset_name]
def merge_hdf5_wedges(wedges_master_h5_path, merged_hdf5_path):
    """Sum the frames of a wedges master file into a merged HDF5 file.

    ``wedges_master_h5_path`` is copied to ``merged_hdf5_path`` (an existing
    file at that location is removed first) and the copy is edited in place:

    * the frames are grouped using the stop indices returned by
      ``compute_sum_frames_indices`` and each group becomes one merged image,
      obtained by summing the corresponding dataset of ``/entry/data``;
    * pixels whose mask value is 1 are set to -1 and those whose mask value
      is greater than 1 are set to -2 in every merged image;
    * ``nimages``, ``omega``, ``omega_end``, ``omega_range_average`` and
      ``omega_range_total`` are rewritten for the merged images;
    * ``/entry/data`` is replaced by a single ``data_000001`` dataset;
    * the ``sample/goniometer`` omega datasets become soft links to their
      ``sample/transformations`` counterparts.

    :param wedges_master_h5_path: path of the master file to merge.
    :param merged_hdf5_path: ``pathlib.Path`` of the file to (re)create.
    :returns: ``merged_hdf5_path``.
    """
    if merged_hdf5_path.exists():
        merged_hdf5_path.unlink()
    shutil.copy(wedges_master_h5_path, merged_hdf5_path)

    url_data = "/entry/data/data_000001"
    url_nimages = "/entry/instrument/detector/detectorSpecific/nimages"
    url_mask = "/entry/instrument/detector/detectorSpecific/pixel_mask"
    url_transformations = "/entry/sample/transformations"
    url_goniometer = "/entry/sample/goniometer"
    url_omega = f"{url_transformations}/omega"
    url_omega_end = f"{url_transformations}/omega_end"
    url_omega_range_average = f"{url_transformations}/omega_range_average"
    # This used to point at ".../omega_range_average", so the total range was
    # never updated and the average was written twice.
    url_omega_range_total = f"{url_transformations}/omega_range_total"

    with h5py.File(merged_hdf5_path, mode="r+") as hf:
        omega_array = hf[url_omega][()]
        omega_range_average = hf[url_omega_range_average][()]
        _, indices = compute_sum_frames_indices(omega_array, omega_range_average)
        # Frame index boundaries of the merged images: [0, stops..., n_frames].
        boundaries = numpy.concatenate(([0], indices + 1, [len(omega_array)]))
        nimages = len(boundaries) - 1

        frame_shape = hf[url_data].shape[1:3]
        data_merged = numpy.zeros((nimages, *frame_shape), dtype="int32")
        merged_omega = numpy.zeros(nimages, dtype="float64")
        merged_omega_end = numpy.zeros(nimages, dtype="float64")
        merged_omega_range_total = 0
        mask_image = hf[url_mask][()]
        # NOTE(review): the i-th dataset (in sorted name order) is summed in
        # full for the i-th merged image - assumes one dataset per merged image.
        list_dataset = sorted(hf["/entry/data/"].keys())

        for i in range(nimages):
            idx_0 = boundaries[i]
            idx_1 = boundaries[i + 1]
            merged_omega_range_total += (idx_1 - idx_0) * omega_range_average
            merged_omega[i] = omega_array[idx_0]
            # NOTE(review): this is the *start* angle of the last summed frame,
            # read from omega rather than omega_end - confirm intent.
            merged_omega_end[i] = omega_array[idx_1 - 1]
            data_merged[i] = numpy.sum(
                hf[f"/entry/data/{list_dataset[i]}"], axis=0, dtype="int32"
            )
            data_merged[i, mask_image == 1] = -1
            data_merged[i, mask_image > 1] = -2

        merged_omega_range_average = merged_omega_range_total / nimages

        replace_dataset_keep_attrs(hf[url_nimages], numpy.uint64(nimages))
        replace_dataset_keep_attrs(hf[url_omega], merged_omega)
        replace_dataset_keep_attrs(hf[url_omega_end], merged_omega_end)
        replace_dataset_keep_attrs(
            hf[url_omega_range_average], numpy.float64(merged_omega_range_average)
        )
        # The total was never read before this fix, so tolerate files that
        # do not provide it instead of failing on the lookup.
        if url_omega_range_total in hf:
            replace_dataset_keep_attrs(
                hf[url_omega_range_total], numpy.float64(merged_omega_range_total)
            )
        else:
            hf[url_omega_range_total] = numpy.float64(merged_omega_range_total)

        if "/entry/data" in hf:
            del hf["/entry/data"]
        data_group = hf["entry"].create_group("data")
        data_group.attrs["NX_class"] = "NXdata"
        data_group.create_dataset("data_000001", data=data_merged)

        # Point every goniometer omega dataset at its transformations twin.
        for name in ("omega", "omega_end", "omega_range_average", "omega_range_total"):
            gonio_url = f"{url_goniometer}/{name}"
            if gonio_url in hf:
                del hf[gonio_url]
            hf[gonio_url] = h5py.SoftLink(f"{url_transformations}/{name}")

    return merged_hdf5_path