Skip to content

Commit

Permalink
ns: Append uplinks synchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
adriansmares committed Oct 19, 2023
1 parent 5ce148e commit 64e4991
Showing 1 changed file with 37 additions and 59 deletions.
96 changes: 37 additions & 59 deletions pkg/networkserver/redis/application_uplink_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) {
}

consumerCount := 3
uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount)
errCh := make(chan error, consumerCount)
uplinkCh := make(chan []*ttnpb.ApplicationUp, consumerCount*appCount)
wg := sync.WaitGroup{}

for i := 0; i < consumerCount; i++ {
Expand All @@ -269,42 +268,31 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) {
go func() {
defer wg.Done()

errCh <- q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error {
a.So(q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error {
assertAllEqualAppIDs(t, ups)
uplinkCh <- ups
select {
case <-ctx.Done():
return ctx.Err()
case uplinkCh <- ups:
}
return nil
})
}), should.BeNil)
}()
}
wg.Wait()

actual := make([]*ttnpb.ApplicationUp, 0, len(expected))
var err error

go func() {
for {
select {
case ups := <-uplinkCh:
actual = append(actual, ups...)
case <-ctx.Done():
errCh <- ctx.Err()
}
}
}()

go func() {
for {
select {
case err = <-errCh:
return
case <-ctx.Done():
errCh <- ctx.Err()
}
outer:
for {
select {
case <-ctx.Done():
case ups := <-uplinkCh:
actual = append(actual, ups...)
default:
break outer
}
}()

wg.Wait()
}

a.So(err, should.BeNil)
a.So(actual, should.HaveLength, len(expected))
assertStreamUplinkCount(t, cl, 0)
}
Expand Down Expand Up @@ -337,7 +325,7 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) {
}

consumerCount := 3
uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount)
uplinkCh := make(chan []*ttnpb.ApplicationUp, consumerCount*appCount)
errCh := make(chan error, consumerCount)
wg := sync.WaitGroup{}

Expand All @@ -347,44 +335,34 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) {
go func() {
defer wg.Done()

errCh <- q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error {
a.So(q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error {
assertAllEqualAppIDs(t, ups)
uplinkCh <- ups
select {
case <-ctx.Done():
return ctx.Err()
case uplinkCh <- ups:
}
return generateError(ups)
})
}), should.BeNil)
}()
}
wg.Wait()

actual := make([]*ttnpb.ApplicationUp, 0, len(expected))
var err error

go func() {
for {
select {
case ups := <-uplinkCh:
actual = append(actual, ups...)
case <-ctx.Done():
errCh <- ctx.Err()
}
outer:
for {
select {
case ups := <-uplinkCh:
actual = append(actual, ups...)
case <-ctx.Done():
errCh <- ctx.Err()
default:
break outer
}
}()

go func() {
for {
select {
case err = <-errCh:
return
case <-ctx.Done():
errCh <- ctx.Err()
}
}
}()

wg.Wait()
}

expectedFailCount := devCountPerApp * 2

a.So(err, should.BeNil)
a.So(actual, should.HaveLength, len(expected)) // All uplinks should have been processed
assertStreamUplinkCount(t, cl, expectedFailCount) // Only failed uplinks should remain in the stream
}
Expand Down

0 comments on commit 64e4991

Please sign in to comment.