Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Randomize implicit servers #376

Merged
merged 2 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 33 additions & 21 deletions src/NATS.Client/ServerPool.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2018 The NATS Authors
// Copyright 2016-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -19,16 +19,18 @@ namespace NATS.Client
{
internal sealed class ServerPool
{
private object poolLock = new object();
private LinkedList<Srv> sList = new LinkedList<Srv>();
private readonly object poolLock = new object();
private readonly LinkedList<Srv> sList = new LinkedList<Srv>();
private Srv currentServer = null;
private readonly Random rand = new Random(DateTime.Now.Millisecond);
private bool randomize = true;

// Used to find duplicates in the server pool.
// Loopback is equivalent to localhost, and
// a URL match is equivalent.
private class SrvEqualityComparer : IEqualityComparer<Srv>
{
private bool isLocal(Uri url)
private bool IsLocal(Uri url)
{
if (url.IsLoopback)
return true;
Expand All @@ -50,7 +52,7 @@ public bool Equals(Srv x, Srv y)
if (x.url.Equals(y.url))
return true;

if (isLocal(x.url) && isLocal(y.url) && (y.url.Port == x.url.Port))
if (IsLocal(x.url) && IsLocal(y.url) && (y.url.Port == x.url.Port))
return true;

return false;
Expand All @@ -74,16 +76,17 @@ internal void Setup(Options opts)
{
Add(opts.Servers, false);

if (!opts.NoRandomize)
shuffle();
randomize = !opts.NoRandomize;
if (randomize)
Shuffle();
}

if (!string.IsNullOrWhiteSpace(opts.Url))
add(opts.Url, false);
Add(opts.Url, false);

// Place default URL if pool is empty.
if (isEmpty())
add(Defaults.Url, false);
if (IsEmpty())
Add(Defaults.Url, false);
}

// Used for initially connecting to a server.
Expand Down Expand Up @@ -134,7 +137,7 @@ internal Srv CurrentServer
// a server was removed in the meantime, add it back.
if (sList.Contains(currentServer) == false)
{
add(currentServer);
Add(currentServer);
}
}
}
Expand Down Expand Up @@ -164,7 +167,7 @@ internal Srv SelectNextServer(int maxReconnect)
sList.AddLast(s);
}

currentServer = isEmpty() ? null : sList.First();
currentServer = IsEmpty() ? null : sList.First();

return currentServer;
}
Expand Down Expand Up @@ -200,21 +203,30 @@ internal string[] GetServerList(bool implicitOnly)

// returns true if it modified the pool, false if
// the url already exists.
private bool add(string s, bool isImplicit)
private bool Add(string s, bool isImplicit)
{
return add(new Srv(s, isImplicit));
return Add(new Srv(s, isImplicit));
}

// returns true if it modified the pool, false if
// the url already exists.
private bool add(Srv s)
private bool Add(Srv s)
{
lock (poolLock)
{
if (sList.Contains(s, duplicateSrvCheck))
return false;

sList.AddLast(s);
if (s.isImplicit && randomize)
{
// pick a random spot to add the server.
var randElem = sList.ElementAt(rand.Next(sList.Count));
sList.AddAfter(sList.Find(randElem), s);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on latest comment on the Go client, I wonder if this need to be updated. Basically in Go I was shuffling the pool after adding all new URLs (when processing INFO). But the point made was that possibly the first URL in the pool would be moved (say to position 2 (index 1)), and on reconnect that would be the very next one to be tried although we just disconnected from it. Let me know if I make sense (note the Go PR has not been updated as I type this).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per discussions in Slack, we've concluded that this achieves the same result, so I'll keep this as is. Appreciate the catch here!

}
else
{
sList.AddLast(s);
}

return true;
}
Expand Down Expand Up @@ -257,15 +269,15 @@ internal bool Add(string[] urls, bool isImplicit)
bool didAdd = false;
foreach (string s in urls)
{
didAdd |= add(s, isImplicit);
didAdd |= Add(s, isImplicit);
}

return didAdd;
}

// Convenience method to shuffle a list. The list passed
// is modified.
internal static void shuffle<T>(IList<T> list)
internal static void Shuffle<T>(IList<T> list)
{
if (list == null)
return;
Expand All @@ -285,12 +297,12 @@ internal static void shuffle<T>(IList<T> list)
}
}

