Skip to content

Commit

Permalink
Merge pull request #54 from hardyeats/rxjava2
Browse files Browse the repository at this point in the history
Issue #48 : Convert from RxJava 1 to RxJava 2.
NaikSoftware authored Aug 7, 2017
2 parents d2cac6d + d945182 commit 8780db3
Showing 9 changed files with 146 additions and 153 deletions.
8 changes: 4 additions & 4 deletions example-client/build.gradle
Original file line number Diff line number Diff line change
@@ -31,9 +31,9 @@ dependencies {
compile 'com.android.support:appcompat-v7:25.2.0'
compile 'org.java-websocket:java-websocket:1.3.2'
compile 'com.android.support:recyclerview-v7:25.2.0'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.squareup.retrofit2:converter-gson:2.1.0'
compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
compile 'com.squareup.retrofit2:retrofit:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'com.squareup.retrofit2:converter-gson:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile project(':lib')
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package ua.naiksoftware.stompclientexample;

import io.reactivex.Flowable;
import retrofit2.http.POST;
import retrofit2.http.Query;
import rx.Observable;

/**
* Created by Naik on 24.02.17.
*/
public interface ExampleRepository {

@POST("hello-convert-and-send")
Observable<Void> sendRestEcho(@Query("msg") String message);
Flowable<Void> sendRestEcho(@Query("msg") String message);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ua.naiksoftware.stompclientexample;

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.support.v7.widget.LinearLayoutManager;
import android.support.v7.widget.RecyclerView;
import android.util.Log;
@@ -19,10 +19,10 @@
import java.util.List;
import java.util.Locale;

import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
import io.reactivex.FlowableTransformer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import ua.naiksoftware.stomp.Stomp;
import ua.naiksoftware.stomp.client.StompClient;

@@ -35,7 +35,7 @@ public class MainActivity extends AppCompatActivity {
private SimpleAdapter mAdapter;
private List<String> mDataSet = new ArrayList<>();
private StompClient mStompClient;
private Subscription mRestPingSubscription;
private Disposable mRestPingDisposable;
private final SimpleDateFormat mTimeFormat = new SimpleDateFormat("HH:mm:ss", Locale.getDefault());
private RecyclerView mRecyclerView;
private Gson mGson = new GsonBuilder().create();
@@ -100,7 +100,7 @@ public void sendEchoViaStomp(View v) {
}

public void sendEchoViaRest(View v) {
mRestPingSubscription = RestClient.getInstance().getExampleRepository()
mRestPingDisposable = RestClient.getInstance().getExampleRepository()
.sendRestEcho("Echo REST " + mTimeFormat.format(new Date()))
.compose(applySchedulers())
.subscribe(aVoid -> {
@@ -122,8 +122,8 @@ private void toast(String text) {
Toast.makeText(this, text, Toast.LENGTH_SHORT).show();
}

protected <T> Observable.Transformer<T, T> applySchedulers() {
return rObservable -> rObservable
protected <T> FlowableTransformer<T, T> applySchedulers() {
return tFlowable -> tFlowable
.unsubscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
@@ -132,7 +132,7 @@ protected <T> Observable.Transformer<T, T> applySchedulers() {
@Override
protected void onDestroy() {
mStompClient.disconnect();
if (mRestPingSubscription != null) mRestPingSubscription.unsubscribe();
if (mRestPingDisposable != null) mRestPingDisposable.dispose();
super.onDestroy();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ua.naiksoftware.stompclientexample;

import retrofit2.Retrofit;
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;

/**
@@ -33,7 +33,7 @@ public static RestClient getInstance() {
private RestClient() {
Retrofit retrofit = new Retrofit.Builder().baseUrl("http://" + ANDROID_EMULATOR_LOCALHOST + ":" + SERVER_PORT + "/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
mExampleRepository = retrofit.create(ExampleRepository.class);
}
2 changes: 1 addition & 1 deletion lib/build.gradle
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ android {
dependencies {
compile fileTree(include: ['*.jar'], dir: 'libs')
testCompile 'junit:junit:4.12'
compile 'io.reactivex:rxjava:1.2.0'
compile "io.reactivex.rxjava2:rxjava:2.1.2"
// Supported transports
provided "org.java-websocket:java-websocket:1.3.2"
provided 'com.squareup.okhttp3:okhttp:3.8.0'
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ua.naiksoftware.stomp;

import rx.Observable;
import io.reactivex.Flowable;

/**
* Created by naik on 05.05.16.
@@ -10,17 +10,17 @@ public interface ConnectionProvider {
/**
* Subscribe this for receive stomp messages
*/
Observable<String> messages();
Flowable<String> messages();

/**
* Sending stomp messages via you ConnectionProvider.
* onError if not connected or error detected will be called, or onCompleted id sending started
* TODO: send messages with ACK
*/
Observable<Void> send(String stompMessage);
Flowable<Void> send(String stompMessage);

/**
* Subscribe this for receive #LifecycleEvent events
*/
Observable<LifecycleEvent> getLifecycleReceiver();
Flowable<LifecycleEvent> getLifecycleReceiver();
}
Original file line number Diff line number Diff line change
@@ -9,15 +9,16 @@
import java.util.Map;
import java.util.TreeMap;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import rx.Observable;
import rx.Subscriber;

/* package */ class OkHttpConnectionProvider implements ConnectionProvider {

@@ -27,40 +28,37 @@
private final Map<String, String> mConnectHttpHeaders;
private final OkHttpClient mOkHttpClient;

private final List<Subscriber<? super LifecycleEvent>> mLifecycleSubscribers;
private final List<Subscriber<? super String>> mMessagesSubscribers;
private final List<FlowableEmitter<? super LifecycleEvent>> mLifecycleEmitters;
private final List<FlowableEmitter<? super String>> mMessagesEmitters;

private WebSocket openedSocked;


/* package */ OkHttpConnectionProvider(String uri, Map<String, String> connectHttpHeaders, OkHttpClient okHttpClient) {
mUri = uri;
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
mLifecycleSubscribers = new ArrayList<>();
mMessagesSubscribers = new ArrayList<>();
mLifecycleEmitters = new ArrayList<>();
mMessagesEmitters = new ArrayList<>();
mOkHttpClient = okHttpClient;
}

@Override
public Observable<String> messages() {
Observable<String> observable = Observable.<String>create(subscriber -> {
mMessagesSubscribers.add(subscriber);

}).doOnUnsubscribe(() -> {
Iterator<Subscriber<? super String>> iterator = mMessagesSubscribers.iterator();
while (iterator.hasNext()) {
if (iterator.next().isUnsubscribed()) iterator.remove();
}

if (mMessagesSubscribers.size() < 1) {
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
openedSocked.close(1000, "");
openedSocked = null;
}
});
public Flowable<String> messages() {
Flowable<String> flowable = Flowable.<String>create(mMessagesEmitters::add, BackpressureStrategy.BUFFER)
.doOnCancel(() -> {
Iterator<FlowableEmitter<? super String>> iterator = mMessagesEmitters.iterator();
while (iterator.hasNext()) {
if (iterator.next().isCancelled()) iterator.remove();
}

if (mMessagesEmitters.size() < 1) {
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
openedSocked.close(1000, "");
openedSocked = null;
}
});
createWebSocketConnection();
return observable;
return flowable;
}

private void createWebSocketConnection() {
@@ -112,29 +110,27 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
}

@Override
public Observable<Void> send(String stompMessage) {
return Observable.create(subscriber -> {
public Flowable<Void> send(String stompMessage) {
return Flowable.create(subscriber -> {
if (openedSocked == null) {
subscriber.onError(new IllegalStateException("Not connected yet"));
} else {
Log.d(TAG, "Send STOMP message: " + stompMessage);
openedSocked.send(stompMessage);
subscriber.onCompleted();
subscriber.onComplete();
}
});
}, BackpressureStrategy.BUFFER);
}

@Override
public Observable<LifecycleEvent> getLifecycleReceiver() {
return Observable.<LifecycleEvent>create(subscriber -> {
mLifecycleSubscribers.add(subscriber);

}).doOnUnsubscribe(() -> {
Iterator<Subscriber<? super LifecycleEvent>> iterator = mLifecycleSubscribers.iterator();
while (iterator.hasNext()) {
if (iterator.next().isUnsubscribed()) iterator.remove();
}
});
public Flowable<LifecycleEvent> getLifecycleReceiver() {
return Flowable.<LifecycleEvent>create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER)
.doOnCancel(() -> {
Iterator<FlowableEmitter<? super LifecycleEvent>> iterator = mLifecycleEmitters.iterator();
while (iterator.hasNext()) {
if (iterator.next().isCancelled()) iterator.remove();
}
});
}

private TreeMap<String, String> headersAsMap(Response response) {
@@ -154,14 +150,14 @@ private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map<S

private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
for (Subscriber<? super LifecycleEvent> subscriber : mLifecycleSubscribers) {
for (FlowableEmitter<? super LifecycleEvent> subscriber : mLifecycleEmitters) {
subscriber.onNext(lifecycleEvent);
}
}

private void emitMessage(String stompMessage) {
Log.d(TAG, "Emit STOMP message: " + stompMessage);
for (Subscriber<? super String> subscriber : mMessagesSubscribers) {
for (FlowableEmitter<? super String> subscriber : mMessagesEmitters) {
subscriber.onNext(stompMessage);
}
}
Original file line number Diff line number Diff line change
@@ -20,8 +20,9 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;

import rx.Observable;
import rx.Subscriber;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;

/**
* Created by naik on 05.05.16.
@@ -33,8 +34,8 @@
private final String mUri;
private final Map<String, String> mConnectHttpHeaders;

private final List<Subscriber<? super LifecycleEvent>> mLifecycleSubscribers;
private final List<Subscriber<? super String>> mMessagesSubscribers;
private final List<FlowableEmitter<? super LifecycleEvent>> mLifecycleEmitters;
private final List<FlowableEmitter<? super String>> mMessagesEmitters;

private WebSocketClient mWebSocketClient;
private boolean haveConnection;
@@ -47,29 +48,26 @@
/* package */ WebSocketsConnectionProvider(String uri, Map<String, String> connectHttpHeaders) {
mUri = uri;
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
mLifecycleSubscribers = new ArrayList<>();
mMessagesSubscribers = new ArrayList<>();
mLifecycleEmitters = new ArrayList<>();
mMessagesEmitters = new ArrayList<>();
}

@Override
public Observable<String> messages() {
Observable<String> observable = Observable.<String>create(subscriber -> {
mMessagesSubscribers.add(subscriber);

}).doOnUnsubscribe(() -> {
Iterator<Subscriber<? super String>> iterator = mMessagesSubscribers.iterator();
while (iterator.hasNext()) {
if (iterator.next().isUnsubscribed()) iterator.remove();
}

if (mMessagesSubscribers.size() < 1) {
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
mWebSocketClient.close();
}
});
public Flowable<String> messages() {
Flowable<String> flowable = Flowable.<String>create(mMessagesEmitters::add, BackpressureStrategy.BUFFER)
.doOnCancel(() -> {
Iterator<FlowableEmitter<? super String>> iterator = mMessagesEmitters.iterator();
while (iterator.hasNext()) {
if (iterator.next().isCancelled()) iterator.remove();
}

if (mMessagesEmitters.size() < 1) {
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
mWebSocketClient.close();
}
});
createWebSocketConnection();
return observable;
return flowable;
}

private void createWebSocketConnection() {
@@ -133,42 +131,40 @@ public void onError(Exception ex) {
}

@Override
public Observable<Void> send(String stompMessage) {
return Observable.create(subscriber -> {
public Flowable<Void> send(String stompMessage) {
return Flowable.create(emitter -> {
if (mWebSocketClient == null) {
subscriber.onError(new IllegalStateException("Not connected yet"));
emitter.onError(new IllegalStateException("Not connected yet"));
} else {
Log.d(TAG, "Send STOMP message: " + stompMessage);
mWebSocketClient.send(stompMessage);
subscriber.onCompleted();
emitter.onComplete();
}
});
}, BackpressureStrategy.BUFFER);
}

private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
for (Subscriber<? super LifecycleEvent> subscriber : mLifecycleSubscribers) {
subscriber.onNext(lifecycleEvent);
for (FlowableEmitter<? super LifecycleEvent> emitter : mLifecycleEmitters) {
emitter.onNext(lifecycleEvent);
}
}

private void emitMessage(String stompMessage) {
Log.d(TAG, "Emit STOMP message: " + stompMessage);
for (Subscriber<? super String> subscriber : mMessagesSubscribers) {
subscriber.onNext(stompMessage);
for (FlowableEmitter<? super String> emitter : mMessagesEmitters) {
emitter.onNext(stompMessage);
}
}

@Override
public Observable<LifecycleEvent> getLifecycleReceiver() {
return Observable.<LifecycleEvent>create(subscriber -> {
mLifecycleSubscribers.add(subscriber);

}).doOnUnsubscribe(() -> {
Iterator<Subscriber<? super LifecycleEvent>> iterator = mLifecycleSubscribers.iterator();
while (iterator.hasNext()) {
if (iterator.next().isUnsubscribed()) iterator.remove();
}
});
public Flowable<LifecycleEvent> getLifecycleReceiver() {
return Flowable.<LifecycleEvent>create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER)
.doOnCancel(() -> {
Iterator<FlowableEmitter<? super LifecycleEvent>> iterator = mLifecycleEmitters.iterator();
while (iterator.hasNext()) {
if (iterator.next().isCancelled()) iterator.remove();
}
});
}
}
101 changes: 51 additions & 50 deletions lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java
Original file line number Diff line number Diff line change
@@ -13,10 +13,11 @@
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.disposables.Disposable;
import io.reactivex.flowables.ConnectableFlowable;
import ua.naiksoftware.stomp.ConnectionProvider;
import ua.naiksoftware.stomp.LifecycleEvent;
import ua.naiksoftware.stomp.StompHeader;
@@ -31,17 +32,17 @@ public class StompClient {
public static final String SUPPORTED_VERSIONS = "1.1,1.0";
public static final String DEFAULT_ACK = "auto";

private Subscription mMessagesSubscription;
private Map<String, Set<Subscriber<? super StompMessage>>> mSubscribers = new HashMap<>();
private List<ConnectableObservable<Void>> mWaitConnectionObservables;
private Disposable mMessagesDisposable;
private Map<String, Set<FlowableEmitter<? super StompMessage>>> mEmitters = new HashMap<>();
private List<ConnectableFlowable<Void>> mWaitConnectionFlowables;
private final ConnectionProvider mConnectionProvider;
private HashMap<String, String> mTopics;
private boolean mConnected;
private boolean isConnecting;

public StompClient(ConnectionProvider connectionProvider) {
mConnectionProvider = connectionProvider;
mWaitConnectionObservables = new CopyOnWriteArrayList<>();
mWaitConnectionFlowables = new CopyOnWriteArrayList<>();
}

/**
@@ -96,103 +97,103 @@ public void connect(List<StompHeader> _headers, boolean reconnect) {
});

isConnecting = true;
mMessagesSubscription = mConnectionProvider.messages()
mMessagesDisposable = mConnectionProvider.messages()
.map(StompMessage::from)
.subscribe(stompMessage -> {
if (stompMessage.getStompCommand().equals(StompCommand.CONNECTED)) {
mConnected = true;
isConnecting = false;
for (ConnectableObservable<Void> observable : mWaitConnectionObservables) {
observable.connect();
for (ConnectableFlowable<Void> flowable : mWaitConnectionFlowables) {
flowable.connect();
}
mWaitConnectionObservables.clear();
mWaitConnectionFlowables.clear();
}
callSubscribers(stompMessage);
});
}

public Observable<Void> send(String destination) {
public Flowable<Void> send(String destination) {
return send(new StompMessage(
StompCommand.SEND,
Collections.singletonList(new StompHeader(StompHeader.DESTINATION, destination)),
null));
}

public Observable<Void> send(String destination, String data) {
public Flowable<Void> send(String destination, String data) {
return send(new StompMessage(
StompCommand.SEND,
Collections.singletonList(new StompHeader(StompHeader.DESTINATION, destination)),
data));
}

public Observable<Void> send(StompMessage stompMessage) {
Observable<Void> observable = mConnectionProvider.send(stompMessage.compile());
public Flowable<Void> send(StompMessage stompMessage) {
Flowable<Void> flowable = mConnectionProvider.send(stompMessage.compile());
if (!mConnected) {
ConnectableObservable<Void> deffered = observable.publish();
mWaitConnectionObservables.add(deffered);
ConnectableFlowable<Void> deffered = flowable.publish();
mWaitConnectionFlowables.add(deffered);
return deffered;
} else {
return observable;
return flowable;
}
}

private void callSubscribers(StompMessage stompMessage) {
String messageDestination = stompMessage.findHeader(StompHeader.DESTINATION);
for (String dest : mSubscribers.keySet()) {
for (String dest : mEmitters.keySet()) {
if (dest.equals(messageDestination)) {
for (Subscriber<? super StompMessage> subscriber : mSubscribers.get(dest)) {
for (FlowableEmitter<? super StompMessage> subscriber : mEmitters.get(dest)) {
subscriber.onNext(stompMessage);
}
return;
}
}
}

public Observable<LifecycleEvent> lifecycle() {
public Flowable<LifecycleEvent> lifecycle() {
return mConnectionProvider.getLifecycleReceiver();
}

public void disconnect() {
if (mMessagesSubscription != null) mMessagesSubscription.unsubscribe();
if (mMessagesDisposable != null) mMessagesDisposable.dispose();
mConnected = false;
}

public Observable<StompMessage> topic(String destinationPath) {
public Flowable<StompMessage> topic(String destinationPath) {
return topic(destinationPath, null);
}

public Observable<StompMessage> topic(String destinationPath, List<StompHeader> headerList) {
return Observable.<StompMessage>create(subscriber -> {
Set<Subscriber<? super StompMessage>> subscribersSet = mSubscribers.get(destinationPath);
if (subscribersSet == null) {
subscribersSet = new HashSet<>();
mSubscribers.put(destinationPath, subscribersSet);
public Flowable<StompMessage> topic(String destinationPath, List<StompHeader> headerList) {
return Flowable.<StompMessage>create(emitter -> {
Set<FlowableEmitter<? super StompMessage>> emittersSet = mEmitters.get(destinationPath);
if (emittersSet == null) {
emittersSet = new HashSet<>();
mEmitters.put(destinationPath, emittersSet);
subscribePath(destinationPath, headerList).subscribe();
}
subscribersSet.add(subscriber);

}).doOnUnsubscribe(() -> {
Iterator<String> mapIterator = mSubscribers.keySet().iterator();
while (mapIterator.hasNext()) {
String destinationUrl = mapIterator.next();
Set<Subscriber<? super StompMessage>> set = mSubscribers.get(destinationUrl);
Iterator<Subscriber<? super StompMessage>> setIterator = set.iterator();
while (setIterator.hasNext()) {
Subscriber<? super StompMessage> subscriber = setIterator.next();
if (subscriber.isUnsubscribed()) {
setIterator.remove();
if (set.size() < 1) {
mapIterator.remove();
unsubscribePath(destinationUrl).subscribe();
emittersSet.add(emitter);
}, BackpressureStrategy.BUFFER)
.doOnCancel(() -> {
Iterator<String> mapIterator = mEmitters.keySet().iterator();
while (mapIterator.hasNext()) {
String destinationUrl = mapIterator.next();
Set<FlowableEmitter<? super StompMessage>> set = mEmitters.get(destinationUrl);
Iterator<FlowableEmitter<? super StompMessage>> setIterator = set.iterator();
while (setIterator.hasNext()) {
FlowableEmitter<? super StompMessage> subscriber = setIterator.next();
if (subscriber.isCancelled()) {
setIterator.remove();
if (set.size() < 1) {
mapIterator.remove();
unsubscribePath(destinationUrl).subscribe();
}
}
}
}
}
});
});
}

private Observable<Void> subscribePath(String destinationPath, List<StompHeader> headerList) {
if (destinationPath == null) return Observable.empty();
private Flowable<Void> subscribePath(String destinationPath, List<StompHeader> headerList) {
if (destinationPath == null) return Flowable.empty();
String topicId = UUID.randomUUID().toString();

if (mTopics == null) mTopics = new HashMap<>();
@@ -207,7 +208,7 @@ private Observable<Void> subscribePath(String destinationPath, List<StompHeader>
}


private Observable<Void> unsubscribePath(String dest) {
private Flowable<Void> unsubscribePath(String dest) {
String topicId = mTopics.get(dest);
Log.d(TAG, "Unsubscribe path: " + dest + " id: " + topicId);

0 comments on commit 8780db3

Please sign in to comment.