Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelog/unreleased/SOLR-18298.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
title: Restore leader re-election behavior to only trigger on session expiry
type: fixed
authors:
- name: Jerry Lin
links:
- name: SOLR-18298
url: https://issues.apache.org/jira/browse/SOLR-18298
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup

if (shouldAbort()) {
// Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the
// later, we cannot be sure we are still the leader, so we should bail out. The OnReconnect
// later, we cannot be sure we are still the leader, so we should bail out. The
// OnExpiredReconnection
// handler will re-register the cores and handle a new leadership election.
return;
}
Expand Down
94 changes: 51 additions & 43 deletions solr/core/src/java/org/apache/solr/cloud/ZkController.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;

import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.internal.StringUtils;
import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -58,6 +59,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.solr.client.api.util.SolrVersion;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
Expand All @@ -84,13 +86,12 @@
import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.LiveNodesListener;
import org.apache.solr.common.cloud.NodesSysPropsCacher;
import org.apache.solr.common.cloud.OnDisconnect;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.SecurityAwareZkACLProvider;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrCuratorEvent;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkACLProvider;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
Expand Down Expand Up @@ -212,8 +213,10 @@ public String toString() {
private final ExecutorService zkConnectionListenerCallbackExecutor =
ExecutorUtil.newMDCAwareSingleThreadExecutor(
new SolrNamedThreadFactory("zkConnectionListenerCallback"));
private final OnReconnect onReconnect = this::onReconnect;
private final OnDisconnect onDisconnect = this::onDisconnect;
private final ConnectionStateListener onExpiredReconnection =
SolrCuratorEvent.EXPIRED_RECONNECTION.of(this::onExpiredReconnection);
private final ConnectionStateListener onSessionExpiration =
SolrCuratorEvent.SESSION_EXPIRATION.of(this::onSessionExpiration);

private final String zkServerAddress; // example: 127.0.0.1:54062/solr

Expand Down Expand Up @@ -253,7 +256,8 @@ public String toString() {
// keeps track of a list of objects that need to know a new ZooKeeper session was created after
// expiration occurred ref is held as a HashSet since we clone the set before notifying to avoid
// synchronizing too long
private final HashSet<OnReconnect> reconnectListeners = new HashSet<>();
private final HashSet<SolrCuratorEvent.EventAction> expiredReconnectionListeners =
new HashSet<>();

private class RegisterCoreAsync implements Callable<Object> {

Expand Down Expand Up @@ -322,7 +326,7 @@ public ZkController(
new DefaultZkCredentialsProvider());
zkCredentialsProvider.setZkCredentialsInjector(zkCredentialsInjector);

addOnReconnectListener(getConfigDirListener());
addExpiredReconnectionListener(getConfigDirListener());

final var compressor =
loadPluginOrDefault(
Expand All @@ -342,11 +346,11 @@ public ZkController(
zkClient
.getCuratorFramework()
.getConnectionStateListenable()
.addListener(onReconnect, zkConnectionListenerCallbackExecutor);
.addListener(onExpiredReconnection, zkConnectionListenerCallbackExecutor);
zkClient
.getCuratorFramework()
.getConnectionStateListenable()
.addListener(onDisconnect, zkConnectionListenerCallbackExecutor);
.addListener(onSessionExpiration, zkConnectionListenerCallbackExecutor);
// Refuse to start if ZK has a non-empty /clusterstate.json or a /solr.xml file
checkNoOldClusterstate(zkClient);

Expand Down Expand Up @@ -401,7 +405,7 @@ public ZkController(
assert ObjectReleaseTracker.track(this);
}

private void onDisconnect(boolean sessionExpired) {
private void onSessionExpiration() {
try {
overseer.close();
} catch (Exception e) {
Expand All @@ -411,7 +415,7 @@ private void onDisconnect(boolean sessionExpired) {
// Close outstanding leader elections
List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
for (CoreDescriptor descriptor : descriptors) {
closeExistingElectionContext(descriptor, sessionExpired);
closeExistingElectionContext(descriptor);
}

// Mark all cores as not leader
Expand All @@ -430,7 +434,7 @@ private <T> T loadPluginOrDefault(
return cc.getResourceLoader().newInstance(concretePluginClassName, basePluginType);
}

private void onReconnect() {
private void onExpiredReconnection() {
// on reconnect, reload cloud info
log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
clearZkCollectionTerms();
Expand Down Expand Up @@ -507,34 +511,34 @@ private void onReconnect() {
}

// notify any other objects that need to know when the session was re-connected
HashSet<OnReconnect> clonedListeners;
synchronized (reconnectListeners) {
clonedListeners = new HashSet<>(reconnectListeners);
HashSet<SolrCuratorEvent.EventAction> clonedListeners;
synchronized (expiredReconnectionListeners) {
clonedListeners = new HashSet<>(expiredReconnectionListeners);
}
// the OnReconnect operation can be expensive per listener, so do that async in
// the ExpiredReconnection operation can be expensive per listener, so do that async in
// the background
for (OnReconnect listener : clonedListeners) {
for (SolrCuratorEvent.EventAction listener : clonedListeners) {
try {
if (executorService != null) {
executorService.execute(
() -> {
try {
listener.onReconnect();
listener.respond();
} catch (Throwable exc) {
// not much we can do here other than warn in the log
log.warn(
"Error when notifying OnReconnect listener {} after session re-connected.",
"Error when notifying ExpiredReconnection listener {} after session re-connected.",
listener,
exc);
}
});
} else {
listener.onReconnect();
listener.respond();
}
} catch (Throwable exc) {
// not much we can do here other than warn in the log
log.warn(
"Error when notifying OnReconnect listener {} after session re-connected.",
"Error when notifying ExpiredReconnection listener {} after session re-connected.",
listener,
exc);
}
Expand Down Expand Up @@ -755,7 +759,7 @@ public void waitForPendingTasksToComplete() {
}
}

private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean sessionExpired) {
private ContextKey closeExistingElectionContext(CoreDescriptor cd) {
// look for old context - if we find it, cancel it
String collection = cd.getCloudDescriptor().getCollectionName();
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
Expand All @@ -765,11 +769,7 @@ private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean sessi

if (prevContext != null) {
prevContext.close();
// Only remove the election contexts if the session expired, otherwise the ephemeral nodes
// will still exist
if (sessionExpired) {
electionContexts.remove(contextKey);
}
electionContexts.remove(contextKey);
}

return contextKey;
Expand All @@ -779,8 +779,14 @@ public void preClose() {
this.isClosed = true;
try {
// We do not want to react to connection state changes after we have started to close
zkClient.getCuratorFramework().getConnectionStateListenable().removeListener(onReconnect);
zkClient.getCuratorFramework().getConnectionStateListenable().removeListener(onDisconnect);
zkClient
.getCuratorFramework()
.getConnectionStateListenable()
.removeListener(onExpiredReconnection);
zkClient
.getCuratorFramework()
.getConnectionStateListenable()
.removeListener(onSessionExpiration);
ExecutorUtil.shutdownNowAndAwaitTermination(zkConnectionListenerCallbackExecutor);
} catch (Exception e) {
log.warn(
Expand Down Expand Up @@ -2593,41 +2599,43 @@ public void throwErrorIfReplicaReplaced(CoreDescriptor desc) {
* expiration occurs; in most cases, listeners will be components that have watchers that need to
* be re-created.
*/
public void addOnReconnectListener(OnReconnect listener) {
public void addExpiredReconnectionListener(SolrCuratorEvent.EventAction listener) {
if (listener != null) {
synchronized (reconnectListeners) {
reconnectListeners.add(listener);
log.debug("Added new OnReconnect listener {}", listener);
synchronized (expiredReconnectionListeners) {
expiredReconnectionListeners.add(listener);
log.debug("Added new ExpiredReconnection listener {}", listener);
}
}
}

/**
* Removed a previously registered OnReconnect listener, such as when a core is removed or
* Removed a previously registered expired-reconnect listener, such as when a core is removed or
* reloaded.
*/
public void removeOnReconnectListener(OnReconnect listener) {
public void removeExpiredReconnectionListener(SolrCuratorEvent.EventAction listener) {
if (listener != null) {
boolean wasRemoved;
synchronized (reconnectListeners) {
wasRemoved = reconnectListeners.remove(listener);
synchronized (expiredReconnectionListeners) {
wasRemoved = expiredReconnectionListeners.remove(listener);
}
if (wasRemoved) {
log.debug("Removed OnReconnect listener {}", listener);
log.debug("Removed ExpiredReconnection listener {}", listener);
} else {
log.warn(
"Was asked to remove OnReconnect listener {}, but remove operation "
"Was asked to remove ExpiredReconnection listener {}, but remove operation "
+ "did not find it in the list of registered listeners.",
listener);
}
}
}

@VisibleForTesting
@SuppressWarnings({"unchecked"})
Set<OnReconnect> getCurrentOnReconnectListeners() {
HashSet<OnReconnect> clonedListeners;
synchronized (reconnectListeners) {
clonedListeners = (HashSet<OnReconnect>) reconnectListeners.clone();
Set<SolrCuratorEvent.EventAction> getCurrentExpiredReconnectionListeners() {
HashSet<SolrCuratorEvent.EventAction> clonedListeners;
synchronized (expiredReconnectionListeners) {
clonedListeners =
(HashSet<SolrCuratorEvent.EventAction>) expiredReconnectionListeners.clone();
}
return clonedListeners;
}
Expand Down Expand Up @@ -2878,7 +2886,7 @@ private void setConfWatcher(String zkDir, Watcher watcher, Stat stat) {
}
}

public OnReconnect getConfigDirListener() {
private SolrCuratorEvent.EventAction getConfigDirListener() {
return () -> {
synchronized (confDirectoryListeners) {
for (String s : confDirectoryListeners.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocCollection.CollectionStateProps;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice.SliceStateProps;
import org.apache.solr.common.cloud.SolrCuratorEvent;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.MapSolrParams;
Expand Down Expand Up @@ -251,7 +251,8 @@ public String toString() {
* data, this object watches the /collections znode, which will change if a collection is added or
* removed.
*/
static final class PagedCollectionSupport implements Watcher, Comparator<String>, OnReconnect {
static final class PagedCollectionSupport
implements Watcher, Comparator<String>, SolrCuratorEvent.EventAction {

// this is the full merged list of collections from ZooKeeper
private List<String> cachedCollections;
Expand Down Expand Up @@ -336,7 +337,7 @@ public int compare(String left, String right) {

/** Called after a ZooKeeper session expiration occurs */
@Override
public void onReconnect() {
public void respond() {
// we need to re-establish the watcher on the collections list after session expires
synchronized (this) {
cachedCollections = null;
Expand Down Expand Up @@ -379,7 +380,7 @@ private void ensurePagingSupportInitialized() {
ZkController zkController = cores.getZkController();
if (zkController != null) {
// Get notified when the ZK session expires (so we can clear cached collections)
zkController.addOnReconnectListener(pagingSupport);
zkController.addExpiredReconnectionListener(pagingSupport);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.SolrCuratorEvent;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.core.CloseHook;
Expand All @@ -38,7 +38,7 @@
* Keeps a ManagedIndexSchema up-to-date when changes are made to the serialized managed schema in
* ZooKeeper
*/
public class ZkIndexSchemaReader implements OnReconnect {
public class ZkIndexSchemaReader implements SolrCuratorEvent.EventAction {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ManagedIndexSchemaFactory managedIndexSchemaFactory;
private final SolrZkClient zkClient;
Expand Down Expand Up @@ -66,10 +66,10 @@ public void preClose(SolrCore core) {
if (cc.isZooKeeperAware()) {
if (log.isDebugEnabled()) {
log.debug(
"Removing ZkIndexSchemaReader OnReconnect listener as core {} is shutting down.",
"Removing ZkIndexSchemaReader OnExpirationReconnection listener as core {} is shutting down.",
core.getName());
}
cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this);
cc.getZkController().removeExpiredReconnectionListener(ZkIndexSchemaReader.this);
}
}

Expand All @@ -84,7 +84,7 @@ public void postClose(SolrCore core) {

this.schemaWatcher = createSchemaWatcher();

zkLoader.getZkController().addOnReconnectListener(this);
zkLoader.getZkController().addExpiredReconnectionListener(this);
}

public Object getSchemaUpdateLock() {
Expand Down Expand Up @@ -225,7 +225,7 @@ void updateSchema(Watcher watcher, int expectedZkVersion)
* the current schema from ZooKeeper.
*/
@Override
public void onReconnect() {
public void respond() {
try {
// setup a new watcher to get notified when the managed schema changes
schemaWatcher = createSchemaWatcher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.function.Predicate;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.common.cloud.OnDisconnect;
import org.apache.solr.common.cloud.SolrCuratorEvent;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
Expand Down Expand Up @@ -295,13 +295,7 @@ private void forceSessionExpire() throws InterruptedException, TimeoutException
zkClient
.getCuratorFramework()
.getConnectionStateListenable()
.addListener(
(OnDisconnect)
((sessionExpired) -> {
if (sessionExpired) {
hasDisconnected.countDown();
}
}));
.addListener(SolrCuratorEvent.SESSION_EXPIRATION.of(hasDisconnected::countDown));
long sessionId = zkClient.getZkSessionId();
zkServer.expire(sessionId);
hasDisconnected.await(10, TimeUnit.SECONDS);
Expand Down
Loading