Skip to content

Commit

Permalink
Randomize implicit servers (#376)
Browse files Browse the repository at this point in the history
* randomize implicit servers

Signed-off-by: Colin Sullivan <[email protected]>
  • Loading branch information
ColinSullivan1 authored May 15, 2020
1 parent 19c6eff commit 6742c53
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 21 deletions.
54 changes: 33 additions & 21 deletions src/NATS.Client/ServerPool.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2018 The NATS Authors
// Copyright 2016-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -19,16 +19,18 @@ namespace NATS.Client
{
internal sealed class ServerPool
{
private object poolLock = new object();
private LinkedList<Srv> sList = new LinkedList<Srv>();
private readonly object poolLock = new object();
private readonly LinkedList<Srv> sList = new LinkedList<Srv>();
private Srv currentServer = null;
private readonly Random rand = new Random(DateTime.Now.Millisecond);
private bool randomize = true;

// Used to find duplicates in the server pool.
// Loopback is equivalent to localhost, and
// a URL match is equivalent.
private class SrvEqualityComparer : IEqualityComparer<Srv>
{
private bool isLocal(Uri url)
private bool IsLocal(Uri url)
{
if (url.IsLoopback)
return true;
Expand All @@ -50,7 +52,7 @@ public bool Equals(Srv x, Srv y)
if (x.url.Equals(y.url))
return true;

if (isLocal(x.url) && isLocal(y.url) && (y.url.Port == x.url.Port))
if (IsLocal(x.url) && IsLocal(y.url) && (y.url.Port == x.url.Port))
return true;

return false;
Expand All @@ -74,16 +76,17 @@ internal void Setup(Options opts)
{
Add(opts.Servers, false);

if (!opts.NoRandomize)
shuffle();
randomize = !opts.NoRandomize;
if (randomize)
Shuffle();
}

if (!string.IsNullOrWhiteSpace(opts.Url))
add(opts.Url, false);
Add(opts.Url, false);

// Place default URL if pool is empty.
if (isEmpty())
add(Defaults.Url, false);
if (IsEmpty())
Add(Defaults.Url, false);
}

// Used for initially connecting to a server.
Expand Down Expand Up @@ -134,7 +137,7 @@ internal Srv CurrentServer
// a server was removed in the meantime, add it back.
if (sList.Contains(currentServer) == false)
{
add(currentServer);
Add(currentServer);
}
}
}
Expand Down Expand Up @@ -164,7 +167,7 @@ internal Srv SelectNextServer(int maxReconnect)
sList.AddLast(s);
}

currentServer = isEmpty() ? null : sList.First();
currentServer = IsEmpty() ? null : sList.First();

return currentServer;
}
Expand Down Expand Up @@ -200,21 +203,30 @@ internal string[] GetServerList(bool implicitOnly)

// returns true if it modified the pool, false if
// the url already exists.
private bool add(string s, bool isImplicit)
private bool Add(string s, bool isImplicit)
{
return add(new Srv(s, isImplicit));
return Add(new Srv(s, isImplicit));
}

// returns true if it modified the pool, false if
// the url already exists.
private bool add(Srv s)
private bool Add(Srv s)
{
lock (poolLock)
{
if (sList.Contains(s, duplicateSrvCheck))
return false;

sList.AddLast(s);
if (s.isImplicit && randomize)
{
// pick a random spot to add the server.
var randElem = sList.ElementAt(rand.Next(sList.Count));
sList.AddAfter(sList.Find(randElem), s);
}
else
{
sList.AddLast(s);
}

return true;
}
Expand Down Expand Up @@ -257,15 +269,15 @@ internal bool Add(string[] urls, bool isImplicit)
bool didAdd = false;
foreach (string s in urls)
{
didAdd |= add(s, isImplicit);
didAdd |= Add(s, isImplicit);
}

return didAdd;
}

// Convenience method to shuffle a list. The list passed
// is modified.
internal static void shuffle<T>(IList<T> list)
internal static void Shuffle<T>(IList<T> list)
{
if (list == null)
return;
Expand All @@ -285,12 +297,12 @@ internal static void shuffle<T>(IList<T> list)
}
}

