Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 78 additions & 3 deletions httomo/runner/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@
import time
from typing import Any, Dict, Literal, Optional, List, Tuple, Union
import os
import resource

import tqdm
from mpi4py import MPI

import httomo.globals
from httomo.data.dataset_store import DataSetStoreWriter
from httomo.method_wrappers.save_intermediate import SaveIntermediateFilesWrapper
from httomo.runner.dataset_store_backing import determine_store_backing
from httomo.runner.dataset_store_backing import (
determine_store_backing,
estimate_section_memory,
)
from httomo.runner.method_wrapper import MethodWrapper
from httomo.runner.block_split import BlockSplitter
from httomo.runner.dataset import DataSetBlock
Expand All @@ -20,7 +24,12 @@
ReadableDataSetSink,
)
from httomo.utils import save_2d_snapshot
from httomo.runner.gpu_utils import get_available_gpu_memory, gpumem_cleanup
from httomo.runner.gpu_utils import (
get_available_gpu_memory,
gpumem_cleanup,
gpu_enabled,
xp,
)
from httomo.runner.monitoring_interface import MonitoringInterface
from httomo.runner.pipeline import Pipeline
from httomo.runner.section import (
Expand All @@ -36,6 +45,7 @@
log_exception,
log_once,
log_rank,
get_available_system_memory_bytes,
)
import numpy as np

Expand All @@ -62,7 +72,25 @@ def __init__(
self.source: Optional[DataSetSource] = None
self.sink: Optional[Union[DataSetSink, ReadableDataSetSink]] = None

self._memory_limit_bytes = memory_limit_bytes
available_system_memory = np.zeros((1,), dtype=np.int64)
global_available_system_memory = np.empty_like(available_system_memory)
# Rank 0 queries memory
if self.comm.rank == 0:
available_system_memory[0] = get_available_system_memory_bytes()
self.comm.Allreduce(
[available_system_memory, MPI.INT64_T],
[global_available_system_memory, MPI.INT64_T],
MPI.MAX,
)
if memory_limit_bytes == 0:
self._memory_limit_bytes = global_available_system_memory[0]
else:
if memory_limit_bytes > global_available_system_memory[0]:
log_once(
f"WARNING: Memory limit set is larger than available system memory ({memory_limit_bytes} > {global_available_system_memory[0]})",
)
self._memory_limit_bytes = memory_limit_bytes

self._pipeline_inspector()
self._sections = self._sectionize()

Expand Down Expand Up @@ -332,6 +360,7 @@ def _prepare(self):
)
self._check_params_for_sweep()
self._load_datasets()
self._setup_pinned_memory_pool()

def _load_datasets(self):
start_time = self._log_task_start(
Expand All @@ -349,6 +378,52 @@ def _load_datasets(self):
self.pipeline.loader.package_name,
)