private void shuffle()
private void Shuffle()
{
lock (poolLock)
{
var servers = sList.ToArray();
shuffle(servers);
Shuffle(servers);

sList.Clear();
foreach (Srv s in servers)
Expand All @@ -300,7 +312,7 @@ private void shuffle()
}
}

private bool isEmpty()
private bool IsEmpty()
{
return sList.Count == 0;
}
Expand Down
139 changes: 139 additions & 0 deletions src/Tests/UnitTests/TestServerPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Linq;
using NATS.Client;
using Xunit;

namespace UnitTests
{
public class TestServerPool
{
static readonly string[] startUrls = {
"nats://a:4222", "nats://b:4222", "nats://c:4222", "nats://d:4222",
"nats://e:4222", "nats://f:4222", "nats://g:4222", "nats://h:4222",
"nats://i:4222", "nats://j:4222", "nats://k:4222", "nats://l:4222"
};

[Fact]
public void TestDefault()
{
var sp = new ServerPool();
sp.Setup(new Options());

var poolUrls = sp.GetServerList(false);
Assert.True(poolUrls.Length == 1);
Assert.Equal(poolUrls[0], Defaults.Url);
}

[Fact]
public void TestBasicRandomization()
{
var opts = ConnectionFactory.GetDefaultOptions();
opts.Servers = startUrls;

for (int i = 0; i < 10; i++)
{
var sp = new ServerPool();
sp.Setup(opts);

var poolUrls = sp.GetServerList(false);
Assert.True(poolUrls.Length == startUrls.Length);
Assert.False(poolUrls.SequenceEqual(startUrls));
}
}

[Fact]
public void TestIdempotency()
{
var opts = ConnectionFactory.GetDefaultOptions();
opts.Servers = startUrls;

var sp = new ServerPool();
sp.Setup(opts);

var poolUrls = sp.GetServerList(false);
Assert.True(poolUrls.Length == startUrls.Length);

sp.Add(startUrls, true);
Assert.True(poolUrls.Length == startUrls.Length);
}

[Fact]
public void TestNoRandomization()
{
var opts = ConnectionFactory.GetDefaultOptions();
opts.Servers = startUrls;
opts.NoRandomize = true;

for (int i = 0; i < 10; i++)
{
var sp = new ServerPool();
sp.Setup(opts);

var poolUrls = sp.GetServerList(false);
Assert.True(poolUrls.Length == startUrls.Length);
Assert.True(poolUrls.SequenceEqual(startUrls));
}

for (int i = 0; i < 10; i++)
{
var sp = new ServerPool();
sp.Setup(opts);

var poolUrls = sp.GetServerList(false);
Assert.True(poolUrls.Length == startUrls.Length);
Assert.True(poolUrls.SequenceEqual(startUrls));

string[] impUrls = {
"nats://impA:4222", "nats://impB:4222", "nats://impC:4222", "nats://impD:4222",
"nats://impE:4222", "nats://impF:4222", "nats://impG:4222", "nats://impH:4222",
};
sp.Add(impUrls, true);
Assert.True(poolUrls.SequenceEqual(startUrls));
}
}

[Fact]
public void TestImplicitRandomization()
{
var opts = ConnectionFactory.GetDefaultOptions();
opts.Url = null;
opts.Servers = startUrls;

var sp = new ServerPool();
sp.Setup(opts);

string[] impUrls = {
"nats://impA:4222", "nats://impB:4222", "nats://impC:4222", "nats://impD:4222",
"nats://impE:4222", "nats://impF:4222", "nats://impG:4222", "nats://impH:4222",
};
sp.Add(impUrls, true);

var poolUrls = sp.GetServerList(false);

// Ensure length is OK and that we have randomized the list
Assert.True(poolUrls.Length == startUrls.Length + impUrls.Length);
Assert.False(poolUrls.SequenceEqual(startUrls));

// Ensure implicit urls aren't placed at the end of the list.
int i;
for (i = 0; i < startUrls.Length; i++)
{
if (poolUrls[i].Contains("imp"))
break;
}
Assert.True(i != startUrls.Length);
}
}
}