-
Notifications
You must be signed in to change notification settings - Fork 0
/
BufferedStream.cs
500 lines (463 loc) · 21.4 KB
/
BufferedStream.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
/*============================================================
**
** Class: BufferedStream
**
** <OWNER>[....]</OWNER>
**
** Purpose: A composable Stream that buffers reads & writes
** to the underlying stream.
**
**
===========================================================*/
using System;
using System.IO;
using System.Runtime.InteropServices;
using System.Globalization;
using System.Diagnostics.Contracts;
// Code copied from http://www.dotnetframework.org/default.aspx/4@0/4@0/untmp/DEVDIV_TFS/Dev10/Releases/RTMRel/ndp/clr/src/BCL/System/IO/BufferedStream@cs/1305376/BufferedStream@cs
//namespace System.IO
namespace OpenMcdf
{
// This is a somewhat complex but efficient implementation. The biggest
// design goal here is to prevent the buffer from getting in the way and slowing
// down IO accesses when it isn't needed. If you always read & write for sizes
// greater than or equal to the internal buffer size, then this class may not
// even allocate the internal buffer. Secondly, it buffers reads & writes in a
// shared buffer. (If you maintained two buffers separately, one operation would
// always trash the other buffer anyway, so we might as well use one buffer.)
// The assumption here is you will almost always be doing a series of reads or
// writes, but rarely alternate between the two of them on the same stream.
//
// This is adaptive buffering code. Making the pathological cases better would
// have avoided memcpy's by introducing more disk writes (which are roughly three
// orders of magnitude slower), so the copy-into-buffer strategy was kept.
//
// Possible future perf optimization:
// Consider a 4K buffer and three write requests, one for 3K, one for 2K, then
// another for 3K in that order. In the current implementation, we will copy
// the first 3K into our buffer, copy the first 1K of the 2K second write into
// our buffer, write that 4K to disk, write the remaining 1K from the second
// write to the disk, then copy the following 3K buffer to our internal buffer.
// If we then closed the file, we would have to write the remaining 3K to disk.
// We could possibly avoid a disk write by buffering the second half of the 2K
// write. There is a threshold where we won't be winning anything (in fact, we'd
// be taking an extra memcpy): if we have buffered data plus data to write that
// is bigger than 2x our buffer size, we're better off writing our buffered data
// to disk and then our given byte[] to avoid a memcpy. For smaller amounts it
// might be better to copy any spilled-over data into our internal buffer.
// The same reasoning applies to Read.
//
// Class Invariants:
// The class has one buffer, shared for reading & writing. It can only be
// used for one or the other at any point in time - not both. The following
// should be true:
//   0 <= _readPos <= _readLen <= _bufferSize
//   0 <= _writePos <= _bufferSize
//   _readPos == _readLen && _readPos > 0 implies the read buffer is valid,
//     but we're at the end of the buffer.
//   _readPos == _readLen == 0 means the read buffer contains garbage.
//   Either _writePos can be greater than 0, or _readLen & _readPos can be
//     greater than zero, but neither can be greater than zero at the same time.
//
// Differences from the BCL source this copy was adapted from:
//   * Using the stream after Dispose throws ObjectDisposedException (the
//     BCL-internal __Error helpers are not available outside mscorlib).
//   * Argument validation is performed with plain messages instead of BCL
//     resource strings.
//   * Flush keeps unread buffered data when the underlying stream cannot seek
//     (rewinding is impossible, so discarding it would silently lose bytes), and
//     forwards Flush to a writable underlying stream.
[ComVisible(true)]
public sealed class BufferedStream : Stream
{
    private Stream _s;                  // Underlying stream. Dispose sets _s to null.
    private byte[] _buffer;             // Shared read/write buffer. Allocated on first use.
    private int _readPos;               // Read pointer within shared buffer.
    private int _readLen;               // Number of bytes read in buffer from _s.
    private int _writePos;              // Write pointer within shared buffer.
    private readonly int _bufferSize;   // Length of internal buffer, if it's allocated.

    private const int _DefaultBufferSize = 4096;

    private BufferedStream() { }

