diff --git a/master/internal/stream/models.go b/master/internal/stream/models.go index ae187de3d627..516969f2d8be 100644 --- a/master/internal/stream/models.go +++ b/master/internal/stream/models.go @@ -218,8 +218,6 @@ func ModelMakeFilter(spec *ModelSubscriptionSpec) (func(*ModelMsg) bool, error) // return a closure around our copied maps return func(msg *ModelMsg) bool { // subscribed to model by this model_id? - fmt.Printf("model msg: %#v\n", msg) - fmt.Printf("modelIDs: %#v, workspaceIDs: %#v, userIDs: %#v\n", modelIDs, workspaceIDs, userIDs) if _, ok := modelIDs[msg.ID]; ok { return true } diff --git a/master/internal/stream/publisher.go b/master/internal/stream/publisher.go index 3ca02632ffcf..cd749658ade2 100644 --- a/master/internal/stream/publisher.go +++ b/master/internal/stream/publisher.go @@ -403,7 +403,6 @@ func publishLoop[T stream.Msg]( // Did we get a notification? case notification := <-listener.Notify: - fmt.Println("recemove notification") if notification == nil { // Some notification may be lost during connection loss. Restart the publisher // system to recover. @@ -411,11 +410,9 @@ func publishLoop[T stream.Msg]( } var event stream.Event[T] err = json.Unmarshal([]byte(notification.Extra), &event) - fmt.Printf("notification: %#v, err: %#v\n", notification.Extra, err) if err != nil { return err } - fmt.Printf("%+v are of type %T\n", event, event) events = append(events, event) // Collect all available notifications before proceeding. keepGoing := true @@ -437,7 +434,6 @@ func publishLoop[T stream.Msg]( } idToRecordCache := map[int]stream.RecordCache{} - // hydratedEvents := for _, ev := range events { idToRecordCache = publisher.HydrateMsg(ev.After, idToRecordCache) } diff --git a/master/internal/stream/test_util.go b/master/internal/stream/test_util.go index aea34a8531ba..fe14978ada69 100644 --- a/master/internal/stream/test_util.go +++ b/master/internal/stream/test_util.go @@ -23,7 +23,6 @@ const ( // testPrepareFunc returns a string representation of known messages; // otherwise, returns the MarshallableMsg that the streamer sends. func testPrepareFunc(i stream.MarshallableMsg) interface{} { - fmt.Printf("msg: %#v\n", i) switch msg := i.(type) { case *stream.UpsertMsg: switch typedMsg := msg.Msg.(type) { diff --git a/master/pkg/stream/stream.go b/master/pkg/stream/stream.go index 7da662b354c5..11de4f007fd8 100644 --- a/master/pkg/stream/stream.go +++ b/master/pkg/stream/stream.go @@ -4,7 +4,6 @@ import ( "container/list" "database/sql" "encoding/json" - "fmt" "reflect" "sync" @@ -195,13 +194,8 @@ func (p *Publisher[T]) CloseAllStreamers() { p.Subscriptions = nil } -// hydrateMsg queries the DB by the ID from rawMsg of a upsert or fallin event -// and grabs the fields(hydrated message) that we care about. -// Here are the different scenarios of an event in this function: -// 1. The record with id x has been deleted. -// 2. The record with id x still has the same Seq as the rawMsg. -// 3. The record with id x has a Seq greater than the rawMsg. -// The function returns an upsert message scenarios 2. +// HydrateMsg queries the DB by the ID from rawMsg of a upsert or fallin event +// and get the full record. func (p *Publisher[T]) HydrateMsg(rawMsg T, idToRecordCache map[int]RecordCache) map[int]RecordCache { if reflect.ValueOf(rawMsg).IsNil() { return idToRecordCache @@ -236,57 +230,46 @@ func (p *Publisher[T]) Broadcast(events []Event[T], idToRecordCache map[int]Reco // check each event against each subscription for e := p.Subscriptions.Front(); e != nil; e = e.Next() { if sub, ok := e.Value.(*Subscription[T]); ok { - fmt.Printf("\nsub: %+v\n", sub) userNotKnownIDs := set.New[int]() func() { - for i, ev := range events { - fmt.Printf("ev: %#v\n", ev) + for _, ev := range events { var msg interface{} switch { case !reflect.ValueOf(ev.After).IsNil() && sub.filter(ev.After) && sub.permissionFilter(ev.After): // update, insert, or fallin: send the record to the client. - fmt.Println("in update, insert, fallin") - fmt.Printf("index: %+v, afterMsg: %+v\n", i, ev.After) afterMsg := ev.After isInsert := reflect.ValueOf(ev.Before).IsNil() isFallin := !reflect.ValueOf(ev.Before).IsNil() && (!sub.filter(ev.Before) || !sub.permissionFilter(ev.Before)) - fmt.Printf("isInsert: %v, isFallin: %+v\n", isInsert, isFallin) if recordCache, ok := idToRecordCache[afterMsg.GetID()]; ok && recordCache.UpsertMsg != nil { cachedSeq := recordCache.UpsertMsg.SeqNum() if cachedSeq == afterMsg.SeqNum() { msg = sub.Streamer.PrepareFn(recordCache.UpsertMsg) - fmt.Printf("send msg: %+v\n", msg) } else { if isInsert || isFallin { userNotKnownIDs.Insert(afterMsg.GetID()) - fmt.Printf("userNotKnownIDS: %#v\n", userNotKnownIDs) } continue } } else { if isInsert || isFallin { userNotKnownIDs.Insert(afterMsg.GetID()) - fmt.Printf("userNotKnownIDS: %#v\n", userNotKnownIDs) } continue } case !reflect.ValueOf(ev.Before).IsNil() && sub.filter(ev.Before) && sub.permissionFilter(ev.Before): // deletion or fallout: tell the client the record is deleted. - fmt.Println("delete or fallout") beforeMsg := ev.Before if !userNotKnownIDs.Contains(beforeMsg.GetID()) { msg = sub.Streamer.PrepareFn(beforeMsg.DeleteMsg()) userNotKnownIDs.Insert(beforeMsg.GetID()) - fmt.Printf("send msg: %+v\n", msg) } else { continue } default: // ignore this message - fmt.Println("ignore") continue } // is this the first match for this Subscription during this Broadcast? diff --git a/master/pkg/stream/stream_test.go b/master/pkg/stream/stream_test.go index da9987be1a40..45aea2b63fde 100644 --- a/master/pkg/stream/stream_test.go +++ b/master/pkg/stream/stream_test.go @@ -2,7 +2,6 @@ package stream import ( "database/sql" - "fmt" "strconv" "testing" @@ -136,7 +135,7 @@ func TestConfigureSubscription(t *testing.T) { dummyFilter := func(msg *TestMsgTypeA) bool { return true } - dummyHydrator := func(ID int) (*TestMsgTypeA, error) { + dummyHydrator := func(msg *TestMsgTypeA) (*TestMsgTypeA, error) { return &TestMsgTypeA{}, nil } streamer := NewStreamer(prepareNothing) @@ -175,10 +174,10 @@ func TestConfigureSubscription(t *testing.T) { } func TestBroadcast(t *testing.T) { - hydrator := func(ID int) (*TestMsgTypeA, error) { + hydrator := func(msg *TestMsgTypeA) (*TestMsgTypeA, error) { return &TestMsgTypeA{ - Seq: int64(ID), - ID: ID, + Seq: int64(msg.GetID()), + ID: msg.GetID(), }, nil } streamer := NewStreamer(prepareNothing) @@ -217,10 +216,10 @@ func TestBroadcast(t *testing.T) { func TestBroadcastWithFilters(t *testing.T) { streamer := NewStreamer(prepareNothing) - hydrator := func(ID int) (*TestMsgTypeA, error) { + hydrator := func(msg *TestMsgTypeA) (*TestMsgTypeA, error) { return &TestMsgTypeA{ - Seq: int64(ID), - ID: ID, + Seq: int64(msg.GetID()), + ID: msg.GetID(), }, nil } publisher := NewPublisher[*TestMsgTypeA](hydrator) @@ -338,10 +337,10 @@ func TestBroadcastWithFilters(t *testing.T) { func TestBroadcastWithPermissionFilters(t *testing.T) { streamer := NewStreamer(prepareNothing) - hydrator := func(ID int) (*TestMsgTypeA, error) { + hydrator := func(msg *TestMsgTypeA) (*TestMsgTypeA, error) { return &TestMsgTypeA{ - Seq: int64(ID), - ID: ID, + Seq: int64(msg.GetID()), + ID: msg.GetID(), }, nil } publisher := NewPublisher[*TestMsgTypeA](hydrator) @@ -428,16 +427,16 @@ func TestBroadcastWithPermissionFilters(t *testing.T) { func TestBroadcastSeparateEvents(t *testing.T) { streamer := NewStreamer(prepareNothing) streamerTwo := NewStreamer(prepareNothing) - hydratorA := func(ID int) (*TestMsgTypeA, error) { + hydratorA := func(msg *TestMsgTypeA) (*TestMsgTypeA, error) { return &TestMsgTypeA{ - Seq: int64(ID), - ID: ID, + Seq: int64(msg.GetID()), + ID: msg.GetID(), }, nil } - hydratorB := func(ID int) (*TestMsgTypeB, error) { + hydratorB := func(msg *TestMsgTypeB) (*TestMsgTypeB, error) { return &TestMsgTypeB{ - Seq: int64(ID), - ID: ID, + Seq: int64(msg.GetID()), + ID: msg.GetID(), }, nil } publisher := NewPublisher[*TestMsgTypeA](hydratorA) @@ -534,7 +533,6 @@ func TestBroadcastSeparateEvents(t *testing.T) { } func setup(t *testing.T, testEvents []TestEvent, testSubscribers []TestSubscriber) { - var events []Event[*TestMsgTypeA] userToFalloutSeq := make(map[int]int64) userToFallinSeq := make(map[int]int64) @@ -581,29 +579,25 @@ func setup(t *testing.T, testEvents []TestEvent, testSubscribers []TestSubscribe userToFalloutSeq[ts.ID] = int64(len(testEvents) + 1) } } - fmt.Printf("userToFalloutSeq: %+v\n", userToFalloutSeq) // Setting fallin seq for users do not have a fallin event. for _, ts := range testSubscribers { if _, ok := userToFallinSeq[ts.ID]; !ok { userToFallinSeq[ts.ID] = int64(-1) } } - fmt.Printf("userToFallinSeq: %+v\n", userToFallinSeq) - - // Getting seq for the mocked hydrator - var hydrator func(int) (*TestMsgTypeA, error) + var hydrator func(*TestMsgTypeA) (*TestMsgTypeA, error) if testEvents[len(testEvents)-1].Type != "delete" { lastSeq := testEvents[len(testEvents)-1].AfterSeq - hydrator = func(ID int) (*TestMsgTypeA, error) { + hydrator = func(msg *TestMsgTypeA) (*TestMsgTypeA, error) { return &TestMsgTypeA{ Seq: lastSeq, - ID: ID, + ID: msg.GetID(), }, nil } } else { - hydrator = func(ID int) (*TestMsgTypeA, error) { + hydrator = func(msg *TestMsgTypeA) (*TestMsgTypeA, error) { return nil, sql.ErrNoRows } } @@ -986,11 +980,9 @@ func TestTwoSubscribers(t *testing.T) { for _, ts := range testSubscribers { streamerMsgs = append(streamerMsgs, ts.Streamer.Msgs...) } - fmt.Printf("all msg: %#v\n", streamerMsgs) require.Equal(t, len(tc.outGoingMsgs), len(streamerMsgs), "streamer.Msgs length incorrect") for i, o := range tc.outGoingMsgs { - fmt.Printf("msg: %#v\n", streamerMsgs[i]) switch o.(type) { case UpsertMsg: upsertMsg, ok := streamerMsgs[i].(*UpsertMsg)