Skip to content

Commit

Permalink
fix: reset send timestamp each time a request is sent
Browse files Browse the repository at this point in the history
Each time a request is removed from the inflight queue its send
timestamp should be removed by setting it to null as we are no longer
waiting for a response to arrive for it.
  • Loading branch information
agrawal-siddharth committed May 11, 2024
1 parent 4600976 commit d00fc48
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,10 @@ private void appendLoop() {
hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS);
// Check whether we should error out the current append loop.
if (inflightRequestQueue.size() > 0) {
throwIfWaitCallbackTooLong(inflightRequestQueue.getFirst().requestCreationTimeStamp);
Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp;
if (sendInstant != null) {
throwIfWaitCallbackTooLong(sendInstant);
}
}

// Copy the streamConnectionIsConnected guarded by lock to a local variable.
Expand All @@ -711,7 +714,9 @@ private void appendLoop() {
// from inflightRequestQueue and prepent them onto the waitinRequestQueue. They need to be
// prepended as they need to be sent before new requests.
while (!inflightRequestQueue.isEmpty()) {
waitingRequestQueue.addFirst(inflightRequestQueue.pollLast());
AppendRequestAndResponse requestWrapper = inflightRequestQueue.pollLast();
requestWrapper.requestSendTimeStamp = null;
waitingRequestQueue.addFirst(requestWrapper);
}

// If any of the inflight messages were meant to be ignored during requestCallback, they
Expand All @@ -721,7 +726,6 @@ private void appendLoop() {
while (!this.waitingRequestQueue.isEmpty()) {
AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
waitForBackoffIfNecessary(requestWrapper);
requestWrapper.trySetRequestInsertQueueTime();
this.inflightRequestQueue.add(requestWrapper);
localQueue.addLast(requestWrapper);
}
Expand Down Expand Up @@ -760,6 +764,7 @@ private void appendLoop() {
firstRequestForTableOrSchemaSwitch = true;
}
while (!localQueue.isEmpty()) {
localQueue.peekFirst().setRequestSendQueueTime();
AppendRowsRequest originalRequest = localQueue.pollFirst().message;
AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();
// Always respect the first writer schema seen by the loop.
Expand Down Expand Up @@ -1217,6 +1222,7 @@ private void doneCallback(Throwable finalStatus) {
private AppendRequestAndResponse pollInflightRequestQueue(boolean pollLast) {
AppendRequestAndResponse requestWrapper =
pollLast ? inflightRequestQueue.pollLast() : inflightRequestQueue.poll();
requestWrapper.requestSendTimeStamp = null;
--this.inflightRequests;
this.inflightBytes -= requestWrapper.messageSize;
this.inflightReduced.signal();
Expand Down Expand Up @@ -1256,7 +1262,9 @@ static final class AppendRequestAndResponse {

TimedAttemptSettings attemptSettings;

Instant requestCreationTimeStamp;
// Time at which request was last sent over the network.
// If a response is no longer expected this is set back to null.
Instant requestSendTimeStamp;

AppendRequestAndResponse(
AppendRowsRequest message, StreamWriter streamWriter, RetrySettings retrySettings) {
Expand All @@ -1276,11 +1284,8 @@ static final class AppendRequestAndResponse {
}
}

void trySetRequestInsertQueueTime() {
// Only set the first time the caller tries to set the timestamp.
if (requestCreationTimeStamp == null) {
requestCreationTimeStamp = Instant.now();
}
void setRequestSendQueueTime() {
requestSendTimeStamp = Instant.now();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1809,6 +1809,25 @@ public void testAppendSuccessAndInternalQuotaErrorRetrySuccess() throws Exceptio
writer.close();
}

@Test
public void testInternalQuotaError_MaxWaitTimeExceed_RetrySuccess() throws Exception {
StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(4));
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1));
StreamWriter writer = getTestStreamWriterRetryEnabled();
testBigQueryWrite.addStatusException(
com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build());
testBigQueryWrite.addStatusException(
com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build());
testBigQueryWrite.addStatusException(
com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build());
testBigQueryWrite.addResponse(createAppendResponse(0));

ApiFuture<AppendRowsResponse> appendFuture1 =
writer.append(createProtoRows(new String[] {"A"}));
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
writer.close();
}

@Test
public void testAppendSuccessAndInternalErrorRetrySuccessExclusive() throws Exception {
// Ensure we return an error from the fake server when a retry is in progress
Expand Down

0 comments on commit d00fc48

Please sign in to comment.