Skip to content

Commit

Permalink
Fix subscribing
Browse files Browse the repository at this point in the history
  • Loading branch information
NaikSoftware committed Apr 3, 2018
1 parent 18990db commit d67eaeb
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,8 @@
import android.support.annotation.Nullable;
import android.util.Log;

import java.util.concurrent.TimeUnit;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;

/**
Expand All @@ -28,12 +22,10 @@ abstract class AbstractConnectionProvider implements ConnectionProvider {
private final PublishSubject<LifecycleEvent> mLifecycleStream;
@NonNull
private final PublishSubject<String> mMessagesStream;
final BehaviorSubject<Boolean> mConnectionStream;

AbstractConnectionProvider() {
mLifecycleStream = PublishSubject.create();
mMessagesStream = PublishSubject.create();
mConnectionStream = BehaviorSubject.createDefault(false);
}

@NonNull
Expand All @@ -54,27 +46,13 @@ public Observable<String> messages() {

@Override
public Completable disconnect() {
CompletableSource ex = Completable.error(new IllegalStateException("Attempted to disconnect when already disconnected"));

Completable block = mConnectionStream
.filter(connected -> connected).firstOrError().toCompletable()
.timeout(1, TimeUnit.SECONDS, ex);

return Completable
.fromAction(this::rawDisconnect)
.startWith(block);
.fromAction(this::rawDisconnect);
}

private Completable initSocket() {
CompletableSource ex = Completable.error(new IllegalStateException("Attempted to connect when already connected"));

Completable block = mConnectionStream
.filter(connected -> !connected).firstOrError().toCompletable()
.timeout(1, TimeUnit.SECONDS, ex);

return Completable
.fromAction(this::createWebSocketConnection)
.startWith(block);
.fromAction(this::createWebSocketConnection);
}

// Doesn't do anything at all, only here as a stub
Expand Down Expand Up @@ -130,8 +108,6 @@ public Completable send(String stompMessage) {
void emitLifecycleEvent(@NonNull LifecycleEvent lifecycleEvent) {
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
mLifecycleStream.onNext(lifecycleEvent);
if (lifecycleEvent.getType().equals(LifecycleEvent.Type.CLOSED))
mConnectionStream.onNext(false);
}

void emitMessage(String stompMessage) {
Expand All @@ -144,9 +120,4 @@ void emitMessage(String stompMessage) {
public Observable<LifecycleEvent> lifecycle() {
return mLifecycleStream;
}

@Override
public Flowable<Boolean> connected() {
return mConnectionStream.toFlowable(BackpressureStrategy.LATEST);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,4 @@ public interface ConnectionProvider {
Completable disconnect();

Completable setHeartbeat(int ms);

Flowable<Boolean> connected();
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public void onClosing(final WebSocket webSocket, final int code, final String re
}

);
mConnectionStream.onNext(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ public void onError(Exception ex) {

mWebSocketClient.connect();
haveConnection = true;
mConnectionStream.onNext(true);
}

@Override
Expand Down
29 changes: 22 additions & 7 deletions lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.reactivex.CompletableSource;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import ua.naiksoftware.stomp.ConnectionProvider;
import ua.naiksoftware.stomp.LifecycleEvent;
Expand All @@ -39,6 +40,7 @@ public class StompClient {

private PublishSubject<StompMessage> mMessageStream;
private ConcurrentHashMap<String, Flowable<StompMessage>> mStreamMap;
private final BehaviorSubject<Boolean> mConnectionStream;
private Parser parser;
private Disposable mLifecycleDisposable;
private Disposable mMessagesDisposable;
Expand All @@ -49,6 +51,7 @@ public StompClient(ConnectionProvider connectionProvider) {
mConnectionProvider = connectionProvider;
mMessageStream = PublishSubject.create();
mStreamMap = new ConcurrentHashMap<>();
mConnectionStream = BehaviorSubject.createDefault(false);
parser = Parser.NONE;
}

Expand Down Expand Up @@ -113,12 +116,12 @@ public void connect(@Nullable List<StompHeader> _headers) {
break;

case CLOSED:
mConnected = false;
setConnected(false);
isConnecting = false;
break;

case ERROR:
mConnected = false;
setConnected(false);
isConnecting = false;
break;
}
Expand All @@ -130,17 +133,24 @@ public void connect(@Nullable List<StompHeader> _headers) {
.doOnNext(this::callSubscribers)
.filter(msg -> msg.getStompCommand().equals(StompCommand.CONNECTED))
.subscribe(stompMessage -> {
mConnected = true;
setConnected(true);
isConnecting = false;

});
}

private void setConnected(boolean connected) {
mConnected = connected;
mConnectionStream.onNext(mConnected);
}

/**
* Disconnect from server, and then reconnect with the last-used headers
*/
public void reconnect() {
disconnect();
connect(mHeaders);
disconnectCompletable()
.subscribe(() -> connect(mHeaders),
e -> Log.e(tag, "Disconnect error", e));
}

public Completable send(String destination) {
Expand All @@ -156,7 +166,7 @@ public Completable send(String destination, String data) {

public Completable send(@NonNull StompMessage stompMessage) {
Completable completable = mConnectionProvider.send(stompMessage.compile(legacyWhitespace));
CompletableSource connectionComplete = mConnectionProvider.connected()
CompletableSource connectionComplete = mConnectionStream
.filter(isConnected -> isConnected)
.firstOrError().toCompletable();
return completable
Expand All @@ -172,9 +182,14 @@ public Flowable<LifecycleEvent> lifecycle() {
}

public void disconnect() {
disconnectCompletable().subscribe(() -> {}, e -> Log.e(tag, "Disconnect error", e));
}

public Completable disconnectCompletable() {
mLifecycleDisposable.dispose();
mMessagesDisposable.dispose();
mConnectionProvider.disconnect().subscribe(() -> mConnected = false, e -> Log.e(tag, "Disconnect error", e));
return mConnectionProvider.disconnect()
.doOnComplete(() -> setConnected(false));
}

public Flowable<StompMessage> topic(String destinationPath) {
Expand Down

0 comments on commit d67eaeb

Please sign in to comment.