Skip to content
This repository has been archived by the owner on Aug 16, 2021. It is now read-only.

Commit

Permalink
feat: Sorted queue for voice chat frames (#1363)
Browse files Browse the repository at this point in the history
Added a sorted queue for voice chat frames received from the network.

This allows to potentially receive the voice packets in different order, and that should improve voice chat when there is a big P2P network.

Also fixed a problem that would make the final part of the last recorded audio to be appended to the next recording
  • Loading branch information
pablitar authored Sep 28, 2020
1 parent e4d8552 commit 78d4f8a
Show file tree
Hide file tree
Showing 14 changed files with 689 additions and 140 deletions.
207 changes: 193 additions & 14 deletions kernel/packages/atomicHelpers/RingBuffer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import future, { IFuture } from 'fp-future'

type TypedArray =
| Int8Array
| Uint8Array
Expand All @@ -22,7 +24,51 @@ export class RingBuffer<T extends TypedArray> {
return this.writePointer - this.readPointer
}

getWritePointer() {
return this.writePointer
}

getReadPointer() {
return this.readPointer
}

write(array: T, length?: number) {
this.writeAt(array, this.writePointer, length)
}

read(readCount?: number): T {
const result = this.peek(this.readPointer, readCount)

this.readPointer += result.length

return result
}

peek(startPointer?: number, readCount?: number): T {
const start = startPointer ? startPointer : this.readPointer

const maxCountToRead = this.writePointer - this.readPointer

const count = readCount ? Math.min(readCount, maxCountToRead) : maxCountToRead

const readPosition = start % this.buffer.length

const endIndex = readPosition + count

let result: T

if (endIndex > this.buffer.length) {
result = new this.ArrayTypeConstructor(count)
result.set(this.buffer.slice(readPosition, this.buffer.length))
result.set(this.buffer.slice(0, endIndex - this.buffer.length), this.buffer.length - readPosition)
} else {
result = this.buffer.slice(readPosition, endIndex) as T
}

return result
}

writeAt(array: T, startPointer: number, length?: number) {
const len = length || array.length

let toWrite = array
Expand All @@ -31,7 +77,7 @@ export class RingBuffer<T extends TypedArray> {
toWrite = array.slice(array.length - this.buffer.length, array.length) as T
}

const writePosition = this.writePointer % this.buffer.length
const writePosition = startPointer % this.buffer.length

const endIndex = writePosition + len

Expand All @@ -43,36 +89,169 @@ export class RingBuffer<T extends TypedArray> {
this.buffer.set(toWrite.slice(0, len), writePosition)
}

this.writePointer += len
const endPointer = startPointer + len

if (endPointer > this.writePointer) {
this.writePointer = endPointer
}

this.updateReadPointerToMinReadPosition()
}

isFull() {
return this.readAvailableCount() >= this.size
}

private updateReadPointerToMinReadPosition() {
const minReadPointer = this.writePointer - this.buffer.length

if (this.readPointer < minReadPointer) {
this.readPointer = minReadPointer
}
}
}

