Skip to content

Commit

Permalink
Add support for draft-03 (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated authored Mar 8, 2024
1 parent 7e145e8 commit 831f191
Show file tree
Hide file tree
Showing 23 changed files with 688 additions and 526 deletions.
67 changes: 23 additions & 44 deletions lib/contribute/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,24 +133,9 @@ export class Broadcast {
// Send a SUBSCRIBE_OK
await subscriber.ack()

const stream = await subscriber.data({
group: 0,
object: 0,
priority: 0,
})

const writer = stream.getWriter()

try {
await writer.write(bytes)
await writer.close()
} catch (e) {
const err = asError(e)
await writer.abort(err.message)
throw err
} finally {
writer.releaseLock()
}
const stream = await subscriber.group({ group: 0 })
await stream.write({ object: 0, payload: bytes })
await stream.close()
}

async #serveInit(subscriber: SubscribeRecv, name: string) {
Expand All @@ -162,28 +147,9 @@ export class Broadcast {

const init = await track.init()

// Create a new stream for each segment.
const stream = await subscriber.data({
group: 0,
object: 0,
priority: 0, // TODO
expires: 0, // Never expires
})

const writer = stream.getWriter()

// TODO make a helper to pipe a Uint8Array to a stream
try {
// Write the init segment to the stream.
await writer.write(init)
await writer.close()
} catch (e) {
const err = asError(e)
await writer.abort(err.message)
throw err
} finally {
writer.releaseLock()
}
const stream = await subscriber.group({ group: 0 })
await stream.write({ object: 0, payload: init })
await stream.close()
}

async #serveTrack(subscriber: SubscribeRecv, name: string) {
Expand All @@ -209,15 +175,28 @@ export class Broadcast {

async #serveSegment(subscriber: SubscribeRecv, segment: Segment) {
// Create a new stream for each segment.
const stream = await subscriber.data({
const stream = await subscriber.group({
group: segment.id,
object: 0,
priority: 0, // TODO
expires: 30, // TODO configurable
})

let object = 0

// Pipe the segment to the stream.
await segment.chunks().pipeTo(stream)
const chunks = segment.chunks().getReader()
for (;;) {
const { value, done } = await chunks.read()
if (done) break

await stream.write({
object,
payload: value,
})

object += 1
}

await stream.close()
}

// Attach the captured video stream to the given video element.
Expand Down
22 changes: 5 additions & 17 deletions lib/media/catalog/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Connection } from "../../transport"
import { Reader } from "../../transport/stream"
import { asError } from "../../common/error"

// JSON encoded catalog
Expand Down Expand Up @@ -31,27 +30,18 @@ export class Catalog {
}

static async fetch(connection: Connection): Promise<Catalog> {
let raw: Uint8Array

const subscribe = await connection.subscribe("", ".catalog")
try {
const segment = await subscribe.data()
if (!segment) throw new Error("no catalog data")

const { header, stream } = segment

if (header.group !== 0) {
throw new Error("TODO updates not supported")
}

if (header.object !== 0) {
throw new Error("TODO delta updates not supported")
}

const reader = new Reader(stream)
raw = await reader.readAll()
const chunk = await segment.read()
if (!chunk) throw new Error("no catalog chunk")

await segment.close()
await subscribe.close() // we done

return Catalog.decode(chunk.payload)
} catch (e) {
const err = asError(e)

Expand All @@ -60,8 +50,6 @@ export class Catalog {

throw err
}

return Catalog.decode(raw)
}
}

Expand Down
48 changes: 26 additions & 22 deletions lib/media/mp4/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,21 @@ export interface Frame {

// Decode a MP4 container into individual samples.
export class Parser {
info!: MP4.Info

#mp4 = MP4.New()
#offset = 0

// TODO Parser should extend TransformStream
decode: TransformStream<Uint8Array, Frame>

constructor() {
this.decode = new TransformStream(
{
start: this.#start.bind(this),
transform: this.#transform.bind(this),
flush: this.#flush.bind(this),
},
// Buffer a single sample on either end
{ highWaterMark: 1 },
{ highWaterMark: 1 },
)
}
#samples: Array<Frame> = []

#start(controller: TransformStreamDefaultController<Frame>) {
constructor(init: Uint8Array) {
this.#mp4.onError = (err) => {
controller.error(err)
console.error("MP4 error", err)
}

this.#mp4.onReady = (info: MP4.Info) => {
this.info = info

// Extract all of the tracks, because we don't know if it's audio or video.
for (const track of info.tracks) {
this.#mp4.setExtractionOptions(track.id, track, { nbSamples: 1 })
Expand All @@ -40,14 +30,27 @@ export class Parser {

this.#mp4.onSamples = (_track_id: number, track: MP4.Track, samples: MP4.Sample[]) => {
for (const sample of samples) {
controller.enqueue({ track, sample })
this.#samples.push({ track, sample })
}
}

this.#mp4.start()

// For some reason we need to modify the underlying ArrayBuffer with offset
const copy = new Uint8Array(init)
const buffer = copy.buffer as MP4.ArrayBuffer
buffer.fileStart = this.#offset

this.#mp4.appendBuffer(buffer)
this.#offset += buffer.byteLength
this.#mp4.flush()

if (!this.info) {
throw new Error("could not parse MP4 info")
}
}

#transform(chunk: Uint8Array) {
decode(chunk: Uint8Array): Array<Frame> {
const copy = new Uint8Array(chunk)

// For some reason we need to modify the underlying ArrayBuffer with offset
Expand All @@ -59,9 +62,10 @@ export class Parser {
this.#mp4.flush()

this.#offset += buffer.byteLength
}

#flush() {
this.#mp4.flush()
const samples = [...this.#samples]
this.#samples.length = 0

return samples
}
}
7 changes: 4 additions & 3 deletions lib/playback/backend.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Catalog } from "../media/catalog"
import { Header } from "../transport/object"
import { GroupHeader } from "../transport/objects"

// TODO make an interface for backends

Expand All @@ -9,12 +9,13 @@ export interface Config {

export interface Init {
name: string // name of the init track
stream: ReadableStream<Uint8Array>
data: Uint8Array
}

export interface Segment {
init: string // name of the init track
kind: "audio" | "video"
header: Header
header: GroupHeader
buffer: Uint8Array
stream: ReadableStream<Uint8Array>
}
16 changes: 14 additions & 2 deletions lib/playback/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { asError } from "../common/error"
import Webcodecs from "./webcodecs"
import MSE from "./mse"
import { Client } from "../transport/client"
import { GroupReader } from "../transport/objects"

export type Range = Message.Range
export type Timeline = Message.Timeline
Expand Down Expand Up @@ -98,7 +99,11 @@ export class Player {
const init = await Promise.race([sub.data(), this.#running])
if (!init) throw new Error("no init data")

this.#backend.init({ stream: init.stream, name })
// We don't care what type of reader we get, we just want the payload.
const chunk = await init.read()
if (!chunk) throw new Error("no init chunk")

this.#backend.init({ data: chunk.payload, name })
} finally {
await sub.close()
}
Expand All @@ -115,11 +120,18 @@ export class Player {
const segment = await Promise.race([sub.data(), this.#running])
if (!segment) break

if (!(segment instanceof GroupReader)) {
throw new Error(`expected group reader for segment: ${track.data_track}`)
}

const [buffer, stream] = segment.stream.release()

this.#backend.segment({
init: track.init_track,
kind: track.kind,
header: segment.header,
stream: segment.stream,
buffer,
stream,
})
}
} finally {
Expand Down
Loading

0 comments on commit 831f191

Please sign in to comment.