Source code for ioos_qc.results
#!/usr/bin/env python
import logging
from collections import OrderedDict as odict
from collections import defaultdict
from dataclasses import dataclass
from typing import List, NamedTuple
import numpy as np
from ioos_qc.qartod import QartodFlags
L = logging.getLogger(__name__)
[docs]
class CallResult(NamedTuple):
package: str
test: str
function: callable
results: np.ndarray
def __repr__(self) -> str:
return f"<CallResult package={self.package} test={self.test}>"
[docs]
class ContextResult(NamedTuple):
stream_id: str
results: List[CallResult]
subset_indexes: np.ndarray
data: np.ndarray = None
tinp: np.ndarray = None
zinp: np.ndarray = None
lat: np.ndarray = None
lon: np.ndarray = None
def __repr__(self) -> str:
return f"<ContextResult stream_id={self.stream_id}>"
[docs]
@dataclass
class CollectedResult:
stream_id: str
package: str
test: str
function: callable
results: np.ma.core.MaskedArray = None
data: np.ndarray = None
tinp: np.ndarray = None
zinp: np.ndarray = None
lat: np.ndarray = None
lon: np.ndarray = None
def __repr__(self) -> str:
return f"<CollectedResult stream_id={self.stream_id} package={self.package} test={self.test}>"
[docs]
def function_name(self) -> str:
return self.function.__name__
@property
def hash_key(self) -> str:
return f"{self.stream_id}:{self.package}.{self.test}"
[docs]
def collect_results(results, how="list"):
if how in ["list", list]:
return collect_results_list(results)
if how in ["dict", dict]:
return collect_results_dict(results)
return None
[docs]
def collect_results_list(results):
"""Turns a list of ContextResult objects into an iterator of CollectedResult objects
by combining the subset_index information in each ContextResult together into
a single array of results.
"""
collected = odict()
# ContextResults
for r in results:
cr = None
# Shortcut for CallResult objects when someone uses QcConfig.run() directly
# and doesn't go through a Stream object
if isinstance(r, CallResult):
cr = CollectedResult(
stream_id=None,
package=r.package,
test=r.test,
function=r.function,
results=r.results,
)
collected[cr.hash_key] = cr
continue
# CallResults
for tr in r.results:
cr = CollectedResult(
stream_id=r.stream_id,
package=tr.package,
test=tr.test,
function=tr.function,
)
if cr.hash_key not in collected:
# Set the initial values
cr.results = np.ma.masked_all(
shape=r.subset_indexes.shape,
dtype=tr.results.dtype,
)
cr.data = np.ma.masked_all(
shape=r.subset_indexes.shape,
dtype=r.data.dtype,
)
cr.tinp = np.ma.masked_all(
shape=r.subset_indexes.shape,
dtype=r.tinp.dtype,
)
cr.zinp = np.ma.masked_all(
shape=r.subset_indexes.shape,
dtype=r.zinp.dtype,
)
cr.lat = np.ma.masked_all(
shape=r.subset_indexes.shape,
dtype=r.lat.dtype,
)
cr.lon = np.ma.masked_all(
shape=r.subset_indexes.shape,
dtype=r.lon.dtype,
)
collected[cr.hash_key] = cr
collected[cr.hash_key].results[r.subset_indexes] = tr.results
if cr is not None:
if r.subset_indexes.all():
collected[cr.hash_key].data = r.data
collected[cr.hash_key].tinp = r.tinp
collected[cr.hash_key].zinp = r.zinp
collected[cr.hash_key].lat = r.lat
collected[cr.hash_key].lon = r.lon
else:
collected[cr.hash_key].data[r.subset_indexes] = r.data
collected[cr.hash_key].tinp[r.subset_indexes] = r.tinp
collected[cr.hash_key].zinp[r.subset_indexes] = r.zinp
collected[cr.hash_key].lat[r.subset_indexes] = r.lat
collected[cr.hash_key].lon[r.subset_indexes] = r.lon
return list(collected.values())
[docs]
def collect_results_dict(results):
"""Turns a list of ContextResult objects into a dictionary of test results
by combining the subset_index information in each ContextResult together into
a single array of results. This is mostly here for historical purposes. Users
should migrate to using the Result objects directly.
"""
# Magic for nested key generation
# https://stackoverflow.com/a/27809959
collected = defaultdict(lambda: defaultdict(odict))
# ContextResults
for r in results:
# Shortcut for CallResult objects when someone uses QcConfig.run() directly
# and doesn't go through a Stream object
if isinstance(r, CallResult):
collected[r.package][r.test] = r.results
continue
flag_arr = np.ma.empty_like(r.subset_indexes, dtype="uint8")
flag_arr.fill(QartodFlags.UNKNOWN)
# iterate over the CallResults
for tr in r.results:
testpackage = tr.package
testname = tr.test
testresults = tr.results
if testname not in collected[r.stream_id][testpackage]:
collected[r.stream_id][testpackage][testname] = np.copy(
flag_arr,
)
collected[r.stream_id][testpackage][testname][r.subset_indexes] = testresults
return collected