
Question about batching streams that involve asynchronous steps #795

Closed
atombender opened this issue Jun 30, 2015 · 5 comments

Comments

@atombender

I am processing a stream of data, usually pulled from RxNode.fromReadableStream(), where an intermediate step is asynchronous and may only run with a limited concurrency, while the output step must run serially, in discrete batches.

Basically a classic transformation pipeline:

cat file | parallel -P5 transform | xargs -n1000 process

With promises it would be something like (extremely simplified):

lines.on('readable', function read() {
  let line;
  while ((line = lines.read()) !== null) {
    queue.push(line);
    if (queue.length >= 10) {
      let batch = queue.splice(0);  // drain the queue for this batch
      return Promise.map(batch, item => someAsyncTransformations(item), {concurrency: 5})
        .then(items => someFinalAsyncProcessing(items))
        .then(read);
    }
  }
});

Here's what I have so far:

// Processing function, returns promise
let processBatch = items => { ... };

// Transform function, returns promise
let transform = item => { ... };

let lines = RxNode.fromReadableStream(lineStream);

let transforms = lines
  .map(item => Rx.Observable.defer(() => transform(item)))
  .merge(5);

let processing = transforms.controlled();
processing
  .bufferWithCount(batchSize)
  .map(items => {
    return Rx.Observable.defer(() => {
      return processBatch(items).then(() => 
        processing.request(batchSize));
    });
  })
  .concatAll()
  .subscribe(...);

processing.request(batchSize);  // Request first batch

I'm using controlled() to make sure the map step effectively pauses processing until it's done. It looks really ugly, though. Isn't there a better, non-manual way to implement lossless backpressure, where the next receiver won't receive data if it's full?

By inserting the controller after the transforms, I can ensure that the transforms execute independently, giving me pipelining. However, I also need to enforce queueing here, and I haven't found a solution to this. Basically, I want the `transform()` pipeline to work continuously at maximum concurrency, up to N items at a time, pumping these out to `processBatch()` as soon as they're done. With the above code, it will race through the entire file, filling up memory with all the transformed data.
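
For reference, here's roughly how I understand the controlled()/request() handshake I'm relying on; a minimal standalone sketch, not my actual pipeline:

const Rx = require("rx");

// A controlled observable buffers values from its source and only emits as
// many items as the consumer has explicitly requested.
let source = Rx.Observable.range(0, 100).controlled();

source.subscribe(x => console.log("got", x));

// Nothing is emitted until it's requested; this releases 0..9 and then
// pauses until the next request() call.
source.request(10);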

@atombender
Author

So far I've ended up with this monstrosity, which waits (by polling) while the pipe is full, and tracks the queue size in the step after the one that needs to be throttled. It feels very wrong to me.

// Processing function, returns promise
let processBatch = items => { ... };

// Transform function, returns promise
let transform = item => { ... };

const maxTransformQueueSize = 1000;

const bufferSize = 5000;
const batchSize = 100;

let transformQueueSize = 0;

let lines = RxNode.fromReadableStream(lineStream);

let transformController = lines.controlled();
let transforms = transformController
  .bufferWithCount(bufferSize)
  .concatMap(items => {
    if (items.length > 0) {
      transformQueueSize += items.length;
      return Rx.Observable.defer(() => new Promise((resolve, reject) => {
        (function wait() {
          if (transformQueueSize > maxTransformQueueSize) {
            setTimeout(wait, 300);
          } else {
            resolve();
          }
        })();
      })
        .then(() => Promise.map(items, transform))
        .then(newItems => {
          transformController.request(bufferSize);
          return newItems;
        }));
    } else {
      return Rx.Observable.empty();
    }
  })
  .flatMap(items => Rx.Observable.fromArray(items));

let processingController = transforms.controlled();
let processing = processingController
  .bufferWithCount(batchSize)
  .concatMap(items => {
    transformQueueSize -= items.length;
    return Rx.Observable.defer(() =>
      processBatch(items).then(() => {
        processingController.request(batchSize);
        return items;
      }));
  });
transformController.request(bufferSize);  // Kick off the first read
processingController.request(batchSize);

processing.subscribe(...);

@trxcllnt
Contributor

trxcllnt commented Jul 1, 2015

@atombender here, give this a shot. Let me know if you have any questions.

const Rx = require("rx");
// const RxNode = require("rx-node");
const Observable = Rx.Observable;

const processBatch = (items, index) => {
    return Observable.timer(100)
        .map(items)
        .tap(console.log.bind(console, ">>> batch process", index + ":"));
};

const transform = (item) => {
    return Observable.timer(10).map(item)
        // .tap(console.log.bind(console, "line transform:"));
};

// Number of lines to pass to "processBatch"
const linesPerBatch = 10;
// Number of lines to pass to "transform"
const linesPerTransform = 100;
// Number of lines to read from the file system before awaiting async line operations.
const linesToReadPerRequest = 500;
// Number of batches we wait to finish until reading another huge bunch of lines.
const batchRequestThreshold = Math.floor(linesToReadPerRequest / linesPerBatch);
// Number of concurrent line buffer transformations
const maxConcurrentLineTransformations = Math.floor(linesToReadPerRequest / linesPerTransform);

// const lines = RxNode.fromReadableStream(lineStream);
// use a "stateful" range instead of a file-stream for demo
const lines = createStatefulRange(0, 1000);
const linesController = lines.controlled();

const transformedLines = linesController
    .bufferWithCount(linesPerTransform)
    .map(items => Observable.forkJoin(items.map(transform)))
    .merge(maxConcurrentLineTransformations)
    .flatMap((newItems, index) => {
        console.log("--- line transform group", index + 1, "finished");
        return Observable.from(newItems)
    });

const batchLineTransformations = Observable.using(
        () => {
            console.log("requesting first", linesToReadPerRequest, "lines");
            return linesController.request(linesToReadPerRequest)
        },
        () => transformedLines
    )
    .bufferWithCount(linesPerBatch)
    .concatMap(processBatch)
    .flatMap((processedItems, index) => {
        if((index + 1) % batchRequestThreshold === 0) {
            return Observable.using(
                () => {
                    console.log("requesting", linesToReadPerRequest, "more lines");
                    return linesController.request(linesToReadPerRequest)
                },
                () => Observable.from(processedItems)
            );
        }
        return Observable.from(processedItems);
    });

batchLineTransformations.count().subscribe(function(total) {
    console.log("read and batch-transformed", total, "lines");
});

// Creates a stateful range Observable to mimic a readable file stream.
function createStatefulRange(start, end, scheduler) {
    scheduler || (scheduler = Rx.Scheduler.currentThread);
    var index = start;
    return Observable.create(function(observer) {
        return scheduler.scheduleRecursiveWithState(index, function loop(i, recurse) {
            if(i < end) {
                observer.onNext(index = i);
                recurse(i + 1);
            } else {
                observer.onCompleted();
            }
        });
    });
}

@mattpodwysocki
Member

@atombender since there's been no response, I'll close this for now. I think @trxcllnt answered this nicely.

@loganvolkers

It seems like a lot has changed since September, and this fantastic example no longer works with the latest version of RxJS v4. (http://codepen.io/anon/pen/mPQGyL)

Any chance @trxcllnt would be willing to update this example for the new API?

@trxcllnt
Contributor

trxcllnt commented May 3, 2016

@loganvolkers from what I can tell, the only thing that's different is that `scheduleRecursiveWithState` has now become `scheduleRecursive`.
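
If that's the only change, the demo helper would look something like this (untested sketch; everything else in the example above should stay the same):

// Same stateful range helper as above, with scheduleRecursiveWithState
// renamed to scheduleRecursive.
function createStatefulRange(start, end, scheduler) {
    scheduler || (scheduler = Rx.Scheduler.currentThread);
    var index = start;
    return Rx.Observable.create(function(observer) {
        return scheduler.scheduleRecursive(index, function loop(i, recurse) {
            if(i < end) {
                observer.onNext(index = i);
                recurse(i + 1);
            } else {
                observer.onCompleted();
            }
        });
    });
}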
