Skip to content

Commit

Permalink
feat: added sql exec operation type
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoverson committed Jun 29, 2023
1 parent bcd4169 commit 034a043
Show file tree
Hide file tree
Showing 21 changed files with 700 additions and 51 deletions.
18 changes: 10 additions & 8 deletions crates/bins/wick/src/commands/new/component/sql.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use clap::Args;
use structured_output::StructuredOutput;
use wick_config::config::components::{SqlComponentConfigBuilder, SqlOperationDefinitionBuilder};
use wick_config::config::components::{SqlComponentConfigBuilder, SqlOperationDefinitionBuilder, SqlOperationKind};
use wick_config::config::{self, ComponentConfiguration};
use wick_interface_types::{Field, Type};

Expand Down Expand Up @@ -46,13 +46,15 @@ pub(crate) async fn handle(

let component = SqlComponentConfigBuilder::default()
.resource(resource_name)
.operations([SqlOperationDefinitionBuilder::default()
.name("operation_name".to_owned())
.inputs([Field::new("id", Type::String)])
.query("SELECT * FROM users WHERE id = $1".to_owned())
.arguments(["id".to_owned()])
.build()
.unwrap()])
.operations([SqlOperationKind::Query(
SqlOperationDefinitionBuilder::default()
.name("operation_name".to_owned())
.inputs([Field::new("id", Type::String)])
.query("SELECT * FROM users WHERE id = $1".to_owned())
.arguments(["id".to_owned()])
.build()
.unwrap(),
)])
.build()
.unwrap();

Expand Down
3 changes: 3 additions & 0 deletions crates/components/wick-sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub enum Error {
#[error("Failed to fetch result of query: {0}")]
Fetch(String),

#[error("Failed to fetch result of exec: {0}")]
Exec(String),

#[error("Unknown database scheme '{0}'")]
InvalidScheme(String),

Expand Down
83 changes: 65 additions & 18 deletions crates/components/wick-sql/src/mssql_tiberius/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use serde_json::{Map, Value};
use tiberius::{Query, Row};
use tracing::Span;
use url::Url;
use wick_config::config::components::{ComponentConfig, OperationConfig, SqlComponentConfig, SqlOperationDefinition};
use wick_config::config::components::{ComponentConfig, OperationConfig, SqlComponentConfig, SqlOperationKind};
use wick_config::config::{ErrorBehavior, Metadata};
use wick_config::{ConfigValidation, Resolver};
use wick_interface_types::{ComponentSignature, Field, OperationSignatures, Type};
Expand Down Expand Up @@ -147,14 +147,14 @@ impl Component for AzureSqlComponent {

async fn handle_call(
pool: Pool<ConnectionManager>,
opdef: SqlOperationDefinition,
opdef: SqlOperationKind,
input_streams: Vec<PacketStream>,
tx: PacketSender,
stmt: Arc<(String, String)>,
span: Span,
) -> Result<(), Error> {
let mut client = pool.get().await.map_err(|e| Error::PoolConnection(e.to_string()))?;
let error_behavior = *opdef.on_error();
let error_behavior = opdef.on_error();
match error_behavior {
ErrorBehavior::Commit | ErrorBehavior::Rollback => {
client.simple_query("BEGIN TRAN").await.map_err(|_| Error::TxStart)?;
Expand Down Expand Up @@ -187,7 +187,7 @@ async fn handle_call(

async fn handle_stream(
client: &mut PooledConnection<'_, ConnectionManager>,
opdef: SqlOperationDefinition,
opdef: SqlOperationKind,
mut input_streams: Vec<PacketStream>,
tx: PacketSender,
stmt: Arc<(String, String)>,
Expand Down Expand Up @@ -224,17 +224,33 @@ async fn handle_stream(
type_wrappers.push((ty, packet));
}

if let Err(e) = exec(
client,
tx.clone(),
opdef.clone(),
type_wrappers,
stmt.clone(),
span.clone(),
)
.await
{
if *opdef.on_error() == ErrorBehavior::Ignore {
let result = match &opdef {
SqlOperationKind::Query(_) => {
query(
client,
tx.clone(),
opdef.clone(),
type_wrappers,
stmt.clone(),
span.clone(),
)
.await
}
SqlOperationKind::Exec(_) => {
exec(
client,
tx.clone(),
opdef.clone(),
type_wrappers,
stmt.clone(),
span.clone(),
)
.await
}
};

if let Err(e) = result {
if opdef.on_error() == ErrorBehavior::Ignore {
let _ = tx.send(Packet::err("output", e.to_string()));
} else {
return Err(Error::OperationFailed(e.to_string()));
Expand All @@ -244,10 +260,10 @@ async fn handle_stream(
Ok(())
}

async fn exec(
async fn query(
client: &mut PooledConnection<'_, ConnectionManager>,
tx: FluxChannel<Packet, wick_packet::Error>,
def: SqlOperationDefinition,
def: SqlOperationKind,
args: Vec<(Type, Packet)>,
stmt: Arc<(String, String)>,
span: Span,
Expand Down Expand Up @@ -282,6 +298,37 @@ async fn exec(
Ok(duration)
}

async fn exec(
client: &mut PooledConnection<'_, ConnectionManager>,
tx: FluxChannel<Packet, wick_packet::Error>,
def: SqlOperationKind,
args: Vec<(Type, Packet)>,
stmt: Arc<(String, String)>,
span: Span,
) -> Result<Duration, Error> {
let start = SystemTime::now();
span.in_scope(|| trace!(stmt = %stmt.0, "executing query"));

let bound_args = common::bind_args(def.arguments(), &args)?;

#[allow(trivial_casts)]
let mut query = Query::new(&stmt.1);

for param in bound_args {
query.bind(MsSqlWrapper::try_from(&param).map_err(|e| Error::SqlServerEncodingFault(param.0, e))?);
}

let packet = match query.execute(client).await.map_err(|e| Error::Failed(e.to_string())) {
Ok(result) => Packet::encode("output", result.rows_affected()),
Err(err) => Packet::err("output", err.to_string()),
};
let _ = tx.send(packet);

let duration = SystemTime::now().duration_since(start).unwrap();

Ok(duration)
}

impl ConfigValidation for AzureSqlComponent {
type Config = SqlComponentConfig;
fn validate(config: &Self::Config, resolver: &Resolver) -> Result<(), ComponentError> {
Expand Down Expand Up @@ -380,7 +427,7 @@ mod test {
.build()
.unwrap();

config.operations_mut().push(op);
config.operations_mut().push(SqlOperationKind::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource("db", ResourceDefinition::TcpPort(TcpPort::new("0.0.0.0", 11111)));

Expand Down
9 changes: 7 additions & 2 deletions crates/components/wick-sql/src/mssql_tiberius/mssql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ mod integration_test {
use flow_component::{panic_callback, Component};
use futures::StreamExt;
use serde_json::json;
use wick_config::config::components::{ComponentConfig, SqlComponentConfigBuilder, SqlOperationDefinitionBuilder};
use wick_config::config::components::{
ComponentConfig,
SqlComponentConfigBuilder,
SqlOperationDefinitionBuilder,
SqlOperationKind,
};
use wick_config::config::ResourceDefinition;
use wick_interface_types::{Field, Type};
use wick_packet::{packet_stream, Invocation, Packet};
Expand Down Expand Up @@ -75,7 +80,7 @@ mod integration_test {
.build()
.unwrap();

config.operations_mut().push(op);
config.operations_mut().push(SqlOperationKind::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource(
"db",
Expand Down
73 changes: 66 additions & 7 deletions crates/components/wick-sql/src/sqlx/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use parking_lot::Mutex;
use serde_json::Value;
use sqlx::{MssqlPool, PgPool};
use url::Url;
use wick_config::config::components::{ComponentConfig, OperationConfig, SqlComponentConfig, SqlOperationDefinition};
use wick_config::config::components::{ComponentConfig, OperationConfig, SqlComponentConfig, SqlOperationKind};
use wick_config::config::Metadata;
use wick_config::{ConfigValidation, Resolver};
use wick_interface_types::{ComponentSignature, Field, OperationSignatures, Type};
Expand All @@ -28,7 +28,7 @@ enum CtxPool {
}

impl CtxPool {
fn fetch<'a, 'q>(&'a self, query: &'q str, args: Vec<SqlWrapper>) -> BoxStream<'a, Result<Value, Error>>
fn query<'a, 'q>(&'a self, query: &'q str, args: Vec<SqlWrapper>) -> BoxStream<'a, Result<Value, Error>>
where
'q: 'a,
{
Expand Down Expand Up @@ -64,6 +64,39 @@ impl CtxPool {
}
}
}

async fn exec<'a, 'q>(&'a self, query: &'q str, args: Vec<SqlWrapper>) -> Result<u64, Error>
where
'q: 'a,
{
match self {
CtxPool::Postgres(c) => {
let mut query = sqlx::query(query);
for arg in args {
trace!(?arg, "binding arg");
query = query.bind(arg);
}

query
.execute(c)
.await
.map(|r| r.rows_affected())
.map_err(|e| Error::Exec(e.to_string()))
}
CtxPool::MsSql(c) => {
let mut query = sqlx::query(query);
for arg in args {
trace!(?arg, "binding arg");
query = query.bind(arg);
}
query
.execute(c)
.await
.map(|r| r.rows_affected())
.map_err(|e| Error::Exec(e.to_string()))
}
}
}
}

#[derive()]
Expand Down Expand Up @@ -183,7 +216,16 @@ impl Component for SqlXComponent {
type_wrappers.push((ty, packet));
}

if let Err(e) = exec(client.clone(), tx.clone(), opdef.clone(), type_wrappers, stmt.clone()).await {
let result = match &opdef {
SqlOperationKind::Query(_) => {
query(client.clone(), tx.clone(), opdef.clone(), type_wrappers, stmt.clone()).await
}
SqlOperationKind::Exec(_) => {
exec(client.clone(), tx.clone(), opdef.clone(), type_wrappers, stmt.clone()).await
}
};

if let Err(e) = result {
error!(error = %e, "error executing query");
let _ = tx.send(Packet::component_error(e.to_string()));
}
Expand Down Expand Up @@ -281,18 +323,18 @@ async fn init_context(config: SqlComponentConfig, addr: Url) -> Result<Context,
})
}

async fn exec(
async fn query(
client: CtxPool,
tx: FluxChannel<Packet, wick_packet::Error>,
def: SqlOperationDefinition,
def: SqlOperationKind,
args: Vec<(Type, Packet)>,
stmt: Arc<(String, String)>,
) -> Result<(), Error> {
debug!(stmt = %stmt.0, "executing query");

let bound_args = common::bind_args(def.arguments(), &args)?;

let mut result = client.fetch(&stmt.1, bound_args);
let mut result = client.query(&stmt.1, bound_args);

while let Some(row) = result.next().await {
if let Err(e) = row {
Expand All @@ -307,6 +349,23 @@ async fn exec(
Ok(())
}

async fn exec(
client: CtxPool,
tx: FluxChannel<Packet, wick_packet::Error>,
def: SqlOperationKind,
args: Vec<(Type, Packet)>,
stmt: Arc<(String, String)>,
) -> Result<(), Error> {
debug!(stmt = %stmt.0, "executing query");

let bound_args = common::bind_args(def.arguments(), &args)?;

let rows = client.exec(&stmt.1, bound_args).await?;

let _ = tx.send(Packet::encode("output", rows));
Ok(())
}

#[cfg(test)]
mod test {
use anyhow::Result;
Expand Down Expand Up @@ -338,7 +397,7 @@ mod test {
.build()
.unwrap();

config.operations_mut().push(op);
config.operations_mut().push(SqlOperationKind::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource("db", ResourceDefinition::TcpPort(TcpPort::new("0.0.0.0", 11111)));

Expand Down
9 changes: 7 additions & 2 deletions crates/components/wick-sql/src/sqlx/mssql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ mod integration_test {
use flow_component::{panic_callback, Component};
use futures::StreamExt;
use serde_json::json;
use wick_config::config::components::{ComponentConfig, SqlComponentConfigBuilder, SqlOperationDefinitionBuilder};
use wick_config::config::components::{
ComponentConfig,
SqlComponentConfigBuilder,
SqlOperationDefinitionBuilder,
SqlOperationKind,
};
use wick_config::config::ResourceDefinition;
use wick_interface_types::{Field, Type};
use wick_packet::{packet_stream, Invocation, Packet};
Expand Down Expand Up @@ -54,7 +59,7 @@ mod integration_test {
.build()
.unwrap();

config.operations_mut().push(op);
config.operations_mut().push(SqlOperationKind::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource(
"db",
Expand Down
9 changes: 7 additions & 2 deletions crates/components/wick-sql/src/sqlx/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ mod integration_test {
use flow_component::{panic_callback, Component};
use futures::StreamExt;
use serde_json::json;
use wick_config::config::components::{ComponentConfig, SqlComponentConfigBuilder, SqlOperationDefinitionBuilder};
use wick_config::config::components::{
ComponentConfig,
SqlComponentConfigBuilder,
SqlOperationDefinitionBuilder,
SqlOperationKind,
};
use wick_config::config::ResourceDefinition;
use wick_interface_types::{Field, Type};
use wick_packet::{packet_stream, Invocation, Packet};
Expand Down Expand Up @@ -54,7 +59,7 @@ mod integration_test {
.build()
.unwrap();

config.operations_mut().push(op);
config.operations_mut().push(SqlOperationKind::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource(
"db",
Expand Down
Loading

0 comments on commit 034a043

Please sign in to comment.