Reading and writting zarr files with xarray#
Created: 2023-03-20
Modified: 2024-06-13
The zarr format is a file storage based specification for chunked, compressed, N-dimensional arrays. The format is based on an open-source specification and its main goal is to make cloud data read/write a bit easier and more effective.
The main propblems in data storage are:
Read/write data that is larger than memory
Being able to parallelize computations
Reduce the I/O botteneck
Compression
Speed
One solution is to use a chunked* parallel computing framework and a chunked parallel storage library. Zarr helps us with the latter.
In this example we will load an ocean model data, stored as netCDF and served via THREDDS, subset it and save as zarr. Let’s start by saving a single time step for the surface layer temperature and salinity.
* Many data formats can take advantage of storing the data in chunks for faster access, the zarr approach is different in that each chunk is a different object in cloud storage, making them better for parallel access. The chunks can be compressed to reduce their size and improve cloud performance even further. Zarr has a nice tutorial on how to balance chunk size for performance. Check it out: https://zarr.readthedocs.io/en/stable/tutorial.html#chunk-optimizations.
import xarray as xr
url = "https://tds.marine.rutgers.edu/thredds/dodsC/roms/doppio/2017_da/avg/Averages_Best_Excluding_Day1"
ds = xr.open_dataset(url)
time_slice = {"time": "2022-06-06"}
surface = {"s_rho": -1}
ds = ds[["temp", "salt"]].sel(time_slice).isel(surface)
ds
<xarray.Dataset> Size: 821kB Dimensions: (time: 1, eta_rho: 106, xi_rho: 242) Coordinates: s_rho float64 8B -0.0125 lon_rho (eta_rho, xi_rho) float64 205kB ... lat_rho (eta_rho, xi_rho) float64 205kB ... * time (time) datetime64[ns] 8B 2022-06-06T12:00:00 time_run (time) datetime64[ns] 8B ... Dimensions without coordinates: eta_rho, xi_rho Data variables: temp (time, eta_rho, xi_rho) float64 205kB ... salt (time, eta_rho, xi_rho) float64 205kB ... Attributes: (12/46) file: doppio_avg_6733_0001.nc format: netCDF-4/HDF5 file Conventions: CF-1.4, SGRID-0.3 type: ROMS/TOMS nonlinear model averages file title: ROMS doppio Real-Time Operational PSAS F... var_info: ../Data/varinfo1040t_daily.dat ... ... frc_file_07: ../Data/Winds_ncepnam_3hourly_MAB_and_Go... cdm_data_type: GRID featureType: GRID location: Proto fmrc:doppio_2017_da_avg summary: doppio DODS_EXTRA.Unlimited_Dimension: ocean_time
import humanize
humanize.naturalsize(ds.nbytes)
'820.9 kB'
It is a small subset but it is enough to ilustrate zarr’s compression options.
Now let’s choose a compression level and save it as zarr.
import zarr
compressor = zarr.Blosc(clevel=2, shuffle=-1)
fname = "doppio/doppio_compressed.zarr"
ds.to_zarr(
fname,
mode="w",
safe_chunks=True,
consolidated=True,
encoding={var: {"compressor": compressor} for var in ds.variables},
);
!tree doppio/*zarr
!du -h doppio/*zarr
doppio/doppio_compressed.zarr
├── lat_rho
│ └── 0.0
├── lon_rho
│ └── 0.0
├── salt
│ └── 0.0.0
├── s_rho
│ └── 0
├── temp
│ └── 0.0.0
├── time
│ └── 0
└── time_run
└── 0
7 directories, 7 files
144K doppio/doppio_compressed.zarr/salt
156K doppio/doppio_compressed.zarr/lat_rho
148K doppio/doppio_compressed.zarr/temp
16K doppio/doppio_compressed.zarr/time
16K doppio/doppio_compressed.zarr/s_rho
16K doppio/doppio_compressed.zarr/time_run
156K doppio/doppio_compressed.zarr/lon_rho
676K doppio/doppio_compressed.zarr
The first thing to observe is that the zarr format is a directory based storage. That structure should be familiar for HDF5 users. However, instead of being a filesystem inside a filesystem, zarr is layed out directly on the disk filesystem.
Each variable and coordinate has its own directory and the data chunks are stored in subdirectories. For more information check this awesome presentation from one of zarr authors.
Note that the stored size is quite smaller too! We went from 820.9 kB to 676 kB. Zarr has many modern compression oprions as plugins, including some bitinformation based methods.
The data attributes, groups, and metdata are stored in the .zattrs
, .zgroup
, and .zmetadata
. They are plain text JSON files and easy to parse:
import json
with open("doppio/doppio_compressed.zarr/.zmetadata") as f:
zmetadata = json.loads(f.read())
zmetadata
{'metadata': {'.zattrs': {'CPP_options': 'DOPPIO, ADD_FSOBC, ADD_M2OBC, ANA_BSFLUX, ANA_BTFLUX, ASSUMED_SHAPE, ATM_PRESS, AVERAGES, !BOUNDARY_A !COLLECT_ALL..., CHARNOK, CRAIG_BANNER, CURVGRID, DEFLATE, DJ_GRADPS, DOUBLE_PRECISION, FLOATS, FORWARD_WRITE, GLS_MIXING, HDF5, KANTHA_CLAYSON, MASKING, MIX_GEO_TS, MIX_S_UV, MPI, NONLINEAR, NONLIN_EOS, NO_LBC_ATT, N2S2_HORAVG, OUT_DOUBLE, POWER_LAW, PROFILE, K_GSCHEME, REDUCE_ALLREDUCE, !RST_SINGLE, SALINITY, SOLAR_SOURCE, SOLVE3D, SSH_TIDES, TS_DIF2, UV_ADV, UV_COR, UV_U3HADVECTION, UV_C4VADVECTION, UV_QDRAG, UV_TIDES, UV_VIS2, VAR_RHO_2D, WIND_MINUS_CURRENT',
'Conventions': 'CF-1.4, SGRID-0.3',
'DODS_EXTRA.Unlimited_Dimension': 'ocean_time',
'NLM_LBC': '\nEDGE: WEST SOUTH EAST NORTH \nzeta: Cha Cha Cha Clo \nubar: Fla Fla Fla Clo \nvbar: Fla Fla Fla Clo \nu: RadNud RadNud RadNud Clo \nv: RadNud RadNud RadNud Clo \ntemp: Rad Rad Rad Clo \nsalt: Rad Rad Rad Clo \ntke: Gra Gra Gra Clo',
'NLM_TADV': '\nADVECTION: HORIZONTAL VERTICAL \ntemp: Akima4 Akima4 \nsalt: Akima4 Akima4',
'_CoordSysBuilder': 'ucar.nc2.dataset.conv.CF1Convention',
'ana_file': 'ROMS/Functionals/ana_btflux.h',
'avg_base': 'doppio_avg_6733',
'cdm_data_type': 'GRID',
'clm_file_01': '../Data/doppio_clm.nc',
'code_dir': '/home/julia/ROMS/doppio/svn1040t',
'compiler_command': '/opt/sw/apps/intel-18.0.1/openmpi/3.1.2/bin/mpif90',
'compiler_flags': '-fp-model precise -heap-arrays -ip -O3 -traceback -check uninit',
'compiler_system': 'ifort',
'cpu': 'x86_64',
'featureType': 'GRID',
'file': 'doppio_avg_6733_0001.nc',
'flt_file': 'doppio_flt_6733.nc',
'format': 'netCDF-4/HDF5 file',
'fpos_file': 'floats.in',
'frc_file_01': '../PSAS/doppio_flx_6733_outer2.nc',
'frc_file_02': '../Data/Pair_ncepnam_3hourly_MAB_and_GoM.nc',
'frc_file_03': '../Data/Qair_ncepnam_3hourly_MAB_and_GoM.nc',
'frc_file_04': '../Data/rain_ncepnam_3hourly_MAB_and_GoM.nc',
'frc_file_05': '../Data/swrad_daily_ncepnam_3hourly_MAB_and_GoM.nc',
'frc_file_06': '../Data/Tair_ncepnam_3hourly_MAB_and_GoM.nc',
'frc_file_07': '../Data/Winds_ncepnam_3hourly_MAB_and_GoM.nc',
'grd_file': '/home/om/roms/doppio/7km/grid_doppio_JJA_v13.nc',
'header_dir': '/home/julia/ROMS/doppio/Compile/fwd',
'header_file': 'doppio.h',
'his_base': 'doppio_his_6733',
'history': 'ROMS/TOMS, Version 3.9, Tuesday - June 11, 2024 - 4:33:26 AM ;\nFMRC Best_Excluding_Day1 Dataset',
'ini_file': 'doppio_ini_6733.nc',
'location': 'Proto fmrc:doppio_2017_da_avg',
'nud_file': '/home/om/roms/doppio/7km/doppio_nudgcoef_7km_1500-2000_GS.nc',
'os': 'Linux',
'rst_file': 'doppio_rst_6733.nc',
'script_file': 'nl_ocean_doppio.in',
'summary': 'doppio',
'svn_rev': '1040',
'svn_url': 'https://www.myroms.org/svn/src/trunk',
'tide_file': '/home/om/roms/doppio/7km/doppio_tide_7km.nc',
'tiling': '004x004',
'title': 'ROMS doppio Real-Time Operational PSAS Forecast System Version 1 FMRC Averages',
'type': 'ROMS/TOMS nonlinear model averages file',
'var_info': '../Data/varinfo1040t_daily.dat'},
'.zgroup': {'zarr_format': 2},
'lat_rho/.zarray': {'chunks': [106, 242],
'compressor': {'blocksize': 0,
'clevel': 2,
'cname': 'lz4',
'id': 'blosc',
'shuffle': -1},
'dtype': '<f8',
'fill_value': 'NaN',
'filters': None,
'order': 'C',
'shape': [106, 242],
'zarr_format': 2},
'lat_rho/.zattrs': {'_ARRAY_DIMENSIONS': ['eta_rho', 'xi_rho'],
'_ChunkSizes': [106, 242],
'_CoordinateAxisType': 'Lat',
'field': 'lat_rho, scalar',
'long_name': 'latitude of RHO-points',
'standard_name': 'latitude',
'units': 'degrees_north'},
'lon_rho/.zarray': {'chunks': [106, 242],
'compressor': {'blocksize': 0,
'clevel': 2,
'cname': 'lz4',
'id': 'blosc',
'shuffle': -1},
'dtype': '<f8',
'fill_value': 'NaN',
'filters': None,
'order': 'C',
'shape': [106, 242],
'zarr_format': 2},
'lon_rho/.zattrs': {'_ARRAY_DIMENSIONS': ['eta_rho', 'xi_rho'],
'_ChunkSizes': [106, 242],
'_CoordinateAxisType': 'Lon',
'field': 'lon_rho, scalar',
'long_name': 'longitude of RHO-points',
'standard_name': 'longitude',
'units': 'degrees_east'},
's_rho/.zarray': {'chunks': [],
'compressor': None,
'dtype': '<f8',
'fill_value': 'NaN',
'filters': None,
'order': 'C',
'shape': [],
'zarr_format': 2},
's_rho/.zattrs': {'_ARRAY_DIMENSIONS': [],
'_CoordinateAxes': 's_rho',
'_CoordinateAxisType': 'GeoZ',
'_CoordinateTransformType': 'Vertical',
'_CoordinateZisPositive': 'up',
'field': 's_rho, scalar',
'formula_terms': 's: s_rho C: Cs_r eta: zeta depth: h depth_c: hc',
'long_name': 'S-coordinate at RHO-points',
'positive': 'up',
'standard_name': 'ocean_s_coordinate_g2',
'units': '',
'valid_max': 0.0,
'valid_min': -1.0},
'salt/.zarray': {'chunks': [1, 106, 242],
'compressor': {'blocksize': 0,
'clevel': 2,
'cname': 'lz4',
'id': 'blosc',
'shuffle': -1},
'dtype': '<f8',
'fill_value': 'NaN',
'filters': None,
'order': 'C',
'shape': [1, 106, 242],
'zarr_format': 2},
'salt/.zattrs': {'_ARRAY_DIMENSIONS': ['time', 'eta_rho', 'xi_rho'],
'_ChunkSizes': [1, 20, 53, 121],
'coordinates': 'time_run time s_rho lat_rho lon_rho ',
'field': 'salinity, scalar, series',
'grid': 'grid',
'location': 'face',
'long_name': 'time-averaged salinity',
'standard_name': 'sea_water_practical_salinity',
'time': 'ocean_time'},
'temp/.zarray': {'chunks': [1, 106, 242],
'compressor': {'blocksize': 0,
'clevel': 2,
'cname': 'lz4',
'id': 'blosc',
'shuffle': -1},
'dtype': '<f8',
'fill_value': 'NaN',
'filters': None,
'order': 'C',
'shape': [1, 106, 242],
'zarr_format': 2},
'temp/.zattrs': {'_ARRAY_DIMENSIONS': ['time', 'eta_rho', 'xi_rho'],
'_ChunkSizes': [1, 20, 53, 121],
'coordinates': 'time_run time s_rho lat_rho lon_rho ',
'field': 'temperature, scalar, series',
'grid': 'grid',
'location': 'face',
'long_name': 'time-averaged potential temperature',
'standard_name': 'sea_water_potential_temperature',
'time': 'ocean_time',
'units': 'Celsius'},
'time/.zarray': {'chunks': [1],
'compressor': {'blocksize': 0,
'clevel': 2,
'cname': 'lz4',
'id': 'blosc',
'shuffle': -1},
'dtype': '<i8',
'fill_value': None,
'filters': None,
'order': 'C',
'shape': [1],
'zarr_format': 2},
'time/.zattrs': {'_ARRAY_DIMENSIONS': ['time'],
'_CoordinateAxisType': 'Time',
'calendar': 'proleptic_gregorian',
'long_name': 'Forecast time for ForecastModelRunCollection',
'standard_name': 'time',
'units': 'days since 2022-06-06 12:00:00'},
'time_run/.zarray': {'chunks': [1],
'compressor': {'blocksize': 0,
'clevel': 2,
'cname': 'lz4',
'id': 'blosc',
'shuffle': -1},
'dtype': '<i8',
'fill_value': None,
'filters': None,
'order': 'C',
'shape': [1],
'zarr_format': 2},
'time_run/.zattrs': {'_ARRAY_DIMENSIONS': ['time'],
'_CoordinateAxisType': 'RunTime',
'calendar': 'proleptic_gregorian',
'long_name': 'run times for coordinate = time',
'standard_name': 'forecast_reference_time',
'units': 'days since 2022-06-05 12:00:00'}},
'zarr_consolidated_format': 1}
New let’s read back the data to check if we can roundtrip it back to the original dataset.
subset = xr.open_zarr(fname)
subset
<xarray.Dataset> Size: 821kB Dimensions: (eta_rho: 106, xi_rho: 242, time: 1) Coordinates: lat_rho (eta_rho, xi_rho) float64 205kB ... lon_rho (eta_rho, xi_rho) float64 205kB ... s_rho float64 8B ... * time (time) datetime64[ns] 8B 2022-06-06T12:00:00 time_run (time) datetime64[ns] 8B ... Dimensions without coordinates: eta_rho, xi_rho Data variables: salt (time, eta_rho, xi_rho) float64 205kB ... temp (time, eta_rho, xi_rho) float64 205kB ... Attributes: (12/46) CPP_options: DOPPIO, ADD_FSOBC, ADD_M2OBC, ANA_BSFLUX... Conventions: CF-1.4, SGRID-0.3 DODS_EXTRA.Unlimited_Dimension: ocean_time NLM_LBC: \nEDGE: WEST SOUTH EAST NORTH \nz... NLM_TADV: \nADVECTION: HORIZONTAL VERTICAL ... _CoordSysBuilder: ucar.nc2.dataset.conv.CF1Convention ... ... svn_url: https://www.myroms.org/svn/src/trunk tide_file: /home/om/roms/doppio/7km/doppio_tide_7km.nc tiling: 004x004 title: ROMS doppio Real-Time Operational PSAS F... type: ROMS/TOMS nonlinear model averages file var_info: ../Data/varinfo1040t_daily.dat
And a quick plot to check the data.
subset["temp"].compute().squeeze().plot(x="lon_rho", y="lat_rho");
What is the current workflow and what are the altearnatives? Most ocean data are stored as modern netCDF files that are, under the hood HDF5 files with more strict metadata stuture. HDF5 has some limitations like,
no thread-based parallelism
cannot do parallel writes with compression
no support for could object stores
However, for most workflows what really matters is the chunking, not the data format. Leaving the parallelism, compression, and cloud support to be built on top of it with dask
, numcodecs
, and fsspec
, respectively. That raises the question: Should one convert all the existing data to zarr
? Luckily no! We can adopt a more inexpensive workflow and kerchunk to create virtual cloud-optimized CF-compliant datasets that access files in any format using the Zarr library.
We can write the data in whatever format we need (maybe you are NASA and require HDF5, maybe you have R users who like netcdf, or want to use a visualization tool that only reads geotiff), then rechunk the data to best support the expected use cases.