Source code for nectarchain.dqm.charge_integration

import os

import ctapipe.instrument.camera.readout
import numpy as np
from ctapipe.coordinates import EngineeringCameraFrame
from ctapipe.image.extractor import FixedWindowSum  # noqa: F401
from ctapipe.image.extractor import FullWaveformSum  # noqa: F401
from ctapipe.image.extractor import GlobalPeakWindowSum  # noqa: F401
from ctapipe.image.extractor import LocalPeakWindowSum  # noqa: F401
from ctapipe.image.extractor import NeighborPeakWindowSum  # noqa: F401
from ctapipe.image.extractor import SlidingWindowMaxSum  # noqa: F401
from ctapipe.image.extractor import TwoPassWindowSum  # noqa: F401
from ctapipe.visualization import CameraDisplay
from ctapipe_io_nectarcam import constants
from matplotlib import pyplot as plt
from traitlets.config.loader import Config

from ..makers.component import ChargesComponent
from ..makers.component.core import ArrayDataComponent
from ..makers.extractor.utils import CtapipeExtractor
from .dqm_summary_processor import DQMSummary

__all__ = ["ChargeIntegrationHighLowGain"]


[docs] class ChargeIntegrationHighLowGain(DQMSummary): def __init__(self, gaink, r0=False): self.k = gaink self.gain_c = "High" if gaink == 0 else "Low" self.Pix = None self.Samp = None self.counter_evt = None self.counter_ped = None self.tel_id = None self.image_all = [] self.peakpos_all = [] self.image_ped = [] self.peakpos_ped = [] self.ped_all = [] self.ped_ped = [] self.camera = None self.integrator = None self.pixelBAD = None self.image_all_stats = None self.image_ped_stats = None self.ped_all_stats = None self.ped_ped_stats = None self.ChargeInt_Results_Dict = {} self.ChargeInt_Figures_Dict = {} self.ChargeInt_Figures_Names_Dict = {} super().__init__(r0) def configure_for_run(self, path, Pix, Samp, Reader1, **charges_kwargs): # define number of pixels and samples self.Pix = Pix self.Samp = Samp self.counter_evt = 0 self.counter_ped = 0 self.tel_id = Reader1.subarray.tel_ids[0] self.camera = Reader1.subarray.tel[self.tel_id].camera.geometry.transform_to( EngineeringCameraFrame() ) self.cmap = "gnuplot2" self.subarray = Reader1.subarray subarray = Reader1.subarray subarray.tel[ self.tel_id ].camera.readout = ctapipe.instrument.camera.readout.CameraReadout.from_name( "NectarCam" ) if charges_kwargs: extractor_kwargs = ( ChargesComponent._get_extractor_kwargs_from_method_and_kwargs( method=charges_kwargs["method"], kwargs=charges_kwargs["extractor_kwargs"], ) ) self.integrator = eval(charges_kwargs["method"])( subarray, **extractor_kwargs ) else: config = Config( {"GlobalPeakWindowSum": {"window_shift": 4, "window_width": 12}} ) self.integrator = GlobalPeakWindowSum(subarray, config=config) def process_event(self, evt, noped): self.pixels = evt.nectarcam.tel[self.tel_id].svc.pixel_ids self.pixelBADplot = evt.mon.tel[ self.tel_id ].pixel_status.hardware_failing_pixels ( broken_pixels_hg, broken_pixels_lg, ) = ArrayDataComponent._compute_broken_pixels_event(evt, self.pixels) if self.k == 0: self.pixelBAD = broken_pixels_hg channel = constants.HIGH_GAIN if self.k == 1: self.pixelBAD = broken_pixels_lg channel = constants.LOW_GAIN if self.r0: waveform = evt.r0.tel[self.tel_id].waveform[self.k] else: # This should accommodate cases were the shape of waveforms is 2D # (1855,60), or 3D (2, 1855, 60) for 2-gain channels or # (1, 1855, 60) for single-gain channel waveform = evt.r1.tel[self.tel_id].waveform waveform = waveform[self.pixels] ped = np.mean(waveform[:, 20]) if noped: waveform = waveform - ped try: output = CtapipeExtractor.get_image_peak_time( self.integrator( waveforms=waveform, tel_id=self.tel_id, selected_gain_channel=channel, broken_pixels=self.pixelBAD, ) ) except IndexError: waveform = waveform[np.newaxis, :] output = CtapipeExtractor.get_image_peak_time( self.integrator( waveforms=waveform, tel_id=self.tel_id, selected_gain_channel=channel, broken_pixels=self.pixelBAD, ) ) image = output[0] peakpos = output[1] if evt.trigger.event_type.value == 32: # count peds self.counter_ped += 1 self.image_ped.append(image) self.peakpos_ped.append(peakpos) self.ped_ped.append(ped) else: self.counter_evt += 1 self.image_all.append(image) self.peakpos_all.append(peakpos) self.ped_all.append(ped) def finish_run(self): self.peakpos_all = np.array(self.peakpos_all, dtype=float) if self.counter_ped > 0: self.peakpos_ped = np.array(self.peakpos_ped, dtype=float) # rms, percentile, mean deviation, median, mean, self.image_all = np.array(self.image_all, dtype=float) if self.image_all.size: self.image_all_stats = { "average": np.mean(self.image_all, axis=0), "median": np.median(self.image_all, axis=0), "std": np.std(self.image_all, axis=0), "rms": np.sqrt(np.sum(self.image_all**2, axis=0)), } self.ped_all = np.array(self.ped_all, dtype=float) if self.ped_all.size: self.ped_all_stats = { "average": np.mean(self.ped_all, axis=0), "median": np.median(self.ped_all, axis=0), "std": np.std(self.ped_all, axis=0), "rms": np.sqrt(np.sum(self.ped_all**2, axis=0)), } if self.counter_ped > 0: self.image_ped = np.array(self.image_ped, dtype=float) if self.image_ped.size: self.image_ped_stats = { "average": np.mean(self.image_ped, axis=0), "median": np.median(self.image_ped, axis=0), "std": np.std(self.image_ped, axis=0), "rms": np.sqrt(np.sum(self.image_ped**2, axis=0)), } self.ped_ped = np.array(self.ped_ped, dtype=float) if self.ped_ped.size: self.ped_ped_stats = { "average": np.mean(self.ped_ped, axis=0), "median": np.median(self.ped_ped, axis=0), "std": np.std(self.ped_ped, axis=0), "rms": np.sqrt(np.sum(self.ped_ped**2, axis=0)), } def get_results(self): if self.counter_evt > 0: for k, v in self.image_all_stats.items(): self.ChargeInt_Results_Dict[ ( f"CHARGE-INTEGRATION-IMAGE-ALL-{k.upper()}-" f"{self.gain_c.upper()}-GAIN" ) ] = v for k, v in self.ped_all_stats.items(): self.ChargeInt_Results_Dict[ f"PED-INTEGRATION-IMAGE-ALL-{k.upper()}-{self.gain_c.upper()}-GAIN" ] = v if self.counter_ped > 0: for k, v in self.image_ped_stats.items(): self.ChargeInt_Results_Dict[ ( f"CHARGE-INTEGRATION-PED-ALL-{k.upper()}-" f"{self.gain_c.upper()}-GAIN" ) ] = v for k, v in self.ped_ped_stats.items(): self.ChargeInt_Results_Dict[ f"PED-INTEGRATION-PED-ALL-{k.upper()}-{self.gain_c.upper()}-GAIN" ] = v return self.ChargeInt_Results_Dict def _plot_camera_image(self, image, title, text, filename, key, fig_path): fig, disp = plt.subplots() disp = CameraDisplay(geometry=self.camera[~self.pixelBADplot[0]]) disp.image = image disp.cmap = plt.cm.coolwarm disp.axes.text(2, -0.8, text, fontsize=12, rotation=90) disp.add_colorbar() plt.title(title) full_path = os.path.join(fig_path, filename) self.ChargeInt_Figures_Dict[key] = fig self.ChargeInt_Figures_Names_Dict[key] = full_path plt.close() def plot_results(self, name, fig_path): if self.counter_evt > 0: # Charge integration MEAN plot image = self.image_all_stats["average"] text = f"{self.gain_c} gain integrated charge (DC)" title = f"Charge Integration Mean {self.gain_c} Gain (ALL)" filename = name + f"_ChargeInt_Mean_{self.gain_c}Gain_All.png" key = f"CHARGE-INTEGRATION-IMAGE-ALL-AVERAGE-{self.gain_c.upper()}-GAIN" self._plot_camera_image(image, title, text, filename, key, fig_path) # Charge integration MEDIAN plot image = self.image_all_stats["median"] text = f"{self.gain_c} gain integrated charge (DC)" title = f"Charge Integration Median {self.gain_c} Gain (ALL)" filename = name + f"_ChargeInt_Median_{self.gain_c}Gain_All.png" key = f"CHARGE-INTEGRATION-IMAGE-ALL-MEDIAN-{self.gain_c.upper()}-GAIN" self._plot_camera_image(image, title, text, filename, key, fig_path) # Charge integration STD plot image = self.image_all_stats["std"] text = f"{self.gain_c} gain integrated charge (DC)" title = f"Charge Integration STD {self.gain_c} Gain (ALL)" filename = name + f"_ChargeInt_Std_{self.gain_c}Gain_All.png" key = f"CHARGE-INTEGRATION-IMAGE-ALL-STD-{self.gain_c.upper()}-GAIN" self._plot_camera_image(image, title, text, filename, key, fig_path) # Charge integration RMS plot image = self.image_all_stats["rms"] text = f"{self.gain_c} gain integrated charge (DC)" title = f"Charge Integration RMS {self.gain_c} Gain (ALL)" filename = name + f"_ChargeInt_Rms_{self.gain_c}Gain_All.png" key = f"CHARGE-INTEGRATION-IMAGE-ALL-RMS-{self.gain_c.upper()}-GAIN" self._plot_camera_image(image, title, text, filename, key, fig_path) if self.counter_ped > 0: image = self.image_ped_stats["average"] text = f"{self.gain_c} gain integrated charge (DC)" title = f"Charge Integration Mean {self.gain_c} Gain (PED)" filename = name + f"_ChargeInt_Mean_{self.gain_c}Gain_Ped.png" key = f"CHARGE-INTEGRATION-IMAGE-PED-AVERAGE-{self.gain_c.upper()}-GAIN" self._plot_camera_image(image, title, text, filename, key, fig_path) image = self.image_ped_stats["median"] text = f"{self.gain_c} gain integrated charge (DC)" title = f"Charge Integration Median {self.gain_c} Gain (PED)" filename = name + f"_ChargeInt_Median_{self.gain_c}Gain_Ped.png" key = f"CHARGE-INTEGRATION-IMAGE-PED-MEDIAN-{self.gain_c.upper()}-GAIN" self._plot_camera_image(image, title, text, filename, key, fig_path) image = self.image_ped_stats["std"] text = f"{self.gain_c} gain integrated charge (DC)" title = f"Charge Integration STD {self.gain_c} Gain (PED)" filename = name + f"_ChargeInt_Std_{self.gain_c}Gain_Ped.png" key = f"CHARGE-INTEGRATION-IMAGE-PED-STD-{self.gain_c.upper()}-GAIN" self._plot_camera_image(image, title, text, filename, key, fig_path) image = self.image_ped_stats["rms"] text = f"{self.gain_c} gain integrated charge (DC)" title = f"Charge Integration RMS {self.gain_c} Gain (PED)" filename = name + f"_ChargeInt_Rms_{self.gain_c}Gain_Ped.png" key = f"CHARGE-INTEGRATION-IMAGE-PED-RMS-{self.gain_c.upper()}-GAIN" self._plot_camera_image(image, title, text, filename, key, fig_path) # Charge integration SPECTRUM if self.counter_evt > 0: fig, _ = plt.subplots() for i in range(len(self.pixels)): plt.hist( self.image_all[:, i], 100, fill=False, density=True, stacked=True, linewidth=1, log=True, alpha=0.01, ) plt.hist( np.mean(self.image_all, axis=1), 100, color="r", linewidth=1, log=True, alpha=1, label="Camera average", ) plt.legend(loc="upper right") plt.xlabel("Charge (DC)") plt.title("Charge spectrum %s gain (ALL)" % self.gain_c) full_name = name + "_Charge_Spectrum_%sGain_All.png" % self.gain_c FullPath = os.path.join(fig_path, full_name) self.ChargeInt_Figures_Dict[ "CHARGE-INTEGRATION-SPECTRUM-ALL-%s-GAIN" % self.gain_c ] = fig self.ChargeInt_Figures_Names_Dict[ "CHARGE-INTEGRATION-SPECTRUM-ALL-%s-GAIN" % self.gain_c ] = FullPath plt.close(fig) del fig if self.counter_ped > 0: fig, _ = plt.subplots() for i in range(len(self.pixels)): plt.hist( self.image_ped[:, i], 100, fill=False, density=True, stacked=True, linewidth=1, log=True, alpha=0.01, ) plt.hist( np.mean(self.image_ped, axis=1), 100, color="r", linewidth=1, log=True, alpha=1, label="Camera average", ) plt.legend(loc="upper right") plt.xlabel("Charge (DC)") plt.title("Charge spectrum %s gain (PED)" % self.gain_c) full_name = name + "_Charge_Spectrum_%sGain_Ped.png" % self.gain_c FullPath = os.path.join(fig_path, full_name) self.ChargeInt_Figures_Dict[ "CHARGE-INTEGRATION-SPECTRUM-PED-%s-GAIN" % self.gain_c ] = fig self.ChargeInt_Figures_Names_Dict[ "CHARGE-INTEGRATION-SPECTRUM-PED-%s-GAIN" % self.gain_c ] = FullPath plt.close() return self.ChargeInt_Figures_Dict, self.ChargeInt_Figures_Names_Dict