Source code for nectarchain.makers.component.spe.spe_algorithm

import copy
import logging
import multiprocessing as mp
import os
import time
from inspect import signature
from multiprocessing import Pool
from typing import Tuple

import astropy.units as u
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.style as mplstyle
import numpy as np
import pyqtgraph as pg
import yaml
from astropy.table import QTable
from ctapipe.core.component import Component
from ctapipe.core.traits import Bool, Float, Integer, Path, Unicode
from iminuit import Minuit
from matplotlib.colors import to_rgba
from matplotlib.patches import Rectangle
from pyqtgraph.Qt import QtGui
from scipy.optimize import curve_fit
from scipy.signal import find_peaks, savgol_filter
from scipy.special import gammainc

from ....data.container import ChargesContainer, SPEfitContainer
from ....utils import MPE2, MeanValueError, Statistics, UtilsMinuit, weight_gaussian
from ..charges_component import ChargesComponent
from .parameters import Parameter, Parameters

mplstyle.use("fast")

logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
log.handlers = logging.getLogger("__main__").handlers

__all__ = [
    "SPEHHValgorithm",
    "SPEHHVStdalgorithm",
    "SPEnominalStdalgorithm",
    "SPEnominalalgorithm",
    "SPECombinedalgorithm",
]


[docs] class ContextFit: def __init__( self, _class, minuitParameters_array: np.ndarray, charge: np.ndarray, counts: np.ndarray, ): self._class = _class self.minuitParameters_array = minuitParameters_array self.charge = charge self.counts = counts def __enter__(self): init_processes( self._class, self.minuitParameters_array, self.charge, self.counts ) def __exit__(self, type, value, traceback): del globals()["_minuitParameters_array"] del globals()["_charge"] del globals()["_counts"] del globals()["_chi2"]
[docs] def init_processes( _class, minuitParameters_array: np.ndarray, charge: np.ndarray, counts: np.ndarray ): """Initialize each process in the process pool with global variable fit_array.""" global _minuitParameters_array global _charge global _counts global _chi2 _minuitParameters_array = minuitParameters_array _charge = charge _counts = counts def chi2(pedestal, pp, luminosity, resolution, mean, n, pedestalWidth, index): return _class._NG_Likelihood_Chi2( pp, resolution, mean, n, pedestal, pedestalWidth, luminosity, _charge[int(index)].data[~_charge[int(index)].mask], _counts[int(index)].data[~_charge[int(index)].mask], ) _chi2 = chi2
[docs] class SPEalgorithm(Component): window_length = Integer( 40, read_only=True, help="The windows leght used for the savgol filter algorithm", ).tag(config=True) order = Integer( 2, read_only=True, help="The order of the polynome used in the savgol filter algorithm", ).tag(config=True) display_toggle = Bool( False, read_only=False, help="Enable/disable display of SPE fit results", ) def __init__( self, pixels_id: np.ndarray, config=None, parent=None, **kwargs ) -> None: super().__init__(config=config, parent=parent, **kwargs) self.__pixels_id = pixels_id self.__pedestal = Parameter( name="pedestal", value=0, ) self.__parameters = Parameters() self.__parameters.append(self.__pedestal) self.__results = SPEfitContainer( is_valid=np.zeros((self.npixels), dtype=bool), high_gain=np.empty((self.npixels, 3)), low_gain=np.empty((self.npixels, 3)), pixels_id=self.__pixels_id, likelihood=np.empty((self.npixels)), p_value=np.empty((self.npixels)), pedestal=np.empty((self.npixels, 3)), pedestalWidth=np.empty((self.npixels, 3)), resolution=np.empty((self.npixels, 3)), luminosity=np.empty((self.npixels, 3)), mean=np.empty((self.npixels, 3)), n=np.empty((self.npixels, 3)), pp=np.empty((self.npixels, 3)), ) @property def parameters(self): return copy.deepcopy(self.__parameters) @property def _parameters(self): return self.__parameters @property def results(self): return copy.deepcopy(self.__results) @property def _results(self): return self.__results @property def pixels_id(self): return copy.deepcopy(self.__pixels_id) @property def _pixels_id(self): return self.__pixels_id @property def npixels(self): return len(self.__pixels_id) # methods
[docs] def read_param_from_yaml(self, parameters_file, only_update=False) -> None: """Reads parameters from a YAML file and updates the internal parameters of the FlatFieldSPEMaker class. Parameters ---------- parameters_file : str The name of the YAML file containing the parameters. only_update : bool, optional If True, only the parameters that exist in the YAML file will be updated. Default is False. """ with open( f"{os.path.dirname(os.path.abspath(__file__))}/{parameters_file}" ) as parameters: param = yaml.safe_load(parameters) if only_update: for i, name in enumerate(self.__parameters.parnames): dico = param.get(name, False) if dico: self._parameters.parameters[i].value = dico.get("value") self._parameters.parameters[i].min = dico.get("min", np.nan) self._parameters.parameters[i].max = dico.get("max", np.nan) else: for name, dico in param.items(): setattr( self, f"__{name}", Parameter( name=name, value=dico["value"], min=dico.get("min", np.nan), max=dico.get("max", np.nan), unit=dico.get("unit", u.dimensionless_unscaled), ), ) self._parameters.append(eval(f"self.__{name}"))
@staticmethod def _update_parameters( parameters: Parameters, charge: np.ndarray, counts: np.ndarray, **kwargs, ) -> Parameters: """Update the parameters of the FlatFieldSPEMaker class based on the input charge and counts data. Parameters ---------- parameters (Parameters): An instance of the Parameters class that holds the internal parameters of the FlatFieldSPEMaker class. charge (np.ndarray): An array of charge values. counts (np.ndarray): An array of corresponding counts values. kwargs Additional keyword arguments. Returns ------- parameters : Parameters The updated parameters object with the pedestal and mean values and their corresponding limits. """ try: coeff_ped, coeff_mean = __class__._get_mean_gaussian_fit( charge, counts, **kwargs ) pedestal = parameters["pedestal"] pedestal.value = coeff_ped[1] pedestal.min = coeff_ped[1] - coeff_ped[2] pedestal.max = coeff_ped[1] + coeff_ped[2] log.debug(f"pedestal updated: {pedestal}") pedestalWidth = parameters["pedestalWidth"] pedestalWidth.value = pedestal.max - pedestal.value pedestalWidth.max = 3 * pedestalWidth.value log.debug(f"pedestalWidth updated: {pedestalWidth.value}") if (coeff_mean[1] - pedestal.value < 0) or ( (coeff_mean[1] - coeff_mean[2]) - pedestal.max < 0 ): raise MeanValueError("mean gaussian fit not good") mean = parameters["mean"] mean.value = coeff_mean[1] - pedestal.value mean.min = (coeff_mean[1] - coeff_mean[2]) - pedestal.max mean.max = (coeff_mean[1] + coeff_mean[2]) - pedestal.min log.debug(f"mean updated: {mean}") except MeanValueError as e: log.warning(e, exc_info=True) log.warning("mean parameters limits and starting value not changed") except Exception as e: log.warning(e, exc_info=True) log.warning( "pedestal and mean parameters limits and starting value not changed" ) return parameters @staticmethod def _get_mean_gaussian_fit( charge: np.ndarray, counts: np.ndarray, pixel_id=None, **kwargs ) -> Tuple[np.ndarray, np.ndarray]: """Perform a Gaussian fit on the data to determine the pedestal and mean values. Parameters ---------- charge : np.ndarray An array of charge values. counts : np.ndarray An array of corresponding counts. pixel_id : int The id of the current pixel. Default to None kwargs Additional keyword arguments. Returns ------- Tuple[np.ndarray, np.ndarray] A tuple of fit coefficients for the pedestal and mean. Example usage:: flat_field_maker = FlatFieldSPEMaker() charge = np.array([1, 2, 3, 4, 5]) counts = np.array([10, 20, 30, 40, 50]) coeff, coeff_mean = flat_field_maker._get_mean_gaussian_fit(charge, counts) print(coeff) # Output: [norm,peak_value, peak_width] print(coeff_mean) # Output: [norm,peak_value_mean, peak_width_mean] """ window_length = __class__.window_length.default_value order = __class__.order.default_value histo_smoothed = savgol_filter(counts, window_length, order) peaks = find_peaks(histo_smoothed, 10) peak_max = np.argmax(histo_smoothed[peaks[0]]) peak_pos, peak_value = charge[peaks[0][peak_max]], counts[peaks[0][peak_max]] coeff, _ = curve_fit( weight_gaussian, charge[: peaks[0][peak_max]], histo_smoothed[: peaks[0][peak_max]], p0=[peak_value, peak_pos, 1], ) if log.getEffectiveLevel() == logging.DEBUG and kwargs.get("display", False): log.debug("plotting figures with prefit parameters computation") fig, ax = plt.subplots(1, 1, figsize=(5, 5)) ax.errorbar( charge, counts, np.sqrt(counts), zorder=0, fmt=".", label="data" ) ax.plot( charge, histo_smoothed, label=f"smoothed data with savgol filter (windows lenght : " f"{window_length}, order : {order})", ) ax.plot( charge, weight_gaussian(charge, coeff[0], coeff[1], coeff[2]), label="gaussian fit of the pedestal, left tail only", ) ax.set_xlim([peak_pos - 500, None]) ax.vlines( coeff[1], 0, peak_value, label=f"pedestal initial value = {coeff[1]:.0f}", color="red", ) ax.add_patch( Rectangle( (coeff[1] - coeff[2], 0), 2 * coeff[2], peak_value, fc=to_rgba("red", 0.5), ) ) ax.set_xlabel("Charge (ADC)", size=15) ax.set_ylabel("Events", size=15) ax.legend(fontsize=7) os.makedirs( f"{os.environ.get('NECTARCHAIN_LOG', '/tmp')}/{os.getpid()}/" f"figures/", exist_ok=True, ) log.info( f"figures of initialization of parameters will be accessible at " f'{os.environ.get("NECTARCHAIN_LOG","/tmp")}/{os.getpid()}' ) fig.savefig( f"{os.environ.get('NECTARCHAIN_LOG','/tmp')}/{os.getpid()}/figures/" f"initialization_pedestal_pixel{pixel_id}_{os.getpid()}.pdf" ) fig.clf() plt.close(fig) del fig, ax mask = charge > coeff[1] + 3 * coeff[2] peaks_mean = find_peaks(histo_smoothed[mask]) peak_max_mean = np.argmax(histo_smoothed[mask][peaks_mean[0]]) peak_pos_mean, peak_value_mean = ( charge[mask][peaks_mean[0][peak_max_mean]], histo_smoothed[mask][peaks_mean[0][peak_max_mean]], ) mask = (charge > ((coeff[1] + peak_pos_mean) / 2)) * ( charge < (peak_pos_mean + (peak_pos_mean - coeff[1]) / 2) ) coeff_mean, _ = curve_fit( weight_gaussian, charge[mask], histo_smoothed[mask], p0=[peak_value_mean, peak_pos_mean, 1], ) if log.getEffectiveLevel() == logging.DEBUG and kwargs.get("display", False): log.debug("plotting figures with prefit parameters computation") fig, ax = plt.subplots(1, 1, figsize=(5, 5)) ax.errorbar( charge, counts, np.sqrt(counts), zorder=0, fmt=".", label="data" ) ax.plot( charge, histo_smoothed, label=f"smoothed data with savgol filter (windows length : " f"{window_length}, order : {order})", ) ax.plot( charge, weight_gaussian(charge, coeff_mean[0], coeff_mean[1], coeff_mean[2]), label="gaussian fit of the SPE", ) ax.vlines( coeff_mean[1], 0, peak_value, label=f"mean initial value = {coeff_mean[1] - coeff[1]:.0f}", color="red", ) ax.add_patch( Rectangle( (coeff_mean[1] - coeff_mean[2], 0), 2 * coeff_mean[2], peak_value_mean, fc=to_rgba("red", 0.5), ) ) ax.set_xlim([peak_pos - 500, None]) ax.set_xlabel("Charge (ADC)", size=15) ax.set_ylabel("Events", size=15) ax.legend(fontsize=7) os.makedirs( f"{os.environ.get('NECTARCHAIN_LOG','/tmp')}/{os.getpid()}/" f"figures/", exist_ok=True, ) fig.savefig( f"{os.environ.get('NECTARCHAIN_LOG','/tmp')}/{os.getpid()}/figures/" f"initialization_mean_pixel{pixel_id}_{os.getpid()}.pdf" ) fig.clf() plt.close(fig) del fig, ax return coeff, coeff_mean
[docs] class SPEnominalalgorithm(SPEalgorithm): parameters_file = Unicode( "parameters_SPEnominal.yaml", read_only=True, help="The name of the SPE fit parameters file", ).tag(config=True) tol = Float( 1e-1, help="The tolerance used for minuit", read_only=True, ).tag(config=True) nproc = Integer( 8, help="The Number of cpu used for SPE fit", ).tag(config=True) chunksize = Integer( 1, help="The chunk size for multi-processing", ).tag(config=True) multiproc = Bool( True, help="flag to active multi-processing", ).tag(config=True) def __init__( self, pixels_id: np.ndarray, charge: np.ndarray, counts: np.ndarray, config=None, parent=None, **kwargs, ) -> None: """Initializes the FlatFieldSingleHHVSPEMaker object. Parameters ---------- charge : np.ma.masked_array or array-like The charge data. counts : np.ma.masked_array or array-like The counts data. ``*args`` Additional positional arguments. ``**kwargs`` Additional keyword arguments. """ super().__init__(pixels_id=pixels_id, config=config, parent=parent, **kwargs) if isinstance(charge, np.ma.masked_array): self.__charge = charge else: self.__charge = np.ma.asarray(charge) if isinstance(counts, np.ma.masked_array): self.__counts = counts else: self.__counts = np.ma.asarray(counts) self.read_param_from_yaml(kwargs.get("parameters_file", self.parameters_file))
[docs] @classmethod def create_from_chargesContainer( cls, signal: ChargesContainer, config=None, parent=None, **kwargs ): """Creates an instance of FlatFieldSingleHHVSPEMaker using charge and counts data from a ChargesContainer object. Parameters ---------- signal : ChargesContainer The ChargesContainer object. ``**kwargs`` Additional keyword arguments. Returns ------- FlatFieldSingleHHVSPEMaker An instance of FlatFieldSingleHHVSPEMaker. """ histo = ChargesComponent.histo_hg(signal, autoscale=True) return cls( pixels_id=signal.pixels_id, charge=histo[1], counts=histo[0], config=config, parent=parent, **kwargs, )
# getters and setters @property def charge(self): """Returns a deep copy of the ``__charge`` attribute.""" return copy.deepcopy(self.__charge) @property def _charge(self): """Returns the ``__charge`` attribute.""" return self.__charge @property def counts(self): """Returns a deep copy of the ``__counts`` attribute.""" return copy.deepcopy(self.__counts) @property def _counts(self): """Returns the ``__counts`` attribute.""" return self.__counts # methods def _fill_results_table_from_dict( self, dico: dict, pixels_id: np.ndarray, return_fit_array: bool = True ) -> None: """Populates the results table with fit values and errors for each pixel based on the dictionary provided as input. Parameters ---------- dico : dict A dictionary containing fit values and errors for each pixel. pixels_id : np.ndarray An array of pixel IDs. """ # ######### NEED TO BE OPTIMIZED!!! ########### chi2_sig = signature(_chi2) if return_fit_array: fit_array = np.empty(len(pixels_id), dtype=np.object_) for i in range(len(pixels_id)): values = dico[i].get(f"values_{i}", None) errors = dico[i].get(f"errors_{i}", None) fit_status = dico[i].get(f"fit_status_{i}", None) if not ((values is None) or (errors is None) or (fit_status is None)): if return_fit_array: fit_array[i] = fit_status index = np.argmax(self._results.pixels_id == pixels_id[i]) if len(values) != len(chi2_sig.parameters): e = Exception( "the size out the minuit output parameters values array does " "not fit the signature of the minimized cost function" ) log.error(e, exc_info=True) raise e for j, key in enumerate(chi2_sig.parameters): if key != "index": self._results[key][index][0] = values[j] self._results[key][index][1] = errors[j] self._results[key][index][2] = errors[j] if key == "mean": self._results.high_gain[index][0] = values[j] self._results.high_gain[index][1] = errors[j] self._results.high_gain[index][2] = errors[j] self._results.is_valid[index] = ( fit_status["is_valid"] and not (fit_status["has_parameters_at_limit"]) and fit_status["has_valid_parameters"] ) if fit_status["has_reached_call_limit"]: self.log.warning( f"The minuit fit for pixel {pixels_id[i]} reached the call " f"limit" ) self._results.likelihood[index] = fit_status["values"] ndof = ( self._counts.data[index][~self._counts.mask[index]].shape[0] - fit_status["nfit"] ) self._results.p_value[index] = Statistics.chi2_pvalue( ndof, fit_status["values"] ) if return_fit_array: return fit_array @staticmethod def _NG_Likelihood_Chi2( pp: float, res: float, mu2: float, n: float, muped: float, sigped: float, lum: float, charge: np.ndarray, counts: np.ndarray, **kwargs, ): """Calculates the chi-square value using the MPE2 function. The different parameters are explained in `Caroff et al. (2019) <CAROFF>`_. .. _CAROFF: https://ui.adsabs.harvard.edu/abs/2019SPIE11119E..1WC Parameters ---------- pp : float The pp parameter. res : float The res parameter. mu2 : float The mu2 parameter. n : float The n parameter. muped : float The muped parameter. sigped : float The sigped parameter. lum : float The lum parameter. charge : np.ndarray An array of charge values. counts : np.ndarray An array of count values. ``**kwargs`` Additional keyword arguments. Returns ------- Lik : float The chi-square value. """ if not (kwargs.get("ntotalPE", False)): for i in range(1000): if gammainc(i + 1, lum) < 1e-5: ntotalPE = i break kwargs.update({"ntotalPE": ntotalPE}) pdf = MPE2(charge, pp, res, mu2, n, muped, sigped, lum, **kwargs) # log.debug(f"pdf : {np.sum(pdf)}") Ntot = np.sum(counts) # log.debug(f'Ntot : {Ntot}') mask = counts > 0 Lik = np.sum( ((pdf * Ntot - counts)[mask]) ** 2 / counts[mask] ) # 2 times faster return Lik # @njit(parallel=True,nopython = True) def _make_minuitParameters_array_from_parameters( self, pixels_id: np.ndarray = None, **kwargs ) -> np.ndarray: """Create an array of Minuit fit instances based on the parameters and data for each pixel. Parameters ---------- pixels_id : optional An array of pixel IDs. If not provided, all pixels will be used. Returns ------- minuitParameters_array : np.ndarray: An array of Minuit fit instances, one for each pixel. """ if pixels_id is None: npix = self.npixels pixels_id = self.pixels_id else: npix = len(pixels_id) minuitParameters_array = np.empty((npix), dtype=np.object_) for i, _id in enumerate(pixels_id): index = np.where(self.pixels_id == _id)[0][0] parameters = self._update_parameters( self.parameters, self._charge[index].data[~self._charge[index].mask], self._counts[index].data[~self._charge[index].mask], pixel_id=_id, **dict(kwargs, display=self.display_toggle), ) index_parameter = Parameter(name="index", value=index, frozen=True) parameters.append(index_parameter) minuitParameters = UtilsMinuit.make_minuit_par_kwargs(parameters) minuitParameters_array[i] = minuitParameters # self.log.debug(fit_array[i].values) # self.log.debug(fit_array[i].limits) # self.log.debug(fit_array[i].fixed) return minuitParameters_array
[docs] @staticmethod def run_fit(i: int, tol: float) -> dict: """Perform a fit on a specific pixel using the Minuit package. Parameters ---------- i : int The index of the pixel to perform the fit on. Returns ------- : dict A dictionary containing the fit values and errors for the specified pixel. The keys are ``values_i`` and ``errors_i``, where ``i`` is the index of the pixel. """ log = logging.getLogger(__name__) log.setLevel(logging.INFO) log.info("Starting") minuit_kwargs = { parname: _minuitParameters_array[i]["values"][parname] for parname in _minuitParameters_array[i]["values"] } log.info(f"creation of fit instance for pixel: {minuit_kwargs['index']}") fit = Minuit(_chi2, **minuit_kwargs) fit.errordef = Minuit.LIKELIHOOD fit.strategy = 0 fit.tol = tol fit.print_level = 1 fit.throw_nan = True UtilsMinuit.set_minuit_parameters_limits_and_errors( fit, _minuitParameters_array[i] ) log.info("fit created") fit.migrad() fit.hesse() _values = np.array([params.value for params in fit.params]) _errors = np.array([params.error for params in fit.params]) _fit_status = { "is_valid": fit.fmin.is_valid, "has_valid_parameters": fit.fmin.has_valid_parameters, "has_parameters_at_limit": fit.fmin.has_parameters_at_limit, "has_reached_call_limit": fit.fmin.has_reached_call_limit, "nfit": fit.nfit, "values": fit.fcn(fit.values), } log.info("Finished") return { f"values_{i}": _values, f"errors_{i}": _errors, f"fit_status_{i}": _fit_status, }
def run( self, pixels_id: np.ndarray = None, **kwargs, ) -> np.ndarray: self.log.info("running maker") self.log.info("checking asked pixels id") if pixels_id is None: pixels_id = self.pixels_id npix = self.npixels else: self.log.debug("checking that asked pixels id are in data") pixels_id = np.asarray(pixels_id) mask = np.array([_id in self.pixels_id for _id in pixels_id], dtype=bool) if False in mask: self.log.debug( f"The following pixels are not in data : {pixels_id[~mask]}" ) pixels_id = pixels_id[mask] npix = len(pixels_id) if npix == 0: self.log.warning("The asked pixels id are all out of the data") return None else: self.log.info("creation of the minuit parameters array") minuitParameters_array = self._make_minuitParameters_array_from_parameters( pixels_id=pixels_id, **kwargs ) self.log.info("running fits") with ContextFit( __class__, minuitParameters_array, self._charge, self._counts ): if self.multiproc: try: mp.set_start_method("spawn", force=True) self.log.info("spawned multiproc") except RuntimeError: pass nproc = kwargs.get("nproc", self.nproc) chunksize = kwargs.get( "chunksize", max(self.chunksize, npix // (nproc * 10)), ) self.log.info(f"pooling with nproc {nproc}, chunksize {chunksize}") t = time.time() with Pool( nproc, initializer=init_processes, initargs=( __class__, minuitParameters_array, self._charge, self._counts, ), ) as pool: self.log.info( f"time to create the Pool is {time.time() - t:.2e} sec" ) result = pool.starmap_async( __class__.run_fit, [(i, kwargs.get("tol", self.tol)) for i in range(npix)], chunksize=chunksize, ) result.wait() try: res = result.get() except Exception as e: log.error(e, exc_info=True) raise e self.log.info( f"total time for multiproc with starmap_async execution is " f"{time.time() - t:.2e} sec" ) else: self.log.info("running in mono-cpu") t = time.time() res = [ __class__.run_fit(i, kwargs.get("tol", self.tol)) for i in range(npix) ] self.log.info( f"time for singleproc execution is {time.time() - t:.2e} sec" ) self.log.info("filling result table from fits results") output = self._fill_results_table_from_dict( res, pixels_id, return_fit_array=True ) if self.display_toggle: self.log.info("plotting") t = time.time() self.display(pixels_id, **kwargs) log.info( f"time for plotting {len(pixels_id)} pixels : " f"{time.time() - t:.2e} sec" ) return output def plot_single_pyqtgraph( pixel_id: int, charge: np.ndarray, counts: np.ndarray, pp: float, resolution: float, gain: float, gain_error: float, n: float, pedestal: float, pedestalWidth: float, luminosity: float, likelihood: float, ) -> tuple: # from pyqtgraph.Qt import QtCore, QtGui # app = pg.mkQApp(name="minimal") # Create a window win = pg.GraphicsLayoutWidget(show=False) win.setWindowTitle(f"SPE fit pixel id : {pixel_id}") # Add a plot to the window plot = win.addPlot(title=f"SPE fit pixel id : {pixel_id}") plot.addLegend() error = pg.ErrorBarItem( x=charge, y=counts, top=np.sqrt(counts), bottom=np.sqrt(counts), beam=0.5 ) plot.addItem(error) plot.plot( x=charge, y=np.trapz(counts, charge) * MPE2( charge, pp, resolution, gain, n, pedestal, pedestalWidth, luminosity, ), name="SPE model fit", ) legend = pg.TextItem( f"SPE model fit gain : {gain - gain_error:.2f} < {gain:.2f} < " f"{gain + gain_error:.2f} ADC/pe,\n likelihood : {likelihood:.2f}", color=(200, 200, 200), ) legend.setPos(pedestal, np.max(counts) / 2) font = QtGui.QFont() font.setPointSize(12) legend.setFont(font) legend.setTextWidth(500) plot.addItem(legend) label_style = {"color": "#EEE", "font-size": "12pt"} plot.setLabel("bottom", "Charge (ADC)", **label_style) plot.setLabel("left", "Events", **label_style) plot.setRange( xRange=[ pedestal - 6 * pedestalWidth, np.quantile(charge.data[~charge.mask], 0.84), ] ) # ax.legend(fontsize=18) return win
[docs] def plot_single_matplotlib( pixel_id: int, charge: np.ndarray, counts: np.ndarray, pp: float, resolution: float, gain: float, gain_error: float, n: float, pedestal: float, pedestalWidth: float, luminosity: float, likelihood: float, **kwargs, ) -> tuple: """Generate a plot of the data and a model fit for a specific pixel. The different parameters are explained in `Caroff et al. (2019) <CAROFF>`_. .. _CAROFF: https://ui.adsabs.harvard.edu/abs/2019SPIE11119E..1WC Parameters ---------- pixel_id: int The ID of the pixel for which the plot is generated. charge: np.ndarray An array of charge values. counts: np.ndarray An array of event counts corresponding to the charge values. pp: float The value of the ``pp`` parameter. resolution: float The value of the ``resolution`` parameter. gain: float The value of the ``gain`` parameter. gain_error: float The value of the ``gain_error`` parameter. n: float The value of the ``n`` parameter. pedestal: float The value of the ``pedestal`` parameter. pedestalWidth: float The value of the ``pedestalWidth`` parameter. luminosity: float The value of the ``luminosity`` parameter. likelihood: float The value of the ``likelihood`` parameter. Returns ------- : tuple A tuple containing the generated plot figure and the axes of the plot. """ if kwargs.get("ax", False) and kwargs.get("fig", False): fig = kwargs.get("fig") ax = kwargs.get("ax") else: fig, ax = plt.subplots(1, 1, figsize=(8, 8)) ax.errorbar(charge, counts, np.sqrt(counts), zorder=0, fmt=".", label="data") ax.plot( charge, np.trapz(counts, charge) * MPE2( charge, pp, resolution, gain, n, pedestal, pedestalWidth, luminosity, ), zorder=1, linewidth=2, label=f"SPE model fit \n gain : {gain - gain_error:.2f} < {gain:.2f} < " f"{gain + gain_error:.2f} ADC/pe,\n likelihood : {likelihood:.2f}", ) ax.set_xlabel("Charge (ADC)", size=15) ax.set_ylabel("Events", size=15) ax.set_title(f"SPE fit pixel id : {pixel_id}") ax.set_xlim([pedestal - 6 * pedestalWidth, np.max(charge)]) ax.legend(fontsize=18) return fig, ax
[docs] def display(self, pixels_id: np.ndarray, package="pyqtgraph", **kwargs) -> None: """Display and save the plot for each specified pixel ID. Parameters ---------- pixels_id: np.ndarray An array of pixel IDs. package: str the package used to plot, can be matplotlib or pyqtgraph. Default to pyqtgraph kwargs Additional keyword arguments. figpath : str The path to save the generated plot figures. Defaults to ``/tmp/NectarGain_pid{os.getpid()}``. """ figpath = kwargs.get( "figpath", f"{os.environ.get('NECTARCHAIN_FIGURES','/tmp')}/" f"NectarGain_pid{os.getpid()}", ) self.log.debug(f"saving figures in {figpath}") os.makedirs(figpath, exist_ok=True) if package == "matplotlib": matplotlib.use("TkAgg") for _id in pixels_id: index = np.argmax(self._results.pixels_id == _id) fig, ax = __class__.plot_single_matplotlib( _id, self._charge[index], self._counts[index], self._results.pp[index][0], self._results.resolution[index][0], self._results.high_gain[index][0], self._results.high_gain[index][1:].mean(), self._results.n[index][0], self._results.pedestal[index][0], self._results.pedestalWidth[index][0], self._results.luminosity[index][0], self._results.likelihood[index], ) fig.savefig(f"{figpath}/fit_SPE_pixel{_id}.pdf") fig.clf() plt.close(fig) del fig, ax elif package == "pyqtgraph": for _id in pixels_id: index = np.argmax(self._results.pixels_id == _id) try: widget = None widget = __class__.plot_single_pyqtgraph( _id, self._charge[index], self._counts[index], self._results.pp[index][0], self._results.resolution[index][0], self._results.high_gain[index][0], self._results.high_gain[index][1:].mean(), self._results.n[index][0], self._results.pedestal[index][0], self._results.pedestalWidth[index][0], self._results.luminosity[index][0], self._results.likelihood[index], ) exporter = pg.exporters.ImageExporter(widget.getItem(0, 0)) exporter.parameters()["width"] = 1000 exporter.export(f"{figpath}/fit_SPE_pixel{_id}.png") except Exception as e: log.warning(e, exc_info=True) finally: del widget
[docs] class SPEHHValgorithm(SPEnominalalgorithm): """Class to perform fit of the SPE HHV signal with ``n`` and ``pp`` free.""" parameters_file = Unicode( "parameters_SPEHHV.yaml", read_only=True, help="The name of the SPE fit parameters file", ).tag(config=True) tol = Float( 1e5, help="The tolerance used for minuit", read_only=True, ).tag(config=True)
[docs] class SPEnominalStdalgorithm(SPEnominalalgorithm): """Class to perform fit of the SPE signal with ``n`` and ``pp`` fixed.""" parameters_file = Unicode( "parameters_SPEnominalStd.yaml", read_only=True, help="The name of the SPE fit parameters file", ).tag(config=True) def __init__( self, pixels_id: np.ndarray, charge: np.ndarray, counts: np.ndarray, config=None, parent=None, **kwargs, ) -> None: """Initializes a new instance of the FlatFieldSingleHHVStdSPEMaker class. Parameters ---------- charge : np.ndarray The charge data. counts : np.ndarray The counts data. ``*args`` Additional positional arguments. ``**kwargs`` Additional keyword arguments. """ super().__init__( pixels_id=pixels_id, charge=charge, counts=counts, config=config, parent=parent, **kwargs, ) self.__fix_parameters() def __fix_parameters(self) -> None: """Fixes the values of the ``n`` and ``pp`` parameters by setting their frozen attribute to True.""" pp = self._parameters["pp"] n = self._parameters["n"] pp.frozen = True n.frozen = True self.log.info(f"updating parameters by fixing pp={pp} and n={n}")
[docs] class SPEHHVStdalgorithm(SPEnominalStdalgorithm): parameters_file = Unicode( "parameters_SPEHHVStd.yaml", read_only=True, help="The name of the SPE fit parameters file", ).tag(config=True) tol = Float( 1e5, help="The tolerance used for minuit", read_only=True, ).tag(config=True)
[docs] class SPECombinedalgorithm(SPEnominalalgorithm): parameters_file = Unicode( "parameters_SPECombined_fromHHVFit.yaml", read_only=True, help="The name of the SPE fit parameters file", ).tag(config=True) tol = Float( 1e-1, help="The tolerance used for minuit", read_only=True, ).tag(config=True) SPE_result = Path( help="the path of the SPE result container computed with " "very high voltage data", ).tag(config=True) same_luminosity = Bool( help="if the luminosity is the same between high voltage and low voltage runs", default_value=False, ).tag(config=True) def __init__( self, pixels_id: np.ndarray, charge: np.ndarray, counts: np.ndarray, config=None, parent=None, **kwargs, ) -> None: """Initializes a new instance of the FlatFieldSingleHHVStdSPEMaker class. Parameters ---------- charge : np.ndarray The charge data. counts : np.ndarray The counts data. ``*args`` Additional positional arguments. ``**kwargs`` Additional keyword arguments. """ super().__init__( pixels_id=pixels_id, charge=charge, counts=counts, config=config, parent=parent, **kwargs, ) self.__fix_parameters() self._nectarGainSPEresult = next(SPEfitContainer.from_hdf5(self.SPE_result)) if ( len( pixels_id[ np.in1d( pixels_id, self._nectarGainSPEresult.pixels_id[ self._nectarGainSPEresult.is_valid ], ) ] ) == 0 ): self.log.warning( "The intersection between pixels id from the data and those valid from " "the SPE fit result is empty" ) def __fix_parameters(self) -> None: """Fixes the parameters ``n``, ``pp``, ``res``, and possibly ``luminosity``. Parameters ---------- same_luminosity : bool Whether to fix the luminosity parameter. """ pp = self._parameters["pp"] pp.frozen = True n = self._parameters["n"] n.frozen = True resolution = self._parameters["resolution"] resolution.frozen = True self.log.info( f"updating parameters by fixing pp={pp}, n={n} and res={resolution}" ) if self.same_luminosity: self.log.info("fixing luminosity") luminosity = self._parameters["luminosity"] luminosity.frozen = True def _make_minuitParameters_array_from_parameters( self, pixels_id: np.ndarray = None, **kwargs ) -> np.ndarray: """Generates the fit array from the fixed parameters and the fitted data obtained from a 1400V run. Parameters ---------- pixels_id : array-like, optional The pixels to generate the fit array for. Defaults to None. ``**kwargs`` Arbitrary keyword arguments. Returns ------- : array-like The fit array. """ return super()._make_minuitParameters_array_from_parameters( pixels_id=pixels_id, nectarGainSPEresult=self._nectarGainSPEresult, **kwargs, ) @staticmethod def _update_parameters( parameters: Parameters, charge: np.ndarray, counts: np.ndarray, pixel_id, nectarGainSPEresult: QTable, **kwargs, ): """Updates the parameters with the fixed values from the fitted data obtained from a 1400V run. Parameters ---------- parameters : Parameters The parameters to update. charge : np.ndarray The charge values. counts : np.ndarray The counts values. pixel_id : int The pixel ID. nectarGainSPEresult : QTable The fitted data obtained from a 1400 V run. ``**kwargs`` Arbitrary keyword arguments. Returns ------- param : dict The updated parameters. """ param = super(__class__, __class__)._update_parameters( parameters, charge, counts, **kwargs ) luminosity = param["luminosity"] resolution = param["resolution"] # pp = param["pp"] # n = param["n"] index = np.where(pixel_id == nectarGainSPEresult.pixels_id)[0][0] resolution.value = nectarGainSPEresult.resolution[index][0] # I prefer to keep pp and n from the yaml config file to better know what we do # pp.value = nectarGainSPEresult.pp[index][0] # n.value = nectarGainSPEresult.n[index][0] if luminosity.frozen: luminosity.value = nectarGainSPEresult.luminosity[index].value return param def run( self, pixels_id: np.ndarray = None, **kwargs, ) -> np.ndarray: if pixels_id is None: pixels_id = self._nectarGainSPEresult.pixels_id[ self._nectarGainSPEresult.is_valid ] else: pixels_id = np.asarray(pixels_id)[ np.in1d( pixels_id, self._nectarGainSPEresult.pixels_id[ self._nectarGainSPEresult.is_valid ], ) ] return super().run(pixels_id=pixels_id, **kwargs)