From 266558ab0b48f137fea199bc9d0586ae2fc17dc2 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 14 Mar 2023 15:16:31 -0500 Subject: [PATCH] Provide config service methods to java clients (#3532) Refactors existing java-client gRPC response observers into one shared implementation, so that all can be canceled. Fixes #3528 --- .../deephaven/client/impl/ConfigService.java | 24 ++ .../io/deephaven/client/impl/Session.java | 3 +- .../io/deephaven/client/impl/SessionImpl.java | 380 +++--------------- .../client/impl/UnaryGrpcFuture.java | 93 +++++ 4 files changed, 177 insertions(+), 323 deletions(-) create mode 100644 java-client/session/src/main/java/io/deephaven/client/impl/ConfigService.java create mode 100644 java-client/session/src/main/java/io/deephaven/client/impl/UnaryGrpcFuture.java diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/ConfigService.java b/java-client/session/src/main/java/io/deephaven/client/impl/ConfigService.java new file mode 100644 index 00000000000..1badab197a1 --- /dev/null +++ b/java-client/session/src/main/java/io/deephaven/client/impl/ConfigService.java @@ -0,0 +1,24 @@ +package io.deephaven.client.impl; + +import io.deephaven.proto.backplane.grpc.ConfigValue; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Provides server-specified configuration values to gRPC clients. + */ +public interface ConfigService { + /** + * Returns constants from the server that may be helpful to correctly authenticate with the server. As such, + * authentication is not required to obtain these values. + */ + CompletableFuture> getAuthenticationConstants(); + + /** + * Returns constants from the server that are specified as being appropriate for clients to read. By default these + * include values like the suggested authentication token refresh interval, and the server-side version of + * deephaven, barrage, and java. + */ + CompletableFuture> getConfigurationConstants(); +} diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/Session.java b/java-client/session/src/main/java/io/deephaven/client/impl/Session.java index 4b7c4259481..4a1dbaa2ade 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/Session.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/Session.java @@ -11,7 +11,8 @@ * A session represents a client-side connection to a Deephaven server. */ public interface Session - extends AutoCloseable, ApplicationService, ConsoleService, InputTableService, ObjectService, TableService { + extends AutoCloseable, ApplicationService, ConsoleService, InputTableService, ObjectService, TableService, + ConfigService { // ---------------------------------------------------------- diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java b/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java index 15810e208f1..7378c00cde6 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java @@ -7,28 +7,22 @@ import io.deephaven.client.impl.script.Changes; import io.deephaven.proto.DeephavenChannel; import io.deephaven.proto.backplane.grpc.AddTableRequest; -import io.deephaven.proto.backplane.grpc.AddTableResponse; -import io.deephaven.proto.backplane.grpc.CloseSessionResponse; +import io.deephaven.proto.backplane.grpc.AuthenticationConstantsRequest; +import io.deephaven.proto.backplane.grpc.AuthenticationConstantsResponse; import io.deephaven.proto.backplane.grpc.ConfigValue; import io.deephaven.proto.backplane.grpc.ConfigurationConstantsRequest; import io.deephaven.proto.backplane.grpc.ConfigurationConstantsResponse; import io.deephaven.proto.backplane.grpc.DeleteTableRequest; -import io.deephaven.proto.backplane.grpc.DeleteTableResponse; import io.deephaven.proto.backplane.grpc.FetchObjectRequest; -import io.deephaven.proto.backplane.grpc.FetchObjectResponse; import io.deephaven.proto.backplane.grpc.FieldsChangeUpdate; import io.deephaven.proto.backplane.grpc.HandshakeRequest; import io.deephaven.proto.backplane.grpc.ListFieldsRequest; import io.deephaven.proto.backplane.grpc.ReleaseRequest; -import io.deephaven.proto.backplane.grpc.ReleaseResponse; import io.deephaven.proto.backplane.grpc.Ticket; import io.deephaven.proto.backplane.grpc.TypedTicket; import io.deephaven.proto.backplane.script.grpc.BindTableToVariableRequest; -import io.deephaven.proto.backplane.script.grpc.BindTableToVariableResponse; import io.deephaven.proto.backplane.script.grpc.ExecuteCommandRequest; -import io.deephaven.proto.backplane.script.grpc.ExecuteCommandResponse; import io.deephaven.proto.backplane.script.grpc.StartConsoleRequest; -import io.deephaven.proto.backplane.script.grpc.StartConsoleResponse; import io.deephaven.proto.util.ExportTicketHelper; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; @@ -43,6 +37,7 @@ import java.nio.file.Path; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -147,9 +142,8 @@ public CompletableFuture console(String type) { final ExportId consoleId = new ExportId("Console", exportTicketCreator.createExportId()); final StartConsoleRequest request = StartConsoleRequest.newBuilder().setSessionType(type) .setResultId(consoleId.ticketId().ticket()).build(); - final ConsoleHandler handler = new ConsoleHandler(request); - bearerChannel.console().startConsole(request, handler); - return handler.future(); + return UnaryGrpcFuture.of(request, channel().console()::startConsole, + response -> new ConsoleSessionImpl(request)); } @Override @@ -157,10 +151,11 @@ public CompletableFuture publish(String name, HasTicketId ticketId) { if (!SourceVersion.isName(name)) { throw new IllegalArgumentException("Invalid name"); } - PublishObserver observer = new PublishObserver(); - bearerChannel.console().bindTableToVariable(BindTableToVariableRequest.newBuilder() - .setVariableName(name).setTableId(ticketId.ticketId().ticket()).build(), observer); - return observer.future; + BindTableToVariableRequest request = BindTableToVariableRequest.newBuilder() + .setVariableName(name) + .setTableId(ticketId.ticketId().ticket()) + .build(); + return UnaryGrpcFuture.ignoreResponse(request, channel().console()::bindTableToVariable); } @Override @@ -171,9 +166,25 @@ public CompletableFuture fetchObject(String type, HasTicketId tic .setTicket(ticketId.ticketId().ticket()) .build()) .build(); - final FetchObserver observer = new FetchObserver(); - bearerChannel.object().fetchObject(request, observer); - return observer.future; + + return UnaryGrpcFuture.of(request, channel().object()::fetchObject, + response -> { + final String responseType = response.getType(); + final ByteString data = response.getData(); + final List exportIds = response.getTypedExportIdList().stream() + .map(t -> { + final String ticketType; + if (t.getType().isEmpty()) { + ticketType = null; + } else { + ticketType = t.getType(); + } + final int exportId = ExportTicketHelper.ticketToExportId(t.getTicket(), "exportId"); + return new ExportId(ticketType, exportId); + }) + .collect(Collectors.toList()); + return new FetchedObject(responseType, data, exportIds); + }); } @Override @@ -194,9 +205,7 @@ public void close() { public CompletableFuture closeFuture() { pingJob.cancel(false); HandshakeRequest handshakeRequest = HandshakeRequest.getDefaultInstance(); - CloseSessionHandler handler = new CloseSessionHandler(); - bearerChannel.session().closeSession(handshakeRequest, handler); - return handler.future; + return UnaryGrpcFuture.ignoreResponse(handshakeRequest, channel().session()::closeSession); } @Override @@ -229,10 +238,10 @@ public ExportId newExportId() { @Override public CompletableFuture release(ExportId exportId) { - final ReleaseTicketObserver observer = new ReleaseTicketObserver(); - bearerChannel.session().release( - ReleaseRequest.newBuilder().setId(exportId.ticketId().ticket()).build(), observer); - return observer.future; + ReleaseRequest request = ReleaseRequest.newBuilder() + .setId(exportId.ticketId().ticket()) + .build(); + return UnaryGrpcFuture.ignoreResponse(request, channel().session()::release); } @Override @@ -246,9 +255,7 @@ public CompletableFuture addToInputTable(HasTicketId destination, HasTicke .setInputTable(destination.ticketId().ticket()) .setTableToAdd(source.ticketId().ticket()) .build(); - final AddToInputTableObserver observer = new AddToInputTableObserver(); - bearerChannel.inputTable().addTableToInputTable(request, observer); - return observer.future; + return UnaryGrpcFuture.ignoreResponse(request, channel().inputTable()::addTableToInputTable); } @Override @@ -257,9 +264,8 @@ public CompletableFuture deleteFromInputTable(HasTicketId destination, Has .setInputTable(destination.ticketId().ticket()) .setTableToRemove(source.ticketId().ticket()) .build(); - final DeleteFromInputTableObserver observer = new DeleteFromInputTableObserver(); - bearerChannel.inputTable().deleteTableFromInputTable(request, observer); - return observer.future; + return UnaryGrpcFuture.ignoreResponse(request, + channel().inputTable()::deleteTableFromInputTable); } @Override @@ -282,176 +288,24 @@ public long releaseCount() { return states.releaseCount(); } - private static class PublishObserver - implements ClientResponseObserver { - private final CompletableFuture future = new CompletableFuture<>(); - - @Override - public void beforeStart( - ClientCallStreamObserver requestStream) { - future.whenComplete((session, throwable) -> { - if (future.isCancelled()) { - requestStream.cancel("User cancelled", null); - } - }); - } - - @Override - public void onNext(BindTableToVariableResponse value) { - future.complete(null); - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onCompleted() { - if (!future.isDone()) { - future.completeExceptionally( - new IllegalStateException("Observer completed without response")); - } - } - } - - private static final class FetchObserver - implements ClientResponseObserver { - private final CompletableFuture future = new CompletableFuture<>(); - - @Override - public void beforeStart(ClientCallStreamObserver requestStream) { - future.whenComplete((session, throwable) -> { - if (future.isCancelled()) { - requestStream.cancel("User cancelled", null); - } - }); - } - - @Override - public void onNext(FetchObjectResponse value) { - final String type = value.getType(); - final ByteString data = value.getData(); - final List exportIds = value.getTypedExportIdList().stream() - .map(FetchObserver::toExportId) - .collect(Collectors.toList()); - future.complete(new FetchedObject(type, data, exportIds)); - } - - private static ExportId toExportId(TypedTicket e) { - final String type; - if (e.getType().isEmpty()) { - type = null; - } else { - type = e.getType(); - } - final int exportId = ExportTicketHelper.ticketToExportId(e.getTicket(), "exportId"); - return new ExportId(type, exportId); - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onCompleted() { - if (!future.isDone()) { - future.completeExceptionally( - new IllegalStateException("Observer completed without response")); - } - } - } - - private static class CloseSessionHandler implements StreamObserver { - - private final CompletableFuture future = new CompletableFuture<>(); - - @Override - public void onNext(CloseSessionResponse value) { - - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onCompleted() { - future.complete(null); - } - } - - private static class ExecuteCommandHandler implements StreamObserver { - - private final CompletableFuture future = new CompletableFuture<>(); - - private static Changes of(ExecuteCommandResponse value) { - Changes.Builder builder = Changes.builder().changes(new FieldChanges(value.getChanges())); - if (!value.getErrorMessage().isEmpty()) { - builder.errorMessage(value.getErrorMessage()); - } - return builder.build(); - } - - @Override - public void onNext(ExecuteCommandResponse value) { - future.complete(of(value)); - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onCompleted() { - if (!future.isDone()) { - future.completeExceptionally(new IllegalStateException("ExecuteCommandHandler.onNext not called")); - } - } + @Override + public CompletableFuture> getAuthenticationConstants() { + return UnaryGrpcFuture.of(AuthenticationConstantsRequest.getDefaultInstance(), + channel().config()::getAuthenticationConstants, AuthenticationConstantsResponse::getConfigValuesMap); } - private class ConsoleHandler implements StreamObserver { - private final StartConsoleRequest request; - private final CompletableFuture future; - - public ConsoleHandler(StartConsoleRequest request) { - this.request = Objects.requireNonNull(request); - this.future = new CompletableFuture<>(); - } - - CompletableFuture future() { - return future; - } - - @Override - public void onNext(StartConsoleResponse response) { - future.complete(new ConsoleSessionImpl(request, response)); - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onCompleted() { - if (!future.isDone()) { - future.completeExceptionally(new IllegalStateException("ConsoleHandler.onNext not called")); - } - } + @Override + public CompletableFuture> getConfigurationConstants() { + return UnaryGrpcFuture.of(ConfigurationConstantsRequest.getDefaultInstance(), + channel().config()::getConfigurationConstants, ConfigurationConstantsResponse::getConfigValuesMap); } private class ConsoleSessionImpl implements ConsoleSession { private final StartConsoleRequest request; - private final StartConsoleResponse response; - public ConsoleSessionImpl(StartConsoleRequest request, StartConsoleResponse response) { + public ConsoleSessionImpl(StartConsoleRequest request) { this.request = Objects.requireNonNull(request); - this.response = Objects.requireNonNull(response); } @Override @@ -479,9 +333,14 @@ public Changes executeScript(Path path) public CompletableFuture executeCodeFuture(String code) { final ExecuteCommandRequest request = ExecuteCommandRequest.newBuilder().setConsoleId(ticket()).setCode(code).build(); - final ExecuteCommandHandler handler = new ExecuteCommandHandler(); - bearerChannel.console().executeCommand(request, handler); - return handler.future; + return UnaryGrpcFuture.of(request, channel().console()::executeCommand, + response -> { + Changes.Builder builder = Changes.builder().changes(new FieldChanges(response.getChanges())); + if (!response.getErrorMessage().isEmpty()) { + builder.errorMessage(response.getErrorMessage()); + } + return builder.build(); + }); } @Override @@ -492,9 +351,10 @@ public CompletableFuture executeScriptFuture(Path path) throws IOExcept @Override public CompletableFuture closeFuture() { - final ConsoleCloseHandler handler = new ConsoleCloseHandler(); - bearerChannel.session().release(ReleaseRequest.newBuilder().setId(request.getResultId()).build(), handler); - return handler.future(); + ReleaseRequest request = ReleaseRequest.newBuilder() + .setId(this.request.getResultId()) + .build(); + return UnaryGrpcFuture.ignoreResponse(request, channel().session()::release); } @Override @@ -512,130 +372,6 @@ public void close() { } } - private static class ConsoleCloseHandler implements StreamObserver { - private final CompletableFuture future = new CompletableFuture<>(); - - CompletableFuture future() { - return future; - } - - @Override - public void onNext(ReleaseResponse value) { - future.complete(null); - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onCompleted() { - if (!future.isDone()) { - future.completeExceptionally(new IllegalStateException("ConsoleCloseHandler.onNext not called")); - } - } - } - - private static class ReleaseTicketObserver - implements ClientResponseObserver { - private final CompletableFuture future = new CompletableFuture<>(); - - @Override - public void beforeStart( - ClientCallStreamObserver requestStream) { - future.whenComplete((session, throwable) -> { - if (future.isCancelled()) { - requestStream.cancel("User cancelled", null); - } - }); - } - - @Override - public void onNext(ReleaseResponse value) { - future.complete(null); - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onCompleted() { - if (!future.isDone()) { - future.completeExceptionally( - new IllegalStateException("Observer completed without response")); - } - } - } - - private static class AddToInputTableObserver - implements ClientResponseObserver { - private final CompletableFuture future = new CompletableFuture<>(); - - @Override - public void beforeStart( - ClientCallStreamObserver requestStream) { - future.whenComplete((session, throwable) -> { - if (future.isCancelled()) { - requestStream.cancel("User cancelled", null); - } - }); - } - - @Override - public void onNext(AddTableResponse value) { - future.complete(null); - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onCompleted() { - if (!future.isDone()) { - future.completeExceptionally( - new IllegalStateException("Observer completed without response")); - } - } - } - - private static class DeleteFromInputTableObserver - implements ClientResponseObserver { - private final CompletableFuture future = new CompletableFuture<>(); - - @Override - public void beforeStart( - ClientCallStreamObserver requestStream) { - future.whenComplete((session, throwable) -> { - if (future.isCancelled()) { - requestStream.cancel("User cancelled", null); - } - }); - } - - @Override - public void onNext(DeleteTableResponse value) { - future.complete(null); - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onCompleted() { - if (!future.isDone()) { - future.completeExceptionally( - new IllegalStateException("Observer completed without response")); - } - } - } - private static class ListFieldsObserver implements Cancel, ClientResponseObserver { diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/UnaryGrpcFuture.java b/java-client/session/src/main/java/io/deephaven/client/impl/UnaryGrpcFuture.java new file mode 100644 index 00000000000..cb30d71b78d --- /dev/null +++ b/java-client/session/src/main/java/io/deephaven/client/impl/UnaryGrpcFuture.java @@ -0,0 +1,93 @@ +package io.deephaven.client.impl; + +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Function; + +/** + * ClientResponseObserver implementation that can map results into a CompletableFuture. To keep generics under control, + * has two factory methods, {@link #of(Object, BiConsumer, Function)} and {@link #ignoreResponse(Object, BiConsumer)}. + * + * @param The type of the gRPC request + * @param The type of the gRPC response + * @param The type of the result to make available in the future + */ +public class UnaryGrpcFuture implements ClientResponseObserver { + /** + * Factory method to take a request instance, a method reference to a unary call, and a mapping function to + * transform the result before supplying it to the future. + * + * @param request the gRPC request value + * @param call the unary gRPC call reference/lambda + * @param mapping a mapping function to transform the response into a result + * @param The type of the gRPC request + * @param The type of the gRPC response + * @param The type of the result to make available in the future + * @return A CompletableFuture that will succeed, fail, or be canceled. If it succeeds, will contain a result from + * the mapping function. + */ + public static CompletableFuture of(ReqT request, BiConsumer> call, + Function mapping) { + UnaryGrpcFuture observer = new UnaryGrpcFuture<>(mapping); + call.accept(request, observer); + return observer.future(); + } + + /** + * Factory method to take a request instance and a method reference to a unary call. Returns a void + * completablefuture, as many gRPC calls in Deephaven have no meaningful result, except for the fact that they + * succeeded and server-side state has been modified. + * + * @param request the gRPC request value + * @param call the unary gRPC call reference/lambda + * @param The type of the gRPC request + * @param The type of the gRPC response + * @return a CompletableFuture that will succeed, fail, or be canceled, but won't contain a value + */ + public static CompletableFuture ignoreResponse(ReqT request, + BiConsumer> call) { + return of(request, call, ignore -> null); + } + + private final CompletableFuture future = new CompletableFuture<>(); + private final Function mapping; + + private UnaryGrpcFuture(Function mapping) { + this.mapping = mapping; + } + + public CompletableFuture future() { + return future; + } + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + future.whenComplete((session, throwable) -> { + if (future.isCancelled()) { + requestStream.cancel("User cancelled", null); + } + }); + } + + @Override + public void onNext(RespT value) { + future.complete(mapping.apply(value)); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + if (!future.isDone()) { + future.completeExceptionally( + new IllegalStateException("Observer completed without response")); + } + } +}