Skip to content

Commit

Permalink
Added Bluetooth Adapter state monitoring to connection (#275) (#278)
Browse files Browse the repository at this point in the history
`DeadObjectException` is usually thrown when interacting with `BluetoothAdapter` or `BluetoothGatt` instance that was obtained before bluetooth being turned off. Prior to library version `1.3.0` all errors raised when interacting with `BluetoothGatt` or recieved by `BluetoothGattCallback` were closing the connection (were emitted by `RxBleDevice.establishConnection()`). After `1.3.0` only errors raised in `RxBleRadioOperationConnect` and recieved by `BluetoothGattCallback.onConnectionStateChange()` were closing the connection so it was possible that `DeadObjectException`s raised repetedly by subscribing to i.e. `RxBleConnection.readRssi()` would not close the connection. Added monitoring of BluetoothAdapter’s state to prevent `DeadObjectException`s and inform the user about the connection loss as soon as possible.
Moved responsibility of checking the BluetoothAdapter’s state from Connector to DisconnectionRouter to have it in one place.
  • Loading branch information
dariuszseweryn authored Sep 11, 2017
1 parent 34cae8c commit d3e8f68
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 307 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.polidea.rxandroidble.internal.connection;


import com.jakewharton.rxrelay.PublishRelay;
import com.polidea.rxandroidble.RxBleAdapterStateObservable;
import com.polidea.rxandroidble.exceptions.BleDisconnectedException;
import com.polidea.rxandroidble.exceptions.BleException;
import com.polidea.rxandroidble.exceptions.BleGattException;
import com.polidea.rxandroidble.internal.DeviceModule;
import com.polidea.rxandroidble.internal.util.RxBleAdapterWrapper;
import javax.inject.Inject;
import javax.inject.Named;
import rx.Observable;
import rx.functions.Func1;

/**
* A class that is responsible for routing all potential sources of disconnection to an Observable that emits only errors.
*/
@ConnectionScope
class DisconnectionRouter {

private final PublishRelay<BleException> disconnectionErrorRelay = PublishRelay.create();

private final Observable disconnectionErrorObservable;

@Inject
DisconnectionRouter(
@Named(DeviceModule.MAC_ADDRESS) final String macAddress,
final RxBleAdapterWrapper adapterWrapper,
final Observable<RxBleAdapterStateObservable.BleAdapterState> adapterStateObservable
) {
disconnectionErrorObservable = Observable.merge(
disconnectionErrorRelay
.flatMap(new Func1<BleException, Observable<?>>() {
@Override
public Observable<?> call(BleException e) {
return Observable.error(e);
}
}),
adapterStateObservable
.map(new Func1<RxBleAdapterStateObservable.BleAdapterState, Boolean>() {
@Override
public Boolean call(RxBleAdapterStateObservable.BleAdapterState bleAdapterState) {
return bleAdapterState.isUsable();
}
})
.startWith(adapterWrapper.isBluetoothEnabled())
.filter(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean isAdapterUsable) {
return !isAdapterUsable;
}
})
.flatMap(new Func1<Boolean, Observable<?>>() {
@Override
public Observable<?> call(Boolean isAdapterUsable) {
return Observable.error(new BleDisconnectedException(macAddress)); // TODO: Introduce BleDisabledException?
}
})
)
.replay()
.autoConnect(0);
}

/**
* Method to be called whenever a connection braking exception happens. It will be routed to {@link #asObservable()}.
*
* @param disconnectedException the exception that happened
*/
void onDisconnectedException(BleDisconnectedException disconnectedException) {
disconnectionErrorRelay.call(disconnectedException);
}

/**
* Method to be called whenever a BluetoothGattCallback.onConnectionStateChange() will get called with status != GATT_SUCCESS
*
* @param disconnectedGattException the exception that happened
*/
void onGattConnectionStateException(BleGattException disconnectedGattException) {
disconnectionErrorRelay.call(disconnectedGattException);
}

