# Source code for nectarchain.dqm.start_dqm

import argparse
import json
import logging
import os
import sys
import time

# ctapipe imports
from ctapipe_io_nectarcam import LightNectarCAMEventSource as EventSource
from ctapipe_io_nectarcam.constants import HIGH_GAIN, LOW_GAIN
from matplotlib import pyplot as plt
from tqdm import tqdm
from traitlets.config import Config

from nectarchain.dqm.camera_monitoring import CameraMonitoring
from nectarchain.dqm.charge_integration import ChargeIntegrationHighLowGain
from nectarchain.dqm.db_utils import DQMDB
from nectarchain.dqm.mean_camera_display import MeanCameraDisplayHighLowGain
from nectarchain.dqm.mean_waveforms import MeanWaveFormsHighLowGain
from nectarchain.dqm.pixel_participation import PixelParticipationHighLowGain
from nectarchain.dqm.pixel_timeline import PixelTimelineHighLowGain
from nectarchain.dqm.trigger_statistics import TriggerStatistics
from nectarchain.makers import ChargesNectarCAMCalibrationTool
from nectarchain.utils.constants import ALLOWED_CAMERAS

# Root logging layout: timestamp, logger name, severity, then the message.
_LOG_FORMAT = "%(asctime)s %(name)s %(levelname)s %(message)s"
logging.basicConfig(format=_LOG_FORMAT)

# Module-level logger used throughout this file.
log = logging.getLogger(__name__)

# Point this logger at the very same handler list object as the "__main__"
# logger, so handlers attached to either one are seen by both.
_main_logger = logging.getLogger("__main__")
log.handlers = _main_logger.handlers


def _build_parser():
    """Build the command-line parser of the NectarCAM DQM tool.

    Returns
    -------
    argparse.ArgumentParser
        Parser declaring every option and the two positional paths.
    """
    parser = argparse.ArgumentParser(
        description="NectarCAM Data Quality Monitoring tool"
    )
    parser.add_argument(
        "-p", "--plot", action="store_true", help="Enables plots to be generated"
    )
    parser.add_argument(
        "--write-db", action="store_true", help="Write DQM output in DQM ZODB data base"
    )
    parser.add_argument(
        "-n",
        "--noped",
        action="store_true",
        help="Enables pedestal subtraction in charge integration",
    )

    # extractor arguments
    parser.add_argument(
        "--method",
        choices=[
            "FullWaveformSum",
            "FixedWindowSum",
            "GlobalPeakWindowSum",
            "LocalPeakWindowSum",
            "SlidingWindowMaxSum",
            "TwoPassWindowSum",
        ],
        default="GlobalPeakWindowSum",
        help="charge extractor method",
        type=str,
    )
    # The string default is run through ``type`` by argparse, so the value is
    # always a parsed JSON object.
    parser.add_argument(
        "--extractor_kwargs",
        default='{"window_shift": 4, "window_width": 16}',
        help="charge extractor kwargs",
        type=json.loads,
    )

    parser.add_argument(
        "-r",
        "--runnb",
        help="Optional run number, automatically found on DIRAC",
        type=int,
    )
    parser.add_argument(
        "-c",
        "--camera",
        choices=ALLOWED_CAMERAS,
        # First allowed camera whose name contains "QM".
        default=[camera for camera in ALLOWED_CAMERAS if "QM" in camera][0],
        help="""Process data for a specific NectarCAM camera.
        Default: NectarCAMQM (Qualification Model).""",
        type=str,
    )
    parser.add_argument(
        "--r0",
        action="store_true",
        default=False,
        help="Disable all R0->R1 corrections",
    )
    parser.add_argument(
        "--max-events",
        default=None,
        type=int,
        help="Maximum number of events to loop through in each run slice",
    )
    parser.add_argument("-i", "--input-files", nargs="+", help="Local input files")
    parser.add_argument("--log", default="info", help="debug output", type=str)

    parser.add_argument("input_paths", help="Input paths")
    parser.add_argument("output_paths", help="Output paths")
    return parser


