Skip to content

Commit

Permalink
Added Dispose method on session pool
Browse files Browse the repository at this point in the history
Fixing failing build due to missing locking in test code
  • Loading branch information
Zhen Li committed Feb 10, 2016
1 parent bedfea7 commit 655921d
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 29 deletions.
97 changes: 83 additions & 14 deletions Neo4j.Driver/Neo4j.Driver.Tests/SessionPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using Moq;
using Neo4j.Driver.Internal;
using Xunit;
using Xunit.Abstractions;

namespace Neo4j.Driver.Tests
{
Expand All @@ -30,6 +31,13 @@ public class SessionPoolTests

public class GetSessionMethod
{
private readonly ITestOutputHelper _output;

public GetSessionMethod(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public void ShouldCreateNewSessionWhenQueueIsEmpty()
{
Expand All @@ -51,7 +59,7 @@ public void ShouldCreateNewSessionWhenQueueOnlyContainsUnhealthySessions()
unhealthyMock.Setup(x => x.Id).Returns(unhealthyId);

sessions.Enqueue(unhealthyMock.Object);
var pool = new SessionPool(sessions, TestUri,mock.Object );
var pool = new SessionPool(sessions,null, TestUri,mock.Object );

pool.NumberOfAvailableSessions.Should().Be(1);
pool.NumberOfInUseSessions.Should().Be(0);
Expand All @@ -75,7 +83,7 @@ public void ShouldReuseOldSessionWhenHealthySessionInQueue()
mock.Setup(x => x.IsHealthy()).Returns(true);

sessions.Enqueue(mock.Object);
var pool = new SessionPool(sessions);
var pool = new SessionPool(sessions, null);

pool.NumberOfAvailableSessions.Should().Be(1);
pool.NumberOfInUseSessions.Should().Be(0);
Expand All @@ -93,7 +101,7 @@ public void ShouldReuseOldSessionWhenHealthySessionInQueue()
[InlineData(2)]
[InlineData(5)]
[InlineData(10)]
[InlineData(50)]
[InlineData(500)]
public void ShouldGetNewSessionsWhenBeingUsedConcurrentlyBy(int numberOfThreads)
{
var ids = new List<Guid>();
Expand All @@ -112,8 +120,8 @@ public void ShouldGetNewSessionsWhenBeingUsedConcurrentlyBy(int numberOfThreads)
sessions.Enqueue(mock.Object);
mockSessions.Enqueue(mock);
}
var pool = new SessionPool(sessions);

var pool = new SessionPool(sessions, null);

pool.NumberOfAvailableSessions.Should().Be(numberOfThreads);
pool.NumberOfInUseSessions.Should().Be(0);
Expand All @@ -123,12 +131,20 @@ public void ShouldGetNewSessionsWhenBeingUsedConcurrentlyBy(int numberOfThreads)
var tasks = new Task[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++)
{
tasks[i] =
Task.Run(() =>
{
Task.Delay(500);
var session = pool.GetSession();
receivedIds.Add(((IPooledSession) session).Id);
int localI = i;
tasks[localI] =
Task.Run(() =>{
try
{
Task.Delay(500);
var session = pool.GetSession();
lock(receivedIds)
receivedIds.Add(((IPooledSession) session).Id);
}
catch (Exception ex)
{
_output.WriteLine($"Task[{localI}] died: {ex}");
}
});
}

Expand Down Expand Up @@ -160,7 +176,7 @@ public void ShouldReuseHealthySessionWhenHealthySessionInQueue()

sessions.Enqueue(unhealthyMock.Object);
sessions.Enqueue(healthyMock.Object);
var pool = new SessionPool(sessions);
var pool = new SessionPool(sessions, null);

pool.NumberOfAvailableSessions.Should().Be(2);
pool.NumberOfInUseSessions.Should().Be(0);
Expand Down Expand Up @@ -188,7 +204,7 @@ public void ShouldReturnToPoolWhenSessionIsHealthy()

var inUseSessions = new Dictionary<Guid, IPooledSession>();
inUseSessions.Add(id, mock.Object);
var pool = new SessionPool(inUseSessions);
var pool = new SessionPool(null, inUseSessions);

pool.NumberOfAvailableSessions.Should().Be(0);
pool.NumberOfInUseSessions.Should().Be(1);
Expand All @@ -208,7 +224,7 @@ public void ShouldCloseSessionWhenSessionIsUnhealthy()

var inUseSessions = new Dictionary<Guid, IPooledSession>();
inUseSessions.Add(id, mock.Object);
var pool = new SessionPool(inUseSessions);
var pool = new SessionPool(null, inUseSessions);

pool.NumberOfAvailableSessions.Should().Be(0);
pool.NumberOfInUseSessions.Should().Be(1);
Expand All @@ -220,5 +236,58 @@ public void ShouldCloseSessionWhenSessionIsUnhealthy()
mock.Verify(x=>x.Close(), Times.Once);
}
}

public class DisposeMethod
{
[Fact]
public void ShouldReleaseAll()
{
var mock = new Mock<IPooledSession>();
mock.Setup(x => x.IsHealthy()).Returns(true);
var id = Guid.NewGuid();
var inUseSessions = new Dictionary<Guid, IPooledSession>();
inUseSessions.Add(id, mock.Object);

var sessions = new Queue<IPooledSession>();
var mock1 = new Mock<IPooledSession>();
mock1.Setup(x => x.IsHealthy()).Returns(true);

sessions.Enqueue(mock1.Object);

var pool = new SessionPool(sessions, inUseSessions);
pool.NumberOfAvailableSessions.Should().Be(1);
pool.NumberOfInUseSessions.Should().Be(1);

pool.Dispose();

pool.NumberOfAvailableSessions.Should().Be(0);
pool.NumberOfInUseSessions.Should().Be(0);

}

[Fact]
public void ShouldLogInUseAndAvailableSessionIds()
{
var mockLogger = new Mock<ILogger>();
var mock = new Mock<IPooledSession>();
mock.Setup(x => x.IsHealthy()).Returns(true);
var id = Guid.NewGuid();
var inUseSessions = new Dictionary<Guid, IPooledSession>();
inUseSessions.Add(id, mock.Object);

var sessions = new Queue<IPooledSession>();
var mock1 = new Mock<IPooledSession>();
mock1.Setup(x => x.IsHealthy()).Returns(true);

sessions.Enqueue(mock1.Object);

var pool = new SessionPool(sessions, inUseSessions, logger: mockLogger.Object);

pool.Dispose();

mockLogger.Verify(x => x.Info(It.Is<string>(actual => actual.StartsWith("Disposing In Use"))), Times.Once);
mockLogger.Verify(x => x.Info(It.Is<string>(actual => actual.StartsWith("Disposing Available"))), Times.Once);
}
}
}
}
2 changes: 2 additions & 0 deletions Neo4j.Driver/Neo4j.Driver/Driver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ protected override void Dispose(bool isDisposing)

if (!isDisposing)
return;
_sessionPool?.Dispose();
_sessionPool = null;
Logger?.Dispose();
}

Expand Down
67 changes: 52 additions & 15 deletions Neo4j.Driver/Neo4j.Driver/Internal/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@ public SessionPool(ILogger logger, Uri uri, Config config, IConnection connectio
_connection = connection;
}

internal SessionPool(Queue<IPooledSession> availableSessions, Uri uri = null, IConnection connection = null)
: this(null, uri, null, connection)
internal SessionPool(
Queue<IPooledSession> availableSessions,
Dictionary<Guid, IPooledSession> inUseDictionary,
Uri uri = null,
IConnection connection = null,
ILogger logger = null)
: this(logger, uri, null, connection)
{
_availableSessions = availableSessions;
}
internal SessionPool(Dictionary<Guid, IPooledSession> inUseDictionary) : this(null, null, null, null)
{
_inUseSessions = inUseDictionary;
_availableSessions = availableSessions ?? new Queue<IPooledSession>();
_inUseSessions = inUseDictionary ?? new Dictionary<Guid, IPooledSession>();
}

public ISession GetSession()
Expand All @@ -59,7 +61,10 @@ public ISession GetSession()
if (session == null)
{
session = new Session(_uri, _config, _connection, Release);
_inUseSessions.Add(session.Id, session);
lock (_inUseSessions)
{
_inUseSessions.Add(session.Id, session);
}
return session;
}

Expand All @@ -70,25 +75,27 @@ public ISession GetSession()
}

session.Reset();
_inUseSessions.Add(session.Id, session);
lock (_inUseSessions)
{
_inUseSessions.Add(session.Id, session);
}
return session;
}

public void Release(Guid sessionId)
{
IPooledSession session;
if (!_inUseSessions.ContainsKey(sessionId))
{
return;
}

lock (_inUseSessions)
{
if (!_inUseSessions.ContainsKey(sessionId))
{
return;
}

session = _inUseSessions[sessionId];
_inUseSessions.Remove(sessionId);
}


if (session.IsHealthy())
{
lock (_availableSessions)
Expand All @@ -100,6 +107,36 @@ public void Release(Guid sessionId)
session.Close();
}
}

protected override void Dispose(bool isDisposing)
{
if (!isDisposing)
{
return;
}

lock (_inUseSessions)
{
var sessions = new List<IPooledSession>(_inUseSessions.Values);
_inUseSessions.Clear();
foreach (var inUseSession in sessions)
{
Logger?.Info($"Disposing In Use Session {inUseSession.Id}");
inUseSession.Close();
}
}
lock (_availableSessions)
{
while (_availableSessions.Count > 0)
{
var session = _availableSessions.Dequeue();
Logger?.Info($"Disposing Available Session {session.Id}");
session.Close();
}
}

base.Dispose(true);
}
}

public interface IPooledSession : ISession
Expand Down

0 comments on commit 655921d

Please sign in to comment.