From 87b71c7c3dfc4dd4e46236d5d94803c25dba844d Mon Sep 17 00:00:00 2001 From: Dariusz Seweryn Date: Wed, 26 Apr 2017 15:53:43 +0200 Subject: [PATCH] Fixed a race condition in `RxBleRadioOperationConnect`. https://github.com/Polidea/RxAndroidBle/issues/178 Reviewers: michal.zielinski, pawel.urban Reviewed By: pawel.urban Differential Revision: https://phabricator.polidea.com/D2321 --- .../RxBleRadioOperationConnect.java | 69 ++++++++++++------- .../RxBleRadioOperationConnectTest.groovy | 1 + 2 files changed, 47 insertions(+), 23 deletions(-) diff --git a/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/RxBleRadioOperationConnect.java b/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/RxBleRadioOperationConnect.java index 8c6b0d501..617f14c63 100644 --- a/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/RxBleRadioOperationConnect.java +++ b/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/RxBleRadioOperationConnect.java @@ -16,14 +16,15 @@ import com.polidea.rxandroidble.internal.connection.RxBleGattCallback; import com.polidea.rxandroidble.internal.util.BleConnectionCompat; -import java.util.concurrent.Callable; - import javax.inject.Inject; import javax.inject.Named; +import rx.Emitter; import rx.Observable; +import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; +import rx.functions.Cancellable; import rx.functions.Func0; import rx.functions.Func1; import rx.subjects.BehaviorSubject; @@ -182,34 +183,56 @@ private Observable getConnectedBluetoothGatt() { // start connecting the BluetoothGatt // note: Due to different Android BLE stack implementations it is not certain whether `connectGatt()` or `BluetoothGattCallback` // will emit BluetoothGatt first - return connectGatt() - // disconnect may happen even if the connection was not established yet - .mergeWith(rxBleGattCallback.observeDisconnect()) - // capture BluetoothGatt when connected - .sample(rxBleGattCallback - .getOnConnectionStateChange() - .filter(new Func1() { + return Observable.create( + new Action1>() { + @Override + public void call(Emitter emitter) { + final Subscription connectedBluetoothGattSubscription = Observable.fromCallable(new Func0() { @Override - public Boolean call(RxBleConnection.RxBleConnectionState rxBleConnectionState) { - return rxBleConnectionState == CONNECTED; + public BluetoothGatt call() { + return bluetoothGattProvider.getBluetoothGatt(); } - })) - .take(1); - } + }) + // when the connected state will be emitted bluetoothGattProvider should contain valid Gatt + .delaySubscription( + rxBleGattCallback + .getOnConnectionStateChange() + .takeFirst( + new Func1() { + @Override + public Boolean call(RxBleConnection.RxBleConnectionState rxBleConnectionState) { + return rxBleConnectionState == CONNECTED; + } + } + ) + ) + // disconnect may happen even if the connection was not established yet + .mergeWith(rxBleGattCallback.observeDisconnect()) + .subscribe(emitter); + + emitter.setCancellation(new Cancellable() { + @Override + public void cancel() throws Exception { + connectedBluetoothGattSubscription.unsubscribe(); + } + }); + + /* + * Apparently the connection may be established fast enough to introduce a race condition so the subscription + * must be established first before starting the connection. + * https://github.com/Polidea/RxAndroidBle/issues/178 + * */ - @NonNull - private Observable connectGatt() { - return Observable.fromCallable( - new Callable() { - @Override - public BluetoothGatt call() throws Exception { final BluetoothGatt bluetoothGatt = connectionCompat .connectGatt(bluetoothDevice, autoConnect, rxBleGattCallback.getBluetoothGattCallback()); - // Capture BluetoothGatt when connection is initiated. + /* + * Update BluetoothGatt when connection is initiated. It is not certain + * if this or RxBleGattCallback.onConnectionStateChange will be first. + * */ bluetoothGattProvider.updateBluetoothGatt(bluetoothGatt); - return bluetoothGatt; } - } + }, + Emitter.BackpressureMode.NONE ); } diff --git a/rxandroidble/src/test/groovy/com/polidea/rxandroidble/internal/operations/RxBleRadioOperationConnectTest.groovy b/rxandroidble/src/test/groovy/com/polidea/rxandroidble/internal/operations/RxBleRadioOperationConnectTest.groovy index e6f960aa4..8117c329b 100644 --- a/rxandroidble/src/test/groovy/com/polidea/rxandroidble/internal/operations/RxBleRadioOperationConnectTest.groovy +++ b/rxandroidble/src/test/groovy/com/polidea/rxandroidble/internal/operations/RxBleRadioOperationConnectTest.groovy @@ -149,6 +149,7 @@ public class RxBleRadioOperationConnectTest extends Specification { } private emitConnectedConnectionState() { + mockBluetoothGattProvider.getBluetoothGatt() >> mockGatt onConnectionStateSubject.onNext(RxBleConnection.RxBleConnectionState.CONNECTED) }