# Source code for ewoksid24.io.xasmap

import logging
from dataclasses import dataclass
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple

import h5py
import numpy
from silx.io import h5py_utils

_logger = logging.getLogger(__name__)


@dataclass
class XasMapData:
    """Collection of XAS spectra with the map coordinates of each spectrum.

    The first axis of every array indexes the map points (``npts``); the
    second axis of ``mu`` and ``energy`` indexes the energy points
    (``nenergy``).
    """

    mu: numpy.ndarray  # shape (npts, nenergy)
    energy: numpy.ndarray  # shape (npts, nenergy)
    x0: numpy.ndarray  # shape (npts,)
    x1: numpy.ndarray  # shape (npts,)
    x0_name: str
    x1_name: str
    energy_units: Optional[str] = None
    x0_units: Optional[str] = None
    x1_units: Optional[str] = None

    def is_empty(self) -> bool:
        """Return ``True`` as soon as one of the four data arrays has no elements."""
        arrays = (self.mu, self.energy, self.x0, self.x1)
        return any(array.size == 0 for array in arrays)
@dataclass
class _AccumulateData:
    """Growing per-point lists of XAS data, merged scan by scan and file by file."""

    mu: List[numpy.ndarray]  # npts x shape (nenergy,)
    energy: List[numpy.ndarray]  # npts x shape (nenergy,)
    x0: List[numpy.ndarray]  # shape (npts,)
    x1: List[numpy.ndarray]  # shape (npts,)
    energy_units: Optional[str] = None
    x0_units: Optional[str] = None
    x1_units: Optional[str] = None

    def extend(self, other: "_AccumulateData") -> None:
        """Append the points of ``other`` in place.

        Units already set on this instance are kept; units that are still
        unset (or otherwise falsy) are taken from ``other``.
        """
        for list_name in ("mu", "energy", "x0", "x1"):
            getattr(self, list_name).extend(getattr(other, list_name))
        for units_name in ("energy_units", "x0_units", "x1_units"):
            if not getattr(self, units_name):
                setattr(self, units_name, getattr(other, units_name))
def read_xasmap(
    filenames: Sequence[str],
    dim0_counter: str,
    dim1_counter: str,
    energy_counter: str,
    mu_counter: str,
    scan_ranges: Optional[Sequence[Optional[Tuple[int, int]]]] = None,
    exclude_scans: Optional[Sequence[Optional[Sequence[int]]]] = None,
    subscan: int = 1,
) -> XasMapData:
    """Read the XAS spectra and map coordinates from one or more HDF5 files.

    :param filenames: files to read, in the order their points are appended.
    :param dim0_counter: name of the dataset holding the first map coordinate.
    :param dim1_counter: name of the dataset holding the second map coordinate.
    :param energy_counter: name of the energy counter.
    :param mu_counter: name of the absorption counter.
    :param scan_ranges: one optional ``(first, last)`` scan number range
        (both inclusive) per file. ``None`` or empty selects all scans of
        every file.
    :param exclude_scans: one optional list of scan numbers to skip per file.
        ``None`` or empty excludes nothing.
    :param subscan: subscan number to read from every selected scan.
    :returns: the accumulated data of all files converted to numpy arrays.
    :raises ValueError: when ``scan_ranges`` or ``exclude_scans`` is provided
        with a length different from ``filenames``.
    """
    filenames = list(filenames)
    nfiles = len(filenames)

    if not scan_ranges:
        scan_ranges = [None] * nfiles
    if not exclude_scans:
        exclude_scans = [None] * nfiles

    # zip() stops at the shortest input: a length mismatch would silently
    # drop files instead of reporting the inconsistent arguments.
    if len(scan_ranges) != nfiles:
        raise ValueError(
            f"'scan_ranges' needs one item per file "
            f"({len(scan_ranges)} items for {nfiles} files)"
        )
    if len(exclude_scans) != nfiles:
        raise ValueError(
            f"'exclude_scans' needs one item per file "
            f"({len(exclude_scans)} items for {nfiles} files)"
        )

    xasmapdata = _AccumulateData(mu=[], energy=[], x0=[], x1=[])

    # The per-file loop variables have their own names so that the
    # 'scan_ranges' and 'exclude_scans' arguments are not shadowed.
    for filename, file_scan_range, file_exclude_scans in zip(
        filenames, scan_ranges, exclude_scans
    ):
        file_xasmapdata = _read_xasmap_file(
            filename,
            dim0_counter,
            dim1_counter,
            energy_counter,
            mu_counter,
            scan_range=file_scan_range,
            exclude_scans=file_exclude_scans,
            subscan=subscan,
        )
        xasmapdata.extend(file_xasmapdata)

    # NOTE(review): numpy.array() stacks the per-point spectra, which
    # presumably requires all spectra to have the same length - confirm.
    return XasMapData(
        mu=numpy.array(xasmapdata.mu),
        energy=numpy.array(xasmapdata.energy),
        x0=numpy.array(xasmapdata.x0),
        x1=numpy.array(xasmapdata.x1),
        x0_name=dim0_counter,
        x1_name=dim1_counter,
        energy_units=xasmapdata.energy_units,
        x0_units=xasmapdata.x0_units,
        x1_units=xasmapdata.x1_units,
    )
