Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigtable: add sync methods #3856

Merged
merged 13 commits into from
Nov 7, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.ServerStreamingCallable;
Expand Down Expand Up @@ -136,6 +137,54 @@ public static BigtableDataClient create(BigtableDataSettings settings) throws IO
this.stub = stub;
}

/**
* Convenience method for synchronously reading a single row. If the row does not exist, the
* value will be null.
*
* <p>Sample code:
*
* <pre>{code
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
* String tableId = "[TABLE]";
*
* Row row = bigtableDataClient.readRow(tableId, ByteString.copyFromUtf8("key"));
* // Do something with row, for example, display all cells
* System.out.println(row.getKey().toStringUtf8());
* for(RowCell cell : row.getCells()) {
* System.out.println("Family: " + cell.getFamily() + " Qualifier: " + cell.getQualifier().toStringUtf8() + " Value: " + cell.getValue().toStringUtf8());
* }
* }
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
* }</pre>
*/
public Row readRow(String tableId, ByteString rowKey) {
return ApiExceptions.callAndTranslateApiException(readRowAsync(tableId, rowKey));
}

/**
* Convenience method for synchronously reading a single row. If the row does not exist, the
* value will be null.
*
* <p>Sample code:
*
* <pre>{code
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
* String tableId = "[TABLE]";
*
* Row row = bigtableDataClient.readRow(tableId, "key");
* // Do something with row, for example, display all cells
* System.out.println(row.getKey().toStringUtf8());

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

* for(RowCell cell : row.getCells()) {
* System.out.println("Family: " + cell.getFamily() + " Qualifier: " + cell.getQualifier().toStringUtf8() + " Value: " + cell.getValue().toStringUtf8());
* }
* }
* }</pre>
*/
public Row readRow(String tableId, String rowKey) {
return ApiExceptions.callAndTranslateApiException(readRowAsync(tableId, rowKey));
}

/**
* Convenience method for asynchronously reading a single row. If the row does not exist, the
* future's value will be null.
Expand Down Expand Up @@ -374,6 +423,29 @@ public <RowT> ServerStreamingCallable<Query, RowT> readRowsCallable(RowAdapter<R
return stub.createReadRowsCallable(rowAdapter);
}

/**
* Convenience method to synchronously return a sample of row keys in the table. The returned row
* keys will delimit contiguous sections of the table of approximately equal size, which can be
* used to break up the data for distributed tasks like mapreduces.
*
* <p>Sample code:
*
* <pre>{@code
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
* String tableId = "[TABLE_ID]";
*
* List<KeyOffset> keyOffsets = bigtableDataClient.sampleRowKeys(tableId);
* for(KeyOffset keyOffset : keyOffsets) {
* // Do something with keyOffset
* }
* }
* }</pre>
*/
public List<KeyOffset> sampleRowKeys(String tableId) {
return ApiExceptions.callAndTranslateApiException(sampleRowKeysAsync(tableId));
}

/**
* Convenience method to asynchronously return a sample of row keys in the table. The returned row
* keys will delimit contiguous sections of the table of approximately equal size, which can be
Expand Down Expand Up @@ -447,6 +519,26 @@ public UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable() {
return stub.sampleRowKeysCallable();
}

/**
* Convenience method to synchronously mutate a single row atomically. Cells already present in
* the row are left unchanged unless explicitly changed by the {@link RowMutation}.
*
* <p>Sample code:
*
* <pre>{@code
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
* RowMutation mutation = RowMutation.create("[TABLE]", "[ROW KEY]")
* .setCell("[FAMILY NAME]", "[QUALIFIER]", "[VALUE]");
*
* bigtableDataClient.mutateRow(mutation);
* }
* }</pre>
*/
public void mutateRow(RowMutation rowMutation) {
ApiExceptions.callAndTranslateApiException(mutateRowAsync(rowMutation));
}

/**
* Convenience method to asynchronously mutate a single row atomically. Cells already present in
* the row are left unchanged unless explicitly changed by the {@link RowMutation}.
Expand Down Expand Up @@ -547,6 +639,28 @@ public BulkMutationBatcher newBulkMutationBatcher() {
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
* BulkMutation batch = BulkMutation.create("[TABLE]");
* for (String someValue : someCollection) {
* batch.add("[ROW KEY]", Mutation.create().setCell("[FAMILY NAME]", "[QUALIFIER]", "[VALUE]"));
* }
* bigtableDataClient.bulkMutateRows(batch);
* }
* }</pre>
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
*/
public void bulkMutateRows(BulkMutation mutation) {
ApiExceptions.callAndTranslateApiException(bulkMutateRowsAsync(mutation));
}