def _get_name(run_file):
    """Return ``<first>_<second>`` built from the dot-separated file base name."""
    base_name = run_file.split("/")[-1]
    pieces = base_name.split(".")
    name = pieces[0] + "_" + pieces[1]
    log.debug(name)
    return name


def _fig_folder_layout(name, output_path, folder="Plots"):
    """Compute (without creating) the output folder names used for ``name``.

    Returns
    -------
    tuple of str
        ``(parent_folder, children_folder, folder_path)`` where ``folder_path``
        is the directory receiving the figures.
    """
    pieces = name.split("_")
    parent_folder = pieces[0] + "_" + pieces[1]
    children_folder = "./" + parent_folder + "/" + name + "_calib"
    folder_path = f"{output_path}/output/{children_folder}/{folder}"
    return parent_folder, children_folder, folder_path


def _process_events(reader, processors, noped, max_events):
    """Feed every event of ``reader`` to each processor, with a progress bar."""
    total = max_events if max_events else len(reader)
    for evt in tqdm(reader, total=total, unit="ev"):
        for processor in processors:
            processor.process_event(evt, noped)


def main():
    """Run the NectarCAM Data Quality Monitoring (DQM) chain.

    Command-line arguments are read from ``sys.argv``; unknown options are
    ignored. The input files are either given with ``-i`` or looked up through
    ``DataManagement.findrun`` when ``-r`` is provided (``-r`` takes
    precedence). Every DQM processor is configured from the first input file,
    fed with the events of all input files, then asked for its results, which
    are written with ``write_all_results``. Optionally the results are inserted
    in the DQM data base (``--write-db``) and figures are saved (``--plot``).

    Raises
    ------
    SystemExit
        With exit code 1 when no input file is available.
    """
    parser = _build_parser()
    # parse_known_args tolerates extra options; they are deliberately ignored.
    args, _ = parser.parse_known_args()

    log.setLevel(args.log.upper())

    # Reading arguments, paths and plot-boolean
    nectar_path = args.input_paths
    log.info(f"Input file path: {nectar_path}")

    output_path = args.output_paths
    log.info(f"Output path: {output_path}")

    if args.runnb is not None:
        # Grab runs automatically from DIRAC if the -r option is provided.
        # Imported lazily so that DataManagement is only needed in this case.
        from nectarchain.data.management import DataManagement

        dm = DataManagement()
        _, filelist = dm.findrun(args.runnb, camera=args.camera)
        args.input_files = [s.name for s in filelist]
        if not args.input_files:
            # Fail with a clear message instead of an IndexError below.
            log.error(f"No input files found for run {args.runnb}, exiting...")
            sys.exit(1)
    elif not args.input_files:
        log.error("Input files should be provided, exiting...")
        sys.exit(1)

    # The first input file drives the configuration of the processors.
    path = f"{nectar_path}/runs/{args.input_files[0]}"
    log.debug(f"Input files:\n{path}")
    for extra_file in args.input_files[1:]:
        log.debug(extra_file)

    # Defining and printing the options
    plot_fig = args.plot
    noped = args.noped
    method = args.method
    extractor_kwargs = args.extractor_kwargs

    log.info(f"Plot: {plot_fig}")
    log.info(f"Noped: {noped}")
    log.info(f"method: {method}")
    log.info(f"extractor_kwargs: {extractor_kwargs}")

    # Only forward the options that are traits of the charge calibration tool.
    kwargs = {"method": method, "extractor_kwargs": extractor_kwargs}
    tool = ChargesNectarCAMCalibrationTool()
    charges_kwargs = {key: kwargs[key] for key in tool.traits() if key in kwargs}
    log.info(f"charges_kwargs: {charges_kwargs}")

    start = time.time()

    config = None
    if args.r0:
        # --r0: configure the event source without calibration file,
        # flat-fielding or gain selection.
        config = Config(
            dict(
                NectarCAMEventSource=dict(
                    NectarCAMR0Corrections=dict(
                        calibration_path=None,
                        apply_flatfield=False,
                        select_gain=False,
                    )
                )
            )
        )

    name = _get_name(path)
    _, children_folder, fig_path = _fig_folder_layout(name, output_path)
    res_path = f"{output_path}/output/{children_folder}/{name}"

    # Each processor is paired with the key under which its results are stored,
    # so the two can never get out of step.
    keyed_processors = [
        ("Results_TriggerStatistics", TriggerStatistics(HIGH_GAIN, args.r0)),
        (
            "Results_MeanWaveForms_HighGain",
            MeanWaveFormsHighLowGain(HIGH_GAIN, args.r0),
        ),
        (
            "Results_MeanWaveForms_LowGain",
            MeanWaveFormsHighLowGain(LOW_GAIN, args.r0),
        ),
        (
            "Results_MeanCameraDisplay_HighGain",
            MeanCameraDisplayHighLowGain(HIGH_GAIN, args.r0),
        ),
        (
            "Results_MeanCameraDisplay_LowGain",
            MeanCameraDisplayHighLowGain(LOW_GAIN, args.r0),
        ),
        (
            "Results_ChargeIntegration_HighGain",
            ChargeIntegrationHighLowGain(HIGH_GAIN, args.r0),
        ),
        (
            "Results_ChargeIntegration_LowGain",
            ChargeIntegrationHighLowGain(LOW_GAIN, args.r0),
        ),
        ("Results_CameraMonitoring", CameraMonitoring(HIGH_GAIN, args.r0)),
        (
            "Results_PixelParticipation_HighGain",
            PixelParticipationHighLowGain(HIGH_GAIN, args.r0),
        ),
        (
            "Results_PixelParticipation_LowGain",
            PixelParticipationHighLowGain(LOW_GAIN, args.r0),
        ),
        (
            "Results_PixelTimeline_HighGain",
            PixelTimelineHighLowGain(HIGH_GAIN, args.r0),
        ),
        (
            "Results_PixelTimeline_LowGain",
            PixelTimelineHighLowGain(LOW_GAIN, args.r0),
        ),
    ]
    processors = [processor for _, processor in keyed_processors]

    # reader1 is a one-event source used to configure the processors. It is
    # kept open until all processing is done, since the processors receive it
    # and may still refer to it, and is closed on exit (including on error).
    with EventSource(input_url=path, config=config, max_events=1) as reader1:
        # Create the figure folder only once the first file could be opened.
        os.makedirs(fig_path, exist_ok=True)

        # The number of pixels and samples is taken from the first processor.
        pix, samp = processors[0].define_for_run(reader1)

        for processor in processors:
            processor.configure_for_run(path, pix, samp, reader1, **charges_kwargs)

        # All input files, the first one included, go through the same loop and
        # every event source is closed as soon as its events are consumed.
        for input_file in args.input_files:
            run_path = f"{nectar_path}/runs/{input_file}"
            log.debug(run_path)
            with EventSource(
                input_url=run_path, config=config, max_events=args.max_events
            ) as reader:
                _process_events(reader, processors, noped, args.max_events)

        for processor in processors:
            processor.finish_run()

        # The final results dictionary
        nested_dict = {
            key: processor.get_results() for key, processor in keyed_processors
        }

        # Write all results in 1 fits file. The call is made on the last
        # processor, as before, but no longer through a leaked loop variable.
        processors[-1].write_all_results(res_path, nested_dict)

        if args.write_db:
            db = DQMDB(read_only=False)
            if db.insert(name, nested_dict):
                db.commit_and_close()
            else:
                db.abort_and_close()

        # if plot option in arguments, it will construct the figures and save them
        if plot_fig:
            for processor in processors:
                figures, figure_paths = processor.plot_results(name, fig_path)
                for fig_key, fig in figures.items():
                    fig.savefig(figure_paths[fig_key])
                    # Close the figure that was just saved rather than whatever
                    # pyplot considers the current one.
                    plt.close(fig)

    elapsed = time.time() - start
    log.info(f"Processing time: {elapsed:.2f} s.")
# Script entry point: run the DQM only when this module is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()