# Source code for nectarchain.dqm.start_dqm

import argparse
import json
import os
import sys
import time

# ctapipe imports
from ctapipe.io import EventSource
from ctapipe_io_nectarcam.constants import HIGH_GAIN, LOW_GAIN
from matplotlib import pyplot as plt
from tqdm import tqdm
from traitlets.config import Config

from nectarchain.dqm.camera_monitoring import CameraMonitoring
from nectarchain.dqm.charge_integration import ChargeIntegrationHighLowGain
from nectarchain.dqm.db_utils import DQMDB
from nectarchain.dqm.mean_camera_display import MeanCameraDisplayHighLowGain
from nectarchain.dqm.mean_waveforms import MeanWaveFormsHighLowGain
from nectarchain.dqm.pixel_participation import PixelParticipationHighLowGain
from nectarchain.dqm.pixel_timeline import PixelTimelineHighLowGain
from nectarchain.dqm.trigger_statistics import TriggerStatistics
from nectarchain.makers import ChargesNectarCAMCalibrationTool


def _build_parser():
    """Build the command-line parser of the NectarCAM DQM tool."""
    parser = argparse.ArgumentParser(
        description="NectarCAM Data Quality Monitoring tool"
    )
    parser.add_argument(
        "-p", "--plot", action="store_true", help="Enables plots to be generated"
    )
    parser.add_argument(
        "--write-db",
        action="store_true",
        help="Write DQM output in DQM ZODB data base",
    )
    parser.add_argument(
        "-n",
        "--noped",
        action="store_true",
        help="Enables pedestal subtraction in charge integration",
    )
    # Charge extractor arguments
    parser.add_argument(
        "--method",
        choices=[
            "FullWaveformSum",
            "FixedWindowSum",
            "GlobalPeakWindowSum",
            "LocalPeakWindowSum",
            "SlidingWindowMaxSum",
            "TwoPassWindowSum",
        ],
        default="GlobalPeakWindowSum",
        help="charge extractor method",
        type=str,
    )
    parser.add_argument(
        "--extractor_kwargs",
        # argparse runs ``type`` on string defaults too, so this JSON text is
        # decoded into a dict exactly like a user-supplied value.
        default='{"window_shift": 4, "window_width": 16}',
        help="charge extractor kwargs",
        type=json.loads,
    )
    parser.add_argument(
        "-r",
        "--runnb",
        help="Optional run number, automatically found on DIRAC",
        type=int,
    )
    parser.add_argument(
        "--r0", action="store_true", help="Disable all R0->R1 corrections"
    )
    parser.add_argument(
        "--max-events",
        default=None,
        type=int,
        help="Maximum number of events to loop through in each run slice",
    )
    parser.add_argument("-i", "--input-files", nargs="+", help="Local input files")
    parser.add_argument("input_paths", help="Input paths")
    parser.add_argument("output_paths", help="Output paths")
    return parser


def _get_run_name(run_file):
    """Return ``<first>_<second>`` built from the dot-separated file name."""
    parts = run_file.split("/")[-1].split(".")
    if len(parts) < 2:
        raise ValueError(
            f"Cannot derive a run name from {run_file!r}: expected a file name "
            "with at least two dot-separated fields"
        )
    name = parts[0] + "_" + parts[1]
    print(name)
    return name


def _create_fig_folder(output_path, name):
    """Create the ``Plots`` output folder for ``name`` and return its paths.

    Returns the tuple ``(parent_folder_name, children_folder_name, folder_path)``.
    """
    fields = name.split("_")
    parent_folder_name = fields[0] + "_" + fields[1]
    children_folder_name = "./" + parent_folder_name + "/" + name + "_calib"
    folder_path = f"{output_path}/output/{children_folder_name}/Plots"
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(folder_path, exist_ok=True)
    return parent_folder_name, children_folder_name, folder_path


def _build_processors():
    """Return the DQM processors keyed by the name of their results entry.

    The insertion order is the order in which processors are configured, fed
    with events and asked for their results.
    """
    return {
        "Results_TriggerStatistics": TriggerStatistics(HIGH_GAIN),
        "Results_MeanWaveForms_HighGain": MeanWaveFormsHighLowGain(HIGH_GAIN),
        "Results_MeanWaveForms_LowGain": MeanWaveFormsHighLowGain(LOW_GAIN),
        "Results_MeanCameraDisplay_HighGain": MeanCameraDisplayHighLowGain(
            HIGH_GAIN
        ),
        "Results_MeanCameraDisplay_LowGain": MeanCameraDisplayHighLowGain(LOW_GAIN),
        "Results_ChargeIntegration_HighGain": ChargeIntegrationHighLowGain(
            HIGH_GAIN
        ),
        "Results_ChargeIntegration_LowGain": ChargeIntegrationHighLowGain(LOW_GAIN),
        "Results_CameraMonitoring": CameraMonitoring(HIGH_GAIN),
        "Results_PixelParticipation_HighGain": PixelParticipationHighLowGain(
            HIGH_GAIN
        ),
        "Results_PixelParticipation_LowGain": PixelParticipationHighLowGain(
            LOW_GAIN
        ),
        "Results_PixelTimeline_HighGain": PixelTimelineHighLowGain(HIGH_GAIN),
        "Results_PixelTimeline_LowGain": PixelTimelineHighLowGain(LOW_GAIN),
    }


