import os
import shutil
from pathlib import Path
import h5py
import numpy
from ewoksmx.utils.cbf_utils import compute_sum_frames_indices
[docs]
def aggregate_hdf5_masters_with_vds(list_input_master_path, output_master_path):
# Check if the input files are in the same directory as the output file
input_dir = Path(list_input_master_path[0]).parent
output_dir = Path(output_master_path).parent
if input_dir != output_dir:
# Not the same directory - we must copy the input files to the output directory
# in order to create correct relative links for the hdf5 files
new_list_input_master_path = []
for input_master_path in list_input_master_path:
new_master_path = output_dir / input_master_path.name
shutil.copy(input_master_path, new_master_path)
data_filename = input_master_path.name.replace(
"_master.h5", "_data_000001.h5"
)
new_data_path = output_dir / data_filename
shutil.copy(input_dir / data_filename, new_data_path)
new_list_input_master_path.append(new_master_path)
list_input_master_path = new_list_input_master_path
URL_DATA = "/entry/data/data_000001"
with h5py.File(output_master_path, mode="w-") as fh_out:
first_master = True
data_dict = dict()
gather_datas = dict()
gathered_attributes = dict()
for i, input_filename in enumerate(list_input_master_path):
with h5py.File(input_filename, mode="r", locking=False) as fh_in:
if first_master:
copy_attributes(fh_in, fh_out)
for entry_name in fh_in:
entry_in = fh_in[entry_name]
entry_out = fh_out.require_group(entry_name)
if first_master:
copy_attributes(entry_in, entry_out)
gather_datas[entry_name] = {
"instrument/detector/detectorSpecific/nimages": [],
"sample/goniometer/omega": [],
"sample/goniometer/omega_end": [],
"sample/goniometer/omega_range_average": [],
"sample/goniometer/omega_range_total": [],
"sample/transformations/omega": [],
"sample/transformations/omega_end": [],
"sample/transformations/omega_range_average": [],
"sample/transformations/omega_range_total": [],
}
gather_data = gather_datas[entry_name]
create_metadata_links(
entry_in,
entry_out,
input_filename,
output_master_path,
gather_data,
gathered_attributes=gathered_attributes,
)
if "data" in entry_in:
data_dict[str(i)] = Path(input_filename).name
first_master = False
for entry_name in gather_datas:
gather_data = gather_datas[entry_name]
entry_out = fh_out.require_group(entry_name)
data_out = entry_out.require_group("data")
data_out.attrs["NX_class"] = "NXdata"
for group in ["sample/goniometer", "sample/transformations"]:
omega = numpy.asarray(gather_data[f"{group}/omega"])
omega_end = numpy.asarray(gather_data[f"{group}/omega_end"])
omega_range_average = numpy.asarray(
gather_data[f"{group}/omega_range_average"]
)
def write_ds(dataset_path, val):
ds = entry_out.create_dataset(dataset_path, data=val)
if dataset_path in gathered_attributes:
ds.attrs.update(gathered_attributes[dataset_path])
write_ds(f"{group}/omega", omega)
write_ds(f"{group}/omega_end", omega_end)
write_ds(f"{group}/omega_range_total", omega_end[-1] - omega[0])
write_ds(f"{group}/omega_range_average", omega_range_average[0])
nimages = 0
index = 0
for entry in data_dict.keys():
index_dataset = int(int(omega[index] - omega[0]) + 1)
name_dataset = f"data_{index_dataset:06}"
data_out[name_dataset] = h5py.ExternalLink(data_dict[entry], URL_DATA)
frames_nb = data_out[name_dataset].shape[0]
nimages += frames_nb
data_out[name_dataset].attrs["image_nr_low"] = (
int(round((omega[index] - omega[0]) / omega_range_average[0])) + 1
)
data_out[name_dataset].attrs["image_nr_high"] = (
int(
round(
(omega[index + frames_nb - 1] - omega[0])
/ omega_range_average[0]
)
)
+ 1
)
index = nimages
nimages_name = "instrument/detector/detectorSpecific/nimages"
ds_nimages = entry_out.create_dataset(nimages_name, data=nimages)
if nimages_name in gathered_attributes:
ds_nimages.attrs.update(gathered_attributes[nimages_name])
[docs]
def create_external_link(source_file, source_path, output_filename):
relative_path = os.path.relpath(source_file, os.path.dirname(output_filename))
return h5py.ExternalLink(relative_path, source_path)
[docs]
def copy_attributes(src, dst):
dst.attrs.update(src.attrs)
[docs]
def convertToIntFloatString(stringValue):
newValue = None
if stringValue is not None:
if stringValue.isdigit():
newValue = int(stringValue)
else:
try:
newValue = float(stringValue)
except ValueError:
newValue = stringValue
return newValue
[docs]
def removeValueFromDict(inDict):
newDict = {}
for key, value in inDict.items():
if isinstance(value, dict):
if list(value.keys())[0] == "value":
newDict[key] = convertToIntFloatString(value["value"])
else:
newDict[key] = removeValueFromDict(value)
elif isinstance(value, list):
newDict[key] = []
for item in value:
if isinstance(item, dict):
if list(item.keys())[0] == "value":
if isinstance(value, list):
for entry in value:
newDict[key].append(
convertToIntFloatString(entry["value"])
)
else:
newDict[key].append(convertToIntFloatString(value["value"]))
else:
newDict[key].append(removeValueFromDict(item))
else:
newDict[key] = value
return newDict
[docs]
def replace_dataset_keep_attrs(h5_dataset, new_data):
parent_group = h5_dataset.parent
name = os.path.basename(h5_dataset.name)
saved_attrs = dict(h5_dataset.attrs)
del parent_group[name]
parent_group[name] = new_data
parent_group[name].attrs.update(saved_attrs)
return parent_group[name]
[docs]
def merge_hdf5_wedges(wedges_master_h5_path, merged_hdf5_path):
if merged_hdf5_path.exists():
merged_hdf5_path.unlink()
shutil.copy(wedges_master_h5_path, merged_hdf5_path)
url_data = "/entry/data/data_000001"
url_nimages = "/entry/instrument/detector/detectorSpecific/nimages"
url_omega = "/entry/sample/transformations/omega"
url_omega_end = "/entry/sample/transformations/omega_end"
url_omega_range_average = "/entry/sample/transformations/omega_range_average"
url_omega_range_total = "/entry/sample/transformations/omega_range_average"
url_gonio_omega = "/entry/sample/goniometer/omega"
url_gonio_omega_end = "/entry/sample/goniometer/omega_end"
url_gonio_omega_range_average = "/entry/sample/goniometer/omega_range_average"
url_gonio_omega_range_total = "/entry/sample/goniometer/omega_range_average"
url_mask = "/entry/instrument/detector/detectorSpecific/pixel_mask"
with h5py.File(merged_hdf5_path, mode="r+") as hf:
omega_array = hf[url_omega][()]
omega_range_average = hf[url_omega_range_average][()]
angle_steps, indices = compute_sum_frames_indices(
omega_array, omega_range_average
)
internal_stop_indices = indices + 1
boundaries = numpy.concatenate(([0], internal_stop_indices, [len(omega_array)]))
nimages = len(boundaries) - 1
data_merged = numpy.zeros(
(nimages, hf[url_data].shape[1], hf[url_data].shape[2]), dtype="int32"
)
merged_omega = numpy.zeros(nimages, dtype="float64")
merged_omega_end = numpy.zeros(nimages, dtype="float64")
merged_omega_range_average = 0
mask_image = hf[url_mask][()]
list_dataset = list(hf["/entry/data/"].keys())
list_dataset.sort()
for i in range(nimages):
idx_0 = boundaries[i]
idx_1 = boundaries[i + 1]
angle_increment = (idx_1 - idx_0) * omega_range_average
merged_omega_range_average += angle_increment
merged_omega[i] = omega_array[idx_0]
merged_omega_end[i] = omega_array[idx_1 - 1]
data_merged[i] = numpy.sum(
hf[f"/entry/data/{list_dataset[i]}"], axis=0, dtype="int32"
)
data_merged[i, mask_image == 1] = -1
data_merged[i, mask_image > 1] = -2
merged_omega_range_average /= nimages
replace_dataset_keep_attrs(hf[url_nimages], numpy.uint64(nimages))
replace_dataset_keep_attrs(hf[url_omega], merged_omega)
replace_dataset_keep_attrs(hf[url_omega_end], merged_omega_end)
replace_dataset_keep_attrs(
hf[url_omega_range_average], numpy.float64(merged_omega_range_average)
)
replace_dataset_keep_attrs(
hf[url_omega_range_total], numpy.float64(merged_omega_range_average)
)
if "/entry/data" in hf:
del hf["/entry/data"]
data_group = hf["entry"].create_group("data")
data_group.attrs["NX_class"] = "NXdata"
data_group.create_dataset("data_000001", data=data_merged)
if url_gonio_omega in hf:
del hf[url_gonio_omega]
hf[url_gonio_omega] = h5py.SoftLink(url_omega)
if url_gonio_omega_end in hf:
del hf[url_gonio_omega_end]
hf[url_gonio_omega_end] = h5py.SoftLink(url_omega_end)
if url_gonio_omega_range_average in hf:
del hf[url_gonio_omega_range_average]
hf[url_gonio_omega_range_average] = h5py.SoftLink(url_omega_range_average)
if url_gonio_omega_range_total in hf:
del hf[url_gonio_omega_range_total]
hf[url_gonio_omega_range_total] = h5py.SoftLink(url_omega_range_total)
return merged_hdf5_path