import difflib
import logging
from collections import defaultdict
import cftime
import numpy as np
import regex
from cf_units import Unit
from compliance_checker import cfutil
from compliance_checker.base import BaseCheck, Result, TestCtx
from compliance_checker.cf import util
from compliance_checker.cf.appendix_c import valid_modifiers
from compliance_checker.cf.appendix_d import dimless_vertical_coordinates_1_6
from compliance_checker.cf.appendix_e import cell_methods16
from compliance_checker.cf.appendix_f import (
grid_mapping_attr_types16,
grid_mapping_dict16,
)
from compliance_checker.cf.cf_base import CFNCCheck, appendix_a_base
logger = logging.getLogger(__name__)
class CF1_6Check(CFNCCheck):
"""CF-1.6-specific implementation of CFBaseCheck; supports checking
netCDF datasets.
These checks are based on the CF-1.6 conventions document:
https://cfconventions.org/cf-conventions/v1.6.0/cf-conventions.html"""
register_checker = True
_cc_spec = "cf"
_cc_spec_version = "1.6"
_cc_description = "Climate and Forecast Conventions (CF)"
_cc_url = "http://cfconventions.org/cf-conventions/v1.6.0/cf-conventions.html"
_cc_display_headers = {3: "Errors", 2: "Warnings", 1: "Info"}
appendix_a = appendix_a_base
appendix_d_parametric_coords = dimless_vertical_coordinates_1_6
_allowed_numeric_var_types = {
np.character,
np.bytes_, # "|S1" dtype, byte array used as string
np.int8,
np.int16,
np.int32,
np.float32,
np.float64,
}
def __init__(self, options=None): # initialize with parent methods and data
super().__init__(options)
self.cell_methods = cell_methods16
self.grid_mapping_dict = grid_mapping_dict16
self.grid_mapping_attr_types = grid_mapping_attr_types16
###############################################################################
# Chapter 2: NetCDF Files and Components
###############################################################################
def check_filename(self, ds):
"""Checks that the filename ends with .nc"""
# IMPLEMENTS CONFORMANCE 2.1
filename_suffix = TestCtx(BaseCheck.HIGH, self.section_titles["2.1"])
filename_suffix.assert_true(
ds.filepath().endswith("nc"),
f'Dataset path {ds.filepath} must end with ".nc"',
)
return filename_suffix.to_result()
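# Illustrative usage sketch (not part of the checker): running a single check
# directly against an open dataset. The file name is invented; in practice the
# compliance-checker runner invokes these methods.
#
#     from netCDF4 import Dataset
#     checker = CF1_6Check()
#     with Dataset("example.nc") as nc:
#         result = checker.check_filename(nc)
#         print(result.value, result.msgs)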
def check_data_types(self, ds):
"""
Checks the data type of all netCDF variables to ensure they are valid
data types under CF.
CF §2.2 The netCDF data types char, byte, short, int, float or real, and
double are all acceptable
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: compliance_checker.base.Result
"""
# IMPLEMENTS CONFORMANCE 2.2
fails = []
total = len(ds.variables)
for k, v in ds.variables.items():
if (
v.dtype is not str
and v.dtype.kind != "S"
and v.dtype.type not in self._allowed_numeric_var_types
):
fails.append(
f"The variable {k} failed because the datatype is {v.datatype}",
)
return Result(
BaseCheck.HIGH,
(total - len(fails), total),
self.section_titles["2.2"],
msgs=fails,
)
def check_child_attr_data_types(self, ds):
"""
For any variables which contain any of the following attributes:
- valid_min/valid_max
- valid_range
- scale_factor
- add_offset
- _FillValue
the data type of the attribute must match the type of its parent variable as specified in the
NetCDF User Guide (NUG) https://docs.unidata.ucar.edu/netcdf-c/current/attribute_conventions.html,
referenced in the CF Conventions in Section 2.5.2
(http://cfconventions.org/Data/cf-conventions/cf-conventions-1.7/cf-conventions.html#missing-data)
:param netCDF4.Dataset ds: open netCDF dataset object
:rtype: compliance_checker.base.Result
"""
ctx = TestCtx(BaseCheck.MEDIUM, self.section_titles["2.5"])
special_attrs = {
"actual_range",
"valid_min",
"valid_max",
"valid_range",
"_FillValue",
}
for _var_name, var in ds.variables.items():
for att_name in special_attrs.intersection(var.ncattrs()):
self._parent_var_attr_type_check(att_name, var, ctx)
return ctx.to_result()
# TODO: consider renaming to avoid confusion with non-underscore
# primary function version
def _check_add_offset_scale_factor_type(self, variable, attr_name):
"""
Reusable function for checking both add_offset and scale_factor.
"""
msgs = []
error_msg = (
f"Variable {variable.name} and {attr_name} must be equivalent "
f"data types or {variable.name} must be of type byte, short, or int "
f"and {attr_name} must be float or double"
)
att = getattr(variable, attr_name, None)
if not (isinstance(att, (np.number, float))): # can't compare dtypes
val = False
else:
val = (
att.dtype == variable.dtype
) or ( # will short-circuit or if first condition is true
isinstance(att, (np.float32, np.float64, float))
and variable.dtype in (np.byte, np.short, np.int16, np.int32, int)
)
if not val:
msgs.append(error_msg)
return Result(BaseCheck.MEDIUM, val, self.section_titles["8.1"], msgs)
def check_add_offset_scale_factor_type(self, ds):
"""
If a variable has the attributes add_offset and scale_factor,
check that the variables and attributes are of the same type
OR that the variable is of type byte, short or int and the
attributes are of type float or double.
"""
results = []
add_offset_vars = ds.get_variables_by_attributes(
add_offset=lambda x: x is not None,
)
scale_factor_vars = ds.get_variables_by_attributes(
scale_factor=lambda x: x is not None,
)
both = set(add_offset_vars).intersection(scale_factor_vars)
both_msgs = []
for both_var in sorted(both, key=lambda var: var.name):
if both_var.scale_factor.dtype != both_var.add_offset.dtype:
both_msgs.append(
"When both scale_factor and add_offset "
f"are supplied for variable {both_var.name}, "
"they must have the same type",
)
results.append(
Result(
BaseCheck.MEDIUM,
not bool(both_msgs),
self.section_titles["8.1"],
both_msgs,
),
)
for _att_vars_tup in (
("add_offset", add_offset_vars),
("scale_factor", scale_factor_vars),
):
results.extend(
[
self._check_add_offset_scale_factor_type(
var,
_att_vars_tup[0],
)
for var in _att_vars_tup[1]
],
)
return results
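# Sketch of a packed-variable layout these checks accept (file and variable
# names invented): an integer variable with float scale_factor/add_offset
# attributes of matching types.
#
#     from netCDF4 import Dataset
#     import numpy as np
#     with Dataset("packed.nc", "w") as nc:
#         nc.createDimension("t", 3)
#         temp = nc.createVariable("temp", "i2", ("t",))
#         temp.scale_factor = np.float32(0.01)     # float attribute on integer data
#         temp.add_offset = np.float32(273.15)     # same type as scale_factor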
def check_naming_conventions(self, ds):
"""
Checks the variable names to ensure they are valid CF variable names under CF.
CF §2.3 Variable, dimension and attribute names should begin with a letter
and be composed of letters, digits, and underscores.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: compliance_checker.base.Result
"""
ret_val = []
variable_naming = TestCtx(BaseCheck.MEDIUM, self.section_titles["2.3"])
dimension_naming = TestCtx(BaseCheck.MEDIUM, self.section_titles["2.3"])
attribute_naming = TestCtx(BaseCheck.MEDIUM, self.section_titles["2.3"])
ignore_attributes = [
"_FillValue",
"DODS",
"_ChunkSizes",
"_Coordinate",
"_Unsigned",
"_Encoding",
]
rname = regex.compile("^[A-Za-z][A-Za-z0-9_]*$")
# IMPLEMENTATION CONFORMANCE 2.3 REQUIRED
for name, variable in ds.variables.items():
variable_naming.assert_true(
rname.match(name) is not None,
f"variable {name} should begin with a letter and be composed of "
"letters, digits, and underscores",
)
# Keep track of all the attributes, we'll need to check them
for attr in variable.ncattrs():
if attr in ignore_attributes:
continue
# Special attributes made by THREDDS
if attr.startswith("DODS"):
continue
# Ignore model produced attributes
if attr.startswith("_Coordinate"):
continue
attribute_naming.assert_true(
rname.match(attr) is not None,
f"attribute {name}:{attr} should begin with a letter and be composed of "
"letters, digits, and underscores",
)
ret_val.append(variable_naming.to_result())
for dimension in ds.dimensions:
dimension_naming.assert_true(
rname.match(dimension) is not None,
f"dimension {dimension} should begin with a latter and be composed of "
"letters, digits, and underscores",
)
ret_val.append(dimension_naming.to_result())
for global_attr in ds.ncattrs():
# Special attributes made by THREDDS
if global_attr.startswith("DODS"):
continue
if global_attr.startswith("EXTRA_DIMENSION"):
continue
attribute_naming.assert_true(
rname.match(global_attr) is not None,
f"global attribute {global_attr} should begin with a letter and be composed of "
"letters, digits, and underscores",
)
ret_val.append(attribute_naming.to_result())
return ret_val
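# Sketch of the naming rule enforced above (names invented):
#
#     import regex
#     rname = regex.compile("^[A-Za-z][A-Za-z0-9_]*$")
#     rname.match("sea_water_temperature") is not None   # True: starts with a letter
#     rname.match("2m_temperature") is not None          # False: starts with a digit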
def check_names_unique(self, ds):
"""
Checks the variable names for uniqueness regardless of case.
CF §2.3 names should not be distinguished purely by case, i.e., if case
is disregarded, no two names should be the same.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: compliance_checker.base.Result
"""
fails = []
total = len(ds.variables)
names = defaultdict(int)
# IMPLEMENTATION CONFORMANCE 2.3 RECOMMENDED
for k in ds.variables:
names[k.lower()] += 1
fails = [
"Variables are not case sensitive. Duplicate variables named: %s" % k
for k, v in names.items()
if v > 1
]
return Result(
BaseCheck.MEDIUM,
(total - len(fails), total),
self.section_titles["2.3"],
msgs=fails,
)
def check_dimension_names(self, ds):
"""
Checks variables contain no duplicate dimension names.
CF §2.4 A variable may have any number of dimensions, including zero,
and the dimensions must all have different names.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: compliance_checker.base.Result
"""
fails = []
total = len(ds.variables)
for k, v in ds.variables.items():
dims = defaultdict(int)
for d in v.dimensions:
dims[d] += 1
# IMPLEMENTATION CONFORMANCE 2.4 REQUIRED
for dimension, count in dims.items():
if count > 1:
fails.append(
f"{k} has two or more dimensions named {dimension}",
)
return Result(
BaseCheck.HIGH,
(total - len(fails), total),
self.section_titles["2.4"],
msgs=fails,
)
def check_dimension_order(self, ds):
"""
Checks each variable's dimension order to ensure that the order is
consistent and in order under CF §2.4
CF §2.4 If any or all of the dimensions of a variable have the
interpretations of "date or time" (T), "height or depth" (Z),
"latitude" (Y), or "longitude" (X) then we recommend, those dimensions
to appear in the relative order T, then Z, then Y, then X in the CDL
definition corresponding to the file. All other dimensions should,
whenever possible, be placed to the left of the spatiotemporal
dimensions.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: compliance_checker.base.Result
"""
valid_dimension_order = TestCtx(BaseCheck.MEDIUM, self.section_titles["2.4"])
# Build a map from coordinate variable to axis
coord_axis_map = self._get_coord_axis_map(ds)
# Check each variable's dimension order, excluding climatology and
# bounds variables
any_clim = cfutil.get_climatology_variable(ds)
any_bounds = cfutil.get_cell_boundary_variables(ds)
for name, variable in ds.variables.items():
# Skip bounds/climatology variables, as they should implicitly
# have the same order except for the bounds specific dimension.
# This is tested later in the respective checks
if name in any_bounds or name == any_clim:
continue
# Skip strings/labels
if hasattr(variable.dtype, "char") and variable.dtype.char == "S":
continue
elif variable.dtype == str:
continue
if variable.dimensions:
dimension_order = self._get_dimension_order(ds, name, coord_axis_map)
valid_dimension_order.assert_true(
self._dims_in_order(dimension_order),
"{}'s spatio-temporal dimensions are not in the "
"recommended order T, Z, Y, X and/or further dimensions "
"are not located left of T, Z, Y, X. The dimensions (and "
"their guessed types) are {} (with U: other/unknown; L: "
"unlimited).".format(
name,
self._get_pretty_dimension_order_with_type(
ds,
name,
dimension_order,
),
),
)
return valid_dimension_order.to_result()
def check_fill_value_equal_missing_value(self, ds):
"""
If both missing_value and _FillValue are used, they should have the same
value, per the CF §2.5.1 Recommendations.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of Results
"""
fails = []
total = 0
for variable in ds.variables.values():
# If the variable has both a defined _FillValue and a defined missing_value, check it.
if hasattr(variable, "_FillValue") and hasattr(variable, "missing_value"):
total = total + 1
if variable._FillValue != variable.missing_value:
fails.append(
f"For the variable {variable.name} the missing_value must be equal to the _FillValue",
)
return Result(
BaseCheck.MEDIUM,
(total - len(fails), total),
self.section_titles["2.5"],
msgs=fails,
)
def check_valid_range_and_valid_min_max_present(self, ds):
"""
The valid_range attribute must not be present if the valid_min
and/or valid_max attributes are present, per the CF §2.5.1 Requirements.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of Results
"""
fails = []
total = 0
for variable in ds.variables.values():
if hasattr(variable, "valid_max") or hasattr(variable, "valid_min"):
total += 1
# if there's also valid_range in addition to
# valid_min/valid_max, this is not compliant
if hasattr(variable, "valid_range"):
fails.append(
f"For the variable {variable.name} the valid_range attribute must not be present "
"if the valid_min and/or valid_max attributes are present",
)
# *Just* valid_range should be added to total as well
elif hasattr(variable, "valid_range"):
total += 1
return Result(
BaseCheck.MEDIUM,
(total - len(fails), total),
self.section_titles["2.5"],
msgs=fails,
)
def check_fill_value_outside_valid_range(self, ds):
"""
Checks each variable's _FillValue to ensure that it's in valid_range or
between valid_min and valid_max according to CF §2.5.1
CF §2.5.1 The _FillValue should be outside the range specified by
valid_range (if used) for a variable.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of Results
"""
valid_fill_range = TestCtx(BaseCheck.MEDIUM, self.section_titles["2.5"])
for name, variable in ds.variables.items():
# If the variable doesn't have a defined _FillValue don't check it.
if not hasattr(variable, "_FillValue"):
continue
fill_value = variable._FillValue
attrs = variable.ncattrs()
if "valid_range" in attrs:
if isinstance(variable.valid_range, str):
m = "§2.5.1 Fill Values should be outside the range specified by valid_range" # subsection message
valid_fill_range.assert_true(
False,
f"{m};\n\t{name}:valid_range must be a numeric type not a string",
)
continue
rmin, rmax = variable.valid_range
spec_by = "valid_range"
elif "valid_min" in attrs and "valid_max" in attrs:
if isinstance(variable.valid_min, str):
valid_fill_range.assert_true(
False,
f"{name}:valid_min must be a numeric type not a string",
)
if isinstance(variable.valid_max, str):
valid_fill_range.assert_true(
False,
f"{name}:valid_max must be a numeric type not a string",
)
if isinstance(variable.valid_min, str) or isinstance(
variable.valid_max,
str,
):
continue
rmin = variable.valid_min
rmax = variable.valid_max
spec_by = "valid_min/valid_max"
else:
continue
if np.isnan(fill_value):
valid = True
else:
valid = fill_value < rmin or fill_value > rmax
valid_fill_range.assert_true(
valid,
f"{name}:_FillValue ({fill_value}) should be outside the range specified by {spec_by} ({rmin}, {rmax})"
"",
)
return valid_fill_range.to_result()
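# Sketch of the range comparison applied above (numbers invented):
#
#     valid_range = (-10.0, 40.0)
#     fill_value = -9999.0
#     fill_value < valid_range[0] or fill_value > valid_range[1]   # True -> compliant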
def check_convention_globals(self, ds):
"""
Check the common global attributes are strings if they exist.
CF §2.6.2 The title and history global attributes must be strings. They are
not required to exist.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of Results
"""
attrs = ["title", "history"]
valid_globals = TestCtx(BaseCheck.MEDIUM, self.section_titles["2.6"])
for attr in attrs:
dataset_attr = getattr(ds, attr, None)
is_string = isinstance(dataset_attr, str)
valid_globals.assert_true(
is_string and len(dataset_attr),
f"§2.6.2 global attribute {attr} should exist and be a non-empty string" # subsection message
"",
)
return valid_globals.to_result()
# IMPLEMENTATION
def check_coordinate_variables_strict_monotonicity(self, ds):
"""
Checks that data in coordinate variables is either monotonically
increasing or decreasing
"""
ret_val = []
for coord_var_name in self._find_coord_vars(ds):
coord_var = ds.variables[coord_var_name]
arr_diff = np.diff(coord_var)
monotonicity = TestCtx(BaseCheck.HIGH, self.section_titles["5"])
monotonicity.assert_true(
np.all(arr_diff > 0) or np.all(arr_diff < 0),
f'Coordinate variable "{coord_var_name}" must be strictly monotonic',
)
ret_val.append(monotonicity.to_result())
return ret_val
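# Sketch of the strict-monotonicity test used above (values invented):
#
#     import numpy as np
#     coord = np.array([0.0, 1.0, 2.5, 4.0])
#     diffs = np.diff(coord)
#     bool(np.all(diffs > 0) or np.all(diffs < 0))   # True: strictly increasing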
def check_convention_possibly_var_attrs(self, ds):
"""
Check variable and global attributes are strings for recommended attributes under CF §2.6.2
CF §2.6.2 institution, source, references, and comment, either global
or assigned to individual variables. When an attribute appears both
globally and as a variable attribute, the variable's version has
precedence. Must be strings.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of Results
"""
# These attributes are optional and only need to be non-empty strings if
# they exist.
attrs = ["institution", "source", "references", "comment"]
valid_attributes = TestCtx(BaseCheck.MEDIUM, self.section_titles["2.6"])
attr_bin = set()
# If the attribute is defined for any variable, check it and mark in
# the set that we've seen it at least once.
for name, variable in ds.variables.items():
for attribute in variable.ncattrs():
varattr = getattr(variable, attribute)
if attribute in attrs:
is_string = isinstance(varattr, str)
valid_attributes.assert_true(
is_string and len(varattr) > 0,
f"§2.6.2 {name}:{attribute} should be a non-empty string" "",
)
attr_bin.add(attribute)
# Check all the global attributes too and mark if we've seen them
for attribute in ds.ncattrs():
dsattr = getattr(ds, attribute)
if attribute in attrs:
is_string = isinstance(dsattr, str)
valid_attributes.assert_true(
is_string and len(dsattr) > 0,
f"§2.6.2 {attribute} global attribute should be a non-empty string"
"",
)
attr_bin.add(attribute)
return valid_attributes.to_result()
###############################################################################
# Chapter 3: Description of the Data
###############################################################################
def check_units(self, ds):
"""
Check the units attribute for all variables to ensure they are CF
compliant under CF §3.1
CF §3.1 The units attribute is required for all variables that represent dimensional quantities
(except for boundary variables defined in Section 7.1, "Cell Boundaries" and climatology variables
defined in Section 7.4, "Climatological Statistics").
Units are not required for dimensionless quantities. A variable with no units attribute is assumed
to be dimensionless. However, a units attribute specifying a dimensionless unit may optionally be
included.
- units required
- type must be recognized by udunits
- if standard name specified, must be consistent with standard name table, must also be consistent with a
specified cell_methods attribute if present
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
coordinate_variables = self._find_coord_vars(ds)
auxiliary_coordinates = self._find_aux_coord_vars(ds)
geophysical_variables = self._find_geophysical_vars(ds)
modifier_variables = cfutil._find_standard_name_modifier_variables(ds)
forecast_variables = cfutil.get_forecast_metadata_variables(ds)
dimless_vert = {
var.name
for var in ds.get_variables_by_attributes(
standard_name=lambda s: s in self.appendix_d_parametric_coords,
)
if not hasattr(var, "units")
}
# check anything remaining that has units
# unit_containing =
unit_required_variables = (
set(
coordinate_variables
+ auxiliary_coordinates
+ geophysical_variables
+ forecast_variables
+ modifier_variables,
) # standard names with modifiers require proper units, *except* for flags, where they should not be present
- dimless_vert
)
for name in unit_required_variables:
# For reduced horizontal grids, the compression index variable does
# not require units.
if cfutil.is_compression_coordinate(ds, name):
continue
variable = ds.variables[name]
# Skip instance coordinate variables
if getattr(variable, "cf_role", None) is not None:
continue
# Skip labels
if (
hasattr(variable.dtype, "char") and variable.dtype.char == "S"
) or variable.dtype == str:
continue
standard_name = getattr(variable, "standard_name", None)
standard_name, standard_name_modifier = self._split_standard_name(
standard_name,
)
units = getattr(variable, "units", None)
valid_units = self._check_valid_cf_units(ds, name)
ret_val.append(valid_units)
units_attr_is_string = TestCtx(BaseCheck.MEDIUM, self.section_titles["3.1"])
# side effects, but better than teasing out the individual result
if units is not None and units_attr_is_string.assert_true(
isinstance(units, str),
f"units ({units}) attribute of '{variable.name}' must be a string compatible with UDUNITS",
):
valid_udunits = self._check_valid_udunits(ds, name)
ret_val.append(valid_udunits)
ret_val.append(units_attr_is_string.to_result())
if isinstance(standard_name, str):
# CONFORMANCE 3.1 REQUIRED
valid_standard_units = self._check_valid_standard_units(ds, name)
ret_val.append(valid_standard_units)
return ret_val
def _check_valid_cf_units(self, ds, variable_name):
"""
Checks that the variable contains a units attribute, that the attribute is
a string, and that the value is not deprecated by CF
:param netCDF4.Dataset ds: An open netCDF dataset
:param str variable_name: Name of the variable to be checked
:rtype:
:return: List of results
"""
# This list is straight from section 3
deprecated = ["level", "layer", "sigma_level"]
variable = ds.variables[variable_name]
valid_units = TestCtx(BaseCheck.HIGH, self.section_titles["3.1"])
units = getattr(variable, "units", None)
standard_name_full = getattr(variable, "standard_name", None)
standard_name, standard_name_modifier = self._split_standard_name(
standard_name_full,
)
std_name_units_dimensionless = cfutil.is_dimensionless_standard_name(
self._std_names._root,
standard_name,
)
# 3) units are not deprecated
valid_units.assert_true(
units not in deprecated,
f'units for {variable_name}, "{units}" are deprecated by CF 1.6',
)
# 4/5) Modifiers, if present, have the appropriate units, or none for
# status_flag
if standard_name_modifier is not None:
if standard_name_modifier not in valid_modifiers:
# standard name modifier warning given elsewhere
return valid_units.to_result()
else:
unit_type = valid_modifiers[standard_name_modifier]
# no modifiers, just check against standard name canonical_units
else:
unit_type = "u"
if unit_type == "u":
try:
reference = self._std_names[standard_name].canonical_units
# if standard name isn't found, there won't be an associated units
# but a standard name error will be raised elsewhere
except KeyError:
return valid_units.to_result()
elif unit_type == "1":
reference = "1"
elif unit_type is None:
valid_units.assert_true(
units is None,
f"units attribute for variable {variable_name} must be unset "
"when status_flag standard name modifier is set",
)
return valid_units.to_result()
# Is this even in the database? also, if there is no standard_name,
# there's no way to know if it is dimensionless.
should_be_dimensionless = (
variable.dtype is str
or (hasattr(variable.dtype, "char") and variable.dtype.char == "S")
or std_name_units_dimensionless
or standard_name is None
)
# 1) Units must exist
valid_units.assert_true(
should_be_dimensionless or units is not None,
f"units attribute is required for {variable_name} when variable is not a dimensionless quantity",
)
# Don't bother checking the rest
if units is None and not should_be_dimensionless:
return valid_units.to_result()
# 2) units attribute must be a string
valid_units.assert_true(
should_be_dimensionless or isinstance(units, str),
f"units attribute for {variable_name} needs to be a string",
)
try:
units_conv = Unit(units)
except ValueError:
valid_units.messages.append(
f'Unit string "{units}" is not recognized by UDUnits',
)
valid_units.out_of += 1
return valid_units
else:
valid_units.score += 1
valid_units.out_of += 1
# time and forecast_reference time have special unit handling rules
# that use time relative to a reference point, despite canonical units
# being expressed as "s"/seconds
if standard_name not in {"time", "forecast_reference_time"}:
valid_units.assert_true(
units_conv.is_convertible(Unit(reference)),
f'Units "{units}" for variable '
f"{variable_name} must be convertible to "
f'canonical units "{reference}"',
)
return valid_units.to_result()
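# Sketch of the cf_units convertibility test performed above (units chosen
# only for illustration):
#
#     from cf_units import Unit
#     Unit("degC").is_convertible(Unit("K"))      # True: same physical dimension
#     Unit("m s-1").is_convertible(Unit("K"))     # False: not convertible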
def _check_valid_udunits(self, ds, variable_name):
"""
Checks that the variable's units are contained in UDUnits
:param netCDF4.Dataset ds: An open netCDF dataset
:param str variable_name: Name of the variable to be checked
"""
variable = ds.variables[variable_name]
units = getattr(variable, "units", None)
standard_name = getattr(variable, "standard_name", None)
standard_name, standard_name_modifier = self._split_standard_name(standard_name)
std_name_units_dimensionless = cfutil.is_dimensionless_standard_name(
self._std_names._root,
standard_name,
)
# If the variable is supposed to be dimensionless, it automatically passes
should_be_dimensionless = (
variable.dtype is str
or (hasattr(variable.dtype, "char") and variable.dtype.char == "S")
or std_name_units_dimensionless
)
valid_udunits = TestCtx(BaseCheck.HIGH, self.section_titles["3.1"])
are_udunits = units is not None and util.units_known(units)
valid_udunits.assert_true(
should_be_dimensionless or are_udunits or units is None,
f'units for {variable_name}, "{units}" are not recognized by UDUNITS',
)
return valid_udunits.to_result()
def _check_valid_standard_units(self, ds, variable_name):
"""
Checks that the variable's units are appropriate for the standard name
according to the CF standard name table and coordinate sections in CF
1.6
:param netCDF4.Dataset ds: An open netCDF dataset
:param str variable_name: Name of the variable to be checked
"""
variable = ds.variables[variable_name]
units = getattr(variable, "units", None)
standard_name = getattr(variable, "standard_name", None)
valid_standard_units = TestCtx(BaseCheck.HIGH, self.section_titles["3.1"])
# If the variable is supposed to be dimensionless, it automatically passes
std_name_units_dimensionless = cfutil.is_dimensionless_standard_name(
self._std_names._root,
standard_name,
)
if std_name_units_dimensionless:
return valid_standard_units.to_result()
standard_name, standard_name_modifier = self._split_standard_name(standard_name)
# Other standard_name modifiers have the same units as the
# unmodified standard name or are not checked for units.
# number_of_observations is a special case which always must be units
# of "1"
if standard_name_modifier == "number_of_observations":
valid_standard_units.out_of += 1
if units != "1":
err_msg = (
f"When variable {variable_name} has a "
"standard name modifier of number_of_observations, "
"the specified units must be 1"
)
valid_standard_units.messages.append(err_msg)
else:
valid_standard_units.score += 1
# number_of_observations should short circuit and not continue
# on to further units checks
return valid_standard_units.to_result()
elif standard_name_modifier == "status_flag":
# no units required - skip further checks
return valid_standard_units.to_result()
# This section represents the different cases where simple udunits
# comparison isn't comprehensive enough to determine if the units are
# appropriate under CF
# UDUnits accepts "s" as a unit of time but it should be <unit> since <epoch>
# TODO: forecast_reference_time. Include upcoming merge.
# IMPLEMENTATION CONFORMANCE 4.4 REQUIRED 1/2
elif standard_name == "time":
valid_standard_units.assert_true(
util.units_convertible(units, "seconds since 1970-01-01"),
"time must be in a valid units format <unit> since <epoch> "
f"not {units}",
)
# UDunits can't tell the difference between east and north facing coordinates
elif standard_name == "latitude":
# degrees is allowed if using a transformed grid
allowed_units = cfutil.VALID_LAT_UNITS | {"degrees"}
valid_standard_units.assert_true(
(units.lower() if units is not None else None) in allowed_units,
f'variables defining latitude ("{variable_name}") must use degrees_north '
"or degrees if defining a transformed grid. Currently "
f"{units}",
)
# UDunits can't tell the difference between east and north facing coordinates
elif standard_name == "longitude":
# degrees is allowed if using a transformed grid
allowed_units = cfutil.VALID_LON_UNITS | {"degrees"}
valid_standard_units.assert_true(
(units.lower() if units is not None else None) in allowed_units,
f'variables defining longitude ("{variable_name}") must use degrees_east '
"or degrees if defining a transformed grid. Currently "
f"{units}",
)
return valid_standard_units.to_result()
def check_standard_name(self, ds):
"""
Check a variable's standard_name attribute to ensure that it meets CF
compliance.
CF §3.3 A standard name is associated with a variable via the attribute
standard_name which takes a string value comprised of a standard name
optionally followed by one or more blanks and a standard name modifier
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
coord_vars = self._find_coord_vars(ds)
aux_coord_vars = self._find_aux_coord_vars(ds)
axis_vars = cfutil.get_axis_variables(ds)
flag_vars = cfutil.get_flag_variables(ds)
geophysical_vars = self._find_geophysical_vars(ds)
variables_requiring_standard_names = (
coord_vars + aux_coord_vars + axis_vars + flag_vars + geophysical_vars
)
for name in set(variables_requiring_standard_names):
# Compression indices used in reduced horizontal grids or
# compression schemes do not require attributes other than compress
if cfutil.is_compression_coordinate(ds, name):
continue
ncvar = ds.variables[name]
# §9 doesn't explicitly allow instance variables as coordinates but
# it's loosely implied. Just in case, skip it.
if hasattr(ncvar, "cf_role"):
continue
# Unfortunately, §6.1 allows for string types to be listed as
# coordinates.
if hasattr(ncvar.dtype, "char") and ncvar.dtype.char == "S":
continue
elif ncvar.dtype == str:
continue
standard_name = getattr(ncvar, "standard_name", None)
standard_name, standard_name_modifier = self._split_standard_name(
standard_name,
)
long_name = getattr(ncvar, "long_name", None)
long_or_std_name = TestCtx(BaseCheck.HIGH, self.section_titles["3.3"])
if long_name is not None:
long_name_present = True
long_or_std_name.assert_true(
isinstance(long_name, str),
f"Attribute long_name for variable {name} must be a string",
)
else:
long_name_present = False
# §1.3 The long_name and standard_name attributes are used to
# describe the content of each variable. For backwards
# compatibility with COARDS neither is required, but use of at
# least one of them is strongly recommended.
# If standard_name is not defined but long_name is, don't continue
# the check for this variable
# IMPLEMENTATION CONFORMANCE 3.3 REQUIRED 1, 2, 3 / 3
if standard_name is not None:
standard_name_present = True
valid_std_name = TestCtx(BaseCheck.HIGH, self.section_titles["3.3"])
valid_std_name.assert_true(
isinstance(standard_name, str),
f"Attribute standard_name for variable {name} must be a string",
)
valid_std_name.out_of += 1
if standard_name not in self._std_names:
err_msg = "standard_name {} is not defined in Standard Name Table v{}.".format(
standard_name or "undefined",
self._std_names._version,
)
close_matches = difflib.get_close_matches(
standard_name,
self._std_names,
)
if close_matches:
err_msg += f" Possible close match(es): {close_matches}"
valid_std_name.messages.append(err_msg)
else:
valid_std_name.score += 1
ret_val.append(valid_std_name.to_result())
# 2) optional - if modifiers, should be in table
if standard_name_modifier is not None:
valid_modifier = TestCtx(BaseCheck.HIGH, self.section_titles["3.3"])
valid_modifier.assert_true(
standard_name_modifier in valid_modifiers,
f'Standard name modifier "{standard_name_modifier}" for variable {name} is not a valid modifier '
"according to CF Appendix C",
)
ret_val.append(valid_modifier.to_result())
else:
standard_name_present = False
# IMPLEMENTATION CONFORMANCE 3 RECOMMENDED
long_or_std_name.assert_true(
long_name_present or standard_name_present,
f"Attribute long_name or/and standard_name is highly recommended for variable {name}",
)
ret_val.append(long_or_std_name.to_result())
return ret_val
def check_ancillary_variables(self, ds):
"""
Checks the ancillary_variables attribute for all variables to ensure
they are CF compliant.
CF §3.4 It is a string attribute whose value is a blank separated list
of variable names. The nature of the relationship between variables
associated via ancillary_variables must be determined by other
attributes. The variables listed by the ancillary_variables attribute
will often have the standard name of the variable which points to them
including a modifier (Appendix C, Standard Name Modifiers) to indicate
the relationship.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
for ncvar in ds.get_variables_by_attributes(
ancillary_variables=lambda x: x is not None,
):
name = ncvar.name
valid_ancillary = TestCtx(BaseCheck.HIGH, self.section_titles["3.4"])
ancillary_variables = ncvar.ancillary_variables
valid_ancillary.assert_true(
isinstance(ancillary_variables, str),
f"ancillary_variables attribute defined by {name} " "should be string",
)
# Can't perform the second check if it's not a string
if not isinstance(ancillary_variables, str):
ret_val.append(valid_ancillary.to_result())
continue
for ancillary_variable in ancillary_variables.split():
valid_ancillary.assert_true(
ancillary_variable in ds.variables,
f"{ancillary_variable} is not a variable in this dataset",
)
ret_val.append(valid_ancillary.to_result())
return ret_val
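# Sketch of an ancillary_variables attribute this check accepts (file and
# variable names invented; the listed variables must exist in the dataset):
#
#     from netCDF4 import Dataset
#     with Dataset("obs.nc", "a") as nc:
#         nc.variables["temp"].ancillary_variables = "temp_qc temp_std"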
def check_flags(self, ds):
"""
Check the flag_values, flag_masks and flag_meanings attributes for
variables to ensure they are CF compliant.
CF §3.5 The attributes flag_values, flag_masks and flag_meanings are
intended to make variables that contain flag values self describing.
Status codes and Boolean (binary) condition flags may be expressed with
different combinations of flag_values and flag_masks attribute
definitions.
The flag_values and flag_meanings attributes describe a status flag
consisting of mutually exclusive coded values.
The flag_meanings attribute is a string whose value is a blank
separated list of descriptive words or phrases, one for each flag
value. Each word or phrase should consist of characters from the
alphanumeric set and the following five: '_', '-', '.', '+', '@'.
The flag_masks and flag_meanings attributes describe a number of
independent Boolean conditions using bit field notation by setting
unique bits in each flag_masks value.
The flag_masks, flag_values and flag_meanings attributes, used
together, describe a blend of independent Boolean conditions and
enumerated status codes. A flagged condition is identified by a bitwise
AND of the variable value and each flag_masks value; a result that
matches the flag_values value indicates a true condition.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
for name in cfutil.get_flag_variables(ds):
variable = ds.variables[name]
flag_values = getattr(variable, "flag_values", None)
flag_masks = getattr(variable, "flag_masks", None)
valid_flags_var = TestCtx(BaseCheck.HIGH, self.section_titles["3.5"])
# Check that the variable defines mask or values
valid_flags_var.assert_true(
flag_values is not None or flag_masks is not None,
f"{name} does not define either flag_masks or flag_values",
)
ret_val.append(valid_flags_var.to_result())
valid_meanings = self._check_flag_meanings(ds, name)
ret_val.append(valid_meanings)
# check flag_values
if flag_values is not None:
valid_values = self._check_flag_values(ds, name)
ret_val.append(valid_values)
# check flag_masks
if flag_masks is not None:
valid_masks = self._check_flag_masks(ds, name)
ret_val.append(valid_masks)
if flag_values is not None and flag_masks is not None:
vals_arr = np.array(flag_values, ndmin=1)
masks_arr = np.array(flag_masks, ndmin=1)
# IMPLEMENTATION CONFORMANCE 3.5 RECOMMENDED 1/1
# If shapes aren't equal, we can't do proper elementwise
# comparison
if vals_arr.size != masks_arr.size:
allv = False
else:
allv = np.all(vals_arr & masks_arr == vals_arr)
allvr = Result(BaseCheck.MEDIUM, allv, self.section_titles["3.5"])
if not allvr.value:
allvr.msgs = [
f"flag masks and flag values for '{name}' combined don't equal flag values",
]
ret_val.append(allvr)
return ret_val
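# Sketch of the bitwise consistency test applied above when both flag_values
# and flag_masks are present (values invented):
#
#     import numpy as np
#     flag_masks = np.array([0b0001, 0b0010, 0b1100], dtype="i1")
#     flag_values = np.array([0b0001, 0b0010, 0b0100], dtype="i1")
#     bool(np.all(flag_values & flag_masks == flag_values))   # True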
def _check_flag_values(self, ds, name):
"""
Checks a variable's flag_values attribute for compliance under CF
- flag_values exists as an array
- unique elements in flag_values
- flag_values is the same dtype as the variable
- flag_values is the same length as flag_meanings
:param netCDF4.Dataset ds: An open netCDF dataset
:param str name: Name of variable to check
:rtype: compliance_checker.base.Result
"""
variable = ds.variables[name]
flag_values = getattr(variable, "flag_values", None)
flag_meanings = getattr(variable, "flag_meanings", None)
valid_values = TestCtx(BaseCheck.HIGH, self.section_titles["3.5"])
# IMPLEMENTATION CONFORMANCE 3.5 REQUIRED 2/8
valid_values.assert_true(
hasattr(variable, "flag_meanings"),
f"Variable {variable.name} must have attribute flag_meanings "
"defined when flag_values attribute is present",
)
# the flag values must be independent, no repeating values
flag_set = np.unique(flag_values)
valid_values.assert_true(
flag_set.size == np.array(flag_values).size,
f"{name}'s flag_values must be independent and can not be repeated",
)
# IMPLEMENTATION CONFORMANCE 3.5 REQUIRED 1/8
# the data type for flag_values should be the same as the variable
valid_values.assert_true(
variable.dtype.type == flag_values.dtype.type,
f"flag_values ({flag_values.dtype.type}) must be the same data type as {name} ({variable.dtype.type})"
"",
)
# IMPLEMENTATION CONFORMANCE 3.5 REQUIRED 4/8
if isinstance(flag_meanings, str):
flag_meanings = flag_meanings.split()
valid_values.assert_true(
len(flag_meanings) == np.array(flag_values).size,
f"{name}'s flag_meanings and flag_values should have the same "
"number of elements.",
)
return valid_values.to_result()
def _check_flag_masks(self, ds, name):
"""
Check a variable's flag_masks attribute for compliance under CF
- flag_masks exists as an array
- flag_masks is the same dtype as the variable
- variable's dtype can support bit-field
- flag_masks is the same length as flag_meanings
:param netCDF4.Dataset ds: An open netCDF dataset
:param str name: Variable name
:rtype: compliance_checker.base.Result
"""
variable = ds.variables[name]
flag_masks = variable.flag_masks
flag_meanings = getattr(variable, "flag_meanings", None)
valid_masks = TestCtx(BaseCheck.HIGH, self.section_titles["3.5"])
valid_masks.assert_true(
variable.dtype.type == flag_masks.dtype.type,
f"flag_masks ({flag_masks.dtype.type}) must be the same data type as {name} ({variable.dtype.type})"
"",
)
type_ok = (
np.issubdtype(variable.dtype, np.integer)
or np.issubdtype(variable.dtype, "S")
or np.issubdtype(variable.dtype, "b")
)
valid_masks.assert_true(
0 not in np.array(flag_masks),
f"flag_masks for variable {variable.name} must "
"not contain zero as an element",
)
valid_masks.assert_true(
type_ok,
f"{name}'s data type must be capable of bit-field expression",
)
if isinstance(flag_meanings, str):
flag_meanings = flag_meanings.split()
valid_masks.assert_true(
# cast to array here as single element arrays are returned as
# scalars from netCDF4 Python
len(flag_meanings) == np.array(flag_masks).size,
f"{name} flag_meanings and flag_masks should have the same "
"number of elements.",
)
return valid_masks.to_result()
def _check_flag_meanings(self, ds, name):
"""
Check a variable's flag_meanings attribute for compliance under CF
- flag_meanings exists
- flag_meanings is a string
- flag_meanings elements are valid strings
:param netCDF4.Dataset ds: An open netCDF dataset
:param str name: Variable name
:rtype: compliance_checker.base.Result
"""
variable = ds.variables[name]
flag_meanings = getattr(variable, "flag_meanings", None)
valid_meanings = TestCtx(BaseCheck.HIGH, self.section_titles["3.5"])
valid_meanings.assert_true(
flag_meanings is not None,
f"{name}'s flag_meanings attribute is required for flag variables",
)
valid_meanings.assert_true(
isinstance(flag_meanings, str),
f"{name}'s flag_meanings attribute must be a string",
)
# We can't perform any additional checks if it's not a string
if not isinstance(flag_meanings, str):
return valid_meanings.to_result()
valid_meanings.assert_true(
len(flag_meanings) > 0,
f"{name}'s flag_meanings can't be empty",
)
# IMPLEMENTATION CONFORMANCE REQUIRED 3.5 3/8
flag_regx = regex.compile(r"^[0-9A-Za-z_\-.+@]+$")
meanings = flag_meanings.split()
for meaning in meanings:
if flag_regx.match(meaning) is None:
valid_meanings.assert_true(
False,
f"{name}'s flag_meanings attribute defined an illegal flag meaning "
+ f"{meaning}",
)
return valid_meanings.to_result()
###############################################################################
# Chapter 4: Coordinate Types
###############################################################################
def check_coordinate_types(self, ds):
"""
Check the axis attribute of coordinate variables
CF §4 The attribute axis may be attached to a coordinate variable and
given one of the values X, Y, Z or T which stand for a longitude,
latitude, vertical, or time axis respectively. Alternatively the
standard_name attribute may be used for direct identification.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
for variable in ds.get_variables_by_attributes(axis=lambda x: x is not None):
name = variable.name
# Coordinate compressions should not be checked as a valid
# coordinate, which they are not. They are a mechanism to project
# an array of indices onto a 2-d grid containing valid coordinates.
if cfutil.is_compression_coordinate(ds, name):
continue
variable = ds.variables[name]
# Even though it's not allowed in CF 1.6, it is allowed in CF 1.7
# and we see people do it often.
if hasattr(variable, "cf_role"):
continue
# §6.1 allows for labels to be referenced as auxiliary coordinate
# variables, which should not be checked like the rest of the
# coordinates.
if hasattr(variable.dtype, "char") and variable.dtype.char == "S":
continue
elif variable.dtype == str:
continue
axis = getattr(variable, "axis", None)
if axis is not None:
valid_axis = self._check_axis(ds, name)
ret_val.append(valid_axis)
return ret_val
def _check_axis(self, ds, name):
"""
Checks that the axis attribute is a string and an allowed value, namely
one of 'T', 'X', 'Y', or 'Z'.
:param netCDF4.Dataset ds: An open netCDF dataset
:param str name: Name of the variable
:rtype: compliance_checker.base.Result
"""
allowed_axis = ["T", "X", "Y", "Z"]
variable = ds.variables[name]
axis = variable.axis
valid_axis = TestCtx(BaseCheck.HIGH, self.section_titles["4"])
axis_is_string = isinstance(axis, str)
valid_axis.assert_true(
axis_is_string and len(axis) > 0,
f"{name}'s axis attribute must be a non-empty string",
)
# If axis isn't a string we can't continue any checks
if not axis_is_string or len(axis) == 0:
return valid_axis.to_result()
valid_axis.assert_true(
axis in allowed_axis,
f"{name}'s axis attribute must be T, X, Y, or Z, " + f"currently {axis}",
)
return valid_axis.to_result()
def check_latitude(self, ds):
"""
Check that variable(s) defining latitude are defined correctly according to CF.
CF §4.1 Variables representing latitude must always explicitly include
the units attribute; there is no default value. The recommended unit
of latitude is degrees_north. Also acceptable are degree_north,
degree_N, degrees_N, degreeN, and degreesN.
Optionally, the latitude type may be indicated additionally by
providing the standard_name attribute with the value latitude, and/or
the axis attribute with the value Y.
- Four checks per latitude variable
- (H) latitude has units attribute
- (M) latitude has an allowed units attribute
- (L) latitude uses degrees_north (if not in rotated pole)
- (M) latitude defines either standard_name or axis
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
allowed_lat_units = [
"degrees_north",
"degree_north",
"degree_n",
"degrees_n",
"degreen",
"degreesn",
]
# Determine the grid mappings in this dataset
grid_mapping = []
grid_mapping_variables = cfutil.get_grid_mapping_variables(ds)
for name in grid_mapping_variables:
variable = ds.variables[name]
grid_mapping_name = getattr(variable, "grid_mapping_name", None)
if grid_mapping_name:
grid_mapping.append(grid_mapping_name)
latitude_variables = cfutil.get_latitude_variables(ds)
for latitude in latitude_variables:
variable = ds.variables[latitude]
units = getattr(variable, "units", None)
units_is_string = isinstance(units, str)
standard_name = getattr(variable, "standard_name", None)
axis = getattr(variable, "axis", None)
# Check that latitude defines units
valid_latitude = TestCtx(BaseCheck.HIGH, self.section_titles["4.1"])
valid_latitude.assert_true(
units is not None,
f"latitude variable '{latitude}' must define units",
)
ret_val.append(valid_latitude.to_result())
# Check that latitude uses allowed units
allowed_units = TestCtx(BaseCheck.MEDIUM, self.section_titles["4.1"])
if standard_name == "grid_latitude":
e_n_units = cfutil.VALID_LAT_UNITS | cfutil.VALID_LON_UNITS
# check that the units aren't in east and north degrees units,
# but are convertible to angular units
allowed_units.assert_true(
units not in e_n_units and Unit(units) == Unit("degree"),
f"Grid latitude variable '{latitude}' should use degree equivalent units without east or north components. "
f"Current units are {units}",
)
else:
allowed_units.assert_true(
units_is_string and units.lower() in allowed_lat_units,
f"latitude variable '{latitude}' should define valid units for latitude"
"",
)
ret_val.append(allowed_units.to_result())
# Check that latitude uses degrees_north
if standard_name == "latitude" and units != "degrees_north":
# This is only a recommendation and we won't penalize but we
# will include a recommended action.
msg = (
f"CF recommends latitude variable '{latitude}' to use units degrees_north"
""
)
recommended_units = Result(
BaseCheck.LOW,
(1, 1),
self.section_titles["4.1"],
[msg],
)
ret_val.append(recommended_units)
y_variables = ds.get_variables_by_attributes(axis="Y")
# Check that latitude defines either standard_name or axis
definition = TestCtx(BaseCheck.MEDIUM, self.section_titles["4.1"])
definition.assert_true(
standard_name == "latitude" or axis == "Y" or y_variables != [],
f"latitude variable '{latitude}' should define standard_name='latitude' or axis='Y'"
"",
)
ret_val.append(definition.to_result())
return ret_val
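# Sketch of latitude metadata that satisfies the checks above (file and
# variable layout invented for illustration):
#
#     from netCDF4 import Dataset
#     with Dataset("grid.nc", "w") as nc:
#         nc.createDimension("lat", 180)
#         lat = nc.createVariable("lat", "f4", ("lat",))
#         lat.units = "degrees_north"       # required units attribute
#         lat.standard_name = "latitude"    # identifies the coordinate type
#         lat.axis = "Y"                    # optional additional identification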
def check_longitude(self, ds):
"""
Check that variable(s) defining longitude are defined correctly according to CF.
CF §4.2 Variables representing longitude must always explicitly include
the units attribute; there is no default value. The recommended unit
of longitude is degrees_east. Also acceptable are degree_east,
degree_E, degrees_E, degreeE, and degreesE.
Optionally, the longitude type may be indicated additionally by
providing the standard_name attribute with the value longitude, and/or
the axis attribute with the value X.
- Four checks per longitude variable
- (H) longitude has units attribute
- (M) longitude has an allowed units attribute
- (L) longitude uses degrees_east (if not in rotated pole)
- (M) longitude defines either standard_name or axis
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
# TODO we already have a check_latitude... I'm sure we can make DRYer
ret_val = []
allowed_lon_units = [
"degrees_east",
"degree_east",
"degree_e",
"degrees_e",
"degreee",
"degreese",
]
# Determine the grid mappings in this dataset
grid_mapping = []
grid_mapping_variables = cfutil.get_grid_mapping_variables(ds)
for name in grid_mapping_variables:
variable = ds.variables[name]
grid_mapping_name = getattr(variable, "grid_mapping_name", None)
if grid_mapping_name:
grid_mapping.append(grid_mapping_name)
longitude_variables = cfutil.get_longitude_variables(ds)
for longitude in longitude_variables:
variable = ds.variables[longitude]
units = getattr(variable, "units", None)
units_is_string = isinstance(units, str)
standard_name = getattr(variable, "standard_name", None)
axis = getattr(variable, "axis", None)
# NOTE see docstring--should below be 4.1 or 4.2?
# Check that longitude defines units
valid_longitude = TestCtx(BaseCheck.HIGH, self.section_titles["4.2"])
valid_longitude.assert_true(
units is not None,
f"longitude variable '{longitude}' must define units",
)
ret_val.append(valid_longitude.to_result())
# Check that longitude uses allowed units
allowed_units = TestCtx(BaseCheck.MEDIUM, self.section_titles["4.2"])
if standard_name == "grid_longitude":
e_n_units = cfutil.VALID_LAT_UNITS | cfutil.VALID_LON_UNITS
# check that the units aren't in east and north degrees units,
# but are convertible to angular units
allowed_units.assert_true(
units not in e_n_units and Unit(units) == Unit("degree"),
f"Grid longitude variable '{longitude}' should use degree equivalent units without east or north components. "
f"Current units are {units}",
)
else:
allowed_units.assert_true(
units_is_string and units.lower() in allowed_lon_units,
f"longitude variable '{longitude}' should define valid units for longitude"
"",
)
ret_val.append(allowed_units.to_result())
# Check that longitude uses degrees_east
if standard_name == "longitude" and units != "degrees_east":
# This is only a recommendation and we won't penalize but we
# will include a recommended action.
msg = (
f"CF recommends longitude variable '{longitude}' to use units degrees_east"
""
)
recommended_units = Result(
BaseCheck.LOW,
(1, 1),
self.section_titles["4.2"],
[msg],
)
ret_val.append(recommended_units)
x_variables = ds.get_variables_by_attributes(axis="X")
# Check that longitude defines either standard_name or axis
definition = TestCtx(BaseCheck.MEDIUM, self.section_titles["4.2"])
definition.assert_true(
standard_name == "longitude" or axis == "X" or x_variables != [],
f"longitude variable '{longitude}' should define standard_name='longitude' or axis='X'"
"",
)
ret_val.append(definition.to_result())
return ret_val
def check_dimensional_vertical_coordinate(
self,
ds,
dimless_vertical_coordinates=dimless_vertical_coordinates_1_6,
):
"""
Check units for variables defining vertical position are valid under
CF.
CF §4.3.1 The units attribute for dimensional coordinates will be a string
formatted as per the udunits.dat file.
The acceptable units for vertical (depth or height) coordinate variables
are:
- units of pressure as listed in the file udunits.dat. For vertical axes
the most commonly used of these include bar, millibar,
decibar, atmosphere (atm), pascal (Pa), and hPa.
- units of length as listed in the file udunits.dat. For vertical axes
the most commonly used of these include meter (metre, m), and
kilometer (km).
- other units listed in the file udunits.dat that may under certain
circumstances reference vertical position such as units of density or
temperature.
Plural forms are also acceptable.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
z_variables = cfutil.get_z_variables(ds)
# dimless_standard_names = [name for name, regx in dimless_vertical_coordinates]
for name in z_variables:
variable = ds.variables[name]
standard_name = getattr(variable, "standard_name", None)
units = getattr(variable, "units", None)
positive = getattr(variable, "positive", None)
# Skip the variable if it's dimensionless
if (
hasattr(variable, "formula_terms")
or standard_name in dimless_vertical_coordinates
):
continue
valid_vertical_coord = TestCtx(BaseCheck.HIGH, self.section_titles["4.3"])
valid_vertical_coord.assert_true(
isinstance(units, str) and units,
f"§4.3.1 {name}'s units must be defined for vertical coordinates, "
"there is no default",
)
if not util.units_convertible("bar", units):
valid_vertical_coord.assert_true(
positive in ("up", "down"),
f"{name}: vertical coordinates not defining pressure must include "
"a positive attribute that is either 'up' or 'down'",
)
# _check_valid_standard_units, part of the Chapter 3 checks,
# already verifies that this coordinate has valid units
ret_val.append(valid_vertical_coord.to_result())
return ret_val
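# Sketch of a dimensional vertical coordinate that passes the checks above
# (file and variable names invented):
#
#     from netCDF4 import Dataset
#     with Dataset("profile.nc", "w") as nc:
#         nc.createDimension("z", 10)
#         depth = nc.createVariable("depth", "f4", ("z",))
#         depth.units = "m"             # a udunits length unit
#         depth.positive = "down"       # required since "m" is not a pressure unit
#         depth.standard_name = "depth"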
def _check_dimensionless_vertical_coordinate_1_6(
self,
ds,
vname,
deprecated_units,
ret_val,
dim_vert_coords_dict,
):
"""
Check that a dimensionless vertical coordinate variable is valid under
CF-1.6.
:param netCDF4.Dataset ds: open netCDF4 dataset
:param str vname: variable name
:param list ret_val: list to append Results to
:rtype: None
"""
variable = ds.variables[vname]
standard_name = getattr(variable, "standard_name", None)
units = getattr(variable, "units", None)
formula_terms = getattr(variable, "formula_terms", None)
# Skip the variable if it's dimensional
if formula_terms is None and standard_name not in dim_vert_coords_dict:
return
is_not_deprecated = TestCtx(BaseCheck.LOW, self.section_titles["4.3"])
is_not_deprecated.assert_true(
units not in deprecated_units,
f"§4.3.2: units are deprecated by CF in variable {vname}: {units}" "",
)
# check the vertical coordinates
ret_val.append(is_not_deprecated.to_result())
ret_val.append(self._check_formula_terms(ds, vname, dim_vert_coords_dict))
def check_dimensionless_vertical_coordinates(self, ds):
"""
Check the validity of dimensionless coordinates under CF
CF §4.3.2 The units attribute is not required for dimensionless
coordinates.
The standard_name attribute associates a coordinate with its definition
from Appendix D, Dimensionless Vertical Coordinates. The definition
provides a mapping between the dimensionless coordinate values and
dimensional values that can positively and uniquely indicate the
location of the data.
A new attribute, formula_terms, is used to associate terms in the
definitions with variables in a netCDF file. To maintain backwards
compatibility with COARDS the use of these attributes is not required,
but is strongly recommended.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
deprecated_units = ["level", "layer", "sigma_level"]
ret_val.extend(
self._check_dimensionless_vertical_coordinates(
ds,
deprecated_units,
self._check_dimensionless_vertical_coordinate_1_6,
dimless_vertical_coordinates_1_6,
),
)
return ret_val
def check_time_coordinate(self, ds):
"""
Check variables defining time are valid under CF
CF §4.4 Variables representing time must always explicitly include the
units attribute; there is no default value.
The units attribute takes a string value formatted as per the
recommendations in the Udunits package.
The acceptable units for time are listed in the udunits.dat file. The
most commonly used of these strings (and their abbreviations) includes
day (d), hour (hr, h), minute (min) and second (sec, s). Plural forms
are also acceptable. The reference time string (appearing after the
identifier since) may include date alone; date and time; or date, time,
and time zone. The reference time is required. A reference time in year
0 has a special meaning (see Section 7.4, "Climatological Statistics").
Recommend that the unit year be used with caution. It is not a calendar
year. For similar reasons the unit month should also be used with
caution.
A time coordinate is identifiable from its units string alone.
Optionally, the time coordinate may be indicated additionally by
providing the standard_name attribute with an appropriate value, and/or
the axis attribute with the value T.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
for name in cfutil.get_time_variables(ds):
variable = ds.variables[name]
# Has units
has_units = hasattr(variable, "units")
if not has_units:
result = Result(
BaseCheck.HIGH,
False,
self.section_titles["4.4"],
["%s does not have units" % name],
)
ret_val.append(result)
continue
# Correct and identifiable units
# TODO: year zero climatological time warning
result = Result(BaseCheck.HIGH, True, self.section_titles["4.4"])
ret_val.append(result)
correct_units = util.units_temporal(variable.units)
reasoning = None
if not correct_units:
reasoning = ["%s does not have correct time units" % name]
result = Result(
BaseCheck.HIGH,
correct_units,
self.section_titles["4.4"],
reasoning,
)
ret_val.append(result)
continue
# IMPLEMENTATION CONFORMANCE 4.4 RECOMMENDED 1/2
if hasattr(variable, "climatology"):
year_match = regex.match(r"\w+ since (?P<year>\d{1,4})", variable.units)
# year should always exist at this point if it's been parsed as
# valid date
if int(year_match.group("year")) == 0:
message = (
f"Time coordinate variable {variable.name}'s "
"use of year 0 for climatological time is "
"deprecated"
)
result = Result(
BaseCheck.MEDIUM,
False,
self.section_titles["4.4"],
[message],
)
ret_val.append(result)
# IMPLEMENTATION CONFORMANCE 4.4 RECOMMENDED 2/2
# catch non-recommended months or years time interval
unit = Unit(variable.units)
if unit.is_long_time_interval():
message = f"Using relative time interval of months or years is not recommended for coordinate variable {variable.name}"
result = Result(
BaseCheck.MEDIUM,
False,
self.section_titles["4.4"],
[message],
)
ret_val.append(result)
return ret_val
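# Illustrative sketch, not part of this module: a time coordinate with the
# explicit units string that check_time_coordinate requires (the reference
# time after "since" is mandatory). Names here are hypothetical.
import netCDF4
_nc = netCDF4.Dataset("time_example.nc", "w", diskless=True)
_nc.createDimension("time", 3)
_time = _nc.createVariable("time", "f8", ("time",))
_time.units = "hours since 2000-01-01 00:00:00"
_time.standard_name = "time"
_time.axis = "T"
_time[:] = [0.0, 6.0, 12.0]
_nc.close()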
[docs]
def check_calendar(self, ds):
"""
Check the calendar attribute for variables defining time and ensure it
is a valid calendar prescribed by CF.
CF §4.4.1 In order to calculate a new date and time given a base date, base
time and a time increment one must know what calendar to use.
The values currently defined for calendar are:
- gregorian or standard
- proleptic_gregorian
- noleap or 365_day
- all_leap or 366_day
- 360_day
- julian
- none
The calendar attribute may be set to none in climate experiments that
simulate a fixed time of year.
The time of year is indicated by the date in the reference time of the
units attribute.
If none of the calendars defined above applies, a non-standard calendar
can be defined. The lengths of each month are explicitly defined with
the month_lengths attribute of the time axis.
If leap years are included, then two other attributes of the time axis
should also be defined:
leap_year, leap_month
The calendar attribute is not required when a non-standard calendar is
being used. It is sufficient to define the calendar using the
month_lengths attribute, along with leap_year, and leap_month as
appropriate. However, the calendar attribute is allowed to take
non-standard values and in that case defining the non-standard calendar
using the appropriate attributes is required.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
standard_calendars = {
"gregorian",
"standard",
"proleptic_gregorian",
"noleap",
"365_day",
"all_leap",
"366_day",
"360_day",
"julian",
"none",
}
ret_val = []
def check_standard_calendar_no_cross(time_var):
"""
Check that the time variable does not cross the date
1582-10-15 when standard or gregorian calendars are used
"""
# IMPLEMENTATION CONFORMANCE 4.4.1 RECOMMENDED 2/2
# Only get non-nan/FillValue times, as these are the only things
# that make sense for conversion. Furthermore, non-null checks
# should be made for time coordinate variables anyway, so errors
# will be caught where those checks are implemented
crossover_date = cftime.DatetimeGregorian(1582, 10, 15)
# has_year_zero set to true in order to just check crossover,
# actual year less than or equal to zero check handled elsewhere
# when standard/Gregorian, or Julian calendars used.
# WARNING: might fail here if months_since are used and suppress
# usual warning
try:
times = cftime.num2date(
time_var[:].compressed(),
time_var.units,
has_year_zero=True,
)
except ValueError:
return Result(
BaseCheck.LOW,
False,
self.section_titles["4.4"],
[
"Miscellaneous failure when attempting to calculate crossover, possible malformed date",
],
)
crossover_1582 = np.any(times < crossover_date) and np.any(
times >= crossover_date,
)
if not crossover_1582:
reasoning = (
f"Variable {time_var.name} has standard or Gregorian "
"calendar and does not cross 1582-10-15T00:00Z"
)
else:
reasoning = (
f"Variable {time_var.name} has time values "
"prior to 1582-10-15T00:00Z and utilizes "
"the standard or Gregorian calendar"
)
return Result(
BaseCheck.LOW,
not crossover_1582,
self.section_titles["4.4"],
[reasoning],
)
# if has a calendar, check that it is within the valid values
# otherwise no calendar is valid
# this will only fetch variables with time units defined
for time_var_name in cfutil.get_time_variables(ds):
if time_var_name not in {var.name for var in util.find_coord_vars(ds)}:
continue
time_var = ds.variables[time_var_name]
if not hasattr(time_var, "calendar"):
continue
if time_var.calendar.lower() == "gregorian":
reasoning = (
f"For time variable {time_var.name}, when using "
"the standard Gregorian calendar, the value "
'"standard" is preferred over "gregorian" for '
"the calendar attribute"
)
result = Result(
BaseCheck.LOW,
False,
self.section_titles["4.4.1"],
[reasoning],
)
ret_val.append(result)
# check here and in the below case that time does not cross
# the date 1582-10-15 as requested by CF conformance
ret_val.append(check_standard_calendar_no_cross(time_var))
elif time_var.calendar == "standard":
ret_val.append(check_standard_calendar_no_cross(time_var))
# if a nonstandard calendar, then leap_years and leap_months must
# must be present
if time_var.calendar.lower() not in standard_calendars:
result = self._check_leap_time(time_var)
# passes if the calendar is valid, otherwise notify of invalid
# calendar
else:
result = Result(BaseCheck.LOW, True, self.section_titles["4.4.1"])
ret_val.append(result)
return ret_val
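# Illustrative sketch, not part of this module: decoding time values under an
# explicit calendar with cftime, the same library used above for the
# 1582-10-15 crossover test. The values and units are made up.
import cftime
_decoded = cftime.num2date(
[0.0, 365.0],
"days since 2000-01-01",
calendar="noleap",  # "standard" is preferred over "gregorian" for that case
)
# Under the noleap (365_day) calendar, day 365 falls on 2001-01-01.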
def _check_leap_time(self, time_variable):
"""
Helper method to handle checking custom calendar leap time specifications
"""
leap_time = TestCtx(BaseCheck.HIGH, self.section_titles["4.4"])
leap_time.out_of = 1
# IMPLEMENTATION CONFORMANCE 4.4.1 REQUIRED 2, 3 / 5
if not hasattr(time_variable, "month_lengths") or not (
hasattr(time_variable.month_lengths, "dtype")
and np.issubdtype(time_variable.month_lengths.dtype, np.integer)
and time_variable.month_lengths.size == 12
):
leap_time.messages.append(
f"For nonstandard calendar on variable {time_variable.name}, "
"attribute month_lengths must be supplied as a 12-element "
"integer array",
)
return leap_time.to_result()
# If leap years are included, then attributes leap_month and
# leap_year must be included.
has_leap_year = hasattr(time_variable, "leap_year")
# IMPLEMENTATION CONFORMANCE 4.4.1 REQUIRED 4,5/5
if hasattr(time_variable, "leap_month"):
leap_time.assert_true(
(
np.isscalar(time_variable.leap_month)
and hasattr(time_variable.leap_month, "dtype")
and np.issubdtype(time_variable.leap_month.dtype, np.integer)
and 1 <= time_variable.leap_month <= 12
),
"When attribute leap_month is supplied for variable "
f"{time_variable.name}, the value must be a scalar integer "
"between 1 and 12",
)
# IMPLEMENTATION CONFORMANCE 4.4.1 RECOMMENDED 1/2
if not has_leap_year:
leap_time.out_of += 1
fail_message = (
f"For time variable {time_variable.name}, "
"attribute leap_year must be present if "
"leap_month attribute is defined"
)
leap_time.messages.append(fail_message)
# IMPLEMENTATION CONFORMANCE 4.4.1 REQUIRED 5/5
if has_leap_year:
leap_time.assert_true(
np.isscalar(time_variable.leap_year)
and hasattr(time_variable.leap_year, "dtype"),
"When attribute leap_year is supplied for variable "
f"{time_variable.name}, the value must be a scalar "
"integer",
)
return leap_time.to_result()
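# Illustrative sketch, not part of this module: the attributes
# _check_leap_time expects on a time variable that declares a non-standard
# calendar. The calendar name and values are hypothetical.
import netCDF4
import numpy as np
_nc = netCDF4.Dataset("custom_calendar_example.nc", "w", diskless=True)
_nc.createDimension("time", 1)
_t = _nc.createVariable("time", "f8", ("time",))
_t.units = "days since 1-1-1 00:00:00"
_t.calendar = "my_custom_calendar"  # not one of the standard calendars
_t.month_lengths = np.array(
[31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31], dtype=np.int32
)  # a 12-element integer array is required
_t.leap_month = np.int32(2)  # scalar integer between 1 and 12
_t.leap_year = np.int32(4)  # should accompany leap_month
_nc.close()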
###############################################################################
# Chapter 5: Coordinate Systems
###############################################################################
[docs]
def check_aux_coordinates(self, ds):
"""
Chapter 5 paragraph 3
The dimensions of an auxiliary coordinate variable must be a subset of
the dimensions of the variable with which the coordinate is associated,
with two exceptions. First, string-valued coordinates (Section 6.1,
"Labels") have a dimension for maximum string length. Second, in the
ragged array representations of data (Chapter 9, Discrete Sampling
Geometries), special methods are needed to connect the data and
coordinates.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
# for contiguous ragged array/indexed ragged array representations,
# coordinates are not required to adhere to the same principles;
# these representations can be identified by two attributes:
# required for contiguous
count_vars = ds.get_variables_by_attributes(
sample_dimension=lambda x: x is not None,
)
# required for indexed
index_vars = ds.get_variables_by_attributes(
instance_dimension=lambda x: x is not None,
)
# if these attributes exist, we don't need to test
# the coordinates
if count_vars or index_vars:
return ret_val
geophysical_variables = self._find_geophysical_vars(ds)
for name in geophysical_variables:
variable = ds.variables[name]
coordinates = getattr(variable, "coordinates", None)
# We use a set so we can assert
dim_set = set(variable.dimensions)
# No auxiliary coordinates, no check
if not isinstance(coordinates, str) or coordinates == "":
continue
valid_aux_coords = TestCtx(BaseCheck.HIGH, self.section_titles["5"])
for aux_coord in coordinates.split():
valid_aux_coords.assert_true(
aux_coord in ds.variables,
f"{name}'s auxiliary coordinate specified by the coordinates attribute, {aux_coord}, "
"is not a variable in this dataset"
"",
)
if aux_coord not in ds.variables:
continue
# TODO CONFORMANCE: Partial implementation of labels
# §6.1 Allows for "labels" to be referenced as coordinates
if (
hasattr(ds.variables[aux_coord].dtype, "char")
and ds.variables[aux_coord].dtype.char == "S"
):
continue
elif ds.variables[aux_coord].dtype == str:
continue
aux_coord_dims = set(ds.variables[aux_coord].dimensions)
valid_aux_coords.assert_true(
aux_coord_dims.issubset(dim_set),
"dimensions for auxiliary coordinate variable {} ({}) "
"are not a subset of dimensions for variable {} ({})"
"".format(
aux_coord,
", ".join(aux_coord_dims),
name,
", ".join(dim_set),
),
)
ret_val.append(valid_aux_coords.to_result())
return ret_val
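# Illustrative sketch, not part of this module: two-dimensional auxiliary
# latitude/longitude named by the coordinates attribute, whose dimensions
# are a subset of the data variable's dimensions. Names are hypothetical.
import netCDF4
_nc = netCDF4.Dataset("aux_coord_example.nc", "w", diskless=True)
_nc.createDimension("y", 2)
_nc.createDimension("x", 3)
_lat = _nc.createVariable("lat", "f8", ("y", "x"))
_lat.standard_name = "latitude"
_lat.units = "degrees_north"
_lon = _nc.createVariable("lon", "f8", ("y", "x"))
_lon.standard_name = "longitude"
_lon.units = "degrees_east"
_temp = _nc.createVariable("temp", "f4", ("y", "x"))
_temp.standard_name = "sea_water_temperature"
_temp.units = "degC"
_temp.coordinates = "lat lon"  # aux coord dims {y, x} are a subset of temp's
_nc.close()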
[docs]
def check_duplicate_axis(self, ds):
"""
Checks that no variable contains two coordinates defining the same axis.
Chapter 5 paragraph 6
If an axis attribute is attached to an auxiliary coordinate variable,
it can be used by applications in the same way the `axis` attribute
attached to a coordinate variable is used. However, it is not
permissible for a [geophysical variable] to have both a coordinate
variable and an auxiliary coordinate variable, or more than one of
either type of variable, having an `axis` attribute with any given
value e.g. there must be no more than one axis attribute for X for any
[geophysical variable].
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: compliance_checker.base.Result
:return: List of results
"""
ret_val = []
geophysical_variables = self._find_geophysical_vars(ds)
for name in geophysical_variables:
no_duplicates = TestCtx(BaseCheck.HIGH, self.section_titles["5"])
axis_map = cfutil.get_axis_map(ds, name)
# For every coordinate associated with this variable, keep track of
# which coordinates define an axis and assert that there are no
# duplicate axis attributes defined in the set of associated
# coordinates. axis_map includes coordinates that don't actually have
# an axis attribute, so we need to ignore those here.
for axis, coords in axis_map.items():
coords = [c for c in coords if hasattr(ds.variables[c], "axis")]
no_duplicates.assert_true(
len(coords) <= 1,
"'{}' has duplicate axis {} defined by [{}]".format(
name,
axis,
", ".join(sorted(coords)),
),
)
ret_val.append(no_duplicates.to_result())
return ret_val
[docs]
def check_multi_dimensional_coords(self, ds):
"""
Checks that no multidimensional coordinate shares a name with its
dimensions.
Chapter 5 paragraph 4
We recommend that the name of a [multidimensional coordinate] should
not match the name of any of its dimensions.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
# This can only apply to auxiliary coordinate variables
for coord in self._find_aux_coord_vars(ds):
variable = ds.variables[coord]
if variable.ndim < 2:
continue
not_matching = TestCtx(BaseCheck.MEDIUM, self.section_titles["5"])
not_matching.assert_true(
coord not in variable.dimensions,
f"{coord} shares the same name as one of its dimensions" "",
)
ret_val.append(not_matching.to_result())
return ret_val
# NOTE **********
# IS THIS EVEN NEEDED ANYMORE?
# ***************
[docs]
def check_grid_coordinates(self, ds):
# def _check_grid_coordinates(self, ds):
"""
5.6 When the coordinate variables for a horizontal grid are not
longitude and latitude, it is required that the true latitude and
longitude coordinates be supplied via the coordinates attribute.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
latitudes = cfutil.get_true_latitude_variables(ds)
longitudes = cfutil.get_true_longitude_variables(ds)
check_features = [
"2d-regular-grid",
"2d-static-grid",
"3d-regular-grid",
"3d-static-grid",
"mapped-grid",
"reduced-grid",
]
# This one is tricky because there's a very subtle difference between
# latitude as defined in Chapter 4 and "true" latitude as defined in
# chapter 5.
# For each geophysical variable that defines a grid, assert it is
# associated with a true latitude or longitude coordinate.
for variable in self._find_geophysical_vars(ds):
# We use a set so we can do set-wise comparisons with coordinate
# dimensions
dimensions = set(ds.variables[variable].dimensions)
# If it's not a grid, skip it
if cfutil.guess_feature_type(ds, variable) not in check_features:
continue
has_coords = TestCtx(BaseCheck.HIGH, self.section_titles["5.6"])
# axis_map is a defaultdict(list) mapping the axis to a list of
# coordinate names. For example:
# {'X': ['lon'], 'Y':['lat'], 'Z':['lev']}
# The mapping comes from the dimensions of the variable and the
# contents of the `coordinates` attribute only.
axis_map = cfutil.get_axis_map(ds, variable)
msg = (
'{}\'s coordinate variable "{}" is not one of the variables identifying true '
+ "latitude/longitude and its dimensions are not a subset of {}'s dimensions"
)
alt = (
"{} has no coordinate associated with a variable identified as true latitude/longitude; "
"its coordinate variable should also share a subset of {}'s dimensions"
)
# Make sure we can find latitude and its dimensions are a subset
_lat = None
found_lat = False
for lat in axis_map["Y"]:
_lat = lat
is_subset_dims = set(ds.variables[lat].dimensions).issubset(dimensions)
if is_subset_dims and lat in latitudes:
found_lat = True
break
if _lat:
has_coords.assert_true(found_lat, msg.format(variable, _lat, variable))
else:
has_coords.assert_true(found_lat, alt.format(variable, variable))
# Make sure we can find longitude and its dimensions are a subset
_lon = None
found_lon = False
for lon in axis_map["X"]:
_lon = lon
is_subset_dims = set(ds.variables[lon].dimensions).issubset(dimensions)
if is_subset_dims and lon in longitudes:
found_lon = True
break
if _lon:
has_coords.assert_true(found_lon, msg.format(variable, _lon, variable))
else:
has_coords.assert_true(found_lon, alt.format(variable, variable))
ret_val.append(has_coords.to_result())
return ret_val
[docs]
def check_reduced_horizontal_grid(self, ds):
"""
5.3 A "reduced" longitude-latitude grid is one in which the points are
arranged along constant latitude lines with the number of points on a
latitude line decreasing toward the poles.
Recommend that this type of gridded data be stored using the compression
scheme described in Section 8.2, "Compression by Gathering". The
compressed latitude and longitude auxiliary coordinate variables are
identified by the coordinates attribute.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
# Create sets of the latitude and longitude coordinate variables
lats = set(cfutil.get_latitude_variables(ds))
lons = set(cfutil.get_longitude_variables(ds))
for name in self._find_geophysical_vars(ds):
coords = getattr(ds.variables[name], "coordinates", None)
axis_map = cfutil.get_axis_map(ds, name)
# If this variable has no coordinate that defines compression
if "C" not in axis_map:
continue
valid_rgrid = TestCtx(BaseCheck.HIGH, self.section_titles["5.3"])
# Make sure reduced grid features define coordinates
valid_rgrid.assert_true(
isinstance(coords, str) and coords,
f"reduced grid feature {name} must define coordinates attribute" "",
)
# We can't check anything else if there are no defined coordinates
if not (isinstance(coords, str) and coords):
continue
coord_set = set(coords.split())
# Make sure it's associated with valid lat and valid lon
valid_rgrid.assert_true(
len(coord_set.intersection(lons)) > 0,
f"{name} must be associated with a valid longitude coordinate",
)
valid_rgrid.assert_true(
len(coord_set.intersection(lats)) > 0,
f"{name} must be associated with a valid latitude coordinate",
)
valid_rgrid.assert_true(
len(axis_map["C"]) == 1,
"{} can not be associated with more than one compressed coordinates: "
"({})".format(name, ", ".join(axis_map["C"])),
)
for compressed_coord in axis_map["C"]:
coord = ds.variables[compressed_coord]
compress = getattr(coord, "compress", None)
valid_rgrid.assert_true(
isinstance(compress, str) and compress,
f"compress attribute for compression coordinate {compressed_coord} must be a non-empty string"
"",
)
if not isinstance(compress, str):
continue
for dim in compress.split():
valid_rgrid.assert_true(
dim in ds.dimensions,
f"dimension {dim} referenced by {compressed_coord}:compress must exist"
"",
)
ret_val.append(valid_rgrid.to_result())
return ret_val
def _check_grid_mapping_attr_condition(self, attr, attr_name):
"""
Evaluate a condition (or series of conditions) for a particular
attribute. Implementation for CF-1.6.
:param attr: attribute to test condition for
:param str attr_name: name of the attribute
:rtype tuple
:return two-tuple of (bool, str)
"""
if attr_name == "latitude_of_projection_origin":
return self._evaluate_latitude_of_projection_origin(attr)
elif attr_name == "longitude_of_projection_origin":
return self._evaluate_longitude_of_projection_origin(attr)
elif attr_name == "longitude_of_central_meridian":
return self._evaluate_longitude_of_central_meridian(attr)
elif attr_name == "longitude_of_prime_meridian":
return self._evaluate_longitude_of_prime_meridian(attr)
elif attr_name == "scale_factor_at_central_meridian":
return self._evaluate_scale_factor_at_central_meridian(attr)
elif attr_name == "scale_factor_at_projection_origin":
return self._evaluate_scale_factor_at_projection_origin(attr)
elif attr_name == "standard_parallel":
return self._evaluate_standard_parallel(attr)
elif attr_name == "straight_vertical_longitude_from_pole":
return self._evaluate_straight_vertical_longitude_from_pole(attr)
else:
raise NotImplementedError(
f"Evaluation for {attr_name} not yet implemented",
)
def _evaluate_latitude_of_projection_origin(self, val):
"""
Evaluate the condition for `latitude_of_projection_origin` attribute.
Return result. Value must be -90 <= x <= 90.
:param val: value to be tested
:rtype tuple
:return two-tuple (bool, msg)
"""
return (
(val >= -90.0) and (val <= 90.0),
"latitude_of_projection_origin must satisfy (-90 <= x <= 90)",
)
def _evaluate_longitude_of_projection_origin(self, val):
"""
Evaluate the condition for `longitude_of_projection_origin` attribute.
Return result.
:param val: value to be tested
:rtype tuple
:return two-tuple (bool, msg)
"""
return (
(val >= -180.0) and (val <= 180.0),
"longitude_of_projection_origin must satisfy (-180 <= x <= 180)",
)
def _evaluate_longitude_of_central_meridian(self, val):
"""
Evaluate the condition for `longitude_of_central_meridian` attribute.
Return result.
:param val: value to be tested
:rtype tuple
:return two-tuple (bool, msg)
"""
return (
(val >= -180.0) and (val <= 180.0),
"longitude_of_central_meridian must satisfy (-180 <= x <= 180)",
)
def _evaluate_longitude_of_prime_meridian(self, val):
"""
Evaluate the condition for `longitude_of_prime_meridian` attribute.
Return result.
:param val: value to be tested
:rtype tuple
:return two-tuple (bool, msg)
"""
return (
(val >= -180.0) and (val <= 180.0),
"longitude_of_prime_meridian must satisfy (-180 <= x <= 180)",
)
def _evaluate_scale_factor_at_central_meridian(self, val):
"""
Evaluate the condition for `scale_factor_at_central_meridian` attribute.
Return result.
:param val: value to be tested
:rtype tuple
:return two-tuple (bool, msg)
"""
return (val > 0.0, "scale_factor_at_central_meridian must be > 0.0")
def _evaluate_scale_factor_at_projection_origin(self, val):
"""
Evaluate the condition for `scale_factor_at_projection_origin` attribute.
Return result.
:param val: value to be tested
:rtype tuple
:return two-tuple (bool, msg)
"""
return (val > 0.0, "scale_factor_at_projection_origin must be > 0.0")
def _evaluate_standard_parallel(self, val):
"""
Evaluate the condition for `standard_parallel` attribute. Return result.
:param val: value to be tested
:rtype tuple
:return two-tuple (bool, msg)
"""
return (
(val >= -90.0) and (val <= 90),
"standard_parallel must satisfy (-90 <= x <= 90)",
)
def _evaluate_straight_vertical_longitude_from_pole(self, val):
"""
Evaluate the condition for `straight_vertical_longitude_from_pole`
attribute. Return result.
:param val: value to be tested
:rtype tuple
:return two-tuple (bool, msg)
"""
return (
(val >= -180.0) and (val <= 180),
"straight_vertical_longitude_from_pole must satisfy (-180 <= x <= 180)",
)
###############################################################################
# Chapter 6: Labels and Alternative Coordinates
###############################################################################
[docs]
def check_geographic_region(self, ds):
"""
6.1.1 When data is representative of geographic regions which can be identified by names but which have complex
boundaries that cannot practically be specified using longitude and latitude boundary coordinates, a labeled
axis should be used to identify the regions.
Recommend that the names be chosen from the list of standardized region names whenever possible. To indicate
that the label values are standardized the variable that contains the labels must be given the standard_name
attribute with the value region.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
region_list = (
[ # TODO maybe move this (and other info like it) into a config file?
"africa",
"antarctica",
"arabian_sea",
"aral_sea",
"arctic_ocean",
"asia",
"atlantic_ocean",
"australia",
"baltic_sea",
"barents_opening",
"barents_sea",
"beaufort_sea",
"bellingshausen_sea",
"bering_sea",
"bering_strait",
"black_sea",
"canadian_archipelago",
"caribbean_sea",
"caspian_sea",
"central_america",
"chukchi_sea",
"contiguous_united_states",
"denmark_strait",
"drake_passage",
"east_china_sea",
"english_channel",
"eurasia",
"europe",
"faroe_scotland_channel",
"florida_bahamas_strait",
"fram_strait",
"global",
"global_land",
"global_ocean",
"great_lakes",
"greenland",
"gulf_of_alaska",
"gulf_of_mexico",
"hudson_bay",
"iceland_faroe_channel",
"indian_ocean",
"indonesian_throughflow",
"indo_pacific_ocean",
"irish_sea",
"lake_baykal",
"lake_chad",
"lake_malawi",
"lake_tanganyika",
"lake_victoria",
"mediterranean_sea",
"mozambique_channel",
"north_america",
"north_sea",
"norwegian_sea",
"pacific_equatorial_undercurrent",
"pacific_ocean",
"persian_gulf",
"red_sea",
"ross_sea",
"sea_of_japan",
"sea_of_okhotsk",
"south_america",
"south_china_sea",
"southern_ocean",
"taiwan_luzon_straits",
"weddell_sea",
"windward_passage",
"yellow_sea",
]
)
for var in ds.get_variables_by_attributes(standard_name="region"):
valid_region = TestCtx(BaseCheck.MEDIUM, self.section_titles["6.1"])
region = var[:]
if np.ma.isMA(region):
region = region.data
valid_region.assert_true(
"".join(region.astype(str)).lower() in region_list,
"6.1.1 '{}' specified by '{}' is not a valid region".format(
"".join(region.astype(str)),
var.name,
),
)
ret_val.append(valid_region.to_result())
return ret_val
###############################################################################
# Chapter 7: Data Representative of Cells
###############################################################################
[docs]
def check_cell_boundaries(self, ds):
"""
Checks the dimensions of cell boundary variables to ensure they are CF compliant.
7.1 To represent cells we add the attribute bounds to the appropriate coordinate variable(s). The value of bounds
is the name of the variable that contains the vertices of the cell boundaries. We refer to this type of variable as
a "boundary variable." A boundary variable will have one more dimension than its associated coordinate or auxiliary
coordinate variable. The additional dimension should be the most rapidly varying one, and its size is the maximum
number of cell vertices.
Applications that process cell boundary data often times need to determine whether or not adjacent cells share an
edge. In order to facilitate this type of processing the following restrictions are placed on the data in boundary
variables:
Bounds for 1-D coordinate variables
For a coordinate variable such as lat(lat) with associated boundary variable latbnd(x,2), the interval endpoints
must be ordered consistently with the associated coordinate, e.g., for an increasing coordinate, lat(1) > lat(0)
implies latbnd(i,1) >= latbnd(i,0) for all i
If adjacent intervals are contiguous, the shared endpoint must be represented identically in each instance where
it occurs in the boundary variable. For example, if the intervals that contain grid points lat(i) and lat(i+1) are
contiguous, then latbnd(i+1,0) = latbnd(i,1).
Bounds for 2-D coordinate variables with 4-sided cells
In the case where the horizontal grid is described by two-dimensional auxiliary coordinate variables in latitude
lat(n,m) and longitude lon(n,m), and the associated cells are four-sided, then the boundary variables are given
in the form latbnd(n,m,4) and lonbnd(n,m,4), where the trailing index runs over the four vertices of the cells.
Bounds for multi-dimensional coordinate variables with p-sided cells
In all other cases, the bounds should be dimensioned (...,n,p), where (...,n) are the dimensions of the auxiliary
coordinate variables, and p the number of vertices of the cells. The vertices must be traversed anticlockwise in the
lon-lat plane as viewed from above. The starting vertex is not specified.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
# Note that test does not check monotonicity
ret_val = []
reasoning = []
for variable_name, boundary_variable_name in cfutil.get_cell_boundary_map(
ds,
).items():
variable = ds.variables[variable_name]
valid = True
reasoning = []
if boundary_variable_name not in ds.variables:
valid = False
reasoning.append(
f"Boundary variable {boundary_variable_name} referenced by {variable.name} not "
+ "found in dataset variables",
)
else:
boundary_variable = ds.variables[boundary_variable_name]
# The number of dimensions in the bounds variable should always be
# the number of dimensions in the referring variable + 1
if boundary_variable.ndim < 2:
valid = False
reasoning.append(
f"Boundary variable {boundary_variable.name} specified by {variable.name}"
+ " should have at least two dimensions to enclose the base "
+ "case of a one dimensionsal variable",
)
if boundary_variable.ndim != variable.ndim + 1:
valid = False
reasoning.append(
f"The number of dimensions of the variable {variable.name} is {variable.ndim}, but the "
f"number of dimensions of the boundary variable {boundary_variable.name} is {boundary_variable.ndim}. The boundary variable "
f"should have {variable.ndim + 1} dimensions",
)
if variable.dimensions[:] != boundary_variable.dimensions[: variable.ndim]:
valid = False
reasoning.append(
f"Boundary variable coordinates (for {variable.name}) are in improper order: {boundary_variable.dimensions}. Bounds-specific dimensions should be last"
"",
)
# ensure the p vertices can form a valid simplex/closed cell given the
# preceding auxiliary coordinate dimensions
if (
ds.dimensions[boundary_variable.dimensions[-1]].size
< len(boundary_variable.dimensions[:-1]) + 1
):
valid = False
reasoning.append(
f"Dimension {boundary_variable.name} of boundary variable (for {variable.name}) must have at least {len(variable.dimensions) + 1} elements to form a simplex/closed cell with previous dimensions {boundary_variable.dimensions[:-1]}.",
)
result = Result(
BaseCheck.MEDIUM,
valid,
self.section_titles["7.1"],
reasoning,
)
ret_val.append(result)
return ret_val
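# Illustrative sketch, not part of this module: a one-dimensional coordinate
# with a bounds variable carrying one extra trailing dimension of size 2,
# the layout check_cell_boundaries expects. Names are hypothetical.
import netCDF4
_nc = netCDF4.Dataset("bounds_example.nc", "w", diskless=True)
_nc.createDimension("lat", 2)
_nc.createDimension("nv", 2)  # number of vertices per cell
_lat = _nc.createVariable("lat", "f8", ("lat",))
_lat.units = "degrees_north"
_lat.bounds = "lat_bnds"
_lat[:] = [0.5, 1.5]
_lat_bnds = _nc.createVariable("lat_bnds", "f8", ("lat", "nv"))
_lat_bnds[:] = [[0.0, 1.0], [1.0, 2.0]]  # contiguous cells share endpoints
_nc.close()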
def _cell_measures_core(self, ds, var, external_set, variable_template):
# IMPLEMENTATION CONFORMANCE REQUIRED 1/2
reasoning = []
search_str = (
r"^(?P<measure_type>area|volume):\s+(?P<cell_measure_var_name>\w+)$"
)
search_res = regex.match(search_str, var.cell_measures)
if not search_res:
valid = False
reasoning.append(
f"The cell_measures attribute for variable {var.name} "
"is formatted incorrectly. It should take the "
"form of either 'area: cell_var' or "
"'volume: cell_var' where cell_var is an existing name of "
"a variable describing the cell measures.",
)
else:
valid = True
cell_measure_var_name = search_res.group("cell_measure_var_name")
cell_measure_type = search_res.group("measure_type")
# TODO: cache previous results
if cell_measure_var_name not in set(ds.variables.keys()).union(
external_set,
):
valid = False
reasoning.append(
f"Cell measure variable {cell_measure_var_name} referred to by "
f"{var.name} is not present in {variable_template}s".format(
cell_measure_var_name,
var.name,
),
)
# CF 1.7+ assume external variables -- further checks can't be run here
elif cell_measure_var_name in external_set:
# can't test anything on an external var
return Result(
BaseCheck.MEDIUM,
valid,
(self.section_titles["7.2"]),
reasoning,
)
else:
cell_measure_var = ds.variables[cell_measure_var_name]
if not hasattr(cell_measure_var, "units"):
valid = False
reasoning.append(
f"Cell measure variable {cell_measure_var_name} is required "
"to have units attribute defined",
)
else:
# IMPLEMENTATION CONFORMANCE REQUIRED 2/2
# verify this combination {area: 'm2', volume: 'm3'}
# key is valid measure types, value is expected
# exponent
exponent_lookup = {"area": 2, "volume": 3}
exponent = exponent_lookup[search_res.group("measure_type")]
conversion_failure_msg = (
f'Variable "{cell_measure_var.name}" must have units which are convertible '
f'to UDUNITS "m{exponent}" when variable is referred to by a {variable_template} with '
f'cell_methods attribute with a measure type of "{cell_measure_type}".'
)
try:
cell_measure_units = Unit(cell_measure_var.units)
except ValueError:
valid = False
reasoning.append(conversion_failure_msg)
else:
if not cell_measure_units.is_convertible(Unit(f"m{exponent}")):
valid = False
reasoning.append(conversion_failure_msg)
if not set(cell_measure_var.dimensions).issubset(var.dimensions):
valid = False
reasoning.append(
f"Cell measure variable {cell_measure_var_name} must have "
"dimensions which are a subset of "
f"those defined in variable {var.name}.",
)
return Result(BaseCheck.MEDIUM, valid, (self.section_titles["7.2"]), reasoning)
[docs]
def check_cell_measures(self, ds):
"""
7.2 To indicate extra information about the spatial properties of a
variable's grid cells, a cell_measures attribute may be defined for a
variable. This is a string attribute comprising a list of
blank-separated pairs of words of the form "measure: name". "area" and
"volume" are the only defined measures.
The "name" is the name of the variable containing the measure values,
which we refer to as a "measure variable". The dimensions of the
measure variable should be the same as or a subset of the dimensions of
the variable to which they are related, but their order is not
restricted.
The variable must have a units attribute and may have other attributes
such as a standard_name.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
variables = ds.get_variables_by_attributes(
cell_measures=lambda c: c is not None,
)
for var in variables:
result = self._cell_measures_core(ds, var, set(), "dataset variable")
ret_val.append(result)
return ret_val
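# Illustrative sketch, not part of this module: a cell_measures attribute of
# the "area: <variable>" form, where the measure variable has units
# convertible to m2 and a dimension subset of the data variable.
import netCDF4
_nc = netCDF4.Dataset("cell_measures_example.nc", "w", diskless=True)
_nc.createDimension("y", 2)
_nc.createDimension("x", 2)
_area = _nc.createVariable("cell_area", "f8", ("y", "x"))
_area.units = "m2"
_precip = _nc.createVariable("precip", "f4", ("y", "x"))
_precip.units = "kg m-2 s-1"
_precip.cell_measures = "area: cell_area"
_nc.close()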
[docs]
def check_cell_methods(self, ds):
"""
7.3 To describe the characteristic of a field that is represented by cell values, we define the cell_methods attribute
of the variable. This is a string attribute comprising a list of blank-separated words of the form "name: method". Each
"name: method" pair indicates that for an axis identified by name, the cell values representing the field have been
determined or derived by the specified method.
name can be a dimension of the variable, a scalar coordinate variable, a valid standard name, or the word "area"
values of method should be selected from the list in Appendix E, Cell Methods, which includes point, sum, mean, maximum,
minimum, mid_range, standard_deviation, variance, mode, and median. Case is not significant in the method name. Some
methods (e.g., variance) imply a change of units of the variable, as is indicated in Appendix E, Cell Methods.
Because the default interpretation for an intensive quantity differs from that of an extensive quantity and because this
distinction may not be understood by some users of the data, it is recommended that every data variable include for each
of its dimensions and each of its scalar coordinate variables the cell_methods information of interest (unless this
information would not be meaningful). It is especially recommended that cell_methods be explicitly specified for each
spatio-temporal dimension and each spatio-temporal scalar coordinate variable.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
# CONFORMANCE IMPLEMENTATION 7.3 1/3
psep = regex.compile(
r"(?P<vars>\w+: )+(?P<method>\w+) ?(?P<where>where (?P<wtypevar>\w+) "
r"?(?P<over>over (?P<otypevar>\w+))?| ?)(?:\((?P<paren_contents>[^)]*)\))?",
)
for var in ds.get_variables_by_attributes(cell_methods=lambda x: x is not None):
if not getattr(var, "cell_methods", ""):
continue
method = getattr(var, "cell_methods", "")
valid_attribute = TestCtx(
BaseCheck.HIGH,
self.section_titles["7.3"],
) # changed from 7.1 to 7.3
valid_attribute.assert_true(
regex.match(psep, method) is not None,
f'"{method}" is not a valid format for cell_methods attribute of "{var.name}"'
"",
)
ret_val.append(valid_attribute.to_result())
valid_cell_names = TestCtx(BaseCheck.MEDIUM, self.section_titles["7.3"])
# check that the name is valid
for match in regex.finditer(psep, method):
# it is possible to have "var1: var2: ... varn: ...", so handle
# that case
for var_raw_str in match.captures("vars"):
# strip off the ' :' at the end of each match
var_str = var_raw_str[:-2]
if (
var_str in var.dimensions
or var_str == "area"
or var_str in getattr(var, "coordinates", "")
):
valid = True
else:
valid = False
valid_cell_names.assert_true(
valid,
f"{var.name}'s cell_methods name component {var_str} does not match a dimension, "
"area or auxiliary coordinate",
)
ret_val.append(valid_cell_names.to_result())
# Checks if the method value of the 'name: method' pair is acceptable
valid_cell_methods = TestCtx(BaseCheck.MEDIUM, self.section_titles["7.3"])
for match in regex.finditer(psep, method):
# CF section 7.3 - "Case is not significant in the method name."
valid_cell_methods.assert_true(
match.group("method").lower() in self.cell_methods,
"{}:cell_methods contains an invalid method: {}"
"".format(var.name, match.group("method")),
)
ret_val.append(valid_cell_methods.to_result())
for match in regex.finditer(psep, method):
if match.group("paren_contents") is not None:
# split along spaces followed by words with a colon
# not sure what to do if a comment contains a colon!
ret_val.append(
self._check_cell_methods_paren_info(
match.group("paren_contents"),
var,
).to_result(),
)
return ret_val
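# Illustrative examples, not part of this module: cell_methods strings in the
# "name: method" form that check_cell_methods parses, including the optional
# parenthesised interval/comment information handled by the helper below.
_example_cell_methods = [
"time: mean",
"time: mean (interval: 1 hr)",
"area: mean where sea",
"lat: lon: standard_deviation",
]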
def _check_cell_methods_paren_info(self, paren_contents, var):
"""
Checks that the spacing and/or comment info contained inside the
parentheses in cell_methods is well-formed
"""
# IMPLEMENTATION CONFORMANCE REQUIRED 3/3 - comment/paren contents
valid_info = TestCtx(BaseCheck.MEDIUM, self.section_titles["7.3"])
# if there are no colons, this is a simple comment
# TODO: are empty comments considered valid?
if ":" not in paren_contents:
valid_info.out_of += 1
valid_info.score += 1
return valid_info
# otherwise, split into k/v pairs
kv_pair_pat = r"(\S+:)\s+(.*(?=\s+\w+:)|[^:]+$)\s*"
# otherwise, we must split further with intervals coming
# first, followed by non-standard comments
# we need the count of the matches, and re.findall() only returns
# groups if they are present and we wish to see if the entire match
# object concatenated together is the same as the original string
pmatches = list(regex.finditer(kv_pair_pat, paren_contents))
for i, pmatch in enumerate(pmatches):
keyword, val = pmatch.groups()
if keyword == "interval:":
valid_info.out_of += 2
interval_matches = regex.match(
r"^\s*(?P<interval_number>\S+)\s+(?P<interval_units>\S+)\s*$",
val,
)
# attempt to get the number for the interval
if not interval_matches:
valid_info.messages.append(
f'§7.3.3 {var.name}:cell_methods contains an interval specification that does not parse: "{val}". Should be in format "interval: <number> <units>"',
)
else:
try:
float(interval_matches.group("interval_number"))
except ValueError:
valid_info.messages.append(
'§7.3.3 {}:cell_methods contains an interval value that does not parse as a numeric value: "{}".'.format(
var.name,
interval_matches.group("interval_number"),
),
)
else:
valid_info.score += 1
# then the units
try:
Unit(interval_matches.group("interval_units"))
except ValueError:
valid_info.messages.append(
'§7.3.3 {}:cell_methods interval units "{}" is not parsable by UDUNITS.'.format(
var.name,
interval_matches.group("interval_units"),
),
)
else:
valid_info.score += 1
elif keyword == "comment:":
# comments can't really be invalid, except
# if they come first or aren't last, and
# maybe if they contain colons embedded in the
# comment string
valid_info.out_of += 1
if len(pmatches) == 1:
valid_info.messages.append(
f"§7.3.3 If there is no standardized information, the keyword comment: should be omitted for variable {var.name}",
)
# otherwise check that the comment is the last
# item in the parentheses
elif i != len(pmatches) - 1:
valid_info.messages.append(
f'§7.3.3 The non-standard "comment:" element must come after any standard elements in cell_methods for variable {var.name}',
)
#
else:
valid_info.score += 1
else:
valid_info.out_of += 1
valid_info.messages.append(
f'§7.3.3 Invalid cell_methods keyword "{keyword}" for variable {var.name}. Must be one of [interval, comment]',
)
# Ensure concatenated reconstructed matches are the same as the
# original string. If they're not, there's likely a formatting error
valid_info.assert_true(
"".join(m.group(0) for m in pmatches) == paren_contents,
f"§7.3.3 Parenthetical content inside {var.name}:cell_methods is not well formed: {paren_contents}",
)
return valid_info
[docs]
def check_climatological_statistics(self, ds):
"""
7.4 A climatological time coordinate variable does not have a bounds attribute. Instead, it has a climatology
attribute, which names a variable with dimensions (n,2), n being the dimension of the climatological time axis.
Using the units and calendar of the time coordinate variable, element (i,0) of the climatology variable specifies
the beginning of the first subinterval and element (i,1) the end of the last subinterval used to evaluate the
climatological statistics with index i in the time dimension. The time coordinates should be values that are
representative of the climatological time intervals, such that an application which does not recognise climatological
time will nonetheless be able to make a reasonable interpretation.
A climatological axis may use different statistical methods to measure variation among years, within years, and within
days. The methods which can be specified are those listed in Appendix E, Cell Methods and each entry in the cell_methods
attribute may also contain non-standardised information in parentheses after the method. The value of the cell_method
attribute must be in one of the following forms:
- time: method1 within years time: method2 over years
- time: method1 within days time: method2 over days
- time: method1 within days time: method2 over days time: method3 over years
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
reasoning = []
ret_val = []
total_climate_count = 0
valid_climate_count = 0
all_clim_coord_var_names = []
methods = [
"point", # TODO change to appendix import once cf1.7 merged
"sum",
"mean",
"maximum",
"minimum",
"mid_range",
"standard_deviation",
"variance",
"mode",
"median",
]
# find any climatology axis variables; any variables which contain climatological stats will use
# these variables as coordinates
clim_time_coord_vars = ds.get_variables_by_attributes(
climatology=lambda s: s is not None,
)
# first, to determine whether or not we have a valid climatological time
# coordinate variable, we need to make sure it has the attribute "climatology",
# but not the attribute "bounds"
time_vars = cfutil.get_time_variables(ds)
for clim_coord_var in clim_time_coord_vars:
climatology_ctx = TestCtx(BaseCheck.MEDIUM, self.section_titles["7.3"])
# IMPLEMENTATION CONFORMANCE 7.4 REQUIRED 1/6
if clim_coord_var.name not in time_vars:
climatology_ctx.out_of += 1
climatology_ctx.messages.append(
f"Variable {clim_coord_var.name} is not detected as a time "
"coordinate variable, but has climatology attribute",
)
# IMPLEMENTATION CONFORMANCE 7.4 REQUIRED
if hasattr(clim_coord_var, "bounds"):
climatology_ctx.out_of += 1
climatology_ctx.messages.append(
f"Variable {clim_coord_var.name} has a climatology "
"attribute and cannot also have a bounds attribute.",
)
result = Result(
BaseCheck.MEDIUM,
False,
(self.section_titles["7.4"]),
reasoning,
)
# IMPLEMENTATION CONFORMANCE 7.4 REQUIRED 2/6
# make sure the climatology variable referenced actually exists
elif not isinstance(clim_coord_var.climatology, str):
climatology_ctx.out_of += 1
climatology_ctx.messages.append(
f"Variable {clim_coord_var.name} must have a climatology "
"attribute which is a string",
)
ret_val.append(climatology_ctx.to_result())
continue
elif clim_coord_var.climatology not in ds.variables:
climatology_ctx.out_of += 1
climatology_ctx.messages.append(
"Variable {} referenced in time's climatology attribute does not exist".format(
ds.variables["time"].climatology,
),
)
else:
clim_var = ds.variables[clim_coord_var.climatology]
#
# IMPLEMENTATION CONFORMANCE 7.4 REQUIRED 4/6
if clim_var.dtype is str or not np.issubdtype(clim_var.dtype, np.number):
climatology_ctx.out_of += 1
climatology_ctx.messages.append(
f"Climatology variable {clim_var.name} is not a numeric type",
)
# IMPLEMENTATION CONFORMANCE 7.4 REQUIRED 6/6
if hasattr(clim_var, "_FillValue") or hasattr(
clim_var,
"missing_value",
):
climatology_ctx.out_of += 1
climatology_ctx.messages.append(
f"Climatology variable {clim_var.name} may not contain "
"attributes _FillValue or missing_value",
)
# IMPLEMENTATION CONFORMANCE 7.4 REQUIRED 5/6
for same_attr in ("units", "standard_name", "calendar"):
if hasattr(clim_var, same_attr):
climatology_ctx.assert_true(
getattr(clim_var, same_attr)
== getattr(clim_coord_var, same_attr, None),
f"Attribute {same_attr} must have the same value in both "
f"variables {clim_var.name} and {clim_coord_var.name}",
)
ret_val.append(climatology_ctx.to_result())
# check that coordinate bounds are in the proper order.
# make sure last elements are boundary variable specific dimensions
# IMPLEMENTATION CONFORMANCE 7.4 REQUIRED 3/6
if (
clim_coord_var.dimensions[:]
!= ds.variables[clim_coord_var.climatology].dimensions[
: clim_coord_var.ndim
]
):
total_climate_count += 1
reasoning.append(
f"Climatology variable coordinates are in improper order: {ds.variables[clim_coord_var.climatology].dimensions}. Bounds-specific dimensions should be last",
)
result = Result(
BaseCheck.MEDIUM,
(valid_climate_count, total_climate_count),
(self.section_titles["7.4"]),
reasoning,
)
ret_val.append(result)
# IMPLEMENTATION CONFORMANCE 7.4 REQUIRED 3/6 - dim size of 2 for
# climatology-specific dimension
elif (
ds.dimensions[
ds.variables[clim_coord_var.climatology].dimensions[-1]
].size
!= 2
):
reasoning.append(
f'Climatology dimension "{ds.variables[clim_coord_var.climatology].name}" should only contain two elements',
)
total_climate_count += 1
result = Result(
BaseCheck.MEDIUM,
(valid_climate_count, total_climate_count),
(self.section_titles["7.4"]),
reasoning,
)
ret_val.append(result)
# passed all these checks, so we can add this clim_coord_var to our total list
all_clim_coord_var_names.append(clim_coord_var.name)
# for any variables which use a climatology time coordinate variable as a coordinate,
# if they have a cell_methods attribute, it must comply with the form:
# time: method1 within years time: method2 over years
# time: method1 within days time: method2 over days
# time: method1 within days time: method2 over days time: method3 over years
# optionally followed by parentheses for explaining additional
# info, e.g.
# "time: method1 within years time: method2 over years (sidereal years)"
meth_regex = "(?:{})".format(
"|".join(methods),
) # "or" comparison for the methods
re_string = (
rf"^time: {meth_regex} within (years|days)" # regex string to test
rf" time: {meth_regex} over \1(?<=days)(?: time: {meth_regex} over years)?"
r"(?: \([^)]+\))?$"
)
# find any variables with a valid climatological cell_methods
for cell_method_var in ds.get_variables_by_attributes(
cell_methods=lambda s: s is not None,
):
if any(
dim in all_clim_coord_var_names for dim in cell_method_var.dimensions
):
total_climate_count += 1
if not regex.search(re_string, cell_method_var.cell_methods):
reasoning.append(
f'The "time: method within years/days over years/days" format is not correct in variable {cell_method_var.name}.',
)
else:
valid_climate_count += 1
result = Result(
BaseCheck.MEDIUM,
(valid_climate_count, total_climate_count),
(self.section_titles["7.4"]),
reasoning,
)
ret_val.append(result)
return ret_val
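# Illustrative sketch, not part of this module: a climatological time axis.
# The time variable names an (n, 2) climatology variable (instead of bounds)
# and the data variable uses the "within ... over ..." cell_methods form.
# Names are hypothetical.
import netCDF4
_nc = netCDF4.Dataset("climatology_example.nc", "w", diskless=True)
_nc.createDimension("time", 12)
_nc.createDimension("nv", 2)
_time = _nc.createVariable("time", "f8", ("time",))
_time.units = "days since 1990-01-01"
_time.climatology = "climatology_bounds"
_nc.createVariable("climatology_bounds", "f8", ("time", "nv"))
_temp = _nc.createVariable("temp", "f4", ("time",))
_temp.units = "degC"
_temp.cell_methods = "time: mean within days time: mean over days"
_nc.close()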
###############################################################################
# Chapter 8: Reduction of Dataset Size
###############################################################################
[docs]
def check_packed_data(self, ds):
"""
8.1 Simple packing may be achieved through the use of the optional NUG defined attributes scale_factor and
add_offset. After the data values of a variable have been read, they are to be multiplied by the scale_factor,
and have add_offset added to them.
The units of a variable should be representative of the unpacked data.
If the scale_factor and add_offset attributes are of the same data type as the associated variable, the unpacked
data is assumed to be of the same data type as the packed data. However, if the scale_factor and add_offset
attributes are of a different data type from the variable (containing the packed data) then the unpacked data
should match the type of these attributes, which must both be of type float or both be of type double. An additional
restriction in this case is that the variable containing the packed data must be of type byte, short or int. It is
not advised to unpack an int into a float as there is a potential precision loss.
When data to be packed contains missing values the attributes that indicate missing values (_FillValue, valid_min,
valid_max, valid_range) must be of the same data type as the packed data.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
for name, var in ds.variables.items():
add_offset = getattr(var, "add_offset", None)
scale_factor = getattr(var, "scale_factor", None)
if not (add_offset or scale_factor):
continue
valid = True
reasoning = []
# if only one of these attributes is defined, assume they
# are the same type (value doesn't matter here)
if not add_offset:
add_offset = scale_factor
if not scale_factor:
scale_factor = add_offset
# IMPLEMENTATION CONFORMANCE 8.1 REQUIRED 1/3
# scale_factor and add_offset same type
if type(add_offset) != type(scale_factor):
valid = False
reasoning.append(
"Attributes add_offset and scale_factor have different data type.",
)
# IMPLEMENTATION CONFORMANCE 8.1 REQUIRED 2/3
# scale_factor and add_offset must be floating point or double
# if not the same type
# FIXME: Check add_offset too.
elif type(scale_factor) != var.dtype.type:
# Check both attributes are type float or double
if not isinstance(scale_factor, (float, np.floating)):
valid = False
reasoning.append(
"Attributes add_offset and scale_factor are not of type float or double.",
)
else:
# Check variable type is byte, short or int
if var.dtype.type not in [
int,
np.int8,
np.int16,
np.int32,
np.int64,
]:
valid = False
# IMPLEMENTATION CONFORMANCE REQUIRED 3/3
reasoning.append(
"Variable is not of type byte, short, or int as required for different type add_offset/scale_factor.",
)
result = Result(
BaseCheck.MEDIUM,
valid,
self.section_titles["8.1"],
reasoning,
)
ret_val.append(result)
reasoning = []
valid = True
# test further with _FillValue , valid_min , valid_max , valid_range
if hasattr(var, "_FillValue"):
if var._FillValue.dtype.type != var.dtype.type:
valid = False
reasoning.append(
f"Type of {name}:_FillValue attribute ({var._FillValue.dtype.name}) does not match variable type ({var.dtype.name})",
)
if hasattr(var, "valid_min"):
if var.valid_min.dtype.type != var.dtype.type:
valid = False
reasoning.append(
f"Type of {name}valid_min attribute ({var.valid_min.dtype.name}) does not match variable type ({var.dtype.name})",
)
if hasattr(var, "valid_max"):
if var.valid_max.dtype.type != var.dtype.type:
valid = False
reasoning.append(
f"Type of {name}:valid_max attribute ({var.valid_max.dtype.name}) does not match variable type ({var.dtype.name})",
)
if hasattr(var, "valid_range"):
if var.valid_range.dtype.type != var.dtype.type:
valid = False
reasoning.append(
f"Type of {name}:valid_range attribute ({var.valid_range.dtype.name}) does not match variable type ({var.dtype.name})",
)
result = Result(
BaseCheck.MEDIUM,
valid,
self.section_titles["8.1"],
reasoning,
)
ret_val.append(result)
return ret_val
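# Illustrative sketch, not part of this module: packed data where
# scale_factor/add_offset are floats and the packed variable is an integer
# type, the combination §8.1 allows when the attribute type differs from the
# variable type. Names and values are hypothetical.
import netCDF4
import numpy as np
_nc = netCDF4.Dataset("packing_example.nc", "w", diskless=True)
_nc.createDimension("x", 3)
_packed = _nc.createVariable(
"temp_packed", "i2", ("x",), fill_value=np.int16(-32768)
)  # _FillValue matches the packed (integer) type
_packed.scale_factor = np.float32(0.01)
_packed.add_offset = np.float32(273.15)
_packed.units = "K"  # units describe the unpacked data
_packed[:] = np.array([263.15, 273.15, 283.15])  # auto-packed on write
_nc.close()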
[docs]
def check_compression_gathering(self, ds):
"""
8.2 Compression by gathering. To save space in the netCDF file, points
that are invariably missing may be eliminated from one or more dimensions
with reference to a "list" variable: a one-dimensional integer coordinate
variable whose values are indices into the compressed (flattened)
dimensions. The list variable carries a compress attribute containing a
blank-separated list of the dimensions that were compressed, given in the
order of the CDL declaration of the uncompressed array. The stored index
values must lie in the range from 0 up to the product of the compressed
dimension sizes minus 1 (CDL index conventions).
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
for compress_var in ds.get_variables_by_attributes(
compress=lambda s: s is not None,
):
valid = True
reasoning = []
# put the dimensions referenced by the compress attribute into a set
compress_set = set(compress_var.compress.split(" "))
if compress_var.ndim != 1:
valid = False
reasoning.append(
f"Compression variable {compress_var.name} may only have one dimension",
)
# IMPLEMENTATION CONFORMANCE 8.2 REQUIRED 1/3
# ensure compression variable is a proper index, and thus is an
# signed or unsigned integer type of some sort
if (compress_var.dtype is str) or (
compress_var.dtype.kind not in {"i", "u"}
):
valid = False
reasoning.append(
f"Compression variable {compress_var.name} must be an integer type to form a proper array index",
)
# IMPLEMENTATION CONFORMANCE 8.2 REQUIRED 2/3
# make sure all the variables referred to are contained by the
# variables.
if not compress_set.issubset(ds.dimensions):
not_in_dims = sorted(compress_set.difference(ds.dimensions))
valid = False
reasoning.append(
f"The following dimensions referenced by the compress attribute of variable {compress_var.name} do not exist: {not_in_dims}",
)
# IMPLEMENTATION CONFORMANCE 8.2 REQUIRED 3/3
# The values of the associated coordinate variable must be in the range
# starting with 0 and going up to the product of the compressed dimension
# sizes minus 1 (CDL index conventions).
# Put the values of the associated coordinate variable into a list
coord_list_size = [
item.size
for item in ds.dimensions.values()
if item.name in compress_set
]
# get the upper limit of the dimension size
upper_limit_size = np.prod(coord_list_size) - 1
for coord_size in coord_list_size:
if coord_size not in range(0, upper_limit_size):
valid = False
reasoning.append(
f"The dimenssion size {coord_size} referenced by the compress attribute is not "
"in the range (0, The product of the compressed dimension sizes minus 1)",
)
result = Result(
BaseCheck.MEDIUM,
valid,
self.section_titles["8.2"],
reasoning,
)
ret_val.append(result)
return ret_val
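# Illustrative sketch, not part of this module: a gathered ("compressed")
# index variable per §8.2. The list variable is one-dimensional, integer
# typed, and its compress attribute names existing dimensions. Names are
# hypothetical; the indices address the flattened (lat, lon) array.
import netCDF4
import numpy as np
_nc = netCDF4.Dataset("compress_example.nc", "w", diskless=True)
_nc.createDimension("lat", 3)
_nc.createDimension("lon", 4)
_nc.createDimension("landpoint", 5)
_landpoint = _nc.createVariable("landpoint", "i4", ("landpoint",))
_landpoint.compress = "lat lon"
_landpoint[:] = np.array([0, 2, 5, 7, 11], dtype=np.int32)  # 0 .. 3*4 - 1
_nc.close()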
###############################################################################
# Chapter 9: Discrete Sampling Geometries
###############################################################################
[docs]
def check_feature_type(self, ds):
"""
Check the global attribute featureType for valid CF featureTypes
9.4 A global attribute, featureType, is required for all Discrete Geometry representations except the orthogonal
multidimensional array representation, for which it is highly recommended.
The value assigned to the featureType attribute is case-insensitive.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: compliance_checker.base.Result
"""
# Due to case insensitive requirement, we list the possible featuretypes
# in lower case and check using the .lower() method
feature_list = [
"point",
"timeseries",
"trajectory",
"profile",
"timeseriesprofile",
"trajectoryprofile",
]
feature_type = getattr(ds, "featureType", None)
valid_feature_type = TestCtx(
BaseCheck.HIGH,
"§9.1 Dataset contains a valid featureType",
)
valid_feature_type.assert_true(
feature_type is None or feature_type.lower() in feature_list,
"{} is not a valid CF featureType. It must be one of {}"
"".format(feature_type, ", ".join(feature_list)),
)
return valid_feature_type.to_result()
[docs]
def check_cf_role(self, ds):
"""
Check variables defining cf_role for legal cf_role values.
§9.5 The only acceptable values of cf_role for Discrete Geometry CF
data sets are timeseries_id, profile_id, and trajectory_id
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: compliance_checker.base.Result
"""
valid_roles = ["timeseries_id", "profile_id", "trajectory_id"]
variable_count = 0
valid_cf_role = TestCtx(BaseCheck.HIGH, self.section_titles["9.5"])
for variable in ds.get_variables_by_attributes(cf_role=lambda x: x is not None):
variable_count += 1
cf_role = variable.cf_role
valid_cf_role.assert_true(
cf_role in valid_roles,
"{} is not a valid cf_role value. It must be one of {}"
"".format(cf_role, ", ".join(valid_roles)),
)
if variable_count > 0:
m = (
"§9.5 The only acceptable values of cf_role for Discrete Geometry CF"
+ " data sets are timeseries_id, profile_id, and trajectory_id"
)
valid_cf_role.assert_true(variable_count < 3, m)
return valid_cf_role.to_result()
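# Illustrative sketch, not part of this module: a Discrete Sampling
# Geometries dataset with the global featureType attribute and a single
# cf_role variable identifying the timeseries instances. Names are
# hypothetical; featureType values are matched case-insensitively.
import netCDF4
_nc = netCDF4.Dataset("dsg_example.nc", "w", diskless=True)
_nc.featureType = "timeSeries"
_nc.createDimension("station", 1)
_nc.createDimension("name_strlen", 10)
_station_name = _nc.createVariable("station_name", "S1", ("station", "name_strlen"))
_station_name.cf_role = "timeseries_id"
_nc.close()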
[docs]
def check_variable_features(self, ds):
"""
Checks the variable feature types match the dataset featureType attribute.
If more than one unique feature type is found, report this as an error.
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
feature_types_found = defaultdict(list)
ret_val = []
feature_list = {
"point",
"timeseries",
"trajectory",
"profile",
"timeseriesprofile",
"trajectoryprofile",
}
# Don't bother checking the rest if the featureType attribute doesn't
# exist or isn't a legal featureType
feature_type = getattr(ds, "featureType", "")
if feature_type is not None and feature_type.lower() not in feature_list:
return []
_feature = feature_type.lower()
for name in self._find_geophysical_vars(ds):
variable_feature = cfutil.guess_feature_type(ds, name)
# If we can't figure it out, don't check it.
if variable_feature is None:
continue
feature_types_found[variable_feature].append(name)
matching_feature = TestCtx(BaseCheck.MEDIUM, self.section_titles["9.1"])
matching_feature.assert_true(
variable_feature.lower() == _feature,
f"{name} is not a {_feature}, it is detected as a {variable_feature}"
"",
)
ret_val.append(matching_feature.to_result())
# create explanation of all of the different featureTypes
# found in the dataset
feature_description = ", ".join(
[
"{} ({})".format(ftr, ", ".join(vrs))
for ftr, vrs in feature_types_found.items()
],
)
all_same_features = TestCtx(BaseCheck.HIGH, self.section_titles["9.1"])
all_same_features.assert_true(
len(feature_types_found) < 2,
f"Different feature types discovered in this dataset: {feature_description}"
"",
)
ret_val.append(all_same_features.to_result())
return ret_val
[docs]
def check_hints(self, ds):
"""
Checks for potentially mislabeled metadata and makes suggestions for how to correct
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
ret_val.extend(self._check_hint_bounds(ds))
return ret_val
def _check_hint_bounds(self, ds):
"""
Checks for variables ending with _bounds, if they are not cell methods,
make the recommendation
:param netCDF4.Dataset ds: An open netCDF dataset
:rtype: list
:return: List of results
"""
ret_val = []
boundary_variables = cfutil.get_cell_boundary_variables(ds)
for name in ds.variables:
if name.endswith("_bounds") and name not in boundary_variables:
msg = (
f"{name} might be a cell boundary variable but there are no variables that define it "
"as a boundary using the `bounds` attribute."
)
result = Result(BaseCheck.LOW, True, self.section_titles["7.1"], [msg])
ret_val.append(result)
return ret_val