Skip to content
3 changes: 3 additions & 0 deletions sdk_v2/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,11 @@ set(FOUNDRY_LOCAL_SOURCES
src/inferencing/generative/chat/chat_session.cc
src/inferencing/generative/chat/chat_template.cc
src/configuration.cc
src/download/blob_download_state.cc
src/download/blob_downloader.cc
src/download/cross_process_file_lock.cc
src/download/download_manager.cc
src/download/file_writer.cc
src/download/inference_model_writer.cc
src/download/model_registry_client.cc
src/ep_detection/cuda_ep_bootstrapper.cc
Expand Down
381 changes: 381 additions & 0 deletions sdk_v2/cpp/src/download/blob_download_state.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,381 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
#include "download/blob_download_state.h"
#include "logger.h"

#include <chrono>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <system_error>
#include <type_traits>

namespace fl {

namespace {

// Suffix appended verbatim to a blob's local file path to form the path of its
// download-state file (see GetStateFilePath).
constexpr const char* kStateFileExtension = ".dlstate";

// On-disk format (little-endian throughout):
// bytes | field
// -------|--------------------------------------------------------
// 0..3 | magic "FLDS"
// 4 | version (currently 1)
// 5..12 | blob_size (int64)
// 13..16 | chunk_size (int32)
// 17..20 | total_chunks (int32)
// 21..24 | bitmap_byte_aligned_start (int32)
// 25..28 | highest_completed_chunk (int32)
// 29..32 | completed_count (int32)
// 33..40 | last_modified_unix_ms (int64)
// 41..44 | trunc_bitmap_byte_len (uint32)
// 45.. | trunc_bitmap_byte_len bytes of bitmap data, copied directly out of
// full_completion_bitmap starting at the byte offset implied by
// bitmap_byte_aligned_start.
//
// kMagic and kVersion are the first five bytes of every state file; LoadState
// ignores any file whose magic or version does not match them exactly.
constexpr char kMagic[4] = {'F', 'L', 'D', 'S'};
constexpr uint8_t kVersion = 1;

// Number of chunk-completion bits stored in each uint64_t word of the
// in-memory completion bitmap.
constexpr int32_t kBitsPerWord = 64;

// Writes the integral `value` to `out` as exactly sizeof(T) bytes, least
// significant byte first.
//
// The bytes are produced with shifts rather than by copying the object
// representation, so the output is little-endian regardless of the host's byte
// order. Stream errors are not reported here; callers check the stream state
// after a batch of writes.
template <typename T>
void WriteLE(std::ostream& out, T value) {
  static_assert(std::is_integral_v<T>, "WriteLE only supports integral types");
  using Unsigned = std::make_unsigned_t<T>;

  // Converting to the unsigned counterpart is well defined for negative values
  // and keeps the right shifts below free of sign-extension surprises.
  const auto bits = static_cast<Unsigned>(value);
  unsigned char buf[sizeof(T)];
  for (std::size_t i = 0; i < sizeof(T); ++i) {
    buf[i] = static_cast<unsigned char>((bits >> (8 * i)) & 0xFF);
  }
  out.write(reinterpret_cast<const char*>(buf), sizeof(T));
}

// Reads sizeof(T) bytes from `in` and decodes them as a little-endian integral
// value into `out_value`.
//
// Returns false (leaving `out_value` untouched) if the stream could not supply
// all sizeof(T) bytes; returns true otherwise.
template <typename T>
bool ReadLE(std::istream& in, T& out_value) {
  static_assert(std::is_integral_v<T>, "ReadLE only supports integral types");
  using Unsigned = std::make_unsigned_t<T>;

  unsigned char buf[sizeof(T)];
  in.read(reinterpret_cast<char*>(buf), sizeof(T));
  if (!in) {
    return false;
  }

  Unsigned bits = 0;
  for (std::size_t i = 0; i < sizeof(T); ++i) {
    bits = static_cast<Unsigned>(bits | (static_cast<Unsigned>(buf[i]) << (8 * i)));
  }
  // Copy the bit pattern instead of casting so that values with the top bit
  // set land in a signed T without relying on implementation-defined
  // narrowing conversions.
  std::memcpy(&out_value, &bits, sizeof(T));
  return true;
}

// Returns the current std::chrono::system_clock time as a count of whole
// milliseconds since that clock's epoch.
int64_t NowUnixMs() {
  using std::chrono::duration_cast;
  using std::chrono::milliseconds;

  const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
  return duration_cast<milliseconds>(since_epoch).count();
}

} // namespace

// Builds the path of the download-state file that accompanies a blob: the
// blob's local file path with kStateFileExtension appended as raw text (no
// directory separator is inserted and any existing extension is kept).
std::filesystem::path BlobDownloadState::GetStateFilePath(const std::filesystem::path& local_file_path) {
  std::filesystem::path state_file{local_file_path};
  state_file.concat(kStateFileExtension);
  return state_file;
}

// Creates the state for a download that has not made any progress yet.
//
// The returned object records the blob's identity and layout, has no chunk
// marked complete (highest_completed_chunk == -1, completed_count == 0), a
// zeroed completion bitmap with one bit per chunk rounded up to whole 64-bit
// words, and the current time as its last-modified stamp. Nothing is written
// to disk.
std::unique_ptr<BlobDownloadState> BlobDownloadState::CreateNew(std::string blob_name,
                                                                std::filesystem::path local_file_path,
                                                                int64_t blob_size,
                                                                int32_t chunk_size,
                                                                int32_t total_chunks) {
  // Number of 64-bit words needed to hold total_chunks bits.
  const auto word_count = static_cast<size_t>((total_chunks + kBitsPerWord - 1) / kBitsPerWord);

  auto fresh = std::make_unique<BlobDownloadState>();

  // Identity and layout of the blob being downloaded.
  fresh->blob_name = std::move(blob_name);
  fresh->local_file_path = local_file_path.string();
  fresh->blob_size = blob_size;
  fresh->chunk_size = chunk_size;
  fresh->total_chunks = total_chunks;

  // Progress bookkeeping: nothing downloaded, nothing truncated.
  fresh->bitmap_byte_aligned_start = 0;
  fresh->highest_completed_chunk = -1;
  fresh->completed_count = 0;
  fresh->last_modified_unix_ms = NowUnixMs();
  fresh->full_completion_bitmap.assign(word_count, 0);

  return fresh;
}

// Tries to restore a previously saved download state for `local_file_path`.
//
// Parameters:
//   blob_name / local_file_path - stored in the returned state as-is.
//   expected_blob_size / expected_chunk_size / expected_total_chunks - the
//       layout the caller is about to download with; a state file recorded
//       with any other layout is ignored.
//   logger - optional; when non-null, receives a message for every outcome
//       except "no state file exists".
//
// Returns the restored state, or nullptr when there is no state file or the
// file cannot be used (unreadable, wrong magic/version, truncated, layout
// mismatch, out-of-range header values, oversized bitmap payload, or a bitmap
// that contradicts the header counters). The file itself is never modified.
std::unique_ptr<BlobDownloadState> BlobDownloadState::LoadState(std::string blob_name,
                                                                std::filesystem::path local_file_path,
                                                                int64_t expected_blob_size,
                                                                int32_t expected_chunk_size,
                                                                int32_t expected_total_chunks,
                                                                ILogger* logger) {
  auto state_path = GetStateFilePath(local_file_path);
  std::error_code ec;
  if (!std::filesystem::exists(state_path, ec)) {
    // No saved state (or the existence check itself failed): nothing to resume.
    return nullptr;
  }

  std::ifstream in(state_path, std::ios::binary);
  if (!in) {
    if (logger) {
      logger->Log(LogLevel::Warning, "Could not open download state file: " + state_path.string());
    }
    return nullptr;
  }

  // Fixed prefix: 4-byte magic followed by a 1-byte format version.
  char magic[4]{};
  in.read(magic, 4);
  uint8_t version = 0;
  if (!in || std::memcmp(magic, kMagic, 4) != 0 || !ReadLE(in, version) || version != kVersion) {
    if (logger) {
      logger->Log(LogLevel::Warning,
                  "Download state file " + state_path.string() + " has unexpected magic/version; ignoring");
    }
    return nullptr;
  }

  // Remaining header fields, in on-disk order.
  int64_t blob_size = 0;
  int32_t chunk_size = 0;
  int32_t total_chunks = 0;
  int32_t bitmap_byte_aligned_start = 0;
  int32_t highest_completed_chunk = 0;
  int32_t completed_count = 0;
  int64_t last_modified_unix_ms = 0;
  uint32_t trunc_len = 0;
  if (!ReadLE(in, blob_size) || !ReadLE(in, chunk_size) || !ReadLE(in, total_chunks) ||
      !ReadLE(in, bitmap_byte_aligned_start) || !ReadLE(in, highest_completed_chunk) ||
      !ReadLE(in, completed_count) || !ReadLE(in, last_modified_unix_ms) || !ReadLE(in, trunc_len)) {
    if (logger) {
      logger->Log(LogLevel::Warning, "Download state header truncated: " + state_path.string());
    }
    return nullptr;
  }

  // Sanity / compatibility checks.
  if (blob_size != expected_blob_size || chunk_size != expected_chunk_size ||
      total_chunks != expected_total_chunks) {
    if (logger) {
      logger->Log(LogLevel::Information,
                  "Download state for " + state_path.string() +
                      " is incompatible with current blob layout; starting fresh");
    }
    return nullptr;
  }
  if (bitmap_byte_aligned_start < 0 || bitmap_byte_aligned_start % 8 != 0 ||
      bitmap_byte_aligned_start > total_chunks || completed_count < 0 ||
      completed_count > total_chunks || highest_completed_chunk < -1 ||
      highest_completed_chunk >= total_chunks) {
    if (logger) {
      logger->Log(LogLevel::Warning, "Download state header values out of range: " + state_path.string());
    }
    return nullptr;
  }

  auto words_total = static_cast<size_t>((total_chunks + kBitsPerWord - 1) / kBitsPerWord);
  std::vector<uint64_t> bitmap(words_total, 0);

  // The prefix of fully-completed chunks below bitmap_byte_aligned_start is
  // implied — fill those bits.
  size_t implicit_full_words = static_cast<size_t>(bitmap_byte_aligned_start) / kBitsPerWord;
  for (size_t i = 0; i < implicit_full_words && i < bitmap.size(); ++i) {
    bitmap[i] = ~uint64_t{0};
  }
  // Any remaining "implicit" bits inside a partial word (between
  // implicit_full_words*64 and bitmap_byte_aligned_start).
  if (size_t partial_bits = static_cast<size_t>(bitmap_byte_aligned_start) % kBitsPerWord;
      partial_bits > 0 && implicit_full_words < bitmap.size()) {
    bitmap[implicit_full_words] |= (uint64_t{1} << partial_bits) - 1;
  }

  if (trunc_len > 0) {
    // Copy serialized bytes directly into the bitmap starting at the byte
    // position implied by bitmap_byte_aligned_start.
    size_t byte_offset = static_cast<size_t>(bitmap_byte_aligned_start) / 8;
    auto* dest = reinterpret_cast<unsigned char*>(bitmap.data()) + byte_offset;
    auto dest_capacity = bitmap.size() * sizeof(uint64_t) - byte_offset;
    if (trunc_len > dest_capacity) {
      if (logger) {
        logger->Log(LogLevel::Warning,
                    "Download state bitmap length exceeds expected capacity: " + state_path.string());
      }
      return nullptr;
    }
    in.read(reinterpret_cast<char*>(dest), trunc_len);
    if (!in) {
      if (logger) {
        logger->Log(LogLevel::Warning,
                    "Download state bitmap payload truncated: " + state_path.string());
      }
      return nullptr;
    }
  }

  // Cross-check the reconstructed bitmap against the header counters. The
  // range checks above cannot detect a file whose counters and bitmap disagree
  // (corruption, or a truncation point recorded past a chunk that was never
  // finished). Resuming from such a file would treat unfinished chunks as done,
  // so it is rejected and the caller starts over.
  int64_t bits_set = 0;
  int32_t highest_bit_set = -1;
  bool stray_bits = false;
  for (size_t w = 0; w < bitmap.size(); ++w) {
    uint64_t word = bitmap[w];
    for (int32_t bit = 0; word != 0; ++bit, word >>= 1) {
      if ((word & 1) == 0) {
        continue;
      }
      const int64_t chunk = static_cast<int64_t>(w) * kBitsPerWord + bit;
      if (chunk >= total_chunks) {
        // Padding bits past the last chunk must never be set.
        stray_bits = true;
      } else {
        ++bits_set;
        // Chunks are visited in ascending order, so the last one seen is the highest.
        highest_bit_set = static_cast<int32_t>(chunk);
      }
    }
  }
  if (stray_bits || bits_set != completed_count || highest_bit_set != highest_completed_chunk) {
    if (logger) {
      logger->Log(LogLevel::Warning,
                  "Download state bitmap is inconsistent with header counters: " + state_path.string());
    }
    return nullptr;
  }

  auto state = std::make_unique<BlobDownloadState>();
  state->blob_name = std::move(blob_name);
  state->local_file_path = local_file_path.string();
  state->blob_size = blob_size;
  state->chunk_size = chunk_size;
  state->total_chunks = total_chunks;
  state->bitmap_byte_aligned_start = bitmap_byte_aligned_start;
  state->highest_completed_chunk = highest_completed_chunk;
  state->completed_count = completed_count;
  state->last_modified_unix_ms = last_modified_unix_ms;
  state->full_completion_bitmap = std::move(bitmap);

  if (logger) {
    logger->Log(LogLevel::Information,
                "Loaded download state " + state_path.string() + ": " +
                    std::to_string(completed_count) + "/" + std::to_string(total_chunks) +
                    " chunks already done");
  }
  return state;
}

// Returns the number of bytes covered by the chunks completed so far.
//
// Every completed chunk is counted as chunk_size bytes. When the final chunk
// of the blob has been completed and is shorter than chunk_size (blob_size is
// not a multiple of chunk_size), the difference is subtracted again.
int64_t BlobDownloadState::CalculateDownloadedSize() const noexcept {
  const int64_t full_chunk_bytes = static_cast<int64_t>(completed_count) * chunk_size;

  const bool last_chunk_done = (highest_completed_chunk == total_chunks - 1);
  if (!last_chunk_done || chunk_size <= 0) {
    return full_chunk_bytes;
  }

  // Size of the short final chunk; zero means the blob divides evenly.
  const auto tail = blob_size % chunk_size;
  return tail == 0 ? full_chunk_bytes : full_chunk_bytes - (chunk_size - tail);
}

// Reports whether chunk `chunk_idx` has been recorded as complete.
//
// Indices outside [0, total_chunks) are never complete. Indices below
// bitmap_byte_aligned_start are complete by definition. All others are looked
// up in full_completion_bitmap; an index that falls beyond the bitmap's
// current size is reported as not complete.
bool BlobDownloadState::IsChunkComplete(int32_t chunk_idx) const noexcept {
  const bool in_range = chunk_idx >= 0 && chunk_idx < total_chunks;
  if (!in_range) {
    return false;
  }
  if (chunk_idx < bitmap_byte_aligned_start) {
    // Inside the truncated prefix, which only ever covers finished chunks.
    return true;
  }

  const auto position = static_cast<size_t>(chunk_idx);
  const auto word = position / kBitsPerWord;
  if (word >= full_completion_bitmap.size()) {
    return false;
  }
  const uint64_t mask = uint64_t{1} << (position % kBitsPerWord);
  return (full_completion_bitmap[word] & mask) != 0;
}

// Records chunk `chunk_idx` as complete.
//
// Does nothing when the index is outside [0, total_chunks) or the chunk is
// already complete, so completed_count is incremented at most once per chunk.
// Otherwise sets the chunk's bit, raises highest_completed_chunk if needed and
// increments completed_count.
void BlobDownloadState::MarkChunkComplete(int32_t chunk_idx) {
  if (chunk_idx < 0 || chunk_idx >= total_chunks) {
    return;
  }
  if (IsChunkComplete(chunk_idx)) {
    return;
  }

  const auto word_idx = static_cast<size_t>(chunk_idx) / kBitsPerWord;
  const auto bit_idx = static_cast<size_t>(chunk_idx) % kBitsPerWord;

  // IsChunkComplete tolerates a bitmap shorter than total_chunks implies; make
  // the write side equally safe by growing the bitmap instead of indexing past
  // its end. This is done before any counter changes so a failed allocation
  // leaves the state untouched.
  if (word_idx >= full_completion_bitmap.size()) {
    full_completion_bitmap.resize(word_idx + 1, 0);
  }

  full_completion_bitmap[word_idx] |= (uint64_t{1} << bit_idx);
  if (chunk_idx > highest_completed_chunk) {
    highest_completed_chunk = chunk_idx;
  }
  ++completed_count;
}

// Returns, in ascending order, the indices of all chunks that are not yet
// complete. The scan starts at bitmap_byte_aligned_start because every chunk
// below it is complete by definition.
std::vector<int32_t> BlobDownloadState::GetPendingChunks() const {
  std::vector<int32_t> remaining;
  // Exactly this many chunks are outstanding, so one allocation suffices.
  remaining.reserve(static_cast<size_t>(total_chunks - completed_count));

  for (int32_t chunk = bitmap_byte_aligned_start; chunk < total_chunks; ++chunk) {
    if (IsChunkComplete(chunk)) {
      continue;
    }
    remaining.push_back(chunk);
  }
  return remaining;
}

// Persists the current progress to the state file next to the blob.
//
// Steps:
//   1. Advance bitmap_byte_aligned_start to the byte-aligned position just
//      below the first incomplete chunk, so only the unfinished tail of the
//      bitmap has to be serialized. It never moves backwards.
//   2. Refresh last_modified_unix_ms.
//   3. Write header + bitmap tail to "<state file>.tmp", then rename it over
//      the state file.
//
// Failures are reported through `logger` (which may be null) and otherwise
// swallowed: saving is best-effort and an existing state file is left intact
// when the new one cannot be committed.
//
// NOTE(review): the bitmap payload is copied byte-for-byte from the in-memory
// uint64_t words, so its on-disk byte order follows the host — confirm whether
// state files need to be portable across architectures.
void BlobDownloadState::SaveState(ILogger* logger) {
  // Find the first word, at or after the one containing the current
  // truncation point, that still has an unset bit.
  const size_t word_count = full_completion_bitmap.size();
  size_t word_idx = static_cast<size_t>(bitmap_byte_aligned_start) / kBitsPerWord;
  if (word_idx > word_count) {
    word_idx = word_count;
  }
  while (word_idx < word_count && full_completion_bitmap[word_idx] == ~uint64_t{0}) {
    ++word_idx;
  }

  // Absolute index of the first incomplete chunk. It must be derived from the
  // word index, not added to the previous truncation point: the in-word bit
  // position below is counted from bit 0 of the word, and the previous
  // truncation point is only byte-aligned, so adding the two would skip past
  // chunks that have not been downloaded.
  int64_t first_incomplete = static_cast<int64_t>(word_idx) * kBitsPerWord;
  if (word_idx < word_count) {
    const uint64_t inverted = ~full_completion_bitmap[word_idx];
    int trailing_ones = 0;
    while (trailing_ones < kBitsPerWord && ((inverted >> trailing_ones) & 1) == 0) {
      ++trailing_ones;
    }
    first_incomplete += trailing_ones;
  }
  if (first_incomplete > total_chunks) {
    first_incomplete = total_chunks;
  }
  // Round down to a byte boundary (8 bits) so the serialized payload starts on
  // a whole byte of the bitmap.
  const auto new_start = static_cast<int32_t>((first_incomplete / 8) * 8);
  if (new_start > bitmap_byte_aligned_start) {
    bitmap_byte_aligned_start = new_start;
  }

  last_modified_unix_ms = NowUnixMs();

  auto state_path = GetStateFilePath(local_file_path);
  auto tmp_path = state_path;
  tmp_path += ".tmp";

  // Compute the serialized bitmap payload: bytes from bitmap_byte_aligned_start
  // up to (highest_completed_chunk + 1), rounded up to the nearest byte.
  uint32_t trunc_len = 0;
  if (highest_completed_chunk >= bitmap_byte_aligned_start) {
    int32_t bit_count = highest_completed_chunk - bitmap_byte_aligned_start + 1;
    trunc_len = static_cast<uint32_t>((bit_count + 7) / 8);
  }
  size_t byte_offset = static_cast<size_t>(bitmap_byte_aligned_start) / 8;

  std::ofstream out(tmp_path, std::ios::binary | std::ios::trunc);
  if (!out) {
    if (logger) {
      logger->Log(LogLevel::Warning, "Failed to open download state tmp file: " + tmp_path.string());
    }
    return;
  }
  out.write(kMagic, 4);
  WriteLE(out, kVersion);
  WriteLE(out, blob_size);
  WriteLE(out, chunk_size);
  WriteLE(out, total_chunks);
  WriteLE(out, bitmap_byte_aligned_start);
  WriteLE(out, highest_completed_chunk);
  WriteLE(out, completed_count);
  WriteLE(out, last_modified_unix_ms);
  WriteLE(out, trunc_len);
  if (trunc_len > 0) {
    auto* src = reinterpret_cast<const unsigned char*>(full_completion_bitmap.data()) + byte_offset;
    out.write(reinterpret_cast<const char*>(src), trunc_len);
  }
  // Close before checking: buffered data is only handed to the OS on
  // flush/close, so an error such as a full disk may not show up until now.
  // Checking earlier could let a truncated tmp file replace a good state file.
  out.close();
  if (!out) {
    // The stream is closed, so the incomplete tmp file can be removed.
    std::error_code rm_ec;
    std::filesystem::remove(tmp_path, rm_ec);
    if (logger) {
      logger->Log(LogLevel::Warning, "Failed to write download state tmp file: " + tmp_path.string());
    }
    return;
  }

  std::error_code ec;
  std::filesystem::rename(tmp_path, state_path, ec);
  if (ec) {
    // std::filesystem::rename atomically replaces the destination on every
    // platform we target (POSIX rename(2); Windows MoveFileExW with
    // MOVEFILE_REPLACE_EXISTING). If it still fails, the cause is transient
    // (e.g. a brief sharing violation on Windows or a flaky network FS) —
    // do NOT delete state_path as a fallback; that loses the only intact
    // copy of the resume bitmap. Instead, drop the tmp file and let the
    // next SaveState call retry from the up-to-date in-memory state.
    std::error_code rm_ec;
    std::filesystem::remove(tmp_path, rm_ec);
    if (logger) {
      logger->Log(LogLevel::Warning,
                  "Failed to commit download state file: " + tmp_path.string() + " -> " +
                      state_path.string() + " (" + ec.message() +
                      "); previous state retained, will retry on next save");
    }
  }
}

// Removes the download-state file that belongs to `local_file_path`.
//
// Uses the non-throwing overload of std::filesystem::remove. An error reported
// by it is logged as a warning when `logger` is non-null and otherwise
// ignored.
void BlobDownloadState::DeleteState(const std::filesystem::path& local_file_path, ILogger* logger) {
  const auto target = GetStateFilePath(local_file_path);

  std::error_code remove_error;
  std::filesystem::remove(target, remove_error);
  if (!remove_error || logger == nullptr) {
    return;
  }

  logger->Log(LogLevel::Warning,
              "Failed to delete download state file: " + target.string() + " (" +
                  remove_error.message() + ")");
}

} // namespace fl
Loading
Loading