Skip to content

Commit

Permalink
Use GRPC client, fix service cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
richardhuaaa committed Sep 16, 2024
1 parent 942e86b commit ee1edb4
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 49 deletions.
1 change: 1 addition & 0 deletions pkg/api/publishWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func StartPublishWorker(
reg *registrant.Registrant,
store *sql.DB,
) (*PublishWorker, error) {
log = log.Named("publishWorker")
q := queries.New(store)
query := func(ctx context.Context, lastSeenID int64, numRows int32) ([]queries.StagedOriginatorEnvelope, int64, error) {
results, err := q.SelectStagedOriginatorEnvelopes(
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func TestPublishEnvelope(t *testing.T) {
svc, db, cleanup := testutils.NewTestService(t)
svc, db, cleanup := testutils.NewTestAPIService(t)
defer cleanup()

resp, err := svc.PublishEnvelope(
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestPublishEnvelope(t *testing.T) {
}

func TestUnmarshalErrorOnPublish(t *testing.T) {
svc, _, cleanup := testutils.NewTestService(t)
svc, _, cleanup := testutils.NewTestAPIService(t)
defer cleanup()

envelope := testutils.CreatePayerEnvelope(t)
Expand All @@ -69,7 +69,7 @@ func TestUnmarshalErrorOnPublish(t *testing.T) {
}

func TestMismatchingOriginatorOnPublish(t *testing.T) {
svc, _, cleanup := testutils.NewTestService(t)
svc, _, cleanup := testutils.NewTestAPIService(t)
defer cleanup()

clientEnv := testutils.CreateClientEnvelope()
Expand All @@ -84,7 +84,7 @@ func TestMismatchingOriginatorOnPublish(t *testing.T) {
}

func TestMissingTopicOnPublish(t *testing.T) {
svc, _, cleanup := testutils.NewTestService(t)
svc, _, cleanup := testutils.NewTestAPIService(t)
defer cleanup()

clientEnv := testutils.CreateClientEnvelope()
Expand Down
12 changes: 6 additions & 6 deletions pkg/api/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func setupQueryTest(t *testing.T, db *sql.DB) []queries.InsertGatewayEnvelopePar
}

func TestQueryAllEnvelopes(t *testing.T) {
svc, db, cleanup := testutils.NewTestService(t)
svc, db, cleanup := testutils.NewTestAPIService(t)
defer cleanup()
db_rows := setupQueryTest(t, db)

Expand All @@ -80,7 +80,7 @@ func TestQueryAllEnvelopes(t *testing.T) {
}

func TestQueryPagedEnvelopes(t *testing.T) {
svc, db, cleanup := testutils.NewTestService(t)
svc, db, cleanup := testutils.NewTestAPIService(t)
defer cleanup()
db_rows := setupQueryTest(t, db)

Expand All @@ -96,7 +96,7 @@ func TestQueryPagedEnvelopes(t *testing.T) {
}

func TestQueryEnvelopesByOriginator(t *testing.T) {
svc, db, cleanup := testutils.NewTestService(t)
svc, db, cleanup := testutils.NewTestAPIService(t)
defer cleanup()
db_rows := setupQueryTest(t, db)

Expand All @@ -117,7 +117,7 @@ func TestQueryEnvelopesByOriginator(t *testing.T) {
}

func TestQueryEnvelopesByTopic(t *testing.T) {
svc, db, cleanup := testutils.NewTestService(t)
svc, db, cleanup := testutils.NewTestAPIService(t)
defer cleanup()
db_rows := setupQueryTest(t, db)

Expand All @@ -136,7 +136,7 @@ func TestQueryEnvelopesByTopic(t *testing.T) {
}

func TestQueryEnvelopesFromLastSeen(t *testing.T) {
svc, db, cleanup := testutils.NewTestService(t)
svc, db, cleanup := testutils.NewTestAPIService(t)
defer cleanup()
db_rows := setupQueryTest(t, db)

Expand All @@ -155,7 +155,7 @@ func TestQueryEnvelopesFromLastSeen(t *testing.T) {
}

func TestQueryEnvelopesWithEmptyResult(t *testing.T) {
svc, db, cleanup := testutils.NewTestService(t)
svc, db, cleanup := testutils.NewTestAPIService(t)
defer cleanup()
db_rows := setupQueryTest(t, db)

Expand Down
11 changes: 11 additions & 0 deletions pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ func (s *ApiServer) Addr() net.Addr {
return s.grpcListener.Addr()
}

func (s *ApiServer) DialGRPC(ctx context.Context) (*grpc.ClientConn, error) {
// https://github.com/grpc/grpc/blob/master/doc/naming.md
dialAddr := fmt.Sprintf("passthrough://localhost/%s", s.grpcListener.Addr().String())
return grpc.DialContext(

Check failure on line 112 in pkg/api/server.go

View workflow job for this annotation

GitHub Actions / Lint

SA1019: grpc.DialContext is deprecated: use NewClient instead. Will be supported throughout 1.x. (staticcheck)
ctx,
dialAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(),
)
}

func (s *ApiServer) gracefulShutdown(timeout time.Duration) {
ctx, cancel := context.WithCancel(context.Background())
// Attempt to use GracefulStop up until the timeout
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *DBSubscription[ValueType, CursorType]) poll() {
break
} else if err != nil {
s.log.Error(
"Error querying for DB subscription",
fmt.Sprintf("Error querying for DB subscription: %v", err),
zap.Any("lastSeen", s.lastSeen),
zap.Int32("numRows", s.options.NumRows),
)
Expand Down
81 changes: 81 additions & 0 deletions pkg/testutils/api_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package testutils

import (
"context"
"database/sql"
"testing"

"github.com/ethereum/go-ethereum/crypto"
"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/api"
"github.com/xmtp/xmtpd/pkg/db/queries"
mocks "github.com/xmtp/xmtpd/pkg/mocks/registry"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/message_api"
"github.com/xmtp/xmtpd/pkg/registrant"
"github.com/xmtp/xmtpd/pkg/registry"
)

func NewTestAPIServer(t *testing.T) (*api.ApiServer, *sql.DB, func()) {
ctx, cancel := context.WithCancel(context.Background())
log := NewLog(t)
db, _, dbCleanup := NewDB(t, ctx)
privKey, err := crypto.GenerateKey()
require.NoError(t, err)
privKeyStr := "0x" + HexEncode(crypto.FromECDSA(privKey))
mockRegistry := mocks.NewMockNodeRegistry(t)
mockRegistry.EXPECT().GetNodes().Return([]registry.Node{
{NodeID: 1, SigningKey: &privKey.PublicKey},
}, nil)
registrant, err := registrant.NewRegistrant(ctx, queries.New(db), mockRegistry, privKeyStr)
require.NoError(t, err)

svr, err := api.NewAPIServer(ctx, db, log, 0 /*port*/, registrant, true /*enableReflection*/)
require.NoError(t, err)

return svr, db, func() {
log.Info("-------- Cleaning up server and DB ----------")
cancel()
svr.Close()
dbCleanup()
}
}

func NewTestAPIService(t *testing.T) (*api.Service, *sql.DB, func()) {
ctx, cancel := context.WithCancel(context.Background())
log := NewLog(t)
db, _, dbCleanup := NewDB(t, ctx)
privKey, err := crypto.GenerateKey()
require.NoError(t, err)
privKeyStr := "0x" + HexEncode(crypto.FromECDSA(privKey))
mockRegistry := mocks.NewMockNodeRegistry(t)
mockRegistry.EXPECT().GetNodes().Return([]registry.Node{
{NodeID: 1, SigningKey: &privKey.PublicKey},
}, nil)
registrant, err := registrant.NewRegistrant(ctx, queries.New(db), mockRegistry, privKeyStr)
require.NoError(t, err)

svc, err := api.NewReplicationApiService(ctx, log, registrant, db)
require.NoError(t, err)

return svc, db, func() {
log.Info("-------- Cleaning up service and DB ----------")
cancel()
svc.Close()
dbCleanup()
}
}

func NewTestAPIClient(t *testing.T) (message_api.ReplicationApiClient, *sql.DB, func()) {
svc, db, cleanup := NewTestAPIServer(t)
conn, err := svc.DialGRPC(context.Background())
require.NoError(t, err)
client := message_api.NewReplicationApiClient(conn)

return client, db, func() {
log.Info("-------- Closing client connection ----------")
conn.Close()
require.NoError(t, err)
cleanup()
}
}
38 changes: 0 additions & 38 deletions pkg/testutils/service.go

This file was deleted.

0 comments on commit ee1edb4

Please sign in to comment.