Skip to content

Commit

Permalink
Added and updated Samples (#800)
Browse files Browse the repository at this point in the history
* Added JetStreamPushSubscribeAsyncQueueDurable and updated JetStreamPushSubscribeQueueDurable

* Added JetStreamPushSubscribeAsyncQueueDurable and updated JetStreamPushSubscribeQueueDurable
  • Loading branch information
scottf authored Jul 14, 2023
1 parent 5f64a92 commit 06cdeab
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 21 deletions.
7 changes: 7 additions & 0 deletions src/NATS.sln
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimplificationNext", "Sampl
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimplificationContext", "Samples\SimplificationContext\SimplificationContext.csproj", "{C7FB00D4-23F1-4F6A-A8C0-E58346F272DE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JetStreamPushSubscribeAsyncQueueDurable", "Samples\JetStreamPushSubscribeAsyncQueueDurable\JetStreamPushSubscribeAsyncQueueDurable.csproj", "{5DCD0666-5AC9-462F-99BD-5E8E95E4B749}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -311,6 +313,10 @@ Global
{C7FB00D4-23F1-4F6A-A8C0-E58346F272DE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C7FB00D4-23F1-4F6A-A8C0-E58346F272DE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C7FB00D4-23F1-4F6A-A8C0-E58346F272DE}.Release|Any CPU.Build.0 = Release|Any CPU
{5DCD0666-5AC9-462F-99BD-5E8E95E4B749}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5DCD0666-5AC9-462F-99BD-5E8E95E4B749}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5DCD0666-5AC9-462F-99BD-5E8E95E4B749}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5DCD0666-5AC9-462F-99BD-5E8E95E4B749}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -363,6 +369,7 @@ Global
{77526973-BA44-412F-BDAF-BF86BDE58F5F} = {776C2E80-958B-4C0D-BCC4-67D39DB4570B}
{78F922E7-1238-4DE8-9BCF-6D09B069F810} = {776C2E80-958B-4C0D-BCC4-67D39DB4570B}
{C7FB00D4-23F1-4F6A-A8C0-E58346F272DE} = {776C2E80-958B-4C0D-BCC4-67D39DB4570B}
{5DCD0666-5AC9-462F-99BD-5E8E95E4B749} = {776C2E80-958B-4C0D-BCC4-67D39DB4570B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {0972F03C-E4DF-483A-B767-9216F4434F11}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using NATS.Client;
using NATS.Client.JetStream;

namespace NATSExamples
{
internal static class JetStreamPushSubscribeAsyncQueueDurable
{
private const string Usage =
"Usage: JetStreamPushSubscribeAsyncQueueDurable [-url url] [-creds file] [-stream stream] " +
"[-subject subject] [-queue queue] [-durable durable] [-deliver deliverSubject] [-count count] [-subscount numsubs]" +
"\n\nDefault Values:" +
"\n [-stream] qdur-stream" +
"\n [-subject] qdur-subject" +
"\n [-queue] qdur-queue" +
"\n [-durable] qdur-durable" +
"\n [-count] 10000" +
"\n [-subscount] 5";

// THIS FLAG WILL DETERMINE IF THE CONSUMER IS CREATED AHEAD OF TIME. SEE CODE BELOW
static bool CreateConsumerAheadOfTime = true;

public static void Main(string[] args)
{
ArgumentHelper helper = new ArgumentHelperBuilder("NATS JetStream Push Subscribe Queue Durable", args, Usage)
.DefaultStream("qdur-stream")
.DefaultSubject("qdur-subject")
.DefaultQueue("qdur-queue")
.DefaultDurable("qdur-durable")
.DefaultDeliverSubject("qdur-deliver")
.DefaultCount(10000)
.DefaultSubsCount(5)
.Build();

try
{
using (IConnection c = new ConnectionFactory().CreateConnection(helper.MakeOptions()))
{
// Create a JetStreamManagement context.
IJetStreamManagement jsm = c.CreateJetStreamManagementContext();

// Use the utility to create a stream stored in memory.
JsUtils.CreateOrReplaceStream(jsm, helper.Stream, helper.Subject);

IJetStream js = c.CreateJetStreamContext();

Console.WriteLine();

PushSubscribeOptions pso; // to be set later;

if (CreateConsumerAheadOfTime)
{
// CREATE THE CONSUMER AHEAD OF TIME.
// This is generally preferred with durable consumers.
ConsumerConfiguration cc = ConsumerConfiguration.Builder()
.WithDurable(helper.Durable)
.WithDeliverSubject(helper.DeliverSubject)
.WithDeliverGroup(helper.Queue)
.WithFilterSubject(helper.Subject)
.Build();
jsm.AddOrUpdateConsumer(helper.Stream, cc);

// we will just bind to that consumer
pso = PushSubscribeOptions.BindTo(helper.Stream, helper.Durable);
}
else
{
// CREATE THE CONSUMER ON THE FLY
// Will only be created the first time, the subsequent subscribes will find it already made
pso = PushSubscribeOptions.Builder()
.WithConfiguration(ConsumerConfiguration.Builder()
.WithDurable(helper.Durable)
.WithDeliverGroup(helper.Queue)
.WithFilterSubject(helper.Subject)
.Build()
).Build();
}

CountdownEvent latch = new CountdownEvent(helper.Count);
IList<JsQueueSubscriber> subscribers = new List<JsQueueSubscriber>();
IList<IJetStreamPushAsyncSubscription> subs = new List<IJetStreamPushAsyncSubscription>();
int readCount = helper.Count / helper.SubsCount;
Console.WriteLine($"Each of the {helper.Count} queue subscriptions will receive about {readCount} messages.");

for (int id = 1; id <= helper.SubsCount; id++) {
// create and track class with the handler
JsQueueSubscriber qs = new JsQueueSubscriber(id, helper.Count, js, latch);
subscribers.Add(qs);

// setup the subscription
IJetStreamPushAsyncSubscription sub = js.PushSubscribeAsync(helper.Subject, helper.Queue,qs.Handler(), false, pso);
subs.Add(sub); // just keeping a reference around
}
c.Flush(500); // flush outgoing communication with/to the server

// create and start the publishing
Thread pubThread = new Thread(() =>
{
for (int x = 1; x <= helper.Count; x++)
{
js.Publish(helper.Subject, Encoding.ASCII.GetBytes("Data # " + x));
}

});
pubThread.Start();

// wait for publish to finish and then wait for all messages to be received.
pubThread.Join(10000);
latch.Wait(60000);

foreach (JsQueueSubscriber qs in subscribers)
{
qs.Report();
}

Console.WriteLine();

// delete the stream since we are done with it.
jsm.DeleteStream(helper.Stream);
}
}
catch (Exception ex)
{
helper.ReportException(ex);
}
}
}

class JsQueueSubscriber
{
private int id;
int thisReceived;
int msgCount;
CountdownEvent latch;
public IList<string> datas;

public JsQueueSubscriber(int id, int msgCount, IJetStream js, CountdownEvent latch)
{
this.id = id;
this.msgCount = msgCount;
this.latch = latch;
thisReceived = 0;
datas = new List<string>();
}

public void Report() {
Console.WriteLine($"Sub # {id} handled {thisReceived} messages.");
}

public EventHandler<MsgHandlerEventArgs> Handler()
{
return (s, e) =>
{
thisReceived++;
latch.Signal(1);
string data = Encoding.UTF8.GetString(e.Message.Data);
datas.Add(data);
Console.WriteLine($"QS # {id} message # {thisReceived} {data}");
e.Message.Ack();
};
}
}

class InterlockedLong
{
private long count;

public InterlockedLong() {}

public InterlockedLong(long start)
{
this.count = start;
}

public void Set(long l)
{
Interlocked.Exchange(ref count, l);
}

public long Increment()
{
return Interlocked.Increment(ref count);
}

public long Read()
{
return Interlocked.Read(ref count);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<Title>NATS JetStream Push Subscribe Async Queue Durable</Title>
<Description>NATS JetStream Push Subscribe Async Queue Durable Example</Description>
<IsPackable>false</IsPackable>
<RootNamespace>NATSExamples</RootNamespace>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\NATS.Client\NATS.Client.csproj" />
<ProjectReference Include="..\JetStreamExampleUtils\JetStreamExampleUtils.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ internal static class JetStreamPushSubscribeQueueDurable
"\n [-subject] qdur-subject" +
"\n [-queue] qdur-queue" +
"\n [-durable] qdur-durable" +
"\n [-count] 100" +
"\n [-count] 10000" +
"\n [-subscount] 5";


// THIS FLAG WILL DETERMINE IF THE CONSUMER IS CREATED AHEAD OF TIME. SEE CODE BELOW
static bool CreateConsumerAheadOfTime = true;

public static void Main(string[] args)
{
ArgumentHelper helper = new ArgumentHelperBuilder("NATS JetStream Push Subscribe Queue Durable", args, Usage)
Expand All @@ -41,7 +44,7 @@ public static void Main(string[] args)
.DefaultQueue("qdur-queue")
.DefaultDurable("qdur-durable")
.DefaultDeliverSubject("qdur-deliver")
.DefaultCount(100)
.DefaultCount(10000)
.DefaultSubsCount(5)
.Build();

Expand All @@ -53,32 +56,54 @@ public static void Main(string[] args)
IJetStreamManagement jsm = c.CreateJetStreamManagementContext();

// Use the utility to create a stream stored in memory.
JsUtils.CreateStreamExitWhenExists(jsm, helper.Stream, helper.Subject);
JsUtils.CreateOrReplaceStream(jsm, helper.Stream, helper.Subject);

IJetStream js = c.CreateJetStreamContext();

Console.WriteLine();

// create the consumer ahead of time
ConsumerConfiguration cc = ConsumerConfiguration.Builder()
.WithDurable(helper.Durable)
.WithDeliverSubject(helper.DeliverSubject)
.WithDeliverGroup(helper.Queue)
.Build();
jsm.AddOrUpdateConsumer(helper.Stream, cc);
PushSubscribeOptions pso; // to be set later;

if (CreateConsumerAheadOfTime)
{
// CREATE THE CONSUMER AHEAD OF TIME.
// This is generally preferred with durable consumers.
ConsumerConfiguration cc = ConsumerConfiguration.Builder()
.WithDurable(helper.Durable)
.WithDeliverSubject(helper.DeliverSubject)
.WithDeliverGroup(helper.Queue)
.WithFilterSubject(helper.Subject)
.Build();
jsm.AddOrUpdateConsumer(helper.Stream, cc);

// we will just bind to that consumer
pso = PushSubscribeOptions.BindTo(helper.Stream, helper.Durable);
}
else
{
// CREATE THE CONSUMER ON THE FLY
// Will only be created the first time, the subsequent subscribes will find it already made
pso = PushSubscribeOptions.Builder()
.WithConfiguration(ConsumerConfiguration.Builder()
.WithDurable(helper.Durable)
.WithDeliverGroup(helper.Queue)
.WithFilterSubject(helper.Subject)
.Build()
).Build();
}

// we will just bind to that consumer
PushSubscribeOptions pso = PushSubscribeOptions.BindTo(helper.Stream, helper.Durable);

InterlockedLong allReceived = new InterlockedLong();
IList<JsQueueSubscriber> subscribers = new List<JsQueueSubscriber>();
IList<Thread> subThreads = new List<Thread>();
int readCount = helper.Count / helper.SubsCount;
Console.WriteLine($"Each of the {helper.Count} queue subscriptions will read about {readCount} messages.");

for (int id = 1; id <= helper.SubsCount; id++) {
// setup the subscription
IJetStreamPushSyncSubscription sub = js.PushSubscribeSync(helper.Subject, helper.Queue, pso);

// create and track the runnable
JsQueueSubscriber qs = new JsQueueSubscriber(id, 100, js, sub, allReceived);
JsQueueSubscriber qs = new JsQueueSubscriber(id, helper.Count, js, sub, allReceived);
subscribers.Add(qs);

// create, track and start the thread
Expand All @@ -103,7 +128,7 @@ public static void Main(string[] args)
pubThread.Join(10000);
foreach (Thread t in subThreads)
{
t.Join(10000);
t.Join(30000);
}

foreach (JsQueueSubscriber qs in subscribers)
Expand All @@ -127,10 +152,10 @@ public static void Main(string[] args)
class JsQueueSubscriber
{
private int id;
int thisReceived;
int msgCount;
IJetStreamPushSyncSubscription sub;
InterlockedLong allReceived;
public int received;
public IList<string> datas;

public JsQueueSubscriber(int id, int msgCount, IJetStream js, IJetStreamPushSyncSubscription sub, InterlockedLong allReceived)
Expand All @@ -139,12 +164,12 @@ public JsQueueSubscriber(int id, int msgCount, IJetStream js, IJetStreamPushSync
this.msgCount = msgCount;
this.sub = sub;
this.allReceived = allReceived;
received = 0;
thisReceived = 0;
datas = new List<string>();
}

public void Report() {
Console.WriteLine($"Sub # {id} handled {received} messages.");
Console.WriteLine($"Sub # {id} handled {thisReceived} messages.");
}

public void Run()
Expand All @@ -154,14 +179,17 @@ public void Run()
try
{
Msg msg = sub.NextMessage(500);
received++;
thisReceived++;
allReceived.Increment();
datas.Add(Encoding.UTF8.GetString(msg.Data));
string data = Encoding.UTF8.GetString(msg.Data);
datas.Add(data);
Console.WriteLine($"QS # {id} message # {thisReceived} {data}");
msg.Ack();
}
catch (NATSTimeoutException)
{
// timeout is acceptable, means no messages available.
Console.WriteLine($"QS # {id} Timeout is acceptable, means no messages available.");
}
}
}
Expand Down

0 comments on commit 06cdeab

Please sign in to comment.