Source code for ooragan.file_loading

import os
import h5py
import re
import numpy as np
from glob import glob
from pathlib import Path
from typing import Any, Self, Optional, overload
from warnings import warn

from numpy.typing import NDArray

from .parameters import NullParameter, Parameter
from .util import convert_complex_to_magphase, convert_magphase_to_complex, str_to_time

# Names of the HDF5 groups/datasets that ``_walk_hdf`` always extracts from a
# file. Any other key is only kept when it is listed in the caller-supplied
# ``additional_params``.
KNOWN_PARAMETERS = [
    "VNA",
    "VNA Average",
    "VNA Power",
    "VNA Bandwidth",
    "Variable Attenuator",
    "VNA Frequency",
    "s21_real",
    "s21_imag",
    "s21_mag",
    "s21_phase",
    "Index",
    "Magnet",
]


def _broadcast_along_axis(
    array: NDArray, shape: tuple[int, ...], axis: int
) -> NDArray:
    """
    Broadcasts a 1D array along a given axis to match the target shape.

    Parameters
    ----------
    array : NDArray
        Array to broadcast. Must be 1-dimensional unless it already has the
        target shape.
    shape : tuple of int
        Shape of the array to produce.
    axis : int
        Axis of ``shape`` along which the values of ``array`` vary. Must be a
        non-negative index into ``shape``.

    Returns
    -------
    NDArray
        ``array`` itself when it already has the target shape, otherwise a
        read-only broadcast view of ``array`` with shape ``shape``.

    Raises
    ------
    ValueError
        If ``array`` is not 1-dimensional, if ``axis`` is out of bounds for
        ``shape`` or if ``shape[axis]`` differs from the length of ``array``.
    """
    # Nothing to do when the array already has the requested shape.
    if array.shape == shape:
        return array
    if array.ndim != 1:
        raise ValueError("Input array must be 1-dimensional")
    if not (0 <= axis < len(shape)):
        raise ValueError("Axis out of bounds for target shape")
    if shape[axis] != array.shape[0]:
        raise ValueError(
            f"Size mismatch: shape[{axis}] = {shape[axis]} "
            f"!= len(array) = {array.shape[0]}"
        )
    # Put the data on ``axis`` and give every other axis a length of 1 so
    # that NumPy can stretch those axes to the target shape.
    expanded_shape = [1] * len(shape)
    expanded_shape[axis] = shape[axis]
    return np.broadcast_to(array.reshape(expanded_shape), shape)


def _walk_hdf(
    file_or_group: Any,
    additional_params: list[str],
) -> dict[str, Any]:
    """
    Walks an HDF file hierarchy and converts it into a dictionary.

    Only the keys listed in ``KNOWN_PARAMETERS`` or in ``additional_params``
    are kept; every other key is ignored.

    Parameters
    ----------
    file_or_group : h5py.File or h5py.Group
        Node of the HDF hierarchy to walk.
    additional_params : list of str
        Extra key names to extract on top of ``KNOWN_PARAMETERS``.

    Returns
    -------
    dict
        For each kept key, either a ``{"values", "description", "unit"}``
        dictionary (dataset) or the nested dictionary produced by walking
        the corresponding group.

    Raises
    ------
    TypeError
        If a kept key refers to something that is neither a dataset nor a
        group.
    """
    out: dict[str, Any] = {}
    for key in file_or_group.keys():
        if key not in KNOWN_PARAMETERS and key not in additional_params:
            continue
        # Resolve the HDF5 object a single time: each ``group[key]`` lookup
        # makes h5py open the object again.
        item = file_or_group[key]
        if isinstance(item, h5py.Dataset):
            attrs = item.attrs
            # NOTE: the key order matters, ``File._populate_params`` compares
            # it against ["values", "description", "unit"].
            out[key] = {
                "values": np.asarray(item),
                "description": (
                    attrs["Description"] if "Description" in attrs else None
                ),
                "unit": attrs["Unit"] if "Unit" in attrs else None,
            }
        elif isinstance(item, h5py.Group):
            out[key] = _walk_hdf(item, additional_params)
        else:
            raise TypeError(
                f"Invalid type for key {key!r}: {type(item).__name__}"
            )
    return out


