Skip to content

Commit

Permalink
Specialize our safelyExecute calls to avoid logs
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Jan 9, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 88d1bfd commit bddef5b
Showing 19 changed files with 110 additions and 141 deletions.
Original file line number Diff line number Diff line change
@@ -146,7 +146,7 @@ public static void safelyExecute(final FunctionalInterfaces.ThrowingRunnable<Exc
*
* @param runner the runnable to execute safely
*/
public static void safelyExecuteLocked(final Object lockedObject,
private static void safelyExecuteLocked(final Object lockedObject,
final FunctionalInterfaces.ThrowingRunnable<Exception> runner) {
try {
// noinspection SynchronizationOnLocalVariableOrMethodParameter
@@ -158,6 +158,27 @@ public static void safelyExecuteLocked(final Object lockedObject,
}
}

public static <T> void safelyOnNext(StreamObserver<T> observer, T message) {
safelyExecuteLocked(observer, () -> observer.onNext(message));
}

public static <T> void safelyComplete(StreamObserver<T> observer, T message) {
safelyExecuteLocked(observer, () -> {
observer.onNext(message);
observer.onCompleted();
});
}

public static void safelyComplete(StreamObserver<?> observer) {
safelyExecuteLocked(observer, observer::onCompleted);
}

public static void safelyError(StreamObserver<?> observer, Throwable error) {
safelyExecuteLocked(observer, () -> {
observer.onError(error);
});
}

/**
* Writes an error to the observer in a try/catch block to minimize damage caused by failing observer call.
* <p>
Original file line number Diff line number Diff line change
@@ -355,7 +355,7 @@ public synchronized void close() {
if (!connected) {
return;
}
GrpcUtil.safelyExecute(observer::onCompleted);
GrpcUtil.safelyComplete(observer);
cleanup();
}

Original file line number Diff line number Diff line change
@@ -251,8 +251,7 @@ private boolean send(FieldsChangeUpdate changes) {

// must be sync wrt parent
private void notifyObserverAborted() {
GrpcUtil.safelyExecute(
() -> observer.onError(GrpcUtil.statusRuntimeException(Code.ABORTED, "subscription cancelled")));
GrpcUtil.safelyError(observer, Code.ABORTED, "subscription cancelled");
}
}

Original file line number Diff line number Diff line change
@@ -174,7 +174,7 @@ public void onNext(final InputStream request) {
resultTable.handleBarrageMessage(msg);

// no app_metadata to report; but ack the processing
GrpcUtil.safelyExecuteLocked(observer, () -> observer.onNext(Flight.PutResult.getDefaultInstance()));
GrpcUtil.safelyOnNext(observer, Flight.PutResult.getDefaultInstance());
});
}

@@ -242,7 +242,7 @@ public void onCompleted() {

// no more changes allowed; this is officially static content
localResultTable.sealTable(() -> localExportBuilder.submit(() -> {
GrpcUtil.safelyExecuteLocked(observer, observer::onCompleted);
GrpcUtil.safelyComplete(observer);
session.removeOnCloseCallback(this);
return localResultTable;
}), () -> {
@@ -693,7 +693,7 @@ public synchronized void close() {
htvs.completed();
htvs = null;
} else {
GrpcUtil.safelyExecuteLocked(listener, listener::onCompleted);
GrpcUtil.safelyComplete(listener);
}

if (preExportSubscriptions != null) {
Original file line number Diff line number Diff line change
@@ -87,12 +87,10 @@ public void onNext(final Flight.HandshakeRequest value) {

final AuthenticationRequestHandler.HandshakeResponseListener handshakeResponseListener =
(protocol, response) -> {
GrpcUtil.safelyExecute(() -> {
responseObserver.onNext(Flight.HandshakeResponse.newBuilder()
.setProtocolVersion(protocol)
.setPayload(ByteStringAccess.wrap(response))
.build());
});
GrpcUtil.safelyComplete(responseObserver, Flight.HandshakeResponse.newBuilder()
.setProtocolVersion(protocol)
.setPayload(ByteStringAccess.wrap(response))
.build());
};

final ByteString payload = value.getPayload();
Original file line number Diff line number Diff line change
@@ -951,8 +951,7 @@ public void run() {
final StatusRuntimeException apiError = GrpcUtil.securelyWrapError(log, exception);

Stream.concat(activeSubscriptions.stream(), pendingSubscriptions.stream()).distinct()
.forEach(sub -> GrpcUtil.safelyExecuteLocked(sub.listener,
() -> sub.listener.onError(apiError)));
.forEach(sub -> GrpcUtil.safelyError(sub.listener, apiError));

activeSubscriptions.clear();
pendingSubscriptions.clear();
@@ -1423,7 +1422,7 @@ private void updateSubscriptionsSnapshotAndPropagate() {
if (pendingError != null) {
for (final Subscription subscription : activeSubscriptions) {
// TODO (core#801): effective error reporting to api clients
GrpcUtil.safelyExecute(() -> subscription.listener.onError(pendingError));
GrpcUtil.safelyError(subscription.listener, pendingError);
}
}

@@ -1537,7 +1536,7 @@ private void propagateSnapshotForSubscription(
subscription.subscribedColumns));

} catch (final Exception e) {
GrpcUtil.safelyExecute(() -> subscription.listener.onError(GrpcUtil.securelyWrapError(log, e)));
GrpcUtil.safelyError(subscription.listener, GrpcUtil.securelyWrapError(log, e));
removeSubscription(subscription.listener);
}
}
Original file line number Diff line number Diff line change
@@ -82,13 +82,9 @@ public void onMessageReceived(ReqT message) {
public void onCancel() {
StatusRuntimeException canceled =
GrpcUtil.statusRuntimeException(Code.CANCELLED, "Stream canceled on the server");
GrpcUtil.safelyExecute(() -> {
responseObserver.onError(canceled);
});
GrpcUtil.safelyError(responseObserver, canceled);

GrpcUtil.safelyExecute(() -> {
requestObserver.onError(canceled);
});
GrpcUtil.safelyError(requestObserver, canceled);
}

@Override
Original file line number Diff line number Diff line change
@@ -38,8 +38,8 @@
import javax.inject.Singleton;
import java.util.Map;

import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyExecute;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyExecuteLocked;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyOnNext;

@Singleton
public class ConsoleServiceGrpcImpl extends ConsoleServiceGrpc.ConsoleServiceImplBase {
@@ -110,12 +110,9 @@ public void startConsole(StartConsoleRequest request, StreamObserver<StartConsol
.submit(() -> {
final ScriptSession scriptSession = new DelegatingScriptSession(scriptSessionProvider.get());

safelyExecute(() -> {
responseObserver.onNext(StartConsoleResponse.newBuilder()
.setResultId(request.getResultId())
.build());
responseObserver.onCompleted();
});
safelyComplete(responseObserver, StartConsoleResponse.newBuilder()
.setResultId(request.getResultId())
.build());

return scriptSession;
});
@@ -257,12 +254,14 @@ public StreamObserver<AutoCompleteRequest> autoCompleteStream(
}
if (PythonDeephavenSession.SCRIPT_TYPE.equals(scriptSessionProvider.get().scriptType())) {
PyObject[] settings = new PyObject[1];
safelyExecute(() -> {
try {
final ScriptSession scriptSession = scriptSessionProvider.get();
scriptSession.evaluateScript(
"from deephaven_internal.auto_completer import jedi_settings ; jedi_settings.set_scope(globals())");
settings[0] = (PyObject) scriptSession.getVariable("jedi_settings");
});
} catch (Exception err) {
log.error().append("Error trying to enable jedi autocomplete").append(err).endl();
}
boolean canJedi = settings[0] != null && settings[0].call("can_jedi").getBooleanValue();
log.info().append(canJedi ? "Using jedi for python autocomplete"
: "No jedi dependency available in python environment; disabling autocomplete.").endl();
@@ -284,13 +283,11 @@ public NoopAutoCompleteObserver(SessionState session, StreamObserver<AutoComplet
public void onNext(AutoCompleteRequest value) {
// This implementation only responds to autocomplete requests with "success, nothing found"
if (value.getRequestCase() == AutoCompleteRequest.RequestCase.GET_COMPLETION_ITEMS) {
safelyExecuteLocked(responseObserver, () -> {
responseObserver.onNext(AutoCompleteResponse.newBuilder()
.setCompletionItems(
GetCompletionItemsResponse.newBuilder().setSuccess(true)
.setRequestId(value.getGetCompletionItems().getRequestId()))
.build());
});
safelyOnNext(responseObserver, AutoCompleteResponse.newBuilder()
.setCompletionItems(
GetCompletionItemsResponse.newBuilder().setSuccess(true)
.setRequestId(value.getGetCompletionItems().getRequestId()))
.build());
}
}

@@ -302,7 +299,7 @@ public void onError(Throwable t) {
@Override
public void onCompleted() {
// just hang up too, browser will reconnect if interested
safelyExecuteLocked(responseObserver, responseObserver::onCompleted);
safelyComplete(responseObserver);
}
}

Original file line number Diff line number Diff line change
@@ -23,10 +23,10 @@
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyExecuteLocked;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyOnNext;

/**
* Autocomplete handling for JVM languages, that directly can interact with Java instances without any name mangling,
@@ -117,12 +117,12 @@ private void getCompletionItems(GetCompletionItemsRequest request,
if (log.isTraceEnabled()) {
log.trace().append("Completion canceled").append(exception).endl();
}
safelyExecuteLocked(responseObserver,
() -> responseObserver.onNext(AutoCompleteResponse.newBuilder()
safelyOnNext(responseObserver,
AutoCompleteResponse.newBuilder()
.setCompletionItems(GetCompletionItemsResponse.newBuilder()
.setSuccess(false)
.setRequestId(request.getRequestId()))
.build()));
.build());
return;
}

@@ -141,10 +141,10 @@ private void getCompletionItems(GetCompletionItemsRequest request,
.collect(Collectors.toSet()))
.build();

safelyExecuteLocked(responseObserver,
() -> responseObserver.onNext(AutoCompleteResponse.newBuilder()
safelyOnNext(responseObserver,
AutoCompleteResponse.newBuilder()
.setCompletionItems(mangledResults)
.build()));
.build());
} catch (Exception exception) {
if (ConsoleServiceGrpcImpl.QUIET_AUTOCOMPLETE_ERRORS) {
if (log.isTraceEnabled()) {
@@ -153,12 +153,12 @@ private void getCompletionItems(GetCompletionItemsRequest request,
} else {
log.error().append("Exception occurred during autocomplete").append(exception).endl();
}
safelyExecuteLocked(responseObserver,
() -> responseObserver.onNext(AutoCompleteResponse.newBuilder()
safelyOnNext(responseObserver,
AutoCompleteResponse.newBuilder()
.setCompletionItems(GetCompletionItemsResponse.newBuilder()
.setSuccess(false)
.setRequestId(request.getRequestId()))
.build()));
.build());
}
}

@@ -170,6 +170,6 @@ public void onError(Throwable t) {
@Override
public void onCompleted() {
// just hang up too, browser will reconnect if interested, and we'll maintain state if the session isn't gc'd
safelyExecuteLocked(responseObserver, responseObserver::onCompleted);
safelyComplete(responseObserver);
}
}
Original file line number Diff line number Diff line change
@@ -19,7 +19,8 @@
import java.util.ArrayList;
import java.util.List;

import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyExecuteLocked;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyOnNext;

/**
* Autocomplete handling for python that will use the jedi library, if it is installed.
@@ -107,12 +108,12 @@ private void getCompletionItems(GetCompletionItemsRequest request,
if (!canJedi) {
log.trace().append("Ignoring completion request because jedi is disabled").endl();
// send back an empty, failed response...
safelyExecuteLocked(responseObserver,
() -> responseObserver.onNext(AutoCompleteResponse.newBuilder()
safelyOnNext(responseObserver,
AutoCompleteResponse.newBuilder()
.setCompletionItems(GetCompletionItemsResponse.newBuilder()
.setSuccess(false)
.setRequestId(request.getRequestId()))
.build()));
.build());
return;
}
final VersionedTextDocumentIdentifier doc = request.getTextDocument();
@@ -171,10 +172,10 @@ private void getCompletionItems(GetCompletionItemsRequest request,
.build();

try {
safelyExecuteLocked(responseObserver,
() -> responseObserver.onNext(AutoCompleteResponse.newBuilder()
safelyOnNext(responseObserver,
AutoCompleteResponse.newBuilder()
.setCompletionItems(builtItems)
.build()));
.build());
} finally {
// let's track how long completions take, as it's known that some
// modules like numpy can cause slow completion, and we'll want to know what was causing them
@@ -202,12 +203,12 @@ private void getCompletionItems(GetCompletionItemsRequest request,
} else {
log.error().append("Exception occurred during autocomplete").append(exception).endl();
}
safelyExecuteLocked(responseObserver,
() -> responseObserver.onNext(AutoCompleteResponse.newBuilder()
safelyOnNext(responseObserver,
AutoCompleteResponse.newBuilder()
.setCompletionItems(GetCompletionItemsResponse.newBuilder()
.setSuccess(false)
.setRequestId(request.getRequestId()))
.build()));
.build());
if (exception instanceof Error) {
throw exception;
}
@@ -232,6 +233,6 @@ public void onError(Throwable t) {
@Override
public void onCompleted() {
// just hang up too, browser will reconnect if interested
safelyExecuteLocked(responseObserver, responseObserver::onCompleted);
safelyComplete(responseObserver);
}
}
Loading

0 comments on commit bddef5b

Please sign in to comment.