QARTOD - NetCDF Examples

This notebook provides examples of running QARTOD on a netCDF file. For background, see XarrayStream and CFNetCDFStore in the docs.

There are multiple ways that you can integrate ioos_qc into your netcdf-based workflow.

Option A: Store test configurations externally, pass your configuration and netcdf file to ioos_qc, and manually update netcdf variables with results of the test * In this case, you extract variables from the netcdf file, use ioos_qc methods to run tests, and then manually update the netcdf file with results * This provides the most control, but doesn’t take advantage of shared code in the ioos_qc library * It’s up to you to ensure your resulting netcdf is self-describing and CF-compliant

Option B: Store test configurations externally, then pass your configuration and netcdf file to ioos_qc, and let it run tests and update the file with results * This takes advantage of ioos_qc code to store results and configuration in the netCDF file, and ensure a self-describing, CF-compliant file * Managing your test configurations outside the file is better when dealing with a large number of datasets/configurations

Option C: Store test configurations in your netcdf file, then pass that file to ioos_qc and let it run tests and update the file with results * You only need to add test configurations to the file one time, and after that you could run tests over and over again on the same file * This option is the most portable, since the data, configuration, and results are all in one place * The downside is, test configuration management is difficult since it’s stored in the file instead of some common external location

Setup

[1]:
from bokeh.plotting import output_notebook

output_notebook()
Loading BokehJS ...
[2]:
def plot_ncresults(ncdata, var_name, results, title, test_name):
    """Helper method to plot QC results using Bokeh."""

    qc_test = next(r for r in results if r.stream_id == var_name and r.test == test_name)
    time = np.array(qc_test.tinp)
    obs = np.array(qc_test.data)
    results = qc_test.results

    qc_pass = np.ma.masked_where(results != 1, obs)
    num_pass = (results == 1).sum()
    qc_suspect = np.ma.masked_where(results != 3, obs)
    num_suspect = (results == 3).sum()
    qc_fail = np.ma.masked_where(results != 4, obs)
    num_fail = (results == 4).sum()
    qc_notrun = np.ma.masked_where(results != 2, obs)

    p1 = figure(
        x_axis_type="datetime",
        title=f"{test_name} : {title} : p/s/f= {num_pass}/{num_suspect}/{num_fail}")
    p1.grid.grid_line_alpha = 0.3
    p1.xaxis.axis_label = "Time"
    p1.yaxis.axis_label = "Observation Value"

    p1.line(time, obs, legend_label="obs", color="#A6CEE3")
    p1.circle(
        time, qc_notrun, size=2, legend_label="qc not run", color="gray", alpha=0.2
    )
    p1.circle(time, qc_pass, size=4, legend_label="qc pass", color="green", alpha=0.5)
    p1.circle(
        time, qc_suspect, size=4, legend_label="qc suspect", color="orange", alpha=0.7
    )
    p1.circle(time, qc_fail, size=6, legend_label="qc fail", color="red", alpha=1.0)

    show(gridplot([[p1]], width=800, height=400))
[3]:
import os
import shutil
import tempfile
from datetime import datetime
import pandas as pd

Load the netCDF dataset

The example netCDF dataset is a pCO2 sensor from the Ocean Observatories Initiative (OOI) Coastal Endurance Inshore Surface Mooring instrument frame at 7 meters depth located on the Oregon Shelf break.

[4]:
import xarray as xr
from erddapy.url_handling import urlopen
from netCDF4 import Dataset


def open_from_https(url):

    data = urlopen(fname)
    nc = Dataset("pco2_netcdf_example", memory=data.read())
    return xr.open_dataset(xr.backends.NetCDF4DataStore(nc))


url = "https://github.com/ioos/ioos_qc/raw/master/docs/source/examples"
fname = f"{url}/pco2_netcdf_example.nc#mode=bytes"

pco2 = open_from_https(fname)
pco2
[4]:
<xarray.Dataset>
Dimensions:                                   (time: 7339, spectrum: 14)
Coordinates:
    obs                                       (time) int64 ...
  * time                                      (time) datetime64[ns] 2015-10-0...
    lat                                       (time) float64 ...
    lon                                       (time) float64 ...
Dimensions without coordinates: spectrum
Data variables: (12/30)
    deployment                                (time) int32 ...
    id                                        (time) |S64 ...
    dcl_controller_timestamp                  (time) object ...
    driver_timestamp                          (time) datetime64[ns] ...
    ingestion_timestamp                       (time) datetime64[ns] ...
    internal_timestamp                        (time) datetime64[ns] ...
    ...                                        ...
    absorbance_ratio_620_qc_executed          (time) float32 ...
    absorbance_ratio_620_qc_results           (time) float32 ...
    pco2w_thermistor_temperature_qc_executed  (time) float32 ...
    pco2w_thermistor_temperature_qc_results   (time) float32 ...
    pco2_seawater_qc_executed                 (time) float32 ...
    pco2_seawater_qc_results                  (time) float32 ...
