Skip to content

Commit

Permalink
Add subscribe test
Browse files Browse the repository at this point in the history
  • Loading branch information
richardhuaaa committed Sep 16, 2024
1 parent cac71b4 commit 985e774
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pkg/api/subscribeWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func startSubscribeWorker(
log *zap.Logger,
store *sql.DB,
) (*subscribeWorker, error) {
log = log.Named("subscribeWorker")
q := queries.New(store)
pollableQuery := func(ctx context.Context, lastSeen db.VectorClock, numRows int32) ([]queries.GatewayEnvelope, db.VectorClock, error) {
envs, err := q.
Expand Down Expand Up @@ -80,7 +81,7 @@ func startSubscribeWorker(
}
worker := &subscribeWorker{
ctx: ctx,
log: log.Named("subscribeWorker"),
log: log,
dbSubscription: dbChan,
globalListeners: make([]subscriber, 0),
originatorListeners: make(map[uint32][]subscriber),
Expand Down
127 changes: 127 additions & 0 deletions pkg/api/subscribe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package api_test

import (
"context"
"database/sql"
"fmt"
"testing"

"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/message_api"
"github.com/xmtp/xmtpd/pkg/testutils"
)

var allRows = []queries.InsertGatewayEnvelopeParams{
// Initial rows
{
OriginatorNodeID: 1,
OriginatorSequenceID: 1,
Topic: []byte("topicA"),
OriginatorEnvelope: []byte("envelope1"),
},
{
OriginatorNodeID: 2,
OriginatorSequenceID: 1,
Topic: []byte("topicA"),
OriginatorEnvelope: []byte("envelope2"),
},
// Later rows
{
OriginatorNodeID: 1,
OriginatorSequenceID: 2,
Topic: []byte("topicA"),
OriginatorEnvelope: []byte("envelope3"),
},
{
OriginatorNodeID: 2,
OriginatorSequenceID: 2,
Topic: []byte("topicA"),
OriginatorEnvelope: []byte("envelope4"),
},
{
OriginatorNodeID: 1,
OriginatorSequenceID: 3,
Topic: []byte("topicA"),
OriginatorEnvelope: []byte("envelope5"),
},
}

func insertInitialRows(t *testing.T, store *sql.DB) {
testutils.InsertGatewayEnvelopes(t, store, []queries.InsertGatewayEnvelopeParams{
allRows[0], allRows[1],
})
}

func insertAdditionalRows(t *testing.T, store *sql.DB, notifyChan ...chan bool) {
testutils.InsertGatewayEnvelopes(t, store, []queries.InsertGatewayEnvelopeParams{
allRows[2], allRows[3], allRows[4],
}, notifyChan...)
}

func validateUpdates(
t *testing.T,
stream message_api.ReplicationApi_BatchSubscribeEnvelopesClient,
expectedIndices []int,
) {
for i := 0; i < len(expectedIndices); {
fmt.Println("waiting for update")
envs, err := stream.Recv()
fmt.Printf("got update of length %d\n", len(envs.Envelopes))
require.NoError(t, err)
for _, env := range envs.Envelopes {
expected := allRows[expectedIndices[i]].OriginatorEnvelope
require.Equal(t, expected, testutils.Marshal(t, env))
i++
}
}
}

func TestQAllEnvelopes(t *testing.T) {
t.Skip("skipping test")
client, db, cleanup := testutils.NewTestAPIClient(t)
defer cleanup()
insertInitialRows(t, db)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
res, err := client.QueryEnvelopes(
ctx,
&message_api.QueryEnvelopesRequest{
Query: &message_api.EnvelopesQuery{
Filter: nil,
LastSeen: &message_api.VectorClock{},
},
},
)
require.NoError(t, err)

insertAdditionalRows(t, db)
require.Equal(t, 5, len(res.Envelopes))
}

func TestSubscribeAllEnvelopes(t *testing.T) {
client, db, cleanup := testutils.NewTestAPIClient(t)
defer cleanup()
insertInitialRows(t, db)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.BatchSubscribeEnvelopes(
ctx,
&message_api.BatchSubscribeEnvelopesRequest{
Requests: []*message_api.BatchSubscribeEnvelopesRequest_SubscribeEnvelopesRequest{
{
Query: &message_api.EnvelopesQuery{
Filter: nil,
LastSeen: &message_api.VectorClock{},
},
},
},
},
)
require.NoError(t, err)

insertAdditionalRows(t, db)
validateUpdates(t, stream, []int{2, 3, 4})
}

0 comments on commit 985e774

Please sign in to comment.