
RawPCMStreamNode (StreamNode, or InputStreamNode) #97

Closed
guest271314 opened this issue Aug 30, 2020 · 8 comments
Labels: feature (Feature request)

@guest271314

Describe the feature

const ac = new AudioContext();
const aw = new AudioWorkletNode(ac, 'processor');
const {body: readable} = await fetch('/path/to/raw_pcm_output');
const rawPCMStreamNode = new RawPCMStreamNode(ac, readable);
aw.connect(rawPCMStreamNode);

Is there a prototype?

Yes. It is rather involved, illustrating some of the deficiencies of SharedArrayBuffer, and even of dynamic SharedArrayBuffer memory growth using WebAssembly.Memory.prototype.grow(); hence this feature request to simplify the process.

From wasmerio/wasmer-php#121 (comment)

I put together a minimal, verifiable, complete working example of streaming from fetch() to AudioWorkletProcessor via Native Messaging, including the observable restrictions which led to filing this issue and the issues referenced above: specifically, RangeError being thrown, and limits on the amount of memory that can be pre-allocated as an ArrayBuffer or SharedArrayBuffer.
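
For illustration, a minimal sketch (assuming a memory-constrained, e.g. 32-bit, build) of the kind of pre-allocation failure described; the figure reuses the flow chart's own numbers and is not a documented browser limit:

try {
  // one hour of raw PCM pre-allocated up front, ~604 MiB
  const buffer = new ArrayBuffer(344 * 512 * 60 * 60);
  console.log(buffer.byteLength);
} catch (e) {
  console.warn(e); // e.g. RangeError: Array buffer allocation failed
}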

Posting the code here because the ongoing experiments not infrequently crash the browser and/or the OS.

Flow chart

  • Create a Native Messaging Chromium app (app)
  • At an arbitrary web page, dynamically set permissions for the app using Native File System to re-write manifest.json in the app/ folder, setting the value of the externally_connectable property
  • Connect to the background script, background.js, which passes messages between the Native Messaging host (in this case we use bash and execute PHP within the shell script) and the arbitrary web page
  • Do not use the Native Messaging protocol to pass STDOUT to background.js and then to the web page; instead, start a PHP built-in server process on localhost to handle the subsequent fetch() call (with an AbortController signal set in the request init), reading STDOUT as a ReadableStream and converting to and from JSON and Uint8Array (see the round-trip sketch after this list)
  • Create a MediaStreamAudioDestinationNode for the ability to record and perform other tasks with the MediaStream and MediaStreamTrack of the live system-audio capture
  • Use Transferable Streams to post response.body (a ReadableStream) to the AudioWorkletProcessor, where we pipe the stream to a WritableStream and write the incoming Uint8Arrays into a single Uint8Array pre-allocated to a set size (344 * 512 * 60 * 60 throws "cannot allocate memory" here)
  • Store a minimal amount of data before process() is started by resuming the AudioContext
  • Pass a message to background.js to kill the PHP built-in server process
  • Close the ReadableStreamDefaultController
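
Because chrome.runtime.Port.postMessage() clones rather than transfers, background.js serializes each byte chunk to JSON and the page restores it; a minimal sketch of that round-trip:

// Uint8Array -> JSON string (background.js) -> Uint8Array (web page)
const chunk = new Uint8Array([1, 2, 3]);
const message = JSON.stringify(chunk); // '{"0":1,"1":2,"2":3}'
const restored = Uint8Array.from(Object.values(JSON.parse(message)));
console.assert(restored.every((v, i) => v === chunk[i]));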

Chromium Native Messaging extension.

app/

manifest.json

{
  // Extension ID: <id>
  "key": "<key>",
  "name": "native_messaging_stream",
  "version": "1.0",
  "manifest_version": 2,
  "description": "Capture system audio",
  "icons": {},
  "permissions": [
    "nativeMessaging", "*://localhost/"
  ],
  "app": {
    "background": {
      "scripts": [
        "background.js"
      ],
      "persistent": false
    }
  },

  "externally_connectable": {
    "matches": [
      "https://developer.mozilla.org/*",
      "http://localhost/*"
    ],
    "ids": [
      "lmkllhfhgnmbcmhdagfhejkpicehnida"
    ]
  },
  "author": "guest271314"
}

background.js

const id = 'native_messaging_stream';
let externalPort, controller, signal;

chrome.runtime.onConnectExternal.addListener(port => {
  console.log(port);
  externalPort = port;
  externalPort.onMessage.addListener(message => {
    if (message === 'start') {
      chrome.runtime.sendNativeMessage(id, {}, async _ => {
        console.log(_);
        if (chrome.runtime.lastError) {
          console.warn(chrome.runtime.lastError.message);
        }
        controller = new AbortController();
        signal = controller.signal;
        // wait until bash script completes, server starts
        for await (const _ of (async function* stream() {
          while (true) {
            try {
              if ((await fetch('http://localhost:8000', { method: 'HEAD' })).ok)
                break;
            } catch (e) {
              console.warn(e.message);
              yield;
            }
          }
        })());
        try {
          const response = await fetch('http://localhost:8000?start=true', {
            cache: 'no-store',
            mode: 'cors',
            method: 'get',
            signal
          });
          console.log(...response.headers);
          const readable = response.body;
          readable
            .pipeTo(
              new WritableStream({
                write: async value => {
                  // value is a Uint8Array, postMessage() here only supports cloning, not transfer
                  externalPort.postMessage(JSON.stringify(value));
                },
              })
            )
            .catch(err => {
              console.warn(err);
              externalPort.postMessage('done');
            });
        } catch (err) {
          console.error(err);
        }
      });
    }
    if (message === 'stop') {
      controller.abort();
      chrome.runtime.sendNativeMessage(id, {}, _ => {
        if (chrome.runtime.lastError) {
          console.warn(chrome.runtime.lastError.message);
        }
        console.log('everything should be done');
      });
    }
  });
});

set_externally_connectable.js

(async(set_externally_connectable = ["https://example.com/*"], unset_externally_connectable = true) => {
  const dir = await self.showDirectoryPicker();
  const status = await dir.requestPermission({writable: true});
  const fileHandle = await dir.getFileHandle("manifest.json", {create: false});
  const file = await fileHandle.getFile();
  const manifest_text = await file.text();
  const match_extension_id = /\/\/ Extension ID: \w{32}/;
  const [extension_id] = manifest_text.match(match_extension_id);
  let text = manifest_text.replace(match_extension_id, `"_": 0,`);
  const manifest_json = JSON.parse(text);
  manifest_json.externally_connectable.matches = unset_externally_connectable ? set_externally_connectable :
    [...manifest_json.externally_connectable.matches, ...set_externally_connectable];
  const writer = await fileHandle.createWritable({keepExistingData:false});
  await writer.write(JSON.stringify(manifest_json, null, 2).replace(/"_": 0,/, extension_id)); 
  return await writer.close();
})([`${location.origin}/*`]);

host/

native_messaging_stream.json ($ cp native_messaging_stream.json ~/.config/chromium/NativeMessagingHosts)

{
  "name": "native_messaging_file_stream",
  "description": "Capture system audio",
  "path": "/path/to/host/captureSystemAudio.sh",
  "type": "stdio",
  "allowed_origins": [
    "chrome-extension://<id>/"
  ],
  "author": "guest271314"
}

index.php

<?php 
  if($_SERVER['REQUEST_METHOD'] == 'HEAD') {
    header('Vary: Origin');
    header("Access-Control-Allow-Origin: chrome-extension://lmkllhfhgnmbcmhdagfhejkpicehnida");
    header("Access-Control-Allow-Methods: GET, OPTIONS, HEADERS");
    header("Access-Control-Allow-Headers: Content-Type, Access-Control-Allow-Headers");    
    header("X-Powered-By:");
    header("HTTP/1.1 200 OK");
    die();
  }
  if (isset($_GET["start"])) {
    header('Vary: Origin');
    header("Access-Control-Allow-Origin: chrome-extension://lmkllhfhgnmbcmhdagfhejkpicehnida");
    header("Access-Control-Allow-Methods: GET, OPTIONS, HEAD");
    header("Access-Control-Allow-Headers: Content-Type, Access-Control-Allow-Headers");    
    header("Content-Type: text/plain");
    header("X-Powered-By:");
    echo passthru("parec -v --raw -d alsa_output.pci-0000_00_1b.0.analog-stereo.monitor");
    exit();
  }

captureSystemAudio.sh

#!/bin/bash
sendMessage() {
    # https://stackoverflow.com/a/24777120
    message='{"message": "ok"}'
    # Calculate the byte size of the string.
    # NOTE: This assumes that byte length is identical to the string length!
    # Do not use multibyte (unicode) characters, escape them instead, e.g.
    # message='"Some unicode character:\u1234"'
    messagelen=${#message}
    # Convert to an integer in native byte order.
    # If you see an error message in Chrome's stdout with
    # "Native Messaging host tried sending a message that is ... bytes long.",
    # then just swap the order, i.e. messagelen1 <-> messagelen4 and
    # messagelen2 <-> messagelen3
    messagelen1=$(( ($messagelen      ) & 0xFF ))               
    messagelen2=$(( ($messagelen >>  8) & 0xFF ))               
    messagelen3=$(( ($messagelen >> 16) & 0xFF ))               
    messagelen4=$(( ($messagelen >> 24) & 0xFF ))               
    # Print the message byte length followed by the actual message.
    printf "$(printf '\\x%x\\x%x\\x%x\\x%x' \
        $messagelen1 $messagelen2 $messagelen3 $messagelen4)%s" "$message"
}

captureSystemAudio() {
  if pgrep -f php > /dev/null; then
    killall -9 php & sendMessage  
  else
    php -S localhost:8000 -t /path/to/host/ & sendMessage
  fi
}
captureSystemAudio
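
For reference, the four bytes printed by sendMessage() before the payload are the Native Messaging frame header: a 32-bit little-endian byte count followed by the JSON message. A sketch of the equivalent decoding (decodeNativeMessage() is a hypothetical helper; Chrome performs this parsing itself):

// Decode one Native Messaging frame: 4-byte little-endian length + JSON payload.
function decodeNativeMessage(bytes /* Uint8Array */) {
  const view = new DataView(bytes.buffer, bytes.byteOffset, 4);
  const length = view.getUint32(0, true); // true: little-endian
  const json = new TextDecoder().decode(bytes.subarray(4, 4 + length));
  return JSON.parse(json);
}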

At the arbitrary web page (in this case MDN, where we previously set the matching URL pattern in externally_connectable for permission to communicate with the Chromium Native Messaging application):

var id = 'lmkllhfhgnmbcmhdagfhejkpicehnida';
var port = chrome.runtime.connect(id);
var controller;
var readable = new ReadableStream({
  start(c) {
    return (controller = c);
  },
});
var init = false;
port.onMessage.addListener(async message => {
  if (message !== 'done') {
    if (!init) {
      init = true;
      class AudioWorkletProcessor {}
      class AudioWorkletNativeMessageStream extends AudioWorkletProcessor {
        constructor(options) {
          super(options);
          this.byteSize = 512 * 384 * 60 * 1; // ~1 minute of data (512 * 384 bytes ≈ 1 second at 44100 Hz, 2-channel, 16-bit)
          this.memory = new Uint8Array(this.byteSize); // TODO: grow memory, dynamically
          Object.assign(this, options.processorOptions);
          this.port.onmessage = this.appendBuffers.bind(this);
        }
        async appendBuffers({ data: readable }) {
          Object.assign(this, { readable });
          const source = {
            write: (value, controller) => {
              console.log(globalThis.currentTime % 3);
              if (this.totalBytes + value.byteLength < this.byteSize) {
                this.memory.set(value, this.readOffset);
                this.readOffset = this.readOffset + value.buffer.byteLength;
                this.totalBytes = this.readOffset;
              } else {
                const last = value.subarray(0, this.byteSize - this.totalBytes);
                this.memory.set(last, this.readOffset);
                this.readOffset = this.readOffset + last.length;
                this.totalBytes = this.readOffset;
                console.log(
                  value.buffer.byteLength,
                  this.readOffset,
                  this.totalBytes
                );
                controller.close();

              }
              // await 250 milliseconds of audio data
              if (this.totalBytes >= (512 * 384) / 4 && !this.started) {
                this.started = true;
                this.port.postMessage({ started: this.started });
              }
            },
            close() {
              console.log('stopped');
            },
          };
          try {
            await this.readable.pipeTo(new WritableStream(source));
          } catch (e) {
            console.warn(e);
            console.log(this.writeOffset, this.totalBytes);
            this.endOfStream();
          }
        }
        endOfStream() {
          this.port.postMessage({
            ended: true,
            currentTime,
            currentFrame,
            readOffset: this.readOffset,
            writeOffset: this.writeOffset,
            totalBytes: this.totalBytes,
          });
        }
        process(inputs, outputs) {
          const channels = outputs.flat();
          if (
            this.writeOffset >= this.totalBytes ||
            this.totalBytes === this.byteSize
          ) {
            console.log(this);
            this.endOfStream();
            return false;
          }

          const uint8 = new Uint8Array(512);
          try {
            for (let i = 0; i < 512; i++, this.writeOffset++) {
              if (this.writeOffset === this.byteSize) {
                break;
              }
              uint8[i] = this.memory[this.writeOffset];
            }
            const uint16 = new Uint16Array(uint8.buffer);
            // https://stackoverflow.com/a/35248852
            for (let i = 0, j = 0, n = 1; i < uint16.length; i++) {
              const int = uint16[i];
              // If the high bit is on, then it is a negative number, and actually counts backwards.
              const float =
                int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
              // de-interleave: even-indexed samples to channels[0], odd-indexed to channels[1]
              channels[(n = ++n % 2)][!n ? j++ : j - 1] = float;
            }
            // console.log(channels[0]);
          } catch (e) {
            console.error(e);
          }

          return true;
        }
      }
      // register processor in AudioWorkletGlobalScope
      function registerProcessor(name, processorCtor) {
        return `${processorCtor};\nregisterProcessor('${name}', ${processorCtor.name});`;
      }
      const worklet = URL.createObjectURL(
        new Blob(
          [
            registerProcessor(
              'audio-worklet-native-message-stream',
              AudioWorkletNativeMessageStream
            ),
          ],
          { type: 'text/javascript' }
        )
      );
      const ac = new AudioContext({
        latencyHint: 1,
        sampleRate: 44100,
        numberOfChannels: 2,
      });
      ac.onstatechange = e => console.log(ac.state);
      if (ac.state === 'running') {
        await ac.suspend();
      }
      await ac.audioWorklet.addModule(worklet);
      const aw = new AudioWorkletNode(ac, 'audio-worklet-native-message-stream', {
        numberOfInputs: 1,
        numberOfOutputs: 2,
        channelCount: 2,
        processorOptions: {
          totalBytes: 0,
          readOffset: 0,
          writeOffset: 0,
          done: false,
          ended: false,
          started: false,
        },
      });

      aw.onprocessorerror = e => {
        console.error(e);
        console.trace();
      };

      const msd = new MediaStreamAudioDestinationNode(ac);
      const { stream } = msd;
      const [track] = stream.getAudioTracks();
      aw.connect(msd);
      const recorder = new MediaRecorder(stream);
      recorder.ondataavailable = e => console.log(URL.createObjectURL(e.data));
      aw.port.onmessage = async e => {
        console.log(e.data);
        if (
          e.data.started &&
          ac.state === 'suspended' &&
          recorder.state === 'inactive'
        ) {
          recorder.start();
          await ac.resume();
          setTimeout(_ => {
            port.postMessage('stop');
            controller.close();
          }, 1000 * 60 * 1);

        } else if (recorder.state === 'recording') {
          recorder.stop();
          track.stop();
          aw.disconnect();
          msd.disconnect();
          await ac.close();
          console.log(track);
          gc();
        }
      };

      aw.port.postMessage(readable, [readable]);

    }
    if (readable.locked)
      controller.enqueue(Uint8Array.from(Object.values(JSON.parse(message))));
  } else {
    if (readable.locked) controller.close();
  }
});

port.postMessage('start');

A version using WebAssembly.Memory.grow() to dynamically grow memory during the stream follows. The expected result is 30 minutes of audio being written to SharedArrayBuffer.

Testing on a legacy 4.15.0-20-lowlatency 32-bit kernel, which attempted to grow memory to 30 minutes of data with the maximum set to 1 hour of data, an error was thrown when calling grow(1) with the current offset at 177143808:

before grow 177012736
after grow 177078272
before grow 177078272
after grow 177143808
before grow 177143808
e4056867-5209-4bea-8f76-a86106ab83af:75 RangeError: WebAssembly.Memory.grow(): Unable to grow instance memory.
    at Memory.grow (<anonymous>)
    at Object.write (e4056867-5209-4bea-8f76-a86106ab83af:26)
appendBuffers @ e4056867-5209-4bea-8f76-a86106ab83af:75
async function (async)
appendBuffers @ e4056867-5209-4bea-8f76-a86106ab83af:73
177111040 177139656
{ended: true, currentTime: 1004.0308390022676, currentFrame: 44277760, readOffset: 177139656, writeOffset: 177111040, …}
MediaStreamTrack {kind: "audio", id: "e0958602-f0ea-4565-9e34-f624afce7c12", label: "MediaStreamAudioDestinationNode", enabled: true, muted: false, …}
blob:https://developer.mozilla.org/f9d716ff-c65e-4ddf-b7c7-5e385d0602ec
closed

resulting in 16 minutes and 43 seconds of data being written to memory, observable in the recorded audio:

Screenshot_2020-08-23_21-16-22

I asked whether 1 GB is the maximum that the underlying memory can grow to on 32-bit architectures at https://bugs.chromium.org/p/v8/issues/detail?id=7881.

if (globalThis.gc) gc();
var id = 'lmkllhfhgnmbcmhdagfhejkpicehnida';
var port = chrome.runtime.connect(id);
var controller;
var readable = new ReadableStream({
  start(c) {
    return (controller = c);
  },
});
var init = false;
port.onMessage.addListener(async message => {
  if (message !== 'done') {
    if (!init) {
      init = true;
      class AudioWorkletProcessor {}
      class AudioWorkletNativeMessageStream extends AudioWorkletProcessor {
        constructor(options) {
          super(options);
          this.initial = (384 * 512) / 65536; // 3 pages, ~1 second of data
          this.maximum = (384 * 512 * 60 * 60) / 65536; // ~1 hour
          this.byteSize = this.maximum * 65536;
          this.memory = new WebAssembly.Memory({
            initial: this.initial,
            maximum: this.maximum,
            shared: true,
          });
          Object.assign(this, options.processorOptions);
          this.port.onmessage = this.appendBuffers.bind(this);
        }
        async appendBuffers({ data: readable }) {
          Object.assign(this, { readable });
          
          const source = {
            write: (value, controller) => {
              if (
                this.totalBytes + value.byteLength >
                  this.memory.buffer.byteLength &&
                this.totalBytes + value.buffer.byteLength < this.byteSize
              ) {
                console.log('before grow', this.memory.buffer.byteLength);
                this.memory.grow(1);
                console.log('after grow', this.memory.buffer.byteLength);
              }

              // view the whole buffer; this.readOffset below indexes it absolutely
              const uint8_sab = new Uint8Array(this.memory.buffer);
              // console.log(this.totalBytes, this.totalBytes % 65536, this.totalBytes / 65536, this.memory.buffer.byteLength);
              if (this.totalBytes + value.buffer.byteLength < this.byteSize) {
                

                for (
                  let i = 0;
                  i < value.length;
                  i++, this.readOffset++
                ) {
                  uint8_sab[this.readOffset] = value[i];
                }

                this.totalBytes = this.readOffset;
              } else {
                const lastBytes = value.subarray(0, this.byteSize - this.totalBytes);
                // const uint8 = new Uint8Array(this.memory.buffer, this.readOffset);
                for (
                  let i = 0;
                  i < lastBytes.length;
                  i++, this.readOffset++
                ) {
                  uint8_sab[this.readOffset] = lastBytes[i];
                }
                this.totalBytes = this.readOffset;
                console.log(
                  value.buffer.byteLength,
                  this.readOffset,
                  this.totalBytes
                );
                this.readable.cancel();
              }
              // accumulate 250 milliseconds of data before resuming AudioContext
              if (this.totalBytes >= (512 * 384) / 4 && !this.started) {
                this.started = true;
                this.port.postMessage({ started: this.started });
              }
            },
            close() {
              console.log('stream closed');
            },
          };
          try {
            await this.readable.pipeTo(new WritableStream(source));
          } catch (e) {
            console.warn(e);
            console.log(this.writeOffset, this.totalBytes);
            this.endOfStream();
          }
        }
        endOfStream() {
          this.port.postMessage({
            ended: true,
            currentTime,
            currentFrame,
            readOffset: this.readOffset,
            writeOffset: this.writeOffset,
            totalBytes: this.totalBytes,
          });
        }
        process(inputs, outputs) {
          const channels = outputs.flat();
          if (
            this.writeOffset >= this.totalBytes ||
            this.totalBytes === this.byteSize
          ) {
            console.log(this);
            this.endOfStream();
            return false;
          }

          const uint8 = new Uint8Array(512);
          // view the whole buffer; this.writeOffset below indexes it absolutely
          const uint8_sab = new Uint8Array(this.memory.buffer);
          
          try {
            for (let i = 0; i < 512; i++, this.writeOffset++) {
              if (this.writeOffset === this.byteSize) {
                break;
              }
              uint8[i] = uint8_sab[this.writeOffset];
            }

            const uint16 = new Uint16Array(uint8.buffer);
            // https://stackoverflow.com/a/35248852
            for (let i = 0, j = 0, n = 1; i < uint16.length; i++) {
              const int = uint16[i];
              // If the high bit is on, then it is a negative number, and actually counts backwards.
              const float =
                int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
              // de-interleave: even-indexed samples to channels[0], odd-indexed to channels[1]
              channels[(n = ++n % 2)][!n ? j++ : j - 1] = float;
            }
            // console.log(channels[0]);
          } catch (e) {
            console.error(e);
          }

          return true;
        }
      }
      // register processor in AudioWorkletGlobalScope
      function registerProcessor(name, processorCtor) {
        return `${processorCtor};\nregisterProcessor('${name}', ${processorCtor.name});`;
      }
      const worklet = URL.createObjectURL(
        new Blob(
          [
            registerProcessor(
              'audio-worklet-native-message-stream',
              AudioWorkletNativeMessageStream
            ),
          ],
          { type: 'text/javascript' }
        )
      );
      const ac = new AudioContext({
        latencyHint: 1,
        sampleRate: 44100,
        numberOfChannels: 2,
      });
      ac.onstatechange = e => console.log(ac.state);
      if (ac.state === 'running') {
        await ac.suspend();
      }
      await ac.audioWorklet.addModule(worklet);
      const aw = new AudioWorkletNode(
        ac,
        'audio-worklet-native-message-stream',
        {
          numberOfInputs: 1,
          numberOfOutputs: 2,
          channelCount: 2,
          processorOptions: {
            totalBytes: 0,
            readOffset: 0,
            writeOffset: 0,
            done: false,
            ended: false,
            started: false,
          },
        }
      );

      aw.onprocessorerror = e => {
        console.error(e);
        console.trace();
      };

      const msd = new MediaStreamAudioDestinationNode(ac);
      const { stream } = msd;
      const [track] = stream.getAudioTracks();
      aw.connect(msd);
      const recorder = new MediaRecorder(stream);
      recorder.ondataavailable = e => console.log(URL.createObjectURL(e.data));
      aw.port.onmessage = async e => {
        console.log(e.data);
        if (
          e.data.started &&
          ac.state === 'suspended' &&
          recorder.state === 'inactive'
        ) {
          recorder.start();
          await ac.resume();
          setTimeout(_ => {
            port.postMessage('stop');
            controller.close();
          }, 1000 * 60 * 30);
        } else if (recorder.state === 'recording') {
          recorder.stop();
          track.stop();
          aw.disconnect();
          msd.disconnect();
          await ac.close();
          console.log(track);
          gc();
        }
      };

      aw.port.postMessage(readable, [readable]);
    }
    if (readable.locked)
      controller.enqueue(Uint8Array.from(Object.values(JSON.parse(message))));
  } else {
    if (readable.locked) controller.close();
  }
});

port.postMessage('start');

Ideally, in this case, we should be able to stream directly to AudioWorkletProcessor.process() instead of writing to memory, i.e., the gist of the proposal at "Chromium: Use Streams API for STDIN and STDOUT", flushing memory as we proceed.

One approach to reduce memory usage would be to write Opus audio to memory and decode to raw PCM as Float32Array values at the outputs channels, which I have not yet tried.
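
A sketch of what that decode step could look like with the WebCodecs AudioDecoder as it later shipped; the opusPacket bytes and the 48000/2 configuration are assumptions for illustration, not part of this proposal:

// Decode Opus packets to Float32Array channel data for process() outputs.
const decoder = new AudioDecoder({
  output: (audioData) => {
    const channel0 = new Float32Array(audioData.numberOfFrames);
    audioData.copyTo(channel0, { planeIndex: 0 }); // repeat per channel plane
    audioData.close();
    // ...hand the Float32Array(s) to the audio graph, then release the memory
  },
  error: console.error,
});
decoder.configure({ codec: 'opus', sampleRate: 48000, numberOfChannels: 2 });
// opusPacket: Uint8Array containing one encoded Opus packet (assumed)
decoder.decode(new EncodedAudioChunk({ type: 'key', timestamp: 0, data: opusPacket }));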

Describe the feature in more detail

Internally, RawPCMStreamNode (or simply StreamNode or InputStreamNode) is a TransformStream that accepts a ReadableStream (readable) of raw PCM, transforms the raw data (the terms "interleaved" and "planar" are no longer included in Web Audio API v1) into Float32Arrays corresponding to the number of channels contained in the input stream, and sets the Float32Arrays as outputs in process().
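
A rough sketch of that internal transform, assuming 16-bit little-endian interleaved stereo input and even-length chunks; RawPCMStreamNode itself is the proposal here, so pcmToPlanarFloat32 only models the intended behavior:

// Uint8Array chunks of interleaved s16le PCM in; one Float32Array per channel out.
const pcmToPlanarFloat32 = new TransformStream({
  transform(chunk, controller) {
    // assumes chunk.byteOffset is even and chunk.byteLength is a multiple of 4
    const int16 = new Int16Array(chunk.buffer, chunk.byteOffset, chunk.byteLength >> 1);
    const frames = int16.length >> 1;
    const left = new Float32Array(frames);
    const right = new Float32Array(frames);
    for (let i = 0; i < frames; i++) {
      left[i] = int16[i * 2] / 0x8000;
      right[i] = int16[i * 2 + 1] / 0x8000;
    }
    controller.enqueue([left, right]); // planar output for process()
  },
});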

Externally, this will work in the same way as MediaElementAudioSourceNode:

<body>
  <audio controls autoplay crossOrigin="anonymous"></audio>
  <script>
    (async() => {
      const context = new AudioContext({latencyHint:1.0});
      await context.suspend();
      const mediaElement = document.querySelector("audio");
      mediaElement.onloadedmetadata = async e => await context.resume();
      const source = new MediaElementAudioSourceNode(context, {
        mediaElement
      });
      await context.audioWorklet.addModule("bypass-processor.js");
      const bypasser = new AudioWorkletNode(context, "bypass-processor");
      source.connect(bypasser);
      bypasser.connect(context.destination);
      mediaElement.src = "https://ia800301.us.archive.org/10/items/DELTAnine2013-12-11.WAV/Deltanine121113Pt3Wav.wav";
    })();
  </script>
</body>

where, when connected to an AudioWorkletNode, AudioWorkletProcessor.process() effectively takes input from the src of the HTMLMediaElement and the resulting audio pipeline, and outputs Float32Arrays at outputs.

This will provide an entry point for users to utilize AudioContext for non-standard or non-implemented device capture and processing with Web Audio API.

WebCodecs does not solve this - unless that solution can be unequivocally proven here by evidence, not conjecture.

Related art:

Related issue:

guest271314 added the feature (Feature request) label Aug 30, 2020
@guest271314 (Author)

This will also provide a simpler solution for https://github.com/WebAudio/web-audio-api-v2/issues/90

const ac = new AudioContext();
const aw = new AudioWorkletNode(ac, 'processor');
const {writable, readable} = new TransformStream();
const writer = writable.getWriter();
const input = new Float32Array(128);
input.fill(.5);
const done = new Float32Array(128);
done.fill(.25);
writer.write(input);
// writer.write(..);
// writer.write(..);
writer.close();
const rawPCMStreamNode = new RawPCMStreamNode(ac, readable);
aw.connect(rawPCMStreamNode);


@guest271314 (Author)

Related https://github.com/w3c/mediacapture-insertable-streams. Not sure if that proposal can solve this.

@guest271314 (Author)

This is largely solved by https://github.com/w3c/mediacapture-insertable-streams and https://github.com/w3c/webrtc-insertable-streams, and to the extent applicable, without making the tab unresponsive, WebCodecs AudioEncoder and AudioDecoder methods.

@guest271314 (Author)

Largely though not completely solved.

The AudioFrame described by WebCodecs has some omissions; for example, precisely what the "presentation timestamp" is (not defined in the spec; see #107) and how to generate the same.

guest271314 reopened this Mar 7, 2021
@guest271314 (Author)

A tentative workaround, in lieu of timestamp being defined in the WebCodecs specification, is to create a MediaStreamAudioDestinationNode, connect an OscillatorNode with frequency set to 0, pass the MediaStreamTrack to MediaStreamTrackProcessor, read the stream, get the generated timestamp, and pass that timestamp with a user-defined AudioBuffer to AudioFrame.

<!DOCTYPE html>

<html>
  <head> </head>

  <body>
    <h1>click</h1>
    <audio controls autoplay></audio>

    <script>
      async function webTransportBreakoutBox(text) {
        const url = 'quic-transport://localhost:4433/tts';
        try {
          const transport = new WebTransport(url);
          await transport.ready;

          const sender = await transport.createUnidirectionalStream();
          const writer = sender.writable.getWriter();
          const encoder = new TextEncoder('utf-8');
          let data = encoder.encode(text);
          await writer.write(data);
          console.log('writer close', await writer.close());
          const reader = transport.incomingUnidirectionalStreams.getReader();
          const result = await reader.read();
          console.log({
            result,
          });
          if (result.done) {
            console.log(result);
          }
          let transportStream = result.value;
          console.log({
            transportStream,
          });
          const { readable } = transportStream;
          const ac = new AudioContext({
            latencyHint: 0,
            sampleRate: 22050,
          });
          const msd = new MediaStreamAudioDestinationNode(ac, {
            channelCount: 1,
          });
          const { stream } = msd;
          const [track] = stream.getAudioTracks();
          const osc = new OscillatorNode(ac, { frequency: 0 });
          osc.connect(msd);
          osc.start();
          track.onmute = track.onunmute = track.onended = (e) => console.log(e);
          stream.oninactive = (e) => console.log(e);
          ac.onstatechange = (e) => {
            console.log(ac.state);
          };
          const processor = new MediaStreamTrackProcessor(track);
          const generator = new MediaStreamTrackGenerator('audio');
          const { writable } = generator;
          const { readable: audioReadable } = processor;
          const audioWriter = writable.getWriter();
          const ms = new MediaStream([generator]);

          document.querySelector('audio').srcObject = ms;
          const recorder = new MediaRecorder(ms);
          recorder.ondataavailable = ({data}) => console.log(URL.createObjectURL(data));
          recorder.start();
          let audioController;
          const rs = new ReadableStream({
            start(c) {
              return (audioController = c);
            },
          });
          const audioControllerReader = rs.getReader();
          const audioReader = audioReadable.getReader();
          // https://bugs.chromium.org/p/chromium/issues/detail?id=1174836
          // https://d27xp8zu78jmsf.cloudfront.net/demo/pure-audio-video6/audioworklet.js
          function Uint8ToFloat32(uint8Array_) {
            var int16Array_ = new Int16Array(uint8Array_.length / 2);
            for (var i = 0; i < int16Array_.length; i++) {
              int16Array_[i] =
                (uint8Array_[i * 2] & 0xff) +
                ((uint8Array_[i * 2 + 1] & 0xff) << 8);
            }
            return Int16ToFloat32(int16Array_, 0, int16Array_.length);
          }

          function Int16ToFloat32(inputArray, startIndex, length) {
            var output = new Float32Array(inputArray.length - startIndex);
            for (var i = startIndex; i < length; i++) {
              var int_ = inputArray[i];
              // If the high bit is on, then it is a negative number, and actually counts backwards.
              // output[i] = ((int_ + 32768) % 65536 - 32768) / 32768.0
              var float_ = int_ / 32768.0;
              if (float_ > 1) float_ = 1;
              if (float_ < -1) float_ = -1;
              output[i] = float_;
            }
            return output;
          }

          let index = 0;
          let arr = [];
          let headers = true;

          await Promise.all([
            audioReader.read().then(async function process({ value, done }) {
              if (document.querySelector('audio').currentTime === 0) {
                // avoid clipping first milliseconds of MediaStreamTrack audio
                return audioWriter
                  .write(value)
                  .then(() => audioReader.read().then(process));
              }
              if (done) {
                console.log({ done });
              }
              const {
                value: floats,
                done: audioControllerDone,
              } = await audioControllerReader.read();
              const { timestamp } = value;
              if (audioControllerDone) {
                console.log({ audioControllerDone });
              }
              if (audioControllerDone === false) {
                const buffer = new AudioBuffer({
                  numberOfChannels: 1,
                  length: floats.length,
                  sampleRate: 22050,
                });
                buffer.getChannelData(0).set(floats);
                const frame = new AudioFrame({ timestamp, buffer });
                return audioWriter.write(frame).then(() => {
                  frame.close();
                  return audioReader.read().then(process);
                });
              } else {
                console.log(
                  audioControllerDone,
                  audioReadable,
                  audioReader,
                  writable,
                  audioWriter
                );
                audioReader.releaseLock();
                audioReadable.cancel();
                audioWriter.releaseLock();
                writable.abort();
                return writable.close;
              }
            }),
            readable.pipeTo(
              new WritableStream({
                async start() {
                  console.log('writable start');
                },
                async write(value, c) {
                  let data;
                  if (headers === false) {
                    // #!/bin/bash
                    // espeak-ng --stdout "$1"
                    // 1 channel, 22050 sample rate
                    // omit WAV headers
                    data = Uint8ToFloat32(value.subarray(44));
                    headers = true;
                  } else {
                    data = Uint8ToFloat32(value);
                  }
                  for (let i = 0; i < data.length; i++) {
                    arr.push(data[i]);
                    if (arr.length === 220) {
                      audioController.enqueue(new Float32Array(arr.slice(0)));
                      arr = [];
                    }
                  }
                },
                close() {
                  console.log('done');
                  audioController.close();
                },
              })
            ),
          ]);

          await transport.close();
          track.stop();
          await ac.close();
          recorder.stop();
          return transport.closed
            .then((_) => {
              console.log('Connection closed normally.');
              return 'done';
            })
            .catch((e) => {
              console.error(e.message);
              console.trace();
            });
        } catch (e) {
          console.error(e);
          console.trace();
        }
      }

      document.querySelector('h1').onclick = async () => {
        webTransportBreakoutBox('a hunting we will go');
      };
    </script>
  </body>
</html>

Interestingly, the ReadableStream from a File (WAV) on disk behaves differently from a ReadableStream from WebTransport. I suspect the issue is with using only N indexes of the TypedArrays from read() across different arrays in sequential function calls, so that the memory is not necessarily contiguous when read again internally to produce output, which results in periodic static within the output. The TypedArray conversion function might also have an issue; however, when substituting File.stream() for the WebTransport stream, the static is not emitted.

stream_pcm.zip

I am not certain why timestamp is necessary, given we can stream with AudioWorklet without the need to set timestamps for each output frame.

@guest271314 (Author) commented Mar 14, 2021

One issue is that with live input streams the Uint8Array from ReadableStreamDefaultReader.read() is not guaranteed to have an even length when that Uint8Array or Int8Array, or its buffer, is then passed to a Uint16Array or Int16Array to convert to float values.

Consider the function Uint8ToFloat32 at https://d27xp8zu78jmsf.cloudfront.net/demo/pure-audio-video6/audioworklet.js

      function Uint8ToFloat32(uint8Array_) {
        var int16Array_ = new Int16Array(uint8Array_.length / 2);
        for (let i = 0; i < int16Array_.length; i++) {
          int16Array_[i] =
            (uint8Array_[i * 2] & 0xff) +
            ((uint8Array_[i * 2 + 1] & 0xff) << 8);
        }
        return Int16ToFloat32(int16Array_, 0, int16Array_.length);
      }

when the input Uint8Array from a ReadableStreamDefaultReader.read() call is a single TypedArray, or multiple TypedArrays where the length is an even number, e.g.,

61848

or

441180

the audio is output effectively as expected, with minimal artifacts.

However, when the input is a live stream where the Uint8Array length is arbitrary, and potentially an odd length,

3761
8777
5020
16289
17765
10236

the audio output can include static interference, evidently at the locations in the timeline where the Uint8Arrays having odd length are converted to Int16Array and then Float32Array; consider

var int16Array_ = new Int16Array(uint8Array_.length / 2);

where construction of an Int16Array from a Uint8Array with odd length will not throw

var len = 3761;
var uint8 = new Uint8Array(len);
var int16 = new Int16Array(uint8); // will not throw

passing the buffer will throw

var len = 3761;
var uint8 = new Uint8Array(len);
var int16 = new Int16Array(uint8.buffer); // will throw

VM578:3 Uncaught RangeError: byte length of Int16Array should be a multiple of 2
at new Int16Array (<anonymous>)
at <anonymous>:3:13

we lose data

var int16Array_ = new Int16Array(uint8.length / 2);
console.assert(int16Array_.length * 2 === uint8.length, [int16Array_.length * 2 < uint8.length, int16Array_.length]) // VM1132:1 Assertion failed: (2) [true, 1880]

resulting in static interference in audio output.

A tentative solution that I have previously attempted is to "carry over" the odd trailing byte; however, artifacts are still evident in playback at the locations in the timeline where we need to create and slice multiple TypedArrays to carry over a single value.

        readable.pipeTo(
          new WritableStream({
            async start() {
              console.log('writable start');
            },
            async write(value, c) {
              console.log(value.length);
              let data;
              if (float && float instanceof Uint8Array) {
                // prepend the odd byte carried over from the previous chunk
                data = new Uint8Array(float.length + value.length);
                data.set(float);
                data.set(value, float.length);
                float = void 0;
              } else {
                data = value;
              }
              if (data.length % 2 === 1) {
                float = data.slice(-1);
                data = data.slice(0, data.length - 1);
              }
              if (headers === false) {
                data = Uint8ToFloat32(data.slice(44));
                headers = true;
              } else {
                data = Uint8ToFloat32(data);
              }
              for (let i = 0; i < data.length; i+=220) {
                 audioController.enqueue(data.slice(i, i + 220));
              }
              // ...

A solution that does output the expected result with AudioWorklet is to use a single WebAssembly.Memory instance to write data to a contiguous memory allocation that can grow() when necessary during the live stream: https://github.com/guest271314/webtransport/blob/main/webTransportAudioWorkletWebAssemblyMemoryGrow.js#L190

     async write(value, controller) {
        console.log(value, value.byteLength, memory.buffer.byteLength);
        if (readOffset + value.byteLength > memory.buffer.byteLength) {
          console.log('before grow', memory.buffer.byteLength);
          memory.grow(3);
          console.log('after grow', memory.buffer.byteLength);
        }
        const uint8_sab = new Uint8Array(memory.buffer);
        let i = 0;
        if (!init) {
          init = true;
          i = 44;
        }
        for (; i < value.buffer.byteLength; i++, readOffset++) {
          uint8_sab[readOffset] = value[i];
        }
        // ...


For this feature request, the goal is a means to stream raw PCM, which in general will initially be accessed in the form of a Uint8Array, while avoiding the creation of multiple TypedArrays to carry over the odd trailing byte, which potentially allocates additional memory for the same value(s).
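
A minimal sketch of the re-chunking such a node could perform internally (evenChunks() is a hypothetical helper, not proposed API): buffer at most one leftover byte between chunks so downstream 16-bit views never mis-align:

// Re-chunk an arbitrary byte stream into even-length Uint8Arrays.
function evenChunks() {
  let carry = null; // at most one leftover byte carried between chunks
  return new TransformStream({
    transform(value, controller) {
      let data = value;
      if (carry !== null) {
        data = new Uint8Array(value.length + 1);
        data[0] = carry;
        data.set(value, 1);
        carry = null;
      }
      if (data.length % 2 === 1) {
        carry = data[data.length - 1];
        data = data.subarray(0, data.length - 1);
      }
      if (data.length) controller.enqueue(data);
    },
  });
}
// usage: readable.pipeThrough(evenChunks()).pipeTo(destination)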

@guest271314 (Author)

Based on testing, there does not appear to be any way to avoid glitches, gaps, and static interference ("fragmentation") when streaming audio using multiple TypedArrays (potentially having odd lengths) as input without writing the data to a contiguous ArrayBuffer instance (writing to a Blob or File takes substantially more time in practice).

Memory limits in WebAssembly: https://stackoverflow.com/a/40425252

For asm.js it was difficult to know how the ArrayBuffer was going to be used, and on some 32-bit platforms you often ran into process fragmentation which made it hard to allocate enough contiguous space in the process' virtual memory (the ArrayBuffer must be contiguous in the browser process' virtual address space, otherwise you'd have a huge perf hit).

...

WebAssembly is backed by WebAssembly.Memory which is a special type of ArrayBuffer. This means that a WebAssembly implementation can be clever about that ArrayBuffer. On 32-bit there's not much to do: if you run out of contiguous address space then the VM can't do much. But on 64-bit platforms there's plenty of address space. The browser implementation can choose to prevent you from creating too many WebAssembly.Memory instances (allocating virtual memory is almost free, but not quite), but you should be able to get a few 4GiB allocations. Note that the browser will only allocate that space virtually, and commit physical addresses for the minimum number of pages you said you need. Afterwards it'll only allocate physically when you use grow_memory. That could fail (physical memory is about as abundant as the amount of RAM, give or take swap space), but it's much more predictable.

For StreamNode to work properly, an internal form of WebAssembly.Memory can be used to write input streams, clearing the memory used immediately following each frame read.
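
A minimal sketch of that internal buffering, as a fixed-capacity ring over a WebAssembly.Memory; the 512-byte quantum assumes 128 frames of 16-bit stereo, and all names here are illustrative:

// Producer writes network chunks; consumer drains one render quantum
// at a time, clearing consumed bytes so the space is reused.
const memory = new WebAssembly.Memory({ initial: 4, maximum: 4, shared: true });
const bytes = new Uint8Array(memory.buffer);
const capacity = bytes.length;
let head = 0; // total bytes written
let tail = 0; // total bytes read
function writeChunk(value /* Uint8Array */) {
  for (let i = 0; i < value.length; i++, head++) {
    bytes[head % capacity] = value[i]; // assumes head - tail never exceeds capacity
  }
}
function readQuantum(out /* Uint8Array(512): 128 frames, 2 channels, 16-bit */) {
  for (let i = 0; i < out.length && tail < head; i++, tail++) {
    out[i] = bytes[tail % capacity];
    bytes[tail % capacity] = 0; // clear immediately following the read
  }
}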

@guest271314 (Author)

This is largely solved by https://github.com/w3c/mediacapture-insertable-streams and https://github.com/w3c/webrtc-insertable-streams, and to the extent applicable, without making the tab unresponsive, WebCodecs AudioEncoder and AudioDecoder methods.

WebCodecs does not help with this.

The Insertable Streams (Breakout Box; Media Transform API) can be utilized to achieve the expected result.
