Skip to content

Commit

Permalink
Refactored code to handle synthesized and old message checks
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Aug 15, 2023
1 parent a2e2d38 commit 95b5ef8
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
31 changes: 28 additions & 3 deletions ably/proto_presence_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ably

import (
"fmt"
"strconv"
"strings"
)

Expand Down Expand Up @@ -65,10 +66,34 @@ func (msg *PresenceMessage) isServerSynthesizedPresenceMessage() bool {
return strings.HasPrefix(msg.ID, msg.ConnectionID)
}

func (oldMessage *PresenceMessage) IsNewerThan(incomingMessage *PresenceMessage) bool {
func (incomingMessage *PresenceMessage) IsOlderThan(oldMessage *PresenceMessage) (bool, error) {
if oldMessage.isServerSynthesizedPresenceMessage() ||
incomingMessage.isServerSynthesizedPresenceMessage() {
return oldMessage.Timestamp > incomingMessage.Timestamp
return oldMessage.Timestamp > incomingMessage.Timestamp, nil
}
return false

presenceIdErr := func(presenceMsgId string) error {
return fmt.Errorf("parsing error, the presence message has invalid id %v", presenceMsgId)
}

oldMessageIds := strings.Split(oldMessage.ID, ":")
incomingMessageIds := strings.Split(incomingMessage.ID, ":")

oldMessageSerial, err := strconv.ParseInt(oldMessageIds[1], 10, 64)
oldMessageIndex, err := strconv.ParseInt(oldMessageIds[2], 10, 64)
if len(oldMessageIds) != 3 || err != nil {
return false, presenceIdErr(oldMessage.ID)
}

incomingMessageSerial, err := strconv.ParseInt(incomingMessageIds[1], 10, 64)
incomingMessageIndex, err := strconv.ParseInt(incomingMessageIds[2], 10, 64)
if len(incomingMessageIds) != 3 || err != nil {
return true, presenceIdErr(incomingMessage.ID)
}

if oldMessageSerial == incomingMessageSerial {
return oldMessageIndex > incomingMessageIndex, nil
}

return oldMessageSerial > incomingMessageSerial, nil
}
5 changes: 4 additions & 1 deletion ably/realtime_presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ func (pres *RealtimePresence) processIncomingMessage(msg *protocolMessage, syncS
memberKey := presenceMember.ConnectionID + presenceMember.ClientID

if oldPresenceMember, ok := pres.members[memberKey]; ok { // RTP2a
if oldPresenceMember.IsNewerThan(presenceMember) { // RTP2b1
isIncomingMessageOld, err := presenceMember.IsOlderThan(oldPresenceMember)
pres.log().Error(err)
// TODO - publish channel error event here without state change
if isIncomingMessageOld { // RTP2b1
continue // do not process message with older timestamp // RTP2b1a
}
}
Expand Down

0 comments on commit 95b5ef8

Please sign in to comment.