diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java index 21ae0f027cbe..103856373e62 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java @@ -32,6 +32,7 @@ private PubsubMessage(com.google.protobuf.GeneratedMessageV3.Builder builder) private PubsubMessage() { data_ = com.google.protobuf.ByteString.EMPTY; messageId_ = ""; + orderingKey_ = ""; } @java.lang.Override @@ -98,6 +99,13 @@ private PubsubMessage( publishTime_ = subBuilder.buildPartial(); } + break; + } + case 42: + { + java.lang.String s = input.readStringRequireUtf8(); + + orderingKey_ = s; break; } default: @@ -350,6 +358,59 @@ public com.google.protobuf.TimestampOrBuilder getPublishTimeOrBuilder() { return getPublishTime(); } + public static final int ORDERING_KEY_FIELD_NUMBER = 5; + private volatile java.lang.Object orderingKey_; + /** + * + * + *
+   * WARNING: `ordering_key` is an experimental field not yet
+   * supported by the service.
+   * Identifies related messages for which publish order should be respected.
+   * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+   * published with the same `ordering_key` value will be delivered to
+   * subscribers in the order in which they are received by the Pub/Sub system.
+   * 
+ * + * string ordering_key = 5; + */ + public java.lang.String getOrderingKey() { + java.lang.Object ref = orderingKey_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + orderingKey_ = s; + return s; + } + } + /** + * + * + *
+   * WARNING: `ordering_key` is an experimental field not yet
+   * supported by the service.
+   * Identifies related messages for which publish order should be respected.
+   * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+   * published with the same `ordering_key` value will be delivered to
+   * subscribers in the order in which they are received by the Pub/Sub system.
+   * 
+ * + * string ordering_key = 5; + */ + public com.google.protobuf.ByteString getOrderingKeyBytes() { + java.lang.Object ref = orderingKey_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref); + orderingKey_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private byte memoizedIsInitialized = -1; @java.lang.Override @@ -375,6 +436,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io if (publishTime_ != null) { output.writeMessage(4, getPublishTime()); } + if (!getOrderingKeyBytes().isEmpty()) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 5, orderingKey_); + } unknownFields.writeTo(output); } @@ -403,6 +467,9 @@ public int getSerializedSize() { if (publishTime_ != null) { size += com.google.protobuf.CodedOutputStream.computeMessageSize(4, getPublishTime()); } + if (!getOrderingKeyBytes().isEmpty()) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(5, orderingKey_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -426,6 +493,7 @@ public boolean equals(final java.lang.Object obj) { if (hasPublishTime()) { result = result && getPublishTime().equals(other.getPublishTime()); } + result = result && getOrderingKey().equals(other.getOrderingKey()); result = result && unknownFields.equals(other.unknownFields); return result; } @@ -449,6 +517,8 @@ public int hashCode() { hash = (37 * hash) + PUBLISH_TIME_FIELD_NUMBER; hash = (53 * hash) + getPublishTime().hashCode(); } + hash = (37 * hash) + ORDERING_KEY_FIELD_NUMBER; + hash = (53 * hash) + getOrderingKey().hashCode(); hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -631,6 +701,8 @@ public Builder clear() { publishTime_ = null; publishTimeBuilder_ = null; } + orderingKey_ = ""; + return this; } @@ -668,6 +740,7 @@ public com.google.pubsub.v1.PubsubMessage buildPartial() { } else { result.publishTime_ = publishTimeBuilder_.build(); } + result.orderingKey_ = orderingKey_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -729,6 +802,10 @@ public Builder mergeFrom(com.google.pubsub.v1.PubsubMessage other) { if (other.hasPublishTime()) { mergePublishTime(other.getPublishTime()); } + if (!other.getOrderingKey().isEmpty()) { + orderingKey_ = other.orderingKey_; + onChanged(); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -1273,6 +1350,125 @@ public com.google.protobuf.TimestampOrBuilder getPublishTimeOrBuilder() { return publishTimeBuilder_; } + private java.lang.Object orderingKey_ = ""; + /** + * + * + *
+     * WARNING: `ordering_key` is an experimental field not yet
+     * supported by the service.
+     * Identifies related messages for which publish order should be respected.
+     * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+     * published with the same `ordering_key` value will be delivered to
+     * subscribers in the order in which they are received by the Pub/Sub system.
+     * 
+ * + * string ordering_key = 5; + */ + public java.lang.String getOrderingKey() { + java.lang.Object ref = orderingKey_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + orderingKey_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * + * + *
+     * WARNING: `ordering_key` is an experimental field not yet
+     * supported by the service.
+     * Identifies related messages for which publish order should be respected.
+     * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+     * published with the same `ordering_key` value will be delivered to
+     * subscribers in the order in which they are received by the Pub/Sub system.
+     * 
+ * + * string ordering_key = 5; + */ + public com.google.protobuf.ByteString getOrderingKeyBytes() { + java.lang.Object ref = orderingKey_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref); + orderingKey_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * + * + *
+     * WARNING: `ordering_key` is an experimental field not yet
+     * supported by the service.
+     * Identifies related messages for which publish order should be respected.
+     * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+     * published with the same `ordering_key` value will be delivered to
+     * subscribers in the order in which they are received by the Pub/Sub system.
+     * 
+ * + * string ordering_key = 5; + */ + public Builder setOrderingKey(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + + orderingKey_ = value; + onChanged(); + return this; + } + /** + * + * + *
+     * WARNING: `ordering_key` is an experimental field not yet
+     * supported by the service.
+     * Identifies related messages for which publish order should be respected.
+     * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+     * published with the same `ordering_key` value will be delivered to
+     * subscribers in the order in which they are received by the Pub/Sub system.
+     * 
+ * + * string ordering_key = 5; + */ + public Builder clearOrderingKey() { + + orderingKey_ = getDefaultInstance().getOrderingKey(); + onChanged(); + return this; + } + /** + * + * + *
+     * WARNING: `ordering_key` is an experimental field not yet
+     * supported by the service.
+     * Identifies related messages for which publish order should be respected.
+     * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+     * published with the same `ordering_key` value will be delivered to
+     * subscribers in the order in which they are received by the Pub/Sub system.
+     * 
+ * + * string ordering_key = 5; + */ + public Builder setOrderingKeyBytes(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + checkByteStringIsUtf8(value); + + orderingKey_ = value; + onChanged(); + return this; + } + @java.lang.Override public final Builder setUnknownFields(final com.google.protobuf.UnknownFieldSet unknownFields) { return super.setUnknownFieldsProto3(unknownFields); diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessageOrBuilder.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessageOrBuilder.java index e1db91aa9512..93fb2ce06b9c 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessageOrBuilder.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessageOrBuilder.java @@ -137,4 +137,35 @@ public interface PubsubMessageOrBuilder * .google.protobuf.Timestamp publish_time = 4; */ com.google.protobuf.TimestampOrBuilder getPublishTimeOrBuilder(); + + /** + * + * + *
+   * WARNING: `ordering_key` is an experimental field not yet
+   * supported by the service.
+   * Identifies related messages for which publish order should be respected.
+   * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+   * published with the same `ordering_key` value will be delivered to
+   * subscribers in the order in which they are received by the Pub/Sub system.
+   * 
+ * + * string ordering_key = 5; + */ + java.lang.String getOrderingKey(); + /** + * + * + *
+   * WARNING: `ordering_key` is an experimental field not yet
+   * supported by the service.
+   * Identifies related messages for which publish order should be respected.
+   * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+   * published with the same `ordering_key` value will be delivered to
+   * subscribers in the order in which they are received by the Pub/Sub system.
+   * 
+ * + * string ordering_key = 5; + */ + com.google.protobuf.ByteString getOrderingKeyBytes(); } diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubProto.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubProto.java index cdc7b4ce4539..b53e18f24357 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubProto.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubProto.java @@ -212,189 +212,190 @@ public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { + "bsub.v1.Topic.LabelsEntry\022F\n\026message_sto" + "rage_policy\030\003 \001(\0132&.google.pubsub.v1.Mes" + "sageStoragePolicy\032-\n\013LabelsEntry\022\013\n\003key\030" - + "\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"\333\001\n\rPubsubMessa" + + "\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"\361\001\n\rPubsubMessa" + "ge\022\014\n\004data\030\001 \001(\014\022C\n\nattributes\030\002 \003(\0132/.g" + "oogle.pubsub.v1.PubsubMessage.Attributes" + "Entry\022\022\n\nmessage_id\030\003 \001(\t\0220\n\014publish_tim" - + "e\030\004 \001(\0132\032.google.protobuf.Timestamp\0321\n\017A" - + "ttributesEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001" - + "(\t:\0028\001\" \n\017GetTopicRequest\022\r\n\005topic\030\001 \001(\t" - + "\"m\n\022UpdateTopicRequest\022&\n\005topic\030\001 \001(\0132\027." - + "google.pubsub.v1.Topic\022/\n\013update_mask\030\002 " - + "\001(\0132\032.google.protobuf.FieldMask\"R\n\016Publi" - + "shRequest\022\r\n\005topic\030\001 \001(\t\0221\n\010messages\030\002 \003" - + "(\0132\037.google.pubsub.v1.PubsubMessage\"&\n\017P" - + "ublishResponse\022\023\n\013message_ids\030\001 \003(\t\"K\n\021L" - + "istTopicsRequest\022\017\n\007project\030\001 \001(\t\022\021\n\tpag" - + "e_size\030\002 \001(\005\022\022\n\npage_token\030\003 \001(\t\"V\n\022List" - + "TopicsResponse\022\'\n\006topics\030\001 \003(\0132\027.google." - + "pubsub.v1.Topic\022\027\n\017next_page_token\030\002 \001(\t" - + "\"U\n\035ListTopicSubscriptionsRequest\022\r\n\005top" - + "ic\030\001 \001(\t\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npage_toke" - + "n\030\003 \001(\t\"P\n\036ListTopicSubscriptionsRespons" - + "e\022\025\n\rsubscriptions\030\001 \003(\t\022\027\n\017next_page_to" - + "ken\030\002 \001(\t\"Q\n\031ListTopicSnapshotsRequest\022\r" - + "\n\005topic\030\001 \001(\t\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npage" - + "_token\030\003 \001(\t\"H\n\032ListTopicSnapshotsRespon" - + "se\022\021\n\tsnapshots\030\001 \003(\t\022\027\n\017next_page_token" - + "\030\002 \001(\t\"#\n\022DeleteTopicRequest\022\r\n\005topic\030\001 " - + "\001(\t\"\204\003\n\014Subscription\022\014\n\004name\030\001 \001(\t\022\r\n\005to" - + "pic\030\002 \001(\t\0221\n\013push_config\030\004 \001(\0132\034.google." - + "pubsub.v1.PushConfig\022\034\n\024ack_deadline_sec" - + "onds\030\005 \001(\005\022\035\n\025retain_acked_messages\030\007 \001(" - + "\010\022=\n\032message_retention_duration\030\010 \001(\0132\031." - + "google.protobuf.Duration\022:\n\006labels\030\t \003(\013" - + "2*.google.pubsub.v1.Subscription.LabelsE" - + "ntry\022=\n\021expiration_policy\030\013 \001(\0132\".google" - + ".pubsub.v1.ExpirationPolicy\032-\n\013LabelsEnt" - + "ry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\":\n\020Ex" - + "pirationPolicy\022&\n\003ttl\030\001 \001(\0132\031.google.pro" - + "tobuf.Duration\"\230\001\n\nPushConfig\022\025\n\rpush_en" - + "dpoint\030\001 \001(\t\022@\n\nattributes\030\002 \003(\0132,.googl" - + "e.pubsub.v1.PushConfig.AttributesEntry\0321" - + "\n\017AttributesEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030" - + "\002 \001(\t:\0028\001\"S\n\017ReceivedMessage\022\016\n\006ack_id\030\001" - + " \001(\t\0220\n\007message\030\002 \001(\0132\037.google.pubsub.v1" - + ".PubsubMessage\".\n\026GetSubscriptionRequest" - + "\022\024\n\014subscription\030\001 \001(\t\"\202\001\n\031UpdateSubscri" - + "ptionRequest\0224\n\014subscription\030\001 \001(\0132\036.goo" - + "gle.pubsub.v1.Subscription\022/\n\013update_mas" - + "k\030\002 \001(\0132\032.google.protobuf.FieldMask\"R\n\030L" - + "istSubscriptionsRequest\022\017\n\007project\030\001 \001(\t" - + "\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npage_token\030\003 \001(\t\"" - + "k\n\031ListSubscriptionsResponse\0225\n\rsubscrip" - + "tions\030\001 \003(\0132\036.google.pubsub.v1.Subscript" - + "ion\022\027\n\017next_page_token\030\002 \001(\t\"1\n\031DeleteSu" - + "bscriptionRequest\022\024\n\014subscription\030\001 \001(\t\"" - + "b\n\027ModifyPushConfigRequest\022\024\n\014subscripti" - + "on\030\001 \001(\t\0221\n\013push_config\030\002 \001(\0132\034.google.p" - + "ubsub.v1.PushConfig\"U\n\013PullRequest\022\024\n\014su" - + "bscription\030\001 \001(\t\022\032\n\022return_immediately\030\002" - + " \001(\010\022\024\n\014max_messages\030\003 \001(\005\"L\n\014PullRespon" - + "se\022<\n\021received_messages\030\001 \003(\0132!.google.p" - + "ubsub.v1.ReceivedMessage\"_\n\030ModifyAckDea" - + "dlineRequest\022\024\n\014subscription\030\001 \001(\t\022\017\n\007ac" - + "k_ids\030\004 \003(\t\022\034\n\024ack_deadline_seconds\030\003 \001(" - + "\005\";\n\022AcknowledgeRequest\022\024\n\014subscription\030" - + "\001 \001(\t\022\017\n\007ack_ids\030\002 \003(\t\"\244\001\n\024StreamingPull" - + "Request\022\024\n\014subscription\030\001 \001(\t\022\017\n\007ack_ids" - + "\030\002 \003(\t\022\037\n\027modify_deadline_seconds\030\003 \003(\005\022" - + "\037\n\027modify_deadline_ack_ids\030\004 \003(\t\022#\n\033stre" - + "am_ack_deadline_seconds\030\005 \001(\005\"U\n\025Streami" - + "ngPullResponse\022<\n\021received_messages\030\001 \003(" - + "\0132!.google.pubsub.v1.ReceivedMessage\"\257\001\n" - + "\025CreateSnapshotRequest\022\014\n\004name\030\001 \001(\t\022\024\n\014" - + "subscription\030\002 \001(\t\022C\n\006labels\030\003 \003(\01323.goo" - + "gle.pubsub.v1.CreateSnapshotRequest.Labe" - + "lsEntry\032-\n\013LabelsEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005v" - + "alue\030\002 \001(\t:\0028\001\"v\n\025UpdateSnapshotRequest\022" - + ",\n\010snapshot\030\001 \001(\0132\032.google.pubsub.v1.Sna" - + "pshot\022/\n\013update_mask\030\002 \001(\0132\032.google.prot" - + "obuf.FieldMask\"\277\001\n\010Snapshot\022\014\n\004name\030\001 \001(" - + "\t\022\r\n\005topic\030\002 \001(\t\022/\n\013expire_time\030\003 \001(\0132\032." - + "google.protobuf.Timestamp\0226\n\006labels\030\004 \003(" - + "\0132&.google.pubsub.v1.Snapshot.LabelsEntr" - + "y\032-\n\013LabelsEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002" - + " \001(\t:\0028\001\"&\n\022GetSnapshotRequest\022\020\n\010snapsh" - + "ot\030\001 \001(\t\"N\n\024ListSnapshotsRequest\022\017\n\007proj" - + "ect\030\001 \001(\t\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npage_tok" - + "en\030\003 \001(\t\"_\n\025ListSnapshotsResponse\022-\n\tsna" - + "pshots\030\001 \003(\0132\032.google.pubsub.v1.Snapshot" - + "\022\027\n\017next_page_token\030\002 \001(\t\")\n\025DeleteSnaps" - + "hotRequest\022\020\n\010snapshot\030\001 \001(\t\"m\n\013SeekRequ" - + "est\022\024\n\014subscription\030\001 \001(\t\022*\n\004time\030\002 \001(\0132" - + "\032.google.protobuf.TimestampH\000\022\022\n\010snapsho" - + "t\030\003 \001(\tH\000B\010\n\006target\"\016\n\014SeekResponse2\277\010\n\t" - + "Publisher\022j\n\013CreateTopic\022\027.google.pubsub" - + ".v1.Topic\032\027.google.pubsub.v1.Topic\")\202\323\344\223" - + "\002#\032\036/v1/{name=projects/*/topics/*}:\001*\022}\n" - + "\013UpdateTopic\022$.google.pubsub.v1.UpdateTo" - + "picRequest\032\027.google.pubsub.v1.Topic\"/\202\323\344" - + "\223\002)2$/v1/{topic.name=projects/*/topics/*" - + "}:\001*\022\202\001\n\007Publish\022 .google.pubsub.v1.Publ" - + "ishRequest\032!.google.pubsub.v1.PublishRes" - + "ponse\"2\202\323\344\223\002,\"\'/v1/{topic=projects/*/top" - + "ics/*}:publish:\001*\022o\n\010GetTopic\022!.google.p" - + "ubsub.v1.GetTopicRequest\032\027.google.pubsub" - + ".v1.Topic\"\'\202\323\344\223\002!\022\037/v1/{topic=projects/*" - + "/topics/*}\022\200\001\n\nListTopics\022#.google.pubsu" - + "b.v1.ListTopicsRequest\032$.google.pubsub.v" - + "1.ListTopicsResponse\"\'\202\323\344\223\002!\022\037/v1/{proje" - + "ct=projects/*}/topics\022\262\001\n\026ListTopicSubsc" - + "riptions\022/.google.pubsub.v1.ListTopicSub" - + "scriptionsRequest\0320.google.pubsub.v1.Lis" - + "tTopicSubscriptionsResponse\"5\202\323\344\223\002/\022-/v1" - + "/{topic=projects/*/topics/*}/subscriptio" - + "ns\022\242\001\n\022ListTopicSnapshots\022+.google.pubsu" - + "b.v1.ListTopicSnapshotsRequest\032,.google." - + "pubsub.v1.ListTopicSnapshotsResponse\"1\202\323" - + "\344\223\002+\022)/v1/{topic=projects/*/topics/*}/sn" - + "apshots\022t\n\013DeleteTopic\022$.google.pubsub.v" - + "1.DeleteTopicRequest\032\026.google.protobuf.E" - + "mpty\"\'\202\323\344\223\002!*\037/v1/{topic=projects/*/topi" - + "cs/*}2\371\021\n\nSubscriber\022\206\001\n\022CreateSubscript" - + "ion\022\036.google.pubsub.v1.Subscription\032\036.go" - + "ogle.pubsub.v1.Subscription\"0\202\323\344\223\002*\032%/v1" - + "/{name=projects/*/subscriptions/*}:\001*\022\222\001" - + "\n\017GetSubscription\022(.google.pubsub.v1.Get" - + "SubscriptionRequest\032\036.google.pubsub.v1.S" - + "ubscription\"5\202\323\344\223\002/\022-/v1/{subscription=p" - + "rojects/*/subscriptions/*}\022\240\001\n\022UpdateSub" - + "scription\022+.google.pubsub.v1.UpdateSubsc" - + "riptionRequest\032\036.google.pubsub.v1.Subscr" - + "iption\"=\202\323\344\223\002722/v1/{subscription.name=p" - + "rojects/*/subscriptions/*}:\001*\022\234\001\n\021ListSu" - + "bscriptions\022*.google.pubsub.v1.ListSubsc" - + "riptionsRequest\032+.google.pubsub.v1.ListS" - + "ubscriptionsResponse\".\202\323\344\223\002(\022&/v1/{proje" - + "ct=projects/*}/subscriptions\022\220\001\n\022DeleteS" - + "ubscription\022+.google.pubsub.v1.DeleteSub" - + "scriptionRequest\032\026.google.protobuf.Empty" - + "\"5\202\323\344\223\002/*-/v1/{subscription=projects/*/s" - + "ubscriptions/*}\022\243\001\n\021ModifyAckDeadline\022*." - + "google.pubsub.v1.ModifyAckDeadlineReques" - + "t\032\026.google.protobuf.Empty\"J\202\323\344\223\002D\"?/v1/{" - + "subscription=projects/*/subscriptions/*}" - + ":modifyAckDeadline:\001*\022\221\001\n\013Acknowledge\022$." - + "google.pubsub.v1.AcknowledgeRequest\032\026.go" - + "ogle.protobuf.Empty\"D\202\323\344\223\002>\"9/v1/{subscr" - + "iption=projects/*/subscriptions/*}:ackno" - + "wledge:\001*\022\204\001\n\004Pull\022\035.google.pubsub.v1.Pu" - + "llRequest\032\036.google.pubsub.v1.PullRespons" - + "e\"=\202\323\344\223\0027\"2/v1/{subscription=projects/*/" - + "subscriptions/*}:pull:\001*\022f\n\rStreamingPul" - + "l\022&.google.pubsub.v1.StreamingPullReques" - + "t\032\'.google.pubsub.v1.StreamingPullRespon" - + "se\"\000(\0010\001\022\240\001\n\020ModifyPushConfig\022).google.p" - + "ubsub.v1.ModifyPushConfigRequest\032\026.googl" - + "e.protobuf.Empty\"I\202\323\344\223\002C\">/v1/{subscript" - + "ion=projects/*/subscriptions/*}:modifyPu" - + "shConfig:\001*\022~\n\013GetSnapshot\022$.google.pubs" - + "ub.v1.GetSnapshotRequest\032\032.google.pubsub" - + ".v1.Snapshot\"-\202\323\344\223\002\'\022%/v1/{snapshot=proj" - + "ects/*/snapshots/*}\022\214\001\n\rListSnapshots\022&." - + "google.pubsub.v1.ListSnapshotsRequest\032\'." - + "google.pubsub.v1.ListSnapshotsResponse\"*" - + "\202\323\344\223\002$\022\"/v1/{project=projects/*}/snapsho" - + "ts\022\203\001\n\016CreateSnapshot\022\'.google.pubsub.v1" - + ".CreateSnapshotRequest\032\032.google.pubsub.v" - + "1.Snapshot\",\202\323\344\223\002&\032!/v1/{name=projects/*" - + "/snapshots/*}:\001*\022\214\001\n\016UpdateSnapshot\022\'.go" - + "ogle.pubsub.v1.UpdateSnapshotRequest\032\032.g" - + "oogle.pubsub.v1.Snapshot\"5\202\323\344\223\002/2*/v1/{s" - + "napshot.name=projects/*/snapshots/*}:\001*\022" - + "\200\001\n\016DeleteSnapshot\022\'.google.pubsub.v1.De" - + "leteSnapshotRequest\032\026.google.protobuf.Em" - + "pty\"-\202\323\344\223\002\'*%/v1/{snapshot=projects/*/sn" - + "apshots/*}\022\204\001\n\004Seek\022\035.google.pubsub.v1.S" - + "eekRequest\032\036.google.pubsub.v1.SeekRespon" - + "se\"=\202\323\344\223\0027\"2/v1/{subscription=projects/*" - + "/subscriptions/*}:seek:\001*B\256\001\n\024com.google" - + ".pubsub.v1B\013PubsubProtoP\001Z6google.golang" - + ".org/genproto/googleapis/pubsub/v1;pubsu" - + "b\370\001\001\252\002\026Google.Cloud.PubSub.V1\312\002\026Google\\C" - + "loud\\PubSub\\V1\352\002\031Google::Cloud::PubSub::" - + "V1b\006proto3" + + "e\030\004 \001(\0132\032.google.protobuf.Timestamp\022\024\n\014o" + + "rdering_key\030\005 \001(\t\0321\n\017AttributesEntry\022\013\n\003" + + "key\030\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\" \n\017GetTopic" + + "Request\022\r\n\005topic\030\001 \001(\t\"m\n\022UpdateTopicReq" + + "uest\022&\n\005topic\030\001 \001(\0132\027.google.pubsub.v1.T" + + "opic\022/\n\013update_mask\030\002 \001(\0132\032.google.proto" + + "buf.FieldMask\"R\n\016PublishRequest\022\r\n\005topic" + + "\030\001 \001(\t\0221\n\010messages\030\002 \003(\0132\037.google.pubsub" + + ".v1.PubsubMessage\"&\n\017PublishResponse\022\023\n\013" + + "message_ids\030\001 \003(\t\"K\n\021ListTopicsRequest\022\017" + + "\n\007project\030\001 \001(\t\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npa" + + "ge_token\030\003 \001(\t\"V\n\022ListTopicsResponse\022\'\n\006" + + "topics\030\001 \003(\0132\027.google.pubsub.v1.Topic\022\027\n" + + "\017next_page_token\030\002 \001(\t\"U\n\035ListTopicSubsc" + + "riptionsRequest\022\r\n\005topic\030\001 \001(\t\022\021\n\tpage_s" + + "ize\030\002 \001(\005\022\022\n\npage_token\030\003 \001(\t\"P\n\036ListTop" + + "icSubscriptionsResponse\022\025\n\rsubscriptions" + + "\030\001 \003(\t\022\027\n\017next_page_token\030\002 \001(\t\"Q\n\031ListT" + + "opicSnapshotsRequest\022\r\n\005topic\030\001 \001(\t\022\021\n\tp" + + "age_size\030\002 \001(\005\022\022\n\npage_token\030\003 \001(\t\"H\n\032Li" + + "stTopicSnapshotsResponse\022\021\n\tsnapshots\030\001 " + + "\003(\t\022\027\n\017next_page_token\030\002 \001(\t\"#\n\022DeleteTo" + + "picRequest\022\r\n\005topic\030\001 \001(\t\"\245\003\n\014Subscripti" + + "on\022\014\n\004name\030\001 \001(\t\022\r\n\005topic\030\002 \001(\t\0221\n\013push_" + + "config\030\004 \001(\0132\034.google.pubsub.v1.PushConf" + + "ig\022\034\n\024ack_deadline_seconds\030\005 \001(\005\022\035\n\025reta" + + "in_acked_messages\030\007 \001(\010\022=\n\032message_reten" + + "tion_duration\030\010 \001(\0132\031.google.protobuf.Du" + + "ration\022:\n\006labels\030\t \003(\0132*.google.pubsub.v" + + "1.Subscription.LabelsEntry\022\037\n\027enable_mes" + + "sage_ordering\030\n \001(\010\022=\n\021expiration_policy" + + "\030\013 \001(\0132\".google.pubsub.v1.ExpirationPoli" + + "cy\032-\n\013LabelsEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030" + + "\002 \001(\t:\0028\001\":\n\020ExpirationPolicy\022&\n\003ttl\030\001 \001" + + "(\0132\031.google.protobuf.Duration\"\230\001\n\nPushCo" + + "nfig\022\025\n\rpush_endpoint\030\001 \001(\t\022@\n\nattribute" + + "s\030\002 \003(\0132,.google.pubsub.v1.PushConfig.At" + + "tributesEntry\0321\n\017AttributesEntry\022\013\n\003key\030" + + "\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"S\n\017ReceivedMess" + + "age\022\016\n\006ack_id\030\001 \001(\t\0220\n\007message\030\002 \001(\0132\037.g" + + "oogle.pubsub.v1.PubsubMessage\".\n\026GetSubs" + + "criptionRequest\022\024\n\014subscription\030\001 \001(\t\"\202\001" + + "\n\031UpdateSubscriptionRequest\0224\n\014subscript" + + "ion\030\001 \001(\0132\036.google.pubsub.v1.Subscriptio" + + "n\022/\n\013update_mask\030\002 \001(\0132\032.google.protobuf" + + ".FieldMask\"R\n\030ListSubscriptionsRequest\022\017" + + "\n\007project\030\001 \001(\t\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npa" + + "ge_token\030\003 \001(\t\"k\n\031ListSubscriptionsRespo" + + "nse\0225\n\rsubscriptions\030\001 \003(\0132\036.google.pubs" + + "ub.v1.Subscription\022\027\n\017next_page_token\030\002 " + + "\001(\t\"1\n\031DeleteSubscriptionRequest\022\024\n\014subs" + + "cription\030\001 \001(\t\"b\n\027ModifyPushConfigReques" + + "t\022\024\n\014subscription\030\001 \001(\t\0221\n\013push_config\030\002" + + " \001(\0132\034.google.pubsub.v1.PushConfig\"U\n\013Pu" + + "llRequest\022\024\n\014subscription\030\001 \001(\t\022\032\n\022retur" + + "n_immediately\030\002 \001(\010\022\024\n\014max_messages\030\003 \001(" + + "\005\"L\n\014PullResponse\022<\n\021received_messages\030\001" + + " \003(\0132!.google.pubsub.v1.ReceivedMessage\"" + + "_\n\030ModifyAckDeadlineRequest\022\024\n\014subscript" + + "ion\030\001 \001(\t\022\017\n\007ack_ids\030\004 \003(\t\022\034\n\024ack_deadli" + + "ne_seconds\030\003 \001(\005\";\n\022AcknowledgeRequest\022\024" + + "\n\014subscription\030\001 \001(\t\022\017\n\007ack_ids\030\002 \003(\t\"\244\001" + + "\n\024StreamingPullRequest\022\024\n\014subscription\030\001" + + " \001(\t\022\017\n\007ack_ids\030\002 \003(\t\022\037\n\027modify_deadline" + + "_seconds\030\003 \003(\005\022\037\n\027modify_deadline_ack_id" + + "s\030\004 \003(\t\022#\n\033stream_ack_deadline_seconds\030\005" + + " \001(\005\"U\n\025StreamingPullResponse\022<\n\021receive" + + "d_messages\030\001 \003(\0132!.google.pubsub.v1.Rece" + + "ivedMessage\"\257\001\n\025CreateSnapshotRequest\022\014\n" + + "\004name\030\001 \001(\t\022\024\n\014subscription\030\002 \001(\t\022C\n\006lab" + + "els\030\003 \003(\01323.google.pubsub.v1.CreateSnaps" + + "hotRequest.LabelsEntry\032-\n\013LabelsEntry\022\013\n" + + "\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"v\n\025UpdateS" + + "napshotRequest\022,\n\010snapshot\030\001 \001(\0132\032.googl" + + "e.pubsub.v1.Snapshot\022/\n\013update_mask\030\002 \001(" + + "\0132\032.google.protobuf.FieldMask\"\277\001\n\010Snapsh" + + "ot\022\014\n\004name\030\001 \001(\t\022\r\n\005topic\030\002 \001(\t\022/\n\013expir" + + "e_time\030\003 \001(\0132\032.google.protobuf.Timestamp" + + "\0226\n\006labels\030\004 \003(\0132&.google.pubsub.v1.Snap" + + "shot.LabelsEntry\032-\n\013LabelsEntry\022\013\n\003key\030\001" + + " \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"&\n\022GetSnapshotRe" + + "quest\022\020\n\010snapshot\030\001 \001(\t\"N\n\024ListSnapshots" + + "Request\022\017\n\007project\030\001 \001(\t\022\021\n\tpage_size\030\002 " + + "\001(\005\022\022\n\npage_token\030\003 \001(\t\"_\n\025ListSnapshots" + + "Response\022-\n\tsnapshots\030\001 \003(\0132\032.google.pub" + + "sub.v1.Snapshot\022\027\n\017next_page_token\030\002 \001(\t" + + "\")\n\025DeleteSnapshotRequest\022\020\n\010snapshot\030\001 " + + "\001(\t\"m\n\013SeekRequest\022\024\n\014subscription\030\001 \001(\t" + + "\022*\n\004time\030\002 \001(\0132\032.google.protobuf.Timesta" + + "mpH\000\022\022\n\010snapshot\030\003 \001(\tH\000B\010\n\006target\"\016\n\014Se" + + "ekResponse2\277\010\n\tPublisher\022j\n\013CreateTopic\022" + + "\027.google.pubsub.v1.Topic\032\027.google.pubsub" + + ".v1.Topic\")\202\323\344\223\002#\032\036/v1/{name=projects/*/" + + "topics/*}:\001*\022}\n\013UpdateTopic\022$.google.pub" + + "sub.v1.UpdateTopicRequest\032\027.google.pubsu" + + "b.v1.Topic\"/\202\323\344\223\002)2$/v1/{topic.name=proj" + + "ects/*/topics/*}:\001*\022\202\001\n\007Publish\022 .google" + + ".pubsub.v1.PublishRequest\032!.google.pubsu" + + "b.v1.PublishResponse\"2\202\323\344\223\002,\"\'/v1/{topic" + + "=projects/*/topics/*}:publish:\001*\022o\n\010GetT" + + "opic\022!.google.pubsub.v1.GetTopicRequest\032" + + "\027.google.pubsub.v1.Topic\"\'\202\323\344\223\002!\022\037/v1/{t" + + "opic=projects/*/topics/*}\022\200\001\n\nListTopics" + + "\022#.google.pubsub.v1.ListTopicsRequest\032$." + + "google.pubsub.v1.ListTopicsResponse\"\'\202\323\344" + + "\223\002!\022\037/v1/{project=projects/*}/topics\022\262\001\n" + + "\026ListTopicSubscriptions\022/.google.pubsub." + + "v1.ListTopicSubscriptionsRequest\0320.googl" + + "e.pubsub.v1.ListTopicSubscriptionsRespon" + + "se\"5\202\323\344\223\002/\022-/v1/{topic=projects/*/topics" + + "/*}/subscriptions\022\242\001\n\022ListTopicSnapshots" + + "\022+.google.pubsub.v1.ListTopicSnapshotsRe" + + "quest\032,.google.pubsub.v1.ListTopicSnapsh" + + "otsResponse\"1\202\323\344\223\002+\022)/v1/{topic=projects" + + "/*/topics/*}/snapshots\022t\n\013DeleteTopic\022$." + + "google.pubsub.v1.DeleteTopicRequest\032\026.go" + + "ogle.protobuf.Empty\"\'\202\323\344\223\002!*\037/v1/{topic=" + + "projects/*/topics/*}2\371\021\n\nSubscriber\022\206\001\n\022" + + "CreateSubscription\022\036.google.pubsub.v1.Su" + + "bscription\032\036.google.pubsub.v1.Subscripti" + + "on\"0\202\323\344\223\002*\032%/v1/{name=projects/*/subscri" + + "ptions/*}:\001*\022\222\001\n\017GetSubscription\022(.googl" + + "e.pubsub.v1.GetSubscriptionRequest\032\036.goo" + + "gle.pubsub.v1.Subscription\"5\202\323\344\223\002/\022-/v1/" + + "{subscription=projects/*/subscriptions/*" + + "}\022\240\001\n\022UpdateSubscription\022+.google.pubsub" + + ".v1.UpdateSubscriptionRequest\032\036.google.p" + + "ubsub.v1.Subscription\"=\202\323\344\223\002722/v1/{subs" + + "cription.name=projects/*/subscriptions/*" + + "}:\001*\022\234\001\n\021ListSubscriptions\022*.google.pubs" + + "ub.v1.ListSubscriptionsRequest\032+.google." + + "pubsub.v1.ListSubscriptionsResponse\".\202\323\344" + + "\223\002(\022&/v1/{project=projects/*}/subscripti" + + "ons\022\220\001\n\022DeleteSubscription\022+.google.pubs" + + "ub.v1.DeleteSubscriptionRequest\032\026.google" + + ".protobuf.Empty\"5\202\323\344\223\002/*-/v1/{subscripti" + + "on=projects/*/subscriptions/*}\022\243\001\n\021Modif" + + "yAckDeadline\022*.google.pubsub.v1.ModifyAc" + + "kDeadlineRequest\032\026.google.protobuf.Empty" + + "\"J\202\323\344\223\002D\"?/v1/{subscription=projects/*/s" + + "ubscriptions/*}:modifyAckDeadline:\001*\022\221\001\n" + + "\013Acknowledge\022$.google.pubsub.v1.Acknowle" + + "dgeRequest\032\026.google.protobuf.Empty\"D\202\323\344\223" + + "\002>\"9/v1/{subscription=projects/*/subscri" + + "ptions/*}:acknowledge:\001*\022\204\001\n\004Pull\022\035.goog" + + "le.pubsub.v1.PullRequest\032\036.google.pubsub" + + ".v1.PullResponse\"=\202\323\344\223\0027\"2/v1/{subscript" + + "ion=projects/*/subscriptions/*}:pull:\001*\022" + + "f\n\rStreamingPull\022&.google.pubsub.v1.Stre" + + "amingPullRequest\032\'.google.pubsub.v1.Stre" + + "amingPullResponse\"\000(\0010\001\022\240\001\n\020ModifyPushCo" + + "nfig\022).google.pubsub.v1.ModifyPushConfig" + + "Request\032\026.google.protobuf.Empty\"I\202\323\344\223\002C\"" + + ">/v1/{subscription=projects/*/subscripti" + + "ons/*}:modifyPushConfig:\001*\022~\n\013GetSnapsho" + + "t\022$.google.pubsub.v1.GetSnapshotRequest\032" + + "\032.google.pubsub.v1.Snapshot\"-\202\323\344\223\002\'\022%/v1" + + "/{snapshot=projects/*/snapshots/*}\022\214\001\n\rL" + + "istSnapshots\022&.google.pubsub.v1.ListSnap" + + "shotsRequest\032\'.google.pubsub.v1.ListSnap" + + "shotsResponse\"*\202\323\344\223\002$\022\"/v1/{project=proj" + + "ects/*}/snapshots\022\203\001\n\016CreateSnapshot\022\'.g" + + "oogle.pubsub.v1.CreateSnapshotRequest\032\032." + + "google.pubsub.v1.Snapshot\",\202\323\344\223\002&\032!/v1/{" + + "name=projects/*/snapshots/*}:\001*\022\214\001\n\016Upda" + + "teSnapshot\022\'.google.pubsub.v1.UpdateSnap" + + "shotRequest\032\032.google.pubsub.v1.Snapshot\"" + + "5\202\323\344\223\002/2*/v1/{snapshot.name=projects/*/s" + + "napshots/*}:\001*\022\200\001\n\016DeleteSnapshot\022\'.goog" + + "le.pubsub.v1.DeleteSnapshotRequest\032\026.goo" + + "gle.protobuf.Empty\"-\202\323\344\223\002\'*%/v1/{snapsho" + + "t=projects/*/snapshots/*}\022\204\001\n\004Seek\022\035.goo" + + "gle.pubsub.v1.SeekRequest\032\036.google.pubsu" + + "b.v1.SeekResponse\"=\202\323\344\223\0027\"2/v1/{subscrip" + + "tion=projects/*/subscriptions/*}:seek:\001*" + + "B\256\001\n\024com.google.pubsub.v1B\013PubsubProtoP\001" + + "Z6google.golang.org/genproto/googleapis/" + + "pubsub/v1;pubsub\370\001\001\252\002\026Google.Cloud.PubSu" + + "b.V1\312\002\026Google\\Cloud\\PubSub\\V1\352\002\031Google::" + + "Cloud::PubSub::V1b\006proto3" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -443,7 +444,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_google_pubsub_v1_PubsubMessage_descriptor, new java.lang.String[] { - "Data", "Attributes", "MessageId", "PublishTime", + "Data", "Attributes", "MessageId", "PublishTime", "OrderingKey", }); internal_static_google_pubsub_v1_PubsubMessage_AttributesEntry_descriptor = internal_static_google_pubsub_v1_PubsubMessage_descriptor.getNestedTypes().get(0); @@ -554,6 +555,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( "RetainAckedMessages", "MessageRetentionDuration", "Labels", + "EnableMessageOrdering", "ExpirationPolicy", }); internal_static_google_pubsub_v1_Subscription_LabelsEntry_descriptor = diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java index efb2168d1aa7..9b73353c4e84 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java @@ -27,6 +27,7 @@ private Subscription() { topic_ = ""; ackDeadlineSeconds_ = 0; retainAckedMessages_ = false; + enableMessageOrdering_ = false; } @java.lang.Override @@ -120,6 +121,11 @@ private Subscription( labels_.getMutableMap().put(labels__.getKey(), labels__.getValue()); break; } + case 80: + { + enableMessageOrdering_ = input.readBool(); + break; + } case 90: { com.google.pubsub.v1.ExpirationPolicy.Builder subBuilder = null; @@ -546,6 +552,26 @@ public java.lang.String getLabelsOrThrow(java.lang.String key) { return map.get(key); } + public static final int ENABLE_MESSAGE_ORDERING_FIELD_NUMBER = 10; + private boolean enableMessageOrdering_; + /** + * + * + *
+   * WARNING: `enable_message_ordering` is an experimental field not yet
+   * supported by the service.
+   * If true, messages published with the same `ordering_key` in `PubsubMessage`
+   * will be delivered to the subscribers in the order in which they
+   * are received by the Pub/Sub system. Otherwise, they may be delivered in
+   * any order.
+   * 
+ * + * bool enable_message_ordering = 10; + */ + public boolean getEnableMessageOrdering() { + return enableMessageOrdering_; + } + public static final int EXPIRATION_POLICY_FIELD_NUMBER = 11; private com.google.pubsub.v1.ExpirationPolicy expirationPolicy_; /** @@ -645,6 +671,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io } com.google.protobuf.GeneratedMessageV3.serializeStringMapTo( output, internalGetLabels(), LabelsDefaultEntryHolder.defaultEntry, 9); + if (enableMessageOrdering_ != false) { + output.writeBool(10, enableMessageOrdering_); + } if (expirationPolicy_ != null) { output.writeMessage(11, getExpirationPolicy()); } @@ -687,6 +716,9 @@ public int getSerializedSize() { .build(); size += com.google.protobuf.CodedOutputStream.computeMessageSize(9, labels__); } + if (enableMessageOrdering_ != false) { + size += com.google.protobuf.CodedOutputStream.computeBoolSize(10, enableMessageOrdering_); + } if (expirationPolicy_ != null) { size += com.google.protobuf.CodedOutputStream.computeMessageSize(11, getExpirationPolicy()); } @@ -719,6 +751,7 @@ public boolean equals(final java.lang.Object obj) { result = result && getMessageRetentionDuration().equals(other.getMessageRetentionDuration()); } result = result && internalGetLabels().equals(other.internalGetLabels()); + result = result && (getEnableMessageOrdering() == other.getEnableMessageOrdering()); result = result && (hasExpirationPolicy() == other.hasExpirationPolicy()); if (hasExpirationPolicy()) { result = result && getExpirationPolicy().equals(other.getExpirationPolicy()); @@ -754,6 +787,8 @@ public int hashCode() { hash = (37 * hash) + LABELS_FIELD_NUMBER; hash = (53 * hash) + internalGetLabels().hashCode(); } + hash = (37 * hash) + ENABLE_MESSAGE_ORDERING_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashBoolean(getEnableMessageOrdering()); if (hasExpirationPolicy()) { hash = (37 * hash) + EXPIRATION_POLICY_FIELD_NUMBER; hash = (53 * hash) + getExpirationPolicy().hashCode(); @@ -943,6 +978,8 @@ public Builder clear() { messageRetentionDurationBuilder_ = null; } internalGetMutableLabels().clear(); + enableMessageOrdering_ = false; + if (expirationPolicyBuilder_ == null) { expirationPolicy_ = null; } else { @@ -993,6 +1030,7 @@ public com.google.pubsub.v1.Subscription buildPartial() { } result.labels_ = internalGetLabels(); result.labels_.makeImmutable(); + result.enableMessageOrdering_ = enableMessageOrdering_; if (expirationPolicyBuilder_ == null) { result.expirationPolicy_ = expirationPolicy_; } else { @@ -1069,6 +1107,9 @@ public Builder mergeFrom(com.google.pubsub.v1.Subscription other) { mergeMessageRetentionDuration(other.getMessageRetentionDuration()); } internalGetMutableLabels().mergeFrom(other.internalGetLabels()); + if (other.getEnableMessageOrdering() != false) { + setEnableMessageOrdering(other.getEnableMessageOrdering()); + } if (other.hasExpirationPolicy()) { mergeExpirationPolicy(other.getExpirationPolicy()); } @@ -2106,6 +2147,65 @@ public Builder putAllLabels(java.util.Map va return this; } + private boolean enableMessageOrdering_; + /** + * + * + *
+     * WARNING: `enable_message_ordering` is an experimental field not yet
+     * supported by the service.
+     * If true, messages published with the same `ordering_key` in `PubsubMessage`
+     * will be delivered to the subscribers in the order in which they
+     * are received by the Pub/Sub system. Otherwise, they may be delivered in
+     * any order.
+     * 
+ * + * bool enable_message_ordering = 10; + */ + public boolean getEnableMessageOrdering() { + return enableMessageOrdering_; + } + /** + * + * + *
+     * WARNING: `enable_message_ordering` is an experimental field not yet
+     * supported by the service.
+     * If true, messages published with the same `ordering_key` in `PubsubMessage`
+     * will be delivered to the subscribers in the order in which they
+     * are received by the Pub/Sub system. Otherwise, they may be delivered in
+     * any order.
+     * 
+ * + * bool enable_message_ordering = 10; + */ + public Builder setEnableMessageOrdering(boolean value) { + + enableMessageOrdering_ = value; + onChanged(); + return this; + } + /** + * + * + *
+     * WARNING: `enable_message_ordering` is an experimental field not yet
+     * supported by the service.
+     * If true, messages published with the same `ordering_key` in `PubsubMessage`
+     * will be delivered to the subscribers in the order in which they
+     * are received by the Pub/Sub system. Otherwise, they may be delivered in
+     * any order.
+     * 
+ * + * bool enable_message_ordering = 10; + */ + public Builder clearEnableMessageOrdering() { + + enableMessageOrdering_ = false; + onChanged(); + return this; + } + private com.google.pubsub.v1.ExpirationPolicy expirationPolicy_ = null; private com.google.protobuf.SingleFieldBuilderV3< com.google.pubsub.v1.ExpirationPolicy, diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java index dc06e7fb432b..8a97ea0bd8a7 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java @@ -264,6 +264,22 @@ public interface SubscriptionOrBuilder */ java.lang.String getLabelsOrThrow(java.lang.String key); + /** + * + * + *
+   * WARNING: `enable_message_ordering` is an experimental field not yet
+   * supported by the service.
+   * If true, messages published with the same `ordering_key` in `PubsubMessage`
+   * will be delivered to the subscribers in the order in which they
+   * are received by the Pub/Sub system. Otherwise, they may be delivered in
+   * any order.
+   * 
+ * + * bool enable_message_ordering = 10; + */ + boolean getEnableMessageOrdering(); + /** * * diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto index 02d0bf34b3ee..1dc8cd301508 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto @@ -399,6 +399,15 @@ message PubsubMessage { // it receives the `Publish` call. It must not be populated by the // publisher in a `Publish` call. google.protobuf.Timestamp publish_time = 4; + + // WARNING: `ordering_key` is an experimental field not yet + // supported by the service. + // + // Identifies related messages for which publish order should be respected. + // If a `Subscription` has `enable_message_ordering` set to `true`, messages + // published with the same `ordering_key` value will be delivered to + // subscribers in the order in which they are received by the Pub/Sub system. + string ordering_key = 5; } // Request for the GetTopic method. @@ -599,6 +608,15 @@ message Subscription { // managing labels. map labels = 9; + // WARNING: `enable_message_ordering` is an experimental field not yet + // supported by the service. + // + // If true, messages published with the same `ordering_key` in `PubsubMessage` + // will be delivered to the subscribers in the order in which they + // are received by the Pub/Sub system. Otherwise, they may be delivered in + // any order. + bool enable_message_ordering = 10; + // A policy that specifies the conditions for this subscription's expiration. // A subscription is considered active as long as any connected subscriber is // successfully consuming messages from the subscription or is issuing diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 557a483073de..3673efd6a593 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -45,9 +45,14 @@ import com.google.pubsub.v1.TopicNames; import java.io.IOException; import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -81,16 +86,16 @@ public class Publisher { private final String topicName; private final BatchingSettings batchingSettings; + private final boolean enableMessageOrdering; private final Lock messagesBatchLock; - private List messagesBatch; - private int batchedBytes; - + private final Map messagesBatches; private final AtomicBoolean activeAlarm; private final PublisherStub publisherStub; private final ScheduledExecutorService executor; + private final SequentialExecutorService sequentialExecutor; private final AtomicBoolean shutdown; private final List closeables; private final MessageWaiter messagesWaiter; @@ -110,11 +115,13 @@ private Publisher(Builder builder) throws IOException { topicName = builder.topicName; this.batchingSettings = builder.batchingSettings; + this.enableMessageOrdering = builder.enableMessageOrdering; - messagesBatch = new LinkedList<>(); + messagesBatches = new HashMap<>(); messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); + sequentialExecutor = new SequentialExecutorService<>(executor); if (builder.executorProvider.shouldAutoClose()) { closeables = Collections.singletonList(new ExecutorAsBackgroundResource(executor)); @@ -124,9 +131,31 @@ private Publisher(Builder builder) throws IOException { // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry. // We post-process this here to keep backward-compatibility. - RetrySettings retrySettings = builder.retrySettings; - if (retrySettings.getMaxAttempts() == 0) { - retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build(); + // Also, if "message ordering" is enabled, the publisher should retry sending the failed + // message infinitely rather than sending the next one. + RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder(); + if (retrySettingsBuilder.getMaxAttempts() == 0) { + retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE); + } + if (enableMessageOrdering) { + retrySettingsBuilder + .setMaxAttempts(Integer.MAX_VALUE) + .setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE)); + } + + Set retryCodes; + if (enableMessageOrdering) { + retryCodes = EnumSet.allOf(StatusCode.Code.class); + } else { + retryCodes = + EnumSet.of( + StatusCode.Code.ABORTED, + StatusCode.Code.CANCELLED, + StatusCode.Code.DEADLINE_EXCEEDED, + StatusCode.Code.INTERNAL, + StatusCode.Code.RESOURCE_EXHAUSTED, + StatusCode.Code.UNKNOWN, + StatusCode.Code.UNAVAILABLE); } PublisherStubSettings.Builder stubSettings = @@ -136,15 +165,8 @@ private Publisher(Builder builder) throws IOException { .setTransportChannelProvider(builder.channelProvider); stubSettings .publishSettings() - .setRetryableCodes( - StatusCode.Code.ABORTED, - StatusCode.Code.CANCELLED, - StatusCode.Code.DEADLINE_EXCEEDED, - StatusCode.Code.INTERNAL, - StatusCode.Code.RESOURCE_EXHAUSTED, - StatusCode.Code.UNKNOWN, - StatusCode.Code.UNAVAILABLE) - .setRetrySettings(retrySettings) + .setRetryableCodes(retryCodes) + .setRetrySettings(retrySettingsBuilder.build()) .setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()); this.publisherStub = GrpcPublisherStub.create(stubSettings.build()); @@ -192,6 +214,12 @@ public ApiFuture publish(PubsubMessage message) { throw new IllegalStateException("Cannot publish on a shut-down publisher."); } + final String orderingKey = message.getOrderingKey(); + if (orderingKey != null && !orderingKey.isEmpty() && !enableMessageOrdering) { + throw new IllegalStateException( + "Cannot publish a message with an ordering key when message ordeirng is not enabled."); + } + final int messageSize = message.getSerializedSize(); OutstandingBatch batchToSend = null; SettableApiFuture publishResult = SettableApiFuture.create(); @@ -199,35 +227,38 @@ public ApiFuture publish(PubsubMessage message) { messagesBatchLock.lock(); try { // Check if the next message makes the current batch exceed the max batch byte size. - if (!messagesBatch.isEmpty() + MessagesBatch batch = messagesBatches.get(orderingKey); + if (batch == null) { + batch = new MessagesBatch(orderingKey); + messagesBatches.put(orderingKey, batch); + } + if (!batch.isEmpty() && hasBatchingBytes() - && batchedBytes + messageSize >= getMaxBatchBytes()) { - batchToSend = new OutstandingBatch(messagesBatch, batchedBytes); - messagesBatch = new LinkedList<>(); - batchedBytes = 0; + && batch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) { + batchToSend = batch.popOutstandingBatch(); } // Border case if the message to send is greater or equals to the max batch size then can't // be included in the current batch and instead sent immediately. if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) { - batchedBytes += messageSize; - messagesBatch.add(outstandingPublish); - + batch.addMessage(outstandingPublish, messageSize); // If after adding the message we have reached the batch max messages then we have a batch // to send. - if (messagesBatch.size() == getBatchingSettings().getElementCountThreshold()) { - batchToSend = new OutstandingBatch(messagesBatch, batchedBytes); - messagesBatch = new LinkedList<>(); - batchedBytes = 0; + if (batch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) { + batchToSend = batch.popOutstandingBatch(); } } + // Setup the next duration based delivery alarm if there are messages batched. - if (!messagesBatch.isEmpty()) { + if (!batch.isEmpty()) { setupDurationBasedPublishAlarm(); - } else if (currentAlarmFuture != null) { - logger.log(Level.FINER, "Cancelling alarm, no more messages"); - if (activeAlarm.getAndSet(false)) { - currentAlarmFuture.cancel(false); + } else { + messagesBatches.remove(orderingKey); + if (currentAlarmFuture != null) { + logger.log(Level.FINER, "Cancelling alarm, no more messages"); + if (activeAlarm.getAndSet(false)) { + currentAlarmFuture.cancel(false); + } } } } finally { @@ -238,14 +269,8 @@ && hasBatchingBytes() if (batchToSend != null) { logger.log(Level.FINER, "Scheduling a batch for immediate sending."); - final OutstandingBatch finalBatchToSend = batchToSend; - executor.execute( - new Runnable() { - @Override - public void run() { - publishOutstandingBatch(finalBatchToSend); - } - }); + publishOutstandingBatch(batchToSend); + publishAllOutstanding(); } // If the message is over the size limit, it was not added to the pending messages and it will @@ -253,14 +278,9 @@ public void run() { if (hasBatchingBytes() && messageSize >= getMaxBatchBytes()) { logger.log( Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send."); - executor.execute( - new Runnable() { - @Override - public void run() { - publishOutstandingBatch( - new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize)); - } - }); + publishOutstandingBatch( + new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize, orderingKey)); + publishAllOutstanding(); } return publishResult; @@ -292,29 +312,33 @@ public void run() { */ public void publishAllOutstanding() { messagesBatchLock.lock(); - OutstandingBatch batchToSend; try { - if (messagesBatch.isEmpty()) { - return; + for (MessagesBatch batch : messagesBatches.values()) { + if (!batch.isEmpty()) { + // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If + // it's released, the order of publishing cannot be guaranteed if `publish()` is called + // while this function is running. This locking mechanism needs to be improved if it + // causes any performance degradation. + publishOutstandingBatch(batch.popOutstandingBatch()); + } } - batchToSend = new OutstandingBatch(messagesBatch, batchedBytes); - messagesBatch = new LinkedList<>(); - batchedBytes = 0; + messagesBatches.clear(); } finally { messagesBatchLock.unlock(); } - publishOutstandingBatch(batchToSend); } - private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { + private ApiFuture publishCall(final OutstandingBatch outstandingBatch) { PublishRequest.Builder publishRequest = PublishRequest.newBuilder(); publishRequest.setTopic(topicName); for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) { publishRequest.addMessages(outstandingPublish.message); } + return publisherStub.publishCallable().futureCall(publishRequest.build()); + } - ApiFutures.addCallback( - publisherStub.publishCallable().futureCall(publishRequest.build()), + private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { + final ApiFutureCallback futureCallback = new ApiFutureCallback() { @Override public void onSuccess(PublishResponse result) { @@ -353,7 +377,28 @@ public void onFailure(Throwable t) { messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); } } - }); + }; + + if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) { + // If ordering key is empty, publish the batch using the normal executor. + Runnable task = + new Runnable() { + public void run() { + ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback); + } + }; + executor.execute(task); + } else { + // If ordering key is specified, publish the batch using the sequential executor. + Callable func = + new Callable() { + public ApiFuture call() { + return publishCall(outstandingBatch); + } + }; + ApiFutures.addCallback( + sequentialExecutor.submit(outstandingBatch.orderingKey, func), futureCallback); + } } private static final class OutstandingBatch { @@ -361,12 +406,15 @@ private static final class OutstandingBatch { final long creationTime; int attempt; int batchSizeBytes; + final String orderingKey; - OutstandingBatch(List outstandingPublishes, int batchSizeBytes) { + OutstandingBatch( + List outstandingPublishes, int batchSizeBytes, String orderingKey) { this.outstandingPublishes = outstandingPublishes; attempt = 1; creationTime = System.currentTimeMillis(); this.batchSizeBytes = batchSizeBytes; + this.orderingKey = orderingKey; } public int getAttempt() { @@ -504,7 +552,7 @@ public static final class Builder { .setRpcTimeoutMultiplier(2) .setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT) .build(); - + static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false; private static final int THREADS_PER_CPU = 5; static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder() @@ -518,6 +566,8 @@ public static final class Builder { RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; + boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING; + TransportChannelProvider channelProvider = TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); @@ -604,6 +654,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) { return this; } + /** Sets the message ordering option. */ + public Builder setEnableMessageOrdering(boolean enableMessageOrdering) { + this.enableMessageOrdering = enableMessageOrdering; + return this; + } + /** Gives the ability to set a custom executor to be used by the library. */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); @@ -614,4 +670,42 @@ public Publisher build() throws IOException { return new Publisher(this); } } + + private static class MessagesBatch { + private List messages = new LinkedList<>(); + private int batchedBytes; + private String orderingKey; + + public MessagesBatch(String orderingKey) { + this.orderingKey = orderingKey; + } + + public OutstandingBatch popOutstandingBatch() { + OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey); + reset(); + return batch; + } + + private void reset() { + messages = new LinkedList<>(); + batchedBytes = 0; + } + + public boolean isEmpty() { + return messages.isEmpty(); + } + + public int getBatchedBytes() { + return batchedBytes; + } + + public void addMessage(OutstandingPublish message, int messageSize) { + messages.add(message); + batchedBytes += messageSize; + } + + public int getMessagesCount() { + return messages.size(); + } + }; } diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java index 396b5d05bd5f..620a09ac98bc 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; /** * A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud @@ -33,6 +34,8 @@ class FakePublisherServiceImpl extends PublisherImplBase { private final LinkedBlockingQueue requests = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue publishResponses = new LinkedBlockingQueue<>(); + private final AtomicInteger nextMessageId = new AtomicInteger(1); + private boolean autoPublishResponse; /** Class used to save the state of a possible response. */ private static class Response { @@ -75,7 +78,15 @@ public void publish(PublishRequest request, StreamObserver resp requests.add(request); Response response; try { - response = publishResponses.take(); + if (autoPublishResponse) { + PublishResponse.Builder builder = PublishResponse.newBuilder(); + for (int i = 0; i < request.getMessagesCount(); i++) { + builder.addMessageIds(Integer.toString(nextMessageId.getAndIncrement())); + } + response = new Response(builder.build()); + } else { + response = publishResponses.take(); + } } catch (InterruptedException e) { throw new IllegalArgumentException(e); } @@ -87,6 +98,15 @@ public void publish(PublishRequest request, StreamObserver resp } } + /** + * If enabled, PublishResponse is generated with a unique message id automatically when publish() + * is called. + */ + public FakePublisherServiceImpl setAutoPublishResponse(boolean autoPublishResponse) { + this.autoPublishResponse = autoPublishResponse; + return this; + } + public FakePublisherServiceImpl addPublishResponse(PublishResponse publishResponse) { publishResponses.add(new Response(publishResponse)); return this; diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 2901ae4b0a9d..3d0557e97719 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -35,12 +35,14 @@ import com.google.cloud.pubsub.v1.Publisher.Builder; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PubsubMessage; import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.inprocess.InProcessServerBuilder; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -241,6 +243,152 @@ private ApiFuture sendTestMessage(Publisher publisher, String data) { PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build()); } + @Test + public void testBatchedMessagesWithOrderingKeyByNum() throws Exception { + // Limit the number of maximum elements in a single batch to 3. + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(3L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableMessageOrdering(true) + .build(); + testPublisherServiceImpl.setAutoPublishResponse(true); + + // Publish two messages with ordering key, "OrderA", and other two messages with "OrderB". + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB"); + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "m3", "OrderA"); + ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "m4", "OrderB"); + + // Verify that none of them were published since the batching size is 3. + assertFalse(publishFuture1.isDone()); + assertFalse(publishFuture2.isDone()); + assertFalse(publishFuture3.isDone()); + assertFalse(publishFuture4.isDone()); + + // One of the batches reaches the limit. + ApiFuture publishFuture5 = sendTestMessageWithOrderingKey(publisher, "m5", "OrderA"); + // Verify that they were delivered in order per ordering key. + assertTrue(Integer.parseInt(publishFuture1.get()) < Integer.parseInt(publishFuture3.get())); + assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture4.get())); + assertTrue(Integer.parseInt(publishFuture3.get()) < Integer.parseInt(publishFuture5.get())); + + // Verify that every message within the same batch has the same ordering key. + List requests = testPublisherServiceImpl.getCapturedRequests(); + for (PublishRequest request : requests) { + if (request.getMessagesCount() > 1) { + String orderingKey = request.getMessages(0).getOrderingKey(); + for (PubsubMessage message : request.getMessagesList()) { + assertEquals(message.getOrderingKey(), orderingKey); + } + } + } + publisher.shutdown(); + } + + @Test + public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception { + // Limit the batching timeout to 100 seconds. + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(10L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableMessageOrdering(true) + .build(); + testPublisherServiceImpl.setAutoPublishResponse(true); + + // Publish two messages with ordering key, "OrderA", and other two messages with "OrderB". + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB"); + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "m3", "OrderA"); + ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "m4", "OrderB"); + + // Verify that none of them were published since the batching size is 10 and timeout has not + // been expired. + assertFalse(publishFuture1.isDone()); + assertFalse(publishFuture2.isDone()); + assertFalse(publishFuture3.isDone()); + assertFalse(publishFuture4.isDone()); + + // The timeout expires. + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + + // Verify that they were delivered in order per ordering key. + assertTrue(Integer.parseInt(publishFuture1.get()) < Integer.parseInt(publishFuture3.get())); + assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture4.get())); + + // Verify that every message within the same batch has the same ordering key. + List requests = testPublisherServiceImpl.getCapturedRequests(); + for (PublishRequest request : requests) { + if (request.getMessagesCount() > 1) { + String orderingKey = request.getMessages(0).getOrderingKey(); + for (PubsubMessage message : request.getMessagesList()) { + assertEquals(message.getOrderingKey(), orderingKey); + } + } + } + publisher.shutdown(); + } + + @Test + public void testOrderingKeyWhenDisabled_throwsException() throws Exception { + // Message ordering is disabled by default. + Publisher publisher = getTestPublisherBuilder().build(); + try { + ApiFuture publishFuture = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + fail("Should have thrown an IllegalStateException"); + } catch (IllegalStateException expected) { + // expected + } + publisher.shutdown(); + } + + @Test + public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { + // Set maxAttempts to 1 and enableMessageOrdering to true at the same time. + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setTotalTimeout(Duration.ofSeconds(10)) + .setMaxAttempts(1) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Although maxAttempts is 1, the publisher will retry until it succeeds since + // enableMessageOrdering is true. + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); + + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + assertEquals("1", publishFuture1.get()); + + assertEquals(4, testPublisherServiceImpl.getCapturedRequests().size()); + publisher.shutdown(); + } + + private ApiFuture sendTestMessageWithOrderingKey( + Publisher publisher, String data, String orderingKey) { + return publisher.publish( + PubsubMessage.newBuilder() + .setOrderingKey(orderingKey) + .setData(ByteString.copyFromUtf8(data)) + .build()); + } + @Test public void testErrorPropagation() throws Exception { Publisher publisher =