Usage

At its core, ioos_qc is a collection of modules and methods to run various quality control checks on an input stream of data.

The following implementations are available in ioos_qc:

Basic usage

Calling a test manually
from ioos_qc import qartod

results = qartod.gross_range_test(
    inp=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
    suspect_span=[0, 8],
    fail_span=[0, 10]
)

print(results)

# prints a masked array with values:
# [1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 4, 4]

In this example, we call gross_range_test on a list of dummy data. The test is configured to mark a value as FAIL if it falls outside the range [0, 10] and as SUSPECT if it falls outside [0, 8]. It returns an array with one QC result per data point, where 1 is PASS, 3 is SUSPECT, and 4 is FAIL.
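The flag logic described above can be sketched in plain Python. This is a simplified illustration of how the two spans map to flags, not the ioos_qc implementation; the function name gross_range_flags is hypothetical, and the spans are treated as inclusive because the example output passes the value 8 and marks 10 as suspect.

```python
# Simplified illustration of the flag logic only; not ioos_qc's implementation.
PASS, SUSPECT, FAIL = 1, 3, 4


def gross_range_flags(values, suspect_span, fail_span):
    """Return one QARTOD-style flag per value (hypothetical helper)."""
    flags = []
    for v in values:
        if v < fail_span[0] or v > fail_span[1]:
            # Outside the fail span: worst flag wins
            flags.append(FAIL)
        elif v < suspect_span[0] or v > suspect_span[1]:
            # Inside the fail span but outside the suspect span
            flags.append(SUSPECT)
        else:
            flags.append(PASS)
    return flags


flags = gross_range_flags(list(range(13)), suspect_span=[0, 8], fail_span=[0, 10])
print(flags)
# [1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 4, 4]
```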

Motivation

If all you want to do is run a one-time test against a stream of data, the example above is all you need. In most projects, however, the hard part is not implementing the QC test methods themselves but solving problems such as:

  • How to store QC test configurations and manage them?

  • How to manage the inputs (data) going into the test, and the output (results) coming out?

  • How to share QC results with your users in a consistent way that follows community standards?

  • How to ensure that your test implementations perform well against large datasets?

  • How to generate baseline QC configurations for a dataset?

  • How to visualize and communicate QC results in a standard way?

The ioos_qc project does not just implement QC algorithms – it attempts to help you with these problems as well.

The following sections explore concepts that ioos_qc uses to help you manage and run your tests efficiently.

Concepts

There are four main concepts in the ioos_qc project:

  • Configurations: Standardized quality control definitions

  • Streams: Flexible data source classes to support running quality checks against various data formats

  • Stores: Flexible data storage classes to support storing quality results in various data formats

  • ConfigGeneration: Classes to generate configuration objects based on external climatology datasets

Configurations

Configuration objects represent a collection of quality control tests to run and the parameters for each one. There are three main types of Config objects:

  • StreamConfig: This configures QC tests for a single stream of data like a list, tuple, numpy.ndarray, dask.array, pandas.Series, netCDF4.Variable, or xarray.DataArray. This can be used standalone, or as a building block for the following more complex configs.

  • ContextConfig: This defines a collection of StreamConfig objects. These can be applied to multiple variables provided in a pandas.DataFrame, dask.DataFrame, netCDF4.Dataset, or xarray.Dataset. Optionally, these configs can be constrained to specific time domains (windows) and/or spatial domains (regions).

  • Config: A collection of ContextConfig objects, suitable for configuring a single input dataset to be broken up by region and time window before having QC checks applied.

Each configuration type can be initialized through Python objects or from files and can be represented in the following ways:

  • python - dict or OrderedDict

  • JSON/YAML filepath (str or Path object), str, or StringIO

In addition, the ContextConfig and Config objects can be initialized with:

  • netCDF4/xarray filepath (str or Path object) or Dataset
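As a small illustration of the dict and JSON representations listed above, the sketch below writes the same StreamConfig-shaped definition both ways and checks that they describe identical content. It uses only the standard library and does not call ioos_qc; the variable names are illustrative.

```python
import json

# A StreamConfig-shaped definition expressed as a Python dict
config_dict = {
    "qartod": {
        "gross_range_test": {
            "suspect_span": [1, 11],
            "fail_span": [0, 12],
        }
    }
}

# The same definition expressed as a JSON string
config_json = json.dumps(config_dict, indent=4)

# Parsing the JSON string recovers the original dict
round_tripped = json.loads(config_json)
print(round_tripped == config_dict)
# True
```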

StreamConfig

A StreamConfig object defines a specific ioos_qc test module and test function along with the configuration parameters with which to run it.

Note

In earlier versions, StreamConfig was known as QcConfig.

Usage

A basic StreamConfig object
from ioos_qc.config import StreamConfig

config = {
    'qartod': {
        'gross_range_test': {
            'suspect_span': [1, 11],
            'fail_span': [0, 12],
        }
    }
}
c = StreamConfig(config)

ContextConfig

A ContextConfig object defines multiple StreamConfig objects as well as optional region and window objects.

region

A GeoJSON representation of a geographical region. This is processed into a shapely.geometry.GeometryCollection internally for intersection calculations.

window

An object defining a time window by its starting and ending times. Internally this is defined as:

window = namedtuple(
    'TimeWindow',
    ('starting', 'ending'),
    defaults=[None, None]
)
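Because both fields default to None, a window can be fully bounded, open on one side, or entirely open. The sketch below shows this with the same namedtuple definition. The in_window helper is hypothetical, and treating a None bound as open and the bounds as inclusive are assumptions made for illustration, not confirmed ioos_qc behavior.

```python
from collections import namedtuple
from datetime import datetime, timezone

window = namedtuple(
    'TimeWindow',
    ('starting', 'ending'),
    defaults=[None, None]
)


def in_window(t, w):
    """Hypothetical helper: True if t falls inside w (None bound = open)."""
    if w.starting is not None and t < w.starting:
        return False
    if w.ending is not None and t > w.ending:
        return False
    return True


utc = timezone.utc
bounded = window(starting=datetime(2020, 1, 1, tzinfo=utc),
                 ending=datetime(2020, 4, 1, tzinfo=utc))
open_ended = window(starting=datetime(2020, 1, 1, tzinfo=utc))  # ending stays None

print(in_window(datetime(2020, 2, 1, tzinfo=utc), bounded))
print(in_window(datetime(2020, 5, 1, tzinfo=utc), bounded))
print(in_window(datetime(2030, 1, 1, tzinfo=utc), open_ended))
```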

Usage

A basic ContextConfig object
from ioos_qc.config import Config, ContextConfig

config = """
    region: null
    window:
        starting: 2020-01-01T00:00:00Z
        ending: 2020-04-01T00:00:00Z
    streams:
        variable1:
            qartod:
                location_test:
                    bbox: [-80, 40, -70, 60]
        variable2:
            qartod:
                gross_range_test:
                    suspect_span: [1, 11]
                    fail_span: [0, 12]
"""
c = ContextConfig(config)
c = Config(config)  # Also loadable as a Config

Config

The highest level and most flexible configuration object is a Config. It can describe quality control configurations for any number of regions, windows and streams.

Usage

A basic Config object
from ioos_qc.config import Config

config = """
    contexts:
        -   region: null
            window:
                starting: 2020-01-01T00:00:00Z
                ending: 2020-04-01T00:00:00Z
            streams:
                variable1:
                    qartod:
                        location_test:
                            bbox: [-80, 40, -70, 60]
                variable2:
                    qartod:
                        gross_range_test:
                            suspect_span: [1, 11]
                            fail_span: [0, 12]
        -   region: null
            window:
                starting: 2020-01-01T00:00:00Z
                ending: 2020-04-01T00:00:00Z
            streams:
                variable1:
                    qartod:
                        location_test:
                            bbox: [-80, 40, -70, 60]
                variable2:
                    qartod:
                        gross_range_test:
                            suspect_span: [1, 11]
                            fail_span: [0, 12]
"""
c = Config(config)

Streams

Streams represent the data input types for running quality control tests. A user “runs” a stream of data through a collection of quality control tests defined by a Config. A list of possible Streams can be found in the Streams API. All streams return a generator of QC results that carry contextual information useful when working with the results. You can iterate over the results generator directly, or you can collect the results into more familiar list or dict objects before use. If you are working in a streaming environment, you will want to consume the generator yourself. If you are running one-time or batch quality checks, you will likely want to collect the results or use one of the Stores provided by ioos_qc.
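One practical consequence of receiving a generator is that it can be consumed only once. The sketch below uses a hypothetical stand-in, fake_run, rather than a real Stream, to show that a second pass over the same generator yields nothing, which is why materializing the results is convenient for batch work.

```python
def fake_run():
    """Hypothetical stand-in for a Stream's run method: yields results lazily."""
    for name in ("variable1", "variable2"):
        yield {"stream_id": name, "flags": [1, 1, 3]}


results = fake_run()

# First pass consumes the generator
first_pass = [r["stream_id"] for r in results]

# Second pass finds it already exhausted
second_pass = [r["stream_id"] for r in results]

# Materialize once when the results are needed more than once
kept = list(fake_run())

print(first_pass)
# ['variable1', 'variable2']
print(second_pass)
# []
```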

ioos_qc comes with some built-in Streams:

  • NumpyStream - Run QC checks against a numpy array

  • PandasStream - Run QC checks against a DataFrame

  • XarrayStream - Run QC checks against an xarray Dataset

  • NetcdfStream - Run QC checks against a netCDF file (deprecated - use XarrayStream)

Results

Each yielded result will be a StreamConfigResult or a ContextResult, depending on which type of Config object was used. Collected results are only ever of one type, a CollectedResult, and only one CollectedResult will be returned after collecting Results. The benefit of using a CollectedResult is that it will piece back together all of the different ContextConfig objects in a Config and return you one result per unique stream_id and module/test combination.

Note

For example: If you had a Config object that contained (3) different ContextConfig objects (each defining a time window and test inputs) for a single variable/stream_id, running that Config through any Stream implementation would yield (3) different ContextResult objects. You could use them yourself to construct whatever results you wanted to manually, or you could collect those results back into a single CollectedResult object to only have to deal with one result.
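The idea of piecing per-context results back together can be sketched with plain dictionaries. The structures below are hypothetical, simplified stand-ins and not the ioos_qc result classes; marking rows that no context covered with None is also an assumption made for illustration.

```python
from collections import defaultdict

# Hypothetical per-context results: each covers only the row positions
# that fell inside its time window.
context_results = [
    {"stream_id": "variable1", "package": "qartod", "test": "gross_range_test",
     "positions": [0, 1, 2], "flags": [1, 3, 4]},
    {"stream_id": "variable1", "package": "qartod", "test": "gross_range_test",
     "positions": [3, 4], "flags": [4, 1]},
    {"stream_id": "variable2", "package": "qartod", "test": "gross_range_test",
     "positions": [0, 1, 2], "flags": [1, 1, 1]},
]


def collect(results, n_rows):
    """Merge partial results into one flag list per (stream_id, package, test)."""
    collected = defaultdict(lambda: [None] * n_rows)
    for r in results:
        key = (r["stream_id"], r["package"], r["test"])
        for pos, flag in zip(r["positions"], r["flags"]):
            collected[key][pos] = flag
    return dict(collected)


merged = collect(context_results, n_rows=5)
for key, flags in merged.items():
    print(key, flags)
# ('variable1', 'qartod', 'gross_range_test') [1, 3, 4, 4, 1]
# ('variable2', 'qartod', 'gross_range_test') [1, 1, 1, None, None]
```

Three partial results go in and two merged entries come out, one per unique stream_id and module/test combination.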

Warning

Historically, test results were returned in a dict structure. While this is still supported it should be considered deprecated. The individually yielded result objects or a list of CollectedResult objects should be used in any applications, including any implementation of Stores, going forward.

Different ways to use Stream results
import numpy as np
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.streams import PandasStream
from ioos_qc.results import collect_results

config = """
    contexts:
        -   window:
                starting: 2020-01-01T00:00:00Z
                ending: 2020-02-01T00:00:00Z
            streams:
                variable1:
                    qartod:
                        aggregate:
                        gross_range_test:
                            suspect_span: [3, 4]
                            fail_span: [2, 5]
                variable2:
                    qartod:
                        aggregate:
                        gross_range_test:
                            suspect_span: [23, 24]
                            fail_span: [22, 25]
        -   window:
                starting: 2020-02-01T00:00:00Z
                ending: 2020-03-01T00:00:00Z
            streams:
                variable1:
                    qartod:
                        aggregate:
                        gross_range_test:
                            suspect_span: [43, 44]
                            fail_span: [42, 45]
                variable2:
                    qartod:
                        aggregate:
                        gross_range_test:
                            suspect_span: [23, 24]
                            fail_span: [22, 25]
"""
c = Config(config)

rows = 50
data_inputs = {
    'time': pd.date_range(start='01/01/2020', periods=rows, freq='D'),
    'z': 2.0,
    'lat': 36.1,
    'lon': -76.5,
    'variable1': np.arange(0, rows),
    'variable2': np.arange(0, rows),
}
df = pd.DataFrame(data_inputs)

# Setup the stream
ps = PandasStream(df)

# Pass the run method the config to use
results = ps.run(c)

# results is a generator of ContextResult objects
print(results)
# <generator object PandasStream.run at ...>

# list_collected is a list of CollectedResult objects
# for each stream_id and module/test combination
list_collected = collect_results(results, how='list')
print(list_collected)
# [
#   CollectedResult(stream_id='variable1', package='qartod', test='gross_range_test', ...),
#   CollectedResult(stream_id='variable1', package='qartod', test='aggregate', ...),
#   CollectedResult(stream_id='variable2', package='qartod', test='gross_range_test', ...),
#   CollectedResult(stream_id='variable2', package='qartod', test='aggregate', ...),
# ]

NumpyStream

An example of a NumpyStream
import numpy as np
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.streams import NumpyStream

config = """
    window:
        starting: 2020-01-01T00:00:00Z
        ending: 2020-04-01T00:00:00Z
    streams:
        variable1:
            qartod:
                aggregate:
                gross_range_test:
                    suspect_span: [20, 30]
                    fail_span: [10, 40]
"""
c = Config(config)

rows = 50
tinp = pd.date_range(start='01/01/2020', periods=rows, freq='D').values
inp = np.arange(0, tinp.size)
# Build the axes as float arrays; np.full_like(tinp, ...) would inherit
# the datetime64 dtype of tinp instead of holding the float values
zinp = np.full(tinp.size, 2.0)
lat = np.full(tinp.size, 36.1)
lon = np.full(tinp.size, -76.5)

# Setup the stream
ns = NumpyStream(inp, tinp, zinp, lat, lon)
# Pass the run method the config to use
results = ns.run(c)

PandasStream

A PandasStream pulls all of the information required to run the QC tests from a single DataFrame. If the axis columns are not named time, z, lat, lon, or geom, you may provide their names as keyword arguments. See the API docs for more information.

An example of a PandasStream
import numpy as np
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.streams import PandasStream

config = """
    contexts:
        -   window:
                starting: 2020-01-01T00:00:00Z
                ending: 2020-02-01T00:00:00Z
            streams:
                variable1:
                    qartod:
                        aggregate:
                        gross_range_test:
                            suspect_span: [3, 4]
                            fail_span: [2, 5]
                variable2:
                    qartod:
                        aggregate:
                        gross_range_test:
                            suspect_span: [23, 24]
                            fail_span: [22, 25]
        -   window:
                starting: 2020-02-01T00:00:00Z
                ending: 2020-03-01T00:00:00Z
            streams:
                variable1:
                    qartod:
                        aggregate:
                        gross_range_test:
                            suspect_span: [43, 44]
                            fail_span: [42, 45]
                variable2:
                    qartod:
                        aggregate:
                        gross_range_test:
                            suspect_span: [23, 24]
                            fail_span: [22, 25]
"""
c = Config(config)

rows = 50
data_inputs = {
    'time': pd.date_range(start='01/01/2020', periods=rows, freq='D'),
    'z': 2.0,
    'lat': 36.1,
    'lon': -76.5,
    'variable1': np.arange(0, rows),
    'variable2': np.arange(0, rows),
}
df = pd.DataFrame(data_inputs)

# Setup the stream
ps = PandasStream(df)
# ps = PandasStream(df, time='time', z='z', lat='lat', lon='lon', geom='geom')
# Pass the run method the config to use
results = ps.run(c)

XarrayStream

An example of an XarrayStream
import numpy as np
import xarray as xr
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.streams import XarrayStream

config = """
    window:
        starting: 2020-01-01T00:00:00Z
        ending: 2020-04-01T00:00:00Z
    streams:
        variable1:
            qartod:
                aggregate:
                gross_range_test:
                    suspect_span: [20, 30]
                    fail_span: [10, 40]
"""
c = Config(config)

rows = 50
data_inputs = {
    'time': pd.date_range(start='01/01/2020', periods=rows, freq='D'),
    'z': 2.0,
    'lat': 36.1,
    'lon': -76.5,
    'variable1': np.arange(0, rows),
}
df = pd.DataFrame(data_inputs)
ds = xr.Dataset.from_dataframe(df)

# Setup the stream
xs = XarrayStream(ds)
# xs = XarrayStream(ds, time='time', z='z', lat='lat', lon='lon')
# Pass the run method the config to use
results = xs.run(c)

NetcdfStream

A subset of the NumpyStream, the NetcdfStream simply extracts numpy arrays from variables within a netCDF file and passes them through as arrays to NumpyStream. This class is deprecated; if you are using it, consider switching to the XarrayStream class, which subsets more efficiently.

An example of a NetcdfStream
import numpy as np
import xarray as xr
import pandas as pd
from ioos_qc.config import Config
from ioos_qc.streams import NetcdfStream

config = """
    window:
        starting: 2020-01-01T00:00:00Z
        ending: 2020-04-01T00:00:00Z
    streams:
        variable1:
            qartod:
                aggregate:
                gross_range_test:
                    suspect_span: [20, 30]
                    fail_span: [10, 40]
"""
c = Config(config)

rows = 50
data_inputs = {
    'time': pd.date_range(start='01/01/2020', periods=rows, freq='D'),
    'z': 2.0,
    'lat': 36.1,
    'lon': -76.5,
    'variable1': np.arange(0, rows),
}
df = pd.DataFrame(data_inputs)
ds = xr.Dataset.from_dataframe(df)

# Setup the stream
ns = NetcdfStream(ds)
# ns = NetcdfStream(ds, time='time', z='z', lat='lat', lon='lon')
# Pass the run method the config to use
results = ns.run(c)

Stores

Stores represent different data formats for storing quality control Results from Streams. The results from any Stream should be able to be passed into any Store implementation defined in the Stores API.

ioos_qc comes with some built-in Stores:

PandasStore

Collects all results and stores them as columns in a DataFrame.

A typical PandasStore workflow
import pandas as pd
from ioos_qc.streams import PandasStream
from ioos_qc.stores import PandasStore

# df (a DataFrame) and config (a Config object) are assumed to be defined
# already, as in the PandasStream example

# Setup the stream
stream = PandasStream(df)

# Run the tests by passing in a Config object
results = stream.run(config)

# Store the results in another DataFrame
store = PandasStore(
    results,
    axes={
        't': 'time',
        'z': 'z',
        'y': 'lat',
        'x': 'lon'
    }
)

# Compute any aggregations
store.compute_aggregate(name='rollup_qc')  # Appends to the results internally

# Write only the test results to the store
results_store = store.save(write_data=False, write_axes=False)

# Append columns from qc results back into the data
results_store = pd.concat([df, results_store], axis=1)

CFNetCDFStore

Stores the QC results in a CF-compliant DSG-type netCDF file, along with all metadata and a serialized copy of the configuration used in the tests. This currently only supports creating a new file with all results; appending to existing files or results is not supported, although that is expected to be implemented at some point. You can also choose to store a subset of results in a file, for example to keep the aggregate results in one file and the individual test results in another.

A typical CFNetCDFStore workflow
import pandas as pd
from ioos_qc.streams import PandasStream
from ioos_qc.stores import CFNetCDFStore
from pocean.dsg import IncompleteMultidimensionalTrajectory

# df (a DataFrame) and config (a Config object) are assumed to be defined
# already, as in the PandasStream example

# Setup the stream
stream = PandasStream(df)

# Run the tests by passing in a Config object
results = stream.run(config)

# Save a netCDF file
ncd = CFNetCDFStore(results)
ncd.save(
    'results.nc',
    IncompleteMultidimensionalTrajectory,
    config,
    dsg_kwargs=dict(
        reduce_dims=True,
        unlimited=False,
        unique_dims=True
    )
)

ConfigGeneration

A QcConfigCreator instance generates a config for QcConfig informed by reference datasets, such as climatologies, defined via configuration.

CreatorConfig

CreatorConfig performs checks on the configuration to ensure that all required fields and attributes are provided.

For convenience, the get_assets.py script is provided to download and prepare climatology datasets from NARR and Ocean Atlas.

Specify datasets and variables to be used by QcConfigCreator
from ioos_qc.config_creator import CreatorConfig

creator_config = {
    "datasets": [
        {
            "name": "ocean_atlas",
            "file_path": "assets/ocean_atlas.nc",
            "variables": {
                "o2": "o_an",
                "salinity": "s_an",
                "temperature": "t_an"
            },
            "3d": "depth"
        },
        {
            "name": "narr",
            "file_path": "assets/narr.nc",
            "variables": {
                "air": "air",
                "pres": "slp",
                "rhum": "rhum",
                "uwnd": "uwnd",
                "vwnd": "vwnd"
            }
        }
    ]
}
cc = CreatorConfig(creator_config)

print(cc)
# prints:
{
    "narr": {
        "file_path": "assets/narr.nc",
        "variables": {
            "air": "air",
            "pres": "slp",
            "rhum": "rhum",
            "uwnd": "uwnd",
            "vwnd": "vwnd"
        }
    },
    "ocean_atlas": {
        "3d": "depth",
        "file_path": "assets/ocean_atlas.nc",
        "variables": {
            "o2": "o_an",
            "salinity": "s_an",
            "temperature": "t_an"
        }
    }
}

QcConfigCreator

Create QcConfigCreator using configuration just created
from ioos_qc.config_creator import QcConfigCreator

qccc = QcConfigCreator(cc)

print(qccc)
# prints:
{
    "narr": {
        "file_path": "assets/narr.nc",
        "variables": {
            "air": "air",
            "pres": "slp",
            "rhum": "rhum",
            "uwnd": "uwnd",
            "vwnd": "vwnd"
        }
    },
    "ocean_atlas": {
        "3d": "depth",
        "file_path": "assets/ocean_atlas.nc",
        "variables": {
            "o2": "o_an",
            "salinity": "s_an",
            "temperature": "t_an"
        }
    }
}

QcVariableConfig

An instance of QcVariableConfig specifies how quality control will be tested for a given variable.

In this example, the variable air, or air temperature, will be quality controlled based on climatological data in the region defined by bbox (xmin, ymin, xmax, ymax), for a time range (between 2020-01-01 and 2020-01-08). The tests section specifies that two tests will be performed: spike_test and gross_range_test. Each test section requires suspect_min, suspect_max, fail_min, and fail_max to be defined.

The {fail,suspect}_{min,max} values will be evaluated as functions with values for min, max, mean, and std derived from the dataset for the bounds specified. Note that each term, operator, and grouping symbol must be surrounded by whitespace.

Test function allowed symbols:

  • Data derived descriptive statistics: min, max, mean, std

  • Operators: -, +, *, /

  • Grouping symbols: (, )
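To make the whitespace rule concrete, the sketch below evaluates such an expression by splitting it on whitespace, substituting the four statistics, and rejecting any token that is not a statistic, an operator, a grouping symbol, or a number. It is a minimal illustration and not the parser used by ioos_qc; the function name evaluate_bound is hypothetical, and using the population standard deviation for std is an assumption.

```python
import statistics

ALLOWED_SYMBOLS = {"+", "-", "*", "/", "(", ")"}


def evaluate_bound(expression, data):
    """Hypothetical helper: evaluate a whitespace-separated bound expression."""
    stats = {
        "min": min(data),
        "max": max(data),
        "mean": statistics.mean(data),
        "std": statistics.pstdev(data),  # assumption: population std
    }
    resolved = []
    for token in expression.split():
        if token in stats:
            resolved.append(repr(float(stats[token])))
        elif token in ALLOWED_SYMBOLS:
            resolved.append(token)
        else:
            # Anything else must be a plain number; float() raises
            # ValueError otherwise (for example on "max+std")
            resolved.append(repr(float(token)))
    # Every token has been validated, so the string holds only numbers,
    # operators, and parentheses
    return float(eval(" ".join(resolved), {"__builtins__": {}}, {}))


data = [1.0, 2.0, 3.0, 4.0]
print(evaluate_bound("( 1 + 2 )", data))
# 3.0
print(evaluate_bound("3 * 2 - 6", data))
# 0.0
print(evaluate_bound("max - min", data))
# 3.0
```

An expression written without the surrounding whitespace, such as "max+std", is a single token in this sketch and is rejected with a ValueError.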

Like CreatorConfig, QcVariableConfig performs checks on the configuration to ensure that it adheres to the specified schema and includes all required fields and attributes.

from ioos_qc.config_creator import QcVariableConfig

qc_variable_config = {
    "variable": "air",
    "bbox": [-165, 70, 160, 80],
    "start_time": "2020-01-01",
    "end_time": "2020-01-08",
    "tests": {
        "spike_test": {
            "suspect_min": "1",
            "suspect_max": "( 1 + 2 )",
            "fail_min": "3 * 2 - 6",
            "fail_max": "3 * mean + std / ( max * min )"
        },
        "gross_range_test": {
            "suspect_min": "min - std * 2",
            "suspect_max": "max + std / 2",
            "fail_min": "mean * std",
            "fail_max": "mean / std"
        }
    }
}
vc = QcVariableConfig(qc_variable_config)
print(vc)
# prints:
{
    "bbox": [
        -165,
        70,
        160,
        80
    ],
    "end_time": "2020-01-08",
    "start_time": "2020-01-01",
    "tests": {
        "gross_range_test": {
            "fail_max": "mean / std",
            "fail_min": "mean * std",
            "suspect_max": "max + std / 2",
            "suspect_min": "min - std * 2"
        },
        "spike_test": {
            "fail_max": "3 * mean + std / ( max * min )",
            "fail_min": "3 * 2 - 6",
            "suspect_max": "( 1 + 2 )",
            "suspect_min": "1"
        }
    }
}

Create config for QcConfig

Finally, the QcConfigCreator instance (qccc) takes the QcVariableConfig instance (vc) and returns a config that can then be used with QcConfig.

import json

config = qccc.create_config(vc)
print(json.dumps(config, indent=4, sort_keys=True))
# prints:
{
    "qartod": {
        "gross_range_test": {
            "fail_span": [
                -224.23900165924232,
                -2.673170364457356
            ],
            "suspect_span": [
                -54.89132748864793,
                7.09364403443822
            ]
        },
        "spike_test": {
            "fail_span": [
                0.0,
                -73.54932418742399
            ],
            "suspect_span": [
                1.0,
                3.0
            ]
        }
    }
}
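Because the bounds come from free-form expressions, a generated span can end up with its lower bound above its upper bound, as the spike_test fail_span does in the output above. A quick sanity check before using a generated config might look like the sketch below. The helper find_inverted_spans is hypothetical and not part of ioos_qc; it assumes every key ending in _span holds a [low, high] pair.

```python
def find_inverted_spans(config):
    """Hypothetical helper: list (package, test, key) where low > high."""
    inverted = []
    for package, tests in config.items():
        for test_name, params in tests.items():
            # A test may have no parameters at all
            for key, value in (params or {}).items():
                if key.endswith("_span") and value[0] > value[1]:
                    inverted.append((package, test_name, key))
    return inverted


# Values copied from the generated config shown above
config = {
    "qartod": {
        "gross_range_test": {
            "fail_span": [-224.23900165924232, -2.673170364457356],
            "suspect_span": [-54.89132748864793, 7.09364403443822],
        },
        "spike_test": {
            "fail_span": [0.0, -73.54932418742399],
            "suspect_span": [1.0, 3.0],
        },
    }
}

print(find_inverted_spans(config))
# [('qartod', 'spike_test', 'fail_span')]
```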