Skip to content

Commit

Permalink
refactor: private ipc (#414)
Browse files Browse the repository at this point in the history
  • Loading branch information
privatenumber authored Nov 27, 2023
1 parent 3b17ef6 commit 83dea6e
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 92 deletions.
13 changes: 11 additions & 2 deletions src/cjs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { transformSync } from '../utils/transform/index.js';
import { transformDynamicImport } from '../utils/transform/transform-dynamic-import.js';
import { resolveTsPath } from '../utils/resolve-ts-path.js';
import { isESM } from '../utils/esm-pattern.js';
import { connectingToServer, type SendToParent } from '../utils/ipc/client.js';

const isRelativePathPattern = /^\.{1,2}\//;
const isTsFilePatten = /\.[cm]?tsx?$/;
Expand Down Expand Up @@ -49,13 +50,21 @@ const transformExtensions = [
'.mjs',
];

let sendToParent: SendToParent | void;
connectingToServer.then(
(_sendToParent) => {
sendToParent = _sendToParent;
},
() => {},
);

const transformer = (
module: Module,
filePath: string,
) => {
// For tracking dependencies in watch mode
if (process.send) {
process.send({
if (sendToParent) {
sendToParent({
type: 'dependency',
path: filePath,
});
Expand Down
28 changes: 16 additions & 12 deletions src/cli.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { constants as osConstants } from 'os';
import type { ChildProcess } from 'child_process';
import type { Server } from 'net';
import { cli } from 'cleye';
import {
transformSync as esbuildTransformSync,
Expand All @@ -12,22 +13,23 @@ import {
ignoreAfterArgument,
} from './remove-argv-flags.js';
import { testRunnerGlob } from './utils/node-features.js';
import { createIpcServer } from './utils/ipc/server.js';

const relaySignals = (
childProcess: ChildProcess,
ipcSocket: Server,
) => {
let waitForSignal: undefined | ((signal: NodeJS.Signals) => void);

childProcess.on(
'message',
(
data: { type: string; signal: NodeJS.Signals },
) => {
if (data && data.type === 'kill' && waitForSignal) {
waitForSignal(data.signal);
}
},
);
ipcSocket.on('data', (data: { type: string; signal: NodeJS.Signals }) => {
if (
data
&& data.type === 'signal'
&& waitForSignal
) {
waitForSignal(data.signal);
}
});

const waitForSignalFromChild = () => {
const p = new Promise<NodeJS.Signals | undefined>((resolve) => {
Expand Down Expand Up @@ -133,7 +135,7 @@ cli({
},
help: false,
ignoreArgv: ignoreAfterArgument(),
}, (argv) => {
}, async (argv) => {
if (argv.flags.version) {
process.stdout.write(`tsx v${version}\nnode `);
} else if (argv.flags.help) {
Expand Down Expand Up @@ -198,6 +200,8 @@ cli({
argvsToRun.push('**/{test,test/**/*,test-*,*[.-_]test}.?(c|m)@(t|j)s');
}

const ipc = await createIpcServer();

const childProcess = run(
argvsToRun,
{
Expand All @@ -206,7 +210,7 @@ cli({
},
);

relaySignals(childProcess);
relaySignals(childProcess, ipc);

childProcess.on(
'close',
Expand Down
39 changes: 13 additions & 26 deletions src/esm/loaders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { transformDynamicImport } from '../utils/transform/transform-dynamic-imp
import { resolveTsPath } from '../utils/resolve-ts-path.js';
import { installSourceMapSupport } from '../source-map.js';
import { importAttributes } from '../utils/node-features.js';
import { connectingToServer, type SendToParent } from '../utils/ipc/client.js';
import {
tsconfigPathsMatcher,
fileMatcher,
Expand Down Expand Up @@ -36,42 +37,20 @@ type resolve = (
recursiveCall?: boolean,
) => MaybePromise<ResolveFnOutput>;

type SendToParent = (data: {
type: 'dependency';
path: string;
}) => void;

let sendToParent: SendToParent | undefined = process.send ? process.send.bind(process) : undefined;

export const initialize: InitializeHook = async (data) => {
if (!data) {
throw new Error('tsx must be loaded with --import instead of --loader\nThe --loader flag was deprecated in Node v20.6.0');
}

const { port } = data;
sendToParent = port.postMessage.bind(port);
};

/**
* Technically globalPreload is deprecated so it should be in loaders-deprecated
* but it shares a closure with the new load hook
*/
export const globalPreload: GlobalPreloadHook = ({ port }) => {
sendToParent = port.postMessage.bind(port);

return `
const require = getBuiltin('module').createRequire("${import.meta.url}");
require('tsx/source-map').installSourceMapSupport();
if (process.send) {
port.addListener('message', (message) => {
if (message.type === 'dependency') {
process.send(message);
}
});
}
port.unref(); // Allows process to exit without waiting for port to close
`;
};
export const globalPreload: GlobalPreloadHook = () => `
const require = getBuiltin('module').createRequire("${import.meta.url}");
require('../source-map.cjs').installSourceMapSupport();
`;

const resolveExplicitPath = async (
defaultResolve: NextResolve,
Expand Down Expand Up @@ -242,6 +221,14 @@ export const resolve: resolve = async function (
}
};

let sendToParent: SendToParent | void;
connectingToServer.then(
(_sendToParent) => {
sendToParent = _sendToParent;
},
() => {},
);

const contextAttributesProperty = importAttributes ? 'importAttributes' : 'importAssertions';

export const load: LoadHook = async function (

Check warning on line 234 in src/esm/loaders.ts

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest)

Unexpected unnamed async function
Expand Down
18 changes: 1 addition & 17 deletions src/esm/register.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,14 @@
import module from 'node:module';
import { MessageChannel } from 'node:worker_threads';
import { installSourceMapSupport } from '../source-map.js';

export const registerLoader = () => {
const { port1, port2 } = new MessageChannel();

installSourceMapSupport();
if (process.send) {
port1.addListener('message', (message) => {
if (message.type === 'dependency') {
process.send!(message);
}
});
}

// Allows process to exit without waiting for port to close
port1.unref();

module.register(
'./index.mjs',
{
parentURL: import.meta.url,
data: {
port: port2,
},
transferList: [port2],
data: true,
},
);
};
22 changes: 13 additions & 9 deletions src/preflight.cts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { isMainThread } from 'node:worker_threads';
import { constants as osConstants } from 'os';
import { isMainThread } from 'node:worker_threads';
import { connectingToServer } from './utils/ipc/client.js';
import './suppress-warnings.cjs';

type BaseEventListener = () => void;
Expand Down Expand Up @@ -63,13 +64,16 @@ if (isMainThread) {
// eslint-disable-next-line import/no-unresolved
require('./cjs/index.cjs');

// If a parent process is detected
if (process.send) {
bindHiddenSignalsHandler(['SIGINT', 'SIGTERM'], (signal: NodeJS.Signals) => {
process.send!({
type: 'kill',
signal,
(async () => {
const sendToParent = await connectingToServer;

if (sendToParent) {
bindHiddenSignalsHandler(['SIGINT', 'SIGTERM'], (signal: NodeJS.Signals) => {
sendToParent({
type: 'signal',
signal,
});
});
});
}
}
})();
}
33 changes: 33 additions & 0 deletions src/utils/ipc/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import net from 'net';
import { getPipePath } from './get-pipe-path.js';

export type SendToParent = (data: Record<string, unknown>) => void;

const connectToServer = () => new Promise<SendToParent | void>((resolve) => {
const pipePath = getPipePath(process.ppid);
const socket: net.Socket = net.createConnection(
pipePath,
() => {
resolve((data) => {
const messageBuffer = Buffer.from(JSON.stringify(data));
const lengthBuffer = Buffer.alloc(4);
lengthBuffer.writeInt32BE(messageBuffer.length, 0);
socket.write(Buffer.concat([lengthBuffer, messageBuffer]));
});
},
);

/**
* Ignore error when:
* - Called as a loader and there is no server
* - Nested process when using --test and the ppid is incorrect
*/
socket.on('error', () => {
resolve();
});

// Prevent Node from waiting for this socket to close before exiting
socket.unref();
});

export const connectingToServer = connectToServer();
11 changes: 11 additions & 0 deletions src/utils/ipc/get-pipe-path.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import path from 'path';
import { tmpdir } from '../temporary-directory.js';

export const getPipePath = (processId: number) => {
const pipePath = path.join(tmpdir, `${processId}.pipe`);
return (
process.platform === 'win32'
? `\\\\?\\pipe\\${pipePath}`
: pipePath
);
};
70 changes: 70 additions & 0 deletions src/utils/ipc/server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import net from 'net';
import fs from 'fs';
import { tmpdir } from '../temporary-directory.js';
import { getPipePath } from './get-pipe-path.js';

type OnMessage = (message: Buffer) => void;

const bufferData = (
onMessage: OnMessage,
) => {
let buffer = Buffer.alloc(0);
return (data: Buffer) => {
buffer = Buffer.concat([buffer, data]);

while (buffer.length > 4) {
const messageLength = buffer.readInt32BE(0);
if (buffer.length >= 4 + messageLength) {
const message = buffer.slice(4, 4 + messageLength);
onMessage(message);
buffer = buffer.slice(4 + messageLength);
} else {
break;
}
}
};
};

export const createIpcServer = async () => {
const server = net.createServer((socket) => {
socket.on('data', bufferData((message: Buffer) => {
const data = JSON.parse(message.toString());
server.emit('data', data);
}));
});

const pipePath = getPipePath(process.pid);
await fs.promises.mkdir(tmpdir, { recursive: true });

await new Promise<void>((resolve, reject) => {
server.listen(pipePath, resolve);
server.on('error', reject);
});

// Prevent Node from waiting for this socket to close before exiting
server.unref();

process.on('exit', () => {
server.close();

/**
* Only clean on Unix
*
* https://nodejs.org/api/net.html#ipc-support:
* On Windows, the local domain is implemented using a named pipe.
* The path must refer to an entry in \\?\pipe\ or \\.\pipe\.
* Any characters are permitted, but the latter may do some processing
* of pipe names, such as resolving .. sequences. Despite how it might
* look, the pipe namespace is flat. Pipes will not persist. They are
* removed when the last reference to them is closed. Unlike Unix domain
* sockets, Windows will close and remove the pipe when the owning process exits.
*/
if (process.platform !== 'win32') {
try {
fs.rmSync(pipePath);
} catch {}
}
});

return server;
};
Loading

0 comments on commit 83dea6e

Please sign in to comment.