Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update presence design document #609

Merged
merged 3 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added design/media/presence-api.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added design/media/presence-event-presence-changed.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added design/media/presence-event-unwatched.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added design/media/presence-event-watched.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added design/media/presence-structure.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added design/media/pubsub.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file removed design/media/pubsub.png
Binary file not shown.
Binary file added design/media/server-streaming.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added design/media/watch-document.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
348 changes: 290 additions & 58 deletions design/peer-awareness.md

Large diffs are not rendered by default.

110 changes: 68 additions & 42 deletions design/pub-sub.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: pub-sub
target-version: 0.3.0
target-version: 0.4.6
---

# PubSub
Expand All @@ -18,60 +18,88 @@ Documents.

## Proposal Details

### How does it work?
### WatchDocument API

In Yorkie, we use gRPC-Web and it currently supports 2 RPC modes: unary RPCs, server-side streaming RPCs. ([Client-side and Bi-directional streaming is not supported.](https://github.com/grpc/grpc-web#streaming-support)). Server-side streaming allows the server to send multiple messages in response to a single client request.

Yorkie implements WatchDocuments API
using [gRPC server-side streaming](https://grpc.io/docs/languages/go/basics/#server-side-streaming-rpc)
to deliver the events that have occurred to other clients.
![server-side streaming RPCs](media/server-streaming.jpg)

Yorkie implements WatchDocument API using [gRPC server-side streaming](https://grpc.io/docs/languages/go/basics/#server-side-streaming-rpc) to deliver the events to other clients.

```protobuf
// api/yorkie.proto

service Yorkie {
...
rpc WatchDocuments (WatchDocumentsRequest) returns (stream WatchDocumentsResponse) {}
rpc WatchDocument (WatchDocumentRequest) returns (stream WatchDocumentResponse) {}
}
```

And to manage the event delivery target, we are using the [PubSub pattern](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern). You can learn more by looking at the [sync package](https://github.com/yorkie-team/yorkie/blob/main/server/backend/sync/pubsub.go) we are implementing.
In brief, when the client sends a WatchDocument request, it establishes a stream connection(1). On the server side, when changes occur in a document, it retrieves the clients watching that document in the subscriptionsMap(2), and then sends responses through the server stream to those clients(3). As a result, clients watching the document can receive response about its changes through the stream without the separate requests.

![WatchDocument API](media/watch-document.jpg)

### How does it work?

We are using the [PubSub pattern](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) for handling event delivery targets. For more details, you can check out the [sync package](https://github.com/yorkie-team/yorkie/blob/main/server/backend/sync/pubsub.go) that we're working on.

![pub-sub pattern vs observer pattern](media/pubsub-pattern.png)

The process of the event pub-sub is as follows:
![pub-sub event flow](media/pubsub.png)

![pub-sub event flow](media/pubsub.jpg)

#### 1. Set SubscriptionsMap

The `watchDocuments` API creates a `Subscription` instance and adds it to the `subscriptionsMapByDocKey`. The `Subscription` instance internally manages the `DocEvent channel`, and a `select` statement is used to retrieve events that are passed to the `Subscription` instance.
The `WatchDocument` API creates a `Subscription` instance and adds it to the `subscriptionsMapByDocID`[(code🔍)](https://github.com/yorkie-team/yorkie/blob/16fd182021231d75562a933cb32d924af16fc7f4/server/rpc/yorkie_server.go#L518-L523). The `Subscription` instance internally manages the `DocEvent channel`.

```go
// Subscription represents a subscription of a subscriber to documents.
type Subscription struct {
id string
subscriber types.Client
subscriber *time.ActorID
closed bool
events chan DocEvent
}
```

#### 2. Publish Event

The publisher can send a `DocEvent` to the `Subscription` instances that subscribe to the same document through the `Publish` method.
When changes occur in a document through WatchDocument or PushPull, the `Publish` method is called to send out a `DocEvent` which includes events such as `DocumentsWatchedEvent`, `DocumentsUnwatchedEvent`, and `DocumentsChangedEvent`.

```go
type DocEvent struct {
Type types.DocEventType
Publisher types.Client
DocumentKeys []key.Key
// server/rpc/yorkie_server.go
func (s *yorkieServer) watchDoc(...) (...) {
// Publish DocumentsWatchedEvent during watchDocument
s.backend.Coordinator.Publish(
ctx,
subscription.Subscriber(),
sync.DocEvent{
Type: types.DocumentsWatchedEvent,
Publisher: subscription.Subscriber(),
DocumentID: documentID,
},
)
}

Publish(ctx context.Context, publisherID *time.ActorID, event sync.DocEvent)
// server/packs/packs.go
func PushPull(...) (...) {
// Publish DocumentsChangedEvent during pushpull
be.Coordinator.Publish(
ctx,
publisherID,
sync.DocEvent{
Type: types.DocumentsChangedEvent,
Publisher: publisherID,
DocumentID: docInfo.ID,
},
)
}
```

When performing `WatchDocuments`, `UpdatePresence`, and `PushPull`, the `Publish` method is called to deliver the `DocEvent`.

#### 3. Fire Event

The event is sent to the `Subscription` channels that subscribe to the same document by iterating through the `documentKeys` of the `DocEvent`.
The `Publish` method sends a `DocEvent` to the event channel of subscriptions that are subscribing to the document of the `DocEvent`. Through `subscriptionsMapByDocID`, we can find the subscriptions (created in step 1) that are subscribing to a specific document. Subsequently, the `DocEvent` is sent to the event channels of these subscriptions. [(code🔍)](https://github.com/yorkie-team/yorkie/blob/16fd182021231d75562a933cb32d924af16fc7f4/server/backend/sync/memory/pubsub.go#L150-L196).

```go
func (m *PubSub) Publish(
Expand All @@ -82,34 +110,33 @@ func (m *PubSub) Publish(
m.subscriptionsMapMu.RLock()
defer m.subscriptionsMapMu.RUnlock()

for _, docKey := range event.DocumentKeys {
k := docKey.String()

if subs, ok := m.subscriptionsMapByDocKey[k]; ok {
for _, sub := range subs.Map() {
// If the subscriber is itself, do not send
if sub.Subscriber().ID.Compare(publisherID) == 0 {
continue
}
documentID := event.DocumentID
if subs, ok := m.subscriptionsMapByDocID[documentID]; ok {
for _, sub := range subs.Map() {
// If the subscriber is itself, do not send
if sub.Subscriber().Compare(publisherID) == 0 {
continue
}

// Send the event to the peer's event channel
sub.Events() <- event
select {
// Send the event to the peer's event channel
case sub.Events() <- event:
}
}
}
}
```

#### 4. Send watchDocuments response to stream
#### 4. Send watchDocument response to stream

In the `select` statement from step 1, when the `Subscription` channel receives an event, the event is sent to the `watchDocumentsResponse` of the rpc stream.
When the event channel of `Subscription` receives an event, the event is sent to the `WatchDocumentResponse` of the rpc stream. [(code🔍)](https://github.com/yorkie-team/yorkie/blob/16fd182021231d75562a933cb32d924af16fc7f4/server/rpc/yorkie_server.go#L421-L443)

```go
func (s *yorkieServer) WatchDocuments(
req *api.WatchDocumentsRequest,
stream api.YorkieService_WatchDocumentsServer,
func (s *yorkieServer) WatchDocument(
req *api.WatchDocumentRequest,
stream api.YorkieService_WatchDocumentServer,
) error {
// ...
// ...
for {
select {
case <-s.serviceCtx.Done():
Expand All @@ -119,12 +146,11 @@ func (s *yorkieServer) WatchDocuments(
case event := <-subscription.Events():
eventType, err := converter.ToDocEventType(event.Type)

if err := stream.Send(&api.WatchDocumentsResponse{
Body: &api.WatchDocumentsResponse_Event{
if err := stream.Send(&api.WatchDocumentResponse{
Body: &api.WatchDocumentResponse_Event{
Event: &api.DocEvent{
Type: eventType,
Publisher: converter.ToClient(event.Publisher),
DocumentKeys: converter.ToDocumentKeys(event.DocumentKeys),
Type: eventType,
Publisher: event.Publisher.Bytes(),
},
},
}); err != nil {
Expand All @@ -137,4 +163,4 @@ func (s *yorkieServer) WatchDocuments(

### Risks and Mitigation

Currently, Subscription instances are managed in memory.
Currently, Subscription instances are managed in memory.