Generate Kerchunk Reference from CMIP6 NetCDF files

Tutorial for data providers who want to create a kerchunk reference for NetCDF files.

Author

Aimee Barciauskas

Published

November 17, 2023

Run this notebook

You can launch this notebook in VEDA JupyterHub by clicking the link below.

Launch in VEDA JupyterHub (requires access)

Learn more

Inside the Hub

This notebook was written on a VEDA JupyterHub instance.

See [VEDA Analytics JupyterHub Access](https://nasa-impact.github.io/veda-docs/veda-jh-access.html) for information about how to gain access.

Outside the Hub

You are welcome to run this notebook anywhere you like (for example on https://daskhub.veda.smce.nasa.gov/, MAAP, locally, …). Just make sure that the data is accessible, or contact the VEDA team to enable access.

Approach

This notebook demonstrates how to create a kerchunk reference for the NASA Earth Exchange Global Daily Downscaled Projections (NEX-GDDP-CMIP6) NetCDF files, which are hosted on S3 through the AWS Open Data Registry. Because the NetCDF files are publicly available, this notebook should run in any environment that has the imported libraries installed, up until the last step, where the kerchunk reference file is stored in the veda-data-store-staging S3 bucket, which is protected.

To see how to publish a kerchunk reference to a STAC collection, see the Publishing a CMIP6 Kerchunk Reference to STAC notebook.

Step 1: Setup

Import the necessary libraries and define the CMIP6 model and variable for which we will create references.

import os
from tempfile import TemporaryDirectory

import boto3
import dask.bag as db
import fsspec
import ujson
import xarray as xr
from dask_gateway import Gateway, GatewayCluster
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr

# Specify the CMIP model and variable to use.
# Here we are using near-surface air temperature from the GISS-E2-1-G GCM
model = "GISS-E2-1-G"
variable = "tas"
# If this code were re-used for a protected bucket, anon should be False.
anon = True
# Note: We are only using the historical data in this example.
# More years of data are available from multiple Shared Socio-Economic Pathways (SSPs) in the s3://nex-gddp-cmip6 bucket.
s3_path = f"s3://nex-gddp-cmip6/NEX-GDDP-CMIP6/{model}/historical/r1i1p1*/{variable}/*"
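
The same glob pattern can be parameterized by scenario. The sketch below uses a hypothetical helper, `build_s3_pattern`, which is not part of the original notebook. The scenario name `ssp245` is an assumption about the bucket layout and should be checked against a listing of the bucket before use.

```python
def build_s3_pattern(model: str, variable: str, scenario: str = "historical") -> str:
    """Return a glob pattern for one model/scenario/variable combination."""
    return (
        f"s3://nex-gddp-cmip6/NEX-GDDP-CMIP6/{model}/{scenario}/r1i1p1*/{variable}/*"
    )


# Reproduces the pattern used in this notebook.
historical_pattern = build_s3_pattern("GISS-E2-1-G", "tas")
# Assumed scenario name; verify that this prefix exists in the bucket.
ssp_pattern = build_s3_pattern("GISS-E2-1-G", "tas", scenario="ssp245")
print(historical_pattern)
print(ssp_pattern)
```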

Step 2: Initiate file systems for reading and (temporary) writing

fs_read = fsspec.filesystem("s3", anon=anon, skip_instance_cache=False)

# Create a temporary directory to store the .json reference files
# Alternately, you could write these to cloud storage.
td = TemporaryDirectory()
temp_dir = td.name
print(f"Writing single file references to {temp_dir}")
Writing single file references to /tmp/tmpugv4rwhn
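
A note on the lifetime of the temporary directory: it exists only until the `TemporaryDirectory` object is cleaned up, so anything worth keeping must be copied elsewhere first. The following minimal sketch uses illustrative names (`td_demo`, `marker`) to show that behavior with the standard library only.

```python
import os
from tempfile import TemporaryDirectory

td_demo = TemporaryDirectory()
demo_dir = td_demo.name

# Write a placeholder file, standing in for a single-file reference.
marker = os.path.join(demo_dir, "example.json")
with open(marker, "w") as f:
    f.write("{}")
exists_before = os.path.exists(marker)

# cleanup() removes the directory and everything inside it.
td_demo.cleanup()
exists_after = os.path.exists(demo_dir)
print(exists_before, exists_after)
```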

Step 3: Discover files from S3

# List available files for this model and variable
all_files = sorted(["s3://" + f for f in fs_read.glob(s3_path)])
print(f"{len(all_files)} discovered from {s3_path}")
65 discovered from s3://nex-gddp-cmip6/NEX-GDDP-CMIP6/GISS-E2-1-G/historical/r1i1p1*/tas/*
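
Before generating references, it can help to confirm that no year is missing from the listing. The sketch below assumes that file names end in `_<year>.nc`, as the reference file name produced later in this notebook suggests. `find_missing_years` and the example paths are illustrative, not part of the original notebook.

```python
import re


def find_missing_years(paths, first, last):
    """Return the years in [first, last] that have no matching file."""
    found = set()
    for path in paths:
        match = re.search(r"_(\d{4})\.nc$", path)
        if match:
            found.add(int(match.group(1)))
    return sorted(set(range(first, last + 1)) - found)


# Hypothetical listing with 1952 deliberately left out.
example_files = [
    f"s3://example-bucket/tas_day_GISS-E2-1-G_historical_r1i1p1f2_gn_{year}.nc"
    for year in (1950, 1951, 1953)
]
missing = find_missing_years(example_files, 1950, 1953)
print(missing)
```

With the real listing, `find_missing_years(all_files, 1950, 2014)` should return an empty list if every historical year is present.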

Step 4: Define some functions for creating and storing Kerchunk reference files for single files

so = dict(mode="rb", anon=anon, default_fill_cache=False, default_cache_type="first")

# Use Kerchunk's `SingleHdf5ToZarr` class to create a Kerchunk reference from a NetCDF file.


def generate_json_reference(u):
    with fs_read.open(u, **so) as infile:
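        # os.path.splitext drops only the final extension; str.strip(".nc") would
        # remove any leading or trailing '.', 'n', or 'c' characters instead.
        fname = os.path.splitext(u.split("/")[-1])[0]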
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        return fname, ujson.dumps(h5chunks.translate()).encode()


def write_json(fname, reference_json, temp_dir):
    outf = os.path.join(temp_dir, f"{fname}.json")
    with open(outf, "wb") as f:
        f.write(reference_json)
    return outf
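
One detail worth knowing when deriving `fname` from a URL: `str.strip` removes any of the given characters from both ends of a string, not a literal suffix. For the `tas` files used here both approaches give the same stem, but a file name that starts with `c`, `n`, or `.` would be truncated. A minimal sketch with a hypothetical file name (`reference_name` is an illustrative helper):

```python
import os


def reference_name(url: str) -> str:
    """Return the file stem of a URL, dropping only the final extension."""
    return os.path.splitext(url.split("/")[-1])[0]


# Hypothetical name that begins with a character from the set {'.', 'n', 'c'}.
tricky = "s3://example-bucket/clt_day_1950.nc"
stripped = tricky.split("/")[-1].strip(".nc")  # loses the leading 'c'
stem = reference_name(tricky)                  # keeps the full stem
print(stripped, stem)
```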

Test we can create a kerchunk reference for one file.

fname, ref_json = generate_json_reference(all_files[0])
write_json(fname, ref_json, temp_dir)
'/tmp/tmpugv4rwhn/tas_day_GISS-E2-1-G_historical_r1i1p1f2_gn_1950.json'
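
To get a feel for what a reference file contains, the sketch below summarizes a small, hand-written reference. It assumes kerchunk's version 1 layout, in which `refs` maps Zarr keys either to inline strings (metadata and small chunks) or to `[url, offset, length]` lists that point into the original file. The example values and the `summarize_reference` helper are hypothetical.

```python
import json

# Hand-written stand-in for a single-file reference (not real data).
example_reference = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',
        "tas/.zattrs": '{"units": "K"}',
        "tas/0.0.0": ["s3://example-bucket/tas_1950.nc", 4096, 1048576],
        "tas/1.0.0": ["s3://example-bucket/tas_1950.nc", 1052672, 1048576],
    },
}


def summarize_reference(reference):
    """Count inline entries versus byte-range pointers into remote files."""
    summary = {"inline": 0, "remote": 0}
    for value in reference["refs"].values():
        if isinstance(value, list):
            summary["remote"] += 1
        else:
            summary["inline"] += 1
    return summary


# Round-trip through JSON, as the notebook does when writing to disk.
summary = summarize_reference(json.loads(json.dumps(example_reference)))
print(summary)
```

The same helper could be applied to the notebook's output with `summarize_reference(ujson.loads(ref_json))`. A larger `inline_threshold` moves more small chunks from the remote count to the inline count.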

Step 5: Use a dask cluster to generate references for all the data

Start the cluster and check out the dashboard for active workers, as it may take a few seconds or minutes for them to start up.

This was run on the VEDA JupyterHub, which has access to a distributed cluster. You could also create a LocalCluster instance to run the dask bag code below. Because this code does not use Dask arrays, you could also use a regular multiprocessing library to distribute the generate_json_reference tasks.
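
As a sketch of the non-dask alternative mentioned above, the example below distributes the work with the standard library's `concurrent.futures`. It uses a thread pool rather than multiprocessing, on the assumption that the tasks spend most of their time waiting on S3 reads. `ProcessPoolExecutor` offers the same interface if that assumption does not hold. `fake_generate_json_reference` is a stand-in so that the example runs without network access.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_generate_json_reference(url):
    """Stand-in for generate_json_reference: returns (fname, reference bytes)."""
    fname = url.split("/")[-1].rsplit(".", 1)[0]
    return fname, b"{}"


# Hypothetical file list; in the notebook this would be all_files.
urls = [f"s3://example-bucket/tas_{year}.nc" for year in range(1950, 1954)]

with ThreadPoolExecutor(max_workers=4) as pool:
    # Executor.map returns results in the same order as the inputs.
    results = list(pool.map(fake_generate_json_reference, urls))

names = [name for name, _ in results]
print(names)
```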

gateway = Gateway()
clusters = gateway.list_clusters()

# Connect to an existing cluster. This is useful when the kernel shuts down in the middle of an interactive session.
if clusters:
    cluster = gateway.connect(clusters[0].name)
else:
    cluster = GatewayCluster(shutdown_on_close=True)

cluster.scale(16)
client = cluster.get_client()
client

Client

Client-1e000625-59cf-11ef-8709-c2be3c95b571

Connection method: Cluster object Cluster type: dask_gateway.GatewayCluster
Dashboard: /services/dask-gateway/clusters/prod.f49e93767f6f44469809f0227bf42fc8/status

Cluster Info

GatewayCluster

Generate a dask bag for all the files and store files in the temp_dir

%%time
bag = db.from_sequence(all_files, partition_size=1)
result = db.map(generate_json_reference, bag)
all_references = result.compute()
output_files = [
    write_json(fname, reference_json, temp_dir)
    for fname, reference_json in all_references
]
CPU times: user 118 ms, sys: 24.9 ms, total: 143 ms
Wall time: 2min 39s

Step 6: Combine individual references into a single consolidated reference

Store it to local storage and test opening it.

%%time
mzz = MultiZarrToZarr(
    output_files,
    remote_protocol="s3",
    remote_options={"anon": anon},
    concat_dims=["time"],
    coo_map={"time": "cf:time"},
    inline_threshold=0,
)
multi_kerchunk = mzz.translate()
CPU times: user 961 ms, sys: 11.1 ms, total: 972 ms
Wall time: 950 ms
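
Conceptually, combining along `time` means that chunk keys from later files are renumbered so that they follow the chunks of earlier files. The sketch below illustrates only that renumbering on hypothetical reference dictionaries. It is a simplified illustration, not kerchunk's implementation: `MultiZarrToZarr` also reads the coordinate values and rewrites array metadata such as shapes. `concat_chunk_keys` is an illustrative name.

```python
def concat_chunk_keys(per_file_refs, variable):
    """Renumber the first (time) chunk index so files follow one another."""
    combined = {}
    offset = 0
    for refs in per_file_refs:
        n_time_chunks = 0
        for key, value in refs.items():
            prefix, _, index = key.partition("/")
            # Skip other variables and metadata keys such as "tas/.zarray".
            if prefix != variable or index.startswith("."):
                continue
            parts = index.split(".")
            t = int(parts[0])
            new_index = ".".join([str(t + offset)] + parts[1:])
            combined[f"{variable}/{new_index}"] = value
            n_time_chunks = max(n_time_chunks, t + 1)
        offset += n_time_chunks
    return combined


# Hypothetical per-file references: two time chunks in 1950, one in 1951.
refs_1950 = {
    "tas/.zarray": "{}",
    "tas/0.0.0": ["s3://example-bucket/tas_1950.nc", 100, 50],
    "tas/1.0.0": ["s3://example-bucket/tas_1950.nc", 150, 50],
}
refs_1951 = {
    "tas/.zarray": "{}",
    "tas/0.0.0": ["s3://example-bucket/tas_1951.nc", 100, 50],
}
combined = concat_chunk_keys([refs_1950, refs_1951], "tas")
print(sorted(combined))
```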

Write the kerchunk .json file to local storage

output_fname = f"combined_CMIP6_daily_{model}_{variable}_kerchunk.json"
output_location = os.path.join(temp_dir, output_fname)
with open(output_location, "wb") as f:
    print(f"Writing combined kerchunk reference file {output_location}")
    f.write(ujson.dumps(multi_kerchunk).encode())
Writing combined kerchunk reference file /tmp/tmpugv4rwhn/combined_CMIP6_daily_GISS-E2-1-G_tas_kerchunk.json
# open dataset as zarr object using fsspec reference file system and Xarray
fs = fsspec.filesystem(
    "reference", fo=multi_kerchunk, remote_protocol="s3", remote_options={"anon": anon}
)
m = fs.get_mapper("")
# Check the data
ds = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False))
display(ds)  ##noqa
<xarray.Dataset> Size: 82GB
Dimensions:  (lat: 600, lon: 1440, time: 23725)
Coordinates:
  * lat      (lat) float64 5kB -59.88 -59.62 -59.38 -59.12 ... 89.38 89.62 89.88
  * lon      (lon) float64 12kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9
  * time     (time) object 190kB 1950-01-01 12:00:00 ... 2014-12-31 12:00:00
Data variables:
    tas      (time, lat, lon) float32 82GB ...
Attributes: (12/23)
    Conventions:           CF-1.7
    activity:              NEX-GDDP-CMIP6
    cmip6_institution_id:  NASA-GISS
    cmip6_license:         CC-BY-SA 4.0
    cmip6_source_id:       GISS-E2-1-G
    contact:               Dr. Rama Nemani: rama.nemani@nasa.gov, Dr. Bridget...
    ...                    ...
    scenario:              historical
    source:                BCSD
    title:                 GISS-E2-1-G, r1i1p1f2, historical, global downscal...
    tracking_id:           25d6baa3-0404-4eba-a3f1-afddbf69d4cc
    variant_label:         r1i1p1f2
    version:               1.0
# Close the cluster
client.close()
cluster.close()
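
The numbers in the dataset summary printed above can be cross-checked with a little arithmetic. The sketch assumes one file per year from 1950 to 2014 and a 365-day calendar. Both are inferences from the printed output (65 files, 23725 time steps), not statements made by the data provider.

```python
n_years = 2014 - 1950 + 1           # one file per year, inclusive range
n_time = n_years * 365              # assumed 365-day (no-leap) calendar
n_lat, n_lon = 600, 1440            # grid size from the dataset summary
bytes_per_value = 4                 # float32

n_bytes = n_time * n_lat * n_lon * bytes_per_value
print(n_years, n_time, round(n_bytes / 1e9, 1))
```

The result agrees with the 65 discovered files, the `time: 23725` dimension, and the reported size of about 82 GB.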

Final Step: Upload Kerchunk to VEDA Bucket

You can skip this if you are not trying to upload the reference file to veda-data-store-staging.

If you are on the VEDA JupyterHub, you should have access to veda-data-store-staging.

s3 = boto3.client("s3")
upload_bucket_name = "veda-data-store-staging"
response = s3.upload_file(
    output_location,
    upload_bucket_name,
    f"cmip6-{model}-{variable}-kerchunk/{output_fname}",
)
# None is good.
print(f"Response uploading {output_fname} to {upload_bucket_name} was {response}.")
Response uploading combined_CMIP6_daily_GISS-E2-1-G_tas_kerchunk.json to veda-data-store-staging was None.
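
Once uploaded, the combined reference could be opened directly from S3. The sketch below mirrors the local check from Step 6, but passes the reference file's URL instead of the in-memory dictionary. `reference_url` and `open_reference` are illustrative helpers, and the sketch assumes that your environment has credentials for the protected bucket. Only the URL construction is run here; `open_reference` needs network access, `fsspec`, `s3fs`, and `xarray`.

```python
def reference_url(bucket: str, model: str, variable: str, fname: str) -> str:
    """Build the S3 URL that matches the upload key used in this notebook."""
    return f"s3://{bucket}/cmip6-{model}-{variable}-kerchunk/{fname}"


def open_reference(url: str, anon_reference: bool = False):
    """Open a combined kerchunk reference stored on S3 as an xarray Dataset."""
    # Imported here so that the URL helper works without these packages.
    import fsspec
    import xarray as xr

    fs = fsspec.filesystem(
        "reference",
        fo=url,
        target_options={"anon": anon_reference},  # access to the reference JSON
        remote_protocol="s3",
        remote_options={"anon": True},  # the NetCDF source files are public
    )
    return xr.open_dataset(
        fs.get_mapper(""), engine="zarr", backend_kwargs=dict(consolidated=False)
    )


url = reference_url(
    "veda-data-store-staging",
    "GISS-E2-1-G",
    "tas",
    "combined_CMIP6_daily_GISS-E2-1-G_tas_kerchunk.json",
)
print(url)
```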