diff --git a/CMakeLists.txt b/CMakeLists.txt index 967f8d57c6d..d590134d672 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,6 +32,7 @@ set(NVFUSER_CUTLASS "${NVFUSER_ROOT}/cutlass") set(NVFUSER_THIRD_PARTY_DIR "${NVFUSER_ROOT}/third_party") option(NVFUSER_STANDALONE_BUILD_WITH_UCC "" OFF) +option(NVFUSER_BUILD_WITH_NIXL "" OFF) option(NVFUSER_EXPLICIT_ERROR_CHECK "" OFF) option(NVFUSER_ENABLE_DEPENDENCY_REPORT "Enable Python-based dependency reporting and log capture" ON) @@ -76,6 +77,7 @@ include(cmake/deps/handle_torch.cmake) include(cmake/deps/handle_pybind11.cmake) include(cmake/deps/handle_llvm.cmake) include(cmake/deps/handle_nvmmh.cmake) +include(cmake/deps/handle_nixl.cmake) include(cmake/deps/handle_git_submodules.cmake) # Initialize success flag @@ -95,6 +97,7 @@ handle_torch() # Must come AFTER python and cudatoolkit. handle_pybind11() handle_llvm() handle_nvmmh() # Must come AFTER python to query correct site-packages +handle_nixl() # Must come AFTER python and cudatoolkit for CUDA version check if(NVFUSER_ENABLE_DEPENDENCY_REPORT) stop_capture(DEP_LOGS) @@ -248,6 +251,7 @@ list(APPEND NVFUSER_SRCS ${NVFUSER_SRCS_DIR}/multidevice/ipc_utils.cpp ${NVFUSER_SRCS_DIR}/multidevice/device_mesh.cpp ${NVFUSER_SRCS_DIR}/multidevice/executor.cpp + ${NVFUSER_SRCS_DIR}/multidevice/nixl.cpp ${NVFUSER_SRCS_DIR}/multidevice/execution_utils.cpp ${NVFUSER_SRCS_DIR}/multidevice/propagation.cpp ${NVFUSER_SRCS_DIR}/multidevice/resharding.cpp @@ -1001,6 +1005,7 @@ if(BUILD_TEST) ${NVFUSER_ROOT}/tests/cpp/test_multidevice_lower_communication.cpp ${NVFUSER_ROOT}/tests/cpp/test_multidevice_lower_communication_cuda.cpp ${NVFUSER_ROOT}/tests/cpp/test_multidevice_matmul.cpp + ${NVFUSER_ROOT}/tests/cpp/test_multidevice_nixl.cpp ${NVFUSER_ROOT}/tests/cpp/test_multidevice_pipeline.cpp ${NVFUSER_ROOT}/tests/cpp/test_multidevice_sharding.cpp ${NVFUSER_ROOT}/tests/cpp/test_multidevice_stream_parallel_type.cpp @@ -1299,6 +1304,12 @@ if(NVFUSER_STANDALONE_BUILD_WITH_UCC) message(STATUS " UCX_DIR : $ENV{UCX_DIR}") endif() message(STATUS " NVFUSER_STANDALONE_BUILD_WITH_UCC : ${NVFUSER_STANDALONE_BUILD_WITH_UCC}") +message(STATUS " NVFUSER_BUILD_WITH_NIXL : ${NVFUSER_BUILD_WITH_NIXL}") +message(STATUS " NIXL_FOUND : ${NIXL_FOUND}") +if(NIXL_FOUND) + message(STATUS " NIXL_INCLUDE_DIR: ${NIXL_INCLUDE_DIR}") + message(STATUS " NIXL_LIBRARY : ${NIXL_LIBRARY}") +endif() message(STATUS " NVFUSER_BUILD_WITH_ASAN : ${NVFUSER_BUILD_WITH_ASAN}") message(STATUS " NVFUSER_DISTRIBUTED : ${NVFUSER_DISTRIBUTED}") message(STATUS " NVFUSER_CPP_STANDARD : ${NVFUSER_CPP_STANDARD}") diff --git a/cmake/DependencyRequirements.cmake b/cmake/DependencyRequirements.cmake index 6b941dc4c62..b595d448079 100644 --- a/cmake/DependencyRequirements.cmake +++ b/cmake/DependencyRequirements.cmake @@ -41,5 +41,8 @@ set(NVFUSER_REQUIREMENT_LLVM_VERSION_MIN "18.1") # NVMMH set(NVFUSER_REQUIREMENT_NVMMH_OPTIONAL "TRUE") +# NIXL +set(NVFUSER_REQUIREMENT_NIXL_OPTIONAL "TRUE") + # Git Submodules (required for build) # No version requirement - just checks if submodules are initialized diff --git a/cmake/deps/handle_nixl.cmake b/cmake/deps/handle_nixl.cmake new file mode 100644 index 00000000000..28ed7789ab5 --- /dev/null +++ b/cmake/deps/handle_nixl.cmake @@ -0,0 +1,87 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-present NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: BSD-3-Clause + +# ------------------------------------------------------------------------------ +# NIXL Handler +# ------------------------------------------------------------------------------ + +macro(handle_nixl) + message("") + message("Finding NIXL...") + + if(NOT NVFUSER_BUILD_WITH_NIXL) + set(NIXL_FOUND FALSE) + message(STATUS "NIXL disabled (NVFUSER_BUILD_WITH_NIXL=OFF)") + else() + # User may need to set NIXL_PREFIX to the NIXL install directory. + find_path(NIXL_INCLUDE_DIR nixl.h + HINTS $ENV{NIXL_PREFIX}/include ENV CPATH + ) + find_library(NIXL_LIBRARY nixl + HINTS $ENV{NIXL_PREFIX}/lib $ENV{NIXL_PREFIX}/lib64 + $ENV{NIXL_PREFIX}/lib/x86_64-linux-gnu + $ENV{NIXL_PREFIX}/lib/aarch64-linux-gnu + ) + find_library(NIXL_BUILD_LIBRARY nixl_build + HINTS $ENV{NIXL_PREFIX}/lib $ENV{NIXL_PREFIX}/lib64 + $ENV{NIXL_PREFIX}/lib/x86_64-linux-gnu + $ENV{NIXL_PREFIX}/lib/aarch64-linux-gnu + ) + + if(NIXL_INCLUDE_DIR AND NIXL_LIBRARY) + set(NIXL_FOUND TRUE) + message(STATUS "Found NIXL: ${NIXL_LIBRARY} (include: ${NIXL_INCLUDE_DIR})") + if(NIXL_BUILD_LIBRARY) + message(STATUS "Found NIXL build lib: ${NIXL_BUILD_LIBRARY}") + endif() + + add_library(__nvfuser_nixl INTERFACE) + target_include_directories(__nvfuser_nixl INTERFACE ${NIXL_INCLUDE_DIR}) + + get_filename_component(NIXL_LIB_DIR "${NIXL_LIBRARY}" DIRECTORY) + target_link_directories(__nvfuser_nixl INTERFACE ${NIXL_LIB_DIR}) + target_link_options(__nvfuser_nixl INTERFACE "LINKER:-rpath-link,${NIXL_LIB_DIR}") + + target_link_libraries(__nvfuser_nixl INTERFACE ${NIXL_LIBRARY}) + if(NIXL_BUILD_LIBRARY) + target_link_libraries(__nvfuser_nixl INTERFACE ${NIXL_BUILD_LIBRARY}) + endif() + else() + set(NIXL_FOUND FALSE) + message(WARNING "NIXL not found – building without NIXL support. Set NIXL_PREFIX to the NIXL install directory.") + endif() + + # CUDA major version constraint check + if(NIXL_FOUND AND Python_FOUND AND CUDAToolkit_FOUND) + execute_process( + COMMAND "${Python_EXECUTABLE}" -c "import nixl; print(nixl._pkg.__name__.split('_cu')[-1])" + OUTPUT_VARIABLE nixl_cuda_major + OUTPUT_STRIP_TRAILING_WHITESPACE + ERROR_QUIET + RESULT_VARIABLE nixl_cuda_result + ) + + if(nixl_cuda_result EQUAL 0 AND NOT nixl_cuda_major STREQUAL "") + set(NIXL_CUDA_VERSION "${nixl_cuda_major}") + set(cuda_toolkit_major "${CUDAToolkit_VERSION_MAJOR}") + + if(NOT nixl_cuda_major STREQUAL cuda_toolkit_major) + set(NIXL_CUDA_constraint_status "mismatch") + set(NIXL_CUDA_constraint_found "${nixl_cuda_major}") + set(NIXL_CUDA_constraint_required "${cuda_toolkit_major}") + message(WARNING "NIXL CUDA major version mismatch: NIXL built for CUDA ${nixl_cuda_major}, but CUDAToolkit major is ${cuda_toolkit_major}") + else() + set(NIXL_CUDA_constraint_status "match") + set(NIXL_CUDA_constraint_version "${nixl_cuda_major}") + endif() + else() + set(NIXL_CUDA_constraint_status "not_available") + endif() + else() + set(NIXL_CUDA_constraint_status "not_available") + endif() + endif() + + set_dependency_report_status(NIXL) +endmacro() diff --git a/csrc/multidevice/communicator.cpp b/csrc/multidevice/communicator.cpp index dbd65ba4610..168da4af93a 100644 --- a/csrc/multidevice/communicator.cpp +++ b/csrc/multidevice/communicator.cpp @@ -9,6 +9,7 @@ #include +#include #include #include #include @@ -41,6 +42,9 @@ std::ostream& operator<<(std::ostream& out, const CommunicatorBackend& cb) { case CommunicatorBackend::kCuda: out << "CUDA"; break; + case CommunicatorBackend::kNixl: + out << "NIXL"; + break; } return out; } @@ -121,7 +125,8 @@ bool parseEnv( } // retrieves master port - if ((env = std::getenv("NVFUSER_MASTER_PORT")) != nullptr) { + env = std::getenv("NVFUSER_MASTER_PORT"); + if (env != nullptr) { master_port = std::atoi(env); } else { LOG(INFO) << "The environment variable NVFUSER_MASTER_PORT has not been " @@ -183,7 +188,8 @@ Communicator::Communicator( master_port_( c10d::TCPStoreOptions::kDefaultPort + 42), // to avoid collision ucc_available_(false), - nccl_available_(false) { + nccl_available_(false), + nixl_available_(false) { if (isOptionDisabled(DisableOption::Multidevice)) { TORCH_WARN( "Multi-device support is disabled. All communication operations will " @@ -236,6 +242,10 @@ Communicator::Communicator( #ifdef USE_C10D_NCCL nccl_available_ = true; #endif + +#ifdef USE_NIXL + nixl_available_ = true; +#endif } namespace { @@ -248,10 +258,10 @@ void waitForDebuggerAtRanks( std::cerr << "Process " << pid << " is waiting for the debugger. To continue debugging, " << "start gdb, `attach " << pid - << "`, `set var waiting=false`, and `fini`." << std::endl; + << "`, `set var waiting=false`, and `fini`." << '\n'; while (waiting) { // Please change `waiting` in the debugger. } - std::cerr << "Process " << getpid() << " finished waiting." << std::endl; + std::cerr << "Process " << getpid() << " finished waiting." << '\n'; } if (communicator->is_available()) { @@ -354,7 +364,7 @@ void Communicator::cleanup() { // in different orders between ranks have been causing a hang. std::vector>> keyed_backends(backends_.begin(), backends_.end()); - std::sort(keyed_backends.begin(), keyed_backends.end()); + std::ranges::sort(keyed_backends); for (auto& [key, backend] : keyed_backends) { // Call shutdown before destructing a ProcessGroupNCCL as instructed by // https://github.com/pytorch/pytorch/blob/e62073d7997c9e63896cb5289ffd0874a8cc1838/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp#L1164-L1170. @@ -388,7 +398,7 @@ c10d::Backend* Communicator::getBackendForTeam( #ifdef NVFUSER_DISTRIBUTED backends_[team_key] = [&]() -> c10::intrusive_ptr { // check that the caller's rank belongs to the requested team - auto rank_it = std::find(team.begin(), team.end(), deviceId()); + auto rank_it = std::ranges::find(team, deviceId()); if (rank_it == team.end()) { return nullptr; } diff --git a/csrc/multidevice/communicator.h b/csrc/multidevice/communicator.h index b56e6fee3aa..7d1e0b89305 100644 --- a/csrc/multidevice/communicator.h +++ b/csrc/multidevice/communicator.h @@ -116,6 +116,8 @@ class NVF_API Communicator { return ucc_available_; } else if (backend == CommunicatorBackend::kNccl) { return nccl_available_; + } else if (backend == CommunicatorBackend::kNixl) { + return nixl_available_; } return false; } @@ -149,6 +151,7 @@ class NVF_API Communicator { int master_port_; bool ucc_available_; bool nccl_available_; + bool nixl_available_; // stores the world's store used for the backend init c10::intrusive_ptr store_; // cache for the created backends. The keys are strings generated from Teams diff --git a/csrc/multidevice/multidevice.h b/csrc/multidevice/multidevice.h index 288a89fe952..738dc46b1f2 100644 --- a/csrc/multidevice/multidevice.h +++ b/csrc/multidevice/multidevice.h @@ -8,6 +8,7 @@ #pragma once +#include #include #include @@ -19,5 +20,5 @@ using DeviceType = c10::Device; using Team = std::vector; // Supported backends. -enum class CommunicatorBackend { kNccl, kUcc, kCuda }; +enum class CommunicatorBackend : std::uint8_t { kNccl, kUcc, kCuda, kNixl }; } // namespace nvfuser diff --git a/csrc/multidevice/nixl.cpp b/csrc/multidevice/nixl.cpp new file mode 100644 index 00000000000..7fd69218af5 --- /dev/null +++ b/csrc/multidevice/nixl.cpp @@ -0,0 +1,479 @@ +// clang-format off +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-present NVIDIA CORPORATION & AFFILIATES. + * All rights reserved. + * SPDX-License-Identifier: BSD-3-Clause + */ +// clang-format on +#include "multidevice/nixl.h" +#include "exceptions.h" + +#include +#include +#include +#include +#include +#include + +#ifdef USE_NIXL +#include +#endif +namespace nvfuser { + +// =================================================================== +// NixlTransferHandle +// =================================================================== + +class NixlTransferHandleImpl { + public: +#ifdef USE_NIXL + explicit NixlTransferHandleImpl(nixlAgent* agent) : agent(agent) {} + nixlAgent* agent; + nixlXferReqH* xfer_handle = nullptr; + + ~NixlTransferHandleImpl() { + if (xfer_handle) { + agent->releaseXferReq(xfer_handle); + } + } +#endif + bool posted = false; +}; + +NixlTransferHandle::NixlTransferHandle() = default; +NixlTransferHandle::~NixlTransferHandle() = default; +NixlTransferHandle::NixlTransferHandle(NixlTransferHandle&&) noexcept = default; +NixlTransferHandle& NixlTransferHandle::operator=( + NixlTransferHandle&&) noexcept = default; + +// =================================================================== +// Tensor validation and descriptor helpers +// =================================================================== + +namespace { + +void validateCudaTensors(const std::vector& tensors) { + NVF_ERROR(!tensors.empty(), "Tensor list must not be empty"); + for (const auto& t : tensors) { + NVF_ERROR(t.is_cuda(), "All tensors must be CUDA tensors"); + NVF_ERROR(t.is_contiguous(), "All tensors must be contiguous"); + } +} + +#ifdef USE_NIXL + +nixl_reg_dlist_t buildRegDlist(const std::vector& tensors) { + nixl_reg_dlist_t dlist(VRAM_SEG); + for (const auto& t : tensors) { + dlist.addDesc( + {reinterpret_cast(t.data_ptr()), + static_cast(t.numel()) * t.element_size(), + static_cast(t.device().index())}); + } + return dlist; +} + +nixl_xfer_dlist_t buildXferDlist(const std::vector& descs) { + nixl_xfer_dlist_t dlist(VRAM_SEG); + for (const auto& desc : descs) { + dlist.addDesc({desc.addr, desc.size, desc.dev}); + } + return dlist; +} + +nixl_xfer_op_t toNixlXferOp(NixlXferOp op) { + switch (op) { + case NixlXferOp::kRead: + return NIXL_READ; + case NixlXferOp::kWrite: + return NIXL_WRITE; + } + NVF_THROW("Invalid NIXL transfer operation: ", static_cast(op)); +} + +#endif + +} // namespace + +// =================================================================== +// NixlBackend::Impl +// =================================================================== + +#ifdef USE_NIXL + +class NixlBackend::Impl { + public: + static std::unique_ptr create(Communicator& communicator); + ~Impl(); + + void registerTensors(const std::vector& tensors); + void deregisterTensors(const std::vector& tensors); + + NixlTransferHandle prepareTransfer( + const std::vector& local_descs, + const std::vector& remote_descs, + NixlXferOp op); + + void postTransfer(NixlTransferHandle& handle); + NixlXferStatus getTransferStatus(const NixlTransferHandle& handle) const; + void waitTransfer(NixlTransferHandle& handle); + + private: + void exchangeMetadata(); + explicit Impl(Communicator& communicator); + inline std::string getAgentName(int64_t rank); + + std::unique_ptr agent_; + nixlBackendH* backend_ = nullptr; + Communicator& communicator_; + bool metadata_exchanged_ = false; +}; + +// ------------------------------------------------------------------- +// Construction / destruction +// ------------------------------------------------------------------- + +NixlBackend::Impl::Impl(Communicator& communicator) + : communicator_(communicator) {} + +std::unique_ptr NixlBackend::Impl::create( + Communicator& communicator) { + std::unique_ptr impl(new Impl(communicator)); + + std::string agent_name = impl->getAgentName(communicator.deviceId()); + nixlAgentConfig cfg(false); + impl->agent_ = std::make_unique(agent_name, cfg); + + nixl_b_params_t params; + nixl_status_t status = + impl->agent_->createBackend("UCX", params, impl->backend_); + if (status != NIXL_SUCCESS) { + impl->agent_.reset(); + NVF_THROW("Failed to create UCX backend for NIXL agent"); + } + + // Probe: verify that VRAM (CUDA GPU memory) is actually usable with + // the UCX backend. Some UCX installations lack CUDA support, causing + // registerMem to silently misclassify VRAM as host memory. We detect + // this by registering a small buffer and asking NIXL to prepare a + // local descriptor list for VRAM -- if no backend claims VRAM, the + // probe fails and we mark the backend as unavailable. + { + constexpr int64_t kProbeBytes = 1; + auto probe = at::empty( + {kProbeBytes}, + at::TensorOptions().dtype(at::kByte).device( + at::kCUDA, communicator.deviceId())); + size_t nbytes = static_cast(probe.nbytes()); + uintptr_t addr = reinterpret_cast(probe.data_ptr()); + uint32_t dev_idx = static_cast(probe.device().index()); + + NVF_ERROR(nbytes > 0, "NIXL probe: unexpected zero-byte tensor"); + NVF_ERROR(addr != 0, "NIXL probe: null data pointer"); + + nixl_reg_dlist_t reg_dlist(VRAM_SEG); + reg_dlist.addDesc({addr, nbytes, static_cast(dev_idx)}); + + nixl_status_t reg_status = impl->agent_->registerMem(reg_dlist); + if (reg_status != NIXL_SUCCESS) { + return nullptr; + } + + nixl_xfer_dlist_t xfer_dlist(VRAM_SEG); + xfer_dlist.addDesc({addr, nbytes, static_cast(dev_idx)}); + + nixlDlistH* dlist_handle = nullptr; + nixl_status_t prep_status = + impl->agent_->prepXferDlist(NIXL_INIT_AGENT, xfer_dlist, dlist_handle); + + if (dlist_handle) { + impl->agent_->releasedDlistH(dlist_handle); + } + impl->agent_->deregisterMem(reg_dlist); + + if (prep_status != NIXL_SUCCESS) { + return nullptr; + } + } + + return impl; +} + +NixlBackend::Impl::~Impl() = default; + +std::string NixlBackend::Impl::getAgentName(int64_t rank) { + return "rank_" + std::to_string(rank); +} + +// ------------------------------------------------------------------- +// Memory registration +// ------------------------------------------------------------------- + +// TODO - consider adding RAII wrapper +void NixlBackend::Impl::registerTensors( + const std::vector& tensors) { + validateCudaTensors(tensors); + + nixl_reg_dlist_t dlist = buildRegDlist(tensors); + nixl_status_t status = agent_->registerMem(dlist); + NVF_ERROR( + status == NIXL_SUCCESS, + "NIXL registerMem failed with status ", + static_cast(status)); + + metadata_exchanged_ = false; + exchangeMetadata(); +} + +void NixlBackend::Impl::deregisterTensors( + const std::vector& tensors) { + validateCudaTensors(tensors); + + nixl_reg_dlist_t dlist = buildRegDlist(tensors); + nixl_status_t status = agent_->deregisterMem(dlist); + NVF_ERROR( + status == NIXL_SUCCESS, + "NIXL deregisterMem failed with status ", + static_cast(status)); + + metadata_exchanged_ = false; + exchangeMetadata(); +} + +// ------------------------------------------------------------------- +// Metadata exchange +// ------------------------------------------------------------------- + +void NixlBackend::Impl::exchangeMetadata() { + nixl_blob_t local_md; + nixl_status_t md_status = agent_->getLocalMD(local_md); + NVF_ERROR( + md_status == NIXL_SUCCESS, + "NIXL getLocalMD failed with status ", + static_cast(md_status)); + + auto* store = communicator_.getTcpStore(); + const auto my_rank = communicator_.deviceId(); + const auto world_size = communicator_.size(); + + std::string md_key_prefix = "nixl_agent_md_rank_"; + store->set( + md_key_prefix + std::to_string(my_rank), + std::vector(local_md.begin(), local_md.end())); + + for (int64_t rank = 0; rank < world_size; ++rank) { + if (rank == my_rank) { + continue; + } + auto bytes = store->get(md_key_prefix + std::to_string(rank)); + nixl_blob_t remote_md(bytes.begin(), bytes.end()); + std::string remote_agent_name; + nixl_status_t status = agent_->loadRemoteMD(remote_md, remote_agent_name); + NVF_ERROR( + status == NIXL_SUCCESS, + "NIXL loadRemoteMD failed for rank ", + rank, + " with status ", + static_cast(status)); + } + + // Barrier before deleting keys so no rank reads a deleted key. + communicator_.barrier(); + + store->deleteKey(md_key_prefix + std::to_string(my_rank)); + metadata_exchanged_ = true; +} + +// ------------------------------------------------------------------- +// Transfer preparation +// ------------------------------------------------------------------- + +// Prepare a transfer between local and remote tensor pairs. +// +// NIXL pairs local_tensors[i] with remote_tensors[i]. The direction +// depends on `op`: +// kRead -- data flows from remote into local +// kWrite -- data flows from local into remote +// +NixlTransferHandle NixlBackend::Impl::prepareTransfer( + const std::vector& local_descs, + const std::vector& remote_descs, + NixlXferOp op) { + NVF_ERROR(metadata_exchanged_, "exchangeMetadata() must be called first"); + NVF_ERROR(!remote_descs.empty(), "remote_descs must not be empty"); + NVF_ERROR( + local_descs.size() == remote_descs.size(), + "Local and remote tensor lists must have the same size. Got ", + local_descs.size(), + " vs ", + remote_descs.size()); + NVF_ERROR( + std::all_of( + remote_descs.begin(), + remote_descs.end(), + [&](const TensorDesc& d) { return d.rank == remote_descs[0].rank; }), + "All remote descriptors must belong to the same remote peer"); + + std::string remote_agent_name = getAgentName(remote_descs.at(0).rank); + + nixl_xfer_dlist_t local_dlist = buildXferDlist(local_descs); + nixl_xfer_dlist_t remote_dlist = buildXferDlist(remote_descs); + + auto impl = std::make_unique(agent_.get()); + nixl_status_t status = agent_->createXferReq( + toNixlXferOp(op), + local_dlist, + remote_dlist, + remote_agent_name, + impl->xfer_handle); + NVF_ERROR( + status == NIXL_SUCCESS, + "NIXL createXferReq failed with status ", + static_cast(status)); + + NixlTransferHandle handle; + handle.impl_ = std::move(impl); + return handle; +} + +// ------------------------------------------------------------------- +// Transfer posting +// ------------------------------------------------------------------- + +void NixlBackend::Impl::postTransfer(NixlTransferHandle& handle) { + NVF_ERROR(handle.impl_, "Transfer handle is empty - was it moved from?"); + NVF_ERROR( + !handle.impl_->posted, + "Transfer already posted. Wait for completion before re-posting."); + + nixl_status_t status = agent_->postXferReq(handle.impl_->xfer_handle); + NVF_ERROR( + status == NIXL_SUCCESS || status == NIXL_IN_PROG, + "NIXL postXferReq failed with status ", + static_cast(status)); + + handle.impl_->posted = true; +} + +// ------------------------------------------------------------------- +// Transfer status / wait +// ------------------------------------------------------------------- + +NixlXferStatus NixlBackend::Impl::getTransferStatus( + const NixlTransferHandle& handle) const { + NVF_ERROR(handle.impl_, "Transfer handle is empty - was it moved from?"); + NVF_ERROR(handle.impl_->posted, "Transfer has not been posted yet"); + + nixl_status_t status = agent_->getXferStatus(handle.impl_->xfer_handle); + switch (status) { + case NIXL_SUCCESS: + return NixlXferStatus::kDone; + case NIXL_IN_PROG: + return NixlXferStatus::kInProgress; + default: + return NixlXferStatus::kError; + } +} + +void NixlBackend::Impl::waitTransfer(NixlTransferHandle& handle) { + NVF_ERROR(handle.impl_, "Transfer handle is empty - was it moved from?"); + NVF_ERROR(handle.impl_->posted, "Transfer has not been posted yet"); + + NixlXferStatus xfer_status; + do { + xfer_status = getTransferStatus(handle); + NVF_ERROR( + xfer_status != NixlXferStatus::kError, + "NIXL transfer completed with an error"); + if (xfer_status == NixlXferStatus::kInProgress) { + std::this_thread::yield(); + } + } while (xfer_status == NixlXferStatus::kInProgress); + + handle.impl_->posted = false; +} + +#else // !USE_NIXL + +class NixlBackend::Impl { + public: + void registerTensors(const std::vector&) {} + void deregisterTensors(const std::vector&) {} + NixlTransferHandle prepareTransfer( + const std::vector&, + const std::vector&, + NixlXferOp) { + return {}; + } + void postTransfer(NixlTransferHandle&) {} + NixlXferStatus getTransferStatus(const NixlTransferHandle&) const { + return NixlXferStatus::kError; + } + void waitTransfer(NixlTransferHandle&) {} + + private: + void exchangeMetadata() {} +}; + +#endif // USE_NIXL + +// =================================================================== +// NixlBackend singleton + public API +// =================================================================== + +NixlBackend::NixlBackend() { +#ifdef USE_NIXL + impl_ = Impl::create(Communicator::getInstance()); +#endif +} + +NixlBackend& NixlBackend::getInstance() { + static auto* instance = new NixlBackend(); + NVF_CHECK(!instance->cleaned_up_, "NIXL backend has been cleaned up"); + return *instance; +} + +void NixlBackend::cleanup() { + cleaned_up_ = true; + impl_.reset(); +} + +bool NixlBackend::isAvailable() const { + return impl_ != nullptr; +} + +void NixlBackend::registerTensors(const std::vector& tensors) { + NVF_CHECK(isAvailable(), "NIXL backend is not available"); + impl_->registerTensors(tensors); +} + +void NixlBackend::deregisterTensors(const std::vector& tensors) { + NVF_CHECK(isAvailable(), "NIXL backend is not available"); + impl_->deregisterTensors(tensors); +} + +NixlTransferHandle NixlBackend::prepareTransfer( + const std::vector& local_descs, + const std::vector& remote_descs, + NixlXferOp op) { + NVF_CHECK(isAvailable(), "NIXL backend is not available"); + return impl_->prepareTransfer(local_descs, remote_descs, op); +} + +void NixlBackend::postTransfer(NixlTransferHandle& handle) { + NVF_CHECK(isAvailable(), "NIXL backend is not available"); + impl_->postTransfer(handle); +} + +NixlXferStatus NixlBackend::getTransferStatus( + const NixlTransferHandle& handle) const { + NVF_CHECK(isAvailable(), "NIXL backend is not available"); + return impl_->getTransferStatus(handle); +} + +void NixlBackend::waitTransfer(NixlTransferHandle& handle) { + NVF_CHECK(isAvailable(), "NIXL backend is not available"); + impl_->waitTransfer(handle); +} + +} // namespace nvfuser diff --git a/csrc/multidevice/nixl.h b/csrc/multidevice/nixl.h new file mode 100644 index 00000000000..8226ef1b047 --- /dev/null +++ b/csrc/multidevice/nixl.h @@ -0,0 +1,221 @@ +// clang-format off +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-present NVIDIA CORPORATION & AFFILIATES. + * All rights reserved. + * SPDX-License-Identifier: BSD-3-Clause + */ +// clang-format on +#pragma once + +#include +#include +#include +#include +#include + +#include "exceptions.h" +#include "multidevice/communicator.h" +#include "visibility.h" + +namespace nvfuser { + +// Transfer direction. NIXL uses a one-sided model: +// Read = pull remote data into local buffers +// Write = push local data into remote buffers +enum class NixlXferOp : std::uint8_t { + kRead, + kWrite, +}; + +enum class NixlXferStatus : std::uint8_t { + kDone, + kInProgress, + kError, +}; + +// ------------------------------------------------------------------ +// Todo - those functions should be moved to a more global file +// Helper functions for serializing and deserializing tensors descriptors for +// TCP store +struct TensorDesc { + uintptr_t addr; + size_t size; + uint32_t dev; // CUDA device index (tensor.device().index()) + int64_t rank; // communicator rank owning this tensor +}; +static_assert( + std::is_trivially_copyable_v, + "TensorDesc must be trivially copyable for serialization"); + +inline TensorDesc toTensorDesc(const at::Tensor& tensor, int64_t rank) { + return { + .addr = reinterpret_cast(tensor.data_ptr()), + .size = static_cast(tensor.numel()) * tensor.element_size(), + .dev = static_cast(tensor.device().index()), + .rank = rank}; +} + +inline std::vector serializeTensorsDescs( + const std::vector& descs) { + size_t count = descs.size(); + std::vector buf(sizeof(count) + count * sizeof(TensorDesc)); + std::memcpy(buf.data(), &count, sizeof(count)); + if (count == 0) { + return buf; + } + + std::memcpy( + buf.data() + sizeof(count), + descs.data(), + descs.size() * sizeof(TensorDesc)); + return buf; +} + +inline std::vector deserializeTensorsDescs( + const std::vector& buf) { + NVF_ERROR(buf.size() >= sizeof(size_t), "Invalid serialized descriptor data"); + size_t count = 0; + std::memcpy(&count, buf.data(), sizeof(count)); + NVF_ERROR( + buf.size() == sizeof(count) + count * sizeof(TensorDesc), + "Corrupted serialized descriptor data"); + + std::vector descs(count); + if (count > 0) { + std::memcpy( + descs.data(), buf.data() + sizeof(count), count * sizeof(TensorDesc)); + } + return descs; +} + +inline void storeTensorDescs( + Communicator& communicator, + const std::string& key, + const std::vector& descs) { + NVF_CHECK(communicator.is_available(), "Communicator is not available"); + communicator.getTcpStore()->set(key, serializeTensorsDescs(descs)); +} + +inline void storeTensorDescs( + Communicator& communicator, + const std::string& key, + const std::vector& tensors) { + std::vector descs; + descs.reserve(tensors.size()); + for (const auto& tensor : tensors) { + descs.push_back(toTensorDesc(tensor, communicator.deviceId())); + } + storeTensorDescs(communicator, key, descs); +} + +inline std::vector fetchTensorDescs( + Communicator& communicator, + const std::string& key) { + NVF_CHECK(communicator.is_available(), "Communicator is not available"); + auto bytes = communicator.getTcpStore()->get(key); + return deserializeTensorsDescs(bytes); +} + +// End of Todo - those functions should be moved to a more global file +// ------------------------------------------------------------------ + +// ------------------------------------------------------------------- +// NixlTransferHandle: opaque handle for a prepared transfer +// ------------------------------------------------------------------- +// Returned by NixlBackend::prepareTransfer(). Callers hold this handle +// and pass it to postTransfer() / waitTransfer(). The actual NIXL +// transfer handle lives inside the impl; this is just an owning wrapper. +class NixlTransferHandleImpl; + +class NVF_API NixlTransferHandle { + public: + NixlTransferHandle(); + ~NixlTransferHandle(); + NixlTransferHandle(NixlTransferHandle&&) noexcept; + NixlTransferHandle& operator=(NixlTransferHandle&&) noexcept; + + NixlTransferHandle(const NixlTransferHandle&) = delete; + NixlTransferHandle& operator=(const NixlTransferHandle&) = delete; + + private: + friend class NixlBackend; + std::unique_ptr impl_; +}; + +// ------------------------------------------------------------------- +// NixlBackend: singleton NIXL backend over UCX for GPU tensors +// ------------------------------------------------------------------- +// Singleton - Wraps a nixlAgent with the UCX backend and provides a +// tensor-level API for registering GPU memory and performing RDMA transfers. +// +// Lifecycle: +// 1. getInstance() - creates agent, loads UCX backend +// 2. registerTensors() - register GPU tensors and exchange metadata +// (collective) +// 3. prepareTransfer() - expensive one-time setup per transfer pattern +// 4. postTransfer() - cheap, non-blocking data movement +// 5. waitTransfer() - block until complete +// +// Thread safety: methods are NOT thread-safe. The caller must +// synchronize if the same NixlBackend is used from multiple threads. +class NixlBackend { + public: + static NixlBackend& getInstance(); + + NixlBackend(const NixlBackend&) = delete; + NixlBackend& operator=(const NixlBackend&) = delete; + ~NixlBackend() = delete; + + // Explicitly tear down the singleton. Must be called before program + // exit (same pattern as Communicator::cleanup). + void cleanup(); + + [[nodiscard]] bool isAvailable() const; + + // ------------------------------------------------------------------ + // Memory registration + // ------------------------------------------------------------------ + + // Register CUDA tensors with the NIXL agent so they can participate + // in RDMA transfers. Tensors must be contiguous and remain alive + // until deregisterTensors() is called. + // Both methods are collective: they exchange agent metadata with all + // peers through the TCPStore, so all ranks must call them together and in the + // same order. + void registerTensors(const std::vector& tensors); + + void deregisterTensors(const std::vector& tensors); + + // ------------------------------------------------------------------ + // Transfer lifecycle + // ------------------------------------------------------------------ + + // Prepare a transfer between pairs of tensors. + // local_tensors[i] and remote_tensors[i] must have the same byte size. + // All tensors must be contiguous CUDA tensors and previously registered. + // The returned handle can be posted multiple times (preparation is + // amortized). + [[nodiscard]] NixlTransferHandle prepareTransfer( + const std::vector& local_descs, + const std::vector& remote_descs, + NixlXferOp op); + + // Post a previously prepared transfer for execution (non-blocking). + void postTransfer(NixlTransferHandle& handle); + + // Poll the status of a posted transfer without blocking. + [[nodiscard]] NixlXferStatus getTransferStatus( + const NixlTransferHandle& handle) const; + + // Block until the transfer completes (or errors out). + void waitTransfer(NixlTransferHandle& handle); + + private: + NixlBackend(); + bool cleaned_up_ = false; + + class Impl; + std::unique_ptr impl_; +}; + +} // namespace nvfuser diff --git a/python/setup.py b/python/setup.py index c7fe063afa7..7a77fc04fae 100644 --- a/python/setup.py +++ b/python/setup.py @@ -32,6 +32,9 @@ # NVFUSER_BUILD_WITH_UCC # Build nvfuser with UCC support. You may need to specify environment variables of UCC_HOME, UCC_DIR, UCX_HOME, UCX_DIR. # +# NVFUSER_BUILD_WITH_NIXL +# Build nvfuser with NIXL support. You may need to set NIXL_PREFIX to the NIXL install directory. +# # NVFUSER_BUILD_WITHOUT_DISTRIBUTED # Build nvfuser without multidevice support # diff --git a/python/tools/check_dependencies.py b/python/tools/check_dependencies.py index 43628e52314..08032fbd04d 100644 --- a/python/tools/check_dependencies.py +++ b/python/tools/check_dependencies.py @@ -29,6 +29,7 @@ CompilerRequirement, NinjaRequirement, NVMMHRequirement, + NIXLRequirement, GitSubmodulesRequirement, ) @@ -59,6 +60,7 @@ def __init__(self, deps_path: Path): self.requirements.append(Pybind11Requirement(cmake_vars)) self.requirements.append(LLVMRequirement(cmake_vars)) self.requirements.append(NVMMHRequirement(cmake_vars)) + self.requirements.append(NIXLRequirement(cmake_vars)) def _load_cmake_vars(self, deps_path: Path) -> Dict: """Load CMake variables from JSON file""" diff --git a/python/tools/prereqs/__init__.py b/python/tools/prereqs/__init__.py index 6627bbd5f46..fc4246afbf2 100644 --- a/python/tools/prereqs/__init__.py +++ b/python/tools/prereqs/__init__.py @@ -71,6 +71,7 @@ CompilerRequirement, GitSubmodulesRequirement, NinjaRequirement, + NIXLRequirement, ) __all__ = [ @@ -102,4 +103,5 @@ "CompilerRequirement", "GitSubmodulesRequirement", "NinjaRequirement", + "NIXLRequirement", ] diff --git a/python/tools/prereqs/requirements/__init__.py b/python/tools/prereqs/requirements/__init__.py index e5c6f0059e6..0a66c4012eb 100644 --- a/python/tools/prereqs/requirements/__init__.py +++ b/python/tools/prereqs/requirements/__init__.py @@ -13,6 +13,7 @@ from .git_submodules import GitSubmodulesRequirement from .ninja import NinjaRequirement from .nvmmh import NVMMHRequirement +from .nixl import NIXLRequirement __all__ = [ # Base classes @@ -30,4 +31,5 @@ "GitSubmodulesRequirement", "NinjaRequirement", "NVMMHRequirement", + "NIXLRequirement", ] diff --git a/python/tools/prereqs/requirements/nixl.py b/python/tools/prereqs/requirements/nixl.py new file mode 100644 index 00000000000..2adfada5117 --- /dev/null +++ b/python/tools/prereqs/requirements/nixl.py @@ -0,0 +1,113 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-present NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: BSD-3-Clause +"""NIXL dependency requirement with CUDA constraint validation.""" + +from typing import Dict +from .base import BooleanRequirement +from ..colors import colorize + + +class NIXLRequirement(BooleanRequirement): + """ + NIXL check with CUDA major version constraint. + + CMake variables used: + - NIXL_FOUND: Whether NIXL is available + - NVFUSER_REQUIREMENT_NIXL_STATUS: Validation status + - NVFUSER_REQUIREMENT_NIXL_OPTIONAL: Whether NIXL is optional + - NIXL_CUDA_constraint_status: CUDA constraint validation result + - "match": NIXL CUDA major == CUDAToolkit major + - "mismatch": Versions don't match (WARNING) + - "not_available": Unable to determine NIXL CUDA version + - NIXL_CUDA_constraint_version: CUDA major version if match + - NIXL_CUDA_constraint_found: NIXL's CUDA major version (if mismatch) + - NIXL_CUDA_constraint_required: System's CUDA major version (if mismatch) + """ + + def __init__(self, cmake_vars: Dict): + name = "NIXL" + found_var = "NIXL_FOUND" + status_var = "NVFUSER_REQUIREMENT_NIXL_STATUS" + optional_var = "NVFUSER_REQUIREMENT_NIXL_OPTIONAL" + location_var = "NIXL_LIBRARY" + + super().__init__( + name, cmake_vars, found_var, status_var, optional_var, location_var + ) + + self.constraint_status = cmake_vars.get("NIXL_CUDA_constraint_status") + self.constraint_version = cmake_vars.get("NIXL_CUDA_constraint_version") + self.constraint_found = cmake_vars.get("NIXL_CUDA_constraint_found") + self.constraint_required = cmake_vars.get("NIXL_CUDA_constraint_required") + + def format_status_line(self, colors) -> str: + main_line = super().format_status_line(colors) + + constraint_line = self._format_cuda_constraint(colors) + if constraint_line: + return main_line + "\n" + constraint_line + return main_line + + def _format_cuda_constraint(self, colors) -> str: + if not self.constraint_status or self.constraint_status == "not_available": + return "" + + name_padded = f"{'NIXL_CUDA':<15}" + + if self.constraint_status == "match": + cuda_version = self.constraint_version or "unknown" + status_part = colorize(colors.GREEN, "[nvFuser] ✓") + " " + name_padded + version_part = colorize( + colors.CYAN, f"CUDA {cuda_version} (NIXL.CUDA == CUDAToolkit major)" + ) + return f"{status_part} {version_part}" + elif self.constraint_status == "mismatch": + nixl_cuda = self.constraint_found or "unknown" + toolkit_cuda = self.constraint_required or "unknown" + status_part = colorize(colors.BOLD_RED, "[nvFuser] ✗") + " " + name_padded + error_part = colorize( + colors.BOLD_RED, + f"mismatch (NIXL: CUDA {nixl_cuda}, CUDAToolkit: CUDA {toolkit_cuda})", + ) + return f"{status_part} {error_part}" + return "" + + def generate_help(self, platform_info): + print("NIXL") + print() + print( + "Why: NIXL provides high-performance data transfer for multi-device nvFuser." + ) + print() + print(" nvFuser links against the NIXL C++ API (nixl.h / libnixl.so).") + print(" 'pip install nixl' provides the shared library but does NOT install") + print(" the C++ headers. You need both headers and libraries for the build.") + print() + print("Install NIXL:") + print() + print( + " Option 1 (recommended for CI): Run the helper script that pip-installs" + ) + print(" nixl for the .so and clones the repo for headers:") + print() + print(" bash tools/install-nixl.sh") + print(" export NIXL_PREFIX=/tmp/nixl-prefix # or your chosen path") + print() + print(" Option 2: Build NIXL from source and set NIXL_PREFIX to the install") + print(" directory (must contain include/nixl.h and lib/libnixl.so).") + print() + print(" Note: This is an optional dependency. nvFuser will build without it,") + print(" but multi-device NIXL-based transfers will not be available.") + print() + + if self.constraint_status == "mismatch": + print() + print("IMPORTANT: NIXL CUDA Version Mismatch Detected") + print() + print(" NIXL was built for a different CUDA major version than your") + print(" system's CUDA Toolkit. This will cause linking or runtime errors.") + print() + print(" Resolution: Install the NIXL package matching your CUDA version.") + print(" Check system CUDA major version: nvcc --version") + print() diff --git a/python/utils.py b/python/utils.py index 28a383e23a1..c8019779909 100644 --- a/python/utils.py +++ b/python/utils.py @@ -21,6 +21,7 @@ class BuildConfig: no_benchmark: bool = False no_ninja: bool = False build_with_ucc: bool = False + build_with_nixl: bool = False build_with_asan: bool = False build_without_distributed: bool = False explicit_error_check: bool = False @@ -70,6 +71,8 @@ def override_build_config_from_env(config): config.no_ninja = get_env_flag_bool("NVFUSER_BUILD_NO_NINJA") if "NVFUSER_BUILD_WITH_UCC" in os.environ: config.build_with_ucc = get_env_flag_bool("NVFUSER_BUILD_WITH_UCC") + if "NVFUSER_BUILD_WITH_NIXL" in os.environ: + config.build_with_nixl = get_env_flag_bool("NVFUSER_BUILD_WITH_NIXL") if "NVFUSER_BUILD_WITH_ASAN" in os.environ: config.build_with_asan = get_env_flag_bool("NVFUSER_BUILD_WITH_ASAN") if "NVFUSER_BUILD_WITHOUT_DISTRIBUTED" in os.environ: @@ -277,6 +280,7 @@ def on_or_off(flag: bool) -> str: f"-DUSE_DISTRIBUTED={pytorch_use_distributed}", f"-DNVFUSER_BUILD_WITH_ASAN={on_or_off(config.build_with_asan)}", f"-DNVFUSER_STANDALONE_BUILD_WITH_UCC={on_or_off(config.build_with_ucc)}", + f"-DNVFUSER_BUILD_WITH_NIXL={on_or_off(config.build_with_nixl)}", f"-DNVFUSER_EXPLICIT_ERROR_CHECK={on_or_off(config.explicit_error_check)}", f"-DBUILD_TEST={on_or_off(not config.no_test)}", f"-DBUILD_PYTHON={on_or_off(not config.no_python)}", diff --git a/tests/cpp/test_multidevice_nixl.cpp b/tests/cpp/test_multidevice_nixl.cpp new file mode 100644 index 00000000000..a6868ef7542 --- /dev/null +++ b/tests/cpp/test_multidevice_nixl.cpp @@ -0,0 +1,254 @@ +// clang-format off +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-present NVIDIA CORPORATION & AFFILIATES. + * All rights reserved. + * SPDX-License-Identifier: BSD-3-Clause + */ +// clang-format on +#include + +#include "multidevice/nixl.h" +#include "tests/cpp/multidevice.h" + +namespace nvfuser { + +using NixlTest = MultiDeviceTest; + +// ------------------------------------------------------------------- +// NixlBackend singleton tests +// ------------------------------------------------------------------- + +TEST_F(NixlTest, SingletonIsAccessible) { + NixlBackend& backend = NixlBackend::getInstance(); + // isAvailable() returns true only when USE_NIXL is defined and the + // UCX backend loaded successfully. Either outcome is valid here. + (void)backend.isAvailable(); +} + +// ------------------------------------------------------------------- +// Input validation tests (these exercise the guards in the impl) +// ------------------------------------------------------------------- + +TEST_F(NixlTest, RegisterEmptyTensorListThrows) { + NixlBackend& backend = NixlBackend::getInstance(); + if (!backend.isAvailable()) { + GTEST_SKIP() << "NIXL backend not available"; + } + + std::vector empty; + EXPECT_THROW(backend.registerTensors(empty), nvfError); +} + +TEST_F(NixlTest, RegisterCpuTensorThrows) { + NixlBackend& backend = NixlBackend::getInstance(); + if (!backend.isAvailable()) { + GTEST_SKIP() << "NIXL backend not available"; + } + + auto cpu_tensor = at::randn({64}); + EXPECT_THROW(backend.registerTensors({cpu_tensor}), nvfError); +} + +TEST_F(NixlTest, RegisterNonContiguousTensorThrows) { + NixlBackend& backend = NixlBackend::getInstance(); + if (!backend.isAvailable()) { + GTEST_SKIP() << "NIXL backend not available"; + } + + auto t = at::randn({8, 8}, tensor_options_); + auto non_contig = t.transpose(0, 1); + ASSERT_FALSE(non_contig.is_contiguous()); + EXPECT_THROW(backend.registerTensors({non_contig}), nvfError); +} + +TEST_F(NixlTest, DeregisterEmptyTensorListThrows) { + NixlBackend& backend = NixlBackend::getInstance(); + if (!backend.isAvailable()) { + GTEST_SKIP() << "NIXL backend not available"; + } + + std::vector empty; + EXPECT_THROW(backend.deregisterTensors(empty), nvfError); +} + +// ------------------------------------------------------------------- +// Transfer preparation validation +// ------------------------------------------------------------------- + +TEST_F(NixlTest, PrepareTransferMismatchedSizesThrows) { + NixlBackend& backend = NixlBackend::getInstance(); + if (!backend.isAvailable()) { + GTEST_SKIP() << "NIXL backend not available"; + } + + auto t1 = at::randn({64}, tensor_options_); + auto t2 = at::randn({64}, tensor_options_); + auto t3 = at::randn({64}, tensor_options_); + backend.registerTensors({t1, t2, t3}); + + const int64_t rank = communicator_->deviceId(); + EXPECT_THROW( + (void)backend.prepareTransfer( + {toTensorDesc(t1, rank), toTensorDesc(t2, rank)}, + {toTensorDesc(t3, rank)}, + NixlXferOp::kRead), + nvfError); + + backend.deregisterTensors({t1, t2, t3}); +} + +// ------------------------------------------------------------------- +// Post / wait on invalid handles +// ------------------------------------------------------------------- + +TEST_F(NixlTest, PostInvalidHandleThrows) { + NixlBackend& backend = NixlBackend::getInstance(); + if (!backend.isAvailable()) { + GTEST_SKIP() << "NIXL backend not available"; + } + + NixlTransferHandle invalid_handle; + EXPECT_THROW(backend.postTransfer(invalid_handle), nvfError); +} + +TEST_F(NixlTest, WaitInvalidHandleThrows) { + NixlBackend& backend = NixlBackend::getInstance(); + if (!backend.isAvailable()) { + GTEST_SKIP() << "NIXL backend not available"; + } + + NixlTransferHandle invalid_handle; + EXPECT_THROW(backend.waitTransfer(invalid_handle), nvfError); +} + +TEST_F(NixlTest, GetStatusInvalidHandleThrows) { + NixlBackend& backend = NixlBackend::getInstance(); + if (!backend.isAvailable()) { + GTEST_SKIP() << "NIXL backend not available"; + } + + NixlTransferHandle invalid_handle; + EXPECT_THROW((void)backend.getTransferStatus(invalid_handle), nvfError); +} + +// ------------------------------------------------------------------- +// End-to-end transfer test (requires >= 2 devices with NIXL) +// ------------------------------------------------------------------- + +TEST_F(NixlTest, ReadTransferEndToEnd) { + NixlBackend& backend = NixlBackend::getInstance(); + if (!backend.isAvailable()) { + GTEST_SKIP() << "NIXL backend not available"; + } + if (communicator_->size() < 2) { + GTEST_SKIP() << "Need at least 2 devices for transfer test"; + } + + const int64_t rank = communicator_->deviceId(); + const int64_t world_size = communicator_->size(); + const int64_t peer_rank = (rank + 1) % world_size; + constexpr int64_t kSize = 1024; + + // Ring style transfer: each rank reads from its peer's remote tensor to + // its local . + auto src = at::full({kSize}, static_cast(rank + 1), tensor_options_); + auto dst = at::zeros({kSize}, tensor_options_); + cudaDeviceSynchronize(); + + backend.registerTensors({src, dst}); + + // Fetch the remote tensor descriptor from the peer + std::string src_key_prefix = "nixl_test_read_transfer_src_rank_"; + storeTensorDescs( + *communicator_, src_key_prefix + std::to_string(rank), {src}); + auto remote_src_descs = fetchTensorDescs( + *communicator_, src_key_prefix + std::to_string(peer_rank)); + communicator_->barrier(); + communicator_->getTcpStore()->deleteKey( + src_key_prefix + std::to_string(rank)); + auto remote_src_desc = + remote_src_descs[0]; // Only one remote tensor is expected + + // Each rank reads from its peer. After the read, local should contain + // the values that the peer stored in *its* remote tensor. + auto handle = backend.prepareTransfer( + {toTensorDesc(dst, rank)}, {remote_src_desc}, NixlXferOp::kRead); + backend.postTransfer(handle); + backend.waitTransfer(handle); + + auto local_cpu = dst.cpu(); + float expected_val = static_cast(peer_rank + 1); + EXPECT_TRUE(at::allclose(local_cpu, at::full({kSize}, expected_val))); + + backend.deregisterTensors({dst, src}); +} + +TEST_F(NixlTest, WriteTransferEndToEnd) { + NixlBackend& backend = NixlBackend::getInstance(); + if (!backend.isAvailable()) { + GTEST_SKIP() << "NIXL backend not available"; + } + if (communicator_->size() < 2) { + GTEST_SKIP() << "Need at least 2 devices for transfer test"; + } + + const int64_t rank = communicator_->deviceId(); + const int64_t world_size = communicator_->size(); + const int64_t peer_rank = (rank + 1) % world_size; + constexpr int64_t kSize = 512; + + // Each rank writes its local to the remote of its peer in a ring + // style + auto src = at::full({kSize}, static_cast(rank + 1), tensor_options_); + auto dst = at::zeros({kSize}, tensor_options_); + cudaDeviceSynchronize(); + + backend.registerTensors({src, dst}); + + // Fetch the remote tensor descriptor from the peer + std::string dst_key_prefix = "nixl_test_write_transfer_dst_rank_"; + storeTensorDescs( + *communicator_, dst_key_prefix + std::to_string(rank), {dst}); + auto remote_dst_descs = fetchTensorDescs( + *communicator_, dst_key_prefix + std::to_string(peer_rank)); + communicator_->barrier(); + communicator_->getTcpStore()->deleteKey( + dst_key_prefix + std::to_string(rank)); + auto remote_dst_desc = + remote_dst_descs[0]; // Only one remote tensor is expected + + // Each rank writes its local tensor into its peer's remote tensor. + auto handle = backend.prepareTransfer( + {toTensorDesc(src, rank)}, {remote_dst_desc}, NixlXferOp::kWrite); + backend.postTransfer(handle); + backend.waitTransfer(handle); + + // After a barrier, the peer should have written into our remote tensor . + communicator_->barrier(); + + auto remote_cpu = dst.cpu(); + int64_t writer_rank = (rank - 1 + world_size) % world_size; + float expected_val = static_cast(writer_rank + 1); + EXPECT_TRUE(at::allclose(remote_cpu, at::full({kSize}, expected_val))); + + backend.deregisterTensors({src, dst}); +} + +TEST_F(NixlTest, RegisterDeregisterRoundTrip) { + NixlBackend& backend = NixlBackend::getInstance(); + if (!backend.isAvailable()) { + GTEST_SKIP() << "NIXL backend not available"; + } + + auto t1 = at::randn({256}, tensor_options_); + auto t2 = at::randn({128}, tensor_options_); + + backend.registerTensors({t1, t2}); + backend.deregisterTensors({t1, t2}); + + // Re-registering the same tensors should succeed. + backend.registerTensors({t1, t2}); + backend.deregisterTensors({t1, t2}); +} + +} // namespace nvfuser diff --git a/tools/install-nixl.sh b/tools/install-nixl.sh new file mode 100755 index 00000000000..c5b0b3cae20 --- /dev/null +++ b/tools/install-nixl.sh @@ -0,0 +1,201 @@ +#!/bin/bash + +# Install NIXL headers and libraries for Fuser CI. +# +# Two modes: +# 1. pip (default for GitHub Actions): install pre-built wheels, clone repo +# for headers, symlink shared libs into NIXL_PREFIX. +# 2. source (default when CUDA toolkit is detected): build UCX with CUDA +# transport and NIXL from source so the UCX backend can register VRAM. +# +# Environment variables: +# NIXL_PREFIX – install prefix (default: /tmp/nixl-prefix) +# NIXL_BUILD_MODE – "pip", "source", or "auto" (default: auto) +# CUDA_HOME – CUDA toolkit root (auto-detected from nvcc) +# +# Used by: +# .github/workflows/build.yml (GitHub Actions compilation check) +# Blossom GPU CI build jobs (needs runtime UCX+CUDA support) +# tools/ci-local-build.sh (local Docker reproduction) + +set -e + +NIXL_PREFIX="${NIXL_PREFIX:-/tmp/nixl-prefix}" +NIXL_BUILD_MODE="${NIXL_BUILD_MODE:-auto}" +NIXL_REPO="https://github.com/ai-dynamo/nixl.git" +NIXL_CLONE_DIR="/tmp/nixl-repo" +UCX_CLONE_DIR="/tmp/ucx-src" +UCX_INSTALL_DIR="/tmp/ucx-install" + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +detect_cuda_home() { + if [ -n "$CUDA_HOME" ]; then + return + fi + if command -v nvcc >/dev/null 2>&1; then + CUDA_HOME="$(dirname "$(dirname "$(command -v nvcc)")")" + elif [ -d /usr/local/cuda ]; then + CUDA_HOME=/usr/local/cuda + fi +} + +resolve_build_mode() { + if [ "$NIXL_BUILD_MODE" = "auto" ]; then + detect_cuda_home + if [ -n "$CUDA_HOME" ] && [ -x "$CUDA_HOME/bin/nvcc" ]; then + echo "Auto-detected CUDA at $CUDA_HOME — using source build for UCX+CUDA support" + NIXL_BUILD_MODE="source" + else + echo "No CUDA toolkit with nvcc found — using pip install (compile-only)" + NIXL_BUILD_MODE="pip" + fi + fi +} + +# --------------------------------------------------------------------------- +# Mode: pip (headers + pre-built .so, no runtime CUDA guarantee) +# --------------------------------------------------------------------------- + +install_pip() { + echo "=== Installing NIXL via pip ===" + + pip install --no-deps nixl nixl-cu12 || pip install --no-deps nixl + + SITE_PACKAGES=$(python3 -c "import site; print(site.getsitepackages()[0])") + + git clone --depth 1 "$NIXL_REPO" "$NIXL_CLONE_DIR" + mkdir -p "$NIXL_PREFIX/include" "$NIXL_PREFIX/lib" + cp "$NIXL_CLONE_DIR"/src/api/cpp/*.h "$NIXL_PREFIX/include/" + + for libs_dir in "$SITE_PACKAGES"/.nixl*.mesonpy.libs "$SITE_PACKAGES"/nixl*.libs; do + [ -d "$libs_dir" ] || continue + echo " Symlinking libs from $libs_dir" + for so in "$libs_dir"/*.so*; do + [ -e "$so" ] || continue + ln -sf "$so" "$NIXL_PREFIX/lib/$(basename "$so")" + done + done + + for so in $(find "$SITE_PACKAGES" -maxdepth 3 -path "*/nixl*/*.so*" -type f 2>/dev/null); do + name=$(basename "$so") + [ -e "$NIXL_PREFIX/lib/$name" ] || ln -sf "$so" "$NIXL_PREFIX/lib/$name" + done + + if [ ! -f "$NIXL_PREFIX/lib/libnixl.so" ]; then + echo "Error: libnixl.so not found under $SITE_PACKAGES" + exit 1 + fi + + rm -rf "$NIXL_CLONE_DIR" +} + +# --------------------------------------------------------------------------- +# Mode: source (UCX built with CUDA transport, NIXL built from source) +# --------------------------------------------------------------------------- + +install_source() { + echo "=== Building NIXL from source with UCX+CUDA ===" + detect_cuda_home + + if [ -z "$CUDA_HOME" ]; then + echo "Error: CUDA_HOME not set and nvcc not found" + exit 1 + fi + echo " CUDA_HOME=$CUDA_HOME" + + # --- build dependencies --------------------------------------------------- + apt-get update -qq 2>/dev/null || true + apt-get install -y -qq libtool autoconf automake pkg-config \ + libibverbs-dev librdmacm-dev libnuma-dev 2>/dev/null || true + pip install meson ninja 2>/dev/null || pip3 install meson ninja + + # --- UCX with CUDA -------------------------------------------------------- + echo "--- Building UCX with CUDA support ---" + if [ -d "$UCX_CLONE_DIR" ]; then rm -rf "$UCX_CLONE_DIR"; fi + git clone --depth 1 -b v1.18.x https://github.com/openucx/ucx.git "$UCX_CLONE_DIR" + ( + cd "$UCX_CLONE_DIR" + ./autogen.sh + ./contrib/configure-release \ + --prefix="$UCX_INSTALL_DIR" \ + --with-cuda="$CUDA_HOME" \ + --enable-mt + make -j"$(nproc)" + make install + ) + + export PKG_CONFIG_PATH="$UCX_INSTALL_DIR/lib/pkgconfig:${PKG_CONFIG_PATH:-}" + export LD_LIBRARY_PATH="$UCX_INSTALL_DIR/lib:${LD_LIBRARY_PATH:-}" + + # --- NIXL from source ----------------------------------------------------- + echo "--- Building NIXL from source ---" + if [ -d "$NIXL_CLONE_DIR" ]; then rm -rf "$NIXL_CLONE_DIR"; fi + git clone --depth 1 "$NIXL_REPO" "$NIXL_CLONE_DIR" + ( + cd "$NIXL_CLONE_DIR" + + CUDA_INC="$CUDA_HOME/include" + CUDA_LIB="$CUDA_HOME/lib64" + [ -d "$CUDA_LIB" ] || CUDA_LIB="$CUDA_HOME/lib" + + meson setup builddir \ + --prefix="$NIXL_PREFIX" \ + -Ducx_path="$UCX_INSTALL_DIR" \ + -Dcudapath_inc="$CUDA_INC" \ + -Dcudapath_lib="$CUDA_LIB" \ + -Dbuild_tests=false \ + -Dbuild_examples=false \ + -Dbuildtype=release + meson compile -C builddir + meson install -C builddir + ) + + # Copy API headers if not already installed by meson + mkdir -p "$NIXL_PREFIX/include" + cp -n "$NIXL_CLONE_DIR"/src/api/cpp/*.h "$NIXL_PREFIX/include/" 2>/dev/null || true + + # Ensure UCX libs are alongside NIXL libs so everything is on one rpath. + # Also copy UCX transport plugins (libuct_cuda.so etc.) so they're discoverable. + mkdir -p "$NIXL_PREFIX/lib/ucx" + for so in "$UCX_INSTALL_DIR"/lib/*.so*; do + [ -e "$so" ] || continue + ln -sf "$so" "$NIXL_PREFIX/lib/$(basename "$so")" + done + for so in "$UCX_INSTALL_DIR"/lib/ucx/*.so*; do + [ -e "$so" ] || continue + ln -sf "$so" "$NIXL_PREFIX/lib/ucx/$(basename "$so")" + done + + rm -rf "$NIXL_CLONE_DIR" "$UCX_CLONE_DIR" +} + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +resolve_build_mode +echo "NIXL build mode: $NIXL_BUILD_MODE" + +case "$NIXL_BUILD_MODE" in + pip) install_pip ;; + source) install_source ;; + *) + echo "Error: unknown NIXL_BUILD_MODE=$NIXL_BUILD_MODE (expected pip, source, or auto)" + exit 1 + ;; +esac + +# Ensure LD_LIBRARY_PATH includes NIXL_PREFIX/lib for runtime +export LD_LIBRARY_PATH="$NIXL_PREFIX/lib:${LD_LIBRARY_PATH:-}" + +echo "" +echo "NIXL prefix ready at $NIXL_PREFIX" +echo " include: $(ls "$NIXL_PREFIX/include/" 2>/dev/null || echo '(empty)')" +echo " lib: $(ls "$NIXL_PREFIX/lib/" 2>/dev/null || echo '(empty)')" +echo "" +echo "Remember to set:" +echo " export LD_LIBRARY_PATH=$NIXL_PREFIX/lib:\$LD_LIBRARY_PATH" +echo " export UCX_MODULE_DIR=$NIXL_PREFIX/lib/ucx (if UCX was built from source)"