Skip to content

Commit

Permalink
Fix #356: Drain should throw NATSConnectionClosedException when appro…
Browse files Browse the repository at this point in the history
…priate (#359)

Signed-off-by: Colin Sullivan <[email protected]>
  • Loading branch information
ColinSullivan1 authored Jan 29, 2020
1 parent ee6f739 commit 00fec40
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
17 changes: 7 additions & 10 deletions src/NATS.Client/Conn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3875,6 +3875,13 @@ private void drain(int timeout)

lock (mu)
{
if (isClosed())
throw new NATSConnectionClosedException();

// if we're already draining, exit.
if (isDrainingSubs() || isDrainingPubs())
return;

lsubs = subs.Values;
status = ConnState.DRAINING_SUBS;
}
Expand Down Expand Up @@ -3959,11 +3966,6 @@ public void Drain(int timeout)
if (timeout <= 0)
throw new ArgumentOutOfRangeException(nameof(timeout), "Timeout must be greater than zero.");

lock (mu)
{
status = ConnState.DRAINING_SUBS;
}

drain(timeout);
}

Expand Down Expand Up @@ -4004,11 +4006,6 @@ public Task DrainAsync(int timeout)
if (timeout <= 0)
throw new ArgumentOutOfRangeException(nameof(timeout), "Timeout must be greater than zero.");

lock (mu)
{
status = ConnState.DRAINING_SUBS;
}

return Task.Run(() => drain(timeout));
}

Expand Down
9 changes: 9 additions & 0 deletions src/Tests/IntegrationTests/TestConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,7 @@ public async Task TestDrainStateBehavior()
{
closed.Set();
};

using (var c = Context.ConnectionFactory.CreateConnection(opts))
{
using (c.SubscribeAsync("foo", (obj, args) =>
Expand All @@ -1109,6 +1110,8 @@ public async Task TestDrainStateBehavior()
// give us a long timeout to run our test.
var drainTask = c.DrainAsync(10000);

// Sleep a bit to ensure the drain task is running.
Thread.Sleep(100);
Assert.True(c.State == ConnState.DRAINING_SUBS);
Assert.True(c.IsDraining());

Expand All @@ -1124,6 +1127,12 @@ public async Task TestDrainStateBehavior()
Assert.True(closed.WaitOne(10000));
}
}

// Now test connection state checking in drain after being closed via API.
var conn = Context.ConnectionFactory.CreateConnection(opts);
conn.Close();
_ = Assert.Throws<NATSConnectionClosedException>(() => conn.Drain());
await Assert.ThrowsAsync<NATSConnectionClosedException>(() => { return conn.DrainAsync(); });
}
}

Expand Down

0 comments on commit 00fec40

Please sign in to comment.