From 3a1e973f02b5894274450ee22c932a3394dd3988 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 26 Sep 2024 15:27:54 -0400 Subject: [PATCH] Revert "fix: add topic existing validation (#32465)" (#32572) This reverts commit 380ed7b7f746a379a6b9bd902327fcb0ba1b2139. --- CHANGES.md | 4 - .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 3 - .../sdk/io/gcp/pubsub/PubsubGrpcClient.java | 16 -- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 94 +--------- .../sdk/io/gcp/pubsub/PubsubJsonClient.java | 14 -- .../sdk/io/gcp/pubsub/PubsubTestClient.java | 6 - .../io/gcp/pubsub/PubsubGrpcClientTest.java | 40 ----- .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 170 ------------------ .../io/gcp/pubsub/PubsubJsonClientTest.java | 24 --- 9 files changed, 1 insertion(+), 370 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a0133bd531ca..c123a8e1a4dc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -59,10 +59,6 @@ * Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528)) -## I/Os - -* PubsubIO will validate that the Pub/Sub topic exists before running the Read/Write pipeline (Java) ([#32465](https://github.com/apache/beam/pull/32465)) - ## New Features / Improvements * Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)). diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index bd01604643e1..2964a29dbb6b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -507,9 +507,6 @@ public abstract void modifyAckDeadline( /** Return a list of topics for {@code project}. */ public abstract List listTopics(ProjectPath project) throws IOException; - /** Return {@literal true} if {@code topic} exists. */ - public abstract boolean isTopicExists(TopicPath topic) throws IOException; - /** Create {@code subscription} to {@code topic}. */ public abstract void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index 0cfb06688108..93fdd5524007 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -54,7 +54,6 @@ import io.grpc.Channel; import io.grpc.ClientInterceptors; import io.grpc.ManagedChannel; -import io.grpc.StatusRuntimeException; import io.grpc.auth.ClientAuthInterceptor; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NegotiationType; @@ -373,21 +372,6 @@ public List listTopics(ProjectPath project) throws IOException { return topics; } - @Override - public boolean isTopicExists(TopicPath topic) throws IOException { - GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(topic.getPath()).build(); - try { - publisherStub().getTopic(request); - return true; - } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) { - return false; - } - - throw e; - } - } - @Override public void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 4d0586aa85af..f59a68c40551 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -50,7 +50,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -861,8 +860,6 @@ public abstract static class Read extends PTransform> abstract ErrorHandler getBadRecordErrorHandler(); - abstract boolean getValidate(); - abstract Builder toBuilder(); static Builder newBuilder(SerializableFunction parseFn) { @@ -874,7 +871,6 @@ static Builder newBuilder(SerializableFunction parseFn) builder.setNeedsOrderingKey(false); builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); builder.setBadRecordErrorHandler(new DefaultErrorHandler<>()); - builder.setValidate(true); return builder; } @@ -922,8 +918,6 @@ abstract static class Builder { abstract Builder setBadRecordErrorHandler( ErrorHandler badRecordErrorHandler); - abstract Builder setValidate(boolean validation); - abstract Read build(); } @@ -1103,11 +1097,6 @@ public Read withErrorHandler(ErrorHandler badRecordErrorHandler .build(); } - /** Disable validation of the existence of the topic. */ - public Read withoutValidation() { - return toBuilder().setValidate(false).build(); - } - @VisibleForTesting /** * Set's the internal Clock. @@ -1273,35 +1262,6 @@ public T apply(PubsubMessage input) { return read.setCoder(getCoder()); } - @Override - public void validate(PipelineOptions options) { - if (!getValidate()) { - return; - } - - PubsubOptions psOptions = options.as(PubsubOptions.class); - - // Validate the existence of the topic. - if (getTopicProvider() != null) { - PubsubTopic topic = getTopicProvider().get(); - boolean topicExists = true; - try (PubsubClient pubsubClient = - getPubsubClientFactory() - .newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) { - topicExists = - pubsubClient.isTopicExists( - PubsubClient.topicPathFromName(topic.project, topic.topic)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - if (!topicExists) { - throw new IllegalArgumentException( - String.format("Pubsub topic '%s' does not exist.", topic)); - } - } - } - @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); @@ -1381,8 +1341,6 @@ public abstract static class Write extends PTransform, PDone> abstract ErrorHandler getBadRecordErrorHandler(); - abstract boolean getValidate(); - abstract Builder toBuilder(); static Builder newBuilder( @@ -1392,7 +1350,6 @@ static Builder newBuilder( builder.setFormatFn(formatFn); builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); builder.setBadRecordErrorHandler(new DefaultErrorHandler<>()); - builder.setValidate(true); return builder; } @@ -1429,8 +1386,6 @@ abstract Builder setFormatFn( abstract Builder setBadRecordErrorHandler( ErrorHandler badRecordErrorHandler); - abstract Builder setValidate(boolean validation); - abstract Write build(); } @@ -1441,14 +1396,11 @@ abstract Builder setBadRecordErrorHandler( * {@code topic} string. */ public Write to(String topic) { - ValueProvider topicProvider = StaticValueProvider.of(topic); - validateTopic(topicProvider); - return to(topicProvider); + return to(StaticValueProvider.of(topic)); } /** Like {@code topic()} but with a {@link ValueProvider}. */ public Write to(ValueProvider topic) { - validateTopic(topic); return toBuilder() .setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)) .setTopicFunction(null) @@ -1469,13 +1421,6 @@ public Write to(SerializableFunction, String> topicFun .build(); } - /** Handles validation of {@code topic}. */ - private static void validateTopic(ValueProvider topic) { - if (topic.isAccessible()) { - PubsubTopic.fromPath(topic.get()); - } - } - /** * The default client to write to Pub/Sub is the {@link PubsubJsonClient}, created by the {@link * PubsubJsonClient.PubsubJsonClientFactory}. This function allows to change the Pub/Sub client @@ -1552,14 +1497,6 @@ public Write withErrorHandler(ErrorHandler badRecordErrorHandle .build(); } - /** - * Disable validation of the existence of the topic. Validation of the topic works only if the - * topic is set statically and not dynamically. - */ - public Write withoutValidation() { - return toBuilder().setValidate(false).build(); - } - @Override public PDone expand(PCollection input) { if (getTopicProvider() == null && !getDynamicDestinations()) { @@ -1636,35 +1573,6 @@ public void populateDisplayData(DisplayData.Builder builder) { builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider()); } - @Override - public void validate(PipelineOptions options) { - if (!getValidate()) { - return; - } - - PubsubOptions psOptions = options.as(PubsubOptions.class); - - // Validate the existence of the topic. - if (getTopicProvider() != null) { - PubsubTopic topic = getTopicProvider().get(); - boolean topicExists = true; - try (PubsubClient pubsubClient = - getPubsubClientFactory() - .newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) { - topicExists = - pubsubClient.isTopicExists( - PubsubClient.topicPathFromName(topic.project, topic.topic)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - if (!topicExists) { - throw new IllegalArgumentException( - String.format("Pubsub topic '%s' does not exist.", topic)); - } - } - } - /** * Writer to Pubsub which batches messages from bounded collections. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index 0a838da66f69..386febcf005b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -19,7 +19,6 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions; @@ -311,19 +310,6 @@ public List listTopics(ProjectPath project) throws IOException { return topics; } - @Override - public boolean isTopicExists(TopicPath topic) throws IOException { - try { - pubsub.projects().topics().get(topic.getPath()).execute(); - return true; - } catch (GoogleJsonResponseException e) { - if (e.getStatusCode() == 404) { - return false; - } - throw e; - } - } - @Override public void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java index 3d5a879fce15..a8109d05ec38 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java @@ -605,12 +605,6 @@ public List listTopics(ProjectPath project) throws IOException { throw new UnsupportedOperationException(); } - @Override - public boolean isTopicExists(TopicPath topic) throws IOException { - // Always return true for testing purposes. - return true; - } - @Override public void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java index 6c4625f2e077..3724e169c612 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java @@ -40,7 +40,6 @@ import com.google.pubsub.v1.Topic; import io.grpc.ManagedChannel; import io.grpc.Server; -import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; @@ -433,43 +432,4 @@ public void getSchema(GetSchemaRequest request, StreamObserver responseO server.shutdownNow(); } } - - @Test - public void isTopicExists() throws IOException { - initializeClient(null, null); - TopicPath topicDoesNotExist = - PubsubClient.topicPathFromPath("projects/testProject/topics/dontexist"); - TopicPath topicExists = PubsubClient.topicPathFromPath("projects/testProject/topics/exist"); - - PublisherImplBase publisherImplBase = - new PublisherImplBase() { - @Override - public void getTopic(GetTopicRequest request, StreamObserver responseObserver) { - String topicPath = request.getTopic(); - if (topicPath.equals(topicDoesNotExist.getPath())) { - responseObserver.onError( - new StatusRuntimeException(Status.fromCode(Status.Code.NOT_FOUND))); - } - if (topicPath.equals(topicExists.getPath())) { - responseObserver.onNext( - Topic.newBuilder() - .setName(topicPath) - .setSchemaSettings( - SchemaSettings.newBuilder().setSchema(SCHEMA.getPath()).build()) - .build()); - responseObserver.onCompleted(); - } - } - }; - Server server = - InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start(); - try { - assertEquals(false, client.isTopicExists(topicDoesNotExist)); - - assertEquals(true, client.isTopicExists(topicExists)); - - } finally { - server.shutdownNow(); - } - } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index 0f4c929619a5..d4effbae40a4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -83,7 +83,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; -import org.apache.commons.lang3.RandomStringUtils; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -98,7 +97,6 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.junit.runners.model.Statement; -import org.mockito.Mockito; /** Tests for PubsubIO Read and Write transforms. */ @RunWith(JUnit4.class) @@ -930,172 +928,4 @@ public void testBigMessageBounded() throws IOException { pipeline.run(); } } - - @Test - public void testReadValidate() throws IOException { - PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); - TopicPath existingTopic = PubsubClient.topicPathFromName("test-project", "testTopic"); - PubsubClient mockClient = Mockito.mock(PubsubClient.class); - Mockito.when(mockClient.isTopicExists(existingTopic)).thenReturn(true); - PubsubClient.PubsubClientFactory mockFactory = - Mockito.mock(PubsubClient.PubsubClientFactory.class); - Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); - - Read read = - Read.newBuilder() - .setTopicProvider( - StaticValueProvider.of( - PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/testTopic"))) - .setTimestampAttribute("myTimestamp") - .setIdAttribute("myId") - .setPubsubClientFactory(mockFactory) - .setCoder(PubsubMessagePayloadOnlyCoder.of()) - .build(); - - read.validate(options); - } - - @Test - public void testReadValidateTopicIsNotExists() throws Exception { - thrown.expect(IllegalArgumentException.class); - - PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); - TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); - PubsubClient mockClient = Mockito.mock(PubsubClient.class); - Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false); - PubsubClient.PubsubClientFactory mockFactory = - Mockito.mock(PubsubClient.PubsubClientFactory.class); - Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); - - Read read = - Read.newBuilder() - .setTopicProvider( - StaticValueProvider.of( - PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/nonExistingTopic"))) - .setTimestampAttribute("myTimestamp") - .setIdAttribute("myId") - .setPubsubClientFactory(mockFactory) - .setCoder(PubsubMessagePayloadOnlyCoder.of()) - .build(); - - read.validate(options); - } - - @Test - public void testReadWithoutValidation() throws IOException { - PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); - TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); - PubsubClient mockClient = Mockito.mock(PubsubClient.class); - Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false); - PubsubClient.PubsubClientFactory mockFactory = - Mockito.mock(PubsubClient.PubsubClientFactory.class); - Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); - - Read read = - PubsubIO.readMessages() - .fromTopic("projects/test-project/topics/nonExistingTopic") - .withoutValidation(); - - read.validate(options); - } - - @Test - public void testWriteTopicValidationSuccess() throws Exception { - PubsubIO.writeStrings().to("projects/my-project/topics/abc"); - PubsubIO.writeStrings().to("projects/my-project/topics/ABC"); - PubsubIO.writeStrings().to("projects/my-project/topics/AbC-DeF"); - PubsubIO.writeStrings().to("projects/my-project/topics/AbC-1234"); - PubsubIO.writeStrings().to("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc"); - PubsubIO.writeStrings() - .to( - new StringBuilder() - .append("projects/my-project/topics/A-really-long-one-") - .append(RandomStringUtils.randomAlphanumeric(100)) - .toString()); - } - - @Test - public void testWriteTopicValidationBadCharacter() throws Exception { - thrown.expect(IllegalArgumentException.class); - PubsubIO.writeStrings().to("projects/my-project/topics/abc-*-abc"); - } - - @Test - public void testWriteValidationTooLong() throws Exception { - thrown.expect(IllegalArgumentException.class); - PubsubIO.writeStrings() - .to( - new StringBuilder() - .append("projects/my-project/topics/A-really-long-one-") - .append(RandomStringUtils.randomAlphanumeric(1000)) - .toString()); - } - - @Test - public void testWriteValidate() throws IOException { - PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); - TopicPath existingTopic = PubsubClient.topicPathFromName("test-project", "testTopic"); - PubsubClient mockClient = Mockito.mock(PubsubClient.class); - Mockito.when(mockClient.isTopicExists(existingTopic)).thenReturn(true); - PubsubClient.PubsubClientFactory mockFactory = - Mockito.mock(PubsubClient.PubsubClientFactory.class); - Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); - - PubsubIO.Write write = - PubsubIO.Write.newBuilder() - .setTopicProvider( - StaticValueProvider.of( - PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/testTopic"))) - .setTimestampAttribute("myTimestamp") - .setIdAttribute("myId") - .setDynamicDestinations(false) - .setPubsubClientFactory(mockFactory) - .build(); - - write.validate(options); - } - - @Test - public void testWriteValidateTopicIsNotExists() throws Exception { - thrown.expect(IllegalArgumentException.class); - - PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); - TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); - PubsubClient mockClient = Mockito.mock(PubsubClient.class); - Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false); - PubsubClient.PubsubClientFactory mockFactory = - Mockito.mock(PubsubClient.PubsubClientFactory.class); - Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); - - PubsubIO.Write write = - PubsubIO.Write.newBuilder() - .setTopicProvider( - StaticValueProvider.of( - PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/nonExistingTopic"))) - .setTimestampAttribute("myTimestamp") - .setIdAttribute("myId") - .setDynamicDestinations(false) - .setPubsubClientFactory(mockFactory) - .build(); - - write.validate(options); - } - - @Test - public void testWithoutValidation() throws IOException { - PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); - TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); - PubsubClient mockClient = Mockito.mock(PubsubClient.class); - Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false); - PubsubClient.PubsubClientFactory mockFactory = - Mockito.mock(PubsubClient.PubsubClientFactory.class); - Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); - - PubsubIO.Write write = - PubsubIO.writeMessages() - .to("projects/test-project/topics/nonExistingTopic") - .withoutValidation(); - - write.validate(options); - } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java index 5ee32825db1f..634ad42c937a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java @@ -23,10 +23,6 @@ import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.when; -import com.google.api.client.googleapis.json.GoogleJsonError; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.http.HttpHeaders; -import com.google.api.client.http.HttpResponseException; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions; import com.google.api.services.pubsub.Pubsub.Projects.Topics; @@ -312,26 +308,6 @@ private static Topic buildTopic(int i) { return topic; } - @Test - public void isTopicExists() throws Exception { - TopicPath topicExists = - PubsubClient.topicPathFromPath("projects/testProject/topics/topicExists"); - TopicPath topicDoesNotExist = - PubsubClient.topicPathFromPath("projects/testProject/topics/topicDoesNotExist"); - HttpResponseException.Builder builder = - new HttpResponseException.Builder(404, "topic is not found", new HttpHeaders()); - GoogleJsonError error = new GoogleJsonError(); - when(mockPubsub.projects().topics().get(topicExists.getPath()).execute()) - .thenReturn(new Topic().setName(topicExists.getName())); - when(mockPubsub.projects().topics().get(topicDoesNotExist.getPath()).execute()) - .thenThrow(new GoogleJsonResponseException(builder, error)); - - client = new PubsubJsonClient(null, null, mockPubsub); - - assertEquals(true, client.isTopicExists(topicExists)); - assertEquals(false, client.isTopicExists(topicDoesNotExist)); - } - @Test public void listSubscriptions() throws Exception { ListSubscriptionsResponse expectedResponse1 = new ListSubscriptionsResponse();