Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter out unsubscribed documents key in DocEvent #463

Merged
merged 5 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions server/backend/sync/memory/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (m *PubSub) Subscribe(
m.subscriptionsMapMu.Lock()
defer m.subscriptionsMapMu.Unlock()

sub := sync.NewSubscription(subscriber)
sub := sync.NewSubscription(subscriber, keys)
m.subscriptionMapBySubscriber[sub.SubscriberID()] = sub

for _, docKey := range keys {
Expand Down Expand Up @@ -207,10 +207,15 @@ func (m *PubSub) Publish(
)
}

watchDocEvent := sync.DocEvent{
Type: event.Type,
Publisher: event.Publisher,
DocumentKeys: []key.Key{docKey},
}
// NOTE: When a subscription is being closed by a subscriber,
// the subscriber may not receive messages.
select {
case sub.Events() <- event:
case sub.Events() <- watchDocEvent:
case <-gotime.After(100 * gotime.Millisecond):
logging.From(ctx).Warnf(
`Publish(%s,%s) to %s timeout`,
Expand Down
10 changes: 9 additions & 1 deletion server/backend/sync/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@ import (
type Subscription struct {
id string
subscriber types.Client
docKeys []key.Key
closed bool
events chan DocEvent
}

// NewSubscription creates a new instance of Subscription.
func NewSubscription(subscriber types.Client) *Subscription {
func NewSubscription(subscriber types.Client, keys []key.Key) *Subscription {
return &Subscription{
id: xid.New().String(),
subscriber: subscriber,
docKeys: keys,
events: make(chan DocEvent, 1),
}
}
Expand All @@ -57,6 +59,12 @@ func (s *Subscription) Events() chan DocEvent {
return s.events
}

// DocKeys returns an array of the keys of the documents
// that the subscription is subscribed to.
func (s *Subscription) DocKeys() []key.Key {
return s.docKeys
}

// Subscriber returns the subscriber of this subscription.
func (s *Subscription) Subscriber() types.Client {
return s.subscriber
Expand Down
79 changes: 79 additions & 0 deletions test/integration/peer_awareness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,83 @@ func TestPeerAwareness(t *testing.T) {

assert.Equal(t, expected, responsePairs)
})

t.Run("Watch multiple documents test", func(t *testing.T) {
ctx := context.Background()

d1 := document.New(key.Key(t.Name()))
d2 := document.New(key.Key(t.Name()))
d3 := document.New(key.Key(t.Name() + "2"))
assert.NoError(t, c1.Attach(ctx, d1))
defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }()

var expected []watchResponsePair
var responsePairs []watchResponsePair
wgEvents := sync.WaitGroup{}
wgEvents.Add(1)

// starting to watch a document
watch1Ctx, cancel1 := context.WithCancel(ctx)
defer cancel1()
wrch, err := c1.Watch(watch1Ctx, d1)
assert.NoError(t, err)
go func() {
defer func() {
wgEvents.Done()
}()
for {
select {
case <-time.After(time.Second):
assert.Fail(t, "timeout")
return
case wr := <-wrch:
if wr.Err != nil {
assert.Fail(t, "unexpected stream closing", wr.Err)
return
}

if wr.Type == client.PeersChanged {
peers := wr.PeersMapByDoc[d1.Key().String()]
responsePairs = append(responsePairs, watchResponsePair{
Type: wr.Type,
Peers: peers,
})

if len(peers) == 1 {
return
}
}
}
}
}()

// 01. PeersChanged is triggered when another client watches the document
expected = append(expected, watchResponsePair{
Type: client.PeersChanged,
Peers: map[string]types.Presence{
c1.ID().String(): c1.Presence(),
c2.ID().String(): c2.Presence(),
},
})
assert.NoError(t, c2.Attach(ctx, d2))
defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }()
assert.NoError(t, c2.Attach(ctx, d3))
defer func() { assert.NoError(t, c2.Detach(ctx, d3)) }()
watch2Ctx, cancel2 := context.WithCancel(ctx)
_, err = c2.Watch(watch2Ctx, d2, d3)
assert.NoError(t, err)

// 02. PeersChanged is triggered when another client closes the watch
expected = append(expected, watchResponsePair{
Type: client.PeersChanged,
Peers: map[string]types.Presence{
c1.ID().String(): c1.Presence(),
},
})
cancel2()
hackerwins marked this conversation as resolved.
Show resolved Hide resolved

wgEvents.Wait()

assert.Equal(t, expected, responsePairs)
})
}