Skip to content

Commit

Permalink
Queue messages when CONNECTED but renewing token.
Browse files Browse the repository at this point in the history
  • Loading branch information
tcard committed May 3, 2016
1 parent 547903f commit 6cda49e
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 57 deletions.
2 changes: 1 addition & 1 deletion Source/ARTRealtime+Private.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ART_ASSUME_NONNULL_BEGIN
@interface ARTRealtime () <ARTRealtimeTransportDelegate>

@property (readonly, strong, nonatomic) __GENERIC(ARTEventEmitter, NSNumber *, ARTConnectionStateChange *) *eventEmitter;
@property (readonly, strong, nonatomic) __GENERIC(ARTEventEmitter, NSNull *, NSNull *) *reconnectedEventEmitter;
@property (readonly, strong, nonatomic) __GENERIC(ARTEventEmitter, NSNull *, NSNull *) *connectedEventEmitter;

+ (NSString *)protocolStr:(ARTProtocolMessageAction)action;

Expand Down
94 changes: 53 additions & 41 deletions Source/ARTRealtime.m
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
#import "ARTRealtimeTransport.h"
#import "ARTFallback.h"

@interface ARTConnectionStateChange ()

- (void)setRetryIn:(NSTimeInterval)retryIn;

@end

#pragma mark - ARTRealtime implementation