def _read_hdf(path: str, additional_params: list[str]) -> dict:
    """
    Reads an HDF file from its path.

    Parameters
    ----------
    path : str
        Path to the HDF file.
    additional_params : list of str
        Extra key names to extract on top of ``KNOWN_PARAMETERS``.

    Returns
    -------
    dict
        Dictionary with three entries:

        - ``"attributes"``: the file-level attributes, where ``"Started"``
          and ``"Ended"`` are converted with ``str_to_time``;
        - ``"datasets"``: the output of ``_walk_hdf`` for the file;
        - ``"dimensions"``: the first key of each dimension of the S21
          dataset (presumably the name of the attached dimension scale).
    """
    out: dict[str, Any] = {"attributes": {}, "datasets": {}, "dimensions": []}
    # The context manager closes the file even when reading fails part-way.
    with h5py.File(path, "r") as file:
        for atr, value in file.attrs.items():
            if atr in ("Ended", "Started"):
                out["attributes"][atr] = str_to_time(value)
            else:
                out["attributes"][atr] = value
        out["datasets"] = _walk_hdf(file, additional_params)

        vna_group = file["VNA"]
        assert isinstance(vna_group, h5py.Group)  # narrows the type
        # The S21 data is stored either as real/imag or as mag/phase; both
        # layouts share the same dimensions, so either dataset will do.
        data_name = "s21_real" if "s21_real" in vna_group else "s21_mag"
        data = vna_group[data_name]
        assert isinstance(data, h5py.Dataset)  # narrows the type
        out["dimensions"] = [dim.keys()[0] for dim in data.dims]
    return out


