Skip to content

Commit

Permalink
Implement RTP18a (start of a new sync sequence and any previous in-fl…
Browse files Browse the repository at this point in the history
…ight sync is discarded).
  • Loading branch information
maratal committed Apr 13, 2024
1 parent 81ef883 commit a5e84f4
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 26 deletions.
20 changes: 20 additions & 0 deletions Source/ARTProtocolMessage.m
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ - (NSString *)getConnectionKey {
return _connectionKey;
}

- (NSString *)getSyncIdentifierAtIndex:(NSInteger)index {
if (!_channelSerial || [_channelSerial isEqualToString:@""] || index > 1) {
return @"";
}
NSArray *a = [_channelSerial componentsSeparatedByString:@":"];
return a.count == 2 ? a[index] : @"";
}

- (NSString *)getSyncSequenceId {
return [self getSyncIdentifierAtIndex:0];
}

- (NSString *)getSyncCursor {
return [self getSyncIdentifierAtIndex:1];
}

- (BOOL)isEndOfSync {
return [[self getSyncCursor] isEqualToString:@""];
}

- (NSString *)description {
NSMutableString *description = [NSMutableString stringWithFormat:@"<%@: %p> {\n", self.class, self];
[description appendFormat:@" count: %d,\n", self.count];
Expand Down
46 changes: 21 additions & 25 deletions Source/ARTRealtimePresence.m
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ @implementation ARTRealtimePresenceInternal {
NSMutableDictionary<NSString *, ARTPresenceMessage *> *_internalMembers; // RTP17h

NSMutableDictionary<NSString *, ARTPresenceMessage *> *_beforeSyncMembers; // RTP19

// RTP18a
NSString *_syncSequenceId;
NSMutableDictionary<NSString *, ARTPresenceMessage *> *_membersBackup;
}

- (instancetype)initWithChannel:(ARTRealtimeChannelInternal *)channel logger:(ARTInternalLog *)logger {
Expand Down Expand Up @@ -690,23 +694,8 @@ - (void)broadcast:(ARTPresenceMessage *)pm {
[_eventEmitter emit:[ARTEvent newWithPresenceAction:pm.action] with:pm];
}

/*
* Checks that a channelSerial is the final serial in a sequence of sync messages,
* by checking that there is nothing after the colon - RTP18b, RTP18c
*/
- (bool)isLastChannelSerial:(NSString *)channelSerial {
if (!channelSerial || [channelSerial isEqualToString:@""]) {
return true;
}
NSArray *a = [channelSerial componentsSeparatedByString:@":"];
if (a.count > 1 && ![[a objectAtIndex:1] isEqualToString:@""]) {
return false;
}
return true;
}

- (void)onAttached:(ARTProtocolMessage *)message {
[self startSync];
[self startSyncWithSequenceId:[message getSyncSequenceId]];
if (!message.hasPresence) {
// RTP1 - when an ATTACHED message is received without a HAS_PRESENCE flag, reset PresenceMap (also RTP19a)
[self endSync];
Expand Down Expand Up @@ -748,17 +737,17 @@ - (void)onMessage:(ARTProtocolMessage *)message {
}

- (void)onSync:(ARTProtocolMessage *)message {
if (!self.syncInProgress) {
[self startSync];
NSString *sequenceId = [message getSyncSequenceId];
if (!self.syncInProgress || ![_syncSequenceId isEqualToString:sequenceId]) { // RTP18a
[self startSyncWithSequenceId:sequenceId];
}
else {
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) PresenceMap sync is in progress", _realtime, _channel, _channel.name);
}

[self onMessage:message];

// TODO: RTP18a (previous in-flight sync should be discarded)
if ([self isLastChannelSerial:message.channelSerial]) { // RTP18b, RTP18c
if ([message isEndOfSync]) { // RTP18b, RTP18c
[self endSync];
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) PresenceMap sync ended", _realtime, _channel, _channel.name);
}
Expand Down Expand Up @@ -940,23 +929,30 @@ - (void)reset {
_internalMembers = [NSMutableDictionary new];
}

- (void)startSync {
ARTLogDebug(_logger, @"%p PresenceMap sync started", self);
- (void)startSyncWithSequenceId:(NSString *)sequenceId {
ARTLogDebug(_logger, @"%p PresenceMap sync started with sequence id = %@", self, sequenceId);
if (_syncSequenceId && sequenceId && ![_syncSequenceId isEqualToString:sequenceId]) {
_members = [_membersBackup mutableCopy]; // discard sync - RTP18a
ARTLogDebug(_logger, @"%p PresenceMap sync with sequence id = %@ was discarded", self, _syncSequenceId);
}
_beforeSyncMembers = [_members mutableCopy];
_syncSequenceId = sequenceId;
_membersBackup = [_members mutableCopy];
_syncState = ARTPresenceSyncStarted;
[_syncEventEmitter emit:[ARTEvent newWithPresenceSyncState:_syncState] with:nil];
}

- (void)endSync {
ARTLogVerbose(_logger, @"%p PresenceMap sync ending", self);
ARTLogVerbose(_logger, @"%p PresenceMap sync ending with sequence id = %@", self, _syncSequenceId);
[self cleanUpAbsentMembers];
[self leaveMembersNotPresentInSync];
_syncState = ARTPresenceSyncEnded;
_beforeSyncMembers = nil;

_membersBackup = nil;
[_syncEventEmitter emit:[ARTEvent newWithPresenceSyncState:ARTPresenceSyncEnded] with:[_members allValues]];
[_syncEventEmitter off];
ARTLogDebug(_logger, @"%p PresenceMap sync ended", self);
ARTLogDebug(_logger, @"%p PresenceMap sync with sequence id = %@ is ended", self, _syncSequenceId);
_syncSequenceId = nil;
}

- (void)failsSync:(ARTErrorInfo *)error {
Expand Down
4 changes: 4 additions & 0 deletions Source/PrivateHeaders/Ably/ARTProtocolMessage+Private.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ NS_ASSUME_NONNULL_BEGIN

- (BOOL)mergeFrom:(ARTProtocolMessage *)msg;

- (NSString *)getSyncSequenceId;
- (NSString *)getSyncCursor;
- (BOOL)isEndOfSync;

@end

NS_ASSUME_NONNULL_END
2 changes: 1 addition & 1 deletion Source/PrivateHeaders/Ably/ARTRealtimePresence+Private.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ NS_ASSUME_NONNULL_BEGIN
- (void)processMember:(ARTPresenceMessage *)message;
- (void)reset;

- (void)startSync;
- (void)startSyncWithSequenceId:(NSString *)sequenceId;
- (void)endSync;
- (void)failsSync:(ARTErrorInfo *)error;

Expand Down

0 comments on commit a5e84f4

Please sign in to comment.