@implementation ARTRealtime {
Expand Down Expand Up @@ -59,7 +65,7 @@ - (instancetype)initWithOptions:(ARTClientOptions *)options {

_rest = [[ARTRest alloc] initWithOptions:options];
_eventEmitter = [[ARTEventEmitter alloc] init];
_reconnectedEventEmitter = [[ARTEventEmitter alloc] init];
_connectedEventEmitter = [[ARTEventEmitter alloc] init];
_pingEventEmitter = [[ARTEventEmitter alloc] init];
_channels = [[ARTRealtimeChannels alloc] initWithRealtime:self];
_transport = nil;
Expand Down Expand Up @@ -163,13 +169,14 @@ - (void)ping:(void (^)(ARTErrorInfo *)) cb {
cb([ARTErrorInfo createWithCode:0 status:ARTStateConnectionFailed message:[NSString stringWithFormat:@"Can't ping a %@ connection", ARTRealtimeStateToStr(self.connection.state)]]);
return;
case ARTRealtimeConnecting:
case ARTRealtimeDisconnected: {
[_connection once:^(ARTConnectionStateChange *change) {
[self ping:cb];
}];
return;
}
case ARTRealtimeDisconnected:
case ARTRealtimeConnected:
if (![self shouldSendEvents]) {
[_connectedEventEmitter once:^(NSNull *n) {
[self ping:cb];
}];
return;
}
[_pingEventEmitter timed:[_pingEventEmitter once:cb] deadline:[ARTDefault realtimeRequestTimeout] onTimeout:^{
cb([ARTErrorInfo createWithCode:0 status:ARTStateConnectionFailed message:@"timed out"]);
}];
Expand All @@ -196,13 +203,21 @@ - (void)transition:(ARTRealtimeConnectionState)state {
- (void)transition:(ARTRealtimeConnectionState)state withErrorInfo:(ARTErrorInfo *)errorInfo {
[self.logger debug:__FILE__ line:__LINE__ message:@"%p transition to %@ requested", self, ARTRealtimeStateToStr(state)];

ARTRealtimeConnectionState previousState = self.connection.state;
ARTConnectionStateChange *stateChange = [[ARTConnectionStateChange alloc] initWithCurrent:state previous:self.connection.state reason:errorInfo retryIn:0];
[self.connection setState:state];

[self transitionSideEffects:stateChange];

if (errorInfo != nil) {
[self.connection setErrorReason:errorInfo];
}
[self.connection emit:state with:stateChange];
}

- (void)transitionSideEffects:(ARTConnectionStateChange *)stateChange {
ARTStatus *status = nil;
NSTimeInterval retryIn = 0;

switch (self.connection.state) {
switch (stateChange.current) {
case ARTRealtimeConnecting: {
[self unlessStateChangesBefore:[ARTDefault realtimeRequestTimeout] do:^{
[self transition:ARTRealtimeDisconnected withErrorInfo:[ARTErrorInfo createWithCode:0 status:ARTStateConnectionFailed message:@"timed out"]];
Expand All @@ -211,7 +226,7 @@ - (void)transition:(ARTRealtimeConnectionState)state withErrorInfo:(ARTErrorInfo
if (!_transport) {
NSString *resumeKey = nil;
NSNumber *connectionSerial = nil;
if (previousState == ARTRealtimeFailed || previousState == ARTRealtimeDisconnected) {
if (stateChange.previous == ARTRealtimeFailed || stateChange.previous == ARTRealtimeDisconnected) {
resumeKey = self.connection.key;
connectionSerial = [NSNumber numberWithLongLong:self.connection.serial];
_resuming = true;
Expand All @@ -221,20 +236,6 @@ - (void)transition:(ARTRealtimeConnectionState)state withErrorInfo:(ARTErrorInfo
[_transport connect];
}

if (previousState == ARTRealtimeDisconnected) {
__GENERIC(NSArray, ARTQueuedMessage *) *pending = self.pendingMessages;
_pendingMessages = [[NSMutableArray alloc] init];
for (ARTQueuedMessage *queued in pending) {
[self send:queued.msg callback:^(ARTStatus *__art_nonnull status) {
for (id cb in queued.cbs) {
((void(^)(ARTStatus *__art_nonnull))cb)(status);
}
}];
}

[_reconnectedEventEmitter emit:[NSNull null] with:nil];
}

break;
}
case ARTRealtimeClosing: {
Expand All @@ -252,7 +253,7 @@ - (void)transition:(ARTRealtimeConnectionState)state withErrorInfo:(ARTErrorInfo
_transport = nil;
break;
case ARTRealtimeFailed:
status = [ARTStatus state:ARTStateConnectionFailed info:errorInfo];
status = [ARTStatus state:ARTStateConnectionFailed info:stateChange.reason];
[self.transport abort:status];
self.transport.delegate = nil;
_transport = nil;
Expand All @@ -267,16 +268,16 @@ - (void)transition:(ARTRealtimeConnectionState)state withErrorInfo:(ARTErrorInfo
}];
}
if ([[NSDate date] timeIntervalSinceDate:_startedReconnection] >= _connectionStateTtl) {
[self transition:ARTRealtimeSuspended withErrorInfo:errorInfo];
[self transition:ARTRealtimeSuspended withErrorInfo:stateChange.reason];
return;
}

[self.transport close];
self.transport.delegate = nil;
_transport = nil;
retryIn = self.options.disconnectedRetryTimeout;
[stateChange setRetryIn:self.options.disconnectedRetryTimeout];

[self unlessStateChangesBefore:retryIn do:^{
[self unlessStateChangesBefore:stateChange.retryIn do:^{
[self transition:ARTRealtimeConnecting];
}];

Expand All @@ -286,14 +287,25 @@ - (void)transition:(ARTRealtimeConnectionState)state withErrorInfo:(ARTErrorInfo
[self.transport close];
self.transport.delegate = nil;
_transport = nil;
retryIn = self.options.suspendedRetryTimeout;
[self unlessStateChangesBefore:retryIn do:^{
[stateChange setRetryIn:self.options.suspendedRetryTimeout];
[self unlessStateChangesBefore:stateChange.retryIn do:^{
[self transition:ARTRealtimeConnecting];
}];
break;
}
case ARTRealtimeConnected:
case ARTRealtimeConnected: {
__GENERIC(NSArray, ARTQueuedMessage *) *pending = self.pendingMessages;
_pendingMessages = [[NSMutableArray alloc] init];
for (ARTQueuedMessage *queued in pending) {
[self send:queued.msg callback:^(ARTStatus *__art_nonnull status) {
for (id cb in queued.cbs) {
((void(^)(ARTStatus *__art_nonnull))cb)(status);
}
}];
}
[_connectedEventEmitter emit:[NSNull null] with:nil];
break;
}
case ARTRealtimeInitialized:
break;
}
Expand All @@ -309,13 +321,13 @@ - (void)transition:(ARTRealtimeConnectionState)state withErrorInfo:(ARTErrorInfo
// For every Channel
for (ARTRealtimeChannel* channel in self.channels) {
if (channel.state == ARTRealtimeChannelInitialized || channel.state == ARTRealtimeChannelAttaching || channel.state == ARTRealtimeChannelAttached || channel.state == ARTRealtimeChannelFailed) {
if(state == ARTRealtimeClosing) {
if(stateChange.current == ARTRealtimeClosing) {
//do nothing. Closed state is coming.
}
else if(state == ARTRealtimeClosed) {
else if(stateChange.current == ARTRealtimeClosed) {
[channel detachChannel:[ARTStatus state:ARTStateOk]];
}
else if(state == ARTRealtimeSuspended) {
else if(stateChange.current == ARTRealtimeSuspended) {
[channel detachChannel:channelStatus];
}
else {
Expand All @@ -327,11 +339,6 @@ - (void)transition:(ARTRealtimeConnectionState)state withErrorInfo:(ARTErrorInfo
}
}
}

if (errorInfo != nil) {
[self.connection setErrorReason:errorInfo];
}
[self.connection emit:state with:[[ARTConnectionStateChange alloc] initWithCurrent:state previous:previousState reason:errorInfo retryIn:retryIn]];
}

- (void)unlessStateChangesBefore:(NSTimeInterval)deadline do:(void(^)())callback {
Expand Down Expand Up @@ -392,6 +399,9 @@ - (void)onConnected:(ARTProtocolMessage *)message {
}
[self transition:ARTRealtimeConnected withErrorInfo:message.error];
break;
case ARTRealtimeConnected:
// Renewing token.
[self transitionSideEffects:[[ARTConnectionStateChange alloc] initWithCurrent:ARTRealtimeConnected previous:ARTRealtimeConnected reason:nil]];
default:
break;
}
Expand Down Expand Up @@ -488,7 +498,7 @@ - (void)onSuspended {
- (BOOL)shouldSendEvents {
switch (self.connection.state) {
case ARTRealtimeConnected:
return true;
return !_renewingToken;
default:
return false;
}
Expand All @@ -503,6 +513,8 @@ - (BOOL)shouldQueueEvents {
case ARTRealtimeConnecting:
case ARTRealtimeDisconnected:
return true;
case ARTRealtimeConnected:
return _renewingToken;
default:
return false;
}
Expand Down
32 changes: 18 additions & 14 deletions Source/ARTRealtimeChannel.m
Original file line number Diff line number Diff line change
Expand Up @@ -562,13 +562,15 @@ - (void)attachAfterChecks:(void (^)(ARTErrorInfo * _Nullable))callback {
[_attachedEventEmitter emit:[NSNull null] with:errorInfo];
}];

ARTEventListener *reconnectedListener = [self.realtime.reconnectedEventEmitter once:^(NSNull *n) {
// Disconnected and connected while attaching, re-attach.
[self attachAfterChecks:callback];
}];
[_attachedEventEmitter once:^(ARTErrorInfo *err) {
[self.realtime.reconnectedEventEmitter off:reconnectedListener];
}];
if (![self.realtime shouldQueueEvents]) {
ARTEventListener *reconnectedListener = [self.realtime.connectedEventEmitter once:^(NSNull *n) {
// Disconnected and connected while attaching, re-attach.
[self attachAfterChecks:callback];
}];
[_attachedEventEmitter once:^(ARTErrorInfo *err) {
[self.realtime.connectedEventEmitter off:reconnectedListener];
}];
}
}

- (void)detach:(void (^)(ARTErrorInfo * _Nullable))callback {
Expand Down Expand Up @@ -624,13 +626,15 @@ - (void)detachAfterChecks:(void (^)(ARTErrorInfo * _Nullable))callback {
[_detachedEventEmitter emit:[NSNull null] with:errorInfo];
}];

ARTEventListener *reconnectedListener = [self.realtime.reconnectedEventEmitter once:^(NSNull *n) {
// Disconnected and connected while attaching, re-detach.
[self detachAfterChecks:callback];
}];
[_detachedEventEmitter once:^(ARTErrorInfo *err) {
[self.realtime.reconnectedEventEmitter off:reconnectedListener];
}];
if (![self.realtime shouldQueueEvents]) {
ARTEventListener *reconnectedListener = [self.realtime.connectedEventEmitter once:^(NSNull *n) {
// Disconnected and connected while attaching, re-detach.
[self detachAfterChecks:callback];
}];
[_detachedEventEmitter once:^(ARTErrorInfo *err) {
[self.realtime.connectedEventEmitter off:reconnectedListener];
}];
}
}

- (void)detach {
Expand Down
4 changes: 4 additions & 0 deletions Source/ARTTypes.m
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,8 @@ - (NSString *)description {
}
}

- (void)setRetryIn:(NSTimeInterval)retryIn {
_retryIn = retryIn;
}

@end
2 changes: 1 addition & 1 deletion Spec/RealtimeClientChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class RealtimeClientChannel: QuickSpec {
expect(emitCounter).to(equal(5))

if states.count != 5 {
fail("Missing some states")
fail("Expecting 5 states; got \(states)")
return
}

Expand Down

0 comments on commit 6cda49e

Please sign in to comment.