From d06a6ab7ee86ef0d0132f9a7942684253068c95a Mon Sep 17 00:00:00 2001 From: Zhen Li Date: Tue, 25 Oct 2016 16:41:31 +0200 Subject: [PATCH] Added basic error handling callback --- .../Tck/ErrorReportingSteps.cs | 2 +- .../Neo4j.Driver.Tests/ConnectionPoolTests.cs | 42 +++--- .../Connector/SocketConnectionTests.cs | 21 +-- .../Neo4j.Driver.Tests.csproj | 4 +- .../ClusterConnectionPoolTests.cs} | 130 ++++++++-------- .../ConcurrentRoundRobinSetTests.cs} | 45 +++--- .../Neo4j.Driver.Tests/SessionTests.cs | 16 +- .../Neo4j.Driver/Internal/ConnectionPool.cs | 28 ++-- .../Internal/Connector/IConnection.cs | 14 +- .../Connector/IConnectionErrorHandler.cs | 11 ++ .../Internal/Connector/PooledConnection.cs | 68 ++++++--- .../Internal/Connector/SocketConnection.cs | 34 ++++- .../Neo4j.Driver/Internal/DirectDriver.cs | 6 - .../Extensions/Neo4jErrorExtensions.cs | 35 +++++ .../Internal/Routing/ClusterConnectionPool.cs | 57 ++++--- .../Routing/ClusterDiscoveryManager.cs | 22 ++- .../Routing/ConcurrentRoundRobinSet.cs | 40 +---- .../Routing/IClusterConnectionPool.cs | 15 +- .../Internal/Routing/ILoadBalancer.cs | 4 +- .../Internal/Routing/RoundRobinClusterView.cs | 6 +- .../Routing/RoundRobinLoadBalancer.cs | 140 ++++++++++++++---- .../Internal/Routing/RoutingDriver.cs | 33 ++++- .../Internal/Routing/RoutingSession.cs | 48 ------ Neo4j.Driver/Neo4j.Driver/Internal/Session.cs | 40 ++--- Neo4j.Driver/Neo4j.Driver/Neo4j.Driver.csproj | 4 +- Neo4j.Driver/Neo4j.Driver/V1/GraphDatabase.cs | 10 +- Neo4j.Driver/Neo4j.Driver/V1/IDriver.cs | 2 +- .../Neo4j.Driver/V1/Neo4jException.cs | 6 +- 28 files changed, 506 insertions(+), 377 deletions(-) rename Neo4j.Driver/Neo4j.Driver.Tests/{ClusterServerPoolTests.cs => Routing/ClusterConnectionPoolTests.cs} (63%) rename Neo4j.Driver/Neo4j.Driver.Tests/{ConcurrentRoundRobinQueueTests.cs => Routing/ConcurrentRoundRobinSetTests.cs} (75%) create mode 100644 Neo4j.Driver/Neo4j.Driver/Internal/Connector/IConnectionErrorHandler.cs create mode 100644 Neo4j.Driver/Neo4j.Driver/Internal/Extensions/Neo4jErrorExtensions.cs delete mode 100644 Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingSession.cs diff --git a/Neo4j.Driver/Neo4j.Driver.Tck.Tests/Tck/ErrorReportingSteps.cs b/Neo4j.Driver/Neo4j.Driver.Tck.Tests/Tck/ErrorReportingSteps.cs index 25739dca2..42f99e9e3 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tck.Tests/Tck/ErrorReportingSteps.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tck.Tests/Tck/ErrorReportingSteps.cs @@ -91,7 +91,7 @@ public void WhenISetUpADriverWithWrongScheme() { var ex = Xunit.Record.Exception(() => driver.Session()); ex.Should().BeOfType(); - ex.Message.Should().Be("Unsupported protocol: http"); + ex.Message.Should().Be("Unsupported URI scheme: http"); } } diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs index d0acf6ea8..d269e2800 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/ConnectionPoolTests.cs @@ -37,7 +37,7 @@ private IConnection MockedConnection get { var mock = new Mock(); - mock.Setup(x => x.IsHealthy).Returns(true); + mock.Setup(x => x.IsOpen).Returns(true); return mock.Object; } } @@ -116,7 +116,7 @@ public void ShouldCreateNewWhenQueueOnlyContainsUnhealthyConnections() var conns = new Queue(); var unhealthyId = Guid.NewGuid(); var unhealthyMock = new Mock(); - unhealthyMock.Setup(x => x.IsHealthy).Returns(false); + unhealthyMock.Setup(x => x.IsOpen).Returns(false); unhealthyMock.Setup(x => x.Id).Returns(unhealthyId); conns.Enqueue(unhealthyMock.Object); @@ -129,7 +129,7 @@ public void ShouldCreateNewWhenQueueOnlyContainsUnhealthyConnections() pool.NumberOfAvailableConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); - unhealthyMock.Verify(x => x.IsHealthy, Times.Once); + unhealthyMock.Verify(x => x.IsOpen, Times.Once); unhealthyMock.Verify(x => x.Close(), Times.Once); conn.Should().NotBeNull(); @@ -141,7 +141,7 @@ public void ShouldReuseOldWhenReusableConnectionInQueue() { var conns = new Queue(); var mock = new Mock(); - mock.Setup(x => x.IsHealthy).Returns(true); + mock.Setup(x => x.IsOpen).Returns(true); conns.Enqueue(mock.Object); var pool = new ConnectionPool(MockedConnection, conns); @@ -153,7 +153,7 @@ public void ShouldReuseOldWhenReusableConnectionInQueue() pool.NumberOfAvailableConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(1); - mock.Verify(x => x.IsHealthy, Times.Once); + mock.Verify(x => x.IsOpen, Times.Once); conn.Should().Be(mock.Object); } @@ -162,9 +162,9 @@ public void ShouldReuseReusableWhenReusableConnectionInQueue() { var conns = new Queue(); var healthyMock = new Mock(); - healthyMock.Setup(x => x.IsHealthy).Returns(true); + healthyMock.Setup(x => x.IsOpen).Returns(true); var unhealthyMock = new Mock(); - unhealthyMock.Setup(x => x.IsHealthy).Returns(false); + unhealthyMock.Setup(x => x.IsOpen).Returns(false); conns.Enqueue(unhealthyMock.Object); conns.Enqueue(healthyMock.Object); @@ -201,7 +201,7 @@ public void ShouldAcquireNewWhenBeingUsedConcurrentlyBy(int numberOfThreads) for (var i = 0; i < numberOfThreads; i++) { var mock = new Mock(); - mock.Setup(x => x.IsHealthy).Returns(true); + mock.Setup(x => x.IsOpen).Returns(true); mock.Setup(x => x.Id).Returns(ids[i]); conns.Enqueue(mock.Object); mockConns.Enqueue(mock); @@ -246,7 +246,7 @@ public void ShouldAcquireNewWhenBeingUsedConcurrentlyBy(int numberOfThreads) foreach (var mock in mockConns) { - mock.Verify(x => x.IsHealthy, Times.Once); + mock.Verify(x => x.IsOpen, Times.Once); } } @@ -277,7 +277,7 @@ public void ShouldCloseAcquiredConnectionIfPoolDisposeStarted() // This is to simulate Acquire called first, // but before Acquire put a new conn into inUseConn, Dispose get called. // Note: Once dispose get called, it is forbiden to put anything into queue. - healthyMock.Setup(x => x.IsHealthy).Returns(true) + healthyMock.Setup(x => x.IsOpen).Returns(true) .Callback(() => pool.DisposeCalled = true); // Simulte Dispose get called at this time conns.Enqueue(healthyMock.Object); pool.NumberOfAvailableConnections.Should().Be(1); @@ -286,7 +286,7 @@ public void ShouldCloseAcquiredConnectionIfPoolDisposeStarted() pool.NumberOfAvailableConnections.Should().Be(0); pool.NumberOfInUseConnections.Should().Be(0); - healthyMock.Verify(x => x.IsHealthy, Times.Once); + healthyMock.Verify(x => x.IsOpen, Times.Once); healthyMock.Verify(x => x.Close(), Times.Once); exception.Should().BeOfType(); exception.Message.Should().Contain("the driver has already started to dispose"); @@ -299,7 +299,7 @@ public class ReleaseMethod public void ShouldReturnToPoolWhenConnectionIsReusableAndPoolIsNotFull() { var mock = new Mock(); - mock.Setup(x => x.IsHealthy).Returns(true); + mock.Setup(x => x.IsOpen).Returns(true); var id = new Guid(); var inUseconns = new Dictionary(); @@ -319,7 +319,7 @@ public void ShouldReturnToPoolWhenConnectionIsReusableAndPoolIsNotFull() public void ShouldCloseConnectionWhenConnectionIsUnhealthy() { var mock = new Mock(); - mock.Setup(x => x.IsHealthy).Returns(false); + mock.Setup(x => x.IsOpen).Returns(false); var id = new Guid(); var inUseConns = new Dictionary(); @@ -337,10 +337,10 @@ public void ShouldCloseConnectionWhenConnectionIsUnhealthy() } [Fact] - public void ShouldCloseConnectionWhenConnectionIsHealthyButNotResetable() + public void ShouldCloseConnectionWhenConnectionIsOpenButNotResetable() { var mock = new Mock(); - mock.Setup(x => x.IsHealthy).Returns(true); + mock.Setup(x => x.IsOpen).Returns(true); mock.Setup(x => x.ClearConnection()).Throws(); var id = new Guid(); @@ -362,7 +362,7 @@ public void ShouldCloseConnectionWhenConnectionIsHealthyButNotResetable() public void ShouldCloseTheConnectionIfSessionIsReusableButThePoolIsFull() { var mock = new Mock(); - mock.Setup(x => x.IsHealthy).Returns(true); + mock.Setup(x => x.IsOpen).Returns(true); var id = new Guid(); var inUseConns = new Dictionary(); @@ -408,7 +408,7 @@ public void ShouldCloseConnectionIfPoolDisposeStarted() // this is to simulate Release called first, // but before Release put a new conn into availConns, Dispose get called. // Note: Once dispose get called, it is forbiden to put anything into queue. - mock.Setup(x => x.IsHealthy).Returns(true) + mock.Setup(x => x.IsOpen).Returns(true) .Callback(() => pool.DisposeCalled = true); // Simulte Dispose get called at this time pool.Release(id); @@ -425,14 +425,14 @@ public class DisposeMethod public void ShouldReleaseAll() { var mock = new Mock(); - mock.Setup(x => x.IsHealthy).Returns(true); + mock.Setup(x => x.IsOpen).Returns(true); var id = Guid.NewGuid(); var inUseConns = new Dictionary(); inUseConns.Add(id, mock.Object); var availableConns = new Queue(); var mock1 = new Mock(); - mock1.Setup(x => x.IsHealthy).Returns(true); + mock1.Setup(x => x.IsOpen).Returns(true); availableConns.Enqueue(mock1.Object); @@ -451,14 +451,14 @@ public void ShouldLogInUseAndAvailableConnectionIds() { var mockLogger = new Mock(); var mock = new Mock(); - mock.Setup(x => x.IsHealthy).Returns(true); + mock.Setup(x => x.IsOpen).Returns(true); var id = Guid.NewGuid(); var inUseConns = new Dictionary(); inUseConns.Add(id, mock.Object); var availableConns = new Queue(); var mock1 = new Mock(); - mock1.Setup(x => x.IsHealthy).Returns(true); + mock1.Setup(x => x.IsOpen).Returns(true); availableConns.Enqueue(mock1.Object); diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Connector/SocketConnectionTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Connector/SocketConnectionTests.cs index 0c50bbcd9..20c67e748 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/Connector/SocketConnectionTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Connector/SocketConnectionTests.cs @@ -194,6 +194,7 @@ public void ShouldNotClearMessagesResponseHandlerAndEnqueueResetMessage() } } + // TODO move this to PooledConnectionTests public class HasUnrecoverableError { [Fact] @@ -201,7 +202,7 @@ public void ShouldReportErrorIfIsTransientException() { var mock = MockSocketClient; var mockResponseHandler = new Mock(); - var con = new SocketConnection(mock.Object, AuthTokens.None, Logger, mockResponseHandler.Object); + var con = new PooledConnection (new SocketConnection(mock.Object, AuthTokens.None, Logger, mockResponseHandler.Object)); mockResponseHandler.Setup(x => x.Error).Returns(new TransientException("BLAH", "lalala")); con.HasUnrecoverableError.Should().BeFalse(); @@ -212,7 +213,7 @@ public void ShouldReportErrorIfIsDatabaseException() { var mock = MockSocketClient; var mockResponseHandler = new Mock(); - var con = new SocketConnection(mock.Object, AuthTokens.None, Logger, mockResponseHandler.Object); + var con = new PooledConnection(new SocketConnection(mock.Object, AuthTokens.None, Logger, mockResponseHandler.Object)); mockResponseHandler.Setup(x => x.HasError).Returns(true); mockResponseHandler.Setup(x => x.Error).Returns(new DatabaseException("BLAH", "lalala")); @@ -230,14 +231,14 @@ public void ShouldNotReportErrorIfIsOtherExceptions() { var mock = MockSocketClient; var mockResponseHandler = new Mock(); - var con = new SocketConnection(mock.Object, AuthTokens.None, Logger, mockResponseHandler.Object); + var con = new PooledConnection(new SocketConnection(mock.Object, AuthTokens.None, Logger, mockResponseHandler.Object)); mockResponseHandler.Setup(x => x.Error).Returns(new ClientException("BLAH", "lalala")); con.HasUnrecoverableError.Should().BeFalse(); } } - public class IsHealthyMethod + public class IsOpenMethod { [Fact] public void ShouldBeFalseWhenConectionIsNotOpen() @@ -247,8 +248,8 @@ public void ShouldBeFalseWhenConectionIsNotOpen() var mockResponseHandler = new Mock(); mockResponseHandler.Setup(x => x.Error).Returns(new ClientException()); // has no unrecoverable error - var conn = new SocketConnection(mockClient.Object, AuthTokens.None, Logger, mockResponseHandler.Object); - conn.IsHealthy.Should().BeFalse(); + var conn = new PooledConnection(new SocketConnection(mockClient.Object, AuthTokens.None, Logger, mockResponseHandler.Object)); + conn.IsOpen.Should().BeFalse(); } [Fact] @@ -259,8 +260,8 @@ public void ShouldBeFalseWhenConnectionHasUnrecoverableError() var mockResponseHandler = new Mock(); mockResponseHandler.Setup(x => x.Error).Returns(new DatabaseException()); // unrecoverable error - var conn = new SocketConnection(mockClient.Object, AuthTokens.None, Logger, mockResponseHandler.Object); - conn.IsHealthy.Should().BeFalse(); + var conn = new PooledConnection(new SocketConnection(mockClient.Object, AuthTokens.None, Logger, mockResponseHandler.Object)); + conn.IsOpen.Should().BeFalse(); } [Fact] @@ -271,8 +272,8 @@ public void ShouldReturnTrueWhenIsHealthy() var mockResponseHandler = new Mock(); mockResponseHandler.Setup(x => x.Error).Returns(new ClientException()); // has no unrecoverable error - var conn = new SocketConnection(mockClient.Object, AuthTokens.None, Logger, mockResponseHandler.Object); - conn.IsHealthy.Should().BeTrue(); + var conn = new PooledConnection(new SocketConnection(mockClient.Object, AuthTokens.None, Logger, mockResponseHandler.Object)); + conn.IsOpen.Should().BeTrue(); } } } diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Neo4j.Driver.Tests.csproj b/Neo4j.Driver/Neo4j.Driver.Tests/Neo4j.Driver.Tests.csproj index a340ac3ad..a3cdd1cd5 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/Neo4j.Driver.Tests.csproj +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Neo4j.Driver.Tests.csproj @@ -152,8 +152,8 @@ - - + + diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/ClusterServerPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs similarity index 63% rename from Neo4j.Driver/Neo4j.Driver.Tests/ClusterServerPoolTests.cs rename to Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs index 805235076..3809f76d0 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/ClusterServerPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs @@ -10,32 +10,29 @@ namespace Neo4j.Driver.Tests { - public class ClusterServerPoolTests + public class ClusterConnectionPoolTests { - private static string ServerUri { get; } = "bolt+routing://1234:5678"; + private static Uri ServerUri { get; } = new Uri("bolt+routing://1234:5678"); public class AcquireMethod { [Fact] - public void ShouldCreateNewConnectionPoolIfUriDoseNotExist() + public void ShouldNotCreateNewConnectionPoolIfUriDoseNotExist() { // Given var mockedConnectionPool = new Mock(); - var mockedConnection = new Mock(); - mockedConnectionPool.Setup(x => x.Acquire()).Returns(mockedConnection.Object); var connectionPoolDict = new ConcurrentDictionary(); var pool = new ClusterConnectionPool(mockedConnectionPool.Object, connectionPoolDict); connectionPoolDict.Count.Should().Be(0); // When - var connection = pool.Acquire(new Uri(ServerUri)); + IPooledConnection connection; + var acquired = pool.TryAcquire(ServerUri, out connection); // Then - connectionPoolDict.Count.Should().Be(1); - connectionPoolDict.Keys.Single().Should().Be(new Uri(ServerUri)); - connectionPoolDict[new Uri(ServerUri)].Should().Be(mockedConnectionPool.Object); - connection.Should().Be(mockedConnection.Object); + acquired.Should().BeFalse(); + connectionPoolDict.Count.Should().Be(0); } [Fact] @@ -47,43 +44,23 @@ public void ShouldReturnExisitingConnectionPoolIfUriAlreadyExist() mockedConnectionPool.Setup(x => x.Acquire()).Returns(mockedConnection.Object); var connectionPoolDict = new ConcurrentDictionary(); - connectionPoolDict.GetOrAdd(new Uri(ServerUri), mockedConnectionPool.Object); + connectionPoolDict.GetOrAdd(ServerUri, mockedConnectionPool.Object); var pool = new ClusterConnectionPool(null, connectionPoolDict); connectionPoolDict.Count.Should().Be(1); - connectionPoolDict.Keys.Single().Should().Be(new Uri(ServerUri)); - connectionPoolDict[new Uri(ServerUri)].Should().Be(mockedConnectionPool.Object); + connectionPoolDict.Keys.Single().Should().Be(ServerUri); + connectionPoolDict[ServerUri].Should().Be(mockedConnectionPool.Object); // When - var connection = pool.Acquire(new Uri(ServerUri)); + IPooledConnection connection; + var acquired = pool.TryAcquire(ServerUri, out connection); // Then + acquired.Should().BeTrue(); connection.Should().Be(mockedConnection.Object); } - [Fact] - public void ShouldRemoveNewlyCreatedPoolAndThrowExceptionIfDisposeAlreadyCalled() - { - // Given - var mockedConnectionPool = new Mock(); - var mockedConnectionPoolDict = new Mock>(); - var pool = new ClusterConnectionPool(mockedConnectionPool.Object, mockedConnectionPoolDict.Object); - - // When - pool.Dispose(); - var exception = Record.Exception(() => pool.Acquire(new Uri(ServerUri))); - - // Then - mockedConnectionPool.Verify(x=>x.Dispose()); - - exception.Should().BeOfType(); - exception.Message.Should().Contain("Failed to create connections with server"); - } - } - - public class HasAddressMethod - { [Theory] [InlineData("bolt+routing://localhost:7687", "bolt+routing://127.0.0.1:7687", false)] [InlineData("bolt+routing://127.0.0.1:7687", "bolt+routing://127.0.0.1:7687", true)] @@ -97,88 +74,101 @@ public void AddressMatchTest(string first, string second, bool expectedResult) connectionPoolDict.GetOrAdd(new Uri(first), mockedConnectionPool.Object); var pool = new ClusterConnectionPool(null, connectionPoolDict); - pool.HasAddress(new Uri(second)).Should().Be(expectedResult); + IPooledConnection ignored; + pool.TryAcquire(new Uri(second), out ignored).Should().Be(expectedResult); } } - public class PurgeMethod + public class UpdateMethod { [Fact] - public void ShouldRemovedIfExist() + public void ShouldAddNewConnectionPoolIfDoesNotExist() { // Given var mockedConnectionPool = new Mock(); var connectionPoolDict = new ConcurrentDictionary(); - connectionPoolDict.GetOrAdd(new Uri(ServerUri), mockedConnectionPool.Object); + var pool = new ClusterConnectionPool(mockedConnectionPool.Object, connectionPoolDict); - var pool = new ClusterConnectionPool(null, connectionPoolDict); + // When + pool.Update(new[] { ServerUri }); + + // Then + connectionPoolDict.Count.Should().Be(1); + connectionPoolDict.ContainsKey(ServerUri).Should().BeTrue(); + connectionPoolDict[ServerUri].Should().Be(mockedConnectionPool.Object); + } + + [Fact] + public void ShouldRemoveNewlyCreatedPoolnIfDisposeAlreadyCalled() + { + // Given + var mockedConnectionPool = new Mock(); + var mockedConnectionPoolDict = new Mock>(); + var pool = new ClusterConnectionPool(mockedConnectionPool.Object, mockedConnectionPoolDict.Object); // When - pool.Purge(new Uri(ServerUri)); - + pool.Dispose(); + var exception = Record.Exception(() => pool.Update(new[] {ServerUri})); + // Then - mockedConnectionPool.Verify(x=>x.Dispose(), Times.Once); - connectionPoolDict.Count.Should().Be(0); - connectionPoolDict.ContainsKey(new Uri(ServerUri)).Should().BeFalse(); + mockedConnectionPool.Verify(x => x.Dispose()); + + exception.Should().BeOfType(); + exception.Message.Should().Contain("Failed to create connections with server"); } [Fact] - public void ShouldRemoveNothingIfNotFound() + public void ShouldRemoveServerPoolIfNotPresentInNewServers() { // Given + var mockedConnectionPool = new Mock(); var connectionPoolDict = new ConcurrentDictionary(); - - var pool = new ClusterConnectionPool(null, connectionPoolDict); + connectionPoolDict.GetOrAdd(ServerUri, mockedConnectionPool.Object); + var pool = new ClusterConnectionPool(mockedConnectionPool.Object, connectionPoolDict); // When - pool.Purge(new Uri(ServerUri)); + pool.Update(new Uri[0]); // Then connectionPoolDict.Count.Should().Be(0); - connectionPoolDict.ContainsKey(new Uri(ServerUri)).Should().BeFalse(); } } - public class ReleaseMethod + public class PurgeMethod { [Fact] - public void ShouldReleaseIfExist() + public void ShouldRemovedIfExist() { // Given var mockedConnectionPool = new Mock(); var connectionPoolDict = new ConcurrentDictionary(); - connectionPoolDict.GetOrAdd(new Uri(ServerUri), mockedConnectionPool.Object); + connectionPoolDict.GetOrAdd(ServerUri, mockedConnectionPool.Object); var pool = new ClusterConnectionPool(null, connectionPoolDict); // When - var id = new Guid(); - pool.Release(new Uri(ServerUri), id); - + pool.Purge(ServerUri); + // Then - mockedConnectionPool.Verify(x => x.Release(id), Times.Once); - connectionPoolDict.Count.Should().Be(1); - connectionPoolDict.ContainsKey(new Uri(ServerUri)).Should().BeTrue(); + mockedConnectionPool.Verify(x=>x.Dispose(), Times.Once); + connectionPoolDict.Count.Should().Be(0); + connectionPoolDict.ContainsKey(ServerUri).Should().BeFalse(); } [Fact] - public void ShouldNotReleaseIfNotFound() + public void ShouldRemoveNothingIfNotFound() { // Given - var mockedConnectionPool = new Mock(); var connectionPoolDict = new ConcurrentDictionary(); - connectionPoolDict.GetOrAdd(new Uri(ServerUri), mockedConnectionPool.Object); var pool = new ClusterConnectionPool(null, connectionPoolDict); // When - var id = new Guid(); - pool.Release(new Uri("http://123"), id); + pool.Purge(ServerUri); // Then - mockedConnectionPool.Verify(x => x.Release(id), Times.Never()); - connectionPoolDict.Count.Should().Be(1); - connectionPoolDict.ContainsKey(new Uri(ServerUri)).Should().BeTrue(); + connectionPoolDict.Count.Should().Be(0); + connectionPoolDict.ContainsKey(ServerUri).Should().BeFalse(); } } @@ -190,7 +180,7 @@ public void ShouldRemoveAllAfterDispose() // Given var mockedConnectionPool = new Mock(); var connectionPoolDict = new ConcurrentDictionary(); - connectionPoolDict.GetOrAdd(new Uri(ServerUri), mockedConnectionPool.Object); + connectionPoolDict.GetOrAdd(ServerUri, mockedConnectionPool.Object); var pool = new ClusterConnectionPool(null, connectionPoolDict); @@ -200,7 +190,7 @@ public void ShouldRemoveAllAfterDispose() // Then mockedConnectionPool.Verify(x => x.Dispose(), Times.Once); connectionPoolDict.Count.Should().Be(0); - connectionPoolDict.ContainsKey(new Uri(ServerUri)).Should().BeFalse(); + connectionPoolDict.ContainsKey(ServerUri).Should().BeFalse(); } } diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/ConcurrentRoundRobinQueueTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ConcurrentRoundRobinSetTests.cs similarity index 75% rename from Neo4j.Driver/Neo4j.Driver.Tests/ConcurrentRoundRobinQueueTests.cs rename to Neo4j.Driver/Neo4j.Driver.Tests/Routing/ConcurrentRoundRobinSetTests.cs index 6c205421e..a9b3e88dd 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/ConcurrentRoundRobinQueueTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ConcurrentRoundRobinSetTests.cs @@ -1,22 +1,21 @@ -using System; -using System.Linq; +using System.Linq; using FluentAssertions; using Neo4j.Driver.Internal.Routing; using Xunit; namespace Neo4j.Driver.Tests { - public class ConcurrentRoundRobinQueueTests + public class ConcurrentRoundRobinSetTests { - public class HopMethod + public class TryNextMethod { [Fact] - public void ShouldThrowExceptionIfNoElementInSet() + public void ShouldReturnFalseIfNoElementInSet() { var set = new ConcurrentRoundRobinSet(); - var exception = Xunit.Record.Exception(() => set.Hop()); - exception.Should().BeOfType(); - exception.Message.Should().Be("No item in set"); + int value; + set.TryNext(out value).Should().BeFalse(); + value.Should().Be(default(int)); } [Fact] @@ -26,7 +25,8 @@ public void ShouldRoundRobin() for (var i = 0; i < 10; i++) { - var real = set.Hop(); + int real; + set.TryNext(out real).Should().BeTrue(); var expect = i % set.Count; real.Should().Be(expect); } @@ -38,6 +38,7 @@ public class AddMethod [Fact] public void ShouldAddNew() { + // ReSharper disable once UseObjectOrCollectionInitializer var set = new ConcurrentRoundRobinSet(); set.Add(1); set.Count.Should().Be(1); @@ -46,6 +47,7 @@ public void ShouldAddNew() [Fact] public void ShouldNotAddIfAlreadyExists() { + // ReSharper disable once UseObjectOrCollectionInitializer var set = new ConcurrentRoundRobinSet { 0, 1, 2, 3 }; set.Add(0); set.Add(1); @@ -56,17 +58,6 @@ public void ShouldNotAddIfAlreadyExists() } } - public class ClearMethod - { - [Fact] - public void ShouldRemoveAll() - { - var set = new ConcurrentRoundRobinSet { 0, 1, 2, 3 }; - set.Clear(); - set.Count.Should().Be(0); - } - } - public class RemoveMethod { [Fact] @@ -98,7 +89,9 @@ public void ShouldBeAbleToAccessNewlyAddedItem() // we loop serveral turns on the full set for (var i = 0; i < 3*set.Count; i++) { - set.Hop().Should().Be(i % set.Count); + int real; + set.TryNext(out real).Should().BeTrue(); + real.Should().Be(i%set.Count); } // we add a new item into the set @@ -107,7 +100,9 @@ public void ShouldBeAbleToAccessNewlyAddedItem() // we loop again and everything is in set for (var i = 0; i < 3*set.Count; i++) { - set.Hop().Should().Be(i % set.Count); + int real; + set.TryNext(out real).Should().BeTrue(); + real.Should().Be(i % set.Count); } } @@ -118,7 +113,8 @@ public void ShouldBeAbleToRemoveItem() for (var i = 0; i < 3 * set.Count; i++) { - var real = set.Hop(); + int real; + set.TryNext(out real).Should().BeTrue(); var expect = i % set.Count; real.Should().Be(expect); } @@ -127,7 +123,8 @@ public void ShouldBeAbleToRemoveItem() for (var i = 0; i < 3 * set.Count; i++) { - var real = set.Hop(); + int real; + set.TryNext(out real).Should().BeTrue(); var expect = i % set.Count; real.Should().Be(expect); } diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/SessionTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/SessionTests.cs index d374ecb75..ab3390d67 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/SessionTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/SessionTests.cs @@ -34,7 +34,7 @@ public class RunMethod public void ShouldSyncOnRun() { var mockConn = new Mock(); - mockConn.Setup(x => x.IsHealthy).Returns(true); + mockConn.Setup(x => x.IsOpen).Returns(true); var session = new Session(mockConn.Object, null); session.Run("lalalal"); @@ -49,7 +49,7 @@ public class BeginTransactionMethod public void ShouldNotAllowNewTxWhileOneIsRunning() { var mockConn = new Mock(); - mockConn.Setup(x => x.IsHealthy).Returns(true); + mockConn.Setup(x => x.IsOpen).Returns(true); var session = new Session(mockConn.Object, null); session.BeginTransaction(); var error = Record.Exception(() => session.BeginTransaction()); @@ -60,7 +60,7 @@ public void ShouldNotAllowNewTxWhileOneIsRunning() public void ShouldBeAbleToOpenTxAfterPreviousIsClosed() { var mockConn = new Mock(); - mockConn.Setup(x => x.IsHealthy).Returns(true); + mockConn.Setup(x => x.IsOpen).Returns(true); var session = new Session(mockConn.Object, null); var tx = session.BeginTransaction(); tx.Dispose(); @@ -71,7 +71,7 @@ public void ShouldBeAbleToOpenTxAfterPreviousIsClosed() public void ShouldNotBeAbleToUseSessionWhileOngoingTransaction() { var mockConn = new Mock(); - mockConn.Setup(x => x.IsHealthy).Returns(true); + mockConn.Setup(x => x.IsOpen).Returns(true); var session = new Session(mockConn.Object, null); var tx = session.BeginTransaction(); @@ -83,7 +83,7 @@ public void ShouldNotBeAbleToUseSessionWhileOngoingTransaction() public void ShouldBeAbleToUseSessionAgainWhenTransactionIsClosed() { var mockConn = new Mock(); - mockConn.Setup(x => x.IsHealthy).Returns(true); + mockConn.Setup(x => x.IsOpen).Returns(true); var session = new Session(mockConn.Object, null); var tx = session.BeginTransaction(); tx.Dispose(); @@ -116,7 +116,7 @@ public void ShouldNotAllowMoreTransactionsInSessionWhileConnectionClosed() [Fact] public void ShouldNotAllowMoreStatementsInSessionWhileConnectionHasUnrecoverableError() { - var mockConn = new Mock(); + var mockConn = new Mock(); mockConn.Setup(x => x.HasUnrecoverableError).Returns(true); var session = new Session(mockConn.Object, null); @@ -127,7 +127,7 @@ public void ShouldNotAllowMoreStatementsInSessionWhileConnectionHasUnrecoverable [Fact] public void ShouldNotAllowMoreTransactionsInSessionWhileConnectionHasUnrecoverableError() { - var mockConn = new Mock(); + var mockConn = new Mock(); mockConn.Setup(x => x.HasUnrecoverableError).Returns(true); var session = new Session(mockConn.Object, null); @@ -142,7 +142,7 @@ public class DisposeMethod public void ShouldDisposeTxOnDispose() { var mockConn = new Mock(); - mockConn.Setup(x => x.IsHealthy).Returns(true); + mockConn.Setup(x => x.IsOpen).Returns(true); var session = new Session(mockConn.Object, null); var tx = session.BeginTransaction(); session.Dispose(); diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs index 2580756be..c73497ff7 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs @@ -34,6 +34,8 @@ internal class ConnectionPool : LoggerBase, IConnectionPool private readonly Queue _availableConnections = new Queue(); private readonly Dictionary _inUseConnections = new Dictionary(); + private readonly IConnectionErrorHandler _externalErrorHandler; + private volatile bool _disposeCalled; // for test only @@ -46,7 +48,13 @@ internal bool DisposeCalled set { _disposeCalled = value; } } - public ConnectionPool(Uri uri, IAuthToken authToken, EncryptionManager encryptionManager, ConnectionPoolSettings connectionPoolSettings, ILogger logger) + public ConnectionPool( + Uri uri, + IAuthToken authToken, + EncryptionManager encryptionManager, + ConnectionPoolSettings connectionPoolSettings, + ILogger logger, + IConnectionErrorHandler exteralErrorHandler = null) : base(logger) { _uri = uri; @@ -55,6 +63,8 @@ public ConnectionPool(Uri uri, IAuthToken authToken, EncryptionManager encryptio _encryptionManager = encryptionManager; _idleSessionPoolSize = connectionPoolSettings.MaxIdleSessionPoolSize; + _externalErrorHandler = exteralErrorHandler; + _logger = logger; } @@ -73,7 +83,12 @@ internal ConnectionPool( private IPooledConnection CreateNewPooledConnection() { - return _fakeConnection != null ? new PooledConnection(_fakeConnection, Release) : new PooledConnection(new SocketConnection(_uri, _authToken, _encryptionManager, _logger), Release); + var conn = _fakeConnection != null ? new PooledConnection(_fakeConnection, Release) : new PooledConnection(new SocketConnection(_uri, _authToken, _encryptionManager, _logger), Release); + if (_externalErrorHandler != null) + { + conn.AddConnectionErrorHander(_externalErrorHandler); + } + return conn; } public IPooledConnection Acquire() @@ -95,7 +110,7 @@ public IPooledConnection Acquire() { connection = CreateNewPooledConnection(); } - else if (!connection.IsHealthy) + else if (!connection.IsOpen) { connection.Close(); return Acquire(); @@ -116,7 +131,7 @@ public IPooledConnection Acquire() private bool IsConnectionReusable(IPooledConnection connection) { - if (!connection.IsHealthy) + if (!connection.IsOpen) { return false; } @@ -239,10 +254,5 @@ internal interface IPooledConnection : IConnection /// Return true if unrecoverable error has been received on this connection, otherwise false. /// bool HasUnrecoverableError { get; } - - /// - /// Return true if more statements could be run on this connection, otherwise false. - /// - bool IsHealthy { get; } } } \ No newline at end of file diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Connector/IConnection.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Connector/IConnection.cs index fdfb5096f..c1ce56087 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Connector/IConnection.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Connector/IConnection.cs @@ -43,14 +43,6 @@ internal interface IConnection : IDisposable /// Return true if the underlying socket connection is till open, otherwise false. /// bool IsOpen { get; } -// /// -// /// Return true if unrecoverable error has been received on this connection, otherwise false. -// /// -// bool HasUnrecoverableError { get; } -// /// -// /// Return true if more statements could be run on this connection, otherwise false. -// /// -// bool IsHealthy { get; } /// /// The version of the server the connection connected to. Default to null if not supported by server @@ -64,5 +56,11 @@ internal interface IConnection : IDisposable /// Close and release related resources /// void Close(); + + /// + /// Adds an extra error handler that you wish to be called back when a consreponding error is received + /// + /// The extra error handler to add. + void AddConnectionErrorHander(IConnectionErrorHandler handler); } } \ No newline at end of file diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Connector/IConnectionErrorHandler.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Connector/IConnectionErrorHandler.cs new file mode 100644 index 000000000..d6cc4b783 --- /dev/null +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Connector/IConnectionErrorHandler.cs @@ -0,0 +1,11 @@ +using System; +using Neo4j.Driver.V1; + +namespace Neo4j.Driver.Internal.Connector +{ + internal interface IConnectionErrorHandler + { + Exception OnConnectionError(Exception e); + Neo4jException OnNeo4jError(Neo4jException e); + } +} \ No newline at end of file diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Connector/PooledConnection.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Connector/PooledConnection.cs index 65eaddad0..455ef529a 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Connector/PooledConnection.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Connector/PooledConnection.cs @@ -26,28 +26,13 @@ internal class PooledConnection : IPooledConnection private readonly Action _releaseAction; private readonly IConnection _connection; - public void OnReadError(Exception error) - { - if (IsRecoverableError(error)) - { - _connection.AckFailure(); - } - else - { - HasUnrecoverableError = true; - } - } - private bool IsRecoverableError(Exception error) - { - return error is ClientException || error is TransientException; - } - - public bool HasUnrecoverableError { private set; get; } - public PooledConnection(IConnection connection, Action releaseAction = null) { _connection = connection; _releaseAction = releaseAction ?? (x => { }); + + //Adds call back error handler + AddConnectionErrorHander(new PooledConnectionErrorHandler(OnNeo4jError)); } public Guid Id { get; } = Guid.NewGuid(); @@ -92,8 +77,7 @@ public void ResetAsync() _connection.ResetAsync(); } - public bool IsOpen => _connection.IsOpen; - public bool IsHealthy => IsOpen && !HasUnrecoverableError; + public bool IsOpen => _connection.IsOpen && !HasUnrecoverableError; public string Server => _connection.Server; /// @@ -104,6 +88,11 @@ public void Close() _connection.Close(); } + public void AddConnectionErrorHander(IConnectionErrorHandler handler) + { + _connection.AddConnectionErrorHander(handler); + } + /// /// Disposing a pooled connection will try to release the connection resource back to pool /// @@ -111,5 +100,44 @@ public void Dispose() { _releaseAction(Id); } + + public bool HasUnrecoverableError { private set; get; } + + private Neo4jException OnNeo4jError(Neo4jException error) + { + if (error.IsRecoverableError()) + { + _connection.AckFailure(); + } + else + { + HasUnrecoverableError = true; + } + return error; + } + + private class PooledConnectionErrorHandler : IConnectionErrorHandler + { + private readonly Func _onNeo4jErrorFunc; + private readonly Func _onConnErrorFunc; + + public PooledConnectionErrorHandler( + Func onNeo4JErrorFunc, + Func onConnectionErrorFunc = null) + { + _onNeo4jErrorFunc = onNeo4JErrorFunc; + _onConnErrorFunc = onConnectionErrorFunc; + } + + public Exception OnConnectionError(Exception e) + { + return _onConnErrorFunc.Invoke(e); + } + + public Neo4jException OnNeo4jError(Neo4jException e) + { + return _onNeo4jErrorFunc.Invoke(e); + } + } } } \ No newline at end of file diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Connector/SocketConnection.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Connector/SocketConnection.cs index 632c2bc6b..74a5e7d32 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Connector/SocketConnection.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Connector/SocketConnection.cs @@ -35,10 +35,7 @@ internal class SocketConnection : IConnection private volatile bool _interrupted; private readonly object _syncLock = new object(); - // TODO pull out as an error handler - // If no given then use a null error handler otherwise use the specified error handler - private readonly Action _onReadErrorAction; - private readonly Action _onWriteErrorAction; + private IList _handlers = new List(); public SocketConnection(ISocketClient socketClient, IAuthToken authToken, ILogger logger, IMessageResponseHandler messageResponseHandler = null) @@ -90,7 +87,7 @@ public void Send() } catch (Exception error) { - _onWriteErrorAction(error); + OnConnectionError(error); throw; } @@ -113,7 +110,7 @@ private void Receive() } catch (Exception error) { - _onReadErrorAction(error); + OnConnectionError(error); throw; } @@ -128,7 +125,7 @@ public void ReceiveOne() } catch (Exception error) { - _onReadErrorAction(error); + OnConnectionError(error); throw; } @@ -181,6 +178,11 @@ public void Close() Dispose(); } + public void AddConnectionErrorHander(IConnectionErrorHandler handler) + { + _handlers.Add(handler); + } + public void Dispose() { Dispose(true); @@ -201,7 +203,7 @@ private void AssertNoServerFailure() { var error = _responseHandler.Error; - _onReadErrorAction(error); + OnNeo4jError(error); _responseHandler.Error = null; _interrupted = false; @@ -209,6 +211,22 @@ private void AssertNoServerFailure() } } + private void OnConnectionError(Exception e) + { + foreach (var handler in _handlers) + { + e = handler.OnConnectionError(e); + } + } + + public void OnNeo4jError(Neo4jException e) + { + foreach (var handler in _handlers) + { + e = handler.OnNeo4jError(e); + } + } + private void Enqueue(IRequestMessage requestMessage, IMessageResponseCollector resultBuilder = null, IRequestMessage requestStreamingMessage = null) { lock (_syncLock) diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/DirectDriver.cs b/Neo4j.Driver/Neo4j.Driver/Internal/DirectDriver.cs index e3e6c44f0..d2e371b09 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/DirectDriver.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/DirectDriver.cs @@ -32,12 +32,6 @@ internal DirectDriver(Uri uri, IAuthToken authToken, EncryptionManager encryptio Throw.ArgumentNullException.IfNull(encryptionManager, nameof(encryptionManager)); Throw.ArgumentNullException.IfNull(connectionPoolSettings, nameof(connectionPoolSettings)); - if (uri.Port == -1) - { - var builder = new UriBuilder(uri.Scheme, uri.Host, 7687); - uri = builder.Uri; - } - Uri = uri; _logger = logger; _connectionPool = new ConnectionPool(uri, authToken, encryptionManager, connectionPoolSettings, _logger); diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Extensions/Neo4jErrorExtensions.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Extensions/Neo4jErrorExtensions.cs new file mode 100644 index 000000000..1576bd71b --- /dev/null +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Extensions/Neo4jErrorExtensions.cs @@ -0,0 +1,35 @@ +// Copyright (c) 2002-2016 "Neo Technology," +// Network Engine for Objects in Lund AB [http://neotechnology.com] +// +// This file is part of Neo4j. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neo4j.Driver.V1; + +namespace Neo4j.Driver.Internal +{ + internal static class Neo4jErrorExtensions + { + public static bool IsRecoverableError(this Neo4jException error) + { + return (error is ClientException || error is TransientException) && !IsClusterError(error); + } + + public static bool IsClusterError(this Neo4jException error) + { + return error.Code.Equals("Neo.ClientError.Cluster.NotALeader") + || error.Code.Equals("Neo.ClientError.General.ForbiddenOnReadOnlyDatabase"); + } + } +} \ No newline at end of file diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs index 3b90171a3..646782477 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs @@ -17,6 +17,9 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using Neo4j.Driver.Internal.Connector; using Neo4j.Driver.V1; namespace Neo4j.Driver.Internal.Routing @@ -27,25 +30,37 @@ internal class ClusterConnectionPool : LoggerBase, IClusterConnectionPool private readonly IAuthToken _authToken; private readonly EncryptionManager _encryptionManager; private readonly ConnectionPoolSettings _poolSettings; + private readonly Func _clusterErrorHandlerCreator; // for test only private readonly IConnectionPool _fakeConnectionPool; private volatile bool _disposeCalled; - public ClusterConnectionPool(IAuthToken authToken, EncryptionManager encryptionManager, ConnectionPoolSettings poolSettings, ILogger logger) + public ClusterConnectionPool( + Uri seedServer, + IAuthToken authToken, + EncryptionManager encryptionManager, + ConnectionPoolSettings poolSettings, + ILogger logger, + Func clusterErrorHandlerCreator) : base(logger) { _authToken = authToken; _encryptionManager = encryptionManager; _poolSettings = poolSettings; + _clusterErrorHandlerCreator = clusterErrorHandlerCreator; + if (seedServer != null) + { + Add(seedServer); + } } internal ClusterConnectionPool(IConnectionPool connectionPool, ConcurrentDictionary pool=null, ConnectionPoolSettings poolSettings=null, ILogger logger=null) : - this(null, encryptionManager: null, poolSettings: poolSettings, logger: logger) + this(null, null, null, poolSettings, logger, null) { _fakeConnectionPool = connectionPool; _pools = pool; @@ -53,7 +68,7 @@ internal ClusterConnectionPool(IConnectionPool connectionPool, private IConnectionPool CreateNewConnectionPool(Uri uri) { - return _fakeConnectionPool ?? new ConnectionPool(uri, _authToken, _encryptionManager, _poolSettings, Logger); + return _fakeConnectionPool ?? new ConnectionPool(uri, _authToken, _encryptionManager, _poolSettings, Logger, _clusterErrorHandlerCreator.Invoke(uri)); } public bool TryAcquire(Uri uri, out IPooledConnection conn) @@ -69,13 +84,8 @@ public bool TryAcquire(Uri uri, out IPooledConnection conn) return true; } - public bool HasAddress(Uri uri) - { - return _pools.ContainsKey(uri); - } - - // This is the only place to add a pool - public void Add(Uri uri) + // This is the ultimate method to add a pool + private void Add(Uri uri) { _pools.GetOrAdd(uri, CreateNewConnectionPool); if (_disposeCalled) @@ -86,6 +96,21 @@ public void Add(Uri uri) } } + public void Update(IEnumerable servers) + { + foreach (var uri in _pools.Keys) + { + if (!servers.Contains(uri)) + { + Purge(uri); + } + } + foreach (var uri in servers) + { + Add(uri); + } + } + public void Purge(Uri uri) { IConnectionPool toRemvoe; @@ -96,7 +121,7 @@ public void Purge(Uri uri) } } - public void Clear() + private void Clear() { var uris = _pools.Keys; foreach (var uri in uris) @@ -105,16 +130,6 @@ public void Clear() } } - public void Release(Uri uri, Guid id) - { - IConnectionPool pool; - var found = _pools.TryGetValue(uri, out pool); - if (found) - { - pool.Release(id); - } - } - protected override void Dispose(bool isDisposing) { _disposeCalled = true; diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterDiscoveryManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterDiscoveryManager.cs index 260368e24..b1726c4cb 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterDiscoveryManager.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterDiscoveryManager.cs @@ -24,21 +24,22 @@ namespace Neo4j.Driver.Internal.Routing internal class ClusterDiscoveryManager { private readonly IPooledConnection _conn; - private ILogger logger; + private readonly ILogger _logger; public IEnumerable Readers { get; internal set; } // = new Uri[0]; public IEnumerable Writers { get; internal set; } // = new Uri[0]; public IEnumerable Routers { get; internal set; } // = new Uri[0]; private const string ProcedureName = "dbms.cluster.routing.getServers"; - public ClusterDiscoveryManager(IPooledConnection connection) + public ClusterDiscoveryManager(IPooledConnection connection, ILogger logger) { _conn = connection; + _logger = logger; } + /// Throws if the discovery result is invalid. public void Rediscovery() { - // TODO error handling??? - using (var session = new Session(_conn, logger)) + using (var session = new Session(_conn, _logger)) { var result = session.Run($"CALL {ProcedureName}"); var record = result.Single(); @@ -48,7 +49,6 @@ public void Rediscovery() var role = servers["role"].As(); switch (role) { - // TODO test 0 size array case "READ": Readers = addresses.Select(address => new Uri(address)).ToArray(); break; @@ -61,6 +61,18 @@ public void Rediscovery() } } } + if (!Readers.Any() || !Writers.Any() || !Routers.Any()) + { + throw new InvalidDiscoveryException( + $"Invalid discovery result: discovered {Routers.Count()} routers, " + + $"{Writers.Count()} writers and {Readers.Count()} readers. A Redisvoery is required."); + } } } + + internal class InvalidDiscoveryException : Exception + { + public InvalidDiscoveryException(string message) : base(message) + {} + } } \ No newline at end of file diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ConcurrentRoundRobinSet.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ConcurrentRoundRobinSet.cs index be302edd2..83c5753c8 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ConcurrentRoundRobinSet.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ConcurrentRoundRobinSet.cs @@ -65,34 +65,6 @@ public void Add(IEnumerable items) } } -// public void Clear() -// { -// lock (_syncLock) -// { -// var count = _queue.Count; -// for (var i = 0; i < count; i++) -// { -// T ignore; -// _queue.TryDequeue(out ignore); -// } -// } -// } -// -// /// -// /// Remove all items from the set and add the given items into the set -// /// -// /// The new items to add in this set -// public void Update(IEnumerable items) -// { -// lock (_syncLock) -// { -// // Clear -// Clear(); -// // Add -// Add(items); -// } -// } - /// /// Remove one item from this set /// @@ -119,17 +91,7 @@ public void Remove(T item) } } - public T Hop() - { - T value; - if (!TryHop(out value)) - { - throw new InvalidOperationException("No item in set"); - } - return value; - } - - public bool TryHop(out T value) + public bool TryNext(out T value) { lock (_syncLock) { diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs index 8e3e60d56..c3589d30e 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPool.cs @@ -16,22 +16,17 @@ // limitations under the License. using System; +using System.Collections.Generic; namespace Neo4j.Driver.Internal.Routing { - internal interface IClusterConnectionPool + internal interface IClusterConnectionPool : IDisposable { // Try to acquire a connection with the server specified by the uri bool TryAcquire(Uri uri, out IPooledConnection conn); - // Release the connection back to the server connection pool specified by the uri - void Release(Uri uri, Guid id); - // Add a pool for a new uri - void Add(Uri uri); - // Remove all the connections with the server specified by the uri + // Update the pool keys with the new server uris + void Update(IEnumerable uris); + // Remove all the connection pool with the server specified by the uri void Purge(Uri uri); - // Purge all - void Clear(); - // Test if we have established connections with the server specified by the uri - bool HasAddress(Uri uri); } } \ No newline at end of file diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ILoadBalancer.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ILoadBalancer.cs index 972ed73f3..4e802d1bc 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ILoadBalancer.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ILoadBalancer.cs @@ -14,11 +14,13 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +using System; using Neo4j.Driver.V1; namespace Neo4j.Driver.Internal.Routing { - internal interface ILoadBalancer + internal interface ILoadBalancer : IDisposable { IPooledConnection AcquireConnection(AccessMode mode); } diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinClusterView.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinClusterView.cs index a306e7054..c468d51a3 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinClusterView.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinClusterView.cs @@ -52,17 +52,17 @@ public bool IsStale() public bool TryNextRouter(out Uri uri) { - return _routers.TryHop(out uri); + return _routers.TryNext(out uri); } public bool TryNextReader(out Uri uri) { - return _readers.TryHop(out uri); + return _readers.TryNext(out uri); } public bool TryNextWriter(out Uri uri) { - return _writers.TryHop(out uri); + return _writers.TryNext(out uri); } public void Remove(Uri uri) diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinLoadBalancer.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinLoadBalancer.cs index a6c88f9dd..5b800c41b 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinLoadBalancer.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinLoadBalancer.cs @@ -16,6 +16,7 @@ // limitations under the License. using System; +using Neo4j.Driver.Internal.Connector; using Neo4j.Driver.V1; namespace Neo4j.Driver.Internal.Routing @@ -23,7 +24,9 @@ namespace Neo4j.Driver.Internal.Routing internal class RoundRobinLoadBalancer : ILoadBalancer { private RoundRobinClusterView _clusterView; - private readonly IClusterConnectionPool _connectionPool; + private IClusterConnectionPool _connectionPool; + private ILogger _logger; + private readonly object _syncLock = new object(); public RoundRobinLoadBalancer( Uri seedServer, @@ -32,14 +35,14 @@ public RoundRobinLoadBalancer( ConnectionPoolSettings poolSettings, ILogger logger) { - _connectionPool = new ClusterConnectionPool(authToken, encryptionManager, poolSettings, logger); - _connectionPool.Add(seedServer); + _connectionPool = new ClusterConnectionPool(seedServer, authToken, encryptionManager, poolSettings, logger, CreateClusterPooledConnectionErrorHandler); _clusterView = new RoundRobinClusterView(seedServer); + _logger = logger; } public IPooledConnection AcquireConnection(AccessMode mode) { - Discovery(); + EnsureDiscovery(); switch (mode) { case AccessMode.Read: @@ -70,9 +73,10 @@ private IPooledConnection AcquireReadConnection() return conn; } } - catch (ConnectionFailureException) + catch (SessionExpiredException) { - Forget(uri); + // ignored + // Already handled by connectionpool error handler to remove from load balancer } } throw new SessionExpiredException("Failed to connect to any read server."); @@ -96,9 +100,10 @@ private IPooledConnection AcquireWriteConnection() return conn; } } - catch (ConnectionFailureException) + catch (SessionExpiredException) { - Forget(uri); + // ignored + // Already handled by connectionpool error handler to remove from load balancer } } throw new SessionExpiredException("Failed to connect to any write server."); @@ -110,29 +115,19 @@ public void Forget(Uri uri) _connectionPool.Purge(uri); } - // TODO: Should sync on this method - public void Discovery() + public void EnsureDiscovery() { - if (!_clusterView.IsStale()) + lock (_syncLock) { - return; - } - - var oldServers = _clusterView.All(); - var newView = NewClusterView(); - var newServers = newView.All(); + if (!_clusterView.IsStale()) + { + return; + } - oldServers.ExceptWith(newServers); - foreach (var server in oldServers) - { - _connectionPool.Purge(server); + var newView = NewClusterView(); + _connectionPool.Update(newView.All()); + _clusterView = newView; } - foreach (var server in newServers) - { - _connectionPool.Add(server); - } - - _clusterView = newView; } public RoundRobinClusterView NewClusterView() @@ -151,21 +146,102 @@ public RoundRobinClusterView NewClusterView() IPooledConnection conn; if (_connectionPool.TryAcquire(uri, out conn)) { - var discoveryManager = new ClusterDiscoveryManager(conn); + var discoveryManager = new ClusterDiscoveryManager(conn, _logger); discoveryManager.Rediscovery(); - return new RoundRobinClusterView(discoveryManager.Routers, discoveryManager.Readers, discoveryManager.Writers); + return new RoundRobinClusterView(discoveryManager.Routers, discoveryManager.Readers, + discoveryManager.Writers); } } - catch (ConnectionFailureException) + catch (SessionExpiredException) + { + // ignored + // Already handled by connection pool error handler to remove from load balancer + } + catch (InvalidDiscoveryException) { - Forget(uri); + _clusterView.Remove(uri); } } // TODO also try each detached routers - throw new SessionExpiredException( + // We retied and tried our best however there is just no cluster. + // This is the ultimate place we will inform the user that you need to re-create a driver + throw new ServerUnavailableException( "Failed to connect to any routing server. " + "Please make sure that the cluster is up and can be accessed by the driver and retry."); } + + private Exception OnConnectionError(Exception e, Uri uri) + { + Forget(uri); + return new SessionExpiredException($"Server at {uri} is no longer available", e); + } + + private Neo4jException OnNeo4jError(Neo4jException error, Uri uri) + { + if (error.Code.Equals("Neo.ClientError.Cluster.NotALeader")) + { + // The lead is no longer a leader, a.k.a. the write server no longer accepts writes + // However the server is still available for possible reads. + // Therefore we just remove it from ClusterView but keep it in connection pool. + _clusterView.Remove(uri); + return new SessionExpiredException($"Server at {uri} no longer accepts writes"); + } + else if (error.Code.Equals("Neo.ClientError.General.ForbiddenOnReadOnlyDatabase")) + { + // The user was trying to run a write in a read session + // So inform the user and let him try with a proper session mode + return new ClientException("Write queries cannot be performed in READ access mode."); + } + return error; + } + + private ClusterPooledConnectionErrorHandler CreateClusterPooledConnectionErrorHandler(Uri uri) + { + return new ClusterPooledConnectionErrorHandler(x => OnConnectionError(x, uri), x => OnNeo4jError(x, uri)); + } + + private class ClusterPooledConnectionErrorHandler : IConnectionErrorHandler + { + private Func _onConnectionErrorFunc; + private readonly Func _onNeo4jErrorFunc; + + public ClusterPooledConnectionErrorHandler(Func onConnectionErrorFuncFunc, Func onNeo4JErrorFuncFunc) + { + _onConnectionErrorFunc = onConnectionErrorFuncFunc; + _onNeo4jErrorFunc = onNeo4JErrorFuncFunc; + } + + public Exception OnConnectionError(Exception e) + { + return _onConnectionErrorFunc.Invoke(e); + } + + public Neo4jException OnNeo4jError(Neo4jException e) + { + return _onNeo4jErrorFunc.Invoke(e); + } + } + + protected virtual void Dispose(bool isDisposing) + { + if (!isDisposing) + return; + + _clusterView = null; + + if (_connectionPool != null) + { + _connectionPool.Dispose(); + _connectionPool = null; + } + _logger = null; + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } } } \ No newline at end of file diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingDriver.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingDriver.cs index fe4833165..c0bdb4320 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingDriver.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingDriver.cs @@ -16,14 +16,15 @@ // limitations under the License. using System; -using Neo4j.Driver.Internal.Routing; using Neo4j.Driver.V1; -namespace Neo4j.Driver.Internal +namespace Neo4j.Driver.Internal.Routing { + /// + /// A driver with a simple load balancer to route to a cluster + /// internal class RoutingDriver : IDriver { - private ILogger _logger; private ILoadBalancer _loadBalancer; @@ -34,14 +35,33 @@ internal RoutingDriver( ConnectionPoolSettings poolSettings, ILogger logger) { + Throw.ArgumentNullException.IfNull(seedServer, nameof(seedServer)); + Throw.ArgumentNullException.IfNull(authToken, nameof(authToken)); + Throw.ArgumentNullException.IfNull(encryptionManager, nameof(encryptionManager)); + Throw.ArgumentNullException.IfNull(poolSettings, nameof(poolSettings)); + Uri = seedServer; _logger = logger; + _loadBalancer = new RoundRobinLoadBalancer(seedServer, authToken, encryptionManager, poolSettings, _logger); + } + protected virtual void Dispose(bool isDisposing) + { + if (!isDisposing) + return; + + if (_loadBalancer != null) + { + _loadBalancer.Dispose(); + _loadBalancer = null; + } + _logger = null; } public void Dispose() { - throw new NotImplementedException(); + Dispose(true); + GC.SuppressFinalize(this); } public Uri Uri { get; } @@ -52,10 +72,7 @@ public ISession Session() public ISession Session(AccessMode mode) { - IPooledConnection connection = _loadBalancer.AcquireConnection(mode); - - throw new NotImplementedException(); - //return new RoutingSession(connection, mode, ) + return new Session(_loadBalancer.AcquireConnection(mode), _logger); } } } \ No newline at end of file diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingSession.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingSession.cs deleted file mode 100644 index e1e15d547..000000000 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingSession.cs +++ /dev/null @@ -1,48 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Neo4j.Driver.V1; - -namespace Neo4j.Driver.Internal.Routing -{ - internal class RoutingSession : ISession - { - private ISession _delegate; - public void Dispose() - { - _delegate.Dispose(); - } - - public IStatementResult Run(string statement, IDictionary parameters = null) - { - return _delegate.Run(statement, parameters); - } - - public IStatementResult Run(Statement statement) - { - return _delegate.Run(statement); - } - - public IStatementResult Run(string statement, object parameters) - { - return _delegate.Run(statement, parameters); - } - - public ITransaction BeginTransaction() - { - return _delegate.BeginTransaction(); - } - - public void Reset() - { - _delegate.Reset(); - } - - public string Server() - { - return _delegate.Server(); - } - } -} diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Session.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Session.cs index ade8ef6d0..1a679ac8c 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Session.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Session.cs @@ -66,30 +66,38 @@ protected override void Dispose(bool isDisposing) { throw new InvalidOperationException("Failed to dispose this seesion as it has already been disposed."); } - lock (_txSyncLock) + if (!_connection.IsOpen) + { + // can not sync any data on this connection + _connection.Dispose(); + } + else { - if (_transaction != null) + lock (_txSyncLock) { + if (_transaction != null) + { + try + { + _transaction.Dispose(); + } + catch + { + // Best-effort + } + } try { - _transaction.Dispose(); + _connection.Sync(); } - catch + finally { - // Best-effort + _connection.Dispose(); } } - try - { - _connection.Sync(); - } - finally - { - _connection.Dispose(); - } } + }); - base.Dispose(true); } @@ -140,7 +148,7 @@ private void EnsureSessionIsOpen() private void EnsureConnectionIsHealthy() { - if (!_connection.IsHealthy) + if (!_connection.IsOpen) { throw new ClientException("The current session cannot be reused as the underlying connection with the " + "server has been closed or is going to be closed due to unrecoverable errors. " + @@ -159,8 +167,6 @@ private void EnsureNoOpenTransaction() public Guid Id { get; } = Guid.NewGuid(); - public bool IsHealthy => _connection.IsHealthy; - public void Reset() { EnsureSessionIsOpen(); diff --git a/Neo4j.Driver/Neo4j.Driver/Neo4j.Driver.csproj b/Neo4j.Driver/Neo4j.Driver/Neo4j.Driver.csproj index fa07e8859..c154b23a2 100644 --- a/Neo4j.Driver/Neo4j.Driver/Neo4j.Driver.csproj +++ b/Neo4j.Driver/Neo4j.Driver/Neo4j.Driver.csproj @@ -41,6 +41,8 @@ bin\Release\Neo4j.Driver.xml + + @@ -55,7 +57,7 @@ - + diff --git a/Neo4j.Driver/Neo4j.Driver/V1/GraphDatabase.cs b/Neo4j.Driver/Neo4j.Driver/V1/GraphDatabase.cs index 21abf3df8..885024d2d 100644 --- a/Neo4j.Driver/Neo4j.Driver/V1/GraphDatabase.cs +++ b/Neo4j.Driver/Neo4j.Driver/V1/GraphDatabase.cs @@ -17,6 +17,7 @@ using System; using Neo4j.Driver.Internal; +using Neo4j.Driver.Internal.Routing; namespace Neo4j.Driver.V1 { @@ -102,6 +103,13 @@ public static IDriver Driver(Uri uri, IAuthToken authToken, Config config = null config = config ?? Config.DefaultConfig; var encryptionManager = new EncryptionManager(config.EncryptionLevel, config.TrustStrategy, config.Logger); var connectionPoolSettings = new ConnectionPoolSettings(config.MaxIdleSessionPoolSize); + + if (uri.Port == -1) + { + var builder = new UriBuilder(uri.Scheme, uri.Host, 7687); + uri = builder.Uri; + } + switch (uri.Scheme.ToLower()) { case "bolt": @@ -109,7 +117,7 @@ public static IDriver Driver(Uri uri, IAuthToken authToken, Config config = null case "bolt+routing": return new RoutingDriver(uri, authToken, encryptionManager, connectionPoolSettings, config.Logger); default: - throw new InvalidOperationException($"Unsupported URI scheme: {uri.Scheme}"); + throw new NotSupportedException($"Unsupported URI scheme: {uri.Scheme}"); } } } diff --git a/Neo4j.Driver/Neo4j.Driver/V1/IDriver.cs b/Neo4j.Driver/Neo4j.Driver/V1/IDriver.cs index f3e88b220..d6ad33e9d 100644 --- a/Neo4j.Driver/Neo4j.Driver/V1/IDriver.cs +++ b/Neo4j.Driver/Neo4j.Driver/V1/IDriver.cs @@ -21,7 +21,7 @@ namespace Neo4j.Driver.V1 { /// /// The instance maintains the connections with a Neo4j database, providing an access point via the - /// method. + /// method. /// /// /// The Driver maintains a session pool buffering the s created by the user. diff --git a/Neo4j.Driver/Neo4j.Driver/V1/Neo4jException.cs b/Neo4j.Driver/Neo4j.Driver/V1/Neo4jException.cs index bed61b2f9..0e6125618 100644 --- a/Neo4j.Driver/Neo4j.Driver/V1/Neo4jException.cs +++ b/Neo4j.Driver/Neo4j.Driver/V1/Neo4jException.cs @@ -130,13 +130,13 @@ public DatabaseException(string code, string message, Exception innerException) } [DataContract] - public class ConnectionFailureException : Neo4jException + public class ServerUnavailableException : Neo4jException { - public ConnectionFailureException(string message) : base(message) + public ServerUnavailableException(string message) : base(message) { } - public ConnectionFailureException(string message, Exception innerException) : base(message, innerException) + public ServerUnavailableException(string message, Exception innerException) : base(message, innerException) { } }