Skip to content

Commit

Permalink
refactor: removed spawns in favor of stream chains
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoverson committed Oct 13, 2023
1 parent b80b206 commit fd8cd7b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 31 deletions.
2 changes: 1 addition & 1 deletion crates/interfaces/wick-interface-http/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
name: http
kind: wick/types@v1
metadata:
version: 0.4.0
version: 0.5.0
package:
registry:
host: registry.candle.dev
Expand Down
55 changes: 25 additions & 30 deletions crates/wick/wick-trigger-http/src/http/component_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,39 +180,28 @@ pub(super) async fn respond(
.map_or(false, |v| v == "text/event-stream");

let res = if event_stream {
let (tx, rx) = unbounded_channel();
let _output_handle = tokio::spawn(async move {
while let Some(p) = body_stream.recv().await {
let body_stream =
tokio_stream::wrappers::UnboundedReceiverStream::new(body_stream).filter_map(move |p| async move {
if !p.has_data() {
continue;
return None;
}
match codec {
Codec::Json => {
let chunk = p
.decode::<wick_http::HttpEvent>()
.map_err(|e| HttpError::Bytes(e.to_string()))
.map(|v| to_sse_string_bytes(&v));
let _ = tx.send(chunk);
}
Codec::Raw => {
let chunk = p
.decode::<Base64Bytes>()
.map_err(|e| HttpError::Bytes(e.to_string()))
.map(Into::into);
let _ = tx.send(chunk);
}
Codec::Text => {
let chunk = p
.decode::<String>()
.map_err(|e| HttpError::Utf8Text(e.to_string()))
.map(Into::into);
let _ = tx.send(chunk);
}
Some(match codec {
Codec::Json => p
.decode::<wick_http::HttpEvent>()
.map_err(|e| HttpError::Bytes(e.to_string()))
.map(|v| to_sse_string_bytes(&v)),
Codec::Raw => p
.decode::<Base64Bytes>()
.map_err(|e| HttpError::Bytes(e.to_string()))
.map(Into::into),
Codec::Text => p
.decode::<String>()
.map_err(|e| HttpError::Utf8Text(e.to_string()))
.map(Into::into),
Codec::FormData => unreachable!("FormData is not supported as a decoder for HTTP responses"),
}
}
});
let body = Body::wrap_stream(tokio_stream::wrappers::UnboundedReceiverStream::new(rx));
})
});
let body = Body::wrap_stream(body_stream);
builder.body(body).unwrap()
} else {
let mut body = bytes::BytesMut::new();
Expand Down Expand Up @@ -273,6 +262,12 @@ fn split_stream(
let _ = sender.send(response);
} else if p.port() == "body" {
let _ = body_tx.send(p);
} else if let PacketPayload::Err(e) = p.payload {
if let Some(sender) = res_tx.take() {
let _ = sender.send(Err(HttpError::OperationError(e.to_string())));
}
warn!(?e, "http:stream:error");
break;
}
}
Err(e) => {
Expand Down

0 comments on commit fd8cd7b

Please sign in to comment.