Redesign create pipeline UX and pipeline outputs for greater density and performance #663

Merged: 4 commits, Jun 21, 2024
7 changes: 5 additions & 2 deletions crates/arroyo-api/src/lib.rs
@@ -26,8 +26,9 @@ use crate::jobs::{
use crate::metrics::__path_get_operator_metric_groups;
use crate::pipelines::__path_get_pipelines;
use crate::pipelines::{
__path_create_pipeline, __path_delete_pipeline, __path_get_pipeline, __path_get_pipeline_jobs,
__path_patch_pipeline, __path_restart_pipeline, __path_validate_query,
__path_create_pipeline, __path_create_preview_pipeline, __path_delete_pipeline,
__path_get_pipeline, __path_get_pipeline_jobs, __path_patch_pipeline, __path_restart_pipeline,
__path_validate_query,
};
use crate::rest::__path_ping;
use crate::rest_utils::{service_unavailable, ErrorResp};
@@ -207,6 +208,7 @@ impl IntoResponse for HttpError {
validate_query,
validate_udf,
create_pipeline,
create_preview_pipeline,
patch_pipeline,
restart_pipeline,
get_pipeline,
@@ -237,6 +239,7 @@ impl IntoResponse for HttpError {
components(schemas(
ErrorResp,
PipelinePost,
PreviewPost,
PipelinePatch,
PipelineRestart,
Pipeline,
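The lib.rs changes above register the new create_preview_pipeline handler and the PreviewPost schema with utoipa. For orientation, here is a hypothetical sketch of what PreviewPost might look like, inferred only from how the preview handler in pipelines.rs reads req.query, req.udfs, and req.enable_sinks; the real type is defined in arroyo_rpc::api_types::pipelines, and its derives and field casing may differ.

// Hypothetical sketch only: the actual PreviewPost lives in
// arroyo_rpc::api_types::pipelines and is registered as a schema above.
use arroyo_rpc::api_types::udfs::Udf;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

#[derive(Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")] // casing is an assumption
pub struct PreviewPost {
    /// SQL to run in preview mode
    pub query: String,
    /// UDFs referenced by the query, if any
    pub udfs: Option<Vec<Udf>>,
    /// When true, the pipeline's real sinks run alongside the preview sink
    pub enable_sinks: bool,
}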
191 changes: 123 additions & 68 deletions crates/arroyo-api/src/pipelines.rs
@@ -10,20 +10,20 @@ use petgraph::{Direction, EdgeDirection};
use std::collections::HashMap;

use petgraph::visit::NodeRef;
use std::time::Duration;
use std::time::{Duration, SystemTime};

use crate::{compiler_service, connection_profiles, jobs, types};
use arroyo_datastream::default_sink;
use arroyo_rpc::api_types::pipelines::{
Job, Pipeline, PipelinePatch, PipelinePost, PipelineRestart, QueryValidationResult, StopType,
ValidateQueryPost,
Job, Pipeline, PipelinePatch, PipelinePost, PipelineRestart, PreviewPost,
QueryValidationResult, StopType, ValidateQueryPost,
};
use arroyo_rpc::api_types::udfs::{GlobalUdf, Udf};
use arroyo_rpc::api_types::{JobCollection, PaginationQueryParams, PipelineCollection};
use arroyo_rpc::grpc::api::{ArrowProgram, ConnectorOp};

use arroyo_connectors::kafka::{KafkaConfig, KafkaTable, SchemaRegistry};
use arroyo_datastream::logical::{LogicalProgram, OperatorName};
use arroyo_datastream::logical::{LogicalNode, LogicalProgram, OperatorName};
use arroyo_df::{has_duplicate_udf_names, ArroyoSchemaProvider, CompiledSql, SqlConfig};
use arroyo_formats::ser::ArrowSerializer;
use arroyo_rpc::formats::Format;
@@ -51,7 +51,9 @@ use crate::udfs::build_udf;
use crate::AuthData;
use crate::{connection_tables, to_micros};
use arroyo_rpc::config::config;
use arroyo_types::to_millis;
use cornucopia_async::{Database, DatabaseSource};
use petgraph::prelude::EdgeRef;

async fn compile_sql<'a>(
query: String,
@@ -266,44 +268,67 @@ async fn register_schemas(compiled_sql: &mut CompiledSql) -> anyhow::Result<()>
}

pub(crate) async fn create_pipeline_int<'a>(
req: &PipelinePost,
pub_id: &str,
name: String,
query: String,
udfs: Vec<Udf>,
parallelism: u64,
checkpoint_interval: Duration,
is_preview: bool,
enable_sinks: bool,
auth: AuthData,
db: &DatabaseSource,
) -> Result<(i64, LogicalProgram), ErrorResp> {
let is_preview = req.preview.unwrap_or(false);

if req.parallelism > auth.org_metadata.max_parallelism as u64 {
) -> Result<String, ErrorResp> {
if parallelism > auth.org_metadata.max_parallelism as u64 {
return Err(bad_request(format!(
"Your plan allows you to run pipelines up to parallelism {};
contact [email protected] for an increase",
auth.org_metadata.max_parallelism
)));
}

let mut compiled = compile_sql(
req.query.clone(),
req.udfs.as_ref().unwrap_or(&vec![]),
req.parallelism as usize,
&auth,
false,
db,
)
.await?;
let pub_id = generate_id(IdTypes::Pipeline);

let mut compiled =
compile_sql(query.clone(), &udfs, parallelism as usize, &auth, false, db).await?;

if compiled.program.graph.node_count() > auth.org_metadata.max_operators as usize {
return Err(bad_request(
format!("This pipeline is too large to create under your plan, which only allows pipelines up to {} nodes;
contact [email protected] for an increase", auth.org_metadata.max_operators)));
}

set_parallelism(&mut compiled.program, req.parallelism as usize);

if is_preview && !config().sinks_in_preview {
for node in compiled.program.graph.node_weights_mut() {
// replace all sink connectors with websink for preview
if node.operator_name == OperatorName::ConnectorSink {
node.operator_config = default_sink().encode_to_vec();
set_parallelism(&mut compiled.program, parallelism as usize);

if is_preview {
// in Preview, we either replace sinks with a preview sink, or add a preview sink
// next to them depending on the `enable_sinks` option
let g = &mut compiled.program.graph;
for idx in g.node_indices() {
let should_replace = {
let node = g.node_weight(idx).unwrap();
node.operator_name == OperatorName::ConnectorSink
&& node.operator_config != default_sink().encode_to_vec()
};
if should_replace {
if enable_sinks {
let new_idx = g.add_node(LogicalNode {
operator_id: format!("{}_1", g.node_weight(idx).unwrap().operator_id),
description: "Preview sink".to_string(),
operator_name: OperatorName::ConnectorSink,
operator_config: default_sink().encode_to_vec(),
parallelism: 1,
});
let edges: Vec<_> = g
.edges_directed(idx, Direction::Incoming)
.map(|e| (e.source(), e.weight().clone()))
.collect();
for (source, weight) in edges {
g.add_edge(source, new_idx, weight);
}
} else {
g.node_weight_mut(idx).unwrap().operator_config =
default_sink().encode_to_vec();
}
}
}
}
@@ -323,21 +348,19 @@ pub(crate) async fn create_pipeline_int<'a>(

let program_bytes = proto_program.encode_to_vec();

if req.name.is_empty() {
if name.is_empty() {
return Err(required_field("name"));
}

let udfs = serde_json::to_value(req.udfs.as_ref().unwrap_or(&vec![])).unwrap();

api_queries::execute_create_pipeline(
&db.client().await?,
&pub_id,
&auth.organization_id,
&auth.user_id,
&req.name,
&name,
&PipelineType::sql,
&Some(req.query.clone()),
&udfs,
&Some(query.clone()),
&serde_json::to_value(&udfs).unwrap(),
&program_bytes,
&2,
)
@@ -363,7 +386,30 @@ pub(crate) async fn create_pipeline_int<'a>(
}
}

Ok((pipeline_id, compiled.program))
let job_id = jobs::create_job(
&name,
pipeline_id,
checkpoint_interval,
is_preview,
&auth,
&db,
)
.await?;

log_event(
"job_created",
json!({
"service": "api",
"is_preview": is_preview,
"job_id": job_id,
"parallelism": parallelism,
"has_udfs": udfs.first().map(|e| !e.definition.trim().is_empty()).unwrap_or(false),
// TODO: program features
"features": compiled.program.features(),
}),
);

Ok(pub_id)
}

impl TryInto<Pipeline> for DbPipeline {
@@ -492,58 +538,67 @@ pub async fn create_pipeline(
) -> Result<Json<Pipeline>, ErrorResp> {
let auth_data = authenticate(&state.database, bearer_auth).await?;

let pipeline_pub_id = generate_id(IdTypes::Pipeline);

//let transaction = db.transaction().await?;

let (pipeline_id, program) = create_pipeline_int(
&pipeline_post,
&pipeline_pub_id,
auth_data.clone(),
&state.database,
)
.await?;

let preview = pipeline_post.preview.unwrap_or(false);

let checkpoint_interval = pipeline_post
.checkpoint_interval_micros
.map(Duration::from_micros)
.unwrap_or(*config().default_checkpoint_interval);

let job_id = jobs::create_job(
&pipeline_post.name,
pipeline_id,
let pipeline_id = create_pipeline_int(
pipeline_post.name,
pipeline_post.query,
pipeline_post.udfs.unwrap_or_default(),
pipeline_post.parallelism,
checkpoint_interval,
preview,
&auth_data,
false,
true,
auth_data.clone(),
&state.database,
)
.await?;

// transaction.commit().await?;

log_event(
"job_created",
json!({
"service": "api",
"is_preview": preview,
"job_id": job_id,
"parallelism": pipeline_post.parallelism,
"has_udfs": pipeline_post.udfs.map(|e| !e.is_empty() && !e[0].definition.trim().is_empty())
.unwrap_or(false),
// TODO: program features
"features": program.features(),
}),
);
let pipeline =
query_pipeline_by_pub_id(&pipeline_id, &state.database.client().await?, &auth_data).await?;

let pipeline = query_pipeline_by_pub_id(
&pipeline_pub_id,
&state.database.client().await?,
&auth_data,
Ok(Json(pipeline))
}

/// Create a new preview pipeline
#[utoipa::path(
post,
path = "/v1/pipelines/preview",
tag = "pipelines",
request_body = PreviewPost,
responses(
(status = 200, description = "Created pipeline and job", body = Pipeline),
(status = 400, description = "Bad request", body = ErrorResp),
),
)]
pub async fn create_preview_pipeline(
State(state): State<AppState>,
bearer_auth: BearerAuth,
WithRejection(Json(req), _): WithRejection<Json<PreviewPost>, ApiError>,
) -> Result<Json<Pipeline>, ErrorResp> {
let auth_data = authenticate(&state.database, bearer_auth).await?;

let pipeline_id = create_pipeline_int(
format!("preview_{}", to_millis(SystemTime::now())),
req.query,
req.udfs.unwrap_or_default(),
1,
Duration::MAX,
true,
req.enable_sinks,
auth_data.clone(),
&state.database,
)
.await?;

let pipeline =
query_pipeline_by_pub_id(&pipeline_id, &state.database.client().await?, &auth_data).await?;

Ok(Json(pipeline))
}

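The key behavioral change in create_pipeline_int is the preview handling: instead of always overwriting sink operators, the graph is rewritten depending on enable_sinks. Below is a standalone sketch (not the Arroyo code) of that rewrite pattern in petgraph, using a simplified stand-in node type rather than LogicalNode; the real code above additionally skips sinks whose config already matches default_sink().

// Standalone sketch of the preview-sink rewrite: with enable_sinks, a preview
// sink node is added next to each real sink and wired to the same upstream
// nodes; without it, the sink is swapped for a preview sink in place.
use petgraph::graph::DiGraph;
use petgraph::visit::EdgeRef;
use petgraph::Direction;

#[derive(Clone, Debug)]
struct Op {
    name: String,
    is_sink: bool,
}

fn rewrite_for_preview(g: &mut DiGraph<Op, ()>, enable_sinks: bool) {
    // Collect indices up front so the graph can be mutated inside the loop.
    for idx in g.node_indices().collect::<Vec<_>>() {
        if !g[idx].is_sink {
            continue;
        }
        if enable_sinks {
            // Keep the real sink and add a preview sink beside it...
            let preview_name = format!("{}_preview", g[idx].name);
            let preview = g.add_node(Op { name: preview_name, is_sink: true });
            // ...then copy every incoming edge onto the new node.
            let upstream: Vec<_> = g
                .edges_directed(idx, Direction::Incoming)
                .map(|e| e.source())
                .collect();
            for source in upstream {
                g.add_edge(source, preview, ());
            }
        } else {
            // Replace the real sink with a preview sink in place.
            let node = g.node_weight_mut(idx).unwrap();
            node.name = format!("{}_as_preview", node.name);
        }
    }
}

fn main() {
    let mut g: DiGraph<Op, ()> = DiGraph::new();
    let src = g.add_node(Op { name: "source".into(), is_sink: false });
    let sink = g.add_node(Op { name: "kafka_sink".into(), is_sink: true });
    g.add_edge(src, sink, ());

    rewrite_for_preview(&mut g, true);
    // source, kafka_sink, and the added preview sink; both sinks fed by source.
    assert_eq!((g.node_count(), g.edge_count()), (3, 2));
}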
5 changes: 3 additions & 2 deletions crates/arroyo-api/src/rest.rs
@@ -25,8 +25,8 @@ use crate::jobs::{
};
use crate::metrics::get_operator_metric_groups;
use crate::pipelines::{
create_pipeline, delete_pipeline, get_pipeline, get_pipeline_jobs, get_pipelines,
patch_pipeline, restart_pipeline, validate_query,
create_pipeline, create_preview_pipeline, delete_pipeline, get_pipeline, get_pipeline_jobs,
get_pipelines, patch_pipeline, restart_pipeline, validate_query,
};
use crate::rest_utils::not_found;
use crate::udfs::{create_udf, delete_udf, get_udfs, validate_udf};
@@ -162,6 +162,7 @@ pub fn create_rest_app(database: DatabaseSource, controller_addr: &str) -> Route
.route("/udfs/validate", post(validate_udf))
.route("/udfs/:id", delete(delete_udf))
.route("/pipelines", post(create_pipeline))
.route("/pipelines/preview", post(create_preview_pipeline))
.route("/pipelines", get(get_pipelines))
.route("/jobs", get(get_jobs))
.route("/pipelines/validate_query", post(validate_query))
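With the route registered in create_rest_app, the preview endpoint can be exercised over HTTP. The following is a hedged usage sketch, not part of the PR: the base URL, bearer token, JSON field casing, and the reqwest/tokio/serde_json dependencies are assumptions, while the path comes from the utoipa annotation (/v1/pipelines/preview).

// Hedged usage sketch: POST a query to the new preview endpoint.
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    let resp = client
        .post("http://localhost:5115/api/v1/pipelines/preview") // assumed base URL
        .bearer_auth("dev-token") // placeholder credential
        .json(&json!({
            "query": "SELECT * FROM impulse",
            "udfs": [],
            "enableSinks": false // field casing is an assumption
        }))
        .send()
        .await?;

    // The handler responds with the created Pipeline as JSON.
    println!("{}", resp.text().await?);
    Ok(())
}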
2 changes: 1 addition & 1 deletion crates/arroyo-api/src/udfs.rs
@@ -81,8 +81,8 @@ pub async fn create_udf(
api_queries::execute_create_udf(
&client,
&pub_id,
&auth_data.user_id,
&auth_data.organization_id,
&auth_data.user_id,
&req.prefix,
&udf_name,
&req.definition,