diff --git a/kernel/packages/atomicHelpers/RingBuffer.ts b/kernel/packages/atomicHelpers/RingBuffer.ts index 7b7eddc124..1822d29ca0 100644 --- a/kernel/packages/atomicHelpers/RingBuffer.ts +++ b/kernel/packages/atomicHelpers/RingBuffer.ts @@ -1,3 +1,5 @@ +import future, { IFuture } from 'fp-future' + type TypedArray = | Int8Array | Uint8Array @@ -22,7 +24,51 @@ export class RingBuffer { return this.writePointer - this.readPointer } + getWritePointer() { + return this.writePointer + } + + getReadPointer() { + return this.readPointer + } + write(array: T, length?: number) { + this.writeAt(array, this.writePointer, length) + } + + read(readCount?: number): T { + const result = this.peek(this.readPointer, readCount) + + this.readPointer += result.length + + return result + } + + peek(startPointer?: number, readCount?: number): T { + const start = startPointer ? startPointer : this.readPointer + + const maxCountToRead = this.writePointer - this.readPointer + + const count = readCount ? Math.min(readCount, maxCountToRead) : maxCountToRead + + const readPosition = start % this.buffer.length + + const endIndex = readPosition + count + + let result: T + + if (endIndex > this.buffer.length) { + result = new this.ArrayTypeConstructor(count) + result.set(this.buffer.slice(readPosition, this.buffer.length)) + result.set(this.buffer.slice(0, endIndex - this.buffer.length), this.buffer.length - readPosition) + } else { + result = this.buffer.slice(readPosition, endIndex) as T + } + + return result + } + + writeAt(array: T, startPointer: number, length?: number) { const len = length || array.length let toWrite = array @@ -31,7 +77,7 @@ export class RingBuffer { toWrite = array.slice(array.length - this.buffer.length, array.length) as T } - const writePosition = this.writePointer % this.buffer.length + const writePosition = startPointer % this.buffer.length const endIndex = writePosition + len @@ -43,36 +89,169 @@ export class RingBuffer { this.buffer.set(toWrite.slice(0, len), writePosition) } - this.writePointer += len + const endPointer = startPointer + len + + if (endPointer > this.writePointer) { + this.writePointer = endPointer + } + this.updateReadPointerToMinReadPosition() + } + + isFull() { + return this.readAvailableCount() >= this.size + } + + private updateReadPointerToMinReadPosition() { const minReadPointer = this.writePointer - this.buffer.length if (this.readPointer < minReadPointer) { this.readPointer = minReadPointer } } +} - read(readCount?: number): T { - const maxCountToRead = this.writePointer - this.readPointer +type Chunk = { + order: number + startPointer: number + length: number +} - const count = readCount ? Math.min(readCount, maxCountToRead) : maxCountToRead +export class OrderedRingBuffer { + private internalRingBuffer: RingBuffer - const readPosition = this.readPointer % this.buffer.length + private chunks: Chunk[] = [] - const endIndex = readPosition + count + private blockedReadChunksFuture?: { chunksToRead: number; future: IFuture } - let result: T + constructor(public readonly size: number, ArrayTypeConstructor: { new (size: number): T }) { + this.internalRingBuffer = new RingBuffer(size, ArrayTypeConstructor) + } - if (endIndex > this.buffer.length) { - result = new this.ArrayTypeConstructor(count) - result.set(this.buffer.slice(readPosition, this.buffer.length)) - result.set(this.buffer.slice(0, endIndex - this.buffer.length), this.buffer.length - readPosition) + readAvailableCount() { + return this.internalRingBuffer.readAvailableCount() + } + + write(array: T, order: number, length?: number) { + // We find those chunks that should be after this chunk + const nextChunks = this.chunks.filter((it) => it.order > order) + + if (nextChunks.length === 0) { + // If there are no chunks that should be after this chunk, then we just need to write the chunk at the end. + this.chunks.push({ + order, + startPointer: this.internalRingBuffer.getWritePointer(), + length: length || array.length + }) + + this.internalRingBuffer.write(array, length) } else { - result = this.buffer.slice(readPosition, endIndex) as T + // Otherwise, we need to get those chunks that should be after this one, and write them one after the other + + let writePointer = nextChunks[0].startPointer + + const newChunk = { + order, + startPointer: writePointer, + length: length || array.length + } + + // Chunks are ordered by "order", so we need to ensure that we place this new chunk in the corresponding index. + this.chunks.splice(this.chunks.length - nextChunks.length, 0, newChunk) + + const arraysToWrite = [length ? array.slice(0, length) : array] as T[] + + // We get the arrays for each chunk, and we update their pointers while we are at it + nextChunks.forEach((chunk) => { + arraysToWrite.push(this.arrayForChunk(chunk)) + + chunk.startPointer += newChunk.length + }) + + // We write starting from the position of the first chunk that will be rewritten + arraysToWrite.forEach((toWrite) => { + this.internalRingBuffer.writeAt(toWrite, writePointer) + writePointer += toWrite.length + }) } - this.readPointer += count + this.discardUnreadableChunks() + this.resolveBlockedRead() + } + + arrayForChunk(chunk: Chunk): T { + return this.peek(chunk.startPointer, chunk.length) + } + + peek(startPointer?: number, readCount?: number): T { + return this.internalRingBuffer.peek(startPointer, readCount) + } + + read(readCount?: number): T { + const result = this.internalRingBuffer.read(readCount) + + this.discardUnreadableChunks() return result } + + /** + * The promise will block until there is chunksCount chunks to read, + * or until timeToWait has passed. + * + * Once timeToWait has passed, if there is nothing to read, an empty array is returned. + */ + async blockAndReadChunks(chunksCount: number, timeToWait: number): Promise { + if (this.chunks.length >= chunksCount) { + const chunks = this.readChunks(chunksCount) + + return Promise.resolve(chunks) + } else { + if (this.blockedReadChunksFuture) { + this.blockedReadChunksFuture.future.reject(new Error('Only one blocking call is possible at the same time')) + } + + const thisFuture = { chunksToRead: chunksCount, future: future() } + + this.blockedReadChunksFuture = thisFuture + + setTimeout(() => { + if (this.blockedReadChunksFuture === thisFuture) { + if (this.chunks.length > 0) { + thisFuture.future.resolve(this.readChunks(this.chunks.length)) + } else { + thisFuture.future.resolve([]) + } + } + }, timeToWait) + + return this.blockedReadChunksFuture.future + } + } + + private readChunks(chunksCount: number) { + return this.chunks.slice(0, chunksCount).map((it) => this.read(it.length)) + } + + private discardUnreadableChunks() { + const isReadable = (chunk: Chunk) => { + // A chunk is readable if its end pointer is ahead of the read pointer + const endPointer = chunk.startPointer + chunk.length + return endPointer > this.internalRingBuffer.getReadPointer() + } + + this.chunks = this.chunks.filter(isReadable) + + if (this.chunks.length > 0 && this.chunks[0].startPointer < this.internalRingBuffer.getReadPointer()) { + this.chunks[0].startPointer = this.internalRingBuffer.getReadPointer() + } + } + + private resolveBlockedRead() { + if (this.blockedReadChunksFuture && this.chunks.length >= this.blockedReadChunksFuture.chunksToRead) { + const read = this.readChunks(this.blockedReadChunksFuture.chunksToRead) + this.blockedReadChunksFuture.future.resolve(read) + delete this.blockedReadChunksFuture + } + } } diff --git a/kernel/packages/atomicHelpers/SortedLimitedQueue.ts b/kernel/packages/atomicHelpers/SortedLimitedQueue.ts new file mode 100644 index 0000000000..c972275076 --- /dev/null +++ b/kernel/packages/atomicHelpers/SortedLimitedQueue.ts @@ -0,0 +1,97 @@ +import future, { IFuture } from 'fp-future' + +export class SortedLimitedQueue { + private internalArray: T[] + + private pendingDequeue?: { amount: number; futures: IFuture[]; timedoutValues?: T[] } + + constructor(private readonly maxLength: number, private readonly sortCriteria: (a: T, b: T) => number) { + this.internalArray = [] + } + + queue(item: T) { + let insertIndex = 0 + + // Since the most likely scenario for our use case is that we insert the item at the end, + // we start by the end. This may be parameterized in the future + for (let i = this.internalArray.length - 1; i >= 0; i--) { + if (this.sortCriteria(item, this.internalArray[i]) >= 0) { + insertIndex = i + 1 + break + } + } + + if (insertIndex === 0) { + this.internalArray.unshift(item) + } else if (insertIndex === this.internalArray.length) { + this.internalArray.push(item) + } else { + this.internalArray.splice(insertIndex, 0, item) + } + + if (this.internalArray.length > this.maxLength) { + this.internalArray.shift() + } + + this.resolveBlockedDequeues() + } + + queuedCount() { + return this.internalArray.length + } + + dequeue(): T | undefined { + return this.internalArray.shift() + } + + dequeueItems(count?: number): T[] { + return this.internalArray.splice(0, count ?? this.internalArray.length) + } + + async dequeueItemsWhenAvailable(count: number, timeout: number): Promise { + if (this.pendingDequeue && this.pendingDequeue.amount !== count) { + // To have multiple dequeue requests they all have to have the same amount. We prioritize the new request, and resolve the other with empty arrays. + this.pendingDequeue.futures.forEach((it) => it.resolve([])) + } + + if (this.queuedCount() >= count) { + const items = this.dequeueItems(count) + this.pendingDequeue?.futures.forEach((it) => it.resolve(items)) + this.pendingDequeue = undefined + return Promise.resolve(items) + } else { + if (!this.pendingDequeue) { + this.pendingDequeue = { + amount: count, + futures: [] + } + } + const newFuture = future() + this.pendingDequeue.futures.push(newFuture) + + setTimeout(() => { + if (this.pendingDequeue && this.pendingDequeue.futures.indexOf(newFuture) >= 0) { + this.resolveBlockedDequeueWith(this.queuedCount()) + } + }, timeout) + + return newFuture + } + } + + isFull() { + return this.queuedCount() >= this.maxLength + } + + private resolveBlockedDequeues() { + if (this.pendingDequeue && this.queuedCount() >= this.pendingDequeue.amount) { + this.resolveBlockedDequeueWith(this.pendingDequeue.amount) + } + } + + private resolveBlockedDequeueWith(amount: number) { + const items = amount === 0 ? [] : this.dequeueItems(amount) + this.pendingDequeue?.futures.forEach((it) => it.resolve(items)) + this.pendingDequeue = undefined + } +} diff --git a/kernel/packages/config/contracts.ts b/kernel/packages/config/contracts.ts index 16b1286caf..ae70c96eb8 100644 --- a/kernel/packages/config/contracts.ts +++ b/kernel/packages/config/contracts.ts @@ -30,7 +30,8 @@ export const contracts = { 'AdapterFeeCollector': '0x5DC888024cB599CfDdb9E6483ED6bAe1fA9e9D18', 'AdapterConverter': '0x2782eb28Dcb1eF4E7632273cd4e347e130Ce4646', 'POIAllowlist': '0x5DC4a5C214f2161F0D5595a6dDd9352409aE3Ab4', - 'NAMEDenylist': '0x20c6f1e86eba703a14414a0cbc1b55c89dba7a0f' + 'NAMEDenylist': '0x20c6f1e86eba703a14414a0cbc1b55c89dba7a0f', + 'CollectionsV2Factory': '0x16d8bac5b67a6b782a9081377bec413bc5bb56a6' }, 'mainnet': { 'MANAToken': '0x0f5d2fb29fb7d3cfee444a200298f468908cc942', diff --git a/kernel/packages/config/index.ts b/kernel/packages/config/index.ts index f9aeb7bf6b..33537baf02 100644 --- a/kernel/packages/config/index.ts +++ b/kernel/packages/config/index.ts @@ -180,7 +180,6 @@ export namespace commConfigurations { } ] - export const voiceChatSampleRate = 24000 export const voiceChatUseHRTF = location.search.indexOf('VOICE_CHAT_USE_HRTF') !== -1 } export const loginConfig = { diff --git a/kernel/packages/shared/comms/index.ts b/kernel/packages/shared/comms/index.ts index dc7046424d..e48ea9f339 100644 --- a/kernel/packages/shared/comms/index.ts +++ b/kernel/packages/shared/comms/index.ts @@ -86,6 +86,7 @@ import { createLogger } from '../logger' import { VoiceCommunicator, VoiceSpatialParams } from 'voice-chat-codec/VoiceCommunicator' import { voicePlayingUpdate, voiceRecordingUpdate } from './actions' import { isVoiceChatRecording } from './selectors' +import { VOICE_CHAT_SAMPLE_RATE } from 'voice-chat-codec/constants' export type CommsVersion = 'v1' | 'v2' export type CommsMode = CommsV1Mode | CommsV2Mode @@ -216,7 +217,7 @@ function requestMediaDevice() { .getUserMedia({ audio: { channelCount: 1, - sampleRate: commConfigurations.voiceChatSampleRate, + sampleRate: VOICE_CHAT_SAMPLE_RATE, echoCancellation: true, noiseSuppression: true, autoGainControl: true, @@ -412,7 +413,8 @@ function processVoiceFragment(context: Context, fromAlias: string, message: Pack voiceCommunicator?.playEncodedAudio( peerTrackingInfo.identity, getSpatialParamsFor(peerTrackingInfo.position), - message.data.encoded + message.data.encoded, + message.time ) } } @@ -994,7 +996,6 @@ async function doStartCommunications(context: Context) { }, { - sampleRate: commConfigurations.voiceChatSampleRate, initialListenerParams: context.currentPosition ? getSpatialParamsFor(context.currentPosition) : undefined, panningModel: commConfigurations.voiceChatUseHRTF ? 'HRTF' : 'equalpower' } diff --git a/kernel/packages/shared/comms/v2/LighthouseWorldInstanceConnection.ts b/kernel/packages/shared/comms/v2/LighthouseWorldInstanceConnection.ts index e884eb4980..fa9ca1bfa4 100644 --- a/kernel/packages/shared/comms/v2/LighthouseWorldInstanceConnection.ts +++ b/kernel/packages/shared/comms/v2/LighthouseWorldInstanceConnection.ts @@ -43,7 +43,7 @@ const VoiceType: PeerMessageType = { name: 'voice', ttl: 5, optimistic: true, - discardOlderThan: 0, + discardOlderThan: 2000, expirationTime: 10000 } diff --git a/kernel/packages/voice-chat-codec/VoiceCommunicator.ts b/kernel/packages/voice-chat-codec/VoiceCommunicator.ts index ed02e947a2..0eece86ec7 100644 --- a/kernel/packages/voice-chat-codec/VoiceCommunicator.ts +++ b/kernel/packages/voice-chat-codec/VoiceCommunicator.ts @@ -1,7 +1,16 @@ import { VoiceChatCodecWorkerMain, EncodeStream } from './VoiceChatCodecWorkerMain' import { RingBuffer } from 'atomicHelpers/RingBuffer' +import { SortedLimitedQueue } from 'atomicHelpers/SortedLimitedQueue' import { defer } from 'atomicHelpers/defer' import defaultLogger from 'shared/logger' +import { + VOICE_CHAT_SAMPLE_RATE, + OPUS_FRAME_SIZE_MS, + OUTPUT_NODE_BUFFER_SIZE, + OUTPUT_NODE_BUFFER_DURATION, + OPUS_SAMPLES_PER_FRAME, + INPUT_NODE_BUFFER_SIZE +} from './constants' export type AudioCommunicatorChannel = { send(data: Uint8Array): any @@ -10,8 +19,14 @@ export type AudioCommunicatorChannel = { export type StreamPlayingListener = (streamId: string, playing: boolean) => any export type StreamRecordingListener = (recording: boolean) => any +type EncodedFrame = { + order: number + frame: Uint8Array +} + type VoiceOutput = { - buffer: RingBuffer + encodedFramesQueue: SortedLimitedQueue + decodedBuffer: RingBuffer scriptProcessor: ScriptProcessorNode panNode: PannerNode spatialParams: VoiceSpatialParams @@ -55,12 +70,15 @@ export class VoiceCommunicator { private readonly channelBufferSize: number private readonly outputExpireTime = 60 * 1000 + private pauseRequested: boolean = false + private inputSamplesCount: number = 0 + constructor( private selfId: string, private channel: AudioCommunicatorChannel, private options: VoiceCommunicatorOptions ) { - this.sampleRate = this.options.sampleRate ?? 24000 + this.sampleRate = this.options.sampleRate ?? VOICE_CHAT_SAMPLE_RATE this.channelBufferSize = this.options.channelBufferSize ?? 2.0 this.context = new AudioContext({ sampleRate: this.sampleRate }) @@ -99,33 +117,15 @@ export class VoiceCommunicator { return !!this.input } - async playEncodedAudio(src: string, relativePosition: VoiceSpatialParams, encoded: Uint8Array) { + async playEncodedAudio(src: string, relativePosition: VoiceSpatialParams, encoded: Uint8Array, time: number) { if (!this.outputs[src]) { - const nodes = this.createOutputNodes(src) - this.outputs[src] = { - buffer: new RingBuffer(Math.floor(this.channelBufferSize * this.sampleRate), Float32Array), - playing: false, - spatialParams: relativePosition, - lastUpdateTime: Date.now(), - ...nodes - } + this.createOutput(src, relativePosition) } else { this.outputs[src].lastUpdateTime = Date.now() this.setVoiceRelativePosition(src, relativePosition) } - let stream = this.voiceChatWorkerMain.decodeStreams[src] - - if (!stream) { - stream = this.voiceChatWorkerMain.getOrCreateDecodeStream(src, this.sampleRate) - - stream.addAudioDecodedListener((samples) => { - this.outputs[src].lastUpdateTime = Date.now() - this.outputs[src].buffer.write(samples) - }) - } - - stream.decode(encoded) + this.outputs[src].encodedFramesQueue.queue({ frame: encoded, order: time }) } setListenerSpatialParams(spatialParams: VoiceSpatialParams) { @@ -175,7 +175,7 @@ export class VoiceCommunicator { } createScriptOutputFor(src: string) { - const bufferSize = 8192 + const bufferSize = OUTPUT_NODE_BUFFER_SIZE const processor = this.context.createScriptProcessor(bufferSize, 0, 1) processor.onaudioprocess = (ev) => { const data = ev.outputBuffer.getChannelData(0) @@ -183,14 +183,16 @@ export class VoiceCommunicator { data.fill(0) if (this.outputs[src]) { const wasPlaying = this.outputs[src].playing - if (this.outputs[src].buffer.readAvailableCount() > 0) { - data.set(this.outputs[src].buffer.read(data.length)) + const minReadCount = wasPlaying ? 0 : OUTPUT_NODE_BUFFER_SIZE - 1 + if (this.outputs[src].decodedBuffer.readAvailableCount() > minReadCount) { + data.set(this.outputs[src].decodedBuffer.read(data.length)) if (!wasPlaying) { this.changePlayingStatus(src, true) } } else { if (wasPlaying) { this.changePlayingStatus(src, false) + this.outputs[src].decodedBuffer.read() // Emptying buffer } } } @@ -230,6 +232,7 @@ export class VoiceCommunicator { } start() { + this.pauseRequested = false if (this.input) { this.input.encodeInputProcessor.connect(this.input.recordingContext.destination) this.input.inputStream.connect(this.input.encodeInputProcessor) @@ -240,24 +243,65 @@ export class VoiceCommunicator { } pause() { - try { - this.input?.inputStream.disconnect(this.input.encodeInputProcessor) - } catch (e) { - // Ignored. This will fail if it was already disconnected + this.pauseRequested = true + } + + private disconnectInput() { + this.input?.inputStream.disconnect() + + this.input?.encodeInputProcessor.disconnect() + + this.notifyRecording(false) + } + + private createOutput(src: string, relativePosition: VoiceSpatialParams) { + const nodes = this.createOutputNodes(src) + this.outputs[src] = { + encodedFramesQueue: new SortedLimitedQueue( + Math.ceil((this.channelBufferSize * 1000) / OPUS_FRAME_SIZE_MS), + (frameA, frameB) => frameA.order - frameB.order + ), + decodedBuffer: new RingBuffer(Math.floor(this.channelBufferSize * this.sampleRate), Float32Array), + playing: false, + spatialParams: relativePosition, + lastUpdateTime: Date.now(), + ...nodes } - try { - this.input?.encodeInputProcessor.disconnect(this.input.recordingContext.destination) - } catch (e) { - // Ignored. This will fail if it was already disconnected + const readEncodedBufferLoop = async () => { + if (this.outputs[src]) { + const framesToRead = Math.ceil((OUTPUT_NODE_BUFFER_DURATION * 1.5) / OPUS_FRAME_SIZE_MS) + + const frames = await this.outputs[src].encodedFramesQueue.dequeueItemsWhenAvailable( + framesToRead, + OUTPUT_NODE_BUFFER_DURATION * 3 + ) + + if (frames.length > 0) { + let stream = this.voiceChatWorkerMain.decodeStreams[src] + + if (!stream) { + stream = this.voiceChatWorkerMain.getOrCreateDecodeStream(src, this.sampleRate) + + stream.addAudioDecodedListener((samples) => { + this.outputs[src].lastUpdateTime = Date.now() + this.outputs[src].decodedBuffer.write(samples) + }) + } + + frames.forEach((it) => stream.decode(it.frame)) + } + + await readEncodedBufferLoop() + } } - this.notifyRecording(false) + readEncodedBufferLoop().catch((e) => defaultLogger.log('Error while reading encoded buffer of ' + src, e)) } private createInputFor(stream: MediaStream, context: AudioContext) { const streamSource = context.createMediaStreamSource(stream) - const inputProcessor = context.createScriptProcessor(4096, 1, 1) + const inputProcessor = context.createScriptProcessor(INPUT_NODE_BUFFER_SIZE, 1, 1) return { recordingContext: context, encodeStream: this.createInputEncodeStream(context, inputProcessor), @@ -275,9 +319,23 @@ export class VoiceCommunicator { encodeStream.addAudioEncodedListener((data) => this.channel.send(data)) - encodeInputProcessor.onaudioprocess = async (e) => { + encodeInputProcessor.onaudioprocess = (e) => { const buffer = e.inputBuffer - encodeStream.encode(buffer.getChannelData(0)) + let data = buffer.getChannelData(0) + + if (this.pauseRequested) { + // We try to use as many samples as we can that would complete some frames + const samplesToUse = + Math.floor(data.length / OPUS_SAMPLES_PER_FRAME) * OPUS_SAMPLES_PER_FRAME + + OPUS_SAMPLES_PER_FRAME - + (this.inputSamplesCount % OPUS_SAMPLES_PER_FRAME) + data = data.slice(0, samplesToUse) + this.disconnectInput() + this.pauseRequested = false + } + + encodeStream.encode(data) + this.inputSamplesCount += data.length } return encodeStream @@ -308,14 +366,16 @@ export class VoiceCommunicator { } private destroyOutput(outputId: string) { - try { - this.outputs[outputId].panNode.disconnect(this.context.destination) - } catch (e) { - // Ignored. This may fail if the node wasn't connected yet - } + this.disconnectOutputNodes(outputId) this.voiceChatWorkerMain.destroyDecodeStream(outputId) delete this.outputs[outputId] } + + private disconnectOutputNodes(outputId: string) { + const output = this.outputs[outputId] + output.panNode.disconnect() + output.scriptProcessor.disconnect() + } } diff --git a/kernel/packages/voice-chat-codec/constants.ts b/kernel/packages/voice-chat-codec/constants.ts new file mode 100644 index 0000000000..810e8da493 --- /dev/null +++ b/kernel/packages/voice-chat-codec/constants.ts @@ -0,0 +1,11 @@ +export const OPUS_BITS_PER_SECOND = 24000 +export const OPUS_FRAME_SIZE_MS = 60 + +export const OPUS_SAMPLES_PER_FRAME = (OPUS_BITS_PER_SECOND * OPUS_FRAME_SIZE_MS) / 1000 + +export const VOICE_CHAT_SAMPLE_RATE = 24000 + +export const OUTPUT_NODE_BUFFER_SIZE = 4096 +export const OUTPUT_NODE_BUFFER_DURATION = (OUTPUT_NODE_BUFFER_SIZE * 1000) / VOICE_CHAT_SAMPLE_RATE + +export const INPUT_NODE_BUFFER_SIZE = 4096 diff --git a/kernel/packages/voice-chat-codec/worker.ts b/kernel/packages/voice-chat-codec/worker.ts index 03b41b8dc5..0d6dd1ecde 100644 --- a/kernel/packages/voice-chat-codec/worker.ts +++ b/kernel/packages/voice-chat-codec/worker.ts @@ -1,5 +1,6 @@ import { VoiceChatWorkerResponse, RequestTopic, ResponseTopic } from './types' import { Resampler } from './resampler' +import { OPUS_BITS_PER_SECOND, OPUS_FRAME_SIZE_MS } from './constants' declare var self: WorkerGlobalScope & any declare function postMessage(message: any, transferables: any[]): void @@ -112,7 +113,7 @@ function processEncodeMessage(e: MessageEvent) { const sampleRate = getSampleRate(e) const encoderWorklet = (encoderWorklets[e.data.streamId] = encoderWorklets[e.data.streamId] || { working: false, - encoder: new libopus.Encoder(1, sampleRate, 24000, 20, true), + encoder: new libopus.Encoder(1, sampleRate, OPUS_BITS_PER_SECOND, OPUS_FRAME_SIZE_MS, true), lastWorkTime: Date.now(), destroy: function () { this.encoder.destroy() diff --git a/kernel/static/voice-chat-codec/worker.js b/kernel/static/voice-chat-codec/worker.js index 59d0b27d28..e6c62d67ac 100644 --- a/kernel/static/voice-chat-codec/worker.js +++ b/kernel/static/voice-chat-codec/worker.js @@ -1 +1 @@ -!function(t,e){for(var r in e)t[r]=e[r]}(this,function(t){var e={};function r(i){if(e[i])return e[i].exports;var a=e[i]={i:i,l:!1,exports:{}};return t[i].call(a.exports,a,a.exports,r),a.l=!0,a.exports}return r.m=t,r.c=e,r.d=function(t,e,i){r.o(t,e)||Object.defineProperty(t,e,{enumerable:!0,get:i})},r.r=function(t){"undefined"!=typeof Symbol&&Symbol.toStringTag&&Object.defineProperty(t,Symbol.toStringTag,{value:"Module"}),Object.defineProperty(t,"__esModule",{value:!0})},r.t=function(t,e){if(1&e&&(t=r(t)),8&e)return t;if(4&e&&"object"==typeof t&&t&&t.__esModule)return t;var i=Object.create(null);if(r.r(i),Object.defineProperty(i,"default",{enumerable:!0,value:t}),2&e&&"string"!=typeof t)for(var a in t)r.d(i,a,function(e){return t[e]}.bind(null,a));return i},r.n=function(t){var e=t&&t.__esModule?function(){return t.default}:function(){return t};return r.d(e,"a",e),e},r.o=function(t,e){return Object.prototype.hasOwnProperty.call(t,e)},r.p="",r(r.s=1)}([function(t,e,r){"use strict";var i,a;r.r(e),function(t){t.ENCODE="ENCODE",t.DECODE="DECODE",t.DESTROY_ENCODER="DESTROY_ENCODER",t.DESTROY_DECODER="DESTROY_ENCODER"}(i||(i={})),function(t){t.ENCODE="ENCODE_OUTPUT",t.DECODE="DECODE_OUTPUT"}(a||(a={}));class s{constructor(t,e,r){if(!t||!e||!r)throw new Error("Invalid settings specified for the resampler.");this.resampler=null,this.fromSampleRate=t,this.toSampleRate=e,this.channels=r||0,this.initialize()}initialize(){this.fromSampleRate==this.toSampleRate?(this.resampler=t=>t,this.ratioWeight=1):(this.fromSampleRate{let e,r,i,a,s,o,n,l,u,f=t.length,h=this.channels;if(f%h!=0)throw new Error("Buffer was of incorrect sample length.");if(f<=0)return Float32Array.of();for(e=this.outputBufferSize,r=this.ratioWeight,i=this.lastWeight,a=0,s=0,o=0,n=0,l=this.outputBuffer;i<1;i+=r)for(s=i%1,a=1-s,this.lastWeight=i%1,u=0;u0?u:0)]*a+t[o+(h+u)]*s;i+=r,o=Math.floor(i)*h}for(u=0;u{let e,r,i,a,s,o,n,l,u,f,h,c=t.length,p=this.channels;if(c%p!=0)throw new Error("Buffer was of incorrect sample length.");if(c<=0)return Float32Array.of();for(e=this.outputBufferSize,r=[],i=this.ratioWeight,a=0,o=0,n=0,l=!this.tailExists,this.tailExists=!1,u=this.outputBuffer,f=0,h=0,s=0;s0&&o=n)){for(s=0;s0?s:0)]*a;h+=a,a=0;break}for(s=0;s{let e=Math.floor(32767*t);return e=Math.min(32767,e),e=Math.max(-32768,e),e})}(function(t,e,r){if(r&&r!==e){return new s(r,e,1).resample(t)}return t}(t.data.samples,t.data.sampleRate,t.data.inputSampleRate));r.encoder.input(i),r.working||u(t.data.streamId,r,t=>t.encoder.output(),(t,e)=>({topic:a.ENCODE,streamId:e,encoded:t}))}(t),t.data.topic===i.DECODE&&function(t){const e=o(t),r=l[t.data.streamId]=l[t.data.streamId]||{working:!1,decoder:new libopus.Decoder(1,e),lastWorkTime:Date.now(),destroy:function(){this.decoder.destroy()}};r.decoder.input(t.data.encoded),r.working||u(t.data.streamId,r,t=>t.decoder.output(),(t,e)=>{return{topic:a.DECODE,streamId:e,samples:(r=t,Float32Array.from(r,t=>{let e=t>=0?t/32767:t/32768;return Math.fround(e)}))};var r})}(t),t.data.topic===i.DESTROY_DECODER){const{streamId:e}=t.data;f(l,e)}if(t.data.topic===i.DESTROY_ENCODER){const{streamId:e}=t.data;f(n,e)}}},function(t,e,r){"use strict";r.r(e);const i=r(0);var a;i&&i.__esModule&&i.default&&new i.default((a=self,{onConnect(t){a.addEventListener("message",()=>t(),{once:!0})},onError(t){a.addEventListener("error",e=>{e.error?t(e.error):e.message&&t(Object.assign(new Error(e.message),{colno:e.colno,error:e.error,filename:e.filename,lineno:e.lineno,message:e.message}))})},onMessage(t){a.addEventListener("message",e=>{t(e.data)})},sendMessage(t){a.postMessage(t)},close(){"terminate"in a?a.terminate():"close"in a&&a.close()}}))}])); \ No newline at end of file +!function(t,e){for(var r in e)t[r]=e[r]}(this,function(t){var e={};function r(i){if(e[i])return e[i].exports;var a=e[i]={i:i,l:!1,exports:{}};return t[i].call(a.exports,a,a.exports,r),a.l=!0,a.exports}return r.m=t,r.c=e,r.d=function(t,e,i){r.o(t,e)||Object.defineProperty(t,e,{enumerable:!0,get:i})},r.r=function(t){"undefined"!=typeof Symbol&&Symbol.toStringTag&&Object.defineProperty(t,Symbol.toStringTag,{value:"Module"}),Object.defineProperty(t,"__esModule",{value:!0})},r.t=function(t,e){if(1&e&&(t=r(t)),8&e)return t;if(4&e&&"object"==typeof t&&t&&t.__esModule)return t;var i=Object.create(null);if(r.r(i),Object.defineProperty(i,"default",{enumerable:!0,value:t}),2&e&&"string"!=typeof t)for(var a in t)r.d(i,a,function(e){return t[e]}.bind(null,a));return i},r.n=function(t){var e=t&&t.__esModule?function(){return t.default}:function(){return t};return r.d(e,"a",e),e},r.o=function(t,e){return Object.prototype.hasOwnProperty.call(t,e)},r.p="",r(r.s=1)}([function(t,e,r){"use strict";var i,a;r.r(e),function(t){t.ENCODE="ENCODE",t.DECODE="DECODE",t.DESTROY_ENCODER="DESTROY_ENCODER",t.DESTROY_DECODER="DESTROY_ENCODER"}(i||(i={})),function(t){t.ENCODE="ENCODE_OUTPUT",t.DECODE="DECODE_OUTPUT"}(a||(a={}));class s{constructor(t,e,r){if(!t||!e||!r)throw new Error("Invalid settings specified for the resampler.");this.resampler=null,this.fromSampleRate=t,this.toSampleRate=e,this.channels=r||0,this.initialize()}initialize(){this.fromSampleRate==this.toSampleRate?(this.resampler=t=>t,this.ratioWeight=1):(this.fromSampleRate{let e,r,i,a,s,o,n,l,u,f=t.length,h=this.channels;if(f%h!=0)throw new Error("Buffer was of incorrect sample length.");if(f<=0)return Float32Array.of();for(e=this.outputBufferSize,r=this.ratioWeight,i=this.lastWeight,a=0,s=0,o=0,n=0,l=this.outputBuffer;i<1;i+=r)for(s=i%1,a=1-s,this.lastWeight=i%1,u=0;u0?u:0)]*a+t[o+(h+u)]*s;i+=r,o=Math.floor(i)*h}for(u=0;u{let e,r,i,a,s,o,n,l,u,f,h,c=t.length,p=this.channels;if(c%p!=0)throw new Error("Buffer was of incorrect sample length.");if(c<=0)return Float32Array.of();for(e=this.outputBufferSize,r=[],i=this.ratioWeight,a=0,o=0,n=0,l=!this.tailExists,this.tailExists=!1,u=this.outputBuffer,f=0,h=0,s=0;s0&&o=n)){for(s=0;s0?s:0)]*a;h+=a,a=0;break}for(s=0;s{let e=Math.floor(32767*t);return e=Math.min(32767,e),e=Math.max(-32768,e),e})}(function(t,e,r){if(r&&r!==e){return new s(r,e,1).resample(t)}return t}(t.data.samples,t.data.sampleRate,t.data.inputSampleRate));r.encoder.input(i),r.working||u(t.data.streamId,r,t=>t.encoder.output(),(t,e)=>({topic:a.ENCODE,streamId:e,encoded:t}))}(t),t.data.topic===i.DECODE&&function(t){const e=o(t),r=l[t.data.streamId]=l[t.data.streamId]||{working:!1,decoder:new libopus.Decoder(1,e),lastWorkTime:Date.now(),destroy:function(){this.decoder.destroy()}};r.decoder.input(t.data.encoded),r.working||u(t.data.streamId,r,t=>t.decoder.output(),(t,e)=>{return{topic:a.DECODE,streamId:e,samples:(r=t,Float32Array.from(r,t=>{let e=t>=0?t/32767:t/32768;return Math.fround(e)}))};var r})}(t),t.data.topic===i.DESTROY_DECODER){const{streamId:e}=t.data;f(l,e)}if(t.data.topic===i.DESTROY_ENCODER){const{streamId:e}=t.data;f(n,e)}}},function(t,e,r){"use strict";r.r(e);const i=r(0);var a;i&&i.__esModule&&i.default&&new i.default((a=self,{onConnect(t){a.addEventListener("message",()=>t(),{once:!0})},onError(t){a.addEventListener("error",e=>{e.error?t(e.error):e.message&&t(Object.assign(new Error(e.message),{colno:e.colno,error:e.error,filename:e.filename,lineno:e.lineno,message:e.message}))})},onMessage(t){a.addEventListener("message",e=>{t(e.data)})},sendMessage(t){a.postMessage(t)},close(){"terminate"in a?a.terminate():"close"in a&&a.close()}}))}])); \ No newline at end of file diff --git a/kernel/test/atomicHelpers/OrderedRingBuffer.test.ts b/kernel/test/atomicHelpers/OrderedRingBuffer.test.ts new file mode 100644 index 0000000000..049922ee88 --- /dev/null +++ b/kernel/test/atomicHelpers/OrderedRingBuffer.test.ts @@ -0,0 +1,146 @@ +import { OrderedRingBuffer } from 'atomicHelpers/RingBuffer' +import { expect } from 'chai' + +describe('OrderedRingBuffer', () => { + let buffer: OrderedRingBuffer + beforeEach(() => { + buffer = new OrderedRingBuffer(20, Float32Array) + }) + + it('can write and read simple operations', () => { + buffer.write(Float32Array.of(10, 20, 30), 0) + + expect(buffer.read()).to.eql(Float32Array.of(10, 20, 30)) + + buffer.write(Float32Array.of(40, 50, 60), 1) + expect(buffer.read()).to.eql(Float32Array.of(40, 50, 60)) + }) + + it('can write multiple and read once', () => { + buffer.write(Float32Array.of(10, 20, 30), 0) + buffer.write(Float32Array.of(40, 50, 60), 1) + + expect(buffer.read()).to.eql(Float32Array.of(10, 20, 30, 40, 50, 60)) + }) + + it('can write when the buffer is full, overwriting the first values', () => { + const toWrite = new Float32Array(buffer.size) + + toWrite.fill(1) + + buffer.write(toWrite, 0) + buffer.write(Float32Array.of(10, 20, 30), 1) + + const expected = new Float32Array(toWrite) + expected.set(Float32Array.of(10, 20, 30), toWrite.length - 3) + + expect(buffer.read()).to.eql(expected) + }) + + it('can write values bytes and still work as expected', () => { + for (let i = 0; i < 10; i++) { + const toWrite = new Float32Array(buffer.size) + + toWrite.fill(i) + + buffer.write(toWrite, i) + } + + buffer.write(Float32Array.of(10, 20, 30), 11) + + const expected = new Float32Array(buffer.size) + expected.fill(9) + expected.set(Float32Array.of(10, 20, 30), buffer.size - 3) + + expect(buffer.read()).to.eql(expected) + }) + + it('can write a large array and it keeps the last values', () => { + for (let i = 0; i < 10; i++) { + const toWrite = new Float32Array(buffer.size) + + toWrite.fill(i) + + buffer.write(toWrite, i) + } + + buffer.write(Float32Array.of(10, 20, 30), 11) + + const expected = new Float32Array(buffer.size) + expected.fill(9) + expected.set(Float32Array.of(10, 20, 30), buffer.size - 3) + + expect(buffer.read()).to.eql(expected) + }) + + it('can write out of order and it gets ordered on read', () => { + buffer.write(Float32Array.of(40, 50, 60), 1) + buffer.write(Float32Array.of(10, 20, 30), 0) + + expect(buffer.read()).to.eql(Float32Array.of(10, 20, 30, 40, 50, 60)) + }) + + it('can write at random order and it gets ordered on read', () => { + const chunksToWrite = buffer.size / 2 + const toWrite: Record = {} + const expected = new Float32Array(buffer.size) + + for (let i = 0; i < chunksToWrite; i++) { + const chunk = Float32Array.of(i * 100 + 1, i * 100 + 2) + toWrite[i] = chunk + expected.set(chunk, i * 2) + } + + const randomized = Object.keys(toWrite).sort(() => Math.random() - 0.5) + + randomized.forEach((i) => buffer.write(toWrite[i], parseInt(i))) + + expect(buffer.read()).to.eql(expected) + }) + + it('when writing discards part of buffer, old chunks get resized but not discarded', () => { + const toWrite = new Float32Array(buffer.size) + + toWrite.fill(1) + + buffer.write(toWrite, 0) + + const second = Float32Array.of(40, 50, 60) + const first = Float32Array.of(10, 20, 30) + + buffer.write(second, 2) + buffer.write(first, 1) + + const expected = new Float32Array(buffer.size) + expected.fill(1) + + expected.set(first, expected.length - first.length - second.length) + expected.set(second, expected.length - second.length) + + expect(buffer.read()).to.eql(expected) + }) + + it('when writing discards whole old chunks, they get cleaned up', () => { + for (let i = 0; i < 10; i++) { + const toWrite = new Float32Array(buffer.size / 5) + + toWrite.fill(i) + + buffer.write(toWrite, i * 10) + } + + //@ts-ignore + expect(buffer.chunks.length).to.eql(5) + + expect(buffer.peek()).to.eql(Float32Array.of(5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 7, 7, 8, 8, 8, 8, 9, 9, 9, 9)) + + // We write something in the middle of 5 and 6, and something in the middle of 7 and 8 and something at the end + buffer.write(Float32Array.of(11, 11, 11, 11), 55) + + buffer.write(Float32Array.of(22, 22, 22, 22), 75) + + buffer.write(Float32Array.of(33, 33), 95) + + expect(buffer.read()).to.eql(Float32Array.of(6, 6, 7, 7, 7, 7, 22, 22, 22, 22, 8, 8, 8, 8, 9, 9, 9, 9, 33, 33)) + }) +}) diff --git a/kernel/test/atomicHelpers/RingBuffer.test.ts b/kernel/test/atomicHelpers/RingBuffer.test.ts deleted file mode 100644 index 36604a6d8b..0000000000 --- a/kernel/test/atomicHelpers/RingBuffer.test.ts +++ /dev/null @@ -1,75 +0,0 @@ -import { RingBuffer } from 'atomicHelpers/RingBuffer' -import { expect } from 'chai' - -describe('RingBuffer', () => { - let buffer: RingBuffer - beforeEach(() => { - buffer = new RingBuffer(20, Float32Array) - }) - - it('can write and read simple operations', () => { - buffer.write(Float32Array.of(10, 20, 30)) - - expect(buffer.read()).to.equal(Float32Array.of(10, 20, 30)) - - buffer.write(Float32Array.of(40, 50, 60)) - expect(buffer.read()).to.equal(Float32Array.of(40, 50, 60)) - }) - - it('can write multiple and read once', () => { - buffer.write(Float32Array.of(10, 20, 30)) - buffer.write(Float32Array.of(40, 50, 60)) - - expect(buffer.read()).to.equal(Float32Array.of(10, 20, 30, 40, 50, 60)) - }) - - it('can write when the buffer is full, overwriting the first values', () => { - const toWrite = new Float32Array(buffer.size) - - toWrite.fill(1) - - buffer.write(toWrite) - buffer.write(Float32Array.of(10, 20, 30)) - - const expected = new Float32Array(toWrite) - expected.set(Float32Array.of(10, 20, 30), toWrite.length - 3) - - expect(buffer.read()).to.equal(expected) - }) - - it('can write values bytes and still work as expected', () => { - for (let i = 0; i < 10; i++) { - const toWrite = new Float32Array(buffer.size) - - toWrite.fill(i) - - buffer.write(toWrite) - } - - buffer.write(Float32Array.of(10, 20, 30)) - - const expected = new Float32Array(buffer.size) - expected.fill(9) - expected.set(Float32Array.of(10, 20, 30), buffer.size - 3) - - expect(buffer.read()).to.equal(expected) - }) - - it('can write a large array and it keeps the last values', () => { - for (let i = 0; i < 10; i++) { - const toWrite = new Float32Array(buffer.size) - - toWrite.fill(i) - - buffer.write(toWrite) - } - - buffer.write(Float32Array.of(10, 20, 30)) - - const expected = new Float32Array(buffer.size) - expected.fill(9) - expected.set(Float32Array.of(10, 20, 30), buffer.size - 3) - - expect(buffer.read()).to.equal(expected) - }) -}) diff --git a/kernel/test/atomicHelpers/SortedLimitedQueue.test.ts b/kernel/test/atomicHelpers/SortedLimitedQueue.test.ts new file mode 100644 index 0000000000..0d8bca91de --- /dev/null +++ b/kernel/test/atomicHelpers/SortedLimitedQueue.test.ts @@ -0,0 +1,127 @@ +import { SortedLimitedQueue } from 'atomicHelpers/SortedLimitedQueue' +import { expect } from 'chai' + +describe('SortedLimitedQueue', () => { + let queue: SortedLimitedQueue + const criteria = (a: number, b: number) => a - b + + const queueLength = 10 + + beforeEach(() => { + queue = new SortedLimitedQueue(queueLength, criteria) + }) + + it('can queue and dequeue elements', () => { + queue.queue(1) + queue.queue(2) + + expect(queue.dequeue()).to.eql(1) + expect(queue.dequeue()).to.eql(2) + expect(queue.dequeue()).to.be.undefined + }) + + it('can queue out of order and it gets ordered', () => { + queue.queue(4) + queue.queue(2) + queue.queue(7) + queue.queue(1) + + expect(queue.dequeueItems(4)).to.eql([1, 2, 4, 7]) + }) + + it('can queue in random order and it gets ordered', () => { + const expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + + expected + .slice() + .sort(() => Math.random() - 0.5) + .forEach((item) => queue.queue(item)) + + expect(queue.dequeueItems(10)).to.eql(expected) + }) + + it('can queue more than the length limit and the first elements get removed', () => { + const queued = [] + const length = 20 + + for (let i = 0; i < length; i++) { + const item = Math.floor(Math.random() * 1000) + queued.push(item) + queue.queue(item) + } + + const expected = queued.sort(criteria).slice(length - queueLength, length) + + expect(queue.dequeueItems(10)).to.eql(expected) + }) + + it('can queue any element with custom sorting', () => { + const queue = new SortedLimitedQueue<{ name: string; order: number }>(10, (a, b) => a.order - b.order) + + queue.queue({ name: 'second', order: 2 }) + queue.queue({ name: 'fourth', order: 4 }) + queue.queue({ name: 'third', order: 3 }) + queue.queue({ name: 'first', order: 1 }) + + expect(queue.dequeueItems(4).map((it) => it.name)).to.eql(['first', 'second', 'third', 'fourth']) + }) + + it('can await elements to be dequeued', async () => { + queue.queue(1) + queue.queue(2) + + const result = await queue.dequeueItemsWhenAvailable(2, 100) + + expect(result).to.eql([1, 2]) + }) + + it('can await elements with a timeout', async () => { + let timedout = false + + queue.queue(1) + + setTimeout(() => (timedout = true), 5) + + const result = await queue.dequeueItemsWhenAvailable(2, 10) + + expect(result).to.eql([1]) + expect(timedout).to.be.true + }) + + it('can await elements with a timeout and it returns empty array', async () => { + let timedout = false + + setTimeout(() => (timedout = true), 5) + + const result = await queue.dequeueItemsWhenAvailable(2, 10) + + expect(result).to.eql([]) + expect(timedout).to.be.true + }) + + it('can await elements and it blocks until elements are available', async () => { + let blocked = false + let timedout = false + + queue.queue(1) + + setTimeout(() => (blocked = true), 5) + setTimeout(() => queue.queue(2), 10) + setTimeout(() => (timedout = true), 50) + + const result = await queue.dequeueItemsWhenAvailable(2, 100) + + expect(result).to.eql([1, 2]) + expect(timedout).to.be.false + expect(blocked).to.be.true + }) + + it('puts the elements in the order they came if they have the same order value', () => { + const queue = new SortedLimitedQueue<{ name: string; order: number }>(10, (a, b) => a.order - b.order) + + queue.queue({ name: 'first', order: 1 }) + queue.queue({ name: 'second', order: 1 }) + + expect(queue.dequeueItems(2).map((it) => it.name)).to.eql(['first', 'second']) + }) +}) diff --git a/kernel/test/index.ts b/kernel/test/index.ts index a435bc9e27..e491ddbf03 100644 --- a/kernel/test/index.ts +++ b/kernel/test/index.ts @@ -9,6 +9,8 @@ global['isRunningTests'] = true import './atomicHelpers/parcelScenePositions.test' import './atomicHelpers/landHelpers.test' import './atomicHelpers/vectorHelpers.test' +import './atomicHelpers/OrderedRingBuffer.test' +import './atomicHelpers/SortedLimitedQueue.test' /* UNIT */ import './unit/ethereum.test'