From 784914ab9b86396a187e28dbdc0e28ec4116959b Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Sun, 22 May 2016 23:20:01 +0200 Subject: [PATCH] Add javadoc and tests for functional Topic class --- .../java/com/google/cloud/pubsub/Topic.java | 126 ++++++- .../com/google/cloud/pubsub/TopicInfo.java | 15 +- .../com/google/cloud/pubsub/TopicTest.java | 320 ++++++++++++++++++ 3 files changed, 447 insertions(+), 14 deletions(-) create mode 100644 gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java index 65ed737fc0cd..2b9857495f63 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java @@ -30,7 +30,13 @@ import java.util.concurrent.Future; /** - * PubSub Topic. + * A Google Cloud Pub/Sub topic. A topic is a named resource to which messages are sent by + * publishers. Subscribers can receive messages sent to a topic by creating subscriptions. + * {@code Topic} adds a layer of service-related functionality over {@link TopicInfo}. Objects of + * this class are immutable. To get a {@code Topic} object with the most recent information use + * {@link #reload} or {@link #reloadAsync}. + * + * @see Pub/Sub Data Model */ public class Topic extends TopicInfo { @@ -39,6 +45,9 @@ public class Topic extends TopicInfo { private final PubSubOptions options; private transient PubSub pubsub; + /** + * A builder for {@code Topic} objects. + */ public static final class Builder extends TopicInfo.Builder { private final PubSub pubsub; @@ -73,70 +82,173 @@ public Builder toBuilder() { } @Override - public int hashCode() { + public final int hashCode() { return Objects.hash(options, super.hashCode()); } @Override - public boolean equals(Object obj) { + public final boolean equals(Object obj) { if (this == obj) { return true; } - if (obj == null || getClass() != obj.getClass()) { + if (obj == null || !obj.getClass().equals(Topic.class)) { return false; } Topic other = (Topic) obj; - return Objects.equals(name(), other.name()) && Objects.equals(options, other.options); + return baseEquals(other) && Objects.equals(options, other.options); } + /** + * Returns the topic's {@code PubSub} object used to issue requests. + */ public PubSub pubSub() { return pubsub; } + /** + * Deletes this topic. + * + * @return {@code true} if the topic was deleted, {@code false} if it was not found + * @throws PubSubException upon failure + */ public boolean delete() { return pubsub.deleteTopic(name()); } + /** + * Sends a request for deleting this topic. This method returns a {@code Future} object to consume + * the result. {@link Future#get()} returns {@code true} if the topic was deleted, {@code false} + * if it was not found. + * + * @throws PubSubException upon failure + */ public Future deleteAsync() { return pubsub.deleteTopicAsync(name()); } + /** + * Fetches current topic's latest information. Returns {@code null} if the topic does not exist. + * + * @return a {@code Topic} object with latest information or {@code null} if not found + * @throws PubSubException upon failure + */ public Topic reload() { return pubsub.getTopic(name()); } + /** + * Sends a request to fetch current topic's latest information. This method returns a + * {@code Future} object to consume the result. {@link Future#get()} returns a {@code Topic} + * object with latest information or {@code null} if not found. + * + * @throws PubSubException upon failure + */ public Future reloadAsync() { return pubsub.getTopicAsync(name()); } + /** + * Publishes a message to this topic. This method returns a service-generated id for the published + * message. Service-generated ids are guaranteed to be unique within the topic. + * + * @param message the message to publish + * @return a unique service-generated id for the message + * @throws PubSubException upon failure, if the topic does not exist or if the message has empty + * payload and no attributes + */ public String publish(Message message) { return pubsub.publish(name(), message); } + /** + * Sends a request for publishing a message to the this topic. This method returns a + * {@code Future} object to consume the result. {@link Future#get()} returns a service-generated + * id for the published message. Service-generated ids are guaranteed to be unique within the + * topic. + * + * @param message the message to publish + * @return a {@code Future} for the unique service-generated id for the message + */ public Future publishAsync(Message message) { return pubsub.publishAsync(name(), message); } + /** + * Publishes a number of messages to this topic. This method returns a list of service-generated + * ids for the published messages. Service-generated ids are guaranteed to be unique within the + * topic. + * + * @param message the first message to publish + * @param messages other messages to publish + * @return a list of unique, service-generated, ids. Ids are in the same order as the messages. + * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has + * empty payload and no attributes + */ public List publish(Message message, Message... messages) { return pubsub.publish(name(), message, messages); } + /** + * Sends a request to publish a number of messages to this topic. This method returns a + * {@code Future} object to consume the result. {@link Future#get()} returns a list of + * service-generated ids for the published messages. Service-generated ids are guaranteed to be + * unique within the topic. + * + * @param message the first message to publish + * @param messages other messages to publish + * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as + * the messages. + */ public Future> publishAsync(Message message, Message... messages) { return pubsub.publishAsync(name(), message, messages); } + /** + * Publishes a number of messages to this topic. This method returns a list ofservice-generated + * ids for the published messages. Service-generated ids are guaranteed to be unique within the + * topic. + * + * @param messages the messages to publish + * @return a list of unique, service-generated, ids. Ids are in the same order as the messages. + * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has + * empty payload and no attributes + */ public List publish(Iterable messages) { return pubsub.publish(name(), messages); } + /** + * Sends a request to publish a number of messages to this topic. This method returns a + * {@code Future} object to consume the result. {@link Future#get()} returns a list of + * service-generated ids for the published messages. Service-generated ids are guaranteed to be + * unique within the topic. + * + * @param messages the messages to publish + * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as + * the messages + */ public Future> publishAsync(Iterable messages) { return pubsub.publishAsync(name(), messages); } + /** + * Lists the identities of the subscriptions for this topic. This method returns a {@link Page} + * object that can be used to consume paginated results. Use {@link ListOption} to specify the + * page size or the page token from which to start listing subscriptions. + * + * @throws PubSubException upon failure + */ public Page listSubscriptions(ListOption... options) { return pubsub.listSubscriptions(name(), options); } + /** + * Sends a request for listing the identities of subscriptions for this topic. This method returns + * a {@code Future} object to consume the result. {@link Future#get()} returns an + * {@link AsyncPage} object that can be used to asynchronously handle paginated results. Use + * {@link ListOption} to specify the page size or the page token from which to start listing + * subscriptions. + */ public Future> listSubscriptionsAsync(ListOption... options) { return pubsub.listSubscriptionsAsync(name(), options); } @@ -146,9 +258,9 @@ private void readObject(ObjectInputStream input) throws IOException, ClassNotFou this.pubsub = options.service(); } - static Topic fromPb(PubSub storage, com.google.pubsub.v1.Topic topicPb) { + static Topic fromPb(PubSub pubsub, com.google.pubsub.v1.Topic topicPb) { TopicInfo topicInfo = TopicInfo.fromPb(topicPb); - return new Topic(storage, new BuilderImpl(topicInfo)); + return new Topic(pubsub, new BuilderImpl(topicInfo)); } static Function fromPbFunction(final PubSub pubsub) { diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java index 8c90a0c2705d..e1b2dc6275a0 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java @@ -98,15 +98,16 @@ public int hashCode() { return Objects.hash(name); } + final boolean baseEquals(TopicInfo topicInfo) { + return Objects.equals(name, topicInfo.name); + } + @Override public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || !obj.getClass().equals(this.getClass())) { - return false; - } - return Objects.equals(name, ((TopicInfo) obj).name); + return obj == this + || obj != null + && obj.getClass().equals(TopicInfo.class) + && baseEquals((TopicInfo) obj); } @Override diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java new file mode 100644 index 000000000000..01ea682776b8 --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java @@ -0,0 +1,320 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.createStrictMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.AsyncPage; +import com.google.cloud.AsyncPageImpl; +import com.google.cloud.Page; +import com.google.cloud.PageImpl; +import com.google.cloud.pubsub.PubSub.ListOption; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; + +import org.junit.After; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.ExecutionException; + +public class TopicTest { + + private static final String NAME = "topic"; + private static final TopicInfo TOPIC_INFO = TopicInfo.of(NAME); + + private PubSub serviceMockReturnsOptions = createStrictMock(PubSub.class); + private PubSubOptions mockOptions = createMock(PubSubOptions.class); + private PubSub pubsub; + private Topic expectedTopic; + private Topic topic; + + private void initializeExpectedTopic(int optionsCalls) { + expect(serviceMockReturnsOptions.options()).andReturn(mockOptions).times(optionsCalls); + replay(serviceMockReturnsOptions); + pubsub = createStrictMock(PubSub.class); + expectedTopic = new Topic(serviceMockReturnsOptions, new Topic.BuilderImpl(TOPIC_INFO)); + } + + private void initializeTopic() { + topic = new Topic(pubsub, new Topic.BuilderImpl(TOPIC_INFO)); + } + + @After + public void tearDown() throws Exception { + verify(pubsub, serviceMockReturnsOptions); + } + + @Test + public void testBuilder() { + initializeExpectedTopic(2); + replay(pubsub); + Topic builtTopic = expectedTopic.toBuilder().name("newTopic").build(); + assertEquals("newTopic", builtTopic.name()); + } + + @Test + public void testToBuilder() { + initializeExpectedTopic(2); + replay(pubsub); + compareTopic(expectedTopic, expectedTopic.toBuilder().build()); + } + + @Test + public void testReload() { + initializeExpectedTopic(2); + TopicInfo updatedInfo = TOPIC_INFO.toBuilder().name("newTopic").build(); + Topic expectedTopic = + new Topic(serviceMockReturnsOptions, new TopicInfo.BuilderImpl(updatedInfo)); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getTopic(NAME)).andReturn(expectedTopic); + replay(pubsub); + initializeTopic(); + Topic updatedTopic = topic.reload(); + compareTopic(expectedTopic, updatedTopic); + } + + @Test + public void testReloadNull() { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getTopic(NAME)).andReturn(null); + replay(pubsub); + initializeTopic(); + assertNull(topic.reload()); + } + + @Test + public void testReloadAsync() throws ExecutionException, InterruptedException { + initializeExpectedTopic(2); + TopicInfo updatedInfo = TOPIC_INFO.toBuilder().name("newTopic").build(); + Topic expectedTopic = + new Topic(serviceMockReturnsOptions, new TopicInfo.BuilderImpl(updatedInfo)); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getTopicAsync(NAME)) + .andReturn(Futures.immediateFuture(expectedTopic)); + replay(pubsub); + initializeTopic(); + Topic updatedTopic = topic.reloadAsync().get(); + compareTopic(expectedTopic, updatedTopic); + } + + @Test + public void testReloadAsyncNull() throws ExecutionException, InterruptedException { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getTopicAsync(NAME)).andReturn(Futures.immediateFuture(null)); + replay(pubsub); + initializeTopic(); + assertNull(topic.reloadAsync().get()); + } + + @Test + public void testDeleteTrue() { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.deleteTopic(NAME)).andReturn(true); + replay(pubsub); + initializeTopic(); + assertTrue(topic.delete()); + } + + @Test + public void testDeleteFalse() { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.deleteTopic(NAME)).andReturn(false); + replay(pubsub); + initializeTopic(); + assertFalse(topic.delete()); + } + + @Test + public void testDeleteAsyncTrue() throws ExecutionException, InterruptedException { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.deleteTopicAsync(NAME)).andReturn(Futures.immediateFuture(true)); + replay(pubsub); + initializeTopic(); + assertTrue(topic.deleteAsync().get()); + } + + @Test + public void testDeleteAsyncFalse() throws ExecutionException, InterruptedException { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.deleteTopicAsync(NAME)).andReturn(Futures.immediateFuture(false)); + replay(pubsub); + initializeTopic(); + assertFalse(topic.deleteAsync().get()); + } + + @Test + public void testPublishOneMessage() { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + Message message = Message.of("payload1"); + String messageId = "messageId"; + expect(pubsub.publish(NAME, message)).andReturn(messageId); + replay(pubsub); + initializeTopic(); + assertEquals(messageId, topic.publish(message)); + } + + @Test + public void testPublishOneMessageAsync() throws ExecutionException, InterruptedException { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + Message message = Message.of("payload1"); + String messageId = "messageId"; + expect(pubsub.publishAsync(NAME, message)) + .andReturn(Futures.immediateFuture(messageId)); + replay(pubsub); + initializeTopic(); + assertEquals(messageId, topic.publishAsync(message).get()); + } + + @Test + public void testPublishMoreMessages() { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + List messageIds = ImmutableList.of("messageId1", "messageId2"); + expect(pubsub.publish(NAME, message1, message2)).andReturn(messageIds); + replay(pubsub); + initializeTopic(); + assertEquals(messageIds, topic.publish(message1, message2)); + } + + @Test + public void testPublishMoreMessagesAsync() throws ExecutionException, InterruptedException { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + List messageIds = ImmutableList.of("messageId1", "messageId2"); + expect(pubsub.publishAsync(NAME, message1, message2)) + .andReturn(Futures.immediateFuture(messageIds)); + replay(pubsub); + initializeTopic(); + assertEquals(messageIds, topic.publishAsync(message1, message2).get()); + } + + @Test + public void testPublishMessageList() { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + List messages = ImmutableList.of(message1, message2); + List messageIds = ImmutableList.of("messageId1", "messageId2"); + expect(pubsub.publish(NAME, messages)).andReturn(messageIds); + replay(pubsub); + initializeTopic(); + assertEquals(messageIds, topic.publish(messages)); + } + + @Test + public void testPublishMessageListAsync() throws ExecutionException, InterruptedException { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + List messages = ImmutableList.of(message1, message2); + List messageIds = ImmutableList.of("messageId1", "messageId2"); + expect(pubsub.publishAsync(NAME, messages)) + .andReturn(Futures.immediateFuture(messageIds)); + replay(pubsub); + initializeTopic(); + assertEquals(messageIds, topic.publishAsync(messages).get()); + } + + @Test + public void testListSubscriptions() { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + final List subscriptions = ImmutableList.of( + new SubscriptionId("project", "subscription1"), + new SubscriptionId("project", "subscription2")); + Page result = new PageImpl<>(null, null, subscriptions); + expect(pubsub.listSubscriptions(NAME)).andReturn(result); + replay(pubsub); + initializeTopic(); + assertEquals(subscriptions, topic.listSubscriptions().values()); + } + + @Test + public void testListSubscriptionsWithOptions() { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + final List subscriptions = ImmutableList.of( + new SubscriptionId("project", "subscription1"), + new SubscriptionId("project", "subscription2")); + Page result = new PageImpl<>(null, null, subscriptions); + expect(pubsub.listSubscriptions(NAME, ListOption.pageSize(42))).andReturn(result); + replay(pubsub); + initializeTopic(); + assertEquals(subscriptions, topic.listSubscriptions(ListOption.pageSize(42)).values()); + } + + @Test + public void testListSubscriptionsAsync() throws ExecutionException, InterruptedException { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + final List subscriptions = ImmutableList.of( + new SubscriptionId("project", "subscription1"), + new SubscriptionId("project", "subscription2")); + AsyncPage result = new AsyncPageImpl<>(null, null, subscriptions); + expect(pubsub.listSubscriptionsAsync(NAME)) + .andReturn(Futures.immediateFuture(result)); + replay(pubsub); + initializeTopic(); + assertEquals(subscriptions, topic.listSubscriptionsAsync().get().values()); + } + + @Test + public void testListSubscriptionsAsyncWithOptions() + throws ExecutionException, InterruptedException { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + final List subscriptions = ImmutableList.of( + new SubscriptionId("project", "subscription1"), + new SubscriptionId("project", "subscription2")); + AsyncPage result = new AsyncPageImpl<>(null, null, subscriptions); + expect(pubsub.listSubscriptionsAsync(NAME, ListOption.pageSize(42))) + .andReturn(Futures.immediateFuture(result)); + replay(pubsub); + initializeTopic(); + assertEquals(subscriptions, + topic.listSubscriptionsAsync(ListOption.pageSize(42)).get().values()); + } + + private void compareTopic(Topic expected, Topic value) { + assertEquals(expected, value); + assertEquals(expected.name(), value.name()); + assertEquals(expected.hashCode(), value.hashCode()); + } +}