Skip to content

Commit

Permalink
Issue #48 : Convert from RxJava 1 to RxJava 2.
Browse files Browse the repository at this point in the history
  • Loading branch information
hardyeats committed Aug 5, 2017
1 parent d2cac6d commit d945182
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 153 deletions.
8 changes: 4 additions & 4 deletions example-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ dependencies {
compile 'com.android.support:appcompat-v7:25.2.0'
compile 'org.java-websocket:java-websocket:1.3.2'
compile 'com.android.support:recyclerview-v7:25.2.0'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.squareup.retrofit2:converter-gson:2.1.0'
compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
compile 'com.squareup.retrofit2:retrofit:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'com.squareup.retrofit2:converter-gson:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile project(':lib')
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package ua.naiksoftware.stompclientexample;

import io.reactivex.Flowable;
import retrofit2.http.POST;
import retrofit2.http.Query;
import rx.Observable;

/**
* Created by Naik on 24.02.17.
*/
public interface ExampleRepository {

@POST("hello-convert-and-send")
Observable<Void> sendRestEcho(@Query("msg") String message);
Flowable<Void> sendRestEcho(@Query("msg") String message);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ua.naiksoftware.stompclientexample;

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.support.v7.widget.LinearLayoutManager;
import android.support.v7.widget.RecyclerView;
import android.util.Log;
Expand All @@ -19,10 +19,10 @@
import java.util.List;
import java.util.Locale;

import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
import io.reactivex.FlowableTransformer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import ua.naiksoftware.stomp.Stomp;
import ua.naiksoftware.stomp.client.StompClient;

Expand All @@ -35,7 +35,7 @@ public class MainActivity extends AppCompatActivity {
private SimpleAdapter mAdapter;
private List<String> mDataSet = new ArrayList<>();
private StompClient mStompClient;
private Subscription mRestPingSubscription;
private Disposable mRestPingDisposable;
private final SimpleDateFormat mTimeFormat = new SimpleDateFormat("HH:mm:ss", Locale.getDefault());
private RecyclerView mRecyclerView;
private Gson mGson = new GsonBuilder().create();
Expand Down Expand Up @@ -100,7 +100,7 @@ public void sendEchoViaStomp(View v) {
}

public void sendEchoViaRest(View v) {
mRestPingSubscription = RestClient.getInstance().getExampleRepository()
mRestPingDisposable = RestClient.getInstance().getExampleRepository()
.sendRestEcho("Echo REST " + mTimeFormat.format(new Date()))
.compose(applySchedulers())
.subscribe(aVoid -> {
Expand All @@ -122,8 +122,8 @@ private void toast(String text) {
Toast.makeText(this, text, Toast.LENGTH_SHORT).show();
}

protected <T> Observable.Transformer<T, T> applySchedulers() {
return rObservable -> rObservable
protected <T> FlowableTransformer<T, T> applySchedulers() {
return tFlowable -> tFlowable
.unsubscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Expand All @@ -132,7 +132,7 @@ protected <T> Observable.Transformer<T, T> applySchedulers() {
@Override
protected void onDestroy() {
mStompClient.disconnect();
if (mRestPingSubscription != null) mRestPingSubscription.unsubscribe();
if (mRestPingDisposable != null) mRestPingDisposable.dispose();
super.onDestroy();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ua.naiksoftware.stompclientexample;

import retrofit2.Retrofit;
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;

/**
Expand Down Expand Up @@ -33,7 +33,7 @@ public static RestClient getInstance() {
private RestClient() {
Retrofit retrofit = new Retrofit.Builder().baseUrl("http://" + ANDROID_EMULATOR_LOCALHOST + ":" + SERVER_PORT + "/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
mExampleRepository = retrofit.create(ExampleRepository.class);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ android {
dependencies {
compile fileTree(include: ['*.jar'], dir: 'libs')
testCompile 'junit:junit:4.12'
compile 'io.reactivex:rxjava:1.2.0'
compile "io.reactivex.rxjava2:rxjava:2.1.2"
// Supported transports
provided "org.java-websocket:java-websocket:1.3.2"
provided 'com.squareup.okhttp3:okhttp:3.8.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ua.naiksoftware.stomp;

import rx.Observable;
import io.reactivex.Flowable;

/**
* Created by naik on 05.05.16.
Expand All @@ -10,17 +10,17 @@ public interface ConnectionProvider {
/**
* Subscribe this for receive stomp messages
*/
Observable<String> messages();
Flowable<String> messages();

/**
* Sending stomp messages via you ConnectionProvider.
* onError if not connected or error detected will be called, or onCompleted id sending started
* TODO: send messages with ACK
*/
Observable<Void> send(String stompMessage);
Flowable<Void> send(String stompMessage);

/**
* Subscribe this for receive #LifecycleEvent events
*/
Observable<LifecycleEvent> getLifecycleReceiver();
Flowable<LifecycleEvent> getLifecycleReceiver();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
import java.util.Map;
import java.util.TreeMap;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import rx.Observable;
import rx.Subscriber;

/* package */ class OkHttpConnectionProvider implements ConnectionProvider {

Expand All @@ -27,40 +28,37 @@
private final Map<String, String> mConnectHttpHeaders;
private final OkHttpClient mOkHttpClient;

private final List<Subscriber<? super LifecycleEvent>> mLifecycleSubscribers;
private final List<Subscriber<? super String>> mMessagesSubscribers;
private final List<FlowableEmitter<? super LifecycleEvent>> mLifecycleEmitters;
private final List<FlowableEmitter<? super String>> mMessagesEmitters;

private WebSocket openedSocked;


/* package */ OkHttpConnectionProvider(String uri, Map<String, String> connectHttpHeaders, OkHttpClient okHttpClient) {
mUri = uri;
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
mLifecycleSubscribers = new ArrayList<>();
mMessagesSubscribers = new ArrayList<>();
mLifecycleEmitters = new ArrayList<>();
mMessagesEmitters = new ArrayList<>();
mOkHttpClient = okHttpClient;
}

@Override
public Observable<String> messages() {
Observable<String> observable = Observable.<String>create(subscriber -> {
mMessagesSubscribers.add(subscriber);

}).doOnUnsubscribe(() -> {
Iterator<Subscriber<? super String>> iterator = mMessagesSubscribers.iterator();
while (iterator.hasNext()) {
if (iterator.next().isUnsubscribed()) iterator.remove();
}

if (mMessagesSubscribers.size() < 1) {
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
openedSocked.close(1000, "");
openedSocked = null;
}
});
public Flowable<String> messages() {
Flowable<String> flowable = Flowable.<String>create(mMessagesEmitters::add, BackpressureStrategy.BUFFER)
.doOnCancel(() -> {
Iterator<FlowableEmitter<? super String>> iterator = mMessagesEmitters.iterator();
while (iterator.hasNext()) {
if (iterator.next().isCancelled()) iterator.remove();
}

if (mMessagesEmitters.size() < 1) {
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
openedSocked.close(1000, "");
openedSocked = null;
}
});
createWebSocketConnection();
return observable;
return flowable;
}

private void createWebSocketConnection() {
Expand Down Expand Up @@ -112,29 +110,27 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
}

@Override
public Observable<Void> send(String stompMessage) {
return Observable.create(subscriber -> {
public Flowable<Void> send(String stompMessage) {
return Flowable.create(subscriber -> {
if (openedSocked == null) {
subscriber.onError(new IllegalStateException("Not connected yet"));
} else {
Log.d(TAG, "Send STOMP message: " + stompMessage);
openedSocked.send(stompMessage);
subscriber.onCompleted();
subscriber.onComplete();
}
});
}, BackpressureStrategy.BUFFER);
}

@Override
public Observable<LifecycleEvent> getLifecycleReceiver() {
return Observable.<LifecycleEvent>create(subscriber -> {
mLifecycleSubscribers.add(subscriber);

}).doOnUnsubscribe(() -> {
Iterator<Subscriber<? super LifecycleEvent>> iterator = mLifecycleSubscribers.iterator();
while (iterator.hasNext()) {
if (iterator.next().isUnsubscribed()) iterator.remove();
}
});
public Flowable<LifecycleEvent> getLifecycleReceiver() {
return Flowable.<LifecycleEvent>create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER)
.doOnCancel(() -> {
Iterator<FlowableEmitter<? super LifecycleEvent>> iterator = mLifecycleEmitters.iterator();
while (iterator.hasNext()) {
if (iterator.next().isCancelled()) iterator.remove();
}
});
}

private TreeMap<String, String> headersAsMap(Response response) {
Expand All @@ -154,14 +150,14 @@ private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map<S

private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
for (Subscriber<? super LifecycleEvent> subscriber : mLifecycleSubscribers) {
for (FlowableEmitter<? super LifecycleEvent> subscriber : mLifecycleEmitters) {
subscriber.onNext(lifecycleEvent);
}
}

private void emitMessage(String stompMessage) {
Log.d(TAG, "Emit STOMP message: " + stompMessage);
for (Subscriber<? super String> subscriber : mMessagesSubscribers) {
for (FlowableEmitter<? super String> subscriber : mMessagesEmitters) {
subscriber.onNext(stompMessage);
}
}
Expand Down
Loading

0 comments on commit d945182

Please sign in to comment.