Skip to content

Commit

Permalink
BigQuery: insertAll always retries operation (#4165)
Browse files Browse the repository at this point in the history
* fix #3321 BigQuery insertAll always retries operation
  • Loading branch information
ajaaym authored Dec 4, 2018
1 parent 59bd938 commit 174e62d
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -548,17 +548,18 @@ public InsertAllResponse insertAll(InsertAllRequest request) {
// an anonymous inner class.
final boolean[] allInsertIdsSet = {true};
List<Rows> rowsPb =
Lists.transform(
request.getRows(),
new Function<RowToInsert, Rows>() {
@Override
public Rows apply(RowToInsert rowToInsert) {
allInsertIdsSet[0] &= rowToInsert.getId() != null;
return new Rows()
.setInsertId(rowToInsert.getId())
.setJson(rowToInsert.getContent());
}
});
FluentIterable.from(request.getRows())
.transform(
new Function<RowToInsert, Rows>() {
@Override
public Rows apply(RowToInsert rowToInsert) {
allInsertIdsSet[0] &= rowToInsert.getId() != null;
return new Rows()
.setInsertId(rowToInsert.getId())
.setJson(rowToInsert.getContent());
}
})
.toList();
requestPb.setRows(rowsPb);

TableDataInsertAllResponse responsePb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ public void testUpdateTableWithSelectedFields() {
}

@Test
public void testInsertAll() {
public void testInsertAllWithRowIdShouldRetry() {
Map<String, Object> row1 = ImmutableMap.<String, Object>of("field", "value1");
Map<String, Object> row2 = ImmutableMap.<String, Object>of("field", "value2");
List<RowToInsert> rows =
Expand Down Expand Up @@ -836,17 +836,65 @@ public TableDataInsertAllRequest.Rows apply(RowToInsert rowToInsert) {
new TableDataInsertAllResponse.InsertErrors()
.setIndex(0L)
.setErrors(ImmutableList.of(new ErrorProto().setMessage("ErrorMessage")))));
EasyMock.expect(bigqueryRpcMock.insertAll(PROJECT, DATASET, TABLE, requestPb))
.andThrow(new BigQueryException(500, "InternalError"));
EasyMock.expect(bigqueryRpcMock.insertAll(PROJECT, DATASET, TABLE, requestPb))
.andReturn(responsePb);
EasyMock.replay(bigqueryRpcMock);
bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();
InsertAllResponse response = bigquery.insertAll(request);
assertNotNull(response.getErrorsFor(0L));
assertNull(response.getErrorsFor(1L));
assertEquals(1, response.getErrorsFor(0L).size());
assertEquals("ErrorMessage", response.getErrorsFor(0L).get(0).getMessage());
}

@Test
public void testInsertAllWithoutRowIdShouldNotRetry() {
Map<String, Object> row1 = ImmutableMap.<String, Object>of("field", "value1");
Map<String, Object> row2 = ImmutableMap.<String, Object>of("field", "value2");
List<RowToInsert> rows = ImmutableList.of(RowToInsert.of(row1), RowToInsert.of(row2));
InsertAllRequest request =
InsertAllRequest.newBuilder(TABLE_ID)
.setRows(rows)
.setSkipInvalidRows(false)
.setIgnoreUnknownValues(true)
.setTemplateSuffix("suffix")
.build();
TableDataInsertAllRequest requestPb =
new TableDataInsertAllRequest()
.setRows(
Lists.transform(
rows,
new Function<RowToInsert, TableDataInsertAllRequest.Rows>() {
@Override
public TableDataInsertAllRequest.Rows apply(RowToInsert rowToInsert) {
return new TableDataInsertAllRequest.Rows()
.setInsertId(rowToInsert.getId())
.setJson(rowToInsert.getContent());
}
}))
.setSkipInvalidRows(false)
.setIgnoreUnknownValues(true)
.setTemplateSuffix("suffix");
EasyMock.expect(bigqueryRpcMock.insertAll(PROJECT, DATASET, TABLE, requestPb))
.andThrow(new BigQueryException(500, "InternalError"));
EasyMock.replay(bigqueryRpcMock);
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();
thrown.expect(BigQueryException.class);
bigquery.insertAll(request);
}

@Test
public void testInsertAllWithProject() {
Map<String, Object> row1 = ImmutableMap.<String, Object>of("field", "value1");
Expand Down

0 comments on commit 174e62d

Please sign in to comment.