Skip to content

Commit

Permalink
fix: Avoid blocking thread in AsyncResultSet
Browse files Browse the repository at this point in the history
  • Loading branch information
sakthivelmanii committed Nov 4, 2024
1 parent 7096899 commit 3089e0b
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 55 deletions.
6 changes: 5 additions & 1 deletion google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -790,5 +790,9 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isAutoBatchDmlUpdateCountVerification()</method>
</difference>

<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ResultSet</className>
<method>boolean initiateStreaming(com.google.cloud.spanner.AsyncResultSet$StreamMessageListener)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,9 @@ ResultSet executeQueryInternalWithOptions(
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamListener) {
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamListener) {
GrpcStreamIterator stream =
new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed);
stream.registerListener(streamListener);
Expand Down Expand Up @@ -961,8 +962,9 @@ ResultSet readInternalWithOptions(
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamListener) {
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamListener) {
GrpcStreamIterator stream =
new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed);
stream.registerListener(streamListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.api.core.ApiFuture;
import com.google.common.base.Function;
import com.google.spanner.v1.PartialResultSet;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -227,16 +226,20 @@ interface ReadyCallback {
<T> List<T> toList(Function<StructReader, T> transformer) throws SpannerException;

/**
* An interface to register the listener for streaming gRPC request. It will be called when a chunk is received
* from gRPC streaming call.
* An interface to register the listener for streaming gRPC request. It will be called when a
* chunk is received from gRPC streaming call.
*/
interface StreamMessageListener {
void onStreamMessage(PartialResultSet partialResultSet, int prefetchChunks, int currentBufferSize, StreamMessageRequestor streamMessageRequestor);
void onStreamMessage(
PartialResultSet partialResultSet,
int prefetchChunks,
int currentBufferSize,
StreamMessageRequestor streamMessageRequestor);
}

/**
* An interface to request more messages from the gRPC streaming call. It will be implemented by the class which has access
* to SpannerRpc.StreamingCall object
* An interface to request more messages from the gRPC streaming call. It will be implemented by
* the class which has access to SpannerRpc.StreamingCall object
*/
interface StreamMessageRequestor {
void requestMessages(int numOfMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -40,10 +39,10 @@
import java.util.logging.Logger;

/** Default implementation for {@link AsyncResultSet}. */
class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet, AsyncResultSet.StreamMessageListener {
class AsyncResultSetImpl extends ForwardingStructReader
implements ListenableAsyncResultSet, AsyncResultSet.StreamMessageListener {
private static final Logger log = Logger.getLogger(AsyncResultSetImpl.class.getName());


/** State of an {@link AsyncResultSetImpl}. */
private enum State {
INITIALIZED,
Expand Down Expand Up @@ -112,6 +111,9 @@ private enum State {

private State state = State.INITIALIZED;

/** This variable indicates that produce rows thread is initiated */
private volatile boolean produceRowsInitiated;

/**
* This variable indicates whether all the results from the underlying result set have been read.
*/
Expand Down Expand Up @@ -458,7 +460,7 @@ private class InitiateStreamingRunnable implements Runnable {
@Override
public void run() {
try {
if(!initiateStreaming(AsyncResultSetImpl.this)) {
if (!initiateStreaming(AsyncResultSetImpl.this)) {
initiateProduceRows();
}
} catch (SpannerException e) {
Expand Down Expand Up @@ -489,7 +491,10 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {

private void initiateProduceRows() {
this.service.execute(new ProduceRowsRunnable());
this.state = State.RUNNING;
if (this.state == State.IN_PROGRESS) {
this.state = State.RUNNING;
}
produceRowsInitiated = true;
}

Future<Void> getResult() {
Expand All @@ -504,7 +509,6 @@ public void cancel() {
"cannot cancel a result set without a callback");
state = State.CANCELLED;
pausedLatch.countDown();
this.result.setException(CANCELLED_EXCEPTION);
}
}

Expand Down Expand Up @@ -625,18 +629,25 @@ public Struct getCurrentRowAsStruct() {
}

@Override
public void onStreamMessage(PartialResultSet partialResultSet, int prefetchChunks, int currentBufferSize, StreamMessageRequestor streamMessageRequestor) {
public void onStreamMessage(
PartialResultSet partialResultSet,
int prefetchChunks,
int currentBufferSize,
StreamMessageRequestor streamMessageRequestor) {
synchronized (monitor) {
if (state == State.IN_PROGRESS) {
// if PartialResultSet contains resume token or buffer size is more than configured size or we have reached
// end of stream, we can start the thread
boolean startJobThread = !partialResultSet.getResumeToken().isEmpty()
|| currentBufferSize > prefetchChunks || partialResultSet == GrpcStreamIterator.END_OF_STREAM;
if (startJobThread){
initiateProduceRows();
} else {
streamMessageRequestor.requestMessages(1);
}
if (produceRowsInitiated) {
return;
}
// if PartialResultSet contains resume token or buffer size is more than configured size or
// we have reached end of stream, we can start the thread
boolean startJobThread =
!partialResultSet.getResumeToken().isEmpty()
|| currentBufferSize > prefetchChunks
|| partialResultSet == GrpcStreamIterator.END_OF_STREAM;
if (startJobThread || state != State.IN_PROGRESS) {
initiateProduceRows();
} else {
streamMessageRequestor.requestMessages(1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,6 @@ public ResultSetMetadata getMetadata() {

@Override
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
return delegate.get().initiateStreaming(streamMessageListener);
return delegate.get().initiateStreaming(streamMessageListener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@
import com.google.common.collect.AbstractIterator;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.spanner.v1.PartialResultSet;
import org.threeten.bp.Duration;

import javax.annotation.Nullable;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/** Adapts a streaming read/query call into an iterator over partial result sets. */
@VisibleForTesting
Expand Down Expand Up @@ -199,7 +198,7 @@ public boolean cancelQueryWhenClientIsClosed() {
}

private void onStreamMessage(PartialResultSet partialResultSet) {
Optional.ofNullable(streamMessageListener).ifPresent(
sl -> sl.onStreamMessage(partialResultSet, prefetchChunks, stream.size(), this));
Optional.ofNullable(streamMessageListener)
.ifPresent(sl -> sl.onStreamMessage(partialResultSet, prefetchChunks, stream.size(), this));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ default ResultSetMetadata getMetadata() {
throw new UnsupportedOperationException("Method should be overridden");
}

/**
* Returns the {@link boolean} for this {@link ResultSet}. This method will be used by
* AsyncResultSet to initiate gRPC streaming
*/
default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.grpc.GrpcStatusCode;
Expand All @@ -30,8 +35,6 @@
import com.google.spanner.v1.PartialResultSet;
import io.grpc.Context;
import io.opentelemetry.api.common.Attributes;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Objects;
Expand All @@ -41,11 +44,7 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import javax.annotation.Nullable;

/**
* Wraps an iterator over partial result sets, supporting resuming RPCs on error. This class keeps
Expand Down Expand Up @@ -198,8 +197,8 @@ public void execute(Runnable command) {
}
}

abstract CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamMessageListener);
abstract CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken, AsyncResultSet.StreamMessageListener streamMessageListener);

/**
* Prepares the iterator for a retry on a different gRPC channel. Returns true if that is
Expand All @@ -226,7 +225,7 @@ public boolean isWithBeginTransaction() {
@Override
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
this.streamMessageListener = streamMessageListener;
eagerStartStreaming();
startGrpcStreaming();
return true;
}

Expand All @@ -236,7 +235,7 @@ protected PartialResultSet computeNext() {
Context context = Context.current();
while (true) {
// Eagerly start stream before consuming any buffered items.
eagerStartStreaming();
startGrpcStreaming();
// Buffer contains items up to a resume token or has reached capacity: flush.
if (!buffer.isEmpty()
&& (finished || !safeToRetry || !buffer.getLast().getResumeToken().isEmpty())) {
Expand Down Expand Up @@ -315,12 +314,12 @@ && prepareIteratorForRetryOnDifferentGrpcChannel()) {
}
}

private void eagerStartStreaming() {
private void startGrpcStreaming() {
if (stream == null) {
span.addAnnotation(
"Starting/Resuming stream",
"ResumeToken",
resumeToken == null ? "null" : resumeToken.toStringUtf8());
"Starting/Resuming stream",
"ResumeToken",
resumeToken == null ? "null" : resumeToken.toStringUtf8());
try (IScope scope = tracer.withSpan(span)) {
// When start a new stream set the Span as current to make the gRPC Span a child of
// this Span.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ public boolean next() throws SpannerException {
}

@Override
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
public boolean initiateStreaming(
AsyncResultSet.StreamMessageListener streamMessageListener) {
try {
boolean ret = super.initiateStreaming(streamMessageListener);
if (beforeFirst) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -32,6 +33,7 @@
import com.google.cloud.spanner.AsyncResultSet.ReadyCallback;
import com.google.common.base.Function;
import com.google.common.collect.Range;
import com.google.spanner.v1.PartialResultSet;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
Expand All @@ -48,8 +50,6 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

Expand Down Expand Up @@ -390,6 +390,13 @@ public void testCallbackIsNotCalledWhilePausedAndCanceled()

try (AsyncResultSetImpl rs =
new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {

when(delegate.initiateStreaming(any(AsyncResultSet.StreamMessageListener.class)))
.thenAnswer(
answer -> {
rs.onStreamMessage(PartialResultSet.newBuilder().build(), 4, 1, null);
return null;
});
callbackResult =
rs.setCallback(
executor,
Expand All @@ -402,7 +409,7 @@ public void testCallbackIsNotCalledWhilePausedAndCanceled()

SpannerException exception = assertThrows(SpannerException.class, () -> get(callbackResult));
assertEquals(ErrorCode.CANCELLED, exception.getErrorCode());
assertEquals(0, callbackCounter.get());
assertEquals(1, callbackCounter.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@
public class ResumableStreamIteratorTest {
interface Starter {
AbstractResultSet.CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken, AsyncResultSet.StreamMessageListener streamMessageListener);
@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamMessageListener);
}

interface ResultSetStream {
Expand Down Expand Up @@ -164,7 +165,8 @@ private void initWithLimit(int maxBufferSize) {
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()) {
@Override
AbstractResultSet.CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken, AsyncResultSet.StreamMessageListener streamMessageListener) {
@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamMessageListener) {
return starter.startStream(resumeToken, null);
}
};
Expand Down

0 comments on commit 3089e0b

Please sign in to comment.