Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Google Cloud Pub/Sub ordering key #2863 #2864

Merged
merged 6 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ private[pubsub] trait PubSubApi {
fields.get("data").map(_.convertTo[String]),
fields.get("attributes").map(_.convertTo[Map[String, String]]),
fields("messageId").convertTo[String],
fields("publishTime").convertTo[Instant]
fields("publishTime").convertTo[Instant],
fields.get("orderingKey").map(_.convertTo[String])
)
}
override def write(m: PubSubMessage): JsValue =
Expand All @@ -85,6 +86,7 @@ private[pubsub] trait PubSubApi {
"publishTime" -> m.publishTime.toJson
)
++ m.data.map(data => "data" -> data.toJson)
++ m.orderingKey.map(orderingKey => "orderingKey" -> orderingKey.toJson)
++ m.attributes.map(attributes => "attributes" -> attributes.toJson): _*
)
}
Expand All @@ -93,10 +95,15 @@ private[pubsub] trait PubSubApi {
def read(json: JsValue): PublishMessage = {
val data = json.asJsObject.fields("data").convertTo[String]
val attributes = json.asJsObject.fields("attributes").convertTo[immutable.Map[String, String]]
PublishMessage(data, attributes)
val orderingKey = json.asJsObject.fields.get("orderingKey").map(_.convertTo[String])
PublishMessage(data, Some(attributes), orderingKey)
}
def write(m: PublishMessage): JsValue =
JsObject(Seq("data" -> JsString(m.data)) ++ m.attributes.map(a => "attributes" -> a.toJson): _*)
JsObject(
Seq("data" -> JsString(m.data)) ++
m.orderingKey.map(orderingKey => "orderingKey" -> orderingKey.toJson) ++
m.attributes.map(a => "attributes" -> a.toJson): _*
)
}

private implicit val pubSubRequestFormat = new RootJsonFormat[PublishRequest] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,28 +117,44 @@ object PubSubConfig {
apply(projectId, clientEmail, privateKey, pullReturnImmediately, pullMaxMessagesPerInternalBatch)(actorSystem)
}

