Skip to content

Commit

Permalink
Event invocation queue now belongs to the server session, zyanfx#74.
Browse files Browse the repository at this point in the history
  • Loading branch information
yallie authored and Mikhail Kanygin committed Sep 2, 2020
1 parent a917879 commit a694810
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 35 deletions.
27 changes: 9 additions & 18 deletions source/Zyan.Communication/Delegates/DynamicEventWireBase.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Linq;
using System.Reflection;
using System.Threading;
using Zyan.Communication.Threading;
using Zyan.Communication.Toolbox;
using Zyan.Communication.Toolbox.Diagnostics;
Expand All @@ -18,6 +19,11 @@ internal abstract class DynamicEventWireBase : DynamicWireBase
/// </summary>
public Func<bool> ValidateSession { get; set; }

/// <summary>
/// Asynchronous event invocation handler.
/// </summary>
public Action<WaitCallback> QueueEventInvocation { get; set; }

/// <summary>
/// Gets or sets the method to cancel subscription.
/// </summary>
Expand All @@ -33,20 +39,6 @@ internal abstract class DynamicEventWireBase : DynamicWireBase
/// </summary>
public IEventFilter EventFilter { get; set; }

private Lazy<SimpleLockThreadPool> EventThreadPool { get; } =
new Lazy<SimpleLockThreadPool>(() => new SimpleLockThreadPool(1));

public override void Dispose()
{
// stop event routing
if (EventThreadPool.IsValueCreated)
{
EventThreadPool.Value.Stop();
}

base.Dispose();
}

/// <summary>
/// Invokes client event handler.
/// If the handler throws an exception, event subsription is cancelled.
Expand All @@ -55,7 +47,7 @@ public override void Dispose()
/// <returns>Event handler return value.</returns>
protected override object InvokeClientDelegate(params object[] args)
{
if (IsDisposed || Canceled)
if (Canceled)
{
return null;
}
Expand All @@ -66,9 +58,8 @@ protected override object InvokeClientDelegate(params object[] args)
return SyncInvokeClientDelegate(args);
}

// every dynamic event wire has its own invocation queue
// so that a broken remote client can't spoil the whole party
EventThreadPool.Value.QueueUserWorkItem(x => SyncInvokeClientDelegate(args));
// by default, events are triggered asynchronously
QueueEventInvocation?.Invoke(x => SyncInvokeClientDelegate(args));
return null;
}

Expand Down
13 changes: 1 addition & 12 deletions source/Zyan.Communication/Delegates/DynamicWireBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,8 @@ namespace Zyan.Communication.Delegates
/// <summary>
/// Base class for dynamic wires.
/// </summary>
internal abstract class DynamicWireBase : IDisposable
internal abstract class DynamicWireBase
{
/// <inheritdoc/>
public virtual void Dispose()
{
IsDisposed = true;
}

/// <summary>
/// Gets or sets a value indicating whether this instance is disposed.
/// </summary>
protected bool IsDisposed { get; set; }

/// <summary>
/// Client delegate interceptor.
/// </summary>
Expand Down
25 changes: 25 additions & 0 deletions source/Zyan.Communication/ServerSession.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Generic;
using System.Security.Principal;
using System.Threading;
using Zyan.Communication.Delegates;
using Zyan.Communication.SessionMgmt;
using Zyan.Communication.Threading;
using Zyan.Communication.Toolbox;

namespace Zyan.Communication
Expand All @@ -24,6 +26,10 @@ public class ServerSession
[NonSerialized]
private SessionVariableAdapter _sessionVariableAdapter = null;

// Remote event invocation queue (server-side only).
[NonSerialized]
private SimpleLockThreadPool _eventInvocationQueue;

/// <summary>
/// Creates a new instance of the ServerSession class.
/// </summary>
Expand All @@ -37,6 +43,13 @@ internal ServerSession(Guid sessionID, DateTime timestamp, IIdentity identity, S
_sessionID = sessionID;
_identity = identity;
_sessionVariableAdapter = sessionVariableAdapter;

// every server session has its own invocation queue
// so that a disconnected client can't affect the other clients
_eventInvocationQueue = new SimpleLockThreadPool
{
WorkerThreadName = $"Events for session {sessionID}/{identity?.Name}",
};
}

/// <summary>
Expand Down Expand Up @@ -72,6 +85,18 @@ public string ClientAddress
set => _clientAddress = value;
}