class File:
    """
    A single HDF file loaded in memory.

    .. note::

        ``File`` objects are built automatically when a :class:`Dataset` is
        created from a path.

    Parameters
    ----------
    path : str
        Location of the HDF file.
    cryostat_attenuation : float
        Total attenuation present in the cryostat. A positive value raises a
        ``ValueError``.
    additional_params : list of str, optional
        Names of extra parameters to extract from the file.

        .. note::

            When left to ``None``, only these parameters are extracted:

            - VNA
            - VNA Average
            - VNA Power
            - VNA Bandwidth
            - VNA Frequency
            - Variable Attenuator
            - s21_real
            - s21_imag
            - s21_mag
            - s21_phase
            - Index
            - Magnet
    """

    def __init__(
        self,
        path: str,
        cryostat_attenuation: float,
        additional_params: Optional[list[str]] = None,
    ) -> None:
        self.path = path
        self._additional_params = (
            [] if additional_params is None else additional_params
        )
        if cryostat_attenuation > 0:
            raise ValueError("Attenuation must be negative")
        self.cryostat_attenuation = cryostat_attenuation
        self._file_dict = _read_hdf(path, self._additional_params)
        self.infos = self._file_dict["attributes"]

        # Every supported parameter starts as a NullParameter placeholder;
        # ``_populate_params`` swaps in the ones found in the file. The
        # declaration order is also the order reported by ``list_params``.
        self.vna_average = NullParameter()
        self.vna_bandwidth = NullParameter()
        self.vna_frequency = NullParameter()
        self.vna_power = NullParameter()
        self.variable_attenuator = NullParameter()
        self.magnet = NullParameter()
        self.index = NullParameter()
        self.voltage_bias = NullParameter()
        self.s21_mag = NullParameter()
        self.s21_phase = NullParameter()
        self.s21_real = NullParameter()
        self.s21_imag = NullParameter()
        self._populate_params()

        # The shape of the data is taken from the real part of S21.
        self.shape = tuple(self.s21_real.range.shape)
        self._dimensions = self._file_dict["dimensions"]
        self._reshape_params()

    def _populate_params(self) -> None:
        """
        Swaps the NullParameter placeholders for real Parameters whenever the
        corresponding entry exists in the file.
        """
        for key, value in self._file_dict["datasets"].items():
            fields = list(value)
            if fields == [key]:
                # Group holding a single dataset named after the group.
                entry = value[key]
            elif fields == ["values", "description", "unit"]:
                # Dataset stored directly at this level.
                entry = value
            elif key == "VNA":
                self._populate_vna(value)
                continue
            else:
                raise NotImplementedError()
            self.with_param(
                key.lower().replace(" ", "_"),
                Parameter(
                    entry["values"], key, entry["description"], entry["unit"]
                ),
            )

    def _populate_vna(self, vna: dict) -> None:
        """
        Loads the frequency and the four S21 representations from the
        content of the ``VNA`` group.
        """
        frequency = vna["VNA Frequency"]
        self.with_param(
            "vna_frequency",
            Parameter(
                frequency["values"],
                "VNA Frequency",
                frequency["description"],
                frequency["unit"],
            ),
        )
        if "s21_real" in vna:
            # Real/imag stored in the file: magnitude and phase are derived.
            real_values = vna["s21_real"]["values"]
            self.with_param("s21_real", Parameter(real_values, "s21_real"))
            imag_values = vna["s21_imag"]["values"]
            self.with_param("s21_imag", Parameter(imag_values, "s21_imag"))
            mag, phase = convert_complex_to_magphase(real_values, imag_values)
            self.with_param("s21_mag", Parameter(mag, "s21_mag", unit="dB"))
            self.with_param(
                "s21_phase", Parameter(phase, "s21_phase", unit="deg")
            )
        elif "s21_mag" in vna:
            # Magnitude/phase stored in the file: real and imag are derived.
            mag_entry = vna["s21_mag"]
            self.with_param(
                "s21_mag",
                Parameter(
                    mag_entry["values"], "s21_mag", unit=mag_entry["unit"]
                ),
            )
            phase_entry = vna["s21_phase"]
            self.with_param(
                "s21_phase",
                Parameter(
                    phase_entry["values"],
                    "s21_phase",
                    unit=phase_entry["unit"],
                ),
            )
            real, imag = convert_magphase_to_complex(
                mag_entry["values"], phase_entry["values"]
            )
            self.with_param("s21_real", Parameter(real, "s21_real"))
            self.with_param("s21_imag", Parameter(imag, "s21_imag"))
        else:
            raise NotImplementedError()

    def _reshape_params(self) -> None:
        """Broadcasts the swept parameters to the shape of the data."""
        # Shape of the data without its last axis.
        target_shape = self.shape[:-1]
        for name in self.list_params():
            if name == "VNA Frequency" or name.startswith("s21_"):
                continue
            param = self.__dict__[name.lower().replace(" ", "_")]
            if param.range.shape != target_shape:
                param.range = _broadcast_along_axis(
                    param.range, target_shape, self._dimensions.index(name)
                )

    def with_param(self, attribute: str, parameter: Parameter) -> Self:
        """
        Attaches a :class:`Parameter` to this file.

        Parameters
        ----------
        attribute : str
            Name of the attribute. The Parameter is then reachable with the
            ``File.attribute`` syntax.
        parameter : :class:`Parameter`
            The parameter to attach.

        Returns
        -------
        File
            This file, so that calls can be chained.
        """
        self.__dict__[attribute] = parameter
        return self

    def __getattribute__(self, name: str) -> Any:
        value = super().__getattribute__(name)
        if name.startswith("__") or not isinstance(value, NullParameter):
            return value
        # NOTE(review): this goes to stdout although it is worded as a
        # warning; consider ``warnings.warn`` if that is not deliberate.
        print(
            f"UserWarning: The attribute {name} is not defined for this file! Will return a NullParameter."
        )
        return value

    def list_params(self) -> list[str]:
        """
        Lists the names of the parameters available in this file.

        Returns
        -------
        list of str
            The ``name`` of every attribute that is a :class:`Parameter` but
            not a ``NullParameter``.
        """
        return [
            value.name
            for value in self.__dict__.values()
            if isinstance(value, Parameter)
            and not isinstance(value, NullParameter)
        ]

    def __str__(self) -> str:
        return (
            f"parameters : {self.list_params()},"
            f" mean frequency = {np.mean(self.vna_frequency.range)}"
        )

    def __repr__(self) -> str:
        return f"ooragan.File({self.path}, {self.list_params()}, {self.infos})"
