Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
jgongd committed Jun 3, 2024
1 parent 2112afe commit 2f26cec
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 55 deletions.
2 changes: 0 additions & 2 deletions master/internal/stream/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,6 @@ func ModelMakeFilter(spec *ModelSubscriptionSpec) (func(*ModelMsg) bool, error)
// return a closure around our copied maps
return func(msg *ModelMsg) bool {
// subscribed to model by this model_id?
fmt.Printf("model msg: %#v\n", msg)
fmt.Printf("modelIDs: %#v, workspaceIDs: %#v, userIDs: %#v\n", modelIDs, workspaceIDs, userIDs)
if _, ok := modelIDs[msg.ID]; ok {
return true
}
Expand Down
4 changes: 0 additions & 4 deletions master/internal/stream/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,19 +403,16 @@ func publishLoop[T stream.Msg](

// Did we get a notification?
case notification := <-listener.Notify:
fmt.Println("recemove notification")
if notification == nil {
// Some notification may be lost during connection loss. Restart the publisher
// system to recover.
return fmt.Errorf("(%s) lost connection", channelName)
}
var event stream.Event[T]
err = json.Unmarshal([]byte(notification.Extra), &event)
fmt.Printf("notification: %#v, err: %#v\n", notification.Extra, err)
if err != nil {
return err
}
fmt.Printf("%+v are of type %T\n", event, event)
events = append(events, event)
// Collect all available notifications before proceeding.
keepGoing := true
Expand All @@ -437,7 +434,6 @@ func publishLoop[T stream.Msg](
}

idToRecordCache := map[int]stream.RecordCache{}
// hydratedEvents :=
for _, ev := range events {
idToRecordCache = publisher.HydrateMsg(ev.After, idToRecordCache)
}
Expand Down
1 change: 0 additions & 1 deletion master/internal/stream/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ const (
// testPrepareFunc returns a string representation of known messages;
// otherwise, returns the MarshallableMsg that the streamer sends.
func testPrepareFunc(i stream.MarshallableMsg) interface{} {
fmt.Printf("msg: %#v\n", i)
switch msg := i.(type) {
case *stream.UpsertMsg:
switch typedMsg := msg.Msg.(type) {
Expand Down
23 changes: 3 additions & 20 deletions master/pkg/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"container/list"
"database/sql"
"encoding/json"
"fmt"
"reflect"
"sync"

Expand Down Expand Up @@ -195,13 +194,8 @@ func (p *Publisher[T]) CloseAllStreamers() {
p.Subscriptions = nil
}

// hydrateMsg queries the DB by the ID from rawMsg of a upsert or fallin event
// and grabs the fields(hydrated message) that we care about.
// Here are the different scenarios of an event in this function:
// 1. The record with id x has been deleted.
// 2. The record with id x still has the same Seq as the rawMsg.
// 3. The record with id x has a Seq greater than the rawMsg.
// The function returns an upsert message scenarios 2.
// HydrateMsg queries the DB by the ID from rawMsg of a upsert or fallin event
// and get the full record.
func (p *Publisher[T]) HydrateMsg(rawMsg T, idToRecordCache map[int]RecordCache) map[int]RecordCache {
if reflect.ValueOf(rawMsg).IsNil() {
return idToRecordCache
Expand Down Expand Up @@ -236,57 +230,46 @@ func (p *Publisher[T]) Broadcast(events []Event[T], idToRecordCache map[int]Reco
// check each event against each subscription
for e := p.Subscriptions.Front(); e != nil; e = e.Next() {
if sub, ok := e.Value.(*Subscription[T]); ok {
fmt.Printf("\nsub: %+v\n", sub)
userNotKnownIDs := set.New[int]()
func() {
for i, ev := range events {
fmt.Printf("ev: %#v\n", ev)
for _, ev := range events {
var msg interface{}
switch {
case !reflect.ValueOf(ev.After).IsNil() && sub.filter(ev.After) && sub.permissionFilter(ev.After):
// update, insert, or fallin: send the record to the client.
fmt.Println("in update, insert, fallin")
fmt.Printf("index: %+v, afterMsg: %+v\n", i, ev.After)
afterMsg := ev.After
isInsert := reflect.ValueOf(ev.Before).IsNil()
isFallin := !reflect.ValueOf(ev.Before).IsNil() && (!sub.filter(ev.Before) || !sub.permissionFilter(ev.Before))
fmt.Printf("isInsert: %v, isFallin: %+v\n", isInsert, isFallin)

if recordCache, ok := idToRecordCache[afterMsg.GetID()]; ok && recordCache.UpsertMsg != nil {
cachedSeq := recordCache.UpsertMsg.SeqNum()
if cachedSeq == afterMsg.SeqNum() {
msg = sub.Streamer.PrepareFn(recordCache.UpsertMsg)
fmt.Printf("send msg: %+v\n", msg)
} else {
if isInsert || isFallin {
userNotKnownIDs.Insert(afterMsg.GetID())
fmt.Printf("userNotKnownIDS: %#v\n", userNotKnownIDs)
}
continue
}
} else {
if isInsert || isFallin {
userNotKnownIDs.Insert(afterMsg.GetID())
fmt.Printf("userNotKnownIDS: %#v\n", userNotKnownIDs)
}
continue
}

case !reflect.ValueOf(ev.Before).IsNil() && sub.filter(ev.Before) && sub.permissionFilter(ev.Before):
// deletion or fallout: tell the client the record is deleted.
fmt.Println("delete or fallout")
beforeMsg := ev.Before
if !userNotKnownIDs.Contains(beforeMsg.GetID()) {
msg = sub.Streamer.PrepareFn(beforeMsg.DeleteMsg())
userNotKnownIDs.Insert(beforeMsg.GetID())
fmt.Printf("send msg: %+v\n", msg)
} else {
continue
}

default:
// ignore this message
fmt.Println("ignore")
continue
}
// is this the first match for this Subscription during this Broadcast?
Expand Down
48 changes: 20 additions & 28 deletions master/pkg/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package stream

import (
"database/sql"
"fmt"
"strconv"
"testing"

Expand Down Expand Up @@ -136,7 +135,7 @@ func TestConfigureSubscription(t *testing.T) {
dummyFilter := func(msg *TestMsgTypeA) bool {
return true
}
dummyHydrator := func(ID int) (*TestMsgTypeA, error) {
dummyHydrator := func(msg *TestMsgTypeA) (*TestMsgTypeA, error) {
return &TestMsgTypeA{}, nil
}
streamer := NewStreamer(prepareNothing)
Expand Down Expand Up @@ -175,10 +174,10 @@ func TestConfigureSubscription(t *testing.T) {
}

func TestBroadcast(t *testing.T) {
hydrator := func(ID int) (*TestMsgTypeA, error) {
hydrator := func(msg *TestMsgTypeA) (*TestMsgTypeA, error) {
return &TestMsgTypeA{
Seq: int64(ID),
ID: ID,
Seq: int64(msg.GetID()),
ID: msg.GetID(),
}, nil
}
streamer := NewStreamer(prepareNothing)
Expand Down Expand Up @@ -217,10 +216,10 @@ func TestBroadcast(t *testing.T) {

func TestBroadcastWithFilters(t *testing.T) {
streamer := NewStreamer(prepareNothing)
hydrator := func(ID int) (*TestMsgTypeA, error) {
hydrator := func(msg *TestMsgTypeA) (*TestMsgTypeA, error) {
return &TestMsgTypeA{
Seq: int64(ID),
ID: ID,
Seq: int64(msg.GetID()),
ID: msg.GetID(),
}, nil
}
publisher := NewPublisher[*TestMsgTypeA](hydrator)
Expand Down Expand Up @@ -338,10 +337,10 @@ func TestBroadcastWithFilters(t *testing.T) {

func TestBroadcastWithPermissionFilters(t *testing.T) {
streamer := NewStreamer(prepareNothing)
hydrator := func(ID int) (*TestMsgTypeA, error) {
hydrator := func(msg *TestMsgTypeA) (*TestMsgTypeA, error) {
return &TestMsgTypeA{
Seq: int64(ID),
ID: ID,
Seq: int64(msg.GetID()),
ID: msg.GetID(),
}, nil
}
publisher := NewPublisher[*TestMsgTypeA](hydrator)
Expand Down Expand Up @@ -428,16 +427,16 @@ func TestBroadcastWithPermissionFilters(t *testing.T) {
func TestBroadcastSeparateEvents(t *testing.T) {
streamer := NewStreamer(prepareNothing)
streamerTwo := NewStreamer(prepareNothing)
hydratorA := func(ID int) (*TestMsgTypeA, error) {
hydratorA := func(msg *TestMsgTypeA) (*TestMsgTypeA, error) {
return &TestMsgTypeA{
Seq: int64(ID),
ID: ID,
Seq: int64(msg.GetID()),
ID: msg.GetID(),
}, nil
}
hydratorB := func(ID int) (*TestMsgTypeB, error) {
hydratorB := func(msg *TestMsgTypeB) (*TestMsgTypeB, error) {
return &TestMsgTypeB{
Seq: int64(ID),
ID: ID,
Seq: int64(msg.GetID()),
ID: msg.GetID(),
}, nil
}
publisher := NewPublisher[*TestMsgTypeA](hydratorA)
Expand Down Expand Up @@ -534,7 +533,6 @@ func TestBroadcastSeparateEvents(t *testing.T) {
}

func setup(t *testing.T, testEvents []TestEvent, testSubscribers []TestSubscriber) {

var events []Event[*TestMsgTypeA]
userToFalloutSeq := make(map[int]int64)
userToFallinSeq := make(map[int]int64)
Expand Down Expand Up @@ -581,29 +579,25 @@ func setup(t *testing.T, testEvents []TestEvent, testSubscribers []TestSubscribe
userToFalloutSeq[ts.ID] = int64(len(testEvents) + 1)
}
}
fmt.Printf("userToFalloutSeq: %+v\n", userToFalloutSeq)
// Setting fallin seq for users do not have a fallin event.
for _, ts := range testSubscribers {
if _, ok := userToFallinSeq[ts.ID]; !ok {
userToFallinSeq[ts.ID] = int64(-1)
}
}
fmt.Printf("userToFallinSeq: %+v\n", userToFallinSeq)

// Getting seq for the mocked hydrator

var hydrator func(int) (*TestMsgTypeA, error)
var hydrator func(*TestMsgTypeA) (*TestMsgTypeA, error)
if testEvents[len(testEvents)-1].Type != "delete" {
lastSeq := testEvents[len(testEvents)-1].AfterSeq

hydrator = func(ID int) (*TestMsgTypeA, error) {
hydrator = func(msg *TestMsgTypeA) (*TestMsgTypeA, error) {
return &TestMsgTypeA{
Seq: lastSeq,
ID: ID,
ID: msg.GetID(),
}, nil
}
} else {
hydrator = func(ID int) (*TestMsgTypeA, error) {
hydrator = func(msg *TestMsgTypeA) (*TestMsgTypeA, error) {
return nil, sql.ErrNoRows
}
}
Expand Down Expand Up @@ -986,11 +980,9 @@ func TestTwoSubscribers(t *testing.T) {
for _, ts := range testSubscribers {
streamerMsgs = append(streamerMsgs, ts.Streamer.Msgs...)
}
fmt.Printf("all msg: %#v\n", streamerMsgs)
require.Equal(t, len(tc.outGoingMsgs), len(streamerMsgs), "streamer.Msgs length incorrect")

for i, o := range tc.outGoingMsgs {
fmt.Printf("msg: %#v\n", streamerMsgs[i])
switch o.(type) {
case UpsertMsg:
upsertMsg, ok := streamerMsgs[i].(*UpsertMsg)
Expand Down

0 comments on commit 2f26cec

Please sign in to comment.