Source code for nectarchain.makers.calibration.gain.photostat_makers
import logging
import os
import pathlib
import numpy as np
from ctapipe.containers import Container
from ctapipe.core.traits import ComponentNameList, Integer, Path
from ....data.container import ChargesContainer, ChargesContainers
from ....data.container.core import merge_map_ArrayDataContainer
from ....data.management import DataManagement
from ...component import ArrayDataComponent, NectarCAMComponent
from ...extractor.utils import CtapipeExtractor
from .core import GainNectarCAMCalibrationTool
# Set the message format passed to logging.basicConfig (timestamp, logger
# name, level, message).
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Point this logger at the very same handler list object as the "__main__"
# logger, so both loggers emit through the same handlers.
log.handlers = logging.getLogger("__main__").handlers
# Public API of the module: only the calibration tool class is exported.
__all__ = ["PhotoStatisticNectarCAMCalibrationTool"]
[docs]
class PhotoStatisticNectarCAMCalibrationTool(GainNectarCAMCalibrationTool):
    """Gain calibration tool fed by two separate runs: flat-field and pedestal.

    ``start`` either runs the parent event loop over both runs (flat-field
    first, then pedestal) or, when exactly one pre-computed charge file is
    found for each run and ``reload_events`` is false, reads the charges from
    those files and stores them on the first component.

    ``run_file`` and ``events_per_slice`` are re-declared here as read-only,
    non-configurable traits whose default is ``None``.
    """

    # TODO: implement a mother photostat class with only 1 run with FF and
    # pedestal interleaved.
    name = "PhotoStatisticNectarCAM"

    componentsList = ComponentNameList(
        NectarCAMComponent,
        default_value=["PhotoStatisticNectarCAMComponent"],
        help="List of Component names to be apply, the order will be respected",
    ).tag(config=True)

    run_number = Integer(
        help="the flat-field run number to be treated", default_value=-1
    ).tag(config=True)

    Ped_run_number = Integer(
        help="the pedestal run number to be treated", default_value=-1
    ).tag(config=True)

    # The help texts below use implicit string concatenation instead of a
    # backslash continuation inside the literal, so the text no longer depends
    # on how the source happens to be indented.
    run_file = Path(
        help=(
            "desactivated for PhotoStatistic maker "
            "with FF and pedestal runs separated"
        ),
        default_value=None,
        allow_none=True,
        read_only=True,
    ).tag(config=False)

    events_per_slice = Integer(
        help=(
            "desactivated for PhotoStatistic maker "
            "with FF and pedestal runs separated"
        ),
        default_value=None,
        allow_none=True,
        read_only=True,
    ).tag(config=False)

    def __init__(self, *args, **kwargs):
        """Forward every argument unchanged to the parent constructor."""
        super().__init__(*args, **kwargs)

    def _init_output_path(self):
        """Set ``self.output_path`` to the HDF5 file associated with this tool.

        The file name encodes the tool name, the flat-field and pedestal run
        numbers, the extraction method with its kwargs string and, when
        ``max_events`` is not ``None``, a ``_maxevents<N>`` suffix. The file
        lives in the ``PhotoStat`` directory under ``$NECTARCAMDATA``
        (``/tmp`` when that variable is not set).
        """
        str_extractor_kwargs = CtapipeExtractor.get_extractor_kwargs_str(
            method=self.method,
            extractor_kwargs=self.extractor_kwargs,
        )
        # Only the tail of the file name depends on max_events.
        if self.max_events is None:
            events_suffix = ""
        else:
            events_suffix = f"_maxevents{self.max_events}"
        filename = (
            f"{self.name}_FFrun{self.run_number}_{self.method}"
            f"_{str_extractor_kwargs}_Pedrun{self.Ped_run_number}"
            f"_FullWaveformSum{events_suffix}.h5"
        )
        self.output_path = pathlib.Path(
            f"{os.environ.get('NECTARCAMDATA', '/tmp')}/PhotoStat/{filename}"
        )

    def _load_eventsource(self, FF_run=True):
        """Open the event source of one of the two runs.

        Parameters
        ----------
        FF_run : bool
            When true (default) the flat-field run ``self.run_number`` is
            loaded, otherwise the pedestal run ``self.Ped_run_number``.
        """
        if FF_run:
            label, run_number = "FF", self.run_number
        else:
            label, run_number = "Ped", self.Ped_run_number
        self.log.debug(f"loading {label} event source")
        self.event_source = self.enter_context(
            self.load_run(run_number, self.max_events, camera=self.camera)
        )

    def start(
        self,
        n_events=np.inf,
        *args,
        **kwargs,
    ):
        """Collect the flat-field and pedestal charges for the first component.

        Pre-computed charge files are searched for both runs. If
        ``reload_events`` is set, or if either search does not return exactly
        one file, the parent ``start`` is run for the flat-field run, the
        event source is switched to the pedestal run and the parent ``start``
        is run again. Otherwise the charges are read from the two files,
        merged, and stored as ``_FF_chargesContainers`` and
        ``_Ped_chargesContainers`` on ``self.components[0]``.

        Parameters
        ----------
        n_events : int or float
            Forwarded to the parent ``start`` as ``n_events``.
        *args, **kwargs
            Forwarded unchanged to the parent ``start``.
        """
        str_extractor_kwargs = CtapipeExtractor.get_extractor_kwargs_str(
            method=self.method,
            extractor_kwargs=self.extractor_kwargs,
        )
        # A failing search is not fatal: it is logged and treated as
        # "no file found", which triggers the event loop below.
        try:
            FF_files = DataManagement.find_charges(
                run_number=self.run_number,
                method=self.method,
                str_extractor_kwargs=str_extractor_kwargs,
                max_events=self.max_events,
            )
        except Exception as e:
            self.log.warning(f"{e}", exc_info=True)
            FF_files = []
        try:
            # No method/kwargs given here: find_charges' own defaults apply.
            Ped_files = DataManagement.find_charges(
                run_number=self.Ped_run_number,
                max_events=self.max_events,
            )
        except Exception as e:
            self.log.warning(f"{e}", exc_info=True)
            Ped_files = []

        charges_files_missing = len(FF_files) != 1 or len(Ped_files) != 1
        if self.reload_events or charges_files_missing:
            if charges_files_missing:
                self.log.info(
                    f"{len(FF_files)} computed charges FF files found"
                    f" with max_events > {self.max_events}"
                    f" for run {self.run_number}"
                    f" with extraction method {self.method}"
                    f" and {str_extractor_kwargs},\n reload charges"
                    f" from event loop"
                )
                # This message reports the pedestal search, so it is
                # labelled "Ped" (it previously said "FF").
                self.log.info(
                    f"{len(Ped_files)} computed charges Ped files found"
                    f" with max_events > {self.max_events}"
                    f" for run {self.Ped_run_number}"
                    f" with extraction"
                    f" method FullWaveformSum,\n reload charges from event loop"
                )
            # First pass: flat-field run (the event source already set up).
            super().start(
                *args, n_events=n_events, restart_from_begining=False, **kwargs
            )
            # Second pass: switch the event source to the pedestal run.
            self._setup_eventsource(FF_run=False)
            super().start(
                *args, n_events=n_events, restart_from_begining=False, **kwargs
            )
            return

        self.components[0]._FF_chargesContainers = self._read_merged_charges(
            FF_files[0], "FF"
        )
        self.components[0]._Ped_chargesContainers = self._read_merged_charges(
            Ped_files[0], "Ped"
        )

    def _read_merged_charges(self, path, label):
        """Read the charges stored in ``path`` and merge them into one container.

        Parameters
        ----------
        path
            Charge file handed to ``ChargesContainers.from_hdf5``.
        label : str
            Run kind (``"FF"`` or ``"Ped"``), used only in the log message.

        Returns
        -------
        The object returned by ``from_hdf5`` when it already is a
        ``ChargesContainer``; otherwise the result of
        ``merge_map_ArrayDataContainer`` applied to the single slice, or to
        the slices merged by ``ArrayDataComponent.merge_along_slices``.
        """
        self.log.info(f"reading computed charge from {label} file {path}")
        charges = ChargesContainers.from_hdf5(path)
        if isinstance(charges, ChargesContainer):
            return charges

        # Otherwise `charges` is consumed as an iterator of slices. Counting
        # the slices exhausts it, so the file is opened a second time.
        n_slices = sum(1 for _ in charges)
        charges = ChargesContainers.from_hdf5(path)
        if n_slices == 1:
            self.log.info("merging along TriggerType")
            return merge_map_ArrayDataContainer(next(charges))

        self.log.info("merging along slices")
        merged_along_slices = ArrayDataComponent.merge_along_slices(
            containers_generator=charges
        )
        self.log.info("merging along TriggerType")
        return merge_map_ArrayDataContainer(merged_along_slices)

    def _write_container(self, container: Container, index_component: int = 0) -> None:
        """Write ``container`` by delegating to the parent implementation.

        Parameters
        ----------
        container : Container
            Container to write.
        index_component : int
            Forwarded unchanged to the parent ``_write_container``.
        """
        super()._write_container(container=container, index_component=index_component)