From 7c133c4eb9642e7344d1dc6cd0fb93f7d52dea54 Mon Sep 17 00:00:00 2001 From: Yourim Cha Date: Wed, 15 Feb 2023 12:09:58 +0900 Subject: [PATCH] Add watch multiple documents test --- test/integration/peer_awareness_test.go | 79 +++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/test/integration/peer_awareness_test.go b/test/integration/peer_awareness_test.go index 28ddd3289..3a6c5fe68 100644 --- a/test/integration/peer_awareness_test.go +++ b/test/integration/peer_awareness_test.go @@ -122,4 +122,83 @@ func TestPeerAwareness(t *testing.T) { assert.Equal(t, expected, responsePairs) }) + + t.Run("Watch multiple documents test", func(t *testing.T) { + ctx := context.Background() + + d1 := document.New(key.Key(t.Name())) + d2 := document.New(key.Key(t.Name())) + d3 := document.New(key.Key(t.Name() + "2")) + assert.NoError(t, c1.Attach(ctx, d1)) + defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() + + var expected []watchResponsePair + var responsePairs []watchResponsePair + wgEvents := sync.WaitGroup{} + wgEvents.Add(1) + + // starting to watch a document + watch1Ctx, cancel1 := context.WithCancel(ctx) + defer cancel1() + wrch, err := c1.Watch(watch1Ctx, d1) + assert.NoError(t, err) + go func() { + defer func() { + wgEvents.Done() + }() + for { + select { + case <-time.After(time.Second): + assert.Fail(t, "timeout") + return + case wr := <-wrch: + if wr.Err != nil { + assert.Fail(t, "unexpected stream closing", wr.Err) + return + } + + if wr.Type == client.PeersChanged { + peers := wr.PeersMapByDoc[d1.Key().String()] + responsePairs = append(responsePairs, watchResponsePair{ + Type: wr.Type, + Peers: peers, + }) + + if len(peers) == 1 { + return + } + } + } + } + }() + + // 01. PeersChanged is triggered when another client watches the document + expected = append(expected, watchResponsePair{ + Type: client.PeersChanged, + Peers: map[string]types.Presence{ + c1.ID().String(): c1.Presence(), + c2.ID().String(): c2.Presence(), + }, + }) + assert.NoError(t, c2.Attach(ctx, d2)) + defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() + assert.NoError(t, c2.Attach(ctx, d3)) + defer func() { assert.NoError(t, c2.Detach(ctx, d3)) }() + watch2Ctx, cancel2 := context.WithCancel(ctx) + _, err = c2.Watch(watch2Ctx, d2, d3) + assert.NoError(t, err) + + // 02. PeersChanged is triggered when another client closes the watch + expected = append(expected, watchResponsePair{ + Type: client.PeersChanged, + Peers: map[string]types.Presence{ + c1.ID().String(): c1.Presence(), + }, + }) + cancel2() + + wgEvents.Wait() + + assert.Equal(t, expected, responsePairs) + }) }