QARTOD - NetCDF Examples¶
This notebook provides examples of running QARTOD on a netCDF file. For background, see XarrayStream and CFNetCDFStore in the docs.
There are multiple ways that you can integrate ioos_qc
into your netcdf-based workflow.
Option A: Store test configurations externally, pass your configuration and netcdf file to ioos_qc
, and manually update netcdf variables with results of the test * In this case, you extract variables from the netcdf file, use ioos_qc
methods to run tests, and then manually update the netcdf file with results * This provides the most control, but doesn’t take advantage of shared code in the ioos_qc
library * It’s up to you to ensure your resulting netcdf is self-describing and
CF-compliant
Option B: Store test configurations externally, then pass your configuration and netcdf file to ioos_qc
, and let it run tests and update the file with results * This takes advantage of ioos_qc
code to store results and configuration in the netCDF file, and ensure a self-describing, CF-compliant file * Managing your test configurations outside the file is better when dealing with a large number of datasets/configurations
Option C: Store test configurations in your netcdf file, then pass that file to ioos_qc
and let it run tests and update the file with results * You only need to add test configurations to the file one time, and after that you could run tests over and over again on the same file * This option is the most portable, since the data, configuration, and results are all in one place * The downside is, test configuration management is difficult since it’s stored in the file instead of some
common external location
Setup¶
[1]:
from bokeh.plotting import output_notebook
output_notebook()
[2]:
def plot_ncresults(ncdata, var_name, results, title, test_name):
"""Helper method to plot QC results using Bokeh."""
qc_test = next(r for r in results if r.stream_id == var_name and r.test == test_name)
time = np.array(qc_test.tinp)
obs = np.array(qc_test.data)
results = qc_test.results
qc_pass = np.ma.masked_where(results != 1, obs)
num_pass = (results == 1).sum()
qc_suspect = np.ma.masked_where(results != 3, obs)
num_suspect = (results == 3).sum()
qc_fail = np.ma.masked_where(results != 4, obs)
num_fail = (results == 4).sum()
qc_notrun = np.ma.masked_where(results != 2, obs)
p1 = figure(
x_axis_type="datetime",
title=f"{test_name} : {title} : p/s/f= {num_pass}/{num_suspect}/{num_fail}")
p1.grid.grid_line_alpha = 0.3
p1.xaxis.axis_label = "Time"
p1.yaxis.axis_label = "Observation Value"
p1.line(time, obs, legend_label="obs", color="#A6CEE3")
p1.circle(
time, qc_notrun, size=2, legend_label="qc not run", color="gray", alpha=0.2
)
p1.circle(time, qc_pass, size=4, legend_label="qc pass", color="green", alpha=0.5)
p1.circle(
time, qc_suspect, size=4, legend_label="qc suspect", color="orange", alpha=0.7
)
p1.circle(time, qc_fail, size=6, legend_label="qc fail", color="red", alpha=1.0)
show(gridplot([[p1]], width=800, height=400))
[3]:
import os
import shutil
import tempfile
from datetime import datetime
import pandas as pd
Load the netCDF dataset¶
The example netCDF dataset is a pCO2 sensor from the Ocean Observatories Initiative (OOI) Coastal Endurance Inshore Surface Mooring instrument frame at 7 meters depth located on the Oregon Shelf break.
[4]:
import xarray as xr
from erddapy.url_handling import urlopen
from netCDF4 import Dataset
def open_from_https(url):
data = urlopen(fname)
nc = Dataset("pco2_netcdf_example", memory=data.read())
return xr.open_dataset(xr.backends.NetCDF4DataStore(nc))
url = "https://github.com/ioos/ioos_qc/raw/master/docs/source/examples"
fname = f"{url}/pco2_netcdf_example.nc#mode=bytes"
pco2 = open_from_https(fname)
pco2
[4]:
<xarray.Dataset> Dimensions: (time: 7339, spectrum: 14) Coordinates: obs (time) int64 ... * time (time) datetime64[ns] 2015-10-0... lat (time) float64 ... lon (time) float64 ... Dimensions without coordinates: spectrum Data variables: (12/30) deployment (time) int32 ... id (time) |S64 ... dcl_controller_timestamp (time) object ... driver_timestamp (time) datetime64[ns] ... ingestion_timestamp (time) datetime64[ns] ... internal_timestamp (time) datetime64[ns] ... ... ... absorbance_ratio_620_qc_executed (time) float32 ... absorbance_ratio_620_qc_results (time) float32 ... pco2w_thermistor_temperature_qc_executed (time) float32 ... pco2w_thermistor_temperature_qc_results (time) float32 ... pco2_seawater_qc_executed (time) float32 ... pco2_seawater_qc_results (time) float32 ... Attributes: (12/71) node: RID16 comment: publisher_email: sourceUrl: http://oceanobservatories.org/ collection_method: recovered_host stream: pco2w_abc_dcl_instrument_recovered ... ... geospatial_vertical_units: meters geospatial_vertical_resolution: 0.1 geospatial_vertical_positive: down DODS.strlen: 36 DODS.dimName: string36 DODS_EXTRA.Unlimited_Dimension: obs
Plot the raw data.
[5]:
import numpy as np
from bokeh.plotting import figure, show
from bokeh.layouts import gridplot
data = pco2["pco2_seawater"]
t = np.array(pco2["time"])
x = np.array(data)
p1 = figure(x_axis_type="datetime", title="pco2_seawater")
p1.grid.grid_line_alpha = 0.3
p1.xaxis.axis_label = "Time"
p1.yaxis.axis_label = data.units
p1.line(t, x)
show(gridplot([[p1]], width=800, height=400))
QC Configuration¶
Here we define the generic config object for multiple QARTOD tests, plus the aggregate/rollup flag.
The key “pco2_seawater” indicates which variable in the netcdf file this config should run against.
[6]:
from ioos_qc.config import Config
config = {
"pco2_seawater": {
"qartod": {
"gross_range_test": {"suspect_span": [200, 2400], "fail_span": [0, 3000]},
"spike_test": {"suspect_threshold": 500, "fail_threshold": 1000},
"location_test": {"bbox": [-124.5, 44, -123.5, 45]},
"flat_line_test": {
"tolerance": 1,
"suspect_threshold": 3600,
"fail_threshold": 86400,
},
"aggregate": {},
}
}
}
config = """
contexts:
- streams:
pco2_seawater:
qartod:
gross_range_test:
suspect_span: [200, 2400]
fail_span: [0, 3000]
spike_test:
suspect_threshold: 500
fail_threshold: 1000
location_test:
bbox: [-124.5, 44, -123.5, 45]
flat_line_test:
tolerance: 1
suspect_threshold: 3600
fail_threshold: 86400
"""
c = Config(config)
Option A: Manually run tests and store results¶
Store test configurations externally, pass your configuration and netcdf file to ioos_qc
, and manually update netcdf variables with results of the test.
Note: For tests that need tinp, zinp, etc, use args to define the t, x, y, z dimensions. In this case, we need latitude and longitude for the location test.
[7]:
from ioos_qc.qartod import aggregate
from ioos_qc.streams import XarrayStream
from ioos_qc.results import collect_results, CollectedResult
qc = XarrayStream(pco2, lon="lon", lat="lat")
# Store as a list to run QC now
runner = list(qc.run(c))
results = collect_results(runner, how='list')
agg = CollectedResult(
stream_id='',
package='qartod',
test='qc_rollup',
function=aggregate,
results=aggregate(results),
tinp=qc.time(),
data=data
)
results.append(agg)
results
[7]:
[<CollectedResult stream_id=pco2_seawater package=qartod test=gross_range_test>,
<CollectedResult stream_id=pco2_seawater package=qartod test=spike_test>,
<CollectedResult stream_id=pco2_seawater package=qartod test=location_test>,
<CollectedResult stream_id=pco2_seawater package=qartod test=flat_line_test>,
<CollectedResult stream_id= package=qartod test=qc_rollup>]
[8]:
plot_ncresults(pco2, "pco2_seawater", results, "pCO2 seawater", "gross_range_test")
[9]:
plot_ncresults(pco2, "pco2_seawater", results, "pCO2 seawater", "spike_test")
[10]:
plot_ncresults(pco2, "pco2_seawater", results, "pCO2 seawater", "flat_line_test")
[11]:
plot_ncresults(pco2, "pco2_seawater", results, "pCO2 seawater", "location_test")
[12]:
# To see overall results, use the aggregate test
plot_ncresults(pco2, "", results, "pCO2 seawater", "qc_rollup")
[13]:
# Store results manually
# This is just a simple example and stores the aggregate test flag as a variable.
# You can expand upon this, or use the ioos_qc library to store the results for you (see subsequent examples)
# and use xarray's to_xxx methods to serialize results to whichever format you prefer
agg_da = xr.DataArray(agg.results, {}, ('time',))
output_xds = pco2.assign(
qartod_aggregate=agg_da
)
output_xds.qartod_aggregate
[13]:
<xarray.DataArray 'qartod_aggregate' (time: 7339)> array([4, 4, 4, ..., 3, 1, 1], dtype=uint8) Coordinates: obs (time) int64 ... * time (time) datetime64[ns] 2015-10-08T19:35:30.569000448 ... 2015-04-... lat (time) float64 44.66 44.66 44.66 44.66 ... 44.66 44.66 44.66 44.66 lon (time) float64 -124.1 -124.1 -124.1 -124.1 ... -124.1 -124.1 -124.1
Option B¶
Store test configurations externally, then pass your configuration and netcdf file to ioos_qc
, and let it run tests and update the file with results
[14]:
# Using the CFNetCDFStore Store we can serialize our results back to a CF compliant netCDF file easily
from ioos_qc.stores import CFNetCDFStore
from pocean.dsg import OrthogonalMultidimensionalTimeseries
# We use the `results` from Option A so we don't repeat ourselves.
store = CFNetCDFStore(runner)
outfile_b = os.path.join(tempfile.gettempdir(), "out_b.nc")
if os.path.exists(outfile_b):
os.remove(outfile_b)
qc_all = store.save(
# The netCDF file to export OR append to
outfile_b,
# The DSG class to save the results as
OrthogonalMultidimensionalTimeseries,
# The QC config that was run
c,
# Should we write the data or just metadata? Defaults to false
write_data=True,
# Compute a total aggregate?
compute_aggregate=True,
# Any kwargs to pass to the DSG class
dsg_kwargs=dict(
reduce_dims=True, # Remove dimensions of size 1
unlimited=False, # Don't make the record dimension unlimited
unique_dims=True, # Support loading into xarray
),
)
[15]:
# Explore results: qc test variables are named [variable_name]_qartod_[test_name]
out_b = xr.open_dataset(outfile_b)
out_b
[15]:
<xarray.Dataset> Dimensions: (time_dim: 7339) Coordinates: time (time_dim) datetime64[ns] ... lat float64 ... lon float64 ... z float64 ... Dimensions without coordinates: time_dim Data variables: crs int32 ... station int32 ... pco2_seawater (time_dim) float64 ... pco2_seawater_qartod_gross_range_test (time_dim) float32 ... pco2_seawater_qartod_spike_test (time_dim) float32 ... pco2_seawater_qartod_location_test (time_dim) float32 ... pco2_seawater_qartod_flat_line_test (time_dim) float64 ... qartod_qc_rollup (time_dim) float32 ... Attributes: Conventions: CF-1.6 date_created: 2023-02-16T13:59:00Z featureType: timeseries cdm_data_type: Timeseries
[16]:
# Gross range test
# Note how the config used is stored in the ioos_qc_* variables
out_b["pco2_seawater_qartod_gross_range_test"]
[16]:
<xarray.DataArray 'pco2_seawater_qartod_gross_range_test' (time_dim: 7339)> [7339 values with dtype=float32] Coordinates: time (time_dim) datetime64[ns] ... lat float64 ... lon float64 ... z float64 ... Dimensions without coordinates: time_dim Attributes: standard_name: gross_range_test_quality_flag long_name: Gross Range Test Quality Flag flag_values: [1 2 3 4 9] flag_meanings: GOOD UNKNOWN SUSPECT FAIL MISSING valid_min: 1 valid_max: 9 ioos_qc_module: qartod ioos_qc_test: gross_range_test ioos_qc_target: pco2_seawater ioos_qc_config: {"suspect_span": [200, 2400], "fail_span": [0, 3000]}
[17]:
# Aggregate/rollup flag
out_b["qartod_qc_rollup"]
[17]:
<xarray.DataArray 'qartod_qc_rollup' (time_dim: 7339)> [7339 values with dtype=float32] Coordinates: time (time_dim) datetime64[ns] ... lat float64 ... lon float64 ... z float64 ... Dimensions without coordinates: time_dim Attributes: standard_name: aggregate_quality_flag long_name: Aggregate Quality Flag flag_values: [1 2 3 4 9] flag_meanings: GOOD UNKNOWN SUSPECT FAIL MISSING valid_min: 1 valid_max: 9 ioos_qc_module: qartod ioos_qc_test: qc_rollup ioos_qc_target:
Option C¶
Store test configurations in your netcdf file, then pass that file to ioos_qc
and let it run tests and update the file with results.
In the example above, we used the library to store results and config in the netcdf file itself. At this point, we can load that same file and run tests again, without having to re-define config. This is very powerful!
[18]:
# Create a copy of the output from B
infile_c = os.path.join(tempfile.gettempdir(), "in_c.nc")
shutil.copy(outfile_b, infile_c)
# Load this file into the Config object
input_c = xr.open_dataset(infile_c)
qc_config_c = Config(input_c)
[19]:
# The QC functions that will be run are extracted from the netCDF attributes
qc_config_c.calls
[19]:
[<Call stream_id=pco2_seawater function=qartod.gross_range_test(suspect_span=[200, 2400], fail_span=[0, 3000])>,
<Call stream_id=pco2_seawater function=qartod.spike_test(suspect_threshold=500, fail_threshold=1000)>,
<Call stream_id=pco2_seawater function=qartod.location_test(bbox=[-124.5, 44, -123.5, 45])>,
<Call stream_id=pco2_seawater function=qartod.flat_line_test(tolerance=1, suspect_threshold=3600, fail_threshold=86400)>]
[20]:
# We can use that Config just like any other config object.
# Here we will re-run the netCDF file with the config extracted from the same netCDF file
# Setup input stream from file
xrs_c = XarrayStream(input_c, lon="lon", lat="lat")
# Setup config run
runner_c = list(xrs_c.run(qc_config_c))
# Collect QC run results
results = collect_results(runner_c, how='list')
# Compute Aggregate
agg = CollectedResult(
stream_id='',
package='qartod',
test='qc_rollup',
function=aggregate,
results=aggregate(results),
tinp=xrs_c.time(),
data=data
)
results.append(agg)
[21]:
store = CFNetCDFStore(runner)
outfile_c = os.path.join(tempfile.gettempdir(), "out_c.nc")
if os.path.exists(outfile_c):
os.remove(outfile_c)
qc_all = store.save(
# The netCDF file to export OR append to
outfile_c,
# The DSG class to save the results as
OrthogonalMultidimensionalTimeseries,
# The QC config that was run
c,
# Should we write the data or just metadata? Defaults to false
write_data=True,
# Compute a total aggregate?
compute_aggregate=True,
# Any kwargs to pass to the DSG class
dsg_kwargs=dict(
reduce_dims=True, # Remove dimensions of size 1
unlimited=False, # Don't make the record dimension unlimited
unique_dims=True, # Support loading into xarray
),
)
[22]:
# Explore results: qc test variables are named [variable_name]_qartod_[test_name]
out_c = xr.open_dataset(outfile_c)
out_c
[22]:
<xarray.Dataset> Dimensions: (time_dim: 7339) Coordinates: time (time_dim) datetime64[ns] ... lat float64 ... lon float64 ... z float64 ... Dimensions without coordinates: time_dim Data variables: crs int32 ... station int32 ... pco2_seawater (time_dim) float64 ... pco2_seawater_qartod_gross_range_test (time_dim) float32 ... pco2_seawater_qartod_spike_test (time_dim) float32 ... pco2_seawater_qartod_location_test (time_dim) float32 ... pco2_seawater_qartod_flat_line_test (time_dim) float64 ... qartod_qc_rollup (time_dim) float32 ... Attributes: Conventions: CF-1.6 date_created: 2023-02-16T13:59:00Z featureType: timeseries cdm_data_type: Timeseries
[23]:
# Aggregate/rollup flag
out_c["qartod_qc_rollup"]
[23]:
<xarray.DataArray 'qartod_qc_rollup' (time_dim: 7339)> [7339 values with dtype=float32] Coordinates: time (time_dim) datetime64[ns] ... lat float64 ... lon float64 ... z float64 ... Dimensions without coordinates: time_dim Attributes: standard_name: aggregate_quality_flag long_name: Aggregate Quality Flag flag_values: [1 2 3 4 9] flag_meanings: GOOD UNKNOWN SUSPECT FAIL MISSING valid_min: 1 valid_max: 9 ioos_qc_module: qartod ioos_qc_test: qc_rollup ioos_qc_target: