diff --git a/src/bun.js/api/bun/spawn.zig b/src/bun.js/api/bun/spawn.zig index 8a8ec3ffffc530..3d26bbbfbf291b 100644 --- a/src/bun.js/api/bun/spawn.zig +++ b/src/bun.js/api/bun/spawn.zig @@ -30,13 +30,13 @@ const errno = std.os.errno; const mode_t = std.os.mode_t; const unexpectedErrno = std.os.unexpectedErrno; -pub const WaitPidResult = struct { - pid: pid_t, - status: u32, -}; - // mostly taken from zig's posix_spawn.zig pub const PosixSpawn = struct { + pub const WaitPidResult = struct { + pid: pid_t, + status: u32, + }; + pub const Attr = struct { attr: system.posix_spawnattr_t, diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index f6d86d91a4d72a..c62570e9267fd6 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -16,6 +16,8 @@ const Which = @import("../../../which.zig"); const uws = @import("../../../deps/uws.zig"); const IPC = @import("../../ipc.zig"); +const PosixSpawn = @import("./spawn.zig").PosixSpawn; + pub const Subprocess = struct { const log = Output.scoped(.Subprocess, false); pub usingnamespace JSC.Codegen.JSSubprocess; @@ -29,8 +31,7 @@ pub const Subprocess = struct { stdin: Writable, stdout: Readable, stderr: Readable, - killed: bool = false, - poll_ref: ?*JSC.FilePoll = null, + poll: Poll = Poll{ .poll_ref = null }, exit_promise: JSC.Strong = .{}, on_exit_callback: JSC.Strong = .{}, @@ -39,14 +40,6 @@ pub const Subprocess = struct { signal_code: ?SignalCode = null, waitpid_err: ?bun.sys.Error = null, - has_waitpid_task: bool = false, - notification_task: JSC.AnyTask = undefined, - waitpid_task: JSC.AnyTask = undefined, - - wait_task: JSC.ConcurrentTask = .{}, - - finalized: bool = false, - globalThis: *JSC.JSGlobalObject, observable_getters: std.enums.EnumSet(enum { stdin, @@ -59,16 +52,31 @@ pub const Subprocess = struct { stderr, }) = .{}, has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true), - is_sync: bool = false, this_jsvalue: JSC.JSValue = .zero, ipc_mode: IPCMode, ipc_callback: JSC.Strong = .{}, ipc: IPC.IPCData, + flags: Flags = .{}, + + pub const Flags = packed struct(u32) { + is_sync: bool = false, + killed: bool = false, + reference_count: u30 = 0, + }; - has_pending_unref: bool = false, pub const SignalCode = bun.SignalCode; + pub const Poll = union(enum) { + poll_ref: ?*JSC.FilePoll, + wait_thread: WaitThreadPoll, + }; + + pub const WaitThreadPoll = struct { + ref_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), + poll_ref: JSC.PollRef = .{}, + }; + pub const IPCMode = enum { none, bun, @@ -79,9 +87,37 @@ pub const Subprocess = struct { return this.exit_code != null or this.waitpid_err != null or this.signal_code != null; } - pub fn updateHasPendingActivityFlag(this: *Subprocess) void { + pub fn hasPendingActivityNonThreadsafe(this: *const Subprocess) bool { + if (this.poll == .wait_thread and this.poll.wait_thread.ref_count.load(.Monotonic) > 0) { + return true; + } + + if (this.flags.reference_count > 0) { + return true; + } + + if (this.ipc_mode != .none) { + return true; + } + + if (this.poll == .poll_ref) { + if (this.poll.poll_ref) |poll| { + if (poll.isRegistered()) { + return true; + } + } + } + + return false; + } + + pub fn updateHasPendingActivity(this: *Subprocess) void { @fence(.SeqCst); - this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null and this.ipc_mode == .none and this.has_pending_unref, .SeqCst); + + this.has_pending_activity.store( + this.hasPendingActivityNonThreadsafe(), + .Monotonic, + ); } pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool { @@ -89,14 +125,19 @@ pub const Subprocess = struct { return this.has_pending_activity.load(.Acquire); } - pub fn updateHasPendingActivity(this: *Subprocess) void { - @fence(.Release); - this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null and this.ipc_mode == .none and this.has_pending_unref, .Release); - } - pub fn ref(this: *Subprocess) void { var vm = this.globalThis.bunVM(); - if (this.poll_ref) |poll| poll.enableKeepingProcessAlive(vm); + switch (this.poll) { + .poll_ref => if (this.poll.poll_ref) |poll| { + this.flags.reference_count += @as(u30, @intFromBool(!poll.isRegistered())); + poll.enableKeepingProcessAlive(vm); + }, + + .wait_thread => |*wait_thread| { + wait_thread.poll_ref.ref(vm); + }, + } + if (!this.hasCalledGetter(.stdin)) { this.stdin.ref(); } @@ -114,7 +155,15 @@ pub const Subprocess = struct { pub fn unref(this: *Subprocess) void { var vm = this.globalThis.bunVM(); - if (this.poll_ref) |poll| poll.disableKeepingProcessAlive(vm); + switch (this.poll) { + .poll_ref => if (this.poll.poll_ref) |poll| { + this.flags.reference_count -= @as(u30, @intFromBool(poll.isRegistered())); + poll.disableKeepingProcessAlive(vm); + }, + .wait_thread => |*wait_thread| { + wait_thread.poll_ref.unref(vm); + }, + } if (!this.hasCalledGetter(.stdin)) { this.stdin.unref(); } @@ -358,7 +407,7 @@ pub const Subprocess = struct { } pub fn hasKilled(this: *const Subprocess) bool { - return this.killed or this.hasExited(); + return this.flags.killed or this.exit_code != null; } pub fn tryKill(this: *Subprocess, sig: i32) JSC.Node.Maybe(void) { @@ -366,23 +415,31 @@ pub const Subprocess = struct { return .{ .result = {} }; } - if (comptime Environment.isLinux) { - // should this be handled differently? - // this effectively shouldn't happen - if (this.pidfd == bun.invalid_fd) { - return .{ .result = {} }; - } + send_signal: { + if (comptime Environment.isLinux) { + // if these are the same, it means the pidfd is invalid. + if (!WaiterThread.shouldUseWaiterThread()) { + // should this be handled differently? + // this effectively shouldn't happen + if (this.pidfd == bun.invalid_fd) { + return .{ .result = {} }; + } - // first appeared in Linux 5.1 - const rc = std.os.linux.pidfd_send_signal(this.pidfd, @as(u8, @intCast(sig)), null, 0); + // first appeared in Linux 5.1 + const rc = std.os.linux.pidfd_send_signal(this.pidfd, @as(u8, @intCast(sig)), null, 0); - if (rc != 0) { - const errno = std.os.linux.getErrno(rc); - // if the process was already killed don't throw - if (errno != .SRCH) - return .{ .err = bun.sys.Error.fromCode(errno, .kill) }; + if (rc != 0) { + const errno = std.os.linux.getErrno(rc); + + // if the process was already killed don't throw + if (errno != .SRCH and errno != .NOSYS) + return .{ .err = bun.sys.Error.fromCode(errno, .kill) }; + } else { + break :send_signal; + } + } } - } else { + const err = std.c.kill(this.pid, sig); if (err != 0) { const errno = bun.C.getErrno(err); @@ -393,7 +450,7 @@ pub const Subprocess = struct { } } - this.killed = true; + this.flags.killed = true; return .{ .result = {} }; } @@ -493,10 +550,10 @@ pub const Subprocess = struct { // because we don't want to block the thread waiting for the write switch (bun.isWritable(this.fd)) { .ready => { - if (this.poll_ref) |poll_ref| { - poll_ref.flags.insert(.writable); - poll_ref.flags.insert(.fifo); - std.debug.assert(poll_ref.flags.contains(.poll_writable)); + if (this.poll_ref) |poll| { + poll.flags.insert(.writable); + poll.flags.insert(.fifo); + std.debug.assert(poll.flags.contains(.poll_writable)); } }, .hup => { @@ -1452,7 +1509,7 @@ pub const Subprocess = struct { }; const pidfd: std.os.fd_t = brk: { - if (!Environment.isLinux) { + if (!Environment.isLinux or WaiterThread.shouldUseWaiterThread()) { break :brk pid; } @@ -1493,16 +1550,8 @@ pub const Subprocess = struct { const error_instance = brk2: { if (err == .NOSYS) { - break :brk2 globalThis.createErrorInstance( - \\"pidfd_open(2)" system call is not supported by your Linux kernel - \\To fix this error, either: - \\- Upgrade your Linux kernel to a newer version (current: {}) - \\- Ensure the seccomp filter allows "pidfd_open" - , - .{ - kernel.fmt(""), - }, - ); + WaiterThread.setShouldUseWaiterThread(); + break :brk pid; } break :brk2 bun.sys.Error.fromCode(err, .open).toJSC(globalThis); @@ -1534,11 +1583,13 @@ pub const Subprocess = struct { .stdout = Readable.init(stdio[bun.STDOUT_FD], stdout_pipe[0], jsc_vm.allocator, default_max_buffer_size), .stderr = Readable.init(stdio[bun.STDERR_FD], stderr_pipe[0], jsc_vm.allocator, default_max_buffer_size), .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, - .is_sync = is_sync, .ipc_mode = ipc_mode, // will be assigned in the block below .ipc = .{ .socket = socket }, .ipc_callback = if (ipc_callback != .zero) JSC.Strong.create(ipc_callback, globalThis) else undefined, + .flags = .{ + .is_sync = is_sync, + }, }; if (ipc_mode != .none) { var ptr = socket.ext(*Subprocess); @@ -1560,22 +1611,27 @@ pub const Subprocess = struct { const watchfd = if (comptime Environment.isLinux) pidfd else pid; if (comptime !is_sync) { - var poll = JSC.FilePoll.init(jsc_vm, watchfd, .{}, Subprocess, subprocess); - subprocess.poll_ref = poll; - switch (subprocess.poll_ref.?.register( - jsc_vm.event_loop_handle.?, - .process, - true, - )) { - .result => {}, - .err => |err| { - if (err.getErrno() != .SRCH) { - @panic("This shouldn't happen"); - } + if (!WaiterThread.shouldUseWaiterThread()) { + var poll = JSC.FilePoll.init(jsc_vm, watchfd, .{}, Subprocess, subprocess); + subprocess.poll = .{ .poll_ref = poll }; + subprocess.flags.reference_count += 1; + switch (subprocess.poll.poll_ref.?.register( + jsc_vm.event_loop_handle.?, + .process, + true, + )) { + .result => {}, + .err => |err| { + if (err.getErrno() != .SRCH) { + @panic("This shouldn't happen"); + } - send_exit_notification = true; - lazy = false; - }, + send_exit_notification = true; + lazy = false; + }, + } + } else { + WaiterThread.append(subprocess); } } @@ -1583,7 +1639,7 @@ pub const Subprocess = struct { if (send_exit_notification) { // process has already exited // https://cs.github.com/libuv/libuv/blob/b00d1bd225b602570baee82a6152eaa823a84fa6/src/unix/process.c#L1007 - subprocess.onExitNotification(); + subprocess.wait(subprocess.flags.is_sync); } } @@ -1622,10 +1678,11 @@ pub const Subprocess = struct { } subprocess.closeIO(.stdin); - { + if (!WaiterThread.shouldUseWaiterThread()) { var poll = JSC.FilePoll.init(jsc_vm, watchfd, .{}, Subprocess, subprocess); - subprocess.poll_ref = poll; - switch (subprocess.poll_ref.?.register( + subprocess.poll = .{ .poll_ref = poll }; + subprocess.flags.reference_count += 1; + switch (subprocess.poll.poll_ref.?.register( jsc_vm.event_loop_handle.?, .process, true, @@ -1641,6 +1698,8 @@ pub const Subprocess = struct { subprocess.onExitNotification(); }, } + } else { + WaiterThread.append(subprocess); } while (!subprocess.hasExited()) { @@ -1671,28 +1730,43 @@ pub const Subprocess = struct { pub fn onExitNotificationTask(this: *Subprocess) void { var vm = this.globalThis.bunVM(); - defer vm.drainMicrotasks(); - std.debug.assert(!this.is_sync); + const is_sync = this.flags.is_sync; + + defer { + if (!is_sync) + vm.drainMicrotasks(); + } this.wait(false); } pub fn onExitNotification( this: *Subprocess, ) void { - this.wait(this.is_sync); + std.debug.assert(this.flags.is_sync); + + defer this.flags.reference_count -= 1; + this.wait(this.flags.is_sync); } pub fn wait(this: *Subprocess, sync: bool) void { return this.waitWithJSValue(sync, this.this_jsvalue); } - pub fn watch(this: *Subprocess) void { - if (this.poll_ref) |poll| { - _ = poll.register( + pub fn watch(this: *Subprocess) JSC.Maybe(void) { + if (WaiterThread.shouldUseWaiterThread()) { + WaiterThread.append(this); + return JSC.Maybe(void){ .result = {} }; + } + + if (this.poll.poll_ref) |poll| { + this.flags.reference_count += @as(u30, @intFromBool(!poll.isRegistered())); + const registration = poll.register( this.globalThis.bunVM().event_loop_handle.?, .process, true, ); + + return registration; } else { @panic("Internal Bun error: poll_ref in Subprocess is null unexpectedly. Please file a bug report."); } @@ -1703,44 +1777,66 @@ pub const Subprocess = struct { sync: bool, this_jsvalue: JSC.JSValue, ) void { - if (this.has_waitpid_task) { - return; - } - defer if (sync) this.updateHasPendingActivityFlag(); - this.has_waitpid_task = true; + this.onWaitPid(sync, this_jsvalue, PosixSpawn.waitpid(this.pid, if (sync) 0 else std.os.W.NOHANG)); + } + + pub fn onWaitPid(this: *Subprocess, sync: bool, this_jsvalue: JSC.JSValue, waitpid_result_: JSC.Maybe(PosixSpawn.WaitPidResult)) void { + defer if (sync) this.updateHasPendingActivity(); + const pid = this.pid; - switch (PosixSpawn.waitpid(pid, if (sync) 0 else std.os.W.NOHANG)) { - .err => |err| { - this.waitpid_err = err; - }, - .result => |result| { - if (result.pid != 0) { - if (std.os.W.IFEXITED(result.status)) { - this.exit_code = @as(u8, @truncate(std.os.W.EXITSTATUS(result.status))); - } + var waitpid_result = waitpid_result_; + while (true) { + switch (waitpid_result) { + .err => |err| { + this.waitpid_err = err; + }, + .result => |result| { + if (result.pid == pid) { + if (std.os.W.IFEXITED(result.status)) { + this.exit_code = @as(u8, @truncate(std.os.W.EXITSTATUS(result.status))); + } - if (std.os.W.IFSIGNALED(result.status)) { - this.signal_code = @as(SignalCode, @enumFromInt(@as(u8, @truncate(std.os.W.TERMSIG(result.status))))); - } else if (std.os.W.IFSTOPPED(result.status)) { - this.signal_code = @as(SignalCode, @enumFromInt(@as(u8, @truncate(std.os.W.STOPSIG(result.status))))); + if (std.os.W.IFSIGNALED(result.status)) { + this.signal_code = @as(SignalCode, @enumFromInt(@as(u8, @truncate(std.os.W.TERMSIG(result.status))))); + } else if (std.os.W.IFSTOPPED(result.status)) { + this.signal_code = @as(SignalCode, @enumFromInt(@as(u8, @truncate(std.os.W.STOPSIG(result.status))))); + } } - } - if (!this.hasExited()) { - this.watch(); - } - }, + if (!this.hasExited()) { + switch (this.watch()) { + .result => {}, + .err => |err| { + if (comptime Environment.isMac) { + if (err.getErrno() == .SRCH) { + waitpid_result = PosixSpawn.waitpid(pid, if (sync) 0 else std.os.W.NOHANG); + continue; + } + } + }, + } + } + }, + } + break; } - this.has_waitpid_task = false; if (!sync and this.hasExited()) { var vm = this.globalThis.bunVM(); // prevent duplicate notifications - if (this.poll_ref) |poll| { - this.poll_ref = null; - poll.deinitWithVM(vm); + switch (this.poll) { + .poll_ref => |poll_| { + if (poll_) |poll| { + this.poll.poll_ref = null; + this.flags.reference_count -= @as(u30, @intFromBool(poll.isRegistered())); + poll.deinitWithVM(vm); + } + }, + .wait_thread => { + this.poll.wait_thread.poll_ref.deactivate(vm.event_loop_handle.?); + }, } this.onExit(this.globalThis, this_jsvalue); @@ -1755,16 +1851,43 @@ pub const Subprocess = struct { log("onExit {d}, code={d}", .{ this.pid, if (this.exit_code) |e| @as(i32, @intCast(e)) else -1 }); defer this.updateHasPendingActivity(); this_jsvalue.ensureStillAlive(); - this.has_waitpid_task = false; if (this.hasExited()) { + { + this.flags.reference_count += 1; + + const Holder = struct { + process: *Subprocess, + task: JSC.AnyTask, + + pub fn unref(self: *@This()) void { + // this calls disableKeepingProcessAlive on pool_ref and stdin, stdout, stderr + self.process.unref(); + self.process.flags.reference_count -= 1; + self.process.updateHasPendingActivity(); + bun.default_allocator.destroy(self); + } + }; + + var holder = bun.default_allocator.create(Holder) catch @panic("OOM"); + + holder.* = .{ + .process = this, + .task = JSC.AnyTask.New(Holder, Holder.unref).init(holder), + }; + + this.globalThis.bunVM().enqueueTask(JSC.Task.init(&holder.task)); + } + if (this.exit_promise.trySwap()) |promise| { + const waitpid_error = this.waitpid_err; + this.waitpid_err = null; + if (this.exit_code) |code| { promise.asAnyPromise().?.resolve(globalThis, JSValue.jsNumber(code)); } else if (this.signal_code != null) { promise.asAnyPromise().?.resolve(globalThis, this.getSignalCode(globalThis)); - } else if (this.waitpid_err) |err| { - this.waitpid_err = null; + } else if (waitpid_error) |err| { promise.asAnyPromise().?.reject(globalThis, err.toJSC(globalThis)); } else { // crash in debug mode @@ -1775,8 +1898,11 @@ pub const Subprocess = struct { } if (this.on_exit_callback.trySwap()) |callback| { + const waitpid_error = this.waitpid_err; + this.waitpid_err = null; + const waitpid_value: JSValue = - if (this.waitpid_err) |err| + if (waitpid_error) |err| err.toJSC(globalThis) else JSC.JSValue.jsUndefined(); @@ -1801,30 +1927,6 @@ pub const Subprocess = struct { globalThis.bunVM().onUnhandledError(globalThis, result); } } - - if (this.hasExited()) { - const Holder = struct { - process: *Subprocess, - task: JSC.AnyTask, - - pub fn unref(self: *@This()) void { - // this calls disableKeepingProcessAlive on pool_ref and stdin, stdout, stderr - self.process.unref(); - self.process.has_pending_unref = false; - self.process.updateHasPendingActivity(); - bun.default_allocator.destroy(self); - } - }; - - var holder = bun.default_allocator.create(Holder) catch @panic("OOM"); - this.has_pending_unref = true; - holder.* = .{ - .process = this, - .task = JSC.AnyTask.New(Holder, Holder.unref).init(holder), - }; - - this.globalThis.bunVM().enqueueTask(JSC.Task.init(&holder.task)); - } } const os = std.os; @@ -1833,8 +1935,6 @@ pub const Subprocess = struct { if (pipe[0] != pipe[1]) os.close(pipe[1]); } - const PosixSpawn = @import("./spawn.zig").PosixSpawn; - const Stdio = union(enum) { inherit: void, ignore: void, @@ -2072,4 +2172,135 @@ pub const Subprocess = struct { } pub const IPCHandler = IPC.NewIPCHandler(Subprocess); + + // Machines which do not support pidfd_open (GVisor, Linux Kernel < 5.6) + // use a thread to wait for the child process to exit. + // We use a single thread to call waitpid() in a loop. + pub const WaiterThread = struct { + concurrent_queue: Queue = .{}, + queue: std.ArrayList(*Subprocess) = std.ArrayList(*Subprocess).init(bun.default_allocator), + started: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), + + pub fn setShouldUseWaiterThread() void { + @atomicStore(bool, &should_use_waiter_thread, true, .Monotonic); + } + + pub fn shouldUseWaiterThread() bool { + return @atomicLoad(bool, &should_use_waiter_thread, .Monotonic); + } + + pub const WaitTask = struct { + subprocess: *Subprocess, + next: ?*WaitTask = null, + }; + + var should_use_waiter_thread = false; + + pub const Queue = bun.UnboundedQueue(WaitTask, .next); + pub var instance: WaiterThread = .{}; + pub fn init() !void { + std.debug.assert(should_use_waiter_thread); + + if (instance.started.fetchMax(1, .Monotonic) > 0) { + return; + } + + var thread = try std.Thread.spawn(.{ .stack_size = 512 * 1024 }, loop, .{}); + thread.detach(); + } + + pub const WaitPidResultTask = struct { + result: JSC.Maybe(PosixSpawn.WaitPidResult), + subprocess: *Subprocess, + + pub fn runFromJSThread(self: *@This()) void { + var result = self.result; + var subprocess = self.subprocess; + _ = subprocess.poll.wait_thread.ref_count.fetchSub(1, .Monotonic); + bun.default_allocator.destroy(self); + subprocess.onWaitPid(false, subprocess.this_jsvalue, result); + } + }; + + pub fn append(process: *Subprocess) void { + if (process.poll == .wait_thread) { + process.poll.wait_thread.poll_ref.activate(process.globalThis.bunVM().event_loop_handle.?); + _ = process.poll.wait_thread.ref_count.fetchAdd(1, .Monotonic); + } else { + process.poll = .{ + .wait_thread = .{ + .poll_ref = .{}, + .ref_count = std.atomic.Atomic(u32).init(1), + }, + }; + process.poll.wait_thread.poll_ref.activate(process.globalThis.bunVM().event_loop_handle.?); + } + + var task = bun.default_allocator.create(WaitTask) catch unreachable; + task.* = WaitTask{ + .subprocess = process, + }; + instance.concurrent_queue.push(task); + process.updateHasPendingActivity(); + + init() catch @panic("Failed to start WaiterThread"); + } + + pub fn loop() void { + Output.Source.configureNamedThread("Waitpid"); + + var this = &instance; + + while (true) { + { + var batch = this.concurrent_queue.popBatch(); + var iter = batch.iterator(); + this.queue.ensureUnusedCapacity(batch.count) catch unreachable; + while (iter.next()) |task| { + this.queue.appendAssumeCapacity(task.subprocess); + bun.default_allocator.destroy(task); + } + } + + var queue: []*Subprocess = this.queue.items; + var i: usize = 0; + while (queue.len > 0 and i < queue.len) { + var process = queue[i]; + + // this case shouldn't really happen + if (process.pid == bun.invalid_fd) { + _ = this.queue.orderedRemove(i); + _ = process.poll.wait_thread.ref_count.fetchSub(1, .Monotonic); + queue = this.queue.items; + continue; + } + + const result = PosixSpawn.waitpid(process.pid, std.os.W.NOHANG); + if (result == .err or (result == .result and result.result.pid == process.pid)) { + _ = this.queue.orderedRemove(i); + queue = this.queue.items; + + var task = bun.default_allocator.create(WaitPidResultTask) catch unreachable; + task.* = WaitPidResultTask{ + .result = result, + .subprocess = process, + }; + + process.globalThis.bunVMConcurrently().enqueueTaskConcurrent( + JSC.ConcurrentTask.create( + JSC.Task.init(task), + ), + ); + } + + i += 1; + } + + var mask = std.os.empty_sigset; + var signal: c_int = std.os.SIG.CHLD; + var rc = std.c.sigwait(&mask, &signal); + _ = rc; + } + } + }; }; diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index 6fcfb8a36fe5da..241b13108f2d9f 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -1834,7 +1834,7 @@ pub const FilePoll = struct { log("onUpdate " ++ kqueue_or_epoll ++ " (fd: {d}) Subprocess", .{poll.fd}); var loader = ptr.as(JSC.Subprocess); - loader.onExitNotification(); + loader.onExitNotificationTask(); }, @field(Owner.Tag, "FileSink") => { log("onUpdate " ++ kqueue_or_epoll ++ " (fd: {d}) FileSink", .{poll.fd}); diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 9120e3de638694..f73447fe58721a 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -344,7 +344,7 @@ const Futimes = JSC.Node.Async.futimes; const Lchmod = JSC.Node.Async.lchmod; const Lchown = JSC.Node.Async.lchown; const Unlink = JSC.Node.Async.unlink; - +const WaitPidResultTask = JSC.Subprocess.WaiterThread.WaitPidResultTask; // Task.get(ReadFileTask) -> ?ReadFileTask pub const Task = TaggedPointerUnion(.{ FetchTasklet, @@ -403,6 +403,7 @@ pub const Task = TaggedPointerUnion(.{ Lchmod, Lchown, Unlink, + WaitPidResultTask, }); const UnboundedQueue = @import("./unbounded_queue.zig").UnboundedQueue; pub const ConcurrentTask = struct { @@ -923,6 +924,10 @@ pub const EventLoop = struct { var any: *Unlink = task.get(Unlink).?; any.runFromJSThread(); }, + @field(Task.Tag, typeBaseName(@typeName(WaitPidResultTask))) => { + var any: *WaitPidResultTask = task.get(WaitPidResultTask).?; + any.runFromJSThread(); + }, else => if (Environment.allow_assert) { bun.Output.prettyln("\nUnexpected tag: {s}\n", .{@tagName(task.tag())}); } else { @@ -1131,6 +1136,7 @@ pub const EventLoop = struct { var global = ctx.global; var global_vm = ctx.jsc; + while (true) { while (this.tickWithCount() > 0) : (this.global.handleRejectedPromises()) { this.tickConcurrent(); diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index cc35bcf17da68d..4ee87b21bb9f21 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -739,6 +739,13 @@ pub const VirtualMachine = struct { } if (map.get("BUN_GARBAGE_COLLECTOR_LEVEL")) |gc_level| { + // Reuse this flag for other things to avoid unnecessary hashtable + // lookups on start for obscure flags which we do not want others to + // depend on. + if (map.get("BUN_FEATURE_FLAG_FORCE_WAITER_THREAD") != null) { + JSC.Subprocess.WaiterThread.setShouldUseWaiterThread(); + } + if (strings.eqlComptime(gc_level, "1")) { this.aggressive_garbage_collection = .mild; } else if (strings.eqlComptime(gc_level, "2")) { diff --git a/src/output.zig b/src/output.zig index 3814fa0fabb825..94f0bffa0505ae 100644 --- a/src/output.zig +++ b/src/output.zig @@ -476,11 +476,23 @@ pub fn scoped(comptime tag: @Type(.EnumLiteral), comptime disabled: bool) _log_f defer lock.unlock(); if (Output.enable_ansi_colors_stderr) { - out.print(comptime prettyFmt("[" ++ @tagName(tag) ++ "] " ++ fmt, true), args) catch unreachable; - buffered_writer.flush() catch unreachable; + out.print(comptime prettyFmt("[" ++ @tagName(tag) ++ "] " ++ fmt, true), args) catch { + really_disable = true; + return; + }; + buffered_writer.flush() catch { + really_disable = true; + return; + }; } else { - out.print(comptime prettyFmt("[" ++ @tagName(tag) ++ "] " ++ fmt, false), args) catch unreachable; - buffered_writer.flush() catch unreachable; + out.print(comptime prettyFmt("[" ++ @tagName(tag) ++ "] " ++ fmt, false), args) catch { + really_disable = true; + return; + }; + buffered_writer.flush() catch { + really_disable = true; + return; + }; } } }.log; diff --git a/src/standalone_bun.zig b/src/standalone_bun.zig index ba5ec8f6646605..21514f72f23765 100644 --- a/src/standalone_bun.zig +++ b/src/standalone_bun.zig @@ -568,12 +568,24 @@ pub const StandaloneModuleGraph = struct { if (bun.strings.eqlComptimeIgnoreLen(bun.argv()[0][0..argv0_len], "bun")) { return null; } + + if (comptime Environment.isDebug) { + if (bun.strings.eqlComptimeIgnoreLen(bun.argv()[0][0..argv0_len], "bun-debug")) { + return null; + } + } } if (argv0_len == 4) { if (bun.strings.eqlComptimeIgnoreLen(bun.argv()[0][0..argv0_len], "bunx")) { return null; } + + if (comptime Environment.isDebug) { + if (bun.strings.eqlComptimeIgnoreLen(bun.argv()[0][0..argv0_len], "bun-debugx")) { + return null; + } + } } } diff --git a/test/js/bun/spawn/spawn-streaming-stdout.test.ts b/test/js/bun/spawn/spawn-streaming-stdout.test.ts index 558a703712ee75..bea2b3b516888f 100644 --- a/test/js/bun/spawn/spawn-streaming-stdout.test.ts +++ b/test/js/bun/spawn/spawn-streaming-stdout.test.ts @@ -5,12 +5,9 @@ import { closeSync, openSync } from "fs"; test("spawn can read from stdout multiple chunks", async () => { gcTick(true); - const maxFD = openSync("/dev/null", "w"); - closeSync(maxFD); - - for (let i = 0; i < 10; i++) + var maxFD: number = -1; + for (let i = 0; i < 100; i++) { await (async function () { - var exited; const proc = spawn({ cmd: [bunExe(), import.meta.dir + "/spawn-streaming-stdout-repro.js"], stdin: "ignore", @@ -34,9 +31,14 @@ test("spawn can read from stdout multiple chunks", async () => { // TODO: fix bug with returning SIGHUP instead of exit code 1 proc.kill(); expect(Buffer.concat(chunks).toString()).toBe("Wrote to stdout\n".repeat(4)); + await proc.exited; })(); - + if (maxFD === -1) { + maxFD = openSync("/dev/null", "w"); + closeSync(maxFD); + } + } const newMaxFD = openSync("/dev/null", "w"); closeSync(newMaxFD); expect(newMaxFD).toBe(maxFD); -}); +}, 60_000); diff --git a/test/js/bun/spawn/spawn.test.ts b/test/js/bun/spawn/spawn.test.ts index 139eddb335b899..727d357dea0733 100644 --- a/test/js/bun/spawn/spawn.test.ts +++ b/test/js/bun/spawn/spawn.test.ts @@ -1,6 +1,6 @@ import { ArrayBufferSink, readableStreamToText, spawn, spawnSync, write } from "bun"; import { describe, expect, it } from "bun:test"; -import { gcTick as _gcTick, bunExe } from "harness"; +import { gcTick as _gcTick, bunExe, bunEnv } from "harness"; import { rmSync, writeFileSync } from "node:fs"; import path from "path"; @@ -161,7 +161,7 @@ for (let [gcTick, label] of [ expect(exitCode1).toBe(0); expect(exitCode2).toBe(1); } - }, 20_000); + }, 60_000_0); // FIXME: fix the assertion failure it.skip("Uint8Array works as stdout", () => { @@ -476,3 +476,22 @@ for (let [gcTick, label] of [ }); }); } + +if (!process.env.BUN_FEATURE_FLAG_FORCE_WAITER_THREAD) { + it("with BUN_FEATURE_FLAG_FORCE_WAITER_THREAD", async () => { + const result = spawnSync({ + cmd: [bunExe(), "test", import.meta.path], + env: { + ...bunEnv, + // Both flags are necessary to force this condition + "BUN_FEATURE_FLAG_FORCE_WAITER_THREAD": "1", + "BUN_GARBAGE_COLLECTOR_LEVEL": "1", + }, + }); + if (result.exitCode !== 0) { + console.error(result.stderr.toString()); + console.log(result.stdout.toString()); + } + expect(result.exitCode).toBe(0); + }, 60_000); +}