Skip to content

Commit

Permalink
fix: "回退到旧版本的数据包解析逻辑", close #9
Browse files Browse the repository at this point in the history
  • Loading branch information
starknt committed Nov 6, 2024
1 parent 18baf5d commit a77bad8
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 123 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"test:memory:leak": "node --inspect ./scripts/m-test.mjs"
},
"dependencies": {
"eventemitter3": "^5.0.1",
"pako": "^2.1.0",
"ws": "^8.18.0"
},
Expand All @@ -68,7 +69,6 @@
"bumpp": "^9.5.1",
"dotenv": "^16.4.5",
"eslint": "^9.9.0",
"eventemitter3": "5.0.1",
"pnpm": "^9.7.0",
"terser": "^5.31.6",
"tsx": "^4.17.0",
Expand Down
19 changes: 7 additions & 12 deletions playground/node/test.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
import path from 'node:path'
import process from 'node:process'
import * as dotenv from 'dotenv'
import { KeepLiveWS } from 'tiny-bilibili-ws'
import { KeepLiveWS, toMessageData } from 'tiny-bilibili-ws'

dotenv.config({ path: path.resolve(process.cwd(), '.env.local') })

const live = new KeepLiveWS(process.env.VITE_ROOM as any, {
// uid: 32140469,
// buvid: '1517DD0A-0A72-7039-2EE2-DC76CFC2408709872infoc',
// key: 'zfYifs1uI1RrPpyGY07Zdlbcd2Y_riAWqn-ISq4oI9phE1Vg8ZqmqBQQyGBpcER6DU5vfQqgqKISu0SP_P3zS0gmrqDM7EDMfGR44PlnaSlJoIB3-hu0kx_P_XTaJLLDM3YQ91XthTADaY7GnQLgpfvN5tFJWATFCLZB_YjKQX_O8TrGHlyUDYg=',
uid: Number(process.env.VITE_UID!),
headers: {
Cookie: process.env.VITE_COOKIE!,
},
stub: true,
})

live.on('open', () => console.log('连接成功'))
live.on('close', () => console.log('断开连接'))

live.on('SEND_GIFT', (message) => { // 有礼物, 但不会被触发
console.log(message)
})

live.on('DANMU_MSG', console.log)
live.on('DANMU_MSG', m => console.log(toMessageData(m).info[1]))

live.getOnline()
.then(console.log)
Expand Down
6 changes: 3 additions & 3 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 38 additions & 34 deletions src/base/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ export interface ProtocolHeader {
sequence: number
}

// eslint-disable-next-line ts/no-unsafe-declaration-merging
export interface Protocol {
header: ProtocolHeader
body: Uint8Array
}

///

/// utils
Expand Down Expand Up @@ -135,6 +135,18 @@ function concat(list: Uint8Array[]) {
return buffer
}

function cutBuffer(buffer: Uint8Array) {
const bufferPacks: Uint8Array[] = []
let size: number

for (let i = 0; i < buffer.length; i += size) {
size = readInt(buffer, i, INT_32)
bufferPacks.push(buffer.slice(i, i + size))
}

return bufferPacks
}

