Skip to content

Commit

Permalink
Adding protobuf integrations for client, transport, request
Browse files Browse the repository at this point in the history
Signed-off-by: Vacha Shah <[email protected]>
  • Loading branch information
VachaShah committed May 8, 2023
1 parent d2c709d commit 83b5f49
Show file tree
Hide file tree
Showing 95 changed files with 15,337 additions and 20 deletions.
26 changes: 26 additions & 0 deletions server/src/main/java/org/opensearch/Build.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import org.opensearch.common.Booleans;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -218,6 +220,18 @@ public static Build readBuild(StreamInput in) throws IOException {
return new Build(type, hash, date, snapshot, version, distribution);
}

public static Build readBuildProtobuf(CodedInputStream in) throws IOException {
// the following is new for opensearch: we write the distribution to support any "forks"
final String distribution = in.readString();
// be lenient when reading on the wire, the enumeration values from other versions might be different than what we know
final Type type = Type.fromDisplayName(in.readString(), false);
String hash = in.readString();
String date = in.readString();
boolean snapshot = in.readBool();
final String version = in.readString();
return new Build(type, hash, date, snapshot, version, distribution);
}

public static void writeBuild(Build build, StreamOutput out) throws IOException {
// the following is new for opensearch: we write the distribution name to support any "forks" of the code
out.writeString(build.distribution);
Expand All @@ -230,6 +244,18 @@ public static void writeBuild(Build build, StreamOutput out) throws IOException
out.writeString(build.getQualifiedVersion());
}

public static void writeBuildProtobuf(Build build, CodedOutputStream out) throws IOException {
// the following is new for opensearch: we write the distribution name to support any "forks" of the code
out.writeStringNoTag(build.distribution);

final Type buildType = build.type();
out.writeStringNoTag(buildType.displayName());
out.writeStringNoTag(build.hash());
out.writeStringNoTag(build.date());
out.writeBoolNoTag(build.isSnapshot());
out.writeStringNoTag(build.getQualifiedVersion());
}

/**
* Get the distribution name (expected to be OpenSearch; empty if legacy; something else if forked)
* @return distribution name as a string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import org.opensearch.common.io.stream.ProtobufWriteable;

import java.io.IOException;

/**
* Base exception for a failed node
*
* @opensearch.internal
*/
public class ProtobufOpenSearchException extends RuntimeException implements ProtobufWriteable {

private String message;

public ProtobufOpenSearchException(String message) {
super(message);
this.message = message;
}

public ProtobufOpenSearchException(CodedInputStream in) throws IOException {
super(in.readString());
this.message = in.readString();
}

@Override
public void writeTo(CodedOutputStream out) throws IOException {
out.writeStringNoTag(this.getMessage());
}

public String getMessage() {
return this.message;
}

}
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch;

import com.google.protobuf.CodedInputStream;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.SuppressForbidden;
Expand Down Expand Up @@ -100,6 +101,10 @@ public static Version readVersion(StreamInput in) throws IOException {
return fromId(in.readVInt());
}

public static Version readVersionProtobuf(CodedInputStream in) throws IOException {
return fromId(in.readInt32());
}

