Source code for nectarchain.utils.utils

import importlib
import logging
import math

import numpy as np
from ctapipe.core.component import Component
from iminuit import Minuit
from scipy import interpolate, signal
from scipy.special import gammainc
from scipy.stats import chi2, norm

logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
log.handlers = logging.getLogger("__main__").handlers


[docs] class ComponentUtils: @staticmethod def is_in_non_abstract_subclasses( component: Component, motherClass="NectarCAMComponent" ): from nectarchain.makers.component.core import NectarCAMComponent # noqa: F401 # module = importlib.import_module(f'nectarchain.makers.component.core') is_in = False if isinstance(component, eval(f"{motherClass}")): is_in = True else: for _, value in eval(motherClass).non_abstract_subclasses().items(): is_in = np.logical_or(is_in, component == value) return is_in @staticmethod def get_specific_traits(component: Component): importlib.import_module(f"{component.__module__}") traits_dict = component.class_traits() if ComponentUtils.is_in_non_abstract_subclasses( component, "NectarCAMComponent" ) and not (component.SubComponents.default_value is None): for component_name in component.SubComponents.default_value: # CPT _class = getattr( importlib.import_module("nectarchain.makers.component"), component_name, ) traits_dict.update(ComponentUtils.get_specific_traits(_class)) traits_dict.pop("config", True) traits_dict.pop("parent", True) return traits_dict @staticmethod def get_configurable_traits(component: Component): traits_dict = ComponentUtils.get_specific_traits(component) output_traits_dict = traits_dict.copy() for key, item in traits_dict.items(): if item.read_only: output_traits_dict.pop(key) return output_traits_dict @staticmethod def get_class_name_from_ComponentName(componentName: str): from nectarchain.makers.component.core import NectarCAMComponent for class_name, _class in NectarCAMComponent.non_abstract_subclasses().items(): if componentName in class_name: return _class raise ValueError( "componentName is not a valid component, this component is not known as a " "child of NectarCAMComponent" )
[docs] class multiprocessing: @staticmethod def custom_error_callback(error): log.error(f"Got an error: {error}") log.error(error, exc_info=True)
[docs] class Statistics: @staticmethod def chi2_pvalue(ndof: int, likelihood: float): return 1 - chi2(df=ndof).cdf(likelihood)
[docs] class UtilsMinuit:
[docs] @staticmethod def make_minuit_par_kwargs(parameters): """Create *Parameter Keyword Arguments* for the `Minuit` constructor. updated for Minuit >2.0 """ names = parameters.parnames kwargs = {"names": names, "values": {}} for parameter in parameters.parameters: kwargs["values"][parameter.name] = parameter.value min_ = None if np.isnan(parameter.min) else parameter.min max_ = None if np.isnan(parameter.max) else parameter.max error = 0.1 if np.isnan(parameter.error) else parameter.error kwargs[f"limit_{parameter.name}"] = (min_, max_) kwargs[f"error_{parameter.name}"] = error if parameter.frozen: kwargs[f"fix_{parameter.name}"] = True return kwargs
[docs] @staticmethod def set_minuit_parameters_limits_and_errors(m: Minuit, parameters: dict): """Function to set minuit parameter limits and errors with Minuit >2.0 Args: m (Minuit): a Minuit instance parameters (dict): dict containing parameters names, limits errors and values. """ for name in parameters["names"]: m.limits[name] = parameters[f"limit_{name}"] m.errors[name] = parameters[f"error_{name}"] if parameters.get(f"fix_{name}", False): m.fixed[name] = True
# Useful functions for the fit
[docs] def gaussian(x, mu, sig): # return (1./(sig*np.sqrt(2*math.pi))) * # np.exp(-np.power(x - mu, 2.) / (2 * np.power(sig, 2.))) return norm.pdf(x, loc=mu, scale=sig)
[docs] def weight_gaussian(x, N, mu, sig): return N * gaussian(x, mu, sig)
[docs] def doubleGauss(x, sig1, mu2, sig2, p): return p * 2 * gaussian(x, 0, sig1) + (1 - p) * gaussian(x, mu2, sig2)
[docs] def PMax(r): """p_{max} in equation 6 in Caroff et al. (2019) Args: r (float): SPE resolution Returns: float : p_{max} """ if r > np.sqrt((np.pi - 2) / 2): pmax = np.pi / (2 * (r**2 + 1)) else: pmax = np.pi * r**2 / (np.pi * r**2 + np.pi - 2 * r**2 - 2) return pmax
[docs] def ax(p, res): """a in equation 4 in Caroff et al. (2019) Args: p (float): proportion of the low charge component (2 gaussians model) res (float): SPE resolution Returns: float : a """ return (2 / np.pi) * p**2 - p / (res**2 + 1)
[docs] def bx(p, mu2): """b in equation 4 in Caroff et al. (2019) Args: p (float): proportion of the low charge component (2 gaussians model) mu2 (float): position of the high charge Gaussian Returns: float : b """ return np.sqrt(2 / np.pi) * 2 * p * (1 - p) * mu2
[docs] def cx(sig2, mu2, res, p): """c in equation 4 in Caroff et al. (2019) Note : There is a typo in the article 1-p**2 -> (1-p)**2 Args: sig2 (float): width of the high charge Gaussian mu2 (float): position of the high charge Gaussian res (float): SPE resolution p (float): proportion of the low charge component (2 gaussians model) Returns: float : c """ return (1 - p) ** 2 * mu2**2 - (1 - p) * (sig2**2 + mu2**2) / (res**2 + 1)
[docs] def delta(p, res, sig2, mu2): """well known delta in 2nd order polynom Args: p (_type_): _description_ res (_type_): _description_ sig2 (_type_): _description_ mu2 (_type_): _description_ Returns: float : b**2 - 4*a*c """ return bx(p, mu2) * bx(p, mu2) - 4 * ax(p, res) * cx(sig2, mu2, res, p)
[docs] def ParamU(p, r): """d in equation 6 in Caroff et al. (2019) Args: p (float): proportion of the low charge component (2 gaussians model) r (float): SPE resolution Returns: float : d """ return (8 * (1 - p) ** 2 * p**2) / np.pi - 4 * ( 2 * p**2 / np.pi - p / (r**2 + 1) ) * ((1 - p) ** 2 - (1 - p) / (r**2 + 1))
[docs] def ParamS(p, r): """e in equation 6 in Caroff et al. (2019) Args: p (float): proportion of the low charge component (2 gaussians model) r (float): SPE resolution Returns: float : e """ e = (4 * (2 * p**2 / np.pi - p / (r**2 + 1)) * (1 - p)) / (r**2 + 1) return e
[docs] def SigMin(p, res, mu2): """sigma_{high,min} in equation 6 in Caroff et al. (2019) Args: p (float): proportion of the low charge component (2 gaussians model) res (float): SPE resolution mu2 (float): position of the high charge Gaussian Returns: float : sigma_{high,min} """ return mu2 * np.sqrt( (-ParamU(p, res) + (bx(p, mu2) ** 2 / mu2**2)) / (ParamS(p, res)) )
[docs] def SigMax(p, res, mu2): """sigma_{high,min} in equation 6 in Caroff et al. (2019) Args: p (float): proportion of the low charge component (2 gaussians model) res (float): SPE resolution mu2 (float): position of the high charge Gaussian Returns: float : sigma_{high,min} """ temp = (-ParamU(p, res)) / (ParamS(p, res)) if temp < 0: err = ValueError("-d/e must be < 0") log.error(err, exc_info=True) raise err else: return mu2 * np.sqrt(temp)
[docs] def sigma1(p, res, sig2, mu2): """sigma_{low} in equation 5 in Caroff et al. (2019) Args: sig2 (float): width of the high charge Gaussian mu2 (float): position of the high charge Gaussian res (float): SPE resolution p (float): proportion of the low charge component (2 gaussians model) Returns: float : sigma_{low} """ return (-bx(p, mu2) + np.sqrt(delta(p, res, sig2, mu2))) / (2 * ax(p, res))
[docs] def sigma2(n, p, res, mu2): """sigma_{high} in equation 7 in Caroff et al. (2019) Args: n (float): parameter n in equation mu2 (float): position of the high charge Gaussian res (float): SPE resolution p (float): proportion of the low charge component (2 gaussians model) Returns: float : sigma_{high} """ if (-ParamU(p, res) + (bx(p, mu2) ** 2 / mu2**2)) / (ParamS(p, res)) > 0: return SigMin(p, res, mu2) + n * (SigMax(p, res, mu2) - SigMin(p, res, mu2)) else: return n * SigMax(p, res, mu2)
# The real final model callign all the above for luminosity (lum) + PED, wil return # probability of number of Spe
[docs] def MPE2(x, pp, res, mu2, n, muped, sigped, lum, **kwargs): log.debug( f"pp = {pp}, res = {res}, mu2 = {mu2}, n = {n}, muped = {muped}, " f"sigped = {sigped}, lum = {lum}" ) f = 0 ntotalPE = kwargs.get("ntotalPE", 0) if ntotalPE == 0: # about 1sec for i in range(1000): if gammainc(i + 1, lum) < 1e-5: ntotalPE = i break # print(ntotalPE) # about 8 sec, 1 sec by nPEPDF call # for i in range(ntotalPE): # f = f + ((lum**i)/math.factorial(i)) * np.exp(-lum) * # nPEPDF(x,pp,res,mu2,n,muped,sigped,i,int(mu2*ntotalPE+10*mu2)) f = np.sum( [ (lum**i) / math.factorial(i) * np.exp(-lum) * nPEPDF( x, pp, res, mu2, n, muped, sigped, i, int(mu2 * ntotalPE + 10 * mu2) ) for i in range(ntotalPE) ], axis=0, ) # 10 % faster return f
# Fnal model shape/function (for one SPE)
[docs] def doubleGaussConstrained(x, pp, res, mu2, n): p = pp * PMax(res) sig2 = sigma2(n, p, res, mu2) sig1 = sigma1(p, res, sig2, mu2) return doubleGauss(x, sig1, mu2, sig2, p)
# Get the gain from the parameters model
[docs] def Gain(pp, res, mu2, n): """analytic gain computatuon Args: mu2 (float): position of the high charge Gaussian res (float): SPE resolution pp (float): p' in equation 7 in Caroff et al. (2019) n (float): n in equation 7 in Caroff et al. (2019) Returns: float : gain """ p = pp * PMax(res) sig2 = sigma2(n, p, res, mu2) return (1 - p) * mu2 + 2 * p * sigma1(p, res, sig2, mu2) / np.sqrt(2 * np.pi)
[docs] def nPEPDF(x, pp, res, mu2, n, muped, sigped, nph, size_charge): allrange = np.linspace(-1 * size_charge, size_charge, size_charge * 2) spe = [] # about 2 sec this is the main pb # for i in range(len(allrange)): # if (allrange[i]>=0): # spe.append(doubleGaussConstrained(allrange[i],pp,res,mu2,n)) # else: # spe.append(0) spe = doubleGaussConstrained(allrange, pp, res, mu2, n) * ( allrange >= 0 * np.ones(allrange.shape) ) # 100 times faster # ~ plt.plot(allrange,spe) # npe = semi_gaussian(allrange, muped, sigped) npe = gaussian(allrange, 0, sigped) # ~ plt.plot(allrange,npe) # ~ plt.show() for i in range(nph): # npe = np.convolve(npe,spe,"same") npe = signal.fftconvolve(npe, spe, "same") # ~ plt.plot(allrange,npe) # ~ plt.show() fff = interpolate.UnivariateSpline(allrange, npe, ext=1, k=3, s=0) norm = np.trapz(fff(allrange), allrange) return fff(x - muped) / norm