Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement republish #661

Merged
merged 3 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/NATS.Client/JetStream/ApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ internal static class ApiConstants
internal const string DenyDelete = "deny_delete";
internal const string DenyPurge = "deny_purge";
internal const string Description = "description";
internal const string Dest = "dest";
internal const string Discard = "discard";
internal const string Domain = "domain";
internal const string Duplicate = "duplicate";
Expand Down Expand Up @@ -121,6 +122,7 @@ internal static class ApiConstants
internal const string OptStartSeq = "opt_start_seq";
internal const string OptStartTime = "opt_start_time";
internal const string Placement = "placement";
internal const string Republish = "republish";
internal const string Port = "port";
internal const string Proto = "proto";
internal const string Purged = "purged";
Expand All @@ -137,6 +139,7 @@ internal static class ApiConstants
internal const string ServerName = "server_name";
internal const string Source = "source";
internal const string Sources = "sources";
internal const string Src = "src";
internal const string State = "state";
internal const string Storage = "storage";
internal const string StorageMaxStreamBytes = "storage_max_stream_bytes";
Expand Down
132 changes: 132 additions & 0 deletions src/NATS.Client/JetStream/Republish.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Collections.Generic;
using NATS.Client.Internals;
using NATS.Client.Internals.SimpleJSON;
using static NATS.Client.Internals.JsonUtils;

namespace NATS.Client.JetStream
{
/// <summary>
/// Republish options for a stream
/// </summary>
public sealed class Republish : JsonSerializable
{
/// <summary>
/// The Published Subject-matching filter
/// </summary>
public string Source { get; }

/// <summary>
/// The RePublish Subject template
/// </summary>
public string Destination { get; }

/// <summary>
/// Whether to RePublish only headers (no body)
/// </summary>
public bool HeadersOnly { get; }

internal static Republish OptionalInstance(JSONNode republishNode)
{
return republishNode.Count == 0 ? null : new Republish(republishNode);
}

private Republish(JSONNode republishNode)
{
Source = republishNode[ApiConstants.Src].Value;
Destination = republishNode[ApiConstants.Dest].Value;
HeadersOnly = republishNode[ApiConstants.HeadersOnly].AsBool;
}

/// <summary>
/// Construct the Republish object
/// </summary>
/// <param name="source">the Published Subject-matching filter</param>
/// <param name="destination">the RePublish Subject template</param>
/// <param name="headersOnly">Whether to RePublish only headers (no body)</param>
public Republish(string source, string destination, bool headersOnly)
{
Source = source;
Destination = destination;
HeadersOnly = headersOnly;
}

internal override JSONNode ToJsonNode()
{
JSONObject o = new JSONObject();
o[ApiConstants.Src] = Source;
o[ApiConstants.Dest] = Destination;
AddField(o, ApiConstants.HeadersOnly, HeadersOnly);
return o;
}

/// <summary>
/// Creates a builder for a republish object.
/// </summary>
/// <returns>The Builder</returns>
public static RepublishBuilder Builder() {
return new RepublishBuilder();
}

/// <summary>
/// Republish can be created using a RepublishBuilder.
/// </summary>
public sealed class RepublishBuilder {
private string _source;
private string _destination;
private bool _headersOnly;

/// <summary>
/// Set the Published Subject-matching filter.
/// </summary>
/// <param name="source">the source</param>
/// <returns></returns>
public RepublishBuilder WithSource(string source) {
_source = source;
return this;
}

/// <summary>
/// Set the RePublish Subject template
/// </summary>
/// <param name="destination">the destination</param>
/// <returns></returns>
public RepublishBuilder WithDestination(string destination) {
_destination = destination;
return this;
}

/// <summary>
/// Set Whether to RePublish only headers (no body)
/// </summary>
/// <param name="headersOnly">the headers only flag</param>
/// <returns></returns>
public RepublishBuilder WithHeadersOnly(bool headersOnly) {
_headersOnly = headersOnly;
return this;
}

/// <summary>
/// Build a Republish object
/// </summary>
/// <returns>The Republish</returns>
public Republish Build() {
Validator.Required(_source, "Source");
Validator.Required(_destination, "Destination");
return new Republish(_source, _destination, _headersOnly);
}
}
}
}
16 changes: 16 additions & 0 deletions src/NATS.Client/JetStream/StreamConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public sealed class StreamConfiguration : JsonSerializable
public DiscardPolicy DiscardPolicy { get; }
public Duration DuplicateWindow { get; }
public Placement Placement { get; }
public Republish Republish { get; }
public Mirror Mirror { get; }
public List<Source> Sources { get; }
public bool Sealed { get; }
Expand Down Expand Up @@ -70,6 +71,7 @@ internal StreamConfiguration(JSONNode scNode)
TemplateOwner = scNode[ApiConstants.TemplateOwner].Value;
DuplicateWindow = AsDuration(scNode, ApiConstants.DuplicateWindow, Duration.Zero);
Placement = Placement.OptionalInstance(scNode[ApiConstants.Placement]);
Republish = Republish.OptionalInstance(scNode[ApiConstants.Republish]);
Mirror = Mirror.OptionalInstance(scNode[ApiConstants.Mirror]);
Sources = Source.OptionalListOf(scNode[ApiConstants.Sources]);
Sealed = scNode[ApiConstants.Sealed].AsBool;
Expand Down Expand Up @@ -98,6 +100,7 @@ private StreamConfiguration(StreamConfigurationBuilder builder)
DiscardPolicy = builder._discardPolicy;
DuplicateWindow = builder._duplicateWindow;
Placement = builder._placement;
Republish = builder._republish;
Mirror = builder._mirror;
Sources = builder._sources;
Sealed = builder._sealed;
Expand Down Expand Up @@ -136,6 +139,7 @@ internal override JSONNode ToJsonNode()
[ApiConstants.TemplateOwner] = TemplateOwner,
[ApiConstants.DuplicateWindow] = DuplicateWindow.Nanos,
[ApiConstants.Placement] = Placement?.ToJsonNode(),
[ApiConstants.Republish] = Republish?.ToJsonNode(),
[ApiConstants.Mirror] = Mirror?.ToJsonNode(),
[ApiConstants.Sources] = sources,
[ApiConstants.Sealed] = Sealed,
Expand Down Expand Up @@ -175,6 +179,7 @@ public sealed class StreamConfigurationBuilder
internal DiscardPolicy _discardPolicy = DiscardPolicy.Old;
internal Duration _duplicateWindow = Duration.Zero;
internal Placement _placement;
internal Republish _republish;
internal Mirror _mirror;
internal readonly List<Source> _sources = new List<Source>();
internal bool _sealed;
Expand Down Expand Up @@ -205,6 +210,7 @@ public StreamConfigurationBuilder(StreamConfiguration sc)
_discardPolicy = sc.DiscardPolicy;
_duplicateWindow = sc.DuplicateWindow;
_placement = sc.Placement;
_republish = sc.Republish;
_mirror = sc.Mirror;
WithSources(sc.Sources);
_sealed = sc.Sealed;
Expand Down Expand Up @@ -452,6 +458,16 @@ public StreamConfigurationBuilder WithPlacement(Placement placement) {
return this;
}

