marEx.helper.start_distributed_cluster

marEx.helper.start_distributed_cluster(n_workers, workers_per_node, runtime=9, node_memory=256, dashboard_address=8889, queue='compute', scratch_dir=None, account=None, verbose=None, quiet=None, **kwargs)

Start a distributed Dask cluster on a SLURM-based supercomputer.

Parameters:
  • n_workers (int) – Total number of workers to request.

  • workers_per_node (int) – Number of workers per node.

  • runtime (int, default=9) – Maximum runtime in minutes.

  • node_memory (int, default=256) – Memory per node in GB (256, 512, or 1024).

  • dashboard_address (int, default=8889) – Port for the Dask dashboard.

  • queue (str, default='compute') – SLURM queue to submit jobs to.

  • scratch_dir (str or Path, optional) – Directory to use for temporary files.

  • account (str, optional) – SLURM account to charge. Defaults to DKRZ_ACCOUNT.

  • verbose (bool | None)

  • quiet (bool | None)

  • **kwargs – Additional keyword arguments to pass to SLURMCluster.

Returns:

Dask client connected to the distributed cluster.

Return type:

Client

Examples

Basic SLURM cluster for medium-scale processing:

>>> import marEx
>>>
>>> # Start cluster with 16 workers on 4 nodes (4 workers per node)
>>> client = marEx.start_distributed_cluster(
...     n_workers=16,
...     workers_per_node=4,
...     runtime=60,      # 1 hour
...     node_memory=256  # 256 GB nodes (supported values: 256, 512, 1024)
... )
>>> print(f"Cluster: {client}")
>>> # Access dashboard via SSH tunnel
>>> cluster_info = marEx.get_cluster_info(client)
>>> client.close()
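
The node count in the comment above (16 workers at 4 per node gives 4 nodes) follows from a ceiling division. The sketch below illustrates that arithmetic only. `nodes_needed` is a hypothetical helper, not part of marEx, and it assumes that a partially filled node is still requested in full; how marEx itself handles a worker count that is not a multiple of `workers_per_node` is not stated here.

```python
import math


def nodes_needed(n_workers: int, workers_per_node: int) -> int:
    """Number of whole nodes needed to host n_workers (hypothetical helper)."""
    if n_workers < 1 or workers_per_node < 1:
        raise ValueError("n_workers and workers_per_node must be positive")
    # Round up: assume a partially filled node still has to be requested.
    return math.ceil(n_workers / workers_per_node)


# Configuration from the example above: 16 workers, 4 per node.
print(nodes_needed(16, 4))
```

Checking this number before submitting can help keep a request within the node limit of the chosen queue.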

Processing a large marEx workflow on HPC:

>>> # Start cluster for full marEx pipeline
>>> client = marEx.start_distributed_cluster(
...     n_workers=32,
...     workers_per_node=8,
...     runtime=60,  # 1 hour
...     node_memory=256   # 256GB nodes
... )
>>>
>>> # Load very large SST dataset
>>> import xarray as xr
>>> sst = xr.open_zarr('/work/data/large_sst.zarr').chunk({'time': 25})
>>>
>>> # Preprocess with distributed computing
>>> processed = marEx.preprocess_data(
...     sst,
...     method_anomaly="shifting_baseline",
...     threshold_percentile=95
... )
>>>
>>> # Track events using full cluster
>>> tracker = marEx.tracker(
...     processed.extreme_events,
...     processed.mask,
...     R_fill=12,
...     area_filter_quartile=0.3
... )
>>> events = tracker.run()
>>>
>>> # Save results
>>> events.to_zarr('/work/results/tracked_events.zarr')
>>> client.close()

Custom SLURM configuration:

>>> # Advanced SLURM configuration
>>> client = marEx.start_distributed_cluster(
...     n_workers=64,
...     workers_per_node=32,
...     runtime=20,           # 20 minutes
...     node_memory=256,
...     account='myproject',   # Custom account
...     queue='debug',           # debug queue
... )
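
Because `runtime` is given in minutes, it can be useful to see what a value corresponds to as a SLURM-style `HH:MM:SS` time limit. The following is a minimal sketch of that conversion. `minutes_to_walltime` is a hypothetical helper, and whether marEx formats the limit this way internally is an assumption.

```python
def minutes_to_walltime(minutes: int) -> str:
    """Format a runtime in minutes as HH:MM:SS (hypothetical helper)."""
    if minutes < 0:
        raise ValueError("minutes must be non-negative")
    # Split the total into whole hours and leftover minutes.
    hours, mins = divmod(minutes, 60)
    return f"{hours:02d}:{mins:02d}:00"


# runtime=20 from the example above, and the default runtime=9.
print(minutes_to_walltime(20))
print(minutes_to_walltime(9))
```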

Dashboard access and monitoring:

>>> # Start cluster and set up monitoring
>>> client = marEx.start_distributed_cluster(
...     n_workers=16,
...     workers_per_node=4,
...     dashboard_address=8890  # Custom dashboard port
... )
>>>
>>> # Get connection info for SSH tunneling
>>> info = marEx.get_cluster_info(client)
>>> print(f"SSH tunnel: ssh -L {info['port']}:localhost:{info['port']} {info['hostname']}")
>>> print(f"Dashboard: {info['dashboard_link']}")
>>>
>>> # Monitor cluster performance
>>> workers = client.scheduler_info()['workers']
>>> print(f"Workers: {len(workers)}")
>>> total_memory_gb = sum(w['memory_limit'] for w in workers.values()) / 1e9
>>> print(f"Total memory: {total_memory_gb:.1f} GB")
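
The SSH tunnel string printed above can also be built by a small function, which makes it easier to validate the port first. This is a sketch under assumptions: `ssh_tunnel_command` and its `user` parameter are hypothetical, and the hostname in the usage line is a placeholder rather than a real login node.

```python
from typing import Optional


def ssh_tunnel_command(port: int, hostname: str, user: Optional[str] = None) -> str:
    """Build an SSH local port-forwarding command for the Dask dashboard."""
    if not 1 <= port <= 65535:
        raise ValueError(f"port must be in 1..65535, got {port}")
    # Prefix the login name only when one is given (hypothetical parameter).
    target = f"{user}@{hostname}" if user else hostname
    # Forward the same port number locally, as in the example above.
    return f"ssh -L {port}:localhost:{port} {target}"


# Placeholder hostname; in practice use the value from get_cluster_info.
print(ssh_tunnel_command(8890, "node001.example.org"))
```

Once the tunnel is open, the dashboard should be reachable in a local browser on the forwarded port.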

Memory optimisation strategies:

>>> # Alternative configurations for different workload types
>>>
>>> # Memory-intensive: fewer workers per node
>>> memory_client = marEx.start_distributed_cluster(
...     n_workers=8, workers_per_node=2, node_memory=512
... )
>>>
>>> # CPU-intensive: more workers per node
>>> cpu_client = marEx.start_distributed_cluster(
...     n_workers=64, workers_per_node=16, node_memory=256
... )
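
The trade-off between the two configurations above is easiest to see as memory per worker. The sketch below assumes node memory is divided evenly among the workers on a node. `memory_per_worker_gb` is a hypothetical helper, and the limit marEx actually assigns may be lower if some memory is reserved for the system.

```python
def memory_per_worker_gb(node_memory: int, workers_per_node: int) -> float:
    """Upper bound on memory per worker in GB, assuming an even split."""
    if workers_per_node < 1:
        raise ValueError("workers_per_node must be positive")
    # Even split is an assumption; real limits may leave headroom.
    return node_memory / workers_per_node


# Memory-intensive configuration: 512 GB nodes, 2 workers per node.
print(memory_per_worker_gb(512, 2))
# CPU-intensive configuration: 256 GB nodes, 16 workers per node.
print(memory_per_worker_gb(256, 16))
```

Under this assumption the memory-intensive layout gives each worker sixteen times the memory of the CPU-intensive one, at the cost of fewer workers per node.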