diff --git a/source/Zyan.Communication/Delegates/DynamicEventWireBase.cs b/source/Zyan.Communication/Delegates/DynamicEventWireBase.cs index 2ee3fea1..b1e14116 100644 --- a/source/Zyan.Communication/Delegates/DynamicEventWireBase.cs +++ b/source/Zyan.Communication/Delegates/DynamicEventWireBase.cs @@ -1,6 +1,7 @@ using System; using System.Linq; using System.Reflection; +using System.Threading; using Zyan.Communication.Threading; using Zyan.Communication.Toolbox; using Zyan.Communication.Toolbox.Diagnostics; @@ -18,6 +19,11 @@ internal abstract class DynamicEventWireBase : DynamicWireBase /// public Func ValidateSession { get; set; } + /// + /// Asynchronous event invocation handler. + /// + public Action QueueEventInvocation { get; set; } + /// /// Gets or sets the method to cancel subscription. /// @@ -33,20 +39,6 @@ internal abstract class DynamicEventWireBase : DynamicWireBase /// public IEventFilter EventFilter { get; set; } - private Lazy EventThreadPool { get; } = - new Lazy(() => new SimpleLockThreadPool(1)); - - public override void Dispose() - { - // stop event routing - if (EventThreadPool.IsValueCreated) - { - EventThreadPool.Value.Stop(); - } - - base.Dispose(); - } - /// /// Invokes client event handler. /// If the handler throws an exception, event subsription is cancelled. @@ -55,7 +47,7 @@ public override void Dispose() /// Event handler return value. protected override object InvokeClientDelegate(params object[] args) { - if (IsDisposed || Canceled) + if (Canceled) { return null; } @@ -66,9 +58,8 @@ protected override object InvokeClientDelegate(params object[] args) return SyncInvokeClientDelegate(args); } - // every dynamic event wire has its own invocation queue - // so that a broken remote client can't spoil the whole party - EventThreadPool.Value.QueueUserWorkItem(x => SyncInvokeClientDelegate(args)); + // by default, events are triggered asynchronously + QueueEventInvocation?.Invoke(x => SyncInvokeClientDelegate(args)); return null; } diff --git a/source/Zyan.Communication/Delegates/DynamicWireBase.cs b/source/Zyan.Communication/Delegates/DynamicWireBase.cs index d90ef071..159e37ec 100644 --- a/source/Zyan.Communication/Delegates/DynamicWireBase.cs +++ b/source/Zyan.Communication/Delegates/DynamicWireBase.cs @@ -6,19 +6,8 @@ namespace Zyan.Communication.Delegates /// /// Base class for dynamic wires. /// - internal abstract class DynamicWireBase : IDisposable + internal abstract class DynamicWireBase { - /// - public virtual void Dispose() - { - IsDisposed = true; - } - - /// - /// Gets or sets a value indicating whether this instance is disposed. - /// - protected bool IsDisposed { get; set; } - /// /// Client delegate interceptor. /// diff --git a/source/Zyan.Communication/ServerSession.cs b/source/Zyan.Communication/ServerSession.cs index a2e7eb2f..aa1c6e16 100644 --- a/source/Zyan.Communication/ServerSession.cs +++ b/source/Zyan.Communication/ServerSession.cs @@ -1,8 +1,10 @@ using System; using System.Collections.Generic; using System.Security.Principal; +using System.Threading; using Zyan.Communication.Delegates; using Zyan.Communication.SessionMgmt; +using Zyan.Communication.Threading; using Zyan.Communication.Toolbox; namespace Zyan.Communication @@ -24,6 +26,10 @@ public class ServerSession [NonSerialized] private SessionVariableAdapter _sessionVariableAdapter = null; + // Remote event invocation queue (server-side only). + [NonSerialized] + private SimpleLockThreadPool _eventInvocationQueue; + /// /// Creates a new instance of the ServerSession class. /// @@ -37,6 +43,13 @@ internal ServerSession(Guid sessionID, DateTime timestamp, IIdentity identity, S _sessionID = sessionID; _identity = identity; _sessionVariableAdapter = sessionVariableAdapter; + + // every server session has its own invocation queue + // so that a disconnected client can't affect the other clients + _eventInvocationQueue = new SimpleLockThreadPool + { + WorkerThreadName = $"Events for session {sessionID}/{identity?.Name}", + }; } /// @@ -72,6 +85,18 @@ public string ClientAddress set => _clientAddress = value; } + /// + /// Enqueues an event invocation for this session. + /// + internal void QueueEventInvocation(WaitCallback callback) => + _eventInvocationQueue.QueueUserWorkItem(callback); + + /// + /// Stops the event invocation thread pool worker threads. + /// + internal void StopEventInvocations() => + _eventInvocationQueue.Stop(); + /// /// Gets the remote subscription tracker. /// diff --git a/source/Zyan.Communication/SessionMgmt/SessionManagerBase.cs b/source/Zyan.Communication/SessionMgmt/SessionManagerBase.cs index ea257b3b..e2ac63e8 100644 --- a/source/Zyan.Communication/SessionMgmt/SessionManagerBase.cs +++ b/source/Zyan.Communication/SessionMgmt/SessionManagerBase.cs @@ -242,6 +242,7 @@ public void TerminateSession(Guid sessionID) timestamp = session.Timestamp; clientAddress = session.ClientAddress; identity = session.Identity; + session.StopEventInvocations(); } RemoveSession(sessionID); diff --git a/source/Zyan.Communication/ZyanDispatcher.cs b/source/Zyan.Communication/ZyanDispatcher.cs index f2eaa36a..4334a717 100644 --- a/source/Zyan.Communication/ZyanDispatcher.cs +++ b/source/Zyan.Communication/ZyanDispatcher.cs @@ -80,6 +80,7 @@ private void CreateClientServerWires(Type type, EventStub eventStub, IEnumerable var sessionId = currentSession.SessionID; var sessionManager = _host.SessionManager; dynamicEventWire.ValidateSession = () => sessionManager.ExistSession(sessionId); + dynamicEventWire.QueueEventInvocation = currentSession.QueueEventInvocation; dynamicEventWire.CancelSubscription = ex => { lock (wiringList) @@ -148,11 +149,6 @@ private void RemoveClientServerWires(Type type, EventStub eventStub, IEnumerable dynamicWire = DynamicWireFactory.GetDynamicWire(dynamicWireDelegate); } - if (dynamicWire != null) - { - dynamicWire.Dispose(); - } - _host.OnSubscriptionRemoved(new SubscriptionEventArgs { ComponentType = type, @@ -757,6 +753,7 @@ public void Logoff(Guid sessionID) session.ClientAddress = clientAddress; identity = session.Identity; timestamp = session.Timestamp; + session.StopEventInvocations(); } _host.SessionManager.SetCurrentSession(session); diff --git a/source/Zyan.Tests/EventsTests.cs b/source/Zyan.Tests/EventsTests.cs index 08bf0f8d..24883a2a 100644 --- a/source/Zyan.Tests/EventsTests.cs +++ b/source/Zyan.Tests/EventsTests.cs @@ -667,5 +667,43 @@ public void EventsOnSingleCallComponentsWorkGlobally() Assert.IsFalse(proxy3handled); } } + + [TestMethod] + public void SessionEventsAreRoutedByTheirOwnThreads() + { + var host = CreateZyanHost(5432, "SecondServer"); + var conn1 = CreateZyanConnection(5432, "SecondServer"); + var proxy1 = conn1.CreateProxy("Singleton2"); + var conn2 = CreateZyanConnection(5432, "SecondServer"); + var proxy2 = conn2.CreateProxy("Singleton2"); + + var handledCounter = 0; + EventHandler getEventHandler(Guid sessionId) => (s, e) => + { + handledCounter++; + + // note: this stuff works only on NullChannel because the client + // thread where the event is raised is the same server thread + var threadName = Thread.CurrentThread.Name; + Assert.IsTrue(threadName.Contains(sessionId.ToString())); + }; + + proxy1.TestEvent += getEventHandler(conn1.SessionID); + proxy2.TestEvent += getEventHandler(conn2.SessionID); + + ZyanSettings.LegacyBlockingEvents = false; + proxy1.RaiseTestEvent(); + proxy2.RaiseTestEvent(); + Thread.Sleep(300); + + ZyanSettings.LegacyBlockingEvents = true; + Assert.AreEqual(4, handledCounter); // each proxy handles 2 events, so 2 * 2 = 4 + + // Note: dispose connection before the host + // so that it can unsubscribe from all remove events + conn1.Dispose(); + conn2.Dispose(); + host.Dispose(); + } } }