Skip to content

Commit

Permalink
Attempt to work around Deno zlib regression
Browse files Browse the repository at this point in the history
  • Loading branch information
danopia committed Aug 4, 2023
1 parent 5b7fa53 commit e1885e0
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 9 deletions.
13 changes: 8 additions & 5 deletions lib/spdy-transport/protocol/spdy/zlib-pool.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { Buffer } from 'node:buffer';
import { createDeflate,constants,createInflate, Deflate, Inflate } from 'node:zlib';
import { createDeflate,constants,createInflate, Deflate } from 'node:zlib';
import { Inflate } from "https://deno.land/x/[email protected]/zlib/mod.ts";
import { dictionary } from "./dictionary.ts";

export { type Deflate, Inflate, constants };

// TODO(indutny): think about it, why has it always been Z_SYNC_FLUSH here.
// It should be possible to manually flush stuff after the write instead
function _createDeflate (version: 2|3|3.1, compression: boolean) {
const deflate = createDeflate({
dictionary: Buffer.from(dictionary[version]),
// dictionary: Buffer.from(dictionary[version]),
flush: constants.Z_SYNC_FLUSH,
windowBits: 11,
level: compression ? constants.Z_DEFAULT_COMPRESSION : constants.Z_NO_COMPRESSION
Expand All @@ -16,9 +19,9 @@ function _createDeflate (version: 2|3|3.1, compression: boolean) {
}

function _createInflate (version: 2|3|3.1) {
const inflate = createInflate({
dictionary: Buffer.from(dictionary[version]),
flush: constants.Z_SYNC_FLUSH
const inflate = new Inflate({
dictionary: dictionary[version],
// flush: constants.Z_SYNC_FLUSH
})

return inflate
Expand Down
37 changes: 33 additions & 4 deletions lib/spdy-transport/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import EventEmitter from "node:events";
import type { Buffer } from "node:buffer";
import type { Deflate, Inflate } from "node:zlib";
import { constants, type Deflate, Inflate } from "./protocol/spdy/zlib-pool.ts";
import type { ClassicCallback } from "./protocol/types.ts";

export class QueueItem {
Expand Down Expand Up @@ -75,8 +75,23 @@ export class LockStream {

this.locked = true

if (this.stream instanceof Inflate) {
// TODO: error handling
const output: Uint8Array[] = []
for (const chunk of chunks) {
output.push(this.stream.push(chunk, constants.Z_SYNC_FLUSH));
}

this.locked = false
if (this.queue.length > 0) { this.queue.shift()!() }
callback(null, output);
return;
}

const stream = this.stream;

const done = (err?: Error | null, chunks?: Uint8Array[]) => {
this.stream.removeListener('error', done)
stream.removeListener('error', done)

this.locked = false
if (this.queue.length > 0) { this.queue.shift()!() }
Expand All @@ -93,7 +108,7 @@ export class LockStream {
this.stream.on('data', onData)

const next = (err?: Error | null) => {
this.stream.removeListener('data', onData)
stream.removeListener('data', onData)
if (err) {
return done(err)
}
Expand Down Expand Up @@ -160,6 +175,20 @@ export class InflateDeflateQueue extends QueuingMutex<Uint8Array[],Uint8Array[]>

async transformOne(chunks: Uint8Array[]): Promise<Uint8Array[]> {

if (this.stream instanceof Inflate) {
// TODO: error handling
const output: Uint8Array[] = []
for (const chunk of chunks) {
output.push(this.stream.push(chunk, constants.Z_SYNC_FLUSH));
}

this.locked = false
if (this.queue.length > 0) { this.queue.shift()!() }
return output;
}

const stream = this.stream;

// Accumulate all output data
const output: Uint8Array[] = []
function onData (chunk: Buffer) {
Expand All @@ -170,7 +199,7 @@ export class InflateDeflateQueue extends QueuingMutex<Uint8Array[],Uint8Array[]>
try {
for (const chunk of chunks) {
await new Promise<void>((ok, fail) => {
this.stream.write(chunk, err => err ? fail(err) : ok());
stream.write(chunk, err => err ? fail(err) : ok());
});
}

Expand Down

0 comments on commit e1885e0

Please sign in to comment.