Source code for ioos_qc.streams

#!/usr/bin/env python
# coding=utf-8
import logging
from collections import defaultdict
from collections import OrderedDict as odict

import numpy as np
import pandas as pd
import xarray as xr
try:
    # Older xarray releases expose this helper under its original name
    from xarray.core.indexing import remap_label_indexers as map_index_queries
except ImportError:
    # Newer xarray releases renamed it to map_index_queries
    from xarray.core.indexing import map_index_queries

from ioos_qc.config import Config
from ioos_qc.utils import mapdates
from ioos_qc.results import ContextResult

L = logging.getLogger(__name__)  # noqa


class BaseStream:
    """Each stream should define how to return a list of datastreams along
    with their time and depth association. Each of these streams will be
    passed through the quality control configurations and the results
    returned. Each stream also needs to define what to do with the resulting
    results (how to store them).
    """

    def __init__(self, *args, **kwargs):
        """
        df: the dataframe
        """
        pass

    def time(self):
        """Return the time array from the source dataset.
        This is useful when plotting QC results."""
        pass

    def data(self, stream_id):
        """Return the data array from the source dataset based on stream_id.
        This is useful when plotting QC results."""
        pass

    def run(self, config: Config):
        """Iterate over the configs, splitting the streams up by geographic
        and time window before applying the individual config using
        QcConfig.run(). Store results for future usage.
        """
        pass
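# Illustrative sketch of the subclass contract described above. The class and
# attribute names here are hypothetical, not part of this module's API.
# Rather than re-implementing window and region subsetting, a simple custom
# stream can delegate to NumpyStream, the same trick NetcdfStream uses below.
class _ExampleArrayStream(BaseStream):
    """Hypothetical single-variable stream backed by a plain numpy array."""

    def __init__(self, values, times):
        self._values = values  # np.ndarray of observations
        self._times = times    # date-like objects, same length as values

    def time(self):
        return self._times

    def data(self, stream_id=None):
        return self._values

    def run(self, config: Config):
        # NumpyStream handles the context window subsetting and yields
        # ContextResult objects for us
        yield from NumpyStream(inp=self._values, time=self._times).run(config)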
class PandasStream:

    def __init__(self, df, time=None, z=None, lat=None, lon=None, geom=None):
        """
        df: the dataframe
        time: the column to use for time
        z: the column to use for depth
        lat: the column to use for latitude, this or geom is required if using regional subsets
        lon: the column to use for longitude, this or geom is required if using regional subsets
        geom: the column containing the geometry, this or lat and lon are required if using regional subsets
        """
        self.df = df
        self.time_column = time or 'time'
        self.z_column = z or 'z'
        self.lat_column = lat or 'lat'
        self.lon_column = lon or 'lon'
        self.geom_column = geom or 'geom'

        axis_columns = [
            self.time_column,
            self.z_column,
            self.lat_column,
            self.lon_column,
            self.geom_column,
        ]
        self.axis_columns = [ x for x in axis_columns if x in df ]

    def time(self):
        return self.df[self.time_column]

    def data(self, stream_id):
        return self.df[stream_id]

    def run(self, config: Config):

        for context, calls in config.contexts.items():

            # Subset first by the stream id in each call
            stream_ids = []
            for call in calls:
                if call.stream_id not in self.df:
                    L.warning(f'{call.stream_id} is not a column in the dataframe, skipping')
                    continue
                stream_ids.append(call.stream_id)
            subset = self.df.loc[:, list(set(stream_ids + self.axis_columns))]

            if context.region:
                # TODO: yeah this does nothing right now
                # Figure out if this is a geopandas DataFrame already. If not, create one using
                # the specified lat_column and lon_column attributes in the constructor
                # if self.geom_column not in subset:
                #     subset = gpd.DataFrame(subset)
                #     subset[self.geom_column] = 'wut'
                # subset = subset[[ subset[self.geom_column].within(context.region) ]]
                pass

            if context.window.starting is not None or context.window.ending is not None:
                if self.time_column in self.axis_columns:
                    if context.window.starting:
                        subset = subset.loc[subset[self.time_column] >= context.window.starting, :]
                    if context.window.ending:
                        subset = subset.loc[subset[self.time_column] < context.window.ending, :]
                else:
                    L.warning(f'Skipping window subset, {self.time_column} not in columns')

            # This is a boolean array of what was subset and tested based on the initial data feed
            # Take the index of the subset and set those to true
            subset_indexes = pd.Series(0, index=self.df.index, dtype='bool')
            subset_indexes.iloc[subset.index] = True

            # The source is subset, now the resulting rows need to be tested
            # Put together the static inputs that were subset for this config
            subset_kwargs = {}
            if self.time_column in self.axis_columns:
                subset_kwargs['tinp'] = subset.loc[:, self.time_column]
            if self.z_column in self.axis_columns:
                subset_kwargs['zinp'] = subset.loc[:, self.z_column]
            if self.lon_column in self.axis_columns:
                subset_kwargs['lon'] = subset.loc[:, self.lon_column]
            if self.lat_column in self.axis_columns:
                subset_kwargs['lat'] = subset.loc[:, self.lat_column]

            # Perform the "run" function on each Call
            for call in calls:

                # if call.is_aggregate:
                #     # We compute aggregates using the results
                #     continue

                if call.stream_id not in subset:
                    L.warning(f'{call.stream_id} is not a column in the input dataframe, skipping')
                    continue

                data_input = subset.loc[:, call.stream_id]

                # This evaluates the generator test results
                run_result = list(call.run(
                    inp=data_input,
                    **subset_kwargs
                ))

                yield ContextResult(
                    results=run_result,
                    stream_id=call.stream_id,
                    subset_indexes=subset_indexes.values,
                    data=data_input.values,
                    tinp=subset_kwargs.get('tinp', pd.Series(dtype='datetime64[ns]')).values,
                    zinp=subset_kwargs.get('zinp', pd.Series(dtype='float64')).values,
                    lat=subset_kwargs.get('lat', pd.Series(dtype='float64')).values,
                    lon=subset_kwargs.get('lon', pd.Series(dtype='float64')).values,
                )
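# Usage sketch for PandasStream (illustrative only). The DataFrame contents
# and the streams-keyed config layout follow the ioos_qc usage docs and are
# assumptions, not part of this module; imports are repeated so the snippet
# can be lifted out as-is.
def _example_pandas_stream():
    import pandas as pd
    from ioos_qc.config import Config
    from ioos_qc.streams import PandasStream

    df = pd.DataFrame({
        'time': pd.date_range('2020-01-01', periods=5, freq='D'),
        'temperature': [21.0, 22.5, 35.0, 23.1, 22.9],
    })
    config = Config({
        'streams': {
            'temperature': {
                'qartod': {
                    'gross_range_test': {
                        'suspect_span': [20, 30],
                        'fail_span': [10, 40],
                    },
                },
            },
        },
    })
    # run() is a generator; each item is a ContextResult for one stream/context.
    # See ioos_qc.results.collect_results for flattening these into per-test results.
    return list(PandasStream(df).run(config))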
class NumpyStream:

    def __init__(self, inp=None, time=None, z=None, lat=None, lon=None, geom=None):
        """
        inp: a numpy array or a dictionary of numpy arrays where the keys are the stream ids
        time: numpy array of date-like objects
        z: numpy array of z
        lat: numpy array of latitude, this or geom is required if using regional subsets
        lon: numpy array of longitude, this or geom is required if using regional subsets
        geom: numpy array of geometry, this or lat and lon are required if using regional subsets
        """
        self.inp = inp
        try:
            assert time is not None
            self.tinp = pd.DatetimeIndex(mapdates(time))
        except BaseException:
            # Fall back to the raw values if they can't be parsed as dates
            self.tinp = time
        self.zinp = z
        self.lat = lat
        self.lon = lon
        self.geom = geom

    def time(self):
        return self.tinp

    def data(self, stream_id=None):
        return self.inp

    def run(self, config: Config):

        for context, calls in config.contexts.items():

            # This is a boolean array of what was subset and tested based on the initial data feed
            # Take the index of the subset and set those to true
            subset_indexes = np.full_like(self.inp, 1, dtype=bool)

            if context.region:
                # TODO: yeah this does nothing right now
                # Subset against the passed in lat/lon arrays in passedkwargs
                if self.lat is not None and self.lon is not None:
                    pass
                else:
                    L.warning('Skipping region subset, "lat" and "lon" must be passed into NumpyStream')

            if context.window.starting is not None or context.window.ending is not None:
                if self.tinp is not None:
                    if context.window.starting:
                        subset_indexes = subset_indexes & (self.tinp >= context.window.starting)
                    if context.window.ending:
                        subset_indexes = subset_indexes & (self.tinp < context.window.ending)
                else:
                    L.warning('Skipping window subset, "time" array must be passed into "run"')

            subset_kwargs = {}
            if self.tinp is not None:
                subset_kwargs['tinp'] = self.tinp[subset_indexes]
            if self.zinp is not None:
                subset_kwargs['zinp'] = self.zinp[subset_indexes]
            if self.lon is not None:
                subset_kwargs['lon'] = self.lon[subset_indexes]
            if self.lat is not None:
                subset_kwargs['lat'] = self.lat[subset_indexes]

            for call in calls:

                # If the input was passed in the config.
                # This is here for backwards compatibility and doesn't support
                # being a different size than what the subset/context size is.
                # Passing values in the config should be deprecated in the future!
                if self.inp is None and 'inp' in call.kwargs:
                    self.inp = np.array(call.kwargs['inp'])
                    subset_indexes = np.full_like(self.inp, 1, dtype=bool)

                # Support more than one named inp, but fall back to a single one
                if isinstance(self.inp, np.ndarray):
                    runinput = self.inp
                elif isinstance(self.inp, dict):
                    if call.stream_id in self.inp:
                        runinput = self.inp[call.stream_id]
                    else:
                        L.warning(f'{call.stream_id} not in input dict, skipping')
                        continue
                else:
                    L.error(f'Input is not a dict or np.ndarray, skipping {call.stream_id}')
                    continue

                # Slicing with [True] changes the shape of an array so always re-shape. That
                # will happen when the input array is of size 1. Corner case, but we still
                # need to handle it here.
                original_shape = runinput.shape
                data_input = runinput[subset_indexes].reshape(original_shape)

                # This evaluates the generator test results
                run_result = list(call.run(
                    inp=data_input,
                    **subset_kwargs
                ))

                yield ContextResult(
                    results=run_result,
                    stream_id=call.stream_id,
                    subset_indexes=subset_indexes,
                    data=data_input,
                    tinp=subset_kwargs.get('tinp', pd.Series(dtype='datetime64[ns]').values),
                    zinp=subset_kwargs.get('zinp', pd.Series(dtype='float64').values),
                    lat=subset_kwargs.get('lat', pd.Series(dtype='float64').values),
                    lon=subset_kwargs.get('lon', pd.Series(dtype='float64').values),
                )
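# Usage sketch for NumpyStream (illustrative only). A single ndarray input is
# applied to every configured stream_id; a dict keyed by stream_id is also
# accepted, as the constructor docstring notes. The data values and the
# streams-keyed config layout are assumptions based on the ioos_qc docs.
def _example_numpy_stream():
    import numpy as np
    import pandas as pd
    from ioos_qc.config import Config
    from ioos_qc.streams import NumpyStream

    times = pd.date_range('2020-01-01', periods=5, freq='D')
    salinity = np.array([33.1, 33.2, 49.0, 33.3, 33.2])
    config = Config({
        'streams': {
            'salinity': {
                'qartod': {
                    'gross_range_test': {
                        'suspect_span': [30, 35],
                        'fail_span': [25, 40],
                    },
                },
            },
        },
    })
    stream = NumpyStream(inp=salinity, time=times)
    return list(stream.run(config))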
class NetcdfStream:

    def __init__(self, path_or_ncd, time=None, z=None, lat=None, lon=None, geom=None):
        self.path_or_ncd = path_or_ncd

        self.time_var = time or 'time'
        self.z_var = z or 'z'
        self.lat_var = lat or 'lat'
        self.lon_var = lon or 'lon'

    def time(self):
        do_close, ds = self._open()
        tdata = ds.variables[self.time_var]
        if do_close is True:
            ds.close()
        return tdata

    def data(self, stream_id):
        do_close, ds = self._open()
        vdata = ds.variables[stream_id]
        if do_close is True:
            ds.close()
        return vdata

    def _open(self):
        if isinstance(self.path_or_ncd, str):
            do_close = True
            ds = xr.open_dataset(self.path_or_ncd, decode_cf=False)
        else:
            do_close = False
            ds = self.path_or_ncd
        return do_close, ds

    def run(self, config: Config):
        do_close, ds = self._open()

        stream_ids = []
        for context, calls in config.contexts.items():
            for call in calls:
                if call.stream_id not in ds.variables:
                    L.warning(f'{call.stream_id} is not a variable in the netCDF dataset, skipping')
                    continue
                stream_ids.append(call.stream_id)

        # Find any var specific kwargs to pass onto the run
        varkwargs = { 'inp': {} }
        if self.time_var in ds.variables:
            varkwargs['time'] = pd.DatetimeIndex(mapdates(ds.variables[self.time_var].values))
        if self.z_var in ds.variables:
            varkwargs['z'] = ds.variables[self.z_var].values
        if self.lat_var in ds.variables:
            varkwargs['lat'] = ds.variables[self.lat_var].values
        if self.lon_var in ds.variables:
            varkwargs['lon'] = ds.variables[self.lon_var].values

        # Now populate the `inp` dict for each valid data stream
        for s in stream_ids:
            if s in ds.variables:
                varkwargs['inp'][s] = ds.variables[s].values

        if do_close is True:
            ds.close()

        ns = NumpyStream(**varkwargs)
        yield from ns.run(config)
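# Usage sketch for NetcdfStream (illustrative only), including a context
# window. NetcdfStream accepts a file path or an already-open dataset; the
# in-memory Dataset and the window/streams config layout here are assumptions
# based on the ioos_qc docs.
def _example_netcdf_stream():
    from datetime import datetime

    import numpy as np
    import pandas as pd
    import xarray as xr
    from ioos_qc.config import Config
    from ioos_qc.streams import NetcdfStream

    ds = xr.Dataset(
        {'salinity': ('time', np.array([33.1, 33.2, 49.0, 33.3]))},
        coords={'time': pd.date_range('2020-01-01', periods=4, freq='D')},
    )
    config = Config({
        'window': {
            'starting': datetime(2020, 1, 1),
            'ending': datetime(2020, 1, 3),
        },
        'streams': {
            'salinity': {
                'qartod': {
                    'gross_range_test': {
                        'suspect_span': [30, 35],
                        'fail_span': [25, 40],
                    },
                },
            },
        },
    })
    # Internally this extracts numpy arrays and delegates to NumpyStream
    return list(NetcdfStream(ds).run(config))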
class XarrayStream:

    def __init__(self, path_or_ncd, time=None, z=None, lat=None, lon=None):
        self.path_or_ncd = path_or_ncd

        self.time_var = time or 'time'
        self.z_var = z or 'z'
        self.lat_var = lat or 'lat'
        self.lon_var = lon or 'lon'

    def time(self):
        do_close, ds = self._open()
        tdata = ds[self.time_var].values
        if do_close is True:
            ds.close()
        return tdata

    def data(self, stream_id):
        do_close, ds = self._open()
        vdata = ds[stream_id].values
        if do_close is True:
            ds.close()
        return vdata

    def _open(self):
        if isinstance(self.path_or_ncd, str):
            do_close = True
            ds = xr.open_dataset(
                self.path_or_ncd,
                decode_cf=True,
                decode_coords=True,
                decode_times=True,
                mask_and_scale=True
            )
        else:
            do_close = False
            ds = self.path_or_ncd
        return do_close, ds
    def run(self, config: Config):
        # Magic for nested key generation
        # https://stackoverflow.com/a/27809959
        results = defaultdict(lambda: defaultdict(odict))

        do_close, ds = self._open()

        for context, calls in config.contexts.items():
            for call in calls:

                # Find any var specific kwargs to pass onto the run
                if call.stream_id not in ds.variables:
                    L.warning(f'{call.stream_id} is not a variable in the xarray dataset, skipping')
                    continue

                # Because the variables could have different dimensions
                # we calculate the coordinates and subset for each.
                # This is xarray style subsetting, so it will look something like:
                # {
                #     'time': slice(datetime.datetime(2020, 1, 1, 0, 0), datetime.datetime(2020, 4, 1, 0, 0), None)
                # }
                label_indexes = {}
                subset_kwargs = {}

                # Region subset
                # TODO: yeah this does nothing right now
                # Subset against the passed in lat/lon variable keys
                # and build up the subset dict to apply later

                # Time subset
                if self.time_var in ds[call.stream_id].coords:
                    if context.window.starting and context.window.ending:
                        label_indexes[self.time_var] = slice(context.window.starting, context.window.ending)

                subset_stream = ds[call.stream_id].sel(**label_indexes)

                if self.time_var in subset_stream.coords:
                    # Already subset with the stream, best case. Good netCDF file.
                    subset_kwargs['tinp'] = subset_stream.coords[self.time_var].values
                elif self.time_var in ds.variables and ds[self.time_var].dims == ds[call.stream_id].dims:
                    # Same dimensions as the stream, so use the same subset
                    subset_kwargs['tinp'] = ds[self.time_var].sel(**label_indexes).values
                elif self.time_var in ds.variables and ds[self.time_var].size == ds[call.stream_id].size:
                    # Not specifically connected, but hey, the user asked for it
                    subset_kwargs['tinp'] = ds[self.time_var].sel(**label_indexes).values

                if self.z_var in subset_stream.coords:
                    # Already subset with the stream, best case. Good netCDF file.
                    subset_kwargs['zinp'] = subset_stream.coords[self.z_var].values
                elif self.z_var in ds.variables and ds[self.z_var].dims == ds[call.stream_id].dims:
                    # Same dimensions as the stream, so use the same subset
                    subset_kwargs['zinp'] = ds[self.z_var].sel(**label_indexes).values
                elif self.z_var in ds.variables and ds[self.z_var].size == ds[call.stream_id].size:
                    # Not specifically connected, but hey, the user asked for it
                    subset_kwargs['zinp'] = ds[self.z_var].sel(**label_indexes).values

                if self.lat_var in subset_stream.coords:
                    # Already subset with the stream, best case. Good netCDF file.
                    subset_kwargs['lat'] = subset_stream.coords[self.lat_var].values
                elif self.lat_var in ds.variables and ds[self.lat_var].dims == ds[call.stream_id].dims:
                    # Same dimensions as the stream, so use the same subset
                    subset_kwargs['lat'] = ds[self.lat_var].sel(**label_indexes).values
                elif self.lat_var in ds.variables and ds[self.lat_var].size == ds[call.stream_id].size:
                    # Not specifically connected, but hey, the user asked for it
                    subset_kwargs['lat'] = ds[self.lat_var].sel(**label_indexes).values

                if self.lon_var in subset_stream.coords:
                    # Already subset with the stream, best case. Good netCDF file.
                    subset_kwargs['lon'] = subset_stream.coords[self.lon_var].values
                elif self.lon_var in ds.variables and ds[self.lon_var].dims == ds[call.stream_id].dims:
                    # Same dimensions as the stream, so use the same subset
                    subset_kwargs['lon'] = ds[self.lon_var].sel(**label_indexes).values
                elif self.lon_var in ds.variables and ds[self.lon_var].size == ds[call.stream_id].size:
                    # Not specifically connected, but hey, the user asked for it
                    subset_kwargs['lon'] = ds[self.lon_var].sel(**label_indexes).values

                data_input = subset_stream.values

                run_result = call.run(
                    inp=data_input,
                    **subset_kwargs
                )

                # Here we turn the labeled xarray indexes into boolean index arrays that numpy
                # can use to subset a basic array. This takes each labeled index, converts it to
                # its integer index representation (label -> integers) and then matches the keys
                # of each label with the dimensions of the data variable. The result should be
                # usable on the original data feed AS IS using a direct subset notation,
                # data[subset_indexes]. I'm pretty sure this works and if it doesn't blame my cat.

                # We start by subsetting nothing
                subset_indexes = np.full_like(ds[call.stream_id].values, 0, dtype=bool)

                int_indexes = map_index_queries(ds[call.stream_id], label_indexes)
                # This if-else clause is required only to support Python <3.8;
                # we can remove it when ioos_qc drops support for Python <=3.7.
                if isinstance(int_indexes, tuple):
                    int_indexes = int_indexes[0]
                else:
                    int_indexes = int_indexes.dim_indexers

                # The initial slicer will select everything. This selects all values
                # in a dimension if there are no labeled indexes for it.
                slicers = [ slice(None) for x in range(ds[call.stream_id].ndim) ]
                for index_key, index_value in int_indexes.items():
                    if index_key in ds[call.stream_id].dims:
                        slicers[ds[call.stream_id].dims.index(index_key)] = index_value

                # We started with an empty subset_indexes; now set to True what we actually
                # subset using the labeled dimensions.
                # Casting to a tuple to handle a numpy deprecation:
                # FutureWarning: Using a non-tuple sequence for multidimensional indexing is
                # deprecated; use `arr[tuple(seq)]` instead of `arr[seq]`. In the future this will
                # be interpreted as an array index, `arr[np.array(seq)]`, which will result either
                # in an error or a different result.
                subset_indexes[tuple(slicers)] = True

                yield ContextResult(
                    results=run_result,
                    stream_id=call.stream_id,
                    subset_indexes=subset_indexes,
                    data=data_input,
                    tinp=subset_kwargs.get('tinp', pd.Series(dtype='datetime64[ns]').values),
                    zinp=subset_kwargs.get('zinp', pd.Series(dtype='float64').values),
                    lat=subset_kwargs.get('lat', pd.Series(dtype='float64').values),
                    lon=subset_kwargs.get('lon', pd.Series(dtype='float64').values),
                )

        if do_close is True:
            ds.close()

        return results
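# Usage sketch for XarrayStream (illustrative only). Because subsetting is
# label-based, the QC'd variable should carry its time coordinate. The dataset
# contents and the streams-keyed config layout are assumptions based on the
# ioos_qc docs.
def _example_xarray_stream():
    import numpy as np
    import pandas as pd
    import xarray as xr
    from ioos_qc.config import Config
    from ioos_qc.streams import XarrayStream

    ds = xr.Dataset(
        {'temperature': ('time', np.array([21.0, 22.5, 35.0, 23.1]))},
        coords={'time': pd.date_range('2020-01-01', periods=4, freq='D')},
    )
    config = Config({
        'streams': {
            'temperature': {
                'qartod': {
                    'gross_range_test': {
                        'suspect_span': [20, 30],
                        'fail_span': [10, 40],
                    },
                },
            },
        },
    })
    # Each yielded ContextResult carries boolean subset_indexes aligned to the
    # original variable shape, which is handy for plotting QC flags.
    return list(XarrayStream(ds).run(config))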