Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement operator chaining #804

Merged
merged 14 commits into from
Dec 10, 2024
Prev Previous commit
Next Next commit
compiling
mwylde committed Dec 10, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 34fa6008d01df886559559263c1ed17b600ccd53
63 changes: 34 additions & 29 deletions crates/arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,9 @@ use http::StatusCode;

use petgraph::{Direction, EdgeDirection};
use std::collections::HashMap;

use std::num::ParseIntError;
use std::str::FromStr;
use std::string::ParseError;
use petgraph::visit::NodeRef;
use std::time::{Duration, SystemTime};

@@ -268,17 +270,19 @@ async fn register_schemas(compiled_sql: &mut CompiledSql) -> anyhow::Result<()>
let schema = edge.weight().schema.schema.clone();

let node = compiled_sql.program.graph.node_weight_mut(idx).unwrap();
if node.operator_name == OperatorName::ConnectorSink {
let mut op = ConnectorOp::decode(&node.operator_config[..]).map_err(|_| {
anyhow!(
for (node, _) in node.operator_chain.iter_mut() {
if node.operator_name == OperatorName::ConnectorSink {
let mut op = ConnectorOp::decode(&node.operator_config[..]).map_err(|_| {
anyhow!(
"failed to decode configuration for connector node {:?}",
node
)
})?;
})?;

try_register_confluent_schema(&mut op, &schema).await?;
try_register_confluent_schema(&mut op, &schema).await?;

node.operator_config = op.encode_to_vec();
node.operator_config = op.encode_to_vec();
}
}
}

@@ -324,28 +328,28 @@ pub(crate) async fn create_pipeline_int<'a>(
let g = &mut compiled.program.graph;
for idx in g.node_indices() {
let should_replace = {
let node = g.node_weight(idx).unwrap();
node.operator_name == OperatorName::ConnectorSink
&& node.operator_config != default_sink().encode_to_vec()
let node = &g.node_weight(idx).unwrap().operator_chain;
node.is_sink() && node.iter().next().unwrap().0.operator_config != default_sink().encode_to_vec()
};
if should_replace {
if enable_sinks {
let new_idx = g.add_node(LogicalNode {
operator_id: format!("{}_1", g.node_weight(idx).unwrap().operator_id),
description: "Preview sink".to_string(),
operator_name: OperatorName::ConnectorSink,
operator_config: default_sink().encode_to_vec(),
parallelism: 1,
});
let edges: Vec<_> = g
.edges_directed(idx, Direction::Incoming)
.map(|e| (e.source(), e.weight().clone()))
.collect();
for (source, weight) in edges {
g.add_edge(source, new_idx, weight);
}
todo!("enable sinks")
// let new_idx = g.add_node(LogicalNode {
// operator_id: format!("{}_1", g.node_weight(idx).unwrap().operator_id),
// description: "Preview sink".to_string(),
// operator_name: OperatorName::ConnectorSink,
// operator_config: default_sink().encode_to_vec(),
// parallelism: 1,
// });
// let edges: Vec<_> = g
// .edges_directed(idx, Direction::Incoming)
// .map(|e| (e.source(), e.weight().clone()))
// .collect();
// for (source, weight) in edges {
// g.add_edge(source, new_idx, weight);
// }
} else {
g.node_weight_mut(idx).unwrap().operator_config =
g.node_weight_mut(idx).unwrap().operator_chain.iter_mut().next().unwrap().0.operator_config =
default_sink().encode_to_vec();
}
}
@@ -452,8 +456,9 @@ impl TryInto<Pipeline> for DbPipeline {
.as_object()
.unwrap()
.into_iter()
.map(|(k, v)| (k.clone(), v.as_u64().unwrap() as usize))
.collect(),
.map(|(k, v)| Ok((u32::from_str(k)?, v.as_u64().unwrap() as usize)))
.collect::<Result<HashMap<_, _>, ParseIntError>>()
.map_err(|e| bad_request(format!("invalid node_id: {}", e)))?,
);

let stop = match self.stop {
@@ -682,10 +687,10 @@ pub async fn patch_pipeline(
.ok_or_else(|| not_found("Job"))?;

let program = ArrowProgram::decode(&res.program[..]).map_err(log_and_map)?;
let map: HashMap<String, u32> = program
let map: HashMap<_, _> = program
.nodes
.into_iter()
.map(|node| (node.node_id, parallelism as u32))
.map(|node| (node.node_id.to_string(), parallelism as u32))
.collect();

Some(serde_json::to_value(map).map_err(log_and_map)?)
16 changes: 8 additions & 8 deletions crates/arroyo-connectors/src/impulse/operator.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ use bincode::{Decode, Encode};
use datafusion::common::ScalarValue;
use std::time::{Duration, SystemTime};

use arroyo_operator::context::{OperatorContext, SourceContext};
use arroyo_operator::context::{OperatorContext, SourceCollector, SourceContext};
use arroyo_operator::operator::SourceOperator;
use arroyo_operator::SourceFinishType;
use arroyo_types::{from_millis, print_time, to_millis, to_nanos};
@@ -92,7 +92,7 @@ impl ImpulseSourceFunc {
}
}

async fn run(&mut self, ctx: &mut SourceContext) -> SourceFinishType {
async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector) -> SourceFinishType {
let delay = self.delay(ctx);
info!(
"Starting impulse source with start {} delay {:?} and limit {}",
@@ -138,7 +138,7 @@ impl ImpulseSourceFunc {
let counter_column = counter_builder.finish();
let task_index_column = task_index_scalar.to_array_of_size(items).unwrap();
let timestamp_column = timestamp_builder.finish();
ctx.collect(
collector.collect(
RecordBatch::try_new(
schema.clone(),
vec![
@@ -163,7 +163,7 @@ impl ImpulseSourceFunc {
let counter_column = counter_builder.finish();
let task_index_column = task_index_scalar.to_array_of_size(items).unwrap();
let timestamp_column = timestamp_builder.finish();
ctx.collect(
collector.collect(
RecordBatch::try_new(
schema.clone(),
vec![
@@ -183,7 +183,7 @@ impl ImpulseSourceFunc {
.unwrap()
.insert(ctx.task_info.task_index, self.state)
.await;
if self.start_checkpoint(c, ctx).await {
if self.start_checkpoint(c, ctx, collector).await {
return SourceFinishType::Immediate;
}
}
@@ -222,7 +222,7 @@ impl ImpulseSourceFunc {
let counter_column = counter_builder.finish();
let task_index_column = task_index_scalar.to_array_of_size(items).unwrap();
let timestamp_column = timestamp_builder.finish();
ctx.collect(
collector.collect(
RecordBatch::try_new(
schema.clone(),
vec![
@@ -262,7 +262,7 @@ impl SourceOperator for ImpulseSourceFunc {
}
}

async fn run(&mut self, ctx: &mut SourceContext) -> SourceFinishType {
self.run(ctx).await
async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector) -> SourceFinishType {
self.run(ctx, collector).await
}
}
18 changes: 10 additions & 8 deletions crates/arroyo-connectors/src/kafka/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -279,7 +279,7 @@ impl ArrowOperator for KafkaSinkFunc {
.expect("Producer creation failed");
}

async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut OperatorContext) {
async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut OperatorContext, _: &mut dyn Collector) {
let values = self.serializer.serialize(&batch);
let timestamps = batch
.column(
@@ -306,7 +306,7 @@ impl ArrowOperator for KafkaSinkFunc {
}
}

async fn handle_checkpoint(&mut self, _: CheckpointBarrier, ctx: &mut OperatorContext) {
async fn handle_checkpoint(&mut self, _: CheckpointBarrier, ctx: &mut OperatorContext, _: &mut dyn Collector) {
self.flush(ctx).await;
if let ConsistencyMode::ExactlyOnce {
next_transaction_index,
@@ -360,6 +360,7 @@ impl ArrowOperator for KafkaSinkFunc {
}
let checkpoint_event = ControlResp::CheckpointEvent(CheckpointEvent {
checkpoint_epoch: epoch,
node_id: ctx.task_info.node_id,
operator_id: ctx.task_info.operator_id.clone(),
subtask_index: ctx.task_info.task_index as u32,
time: SystemTime::now(),
@@ -371,15 +372,16 @@ impl ArrowOperator for KafkaSinkFunc {
.expect("sent commit event");
}

async fn on_close(&mut self, final_message: &Option<SignalMessage>, ctx: &mut OperatorContext, collector: &mut dyn Collector) {
async fn on_close(&mut self, final_message: &Option<SignalMessage>, ctx: &mut OperatorContext, _: &mut dyn Collector) {
self.flush(ctx).await;
if !self.is_committing() {
return;
}
if let Some(ControlMessage::Commit { epoch, commit_data }) = ctx.control_rx.recv().await {
self.handle_commit(epoch, &commit_data, ctx).await;
} else {
warn!("no commit message received, not committing")
}
// if let Some(ControlMessage::Commit { epoch, commit_data }) = ctx.control_rx.recv().await {
// self.handle_commit(epoch, &commit_data, ctx).await;
// } else {
// warn!("no commit message received, not committing")
// }
todo!("committing")
}
}
43 changes: 17 additions & 26 deletions crates/arroyo-connectors/src/kafka/source/mod.rs
Original file line number Diff line number Diff line change
@@ -14,13 +14,13 @@ use tokio::time::MissedTickBehavior;
use tracing::{debug, error, info, warn};

use arroyo_formats::de::FieldValueType;
use arroyo_operator::context::OperatorContext;
use arroyo_operator::context::{SourceCollector, SourceContext};
use arroyo_operator::operator::SourceOperator;
use arroyo_operator::SourceFinishType;
use arroyo_rpc::formats::{BadData, Format, Framing};
use arroyo_rpc::grpc::rpc::TableConfig;
use arroyo_rpc::schema_resolver::SchemaResolver;
use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, ControlResp, MetadataField};
use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, MetadataField};
use arroyo_types::*;

use super::{Context, SourceOffset, StreamConsumer};
@@ -51,7 +51,7 @@ pub struct KafkaState {
}

impl KafkaSourceFunc {
async fn get_consumer(&mut self, ctx: &mut OperatorContext) -> anyhow::Result<StreamConsumer> {
async fn get_consumer(&mut self, ctx: &mut SourceContext) -> anyhow::Result<StreamConsumer> {
info!("Creating kafka consumer for {}", self.bootstrap_servers);
let mut client_config = ClientConfig::new();

@@ -113,7 +113,7 @@ impl KafkaSourceFunc {
partitions
.iter()
.enumerate()
.filter(|(i, _)| i % ctx.task_info.parallelism == ctx.task_info.task_index)
.filter(|(i, _)| i % ctx.task_info.parallelism as usize == ctx.task_info.task_index as usize)
.map(|(_, p)| {
let offset = state
.get(&p.id())
@@ -145,7 +145,7 @@ impl KafkaSourceFunc {
Ok(consumer)
}

async fn run_int(&mut self, ctx: &mut OperatorContext) -> Result<SourceFinishType, UserError> {
async fn run_int(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector) -> Result<SourceFinishType, UserError> {
let consumer = self
.get_consumer(ctx)
.await
@@ -157,21 +157,21 @@ impl KafkaSourceFunc {
if consumer.assignment().unwrap().count() == 0 {
warn!("Kafka Consumer {}-{} is subscribed to no partitions, as there are more subtasks than partitions... setting idle",
ctx.task_info.operator_id, ctx.task_info.task_index);
ctx.broadcast(ArrowMessage::Signal(SignalMessage::Watermark(
collector.broadcast(ArrowMessage::Signal(SignalMessage::Watermark(
Watermark::Idle,
)))
.await;
}

if let Some(schema_resolver) = &self.schema_resolver {
ctx.initialize_deserializer_with_resolver(
collector.initialize_deserializer_with_resolver(
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
schema_resolver.clone(),
);
} else {
ctx.initialize_deserializer(
collector.initialize_deserializer(
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
@@ -209,11 +209,10 @@ impl KafkaSourceFunc {
None
};

ctx.deserialize_slice(v, from_millis(timestamp.max(0) as u64), connector_metadata.as_ref()).await?;
collector.deserialize_slice(v, from_millis(timestamp.max(0) as u64), connector_metadata.as_ref()).await?;


if ctx.should_flush() {
ctx.flush_buffer().await?;
if collector.should_flush() {
collector.flush_buffer().await?;
}

offsets.insert(msg.partition(), msg.offset());
@@ -226,8 +225,8 @@ impl KafkaSourceFunc {
}
}
_ = flush_ticker.tick() => {
if ctx.should_flush() {
ctx.flush_buffer().await?;
if collector.should_flush() {
collector.flush_buffer().await?;
}
}
control_message = ctx.control_rx.recv() => {
@@ -251,7 +250,7 @@ impl KafkaSourceFunc {
// fails. The actual offset is stored in state.
warn!("Failed to commit offset to Kafka {:?}", e);
}
if self.start_checkpoint(c, ctx).await {
if self.start_checkpoint(c, ctx, collector).await {
return Ok(SourceFinishType::Immediate);
}
},
@@ -286,19 +285,11 @@ impl KafkaSourceFunc {

#[async_trait]
impl SourceOperator for KafkaSourceFunc {
async fn run(&mut self, ctx: &mut OperatorContext) -> SourceFinishType {
match self.run_int(ctx).await {
async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector) -> SourceFinishType {
match self.run_int(ctx, collector).await {
Ok(r) => r,
Err(e) => {
ctx.control_tx
.send(ControlResp::Error {
operator_id: ctx.task_info.operator_id.clone(),
task_index: ctx.task_info.task_index,
message: e.name.clone(),
details: e.details.clone(),
})
.await
.unwrap();
ctx.report_user_error(e.clone()).await;

panic!("{}: {}", e.name, e.details);
}
8 changes: 4 additions & 4 deletions crates/arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
@@ -15,11 +15,11 @@ use tokio::sync::mpsc::Sender;
use tracing::warn;

pub mod blackhole;
// pub mod confluent;
pub mod confluent;
// pub mod filesystem;
// pub mod fluvio;
pub mod impulse;
// pub mod kafka;
pub mod kafka;
// pub mod kinesis;
// pub mod mqtt;
// pub mod nats;
@@ -36,12 +36,12 @@ pub mod stdout;
pub fn connectors() -> HashMap<&'static str, Box<dyn ErasedConnector>> {
let connectors: Vec<Box<dyn ErasedConnector>> = vec![
Box::new(blackhole::BlackholeConnector {}),
// Box::new(confluent::ConfluentConnector {}),
Box::new(confluent::ConfluentConnector {}),
// Box::new(filesystem::delta::DeltaLakeConnector {}),
// Box::new(filesystem::FileSystemConnector {}),
// Box::new(fluvio::FluvioConnector {}),
Box::new(impulse::ImpulseConnector {}),
// Box::new(kafka::KafkaConnector {}),
Box::new(kafka::KafkaConnector {}),
// Box::new(kinesis::KinesisConnector {}),
// Box::new(mqtt::MqttConnector {}),
// Box::new(nats::NatsConnector {}),
Loading