Skip to content

Commit

Permalink
feat(csharp/ExcelAddIn): Make connections in the background (#6011)
Browse files Browse the repository at this point in the history
The "Test Connection" and "Set Connection" button could delay your
progress because they were serialized on a single thread.

This can get annoying if you type in the wrong credentials, as computer
might take several seconds to figure that out. Even if you correct your
credentials in the meantime, you would have to wait for the original
connection request to resolve.

This PR changes the code so that it does that work on a background
thread.

Doing so also exposed a bug in my organization of Observables, so they
have been reorganized a little bit.

In particular I was trying to be clever by having one object observe two
different things and the logic wasn't quite right.
  • Loading branch information
kosak authored Sep 2, 2024
1 parent afbb84b commit 0900c08
Show file tree
Hide file tree
Showing 11 changed files with 296 additions and 205 deletions.
29 changes: 17 additions & 12 deletions csharp/ExcelAddIn/StateManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,33 @@ public IDisposable SubscribeToDefaultCredentials(IObserver<StatusOr<CredentialsB

public IDisposable SubscribeToTableTriple(TableTriple descriptor, string filter,
IObserver<StatusOr<TableHandle>> observer) {
// There is a chain with three elements.
// The final observer (i.e. the argument to this method) will be a subscriber to a TableHandleProvider that we create here.
// That TableHandleProvider will in turn be a subscriber to a session.

// So:
// There is a chain with multiple elements:
//
// 1. Make a TableHandleProvider
// 2. Subscribe it to either the session provider named by the endpoint id
// or to the default session provider
// 3. Subscribe our observer to it
// 4. Return a dispose action that disposes both Subscribes
// 2. Make a ClientProvider
// 3. Subscribe the ClientProvider to either the session provider named by the endpoint id
// or to the default session provider
// 4. Subscribe the TableHandleProvider to the ClientProvider
// 4. Subscribe our observer to the TableHandleProvider
// 5. Return a dispose action that disposes all the needfuls.

var thp = new TableHandleProvider(WorkerThread, descriptor, filter);
var cp = new ClientProvider(WorkerThread, descriptor);

var disposer1 = descriptor.EndpointId == null ?
SubscribeToDefaultSession(thp) :
SubscribeToSession(descriptor.EndpointId, thp);
var disposer2 = thp.Subscribe(observer);
SubscribeToDefaultSession(cp) :
SubscribeToSession(descriptor.EndpointId, cp);
var disposer2 = cp.Subscribe(thp);
var disposer3 = thp.Subscribe(observer);

// The disposer for this needs to dispose both "inner" disposers.
return ActionAsDisposable.Create(() => {
// TODO(kosak): probably don't need to be on the worker thread here
WorkerThread.Invoke(() => {
var temp1 = Utility.Exchange(ref disposer1, null);
var temp2 = Utility.Exchange(ref disposer2, null);
var temp3 = Utility.Exchange(ref disposer3, null);
temp3?.Dispose();
temp2?.Dispose();
temp1?.Dispose();
});
Expand Down
53 changes: 40 additions & 13 deletions csharp/ExcelAddIn/factories/CredentialsDialogFactory.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using Deephaven.ExcelAddIn.Models;
using Deephaven.ExcelAddIn.Providers;
using Deephaven.ExcelAddIn.Util;
using Deephaven.ExcelAddIn.ViewModels;
using ExcelAddIn.views;

Expand All @@ -23,25 +23,52 @@ void OnSetCredentialsButtonClicked() {
credentialsDialog!.Close();
}

// This is used to ignore the results from stale "Test Credentials" invocations
// and to only use the results from the latest. It is read and written from different
// threads so we protect it with a synchronization object.
var sharedTestCredentialsCookie = new SimpleAtomicReference<object>(new object());

void TestCredentials(CredentialsBase creds) {
// Make a unique sentinel object to indicate that this thread should be
// the one privileged to provide the system with the answer to the "Test
// Credentials" question. If the user doesn't press the button again,
// we will go ahead and provide our answer to the system. However, if the
// user presses the button again, triggering a new thread, then that
// new thread will usurp our privilege and it will be the one to provide
// the answer.
var localLatestTcc = new object();
sharedTestCredentialsCookie.Value = localLatestTcc;

var state = "OK";
try {
// This operation might take some time.
var temp = SessionBaseFactory.Create(creds, sm.WorkerThread);
temp.Dispose();
} catch (Exception ex) {
state = ex.Message;
}

// If sharedTestCredentialsCookie is still the same, then our privilege
// has not been usurped and we can provide our answer to the system.
// On the other hand, if it changes, then we will just throw away our work.
if (!ReferenceEquals(localLatestTcc, sharedTestCredentialsCookie.Value)) {
// Our results are moot. Dispose of them.
return;
}

// Our results are valid. Keep them and tell everyone about it.
credentialsDialog!.SetTestResultsBox(state);
}

void OnTestCredentialsButtonClicked() {
if (!cvm.TryMakeCredentials(out var newCreds, out var error)) {
ShowMessageBox(error);
return;
}

credentialsDialog!.SetTestResultsBox("Checking credentials");

sm.WorkerThread.Invoke(() => {
var state = "OK";
try {
var temp = SessionBaseFactory.Create(newCreds, sm.WorkerThread);
temp.Dispose();
} catch (Exception ex) {
state = ex.Message;
}

credentialsDialog!.SetTestResultsBox(state);
});
// Check credentials on its own thread
Utility.RunInBackground(() => TestCredentials(newCreds));
}

// Save in captured variable so that the lambdas can access it.
Expand Down
65 changes: 21 additions & 44 deletions csharp/ExcelAddIn/models/Session.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using Deephaven.DeephavenClient.ExcelAddIn.Util;
using Deephaven.DeephavenClient;
using Deephaven.DeephavenClient;
using Deephaven.DheClient.Session;
using Deephaven.ExcelAddIn.Providers;
using Deephaven.ExcelAddIn.Util;

namespace Deephaven.ExcelAddIn.Models;
Expand All @@ -21,70 +19,49 @@ public abstract class SessionBase : IDisposable {
}

public sealed class CoreSession(Client client) : SessionBase {
public Client? Client = client;
private Client? _client = client;

public override T Visit<T>(Func<CoreSession, T> onCore, Func<CorePlusSession, T> onCorePlus) {
return onCore(this);
}

public override void Dispose() {
Utility.Exchange(ref Client, null)?.Dispose();
Utility.Exchange(ref _client, null)?.Dispose();
}

public Client Client {
get {
if (_client == null) {
throw new Exception("Object is disposed");
}

return _client;
}
}
}

public sealed class CorePlusSession(SessionManager sessionManager, WorkerThread workerThread) : SessionBase {
private SessionManager? _sessionManager = sessionManager;
private readonly Dictionary<PersistentQueryId, CorePlusClientProvider> _clientProviders = new();

public override T Visit<T>(Func<CoreSession, T> onCore, Func<CorePlusSession, T> onCorePlus) {
return onCorePlus(this);
}

public IDisposable SubscribeToPq(PersistentQueryId persistentQueryId,
IObserver<StatusOr<Client>> observer) {
if (_sessionManager == null) {
throw new Exception("Object has been disposed");
}

CorePlusClientProvider? cp = null;
IDisposable? disposer = null;

workerThread.Invoke(() => {
if (!_clientProviders.TryGetValue(persistentQueryId, out cp)) {
cp = CorePlusClientProvider.Create(workerThread, _sessionManager, persistentQueryId);
_clientProviders.Add(persistentQueryId, cp);
}

disposer = cp.Subscribe(observer);
});

return ActionAsDisposable.Create(() => {
workerThread.Invoke(() => {
var old = Utility.Exchange(ref disposer, null);
// Do nothing if caller Disposes me multiple times.
if (old == null) {
return;
}
old.Dispose();

// Slightly weird. If "old.Dispose()" has removed the last subscriber,
// then dispose it and remove it from our dictionary.
cp!.DisposeIfEmpty(() => _clientProviders.Remove(persistentQueryId));
});
});
}

public override void Dispose() {
if (workerThread.InvokeIfRequired(Dispose)) {
return;
}

var localCps = _clientProviders.Values.ToArray();
_clientProviders.Clear();
Utility.Exchange(ref _sessionManager, null)?.Dispose();
}

public SessionManager SessionManager {
get {
if (_sessionManager == null) {
throw new Exception("Object is disposed");
}

foreach (var cp in localCps) {
cp.Dispose();
return _sessionManager;
}
}
}
129 changes: 129 additions & 0 deletions csharp/ExcelAddIn/providers/ClientProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
using Deephaven.DeephavenClient;
using Deephaven.ExcelAddIn.Models;
using Deephaven.ExcelAddIn.Util;
using Deephaven.DeephavenClient.ExcelAddIn.Util;
using Deephaven.DheClient.Session;

namespace Deephaven.ExcelAddIn.Providers;

internal class ClientProvider(
WorkerThread workerThread,
TableTriple descriptor) : IObserver<StatusOr<SessionBase>>, IObservable<StatusOr<Client>>, IDisposable {

private readonly ObserverContainer<StatusOr<Client>> _observers = new();
private StatusOr<Client> _client = StatusOr<Client>.OfStatus("[No Client]");
private DndClient? _ownedDndClient = null;

public IDisposable Subscribe(IObserver<StatusOr<Client>> observer) {
// We need to run this on our worker thread because we want to protect
// access to our dictionary.
workerThread.Invoke(() => {
_observers.Add(observer, out _);
observer.OnNext(_client);
});

return ActionAsDisposable.Create(() => {
workerThread.Invoke(() => {
_observers.Remove(observer, out _);
});
});
}

public void Dispose() {
DisposeClientState();
}

public void OnNext(StatusOr<SessionBase> session) {
// Get onto the worker thread if we're not already on it.
if (workerThread.InvokeIfRequired(() => OnNext(session))) {
return;
}

try {
// Dispose whatever state we had before.
DisposeClientState();

// If the new state is just a status message, make that our status and transmit to our observers
if (!session.GetValueOrStatus(out var sb, out var status)) {
_observers.SetAndSendStatus(ref _client, status);
return;
}

var pqId = descriptor.PersistentQueryId;

// New state is a Core or CorePlus Session.
_ = sb.Visit(coreSession => {
if (pqId != null) {
_observers.SetAndSendStatus(ref _client, "[PQ Id Not Valid for Community Core]");
return Unit.Instance;
}

// It's a Core session so we have our Client.
_observers.SetAndSendValue(ref _client, coreSession.Client);
return Unit.Instance; // Essentially a "void" value that is ignored.
}, corePlusSession => {
// It's a CorePlus session so subscribe us to its PQ observer for the appropriate PQ ID
// If no PQ id was provided, that's a problem
if (pqId == null) {
_observers.SetAndSendStatus(ref _client, "[PQ Id is Required]");
return Unit.Instance;
}

// Connect to the PQ on a separate thread
Utility.RunInBackground(() => ConnectToPq(corePlusSession.SessionManager, pqId));
return Unit.Instance;
});
} catch (Exception ex) {
_observers.SetAndSendStatus(ref _client, ex.Message);
}
}

/// <summary>
/// This is executed on a separate thread because it might take a while.
/// </summary>
/// <param name="sessionManager"></param>
/// <param name="pqId"></param>
private void ConnectToPq(SessionManager sessionManager, PersistentQueryId pqId) {
StatusOr<Client> result;
DndClient? dndClient = null;
try {
dndClient = sessionManager.ConnectToPqByName(pqId.Id, false);
result = StatusOr<Client>.OfValue(dndClient);
} catch (Exception ex) {
result = StatusOr<Client>.OfStatus(ex.Message);
}

// commit the results, but on the worker thread
workerThread.Invoke(() => {
// This should normally be null, but maybe there's a race.
var oldDndClient = Utility.Exchange(ref _ownedDndClient, dndClient);
_observers.SetAndSend(ref _client, result);

// Yet another thread
if (oldDndClient != null) {
Utility.RunInBackground(() => Utility.IgnoreExceptions(() => oldDndClient.Dispose()));
}
});
}

private void DisposeClientState() {
// Get onto the worker thread if we're not already on it.
if (workerThread.InvokeIfRequired(DisposeClientState)) {
return;
}

var oldClient = Utility.Exchange(ref _ownedDndClient, null);
if (oldClient != null) {
_observers.SetAndSendStatus(ref _client, "Disposing client");
oldClient.Dispose();
}
}

public void OnCompleted() {
throw new NotImplementedException();
}

public void OnError(Exception error) {
throw new NotImplementedException();
}
}
Loading

0 comments on commit 0900c08

Please sign in to comment.