# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
===================
desidatamodel.check
===================
Check actual files against the data model for validity.
"""
import os
import re
import itertools
from pathlib import Path
from sys import argv
from argparse import ArgumentParser
from desiutil.log import log, DEBUG
from . import DataModelError
from .stub import Stub
from .unit import validate_unit
[docs]
class DataModel(object):
    """Simple object to store data model data and metadata.

    Parameters
    ----------
    filename : :class:`str` or :class:`pathlib.Path`
        The full path of the data model file.
    section : :class:`str` or :class:`pathlib.Path`
        The full path to the section of the data model containing the file.

    Raises
    ------
    TypeError
        If `filename` or `section` have an unexpected type.
    """
    # Marker for optional keywords and columns.
    _o = '[1]_'
    # A mapping of human-readable metavariables to regular expressions.
    _d2r = {'BACKUP': '(backup|supp)',  # used in desitarget with gaiadr2
            'BRICKNAME': '[0-9]+[pm][0-9]+',  # e.g. 3319p140
            'CAMERA': '[brz][0-9]',  # e.g. b0, r7
            'DR': 'dr[89]',  # Imaging release, used by desitarget
            'EXPID': '[0-9]{8}',  # zero-padded eight digit number.
            'GROUPID': '[0-9]+',  # Group id *directory* depending on type of GROUPTYPE
            # 'GROUPID': '([14]xsubset[1-6]|lowspeedsubset[1-6]|exp[0-9]{8}|thru[0-9]{8}|[0-9]{8})',  # Group id depending on type of GROUPTYPE
            'GROUPTYPE': '(1x_depth|4x_depth|lowspeed|cumulative|perexp|pernight)',  # Tile grouping, e.g. pernight, perexp
            'ITERATION': '[0-9]+',  # Iteration number when generating randoms by desitarget
            'NIGHT': '[0-9]{8}',  # YYYYMMDD
            'NSIDE': '[0-9]+',  # Healpix sides, e.g. 64
            'OBSCON': '(bright|dark|no-obscon)',  # observational condition used by desitarget
            'PHASE': '(cmx|sv1|sv2|sv3|main|main2)',  # DESI observational phase used by desitarget
            'PIXGROUP': '[0-9]+',  # Healpix group, e.g. 53
            'PIXPROD': '[a-z0-9_-]+',  # e.g. alpha-3
            'PIXNUM': '[0-9]+',  # Healpix pixel, e.g. 5302
            'PRODNAME': '[a-z0-9_-]+',  # e.g. dc3c
            'PROGRAM': '(backup|bright|dark|other)',  # observation program
            'RANN': '[0-9]+',  # Realization number for LSS random catalogs
            'REGION': '(north|south)',  # Imaging region from Legacy Survey and desitarget
            'RELEASE': '[edr0-9]+',  # Data Release
            'RESOLVE': '(resolve|noresolve|secondary)',  # resolve status used by desitarget
            'SEED': '[0-9]+',  # Seed number used when generating randoms by desitarget
            'SPECPROD': '[a-z0-9_-]+',  # replacement for PRODNAME
            'SPECTROGRAPH': '[0-9]',  # spectrograph number 0-9
            'SURVEY': '(cmx|main|special|sv1|sv2|sv3)',  # Survey name
            'TILEID': '[0-9]+',  # Tile ID, e.g. 70005 or 123456
            'TILEXX': '[0-9]{3}',  # Tile ID grouping == TILEID // 100. Used by fiberassign.
            'UnivUNUM': 'Univ[0-9][0-9][0-9]',  # Realizations of MTL ledgers, in LSS catalog
            'VERSION': '[v0-9.]+',  # A version string, e.g. v2.0
            }
    # Matches titles.
    _titleline = re.compile(r'=+\n([^=]+)\n=+\n', re.M)
    # Matches HDU section headers.
    _hduline = re.compile(r'HDU(\d+)$')
    # Match HDU range specifications.
    _hduspan = re.compile(r'HDU(\d+)[-: ]+HDU(\d+)$')
    # Matches lines that contain regular expressions.
    _regexpline = re.compile(r':?regexp?:', re.I)
    # Matches the file-type line.
    _filetypeline = re.compile(r':?file type?:', re.I)
    # Matches lines that contain cross-references.
    _refline = re.compile(r'See (:doc:|)`([^<]+)<([^>]+)>`_?')
    # Matches table borders.
    _tableboundary = re.compile(r'[= ]+$')
    # The list of file types allowed by the data model.
    _expectedtypes = ('ascii', 'csv', 'ecsv', 'fits', 'json', 'yaml')

    def __init__(self, filename, section):
        # Validate both arguments before assigning any state, so that a
        # TypeError never leaves a partially initialized object behind.
        if not isinstance(filename, (str, Path)):
            raise TypeError('Unexpected type for filename!')
        if not isinstance(section, (str, Path)):
            raise TypeError('Unexpected type for section!')
        self.filename = str(filename)
        self.section = str(section)
        shortname = self.filename.replace(f'{self.section}/', '')
        log.debug('Creating DataModel for %s.', shortname)
        self.title = None
        self.ref = None
        self.regexp = None
        self.filetype = None
        self.filesize = None
        self.hdumeta = None
        self.prototype = None
        self._metafile_data = None
        self._stub = None
        self._stub_meta = None
        self._prototypes = None
        return

    def get_regexp(self, root, error=False):
        """Obtain the regular expression used to match files on disk.

        Also internally updates the file type, if detected.

        Parameters
        ----------
        root : :class:`str`
            Path to real files on disk.
        error : :class:`bool`, optional
            If ``True``, failure to find a regular expression raises an
            exception instead of just a warning.

        Returns
        -------
        regular expression
            The regular expression found, or ``None`` if not found.
            The regular expression is also stored internally.

        Raises
        ------
        :exc:`~desidatamodel.DataModelError`
            If `error` is set and problems with the data model file are
            detected.
        """
        self._scan_for_regexp(self.filename, root, follow_reference=True)
        if self.regexp is None and self.ref is not None:
            #
            # Hopefully cross-references are not nested, so the referenced
            # file is scanned without looking for further references.
            #
            self._scan_for_regexp(self.ref, root, follow_reference=False)
        if self.regexp is None:
            m = "%s has no file regexp!"
            if error:
                log.critical(m, self.filename)
                raise DataModelError(m % self.filename)
            else:
                log.warning(m, self.filename)
        if self.filetype is None:
            m = "%s has missing or invalid file type!"
            if error:
                log.critical(m, self.filename)
                raise DataModelError(m % self.filename)
            else:
                log.warning(m, self.filename)
        elif self.filetype not in self._expectedtypes:
            log.warning("Unusual file type, %s, detected for %s!", self.filetype, self.filename)
        return self.regexp

    def _directory_pattern(self, root):
        """Convert the directory of the data model file into a pattern.

        The section prefix is swapped for `root`, then every metavariable
        listed in ``_d2r`` is replaced by its regular expression.

        Parameters
        ----------
        root : :class:`str`
            Path to real files on disk.

        Returns
        -------
        :class:`str`
            The directory part of the file-matching regular expression.
        """
        d = os.path.dirname(self.filename).replace(self.section, root)
        for metavariable, pattern in self._d2r.items():
            d = d.replace(metavariable, pattern)
        return d

    def _scan_for_regexp(self, path, root, follow_reference):
        """Scan one data model file for its regexp and file-type lines.

        Updates ``regexp``, ``filetype``, ``filesize`` and, when
        `follow_reference` is set, ``ref``.

        Parameters
        ----------
        path : :class:`str`
            The file to read.
        root : :class:`str`
            Path to real files on disk.
        follow_reference : :class:`bool`
            If ``True``, a line starting with ``See :doc:`` is resolved into
            ``ref`` and ends the scan of this file.
        """
        with open(path) as dm:
            for line in dm:
                if follow_reference and line.startswith('See :doc:'):
                    self.ref = self._cross_reference(line)
                    log.debug("Cross reference detected %s -> %s.",
                              self.filename, self.ref)
                    break
                if self._regexpline.match(line) is not None:
                    tokens = line.split()
                    #
                    # A regexp line with no pattern after the label is
                    # skipped; the caller reports the missing regexp.
                    #
                    if len(tokens) > 1:
                        r = tokens[1].replace('``', '')
                        self.regexp = re.compile(os.path.join(self._directory_pattern(root), r))
                        log.debug("%s", repr(self.regexp))
                if self._filetypeline.match(line) is not None:
                    self.filetype, self.filesize = self._type_size(line)
        return

    def _type_size(self, line):
        """Obtain file type and size from a matching `line`.

        Parameters
        ----------
        line : :class:`str`
            Line from file that contains the type and size.

        Returns
        -------
        :class:`tuple`
            A tuple containing the type and size.  The size is ``'Unknown'``
            when no comma-separated size ending in ``B`` is present.
        """
        ts = line.lower().replace(':', '').replace('file type', '').strip().split(',')
        t = ts[0]
        s = 'Unknown'
        if len(ts) > 1:
            size = ts[1].upper()
            # Keep everything up to and including the first 'B' (e.g. '1 MB').
            i = size.find('B')
            if i >= 0:
                s = size[:(i+1)].strip()
        return (t, s)

    def _cross_reference(self, line):
        """Obtain the path to a file referred to in another file.

        Parameters
        ----------
        line : :class:`str`
            Line from original file that *is* the cross-reference.

        Returns
        -------
        :class:`str`
            The path to the referenced file, or ``None`` if the line is not
            a cross-reference or the referenced file does not exist.
        """
        ref = None
        m = self._refline.match(line)
        if m is not None:
            reftype, refstring, reflink = m.groups()
            here = os.path.dirname(self.filename)
            if reftype == ':doc:':
                r = os.path.abspath(os.path.join(here, reflink))
                if not r.endswith('.rst'):
                    r += '.rst'
                if os.path.exists(r):
                    ref = r
            else:
                #
                # Plain hyperlinks point at rendered HTML and may or may not
                # carry a '#anchor'; only re-attach the anchor if present.
                #
                target, _, anchor = reflink.replace('.html', '.rst').partition('#')
                r = os.path.abspath(os.path.join(here, target))
                if os.path.exists(r):
                    ref = r + '#' + anchor if anchor else r
        return ref

    def _extract_columns(self, row, columns):
        """Given column sizes, extract the data in each column.

        Assumes a reStructuredText-compatible table, i.e. columns are
        separated by exactly one character and the final column runs to the
        end of the row.

        Parameters
        ----------
        row : :class:`str`
            A table row.
        columns : :class:`list`
            The sizes of the columns.

        Returns
        -------
        :func:`tuple`
            A tuple containing the extracted data.
        """
        data = []
        start = 0
        last = len(columns) - 1
        for i, width in enumerate(columns):
            stop = None if i == last else start + width
            data.append(row[start:stop].strip())
            start += width + 1  # skip the single separator character
        return tuple(data)

    def validate_prototype(self, error=False, skip_keywords=False):
        """Compares a model's prototype data file to the data models.

        Parameters
        ----------
        error : :class:`bool`, optional
            If ``True``, failure to extract certain required metadata raises an
            exception.
        skip_keywords : :class:`bool`, optional
            If ``True``, don't check FITS header keywords

        Notes
        -----
        * Use set theory to compare the data headers to model headers.  This should
          automatically find missing headers, extraneous headers, etc.
        * If no candidate list was collected but ``prototype`` was set
          directly, that single file is used as the only candidate.
        """
        verifiable_extensions = ('.fits', '.fits.fz', '.fits.gz')
        candidates = self._prototypes
        if candidates is None and self.prototype is not None:
            candidates = [self.prototype]
        if candidates is None:
            #
            # A warning should have been issued already, so just skip silently.
            #
            return
        #
        # Currently, Stub() only works with FITS files, so don't try
        # to fully validate things that aren't FITS files.
        #
        if not any(p.endswith(verifiable_extensions) for p in candidates):
            log.info("Prototypes for %s cannot be validated with current software, skipping.", self.filename)
            return
        modelmeta = self.extract_metadata(error=error)
        if self._stub is None:
            self._select_prototype(candidates, len(modelmeta), error)
        if self._stub is None or self.prototype is None:
            log.error("No useful prototype files found for %s!", self.filename)
            return
        log.info("Comparing %s to %s.", self.prototype, self.filename)
        stub_meta = self._stub_meta = self._stub.hdumeta
        #
        # Compare HDUs.
        #
        for i in range(self._stub.nhdr):
            hdu = stub_meta[i]
            dexex = hdu['extname']
            if dexex == '' and i > 0:
                log.warning("Prototype file %s has no EXTNAME in HDU%d.",
                            self.prototype, i)
            modelhdumeta = self._find_model_hdu(modelmeta, dexex, i)
            if modelhdumeta is None:
                log.warning("Could not find HDU%d (EXTNAME = '%s') in %s, skipping this HDU.",
                            i, dexex, self.filename)
                continue
            #
            # Check for EXTNAME
            #
            mexex = modelhdumeta['extname']
            if (dexex != '' and mexex != '' and dexex != mexex):
                log.warning("Prototype file %s has an EXTNAME mismatch " +
                            "in HDU%d (%s != %s) " +
                            "according to %s.",
                            self.prototype, i, dexex, mexex, self.filename)
            #
            # Compare keywords
            #
            if not skip_keywords:
                self._compare_keywords(i, hdu['keywords'], modelhdumeta['keywords'])
            #
            # Check the extension type.
            #
            dex = hdu['extension']
            try:
                mex = modelhdumeta['extension']
            except KeyError:
                mex = "Extension type not found"
            if dex != mex:
                log.warning("Prototype file %s has an extension type " +
                            "mismatch in HDU%d (%s != %s) " +
                            "according to %s.",
                            self.prototype, i, dex, mex, self.filename)
                continue
            #
            # If the extension type is correct, check the contents of the
            # extension.
            #
            dexf = hdu['format']
            try:
                mexf = modelhdumeta['format']
            except KeyError:
                mexf = None
            if dex == 'IMAGE':
                if mexf is None:
                    mexf = "Extension format not found"
                try:
                    icomma = dexf.index(',')
                except ValueError:
                    icomma = len(dexf)
                if dexf[:icomma] != mexf[:icomma]:
                    log.warning("Prototype file %s has an extension " +
                                "format mismatch in HDU%d " +
                                "according to %s.",
                                self.prototype, i, self.filename)
            elif mexf is None:
                #
                # Without a model column table there is nothing to compare.
                #
                log.warning("Prototype file %s has an extension " +
                            "format mismatch in HDU%d " +
                            "according to %s.",
                            self.prototype, i, self.filename)
            else:
                self._compare_columns(i, dexf, mexf)
        return

    def _select_prototype(self, candidates, n_model_hdus, error):
        """Pick the first candidate file whose HDU count matches the model.

        On success ``prototype`` and ``_stub`` are set; otherwise ``_stub``
        is left as ``None``.

        Parameters
        ----------
        candidates : :class:`list`
            Names of candidate prototype files.
        n_model_hdus : :class:`int`
            The number of HDUs described by the data model file.
        error : :class:`bool`
            Passed through to ``Stub``.
        """
        for p in candidates:
            try:
                s = Stub(p, error=error)
            except OSError as err:
                log.warning("Error opening %s, skipping to next candidate.", p)
                log.warning("Message was: '%s'.", err.args[0] if err.args else err)
            else:
                log.debug("(s.nhdr = %s) == (len(modelmeta.keys()) = %s)",
                          s.nhdr, n_model_hdus)
                if s.nhdr == n_model_hdus:
                    self.prototype = p
                    self._stub = s
                    break
                else:
                    log.warning("%s has the wrong number of " +
                                "sections (HDUs) according to %s, " +
                                "skipping to next candidate.",
                                p, self.filename)
        return

    def _find_model_hdu(self, modelmeta, extname, number):
        """Find the model metadata that describes one HDU of the data file.

        The lookup is attempted by ``EXTNAME``, then by the key ``HDUnn``,
        then by the ``number`` entry of every model HDU.

        Parameters
        ----------
        modelmeta : :class:`dict`
            Model metadata keyed by HDU name.
        extname : :class:`str`
            The ``EXTNAME`` found in the data file.
        number : :class:`int`
            The HDU number in the data file.

        Returns
        -------
        :class:`dict`
            The matching model HDU metadata, or ``None`` if nothing matches.
        """
        for key in (extname, 'HDU{0:02d}'.format(number)):
            try:
                return modelmeta[key]
            except KeyError:
                continue
        #
        # Fall back on trying to find HDU by number.
        #
        log.warning("Could not find EXTNAME = '%s' in %s; trying by HDU number.", extname, self.filename)
        found = None
        for key in modelmeta:
            if modelmeta[key]['number'] == number:
                found = modelmeta[key]
        return found

    def _compare_keywords(self, i, data, model):
        """Compare the header keywords of HDU `i` between data and model.

        Parameters
        ----------
        i : :class:`int`
            HDU number, used in log messages.
        data : :class:`list`
            Keyword rows from the data file; item 0 is the name, item 2 the type.
        model : :class:`list`
            Keyword rows from the data model; optional keywords carry the
            ``[1]_`` marker in item 0.
        """
        data_keywords = {tmp[0] for tmp in data}
        model_keywords = {tmp[0].split()[0] for tmp in model if self._o not in tmp[0]}
        optional_keywords = {tmp[0].split()[0] for tmp in model if self._o in tmp[0]}
        known_keywords = model_keywords | optional_keywords
        unknown = data_keywords - known_keywords
        if len(unknown) > 0:
            log.warning('Prototype file %s has these keywords in HDU%d missing from model: %s',
                        self.prototype, i, str(unknown))
        missing = model_keywords - data_keywords
        if len(missing) > 0:
            log.warning('Model file %s has these keywords in HDU%d missing from data: %s',
                        self.filename, i, str(missing))
        #
        # Compare the keywords that are in both sets.
        #
        for kw in data_keywords & known_keywords:
            mkw_type = next(tmp[2] for tmp in model if tmp[0].split()[0] == kw)
            dkw_type = next(tmp[2] for tmp in data if tmp[0] == kw)
            if mkw_type != dkw_type:
                log.warning("File %s HDU%d keyword %s has different keyword type according to %s (%s != %s).",
                            self.prototype, i, kw, self.filename, dkw_type, mkw_type)
        return

    def _compare_columns(self, i, dexf, mexf):
        """Compare the table columns of HDU `i` between data and model.

        Parameters
        ----------
        i : :class:`int`
            HDU number, used in log messages.
        dexf : :class:`list`
            Column rows from the data file, preceded by one header row;
            items 0, 1, 2 are name, type and unit.
        mexf : :class:`list`
            Column rows from the data model; optional columns carry the
            ``[1]_`` marker in item 0.
        """
        dexf = dexf[1:]  # Get rid of header line.
        data_columns = {tmp[0] for tmp in dexf}
        model_columns = {tmp[0].split()[0] for tmp in mexf if self._o not in tmp[0]}
        optional_columns = {tmp[0].split()[0] for tmp in mexf if self._o in tmp[0]}
        known_columns = model_columns | optional_columns
        #
        # The number of columns is deliberately not compared: all required
        # columns must be present, but any subset of the optional columns
        # may be present as well.
        #
        unknown = data_columns - known_columns
        if len(unknown) > 0:
            log.warning('Prototype file %s has these columns in HDU%d missing from model: %s',
                        self.prototype, i, str(unknown))
        missing = model_columns - data_columns
        if len(missing) > 0:
            log.warning('Model file %s has these columns in HDU%d missing from data: %s',
                        self.filename, i, str(missing))
        for column in data_columns & known_columns:
            mrow = next(tmp for tmp in mexf if tmp[0].split()[0] == column)
            drow = next(tmp for tmp in dexf if tmp[0] == column)
            #
            # Compare type
            #
            mcol_type = mrow[1]
            dcol_type = drow[1]
            if mcol_type != dcol_type:
                if mcol_type == 'char[*]' and dcol_type[:4] == 'char':
                    log.debug("File %s HDU%d column %s has an acceptable variable-length string according to %s.",
                              self.prototype, i, column, self.filename)
                else:
                    log.warning("File %s HDU%d column %s has different type according to %s (%s != %s).",
                                self.prototype, i, column, self.filename, dcol_type, mcol_type)
            #
            # Compare unit
            #
            mcol_unit = mrow[2]
            dcol_unit = drow[2]
            if mcol_unit != '' and dcol_unit != '' and mcol_unit != dcol_unit:
                log.warning("File %s HDU%d column %s has different units according to %s (%s != %s).",
                            self.prototype, i, column, self.filename, dcol_unit, mcol_unit)
        return
[docs]
def scan_model(section):
    """Find all data model files in a top-level directory.

    Every ``*.rst`` file below `section`, except files named ``index.rst``,
    is wrapped in a :class:`DataModel` object.

    Parameters
    ----------
    section : :class:`str`
        Full path to a section of the data model.

    Returns
    -------
    :class:`list`
        The data model files found.
    """
    found = []
    for dirpath, _dirnames, filenames in os.walk(section):
        for name in filenames:
            if name == 'index.rst' or not name.endswith('.rst'):
                continue
            found.append(DataModel(os.path.join(dirpath, name), section))
    return found
[docs]
def files_to_regexp(root, files, error=False):
    """Convert a list of data model files into a list of regular expressions.

    Each object in `files` has its ``get_regexp`` method called; the
    resulting expressions are stored on the objects, nothing is returned.

    Parameters
    ----------
    root : :class:`str`
        Path to real files on disk.
    files : :class:`list`
        List of files obtained from the data model.
    error : :class:`bool`, optional
        If ``True``, failure to find a regular expression raises an
        exception instead of just a warning.

    Raises
    ------
    :exc:`~desidatamodel.DataModelError`
        If `error` is set and data model files with malformed regular
        expressions are detected.
    """
    for model in files:
        model.get_regexp(root, error)
    return None
[docs]
def collect_files(root, files, n_prototypes=5):
    """Scan a directory tree for files that correspond to data model files.

    Parameters
    ----------
    root : :class:`str`
        Path to real files on disk.
    files : :class:`list`
        A list of data model files.
    n_prototypes : :class:`int`, optional
        Save up to `n_prototypes` possible prototype files, in case the
        first one is bad. Defaults to 5.

    Notes
    -----
    Files are analyzed using this algorithm:

    * The first `n_prototypes` files that match a regexp become the
      'prototype candidates' for that data model file. The first candidate
      that can be opened cleanly is the 'prototype'.
    * If no files match a data model file, then files of that type are
      'missing'.
    * If a file does not match any regular expression, it is 'extraneous'.
    * If a file matches a regular expression that already has a prototype,
      it is 'ignored'.

    Directories named ``logs`` or ``scripts`` are not descended into, and
    only files with a recognized extension are considered.
    """
    skipped_directories = ('logs', 'scripts')
    wanted_extensions = ('.csv', '.ecsv',
                         '.fits', '.fits.fz', '.fits.gz',
                         '.json', '.txt', '.yaml')
    for dirpath, dirnames, filenames in os.walk(root):
        # Pruning dirnames in place stops os.walk from descending into them.
        for skip in skipped_directories:
            if skip in dirnames:
                dirnames.remove(skip)
        # Grouped by extension first, then in directory-listing order.
        candidates = [name
                      for extension in wanted_extensions
                      for name in filenames
                      if name.endswith(extension)]
        for name in candidates:
            fullname = os.path.join(dirpath, name)
            matched = False
            for model in files:
                if model.regexp is None or model.regexp.match(fullname) is None:
                    continue
                matched = True
                if model._prototypes is None:
                    model._prototypes = [fullname]
                elif len(model._prototypes) < n_prototypes:
                    model._prototypes.append(fullname)
            if not matched:
                log.warning("Extraneous file detected: %s", fullname)
    #
    # Scan for missing files, but don't penalize (here) data models that
    # don't have a valid regular expression.  Files with bad regexps will
    # be flagged elsewhere.
    #
    for model in files:
        if model.regexp is not None and model._prototypes is None:
            log.warning("No files found matching %s!", model.filename)
    return
[docs]
def validate_prototypes(files, error=False, skip_keywords=False):
    """Compares a set of prototype data files to their data models.

    This simply calls ``validate_prototype`` on every object in `files`
    with the same options.

    Parameters
    ----------
    files : :class:`list`
        A list of data model files.
    error : :class:`bool`, optional
        If ``True``, failure to extract certain required metadata raises an
        exception.
    skip_keywords : :class:`bool`, optional
        If ``True``, don't check FITS header keywords

    Notes
    -----
    * Use set theory to compare the data headers to model headers.  This should
      automatically find missing headers, extraneous headers, etc.
    """
    options = {'error': error, 'skip_keywords': skip_keywords}
    for model in files:
        model.validate_prototype(**options)
    return None
[docs]
def _options():
    """Parse command-line options.

    The arguments are read from :data:`sys.argv`.

    Returns
    -------
    :class:`~argparse.Namespace`
        The parsed options.
    """
    desc = """Check actual files against the data model for validity.
