Source code for ioos_qc.results

#!/usr/bin/env python
# coding=utf-8
import logging
from typing import NamedTuple, List
from dataclasses import dataclass
from collections import OrderedDict as odict, defaultdict

import numpy as np
from ioos_qc.qartod import QartodFlags

L = logging.getLogger(__name__)  # noqa


[docs]class CallResult(NamedTuple): package: str test: str function: callable results: np.ndarray def __repr__(self): return f'<CallResult package={self.package} test={self.test}>'
[docs]class ContextResult(NamedTuple): stream_id: str results: List[CallResult] subset_indexes: np.ndarray data: np.ndarray = None tinp: np.ndarray = None zinp: np.ndarray = None lat: np.ndarray = None lon: np.ndarray = None def __repr__(self): return f'<ContextResult stream_id={self.stream_id}>'
[docs]@dataclass class CollectedResult: stream_id: str package: str test: str function: callable results: np.ma.core.MaskedArray = None data: np.ndarray = None tinp: np.ndarray = None zinp: np.ndarray = None lat: np.ndarray = None lon: np.ndarray = None def __repr__(self): return f'<CollectedResult stream_id={self.stream_id} package={self.package} test={self.test}>'
[docs] def function_name(self) -> str: return self.function.__name__
@property def hash_key(self) -> str: return f'{self.stream_id}:{self.package}.{self.test}'
[docs]def collect_results(results, how='list'): if how in ['list', list]: return collect_results_list(results) elif how in ['dict', dict]: return collect_results_dict(results)
[docs]def collect_results_list(results): """ Turns a list of ContextResult objects into an iterator of CollectedResult objects by combining the subset_index information in each ContextResult together into a single array of results. """ collected = odict() # ContextResults for r in results: cr = None # Shortcut for CallResult objects when someone uses QcConfig.run() directly # and doesn't go through a Stream object if isinstance(r, CallResult): cr = CollectedResult( stream_id=None, package=r.package, test=r.test, function=r.function, results=r.results, ) collected[cr.hash_key] = cr continue # CallResults for tr in r.results: cr = CollectedResult( stream_id=r.stream_id, package=tr.package, test=tr.test, function=tr.function ) if cr.hash_key not in collected: # Set the initial values cr.results = np.ma.masked_all(shape=r.subset_indexes.shape, dtype=tr.results.dtype) cr.data = np.ma.masked_all(shape=r.subset_indexes.shape, dtype=r.data.dtype) cr.tinp = np.ma.masked_all(shape=r.subset_indexes.shape, dtype=r.tinp.dtype) cr.zinp = np.ma.masked_all(shape=r.subset_indexes.shape, dtype=r.zinp.dtype) cr.lat = np.ma.masked_all(shape=r.subset_indexes.shape, dtype=r.lat.dtype) cr.lon = np.ma.masked_all(shape=r.subset_indexes.shape, dtype=r.lon.dtype) collected[cr.hash_key] = cr collected[cr.hash_key].results[r.subset_indexes] = tr.results if cr is not None: if r.subset_indexes.all(): collected[cr.hash_key].data = r.data collected[cr.hash_key].tinp = r.tinp collected[cr.hash_key].zinp = r.zinp collected[cr.hash_key].lat = r.lat collected[cr.hash_key].lon = r.lon else: collected[cr.hash_key].data[r.subset_indexes] = r.data collected[cr.hash_key].tinp[r.subset_indexes] = r.tinp collected[cr.hash_key].zinp[r.subset_indexes] = r.zinp collected[cr.hash_key].lat[r.subset_indexes] = r.lat collected[cr.hash_key].lon[r.subset_indexes] = r.lon return list(collected.values())
[docs]def collect_results_dict(results): """ Turns a list of ContextResult objects into a dictionary of test results by combining the subset_index information in each ContextResult together into a single array of results. This is mostly here for historical purposes. Users should migrate to using the Result objects directly. """ # Magic for nested key generation # https://stackoverflow.com/a/27809959 collected = defaultdict(lambda: defaultdict(odict)) # ContextResults for r in results: # Shortcut for CallResult objects when someone uses QcConfig.run() directly # and doesn't go through a Stream object if isinstance(r, CallResult): collected[r.package][r.test] = r.results continue flag_arr = np.ma.empty_like(r.subset_indexes, dtype='uint8') flag_arr.fill(QartodFlags.UNKNOWN) # iterate over the CallResults for tr in r.results: testpackage = tr.package testname = tr.test testresults = tr.results if testname not in collected[r.stream_id][testpackage]: collected[r.stream_id][testpackage][testname] = np.copy(flag_arr) collected[r.stream_id][testpackage][testname][r.subset_indexes] = testresults return collected