def _setup_pinned_memory_pool(self):
    """
    Keep or disable the CuPy pinned memory pool, depending on whether the
    estimated peak CPU memory use fits within ``self._memory_limit_bytes``.

    Does nothing when ``gpu_enabled`` is false. Otherwise every rank
    estimates the memory of each section (with the pinned memory pool taken
    into account), keeps the largest estimate, scales it by the communicator
    size and the maximum of those scaled values over all ranks is compared
    against the memory limit. When the limit is exceeded, the default pinned
    memory pool is emptied and the pinned memory allocator is reset to
    ``None``. The outcome is logged at DEBUG level in both cases.
    """
    if not gpu_enabled:
        return

    # Largest per-section estimate seen on this rank
    local_peak = np.zeros(shape=(1,), dtype=np.int64)
    for section_idx, _ in enumerate(self._sections):
        assert self.source is not None
        # Only the first section is estimated with the source's dtype; all
        # later sections are estimated as float32
        input_dtype = self.source.dtype if section_idx == 0 else np.float32
        section_estimate = estimate_section_memory(
            nprocs=self.comm.size,
            rank=self.comm.rank,
            allgather_func=self.comm.allgather,
            dtype=input_dtype,
            global_shape=self.source.global_shape,
            sections=self._sections,
            section_idx=section_idx,
            consider_pinned_memory_pool=True,
        )
        local_peak[0] = max(local_peak[0], section_estimate)

    # Reduce to one value shared by all ranks: the maximum of the per-rank
    # peaks, each multiplied by the number of ranks.
    # NOTE(review): the scaling presumably assumes all ranks share the same
    # host memory - confirm for multi-node runs.
    global_peak_estimated_memory_bytes = np.zeros_like(local_peak)
    scaled_local_peak = local_peak * self.comm.size
    self.comm.Allreduce(
        [scaled_local_peak, MPI.INT64_T],
        [global_peak_estimated_memory_bytes, MPI.INT64_T],
        MPI.MAX,
    )

    # The reduced value is identical on every rank, so every rank takes the
    # same branch below
    within_limit = self._memory_limit_bytes >= global_peak_estimated_memory_bytes[0]
    if not within_limit:
        xp.get_default_pinned_memory_pool().free_all_blocks()
        xp.cuda.set_pinned_memory_allocator(None)
        log_once(
            f"Estimated CPU memory peak over limit, disabling CuPy pinned memory pool ({self._memory_limit_bytes} < {global_peak_estimated_memory_bytes[0]})",
            level=logging.DEBUG,
        )
        return

    log_once(
        f"Estimated CPU memory peak under limit, enabling CuPy pinned memory pool ({self._memory_limit_bytes} >= {global_peak_estimated_memory_bytes[0]})",
        level=logging.DEBUG,
    )

def _execute_method(
self, method: MethodWrapper, block: DataSetBlock
) -> DataSetBlock:
Expand Down
8 changes: 8 additions & 0 deletions httomo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,3 +415,11 @@ def clp2(x: int) -> int:
x |= x >> 16
x |= x >> 32
return x + 1


def get_available_system_memory_bytes(safety_margin_percent: float = 10) -> int:
    """
    Return the available system memory in bytes, reduced by a safety margin.

    The value is taken from the last column ("available") of the second line
    of ``free -t -b``. If ``free`` cannot be run or its output cannot be
    parsed, the ``MemAvailable`` entry of ``/proc/meminfo`` is used instead.

    Parameters
    ----------
    safety_margin_percent : float
        Percentage of the available memory to hold back. The default of 10
        returns 90% of the reported value.

    Returns
    -------
    int
        Available memory in bytes after subtracting the safety margin.

    Raises
    ------
    RuntimeError
        If neither ``free`` nor ``/proc/meminfo`` yields a usable value.
    """
    # Imported locally so that this function does not rely on which modules
    # happen to be imported at the top of the file.
    import subprocess

    available_bytes = None

    try:
        # An argument list avoids spawning a shell, and run() waits for the
        # child and closes its pipes, so nothing is left open afterwards.
        free_lines = subprocess.run(
            ["free", "-t", "-b"],
            capture_output=True,
            text=True,
            check=True,
        ).stdout.splitlines()
        # First two lines of 'free -t -b':
        #               total        used        free      shared  buff/cache   available
        # Mem:    33413799936 10897051648 11293478912   303587328 11639463936 22516748288
        # The row is addressed by position rather than by its "Mem:" label.
        available_bytes = int(free_lines[1].split()[-1])
    except (OSError, subprocess.CalledProcessError, IndexError, ValueError):
        available_bytes = None

    if available_bytes is None:
        try:
            with open("/proc/meminfo", encoding="utf-8") as meminfo:
                for line in meminfo:
                    if line.startswith("MemAvailable:"):
                        # Reported in kB, e.g. "MemAvailable:   21989012 kB"
                        available_bytes = int(line.split()[1]) * 1024
                        break
        except (OSError, IndexError, ValueError):
            available_bytes = None

    if available_bytes is None:
        raise RuntimeError(
            "Unable to determine available system memory from 'free -t -b' "
            "or /proc/meminfo"
        )

    return int(available_bytes * (1 - safety_margin_percent / 100))
Loading