Helper Module (marEx.helper)

The marEx.helper module provides utilities for high-performance computing (HPC) environments, particularly for managing Dask clusters on SLURM systems and optimising performance for large-scale data processing with marEx workflows.

Overview

The helper module is designed to simplify the deployment and management of marEx workflows on supercomputing systems. It provides tools for cluster management, memory optimisation, and performance tuning specifically tailored for DKRZ Levante and similar HPC environments.

Key Features:

  • SLURM Integration: Seamless integration with SLURM job schedulers

  • Automated Cluster Management: Start, configure, and monitor Dask clusters

  • Memory Optimisation: Intelligent memory management for large datasets

  • Performance Tuning: Optimised configurations for HPC environments

  • Dashboard Integration: Jupyter dashboard tunneling and monitoring

  • Resource Detection: Automatic system resource detection and optimisation

Main Functions

start_distributed_cluster(n_workers, ...[, ...])

Start a distributed Dask cluster on a SLURM-based supercomputer.

start_local_cluster([n_workers, ...])

Start a local Dask cluster.

configure_dask([scratch_dir, config])

Configure Dask with appropriate settings for HPC environments.

get_cluster_info(client)

Get and print cluster connection information.

Detailed Documentation

Cluster Management

marEx.helper.start_distributed_cluster(n_workers, workers_per_node, runtime=9, node_memory=256, dashboard_address=8889, queue='compute', scratch_dir=None, account=None, verbose=None, quiet=None, **kwargs)[source]

Start a distributed Dask cluster on a SLURM-based supercomputer.

Parameters:
  • n_workers (int) – Total number of workers to request.

  • workers_per_node (int) – Number of workers per node.

  • runtime (int, default=9) – Maximum runtime in minutes.

  • node_memory (int, default=256) – Memory per node in GB (256, 512, or 1024).

  • dashboard_address (int, default=8889) – Port for the Dask dashboard.

  • queue (str, default='compute') – SLURM queue to submit jobs to.

  • scratch_dir (str or Path, optional) – Directory to use for temporary files.

  • account (str, optional) – SLURM account to charge. Defaults to DKRZ_ACCOUNT.

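  • verbose (bool, optional) – Enable verbose logging with detailed cluster startup information. If None, uses current global logging configuration.

  • quiet (bool, optional) – Enable quiet logging with minimal output (warnings and errors only). If None, uses current global logging configuration.

  • **kwargs – Additional keyword arguments to pass to SLURMCluster.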

Returns:

Dask client connected to the distributed cluster.

Return type:

Client

Examples

Basic SLURM cluster for medium-scale processing:

>>> import marEx
>>>
>>> # Start cluster with 16 workers on 4 nodes (4 workers per node)
>>> client = marEx.start_distributed_cluster(
...     n_workers=16,
...     workers_per_node=4,
...     runtime=60,      # 1 hour
...     node_memory=256   # 256GB nodes
... )
>>> print(f"Cluster: {client}")
>>> # Access dashboard via SSH tunnel
>>> cluster_info = marEx.get_cluster_info(client)
>>> client.close()

Processing large marEx workflow on HPC:

>>> # Start cluster for full marEx pipeline
>>> client = marEx.start_distributed_cluster(
...     n_workers=32,
...     workers_per_node=8,
...     runtime=60,  # 1 hour
...     node_memory=256   # 256GB nodes
... )
>>>
>>> # Load very large SST dataset
>>> import xarray as xr
>>> sst = xr.open_zarr('/work/data/large_sst.zarr').sst.chunk({'time': 25})
>>>
>>> # Preprocess with distributed computing
>>> processed = marEx.preprocess_data(
...     sst,
...     method_anomaly="shifting_baseline",
...     threshold_percentile=95
... )
>>>
>>> # Track events using full cluster
>>> tracker = marEx.tracker(
...     processed.extreme_events,
...     processed.mask,
...     R_fill=12,
...     area_filter_quartile=0.3
... )
>>> events = tracker.run()
>>>
>>> # Save results
>>> events.to_zarr('/work/results/tracked_events.zarr')
>>> client.close()

Custom SLURM configuration:

>>> # Advanced SLURM configuration
>>> client = marEx.start_distributed_cluster(
...     n_workers=64,
...     workers_per_node=32,
...     runtime=20,           # 20 minutes
...     node_memory=256,
...     account='myproject',   # Custom account
...     queue='debug',           # debug queue
... )

Dashboard access and monitoring:

>>> # Start cluster and set up monitoring
>>> client = marEx.start_distributed_cluster(
...     n_workers=16,
...     workers_per_node=4,
...     dashboard_address=8890  # Custom dashboard port
... )
>>>
>>> # Get connection info for SSH tunneling
>>> info = marEx.get_cluster_info(client)
>>> print(f"SSH tunnel: ssh -L {info['port']}:localhost:{info['port']} {info['hostname']}")
>>> print(f"Dashboard: {info['dashboard_link']}")
>>>
>>> # Monitor cluster performance
>>> print(f"Workers: {len(client.scheduler_info()['workers'])}")
>>> print(f"Total memory: {sum(w['memory_limit'] for w in client.scheduler_info()['workers'].values()) / 1e9:.1f} GB")

Memory optimisation strategies:

>>> # Optimise for different workload types (start one cluster or the other)
>>>
>>> # Memory-intensive: fewer workers per node, so each worker gets more memory
>>> client = marEx.start_distributed_cluster(
...     n_workers=8, workers_per_node=2, node_memory=512
... )
>>> client.close()
>>>
>>> # CPU-intensive: more workers per node
>>> client = marEx.start_distributed_cluster(
...     n_workers=64, workers_per_node=16, node_memory=256
... )
>>> client.close()
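
The relationship between n_workers, workers_per_node and node_memory in the examples above can be checked before a job is submitted. The following is a minimal sketch, not part of marEx: plan_cluster is a hypothetical helper, and it assumes that workers are packed onto nodes and that each node's memory is shared evenly among its workers.

```python
import math


def plan_cluster(n_workers, workers_per_node, node_memory_gb=256, runtime_min=9):
    """Estimate the resources implied by a cluster request (hypothetical helper)."""
    if not (n_workers >= 1 and workers_per_node >= 1):
        raise ValueError("n_workers and workers_per_node must be positive")
    # Partially filled nodes still count as whole nodes
    n_nodes = math.ceil(n_workers / workers_per_node)
    return {
        "n_nodes": n_nodes,
        "memory_per_worker_gb": node_memory_gb / workers_per_node,
        "node_hours": n_nodes * runtime_min / 60,
    }


plan = plan_cluster(n_workers=16, workers_per_node=4, node_memory_gb=256, runtime_min=60)
print(plan)
```

For the first example above (16 workers, 4 per node, 60 minutes) this gives 4 nodes, 64 GB per worker and 4 node-hours. Lowering workers_per_node raises the memory available to each worker at the cost of more nodes.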

marEx.helper.start_local_cluster(n_workers=4, threads_per_worker=1, scratch_dir=None, verbose=None, quiet=None, **kwargs)[source]

Start a local Dask cluster.

Parameters:
  • n_workers (int, default=4) – Number of worker processes to start.

  • threads_per_worker (int, default=1) – Number of threads per worker.

  • scratch_dir (str or Path, optional) – Directory to use for temporary files.

  • verbose (bool, optional) – Enable verbose logging with detailed cluster startup information. If None, uses current global logging configuration.

  • quiet (bool, optional) – Enable quiet logging with minimal output (warnings and errors only). If None, uses current global logging configuration. Note: quiet takes precedence over verbose if both are True.

  • **kwargs – Additional keyword arguments to pass to LocalCluster.

Returns:

Dask client connected to the local cluster.

Return type:

Client

Examples

Basic local cluster for development:

>>> import marEx
>>>
>>> # Start simple local cluster
>>> client = marEx.start_local_cluster(n_workers=2, threads_per_worker=1)
>>> print(client)
<Client: 'tcp://127.0.0.1:xxxxx' processes=2 threads=2, memory=15.7 GB>
>>>
>>> # Check cluster status
>>> print(f"Dashboard: {client.dashboard_link}")
Dashboard: http://127.0.0.1:8787/status
>>>
>>> client.close()

Optimised cluster for CPU-intensive work:

>>> # Use one worker per physical core
>>> client = marEx.start_local_cluster(
...     n_workers=8,           # Number of physical cores
...     threads_per_worker=1   # Avoid hyperthreading for compute
... )
>>>
>>> # Process data with the cluster
>>> import xarray as xr
>>> data = xr.open_dataset('large_data.nc').chunk({'time': 25})
>>> result = data.mean().compute()
>>> client.close()

Memory-optimised cluster:

>>> # Configure for large datasets
>>> client = marEx.start_local_cluster(
...     n_workers=4,
...     threads_per_worker=2,
...     memory_limit='8GB',      # Limit memory per worker
...     scratch_dir='/tmp/dask'  # Fast node-local storage for spilled data
... )

Integration with marEx preprocessing:

>>> # Start cluster then process data
>>> client = marEx.start_local_cluster(n_workers=16)
>>>
>>> # Load and preprocess SST data
>>> sst = xr.open_dataset('sst_data.nc').sst.chunk({'time': 30})
>>> processed = marEx.preprocess_data(sst, threshold_percentile=95)
>>>
>>> # Track events using the cluster
>>> tracker = marEx.tracker(
...     processed.extreme_events,
...     processed.mask,
...     R_fill=8,
...     area_filter_quartile=0.5
... )
>>> events = tracker.run()
>>>
>>> print(f"Processed {len(sst.time)} time steps with {sum(client.nthreads().values())} threads")
>>> client.close()

Custom worker configuration:

>>> # Advanced configuration for specific workloads
>>> client = marEx.start_local_cluster(
...     n_workers=4,
...     threads_per_worker=2,
...     processes=True,         # Use separate processes (default)
...     silence_logs=False,     # Keep logs for debugging
...     dashboard_address=':8787'  # Specific dashboard port
... )

Configuration and Utilities

marEx.helper.configure_dask(scratch_dir=None, config=None)[source]

Configure Dask with appropriate settings for HPC environments.

Parameters:
  • scratch_dir (str or Path, optional) – Directory to use for temporary files.

  • config (dict, optional) – Additional Dask configuration settings to apply.

Returns:

Temporary directory object that should be kept alive while Dask is in use.

Return type:

TemporaryDirectory
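
Because the return value is a temporary-directory object, the scratch directory can be expected to exist only while that object is alive. The sketch below uses only the standard library to illustrate this lifetime; it does not call configure_dask, and it assumes the returned object behaves like a standard tempfile.TemporaryDirectory.

```python
import os
import tempfile

# Stand-in for the object returned by configure_dask (assumption: it is a
# standard tempfile.TemporaryDirectory)
temp_dir = tempfile.TemporaryDirectory(prefix="dask-scratch-")
scratch_path = temp_dir.name

exists_while_referenced = os.path.isdir(scratch_path)

# Explicit cleanup removes the directory and its contents; the same happens
# when the object is garbage collected
temp_dir.cleanup()
exists_after_cleanup = os.path.isdir(scratch_path)

print(exists_while_referenced, exists_after_cleanup)
```

In practice this means binding the result to a name, for example temp_dir = configure_dask(...), and keeping that name in scope until the client is closed.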

marEx.helper.get_cluster_info(client)[source]

Get and print cluster connection information.

Parameters:

client (Client) – Dask client connected to a cluster.

Returns:

Dictionary containing connection information.

Return type:

dict

Examples

Basic cluster info retrieval:

>>> import marEx
>>>
>>> # Start local cluster
>>> client = marEx.start_local_cluster(n_workers=2)
>>>
>>> # Get connection information
>>> info = marEx.get_cluster_info(client)
Hostname: login01
Forward Port: login01:8787
Dashboard Link: localhost:8787/status
>>>
>>> print(f"Connect via: {info['dashboard_link']}")
Connect via: localhost:8787/status
>>> client.close()

SSH tunneling for remote access:

>>> # Start cluster on HPC system
>>> client = marEx.start_distributed_cluster(
...     n_workers=8, workers_per_node=4, dashboard_address=8889
... )
>>>
>>> # Get tunneling information
>>> info = marEx.get_cluster_info(client)
Hostname: levante-login01
Forward Port: levante-login01:8889
Dashboard Link: localhost:8889/status
>>>
>>> # Use this info to set up SSH tunnel:
>>> # ssh -L 8889:localhost:8889 levante-login01.dkrz.de
>>> # Then access dashboard at localhost:8889/status
>>>
>>> client.close()
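
The tunnel command in the comments above can also be assembled from the returned dictionary. This is a minimal sketch: build_tunnel_command is a hypothetical helper, and the 'hostname' and 'port' keys are assumed from their use in the dashboard example earlier on this page.

```python
def build_tunnel_command(hostname, port, local_port=None, domain=None):
    """Build an ssh port-forwarding command string (hypothetical helper)."""
    local_port = port if local_port is None else local_port
    remote = f"{hostname}.{domain}" if domain else hostname
    return f"ssh -L {local_port}:localhost:{port} {remote}"


# Values copied from the example output above
info = {"hostname": "levante-login01", "port": 8889}
command = build_tunnel_command(info["hostname"], info["port"], domain="dkrz.de")
print(command)
```

Passing a different local_port is useful when the remote port number is already taken on the local machine.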

Monitoring cluster status:

>>> client = marEx.start_local_cluster(n_workers=4)
>>> info = marEx.get_cluster_info(client)
>>>
>>> # Access cluster details
>>> print(f"Dashboard URL: {client.dashboard_link}")
>>> print(f"Cluster address: {client.cluster.scheduler_address}")
>>> print(f"Total threads: {sum(client.nthreads().values())}")
>>>
>>> # Use port for programmatic access
>>> import requests
>>> try:
...     response = requests.get(f"http://localhost:{info['port']}/info")
...     print(f"Scheduler info: {response.status_code}")
... except requests.RequestException:
...     print("Dashboard not accessible")
>>>
>>> client.close()

Basic Usage Examples

SLURM Cluster Setup

import marEx
from marEx.helper import start_distributed_cluster

# Start a SLURM cluster for marEx workflows.
# start_distributed_cluster returns a Dask client (see the signature above).
client = start_distributed_cluster(
    n_workers=20,
    workers_per_node=4,
    runtime=120,                  # Maximum runtime in minutes (2 hours)
    node_memory=256,              # Memory per node in GB
    queue='compute',
    account='your_project_id'
)

# Now run marEx workflows with the cluster
# (sst_data is an SST dataset loaded beforehand)
extremes_ds = marEx.preprocess_data(
    sst_data,
    threshold_percentile=95,
    dask_chunks={'time': 365, 'lat': 200, 'lon': 200}
)

# Close the client when done
client.close()

Local Cluster Setup

import marEx
from marEx.helper import start_local_cluster

# Start a local cluster; extra keyword arguments are passed to LocalCluster
client = start_local_cluster(
    n_workers=8,
    threads_per_worker=2,
    memory_limit='32GB',          # Memory limit per worker
    dashboard_address=':8787'
)

# Get cluster information
cluster_info = marEx.helper.get_cluster_info(client)
workers = client.scheduler_info()['workers']
print(f"Dashboard: {cluster_info['dashboard_link']}")
print(f"Workers: {len(workers)}")
print(f"Total Memory: {sum(w['memory_limit'] for w in workers.values()) / 1e9:.1f} GB")

Dask Configuration

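from marEx.helper import configure_dask

# Configure Dask for an HPC environment.
# Keep the returned temporary-directory object alive while Dask is in use.
temp_dir = configure_dask(scratch_dir='/scratch/tmp')

# Per-worker settings such as memory_limit, threads_per_worker and
# dashboard_address are set when starting the cluster
# (see start_local_cluster and start_distributed_cluster).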

Advanced HPC Configurations

DKRZ Levante Optimisation

# Configuration for the DKRZ Levante supercomputer
client = start_distributed_cluster(
    n_workers=40,
    workers_per_node=8,
    runtime=240,                  # 4 hours, in minutes
    node_memory=256,              # 256 GB nodes, about 32 GB per worker
    queue='compute',
    account='your_project',
    # Extra keyword arguments are passed to SLURMCluster. The names below
    # follow dask_jobqueue; check them against your installed version.
    # dask_jobqueue adds the '#SBATCH' prefix to each directive itself.
    job_extra_directives=['--exclusive'],
    worker_extra_args=[
        '--local-directory=/scratch/tmp',
        '--memory-limit=30GB'  # Leave memory for system overhead
    ]
)

Multi-Node Scaling

# Large-scale configuration for century-long datasets
client = start_distributed_cluster(
    n_workers=100,                # Scale to many workers
    workers_per_node=16,
    runtime=480,                  # 8 hours, in minutes
    node_memory=256,              # 256 GB nodes, about 16 GB per worker
    queue='compute',
    account='projectID'
)

# Optional adaptive scaling between 20 and 200 workers.
# Assumption: client.cluster exposes the underlying SLURMCluster, whose
# adapt() method is provided by Dask.
client.cluster.adapt(minimum=20, maximum=200)

Performance Tuning

Chunk Size Optimisation

import numpy as np

def calculate_optimal_chunks(data_shape, n_workers, memory_per_worker_gb):
    """Calculate chunk sizes for marEx workflows.

    data_shape is ordered (time, lat, lon). The spatial estimate is an upper
    bound: it sizes one chunk to a worker's entire memory, so practical
    chunks should be considerably smaller.
    """
    # Time chunks should be manageable for individual workers (at least 1)
    time_chunks = max(1, min(365, data_shape[0] // n_workers))

    # Spatial chunks based on available memory
    # Assuming 8 bytes per float64 value
    max_spatial_elements = (memory_per_worker_gb * 1e9 / 8) / time_chunks
    spatial_chunks = int(np.sqrt(max_spatial_elements))

    return {
        'time': time_chunks,
        'lat': min(spatial_chunks, data_shape[1]),
        'lon': min(spatial_chunks, data_shape[2])
    }

# Use with marEx preprocessing
chunks = calculate_optimal_chunks(
    sst.shape,
    n_workers=20,
    memory_per_worker_gb=16
)

extremes_ds = marEx.preprocess_data(
    sst,
    threshold_percentile=95,
    dask_chunks=chunks
)
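
A chunk layout can be sanity-checked by computing how many bytes one chunk occupies and comparing that with the memory of a single worker. The sketch below is illustrative only: chunk_nbytes and fits_in_worker are hypothetical helpers, and the 10% budget per chunk is an assumed rule of thumb, not a marEx or Dask setting.

```python
import math


def chunk_nbytes(chunks, itemsize=8):
    """Size in bytes of one chunk, given a dict of dimension name to chunk length."""
    return math.prod(chunks.values()) * itemsize


def fits_in_worker(chunks, memory_per_worker_gb, max_fraction=0.1, itemsize=8):
    """True if one chunk uses at most max_fraction of a worker's memory."""
    budget = memory_per_worker_gb * 1e9 * max_fraction
    return budget >= chunk_nbytes(chunks, itemsize)


# Example layout: 25 time steps of a 720 x 1440 grid stored as float64
example_chunks = {"time": 25, "lat": 720, "lon": 1440}
size_mb = chunk_nbytes(example_chunks) / 1e6
print(f"{size_mb:.1f} MB per chunk")
print(fits_in_worker(example_chunks, memory_per_worker_gb=16))
```

Each worker usually holds several chunks and intermediate results at once, which is why the check compares a chunk against a fraction of worker memory and not the whole of it.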

Memory Management

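# Configure Dask for memory-constrained environments.
# Assumption: the config dictionary uses Dask's own configuration keys.
temp_dir = configure_dask(
    # Temporary storage
    scratch_dir='/scratch/dask-tmp',
    config={
        # Memory thresholds, as fractions of each worker's memory limit
        'distributed.worker.memory.target': 0.8,    # Use 80% of available memory
        'distributed.worker.memory.spill': 0.9,     # Spill at 90% usage
        'distributed.worker.memory.pause': 0.95,    # Pause at 95% usage

        # Network optimisation
        'distributed.comm.timeouts.tcp': '300s',
        'distributed.client.heartbeat': '5s'
    }
)

# The per-worker memory limit itself (e.g. memory_limit='32GB') is set when
# starting the cluster, not through configure_dask.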
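
To see what the fractions above mean in absolute terms, they can be multiplied by the per-worker memory limit. This is a small arithmetic sketch; memory_thresholds is a hypothetical helper and not part of marEx or Dask.

```python
def memory_thresholds(memory_limit_gb, target=0.8, spill=0.9, pause=0.95):
    """Convert fractional memory thresholds into absolute sizes in GB."""
    if not (1 >= pause >= spill >= target > 0):
        raise ValueError("expected fractions ordered as target, spill, pause within (0, 1]")
    return {
        "target_gb": memory_limit_gb * target,
        "spill_gb": memory_limit_gb * spill,
        "pause_gb": memory_limit_gb * pause,
    }


limits = memory_thresholds(32)
for name, value in limits.items():
    print(f"{name}: {value:.1f} GB")
```

With a 32 GB worker the thresholds fall at 25.6 GB, 28.8 GB and 30.4 GB, which leaves under 2 GB of headroom between pausing and the limit.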

Dashboard and Monitoring

Cluster Monitoring

# Monitor cluster status and performance
cluster_info = marEx.helper.get_cluster_info(client)
workers = client.scheduler_info()['workers']

print("=== Cluster Status ===")
print(f"Status: {client.status}")
print(f"Active Workers: {len(workers)}")
print(f"Total Cores: {sum(client.nthreads().values())}")
print(f"Total Memory: {sum(w['memory_limit'] for w in workers.values()) / 1e9:.1f} GB")
print(f"Dashboard URL: {cluster_info['dashboard_link']}")

# SSH tunnel command for reaching the dashboard from a local machine
port = cluster_info['port']
print(f"SSH tunnel command: ssh -L {port}:localhost:{port} {cluster_info['hostname']}")

Performance Monitoring

# Monitor task performance during computation
def monitor_computation(client, computation_future, interval=30):
    """Monitor and report computation progress."""
    import time

    start_time = time.time()
    while not computation_future.done():
        # Get current worker status from the scheduler
        workers = client.scheduler_info()['workers']

        # Report progress (memory figures are summed over all workers)
        used = sum(w.get('metrics', {}).get('memory', 0) for w in workers.values())
        limit = sum(w.get('memory_limit', 0) for w in workers.values())
        print(f"Future status: {computation_future.status}")
        print(f"Workers: {len(workers)}")
        print(f"Memory: {used / 1e9:.1f} GB / {limit / 1e9:.1f} GB")

        time.sleep(interval)  # Update every 30 seconds by default

    end_time = time.time()
    print(f"Computation completed in {end_time - start_time:.1f} seconds")

# Use during marEx operations
future = client.compute(extremes_ds, sync=False)
monitor_computation(client, future)
result = future.result()
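
The per-worker entries returned by client.scheduler_info() can be aggregated into cluster-wide totals. The sketch below runs without a cluster by using a hand-written dictionary in place of the real scheduler output; summarise_workers is a hypothetical helper, and the 'nthreads' and 'memory_limit' field names are assumptions about the shape of that output.

```python
def summarise_workers(scheduler_info):
    """Aggregate worker totals from a scheduler_info-style dictionary."""
    workers = scheduler_info.get("workers", {})
    total_limit = sum(w.get("memory_limit", 0) for w in workers.values())
    total_threads = sum(w.get("nthreads", 0) for w in workers.values())
    return {
        "n_workers": len(workers),
        "total_threads": total_threads,
        "total_memory_gb": total_limit / 1e9,
    }


# Hand-written stand-in for client.scheduler_info(); real output has more fields
fake_info = {
    "workers": {
        "tcp://10.0.0.1:40001": {"nthreads": 4, "memory_limit": 16_000_000_000},
        "tcp://10.0.0.2:40001": {"nthreads": 4, "memory_limit": 16_000_000_000},
    }
}
summary = summarise_workers(fake_info)
print(summary)
```

Using .get with defaults keeps the summary usable if a worker entry lacks one of the fields.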

Data Management

Temporary Directory Setup

import tempfile
import os
from pathlib import Path

# Set up optimised temporary directory structure
def setup_temp_directories(base_path='/scratch'):
    """Set up temporary directories for optimal I/O performance."""
    user = os.environ.get('USER', 'unknown')

    # Create user-specific temp directory
    temp_base = Path(base_path) / user / 'marEx_tmp'
    temp_base.mkdir(parents=True, exist_ok=True)

    # Set up subdirectories
    dask_temp = temp_base / 'dask'
    marEx_temp = temp_base / 'marEx'

    dask_temp.mkdir(exist_ok=True)
    marEx_temp.mkdir(exist_ok=True)

    return {
        'dask_temp': str(dask_temp),
        'marEx_temp': str(marEx_temp),
        'base_temp': str(temp_base)
    }

# Configure for HPC environment
temp_dirs = setup_temp_directories('/scratch')

# Keep the returned object alive while Dask is in use
temp_dir = configure_dask(
    scratch_dir=temp_dirs['dask_temp']
)

Zarr Storage Optimisation

# Optimise Zarr storage for intermediate results
def setup_zarr_storage(ds, chunk_config):
    """Build a Zarr encoding dictionary for marEx results.

    Assumes Zarr format 2, where the encoding key is 'compressor'.
    """
    from numcodecs import Blosc

    # Configure compressor for climate data
    compressor = Blosc(
        cname='lz4',            # Fast compression
        clevel=3,               # Moderate compression level
        shuffle=Blosc.SHUFFLE   # Byte shuffle for better compression
    )

    # Set up encoding for xarray to_zarr
    encoding = {}
    for var in ['dat_anomaly', 'extreme_events', 'thresholds']:
        if var not in ds:
            continue
        # Chunk each variable along its own dimensions; dimensions without
        # an entry in chunk_config are kept whole
        encoding[var] = {
            'compressor': compressor,
            'chunks': tuple(
                min(chunk_config.get(dim, ds.sizes[dim]), ds.sizes[dim])
                for dim in ds[var].dims
            )
        }

    return encoding

# Use with marEx workflows
encoding = setup_zarr_storage(extremes_ds, chunks)
extremes_ds.to_zarr('./extremes.zarr', encoding=encoding)

Integration with Job Schedulers

SLURM Integration

# Advanced SLURM configuration
client = start_distributed_cluster(
    # Basic configuration
    n_workers=50,
    workers_per_node=8,
    runtime=360,                  # 6 hours, in minutes
    node_memory=256,              # 256 GB nodes, about 32 GB per worker

    # SLURM-specific options
    queue='compute',
    account='climate_research',

    # Remaining keyword arguments are passed to SLURMCluster. The names
    # below follow dask_jobqueue; check them against your installed version.
    job_name='marEx_analysis',

    # Advanced SLURM directives (dask_jobqueue adds the '#SBATCH' prefix)
    job_extra_directives=[
        '--exclusive',
        '--constraint=haswell',
        '--mail-type=END,FAIL',
        '--mail-user=user@institution.edu'
    ],

    # Worker optimisation
    worker_extra_args=[
        '--local-directory=/scratch/dask-temp',
        '--memory-limit=30GB',
        '--nthreads=8',
        '--death-timeout=300'
    ]
)

PBS/Torque Integration

# Example for PBS/Torque systems (implementation may vary)
from dask.distributed import Client
from dask_jobqueue import PBSCluster

def start_pbs_cluster(n_workers=20, cores_per_worker=24, memory_per_worker='64GB'):
    """Start PBS cluster for marEx workflows."""
    cluster = PBSCluster(
        cores=cores_per_worker,
        memory=memory_per_worker,
        queue='normal',
        walltime='04:00:00',
        job_extra_directives=[
            f'-l nodes=1:ppn={cores_per_worker}',
            '-A climate_project'
        ],
        local_directory='/scratch/dask-temp'
    )

    # Request n_workers workers and connect a client to the cluster
    cluster.scale(n_workers)
    client = Client(cluster)

    return cluster, client

Performance Optimisation

# Performance optimisation checklist:
# 1. Use local SSD storage for temporary files
# 2. Optimise network settings for multi-node clusters
# 3. Use appropriate compression for I/O
# 4. Monitor task scheduling efficiency
# 5. Balance compute vs I/O intensive operations

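# Assumption: the config dictionary uses Dask's own configuration keys.
temp_dir = configure_dask(
    # I/O optimisation
    scratch_dir='/local_ssd/dask-tmp',
    config={
        # Network optimisation
        'distributed.comm.timeouts.tcp': '300s',
        'distributed.client.heartbeat': '5s',

        # Task optimisation
        'distributed.scheduler.allowed-failures': 3,
        'distributed.worker.daemon': False
    }
)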

Troubleshooting

Common Issues and Solutions

Cluster Startup Issues:

# Issue: Workers not starting
# Solution: Check SLURM queue status and resource availability

# Debug cluster startup
client = start_distributed_cluster(
    n_workers=10,
    workers_per_node=4,
    runtime=120,            # 2 hours, in minutes
    queue='debug',          # Use debug queue for troubleshooting
    verbose=True,           # Detailed cluster startup information
    log_directory='./logs'  # Passed to SLURMCluster to keep job logs
)

Memory Errors:

# Issue: Out of memory errors
# Solution: Reduce chunk sizes and worker memory usage

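# Conservative memory configuration
# Assumption: the config dictionary uses Dask's own configuration keys.
temp_dir = configure_dask(
    scratch_dir='/scratch/spill',
    config={
        'distributed.worker.memory.target': 0.7,
        'distributed.worker.memory.spill': 0.8
    }
)
# The per-worker memory limit (e.g. memory_limit='8GB' for a local cluster)
# is set when starting the cluster.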

Network Timeouts:

# Issue: Network timeouts in large clusters
# Solution: Increase timeout values

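# Assumption: the config dictionary uses Dask's own configuration keys.
temp_dir = configure_dask(
    config={
        'distributed.comm.timeouts.tcp': '600s',
        'distributed.client.heartbeat': '10s',
        'distributed.comm.retry.delay.min': '1s',
        'distributed.comm.retry.delay.max': '20s'
    }
)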

Dashboard Access Issues:

# Issue: Cannot access dashboard from login node
# Solution: Set up SSH tunnel

cluster_info = get_cluster_info(client)
port = cluster_info['port']
print("Run this command on your local machine:")
print(f"ssh -L {port}:localhost:{port} {cluster_info['hostname']}")
print(f"Then access dashboard at: http://localhost:{port}")

See Also