import copy
import importlib
import logging
from pathlib import Path
import numpy as np
from ctapipe.containers import Container, EventType, Field, Map, partial
from ctapipe.core.container import FieldValidationError
from ctapipe.io import HDF5TableReader
from tables.exceptions import NoSuchNodeError
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
log.handlers = logging.getLogger("__main__").handlers
__all__ = [
"ArrayDataContainer",
"TriggerMapContainer",
"get_array_keys",
"merge_map_ArrayDataContainer",
]
[docs]
def get_array_keys(container: Container):
"""Return a list of keys corresponding to fields which are array type in the given
container.
Parameters:
container (Container): The container object to search for array fields.
Returns:
list: A list of keys corresponding to array fields in the container.
Example:
>>> container = Container()
>>> container.field1 = np.array([1, 2, 3])
>>> container.field2 = 5
>>> container.field3 = np.array([4, 5, 6])
>>> get_array_keys(container)
['field1', 'field3']
"""
keys = []
for key, field in container.fields.items():
if field.type == np.ndarray:
keys.append(key)
return keys
[docs]
class NectarCAMContainer(Container):
"""Base class for the NectarCAM containers.
This container cannot be recursive, to be directly written with a HDF5TableWriter.
"""
@staticmethod
def _container_from_hdf5(
path, container_class, index_component=0, group_name="data"
):
"""Static method to read a container from an HDF5 file.
Parameters:
path (str or Path): The path to the HDF5 file.
container_class (Container): The class of the container to be filled with data.
Yields:
Container: The container from the data in the HDF5 file.
Example:
>>> container = NectarCAMContainer._container_from_hdf5('path_to_file.h5',
MyContainerClass)
"""
if isinstance(path, str):
path = Path(path)
container = container_class()
with HDF5TableReader(path) as reader:
tableReader = reader.read(
table_name=(
f"/{group_name}/{container_class.__name__}_{index_component}"
),
containers=container_class,
)
container = next(tableReader)
yield container
[docs]
@classmethod
def from_hdf5(cls, path, index_component=0, group_name="data"):
"""Reads a container from an HDF5 file.
Parameters:
path (str or Path): The path to the HDF5 file.
This method will call the _container_from_hdf5 method with the container
argument associated to its own class (ArrayDataContainer)
Yields:
Container: The container generator linked to the HDF5 file.
Example:
>>> container = NectarCAMContainer.from_hdf5('path_to_file.h5')
"""
return cls._container_from_hdf5(
path,
container_class=cls,
index_component=index_component,
group_name=group_name,
)
[docs]
class ArrayDataContainer(NectarCAMContainer):
"""A container that holds information about waveforms from a specific run.
Attributes:
run_number (int): The run number associated with the waveforms.
nevents (int): The number of events.
npixels (int): The number of pixels.
camera (str): The name of the camera.
pixels_id (np.ndarray): An array of pixel IDs.
broken_pixels_hg (np.ndarray): An array of high gain broken pixels.
broken_pixels_lg (np.ndarray): An array of low gain broken pixels.
ucts_timestamp (np.ndarray): An array of events' UCTS timestamps.
ucts_busy_counter (np.ndarray): An array of UCTS busy counters.
ucts_event_counter (np.ndarray): An array of UCTS event counters.
event_type (np.ndarray): An array of trigger event types.
event_id (np.ndarray): An array of event IDs.
trig_pattern_all (np.ndarray): An array of trigger patterns.
trig_pattern (np.ndarray): An array of reduced trigger patterns.
multiplicity (np.ndarray): An array of events' multiplicities.
"""
run_number = Field(
type=np.uint16,
description="run number associated to the waveforms",
)
nevents = Field(
type=np.uint64,
description="number of events",
)
npixels = Field(
type=np.uint16,
description="number of effective pixels",
)
pixels_id = Field(type=np.ndarray, dtype=np.uint16, ndim=1, description="pixel ids")
broken_pixels_hg = Field(
type=np.ndarray, dtype=bool, ndim=2, description="high gain broken pixels"
)
broken_pixels_lg = Field(
type=np.ndarray, dtype=bool, ndim=2, description="low gain broken pixels"
)
camera = Field(
type=str,
description="camera name",
)
ucts_timestamp = Field(
type=np.ndarray, dtype=np.uint64, ndim=1, description="events ucts timestamp"
)
ucts_busy_counter = Field(
type=np.ndarray, dtype=np.uint32, ndim=1, description="ucts busy counter"
)
ucts_event_counter = Field(
type=np.ndarray, dtype=np.uint32, ndim=1, description="ucts event counter"
)
event_type = Field(
type=np.ndarray, dtype=np.uint8, ndim=1, description="trigger event type"
)
event_id = Field(type=np.ndarray, dtype=np.uint32, ndim=1, description="event ids")
trig_pattern_all = Field(
type=np.ndarray, dtype=bool, ndim=3, description="trigger pattern"
)
trig_pattern = Field(
type=np.ndarray, dtype=bool, ndim=2, description="reduced trigger pattern"
)
multiplicity = Field(
type=np.ndarray, dtype=np.uint16, ndim=1, description="events multiplicity"
)
[docs]
class TriggerMapContainer(Container):
"""Class representing a TriggerMapContainer.
This class inherits from the `Container` class and is used to store trigger mappings
of containers.
Attributes:
containers (Field): A field representing the trigger mapping of containers.
Methods:
is_empty(): Checks if the TriggerMapContainer is empty.
validate(): Validates the TriggerMapContainer by checking if all the containers
mapped are filled by correct type.
Example:
>>> container = TriggerMapContainer()
>>> container.is_empty()
True
>>> container.validate()
None
"""
containers = Field(
default_factory=partial(Map, Container),
description="trigger mapping of Container",
)
[docs]
@classmethod
def from_hdf5(cls, path, slice_index=None, index_component=0):
"""Reads a container from an HDF5 file.
Parameters:
path (str or Path): The path to the HDF5 file.
slice_index (int, optional): The index of the slice of data within the hdf5 file
to read. Default is None.This method will call the _container_from_hdf5 method
with the container argument associated to its own class (ArrayDataContainer)
Yields:
Container: The container generator linked to the HDF5 file.
Example:
>>> container = ArrayDataContainer.from_hdf5('path_to_file.h5')
"""
return cls._container_from_hdf5(
path,
slice_index=slice_index,
container_class=cls,
index_component=index_component,
)
@staticmethod
def _container_from_hdf5(
path, container_class, slice_index=None, index_component=0
):
# The way this method is coded is bad, there are confliuct behavior bettween
# containers inherited from TriggerMapContainer to truly be mapped with trigger,
# and those mapped with slices
"""Reads a container from an HDF5 file.
Parameters:
path (str or Path): The path to the HDF5 file.
container_class (Container): The class of the container to be read.
slice_index (int, optional): The index of the slice of data within the hdf5 file
to read. Default is None.
This method first checks if the path is a string and converts it to a Path
object
if it is.
It then imports the module of the container class and creates an instance of the
container class.
If the HDF5 file contains more than one slice and no slice index is provided,
it reads all slices and yields a generator of containers.
If a slice index is provided, it reads only the specified slice and returns the
container instance.
Yields:
Container: The container associated to the HDF5 file.
Raises:
NoSuchNodeError: If the specified node does not exist in the HDF5 file.
Exception: If any other error occurs.
Example:
>>> container = ArrayDataContainer._container_from_hdf5('path_to_file.h5',
MyContainerClass)
"""
if isinstance(path, str):
path = Path(path)
module = importlib.import_module(f"{container_class.__module__}") # noqa :F841
with HDF5TableReader(path) as reader:
if len(reader._h5file.root.__members__) > 1 and slice_index is None:
log.info(
f"reading {container_class.__name__} containing"
f" {len(reader._h5file.root.__members__)}"
f" slices, will return a generator"
)
for data in np.sort(reader._h5file.root.__members__):
container = eval(f"module.{container_class.__name__}")()
# container.containers[data] =
# eval(f"module.{container_class.__name__}s")()
_container = eval(
f"module."
f"{container.fields['containers'].default_factory.args[0].__name__}" # noqa
)
waveforms_data = eval(f"reader._h5file.root.{data}.__members__")
_mask = [_container.__name__ in _word for _word in waveforms_data]
_waveforms_data = np.array(waveforms_data)[_mask]
if len(_waveforms_data) == 1:
if issubclass(_container, TriggerMapContainer) or issubclass(
_container, ArrayDataContainer
):
for key, trigger in EventType.__members__.items():
try:
tableReader = reader.read(
table_name=f"/{data}/{_waveforms_data[0]}"
f"/{trigger.name}",
containers=_container,
)
container.containers[trigger] = next(tableReader)
except NoSuchNodeError as err:
log.warning(err)
except Exception as err:
log.error(err, exc_info=True)
raise err
else:
tableReader = reader.read(
table_name=f"/{data}/{_waveforms_data[0]}",
containers=_container,
)
container.containers[data] = next(tableReader)
else:
log.info(
f"there is {len(_waveforms_data)} entry"
f"corresponding to a {container_class}"
f"table save, unable to load"
)
yield container
else:
container = eval(f"module.{container_class.__name__}")()
if slice_index is None:
log.info(
f"reading {container_class.__name__} containing"
f"a single slice,"
f"will return the {container_class.__name__} instance"
)
data = "data"
else:
log.info(
f"reading slice {slice_index} of {container_class.__name__},"
f"will return the {container_class.__name__} instance"
)
data = f"data_{slice_index}"
_container = eval(
f"module.{container.fields['containers'].default_factory.args[0].__name__}" # noqa
)
if issubclass(_container, TriggerMapContainer) or issubclass(
_container, ArrayDataContainer
):
for key, trigger in EventType.__members__.items():
try:
tableReader = reader.read(
table_name=f"/{data}/{_container.__name__}_"
f"{index_component}/{trigger.name}",
containers=_container,
)
container.containers[trigger] = next(tableReader)
except NoSuchNodeError as err:
log.warning(err)
except Exception as err:
log.error(err, exc_info=True)
raise err
else:
tableReader = reader.read(
table_name=f"/{data}/{_container.__name__}_"
f"{index_component}",
containers=_container,
)
container.containers[data] = next(tableReader)
yield container
[docs]
def is_empty(self):
"""This method check if the container is empty.
Returns:
bool: True if the container is empty, False otherwise.
"""
return len(self.containers.keys()) == 0
[docs]
def validate(self):
"""Apply the validate method recursively to all the containers that are mapped
within the TriggerMapContainer.
Raises:
FieldValidationError: if one container is not valid.
"""
super().validate()
for i, container in enumerate(self.containers):
if i == 0:
container_type = type(container)
else:
if not (isinstance(container, container_type)):
raise FieldValidationError(
"all the containers mapped must have the same type to be merged"
)
[docs]
def merge_map_ArrayDataContainer(triggerMapContainer: TriggerMapContainer):
"""Merge and map ArrayDataContainer.
This function takes a TriggerMapContainer as input and merges the array fields of
the containers mapped within the TriggerMapContainer. The merged array fields are
concatenated along the 0th axis. The function also updates the 'nevents' field of
the output container by summing the 'nevents' field of all the mapped containers.
Parameters:
triggerMapContainer (TriggerMapContainer): The TriggerMapContainer object
containing the containers to be merged and mapped.
Returns:
ArrayDataContainer: The merged and mapped ArrayDataContainer object.
Example:
>>> triggerMapContainer = TriggerMapContainer()
>>> container1 = ArrayDataContainer()
>>> container1.field1 = np.array([1, 2, 3])
>>> container1.field2 = np.array([4, 5, 6])
>>> container1.nevents
3
>>> container2 = ArrayDataContainer()
>>> container2.field1 = np.array([7, 8, 9])
>>> container2.field2 = np.array([10, 11, 12])
>>> container2.nevents
3
>>> triggerMapContainer.containers['container1'] = container1
>>> triggerMapContainer.containers['container2'] = container2
>>> merged_container = merge_map_ArrayDataContainer(triggerMapContainer)
>>> merged_container.field1
array([1, 2, 3, 7, 8, 9])
>>> merged_container.field2
array([ 4, 5, 6, 10, 11, 12])
>>> merged_container.nevents
6
"""
triggerMapContainer.validate()
log.warning(
"TAKE CARE TO MERGE CONTAINERS ONLY IF PIXELS ID, RUN_NUMBER (OR ANY FIELD THAT\
ARE NOT ARRAY) ARE THE SAME"
)
keys = list(triggerMapContainer.containers.keys())
output_container = copy.deepcopy(triggerMapContainer.containers[keys[0]])
for key in keys[1:]:
for field in get_array_keys(triggerMapContainer.containers[key]):
output_container[field] = np.concatenate(
(output_container[field], triggerMapContainer.containers[key][field]),
axis=0,
)
if "nevents" in output_container.fields:
output_container.nevents += triggerMapContainer.containers[key].nevents
return output_container
# class TriggerMapArrayDataContainer(TriggerMapContainer):
# containers = Field(default_factory=partial(Map, ArrayDataContainer),
# description = "trigger mapping of arrayDataContainer"
# )