Attributes: (12/71)
    node:                               RID16
    comment:
    publisher_email:
    sourceUrl:                          http://oceanobservatories.org/
    collection_method:                  recovered_host
    stream:                             pco2w_abc_dcl_instrument_recovered
    ...                                 ...
    geospatial_vertical_units:          meters
    geospatial_vertical_resolution:     0.1
    geospatial_vertical_positive:       down
    DODS.strlen:                        36
    DODS.dimName:                       string36
    DODS_EXTRA.Unlimited_Dimension:     obs

Plot the raw data.

[5]:
import numpy as np
from bokeh.plotting import figure, show
from bokeh.layouts import gridplot


data = pco2["pco2_seawater"]
t = np.array(pco2["time"])
x = np.array(data)

p1 = figure(x_axis_type="datetime", title="pco2_seawater")
p1.grid.grid_line_alpha = 0.3
p1.xaxis.axis_label = "Time"
p1.yaxis.axis_label = data.units
p1.line(t, x)

show(gridplot([[p1]], width=800, height=400))

QC Configuration

Here we define the generic config object for multiple QARTOD tests, plus the aggregate/rollup flag.

The key “pco2_seawater” indicates which variable in the netcdf file this config should run against.

[6]:
from ioos_qc.config import Config


config = {
    "pco2_seawater": {
        "qartod": {
            "gross_range_test": {"suspect_span": [200, 2400], "fail_span": [0, 3000]},
            "spike_test": {"suspect_threshold": 500, "fail_threshold": 1000},
            "location_test": {"bbox": [-124.5, 44, -123.5, 45]},
            "flat_line_test": {
                "tolerance": 1,
                "suspect_threshold": 3600,
                "fail_threshold": 86400,
            },
            "aggregate": {},
        }
    }
}


config = """
  contexts:
    - streams:
        pco2_seawater:
          qartod:
              gross_range_test:
                suspect_span: [200, 2400]
                fail_span: [0, 3000]
              spike_test:
                suspect_threshold: 500
                fail_threshold: 1000
              location_test:
                bbox: [-124.5, 44, -123.5, 45]
              flat_line_test:
                tolerance: 1
                suspect_threshold: 3600
                fail_threshold: 86400
"""

c = Config(config)

Option A: Manually run tests and store results

Store test configurations externally, pass your configuration and netcdf file to ioos_qc, and manually update netcdf variables with results of the test.

Note: For tests that need tinp, zinp, etc, use args to define the t, x, y, z dimensions. In this case, we need latitude and longitude for the location test.

[7]:
from ioos_qc.qartod import aggregate
from ioos_qc.streams import XarrayStream
from ioos_qc.results import collect_results, CollectedResult

qc = XarrayStream(pco2, lon="lon", lat="lat")

# Store as a list to run QC now
runner = list(qc.run(c))

results = collect_results(runner, how='list')

agg = CollectedResult(
    stream_id='',
    package='qartod',
    test='qc_rollup',
    function=aggregate,
    results=aggregate(results),
    tinp=qc.time(),
    data=data
)
results.append(agg)
results
[7]:
[<CollectedResult stream_id=pco2_seawater package=qartod test=gross_range_test>,
 <CollectedResult stream_id=pco2_seawater package=qartod test=spike_test>,
 <CollectedResult stream_id=pco2_seawater package=qartod test=location_test>,
 <CollectedResult stream_id=pco2_seawater package=qartod test=flat_line_test>,
 <CollectedResult stream_id= package=qartod test=qc_rollup>]
[8]:
plot_ncresults(pco2, "pco2_seawater", results, "pCO2 seawater", "gross_range_test")
[9]:
plot_ncresults(pco2, "pco2_seawater", results, "pCO2 seawater", "spike_test")
[10]:
plot_ncresults(pco2, "pco2_seawater", results, "pCO2 seawater", "flat_line_test")
[11]:
plot_ncresults(pco2, "pco2_seawater", results, "pCO2 seawater", "location_test")
[12]:
# To see overall results, use the aggregate test
plot_ncresults(pco2, "", results, "pCO2 seawater", "qc_rollup")
[13]:
# Store results manually
# This is just a simple example and stores the aggregate test flag as a variable.
# You can expand upon this, or use the ioos_qc library to store the results for you (see subsequent examples)
# and use xarray's to_xxx methods to serialize results to whichever format you prefer