private void shuffle()
private void Shuffle()
{
lock (poolLock)
{
var servers = sList.ToArray();
shuffle(servers);
Shuffle(servers);

sList.Clear();
foreach (Srv s in servers)
Expand All @@ -300,7 +312,7 @@ private void shuffle()
}
}

private bool isEmpty()
private bool IsEmpty()
{
return sList.Count == 0;
}
Expand Down
139 changes: 139 additions & 0 deletions src/Tests/UnitTests/TestServerPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Linq;
using NATS.Client;
using Xunit;

namespace UnitTests
{
public class TestServerPool
{
static readonly string[] startUrls = {
"nats://a:4222", "nats://b:4222", "nats://c:4222", "nats://d:4222",
"nats://e:4222", "nats://f:4222", "nats://g:4222", "nats://h:4222",
"nats://i:4222", "nats://j:4222", "nats://k:4222", "nats://l:4222"
};

[Fact]
public void TestDefault()
{
var sp = new ServerPool();
sp.Setup(new Options());

var poolUrls = sp.GetServerList(false);
Assert.True(poolUrls.Length == 1);
Assert.Equal(poolUrls[0], Defaults.Url);
}

[Fact]
public void TestBasicRandomization()
{
var opts = ConnectionFactory.GetDefaultOptions();
opts.Servers = startUrls;

for (int i = 0; i < 10; i++)
{
var sp = new ServerPool();
sp.Setup(opts);

var poolUrls = sp.GetServerList(false);
Assert.True(poolUrls.Length == startUrls.Length);
Assert.False(poolUrls.SequenceEqual(startUrls));
}
}

[Fact]
public void TestIdempotency()
{
var opts = ConnectionFactory.GetDefaultOptions();
opts.Servers = startUrls;

var sp = new ServerPool();
sp.Setup(opts);

var poolUrls = sp.GetServerList(false);
Assert.True(poolUrls.Length == startUrls.Length);

sp.Add(startUrls, true);
Assert.True(poolUrls.Length == startUrls.Length);
}

[Fact]
public void TestNoRandomization()
{
var opts = ConnectionFactory.GetDefaultOptions();
opts.Servers = startUrls;
opts.NoRandomize = true;

for (int i = 0; i < 10; i++)
{
var sp = new ServerPool();
sp.Setup(opts);

var poolUrls = sp.GetServerList(false);
Assert.True(poolUrls.Length == startUrls.Length);
Assert.True(poolUrls.SequenceEqual(startUrls));
}

for (int i = 0; i < 10; i++)
{
var sp = new ServerPool();
sp.Setup(opts);

var poolUrls = sp.GetServerList(false);
Assert.True(poolUrls.Length == startUrls.Length);
Assert.True(poolUrls.SequenceEqual(startUrls));

string[] impUrls = {
"nats://impA:4222", "nats://impB:4222", "nats://impC:4222", "nats://impD:4222",
"nats://impE:4222", "nats://impF:4222", "nats://impG:4222", "nats://impH:4222",
};
sp.Add(impUrls, true);
Assert.True(poolUrls.SequenceEqual(startUrls));
}
}

[Fact]
public void TestImplicitRandomization()
{
var opts = ConnectionFactory.GetDefaultOptions();
opts.Url = null;
opts.Servers = startUrls;

var sp = new ServerPool();
sp.Setup(opts);

string[] impUrls = {
"nats://impA:4222", "nats://impB:4222", "nats://impC:4222", "nats://impD:4222",
"nats://impE:4222", "nats://impF:4222", "nats://impG:4222", "nats://impH:4222",
};
sp.Add(impUrls, true);

var poolUrls = sp.GetServerList(false);

// Ensure length is OK and that we have randomized the list
Assert.True(poolUrls.Length == startUrls.Length + impUrls.Length);
Assert.False(poolUrls.SequenceEqual(startUrls));

// Ensure implicit urls aren't placed at the end of the list.
int i;
for (i = 0; i < startUrls.Length; i++)
{
if (poolUrls[i].Contains("imp"))
break;
}
Assert.True(i != startUrls.Length);
}
}
}

0 comments on commit 6742c53

Please sign in to comment.