Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support lazy decoding of query results #2847

Merged
merged 4 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadCon
private TraceWrapper tracer;
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private DecodeMode defaultDecodeMode = SpannerOptions.Builder.DEFAULT_DECODE_MODE;
private DirectedReadOptions defaultDirectedReadOption;
private ExecutorProvider executorProvider;
private Clock clock = new Clock();
Expand Down Expand Up @@ -111,6 +112,11 @@ B setDefaultQueryOptions(QueryOptions defaultQueryOptions) {
return self();
}

B setDefaultDecodeMode(DecodeMode defaultDecodeMode) {
this.defaultDecodeMode = defaultDecodeMode;
return self();
}

B setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
return self();
Expand Down Expand Up @@ -411,8 +417,8 @@ void initTransaction() {
TraceWrapper tracer;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

private final DirectedReadOptions defaultDirectedReadOptions;
private final DecodeMode defaultDecodeMode;
private final Clock clock;

@GuardedBy("lock")
Expand All @@ -438,6 +444,7 @@ void initTransaction() {
this.defaultPrefetchChunks = builder.defaultPrefetchChunks;
this.defaultQueryOptions = builder.defaultQueryOptions;
this.defaultDirectedReadOptions = builder.defaultDirectedReadOption;
this.defaultDecodeMode = builder.defaultDecodeMode;
this.span = builder.span;
this.executorProvider = builder.executorProvider;
this.clock = builder.clock;
Expand Down Expand Up @@ -727,7 +734,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
return new GrpcResultSet(stream, this);
return new GrpcResultSet(
stream, this, options.hasDecodeMode() ? options.decodeMode() : defaultDecodeMode);
}

/**
Expand Down Expand Up @@ -871,7 +879,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
return new GrpcResultSet(stream, this);
return new GrpcResultSet(
stream, this, readOptions.hasDecodeMode() ? readOptions.decodeMode() : defaultDecodeMode);
}

private Struct consumeSingleRow(ResultSet resultSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks())
.setDefaultDecodeMode(sessionClient.getSpanner().getDefaultDecodeMode())
.setDefaultDirectedReadOptions(
sessionClient.getSpanner().getOptions().getDirectedReadOptions())
.setSpan(sessionClient.getSpanner().getTracer().getCurrentSpan())
Expand All @@ -81,6 +82,7 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks())
.setDefaultDecodeMode(sessionClient.getSpanner().getDefaultDecodeMode())
.setDefaultDirectedReadOptions(
sessionClient.getSpanner().getOptions().getDirectedReadOptions())
.setSpan(sessionClient.getSpanner().getTracer().getCurrentSpan())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

/** Specifies how and when to decode a value from protobuf to a plain Java object. */
public enum DecodeMode {
/**
* Decodes all columns of a row directly when a {@link ResultSet} is advanced to the next row with
* {@link ResultSet#next()}
*/
DIRECT,
/**
* Decodes all columns of a row the first time a {@link ResultSet} value is retrieved from the
* row.
*/
LAZY_PER_ROW,
/**
* Decodes a columns of a row the first time the value of that column is retrieved from the row.
*/
LAZY_PER_COL,
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.google.spanner.v1.ResultSetStats;

/** Forwarding implementation of ResultSet that forwards all calls to a delegate. */
public class ForwardingResultSet extends ForwardingStructReader implements ResultSet {
public class ForwardingResultSet extends ForwardingStructReader implements ProtobufResultSet {

private Supplier<ResultSet> delegate;

Expand Down Expand Up @@ -55,6 +55,22 @@ public boolean next() throws SpannerException {
return delegate.get().next();
}

@Override
public boolean canGetProtobufValue(int columnIndex) {
ResultSet resultSetDelegate = delegate.get();
return (resultSetDelegate instanceof ProtobufResultSet)
&& ((ProtobufResultSet) resultSetDelegate).canGetProtobufValue(columnIndex);
}

@Override
public com.google.protobuf.Value getProtobufValue(int columnIndex) {
ResultSet resultSetDelegate = delegate.get();
Preconditions.checkState(
resultSetDelegate instanceof ProtobufResultSet,
"The result set does not support protobuf values");
return ((ProtobufResultSet) resultSetDelegate).getProtobufValue(columnIndex);
}

@Override
public Struct getCurrentRowAsStruct() {
return delegate.get().getCurrentRowAsStruct();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Value;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;
Expand All @@ -28,18 +29,37 @@
import javax.annotation.Nullable;

@VisibleForTesting
class GrpcResultSet extends AbstractResultSet<List<Object>> {
class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufResultSet {
private final GrpcValueIterator iterator;
private final Listener listener;
private final DecodeMode decodeMode;
private ResultSetMetadata metadata;
private GrpcStruct currRow;
private SpannerException error;
private ResultSetStats statistics;
private boolean closed;

GrpcResultSet(CloseableIterator<PartialResultSet> iterator, Listener listener) {
this(iterator, listener, DecodeMode.DIRECT);
}

GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, DecodeMode decodeMode) {
this.iterator = new GrpcValueIterator(iterator);
this.listener = listener;
this.decodeMode = decodeMode;
}

@Override
public boolean canGetProtobufValue(int columnIndex) {
return !closed && currRow != null && currRow.canGetProtoValue(columnIndex);
}

@Override
public Value getProtobufValue(int columnIndex) {
checkState(!closed, "ResultSet is closed");
checkState(currRow != null, "next() call required");
return currRow.getProtoValueInternal(columnIndex);
}

@Override
Expand All @@ -65,7 +85,7 @@ public boolean next() throws SpannerException {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, AbstractReadContext.NO_TRANSACTION_RETURNED_MSG);
}
currRow = new GrpcStruct(iterator.type(), new ArrayList<>());
currRow = new GrpcStruct(iterator.type(), new ArrayList<>(), decodeMode);
}
boolean hasNext = currRow.consumeRow(iterator);
if (!hasNext) {
Expand Down
Loading
Loading