agg_da = xr.DataArray(agg.results, {}, ('time',))
output_xds = pco2.assign(
    qartod_aggregate=agg_da
)

output_xds.qartod_aggregate
[13]:
<xarray.DataArray 'qartod_aggregate' (time: 7339)>
array([4, 4, 4, ..., 3, 1, 1], dtype=uint8)
Coordinates:
    obs      (time) int64 ...
  * time     (time) datetime64[ns] 2015-10-08T19:35:30.569000448 ... 2015-04-...
    lat      (time) float64 44.66 44.66 44.66 44.66 ... 44.66 44.66 44.66 44.66
    lon      (time) float64 -124.1 -124.1 -124.1 -124.1 ... -124.1 -124.1 -124.1

Option B

Store test configurations externally, then pass your configuration and netcdf file to ioos_qc, and let it run tests and update the file with results

[14]:
# Using the CFNetCDFStore Store we can serialize our results back to a CF compliant netCDF file easily
from ioos_qc.stores import CFNetCDFStore
from pocean.dsg import OrthogonalMultidimensionalTimeseries

# We use the `results` from Option A so we don't repeat ourselves.
store = CFNetCDFStore(runner)

outfile_b = os.path.join(tempfile.gettempdir(), "out_b.nc")
if os.path.exists(outfile_b):
    os.remove(outfile_b)

qc_all = store.save(
    # The netCDF file to export OR append to
    outfile_b,
    # The DSG class to save the results as
    OrthogonalMultidimensionalTimeseries,
    # The QC config that was run
    c,
    # Should we write the data or just metadata? Defaults to false
    write_data=True,
    # Compute a total aggregate?
    compute_aggregate=True,
    # Any kwargs to pass to the DSG class
    dsg_kwargs=dict(
        reduce_dims=True,  # Remove dimensions of size 1
        unlimited=False,  # Don't make the record dimension unlimited
        unique_dims=True,  # Support loading into xarray
    ),
)
[15]:
# Explore results: qc test variables are named [variable_name]_qartod_[test_name]
out_b = xr.open_dataset(outfile_b)
out_b
[15]:
<xarray.Dataset>
Dimensions:                                (time_dim: 7339)
Coordinates:
    time                                   (time_dim) datetime64[ns] ...
    lat                                    float64 ...
    lon                                    float64 ...
    z                                      float64 ...
Dimensions without coordinates: time_dim
Data variables:
    crs                                    int32 ...
    station                                int32 ...
    pco2_seawater                          (time_dim) float64 ...
    pco2_seawater_qartod_gross_range_test  (time_dim) float32 ...
    pco2_seawater_qartod_spike_test        (time_dim) float32 ...
    pco2_seawater_qartod_location_test     (time_dim) float32 ...
    pco2_seawater_qartod_flat_line_test    (time_dim) float64 ...
    qartod_qc_rollup                       (time_dim) float32 ...
Attributes:
    Conventions:    CF-1.6
    date_created:   2023-02-16T13:59:00Z
    featureType:    timeseries
    cdm_data_type:  Timeseries
[16]:
# Gross range test
# Note how the config used is stored in the ioos_qc_* variables
out_b["pco2_seawater_qartod_gross_range_test"]
[16]:
<xarray.DataArray 'pco2_seawater_qartod_gross_range_test' (time_dim: 7339)>
[7339 values with dtype=float32]
Coordinates:
    time     (time_dim) datetime64[ns] ...
    lat      float64 ...
    lon      float64 ...
    z        float64 ...
Dimensions without coordinates: time_dim
Attributes:
    standard_name:   gross_range_test_quality_flag
    long_name:       Gross Range Test Quality Flag
    flag_values:     [1 2 3 4 9]
    flag_meanings:   GOOD UNKNOWN SUSPECT FAIL MISSING
    valid_min:       1
    valid_max:       9
    ioos_qc_module:  qartod
    ioos_qc_test:    gross_range_test
    ioos_qc_target:  pco2_seawater
    ioos_qc_config:  {"suspect_span": [200, 2400], "fail_span": [0, 3000]}
[17]:
# Aggregate/rollup flag
out_b["qartod_qc_rollup"]
[17]:
<xarray.DataArray 'qartod_qc_rollup' (time_dim: 7339)>
[7339 values with dtype=float32]
Coordinates:
    time     (time_dim) datetime64[ns] ...
    lat      float64 ...
    lon      float64 ...
    z        float64 ...
Dimensions without coordinates: time_dim
Attributes:
    standard_name:   aggregate_quality_flag
    long_name:       Aggregate Quality Flag
    flag_values:     [1 2 3 4 9]
    flag_meanings:   GOOD UNKNOWN SUSPECT FAIL MISSING
    valid_min:       1
    valid_max:       9
    ioos_qc_module:  qartod
    ioos_qc_test:    qc_rollup
    ioos_qc_target:  