def _load_files_from_path(
    path: str,
    cryostat_attenuation: float,
    additional_params: list[str],
) -> list[File]:
    """
    Loads multiple files from a directory, walking through it.

    Parameters
    ----------
    path : str
        Directory to search, sub-directories included.
    cryostat_attenuation : float
        Attenuation passed to every created :class:`File`.
    additional_params : list of str
        Extra parameter names passed to every created :class:`File`.

    Returns
    -------
    list of File
        One :class:`File` per ``*.hdf5`` file found. Files are sorted by path
        within a directory and sub-directories are visited in sorted order.

    Raises
    ------
    RuntimeError
        If no ``*.hdf5`` file is found under ``path``.
    """
    # Imported locally because only ``glob.glob`` is imported at module level.
    from glob import escape

    files = []
    for root, dirnames, _ in os.walk(path):
        # Sorting in place makes ``os.walk`` visit the sub-directories in a
        # deterministic order instead of the filesystem listing order.
        dirnames.sort()
        # Escape the directory part so that characters such as ``[`` or ``*``
        # in a folder name are not interpreted as glob patterns.
        pattern = os.path.join(escape(root), "*.hdf5")
        files.extend(
            File(file_path, cryostat_attenuation, additional_params)
            for file_path in sorted(glob(pattern))
        )
    if not files:
        raise RuntimeError("No HDF5 files were found in this directory")
    return files