    /// <summary>
    /// Wraps <paramref name="stream"/> with a 4096-byte shared read/write buffer.
    /// </summary>
    /// <param name="stream">The stream to buffer.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public BufferedStream(Stream stream)
        : this(stream, _DefaultBufferSize)
    {
    }

    /// <summary>
    /// Wraps <paramref name="stream"/> with a shared read/write buffer of
    /// <paramref name="bufferSize"/> bytes. The buffer itself is allocated lazily.
    /// </summary>
    /// <param name="stream">The stream to buffer.</param>
    /// <param name="bufferSize">Size of the internal buffer; must be positive.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is zero or negative.</exception>
    public BufferedStream(Stream stream, int bufferSize)
    {
        if (stream == null)
            throw new ArgumentNullException("stream");
        // A zero-sized buffer breaks WriteByte (it indexes _buffer[0]) and a
        // negative one fails at allocation time, so reject both up front.
        if (bufferSize <= 0)
            throw new ArgumentOutOfRangeException("bufferSize", "Positive number required.");
        Contract.EndContractBlock();

        _s = stream;
        _bufferSize = bufferSize;
        // _buffer is allocated on its first use - it will not be used at all if
        // all reads & writes are greater than or equal to the buffer size.
    }

    /// <summary>True while the stream is open and the underlying stream can read.</summary>
    public override bool CanRead
    {
        [Pure]
        get { return _s != null && _s.CanRead; }
    }

    /// <summary>True while the stream is open and the underlying stream can write.</summary>
    public override bool CanWrite
    {
        [Pure]
        get { return _s != null && _s.CanWrite; }
    }

    /// <summary>True while the stream is open and the underlying stream can seek.</summary>
    public override bool CanSeek
    {
        [Pure]
        get { return _s != null && _s.CanSeek; }
    }

    /// <summary>
    /// Length of the underlying stream. Pending buffered writes are flushed first
    /// so the reported length includes them.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    public override long Length
    {
        get
        {
            EnsureNotClosed();
            if (_writePos > 0) FlushWrite();
            return _s.Length;
        }
    }

    /// <summary>
    /// Logical position as seen by the caller. The underlying stream sits past any
    /// unread buffered data, or before any pending buffered writes, so its
    /// position is corrected by the buffer offsets.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="ArgumentOutOfRangeException">Setting a negative position.</exception>
    public override long Position
    {
        get
        {
            EnsureNotClosed();
            return _s.Position + (_readPos - _readLen + _writePos);
        }
        set
        {
            if (value < 0)
                throw new ArgumentOutOfRangeException("value", "Non-negative number required.");
            Contract.EndContractBlock();
            EnsureNotClosed();

            if (_writePos > 0) FlushWrite();
            // Any read buffer is discarded; the absolute seek below makes it stale.
            _readPos = 0;
            _readLen = 0;
            _s.Seek(value, SeekOrigin.Begin);
        }
    }

