Skip to content

Commit

Permalink
RTL6d (#427)
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardopereira authored and tcard committed May 2, 2016
1 parent 4720ac4 commit 213271d
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 7 deletions.
8 changes: 5 additions & 3 deletions Source/ARTRealtime.m
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,11 @@ - (void)send:(ARTProtocolMessage *)msg callback:(void (^)(ARTStatus *))cb {
[self sendImpl:msg callback:cb];
} else if ([self shouldQueueEvents]) {
BOOL merged = NO;
if ([self.queuedMessages count]) {
ARTQueuedMessage *lastQueued = [self.queuedMessages objectAtIndex:(self.queuedMessages.count) - 1];
merged = [lastQueued mergeFrom:msg callback:cb];
for (ARTQueuedMessage *queuedMsg in self.queuedMessages) {
merged = [queuedMsg mergeFrom:msg callback:cb];
if (merged) {
break;
}
}
if (!merged) {
ARTQueuedMessage *qm = [[ARTQueuedMessage alloc] initWithProtocolMessage:msg callback:cb];
Expand Down
20 changes: 16 additions & 4 deletions Source/ARTRealtimeChannel.m
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ - (void)publishProtocolMessage:(ARTProtocolMessage *)pm callback:(void (^)(ARTSt
// intentional fall-through
case ARTRealtimeChannelAttaching:
{
ARTQueuedMessage *qm = [[ARTQueuedMessage alloc] initWithProtocolMessage:pm callback:cb];
[self.queuedMessages addObject:qm];
[self addToQueue:pm callback:cb];
break;
}
case ARTRealtimeChannelDetaching:
Expand All @@ -161,8 +160,7 @@ - (void)publishProtocolMessage:(ARTProtocolMessage *)pm callback:(void (^)(ARTSt
if (_realtime.connection.state == ARTRealtimeConnected) {
[self sendMessage:pm callback:cb];
} else {
ARTQueuedMessage *qm = [[ARTQueuedMessage alloc] initWithProtocolMessage:pm callback:cb];
[self.queuedMessages addObject:qm];
[self addToQueue:pm callback:cb];

[_realtime.connection once:ARTRealtimeConnected callback:^(ARTConnectionStateChange *__art_nullable change) {
[self sendQueuedMessages];
Expand All @@ -175,6 +173,20 @@ - (void)publishProtocolMessage:(ARTProtocolMessage *)pm callback:(void (^)(ARTSt
}
}

- (void)addToQueue:(ARTProtocolMessage *)msg callback:(void (^)(ARTStatus *))cb {
BOOL merged = NO;
for (ARTQueuedMessage *queuedMsg in self.queuedMessages) {
merged = [queuedMsg mergeFrom:msg callback:cb];
if (merged) {
break;
}
}
if (!merged) {
ARTQueuedMessage *qm = [[ARTQueuedMessage alloc] initWithProtocolMessage:msg callback:cb];
[self.queuedMessages addObject:qm];
}
}

- (void)sendMessage:(ARTProtocolMessage *)pm callback:(void (^)(ARTStatus *))cb {
__block BOOL gotFailure = false;
NSString *oldConnectionId = self.realtime.connection.id;
Expand Down
54 changes: 54 additions & 0 deletions Spec/RealtimeClientChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,60 @@ class RealtimeClientChannel: QuickSpec {

}

// RTL6d
it("Messages are delivered using a single ProtocolMessage where possible by bundling in all messages for that channel") {
let options = AblyTests.commonAppSetup()
options.autoConnect = false
let client = AblyTests.newRealtime(options)
defer { client.close() }
let channel = client.channels.get("test")

// Test that the initially queued messages are sent together.

let messagesSent = 3
waitUntil(timeout: testTimeout) { done in
let partialDone = AblyTests.splitDone(messagesSent, done: done)
for i in 1...messagesSent {
channel.publish("initial", data: "message\(i)") { error in
expect(error).to(beNil())
partialDone()
}
}
client.connect()
}

let transport = client.transport as! TestProxyTransport
let protocolMessages = transport.protocolMessagesSent.filter{ $0.action == .Message }
expect(protocolMessages).to(haveCount(1))
if protocolMessages.count != 1 {
return
}
expect(protocolMessages[0].messages).to(haveCount(messagesSent))

// Test that publishing an array of messages sends them together.

// TODO: limit the total number of messages bundled per ProtocolMessage
let maxMessages = 50

var messages = [ARTMessage]()
for i in 1...maxMessages {
messages.append(ARTMessage(name: "total number of messages", data: "message\(i)"))
}
waitUntil(timeout: testTimeout) { done in
channel.publish(messages) { error in
expect(error).to(beNil())
let transport = client.transport as! TestProxyTransport
let protocolMessages = transport.protocolMessagesSent.filter{ $0.action == .Message }
expect(protocolMessages).to(haveCount(2))
if protocolMessages.count != 2 {
done(); return
}
expect(protocolMessages[1].messages).to(haveCount(maxMessages))
done()
}
}
}

// RTL6e
context("Unidentified clients using Basic Auth") {

Expand Down

0 comments on commit 213271d

Please sign in to comment.