Skip to content

Commit

Permalink
Write RTMPChunkBuffer.
Browse files Browse the repository at this point in the history
  • Loading branch information
shogo4405 committed Oct 19, 2024
1 parent 9e6c626 commit 697c793
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 56 deletions.
80 changes: 49 additions & 31 deletions Sources/RTMP/RTMPChunk.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,21 @@ final class RTMPChunkMessageHeader {
}

final class RTMPChunkBuffer {
static let headerSize = 3 + 11 + 4

var payload: Data {
return data[position..<length]
data[position..<length]
}

var chunkSize = RTMPChunkMessageHeader.chunkSize
var chunkSize = RTMPChunkMessageHeader.chunkSize {
didSet {
guard oldValue < chunkSize else {
return
}
let length = chunkSize - data.count
data += Data(count: length + Self.headerSize)
}
}

var remaining: Int {
return length - position
Expand All @@ -139,8 +149,9 @@ final class RTMPChunkBuffer {
private var data: Data
private var length = 0

init(_ data: Data) {
self.data = data
init(chunkSize: Int = RTMPChunkMessageHeader.chunkSize) {
self.data = Data(count: chunkSize + Self.headerSize)
self.chunkSize = chunkSize
}

func flip() -> Self {
Expand Down Expand Up @@ -226,43 +237,65 @@ final class RTMPChunkBuffer {
self.length = length + data.count
}

func putBasicHeader(_ chunkType: RTMPChunkType, chunkStreamId: UInt16) -> Self {
func putMessage(_ chunkType: RTMPChunkType, chunkStreamId: UInt16, message: some RTMPMessage) -> AnyIterator<Data> {
let payload = message.payload
let length = payload.count
var offset = 0
var remaining = min(chunkSize, length)
return AnyIterator { () -> Data? in
guard 0 < remaining else {
return nil
}
defer {
self.position = 0
offset += remaining
remaining = min(self.chunkSize, length - offset)
}
if offset == 0 {
self.putBasicHeader(chunkType, chunkStreamId: chunkStreamId)
self.putMessageHeader(chunkType, length: length, message: message)
} else {
self.putBasicHeader(.three, chunkStreamId: chunkStreamId)
}
self.data.replaceSubrange(self.position..<self.position + remaining, with: payload[offset..<offset + remaining])
return self.data.subdata(in: 0..<self.position + remaining)
}
}

private func putBasicHeader(_ chunkType: RTMPChunkType, chunkStreamId: UInt16) {
if chunkStreamId <= 63 {
data[position] = chunkType.rawValue << 6 | UInt8(chunkStreamId)
position += 1
return self
return
}
if chunkStreamId <= 319 {
data[position + 0] = chunkType.rawValue << 6 | 0b0000000
data[position + 1] = UInt8(chunkStreamId - 64)
position += 2
return self
return
}
data[position + 0] = chunkType.rawValue << 6 | 0b00000001
let streamId = (chunkStreamId - 64).bigEndian.data
data[position + 1] = streamId[0]
data[position + 2] = streamId[1]
position += 3
return self
}

func putMessage(_ chunkType: RTMPChunkType, chunkStreamId: UInt16, message: some RTMPMessage) -> Self {
let length = message.payload.count

private func putMessageHeader(_ chunkType: RTMPChunkType, length: Int, message: some RTMPMessage) {
switch chunkType {
case .zero:
data.replaceSubrange(position...position + 3, with: message.timestamp.bigEndian.data[1...3])
data.replaceSubrange(position..<position + 3, with: message.timestamp.bigEndian.data[1...3])
position += 3
data.replaceSubrange(position...position + 3, with: UInt32(length).bigEndian.data[1...3])
data.replaceSubrange(position..<position + 3, with: UInt32(length).bigEndian.data[1...3])
position += 3
data[position] = message.type.rawValue
position += 1
data.replaceSubrange(position...position + 4, with: message.streamId.littleEndian.data)
data.replaceSubrange(position..<position + 4, with: message.streamId.littleEndian.data)
position += 4
case .one:
data.replaceSubrange(position...position + 3, with: message.timestamp.bigEndian.data[1...3])
data.replaceSubrange(position..<position + 3, with: message.timestamp.bigEndian.data[1...3])
position += 3
data.replaceSubrange(position...position + 3, with: UInt32(length).bigEndian.data[1...3])
data.replaceSubrange(position..<position + 3, with: UInt32(length).bigEndian.data[1...3])
position += 3
data[position] = message.type.rawValue
position += 1
Expand All @@ -272,21 +305,6 @@ final class RTMPChunkBuffer {
case .three:
break
}

var offset = 0
var remaining = min(chunkSize, length)
let payload = message.payload
repeat {
if 0 < offset {
_ = putBasicHeader(.three, chunkStreamId: chunkStreamId)
}
data.replaceSubrange(position..<position + remaining, with: payload[offset..<offset + remaining])
position += remaining
offset += remaining
remaining = min(chunkSize, length - offset)
} while (0 < remaining)

return self
}
}

Expand Down
26 changes: 16 additions & 10 deletions Sources/RTMP/RTMPConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,22 @@ public actor RTMPConnection: NetworkConnection {
}
private var chunkSizeC = RTMPChunkMessageHeader.chunkSize {
didSet {
guard chunkSizeC != oldValue else {
return
}
inputBuffer.chunkSize = chunkSizeC
}
}
private var chunkSizeS = RTMPChunkMessageHeader.chunkSize
private var chunkSizeS = RTMPChunkMessageHeader.chunkSize {
didSet {
guard chunkSizeS != oldValue else {
return
}
outputBuffer.chunkSize = chunkSizeS
}
}
private var operations: [Int: CheckedContinuation<RTMPResponse, any Swift.Error>] = [:]
private var inputBuffer = RTMPChunkBuffer(.init())
private var inputBuffer = RTMPChunkBuffer()
private var windowSizeC = RTMPConnection.defaultWindowSizeS {
didSet {
guard connected else {
Expand All @@ -195,6 +205,7 @@ public actor RTMPConnection: NetworkConnection {
}
}
private var windowSizeS = RTMPConnection.defaultWindowSizeS
private var outputBuffer = RTMPChunkBuffer()
private let authenticator = RTMPAuthenticator()
private var networkMonitor: NetworkMonitor?
private var statusContinuation: AsyncStream<RTMPStatus>.Continuation?
Expand Down Expand Up @@ -372,16 +383,11 @@ public actor RTMPConnection: NetworkConnection {
if logger.isEnabledFor(level: .trace) {
logger.trace("<<", message)
}
let buffer = RTMPChunkBuffer(.init(count: 1024 * 100))
buffer.chunkSize = chunkSizeS
_ = buffer
.putBasicHeader(type, chunkStreamId: chunkStreamId.rawValue)
.putMessage(type, chunkStreamId: chunkStreamId.rawValue, message: message)
let data = buffer.flip().payload
let iterator = outputBuffer.putMessage(type, chunkStreamId: chunkStreamId.rawValue, message: message)
Task {
await socket?.send(data)
await socket?.send(iterator)
}
return data.count
return message.payload.count
}

func addStream(_ stream: RTMPStream) {
Expand Down
10 changes: 10 additions & 0 deletions Sources/RTMP/RTMPSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ final actor RTMPSocket {
outputs?.yield(data)
}

func send(_ iterator: AnyIterator<Data>) {
guard connected else {
return
}
for data in iterator {
queueBytesOut += data.count
outputs?.yield(data)
}
}

func recv() -> AsyncStream<Data> {
AsyncStream<Data> { continuation in
Task {
Expand Down
32 changes: 23 additions & 9 deletions Tests/RTMP/RTMPChunkBufferTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import Testing
static let readDataBufferUnderflow = Data([2, 0, 0, 0, 0, 0, 4, 5, 0, 0, 0, 0, 0, 76, 75, 64, 2, 0, 0, 0, 0, 0, 5, 6, 0, 0, 0, 0, 0, 76, 75, 64, 2, 2, 0, 0, 0, 0, 0, 4, 1, 0, 0, 0, 0, 0, 0, 32, 0, 3, 0, 0, 0, 0, 0, 190, 20, 0, 0, 0, 0, 2, 0, 7, 95, 114, 101, 115, 117, 108, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, 3, 0, 6, 102, 109, 115, 86, 101, 114, 2, 0, 13, 70, 77, 83, 47, 51, 44, 48, 44, 49, 44, 49, 50, 51, 0, 12, 99, 97, 112, 97, 98, 105, 108, 105, 116, 105, 101, 115, 0, 64, 63, 0, 0, 0, 0, 0, 0, 0, 0, 9, 3, 0, 5, 108, 101, 118, 101, 108, 2, 0, 6, 115, 116, 97, 116, 117, 115, 0, 4, 99, 111, 100, 101, 2, 0, 29, 78, 101, 116, 67, 111, 110, 110, 101, 99, 116, 105, 111, 110, 46, 67, 111, 110, 110, 101, 99, 116, 46, 83, 117, 99, 99, 101, 115, 115, 0, 11, 100, 101, 115, 99, 114, 105, 112, 116, 105, 111, 110, 2, 0, 21, 67, 111, 110, 110, 101, 99, 116, 105, 111, 110, 32, 115, 117, 99, 99, 101, 101, 100, 101, 100, 46, 0, 14, 111, 98, 106, 101, 99, 116, 69, 110, 99, 111, 100, 105, 110, 103, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

@Test func read() {
let buffer = RTMPChunkBuffer(.init(count: 1024))
let buffer = RTMPChunkBuffer()
buffer.put(Self.readData)

do {
Expand Down Expand Up @@ -57,8 +57,8 @@ import Testing
}
}

@Test func read_BufferUnderflow() {
let buffer = RTMPChunkBuffer(.init(count: 1024))
@Test func readBufferUnderflow() {
let buffer = RTMPChunkBuffer()
buffer.chunkSize = 8192
buffer.put(Self.readDataBufferUnderflow)

Expand Down Expand Up @@ -89,10 +89,8 @@ import Testing
}
}

/*
func testWrite() {
let buffer = RTMPChunkBuffer(.init(count: 1024))
_ = buffer.putBasicHeader(.zero, chunkStreamId: RTMPChunk.StreamID.command.rawValue)
@Test func write() {
let buffer = RTMPChunkBuffer()
let connection = RTMPCommandMessage(
streamId: 0,
transactionId: 0,
Expand All @@ -101,7 +99,23 @@ import Testing
commandObject: nil,
arguments: []
)
_ = buffer.putMessage(.zero, chunkStreamId: RTMPChunk.StreamID.command.rawValue, message: connection)
let iterator = buffer.putMessage(.zero, chunkStreamId: 1, message: connection)
#expect(iterator.next() == Data([1, 0, 0, 0, 0, 0, 18, 20, 0, 0, 0, 0, 2, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5]))
}

@Test func writeChunkSize() {
let buffer = RTMPChunkBuffer()
let connection = RTMPCommandMessage(
streamId: 0,
transactionId: 0,
objectEncoding: .amf0,
commandName: [String](repeating: "a", count: 128 + 56).joined(),
commandObject: nil,
arguments: []
)
let iterator = buffer.putMessage(.zero, chunkStreamId: 1, message: connection)
#expect(iterator.next() == Data([1, 0, 0, 0, 0, 0, 197, 20, 0, 0, 0, 0, 2, 0, 184, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97]))
#expect(iterator.next() == Data([193, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5]))
#expect(iterator.next() == nil)
}
*/
}
8 changes: 2 additions & 6 deletions Tests/Screen/ScreenObjectTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import AVFoundation

@testable import HaishinKit

/*
@Suite struct ScreenObjectTests {
@Test func screenHorizontalAlignmentRect() {
Task { @ScreenActor in
Expand All @@ -28,11 +29,9 @@ import AVFoundation
if let sampleBuffer = CMVideoSampleBufferFactory.makeSampleBuffer(width: 1600, height: 900) {
// _ = screen.render(sampleBuffer)
}
DispatchQueue.main.sync {
#expect(object1.bounds == .init(origin: .zero, size: object1.size))
#expect(object2.bounds == .init(x: 750, y: 0, width: 100, height: 100))
#expect(object3.bounds == .init(x: 1500, y: 0, width: 100, height: 100))
}
}
}

Expand Down Expand Up @@ -64,12 +63,10 @@ import AVFoundation
if let sampleBuffer = CMVideoSampleBufferFactory.makeSampleBuffer(width: 1600, height: 900) {
// _ = screen.render(sampleBuffer)
}
DispatchQueue.main.sync {
#expect(object0.bounds == .init(x: 0, y: 0, width: 1600, height: 900))
#expect(object1.bounds == .init(x: 0, y: 0, width: object1.size.width, height: object1.size.height))
#expect(object2.bounds == .init(x: 0, y: 400, width: 100, height: 100))
#expect(object3.bounds == .init(x: 0, y: 800, width: 100, height: 100))
}
}
}

Expand Down Expand Up @@ -98,10 +95,9 @@ import AVFoundation
// _ = screen.render(sampleBuffer)
}

DispatchQueue.main.sync {
#expect(object0.bounds == .init(x: 16, y: 16, width: 200, height: 100))
#expect(object1.bounds == .init(x: 32, y: 32, width: 100, height: 100))
}
}
}
}
*/

0 comments on commit 697c793

Please sign in to comment.