    /// <summary>
    /// Flushes buffered data and closes the underlying stream (when
    /// <paramref name="disposing"/> is true), then drops references to the
    /// underlying stream and the buffer. The underlying stream is closed even if
    /// the flush throws.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing && _s != null)
            {
                try
                {
                    Flush();
                }
                finally
                {
                    _s.Close();
                }
            }
        }
        finally
        {
            _s = null;
            _buffer = null;
            // Call base.Dispose(bool) to cleanup async IO resources
            base.Dispose(disposing);
        }
    }

    /// <summary>
    /// Writes pending buffered data to the underlying stream and flushes it.
    /// If unread data is buffered and the underlying stream can seek, the
    /// underlying stream is rewound to the logical position and the buffer is
    /// discarded. If it cannot seek, the unread data is kept so it is not lost.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    public override void Flush()
    {
        EnsureNotClosed();

        if (_writePos > 0)
        {
            // FlushWrite also flushes _s; by invariant the read buffer is empty.
            FlushWrite();
            return;
        }

        if (_readPos < _readLen)
        {
            // The underlying stream is ahead of our logical position and cannot be
            // rewound, so dropping the buffer would lose bytes. Keep it instead.
            if (!_s.CanSeek)
                return;
            FlushRead();
        }

        // Forward the flush so chained writable streams observe it as well.
        if (_s.CanWrite)
            _s.Flush();

        _readPos = 0;
        _readLen = 0;
    }

    // Throws ObjectDisposedException once Dispose has released the underlying
    // stream, instead of letting members fail with NullReferenceException on _s.
    private void EnsureNotClosed()
    {
        if (_s == null)
            throw new ObjectDisposedException(null, "Cannot access a closed Stream.");
    }

    // Shared argument checks for Read/Write, matching the Stream contract.
    private static void CheckBufferArguments(byte[] array, int offset, int count)
    {
        if (array == null)
            throw new ArgumentNullException("array", "Buffer cannot be null.");
        if (offset < 0)
            throw new ArgumentOutOfRangeException("offset", "Non-negative number required.");
        if (count < 0)
            throw new ArgumentOutOfRangeException("count", "Non-negative number required.");
        if (array.Length - offset < count)
            throw new ArgumentException("Offset and length were out of bounds for the array or count is greater than the number of elements from index to the end of the source collection.");
    }

    // Reading is done by blocks from the file, but someone could read
    // 1 byte from the buffer then write. At that point, the underlying stream's
    // position is out of sync with this stream's position. All write
    // functions should call this function to preserve the position in the file.
    // Requires a seekable underlying stream whenever unread data is buffered.
    private void FlushRead()
    {
        Contract.Assert(_writePos == 0, "BufferedStream: Write buffer must be empty in FlushRead!");
        if (_readPos - _readLen != 0)
            _s.Seek(_readPos - _readLen, SeekOrigin.Current);
        _readPos = 0;
        _readLen = 0;
    }

    // Writes are buffered. Anytime the buffer fills up
    // (_writePos + delta > _bufferSize) or the buffer switches to reading
    // and there is dirty data (_writePos > 0), this function must be called.
    // Writes the pending bytes to _s and then flushes _s.
    private void FlushWrite()
    {
        Contract.Assert(_readPos == 0 && _readLen == 0, "BufferedStream: Read buffer must be empty in FlushWrite!");
        _s.Write(_buffer, 0, _writePos);
        _writePos = 0;
        _s.Flush();
    }

    /// <summary>
    /// Reads up to <paramref name="count"/> bytes into <paramref name="array"/>
    /// starting at <paramref name="offset"/>. Requests at least as large as the
    /// buffer bypass it when it is empty; otherwise data is served from the
    /// buffer, topped up by one direct read of the underlying stream if the
    /// buffer runs out.
    /// </summary>
    /// <returns>Number of bytes read; 0 at end of stream.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="array"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range exceeds <paramref name="array"/>.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="NotSupportedException">The underlying stream cannot read.</exception>
    [System.Security.SecuritySafeCritical]  // auto-generated
    public override int Read([In, Out] byte[] array, int offset, int count)
    {
        CheckBufferArguments(array, offset, count);
        Contract.EndContractBlock();
        EnsureNotClosed();

        int n = _readLen - _readPos;
        // if the read buffer is empty, read into either user's array or our
        // buffer, depending on number of bytes user asked for and buffer size.
        if (n == 0)
        {
            if (!_s.CanRead) throw new NotSupportedException("Stream does not support reading.");
            if (_writePos > 0) FlushWrite();
            if (count >= _bufferSize)
            {
                n = _s.Read(array, offset, count);
                // Throw away read buffer.
                _readPos = 0;
                _readLen = 0;
                return n;
            }
            if (_buffer == null) _buffer = new byte[_bufferSize];
            n = _s.Read(_buffer, 0, _bufferSize);
            if (n == 0) return 0;
            _readPos = 0;
            _readLen = n;
        }
        // Now copy min of count or numBytesAvailable (ie, near EOF) to array.
        if (n > count) n = count;
        Buffer.BlockCopy(_buffer, _readPos, array, offset, n);
        _readPos += n;

        // We may have read less than the number of bytes the user asked for,
        // which the Stream contract allows. Reading again can block on devices
        // with no clear end of file (serial ports, pipes) and has caused
        // cross-process deadlocks with redirected standard output, but removing
        // it would be a breaking change. So if we hit the end of the buffer and
        // didn't have enough bytes, read some more from the underlying stream.
        if (n < count)
        {
            int moreBytesRead = _s.Read(array, offset + n, count - n);
            n += moreBytesRead;
            _readPos = 0;
            _readLen = 0;
        }
        return n;
    }

    /// <summary>
    /// Reads one byte, refilling the buffer from the underlying stream when it
    /// is exhausted.
    /// </summary>
    /// <returns>The byte cast to an int, or -1 at end of stream.</returns>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="NotSupportedException">A refill is needed and the underlying stream cannot read.</exception>
    public override int ReadByte()
    {
        EnsureNotClosed();

        if (_readPos == _readLen)
        {
            if (!_s.CanRead) throw new NotSupportedException("Stream does not support reading.");
            if (_writePos > 0) FlushWrite();
            if (_buffer == null) _buffer = new byte[_bufferSize];
            _readLen = _s.Read(_buffer, 0, _bufferSize);
            _readPos = 0;
        }
        if (_readPos == _readLen) return -1;
        return _buffer[_readPos++];
    }

    /// <summary>
    /// Writes <paramref name="count"/> bytes from <paramref name="array"/>
    /// starting at <paramref name="offset"/>. Small writes are collected in the
    /// buffer; writes at least as large as the buffer go straight to the
    /// underlying stream once the buffer has been emptied.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="array"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range exceeds <paramref name="array"/>.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="NotSupportedException">The underlying stream cannot write.</exception>
    [System.Security.SecuritySafeCritical]  // auto-generated
    public override void Write(byte[] array, int offset, int count)
    {
        CheckBufferArguments(array, offset, count);
        Contract.EndContractBlock();
        EnsureNotClosed();

        if (_writePos == 0)
        {
            // Ensure we can write to the stream, and ready buffer for writing.
            if (!_s.CanWrite) throw new NotSupportedException("Stream does not support writing.");
            if (_readPos < _readLen)
                FlushRead();
            else
            {
                _readPos = 0;
                _readLen = 0;
            }
        }

        // If our buffer has data in it, copy data from the user's array into
        // the buffer, and if we can fit it all there, return. Otherwise, write
        // the buffer to disk and copy any remaining data into our buffer.
        // The assumption here is memcpy is cheaper than disk (or net) IO.
        // (10 milliseconds to disk vs. ~20-30 microseconds for a 4K memcpy)
        // So the extra copying will reduce the total number of writes, in
        // non-pathological cases (ie, write 1 byte, then write for the buffer
        // size repeatedly)
        if (_writePos > 0)
        {
            int numBytes = _bufferSize - _writePos;   // space left in buffer
            if (numBytes > 0)
            {
                if (numBytes > count)
                    numBytes = count;
                Buffer.BlockCopy(array, offset, _buffer, _writePos, numBytes);
                _writePos += numBytes;
                if (count == numBytes) return;
                offset += numBytes;
                count -= numBytes;
            }
            // Reset our buffer. We essentially want to call FlushWrite
            // without calling Flush on the underlying Stream.
            _s.Write(_buffer, 0, _writePos);
            _writePos = 0;
        }

        // If the buffer would slow writes down, avoid buffer completely.
        if (count >= _bufferSize)
        {
            Contract.Assert(_writePos == 0, "BufferedStream cannot have buffered data to write here!  Your stream will be corrupted.");
            _s.Write(array, offset, count);
            return;
        }
        else if (count == 0)
            return;  // Don't allocate a buffer then call memcpy for 0 bytes.

        if (_buffer == null) _buffer = new byte[_bufferSize];
        // Copy remaining bytes into buffer, to write at a later date.
        Buffer.BlockCopy(array, offset, _buffer, 0, count);
        _writePos = count;
    }

    /// <summary>
    /// Appends one byte to the write buffer, writing the buffer out (and
    /// flushing the underlying stream) first if it is full.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="NotSupportedException">The underlying stream cannot write.</exception>
    public override void WriteByte(byte value)
    {
        EnsureNotClosed();

        if (_writePos == 0)
        {
            if (!_s.CanWrite) throw new NotSupportedException("Stream does not support writing.");
            if (_readPos < _readLen)
                FlushRead();
            else
            {
                _readPos = 0;
                _readLen = 0;
            }
            if (_buffer == null) _buffer = new byte[_bufferSize];
        }
        if (_writePos == _bufferSize)
            FlushWrite();

        _buffer[_writePos++] = value;
    }

    /// <summary>
    /// Seeks the underlying stream, flushing pending writes first. When the
    /// target lies inside the current read buffer, the remaining buffered bytes
    /// are kept (shifted to the front of the buffer) instead of re-read.
    /// </summary>
    /// <returns>The new logical position.</returns>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    [System.Security.SecuritySafeCritical]  // auto-generated
    public override long Seek(long offset, SeekOrigin origin)
    {
        EnsureNotClosed();

        // If we've got bytes in our buffer to write, write them out.
        // If we've read in and consumed some bytes, we'll have to adjust
        // our seek positions ONLY IF we're seeking relative to the current
        // position in the stream.
        Contract.Assert(_readPos <= _readLen, "_readPos <= _readLen");
        if (_writePos > 0)
        {
            FlushWrite();
        }
        else if (origin == SeekOrigin.Current)
        {
            // Don't call FlushRead here, which would have caused an infinite
            // loop.  Simply adjust the seek origin.  This isn't necessary
            // if we're seeking relative to the beginning or end of the stream.
            Contract.Assert(_readLen - _readPos >= 0, "_readLen (" + _readLen + ") - _readPos (" + _readPos + ") >= 0");
            offset -= (_readLen - _readPos);
        }

        long oldPos = _s.Position + (_readPos - _readLen);
        long pos = _s.Seek(offset, origin);

        // We now must update the read buffer.  We can in some cases simply
        // update _readPos within the buffer, copy around the buffer so our
        // Position property is still correct, and avoid having to do more
        // reads from the disk.  Otherwise, discard the buffer's contents.
        if (_readLen > 0)
        {
            // We can optimize the following condition:
            // oldPos - _readPos <= pos < oldPos + _readLen - _readPos
            if (oldPos == pos)
            {
                if (_readPos > 0)
                {
                    Buffer.BlockCopy(_buffer, _readPos, _buffer, 0, _readLen - _readPos);
                    _readLen -= _readPos;
                    _readPos = 0;
                }
                // If we still have buffered data, we must update the stream's
                // position so our Position property is correct.
                if (_readLen > 0)
                    _s.Seek(_readLen, SeekOrigin.Current);
            }
            else if (oldPos - _readPos < pos && pos < oldPos + _readLen - _readPos)
            {
                int diff = (int)(pos - oldPos);
                Buffer.BlockCopy(_buffer, _readPos + diff, _buffer, 0, _readLen - (_readPos + diff));
                _readLen -= (_readPos + diff);
                _readPos = 0;
                if (_readLen > 0)
                    _s.Seek(_readLen, SeekOrigin.Current);
            }
            else
            {
                // Lose the read buffer.
                _readPos = 0;
                _readLen = 0;
            }
            Contract.Assert(_readLen >= 0 && _readPos <= _readLen, "_readLen should be nonnegative, and _readPos should be less than or equal _readLen");
            Contract.Assert(pos == Position, "Seek optimization: pos != Position!  Buffer math was mangled.");
        }
        return pos;
    }

    /// <summary>
    /// Flushes pending writes, rewinds past any unread buffered data, discards
    /// the read buffer, and sets the underlying stream's length.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is negative.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    public override void SetLength(long value)
    {
        if (value < 0)
            throw new ArgumentOutOfRangeException("value", "Non-negative number required.");
        Contract.EndContractBlock();
        EnsureNotClosed();

        if (_writePos > 0)
        {
            FlushWrite();
        }
        else if (_readPos < _readLen)
        {
            FlushRead();
        }
        _readPos = 0;
        _readLen = 0;

        _s.SetLength(value);
    }
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.