From fe1ceda2f71b7d698aecf6ef773819b9f7afdac1 Mon Sep 17 00:00:00 2001 From: Maitri Mangal Date: Tue, 31 Oct 2023 17:50:45 +0000 Subject: [PATCH 1/5] fix: concurrent modification of processing receievd messages --- .../main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 9556849bb..37e49716c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -376,7 +376,7 @@ private void notifyReceiptComplete() { } } - void processReceivedMessages(List messages) { + synchronized void processReceivedMessages(List messages) { Instant totalExpiration = now().plus(maxAckExtensionPeriod); List outstandingBatch = new ArrayList<>(messages.size()); for (ReceivedMessage message : messages) { From f579d865360647a9a6a2baf2b8837d818b50809e Mon Sep 17 00:00:00 2001 From: Maitri Mangal Date: Tue, 31 Oct 2023 19:57:50 +0000 Subject: [PATCH 2/5] Removing synchronized keyword, and making outstandingReceipts into a concurrentMap --- .../java/com/google/cloud/pubsub/v1/MessageDispatcher.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 37e49716c..0117bb184 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -92,8 +92,8 @@ class MessageDispatcher { private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingReceipts = new LinkedBlockingQueue<>(); - private final LinkedHashMap outstandingReceipts = - new LinkedHashMap(); + private final ConcurrentMap outstandingReceipts = + new ConcurrentHashMap(); private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(); private final AtomicBoolean extendDeadline = new AtomicBoolean(true); private final Lock jobLock; @@ -376,7 +376,7 @@ private void notifyReceiptComplete() { } } - synchronized void processReceivedMessages(List messages) { + void processReceivedMessages(List messages) { Instant totalExpiration = now().plus(maxAckExtensionPeriod); List outstandingBatch = new ArrayList<>(messages.size()); for (ReceivedMessage message : messages) { From e9b4f3fe9eb575ff12fcf2e0797b46bcaf71ee7b Mon Sep 17 00:00:00 2001 From: Maitri Mangal Date: Tue, 31 Oct 2023 20:01:46 +0000 Subject: [PATCH 3/5] Removing synchronized keyword for notifyAckSuccess and failure as well --- .../java/com/google/cloud/pubsub/v1/MessageDispatcher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 0117bb184..714969589 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -411,7 +411,7 @@ void processReceivedMessages(List messages) { processBatch(outstandingBatch); } - synchronized void notifyAckSuccess(AckRequestData ackRequestData) { + void notifyAckSuccess(AckRequestData ackRequestData) { if (outstandingReceipts.containsKey(ackRequestData.getAckId())) { outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete(); @@ -437,7 +437,7 @@ synchronized void notifyAckSuccess(AckRequestData ackRequestData) { } } - synchronized void notifyAckFailed(AckRequestData ackRequestData) { + void notifyAckFailed(AckRequestData ackRequestData) { outstandingReceipts.remove(ackRequestData.getAckId()); } From ca97554d268b0755d07666532477e48216528d0f Mon Sep 17 00:00:00 2001 From: Maitri Mangal Date: Tue, 31 Oct 2023 20:04:04 +0000 Subject: [PATCH 4/5] fixing lint --- .../main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 714969589..b257594ea 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -31,7 +31,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; From 267edac882a1614815b61cc6bcd382e9a356cb66 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 31 Oct 2023 20:57:43 +0000 Subject: [PATCH 5/5] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 5d2060635..44b50a02a 100644 --- a/README.md +++ b/README.md @@ -59,13 +59,13 @@ implementation 'com.google.cloud:google-cloud-pubsub' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-pubsub:1.125.9' +implementation 'com.google.cloud:google-cloud-pubsub:1.125.10' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.125.9" +libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.125.10" ``` @@ -409,7 +409,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsub/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-pubsub.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-pubsub/1.125.9 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-pubsub/1.125.10 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles