Skip to content

Commit

Permalink
Add query to fetch initial vector clock
Browse files Browse the repository at this point in the history
  • Loading branch information
richardhuaaa committed Sep 12, 2024
1 parent 08f1870 commit f2c6b04
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 6 deletions.
4 changes: 4 additions & 0 deletions pkg/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func (s *Service) BatchSubscribeEnvelopes(
}

ch, err := s.subscribeWorker.subscribe(requests)
if err != nil {
// TODO(rich) Tidy error interface, validate before sending header
return err
}
defer func() {
// TODO(rich) Handle unsubscribe
// if sub != nil {
Expand Down
22 changes: 16 additions & 6 deletions pkg/api/subscribeWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ const (

type subscriber = chan<- []*message_api.OriginatorEnvelope

// A worker that listens for new envelopes in the DB and sends them to subscribers
// Assumes that there are many listeners - non-blocking updates are sent on buffered channels
// and may be dropped if full
type subscribeWorker struct {
ctx context.Context
log *zap.Logger
Expand All @@ -39,8 +42,7 @@ func startSubscribeWorker(
store *sql.DB,
) (*subscribeWorker, error) {
q := queries.New(store)
// Get vector clock from DB
query := func(ctx context.Context, lastSeen db.VectorClock, numRows int32) ([]queries.GatewayEnvelope, db.VectorClock, error) {
pollableQuery := func(ctx context.Context, lastSeen db.VectorClock, numRows int32) ([]queries.GatewayEnvelope, db.VectorClock, error) {
envs, err := q.
SelectGatewayEnvelopes(
ctx,
Expand All @@ -51,19 +53,26 @@ func startSubscribeWorker(
return nil, lastSeen, err
}
for _, env := range envs {
// TODO(rich) Handle out-of-order envelopes
lastSeen[uint32(env.OriginatorNodeID)] = uint64(env.OriginatorSequenceID)
}
return envs, lastSeen, nil
}

vc, err := q.SelectVectorClock(ctx)
if err != nil {
return nil, err
}

subscription := db.NewDBSubscription(
ctx,
log,
query,
db.VectorClock{}, // TODO(rich) fetch from DB
pollableQuery,
db.ToVectorClock(vc),
db.PollingOptions{
Interval: 100 * time.Millisecond,
NumRows: 100,
}, // TODO(rich) Make numRows nullable
NumRows: 10000,
},
)
dbChan, err := subscription.Start()
if err != nil {
Expand All @@ -89,6 +98,7 @@ func (s *subscribeWorker) start() {
case <-s.ctx.Done():
return
case new_batch := <-s.dbSubscription:
// Log batch size, performance
for _, row := range new_batch {
s.dispatch(&row)
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,12 @@ LIMIT @num_rows;
DELETE FROM staged_originator_envelopes
WHERE id = @id;

-- name: SelectVectorClock :many
SELECT
originator_node_id,
max(originator_sequence_id)::BIGINT AS originator_sequence_id
FROM
gateway_envelopes
GROUP BY
originator_node_id;

38 changes: 38 additions & 0 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/db/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type PollingOptions struct {
NumRows int32
}

// A subscription that polls a DB for updates
// Assumes there is only one listener (updates block on a single unbuffered channel)
type DBSubscription[ValueType any, CursorType any] struct {
ctx context.Context
log *zap.Logger
Expand Down
8 changes: 8 additions & 0 deletions pkg/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,11 @@ func SetVectorClock(
}
return q
}

func ToVectorClock(rows []queries.SelectVectorClockRow) VectorClock {
vc := make(VectorClock)
for _, row := range rows {
vc[uint32(row.OriginatorNodeID)] = uint64(row.OriginatorSequenceID)
}
return vc
}

0 comments on commit f2c6b04

Please sign in to comment.