Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(node): child_process IPC on Windows #21597

Merged
merged 13 commits into from
Dec 19, 2023
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions cli/args/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,12 +917,12 @@ impl CliOptions {
.map(Some)
}

pub fn node_ipc_fd(&self) -> Option<i32> {
pub fn node_ipc_fd(&self) -> Option<i64> {
littledivy marked this conversation as resolved.
Show resolved Hide resolved
let maybe_node_channel_fd = std::env::var("DENO_CHANNEL_FD").ok();
if let Some(node_channel_fd) = maybe_node_channel_fd {
// Remove so that child processes don't inherit this environment variable.
std::env::remove_var("DENO_CHANNEL_FD");
node_channel_fd.parse::<i32>().ok()
node_channel_fd.parse::<i64>().ok()
} else {
None
}
Expand Down
4 changes: 2 additions & 2 deletions cli/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ struct SharedWorkerState {
maybe_inspector_server: Option<Arc<InspectorServer>>,
maybe_lockfile: Option<Arc<Mutex<Lockfile>>>,
feature_checker: Arc<FeatureChecker>,
node_ipc: Option<i32>,
node_ipc: Option<i64>,
}

impl SharedWorkerState {
Expand Down Expand Up @@ -404,7 +404,7 @@ impl CliMainWorkerFactory {
maybe_lockfile: Option<Arc<Mutex<Lockfile>>>,
feature_checker: Arc<FeatureChecker>,
options: CliMainWorkerOptions,
node_ipc: Option<i32>,
node_ipc: Option<i64>,
) -> Self {
Self {
shared: Arc::new(SharedWorkerState {
Expand Down
1 change: 1 addition & 0 deletions ext/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ url.workspace = true
winapi.workspace = true
x25519-dalek = "2.0.0"
x509-parser = "0.15.0"
windows-sys.workspace = true
2 changes: 1 addition & 1 deletion ext/node/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod polyfill;
mod resolution;

pub use ops::ipc::ChildPipeFd;
pub use ops::ipc::IpcJsonStreamResource;
pub use ops::v8::VM_CONTEXT_INDEX;
pub use package_json::PackageJson;
pub use path::PathClean;
Expand Down Expand Up @@ -313,7 +314,6 @@ deno_core::extension!(deno_node,
ops::require::op_require_break_on_next_statement,
ops::util::op_node_guess_handle_type,
ops::crypto::op_node_create_private_key,
ops::ipc::op_node_ipc_pipe,
ops::ipc::op_node_child_ipc_pipe,
ops::ipc::op_node_ipc_write,
ops::ipc::op_node_ipc_read,
Expand Down
103 changes: 53 additions & 50 deletions ext/node/ops/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

#[cfg(unix)]
pub use unix::*;

#[cfg(windows)]
pub use windows::*;
pub struct ChildPipeFd(pub i64);

pub struct ChildPipeFd(pub i32);

#[cfg(unix)]
mod unix {
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::mem;
#[cfg(unix)]
use std::os::fd::FromRawFd;
#[cfg(unix)]
use std::os::fd::RawFd;
use std::pin::Pin;
use std::rc::Rc;
Expand All @@ -35,18 +32,16 @@ mod unix {
use tokio::io::AsyncBufRead;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;

#[cfg(unix)]
use tokio::net::unix::OwnedReadHalf;
#[cfg(unix)]
use tokio::net::unix::OwnedWriteHalf;
#[cfg(unix)]
use tokio::net::UnixStream;

#[op2(fast)]
#[smi]
pub fn op_node_ipc_pipe(
state: &mut OpState,
#[smi] fd: i32,
) -> Result<ResourceId, AnyError> {
Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?))
}
#[cfg(windows)]
type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;

// Open IPC pipe from bootstrap options.
#[op2]
Expand Down Expand Up @@ -97,9 +92,12 @@ mod unix {
Ok(msgs)
}

struct IpcJsonStreamResource {
pub struct IpcJsonStreamResource {
read_half: AsyncRefCell<IpcJsonStream>,
#[cfg(unix)]
write_half: AsyncRefCell<OwnedWriteHalf>,
#[cfg(windows)]
write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
cancel: Rc<CancelHandle>,
}

Expand All @@ -109,21 +107,42 @@ mod unix {
}
}

#[cfg(unix)]
fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> {
// Safety: The fd is part of a pair of connected sockets create by child process
// implementation.
let unix_stream = UnixStream::from_std(unsafe {
std::os::unix::net::UnixStream::from_raw_fd(stream)
})?;
Ok(unix_stream.into_split())
}

#[cfg(windows)]
fn pipe(
handle: i64,
) -> Result<
(
tokio::io::ReadHalf<NamedPipeClient>,
tokio::io::WriteHalf<NamedPipeClient>,
),
io::Error,
> {
use std::os::windows::io::FromRawHandle;
let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? };
Ok(tokio::io::split(pipe))
}

impl IpcJsonStreamResource {
fn new(stream: RawFd) -> Result<Self, std::io::Error> {
// Safety: The fd is part of a pair of connected sockets create by child process
// implementation.
let unix_stream = UnixStream::from_std(unsafe {
std::os::unix::net::UnixStream::from_raw_fd(stream)
})?;
let (read_half, write_half) = unix_stream.into_split();
pub fn new(stream: i64) -> Result<Self, std::io::Error> {
let (read_half, write_half) = pipe(stream as _)?;
Ok(Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
cancel: Default::default(),
})
}

#[cfg(unix)]
#[cfg(test)]
fn from_unix_stream(stream: UnixStream) -> Self {
let (read_half, write_half) = stream.into_split();
Expand Down Expand Up @@ -172,18 +191,30 @@ mod unix {
//
// `\n` is used as a delimiter between messages.
struct IpcJsonStream {
#[cfg(unix)]
pipe: BufReader<OwnedReadHalf>,
#[cfg(windows)]
pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>,
buffer: Vec<u8>,
}

impl IpcJsonStream {
#[cfg(unix)]
fn new(pipe: OwnedReadHalf) -> Self {
Self {
pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
buffer: Vec::with_capacity(INITIAL_CAPACITY),
}
}

#[cfg(windows)]
fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
Self {
pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
buffer: Vec::with_capacity(INITIAL_CAPACITY),
}
}

async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> {
let mut json = None;
let nread =
Expand Down Expand Up @@ -252,7 +283,6 @@ mod unix {
std::task::Poll::Ready(t) => t?,
std::task::Poll::Pending => return std::task::Poll::Pending,
};

if let Some(i) = memchr(b'\n', available) {
if *read == 0 {
// Fast path: parse and put into the json slot directly.
Expand Down Expand Up @@ -499,30 +529,3 @@ mod unix {
}
}
}

#[cfg(windows)]
mod windows {
use deno_core::error::AnyError;
use deno_core::op2;

#[op2(fast)]
pub fn op_node_ipc_pipe() -> Result<(), AnyError> {
Err(deno_core::error::not_supported())
}

#[op2(fast)]
#[smi]
pub fn op_node_child_ipc_pipe() -> Result<i32, AnyError> {
Ok(-1)
}

#[op2(async)]
pub async fn op_node_ipc_write() -> Result<(), AnyError> {
Err(deno_core::error::not_supported())
}

#[op2(async)]
pub async fn op_node_ipc_read() -> Result<(), AnyError> {
Err(deno_core::error::not_supported())
}
}
4 changes: 1 addition & 3 deletions ext/node/polyfills/internal/child_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1069,9 +1069,7 @@ function toDenoArgs(args: string[]): string[] {
return denoArgs;
}

export function setupChannel(target, channel) {
const ipc = ops.op_node_ipc_pipe(channel);

export function setupChannel(target, ipc) {
async function readLoop() {
try {
while (true) {
Expand Down
1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ which = "4.2.5"
fwdansi.workspace = true
winapi = { workspace = true, features = ["commapi", "knownfolders", "mswsock", "objbase", "psapi", "shlobj", "tlhelp32", "winbase", "winerror", "winuser", "winsock2"] }
ntapi = "0.4.0"
windows-sys.workspace = true

[target.'cfg(unix)'.dependencies]
nix.workspace = true
Expand Down
Loading