Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix application uplink queue test flake #6643

Merged
merged 2 commits into from
Oct 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 53 additions & 75 deletions pkg/networkserver/redis/application_uplink_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
devCountPerApp = 3
)

func setupRedusApplicationUplinkQueue(
func setupRedisApplicationUplinkQueue(
t *testing.T, cl *ttnredis.Client, minIdle, streamBlockLimit time.Duration,
) (*nsredis.ApplicationUplinkQueue, func()) {
t.Helper()
Expand All @@ -71,7 +71,7 @@ func TestApplicationUplinkQueueInit(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "init")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand All @@ -95,7 +95,7 @@ func TestApplicationUplinkQueueClose(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "close")...)
t.Cleanup(redisCloseFn)

q, _ := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, _ := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)

if !a.So(q.Init(ctx), should.BeNil) {
t.FailNow()
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestApplicationUplinkQueueAdd(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "add")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand Down Expand Up @@ -246,16 +246,20 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_all")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
t.FailNow()
}

expected := generateRandomUplinks(t, appCount, devCountPerApp)
if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) {
t.FailNow()
}

consumerCount := 3
uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount)
errCh := make(chan error, consumerCount)
uplinkCh := make(chan []*ttnpb.ApplicationUp, consumerCount*appCount)
wg := sync.WaitGroup{}

for i := 0; i < consumerCount; i++ {
Expand All @@ -264,47 +268,32 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) {
go func() {
defer wg.Done()

errCh <- q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error {
a.So(q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error {
assertAllEqualAppIDs(t, ups)
uplinkCh <- ups
select {
case <-ctx.Done():
return ctx.Err()
case uplinkCh <- ups:
}
return nil
})
}), should.BeNil)
}()
}
wg.Wait()

expected := generateRandomUplinks(t, appCount, devCountPerApp)
actual := make([]*ttnpb.ApplicationUp, 0, len(expected))
var err error

go func() {
for {
select {
case ups := <-uplinkCh:
actual = append(actual, ups...)
case <-ctx.Done():
errCh <- ctx.Err()
}
outer:
for {
select {
case <-ctx.Done():
break outer
case ups := <-uplinkCh:
actual = append(actual, ups...)
default:
break outer
}
}()

go func() {
for {
select {
case err = <-errCh:
return
case <-ctx.Done():
errCh <- ctx.Err()
}
}
}()

if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) {
t.FailNow()
}

wg.Wait()

a.So(err, should.BeNil)
a.So(actual, should.HaveLength, len(expected))
assertStreamUplinkCount(t, cl, 0)
}
Expand All @@ -316,7 +305,7 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_err")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand All @@ -331,9 +320,13 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) {
return nil
}

expected := generateRandomUplinks(t, appCount, devCountPerApp)
if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) {
t.FailNow()
}

consumerCount := 3
uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount)
errCh := make(chan error, consumerCount)
uplinkCh := make(chan []*ttnpb.ApplicationUp, consumerCount*appCount)
wg := sync.WaitGroup{}

for i := 0; i < consumerCount; i++ {
Expand All @@ -342,49 +335,34 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) {
go func() {
defer wg.Done()

errCh <- q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error {
a.So(q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error {
assertAllEqualAppIDs(t, ups)
uplinkCh <- ups
select {
case <-ctx.Done():
return ctx.Err()
case uplinkCh <- ups:
}
return generateError(ups)
})
}), should.BeNil)
}()
}
wg.Wait()

expected := generateRandomUplinks(t, appCount, devCountPerApp)
actual := make([]*ttnpb.ApplicationUp, 0, len(expected))
var err error

go func() {
for {
select {
case ups := <-uplinkCh:
actual = append(actual, ups...)
case <-ctx.Done():
errCh <- ctx.Err()
}
outer:
for {
select {
case <-ctx.Done():
break outer
case ups := <-uplinkCh:
actual = append(actual, ups...)
default:
break outer
}
}()

go func() {
for {
select {
case err = <-errCh:
return
case <-ctx.Done():
errCh <- ctx.Err()
}
}
}()

if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) {
t.FailNow()
}

wg.Wait()

expectedFailCount := devCountPerApp * 2

a.So(err, should.BeNil)
a.So(actual, should.HaveLength, len(expected)) // All uplinks should have been processed
assertStreamUplinkCount(t, cl, expectedFailCount) // Only failed uplinks should remain in the stream
}
Expand All @@ -396,7 +374,7 @@ func TestApplicationUplinkQueueClaiming(t *testing.T) {
cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "claiming")...)
t.Cleanup(redisCloseFn)

q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, 0, streamBlockLimit)
q, qCloseFn := setupRedisApplicationUplinkQueue(t, cl, 0, streamBlockLimit)
t.Cleanup(qCloseFn)

if !a.So(q.Init(ctx), should.BeNil) {
Expand Down