> listTopicsAsync(ListOption... options);
- /**
- * Publishes a message to the provided topic. This method returns a service-generated id for the
- * published message. Service-generated ids are guaranteed to be unique within the topic.
- *
- * Example of publishing one message to a topic.
- *
{@code
- * String topicName = "my_topic_name";
- * Message message = Message.of("payload");
- * String messageId = pubsub.publish(topicName, message);
- * }
- *
- * @param topic the topic where the message is published
- * @param message the message to publish
- * @return a unique service-generated id for the message
- * @throws PubSubException upon failure, if the topic does not exist or if the message has empty
- * payload and no attributes
- */
- String publish(String topic, Message message);
-
- /**
- * Sends a request for publishing a message to the provided topic. This method returns a
- * {@code Future} object to consume the result. {@link Future#get()} returns a service-generated
- * id for the published message. Service-generated ids are guaranteed to be unique within the
- * topic.
- *
- * Example of asynchronously publishing one message to a topic.
- *
{@code
- * String topicName = "my_topic_name";
- * Message message = Message.of("payload");
- * Future future = pubsub.publishAsync(topicName, message);
- * // ...
- * String messageId = future.get();
- * }
- *
- * @param topic the topic where the message is published
- * @param message the message to publish
- * @return a {@code Future} for the unique service-generated id for the message
- */
- Future publishAsync(String topic, Message message);
-
- /**
- * Publishes a number of messages to the provided topic. This method returns a list of
- * service-generated ids for the published messages. Service-generated ids are guaranteed to be
- * unique within the topic.
- *
- * Example of publishing some messages to a topic.
- *
{@code
- * String topicName = "my_topic_name";
- * Message message1 = Message.of("payload1");
- * Message message2 = Message.of("payload2");
- * List messageIds = pubsub.publish(topicName, message1, message2);
- * }
- *
- * @param topic the topic where the message is published
- * @param message the first message to publish
- * @param messages other messages to publish
- * @return a list of unique, service-generated, ids. Ids are in the same order as the messages.
- * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has
- * empty payload and no attributes
- */
- List publish(String topic, Message message, Message... messages);
-
- /**
- * Sends a request to publish a number of messages to the provided topic. This method returns a
- * {@code Future} object to consume the result. {@link Future#get()} returns a list of
- * service-generated ids for the published messages. Service-generated ids are guaranteed to be
- * unique within the topic.
- *
- * Example of asynchronously publishing some messages to a topic.
- *
{@code
- * String topicName = "my_topic_name";
- * Message message1 = Message.of("payload1");
- * Message message2 = Message.of("payload2");
- * Future> future = pubsub.publishAsync(topicName, message1, message2);
- * // ...
- * List messageIds = future.get();
- * }
- *
- * @param topic the topic where the message is published
- * @param message the first message to publish
- * @param messages other messages to publish
- * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as
- * the messages.
- */
- Future> publishAsync(String topic, Message message, Message... messages);
-
- /**
- * Publishes a number of messages to the provided topic. This method returns a list of
- * service-generated ids for the published messages. Service-generated ids are guaranteed to be
- * unique within the topic.
- *
- * Example of publishing a list of messages to a topic.
- *
{@code
- * String topicName = "my_topic_name";
- * List messages = new LinkedList<>();
- * messages.add(Message.of("payload1"));
- * messages.add(Message.of("payload2"));
- * List messageIds = pubsub.publish(topicName, messages);
- * }
- *
- * @param topic the topic where the message is published
- * @param messages the messages to publish
- * @return a list of unique, service-generated, ids. Ids are in the same order as the messages.
- * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has
- * empty payload and no attributes
- */
- List publish(String topic, Iterable messages);
-
- /**
- * Sends a request to publish a number of messages to the provided topic. This method returns a
- * {@code Future} object to consume the result. {@link Future#get()} returns a list of
- * service-generated ids for the published messages. Service-generated ids are guaranteed to be
- * unique within the topic.
- *
- * Example of asynchronously publishing a list of messages to a topic.
- *
{@code
- * String topicName = "my_topic_name";
- * List messages = new LinkedList<>();
- * messages.add(Message.of("payload1"));
- * messages.add(Message.of("payload2"));
- * Future> future = pubsub.publishAsync(topicName, messages);
- * // ...
- * List messageIds = future.get();
- * }
- *
- * @param topic the topic where the message is published
- * @param messages the messages to publish
- * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as
- * the messages
- */
- Future> publishAsync(String topic, Iterable messages);
+ Publisher getPublisher(TopicInfo topic) throws IOException;
/**
* Creates a new subscription.
@@ -672,7 +542,7 @@ public static PullOption executorFactory(ExecutorFactory executorFactory) {
*/
Future> listSubscriptionsAsync(String topic, ListOption... options);
- Subscriber subscriber(SubscriptionInfo subscription, Subscriber.MessageReceiver receiver)
+ Subscriber getSubscriber(SubscriptionInfo subscription, Subscriber.MessageReceiver receiver)
throws IOException;
/**
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
index 67a9e5f4b3d0..dc0d56b7cfdb 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
@@ -34,7 +34,6 @@
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
@@ -55,10 +54,7 @@
import com.google.pubsub.v1.ListTopicsRequest;
import com.google.pubsub.v1.ListTopicsResponse;
import com.google.pubsub.v1.ModifyPushConfigRequest;
-import com.google.pubsub.v1.PublishRequest;
-import com.google.pubsub.v1.PublishResponse;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -289,54 +285,13 @@ public Future> listTopicsAsync(ListOption... options) {
}
@Override
- public String publish(String topic, Message message) {
- return get(publishAsync(topic, message));
- }
-
- private static PublishRequest publishRequest(PubSubOptions serviceOptions, String topic,
- Iterable messages) {
- PublishRequest.Builder builder = PublishRequest.newBuilder();
- builder.setTopic(PublisherClient.formatTopicName(serviceOptions.getProjectId(), topic));
- builder.addAllMessages(Iterables.transform(messages, Message.TO_PB_FUNCTION));
- return builder.build();
- }
-
- @Override
- public Future publishAsync(String topic, Message message) {
- return transform(
- rpc.publish(publishRequest(getOptions(), topic, Collections.singletonList(message))),
- new Function() {
- @Override
- public String apply(PublishResponse publishResponse) {
- return publishResponse.getMessageIdsList().get(0);
- }
- });
- }
-
- @Override
- public List publish(String topic, Message message, Message... messages) {
- return publish(topic, Lists.asList(message, messages));
- }
-
- @Override
- public Future> publishAsync(String topic, Message message, Message... messages) {
- return publishAsync(topic, Lists.asList(message, messages));
- }
-
- @Override
- public List publish(String topic, Iterable messages) {
- return get(publishAsync(topic, messages));
- }
-
- @Override
- public Future> publishAsync(String topic, Iterable messages) {
- return transform(rpc.publish(publishRequest(getOptions(), topic, messages)),
- new Function>() {
- @Override
- public List apply(PublishResponse publishResponse) {
- return publishResponse.getMessageIdsList();
- }
- });
+ public Publisher getPublisher(TopicInfo topic) throws IOException {
+ // TODO(pongad): Provide a way to pass in the rest of the options.
+ String topicName =
+ PublisherClient.formatTopicName(getOptions().getProjectId(), topic.getName());
+ return Publisher.Builder.newBuilder(topicName)
+ .setCredentials(getOptions().getCredentials())
+ .build();
}
@Override
@@ -493,8 +448,8 @@ public Future> listSubscriptionsAsync(String topic,
}
@Override
- public Subscriber subscriber(SubscriptionInfo subscription, Subscriber.MessageReceiver receiver)
- throws IOException {
+ public Subscriber getSubscriber(
+ SubscriptionInfo subscription, Subscriber.MessageReceiver receiver) throws IOException {
// TODO(pongad): Provide a way to pass in the rest of the options.
String subName =
SubscriberClient.formatSubscriptionName(
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java
index 3a5a8dd7c58c..c995294877ff 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java
@@ -202,138 +202,6 @@ public Future reloadAsync() {
return pubsub.getTopicAsync(getName());
}
- /**
- * Publishes a message to this topic. This method returns a service-generated id for the published
- * message. Service-generated ids are guaranteed to be unique within the topic.
- *
- * Example of publishing one message to the topic.
- *
{@code
- * Message message = Message.of("payload");
- * String messageId = topic.publish(message);
- * }
- *
- * @param message the message to publish
- * @return a unique service-generated id for the message
- * @throws PubSubException upon failure, if the topic does not exist or if the message has empty
- * payload and no attributes
- */
- public String publish(Message message) {
- return pubsub.publish(getName(), message);
- }
-
- /**
- * Sends a request for publishing a message to the this topic. This method returns a
- * {@code Future} object to consume the result. {@link Future#get()} returns a service-generated
- * id for the published message. Service-generated ids are guaranteed to be unique within the
- * topic.
- *
- * Example of asynchronously publishing one message to the topic.
- *
{@code
- * Message message = Message.of("payload");
- * Future future = topic.publishAsync(message);
- * // ...
- * String messageId = future.get();
- * }
- *
- * @param message the message to publish
- * @return a {@code Future} for the unique service-generated id for the message
- */
- public Future publishAsync(Message message) {
- return pubsub.publishAsync(getName(), message);
- }
-
- /**
- * Publishes a number of messages to this topic. This method returns a list of service-generated
- * ids for the published messages. Service-generated ids are guaranteed to be unique within the
- * topic.
- *
- * Example of publishing some messages to the topic.
- *
{@code
- * Message message1 = Message.of("payload1");
- * Message message2 = Message.of("payload2");
- * List messageIds = topic.publish(message1, message2);
- * }
- *
- * @param message the first message to publish
- * @param messages other messages to publish
- * @return a list of unique, service-generated, ids. Ids are in the same order as the messages.
- * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has
- * empty payload and no attributes
- */
- public List publish(Message message, Message... messages) {
- return pubsub.publish(getName(), message, messages);
- }
-
- /**
- * Sends a request to publish a number of messages to this topic. This method returns a
- * {@code Future} object to consume the result. {@link Future#get()} returns a list of
- * service-generated ids for the published messages. Service-generated ids are guaranteed to be
- * unique within the topic.
- *
- * Example of asynchronously publishing some messages to the topic.
- *
{@code
- * Message message1 = Message.of("payload1");
- * Message message2 = Message.of("payload2");
- * Future> future = topic.publishAsync(message1, message2);
- * // ...
- * List messageIds = future.get();
- * }
- *
- * @param message the first message to publish
- * @param messages other messages to publish
- * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as
- * the messages.
- */
- public Future> publishAsync(Message message, Message... messages) {
- return pubsub.publishAsync(getName(), message, messages);
- }
-
- /**
- * Publishes a number of messages to this topic. This method returns a list ofservice-generated
- * ids for the published messages. Service-generated ids are guaranteed to be unique within the
- * topic.
- *
- * Example of publishing a list of messages to the topic.
- *
{@code
- * List messages = new LinkedList<>();
- * messages.add(Message.of("payload1"));
- * messages.add(Message.of("payload2"));
- * List messageIds = topic.publish(messages);
- * }
- *
- * @param messages the messages to publish
- * @return a list of unique, service-generated, ids. Ids are in the same order as the messages.
- * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has
- * empty payload and no attributes
- */
- public List publish(Iterable messages) {
- return pubsub.publish(getName(), messages);
- }
-
- /**
- * Sends a request to publish a number of messages to this topic. This method returns a
- * {@code Future} object to consume the result. {@link Future#get()} returns a list of
- * service-generated ids for the published messages. Service-generated ids are guaranteed to be
- * unique within the topic.
- *
- * Example of asynchronously publishing a list of messages to the topic.
- *
{@code
- * List messages = new LinkedList<>();
- * messages.add(Message.of("payload1"));
- * messages.add(Message.of("payload2"));
- * Future> future = topic.publishAsync(messages);
- * // ...
- * List messageIds = future.get();
- * }
- *
- * @param messages the messages to publish
- * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as
- * the messages.
- */
- public Future> publishAsync(Iterable messages) {
- return pubsub.publishAsync(getName(), messages);
- }
-
/**
* Lists the identities of the subscriptions for this topic. This method returns a {@link Page}
* object that can be used to consume paginated results. Use {@link ListOption} to specify the
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java
index cdafc29b8556..eac041476f34 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java
@@ -138,78 +138,6 @@ public void testListTopicsAsync() throws ExecutionException, InterruptedExceptio
assertTrue(topic3.delete());
}
- @Test
- public void testPublishOneMessage() {
- String topic = formatForTest("test-publish-one-message-topic");
- pubsub().create(TopicInfo.of(topic));
- Message message = Message.of("payload");
- assertNotNull(pubsub().publish(topic, message));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testPublishNonExistingTopic() {
- String topic = formatForTest("test-publish-non-existing-topic");
- Message message = Message.of("payload");
- thrown.expect(PubSubException.class);
- pubsub().publish(topic, message);
- }
-
- @Test
- public void testPublishOneMessageAsync() throws ExecutionException, InterruptedException {
- String topic = formatForTest("test-publish-one-message-async-topic");
- pubsub().create(TopicInfo.of(topic));
- Message message = Message.of("payload");
- Future publishFuture = pubsub().publishAsync(topic, message);
- assertNotNull(publishFuture.get());
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testPublishMoreMessages() {
- String topic = formatForTest("test-publish-more-messages-topic");
- pubsub().create(TopicInfo.of(topic));
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- List messageIds = pubsub().publish(topic, message1, message2);
- assertEquals(2, messageIds.size());
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testPublishMoreMessagesAsync() throws ExecutionException, InterruptedException {
- String topic = formatForTest("test-publish-more-messages-topic-async-topic");
- pubsub().create(TopicInfo.of(topic));
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- Future> publishFuture = pubsub().publishAsync(topic, message1, message2);
- assertEquals(2, publishFuture.get().size());
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testPublishMessageList() {
- String topic = formatForTest("test-publish-message-list-topic");
- pubsub().create(TopicInfo.of(topic));
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
- assertEquals(2, messageIds.size());
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testPublishMessagesListAsync() throws ExecutionException, InterruptedException {
- String topic = formatForTest("test-publish-message-list-async-topic");
- pubsub().create(TopicInfo.of(topic));
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- Future> publishFuture =
- pubsub().publishAsync(topic, ImmutableList.of(message1, message2));
- assertEquals(2, publishFuture.get().size());
- assertTrue(pubsub().deleteTopic(topic));
- }
-
@Test
public void testCreateGetAndDeleteSubscription() {
String topic = formatForTest("test-create-get-delete-subscription-topic");
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
index 23beb76c8b06..3f02a7628805 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
@@ -482,104 +482,6 @@ public void testListTopicsAsyncWithOptions() throws ExecutionException, Interrup
assertArrayEquals(topicList.toArray(), Iterables.toArray(page.getValues(), Topic.class));
}
- @Test
- public void testPublishOneMessage() {
- PublishRequest request = PublishRequest.newBuilder()
- .setTopic(TOPIC_NAME_PB)
- .addAllMessages(ImmutableList.of(MESSAGE.toPb()))
- .build();
- String messageId = "messageId";
- PublishResponse response = PublishResponse.newBuilder().addMessageIds(messageId).build();
- Future responseFuture = Futures.immediateFuture(response);
- EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock);
- pubsub = new PubSubImpl(options);
- assertEquals(messageId, pubsub.publish(TOPIC, MESSAGE));
- }
-
- @Test
- public void testPublishOneMessageAsync() throws ExecutionException, InterruptedException {
- PublishRequest request = PublishRequest.newBuilder()
- .setTopic(TOPIC_NAME_PB)
- .addMessages(MESSAGE.toPb())
- .build();
- String messageId = "messageId";
- PublishResponse response = PublishResponse.newBuilder().addMessageIds(messageId).build();
- Future responseFuture = Futures.immediateFuture(response);
- EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock);
- pubsub = new PubSubImpl(options);
- assertEquals(messageId, pubsub.publishAsync(TOPIC, MESSAGE).get());
- }
-
- @Test
- public void testPublishMoreMessages() {
- PublishRequest request = PublishRequest.newBuilder()
- .setTopic(TOPIC_NAME_PB)
- .addAllMessages(ImmutableList.of(MESSAGE.toPb(), MESSAGE.toPb()))
- .build();
- List messageIds = ImmutableList.of("messageId1", "messageId2");
- PublishResponse response = PublishResponse.newBuilder()
- .addAllMessageIds(messageIds)
- .build();
- Future responseFuture = Futures.immediateFuture(response);
- EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock);
- pubsub = new PubSubImpl(options);
- assertEquals(messageIds, pubsub.publish(TOPIC, MESSAGE, MESSAGE));
- }
-
- @Test
- public void testPublishMoreMessagesAsync() throws ExecutionException, InterruptedException {
- PublishRequest request = PublishRequest.newBuilder()
- .setTopic(TOPIC_NAME_PB)
- .addAllMessages(ImmutableList.of(MESSAGE.toPb(), MESSAGE.toPb()))
- .build();
- List messageIds = ImmutableList.of("messageId1", "messageId2");
- PublishResponse response = PublishResponse.newBuilder()
- .addAllMessageIds(messageIds)
- .build();
- Future responseFuture = Futures.immediateFuture(response);
- EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock);
- pubsub = new PubSubImpl(options);
- assertEquals(messageIds, pubsub.publishAsync(TOPIC, MESSAGE, MESSAGE).get());
- }
-
- @Test
- public void testPublishMessageList() {
- PublishRequest request = PublishRequest.newBuilder()
- .setTopic(TOPIC_NAME_PB)
- .addAllMessages(ImmutableList.of(MESSAGE.toPb(), MESSAGE.toPb()))
- .build();
- List messageIds = ImmutableList.of("messageId1", "messageId2");
- PublishResponse response = PublishResponse.newBuilder()
- .addAllMessageIds(messageIds)
- .build();
- Future responseFuture = Futures.immediateFuture(response);
- EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock);
- pubsub = new PubSubImpl(options);
- assertEquals(messageIds, pubsub.publish(TOPIC, ImmutableList.of(MESSAGE, MESSAGE)));
- }
-
- @Test
- public void testPublishMessageListAsync() throws ExecutionException, InterruptedException {
- PublishRequest request = PublishRequest.newBuilder()
- .setTopic(TOPIC_NAME_PB)
- .addAllMessages(ImmutableList.of(MESSAGE.toPb(), MESSAGE.toPb()))
- .build();
- List messageIds = ImmutableList.of("messageId1", "messageId2");
- PublishResponse response = PublishResponse.newBuilder()
- .addAllMessageIds(messageIds)
- .build();
- Future responseFuture = Futures.immediateFuture(response);
- EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock);
- pubsub = new PubSubImpl(options);
- assertEquals(messageIds, pubsub.publishAsync(TOPIC, ImmutableList.of(MESSAGE, MESSAGE)).get());
- }
-
@Test
public void testCreateSubscription() {
com.google.pubsub.v1.Subscription subscriptionPb = SUBSCRIPTION_INFO.toPb(PROJECT);
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java
index 438759b73184..9c909be203e7 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java
@@ -188,87 +188,6 @@ public void testDeleteAsyncFalse() throws ExecutionException, InterruptedExcepti
assertFalse(topic.deleteAsync().get());
}
- @Test
- public void testPublishOneMessage() {
- initializeExpectedTopic(1);
- expect(pubsub.getOptions()).andReturn(mockOptions);
- Message message = Message.of("payload1");
- String messageId = "messageId";
- expect(pubsub.publish(NAME, message)).andReturn(messageId);
- replay(pubsub);
- initializeTopic();
- assertEquals(messageId, topic.publish(message));
- }
-
- @Test
- public void testPublishOneMessageAsync() throws ExecutionException, InterruptedException {
- initializeExpectedTopic(1);
- expect(pubsub.getOptions()).andReturn(mockOptions);
- Message message = Message.of("payload1");
- String messageId = "messageId";
- expect(pubsub.publishAsync(NAME, message))
- .andReturn(Futures.immediateFuture(messageId));
- replay(pubsub);
- initializeTopic();
- assertEquals(messageId, topic.publishAsync(message).get());
- }
-
- @Test
- public void testPublishMoreMessages() {
- initializeExpectedTopic(1);
- expect(pubsub.getOptions()).andReturn(mockOptions);
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- List messageIds = ImmutableList.of("messageId1", "messageId2");
- expect(pubsub.publish(NAME, message1, message2)).andReturn(messageIds);
- replay(pubsub);
- initializeTopic();
- assertEquals(messageIds, topic.publish(message1, message2));
- }
-
- @Test
- public void testPublishMoreMessagesAsync() throws ExecutionException, InterruptedException {
- initializeExpectedTopic(1);
- expect(pubsub.getOptions()).andReturn(mockOptions);
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- List messageIds = ImmutableList.of("messageId1", "messageId2");
- expect(pubsub.publishAsync(NAME, message1, message2))
- .andReturn(Futures.immediateFuture(messageIds));
- replay(pubsub);
- initializeTopic();
- assertEquals(messageIds, topic.publishAsync(message1, message2).get());
- }
-
- @Test
- public void testPublishMessageList() {
- initializeExpectedTopic(1);
- expect(pubsub.getOptions()).andReturn(mockOptions);
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- List messages = ImmutableList.of(message1, message2);
- List messageIds = ImmutableList.of("messageId1", "messageId2");
- expect(pubsub.publish(NAME, messages)).andReturn(messageIds);
- replay(pubsub);
- initializeTopic();
- assertEquals(messageIds, topic.publish(messages));
- }
-
- @Test
- public void testPublishMessageListAsync() throws ExecutionException, InterruptedException {
- initializeExpectedTopic(1);
- expect(pubsub.getOptions()).andReturn(mockOptions);
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- List messages = ImmutableList.of(message1, message2);
- List messageIds = ImmutableList.of("messageId1", "messageId2");
- expect(pubsub.publishAsync(NAME, messages))
- .andReturn(Futures.immediateFuture(messageIds));
- replay(pubsub);
- initializeTopic();
- assertEquals(messageIds, topic.publishAsync(messages).get());
- }
-
@Test
public void testListSubscriptions() {
initializeExpectedTopic(1);