diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 80aade769..cf82ece23 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -2,6 +2,7 @@ import time from typing import Any, Dict, Literal, Optional, List, Tuple, Union import os +import resource import tqdm from mpi4py import MPI @@ -9,7 +10,10 @@ import httomo.globals from httomo.data.dataset_store import DataSetStoreWriter from httomo.method_wrappers.save_intermediate import SaveIntermediateFilesWrapper -from httomo.runner.dataset_store_backing import determine_store_backing +from httomo.runner.dataset_store_backing import ( + determine_store_backing, + estimate_section_memory, +) from httomo.runner.method_wrapper import MethodWrapper from httomo.runner.block_split import BlockSplitter from httomo.runner.dataset import DataSetBlock @@ -20,7 +24,12 @@ ReadableDataSetSink, ) from httomo.utils import save_2d_snapshot -from httomo.runner.gpu_utils import get_available_gpu_memory, gpumem_cleanup +from httomo.runner.gpu_utils import ( + get_available_gpu_memory, + gpumem_cleanup, + gpu_enabled, + xp, +) from httomo.runner.monitoring_interface import MonitoringInterface from httomo.runner.pipeline import Pipeline from httomo.runner.section import ( @@ -36,6 +45,7 @@ log_exception, log_once, log_rank, + get_available_system_memory_bytes, ) import numpy as np @@ -62,7 +72,25 @@ def __init__( self.source: Optional[DataSetSource] = None self.sink: Optional[Union[DataSetSink, ReadableDataSetSink]] = None - self._memory_limit_bytes = memory_limit_bytes + available_system_memory = np.zeros((1,), dtype=np.int64) + global_available_system_memory = np.empty_like(available_system_memory) + # Rank 0 queries memory + if self.comm.rank == 0: + available_system_memory[0] = get_available_system_memory_bytes() + self.comm.Allreduce( + [available_system_memory, MPI.INT64_T], + [global_available_system_memory, MPI.INT64_T], + MPI.MAX, + ) + if memory_limit_bytes == 0: + self._memory_limit_bytes = global_available_system_memory[0] + else: + if memory_limit_bytes > global_available_system_memory[0]: + log_once( + f"WARNING: Memory limit set is larger than available system memory ({memory_limit_bytes} > {global_available_system_memory[0]})", + ) + self._memory_limit_bytes = memory_limit_bytes + self._pipeline_inspector() self._sections = self._sectionize() @@ -332,6 +360,7 @@ def _prepare(self): ) self._check_params_for_sweep() self._load_datasets() + self._setup_pinned_memory_pool() def _load_datasets(self): start_time = self._log_task_start( @@ -349,6 +378,52 @@ def _load_datasets(self): self.pipeline.loader.package_name, ) + def _setup_pinned_memory_pool(self): + if not gpu_enabled: + return + + # Calculate peak on every rank + rank_peak_estimated_memory_bytes = np.zeros(shape=(1,), dtype=np.int64) + for i in range(len(self._sections)): + assert self.source is not None + section_memory = estimate_section_memory( + nprocs=self.comm.size, + rank=self.comm.rank, + allgather_func=self.comm.allgather, + dtype=self.source.dtype if i == 0 else np.float32, + global_shape=self.source.global_shape, + sections=self._sections, + section_idx=i, + consider_pinned_memory_pool=True, + ) + rank_peak_estimated_memory_bytes[0] = max( + rank_peak_estimated_memory_bytes[0], section_memory + ) + + # Select maximum peak of all ranks + global_peak_estimated_memory_bytes = np.zeros_like( + rank_peak_estimated_memory_bytes + ) + self.comm.Allreduce( + [rank_peak_estimated_memory_bytes * self.comm.size, MPI.INT64_T], + [global_peak_estimated_memory_bytes, MPI.INT64_T], + MPI.MAX, + ) + + # Apply decision on all ranks + if self._memory_limit_bytes >= global_peak_estimated_memory_bytes[0]: + log_once( + f"Estimated CPU memory peak under limit, enabling CuPy pinned memory pool ({self._memory_limit_bytes} >= {global_peak_estimated_memory_bytes[0]})", + level=logging.DEBUG, + ) + else: + xp.get_default_pinned_memory_pool().free_all_blocks() + xp.cuda.set_pinned_memory_allocator(None) + log_once( + f"Estimated CPU memory peak over limit, disabling CuPy pinned memory pool ({self._memory_limit_bytes} < {global_peak_estimated_memory_bytes[0]})", + level=logging.DEBUG, + ) + def _execute_method( self, method: MethodWrapper, block: DataSetBlock ) -> DataSetBlock: diff --git a/httomo/utils.py b/httomo/utils.py index cd1de10ef..15d8c498a 100644 --- a/httomo/utils.py +++ b/httomo/utils.py @@ -415,3 +415,11 @@ def clp2(x: int) -> int: x |= x >> 16 x |= x >> 32 return x + 1 + + +def get_available_system_memory_bytes(safety_margin_percent: float = 10): + # First two lines of 'free -t -b': + # total used free shared buff/cache available + # Mem: 33413799936 10897051648 11293478912 303587328 11639463936 22516748288 + available_system_memory = int(os.popen("free -t -b").readlines()[1].split()[-1]) + return int(available_system_memory * (1 - safety_margin_percent / 100))