Skip to content

Commit

Permalink
Use Promise.withResolvers() internally and remove equivalent `Defer…
Browse files Browse the repository at this point in the history
…red` class
  • Loading branch information
TooTallNate committed Oct 26, 2024
1 parent 78a3fac commit bb94e10
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 57 deletions.
6 changes: 6 additions & 0 deletions .changeset/kind-bananas-brush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@nx.js/runtime": patch
"@nx.js/http": patch
---

Use `Promise.withResolvers()` internally and remove equivalent `Deferred` class
12 changes: 0 additions & 12 deletions packages/http/src/deferred.ts

This file was deleted.

13 changes: 2 additions & 11 deletions packages/http/src/unshiftable-readable-stream.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { Deferred } from './deferred';

export class UnshiftableStream {
buffer: Uint8Array;
reader: ReadableStreamDefaultReader<Uint8Array>;
readable: ReadableStream<Uint8Array>;
paused?: Deferred<void>;
paused?: PromiseWithResolvers<void>;

constructor(sourceStream: ReadableStream<Uint8Array>) {
this.buffer = new Uint8Array();
Expand Down Expand Up @@ -37,34 +35,27 @@ export class UnshiftableStream {

pause() {
if (this.paused) return;
//console.log('pause');
this.paused = new Deferred();
this.paused = Promise.withResolvers();
}

resume() {
const p = this.paused;
if (!p) return;
//console.log('resume');
this.paused = undefined;
p.resolve();
}

// Read method that checks the buffer first
async read() {
//console.log('read')
if (this.paused) {
//console.log('read paused')
await this.paused.promise;
}
//console.log('read resume')
if (this.buffer.length > 0) {
const value = this.buffer;
//console.log('buffer', { value })
this.buffer = new Uint8Array(); // Clear the buffer after reading
return { done: false, value };
} else {
const result = await this.reader.read();
//console.log({ result })
return result; // Return data from the source stream
}
}
Expand Down
21 changes: 7 additions & 14 deletions packages/runtime/src/tcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { resolveDns } from './switch/dns';
import { EventTarget } from './polyfills/event-target';
import { SocketEvent, type SocketAddress, type SocketInfo } from './switch';
import {
Deferred,
assertInternalConstructor,
bufferSourceToArrayBuffer,
createInternal,
Expand Down Expand Up @@ -63,8 +62,8 @@ function tlsWrite(ctx: TlsContextOpaque, data: BufferSource) {
interface SocketInternal {
fd: number;
tls?: TlsContextOpaque;
opened: Deferred<SocketInfo>;
closed: Deferred<void>;
opened: PromiseWithResolvers<SocketInfo>;
closed: PromiseWithResolvers<void>;
}

const _ = createInternal<Socket, SocketInternal>();
Expand Down Expand Up @@ -96,8 +95,8 @@ export class Socket {
const socket = this;
const i: SocketInternal = {
fd: -1,
opened: new Deferred(),
closed: new Deferred(),
opened: Promise.withResolvers(),
closed: Promise.withResolvers(),
};
_.set(this, i);
this.opened = i.opened.promise;
Expand All @@ -106,9 +105,7 @@ export class Socket {
let readBuffer: ArrayBuffer | undefined;
this.readable = new ReadableStream({
async pull(controller) {
if (i.opened.pending) {
await socket.opened;
}
await socket.opened;
if (!readBuffer) {
// Matches the configured `tcp_rx_buf_size` in `main.c`
readBuffer = new ArrayBuffer(1024 * 1024);
Expand All @@ -129,9 +126,7 @@ export class Socket {

this.writable = new WritableStream({
async write(chunk) {
if (i.opened.pending) {
await socket.opened;
}
await socket.opened;
await (i.tls ? tlsWrite(i.tls, chunk) : write(i.fd, chunk));
},
close() {
Expand Down Expand Up @@ -169,9 +164,7 @@ export class Socket {
this.writable.abort(reason);
}
const i = _(this);
if (i.opened.pending) {
i.opened.reject(reason);
}
i.opened.reject(reason);
if (i.fd !== -1) {
$.close(i.fd);
i.fd = -1;
Expand Down
19 changes: 0 additions & 19 deletions packages/runtime/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,6 @@ export function pathToString(p: PathLike): string {
return typeof p === 'string' ? p : decodeURI(p.href);
}

export class Deferred<T> {
pending = true;
promise: Promise<T>;
resolve!: (value: T | PromiseLike<T>) => void;
reject!: (v: any) => void;
constructor() {
this.promise = new Promise<T>((res, rej) => {
this.resolve = (v) => {
this.pending = false;
res(v);
};
this.reject = (v) => {
this.pending = false;
rej(v);
};
});
}
}

export const createInternal = <K extends object, V>() => {
const wm = new WeakMap<K, V>();
const _ = (k: K): V => {
Expand Down
2 changes: 1 addition & 1 deletion packages/runtime/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2022",
"target": "ESNext",
"module": "commonjs",
"noEmit": true,
"outDir": "./dist",
Expand Down

0 comments on commit bb94e10

Please sign in to comment.