import os
from tempfile import TemporaryDirectory
import boto3
import dask.bag as db
import fsspec
import ujson
import xarray as xr
from dask_gateway import Gateway, GatewayCluster
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr
# Specify the CMIP model and variable to use.
# Here we are using near-surface air temperature from the GISS-E2-1-G GCM
= "GISS-E2-1-G"
model = "tas"
variable # If this code were re-used for a protected bucket, anon should be False.
= True
anon # Note: We are only using the historical data in this example.
# More years of data are available from multiple Shared Socio-Economic Pathways (SSPs) in the s3://nex-gddp-cmip6 bucket.
= f"s3://nex-gddp-cmip6/NEX-GDDP-CMIP6/{model}/historical/r1i1p1*/{variable}/*" s3_path
Generate Kerchunk Reference from CMIP6 NetCDF files
Run this notebook
You can launch this notebook in VEDA JupyterHub by clicking the link below.
Launch in VEDA JupyterHub (requires access)
Learn more
Inside the Hub
This notebook was written on a VEDA JupyterHub instance.
See [VEDA Analytics JupyterHub Access](https://nasa-impact.github.io/veda-docs/veda-jh-access.html) for information about how to gain access.
Outside the Hub
You are welcome to run this notebook anywhere you like (for example on https://daskhub.veda.smce.nasa.gov/, MAAP, or locally); just make sure that the data is accessible, or get in contact with the VEDA team to enable access.
Approach
This notebook demonstrates how to create a kerchunk reference for the AWS Open Data Registry of NASA Earth Exchange Global Daily Downscaled Projections (NEX-GDDP-CMIP6) NetCDF files on S3. Because the NetCDF files are publicly available, this notebook should be runnable in any environment with the imported libraries, up until the last step, where the kerchunk reference file is stored in the veda-data-store-staging S3 bucket, which is a protected bucket.
To see how to publish a kerchunk reference to a STAC collection, see the Publishing a CMIP6 Kerchunk Reference to STAC notebook.
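Before diving in, it helps to know what a kerchunk reference actually is: a JSON document that maps Zarr-style keys either to small inlined values or to byte ranges inside the original NetCDF files. The sketch below is purely illustrative; the key names, file path, and byte offsets are made up.

# Illustrative (made-up) shape of a kerchunk reference: "refs" maps Zarr keys
# either to inlined metadata/bytes or to [url, byte_offset, byte_length]
# pointers into the original NetCDF files on S3.
example_reference = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',
        "tas/.zarray": '{"chunks": [1, 600, 1440], "dtype": "<f4", "shape": [365, 600, 1440]}',
        "tas/0.0.0": ["s3://nex-gddp-cmip6/NEX-GDDP-CMIP6/some-file.nc", 20000, 1500000],
    },
}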
Step 1: Setup
Import the necessary libraries and define which CMIP6 model and variable we will create references for (see the code block at the top of this notebook).
Step 2: Initiate file systems for reading and (temporary) writing
fs_read = fsspec.filesystem("s3", anon=anon, skip_instance_cache=False)

# Create a temporary directory to store the .json reference files
# Alternately, you could write these to cloud storage.
td = TemporaryDirectory()
temp_dir = td.name
print(f"Writing single file references to {temp_dir}")
Writing single file references to /tmp/tmpugv4rwhn
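As the comment above notes, the single-file references could also be written to cloud storage instead of a temporary directory. A minimal sketch of that variant, assuming you have write credentials for a bucket of your own (s3://my-scratch-bucket and write_json_remote below are hypothetical):

# Hypothetical alternative: write the reference JSONs to S3 instead of a temp dir.
# Assumes credentials with write access to the (made-up) bucket below.
fs_write = fsspec.filesystem("s3", anon=False)
remote_dir = "s3://my-scratch-bucket/cmip6-kerchunk-refs"

def write_json_remote(fname, reference_json, remote_dir):
    outf = f"{remote_dir}/{fname}.json"
    with fs_write.open(outf, "wb") as f:
        f.write(reference_json)
    return outf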
Step 3: Discover files from S3
# List available files for this model and variable
all_files = sorted(["s3://" + f for f in fs_read.glob(s3_path)])
print(f"{len(all_files)} discovered from {s3_path}")
65 discovered from s3://nex-gddp-cmip6/NEX-GDDP-CMIP6/GISS-E2-1-G/historical/r1i1p1*/tas/*
Step 4: Define some functions for creating and storing Kerchunk reference files for single files
= dict(mode="rb", anon=anon, default_fill_cache=False, default_cache_type="first")
so
# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk` index from a NetCDF file.
def generate_json_reference(u):
with fs_read.open(u, **so) as infile:
= u.split("/")[-1].strip(".nc")
fname = SingleHdf5ToZarr(infile, u, inline_threshold=300)
h5chunks return fname, ujson.dumps(h5chunks.translate()).encode()
def write_json(fname, reference_json, temp_dir):
= os.path.join(temp_dir, f"{fname}.json")
outf with open(outf, "wb") as f:
f.write(reference_json)return outf
Test that we can create a kerchunk reference for one file.
fname, ref_json = generate_json_reference(all_files[0])
write_json(fname, ref_json, temp_dir)
'/tmp/tmpugv4rwhn/tas_day_GISS-E2-1-G_historical_r1i1p1f2_gn_1950.json'
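As an optional extra check (not part of the original workflow), you can open the reference you just wrote and confirm it describes a single year of daily data; this uses the same fsspec "reference" filesystem pattern that appears again in Step 6.

# Optional check: open the single-file reference with xarray via fsspec's
# "reference" filesystem.
single_fs = fsspec.filesystem(
    "reference",
    fo=os.path.join(temp_dir, f"{fname}.json"),
    remote_protocol="s3",
    remote_options={"anon": anon},
)
single_ds = xr.open_dataset(
    single_fs.get_mapper(""), engine="zarr", backend_kwargs=dict(consolidated=False)
)
print(single_ds.time.size)  # one year of daily time steps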
Step 5: Use a dask cluster to generate references for all the data
Start the cluster and check out the dashboard for active workers, as it may take a few seconds or minutes for them to start up.
This was run on the VEDA JupyterHub, which has access to a distributed cluster via dask-gateway. You could also create a LocalCluster instance to run the dask bag code below. And because this code does not use Dask arrays, you could instead use the regular multiprocessing library to distribute the generate_json_reference tasks.
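For example, here is a minimal sketch using the standard library instead of a Dask cluster; it assumes the generate_json_reference and write_json functions defined above, and the worker count is arbitrary (a ProcessPoolExecutor would be the closer analogue to multiprocessing, but a thread pool is usually sufficient for this I/O-bound work):

# Hypothetical alternative to the Dask cluster: a thread pool from the
# standard library. Threads work well here because the work is mostly
# I/O-bound (reading NetCDF chunk metadata from S3).
from concurrent.futures import ThreadPoolExecutor

def build_reference(u):
    fname, ref_json = generate_json_reference(u)
    return write_json(fname, ref_json, temp_dir)

with ThreadPoolExecutor(max_workers=16) as pool:
    output_files_alt = list(pool.map(build_reference, all_files))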
gateway = Gateway()
clusters = gateway.list_clusters()

# Connect to an existing cluster - this is useful when the kernel shut down in the middle of an interactive session.
if clusters:
    cluster = gateway.connect(clusters[0].name)
else:
    cluster = GatewayCluster(shutdown_on_close=True)

cluster.scale(16)
client = cluster.get_client()
client
Client
Client-1e000625-59cf-11ef-8709-c2be3c95b571
Connection method: Cluster object | Cluster type: dask_gateway.GatewayCluster
Dashboard: /services/dask-gateway/clusters/prod.f49e93767f6f44469809f0227bf42fc8/status
Cluster Info
GatewayCluster
Name: prod.f49e93767f6f44469809f0227bf42fc8
Dashboard: /services/dask-gateway/clusters/prod.f49e93767f6f44469809f0227bf42fc8/status
Generate a dask bag for all the files and store the single-file references in temp_dir.
%%time
bag = db.from_sequence(all_files, partition_size=1)
result = db.map(generate_json_reference, bag)
all_references = result.compute()
output_files = [
    write_json(fname, reference_json, temp_dir)
    for fname, reference_json in all_references
]
CPU times: user 118 ms, sys: 24.9 ms, total: 143 ms
Wall time: 2min 39s
Step 6: Combine individual references into a single consolidated reference
Store it to local storage and test opening it.
%%time
mzz = MultiZarrToZarr(
    output_files,
    remote_protocol="s3",
    remote_options={"anon": anon},
    concat_dims=["time"],
    coo_map={"time": "cf:time"},
    inline_threshold=0,
)
multi_kerchunk = mzz.translate()
CPU times: user 961 ms, sys: 11.1 ms, total: 972 ms
Wall time: 950 ms
Write the kerchunk .json file to local storage
= f"combined_CMIP6_daily_{model}_{variable}_kerchunk.json"
output_fname = os.path.join(temp_dir, output_fname)
output_location with open(f"{output_location}", "wb") as f:
print(f"Writing combined kerchunk reference file {output_location}")
f.write(ujson.dumps(multi_kerchunk).encode())
Writing combined kerchunk reference file /tmp/tmpugv4rwhn/combined_CMIP6_daily_GISS-E2-1-G_tas_kerchunk.json
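If you are curious what the combined reference contains before publishing it, you can inspect the dictionary returned by mzz.translate(); a small optional check, assuming it follows kerchunk's usual {"version", "refs"} layout:

# Optional: inspect the combined reference. "refs" maps Zarr keys (e.g. "tas/0.0.0")
# to inlined values or [url, offset, length] pointers into the original NetCDF files.
print(multi_kerchunk["version"])
print(len(multi_kerchunk["refs"]), "reference entries")
print([k for k in multi_kerchunk["refs"] if k.endswith(".zarray")])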
# open dataset as zarr object using fsspec reference file system and Xarray
fs = fsspec.filesystem(
    "reference", fo=multi_kerchunk, remote_protocol="s3", remote_options={"anon": anon}
)
m = fs.get_mapper("")

# Check the data
ds = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False))  # noqa
display(ds)
<xarray.Dataset> Size: 82GB
Dimensions:  (lat: 600, lon: 1440, time: 23725)
Coordinates:
  * lat      (lat) float64 5kB -59.88 -59.62 -59.38 -59.12 ... 89.38 89.62 89.88
  * lon      (lon) float64 12kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9
  * time     (time) object 190kB 1950-01-01 12:00:00 ... 2014-12-31 12:00:00
Data variables:
    tas      (time, lat, lon) float32 82GB ...
Attributes: (12/23)
    Conventions:           CF-1.7
    activity:              NEX-GDDP-CMIP6
    cmip6_institution_id:  NASA-GISS
    cmip6_license:         CC-BY-SA 4.0
    cmip6_source_id:       GISS-E2-1-G
    contact:               Dr. Rama Nemani: rama.nemani@nasa.gov, Dr. Bridget...
    ...                    ...
    scenario:              historical
    source:                BCSD
    title:                 GISS-E2-1-G, r1i1p1f2, historical, global downscal...
    tracking_id:           25d6baa3-0404-4eba-a3f1-afddbf69d4cc
    variant_label:         r1i1p1f2
    version:               1.0
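As an optional sanity check, you could pull a single time step through the reference and confirm the byte ranges resolve to real data (this streams one day of the public data from S3, so it takes a few seconds; the value itself is not shown here):

# Optional: read one time step through the kerchunk reference and compute
# its global mean near-surface air temperature (in Kelvin).
sample = ds["tas"].isel(time=0)   # first day, 1950-01-01
print(float(sample.mean()))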
# Close the cluster
client.close()
cluster.close()
Final Step: Upload Kerchunk to VEDA Bucket
You can skip this if you are not trying to upload the reference file to veda-data-store-staging.
If you are on the VEDA JupyterHub, you should have access to veda-data-store-staging.
= boto3.client("s3")
s3 = "veda-data-store-staging"
upload_bucket_name = s3.upload_file(
response
output_location,
upload_bucket_name,f"cmip6-{model}-{variable}-kerchunk/{output_fname}",
)# None is good.
print(f"Response uploading {output_fname} to {upload_bucket_name} was {response}.")
Response uploading combined_CMIP6_daily_GISS-E2-1-G_tas_kerchunk.json to veda-data-store-staging was None.
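Once the file is in the bucket, anyone with credentials for veda-data-store-staging could open the reference straight from S3 instead of from a local copy. A sketch under that assumption (the object key mirrors the upload path above; the bucket itself is not public, so anonymous access will not work for the reference JSON):

# Hypothetical read-back of the uploaded reference. Requires AWS credentials
# with read access to veda-data-store-staging.
ref_url = f"s3://{upload_bucket_name}/cmip6-{model}-{variable}-kerchunk/{output_fname}"
fs_staged = fsspec.filesystem(
    "reference",
    fo=ref_url,
    target_options={"anon": False},   # credentials for reading the reference JSON
    remote_protocol="s3",
    remote_options={"anon": anon},    # the NetCDF chunks themselves are public
)
ds_staged = xr.open_dataset(
    fs_staged.get_mapper(""), engine="zarr", backend_kwargs=dict(consolidated=False)
)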