# Source code for nectarchain.data.management

import glob
import logging
import os
import pathlib
import sys
from pathlib import Path
from typing import List, Tuple

import numpy as np

from ..utils import KeepLoggingUnchanged

# Set the output format on the root logger, then create this module's logger
# and make it share the handler list of the "__main__" logger.
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
log.handlers = logging.getLogger("__main__").handlers

__all__ = ["DataManagement"]

# Import and initialize DIRAC once, at module import time.
# The import is wrapped in KeepLoggingUnchanged -- presumably so that DIRAC
# cannot alter the logging configuration set up above (TODO confirm against
# nectarchain.utils).
# Any failure here is non-fatal: it is only reported as a warning, so the
# module still imports when DIRAC is missing or cannot be initialized.
try:
    with KeepLoggingUnchanged():
        import DIRAC

        DIRAC.initialize()
except ImportError:
    log.warning("DIRAC probably not installed")
except Exception as e:
    log.warning(f"DIRAC could not be properly initialized: {e}")


class DataManagement:
    """Locate NectarCAM run files and derived data products.

    Every method is static. Local look-ups are done below the directory given
    by the ``NECTARCAMDATA`` environment variable; remote look-ups go through
    DIRAC or the NectarCAM ELog.
    """

    @staticmethod
    def findrun(run_number: int, search_on_GRID=True) -> Tuple[Path, List[Path]]:
        """Find in NECTARCAMDATA the ``*.fits.fz`` files associated to a run.

        Parameters
        ----------
        run_number: int
            the run number
        search_on_GRID: bool, optional
            If True and no local file is found, look the run up on the GRID,
            fetch its files, and search locally again. Defaults to True.

        Returns
        -------
        (PosixPath, list):
            a wildcard path describing the matched files (the third
            dot-separated field of the file name is replaced by ``*``) and
            the list of matching files sorted by the integer found in their
            second suffix.

        Raises
        ------
        FileNotFoundError
            If no file is found locally and either ``search_on_GRID`` is
            False or the GRID fetch did not produce any matching file.
        """
        basepath = f"{os.environ['NECTARCAMDATA']}/runs/"
        list_path = DataManagement._glob_run_files(basepath, run_number)

        if not list_path:
            e = FileNotFoundError(f"run {run_number} is not present in {basepath}")
            if search_on_GRID:
                # Pass the exception itself: exc_info=True outside an
                # ``except`` block would only log "NoneType: None".
                log.warning(e, exc_info=e)
                log.info("will search files on GRID and fetch them")
                lfns = DataManagement.get_GRID_location(run_number)
                DataManagement.getRunFromDIRAC(lfns)
                list_path = DataManagement._glob_run_files(basepath, run_number)
            if not list_path:
                # Either the GRID search was disabled or it brought nothing
                # back; fail explicitly instead of with an IndexError below.
                log.error(e, exc_info=e)
                raise e

        first = list_path[0]
        name = first.name.split(".")
        name[2] = "*"
        name = Path(str(first.parent)) / (
            f"{name[0]}.{name[1]}.{name[2]}.{name[3]}.{name[4]}"
        )
        log.info(f"Found {len(list_path)} files matching {name}")

        # Sort on the numeric second suffix first; the path itself is only a
        # tie-breaker so that the resulting order is deterministic.
        list_path = sorted(
            list_path, key=lambda file: (int(file.suffixes[1][1:]), file)
        )
        return name, list_path

    @staticmethod
    def _glob_run_files(basepath: str, run_number: int) -> List[Path]:
        """Return every ``*<run_number>*.fits.fz`` file found below basepath."""
        pattern = basepath + "**/*" + str(run_number) + "*.fits.fz"
        return [Path(chemin) for chemin in glob.glob(pattern, recursive=True)]

    @staticmethod
    def getRunFromDIRAC(lfns: list):
        """Fetch run files from the EGI grid from input lfns.

        A file is only downloaded when no file with the same base name
        already exists in ``$NECTARCAMDATA/runs/``.

        Parameters
        ----------
        lfns: list
            list of lfns path
        """
        with KeepLoggingUnchanged():
            from DIRAC.Interfaces.API.Dirac import Dirac

            dirac = Dirac()
            for lfn in lfns:
                dest_dir = f"{os.environ['NECTARCAMDATA']}/runs/"
                if not os.path.exists(f"{dest_dir}{os.path.basename(lfn)}"):
                    dirac.getFile(
                        lfn=lfn,
                        destDir=dest_dir,
                        printOutput=True,
                    )

    @staticmethod
    def get_GRID_location(
        run_number: int,
        output_lfns=True,
        basepath="/vo.cta.in2p3.fr/nectarcam/",
        fromElog=False,
        username=None,
        password=None,
    ):
        """Get run location on GRID, from DIRAC or from Elog (work in progress!)

        Parameters
        ----------
        run_number: int
            Run number
        output_lfns: bool, optional
            Only used when ``fromElog`` is True. If True, return lfns path of
            fits.fz files, else return parent directory of run location.
            Defaults to True.
        basepath: str
            Only used when ``fromElog`` is False. The path on GRID where
            nectarCAM data are stored. Default to
            ``/vo.cta.in2p3.fr/nectarcam/``.
        fromElog: bool, optional
            To force to use the method which read the Elog. Default to False,
            which uses the method based on the DIRAC API.
        username: str, optional
            username for Elog login. Defaults to None.
        password: str, optional
            password for Elog login. Defaults to None.

        Returns
        -------
        The result of ``__get_GRID_location_ELog`` or of
        ``__get_GRID_location_DIRAC``.
        """
        if fromElog:
            return DataManagement.__get_GRID_location_ELog(
                run_number=run_number,
                output_lfns=output_lfns,
                username=username,
                password=password,
            )
        return DataManagement.__get_GRID_location_DIRAC(
            run_number=run_number, basepath=basepath
        )

    @staticmethod
    def __get_GRID_location_DIRAC(
        run_number: int, basepath="/vo.cta.in2p3.fr/nectarcam/"
    ):
        """Query the DIRAC file catalog below basepath for a run.

        The catalog CLI prints its results, so stdout is redirected to a
        ``StdoutRecord`` built with keyword ``Run<run_number>`` and the value
        of its ``output`` attribute is returned (presumably the matching
        lfns -- see ``nectarchain.utils.StdoutRecord``).
        """
        with KeepLoggingUnchanged():
            from contextlib import redirect_stdout

            from DIRAC.DataManagementSystem.Client.FileCatalogClientCLI import (
                FileCatalogClientCLI,
            )
            from DIRAC.Interfaces.Utilities.DCommands import DCatalog

            from nectarchain.utils import StdoutRecord

            catalog = DCatalog()
            fccli = FileCatalogClientCLI(catalog.catalog)
            recorder = StdoutRecord(keyword=f"Run{run_number}")
            # redirect_stdout puts the previous stream back on exit, even if
            # do_find raises, so stdout is never left pointing at the recorder.
            with redirect_stdout(recorder):
                fccli.do_find("-q " + basepath)
            lfns = recorder.output
        return lfns

    @staticmethod
    def __get_GRID_location_ELog(
        run_number: int, output_lfns=True, username=None, password=None
    ):
        """Read the run location from the NectarCAM ELog.

        With both ``username`` and ``password`` given, log in to the ELog
        through mechanize; otherwise reuse the cookies found by
        ``browser_cookie3``. The location is taken from the text following
        ``FC:`` in the first line of the answer that contains ``<p>``.

        Returns
        -------
        list or str
            If ``output_lfns`` is True, the catalog entries of the run
            directory whose name contains both the run number and
            ``fits.fz`` (possibly empty when the DIRAC query fails, the
            error being logged); otherwise the location string read from
            the ELog.

        Raises
        ------
        Exception
            If no line of the ELog answer contains ``<p>``.
        """
        import browser_cookie3
        import mechanize
        import requests

        url = "http://nectarcam.in2p3.fr/elog/nectarcam-data-qm/?cmd=Find"
        # Same search query whichever way the cookies were obtained.
        query_url = (
            "http://nectarcam.in2p3.fr/elog/nectarcam-data-qm/"
            "?jcmd=&mode=Raw&attach=1&printable=1&reverse=0&reverse=1&npp=20&"
            "ma=&da=&ya=&ha=&na=&ca=&last=&mb=&db=&yb=&hb=&nb=&cb=&Author=&"
            f"Setup=&Category=&Keyword=&Subject=%23{run_number}&"
            "ModuleCount=&subtext="
        )

        if not (username is None or password is None):
            log.debug("log to Elog with username and password")
            # log to Elog
            br = mechanize.Browser()
            br.open(url)
            # A form has to be selected before ``br.form`` can be used.
            br.select_form("form1")
            for i in range(4):
                log.debug(br.form.find_control(nr=i).name)
            br.form["uname"] = username
            br.form["upassword"] = password
            br.method = "POST"
            br.submit()
            cookies = br._ua_handlers["_cookies"].cookiejar
        else:
            # try to acces data by getting cookies from firefox and Chrome
            log.debug("try to get data with cookies from Firefox abnd Chrome")
            cookies = browser_cookie3.load()

        # get data
        req = requests.get(query_url, cookies=cookies)

        lines = req.text.split("\r\n")
        url_data = None
        for line in lines:
            if "<p>" in line:
                url_data = line.split("</p>")[0].split("FC:")[1]
                log.debug(f"url_data found {url_data}")
                break
        else:
            e = Exception("lfns not found on GRID")
            log.error(e, exc_info=e)
            log.debug(lines)
            raise e

        if not output_lfns:
            return url_data

        lfns = []
        try:
            # Dirac
            from DIRAC.Interfaces.API.Dirac import Dirac

            dirac = Dirac()
            loc = (
                f"/vo.cta.in2p3.fr/nectarcam/{url_data.split('/')[-2]}/"
                f"{url_data.split('/')[-1]}"
            )
            log.debug(f"searching in Dirac filecatalog at {loc}")
            res = dirac.listCatalogDirectory(loc, printOutput=True)
            for key in res["Value"]["Successful"][loc]["Files"]:
                if str(run_number) in key and "fits.fz" in key:
                    lfns.append(key)
        except Exception as e:
            # Best effort: report the failure and return what was collected.
            log.error(e, exc_info=True)
        return lfns

    @staticmethod
    def find_waveforms(run_number, max_events=None):
        """Find the waveforms ``.h5`` file(s) of a run.

        See ``__find_computed_data`` for how ``max_events`` is used.
        """
        return DataManagement.__find_computed_data(
            run_number=run_number, max_events=max_events, data_type="waveforms"
        )

    @staticmethod
    def find_charges(
        run_number, method="FullWaveformSum", str_extractor_kwargs="", max_events=None
    ):
        """Find the charges file(s) of a run for a given extraction method.

        Files are expected to end with
        ``_<method>_<str_extractor_kwargs>.h5``. See
        ``__find_computed_data`` for how ``max_events`` is used.
        """
        return DataManagement.__find_computed_data(
            run_number=run_number,
            max_events=max_events,
            ext=f"_{method}_{str_extractor_kwargs}.h5",
            data_type="charges",
        )

    @staticmethod
    def find_photostat(
        FF_run_number,
        ped_run_number,
        FF_method="FullWaveformSum",
        ped_method="FullWaveformSum",
        str_extractor_kwargs="",
    ):
        """Find the photo-statistic result file of a (FF run, pedestal run) pair.

        The file is searched in ``$NECTARCAMDATA/PhotoStat/`` (``/tmp`` is
        used when ``NECTARCAMDATA`` is not set).

        Returns
        -------
        list
            a one-element list holding the matching file path.

        Raises
        ------
        Exception
            If the number of matching files is not exactly one.
        """
        full_file = glob.glob(
            str(
                pathlib.Path(
                    f"{os.environ.get('NECTARCAMDATA','/tmp')}/PhotoStat/"
                    f"PhotoStatisticNectarCAM_FFrun{FF_run_number}_{FF_method}"
                    f"_{str_extractor_kwargs}_Pedrun{ped_run_number}_{ped_method}.h5"
                )
            )
        )
        log.debug("for now it does not check if there are files with max events")
        if len(full_file) != 1:
            raise Exception(f"the files is {full_file}")
        return full_file

    @staticmethod
    def find_SPE_combined(
        run_number, method="FullWaveformSum", str_extractor_kwargs=""
    ):
        """Find the SPE fit result stored under the ``FlatFieldCombined`` keyword.

        Thin wrapper around ``find_SPE_HHV``.
        """
        return DataManagement.find_SPE_HHV(
            run_number=run_number,
            method=method,
            str_extractor_kwargs=str_extractor_kwargs,
            keyword="FlatFieldCombined",
        )

    @staticmethod
    def find_SPE_nominal(
        run_number, method="FullWaveformSum", str_extractor_kwargs="", free_pp_n=False
    ):
        """Find the SPE fit result stored under the ``FlatFieldSPENominal`` keyword.

        Thin wrapper around ``find_SPE_HHV``.
        """
        return DataManagement.find_SPE_HHV(
            run_number=run_number,
            method=method,
            str_extractor_kwargs=str_extractor_kwargs,
            free_pp_n=free_pp_n,
            keyword="FlatFieldSPENominal",
        )

    @staticmethod
    def find_SPE_HHV(
        run_number,
        method="FullWaveformSum",
        str_extractor_kwargs="",
        free_pp_n=False,
        **kwargs,
    ):
        """Find the SPE fit result file of a run in ``$NECTARCAMDATA/SPEfit/``.

        Parameters
        ----------
        run_number: int
            the run number
        method: str, optional
            method name used in the file name. Defaults to
            ``FullWaveformSum``.
        str_extractor_kwargs: str, optional
            extractor arguments string used in the file name.
        free_pp_n: bool, optional
            If False (default), ``Std`` is appended to the keyword in the
            file name.
        **kwargs
            ``keyword``: file name prefix, ``FlatFieldSPEHHV`` by default.

        Returns
        -------
        list
            a one-element list. When exactly one file matches
            ``<keyword>[Std]NectarCAM_run<run_number>*_<method>_<kwargs>.h5``
            it is returned; otherwise the ``_maxevents<N>`` file with the
            largest N is returned.

        Raises
        ------
        FileNotFoundError
            If the choice cannot be made (no match, or several matches none
            of which carries a ``maxevents`` tag).
        """
        keyword = kwargs.get("keyword", "FlatFieldSPEHHV")
        std_key = "" if free_pp_n else "Std"
        prefix = (
            f"{os.environ.get('NECTARCAMDATA','/tmp')}/SPEfit/"
            f"{keyword}{std_key}NectarCAM_run{run_number}"
        )
        suffix = f"_{method}_{str_extractor_kwargs}.h5"

        pattern = str(pathlib.Path(f"{prefix}*{suffix}"))
        full_file = glob.glob(pattern)
        if len(full_file) == 1:
            return full_file

        # need to improve the files search !!
        # -> unstable behavior with SPE results computed
        # with maxevents not to None
        # The fallback uses the same keyword / Std prefix as the first search
        # so that it cannot pick a file belonging to another kind of fit.
        all_files = glob.glob(str(pathlib.Path(f"{prefix}_maxevents*{suffix}")))
        max_events = 0
        best_file = None
        for file in all_files:
            _max_events = DataManagement._parse_max_events(file)
            if _max_events is not None and _max_events >= max_events:
                max_events = _max_events
                best_file = file
        if best_file is None:
            raise FileNotFoundError(
                f"expected exactly one SPE file matching {pattern}, "
                f"found {full_file}"
            )
        return [best_file]

    @staticmethod
    def _parse_max_events(file):
        """Return the integer N of the ``maxevents<N>`` tag in a file name.

        The base name is cut at ``.h5`` and split on ``_``; the first token
        containing ``maxevents`` is used. Returns None when there is none.
        """
        stem = os.path.basename(file).split(".h5")[0]
        for token in stem.split("_"):
            if "maxevents" in token:
                return int(token.split("maxevents")[-1])
        return None

    @staticmethod
    def __find_computed_data(
        run_number, max_events=None, ext=".h5", data_type="waveforms"
    ):
        """Find computed data files in ``$NECTARCAMDATA/runs/<data_type>/``.

        Parameters
        ----------
        run_number: int
            the run number
        max_events: int, optional
            If given, prefer the ``_maxevents<N>`` file with the smallest N
            such that ``N >= max_events``.
        ext: str, optional
            end of the file name. Defaults to ``.h5``.
        data_type: str, optional
            sub-directory of ``runs`` to search. Defaults to ``waveforms``.

        Returns
        -------
        list
            the files matching ``*_run<run_number><ext>`` or, when
            ``max_events`` is given and a suitable ``maxevents`` file exists,
            a one-element list holding that file.
        """
        data_dir = f"{os.environ.get('NECTARCAMDATA','/tmp')}/runs/{data_type}/"
        out = glob.glob(str(pathlib.Path(f"{data_dir}*_run{run_number}{ext}")))
        if max_events is None:
            return out

        all_files = glob.glob(
            str(pathlib.Path(f"{data_dir}*_run{run_number}_maxevents*{ext}"))
        )
        best_max_events = np.inf
        best_file = None
        for file in all_files:
            _max_events = DataManagement._parse_max_events(file)
            if _max_events is None:
                continue
            # Smallest file that still holds at least max_events events.
            if max_events <= _max_events < best_max_events:
                best_max_events = _max_events
                best_file = file
        if best_file is not None:
            out = [best_file]
        return out