Skip to content

Commit

Permalink
ns: Fix application uplink queue test flake
Browse files Browse the repository at this point in the history
  • Loading branch information
adriansmares committed Oct 19, 2023
1 parent c69cb8c commit 5ce148e
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions pkg/networkserver/redis/application_uplink_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
devCountPerApp = 3
)

func setupRedusApplicationUplinkQueue(
func setupRedisApplicationUplinkQueue(
t *testing.T, cl *ttnredis.Client, minIdle, streamBlockLimit time.Duration,
) (*nsredis.ApplicationUplinkQueue, func()) {
t.Helper()
Expand All @@ -71,7 +71,7 @@ func TestApplicationUplinkQueueInit(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "init")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand All @@ -95,7 +95,7 @@ func TestApplicationUplinkQueueClose(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "close")...)
t.Cleanup(redisCloseFn)

q, _ := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, _ := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)

if !a.So(q.Init(ctx), should.BeNil) {
t.FailNow()
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestApplicationUplinkQueueAdd(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "add")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand Down Expand Up @@ -246,13 +246,18 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_all")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
t.FailNow()
}

expected := generateRandomUplinks(t, appCount, devCountPerApp)
if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) {
t.FailNow()
}

consumerCount := 3
uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount)
errCh := make(chan error, consumerCount)
Expand All @@ -272,7 +277,6 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) {
}()
}

expected := generateRandomUplinks(t, appCount, devCountPerApp)
actual := make([]*ttnpb.ApplicationUp, 0, len(expected))
var err error

Expand All @@ -298,10 +302,6 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) {
}
}()

if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) {
t.FailNow()
}

wg.Wait()

a.So(err, should.BeNil)
Expand All @@ -316,7 +316,7 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_err")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand All @@ -331,6 +331,11 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) {
return nil
}

expected := generateRandomUplinks(t, appCount, devCountPerApp)
if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) {
t.FailNow()
}

consumerCount := 3
uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount)
errCh := make(chan error, consumerCount)
Expand All @@ -350,7 +355,6 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) {
}()
}

expected := generateRandomUplinks(t, appCount, devCountPerApp)
actual := make([]*ttnpb.ApplicationUp, 0, len(expected))
var err error

Expand All @@ -376,10 +380,6 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) {
}
}()

if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) {
t.FailNow()
}

wg.Wait()

expectedFailCount := devCountPerApp * 2
Expand All @@ -396,7 +396,7 @@ func TestApplicationUplinkQueueClaiming(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "claiming")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, 0, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, 0, streamBlockLimit)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand Down

0 comments on commit 5ce148e

Please sign in to comment.