marEx.helper.start_distributed_cluster
- marEx.helper.start_distributed_cluster(n_workers, workers_per_node, runtime=9, node_memory=256, dashboard_address=8889, queue='compute', scratch_dir=None, account=None, verbose=None, quiet=None, **kwargs)
Start a distributed Dask cluster on a SLURM-based supercomputer.
- Parameters:
n_workers (int) – Total number of workers to request.
workers_per_node (int) – Number of workers per node.
runtime (int, default=9) – Maximum runtime in minutes.
node_memory (int, default=256) – Memory per node in GB (256, 512, or 1024).
dashboard_address (int, default=8889) – Port for the Dask dashboard.
queue (str, default='compute') – SLURM queue to submit jobs to.
scratch_dir (str or Path, optional) – Directory to use for temporary files.
account (str, optional) – SLURM account to charge. Defaults to DKRZ_ACCOUNT.
verbose (bool | None)
quiet (bool | None)
**kwargs – Additional keyword arguments passed to SLURMCluster.
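The resource parameters interact. n_workers and workers_per_node together fix how many nodes are requested, and node_memory is shared among the workers on each node. Here is a minimal sketch of that arithmetic. It assumes one SLURM job per node, with the node's memory divided evenly across its workers (as dask-jobqueue does when a job runs several processes). `plan_resources` is a hypothetical helper, not part of marEx.

```python
import math

# Node sizes listed for the node_memory parameter (GB).
ALLOWED_NODE_MEMORY_GB = (256, 512, 1024)


def plan_resources(n_workers: int, workers_per_node: int, node_memory: int = 256) -> dict:
    """Hypothetical helper: derive node count and per-worker memory.

    Assumes one SLURM job per node and an even memory split across the
    workers on that node; this is not taken from the marEx source.
    """
    if node_memory not in ALLOWED_NODE_MEMORY_GB:
        raise ValueError(f"node_memory must be one of {ALLOWED_NODE_MEMORY_GB}, got {node_memory}")
    if n_workers <= 0 or workers_per_node <= 0:
        raise ValueError("n_workers and workers_per_node must be positive")
    # Round up so that every requested worker gets a slot on some node.
    n_nodes = math.ceil(n_workers / workers_per_node)
    # Even split of the node's memory among the workers it hosts.
    memory_per_worker_gb = node_memory / workers_per_node
    return {"n_nodes": n_nodes, "memory_per_worker_gb": memory_per_worker_gb}


print(plan_resources(16, 4, 256))  # {'n_nodes': 4, 'memory_per_worker_gb': 64.0}
```

In practice the usable memory per worker is somewhat lower, because the operating system and Dask itself need a share of each node.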
- Returns:
Dask client connected to the distributed cluster.
- Return type:
Client
Examples
Basic SLURM cluster for medium-scale processing:
>>> import marEx
>>>
>>> # Start cluster with 16 workers on 4 nodes (4 workers per node)
>>> client = marEx.start_distributed_cluster(
...     n_workers=16,
...     workers_per_node=4,
...     runtime=60,  # 1 hour
...     node_memory=256  # 256 GB nodes
... )
>>> print(f"Cluster: {client}")
>>> # Get connection details for an SSH tunnel to the dashboard
>>> cluster_info = marEx.get_cluster_info(client)
>>> client.close()
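runtime is given in minutes. SLURM's time limit is commonly written as HH:MM:SS, so runtime=60 in the example above corresponds to a one-hour limit. The sketch below shows that conversion. `minutes_to_walltime` is a hypothetical helper, and it is an assumption that marEx formats the walltime this way internally.

```python
def minutes_to_walltime(runtime: int) -> str:
    """Hypothetical helper: convert a runtime in minutes to an HH:MM:SS string."""
    if runtime <= 0:
        raise ValueError("runtime must be a positive number of minutes")
    # Split total minutes into whole hours and remaining minutes.
    hours, minutes = divmod(runtime, 60)
    return f"{hours:02d}:{minutes:02d}:00"


# The default runtime and the runtimes used in these examples.
for runtime in (9, 20, 60):
    print(runtime, "->", minutes_to_walltime(runtime))
```

This prints `00:09:00`, `00:20:00` and `01:00:00`. Note that the default of 9 minutes is short, so longer workflows should set runtime explicitly.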
Processing a large marEx workflow on HPC:
>>> # Start cluster for full marEx pipeline
>>> client = marEx.start_distributed_cluster(
...     n_workers=32,
...     workers_per_node=8,
...     runtime=60,  # 1 hour
...     node_memory=256  # 256 GB nodes
... )
>>>
>>> # Load very large SST dataset and select the SST variable (name 'sst' assumed)
>>> import xarray as xr
>>> sst = xr.open_zarr('/work/data/large_sst.zarr')['sst'].chunk({'time': 25})
>>>
>>> # Preprocess with distributed computing
>>> processed = marEx.preprocess_data(
...     sst,
...     method_anomaly="shifting_baseline",
...     threshold_percentile=95
... )
>>>
>>> # Track events using full cluster
>>> tracker = marEx.tracker(
...     processed.extreme_events,
...     processed.mask,
...     R_fill=12,
...     area_filter_quartile=0.3
... )
>>> events = tracker.run()
>>>
>>> # Save results
>>> events.to_zarr('/work/results/tracked_events.zarr')
>>> client.close()
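Chunking along time with `.chunk({'time': 25})` determines how much data each task holds in memory. This is a rough sketch for checking that one input chunk fits comfortably within a worker's share of node memory. The grid size (a hypothetical 0.1-degree global grid), the float32 dtype and the headroom factor are illustrative assumptions, not values taken from marEx.

```python
def chunk_size_gb(time_chunk: int, ny: int, nx: int, itemsize: int = 4) -> float:
    """Size in GB of one (time, y, x) chunk; itemsize=4 corresponds to float32."""
    return time_chunk * ny * nx * itemsize / 1e9


# Hypothetical 0.1-degree global grid (1800 x 3600 points), 25 time steps per chunk.
size = chunk_size_gb(25, 1800, 3600)
memory_per_worker_gb = 256 / 8  # node_memory=256 split over workers_per_node=8, as above

# Leave headroom: intermediate results often need several times the input chunk.
headroom = 10  # illustrative safety factor, not a marEx setting
print(f"chunk: {size:.3f} GB, worker memory: {memory_per_worker_gb:.0f} GB")
print("fits with headroom:", size * headroom < memory_per_worker_gb)
```

If the check fails, either reduce the time chunk or use fewer workers per node so that each worker gets more memory.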
Custom SLURM configuration:
>>> # Advanced SLURM configuration
>>> client = marEx.start_distributed_cluster(
...     n_workers=64,
...     workers_per_node=32,
...     runtime=20,  # 20 minutes
...     node_memory=256,
...     account='myproject',  # Custom account
...     queue='debug',  # debug queue
... )
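Because the job is charged to `account`, it can help to estimate how much of the allocation a request may consume before submitting it. This is a minimal sketch that assumes one node per job and billing in node-hours. It gives an upper bound, assuming the site charges for elapsed time and the job runs for the full requested runtime. Actual accounting rules are site-specific, and `node_hours` is a hypothetical helper.

```python
import math


def node_hours(n_workers: int, workers_per_node: int, runtime: int) -> float:
    """Hypothetical helper: node-hours reserved if every node runs for the full runtime (minutes)."""
    # One node per job, rounded up so that all workers are placed.
    n_nodes = math.ceil(n_workers / workers_per_node)
    return n_nodes * runtime / 60


# The debug-queue configuration above: 64 workers at 32 per node for 20 minutes.
print(f"{node_hours(64, 32, 20):.2f} node-hours")  # 0.67 node-hours
```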
Dashboard access and monitoring:
>>> # Start cluster and set up monitoring
>>> client = marEx.start_distributed_cluster(
...     n_workers=16,
...     workers_per_node=4,
...     dashboard_address=8890  # Custom dashboard port
... )
>>>
>>> # Get connection info for SSH tunneling
>>> info = marEx.get_cluster_info(client)
>>> print(f"SSH tunnel: ssh -L {info['port']}:localhost:{info['port']} {info['hostname']}")
>>> print(f"Dashboard: {info['dashboard_link']}")
>>>
>>> # Monitor cluster performance
>>> workers = client.scheduler_info()['workers']
>>> print(f"Workers: {len(workers)}")
>>> print(f"Total memory: {sum(w['memory_limit'] for w in workers.values()) / 1e9:.1f} GB")
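The monitoring lines in this example read `client.scheduler_info()`, whose `workers` entry maps each worker address to a dictionary that includes `memory_limit` in bytes. The sketch below wraps that summary in a function and exercises it on a hand-written dictionary of the same shape, so it runs without a cluster. `summarise_workers` is hypothetical, and the sample addresses and limits are made up.

```python
def summarise_workers(scheduler_info: dict) -> tuple[int, float]:
    """Return (number of workers, total memory limit in GB) from a scheduler_info() dict."""
    workers = scheduler_info.get("workers", {})
    # memory_limit is reported per worker in bytes.
    total_bytes = sum(w.get("memory_limit", 0) for w in workers.values())
    return len(workers), total_bytes / 1e9


# Hand-written stand-in for client.scheduler_info(); on a real cluster pass that result instead.
fake_info = {
    "workers": {
        "tcp://10.0.0.1:40001": {"memory_limit": 64_000_000_000},
        "tcp://10.0.0.1:40002": {"memory_limit": 64_000_000_000},
    }
}
n, total_gb = summarise_workers(fake_info)
print(f"Workers: {n}, total memory: {total_gb:.1f} GB")  # Workers: 2, total memory: 128.0 GB
```

Calling `client.scheduler_info()` once and reusing the result avoids querying the scheduler twice for the same snapshot.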
Memory optimisation strategies:
>>> # Optimise for different workload types
>>>
>>> # Memory-intensive: fewer workers per node
>>> memory_cluster = marEx.start_distributed_cluster(
...     n_workers=8, workers_per_node=2, node_memory=512
... )
>>>
>>> # CPU-intensive: more workers per node
>>> cpu_cluster = marEx.start_distributed_cluster(
...     n_workers=64, workers_per_node=16, node_memory=256
... )
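The trade-off between these two configurations comes down to memory per worker. Under an even split, 512 GB over 2 workers gives 256 GB each, whereas 256 GB over 16 workers gives 16 GB each. To work backwards from a target per-worker memory, a hypothetical helper might pick the largest `workers_per_node` that still meets that target. This assumes node memory is split evenly and ignores operating-system and Dask overhead.

```python
def max_workers_per_node(node_memory: int, min_worker_memory_gb: float) -> int:
    """Hypothetical helper: most workers per node such that each gets at least min_worker_memory_gb."""
    if min_worker_memory_gb <= 0:
        raise ValueError("min_worker_memory_gb must be positive")
    # Floor division: any extra worker would push each share below the target.
    count = int(node_memory // min_worker_memory_gb)
    if count < 1:
        raise ValueError("a single worker would not get enough memory on this node size")
    return count


print(max_workers_per_node(512, 200))  # memory-intensive: 2 workers per node
print(max_workers_per_node(256, 16))   # CPU-intensive: 16 workers per node
```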