Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17582 Stream CLUSTERSTATUS API response #2916

Merged
merged 12 commits into from
Jan 4, 2025
3 changes: 2 additions & 1 deletion solr/core/src/java/org/apache/solr/cli/StatusTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ protected Map<String, String> getCloudStatus(SolrClient solrClient, String zkHos
cloudStatus.put("liveNodes", String.valueOf(liveNodes.size()));

// TODO get this as a metric from the metrics API instead, or something else.
var collections = (NamedList<Object>) json.findRecursive("cluster", "collections");
Map<String, Object> collections =
(Map<String, Object>) json.findRecursive("cluster", "collections");
cloudStatus.put("collections", String.valueOf(collections.size()));

return cloudStatus;
Expand Down
140 changes: 93 additions & 47 deletions solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
*/
package org.apache.solr.handler.admin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
Expand Down Expand Up @@ -205,54 +208,24 @@ private void fetchClusterStatusForCollOrAlias(
}
}

// TODO use an Iterable to stream the data to the client instead of gathering it all in mem

NamedList<Object> collectionProps = new SimpleOrderedMap<>();

collectionStream.forEach(
clusterStateCollection -> {
Map<String, Object> collectionStatus;
String name = clusterStateCollection.getName();

Set<String> requestedShards = new HashSet<>();
if (routeKey != null) {
DocRouter router = clusterStateCollection.getRouter();
Collection<Slice> slices =
router.getSearchSlices(routeKey, null, clusterStateCollection);
for (Slice slice : slices) {
requestedShards.add(slice.getName());
}
MapWriter collectionPropsWriter =
ew -> {
SolrCollectionProperiesIterator collectionProps =
new SolrCollectionProperiesIterator(
collectionStream.iterator(), collectionVsAliases, routeKey, liveNodes, shard);
while (collectionProps.hasNext()) {
Map<String, Object> collectionPropsMap = (collectionProps.next().asMap());
collectionPropsMap.forEach(
(key, value) -> {
try {
ew.put(key, value);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
if (shard != null) {
String[] paramShards = shard.split(",");
requestedShards.addAll(Arrays.asList(paramShards));
}

byte[] bytes = Utils.toJSON(clusterStateCollection);
@SuppressWarnings("unchecked")
Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes);
collectionStatus = getCollectionStatus(docCollection, name, requestedShards);

collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
collectionStatus.put(
"creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli());

if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
collectionStatus.put("aliases", collectionVsAliases.get(name));
}
String configName = clusterStateCollection.getConfigName();
collectionStatus.put("configName", configName);
if (solrParams.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) {
PerReplicaStates prs = clusterStateCollection.getPerReplicaStates();
collectionStatus.put("PRS", prs);
}
collectionProps.add(name, collectionStatus);
});

// now we need to walk the collectionProps tree to cross-check replica state with live nodes
crossCheckReplicaStateWithLiveNodes(liveNodes, collectionProps);

clusterStatus.add("collections", collectionProps);
};
clusterStatus.add("collections", collectionPropsWriter);
}

private void addAliasMap(Aliases aliases, NamedList<Object> clusterStatus) {
Expand Down Expand Up @@ -368,4 +341,77 @@ public static Map<String, Object> postProcessCollectionJSON(Map<String, Object>
collection.put("health", Health.combine(healthStates).toString());
return collection;
}

private class SolrCollectionProperiesIterator implements Iterator<NamedList<Object>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping we wouldn't need a custom Iterator. We do need a method that takes in DocCollection (and some other context that is the same across collections) and returns a NamedList. With such a method, we can call it via streamOfDocCollection.map(collState -> theMethod(collState, routeKey, liveNodes, etc.).iterator()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah don't know why I didn't do this first. Changed it appropriately.


final Iterator<DocCollection> it;
Map<String, List<String>> collectionVsAliases;
String routeKey;
List<String> liveNodes;
String shard;

public SolrCollectionProperiesIterator(
Iterator<DocCollection> it,
Map<String, List<String>> collectionVsAliases,
String routeKey,
List<String> liveNodes,
String shard) {
this.it = it;
this.collectionVsAliases = collectionVsAliases;
this.routeKey = routeKey;
this.liveNodes = liveNodes;
this.shard = shard;
}

@Override
public boolean hasNext() {
return it.hasNext();
}

@Override
public NamedList<Object> next() {
NamedList<Object> collectionProps = new SimpleOrderedMap<>();
DocCollection clusterStateCollection = it.next();
Map<String, Object> collectionStatus;
String name = clusterStateCollection.getName();

Set<String> requestedShards = new HashSet<>();
if (routeKey != null) {
DocRouter router = clusterStateCollection.getRouter();
Collection<Slice> slices = router.getSearchSlices(routeKey, null, clusterStateCollection);
for (Slice slice : slices) {
requestedShards.add(slice.getName());
}
}
if (shard != null) {
String[] paramShards = shard.split(",");
requestedShards.addAll(Arrays.asList(paramShards));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally this is done up front; not per iteration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it out so its not per iteration.


byte[] bytes = Utils.toJSON(clusterStateCollection);
@SuppressWarnings("unchecked")
Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why round-trip this?

Copy link
Contributor Author

@mlbiscoc mlbiscoc Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So looked at this and I understand now why it was done this way. It wants to just write this out to the TextWriter as a normal POJO but that doesn't seem to be what the DocCollection class is. So instead what was done was to just write it to JSON byte[] then back to a Map which was the "easiest" way.

Was looking to avoid this back and forth, but there were a few options. I tried using an ObjectMapper to Map.class but there is an error for unable to cast Instant due to a missing dependency for Jackson we need to introduce.
Java 8 date/time type java.time.Instant not supported by default Issue

Other way is to introduce a some kind of toMap method so that the TextWriter can write this as a generic Map.

Another option which actually looks like the way we should go is I found that the DocCollection class extends ZkNodeProps which implements MapWriter. DocCollection already overrides writeMap so we could just return this to the TextWriter! Unfortunately the ClusterStatus class does a bunch of JSON post processing such as Health added to the Map that the output is missing some things because of this postProcessCollectionJSON() method.

I am thinking we should refactor DocCollection to so we can just return this but the changes were much more drastic but may be worth it. Maybe in a different JIRA? This scope continues to creep with me adding improvement to NamedList. Would be happy to refactor this and pick it up if you agree. Would clean up much more code and avoid this JSON processing for every collection iteration.

collectionStatus = getCollectionStatus(docCollection, name, requestedShards);

collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
collectionStatus.put(
"creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli());

if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
collectionStatus.put("aliases", collectionVsAliases.get(name));
}
String configName = clusterStateCollection.getConfigName();
collectionStatus.put("configName", configName);
if (solrParams.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) {
PerReplicaStates prs = clusterStateCollection.getPerReplicaStates();
collectionStatus.put("PRS", prs);
}
collectionProps.add(name, collectionStatus);

// now we need to walk the collectionProps tree to cross-check replica state with live
// nodes
crossCheckReplicaStateWithLiveNodes(liveNodes, collectionProps);
return collectionProps;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,12 @@ private void testModifyCollection() throws Exception {
.getResponse();
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertEquals(1, collections.size());
assertEquals("25", collections._getStr(List.of(COLLECTION_NAME, "replicationFactor"), null));
Map<?, ?> collectionProperties = (Map<?, ?>) collections.get(COLLECTION_NAME);
collectionProperties.get("replicationFactor");
assertEquals("25", collectionProperties.get("replicationFactor"));

params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.MODIFYCOLLECTION.toString());
Expand All @@ -151,10 +153,12 @@ private void testModifyCollection() throws Exception {
System.out.println(rsp);
cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
collections = (NamedList<?>) cluster.get("collections");
collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertEquals(1, collections.size());
assertNull(collections._getStr(List.of(COLLECTION_NAME, "replicationFactor"), null));
collectionProperties = (Map<?, ?>) collections.get(COLLECTION_NAME);
collectionProperties.get("replicationFactor");
assertNull(collectionProperties.get("replicationFactor"));

params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.MODIFYCOLLECTION.toString());
Expand Down Expand Up @@ -253,7 +257,7 @@ private void testNoConfigset() throws Exception {
NamedList<?> rsp = client.request(req);
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(
"Testing to insure collections are returned", collections.get(COLLECTION_NAME1));
Expand All @@ -280,7 +284,7 @@ private void assertCountsForRepFactorAndNrtReplicas(CloudSolrClient client, Stri
NamedList<Object> rsp = client.request(request);
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertEquals(1, collections.size());
@SuppressWarnings({"unchecked"})
Expand All @@ -302,7 +306,7 @@ private void clusterStatusWithCollectionAndShard() throws IOException, SolrServe
NamedList<Object> rsp = client.request(request);
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(collections.get(COLLECTION_NAME));
assertEquals(1, collections.size());
Expand All @@ -328,7 +332,7 @@ private void clusterStatusWithCollectionAndMultipleShards()
NamedList<Object> rsp = request.process(client).getResponse();
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(collections.get(COLLECTION_NAME));
assertEquals(1, collections.size());
Expand Down Expand Up @@ -463,7 +467,7 @@ private void clusterStatusNoCollection() throws Exception {
NamedList<Object> rsp = client.request(request);
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(collections.get(COLLECTION_NAME1));
assertEquals(4, collections.size());
Expand All @@ -485,7 +489,7 @@ private void clusterStatusWithCollection() throws IOException, SolrServerExcepti
NamedList<Object> rsp = client.request(request);
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertEquals(1, collections.size());
@SuppressWarnings({"unchecked"})
Expand Down Expand Up @@ -515,7 +519,7 @@ private void clusterStatusZNodeVersion() throws Exception {
NamedList<Object> rsp = client.request(request);
NamedList<Object> cluster = (NamedList<Object>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<Object> collections = (NamedList<Object>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertEquals(1, collections.size());
Map<String, Object> collection = (Map<String, Object>) collections.get(cname);
Expand All @@ -531,7 +535,7 @@ private void clusterStatusZNodeVersion() throws Exception {

rsp = client.request(request);
cluster = (NamedList<Object>) rsp.get("cluster");
collections = (NamedList<Object>) cluster.get("collections");
collections = (Map<?, ?>) cluster.get("collections");
collection = (Map<String, Object>) collections.get(cname);
Integer newVersion = (Integer) collection.get("znodeVersion");
assertNotNull(newVersion);
Expand All @@ -558,7 +562,7 @@ private void clusterStatusWithRouteKey() throws IOException, SolrServerException
NamedList<Object> cluster = (NamedList<Object>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
@SuppressWarnings({"unchecked"})
NamedList<Object> collections = (NamedList<Object>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(collections.get(DEFAULT_COLLECTION));
assertEquals(1, collections.size());
Expand Down Expand Up @@ -605,7 +609,7 @@ private void clusterStatusAliasTest() throws Exception {
DEFAULT_COLLECTION + "," + COLLECTION_NAME,
aliases.get("myalias"));

NamedList<Object> collections = (NamedList<Object>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(collections.get(DEFAULT_COLLECTION));
Map<String, Object> collection = (Map<String, Object>) collections.get(DEFAULT_COLLECTION);
Expand All @@ -625,7 +629,7 @@ private void clusterStatusAliasTest() throws Exception {

cluster = (NamedList<Object>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
collections = (NamedList<Object>) cluster.get("collections");
collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(collections.get(DEFAULT_COLLECTION));
assertNotNull(collections.get(COLLECTION_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.common.util.EnvUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -140,11 +139,11 @@ private ClusterState fetchClusterState(SolrClient client)
liveNodesTimestamp = System.nanoTime();
}

var collectionsNl = (NamedList<Map<String, Object>>) cluster.get("collections");
var collectionsNl = (Map<String, Map<String, Object>>) cluster.get("collections");
dsmiley marked this conversation as resolved.
Show resolved Hide resolved

Map<String, DocCollection> collStateByName =
CollectionUtil.newLinkedHashMap(collectionsNl.size());
for (Entry<String, Map<String, Object>> entry : collectionsNl) {
for (Entry<String, Map<String, Object>> entry : collectionsNl.entrySet()) {
collStateByName.put(
entry.getKey(), getDocCollectionFromObjects(entry.getKey(), entry.getValue()));
}
Expand Down Expand Up @@ -185,7 +184,9 @@ private DocCollection fetchCollectionState(SolrClient client, String collection)
SimpleOrderedMap<?> cluster =
submitClusterStateRequest(client, collection, ClusterStateRequestType.FETCH_COLLECTION);

var collStateMap = (Map<String, Object>) cluster.findRecursive("collections", collection);
var collState = (Map<String, Object>) cluster.findRecursive("collections");
mlbiscoc marked this conversation as resolved.
Show resolved Hide resolved
var collStateMap = (Map<String, Object>) collState.get(collection);

if (collStateMap == null) {
throw new NotACollectionException(); // probably an alias
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private Instant getCreationTimeFromClusterStatus(String collectionName)
NamedList<Object> response = clusterStatusResponse.getResponse();

NamedList<Object> cluster = (NamedList<Object>) response.get("cluster");
NamedList<Object> collections = (NamedList<Object>) cluster.get("collections");
Map<String, Object> collections = (Map<String, Object>) cluster.get("collections");
Map<String, Object> collection = (Map<String, Object>) collections.get(collectionName);
return Instant.ofEpochMilli((long) collection.get("creationTimeMillis"));
}
Expand Down
Loading