Skip to content

Commit

Permalink
Added max limit on session pool size
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhen Li committed Feb 10, 2016
1 parent 655921d commit 934c855
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public static void GlobalBeforeScenario()
try { AfterScenario(); } catch { /*Do Nothing*/ }
throw;
}
Driver = GraphDatabase.Driver(Url);
var config = Config.DefaultConfig;
config.MaxSessionPoolSize = Config.InfiniteSessionPoolSize;
Driver = GraphDatabase.Driver(Url, config);
}

[AfterTestRun]
Expand Down
21 changes: 19 additions & 2 deletions Neo4j.Driver/Neo4j.Driver.Tests/SessionPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Threading.Tasks;
using FluentAssertions;
using Moq;
using Neo4j.Driver.Exceptions;
using Neo4j.Driver.Internal;
using Xunit;
using Xunit.Abstractions;
Expand All @@ -38,11 +39,26 @@ public GetSessionMethod(ITestOutputHelper output)
_output = output;
}

[Fact]
public void ShouldThrowExceptionWhenMaximumPoolSizeReached()
{
var mock = new Mock<IConnection>();
var config = new Config {MaxSessionPoolSize = 2};
var pool = new SessionPool(null, TestUri, config, mock.Object);
pool.GetSession();
pool.GetSession();
pool.NumberOfAvailableSessions.Should().Be(0);
pool.NumberOfInUseSessions.Should().Be(2);

var ex = Xunit.Record.Exception(() => pool.GetSession());
ex.Should().BeOfType<ClientException>();
}

[Fact]
public void ShouldCreateNewSessionWhenQueueIsEmpty()
{
var mock = new Mock<IConnection>();
var pool = new SessionPool(null, TestUri, null, mock.Object);
var pool = new SessionPool(null, TestUri, Config.DefaultConfig, mock.Object);
pool.GetSession();
pool.NumberOfAvailableSessions.Should().Be(0);
pool.NumberOfInUseSessions.Should().Be(1);
Expand Down Expand Up @@ -120,7 +136,7 @@ public void ShouldGetNewSessionsWhenBeingUsedConcurrentlyBy(int numberOfThreads)
sessions.Enqueue(mock.Object);
mockSessions.Enqueue(mock);
}

var pool = new SessionPool(sessions, null);

pool.NumberOfAvailableSessions.Should().Be(numberOfThreads);
Expand Down Expand Up @@ -161,6 +177,7 @@ public void ShouldGetNewSessionsWhenBeingUsedConcurrentlyBy(int numberOfThreads)
{
mock.Verify(x => x.Reset(), Times.Once);
}

}


Expand Down
25 changes: 19 additions & 6 deletions Neo4j.Driver/Neo4j.Driver/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ namespace Neo4j.Driver
/// <summary>
/// Use this class to config the <see cref="Driver"/> in a certain way
/// </summary>
public class Config
public class Config
{
private Config() { }

public const int InfiniteSessionPoolSize = 0;
static Config()
{
DefaultConfig = new Config { TlsEnabled = false, Logger = new DebugLogger {Level = LogLevel.Info} };
DefaultConfig = new Config
{
TlsEnabled = false,
Logger = new DebugLogger {Level = LogLevel.Info},
MaxSessionPoolSize = 20
};
}

/// <summary>
Expand All @@ -37,9 +41,11 @@ static Config()

public static IConfigBuilder Builder => new ConfigBuilder(new Config());

public bool TlsEnabled { get; private set; }
public bool TlsEnabled { get; set; }

public ILogger Logger { get; private set; }
public ILogger Logger { get; set; }

public int MaxSessionPoolSize { get; set; }

private class ConfigBuilder : IConfigBuilder
{
Expand All @@ -62,6 +68,12 @@ public IConfigBuilder WithLogger(ILogger logger)
return this;
}

public IConfigBuilder WithMaxSessionPoolSize(int size)
{
_config.MaxSessionPoolSize = size;
return this;
}

public Config ToConfig()
{
return _config;
Expand All @@ -73,6 +85,7 @@ public interface IConfigBuilder
Config ToConfig();
IConfigBuilder WithTlsEnabled(bool enableTls);
IConfigBuilder WithLogger(ILogger logger);
IConfigBuilder WithMaxSessionPoolSize(int size);
}


Expand Down
1 change: 1 addition & 0 deletions Neo4j.Driver/Neo4j.Driver/Driver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ public ISession Session()
{
return _sessionPool.GetSession();
}

}
}
146 changes: 84 additions & 62 deletions Neo4j.Driver/Neo4j.Driver/Internal/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// limitations under the License.
using System;
using System.Collections.Generic;
using System.Threading;
using Neo4j.Driver.Exceptions;