/**
* Convenience method to mutate multiple rows in a batch. Each individual row is mutated
* atomically as in MutateRow, but the entire batch is not executed atomically. Unlike {@link
* #newBulkMutationBatcher()}, this method expects the mutations to be pre-batched.
*
* <p>Sample code:
*
* <pre>{@code
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
* try (BigtableClient bigtableClient = BigtableClient.create(instanceName)) {
* BulkMutation batch = BulkMutation.create("[TABLE]");
* for (String someValue : someCollection) {
* ApiFuture<Void> entryFuture = batch.add("[ROW KEY]",
* Mutation.create().setCell("[FAMILY NAME]", "[QUALIFIER]", "[VALUE]"));
* }
Expand Down Expand Up @@ -594,6 +708,29 @@ public UnaryCallable<BulkMutation, Void> bulkMutationCallable() {
return stub.bulkMutateRowsCallable();
}

/**
* Convenience method to synchronously mutate a row atomically based on the output of a filter.
*
* <p>Sample code:
*
* <pre>{@code
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
* ConditionalRowMutation mutation = ConditionalRowMutation.create("[TABLE]", "[KEY]")
* .condition(FILTERS.value().regex("old-value"))
* .then(
* Mutation.create()
* .setCell("[FAMILY]", "[QUALIFIER]", "[VALUE]")
* );
*
* Boolean result = bigtableDataClient.checkAndMutateRow(mutation);
* }
* }</pre>
*/
public Boolean checkAndMutateRow(ConditionalRowMutation mutation) {
return ApiExceptions.callAndTranslateApiException(checkAndMutateRowAsync(mutation));
}

/**
* Convenience method to asynchronously mutate a row atomically based on the output of a filter.
*
Expand Down Expand Up @@ -663,6 +800,29 @@ public UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable(
return stub.checkAndMutateRowCallable();
}

/**
* Convenience method that synchronously modifies a row atomically on the server. The method
* reads the latest existing timestamp and value from the specified columns and writes a new
* entry. The new value for the timestamp is the greater of the existing timestamp or the current
* server time. The method returns the new contents of all modified cells.
*
* <p>Sample code:
*
* <pre>{@code
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
* ReadModifyWriteRow mutation = ReadModifyWriteRow.create("[TABLE]", "[KEY]")
* .increment("[FAMILY]", "[QUALIFIER]", 1)
* .append("[FAMILY2]", "[QUALIFIER2]", "suffix");
*
* Row success = bigtableDataClient.readModifyWriteRow(mutation);
* }
* }</pre>
*/
public Row readModifyWriteRow(ReadModifyWriteRow mutation) {
return ApiExceptions.callAndTranslateApiException(readModifyWriteRowAsync(mutation));
}

