Source code for nectarchain.data.management

import glob
import logging
import os
import pathlib
import sys
from pathlib import Path
from typing import List, Tuple

import numpy as np

from ..utils import KeepLoggingUnchanged
from ..utils.constants import ALLOWED_CAMERAS

logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
log.handlers = logging.getLogger("__main__").handlers

__all__ = ["DataManagement"]

# The DIRAC magic 2 lines !
try:
    with KeepLoggingUnchanged():
        import DIRAC

        DIRAC.initialize()
except ImportError:
    log.warning("DIRAC probably not installed")
except Exception as e:
    log.warning(f"DIRAC could not be properly initialized: {e}")


[docs] class DataManagement:
[docs] @staticmethod def findrun( run_number: int, search_on_GRID=True, camera=[camera for camera in ALLOWED_CAMERAS if "QM" in camera][0], ) -> Tuple[Path, List[Path]]: """Method to find in NECTARCAMDATA the list of ``*.fits.fz`` files associated to run_number. Parameters ---------- run_number: int the run number search_on_GRID: bool If enabled, search for data on the EGI grid via DIRAC, and fetch them automatically camera: str The NectarCAM camera for which data are search for. (Default: NectarCAMQM) Returns ------- (PosixPath,list): the path list of ``*.fits.fz`` files """ basepath = f"{os.environ['NECTARCAMDATA']}/runs/" list = glob.glob( basepath + "**/*" + str(run_number).zfill(4) + "*.fits.fz", recursive=True ) list_path = [Path(chemin) for chemin in list] if len(list_path) == 0: e = FileNotFoundError(f"run {run_number} is not present in {basepath}") if search_on_GRID: log.warning(e, exc_info=True) log.info("will search files on GRID and fetch them") lfns = DataManagement.get_GRID_location( run_number, basepath=f"/ctao/nectarcam/{camera}" ) if camera.endswith("QM"): lfns.extend( DataManagement.get_GRID_location( run_number, basepath="/vo.cta.in2p3.fr/nectarcam" ) ) if not lfns: not_found = FileNotFoundError( f"Could not find run {run_number} on DIRAC (neither under " f"/ctao/nectarcam nor /vo.cta.in2p3.fr/nectarcam)" ) log.error(not_found, exc_info=True) raise not_found DataManagement.getRunFromDIRAC(lfns) list = glob.glob( basepath + "**/*" + str(run_number) + "*.fits.fz", recursive=True ) list_path = [Path(chemin) for chemin in list] else: log.error(e, exc_info=True) raise e name = list_path[0].name.split(".") name[2] = "*" name = Path(str(list_path[0].parent)) / ( f"{name[0]}.{name[1]}.{name[2]}.{name[3]}.{name[4]}" ) log.info(f"Found {len(list_path)} files matching {name}") # to sort list path _sorted = sorted([[file, int(file.suffixes[1][1:])] for file in list_path]) list_path = [_sorted[i][0] for i in range(len(_sorted))] return name, list_path
[docs] @staticmethod def getRunFromDIRAC(lfns: list): """Method to get run files from the EGI grid from input lfns. Parameters ---------- lfns: list list of lfns path """ with KeepLoggingUnchanged(): from DIRAC.DataManagementSystem.Client.DataManager import DataManager for lfn in lfns: voName = lfn.split("/")[1] dm = DataManager(vo=voName) if not ( os.path.exists( f'{os.environ["NECTARCAMDATA"]}/runs/{os.path.basename(lfn)}' ) ): dm.getFile( lfn=lfn, destinationDir=f"{os.environ['NECTARCAMDATA']}/runs/", ) pass
[docs] @staticmethod def get_GRID_location( run_number: int, output_lfns=True, basepath="/vo.cta.in2p3.fr/nectarcam/", fromElog=False, username=None, password=None, ): """Method to get run location on GRID from Elog (work in progress!) Parameters ---------- run_number: int Run number output_lfns: bool, optional If True, return lfns path of fits.gz files, else return parent directory of run location. Defaults to True. basepath: str The path on GRID where nectarCAM data are stored. Default to ``/vo.cta.in2p3.fr/nectarcam/``. fromElog: bool, optional To force to use the method which read the Elog. Default to False. To use the method with DIRAC API. username: _type_, optional username for Elog login. Defaults to None. password: _type_, optional password for Elog login. Defaults to None. Returns ------- __get_GRID_location_ELog or __get_GRID_location_DIRAC """ if fromElog: return __class__.__get_GRID_location_ELog( run_number=run_number, output_lfns=output_lfns, username=username, password=password, ) else: return __class__.__get_GRID_location_DIRAC( run_number=run_number, basepath=basepath )
@staticmethod def __get_GRID_location_DIRAC( run_number: int, basepath="/vo.cta.in2p3.fr/nectarcam/" ): with KeepLoggingUnchanged(): from contextlib import redirect_stdout from DIRAC.DataManagementSystem.Client.FileCatalogClientCLI import ( FileCatalogClientCLI, ) from DIRAC.Interfaces.Utilities.DCommands import DCatalog from nectarchain.utils import StdoutRecord catalog = DCatalog() with redirect_stdout(sys.stdout): fccli = FileCatalogClientCLI(catalog.catalog) sys.stdout = StdoutRecord(keyword=f"Run{str(run_number).zfill(4)}") fccli.do_find("-q " + basepath) lfns = sys.stdout.output sys.stdout = sys.__stdout__ return lfns @staticmethod def __get_GRID_location_ELog( run_number: int, output_lfns=True, username=None, password=None ): import browser_cookie3 import mechanize import requests url = "http://nectarcam.in2p3.fr/elog/nectarcam-data-qm/?cmd=Find" if not (username is None or password is None): log.debug("log to Elog with username and password") # log to Elog br = mechanize.Browser() br.open(url) # form = br.select_form("form1") for i in range(4): log.debug(br.form.find_control(nr=i).name) br.form["uname"] = username br.form["upassword"] = password br.method = "POST" req = br.submit() # html_page = req.get_data() cookies = br._ua_handlers["_cookies"].cookiejar # get data req = requests.get( f"http://nectarcam.in2p3.fr/elog/nectarcam-data-qm/" f"?jcmd=&mode=Raw&attach=1&printable=1&reverse=0&reverse=1&npp=20&" f"ma=&da=&ya=&ha=&na=&ca=&last=&mb=&db=&yb=&hb=&nb=&cb=&Author=&" f"Setup=&Category=&Keyword=&Subject=%23{run_number}&" f"ModuleCount=&subtext=", cookies=cookies, ) else: # try to acces data by getting cookies from firefox and Chrome log.debug("try to get data with cookies from Firefox abnd Chrome") cookies = browser_cookie3.load() req = requests.get( f"http://nectarcam.in2p3.fr/elog/nectarcam-data-qm/?" f"jcmd=&mode=Raw&attach=1&printable=1&reverse=0&reverse=1" f"&npp=20&ma=&da=&ya=&ha=&na=&ca=&last=&mb=&db=&yb=&hb=" f"&nb=&cb=&Author=&Setup=&Category=&Keyword=" f"&Subject=%23{run_number}&ModuleCount=&subtext=", cookies=cookies, ) # if "<title>ELOG Login</title>" in req.text : lines = req.text.split("\r\n") url_data = None for i, line in enumerate(lines): if "<p>" in line: url_data = line.split("</p>")[0].split("FC:")[1] log.debug(f"url_data found {url_data}") break if i == len(lines) - 1: e = FileNotFoundError("lfns not found on GRID") log.error(e, exc_info=True) log.debug(lines) raise e if output_lfns: lfns = [] try: # Dirac from DIRAC.Interfaces.API.Dirac import Dirac dirac = Dirac() loc = ( f"/vo.cta.in2p3.fr/nectarcam/{url_data.split('/')[-2]}/" f"{url_data.split('/')[-1]}" ) log.debug(f"searching in Dirac filecatalog at {loc}") res = dirac.listCatalogDirectory(loc, printOutput=True) for key in res["Value"]["Successful"][loc]["Files"].keys(): if str(run_number) in key and "fits.fz" in key: lfns.append(key) except Exception as e: log.error(e, exc_info=True) return lfns else: return url_data @staticmethod def find_waveforms(run_number, max_events=None): return __class__.__find_computed_data( run_number=run_number, max_events=max_events, data_type="waveforms" ) @staticmethod def find_charges( run_number, method="FullWaveformSum", str_extractor_kwargs="", max_events=None ): return __class__.__find_computed_data( run_number=run_number, max_events=max_events, ext=f"_{method}_{str_extractor_kwargs}.h5", data_type="charges", ) @staticmethod def find_photostat( FF_run_number, ped_run_number, FF_method="FullWaveformSum", ped_method="FullWaveformSum", str_extractor_kwargs="", ): path = pathlib.Path( f"{os.environ.get('NECTARCAMDATA','/tmp')}/PhotoStat/" f"PhotoStatisticNectarCAM_FFrun{FF_run_number}_{FF_method}" f"_{str_extractor_kwargs}_Pedrun{ped_run_number}_{ped_method}.h5" ) full_file = glob.glob(str(path)) log.debug("for now it does not check if there are files with max events") if len(full_file) != 1: raise FileNotFoundError( f"When looking for {str(path)} : the found files are {full_file}" ) return full_file @staticmethod def find_SPE_combined( run_number, method="FullWaveformSum", str_extractor_kwargs="" ): return __class__.find_SPE_HHV( run_number=run_number, method=method, str_extractor_kwargs=str_extractor_kwargs, keyword="FlatFieldCombined", ) @staticmethod def find_SPE_nominal( run_number, method="FullWaveformSum", str_extractor_kwargs="", free_pp_n=False ): return __class__.find_SPE_HHV( run_number=run_number, method=method, str_extractor_kwargs=str_extractor_kwargs, free_pp_n=free_pp_n, keyword="FlatFieldSPENominal", ) @staticmethod def find_SPE_HHV( run_number, method="FullWaveformSum", str_extractor_kwargs="", free_pp_n=False, **kwargs, ): keyword = kwargs.get("keyword", "FlatFieldSPEHHV") std_key = "" if free_pp_n else "Std" path = pathlib.Path( f"{os.environ.get('NECTARCAMDATA','/tmp')}/SPEfit/" f"{keyword}{std_key}NectarCAM_run{run_number}*_{method}" f"_{str_extractor_kwargs}.h5" ) full_file = glob.glob(str(path)) if len(full_file) == 0: raise FileNotFoundError(f"No file found looking for {str(path)}") elif len(full_file) > 1: log.debug(f"Several files found for {str(path)} : {full_file}") for file in full_file: if "maxevents" not in file: log.debug( f"File found with the most important" f"number of events for {str(path)} : {file}" ) return file path = pathlib.Path( f"{os.environ.get('NECTARCAMDATA','/tmp')}/SPEfit/" f"{keyword}{std_key}NectarCAM_run{run_number}_maxevents*_" f"{method}_{str_extractor_kwargs}.h5" ) all_files = glob.glob(str(path)) if len(all_files) == 0: raise FileNotFoundError(f"No file found looking for {str(path)}") else: log.debug(f"Files found for {str(path)} : {all_files}") max_events = 0 for i, file in enumerate(all_files): data = file.split("/")[-1].split(".h5")[0].split("_") for _data in data: if "maxevents" in _data: _max_events = int(_data.split("maxevents")[-1]) break if _max_events >= max_events: max_events = _max_events index = i log.debug(f"Best file found : {all_files[index]}") return [all_files[index]] else: log.debug(f"File found for {str(path)} : {full_file}") return full_file @staticmethod def __find_computed_data( run_number, max_events=None, ext=".h5", data_type="waveforms" ): if max_events is not None: path = pathlib.Path( f"{os.environ.get('NECTARCAMDATA','/tmp')}/runs/" f"{data_type}/*_run{run_number}_maxevents*{ext}" ) else: path = pathlib.Path( f"{os.environ.get('NECTARCAMDATA','/tmp')}/runs/" f"{data_type}/*_run{run_number}{ext}" ) out = glob.glob(str(path)) if len(out) == 0: raise FileNotFoundError(f"No file found looking for {str(path)}") elif len(out) > 1: if max_events is None: raise FileExistsError(f"Several files found for {str(path)} : {out}") else: log.debug( f"Several files found for {str(path)} : {out}," f"will look for the most complete one" ) best_max_events = np.inf best_index = None for i, file in enumerate(out): data = file.split("/")[-1].split(".h5")[0].split("_") for _data in data: if "maxevents" in _data: _max_events = int(_data.split("maxevents")[-1]) break if _max_events >= max_events: if _max_events < best_max_events: best_max_events = _max_events best_index = i if best_index is not None: out = [out[best_index]] else: if max_events is not None: data = out[0].split("/")[-1].split(".h5")[0].split("_") for _data in data: if "maxevents" in _data: _max_events = int(_data.split("maxevents")[-1]) break if _max_events < max_events: raise FileNotFoundError( f"File found for {str(path)} : {out[0]} has less events " f"than max_events asked {max_events}" ) return out