Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Message.Respond implementing #281 #283

Merged
merged 3 commits into from
Aug 27, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions NATS.Client/AsyncSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,16 @@ internal void enableAsyncProcessing()

internal void disableAsyncProcessing()
{
if (msgFeeder != null)
lock (mu)
{
mch.close();
msgFeeder = null;
if (msgFeeder != null)
{
mch.close();
msgFeeder = null;
}
MessageHandler = null;
started = false;
}
MessageHandler = null;
started = false;
}

/// <summary>
Expand All @@ -155,11 +158,14 @@ public void Start()
if (started)
return;

if (conn == null)
throw new NATSBadSubscriptionException();
lock (mu)
{
if (conn == null)
throw new NATSBadSubscriptionException();

conn.sendSubscriptionMessage(this);
enableAsyncProcessing();
conn.sendSubscriptionMessage(this);
enableAsyncProcessing();
}
}

/// <summary>
Expand Down
1 change: 0 additions & 1 deletion NATS.Client/Conn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3485,7 +3485,6 @@ internal virtual void removeSub(Subscription s)
s.mch = null;
}

s.conn = null;
s.closed = true;
}

Expand Down
25 changes: 25 additions & 0 deletions NATS.Client/Msg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,31 @@ public ISubscription ArrivalSubcription
get { return sub; }
}

/// <summary>
/// Send a response to the message on the arrival subscription.
/// </summary>
/// <param name="data">The response payload to send.</param>
/// <exception cref="NATSException">
/// <para><see cref="Reply"/> is null or empty.</para>
/// <para>-or-</para>
/// <para><see cref="ArrivalSubcription"/> is null.</para>
/// </exception>
public void Respond(byte[] data)
{
if (String.IsNullOrEmpty(Reply))
{
throw new NATSException("No Reply subject");
}

Connection conn = ArrivalSubcription?.Connection;
if (conn == null)
{
throw new NATSException("Message is not bound to a subscription");
}

conn.Publish(this.Reply, data);
}

/// <summary>
/// Generates a string representation of the messages.
/// </summary>
Expand Down
19 changes: 11 additions & 8 deletions NATS.Client/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,20 +222,22 @@ public bool IsValid
{
lock (mu)
{
return (conn != null);
return (conn != null) && !closed;
}
}
}

internal void unsubscribe(bool throwEx)
{
Connection c;
bool isClosed;
lock (mu)
{
c = this.conn;
isClosed = this.closed;
}

if (c == null)
if (c == null || closed)
{
if (throwEx)
throw new NATSBadSubscriptionException();
Expand Down Expand Up @@ -281,7 +283,7 @@ public virtual void AutoUnsubscribe(int max)

lock (mu)
{
if (conn == null)
if (conn == null || closed)
throw new NATSBadSubscriptionException();

c = conn;
Expand All @@ -301,7 +303,7 @@ public int QueuedMessageCount
{
lock (mu)
{
if (conn == null)
if (conn == null || closed)
throw new NATSBadSubscriptionException();

return mch.Count;
Expand Down Expand Up @@ -332,6 +334,9 @@ protected virtual void Dispose(bool disposing)
// auto unsubscribing, so ignore.
}

conn = null;
closed = true;

disposedValue = true;
}
}
Expand Down Expand Up @@ -370,9 +375,8 @@ public override string ToString()

private void checkState()
{
if (conn == null)
if (conn == null || closed)
throw new NATSBadSubscriptionException();

}

/// <summary>
Expand Down Expand Up @@ -573,7 +577,7 @@ internal Task InternalDrain(int timeout)

lock (mu)
{
if (conn == null)
if (conn == null || closed)
throw new NATSBadSubscriptionException();

c = conn;
Expand All @@ -595,7 +599,6 @@ public Task DrainAsync(int timeout)
return InternalDrain(timeout);
}


public void Drain()
{
Drain(Defaults.DefaultDrainTimeout);
Expand Down
65 changes: 64 additions & 1 deletion NATSUnitTests/UnitTestSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,6 @@ public void TestAsyncSubscriptionPending()
}
}


[Fact]
public void TestAsyncPendingSubscriptionBatchSizeExactlyOne()
{
Expand Down Expand Up @@ -1040,6 +1039,70 @@ public void TestInvalidSubjects()
}
}
}

[Fact]
public void TestRespond()
{
using (new NATSServer())
watfordgnf marked this conversation as resolved.
Show resolved Hide resolved
{
using (IConnection c = utils.DefaultTestConnection)
{
ISyncSubscription s = c.SubscribeSync("foo");

string replyTo = c.NewInbox();
ISyncSubscription r = c.SubscribeSync(replyTo);

c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message"));

Msg m = s.NextMessage(1000);
Assert.NotNull(m);
Assert.Equal(replyTo, m.Reply);

byte[] reply = Encoding.UTF8.GetBytes("reply");
m.Respond(reply);

m = r.NextMessage(1000);
Assert.NotNull(m);
Assert.Equal(replyTo, m.Subject);
Assert.Equal(reply, m.Data);

s.Unsubscribe();
r.Unsubscribe();
}
}
}

[Fact]
public void TestRespondWithAutoUnsubscribe()
{
using (new NATSServer())
{
using (IConnection c = utils.DefaultTestConnection)
{
ISyncSubscription s = c.SubscribeSync("foo");
s.AutoUnsubscribe(1);

string replyTo = c.NewInbox();
ISyncSubscription r = c.SubscribeSync(replyTo);

c.Publish("foo", replyTo, Encoding.UTF8.GetBytes("message"));

Msg m = s.NextMessage(1000);
Assert.NotNull(m);
Assert.Equal(replyTo, m.Reply);

byte[] reply = Encoding.UTF8.GetBytes("reply");
m.Respond(reply);

m = r.NextMessage(1000);
Assert.NotNull(m);
Assert.Equal(replyTo, m.Subject);
Assert.Equal(reply, m.Data);

r.Unsubscribe();
}
}
}
}
}