#!/usr/bin/env python
import inspect
import json
import logging
from importlib import import_module
from pathlib import Path
from typing import List, Optional
import h5netcdf.legacyapi as nc4
import numpy as np
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.qartod import aggregate
from ioos_qc.results import CollectedResult, collect_results
from ioos_qc.utils import GeoNumpyDateEncoder, cf_safe_name
L = logging.getLogger(__name__)
def column_from_collected_result(cr):
    stream_label = f"{cr.stream_id}." if cr.stream_id else ""
    package_label = f"{cr.package}." if cr.package else ""
    test_label = f"{cr.test}" if cr.test else ""
    return cf_safe_name(f"{stream_label}{package_label}{test_label}")
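

# Example (a sketch with a hypothetical stream name; cf_safe_name replaces
# characters that are not valid in CF variable names, so the dots become
# underscores):
#
#   cr = CollectedResult(stream_id="temperature", package="qartod", test="gross_range_test", ...)
#   column_from_collected_result(cr)  # -> "temperature_qartod_gross_range_test"
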
class BaseStore:
    def save(self, *args, **kwargs) -> None:
        """Serialize results to a store. This could save a file or publish messages."""

    @property
    def stream_ids(self) -> List[str]:
        """A list of stream_ids to save to the store."""
class PandasStore(BaseStore):
"""Store results in a dataframe."""
def __init__(self, results, axes: Optional[dict] = None) -> None:
# OK, time to evaluate the actual tests now that we need the results
self.results = list(results)
self.collected_results = collect_results(self.results, how="list")
self._stream_ids = [cr.stream_id for cr in self.collected_results]
self.axes = axes or {
"t": "time",
"z": "z",
"y": "lat",
"x": "lon",
}
@property
def stream_ids(self) -> List[str]:
return self._stream_ids
    def compute_aggregate(self, name="rollup") -> None:
        """Internally compute the total aggregate and add it to the results."""
        agg = CollectedResult(
            stream_id="",
            package="qartod",
            test=name,
            function=aggregate,
            results=aggregate(self.collected_results),
        )
        self.collected_results.append(agg)
    def save(
        self,
        write_data: bool = False,
        write_axes: bool = True,
        include: Optional[list] = None,
        exclude: Optional[list] = None,
    ) -> pd.DataFrame:
        df = pd.DataFrame()

        for cr in self.collected_results:
            # Add the time axis
            if write_axes is True and self.axes["t"] not in df and cr.tinp is not None and cr.tinp.size != 0:
                L.info(
                    f"Adding column {self.axes['t']} from stream {cr.stream_id}",
                )
                df[self.axes["t"]] = cr.tinp

            # Add the z axis
            if write_axes is True and self.axes["z"] not in df and cr.zinp is not None and cr.zinp.size != 0:
                L.info(
                    f"Adding column {self.axes['z']} from stream {cr.stream_id}",
                )
                df[self.axes["z"]] = cr.zinp

            # Add the x axis
            if write_axes is True and self.axes["x"] not in df and cr.lon is not None and cr.lon.size != 0:
                L.info(
                    f"Adding column {self.axes['x']} from stream {cr.stream_id}",
                )
                df[self.axes["x"]] = cr.lon
            # Add the y axis
            if write_axes is True and self.axes["y"] not in df and cr.lat is not None and cr.lat.size != 0:
                L.info(
                    f"Adding column {self.axes['y']} from stream {cr.stream_id}",
                )
                df[self.axes["y"]] = cr.lat
            # Inclusion list, skip everything not defined
            if include is not None and (
                cr.function not in include and cr.stream_id not in include and cr.test not in include
            ):
                continue
            # Exclusion list, skip everything defined
            if exclude is not None and (
                cr.function in exclude or cr.stream_id in exclude or cr.test in exclude
            ):
                continue
            # Add the data column
            if write_data and cr.stream_id not in df and cr.stream_id:
                L.info(f"Adding column {cr.stream_id}")
                df[cr.stream_id] = cr.data

            # Add the QC results column.
            # The aggregate will have a None stream_id, so allow it to be that way!
            column_name = column_from_collected_result(cr)
            if column_name not in df:
                df[column_name] = cr.results
            else:
                L.warning(
                    f"Found duplicate QC results column: {column_name}, skipping.",
                )

        return df
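

# A minimal usage sketch for PandasStore (hypothetical stream name and test
# spans; the Config/PandasStream calls follow the ioos_qc documentation):
#
#   from ioos_qc.config import Config
#   from ioos_qc.streams import PandasStream
#
#   config = Config({
#       "temperature": {
#           "qartod": {
#               "gross_range_test": {"suspect_span": [20, 30], "fail_span": [10, 40]},
#           },
#       },
#   })
#   results = PandasStream(df).run(config)  # df has time/z/lat/lon/temperature columns
#   store = PandasStore(results)
#   store.compute_aggregate(name="rollup")  # adds a "qartod_rollup" summary column
#   result_df = store.save(write_data=True)
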
class CFNetCDFStore(BaseStore):
    def __init__(self, results, axes=None, **kwargs) -> None:
        # OK, time to evaluate the actual tests now that we need the results
        self.results = list(results)
        self.collected_results = collect_results(self.results, how="list")
        self._stream_ids = [cr.stream_id for cr in self.collected_results]
        self.axes = axes or {
            "t": "time",
            "z": "z",
            "y": "lat",
            "x": "lon",
        }

    @property
    def stream_ids(self) -> List[str]:
        return self._stream_ids
    def save(
        self,
        path_or_ncd,
        dsg,
        config: Config,
        dsg_kwargs: Optional[dict] = None,
        write_data: bool = False,
        include: Optional[list] = None,
        exclude: Optional[list] = None,
        compute_aggregate: bool = False,
    ):
        if dsg_kwargs is None:
            dsg_kwargs = {}

        ps = PandasStore(self.results, self.axes)
        if compute_aggregate is True:
            ps.compute_aggregate(name="qc_rollup")
        df = ps.save(write_data=write_data, include=include, exclude=exclude)

        # Write a new file
        attrs = {}
        for cr in ps.collected_results:
            column_name = column_from_collected_result(cr)

            # Set the ancillary variables
            if cr.stream_id not in attrs:
                attrs[cr.stream_id] = {
                    "ancillary_variables": column_name,
                }
            else:
                # Update the source's ancillary_variables
                existing = attrs[cr.stream_id].get(
                    "ancillary_variables",
                    "",
                ).split(" ")
                existing += [column_name]
                attrs[cr.stream_id]["ancillary_variables"] = " ".join(list(set(existing))).strip()
            # Determine the standard name and long name. These should be defined
            # on each test function.
            # https://github.com/cf-convention/cf-conventions/issues/216
            standard_name = getattr(
                cr.function,
                "standard_name",
                "quality_flag",
            )
            long_name = getattr(cr.function, "long_name", "Quality Flag")

            # Get flags from the module attribute called FLAGS
            flags = inspect.getmodule(cr.function).FLAGS
            varflagnames = [d for d in flags.__dict__ if not d.startswith("__")]
            varflagvalues = [getattr(flags, d) for d in varflagnames]

            # Set the QC variable attributes
            if column_name not in attrs:
                attrs[column_name] = {
                    "standard_name": standard_name,
                    "long_name": long_name,
                    "flag_values": np.byte(varflagvalues),
                    "flag_meanings": " ".join(varflagnames),
                    "valid_min": np.byte(min(varflagvalues)),
                    "valid_max": np.byte(max(varflagvalues)),
                    "ioos_qc_module": cr.package,
                    "ioos_qc_test": cr.test,
                    "ioos_qc_target": cr.stream_id,
                }

            # If there is only one context we can write variable-specific configs.
            # We can't do this across different contexts, and it would repeat the
            # regions and windows for each variable even if they are equal. This
            # needs another look.
            if len(config.contexts) == 1:
                calls = config.calls_by_stream_id(cr.stream_id)
                calls = [c for c in calls if c.module == cr.package and c.method == cr.test]
                if not calls:
                    # No stream_id found!
                    continue
                # Use the first call for this stream_id. There will be only one
                # because there is only one context with one matching package
                # and method.
                call = calls[0]
                if call.region:
                    attrs[column_name]["ioos_qc_region"] = json.dumps(
                        call.region,
                        cls=GeoNumpyDateEncoder,
                        allow_nan=False,
                        ignore_nan=True,
                    )
                if call.window.starting or call.window.ending:
                    attrs[column_name]["ioos_qc_window"] = json.dumps(
                        call.window,
                        cls=GeoNumpyDateEncoder,
                        allow_nan=False,
                        ignore_nan=True,
                    )
                qc_varconfig = json.dumps(
                    call.kwargs,
                    cls=GeoNumpyDateEncoder,
                    allow_nan=False,
                    ignore_nan=True,
                )
                attrs[column_name]["ioos_qc_config"] = qc_varconfig
        if len(config.contexts) > 1:
            # We represent the config as one global config JSON object
            attrs["ioos_qc_config"] = json.dumps(
                config.config,
                cls=GeoNumpyDateEncoder,
                allow_nan=False,
                ignore_nan=True,
            )

        dsg_kwargs = {
            **dsg_kwargs,
            "attributes": attrs,
        }

        # pocean requires these default columns, which should be removed as a
        # requirement in pocean.
        df["station"] = 0
        df["trajectory"] = 0
        df["profile"] = 0
        if "z" not in df:
            df["z"] = 0

        return dsg.from_dataframe(
            df,
            path_or_ncd,
            axes=self.axes,
            **dsg_kwargs,
        )
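

# A minimal usage sketch for CFNetCDFStore (hypothetical file name; the dsg
# argument is a pocean DSG class such as OrthogonalMultidimensionalTimeseries):
#
#   from pocean.dsg import OrthogonalMultidimensionalTimeseries
#
#   store = CFNetCDFStore(results)
#   store.save(
#       "results.nc",
#       OrthogonalMultidimensionalTimeseries,
#       config,                  # the same Config the tests were run with
#       write_data=True,
#       compute_aggregate=True,  # adds a "qartod_qc_rollup" variable
#   )
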
class NetcdfStore:
    def save(self, path_or_ncd, config, results):
        """Update the given netCDF file with test configuration and results.

        If there is already a variable for a given test, it will update that
        variable with the latest results. Otherwise, it will create a new
        variable.

        :param path_or_ncd: path or netCDF4 Dataset in which to store results
        :param config: Config object describing the tests that were run
        :param results: output of run()
        """
        try:
            ncd = None
            should_close = True
            if isinstance(path_or_ncd, (str, Path)):
                ncd = nc4.Dataset(str(path_or_ncd), "a")
            elif isinstance(path_or_ncd, nc4.Dataset):
                ncd = path_or_ncd
                should_close = False
            else:
                raise ValueError("Input is not a valid file path or Dataset")

            for vname, qcobj in results.items():
                if vname not in ncd.variables:
                    L.warning(f"{vname} not found in the Dataset, skipping")
                    continue

                source_var = ncd.variables[vname]

                # Keep track of the test names so we can add to the source's
                # ancillary_variables at the end
                qcvar_names = []

                for modu, tests in qcobj.items():
                    try:
                        testpackage = import_module(f"ioos_qc.{modu}")
                    except ImportError:
                        L.error(
                            f'No ioos_qc test package "{modu}" was found, skipping.',
                        )
                        continue

                    for testname, testresults in tests.items():
                        # Try to find a qc variable that matches this config
                        qcvars = ncd.get_variables_by_attributes(
                            ioos_qc_module=modu,
                            ioos_qc_test=testname,
                            ioos_qc_target=vname,
                        )
                        if not qcvars:
                            qcvarname = cf_safe_name(
                                vname + "." + modu + "." + testname,
                            )
                        else:
                            if len(qcvars) > 1:
                                names = [v.name for v in qcvars]
                                L.warning(
                                    f"Found more than one QC variable match: {names}",
                                )
                            # Use the last one found
                            qcvarname = qcvars[-1].name

                        # Get flags from the module attribute called FLAGS
                        flags = testpackage.FLAGS
                        varflagnames = [d for d in flags.__dict__ if not d.startswith("__")]
                        varflagvalues = [getattr(flags, d) for d in varflagnames]

                        if qcvarname not in ncd.variables:
                            v = ncd.createVariable(
                                qcvarname,
                                np.byte,
                                source_var.dimensions,
                            )
                        else:
                            v = ncd[qcvarname]
                        qcvar_names.append(qcvarname)

                        # Determine the standard name
                        # https://github.com/cf-convention/cf-conventions/issues/216
                        try:
                            testfn = getattr(testpackage, testname)
                            standard_name = testfn.standard_name
                            long_name = testfn.long_name
                        except AttributeError:
                            standard_name = "quality_flag"
                            long_name = "Quality Flag"

                        # Write the results and attributes to the netCDF variable
                        v[:] = testresults
                        v.setncattr("standard_name", standard_name)
                        v.setncattr("long_name", long_name)
                        v.setncattr("flag_values", np.byte(varflagvalues))
                        v.setncattr("flag_meanings", " ".join(varflagnames))
                        v.setncattr("valid_min", np.byte(min(varflagvalues)))
                        v.setncattr("valid_max", np.byte(max(varflagvalues)))
                        v.setncattr("ioos_qc_module", modu)
                        v.setncattr("ioos_qc_test", testname)
                        v.setncattr("ioos_qc_target", vname)

                        # If there is only one context we can write variable-specific configs
                        if len(config.contexts) == 1:
                            varconfig = config.contexts[0].streams[vname].config[modu][testname]
                            varconfig = json.dumps(
                                varconfig,
                                cls=GeoNumpyDateEncoder,
                                allow_nan=False,
                                ignore_nan=True,
                            )
                            v.setncattr("ioos_qc_config", varconfig)
                            v.setncattr(
                                "ioos_qc_region",
                                json.dumps(
                                    config.contexts[0].region,
                                    cls=GeoNumpyDateEncoder,
                                    allow_nan=False,
                                    ignore_nan=True,
                                ),
                            )
                            v.setncattr(
                                "ioos_qc_window",
                                json.dumps(
                                    config.contexts[0].window,
                                    cls=GeoNumpyDateEncoder,
                                    allow_nan=False,
                                    ignore_nan=True,
                                ),
                            )

                # Update the source variable's ancillary_variables
                existing = getattr(
                    source_var,
                    "ancillary_variables",
                    "",
                ).split(" ")
                if qcvar_names:
                    existing += qcvar_names
                    source_var.ancillary_variables = " ".join(
                        list(set(existing)),
                    ).strip()

            if len(config.contexts) > 1:
                # We can't represent these at the variable level, so make one
                # global config attribute
                ncd.setncattr(
                    "ioos_qc_config",
                    json.dumps(
                        config.config,
                        cls=GeoNumpyDateEncoder,
                        allow_nan=False,
                        ignore_nan=True,
                    ),
                )
        finally:
            if ncd and should_close is True:
                ncd.close()
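

# A minimal usage sketch for NetcdfStore (hypothetical file name; `results`
# here is the nested {stream_id: {module: {test: flag_array}}} mapping, e.g.
# as produced by collect_results(results, how="dict")):
#
#   store = NetcdfStore()
#   store.save("existing.nc", config, results)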