diff --git a/cmd/ttn-lw-stack/commands/start.go b/cmd/ttn-lw-stack/commands/start.go
index 9b9db5e6e1..a6f16c559b 100644
--- a/cmd/ttn-lw-stack/commands/start.go
+++ b/cmd/ttn-lw-stack/commands/start.go
@@ -302,7 +302,6 @@ var startCommand = &cobra.Command{
 				int64(applicationUplinkQueueSize),
 				redisConsumerGroup,
 				time.Minute,
-				redis.DefaultStreamBlockLimit,
 			)
 			if err := applicationUplinkQueue.Init(ctx); err != nil {
 				return shared.ErrInitializeNetworkServer.WithCause(err)
diff --git a/pkg/networkserver/internal/test/shared/redis.go b/pkg/networkserver/internal/test/shared/redis.go
index fb40f1102b..5ffccf960a 100644
--- a/pkg/networkserver/internal/test/shared/redis.go
+++ b/pkg/networkserver/internal/test/shared/redis.go
@@ -36,7 +36,7 @@ func testStreamBlockLimit() time.Duration {
 func NewRedisApplicationUplinkQueue(ctx context.Context) (ApplicationUplinkQueue, func()) {
 	tb := test.MustTBFromContext(ctx)
 	cl, flush := test.NewRedis(ctx, append(redisNamespace[:], "application-uplinks")...)
-	q := redis.NewApplicationUplinkQueue(cl, 100, redisConsumerGroup, 0, testStreamBlockLimit())
+	q := redis.NewApplicationUplinkQueue(cl, 100, redisConsumerGroup, 0)
 	if err := q.Init(ctx); err != nil {
 		tb.Fatalf("Failed to initialize Redis application uplink queue: %s", test.FormatError(err))
 	}
diff --git a/pkg/networkserver/networkserver.go b/pkg/networkserver/networkserver.go
index b0335dbe3d..306df5244b 100644
--- a/pkg/networkserver/networkserver.go
+++ b/pkg/networkserver/networkserver.go
@@ -20,7 +20,6 @@ import (
 	"crypto/rand"
 	"fmt"
 	"os"
-	"sync"

 	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
 	"go.thethings.network/lorawan-stack/v3/pkg/cluster"
@@ -167,7 +166,6 @@ type NetworkServer struct {
 	newDevAddr      newDevAddrFunc
 	devAddrPrefixes devAddrPrefixesFunc

-	applicationServers *sync.Map // string -> *applicationUpStream
 	applicationUplinks ApplicationUplinkQueue

 	downlinkTasks DownlinkTaskQueue
@@ -282,7 +280,6 @@ func New(c *component.Component, conf *Config, opts ...Option) (*NetworkServer,
 		clusterID:           conf.ClusterID,
 		newDevAddr:          makeNewDevAddrFunc(devAddrPrefixes...),
 		devAddrPrefixes:     makeDevAddrPrefixesFunc(devAddrPrefixes...),
-		applicationServers:  &sync.Map{},
 		applicationUplinks:  conf.ApplicationUplinkQueue.Queue,
 		deduplicationWindow: makeWindowDurationFunc(conf.DeduplicationWindow),
 		collectionWindow:    makeWindowDurationFunc(conf.DeduplicationWindow + conf.CooldownWindow),
diff --git a/pkg/networkserver/redis/application_uplink_queue.go b/pkg/networkserver/redis/application_uplink_queue.go
index 6bef2f3c1c..ec9042ab8a 100644
--- a/pkg/networkserver/redis/application_uplink_queue.go
+++ b/pkg/networkserver/redis/application_uplink_queue.go
@@ -47,13 +47,12 @@ var (

 // ApplicationUplinkQueue is an implementation of ApplicationUplinkQueue.
 type ApplicationUplinkQueue struct {
-	redis            *ttnredis.Client
-	maxLen           int64
-	groupID          string
-	streamID         string
-	minIdle          time.Duration
-	streamBlockLimit time.Duration
-	consumers        sync.Map
+	redis     *ttnredis.Client
+	maxLen    int64
+	groupID   string
+	streamID  string
+	minIdle   time.Duration
+	consumers sync.Map
 }

 // NewApplicationUplinkQueue returns new application uplink queue.
@@ -62,16 +61,14 @@ func NewApplicationUplinkQueue(
 	maxLen int64,
 	groupID string,
 	minIdle time.Duration,
-	streamBlockLimit time.Duration,
 ) *ApplicationUplinkQueue {
 	return &ApplicationUplinkQueue{
-		redis:            cl,
-		maxLen:           maxLen,
-		groupID:          groupID,
-		streamID:         cl.Key("uplinks"),
-		minIdle:          minIdle,
-		streamBlockLimit: streamBlockLimit,
-		consumers:        sync.Map{},
+		redis:     cl,
+		maxLen:    maxLen,
+		groupID:   groupID,
+		streamID:  cl.Key("uplinks"),
+		minIdle:   minIdle,
+		consumers: sync.Map{},
 	}
 }

@@ -188,9 +185,10 @@ func addToBatch(
 	return nil
 }

-func (q *ApplicationUplinkQueue) processMessages(
+func (*ApplicationUplinkQueue) processMessages(
 	ctx context.Context,
 	msgs []redis.XMessage,
+	ack func(...string) error,
 	f func(context.Context, []*ttnpb.ApplicationUp) error,
 ) error {
 	batches := map[string]*contextualUplinkBatch{}
@@ -207,20 +205,15 @@ func (q *ApplicationUplinkQueue) processMessages(
 			return err
 		}
 	}
-	pipeliner := q.redis.Pipeline()
+	processedIDs := make([]string, 0, len(msgs))
 	for _, batch := range batches {
 		if err := f(batch.ctx, batch.uplinks); err != nil {
 			log.FromContext(ctx).WithError(err).Warn("Failed to process uplink batch")
 			continue // Do not confirm messages that failed to process.
 		}
-
-		pipeliner.XAck(ctx, q.streamID, q.groupID, batch.confirmIDs...)
-		pipeliner.XDel(ctx, q.streamID, batch.confirmIDs...)
-	}
-	if _, err := pipeliner.Exec(ctx); err != nil {
-		return ttnredis.ConvertError(err)
+		processedIDs = append(processedIDs, batch.confirmIDs...)
 	}
-	return nil
+	return ack(processedIDs...)
 }

 // Pop implements ApplicationUplinkQueue interface.
@@ -229,33 +222,16 @@ func (q *ApplicationUplinkQueue) Pop(
 	f func(context.Context, []*ttnpb.ApplicationUp) error,
 ) error {
 	q.consumers.Store(consumerID, struct{}{})
-
-	msgs, _, err := q.redis.XAutoClaim(ctx, &redis.XAutoClaimArgs{
-		Group:    q.groupID,
-		Consumer: consumerID,
-		Stream:   q.streamID,
-		Start:    "-",
-		MinIdle:  q.minIdle,
-		Count:    int64(limit),
-	}).Result()
-	if err != nil && !errors.Is(err, redis.Nil) {
-		return ttnredis.ConvertError(err)
-	}
-
-	remainingCount := limit - len(msgs)
-	streams, err := q.redis.XReadGroup(ctx, &redis.XReadGroupArgs{
-		Group:    q.groupID,
-		Consumer: consumerID,
-		Streams:  []string{q.streamID, ">"},
-		Count:    int64(remainingCount),
-		Block:    q.streamBlockLimit,
-	}).Result()
-	if err != nil && !errors.Is(err, redis.Nil) {
-		return ttnredis.ConvertError(err)
-	}
-	if len(streams) > 0 {
-		stream := streams[0]
-		msgs = append(msgs, stream.Messages...)
-	}
-	return q.processMessages(ctx, msgs, f)
+	return ttnredis.RangeStreams(
+		ctx,
+		q.redis,
+		q.groupID,
+		consumerID,
+		int64(limit),
+		q.minIdle,
+		func(_ string, ack func(...string) error, msgs ...redis.XMessage) error {
+			return q.processMessages(ctx, msgs, ack, f)
+		},
+		q.streamID,
+	)
 }
diff --git a/pkg/networkserver/redis/application_uplink_queue_test.go b/pkg/networkserver/redis/application_uplink_queue_test.go
index e9ec25c919..f66834ebca 100644
--- a/pkg/networkserver/redis/application_uplink_queue_test.go
+++ b/pkg/networkserver/redis/application_uplink_queue_test.go
@@ -38,24 +38,23 @@ var (
 	redisNamespace = [...]string{
 		"redis_test_uplink_queue",
 	}
-	readLimit        = 7
-	maxLen           = int64(100)
-	groupID          = "ns-test"
-	minIdle          = (1 << 8) * test.Delay
-	streamBlockLimit = (1 << 6) * test.Delay
+	readLimit = 7
+	maxLen    = int64(100)
+	groupID   = "ns-test"
+	minIdle   = (1 << 8) * test.Delay

 	appCount       = 5
 	devCountPerApp = 3
 )

 func setupRedisApplicationUplinkQueue(
-	t *testing.T, cl *ttnredis.Client, minIdle, streamBlockLimit time.Duration,
+	t *testing.T, cl *ttnredis.Client, minIdle time.Duration,
 ) (*nsredis.ApplicationUplinkQueue, func()) {
 	t.Helper()

 	_, ctx := test.New(t)

-	q := nsredis.NewApplicationUplinkQueue(cl, maxLen, groupID, minIdle, streamBlockLimit)
+	q := nsredis.NewApplicationUplinkQueue(cl, maxLen, groupID, minIdle)

 	return q, func() {
 		if err := q.Close(ctx); err != nil {
@@ -71,7 +70,7 @@ func TestApplicationUplinkQueueInit(t *testing.T) {
 	cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "init")...)
 	t.Cleanup(redisCloseFn)

-	q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
+	q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle)
 	t.Cleanup(qCloseFn)

 	if !a.So(q.Init(ctx), should.BeNil) {
@@ -95,7 +94,7 @@ func TestApplicationUplinkQueueClose(t *testing.T) {
 	cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "close")...)
 	t.Cleanup(redisCloseFn)

-	q, _ := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
+	q, _ := setupRedisApplicationUplinkQueue(t, cl, minIdle)

 	if !a.So(q.Init(ctx), should.BeNil) {
 		t.FailNow()
@@ -196,7 +195,7 @@ func TestApplicationUplinkQueueAdd(t *testing.T) {
 	cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "add")...)
 	t.Cleanup(redisCloseFn)

-	q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
+	q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle)
 	t.Cleanup(qCloseFn)

 	if !a.So(q.Init(ctx), should.BeNil) {
@@ -246,7 +245,7 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) {
 	cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_all")...)
 	t.Cleanup(redisCloseFn)

-	q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
+	q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle)
 	t.Cleanup(qCloseFn)

 	if !a.So(q.Init(ctx), should.BeNil) {
@@ -305,7 +304,7 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) {
 	cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_err")...)
 	t.Cleanup(redisCloseFn)

-	q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
+	q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle)
 	t.Cleanup(qCloseFn)

 	if !a.So(q.Init(ctx), should.BeNil) {
@@ -374,7 +373,7 @@ func TestApplicationUplinkQueueClaiming(t *testing.T) {
 	cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "claiming")...)
 	t.Cleanup(redisCloseFn)

-	q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, 0, streamBlockLimit)
+	q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, 0)
 	t.Cleanup(qCloseFn)

 	if !a.So(q.Init(ctx), should.BeNil) {
@@ -416,11 +415,9 @@ func TestApplicationUplinkQueueClaiming(t *testing.T) {
 	}

 	actual := make([]*ttnpb.ApplicationUp, 0, len(expected))
-	invokeCount := 0
 	consumerID2 := fmt.Sprintf("test-consumer-%d", 2)
 	err = q.Pop(ctx, consumerID2, 100, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error {
 		assertAllEqualAppIDs(t, ups)
-		invokeCount++
 		actual = append(actual, ups...)
 		return nil
 	})
@@ -429,7 +426,6 @@ func TestApplicationUplinkQueueClaiming(t *testing.T) {
 	}

 	a.So(err, should.BeNil)
-	a.So(invokeCount, should.Equal, appCount)
 	a.So(actual, should.HaveLength, len(expected))
 	assertStreamUplinkCount(t, cl, 0)
 }
diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go
index 1f88355d9e..25da7c615c 100644
--- a/pkg/redis/redis.go
+++ b/pkg/redis/redis.go
@@ -885,17 +885,23 @@ func LockedWatch(ctx context.Context, r WatchCmdable, k, id string, expiration t
 	return nil
 }

-// RangeStreams sequentially iterates over all non-acknowledged messages in streams calling f with at most count messages.
+// RangeStreams sequentially iterates over all non-acknowledged messages in streams calling f with at most count
+// messages. f must acknowledge the messages which have been processed.
 // RangeStreams assumes that within its lifetime it is the only consumer within group group using ID id.
 // RangeStreams iterates over all pending messages, which have been idle for at least minIdle milliseconds first.
-func RangeStreams(ctx context.Context, r redis.Cmdable, group, id string, count int64, minIdle time.Duration, f func(string, ...redis.XMessage) error, streams ...string) error {
-	var ack func(context.Context, string, ...redis.XMessage) error
-	{
-		ids := make([]string, 0, int(count))
-		ack = func(ctx context.Context, stream string, msgs ...redis.XMessage) error {
-			ids = ids[:0]
-			for _, msg := range msgs {
-				ids = append(ids, msg.ID)
+func RangeStreams(
+	ctx context.Context,
+	r redis.Cmdable,
+	group, id string,
+	count int64,
+	minIdle time.Duration,
+	f func(string, func(...string) error, ...redis.XMessage) error,
+	streams ...string,
+) error {
+	makeAck := func(stream string) func(...string) error {
+		return func(ids ...string) error {
+			if len(ids) == 0 {
+				return nil
 			}
 			_, err := r.Pipelined(ctx, func(p redis.Pipeliner) error {
 				// NOTE: Both calls below copy contents of ids internally.
@@ -903,13 +909,17 @@ func RangeStreams(ctx context.Context, r redis.Cmdable, group, id string, count
 				p.XDel(ctx, stream, ids...)
 				return nil
 			})
-			return err
+			if err != nil {
+				return ConvertError(err)
+			}
+			return nil
 		}
 	}

 	for _, stream := range streams {
-		for start := "-"; ; {
-			msgs, lastID, err := r.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+		for start := "-"; start != "0-0"; {
+			var err error
+			_, start, err = r.XAutoClaimJustID(ctx, &redis.XAutoClaimArgs{
 				Stream:   stream,
 				Group:    group,
 				Consumer: id,
@@ -918,38 +928,27 @@ func RangeStreams(ctx context.Context, r redis.Cmdable, group, id string, count
 				Count:    count,
 			}).Result()
 			if err != nil {
-				return err
-			}
-			if len(msgs) == 0 {
-				break
-			}
-			if err := f(stream, msgs...); err != nil {
-				return err
+				return ConvertError(err)
 			}
-			if err := ack(ctx, stream, msgs...); err != nil {
-				return err
-			}
-			start = lastID
 		}
 	}

 	streamCount := len(streams)
-	streamsArg := make([]string, 2*streamCount)
-	idsArg := make([]string, 2*streamCount)
-	for i, stream := range streams {
-		j := i * 2
-		streamsArg[j], streamsArg[j+1], idsArg[j], idsArg[j+1] = stream, stream, "0", ">"
+	args := make([]string, 2*streamCount)
+	streamsArg := args[:streamCount]
+	idsArg := args[streamCount:]
+	for i := range streams {
+		streamsArg[i], idsArg[i] = streams[i], "0"
 	}
-	drainedOld := make(map[string]struct{}, streamCount)
-outer:
+	finishedOld, block := false, time.Duration(-1)
 	for {
 		rets, err := r.XReadGroup(ctx, &redis.XReadGroupArgs{
 			Group:    group,
 			Consumer: id,
-			Streams:  append(streamsArg, idsArg...),
+			Streams:  args,
 			Count:    count,
-			Block:    -1, // do not block
+			Block:    block,
 		}).Result()
 		if err != nil {
 			if errors.Is(err, redis.Nil) {
@@ -958,38 +957,33 @@ outer:
 			return ConvertError(err)
 		}

+		cont := false
 		for i, ret := range rets {
-			if idsArg[i] == "0" && len(ret.Messages) < int(count) {
-				drainedOld[ret.Stream] = struct{}{}
-			}
-			if len(ret.Messages) == 0 {
-				if i == len(rets)-1 {
-					return nil
-				}
+			n := int64(len(ret.Messages))
+			if n == 0 {
 				continue
 			}
-
-			if err := f(ret.Stream, ret.Messages...); err != nil {
+			cont = cont || n == count
+			idsArg[i] = ret.Messages[len(ret.Messages)-1].ID
+			if err := f(ret.Stream, makeAck(ret.Stream), ret.Messages...); err != nil {
 				return err
 			}
-			if err := ack(ctx, ret.Stream, ret.Messages...); err != nil {
-				return err
-			}
-			if len(ret.Messages) == int(count) {
-				continue outer
-			}
 		}
-		streamsArg = streamsArg[:0]
-		idsArg = idsArg[:0]
-		for _, stream := range streams {
-			_, ok := drainedOld[stream]
-			if !ok {
-				streamsArg = append(streamsArg, stream)
-				idsArg = append(idsArg, "0")
+		switch {
+		case cont:
+			// At least one stream has returned `count` messages.
+		case finishedOld:
+			// All streams have returned less than `count` messages,
+			// and we have already processed all of the old and new messages.
+			return nil
+		default: // !cont && !finishedOld
+			// All streams have returned less than `count` messages,
+			// and we have processed all of the old messages.
+			finishedOld, block = true, minIdle
+			for i := range streams {
+				idsArg[i] = ">"
 			}
-			streamsArg = append(streamsArg, stream)
-			idsArg = append(idsArg, ">")
 		}
 	}
 }
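Reviewer note: the reworked `RangeStreams` hands its callback an `ack` function and makes the callback responsible for acknowledging exactly the messages it processed; anything left unacknowledged stays in the pending entries list (PEL) and is reclaimed once it has been idle for `minIdle`. A minimal caller-side sketch of that contract, assuming go-redis v8 (imported as `redis` in this tree); `process`, `handleBatch`, `consume`, the `ns` group, and the `consumer-1` ID are illustrative, not part of this change:

```go
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"

	ttnredis "go.thethings.network/lorawan-stack/v3/pkg/redis"
)

// process stands in for real message handling.
func process(msg redis.XMessage) error {
	_, err := fmt.Println(msg.ID, msg.Values)
	return err
}

// handleBatch acknowledges only the messages it processed successfully;
// the rest stay in the PEL and are claimed again on a later pass.
func handleBatch(stream string, ack func(...string) error, msgs ...redis.XMessage) error {
	done := make([]string, 0, len(msgs))
	for _, msg := range msgs {
		if err := process(msg); err != nil {
			continue // Leave the message pending.
		}
		done = append(done, msg.ID)
	}
	return ack(done...) // XACK + XDEL in a single pipeline; no-op for zero IDs.
}

func consume(ctx context.Context, cl *ttnredis.Client) error {
	return ttnredis.RangeStreams(
		ctx, cl,
		"ns", "consumer-1", // group and consumer ID
		16,          // at most 16 messages per callback invocation
		time.Minute, // claim messages pending for at least a minute first
		handleBatch,
		cl.Key("uplinks"),
	)
}
```

Moving acknowledgement into the callback is what lets `processMessages` skip a failed batch while still confirming the batches that succeeded, instead of acking or failing the whole read.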
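The claim phase now uses `XAUTOCLAIM ... JUSTID` and leans on its cursor semantics: Redis replies with `0-0` as the next start ID once the whole PEL has been scanned, which is what terminates the `for start := "-"; start != "0-0";` loop. JUSTID transfers ownership without fetching message bodies or bumping delivery counters; the claimed entries are then read back by the subsequent `XREADGROUP` phase starting from ID `"0"`. A standalone sketch of that loop under the same go-redis v8 assumption (`claimPending` and its arguments are hypothetical):

```go
package example

import (
	"context"
	"time"

	"github.com/go-redis/redis/v8"
)

// claimPending transfers ownership of every message that has been pending for
// at least minIdle to this consumer. JUSTID returns only the claimed IDs, so
// the message bodies are fetched later by XREADGROUP reading from ID "0".
func claimPending(ctx context.Context, r redis.Cmdable, stream, group, consumer string, minIdle time.Duration) error {
	for start := "-"; start != "0-0"; {
		var err error
		// Result returns the claimed IDs and the cursor to resume from;
		// Redis replies with "0-0" once the whole PEL has been scanned.
		_, start, err = r.XAutoClaimJustID(ctx, &redis.XAutoClaimArgs{
			Stream:   stream,
			Group:    group,
			Consumer: consumer,
			MinIdle:  minIdle,
			Start:    start,
			Count:    64, // claim in chunks of 64
		}).Result()
		if err != nil {
			return err
		}
	}
	return nil
}
```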
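On the Network Server side, `Pop` now delegates claiming, reading, and blocking entirely to `ttnredis.RangeStreams` (which is why the `streamBlockLimit` knob disappears from the constructor and the config plumbing above), so a worker only supplies a consumer ID and a batch handler. A hypothetical worker loop; `runConsumer` and the poll interval are illustrative, not how the NS schedules its consumers:

```go
package example

import (
	"context"
	"fmt"
	"time"

	nsredis "go.thethings.network/lorawan-stack/v3/pkg/networkserver/redis"
	"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
)

// runConsumer drains application uplinks in batches of up to 100 until ctx
// is done. Returning an error from the handler leaves the batch pending, so
// it is claimed again once it has been idle for the queue's minIdle.
func runConsumer(ctx context.Context, q *nsredis.ApplicationUplinkQueue) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second): // hypothetical poll interval
		}
		err := q.Pop(ctx, "consumer-1", 100, func(_ context.Context, ups []*ttnpb.ApplicationUp) error {
			fmt.Printf("processing %d uplinks\n", len(ups))
			return nil
		})
		if err != nil {
			return err
		}
	}
}
```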