Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cat Nodes API with Protobuf #9097

Closed
Closed
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9be33d7
Adding BaseWriteable and ProtobufWriteable
VachaShah Apr 21, 2023
0034b0a
Adding proto messages for TaskId and TaskResourceStats
VachaShah Apr 21, 2023
cdb6806
Adding Task related classes with protobuf integration
VachaShah Apr 21, 2023
c8c2f53
Adding ProtobufStreamInput and ProtobufStreamOutput for additional st…
VachaShah Apr 21, 2023
a2be2fa
Adding TransportMessage and TransportRequest classes with protobuf in…
VachaShah Apr 22, 2023
c0bbf92
Fixing build and precommit
VachaShah Apr 25, 2023
551fe82
Adding protobuf integrations for client, transport, request
VachaShah May 8, 2023
a1ab589
Fixing build and integrating protobuf for classes related to RestNode…
VachaShah May 25, 2023
8e57034
Fixing node crashes
VachaShah Jun 21, 2023
a5ce258
Fixes
VachaShah Jun 21, 2023
0d76b3b
Fixing nodes api response for protobuf
VachaShah Jun 30, 2023
e6f6190
ProtobufClusterState to ClusterState
VachaShah Jul 10, 2023
b94a49e
Fixing cluster manager x to *
VachaShah Jul 10, 2023
cb320c7
Eliminating a lot of protobuf classes to merge with original classes …
VachaShah Jul 11, 2023
13c4568
This fixes the single node calls after the refactoring in the previou…
VachaShah Jul 11, 2023
b659260
Trying serialization and deserialization across nodes
VachaShah Jul 14, 2023
e96f3e0
Debugging and fixing compile errors
VachaShah Jul 17, 2023
496cdbb
Trying proto message serde across nodes with tests and example request
VachaShah Jul 19, 2023
8aab04b
Changes for multi node - working on single and multi node
VachaShah Jul 28, 2023
b583179
Calculating time
VachaShah Jul 28, 2023
515110a
Removing sysouts
VachaShah Jul 28, 2023
f94f165
Cleaning up code
VachaShah Jul 31, 2023
35d8906
Ignoring proto generated classes
VachaShah Jul 31, 2023
d511930
Cleaning up more code
VachaShah Aug 1, 2023
562e6c3
Cleaning up CodedInputStream, CodedOutputStream and TryWriteable
VachaShah Aug 1, 2023
d5f3a0e
Performance improvements
VachaShah Aug 2, 2023
2f423b9
Fixing compile errors after merging with main
VachaShah Aug 2, 2023
eeeb40f
Renaming proto messages and related classes
VachaShah Aug 2, 2023
8f5a327
Fixing precommit failures
VachaShah Aug 2, 2023
d409e71
Cleaning up code
VachaShah Aug 3, 2023
967d26b
Improvements
VachaShah Aug 3, 2023
0e9a86e
[Refactor] Network and Transport common classes to Libraries (#9073)
nknize Aug 2, 2023
d73fd6a
[Snapshot Interop] Add Logic in Lock Manager to cleanup stale data po…
harishbhakuni Aug 3, 2023
f3a17fc
Avoid duplicate indexing in case of SegRep enabled indices' translog …
gbbafna Aug 3, 2023
64a7457
Fix flaky test testStatsOnShardUnassigned in RemoteStoreStatsIT (#9057)
ashking94 Aug 3, 2023
6030039
Fix test testDropPrimaryDuringReplication and clean up ReplicationChe…
mch2 Aug 3, 2023
46aaea2
Getting latest changes from main
VachaShah Aug 3, 2023
c05f5aa
Merge branch 'main' into poc-cat-nodes-protobuf
VachaShah Aug 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
ProtobufClusterState to ClusterState
Signed-off-by: Vacha Shah <[email protected]>
VachaShah committed Aug 2, 2023
commit e6f61909a48175bd4b539bed93f8fd99c1b39735
Original file line number Diff line number Diff line change
@@ -64,6 +64,8 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import com.google.protobuf.CodedOutputStream;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -166,7 +168,7 @@ public static class Response extends ActionResponse implements ToXContentObject
*
* @opensearch.internal
*/
public static class DataStreamInfo extends AbstractDiffable<DataStreamInfo> implements ToXContentObject {
public static class DataStreamInfo extends AbstractDiffable<DataStreamInfo, DataStreamInfo> implements ToXContentObject {

public static final ParseField STATUS_FIELD = new ParseField("status");
public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template");
@@ -235,6 +237,12 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(dataStream, dataStreamStatus, indexTemplate);
}

@Override
public void writeTo(CodedOutputStream out) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'writeTo'");
}
}

private final List<DataStreamInfo> dataStreams;
Original file line number Diff line number Diff line change
@@ -45,6 +45,8 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import com.google.protobuf.CodedOutputStream;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
@@ -54,7 +56,7 @@
*
* @opensearch.internal
*/
public class RolloverInfo extends AbstractDiffable<RolloverInfo> implements Writeable, ToXContentFragment {
public class RolloverInfo extends AbstractDiffable<RolloverInfo, RolloverInfo> implements Writeable, ToXContentFragment {

public static final ParseField CONDITION_FIELD = new ParseField("met_conditions");
public static final ParseField TIME_FIELD = new ParseField("time");
@@ -151,4 +153,10 @@ public boolean equals(Object obj) {
public String toString() {
return Strings.toString(XContentType.JSON, this);
}

@Override
public void writeTo(CodedOutputStream out) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'writeTo'");
}
}
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ protected ProtobufTransportAction(String actionName, ProtobufActionFilters actio

private Releasable registerChildNode(ProtobufTaskId parentTask) {
if (parentTask.isSet()) {
return taskManager.registerProtobufChildNode(parentTask.getId(), taskManager.localProtobufNode());
return taskManager.registerProtobufChildNode(parentTask.getId(), taskManager.localNode());
} else {
return () -> {};
}
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.node.ProtobufDiscoveryNodes;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.cluster.service.ClusterService;
@@ -215,9 +215,13 @@ public Exception getTimeoutException(Exception e) {

protected void doStart(ProtobufClusterState clusterState) {
try {
final ProtobufDiscoveryNodes nodes = clusterState.nodes();
final DiscoveryNodes discoveryNodes = clusterService.state().nodes();
if (discoveryNodes.isLocalNodeElectedClusterManager() || localExecute(request)) {
System.out.println("ProtobufTransportClusterManagerNodeAction.doStart");
//this needs fixing
final DiscoveryNodes nodes = clusterService.state().nodes();
System.out.println("nodes: " + nodes);
// final DiscoveryNodes discoveryNodes = clusterService.state().nodes();
if (nodes.isLocalNodeElectedClusterManager() || localExecute(request)) {
System.out.println("ProtobufTransportClusterManagerNodeAction.doStart.isLocalNodeElectedClusterManager");
// check for block, if blocked, retry, else, execute locally
final ClusterBlockException blockException = checkBlock(request, clusterState);
if (blockException != null) {
@@ -256,10 +260,13 @@ protected void doStart(ProtobufClusterState clusterState) {
.execute(ActionRunnable.wrap(delegate, l -> clusterManagerOperation(task, request, clusterState, l)));
}
} else {
System.out.println("In else");
if (nodes.getClusterManagerNode() == null) {
System.out.println("In else if");
logger.debug("no known cluster-manager node, scheduling a retry");
retryOnMasterChange(clusterState, null);
} else {
System.out.println("In else else");
DiscoveryNode clusterManagerNode = nodes.getClusterManagerNode();
final String actionName = getClusterManagerActionName(clusterManagerNode);
transportService.sendRequest(
Original file line number Diff line number Diff line change
@@ -231,8 +231,11 @@ public Exception getTimeoutException(Exception e) {

protected void doStart(ClusterState clusterState) {
try {
System.out.println("TransportClusterManagerNodeAction.doStart");
final DiscoveryNodes nodes = clusterState.nodes();
System.out.println("nodes: " + nodes);
if (nodes.isLocalNodeElectedClusterManager() || localExecute(request)) {
System.out.println("TransportClusterManagerNodeAction.doStart.isLocalNodeElectedClusterManager");
// check for block, if blocked, retry, else, execute locally
final ClusterBlockException blockException = checkBlock(request, clusterState);
if (blockException != null) {
@@ -271,7 +274,9 @@ protected void doStart(ClusterState clusterState) {
.execute(ActionRunnable.wrap(delegate, l -> clusterManagerOperation(task, request, clusterState, l)));
}
} else {
System.out.println("In else");
if (nodes.getClusterManagerNode() == null) {
System.out.println("In else if");
logger.debug("no known cluster-manager node, scheduling a retry");
retryOnMasterChange(clusterState, null);
} else {
Original file line number Diff line number Diff line change
@@ -33,9 +33,13 @@
package org.opensearch.cluster;

import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.ProtobufWriteable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;

import java.io.IOException;

/**
@@ -44,7 +48,7 @@
*
* @opensearch.internal
*/
public abstract class AbstractDiffable<T extends Diffable<T>> implements Diffable<T> {
public abstract class AbstractDiffable<T extends Diffable<T>, V extends ProtobufDiffable<V>> implements Diffable<T>, ProtobufDiffable<V> {

private static final Diff<?> EMPTY = new CompleteDiff<>();

@@ -58,6 +62,16 @@ public Diff<T> diff(T previousState) {
}
}

@SuppressWarnings("unchecked")
@Override
public ProtobufDiff<V> protobufDiff(V previousState) {
if (this.equals(previousState)) {
return (ProtobufDiff<V>) EMPTY;
} else {
return new CompleteDiffProtobuf<>((V) this);
}
}

@SuppressWarnings("unchecked")
public static <T extends Diffable<T>> Diff<T> readDiffFrom(Reader<T> reader, StreamInput in) throws IOException {
if (in.readBoolean()) {
@@ -66,6 +80,14 @@ public static <T extends Diffable<T>> Diff<T> readDiffFrom(Reader<T> reader, Str
return (Diff<T>) EMPTY;
}

@SuppressWarnings("unchecked")
public static <V extends ProtobufDiffable<V>> ProtobufDiff<V> readDiffFromProtobuf(ProtobufWriteable.Reader<V> reader, CodedInputStream in) throws IOException {
if (in.readBool()) {
return new CompleteDiffProtobuf<>(reader.read(in));
}
return (ProtobufDiff<V>) EMPTY;
}

/**
* A complete diff.
*
@@ -109,4 +131,48 @@ public T apply(T part) {
}
}
}

/**
* A complete diff.
*
* @opensearch.internal
*/
private static class CompleteDiffProtobuf<T extends ProtobufDiffable<T>> implements ProtobufDiff<T> {

@Nullable
private final T part;

/**
* Creates simple diff with changes
*/
CompleteDiffProtobuf(T part) {
this.part = part;
}

/**
* Creates simple diff without changes
*/
CompleteDiffProtobuf() {
this.part = null;
}

@Override
public void writeTo(CodedOutputStream out) throws IOException {
if (part != null) {
out.writeBoolNoTag(true);
part.writeTo(out);
} else {
out.writeBoolNoTag(false);
}
}

@Override
public T apply(T part) {
if (this.part != null) {
return this.part;
} else {
return part;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -10,6 +10,8 @@

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;

import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;

import java.io.IOException;
@@ -26,7 +28,7 @@ public abstract class ProtobufAbstractDiffable<T extends ProtobufDiffable<T>> im

@SuppressWarnings("unchecked")
@Override
public ProtobufDiff<T> diff(T previousState) {
public ProtobufDiff<T> protobufDiff(T previousState) {
if (this.equals(previousState)) {
return (ProtobufDiff<T>) EMPTY;
} else {
Original file line number Diff line number Diff line change
@@ -12,7 +12,8 @@
import org.opensearch.cluster.metadata.IndexGraveyard.IndexGraveyardDiff;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.ProtobufDiscoveryNodes;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.gateway.GatewayService;
import org.opensearch.index.Index;

@@ -38,7 +39,7 @@ public class ProtobufClusterChangedEvent {

private final ProtobufClusterState state;

private final ProtobufDiscoveryNodes.Delta nodesDelta;
private final DiscoveryNodes.Delta nodesDelta;

public ProtobufClusterChangedEvent(String source, ProtobufClusterState state, ProtobufClusterState previousState) {
Objects.requireNonNull(source, "source must not be null");
@@ -201,10 +202,10 @@ public boolean localNodeMaster() {
}

/**
* Returns the {@link org.opensearch.cluster.node.ProtobufDiscoveryNodes.Delta} between
* Returns the {@link org.opensearch.cluster.node.DiscoveryNodes.Delta} between
* the previous cluster state and the new cluster state.
*/
public ProtobufDiscoveryNodes.Delta nodesDelta() {
public DiscoveryNodes.Delta nodesDelta() {
return this.nodesDelta;
}

Loading