Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make fetch() faster at uploading files over http:// #16303

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 132 additions & 23 deletions src/http.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1048,8 +1048,69 @@ pub const HTTPThread = struct {

has_awoken: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
timer: std.time.Timer,

// Lazily-created libdeflate state; allocated on first use by deflater().
lazy_libdeflater: ?*LibdeflateState = null,
// Single-slot cache of the 512 KB heap buffer used for large request bodies.
// Taken (and nulled) by getRequestBodySendBuffer(); refilled by
// HeapRequestBodyBuffer.put().
lazy_request_body_buffer: ?*HeapRequestBodyBuffer = null,

/// Heap-allocated 512 KB scratch buffer for serializing large HTTP request
/// bodies, with a FixedBufferAllocator that hands out slices of `buffer`.
/// Instances are recycled through the per-thread single-slot cache
/// (`http_thread.lazy_request_body_buffer`).
pub const HeapRequestBodyBuffer = struct {
    // Backing storage; left undefined because it is only ever written before
    // being read (via the FixedBufferAllocator / ArrayList machinery).
    buffer: [512 * 1024]u8 = undefined,
    fixed_buffer_allocator: std.heap.FixedBufferAllocator,

    pub usingnamespace bun.New(@This());

    /// Allocate a new buffer on the heap. The FixedBufferAllocator is wired
    /// up *after* `new` returns so that it points at the final heap address
    /// of `buffer`, not at a temporary.
    pub fn init() *@This() {
        var this = HeapRequestBodyBuffer.new(.{
            .fixed_buffer_allocator = undefined,
        });
        this.fixed_buffer_allocator = std.heap.FixedBufferAllocator.init(&this.buffer);
        return this;
    }

    /// Return this buffer to the thread's single-slot cache, or destroy it
    /// if the slot is already occupied.
    pub fn put(this: *@This()) void {
        if (http_thread.lazy_request_body_buffer == null) {
            // Slot is empty: reset the allocator and cache the buffer for
            // reuse. NOTE(review): the original comment here said "This case
            // hypothetically should never happen", but getRequestBodySendBuffer
            // nulls the slot when it hands a buffer out, which makes this the
            // common path on return — confirm the comment wasn't meant for the
            // else branch below.
            this.fixed_buffer_allocator.reset();
            http_thread.lazy_request_body_buffer = this;
        } else {
            // Slot already holds a cached buffer; free this one instead.
            this.deinit();
        }
    }

    pub fn deinit(this: *@This()) void {
        this.destroy();
    }
};

/// The buffer used to serialize a request before writing it to the socket:
/// either a cached/allocated 512 KB heap buffer (large bodies) or an
/// on-stack buffer with a default-allocator fallback (small bodies).
pub const RequestBodyBuffer = union(enum) {
    heap: *HeapRequestBodyBuffer,
    stack: std.heap.StackFallbackAllocator(request_body_send_stack_buffer_size),

    /// Release the buffer. A heap buffer goes back to the per-thread cache;
    /// a stack buffer needs no cleanup.
    pub fn deinit(this: *@This()) void {
        if (this.* == .heap) {
            this.heap.put();
        }
    }

    /// The full backing storage of whichever variant is active.
    pub fn allocatedSlice(this: *@This()) []u8 {
        switch (this.*) {
            .heap => |heap| return &heap.buffer,
            .stack => |*stack| return &stack.buffer,
        }
    }

    /// An allocator that carves memory out of the backing storage.
    pub fn allocator(this: *@This()) std.mem.Allocator {
        switch (this.*) {
            .heap => |heap| return heap.fixed_buffer_allocator.allocator(),
            .stack => |*stack| return stack.get(),
        }
    }

    /// View the backing storage as an empty ArrayList whose capacity is the
    /// whole buffer, so callers can append without reallocating.
    pub fn toArrayList(this: *@This()) std.ArrayList(u8) {
        var list = std.ArrayList(u8).fromOwnedSlice(this.allocator(), this.allocatedSlice());
        list.items.len = 0;
        return list;
    }
};

// Scoped logger for the HTTP thread. NOTE(review): second argument presumably
// controls default visibility of the scope — confirm against bun.Output.scoped.
const threadlog = Output.scoped(.HTTPThread, true);
const WriteMessage = struct {
Expand All @@ -1072,6 +1133,24 @@ pub const HTTPThread = struct {
pub usingnamespace bun.New(@This());
};

// Size of the on-stack request serialization buffer (32 KB). Requests whose
// estimated size reaches this threshold use the 512 KB heap buffer instead
// (see getRequestBodySendBuffer).
const request_body_send_stack_buffer_size = 32 * 1024;

/// Hand out a serialization buffer sized for `estimated_size` bytes.
/// Small requests get a stack buffer (with default-allocator fallback);
/// large requests get the cached 512 KB heap buffer, or a freshly
/// allocated one if the cache slot is empty.
pub inline fn getRequestBodySendBuffer(this: *@This(), estimated_size: usize) RequestBodyBuffer {
    // Small payloads: stay on the stack.
    if (estimated_size < request_body_send_stack_buffer_size) {
        return .{
            .stack = std.heap.stackFallback(request_body_send_stack_buffer_size, bun.default_allocator),
        };
    }

    // Large payloads: take the cached buffer out of the single-slot cache
    // (nulling the slot so it cannot be handed out twice)...
    if (this.lazy_request_body_buffer) |cached| {
        this.lazy_request_body_buffer = null;
        return .{ .heap = cached };
    }

    // ...or allocate a new one when the cache is empty.
    log("Allocating HeapRequestBodyBuffer due to {d} bytes request body", .{estimated_size});
    return .{
        .heap = HeapRequestBodyBuffer.init(),
    };
}

pub fn deflater(this: *@This()) *LibdeflateState {
if (this.lazy_libdeflater == null) {
this.lazy_libdeflater = LibdeflateState.new(.{
Expand Down Expand Up @@ -1667,6 +1746,23 @@ pub inline fn getAllocator() std.mem.Allocator {
return default_allocator;
}

// Maximum TLS record payload size (16 KB); over https:// a single write can
// carry at most this much, so larger send buffers buy nothing there.
const max_tls_record_size = 16 * 1024;

/// Pick the buffer used to write this request to the network.
///
/// Plain http:// benefits from fewer, larger sends, so the size estimate is
/// padded (doubled) to steer big uploads toward the 512 KB heap buffer while
/// small requests stay on a 32 KB stack buffer.
///
/// https:// is capped at a single 16 KB TLS record, so the estimate is
/// clamped and a larger buffer would have no effect there.
inline fn getRequestBodySendBuffer(this: *@This()) HTTPThread.RequestBodyBuffer {
    const body_plus_headers = this.state.request_body.len + this.estimatedRequestHeaderByteLength();
    const estimate = if (this.isHTTPS())
        @min(body_plus_headers, max_tls_record_size)
    else
        body_plus_headers * 2;
    return http_thread.getRequestBodySendBuffer(estimate);
}

/// Run a garbage-collection pass on the shared arena. NOTE(review): `force`
/// presumably bypasses the arena's own collection heuristics — confirm
/// against bun's arena `gc` implementation.
pub inline fn cleanup(force: bool) void {
    default_arena.gc(force);
}
Expand Down Expand Up @@ -3058,6 +3154,18 @@ pub fn onPreconnect(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPCon
this.result_callback.run(@fieldParentPtr("client", this), HTTPClientResult{ .fail = null, .metadata = null, .has_more = false });
}

/// Sum the byte lengths of all queued header names and values. Separators
/// (": ", "\r\n") are not counted, so this is a lower-bound estimate used
/// only for sizing the send buffer.
fn estimatedRequestHeaderByteLength(this: *const HTTPClient) usize {
    const entries = this.header_entries.slice();
    var total: usize = 0;
    // The .name and .value columns come from the same list, so they are the
    // same length and can be iterated in parallel.
    for (entries.items(.name), entries.items(.value)) |name, value| {
        total += @as(usize, name.length) + @as(usize, value.length);
    }
    return total;
}

pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
if (this.signals.get(.aborted)) {
this.closeAndAbort(is_ssl, socket);
Expand All @@ -3077,11 +3185,12 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s

switch (this.state.request_stage) {
.pending, .headers => {
var stack_fallback = std.heap.stackFallback(16384, default_allocator);
const allocator = stack_fallback.get();
var list = std.ArrayList(u8).initCapacity(allocator, stack_fallback.buffer.len) catch unreachable;
defer if (list.capacity > stack_fallback.buffer.len) list.deinit();
const writer = &list.writer();
var request_body_buffer = this.getRequestBodySendBuffer();
defer request_body_buffer.deinit();
var temporary_send_buffer = request_body_buffer.toArrayList();
defer temporary_send_buffer.deinit();

const writer = &temporary_send_buffer.writer();

this.setTimeout(socket, 5);

Expand Down Expand Up @@ -3120,17 +3229,17 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
};
}

const headers_len = list.items.len;
assert(list.items.len == writer.context.items.len);
if (this.state.request_body.len > 0 and list.capacity - list.items.len > 0 and !this.flags.proxy_tunneling) {
var remain = list.items.ptr[list.items.len..list.capacity];
const headers_len = temporary_send_buffer.items.len;
assert(temporary_send_buffer.items.len == writer.context.items.len);
if (this.state.request_body.len > 0 and temporary_send_buffer.capacity - temporary_send_buffer.items.len > 0 and !this.flags.proxy_tunneling) {
var remain = temporary_send_buffer.items.ptr[temporary_send_buffer.items.len..temporary_send_buffer.capacity];
const wrote = @min(remain.len, this.state.request_body.len);
assert(wrote > 0);
@memcpy(remain[0..wrote], this.state.request_body[0..wrote]);
list.items.len += wrote;
temporary_send_buffer.items.len += wrote;
}

const to_send = list.items[this.state.request_sent_len..];
const to_send = temporary_send_buffer.items[this.state.request_sent_len..];
if (comptime Environment.allow_assert) {
assert(!socket.isShutdown());
assert(!socket.isClosed());
Expand Down Expand Up @@ -3320,11 +3429,11 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
.proxy_headers => {
if (this.proxy_tunnel) |proxy| {
this.setTimeout(socket, 5);
var stack_fallback = std.heap.stackFallback(16384, default_allocator);
const allocator = stack_fallback.get();
var list = std.ArrayList(u8).initCapacity(allocator, stack_fallback.buffer.len) catch unreachable;
defer if (list.capacity > stack_fallback.buffer.len) list.deinit();
const writer = &list.writer();
var request_body_buffer = this.getRequestBodySendBuffer();
defer request_body_buffer.deinit();
var temporary_send_buffer = request_body_buffer.toArrayList();
defer temporary_send_buffer.deinit();
const writer = &temporary_send_buffer.writer();

const request = this.buildRequest(this.state.request_body.len);
writeRequest(
Expand All @@ -3336,17 +3445,17 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
return;
};

const headers_len = list.items.len;
assert(list.items.len == writer.context.items.len);
if (this.state.request_body.len > 0 and list.capacity - list.items.len > 0) {
var remain = list.items.ptr[list.items.len..list.capacity];
const headers_len = temporary_send_buffer.items.len;
assert(temporary_send_buffer.items.len == writer.context.items.len);
if (this.state.request_body.len > 0 and temporary_send_buffer.capacity - temporary_send_buffer.items.len > 0) {
var remain = temporary_send_buffer.items.ptr[temporary_send_buffer.items.len..temporary_send_buffer.capacity];
const wrote = @min(remain.len, this.state.request_body.len);
assert(wrote > 0);
@memcpy(remain[0..wrote], this.state.request_body[0..wrote]);
list.items.len += wrote;
temporary_send_buffer.items.len += wrote;
}

const to_send = list.items[this.state.request_sent_len..];
const to_send = temporary_send_buffer.items[this.state.request_sent_len..];
if (comptime Environment.allow_assert) {
assert(!socket.isShutdown());
assert(!socket.isClosed());
Expand Down
Loading