diff --git a/crates/wick/flow-graph-interpreter/src/graph.rs b/crates/wick/flow-graph-interpreter/src/graph.rs index f0fe7880..5c9cc4b1 100644 --- a/crates/wick/flow-graph-interpreter/src/graph.rs +++ b/crates/wick/flow-graph-interpreter/src/graph.rs @@ -233,13 +233,13 @@ fn expand_port_paths( schematic: &mut Schematic, expressions: &mut [FlowExpression], ) -> Result<(), flow_graph::error::Error> { - for expression in expressions.iter_mut() { + for (i, expression) in expressions.iter_mut().enumerate() { if let FlowExpression::ConnectionExpression(expr) = expression { let (from, to) = expr.clone().into_parts(); let (from_inst, from_port, _) = from.into_parts(); let (to_inst, to_port, _) = to.into_parts(); if let InstancePort::Path(name, parts) = from_port { - let id = format!("{}_pluck_{}_[{}]", schematic.name(), name, parts.join(",")); + let id = format!("{}_pluck_{}_{}_[{}]", schematic.name(), i, name, parts.join(",")); let config = HashMap::from([( "path".to_owned(), Value::Array(parts.into_iter().map(Value::String).collect()), diff --git a/crates/wick/flow-graph-interpreter/tests/core.rs b/crates/wick/flow-graph-interpreter/tests/core.rs index d881945a..dc198a0d 100644 --- a/crates/wick/flow-graph-interpreter/tests/core.rs +++ b/crates/wick/flow-graph-interpreter/tests/core.rs @@ -24,6 +24,35 @@ async fn test_pluck() -> Result<()> { .await } +#[test_logger::test(tokio::test)] +async fn test_pluck_substreams() -> Result<()> { + test_config( + "./tests/manifests/v1/core-pluck-streams.yaml", + None, + None, + vec![ + Packet::open_bracket("input"), + Packet::encode("input", json!({"value": "value1!"})), + Packet::encode("input", json!({"value": "value2!"})), + Packet::close_bracket("input"), + Packet::done("input"), + ], + vec![ + Packet::open_bracket("pluck1"), + Packet::encode("pluck1", "value1!"), + Packet::encode("pluck1", "value2!"), + Packet::close_bracket("pluck1"), + Packet::done("pluck1"), + Packet::open_bracket("pluck2"), + Packet::encode("pluck2", "value1!"), + Packet::encode("pluck2", "value2!"), + Packet::close_bracket("pluck2"), + Packet::done("pluck2"), + ], + ) + .await +} + #[test_logger::test(tokio::test)] async fn test_pluck_shorthand() -> Result<()> { first_packet_test( diff --git a/crates/wick/flow-graph-interpreter/tests/manifests/v1/core-pluck-shorthand.yaml b/crates/wick/flow-graph-interpreter/tests/manifests/v1/core-pluck-shorthand.yaml index fb54d9e9..d9097fa9 100644 --- a/crates/wick/flow-graph-interpreter/tests/manifests/v1/core-pluck-shorthand.yaml +++ b/crates/wick/flow-graph-interpreter/tests/manifests/v1/core-pluck-shorthand.yaml @@ -9,6 +9,3 @@ component: - name: test flow: - <>.request.headers.cookie.0 -> <>.output - - name: test2 - flow: - - '<>.input."Raw String Field #" -> <>.output' diff --git a/crates/wick/flow-graph-interpreter/tests/manifests/v1/core-pluck-streams.yaml b/crates/wick/flow-graph-interpreter/tests/manifests/v1/core-pluck-streams.yaml new file mode 100644 index 00000000..f88f2681 --- /dev/null +++ b/crates/wick/flow-graph-interpreter/tests/manifests/v1/core-pluck-streams.yaml @@ -0,0 +1,12 @@ +--- +name: 'test' +kind: wick/component@v1 +metadata: + version: '0.0.2' +component: + kind: wick/component/composite@v1 + operations: + - name: test + flow: + - <>.input.value -> <>.pluck1 + - <>.input.value -> <>.pluck2