read(readCount?: number): T {
const maxCountToRead = this.writePointer - this.readPointer
type Chunk = {
order: number
startPointer: number
length: number
}

const count = readCount ? Math.min(readCount, maxCountToRead) : maxCountToRead
export class OrderedRingBuffer<T extends TypedArray> {
private internalRingBuffer: RingBuffer<T>

const readPosition = this.readPointer % this.buffer.length
private chunks: Chunk[] = []

const endIndex = readPosition + count
private blockedReadChunksFuture?: { chunksToRead: number; future: IFuture<T[]> }

let result: T
constructor(public readonly size: number, ArrayTypeConstructor: { new (size: number): T }) {
this.internalRingBuffer = new RingBuffer(size, ArrayTypeConstructor)
}

if (endIndex > this.buffer.length) {
result = new this.ArrayTypeConstructor(count)
result.set(this.buffer.slice(readPosition, this.buffer.length))
result.set(this.buffer.slice(0, endIndex - this.buffer.length), this.buffer.length - readPosition)
readAvailableCount() {
return this.internalRingBuffer.readAvailableCount()
}

write(array: T, order: number, length?: number) {
// We find those chunks that should be after this chunk
const nextChunks = this.chunks.filter((it) => it.order > order)

if (nextChunks.length === 0) {
// If there are no chunks that should be after this chunk, then we just need to write the chunk at the end.
this.chunks.push({
order,
startPointer: this.internalRingBuffer.getWritePointer(),
length: length || array.length
})

this.internalRingBuffer.write(array, length)
} else {
result = this.buffer.slice(readPosition, endIndex) as T
// Otherwise, we need to get those chunks that should be after this one, and write them one after the other

let writePointer = nextChunks[0].startPointer

const newChunk = {
order,
startPointer: writePointer,
length: length || array.length
}

// Chunks are ordered by "order", so we need to ensure that we place this new chunk in the corresponding index.
this.chunks.splice(this.chunks.length - nextChunks.length, 0, newChunk)

const arraysToWrite = [length ? array.slice(0, length) : array] as T[]

// We get the arrays for each chunk, and we update their pointers while we are at it
nextChunks.forEach((chunk) => {
arraysToWrite.push(this.arrayForChunk(chunk))

chunk.startPointer += newChunk.length
})

// We write starting from the position of the first chunk that will be rewritten
arraysToWrite.forEach((toWrite) => {
this.internalRingBuffer.writeAt(toWrite, writePointer)
writePointer += toWrite.length
})
}

this.readPointer += count
this.discardUnreadableChunks()
this.resolveBlockedRead()
}

arrayForChunk(chunk: Chunk): T {
return this.peek(chunk.startPointer, chunk.length)
}

peek(startPointer?: number, readCount?: number): T {
return this.internalRingBuffer.peek(startPointer, readCount)
}

read(readCount?: number): T {
const result = this.internalRingBuffer.read(readCount)

this.discardUnreadableChunks()

return result
}

/**
* The promise will block until there is chunksCount chunks to read,
* or until timeToWait has passed.
*
* Once timeToWait has passed, if there is nothing to read, an empty array is returned.
*/
async blockAndReadChunks(chunksCount: number, timeToWait: number): Promise<T[]> {
if (this.chunks.length >= chunksCount) {
const chunks = this.readChunks(chunksCount)

return Promise.resolve(chunks)
} else {
if (this.blockedReadChunksFuture) {
this.blockedReadChunksFuture.future.reject(new Error('Only one blocking call is possible at the same time'))
}

const thisFuture = { chunksToRead: chunksCount, future: future() }

this.blockedReadChunksFuture = thisFuture

setTimeout(() => {
if (this.blockedReadChunksFuture === thisFuture) {
if (this.chunks.length > 0) {
thisFuture.future.resolve(this.readChunks(this.chunks.length))
} else {
thisFuture.future.resolve([])
}
}
}, timeToWait)

return this.blockedReadChunksFuture.future
}
}

private readChunks(chunksCount: number) {
return this.chunks.slice(0, chunksCount).map((it) => this.read(it.length))
}

private discardUnreadableChunks() {
const isReadable = (chunk: Chunk) => {
// A chunk is readable if its end pointer is ahead of the read pointer
const endPointer = chunk.startPointer + chunk.length
return endPointer > this.internalRingBuffer.getReadPointer()
}

this.chunks = this.chunks.filter(isReadable)

if (this.chunks.length > 0 && this.chunks[0].startPointer < this.internalRingBuffer.getReadPointer()) {
this.chunks[0].startPointer = this.internalRingBuffer.getReadPointer()
}
}

private resolveBlockedRead() {
if (this.blockedReadChunksFuture && this.chunks.length >= this.blockedReadChunksFuture.chunksToRead) {
const read = this.readChunks(this.blockedReadChunksFuture.chunksToRead)
this.blockedReadChunksFuture.future.resolve(read)
delete this.blockedReadChunksFuture
}
}
}
97 changes: 97 additions & 0 deletions kernel/packages/atomicHelpers/SortedLimitedQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import future, { IFuture } from 'fp-future'

export class SortedLimitedQueue<T> {
private internalArray: T[]

private pendingDequeue?: { amount: number; futures: IFuture<T[]>[]; timedoutValues?: T[] }

constructor(private readonly maxLength: number, private readonly sortCriteria: (a: T, b: T) => number) {
this.internalArray = []
}

queue(item: T) {
let insertIndex = 0

// Since the most likely scenario for our use case is that we insert the item at the end,
// we start by the end. This may be parameterized in the future
for (let i = this.internalArray.length - 1; i >= 0; i--) {
if (this.sortCriteria(item, this.internalArray[i]) >= 0) {
insertIndex = i + 1
break
}
}

if (insertIndex === 0) {
this.internalArray.unshift(item)
} else if (insertIndex === this.internalArray.length) {
this.internalArray.push(item)
} else {
this.internalArray.splice(insertIndex, 0, item)
}

if (this.internalArray.length > this.maxLength) {
this.internalArray.shift()
}

this.resolveBlockedDequeues()
}

queuedCount() {
return this.internalArray.length
}

dequeue(): T | undefined {
return this.internalArray.shift()
}

dequeueItems(count?: number): T[] {
return this.internalArray.splice(0, count ?? this.internalArray.length)
}

async dequeueItemsWhenAvailable(count: number, timeout: number): Promise<T[]> {
if (this.pendingDequeue && this.pendingDequeue.amount !== count) {
// To have multiple dequeue requests they all have to have the same amount. We prioritize the new request, and resolve the other with empty arrays.
this.pendingDequeue.futures.forEach((it) => it.resolve([]))
}

if (this.queuedCount() >= count) {
const items = this.dequeueItems(count)
this.pendingDequeue?.futures.forEach((it) => it.resolve(items))
this.pendingDequeue = undefined
return Promise.resolve(items)
} else {
if (!this.pendingDequeue) {
this.pendingDequeue = {
amount: count,
futures: []
}
}
const newFuture = future<T[]>()
this.pendingDequeue.futures.push(newFuture)

setTimeout(() => {
if (this.pendingDequeue && this.pendingDequeue.futures.indexOf(newFuture) >= 0) {
this.resolveBlockedDequeueWith(this.queuedCount())
}
}, timeout)

return newFuture
}
}

isFull() {
return this.queuedCount() >= this.maxLength
}

private resolveBlockedDequeues() {
if (this.pendingDequeue && this.queuedCount() >= this.pendingDequeue.amount) {
this.resolveBlockedDequeueWith(this.pendingDequeue.amount)
}
}

private resolveBlockedDequeueWith(amount: number) {
const items = amount === 0 ? [] : this.dequeueItems(amount)
this.pendingDequeue?.futures.forEach((it) => it.resolve(items))
this.pendingDequeue = undefined
}
}
3 changes: 2 additions & 1 deletion kernel/packages/config/contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ export const contracts = {
'AdapterFeeCollector': '0x5DC888024cB599CfDdb9E6483ED6bAe1fA9e9D18',
'AdapterConverter': '0x2782eb28Dcb1eF4E7632273cd4e347e130Ce4646',
'POIAllowlist': '0x5DC4a5C214f2161F0D5595a6dDd9352409aE3Ab4',
'NAMEDenylist': '0x20c6f1e86eba703a14414a0cbc1b55c89dba7a0f'
'NAMEDenylist': '0x20c6f1e86eba703a14414a0cbc1b55c89dba7a0f',
'CollectionsV2Factory': '0x16d8bac5b67a6b782a9081377bec413bc5bb56a6'
},
'mainnet': {
'MANAToken': '0x0f5d2fb29fb7d3cfee444a200298f468908cc942',
Expand Down
1 change: 0 additions & 1 deletion kernel/packages/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ export namespace commConfigurations {
}
]

export const voiceChatSampleRate = 24000
export const voiceChatUseHRTF = location.search.indexOf('VOICE_CHAT_USE_HRTF') !== -1
}
export const loginConfig = {
Expand Down
Loading

0 comments on commit 78d4f8a

Please sign in to comment.