-
Notifications
You must be signed in to change notification settings - Fork 55
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net6.0</TargetFramework> | ||
<ImplicitUsings>enable</ImplicitUsings> | ||
<Nullable>enable</Nullable> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\..\src\NATS.Client.JetStream\NATS.Client.JetStream.csproj" /> | ||
</ItemGroup> | ||
|
||
</Project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
using System.Buffers; | ||
using System.Runtime.CompilerServices; | ||
using System.Text; | ||
using Microsoft.Extensions.Logging; | ||
using NATS.Client.Core; | ||
using NATS.Client.JetStream; | ||
using NATS.Client.JetStream.Internal; | ||
using NATS.Client.JetStream.Models; | ||
|
||
var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; | ||
|
||
await using var nats = new NatsConnection(options); | ||
|
||
var js = new NatsJSContext(nats); | ||
|
||
var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }); | ||
var consumer = await js.CreateConsumerAsync("s1", "c1"); | ||
await using var sub = await consumer.CreateSubscription<RawData>( | ||
controlHandler: async (thisSub, msg) => | ||
Check warning on line 19 in sandbox/Example.JetStream.PullConsumer/Program.cs GitHub Actions / dotnet (dev)
Check warning on line 19 in sandbox/Example.JetStream.PullConsumer/Program.cs GitHub Actions / dotnet (main)
Check warning on line 19 in sandbox/Example.JetStream.PullConsumer/Program.cs GitHub Actions / memory test (dev)
Check warning on line 19 in sandbox/Example.JetStream.PullConsumer/Program.cs GitHub Actions / memory test (main)
|
||
{ | ||
Console.WriteLine($"____"); | ||
Console.WriteLine($"{msg.Headers?.Code} {msg.Headers?.MessageText}"); | ||
var error = msg.Error?.Error; | ||
if (error != null) | ||
Console.WriteLine($"Error: {error}"); | ||
}, | ||
reconnectRequestFactory: () => default, | ||
heartBeat: TimeSpan.FromSeconds(5), | ||
opts: new NatsSubOpts { Serializer = new RawDataSerializer() }); | ||
|
||
await sub.CallMsgNextAsync(new ConsumerGetnextRequest | ||
{ | ||
Batch = 100, | ||
IdleHeartbeat = TimeSpan.FromSeconds(3).ToNanos(), | ||
Expires = TimeSpan.FromSeconds(12).ToNanos(), | ||
}); | ||
|
||
await foreach (var jsMsg in sub.Msgs.ReadAllAsync()) | ||
{ | ||
var msg = jsMsg.Msg; | ||
await jsMsg.AckAsync(); | ||
Console.WriteLine($"____"); | ||
Console.WriteLine($"subject: {msg.Subject}"); | ||
Console.WriteLine($"data: {msg.Data}"); | ||
} | ||
|
||
class RawData | ||
Check warning on line 47 in sandbox/Example.JetStream.PullConsumer/Program.cs GitHub Actions / dotnet (dev)
Check warning on line 47 in sandbox/Example.JetStream.PullConsumer/Program.cs GitHub Actions / dotnet (main)
Check warning on line 47 in sandbox/Example.JetStream.PullConsumer/Program.cs GitHub Actions / memory test (dev)
|
||
{ | ||
public RawData(byte[] buffer) => Buffer = buffer; | ||
|
||
public byte[] Buffer { get; } | ||
|
||
public override string ToString() => Encoding.ASCII.GetString(Buffer); | ||
} | ||
|
||
class RawDataSerializer : INatsSerializer | ||
Check warning on line 56 in sandbox/Example.JetStream.PullConsumer/Program.cs GitHub Actions / dotnet (dev)
Check warning on line 56 in sandbox/Example.JetStream.PullConsumer/Program.cs GitHub Actions / dotnet (main)
Check warning on line 56 in sandbox/Example.JetStream.PullConsumer/Program.cs GitHub Actions / memory test (dev)
|
||
{ | ||
public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value) | ||
{ | ||
if (value is RawData data) | ||
{ | ||
bufferWriter.Write(data.Buffer); | ||
return data.Buffer.Length; | ||
} | ||
|
||
throw new Exception($"Can only work with '{typeof(RawData)}'"); | ||
} | ||
|
||
public T? Deserialize<T>(in ReadOnlySequence<byte> buffer) => (T?)Deserialize(buffer, typeof(T)); | ||
|
||
public object? Deserialize(in ReadOnlySequence<byte> buffer, Type type) | ||
{ | ||
if (type != typeof(RawData)) | ||
throw new Exception($"Can only work with '{typeof(RawData)}'"); | ||
|
||
return new RawData(buffer.ToArray()); | ||
} | ||
} |