def _read_xasmap_file(
    filename: str,
    dim0_counter: str,
    dim1_counter: str,
    energy_counter: str,
    mu_counter: str,
    scan_range: Optional[Tuple[int, int]] = None,
    exclude_scans: Optional[Sequence[int]] = None,
    subscan: int = 1,
) -> _AccumulateData:
    """Read the XAS data of the selected scans of one HDF5 file.

    :param filename: HDF5 file whose top-level entries are named
        ``"<scan>.<subscan>"``.
    :param dim0_counter: name of the dataset holding the first map coordinate.
    :param dim1_counter: name of the dataset holding the second map coordinate.
    :param energy_counter: name of the energy counter.
    :param mu_counter: name of the absorption counter.
    :param scan_range: optional ``(first, last)`` scan numbers, both inclusive.
    :param exclude_scans: optional scan numbers to skip.
    :param subscan: subscan number to read from every selected scan.
    :returns: the accumulated data of all scans that could be read. A scan
        that raises an exception is logged and skipped.
    """
    xasmapdata = _AccumulateData(mu=[], energy=[], x0=[], x1=[])

    with h5py_utils.File(filename) as nxroot:
        # Several subscans ("1.1", "1.2", ...) share the same scan number.
        # De-duplicate, otherwise the requested subscan of such a scan would
        # be read once per subscan and its spectra would be duplicated.
        scans = sorted({int(name.split(".")[0]) for name in nxroot})

        if scan_range:
            first, last = scan_range[0], scan_range[1]
            scans = [nr for nr in scans if first <= nr <= last]
        if exclude_scans:
            excluded = set(exclude_scans)
            scans = [nr for nr in scans if nr not in excluded]

        for scan in scans:
            uri = f"{filename}::/{scan}.{subscan}"
            try:
                scan_xasmapdata = _read_xas_scan(
                    nxroot,
                    scan,
                    subscan,
                    uri,
                    dim0_counter,
                    dim1_counter,
                    energy_counter,
                    mu_counter,
                )
                xasmapdata.extend(scan_xasmapdata)
            except Exception:
                # Best effort: one bad scan must not prevent reading the others.
                _logger.exception("Error in scan %r", uri)

    return xasmapdata


def _read_xas_scan(
    nxroot: h5py.Group,
    scan: int,
    subscan: int,
    uri: str,
    dim0_counter: str,
    dim1_counter: str,
    energy_counter: str,
    mu_counter: str,
) -> _AccumulateData:
    """Read the XAS spectra and map coordinates of one scan entry.

    :param nxroot: opened HDF5 root group.
    :param scan: scan number.
    :param subscan: subscan number.
    :param uri: identifier of the scan, only used in log and error messages.
    :param dim0_counter: name of the dataset holding the first map coordinate.
    :param dim1_counter: name of the dataset holding the second map coordinate.
    :param energy_counter: name of the energy counter.
    :param mu_counter: name of the absorption counter.
    :returns: the data of the scan. It is empty (with a logged warning) when
        the entry, the ``measurement`` group or one of the datasets is
        missing, or when a single-spectrum scan did not end with "SUCCESS".
    :raises ValueError: when a dataset does not have the expected dimensions.
    """
    xasmapdata = _AccumulateData(mu=[], energy=[], x0=[], x1=[])

    # Scan group (NXentry)
    entry_name = f"{scan}.{subscan}"
    if entry_name not in nxroot:
        _logger.warning("BLISS scan does not exist (%r)", uri)
        return xasmapdata
    nxentry = nxroot[entry_name]
    end_reason = _get_scan_end_reason(nxentry)

    # Measurement group (counters)
    if "measurement" not in nxentry:
        if end_reason:
            _logger.warning("Skip BLISS scan %r (%s)", uri, end_reason)
        else:
            _logger.warning("BLISS scan has no 'measurement' group (%r)", uri)
        return xasmapdata
    measurement = nxentry["measurement"]

    # mu vs. energy
    for counter in (energy_counter, mu_counter):
        if counter not in measurement:
            _logger.warning("BLISS scan has no '%s' counter (%r)", counter, uri)
            return xasmapdata
    energy_dset = measurement[energy_counter]
    mu_dset = measurement[mu_counter]

    # Map axes: the measurement group takes precedence over the positioners.
    x0_dset = None
    x1_dset = None
    if dim0_counter in measurement and dim1_counter in measurement:
        # Bug in ID24-ED dmap: 0D in positioners instead of 1D
        x0_dset = measurement[dim0_counter]
        x1_dset = measurement[dim1_counter]
    elif "instrument/positioners" in nxentry:
        positioners = nxentry["instrument/positioners"]
        if dim0_counter in positioners and dim1_counter in positioners:
            x0_dset = positioners[dim0_counter]
            x1_dset = positioners[dim1_counter]
    if x0_dset is None:
        _logger.warning(
            "BLISS scan has no '%s' or '%s' counter (%r)",
            dim0_counter,
            dim1_counter,
            uri,
        )
        return xasmapdata

    # Shapes (npts, nenergy) or (npts,) or (nenergy,)
    x0_data = _extract_data(x0_dset)
    x1_data = _extract_data(x1_dset)
    energy_data = _extract_data(energy_dset)
    mu_data = _extract_data(mu_dset)

    # Shape (npts,)
    if x0_data.ndim != 1:
        raise ValueError(f"{dim0_counter!r} must be 1D ({uri})")
    if x1_data.ndim != 1:
        raise ValueError(f"{dim1_counter!r} must be 1D ({uri})")

    if mu_data.ndim == 1:
        # One scan == one XAS spectrum
        if end_reason and end_reason != "SUCCESS":
            _logger.warning("Skip single XAS scan %r (%s)", uri, end_reason)
            return xasmapdata
        if energy_data.ndim != 1:
            raise ValueError(f"{energy_counter!r} must be 1D ({uri})")
        if x0_data.size != 1:
            raise ValueError(f"{dim0_counter!r} must be 0D ({uri})")
        if x1_data.size != 1:
            raise ValueError(f"{dim1_counter!r} must be 0D ({uri})")
        xasmapdata = _parse_single_xas_data(
            energy_data,
            mu_data,
            x0_data,
            x1_data,
        )
    else:
        # One scan == many XAS spectra
        xasmapdata = _parse_multi_xas_data(
            energy_data,
            mu_data,
            x0_data,
            x1_data,
        )

    xasmapdata.energy_units = energy_dset.attrs.get("units")
    xasmapdata.x0_units = x0_dset.attrs.get("units")
    xasmapdata.x1_units = x1_dset.attrs.get("units")
    return xasmapdata


