diff --git a/pkg/networkserver/redis/application_uplink_queue_test.go b/pkg/networkserver/redis/application_uplink_queue_test.go index d695cca9d2..fd0f1c848b 100644 --- a/pkg/networkserver/redis/application_uplink_queue_test.go +++ b/pkg/networkserver/redis/application_uplink_queue_test.go @@ -48,7 +48,7 @@ var ( devCountPerApp = 3 ) -func setupRedusApplicationUplinkQueue( +func setupRedisApplicationUplinkQueue( t *testing.T, cl *ttnredis.Client, minIdle, streamBlockLimit time.Duration, ) (*nsredis.ApplicationUplinkQueue, func()) { t.Helper() @@ -71,7 +71,7 @@ func TestApplicationUplinkQueueInit(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "init")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) + q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { @@ -95,7 +95,7 @@ func TestApplicationUplinkQueueClose(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "close")...) t.Cleanup(redisCloseFn) - q, _ := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) + q, _ := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) if !a.So(q.Init(ctx), should.BeNil) { t.FailNow() @@ -196,7 +196,7 @@ func TestApplicationUplinkQueueAdd(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "add")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) + q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { @@ -246,13 +246,18 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_all")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) + q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { t.FailNow() } + expected := generateRandomUplinks(t, appCount, devCountPerApp) + if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) { + t.FailNow() + } + consumerCount := 3 uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount) errCh := make(chan error, consumerCount) @@ -272,7 +277,6 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) { }() } - expected := generateRandomUplinks(t, appCount, devCountPerApp) actual := make([]*ttnpb.ApplicationUp, 0, len(expected)) var err error @@ -298,10 +302,6 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) { } }() - if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) { - t.FailNow() - } - wg.Wait() a.So(err, should.BeNil) @@ -316,7 +316,7 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_err")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) + q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { @@ -331,6 +331,11 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { return nil } + expected := generateRandomUplinks(t, appCount, devCountPerApp) + if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) { + t.FailNow() + } + consumerCount := 3 uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount) errCh := make(chan error, consumerCount) @@ -350,7 +355,6 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { }() } - expected := generateRandomUplinks(t, appCount, devCountPerApp) actual := make([]*ttnpb.ApplicationUp, 0, len(expected)) var err error @@ -376,10 +380,6 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { } }() - if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) { - t.FailNow() - } - wg.Wait() expectedFailCount := devCountPerApp * 2 @@ -396,7 +396,7 @@ func TestApplicationUplinkQueueClaiming(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "claiming")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, 0, streamBlockLimit) + q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, 0, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) {