/// <summary>
/// Enqueues an event invocation for this session.
/// </summary>
internal void QueueEventInvocation(WaitCallback callback) =>
_eventInvocationQueue.QueueUserWorkItem(callback);

/// <summary>
/// Stops the event invocation thread pool worker threads.
/// </summary>
internal void StopEventInvocations() =>
_eventInvocationQueue.Stop();

/// <summary>
/// Gets the remote subscription tracker.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ public void TerminateSession(Guid sessionID)
timestamp = session.Timestamp;
clientAddress = session.ClientAddress;
identity = session.Identity;
session.StopEventInvocations();
}

RemoveSession(sessionID);
Expand Down
7 changes: 2 additions & 5 deletions source/Zyan.Communication/ZyanDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ private void CreateClientServerWires(Type type, EventStub eventStub, IEnumerable
var sessionId = currentSession.SessionID;
var sessionManager = _host.SessionManager;
dynamicEventWire.ValidateSession = () => sessionManager.ExistSession(sessionId);
dynamicEventWire.QueueEventInvocation = currentSession.QueueEventInvocation;
dynamicEventWire.CancelSubscription = ex =>
{
lock (wiringList)
Expand Down Expand Up @@ -148,11 +149,6 @@ private void RemoveClientServerWires(Type type, EventStub eventStub, IEnumerable
dynamicWire = DynamicWireFactory.GetDynamicWire(dynamicWireDelegate);
}

if (dynamicWire != null)
{
dynamicWire.Dispose();
}

_host.OnSubscriptionRemoved(new SubscriptionEventArgs
{
ComponentType = type,
Expand Down Expand Up @@ -757,6 +753,7 @@ public void Logoff(Guid sessionID)
session.ClientAddress = clientAddress;
identity = session.Identity;
timestamp = session.Timestamp;
session.StopEventInvocations();
}

_host.SessionManager.SetCurrentSession(session);
Expand Down
38 changes: 38 additions & 0 deletions source/Zyan.Tests/EventsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -667,5 +667,43 @@ public void EventsOnSingleCallComponentsWorkGlobally()
Assert.IsFalse(proxy3handled);
}
}

[TestMethod]
public void SessionEventsAreRoutedByTheirOwnThreads()
{
var host = CreateZyanHost(5432, "SecondServer");
var conn1 = CreateZyanConnection(5432, "SecondServer");
var proxy1 = conn1.CreateProxy<ISampleServer>("Singleton2");
var conn2 = CreateZyanConnection(5432, "SecondServer");
var proxy2 = conn2.CreateProxy<ISampleServer>("Singleton2");

var handledCounter = 0;
EventHandler getEventHandler(Guid sessionId) => (s, e) =>
{
handledCounter++;

// note: this stuff works only on NullChannel because the client
// thread where the event is raised is the same server thread
var threadName = Thread.CurrentThread.Name;
Assert.IsTrue(threadName.Contains(sessionId.ToString()));
};

proxy1.TestEvent += getEventHandler(conn1.SessionID);
proxy2.TestEvent += getEventHandler(conn2.SessionID);

ZyanSettings.LegacyBlockingEvents = false;
proxy1.RaiseTestEvent();
proxy2.RaiseTestEvent();
Thread.Sleep(300);

ZyanSettings.LegacyBlockingEvents = true;
Assert.AreEqual(4, handledCounter); // each proxy handles 2 events, so 2 * 2 = 4

// Note: dispose connection before the host
// so that it can unsubscribe from all remove events
conn1.Dispose();
conn2.Dispose();
host.Dispose();
}
}
}

0 comments on commit a694810

Please sign in to comment.