Skip to content

Commit

Permalink
Streamed chunks.
Browse files Browse the repository at this point in the history
  • Loading branch information
JonHanna committed Jul 12, 2015
1 parent ac99ae9 commit fed4174
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1759,7 +1759,8 @@ public static void MatchSequencePattern()
"AsEnumerable",
"ToList",
"Fold",
"LeftJoin"
"LeftJoin",
"ToChunkedStream"
}
).ToList();

Expand Down
77 changes: 77 additions & 0 deletions src/System.Linq/src/System/Linq/Enumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2830,6 +2830,83 @@ public static IEnumerable<TSource> ExceptByIterator<TSource, TKey>(IEnumerable<T
if (set.Add(keySelector(item))) yield return item;
}
}

public static IEnumerable<IEnumerable<TSource>> ToChunkedStream<TSource>(this IEnumerable<TSource> source, int size)
{
if (source == null) throw Error.ArgumentNull("source");
if (size < 1) throw Error.ArgumentOutOfRange("size");
return ChunkStreamIterator(source, size);
}

private static IEnumerable<IEnumerable<TSource>> ChunkStreamIterator<TSource>(IEnumerable<TSource> source, int size)
{
using (IEnumerator<TSource> e = source.GetEnumerator())
{
if (e.MoveNext())
{
StreamedChunk<TSource> chunk;
do
{
chunk = new StreamedChunk<TSource>(e, size);
yield return chunk;
chunk.Dispose();
} while (!chunk.FinishedEnumerator);
}
}
}

private sealed class StreamedChunk<TSource> : IEnumerable<TSource>, IDisposable
{
private IEnumerator<TSource> _sourceEnumerator;
private readonly int _size;
private bool _enumerated;
private int _sent = 0;
public bool FinishedEnumerator;

public StreamedChunk(IEnumerator<TSource> sourceEnumerator, int size)
{
_sourceEnumerator = sourceEnumerator;
_size = size;
}

public IEnumerator<TSource> GetEnumerator()
{
// FIXME: Make localisable
if (_enumerated) throw new InvalidOperationException("Streamed chunks can only be enumerated once and only before the next is received.");
_enumerated = true;
while (_sent < _size)
{
yield return _sourceEnumerator.Current;
if (!_sourceEnumerator.MoveNext())
{
FinishedEnumerator = true;
_sent = _size;
yield break;
}
++_sent;
}
}

public void Dispose()
{
_enumerated = true;
while (_sent < _size)
{
if (_sourceEnumerator.MoveNext()) ++_sent;
else
{
FinishedEnumerator = true;
break;
}
}
_sent = _size;
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
}

internal class IdentityFunction<TElement>
Expand Down
18 changes: 18 additions & 0 deletions src/System.Linq/tests/EnumerableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,24 @@ public void ExceptBy()
Assert.Throws<ArgumentNullException>(() => Enumerable.Range(0, 1).ExceptBy(null, i => i));
Assert.Throws<ArgumentNullException>(() => Enumerable.Range(0, 1).ExceptBy(Enumerable.Range(0, 1), default(Func<int, int>)));
}
[Fact]
public void ToChunkedStream()
{
Assert.Equal(3, Enumerable.Range(0, 12).ToChunkedStream(5).Count());
Assert.Equal(3, Enumerable.Range(0, 15).ToChunkedStream(5).Count());
Assert.Equal(4, Enumerable.Range(0, 16).ToChunkedStream(5).Count());
Assert.Equal(new[] { 5, 5, 2 }, Enumerable.Range(0, 12).ToChunkedStream(5).Select(c => c.Count()));
Assert.Equal(Enumerable.Range(0, 12), Enumerable.Range(0, 12).ToChunkedStream(7).SelectMany(g => g));
Assert.Throws<ArgumentOutOfRangeException>(() => Enumerable.Range(0, 3).ToChunkedStream(0));
Assert.Throws<ArgumentNullException>(() => default(IEnumerable<int>).ToChunkedStream(10));
Assert.Equal(3, Enumerable.Range(0, 12).ToList().ToChunkedStream(5).Count());
Assert.Equal(3, Enumerable.Range(0, 15).ToList().ToChunkedStream(5).Count());
Assert.Equal(4, Enumerable.Range(0, 16).ToList().ToChunkedStream(5).Count());
Assert.Equal(new[] { 5, 5, 2 }, Enumerable.Range(0, 12).ToList().ToChunkedStream(5).Select(c => c.Count()));
Assert.Equal(Enumerable.Range(0, 12), Enumerable.Range(0, 12).ToList().ToChunkedStream(7).SelectMany(g => g));
Assert.Throws<ArgumentOutOfRangeException>(() => Enumerable.Range(0, 3).ToList().ToChunkedStream(0));
Assert.Throws<InvalidOperationException>(() => Enumerable.Range(0, 10).ToChunkedStream(3).ToList()[0].First());
}
#pragma warning restore 1720 // Triggered on purpose to test exception.
}
}
Expand Down

0 comments on commit fed4174

Please sign in to comment.