Usage¶
At its core, ioos_qc is a collection of modules and methods to run various quality control checks on an input stream of data.
The following implementations are available in ioos_qc:
AXDS - API - A collection of checks used by Axiom Data Science
Basic usage¶
from ioos_qc import qartod

results = qartod.gross_range_test(
    inp=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
    suspect_span=[0, 8],
    fail_span=[0, 10]
)

print(results)

# prints a masked array with values:
# [1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 4, 4]
In this example, we call gross_range_test on a list of dummy data.
We’ve configured the test to fail if the data is outside the range [0, 10] and be marked suspect if outside [0, 8].
The test returns an array of QC results for each data point, where 1 is PASS, 3 is SUSPECT, and 4 is FAIL.
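To make the flag logic concrete, here is a minimal pure-Python sketch of a gross range check. It is not the ioos_qc implementation: the function name gross_range_flags is hypothetical, and treating both ends of each span as inclusive is an assumption chosen because it reproduces the output shown above.

```python
# Flag values as described in the text above.
PASS, SUSPECT, FAIL = 1, 3, 4


def gross_range_flags(values, fail_span, suspect_span):
    """Flag each value against the fail and suspect ranges.

    Assumption: span bounds are inclusive, so a value equal to a bound
    is still inside the span.
    """
    fail_lo, fail_hi = fail_span
    suspect_lo, suspect_hi = suspect_span
    flags = []
    for value in values:
        if value < fail_lo or value > fail_hi:
            flags.append(FAIL)
        elif value < suspect_lo or value > suspect_hi:
            flags.append(SUSPECT)
        else:
            flags.append(PASS)
    return flags


flags = gross_range_flags(range(13), fail_span=(0, 10), suspect_span=(0, 8))
print(flags)
```

The fail check runs first, so a value outside both spans is reported as FAIL rather than SUSPECT.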
Motivation¶
If all you want to do is run a one-time test against a stream of data, then the example above is all you need. In most projects, however, the hard part is not implementing the QC test methods themselves but solving problems such as:
How to store QC test configurations and manage them?
How to manage the inputs (data) going into the test, and the output (results) coming out?
How to share QC results with your users in a consistent way that follows community standards?
How to ensure that your test implementations perform well against large datasets?
How to generate baseline QC configurations for a dataset?
How to visualize and communicate QC results in a standard way?
The ioos_qc project does not just implement QC algorithms – it attempts to help you with these problems as well.
The following sections explore concepts that ioos_qc uses to help you manage and run your tests efficiently.
Concepts¶
There are four main concepts in the ioos_qc project:
Configurations: Standardized quality control definitions
Streams: Flexible data source classes to support running quality checks against various data formats
Stores: Flexible data storage classes to support storing quality results in various data formats
ConfigGeneration: Classes to generate configuration objects based on external climatology datasets
Configurations¶
Configuration objects represent a collection of quality control tests to run and the parameters for each one. There are three main types of Config objects:
StreamConfig: This configures QC tests for a single stream of data like a list, tuple, numpy.ndarray, dask.array, pandas.Series, netCDF4.Variable, or xarray.DataArray. This can be used standalone, or as a building block for the following more complex configs.
ContextConfig: This defines a collection of StreamConfig objects. These can be applied to multiple variables provided in a pandas.DataFrame, dask.DataFrame, netCDF4.Dataset, or xarray.Dataset. Optionally, these configs can be constrained to specific time domains (windows) and/or spatial domains (regions).
Config: A collection of ContextConfig objects, suitable for configuring a single input dataset to be broken up by region and time window before having QC checks applied.
Each configuration type can be initialized through Python objects or from files and can be represented in the following ways:
python - dict or OrderedDict
JSON/YAML filepath (str or Path object), str, or StringIO
In addition, the ContextConfig and Config objects can be initialized with:
netCDF4/xarray filepath (str or Path object) or Dataset
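As a small illustration of why these representations are interchangeable, the sketch below writes one test definition as a Python dict and as a JSON string and checks that they parse to the same structure. It uses only the standard library and does not call ioos_qc; the span values are illustrative.

```python
import json

# A test definition written as a Python dict ...
config_dict = {
    "qartod": {
        "gross_range_test": {
            "suspect_span": [1, 11],
            "fail_span": [0, 12],
        }
    }
}

# ... and the same definition written as a JSON string.
config_json = """
{
    "qartod": {
        "gross_range_test": {
            "suspect_span": [1, 11],
            "fail_span": [0, 12]
        }
    }
}
"""

parsed = json.loads(config_json)
print(parsed == config_dict)
```

Because both forms carry the same nested mapping, a configuration can be kept in a file for sharing and still be built programmatically when needed.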
StreamConfig¶
A StreamConfig object defines a specific ioos_qc test module and test function, along with the configuration parameters to run it with.
Note
In earlier versions, StreamConfig was known as QcConfig.
Usage¶
from ioos_qc.config import StreamConfig

config = {
    'qartod': {
        'gross_range_test': {
            'suspect_span': [1, 11],
            'fail_span': [0, 12],
        }
    }
}
c = StreamConfig(config)
ContextConfig¶
A ContextConfig object defines multiple StreamConfig objects as well as optional region and window objects.
region¶
A GeoJSON representation of a geographical region. This is processed into a shapely.geometry.GeometryCollection internally for intersection calculations.
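For readers unfamiliar with GeoJSON, the sketch below builds a Polygon geometry from a bounding box using only the standard library. The helper name bbox_to_geojson_polygon is hypothetical, and whether ioos_qc expects a bare geometry, a Feature, or a GeometryCollection for region is not stated here, so treat the exact shape as an assumption.

```python
import json


def bbox_to_geojson_polygon(xmin, ymin, xmax, ymax):
    """Build a GeoJSON Polygon geometry covering a lon/lat bounding box."""
    ring = [
        [xmin, ymin],
        [xmax, ymin],
        [xmax, ymax],
        [xmin, ymax],
        [xmin, ymin],  # GeoJSON linear rings repeat the first position to close
    ]
    return {"type": "Polygon", "coordinates": [ring]}


region = bbox_to_geojson_polygon(-80, 40, -70, 60)
print(json.dumps(region))
```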
window¶
An object defining a time window using starting and ending. Internally this is defined as:
window = namedtuple(
    'TimeWindow',
    ('starting', 'ending'),
    defaults=[None, None]
)
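The sketch below shows one way such a window could be tested against a timestamp. The helper in_window is hypothetical, and two behaviors are assumptions rather than documented ioos_qc semantics: a bound left as None is treated as open-ended, and both bounds are inclusive.

```python
from collections import namedtuple
from datetime import datetime, timezone

TimeWindow = namedtuple("TimeWindow", ("starting", "ending"), defaults=[None, None])


def in_window(timestamp, window):
    """Return True when timestamp falls inside the window.

    Assumptions: None means "no limit" on that side, and both bounds
    are inclusive.
    """
    if window.starting is not None and timestamp < window.starting:
        return False
    if window.ending is not None and timestamp > window.ending:
        return False
    return True


window = TimeWindow(
    starting=datetime(2020, 1, 1, tzinfo=timezone.utc),
    ending=datetime(2020, 4, 1, tzinfo=timezone.utc),
)
inside = in_window(datetime(2020, 2, 15, tzinfo=timezone.utc), window)
outside = in_window(datetime(2020, 6, 1, tzinfo=timezone.utc), window)
unbounded = in_window(datetime(2020, 6, 1, tzinfo=timezone.utc), TimeWindow())
print(inside, outside, unbounded)
```

The defaults of None make TimeWindow() a window with no limits, which is why the last check accepts any timestamp.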
Usage¶
from ioos_qc.config import Config, ContextConfig

config = """
    region: null
    window:
        starting: 2020-01-01T00:00:00Z
        ending: 2020-04-01T00:00:00Z
    streams:
        variable1:
            qartod:
                location_test:
                    bbox: [-80, 40, -70, 60]
        variable2:
            qartod:
                gross_range_test:
                    suspect_span: [1, 11]
                    fail_span: [0, 12]
"""
c = ContextConfig(config)
c = Config(config)  # Also loadable as a Config
Config¶
The highest level and most flexible configuration object is a Config. It can describe quality control configurations for any number of regions, windows and streams.
Usage¶
from ioos_qc.config import Config

config = """
    contexts:
        - region: null
          window:
              starting: 2020-01-01T00:00:00Z
              ending: 2020-04-01T00:00:00Z
          streams:
              variable1:
                  qartod:
                      location_test:
                          bbox: [-80, 40, -70, 60]
              variable2:
                  qartod:
                      gross_range_test:
                          suspect_span: [1, 11]
                          fail_span: [0, 12]
        - region: null
          window:
              starting: 2020-01-01T00:00:00Z
              ending: 2020-04-01T00:00:00Z
          streams:
              variable1:
                  qartod:
                      location_test:
                          bbox: [-80, 40, -70, 60]
              variable2:
                  qartod:
                      gross_range_test:
                          suspect_span: [1, 11]
                          fail_span: [0, 12]
"""
c = Config(config)
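The nesting used above (contexts, then streams, then test package, then test) can be easier to reason about when flattened. The sketch below walks a plain dict with that shape and lists every configured test; iter_tests is a hypothetical helper that operates on the raw mapping and is not part of ioos_qc.

```python
def iter_tests(config):
    """Yield (context_index, stream_id, package, test, params) for each test."""
    for index, context in enumerate(config.get("contexts", [])):
        for stream_id, packages in context.get("streams", {}).items():
            for package, tests in packages.items():
                for test, params in tests.items():
                    yield index, stream_id, package, test, params


# Same structure as one context of the YAML above, written as a dict.
config = {
    "contexts": [
        {
            "region": None,
            "window": {
                "starting": "2020-01-01T00:00:00Z",
                "ending": "2020-04-01T00:00:00Z",
            },
            "streams": {
                "variable1": {
                    "qartod": {"location_test": {"bbox": [-80, 40, -70, 60]}}
                },
                "variable2": {
                    "qartod": {
                        "gross_range_test": {
                            "suspect_span": [1, 11],
                            "fail_span": [0, 12],
                        }
                    }
                },
            },
        }
    ]
}

rows = list(iter_tests(config))
for row in rows:
    print(row)
```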
Streams¶
Streams represent the data input types for running quality control tests. A user “runs” a stream of data through a collection of quality control tests defined by a Config. A list of possible Streams can be found in the Streams API.
All streams return a generator of QC results that contain contextual information that can be useful when using the results. You can iterate over the results generator directly, or you can collect the results into more familiar list or dict objects before use. If you are working in a streaming environment, you will want to consume the generated result objects yourself. If you are running one-time or batch quality checks, you will likely want to collect the results or use one of the Stores provided by ioos_qc.
ioos_qc comes with some built-in Streams:
NumpyStream - Run QC checks against a numpy array
PandasStream - Run QC checks against a DataFrame
XarrayStream - Run QC checks against an xarray Dataset
NetcdfStream - Run QC checks against a netCDF file (deprecated - use XarrayStream)
Results¶
Each yielded result will be a StreamConfigResult or a ContextResult, depending on which type of Config object was used. Collected results are only ever of one type, CollectedResult. The benefit of collecting is that it pieces back together the results from all of the different ContextConfig objects in a Config and returns one CollectedResult per unique stream_id and module/test combination.
Note
For example: If you had a Config object that contained (3) different ContextConfig objects (each defining a time window and test inputs) for a single variable/stream_id, running that Config through any Stream implementation would yield (3) different ContextResult objects. You could use them yourself to construct whatever results you wanted manually, or you could collect those results back into a single CollectedResult object so that you only have to deal with one result.
Warning
Historically, test results were returned in a dict structure. While this is still supported, it should be considered deprecated. The individually yielded result objects or a list of CollectedResult objects should be used in any applications, including any implementation of Stores, going forward.
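The idea of collecting can be sketched in plain Python. MiniResult and collect below are hypothetical stand-ins, not ioos_qc classes or functions, and simple concatenation is an assumption standing in for however the library actually reassembles per-window results.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import List


@dataclass
class MiniResult:
    """Hypothetical stand-in for one per-context result (not an ioos_qc class)."""
    stream_id: str
    package: str
    test: str
    flags: List[int]


def collect(results):
    """Concatenate flags from results that share stream_id, package, and test."""
    grouped = defaultdict(list)
    for result in results:
        key = (result.stream_id, result.package, result.test)
        grouped[key].extend(result.flags)
    return [MiniResult(*key, flags) for key, flags in grouped.items()]


# Three results for the same variable and test, one per time window.
per_window = [
    MiniResult("variable1", "qartod", "gross_range_test", [1, 1, 3]),
    MiniResult("variable1", "qartod", "gross_range_test", [4, 4]),
    MiniResult("variable1", "qartod", "gross_range_test", [1]),
]
collected = collect(per_window)
print(collected)
```

Three per-window results go in and a single combined result comes out, keyed by stream_id and module/test.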
import numpy as np
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.streams import PandasStream
from ioos_qc.results import collect_results

config = """
    contexts:
        - window:
              starting: 2020-01-01T00:00:00Z
              ending: 2020-02-01T00:00:00Z
          streams:
              variable1:
                  qartod:
                      aggregate:
                      gross_range_test:
                          suspect_span: [3, 4]
                          fail_span: [2, 5]
              variable2:
                  qartod:
                      aggregate:
                      gross_range_test:
                          suspect_span: [23, 24]
                          fail_span: [22, 25]
        - window:
              starting: 2020-02-01T00:00:00Z
              ending: 2020-03-01T00:00:00Z
          streams:
              variable1:
                  qartod:
                      aggregate:
                      gross_range_test:
                          suspect_span: [43, 44]
                          fail_span: [42, 45]
              variable2:
                  qartod:
                      aggregate:
                      gross_range_test:
                          suspect_span: [23, 24]
                          fail_span: [22, 25]
"""
c = Config(config)

rows = 50
data_inputs = {
    'time': pd.date_range(start='01/01/2020', periods=rows, freq='D'),
    'z': 2.0,
    'lat': 36.1,
    'lon': -76.5,
    'variable1': np.arange(0, rows),
    'variable2': np.arange(0, rows),
}
df = pd.DataFrame(data_inputs)

# Setup the stream
ps = PandasStream(df)

# Pass the run method the config to use
results = ps.run(c)

# results is a generator of ContextResult objects
print(results)
# <generator object PandasStream.run at ...>

# list_collected is a list of CollectedResult objects
# for each stream_id and module/test combination
list_collected = collect_results(results, how='list')
print(list_collected)
# [
#   CollectedResult(stream_id='variable1', package='qartod', test='gross_range_test', ...),
#   CollectedResult(stream_id='variable1', package='qartod', test='aggregate', ...),
#   CollectedResult(stream_id='variable2', package='qartod', test='gross_range_test', ...),
#   CollectedResult(stream_id='variable2', package='qartod', test='aggregate', ...),
# ]
NumpyStream¶
import numpy as np
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.streams import NumpyStream

config = """
    window:
        starting: 2020-01-01T00:00:00Z
        ending: 2020-04-01T00:00:00Z
    streams:
        variable1:
            qartod:
                aggregate:
                gross_range_test:
                    suspect_span: [20, 30]
                    fail_span: [10, 40]
"""
c = Config(config)

rows = 50
tinp = pd.date_range(start='01/01/2020', periods=rows, freq='D').values
inp = np.arange(0, tinp.size)
# Build the axes as float arrays; np.full_like(tinp, ...) would inherit
# the datetime64 dtype of tinp instead
zinp = np.full(tinp.shape, 2.0)
lat = np.full(tinp.shape, 36.1)
lon = np.full(tinp.shape, -76.5)

# Setup the stream
ns = NumpyStream(inp, tinp, zinp, lat, lon)
# Pass the run method the config to use
results = ns.run(c)
PandasStream¶
A PandasStream pulls all required information to run the QC tests from a single DataFrame. If the axes column names are not time, z, lat, lon, or geom, you may provide them as keyword arguments. See the API docs for more information.
import numpy as np
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.streams import PandasStream

config = """
    contexts:
        - window:
              starting: 2020-01-01T00:00:00Z
              ending: 2020-02-01T00:00:00Z
          streams:
              variable1:
                  qartod:
                      aggregate:
                      gross_range_test:
                          suspect_span: [3, 4]
                          fail_span: [2, 5]
              variable2:
                  qartod:
                      aggregate:
                      gross_range_test:
                          suspect_span: [23, 24]
                          fail_span: [22, 25]
        - window:
              starting: 2020-02-01T00:00:00Z
              ending: 2020-03-01T00:00:00Z
          streams:
              variable1:
                  qartod:
                      aggregate:
                      gross_range_test:
                          suspect_span: [43, 44]
                          fail_span: [42, 45]
              variable2:
                  qartod:
                      aggregate:
                      gross_range_test:
                          suspect_span: [23, 24]
                          fail_span: [22, 25]
"""
c = Config(config)

rows = 50
data_inputs = {
    'time': pd.date_range(start='01/01/2020', periods=rows, freq='D'),
    'z': 2.0,
    'lat': 36.1,
    'lon': -76.5,
    'variable1': np.arange(0, rows),
    'variable2': np.arange(0, rows),
}
df = pd.DataFrame(data_inputs)

# Setup the stream
ps = PandasStream(df)
# ps = PandasStream(df, time='time', z='z', lat='lat', lon='lon', geom='geom')
# Pass the run method the config to use
results = ps.run(c)
XarrayStream¶
import numpy as np
import xarray as xr
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.streams import XarrayStream

config = """
    window:
        starting: 2020-01-01T00:00:00Z
        ending: 2020-04-01T00:00:00Z
    streams:
        variable1:
            qartod:
                aggregate:
                gross_range_test:
                    suspect_span: [20, 30]
                    fail_span: [10, 40]
"""
c = Config(config)

rows = 50
data_inputs = {
    'time': pd.date_range(start='01/01/2020', periods=rows, freq='D'),
    'z': 2.0,
    'lat': 36.1,
    'lon': -76.5,
    'variable1': np.arange(0, rows),
}
df = pd.DataFrame(data_inputs)
ds = xr.Dataset.from_dataframe(df)

# Setup the stream
xs = XarrayStream(ds)
# xs = XarrayStream(ds, time='time', z='z', lat='lat', lon='lon')
# Pass the run method the config to use
results = xs.run(c)
NetcdfStream¶
Built on top of NumpyStream, the NetcdfStream simply extracts numpy arrays from variables within a netCDF file and passes them through as arrays to NumpyStream. This class is deprecated; if you are using it, consider moving to the XarrayStream class, which subsets more efficiently.
import numpy as np
import xarray as xr
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.streams import NetcdfStream

config = """
    window:
        starting: 2020-01-01T00:00:00Z
        ending: 2020-04-01T00:00:00Z
    streams:
        variable1:
            qartod:
                aggregate:
                gross_range_test:
                    suspect_span: [20, 30]
                    fail_span: [10, 40]
"""
c = Config(config)

rows = 50
data_inputs = {
    'time': pd.date_range(start='01/01/2020', periods=rows, freq='D'),
    'z': 2.0,
    'lat': 36.1,
    'lon': -76.5,
    'variable1': np.arange(0, rows),
}
df = pd.DataFrame(data_inputs)
ds = xr.Dataset.from_dataframe(df)

# Setup the stream
ns = NetcdfStream(ds)
# ns = NetcdfStream(ds, time='time', z='z', lat='lat', lon='lon')
# Pass the run method the config to use
results = ns.run(c)
Stores¶
Stores represent different data formats for storing quality control Results from Streams. The results from any Stream should be able to be passed into any Store implementation defined in the Stores API.
ioos_qc comes with some built-in Stores:
PandasStore - Store QC results in a DataFrame.
CFNetCDFStore - Store QC results in a CF DSG file supported by pocean-core.
PandasStore¶
Collects all results and stores them as columns in a DataFrame.
import pandas as pd
from ioos_qc.streams import PandasStream
from ioos_qc.stores import PandasStore

# `df` is a DataFrame and `config` is a Config object, built as in the
# PandasStream example above

# Setup the stream
stream = PandasStream(df)

# Run the tests by passing in a Config object
results = stream.run(config)

# Store the results in another DataFrame
store = PandasStore(
    results,
    axes={
        't': 'time',
        'z': 'z',
        'y': 'lat',
        'x': 'lon'
    }
)

# Compute any aggregations
store.compute_aggregate(name='rollup_qc')  # Appends to the results internally

# Write only the test results to the store
results_store = store.save(write_data=False, write_axes=False)

# Append columns from qc results back into the data
results_store = pd.concat([df, results_store], axis=1)
CFNetCDFStore¶
Stores the QC results in a CF-compliant DSG-type netCDF file, along with all metadata, and serializes the configuration used in the tests into the same file. This currently only supports creating a new file with all results; appending to existing files or results is not supported, although that is expected to be implemented at some point. You can also choose to store a subset of results in a file, for example to keep the aggregate results in one file and the individual test results in another.
import pandas as pd
from ioos_qc.streams import PandasStream
from ioos_qc.stores import CFNetCDFStore
from pocean.dsg import IncompleteMultidimensionalTrajectory

# `df` is a DataFrame and `config` is a Config object, built as in the
# PandasStream example above

# Setup the stream
stream = PandasStream(df)

# Run the tests by passing in a Config object
results = stream.run(config)

# Save a netCDF file
ncd = CFNetCDFStore(results)
ncd.save(
    'results.nc',
    IncompleteMultidimensionalTrajectory,
    config,
    dsg_kwargs=dict(
        reduce_dims=True,
        unlimited=False,
        unique_dims=True
    )
)
ConfigGeneration¶
A QcConfigCreator instance generates a config for QcConfig (known as StreamConfig in current versions), informed by reference datasets such as climatologies that are defined via configuration.
CreatorConfig¶
CreatorConfig performs checks on the configuration to ensure that all required fields and attributes are provided.
For convenience, the get_assets.py script is provided to download and prepare climatology datasets from NARR and Ocean Atlas.
# Import path assumed; adjust if your ioos_qc version differs
from ioos_qc.config_creator import CreatorConfig

creator_config = {
    "datasets": [
        {
            "name": "ocean_atlas",
            "file_path": "assets/ocean_atlas.nc",
            "variables": {
                "o2": "o_an",
                "salinity": "s_an",
                "temperature": "t_an"
            },
            "3d": "depth"
        },
        {
            "name": "narr",
            "file_path": "assets/narr.nc",
            "variables": {
                "air": "air",
                "pres": "slp",
                "rhum": "rhum",
                "uwnd": "uwnd",
                "vwnd": "vwnd"
            }
        }
    ]
}
cc = CreatorConfig(creator_config)

print(cc)
# prints:
# {
#     "narr": {
#         "file_path": "assets/narr.nc",
#         "variables": {
#             "air": "air",
#             "pres": "slp",
#             "rhum": "rhum",
#             "uwnd": "uwnd",
#             "vwnd": "vwnd"
#         }
#     },
#     "ocean_atlas": {
#         "3d": "depth",
#         "file_path": "assets/ocean_atlas.nc",
#         "variables": {
#             "o2": "o_an",
#             "salinity": "s_an",
#             "temperature": "t_an"
#         }
#     }
# }
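The printed form above is keyed by dataset name, while the input is a list of datasets. The sketch below illustrates that re-keying together with a simple required-field check. It is not how CreatorConfig is implemented: index_datasets_by_name is a hypothetical helper, and the required keys are assumed from the example rather than taken from the actual schema.

```python
def index_datasets_by_name(creator_config):
    """Re-key the "datasets" list by each entry's name.

    The required keys are an assumption based on the example config,
    not the real validation schema.
    """
    required = ("name", "file_path", "variables")
    indexed = {}
    for dataset in creator_config["datasets"]:
        missing = [key for key in required if key not in dataset]
        if missing:
            raise ValueError(f"dataset is missing required keys: {missing}")
        # Keep every field except the name, which becomes the dict key.
        indexed[dataset["name"]] = {
            key: value for key, value in dataset.items() if key != "name"
        }
    return indexed


example = {
    "datasets": [
        {
            "name": "narr",
            "file_path": "assets/narr.nc",
            "variables": {"air": "air", "pres": "slp"},
        },
        {
            "name": "ocean_atlas",
            "file_path": "assets/ocean_atlas.nc",
            "variables": {"temperature": "t_an"},
            "3d": "depth",
        },
    ]
}

indexed = index_datasets_by_name(example)
print(sorted(indexed))
```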
QcConfigCreator¶
# Import path assumed; adjust if your ioos_qc version differs
from ioos_qc.config_creator import QcConfigCreator

# `cc` is the CreatorConfig instance created above
qccc = QcConfigCreator(cc)

print(qccc)
# prints:
# {
#     "narr": {
#         "file_path": "assets/narr.nc",
#         "variables": {
#             "air": "air",
#             "pres": "slp",
#             "rhum": "rhum",
#             "uwnd": "uwnd",
#             "vwnd": "vwnd"
#         }
#     },
#     "ocean_atlas": {
#         "3d": "depth",
#         "file_path": "assets/ocean_atlas.nc",
#         "variables": {
#             "o2": "o_an",
#             "salinity": "s_an",
#             "temperature": "t_an"
#         }
#     }
# }
QcVariableConfig¶
An instance of QcVariableConfig specifies how quality control will be tested for a given variable.
In this example, the variable air, or air temperature, will be quality controlled based on climatological data in the region defined by bbox (xmin, ymin, xmax, ymax), for a time range (between 2020-01-01 and 2020-01-08). The tests section specifies that two tests will be performed: spike_test and gross_range_test. Each test section requires suspect_min, suspect_max, fail_min, and fail_max to be defined.
The {fail,suspect}_{min,max} values will be evaluated as functions with values for min, max, mean, and std derived from the dataset for the bounds specified. Note that each term, operator, and grouping symbol must be surrounded by whitespace.
Test function allowed symbols:
Data derived descriptive statistics: min, max, mean, std
Operators: -, +, *, /
Grouping symbols: (, )
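To illustrate how such whitespace-separated expressions might be evaluated, here is a minimal standard-library sketch. It is not the evaluator ioos_qc uses: the evaluate function, its token check, and the sample statistics are all hypothetical, and the sketch only accepts the symbols listed above.

```python
import ast
import operator

ALLOWED_NAMES = {"min", "max", "mean", "std"}
ALLOWED_SYMBOLS = {"-", "+", "*", "/", "(", ")"}
BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}


def is_number(token):
    """Return True when the token parses as a float."""
    try:
        float(token)
    except ValueError:
        return False
    return True


def evaluate(expression, stats):
    """Evaluate a whitespace-separated arithmetic expression.

    Every token must be a number, a statistic name, an operator, or a
    parenthesis; anything else is rejected before parsing.
    """
    tokens = expression.split()
    for token in tokens:
        allowed = token in ALLOWED_NAMES or token in ALLOWED_SYMBOLS
        if not allowed and not is_number(token):
            raise ValueError(f"token not allowed: {token!r}")

    def walk(node):
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.BinOp) and type(node.op) in BINARY_OPS:
            return BINARY_OPS[type(node.op)](walk(node.left), walk(node.right))
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return float(node.value)
        if isinstance(node, ast.Name) and node.id in ALLOWED_NAMES:
            return float(stats[node.id])
        raise ValueError("unsupported expression")

    # Re-join the tokens so stray leading whitespace cannot upset the parser.
    return walk(ast.parse(" ".join(tokens), mode="eval"))


# Made-up statistics standing in for values derived from a dataset.
stats = {"min": -5.0, "max": 10.0, "mean": 2.0, "std": 1.5}
print(evaluate("( 1 + 2 )", stats))
print(evaluate("min - std * 2", stats))
```

Walking the parsed tree instead of calling eval keeps the sketch limited to the four operators and four statistic names, which mirrors the restricted symbol set described above.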
Like CreatorConfig, QcVariableConfig performs checks on the configuration to ensure that it adheres to the specified schema and includes all required fields and attributes.
# Import path assumed; adjust if your ioos_qc version differs
from ioos_qc.config_creator import QcVariableConfig

qc_variable_config = {
    "variable": "air",
    "bbox": [-165, 70, 160, 80],
    "start_time": "2020-01-01",
    "end_time": "2020-01-08",
    "tests": {
        "spike_test": {
            "suspect_min": "1",
            "suspect_max": "( 1 + 2 )",
            "fail_min": "3 * 2 - 6",
            "fail_max": "3 * mean + std / ( max * min )"
        },
        "gross_range_test": {
            "suspect_min": "min - std * 2",
            "suspect_max": "max + std / 2",
            "fail_min": "mean * std",
            "fail_max": "mean / std"
        }
    }
}
vc = QcVariableConfig(qc_variable_config)
print(vc)
# prints:
# {
#     "bbox": [
#         -165,
#         70,
#         160,
#         80
#     ],
#     "end_time": "2020-01-08",
#     "start_time": "2020-01-01",
#     "tests": {
#         "gross_range_test": {
#             "fail_max": "mean / std",
#             "fail_min": "mean * std",
#             "suspect_max": "max + std / 2",
#             "suspect_min": "min - std * 2"
#         },
#         "spike_test": {
#             "fail_max": "3 * mean + std / ( max * min )",
#             "fail_min": "3 * 2 - 6",
#             "suspect_max": "( 1 + 2 )",
#             "suspect_min": "1"
#         }
#     }
# }
Create config for QcConfig¶
Finally, the QcConfigCreator instance (qccc) takes the QcVariableConfig instance (vc) and returns a config that can then be used with QcConfig (StreamConfig in current versions).
import json

# `qccc` and `vc` are the instances created in the sections above
config = qccc.create_config(vc)
print(json.dumps(config, indent=4, sort_keys=True))
# prints:
# {
#     "qartod": {
#         "gross_range_test": {
#             "fail_span": [
#                 -224.23900165924232,
#                 -2.673170364457356
#             ],
#             "suspect_span": [
#                 -54.89132748864793,
#                 7.09364403443822
#             ]
#         },
#         "spike_test": {
#             "fail_span": [
#                 0.0,
#                 -73.54932418742399
#             ],
#             "suspect_span": [
#                 1.0,
#                 3.0
#             ]
#         }
#     }
# }