Skip to content

Commit

Permalink
Added a separate message queue to realtime presence
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Aug 29, 2023
1 parent 55bf6a4 commit c41ddeb
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 52 deletions.
10 changes: 5 additions & 5 deletions ably/realtime_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,10 @@ func newRealtimeChannel(name string, client *Realtime, chOptions *channelOptions
func (c *RealtimeChannel) onConnStateChange(change ConnectionStateChange) {
switch change.Current {
case ConnectionStateConnected:
c.queue.Flush(false)
c.queue.Flush()
case ConnectionStateFailed:
c.setState(ChannelStateFailed, change.Reason, false)
c.queue.Fail(change.Reason, false)
c.queue.Fail(change.Reason)
}
}

Expand Down Expand Up @@ -813,7 +813,7 @@ func (c *RealtimeChannel) notify(msg *protocolMessage) {
if isNewAttach || !isAttachResumed { //RTL12
c.setState(ChannelStateAttached, newErrorFromProto(msg.Error), isAttachResumed)
}
c.queue.Flush(false)
c.queue.Flush()
case actionDetached:
c.mtx.Lock()
err := error(newErrorFromProto(msg.Error))
Expand Down Expand Up @@ -858,7 +858,7 @@ func (c *RealtimeChannel) notify(msg *protocolMessage) {
c.Presence.processIncomingMessage(msg, "")
case actionError:
c.setState(ChannelStateFailed, newErrorFromProto(msg.Error), false)
c.queue.Fail(newErrorFromProto(msg.Error), false)
c.queue.Fail(newErrorFromProto(msg.Error))
case actionMessage:
if c.State() == ChannelStateAttached {
for _, msg := range msg.Messages {
Expand Down Expand Up @@ -970,7 +970,7 @@ func (c *RealtimeChannel) setState(state ChannelState, err error, resumed bool)
}
// RTP5b
if state == ChannelStateAttached {
c.queue.Flush(true)
c.queue.Flush()
}
// RTP5f
if state == ChannelStateSuspended {
Expand Down
2 changes: 1 addition & 1 deletion ably/realtime_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *Realtime) onReconnected(isNewID bool) {
// No need to reattach: state is preserved. We just need to flush the
// queue of pending messages.
for _, ch := range c.Channels.Iterate() {
ch.queue.Flush(false)
ch.queue.Flush()
}
//RTN19a
c.Connection.resendPending()
Expand Down
62 changes: 36 additions & 26 deletions ably/realtime_presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ably

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -31,6 +32,7 @@ type RealtimePresence struct {
state PresenceAction
syncMtx sync.Mutex
syncState syncState
queue *msgQueue
}

func newRealtimePresence(channel *RealtimeChannel) *RealtimePresence {
Expand All @@ -41,6 +43,7 @@ func newRealtimePresence(channel *RealtimeChannel) *RealtimePresence {
internalMembers: make(map[string]*PresenceMessage),
syncState: syncInitial,
}
pres.queue = newMsgQueue(pres.channel.client.Connection)
// Lock syncMtx to make all callers to Get(true) wait until the presence
// is in initial sync state. This is to not make them early return
// with an empty presence list before channel attaches.
Expand All @@ -66,51 +69,58 @@ func (pres *RealtimePresence) onChannelDetachedOrFailed(err error) {
for k := range pres.internalMembers {
delete(pres.internalMembers, k)
}
pres.channel.queue.Fail(err, true)
pres.queue.Fail(err)
}

// RTP5f
func (pres *RealtimePresence) onChannelSuspended(err error) {
pres.channel.queue.Fail(err, true)
pres.queue.Fail(err)
}

func (pres *RealtimePresence) maybeEnqueue(msg *protocolMessage, onAck func(err error)) bool {
if pres.channel.opts().NoQueueing {
if onAck != nil {
onAck(errors.New("unable enqueue message because Options.QueueMessages is set to false"))
}
return false
}
pres.queue.Enqueue(msg, onAck)
return true
}

func (pres *RealtimePresence) send(msg *PresenceMessage) (result, error) {
// RTP16c
if err := pres.verifyChanState(); err != nil {
return nil, err
}
presenceSendFunc := func(ctx context.Context) error {
protomsg := &protocolMessage{
Action: actionPresence,
Channel: pres.channel.Name,
Presence: []*PresenceMessage{msg},
}
listen := make(chan error, 1)
onAck := func(err error) {
listen <- err
}
if err := pres.channel.send(protomsg, onAck); err != nil {
return err
protomsg := &protocolMessage{
Action: actionPresence,
Channel: pres.channel.Name,
Presence: []*PresenceMessage{msg},
}
listen := make(chan error, 1)
onAck := func(err error) {
listen <- err
}
switch pres.channel.state {
case ChannelStateInitialized:
if pres.maybeEnqueue(protomsg, onAck) {
pres.channel.attach()
}
case ChannelStateAttaching:
pres.maybeEnqueue(protomsg, onAck)
case ChannelStateAttached:
pres.channel.client.Connection.send(protomsg, onAck)
}

return resultFunc(func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-listen:
return err
}
}
if pres.channel.state == ChannelStateInitialized {
_, err := pres.channel.attach()
if err != nil {
return nil, err
}
}
return resultFunc(presenceSendFunc), nil
}

func (pres *RealtimePresence) enqueuePresenceMsg() {

}), nil
}

func (pres *RealtimePresence) syncWait() {
Expand Down
30 changes: 10 additions & 20 deletions ably/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,34 +214,24 @@ func (q *msgQueue) Enqueue(msg *protocolMessage, onAck func(err error)) {
q.mtx.Unlock()
}

func (q *msgQueue) Flush(onlyPresenceMessages bool) {
func (q *msgQueue) Flush() {
q.mtx.Lock()
var pendingQueuedMessages []msgWithAck
for _, msgWithAck := range q.queue {
if onlyPresenceMessages && msgWithAck.msg.Action != actionPresence {
pendingQueuedMessages = append(pendingQueuedMessages, msgWithAck)
continue
}
q.conn.send(msgWithAck.msg, msgWithAck.onAck)
for _, msgch := range q.queue {
q.conn.send(msgch.msg, msgch.onAck)
}
q.queue = pendingQueuedMessages
q.queue = nil
q.mtx.Unlock()
}

func (q *msgQueue) Fail(err error, onlyPresenceMessages bool) {
func (q *msgQueue) Fail(err error) {
q.mtx.Lock()
var nonFailedQueueMessages []msgWithAck
for _, msgWithAck := range q.queue {
if onlyPresenceMessages && msgWithAck.msg.Action != actionPresence {
nonFailedQueueMessages = append(nonFailedQueueMessages, msgWithAck)
continue
}
q.log().Errorf("failure sending message (serial=%d): %v", msgWithAck.msg.MsgSerial, err)
if msgWithAck.onAck != nil {
msgWithAck.onAck(newError(90000, err))
for _, msgch := range q.queue {
q.log().Errorf("failure sending message (serial=%d): %v", msgch.msg.MsgSerial, err)
if msgch.onAck != nil {
msgch.onAck(newError(90000, err))
}
}
q.queue = nonFailedQueueMessages
q.queue = nil
q.mtx.Unlock()
}

Expand Down

0 comments on commit c41ddeb

Please sign in to comment.