class HeaderReader extends BufferReader {
readOperation(): WS_OP {
return this.read(WS_OPERATION_OFFSET, INT_32)
Expand Down Expand Up @@ -167,35 +179,30 @@ class HeaderReader extends BufferReader {
}
}

// eslint-disable-next-line ts/no-unsafe-declaration-merging
export class Protocol implements Protocol {
header: ProtocolHeader
body: Uint8Array
export async function parser<T extends Uint8Array = Uint8Array>(buffer: T, zlib: IZlib<T>): Promise<Message<any>[]> {
const groupPacks = cutBuffer(buffer)

constructor(header: ProtocolHeader, body: Uint8Array) {
this.header = header
this.body = body
}
const protocols = groupPacks.map<Protocol>((buffer) => {
const header = new HeaderReader(buffer.slice(0, 16)).toJSON()
const body = buffer.slice(16)

static parse(buffer: Uint8Array): Protocol {
const headerLength = readInt(buffer, 4, 2)
const header = new HeaderReader(buffer.slice(0, headerLength)).toJSON()
const body = buffer.slice(headerLength, header.packetLength)

return new Protocol(header, body)
}
return {
header,
body,
}
})

async toMessagePacket(zlib: IZlib): Promise<Message<any> | Message<any>[]> {
const { header, body } = this
const standardProtocolPacket = protocols.map(async (protocol) => {
const { header, body } = protocol

let data: any
if (header.ver === WS_BODY_PROTOCOL_VERSION.NORMAL)
data = JSON.parse(textDecoder.decode(body))

if (header.ver === WS_BODY_PROTOCOL_VERSION.NUMBER) {
if (header.op === WS_OP.HEARTBEAT_REPLY) {
if (header.op === WS_OP.HEARTBEAT_REPLY)
data = readInt(body, 0, 4)
}

if (header.op === WS_OP.CONNECT_SUCCESS) {
try {
data = JSON.parse(textDecoder.decode(body))
Expand All @@ -207,31 +214,28 @@ export class Protocol implements Protocol {
}

if (header.ver === WS_BODY_PROTOCOL_VERSION.ZLIB)
// @ts-expect-error allow uint8array and buffer
data = await parser(await zlib.inflateAsync(body), zlib)

if (header.ver === WS_BODY_PROTOCOL_VERSION.BROTLI)
// @ts-expect-error allow uint8array and buffer
data = await parser(await zlib.brotliDecompressAsync(body), zlib)

if (header.ver === WS_BODY_PROTOCOL_VERSION.ZLIB || header.ver === WS_BODY_PROTOCOL_VERSION.BROTLI) {
return data
}

return {
meta: header,
data,
}
}
}
})

export async function parser(buffer: Uint8Array, zlib: IZlib<any>): Promise<Message<any>[]> {
const protocol: Protocol = Protocol.parse(buffer)
const standardProtocolPacket = await protocol.toMessagePacket(zlib)

if (Array.isArray(standardProtocolPacket)) {
return standardProtocolPacket
}
const packs = await Promise.all(
standardProtocolPacket,
)

return [standardProtocolPacket]
return packs.flatMap<Message<any>>((v) => {
if (v.meta.ver === WS_BODY_PROTOCOL_VERSION.ZLIB || v.meta.ver === WS_BODY_PROTOCOL_VERSION.BROTLI)
return v.data
return v
})
}

///
Expand Down
39 changes: 23 additions & 16 deletions src/base/eventemitter.bench.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,42 @@
/* eslint-disable unused-imports/no-unused-vars */
import NodeEventEmitter from 'node:events'
import EventEmitter3 from 'eventemitter3'
import { describe, it } from 'vitest'
import { bench, describe } from 'vitest'
import { EventEmitter } from './eventemitter'

describe('emit', () => {
const times = 100
const times = 1000000
const emitter = new NodeEventEmitter()
const emitter3 = new EventEmitter3()
const typeemitter = new EventEmitter<{ t1: void }>()

it('nodeJs', () => {
const emitter = new NodeEventEmitter()

emitter.on('t1', () => {})
bench('nodeJs', () => {
let i = 0
emitter.on('t1', () => {
i++
})

for (let i = 0; i < times; i++)
emitter.emit('t1')
})

it('eventemitter3', () => {
const emitter = new EventEmitter3()

emitter.on('t1', () => {})
bench('eventemitter3', () => {
let i = 0
emitter3.on('t1', () => {
i++
})

for (let i = 0; i < times; i++)
emitter.emit('t1')
emitter3.emit('t1')
})

it('eventemitter', () => {
const emitter = new EventEmitter<{ t1: void }>()

emitter.on('t1', () => {})
bench('eventemitter', () => {
let i = 0
typeemitter.on('t1', () => {
i++
})

for (let i = 0; i < times; i++)
emitter.emit('t1')
typeemitter.emit('t1')
})
})
16 changes: 16 additions & 0 deletions src/base/eventemitter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,20 @@ describe('eventEmitter', () => {
expect(emitter.listenerCount('t2')).eq(0)
expect(emitter.listenerCount()).eq(1)
})

it('advance test case(2)', () => {
const emitter = new EventEmitter()
let emitHit = 0
const f = () => {
emitHit += 1
}

emitter.addListener('hit', f)

for (let i = 0; i < 1000; i++) {
emitter.emit('hit')
}

expect(emitHit).eq(1000)
})
})
69 changes: 12 additions & 57 deletions src/base/eventemitter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import E3 from 'eventemitter3'

export type Nil = undefined | void | null
export type EventKey = string | symbol | number
export type EventKey = string | symbol
export type ListenerResult = void | Promise<void>
export type Listener<O extends Record<EventKey, any>, K extends keyof O, V = O[K]> =
V extends Array<any>
Expand All @@ -19,85 +21,38 @@ export type Listener<O extends Record<EventKey, any>, K extends keyof O, V = O[K
: V extends boolean ? (arg: boolean) => ListenerResult
: (arg: V) => ListenerResult

export class EventEmitter<E extends Record<EventKey, any>> {
private readonly eventListeners: Map<keyof E, Listener<E, any>[]> = new Map()

get eventNames() {
return this.eventListeners.keys()
}

export class EventEmitter<E extends Record<EventKey, any>> extends E3 {
on<N extends keyof E>(eventName: N, listener: Listener<E, N>) {
if (!this.eventListeners.has(eventName))
this.eventListeners.set(eventName, [listener])
else
this.eventListeners.get(eventName)!.push(listener)
return this
return super.on(eventName as EventKey, listener)
}

once<N extends keyof E>(eventName: N, listener: Listener<E, N>) {
const _listener = async (...args: any[]) => {
// @ts-expect-error rest parameter allow
await listener(...args)
// @ts-expect-error rest parameter allow
this.off(eventName, _listener)
}
// @ts-expect-error rest parameter allow
this.on(eventName, _listener)
return this
return super.once(eventName as EventKey, listener)
}

addListener<N extends keyof E>(eventName: N, listener: Listener<E, N>) {
this.on(eventName, listener)
return this
return this.on(eventName, listener)
}

off<N extends keyof E>(eventName: N, listener: Listener<E, N>): this {
if (this.eventListeners.has(eventName)) {
const listeners = this.eventListeners.get(eventName)!.filter(
l => l !== listener,
)
this.eventListeners.set(eventName, listeners)
}
return this
return super.off(eventName as EventKey, listener)
}

emit<N extends keyof E>(eventName: N, ...args: Parameters<Listener<E, N>>) {
if (!this.eventListeners.has(eventName))
return this

for (const callback of this.eventListeners.get(eventName)!) {
// @ts-expect-error rest parameter allow
callback(...args)
?.then(() => { /* ignore void */ })
?.catch(() => { /* ignore error */ })
}
return this
return super.emit(eventName as EventKey, ...args)
}

removeListener<N extends keyof E>(eventName: N, listener: Listener<E, N>): this {
return this.off(eventName, listener)
}

prependListener<N extends keyof E>(eventName: N, listener: Listener<E, N>): this {
if (this.eventListeners.has(eventName))
this.eventListeners.get(eventName)?.unshift(listener)
else
this.eventListeners.set(eventName, [listener])

return this
}

removeAllListeners<N extends keyof E>(eventName?: N) {
if (eventName)
this.eventListeners.delete(eventName)
else
this.eventListeners.clear()
return super.removeAllListeners(eventName as EventKey)
}

listenerCount<N extends keyof E>(eventName?: N): number {
if (!eventName)
return Array.from(this.eventListeners.values()).flatMap(v => v).length

return this.eventListeners.get(eventName)?.length ?? 0
return super.eventNames().length
return super.listenerCount(eventName as EventKey)
}
}

0 comments on commit a77bad8

Please sign in to comment.