Skip to content

Commit

Permalink
Debounce remote event reconnection, zyanfx#3.
Browse files Browse the repository at this point in the history
  • Loading branch information
yallie committed Feb 21, 2019
1 parent 107e0da commit bb4220c
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 36 deletions.
51 changes: 28 additions & 23 deletions source/Zyan.Communication/ZyanConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ public ZyanConnection(string serverUrl, IClientProtocolSetup protocolSetup, Auth
throw ex.PreserveStackTrace();
}

var reconnectEvents = new Action(ReconnectRemoteEventsCore);
var debounceInterval = ZyanSettings.ReconnectRemoteEventsDebounceInterval.TotalMilliseconds;
ReconnectRemoteEvents = reconnectEvents.Debounce((int)debounceInterval);

StartKeepSessionAliveTimer();
lock (_connections)
{
Expand Down Expand Up @@ -983,6 +987,16 @@ public void RefreshRegisteredComponents()
_registeredComponents = new List<ComponentInfo>(RemoteDispatcher.GetRegisteredComponents());
}

/// <summary>
/// Gets the remote events lock object.
/// </summary>
internal object RemoteEventsLock { get; } = new object();

/// <summary>
/// Reconnects to all remote events or delegates after a server restart.
/// </summary>
private Action ReconnectRemoteEvents { get; }

/// <summary>
/// Reconnects to all remote events or delegates of any known proxy for this connection, after a server restart.
/// </summary>
Expand All @@ -991,36 +1005,27 @@ public void RefreshRegisteredComponents()
/// </remarks>
private void ReconnectRemoteEventsCore()
{
var subscriptions = AliveProxies.Select(p => p.GetActiveSubscriptions()).Where(s => !s.DelegateCorrelationSet.IsNullOrEmpty()).ToArray();
var correlationSets = subscriptions.SelectMany(s => s.DelegateCorrelationSet).ToArray();
PrepareCallContext(false);
RemoteDispatcher.ReconnectEventHandlers(subscriptions);
_localSubscriptionTracker.Reset(correlationSets);
}

/// <summary>
/// Reconnects to all remote events or delegates after a server restart.
/// </summary>
private void ReconnectRemoteEvents()
{
if (!ZyanSettings.LegacyAsyncResubscriptions)
if (_isDisposed)
{
ReconnectRemoteEventsCore();
return;
}

// legacy method is unsafe as it silently swallows the exceptions
ThreadPool.QueueUserWorkItem(x =>
try
{
try
{
ReconnectRemoteEventsCore();
}
catch (Exception ex)
lock (RemoteEventsLock)
{
Trace.WriteLine("Error while restoring client subscriptions: {0}", ex);
var subscriptions = AliveProxies.Select(p => p.GetActiveSubscriptions()).Where(s => !s.DelegateCorrelationSet.IsNullOrEmpty()).ToArray();
var correlationSets = subscriptions.SelectMany(s => s.DelegateCorrelationSet).ToArray();
_localSubscriptionTracker.Reset(correlationSets);

PrepareCallContext(false);
RemoteDispatcher.ReconnectEventHandlers(subscriptions);
}
});
}
catch (Exception ex)
{
Trace.WriteLine("Error while restoring client subscriptions: {0}", ex);
}
}

/// <summary>
Expand Down
18 changes: 12 additions & 6 deletions source/Zyan.Communication/ZyanProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,11 @@ private void AddRemoteEventHandlers(List<DelegateCorrelationInfo> correlationSet
if (count > 0)
{
_connection.TrackRemoteSubscriptions(correlationSet);
_connection.PrepareCallContext(false);
_connection.RemoteDispatcher.AddEventHandlers(_interfaceType.FullName, correlationSet, _uniqueName);
lock (_connection.RemoteEventsLock)
{
_connection.PrepareCallContext(false);
_connection.RemoteDispatcher.AddEventHandlers(_interfaceType.FullName, correlationSet, _uniqueName);
}
}
}

