Helper Module (marEx.helper)
The marEx.helper module provides utilities for high-performance computing (HPC)
environments, particularly for managing Dask clusters on SLURM systems and optimising
performance for large-scale data processing with marEx workflows.
Overview
The helper module is designed to simplify the deployment and management of marEx workflows on supercomputing systems. It provides tools for cluster management, memory optimisation, and performance tuning specifically tailored for DKRZ Levante and similar HPC environments.
Key Features:
- SLURM Integration: launch Dask workers as SLURM batch jobs through SLURMCluster
- Automated Cluster Management: start, configure, and monitor Dask clusters
- Memory Optimisation: per-node memory and spill-directory settings for large datasets
- Performance Tuning: Dask configurations suited to HPC environments
- Dashboard Integration: connection details for reaching the Dask dashboard (e.g. from Jupyter) through an SSH tunnel
- Resource Detection: automatic detection of system resources
Main Functions
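| Function | Description |
| start_distributed_cluster(n_workers, workers_per_node, ...) | Start a distributed Dask cluster on a SLURM-based supercomputer. |
| start_local_cluster([n_workers, threads_per_worker, ...]) | Start a local Dask cluster. |
| configure_dask([scratch_dir, config]) | Configure Dask with appropriate settings for HPC environments. |
| get_cluster_info(client) | Get and print cluster connection information. |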
Detailed Documentation
Cluster Management
- marEx.helper.start_distributed_cluster(n_workers, workers_per_node, runtime=9, node_memory=256, dashboard_address=8889, queue='compute', scratch_dir=None, account=None, verbose=None, quiet=None, **kwargs)
Start a distributed Dask cluster on a SLURM-based supercomputer.
- Parameters:
n_workers (int) – Total number of workers to request.
workers_per_node (int) – Number of workers per node.
runtime (int, default=9) – Maximum runtime in minutes.
node_memory (int, default=256) – Memory per node in GB (256, 512, or 1024).
dashboard_address (int, default=8889) – Port for the Dask dashboard.
queue (str, default='compute') – SLURM queue to submit jobs to.
scratch_dir (str or Path, optional) – Directory to use for temporary files.
account (str, optional) – SLURM account to charge. Defaults to DKRZ_ACCOUNT.
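verbose (bool, optional) – Enable verbose logging with detailed cluster startup information. If None, uses current global logging configuration.
quiet (bool, optional) – Enable quiet logging with minimal output (warnings and errors only). If None, uses current global logging configuration. Note: quiet takes precedence over verbose if both are True.
**kwargs – Additional keyword arguments to pass to SLURMCluster.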
- Returns:
Dask client connected to the distributed cluster.
- Return type:
Client
Examples
Basic SLURM cluster for medium-scale processing:
>>> import marEx
>>>
>>> # Start cluster with 16 workers on 4 nodes (4 workers per node)
>>> client = marEx.start_distributed_cluster(
...     n_workers=16,
...     workers_per_node=4,
...     runtime=60,  # 1 hour
...     node_memory=256  # 256 GB nodes (valid values: 256, 512, 1024)
... )
>>> print(f"Cluster: {client}")
>>> # Access dashboard via SSH tunnel
>>> cluster_info = marEx.get_cluster_info(client)
>>> client.close()
Processing large marEx workflow on HPC:
>>> # Start cluster for full marEx pipeline
>>> client = marEx.start_distributed_cluster(
...     n_workers=32,
...     workers_per_node=8,
...     runtime=60,  # 1 hour
...     node_memory=256  # 256 GB nodes
... )
>>>
>>> # Load very large SST dataset (select the SST variable as a DataArray)
>>> import xarray as xr
>>> sst = xr.open_zarr('/work/data/large_sst.zarr').sst.chunk({'time': 25})
>>>
>>> # Preprocess with distributed computing
>>> processed = marEx.preprocess_data(
...     sst,
...     method_anomaly="shifting_baseline",
...     threshold_percentile=95
... )
>>>
>>> # Track events using full cluster
>>> tracker = marEx.tracker(
...     processed.extreme_events,
...     processed.mask,
...     R_fill=12,
...     area_filter_quartile=0.3
... )
>>> events = tracker.run()
>>>
>>> # Save results
>>> events.to_zarr('/work/results/tracked_events.zarr')
>>> client.close()
Custom SLURM configuration:
>>> # Advanced SLURM configuration
>>> client = marEx.start_distributed_cluster(
...     n_workers=64,
...     workers_per_node=32,
...     runtime=20,  # 20 minutes
...     node_memory=256,
...     account='myproject',  # Custom account
...     queue='debug',  # debug queue
... )
Dashboard access and monitoring:
>>> # Start cluster and set up monitoring
>>> client = marEx.start_distributed_cluster(
...     n_workers=16,
...     workers_per_node=4,
...     dashboard_address=8890  # Custom dashboard port
... )
>>>
>>> # Get connection info for SSH tunnelling
>>> info = marEx.get_cluster_info(client)
>>> print(f"SSH tunnel: ssh -L {info['port']}:localhost:{info['port']} {info['hostname']}")
>>> print(f"Dashboard: {info['dashboard_link']}")
>>>
>>> # Monitor cluster performance
>>> print(f"Workers: {len(client.scheduler_info()['workers'])}")
>>> print(f"Total memory: {sum(w['memory_limit'] for w in client.scheduler_info()['workers'].values()) / 1e9:.1f} GB")
Memory optimisation strategies:
>>> # Optimise for different workload types
>>>
>>> # Memory-intensive: fewer workers per node
>>> memory_cluster = marEx.start_distributed_cluster(
...     n_workers=8, workers_per_node=2, node_memory=512
... )
>>>
>>> # CPU-intensive: more workers per node
>>> cpu_cluster = marEx.start_distributed_cluster(
...     n_workers=64, workers_per_node=16, node_memory=256
... )
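The worker layout in these examples comes down to simple arithmetic. The sketch below uses a hypothetical helper, plan_slurm_layout (not part of marEx), to translate n_workers, workers_per_node, node_memory and runtime into a node count, a per-worker memory share and a SLURM walltime string. It assumes node memory is split evenly between the workers on a node; marEx may reserve memory differently.

```python
import math


def plan_slurm_layout(n_workers, workers_per_node, node_memory=256, runtime=9):
    """Hypothetical helper: the SLURM layout implied by start_distributed_cluster
    arguments, assuming memory is split evenly across the workers on a node."""
    if node_memory not in (256, 512, 1024):
        raise ValueError("node_memory must be 256, 512 or 1024 (GB)")
    n_nodes = math.ceil(n_workers / workers_per_node)
    memory_per_worker_gb = node_memory / workers_per_node
    hours, minutes = divmod(runtime, 60)  # runtime is given in minutes
    return {
        "n_nodes": n_nodes,
        "memory_per_worker_gb": memory_per_worker_gb,
        "walltime": f"{hours:02d}:{minutes:02d}:00",  # SLURM HH:MM:SS
    }


# The two layouts from the memory optimisation example above
print(plan_slurm_layout(8, 2, node_memory=512))
# {'n_nodes': 4, 'memory_per_worker_gb': 256.0, 'walltime': '00:09:00'}
print(plan_slurm_layout(64, 16, node_memory=256))
# {'n_nodes': 4, 'memory_per_worker_gb': 16.0, 'walltime': '00:09:00'}
```

The memory-intensive layout gives each worker 256 GB, while the CPU-intensive one leaves 16 GB per worker, which is the trade-off the two examples describe.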
- marEx.helper.start_local_cluster(n_workers=4, threads_per_worker=1, scratch_dir=None, verbose=None, quiet=None, **kwargs)
Start a local Dask cluster.
- Parameters:
n_workers (int, default=4) – Number of worker processes to start.
threads_per_worker (int, default=1) – Number of threads per worker.
scratch_dir (str or Path, optional) – Directory to use for temporary files.
verbose (bool, optional) – Enable verbose logging with detailed cluster startup information. If None, uses current global logging configuration.
quiet (bool, optional) – Enable quiet logging with minimal output (warnings and errors only). If None, uses current global logging configuration. Note: quiet takes precedence over verbose if both are True.
**kwargs – Additional keyword arguments to pass to LocalCluster.
- Returns:
Dask client connected to the local cluster.
- Return type:
Client
Examples
Basic local cluster for development:
>>> import marEx
>>>
>>> # Start simple local cluster
>>> client = marEx.start_local_cluster(n_workers=2, threads_per_worker=1)
>>> print(client)
<Client: 'tcp://127.0.0.1:xxxxx' processes=2 threads=2, memory=15.7 GB>
>>>
>>> # Check cluster status
>>> print(f"Dashboard: {client.dashboard_link}")
Dashboard: http://127.0.0.1:8787/status
>>>
>>> client.close()
Optimised cluster for CPU-intensive work:
>>> # Use one worker per physical core
>>> client = marEx.start_local_cluster(
...     n_workers=8,  # Number of physical cores
...     threads_per_worker=1  # Avoid hyperthreading for compute
... )
>>>
>>> # Process data with the cluster
>>> import xarray as xr
>>> data = xr.open_dataset('large_data.nc').chunk({'time': 25})
>>> result = data.mean().compute()
>>> client.close()
Memory-optimised cluster:
>>> # Configure for large datasets
>>> client = marEx.start_local_cluster(
...     n_workers=4,
...     threads_per_worker=2,
...     memory_limit='8GB',  # Limit memory per worker
...     scratch_dir='/tmp/dask'  # Fast node-local storage (e.g. a local SSD)
... )
Integration with marEx preprocessing:
>>> # Start cluster then process data
>>> import xarray as xr
>>> client = marEx.start_local_cluster(n_workers=16)
>>>
>>> # Load and preprocess SST data
>>> sst = xr.open_dataset('sst_data.nc').sst.chunk({'time': 30})
>>> processed = marEx.preprocess_data(sst, threshold_percentile=95)
>>>
>>> # Track events using the cluster
>>> tracker = marEx.tracker(
...     processed.extreme_events,
...     processed.mask,
...     R_fill=8,
...     area_filter_quartile=0.5
... )
>>> events = tracker.run()
>>>
>>> # client.nthreads() maps each worker to its thread count
>>> n_threads = sum(client.nthreads().values())
>>> print(f"Processed {len(sst.time)} time steps with {n_threads} threads")
>>> client.close()
Custom worker configuration:
>>> # Advanced configuration for specific workloads
>>> client = marEx.start_local_cluster(
...     n_workers=4,
...     threads_per_worker=2,
...     processes=True,  # Use separate processes (default)
...     silence_logs=False,  # Keep logs for debugging
...     dashboard_address=':8787'  # Specific dashboard port
... )
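The examples above follow a common rule of thumb: CPU-bound work favours more single-threaded workers, while memory-hungry work favours fewer workers with a larger memory_limit each. The hypothetical helper below (not part of marEx) splits a core count and a memory budget evenly into start_local_cluster arguments; keeping 10% of memory back for the operating system is an illustrative assumption.

```python
import os


def suggest_local_layout(n_cores=None, total_memory_gb=16.0, threads_per_worker=1, headroom=0.1):
    """Hypothetical helper: split cores and memory evenly across local workers."""
    n_cores = n_cores or os.cpu_count() or 1
    n_workers = max(1, n_cores // threads_per_worker)
    usable_gb = total_memory_gb * (1 - headroom)  # keep some memory for the OS
    per_worker_gb = usable_gb / n_workers
    return {
        "n_workers": n_workers,
        "threads_per_worker": threads_per_worker,
        "memory_limit": f"{per_worker_gb:.1f}GB",  # per-worker limit as a size string
    }


layout = suggest_local_layout(n_cores=8, total_memory_gb=32, threads_per_worker=2)
print(layout)  # {'n_workers': 4, 'threads_per_worker': 2, 'memory_limit': '7.2GB'}
# client = marEx.start_local_cluster(**layout)  # memory_limit is forwarded to LocalCluster
```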
Configuration and Utilities
- marEx.helper.configure_dask(scratch_dir=None, config=None)
Configure Dask with appropriate settings for HPC environments.
- marEx.helper.get_cluster_info(client)
Get and print cluster connection information.
- Parameters:
client (Client) – Dask client connected to a cluster.
- Returns:
Dictionary containing connection information.
- Return type:
dict
Examples
Basic cluster info retrieval:
>>> import marEx
>>>
>>> # Start local cluster
>>> client = marEx.start_local_cluster(n_workers=2)
>>>
>>> # Get connection information
>>> info = marEx.get_cluster_info(client)
Hostname: login01
Forward Port: login01:8787
Dashboard Link: localhost:8787/status
>>>
>>> print(f"Connect via: {info['dashboard_link']}")
Connect via: localhost:8787/status
>>> client.close()
SSH tunneling for remote access:
>>> # Start cluster on HPC system
>>> client = marEx.start_distributed_cluster(
...     n_workers=8, workers_per_node=4, dashboard_address=8889
... )
>>>
>>> # Get tunnelling information
>>> info = marEx.get_cluster_info(client)
Hostname: levante-login01
Forward Port: levante-login01:8889
Dashboard Link: localhost:8889/status
>>>
>>> # Use this info to set up an SSH tunnel:
>>> # ssh -L 8889:localhost:8889 levante-login01.dkrz.de
>>> # Then access the dashboard at localhost:8889/status
>>>
>>> client.close()
Monitoring cluster status:
>>> client = marEx.start_local_cluster(n_workers=4)
>>> info = marEx.get_cluster_info(client)
>>>
>>> # Access cluster details
>>> print(f"Dashboard URL: {client.dashboard_link}")
>>> print(f"Cluster address: {client.cluster.scheduler_address}")
>>> print(f"Total threads: {sum(client.nthreads().values())}")
>>>
>>> # Use the port for programmatic access (/health is Dask's HTTP health-check route)
>>> import requests
>>> try:
...     response = requests.get(f"http://localhost:{info['port']}/health", timeout=5)
...     print(f"Scheduler health: {response.status_code}")
... except requests.RequestException:
...     print("Dashboard not accessible")
>>>
>>> client.close()
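get_cluster_info reports the hostname and dashboard port, which is all an SSH tunnel needs. The sketch below builds the command to run on your local machine from that dictionary; it assumes the 'hostname' and 'port' keys used in the examples above, and the optional domain suffix (such as dkrz.de for Levante logins) is only an illustration.

```python
def ssh_tunnel_command(info, domain=None, local_port=None):
    """Build the local-machine SSH command that forwards the Dask dashboard.
    Assumes `info` has 'hostname' and 'port' keys, as in get_cluster_info's output."""
    remote_port = int(info["port"])
    local_port = local_port or remote_port
    host = f"{info['hostname']}.{domain}" if domain else info["hostname"]
    return f"ssh -L {local_port}:localhost:{remote_port} {host}"


info = {"hostname": "levante-login01", "port": 8889}  # illustrative values
print(ssh_tunnel_command(info, domain="dkrz.de"))
# ssh -L 8889:localhost:8889 levante-login01.dkrz.de
```

Passing a different local_port is useful when the dashboard port is already taken on your own machine.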
Basic Usage Examples
SLURM Cluster Setup
import marEx
from marEx.helper import start_distributed_cluster

# Start a SLURM cluster for marEx workflows: 20 workers on 5 nodes
client = start_distributed_cluster(
    n_workers=20,
    workers_per_node=4,
    runtime=120,        # minutes (2 hours)
    node_memory=256,    # GB per node
    queue='compute',
    account='your_project_id'
)

# Now run marEx workflows with the cluster
# (sst_data: an xarray.DataArray of SST loaded beforehand)
extremes_ds = marEx.preprocess_data(
    sst_data,
    threshold_percentile=95,
    dask_chunks={'time': 365, 'lat': 200, 'lon': 200}
)

# Close the client and shut down the SLURM workers when done
cluster = client.cluster  # the cluster object behind the client
client.close()
cluster.close()
Local Cluster Setup
import marEx
from marEx.helper import start_local_cluster

# Start a local cluster; memory_limit and dashboard_address are passed on to LocalCluster
client = start_local_cluster(
    n_workers=8,
    threads_per_worker=2,
    memory_limit='4GB',           # per worker (32 GB in total)
    dashboard_address=':8787'
)

# Get cluster information
cluster_info = marEx.helper.get_cluster_info(client)
workers = client.scheduler_info()['workers']
print(f"Dashboard: {cluster_info['dashboard_link']}")
print(f"Workers: {len(workers)}")
print(f"Total Memory: {sum(w['memory_limit'] for w in workers.values()) / 1e9:.1f} GB")
Dask Configuration
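from marEx.helper import configure_dask

# Point Dask's temporary and spill files at a scratch filesystem
configure_dask(scratch_dir='/scratch/tmp')

# Per-worker settings (memory limit, threads per worker, dashboard port,
# log silencing) are cluster options: pass them to start_local_cluster or
# start_distributed_cluster rather than to configure_dask.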
Advanced HPC Configurations
DKRZ Levante Optimisation
# Configuration for the DKRZ Levante supercomputer
client = start_distributed_cluster(
    n_workers=40,
    workers_per_node=8,         # 5 nodes, about 32 GB per worker on 256 GB nodes
    runtime=240,                # minutes (4 hours)
    node_memory=256,            # Levante node type: 256, 512 or 1024 GB
    queue='compute',
    account='your_project',
    scratch_dir='/scratch/tmp'  # worker temporary and spill files
)
Multi-Node Scaling
# Large-scale configuration for century-long datasets
client = start_distributed_cluster(
    n_workers=100,              # initial number of workers
    workers_per_node=4,
    runtime=480,                # minutes (8 hours)
    node_memory=256,
    queue='compute',
    account='projectID'
)

# Optional adaptive scaling between 20 and 200 workers. adapt() is a standard
# dask-jobqueue cluster method; this assumes the returned Client was created
# from the SLURMCluster, so that client.cluster is available.
client.cluster.adapt(minimum=20, maximum=200)
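dask-jobqueue meets adaptive bounds by submitting or cancelling whole batch jobs. Assuming each marEx job provides one node with workers_per_node workers, the hypothetical helper below converts the worker bounds from the example into job bounds, which is what actually appears in the SLURM queue.

```python
import math


def adaptive_job_bounds(minimum, maximum, workers_per_node):
    """Hypothetical helper: convert adaptive worker bounds into node-job bounds,
    assuming each SLURM job runs one node with `workers_per_node` workers."""
    if minimum > maximum:
        raise ValueError("minimum must not exceed maximum")
    min_jobs = math.ceil(minimum / workers_per_node)  # enough jobs to reach the minimum
    max_jobs = maximum // workers_per_node            # stay within the worker ceiling
    return min_jobs, max_jobs


print(adaptive_job_bounds(20, 200, workers_per_node=4))  # (5, 50)
```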
Performance Tuning
Chunk Size Optimisation
import numpy as np
import marEx

def calculate_optimal_chunks(data_shape, n_workers, memory_per_worker_gb):
    """Estimate chunk sizes for marEx workflows from a (time, lat, lon) shape."""
    # Time chunks should be manageable for individual workers (at least 1)
    time_chunks = max(1, min(365, data_shape[0] // n_workers))
    # Spatial chunks based on available memory, assuming 8 bytes per float64 value.
    # This sizes one chunk to a worker's whole memory, so treat it as an upper
    # bound: workers usually hold several chunks at once.
    max_spatial_elements = (memory_per_worker_gb * 1e9 / 8) / time_chunks
    spatial_chunks = int(np.sqrt(max_spatial_elements))
    return {
        'time': time_chunks,
        'lat': min(spatial_chunks, data_shape[1]),
        'lon': min(spatial_chunks, data_shape[2])
    }

# Use with marEx preprocessing (sst: an xarray.DataArray with dims time, lat, lon)
chunks = calculate_optimal_chunks(
    sst.shape,
    n_workers=20,
    memory_per_worker_gb=16
)
extremes_ds = marEx.preprocess_data(
    sst,
    threshold_percentile=95,
    dask_chunks=chunks
)
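The memory bound above is easier to judge with a chunk expressed in bytes. This small sketch multiplies the chunk extents by the size of one value; for comparison, Dask's array best-practice guidance suggests chunks of roughly 100 MB to 1 GB as a general rule of thumb. The chunks used in the SLURM setup example come to about 117 MB in float64.

```python
import math


def chunk_nbytes(chunks, itemsize=8):
    """Bytes in one chunk, given chunk extents per dimension
    (itemsize 8 for float64, 4 for float32)."""
    return math.prod(chunks.values()) * itemsize


chunks = {"time": 365, "lat": 200, "lon": 200}
print(f"{chunk_nbytes(chunks) / 1e6:.1f} MB per float64 chunk")  # 116.8 MB per float64 chunk
```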
Memory Management
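# Configure Dask for memory-constrained environments
import dask
from marEx.helper import configure_dask

# Temporary storage: send spill files to scratch
configure_dask(scratch_dir='/scratch/dask-tmp')

# Memory thresholds are standard Dask config keys, given as fractions of each
# worker's memory limit. Set them before starting the cluster.
dask.config.set({
    'distributed.worker.memory.target': 0.8,      # start spilling managed memory to disk
    'distributed.worker.memory.spill': 0.9,       # spill based on total process memory
    'distributed.worker.memory.pause': 0.95,      # pause running new tasks
    'distributed.worker.memory.terminate': 0.98,  # terminate the worker (the nanny restarts it)
    # Network optimisation
    'distributed.comm.timeouts.tcp': '300s',
})

# The per-worker memory limit itself is a cluster option, e.g.
# start_local_cluster(n_workers=4, memory_limit='32GB')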
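Because Dask's memory thresholds are fractions of each worker's memory limit, their absolute levels follow by multiplication. The sketch below works them out for a 32 GB worker; note that Dask compares target against managed memory and the other three against total process memory, so the levels are not strictly on the same scale. The ordering check is a sanity check added for illustration.

```python
def memory_thresholds_gb(memory_limit_gb, target=0.8, spill=0.9, pause=0.95, terminate=0.98):
    """Absolute memory levels (GB) corresponding to Dask's worker memory fractions."""
    fractions = {"target": target, "spill": spill, "pause": pause, "terminate": terminate}
    if list(fractions.values()) != sorted(fractions.values()):
        raise ValueError("expected target <= spill <= pause <= terminate")
    return {name: round(memory_limit_gb * f, 2) for name, f in fractions.items()}


print(memory_thresholds_gb(32))
# {'target': 25.6, 'spill': 28.8, 'pause': 30.4, 'terminate': 31.36}
```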
Dashboard and Monitoring
Cluster Monitoring
# Monitor cluster status and performance
cluster_info = marEx.helper.get_cluster_info(client)
workers = client.scheduler_info()['workers']
print("=== Cluster Status ===")
print(f"Status: {client.status}")
print(f"Active Workers: {len(workers)}")
print(f"Total Threads: {sum(w['nthreads'] for w in workers.values())}")
print(f"Total Memory: {sum(w['memory_limit'] for w in workers.values()) / 1e9:.1f} GB")
print(f"Dashboard: {cluster_info['dashboard_link']}")
# SSH tunnel command to run on your local machine
port = cluster_info['port']
print(f"SSH tunnel command: ssh -L {port}:localhost:{port} {cluster_info['hostname']}")
Performance Monitoring
import time

# Monitor task progress during computation
def monitor_computation(client, computation_future, interval=30):
    """Report worker and task counts until a future completes."""
    start_time = time.time()
    while not computation_future.done():
        # Tasks currently assigned to each worker
        processing = client.processing()
        print(f"Workers: {len(processing)}")
        print(f"Tasks in progress: {sum(len(tasks) for tasks in processing.values())}")
        time.sleep(interval)  # Update every `interval` seconds
    end_time = time.time()
    print(f"Computation completed in {end_time - start_time:.1f} seconds")

# Use during marEx operations
future = client.compute(extremes_ds, sync=False)
monitor_computation(client, future)
result = future.result()
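The polling loop does not depend on Dask: any future with done() and result() methods can be watched the same way. The sketch below uses the standard library's ThreadPoolExecutor as a stand-in for client.compute(..., sync=False), with a short interval so it finishes quickly. For Dask collections, dask.distributed.progress provides a ready-made progress bar instead.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait_with_reports(future, interval=0.05):
    """Poll a future until it completes, printing elapsed time; return (result, polls)."""
    start = time.monotonic()
    polls = 0
    while not future.done():
        polls += 1
        print(f"Still running after {time.monotonic() - start:.2f} s")
        time.sleep(interval)
    return future.result(), polls


def slow_sum(n):
    """Stand-in for a long computation."""
    time.sleep(0.2)
    return sum(range(n))


with ThreadPoolExecutor(max_workers=1) as pool:
    result, polls = wait_with_reports(pool.submit(slow_sum, 1000))
print(result)  # 499500
```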
Data Management
Temporary Directory Setup
import os
from pathlib import Path
from marEx.helper import configure_dask

# Set up optimised temporary directory structure
def setup_temp_directories(base_path='/scratch'):
    """Set up temporary directories for optimal I/O performance."""
    user = os.environ.get('USER', 'unknown')
    # Create user-specific temp directory
    temp_base = Path(base_path) / user / 'marEx_tmp'
    temp_base.mkdir(parents=True, exist_ok=True)
    # Set up subdirectories
    dask_temp = temp_base / 'dask'
    marEx_temp = temp_base / 'marEx'
    dask_temp.mkdir(exist_ok=True)
    marEx_temp.mkdir(exist_ok=True)
    return {
        'dask_temp': str(dask_temp),
        'marEx_temp': str(marEx_temp),
        'base_temp': str(temp_base)
    }

# Configure for HPC environment
temp_dirs = setup_temp_directories('/scratch')
configure_dask(scratch_dir=temp_dirs['dask_temp'])
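Spilling to disk only helps if the scratch filesystem has room. A minimal pre-flight check with the standard library's shutil.disk_usage could look like this; the helper name and the 100 GB threshold are illustrative assumptions, and tempfile.gettempdir() stands in for the scratch path.

```python
import shutil
import tempfile


def has_free_space(path, required_gb):
    """Return True if the filesystem holding `path` has at least `required_gb` GB free."""
    return shutil.disk_usage(path).free / 1e9 >= required_gb


# Check the spill directory before starting a cluster
spill_dir = tempfile.gettempdir()  # e.g. temp_dirs['dask_temp'] on an HPC system
if not has_free_space(spill_dir, required_gb=100):
    print(f"Warning: less than 100 GB free in {spill_dir}")
```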
Zarr Storage Optimisation
# Optimise Zarr storage for intermediate results
from numcodecs import Blosc

def setup_zarr_encoding(ds):
    """Build a Zarr encoding that Blosc-compresses every data variable of ds."""
    # Configure Blosc compressor for climate data
    compressor = Blosc(
        cname='lz4',            # Fast compression
        clevel=3,               # Moderate compression level
        shuffle=Blosc.SHUFFLE   # Byte shuffle for better compression
    )
    # 'compressor' is the Zarr format 2 encoding key. Zarr chunks follow the
    # Dask chunks of each variable, so no 'chunks' entry is needed; this also
    # avoids mismatches for variables with different dimensions (e.g. thresholds).
    return {var: {'compressor': compressor} for var in ds.data_vars}

# Use with marEx workflows
encoding = setup_zarr_encoding(extremes_ds)
extremes_ds.to_zarr('./extremes.zarr', encoding=encoding)
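The shuffle option in the Blosc compressor above regroups the bytes of each value by significance, so the slowly varying high-order bytes of neighbouring floats end up next to each other where a compressor can exploit them. The NumPy sketch below reimplements a whole-array byte shuffle and compares zlib-compressed sizes on a smooth synthetic field; Blosc itself shuffles block by block and uses its own codecs, so this only illustrates the principle.

```python
import zlib

import numpy as np


def byte_shuffle(arr):
    """Group byte 0 of every element, then byte 1, and so on (1-D input)."""
    raw = np.ascontiguousarray(arr).view(np.uint8).reshape(-1, arr.dtype.itemsize)
    return raw.T.copy()


def byte_unshuffle(shuffled, dtype):
    """Invert byte_shuffle, returning a 1-D array of the given dtype."""
    return np.ascontiguousarray(shuffled.T).reshape(-1).view(dtype)


# Smooth synthetic temperature-like field in float64
field = 15.0 + 5.0 * np.sin(np.linspace(0, 20 * np.pi, 100_000))
plain = len(zlib.compress(field.tobytes(), 3))
shuffled = len(zlib.compress(byte_shuffle(field).tobytes(), 3))
print(f"zlib without shuffle: {plain} bytes, with shuffle: {shuffled} bytes")
```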
Integration with Job Schedulers
SLURM Integration
# Advanced SLURM configuration
client = start_distributed_cluster(
    # Basic configuration
    n_workers=50,
    workers_per_node=8,
    runtime=360,                  # minutes (6 hours)
    node_memory=256,
    # SLURM-specific options
    queue='compute',
    account='climate_research',
    scratch_dir='/scratch/dask-temp',
    # Extra keywords are forwarded to dask_jobqueue.SLURMCluster (assuming marEx
    # does not already set them; a duplicate keyword raises a TypeError).
    # Directives omit the '#SBATCH' prefix, which dask-jobqueue adds itself.
    job_extra_directives=[
        '--mail-type=END,FAIL',
        '--mail-user=user@institution.edu'
    ],
    # Worker optimisation
    worker_extra_args=['--death-timeout', '300']
)
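dask-jobqueue writes each entry of job_extra_directives into the generated batch script behind its own #SBATCH prefix, which is why the directives above leave it out. The hypothetical helper below mimics that rendering to make the effect visible; it is not dask-jobqueue's code, and for a real cluster client.cluster.job_script() prints the actual script.

```python
def render_sbatch_header(job_name, walltime, account, directives):
    """Hypothetical helper: show how directives would appear in a SLURM batch script."""
    lines = [
        "#!/usr/bin/env bash",
        f"#SBATCH -J {job_name}",
        f"#SBATCH -A {account}",
        f"#SBATCH -t {walltime}",
    ]
    for directive in directives:
        if directive.lstrip().startswith("#SBATCH"):
            raise ValueError(f"omit the '#SBATCH' prefix: {directive!r}")
        lines.append(f"#SBATCH {directive}")
    return "\n".join(lines)


print(render_sbatch_header("marEx_analysis", "06:00:00", "climate_research",
                           ["--mail-type=END,FAIL"]))
```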
PBS/Torque Integration
# Example for PBS/Torque systems (implementation may vary)
from dask.distributed import Client
from dask_jobqueue import PBSCluster

def start_pbs_cluster(n_workers=20, cores_per_worker=24, memory_per_worker='64GB'):
    """Start a PBS cluster for marEx workflows."""
    cluster = PBSCluster(
        cores=cores_per_worker,
        memory=memory_per_worker,
        queue='normal',
        walltime='04:00:00',
        job_extra_directives=[
            f'-l nodes=1:ppn={cores_per_worker}',
            '-A climate_project'
        ],
        local_directory='/scratch/dask-temp'
    )
    cluster.scale(n_workers)
    client = Client(cluster)
    return cluster, client
Performance Optimisation
# Performance optimisation checklist:
# 1. Use local SSD storage for temporary files
# 2. Optimise network settings for multi-node clusters
# 3. Use appropriate compression for I/O
# 4. Monitor task scheduling efficiency
# 5. Balance compute vs I/O intensive operations
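import dask
from marEx.helper import configure_dask

# I/O optimisation: temporary files on fast node-local storage
configure_dask(scratch_dir='/local_ssd/dask-tmp')

dask.config.set({
    # Network optimisation
    'distributed.comm.timeouts.tcp': '300s',
    # Task optimisation: tolerate up to 3 worker crashes per task before marking it as failed
    'distributed.scheduler.allowed-failures': 3,
    # Non-daemonic worker processes, so tasks may start their own subprocesses
    'distributed.worker.daemon': False,
})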
Troubleshooting
Common Issues and Solutions
Cluster Startup Issues:
# Issue: Workers not starting
# Solution: Check SLURM queue status and resource availability
# Debug cluster startup
from marEx.helper import start_distributed_cluster

client = start_distributed_cluster(
    n_workers=10,
    workers_per_node=4,
    runtime=20,      # minutes
    queue='debug',   # Use debug queue for troubleshooting
    verbose=True     # Detailed cluster startup information
)
Memory Errors:
# Issue: Out of memory errors
# Solution: Reduce chunk sizes and worker memory usage
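# Conservative memory configuration: spill earlier, and to scratch
import dask
from marEx.helper import configure_dask, start_local_cluster

dask.config.set({
    'distributed.worker.memory.target': 0.7,
    'distributed.worker.memory.spill': 0.8,
})
configure_dask(scratch_dir='/scratch/spill')

# A smaller per-worker memory limit is set on the cluster itself
client = start_local_cluster(n_workers=4, memory_limit='8GB')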
Network Timeouts:
# Issue: Network timeouts in large clusters
# Solution: Increase timeout values
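import dask

dask.config.set({
    'distributed.comm.timeouts.tcp': '600s',
    'distributed.comm.timeouts.connect': '60s',
    # Retries are disabled by default (count 0); enable them to use the delays below
    'distributed.comm.retry.count': 3,
    'distributed.comm.retry.delay.min': '1s',
    'distributed.comm.retry.delay.max': '20s',
})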
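The two retry delay settings bound the wait between repeated connection attempts. The sketch below computes a capped exponential backoff between those bounds; Dask's own retry logic follows a similar doubling scheme but also adds random jitter, so the exact numbers are illustrative. Retries only happen when distributed.comm.retry.count is above its default of 0.

```python
def backoff_delays(count, delay_min=1.0, delay_max=20.0):
    """Capped exponential backoff: delay_min, 2*delay_min, ... never above delay_max."""
    return [min(delay_min * 2 ** attempt, delay_max) for attempt in range(count)]


print(backoff_delays(6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 20.0]
```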
Dashboard Access Issues:
# Issue: Cannot access dashboard from login node
# Solution: Set up SSH tunnel
from marEx.helper import get_cluster_info

cluster_info = get_cluster_info(client)
port = cluster_info['port']
print("Run this command on your local machine:")
print(f"ssh -L {port}:localhost:{port} {cluster_info['hostname']}")
print(f"Then access the dashboard at: http://localhost:{port}/status")
See Also
marEx.detect - Data preprocessing
marEx.track - Event tracking
marEx.plotX - Visualisation