-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pub/Sub API prototype #768
Changes from 5 commits
c712c8a
99a0a5a
6bb23be
88a2ec2
1ffb2a9
eb2cf43
0dedcd9
9ca5f9f
0f30906
360f6e5
b829590
d2e1ca4
a9a0caa
685f185
05e8df0
2d5f9df
3152de3
7202b27
1edd1f0
cefc65d
965f7a6
6639c66
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
/* | ||
* Copyright 2016 Google Inc. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. See the License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.google.gcloud.pubsub; | ||
|
||
import static com.google.common.base.Preconditions.checkNotNull; | ||
|
||
import com.google.api.client.util.Strings; | ||
import com.google.common.base.MoreObjects; | ||
import com.google.common.collect.ImmutableMap; | ||
import com.google.protobuf.ByteString; | ||
import com.google.protobuf.Timestamp; | ||
|
||
import java.io.Serializable; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
/** | ||
* Pub/Sub message. | ||
*/ | ||
public final class Message implements Serializable { | ||
|
||
private static final long serialVersionUID = -1436515787233340634L; | ||
private static final long NANOS_PER_MILLISECOND = 1000000; | ||
private static final long MILLIS_PER_SECOND = 1000; | ||
|
||
private final String id; | ||
private final ByteString payload; | ||
private final ImmutableMap<String, String> attributes; | ||
private final Long publishTime; | ||
|
||
/** | ||
* Builder for Message. | ||
*/ | ||
public static final class Builder { | ||
|
||
private String id; | ||
private byte[] payload; | ||
private Map<String, String> attributes = new HashMap<>(); | ||
private Long publishTime; | ||
|
||
private Builder() {} | ||
|
||
private Builder(Message message) { | ||
id = message.id; | ||
payload = message.payload.toByteArray(); | ||
attributes = new HashMap<>(message.attributes); | ||
publishTime = message.publishTime; | ||
} | ||
|
||
Builder id(String id) { | ||
this.id = id; | ||
return this; | ||
} | ||
|
||
public Builder payload(byte[] payload) { | ||
this.payload = checkNotNull(payload); | ||
return this; | ||
} | ||
|
||
public Builder addAttribute(String name, String value) { | ||
This comment was marked as spam.
Sorry, something went wrong. |
||
attributes.put(name, value); | ||
return this; | ||
} | ||
|
||
public Builder removeAttribute(String name) { | ||
attributes.remove(name); | ||
return this; | ||
} | ||
|
||
public Builder clearAttributes() { | ||
attributes.clear(); | ||
return this; | ||
} | ||
|
||
Builder publishTime(Long publishTime) { | ||
this.publishTime = publishTime; | ||
return this; | ||
} | ||
|
||
public Message build() { | ||
return new Message(this); | ||
} | ||
} | ||
|
||
private Message(Builder builder) { | ||
id = builder.id; | ||
payload = ByteString.copyFrom(checkNotNull(builder.payload)); | ||
This comment was marked as spam.
Sorry, something went wrong. |
||
attributes = ImmutableMap.copyOf(builder.attributes); | ||
publishTime = builder.publishTime; | ||
} | ||
|
||
public Long publishTime() { | ||
return publishTime; | ||
} | ||
|
||
public Map<String, String> attributes() { | ||
return attributes; | ||
} | ||
|
||
public String id() { | ||
return id; | ||
} | ||
|
||
public byte[] payload() { | ||
return payload.toByteArray(); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
return Objects.equals(toPb(), ((Message)o).toPb()); | ||
This comment was marked as spam.
Sorry, something went wrong. |
||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(serialVersionUID, id, payload, attributes, publishTime); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return MoreObjects.toStringHelper(this) | ||
.add("id", id) | ||
.add("payload", payload) | ||
.add("attributes", attributes) | ||
.add("publishTime", publishTime) | ||
.toString(); | ||
} | ||
|
||
com.google.pubsub.v1.PubsubMessage toPb() { | ||
com.google.pubsub.v1.PubsubMessage.Builder builder = | ||
com.google.pubsub.v1.PubsubMessage.newBuilder(); | ||
if (id != null) { | ||
builder.setMessageId(id); | ||
} | ||
builder.setData(payload); | ||
builder.getAttributes().putAll(attributes); | ||
if (publishTime != null) { | ||
Timestamp.Builder tsBuilder = Timestamp.newBuilder(); | ||
tsBuilder.setSeconds(publishTime / MILLIS_PER_SECOND); | ||
tsBuilder.setNanos((int) (publishTime % MILLIS_PER_SECOND * NANOS_PER_MILLISECOND)); | ||
builder.setPublishTime(tsBuilder); | ||
} | ||
return builder.build(); | ||
} | ||
|
||
static Message fromPb(com.google.pubsub.v1.PubsubMessage messagePb) { | ||
Builder builder = builder(); | ||
if (messagePb.hasPublishTime()) { | ||
Timestamp ts = messagePb.getPublishTime(); | ||
builder.publishTime( | ||
ts.getSeconds() * MILLIS_PER_SECOND + ts.getNanos() / NANOS_PER_MILLISECOND); | ||
} | ||
if (Strings.isNullOrEmpty(messagePb.getMessageId())) { | ||
builder.id(messagePb.getMessageId()); | ||
} | ||
for (Map.Entry<String, String> entry : messagePb.getAttributes().entrySet()) { | ||
builder.addAttribute(entry.getKey(), entry.getValue()); | ||
} | ||
builder.payload(messagePb.getData().toByteArray()); | ||
return builder.build(); | ||
} | ||
|
||
public Builder toBuilder() { | ||
return new Builder(this); | ||
} | ||
|
||
public static Message of(byte[] payload) { | ||
return builder().payload(payload).build(); | ||
} | ||
|
||
public static Builder builder() { | ||
return new Builder(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
/* | ||
* Copyright 2016 Google Inc. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.gcloud.pubsub; | ||
|
||
import com.google.gcloud.Page; | ||
import com.google.gcloud.Service; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.Future; | ||
|
||
/** | ||
* An interface for Google Cloud Pub/Sub. | ||
* | ||
* @see <a href="https://cloud.google.com/pubsub/">Google Cloud Pub/Sub</a> | ||
*/ | ||
public interface PubSub extends Service<PubSubOptions> { | ||
|
||
class ListOption { | ||
// page size | ||
// page token | ||
} | ||
|
||
class PullOption { | ||
// bool return_immediately = 2; | ||
// int32 max_messages = 3; | ||
} | ||
|
||
// topics | ||
////////////////////// | ||
Topic createTopic(TopicInfo topic); | ||
|
||
Future<Topic> createTopicAsync(String topic); | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
|
||
// null if not found | ||
Topic getTopic(String topic); | ||
|
||
Future<Topic> getTopicAsync(String topic); | ||
|
||
// false if not found | ||
boolean deleteTopic(String topic); | ||
|
||
Future<Boolean> deleteTopicAsync(String topic); | ||
|
||
Page<Topic> listTopics(ListOption... options); | ||
|
||
// todo: consider AsyncPage that has nextPageAsync | ||
Future<Page<Topic>> listTopicsAsync(ListOption... options); | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
|
||
String publish(String topic, Message message); | ||
|
||
Future<String> publishAsync(String topic, Message message); | ||
|
||
List<String> publish(String topic, Message first, Message... other); | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
|
||
Future<List<String>> publishAsync(String topic, Message first, Message... other); | ||
|
||
// subscriptions | ||
//////////////////////////// | ||
Subscription createSubscription(SubscriptionInfo subscription); | ||
|
||
Future<Subscription> createSubscriptionAsync(SubscriptionInfo subscription); | ||
|
||
// null if not found | ||
Subscription getSubscription(String subscription); | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
|
||
Future<Subscription> getSubscriptionAsync(String topic); | ||
This comment was marked as spam.
Sorry, something went wrong. |
||
|
||
// false if not found | ||
boolean deleteSubscription(String subscription); | ||
|
||
Future<Boolean> deleteSubscriptionAsync(String subscription); | ||
|
||
Page<Subscription> listSubscription(ListOption... options); | ||
This comment was marked as spam.
Sorry, something went wrong. |
||
|
||
// todo: consider AsyncPage that has nextPageAsync | ||
Future<Page<Subscription>> listSubscriptionAsync(ListOption... options); | ||
This comment was marked as spam.
Sorry, something went wrong. |
||
|
||
Page<Subscription> listSubscription(String topic, ListOption... options); | ||
|
||
// todo: consider AsyncPage that has nextPageAsync | ||
Future<Page<Subscription>> listSubscriptionAsync(String topic, ListOption... options); | ||
|
||
// Ack options: | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
// 1) replace return value to ReceivedMessage (Message + functional ack) | ||
// 2) like (1) but with no auto-renew, so provide a function for renew | ||
// 3) rename Message to MessageInfo and make Message functional with ack | ||
// 4) return a "special" list (extends List) but has a way to "ackSoFar" | ||
// 5) instead of List use callback and auto-acknowledge per callback (and auto-renew) | ||
// ** Auto renew means, using a separate thread. | ||
List<Message> pull(String subscription, PullOption... options); | ||
|
||
Future<List<Message>> pullAsync(String subscription, PullOption... options); | ||
|
||
|
||
// Note regarding Date types: | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
// 1) No field selection | ||
// --- This is why there are no options for getters | ||
// 2) Never null for primitive values or collections | ||
// --- should we replace default "" with null (e.g. id when not populated)? | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
|
||
// IAM Policy operations: getPolicy, replacePolicy, testPermissions | ||
// Not sure if ready (docs is not up-to-date), is it one per topic and | ||
// one per subscription? | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/* | ||
* Copyright 2016 Google Inc. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.gcloud.pubsub; | ||
|
||
import com.google.common.collect.ImmutableSet; | ||
import com.google.gcloud.BaseServiceException; | ||
import com.google.gcloud.RetryHelper.RetryHelperException; | ||
import com.google.gcloud.RetryHelper.RetryInterruptedException; | ||
|
||
import java.util.Set; | ||
|
||
/** | ||
* Pub/Sub service exception. | ||
* | ||
* @see <a href="https://cloud.google.com/pubsub/error-codes">Google Cloud Pub/Sub error codes</a> | ||
*/ | ||
public class PubSubException extends BaseServiceException { | ||
|
||
private static final long serialVersionUID = 6434989638600001226L; | ||
private static final Set<Error> RETRYABLE_ERRORS = ImmutableSet.of( | ||
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong. |
||
new Error(499, null), | ||
new Error(503, null), | ||
new Error(429, null), | ||
new Error(500, null), | ||
new Error(504, null)); | ||
|
||
public PubSubException(int code, String message) { | ||
super(code, message, null, true); | ||
} | ||
|
||
@Override | ||
protected Set<Error> retryableErrors() { | ||
return RETRYABLE_ERRORS; | ||
} | ||
|
||
/** | ||
* Translate RetryHelperException to the ResourceManagerException that caused the error. This | ||
* method will always throw an exception. | ||
* | ||
* @throws PubSubException when {@code ex} was caused by a {@code | ||
* ResourceManagerException} | ||
* @throws RetryInterruptedException when {@code ex} is a {@code RetryInterruptedException} | ||
*/ | ||
static PubSubException translateAndThrow(RetryHelperException ex) { | ||
BaseServiceException.translateAndPropagateIfPossible(ex); | ||
throw new PubSubException(UNKNOWN_CODE, ex.getMessage()); | ||
} | ||
} |
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.