final class PublishMessage private (val data: String, val attributes: Option[immutable.Map[String, String]]) {
override def toString: String = "PublishMessage(data=" + data + ",attributes=" + attributes.toString + ")"
final class PublishMessage private (val data: String,
val attributes: Option[immutable.Map[String, String]],
val orderingKey: Option[String]) {
def this(data: String, attributes: Option[immutable.Map[String, String]]) = this(data, attributes, None)

override def toString: String =
"PublishMessage(data=" + data + ",attributes=" + attributes.toString + ",orderingKey=" + orderingKey + ")"

override def equals(other: Any): Boolean = other match {
case that: PublishMessage => data == that.data && attributes == that.attributes
case that: PublishMessage => data == that.data && attributes == that.attributes && orderingKey == that.orderingKey
case _ => false
}

override def hashCode: Int = java.util.Objects.hash(data, attributes)
override def hashCode: Int = java.util.Objects.hash(data, attributes, orderingKey)
}

object PublishMessage {
def apply(data: String, attributes: immutable.Map[String, String]) = new PublishMessage(data, Some(attributes))
def apply(data: String, attributes: Option[immutable.Map[String, String]]) = new PublishMessage(data, attributes)
def apply(data: String) = new PublishMessage(data, None)
def create(data: String) = new PublishMessage(data, None)
def apply(data: String, attributes: immutable.Map[String, String]) = new PublishMessage(data, Some(attributes), None)
def apply(data: String, attributes: Option[immutable.Map[String, String]], orderingKey: Option[String]) =
new PublishMessage(data, attributes, orderingKey)
def apply(data: String, attributes: Option[immutable.Map[String, String]]) =
new PublishMessage(data, attributes, None)
def apply(data: String) = new PublishMessage(data, None, None)
def create(data: String) = new PublishMessage(data, None, None)

/**
* Java API
*/
def create(data: String, attributes: java.util.Map[String, String]) =
new PublishMessage(data, Some(attributes.asScala.toMap))
def create(data: String, attributes: java.util.Map[String, String]): PublishMessage =
create(data, attributes, java.util.Optional.empty())

/**
* Java API with ordering key
*/
def create(data: String,
attributes: java.util.Map[String, String],
orderingKey: java.util.Optional[String]): PublishMessage =
new PublishMessage(data, Some(attributes.asScala.toMap), Option(orderingKey.orElse(null)))
}

/**
Expand All @@ -147,48 +163,77 @@ object PublishMessage {
* @param attributes attributes for this message, if not present, data can't be empty
* @param messageId the message id given by server.
* @param publishTime the time the message was published.
* @param orderingKey if non-empty, identifies related messages for which publish order should be respected
*/
final class PubSubMessage private (val data: Option[String],
val attributes: Option[immutable.Map[String, String]],
val messageId: String,
val publishTime: Instant) {
val publishTime: Instant,
val orderingKey: Option[String]) {

def this(data: Option[String],
attributes: Option[immutable.Map[String, String]],
messageId: String,
publishTime: Instant) = this(data, attributes, messageId, publishTime, None)

def withAttributes(attributes: java.util.Map[String, String]): PubSubMessage =
new PubSubMessage(data, Some(attributes.asScala.toMap), messageId, publishTime)
new PubSubMessage(data, Some(attributes.asScala.toMap), messageId, publishTime, orderingKey)

def withData(data: String): PubSubMessage =
new PubSubMessage(Some(data), attributes, messageId, publishTime)
new PubSubMessage(Some(data), attributes, messageId, publishTime, orderingKey)

def withOrderingKey(orderingKey: String): PubSubMessage =
new PubSubMessage(data, attributes, messageId, publishTime, Some(orderingKey))

override def equals(other: Any): Boolean = other match {
case that: PubSubMessage =>
data == that.data && attributes == that.attributes && messageId == that.messageId && publishTime == that.publishTime
data == that.data && attributes == that.attributes && messageId == that.messageId && publishTime == that.publishTime && orderingKey == that.orderingKey
case _ => false
}

override def hashCode: Int = java.util.Objects.hash(data, attributes, messageId, publishTime)
override def hashCode: Int = java.util.Objects.hash(data, attributes, messageId, publishTime, orderingKey)

override def toString: String =
"PubSubMessage(data=" + data + ",attributes=" + attributes + ",messageId=" + messageId + ",publishTime=" + publishTime + ")"
"PubSubMessage(data=" + data + ",attributes=" + attributes + ",messageId=" + messageId + ",publishTime=" + publishTime + ",orderingKey=" + orderingKey + ")"
}

object PubSubMessage {

def apply(data: Option[String],
attributes: Option[immutable.Map[String, String]],
messageId: String,
publishTime: Instant) =
new PubSubMessage(data, attributes, messageId, publishTime, None)

def apply(data: Option[String] = None,
attributes: Option[immutable.Map[String, String]] = None,
messageId: String,
publishTime: Instant) = new PubSubMessage(data, attributes, messageId, publishTime)
publishTime: Instant,
orderingKey: Option[String] = None) =
new PubSubMessage(data, attributes, messageId, publishTime, orderingKey)

/**
* Java API
*/
def create(data: java.util.Optional[String],
attributes: java.util.Optional[java.util.Map[String, String]],
messageId: String,
publishTime: Instant) =
publishTime: Instant): PubSubMessage =
create(data, attributes, messageId, publishTime, java.util.Optional.empty())

