Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17582 Stream CLUSTERSTATUS API response #2916

Merged
merged 12 commits into from
Jan 4, 2025
4 changes: 3 additions & 1 deletion solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ Other Changes
================== 9.9.0 ==================
New Features
---------------------
(No changes)
* SOLR-17582: The CLUSTERSTATUS API will now stream each collection's status to the response,
fetching and computing it on the fly. To avoid a backwards compatibility concern, this won't work
for wt=javabin. (Matthew Biscocho, David Smiley)

Improvements
---------------------
Expand Down
158 changes: 90 additions & 68 deletions solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.solr.handler.admin;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -27,15 +26,15 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
Expand Down Expand Up @@ -180,6 +179,8 @@ private void fetchClusterStatusForCollOrAlias(
String routeKey = solrParams.get(ShardParams._ROUTE_);
String shard = solrParams.get(ZkStateReader.SHARD_ID_PROP);

Set<String> requestedShards = (shard != null) ? Set.of(shard.split(",")) : null;

Stream<DocCollection> collectionStream;
if (collection == null) {
collectionStream = clusterState.collectionStream();
Expand All @@ -205,54 +206,35 @@ private void fetchClusterStatusForCollOrAlias(
}
}

// TODO use an Iterable to stream the data to the client instead of gathering it all in mem

NamedList<Object> collectionProps = new SimpleOrderedMap<>();

collectionStream.forEach(
clusterStateCollection -> {
Map<String, Object> collectionStatus;
String name = clusterStateCollection.getName();

Set<String> requestedShards = new HashSet<>();
if (routeKey != null) {
DocRouter router = clusterStateCollection.getRouter();
Collection<Slice> slices =
router.getSearchSlices(routeKey, null, clusterStateCollection);
for (Slice slice : slices) {
requestedShards.add(slice.getName());
}
}
if (shard != null) {
String[] paramShards = shard.split(",");
requestedShards.addAll(Arrays.asList(paramShards));
}

byte[] bytes = Utils.toJSON(clusterStateCollection);
@SuppressWarnings("unchecked")
Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes);
collectionStatus = getCollectionStatus(docCollection, name, requestedShards);

collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
collectionStatus.put(
"creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli());

if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
collectionStatus.put("aliases", collectionVsAliases.get(name));
}
String configName = clusterStateCollection.getConfigName();
collectionStatus.put("configName", configName);
if (solrParams.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) {
PerReplicaStates prs = clusterStateCollection.getPerReplicaStates();
collectionStatus.put("PRS", prs);
}
collectionProps.add(name, collectionStatus);
});

// now we need to walk the collectionProps tree to cross-check replica state with live nodes
crossCheckReplicaStateWithLiveNodes(liveNodes, collectionProps);

clusterStatus.add("collections", collectionProps);
// Because of back-compat for SolrJ, create the whole response into a NamedList
// Otherwise stream with MapWriter to save memory
if (CommonParams.JAVABIN.equals(solrParams.get(CommonParams.WT))) {
NamedList<Object> collectionProps = new SimpleOrderedMap<>();
collectionStream.forEach(
collectionState -> {
collectionProps.add(
collectionState.getName(),
buildResponseForCollection(
collectionState, collectionVsAliases, routeKey, liveNodes, requestedShards));
});
clusterStatus.add("collections", collectionProps);
} else {
MapWriter collectionPropsWriter =
ew -> {
collectionStream.forEach(
(collectionState) -> {
ew.putNoEx(
collectionState.getName(),
buildResponseForCollection(
collectionState,
collectionVsAliases,
routeKey,
liveNodes,
requestedShards));
});
};
clusterStatus.add("collections", collectionPropsWriter);
}
}

