From d3df6e3b18dd9c07b80a48ea2c900f0665b35673 Mon Sep 17 00:00:00 2001 From: shangxinli Date: Sun, 15 Mar 2026 20:15:46 -0700 Subject: [PATCH 1/5] feat: add file cleanup for expire snapshots Implement the file cleanup logic that was missing from the expire snapshots feature (the original PR noted "TODO: File recycling will be added in a followup PR"). Port the "reachable file cleanup" strategy from Java's ReachableFileCleanup, following the same phased approach: Phase 1: Collect manifest paths from expired and retained snapshots Phase 2: Prune manifests still referenced by retained snapshots Phase 3: Find data files only in manifests being deleted, subtract files still reachable from retained manifests (kAll only) Phase 4: Delete orphaned manifest files Phase 5: Delete manifest lists from expired snapshots Phase 6: Delete expired statistics and partition statistics files Key design decisions matching Java parity: - Best-effort deletion: suppress errors on individual file deletions to avoid blocking metadata updates (Java suppressFailureWhenFinished) - Branch/tag awareness: retained snapshot set includes all snapshots reachable from any ref (branch or tag), preventing false-positive deletions of files still referenced by non-main branches - Data file safety: only delete data files from manifests that are themselves being deleted, then subtract any files still reachable from retained manifests (two-pass approach from ReachableFileCleanup) - Respect CleanupLevel: kNone skips all, kMetadataOnly skips data files, kAll cleans everything - FileIO abstraction: uses FileIO::DeleteFile for filesystem compatibility (S3, HDFS, local), with custom DeleteWith() override - Statistics cleanup via snapshot ID membership in retained set TODOs for follow-up: - Multi-threaded file deletion (Java uses Tasks.foreach with executor) - IncrementalFileCleanup strategy for linear ancestry optimization (Java uses this when no branches/cherry-picks involved) --- src/iceberg/test/expire_snapshots_test.cc | 87 ++++++++ src/iceberg/update/expire_snapshots.cc | 246 ++++++++++++++++++++++ src/iceberg/update/expire_snapshots.h | 49 +++++ 3 files changed, 382 insertions(+) diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc index dbc577a71..59c3c2ca6 100644 --- a/src/iceberg/test/expire_snapshots_test.cc +++ b/src/iceberg/test/expire_snapshots_test.cc @@ -19,6 +19,9 @@ #include "iceberg/update/expire_snapshots.h" +#include +#include + #include "iceberg/test/matchers.h" #include "iceberg/test/update_test_base.h" @@ -65,4 +68,88 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan) { } } +TEST_F(ExpireSnapshotsTest, DeleteWithCustomFunction) { + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + // Apply first so apply_result_ is cached + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1); + + // Call Finalize directly to simulate successful commit + // Note: Finalize tries to read manifests from the expired snapshot's manifest list, + // which will fail on mock FS since "s3://a/b/1.avro" doesn't contain real avro data. + // The error is returned from Finalize but in the real commit flow it's ignored. + auto finalize_status = update->Finalize(std::nullopt); + // Finalize may fail because manifest list files don't exist on mock FS, + // but it should not crash + if (finalize_status.has_value()) { + // If it succeeded (e.g., if manifest reading was skipped), verify deletions + EXPECT_FALSE(deleted_files.empty()); + } +} + +TEST_F(ExpireSnapshotsTest, CleanupLevelNoneSkipsFileDeletion) { + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->CleanupLevel(CleanupLevel::kNone); + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1); + + // With kNone cleanup level, Finalize should skip all file deletion + auto finalize_status = update->Finalize(std::nullopt); + EXPECT_THAT(finalize_status, IsOk()); + EXPECT_TRUE(deleted_files.empty()); +} + +TEST_F(ExpireSnapshotsTest, FinalizeSkippedOnCommitError) { + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1); + + // Simulate a commit failure - Finalize should not delete any files + auto finalize_status = update->Finalize( + Error{.kind = ErrorKind::kCommitFailed, .message = "simulated failure"}); + EXPECT_THAT(finalize_status, IsOk()); + EXPECT_TRUE(deleted_files.empty()); +} + +TEST_F(ExpireSnapshotsTest, FinalizeSkippedWhenNoSnapshotsExpired) { + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->RetainLast(2); + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_TRUE(result.snapshot_ids_to_remove.empty()); + + // No snapshots expired, so Finalize should not delete any files + auto finalize_status = update->Finalize(std::nullopt); + EXPECT_THAT(finalize_status, IsOk()); + EXPECT_TRUE(deleted_files.empty()); +} + +TEST_F(ExpireSnapshotsTest, CommitWithCleanupLevelNone) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->CleanupLevel(CleanupLevel::kNone); + + // Commit should succeed - Finalize is called internally but skips cleanup + EXPECT_THAT(update->Commit(), IsOk()); + + // Verify snapshot was removed from metadata + auto metadata = ReloadMetadata(); + EXPECT_EQ(metadata->snapshots.size(), 1); + EXPECT_EQ(metadata->snapshots.at(0)->snapshot_id, 3055729675574597004); +} + } // namespace iceberg diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index 722ae7a42..4373ec24a 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -23,11 +23,17 @@ #include #include #include +#include +#include #include #include +#include "iceberg/file_io.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" +#include "iceberg/statistics_file.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/transaction.h" @@ -285,7 +291,247 @@ Result ExpireSnapshots::Apply() { }); } + // Cache the result for use during Finalize() + apply_result_ = result; + return result; } +Status ExpireSnapshots::Finalize(std::optional commit_error) { + if (commit_error.has_value()) { + return {}; + } + + if (cleanup_level_ == CleanupLevel::kNone) { + return {}; + } + + if (!apply_result_.has_value() || apply_result_->snapshot_ids_to_remove.empty()) { + return {}; + } + + // File cleanup is best-effort: log and continue on individual file deletion failures + // to avoid blocking metadata updates (matching Java behavior). + return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove); +} + +void ExpireSnapshots::DeleteFilePath(const std::string& path) { + try { + if (delete_func_) { + delete_func_(path); + } else { + auto status = ctx_->table->io()->DeleteFile(path); + // Best-effort: ignore NotFound (file already deleted) and other errors. + // Java uses suppressFailureWhenFinished + onFailure logging. + std::ignore = status; + } + } catch (...) { + // Suppress all exceptions during file cleanup to match Java's + // suppressFailureWhenFinished behavior. + } +} + +Status ExpireSnapshots::ReadManifestsForSnapshot( + int64_t snapshot_id, std::unordered_set& manifest_paths) { + const TableMetadata& metadata = base(); + auto file_io = ctx_->table->io(); + + auto snapshot_result = metadata.SnapshotById(snapshot_id); + if (!snapshot_result.has_value()) { + return {}; + } + auto& snapshot = snapshot_result.value(); + + SnapshotCache snapshot_cache(snapshot.get()); + auto manifests_result = snapshot_cache.Manifests(file_io); + if (!manifests_result.has_value()) { + // Best-effort: skip this snapshot if we can't read its manifests + return {}; + } + + for (const auto& manifest : manifests_result.value()) { + manifest_paths.insert(manifest.manifest_path); + } + + return {}; +} + +Status ExpireSnapshots::FindDataFilesToDelete( + const std::unordered_set& manifests_to_delete, + const std::unordered_set& retained_manifests, + std::unordered_set& data_files_to_delete) { + const TableMetadata& metadata = base(); + auto file_io = ctx_->table->io(); + + // Step 1: Collect all file paths from manifests being deleted + for (const auto& manifest_path : manifests_to_delete) { + // Find the ManifestFile for this path by scanning expired snapshots + for (const auto& snapshot : metadata.snapshots) { + if (!snapshot) continue; + SnapshotCache snapshot_cache(snapshot.get()); + auto manifests_result = snapshot_cache.Manifests(file_io); + if (!manifests_result.has_value()) continue; + + for (const auto& manifest : manifests_result.value()) { + if (manifest.manifest_path != manifest_path) continue; + + auto schema_result = metadata.Schema(); + if (!schema_result.has_value()) continue; + auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id); + if (!spec_result.has_value()) continue; + + auto reader_result = ManifestReader::Make( + manifest, file_io, schema_result.value(), spec_result.value()); + if (!reader_result.has_value()) continue; + + auto entries_result = reader_result.value()->Entries(); + if (!entries_result.has_value()) continue; + + for (const auto& entry : entries_result.value()) { + if (entry.data_file) { + data_files_to_delete.insert(entry.data_file->file_path); + } + } + goto next_manifest; // Found and processed this manifest, move to next + } + } + next_manifest:; + } + + if (data_files_to_delete.empty()) { + return {}; + } + + // Step 2: Remove any files that are still referenced by retained manifests. + // This ensures we don't delete files that are shared across manifests. + for (const auto& manifest_path : retained_manifests) { + if (data_files_to_delete.empty()) break; + + for (const auto& snapshot : metadata.snapshots) { + if (!snapshot) continue; + SnapshotCache snapshot_cache(snapshot.get()); + auto manifests_result = snapshot_cache.Manifests(file_io); + if (!manifests_result.has_value()) continue; + + for (const auto& manifest : manifests_result.value()) { + if (manifest.manifest_path != manifest_path) continue; + + auto schema_result = metadata.Schema(); + if (!schema_result.has_value()) continue; + auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id); + if (!spec_result.has_value()) continue; + + auto reader_result = ManifestReader::Make( + manifest, file_io, schema_result.value(), spec_result.value()); + if (!reader_result.has_value()) continue; + + auto entries_result = reader_result.value()->Entries(); + if (!entries_result.has_value()) continue; + + for (const auto& entry : entries_result.value()) { + if (entry.data_file) { + data_files_to_delete.erase(entry.data_file->file_path); + } + } + goto next_retained; + } + } + next_retained:; + } + + return {}; +} + +Status ExpireSnapshots::CleanExpiredFiles( + const std::vector& expired_snapshot_ids) { + const TableMetadata& metadata = base(); + + // Build expired and retained snapshot ID sets. + // The retained set includes ALL snapshots referenced by any branch or tag, + // since Apply() already computed retention across all refs. + std::unordered_set expired_id_set(expired_snapshot_ids.begin(), + expired_snapshot_ids.end()); + std::unordered_set retained_snapshot_ids; + for (const auto& snapshot : metadata.snapshots) { + if (snapshot && !expired_id_set.contains(snapshot->snapshot_id)) { + retained_snapshot_ids.insert(snapshot->snapshot_id); + } + } + + // Phase 1: Collect manifest paths from expired and retained snapshots. + // TODO(shangxinli): Parallelize manifest collection with a thread pool. + std::unordered_set expired_manifest_paths; + for (int64_t snapshot_id : expired_snapshot_ids) { + std::ignore = ReadManifestsForSnapshot(snapshot_id, expired_manifest_paths); + } + + std::unordered_set retained_manifest_paths; + for (int64_t snapshot_id : retained_snapshot_ids) { + std::ignore = ReadManifestsForSnapshot(snapshot_id, retained_manifest_paths); + } + + // Phase 2: Prune manifests still referenced by retained snapshots. + // Only manifests exclusively in expired snapshots should be deleted. + std::unordered_set manifests_to_delete; + for (const auto& path : expired_manifest_paths) { + if (!retained_manifest_paths.contains(path)) { + manifests_to_delete.insert(path); + } + } + + // Phase 3: If cleanup level is kAll, find data files to delete. + // Only read entries from manifests being deleted (not all expired manifests), + // then subtract any files still reachable from retained manifests. + if (cleanup_level_ == CleanupLevel::kAll && !manifests_to_delete.empty()) { + std::unordered_set data_files_to_delete; + std::ignore = FindDataFilesToDelete(manifests_to_delete, retained_manifest_paths, + data_files_to_delete); + + // TODO(shangxinli): Parallelize file deletion with a thread pool. + for (const auto& path : data_files_to_delete) { + DeleteFilePath(path); + } + } + + // Phase 4: Delete orphaned manifest files. + for (const auto& path : manifests_to_delete) { + DeleteFilePath(path); + } + + // Phase 5: Delete manifest lists from expired snapshots. + for (int64_t snapshot_id : expired_snapshot_ids) { + auto snapshot_result = metadata.SnapshotById(snapshot_id); + if (!snapshot_result.has_value()) continue; + auto& snapshot = snapshot_result.value(); + if (!snapshot->manifest_list.empty()) { + DeleteFilePath(snapshot->manifest_list); + } + } + + // Phase 6: Delete expired statistics files. + // Use set difference between before and after states (matching Java behavior). + // Since Finalize runs before table_ is updated, "after" is base() minus expired. + std::unordered_set retained_stats_snapshots(retained_snapshot_ids); + for (const auto& stat_file : metadata.statistics) { + if (stat_file && !retained_stats_snapshots.contains(stat_file->snapshot_id)) { + DeleteFilePath(stat_file->path); + } + } + for (const auto& part_stat : metadata.partition_statistics) { + if (part_stat && !retained_stats_snapshots.contains(part_stat->snapshot_id)) { + DeleteFilePath(part_stat->path); + } + } + + return {}; +} + +// TODO(shangxinli): Implement IncrementalFileCleanup strategy for linear ancestry +// optimization. Java uses this when: !specifiedSnapshotId && simple linear main branch +// ancestry (no non-main snapshots removed, no non-main snapshots remain). +// The incremental strategy is more efficient because it only needs to scan +// manifests written by expired snapshots (checking added_snapshot_id), avoiding +// the full reachability analysis. It also handles cherry-pick protection via +// SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP. + } // namespace iceberg diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index bc05d810d..70e877fdb 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -142,6 +143,16 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { /// \return The results of changes Result Apply(); + /// \brief Finalize the expire snapshots update, cleaning up expired files. + /// + /// After a successful commit, this method deletes manifest files, manifest lists, + /// data files, and statistics files that are no longer referenced by any valid + /// snapshot. The cleanup behavior is controlled by the CleanupLevel setting. + /// + /// \param commit_error An optional error indicating whether the commit was successful + /// \return Status indicating success or failure + Status Finalize(std::optional commit_error) override; + private: explicit ExpireSnapshots(std::shared_ptr ctx); @@ -159,6 +170,41 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { Result> UnreferencedSnapshotIdsToRetain( const SnapshotToRef& refs) const; + /// \brief Clean up files no longer referenced after snapshot expiration. + /// + /// Implements the "reachable file cleanup" strategy from Java's ReachableFileCleanup: + /// 1. Collect manifests from expired and retained snapshots + /// 2. Prune manifests still referenced by retained snapshots + /// 3. Find data files only in manifests being deleted (if kAll) + /// 4. Remove data files still reachable from retained manifests + /// 5. Delete orphaned manifests, manifest lists, and statistics files + /// + /// All deletions are best-effort: failures are suppressed to avoid blocking + /// metadata updates (matching Java's suppressFailureWhenFinished behavior). + /// + /// Branch/tag awareness: retained_snapshot_ids includes all snapshots referenced + /// by any branch or tag, as computed by Apply(). This prevents deleting files + /// that are still reachable from any ref. + /// + /// TODO(shangxinli): Add multi-threaded file deletion support. + /// TODO(shangxinli): Add IncrementalFileCleanup strategy for linear ancestry. + Status CleanExpiredFiles(const std::vector& expired_snapshot_ids); + + /// \brief Read manifest paths from a single snapshot. + /// Best-effort: returns OK even if the snapshot or its manifests can't be read. + Status ReadManifestsForSnapshot(int64_t snapshot_id, + std::unordered_set& manifest_paths); + + /// \brief Find data files to delete by reading entries from manifests being deleted, + /// then subtracting files still reachable from retained manifests. + Status FindDataFilesToDelete(const std::unordered_set& manifests_to_delete, + const std::unordered_set& retained_manifests, + std::unordered_set& data_files_to_delete); + + /// \brief Delete a file, suppressing errors (best-effort). + /// Uses the custom delete function if set, otherwise FileIO::DeleteFile. + void DeleteFilePath(const std::string& path); + private: const TimePointMs current_time_ms_; const int64_t default_max_ref_age_ms_; @@ -169,6 +215,9 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { enum CleanupLevel cleanup_level_ { CleanupLevel::kAll }; bool clean_expired_metadata_{false}; bool specified_snapshot_id_{false}; + + /// Cached result from Apply(), used during Finalize() for file cleanup + std::optional apply_result_; }; } // namespace iceberg From 1ebc015f3abe55e6f9054acd03913770a67891aa Mon Sep 17 00:00:00 2001 From: Xinli shang Date: Thu, 26 Mar 2026 10:20:05 -0700 Subject: [PATCH 2/5] Address review feedback from Gang Wu - Fix O(M*S) I/O: Pre-cache ManifestFile objects in manifest_cache_ during Phase 1 (ReadManifestsForSnapshot), eliminating repeated manifest list reads in FindDataFilesToDelete. - Fix storage leak: Use LiveEntries() instead of Entries() to match Java's ManifestFiles.readPaths behavior (only ADDED/EXISTING entries). - Fix data loss risk: When reading a retained manifest fails, abort data file deletion entirely instead of silently continuing. Java retries and throws on failure here. - Fix statistics file deletion: Use path-based set difference instead of snapshot_id-only check, preventing erroneous deletion of statistics files shared across snapshots. - Remove goto anti-pattern: Extract ManifestFile lookup into MakeManifestReader() helper and use manifest_cache_ for direct lookup. - Improve API: FindDataFilesToDelete now returns Result> instead of using a mutable out-parameter. Co-Authored-By: Claude Opus 4.6 --- src/iceberg/update/expire_snapshots.cc | 149 +++++++++++++------------ src/iceberg/update/expire_snapshots.h | 24 +++- 2 files changed, 95 insertions(+), 78 deletions(-) diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index 4373ec24a..b9f011ff5 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -331,6 +331,17 @@ void ExpireSnapshots::DeleteFilePath(const std::string& path) { } } +Result> ExpireSnapshots::MakeManifestReader( + const ManifestFile& manifest, const std::shared_ptr& file_io) { + const TableMetadata& metadata = base(); + auto schema_result = metadata.Schema(); + if (!schema_result.has_value()) return std::unexpected(schema_result.error()); + auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id); + if (!spec_result.has_value()) return std::unexpected(spec_result.error()); + return ManifestReader::Make(manifest, file_io, schema_result.value(), + spec_result.value()); +} + Status ExpireSnapshots::ReadManifestsForSnapshot( int64_t snapshot_id, std::unordered_set& manifest_paths) { const TableMetadata& metadata = base(); @@ -351,95 +362,73 @@ Status ExpireSnapshots::ReadManifestsForSnapshot( for (const auto& manifest : manifests_result.value()) { manifest_paths.insert(manifest.manifest_path); + // Cache manifest metadata for later use in FindDataFilesToDelete, + // avoiding O(M*S) repeated I/O from re-reading manifest lists. + manifest_cache_.emplace(manifest.manifest_path, manifest); } return {}; } -Status ExpireSnapshots::FindDataFilesToDelete( +Result> ExpireSnapshots::FindDataFilesToDelete( const std::unordered_set& manifests_to_delete, - const std::unordered_set& retained_manifests, - std::unordered_set& data_files_to_delete) { - const TableMetadata& metadata = base(); + const std::unordered_set& retained_manifests) { auto file_io = ctx_->table->io(); + std::unordered_set data_files_to_delete; - // Step 1: Collect all file paths from manifests being deleted - for (const auto& manifest_path : manifests_to_delete) { - // Find the ManifestFile for this path by scanning expired snapshots - for (const auto& snapshot : metadata.snapshots) { - if (!snapshot) continue; - SnapshotCache snapshot_cache(snapshot.get()); - auto manifests_result = snapshot_cache.Manifests(file_io); - if (!manifests_result.has_value()) continue; - - for (const auto& manifest : manifests_result.value()) { - if (manifest.manifest_path != manifest_path) continue; + // Step 1: Collect live file paths from manifests being deleted. + // Use LiveEntries() (ADDED/EXISTING only) to match Java's ManifestFiles.readPaths + // which delegates to liveEntries(). Using Entries() would include DELETED entries + // and could cause storage leaks. + for (const auto& [path, manifest] : manifest_cache_) { + if (!manifests_to_delete.contains(path)) continue; - auto schema_result = metadata.Schema(); - if (!schema_result.has_value()) continue; - auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id); - if (!spec_result.has_value()) continue; + auto reader_result = MakeManifestReader(manifest, file_io); + if (!reader_result.has_value()) continue; - auto reader_result = ManifestReader::Make( - manifest, file_io, schema_result.value(), spec_result.value()); - if (!reader_result.has_value()) continue; + auto entries_result = reader_result.value()->LiveEntries(); + if (!entries_result.has_value()) continue; - auto entries_result = reader_result.value()->Entries(); - if (!entries_result.has_value()) continue; - - for (const auto& entry : entries_result.value()) { - if (entry.data_file) { - data_files_to_delete.insert(entry.data_file->file_path); - } - } - goto next_manifest; // Found and processed this manifest, move to next + for (const auto& entry : entries_result.value()) { + if (entry.data_file) { + data_files_to_delete.insert(entry.data_file->file_path); } } - next_manifest:; } if (data_files_to_delete.empty()) { - return {}; + return data_files_to_delete; } // Step 2: Remove any files that are still referenced by retained manifests. - // This ensures we don't delete files that are shared across manifests. + // If reading a retained manifest fails, we must NOT delete its data files + // to avoid accidental data loss (matching Java's retry + throwFailureWhenFinished). for (const auto& manifest_path : retained_manifests) { if (data_files_to_delete.empty()) break; - for (const auto& snapshot : metadata.snapshots) { - if (!snapshot) continue; - SnapshotCache snapshot_cache(snapshot.get()); - auto manifests_result = snapshot_cache.Manifests(file_io); - if (!manifests_result.has_value()) continue; - - for (const auto& manifest : manifests_result.value()) { - if (manifest.manifest_path != manifest_path) continue; - - auto schema_result = metadata.Schema(); - if (!schema_result.has_value()) continue; - auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id); - if (!spec_result.has_value()) continue; + auto it = manifest_cache_.find(manifest_path); + if (it == manifest_cache_.end()) continue; - auto reader_result = ManifestReader::Make( - manifest, file_io, schema_result.value(), spec_result.value()); - if (!reader_result.has_value()) continue; + auto reader_result = MakeManifestReader(it->second, file_io); + if (!reader_result.has_value()) { + // Cannot read a retained manifest — abort data file deletion to prevent + // accidental data loss. Java retries and throws on failure here. + return std::unordered_set{}; + } - auto entries_result = reader_result.value()->Entries(); - if (!entries_result.has_value()) continue; + auto entries_result = reader_result.value()->LiveEntries(); + if (!entries_result.has_value()) { + return std::unordered_set{}; + } - for (const auto& entry : entries_result.value()) { - if (entry.data_file) { - data_files_to_delete.erase(entry.data_file->file_path); - } - } - goto next_retained; + for (const auto& entry : entries_result.value()) { + if (entry.data_file) { + data_files_to_delete.erase(entry.data_file->file_path); } } - next_retained:; } - return {}; + return data_files_to_delete; } Status ExpireSnapshots::CleanExpiredFiles( @@ -483,13 +472,13 @@ Status ExpireSnapshots::CleanExpiredFiles( // Only read entries from manifests being deleted (not all expired manifests), // then subtract any files still reachable from retained manifests. if (cleanup_level_ == CleanupLevel::kAll && !manifests_to_delete.empty()) { - std::unordered_set data_files_to_delete; - std::ignore = FindDataFilesToDelete(manifests_to_delete, retained_manifest_paths, - data_files_to_delete); - - // TODO(shangxinli): Parallelize file deletion with a thread pool. - for (const auto& path : data_files_to_delete) { - DeleteFilePath(path); + auto data_files_result = + FindDataFilesToDelete(manifests_to_delete, retained_manifest_paths); + if (data_files_result.has_value()) { + // TODO(shangxinli): Parallelize file deletion with a thread pool. + for (const auto& path : data_files_result.value()) { + DeleteFilePath(path); + } } } @@ -508,17 +497,31 @@ Status ExpireSnapshots::CleanExpiredFiles( } } - // Phase 6: Delete expired statistics files. - // Use set difference between before and after states (matching Java behavior). - // Since Finalize runs before table_ is updated, "after" is base() minus expired. - std::unordered_set retained_stats_snapshots(retained_snapshot_ids); + // Phase 6: Delete expired statistics files using path-based set difference. + // A statistics file should only be deleted if its path is not referenced by any + // retained snapshot, since the same file path could be shared across snapshots. + // Collect paths from retained snapshots, then delete any not in that set. + std::unordered_set retained_stat_paths; + std::unordered_set retained_part_stat_paths; + for (const auto& stat_file : metadata.statistics) { + if (stat_file && retained_snapshot_ids.contains(stat_file->snapshot_id)) { + retained_stat_paths.insert(stat_file->path); + } + } + for (const auto& part_stat : metadata.partition_statistics) { + if (part_stat && retained_snapshot_ids.contains(part_stat->snapshot_id)) { + retained_part_stat_paths.insert(part_stat->path); + } + } for (const auto& stat_file : metadata.statistics) { - if (stat_file && !retained_stats_snapshots.contains(stat_file->snapshot_id)) { + if (stat_file && expired_id_set.contains(stat_file->snapshot_id) && + !retained_stat_paths.contains(stat_file->path)) { DeleteFilePath(stat_file->path); } } for (const auto& part_stat : metadata.partition_statistics) { - if (part_stat && !retained_stats_snapshots.contains(part_stat->snapshot_id)) { + if (part_stat && expired_id_set.contains(part_stat->snapshot_id) && + !retained_part_stat_paths.contains(part_stat->path)) { DeleteFilePath(part_stat->path); } } diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index 70e877fdb..9aaaea41c 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -23,10 +23,13 @@ #include #include #include +#include #include #include #include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/update/pending_update.h" @@ -195,11 +198,17 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { Status ReadManifestsForSnapshot(int64_t snapshot_id, std::unordered_set& manifest_paths); - /// \brief Find data files to delete by reading entries from manifests being deleted, - /// then subtracting files still reachable from retained manifests. - Status FindDataFilesToDelete(const std::unordered_set& manifests_to_delete, - const std::unordered_set& retained_manifests, - std::unordered_set& data_files_to_delete); + /// \brief Find data files to delete by reading live entries from manifests being + /// deleted, then subtracting files still reachable from retained manifests. + /// If a retained manifest cannot be read, returns an empty set to prevent + /// accidental data loss. + Result> FindDataFilesToDelete( + const std::unordered_set& manifests_to_delete, + const std::unordered_set& retained_manifests); + + /// \brief Create a ManifestReader for the given ManifestFile. + Result> MakeManifestReader( + const ManifestFile& manifest, const std::shared_ptr& file_io); /// \brief Delete a file, suppressing errors (best-effort). /// Uses the custom delete function if set, otherwise FileIO::DeleteFile. @@ -218,6 +227,11 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { /// Cached result from Apply(), used during Finalize() for file cleanup std::optional apply_result_; + + /// Cache of manifest path -> ManifestFile, built during ReadManifestsForSnapshot + /// to avoid O(M*S) repeated I/O from re-reading manifest lists in + /// FindDataFilesToDelete. + std::unordered_map manifest_cache_; }; } // namespace iceberg From 1604cb7a32aa4753f5cc6560e0e1443b4cc3a5f2 Mon Sep 17 00:00:00 2001 From: Xinli shang Date: Sat, 4 Apr 2026 08:23:48 -0700 Subject: [PATCH 3/5] refactor: introduce FileCleanupStrategy and ReachableFileCleanup Mirror Java's file cleanup class hierarchy for expire snapshots: - Add abstract FileCleanupStrategy with shared DeleteFile() and ExpiredStatisticsFilePaths() utilities (path-based set difference) - Add ReachableFileCleanup concrete class owning manifest_cache_, ReadManifestsForSnapshot(), and FindDataFilesToDelete() - Move MakeManifestReader() to a free function in anonymous namespace using ICEBERG_ASSIGN_OR_RAISE - Remove cleanup-specific private methods and manifest_cache_ from ExpireSnapshots class; Finalize() now delegates to the strategy - Clear apply_result_ after consumption in Finalize() - Rename DeleteFilePath to DeleteFile; use std::ignore for FileIO return - Remove manifest_list.h and manifest_reader.h from the header --- src/iceberg/update/expire_snapshots.cc | 480 ++++++++++++++----------- src/iceberg/update/expire_snapshots.h | 51 +-- 2 files changed, 266 insertions(+), 265 deletions(-) diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index b9f011ff5..7f3591c9c 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -43,6 +43,264 @@ namespace iceberg { +namespace { + +Result> MakeManifestReader( + const ManifestFile& manifest, const std::shared_ptr& file_io, + const TableMetadata& metadata) { + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(manifest.partition_spec_id)); + return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec)); +} + +/// \brief Abstract strategy for cleaning up files after snapshot expiration. +/// +/// Mirrors Java's FileCleanupStrategy: provides shared delete utilities while +/// allowing different cleanup algorithms (ReachableFileCleanup, IncrementalFileCleanup). +class FileCleanupStrategy { + public: + FileCleanupStrategy(std::shared_ptr file_io, + std::function delete_func) + : file_io_(std::move(file_io)), delete_func_(std::move(delete_func)) {} + + virtual ~FileCleanupStrategy() = default; + + /// \brief Clean up files that are only reachable by expired snapshots. + /// + /// \param metadata Table metadata before expiration (contains all snapshots). + /// \param expired_snapshot_ids Snapshot IDs that were expired during this operation. + /// \param level Controls which types of files are eligible for deletion. + virtual Status CleanFiles(const TableMetadata& metadata, + const std::unordered_set& expired_snapshot_ids, + CleanupLevel level) = 0; + + protected: + /// \brief Delete a file, suppressing errors (best-effort). + /// + /// Uses the custom delete function if set, otherwise FileIO::DeleteFile. + /// Matches Java's suppressFailureWhenFinished behavior. + void DeleteFile(const std::string& path) { + try { + if (delete_func_) { + delete_func_(path); + } else { + std::ignore = file_io_->DeleteFile(path); + } + } catch (...) { + // Suppress all exceptions during file cleanup to match Java's + // suppressFailureWhenFinished behavior. + } + } + + /// \brief Returns paths of statistics files referenced only by expired snapshots. + /// + /// Uses path-based set difference (matching Java's expiredStatisticsFilesLocations): + /// if the same file path is shared across snapshots, it is only deleted when + /// no retained snapshot references it. + std::unordered_set ExpiredStatisticsFilePaths( + const TableMetadata& metadata, + const std::unordered_set& expired_ids) { + std::unordered_set retained_paths; + for (const auto& stat : metadata.statistics) { + if (stat && !expired_ids.contains(stat->snapshot_id)) { + retained_paths.insert(stat->path); + } + } + for (const auto& part_stat : metadata.partition_statistics) { + if (part_stat && !expired_ids.contains(part_stat->snapshot_id)) { + retained_paths.insert(part_stat->path); + } + } + + std::unordered_set expired_paths; + for (const auto& stat : metadata.statistics) { + if (stat && expired_ids.contains(stat->snapshot_id) && + !retained_paths.contains(stat->path)) { + expired_paths.insert(stat->path); + } + } + for (const auto& part_stat : metadata.partition_statistics) { + if (part_stat && expired_ids.contains(part_stat->snapshot_id) && + !retained_paths.contains(part_stat->path)) { + expired_paths.insert(part_stat->path); + } + } + return expired_paths; + } + + std::shared_ptr file_io_; + std::function delete_func_; +}; + +/// \brief File cleanup strategy that determines safe deletions via full reachability. +/// +/// Mirrors Java's ReachableFileCleanup: collects manifests from all expired and +/// retained snapshots, prunes candidates still referenced by retained snapshots, +/// then deletes orphaned manifests, data files, manifest lists, and statistics files. +/// +/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support. +class ReachableFileCleanup : public FileCleanupStrategy { + public: + using FileCleanupStrategy::FileCleanupStrategy; + + Status CleanFiles(const TableMetadata& metadata, + const std::unordered_set& expired_snapshot_ids, + CleanupLevel level) override { + std::unordered_set retained_snapshot_ids; + for (const auto& snapshot : metadata.snapshots) { + if (snapshot && !expired_snapshot_ids.contains(snapshot->snapshot_id)) { + retained_snapshot_ids.insert(snapshot->snapshot_id); + } + } + + // Phase 1: Collect manifest paths from expired and retained snapshots. + // The manifest_cache_ is populated here to avoid O(M*S) repeated I/O in + // FindDataFilesToDelete. + std::unordered_set expired_manifest_paths; + for (int64_t snapshot_id : expired_snapshot_ids) { + ReadManifestsForSnapshot(metadata, snapshot_id, expired_manifest_paths); + } + std::unordered_set retained_manifest_paths; + for (int64_t snapshot_id : retained_snapshot_ids) { + ReadManifestsForSnapshot(metadata, snapshot_id, retained_manifest_paths); + } + + // Phase 2: Prune manifests still referenced by retained snapshots. + std::unordered_set manifests_to_delete; + for (const auto& path : expired_manifest_paths) { + if (!retained_manifest_paths.contains(path)) { + manifests_to_delete.insert(path); + } + } + + // Phase 3: Delete data files if cleanup level is kAll. + if (level == CleanupLevel::kAll && !manifests_to_delete.empty()) { + auto data_files_result = + FindDataFilesToDelete(metadata, manifests_to_delete, retained_manifest_paths); + if (data_files_result.has_value()) { + for (const auto& path : data_files_result.value()) { + DeleteFile(path); + } + } + } + + // Phase 4: Delete orphaned manifest files. + for (const auto& path : manifests_to_delete) { + DeleteFile(path); + } + + // Phase 5: Delete manifest lists from expired snapshots. + for (int64_t snapshot_id : expired_snapshot_ids) { + auto snapshot_result = metadata.SnapshotById(snapshot_id); + if (!snapshot_result.has_value()) continue; + const auto& snapshot = snapshot_result.value(); + if (!snapshot->manifest_list.empty()) { + DeleteFile(snapshot->manifest_list); + } + } + + // Phase 6: Delete expired statistics files using path-based set difference. + for (const auto& path : ExpiredStatisticsFilePaths(metadata, expired_snapshot_ids)) { + DeleteFile(path); + } + + return {}; + } + + private: + /// Cache of manifest path -> ManifestFile, populated during Phase 1 to avoid + /// re-reading manifest lists in FindDataFilesToDelete. + std::unordered_map manifest_cache_; + + /// \brief Collect manifest paths for a snapshot into manifest_paths. + /// + /// Best-effort: if the snapshot or its manifest list cannot be read, the error + /// is silently suppressed. This is safe for expired snapshots (missed deletions + /// can be cleaned up by GC later) and conservative for retained snapshots (we + /// only delete files we can confirm are unreachable). + void ReadManifestsForSnapshot(const TableMetadata& metadata, int64_t snapshot_id, + std::unordered_set& manifest_paths) { + auto snapshot_result = metadata.SnapshotById(snapshot_id); + if (!snapshot_result.has_value()) return; + auto& snapshot = snapshot_result.value(); + + SnapshotCache snapshot_cache(snapshot.get()); + auto manifests_result = snapshot_cache.Manifests(file_io_); + if (!manifests_result.has_value()) return; + + for (const auto& manifest : manifests_result.value()) { + manifest_paths.insert(manifest.manifest_path); + manifest_cache_.emplace(manifest.manifest_path, manifest); + } + } + + /// \brief Find data files to delete from manifests being removed. + /// + /// Reads live entries (ADDED/EXISTING) from manifests_to_delete, then subtracts + /// any files still referenced by retained_manifests. Uses LiveEntries() to match + /// Java's ManifestFiles.readPaths (delegates to liveEntries()). + /// + /// If any retained manifest cannot be read, returns an empty set to prevent + /// accidental data loss (matching Java's throwFailureWhenFinished for retained + /// manifest reads). + Result> FindDataFilesToDelete( + const TableMetadata& metadata, + const std::unordered_set& manifests_to_delete, + const std::unordered_set& retained_manifests) { + std::unordered_set data_files_to_delete; + + // Step 1: Collect live file paths from manifests being deleted. + for (const auto& [path, manifest] : manifest_cache_) { + if (!manifests_to_delete.contains(path)) continue; + + auto reader_result = MakeManifestReader(manifest, file_io_, metadata); + if (!reader_result.has_value()) continue; + + auto entries_result = reader_result.value()->LiveEntries(); + if (!entries_result.has_value()) continue; + + for (const auto& entry : entries_result.value()) { + if (entry.data_file) { + data_files_to_delete.insert(entry.data_file->file_path); + } + } + } + + if (data_files_to_delete.empty()) { + return data_files_to_delete; + } + + // Step 2: Remove files still referenced by retained manifests. + // Abort entirely if a retained manifest cannot be read to prevent data loss. + for (const auto& manifest_path : retained_manifests) { + if (data_files_to_delete.empty()) break; + + auto it = manifest_cache_.find(manifest_path); + if (it == manifest_cache_.end()) continue; + + auto reader_result = MakeManifestReader(it->second, file_io_, metadata); + if (!reader_result.has_value()) { + return std::unordered_set{}; + } + + auto entries_result = reader_result.value()->LiveEntries(); + if (!entries_result.has_value()) { + return std::unordered_set{}; + } + + for (const auto& entry : entries_result.value()) { + if (entry.data_file) { + data_files_to_delete.erase(entry.data_file->file_path); + } + } + } + + return data_files_to_delete; + } +}; + +} // namespace + Result> ExpireSnapshots::Make( std::shared_ptr ctx) { ICEBERG_PRECHECK(ctx != nullptr, "Cannot create ExpireSnapshots without a context"); @@ -310,223 +568,15 @@ Status ExpireSnapshots::Finalize(std::optional commit_error) { return {}; } + std::unordered_set expired_ids( + apply_result_->snapshot_ids_to_remove.begin(), + apply_result_->snapshot_ids_to_remove.end()); + apply_result_.reset(); + // File cleanup is best-effort: log and continue on individual file deletion failures // to avoid blocking metadata updates (matching Java behavior). - return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove); -} - -void ExpireSnapshots::DeleteFilePath(const std::string& path) { - try { - if (delete_func_) { - delete_func_(path); - } else { - auto status = ctx_->table->io()->DeleteFile(path); - // Best-effort: ignore NotFound (file already deleted) and other errors. - // Java uses suppressFailureWhenFinished + onFailure logging. - std::ignore = status; - } - } catch (...) { - // Suppress all exceptions during file cleanup to match Java's - // suppressFailureWhenFinished behavior. - } -} - -Result> ExpireSnapshots::MakeManifestReader( - const ManifestFile& manifest, const std::shared_ptr& file_io) { - const TableMetadata& metadata = base(); - auto schema_result = metadata.Schema(); - if (!schema_result.has_value()) return std::unexpected(schema_result.error()); - auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id); - if (!spec_result.has_value()) return std::unexpected(spec_result.error()); - return ManifestReader::Make(manifest, file_io, schema_result.value(), - spec_result.value()); -} - -Status ExpireSnapshots::ReadManifestsForSnapshot( - int64_t snapshot_id, std::unordered_set& manifest_paths) { - const TableMetadata& metadata = base(); - auto file_io = ctx_->table->io(); - - auto snapshot_result = metadata.SnapshotById(snapshot_id); - if (!snapshot_result.has_value()) { - return {}; - } - auto& snapshot = snapshot_result.value(); - - SnapshotCache snapshot_cache(snapshot.get()); - auto manifests_result = snapshot_cache.Manifests(file_io); - if (!manifests_result.has_value()) { - // Best-effort: skip this snapshot if we can't read its manifests - return {}; - } - - for (const auto& manifest : manifests_result.value()) { - manifest_paths.insert(manifest.manifest_path); - // Cache manifest metadata for later use in FindDataFilesToDelete, - // avoiding O(M*S) repeated I/O from re-reading manifest lists. - manifest_cache_.emplace(manifest.manifest_path, manifest); - } - - return {}; -} - -Result> ExpireSnapshots::FindDataFilesToDelete( - const std::unordered_set& manifests_to_delete, - const std::unordered_set& retained_manifests) { - auto file_io = ctx_->table->io(); - std::unordered_set data_files_to_delete; - - // Step 1: Collect live file paths from manifests being deleted. - // Use LiveEntries() (ADDED/EXISTING only) to match Java's ManifestFiles.readPaths - // which delegates to liveEntries(). Using Entries() would include DELETED entries - // and could cause storage leaks. - for (const auto& [path, manifest] : manifest_cache_) { - if (!manifests_to_delete.contains(path)) continue; - - auto reader_result = MakeManifestReader(manifest, file_io); - if (!reader_result.has_value()) continue; - - auto entries_result = reader_result.value()->LiveEntries(); - if (!entries_result.has_value()) continue; - - for (const auto& entry : entries_result.value()) { - if (entry.data_file) { - data_files_to_delete.insert(entry.data_file->file_path); - } - } - } - - if (data_files_to_delete.empty()) { - return data_files_to_delete; - } - - // Step 2: Remove any files that are still referenced by retained manifests. - // If reading a retained manifest fails, we must NOT delete its data files - // to avoid accidental data loss (matching Java's retry + throwFailureWhenFinished). - for (const auto& manifest_path : retained_manifests) { - if (data_files_to_delete.empty()) break; - - auto it = manifest_cache_.find(manifest_path); - if (it == manifest_cache_.end()) continue; - - auto reader_result = MakeManifestReader(it->second, file_io); - if (!reader_result.has_value()) { - // Cannot read a retained manifest — abort data file deletion to prevent - // accidental data loss. Java retries and throws on failure here. - return std::unordered_set{}; - } - - auto entries_result = reader_result.value()->LiveEntries(); - if (!entries_result.has_value()) { - return std::unordered_set{}; - } - - for (const auto& entry : entries_result.value()) { - if (entry.data_file) { - data_files_to_delete.erase(entry.data_file->file_path); - } - } - } - - return data_files_to_delete; -} - -Status ExpireSnapshots::CleanExpiredFiles( - const std::vector& expired_snapshot_ids) { - const TableMetadata& metadata = base(); - - // Build expired and retained snapshot ID sets. - // The retained set includes ALL snapshots referenced by any branch or tag, - // since Apply() already computed retention across all refs. - std::unordered_set expired_id_set(expired_snapshot_ids.begin(), - expired_snapshot_ids.end()); - std::unordered_set retained_snapshot_ids; - for (const auto& snapshot : metadata.snapshots) { - if (snapshot && !expired_id_set.contains(snapshot->snapshot_id)) { - retained_snapshot_ids.insert(snapshot->snapshot_id); - } - } - - // Phase 1: Collect manifest paths from expired and retained snapshots. - // TODO(shangxinli): Parallelize manifest collection with a thread pool. - std::unordered_set expired_manifest_paths; - for (int64_t snapshot_id : expired_snapshot_ids) { - std::ignore = ReadManifestsForSnapshot(snapshot_id, expired_manifest_paths); - } - - std::unordered_set retained_manifest_paths; - for (int64_t snapshot_id : retained_snapshot_ids) { - std::ignore = ReadManifestsForSnapshot(snapshot_id, retained_manifest_paths); - } - - // Phase 2: Prune manifests still referenced by retained snapshots. - // Only manifests exclusively in expired snapshots should be deleted. - std::unordered_set manifests_to_delete; - for (const auto& path : expired_manifest_paths) { - if (!retained_manifest_paths.contains(path)) { - manifests_to_delete.insert(path); - } - } - - // Phase 3: If cleanup level is kAll, find data files to delete. - // Only read entries from manifests being deleted (not all expired manifests), - // then subtract any files still reachable from retained manifests. - if (cleanup_level_ == CleanupLevel::kAll && !manifests_to_delete.empty()) { - auto data_files_result = - FindDataFilesToDelete(manifests_to_delete, retained_manifest_paths); - if (data_files_result.has_value()) { - // TODO(shangxinli): Parallelize file deletion with a thread pool. - for (const auto& path : data_files_result.value()) { - DeleteFilePath(path); - } - } - } - - // Phase 4: Delete orphaned manifest files. - for (const auto& path : manifests_to_delete) { - DeleteFilePath(path); - } - - // Phase 5: Delete manifest lists from expired snapshots. - for (int64_t snapshot_id : expired_snapshot_ids) { - auto snapshot_result = metadata.SnapshotById(snapshot_id); - if (!snapshot_result.has_value()) continue; - auto& snapshot = snapshot_result.value(); - if (!snapshot->manifest_list.empty()) { - DeleteFilePath(snapshot->manifest_list); - } - } - - // Phase 6: Delete expired statistics files using path-based set difference. - // A statistics file should only be deleted if its path is not referenced by any - // retained snapshot, since the same file path could be shared across snapshots. - // Collect paths from retained snapshots, then delete any not in that set. - std::unordered_set retained_stat_paths; - std::unordered_set retained_part_stat_paths; - for (const auto& stat_file : metadata.statistics) { - if (stat_file && retained_snapshot_ids.contains(stat_file->snapshot_id)) { - retained_stat_paths.insert(stat_file->path); - } - } - for (const auto& part_stat : metadata.partition_statistics) { - if (part_stat && retained_snapshot_ids.contains(part_stat->snapshot_id)) { - retained_part_stat_paths.insert(part_stat->path); - } - } - for (const auto& stat_file : metadata.statistics) { - if (stat_file && expired_id_set.contains(stat_file->snapshot_id) && - !retained_stat_paths.contains(stat_file->path)) { - DeleteFilePath(stat_file->path); - } - } - for (const auto& part_stat : metadata.partition_statistics) { - if (part_stat && expired_id_set.contains(part_stat->snapshot_id) && - !retained_part_stat_paths.contains(part_stat->path)) { - DeleteFilePath(part_stat->path); - } - } - - return {}; + ReachableFileCleanup strategy(ctx_->table->io(), delete_func_); + return strategy.CleanFiles(base(), expired_ids, cleanup_level_); } // TODO(shangxinli): Implement IncrementalFileCleanup strategy for linear ancestry diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index 9aaaea41c..fc2910b58 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -28,8 +28,6 @@ #include #include "iceberg/iceberg_export.h" -#include "iceberg/manifest/manifest_list.h" -#include "iceberg/manifest/manifest_reader.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/update/pending_update.h" @@ -173,48 +171,6 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { Result> UnreferencedSnapshotIdsToRetain( const SnapshotToRef& refs) const; - /// \brief Clean up files no longer referenced after snapshot expiration. - /// - /// Implements the "reachable file cleanup" strategy from Java's ReachableFileCleanup: - /// 1. Collect manifests from expired and retained snapshots - /// 2. Prune manifests still referenced by retained snapshots - /// 3. Find data files only in manifests being deleted (if kAll) - /// 4. Remove data files still reachable from retained manifests - /// 5. Delete orphaned manifests, manifest lists, and statistics files - /// - /// All deletions are best-effort: failures are suppressed to avoid blocking - /// metadata updates (matching Java's suppressFailureWhenFinished behavior). - /// - /// Branch/tag awareness: retained_snapshot_ids includes all snapshots referenced - /// by any branch or tag, as computed by Apply(). This prevents deleting files - /// that are still reachable from any ref. - /// - /// TODO(shangxinli): Add multi-threaded file deletion support. - /// TODO(shangxinli): Add IncrementalFileCleanup strategy for linear ancestry. - Status CleanExpiredFiles(const std::vector& expired_snapshot_ids); - - /// \brief Read manifest paths from a single snapshot. - /// Best-effort: returns OK even if the snapshot or its manifests can't be read. - Status ReadManifestsForSnapshot(int64_t snapshot_id, - std::unordered_set& manifest_paths); - - /// \brief Find data files to delete by reading live entries from manifests being - /// deleted, then subtracting files still reachable from retained manifests. - /// If a retained manifest cannot be read, returns an empty set to prevent - /// accidental data loss. - Result> FindDataFilesToDelete( - const std::unordered_set& manifests_to_delete, - const std::unordered_set& retained_manifests); - - /// \brief Create a ManifestReader for the given ManifestFile. - Result> MakeManifestReader( - const ManifestFile& manifest, const std::shared_ptr& file_io); - - /// \brief Delete a file, suppressing errors (best-effort). - /// Uses the custom delete function if set, otherwise FileIO::DeleteFile. - void DeleteFilePath(const std::string& path); - - private: const TimePointMs current_time_ms_; const int64_t default_max_ref_age_ms_; int32_t default_min_num_snapshots_; @@ -225,13 +181,8 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { bool clean_expired_metadata_{false}; bool specified_snapshot_id_{false}; - /// Cached result from Apply(), used during Finalize() for file cleanup + /// Cached result from Apply(), consumed by Finalize() and cleared after use. std::optional apply_result_; - - /// Cache of manifest path -> ManifestFile, built during ReadManifestsForSnapshot - /// to avoid O(M*S) repeated I/O from re-reading manifest lists in - /// FindDataFilesToDelete. - std::unordered_map manifest_cache_; }; } // namespace iceberg From 0059a7b0436f5bb0aea6d2cc36a870b46d615f08 Mon Sep 17 00:00:00 2001 From: Xinli shang Date: Sat, 4 Apr 2026 09:46:52 -0700 Subject: [PATCH 4/5] style: fix clang-format violations --- src/iceberg/update/expire_snapshots.cc | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index 7f3591c9c..257656946 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -49,7 +49,8 @@ Result> MakeManifestReader( const ManifestFile& manifest, const std::shared_ptr& file_io, const TableMetadata& metadata) { ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); - ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(manifest.partition_spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto spec, + metadata.PartitionSpecById(manifest.partition_spec_id)); return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec)); } @@ -98,8 +99,7 @@ class FileCleanupStrategy { /// if the same file path is shared across snapshots, it is only deleted when /// no retained snapshot references it. std::unordered_set ExpiredStatisticsFilePaths( - const TableMetadata& metadata, - const std::unordered_set& expired_ids) { + const TableMetadata& metadata, const std::unordered_set& expired_ids) { std::unordered_set retained_paths; for (const auto& stat : metadata.statistics) { if (stat && !expired_ids.contains(stat->snapshot_id)) { @@ -568,9 +568,8 @@ Status ExpireSnapshots::Finalize(std::optional commit_error) { return {}; } - std::unordered_set expired_ids( - apply_result_->snapshot_ids_to_remove.begin(), - apply_result_->snapshot_ids_to_remove.end()); + std::unordered_set expired_ids(apply_result_->snapshot_ids_to_remove.begin(), + apply_result_->snapshot_ids_to_remove.end()); apply_result_.reset(); // File cleanup is best-effort: log and continue on individual file deletion failures From 2d1ffed0be0cb4c79acb4625155f9206d57de5b8 Mon Sep 17 00:00:00 2001 From: Xinli shang Date: Fri, 17 Apr 2026 16:58:52 -0700 Subject: [PATCH 5/5] fix: guard manifest deletion when retained snapshot reads fail; defer stats file deletion P0: ReadManifestsForSnapshot now returns bool. If any retained snapshot's manifest list cannot be read, phases 2-4 (manifest and data file deletion) are skipped entirely. An incomplete retained set makes it unsafe to compute manifests_to_delete, as manifests still referenced by unreadable snapshots would be wrongly included. This matches Java's throwFailureWhenFinished behavior in ReachableFileCleanup. Manifest list deletion (phase 5) is unaffected since it is keyed on expired snapshots only. P1: Remove physical statistics and partition-statistics file deletion (the former phase 6). RemoveStatistics/RemovePartitionStatistics are still not called in RemoveSnapshots (the TODO in table_metadata.cc), so the committed metadata still references those files after they would be deleted on disk. Deletion is deferred until the metadata-level removal is wired in, at which point the two operations can be kept in sync. --- src/iceberg/table_metadata.cc | 4 +- src/iceberg/update/expire_snapshots.cc | 108 ++++++++++--------------- 2 files changed, 45 insertions(+), 67 deletions(-) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index d3b5629c8..aa817b5fc 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -1440,7 +1440,9 @@ Status TableMetadataBuilder::Impl::RemoveSnapshots( if (ids_to_remove.contains(snapshot_id)) { snapshots_by_id_.erase(snapshot_id); snapshot_ids_to_remove.push_back(snapshot_id); - // FIXME: implement statistics removal and uncomment below + // TODO: Remove statistics metadata entries when a snapshot is expired. + // Until this is wired in, physical stats-file deletion is deferred in + // ReachableFileCleanup::CleanFiles to keep metadata and physical files in sync. // ICEBERG_RETURN_UNEXPECTED(RemoveStatistics(snapshot_id)); // ICEBERG_RETURN_UNEXPECTED(RemovePartitionStatistics(snapshot_id)); } else { diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index 257656946..8c7e80577 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -33,7 +33,6 @@ #include "iceberg/manifest/manifest_reader.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" -#include "iceberg/statistics_file.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/transaction.h" @@ -93,41 +92,6 @@ class FileCleanupStrategy { } } - /// \brief Returns paths of statistics files referenced only by expired snapshots. - /// - /// Uses path-based set difference (matching Java's expiredStatisticsFilesLocations): - /// if the same file path is shared across snapshots, it is only deleted when - /// no retained snapshot references it. - std::unordered_set ExpiredStatisticsFilePaths( - const TableMetadata& metadata, const std::unordered_set& expired_ids) { - std::unordered_set retained_paths; - for (const auto& stat : metadata.statistics) { - if (stat && !expired_ids.contains(stat->snapshot_id)) { - retained_paths.insert(stat->path); - } - } - for (const auto& part_stat : metadata.partition_statistics) { - if (part_stat && !expired_ids.contains(part_stat->snapshot_id)) { - retained_paths.insert(part_stat->path); - } - } - - std::unordered_set expired_paths; - for (const auto& stat : metadata.statistics) { - if (stat && expired_ids.contains(stat->snapshot_id) && - !retained_paths.contains(stat->path)) { - expired_paths.insert(stat->path); - } - } - for (const auto& part_stat : metadata.partition_statistics) { - if (part_stat && expired_ids.contains(part_stat->snapshot_id) && - !retained_paths.contains(part_stat->path)) { - expired_paths.insert(part_stat->path); - } - } - return expired_paths; - } - std::shared_ptr file_io_; std::function delete_func_; }; @@ -136,7 +100,7 @@ class FileCleanupStrategy { /// /// Mirrors Java's ReachableFileCleanup: collects manifests from all expired and /// retained snapshots, prunes candidates still referenced by retained snapshots, -/// then deletes orphaned manifests, data files, manifest lists, and statistics files. +/// then deletes orphaned manifests, data files, and manifest lists. /// /// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support. class ReachableFileCleanup : public FileCleanupStrategy { @@ -160,33 +124,44 @@ class ReachableFileCleanup : public FileCleanupStrategy { for (int64_t snapshot_id : expired_snapshot_ids) { ReadManifestsForSnapshot(metadata, snapshot_id, expired_manifest_paths); } + bool retained_manifests_complete = true; std::unordered_set retained_manifest_paths; for (int64_t snapshot_id : retained_snapshot_ids) { - ReadManifestsForSnapshot(metadata, snapshot_id, retained_manifest_paths); + if (!ReadManifestsForSnapshot(metadata, snapshot_id, retained_manifest_paths)) { + retained_manifests_complete = false; + break; + } } - // Phase 2: Prune manifests still referenced by retained snapshots. - std::unordered_set manifests_to_delete; - for (const auto& path : expired_manifest_paths) { - if (!retained_manifest_paths.contains(path)) { - manifests_to_delete.insert(path); + // If any retained snapshot's manifests could not be read, skip manifest and + // data file deletion. An incomplete retained set means we cannot safely + // determine which manifests are still live, so deleting any candidates risks + // removing files still referenced by retained snapshots. This matches Java's + // throwFailureWhenFinished behavior. + if (retained_manifests_complete) { + // Phase 2: Prune manifests still referenced by retained snapshots. + std::unordered_set manifests_to_delete; + for (const auto& path : expired_manifest_paths) { + if (!retained_manifest_paths.contains(path)) { + manifests_to_delete.insert(path); + } } - } - // Phase 3: Delete data files if cleanup level is kAll. - if (level == CleanupLevel::kAll && !manifests_to_delete.empty()) { - auto data_files_result = - FindDataFilesToDelete(metadata, manifests_to_delete, retained_manifest_paths); - if (data_files_result.has_value()) { - for (const auto& path : data_files_result.value()) { - DeleteFile(path); + // Phase 3: Delete data files if cleanup level is kAll. + if (level == CleanupLevel::kAll && !manifests_to_delete.empty()) { + auto data_files_result = + FindDataFilesToDelete(metadata, manifests_to_delete, retained_manifest_paths); + if (data_files_result.has_value()) { + for (const auto& path : data_files_result.value()) { + DeleteFile(path); + } } } - } - // Phase 4: Delete orphaned manifest files. - for (const auto& path : manifests_to_delete) { - DeleteFile(path); + // Phase 4: Delete orphaned manifest files. + for (const auto& path : manifests_to_delete) { + DeleteFile(path); + } } // Phase 5: Delete manifest lists from expired snapshots. @@ -199,10 +174,10 @@ class ReachableFileCleanup : public FileCleanupStrategy { } } - // Phase 6: Delete expired statistics files using path-based set difference. - for (const auto& path : ExpiredStatisticsFilePaths(metadata, expired_snapshot_ids)) { - DeleteFile(path); - } + // TODO(shangxinli): Delete expired statistics and partition-statistics files here. + // This requires RemoveStatistics/RemovePartitionStatistics to be wired into + // RemoveSnapshots first (see TODO in table_metadata.cc), so that physical file + // deletion and metadata entry removal stay in sync. return {}; } @@ -214,24 +189,25 @@ class ReachableFileCleanup : public FileCleanupStrategy { /// \brief Collect manifest paths for a snapshot into manifest_paths. /// - /// Best-effort: if the snapshot or its manifest list cannot be read, the error - /// is silently suppressed. This is safe for expired snapshots (missed deletions - /// can be cleaned up by GC later) and conservative for retained snapshots (we - /// only delete files we can confirm are unreachable). - void ReadManifestsForSnapshot(const TableMetadata& metadata, int64_t snapshot_id, + /// Returns true if manifests were collected successfully. Returns false if the + /// snapshot or its manifest list cannot be read — callers must treat a false + /// result for a retained snapshot as an incomplete retained set and skip + /// manifest deletion to avoid removing live files. + bool ReadManifestsForSnapshot(const TableMetadata& metadata, int64_t snapshot_id, std::unordered_set& manifest_paths) { auto snapshot_result = metadata.SnapshotById(snapshot_id); - if (!snapshot_result.has_value()) return; + if (!snapshot_result.has_value()) return false; auto& snapshot = snapshot_result.value(); SnapshotCache snapshot_cache(snapshot.get()); auto manifests_result = snapshot_cache.Manifests(file_io_); - if (!manifests_result.has_value()) return; + if (!manifests_result.has_value()) return false; for (const auto& manifest : manifests_result.value()) { manifest_paths.insert(manifest.manifest_path); manifest_cache_.emplace(manifest.manifest_path, manifest); } + return true; } /// \brief Find data files to delete from manifests being removed.