Skip to content

Commit

Permalink
Resend pending messages without using object wide flag.
Browse files Browse the repository at this point in the history
  • Loading branch information
maratal committed Nov 26, 2023
1 parent 081fe13 commit c544507
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 23 deletions.
44 changes: 24 additions & 20 deletions Source/ARTRealtime.m
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ @interface ARTRealtimeInternal ()

@implementation ARTRealtimeInternal {
BOOL _resuming;
BOOL _reuseMsgSerial;
BOOL _renewingToken;
BOOL _shouldImmediatelyReconnect;
ARTEventEmitter<ARTEvent *, ARTErrorInfo *> *_pingEventEmitter;
Expand Down Expand Up @@ -225,7 +224,6 @@ - (instancetype)initWithOptions:(ARTClientOptions *)options {
_transport = nil;
_reachabilityClass = [ARTOSReachability class];
_msgSerial = 0;
_reuseMsgSerial = false;
_queuedMessages = [NSMutableArray array];
_pendingMessages = [NSMutableArray array];
_pendingMessageStartSerial = 0;
Expand Down Expand Up @@ -517,8 +515,13 @@ - (BOOL)stats:(ARTStatsQuery *)query callback:(ARTPaginatedStatsCallback)callbac
- (void)transition:(ARTRealtimeConnectionState)state withMetadata:(ARTConnectionStateChangeMetadata *)metadata {
ARTLogVerbose(self.logger, @"R:%p realtime state transitions to %tu - %@%@", self, state, ARTRealtimeConnectionStateToStr(state), metadata.retryAttempt ? [NSString stringWithFormat: @" (result of %@)", metadata.retryAttempt.id] : @"");

ARTConnectionStateChange *stateChange = [[ARTConnectionStateChange alloc] initWithCurrent:state previous:self.connection.state_nosync event:(ARTRealtimeConnectionEvent)state reason:metadata.errorInfo retryIn:0 retryAttempt:metadata.retryAttempt];

ARTConnectionStateChange *stateChange = [[ARTConnectionStateChange alloc] initWithCurrent:state
previous:self.connection.state_nosync
event:(ARTRealtimeConnectionEvent)state
reason:metadata.errorInfo
retryIn:0
retryAttempt:metadata.retryAttempt
resumed:metadata.resumed];
[self.connection setState:state];
[self.connection setErrorReason:metadata.errorInfo];

Expand Down Expand Up @@ -728,7 +731,7 @@ - (ARTEventListener *)performTransitionWithStateChange:(ARTConnectionStateChange
_fallbacks = nil;
_connectionLostAt = nil;
self.options.recover = nil; // RTN16k
[self resendPendingMessagesWithResumed:self.msgSerial > 0]; // RTN19a
[self resendPendingMessagesWithResumed:stateChange.resumed];
[_connectedEventEmitter emit:nil with:nil];
break;
}
Expand Down Expand Up @@ -880,7 +883,8 @@ - (void)onConnected:(ARTProtocolMessage *)message {
NSString *prevConnId = self.connection.id_nosync;
BOOL connIdChanged = prevConnId && ![message.connectionId isEqualToString:prevConnId];
BOOL recoverFailure = !prevConnId && message.error;
if (connIdChanged || recoverFailure) {
BOOL resumed = !(connIdChanged || recoverFailure);
if (!resumed) {
ARTLogDebug(self.logger, @"RT:%p msgSerial of connection \"%@\" has been reset", self, self.connection.id_nosync);
self.msgSerial = 0;
self.pendingMessageStartSerial = 0;
Expand All @@ -899,7 +903,9 @@ - (void)onConnected:(ARTProtocolMessage *)message {
[self setIdleTimer];
}
ARTConnectionStateChangeMetadata *const metadata = [[ARTConnectionStateChangeMetadata alloc] initWithErrorInfo:message.error];
metadata.resumed = resumed; // RTN19a
[self transition:ARTRealtimeConnected withMetadata:metadata];

break;
}
case ARTRealtimeConnected: {
Expand Down Expand Up @@ -1243,9 +1249,9 @@ - (BOOL)isActive {
return [self shouldQueueEvents] || [self shouldSendEvents];
}

- (void)sendImpl:(ARTProtocolMessage *)pm sentCallback:(ARTCallback)sentCallback ackCallback:(ARTStatusCallback)ackCallback {
- (void)sendImpl:(ARTProtocolMessage *)pm reuseMsgSerial:(BOOL)reuseMsgSerial sentCallback:(ARTCallback)sentCallback ackCallback:(ARTStatusCallback)ackCallback {
if (pm.ackRequired) {
if (!_reuseMsgSerial) {
if (!reuseMsgSerial) {
pm.msgSerial = [NSNumber numberWithLongLong:self.msgSerial];
}
}
Expand All @@ -1271,7 +1277,7 @@ - (void)sendImpl:(ARTProtocolMessage *)pm sentCallback:(ARTCallback)sentCallback
}

if (pm.ackRequired) {
if (!_reuseMsgSerial) {
if (!reuseMsgSerial) {
self.msgSerial++;
}
ARTPendingMessage *pendingMessage = [[ARTPendingMessage alloc] initWithProtocolMessage:pm ackCallback:ackCallback];
Expand All @@ -1285,9 +1291,9 @@ - (void)sendImpl:(ARTProtocolMessage *)pm sentCallback:(ARTCallback)sentCallback
}
}

- (void)send:(ARTProtocolMessage *)msg sentCallback:(ARTCallback)sentCallback ackCallback:(ARTStatusCallback)ackCallback {
- (void)send:(ARTProtocolMessage *)msg reuseMsgSerial:(BOOL)reuseMsgSerial sentCallback:(ARTCallback)sentCallback ackCallback:(ARTStatusCallback)ackCallback {
if ([self shouldSendEvents]) {
[self sendImpl:msg sentCallback:sentCallback ackCallback:ackCallback];
[self sendImpl:msg reuseMsgSerial:reuseMsgSerial sentCallback:sentCallback ackCallback:ackCallback];
}
else if ([self shouldQueueEvents]) {
ARTQueuedMessage *lastQueuedMessage = self.queuedMessages.lastObject; //RTL6d5
Expand All @@ -1308,24 +1314,22 @@ - (void)send:(ARTProtocolMessage *)msg sentCallback:(ARTCallback)sentCallback ac
}
}

- (void)resendPendingMessagesWithResumed:(BOOL)resumed {
- (void)send:(ARTProtocolMessage *)msg sentCallback:(ARTCallback)sentCallback ackCallback:(ARTStatusCallback)ackCallback {
[self send:msg reuseMsgSerial:NO sentCallback:sentCallback ackCallback:ackCallback];
}

- (void)resendPendingMessagesWithResumed:(BOOL)reuseMsgSerial {
NSArray<ARTPendingMessage *> *pendingMessages = self.pendingMessages;
if (pendingMessages.count > 0) {
ARTLogDebug(self.logger, @"RT:%p resending messages waiting for acknowledgment", self);
// reuse msgSerial for pending messages if resume was successful
_reuseMsgSerial = resumed;
}
self.pendingMessages = [NSMutableArray array];
for (ARTPendingMessage *pendingMessage in pendingMessages) {
ARTProtocolMessage* pm = pendingMessage.msg;
if (!resumed) {
pm.msgSerial = @0; // RTN19a2 (self.msgSerial = 0 when resume fails)
}
[self send:pm sentCallback:nil ackCallback:^(ARTStatus *status) {
[self send:pm reuseMsgSerial:reuseMsgSerial sentCallback:nil ackCallback:^(ARTStatus *status) {
pendingMessage.ackCallback(status);
}];
}
_reuseMsgSerial = false;
}

- (void)failPendingMessages:(ARTStatus *)status {
Expand All @@ -1341,7 +1345,7 @@ - (void)sendQueuedMessages {
self.queuedMessages = [NSMutableArray array];

for (ARTQueuedMessage *message in qms) {
[self sendImpl:message.msg sentCallback:message.sentCallback ackCallback:message.ackCallback];
[self sendImpl:message.msg reuseMsgSerial:NO sentCallback:message.sentCallback ackCallback:message.ackCallback];
}
}

Expand Down
5 changes: 3 additions & 2 deletions Source/ARTTypes.m
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ - (instancetype)initWithCurrent:(ARTRealtimeConnectionState)current previous:(AR
}

- (instancetype)initWithCurrent:(ARTRealtimeConnectionState)current previous:(ARTRealtimeConnectionState)previous event:(ARTRealtimeConnectionEvent)event reason:(ARTErrorInfo *)reason retryIn:(NSTimeInterval)retryIn {
return [self initWithCurrent:current previous:previous event:event reason:reason retryIn:retryIn retryAttempt:nil];
return [self initWithCurrent:current previous:previous event:event reason:reason retryIn:retryIn retryAttempt:nil resumed:NO];
}

- (instancetype)initWithCurrent:(ARTRealtimeConnectionState)current previous:(ARTRealtimeConnectionState)previous event:(ARTRealtimeConnectionEvent)event reason:(ARTErrorInfo *)reason retryIn:(NSTimeInterval)retryIn retryAttempt:(ARTRetryAttempt *)retryAttempt {
- (instancetype)initWithCurrent:(ARTRealtimeConnectionState)current previous:(ARTRealtimeConnectionState)previous event:(ARTRealtimeConnectionEvent)event reason:(ARTErrorInfo *)reason retryIn:(NSTimeInterval)retryIn retryAttempt:(ARTRetryAttempt *)retryAttempt resumed:(BOOL)resumed {
self = [self init];
if (self) {
_current = current;
Expand All @@ -57,6 +57,7 @@ - (instancetype)initWithCurrent:(ARTRealtimeConnectionState)current previous:(AR
_reason = reason;
_retryIn = retryIn;
_retryAttempt = retryAttempt;
_resumed = resumed;
}
return self;
}
Expand Down
2 changes: 2 additions & 0 deletions Source/PrivateHeaders/Ably/ARTConnectionStateChangeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ NS_SWIFT_NAME(ConnectionStateChangeMetadata)

@property (nullable, nonatomic, readonly) ARTRetryAttempt *retryAttempt;

@property (assign, nonatomic) BOOL resumed;

/**
Creates an `ARTConnectionStateChangeMetadata` instance whose `errorInfo` is `nil`.
*/
Expand Down
2 changes: 2 additions & 0 deletions Source/PrivateHeaders/Ably/ARTRealtime+Private.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ NS_ASSUME_NONNULL_BEGIN
// Message sending
- (void)send:(ARTProtocolMessage *)msg sentCallback:(nullable ARTCallback)sentCallback ackCallback:(nullable ARTStatusCallback)ackCallback;

- (void)send:(ARTProtocolMessage *)msg reuseMsgSerial:(BOOL)reuseMsgSerial sentCallback:(nullable ARTCallback)sentCallback ackCallback:(nullable ARTStatusCallback)ackCallback;

@end

NS_ASSUME_NONNULL_END
8 changes: 7 additions & 1 deletion Source/PrivateHeaders/Ably/ARTTypes+Private.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ NS_ASSUME_NONNULL_BEGIN
*/
@property (nonatomic, readonly, nullable) ARTRetryAttempt *retryAttempt;

/**
* Indicates whether the connection was resumed.
*/
@property (assign, nonatomic) BOOL resumed;

- (instancetype)initWithCurrent:(ARTRealtimeConnectionState)current
previous:(ARTRealtimeConnectionState)previous
event:(ARTRealtimeConnectionEvent)event
reason:(nullable ARTErrorInfo *)reason
retryIn:(NSTimeInterval)retryIn
retryAttempt:(nullable ARTRetryAttempt *)retryAttempt;
retryAttempt:(nullable ARTRetryAttempt *)retryAttempt
resumed:(BOOL)resumed;

@end

Expand Down

0 comments on commit c544507

Please sign in to comment.