Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

attempt to fix pull flappers #773

Merged
merged 3 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Benchmarks/JsMulti/Settings/Context.cs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ private static int Choice(string name, string val, params string[] choices) {
}
}

Error("Invalid choice for " + name + " [" + lower + "]. Must be one of [" + String.Join(", ", choices) + "]");
Error("Invalid choice for " + name + " [" + lower + "]. Must be one of [" + string.Join(", ", choices) + "]");
return -1;
}
}
Expand Down
42 changes: 26 additions & 16 deletions src/NATS.Client/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace NATS.Client
/// <summary>
/// Provides factory methods to create connections to NATS Servers.
/// </summary>
// ReSharper disable once ClassNeverInstantiated.Global
public sealed class ConnectionFactory : IConnectionFactory
{
/// <summary>
Expand All @@ -43,12 +44,13 @@ public ConnectionFactory() { }
/// details.</para></exception>
public IConnection CreateConnection(string url)
{
Options opts = new Options();
opts.processUrlString(url);
Options opts = new Options
{
Url = url
};
return CreateConnection(opts);
}



/// <summary>
/// Attempt to connect to the NATS server referenced by <paramref name="url"/> with NATS 2.0 credentials.
/// </summary>
Expand All @@ -68,10 +70,12 @@ public IConnection CreateConnection(string url)
public IConnection CreateConnection(string url, string credentialsPath)
{
if (string.IsNullOrWhiteSpace(credentialsPath))
throw new ArgumentException("Invalid credentials path", "credentials");
throw new ArgumentException("Invalid credentials path", nameof(credentialsPath));

Options opts = new Options();
opts.processUrlString(url);
Options opts = new Options
{
Url = url
};
opts.SetUserCredentials(credentialsPath);
return CreateConnection(opts);
}
Expand All @@ -96,12 +100,14 @@ public IConnection CreateConnection(string url, string credentialsPath)
public IConnection CreateConnection(string url, string jwt, string privateNkey)
{
if (string.IsNullOrWhiteSpace(jwt))
throw new ArgumentException("Invalid jwt path", "jwt");
throw new ArgumentException("Invalid jwt path", nameof(jwt));
if (string.IsNullOrWhiteSpace(privateNkey))
throw new ArgumentException("Invalid nkey path", "privateNkey");
throw new ArgumentException("Invalid nkey path", nameof(privateNkey));

Options opts = new Options();
opts.processUrlString(url);
Options opts = new Options
{
Url = url
};
opts.SetUserCredentials(jwt, privateNkey);
return CreateConnection(opts);
}
Expand Down Expand Up @@ -132,9 +138,11 @@ public static Options GetDefaultOptions()
/// details.</para></exception>
public IConnection CreateSecureConnection(string url)
{
Options opts = new Options();
opts.processUrlString(url);
opts.Secure = true;
Options opts = new Options
{
Url = url,
Secure = true
};
return CreateConnection(opts);
}

Expand Down Expand Up @@ -210,8 +218,10 @@ public IEncodedConnection CreateEncodedConnection()
/// details.</para></exception>
public IEncodedConnection CreateEncodedConnection(string url)
{
Options opts = new Options();
opts.processUrlString(url);
Options opts = new Options
{
Url = url
};
return CreateEncodedConnection(opts);
}

Expand Down
50 changes: 27 additions & 23 deletions src/NATS.Client/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,7 @@ internal Options(Options o)
subscriptionBatchSize = o.subscriptionBatchSize;
customInboxPrefix = o.customInboxPrefix;

if (o.url != null)
{
processUrlString(o.url);
}

url = o.url;
if (o.servers != null)
{
servers = new string[o.servers.Length];
Expand Down Expand Up @@ -342,37 +338,32 @@ static string ensureProperUrl(string url)

var parts = url.Split(protcolSep, StringSplitOptions.RemoveEmptyEntries);
if (parts.Length == 1)
return $"nats://{url}";
return $"nats://{url.Trim()}";

throw new ArgumentException("Allowed protocols are: 'nats://, tls://'.");
}

internal void processUrlString(string url)
{
if (url == null)
return;

string[] urls = url.Split(',');
for (int i = 0; i < urls.Length; i++)
{
urls[i] = urls[i].Trim();
}

servers = urls;
}

/// <summary>
/// Gets or sets the url used to connect to the NATs server.
/// Can be a comma delimited list, it will be turning into Servers
/// </summary>
/// <remarks>
/// This may contain username/password information.
/// </remarks>
public string Url
{
get { return url; }
get => url;
set
{
url = ensureProperUrl(value);
if (value == null)
{
url = null;
servers = null;
}
else
{
Servers = value.Split(',');
}
}
}

Expand All @@ -387,7 +378,20 @@ public string[] Servers
get { return servers; }
set
{
servers = value?.Select(ensureProperUrl).ToArray();
if (value == null || value.Length == 0)
{
servers = null;
url = null;
}
else
{
servers = new string[value.Length];
for (int i = 0; i < value.Length; i++)
{
servers[i] = ensureProperUrl(value[i]);
}
url = string.Join(",", servers);
}
}
}

Expand Down
5 changes: 0 additions & 5 deletions src/NATS.Client/ServerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ public void Setup(Options opts)
}
}

if (!string.IsNullOrWhiteSpace(opts.Url))
{
Add(opts.Url, false);
}

// Place default URL if pool is empty.
if (IsEmpty())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ public static void Main(string[] args)
// 3. get a list of all consumers
Console.WriteLine("\n----------\n3. getConsumerNames");
IList<string> consumerNames = jsm.GetConsumerNames(helper.Stream);
Console.WriteLine("Consumer Names: " + String.Join(",", consumerNames));
Console.WriteLine("Consumer Names: " + string.Join(",", consumerNames));

