Skip to content

Commit

Permalink
fix: fix tests and argument checks (#1833)
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf authored Jul 10, 2023
1 parent 8d0b546 commit cb160af
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 25 deletions.
2 changes: 1 addition & 1 deletion google-cloud-bigtable-deps-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-shared-dependencies</artifactId>
<version>3.12.0</version>
<version>3.13.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,28 +294,28 @@ public boolean isServerInitiatedFlowControlEnabled() {
@Override
public BigtableBatchingCallSettings build() {
Preconditions.checkState(batchingSettings != null, "batchingSettings must be set");
FlowControlSettings defaultSettings = batchingSettings.getFlowControlSettings();
FlowControlSettings flowControlSettings = batchingSettings.getFlowControlSettings();
Preconditions.checkState(
defaultSettings.getMaxOutstandingElementCount() != null,
flowControlSettings.getMaxOutstandingElementCount() != null,
"maxOutstandingElementCount must be set in BatchingSettings#FlowControlSettings");
Preconditions.checkState(
defaultSettings.getMaxOutstandingRequestBytes() != null,
flowControlSettings.getMaxOutstandingRequestBytes() != null,
"maxOutstandingRequestBytes must be set in BatchingSettings#FlowControlSettings");
Preconditions.checkArgument(
batchingSettings.getElementCountThreshold() == null
|| defaultSettings.getMaxOutstandingElementCount()
>= batchingSettings.getElementCountThreshold(),
"if elementCountThreshold is set in BatchingSettings, maxOutstandingElementCount must be >= elementCountThreshold");
|| flowControlSettings.getMaxOutstandingElementCount()
> batchingSettings.getElementCountThreshold(),
"if batch elementCountThreshold is set in BatchingSettings, flow control maxOutstandingElementCount must be > elementCountThreshold");
Preconditions.checkArgument(
batchingSettings.getRequestByteThreshold() == null
|| defaultSettings.getMaxOutstandingRequestBytes()
>= batchingSettings.getRequestByteThreshold(),
"if requestByteThreshold is set in BatchingSettings, getMaxOutstandingRequestBytes must be >= getRequestByteThreshold");
|| flowControlSettings.getMaxOutstandingRequestBytes()
> batchingSettings.getRequestByteThreshold(),
"if batch requestByteThreshold is set in BatchingSettings, flow control maxOutstandingRequestBytes must be > getRequestByteThreshold");
// Combine static FlowControlSettings with latency based throttling settings to create
// DynamicFlowControlSettings.
if (isLatencyBasedThrottlingEnabled()) {
long maxThrottlingElementCount = defaultSettings.getMaxOutstandingElementCount();
long maxThrottlingRequestByteCount = defaultSettings.getMaxOutstandingRequestBytes();
long maxThrottlingElementCount = flowControlSettings.getMaxOutstandingElementCount();
long maxThrottlingRequestByteCount = flowControlSettings.getMaxOutstandingRequestBytes();
// The maximum in flight element count is pretty high. Set the initial parallelism to 25%
// of the maximum and then work up or down. This reduction should reduce the
// impacts of a bursty job, such as those found in Dataflow.
Expand All @@ -332,7 +332,7 @@ public BigtableBatchingCallSettings build() {
}
dynamicFlowControlSettings =
DynamicFlowControlSettings.newBuilder()
.setLimitExceededBehavior(defaultSettings.getLimitExceededBehavior())
.setLimitExceededBehavior(flowControlSettings.getLimitExceededBehavior())
.setInitialOutstandingElementCount(initialElementCount)
.setMaxOutstandingElementCount(maxThrottlingElementCount)
.setMinOutstandingElementCount(minElementCount)
Expand All @@ -343,13 +343,15 @@ public BigtableBatchingCallSettings build() {
} else {
dynamicFlowControlSettings =
DynamicFlowControlSettings.newBuilder()
.setLimitExceededBehavior(defaultSettings.getLimitExceededBehavior())
.setInitialOutstandingElementCount(defaultSettings.getMaxOutstandingElementCount())
.setMaxOutstandingElementCount(defaultSettings.getMaxOutstandingElementCount())
.setMinOutstandingElementCount(defaultSettings.getMaxOutstandingElementCount())
.setInitialOutstandingRequestBytes(defaultSettings.getMaxOutstandingRequestBytes())
.setMinOutstandingRequestBytes(defaultSettings.getMaxOutstandingRequestBytes())
.setMaxOutstandingRequestBytes(defaultSettings.getMaxOutstandingRequestBytes())
.setLimitExceededBehavior(flowControlSettings.getLimitExceededBehavior())
.setInitialOutstandingElementCount(
flowControlSettings.getMaxOutstandingElementCount())
.setMaxOutstandingElementCount(flowControlSettings.getMaxOutstandingElementCount())
.setMinOutstandingElementCount(flowControlSettings.getMaxOutstandingElementCount())
.setInitialOutstandingRequestBytes(
flowControlSettings.getMaxOutstandingRequestBytes())
.setMinOutstandingRequestBytes(flowControlSettings.getMaxOutstandingRequestBytes())
.setMaxOutstandingRequestBytes(flowControlSettings.getMaxOutstandingRequestBytes())
.build();
}
return new BigtableBatchingCallSettings(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,6 @@ public UnaryCallSettings.Builder<PingAndWarmRequest, Void> pingAndWarmSettings()
public EnhancedBigtableStubSettings build() {
Preconditions.checkState(projectId != null, "Project id must be set");
Preconditions.checkState(instanceId != null, "Instance id must be set");

if (isRefreshingChannel) {
Preconditions.checkArgument(
getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ public void testFlowControlMandatorySettings() {
BatchingSettings.newBuilder()
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(10L)
.setMaxOutstandingRequestBytes(10L)
.setMaxOutstandingElementCount(11L)
.setMaxOutstandingRequestBytes(11L)
.build())
.setElementCountThreshold(10L)
.setRequestByteThreshold(10L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ public void sendMessage(ReqT message) {
.setDelayThreshold(Duration.ofHours(1))
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount((long) batchElementCount)
.setMaxOutstandingRequestBytes(1000L)
.setMaxOutstandingElementCount((long) batchElementCount + 1)
.setMaxOutstandingRequestBytes(1001L)
.build())
.build());
stubSettingsBuilder.setTracerFactory(mockFactory);
Expand Down Expand Up @@ -478,6 +478,9 @@ public void testBatchBlockingLatencies() throws InterruptedException {
batcher.add(RowMutationEntry.create("key").setCell("f", "q", "v"));
}

// closing the batcher to trigger the third flush
batcher.close();

int expectedNumRequests = 6 / batchElementCount;
ArgumentCaptor<Long> throttledTime = ArgumentCaptor.forClass(Long.class);
verify(statsRecorderWrapper, timeout(1000).times(expectedNumRequests))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

import com.google.api.gax.batching.BatchResource;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatcherImpl;
import com.google.api.gax.batching.BatchingDescriptor;
Expand Down Expand Up @@ -422,6 +423,8 @@ public Object answer(InvocationOnMock invocation) {
public void testBatchMutateRowsThrottledTime() throws Exception {
FlowController flowController = Mockito.mock(FlowController.class);
BatchingDescriptor batchingDescriptor = Mockito.mock(MutateRowsBatchingDescriptor.class);
when(batchingDescriptor.createResource(any())).thenReturn(new FakeBatchResource());
when(batchingDescriptor.createEmptyResource()).thenReturn(new FakeBatchResource());
// Mock throttling
final long throttled = 50;
doAnswer(
Expand Down Expand Up @@ -486,4 +489,29 @@ public Object answer(InvocationOnMock invocation) {
private static <T> StreamObserver<T> anyObserver(Class<T> returnType) {
return (StreamObserver<T>) any(returnType);
}

private class FakeBatchResource implements BatchResource {

FakeBatchResource() {}

@Override
public BatchResource add(BatchResource resource) {
return new FakeBatchResource();
}

@Override
public long getElementCount() {
return 1;
}

@Override
public long getByteCount() {
return 1;
}

@Override
public boolean shouldFlush(long maxElementThreshold, long maxBytesThreshold) {
return false;
}
}
}

0 comments on commit cb160af

Please sign in to comment.