Skip to content

Commit

Permalink
Merge pull request #621 from ably/fix/presencemap
Browse files Browse the repository at this point in the history
[SDK-222] [no connection serial ] + implement missing internal presence map + presencequeue
  • Loading branch information
sacOO7 authored Dec 13, 2023
2 parents 981eceb + 14cd5a2 commit adb6354
Show file tree
Hide file tree
Showing 21 changed files with 1,974 additions and 732 deletions.
1 change: 1 addition & 0 deletions ably/error_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
ErrTimeoutError ErrorCode = 50003
ErrConnectionFailed ErrorCode = 80000
ErrConnectionSuspended ErrorCode = 80002
ErrConnectionClosed ErrorCode = 80017
ErrDisconnected ErrorCode = 80003
ErrProtocolError ErrorCode = 80013
ErrChannelOperationFailed ErrorCode = 90000
Expand Down
12 changes: 12 additions & 0 deletions ably/errors.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 61 additions & 0 deletions ably/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ func (c *REST) GetCachedFallbackHost() string {
return c.successFallbackHost.get()
}

func (c *RealtimeChannel) GetChannelSerial() string {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.properties.ChannelSerial
}

func (c *RealtimeChannel) GetAttachResume() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand All @@ -105,6 +111,12 @@ func (c *RealtimeChannel) SetAttachResume(value bool) {
c.attachResume = value
}

func (c *RealtimeChannel) SetState(chanState ChannelState) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.state = chanState
}

func (opts *clientOptions) GetFallbackRetryTimeout() time.Duration {
return opts.fallbackRetryTimeout()
}
Expand Down Expand Up @@ -193,6 +205,55 @@ func (c *Connection) PendingItems() int {
return len(c.pending.queue)
}

// AckAll empties queue and acks all pending callbacks
func (c *Connection) AckAll() {
c.mtx.Lock()
cx := c.pending.Dismiss()
c.mtx.Unlock()
c.log().Infof("Ack all %d messages waiting for ACK/NACK", len(cx))
for _, v := range cx {
v.onAck(nil)
}
}

func (c *Connection) SetKey(key string) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.key = key
}

func (c *RealtimePresence) Members() map[string]*PresenceMessage {
c.mtx.Lock()
defer c.mtx.Unlock()
presenceMembers := make(map[string]*PresenceMessage, len(c.members))
for k, pm := range c.members {
presenceMembers[k] = pm
}
return presenceMembers
}

func (c *RealtimePresence) InternalMembers() map[string]*PresenceMessage {
c.mtx.Lock()
defer c.mtx.Unlock()
internalMembers := make(map[string]*PresenceMessage, len(c.internalMembers))
for k, pm := range c.internalMembers {
internalMembers[k] = pm
}
return internalMembers
}

func (c *RealtimePresence) SyncInitial() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.syncState == syncInitial
}

func (c *RealtimePresence) SyncInProgress() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.syncState == syncInProgress
}

func (c *Connection) ConnectionStateTTL() time.Duration {
return c.connectionStateTTL()
}
Expand Down
32 changes: 0 additions & 32 deletions ably/mock_test.go

This file was deleted.

1 change: 0 additions & 1 deletion ably/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ func (opts *authOptions) KeySecret() string {
// clientOptions passes additional client-specific properties to the [ably.NewREST] or to the [ably.NewRealtime].
// Properties set using [ably.clientOptions] are used instead of the [ably.defaultOptions] values.
type clientOptions struct {

// authOptions Embedded an [ably.authOptions] object (TO3j).
authOptions

Expand Down
44 changes: 44 additions & 0 deletions ably/proto_presence_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package ably

import (
"fmt"
"strconv"
"strings"
)

// PresenceAction describes the possible actions members in the presence set can emit (TP2).
Expand Down Expand Up @@ -58,3 +60,45 @@ func (m PresenceMessage) String() string {
"update",
}[m.Action], m.ClientID, m.Data)
}

func (msg *PresenceMessage) isServerSynthesized() bool {
return !strings.HasPrefix(msg.ID, msg.ConnectionID)
}

func (msg *PresenceMessage) getMsgSerialAndIndex() (int64, int64, error) {
msgIds := strings.Split(msg.ID, ":")
if len(msgIds) != 3 {
return 0, 0, fmt.Errorf("parsing error, the presence message has invalid id %v", msg.ID)
}
msgSerial, err := strconv.ParseInt(msgIds[1], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parsing error, the presence message has invalid msgSerial, for msgId %v", msg.ID)
}
msgIndex, err := strconv.ParseInt(msgIds[2], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parsing error, the presence message has invalid msgIndex, for msgId %v", msg.ID)
}
return msgSerial, msgIndex, nil
}

// RTP2b, RTP2c
func (msg1 *PresenceMessage) IsNewerThan(msg2 *PresenceMessage) (bool, error) {
// RTP2b1
if msg1.isServerSynthesized() || msg2.isServerSynthesized() {
return msg1.Timestamp > msg2.Timestamp, nil
}

// RTP2b2
msg1Serial, msg1Index, err := msg1.getMsgSerialAndIndex()
if err != nil {
return false, err
}
msg2Serial, msg2Index, err := msg2.getMsgSerialAndIndex()
if err != nil {
return true, err
}
if msg1Serial == msg2Serial {
return msg1Index > msg2Index, nil
}
return msg1Serial > msg2Serial, nil
}
Loading

0 comments on commit adb6354

Please sign in to comment.