def _get_scan_end_reason(nxentry: h5py.Group) -> Optional[str]:
    """Return the value of the ``end_reason`` dataset of a scan entry.

    :returns: the value (bytes are decoded to ``str``) or ``None`` when the
        entry has no ``end_reason`` dataset.
    """
    if "end_reason" not in nxentry:
        return None
    end_reason = nxentry["end_reason"][()]
    if isinstance(end_reason, bytes):
        end_reason = end_reason.decode()
    return end_reason


def _parse_single_xas_data(
    energy_data: numpy.ndarray,
    mu_data: numpy.ndarray,
    x0_data: numpy.ndarray,
    x1_data: numpy.ndarray,
) -> _AccumulateData:
    """Build the accumulated data of a scan that contains one XAS spectrum.

    ``energy_data`` and ``mu_data`` are truncated to their common length.
    Only the first element of ``x0_data`` and ``x1_data`` is used.
    """
    nenergy = min(len(energy_data), len(mu_data))
    x0_data = x0_data[0]
    x1_data = x1_data[0]
    return _AccumulateData(
        mu=[mu_data[:nenergy]],
        energy=[energy_data[:nenergy]],
        x0=[x0_data],
        x1=[x1_data],
    )


def _parse_multi_xas_data(
    energy_data: numpy.ndarray,
    mu_data: numpy.ndarray,
    x0_data: numpy.ndarray,
    x1_data: numpy.ndarray,
) -> _AccumulateData:
    """Build the accumulated data of a scan that contains many XAS spectra.

    ``mu_data`` has shape ``(npts, nenergy)``. A 1D ``energy_data`` is
    repeated for every point. All arrays are truncated to the common number
    of points and ``mu``/``energy`` to the common number of energies.
    """
    npts, nenergy = mu_data.shape
    if energy_data.ndim == 1:
        # Shape (nenergy,) -> (npts, nenergy)
        energy_data = numpy.tile(energy_data, (npts, 1))
    nenergy = min(nenergy, energy_data.shape[1])
    npts = min(len(mu_data), len(energy_data), len(x0_data), len(x1_data))
    return _AccumulateData(
        mu=mu_data[:npts, :nenergy].tolist(),
        energy=energy_data[:npts, :nenergy].tolist(),
        x0=x0_data[:npts].tolist(),
        x1=x1_data[:npts].tolist(),
    )


def _extract_data(dataset: h5py.Dataset) -> numpy.ndarray:
    """Read a dataset as a 1D or 2D array.

    A 0D dataset is wrapped in an array of length 1 and a 2D dataset with a
    single row is reduced to that row.

    :raises ValueError: when the dataset has more than two dimensions.
    """
    if dataset.ndim == 0:
        return numpy.array([dataset[()]])
    if dataset.ndim == 1:
        return dataset[()]
    if dataset.ndim == 2:
        if dataset.shape[0] == 1:
            return dataset[0]
        return dataset[()]
    raise ValueError(
        f"Dataset has {dataset.ndim} dimensions which is not supported ({dataset.file.filename}::{dataset.name})"
    )