From 5ce148ebc263ee3710c32cc1374b17fea5a8262e Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Thu, 19 Oct 2023 14:15:44 +0200 Subject: [PATCH 1/2] ns: Fix application uplink queue test flake --- .../redis/application_uplink_queue_test.go | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/networkserver/redis/application_uplink_queue_test.go b/pkg/networkserver/redis/application_uplink_queue_test.go index d695cca9d2..fd0f1c848b 100644 --- a/pkg/networkserver/redis/application_uplink_queue_test.go +++ b/pkg/networkserver/redis/application_uplink_queue_test.go @@ -48,7 +48,7 @@ var ( devCountPerApp = 3 ) -func setupRedusApplicationUplinkQueue( +func setupRedisApplicationUplinkQueue( t *testing.T, cl *ttnredis.Client, minIdle, streamBlockLimit time.Duration, ) (*nsredis.ApplicationUplinkQueue, func()) { t.Helper() @@ -71,7 +71,7 @@ func TestApplicationUplinkQueueInit(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "init")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) + q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { @@ -95,7 +95,7 @@ func TestApplicationUplinkQueueClose(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "close")...) t.Cleanup(redisCloseFn) - q, _ := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) + q, _ := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) if !a.So(q.Init(ctx), should.BeNil) { t.FailNow() @@ -196,7 +196,7 @@ func TestApplicationUplinkQueueAdd(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "add")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) + q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { @@ -246,13 +246,18 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_all")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) + q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { t.FailNow() } + expected := generateRandomUplinks(t, appCount, devCountPerApp) + if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) { + t.FailNow() + } + consumerCount := 3 uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount) errCh := make(chan error, consumerCount) @@ -272,7 +277,6 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) { }() } - expected := generateRandomUplinks(t, appCount, devCountPerApp) actual := make([]*ttnpb.ApplicationUp, 0, len(expected)) var err error @@ -298,10 +302,6 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) { } }() - if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) { - t.FailNow() - } - wg.Wait() a.So(err, should.BeNil) @@ -316,7 +316,7 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_err")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) + q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { @@ -331,6 +331,11 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { return nil } + expected := generateRandomUplinks(t, appCount, devCountPerApp) + if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) { + t.FailNow() + } + consumerCount := 3 uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount) errCh := make(chan error, consumerCount) @@ -350,7 +355,6 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { }() } - expected := generateRandomUplinks(t, appCount, devCountPerApp) actual := make([]*ttnpb.ApplicationUp, 0, len(expected)) var err error @@ -376,10 +380,6 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { } }() - if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) { - t.FailNow() - } - wg.Wait() expectedFailCount := devCountPerApp * 2 @@ -396,7 +396,7 @@ func TestApplicationUplinkQueueClaiming(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "claiming")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, 0, streamBlockLimit) + q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, 0, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { From 2e953e2128f9ed81e7b185345b57f407670c92b1 Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Thu, 19 Oct 2023 15:09:54 +0200 Subject: [PATCH 2/2] ns: Append uplinks synchronously --- .../redis/application_uplink_queue_test.go | 98 +++++++------------ 1 file changed, 38 insertions(+), 60 deletions(-) diff --git a/pkg/networkserver/redis/application_uplink_queue_test.go b/pkg/networkserver/redis/application_uplink_queue_test.go index fd0f1c848b..e9ec25c919 100644 --- a/pkg/networkserver/redis/application_uplink_queue_test.go +++ b/pkg/networkserver/redis/application_uplink_queue_test.go @@ -259,8 +259,7 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) { } consumerCount := 3 - uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount) - errCh := make(chan error, consumerCount) + uplinkCh := make(chan []*ttnpb.ApplicationUp, consumerCount*appCount) wg := sync.WaitGroup{} for i := 0; i < consumerCount; i++ { @@ -269,42 +268,32 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) { go func() { defer wg.Done() - errCh <- q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { + a.So(q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { assertAllEqualAppIDs(t, ups) - uplinkCh <- ups + select { + case <-ctx.Done(): + return ctx.Err() + case uplinkCh <- ups: + } return nil - }) + }), should.BeNil) }() } + wg.Wait() actual := make([]*ttnpb.ApplicationUp, 0, len(expected)) - var err error - - go func() { - for { - select { - case ups := <-uplinkCh: - actual = append(actual, ups...) - case <-ctx.Done(): - errCh <- ctx.Err() - } +outer: + for { + select { + case <-ctx.Done(): + break outer + case ups := <-uplinkCh: + actual = append(actual, ups...) + default: + break outer } - }() - - go func() { - for { - select { - case err = <-errCh: - return - case <-ctx.Done(): - errCh <- ctx.Err() - } - } - }() - - wg.Wait() + } - a.So(err, should.BeNil) a.So(actual, should.HaveLength, len(expected)) assertStreamUplinkCount(t, cl, 0) } @@ -337,8 +326,7 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { } consumerCount := 3 - uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount) - errCh := make(chan error, consumerCount) + uplinkCh := make(chan []*ttnpb.ApplicationUp, consumerCount*appCount) wg := sync.WaitGroup{} for i := 0; i < consumerCount; i++ { @@ -347,44 +335,34 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { go func() { defer wg.Done() - errCh <- q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { + a.So(q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { assertAllEqualAppIDs(t, ups) - uplinkCh <- ups + select { + case <-ctx.Done(): + return ctx.Err() + case uplinkCh <- ups: + } return generateError(ups) - }) + }), should.BeNil) }() } + wg.Wait() actual := make([]*ttnpb.ApplicationUp, 0, len(expected)) - var err error - - go func() { - for { - select { - case ups := <-uplinkCh: - actual = append(actual, ups...) - case <-ctx.Done(): - errCh <- ctx.Err() - } +outer: + for { + select { + case <-ctx.Done(): + break outer + case ups := <-uplinkCh: + actual = append(actual, ups...) + default: + break outer } - }() - - go func() { - for { - select { - case err = <-errCh: - return - case <-ctx.Done(): - errCh <- ctx.Err() - } - } - }() - - wg.Wait() + } expectedFailCount := devCountPerApp * 2 - a.So(err, should.BeNil) a.So(actual, should.HaveLength, len(expected)) // All uplinks should have been processed assertStreamUplinkCount(t, cl, expectedFailCount) // Only failed uplinks should remain in the stream }