#!/usr/bin/env python
# coding=utf-8
import inspect
import logging
from typing import List
import json
from pathlib import Path
from importlib import import_module
import numpy as np
import pandas as pd
import h5netcdf.legacyapi as nc4
from ioos_qc.config import Config
from ioos_qc.qartod import aggregate
from ioos_qc.utils import GeoNumpyDateEncoder, cf_safe_name
from ioos_qc.results import collect_results, CollectedResult
L = logging.getLogger(__name__) # noqa
def column_from_collected_result(cr):
stream_label = f'{cr.stream_id}.' if cr.stream_id else ''
package_label = f'{cr.package}.' if cr.package else ''
test_label = f'{cr.test}' if cr.test else ''
return cf_safe_name(f'{stream_label}{package_label}{test_label}')
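# Editor's sketch (not part of the library): how a QC column name is composed.
# The CollectedResult values below are assumed for illustration; the final
# character substitution is whatever cf_safe_name performs.
def _example_column_name():
    cr = CollectedResult(
        stream_id='temperature',
        package='qartod',
        test='gross_range_test',
        function=aggregate,  # any test callable; aggregate is simply already imported here
        results=np.array([1, 1, 3], dtype='uint8'),
    )
    # Joins 'temperature.' + 'qartod.' + 'gross_range_test' and sanitizes the
    # string through cf_safe_name into a CF-compliant variable name.
    return column_from_collected_result(cr)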
class BaseStore:
    def save(self, *args, **kwargs):
"""
Serialize results to a store. This could save a file or publish messages.
"""
pass
@property
def stream_ids(self) -> List[str]:
"""
A list of stream_ids to save to the store
"""
pass
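# Editor's sketch (not part of the library): the minimal contract a custom store
# must satisfy. The class name and the (column_name, flags) tuples it returns are
# assumptions made purely for illustration.
class _ExampleListStore(BaseStore):
    def __init__(self, results):
        self.collected_results = collect_results(list(results), how='list')
        self._stream_ids = [cr.stream_id for cr in self.collected_results]

    @property
    def stream_ids(self) -> List[str]:
        return self._stream_ids

    def save(self, *args, **kwargs):
        # "Serialize" each result as a (column_name, flag_array) tuple
        return [
            (column_from_collected_result(cr), cr.results)
            for cr in self.collected_results
        ]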
class PandasStore(BaseStore):
"""Store results in a dataframe"""
def __init__(self, results, axes: dict = None):
# OK, time to evaluate the actual tests now that we need the results
self.results = list(results)
self.collected_results = collect_results(self.results, how='list')
self._stream_ids = [ cr.stream_id for cr in self.collected_results ]
self.axes = axes or {
't': 'time',
'z': 'z',
'y': 'lat',
'x': 'lon'
}
@property
def stream_ids(self) -> List[str]:
return self._stream_ids
    def compute_aggregate(self, name='rollup'):
""" Internally compute the total aggregate and add it to the results
"""
agg = CollectedResult(
stream_id='',
package='qartod',
test=name,
function=aggregate,
results=aggregate(self.collected_results)
)
self.collected_results.append(agg)
    def save(self,
write_data: bool = False,
write_axes: bool = True,
include: list = None,
exclude: list = None) -> pd.DataFrame:
df = pd.DataFrame()
for cr in self.collected_results:
# Add time axis
if write_axes is True and self.axes['t'] not in df and cr.tinp is not None and cr.tinp.size != 0:
L.info(f"Adding column {self.axes['t']} from stream {cr.stream_id}")
df[self.axes['t']] = cr.tinp
# Add z axis
if write_axes is True and self.axes['z'] not in df and cr.zinp is not None and cr.zinp.size != 0:
L.info(f"Adding column {self.axes['z']} from stream {cr.stream_id}")
df[self.axes['z']] = cr.zinp
# Add x axis
if write_axes is True and self.axes['x'] not in df and cr.lon is not None and cr.lon.size != 0:
L.info(f"Adding column {self.axes['x']} from stream {cr.stream_id}")
df[self.axes['x']] = cr.lon
            # Add y axis
if write_axes is True and self.axes['y'] not in df and cr.lat is not None and cr.lat.size != 0:
L.info(f"Adding column {self.axes['y']} from stream {cr.stream_id}")
df[self.axes['y']] = cr.lat
# Inclusion list, skip everything not defined
if include is not None and (cr.function not in include and cr.stream_id not in include and cr.test not in include):
continue
# Exclusion list, skip everything defined
            if exclude is not None and (cr.function in exclude or cr.stream_id in exclude or cr.test in exclude):
continue
# Add data column
if write_data and cr.stream_id not in df and cr.stream_id:
L.info(f"Adding column {cr.stream_id}")
df[cr.stream_id] = cr.data
# Add QC results column
            # The aggregate/rollup result has an empty stream_id, so allow it to be that way!
column_name = column_from_collected_result(cr)
if column_name not in df:
df[column_name] = cr.results
else:
L.warning(f"Found duplicate QC results column: {column_name}, skipping.")
return df
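# Editor's sketch (not part of the library): a typical round trip from a DataFrame of
# observations to a DataFrame of QC flags. It assumes ioos_qc.streams.PandasStream is
# used to evaluate an already-built Config against `df`, and that the time/z/lat/lon
# column names follow the store defaults above.
def _example_pandas_store(df: pd.DataFrame, config: Config) -> pd.DataFrame:
    from ioos_qc.streams import PandasStream
    results = PandasStream(df).run(config)  # lazily runs every configured test
    store = PandasStore(results)            # collects and indexes the results
    store.compute_aggregate(name='rollup')  # optional single roll-up flag column
    return store.save(write_data=True)      # axes + data + one column per test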
class CFNetCDFStore(BaseStore):
def __init__(self, results, axes=None, **kwargs):
# OK, time to evaluate the actual tests now that we need the results
self.results = list(results)
self.collected_results = collect_results(self.results, how='list')
self._stream_ids = [ cr.stream_id for cr in self.collected_results ]
self.axes = axes or {
't': 'time',
'z': 'z',
'y': 'lat',
'x': 'lon'
}
@property
def stream_ids(self) -> List[str]:
return self._stream_ids
    def save(self,
             path_or_ncd,
             dsg,
             config: Config,
             dsg_kwargs: dict = None,
             write_data: bool = False,
             include: list = None,
             exclude: list = None,
             compute_aggregate: bool = False):
ps = PandasStore(self.results, self.axes)
if compute_aggregate is True:
ps.compute_aggregate(name='qc_rollup')
df = ps.save(write_data=write_data, include=include, exclude=exclude)
# Write a new file
attrs = {}
for cr in ps.collected_results:
column_name = column_from_collected_result(cr)
# Set the ancillary variables
if cr.stream_id not in attrs:
attrs[cr.stream_id] = {
'ancillary_variables': column_name
}
else:
# Update the source ancillary_variables
                existing = attrs[cr.stream_id].get('ancillary_variables', '').split(' ')
                existing += [column_name]
                attrs[cr.stream_id]['ancillary_variables'] = ' '.join(list(set(existing))).strip()
# determine standard name and long name. These should be defined on each test function
# https://github.com/cf-convention/cf-conventions/issues/216
standard_name = getattr(cr.function, 'standard_name', 'quality_flag')
long_name = getattr(cr.function, 'long_name', 'Quality Flag')
# Get flags from module attribute called FLAGS
flags = getattr(inspect.getmodule(cr.function), 'FLAGS')
varflagnames = [ d for d in flags.__dict__ if not d.startswith('__') ]
varflagvalues = [ getattr(flags, d) for d in varflagnames ]
# Set QC variable attributes
if column_name not in attrs:
attrs[column_name] = {
'standard_name': standard_name,
'long_name': long_name,
'flag_values': np.byte(varflagvalues),
'flag_meanings': ' '.join(varflagnames),
'valid_min': np.byte(min(varflagvalues)),
'valid_max': np.byte(max(varflagvalues)),
'ioos_qc_module': cr.package,
'ioos_qc_test': cr.test,
'ioos_qc_target': cr.stream_id,
}
# If there is only one context we can write variable specific configs
# We can't do this across different contexts and this would repeat the regions
# and windows for each variable even if they are equal. This needs another look.
if len(config.contexts) == 1:
calls = config.calls_by_stream_id(cr.stream_id)
calls = [
c for c in calls
if c.module == cr.package and c.method == cr.test
]
if not calls:
                    # No call matching this stream_id, package and test was found
continue
# Use the first call of this stream_id. There will be only 1 because there
# is only one context with one matching package and method
call = calls[0]
if call.region:
attrs[column_name]['ioos_qc_region'] = json.dumps(
call.region,
cls=GeoNumpyDateEncoder, allow_nan=False, ignore_nan=True
)
if call.window.starting or call.window.ending:
attrs[column_name]['ioos_qc_window'] = json.dumps(
call.window,
cls=GeoNumpyDateEncoder, allow_nan=False, ignore_nan=True
)
qc_varconfig = json.dumps(
call.kwargs,
cls=GeoNumpyDateEncoder, allow_nan=False, ignore_nan=True
)
attrs[column_name]['ioos_qc_config'] = qc_varconfig
if len(config.contexts) > 1:
# We represent the config as one global config JSON object
attrs['ioos_qc_config'] = json.dumps(
config.config, cls=GeoNumpyDateEncoder, allow_nan=False, ignore_nan=True
)
        dsg_kwargs = {
            **(dsg_kwargs or {}),
            'attributes': attrs
        }
# pocean requires these default columns, which should be removed as a requirement
# in pocean.
df['station'] = 0
df['trajectory'] = 0
df['profile'] = 0
if 'z' not in df:
df['z'] = 0
ncd = dsg.from_dataframe(df, path_or_ncd, axes=self.axes, **dsg_kwargs)
return ncd
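# Editor's sketch (not part of the library): writing QC results to a CF DSG netCDF
# file. It assumes pocean-core is installed and that
# pocean.dsg.OrthogonalMultidimensionalTimeseries is an appropriate DSG class for the
# data; any pocean DSG class whose from_dataframe signature matches could be passed
# as `dsg` instead.
def _example_cf_netcdf_store(results, config: Config, output_path: str):
    from pocean.dsg import OrthogonalMultidimensionalTimeseries
    store = CFNetCDFStore(results)
    return store.save(
        output_path,
        OrthogonalMultidimensionalTimeseries,
        config,
        write_data=True,
        compute_aggregate=True,  # adds the 'qc_rollup' column described above
    )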
class NetcdfStore:
    def save(self, path_or_ncd, config, results):
"""
Updates the given netcdf with test configuration and results.
If there is already a variable for a given test, it will update that variable with the latest results.
Otherwise, it will create a new variable.
        :param path_or_ncd: path or netcdf4 Dataset in which to store results
        :param config: the Config object describing the tests that were run
        :param results: output of run()
"""
try:
ncd = None
should_close = True
if isinstance(path_or_ncd, (str, Path)):
ncd = nc4.Dataset(str(path_or_ncd), 'a')
elif isinstance(path_or_ncd, nc4.Dataset):
ncd = path_or_ncd
should_close = False
else:
                raise ValueError('Input is not a valid file path or Dataset')
for vname, qcobj in results.items():
if vname not in ncd.variables:
L.warning(f'{vname} not found in the Dataset, skipping')
continue
source_var = ncd.variables[vname]
# Keep track of the test names so we can add to the source's
# ancillary_variables at the end
qcvar_names = []
for modu, tests in qcobj.items():
try:
testpackage = import_module('ioos_qc.{}'.format(modu))
except ImportError:
L.error('No ioos_qc test package "{}" was found, skipping.'.format(modu))
continue
for testname, testresults in tests.items():
# Try to find a qc variable that matches this config
qcvars = ncd.get_variables_by_attributes(
ioos_qc_module=modu,
ioos_qc_test=testname,
ioos_qc_target=vname
)
if not qcvars:
qcvarname = cf_safe_name(vname + '.' + modu + '.' + testname)
else:
if len(qcvars) > 1:
names = [ v.name for v in qcvars ]
L.warning('Found more than one QC variable match: {}'.format(names))
# Use the last one found
qcvarname = qcvars[-1].name
# Get flags from module attribute called FLAGS
flags = getattr(testpackage, 'FLAGS')
varflagnames = [ d for d in flags.__dict__ if not d.startswith('__') ]
varflagvalues = [ getattr(flags, d) for d in varflagnames ]
if qcvarname not in ncd.variables:
v = ncd.createVariable(qcvarname, np.byte, source_var.dimensions)
else:
v = ncd[qcvarname]
qcvar_names.append(qcvarname)
# determine standard name
# https://github.com/cf-convention/cf-conventions/issues/216
try:
testfn = getattr(testpackage, testname)
standard_name = testfn.standard_name
long_name = testfn.long_name
except AttributeError:
standard_name = 'quality_flag'
long_name = 'Quality Flag'
# write to netcdf
v[:] = testresults
v.setncattr('standard_name', standard_name)
v.setncattr('long_name', long_name)
v.setncattr('flag_values', np.byte(varflagvalues))
v.setncattr('flag_meanings', ' '.join(varflagnames))
v.setncattr('valid_min', np.byte(min(varflagvalues)))
v.setncattr('valid_max', np.byte(max(varflagvalues)))
v.setncattr('ioos_qc_module', modu)
v.setncattr('ioos_qc_test', testname)
v.setncattr('ioos_qc_target', vname)
# If there is only one context we can write variable specific configs
if len(config.contexts) == 1:
varconfig = config.contexts[0].streams[vname].config[modu][testname]
varconfig = json.dumps(varconfig, cls=GeoNumpyDateEncoder, allow_nan=False, ignore_nan=True)
v.setncattr('ioos_qc_config', varconfig)
v.setncattr('ioos_qc_region', json.dumps(config.contexts[0].region, cls=GeoNumpyDateEncoder, allow_nan=False, ignore_nan=True))
v.setncattr('ioos_qc_window', json.dumps(config.contexts[0].window, cls=GeoNumpyDateEncoder, allow_nan=False, ignore_nan=True))
# Update the source ancillary_variables
existing = getattr(source_var, 'ancillary_variables', '').split(' ')
if qcvar_names:
existing += qcvar_names
source_var.ancillary_variables = ' '.join(list(set(existing))).strip()
if len(config.contexts) > 1:
# We can't represent these at the variable level, so make one global config
ncd.setncattr(
'ioos_qc_config',
json.dumps(config.config, cls=GeoNumpyDateEncoder, allow_nan=False, ignore_nan=True)
)
finally:
if ncd and should_close is True:
ncd.close()
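# Editor's sketch (not part of the library): appending results to an existing netCDF
# file in place. `results` must be the nested mapping produced by run(), i.e.
# {variable_name: {module: {test_name: flag_array}}}, and the target file must already
# contain a variable for each flagged variable_name.
def _example_netcdf_store(ncfile, config, results):
    NetcdfStore().save(ncfile, config, results)  # opens in append mode, closes when done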