Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added exec-style SQL operation #345

Merged
merged 3 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions crates/bins/wick/src/commands/new/component/sql.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use clap::Args;
use structured_output::StructuredOutput;
use wick_config::config::components::{SqlComponentConfigBuilder, SqlOperationDefinitionBuilder};
use wick_config::config::components::{SqlComponentConfigBuilder, SqlOperationDefinitionBuilder, SqlOperationKind};
use wick_config::config::{self, ComponentConfiguration};
use wick_interface_types::{Field, Type};

Expand Down Expand Up @@ -46,13 +46,15 @@ pub(crate) async fn handle(

let component = SqlComponentConfigBuilder::default()
.resource(resource_name)
.operations([SqlOperationDefinitionBuilder::default()
.name("operation_name".to_owned())
.inputs([Field::new("id", Type::String)])
.query("SELECT * FROM users WHERE id = $1".to_owned())
.arguments(["id".to_owned()])
.build()
.unwrap()])
.operations([SqlOperationKind::Query(
SqlOperationDefinitionBuilder::default()
.name("operation_name".to_owned())
.inputs([Field::new("id", Type::String)])
.query("SELECT * FROM users WHERE id = $1".to_owned())
.arguments(["id".to_owned()])
.build()
.unwrap(),
)])
.build()
.unwrap();

Expand Down
3 changes: 3 additions & 0 deletions crates/components/wick-sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub enum Error {
#[error("Failed to fetch result of query: {0}")]
Fetch(String),

#[error("Failed to fetch result of exec: {0}")]
Exec(String),

#[error("Unknown database scheme '{0}'")]
InvalidScheme(String),

Expand Down
86 changes: 70 additions & 16 deletions crates/components/wick-sql/src/mssql_tiberius/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use serde_json::{Map, Value};
use tiberius::{Query, Row};
use tracing::Span;
use url::Url;
use wick_config::config::components::{ComponentConfig, OperationConfig, SqlComponentConfig, SqlOperationDefinition};
use wick_config::config::components::{ComponentConfig, OperationConfig, SqlComponentConfig, SqlOperationKind};
use wick_config::config::{ErrorBehavior, Metadata};
use wick_config::{ConfigValidation, Resolver};
use wick_interface_types::{ComponentSignature, Field, OperationSignatures, Type};
Expand Down Expand Up @@ -147,14 +147,14 @@ impl Component for AzureSqlComponent {

async fn handle_call(
pool: Pool<ConnectionManager>,
opdef: SqlOperationDefinition,
opdef: SqlOperationKind,
input_streams: Vec<PacketStream>,
tx: PacketSender,
stmt: Arc<(String, String)>,
span: Span,
) -> Result<(), Error> {
let mut client = pool.get().await.map_err(|e| Error::PoolConnection(e.to_string()))?;
let error_behavior = *opdef.on_error();
let error_behavior = opdef.on_error();
match error_behavior {
ErrorBehavior::Commit | ErrorBehavior::Rollback => {
client.simple_query("BEGIN TRAN").await.map_err(|_| Error::TxStart)?;
Expand Down Expand Up @@ -187,7 +187,7 @@ async fn handle_call(

async fn handle_stream(
client: &mut PooledConnection<'_, ConnectionManager>,
opdef: SqlOperationDefinition,
opdef: SqlOperationKind,
mut input_streams: Vec<PacketStream>,
tx: PacketSender,
stmt: Arc<(String, String)>,
Expand Down Expand Up @@ -224,23 +224,46 @@ async fn handle_stream(
type_wrappers.push((ty, packet));
}

let _ = exec(
client,
tx.clone(),
opdef.clone(),
type_wrappers,
stmt.clone(),
span.clone(),
)
.await?;
let result = match &opdef {
SqlOperationKind::Query(_) => {
query(
client,
tx.clone(),
opdef.clone(),
type_wrappers,
stmt.clone(),
span.clone(),
)
.await
}
SqlOperationKind::Exec(_) => {
exec(
client,
tx.clone(),
opdef.clone(),
type_wrappers,
stmt.clone(),
span.clone(),
)
.await
}
};

if let Err(e) = result {
if opdef.on_error() == ErrorBehavior::Ignore {
let _ = tx.send(Packet::err("output", e.to_string()));
} else {
return Err(Error::OperationFailed(e.to_string()));
}
};
}
Ok(())
}

async fn exec(
async fn query(
client: &mut PooledConnection<'_, ConnectionManager>,
tx: FluxChannel<Packet, wick_packet::Error>,
def: SqlOperationDefinition,
def: SqlOperationKind,
args: Vec<(Type, Packet)>,
stmt: Arc<(String, String)>,
span: Span,
Expand Down Expand Up @@ -275,6 +298,37 @@ async fn exec(
Ok(duration)
}

async fn exec(
client: &mut PooledConnection<'_, ConnectionManager>,
tx: FluxChannel<Packet, wick_packet::Error>,
def: SqlOperationKind,
args: Vec<(Type, Packet)>,
stmt: Arc<(String, String)>,
span: Span,
) -> Result<Duration, Error> {
let start = SystemTime::now();
span.in_scope(|| trace!(stmt = %stmt.0, "executing query"));

let bound_args = common::bind_args(def.arguments(), &args)?;

#[allow(trivial_casts)]
let mut query = Query::new(&stmt.1);

for param in bound_args {
query.bind(MsSqlWrapper::try_from(&param).map_err(|e| Error::SqlServerEncodingFault(param.0, e))?);
}

let packet = match query.execute(client).await.map_err(|e| Error::Failed(e.to_string())) {
Ok(result) => Packet::encode("output", result.rows_affected()),
Err(err) => Packet::err("output", err.to_string()),
};
let _ = tx.send(packet);

let duration = SystemTime::now().duration_since(start).unwrap();

Ok(duration)
}

impl ConfigValidation for AzureSqlComponent {
type Config = SqlComponentConfig;
fn validate(config: &Self::Config, resolver: &Resolver) -> Result<(), ComponentError> {
Expand Down Expand Up @@ -373,7 +427,7 @@ mod test {
.build()
.unwrap();

config.operations_mut().push(op);
config.operations_mut().push(SqlOperationKind::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource("db", ResourceDefinition::TcpPort(TcpPort::new("0.0.0.0", 11111)));

Expand Down
9 changes: 7 additions & 2 deletions crates/components/wick-sql/src/mssql_tiberius/mssql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ mod integration_test {
use flow_component::{panic_callback, Component};
use futures::StreamExt;
use serde_json::json;
use wick_config::config::components::{ComponentConfig, SqlComponentConfigBuilder, SqlOperationDefinitionBuilder};
use wick_config::config::components::{
ComponentConfig,
SqlComponentConfigBuilder,
SqlOperationDefinitionBuilder,
SqlOperationKind,
};
use wick_config::config::ResourceDefinition;
use wick_interface_types::{Field, Type};
use wick_packet::{packet_stream, Invocation, Packet};
Expand Down Expand Up @@ -75,7 +80,7 @@ mod integration_test {
.build()
.unwrap();

config.operations_mut().push(op);
config.operations_mut().push(SqlOperationKind::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource(
"db",
Expand Down
73 changes: 66 additions & 7 deletions crates/components/wick-sql/src/sqlx/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use parking_lot::Mutex;
use serde_json::Value;
use sqlx::{MssqlPool, PgPool};
use url::Url;
use wick_config::config::components::{ComponentConfig, OperationConfig, SqlComponentConfig, SqlOperationDefinition};
use wick_config::config::components::{ComponentConfig, OperationConfig, SqlComponentConfig, SqlOperationKind};
use wick_config::config::Metadata;
use wick_config::{ConfigValidation, Resolver};
use wick_interface_types::{ComponentSignature, Field, OperationSignatures, Type};
Expand All @@ -28,7 +28,7 @@ enum CtxPool {
}

impl CtxPool {
fn fetch<'a, 'q>(&'a self, query: &'q str, args: Vec<SqlWrapper>) -> BoxStream<'a, Result<Value, Error>>
fn query<'a, 'q>(&'a self, query: &'q str, args: Vec<SqlWrapper>) -> BoxStream<'a, Result<Value, Error>>
where
'q: 'a,
{
Expand Down Expand Up @@ -64,6 +64,39 @@ impl CtxPool {
}
}
}

async fn exec<'a, 'q>(&'a self, query: &'q str, args: Vec<SqlWrapper>) -> Result<u64, Error>
where
'q: 'a,
{
match self {
CtxPool::Postgres(c) => {
let mut query = sqlx::query(query);
for arg in args {
trace!(?arg, "binding arg");
query = query.bind(arg);
}

query
.execute(c)
.await
.map(|r| r.rows_affected())
.map_err(|e| Error::Exec(e.to_string()))
}
CtxPool::MsSql(c) => {
let mut query = sqlx::query(query);
for arg in args {
trace!(?arg, "binding arg");
query = query.bind(arg);
}
query
.execute(c)
.await
.map(|r| r.rows_affected())
.map_err(|e| Error::Exec(e.to_string()))
}
}
}
}

#[derive()]
Expand Down Expand Up @@ -183,7 +216,16 @@ impl Component for SqlXComponent {
type_wrappers.push((ty, packet));
}

if let Err(e) = exec(client.clone(), tx.clone(), opdef.clone(), type_wrappers, stmt.clone()).await {
let result = match &opdef {
SqlOperationKind::Query(_) => {
query(client.clone(), tx.clone(), opdef.clone(), type_wrappers, stmt.clone()).await
}
SqlOperationKind::Exec(_) => {
exec(client.clone(), tx.clone(), opdef.clone(), type_wrappers, stmt.clone()).await
}
};

if let Err(e) = result {
error!(error = %e, "error executing query");
let _ = tx.send(Packet::component_error(e.to_string()));
}
Expand Down Expand Up @@ -281,18 +323,18 @@ async fn init_context(config: SqlComponentConfig, addr: Url) -> Result<Context,
})
}

async fn exec(
async fn query(
client: CtxPool,
tx: FluxChannel<Packet, wick_packet::Error>,
def: SqlOperationDefinition,
def: SqlOperationKind,
args: Vec<(Type, Packet)>,
stmt: Arc<(String, String)>,
) -> Result<(), Error> {
debug!(stmt = %stmt.0, "executing query");

let bound_args = common::bind_args(def.arguments(), &args)?;

let mut result = client.fetch(&stmt.1, bound_args);
let mut result = client.query(&stmt.1, bound_args);

while let Some(row) = result.next().await {
if let Err(e) = row {
Expand All @@ -307,6 +349,23 @@ async fn exec(
Ok(())
}

async fn exec(
client: CtxPool,
tx: FluxChannel<Packet, wick_packet::Error>,
def: SqlOperationKind,
args: Vec<(Type, Packet)>,
stmt: Arc<(String, String)>,
) -> Result<(), Error> {
debug!(stmt = %stmt.0, "executing query");

let bound_args = common::bind_args(def.arguments(), &args)?;

let rows = client.exec(&stmt.1, bound_args).await?;

let _ = tx.send(Packet::encode("output", rows));
Ok(())
}

#[cfg(test)]
mod test {
use anyhow::Result;
Expand Down Expand Up @@ -338,7 +397,7 @@ mod test {
.build()
.unwrap();

config.operations_mut().push(op);
config.operations_mut().push(SqlOperationKind::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource("db", ResourceDefinition::TcpPort(TcpPort::new("0.0.0.0", 11111)));

Expand Down
9 changes: 7 additions & 2 deletions crates/components/wick-sql/src/sqlx/mssql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ mod integration_test {
use flow_component::{panic_callback, Component};
use futures::StreamExt;
use serde_json::json;
use wick_config::config::components::{ComponentConfig, SqlComponentConfigBuilder, SqlOperationDefinitionBuilder};
use wick_config::config::components::{
ComponentConfig,
SqlComponentConfigBuilder,
SqlOperationDefinitionBuilder,
SqlOperationKind,
};
use wick_config::config::ResourceDefinition;
use wick_interface_types::{Field, Type};
use wick_packet::{packet_stream, Invocation, Packet};
Expand Down Expand Up @@ -54,7 +59,7 @@ mod integration_test {
.build()
.unwrap();

config.operations_mut().push(op);
config.operations_mut().push(SqlOperationKind::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource(
"db",
Expand Down
9 changes: 7 additions & 2 deletions crates/components/wick-sql/src/sqlx/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ mod integration_test {
use flow_component::{panic_callback, Component};
use futures::StreamExt;
use serde_json::json;
use wick_config::config::components::{ComponentConfig, SqlComponentConfigBuilder, SqlOperationDefinitionBuilder};
use wick_config::config::components::{
ComponentConfig,
SqlComponentConfigBuilder,
SqlOperationDefinitionBuilder,
SqlOperationKind,
};
use wick_config::config::ResourceDefinition;
use wick_interface_types::{Field, Type};
use wick_packet::{packet_stream, Invocation, Packet};
Expand Down Expand Up @@ -54,7 +59,7 @@ mod integration_test {
.build()
.unwrap();

config.operations_mut().push(op);
config.operations_mut().push(SqlOperationKind::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource(
"db",
Expand Down
Loading