Skip to content

Commit

Permalink
Add DB subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
richardhuaaa committed Aug 23, 2024
1 parent 3879604 commit a100969
Showing 1 changed file with 88 additions and 0 deletions.
88 changes: 88 additions & 0 deletions pkg/db/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package db

import (
"context"
"fmt"
"time"
)

type PollableDBQuery[ValueType any] func(lastSeenID int64, numRows int32) (results []ValueType, lastID int64, err error)

// Poll whenever notified, or at an interval if not notified
type PollingOptions struct {
Interval time.Duration
Notifier <-chan bool
NumRows int32
}

type DBSubscription[ValueType any] struct {
ctx context.Context
lastSeenID int64
options PollingOptions
query PollableDBQuery[ValueType]
updates chan<- []ValueType
}

func NewDBSubscription[ValueType any](
ctx context.Context,
query PollableDBQuery[ValueType],
lastSeenID int64,
options PollingOptions,
) *DBSubscription[ValueType] {
return &DBSubscription[ValueType]{
ctx: ctx,
lastSeenID: lastSeenID,
options: options,
query: query,
updates: nil,
}
}

func (s *DBSubscription[ValueType]) Start() (<-chan []ValueType, error) {
var err error
if s.updates != nil {
return nil, fmt.Errorf("Already started")
}
updates := make(chan []ValueType)
s.updates = updates

go func() {
if err := s.poll(); err != nil {
panic("TODO(rich)")
}

for {
select {
case <-s.ctx.Done():
return
case <-s.options.Notifier:
if err = s.poll(); err != nil {
panic("TODO(rich)")
}
case <-time.After(s.options.Interval):
if err = s.poll(); err != nil {
panic("TODO(rich)")
}
}
}

}()

return updates, nil
}

func (s *DBSubscription[ValueType]) poll() error {
for {
results, lastID, err := s.query(s.lastSeenID, s.options.NumRows)
if err != nil {
return err
}
s.lastSeenID = lastID
s.updates <- results
if int32(len(results)) < s.options.NumRows {
break
}
}

return nil
}

0 comments on commit a100969

Please sign in to comment.