class Dataset:
    """
    Data container. Contains data and information on measurements saved in
    HDF5 files.

    .. attention::

        Supports only data from HDF5 files as of now. For txt files, use the
        old Dataset class located in ooragan.old.Dataset.

    Parameters
    ----------
    path : str, optional
        Path of the folder for multiple data files or for a single data file.
    cryostat_attenuation : float, optional
        Total attenuation present in the cryostat. Required when ``path`` is
        given; a positive value raises a ``ValueError``.
    files : list of Files, optional
        List of :class:`File` to build a dataset from. Only used when no
        ``path`` is given.
    additional_params : list of str, optional
        List of additional parameter names to extract from the files.

        .. note::

            If left to ``None`` only those parameters will be extracted:

            - VNA
            - VNA Average
            - VNA Power
            - VNA Bandwidth
            - VNA Frequency
            - Variable Attenuator
            - s21_real
            - s21_imag
            - s21_mag
            - s21_phase
            - Index
            - Magnet

    Attributes
    ----------
    files : dict
        Dictionary of the contained files where the keys are the index of the
        file (as a string) starting with 0.
    cryostat_attenuation : float
        Attenuation shared by all the contained files.

    Raises
    ------
    ValueError
        If ``path`` is given without a valid ``cryostat_attenuation``, if
        neither ``path`` nor a non-empty ``files`` is given, or if the given
        files do not all share the same attenuation.
    """

    def __init__(
        self,
        path: Optional[str] = None,
        cryostat_attenuation: Optional[float] = None,
        files: Optional[list[File]] = None,
        additional_params: Optional[list[str]] = None,
    ) -> None:
        self.files: dict[str, File] = {}
        if path is not None:
            if cryostat_attenuation is None:
                raise ValueError("Cryostat attenuation must be specified with a path")
            if cryostat_attenuation > 0:
                raise ValueError("Attenuation must be negative")
            self.cryostat_attenuation = cryostat_attenuation
            target = Path(path)
            # An existing directory is always walked, even when its name
            # contains a dot; a suffix-less path keeps being treated as a
            # directory as before.
            if target.is_dir() or target.suffix == "":
                files_list = _load_files_from_path(
                    path,
                    cryostat_attenuation,
                    additional_params if additional_params is not None else [],
                )
            else:
                files_list = [File(path, cryostat_attenuation, additional_params)]
        else:
            # An empty list is rejected too: there would be no attenuation to
            # take from the files.
            if not files:
                raise ValueError(
                    "A list of files must be specified if no path is given"
                )
            att = files[0].cryostat_attenuation
            if any(file.cryostat_attenuation != att for file in files):
                raise ValueError(
                    "All attenuation must be equal to create a Dataset from files"
                )
            self.cryostat_attenuation = att
            files_list = files
        self.files.update((str(i), file) for i, file in enumerate(files_list))

    def __getattribute__(self, name: str) -> Any:
        # Legacy ``dataset.f<N>`` accessor, kept for backward compatibility.
        if not name.startswith("__") and re.fullmatch(r"f\d+", name):
            warn("Use the indexing syntax instead", DeprecationWarning, stacklevel=2)
            file_index = name.removeprefix("f")
            try:
                return self.files[file_index]
            except KeyError:
                raise IndexError(f"No file with index {file_index}") from None
        return super().__getattribute__(name)

    def __str__(self) -> str:
        lines = ["Files :"]
        lines.extend(f"{idx}. {file}" for idx, file in self.files.items())
        return "\n".join(lines)

    def __repr__(self) -> str:
        return f"ooragan.Dataset({self.files}, {self.cryostat_attenuation})"

    @overload
    def __getitem__(self, index: int) -> File: ...

    @overload
    def __getitem__(self, index: slice) -> list[File]: ...

    def __getitem__(self, index: int | slice) -> File | list[File]:
        """
        Returns the File(s) with the given index (or indices).

        Parameters
        ----------
        index : int or slice
            The index or indices (as a slice) of Files to get from this
            Dataset. Negative values count from the end. If the start or stop
            of the slice are left empty, will return all Files with indices
            inside the given bounds. This means for returning all Files, use
            slice ``[:]``.

        Returns
        -------
        File or list of File
            If a single index is specified, a single File is returned. A list
            otherwise, in the order given by the slice.

        Raises
        ------
        IndexError
            If the index, or an explicit bound of the slice, is out of range.
        TypeError
            If ``index`` is neither an int nor a slice.
        """
        files = self.files
        total_files = len(files)
        if isinstance(index, int):
            if not -total_files <= index < total_files:
                raise IndexError(
                    "index {} out of bounds for number of files {}".format(
                        index, total_files
                    )
                )
            # Files are keyed by their non-negative position, so a negative
            # index has to be converted before the lookup.
            if index < 0:
                index += total_files
            return files[str(index)]
        if isinstance(index, slice):
            start_ok = (
                not index.start
                or -total_files <= index.start <= total_files - 1
            )
            # The stop of a slice is exclusive, so ``total_files`` itself is a
            # valid bound.
            stop_ok = (
                not index.stop or -total_files <= index.stop <= total_files
            )
            if not (start_ok and stop_ok):
                raise IndexError(
                    "index [{}, {}] out of bounds for number of files {}".format(
                        index.start, index.stop, total_files
                    )
                )
            positions = range(*index.indices(total_files))
            if index.start is None or index.stop is None:
                # Open-ended slice: skip the positions that hold no file while
                # keeping the order (and direction) requested by the slice.
                return [files[str(i)] for i in positions if str(i) in files]
            return [files[str(i)] for i in positions]
        raise TypeError("indices must be int or slice, not {}".format(type(index)))