/**
* Java API with ordering key
*/
def create(data: java.util.Optional[String],
attributes: java.util.Optional[java.util.Map[String, String]],
messageId: String,
publishTime: Instant,
orderingKey: java.util.Optional[String]): PubSubMessage =
new PubSubMessage(Option(data.orElse(null)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To stay source compatible for Java users, you should add an overload for create with the new orderingKey and keep the existing create with its signature. (Just as you did with PublishMessage.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Option(attributes.orElse(null)).map(_.asScala.toMap),
messageId,
publishTime)
publishTime,
Option(orderingKey.orElse(null)))

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.security.spec.InvalidKeySpecException;
import java.time.Duration;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

public class ExampleUsageJava {
Expand Down Expand Up @@ -73,6 +75,19 @@ private static void example() throws NoSuchAlgorithmException, InvalidKeySpecExc
.runWith(Sink.ignore(), system);
// #publish-fast

// #publish with ordering key
PublishMessage publishMessageWithOrderingKey =
PublishMessage.create(
new String(Base64.getEncoder().encode("Hello Google!".getBytes())),
new HashMap<>(),
Optional.of("my-ordering-key"));
PublishRequest publishRequestWithOrderingKey =
PublishRequest.create(Lists.newArrayList(publishMessage));

CompletionStage<List<List<String>>> publishedMessageWithOrderingKeyIds =
source.via(publishFlow).runWith(Sink.seq(), system);
// #publish with ordering key

// #subscribe
Source<ReceivedMessage, Cancellable> subscriptionSource =
GooglePubSub.subscribe(subscription, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,32 @@ class ModelSpec extends AnyFunSuite with Matchers with LogCapturing {
val publishMessage3 = PublishMessage("abcde")
val publishMessage4 = PublishMessage("abcd", Map("k1" -> "v1", "k2" -> "v2"))
val publishMessage5 = PublishMessage("abcde", Map("k1" -> "v1", "k2" -> "v2"))
val publishMessage6 = PublishMessage("abcde", Some(Map("k1" -> "v1", "k2" -> "v2")), Some("qwe"))
val publishMessage7 = PublishMessage("abcde", Some(Map("k1" -> "v1", "k2" -> "v2")), None)

test("PublishMessage equals, hashCode") {

publishMessage1 shouldNot be(publishMessage2)
publishMessage1 shouldNot be(publishMessage3)
publishMessage1 shouldNot be(publishMessage4)
publishMessage1 shouldNot be(publishMessage6)
publishMessage1 shouldBe publishMessage5
publishMessage1 shouldBe publishMessage7

publishMessage1.hashCode shouldNot be(publishMessage2.hashCode)
publishMessage1.hashCode shouldNot be(publishMessage3.hashCode)
publishMessage1.hashCode shouldNot be(publishMessage4.hashCode)
publishMessage1.hashCode shouldNot be(publishMessage6.hashCode)
publishMessage1.hashCode shouldBe publishMessage5.hashCode
publishMessage1.hashCode shouldBe publishMessage7.hashCode
}

test("PublishMessage toString") {
publishMessage1.toString shouldBe
"PublishMessage(data=abcde,attributes=Some(Map(k1 -> v1, k2 -> v2)))"
"PublishMessage(data=abcde,attributes=Some(Map(k1 -> v1, k2 -> v2)),orderingKey=None)"

publishMessage6.toString shouldBe
"PublishMessage(data=abcde,attributes=Some(Map(k1 -> v1, k2 -> v2)),orderingKey=Some(qwe))"
}

val pubSubMessage1 = PubSubMessage(Some("data"), Some(Map("k1" -> "v1")), "Id-1", Instant.ofEpochMilli(0L))
Expand All @@ -43,43 +52,54 @@ class ModelSpec extends AnyFunSuite with Matchers with LogCapturing {
val pubSubMessage4 = PubSubMessage(Some("data"), Some(Map("k1" -> "v1")), "Id-2", Instant.ofEpochMilli(0L))
val pubSubMessage5 = PubSubMessage(Some("data"), Some(Map("k1" -> "v1")), "Id-1", Instant.ofEpochMilli(1L))
val pubSubMessage6 = PubSubMessage(Some("data"), Some(Map("k1" -> "v1")), "Id-1", Instant.ofEpochMilli(0L))
val pubSubMessage7 =
PubSubMessage(Some("data"), Some(Map("k1" -> "v1")), "Id-1", Instant.ofEpochMilli(0L), Some("qwe"))

test("PubSubMessage equals, hashCode") {
pubSubMessage1 shouldNot be(pubSubMessage2)
pubSubMessage1 shouldNot be(pubSubMessage3)
pubSubMessage1 shouldNot be(pubSubMessage4)
pubSubMessage1 shouldNot be(pubSubMessage5)
pubSubMessage1 shouldNot be(pubSubMessage7)
pubSubMessage1 shouldBe pubSubMessage6

pubSubMessage1.hashCode shouldNot be(pubSubMessage2.hashCode)
pubSubMessage1.hashCode shouldNot be(pubSubMessage3.hashCode)
pubSubMessage1.hashCode shouldNot be(pubSubMessage4.hashCode)
pubSubMessage1.hashCode shouldNot be(pubSubMessage5.hashCode)
pubSubMessage1.hashCode shouldNot be(pubSubMessage7.hashCode)
pubSubMessage1.hashCode shouldBe pubSubMessage6.hashCode
}

test("PubSubMessage toString") {
pubSubMessage1.toString shouldBe ("PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z)")
pubSubMessage1.toString shouldBe "PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=None)"
pubSubMessage7.toString shouldBe "PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=Some(qwe))"
}

val receivedMessage1 = ReceivedMessage("1", pubSubMessage1)
val receivedMessage2 = ReceivedMessage("2", pubSubMessage1)
val receivedMessage3 = ReceivedMessage("1", pubSubMessage2)
val receivedMessage4 = ReceivedMessage("1", pubSubMessage1)
val receivedMessage5 = ReceivedMessage("2", pubSubMessage7)

test("ReceivedMessage equals, hashCode") {
receivedMessage1 shouldNot be(receivedMessage2)
receivedMessage1 shouldNot be(receivedMessage3)
receivedMessage1 shouldNot be(receivedMessage5)
receivedMessage1 shouldBe receivedMessage4

receivedMessage1.hashCode shouldNot be(receivedMessage2.hashCode)
receivedMessage1.hashCode shouldNot be(receivedMessage3.hashCode)
receivedMessage1.hashCode shouldNot be(receivedMessage5.hashCode)
receivedMessage1.hashCode shouldBe receivedMessage4.hashCode
}

test("ReceivedMessage toString") {
receivedMessage1.toString shouldBe
"ReceivedMessage(ackId=1,message=PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z))"
"ReceivedMessage(ackId=1,message=PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=None))"

receivedMessage5.toString shouldBe
"ReceivedMessage(ackId=2,message=PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=Some(qwe)))"
}

val acknowledgeRequest1 = AcknowledgeRequest()
Expand Down Expand Up @@ -107,17 +127,21 @@ class ModelSpec extends AnyFunSuite with Matchers with LogCapturing {
val publishRequest1 = PublishRequest(Seq(publishMessage1))
val publishRequest2 = PublishRequest(Seq(publishMessage2))
val publishRequest3 = PublishRequest(Seq(publishMessage1))
val publishRequest4 = PublishRequest(Seq(publishMessage6))

test("PublishRequest equals, hashCode") {
publishRequest1 shouldNot be(publishRequest2)
publishRequest1 shouldNot be(publishRequest4)
publishRequest1 shouldBe publishRequest3

publishRequest1.hashCode shouldNot be(publishRequest2.hashCode)
publishRequest1.hashCode shouldNot be(publishRequest4.hashCode)
publishRequest1.hashCode shouldBe publishRequest3.hashCode
}

test("PublishRequest toString") {
publishRequest1.toString shouldBe "PublishRequest([PublishMessage(data=abcde,attributes=Some(Map(k1 -> v1, k2 -> v2)))])"
publishRequest1.toString shouldBe "PublishRequest([PublishMessage(data=abcde,attributes=Some(Map(k1 -> v1, k2 -> v2)),orderingKey=None)])"
publishRequest4.toString shouldBe "PublishRequest([PublishMessage(data=abcde,attributes=Some(Map(k1 -> v1, k2 -> v2)),orderingKey=Some(qwe))])"
}

private val publishResponse1 = PublishResponse(Seq.empty[String])
Expand Down Expand Up @@ -145,6 +169,7 @@ class ModelSpec extends AnyFunSuite with Matchers with LogCapturing {
private val pullResponse1 = PullResponse(Some(Seq(receivedMessage1)))
private val pullResponse2 = PullResponse(None)
private val pullResponse3 = PullResponse(Some(Seq(receivedMessage1)))
private val pullResponse4 = PullResponse(Some(Seq(receivedMessage5)))

test("PullResponse equals, hashCode") {
pullResponse1 shouldNot be(pullResponse2)
Expand All @@ -155,6 +180,7 @@ class ModelSpec extends AnyFunSuite with Matchers with LogCapturing {
}

test("PullResponse toString") {
pullResponse1.toString shouldBe "PullResponse(Some([ReceivedMessage(ackId=1,message=PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z))]))"
pullResponse1.toString shouldBe "PullResponse(Some([ReceivedMessage(ackId=1,message=PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=None))]))"
pullResponse4.toString shouldBe "PullResponse(Some([ReceivedMessage(ackId=2,message=PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=Some(qwe)))]))"
}
}
Loading