/// <summary>
/// Sets the republish object
/// </summary>
/// <param name="republish">the republish object</param>
/// <returns>The StreamConfigurationBuilder</returns>
public StreamConfigurationBuilder WithRepublish(Republish republish) {
_republish = republish;
return this;
}

/// <summary>
/// Sets the mirror object
/// </summary>
Expand Down
7 changes: 6 additions & 1 deletion src/Tests/UnitTests/Data/StreamConfiguration.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "sname",
"description": "blah blah",
"subjects": ["foo", "bar"],
"subjects": ["foo", "bar", "repub.>"],
"retention": "interest",
"max_consumers": 730,
"max_msgs": 731,
Expand All @@ -24,6 +24,11 @@
"cluster": "clstr",
"tags": ["tag1", "tag2"]
},
"republish": {
"src": "repub.>",
"dest": "dest.>",
"headers_only": true
},
"mirror": {
"name": "eman",
"opt_start_seq": 736,
Expand Down
12 changes: 9 additions & 3 deletions src/Tests/UnitTests/JetStream/TestStreamConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace UnitTests.JetStream
{
public class TestStreamConfiguration : TestBase
{
private StreamConfiguration getTestConfiguration() {
private StreamConfiguration GetTestConfiguration() {
String json = ReadDataFile("StreamConfiguration.json");
return new StreamConfiguration(json);
}
Expand All @@ -35,7 +35,7 @@ public void BuilderWorks()
[Fact]
public void TestConstruction()
{
StreamConfiguration testSc = getTestConfiguration();
StreamConfiguration testSc = GetTestConfiguration();
// from json
Validate(testSc, false);

Expand Down Expand Up @@ -208,7 +208,8 @@ private void Validate(StreamConfiguration sc, bool serverTest)
Assert.Equal("blah blah", sc.Description);
Assert.Collection(sc.Subjects,
item => item.Equals("foo"),
item => item.Equals("bar"));
item => item.Equals("bar"),
item => item.Equals("repub.>"));

Assert.Equal(RetentionPolicy.Interest, sc.RetentionPolicy);
Assert.Equal(730, sc.MaxConsumers);
Expand All @@ -225,6 +226,11 @@ private void Validate(StreamConfiguration sc, bool serverTest)
Assert.Equal("clstr", sc.Placement.Cluster);
Assert.Collection(sc.Placement.Tags, item => item.Equals("tag1"), item => item.Equals("tag2"));

Assert.NotNull(sc.Republish);
Assert.Equal("repub.>", sc.Republish.Source);
Assert.Equal("dest.>", sc.Republish.Destination);
Assert.True(sc.Republish.HeadersOnly);

DateTime zdt = AsDateTime("2020-11-05T19:33:21.163377Z");

if (serverTest)
Expand Down