def _process_file(path, config, max_events, processors, noped):
    """Pass every event of one input file to each processor, then close it."""
    with EventSource(input_url=path, config=config, max_events=max_events) as reader:
        total = max_events if max_events else len(reader)
        for evt in tqdm(reader, total=total, unit="ev"):
            for p in processors:
                p.ProcessEvent(evt, noped)


def main():
    """
    Main DQM script

    Parses the command line, resolves the list of input files (either given
    with ``-i`` or looked up from the run number given with ``-r``), runs all
    DQM processors over the events of every input file, writes the collected
    results through ``WriteAllResults``, optionally stores them in the DQM
    database (``--write-db``) and optionally saves the figures (``-p``).

    Exits with status 1 when no input file is available.
    """
    # Unknown extra arguments are deliberately ignored (parse_known_args).
    args, _ = _build_parser().parse_known_args()

    # Reading arguments, paths and plot-boolean
    nectar_path = args.input_paths
    print("Input file path:", nectar_path)

    # Defining and printing the paths of the output files.
    output_path = args.output_paths
    print("Output path:", output_path)

    if args.runnb is not None:
        # Grab runs automatically from DIRAC if the -r option is provided.
        # Imported lazily so that DIRAC is only needed when -r is used.
        from nectarchain.data.management import DataManagement

        dm = DataManagement()
        _, filelist = dm.findrun(args.runnb)
        args.input_files = [s.name for s in filelist]

    # Covers both "no -i given" and an empty file list returned for the run.
    if not args.input_files:
        print("Input files should be provided, exiting...")
        sys.exit(1)

    # Full paths of all input files; the first one drives the configuration.
    paths = [f"{nectar_path}/runs/{input_file}" for input_file in args.input_files]
    path = paths[0]
    print("Input files:")
    print(path)
    for arg in args.input_files[1:]:
        print(arg)

    # Defining and printing the options
    noped = args.noped
    print("Plot:", args.plot)
    print("Noped:", noped)
    print("method:", args.method)
    print("extractor_kwargs:", args.extractor_kwargs)

    # Keep only the options that are traits of the charge calibration tool.
    kwargs = {"method": args.method, "extractor_kwargs": args.extractor_kwargs}
    tool = ChargesNectarCAMCalibrationTool()
    charges_kwargs = {key: kwargs[key] for key in tool.traits() if key in kwargs}
    print(charges_kwargs)

    start = time.time()
    print(path)

    config = None
    if args.r0:
        config = Config(
            dict(
                NectarCAMEventSource=dict(
                    NectarCAMR0Corrections=dict(
                        calibration_path=None,
                        apply_flatfield=False,
                        select_gain=False,
                    )
                )
            )
        )

    name = _get_run_name(path)
    _, children_folder_name, fig_path = _create_fig_folder(output_path, name)
    res_path = f"{output_path}/output/{children_folder_name}/{name}"

    named_processors = _build_processors()
    processors = list(named_processors.values())

    # The single-event reader is handed to the processors for configuration.
    # It is kept open until all processing is done (processors may hold on to
    # it) and is closed by the context manager afterwards.
    with EventSource(input_url=path, config=config, max_events=1) as reader1:
        pix, samp = processors[0].DefineForRun(reader1)
        for p in processors:
            p.ConfigureForRun(path, pix, samp, reader1, **charges_kwargs)

        # First file, then the rest of the event files
        _process_file(path, config, args.max_events, processors, noped)
        for extra_path in paths[1:]:
            print(extra_path)
            _process_file(extra_path, config, args.max_events, processors, noped)

        for p in processors:
            p.FinishRun()

        # The final results dictionary
        results = {key: p.GetResults() for key, p in named_processors.items()}

        # Write all results in 1 fits file. The call goes through the last
        # processor, as the original code did via its leftover loop variable.
        processors[-1].WriteAllResults(res_path, results)

        if args.write_db:
            db = DQMDB(read_only=False)
            if db.insert(name, results):
                db.commit_and_close()
            else:
                db.abort_and_close()

        # if plot option in arguments, it will construct the figures and save
        # them
        if args.plot:
            for p in processors:
                figures, figure_paths = p.PlotResults(name, fig_path)
                for fig_key in figures:
                    fig = figures[fig_key]
                    fig.savefig(figure_paths[fig_key])
                    # Close the figure that was just saved, not merely the
                    # "current" one, so that figures do not accumulate.
                    plt.close(fig)

    end = time.time()
    print(f"Processing time: {end-start:.2f} s.")
# TODO: Reduce code by using loops for figs and results


# Run the DQM only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()