Skip to content

Commit

Permalink
Add DB subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
richardhuaaa committed Aug 21, 2024
1 parent b342aa5 commit 92f5ac8
Showing 1 changed file with 92 additions and 0 deletions.
92 changes: 92 additions & 0 deletions pkg/db/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package db

import (
"fmt"
"time"
)

type PollableDBQuery[ValueType any] func(lastSeenID int64, numRows int32) (results []ValueType, lastID int64, err error)

// Poll whenever notified, or at an interval if not notified
type PollingOptions struct {
Interval time.Duration
Notifier <-chan bool
NumRows int32
}

type DBSubscription[ValueType any] struct {
lastSeenID int64
options PollingOptions
query PollableDBQuery[ValueType]
updates chan<- []ValueType
}

func NewDBSubscription[ValueType any](
query PollableDBQuery[ValueType],
lastSeenID int64,
options PollingOptions,
) *DBSubscription[ValueType] {
return &DBSubscription[ValueType]{
lastSeenID: lastSeenID,
options: options,
query: query,
updates: nil,
}
}

func (s *DBSubscription[ValueType]) Start() (<-chan []ValueType, error) {
var err error
if s.updates != nil {
return nil, fmt.Errorf("Already started")
}
updates := make(chan []ValueType)
s.updates = updates

go func() {
if err := s.poll(); err != nil {
panic("TODO(rich)")
}

for {
select {
case <-s.options.Notifier:
if err = s.poll(); err != nil {
panic("TODO(rich)")
}
case <-time.After(s.options.Interval):
if err = s.poll(); err != nil {
panic("TODO(rich)")
}
}
}

}()

return updates, nil
}

func (s *DBSubscription[ValueType]) poll() error {
for {
results, lastID, err := s.query(s.lastSeenID, s.options.NumRows)
if err != nil {
return err
}
s.lastSeenID = lastID
s.updates <- results
if int32(len(results)) < s.options.NumRows {
break
}
}

return nil
}

// One channel or multiple?

// Subscriptions:
// - One for the publish worker on the staged table
// - One giant one for the entire envelope table
// - For each envelope, figure out which subscriptions to push it on?
// - Look at Tsachi's PR
// Do we just select * by ID on the table, or do we need to be able to filter?
// - Could look at this later

0 comments on commit 92f5ac8

Please sign in to comment.