Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
frankpepermans committed Jul 23, 2019
2 parents d8643f6 + 85f2e62 commit 27f1109
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 11 deletions.
2 changes: 1 addition & 1 deletion example/example.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void main(List<String> arguments) {

// seed value: this value will be used as the
// starting value for the [scan] method
const seed = const IndexedPair(1, 1, 0);
const seed = IndexedPair(1, 1, 0);

Observable
// amount of numbers to compute
Expand Down
2 changes: 1 addition & 1 deletion example/web/konami/konamicode.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import 'package:rxdart/rxdart.dart';
// Side note: To maintain readability, this example was not formatted using dart_fmt.

void main() {
const konamiKeyCodes = const <int>[
const konamiKeyCodes = <int>[
KeyCode.UP,
KeyCode.UP,
KeyCode.DOWN,
Expand Down
36 changes: 27 additions & 9 deletions lib/src/transformers/scan.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,35 @@ class ScanStreamTransformer<T, S> extends StreamTransformerBase<T, S> {
static StreamTransformer<T, S> _buildTransformer<T, S>(
S accumulator(S accumulated, T value, int index),
[S seed]) {
var index = 0;
var acc = seed;
return StreamTransformer<T, S>((input, bool cancelOnError) {
var index = 0;
var acc = seed;
StreamController<S> controller;
StreamSubscription<T> subscription;

return StreamTransformer<T, S>.fromHandlers(
handleData: (T data, EventSink<S> sink) {
acc = accumulator(acc, data, index++);
controller = StreamController<S>(
sync: true,
onListen: () {
subscription = input.listen((value) {
try {
acc = accumulator(acc, value, index++);

sink.add(acc);
},
handleError: (Object error, StackTrace s, EventSink<S> sink) =>
sink.addError(error));
controller.add(acc);
} catch (e, s) {
controller.addError(e, s);
}
},
onError: controller.addError,
onDone: controller.close,
cancelOnError: cancelOnError);
},
onPause: ([Future<dynamic> resumeSignal]) =>
subscription.pause(resumeSignal),
onResume: () => subscription.resume(),
onCancel: () => subscription.cancel());

return controller.stream.listen(null);
});
}
}

Expand Down

0 comments on commit 27f1109

Please sign in to comment.