Utility buffer class for stream transformers #1097

Open
morganbarrett opened this issue Dec 18, 2020 · 3 comments

morganbarrett commented Dec 18, 2020

I was hoping to start a discussion about the class BufferTransformer that I've just mocked up. Is this an antipattern? Could this be a useful helper class in the spec?

BufferTransformer

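class Lock {
  // Resolves the promise handed out by the most recent close() call
  open: () => void = () => {};

  // Returns a promise that stays pending until open() is called
  close(): Promise<void> {
    return new Promise<void>(resolve => {
      this.open = resolve;
    });
  }
}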

class BufferTransformer<I, O> implements Transformer<I | I[], O> {
  private buffer: I[] = [];
  private lock: Lock = new Lock();
  private terminator: I;
  private ended = false;

  constructor(terminator: I){
    this.terminator = terminator;
  }

  transform(chunk: I | I[]){
    // concat() appends a single item or every item of an array
    this.buffer = this.buffer.concat(chunk);
    this.lock.open();
  }

  flush(){
    this.ended = true;
    this.lock.open();
  }

  // Waits until index i is buffered or the input has ended
  private async has(i: number): Promise<boolean> {
    while(this.buffer.length <= i && !this.ended){
      await this.lock.close();
    }
    return this.buffer.length > i;
  }

  async read(): Promise<I> {
    return (await this.has(0)) ? this.buffer.shift()! : this.terminator;
  }

  async peek(i: number): Promise<I> {
    return (await this.has(i)) ? this.buffer[i] : this.terminator;
  }

  reconsume(i: I){
    this.buffer.unshift(i);
  }
}

An example of its usage is shown below. It takes a stream of characters, reads an unknown number of them, and enqueues only one Token. This seems to be the opposite of how I've seen transformers used, where a fixed buffer of data is given to the transformer and an unknown number of things are then enqueued.

TokenizerTransformer

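class TokenizerTransformer extends BufferTransformer<string, Token> {
  private done: Promise<void> = Promise.resolve();

  constructor(){
    super("");
  }

  start(controller: TransformStreamDefaultController<Token>){
    // Deliberately not awaited: the stream delivers no chunks to transform()
    // until start() settles, so returning this promise would deadlock
    this.done = this.run(controller);
  }

  async flush(){
    super.flush();
    // Keep the readable side open until every remaining token is enqueued
    await this.done;
  }

  private async run(controller: TransformStreamDefaultController<Token>){
    let token: Token | undefined;
    while(token = await this.consumeToken()){
      controller.enqueue(token);
    }
  }

  async consumeToken(): Promise<Token | undefined>{
    await this.consumeComments();

    let c = await this.read();
    if(c === "") return;

    if(isWhitespace(c)){
      while(isWhitespace(await this.peek(0))){
        await this.read();
      }
      return {type: TokenType.whitespace};
    }

    if(isDigit(c)){
      this.reconsume(c);
      return this.consumeNumeric();
    }

    ...
  }

  ...
}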

And another thing I was curious about is if I wanted to run this process on a string, is there no utility function or easier way than below to start the process?

const stringToStream = str => new ReadableStream<string>({
  start(controller){
    str.split("").forEach(c => 
      controller.enqueue(c)
    );
    controller.close();
  }
});

stringToStream("foo").pipeThrough(new TransformStream(new TokenizerTransformer()))
@morganbarrett
Author

P.S. Sorry, this is all untested code; treat it as pseudocode.

@MattiasBuelens
Collaborator

It's an interesting use case.

It looks like there's a lot of plumbing going on to turn the push-style transform() method into a pull-based read() method though (using an intermediate buffer). This is tricky to get right, especially if you need to consider backpressure. I'll have a think about it, see if there's a better way.
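
One possible way to avoid the push-to-pull plumbing is to skip `transform()` entirely and wrap a `ReadableStreamDefaultReader`, so that the next chunk is requested only when the tokenizer asks for it and backpressure comes from the stream itself. The sketch below is illustrative only: `PeekableReader` is a hypothetical name, its methods are modeled on the `read()`/`peek()`/`reconsume()` methods above, and it assumes one item per chunk.

```typescript
// Hypothetical helper, not part of the Streams Standard.
class PeekableReader<T> {
  private buffer: T[] = [];
  private done = false;
  private reader: ReadableStreamDefaultReader<T>;
  private terminator: T;

  constructor(reader: ReadableStreamDefaultReader<T>, terminator: T) {
    this.reader = reader;
    this.terminator = terminator;
  }

  // Pulls chunks from the source until index i is buffered or the source ends.
  private async fill(i: number): Promise<boolean> {
    while (this.buffer.length <= i && !this.done) {
      const result = await this.reader.read();
      if (result.done) this.done = true;
      else this.buffer.push(result.value);
    }
    return this.buffer.length > i;
  }

  // Removes and returns the next item, or the terminator at end of input.
  async read(): Promise<T> {
    return (await this.fill(0)) ? this.buffer.shift()! : this.terminator;
  }

  // Returns the item at offset i without consuming it.
  async peek(i = 0): Promise<T> {
    return (await this.fill(i)) ? this.buffer[i] : this.terminator;
  }

  // Puts an item back at the front of the buffer.
  reconsume(item: T): void {
    this.buffer.unshift(item);
  }
}
```

Because nothing is read from the source until `read()` or `peek()` needs it, the buffer only ever holds the lookahead the consumer asked for.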

> Could this be a useful helper class in the spec?

I think it's a bit too early to tell whether this will be useful enough to make it part of the platform. I suggest you start off by first writing it as a userland JavaScript library. 🙂

> And another thing I was curious about is if I wanted to run this process on a string, is there no utility function or easier way than below to start the process?

We're working on making it easier to turn an array or any (async) iterable into a ReadableStream, see #1018. You'll then be able to do:

ReadableStream.from(str.split("")).pipeThrough(new TransformStream(new TokenizerTransformer()))
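
Where `ReadableStream.from()` is not available, a small userland helper can fill the gap. The sketch below is an assumption-laden stand-in rather than the proposed API: `iterableToStream` is a hypothetical name, it handles synchronous iterables only, and it produces one chunk per `pull()` instead of enqueueing everything in `start()`. Note that iterating a string directly yields whole code points, whereas `str.split("")` yields UTF-16 code units.

```typescript
// Hypothetical helper: wrap a synchronous iterable in a ReadableStream.
function iterableToStream<T>(iterable: Iterable<T>): ReadableStream<T> {
  const iterator = iterable[Symbol.iterator]();
  return new ReadableStream<T>({
    // Called only when the consumer has room for another chunk.
    pull(controller) {
      const result = iterator.next();
      if (result.done) controller.close();
      else controller.enqueue(result.value);
    },
    cancel() {
      // Give the iterator a chance to run its cleanup logic.
      iterator.return?.();
    },
  });
}
```

With this in place, `iterableToStream("foo")` could stand in for `stringToStream("foo")` in the earlier example.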

@morganbarrett
Author

> It looks like there's a lot of plumbing going on to turn the push-style transform() method into a pull-based read() method though (using an intermediate buffer). This is tricky to get right, especially if you need to consider backpressure. I'll have a think about it, see if there's a better way.

I think this might be a slightly better solution, though I still have no idea how to account for backpressure.

PullTransformStream

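export class PullTransformStream {
  constructor(transform) {
    let buffer = new Buffer();
    let writableController;

    this.readable = new ReadableStream({
      async pull(controller){
        let arr = await transform(buffer);
        if(arr === undefined) controller.close();
        else controller.enqueue(arr);
      },
      cancel(reason){
        // `this` is the underlying source here, not the PullTransformStream,
        // so error the writable side through its controller instead
        writableController.error(reason);
      }
    });

    this.writable = new WritableStream({
      start(controller){
        writableController = controller;
      },
      write(chunk){
        chunk.forEach(c => buffer.push(c));
        buffer.resolve();
      },
      close(){
        buffer.closed = true;
        buffer.resolve();
      },
      abort(reason){
        buffer.reject(reason);
      }
    });
  }
}
export class Buffer extends Array {
  constructor(){
    super();
    this.closed = false;
    this.resolve = () => {};
    this.reject = () => {};
  }

  // Resolves to true once at least `length` items are buffered,
  // or to false if the writable side closes first
  async has(length){
    while(this.length < length && !this.closed){
      await new Promise((resolve, reject) => {
        this.resolve = resolve;
        this.reject = reject;
      });
    }

    return this.length >= length;
  }
}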

Example

export default class PreprocessTransformStream extends PullTransformStream {
  constructor(){
    super(async buffer => {
      if(!await buffer.has(1)) return;

      let c = buffer.shift();

      //replace form feed code point with a line feed
      if(c === "\u000C") return ["\u000A"];

      //replace any carriage return code points...
      if(c === "\u000D"){
        //...or pairs of a carriage return followed by a line feed...
        await buffer.has(1);
        //the carriage return is already consumed, so only the line feed remains
        if(buffer[0] === "\u000A"){
          buffer.shift();
        }
        //...with a single line feed
        return ["\u000A"];
      }

      //replace any null or surrogate code points with a replacement character
      if(c === "\u0000" || (c >= "\uD800" && c <= "\uDFFF")){
        return ["\uFFFD"];
      }

      return [c];
    });
  }
}
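
On the backpressure question, one possible approach is to have the sink's `write()` return a promise that settles only when the consumer runs out of buffered data. A `WritableStream` does not call `write()` again until that promise settles, so upstream producers see the queue fill up and slow down. The sketch below only illustrates that idea: `DemandBuffer` and `writableFor` are hypothetical names, and chunks are assumed to be arrays of items, as in the code above.

```typescript
// Hypothetical buffer that asks for the next chunk only on demand.
class DemandBuffer<T> {
  private items: T[] = [];
  private closed = false;
  private wakeConsumer: () => void = () => {};
  private releaseProducer: () => void = () => {};

  // Producer side: the returned promise stays pending until the consumer
  // needs more items than are currently buffered.
  write(chunk: T[]): Promise<void> {
    this.items.push(...chunk);
    this.wakeConsumer();
    return new Promise<void>(resolve => {
      this.releaseProducer = resolve;
    });
  }

  close(): void {
    this.closed = true;
    this.wakeConsumer();
  }

  // Consumer side: waits until `count` items are buffered or input has ended.
  async has(count: number): Promise<boolean> {
    while (this.items.length < count && !this.closed) {
      const woken = new Promise<void>(resolve => {
        this.wakeConsumer = resolve;
      });
      this.releaseProducer(); // let the next write() or close() through
      await woken;
    }
    return this.items.length >= count;
  }

  shift(): T | undefined {
    return this.items.shift();
  }
}

// Wires the buffer to a WritableStream so pending writes hold back the producer.
function writableFor<T>(buffer: DemandBuffer<T>): WritableStream<T[]> {
  return new WritableStream<T[]>({
    write: chunk => buffer.write(chunk),
    close: () => buffer.close(),
  });
}
```

Since the release happens when the consumer asks for more than it has, rather than when the buffer is empty, lookahead calls such as `has(2)` still make progress.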

> I think it's a bit too early to tell whether this will be useful enough to make it part of the platform. I suggest you start off by first writing it as a userland JavaScript library. 🙂

And sorry, I meant more as an example. I see many people following my original train of thought, which is something along the lines of:

class Transformation {
  readable: ReadableStream;

  constructor(readable: ReadableStream, writable: WritableStream){
    this.readable = readable;
    // A constructor cannot await, so the read loop runs in its own method
    void this.run(writable);
  }

  async run(writable: WritableStream){
    let writer = writable.getWriter();
    let token: Token | undefined;
    while(token = await this.consumeToken()){
      await writer.write([token]);
    }
  }

  async consumeToken(): Promise<Token | undefined> {
    await this.consumeWhitespace();

    let [s1, s2] = this.readable.tee();
    let reader = s1.getReader();
    let {done, value} = await reader.read();

    if(done) return;

    switch(value){
      ...
    }
  }

  async consumeWhitespace(){
    let [s1, s2] = this.readable.tee();
    let reader = s1.getReader();
    let {done, value} = await reader.read();
    // Unlock s1 so that it can be teed again on the next call
    reader.releaseLock();
    if(done || !isWhitespace(value)){
      this.readable = s2;
    } else {
      this.readable = s1;
      await this.consumeWhitespace();
    }
  }

  ...
}