Option C

Store test configurations in your netcdf file, then pass that file to ioos_qc and let it run tests and update the file with results.

In the example above, we used the library to store results and config in the netcdf file itself. At this point, we can load that same file and run tests again, without having to re-define config. This is very powerful!

[18]:
# Create a copy of the output from B
infile_c = os.path.join(tempfile.gettempdir(), "in_c.nc")
shutil.copy(outfile_b, infile_c)

# Load this file into the Config object
input_c = xr.open_dataset(infile_c)
qc_config_c = Config(input_c)
[19]:
# The QC functions that will be run are extracted from the netCDF attributes
qc_config_c.calls
[19]:
[<Call stream_id=pco2_seawater function=qartod.gross_range_test(suspect_span=[200, 2400], fail_span=[0, 3000])>,
 <Call stream_id=pco2_seawater function=qartod.spike_test(suspect_threshold=500, fail_threshold=1000)>,
 <Call stream_id=pco2_seawater function=qartod.location_test(bbox=[-124.5, 44, -123.5, 45])>,
 <Call stream_id=pco2_seawater function=qartod.flat_line_test(tolerance=1, suspect_threshold=3600, fail_threshold=86400)>]
[20]:
# We can use that Config just like any other config object.
# Here we will re-run the netCDF file with the config extracted from the same netCDF file

# Setup input stream from file
xrs_c = XarrayStream(input_c, lon="lon", lat="lat")

# Setup config run
runner_c = list(xrs_c.run(qc_config_c))

# Collect QC run results
results = collect_results(runner_c, how='list')

# Compute Aggregate
agg = CollectedResult(
    stream_id='',
    package='qartod',
    test='qc_rollup',
    function=aggregate,
    results=aggregate(results),
    tinp=xrs_c.time(),
    data=data
)
results.append(agg)
[21]:
store = CFNetCDFStore(runner)

outfile_c = os.path.join(tempfile.gettempdir(), "out_c.nc")
if os.path.exists(outfile_c):
    os.remove(outfile_c)

qc_all = store.save(
    # The netCDF file to export OR append to
    outfile_c,
    # The DSG class to save the results as
    OrthogonalMultidimensionalTimeseries,
    # The QC config that was run
    c,
    # Should we write the data or just metadata? Defaults to false
    write_data=True,
    # Compute a total aggregate?
    compute_aggregate=True,
    # Any kwargs to pass to the DSG class
    dsg_kwargs=dict(
        reduce_dims=True,  # Remove dimensions of size 1
        unlimited=False,  # Don't make the record dimension unlimited
        unique_dims=True,  # Support loading into xarray
    ),
)
[22]:
# Explore results: qc test variables are named [variable_name]_qartod_[test_name]
out_c = xr.open_dataset(outfile_c)
out_c
[22]:
<xarray.Dataset>
Dimensions:                                (time_dim: 7339)
Coordinates:
    time                                   (time_dim) datetime64[ns] ...
    lat                                    float64 ...
    lon                                    float64 ...
    z                                      float64 ...
Dimensions without coordinates: time_dim
Data variables:
    crs                                    int32 ...
    station                                int32 ...
    pco2_seawater                          (time_dim) float64 ...
    pco2_seawater_qartod_gross_range_test  (time_dim) float32 ...
    pco2_seawater_qartod_spike_test        (time_dim) float32 ...
    pco2_seawater_qartod_location_test     (time_dim) float32 ...
    pco2_seawater_qartod_flat_line_test    (time_dim) float64 ...
    qartod_qc_rollup                       (time_dim) float32 ...
Attributes:
    Conventions:    CF-1.6
    date_created:   2023-02-16T13:59:00Z
    featureType:    timeseries
    cdm_data_type:  Timeseries
[23]:
# Aggregate/rollup flag
out_c["qartod_qc_rollup"]
[23]:
<xarray.DataArray 'qartod_qc_rollup' (time_dim: 7339)>
[7339 values with dtype=float32]
Coordinates:
    time     (time_dim) datetime64[ns] ...
    lat      float64 ...
    lon      float64 ...
    z        float64 ...
Dimensions without coordinates: time_dim
Attributes:
    standard_name:   aggregate_quality_flag
    long_name:       Aggregate Quality Flag
    flag_values:     [1 2 3 4 9]
    flag_meanings:   GOOD UNKNOWN SUSPECT FAIL MISSING
    valid_min:       1
    valid_max:       9
    ioos_qc_module:  qartod
    ioos_qc_test:    qc_rollup
    ioos_qc_target: