Skip to content

Commit

Permalink
Add messages to messagetracker before getting produce response
Browse files Browse the repository at this point in the history
This avoids a race condition where the message would sometimes be consumed before we receive the produce response. If that's the case we would consider this message as lost, because we delete the message from the cache before it has been entered. Then after the preconfigured expiration time we would still have this message in the tracker, even though we have already read it.
  • Loading branch information
weeco committed Aug 13, 2021
1 parent 3d59e90 commit f6b7b3f
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 33 deletions.
6 changes: 4 additions & 2 deletions e2e/endtoend_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ type EndToEndMessage struct {
MessageID string `json:"messageID"` // unique for each message
Timestamp int64 `json:"createdUtcNs"` // when the message was created, unix nanoseconds

partition int // used in message tracker
hasArrived bool // used in tracker
// The following properties are only used within the message tracker
partition int
hasArrived bool
failedToProduce bool
}

func (m *EndToEndMessage) creationTime() time.Time {
Expand Down
65 changes: 35 additions & 30 deletions e2e/message_tracker.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
package e2e

import (
"strconv"
"time"

goCache "github.com/patrickmn/go-cache"
"go.uber.org/zap"
)

// messageTracker keeps track of messages
// messageTracker keeps track of the messages' lifetime
//
// When we successfully send a mesasge, it will be added to this tracker.
// Later, when we receive the message back in the consumer, the message is marked as completed and removed from the tracker.
// If the message does not arrive within the configured `consumer.roundtripSla`, it is counted as lost.
// A lost message is reported in the `roundtrip_latency_seconds` metric with infinite duration.
// We use a dedicated counter to track messages that couldn't be
// produced to Kafka.
// If the message does not arrive within the configured `consumer.roundtripSla`, it is counted as lost. Messages that
// failed to be produced will not be
// considered as lost message.
//
// When we fail to send a message, it isn't tracked.
// We use a dedicated counter to track messages that couldn't be produced to Kafka.
type messageTracker struct {
svc *Service
logger *zap.Logger
cache *goCache.Cache
}

func newMessageTracker(svc *Service) *messageTracker {

defaultExpirationTime := svc.config.Consumer.RoundtripSla
cleanupInterval := 1 * time.Second

Expand All @@ -45,51 +44,57 @@ func (t *messageTracker) addToTracker(msg *EndToEndMessage) {
t.cache.SetDefault(msg.MessageID, msg)
}

func (t *messageTracker) removeFromTracker(messageID string) {
t.cache.Delete(messageID)
}

func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
cachedMessageInterface, _, found := t.cache.GetWithExpiration(arrivedMessage.MessageID)
cm, found := t.cache.Get(arrivedMessage.MessageID)
if !found {
// message expired and was removed from the cache
// it arrived too late, nothing to do here...
return
}

actualExpireTime := arrivedMessage.creationTime().Add(t.svc.config.Consumer.RoundtripSla)
if time.Now().Before(actualExpireTime) {
// message arrived early enough

// timeUntilExpire := time.Until(actualExpireTime)
// t.logger.Debug("message arrived",
// zap.Duration("timeLeft", timeUntilExpire),
// zap.Duration("age", ),
// zap.Int("partition", msg.partition),
// zap.String("messageId", msg.MessageID),
// )
} else {
// Message arrived late, but was still in cache.
// Maybe we could log something like "message arrived after the sla"...
//
// But for now we don't report it as "lost" in the log (because it actually *did* arrive just now, just too late).
// The metrics will report it as 'duration infinite' anyway.
msg := cm.(*EndToEndMessage)

expireTime := arrivedMessage.creationTime().Add(t.svc.config.Consumer.RoundtripSla)
isOnTime := time.Now().Before(expireTime)
latency := time.Now().Sub(msg.creationTime())

if !isOnTime {
// Message arrived late, but was still in cache. We don't increment the lost counter here because eventually
// it will be evicted from the cache. This case should only pop up if the sla time is exceeded, but if the
// item has not been evicted from the cache yet (because we clean it only every second).
t.logger.Info("message arrived late, will be marked as a lost message",
zap.Int64("delay_ms", latency.Milliseconds()),
zap.String("id", msg.MessageID))
return
}

// Set it as arrived, so we don't log it as lost in 'onMessageExpired' and remove it from the tracker
msg := cachedMessageInterface.(*EndToEndMessage)
// message arrived early enough
t.svc.messagesReceived.Inc()
t.svc.endToEndRoundtripLatency.WithLabelValues(strconv.Itoa(msg.partition)).Observe(latency.Seconds())

// We mark the message as arrived so that we won't mark the message as lost and overwrite that modified message
// into the cache.
msg.hasArrived = true
t.cache.Set(msg.MessageID, msg, 0)
t.cache.Delete(msg.MessageID)
}

func (t *messageTracker) onMessageExpired(_ string, msg *EndToEndMessage) {
// Because `t.cache.Delete` will invoke the onEvicted method we have to expect some calls to this function
// even though messages have arrived. Thus, we quit early if we receive such a method.
if msg.hasArrived {
// even though messages have arrived. Thus, we quit early if we receive such a message.
if msg.hasArrived || msg.failedToProduce {
return
}

created := msg.creationTime()
age := time.Since(created)
t.svc.lostMessages.Inc()

t.logger.Debug("message lost/expired",
t.logger.Info("message lost/expired",
zap.Int64("age_ms", age.Milliseconds()),
zap.Int("partition", msg.partition),
zap.String("message_id", msg.MessageID),
Expand Down
10 changes: 9 additions & 1 deletion e2e/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (s *Service) produceMessage(ctx context.Context, partition int) {

pID := strconv.Itoa(partition)
s.messagesProducedInFlight.WithLabelValues(pID).Inc()
s.messageTracker.addToTracker(msg)
s.client.Produce(childCtx, record, func(r *kgo.Record, err error) {
defer cancel()
ackDuration := time.Since(startTime)
Expand All @@ -43,6 +44,14 @@ func (s *Service) produceMessage(ctx context.Context, partition int) {

if err != nil {
s.messagesProducedFailed.WithLabelValues(pID).Inc()

// Mark message as failed and then remove from the tracker. Overwriting it into the cache is necessary,
// because we can't delete messages from the tracker without triggering the OnEvicted hook, which checks
// for lost messages.
msg.failedToProduce = true
s.messageTracker.addToTracker(msg)
s.messageTracker.removeFromTracker(msg.MessageID)

s.logger.Info("failed to produce message to end-to-end topic",
zap.String("topic_name", r.Topic),
zap.Int32("partition", r.Partition),
Expand All @@ -51,7 +60,6 @@ func (s *Service) produceMessage(ctx context.Context, partition int) {
}

s.endToEndAckLatency.WithLabelValues(pID).Observe(ackDuration.Seconds())
s.messageTracker.addToTracker(msg)
})
}

Expand Down

0 comments on commit f6b7b3f

Please sign in to comment.