Skip to content

[#11298] feat(iceberg-rest): add async cleanup manager#11299

Open
roryqi wants to merge 1 commit into
apache:mainfrom
qqqttt123:async-iceberg-hard-deletion-pr2-manager
Open

[#11298] feat(iceberg-rest): add async cleanup manager#11299
roryqi wants to merge 1 commit into
apache:mainfrom
qqqttt123:async-iceberg-hard-deletion-pr2-manager

Conversation

@roryqi
Copy link
Copy Markdown
Contributor

@roryqi roryqi commented May 29, 2026

What changes were proposed in this pull request?

  • Add IcebergCleanupManager: a server-wide worker pool that polls the iceberg_cleanup_job store for PENDING jobs, claims them via a heartbeat lease, deletes the dropped table's reachable files in bulk through a shared deleteExecutor (with CallerRunsPolicy back-pressure), renews heartbeats on a separate scheduler thread, drives the retry state machine (transient failure → back to PENDING, give up at max-attemptsFAILED), and prunes finished rows past the retention window.
  • Add TestIcebergCleanupManager unit tests, running against the H2/MySQL/PostgreSQL backend matrix via TestJDBCBackend.
  • Sync the async hard-deletion design doc with the merged persistence layer (the dropTable enqueue snippet and the testing section).

Why are the changes needed?

Second of three stacked PRs for async hard deletion in the Iceberg REST catalog. The persistence layer (#11266) added the durable job store; this PR adds the worker engine that drains jobs and deletes files. REST integration follows in PR 3.

Fix: #11298

Does this PR introduce any user-facing change?

No. The manager is not yet wired into the REST drop path (that lands in PR 3), and the config keys (async-cleanup.*) were introduced with the persistence layer (#11266).

How was this patch tested?

./gradlew :iceberg:iceberg-rest-server:test --tests "org.apache.gravitino.iceberg.service.cleanup.TestIcebergCleanupManager" -PskipITs — 7 tests pass (lifecycle to SUCCEEDED, transient-retry to FAILED, bulk-delete batching, already-deleted-file tolerance, reachable-file cleanup, and tombstone delegation).

Add IcebergCleanupManager, the worker engine that pulls PENDING jobs from
the iceberg_cleanup_job store, drives the cleanup state machine (claim,
heartbeat, succeed, fail-after-max-attempts), deletes data files with a
server-wide pool, and prunes finished rows past the retention window.
Includes its unit tests, which run against the H2/MySQL/PostgreSQL backend
matrix via TestJDBCBackend.

Also sync the async hard-deletion design doc with the merged persistence
layer: the dropTable enqueue snippet now uses the IcebergCleanupJob
constructor and IcebergCleanupManager, and the testing section names
TestIcebergCleanupManager.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown

Code Coverage Report

Overall Project 66.73% 🟢
Files changed 66.67% 🟢

Module Coverage
aliyun 1.72% 🔴
api 46.82% 🟢
authorization-common 85.96% 🟢
aws 3.66% 🔴
azure 2.47% 🔴
catalog-common 10.04% 🔴
catalog-fileset 80.33% 🟢
catalog-glue 66.08% 🟢
catalog-hive 79.55% 🟢
catalog-jdbc-clickhouse 80.02% 🟢
catalog-jdbc-common 45.31% 🟢
catalog-jdbc-doris 80.28% 🟢
catalog-jdbc-hologres 54.03% 🟢
catalog-jdbc-mysql 79.23% 🟢
catalog-jdbc-oceanbase 78.38% 🟢
catalog-jdbc-postgresql 82.29% 🟢
catalog-jdbc-starrocks 78.51% 🟢
catalog-kafka 77.01% 🟢
catalog-lakehouse-generic 44.89% 🟢
catalog-lakehouse-hudi 79.1% 🟢
catalog-lakehouse-iceberg 85.65% 🟢
catalog-lakehouse-paimon 79.29% 🟢
catalog-model 77.72% 🟢
cli 44.51% 🟢
client-java 77.91% 🟢
common 49.99% 🟢
core 82.42% 🟢
filesystem-hadoop3 76.97% 🟢
flink 0.0% 🔴
flink-common 41.2% 🟢
flink-runtime 0.0% 🔴
gcp 14.12% 🔴
hadoop-common 10.39% 🔴
hive-metastore-common 53.26% 🟢
iceberg-common 56.71% 🟢
iceberg-rest-server 71.89% -0.21% 🟢
idp-basic 85.99% 🟢
integration-test-common 0.0% 🔴
jobs 66.17% 🟢
lance-common 20.83% 🔴
lance-rest-server 60.27% 🟢
lineage 53.02% 🟢
optimizer 82.87% 🟢
optimizer-api 21.95% 🔴
server 85.73% 🟢
server-common 73.66% 🟢
spark 32.79% 🔴
spark-common 39.75% 🔴
trino-connector 39.44% 🔴
Files
Module File Coverage
iceberg-rest-server IcebergCleanupManager.java 66.67% 🟢

@roryqi roryqi self-assigned this May 30, 2026
.build());
cleanupManager.enqueue(
new IcebergCleanupJob(
0L, // id assigned from IdGenerator at enqueue (§5.4)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flag (§5.4) is not so proper, and we can't get what it refers to

Comment on lines +136 to +137
workers = Executors.newFixedThreadPool(workerThreads, daemon("iceberg-cleanup-worker"));
scheduler = Executors.newScheduledThreadPool(1, daemon("iceberg-cleanup-scheduler"));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to declare the queue in the thread pool, or the size is unlimited.

workers.submit(this::workerLoop);
}

long heartbeatIntervalMs = Math.max(1L, heartbeatTimeoutMs / 3L);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heartbeatIntervalMs can be 1?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Subtask] Async cleanup manager for the Iceberg REST catalog

2 participants