public static Version fromId(int id) {
final Version known = LegacyESVersion.idToVersion.get(id);
if (known != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.action;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import org.opensearch.transport.ProtobufTransportRequest;

import java.io.IOException;

/**
* Base action request implemented by plugins.
*
* @opensearch.api
*/
public abstract class ProtobufActionRequest extends ProtobufTransportRequest {

public ProtobufActionRequest() {
super();
// this does not set the listenerThreaded API, if needed, its up to the caller to set it
// since most times, we actually want it to not be threaded...
// this.listenerThreaded = request.listenerThreaded();
}

public ProtobufActionRequest(CodedInputStream in) throws IOException {
super(in);
}

public abstract ActionRequestValidationException validate();

/**
* Should this task store its result after it has finished?
*/
public boolean getShouldStoreResult() {
return false;
}

@Override
public void writeTo(CodedOutputStream out) throws IOException {
super.writeTo(out);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import org.opensearch.client.OpenSearchClient;
import org.opensearch.client.ProtobufOpenSearchClient;
import org.opensearch.common.unit.TimeValue;

import java.util.Objects;

/**
* Base Action Request Builder
*
* @opensearch.api
*/
public abstract class ProtobufActionRequestBuilder<Request extends ProtobufActionRequest, Response extends ProtobufActionResponse> {

protected final ProtobufActionType<Response> action;
protected final Request request;
protected final ProtobufOpenSearchClient client;

protected ProtobufActionRequestBuilder(ProtobufOpenSearchClient client, ProtobufActionType<Response> action, Request request) {
Objects.requireNonNull(action, "action must not be null");
this.action = action;
this.request = request;
this.client = client;
}

public Request request() {
return this.request;
}

public ActionFuture<Response> execute() {
return client.execute(action, request);
}

/**
* Short version of execute().actionGet().
*/
public Response get() {
return execute().actionGet();
}

/**
* Short version of execute().actionGet().
*/
public Response get(TimeValue timeout) {
return execute().actionGet(timeout);
}

/**
* Short version of execute().actionGet().
*/
public Response get(String timeout) {
return execute().actionGet(timeout);
}

public void execute(ActionListener<Response> listener) {
client.execute(action, request, listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.action;

import com.google.protobuf.CodedInputStream;
import org.opensearch.transport.ProtobufTransportResponse;

import java.io.IOException;

/**
* Base class for responses to action requests implemented by plugins.
*
* @opensearch.api
*/
public abstract class ProtobufActionResponse extends ProtobufTransportResponse {

public ProtobufActionResponse() {}

public ProtobufActionResponse(CodedInputStream in) throws IOException {
super(in);
}
}
65 changes: 65 additions & 0 deletions server/src/main/java/org/opensearch/action/ProtobufActionType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import org.opensearch.common.io.stream.ProtobufWriteable;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.settings.Settings;
import org.opensearch.transport.TransportRequestOptions;

/**
* A generic action. Should strive to make it a singleton.
*
* @opensearch.api
*/
public class ProtobufActionType<Response extends ProtobufActionResponse> {

private final String name;
private final ProtobufWriteable.Reader<Response> responseReader;

/**
* @param name The name of the action, must be unique across actions.
* @param responseReader A reader for the response type
*/
public ProtobufActionType(String name, ProtobufWriteable.Reader<Response> responseReader) {
this.name = name;
this.responseReader = responseReader;
}

/**
* The name of the action. Must be unique across actions.
*/
public String name() {
return this.name;
}

/**
* Get a reader that can create a new instance of the class from a {@link org.opensearch.common.io.stream.StreamInput}
*/
public ProtobufWriteable.Reader<Response> getResponseReader() {
return responseReader;
}

/**
* Optional request options for the action.
*/
public TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.EMPTY;
}

@Override
public boolean equals(Object o) {
return o instanceof ProtobufActionType && name.equals(((ProtobufActionType<?>) o).name());
}

@Override
public int hashCode() {
return name.hashCode();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import org.opensearch.ProtobufOpenSearchException;
import org.opensearch.common.io.stream.ProtobufWriteable;

import java.io.IOException;

/**
* Base exception for a failed node
*
* @opensearch.internal
*/
public class ProtobufFailedNodeException extends ProtobufOpenSearchException implements ProtobufWriteable {

private final String nodeId;

public ProtobufFailedNodeException(String nodeId, String msg, Throwable cause) {
super(msg);
this.nodeId = nodeId;
}

public String nodeId() {
return this.nodeId;
}

public ProtobufFailedNodeException(CodedInputStream in) throws IOException {
super(in);
nodeId = in.readString();
}

@Override
public void writeTo(CodedOutputStream out) throws IOException {
super.writeTo(out);
out.writeStringNoTag(nodeId);
}
}
Loading

0 comments on commit 83b5f49

Please sign in to comment.