Source code for nectarchain.utils.utils

import importlib
import logging
import math
from typing import Sequence, Type, Union

import numpy as np
from ctapipe.core.component import Component
from ctapipe.core.traits import Path
from ctapipe_io_nectarcam import constants
from iminuit import Minuit
from scipy import interpolate, signal
from scipy.special import gammainc
from scipy.stats import chi2, norm

from ..data.container.core import NectarCAMContainer

logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
log.handlers = logging.getLogger("__main__").handlers


[docs] class ComponentUtils: @staticmethod def is_in_non_abstract_subclasses( component: Component, motherClass="NectarCAMComponent" ): from nectarchain.makers.component.core import NectarCAMComponent # noqa: F401 # module = importlib.import_module(f'nectarchain.makers.component.core') is_in = False if isinstance(component, eval(f"{motherClass}")): is_in = True else: for _, value in eval(motherClass).non_abstract_subclasses().items(): is_in = np.logical_or(is_in, component == value) return is_in @staticmethod def get_specific_traits(component: Component): importlib.import_module(f"{component.__module__}") traits_dict = component.class_traits() if ComponentUtils.is_in_non_abstract_subclasses( component, "NectarCAMComponent" ) and not (component.SubComponents.default_value is None): for component_name in component.SubComponents.default_value: # CPT _class = getattr( importlib.import_module("nectarchain.makers.component"), component_name, ) traits_dict.update(ComponentUtils.get_specific_traits(_class)) traits_dict.pop("config", True) traits_dict.pop("parent", True) return traits_dict @staticmethod def get_configurable_traits(component: Component): traits_dict = ComponentUtils.get_specific_traits(component) output_traits_dict = traits_dict.copy() for key, item in traits_dict.items(): if item.read_only: output_traits_dict.pop(key) return output_traits_dict @staticmethod def get_class_name_from_ComponentName(componentName: str): from nectarchain.makers.component.core import NectarCAMComponent for class_name, _class in NectarCAMComponent.non_abstract_subclasses().items(): if componentName in class_name: return _class raise ValueError( "componentName is not a valid component, this component is not known as a " "child of NectarCAMComponent" )
[docs] class ContainerUtils:
[docs] @staticmethod def add_missing_pixels_to_container( container: NectarCAMContainer, pad_value=np.nan ): """ Pads fields of `~nectarchain.data.container.core.NectarCAMContainer` with missing pixels (due to e.g. an incomplete camera) with chosen value. For boolean arrays related to pixel status, zero/one-padding is applied appriopriately. """ # Make sure the container has `pixels_id` values try: pixels_id_input = container.pixels_id except Exception as e: raise ValueError(f"{container} has no field named `pixels_id`: {e}") # Do nothing if there are no missing pixels if len(pixels_id_input) == constants.N_PIXELS: log.info("Input container already contains data for all pixels!") return log.info( f"Input container contains data for " f"{len(pixels_id_input)}/{constants.N_PIXELS} pixels, " f"will add missing pixels and fill missing-pixel data with {pad_value}" ) log.debug(f"Original container: {container}") for name, field in zip(container.keys(), container.values()): # Update the pixels_id with the full camera if name == "pixels_id": setattr( container, "pixels_id", constants.PIXEL_INDEX.astype(pixels_id_input.dtype), ) elif isinstance(field, np.ndarray): # Find pixel axis if there is one pixel_axis = None for i, dim in enumerate(field.shape): if dim == len(pixels_id_input): pixel_axis = i break if pixel_axis is None: continue # Reshape fields to full camera with NaN values for missing pixels # For fields related to pixel status, apply zero/one-padding shape_new_field = list(field.shape) shape_new_field[pixel_axis] = constants.N_PIXELS # Pixel status in NectarCAMPedestalContainer, FlatFieldContainer # NOTE: `bad_pixels` does not have a `pixel_axis` so nothing happens if name in ["pixel_mask", "bad_pixels"]: new_field = np.ones(shape_new_field, dtype=field.dtype) # Pixel status in GainContainer elif name in ["is_valid"]: new_field = np.zeros(shape_new_field, dtype=field.dtype) else: new_field = np.full(shape_new_field, pad_value, dtype=field.dtype) # Copy data in slices so that the correct axis is zero/one-padded # Also sorts the arrays in terms of `PIXEL_INDEX` pixel_pos = np.searchsorted(constants.PIXEL_INDEX, pixels_id_input) slc = [slice(None)] * field.ndim slc[pixel_axis] = pixel_pos new_field[tuple(slc)] = field # Update the container setattr(container, name, new_field) log.debug(f"Updated container: {container}") return
[docs] @staticmethod def get_container_from_hdf5( path: Path, container_classes: Union[ Type[NectarCAMContainer], Sequence[Type[NectarCAMContainer]] ], group_names: Union[str, Sequence[str]] = "data", ) -> NectarCAMContainer: """Wrapper function for `NectarCAMContainer.from_hdf5`. Loads a container for a given `path`, allowing to scan for: - different NectarCAMContainer subclasses; - different group names. For example, you may want to load the output of a gain calibration tool, but you don't know whether it is a `SPEFitContainer` or `GainContainer`. Or you may want to load a `NectarCAMPedestalContainer`, which can either have the group name "data" or "data_combined" depending on the slicing applied. Parameters ---------- path : Path Path to the HDF5 file. container_classes : Type[NectarCAMContainer] or sequence of such types One or more container subclasses to attempt loading. group_names : str or sequence of str, optional Group names to try inside the HDF5 structure. Default is "data". Returns ------- NectarCAMContainer The successfully loaded container. Raises ------ TypeError If `container_classes` contains items that are not NectarCAMContainer subclasses. ValueError If no container can be loaded from the file using any class/group combination. """ # Normalize inputs lists if isinstance(container_classes, type): container_classes = [container_classes] if isinstance(group_names, (str, bytes)): group_names = [group_names] # Validate input classes for _cls in container_classes: if not issubclass(_cls, NectarCAMContainer): raise TypeError( f"{_cls.__name__} is not a subclass of " f"{NectarCAMContainer.__name__}" ) exceptions = [] for _cls in container_classes: for group_name in group_names: try: container = next(_cls.from_hdf5(path, group_name=group_name)) log.info(f"Loaded {_cls.__name__} from {path}") return container except Exception as e: log.debug( f"Failed to load {_cls.__name__} with `group_name` = " f"'{group_name}': {e}" ) log.debug("Adding exception to `exceptions` list") exceptions.append((_cls.__name__, group_name, str(e))) # Raise an error if no container could be loaded raise ValueError( f"Failed to load any container from {path}. Tried: {exceptions}" )
[docs] class multiprocessing: @staticmethod def custom_error_callback(error): log.error(f"Got an error: {error}") log.error(error, exc_info=True)
[docs] class Statistics: @staticmethod def chi2_pvalue(ndof: int, likelihood: float): return 1 - chi2(df=ndof).cdf(likelihood)
[docs] class UtilsMinuit:
[docs] @staticmethod def make_minuit_par_kwargs(parameters): """Create *Parameter Keyword Arguments* for the `Minuit` constructor. updated for Minuit >2.0 """ names = parameters.parnames kwargs = {"names": names, "values": {}} for parameter in parameters.parameters: kwargs["values"][parameter.name] = parameter.value min_ = None if np.isnan(parameter.min) else parameter.min max_ = None if np.isnan(parameter.max) else parameter.max error = 0.1 if np.isnan(parameter.error) else parameter.error kwargs[f"limit_{parameter.name}"] = (min_, max_) kwargs[f"error_{parameter.name}"] = error if parameter.frozen: kwargs[f"fix_{parameter.name}"] = True return kwargs
[docs] @staticmethod def set_minuit_parameters_limits_and_errors(m: Minuit, parameters: dict): """Function to set minuit parameter limits and errors with Minuit >2.0 Args: m (Minuit): a Minuit instance parameters (dict): dict containing parameters names, limits errors and values. """ for name in parameters["names"]: m.limits[name] = parameters[f"limit_{name}"] m.errors[name] = parameters[f"error_{name}"] if parameters.get(f"fix_{name}", False): m.fixed[name] = True
# Useful functions for the fit
[docs] def gaussian(x, mu, sig): # return (1./(sig*np.sqrt(2*math.pi))) * # np.exp(-np.power(x - mu, 2.) / (2 * np.power(sig, 2.))) return norm.pdf(x, loc=mu, scale=sig)
[docs] def weight_gaussian(x, N, mu, sig): return N * gaussian(x, mu, sig)
[docs] def doubleGauss(x, sig1, mu2, sig2, p): return p * 2 * gaussian(x, 0, sig1) + (1 - p) * gaussian(x, mu2, sig2)
[docs] def PMax(r): """p_{max} in equation 6 in Caroff et al. (2019) Args: r (float): SPE resolution Returns: float : p_{max} """ if r > np.sqrt((np.pi - 2) / 2): pmax = np.pi / (2 * (r**2 + 1)) else: pmax = np.pi * r**2 / (np.pi * r**2 + np.pi - 2 * r**2 - 2) return pmax
[docs] def ax(p, res): """a in equation 4 in Caroff et al. (2019) Args: p (float): proportion of the low charge component (2 gaussians model) res (float): SPE resolution Returns: float : a """ return (2 / np.pi) * p**2 - p / (res**2 + 1)
[docs] def bx(p, mu2): """b in equation 4 in Caroff et al. (2019) Args: p (float): proportion of the low charge component (2 gaussians model) mu2 (float): position of the high charge Gaussian Returns: float : b """ return np.sqrt(2 / np.pi) * 2 * p * (1 - p) * mu2
[docs] def cx(sig2, mu2, res, p): """c in equation 4 in Caroff et al. (2019) Note : There is a typo in the article 1-p**2 -> (1-p)**2 Args: sig2 (float): width of the high charge Gaussian mu2 (float): position of the high charge Gaussian res (float): SPE resolution p (float): proportion of the low charge component (2 gaussians model) Returns: float : c """ return (1 - p) ** 2 * mu2**2 - (1 - p) * (sig2**2 + mu2**2) / (res**2 + 1)
[docs] def delta(p, res, sig2, mu2): """well known delta in 2nd order polynom Args: p (_type_): _description_ res (_type_): _description_ sig2 (_type_): _description_ mu2 (_type_): _description_ Returns: float : b**2 - 4*a*c """ return bx(p, mu2) * bx(p, mu2) - 4 * ax(p, res) * cx(sig2, mu2, res, p)
[docs] def ParamU(p, r): """d in equation 6 in Caroff et al. (2019) Args: p (float): proportion of the low charge component (2 gaussians model) r (float): SPE resolution Returns: float : d """ return (8 * (1 - p) ** 2 * p**2) / np.pi - 4 * ( 2 * p**2 / np.pi - p / (r**2 + 1) ) * ((1 - p) ** 2 - (1 - p) / (r**2 + 1))
[docs] def ParamS(p, r): """e in equation 6 in Caroff et al. (2019) Args: p (float): proportion of the low charge component (2 gaussians model) r (float): SPE resolution Returns: float : e """ e = (4 * (2 * p**2 / np.pi - p / (r**2 + 1)) * (1 - p)) / (r**2 + 1) return e
[docs] def SigMin(p, res, mu2): """sigma_{high,min} in equation 6 in Caroff et al. (2019) Args: p (float): proportion of the low charge component (2 gaussians model) res (float): SPE resolution mu2 (float): position of the high charge Gaussian Returns: float : sigma_{high,min} """ return mu2 * np.sqrt( (-ParamU(p, res) + (bx(p, mu2) ** 2 / mu2**2)) / (ParamS(p, res)) )
[docs] def SigMax(p, res, mu2): """sigma_{high,min} in equation 6 in Caroff et al. (2019) Args: p (float): proportion of the low charge component (2 gaussians model) res (float): SPE resolution mu2 (float): position of the high charge Gaussian Returns: float : sigma_{high,min} """ temp = (-ParamU(p, res)) / (ParamS(p, res)) if temp < 0: err = ValueError("-d/e must be < 0") log.error(err, exc_info=True) raise err else: return mu2 * np.sqrt(temp)
[docs] def sigma1(p, res, sig2, mu2): """sigma_{low} in equation 5 in Caroff et al. (2019) Args: sig2 (float): width of the high charge Gaussian mu2 (float): position of the high charge Gaussian res (float): SPE resolution p (float): proportion of the low charge component (2 gaussians model) Returns: float : sigma_{low} """ return (-bx(p, mu2) + np.sqrt(delta(p, res, sig2, mu2))) / (2 * ax(p, res))
[docs] def sigma2(n, p, res, mu2): """sigma_{high} in equation 7 in Caroff et al. (2019) Args: n (float): parameter n in equation mu2 (float): position of the high charge Gaussian res (float): SPE resolution p (float): proportion of the low charge component (2 gaussians model) Returns: float : sigma_{high} """ if (-ParamU(p, res) + (bx(p, mu2) ** 2 / mu2**2)) / (ParamS(p, res)) > 0: return SigMin(p, res, mu2) + n * (SigMax(p, res, mu2) - SigMin(p, res, mu2)) else: return n * SigMax(p, res, mu2)
# The real final model callign all the above for luminosity (lum) + PED, wil return # probability of number of Spe
[docs] def MPE2(x, pp, res, mu2, n, muped, sigped, lum, **kwargs): log.debug( f"pp = {pp}, res = {res}, mu2 = {mu2}, n = {n}, muped = {muped}, " f"sigped = {sigped}, lum = {lum}" ) f = 0 ntotalPE = kwargs.get("ntotalPE", 0) if ntotalPE == 0: # about 1sec for i in range(1000): if gammainc(i + 1, lum) < 1e-5: ntotalPE = i break # print(ntotalPE) # about 8 sec, 1 sec by nPEPDF call # for i in range(ntotalPE): # f = f + ((lum**i)/math.factorial(i)) * np.exp(-lum) * # nPEPDF(x,pp,res,mu2,n,muped,sigped,i,int(mu2*ntotalPE+10*mu2)) f = np.sum( [ (lum**i) / math.factorial(i) * np.exp(-lum) * nPEPDF( x, pp, res, mu2, n, muped, sigped, i, int(mu2 * ntotalPE + 10 * mu2) ) for i in range(ntotalPE) ], axis=0, ) # 10 % faster return f
# Fnal model shape/function (for one SPE)
[docs] def doubleGaussConstrained(x, pp, res, mu2, n): p = pp * PMax(res) sig2 = sigma2(n, p, res, mu2) sig1 = sigma1(p, res, sig2, mu2) return doubleGauss(x, sig1, mu2, sig2, p)
# Get the gain from the parameters model
[docs] def Gain(pp, res, mu2, n): """analytic gain computatuon Args: mu2 (float): position of the high charge Gaussian res (float): SPE resolution pp (float): p' in equation 7 in Caroff et al. (2019) n (float): n in equation 7 in Caroff et al. (2019) Returns: float : gain """ p = pp * PMax(res) sig2 = sigma2(n, p, res, mu2) return (1 - p) * mu2 + 2 * p * sigma1(p, res, sig2, mu2) / np.sqrt(2 * np.pi)
[docs] def nPEPDF(x, pp, res, mu2, n, muped, sigped, nph, size_charge): allrange = np.linspace(-1 * size_charge, size_charge, size_charge * 2) spe = [] # about 2 sec this is the main pb # for i in range(len(allrange)): # if (allrange[i]>=0): # spe.append(doubleGaussConstrained(allrange[i],pp,res,mu2,n)) # else: # spe.append(0) spe = doubleGaussConstrained(allrange, pp, res, mu2, n) * ( allrange >= 0 * np.ones(allrange.shape) ) # 100 times faster # ~ plt.plot(allrange,spe) # npe = semi_gaussian(allrange, muped, sigped) npe = gaussian(allrange, 0, sigped) # ~ plt.plot(allrange,npe) # ~ plt.show() for i in range(nph): # npe = np.convolve(npe,spe,"same") npe = signal.fftconvolve(npe, spe, "same") # ~ plt.plot(allrange,npe) # ~ plt.show() fff = interpolate.UnivariateSpline(allrange, npe, ext=1, k=3, s=0) norm = np.trapz(fff(allrange), allrange) return fff(x - muped) / norm