From edd915d6956fe6e24ac37dd9255fd20e15e93920 Mon Sep 17 00:00:00 2001 From: yixiaoshen Date: Tue, 26 Jul 2022 18:16:05 -0700 Subject: [PATCH] [#22051]: Add read_time support to Google Cloud Datastore connector (#22052) * [#22051]: add read_time support in Google Cloud Datastore connector. * Re-arrange @Nullable annotation position. --- .../beam/gradle/BeamModulePlugin.groovy | 4 +- .../sdk/io/gcp/datastore/DatastoreV1.java | 167 +- .../sdk/io/gcp/datastore/DatastoreV1Test.java | 1531 +++++++++-------- .../sdk/io/gcp/datastore/SplitQueryFnIT.java | 19 +- .../beam/sdk/io/gcp/datastore/V1ReadIT.java | 58 +- 5 files changed, 1023 insertions(+), 756 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index c7d8a55145bd..2e5cf0eccf2b 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -592,7 +592,7 @@ class BeamModulePlugin implements Plugin { google_cloud_core_grpc : "com.google.cloud:google-cloud-core-grpc", // google_cloud_platform_libraries_bom sets version google_cloud_datacatalog_v1beta1 : "com.google.cloud:google-cloud-datacatalog", // google_cloud_platform_libraries_bom sets version google_cloud_dataflow_java_proto_library_all: "com.google.cloud.dataflow:google-cloud-dataflow-java-proto-library-all:0.5.160304", - google_cloud_datastore_v1_proto_client : "com.google.cloud.datastore:datastore-v1-proto-client:2.2.10", + google_cloud_datastore_v1_proto_client : "com.google.cloud.datastore:datastore-v1-proto-client:2.9.0", google_cloud_firestore : "com.google.cloud:google-cloud-firestore", // google_cloud_platform_libraries_bom sets version google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite", // google_cloud_platform_libraries_bom sets version @@ -687,7 +687,7 @@ class BeamModulePlugin implements Plugin { proto_google_cloud_bigtable_admin_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-admin-v2", // google_cloud_platform_libraries_bom sets version proto_google_cloud_bigtable_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-v2", // google_cloud_platform_libraries_bom sets version proto_google_cloud_datacatalog_v1beta1 : "com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1", // google_cloud_platform_libraries_bom sets version - proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1:0.93.10", // google_cloud_platform_libraries_bom sets version + proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1:0.100.0", // google_cloud_platform_libraries_bom sets version proto_google_cloud_firestore_v1 : "com.google.api.grpc:proto-google-cloud-firestore-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 9137335943c3..06b7c5292fd5 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -45,6 +45,7 @@ import com.google.datastore.v1.PartitionId; import com.google.datastore.v1.Query; import com.google.datastore.v1.QueryResultBatch; +import com.google.datastore.v1.ReadOptions; import com.google.datastore.v1.RunQueryRequest; import com.google.datastore.v1.RunQueryResponse; import com.google.datastore.v1.client.Datastore; @@ -54,6 +55,8 @@ import com.google.datastore.v1.client.DatastoreOptions; import com.google.datastore.v1.client.QuerySplitter; import com.google.protobuf.Int32Value; +import com.google.protobuf.Timestamp; +import com.google.protobuf.util.Timestamps; import com.google.rpc.Code; import java.io.IOException; import java.io.Serializable; @@ -320,6 +323,8 @@ public abstract static class Read extends PTransform public abstract @Nullable String getLocalhost(); + public abstract @Nullable Instant getReadTime(); + @Override public abstract String toString(); @@ -339,6 +344,8 @@ abstract static class Builder { abstract Builder setLocalhost(String localhost); + abstract Builder setReadTime(Instant readTime); + abstract Read build(); } @@ -346,10 +353,11 @@ abstract static class Builder { * Computes the number of splits to be performed on the given query by querying the estimated * size from Cloud Datastore. */ - static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) { + static int getEstimatedNumSplits( + Datastore datastore, Query query, @Nullable String namespace, @Nullable Instant readTime) { int numSplits; try { - long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace); + long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace, readTime); LOG.info("Estimated size bytes for the query is: {}", estimatedSizeBytes); numSplits = (int) @@ -370,7 +378,8 @@ static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable Str * table. */ private static long queryLatestStatisticsTimestamp( - Datastore datastore, @Nullable String namespace) throws DatastoreException { + Datastore datastore, @Nullable String namespace, @Nullable Instant readTime) + throws DatastoreException { Query.Builder query = Query.newBuilder(); // Note: namespace either being null or empty represents the default namespace, in which // case we treat it as not provided by the user. @@ -381,7 +390,7 @@ private static long queryLatestStatisticsTimestamp( } query.addOrder(makeOrder("timestamp", DESCENDING)); query.setLimit(Int32Value.newBuilder().setValue(1)); - RunQueryRequest request = makeRequest(query.build(), namespace); + RunQueryRequest request = makeRequest(query.build(), namespace, readTime); RunQueryResponse response = datastore.runQuery(request); QueryResultBatch batch = response.getBatch(); @@ -392,10 +401,14 @@ private static long queryLatestStatisticsTimestamp( return entity.getProperties().get("timestamp").getTimestampValue().getSeconds() * 1000000; } - /** Retrieve latest table statistics for a given kind, namespace, and datastore. */ + /** + * Retrieve latest table statistics for a given kind, namespace, and datastore. If the Read has + * readTime specified, the latest statistics at or before readTime is retrieved. 
+ */ private static Entity getLatestTableStats( - String ourKind, @Nullable String namespace, Datastore datastore) throws DatastoreException { - long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace); + String ourKind, @Nullable String namespace, Datastore datastore, @Nullable Instant readTime) + throws DatastoreException { + long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace, readTime); LOG.info("Latest stats timestamp for kind {} is {}", ourKind, latestTimestamp); Query.Builder queryBuilder = Query.newBuilder(); @@ -410,7 +423,7 @@ private static Entity getLatestTableStats( makeFilter("kind_name", EQUAL, makeValue(ourKind).build()).build(), makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build()).build())); - RunQueryRequest request = makeRequest(queryBuilder.build(), namespace); + RunQueryRequest request = makeRequest(queryBuilder.build(), namespace, readTime); long now = System.currentTimeMillis(); RunQueryResponse response = datastore.runQuery(request); @@ -433,10 +446,11 @@ private static Entity getLatestTableStats( * *
<p>
See https://cloud.google.com/datastore/docs/concepts/stats. */ - static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace) + static long getEstimatedSizeBytes( + Datastore datastore, Query query, @Nullable String namespace, @Nullable Instant readTime) throws DatastoreException { String ourKind = query.getKind(0).getName(); - Entity entity = getLatestTableStats(ourKind, namespace, datastore); + Entity entity = getLatestTableStats(ourKind, namespace, datastore, readTime); return entity.getProperties().get("entity_bytes").getIntegerValue(); } @@ -451,21 +465,38 @@ private static PartitionId.Builder forNamespace(@Nullable String namespace) { return partitionBuilder; } - /** Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}. */ - static RunQueryRequest makeRequest(Query query, @Nullable String namespace) { - return RunQueryRequest.newBuilder() - .setQuery(query) - .setPartitionId(forNamespace(namespace)) - .build(); + /** + * Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}, optionally at + * the requested {@code readTime}. + */ + static RunQueryRequest makeRequest( + Query query, @Nullable String namespace, @Nullable Instant readTime) { + RunQueryRequest.Builder request = + RunQueryRequest.newBuilder().setQuery(query).setPartitionId(forNamespace(namespace)); + if (readTime != null) { + Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis()); + request.setReadOptions(ReadOptions.newBuilder().setReadTime(readTimeProto).build()); + } + return request.build(); } @VisibleForTesting - /** Builds a {@link RunQueryRequest} from the {@code GqlQuery} and {@code namespace}. */ - static RunQueryRequest makeRequest(GqlQuery gqlQuery, @Nullable String namespace) { - return RunQueryRequest.newBuilder() - .setGqlQuery(gqlQuery) - .setPartitionId(forNamespace(namespace)) - .build(); + /** + * Builds a {@link RunQueryRequest} from the {@code GqlQuery} and {@code namespace}, optionally + * at the requested {@code readTime}. + */ + static RunQueryRequest makeRequest( + GqlQuery gqlQuery, @Nullable String namespace, @Nullable Instant readTime) { + RunQueryRequest.Builder request = + RunQueryRequest.newBuilder() + .setGqlQuery(gqlQuery) + .setPartitionId(forNamespace(namespace)); + if (readTime != null) { + Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis()); + request.setReadOptions(ReadOptions.newBuilder().setReadTime(readTimeProto).build()); + } + + return request.build(); } /** @@ -477,10 +508,16 @@ private static List splitQuery( @Nullable String namespace, Datastore datastore, QuerySplitter querySplitter, - int numSplits) + int numSplits, + @Nullable Instant readTime) throws DatastoreException { // If namespace is set, include it in the split request so splits are calculated accordingly. - return querySplitter.getSplits(query, forNamespace(namespace).build(), numSplits, datastore); + PartitionId partitionId = forNamespace(namespace).build(); + if (readTime != null) { + Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis()); + return querySplitter.getSplits(query, partitionId, numSplits, datastore, readTimeProto); + } + return querySplitter.getSplits(query, partitionId, numSplits, datastore); } /** @@ -497,11 +534,13 @@ private static List splitQuery( * problem in practice. 
*/ @VisibleForTesting - static Query translateGqlQueryWithLimitCheck(String gql, Datastore datastore, String namespace) + static Query translateGqlQueryWithLimitCheck( + String gql, Datastore datastore, String namespace, @Nullable Instant readTime) throws DatastoreException { String gqlQueryWithZeroLimit = gql + " LIMIT 0"; try { - Query translatedQuery = translateGqlQuery(gqlQueryWithZeroLimit, datastore, namespace); + Query translatedQuery = + translateGqlQuery(gqlQueryWithZeroLimit, datastore, namespace, readTime); // Clear the limit that we set. return translatedQuery.toBuilder().clearLimit().build(); } catch (DatastoreException e) { @@ -512,7 +551,7 @@ static Query translateGqlQueryWithLimitCheck(String gql, Datastore datastore, St LOG.warn("Failed to translate Gql query '{}': {}", gqlQueryWithZeroLimit, e.getMessage()); LOG.warn("User query might have a limit already set, so trying without zero limit"); // Retry without the zero limit. - return translateGqlQuery(gql, datastore, namespace); + return translateGqlQuery(gql, datastore, namespace, readTime); } else { throw e; } @@ -520,10 +559,11 @@ static Query translateGqlQueryWithLimitCheck(String gql, Datastore datastore, St } /** Translates a gql query string to {@link Query}. */ - private static Query translateGqlQuery(String gql, Datastore datastore, String namespace) + private static Query translateGqlQuery( + String gql, Datastore datastore, String namespace, @Nullable Instant readTime) throws DatastoreException { GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build(); - RunQueryRequest req = makeRequest(gqlQuery, namespace); + RunQueryRequest req = makeRequest(gqlQuery, namespace, readTime); return datastore.runQuery(req).getQuery(); } @@ -628,6 +668,11 @@ public DatastoreV1.Read withLocalhost(String localhost) { return toBuilder().setLocalhost(localhost).build(); } + /** Returns a new {@link DatastoreV1.Read} that reads at the specified {@code readTime}. */ + public DatastoreV1.Read withReadTime(Instant readTime) { + return toBuilder().setReadTime(readTime).build(); + } + /** Returns Number of entities available for reading. 
*/ public long getNumEntities( PipelineOptions options, String ourKind, @Nullable String namespace) { @@ -638,7 +683,7 @@ public long getNumEntities( datastoreFactory.getDatastore( options, v1Options.getProjectId(), v1Options.getLocalhost()); - Entity entity = getLatestTableStats(ourKind, namespace, datastore); + Entity entity = getLatestTableStats(ourKind, namespace, datastore, getReadTime()); return entity.getProperties().get("count").getIntegerValue(); } catch (Exception e) { return -1; @@ -688,13 +733,13 @@ public PCollection expand(PBegin input) { inputQuery = input .apply(Create.ofProvider(getLiteralGqlQuery(), StringUtf8Coder.of())) - .apply(ParDo.of(new GqlQueryTranslateFn(v1Options))); + .apply(ParDo.of(new GqlQueryTranslateFn(v1Options, getReadTime()))); } return inputQuery - .apply("Split", ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits()))) + .apply("Split", ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits(), getReadTime()))) .apply("Reshuffle", Reshuffle.viaRandomKey()) - .apply("Read", ParDo.of(new ReadFn(v1Options))); + .apply("Read", ParDo.of(new ReadFn(v1Options, getReadTime()))); } @Override @@ -705,7 +750,8 @@ public void populateDisplayData(DisplayData.Builder builder) { .addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("ProjectId")) .addIfNotNull(DisplayData.item("namespace", getNamespace()).withLabel("Namespace")) .addIfNotNull(DisplayData.item("query", query).withLabel("Query")) - .addIfNotNull(DisplayData.item("gqlQuery", getLiteralGqlQuery()).withLabel("GqlQuery")); + .addIfNotNull(DisplayData.item("gqlQuery", getLiteralGqlQuery()).withLabel("GqlQuery")) + .addIfNotNull(DisplayData.item("readTime", getReadTime()).withLabel("ReadTime")); } @VisibleForTesting @@ -764,15 +810,22 @@ public void populateDisplayData(DisplayData.Builder builder) { /** A DoFn that translates a Cloud Datastore gql query string to {@code Query}. 
*/ static class GqlQueryTranslateFn extends DoFn { private final V1Options v1Options; + private final @Nullable Instant readTime; private transient Datastore datastore; private final V1DatastoreFactory datastoreFactory; GqlQueryTranslateFn(V1Options options) { - this(options, new V1DatastoreFactory()); + this(options, null, new V1DatastoreFactory()); } - GqlQueryTranslateFn(V1Options options, V1DatastoreFactory datastoreFactory) { + GqlQueryTranslateFn(V1Options options, @Nullable Instant readTime) { + this(options, readTime, new V1DatastoreFactory()); + } + + GqlQueryTranslateFn( + V1Options options, @Nullable Instant readTime, V1DatastoreFactory datastoreFactory) { this.v1Options = options; + this.readTime = readTime; this.datastoreFactory = datastoreFactory; } @@ -788,7 +841,8 @@ public void processElement(ProcessContext c) throws Exception { String gqlQuery = c.element(); LOG.info("User query: '{}'", gqlQuery); Query query = - translateGqlQueryWithLimitCheck(gqlQuery, datastore, v1Options.getNamespace()); + translateGqlQueryWithLimitCheck( + gqlQuery, datastore, v1Options.getNamespace(), readTime); LOG.info("User gql query translated to Query({})", query); c.output(query); } @@ -803,6 +857,8 @@ static class SplitQueryFn extends DoFn { private final V1Options options; // number of splits to make for a given query private final int numSplits; + // time from which to run the queries + private final @Nullable Instant readTime; private final V1DatastoreFactory datastoreFactory; // Datastore client @@ -811,14 +867,23 @@ static class SplitQueryFn extends DoFn { private transient QuerySplitter querySplitter; public SplitQueryFn(V1Options options, int numSplits) { - this(options, numSplits, new V1DatastoreFactory()); + this(options, numSplits, null, new V1DatastoreFactory()); + } + + public SplitQueryFn(V1Options options, int numSplits, @Nullable Instant readTime) { + this(options, numSplits, readTime, new V1DatastoreFactory()); } @VisibleForTesting - SplitQueryFn(V1Options options, int numSplits, V1DatastoreFactory datastoreFactory) { + SplitQueryFn( + V1Options options, + int numSplits, + @Nullable Instant readTime, + V1DatastoreFactory datastoreFactory) { this.options = options; this.numSplits = numSplits; this.datastoreFactory = datastoreFactory; + this.readTime = readTime; } @StartBundle @@ -842,7 +907,8 @@ public void processElement(ProcessContext c) throws Exception { int estimatedNumSplits; // Compute the estimated numSplits if numSplits is not specified by the user. 
if (numSplits <= 0) { - estimatedNumSplits = getEstimatedNumSplits(datastore, query, options.getNamespace()); + estimatedNumSplits = + getEstimatedNumSplits(datastore, query, options.getNamespace(), readTime); } else { estimatedNumSplits = numSplits; } @@ -852,7 +918,12 @@ public void processElement(ProcessContext c) throws Exception { try { querySplits = splitQuery( - query, options.getNamespace(), datastore, querySplitter, estimatedNumSplits); + query, + options.getNamespace(), + datastore, + querySplitter, + estimatedNumSplits, + readTime); } catch (Exception e) { LOG.warn("Unable to parallelize the given query: {}", query, e); querySplits = ImmutableList.of(query); @@ -873,6 +944,7 @@ public void populateDisplayData(DisplayData.Builder builder) { DisplayData.item("numQuerySplits", numSplits) .withLabel("Requested number of Query splits")); } + builder.addIfNotNull(DisplayData.item("readTime", readTime).withLabel("ReadTime")); } } @@ -880,6 +952,7 @@ public void populateDisplayData(DisplayData.Builder builder) { @VisibleForTesting static class ReadFn extends DoFn { private final V1Options options; + private final @Nullable Instant readTime; private final V1DatastoreFactory datastoreFactory; // Datastore client private transient Datastore datastore; @@ -894,12 +967,17 @@ static class ReadFn extends DoFn { .withInitialBackoff(Duration.standardSeconds(5)); public ReadFn(V1Options options) { - this(options, new V1DatastoreFactory()); + this(options, null, new V1DatastoreFactory()); + } + + public ReadFn(V1Options options, @Nullable Instant readTime) { + this(options, readTime, new V1DatastoreFactory()); } @VisibleForTesting - ReadFn(V1Options options, V1DatastoreFactory datastoreFactory) { + ReadFn(V1Options options, @Nullable Instant readTime, V1DatastoreFactory datastoreFactory) { this.options = options; + this.readTime = readTime; this.datastoreFactory = datastoreFactory; } @@ -967,7 +1045,7 @@ public void processElement(ProcessContext context) throws Exception { queryBuilder.setStartCursor(currentBatch.getEndCursor()); } - RunQueryRequest request = makeRequest(queryBuilder.build(), namespace); + RunQueryRequest request = makeRequest(queryBuilder.build(), namespace, readTime); RunQueryResponse response = runQueryWithRetries(request); currentBatch = response.getBatch(); @@ -1005,6 +1083,7 @@ public void processElement(ProcessContext context) throws Exception { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder.include("options", options); + builder.addIfNotNull(DisplayData.item("readTime", readTime).withLabel("ReadTime")); } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index 0fc895b0d9e3..4aed59c4da38 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -67,8 +67,12 @@ import com.google.datastore.v1.client.DatastoreException; import com.google.datastore.v1.client.QuerySplitter; import com.google.protobuf.Int32Value; +import com.google.protobuf.Timestamp; +import com.google.protobuf.util.Timestamps; import com.google.rpc.Code; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import 
java.util.List; @@ -101,17 +105,23 @@ import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; /** Tests for {@link DatastoreV1}. */ -@RunWith(JUnit4.class) +@RunWith(Enclosed.class) public class DatastoreV1Test { private static final String PROJECT_ID = "testProject"; private static final String NAMESPACE = "testNamespace"; @@ -119,6 +129,8 @@ public class DatastoreV1Test { private static final Query QUERY; private static final String LOCALHOST = "localhost:9955"; private static final String GQL_QUERY = "SELECT * from " + KIND; + private static final Instant TIMESTAMP = Instant.now(); + private static final Timestamp TIMESTAMP_PROTO = Timestamps.fromMillis(TIMESTAMP.getMillis()); private static final V1Options V_1_OPTIONS; static { @@ -128,9 +140,9 @@ public class DatastoreV1Test { V_1_OPTIONS = V1Options.from(PROJECT_ID, NAMESPACE, null); } - @Mock private Datastore mockDatastore; - @Mock QuerySplitter mockQuerySplitter; - @Mock V1DatastoreFactory mockDatastoreFactory; + @Mock protected Datastore mockDatastore; + @Mock protected QuerySplitter mockQuerySplitter; + @Mock protected V1DatastoreFactory mockDatastoreFactory; @Rule public final ExpectedException thrown = ExpectedException.none(); @@ -146,782 +158,890 @@ public void setUp() { MetricsEnvironment.setProcessWideContainer(container); } - @Test - public void testBuildRead() throws Exception { - DatastoreV1.Read read = - DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE); - assertEquals(QUERY, read.getQuery()); - assertEquals(PROJECT_ID, read.getProjectId().get()); - assertEquals(NAMESPACE, read.getNamespace().get()); - } - - @Test - public void testBuildReadWithGqlQuery() throws Exception { - DatastoreV1.Read read = - DatastoreIO.v1() - .read() - .withProjectId(PROJECT_ID) - .withLiteralGqlQuery(GQL_QUERY) - .withNamespace(NAMESPACE); - assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get()); - assertEquals(PROJECT_ID, read.getProjectId().get()); - assertEquals(NAMESPACE, read.getNamespace().get()); - } - - /** {@link #testBuildRead} but constructed in a different order. 
*/ - @Test - public void testBuildReadAlt() throws Exception { - DatastoreV1.Read read = - DatastoreIO.v1() - .read() - .withQuery(QUERY) - .withNamespace(NAMESPACE) - .withProjectId(PROJECT_ID) - .withLocalhost(LOCALHOST); - assertEquals(QUERY, read.getQuery()); - assertEquals(PROJECT_ID, read.getProjectId().get()); - assertEquals(NAMESPACE, read.getNamespace().get()); - assertEquals(LOCALHOST, read.getLocalhost()); - } - - @Test - public void testReadValidationFailsQueryAndGqlQuery() throws Exception { - DatastoreV1.Read read = - DatastoreIO.v1() - .read() - .withProjectId(PROJECT_ID) - .withLiteralGqlQuery(GQL_QUERY) - .withQuery(QUERY); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("withQuery() and withLiteralGqlQuery() are exclusive"); - read.expand(null); - } - - @Test - public void testReadValidationFailsQueryLimitZero() throws Exception { - Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build(); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Invalid query limit 0: must be positive"); - - DatastoreIO.v1().read().withQuery(invalidLimit); - } - - @Test - public void testReadValidationFailsQueryLimitNegative() throws Exception { - Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build(); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Invalid query limit -5: must be positive"); - - DatastoreIO.v1().read().withQuery(invalidLimit); - } - - @Test - public void testReadDisplayData() { - DatastoreV1.Read read = - DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE); + @RunWith(JUnit4.class) + public static class SingletonTests extends DatastoreV1Test { + @Test + public void testBuildRead() throws Exception { + DatastoreV1.Read read = + DatastoreIO.v1() + .read() + .withProjectId(PROJECT_ID) + .withQuery(QUERY) + .withNamespace(NAMESPACE); + assertEquals(QUERY, read.getQuery()); + assertEquals(PROJECT_ID, read.getProjectId().get()); + assertEquals(NAMESPACE, read.getNamespace().get()); + } - DisplayData displayData = DisplayData.from(read); + @Test + public void testBuildReadWithReadTime() throws Exception { + DatastoreV1.Read read = + DatastoreIO.v1() + .read() + .withProjectId(PROJECT_ID) + .withQuery(QUERY) + .withReadTime(TIMESTAMP); + assertEquals(TIMESTAMP, read.getReadTime()); + assertEquals(QUERY, read.getQuery()); + assertEquals(PROJECT_ID, read.getProjectId().get()); + } - assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); - assertThat(displayData, hasDisplayItem("query", QUERY.toString())); - assertThat(displayData, hasDisplayItem("namespace", NAMESPACE)); - } + @Test + public void testBuildReadWithGqlQuery() throws Exception { + DatastoreV1.Read read = + DatastoreIO.v1() + .read() + .withProjectId(PROJECT_ID) + .withLiteralGqlQuery(GQL_QUERY) + .withNamespace(NAMESPACE); + assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get()); + assertEquals(PROJECT_ID, read.getProjectId().get()); + assertEquals(NAMESPACE, read.getNamespace().get()); + } - @Test - public void testReadDisplayDataWithGqlQuery() { - DatastoreV1.Read read = - DatastoreIO.v1() - .read() - .withProjectId(PROJECT_ID) - .withLiteralGqlQuery(GQL_QUERY) - .withNamespace(NAMESPACE); + /** {@link #testBuildRead} but constructed in a different order. 
*/ + @Test + public void testBuildReadAlt() throws Exception { + DatastoreV1.Read read = + DatastoreIO.v1() + .read() + .withReadTime(TIMESTAMP) + .withQuery(QUERY) + .withNamespace(NAMESPACE) + .withProjectId(PROJECT_ID) + .withLocalhost(LOCALHOST); + assertEquals(TIMESTAMP, read.getReadTime()); + assertEquals(QUERY, read.getQuery()); + assertEquals(PROJECT_ID, read.getProjectId().get()); + assertEquals(NAMESPACE, read.getNamespace().get()); + assertEquals(LOCALHOST, read.getLocalhost()); + } - DisplayData displayData = DisplayData.from(read); + @Test + public void testReadValidationFailsQueryAndGqlQuery() throws Exception { + DatastoreV1.Read read = + DatastoreIO.v1() + .read() + .withProjectId(PROJECT_ID) + .withLiteralGqlQuery(GQL_QUERY) + .withQuery(QUERY); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("withQuery() and withLiteralGqlQuery() are exclusive"); + read.expand(null); + } - assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); - assertThat(displayData, hasDisplayItem("gqlQuery", GQL_QUERY)); - assertThat(displayData, hasDisplayItem("namespace", NAMESPACE)); - } + @Test + public void testReadValidationFailsQueryLimitZero() throws Exception { + Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build(); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid query limit 0: must be positive"); - @Test - public void testSourcePrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - int numSplits = 98; - PTransform> read = - DatastoreIO.v1() - .read() - .withProjectId(PROJECT_ID) - .withQuery(Query.newBuilder().build()) - .withNumQuerySplits(numSplits); - - String assertMessage = "DatastoreIO read should include the '%s' in its primitive display data"; - Set displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); - assertThat( - String.format(assertMessage, "project id"), - displayData, - hasItem(hasDisplayItem("projectId", PROJECT_ID))); - assertThat( - String.format(assertMessage, "number of query splits"), - displayData, - hasItem(hasDisplayItem("numQuerySplits", numSplits))); - } + DatastoreIO.v1().read().withQuery(invalidLimit); + } - @Test - public void testWriteDisplayData() { - Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); + @Test + public void testReadValidationFailsQueryLimitNegative() throws Exception { + Query invalidLimit = + Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build(); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid query limit -5: must be positive"); - DisplayData displayData = DisplayData.from(write); + DatastoreIO.v1().read().withQuery(invalidLimit); + } - assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); - } + @Test + public void testReadDisplayData() { + DatastoreV1.Read read = + DatastoreIO.v1() + .read() + .withProjectId(PROJECT_ID) + .withQuery(QUERY) + .withNamespace(NAMESPACE) + .withReadTime(TIMESTAMP); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + assertThat(displayData, hasDisplayItem("query", QUERY.toString())); + assertThat(displayData, hasDisplayItem("namespace", NAMESPACE)); + assertThat(displayData, hasDisplayItem("readTime", TIMESTAMP)); + } - @Test - public void testDeleteEntityDisplayData() { - DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID); + @Test + public void 
testReadDisplayDataWithGqlQuery() { + DatastoreV1.Read read = + DatastoreIO.v1() + .read() + .withProjectId(PROJECT_ID) + .withLiteralGqlQuery(GQL_QUERY) + .withNamespace(NAMESPACE) + .withReadTime(TIMESTAMP); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + assertThat(displayData, hasDisplayItem("gqlQuery", GQL_QUERY)); + assertThat(displayData, hasDisplayItem("namespace", NAMESPACE)); + assertThat(displayData, hasDisplayItem("readTime", TIMESTAMP)); + } - DisplayData displayData = DisplayData.from(deleteEntity); + @Test + public void testSourcePrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + int numSplits = 98; + PTransform> read = + DatastoreIO.v1() + .read() + .withProjectId(PROJECT_ID) + .withQuery(Query.newBuilder().build()) + .withNumQuerySplits(numSplits); + + String assertMessage = + "DatastoreIO read should include the '%s' in its primitive display data"; + Set displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); + assertThat( + String.format(assertMessage, "project id"), + displayData, + hasItem(hasDisplayItem("projectId", PROJECT_ID))); + assertThat( + String.format(assertMessage, "number of query splits"), + displayData, + hasItem(hasDisplayItem("numQuerySplits", numSplits))); + } - assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); - } + @Test + public void testWriteDisplayData() { + Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); - @Test - public void testDeleteKeyDisplayData() { - DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID); + DisplayData displayData = DisplayData.from(write); - DisplayData displayData = DisplayData.from(deleteKey); + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + } - assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); - } + @Test + public void testDeleteEntityDisplayData() { + DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID); - @Test - public void testWritePrimitiveDisplayData() { - int hintNumWorkers = 10; - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PTransform, ?> write = - DatastoreIO.v1().write().withProjectId("myProject").withHintNumWorkers(hintNumWorkers); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat( - "DatastoreIO write should include the project in its primitive display data", - displayData, - hasItem(hasDisplayItem("projectId"))); - assertThat( - "DatastoreIO write should include the upsertFn in its primitive display data", - displayData, - hasItem(hasDisplayItem("upsertFn"))); - assertThat( - "DatastoreIO write should include ramp-up throttling worker count hint if enabled", - displayData, - hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers))); - } + DisplayData displayData = DisplayData.from(deleteEntity); - @Test - public void testWritePrimitiveDisplayDataDisabledThrottler() { - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PTransform, ?> write = - DatastoreIO.v1().write().withProjectId("myProject").withRampupThrottlingDisabled(); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat( - "DatastoreIO write should include the project in its primitive display data", - displayData, - hasItem(hasDisplayItem("projectId"))); - assertThat( - "DatastoreIO write should include the upsertFn in its primitive display data", - displayData, - 
hasItem(hasDisplayItem("upsertFn"))); - assertThat( - "DatastoreIO write should include ramp-up throttling worker count hint if enabled", - displayData, - not(hasItem(hasDisplayItem("hintNumWorkers")))); - } + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + } - @Test - public void testDeleteEntityPrimitiveDisplayData() { - int hintNumWorkers = 10; - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PTransform, ?> write = - DatastoreIO.v1() - .deleteEntity() - .withProjectId("myProject") - .withHintNumWorkers(hintNumWorkers); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat( - "DatastoreIO write should include the project in its primitive display data", - displayData, - hasItem(hasDisplayItem("projectId"))); - assertThat( - "DatastoreIO write should include the deleteEntityFn in its primitive display data", - displayData, - hasItem(hasDisplayItem("deleteEntityFn"))); - assertThat( - "DatastoreIO write should include ramp-up throttling worker count hint if enabled", - displayData, - hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers))); - } + @Test + public void testDeleteKeyDisplayData() { + DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID); - @Test - public void testDeleteKeyPrimitiveDisplayData() { - int hintNumWorkers = 10; - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PTransform, ?> write = - DatastoreIO.v1().deleteKey().withProjectId("myProject").withHintNumWorkers(hintNumWorkers); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat( - "DatastoreIO write should include the project in its primitive display data", - displayData, - hasItem(hasDisplayItem("projectId"))); - assertThat( - "DatastoreIO write should include the deleteKeyFn in its primitive display data", - displayData, - hasItem(hasDisplayItem("deleteKeyFn"))); - assertThat( - "DatastoreIO write should include ramp-up throttling worker count hint if enabled", - displayData, - hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers))); - } + DisplayData displayData = DisplayData.from(deleteKey); - /** Test building a Write using builder methods. */ - @Test - public void testBuildWrite() throws Exception { - DatastoreV1.Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); - assertEquals(PROJECT_ID, write.getProjectId()); - } + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + } - /** Test the detection of complete and incomplete keys. 
*/ - @Test - public void testHasNameOrId() { - Key key; - // Complete with name, no ancestor - key = makeKey("bird", "finch").build(); - assertTrue(isValidKey(key)); - - // Complete with id, no ancestor - key = makeKey("bird", 123).build(); - assertTrue(isValidKey(key)); - - // Incomplete, no ancestor - key = makeKey("bird").build(); - assertFalse(isValidKey(key)); - - // Complete with name and ancestor - key = makeKey("bird", "owl").build(); - key = makeKey(key, "bird", "horned").build(); - assertTrue(isValidKey(key)); - - // Complete with id and ancestor - key = makeKey("bird", "owl").build(); - key = makeKey(key, "bird", 123).build(); - assertTrue(isValidKey(key)); - - // Incomplete with ancestor - key = makeKey("bird", "owl").build(); - key = makeKey(key, "bird").build(); - assertFalse(isValidKey(key)); - - key = makeKey().build(); - assertFalse(isValidKey(key)); - } + @Test + public void testWritePrimitiveDisplayData() { + int hintNumWorkers = 10; + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform, ?> write = + DatastoreIO.v1().write().withProjectId("myProject").withHintNumWorkers(hintNumWorkers); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat( + "DatastoreIO write should include the project in its primitive display data", + displayData, + hasItem(hasDisplayItem("projectId"))); + assertThat( + "DatastoreIO write should include the upsertFn in its primitive display data", + displayData, + hasItem(hasDisplayItem("upsertFn"))); + assertThat( + "DatastoreIO write should include ramp-up throttling worker count hint if enabled", + displayData, + hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers))); + } - /** Test that entities with incomplete keys cannot be updated. */ - @Test - public void testAddEntitiesWithIncompleteKeys() throws Exception { - Key key = makeKey("bird").build(); - Entity entity = Entity.newBuilder().setKey(key).build(); - UpsertFn upsertFn = new UpsertFn(); + @Test + public void testWritePrimitiveDisplayDataDisabledThrottler() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform, ?> write = + DatastoreIO.v1().write().withProjectId("myProject").withRampupThrottlingDisabled(); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat( + "DatastoreIO write should include the project in its primitive display data", + displayData, + hasItem(hasDisplayItem("projectId"))); + assertThat( + "DatastoreIO write should include the upsertFn in its primitive display data", + displayData, + hasItem(hasDisplayItem("upsertFn"))); + assertThat( + "DatastoreIO write should include ramp-up throttling worker count hint if enabled", + displayData, + not(hasItem(hasDisplayItem("hintNumWorkers")))); + } - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Entities to be written to the Cloud Datastore must have complete keys"); + @Test + public void testDeleteEntityPrimitiveDisplayData() { + int hintNumWorkers = 10; + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform, ?> write = + DatastoreIO.v1() + .deleteEntity() + .withProjectId("myProject") + .withHintNumWorkers(hintNumWorkers); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat( + "DatastoreIO write should include the project in its primitive display data", + displayData, + hasItem(hasDisplayItem("projectId"))); + assertThat( + "DatastoreIO write should include the deleteEntityFn in its primitive display data", + displayData, + 
hasItem(hasDisplayItem("deleteEntityFn"))); + assertThat( + "DatastoreIO write should include ramp-up throttling worker count hint if enabled", + displayData, + hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers))); + } - upsertFn.apply(entity); - } + @Test + public void testDeleteKeyPrimitiveDisplayData() { + int hintNumWorkers = 10; + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform, ?> write = + DatastoreIO.v1() + .deleteKey() + .withProjectId("myProject") + .withHintNumWorkers(hintNumWorkers); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat( + "DatastoreIO write should include the project in its primitive display data", + displayData, + hasItem(hasDisplayItem("projectId"))); + assertThat( + "DatastoreIO write should include the deleteKeyFn in its primitive display data", + displayData, + hasItem(hasDisplayItem("deleteKeyFn"))); + assertThat( + "DatastoreIO write should include ramp-up throttling worker count hint if enabled", + displayData, + hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers))); + } - @Test - /** Test that entities with valid keys are transformed to upsert mutations. */ - public void testAddEntities() throws Exception { - Key key = makeKey("bird", "finch").build(); - Entity entity = Entity.newBuilder().setKey(key).build(); - UpsertFn upsertFn = new UpsertFn(); + /** Test building a Write using builder methods. */ + @Test + public void testBuildWrite() throws Exception { + DatastoreV1.Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); + assertEquals(PROJECT_ID, write.getProjectId()); + } - Mutation expectedMutation = makeUpsert(entity).build(); - assertEquals(expectedMutation, upsertFn.apply(entity)); - } + /** Test the detection of complete and incomplete keys. */ + @Test + public void testHasNameOrId() { + Key key; + // Complete with name, no ancestor + key = makeKey("bird", "finch").build(); + assertTrue(isValidKey(key)); + + // Complete with id, no ancestor + key = makeKey("bird", 123).build(); + assertTrue(isValidKey(key)); + + // Incomplete, no ancestor + key = makeKey("bird").build(); + assertFalse(isValidKey(key)); + + // Complete with name and ancestor + key = makeKey("bird", "owl").build(); + key = makeKey(key, "bird", "horned").build(); + assertTrue(isValidKey(key)); + + // Complete with id and ancestor + key = makeKey("bird", "owl").build(); + key = makeKey(key, "bird", 123).build(); + assertTrue(isValidKey(key)); + + // Incomplete with ancestor + key = makeKey("bird", "owl").build(); + key = makeKey(key, "bird").build(); + assertFalse(isValidKey(key)); + + key = makeKey().build(); + assertFalse(isValidKey(key)); + } - /** Test that entities with incomplete keys cannot be deleted. */ - @Test - public void testDeleteEntitiesWithIncompleteKeys() throws Exception { - Key key = makeKey("bird").build(); - Entity entity = Entity.newBuilder().setKey(key).build(); - DeleteEntityFn deleteEntityFn = new DeleteEntityFn(); + /** Test that entities with incomplete keys cannot be updated. 
*/ + @Test + public void testAddEntitiesWithIncompleteKeys() throws Exception { + Key key = makeKey("bird").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + UpsertFn upsertFn = new UpsertFn(); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Entities to be deleted from the Cloud Datastore must have complete keys"); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Entities to be written to the Cloud Datastore must have complete keys"); - deleteEntityFn.apply(entity); - } + upsertFn.apply(entity); + } - /** Test that entities with valid keys are transformed to delete mutations. */ - @Test - public void testDeleteEntities() throws Exception { - Key key = makeKey("bird", "finch").build(); - Entity entity = Entity.newBuilder().setKey(key).build(); - DeleteEntityFn deleteEntityFn = new DeleteEntityFn(); + @Test + /** Test that entities with valid keys are transformed to upsert mutations. */ + public void testAddEntities() throws Exception { + Key key = makeKey("bird", "finch").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + UpsertFn upsertFn = new UpsertFn(); - Mutation expectedMutation = makeDelete(entity.getKey()).build(); - assertEquals(expectedMutation, deleteEntityFn.apply(entity)); - } + Mutation expectedMutation = makeUpsert(entity).build(); + assertEquals(expectedMutation, upsertFn.apply(entity)); + } - /** Test that incomplete keys cannot be deleted. */ - @Test - public void testDeleteIncompleteKeys() throws Exception { - Key key = makeKey("bird").build(); - DeleteKeyFn deleteKeyFn = new DeleteKeyFn(); + /** Test that entities with incomplete keys cannot be deleted. */ + @Test + public void testDeleteEntitiesWithIncompleteKeys() throws Exception { + Key key = makeKey("bird").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + DeleteEntityFn deleteEntityFn = new DeleteEntityFn(); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Keys to be deleted from the Cloud Datastore must be complete"); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Entities to be deleted from the Cloud Datastore must have complete keys"); - deleteKeyFn.apply(key); - } + deleteEntityFn.apply(entity); + } - /** Test that valid keys are transformed to delete mutations. */ - @Test - public void testDeleteKeys() { - Key key = makeKey("bird", "finch").build(); - DeleteKeyFn deleteKeyFn = new DeleteKeyFn(); + /** Test that entities with valid keys are transformed to delete mutations. */ + @Test + public void testDeleteEntities() throws Exception { + Key key = makeKey("bird", "finch").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + DeleteEntityFn deleteEntityFn = new DeleteEntityFn(); - Mutation expectedMutation = makeDelete(key).build(); - assertEquals(expectedMutation, deleteKeyFn.apply(key)); - } + Mutation expectedMutation = makeDelete(entity.getKey()).build(); + assertEquals(expectedMutation, deleteEntityFn.apply(entity)); + } - @Test - public void testDatastoreWriteFnDisplayData() { - DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null); - DisplayData displayData = DisplayData.from(datastoreWriter); - assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); - } + /** Test that incomplete keys cannot be deleted. 
*/ + @Test + public void testDeleteIncompleteKeys() throws Exception { + Key key = makeKey("bird").build(); + DeleteKeyFn deleteKeyFn = new DeleteKeyFn(); - /** Tests {@link DatastoreWriterFn} with entities less than one batch. */ - @Test - public void testDatatoreWriterFnWithOneBatch() throws Exception { - datastoreWriterFnTest(100); - verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2); - } + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Keys to be deleted from the Cloud Datastore must be complete"); - /** Tests {@link DatastoreWriterFn} with entities of more than one batches, but not a multiple. */ - @Test - public void testDatatoreWriterFnWithMultipleBatches() throws Exception { - datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 3 + 100); - verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 5); - } + deleteKeyFn.apply(key); + } - /** - * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of - * write batch size. - */ - @Test - public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception { - datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 2); - verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2); - } + /** Test that valid keys are transformed to delete mutations. */ + @Test + public void testDeleteKeys() { + Key key = makeKey("bird", "finch").build(); + DeleteKeyFn deleteKeyFn = new DeleteKeyFn(); - // A helper method to test DatastoreWriterFn for various batch sizes. - private void datastoreWriterFnTest(int numMutations) throws Exception { - // Create the requested number of mutations. - List mutations = new ArrayList<>(numMutations); - for (int i = 0; i < numMutations; ++i) { - mutations.add( - makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); + Mutation expectedMutation = makeDelete(key).build(); + assertEquals(expectedMutation, deleteKeyFn.apply(key)); } - DatastoreWriterFn datastoreWriter = - new DatastoreWriterFn( - StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher()); - DoFnTester doFnTester = DoFnTester.of(datastoreWriter); - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - doFnTester.processBundle(mutations); - - int start = 0; - while (start < numMutations) { - int end = Math.min(numMutations, start + DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START); - CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); - commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); - commitRequest.addAllMutations(mutations.subList(start, end)); - // Verify all the batch requests were made with the expected mutations. - verify(mockDatastore, times(1)).commit(commitRequest.build()); - start = end; + @Test + public void testDatastoreWriteFnDisplayData() { + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null); + DisplayData displayData = DisplayData.from(datastoreWriter); + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); } - } - /** - * Tests {@link DatastoreWriterFn} with large entities that need to be split into more batches. 
- */ - @Test - public void testDatatoreWriterFnWithLargeEntities() throws Exception { - List mutations = new ArrayList<>(); - int entitySize = 0; - for (int i = 0; i < 12; ++i) { - Entity entity = - Entity.newBuilder() - .setKey(makeKey("key" + i, i + 1)) - .putProperties( - "long", - makeValue(new String(new char[900_000])).setExcludeFromIndexes(true).build()) - .build(); - entitySize = entity.getSerializedSize(); // Take the size of any one entity. - mutations.add(makeUpsert(entity).build()); + /** Tests {@link DatastoreWriterFn} with entities less than one batch. */ + @Test + public void testDatatoreWriterFnWithOneBatch() throws Exception { + datastoreWriterFnTest(100); + verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2); } - DatastoreWriterFn datastoreWriter = - new DatastoreWriterFn( - StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher()); - DoFnTester doFnTester = DoFnTester.of(datastoreWriter); - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - doFnTester.processBundle(mutations); - - // This test is over-specific currently; it requires that we split the 12 entity writes into 3 - // requests, but we only need each CommitRequest to be less than 10MB in size. - int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize; - int start = 0; - while (start < mutations.size()) { - int end = Math.min(mutations.size(), start + entitiesPerRpc); - CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); - commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); - commitRequest.addAllMutations(mutations.subList(start, end)); - // Verify all the batch requests were made with the expected mutations. - verify(mockDatastore).commit(commitRequest.build()); - start = end; + /** + * Tests {@link DatastoreWriterFn} with entities of more than one batches, but not a multiple. + */ + @Test + public void testDatatoreWriterFnWithMultipleBatches() throws Exception { + datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 3 + 100); + verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 5); } - } - /** Tests {@link DatastoreWriterFn} with a failed request which is retried. */ - @Test - public void testDatatoreWriterFnRetriesErrors() throws Exception { - List mutations = new ArrayList<>(); - int numRpcs = 2; - for (int i = 0; i < DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * numRpcs; ++i) { - mutations.add( - makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); + /** + * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of + * write batch size. 
+ */ + @Test + public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception { + datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 2); + verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2); } - CommitResponse successfulCommit = CommitResponse.getDefaultInstance(); - when(mockDatastore.commit(any(CommitRequest.class))) - .thenReturn(successfulCommit) - .thenThrow(new DatastoreException("commit", Code.DEADLINE_EXCEEDED, "", null)) - .thenReturn(successfulCommit); - - DatastoreWriterFn datastoreWriter = - new DatastoreWriterFn( - StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher()); - DoFnTester doFnTester = DoFnTester.of(datastoreWriter); - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - doFnTester.processBundle(mutations); - verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2); - verifyMetricWasSet("BatchDatastoreWrite", "unknown", "", 1); - } - - /** - * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a - * query. - */ - @Test - public void testEstimatedSizeBytes() throws Exception { - long entityBytes = 100L; - // In seconds - long timestamp = 1234L; - - RunQueryRequest latestTimestampRequest = - makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE); - RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp); - // Per Kind statistics request and response - RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE); - RunQueryResponse statResponse = makeStatKindResponse(entityBytes); - - when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse); - when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse); - - assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE)); - verify(mockDatastore, times(1)).runQuery(latestTimestampRequest); - verify(mockDatastore, times(1)).runQuery(statRequest); - } - - /** Tests {@link SplitQueryFn} when number of query splits is specified. */ - @Test - public void testSplitQueryFnWithNumSplits() throws Exception { - int numSplits = 100; - when(mockQuerySplitter.getSplits( - eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class))) - .thenReturn(splitQuery(QUERY, numSplits)); + // A helper method to test DatastoreWriterFn for various batch sizes. + private void datastoreWriterFnTest(int numMutations) throws Exception { + // Create the requested number of mutations. + List mutations = new ArrayList<>(numMutations); + for (int i = 0; i < numMutations; ++i) { + mutations.add( + makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); + } + + DatastoreWriterFn datastoreWriter = + new DatastoreWriterFn( + StaticValueProvider.of(PROJECT_ID), + null, + mockDatastoreFactory, + new FakeWriteBatcher()); + DoFnTester doFnTester = DoFnTester.of(datastoreWriter); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + doFnTester.processBundle(mutations); + + int start = 0; + while (start < numMutations) { + int end = Math.min(numMutations, start + DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START); + CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); + commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); + commitRequest.addAllMutations(mutations.subList(start, end)); + // Verify all the batch requests were made with the expected mutations. 
+ verify(mockDatastore, times(1)).commit(commitRequest.build()); + start = end; + } + } - SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory); - DoFnTester doFnTester = DoFnTester.of(splitQueryFn); /** - * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected through - * mock factory using a when clause for unit testing purposes, it is not serializable because it - * doesn't have a no-arg constructor. Thus disabling the cloning to prevent the doFn from being - * serialized. + * Tests {@link DatastoreWriterFn} with large entities that need to be split into more batches. */ - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - List queries = doFnTester.processBundle(QUERY); - - assertEquals(queries.size(), numSplits); + @Test + public void testDatatoreWriterFnWithLargeEntities() throws Exception { + List mutations = new ArrayList<>(); + int entitySize = 0; + for (int i = 0; i < 12; ++i) { + Entity entity = + Entity.newBuilder() + .setKey(makeKey("key" + i, i + 1)) + .putProperties( + "long", + makeValue(new String(new char[900_000])).setExcludeFromIndexes(true).build()) + .build(); + entitySize = entity.getSerializedSize(); // Take the size of any one entity. + mutations.add(makeUpsert(entity).build()); + } + + DatastoreWriterFn datastoreWriter = + new DatastoreWriterFn( + StaticValueProvider.of(PROJECT_ID), + null, + mockDatastoreFactory, + new FakeWriteBatcher()); + DoFnTester doFnTester = DoFnTester.of(datastoreWriter); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + doFnTester.processBundle(mutations); + + // This test is over-specific currently; it requires that we split the 12 entity writes into 3 + // requests, but we only need each CommitRequest to be less than 10MB in size. + int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize; + int start = 0; + while (start < mutations.size()) { + int end = Math.min(mutations.size(), start + entitiesPerRpc); + CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); + commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); + commitRequest.addAllMutations(mutations.subList(start, end)); + // Verify all the batch requests were made with the expected mutations. + verify(mockDatastore).commit(commitRequest.build()); + start = end; + } + } - // Confirms that sub-queries are not equal to original when there is more than one split. - for (Query subQuery : queries) { - assertNotEquals(subQuery, QUERY); + /** Tests {@link DatastoreWriterFn} with a failed request which is retried. 
*/ + @Test + public void testDatatoreWriterFnRetriesErrors() throws Exception { + List mutations = new ArrayList<>(); + int numRpcs = 2; + for (int i = 0; i < DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * numRpcs; ++i) { + mutations.add( + makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); + } + + CommitResponse successfulCommit = CommitResponse.getDefaultInstance(); + when(mockDatastore.commit(any(CommitRequest.class))) + .thenReturn(successfulCommit) + .thenThrow(new DatastoreException("commit", Code.DEADLINE_EXCEEDED, "", null)) + .thenReturn(successfulCommit); + + DatastoreWriterFn datastoreWriter = + new DatastoreWriterFn( + StaticValueProvider.of(PROJECT_ID), + null, + mockDatastoreFactory, + new FakeWriteBatcher()); + DoFnTester doFnTester = DoFnTester.of(datastoreWriter); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + doFnTester.processBundle(mutations); + verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2); + verifyMetricWasSet("BatchDatastoreWrite", "unknown", "", 1); } - verify(mockQuerySplitter, times(1)) - .getSplits(eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)); - verifyZeroInteractions(mockDatastore); - } + /** Test options. * */ + public interface RuntimeTestOptions extends PipelineOptions { + ValueProvider getDatastoreProject(); - /** Tests {@link SplitQueryFn} when no query splits is specified. */ - @Test - public void testSplitQueryFnWithoutNumSplits() throws Exception { - // Force SplitQueryFn to compute the number of query splits - int numSplits = 0; - int expectedNumSplits = 20; - long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES; - // In seconds - long timestamp = 1234L; - - RunQueryRequest latestTimestampRequest = - makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE); - RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp); - - // Per Kind statistics request and response - RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE); - RunQueryResponse statResponse = makeStatKindResponse(entityBytes); - - when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse); - when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse); - when(mockQuerySplitter.getSplits( - eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class))) - .thenReturn(splitQuery(QUERY, expectedNumSplits)); - - SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory); - DoFnTester doFnTester = DoFnTester.of(splitQueryFn); - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - List queries = doFnTester.processBundle(QUERY); + void setDatastoreProject(ValueProvider value); - assertEquals(expectedNumSplits, queries.size()); - verify(mockQuerySplitter, times(1)) - .getSplits(eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)); - verify(mockDatastore, times(1)).runQuery(latestTimestampRequest); - verify(mockDatastore, times(1)).runQuery(statRequest); - } + ValueProvider getGqlQuery(); - /** Tests {@link DatastoreV1.Read.SplitQueryFn} when the query has a user specified limit. 
*/ - @Test - public void testSplitQueryFnWithQueryLimit() throws Exception { - Query queryWithLimit = QUERY.toBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build(); + void setGqlQuery(ValueProvider value); - SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, mockDatastoreFactory); - DoFnTester doFnTester = DoFnTester.of(splitQueryFn); - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - List queries = doFnTester.processBundle(queryWithLimit); + ValueProvider getNamespace(); - assertEquals(1, queries.size()); - verifyNoMoreInteractions(mockDatastore); - verifyNoMoreInteractions(mockQuerySplitter); - } + void setNamespace(ValueProvider value); + } - /** Tests {@link ReadFn} with a query limit less than one batch. */ - @Test - public void testReadFnWithOneBatch() throws Exception { - readFnTest(5); - verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 1); - } + /** + * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction + * time when built with {@link DatastoreV1.Read#withQuery(Query)}. + */ + @Test + public void testRuntimeOptionsNotCalledInApplyQuery() { + RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); + Pipeline pipeline = TestPipeline.create(options); + pipeline + .apply( + DatastoreIO.v1() + .read() + .withProjectId(options.getDatastoreProject()) + .withQuery(QUERY) + .withNamespace(options.getNamespace())) + .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject())); + } - /** Tests {@link ReadFn} with a query limit more than one batch, and not a multiple. */ - @Test - public void testReadFnWithMultipleBatches() throws Exception { - readFnTest(QUERY_BATCH_LIMIT + 5); - verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 2); - } + /** + * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction + * time when built with {@link DatastoreV1.Read#withLiteralGqlQuery(String)}. + */ + @Test + public void testRuntimeOptionsNotCalledInApplyGqlQuery() { + RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); + Pipeline pipeline = TestPipeline.create(options); + pipeline + .apply( + DatastoreIO.v1() + .read() + .withProjectId(options.getDatastoreProject()) + .withLiteralGqlQuery(options.getGqlQuery())) + .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject())); + } - /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */ - @Test - public void testReadFnWithBatchesExactMultiple() throws Exception { - readFnTest(5 * QUERY_BATCH_LIMIT); - verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 5); - } + @Test + public void testWriteBatcherWithoutData() { + DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); + writeBatcher.start(); + assertEquals( + DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START, writeBatcher.nextBatchSize(0)); + } - /** Tests that {@link ReadFn} retries after an error. */ - @Test - public void testReadFnRetriesErrors() throws Exception { - // An empty query to read entities. 
- Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build(); + @Test + public void testWriteBatcherFastQueries() { + DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); + writeBatcher.start(); + writeBatcher.addRequestLatency(0, 1000, 200); + writeBatcher.addRequestLatency(0, 1000, 200); + assertEquals( + DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, writeBatcher.nextBatchSize(0)); + } - // Use mockResponseForQuery to generate results. - when(mockDatastore.runQuery(any(RunQueryRequest.class))) - .thenThrow(new DatastoreException("RunQuery", Code.DEADLINE_EXCEEDED, "", null)) - .thenAnswer( - invocationOnMock -> { - Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery(); - return mockResponseForQuery(q); - }); + @Test + public void testWriteBatcherSlowQueries() { + DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); + writeBatcher.start(); + writeBatcher.addRequestLatency(0, 10000, 200); + writeBatcher.addRequestLatency(0, 10000, 200); + assertEquals(120, writeBatcher.nextBatchSize(0)); + } - ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory); - DoFnTester doFnTester = DoFnTester.of(readFn); - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - doFnTester.processBundle(query); - verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 1); - verifyMetricWasSet("BatchDatastoreRead", "unknown", NAMESPACE, 1); - } + @Test + public void testWriteBatcherSizeNotBelowMinimum() { + DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); + writeBatcher.start(); + writeBatcher.addRequestLatency(0, 75000, 50); + writeBatcher.addRequestLatency(0, 75000, 50); + assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_MIN, writeBatcher.nextBatchSize(0)); + } - @Test - public void testTranslateGqlQueryWithLimit() throws Exception { - String gql = "SELECT * from DummyKind LIMIT 10"; - String gqlWithZeroLimit = gql + " LIMIT 0"; - GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build(); - GqlQuery gqlQueryWithZeroLimit = - GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build(); - RunQueryRequest gqlRequest = makeRequest(gqlQuery, V_1_OPTIONS.getNamespace()); - RunQueryRequest gqlRequestWithZeroLimit = - makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace()); - when(mockDatastore.runQuery(gqlRequestWithZeroLimit)) - .thenThrow( - new DatastoreException( - "runQuery", - Code.INVALID_ARGUMENT, - "invalid query", - // dummy - new RuntimeException())); - when(mockDatastore.runQuery(gqlRequest)) - .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build()); - assertEquals( - translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace()), QUERY); - verify(mockDatastore, times(1)).runQuery(gqlRequest); - verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit); + @Test + public void testWriteBatcherSlidingWindow() { + DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); + writeBatcher.start(); + writeBatcher.addRequestLatency(0, 30000, 50); + writeBatcher.addRequestLatency(50000, 8000, 200); + writeBatcher.addRequestLatency(100000, 8000, 200); + assertEquals(150, writeBatcher.nextBatchSize(150000)); + } } - @Test - public void testTranslateGqlQueryWithNoLimit() throws Exception { - String gql = "SELECT * from DummyKind"; - String gqlWithZeroLimit = gql + " LIMIT 0"; - GqlQuery gqlQueryWithZeroLimit = - 
GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build(); - RunQueryRequest gqlRequestWithZeroLimit = - makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace()); - when(mockDatastore.runQuery(gqlRequestWithZeroLimit)) - .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build()); - assertEquals( - translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace()), QUERY); - verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit); - } + @RunWith(Parameterized.class) + public static class ParameterizedTests extends DatastoreV1Test { + @Parameter(0) + public Instant readTime; - @Test - public void testTranslateGqlQueryWithException() throws Exception { - String gql = "SELECT * from DummyKind"; - String gqlWithZeroLimit = gql + " LIMIT 0"; - GqlQuery gqlQueryWithZeroLimit = - GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build(); - RunQueryRequest gqlRequestWithZeroLimit = - makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace()); - when(mockDatastore.runQuery(gqlRequestWithZeroLimit)) - .thenThrow(new RuntimeException("TestException")); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("TestException"); - translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace()); - } + @Parameter(1) + public Timestamp readTimeProto; - /** Test options. * */ - public interface RuntimeTestOptions extends PipelineOptions { - ValueProvider getDatastoreProject(); + @Parameters(name = "readTime = {0}, readTimeProto = {1}") + public static Collection data() { + return Arrays.asList(new Object[] {null, null}, new Object[] {TIMESTAMP, TIMESTAMP_PROTO}); + } - void setDatastoreProject(ValueProvider value); + /** + * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a + * query. + */ + @Test + public void testEstimatedSizeBytes() throws Exception { + long entityBytes = 100L; + // In seconds + long timestamp = 1234L; + + RunQueryRequest latestTimestampRequest = + makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE, readTime); + RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp); + // Per Kind statistics request and response + RunQueryRequest statRequest = + makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE, readTime); + RunQueryResponse statResponse = makeStatKindResponse(entityBytes); + + when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse); + when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse); + + assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE, readTime)); + verify(mockDatastore, times(1)).runQuery(latestTimestampRequest); + verify(mockDatastore, times(1)).runQuery(statRequest); + } - ValueProvider getGqlQuery(); + /** Tests {@link SplitQueryFn} when number of query splits is specified. 
*/ + @Test + public void testSplitQueryFnWithNumSplits() throws Exception { + int numSplits = 100; + + when(mockQuerySplitter.getSplits( + eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class))) + .thenReturn(splitQuery(QUERY, numSplits)); + when(mockQuerySplitter.getSplits( + eq(QUERY), + any(PartitionId.class), + eq(numSplits), + any(Datastore.class), + eq(readTimeProto))) + .thenReturn(splitQuery(QUERY, numSplits)); + + SplitQueryFn splitQueryFn = + new SplitQueryFn(V_1_OPTIONS, numSplits, readTime, mockDatastoreFactory); + DoFnTester doFnTester = DoFnTester.of(splitQueryFn); + /** + * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected + * through mock factory using a when clause for unit testing purposes, it is not serializable + * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the + * doFn from being serialized. + */ + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + List queries = doFnTester.processBundle(QUERY); + + assertEquals(queries.size(), numSplits); + + // Confirms that sub-queries are not equal to original when there is more than one split. + for (Query subQuery : queries) { + assertNotEquals(subQuery, QUERY); + } + + if (readTime == null) { + verify(mockQuerySplitter, times(1)) + .getSplits(eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)); + } else { + verify(mockQuerySplitter, times(1)) + .getSplits( + eq(QUERY), + any(PartitionId.class), + eq(numSplits), + any(Datastore.class), + eq(readTimeProto)); + } + verifyZeroInteractions(mockDatastore); + } - void setGqlQuery(ValueProvider value); + /** Tests {@link SplitQueryFn} when no query splits is specified. */ + @Test + public void testSplitQueryFnWithoutNumSplits() throws Exception { + // Force SplitQueryFn to compute the number of query splits + int numSplits = 0; + int expectedNumSplits = 20; + long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES; + // In seconds + long timestamp = 1234L; + + RunQueryRequest latestTimestampRequest = + makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE, readTime); + RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp); + + // Per Kind statistics request and response + RunQueryRequest statRequest = + makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE, readTime); + RunQueryResponse statResponse = makeStatKindResponse(entityBytes); + + when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse); + when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse); + when(mockQuerySplitter.getSplits( + eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class))) + .thenReturn(splitQuery(QUERY, expectedNumSplits)); + when(mockQuerySplitter.getSplits( + eq(QUERY), + any(PartitionId.class), + eq(expectedNumSplits), + any(Datastore.class), + eq(readTimeProto))) + .thenReturn(splitQuery(QUERY, expectedNumSplits)); + + SplitQueryFn splitQueryFn = + new SplitQueryFn(V_1_OPTIONS, numSplits, readTime, mockDatastoreFactory); + DoFnTester doFnTester = DoFnTester.of(splitQueryFn); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + List queries = doFnTester.processBundle(QUERY); + + assertEquals(expectedNumSplits, queries.size()); + if (readTime == null) { + verify(mockQuerySplitter, times(1)) + .getSplits( + eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)); + } else { + verify(mockQuerySplitter, times(1)) + .getSplits( + 
eq(QUERY), + any(PartitionId.class), + eq(expectedNumSplits), + any(Datastore.class), + eq(readTimeProto)); + } + verify(mockDatastore, times(1)).runQuery(latestTimestampRequest); + verify(mockDatastore, times(1)).runQuery(statRequest); + } - ValueProvider getNamespace(); + /** Tests {@link DatastoreV1.Read.SplitQueryFn} when the query has a user specified limit. */ + @Test + public void testSplitQueryFnWithQueryLimit() throws Exception { + Query queryWithLimit = + QUERY.toBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build(); - void setNamespace(ValueProvider value); - } + SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, readTime, mockDatastoreFactory); + DoFnTester doFnTester = DoFnTester.of(splitQueryFn); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + List queries = doFnTester.processBundle(queryWithLimit); - /** - * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction time - * when built with {@link DatastoreV1.Read#withQuery(Query)}. - */ - @Test - public void testRuntimeOptionsNotCalledInApplyQuery() { - RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); - Pipeline pipeline = TestPipeline.create(options); - pipeline - .apply( - DatastoreIO.v1() - .read() - .withProjectId(options.getDatastoreProject()) - .withQuery(QUERY) - .withNamespace(options.getNamespace())) - .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject())); - } + assertEquals(1, queries.size()); + verifyNoMoreInteractions(mockDatastore); + verifyNoMoreInteractions(mockQuerySplitter); + } - /** - * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction time - * when built with {@link DatastoreV1.Read#withLiteralGqlQuery(String)}. - */ - @Test - public void testRuntimeOptionsNotCalledInApplyGqlQuery() { - RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); - Pipeline pipeline = TestPipeline.create(options); - pipeline - .apply( - DatastoreIO.v1() - .read() - .withProjectId(options.getDatastoreProject()) - .withLiteralGqlQuery(options.getGqlQuery())) - .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject())); - } + /** Tests {@link ReadFn} with a query limit less than one batch. */ + @Test + public void testReadFnWithOneBatch() throws Exception { + readFnTest(5, readTime); + verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 1); + } - @Test - public void testWriteBatcherWithoutData() { - DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); - writeBatcher.start(); - assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START, writeBatcher.nextBatchSize(0)); - } + /** Tests {@link ReadFn} with a query limit more than one batch, and not a multiple. */ + @Test + public void testReadFnWithMultipleBatches() throws Exception { + readFnTest(QUERY_BATCH_LIMIT + 5, readTime); + verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 2); + } - @Test - public void testWriteBatcherFastQueries() { - DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); - writeBatcher.start(); - writeBatcher.addRequestLatency(0, 1000, 200); - writeBatcher.addRequestLatency(0, 1000, 200); - assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, writeBatcher.nextBatchSize(0)); - } + /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. 
*/ + @Test + public void testReadFnWithBatchesExactMultiple() throws Exception { + readFnTest(5 * QUERY_BATCH_LIMIT, readTime); + verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 5); + } - @Test - public void testWriteBatcherSlowQueries() { - DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); - writeBatcher.start(); - writeBatcher.addRequestLatency(0, 10000, 200); - writeBatcher.addRequestLatency(0, 10000, 200); - assertEquals(120, writeBatcher.nextBatchSize(0)); - } + /** Tests that {@link ReadFn} retries after an error. */ + @Test + public void testReadFnRetriesErrors() throws Exception { + // An empty query to read entities. + Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build(); + + // Use mockResponseForQuery to generate results. + when(mockDatastore.runQuery(any(RunQueryRequest.class))) + .thenThrow(new DatastoreException("RunQuery", Code.DEADLINE_EXCEEDED, "", null)) + .thenAnswer( + invocationOnMock -> { + Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery(); + return mockResponseForQuery(q); + }); + + ReadFn readFn = new ReadFn(V_1_OPTIONS, readTime, mockDatastoreFactory); + DoFnTester doFnTester = DoFnTester.of(readFn); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + doFnTester.processBundle(query); + verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 1); + verifyMetricWasSet("BatchDatastoreRead", "unknown", NAMESPACE, 1); + } - @Test - public void testWriteBatcherSizeNotBelowMinimum() { - DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); - writeBatcher.start(); - writeBatcher.addRequestLatency(0, 75000, 50); - writeBatcher.addRequestLatency(0, 75000, 50); - assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_MIN, writeBatcher.nextBatchSize(0)); - } + @Test + public void testTranslateGqlQueryWithLimit() throws Exception { + String gql = "SELECT * from DummyKind LIMIT 10"; + String gqlWithZeroLimit = gql + " LIMIT 0"; + GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build(); + GqlQuery gqlQueryWithZeroLimit = + GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build(); + + RunQueryRequest gqlRequest = makeRequest(gqlQuery, V_1_OPTIONS.getNamespace(), readTime); + RunQueryRequest gqlRequestWithZeroLimit = + makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace(), readTime); + when(mockDatastore.runQuery(gqlRequestWithZeroLimit)) + .thenThrow( + new DatastoreException( + "runQuery", + Code.INVALID_ARGUMENT, + "invalid query", + // dummy + new RuntimeException())); + when(mockDatastore.runQuery(gqlRequest)) + .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build()); + assertEquals( + translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace(), readTime), + QUERY); + verify(mockDatastore, times(1)).runQuery(gqlRequest); + verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit); + } + + @Test + public void testTranslateGqlQueryWithNoLimit() throws Exception { + String gql = "SELECT * from DummyKind"; + String gqlWithZeroLimit = gql + " LIMIT 0"; + GqlQuery gqlQueryWithZeroLimit = + GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build(); + + RunQueryRequest gqlRequestWithZeroLimit = + makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace(), readTime); + when(mockDatastore.runQuery(gqlRequestWithZeroLimit)) + .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build()); + assertEquals( + 
translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace(), readTime), + QUERY); + verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit); + } - @Test - public void testWriteBatcherSlidingWindow() { - DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); - writeBatcher.start(); - writeBatcher.addRequestLatency(0, 30000, 50); - writeBatcher.addRequestLatency(50000, 8000, 200); - writeBatcher.addRequestLatency(100000, 8000, 200); - assertEquals(150, writeBatcher.nextBatchSize(150000)); + @Test + public void testTranslateGqlQueryWithException() throws Exception { + String gql = "SELECT * from DummyKind"; + String gqlWithZeroLimit = gql + " LIMIT 0"; + GqlQuery gqlQueryWithZeroLimit = + GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build(); + RunQueryRequest gqlRequestWithZeroLimit = + makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace(), readTime); + when(mockDatastore.runQuery(gqlRequestWithZeroLimit)) + .thenThrow(new RuntimeException("TestException")); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("TestException"); + translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace(), readTime); + } } /** Helper Methods */ @@ -963,7 +1083,7 @@ private static RunQueryResponse mockResponseForQuery(Query q) { } /** Helper function to run a test reading from a {@link ReadFn}. */ - private void readFnTest(int numEntities) throws Exception { + protected void readFnTest(int numEntities, Instant readTime) throws Exception { // An empty query to read entities. Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(numEntities)).build(); @@ -976,7 +1096,7 @@ private void readFnTest(int numEntities) throws Exception { return mockResponseForQuery(q); }); - ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory); + ReadFn readFn = new ReadFn(V_1_OPTIONS, readTime, mockDatastoreFactory); DoFnTester doFnTester = DoFnTester.of(readFn); /** * Although Datastore client is marked transient in {@link ReadFn}, when injected through mock @@ -987,10 +1107,20 @@ private void readFnTest(int numEntities) throws Exception { doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); List entities = doFnTester.processBundle(query); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(RunQueryRequest.class); + int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT); - verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class)); + verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(requestCaptor.capture()); // Validate the number of results. assertEquals(numEntities, entities.size()); + // Validate read Time. + RunQueryRequest request = requestCaptor.getValue(); + if (readTime != null) { + assertEquals( + readTime.getMillis(), Timestamps.toMillis(request.getReadOptions().getReadTime())); + } else { + assertFalse(request.hasReadOptions()); + } } /** Builds a per-kind statistics response with the given entity size. */ @@ -1050,7 +1180,7 @@ private static Query makeLatestTimestampQuery(String namespace) { } /** Generate dummy query splits. 
*/ - private List splitQuery(Query query, int numSplits) { + private static List splitQuery(Query query, int numSplits) { List queries = new ArrayList<>(); int offsetOfOriginal = query.getOffset(); for (int i = 0; i < numSplits; i++) { @@ -1082,7 +1212,8 @@ public int nextBatchSize(long timeSinceEpochMillis) { } } - private void verifyMetricWasSet(String method, String status, String namespace, long count) { + private static void verifyMetricWasSet( + String method, String status, String namespace, long count) { // Verify the metric as reported. HashMap labels = new HashMap<>(); labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java index 6d0bd52f26dd..ea00821f3604 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java @@ -26,6 +26,8 @@ import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.V1Options; import org.apache.beam.sdk.transforms.DoFnTester; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -50,6 +52,8 @@ */ @RunWith(JUnit4.class) public class SplitQueryFnIT { + private Instant readTime = Instant.now().minus(Duration.standardSeconds(10)); + /** Tests {@link SplitQueryFn} to generate expected number of splits for a large dataset. */ @Test public void testSplitQueryFnWithLargeDataset() throws Exception { @@ -59,7 +63,8 @@ public void testSplitQueryFnWithLargeDataset() throws Exception { // Num splits is computed based on the entity_bytes size of the input_sort_1G kind reported by // Datastore stats. int expectedNumSplits = 32; - testSplitQueryFn(projectId, kind, namespace, expectedNumSplits); + testSplitQueryFn(projectId, kind, namespace, expectedNumSplits, null); + testSplitQueryFn(projectId, kind, namespace, expectedNumSplits, readTime); } /** Tests {@link SplitQueryFn} to fallback to NUM_QUERY_SPLITS_MIN for a small dataset. */ @@ -69,17 +74,23 @@ public void testSplitQueryFnWithSmallDataset() throws Exception { String kind = "shakespeare"; String namespace = null; int expectedNumSplits = NUM_QUERY_SPLITS_MIN; - testSplitQueryFn(projectId, kind, namespace, expectedNumSplits); + testSplitQueryFn(projectId, kind, namespace, expectedNumSplits, null); + testSplitQueryFn(projectId, kind, namespace, expectedNumSplits, readTime); } /** A helper method to test {@link SplitQueryFn} to generate the expected number of splits. 
*/ private void testSplitQueryFn( - String projectId, String kind, @Nullable String namespace, int expectedNumSplits) + String projectId, + String kind, + @Nullable String namespace, + int expectedNumSplits, + @Nullable Instant readTime) throws Exception { Query.Builder query = Query.newBuilder(); query.addKindBuilder().setName(kind); - SplitQueryFn splitQueryFn = new SplitQueryFn(V1Options.from(projectId, namespace, null), 0); + SplitQueryFn splitQueryFn = + new SplitQueryFn(V1Options.from(projectId, namespace, null), 0, readTime); DoFnTester doFnTester = DoFnTester.of(splitQueryFn); List queries = doFnTester.processBundle(query.build()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java index 55b53b3f0c91..7d6fc5770388 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Instant; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -48,7 +49,9 @@ public class V1ReadIT { private V1TestOptions options; private String project; private String ancestor; - private final long numEntities = 1000; + private final long numEntitiesBeforeReadTime = 600; + private final long totalNumEntities = 1000; + private Instant readTime; @Before public void setup() throws Exception { @@ -57,7 +60,15 @@ public void setup() throws Exception { project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); ancestor = UUID.randomUUID().toString(); // Create entities and write them to datastore - writeEntitiesToDatastore(options, project, ancestor, numEntities); + writeEntitiesToDatastore(options, project, ancestor, 0, numEntitiesBeforeReadTime); + + Thread.sleep(1000); + readTime = Instant.now(); + Thread.sleep(1000); + + long moreEntitiesToWrite = totalNumEntities - numEntitiesBeforeReadTime; + writeEntitiesToDatastore( + options, project, ancestor, numEntitiesBeforeReadTime, moreEntitiesToWrite); } @After @@ -77,6 +88,7 @@ public void testE2EV1Read() throws Exception { Query query = V1TestUtil.makeAncestorKindQuery(options.getKind(), options.getNamespace(), ancestor); + // Read entities without readTime. DatastoreV1.Read read = DatastoreIO.v1() .read() @@ -88,8 +100,23 @@ public void testE2EV1Read() throws Exception { Pipeline p = Pipeline.create(options); PCollection count = p.apply(read).apply(Count.globally()); - PAssert.thatSingleton(count).isEqualTo(numEntities); + PAssert.thatSingleton(count).isEqualTo(totalNumEntities); p.run(); + + // Read entities with readTime. 
+    DatastoreV1.Read snapshotRead =
+        DatastoreIO.v1()
+            .read()
+            .withProjectId(project)
+            .withQuery(query)
+            .withNamespace(options.getNamespace())
+            .withReadTime(readTime);
+
+    Pipeline p2 = Pipeline.create(options);
+    PCollection count2 = p2.apply(snapshotRead).apply(Count.globally());
+
+    PAssert.thatSingleton(count2).isEqualTo(numEntitiesBeforeReadTime);
+    p2.run();
   }
 
   @Test
@@ -114,12 +141,13 @@ private void testE2EV1ReadWithGQLQuery(long limit) throws Exception {
         "SELECT * from %s WHERE __key__ HAS ANCESTOR KEY(%s, '%s')",
         options.getKind(), options.getKind(), ancestor);
 
-    long expectedNumEntities = numEntities;
+    long expectedNumEntities = totalNumEntities;
     if (limit > 0) {
       gqlQuery = String.format("%s LIMIT %d", gqlQuery, limit);
       expectedNumEntities = limit;
     }
 
+    // Read entities without readTime.
     DatastoreV1.Read read =
         DatastoreIO.v1()
             .read()
@@ -133,18 +161,36 @@
     PAssert.thatSingleton(count).isEqualTo(expectedNumEntities);
     p.run();
+
+    // Read entities with readTime.
+    DatastoreV1.Read snapshotRead =
+        DatastoreIO.v1()
+            .read()
+            .withProjectId(project)
+            .withLiteralGqlQuery(gqlQuery)
+            .withNamespace(options.getNamespace())
+            .withReadTime(readTime);
+
+    Pipeline p2 = Pipeline.create(options);
+    PCollection count2 = p2.apply(snapshotRead).apply(Count.globally());
+
+    long expectedNumEntities2 = limit > 0 ? limit : numEntitiesBeforeReadTime;
+    PAssert.thatSingleton(count2).isEqualTo(expectedNumEntities2);
+    p2.run();
   }
 
   // Creates entities and write them to datastore
   private static void writeEntitiesToDatastore(
-      V1TestOptions options, String project, String ancestor, long numEntities) throws Exception {
+      V1TestOptions options, String project, String ancestor, long valueOffset, long numEntities)
+      throws Exception {
     Datastore datastore = getDatastore(options, project);
     // Write test entities to datastore
     V1TestWriter writer = new V1TestWriter(datastore, new UpsertMutationBuilder());
     Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
     for (long i = 0; i < numEntities; i++) {
-      Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace(), 0);
+      Entity entity =
+          makeEntity(valueOffset + i, ancestorKey, options.getKind(), options.getNamespace(), 0);
       writer.write(entity);
     }
     writer.close();
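
For reviewers who want to see the new option in context, below is a minimal usage sketch; it is not part of the patch. The transform chain mirrors the snapshot reads exercised in V1ReadIT above, while the project ID ("my-project"), kind name ("MyKind"), and the ten-minute offset are placeholders chosen for illustration.

// Illustrative only: read a Datastore kind as of a fixed timestamp using the
// withReadTime option added by this change. "my-project" and "MyKind" are placeholders.
import com.google.datastore.v1.Query;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class DatastoreReadTimeExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Query a single kind; with a read time set, entities written after that
    // instant are not visible to the read.
    Query.Builder query = Query.newBuilder();
    query.addKindBuilder().setName("MyKind");

    Instant readTime = Instant.now().minus(Duration.standardMinutes(10));

    pipeline.apply(
        DatastoreIO.v1()
            .read()
            .withProjectId("my-project")
            .withQuery(query.build())
            .withReadTime(readTime));

    pipeline.run();
  }
}

Omitting withReadTime preserves the existing behavior of reading the latest data, which the non-snapshot assertions in V1ReadIT continue to cover.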