From eab91a84a5b26551301e012920309f0c57f62c4d Mon Sep 17 00:00:00 2001 From: Jerry Lin Date: Sat, 27 Jun 2026 16:36:46 +0800 Subject: [PATCH 1/2] ZkController.OnReconnect Is Trigger Unnecessarily Curator's RECONNECTED event is different from the previous RECONNECTED event. Before Solr10, the OnReconnect is only triggered after a reconnection from a session expiration. Check the following https://github.com/apache/solr/blob/fdb5314279657f7895a90123436d834e81ea3157/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ConnectionManager.java#L165 https://github.com/apache/solr/blob/fdb5314279657f7895a90123436d834e81ea3157/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ConnectionManager.java#L199 But Curator's RECONNECTED event is triggered every time a Solr node is disconnected from a ZooKeeper instance and reconnected to another ZooKeeper instance. Therefore, currently ZkController.onReconnect is invoked every time a Solr node reconnects to a ZooKeeper instance without a session expiration, which is a huge overhead, especially when we need to rolling-restart a ZooKeeper Cluster. It can take more than 10 minutes for a small Solr cluster to level out. Similiarly, now ZkController.onDisconnect is triggered just after a disconnection from a Zookeeper instance. It should only be triggered after a session expiration. --- .../cloud/ShardLeaderElectionContext.java | 3 +- .../org/apache/solr/cloud/ZkController.java | 94 ++++++----- .../handler/admin/ZookeeperInfoHandler.java | 9 +- .../solr/schema/ZkIndexSchemaReader.java | 12 +- .../solr/cloud/DistributedQueueTest.java | 10 +- .../apache/solr/cloud/LeaderElectionTest.java | 11 +- ...stExpiredReconnectionListenerSupport.java} | 38 ++--- .../apache/solr/cloud/ZkControllerTest.java | 151 ++++++++++++++++++ .../solr/common/cloud/OnDisconnect.java | 32 ---- .../apache/solr/common/cloud/OnReconnect.java | 39 ----- .../solr/common/cloud/SolrCuratorEvent.java | 62 +++++++ .../solr/common/cloud/ZkStateReader.java | 4 +- 12 files changed, 308 insertions(+), 157 deletions(-) rename solr/core/src/test/org/apache/solr/cloud/{TestOnReconnectListenerSupport.java => TestExpiredReconnectionListenerSupport.java} (76%) delete mode 100644 solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java delete mode 100644 solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java create mode 100644 solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrCuratorEvent.java diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java index 7512867a794d..7b9f55352349 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java @@ -164,7 +164,8 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup if (shouldAbort()) { // Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the - // later, we cannot be sure we are still the leader, so we should bail out. The OnReconnect + // later, we cannot be sure we are still the leader, so we should bail out. The + // OnExpiredReconnection // handler will re-register the cores and handle a new leadership election. return; } diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index a3702c1f1e31..bd3a9bb50f9d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -27,6 +27,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE; import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE; +import com.google.common.annotations.VisibleForTesting; import io.opentelemetry.api.internal.StringUtils; import java.io.Closeable; import java.io.IOException; @@ -58,6 +59,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.solr.client.api.util.SolrVersion; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.cloud.SolrCloudManager; @@ -84,13 +86,12 @@ import org.apache.solr.common.cloud.DocCollectionWatcher; import org.apache.solr.common.cloud.LiveNodesListener; import org.apache.solr.common.cloud.NodesSysPropsCacher; -import org.apache.solr.common.cloud.OnDisconnect; -import org.apache.solr.common.cloud.OnReconnect; import org.apache.solr.common.cloud.PerReplicaStates; import org.apache.solr.common.cloud.PerReplicaStatesOps; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.SecurityAwareZkACLProvider; import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.SolrCuratorEvent; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkACLProvider; import org.apache.solr.common.cloud.ZkCoreNodeProps; @@ -212,8 +213,10 @@ public String toString() { private final ExecutorService zkConnectionListenerCallbackExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor( new SolrNamedThreadFactory("zkConnectionListenerCallback")); - private final OnReconnect onReconnect = this::onReconnect; - private final OnDisconnect onDisconnect = this::onDisconnect; + private final ConnectionStateListener onExpiredReconnection = + SolrCuratorEvent.EXPIRED_RECONNECTION.of(this::onExpiredReconnection); + private final ConnectionStateListener onSessionExpiration = + SolrCuratorEvent.SESSION_EXPIRATION.of(this::onSessionExpiration); private final String zkServerAddress; // example: 127.0.0.1:54062/solr @@ -253,7 +256,8 @@ public String toString() { // keeps track of a list of objects that need to know a new ZooKeeper session was created after // expiration occurred ref is held as a HashSet since we clone the set before notifying to avoid // synchronizing too long - private final HashSet reconnectListeners = new HashSet<>(); + private final HashSet expiredReconnectionListeners = + new HashSet<>(); private class RegisterCoreAsync implements Callable { @@ -322,7 +326,7 @@ public ZkController( new DefaultZkCredentialsProvider()); zkCredentialsProvider.setZkCredentialsInjector(zkCredentialsInjector); - addOnReconnectListener(getConfigDirListener()); + addExpiredReconnectionListener(getConfigDirListener()); final var compressor = loadPluginOrDefault( @@ -342,11 +346,11 @@ public ZkController( zkClient .getCuratorFramework() .getConnectionStateListenable() - .addListener(onReconnect, zkConnectionListenerCallbackExecutor); + .addListener(onExpiredReconnection, zkConnectionListenerCallbackExecutor); zkClient .getCuratorFramework() .getConnectionStateListenable() - .addListener(onDisconnect, zkConnectionListenerCallbackExecutor); + .addListener(onSessionExpiration, zkConnectionListenerCallbackExecutor); // Refuse to start if ZK has a non-empty /clusterstate.json or a /solr.xml file checkNoOldClusterstate(zkClient); @@ -401,7 +405,7 @@ public ZkController( assert ObjectReleaseTracker.track(this); } - private void onDisconnect(boolean sessionExpired) { + private void onSessionExpiration() { try { overseer.close(); } catch (Exception e) { @@ -411,7 +415,7 @@ private void onDisconnect(boolean sessionExpired) { // Close outstanding leader elections List descriptors = cc.getCoreDescriptors(); for (CoreDescriptor descriptor : descriptors) { - closeExistingElectionContext(descriptor, sessionExpired); + closeExistingElectionContext(descriptor); } // Mark all cores as not leader @@ -430,7 +434,7 @@ private T loadPluginOrDefault( return cc.getResourceLoader().newInstance(concretePluginClassName, basePluginType); } - private void onReconnect() { + private void onExpiredReconnection() { // on reconnect, reload cloud info log.info("ZooKeeper session re-connected ... refreshing core states after session expiration."); clearZkCollectionTerms(); @@ -507,34 +511,34 @@ private void onReconnect() { } // notify any other objects that need to know when the session was re-connected - HashSet clonedListeners; - synchronized (reconnectListeners) { - clonedListeners = new HashSet<>(reconnectListeners); + HashSet clonedListeners; + synchronized (expiredReconnectionListeners) { + clonedListeners = new HashSet<>(expiredReconnectionListeners); } - // the OnReconnect operation can be expensive per listener, so do that async in + // the ExpiredReconnection operation can be expensive per listener, so do that async in // the background - for (OnReconnect listener : clonedListeners) { + for (SolrCuratorEvent.EventAction listener : clonedListeners) { try { if (executorService != null) { executorService.execute( () -> { try { - listener.onReconnect(); + listener.respond(); } catch (Throwable exc) { // not much we can do here other than warn in the log log.warn( - "Error when notifying OnReconnect listener {} after session re-connected.", + "Error when notifying ExpiredReconnection listener {} after session re-connected.", listener, exc); } }); } else { - listener.onReconnect(); + listener.respond(); } } catch (Throwable exc) { // not much we can do here other than warn in the log log.warn( - "Error when notifying OnReconnect listener {} after session re-connected.", + "Error when notifying ExpiredReconnection listener {} after session re-connected.", listener, exc); } @@ -755,7 +759,7 @@ public void waitForPendingTasksToComplete() { } } - private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean sessionExpired) { + private ContextKey closeExistingElectionContext(CoreDescriptor cd) { // look for old context - if we find it, cancel it String collection = cd.getCloudDescriptor().getCollectionName(); final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); @@ -765,11 +769,7 @@ private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean sessi if (prevContext != null) { prevContext.close(); - // Only remove the election contexts if the session expired, otherwise the ephemeral nodes - // will still exist - if (sessionExpired) { - electionContexts.remove(contextKey); - } + electionContexts.remove(contextKey); } return contextKey; @@ -779,8 +779,14 @@ public void preClose() { this.isClosed = true; try { // We do not want to react to connection state changes after we have started to close - zkClient.getCuratorFramework().getConnectionStateListenable().removeListener(onReconnect); - zkClient.getCuratorFramework().getConnectionStateListenable().removeListener(onDisconnect); + zkClient + .getCuratorFramework() + .getConnectionStateListenable() + .removeListener(onExpiredReconnection); + zkClient + .getCuratorFramework() + .getConnectionStateListenable() + .removeListener(onSessionExpiration); ExecutorUtil.shutdownNowAndAwaitTermination(zkConnectionListenerCallbackExecutor); } catch (Exception e) { log.warn( @@ -2593,41 +2599,43 @@ public void throwErrorIfReplicaReplaced(CoreDescriptor desc) { * expiration occurs; in most cases, listeners will be components that have watchers that need to * be re-created. */ - public void addOnReconnectListener(OnReconnect listener) { + public void addExpiredReconnectionListener(SolrCuratorEvent.EventAction listener) { if (listener != null) { - synchronized (reconnectListeners) { - reconnectListeners.add(listener); - log.debug("Added new OnReconnect listener {}", listener); + synchronized (expiredReconnectionListeners) { + expiredReconnectionListeners.add(listener); + log.debug("Added new ExpiredReconnection listener {}", listener); } } } /** - * Removed a previously registered OnReconnect listener, such as when a core is removed or + * Removed a previously registered expired-reconnect listener, such as when a core is removed or * reloaded. */ - public void removeOnReconnectListener(OnReconnect listener) { + public void removeExpiredReconnectionListener(SolrCuratorEvent.EventAction listener) { if (listener != null) { boolean wasRemoved; - synchronized (reconnectListeners) { - wasRemoved = reconnectListeners.remove(listener); + synchronized (expiredReconnectionListeners) { + wasRemoved = expiredReconnectionListeners.remove(listener); } if (wasRemoved) { - log.debug("Removed OnReconnect listener {}", listener); + log.debug("Removed ExpiredReconnection listener {}", listener); } else { log.warn( - "Was asked to remove OnReconnect listener {}, but remove operation " + "Was asked to remove ExpiredReconnection listener {}, but remove operation " + "did not find it in the list of registered listeners.", listener); } } } + @VisibleForTesting @SuppressWarnings({"unchecked"}) - Set getCurrentOnReconnectListeners() { - HashSet clonedListeners; - synchronized (reconnectListeners) { - clonedListeners = (HashSet) reconnectListeners.clone(); + Set getCurrentExpiredReconnectionListeners() { + HashSet clonedListeners; + synchronized (expiredReconnectionListeners) { + clonedListeners = + (HashSet) expiredReconnectionListeners.clone(); } return clonedListeners; } @@ -2878,7 +2886,7 @@ private void setConfWatcher(String zkDir, Watcher watcher, Stat stat) { } } - public OnReconnect getConfigDirListener() { + private SolrCuratorEvent.EventAction getConfigDirListener() { return () -> { synchronized (confDirectoryListeners) { for (String s : confDirectoryListeners.keySet()) { diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java index 967c153071f6..23d700c3e33d 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java @@ -44,9 +44,9 @@ import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection.CollectionStateProps; -import org.apache.solr.common.cloud.OnReconnect; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice.SliceStateProps; +import org.apache.solr.common.cloud.SolrCuratorEvent; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.MapSolrParams; @@ -251,7 +251,8 @@ public String toString() { * data, this object watches the /collections znode, which will change if a collection is added or * removed. */ - static final class PagedCollectionSupport implements Watcher, Comparator, OnReconnect { + static final class PagedCollectionSupport + implements Watcher, Comparator, SolrCuratorEvent.EventAction { // this is the full merged list of collections from ZooKeeper private List cachedCollections; @@ -336,7 +337,7 @@ public int compare(String left, String right) { /** Called after a ZooKeeper session expiration occurs */ @Override - public void onReconnect() { + public void respond() { // we need to re-establish the watcher on the collections list after session expires synchronized (this) { cachedCollections = null; @@ -379,7 +380,7 @@ private void ensurePagingSupportInitialized() { ZkController zkController = cores.getZkController(); if (zkController != null) { // Get notified when the ZK session expires (so we can clear cached collections) - zkController.addOnReconnectListener(pagingSupport); + zkController.addExpiredReconnectionListener(pagingSupport); } } } diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java index 21af5c3c6567..01285719645c 100644 --- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java +++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.solr.cloud.ZkSolrResourceLoader; import org.apache.solr.common.SolrException.ErrorCode; -import org.apache.solr.common.cloud.OnReconnect; +import org.apache.solr.common.cloud.SolrCuratorEvent; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZooKeeperException; import org.apache.solr.core.CloseHook; @@ -38,7 +38,7 @@ * Keeps a ManagedIndexSchema up-to-date when changes are made to the serialized managed schema in * ZooKeeper */ -public class ZkIndexSchemaReader implements OnReconnect { +public class ZkIndexSchemaReader implements SolrCuratorEvent.EventAction { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final ManagedIndexSchemaFactory managedIndexSchemaFactory; private final SolrZkClient zkClient; @@ -66,10 +66,10 @@ public void preClose(SolrCore core) { if (cc.isZooKeeperAware()) { if (log.isDebugEnabled()) { log.debug( - "Removing ZkIndexSchemaReader OnReconnect listener as core {} is shutting down.", + "Removing ZkIndexSchemaReader OnExpirationReconnection listener as core {} is shutting down.", core.getName()); } - cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this); + cc.getZkController().removeExpiredReconnectionListener(ZkIndexSchemaReader.this); } } @@ -84,7 +84,7 @@ public void postClose(SolrCore core) { this.schemaWatcher = createSchemaWatcher(); - zkLoader.getZkController().addOnReconnectListener(this); + zkLoader.getZkController().addExpiredReconnectionListener(this); } public Object getSchemaUpdateLock() { @@ -225,7 +225,7 @@ void updateSchema(Watcher watcher, int expectedZkVersion) * the current schema from ZooKeeper. */ @Override - public void onReconnect() { + public void respond() { try { // setup a new watcher to get notified when the managed schema changes schemaWatcher = createSchemaWatcher(); diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java index 003a04bda613..68ee32868e8e 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java @@ -27,7 +27,7 @@ import java.util.function.Predicate; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.solrj.cloud.DistributedQueue; -import org.apache.solr.common.cloud.OnDisconnect; +import org.apache.solr.common.cloud.SolrCuratorEvent; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.SolrNamedThreadFactory; @@ -295,13 +295,7 @@ private void forceSessionExpire() throws InterruptedException, TimeoutException zkClient .getCuratorFramework() .getConnectionStateListenable() - .addListener( - (OnDisconnect) - ((sessionExpired) -> { - if (sessionExpired) { - hasDisconnected.countDown(); - } - })); + .addListener(SolrCuratorEvent.SESSION_EXPIRATION.of(hasDisconnected::countDown)); long sessionId = zkClient.getZkSessionId(); zkServer.expire(sessionId); hasDisconnected.await(10, TimeUnit.SECONDS); diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java index e95c1b411116..ed3268b6dc68 100644 --- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java @@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.curator.test.KillSession; import org.apache.solr.SolrTestCaseJ4; -import org.apache.solr.common.cloud.OnReconnect; +import org.apache.solr.common.cloud.SolrCuratorEvent; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkNodeProps; @@ -106,15 +106,18 @@ class ElectorSetup { ZkController zkController; LeaderElector elector; - public ElectorSetup(OnReconnect onReconnect) { + public ElectorSetup(SolrCuratorEvent.EventAction onExpiredReconnection) { zkClient = new SolrZkClient.Builder() .withUrl(server.getZkAddress()) .withTimeout(TIMEOUT, TimeUnit.MILLISECONDS) .withConnTimeOut(TIMEOUT, TimeUnit.MILLISECONDS) .build(); - if (onReconnect != null) { - zkClient.getCuratorFramework().getConnectionStateListenable().addListener(onReconnect); + if (onExpiredReconnection != null) { + zkClient + .getCuratorFramework() + .getConnectionStateListenable() + .addListener(SolrCuratorEvent.EXPIRED_RECONNECTION.of(onExpiredReconnection)); } zkStateReader = new ZkStateReader(zkClient); elector = new LeaderElector(zkClient); diff --git a/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java b/solr/core/src/test/org/apache/solr/cloud/TestExpiredReconnectionListenerSupport.java similarity index 76% rename from solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java rename to solr/core/src/test/org/apache/solr/cloud/TestExpiredReconnectionListenerSupport.java index e5cefb2b2310..4e7e11e7223e 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestExpiredReconnectionListenerSupport.java @@ -24,8 +24,8 @@ import java.util.concurrent.TimeUnit; import org.apache.solr.SolrTestCaseJ4.SuppressSSL; import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.common.cloud.OnReconnect; import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.SolrCuratorEvent; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.SolrCore; import org.apache.solr.embedded.JettySolrRunner; @@ -36,11 +36,11 @@ import org.slf4j.LoggerFactory; @SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776") -public class TestOnReconnectListenerSupport extends AbstractFullDistribZkTestBase { +public class TestExpiredReconnectionListenerSupport extends AbstractFullDistribZkTestBase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public TestOnReconnectListenerSupport() { + public TestExpiredReconnectionListenerSupport() { super(); sliceCount = 2; fixShardCount(3); @@ -80,11 +80,12 @@ public void test() throws Exception { leaderCoreId = leaderCore.getName() + ":" + leaderCore.getStartNanoTime(); } - // verify the ZkIndexSchemaReader is a registered OnReconnect listener - Set listeners = zkController.getCurrentOnReconnectListeners(); - assertNotNull("ZkController returned null OnReconnect listeners", listeners); + // verify the ZkIndexSchemaReader is a registered OnExpirationReconnection listener + Set listeners = + zkController.getCurrentExpiredReconnectionListeners(); + assertNotNull("ZkController returned null OnExpirationReconnection listeners", listeners); ZkIndexSchemaReader expectedListener = null; - for (OnReconnect listener : listeners) { + for (SolrCuratorEvent.EventAction listener : listeners) { if (listener instanceof ZkIndexSchemaReader reader) { if (leaderCoreId.equals(reader.getUniqueCoreId())) { expectedListener = reader; @@ -95,7 +96,7 @@ public void test() throws Exception { assertNotNull( "ZkIndexSchemaReader for core " + leaderCoreName - + " not registered as an OnReconnect listener and should be", + + " not registered as an OnExpirationReconnection listener and should be", expectedListener); // reload the collection @@ -106,7 +107,8 @@ public void test() throws Exception { + "' failed to reload within a reasonable amount of time!", wasReloaded); - // after reload, the new core should be registered as an OnReconnect listener and the old should + // after reload, the new core should be registered as an OnExpirationReconnection listener and + // the old should // not be String reloadedLeaderCoreId; try (SolrCore leaderCore = cores.getCore(leaderCoreName)) { @@ -116,17 +118,17 @@ public void test() throws Exception { // they shouldn't be equal after reload assertNotEquals(leaderCoreId, reloadedLeaderCoreId); - listeners = zkController.getCurrentOnReconnectListeners(); - assertNotNull("ZkController returned null OnReconnect listeners", listeners); + listeners = zkController.getCurrentExpiredReconnectionListeners(); + assertNotNull("ZkController returned null OnExpirationReconnection listeners", listeners); expectedListener = null; // reset - for (OnReconnect listener : listeners) { + for (SolrCuratorEvent.EventAction listener : listeners) { if (listener instanceof ZkIndexSchemaReader reader) { if (leaderCoreId.equals(reader.getUniqueCoreId())) { fail( "Previous core " + leaderCoreId - + " should no longer be a registered OnReconnect listener! Current listeners: " + + " should no longer be a registered OnExpirationReconnection listener! Current listeners: " + listeners); } else if (reloadedLeaderCoreId.equals(reader.getUniqueCoreId())) { expectedListener = reader; @@ -138,7 +140,7 @@ public void test() throws Exception { assertNotNull( "ZkIndexSchemaReader for core " + reloadedLeaderCoreId - + " not registered as an OnReconnect listener and should be", + + " not registered as an OnExpirationReconnection listener and should be", expectedListener); // try to clean up @@ -149,18 +151,18 @@ public void test() throws Exception { log.warn("Could not delete collection {} after test completed", testCollectionName); } - listeners = zkController.getCurrentOnReconnectListeners(); - for (OnReconnect listener : listeners) { + listeners = zkController.getCurrentExpiredReconnectionListeners(); + for (SolrCuratorEvent.EventAction listener : listeners) { if (listener instanceof ZkIndexSchemaReader reader) { if (reloadedLeaderCoreId.equals(reader.getUniqueCoreId())) { fail( "Previous core " + reloadedLeaderCoreId - + " should no longer be a registered OnReconnect listener after collection delete!"); + + " should no longer be a registered OnExpirationReconnection listener after collection delete!"); } } } - log.info("TestOnReconnectListenerSupport succeeded ... shutting down now!"); + log.info("TestOnExpirationReconnectionListenerSupport succeeded ... shutting down now!"); } } diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java index 5c7cd7442d3f..8c19adbec04b 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java @@ -34,8 +34,12 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.curator.CuratorZookeeperClient; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.api.util.SolrVersion; import org.apache.solr.client.solrj.jetty.HttpJettySolrClient; @@ -766,6 +770,123 @@ public void testOverseerEnabledClusterPropertyTrue() throws Exception { } } + @Test + public void testZkReconnectionEvents() throws Exception { + // Do not use MiniSolrCloudCluster + + // Create a zookeeper cluster with 3 nodes + try (TestingCluster zkCluster = new TestingCluster(3)) { + zkCluster.start(); + // Now create a ZkController - it should respect the cluster property and have overseer + // enabled + CoreContainer cc = getCoreContainer(); + try { + CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983).build(); + try (ZkController zkController = + new ZkController(cc, zkCluster.getConnectString(), TIMEOUT, cloudConfig)) { + AtomicBoolean invoked = new AtomicBoolean(Boolean.FALSE); + zkController.addExpiredReconnectionListener(() -> invoked.set(true)); + CuratorZookeeperClient zkClient = + zkController.getZkClient().getCuratorFramework().getZookeeperClient(); + zkClient.getZooKeeper().getTestable().injectSessionExpiration(); + // Wait 10 seconds to make sure Solr receives the ExpiredReconnection event and invokes + // listeners + Thread.sleep(10000); + assertTrue( + "Reconnected to ZK cluster after session expiration should have triggered the invocation of method onExpiredReconnection", + invoked.get()); + invoked.set(false); + + // Kill the connected server to force Solr to reconnected to another solr server + InstanceSpec connectedIns = zkCluster.findConnectionInstance(zkClient.getZooKeeper()); + zkCluster.killServer(connectedIns); + // Wait 3 seconds to let solr connectes to another server. + Thread.sleep(3000); + InstanceSpec newConnectedIns = zkCluster.findConnectionInstance(zkClient.getZooKeeper()); + assertNotEquals(connectedIns, newConnectedIns); + // Wait 10 seconds to make sure the event is received by zkController + Thread.sleep(10000); + assertFalse( + "Reconnected to ZK cluster before session expiration should NOT trigger the invocation of method onExpiredReconnection", + invoked.get()); + } + } finally { + cc.shutdown(); + } + } finally { + // Closing zookeeper cluster is asynchronous, we need some time to let it finish. Otherwise we + // may encounter + // Thread Leak + Thread.sleep(3000); + } + } + + @Test + public void testZkDisconnectionEvents() throws Exception { + // Do not use MiniSolrCloudCluster + + // Create a zookeeper cluster with 3 nodes + try (TestingCluster zkCluster = new TestingCluster(3)) { + zkCluster.start(); + // Now create a ZkController - it should respect the cluster property and have overseer + // enabled + CoreContainer cc = getCoreContainer(); + try { + CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983).build(); + MockClusterSingleton mockClusterSingleton = new MockClusterSingleton(); + cc.getClusterSingletons() + .getSingletons() + .put(mockClusterSingleton.getName(), mockClusterSingleton); + try (ZkController zkController = + new ZkController(cc, zkCluster.getConnectString(), TIMEOUT, cloudConfig)) { + // During initialization of ZkController, mockClusterSingleton.stop is invoked and thus we + // need to reset it here. + mockClusterSingleton.reset(); + assertFalse(mockClusterSingleton.isStopped()); + CuratorZookeeperClient zkClient = + zkController.getZkClient().getCuratorFramework().getZookeeperClient(); + // Kill the connected server to force Solr to reconnected to another solr server + InstanceSpec connectedIns = zkCluster.findConnectionInstance(zkClient.getZooKeeper()); + zkCluster.killServer(connectedIns); + // Wait 3 seconds to let solr connectes to another server. + Thread.sleep(3000); + InstanceSpec newConnectedIns = zkCluster.findConnectionInstance(zkClient.getZooKeeper()); + assertNotEquals(connectedIns, newConnectedIns); + // Wait 10 seconds to make sure the event is received by zkController + Thread.sleep(10000); + assertFalse( + "Reconnected to ZK cluster before session expiration should NOT trigger the invocation of method onSessionExpiration", + mockClusterSingleton.isStopped()); + + mockClusterSingleton.reset(); + assertFalse(mockClusterSingleton.isStopped()); + AtomicBoolean invoked = new AtomicBoolean(Boolean.FALSE); + zkController.addExpiredReconnectionListener(() -> invoked.set(true)); + // Stop the cluster to prevent invoking zkController.onExpiredReconnection, + // which also stops overseer and thus invokes mockClusterSingleton.stop. + zkCluster.stop(); + // Even if the cluster is stopped, the session won't be expired at once. We still need to + // manually expire it. + zkClient.getZooKeeper().getTestable().injectSessionExpiration(); + // Wait 10 seconds to make sure Solr receives the ExpiredReconnection event and invokes + // listeners + Thread.sleep(3000); + assertFalse("ExpiredReconnection should not be triggered", invoked.get()); + assertTrue( + "Session expiration should have triggered the invocation of method onSessionExpiration", + mockClusterSingleton.isStopped()); + } + } finally { + cc.shutdown(); + } + } finally { + // Closing zookeeper cluster is asynchronous, we need some time to let it finish. Otherwise we + // may encounter + // Thread Leak + Thread.sleep(3000); + } + } + private CoreContainer getCoreContainer() { return new MockCoreContainer(); } @@ -815,4 +936,34 @@ public SolrMetricManager getMetricManager() { return metricManager; } } + + private static class MockClusterSingleton implements ClusterSingleton { + protected volatile boolean isStopped = false; + + @Override + public String getName() { + return this.getClass().getName(); + } + + @Override + public void start() throws Exception {} + + @Override + public State getState() { + return null; + } + + @Override + public void stop() { + this.isStopped = true; + } + + public boolean isStopped() { + return isStopped; + } + + public void reset() { + isStopped = false; + } + } } diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java deleted file mode 100644 index 9535a59cef55..000000000000 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.solr.common.cloud; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; - -public interface OnDisconnect extends ConnectionStateListener { - void onDisconnect(boolean sessionExpired); - - @Override - default void stateChanged(CuratorFramework client, ConnectionState newState) { - if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) { - onDisconnect(newState == ConnectionState.LOST); - } - } -} diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java deleted file mode 100644 index 8d54312d3e0f..000000000000 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.solr.common.cloud; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; - -/** - * Implementations are expected to implement a correct hashCode and equals method needed to uniquely - * identify the listener as listeners are managed in a Set. In addition, your listener - * implementation should call - * org.apache.solr.cloud.ZkController#removeOnReconnectListener(OnReconnect) when it no longer needs - * to be notified of ZK reconnection events. - */ -public interface OnReconnect extends ConnectionStateListener { - void onReconnect(); - - @Override - default void stateChanged(CuratorFramework client, ConnectionState newState) { - if (ConnectionState.RECONNECTED.equals(newState)) { - onReconnect(); - } - } -} diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrCuratorEvent.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrCuratorEvent.java new file mode 100644 index 000000000000..8cffc9a56214 --- /dev/null +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrCuratorEvent.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.common.cloud; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; + +public enum SolrCuratorEvent { + // Only triggered after resume from expiration + EXPIRED_RECONNECTION { + @Override + public ConnectionStateListener of(EventAction action) { + return new ConnectionStateListener() { + private final AtomicBoolean isExpired = new AtomicBoolean(false); + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.LOST) { + isExpired.set(true); + } else if (newState == ConnectionState.RECONNECTED) { + if (isExpired.compareAndSet(true, false)) { + action.respond(); + } + } + } + }; + } + }, + + SESSION_EXPIRATION { + @Override + public ConnectionStateListener of(EventAction action) { + return (client, newState) -> { + if (newState == ConnectionState.LOST) { + action.respond(); + } + }; + } + }; + + public abstract ConnectionStateListener of(EventAction action); + + public interface EventAction { + void respond(); + } +} diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 4f0c3bb38366..ff609508573f 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -428,7 +428,7 @@ public ZkStateReader( .getCuratorFramework() .getConnectionStateListenable() .addListener( - (OnReconnect) + SolrCuratorEvent.EXPIRED_RECONNECTION.of( () -> { // on reconnect, reload cloud info try { @@ -440,7 +440,7 @@ public ZkStateReader( } catch (Throwable e) { log.error("An error has occurred while updating the cluster state", e); } - }); + })); this.closeClient = true; this.securityNodeWatcher = null; collectionPropertiesZkStateReader = new CollectionPropertiesZkStateReader(this); From 4a67aff0ecb7705a5ba784f7795ff73a4c91134c Mon Sep 17 00:00:00 2001 From: Jerry Lin Date: Wed, 1 Jul 2026 11:03:34 +0800 Subject: [PATCH 2/2] Add changelog --- changelog/unreleased/SOLR-18298.yml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 changelog/unreleased/SOLR-18298.yml diff --git a/changelog/unreleased/SOLR-18298.yml b/changelog/unreleased/SOLR-18298.yml new file mode 100644 index 000000000000..3950da1b752a --- /dev/null +++ b/changelog/unreleased/SOLR-18298.yml @@ -0,0 +1,7 @@ +title: Restore leader re-election behavior to only trigger on session expiry +type: fixed +authors: + - name: Jerry Lin +links: + - name: SOLR-18298 + url: https://issues.apache.org/jira/browse/SOLR-18298