Skip to content

Commit

Permalink
Merge pull request #21 from rassulrakhimzhan/master
Browse files Browse the repository at this point in the history
custom subscription headers support added
  • Loading branch information
NaikSoftware authored Nov 27, 2016
2 parents d9b70a1 + 8780629 commit 4664650
Showing 1 changed file with 42 additions and 40 deletions.
82 changes: 42 additions & 40 deletions lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,46 +147,48 @@ public void disconnect() {
mConnected = false;
}

public Observable<StompMessage> topic(String destinationPath) {
return Observable.<StompMessage>create(subscriber -> {

Set<Subscriber<? super StompMessage>> subscribersSet = mSubscribers.get(destinationPath);
if (subscribersSet == null) {
subscribersSet = new HashSet<>();
mSubscribers.put(destinationPath, subscribersSet);
subscribePath(destinationPath);
}
subscribersSet.add(subscriber);

}).doOnUnsubscribe(() -> {
for (String dest : mSubscribers.keySet()) {
Set<Subscriber<? super StompMessage>> set = mSubscribers.get(dest);
for (Subscriber<? super StompMessage> subscriber : set) {
if (subscriber.isUnsubscribed()) {
set.remove(subscriber);
if (set.size() < 1) {
mSubscribers.remove(dest);
unsubscribePath(dest);
}
}
}
}
});
}

private void subscribePath(String destinationPath) {
if (destinationPath == null) return;
String topicId = UUID.randomUUID().toString();
Log.d(TAG, "Subscribe path: " + destinationPath + " id: " + topicId);

if (mTopics == null) mTopics = new HashMap<>();
mTopics.put(destinationPath, topicId);
send(new StompMessage(StompCommand.SUBSCRIBE,
Arrays.asList(
new StompHeader(StompHeader.ID, topicId),
new StompHeader(StompHeader.DESTINATION, destinationPath),
new StompHeader(StompHeader.ACK, DEFAULT_ACK)), null));
}
public Observable<StompMessage> topic(String destinationPath, List<StompHeader> headerList) {
return Observable.<StompMessage>create(subscriber -> {
Set<Subscriber<? super StompMessage>> subscribersSet = mSubscribers.get(destinationPath);
if (subscribersSet == null) {
subscribersSet = new HashSet<>();
mSubscribers.put(destinationPath, subscribersSet);
subscribePath(destinationPath, headerList);
}
subscribersSet.add(subscriber);

}).doOnUnsubscribe(() -> {
for (String dest : mSubscribers.keySet()) {
Set<Subscriber<? super StompMessage>> set = mSubscribers.get(dest);
for (Subscriber<? super StompMessage> subscriber : set) {
if (subscriber.isUnsubscribed()) {
set.remove(subscriber);
if (set.size() < 1) {
mSubscribers.remove(dest);
unsubscribePath(dest);
}
}
}
}
});
}

private void subscribePath(String destinationPath, List<StompHeader> headerList) {
if (destinationPath == null) return;
String topicId = UUID.randomUUID().toString();

if (mTopics == null) mTopics = new HashMap<>();
mTopics.put(destinationPath, topicId);
List<StompHeader> headers = new ArrayList<>();
headers.add(new StompHeader(StompHeader.ID, topicId));
headers.add(new StompHeader(StompHeader.DESTINATION, destinationPath));
headers.add(new StompHeader(StompHeader.ACK, DEFAULT_ACK));
for(StompHeader header : headerList){
headers.add(header);
}
send(new StompMessage(StompCommand.SUBSCRIBE,
headers, null));
}


private void unsubscribePath(String dest) {
Expand Down

0 comments on commit 4664650

Please sign in to comment.