Skip to content

Commit

Permalink
ns: Use RangeStreams in application uplink queue
Browse files Browse the repository at this point in the history
  • Loading branch information
adriansmares committed Oct 20, 2023
1 parent c3f5b94 commit 5878679
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 74 deletions.
1 change: 0 additions & 1 deletion cmd/ttn-lw-stack/commands/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ var startCommand = &cobra.Command{
int64(applicationUplinkQueueSize),
redisConsumerGroup,
time.Minute,
redis.DefaultStreamBlockLimit,
)
if err := applicationUplinkQueue.Init(ctx); err != nil {
return shared.ErrInitializeNetworkServer.WithCause(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkserver/internal/test/shared/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func testStreamBlockLimit() time.Duration {
func NewRedisApplicationUplinkQueue(ctx context.Context) (ApplicationUplinkQueue, func()) {
tb := test.MustTBFromContext(ctx)
cl, flush := test.NewRedis(ctx, append(redisNamespace[:], "application-uplinks")...)
q := redis.NewApplicationUplinkQueue(cl, 100, redisConsumerGroup, 0, testStreamBlockLimit())
q := redis.NewApplicationUplinkQueue(cl, 100, redisConsumerGroup, 0)
if err := q.Init(ctx); err != nil {
tb.Fatalf("Failed to initialize Redis application uplink queue: %s", test.FormatError(err))
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/networkserver/networkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"crypto/rand"
"fmt"
"os"
"sync"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"go.thethings.network/lorawan-stack/v3/pkg/cluster"
Expand Down Expand Up @@ -167,7 +166,6 @@ type NetworkServer struct {
newDevAddr newDevAddrFunc
devAddrPrefixes devAddrPrefixesFunc

applicationServers *sync.Map // string -> *applicationUpStream
applicationUplinks ApplicationUplinkQueue

downlinkTasks DownlinkTaskQueue
Expand Down Expand Up @@ -282,7 +280,6 @@ func New(c *component.Component, conf *Config, opts ...Option) (*NetworkServer,
clusterID: conf.ClusterID,
newDevAddr: makeNewDevAddrFunc(devAddrPrefixes...),
devAddrPrefixes: makeDevAddrPrefixesFunc(devAddrPrefixes...),
applicationServers: &sync.Map{},
applicationUplinks: conf.ApplicationUplinkQueue.Queue,
deduplicationWindow: makeWindowDurationFunc(conf.DeduplicationWindow),
collectionWindow: makeWindowDurationFunc(conf.DeduplicationWindow + conf.CooldownWindow),
Expand Down
82 changes: 29 additions & 53 deletions pkg/networkserver/redis/application_uplink_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ var (

// ApplicationUplinkQueue is an implementation of ApplicationUplinkQueue.
type ApplicationUplinkQueue struct {
redis *ttnredis.Client
maxLen int64
groupID string
streamID string
minIdle time.Duration
streamBlockLimit time.Duration
consumers sync.Map
redis *ttnredis.Client
maxLen int64
groupID string
streamID string
minIdle time.Duration
consumers sync.Map
}

// NewApplicationUplinkQueue returns new application uplink queue.
Expand All @@ -62,16 +61,14 @@ func NewApplicationUplinkQueue(
maxLen int64,
groupID string,
minIdle time.Duration,
streamBlockLimit time.Duration,
) *ApplicationUplinkQueue {
return &ApplicationUplinkQueue{
redis: cl,
maxLen: maxLen,
groupID: groupID,
streamID: cl.Key("uplinks"),
minIdle: minIdle,
streamBlockLimit: streamBlockLimit,
consumers: sync.Map{},
redis: cl,
maxLen: maxLen,
groupID: groupID,
streamID: cl.Key("uplinks"),
minIdle: minIdle,
consumers: sync.Map{},
}
}

Expand Down Expand Up @@ -188,9 +185,10 @@ func addToBatch(
return nil
}

func (q *ApplicationUplinkQueue) processMessages(
func (*ApplicationUplinkQueue) processMessages(
ctx context.Context,
msgs []redis.XMessage,
ack func(...string) error,
f func(context.Context, []*ttnpb.ApplicationUp) error,
) error {
batches := map[string]*contextualUplinkBatch{}
Expand All @@ -207,20 +205,15 @@ func (q *ApplicationUplinkQueue) processMessages(
return err
}
}
pipeliner := q.redis.Pipeline()
processedIDs := make([]string, 0, len(msgs))
for _, batch := range batches {
if err := f(batch.ctx, batch.uplinks); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to process uplink batch")
continue // Do not confirm messages that failed to process.
}

pipeliner.XAck(ctx, q.streamID, q.groupID, batch.confirmIDs...)
pipeliner.XDel(ctx, q.streamID, batch.confirmIDs...)
}
if _, err := pipeliner.Exec(ctx); err != nil {
return ttnredis.ConvertError(err)
processedIDs = append(processedIDs, batch.confirmIDs...)
}
return nil
return ack(processedIDs...)
}

// Pop implements ApplicationUplinkQueue interface.
Expand All @@ -229,33 +222,16 @@ func (q *ApplicationUplinkQueue) Pop(
f func(context.Context, []*ttnpb.ApplicationUp) error,
) error {
q.consumers.Store(consumerID, struct{}{})

msgs, _, err := q.redis.XAutoClaim(ctx, &redis.XAutoClaimArgs{
Group: q.groupID,
Consumer: consumerID,
Stream: q.streamID,
Start: "-",
MinIdle: q.minIdle,
Count: int64(limit),
}).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return ttnredis.ConvertError(err)
}

remainingCount := limit - len(msgs)
streams, err := q.redis.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: q.groupID,
Consumer: consumerID,
Streams: []string{q.streamID, ">"},
Count: int64(remainingCount),
Block: q.streamBlockLimit,
}).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return ttnredis.ConvertError(err)
}
if len(streams) > 0 {
stream := streams[0]
msgs = append(msgs, stream.Messages...)
}
return q.processMessages(ctx, msgs, f)
return ttnredis.RangeStreams(
ctx,
q.redis,
q.groupID,
consumerID,
int64(limit),
q.minIdle,
func(_ string, ack func(...string) error, msgs ...redis.XMessage) error {
return q.processMessages(ctx, msgs, ack, f)
},
q.streamID,
)
}
28 changes: 12 additions & 16 deletions pkg/networkserver/redis/application_uplink_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,23 @@ var (
redisNamespace = [...]string{
"redis_test_uplink_queue",
}
readLimit = 7
maxLen = int64(100)
groupID = "ns-test"
minIdle = (1 << 8) * test.Delay
streamBlockLimit = (1 << 6) * test.Delay
readLimit = 7
maxLen = int64(100)
groupID = "ns-test"
minIdle = (1 << 8) * test.Delay

appCount = 5
devCountPerApp = 3
)

func setupRedisApplicationUplinkQueue(
t *testing.T, cl *ttnredis.Client, minIdle, streamBlockLimit time.Duration,
t *testing.T, cl *ttnredis.Client, minIdle time.Duration,
) (*nsredis.ApplicationUplinkQueue, func()) {
t.Helper()

_, ctx := test.New(t)

q := nsredis.NewApplicationUplinkQueue(cl, maxLen, groupID, minIdle, streamBlockLimit)
q := nsredis.NewApplicationUplinkQueue(cl, maxLen, groupID, minIdle)

return q, func() {
if err := q.Close(ctx); err != nil {
Expand All @@ -71,7 +70,7 @@ func TestApplicationUplinkQueueInit(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "init")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand All @@ -95,7 +94,7 @@ func TestApplicationUplinkQueueClose(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "close")...)
t.Cleanup(redisCloseFn)

q, _ := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, _ := setupRedisApplicationUplinkQueue(t, cl, minIdle)

if !a.So(q.Init(ctx), should.BeNil) {
t.FailNow()
Expand Down Expand Up @@ -196,7 +195,7 @@ func TestApplicationUplinkQueueAdd(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "add")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand Down Expand Up @@ -246,7 +245,7 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_all")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand Down Expand Up @@ -305,7 +304,7 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_err")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand Down Expand Up @@ -374,7 +373,7 @@ func TestApplicationUplinkQueueClaiming(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "claiming")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, 0, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, 0)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand Down Expand Up @@ -416,11 +415,9 @@ func TestApplicationUplinkQueueClaiming(t *testing.T) {
}

actual := make([]*ttnpb.ApplicationUp, 0, len(expected))
invokeCount := 0
consumerID2 := fmt.Sprintf("test-consumer-%d", 2)
err = q.Pop(ctx, consumerID2, 100, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error {
assertAllEqualAppIDs(t, ups)
invokeCount++
actual = append(actual, ups...)
return nil
})
Expand All @@ -429,7 +426,6 @@ func TestApplicationUplinkQueueClaiming(t *testing.T) {
}

a.So(err, should.BeNil)
a.So(invokeCount, should.Equal, appCount)
a.So(actual, should.HaveLength, len(expected))
assertStreamUplinkCount(t, cl, 0)
}

0 comments on commit 5878679

Please sign in to comment.