diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs index 02ecabc9cc4b13..6ca2a132cc92c9 100644 --- a/core/ops_builtin.rs +++ b/core/ops_builtin.rs @@ -165,7 +165,7 @@ async fn op_read( buf: ZeroCopyBuf, ) -> Result { let resource = state.borrow().resource_table.get_any(rid)?; - resource.read(buf).await.map(|n| n as u32) + resource.read_return(buf).await.map(|(n, _)| n as u32) } #[op] diff --git a/core/resources.rs b/core/resources.rs index eaa1fb3cf49972..56c9298af381bb 100644 --- a/core/resources.rs +++ b/core/resources.rs @@ -35,14 +35,7 @@ pub trait Resource: Any + 'static { type_name::().into() } - /// Resources may implement `read()` to be a readable stream - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { - Box::pin(async move { - let (nread, _) = self.read_return(buf).await?; - Ok(nread) - }) - } - + /// Resources may implement `read_return()` to be a readable stream fn read_return( self: Rc, _buf: ZeroCopyBuf, diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js index fcea23a959ac56..789979ff95be8b 100644 --- a/ext/flash/01_http.js +++ b/ext/flash/01_http.js @@ -10,7 +10,7 @@ const { ReadableStream, ReadableStreamPrototype, - getReadableStreamRid, + getReadableStreamResourceBacking, readableStreamClose, _state, } = window.__bootstrap.streams; @@ -333,8 +333,8 @@ } if (isStreamingResponseBody === true) { - const resourceRid = getReadableStreamRid(respBody); - if (resourceRid) { + const resourceBacking = getReadableStreamResourceBacking(respBody); + if (resourceBacking) { if (respBody.locked) { throw new TypeError("ReadableStream is locked."); } @@ -352,7 +352,8 @@ ), serverId, i, - resourceRid, + resourceBacking.rid, + resourceBacking.autoClose, ).then(() => { // Release JS lock. readableStreamClose(respBody); diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs index 0714e379d770f8..183ebde2610256 100644 --- a/ext/flash/lib.rs +++ b/ext/flash/lib.rs @@ -205,8 +205,13 @@ async fn op_flash_write_resource( server_id: u32, token: u32, resource_id: deno_core::ResourceId, + auto_close: bool, ) -> Result<(), AnyError> { - let resource = op_state.borrow_mut().resource_table.take_any(resource_id)?; + let resource = if auto_close { + op_state.borrow_mut().resource_table.take_any(resource_id)? + } else { + op_state.borrow_mut().resource_table.get_any(resource_id)? + }; let sock = { let op_state = &mut op_state.borrow_mut(); let flash_ctx = op_state.borrow_mut::(); diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 63023a29649e72..588a7da5746a46 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -17,8 +17,7 @@ } = window.__bootstrap.fetch; const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype, ops } = core; - const { ReadableStream, ReadableStreamPrototype } = - window.__bootstrap.streams; + const { ReadableStreamPrototype } = window.__bootstrap.streams; const abortSignal = window.__bootstrap.abortSignal; const { WebSocket, @@ -33,8 +32,12 @@ } = window.__bootstrap.webSocket; const { TcpConn, UnixConn } = window.__bootstrap.net; const { TlsConn } = window.__bootstrap.tls; - const { Deferred, getReadableStreamRid, readableStreamClose } = - window.__bootstrap.streams; + const { + Deferred, + getReadableStreamResourceBacking, + readableStreamForRid, + readableStreamClose, + } = window.__bootstrap.streams; const { ArrayPrototypeIncludes, ArrayPrototypePush, @@ -50,7 +53,6 @@ StringPrototypeSplit, Symbol, SymbolAsyncIterator, - TypedArrayPrototypeSubarray, TypeError, Uint8Array, Uint8ArrayPrototype, @@ -121,7 +123,7 @@ // It will be closed automatically once the request has been handled and // the response has been sent. if (method !== "GET" && method !== "HEAD") { - body = createRequestBodyStream(streamRid); + body = readableStreamForRid(streamRid, false); } const innerRequest = newInnerRequest( @@ -170,10 +172,6 @@ } } - function readRequest(streamRid, buf) { - return core.opAsync("op_http_read", streamRid, buf); - } - function createRespondWith( httpConn, streamRid, @@ -270,9 +268,9 @@ ) { throw new TypeError("Unreachable"); } - const resourceRid = getReadableStreamRid(respBody); + const resourceBacking = getReadableStreamResourceBacking(respBody); let reader; - if (resourceRid) { + if (resourceBacking) { if (respBody.locked) { throw new TypeError("ReadableStream is locked."); } @@ -281,9 +279,9 @@ await core.opAsync( "op_http_write_resource", streamRid, - resourceRid, + resourceBacking.rid, ); - core.tryClose(resourceRid); + if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid); readableStreamClose(respBody); // Release JS lock. } catch (error) { const connError = httpConn[connErrorSymbol]; @@ -379,32 +377,6 @@ }; } - function createRequestBodyStream(streamRid) { - return new ReadableStream({ - type: "bytes", - async pull(controller) { - try { - // This is the largest possible size for a single packet on a TLS - // stream. - const chunk = new Uint8Array(16 * 1024 + 256); - const read = await readRequest(streamRid, chunk); - if (read > 0) { - // We read some data. Enqueue it onto the stream. - controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); - } else { - // We have reached the end of the body, so we close the stream. - controller.close(); - } - } catch (err) { - // There was an error while reading a chunk of the body, so we - // error. - controller.error(err); - controller.close(); - } - }, - }); - } - const _ws = Symbol("[[associated_ws]]"); function upgradeWebSocket(request, options = {}) { diff --git a/ext/http/lib.rs b/ext/http/lib.rs index d1b38fb42dbb71..bffe3c3d5d159b 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -78,7 +78,6 @@ pub fn init() -> Extension { )) .ops(vec![ op_http_accept::decl(), - op_http_read::decl(), op_http_write_headers::decl(), op_http_headers::decl(), op_http_write::decl(), @@ -329,11 +328,63 @@ impl HttpStreamResource { } } +impl HttpStreamResource { + async fn read( + self: Rc, + mut buf: ZeroCopyBuf, + ) -> Result<(usize, ZeroCopyBuf), AnyError> { + let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; + + let body = loop { + match &mut *rd { + HttpRequestReader::Headers(_) => {} + HttpRequestReader::Body(_, body) => break body, + HttpRequestReader::Closed => return Ok((0, buf)), + } + match take(&mut *rd) { + HttpRequestReader::Headers(request) => { + let (parts, body) = request.into_parts(); + *rd = HttpRequestReader::Body(parts.headers, body.peekable()); + } + _ => unreachable!(), + }; + }; + + let fut = async { + let mut body = Pin::new(body); + loop { + match body.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(buf.len(), chunk.len()); + buf[..len].copy_from_slice(&chunk.split_to(len)); + break Ok((len, buf)); + } + Some(_) => match body.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok((0, buf)), + } + } + }; + + let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); + fut.try_or_cancel(cancel_handle).await + } +} + impl Resource for HttpStreamResource { fn name(&self) -> Cow { "httpStream".into() } + fn read_return( + self: Rc, + _buf: ZeroCopyBuf, + ) -> deno_core::AsyncResult<(usize, ZeroCopyBuf)> { + Box::pin(self.read(_buf)) + } + fn close(self: Rc) { self.cancel_handle.cancel(); } @@ -816,55 +867,6 @@ async fn op_http_shutdown( Ok(()) } -#[op] -async fn op_http_read( - state: Rc>, - rid: ResourceId, - mut buf: ZeroCopyBuf, -) -> Result { - let stream = state - .borrow_mut() - .resource_table - .get::(rid)?; - let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; - - let body = loop { - match &mut *rd { - HttpRequestReader::Headers(_) => {} - HttpRequestReader::Body(_, body) => break body, - HttpRequestReader::Closed => return Ok(0), - } - match take(&mut *rd) { - HttpRequestReader::Headers(request) => { - let (parts, body) = request.into_parts(); - *rd = HttpRequestReader::Body(parts.headers, body.peekable()); - } - _ => unreachable!(), - }; - }; - - let fut = async { - let mut body = Pin::new(body); - loop { - match body.as_mut().peek_mut().await { - Some(Ok(chunk)) if !chunk.is_empty() => { - let len = min(buf.len(), chunk.len()); - buf[..len].copy_from_slice(&chunk.split_to(len)); - break Ok(len); - } - Some(_) => match body.as_mut().next().await.unwrap() { - Ok(chunk) => assert!(chunk.is_empty()), - Err(err) => break Err(AnyError::from(err)), - }, - None => break Ok(0), - } - } - }; - - let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle); - fut.try_or_cancel(cancel_handle).await -} - #[op] fn op_http_websocket_accept_header(key: String) -> Result { let digest = ring::digest::digest( diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index cbf781b53fd1d9..412c58c3c16447 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -654,11 +654,12 @@ * read directly from the underlying resource if they so choose (FastStream). * * @param {number} rid The resource ID to read from. + * @param {boolean=} autoClose If the resource should be auto-closed when the stream closes. Defaults to true. * @returns {ReadableStream} */ - function readableStreamForRid(rid) { + function readableStreamForRid(rid, autoClose = true) { const stream = webidl.createBranded(ReadableStream); - stream[_maybeRid] = rid; + stream[_resourceBacking] = { rid, autoClose }; const underlyingSource = { type: "bytes", async pull(controller) { @@ -666,7 +667,7 @@ try { const bytesRead = await core.read(rid, v); if (bytesRead === 0) { - core.tryClose(rid); + if (autoClose) core.tryClose(rid); controller.close(); controller.byobRequest.respond(0); } else { @@ -674,11 +675,11 @@ } } catch (e) { controller.error(e); - core.tryClose(rid); + if (autoClose) core.tryClose(rid); } }, cancel() { - core.tryClose(rid); + if (autoClose) core.tryClose(rid); }, autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, }; @@ -761,8 +762,8 @@ } } - function getReadableStreamRid(stream) { - return stream[_maybeRid]; + function getReadableStreamResourceBacking(stream) { + return stream[_resourceBacking]; } /** @@ -4424,7 +4425,7 @@ WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size); } - const _maybeRid = Symbol("[[maybeRid]]"); + const _resourceBacking = Symbol("[[resourceBacking]]"); /** @template R */ class ReadableStream { /** @type {ReadableStreamDefaultController | ReadableByteStreamController} */ @@ -4439,8 +4440,8 @@ [_state]; /** @type {any} */ [_storedError]; - /** @type {number | null} */ - [_maybeRid] = null; + /** @type {{ rid: number, autoClose: boolean } | null} */ + [_resourceBacking] = null; /** * @param {UnderlyingSource=} underlyingSource @@ -5986,7 +5987,7 @@ readableStreamForRidUnrefable, readableStreamForRidUnrefableRef, readableStreamForRidUnrefableUnref, - getReadableStreamRid, + getReadableStreamResourceBacking, Deferred, // Exposed in global runtime scope ByteLengthQueuingStrategy, diff --git a/runtime/js/40_testing.js b/runtime/js/40_testing.js index c1ce3e726c6a5b..069b27957af4af 100644 --- a/runtime/js/40_testing.js +++ b/runtime/js/40_testing.js @@ -90,7 +90,6 @@ "op_funlock_async": ["unlock a file", "awaiting the result of a `Deno.funlock` call"], "op_futime_async": ["change file timestamps", "awaiting the result of a `Deno.futime` call"], "op_http_accept": ["accept a HTTP request", "closing a `Deno.HttpConn`"], - "op_http_read": ["read the body of a HTTP request", "consuming the entire request body"], "op_http_shutdown": ["shutdown a HTTP connection", "awaiting `Deno.HttpEvent#respondWith`"], "op_http_upgrade_websocket": ["upgrade a HTTP connection to a WebSocket", "awaiting `Deno.HttpEvent#respondWith`"], "op_http_write_headers": ["write HTTP response headers", "awaiting `Deno.HttpEvent#respondWith`"],