diff --git a/pkg/networkserver/redis/application_uplink_queue_test.go b/pkg/networkserver/redis/application_uplink_queue_test.go index fd0f1c848b0..d391e13b801 100644 --- a/pkg/networkserver/redis/application_uplink_queue_test.go +++ b/pkg/networkserver/redis/application_uplink_queue_test.go @@ -259,8 +259,7 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) { } consumerCount := 3 - uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount) - errCh := make(chan error, consumerCount) + uplinkCh := make(chan []*ttnpb.ApplicationUp, consumerCount*appCount) wg := sync.WaitGroup{} for i := 0; i < consumerCount; i++ { @@ -269,42 +268,31 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) { go func() { defer wg.Done() - errCh <- q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { + a.So(q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { assertAllEqualAppIDs(t, ups) - uplinkCh <- ups + select { + case <-ctx.Done(): + return ctx.Err() + case uplinkCh <- ups: + } return nil - }) + }), should.BeNil) }() } + wg.Wait() actual := make([]*ttnpb.ApplicationUp, 0, len(expected)) - var err error - - go func() { - for { - select { - case ups := <-uplinkCh: - actual = append(actual, ups...) - case <-ctx.Done(): - errCh <- ctx.Err() - } - } - }() - - go func() { - for { - select { - case err = <-errCh: - return - case <-ctx.Done(): - errCh <- ctx.Err() - } +outer: + for { + select { + case <-ctx.Done(): + case ups := <-uplinkCh: + actual = append(actual, ups...) + default: + break outer } - }() - - wg.Wait() + } - a.So(err, should.BeNil) a.So(actual, should.HaveLength, len(expected)) assertStreamUplinkCount(t, cl, 0) } @@ -337,7 +325,7 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { } consumerCount := 3 - uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount) + uplinkCh := make(chan []*ttnpb.ApplicationUp, consumerCount*appCount) errCh := make(chan error, consumerCount) wg := sync.WaitGroup{} @@ -347,44 +335,34 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { go func() { defer wg.Done() - errCh <- q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { + a.So(q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { assertAllEqualAppIDs(t, ups) - uplinkCh <- ups + select { + case <-ctx.Done(): + return ctx.Err() + case uplinkCh <- ups: + } return generateError(ups) - }) + }), should.BeNil) }() } + wg.Wait() actual := make([]*ttnpb.ApplicationUp, 0, len(expected)) - var err error - - go func() { - for { - select { - case ups := <-uplinkCh: - actual = append(actual, ups...) - case <-ctx.Done(): - errCh <- ctx.Err() - } +outer: + for { + select { + case ups := <-uplinkCh: + actual = append(actual, ups...) + case <-ctx.Done(): + errCh <- ctx.Err() + default: + break outer } - }() - - go func() { - for { - select { - case err = <-errCh: - return - case <-ctx.Done(): - errCh <- ctx.Err() - } - } - }() - - wg.Wait() + } expectedFailCount := devCountPerApp * 2 - a.So(err, should.BeNil) a.So(actual, should.HaveLength, len(expected)) // All uplinks should have been processed assertStreamUplinkCount(t, cl, expectedFailCount) // Only failed uplinks should remain in the stream }