"""
    # Each entry is (positional names/flags, keyword arguments), in the
    # order the arguments should appear in the help text.
    specifications = (
        (('-d', '--datamodel-dir'),
         dict(dest='desidatamodel', metavar='DIR',
              help='Override the value of DESIDATAMODEL.')),
        (('-F', '--compare-files'),
         dict(dest='files', action='store_true',
              help='Compare an individual data model to an individual file.')),
        (('-K', '--skip-keywords'),
         dict(dest='skip_keywords', action='store_true',
              help="Don't check FITS header keywords")),
        (('-v', '--verbose'),
         dict(dest='verbose', action='store_true',
              help='Set log level to DEBUG.')),
        (('-W', '--warning-is-error'),
         dict(dest='error', action='store_true',
              help='Data model warnings raise exceptions.')),
        (('section',),
         dict(metavar='MODEL_DIR_or_FILE',
              help='Section of the data model or individual model file.')),
        (('directory',),
         dict(metavar='DATA_DIR_or_FILE',
              help='Check files in this top-level directory, or one individual file.')),
    )
    parser = ArgumentParser(description=desc, prog=os.path.basename(argv[0]))
    for names, keywords in specifications:
        parser.add_argument(*names, **keywords)
    return parser.parse_args()
[docs]
def main():
    """Entry point for the check_model script.

    The data model root is taken from ``--datamodel-dir`` when given,
    otherwise from the ``DESIDATAMODEL`` environment variable.

    Returns
    -------
    :class:`int`
        An integer suitable for passing to :func:`sys.exit`.
    """
    options = _options()
    if options.verbose:
        log.setLevel(DEBUG)
    #
    # The command-line option is documented as overriding DESIDATAMODEL,
    # so it takes precedence over the environment.
    #
    data_model_root = options.desidatamodel
    if data_model_root is None:
        data_model_root = os.environ.get('DESIDATAMODEL')
    if data_model_root is None:
        log.critical(("DESIDATAMODEL is not defined. " +
                      "Cannot find data model files!"))
        return 1
    log.debug("DESIDATAMODEL=%s", data_model_root)
    if options.files:
        filename = os.path.join(data_model_root, 'doc', options.section)
        section = os.path.join(data_model_root, 'doc', options.section.split('/')[0])
        log.info("Loading individual data model: %s.", filename)
        files = [DataModel(filename, section)]
        log.info("Skipping regular expression processing.")
        log.info("Setting prototype file for %s to %s.", filename, options.directory)
        #
        # Validation works from the list of prototype candidates, so the
        # single file has to be registered there; setting only the
        # ``prototype`` attribute causes validation to be skipped silently.
        #
        files[0]._prototypes = [options.directory]
    else:
        section = os.path.join(data_model_root, 'doc', options.section)
        log.info("Loading data model file in %s.", section)
        files = scan_model(section)
        log.info("Searching for data files in %s.", options.directory)
        files_to_regexp(options.directory, files, error=options.error)
        log.info("Identifying prototype files in %s.", options.directory)
        collect_files(options.directory, files)
    validate_prototypes(files, error=options.error, skip_keywords=options.skip_keywords)
    return 0