Expand All @@ -432,10 +435,13 @@ private void RemoveRemoteEventHandlers(List<DelegateCorrelationInfo> correlation

var count = correlationSet.Count;
if (count > 0)
{
_connection.UntrackRemoteSubscriptions(correlationSet);
_connection.PrepareCallContext(false);
_connection.RemoteDispatcher.RemoveEventHandlers(_interfaceType.FullName, correlationSet, _uniqueName);
{
lock (_connection.RemoteEventsLock)
{
_connection.UntrackRemoteSubscriptions(correlationSet);
_connection.PrepareCallContext(false);
_connection.RemoteDispatcher.RemoveEventHandlers(_interfaceType.FullName, correlationSet, _uniqueName);
}
}
}

Expand Down
8 changes: 5 additions & 3 deletions source/Zyan.Communication/ZyanSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ public static class ZyanSettings
public static bool LegacyIgnoreDuplicateRegistrations { get; set; }

/// <summary>
/// Gets or sets a value indicating whether ZyanConnection restores subscriptions asynchronously.
/// Gets or sets a delay before ZyanConnection restores subscriptions.
/// </summary>
/// <remarks>
/// Zyan v2.11 and below used to restore missing event subscriptions asynchronously.
/// Zyan v2.11 and below used to restore missing event subscriptions asynchronously, without any delay.
/// Zyan v2.12 optionally debounces this method, zero interval means that subscriptions are restored synchronously.
/// This setting affects new ZyanConnection instances and doesn't change any existing connections.
/// </remarks>
public static bool LegacyAsyncResubscriptions { get; set; }
public static TimeSpan ReconnectRemoteEventsDebounceInterval { get; set; } = TimeSpan.FromSeconds(1);
}
}
58 changes: 54 additions & 4 deletions source/Zyan.Tests/EventsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ private static ZyanComponentHost CreateZyanHost(int port, string name)
return zyanHost;
}

private static ZyanConnection CreateZyanConnection(int port, string name)
private static ZyanConnection CreateZyanConnection(int port, string name, int debounceInterval = 0)
{
ZyanSettings.ReconnectRemoteEventsDebounceInterval = debounceInterval;
return new ZyanConnection("null://NullChannel:" + port + "/" + name, true);
}

Expand Down Expand Up @@ -254,6 +255,58 @@ public void ZyanHostSubscriptionRelatedEventsAreRaised()
ZyanHost.SubscriptionCanceled -= canceledHandler;
}

[TestMethod]
public void ExceptionInEventHandlerCancelsTheSubscription()
{
ZyanSettings.LegacyBlockingEvents = true;
ZyanSettings.LegacyBlockingSubscriptions = true;
ZyanSettings.LegacyUnprotectedEventHandlers = true;

var host = CreateZyanHost(5432, "ThirdServer");
var conn = CreateZyanConnection(5432, "ThirdServer", 1000);
var proxy = conn.CreateProxy<ISampleServer>("Singleton2");

var handled = false;
var eventHandler = new EventHandler((s, e) =>
{
handled = true;
throw new Exception();
});

proxy.TestEvent += eventHandler;

var subscriptionCanceled = false;
var clientSideException = default(Exception);
host.SubscriptionCanceled += (s, e) =>
{
subscriptionCanceled = true;
clientSideException = e.Exception;
};

var subscriptionsRestored = false;
var restoredHandler = new EventHandler((s, e) => subscriptionsRestored = true);
host.SubscriptionsRestored += restoredHandler;

// raise an event, catch exception and unsubscribe automatically
proxy.RaiseTestEvent();
Assert.IsTrue(handled);
Assert.IsTrue(subscriptionCanceled);
Assert.IsFalse(subscriptionsRestored);

// raise an event and check if it's ignored because reconnection isn't called yet
// due to the large debounce interval used in ZyanConnection
handled = false;
subscriptionsRestored = false;
proxy.RaiseTestEvent();
Assert.IsFalse(handled);
Assert.IsFalse(subscriptionsRestored);

// Note: dispose connection before the host
// so that it can unsubscribe from all remove events
conn.Dispose();
host.Dispose();
}

// The subscription gets restored on the next remote call to RaiseTestEvent:
// local subscription checksum doesn't match remote checksum => re-subscribe => ok!
[TestMethod]
Expand All @@ -262,7 +315,6 @@ public void ExceptionInEventHandlerCancelsSubscriptionButTheConnetionResubscribe
ZyanSettings.LegacyBlockingEvents = true;
ZyanSettings.LegacyBlockingSubscriptions = true;
ZyanSettings.LegacyUnprotectedEventHandlers = true;
ZyanSettings.LegacyAsyncResubscriptions = false;

var handled = false;
var eventHandler = new EventHandler((s, e) =>
Expand Down Expand Up @@ -308,11 +360,9 @@ public void ExceptionInEventHandlerCancelsSubscriptionButTheConnetionResubscribe
[TestMethod]
public void ZyanConnectionResubscribesAfterServerRestart()
{
Trace.WriteLine("ZyanConnectionResubscribesAfterServerRestart");
ZyanSettings.LegacyBlockingEvents = true;
ZyanSettings.LegacyBlockingSubscriptions = true;
ZyanSettings.LegacyUnprotectedEventHandlers = true;
ZyanSettings.LegacyAsyncResubscriptions = false;

var host = CreateZyanHost(5432, "SecondServer");
var conn = CreateZyanConnection(5432, "SecondServer");
Expand Down

0 comments on commit bb4220c

Please sign in to comment.