Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare DisconnectionRouter for concurrent access. #442

Merged
merged 1 commit into from
Jun 13, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.polidea.rxandroidble2.internal.RxBleLog;
import com.polidea.rxandroidble2.internal.util.RxBleAdapterWrapper;

import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Queue;

import bleshadow.javax.inject.Inject;
Expand All @@ -30,7 +30,7 @@
class DisconnectionRouter implements DisconnectionRouterInput, DisconnectionRouterOutput {

private static final String TAG = "DisconnectionRouter";
private final Queue<ObservableEmitter<BleException>> exceptionEmitters = new LinkedList<>();
private final Queue<ObservableEmitter<BleException>> exceptionEmitters = new ConcurrentLinkedQueue<>();
private BleException exceptionOccurred;
private Disposable adapterMonitoringDisposable;

Expand Down Expand Up @@ -60,14 +60,14 @@ public BleException apply(Boolean isAdapterUsable) {
.firstElement()
.subscribe(new Consumer<BleException>() {
@Override
public void accept(BleException exception) throws Exception {
public void accept(BleException exception) {
RxBleLog.d(TAG, "An exception received, indicating that the adapter has became unusable.");
exceptionOccurred = exception;
notifySubscribersAboutException();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
public void accept(Throwable throwable) {
RxBleLog.w(TAG, "Failed to monitor adapter state.", throwable);
}
});
Expand Down Expand Up @@ -107,7 +107,7 @@ public void onGattConnectionStateException(BleGattException disconnectedGattExce
onExceptionOccurred(disconnectedGattException);
}

private void onExceptionOccurred(BleException exception) {
private synchronized void onExceptionOccurred(BleException exception) {
if (exceptionOccurred == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could use AtomicReference instead?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this would be sufficient for this use-case as the race may if a new observer is subscribing to asValueOnlyObservable after the check for exceptionOccured is made

exceptionOccurred = exception;
notifySubscribersAboutException();
Expand All @@ -133,12 +133,14 @@ private void notifySubscribersAboutException() {
public Observable<BleException> asValueOnlyObservable() {
return Observable.create(new ObservableOnSubscribe<BleException>() {
@Override
public void subscribe(final ObservableEmitter<BleException> emitter) throws Exception {
if (exceptionOccurred != null) {
emitter.onNext(exceptionOccurred);
emitter.onComplete();
} else {
storeEmitterToBeNotifiedInTheFuture(emitter);
public void subscribe(final ObservableEmitter<BleException> emitter) {
synchronized (DisconnectionRouter.this) {
if (exceptionOccurred != null) {
emitter.onNext(exceptionOccurred);
emitter.onComplete();
} else {
storeEmitterToBeNotifiedInTheFuture(emitter);
}
}
}
});
Expand All @@ -148,7 +150,7 @@ private void storeEmitterToBeNotifiedInTheFuture(final ObservableEmitter<BleExce
exceptionEmitters.add(emitter);
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
public void cancel() {
exceptionEmitters.remove(emitter);
}
});
Expand Down