// 4. Delete a consumer, then list them again
// Subsequent calls to deleteStream will throw a
// NATSJetStreamException [10014]
Console.WriteLine("\n----------\n4. Delete a consumer");
jsm.DeleteConsumer(helper.Stream, durable1);
consumerNames = jsm.GetConsumerNames(helper.Stream);
Console.WriteLine("Consumer Names: " + String.Join(",", consumerNames));
Console.WriteLine("Consumer Names: " + string.Join(",", consumerNames));

// 5. Try to delete the consumer again and get the exception
Console.WriteLine("\n----------\n5. Delete consumer again");
Expand Down
2 changes: 1 addition & 1 deletion src/Samples/JetStreamStarter/JetStreamStarter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ static void Main(string[] args)
{
Console.WriteLine("Connection closed.");
};

Console.WriteLine($"Connecting to '{opts.Url}'");

using (IConnection c = new ConnectionFactory().CreateConnection(opts))
Expand Down
46 changes: 16 additions & 30 deletions src/Tests/IntegrationTests/TestBasic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
using System.Threading;
using System.Threading.Tasks;
using NATS.Client;
using UnitTests;
using Xunit;
using Xunit.Abstractions;

namespace IntegrationTests
{
Expand All @@ -28,7 +30,13 @@ namespace IntegrationTests
[Collection(DefaultSuiteContext.CollectionKey)]
public class TestBasic : TestSuite<DefaultSuiteContext>
{
public TestBasic(DefaultSuiteContext context) : base(context) { }
private readonly ITestOutputHelper output;

public TestBasic(ITestOutputHelper output, DefaultSuiteContext context) : base(context)
{
this.output = output;
Console.SetOut(new TestBase.ConsoleWriter(output));
}

[Fact]
public void TestConnectedServer()
Expand Down Expand Up @@ -1589,37 +1597,17 @@ public void TestAsyncSubHandlerAPI()
[Fact]
public void TestUrlArgument()
{
string url1 = Context.DefaultServer.Url;
string url1 = Context.Server2.Url;
string url2 = Context.Server3.Url;
string url3 = Context.Server4.Url;

string urls = url1 + "," + url2 + "," + url3;

using (NATSServer.CreateFastAndVerify())
{
using (var c = Context.ConnectionFactory.CreateConnection(urls))
{
Assert.Equal(c.Opts.Servers[0], url1);
Assert.Equal(c.Opts.Servers[1], url2);
Assert.Equal(c.Opts.Servers[2], url3);

c.Close();
}

urls = url1 + " , " + url2 + "," + url3;
using (var c = Context.ConnectionFactory.CreateConnection(urls))
{
Assert.Equal(c.Opts.Servers[0], url1);
Assert.Equal(c.Opts.Servers[1], url2);
Assert.Equal(c.Opts.Servers[2], url3);
c.Close();
}

using (var c = Context.ConnectionFactory.CreateConnection(url1))
{
c.Close();
}
}
Options opts = ConnectionFactory.GetDefaultOptions();
opts.Url = urls;
Assert.Equal(opts.Servers[0], url1);
Assert.Equal(opts.Servers[1], url2);
Assert.Equal(opts.Servers[2], url3);
}

private bool assureClusterFormed(IConnection c, int serverCount)
Expand Down Expand Up @@ -1912,7 +1900,6 @@ public void TestSimpleUrlArgument()
using (Context.ConnectionFactory.CreateConnection(o)) { }

// servers with a simple hostname
o.Url = null;
o.Servers = new string[] { "127.0.0.1" };
using (var cn = Context.ConnectionFactory.CreateConnection(o))
cn.Close();
Expand All @@ -1926,8 +1913,7 @@ public void TestSimpleUrlArgument()
using (Context.ConnectionFactory.CreateConnection(o)) { }

// servers with multiple hosts
o.Url = null;
o.Servers = new string[] { "127.0.0.1", "localhost" };
o.Servers = new [] { "127.0.0.1", "localhost" };
using (var cn = Context.ConnectionFactory.CreateConnection(o))
cn.Close();
}
Expand Down
33 changes: 33 additions & 0 deletions src/Tests/IntegrationTests/TestEventHandler.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using NATS.Client;

namespace IntegrationTests
Expand Down Expand Up @@ -32,5 +33,37 @@ public EventHandler<StatusEventArgs> PullStatusErrorEventHandler
options.PullStatusWarningEventHandler = PullStatusWarningEventHandler;
options.PullStatusErrorEventHandler = PullStatusErrorEventHandler;
};

public bool PullStatusWarningOrWait(String contains, long timeout)
{
Stopwatch sw = Stopwatch.StartNew();
int i = 0;
do {
int count = PullStatusWarningEvents.Count;
for (; i < count; i++) {
if (PullStatusWarningEvents[i].Status.Message.Contains(contains)) {
return true;
}
}
}
while (sw.ElapsedMilliseconds <= timeout);
return false;
}

public bool PullStatusErrorOrWait(String contains, long timeout)
{
Stopwatch sw = Stopwatch.StartNew();
int i = 0;
do {
int count = PullStatusErrorEvents.Count;
for (; i < count; i++) {
if (PullStatusErrorEvents[i].Status.Message.Contains(contains)) {
return true;
}
}
}
while (sw.ElapsedMilliseconds <= timeout);
return false;
}
}
}
Loading