private void addAliasMap(Aliases aliases, NamedList<Object> clusterStatus) {
Expand Down Expand Up @@ -307,23 +289,20 @@ private Map<String, Object> getCollectionStatus(
*/
@SuppressWarnings("unchecked")
protected void crossCheckReplicaStateWithLiveNodes(
List<String> liveNodes, NamedList<Object> collectionProps) {
for (Map.Entry<String, Object> next : collectionProps) {
Map<String, Object> collMap = (Map<String, Object>) next.getValue();
Map<String, Object> shards = (Map<String, Object>) collMap.get("shards");
for (Object nextShard : shards.values()) {
Map<String, Object> shardMap = (Map<String, Object>) nextShard;
Map<String, Object> replicas = (Map<String, Object>) shardMap.get("replicas");
for (Object nextReplica : replicas.values()) {
Map<String, Object> replicaMap = (Map<String, Object>) nextReplica;
if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP))
!= Replica.State.DOWN) {
// not down, so verify the node is live
String node_name = (String) replicaMap.get(ZkStateReader.NODE_NAME_PROP);
if (!liveNodes.contains(node_name)) {
// node is not live, so this replica is actually down
replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
}
List<String> liveNodes, Map<String, Object> collectionProps) {
var shards = (Map<String, Object>) collectionProps.get("shards");
for (Object nextShard : shards.values()) {
var shardMap = (Map<String, Object>) nextShard;
var replicas = (Map<String, Object>) shardMap.get("replicas");
for (Object nextReplica : replicas.values()) {
var replicaMap = (Map<String, Object>) nextReplica;
if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP))
!= Replica.State.DOWN) {
// not down, so verify the node is live
String node_name = (String) replicaMap.get(ZkStateReader.NODE_NAME_PROP);
if (!liveNodes.contains(node_name)) {
// node is not live, so this replica is actually down
replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
}
}
}
Expand Down Expand Up @@ -368,4 +347,47 @@ public static Map<String, Object> postProcessCollectionJSON(Map<String, Object>
collection.put("health", Health.combine(healthStates).toString());
return collection;
}

/**
 * Builds the CLUSTERSTATUS response entry for a single collection.
 *
 * <p>The collection state is round-tripped through JSON to obtain a plain, mutable map, which is
 * then filtered by {@code getCollectionStatus} and decorated with {@code znodeVersion}, {@code
 * creationTimeMillis}, {@code aliases} (only when the collection has at least one), {@code
 * configName} and, when the request has {@code prs=true} and the collection uses per-replica
 * state, {@code PRS}. Finally each replica's state is cross-checked against the live nodes.
 *
 * @param clusterStateCollection the collection whose status is being reported
 * @param collectionVsAliases map from collection name to the aliases that point at it
 * @param routeKey optional route key; when non-null, the slices the collection's router selects
 *     for it are added to the shard filter
 * @param liveNodes names of the nodes currently live, used to correct stale replica states
 * @param requestedShards optional shard names explicitly requested by the caller; may be null
 * @return the status map for this collection
 */
private Map<String, Object> buildResponseForCollection(
    DocCollection clusterStateCollection,
    Map<String, List<String>> collectionVsAliases,
    String routeKey,
    List<String> liveNodes,
    Set<String> requestedShards) {
  String name = clusterStateCollection.getName();

  // Union of the shards selected by the route key and the shards named explicitly.
  // NOTE(review): an empty set is passed through as-is; presumably getCollectionStatus treats
  // that as "no shard filter" -- confirm against its implementation.
  Set<String> shards = new HashSet<>();
  if (routeKey != null) {
    for (var slice :
        clusterStateCollection
            .getRouter()
            .getSearchSlices(routeKey, null, clusterStateCollection)) {
      shards.add(slice.getName());
    }
  }
  if (requestedShards != null) {
    shards.addAll(requestedShards);
  }

  // Serialize and re-parse so we work on a detached map representation of the collection.
  byte[] bytes = Utils.toJSON(clusterStateCollection);
  @SuppressWarnings("unchecked")
  Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes);
  Map<String, Object> collectionStatus = getCollectionStatus(docCollection, name, shards);

  collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
  collectionStatus.put(
      "creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli());

  // Single lookup; also tolerates a key that is present but mapped to null.
  List<String> aliases = collectionVsAliases.get(name);
  if (aliases != null && !aliases.isEmpty()) {
    collectionStatus.put("aliases", aliases);
  }

  collectionStatus.put("configName", clusterStateCollection.getConfigName());

  if (solrParams.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) {
    PerReplicaStates prs = clusterStateCollection.getPerReplicaStates();
    collectionStatus.put("PRS", prs);
  }

  // Walk this collection's shards/replicas and mark replicas on non-live nodes as down.
  crossCheckReplicaStateWithLiveNodes(liveNodes, collectionStatus);

  return collectionStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.solr.cloud.api.collections;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
Expand All @@ -28,6 +29,7 @@
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.NoOpResponseParser;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
Expand Down Expand Up @@ -81,7 +83,6 @@ public void test() throws Exception {
client.request(req);
createCollection(null, COLLECTION_NAME1, 1, 1, client, null, "conf1");
}

waitForCollection(ZkStateReader.from(cloudClient), COLLECTION_NAME, 2);
waitForCollection(ZkStateReader.from(cloudClient), COLLECTION_NAME1, 1);
waitForRecoveriesToFinish(COLLECTION_NAME, false);
Expand All @@ -91,6 +92,7 @@ public void test() throws Exception {
clusterStatusNoCollection();
clusterStatusWithCollection();
clusterStatusWithCollectionAndShard();
clusterStatusWithCollectionAndShardJSON();
clusterStatusWithCollectionAndMultipleShards();
clusterStatusWithCollectionHealthState();
clusterStatusWithRouteKey();
Expand Down Expand Up @@ -648,6 +650,39 @@ private void clusterStatusAliasTest() throws Exception {
}
}

/**
 * Requests CLUSTERSTATUS for one collection and one shard with {@code wt=json}, parses the raw
 * JSON body, and verifies that exactly that collection and that shard are reported.
 */
@SuppressWarnings("unchecked")
private void clusterStatusWithCollectionAndShardJSON() throws IOException, SolrServerException {

  try (CloudSolrClient client = createCloudClient(null)) {
    // Build the request: CLUSTERSTATUS limited to a single shard, JSON response format.
    ModifiableSolrParams statusParams = new ModifiableSolrParams();
    statusParams.set("action", CollectionParams.CollectionAction.CLUSTERSTATUS.toString());
    statusParams.set("collection", COLLECTION_NAME);
    statusParams.set("shard", SHARD1);
    statusParams.set("wt", "json");

    QueryRequest statusRequest = new QueryRequest(statusParams);
    // Keep the body as an unparsed string so the JSON the server wrote is what gets inspected.
    statusRequest.setResponseParser(new NoOpResponseParser("json"));
    statusRequest.setPath("/admin/collections");

    NamedList<Object> rawResponse = client.request(statusRequest);
    String jsonBody = (String) rawResponse.get("response");

    ObjectMapper jsonMapper = new ObjectMapper();
    Map<String, Object> parsed = jsonMapper.readValue(jsonBody, Map.class);

    Map<String, Object> cluster = (Map<String, Object>) parsed.get("cluster");
    assertNotNull("Cluster state should not be null", cluster);

    Map<String, Object> collections = (Map<String, Object>) cluster.get("collections");
    assertNotNull("Collections should not be null in cluster state", collections);
    assertNotNull(collections.get(COLLECTION_NAME));
    assertEquals(1, collections.size());

    Map<String, Object> collectionEntry =
        (Map<String, Object>) collections.get(COLLECTION_NAME);
    Map<String, Object> shardsByName = (Map<String, Object>) collectionEntry.get("shards");
    assertEquals(1, shardsByName.size());

    Map<String, Object> requestedShardEntry = (Map<String, Object>) shardsByName.get(SHARD1);
    assertNotNull(requestedShardEntry);
  }
}

private void clusterStatusRolesTest() throws Exception {
try (CloudSolrClient client = createCloudClient(null)) {
client.connect();
Expand Down
Loading