/**
* Function returning an Observable that will only throw error in case of a disconnection
*
* @param <T> the type of returned observable
* @return the Observable
*/
<T> Observable<T> asObservable() {
//noinspection unchecked
return disconnectionErrorObservable;
}
}
Original file line number Diff line number Diff line change
@@ -1,47 +1,30 @@
package com.polidea.rxandroidble.internal.connection;

import android.bluetooth.BluetoothDevice;
import android.bluetooth.BluetoothGatt;
import android.support.annotation.NonNull;

import com.polidea.rxandroidble.RxBleAdapterStateObservable.BleAdapterState;
import com.polidea.rxandroidble.RxBleConnection;
import com.polidea.rxandroidble.exceptions.BleDisconnectedException;
import com.polidea.rxandroidble.internal.RxBleRadio;
import com.polidea.rxandroidble.internal.operations.RxBleRadioOperationConnect;
import com.polidea.rxandroidble.internal.operations.RxBleRadioOperationDisconnect;
import com.polidea.rxandroidble.internal.util.RxBleAdapterWrapper;

import javax.inject.Inject;

import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Actions;
import rx.functions.Func0;
import rx.functions.Func1;

import static com.polidea.rxandroidble.internal.util.ObservableUtil.justOnNext;

public class RxBleConnectionConnectorImpl implements RxBleConnection.Connector {

private final BluetoothDevice bluetoothDevice;
private final RxBleRadio rxBleRadio;
private final RxBleAdapterWrapper rxBleAdapterWrapper;
private final Observable<BleAdapterState> adapterStateObservable;
private final ConnectionComponent.Builder connectionComponentBuilder;

@Inject
public RxBleConnectionConnectorImpl(
BluetoothDevice bluetoothDevice,
RxBleRadio rxBleRadio,
RxBleAdapterWrapper rxBleAdapterWrapper,
Observable<BleAdapterState> adapterStateObservable,
ConnectionComponent.Builder connectionComponentBuilder) {
this.bluetoothDevice = bluetoothDevice;
this.rxBleRadio = rxBleRadio;
this.rxBleAdapterWrapper = rxBleAdapterWrapper;
this.adapterStateObservable = adapterStateObservable;
this.connectionComponentBuilder = connectionComponentBuilder;
}

Expand All @@ -51,71 +34,36 @@ public Observable<RxBleConnection> prepareConnection(final boolean autoConnect)
@Override
public Observable<RxBleConnection> call() {

if (!rxBleAdapterWrapper.isBluetoothEnabled()) {
return Observable.error(new BleDisconnectedException(bluetoothDevice.getAddress()));
}

final ConnectionComponent connectionComponent = connectionComponentBuilder.build();
RxBleRadioOperationConnect operationConnect = connectionComponent.connectOperationBuilder()
.setAutoConnect(autoConnect)
.build();

return enqueueConnectOperation(operationConnect)
.flatMap(new Func1<BluetoothGatt, Observable<RxBleConnection>>() {
@Override
public Observable<RxBleConnection> call(final BluetoothGatt bluetoothGatt) {
return Observable.merge(
justOnNext(connectionComponent.rxBleConnection()),
connectionComponent.gattCallback().<RxBleConnection>observeDisconnect()
);
}
})
.doOnUnsubscribe(disconnect(connectionComponent.disconnectOperation()));
final RxBleConnection connection = connectionComponent.rxBleConnection();
final Observable<BluetoothGatt> connectedObservable = rxBleRadio.queue(operationConnect);
final Observable<RxBleConnection> disconnectedErrorObservable = connectionComponent.gattCallback().observeDisconnect();
final Action0 disconnect = queueIgnoringResult(connectionComponent.disconnectOperation());

return Observable.just(connection)
.delaySubscription(connectedObservable)
.mergeWith(disconnectedErrorObservable)
.doOnUnsubscribe(disconnect);
}

@NonNull
private Action0 disconnect(final RxBleRadioOperationDisconnect operationDisconnect) {
private Action0 queueIgnoringResult(final RxBleRadioOperationDisconnect operationDisconnect) {
return new Action0() {
@Override
public void call() {
enqueueDisconnectOperation(operationDisconnect);
rxBleRadio
.queue(operationDisconnect)
.subscribe(
Actions.empty(),
Actions.<Throwable>toAction1(Actions.empty())
);
}
};
}

@NonNull
private Observable<BluetoothGatt> enqueueConnectOperation(RxBleRadioOperationConnect operationConnect) {
return Observable
.merge(
rxBleRadio.queue(operationConnect),
adapterNotUsableObservable()
.flatMap(new Func1<BleAdapterState, Observable<BluetoothGatt>>() {
@Override
public Observable<BluetoothGatt> call(BleAdapterState bleAdapterState) {
return Observable.error(new BleDisconnectedException(bluetoothDevice.getAddress()));
}
})
)
.first();
}
});
}
private Observable<BleAdapterState> adapterNotUsableObservable() {
return adapterStateObservable
.filter(new Func1<BleAdapterState, Boolean>() {
@Override
public Boolean call(BleAdapterState bleAdapterState) {
return !bleAdapterState.isUsable();
}
});
}

