Skip to content

Commit

Permalink
timestamp from milis
Browse files Browse the repository at this point in the history
  • Loading branch information
Guilherme Raduenz committed Oct 21, 2020
1 parent bdc4f9f commit 978c81a
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 44 deletions.
6 changes: 5 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.12
FROM golang:1.15

RUN DOCKERIZE_VERSION=v0.6.1 \
&& wget --no-check-certificate https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-alpine-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
Expand All @@ -8,6 +8,10 @@ RUN DOCKERIZE_VERSION=v0.6.1 \

WORKDIR /goevents

ADD . .

RUN go mod download

ENTRYPOINT ["dockerize"]

CMD ["-wait", "tcp://broker:5672", "-timeout", "60s", "go", "run", "examples/consumer/amqp/consumer.go"]
28 changes: 21 additions & 7 deletions amqp/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err
Id: msg.MessageId,
Action: h.action,
Body: msg.Body,
Timestamp: msg.Timestamp,
Timestamp: getXEpochMilli(msg),
Ack: msg.Ack,
Nack: msg.Nack,
Reject: msg.Reject,
Expand Down Expand Up @@ -264,13 +264,14 @@ func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int
delay = h.retryDelay
}

headers := msg.Headers
headers["x-retry-count"] = retryCount + 1
headers["x-retry-max"] = h.maxRetries
headers["x-retry-delay"] = delay.String()
headers["x-action-key"] = getAction(msg)

retryMsg := amqplib.Publishing{
Headers: amqplib.Table{
"x-retry-count": retryCount + 1,
"x-retry-max": h.maxRetries,
"x-retry-delay": delay.String(),
"x-action-key": getAction(msg),
},
Headers: headers,
Timestamp: msg.Timestamp,
DeliveryMode: msg.DeliveryMode,
Body: msg.Body,
Expand Down Expand Up @@ -591,3 +592,16 @@ func getXRetryDelayHeader(msg amqplib.Delivery) (time.Duration, bool) {

return time.Duration(0), false
}

func getXEpochMilli(msg amqplib.Delivery) time.Time {
if epoch, ok := msg.Headers["x-epoch-milli"]; ok {
switch v := epoch.(type) {
case int64:
return time.Unix(0, v*int64(time.Millisecond))
case int:
return time.Unix(0, int64(v)*int64(time.Millisecond))
}
}

return msg.Timestamp
}
17 changes: 9 additions & 8 deletions amqp/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,10 @@ func TestRetryMessageToTheSameQueue(t *testing.T) {
defer func() { timesCalled1++ }()

if timesCalled1 == 0 {
return fmt.Errorf("Error.")
} else {
return nil
return fmt.Errorf("timescalled zero")
}

return nil
}, &messaging.SubscribeOptions{
RetryDelay: 100 * time.Millisecond,
DelayedRetry: false,
Expand Down Expand Up @@ -506,7 +506,7 @@ func TestActionExitsMaxRetries(t *testing.T) {
// It runs once and get an error, it will try five times more until it stops.
c.Subscribe("my_action", func(e messaging.Event) error {
defer func() { timesCalled++ }()
return fmt.Errorf("Error.")
return fmt.Errorf("error")
}, &messaging.SubscribeOptions{
RetryDelay: 100 * time.Millisecond,
DelayedRetry: false,
Expand Down Expand Up @@ -544,7 +544,7 @@ func TestActionExitsMaxRetriesWhenDelayed(t *testing.T) {
// It runs once and get an error, it will try three times more until it stops.
c.Subscribe("my_action", func(e messaging.Event) error {
defer func() { timesCalled++ }()
return fmt.Errorf("Error.")
return fmt.Errorf("error")
}, &messaging.SubscribeOptions{
RetryDelay: 100 * time.Millisecond,
DelayedRetry: true,
Expand Down Expand Up @@ -583,7 +583,7 @@ func TestActionExitsMaxRetriesWhenDelayedWindow(t *testing.T) {
// It runs once and get an error, it will try three times more until it stops.
c.Subscribe("my_action", func(e messaging.Event) error {
defer func() { timesCalled++ }()
return fmt.Errorf("Error.")
return fmt.Errorf("error")
}, &messaging.SubscribeOptions{
RetryDelay: 100 * time.Millisecond,
DelayedRetry: true,
Expand Down Expand Up @@ -628,7 +628,7 @@ func TestActionRetryTimeout(t *testing.T) {
defer func() {
myActionTimesCalled++
}()
return fmt.Errorf("Error.")
return fmt.Errorf("error")
}, &messaging.SubscribeOptions{
RetryDelay: 300 * time.Millisecond,
DelayedRetry: true,
Expand Down Expand Up @@ -826,7 +826,8 @@ func TestCallEventNackMethod(t *testing.T) {
c.Subscribe("multi", func(e messaging.Event) error {
e.Manual()

count += 1
count++

if count == 3 {
e.Ack(false)
} else {
Expand Down
8 changes: 7 additions & 1 deletion amqp/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/eventials/goevents/messaging"

log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
amqplib "github.com/streadway/amqp"
)

Expand Down Expand Up @@ -86,11 +87,16 @@ func (p *producer) Publish(action string, data []byte) {

messageID, _ := NewUUIDv4()

now := time.Now().UTC()

p.publishAmqMessage(action, amqplib.Publishing{
MessageId: messageID,
DeliveryMode: amqplib.Persistent,
Timestamp: time.Now().UTC(),
Timestamp: now,
Body: data,
Headers: amqp.Table{
"x-epoch-milli": now.Unix() / int64(time.Millisecond),
},
})
}

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ services:
- .:/goevents

broker:
image: rabbitmq:3.6-management
image: rabbitmq:3.8-management
ports:
- "15672:15672"
11 changes: 5 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
module github.com/eventials/goevents

go 1.12
go 1.15

require (
github.com/aws/aws-sdk-go v1.19.21
github.com/sirupsen/logrus v1.4.1
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94
github.com/stretchr/testify v1.3.0
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09 // indirect
github.com/aws/aws-sdk-go v1.35.12
github.com/sirupsen/logrus v1.7.0
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.6.1
)
40 changes: 20 additions & 20 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
github.com/aws/aws-sdk-go v1.19.21 h1:xLaPxl8gy0ZSXbc13jsCKIaHD6NiX+2tAQodPSEL5r8=
github.com/aws/aws-sdk-go v1.19.21/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.35.12 h1:qpxQ/DXfgsTNSYn8mUaCgQiJkCjBP8iHKw5ju+wkucU=
github.com/aws/aws-sdk-go v1.35.12/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09 h1:KaQtG+aDELoNmXYas3TVkGNYRuq8JQ1aa7LJt8EXVyo=
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

0 comments on commit 978c81a

Please sign in to comment.