Skip to content

Commit

Permalink
Introduce a state property that tracks whether a msg has been produce…
Browse files Browse the repository at this point in the history
…d successfully or not
  • Loading branch information
weeco committed Aug 25, 2021
1 parent 7b6bd7e commit 1e3a1d5
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
11 changes: 8 additions & 3 deletions e2e/endtoend_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package e2e

import "time"

const (
_ = iota
EndToEndeMessageStateCreated
EndToEndeMessageStateProducedSuccessfully
)

type EndToEndMessage struct {
MinionID string `json:"minionID"` // unique for each running kminion instance
MessageID string `json:"messageID"` // unique for each message
Timestamp int64 `json:"createdUtcNs"` // when the message was created, unix nanoseconds

// The following properties are only used within the message tracker
partition int
hasArrived bool
failedToProduce bool
partition int
state int
}

func (m *EndToEndMessage) creationTime() time.Time {
Expand Down
17 changes: 10 additions & 7 deletions e2e/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *Service) produceMessage(ctx context.Context, partition int) {

// This childCtx will ensure that we will abort our efforts to produce (including retries) when we exceed
// the SLA for producers.
childCtx, cancel := context.WithTimeout(ctx, s.config.Producer.AckSla)
childCtx, cancel := context.WithTimeout(ctx, s.config.Producer.AckSla+2*time.Second)

pID := strconv.Itoa(partition)
s.messagesProducedInFlight.WithLabelValues(pID).Inc()
Expand All @@ -44,19 +44,21 @@ func (s *Service) produceMessage(ctx context.Context, partition int) {

if err != nil {
s.messagesProducedFailed.WithLabelValues(pID).Inc()

// Mark message as failed and then remove from the tracker. Overwriting it into the cache is necessary,
// because we can't delete messages from the tracker without triggering the OnEvicted hook, which checks
// for lost messages.
msg.failedToProduce = true
s.messageTracker.addToTracker(msg)
s.messageTracker.removeFromTracker(msg.MessageID)

s.logger.Info("failed to produce message to end-to-end topic",
zap.String("topic_name", r.Topic),
zap.Int32("partition", r.Partition),
zap.Error(err))
return
} else {
// Update the message's state. If this message expires and is marked as successfully produced we will
// report this as a lost message, which would indicate that the producer was told that the message got
// produced successfully, but it got lost somewhere.
// We need to use updateItemIfExists() because it's possible that the message has already been consumed
// before we have received the message here (because we were awaiting the produce ack).
msg.state = EndToEndeMessageStateProducedSuccessfully
s.messageTracker.updateItemIfExists(msg)
}

s.produceLatency.WithLabelValues(pID).Observe(ackDuration.Seconds())
Expand All @@ -70,6 +72,7 @@ func createEndToEndRecord(minionID string, topicName string, partition int) (*kg
Timestamp: time.Now().UnixNano(),

partition: partition,
state: EndToEndeMessageStateCreated,
}

mjson, err := json.Marshal(message)
Expand Down

0 comments on commit 1e3a1d5

Please sign in to comment.