Skip to content

Commit

Permalink
SOLR-16580: Avoid making copies of DocCollection for PRS updates
Browse files Browse the repository at this point in the history
  • Loading branch information
noblepaul authored and Ishan Chattopadhyaya committed Oct 29, 2023
1 parent 1e5c88c commit a639d26
Show file tree
Hide file tree
Showing 21 changed files with 370 additions and 172 deletions.
6 changes: 3 additions & 3 deletions solr/core/src/java/org/apache/solr/cloud/Overseer.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ else if (LeaderStatus.YES != isLeader) {
while (unprocessedMessages.size() > 0) {
clusterState = zkStateWriter.writePendingUpdates();
Message m = unprocessedMessages.remove(0);
clusterState = m.run(clusterState, Overseer.this);
clusterState = m.run(clusterState, Overseer.this, zkStateWriter);
}
// The callback always be called on this thread
clusterState = processQueueItem(message, clusterState, zkStateWriter, true, () -> {
Expand Down Expand Up @@ -1034,8 +1034,8 @@ public void submit(Message message) {
}

/**
 * A unit of work processed inside the Overseer's single-threaded state-update loop.
 *
 * <p>Implementations mutate (a copy of) the cluster state and may also need to refresh the
 * {@link ZkStateWriter}'s internally cached state for collections whose state.json was written
 * to ZooKeeper directly by another node (e.g. PRS collection creation), bypassing the overseer.
 */
public interface Message {
  /**
   * Applies this message to the given cluster state.
   *
   * @param clusterState the overseer's current view of the cluster state
   * @param overseer the overseer executing this message
   * @param zksw the overseer's state writer, so implementations can keep its cached
   *     {@link ClusterState} in sync with out-of-band ZK writes
   * @return the (possibly new) cluster state after applying this message
   */
  ClusterState run(ClusterState clusterState, Overseer overseer, ZkStateWriter zksw)
      throws Exception;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.solr.cloud;

import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
Expand All @@ -33,18 +34,36 @@ public RefreshCollectionMessage(String collection) {
this.collection = collection;
}

public ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception {
Stat stat = overseer.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collection), null, true);


@Override
public ClusterState run(ClusterState clusterState, Overseer overseer, ZkStateWriter zkStateWriter)
throws Exception {
Stat stat =
overseer
.getZkStateReader()
.getZkClient()
.exists(ZkStateReader.getCollectionPath(collection), null, true);
if (stat == null) {
//collection does not exist
// collection does not exist
return clusterState.copyWith(collection, null);
}
DocCollection coll = clusterState.getCollectionOrNull(collection);
if (coll != null && !coll.isModified(stat.getVersion(), stat.getCversion())) {
//our state is up to date
// our state is up to date
return clusterState;
} else {
coll = ZkStateReader.getCollectionLive(overseer.getZkStateReader(), collection);
overseer.getZkStateReader().forceUpdateCollection(collection);
coll = overseer.getZkStateReader().getCollection(collection);

// During collection creation for a PRS collection, the cluster state (state.json) for the
// collection is written to ZK directly by the node (that received the CREATE request).
// Hence, we need the overseer's ZkStateWriter and the overseer's internal copy of the cluster
// state
// to be updated to contain that collection via this refresh.

zkStateWriter.updateClusterState(
it -> it.copyWith(collection, overseer.getZkStateReader().getCollection(collection)));
return clusterState.copyWith(collection, coll);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public SnapshotClusterStateProvider(Map<String, Object> snapshot) {
clusterProperties = (Map<String, Object>)snapshot.getOrDefault("clusterProperties", Collections.emptyMap());
Map<String, Object> stateMap = new HashMap<>((Map<String, Object>)snapshot.getOrDefault("clusterState", Collections.emptyMap()));
Number version = (Number)stateMap.remove("version");
clusterState = ClusterState.load(version != null ? version.intValue() : null, stateMap, liveNodes, ZkStateReader.CLUSTER_STATE);
clusterState = ClusterState.load(version != null ? version.intValue() : null, stateMap, liveNodes, ZkStateReader.CLUSTER_STATE, null);
}

public Map<String, Object> getSnapshot() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ public ZkWriteCommand createCollection(ClusterState clusterState, ZkNodeProps me
String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null
: ZkStateReader.getCollectionPath(cName);

DocCollection newCollection = new DocCollection(cName,
slices, collectionProps, router, -1, znode);
DocCollection newCollection =
new DocCollection(
cName, slices, collectionProps, router, -1, znode, stateManager.getPrsSupplier(cName));

return new ZkWriteCommand(cName, newCollection);
}
Expand Down Expand Up @@ -145,7 +146,6 @@ public static void getShardNames(Integer numShards, List<String> shardNames) {
final String sliceName = "shard" + (i + 1);
shardNames.add(sliceName);
}

}

public static void getShardNames(List<String> shardNames, String shards) {
Expand Down Expand Up @@ -199,7 +199,7 @@ public ZkWriteCommand migrateStateFormat(ClusterState clusterState, ZkNodeProps

return new ZkWriteCommand(coll.getName(),
new DocCollection(coll.getName(), coll.getSlicesMap(), coll.getProperties(), coll.getRouter(), 0,
ZkStateReader.getCollectionPath(collection)));
ZkStateReader.getCollectionPath(collection), stateManager.getPrsSupplier(collection)));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public ZkWriteCommand modifyCollection(final ClusterState clusterState, ZkNodePr
return ZkStateWriter.NO_OP;
}

DocCollection collection = new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion(), coll.getZNode());
DocCollection collection = new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion(), coll.getZNode(), stateManager.getPrsSupplier(coll.getName()));
if (replicaOps == null){
return new ZkWriteCommand(coll.getName(), collection);
} else {
Expand All @@ -175,7 +175,7 @@ public static DocCollection updateSlice(String collectionName, DocCollection col
slices.put(slice.getName(), slice);
Map<String, Object> props = new HashMap<>(1);
props.put(DocCollection.DOC_ROUTER, Utils.makeMap(NAME, ImplicitDocRouter.NAME));
newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter());
newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter(), -1);
} else {
slices = new LinkedHashMap<>(collection.getSlicesMap()); // make a shallow copy
slices.put(slice.getName(), slice);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.codahale.metrics.Timer;
import org.apache.solr.cloud.Overseer;
Expand Down Expand Up @@ -85,6 +86,15 @@ public ZkStateWriter(ZkStateReader zkStateReader, Stats stats) {
this.clusterState = zkStateReader.getClusterState();
}

/**
 * Applies an out-of-band update to the locally cached cluster state.
 *
 * <p>If a collection is updated without going through this class (i.e. its state was written
 * directly to ZK by another node), the cached {@link ClusterState} here must be refreshed via
 * this method so that later writes do not operate on a stale copy.
 *
 * @param fun receives the current cached state and returns the state to cache in its place
 */
public void updateClusterState(Function<ClusterState, ClusterState> fun) {
clusterState = fun.apply(clusterState);
}


/**
* Applies the given {@link ZkWriteCommand} on the <code>prevState</code>. The modified
* {@link ClusterState} is returned and it is expected that the caller will use the returned
Expand Down Expand Up @@ -250,12 +260,12 @@ public ClusterState writePendingUpdates(Map<String, ZkWriteCommand> updates) thr
log.debug("going to update_collection {} version: {}", path, c.getZNodeVersion());
}
Stat stat = reader.getZkClient().setData(path, data, c.getZNodeVersion(), true);
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), stat.getVersion(), path);
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), stat.getVersion(), path, new PerReplicaStates.LazyPrsSupplier(reader.getZkClient(), path));
clusterState = clusterState.copyWith(name, newCollection);
} else {
log.debug("going to create_collection {}", path);
reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0, path);
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0, path, new PerReplicaStates.LazyPrsSupplier(reader.getZkClient(), path));
clusterState = clusterState.copyWith(name, newCollection);
}
} else if (c.getStateFormat() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
Expand Down Expand Up @@ -193,6 +194,10 @@ public void getClusterStatus(@SuppressWarnings({"rawtypes"})NamedList results)
String configName = zkStateReader.readConfigName(name);
collectionStatus.put("configName", configName);
collectionProps.add(name, collectionStatus);
if (message.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) {
PerReplicaStates prs = clusterStateCollection.getPerReplicaStates();
collectionStatus.put("PRS", prs);
}
} catch (KeeperException.NoNodeException ex) {
// skip this collection because the configset's znode has been deleted
// which can happen during aggressive collection removal, see SOLR-10720
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1569,17 +1569,17 @@ public static void waitForActiveCollection(String collectionName, CoreContainer
Collection<Replica> replicas;
if (!checkLeaderOnly) replicas = shard.getReplicas();
else {
replicas = new ArrayList<Replica>();
replicas = new ArrayList<>();
replicas.add(shard.getLeader());
}
for (Replica replica : replicas) {
String state = replica.getStr(ZkStateReader.STATE_PROP);
State state = replica.getState();
if (log.isDebugEnabled()) {
log.debug("Checking replica status, collection={} replica={} state={}", collectionName,
replica.getCoreUrl(), state);
}
if (!n.contains(replica.getNodeName())
|| !state.equals(Replica.State.ACTIVE.toString())) {
|| state != Replica.State.ACTIVE) {
if (log.isDebugEnabled()) {
log.debug("inactive replica {} , state {}", replica.getName(), replica.getReplicaState());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,8 @@ private void handleCreateCollMessage(byte[] bytes) {
if (CollectionParams.CollectionAction.CREATE.isEqual(props.getStr("operation"))) {
String collName = props.getStr("name");
if (collName != null) collectionsSet.put(collName, new ClusterState.CollectionRef(
new DocCollection(collName, new HashMap<>(), props.getProperties(), DocRouter.DEFAULT)));
new DocCollection(collName, new HashMap<>(), props.getProperties(), DocRouter.DEFAULT, -1,null,
distribStateManagerMock.getPrsSupplier(collName) )));
}
if (CollectionParams.CollectionAction.ADDREPLICA.isEqual(props.getStr("operation"))) {
replicas.add(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.OverseerTest;
Expand All @@ -31,10 +32,12 @@
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.util.TimeOut;

public class ZkStateReaderTest extends SolrTestCaseJ4 {
Expand Down Expand Up @@ -87,7 +90,8 @@ public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting

{
// create new collection with stateFormat = 1
DocCollection stateV1 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE);
DocCollection stateV1 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE,
new PerReplicaStates.LazyPrsSupplier(zkClient, ZkStateReader.getCollectionPath("c1")));
ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates();
Expand All @@ -110,7 +114,8 @@ public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting

{
// Now update the collection to stateFormat = 2
DocCollection stateV2 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
DocCollection stateV2 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c1"),
new PerReplicaStates.LazyPrsSupplier(zkClient, ZkStateReader.getCollectionPath("c1")));
ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null);
writer.writePendingUpdates();
Expand Down Expand Up @@ -157,8 +162,19 @@ public void testExternalCollectionWatchedNotWatched() throws Exception{
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);

// create new collection with stateFormat = 2
ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
// create new collection
ZkWriteCommand c1 =
new ZkWriteCommand(
"c1",
new DocCollection(
"c1",
new HashMap<>(),
ImmutableMap.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
0,
ZkStateReader.getCollectionPath("c1"),
new PerReplicaStates.LazyPrsSupplier(
zkClient, ZkStateReader.getCollectionPath("c1"))));
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates();
reader.forceUpdateCollection("c1");
Expand Down Expand Up @@ -195,14 +211,18 @@ public void testCollectionStateWatcherCaching() throws Exception {
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);

ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c1"),
new PerReplicaStates.LazyPrsSupplier(
zkClient, ZkStateReader.getCollectionPath("c1")));
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();
assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
reader.waitForState("c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null);

state = new DocCollection("c1", new HashMap<>(), Collections.singletonMap("x", "y"), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
state = new DocCollection("c1", new HashMap<>(), Collections.singletonMap("x", "y"), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c1"),
new PerReplicaStates.LazyPrsSupplier(
zkClient, ZkStateReader.getCollectionPath("c1")));
wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();
Expand Down Expand Up @@ -254,7 +274,9 @@ public void testWatchedCollectionCreation() throws Exception {


// create new collection with stateFormat = 2
DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c1"),
new PerReplicaStates.LazyPrsSupplier(
zkClient, ZkStateReader.getCollectionPath("c1")));
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
Expand Down Expand Up @@ -112,10 +114,20 @@ default List<String> listTree(String root) throws NoSuchElementException, IOExce
return tree;
}

default PerReplicaStates getReplicaStates(String path) throws KeeperException, InterruptedException {
default PerReplicaStates getReplicaStates(String path)
throws KeeperException, InterruptedException {
throw new UnsupportedOperationException("Not implemented");
}


/**
 * Returns a lazy supplier of per-replica states for the named collection.
 *
 * <p>The supplier defers the ZK read (via {@link #getReplicaStates(String)}) until it is first
 * invoked; checked exceptions are rethrown as {@link RuntimeException} because the supplier's
 * functional interface cannot declare them.
 *
 * @param collName the collection whose per-replica states should be supplied
 */
default DocCollection.PrsSupplier getPrsSupplier(String collName) {
  return new DocCollection.PrsSupplier(
      () -> {
        try {
          return getReplicaStates(ZkStateReader.getCollectionPath(collName));
        } catch (InterruptedException e) {
          // restore the interrupt status swallowed by wrapping the checked exception
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });
}

/**
Expand Down
Loading

0 comments on commit a639d26

Please sign in to comment.