From 059bb0e503635c86c9b9ea570af9f80fc75763ca Mon Sep 17 00:00:00 2001 From: Jisha Abubaker Date: Wed, 22 Nov 2017 09:08:49 -0800 Subject: [PATCH] updating pub/sub samples (#928) * updating pub/sub samples * another update --- .../flexible/pubsub/PubSubPublish.java | 4 +- .../pubsub/CreatePullSubscriptionExample.java | 9 ++- .../example/pubsub/CreateTopicExample.java | 7 +- .../com/example/pubsub/PublisherExample.java | 65 ++++++++++++------- .../com/example/pubsub/SubscriberExample.java | 4 +- .../java/com/example/pubsub/QuickStartIT.java | 9 +-- 6 files changed, 62 insertions(+), 36 deletions(-) diff --git a/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubPublish.java b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubPublish.java index 9974392f567..58df271205e 100644 --- a/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubPublish.java +++ b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubPublish.java @@ -39,8 +39,8 @@ public void doPost(HttpServletRequest req, HttpServletResponse resp) String topicId = System.getenv("PUBSUB_TOPIC"); // create a publisher on the topic if (publisher == null) { - publisher = Publisher.defaultBuilder( - TopicName.create(ServiceOptions.getDefaultProjectId(), topicId)) + publisher = Publisher.newBuilder( + TopicName.of(ServiceOptions.getDefaultProjectId(), topicId)) .build(); } // construct a pubsub message from the payload diff --git a/pubsub/cloud-client/src/main/java/com/example/pubsub/CreatePullSubscriptionExample.java b/pubsub/cloud-client/src/main/java/com/example/pubsub/CreatePullSubscriptionExample.java index e114c42328f..f9e4531e057 100644 --- a/pubsub/cloud-client/src/main/java/com/example/pubsub/CreatePullSubscriptionExample.java +++ b/pubsub/cloud-client/src/main/java/com/example/pubsub/CreatePullSubscriptionExample.java @@ -17,6 +17,7 @@ package com.example.pubsub; // [START pubsub_quickstart_create_subscription] +import com.google.api.gax.rpc.ApiException; import com.google.cloud.ServiceOptions; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.pubsub.v1.PushConfig; @@ -43,15 +44,19 @@ public static void main(String... args) throws Exception { // Your subscription ID eg. "my-sub" String subscriptionId = args[1]; - TopicName topicName = TopicName.create(projectId, topicId); + TopicName topicName = TopicName.of(projectId, topicId); // Create a new subscription - SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId); + SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId); try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { // create a pull subscription with default acknowledgement deadline (= 10 seconds) Subscription subscription = subscriptionAdminClient.createSubscription( subscriptionName, topicName, PushConfig.getDefaultInstance(), 0); + } catch (ApiException e) { + // example : code = ALREADY_EXISTS(409) implies subscription already exists + System.out.print(e.getStatusCode().getCode()); + System.out.print(e.isRetryable()); } System.out.printf( diff --git a/pubsub/cloud-client/src/main/java/com/example/pubsub/CreateTopicExample.java b/pubsub/cloud-client/src/main/java/com/example/pubsub/CreateTopicExample.java index 8aae406c8b7..86a60e6885d 100644 --- a/pubsub/cloud-client/src/main/java/com/example/pubsub/CreateTopicExample.java +++ b/pubsub/cloud-client/src/main/java/com/example/pubsub/CreateTopicExample.java @@ -18,6 +18,7 @@ // [START pubsub_quickstart_create_topic] // Imports the Google Cloud client library +import com.google.api.gax.rpc.ApiException; import com.google.cloud.ServiceOptions; import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.pubsub.v1.TopicName; @@ -39,9 +40,13 @@ public static void main(String... args) throws Exception { String topicId = args[0]; // Create a new topic - TopicName topic = TopicName.create(projectId, topicId); + TopicName topic = TopicName.of(projectId, topicId); try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { topicAdminClient.createTopic(topic); + } catch (ApiException e) { + // example : code = ALREADY_EXISTS(409) implies topic already exists + System.out.print(e.getStatusCode().getCode()); + System.out.print(e.isRetryable()); } System.out.printf("Topic %s:%s created.\n", topic.getProject(), topic.getTopic()); diff --git a/pubsub/cloud-client/src/main/java/com/example/pubsub/PublisherExample.java b/pubsub/cloud-client/src/main/java/com/example/pubsub/PublisherExample.java index debfdb29d25..87cb5c0b7c9 100644 --- a/pubsub/cloud-client/src/main/java/com/example/pubsub/PublisherExample.java +++ b/pubsub/cloud-client/src/main/java/com/example/pubsub/PublisherExample.java @@ -18,52 +18,67 @@ // [START pubsub_quickstart_publisher] import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ApiException; import com.google.cloud.ServiceOptions; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; -import java.util.ArrayList; -import java.util.List; public class PublisherExample { - static final int MESSAGE_COUNT = 5; - // use the default project id private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId(); - //schedule a message to be published, messages are automatically batched - private static ApiFuture publishMessage(Publisher publisher, String message) - throws Exception { - // convert message to bytes - ByteString data = ByteString.copyFromUtf8(message); - PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); - return publisher.publish(pubsubMessage); - } - - /** Publish messages to a topic. */ + /** Publish messages to a topic. + * @param args topic name, number of messages + */ public static void main(String... args) throws Exception { // topic id, eg. "my-topic" String topicId = args[0]; - TopicName topicName = TopicName.create(PROJECT_ID, topicId); + int messageCount = Integer.parseInt(args[1]); + TopicName topicName = TopicName.of(PROJECT_ID, topicId); Publisher publisher = null; - List> apiFutures = new ArrayList<>(); try { // Create a publisher instance with default settings bound to the topic - publisher = Publisher.defaultBuilder(topicName).build(); - for (int i = 0; i < MESSAGE_COUNT; i++) { + publisher = Publisher.newBuilder(topicName).build(); + + for (int i = 0; i < messageCount; i++) { String message = "message-" + i; - ApiFuture messageId = publishMessage(publisher, message); - apiFutures.add(messageId); + + // convert message to bytes + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder() + .setData(data) + .build(); + + //schedule a message to be published, messages are automatically batched + ApiFuture future = publisher.publish(pubsubMessage); + + // add an asynchronous callback to handle success / failure + ApiFutures.addCallback(future, new ApiFutureCallback() { + + @Override + public void onFailure(Throwable throwable) { + if (throwable instanceof ApiException) { + ApiException apiException = ((ApiException) throwable); + // details on the API exception + System.out.println(apiException.getStatusCode().getCode()); + System.out.println(apiException.isRetryable()); + } + System.out.println("Error publishing message : " + message); + } + + @Override + public void onSuccess(String messageId) { + // Once published, returns server-assigned message ids (unique within the topic) + System.out.println(messageId); + } + }); } } finally { - // Once published, returns server-assigned message ids (unique within the topic) - List messageIds = ApiFutures.allAsList(apiFutures).get(); - for (String messageId : messageIds) { - System.out.println(messageId); - } if (publisher != null) { // When finished with the publisher, shutdown to free up resources. publisher.shutdown(); diff --git a/pubsub/cloud-client/src/main/java/com/example/pubsub/SubscriberExample.java b/pubsub/cloud-client/src/main/java/com/example/pubsub/SubscriberExample.java index 6610c8a828e..54f19c856e2 100644 --- a/pubsub/cloud-client/src/main/java/com/example/pubsub/SubscriberExample.java +++ b/pubsub/cloud-client/src/main/java/com/example/pubsub/SubscriberExample.java @@ -47,12 +47,12 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { public static void main(String... args) throws Exception { // set subscriber id, eg. my-sub String subscriptionId = args[0]; - SubscriptionName subscriptionName = SubscriptionName.create(PROJECT_ID, subscriptionId); + SubscriptionName subscriptionName = SubscriptionName.of(PROJECT_ID, subscriptionId); Subscriber subscriber = null; try { // create a subscriber bound to the asynchronous message receiver subscriber = - Subscriber.defaultBuilder(subscriptionName, new MessageReceiverExample()).build(); + Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build(); subscriber.startAsync().awaitRunning(); // Continue to listen to messages while (true) { diff --git a/pubsub/cloud-client/src/test/java/com/example/pubsub/QuickStartIT.java b/pubsub/cloud-client/src/test/java/com/example/pubsub/QuickStartIT.java index d396240f8c0..eb7a7766a7c 100644 --- a/pubsub/cloud-client/src/test/java/com/example/pubsub/QuickStartIT.java +++ b/pubsub/cloud-client/src/test/java/com/example/pubsub/QuickStartIT.java @@ -49,6 +49,7 @@ public class QuickStartIT { private String projectId = ServiceOptions.getDefaultProjectId(); private String topicId = formatForTest("my-topic"); private String subscriptionId = formatForTest("my-sub"); + private int messageCount = 5; class SubscriberRunnable implements Runnable { @@ -104,9 +105,9 @@ public void testQuickstart() throws Exception { bout.reset(); // publish messages - PublisherExample.main(topicId); + PublisherExample.main(topicId, String.valueOf(messageCount)); String[] messageIds = bout.toString().split("\n"); - assertThat(messageIds).hasLength(PublisherExample.MESSAGE_COUNT); + assertThat(messageIds).hasLength(messageCount); bout.reset(); // receive messages @@ -132,7 +133,7 @@ private String formatForTest(String name) { private void deleteTestTopic() throws Exception { try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { - topicAdminClient.deleteTopic(TopicName.create(projectId, topicId)); + topicAdminClient.deleteTopic(TopicName.of(projectId, topicId)); } catch (IOException e) { System.err.println("Error deleting topic " + e.getMessage()); } @@ -141,7 +142,7 @@ private void deleteTestTopic() throws Exception { private void deleteTestSubscription() throws Exception { try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { subscriptionAdminClient.deleteSubscription( - SubscriptionName.create(projectId, subscriptionId)); + SubscriptionName.of(projectId, subscriptionId)); } catch (IOException e) { System.err.println("Error deleting subscription " + e.getMessage()); }