diff --git a/lib/spdy-transport/protocol/spdy/zlib-pool.ts b/lib/spdy-transport/protocol/spdy/zlib-pool.ts index 19c83da..f5e4598 100644 --- a/lib/spdy-transport/protocol/spdy/zlib-pool.ts +++ b/lib/spdy-transport/protocol/spdy/zlib-pool.ts @@ -1,12 +1,15 @@ import { Buffer } from 'node:buffer'; -import { createDeflate,constants,createInflate, Deflate, Inflate } from 'node:zlib'; +import { createDeflate,constants,createInflate, Deflate } from 'node:zlib'; +import { Inflate } from "https://deno.land/x/compress@v0.4.5/zlib/mod.ts"; import { dictionary } from "./dictionary.ts"; +export { type Deflate, Inflate, constants }; + // TODO(indutny): think about it, why has it always been Z_SYNC_FLUSH here. // It should be possible to manually flush stuff after the write instead function _createDeflate (version: 2|3|3.1, compression: boolean) { const deflate = createDeflate({ - dictionary: Buffer.from(dictionary[version]), + // dictionary: Buffer.from(dictionary[version]), flush: constants.Z_SYNC_FLUSH, windowBits: 11, level: compression ? constants.Z_DEFAULT_COMPRESSION : constants.Z_NO_COMPRESSION @@ -16,9 +19,9 @@ function _createDeflate (version: 2|3|3.1, compression: boolean) { } function _createInflate (version: 2|3|3.1) { - const inflate = createInflate({ - dictionary: Buffer.from(dictionary[version]), - flush: constants.Z_SYNC_FLUSH + const inflate = new Inflate({ + dictionary: dictionary[version], + // flush: constants.Z_SYNC_FLUSH }) return inflate diff --git a/lib/spdy-transport/utils.ts b/lib/spdy-transport/utils.ts index de63918..d324be2 100644 --- a/lib/spdy-transport/utils.ts +++ b/lib/spdy-transport/utils.ts @@ -1,6 +1,6 @@ import EventEmitter from "node:events"; import type { Buffer } from "node:buffer"; -import type { Deflate, Inflate } from "node:zlib"; +import { constants, type Deflate, Inflate } from "./protocol/spdy/zlib-pool.ts"; import type { ClassicCallback } from "./protocol/types.ts"; export class QueueItem { @@ -75,8 +75,23 @@ export class LockStream { this.locked = true + if (this.stream instanceof Inflate) { + // TODO: error handling + const output: Uint8Array[] = [] + for (const chunk of chunks) { + output.push(this.stream.push(chunk, constants.Z_SYNC_FLUSH)); + } + + this.locked = false + if (this.queue.length > 0) { this.queue.shift()!() } + callback(null, output); + return; + } + + const stream = this.stream; + const done = (err?: Error | null, chunks?: Uint8Array[]) => { - this.stream.removeListener('error', done) + stream.removeListener('error', done) this.locked = false if (this.queue.length > 0) { this.queue.shift()!() } @@ -93,7 +108,7 @@ export class LockStream { this.stream.on('data', onData) const next = (err?: Error | null) => { - this.stream.removeListener('data', onData) + stream.removeListener('data', onData) if (err) { return done(err) } @@ -160,6 +175,20 @@ export class InflateDeflateQueue extends QueuingMutex async transformOne(chunks: Uint8Array[]): Promise { + if (this.stream instanceof Inflate) { + // TODO: error handling + const output: Uint8Array[] = [] + for (const chunk of chunks) { + output.push(this.stream.push(chunk, constants.Z_SYNC_FLUSH)); + } + + this.locked = false + if (this.queue.length > 0) { this.queue.shift()!() } + return output; + } + + const stream = this.stream; + // Accumulate all output data const output: Uint8Array[] = [] function onData (chunk: Buffer) { @@ -170,7 +199,7 @@ export class InflateDeflateQueue extends QueuingMutex try { for (const chunk of chunks) { await new Promise((ok, fail) => { - this.stream.write(chunk, err => err ? fail(err) : ok()); + stream.write(chunk, err => err ? fail(err) : ok()); }); }