From 978c81a1c1d9edcbd4b63b5cd4bb9ffbe1a579d0 Mon Sep 17 00:00:00 2001 From: Guilherme Raduenz Date: Wed, 21 Oct 2020 19:41:11 -0300 Subject: [PATCH] timestamp from milis --- Dockerfile | 6 +++++- amqp/consumer.go | 28 +++++++++++++++++++++------- amqp/consumer_test.go | 17 +++++++++-------- amqp/producer.go | 8 +++++++- docker-compose.yml | 2 +- go.mod | 11 +++++------ go.sum | 40 ++++++++++++++++++++-------------------- 7 files changed, 68 insertions(+), 44 deletions(-) diff --git a/Dockerfile b/Dockerfile index a0599cf..704314a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.12 +FROM golang:1.15 RUN DOCKERIZE_VERSION=v0.6.1 \ && wget --no-check-certificate https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-alpine-linux-amd64-$DOCKERIZE_VERSION.tar.gz \ @@ -8,6 +8,10 @@ RUN DOCKERIZE_VERSION=v0.6.1 \ WORKDIR /goevents +ADD . . + +RUN go mod download + ENTRYPOINT ["dockerize"] CMD ["-wait", "tcp://broker:5672", "-timeout", "60s", "go", "run", "examples/consumer/amqp/consumer.go"] diff --git a/amqp/consumer.go b/amqp/consumer.go index a915552..c8b0396 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -174,7 +174,7 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err Id: msg.MessageId, Action: h.action, Body: msg.Body, - Timestamp: msg.Timestamp, + Timestamp: getXEpochMilli(msg), Ack: msg.Ack, Nack: msg.Nack, Reject: msg.Reject, @@ -264,13 +264,14 @@ func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int delay = h.retryDelay } + headers := msg.Headers + headers["x-retry-count"] = retryCount + 1 + headers["x-retry-max"] = h.maxRetries + headers["x-retry-delay"] = delay.String() + headers["x-action-key"] = getAction(msg) + retryMsg := amqplib.Publishing{ - Headers: amqplib.Table{ - "x-retry-count": retryCount + 1, - "x-retry-max": h.maxRetries, - "x-retry-delay": delay.String(), - "x-action-key": getAction(msg), - }, + Headers: headers, Timestamp: msg.Timestamp, DeliveryMode: msg.DeliveryMode, Body: msg.Body, @@ -591,3 +592,16 @@ func getXRetryDelayHeader(msg amqplib.Delivery) (time.Duration, bool) { return time.Duration(0), false } + +func getXEpochMilli(msg amqplib.Delivery) time.Time { + if epoch, ok := msg.Headers["x-epoch-milli"]; ok { + switch v := epoch.(type) { + case int64: + return time.Unix(0, v*int64(time.Millisecond)) + case int: + return time.Unix(0, int64(v)*int64(time.Millisecond)) + } + } + + return msg.Timestamp +} diff --git a/amqp/consumer_test.go b/amqp/consumer_test.go index 17444a4..cdac5ab 100644 --- a/amqp/consumer_test.go +++ b/amqp/consumer_test.go @@ -458,10 +458,10 @@ func TestRetryMessageToTheSameQueue(t *testing.T) { defer func() { timesCalled1++ }() if timesCalled1 == 0 { - return fmt.Errorf("Error.") - } else { - return nil + return fmt.Errorf("timescalled zero") } + + return nil }, &messaging.SubscribeOptions{ RetryDelay: 100 * time.Millisecond, DelayedRetry: false, @@ -506,7 +506,7 @@ func TestActionExitsMaxRetries(t *testing.T) { // It runs once and get an error, it will try five times more until it stops. c.Subscribe("my_action", func(e messaging.Event) error { defer func() { timesCalled++ }() - return fmt.Errorf("Error.") + return fmt.Errorf("error") }, &messaging.SubscribeOptions{ RetryDelay: 100 * time.Millisecond, DelayedRetry: false, @@ -544,7 +544,7 @@ func TestActionExitsMaxRetriesWhenDelayed(t *testing.T) { // It runs once and get an error, it will try three times more until it stops. c.Subscribe("my_action", func(e messaging.Event) error { defer func() { timesCalled++ }() - return fmt.Errorf("Error.") + return fmt.Errorf("error") }, &messaging.SubscribeOptions{ RetryDelay: 100 * time.Millisecond, DelayedRetry: true, @@ -583,7 +583,7 @@ func TestActionExitsMaxRetriesWhenDelayedWindow(t *testing.T) { // It runs once and get an error, it will try three times more until it stops. c.Subscribe("my_action", func(e messaging.Event) error { defer func() { timesCalled++ }() - return fmt.Errorf("Error.") + return fmt.Errorf("error") }, &messaging.SubscribeOptions{ RetryDelay: 100 * time.Millisecond, DelayedRetry: true, @@ -628,7 +628,7 @@ func TestActionRetryTimeout(t *testing.T) { defer func() { myActionTimesCalled++ }() - return fmt.Errorf("Error.") + return fmt.Errorf("error") }, &messaging.SubscribeOptions{ RetryDelay: 300 * time.Millisecond, DelayedRetry: true, @@ -826,7 +826,8 @@ func TestCallEventNackMethod(t *testing.T) { c.Subscribe("multi", func(e messaging.Event) error { e.Manual() - count += 1 + count++ + if count == 3 { e.Ack(false) } else { diff --git a/amqp/producer.go b/amqp/producer.go index f706383..25ba8fd 100644 --- a/amqp/producer.go +++ b/amqp/producer.go @@ -10,6 +10,7 @@ import ( "github.com/eventials/goevents/messaging" log "github.com/sirupsen/logrus" + "github.com/streadway/amqp" amqplib "github.com/streadway/amqp" ) @@ -86,11 +87,16 @@ func (p *producer) Publish(action string, data []byte) { messageID, _ := NewUUIDv4() + now := time.Now().UTC() + p.publishAmqMessage(action, amqplib.Publishing{ MessageId: messageID, DeliveryMode: amqplib.Persistent, - Timestamp: time.Now().UTC(), + Timestamp: now, Body: data, + Headers: amqp.Table{ + "x-epoch-milli": now.Unix() / int64(time.Millisecond), + }, }) } diff --git a/docker-compose.yml b/docker-compose.yml index d68328c..10b88fa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,6 @@ services: - .:/goevents broker: - image: rabbitmq:3.6-management + image: rabbitmq:3.8-management ports: - "15672:15672" diff --git a/go.mod b/go.mod index b46fb8c..aad0ab1 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,10 @@ module github.com/eventials/goevents -go 1.12 +go 1.15 require ( - github.com/aws/aws-sdk-go v1.19.21 - github.com/sirupsen/logrus v1.4.1 - github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 - github.com/stretchr/testify v1.3.0 - golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09 // indirect + github.com/aws/aws-sdk-go v1.35.12 + github.com/sirupsen/logrus v1.7.0 + github.com/streadway/amqp v1.0.0 + github.com/stretchr/testify v1.6.1 ) diff --git a/go.sum b/go.sum index c652294..0610c54 100644 --- a/go.sum +++ b/go.sum @@ -1,30 +1,30 @@ -github.com/aws/aws-sdk-go v1.19.21 h1:xLaPxl8gy0ZSXbc13jsCKIaHD6NiX+2tAQodPSEL5r8= -github.com/aws/aws-sdk-go v1.19.21/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/aws/aws-sdk-go v1.35.12 h1:qpxQ/DXfgsTNSYn8mUaCgQiJkCjBP8iHKw5ju+wkucU= +github.com/aws/aws-sdk-go v1.35.12/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= -github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= -github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= -github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= -github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= -github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k= -github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM= +github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= -github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09 h1:KaQtG+aDELoNmXYas3TVkGNYRuq8JQ1aa7LJt8EXVyo= -golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= -golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=