Source code for nectarchain.makers.calibration.tests.test_pedestal_tool
import os
import tempfile
import numpy as np
import tables
from ctapipe.utils import get_dataset_path
from ctapipe_io_nectarcam.constants import N_SAMPLES
from nectarchain.data.container import NectarCAMPedestalContainers, PedestalFlagBits
from nectarchain.makers.calibration import PedestalNectarCAMCalibrationTool
runs = {
"Run number": [3938, 5288],
"Run file": [
get_dataset_path("NectarCAM.Run3938.30events.fits.fz"),
get_dataset_path("NectarCAM.Run5288.0001.fits.fz"),
],
"N pixels": [1834, 1848],
}
[docs]
class TestPedestalCalibrationTool:
[docs]
def test_base(self):
"""
Test basic functionality, including IO on disk
"""
# setup
n_slices = [3, 2]
events_per_slice = 10
max_events = [n_slices[0] * events_per_slice, 13]
expected_ucts_timestamp_min = [1674462932637854793, 1715007113924900896]
expected_ucts_timestamp_max = [1674462932695877994, 1715007123524920096]
for i, run_number in enumerate(runs["Run number"]):
run_file = runs["Run file"][i]
n_pixels = runs["N pixels"][i]
with tempfile.TemporaryDirectory() as tmpdirname:
outfile = tmpdirname + "/pedestal.h5"
try:
os.remove(outfile)
except FileNotFoundError:
pass
except Exception as e:
raise e
# run tool
tool = PedestalNectarCAMCalibrationTool(
run_number=run_number,
run_file=run_file,
max_events=max_events[i],
events_per_slice=events_per_slice,
log_level=0,
output_path=outfile,
overwrite=True,
filter_method=None,
pixel_mask_nevents_min=1,
)
# tool.initialize()
tool.setup()
tool.start()
output = tool.finish(return_output_component=True)
# Check output in memory
assert output.nsamples == N_SAMPLES
assert np.all(output.nevents == max_events[i])
assert np.shape(output.pixels_id) == (n_pixels,)
assert output.ucts_timestamp_min == np.uint64(
expected_ucts_timestamp_min[i]
)
assert output.ucts_timestamp_max == np.uint64(
expected_ucts_timestamp_max[i]
)
assert np.shape(output.pedestal_mean_hg) == (n_pixels, N_SAMPLES)
assert np.shape(output.pedestal_mean_lg) == (n_pixels, N_SAMPLES)
assert np.shape(output.pedestal_std_hg) == (n_pixels, N_SAMPLES)
assert np.shape(output.pedestal_std_lg) == (n_pixels, N_SAMPLES)
assert np.allclose(output.pedestal_mean_hg, 245.0, atol=20.0)
assert np.allclose(output.pedestal_mean_lg, 245.0, atol=20.0)
assert np.allclose(output.pedestal_std_hg, 10, atol=10)
assert np.allclose(output.pedestal_std_lg, 3.0, atol=2.5)
assert np.allclose(output.pedestal_charge_mean_hg, 14700.0, rtol=0.1)
assert np.allclose(output.pedestal_charge_mean_lg, 14700.0, rtol=0.1)
assert np.allclose(output.pedestal_charge_std_hg, 50.0, atol=50)
assert np.allclose(output.pedestal_charge_std_lg, 50.0, atol=50)
# Check output on disk
pedestalContainers = NectarCAMPedestalContainers.from_hdf5(outfile)
j = 0
list_slices = []
for _pedestalContainer in pedestalContainers:
key = list(_pedestalContainer.containers.keys())[0]
if "combined" in key:
continue
pedestalContainer = _pedestalContainer.containers[key]
list_slices.append(int(key.split("data_")[1]))
assert pedestalContainer.nsamples == N_SAMPLES
assert np.allclose(
pedestalContainer.nevents, events_per_slice, atol=7
)
assert np.shape(pedestalContainer.pixels_id) == (n_pixels,)
assert np.shape(pedestalContainer.pedestal_mean_hg) == (
n_pixels,
N_SAMPLES,
)
assert np.shape(pedestalContainer.pedestal_mean_lg) == (
n_pixels,
N_SAMPLES,
)
assert np.shape(pedestalContainer.pedestal_std_hg) == (
n_pixels,
N_SAMPLES,
)
assert np.shape(pedestalContainer.pedestal_std_lg) == (
n_pixels,
N_SAMPLES,
)
j += 1
assert (np.sort(list_slices) == np.arange(1, n_slices[i] + 1)).all()
# Check combined results
group_name = "data_combined"
with tables.open_file(outfile, mode="r") as f:
keys = list(f.root._v_children)
assert group_name in keys
pedestalContainers = next(
NectarCAMPedestalContainers.from_hdf5(
outfile, slice_index="combined"
)
)
pedestalContainer = pedestalContainers.containers[group_name]
assert pedestalContainer.nsamples == N_SAMPLES
assert np.all(pedestalContainer.nevents == max_events[i])
assert np.shape(pedestalContainer.pixels_id) == (n_pixels,)
assert pedestalContainer.ucts_timestamp_min == np.uint64(
expected_ucts_timestamp_min[i]
)
assert pedestalContainer.ucts_timestamp_max == np.uint64(
expected_ucts_timestamp_max[i]
)
assert np.shape(pedestalContainer.pedestal_mean_hg) == (
n_pixels,
N_SAMPLES,
)
assert np.shape(pedestalContainer.pedestal_mean_lg) == (
n_pixels,
N_SAMPLES,
)
assert np.shape(pedestalContainer.pedestal_std_hg) == (
n_pixels,
N_SAMPLES,
)
assert np.shape(pedestalContainer.pedestal_std_lg) == (
n_pixels,
N_SAMPLES,
)
assert np.allclose(pedestalContainer.pedestal_mean_hg, 245.0, atol=20.0)
assert np.allclose(pedestalContainer.pedestal_mean_lg, 245.0, atol=20.0)
assert np.allclose(pedestalContainer.pedestal_std_hg, 10, atol=10)
assert np.allclose(pedestalContainer.pedestal_std_lg, 3, atol=2.5)
assert np.allclose(
pedestalContainer.pedestal_charge_mean_hg, 14700.0, rtol=0.1
)
assert np.allclose(
pedestalContainer.pedestal_charge_mean_lg, 14700.0, rtol=0.1
)
assert np.allclose(
pedestalContainer.pedestal_charge_std_hg, 50.0, atol=50
)
assert np.allclose(
pedestalContainer.pedestal_charge_std_lg, 50.0, atol=50
)
[docs]
def test_timesel(self):
"""
Test time selection
"""
# setup
n_slices = [3, 2]
events_per_slice = 10
max_events = [n_slices[0] * events_per_slice, 13]
tmin = [1674462932637860000, 1715007113924900000]
tmax = [1674462932695700000, 1715007123524921000]
for i, _ in enumerate(runs["Run number"]):
run_number = runs["Run number"][i]
run_file = runs["Run file"][i]
n_pixels = runs["N pixels"][i]
with tempfile.TemporaryDirectory() as tmpdirname:
outfile = tmpdirname + "/pedestal.h5"
# run tool
tool = PedestalNectarCAMCalibrationTool(
run_number=run_number,
run_file=run_file,
max_events=max_events[i],
events_per_slice=events_per_slice,
log_level=0,
output_path=outfile,
ucts_tmin=tmin[i],
ucts_tmax=tmax[i],
overwrite=True,
filter_method=None,
pixel_mask_nevents_min=1,
)
# tool.initialize()
tool.setup()
tool.start()
output = tool.finish(return_output_component=True)
# check output
assert output.nsamples == N_SAMPLES
assert np.all(output.nevents <= max_events[i])
assert np.shape(output.pixels_id) == (n_pixels,)
assert output.ucts_timestamp_min >= tmin[i]
assert output.ucts_timestamp_max <= tmax[i]
assert np.shape(output.pedestal_mean_hg) == (n_pixels, N_SAMPLES)
assert np.shape(output.pedestal_mean_lg) == (n_pixels, N_SAMPLES)
assert np.shape(output.pedestal_std_hg) == (n_pixels, N_SAMPLES)
assert np.shape(output.pedestal_std_lg) == (n_pixels, N_SAMPLES)
assert np.allclose(output.pedestal_mean_hg, 245.0, atol=20.0)
assert np.allclose(output.pedestal_mean_lg, 245.0, atol=20.0)
assert np.allclose(output.pedestal_std_hg, 10, atol=10)
assert np.allclose(output.pedestal_std_lg, 3, atol=2.5)
assert np.allclose(output.pedestal_charge_mean_hg, 14700.0, rtol=0.1)
assert np.allclose(output.pedestal_charge_mean_lg, 14700.0, rtol=0.1)
assert np.allclose(output.pedestal_charge_std_hg, 50.0, atol=50)
assert np.allclose(output.pedestal_charge_std_lg, 50.0, atol=50)
[docs]
def test_WaveformsStdFilter(self):
"""
Test filtering based on waveforms standard dev
"""
# setup
n_slices = [3, 2]
events_per_slice = 10
max_events = [n_slices[0] * events_per_slice, 13]
for i, _ in enumerate(runs["Run number"]):
run_number = runs["Run number"][i]
run_file = runs["Run file"][i]
n_pixels = runs["N pixels"][i]
with tempfile.TemporaryDirectory() as tmpdirname:
outfile = tmpdirname + "/pedestal.h5"
# run tool
tool = PedestalNectarCAMCalibrationTool(
run_number=run_number,
run_file=run_file,
max_events=max_events[i],
events_per_slice=events_per_slice,
log_level=0,
output_path=outfile,
overwrite=True,
filter_method="WaveformsStdFilter",
wfs_std_threshold=4.0,
pixel_mask_nevents_min=1,
)
# tool.initialize()
tool.setup()
tool.start()
output = tool.finish(return_output_component=True)
# check output
assert output.nsamples == N_SAMPLES
assert np.all(output.nevents <= max_events[i])
assert np.shape(output.pixels_id) == (n_pixels,)
assert np.shape(output.pedestal_mean_hg) == (n_pixels, N_SAMPLES)
assert np.shape(output.pedestal_mean_lg) == (n_pixels, N_SAMPLES)
assert np.shape(output.pedestal_std_hg) == (n_pixels, N_SAMPLES)
assert np.shape(output.pedestal_std_lg) == (n_pixels, N_SAMPLES)
assert np.allclose(output.pedestal_mean_hg, 245.0, atol=20.0)
assert np.allclose(output.pedestal_mean_lg, 245.0, atol=20.0)
# verify that fluctuations are reduced
assert np.allclose(output.pedestal_std_hg, 4.0, atol=4.0)
assert np.allclose(output.pedestal_std_lg, 2.5, atol=3.0)
assert np.allclose(output.pedestal_charge_mean_hg, 14700.0, rtol=0.1)
assert np.allclose(output.pedestal_charge_mean_lg, 14700.0, rtol=0.1)
# verify that fluctuations are reduced
assert np.allclose(output.pedestal_charge_std_hg, 40.0, atol=50)
assert np.allclose(output.pedestal_charge_std_lg, 40.0, atol=50)
[docs]
def test_ChargeDistributionFilter(self):
"""
Test filtering based on waveforms charge distribution
"""
# setup
n_slices = [2, 1]
events_per_slice = 10
max_events = [n_slices[0] * events_per_slice - 1, 12]
for i, _ in enumerate(runs["Run number"]):
run_number = runs["Run number"][i]
run_file = runs["Run file"][i]
n_pixels = runs["N pixels"][i]
with tempfile.TemporaryDirectory() as tmpdirname:
outfile = tmpdirname + "/pedestal.h5"
# run tool
tool = PedestalNectarCAMCalibrationTool(
run_number=run_number,
run_file=run_file,
max_events=max_events[i],
events_per_slice=events_per_slice,
log_level=0,
output_path=outfile,
overwrite=True,
filter_method="ChargeDistributionFilter",
charge_sigma_low_thr=1.0,
charge_sigma_high_thr=2.0,
pixel_mask_nevents_min=1,
)
# tool.initialize()
tool.setup()
tool.start()
output = tool.finish(return_output_component=True)
# check output
assert output.nsamples == N_SAMPLES
assert np.all(output.nevents <= max_events[i])
assert np.shape(output.pixels_id) == (n_pixels,)
assert np.shape(output.pedestal_mean_hg) == (n_pixels, N_SAMPLES)
assert np.shape(output.pedestal_mean_lg) == (n_pixels, N_SAMPLES)
assert np.shape(output.pedestal_std_hg) == (n_pixels, N_SAMPLES)
assert np.shape(output.pedestal_std_lg) == (n_pixels, N_SAMPLES)
assert np.allclose(output.pedestal_mean_hg, 245.0, atol=20.0)
assert np.allclose(output.pedestal_mean_lg, 245.0, atol=20.0)
assert np.allclose(output.pedestal_std_hg, 10.0, atol=10.0)
assert np.allclose(output.pedestal_std_lg, 3, atol=2.6)
assert np.allclose(output.pedestal_charge_mean_hg, 14700.0, rtol=0.1)
assert np.allclose(output.pedestal_charge_mean_lg, 14700.0, rtol=0.1)
assert np.allclose(output.pedestal_charge_std_hg, 50.0, atol=50)
assert np.allclose(output.pedestal_charge_std_lg, 50.0, atol=50)
[docs]
def test_pixel_mask(self):
"""
Test that bad pixels are correctly recognized and flagged
"""
# Flag number
# setup
max_events = 10
# just test one run to be faster
run_number = runs["Run number"][0]
run_file = runs["Run file"][0]
with tempfile.TemporaryDirectory() as tmpdirname:
outfile = tmpdirname + "/pedestal.h5"
# Condition on number of events
# run tool
tool = PedestalNectarCAMCalibrationTool(
run_number=run_number,
run_file=run_file,
max_events=max_events,
log_level=0,
output_path=outfile,
overwrite=True,
filter_method=None,
)
# tool.initialize()
tool.setup()
tool.start()
output = tool.finish(return_output_component=True)[0]
# Check that all pixels were flagged as having not enough events
flag_bit = PedestalFlagBits.NEVENTS
assert np.all(output.pixel_mask & flag_bit == flag_bit)
# Check that other flags were not raised
flag_bits = [
PedestalFlagBits.MEAN_PEDESTAL,
PedestalFlagBits.STD_SAMPLE,
PedestalFlagBits.STD_PIXEL,
]
for flag_bit in flag_bits:
assert np.all(output.pixel_mask & flag_bit == 0)
# For all the following tests we set the acceptable values to a range
# out of what
# is normal. Since our test run is good we expect to flag all pixels
# Condition on mean pedestal value
# run tool
tool = PedestalNectarCAMCalibrationTool(
run_number=run_number,
run_file=run_file,
max_events=max_events,
log_level=0,
output_path=outfile,
overwrite=True,
filter_method=None,
pixel_mask_nevents_min=1,
pixel_mask_mean_min=1000.0,
pixel_mask_mean_max=1100.0,
)
# tool.initialize()
tool.setup()
tool.start()
output = tool.finish(return_output_component=True)[0]
# Check that all pixels were flagged as having a bad mean
flag_bit = PedestalFlagBits.MEAN_PEDESTAL
assert np.all(output.pixel_mask & flag_bit == flag_bit)
# Check that other flags were not raised
flag_bits = [
PedestalFlagBits.NEVENTS,
PedestalFlagBits.STD_SAMPLE,
PedestalFlagBits.STD_PIXEL,
]
for flag_bit in flag_bits:
assert np.all(output.pixel_mask & flag_bit == 0)
# Condition on sample std
# run tool
tool = PedestalNectarCAMCalibrationTool(
run_number=run_number,
run_file=run_file,
max_events=max_events,
log_level=0,
output_path=outfile,
overwrite=True,
filter_method=None,
pixel_mask_nevents_min=1,
pixel_mask_std_sample_min=100.0,
)
# tool.initialize()
tool.setup()
tool.start()
output = tool.finish(return_output_component=True)[0]
# Check that all pixels were flagged as
# having a small sample std
flag_bit = PedestalFlagBits.STD_SAMPLE
assert np.all(output.pixel_mask & flag_bit == flag_bit)
# Check that other flags were not raised
flag_bits = [
PedestalFlagBits.NEVENTS,
PedestalFlagBits.MEAN_PEDESTAL,
PedestalFlagBits.STD_PIXEL,
]
for flag_bit in flag_bits:
assert np.all(output.pixel_mask & flag_bit == 0)
# Condition on pixel std
# run tool
tool = PedestalNectarCAMCalibrationTool(
run_number=run_number,
run_file=run_file,
max_events=max_events,
log_level=0,
output_path=outfile,
overwrite=True,
filter_method=None,
pixel_mask_nevents_min=1,
pixel_mask_std_pixel_max=0.01,
)
# tool.initialize()
tool.setup()
tool.start()
output = tool.finish(return_output_component=True)[0]
# Check that all pixels were flagged as having a large pixel std
flag_bit = PedestalFlagBits.STD_PIXEL
assert np.all(output.pixel_mask & flag_bit == flag_bit)
# Check that other flags were not raised
flag_bits = [
PedestalFlagBits.NEVENTS,
PedestalFlagBits.MEAN_PEDESTAL,
PedestalFlagBits.STD_SAMPLE,
]
for flag_bit in flag_bits:
assert np.all(output.pixel_mask & flag_bit == 0)