Add parallel processing with dask.distributed (#16)
Add command line arguments `--dask-distributed-local-core-fraction` and `--dask-distributed-local-memory-fraction`. Setting the former to a non-zero value enables multiprocessing: the value is the fraction of CPU cores on the local machine to use for parallel processing with `dask.distributed.LocalCluster`. The latter sets the fraction of the total system memory that is allocated to the workers.
leifdenby authored Aug 12, 2024
1 parent 3297c75 commit 660aabf
Showing 9 changed files with 600 additions and 55 deletions.
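
The arithmetic described in the commit message can be sketched as below. This is a minimal illustration, not the code from the commit: the function name `local_cluster_resources` is hypothetical, the total memory is passed in as a plain number (the commit reads it with `psutil`), and clamping to at least one worker is an added assumption so that very small fractions do not lead to a division by zero.

```python
import os


def local_cluster_resources(
    core_fraction, memory_fraction, total_memory_bytes, n_system_cores=None
):
    """Return (n_workers, memory_per_worker_bytes) for a local cluster.

    Hypothetical helper for illustration only.
    """
    if n_system_cores is None:
        # os.cpu_count() may return None if the count cannot be determined
        n_system_cores = os.cpu_count() or 1
    # truncate to a whole number of workers, but never go below one
    # (assumption: the clamp is not part of the original commit)
    n_workers = max(1, int(core_fraction * n_system_cores))
    # split the allowed share of system memory evenly between the workers
    memory_per_worker = total_memory_bytes * memory_fraction / n_workers
    return n_workers, memory_per_worker


n_workers, memory_per_worker = local_cluster_resources(
    core_fraction=0.5,
    memory_fraction=0.9,
    total_memory_bytes=16 * 1024**3,
    n_system_cores=8,
)
print(n_workers, f"{memory_per_worker / 1024**2:0.0f} MB per worker")
```

Halving the core fraction in this sketch halves the number of workers and so roughly doubles the memory available to each one, which is why the README suggests lowering the core fraction when workers run out of memory.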
5 changes: 5 additions & 0 deletions .github/workflows/ci-pre-commit.yml
@@ -10,7 +10,12 @@ jobs:
  linting:
    name: "pre-commit hooks"
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v2
    - uses: actions/setup-python@v2
      with:
        # don't use python3.12 because flake8 finds extra issues with that
        # version
        python-version: "3.11"
    - uses: pre-commit/[email protected]
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [unreleased](https://github.com/mllam/mllam-data-prep/)

[All changes](https://github.com/mllam/mllam-data-prep/compare/HEAD...v0.2.0)

### Added

- add support for parallel processing using `dask.distributed` with command
line flags `--dask-distributed-local-core-fraction` and
`--dask-distributed-local-memory-fraction` to control the number of cores and
memory to use on the local machine.
![\#16](https://github.com/mllam/mllam-data-prep/pull/16)


## [v0.2.0](https://github.com/mllam/mllam-data-prep/releases/tags/v0.2.0)

[All changes](https://github.com/mllam/mllam-data-prep/compare/v0.2.0...v0.1.0)
47 changes: 46 additions & 1 deletion README.md
@@ -41,12 +41,16 @@ All the linting is handelled by `pre-commit` which can be setup to automatically
pdm run pre-commit install
```

-The branch, commit, push and make a pull-request :)
Then branch, commit, push and make a pull-request :)


## Usage

The package is designed to be used as a command-line tool. The main command is `mllam-data-prep` which takes a configuration file as input and outputs a training dataset in the form of a `.zarr` dataset named from the config file (e.g. `example.danra.yaml` produces `example.danra.zarr`).
The format for the [config is described below](#configuration-file).
The package can also be used as a python module to create datasets in a more programmatic way by calling `mllam_data_prep.create_dataset()` directly (see below).

### Command-line usage

```bash
python -m mllam_data_prep example.danra.yaml
@@ -56,6 +60,47 @@ Example output:

![](docs/example_output.png)


#### Creating large datasets (with `dask.distributed`)

If you will be creating datasets larger than a few hundred MB you may want to use
`dask.distributed.LocalCluster` to parallelise the creation of the dataset. This can be done
by setting the `--dask-distributed-local-core-fraction` flag to a value
greater than `0.0` and at most `1.0` (the default of `0.0` disables it). This will
create a local `dask.distributed` cluster with the number of workers set to the
number of cores on the machine multiplied by the fraction given (rounded down).
For example, to use 50% of the cores on the machine you would run:

```bash
python -m mllam_data_prep example.danra.yaml --dask-distributed-local-core-fraction 0.5
```

Unfortunately, the number of cores to use can only be worked out by trial and
error, but a good starting point is to use 50% of the cores on the machine and
then if you notice warnings suggesting that workers are running out of memory
you should reduce the fraction of cores used (so that each worker has more
memory available).
You can also adjust the fraction of the total system memory allocated with
`--dask-distributed-local-memory-fraction` (default is `0.9`).

When you run the above command the console will print a URL to the dask
dashboard, which you can open in a browser to monitor the progress of the
dataset creation (and see the memory usage of the workers).

![example of using mllam-data-prep with dask.distributed for parallel processing](docs/using_dask_distributed.png)

### Usage as a python module

The package can also be used as a python module to create datasets directly, for example to create training datasets during training. The main function to use is `mllam_data_prep.create_dataset(config)` which takes a `mllam_data_prep.Config` as input and returns an `xarray.Dataset` object. For example:

```python
import mllam_data_prep as mdp

config_path = "example.danra.yaml"
config = mdp.Config.from_yaml_file(config_path)
ds = mdp.create_dataset(config=config)
```

## Configuration file

A full example configuration file is given in [example.danra.yaml](example.danra.yaml), and reproduced here for completeness:
Binary file modified docs/example_output.png
Binary file added docs/using_dask_distributed.png
46 changes: 45 additions & 1 deletion mllam_data_prep/__main__.py
@@ -1,20 +1,64 @@
import os
from pathlib import Path

import psutil
from dask.diagnostics import ProgressBar
from dask.distributed import LocalCluster
from loguru import logger

from .create_dataset import create_dataset_zarr

if __name__ == "__main__":
    import argparse

-    parser = argparse.ArgumentParser()
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("config", help="Path to the config file", type=Path)
    parser.add_argument(
        "--show-progress", help="Show progress bar", action="store_true"
    )
    parser.add_argument(
        "--dask-distributed-local-core-fraction",
        help="Fraction of cores to use on the local machine to do multiprocessing with dask.distributed",
        type=float,
        default=0.0,
    )
    parser.add_argument(
        "--dask-distributed-local-memory-fraction",
        help="Fraction of memory to use on the local machine (when doing multiprocessing with dask.distributed)",
        type=float,
        default=0.9,
    )
    args = parser.parse_args()

    if args.show_progress:
        ProgressBar().register()

    if args.dask_distributed_local_core_fraction > 0.0:
        # get the number of system cores
        n_system_cores = os.cpu_count()
        # compute the number of cores to use
        n_local_cores = int(args.dask_distributed_local_core_fraction * n_system_cores)
        # get the total system memory
        total_memory = psutil.virtual_memory().total
        # compute the memory per worker
        memory_per_worker = (
            total_memory / n_local_cores * args.dask_distributed_local_memory_fraction
        )

        logger.info(
            f"Setting up dask.distributed.LocalCluster with {n_local_cores} cores and {memory_per_worker/1024/1024:0.0f} MB of memory per worker"
        )

        cluster = LocalCluster(
            n_workers=n_local_cores,
            threads_per_worker=1,
            memory_limit=memory_per_worker,
        )

        client = cluster.get_client()
        # print the dashboard link
        logger.info(f"Dashboard link: {cluster.dashboard_link}")

    create_dataset_zarr(fp_config=args.config)
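
Both fraction arguments above are parsed with `type=float`, so values outside the range `0.0` to `1.0` are accepted as given. One possible way to reject them at parse time is sketched below. The `unit_fraction` helper is hypothetical and not part of this commit; the sketch relies only on the standard `argparse` behaviour that a `type` callable may raise `argparse.ArgumentTypeError`.

```python
import argparse


def unit_fraction(value):
    """Parse `value` as a float and require 0.0 <= value <= 1.0.

    Hypothetical helper for illustration; not part of the commit.
    """
    try:
        fraction = float(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"{value!r} is not a number")
    # the chained comparison is also False for NaN, so NaN is rejected too
    if not 0.0 <= fraction <= 1.0:
        raise argparse.ArgumentTypeError(f"{value!r} is not between 0.0 and 1.0")
    return fraction


parser = argparse.ArgumentParser(
    formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
    "--dask-distributed-local-core-fraction", type=unit_fraction, default=0.0
)
parser.add_argument(
    "--dask-distributed-local-memory-fraction", type=unit_fraction, default=0.9
)

# parse an explicit argument list so the sketch runs without a real command line
args = parser.parse_args(["--dask-distributed-local-core-fraction", "0.5"])
print(
    args.dask_distributed_local_core_fraction,
    args.dask_distributed_local_memory_fraction,
)
```

With a validator like this, `argparse` would report an out-of-range value as a usage error before any cluster is started.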
11 changes: 10 additions & 1 deletion mllam_data_prep/create_dataset.py
@@ -6,6 +6,7 @@
import numpy as np
import xarray as xr
from loguru import logger
from numcodecs import Blosc

from . import __version__
from .config import Config, InvalidConfigException
@@ -255,5 +256,13 @@ def create_dataset_zarr(fp_config, fp_zarr: str = None):
    if fp_zarr.exists():
        logger.info(f"Removing existing dataset at {fp_zarr}")
        shutil.rmtree(fp_zarr)
-    ds.to_zarr(fp_zarr)

    # use zstd compression since it has a good balance of speed and compression ratio
    # https://engineering.fb.com/2016/08/31/core-infra/smaller-and-faster-data-compression-with-zstandard/
    compressor = Blosc(cname="zstd", clevel=1, shuffle=Blosc.BITSHUFFLE)
    encoding = {v: {"compressor": compressor} for v in ds.data_vars}

    ds.to_zarr(fp_zarr, consolidated=True, mode="w", encoding=encoding)
    logger.info(f"Wrote training-ready dataset to {fp_zarr}")

    logger.info(ds)