private Subscription enqueueDisconnectOperation(RxBleRadioOperationDisconnect operationDisconnect) {
return rxBleRadio
.queue(operationDisconnect)
.subscribe(
Actions.empty(),
Actions.<Throwable>toAction1(Actions.empty())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import android.bluetooth.BluetoothGattCallback;
import android.bluetooth.BluetoothGattCharacteristic;
import android.bluetooth.BluetoothGattDescriptor;
import android.util.Pair;

import com.jakewharton.rxrelay.PublishRelay;
import com.jakewharton.rxrelay.SerializedRelay;
Expand Down Expand Up @@ -35,7 +34,8 @@ public class RxBleGattCallback {

private final Scheduler callbackScheduler;
private final BluetoothGattProvider bluetoothGattProvider;
private final Output<Pair<BluetoothGatt, RxBleConnectionState>> gattAndConnectionStateOutput = new Output<>();
private final DisconnectionRouter disconnectionRouter;
private final PublishRelay<RxBleConnectionState> connectionStatePublishRelay = PublishRelay.create();
private final Output<RxBleDeviceServices> servicesDiscoveredOutput = new Output<>();
private final Output<ByteAssociation<UUID>> readCharacteristicOutput = new Output<>();
private final Output<ByteAssociation<UUID>> writeCharacteristicOutput = new Output<>();
Expand All @@ -45,39 +45,20 @@ public class RxBleGattCallback {
private final Output<ByteAssociation<BluetoothGattDescriptor>> writeDescriptorOutput = new Output<>();
private final Output<Integer> readRssiOutput = new Output<>();
private final Output<Integer> changedMtuOutput = new Output<>();
private final Func1<BleGattException, Object> errorMapper = new Func1<BleGattException, Object>() {
private final Func1<BleGattException, Observable<?>> errorMapper = new Func1<BleGattException, Observable<?>>() {
@Override
public Object call(BleGattException bleGattException) {
throw bleGattException;
public Observable<?> call(BleGattException bleGattException) {
return Observable.error(bleGattException);
}
};
private final Observable disconnectedErrorObservable = gattAndConnectionStateOutput.valueRelay
.filter(new Func1<Pair<BluetoothGatt, RxBleConnectionState>, Boolean>() {
@Override
public Boolean call(Pair<BluetoothGatt, RxBleConnectionState> pair) {
return isDisconnectedOrDisconnecting(pair);
}
})
.map(new Func1<Pair<BluetoothGatt, RxBleConnectionState>, Object>() {
@Override
public Object call(Pair<BluetoothGatt, RxBleConnectionState> bluetoothGattRxBleConnectionStatePair) {
throw new BleDisconnectedException(bluetoothGattRxBleConnectionStatePair.first.getDevice().getAddress());
}
})
.mergeWith(gattAndConnectionStateOutput.errorRelay.map(errorMapper))
.replay()
.autoConnect(0);

@Inject
public RxBleGattCallback(@Named(ClientComponent.NamedSchedulers.GATT_CALLBACK) Scheduler callbackScheduler,
BluetoothGattProvider bluetoothGattProvider) {
BluetoothGattProvider bluetoothGattProvider,
DisconnectionRouter disconnectionRouter) {
this.callbackScheduler = callbackScheduler;
this.bluetoothGattProvider = bluetoothGattProvider;
}

private boolean isDisconnectedOrDisconnecting(Pair<BluetoothGatt, RxBleConnectionState> pair) {
RxBleConnectionState rxBleConnectionState = pair.second;
return rxBleConnectionState == RxBleConnectionState.DISCONNECTED || rxBleConnectionState == RxBleConnectionState.DISCONNECTING;
this.disconnectionRouter = disconnectionRouter;
}

private BluetoothGattCallback bluetoothGattCallback = new BluetoothGattCallback() {
Expand All @@ -88,8 +69,19 @@ public void onConnectionStateChange(BluetoothGatt gatt, int status, int newState
super.onConnectionStateChange(gatt, status, newState);
bluetoothGattProvider.updateBluetoothGatt(gatt);

propagateErrorIfOccurred(gattAndConnectionStateOutput, gatt, status, BleGattOperationType.CONNECTION_STATE);
gattAndConnectionStateOutput.valueRelay.call(new Pair<>(gatt, mapConnectionStateToRxBleConnectionStatus(newState)));
if (isDisconnectedOrDisconnecting(newState)) {
disconnectionRouter.onDisconnectedException(new BleDisconnectedException(gatt.getDevice().getAddress()));
} else if (status != BluetoothGatt.GATT_SUCCESS) {
disconnectionRouter.onGattConnectionStateException(
new BleGattException(gatt, status, BleGattOperationType.CONNECTION_STATE)
);
}

connectionStatePublishRelay.call(mapConnectionStateToRxBleConnectionStatus(newState));
}

private boolean isDisconnectedOrDisconnecting(int newState) {
return newState == BluetoothGatt.STATE_DISCONNECTED || newState == BluetoothGatt.STATE_DISCONNECTING;
}

@Override
Expand Down Expand Up @@ -253,10 +245,9 @@ private boolean propagateStatusError(Output output, BleGattException exception)
private <T> Observable<T> withDisconnectionHandling(Output<T> output) {
//noinspection unchecked
return Observable.merge(
disconnectedErrorObservable,
gattAndConnectionStateOutput.errorRelay.map(errorMapper),
disconnectionRouter.<T>asObservable(),
output.valueRelay,
output.errorRelay.map(errorMapper)
(Observable<T>) output.errorRelay.flatMap(errorMapper)
);
}

Expand All @@ -271,23 +262,15 @@ public BluetoothGattCallback getBluetoothGattCallback() {
*/
public <T> Observable<T> observeDisconnect() {
//noinspection unchecked
return disconnectedErrorObservable;
return disconnectionRouter.asObservable();
}

/**
* @return Observable that emits RxBleConnectionState that matches BluetoothGatt's state.
* Does NOT emit errors even if status != GATT_SUCCESS.
*/
public Observable<RxBleConnectionState> getOnConnectionStateChange() {
return gattAndConnectionStateOutput.valueRelay.map(
new Func1<Pair<BluetoothGatt, RxBleConnectionState>, RxBleConnectionState>() {
@Override
public RxBleConnectionState call(
Pair<BluetoothGatt, RxBleConnectionState> bluetoothGattRxBleConnectionStatePair) {
return bluetoothGattRxBleConnectionStatePair.second;
}
}
).observeOn(callbackScheduler);
return connectionStatePublishRelay.observeOn(callbackScheduler);
}

public Observable<RxBleDeviceServices> getOnServicesDiscovered() {
Expand All @@ -309,7 +292,7 @@ public Observable<ByteAssociation<UUID>> getOnCharacteristicWrite() {
public Observable<CharacteristicChangedEvent> getOnCharacteristicChanged() {
//noinspection unchecked
return Observable.merge(
disconnectedErrorObservable,
disconnectionRouter.<CharacteristicChangedEvent>asObservable(),
changedCharacteristicSerializedPublishRelay
)
.observeOn(callbackScheduler);
Expand Down
Loading

0 comments on commit d3e8f68

Please sign in to comment.