Source code for nectarchain.makers.calibration.pedestal_makers

import logging
import os
import pathlib

import numpy as np
from ctapipe.core.traits import ComponentNameList
from ctapipe_io_nectarcam.constants import HIGH_GAIN, LOW_GAIN, N_GAINS

from ...data.container import NectarCAMPedestalContainer, NectarCAMPedestalContainers
from ..component import NectarCAMComponent
from .core import NectarCAMCalibrationTool

logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
log.handlers = logging.getLogger("__main__").handlers


__all__ = ["PedestalNectarCAMCalibrationTool"]


[docs] class PedestalNectarCAMCalibrationTool(NectarCAMCalibrationTool): name = "PedestalNectarCAMCalibrationTool" componentsList = ComponentNameList( NectarCAMComponent, default_value=["PedestalEstimationComponent"], help="List of Component names to be applied, the order will be respected", ).tag(config=True) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def _init_output_path(self): """ Initialize output path """ if self.events_per_slice is None: ext = ".h5" else: ext = f"_sliced{self.events_per_slice}.h5" if self.max_events is None: filename = f"{self.name}_run{self.run_number}{ext}" else: filename = ( f"{self.name}_run{self.run_number}_maxevents{self.max_events}{ext}" ) self.output_path = pathlib.Path( f"{os.environ.get('NECTARCAMDATA', '/tmp')}/PedestalEstimation/{filename}" ) def _combine_results(self): """ Method that combines sliced results to reduce memory load Can only be called after the file with the sliced results has been saved to disk """ # re-open results pedestalContainers = next( NectarCAMPedestalContainers.from_hdf5(self.output_path) ) # Loop over sliced results to fill the combined results if "data_combined" in pedestalContainers.containers.keys(): log.error("Trying to combine results that already contain combined data") self.log.info("Combine sliced results") for i, (_, pedestalContainer) in enumerate( pedestalContainers.containers.items() ): if i == 0: # initialize fields for the combined results based on first slice nsamples = pedestalContainer.nsamples nevents = np.zeros(len(pedestalContainer.nevents)) pixels_id = pedestalContainer.pixels_id ucts_timestamp_min = pedestalContainer.ucts_timestamp_min ucts_timestamp_max = pedestalContainer.ucts_timestamp_max pedestal_mean_hg = np.zeros( np.shape(pedestalContainer.pedestal_mean_hg) ) pedestal_mean_lg = np.zeros( np.shape(pedestalContainer.pedestal_mean_lg) ) pedestal_std_hg = np.zeros(np.shape(pedestalContainer.pedestal_std_hg)) pedestal_std_lg = np.zeros(np.shape(pedestalContainer.pedestal_std_lg)) else: # otherwise consider the overall time interval ucts_timestamp_min = np.minimum( ucts_timestamp_min, pedestalContainer.ucts_timestamp_min ) ucts_timestamp_max = np.maximum( ucts_timestamp_max, pedestalContainer.ucts_timestamp_max ) # for all slices # derive from pixel mask a mask that sets usable pixels # accept only pixels for which no flags were raised usable_pixels = pedestalContainer.pixel_mask == 0 # use a pixel only if it has no flag on either channel usable_pixels = np.logical_and(usable_pixels[0], usable_pixels[1]) # cumulated number of events nevents += pedestalContainer.nevents * usable_pixels # add mean, std sum elements pedestal_mean_hg += ( pedestalContainer.pedestal_mean_hg * pedestalContainer.nevents[:, np.newaxis] * usable_pixels[:, np.newaxis] ) pedestal_mean_lg += ( pedestalContainer.pedestal_mean_lg * pedestalContainer.nevents[:, np.newaxis] * usable_pixels[:, np.newaxis] ) pedestal_std_hg += ( pedestalContainer.pedestal_std_hg**2 * pedestalContainer.nevents[:, np.newaxis] * usable_pixels[:, np.newaxis] ) pedestal_std_lg += ( pedestalContainer.pedestal_std_lg**2 * pedestalContainer.nevents[:, np.newaxis] * usable_pixels[:, np.newaxis] ) # calculate final values of mean and std pedestal_mean_hg /= nevents[:, np.newaxis] pedestal_mean_lg /= nevents[:, np.newaxis] pedestal_std_hg /= nevents[:, np.newaxis] pedestal_std_hg = np.sqrt(pedestal_std_hg) pedestal_std_lg /= nevents[:, np.newaxis] pedestal_std_lg = np.sqrt(pedestal_std_lg) # flag bad pixels in overall results based on same criteria as for individual # slides # reconstitute dictionary with cumulated results consistently with # PedestalComponent ped_stats = {} array_shape = np.append([N_GAINS], np.shape(pedestal_mean_hg)) for statistic in ["mean", "std"]: ped_stat = np.zeros(array_shape) if statistic == "mean": ped_stat[HIGH_GAIN] = pedestal_mean_hg ped_stat[LOW_GAIN] = pedestal_mean_lg elif statistic == "std": ped_stat[HIGH_GAIN] = pedestal_std_hg ped_stat[LOW_GAIN] = pedestal_std_lg # Store the result in the dictionary ped_stats[statistic] = ped_stat # use flagging method from PedestalComponent pixel_mask = self.components[0].flag_bad_pixels(ped_stats, nevents) output = NectarCAMPedestalContainer( nsamples=nsamples, nevents=nevents, pixels_id=pixels_id, ucts_timestamp_min=ucts_timestamp_min, ucts_timestamp_max=ucts_timestamp_max, pedestal_mean_hg=pedestal_mean_hg, pedestal_mean_lg=pedestal_mean_lg, pedestal_std_hg=pedestal_std_hg, pedestal_std_lg=pedestal_std_lg, pixel_mask=pixel_mask, ) return output
[docs] def finish(self, return_output_component=False, *args, **kwargs): """ Redefines finish method to combine sliced results """ self.log.info("finishing Tool") # finish components output = self._finish_components(*args, **kwargs) # close writer self.writer.close() # Check if there are slices if self.events_per_slice is None: # If not nothing to do pass else: # combine results output = self._combine_results() # add combined results to output # re-initialise writer to store combined results self._init_writer(sliced=True, group_name="data_combined") # add combined results to writer self._write_container(output) self.writer.close() self.log.info("Shutting down.") if return_output_component: return output