
netCDF → Zarr conversion throughput benchmark #7

@andersy005

Description


Thanks to @rsignell-usgs's script, I've been experimenting with netCDF → Zarr conversion on S3. Is there any published throughput data I could use to make sense of the measurements below? If anyone has transferred Zarr stores to S3 or GCP before, I'd like to hear about that experience and about best practices for this kind of task, for example how to tune a Dask cluster to maximize throughput.

Dask configuration

  • 1 worker
  • 72 threads per worker
Data size (GB)   Chunk size            Transfer time (s)   Throughput (Mb/s)
5.1              (1, 1032, 289, 288)   285.2               146
5.1              (1, 516, 289, 288)    309.3               135
5.1              (1, 258, 289, 288)    350.7               119
5.1              (1, 129, 289, 288)    439.0               95
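The throughput column appears to be the data size converted to megabits (using 1 GB = 1024 MB and 8 bits per byte) divided by the transfer time. That convention is inferred from the numbers, not stated in the issue. A small sketch that reproduces the column under that assumption (`throughput_mbps` is a hypothetical helper name):

```python
def throughput_mbps(size_gb: float, seconds: float) -> float:
    """Megabits per second, assuming 1 GB = 1024 MB (inferred from the table)."""
    megabits = size_gb * 1024 * 8
    return megabits / seconds


# Transfer times from the 1-worker table
for t in (285.2, 309.3, 350.7, 439.0):
    print(round(throughput_mbps(5.1, t)))  # prints 146, 135, 119, 95
```

The same formula also reproduces the 2-worker figures (for example, 5.1 GB in 16 s gives about 2611 Mb/s).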

Dask configuration

  • 2 workers on the same machine
  • 72 threads per worker
Data size (GB)   Chunk size            Transfer time (s)   Throughput (Mb/s)
5.1              (1, 1032, 289, 288)   16                  2611
5.1              (1, 516, 289, 288)    18                  2321
5.1              (1, 258, 289, 288)    28                  1492
5.1              (1, 129, 289, 288)    47                  889
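To relate the chunk shapes to request sizes, here is a rough estimate of the uncompressed size of one chunk. The issue does not state the dtype of `TS`; the sketch assumes 4-byte float32 values, and `chunk_mib` is a hypothetical helper:

```python
from math import prod


def chunk_mib(shape, itemsize=4):
    """Uncompressed size of one chunk in MiB; itemsize=4 assumes float32."""
    return prod(shape) * itemsize / 2**20


for t in (1032, 516, 258, 129):
    print(t, round(chunk_mib((1, t, 289, 288)), 1))
```

Under that assumption the chunks range from roughly 328 MiB down to 41 MiB. Each halving of the time chunk halves the payload per object and doubles the number of objects written to S3, so per-request overhead is one plausible contributor to the lower throughput at small chunk sizes in both tables, although these measurements do not isolate it. Zarr's default compression also makes the stored objects smaller than these figures.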

Here's my script:

import xarray as xr
from pathlib import Path 
from dask.distributed import Client 
import s3fs
import time 

if __name__ == '__main__':

    client = Client(processes=False, n_workers=1, threads_per_worker=72)
    print(client)

    root_dir = Path("/glade/p_old/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS")
    CASE = 'b.e11.B20TRC5CNBDRD.f09_g16'
    list_1 = sorted(root_dir.glob("b.e11.B20TRC5CNBDRD.f09_g16.???.cam.h0.*"))
    # indices of special runs to remove from the original list. 
    # These runs' outputs have additional variables, and/or have special time ranges
    indices = 0, 33, 34 
    updated_list = [item for index, item in enumerate(list_1) if index not in indices]
    
    # recent xarray releases require combine='nested' when concat_dim is given
    dset = xr.open_mfdataset(updated_list, combine='nested', concat_dim='ensemble')
    dset = dset.chunk({'ensemble': 1, 'time': 516})

    # Output: S3 Bucket 
    f_zarr = f'zarr-test-bucket/test1/lens/{CASE}'

    # write data using xarray.to_zarr()
    fs = s3fs.S3FileSystem(anon=False)
    d = s3fs.S3Map(f_zarr, s3=fs)
   
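    # perf_counter() measures wall-clock time; time.clock() was removed in Python 3.8
    start = time.perf_counter()
    dset.to_zarr(store=d, mode='w')
    print(f'Time taken = {time.perf_counter() - start:.1f} s')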
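
On Unix, `time.clock()` reported processor time rather than elapsed time (it was deprecated in Python 3.3 and removed in 3.8), which makes it an unreliable stopwatch for a multi-threaded upload. A minimal sketch of a reusable wall-clock timer that also reports throughput with the same convention as the tables; `timed_transfer` is a hypothetical helper, and `time.sleep` stands in for the actual `to_zarr` call:

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_transfer(size_gb: float, report: dict):
    """Record elapsed wall-clock seconds and Mb/s (1 GB = 1024 MB assumed)."""
    start = time.perf_counter()  # monotonic, wall-clock timer
    try:
        yield report
    finally:
        seconds = time.perf_counter() - start
        report["seconds"] = seconds
        report["mbps"] = size_gb * 1024 * 8 / seconds


report = {}
# In the real script this would wrap dset.to_zarr(store=d, mode='w')
with timed_transfer(5.1, report):
    time.sleep(0.1)
print(f"Time taken = {report['seconds']:.2f} s ({report['mbps']:.0f} Mb/s)")
```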
