Skip to content

Commit

Permalink
Fix messaging
Browse files Browse the repository at this point in the history
  • Loading branch information
bjester committed Jan 15, 2024
1 parent ed1d5e5 commit 2c1f664
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import android.os.RemoteException;
import android.util.Log;

import java9.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -23,10 +24,12 @@ public class WorkController {
private final Context context;
private Connection connection;
private Messenger messenger;
private CompletableFuture<Messenger> connected;
private final AtomicBoolean isConnected = new AtomicBoolean(false);

private WorkController(Context context) {
this.context = context;
this.connected = new CompletableFuture<>();
}

public static WorkController getInstance(Context context) {
Expand Down Expand Up @@ -109,6 +112,9 @@ protected void dispatch(Message message, int attempts) {
synchronized (isConnected) {
// If we're already connected, then it's awake
if (!isConnected.get()) {
if (connected.isDone()) {
connected = new CompletableFuture<>();
}
// Binding allows us to monitor the connection state
context.bindService(
new Intent(context, WorkControllerService.class),
Expand All @@ -118,21 +124,23 @@ protected void dispatch(Message message, int attempts) {
}
}

// Start the service with this intent
try {
messenger.send(message);
} catch (RemoteException e) {
// If the remote process has died, then we need to rebind
synchronized (isConnected) {
isConnected.set(false);
messenger = null;
}
if (attempts < 3) {
dispatch(message, attempts + 1);
connected.thenApply((messenger) -> {
try {
messenger.send(message);
} catch (RemoteException e) {
// If the remote process has died, then we need to rebind
synchronized (isConnected) {
isConnected.set(false);
messenger = null;
}
if (attempts < 3) {
dispatch(message, attempts + 1);
}
} catch (Exception e) {
Log.e(TAG, "Failed to send message " + message, e);
}
} catch (Exception e) {
Log.e(TAG, "Failed to send message " + message, e);
}
return messenger;
});
}

public class Connection implements ServiceConnection {
Expand All @@ -141,6 +149,7 @@ public void onServiceConnected(ComponentName name, IBinder service) {
synchronized (isConnected) {
isConnected.set(true);
messenger = new Messenger(service);
connected.complete(messenger);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* initializing the work manager in the task worker process.
*/
public class WorkControllerService extends Service {
public static final String TAG = "Kolibri.MonitorService";
public static final String TAG = "Kolibri.WorkControllerService";
public static final int ACTION_WAKE = 1;
public static final int ACTION_SLEEP = 2;
public static final int ACTION_STOP = 3;
Expand Down Expand Up @@ -75,6 +75,7 @@ public void onDestroy() {
}

protected void onWake() {
Log.d(TAG, "Waking up work controller service");
synchronized (state) {
if (state.get() != State.AWAKE_LOW_MEMORY) {
state.set(State.AWAKE);
Expand All @@ -89,53 +90,65 @@ protected void onWake() {
}

startTask(() -> {
Log.d(TAG, "Binding to work manager service");
// Wakey wakey remote work manager service
bindService(workManagerIntent, connection, Context.BIND_AUTO_CREATE);
return null;
});
}

protected void onReconcile() {
startTask(() -> Task.reconcile(getApplicationContext(), executor)
.thenApply((r) -> {
if (r) {
Log.d(TAG, "Reconciliation task completed");
} else {
Log.d(TAG, "Reconciliation task failed");
}
return null;
}));
Log.d(TAG, "Enqueuing task reconciliation");
startTask(() -> {
Log.d(TAG, "Running task reconciliation");
return Task.reconcile(getApplicationContext(), executor)
.thenApply((r) -> {
if (r) {
Log.d(TAG, "Reconciliation task completed");
} else {
Log.d(TAG, "Reconciliation task failed");
}
return null;
});
});
}

protected void onSleep() {
Log.d(TAG, "Sleeping work controller service");
synchronized (state) {
state.set(State.SLEEPING);
}
synchronized (taskCount) {
if (taskCount.get() == 0) {
Log.d(TAG, "Stopping service due to no more tasks");
stopSelf();
} else {
Log.d(TAG, "Waiting for " + taskCount.get() + " tasks to complete");
}
}
}

protected void onStop() {
Log.d(TAG, "Stopping work controller service");
// should eventually call `onDestroy` and that will set the stopped state
stopSelf();
}

protected void startTask(WorkTask task) {
futureChain = futureChain.thenComposeAsync((nothing) -> {
try {
Log.d(TAG, "Running task");
CompletableFuture<Void> f = task.run();
if (f != null) {
return f;
}
} catch (Exception e) {
Log.e(TAG, "Failed running task", e);
} finally {
Log.d(TAG, "Task completed");
synchronized (taskCount) {
if (taskCount.decrementAndGet() == 0) {
Log.d(TAG, "Checking state for stopping service");
synchronized (state) {
if (state.get() != State.AWAKE) {
Log.d(TAG, "Stopping service due to no more tasks");
Expand Down

0 comments on commit 2c1f664

Please sign in to comment.