namespace Neo4j.Driver.Internal
{
Expand All @@ -26,6 +28,8 @@ internal class SessionPool : LoggerBase
private readonly Uri _uri;
private readonly Config _config;
private readonly IConnection _connection;
private readonly int _maxSessionPoolSize;
private int _currentPoolSize;

internal int NumberOfInUseSessions => _inUseSessions.Count;
internal int NumberOfAvailableSessions => _availableSessions.Count;
Expand All @@ -35,77 +39,92 @@ public SessionPool(ILogger logger, Uri uri, Config config, IConnection connectio
_uri = uri;
_config = config;
_connection = connection;
_maxSessionPoolSize = config.MaxSessionPoolSize;
}

internal SessionPool(
Queue<IPooledSession> availableSessions,
Dictionary<Guid, IPooledSession> inUseDictionary,
Uri uri = null,
IConnection connection = null,
ILogger logger = null)
: this(logger, uri, null, connection)
Queue<IPooledSession> availableSessions,
Dictionary<Guid, IPooledSession> inUseDictionary,
Uri uri = null,
IConnection connection = null,
ILogger logger = null)
: this(logger, uri, Config.DefaultConfig, connection)
{
_availableSessions = availableSessions ?? new Queue<IPooledSession>();
_inUseSessions = inUseDictionary ?? new Dictionary<Guid, IPooledSession>();
}

public ISession GetSession()
{
IPooledSession session = null;
lock (_availableSessions)
return TryExecute(() =>
{
if(_availableSessions.Count != 0)
session = _availableSessions.Dequeue();
}
IPooledSession session = null;
lock (_availableSessions)
{
if (_availableSessions.Count != 0)
session = _availableSessions.Dequeue();
}

if (session == null)
{
session = new Session(_uri, _config, _connection, Release);
if (_maxSessionPoolSize > Config.InfiniteSessionPoolSize && _currentPoolSize >= _maxSessionPoolSize)
{
throw new ClientException($"Maximum session pool size ({_maxSessionPoolSize}) reached.");
}

if (session == null)
{
session = new Session(_uri, _config, _connection, Release);
Interlocked.Increment(ref _currentPoolSize);
lock (_inUseSessions)
{
_inUseSessions.Add(session.Id, session);
}
return session;
}

if (!session.IsHealthy())
{
session.Close();
Interlocked.Decrement(ref _currentPoolSize);
return GetSession();
}

session.Reset();
lock (_inUseSessions)
{
_inUseSessions.Add(session.Id, session);
}
return session;
}

if (!session.IsHealthy())
{
session.Close();
return GetSession();
}

session.Reset();
lock (_inUseSessions)
{
_inUseSessions.Add(session.Id, session);
}
return session;
});
}

public void Release(Guid sessionId)
{
IPooledSession session;
lock (_inUseSessions)
TryExecute(() =>
{
if (!_inUseSessions.ContainsKey(sessionId))
IPooledSession session;
lock (_inUseSessions)
{
return;
}
if (!_inUseSessions.ContainsKey(sessionId))
{
return;
}

session = _inUseSessions[sessionId];
_inUseSessions.Remove(sessionId);
}
session = _inUseSessions[sessionId];
_inUseSessions.Remove(sessionId);
}

if (session.IsHealthy())
{
lock (_availableSessions)
_availableSessions.Enqueue(session);
}
else
{
//release resources by session
session.Close();
}
if (session.IsHealthy())
{
lock (_availableSessions)
_availableSessions.Enqueue(session);
}
else
{
Interlocked.Decrement(ref _currentPoolSize);
//release resources by session
session.Close();
}
});
}

protected override void Dispose(bool isDisposing)
Expand All @@ -115,30 +134,33 @@ protected override void Dispose(bool isDisposing)
return;
}

lock (_inUseSessions)
{
var sessions = new List<IPooledSession>(_inUseSessions.Values);
_inUseSessions.Clear();
foreach (var inUseSession in sessions)
TryExecute(() =>
{
lock (_inUseSessions)
{
Logger?.Info($"Disposing In Use Session {inUseSession.Id}");
inUseSession.Close();
var sessions = new List<IPooledSession>(_inUseSessions.Values);
_inUseSessions.Clear();
foreach (var inUseSession in sessions)
{
Logger?.Info($"Disposing In Use Session {inUseSession.Id}");
inUseSession.Close();
}
}
}
lock (_availableSessions)
{
while (_availableSessions.Count > 0)
lock (_availableSessions)
{
var session = _availableSessions.Dequeue();
Logger?.Info($"Disposing Available Session {session.Id}");
session.Close();
while (_availableSessions.Count > 0)
{
var session = _availableSessions.Dequeue();
Logger?.Info($"Disposing Available Session {session.Id}");
session.Close();
}
}
}

});
base.Dispose(true);
}
}


public interface IPooledSession : ISession
{
Guid Id { get; }
Expand Down

0 comments on commit 934c855

Please sign in to comment.