From 0a179d9302a20706b4cc06358ee11313d07f4119 Mon Sep 17 00:00:00 2001 From: NaikSoftware Date: Thu, 10 Aug 2017 20:16:09 +0300 Subject: [PATCH] Fix NPE when unsubscribe from lifecycle and emit lifecycle event in one time --- .idea/misc.xml | 2 +- example-client/build.gradle | 4 +- .../stomp/OkHttpConnectionProvider.java | 50 +++++++++++-------- .../stomp/WebSocketsConnectionProvider.java | 49 ++++++++++-------- 4 files changed, 59 insertions(+), 46 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index d2e6725..4c9d020 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -27,7 +27,7 @@ - + diff --git a/example-client/build.gradle b/example-client/build.gradle index 2646b67..6133d79 100644 --- a/example-client/build.gradle +++ b/example-client/build.gradle @@ -28,9 +28,9 @@ android { dependencies { compile fileTree(dir: 'libs', include: ['*.jar']) testCompile 'junit:junit:4.12' - compile 'com.android.support:appcompat-v7:25.2.0' + compile 'com.android.support:appcompat-v7:25.3.1' compile 'org.java-websocket:java-websocket:1.3.2' - compile 'com.android.support:recyclerview-v7:25.2.0' + compile 'com.android.support:recyclerview-v7:25.3.1' compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile 'com.squareup.retrofit2:converter-gson:2.3.0' compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0' diff --git a/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java index 6d53692..0bdfffb 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java @@ -33,6 +33,8 @@ private WebSocket openedSocked; + private final Object mLifecycleLock = new Object(); + /* package */ OkHttpConnectionProvider(String uri, Map connectHttpHeaders, OkHttpClient okHttpClient) { mUri = uri; @@ -45,18 +47,18 @@ @Override public Flowable messages() { Flowable flowable = Flowable.create(mMessagesEmitters::add, BackpressureStrategy.BUFFER) - .doOnCancel(() -> { - Iterator> iterator = mMessagesEmitters.iterator(); - while (iterator.hasNext()) { - if (iterator.next().isCancelled()) iterator.remove(); - } + .doOnCancel(() -> { + Iterator> iterator = mMessagesEmitters.iterator(); + while (iterator.hasNext()) { + if (iterator.next().isCancelled()) iterator.remove(); + } - if (mMessagesEmitters.size() < 1) { - Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread()); - openedSocked.close(1000, ""); - openedSocked = null; - } - }); + if (mMessagesEmitters.size() < 1) { + Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread()); + openedSocked.close(1000, ""); + openedSocked = null; + } + }); createWebSocketConnection(); return flowable; } @@ -69,9 +71,9 @@ private void createWebSocketConnection() { Request.Builder requestBuilder = new Request.Builder() .url(mUri); - + addConnectionHeadersToBuilder(requestBuilder, mConnectHttpHeaders); - + openedSocked = mOkHttpClient.newWebSocket(requestBuilder.build(), new WebSocketListener() { @Override @@ -125,12 +127,14 @@ public Flowable send(String stompMessage) { @Override public Flowable getLifecycleReceiver() { return Flowable.create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER) - .doOnCancel(() -> { - Iterator> iterator = mLifecycleEmitters.iterator(); - while (iterator.hasNext()) { - if (iterator.next().isCancelled()) iterator.remove(); - } - }); + .doOnCancel(() -> { + synchronized (mLifecycleLock) { + Iterator> iterator = mLifecycleEmitters.iterator(); + while (iterator.hasNext()) { + if (iterator.next().isCancelled()) iterator.remove(); + } + } + }); } private TreeMap headersAsMap(Response response) { @@ -149,9 +153,11 @@ private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map subscriber : mLifecycleEmitters) { - subscriber.onNext(lifecycleEvent); + synchronized (mLifecycleLock) { + Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name()); + for (FlowableEmitter subscriber : mLifecycleEmitters) { + subscriber.onNext(lifecycleEvent); + } } } diff --git a/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java index 17db25e..4ad7d59 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java @@ -41,8 +41,11 @@ private boolean haveConnection; private TreeMap mServerHandshakeHeaders; + private final Object mLifecycleLock = new Object(); + /** * Support UIR scheme ws://host:port/path + * * @param connectHttpHeaders may be null */ /* package */ WebSocketsConnectionProvider(String uri, Map connectHttpHeaders) { @@ -55,17 +58,17 @@ @Override public Flowable messages() { Flowable flowable = Flowable.create(mMessagesEmitters::add, BackpressureStrategy.BUFFER) - .doOnCancel(() -> { - Iterator> iterator = mMessagesEmitters.iterator(); - while (iterator.hasNext()) { - if (iterator.next().isCancelled()) iterator.remove(); - } - - if (mMessagesEmitters.size() < 1) { - Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread()); - mWebSocketClient.close(); - } - }); + .doOnCancel(() -> { + Iterator> iterator = mMessagesEmitters.iterator(); + while (iterator.hasNext()) { + if (iterator.next().isCancelled()) iterator.remove(); + } + + if (mMessagesEmitters.size() < 1) { + Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread()); + mWebSocketClient.close(); + } + }); createWebSocketConnection(); return flowable; } @@ -115,7 +118,7 @@ public void onError(Exception ex) { } }; - if(mUri.startsWith("wss")) { + if (mUri.startsWith("wss")) { try { SSLContext sc = SSLContext.getInstance("TLS"); sc.init(null, null, null); @@ -144,9 +147,11 @@ public Flowable send(String stompMessage) { } private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) { - Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name()); - for (FlowableEmitter emitter : mLifecycleEmitters) { - emitter.onNext(lifecycleEvent); + synchronized (mLifecycleLock) { + Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name()); + for (FlowableEmitter emitter : mLifecycleEmitters) { + emitter.onNext(lifecycleEvent); + } } } @@ -160,11 +165,13 @@ private void emitMessage(String stompMessage) { @Override public Flowable getLifecycleReceiver() { return Flowable.create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER) - .doOnCancel(() -> { - Iterator> iterator = mLifecycleEmitters.iterator(); - while (iterator.hasNext()) { - if (iterator.next().isCancelled()) iterator.remove(); - } - }); + .doOnCancel(() -> { + synchronized (mLifecycleLock) { + Iterator> iterator = mLifecycleEmitters.iterator(); + while (iterator.hasNext()) { + if (iterator.next().isCancelled()) iterator.remove(); + } + } + }); } }