Source code for ioos_qc.argo
#!/usr/bin/env python
# coding=utf-8
"""Tests based on the ARGO QC manual."""
import logging
import warnings
from numbers import Real as N
from typing import Sequence
import numpy as np
from ioos_qc.qartod import QartodFlags
from ioos_qc.utils import add_flag_metadata
from ioos_qc.utils import great_circle_distance
from ioos_qc.utils import mapdates
# Module-level logger, keyed on this module's ``__name__``.
L = logging.getLogger(__name__) # noqa
@add_flag_metadata(standard_name='pressure_increasing_test_quality_flag',
                   long_name='Pressure Increasing Test Quality Flag')
def pressure_increasing_test(inp):
    """
    Flag pressure values that break the monotonic trend of a profile.

    Each input is flagged SUSPECT if it does not monotonically continue the
    profile; every other value is flagged GOOD. The direction of the profile
    is taken from the sign of the mean step between consecutive values, so a
    profile that is decreasing on average is checked for monotonic decrease
    instead. The first value is never flagged SUSPECT because it has no
    predecessor to compare against.

    Ref: ARGO QC Manual: 8. Pressure increasing test

    Args:
        inp: Pressure values as a numeric numpy array or a list of numbers.

    Returns:
        An array of flag values equal in size to that of the input.
    """
    delta = np.diff(inp)
    flags = np.ones_like(inp, dtype='uint8') * QartodFlags.GOOD

    # With fewer than two points there is no step to evaluate. Returning here
    # yields the same all-GOOD result while avoiding the "mean of empty
    # slice" RuntimeWarning that np.mean would emit on an empty diff.
    if delta.size == 0:
        return flags

    # Correct for downcast vs upcast by flipping the sign if it's decreasing
    sign = np.sign(np.mean(delta))
    if sign < 0:
        delta = sign * delta

    # delta[i] describes the step from point i to point i + 1, so the
    # offending sample is the one at i + 1.
    flag_idx = np.where(delta <= 0)[0] + 1
    flags[flag_idx] = QartodFlags.SUSPECT
    return flags
@add_flag_metadata(standard_name='speed_test_quality_flag',
                   long_name='Speed Test Quality Flag')
def speed_test(lon: Sequence[N],
               lat: Sequence[N],
               tinp: Sequence[N],
               suspect_threshold: float,
               fail_threshold: float
               ) -> np.ma.core.MaskedArray:
    """Checks that the calculated speed between two points is within reasonable bounds.

    This test calculates a speed between subsequent points by

      * using latitude and longitude to calculate the distance between points
      * calculating the time difference between those points
      * checking if distance/time_diff exceeds the given threshold(s)

    Positions where either coordinate is missing or invalid, and points whose
    computed distance is masked, are flagged as MISSING. The first point is
    always flagged as UNKNOWN because there is no previous point to compute a
    speed from.

    If this test fails, it typically means that either a position or time is bad data,
    or that a platform is mislabeled.

    Ref: ARGO QC Manual: 5. Impossible speed test

    Args:
        lon: Longitudes as a numeric numpy array or a list of numbers.
        lat: Latitudes as a numeric numpy array or a list of numbers.
        tinp: Time data as a sequence of datetime objects compatible with pandas DatetimeIndex.
              This includes numpy datetime64, python datetime objects and pandas Timestamp object.
              ie. pd.DatetimeIndex([datetime.utcnow(), np.datetime64(), pd.Timestamp.now()]
              If anything else is passed in the format is assumed to be seconds since the unix epoch.
        suspect_threshold: A float value representing a speed, in meters per second.
            Speeds exceeding this will be flagged as SUSPECT.
        fail_threshold: A float value representing a speed, in meters per second.
            Speeds exceeding this will be flagged as FAIL.

    Returns:
        A masked array of flag values equal in size to that of the input.
        Empty input yields an empty masked array.

    Raises:
        ValueError: If lon, lat and tinp do not all have the same shape.
    """
    # Conversion of ragged / non-numeric input can emit warnings that are not
    # actionable here; invalid values end up masked instead.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        lat = np.ma.masked_invalid(np.array(lat).astype(np.float64))
        lon = np.ma.masked_invalid(np.array(lon).astype(np.float64))
        tinp = mapdates(tinp)

    if lon.shape != lat.shape or lon.shape != tinp.shape:
        raise ValueError(f'Lon ({lon.shape}) and lat ({lat.shape}) and tinp ({tinp.shape}) must be the same shape')

    # Save original shape so the flags can be returned in the caller's layout
    original_shape = lon.shape
    lon = lon.flatten()
    lat = lat.flatten()
    tinp = tinp.flatten()

    # If no data, return
    if lon.size == 0:
        return np.ma.masked_array([])

    # Start with everything as passing
    flag_arr = QartodFlags.GOOD * np.ma.ones(lon.size, dtype='uint8')

    # A position is unusable as soon as EITHER coordinate is masked, so the
    # masks are combined with OR (AND would only catch points where both
    # coordinates are missing).
    missing_position = lon.mask | lat.mask
    flag_arr[missing_position] = QartodFlags.MISSING

    # If only one data point, return
    if lon.size < 2:
        flag_arr[0] = QartodFlags.UNKNOWN
        return flag_arr.reshape(original_shape)

    # Calculate the great_distance between each point
    dist = great_circle_distance(lat, lon)

    # calculate speed in m/s
    # NOTE(review): the time step is cast to whole seconds, so sub-second
    # sampling intervals collapse to a zero divisor -- confirm this is intended.
    speed = np.ma.zeros(tinp.size, dtype='float')
    speed[1:] = np.abs(dist[1:] / np.diff(tinp).astype('timedelta64[s]').astype(float))

    # FAIL is applied after SUSPECT so that it wins when both thresholds are
    # exceeded. Comparisons against invalid speeds are silenced.
    with np.errstate(invalid='ignore'):
        flag_arr[speed > suspect_threshold] = QartodFlags.SUSPECT
        flag_arr[speed > fail_threshold] = QartodFlags.FAIL

    # first value is unknown, since we have no speed data for the first point
    flag_arr[0] = QartodFlags.UNKNOWN

    # If the value is masked set the flag to MISSING
    flag_arr[dist.mask] = QartodFlags.MISSING

    return flag_arr.reshape(original_shape)