/**
* Convenience method that asynchronously modifies a row atomically on the server. The method
* reads the latest existing timestamp and value from the specified columns and writes a new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
package com.google.cloud.bigtable.data.v2;

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Matchers.any;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
Expand All @@ -37,10 +39,13 @@
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowCell;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.Status.Code;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
Expand All @@ -51,6 +56,7 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.threeten.bp.Duration;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -115,7 +121,6 @@ public void proxyReadRowAsyncTest() {
@Test
public void proxyReadRowStrAsyncTest() {
bigtableDataClient.readRowAsync("fake-table", "fake-row-key");

ArgumentCaptor<Query> requestCaptor = ArgumentCaptor.forClass(Query.class);
Mockito.verify(mockReadRowsCallable.first()).futureCall(requestCaptor.capture());

Expand All @@ -131,6 +136,48 @@ public void proxyReadRowStrAsyncTest() {
.build());
}

@Test
public void readRowTest() {
Mockito.when(mockReadRowsCallable.first().futureCall(any(Query.class)))
.thenReturn(ApiFutures.immediateFuture(Row.create(ByteString.copyFromUtf8("fake-row-key"), Collections.emptyList())));

This comment was marked as spam.

bigtableDataClient.readRow("fake-table", ByteString.copyFromUtf8("fake-row-key"));

ArgumentCaptor<Query> requestCaptor = ArgumentCaptor.forClass(Query.class);
Mockito.verify(mockReadRowsCallable.first()).futureCall(requestCaptor.capture());

RequestContext ctx =
RequestContext.create(InstanceName.of("fake-project", "fake-instance"), "fake-profile");
// NOTE: limit(1) is added by the mocked first() call, so it's not tested here
assertThat(requestCaptor.getValue().toProto(ctx))
.isEqualTo(
ReadRowsRequest.newBuilder()
.setTableName("projects/fake-project/instances/fake-instance/tables/fake-table")
.setAppProfileId("fake-profile")
.setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("fake-row-key")))
.build());
}

@Test
public void readRowStrTest() {
Mockito.when(mockReadRowsCallable.first().futureCall(any(Query.class)))
.thenReturn(ApiFutures.immediateFuture(Row.create(ByteString.copyFromUtf8("fake-row-key"), Collections.emptyList())));

This comment was marked as spam.

bigtableDataClient.readRow("fake-table", "fake-row-key");

ArgumentCaptor<Query> requestCaptor = ArgumentCaptor.forClass(Query.class);
Mockito.verify(mockReadRowsCallable.first()).futureCall(requestCaptor.capture());

RequestContext ctx =
RequestContext.create(InstanceName.of("fake-project", "fake-instance"), "fake-profile");
// NOTE: limit(1) is added by the mocked first() call, so it's not tested here
assertThat(requestCaptor.getValue().toProto(ctx))
.isEqualTo(
ReadRowsRequest.newBuilder()
.setTableName("projects/fake-project/instances/fake-instance/tables/fake-table")
.setAppProfileId("fake-profile")
.setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("fake-row-key")))
.build());
}

@Test
public void proxyReadRowsSyncTest() {
Query query = Query.create("fake-table");
Expand Down Expand Up @@ -160,6 +207,13 @@ public void proxySampleRowKeysTest() {
Mockito.verify(mockSampleRowKeysCallable).futureCall("fake-table");
}

@Test
public void sampleRowKeysTest() {
Mockito.when(mockSampleRowKeysCallable.futureCall(any(String.class))).thenReturn(ApiFutures.immediateFuture(Collections.emptyList()));

This comment was marked as spam.

bigtableDataClient.sampleRowKeys("fake-table");
Mockito.verify(mockSampleRowKeysCallable).futureCall("fake-table");
}

@Test
public void proxyMutateRowCallableTest() {
assertThat(bigtableDataClient.mutateRowCallable()).isSameAs(mockMutateRowCallable);
Expand All @@ -168,13 +222,26 @@ public void proxyMutateRowCallableTest() {
@Test
public void proxyMutateRowTest() {
RowMutation request =
RowMutation.create("fake-table", "some-key")
.setCell("some-family", "fake-qualifier", "fake-value");
RowMutation.create("fake-table", "some-key")
.setCell("some-family", "fake-qualifier", "fake-value");

bigtableDataClient.mutateRowAsync(request);
Mockito.verify(mockMutateRowCallable).futureCall(request);
}

@Test
public void mutateRowTest() {
Mockito.when(mockMutateRowCallable.futureCall(any(RowMutation.class)))
.thenAnswer((Answer) (invocationOnMock) -> ApiFutures.immediateFuture(Empty.getDefaultInstance()));
chingor13 marked this conversation as resolved.
Show resolved Hide resolved

RowMutation request =
RowMutation.create("fake-table", "some-key")
.setCell("some-family", "fake-qualifier", "fake-value");

bigtableDataClient.mutateRow(request);
Mockito.verify(mockMutateRowCallable).futureCall(request);
}

@Test
public void proxyBulkMutatesRowTest() {
BulkMutation request =
Expand All @@ -187,6 +254,21 @@ public void proxyBulkMutatesRowTest() {
Mockito.verify(mockBulkMutateRowsCallable).futureCall(request);
}

@Test
public void bulkMutatesRowTest() {
Mockito.when(mockBulkMutateRowsCallable.futureCall(any(BulkMutation.class)))
.thenAnswer((Answer) (invocationOnMock) -> ApiFutures.immediateFuture(Empty.getDefaultInstance()));
chingor13 marked this conversation as resolved.
Show resolved Hide resolved

BulkMutation request =
BulkMutation.create("fake-table")
.add(
"fake-key",
Mutation.create().setCell("fake-family", "fake-qualifier", "fake-value"));

bigtableDataClient.bulkMutateRows(request);
Mockito.verify(mockBulkMutateRowsCallable).futureCall(request);
}

@Test
public void proxyBulkMutationsBatchingSendTest() {
BulkMutationBatcher batcher = bigtableDataClient.newBulkMutationBatcher();
Expand Down Expand Up @@ -291,6 +373,17 @@ public void proxyCheckAndMutateRowTest() {
Mockito.verify(mockCheckAndMutateRowCallable).futureCall(mutation);
}

@Test
public void checkAndMutateRowTest() {
Mockito.when(mockCheckAndMutateRowCallable.futureCall(any(ConditionalRowMutation.class))).thenReturn(ApiFutures.immediateFuture(Boolean.TRUE));
ConditionalRowMutation mutation =
ConditionalRowMutation.create("fake-table", "fake-key")
.then(Mutation.create().setCell("fake-family", "fake-qualifier", "fake-value"));
bigtableDataClient.checkAndMutateRow(mutation);

Mockito.verify(mockCheckAndMutateRowCallable).futureCall(mutation);
}

@Test
public void proxyReadModifyWriteRowTest() {
ReadModifyWriteRow request =
Expand All @@ -300,6 +393,16 @@ public void proxyReadModifyWriteRowTest() {
Mockito.verify(mockReadModifyWriteRowCallable).futureCall(request);
}

@Test
public void readModifyWriteRowTest() {
Mockito.when(mockReadModifyWriteRowCallable.futureCall(any(ReadModifyWriteRow.class))).thenReturn(ApiFutures.immediateFuture(Row.create(ByteString.copyFromUtf8("fake-row-key"), Collections.<RowCell>emptyList())));
ReadModifyWriteRow request =
ReadModifyWriteRow.create("fake-table", "some-key")
.append("fake-family", "fake-qualifier", "suffix");
bigtableDataClient.readModifyWriteRow(request);
Mockito.verify(mockReadModifyWriteRowCallable).futureCall(request);
}

@Test
public void proxyReadModifyWriterRowCallableTest() {
assertThat(bigtableDataClient.readModifyWriteRowCallable())
Expand Down