-
Notifications
You must be signed in to change notification settings - Fork 152
/
Copy pathMessageHandlerBase.cs
336 lines (301 loc) · 14.9 KB
/
MessageHandlerBase.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace StreamJsonRpc
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft;
using Microsoft.VisualStudio.Threading;
using StreamJsonRpc.Protocol;
/// <summary>
/// An abstract base class for sending and receiving messages.
/// </summary>
/// <remarks>
/// This class and its derivatives are safe to call from any thread.
/// Calls to <see cref="WriteAsync(JsonRpcMessage, CancellationToken)"/>
/// are protected by a semaphore to guarantee message integrity
/// and may be made from any thread.
/// The caller must take care to call <see cref="ReadAsync(CancellationToken)"/> sequentially.
/// </remarks>
public abstract class MessageHandlerBase : IJsonRpcMessageHandler, IDisposableObservable, Microsoft.VisualStudio.Threading.IAsyncDisposable
{
/// <summary>
/// The source of a token that is canceled when this instance is disposed.
/// </summary>
/// <remarks>
/// Canceled by <see cref="DisposeAsync"/> and exposed to derived types through <see cref="DisposalToken"/>.
/// </remarks>
private readonly CancellationTokenSource disposalTokenSource = new CancellationTokenSource();
/// <summary>
/// A semaphore acquired while sending a message.
/// </summary>
/// <remarks>
/// Constructed with a count of 1 and entered by <see cref="WriteAsync(JsonRpcMessage, CancellationToken)"/>
/// for the duration of each write and flush. Disposed by <see cref="DisposeWriter"/>.
/// </remarks>
private readonly AsyncSemaphore sendingSemaphore = new AsyncSemaphore(1);
/// <summary>
/// The sync object to lock when inspecting and mutating the <see cref="state"/> field.
/// </summary>
/// <remarks>
/// The same lock is held while deciding whether to signal <see cref="readingCompleted"/> or <see cref="writingCompleted"/>,
/// so that decision is made consistently with the current value of <see cref="state"/>.
/// </remarks>
private readonly object syncObject = new object();
/// <summary>
/// A signal that the last read operation has completed.
/// </summary>
/// <remarks>
/// Set by <see cref="DisposeAsync"/> when no read is in progress, or by <see cref="ReadAsync(CancellationToken)"/>
/// when a read finishes after disposal has been requested. The constructor chains <see cref="DisposeReader"/> onto it.
/// </remarks>
private readonly AsyncManualResetEvent readingCompleted = new AsyncManualResetEvent();
/// <summary>
/// A signal that the last write operation has completed.
/// </summary>
/// <remarks>
/// Set by <see cref="DisposeAsync"/> when no write is in progress, or by <see cref="WriteAsync(JsonRpcMessage, CancellationToken)"/>
/// when a write finishes after disposal has been requested. The constructor chains <see cref="DisposeWriter"/> onto it.
/// </remarks>
private readonly AsyncManualResetEvent writingCompleted = new AsyncManualResetEvent();
/// <summary>
/// Flags indicating whether the <see cref="ReadAsync(CancellationToken)"/> and/or <see cref="WriteAsync(JsonRpcMessage, CancellationToken)"/> methods are in progress.
/// </summary>
/// <remarks>
/// Read and written only while holding <see cref="syncObject"/>.
/// </remarks>
private MessageHandlerState state;
/// <summary>
/// Initializes a new instance of the <see cref="MessageHandlerBase"/> class.
/// </summary>
/// <param name="formatter">The formatter used to serialize messages.</param>
/// <remarks>
/// Besides storing the formatter, this wires up the disposal pipeline: reader cleanup runs once the
/// "reading completed" signal is set, writer cleanup runs once the "writing completed" signal is set,
/// and the shared cleanup in <see cref="Dispose(bool)"/> runs after both of those have finished.
/// </remarks>
public MessageHandlerBase(IJsonRpcMessageFormatter formatter)
{
    Requires.NotNull(formatter, nameof(formatter));
    this.Formatter = formatter;

    // The instance is passed as the continuation state object (rather than captured) so the lambdas
    // do not close over "this".
    Task readerCleanup = this.readingCompleted.WaitAsync().ContinueWith(
        (_, self) => ((MessageHandlerBase)self).DisposeReader(),
        this,
        CancellationToken.None,
        TaskContinuationOptions.None,
        TaskScheduler.Default);

    Task writerCleanup = this.writingCompleted.WaitAsync().ContinueWith(
        (_, self) => ((MessageHandlerBase)self).DisposeWriter(),
        this,
        CancellationToken.None,
        TaskContinuationOptions.None,
        TaskScheduler.Default);

    // NOTE(review): the antecedent task is discarded by this continuation, so a fault thrown by
    // DisposeReader/DisposeWriter does not appear to flow into Completion - confirm that is intended.
    Task bothCleanedUp = Task.WhenAll(readerCleanup, writerCleanup);
    this.Completion = bothCleanedUp.ContinueWith(
        (_, self) => ((MessageHandlerBase)self).Dispose(true),
        this,
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
}
/// <summary>
/// Identifies which I/O operations are currently in progress on this message handler.
/// </summary>
[Flags]
private enum MessageHandlerState
{
    /// <summary>
    /// Neither a read nor a write is in progress.
    /// </summary>
    None = 0,

    /// <summary>
    /// Indicates that the <see cref="WriteAsync(JsonRpcMessage, CancellationToken)"/> method is running.
    /// </summary>
    Writing = 1 << 0,

    /// <summary>
    /// Indicates that the <see cref="ReadAsync(CancellationToken)"/> method is running.
    /// </summary>
    Reading = 1 << 1,
}
/// <summary>
/// Gets a value indicating whether this message handler can receive messages.
/// </summary>
/// <remarks>
/// <see cref="ReadAsync(CancellationToken)"/> verifies this value before reading and fails when it is <c>false</c>.
/// </remarks>
public abstract bool CanRead { get; }
/// <summary>
/// Gets a value indicating whether this message handler can send messages.
/// </summary>
/// <remarks>
/// <see cref="WriteAsync(JsonRpcMessage, CancellationToken)"/> verifies this value before writing and fails when it is <c>false</c>.
/// </remarks>
public abstract bool CanWrite { get; }
/// <inheritdoc/>
public IJsonRpcMessageFormatter Formatter { get; }
/// <summary>
/// Gets a value indicating whether this instance has been disposed.
/// </summary>
/// <remarks>
/// This reports <c>true</c> as soon as <see cref="DisposalToken"/> is canceled, which happens at the start of
/// <see cref="DisposeAsync"/> rather than when disposal has finished.
/// </remarks>
bool IDisposableObservable.IsDisposed => this.DisposalToken.IsCancellationRequested;
/// <summary>
/// Gets a token that is canceled when this instance is disposed.
/// </summary>
protected CancellationToken DisposalToken => this.disposalTokenSource.Token;
/// <summary>
/// Gets a task that completes when this instance has completed disposal.
/// </summary>
/// <remarks>
/// Assigned once by the constructor; it completes after the continuations that call
/// <see cref="DisposeReader"/>, <see cref="DisposeWriter"/> and then <see cref="Dispose(bool)"/> have run.
/// </remarks>
private Task Completion { get; }
/// <summary>
/// Reads the next distinct and complete message from the transport, waiting for one to arrive if necessary.
/// </summary>
/// <param name="cancellationToken">A token to cancel the read request.</param>
/// <returns>The received message, or <c>null</c> if the underlying transport ends before beginning another message.</returns>
/// <exception cref="InvalidOperationException">Thrown when <see cref="CanRead"/> returns <c>false</c>.</exception>
/// <exception cref="System.IO.EndOfStreamException">Thrown if the transport ends while reading a message.</exception>
/// <exception cref="OperationCanceledException">Thrown if <paramref name="cancellationToken"/> is canceled before a new message is received.</exception>
/// <remarks>
/// Callers must not start a new read until the previous call's result has completed;
/// implementations of <see cref="ReadCoreAsync(CancellationToken)"/> may rely on that.
/// </remarks>
public async ValueTask<JsonRpcMessage?> ReadAsync(CancellationToken cancellationToken)
{
    Verify.Operation(this.CanRead, "No receiving stream.");
    cancellationToken.ThrowIfCancellationRequested();
    Verify.NotDisposed(this);

    // Mark the read as in progress so that disposal defers reader cleanup until we are done.
    this.SetState(MessageHandlerState.Reading);

    try
    {
        return await this.ReadCoreAsync(cancellationToken).ConfigureAwait(false);
    }
    catch (InvalidOperationException raceFailure) when (cancellationToken.IsCancellationRequested)
    {
        // PipeReader.ReadAsync can throw InvalidOperationException in a race where PipeReader.Complete() has been
        // called but we haven't noticed the CancellationToken was canceled yet.
        throw new OperationCanceledException("Reading failed during cancellation.", raceFailure, cancellationToken);
    }
    catch (ObjectDisposedException)
    {
        // Prefer reporting cancellation over disposal when the caller's token has been canceled.
        cancellationToken.ThrowIfCancellationRequested();
        throw;
    }
    finally
    {
        lock (this.syncObject)
        {
            this.state = this.state & ~MessageHandlerState.Reading;

            // If disposal was requested while this read was running, this was the last read:
            // signal that reader resources may now be released.
            if (this.DisposalToken.IsCancellationRequested)
            {
                this.readingCompleted.Set();
            }
        }
    }
}
/// <summary>
/// Writes a message to the transport and then flushes it.
/// </summary>
/// <param name="content">The message to write.</param>
/// <param name="cancellationToken">A token to cancel the write request.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown when <see cref="CanWrite"/> returns <c>false</c>.</exception>
/// <exception cref="OperationCanceledException">Thrown if <paramref name="cancellationToken"/> is canceled before message transmission begins.</exception>
/// <remarks>
/// This method may be invoked concurrently. Each call waits to enter a semaphore before writing,
/// so <see cref="WriteCoreAsync(JsonRpcMessage, CancellationToken)"/> and <see cref="FlushAsync(CancellationToken)"/>
/// run for one message at a time.
/// </remarks>
public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken cancellationToken)
{
    Requires.NotNull(content, nameof(content));
    Verify.Operation(this.CanWrite, "No sending stream.");
    cancellationToken.ThrowIfCancellationRequested();

    try
    {
        // Only one writer at a time: the semaphore is held for the whole write + flush.
        using (await this.sendingSemaphore.EnterAsync(cancellationToken).ConfigureAwait(false))
        {
            // Mark the write as in progress so that disposal defers writer cleanup until we are done.
            this.SetState(MessageHandlerState.Writing);

            try
            {
                // The token may have been canceled while we were waiting for the semaphore.
                cancellationToken.ThrowIfCancellationRequested();

                await this.WriteCoreAsync(content, cancellationToken).ConfigureAwait(false);
                await this.FlushAsync(cancellationToken).ConfigureAwait(false);
            }
            finally
            {
                lock (this.syncObject)
                {
                    this.state = this.state & ~MessageHandlerState.Writing;

                    // If disposal was requested while this write was running, this was the last write:
                    // signal that writer resources may now be released.
                    if (this.DisposalToken.IsCancellationRequested)
                    {
                        this.writingCompleted.Set();
                    }
                }
            }
        }
    }
    catch (ObjectDisposedException)
    {
        // Prefer reporting cancellation over disposal when the caller's token has been canceled.
        cancellationToken.ThrowIfCancellationRequested();
        throw;
    }
}
#pragma warning disable VSTHRD002 // We synchronously block, but nothing here should ever require the main thread.
/// <summary>
/// Disposes this instance, and cancels any pending read or write operations.
/// </summary>
/// <remarks>
/// This is a blocking wrapper around <see cref="DisposeAsync"/>: it does not return until that task has
/// completed, and any exception from that task is thrown to the caller.
/// </remarks>
[Obsolete("Call IAsyncDisposable.DisposeAsync instead.")]
public void Dispose()
{
    Task disposal = this.DisposeAsync();
    disposal.GetAwaiter().GetResult();
}
#pragma warning restore VSTHRD002
/// <summary>
/// Disposes this instance, and cancels any pending read or write operations.
/// </summary>
/// <returns>
/// A task that completes once reader cleanup, writer cleanup and the shared cleanup in
/// <see cref="Dispose(bool)"/> have all run.
/// </returns>
/// <remarks>
/// <para>
/// Only the first call performs disposal. A call made after disposal has already been requested
/// returns immediately, without waiting for the earlier disposal to finish.
/// </para>
/// <para>
/// <see cref="Dispose(bool)"/> is invoked exactly once, as the final step of the disposal chain that
/// the constructor attaches to the completion task awaited here.
/// </para>
/// </remarks>
public virtual async Task DisposeAsync()
{
    if (this.disposalTokenSource.IsCancellationRequested)
    {
        // Disposal was already requested by an earlier call.
        return;
    }

    this.disposalTokenSource.Cancel();

    // Kick off disposal of reading and/or writing resources based on whether they're active right now or not.
    // If they're active, they'll take care of themselves when they finish since we signaled disposal.
    lock (this.syncObject)
    {
        if (!this.state.HasFlag(MessageHandlerState.Reading))
        {
            this.readingCompleted.Set();
        }

        if (!this.state.HasFlag(MessageHandlerState.Writing))
        {
            this.writingCompleted.Set();
        }
    }

    // Wait for the disposal chain to finish. Its last step already calls Dispose(true), so that method
    // must not be called again here; doing so would run every override of Dispose(bool) a second time.
    // An exception thrown by Dispose(true) faults the completion task and is rethrown by this await.
    await this.Completion.ConfigureAwait(false);
}
/// <summary>
/// Releases the resources held by this instance that are shared between reading and writing.
/// </summary>
/// <param name="disposing"><c>true</c> when being disposed; <c>false</c> when being finalized.</param>
/// <remarks>
/// <para>
/// This method is called by <see cref="DisposeAsync"/> after both <see cref="DisposeReader"/> and <see cref="DisposeWriter"/> have completed.
/// </para>
/// <para>Overrides of this method *should* call the base method as well.</para>
/// </remarks>
protected virtual void Dispose(bool disposing)
{
    if (!disposing)
    {
        // Nothing is released on the finalizer path.
        return;
    }

    // The formatter is only disposed when it opts in by implementing IDisposable.
    if (this.Formatter is IDisposable disposableFormatter)
    {
        disposableFormatter.Dispose();
    }

    GC.SuppressFinalize(this);
}
/// <summary>
/// Disposes resources allocated by this instance that are used for reading (not writing).
/// </summary>
/// <remarks>
/// This method is called by <see cref="MessageHandlerBase"/> after the last read operation has completed.
/// The base implementation does nothing; derived types override it to release their own reading resources.
/// </remarks>
protected virtual void DisposeReader()
{
}
/// <summary>
/// Disposes resources allocated by this instance that are used for writing (not reading).
/// </summary>
/// <remarks>
/// This method is called by <see cref="MessageHandlerBase"/> after the last write operation has completed.
/// Overrides of this method *should* call the base method as well.
/// The base implementation disposes the semaphore that <see cref="WriteAsync(JsonRpcMessage, CancellationToken)"/> acquires while sending.
/// </remarks>
protected virtual void DisposeWriter()
{
this.sendingSemaphore.Dispose();
}
/// <summary>
/// Reads a distinct and complete message, waiting for one if necessary.
/// </summary>
/// <param name="cancellationToken">A token to cancel the read request.</param>
/// <returns>
/// A task whose result is the received message,
/// or <c>null</c> if the underlying transport ends before beginning another message.
/// </returns>
/// <remarks>
/// Called by <see cref="ReadAsync(CancellationToken)"/>, which returns this method's result to its caller unchanged.
/// </remarks>
protected abstract ValueTask<JsonRpcMessage?> ReadCoreAsync(CancellationToken cancellationToken);
/// <summary>
/// Writes a message.
/// </summary>
/// <param name="content">The message to write.</param>
/// <param name="cancellationToken">A token to cancel the transmission.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
/// <remarks>
/// Called by <see cref="WriteAsync(JsonRpcMessage, CancellationToken)"/> while it holds the sending semaphore,
/// and followed by a call to <see cref="FlushAsync(CancellationToken)"/>.
/// </remarks>
protected abstract ValueTask WriteCoreAsync(JsonRpcMessage content, CancellationToken cancellationToken);
/// <summary>
/// Ensures that all messages transmitted up to this point are en route to their destination,
/// rather than sitting in some local buffer.
/// </summary>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>
/// A <see cref="ValueTask"/> that completes when the write buffer has been transmitted,
/// or at least that the operation is in progress, if final transmission cannot be tracked.
/// </returns>
/// <remarks>
/// Called by <see cref="WriteAsync(JsonRpcMessage, CancellationToken)"/> immediately after
/// <see cref="WriteCoreAsync(JsonRpcMessage, CancellationToken)"/> completes, while the sending semaphore is still held.
/// </remarks>
protected abstract ValueTask FlushAsync(CancellationToken cancellationToken);
/// <summary>
/// Records that a read or write operation is starting.
/// </summary>
/// <param name="startingOperation">The flag for the operation that is about to begin.</param>
/// <remarks>
/// Runs under the state lock. It first verifies that this instance has not been disposed, then asserts
/// that the same operation is not already flagged as in progress before setting the flag.
/// </remarks>
private void SetState(MessageHandlerState startingOperation)
{
    lock (this.syncObject)
    {
        Verify.NotDisposed(this);

        // Starting an operation that is already flagged indicates overlapping calls, which is not expected here.
        bool alreadyInProgress = (this.state & startingOperation) == startingOperation;
        Assumes.False(alreadyInProgress);

        this.state = this.state | startingOperation;
    }
}
}
}