Skip to content

Commit

Permalink
feat(cmd): set kinesis metadata in tap CLI (#293)
Browse files Browse the repository at this point in the history
  • Loading branch information
shellcromancer authored Jan 17, 2025
1 parent e786320 commit 4156385
Showing 1 changed file with 21 additions and 1 deletion.
22 changes: 21 additions & 1 deletion cmd/substation/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"os/signal"
Expand Down Expand Up @@ -103,6 +104,13 @@ for more information.
},
}

type kinesisStreamMetadata struct {
ApproximateArrivalTimestamp time.Time `json:"approximateArrivalTimestamp"`
Stream string `json:"stream"`
PartitionKey string `json:"partitionKey"`
SequenceNumber string `json:"sequenceNumber"`
}

//nolint:gocognit, cyclop, gocyclo // Ignore cognitive and cyclomatic complexity.
func tapKinesis(arg string, extVars map[string]string, offset, stream string) error {
cfg := customConfig{}
Expand Down Expand Up @@ -295,7 +303,19 @@ func tapKinesis(arg string, extVars map[string]string, offset, stream string) er
log.WithField("stream", stream).WithField("shard", shard.ShardId).WithField("count", len(deagg)).Debug("Retrieved records from Kinesis shard.")

for _, record := range deagg {
msg := message.New().SetData(record.Data).SkipMissingValues()
// Create Message metadata.
m := kinesisStreamMetadata{
*record.ApproximateArrivalTimestamp,
stream,
*record.PartitionKey,
*record.SequenceNumber,
}
metadata, err := json.Marshal(m)
if err != nil {
return err
}

msg := message.New().SetData(record.Data).SetMetadata(metadata).SkipMissingValues()
ch.Send(msg)
}

Expand Down

0 comments on commit 4156385

Please sign in to comment.