fix(cassandra): auto-recover session after Cassandra restart #2997

dpol1 wants to merge 2 commits into apache:master
Conversation
- Register ExponentialReconnectionPolicy on the Cluster builder so the
Datastax driver keeps retrying downed nodes in the background.
- Wrap every Session.execute() in executeWithRetry() with exponential
backoff on transient connectivity failures.
- Implement reconnectIfNeeded()/reset() so the pool reopens closed
sessions and issues a lightweight health-check (SELECT now() FROM
system.local) before subsequent queries.
- Add tunable options: cassandra.reconnect_base_delay,
cassandra.reconnect_max_delay, cassandra.reconnect_max_retries,
cassandra.reconnect_interval.
- Add unit tests covering defaults, overrides, disabling retries and
option keys.
Fixes apache#2740
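The retry wrapper described in the second bullet can be sketched roughly as follows. This is illustrative only: the class, method, and parameter names here are assumptions, not the PR's actual code, and a real implementation would retry only the transient driver exceptions this PR targets.

```java
import java.util.function.Supplier;

// Illustrative sketch of a query-level retry loop with exponential
// backoff. All names are hypothetical, not taken from the PR.
final class RetrySketch {

    // Delay doubles per attempt, capped at maxDelay (both in ms).
    static long delayMillis(int attempt, long interval, long maxDelay) {
        return Math.min(interval << attempt, maxDelay);
    }

    static <T> T executeWithRetry(Supplier<T> query, int maxRetries,
                                  long interval, long maxDelay) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return query.get();
            } catch (RuntimeException e) {
                // The real code would retry only transient driver errors
                // (NoHostAvailableException, OperationTimedOutException).
                last = e;
                if (attempt == maxRetries) {
                    break;
                }
                try {
                    Thread.sleep(delayMillis(attempt, interval, maxDelay));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        throw last;
    }
}
```

With `maxRetries = 0` the loop degenerates to a single attempt, which matches the "disabling retries" case the unit tests cover.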
Force-pushed from 97de8e9 to fc3d291
The PR wraps the sync `commit()` path only. Consider wrapping the async path as well, or at minimum adding a TODO/comment explaining why async commits are deliberately left un-retried (e.g., if retry semantics for async batches are too complex for this PR).

Thanks @imbajin for the feedback, changed!
```java
// Reconnection policy: let driver keep retrying nodes in background
// with exponential backoff after they go down (see issue #2740).
long reconnectBase = config.get(
        CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY);
```
`reconnect_base_delay` is read twice
CASSANDRA_RECONNECT_BASE_DELAY is read twice from config: once in the constructor (line 72) to validate the base ≤ max constraint, and again here in open() (line 120-121) to build the reconnection policy.
The constructor already stores reconnectMax as this.retryMaxDelay, but reconnectBase is discarded after validation. This means:
- `config.get()` is called redundantly on every `open()`.
- If the config object were mutable (or in tests with different config instances), the two reads could theoretically diverge.
Suggestion: store reconnectBase as a field alongside retryMaxDelay in the constructor, then use this.retryBaseDelay here:
```java
// constructor
this.retryBaseDelay = reconnectBase; // add this field
this.retryMaxDelay = reconnectMax;

// open()
builder.withReconnectionPolicy(
        new ExponentialReconnectionPolicy(this.retryBaseDelay,
                                          this.retryMaxDelay));
```

```java
// failed to reopen, clear opened so the next execute() does
// not NPE before executeWithRetry() can intercept.
if (this.session == null) {
    this.opened = false;
```
Setting `this.opened` in `reconnectIfNeeded()` breaks parent-class lifecycle invariants
opened is a field of AbstractBackendSession that tracks the ref-counted lifecycle managed by BackendSessionPool. Setting it to false here bypasses that contract.
The normal teardown flow is:
BackendSessionPool.closeSession()
→ session.detach() // decrements ref
→ if ref == 0: session.close()
→ threadLocalSession.remove()
→ sessions.remove(threadId)
→ sessionCount.decrementAndGet()
If this.opened = false is set here instead, the session object stays registered in sessions and sessionCount is never decremented. On the next getOrNewSession() call the pool sees opened == false and creates a new session, leaking the old entry.
Additionally, opened() already guards against a null session:
```java
@Override
public boolean opened() {
    if (this.opened && this.session == null) {
        this.tryOpen(); // already handles the re-open
    }
    return this.opened && this.session != null;
}
```

So the NPE concern in the finally comment is already handled by `opened()`. The finally block can be removed entirely: just catch the exception and log, letting the normal `opened()` / `executeWithRetry()` path handle the rest.
```java
int processors = Math.min(statements.size(), 1023);
List<ResultSetFuture> results = new ArrayList<>(processors + 1);
for (Statement s : statements) {
    // TODO: commitAsync is not retried (async retry semantics are complex)
```
commitAsync() has no retry protection — inconsistent write reliability
commit() (sync path) wraps all statements in executeWithRetry, but commitAsync() calls this.session.executeAsync(s) directly. This means write operations on the async path get no protection against transient NoHostAvailableException / OperationTimedOutException — exactly the failures this PR targets.
Timeline during a Cassandra restart:
Thread A (sync): commit() → executeWithRetry() → retries → succeeds ✓
Thread B (async): commitAsync() → executeAsync() → NoHostAvailableException ✗
Both paths commit the same batch type; users cannot know which one they're calling or whether it will be protected.
If implementing async retry is genuinely deferred, the TODO should be more explicit about the consequence — callers may see write failures during a Cassandra restart even with maxRetries > 0 configured. Consider at minimum logging a warning at startup when maxRetries > 0.
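A minimal form of that startup warning could be built like this. This is a sketch of the reviewer's suggestion, not the PR's code: the class and method names are hypothetical, only the `cassandra.reconnect_max_retries` option name comes from the PR.

```java
// Sketch: build a startup warning when query-level retries are enabled
// but commitAsync() remains unprotected. Names are hypothetical.
final class AsyncRetryGap {

    static String warningFor(int maxRetries) {
        if (maxRetries <= 0) {
            return null; // retries disabled, nothing misleading to warn about
        }
        return "cassandra.reconnect_max_retries=" + maxRetries +
               " applies only to the sync commit() path; " +
               "commitAsync() writes are not retried";
    }
}
```

The returned string would be logged once at pool startup, so operators learn about the sync/async asymmetry before a restart exposes it.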
```java
 * {@code cassandra.reconnect_max_delay} (default 60000ms) so the
 * request fails fast and pressure is released back to the caller.
 */
private ResultSet executeWithRetry(Statement statement) {
```
With the defaults maxRetries = 10, reconnect_interval = 5000 ms, and reconnect_max_delay = 60 000 ms, the worst-case wall time for one executeWithRetry() call is:
attempt 0 → sleep(min(5000 * 1, 60000)) = 5 000 ms
attempt 1 → sleep(min(5000 * 2, 60000)) = 10 000 ms
attempt 2 → sleep(min(5000 * 4, 60000)) = 20 000 ms
attempt 3 → sleep(min(5000 * 8, 60000)) = 40 000 ms
attempts 4–9 → sleep(60 000 ms each) = 360 000 ms
──────────
total blocking time ≈ 435 seconds (~7 min)
During a Cassandra outage every in-flight thread sleeps through this sequence. On a busy server, hundreds of threads pile up in Thread.sleep(), exhausting the thread pool well before maxRetries is hit.
Consider lowering the defaults so the query-level retry is a short hiccup buffer, while the driver-level ExponentialReconnectionPolicy handles the actual node reconnection in the background:
| Option | Current default | Suggested |
|---|---|---|
| `reconnect_max_retries` | 10 | 3 |
| `reconnect_interval` | 5000 ms | 1000 ms |
| `reconnect_max_delay` | 60000 ms | 10000 ms |
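The arithmetic above can be reproduced with a small helper (illustrative only, not part of the PR):

```java
// Worst-case cumulative Thread.sleep() of the retry loop: the delay
// doubles from `interval` each attempt and is capped at `maxDelay`.
final class BackoffBudget {

    static long worstCaseMillis(int maxRetries, long interval, long maxDelay) {
        long total = 0;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            total += Math.min(interval << attempt, maxDelay);
        }
        return total;
    }
}
```

With the current defaults (10, 5000 ms, 60000 ms) this yields 435000 ms (~7 min) per blocked thread; with the suggested defaults (3, 1000 ms, 10000 ms) it drops to 7000 ms.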
```java
    this.tryOpen();
}
if (this.session != null) {
    this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL));
```
When session.execute(HEALTH_CHECK_CQL) throws a DriverException, the exception is caught and logged at DEBUG — but the session object that produced the failure is kept. The probe's purpose is to detect a dead session early; if it fails the session should be marked unhealthy immediately.
Current flow:
health-check → DriverException
→ LOG.debug(...)
→ session is NOT cleared
→ next executeWithRetry() tries the same dead session
→ pays the full retry cost anyway
Desired flow:
health-check → DriverException
→ session = null ← mark unhealthy now
→ opened() will call tryOpen() before the next real query
Suggested fix:
```java
} catch (DriverException e) {
    LOG.debug("Cassandra health-check failed, resetting session: {}",
              e.getMessage());
    this.session = null; // force re-open on next query via opened()
}
```

This also makes the finally block that sets `this.opened = false` unnecessary (see separate comment on that).
```java
public void testExecuteWithRetrySucceedsAfterTransientFailures() {
    Configuration conf = new PropertiesConfiguration();
    HugeConfig config = new HugeConfig(conf);
    CassandraSessionPool pool = new CassandraSessionPool(config,
```
CassandraSessionPool with a blank config — fragile setup
new HugeConfig(new PropertiesConfiguration()) produces an empty config. The constructor calls config.get(CASSANDRA_RECONNECT_BASE_DELAY) etc., which falls back to option defaults only if HugeConfig performs that fallback correctly. If any option's rangeInt validator is applied to the absent value rather than the default, this test will fail with a confusing config error instead of a clear assertion.
The fields set via Whitebox.setInternalState immediately after construction throw away the constructed values anyway, so the constructor call is only there to get a valid object — but it's a fragile path.
Suggested fix: explicitly set the required config values so the test is self-describing and not dependent on default-fallback behaviour:
```java
Configuration conf = new PropertiesConfiguration();
conf.setProperty(CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L);
conf.setProperty(CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L);
conf.setProperty(CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES.name(), 3);
conf.setProperty(CassandraOptions.CASSANDRA_RECONNECT_INTERVAL.name(), 1L);
HugeConfig config = new HugeConfig(conf);
```

```java
    }

    @Test
    public void testReconnectOptionsExposeExpectedKeys() {
```
🧹 testReconnectOptionsExposeExpectedKeys only checks string literals — low-value test
This test asserts that a static constant's .name() returns the exact string that is defined on the next line of the same class. If someone renames the key, they update both places together, so this test adds no real safety net.
What is actually worth testing — and not currently covered — is that the validators reject out-of-range values, e.g.:
```java
// base delay below minimum (100 ms) should be rejected
Configuration conf = new PropertiesConfiguration();
conf.setProperty(CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 50L);
Assert.assertThrows(/* config validation exception */, () -> new HugeConfig(conf));

// max < base should be rejected (the E.checkArgument in the constructor)
conf.setProperty(CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 5000L);
conf.setProperty(CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L);
Assert.assertThrows(IllegalArgumentException.class,
                    () -> new CassandraSessionPool(new HugeConfig(conf), "ks", "s"));
```

Consider replacing or augmenting this test with boundary checks like the above.
Purpose of the PR
closes #2740
HugeGraphServer stops responding after Cassandra is restarted and never
recovers without a full server restart.
Root cause:
`CassandraSessionPool` builds the Datastax `Cluster` without a `ReconnectionPolicy`, `CassandraSession.execute(...)` calls the driver once with no retry, and thread-local sessions are never probed for liveness.

Once Cassandra goes down, transient `NoHostAvailableException` / `OperationTimedOutException` errors surface to the user and the pool stays dead even after Cassandra comes back online.
Main Changes
- Register `ExponentialReconnectionPolicy(baseDelay, maxDelay)` on the `Cluster` builder so the Datastax driver keeps retrying downed nodes in the background.
- Wrap every `Session.execute(...)` in `executeWithRetry(Statement)` with exponential backoff on transient connectivity failures.
- Implement `reconnectIfNeeded()` / `reset()` on `CassandraSession` so the pool reopens closed sessions and issues a lightweight health-check (`SELECT now() FROM system.local`) before subsequent queries.
- Add four tunables in `CassandraOptions` (defaults preserve previous behavior for healthy clusters):
  - `cassandra.reconnect_base_delay`: 1000 ms
  - `cassandra.reconnect_max_delay`: 60000 ms
  - `cassandra.reconnect_max_retries`: 10 (0 disables)
  - `cassandra.reconnect_interval`: 5000 ms
- Add unit tests covering defaults, overrides, disabling retries and option keys.
Verifying these changes
`mvn -pl hugegraph-server/hugegraph-test -am test -Dtest=CassandraTest`: 13/13 pass

Does this PR potentially affect the following parts?
Documentation Status
Doc - TODO