Skip to content

Commit

Permalink
fix(spanner): Avoid blocking thread in AsyncResultSet (#3446)
Browse files Browse the repository at this point in the history
* fix: Avoid blocking thread in AsyncResultSet

* Addressed comments

* Addressed comments

* Addressed comments

* Addressed comments

* Addressed comments
  • Loading branch information
sakthivelmanii authored Nov 11, 2024
1 parent fa18894 commit 7c82f1c
Show file tree
Hide file tree
Showing 13 changed files with 333 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -768,9 +768,14 @@ ResultSet executeQueryInternalWithOptions(
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamListener) {
GrpcStreamIterator stream =
new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed);
if (streamListener != null) {
stream.registerListener(streamListener);
}
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
Expand All @@ -791,8 +796,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
getTransactionChannelHint(),
isRouteToLeader());
session.markUsed(clock.instant());
call.request(prefetchChunks);
stream.setCall(call, request.getTransaction().hasBegin());
call.request(prefetchChunks);
return stream;
}

Expand Down Expand Up @@ -959,9 +964,14 @@ ResultSet readInternalWithOptions(
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamListener) {
GrpcStreamIterator stream =
new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed);
if (streamListener != null) {
stream.registerListener(streamListener);
}
TransactionSelector selector = null;
if (resumeToken != null) {
builder.setResumeToken(resumeToken);
Expand All @@ -980,8 +990,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
getTransactionChannelHint(),
isRouteToLeader());
session.markUsed(clock.instant());
call.request(prefetchChunks);
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
call.request(prefetchChunks);
return stream;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ interface CloseableIterator<T> extends Iterator<T> {
void close(@Nullable String message);

boolean isWithBeginTransaction();

/**
* @param streamMessageListener A class object which implements StreamMessageListener
* @return true if streaming is supported by the iterator, otherwise false
*/
default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
return false;
}
}

