Source code for desidatamodel.check

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
===================
desidatamodel.check
===================

Check actual files against the data model for validity.
"""
import os
import re
import itertools
from pathlib import Path
from sys import argv
from argparse import ArgumentParser

from desiutil.log import log, DEBUG

from . import DataModelError
from .stub import Stub
from .unit import validate_unit


class DataModel(object):
    """Simple object to store data model data and metadata.

    Parameters
    ----------
    filename : :class:`str` or :class:`pathlib.Path`
        The full path of the data model file.
    section : :class:`str` or :class:`pathlib.Path`
        The full path to the section of the data model containing the file.

    Raises
    ------
    TypeError
        If `filename` or `section` have an unexpected type.
    """
    # Marker for optional keywords and columns.
    _o = '[1]_'
    # A mapping of human-readable metavariables to regular expressions.
    _d2r = {'BACKUP': '(backup|supp)',  # used in desitarget with gaiadr2
            'BRICKNAME': '[0-9]+[pm][0-9]+',  # e.g. 3319p140
            'CAMERA': '[brz][0-9]',  # e.g. b0, r7
            'DR': 'dr[89]',  # Imaging release, used by desitarget
            'EXPID': '[0-9]{8}',  # zero-padded eight digit number.
            'GROUPID': '[0-9]+',  # Group id *directory* depending on type of GROUPTYPE
            # 'GROUPID': '([14]xsubset[1-6]|lowspeedsubset[1-6]|exp[0-9]{8}|thru[0-9]{8}|[0-9]{8})',  # Group id depending on type of GROUPTYPE
            'GROUPTYPE': '(1x_depth|4x_depth|lowspeed|cumulative|perexp|pernight)',  # Tile grouping, e.g. pernight, perexp
            'ITERATION': '[0-9]+',  # Iteration number when generating randoms by desitarget
            'NIGHT': '[0-9]{8}',  # YYYYMMDD
            'NSIDE': '[0-9]+',  # Healpix sides, e.g. 64
            'OBSCON': '(bright|dark|no-obscon)',  # observational condition used by desitarget
            'PHASE': '(cmx|sv1|sv2|sv3|main|main2)',  # DESI observational phase used by desitarget
            'PIXGROUP': '[0-9]+',  # Healpix group, e.g. 53
            'PIXPROD': '[a-z0-9_-]+',  # e.g. alpha-3
            'PIXNUM': '[0-9]+',  # Healpix pixel, e.g. 5302
            'PRODNAME': '[a-z0-9_-]+',  # e.g. dc3c
            'PROGRAM': '(backup|bright|dark|other)',  # observation program
            'RANN': '[0-9]+',  # Realization number for LSS random catalogs
            'REGION': '(north|south)',  # Imaging region from Legacy Survey and desitarget
            'RELEASE': '[edr0-9]+',  # Data Release
            'RESOLVE': '(resolve|noresolve|secondary)',  # resolve status used by desitarget
            'SEED': '[0-9]+',  # Seed number used when generating randoms by desitarget
            'SPECPROD': '[a-z0-9_-]+',  # replacement for PRODNAME
            'SPECTROGRAPH': '[0-9]',  # spectrograph number 0-9
            'SURVEY': '(cmx|main|special|sv1|sv2|sv3)',  # Survey name
            'TILEID': '[0-9]+',  # Tile ID, e.g. 70005 or 123456
            'TILEXX': '[0-9]{3}',  # Tile ID grouping == TILEID // 100. Used by fiberassign.
            'UnivUNUM': 'Univ[0-9][0-9][0-9]',  # Realizations of MTL ledgers, in LSS catalog
            'VERSION': '[v0-9.]+',  # A version string, e.g. v2.0
            }
    # Matches titles.
    _titleline = re.compile(r'=+\n([^=]+)\n=+\n', re.M)
    # Matches HDU section headers.
    _hduline = re.compile(r'HDU(\d+)$')
    # Match HDU range specifications.
    _hduspan = re.compile(r'HDU(\d+)[-: ]+HDU(\d+)$')
    # Matches lines that contain regular expressions.
    _regexpline = re.compile(r':?regexp?:', re.I)
    # Matches the file-type line.
    _filetypeline = re.compile(r':?file type?:', re.I)
    # Matches lines that contain cross-references.
    _refline = re.compile(r'See (:doc:|)`([^<]+)<([^>]+)>`_?')
    # Matches table borders.
    _tableboundary = re.compile(r'[= ]+$')
    # The list of file types allowed by the data model.
    _expectedtypes = ('ascii', 'csv', 'ecsv', 'fits', 'json', 'yaml')

    def __init__(self, filename, section):
        # Validate both arguments up front; the filename is checked first so
        # that its error takes precedence when both are wrong.
        if not isinstance(filename, (str, Path)):
            raise TypeError('Unexpected type for filename!')
        if not isinstance(section, (str, Path)):
            raise TypeError('Unexpected type for section!')
        self.filename = str(filename)
        self.section = str(section)
        # The path relative to the section is only used for logging.
        shortname = self.filename.replace(f'{self.section}/', '')
        log.debug('Creating DataModel for %s.', shortname)
        self.title = None
        self.ref = None
        self.regexp = None
        self.filetype = None
        self.filesize = None
        self.hdumeta = None
        self.prototype = None
        self._metafile_data = None
        self._stub = None
        self._stub_meta = None
        self._prototypes = None

    def _complain(self, error, message, *args):
        """Report a data model problem, either fatally or as a warning.

        Parameters
        ----------
        error : :class:`bool`
            If ``True``, log at critical level and raise an exception;
            otherwise only log a warning.
        message : :class:`str`
            A ``%``-style format string.
        *args
            Values interpolated into `message`.

        Raises
        ------
        :exc:`~desidatamodel.DataModelError`
            If `error` is set.
        """
        if error:
            log.critical(message, *args)
            raise DataModelError(message % args)
        log.warning(message, *args)

    def _scan_model_file(self, path, root, follow_reference=True):
        """Scan one data model file for the regexp and file type lines.

        Updates ``self.regexp``, ``self.filetype`` and ``self.filesize``
        in place, and ``self.ref`` when a cross-reference is followed.

        Parameters
        ----------
        path : :class:`str`
            The data model file to read.
        root : :class:`str`
            Path to real files on disk.
        follow_reference : :class:`bool`, optional
            If ``True``, a line starting with ``See :doc:`` is resolved into
            ``self.ref`` and scanning of `path` stops at that line.
        """
        with open(path) as dm:
            for line in dm:
                if follow_reference and line.startswith('See :doc:'):
                    self.ref = self._cross_reference(line)
                    log.debug("Cross reference detected %s -> %s.", self.filename, self.ref)
                    break
                if self._regexpline.match(line) is not None:
                    tokens = line.split()
                    # A regex line without a pattern is treated as "no regexp"
                    # and reported by get_regexp() instead of crashing here.
                    if len(tokens) > 1:
                        # The directory always derives from *this* model file,
                        # even when the line comes from a cross-referenced file.
                        d = os.path.dirname(self.filename).replace(self.section, root)
                        for metavar, pattern in self._d2r.items():
                            d = d.replace(metavar, pattern)
                        r = tokens[1].replace('``', '')
                        self.regexp = re.compile(os.path.join(d, r))
                        log.debug("%s", repr(self.regexp))
                if self._filetypeline.match(line) is not None:
                    self.filetype, self.filesize = self._type_size(line)

    def get_regexp(self, root, error=False):
        """Obtain the regular expression used to match files on disk.

        Also internally updates the file type, if detected.

        Parameters
        ----------
        root : :class:`str`
            Path to real files on disk.
        error : :class:`bool`, optional
            If ``True``, failure to find a regular expression raises an
            exception instead of just a warning.

        Returns
        -------
        regular expression
            The regular expression found, or ``None`` if not found.
            The regular expression is also stored internally.

        Raises
        ------
        :exc:`~desidatamodel.DataModelError`
            If `error` is set and problems with the data model file are
            detected.
        """
        self._scan_model_file(self.filename, root)
        if self.regexp is None and self.ref is not None:
            # Hopefully cross-references are not nested: references found
            # inside the referenced file are deliberately not followed.
            self._scan_model_file(self.ref, root, follow_reference=False)
        if self.regexp is None:
            self._complain(error, "%s has no file regexp!", self.filename)
        if self.filetype is None:
            self._complain(error, "%s has missing or invalid file type!", self.filename)
        elif self.filetype not in self._expectedtypes:
            log.warning("Unusual file type, %s, detected for %s!", self.filetype, self.filename)
        return self.regexp

    def _type_size(self, line):
        """Obtain file type and size from a matching `line`.

        Parameters
        ----------
        line : :class:`str`
            Line from file that contains the type and size.

        Returns
        -------
        :class:`tuple`
            A tuple containing the type (lower case) and the size. The size
            is ``'Unknown'`` when the line has no comma-separated size or
            the size contains no ``B``.
        """
        ts = line.lower().replace(':', '').replace('file type', '').strip().split(',')
        t = ts[0]
        try:
            size = ts[1].upper()
            i = size.index('B')
        except (ValueError, IndexError):
            s = 'Unknown'
        else:
            # Keep everything up to and including the first 'B' (e.g. '10 MB').
            s = size[:(i+1)].strip()
        return (t, s)

    def _cross_reference(self, line):
        """Obtain the path to a file referred to in another file.

        Parameters
        ----------
        line : :class:`str`
            Line from original file that *is* the cross-reference.

        Returns
        -------
        :class:`str`
            The path to the referenced file, or ``None`` if `line` is not a
            cross-reference or the referenced file does not exist.
            For links that are not ``:doc:`` links, ``#fragment`` is appended
            when the link contains one.
        """
        m = self._refline.match(line)
        if m is None:
            return None
        reftype, refstring, reflink = m.groups()
        here = os.path.dirname(self.filename)
        if reftype == ':doc:':
            r = os.path.abspath(os.path.join(here, reflink))
            if not r.endswith('.rst'):
                r += '.rst'
            return r if os.path.exists(r) else None
        rr = reflink.replace('.html', '.rst').split('#')
        r = os.path.abspath(os.path.join(here, rr[0]))
        if not os.path.exists(r):
            return None
        # A link without a fragment refers to the whole file.
        return r + '#' + rr[1] if len(rr) > 1 else r

    def _extract_columns(self, row, columns):
        """Given column sizes, extract the data in each column.

        Assumes a reStructuredText-compatible table.

        Parameters
        ----------
        row : :class:`str`
            A table row.
        columns : :class:`list`
            The sizes of the columns.

        Returns
        -------
        :func:`tuple`
            A tuple containing the extracted data.
        """
        # Columns are separated by exactly one character, hence the '+ i'.
        lbound = [0] + [sum(columns[:i])+i for i in range(1, len(columns))]
        ubound = [lo + c for lo, c in zip(lbound, columns)]
        # The last column extends to the end of the row.
        ubound[-1] = None
        return tuple(row[lo:hi].strip() for lo, hi in zip(lbound, ubound))

    def _parse_table(self, section, heading):
        """Parse the table that follows line number `heading` of `section`.

        Parameters
        ----------
        section : :class:`list`
            The lines of one HDU section.
        heading : :class:`int`
            Index in `section` of the line that introduces the table.

        Returns
        -------
        :class:`list`
            One tuple of cell values per table body row.
        """
        block = section[heading:]
        # Of the table border lines, the second closes the header row and
        # the third closes the table body.
        borders = [i for i, l in enumerate(block)
                   if self._tableboundary.match(l) is not None][1:3]
        columns = list(map(len, block[borders[0]].lstrip().split()))
        return [self._extract_columns(t.lstrip(), columns)
                for t in block[borders[0]+1:borders[1]]]

    def extract_metadata(self, error=False):
        """Extract metadata from a data model file.

        Parameters
        ----------
        error : :class:`bool`, optional
            If ``True``, failure to extract certain required metadata raises
            an exception.

        Returns
        -------
        :class:`dict`
            Metadata in a form similar to :class:`~desidatamodel.stub.Stub`
            metadata. The keys are the ``EXTNAME`` header values.

        Raises
        ------
        :exc:`~desidatamodel.DataModelError`
            If `error` is set and the HDU has no ``EXTNAME`` keyword, a type
            is missing or an HDU cross-reference cannot be resolved; or,
            regardless of `error`, if an HDU range refers to an unknown
            ``EXTNAME`` or the format of an HDU cannot be determined.
        """
        if self.hdumeta is not None:
            return self.hdumeta
        metafile = self.filename if self.ref is None else self.ref
        if self._metafile_data is None:
            with open(metafile) as f:
                self._metafile_data = f.read()
        if self.title is None:
            title = self._titleline.match(self._metafile_data)
            if title is not None:
                self.title = title.groups()[0]
        lines = self._metafile_data.split('\n')
        hdu_sections = [i for i, l in enumerate(lines)
                        if (self._hduline.match(l) is not None or
                            self._hduspan.match(l) is not None)]
        self.hdumeta = dict()
        # Each section runs up to the next HDU header, or to the end of file.
        section_ends = hdu_sections[1:] + [len(lines)]
        for k, (start, stop) in enumerate(zip(hdu_sections, section_ends)):
            section = lines[start:stop]
            span = self._hduspan.match(section[0])
            if span is not None:
                #
                # Detected HDU span.
                #
                spanstart, spanend = (int(g) for g in span.groups())
                log.debug('Detected range specification from HDU %d to HDU %d',
                          spanstart, spanend)
                spanref = [l for l in section if l.startswith('Data:')][0]
                spanext = spanref[spanref.lower().index('see') + 4:].replace('.', '')
                try:
                    spanmeta = self.hdumeta[spanext]
                except KeyError:
                    msg = "Cannot find EXTNAME = '%s' which is supposed to define HDU %d to HDU %d!"
                    log.critical(msg, spanext, spanstart, spanend)
                    raise DataModelError(msg % (spanext, spanstart, spanend))
                spanname = [l.split('=')[1].strip() for l in section
                            if l.startswith('EXTNAME = ')][0]
                extnames = [p.strip() for p in spanname.split(',')]
                numbers = range(spanstart, spanend+1)
                if len(numbers) == len(extnames):
                    for number, extname in zip(numbers, extnames):
                        self.hdumeta[extname] = {'number': number,
                                                 'title': 'HDU{0:d}'.format(number),
                                                 'extname': extname,
                                                 'extension': spanmeta['extension'],
                                                 'format': spanmeta['format'],
                                                 'keywords': spanmeta['keywords']}
                else:
                    log.warning(('Range specification from HDU %d to HDU %d ' +
                                 'does not have a matching EXTNAME specification!'),
                                spanstart, spanend)
                continue
            meta = dict()
            meta['number'] = k
            meta['title'] = section[0]
            hdu_cross_ref = [l for l in section if l.startswith('See `')]
            if hdu_cross_ref:
                log.debug("Found HDU cross-reference: %s", hdu_cross_ref[0])
                target = self._cross_reference(hdu_cross_ref[0])
                if target is None or '#' not in target:
                    # Without an existing file *and* an HDU fragment there is
                    # nothing to copy the metadata from.
                    self._complain(error, "Cannot resolve HDU cross-reference '%s' in %s!",
                                   hdu_cross_ref[0].strip(), metafile)
                    continue
                hcr = target.split('#')
                log.debug("['%s', '%s']", hcr[0], hcr[1])
                hcr_meta = DataModel(hcr[0], self.section).extract_metadata()
                for key in hcr_meta:
                    if hcr_meta[key]['title'] == hcr[1].upper():
                        for subkey in ('extension', 'format', 'keywords', 'extname'):
                            meta[subkey] = hcr_meta[key][subkey]
                        self.hdumeta[key] = meta
                continue
            if 'Empty HDU.' in section:
                meta['extension'] = 'IMAGE'
                meta['format'] = 'Empty HDU.'
            image_data = [l for l in section if l.startswith('Data:')]
            if image_data:
                meta['extension'] = 'IMAGE'
                meta['format'] = image_data[0]
            if 'Required Data Table Columns' in section:
                meta['extension'] = 'BINTABLE'
                meta['format'] = self._parse_table(section, section.index('Required Data Table Columns'))
                for mk in meta['format']:
                    if not mk[1]:
                        self._complain(error, "Missing type for column %s in HDU %d of %s!",
                                       mk[0], k, metafile)
                    if mk[2]:
                        bad_unit = validate_unit(mk[2], error=error)
                        if bad_unit:
                            log.debug("Non-standard (but acceptable) unit %s detected for column %s in HDU %d of %s.",
                                      bad_unit, mk[0], k, metafile)
            if 'Required Header Keywords' in section:
                meta['keywords'] = self._parse_table(section, section.index('Required Header Keywords'))
                for mk in meta['keywords']:
                    if not mk[2]:
                        self._complain(error, "Missing type for keyword %s in HDU %d of %s!",
                                       mk[0], k, metafile)
                    if mk[0] == 'BUNIT':
                        bad_unit = validate_unit(mk[1], error=error)
                        if bad_unit:
                            log.debug("Non-standard (but acceptable) unit %s detected for column %s in HDU %d of %s.",
                                      bad_unit, mk[0], k, metafile)
            else:
                meta['keywords'] = []
            #
            # Need to know the format by this point!
            #
            if 'format' not in meta:
                msg = "Unable to determine format for HDU %d in %s!"
                log.critical(msg, k, metafile)
                raise DataModelError(msg % (k, metafile))
            #
            # See https://github.com/desihub/desidatamodel/issues/69 for
            # the detailed policy on EXTNAME.
            #
            empty_primary = (k == 0 and meta['format'] == 'Empty HDU.')
            try:
                meta['extname'] = [l.split()[2] for l in section
                                   if l.startswith('EXTNAME = ')][0]
            except IndexError:
                meta['extname'] = 'HDU{0:02d}'.format(k)
                if not empty_primary:
                    self._complain(error, "HDU %d in %s has no EXTNAME!", k, metafile)
                elif len(meta['keywords']) > 0:
                    log.warning("HDU %d in %s should have EXTNAME = 'PRIMARY', since it has non-trivial keywords.",
                                k, metafile)
            else:
                #
                # If we reach here, meta['extname'] *is* defined.
                #
                if empty_primary:
                    if len(meta['keywords']) > 0 and meta['extname'] != 'PRIMARY':
                        log.debug("HDU %d in %s has acceptable alternative EXTNAME = '%s'.",
                                  k, metafile, meta['extname'])
                elif k == 0 and meta['extname'] == 'PRIMARY':
                    log.warning("HDU %d in %s should have a more meaningful EXTNAME than 'PRIMARY'.",
                                k, metafile)
            self.hdumeta[meta['extname']] = meta
        return self.hdumeta

    def validate_prototype(self, error=False, skip_keywords=False):
        """Compares a model's prototype data file to the data models.

        Parameters
        ----------
        error : :class:`bool`, optional
            If ``True``, failure to extract certain required metadata raises
            an exception.
        skip_keywords : :class:`bool`, optional
            If ``True``, don't check FITS header keywords

        Notes
        -----
        * Use set theory to compare the data headers to model headers. This
          should automatically find missing headers, extraneous headers, etc.
        """
        verifiable_extensions = ('.fits', '.fits.fz', '.fits.gz')
        if self._prototypes is None:
            #
            # A warning should have been issued already, so just skip silently.
            #
            return
        #
        # Currently, Stub() only works with FITS files, so don't try
        # to fully validate things that aren't FITS files.
        #
        if not any(p.endswith(verifiable_extensions) for p in self._prototypes):
            log.info("Prototypes for %s cannot be validated with current software, skipping.",
                     self.filename)
            return
        modelmeta = self.extract_metadata(error=error)
        if self._stub is None:
            # Use the first candidate that opens cleanly and has the
            # expected number of HDUs.
            for p in self._prototypes:
                try:
                    s = Stub(p, error=error)
                except OSError as err:
                    log.warning("Error opening %s, skipping to next candidate.", p)
                    log.warning("Message was: '%s'.", err.args[0])
                else:
                    log.debug("(s.nhdr = %s) == (len(modelmeta.keys()) = %s)",
                              s.nhdr, len(modelmeta.keys()))
                    if s.nhdr == len(modelmeta.keys()):
                        self.prototype = p
                        self._stub = s
                        break
                    log.warning("%s has the wrong number of "
                                "sections (HDUs) according to %s, "
                                "skipping to next candidate.", p, self.filename)
        # Also guard against a prototype name that was set without a stub.
        if self.prototype is None or self._stub is None:
            log.error("No useful prototype files found for %s!", self.filename)
            return
        log.info("Comparing %s to %s.", self.prototype, self.filename)
        stub_meta = self._stub_meta = self._stub.hdumeta
        #
        # Compare HDUs.
        #
        for i in range(self._stub.nhdr):
            dexex = stub_meta[i]['extname']
            if dexex == '' and i > 0:
                log.warning("Prototype file %s has no EXTNAME in HDU%d.", self.prototype, i)
            # Look the HDU up by EXTNAME, then by default name, then by number.
            modelhdumeta = modelmeta.get(dexex, modelmeta.get('HDU{0:02d}'.format(i)))
            if modelhdumeta is None:
                #
                # Fall back on trying to find HDU by number.
                #
                log.warning("Could not find EXTNAME = '%s' in %s; trying by HDU number.",
                            dexex, self.filename)
                for key in modelmeta:
                    if modelmeta[key]['number'] == i:
                        modelhdumeta = modelmeta[key]
            if modelhdumeta is None:
                # Never compare against metadata left over from another HDU.
                log.warning("Could not find HDU%d of prototype file %s in %s; skipping this HDU.",
                            i, self.prototype, self.filename)
                continue
            #
            # Check for EXTNAME
            #
            mexex = modelhdumeta['extname']
            if dexex != '' and mexex != '' and dexex != mexex:
                log.warning("Prototype file %s has an EXTNAME mismatch "
                            "in HDU%d (%s != %s) "
                            "according to %s.",
                            self.prototype, i, dexex, mexex, self.filename)
            #
            # Compare keywords
            #
            if not skip_keywords:
                data_keywords = set(tmp[0] for tmp in stub_meta[i]['keywords'])
                model_keywords = set(tmp[0].split()[0] for tmp in modelhdumeta['keywords']
                                     if self._o not in tmp[0])
                optional_keywords = set(tmp[0].split()[0] for tmp in modelhdumeta['keywords']
                                        if self._o in tmp[0])
                known_keywords = model_keywords | optional_keywords
                unknown_keywords = data_keywords - known_keywords
                if unknown_keywords:
                    log.warning('Prototype file %s has these keywords in HDU%d missing from model: %s',
                                self.prototype, i, str(unknown_keywords))
                absent_keywords = model_keywords - data_keywords
                if absent_keywords:
                    log.warning('Model file %s has these keywords in HDU%d missing from data: %s',
                                self.filename, i, str(absent_keywords))
                #
                # Compare the keywords that are in both sets.
                #
                for kw in data_keywords & known_keywords:
                    mkw_type = next(tmp[2] for tmp in modelhdumeta['keywords']
                                    if tmp[0].split()[0] == kw)
                    dkw_type = next(tmp[2] for tmp in stub_meta[i]['keywords']
                                    if tmp[0] == kw)
                    if mkw_type != dkw_type:
                        log.warning("File %s HDU%d keyword %s has different keyword type according to %s (%s != %s).",
                                    self.prototype, i, kw, self.filename, dkw_type, mkw_type)
            #
            # Check the extension type.
            #
            dex = stub_meta[i]['extension']
            mex = modelhdumeta.get('extension', "Extension type not found")
            if dex != mex:
                log.warning("Prototype file %s has an extension type "
                            "mismatch in HDU%d (%s != %s) "
                            "according to %s.",
                            self.prototype, i, dex, mex, self.filename)
                continue
            #
            # If the extension type is correct, check the contents of the
            # extension.
            #
            dexf = stub_meta[i]['format']
            mexf = modelhdumeta.get('format', "Extension format not found")
            if dex == 'IMAGE':
                # Only the part of the format before the first comma is compared.
                try:
                    icomma = dexf.index(',')
                except ValueError:
                    icomma = len(dexf)
                if dexf[:icomma] != mexf[:icomma]:
                    log.warning("Prototype file %s has an extension "
                                "format mismatch in HDU%d "
                                "according to %s.",
                                self.prototype, i, self.filename)
                continue
            dexf = dexf[1:]  # Get rid of header line.
            data_columns = set(tmp[0] for tmp in dexf)
            model_columns = set(tmp[0].split()[0] for tmp in mexf if self._o not in tmp[0])
            optional_columns = set(tmp[0].split()[0] for tmp in mexf if self._o in tmp[0])
            known_columns = model_columns | optional_columns
            #
            # The number of columns is not compared: all of the required
            # columns must be there, but some or all of the optional columns
            # may be there as well.
            #
            unknown_columns = data_columns - known_columns
            if unknown_columns:
                # Report exactly the columns that triggered the warning;
                # optional columns are part of the model.
                log.warning('Prototype file %s has these columns in HDU%d missing from model: %s',
                            self.prototype, i, str(unknown_columns))
            absent_columns = model_columns - data_columns
            if absent_columns:
                log.warning('Model file %s has these columns in HDU%d missing from data: %s',
                            self.filename, i, str(absent_columns))
            for column in data_columns & known_columns:
                mrow = next(tmp for tmp in mexf if tmp[0].split()[0] == column)
                drow = next(tmp for tmp in dexf if tmp[0] == column)
                #
                # Compare type
                #
                mcol_type, dcol_type = mrow[1], drow[1]
                if mcol_type != dcol_type:
                    if mcol_type == 'char[*]' and dcol_type[:4] == 'char':
                        log.debug("File %s HDU%d column %s has an acceptable variable-length string according to %s.",
                                  self.prototype, i, column, self.filename)
                    else:
                        log.warning("File %s HDU%d column %s has different type according to %s (%s != %s).",
                                    self.prototype, i, column, self.filename, dcol_type, mcol_type)
                #
                # Compare unit
                #
                mcol_unit, dcol_unit = mrow[2], drow[2]
                if mcol_unit != '' and dcol_unit != '' and mcol_unit != dcol_unit:
                    log.warning("File %s HDU%d column %s has different units according to %s (%s != %s).",
                                self.prototype, i, column, self.filename, dcol_unit, mcol_unit)
def scan_model(section):
    """Find all data model files in a top-level directory.

    Every ``.rst`` file below `section` is treated as a data model file,
    except files named ``index.rst``.

    Parameters
    ----------
    section : :class:`str`
        Full path to a section of the data model.

    Returns
    -------
    :class:`list`
        The data model files found, as :class:`DataModel` objects.
    """
    models = []
    for dirpath, _, filenames in os.walk(section):
        for name in filenames:
            if name == 'index.rst' or not name.endswith('.rst'):
                continue
            models.append(DataModel(os.path.join(dirpath, name), section))
    return models
def files_to_regexp(root, files, error=False):
    """Convert a list of data model files into a list of regular expressions.

    Each object in `files` has its ``get_regexp()`` method called; the
    results are stored on the objects themselves and nothing is returned.

    Parameters
    ----------
    root : :class:`str`
        Path to real files on disk.
    files : :class:`list`
        List of files obtained from the data model.
    error : :class:`bool`, optional
        If ``True``, failure to find a regular expression raises an
        exception instead of just a warning.

    Raises
    ------
    :exc:`~desidatamodel.DataModelError`
        If `error` is set and data model files with malformed
        regular expressions are detected.
    """
    for model in files:
        model.get_regexp(root, error)
def collect_files(root, files, n_prototypes=5):
    """Scan a directory tree for files that correspond to data model files.

    Parameters
    ----------
    root : :class:`str`
        Path to real files on disk.
    files : :class:`list`
        A list of data model files.
    n_prototypes : :class:`int`, optional
        Save up to `n_prototypes` possible prototype files, in case the
        first one is bad. Defaults to 5.

    Notes
    -----
    Files are analyzed using this algorithm:

    * The first `n_prototypes` files that matches a regexp become the
      'prototype candidates' for that data model file. The first candidate
      that can be opened cleanly is the 'prototype'.
    * If no files match a data model file, then files of that type are
      'missing'.
    * If a file does not match any regular expression, it is 'extraneous'.
    * If a file matches a regular expression that already has a prototype,
      it is 'ignored'.
    """
    ignore_directories = ('logs', 'scripts')
    include_extensions = ('.csv', '.ecsv', '.fits', '.fits.fz', '.fits.gz',
                          '.json', '.txt', '.yaml')
    for dirpath, dirnames, filenames in os.walk(root):
        # Prune in place so that os.walk() does not descend into these.
        for skip in ignore_directories:
            if skip in dirnames:
                dirnames.remove(skip)
        # Candidates are grouped by extension first, then by listing order.
        candidates = [name for ext in include_extensions
                      for name in filenames if name.endswith(ext)]
        for name in candidates:
            fullname = os.path.join(dirpath, name)
            matched = False
            for model in files:
                if model.regexp is None:
                    continue
                if model.regexp.match(fullname) is None:
                    continue
                matched = True
                if model._prototypes is None:
                    model._prototypes = [fullname]
                elif len(model._prototypes) < n_prototypes:
                    model._prototypes.append(fullname)
            if not matched:
                log.warning("Extraneous file detected: %s", fullname)
    #
    # Scan for missing files, but don't penalize (here) data models that
    # don't have a valid regular expression. Files with bad regexeps will
    # be flagged elsewhere.
    #
    for model in files:
        if model._prototypes is None and model.regexp is not None:
            log.warning("No files found matching %s!", model.filename)
def validate_prototypes(files, error=False, skip_keywords=False):
    """Compares a set of prototype data files to their data models.

    This simply calls ``validate_prototype()`` on every object in `files`,
    passing `error` and `skip_keywords` through as keyword arguments.

    Parameters
    ----------
    files : :class:`list`
        A list of data model files.
    error : :class:`bool`, optional
        If ``True``, failure to extract certain required metadata raises an
        exception.
    skip_keywords : :class:`bool`, optional
        If ``True``, don't check FITS header keywords

    Notes
    -----
    * Use set theory to compare the data headers to model headers. This
      should automatically find missing headers, extraneous headers, etc.
    """
    for model in files:
        model.validate_prototype(error=error, skip_keywords=skip_keywords)
[docs] def _options(): """Parse command-line options. Returns ------- :class:`~argparse.Namespace` The parsed options. """ desc = """Check actual files against the data model for validity. """ parser = ArgumentParser(description=desc, prog=os.path.basename(argv[0])) parser.add_argument('-d', '--datamodel-dir', dest='desidatamodel', metavar='DIR', help='Override the value of DESIDATAMODEL.') parser.add_argument('-F', '--compare-files', dest='files', action='store_true', help='Compare an individual data model to an individual file.') parser.add_argument('-K', '--skip-keywords', dest='skip_keywords', action='store_true', help="Don't check FITS header keywords") parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Set log level to DEBUG.') parser.add_argument('-W', '--warning-is-error', dest='error', action='store_true', help='Data model warnings raise exceptions.') parser.add_argument('section', metavar='MODEL_DIR_or_FILE', help='Section of the data model or individual model file.') parser.add_argument('directory', metavar='DATA_DIR_or_FILE', help='Check files in this top-level directory, or one individual file.') options = parser.parse_args() return options
def main():
    """Entry point for the check_model script.

    The data model root is taken from the ``-d/--datamodel-dir`` option if
    given, otherwise from the ``DESIDATAMODEL`` environment variable.

    Returns
    -------
    :class:`int`
        An integer suitable for passing to :func:`sys.exit`.
    """
    options = _options()
    if options.verbose:
        log.setLevel(DEBUG)
    # The command-line option is documented as an *override*, so it must
    # take precedence over the environment variable.
    if options.desidatamodel is not None:
        data_model_root = options.desidatamodel
    elif 'DESIDATAMODEL' in os.environ:
        data_model_root = os.environ['DESIDATAMODEL']
    else:
        log.critical(("DESIDATAMODEL is not defined. " +
                      "Cannot find data model files!"))
        return 1
    log.debug("DESIDATAMODEL=%s", data_model_root)
    if options.files:
        filename = os.path.join(data_model_root, 'doc', options.section)
        section = os.path.join(data_model_root, 'doc', options.section.split('/')[0])
        log.info("Loading individual data model: %s.", filename)
        files = [DataModel(filename, section)]
        # The data file is named explicitly, so no regular expression is
        # needed to find it.
        log.info("Skipping regular expression processing.")
        log.info("Setting prototype file for %s to %s.", filename, options.directory)
        # validate_prototype() only looks at the candidate list; register the
        # file there so that it is actually opened and compared.
        files[0]._prototypes = [options.directory]
    else:
        section = os.path.join(data_model_root, 'doc', options.section)
        log.info("Loading data model file in %s.", section)
        files = scan_model(section)
        log.info("Searching for data files in %s.", options.directory)
        files_to_regexp(options.directory, files, error=options.error)
        log.info("Identifying prototype files in %s.", options.directory)
        collect_files(options.directory, files)
    validate_prototypes(files, error=options.error, skip_keywords=options.skip_keywords)
    return 0