From fb6954c9352cb705989304e4099c6a6a888e01a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C5=91rinc=20Serf=C5=91z=C5=91?= Date: Thu, 4 Jun 2026 13:10:29 +0200 Subject: [PATCH 1/2] Enabling pinned memory pool only if there is enough memory --- httomo/runner/task_runner.py | 90 +++++++++++++++++++++++++++++++++++- 1 file changed, 88 insertions(+), 2 deletions(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 80aade769..706b7b48a 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -2,6 +2,7 @@ import time from typing import Any, Dict, Literal, Optional, List, Tuple, Union import os +import resource import tqdm from mpi4py import MPI @@ -9,7 +10,10 @@ import httomo.globals from httomo.data.dataset_store import DataSetStoreWriter from httomo.method_wrappers.save_intermediate import SaveIntermediateFilesWrapper -from httomo.runner.dataset_store_backing import determine_store_backing +from httomo.runner.dataset_store_backing import ( + determine_store_backing, + estimate_section_memory, +) from httomo.runner.method_wrapper import MethodWrapper from httomo.runner.block_split import BlockSplitter from httomo.runner.dataset import DataSetBlock @@ -20,7 +24,12 @@ ReadableDataSetSink, ) from httomo.utils import save_2d_snapshot -from httomo.runner.gpu_utils import get_available_gpu_memory, gpumem_cleanup +from httomo.runner.gpu_utils import ( + get_available_gpu_memory, + gpumem_cleanup, + gpu_enabled, + xp, +) from httomo.runner.monitoring_interface import MonitoringInterface from httomo.runner.pipeline import Pipeline from httomo.runner.section import ( @@ -332,6 +341,7 @@ def _prepare(self): ) self._check_params_for_sweep() self._load_datasets() + self._setup_pinned_memory_pool() def _load_datasets(self): start_time = self._log_task_start( @@ -349,6 +359,82 @@ def _load_datasets(self): self.pipeline.loader.package_name, ) + def _setup_pinned_memory_pool(self): + if not gpu_enabled: + return + + # Calculate peak on every rank + rank_peak_estimated_memory_bytes = np.zeros(shape=(1,), dtype=np.int64) + for i in range(len(self._sections)): + assert self.source is not None + section_memory = estimate_section_memory( + nprocs=self.comm.size, + rank=self.comm.rank, + allgather_func=self.comm.allgather, + dtype=self.source.dtype, + global_shape=self.source.global_shape, + sections=self._sections, + section_idx=i, + consider_pinned_memory_pool=True, + ) + rank_peak_estimated_memory_bytes[0] = max( + rank_peak_estimated_memory_bytes[0], section_memory + ) + + # Select maximum peak of all ranks + global_peak_estimated_memory_bytes = np.zeros_like( + rank_peak_estimated_memory_bytes + ) + self.comm.Allreduce( + [rank_peak_estimated_memory_bytes * self.comm.size, MPI.INT64_T], + [global_peak_estimated_memory_bytes, MPI.INT64_T], + MPI.MAX, + ) + + global_memory_pool_allowed = np.asarray([True], dtype=np.bool) + limit_bytes = self._memory_limit_bytes + + # If the limit is passed, use the limit on all ranks + if limit_bytes > 0: + global_memory_pool_allowed[0] = ( + limit_bytes >= global_peak_estimated_memory_bytes[0] + ) + else: + # Else rank 0 queries the available memory and makes the decision + rank_memory_pool_allowed = np.asarray([True], dtype=np.bool) + if self.comm.rank == 0: + # First two lines of 'free -t -b': + # total used free shared buff/cache available + # Mem: 33413799936 10897051648 11293478912 303587328 11639463936 22516748288 + limit_bytes = int(os.popen("free -t -b").readlines()[1].split()[-1]) + + # 10% margin + limit_bytes = int(limit_bytes * 0.9) + rank_memory_pool_allowed[0] = ( + limit_bytes >= global_peak_estimated_memory_bytes[0] + ) + + # The decision is communicated back to the other ranks + self.comm.Allreduce( + [rank_memory_pool_allowed, MPI.BOOL], + [global_memory_pool_allowed, MPI.BOOL], + MPI.LAND, + ) + + # Apply decision on all ranks + if global_memory_pool_allowed[0]: + log_once( + f"Estimated CPU memory peak under limit, enabling CuPy pinned memory pool ({limit_bytes} >= {global_peak_estimated_memory_bytes[0]})", + level=logging.DEBUG, + ) + else: + xp.get_default_pinned_memory_pool().free_all_blocks() + xp.cuda.set_pinned_memory_allocator(None) + log_once( + f"Estimated CPU memory peak over limit, disabling CuPy pinned memory pool ({limit_bytes} < {global_peak_estimated_memory_bytes[0]})", + level=logging.DEBUG, + ) + def _execute_method( self, method: MethodWrapper, block: DataSetBlock ) -> DataSetBlock: From e51b5384bae725d681296820f7d5e42edcca03b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C5=91rinc=20Serf=C5=91z=C5=91?= Date: Thu, 4 Jun 2026 14:15:36 +0200 Subject: [PATCH 2/2] Initialize `_memory_limit_bytes` with system query --- httomo/runner/task_runner.py | 59 +++++++++++++++--------------------- httomo/utils.py | 8 +++++ 2 files changed, 32 insertions(+), 35 deletions(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 706b7b48a..cf82ece23 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -45,6 +45,7 @@ log_exception, log_once, log_rank, + get_available_system_memory_bytes, ) import numpy as np @@ -71,7 +72,25 @@ def __init__( self.source: Optional[DataSetSource] = None self.sink: Optional[Union[DataSetSink, ReadableDataSetSink]] = None - self._memory_limit_bytes = memory_limit_bytes + available_system_memory = np.zeros((1,), dtype=np.int64) + global_available_system_memory = np.empty_like(available_system_memory) + # Rank 0 queries memory + if self.comm.rank == 0: + available_system_memory[0] = get_available_system_memory_bytes() + self.comm.Allreduce( + [available_system_memory, MPI.INT64_T], + [global_available_system_memory, MPI.INT64_T], + MPI.MAX, + ) + if memory_limit_bytes == 0: + self._memory_limit_bytes = global_available_system_memory[0] + else: + if memory_limit_bytes > global_available_system_memory[0]: + log_once( + f"WARNING: Memory limit set is larger than available system memory ({memory_limit_bytes} > {global_available_system_memory[0]})", + ) + self._memory_limit_bytes = memory_limit_bytes + self._pipeline_inspector() self._sections = self._sectionize() @@ -371,7 +390,7 @@ def _setup_pinned_memory_pool(self): nprocs=self.comm.size, rank=self.comm.rank, allgather_func=self.comm.allgather, - dtype=self.source.dtype, + dtype=self.source.dtype if i == 0 else np.float32, global_shape=self.source.global_shape, sections=self._sections, section_idx=i, @@ -391,47 +410,17 @@ def _setup_pinned_memory_pool(self): MPI.MAX, ) - global_memory_pool_allowed = np.asarray([True], dtype=np.bool) - limit_bytes = self._memory_limit_bytes - - # If the limit is passed, use the limit on all ranks - if limit_bytes > 0: - global_memory_pool_allowed[0] = ( - limit_bytes >= global_peak_estimated_memory_bytes[0] - ) - else: - # Else rank 0 queries the available memory and makes the decision - rank_memory_pool_allowed = np.asarray([True], dtype=np.bool) - if self.comm.rank == 0: - # First two lines of 'free -t -b': - # total used free shared buff/cache available - # Mem: 33413799936 10897051648 11293478912 303587328 11639463936 22516748288 - limit_bytes = int(os.popen("free -t -b").readlines()[1].split()[-1]) - - # 10% margin - limit_bytes = int(limit_bytes * 0.9) - rank_memory_pool_allowed[0] = ( - limit_bytes >= global_peak_estimated_memory_bytes[0] - ) - - # The decision is communicated back to the other ranks - self.comm.Allreduce( - [rank_memory_pool_allowed, MPI.BOOL], - [global_memory_pool_allowed, MPI.BOOL], - MPI.LAND, - ) - # Apply decision on all ranks - if global_memory_pool_allowed[0]: + if self._memory_limit_bytes >= global_peak_estimated_memory_bytes[0]: log_once( - f"Estimated CPU memory peak under limit, enabling CuPy pinned memory pool ({limit_bytes} >= {global_peak_estimated_memory_bytes[0]})", + f"Estimated CPU memory peak under limit, enabling CuPy pinned memory pool ({self._memory_limit_bytes} >= {global_peak_estimated_memory_bytes[0]})", level=logging.DEBUG, ) else: xp.get_default_pinned_memory_pool().free_all_blocks() xp.cuda.set_pinned_memory_allocator(None) log_once( - f"Estimated CPU memory peak over limit, disabling CuPy pinned memory pool ({limit_bytes} < {global_peak_estimated_memory_bytes[0]})", + f"Estimated CPU memory peak over limit, disabling CuPy pinned memory pool ({self._memory_limit_bytes} < {global_peak_estimated_memory_bytes[0]})", level=logging.DEBUG, ) diff --git a/httomo/utils.py b/httomo/utils.py index cd1de10ef..15d8c498a 100644 --- a/httomo/utils.py +++ b/httomo/utils.py @@ -415,3 +415,11 @@ def clp2(x: int) -> int: x |= x >> 16 x |= x >> 32 return x + 1 + + +def get_available_system_memory_bytes(safety_margin_percent: float = 10): + # First two lines of 'free -t -b': + # total used free shared buff/cache available + # Mem: 33413799936 10897051648 11293478912 303587328 11639463936 22516748288 + available_system_memory = int(os.popen("free -t -b").readlines()[1].split()[-1]) + return int(available_system_memory * (1 - safety_margin_percent / 100))