static double valueProtoToFloat64(com.google.protobuf.Value proto) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.api.core.ApiFuture;
import com.google.common.base.Function;
import com.google.spanner.v1.PartialResultSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -223,4 +224,12 @@ interface ReadyCallback {
* @param transformer function which will be used to transform the row. It should not return null.
*/
<T> List<T> toList(Function<StructReader, T> transformer) throws SpannerException;

/**
* An interface to register the listener for streaming gRPC request. It will be called when a
* chunk is received from gRPC streaming call.
*/
interface StreamMessageListener {
void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsFull);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ListenableFutureToApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.spanner.AbstractReadContext.ListenableAsyncResultSet;
Expand All @@ -29,13 +28,13 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
Expand All @@ -45,12 +44,14 @@
import java.util.logging.Logger;

/** Default implementation for {@link AsyncResultSet}. */
class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet {
class AsyncResultSetImpl extends ForwardingStructReader
implements ListenableAsyncResultSet, AsyncResultSet.StreamMessageListener {
private static final Logger log = Logger.getLogger(AsyncResultSetImpl.class.getName());

/** State of an {@link AsyncResultSetImpl}. */
private enum State {
INITIALIZED,
STREAMING_INITIALIZED,
/** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
SYNC,
CONSUMING,
Expand Down Expand Up @@ -115,12 +116,15 @@ private enum State {

private State state = State.INITIALIZED;

/** This variable indicates that produce rows thread is initiated */
private volatile boolean produceRowsInitiated;

/**
* This variable indicates whether all the results from the underlying result set have been read.
*/
private volatile boolean finished;

private volatile ApiFuture<Void> result;
private volatile SettableApiFuture<Void> result;

/**
* This variable indicates whether {@link #tryNext()} has returned {@link CursorState#DONE} or a
Expand Down Expand Up @@ -329,12 +333,12 @@ public void run() {
private final CallbackRunnable callbackRunnable = new CallbackRunnable();

/**
* {@link ProduceRowsCallable} reads data from the underlying {@link ResultSet}, places these in
* {@link ProduceRowsRunnable} reads data from the underlying {@link ResultSet}, places these in
* the buffer and dispatches the {@link CallbackRunnable} when data is ready to be consumed.
*/
private class ProduceRowsCallable implements Callable<Void> {
private class ProduceRowsRunnable implements Runnable {
@Override
public Void call() throws Exception {
public void run() {
boolean stop = false;
boolean hasNext = false;
try {
Expand Down Expand Up @@ -393,12 +397,17 @@ public Void call() throws Exception {
}
// Call the callback if there are still rows in the buffer that need to be processed.
while (!stop) {
waitIfPaused();
startCallbackIfNecessary();
// Make sure we wait until the callback runner has actually finished.
consumingLatch.await();
synchronized (monitor) {
stop = cursorReturnedDoneOrException;
try {
waitIfPaused();
startCallbackIfNecessary();
// Make sure we wait until the callback runner has actually finished.
consumingLatch.await();
synchronized (monitor) {
stop = cursorReturnedDoneOrException;
}
} catch (Throwable e) {
result.setException(e);
return;
}
}
} finally {
Expand All @@ -410,14 +419,14 @@ public Void call() throws Exception {
}
synchronized (monitor) {
if (executionException != null) {
throw executionException;
}
if (state == State.CANCELLED) {
throw CANCELLED_EXCEPTION;
result.setException(executionException);
} else if (state == State.CANCELLED) {
result.setException(CANCELLED_EXCEPTION);
} else {
result.set(null);
}
}
}
return null;
}

private void waitIfPaused() throws InterruptedException {
Expand Down Expand Up @@ -449,6 +458,26 @@ private void startCallbackWithBufferLatchIfNecessary(int bufferLatch) {
}
}

private class InitiateStreamingRunnable implements Runnable {

@Override
public void run() {
try {
// This method returns true if the underlying result set is a streaming result set (e.g. a
// GrpcResultSet).
// Those result sets will trigger initiateProduceRows() when the first results are received.
// Non-streaming result sets do not trigger this callback, and for those result sets, we
// need to eagerly start the ProduceRowsRunnable.
if (!initiateStreaming(AsyncResultSetImpl.this)) {
initiateProduceRows();
}
} catch (Throwable exception) {
executionException = SpannerExceptionFactory.asSpannerException(exception);
initiateProduceRows();
}
}
}

/** Sets the callback for this {@link AsyncResultSet}. */
@Override
public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
Expand All @@ -458,16 +487,24 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
this.state == State.INITIALIZED, "callback may not be set multiple times");

// Start to fetch data and buffer these.
this.result =
new ListenableFutureToApiFuture<>(this.service.submit(new ProduceRowsCallable()));
this.result = SettableApiFuture.create();
this.state = State.STREAMING_INITIALIZED;
this.service.execute(new InitiateStreamingRunnable());
this.executor = MoreExecutors.newSequentialExecutor(Preconditions.checkNotNull(exec));
this.callback = Preconditions.checkNotNull(cb);
this.state = State.RUNNING;
pausedLatch.countDown();
return result;
}
}

private void initiateProduceRows() {
if (this.state == State.STREAMING_INITIALIZED) {
this.state = State.RUNNING;
}
produceRowsInitiated = true;
this.service.execute(new ProduceRowsRunnable());
}

Future<Void> getResult() {
return result;
}
Expand Down Expand Up @@ -578,6 +615,10 @@ public ResultSetMetadata getMetadata() {
return delegateResultSet.get().getMetadata();
}

boolean initiateStreaming(StreamMessageListener streamMessageListener) {
return StreamingUtil.initiateStreaming(delegateResultSet.get(), streamMessageListener);
}

@Override
protected void checkValidState() {
synchronized (monitor) {
Expand All @@ -593,4 +634,22 @@ public Struct getCurrentRowAsStruct() {
checkValidState();
return currentRow;
}

@Override
public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsFull) {
synchronized (monitor) {
if (produceRowsInitiated) {
return;
}
// if PartialResultSet contains a resume token or buffer size is full, or
// we have reached the end of the stream, we can start the thread.
boolean startJobThread =
!partialResultSet.getResumeToken().isEmpty()
|| bufferIsFull
|| partialResultSet == GrpcStreamIterator.END_OF_STREAM;
if (startJobThread || state != State.STREAMING_INITIALIZED) {
initiateProduceRows();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package com.google.cloud.spanner;

import com.google.api.core.InternalApi;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;

/** Forwarding implementation of ResultSet that forwards all calls to a delegate. */
public class ForwardingResultSet extends ForwardingStructReader implements ProtobufResultSet {
public class ForwardingResultSet extends ForwardingStructReader
implements ProtobufResultSet, StreamingResultSet {

private Supplier<? extends ResultSet> delegate;

Expand Down Expand Up @@ -102,4 +104,10 @@ public ResultSetStats getStats() {
public ResultSetMetadata getMetadata() {
return delegate.get().getMetadata();
}

@Override
@InternalApi
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
return StreamingUtil.initiateStreaming(delegate.get(), streamMessageListener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.core.InternalApi;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Value;
import com.google.spanner.v1.PartialResultSet;
Expand All @@ -30,7 +31,8 @@
import javax.annotation.Nullable;

@VisibleForTesting
class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufResultSet {
class GrpcResultSet extends AbstractResultSet<List<Object>>
implements ProtobufResultSet, StreamingResultSet {
private final GrpcValueIterator iterator;
private final Listener listener;
private final DecodeMode decodeMode;
Expand Down Expand Up @@ -123,6 +125,12 @@ public ResultSetMetadata getMetadata() {
return metadata;
}

@Override
@InternalApi
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
return iterator.initiateStreaming(streamMessageListener);
}

@Override
public void close() {
synchronized (this) {
Expand Down
Loading

0 comments on commit 7c82f1c

Please sign in to comment.