This repository has been archived by the owner on Aug 1, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 370
/
Copy pathAsyncStreamCopier.cs
537 lines (476 loc) · 21 KB
/
AsyncStreamCopier.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
//-----------------------------------------------------------------------
// <copyright file="AsyncStreamCopier.cs" company="Microsoft">
// Copyright 2013 Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
//-----------------------------------------------------------------------
namespace Microsoft.WindowsAzure.Storage.Core.Util
{
using Microsoft.WindowsAzure.Storage.Core.Executor;
using System;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Threading;
// Class to copy streams with potentially overlapping reads and writes. It uses no extra threads and serializes its state changes with a single lock; a registered wait handle is used only when the operation has an expiry time.
internal class AsyncStreamCopier<T> : IDisposable
{
#region Ctors + Locals
// Two buffers allow a read and a write to overlap. Once the read buffer holds data it is swapped with the write buffer
// (see ConsumeReadBuffer) and a write is issued from it while the next read fills the other buffer.
private byte[] currentReadBuff = null;
private byte[] currentWriteBuff = null;
// Result of the most recent read: -1 = no completed read is waiting to be written (initial value, and reset by ConsumeReadBuffer),
// 0 = end of source reached (or the requested copy length has been read), > 0 = number of bytes in currentReadBuff waiting to be written.
private volatile int lastReadCount = -1;
// Number of bytes in currentWriteBuff for the current / most recent write; set by ConsumeReadBuffer.
private volatile int currentWriteCount = -1;
// Optional descriptor whose Length (and Md5, when an Md5HashRef is present) is updated as writes complete.
private StreamDescriptor streamCopyState = null;
// This keeps track of any exception that happens during the copy itself and is set on the executionState at the end.
private Exception exceptionRef = null;
// This variable keeps track of bytes that have already been read from the source stream.
// It should only be modified using Interlocked.Add and read with Interlocked.Read.
private long currentBytesReadFromSource = 0;
// Exact number of bytes to copy, or null for "until end of source". Set in StartCopyStream.
private long? copyLen = null;
// Maximum number of bytes the source is allowed to yield, or null for no limit. Set in StartCopyStream.
private long? maximumLen = null;
private Stream src = null;
private Stream dest = null;
// Callback invoked once by ProcessCompletion when the copy finishes (successfully or not).
private Action<ExecutionState<T>> completedDel;
// Outstanding read / write operations (null when none is in flight).
// EndOpWithCatch takes lockerObj before these are examined or changed.
private volatile IAsyncResult readRes;
private volatile IAsyncResult writeRes;
private object lockerObj = new object();
// Used for cooperative cancellation
private volatile bool cancelRequested = false;
private ExecutionState<T> state = null;
// Cancel delegate that was installed on the state before StartCopyStream replaced it; restored in ProcessCompletion.
private Action previousCancellationDelegate = null;
// Used to signal copy completion
// waitHandle is the timeout registration made in StartCopyStream (null when the operation has no expiry time).
private RegisteredWaitHandle waitHandle = null;
private ManualResetEvent completedEvent = new ManualResetEvent(false);
// 0 until completion has been claimed; SignalCompletion flips it to 1 with Interlocked.CompareExchange so completion runs only once.
private int completionProcessed = 0;
/// <summary>
/// Initializes a copier that moves data from <paramref name="src"/> to <paramref name="dest"/>.
/// Two buffers of <paramref name="buffSize"/> bytes are allocated: one to read into and one to write from.
/// </summary>
/// <param name="src">The source stream.</param>
/// <param name="dest">The destination stream.</param>
/// <param name="state">An ExecutionState used to coordinate copy operation.</param>
/// <param name="buffSize">Size of read and write buffers used to move data.</param>
/// <param name="calculateMd5">Boolean value indicating whether the MD-5 should be calculated.</param>
/// <param name="streamCopyState">An object that represents the state for the current operation; may be <c>null</c>.</param>
public AsyncStreamCopier(Stream src, Stream dest, ExecutionState<T> state, int buffSize, bool calculateMd5, StreamDescriptor streamCopyState)
{
    this.src = src;
    this.dest = dest;
    this.state = state;
    this.streamCopyState = streamCopyState;

    this.currentReadBuff = new byte[buffSize];
    this.currentWriteBuff = new byte[buffSize];

    // Only attach a hash object when MD5 was requested and the descriptor does not already carry one.
    bool descriptorNeedsHash = calculateMd5 && streamCopyState != null && streamCopyState.Md5HashRef == null;
    if (descriptorNeedsHash)
    {
        streamCopyState.Md5HashRef = new MD5Wrapper();
    }
}
#endregion
#region Publics
/// <summary>
/// Validates the requested lengths, arms the optional timeout, hooks up cancellation and starts the copy.
/// </summary>
/// <param name="completedDelegate">Callback delegate</param>
/// <param name="copyLength">Number of bytes to copy from source stream to destination stream. Cannot be passed with a value for maxLength.</param>
/// <param name="maxLength">Maximum length of the source stream. Cannot be passed with a value for copyLength.</param>
public void StartCopyStream(Action<ExecutionState<T>> completedDelegate, long? copyLength, long? maxLength)
{
    // The two length constraints are mutually exclusive.
    if (copyLength.HasValue && maxLength.HasValue)
    {
        throw new ArgumentException(SR.StreamLengthMismatch);
    }

    // The length constraints can only be checked up front when the source is seekable.
    if (this.src.CanSeek)
    {
        if (maxLength.HasValue && (this.src.Length - this.src.Position) > maxLength.Value)
        {
            throw new InvalidOperationException(SR.StreamLengthError);
        }

        if (copyLength.HasValue && (this.src.Length - this.src.Position) < copyLength.Value)
        {
            throw new ArgumentOutOfRangeException("copyLength", SR.StreamLengthShortError);
        }
    }

    this.completedDel = completedDelegate;
    this.copyLen = copyLength;
    this.maximumLen = maxLength;

    // When the operation has an expiry time, register a one-shot wait on the completion event;
    // MaximumCopyTimeCallback is invoked with timedOut == true if the wait expires first.
    if (this.state.OperationExpiryTime.HasValue)
    {
        this.waitHandle = ThreadPool.RegisterWaitForSingleObject(
            this.completedEvent,
            AsyncStreamCopier<T>.MaximumCopyTimeCallback,
            this,
            this.state.RemainingTimeout,
            true);
    }

    // Route cancellation to this copier, remembering the previous delegate so it can be restored on completion.
    lock (this.state.CancellationLockerObject)
    {
        this.previousCancellationDelegate = this.state.CancelDelegate;
        this.state.CancelDelegate = this.Abort;
    }

    // A null result means "first run": this dispatches the first read.
    this.EndOpWithCatch(null);
}
/// <summary>
/// Requests cancellation of an ongoing copy operation.
/// Sets the cancellation flag and then aborts as a general cancellation (not a timeout).
/// </summary>
public void Abort()
{
    this.cancelRequested = true;

    // false: this abort is a cancellation rather than a timeout.
    ForceAbort(this, false);
}
/// <summary>
/// Releases the timeout registration and the completion event, and drops the reference to the execution state.
/// To end a copy operation, use Abort().
/// </summary>
public void Dispose()
{
    RegisteredWaitHandle registration = this.waitHandle;
    if (registration != null)
    {
        registration.Unregister(null);
        this.waitHandle = null;
    }

    ManualResetEvent doneEvent = this.completedEvent;
    if (doneEvent != null)
    {
        doneEvent.Close();
        this.completedEvent = null;
    }

    this.state = null;
}
#endregion
#region Privates
/// <summary>
/// Synchronizes Read and Write operations, and handles exceptions.
/// This method is both the initial kick-off (called with <c>null</c> from StartCopyStream) and the
/// callback handed to BeginRead / BeginWrite in EndOperation.
/// </summary>
/// <param name="res">Read/Write operation or <c>null</c> if first run.</param>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Needed to ensure exceptions are not thrown on threadpool thread.")]
private void EndOpWithCatch(IAsyncResult res)
{
// If the last op complete synchronously then ignore this callback as its caller will process next op
// (EndOperation ends synchronously-completed operations inline right after issuing them).
if (res != null && res.CompletedSynchronously)
{
return;
}
// Begins the next operation and stores any exceptions which occur
lock (this.lockerObj)
{
try
{
this.EndOperation(res);
}
catch (Exception ex)
{
// Classify the failure: a timeout takes precedence over a cancellation, and either wraps the
// original exception; otherwise the original exception is stored as-is.
if (this.state.ReqTimedOut)
{
this.exceptionRef = Exceptions.GenerateTimeoutException(this.state.Cmd != null ? this.state.Cmd.CurrentResult : null, ex);
}
else if (this.cancelRequested)
{
this.exceptionRef = Exceptions.GenerateCancellationException(this.state.Cmd != null ? this.state.Cmd.CurrentResult : null, ex);
}
else
{
this.exceptionRef = ex;
}
// if there is an outstanding read/write let it signal completion since we populated the exception.
if (this.readRes == null && this.writeRes == null)
{
this.SignalCompletion();
}
}
}
}
/// <summary>
/// Helper method for EndOpWithCatch(IAsyncResult). Begins/Ends Read and Write Stream operations.
/// Should only be called by EndOpWithCatch(IAsyncResult) since it assumes we are inside the lock.
/// Ends the operation that triggered the callback (if any), then keeps dispatching writes and reads
/// until an operation is left outstanding, the end of the source is reached, or copying must halt.
/// </summary>
/// <param name="res">Read/Write operation or <c>null</c> if first run.</param>
private void EndOperation(IAsyncResult res)
{
// Check who made this callback
if (res != null)
{
// true is read, false is write
// (this is the state object passed to BeginRead / BeginWrite below)
if ((bool)res.AsyncState)
{
// Read callback
this.ProcessEndRead();
}
else
{
// Write callback
this.ProcessEndWrite();
}
}
// While data is remaining and there are no scheduled operations, schedule next write and read
while (!this.ReachedEndOfSrc() && this.readRes == null && this.writeRes == null)
{
// Check if copying should halt
// (length limit exceeded, timeout, cancellation, or an earlier exception)
if (!this.ShouldDispatchNextOperation())
{
this.SignalCompletion();
return;
}
// If read buffer contains unwritten data, swap buffers and set currentWriteCount
if (this.ConsumeReadBuffer() > 0)
{
// Schedule write operation from the last read
this.writeRes = this.dest.BeginWrite(this.currentWriteBuff, 0, this.currentWriteCount, this.EndOpWithCatch, false /* write */);
// If this write completes synchronously, end it here to avoid stack overflow.
// (EndOpWithCatch ignores callbacks for synchronously completed operations.)
if (this.writeRes.CompletedSynchronously)
{
this.ProcessEndWrite();
}
}
// If data needs to be read.
// The read targets currentReadBuff, which after the swap above is not the buffer being written.
int readCount = this.NextReadLength();
if (readCount != 0)
{
// Schedule read operation
this.readRes = this.src.BeginRead(this.currentReadBuff, 0, readCount, this.EndOpWithCatch, true /* read */);
// If this read completes synchronously, end it here to avoid stack overflow.
if (this.readRes.CompletedSynchronously)
{
this.ProcessEndRead();
}
}
else
{
// User requested read end here. Signal end of source.
// (NextReadLength returned 0, i.e. copyLen bytes have already been read.)
this.lastReadCount = 0;
}
}
// If nothing more needs to be read and no write operation is scheduled, we are finished.
if (this.ReachedEndOfSrc() && this.writeRes == null)
{
// The source ended before copyLen bytes were read: report the short stream,
// unless an earlier exception has already been recorded.
if (this.exceptionRef == null && this.copyLen.HasValue && this.NextReadLength() != 0)
{
this.exceptionRef = new ArgumentOutOfRangeException("copyLength", SR.StreamLengthShortError);
}
this.SignalCompletion();
}
}
/// <summary>
/// Callback registered with the thread pool wait in StartCopyStream.
/// Aborts the AsyncStreamCopier operation when the wait ended because of a timeout; does nothing otherwise.
/// </summary>
/// <param name="copier">AsyncStreamCopier operation.</param>
/// <param name="timedOut">True if the timer has timed out, false otherwise.</param>
private static void MaximumCopyTimeCallback(object copier, bool timedOut)
{
    // The wait was satisfied by the completion event: nothing to abort.
    if (!timedOut)
    {
        return;
    }

    AsyncStreamCopier<T>.ForceAbort((AsyncStreamCopier<T>)copier, true);
}
/// <summary>
/// Aborts the AsyncStreamCopier operation.
/// Records a timeout or cancellation exception on the copier and, where compiled in, aborts the
/// underlying request. This method does not itself call SignalCompletion.
/// </summary>
/// <param name="copier">AsyncStreamCopier operation.</param>
/// <param name="timedOut">True if aborted due to a time out, or false for a general cancellation.</param>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Reviewed.")]
private static void ForceAbort(AsyncStreamCopier<T> copier, bool timedOut)
{
// Capture the state once; Dispose() sets copier.state to null, in which case there is nothing to abort.
ExecutionState<T> state = copier.state;
if (state != null)
{
if (state.Req != null)
{
try
{
// Record the reason first so EndOpWithCatch can classify failures raised by the abort.
state.ReqTimedOut = timedOut;
#if WINDOWS_DESKTOP && !WINDOWS_PHONE
state.Req.Abort();
#endif
}
catch (Exception)
{
// no op
// (best effort: a failure while aborting must not escape to the caller)
}
}
copier.exceptionRef = timedOut ?
Exceptions.GenerateTimeoutException(state.Cmd != null ? state.Cmd.CurrentResult : null, null) :
Exceptions.GenerateCancellationException(state.Cmd != null ? state.Cmd.CurrentResult : null, null);
}
}
/// <summary>
/// Terminates and cleans up the AsyncStreamCopier.
/// Only the first call has any effect; later calls return immediately.
/// </summary>
private void SignalCompletion()
{
    // Atomically claim completion; a non-zero previous value means it was already claimed.
    bool alreadyCompleted = Interlocked.CompareExchange(ref this.completionProcessed, 1, 0) != 0;
    if (alreadyCompleted)
    {
        return;
    }

    this.completedEvent.Set();
    this.ProcessCompletion();
}
/// <summary>
/// Helper method for this.SignalCompletion()
/// Should only be called by this.SignalCompletion()
/// Restores the previous cancel delegate, releases stream and buffer references, finalizes the MD5
/// (only when no exception occurred), publishes any exception on the execution state, invokes the
/// completion callback and finally disposes this copier.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Reviewed.")]
private void ProcessCompletion()
{
// Re hookup cancellation delegate
// NOTE(review): StartCopyStream swaps this delegate while holding state.CancellationLockerObject, but the
// restore here is done without that lock - confirm whether that is intentional.
this.state.CancelDelegate = this.previousCancellationDelegate;
#if WINDOWS_PHONE
if ((this.cancelRequested || this.state.ReqTimedOut) && this.state.Req != null)
{
this.state.Req.Abort();
}
#endif
// clear references
this.src = null;
this.dest = null;
this.currentReadBuff = null;
this.currentWriteBuff = null;
// The hash is only finalized for a copy that finished without an exception.
if (this.exceptionRef == null &&
this.streamCopyState != null &&
this.streamCopyState.Md5HashRef != null)
{
try
{
this.streamCopyState.Md5 = this.streamCopyState.Md5HashRef.ComputeHash();
}
catch (Exception)
{
// no op
// (a failure to compute the hash leaves Md5 unset and does not fail the copy)
}
finally
{
this.streamCopyState.Md5HashRef = null;
}
}
// set the exceptionRef on the execution state
if (this.exceptionRef != null)
{
this.state.ExceptionRef = this.exceptionRef;
}
// invoke the caller's callback
// (the field is cleared first so the delegate is not retained by this copier)
Action<ExecutionState<T>> callback = this.completedDel;
this.completedDel = null;
if (callback != null)
{
try
{
callback(this.state);
}
catch (Exception ex)
{
// An exception thrown by the callback is recorded on the state rather than propagated.
this.state.ExceptionRef = ex;
}
}
// and finally clear the reference to the state
this.Dispose();
}
/// <summary>
/// Decides whether another read/write may be dispatched. Records an exception when the source has
/// exceeded its maximum length, the operation has expired, or cancellation was requested on the state.
/// </summary>
/// <returns>True to continue, false to halt.</returns>
private bool ShouldDispatchNextOperation()
{
    bool sourceTooLong = this.maximumLen.HasValue
        && Interlocked.Read(ref this.currentBytesReadFromSource) > this.maximumLen.Value;

    if (sourceTooLong)
    {
        this.exceptionRef = new InvalidOperationException(SR.StreamLengthError);
    }
    else if (this.state.OperationExpiryTime.HasValue && DateTime.Now >= this.state.OperationExpiryTime.Value)
    {
        // NOTE(review): compares against local time; presumably OperationExpiryTime is also produced from DateTime.Now - confirm.
        this.exceptionRef = Exceptions.GenerateTimeoutException(this.state.Cmd != null ? this.state.Cmd.CurrentResult : null, null);
    }
    else if (this.state.CancelRequested)
    {
        this.exceptionRef = Exceptions.GenerateCancellationException(this.state.Cmd != null ? this.state.Cmd.CurrentResult : null, null);
    }

    // Halt when this copier was aborted, or when any exception has been recorded so far
    // (either just above or by an earlier operation).
    if (this.cancelRequested)
    {
        return false;
    }

    return this.exceptionRef == null;
}
/// <summary>
/// Ends the outstanding read operation and updates the AsyncStreamCopier state:
/// the byte count of the read, the running total of bytes read from the source, and the
/// completed-synchronously tracking on the execution state.
/// </summary>
private void ProcessEndRead()
{
// Clear the outstanding-read marker before calling EndRead so that, if EndRead throws,
// no read is considered to be in flight any more.
IAsyncResult lastReadRes = this.readRes;
this.readRes = null;
// A count of 0 is treated as end of source (see ReachedEndOfSrc).
this.lastReadCount = this.src.EndRead(lastReadRes);
Interlocked.Add(ref this.currentBytesReadFromSource, this.lastReadCount);
this.state.UpdateCompletedSynchronously(lastReadRes.CompletedSynchronously);
}
/// <summary>
/// Ends the outstanding write operation and updates the AsyncStreamCopier state.
/// When a stream descriptor is attached, its length is advanced by the bytes just written and,
/// if a hash object is present, those bytes are added to the hash.
/// </summary>
private void ProcessEndWrite()
{
    // Clear the outstanding-write marker before EndWrite, so a throwing EndWrite leaves no write in flight.
    IAsyncResult completedWrite = this.writeRes;
    this.writeRes = null;

    this.dest.EndWrite(completedWrite);
    this.state.UpdateCompletedSynchronously(completedWrite.CompletedSynchronously);

    if (this.streamCopyState == null)
    {
        return;
    }

    this.streamCopyState.Length += this.currentWriteCount;

    if (this.streamCopyState.Md5HashRef == null)
    {
        return;
    }

    this.streamCopyState.Md5HashRef.UpdateHash(this.currentWriteBuff, 0, this.currentWriteCount);
}
/// <summary>
/// If a read operation has completed with data, swaps the read/write buffers and resets their corresponding counts.
/// This must be called inside a lock as it could lead to undefined behavior if multiple unsynchronized callers simultaneously called in.
/// </summary>
/// <returns>Number of bytes to write, or the current non-positive read count when no data is waiting to be written.</returns>
private int ConsumeReadBuffer()
{
    int bytesPending = this.lastReadCount;

    // Nothing to hand over to the writer: report the read count unchanged.
    if (bytesPending <= 0)
    {
        return bytesPending;
    }

    this.currentWriteCount = bytesPending;
    this.lastReadCount = -1;

    // Swapping the buffers saves us a memcopy of the data that was just read.
    byte[] filledBuffer = this.currentReadBuff;
    this.currentReadBuff = this.currentWriteBuff;
    this.currentWriteBuff = filledBuffer;

    return bytesPending;
}
/// <summary>
/// Computes how many bytes the next BeginRead should request: a full buffer, or fewer when a copy
/// length was specified and less than a buffer's worth remains.
/// Should only be called when no outstanding read operations exist.
/// </summary>
/// <returns>Number of bytes to read.</returns>
private int NextReadLength()
{
    int bufferLength = this.currentReadBuff.Length;

    // Without an explicit copy length, always try to fill the whole buffer.
    if (!this.copyLen.HasValue)
    {
        return bufferLength;
    }

    long remaining = this.copyLen.Value - Interlocked.Read(ref this.currentBytesReadFromSource);
    return (int)Math.Min(remaining, bufferLength);
}
/// <summary>
/// Reports whether the source has been exhausted, i.e. the last read count is zero.
/// </summary>
/// <returns>True if at the end, false otherwise.</returns>
private bool ReachedEndOfSrc()
{
    int mostRecentCount = this.lastReadCount;
    return mostRecentCount == 0;
}
/// <summary>
/// Reports whether the read buffer holds data that has not yet been handed to a write,
/// i.e. the last read count is positive.
/// </summary>
/// <returns>True if the read buffer contains data to write, false otherwise.</returns>
private bool ReadBufferFull()
{
    int mostRecentCount = this.lastReadCount;
    return mostRecentCount > 0;
}
#endregion
}
}