From 53a4844f09eace777142b8cdcd06bc07cef0b432 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Mon, 1 Jul 2024 11:25:55 -0400 Subject: [PATCH] docs(samples): Optimistic subscribe sample (#2063) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * docs(samples): Add code sample for optimistic subscribe * docs(samples): Fix formatting on test * docs(samples): Use an error listener instead of catching an exception for the OptimisticSubscribeExample * test: Add exception handler to OptimisticSubscribeExample * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot --- README.md | 7 +- .../pubsub/OptimisticSubscribeExample.java | 103 ++++++++++++++++++ .../src/test/java/pubsub/SubscriberIT.java | 19 ++++ 3 files changed, 126 insertions(+), 3 deletions(-) create mode 100644 samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java diff --git a/README.md b/README.md index 55d765048..195200dba 100644 --- a/README.md +++ b/README.md @@ -59,13 +59,13 @@ implementation 'com.google.cloud:google-cloud-pubsub' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-pubsub:1.130.1' +implementation 'com.google.cloud:google-cloud-pubsub:1.131.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.130.1" +libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.131.0" ``` @@ -275,6 +275,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsub/tree/m | List Subscriptions In Project Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/ListSubscriptionsInProjectExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/ListSubscriptionsInProjectExample.java) | | List Subscriptions In Topic Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/ListSubscriptionsInTopicExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/ListSubscriptionsInTopicExample.java) | | List Topics Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/ListTopicsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/ListTopicsExample.java) | +| Optimistic Subscribe Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java) | | Publish Avro Records Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/PublishAvroRecordsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/PublishAvroRecordsExample.java) | | Publish Protobuf Messages Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/PublishProtobufMessagesExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/PublishProtobufMessagesExample.java) | | Publish With Batch Settings Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/PublishWithBatchSettingsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/PublishWithBatchSettingsExample.java) | @@ -411,7 +412,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsub/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-pubsub.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-pubsub/1.130.1 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-pubsub/1.131.0 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles diff --git a/samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java b/samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java new file mode 100644 index 000000000..fbc9a183b --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java @@ -0,0 +1,103 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_optimistic_subscribe] + +import com.google.api.gax.rpc.NotFoundException; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class OptimisticSubscribeExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String subscriptionId = "your-subscription-id"; + String topicId = "your-topic-id"; + + optimisticSubscribeExample(projectId, subscriptionId, topicId); + } + + public static void optimisticSubscribeExample( + String projectId, String subscriptionId, String topicId) throws IOException { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + // Handle incoming message, then ack the received message. + System.out.println("Id: " + message.getMessageId()); + System.out.println("Data: " + message.getData().toStringUtf8()); + consumer.ack(); + }; + + Subscriber subscriber = null; + try { + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + + // Listen for resource NOT_FOUND errors and rebuild the subscriber and restart subscribing + // when the current subscriber encounters these errors. + subscriber.addListener( + new Subscriber.Listener() { + public void failed(Subscriber.State from, Throwable failure) { + System.out.println(failure.getStackTrace()); + if (failure instanceof NotFoundException) { + try (SubscriptionAdminClient subscriptionAdminClient = + SubscriptionAdminClient.create()) { + TopicName topicName = TopicName.of(projectId, topicId); + // Create a pull subscription with default acknowledgement deadline of 10 seconds. + // The client library will automatically extend acknowledgement deadlines. + Subscription subscription = + subscriptionAdminClient.createSubscription( + subscriptionName, topicName, PushConfig.getDefaultInstance(), 10); + System.out.println("Created pull subscription: " + subscription.getName()); + optimisticSubscribeExample(projectId, subscriptionId, topicId); + } catch (IOException err) { + System.out.println("Failed to create pull subscription: " + err.getMessage()); + } + } + } + }, + MoreExecutors.directExecutor()); + + subscriber.startAsync().awaitRunning(); + System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); + subscriber.awaitTerminated(30, TimeUnit.SECONDS); + } catch (IllegalStateException e) { + // Prevent an exception from being thrown if it is the expected NotFoundException + if (!(subscriber.failureCause() instanceof NotFoundException)) { + throw e; + } + } catch (TimeoutException e) { + subscriber.stopAsync(); + } + } +} + +// [END pubsub_optimistic_subscribe] diff --git a/samples/snippets/src/test/java/pubsub/SubscriberIT.java b/samples/snippets/src/test/java/pubsub/SubscriberIT.java index ee4e03068..3f91a2a31 100644 --- a/samples/snippets/src/test/java/pubsub/SubscriberIT.java +++ b/samples/snippets/src/test/java/pubsub/SubscriberIT.java @@ -56,12 +56,16 @@ public class SubscriberIT { private static final String subscriptionId = "subscriber-test-subscription-" + _suffix; // For a subscription with exactly once delivery enabled. private static final String subscriptionEodId = "subscriber-test-subscription-eod" + _suffix; + private static final String subscriptionOptimisticId = + "subscriber-test-subscription-optimistic" + _suffix; private static final TopicName topicName = TopicName.of(projectId, topicId); private static final TopicName topicNameEod = TopicName.of(projectId, topicIdEod); private static final ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); private static final ProjectSubscriptionName subscriptionEodName = ProjectSubscriptionName.of(projectId, subscriptionEodId); + private static final ProjectSubscriptionName subscriptionOptimisticName = + ProjectSubscriptionName.of(projectId, subscriptionOptimisticId); private static void requireEnvVar(String varName) { assertNotNull( @@ -163,6 +167,11 @@ public void tearDown() throws Exception { try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { subscriptionAdminClient.deleteSubscription(subscriptionName.toString()); subscriptionAdminClient.deleteSubscription(subscriptionEodName.toString()); + try { + subscriptionAdminClient.deleteSubscription(subscriptionOptimisticName.toString()); + } catch (Exception e) { + // Ignore exception. + } } try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { @@ -240,4 +249,14 @@ public void testSubscriberExactlyOnceDelivery() throws Exception { assertThat(bout.toString()).contains("Message successfully acked: " + messageId); } } + + @Test + public void testOptimisticSubscriber() throws Exception { + bout.reset(); + OptimisticSubscribeExample.optimisticSubscribeExample( + projectId, subscriptionOptimisticId, topicId); + assertThat( + bout.toString() + .contains("Created pull subscription: " + subscriptionOptimisticName.toString())); + } }