Skip to content

Commit

Permalink
feat: improved type coercion, added mssql tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoverson committed Jul 28, 2023
1 parent 71ba023 commit 6d1949b
Show file tree
Hide file tree
Showing 17 changed files with 390 additions and 91 deletions.
16 changes: 10 additions & 6 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ python := if os() == 'windows' { 'python' } else { '/usr/bin/env python3' }
# The wick repository
repository := "https://github.com/candlecorp/wick"

# The `wick` command to ensure that the build from source is used.
wick := "cargo run -p wick-cli --"

# The root directory of this project
wick_root := justfile_directory()

Expand Down Expand Up @@ -92,7 +95,7 @@ integration: integration-setup && integration-teardown
integration-setup:
rm -rf ~/.cache/wick
just _run-integration-task up init
cargo run -p wick-cli -- reg push --debug ./crates/integration/test-baseline-component/component.yaml --insecure-oci=${DOCKER_REGISTRY}
{{wick}} reg push --debug ./crates/integration/test-baseline-component/component.yaml --insecure-oci=${DOCKER_REGISTRY}

# Tear down the environment for integration tests
integration-teardown:
Expand Down Expand Up @@ -218,17 +221,18 @@ _run-wasm-task task:

# Run `wick` tests for db components
_wick-db-tests:
cargo run -p wick-cli -- test ./examples/db/postgres-numeric-tests.wick
{{wick}} test ./examples/db/postgres-numeric-tests.wick
{{wick}} test ./tests/cli-tests/tests/cmd/db/azuresql-tx-test.wick

# Run `wick` tests for http components
_wick-http-tests:
cargo run -p wick-cli -- test ./examples/http/wasm-http-call/harness.wick
{{wick}} test ./examples/http/wasm-http-call/harness.wick

# Run `wick` tests for generic components
_wick-component-tests:
cargo run -p wick-cli -- test ./examples/components/hello-world.wick
cargo run -p wick-cli -- test ./examples/components/composite-db-import.wick
DIR=./examples/components/wasi-fs/ cargo run -p wick-cli -- test ./examples/components/wasi-fs/component.wick
{{wick}} test ./examples/components/hello-world.wick
{{wick}} test ./examples/components/composite-db-import.wick
DIR=./examples/components/wasi-fs/ {{wick}} test ./examples/components/wasi-fs/component.wick

# Run component-codegen unit tests
_codegen-tests:
Expand Down
7 changes: 6 additions & 1 deletion crates/components/wick-sql/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ pub(crate) trait ClientConnection: Send + Sync {
'a: 'b;

async fn exec(&mut self, stmt: String, bound_args: Vec<SqlWrapper>) -> Result<u64>;
async fn finish(&mut self) -> Result<()>;
async fn finish(&mut self, behavior: ErrorBehavior) -> Result<()>;
async fn handle_error(&mut self, e: Error, behavior: ErrorBehavior) -> Result<()>;
async fn start(&mut self, behavior: ErrorBehavior) -> Result<()>;
}

#[derive()]
Expand Down Expand Up @@ -184,6 +185,10 @@ impl<'conn> Connection<'conn> {
self.0.handle_error(e, behavior).await
}

pub(crate) async fn start(&mut self, behavior: ErrorBehavior) -> Result<()> {
self.0.start(behavior).await
}

#[allow(clippy::unused_async)]
pub(crate) async fn finish(&mut self) -> Result<()> {
// todo
Expand Down
25 changes: 15 additions & 10 deletions crates/components/wick-sql/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,15 @@ impl Component for SqlComponent {
tokio::spawn(async move {
let start = SystemTime::now();
let span = invocation.span.clone();
if let Err(e) = handle_call(&client, opdef, input_streams, tx.clone(), &stmt, span).await {
let Ok(mut connection) = client.get_connection().await else {
invocation.trace(|| {
error!(error = %e, "error in sql operation");
error!("could not get connection to database");
});
return;
};
if let Err(e) = handle_call(&mut connection, opdef, input_streams, tx.clone(), &stmt, span).await {
invocation.trace(|| {
error!(error = %e, "error handling sql operation");
});
let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
}
Expand Down Expand Up @@ -224,8 +230,8 @@ fn validate(config: &SqlComponentConfig, _resolver: &Resolver) -> Result<(), Err
Ok(())
}

async fn handle_call<'a, 'b>(
client: &'a Client,
async fn handle_call<'a, 'b, 'c>(
connection: &'a mut Connection<'c>,
opdef: SqlOperationKind,
input_streams: Vec<PacketStream>,
tx: PacketSender,
Expand All @@ -236,13 +242,12 @@ where
'b: 'a,
{
let error_behavior = opdef.on_error();
let mut connection = match error_behavior {
ErrorBehavior::Commit | ErrorBehavior::Rollback => client.get_connection().await?, // TODO make transaction
_ => client.get_connection().await?,
};

let result = handle_stream(&mut connection, opdef, input_streams, tx, stmt, span).await;
connection.start(error_behavior).await?;

let result = handle_stream(connection, opdef, input_streams, tx, stmt, span.clone()).await;
if let Err(e) = result {
span.in_scope(|| error!(error = %e, "error in sql operation"));
let err = Error::OperationFailed(e.to_string());
connection.handle_error(e, error_behavior).await?;
return Err(err);
Expand Down Expand Up @@ -316,7 +321,7 @@ where
if opdef.on_error() == ErrorBehavior::Ignore {
let _ = tx.send(Packet::err("output", e.to_string()));
} else {
return Err(Error::OperationFailed(e.to_string()));
return Err(Error::ErrorInStream(e.to_string()));
}
};

Expand Down
6 changes: 6 additions & 0 deletions crates/components/wick-sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ pub enum Error {
#[error("Operation failed: {0}")]
OperationFailed(String),

#[error("SQL Query failed, check log for details")]
QueryFailed,

#[error("SQL error reported within stream: {0}")]
ErrorInStream(String),

#[error("Query failed: {0}")]
Failed(String),

Expand Down
41 changes: 33 additions & 8 deletions crates/components/wick-sql/src/mssql_tiberius/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,33 @@ impl Context {

#[async_trait::async_trait]
impl<'a> ClientConnection for PooledConnection<'a, ConnectionManager> {
async fn finish(&mut self) -> Result<(), Error> {
async fn finish(&mut self, _behavior: ErrorBehavior) -> Result<(), Error> {
// todo
Ok(())
}

async fn handle_error(&mut self, _e: Error, _behavior: ErrorBehavior) -> Result<(), Error> {
// todo
async fn start(&mut self, behavior: ErrorBehavior) -> Result<(), Error> {
match behavior {
ErrorBehavior::Commit | ErrorBehavior::Rollback => {
self.simple_query("BEGIN TRAN").await.map_err(|_| Error::TxStart)?;
}
_ => {}
}
Ok(())
}

async fn handle_error(&mut self, e: Error, behavior: ErrorBehavior) -> Result<(), Error> {
match behavior {
ErrorBehavior::Commit => {
error!(error=%e, on_error=?behavior, "error in sql operation, committing transaction");
self.simple_query("COMMIT").await.map_err(|_| Error::TxCommit)?;
}
ErrorBehavior::Rollback => {
error!(error=%e, on_error=?behavior, "error in sql operation, rolling back transaction");
self.simple_query("ROLLBACK").await.map_err(|_| Error::TxCommit)?;
}
_ => {}
}
Ok(())
}

Expand Down Expand Up @@ -85,11 +105,16 @@ impl<'a> ClientConnection for PooledConnection<'a, ConnectionManager> {
result
.filter(|row| futures::future::ready(!matches!(row, Ok(tiberius::QueryItem::Metadata(_)))))
.map(|row| {
row.map_err(|e| Error::OperationFailed(e.to_string())).and_then(|row| {
row
.into_row()
.map_or_else(|| Err(Error::NoRow), |row| Ok(row_to_json(&row)))
})
row
.map_err(|e| {
tracing::Span::current().in_scope(|| tracing::error!(error=%e,"sql error in stream"));
Error::QueryFailed
})
.and_then(|row| {
row
.into_row()
.map_or_else(|| Err(Error::NoRow), |row| Ok(row_to_json(&row)))
})
})
.boxed(),
)
Expand Down
7 changes: 6 additions & 1 deletion crates/components/wick-sql/src/sqlx/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ impl DatabaseProvider for SqlXComponent {

#[async_trait::async_trait]
impl ClientConnection for CtxPool {
async fn finish(&mut self) -> Result<(), Error> {
async fn finish(&mut self, _behavior: ErrorBehavior) -> Result<(), Error> {
// todo
Ok(())
}

async fn start(&mut self, _behavior: ErrorBehavior) -> Result<(), Error> {
// todo
Ok(())
}
Expand Down
5 changes: 2 additions & 3 deletions crates/components/wick-sql/src/sqlx/sqlite/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ mod integration_test {
#[test_logger::test(tokio::test)]
async fn test_int() -> Result<()> {
let mut conn = connect().await;
let row = conn.fetch_one("select cast(3 as integer);").await.unwrap();
let row = conn.fetch_one("select cast(3 as integer);").await?;
let row = read_row(&row);
assert_eq!(row[0].as_i64().unwrap(), 3);
Ok(())
Expand All @@ -155,8 +155,7 @@ mod integration_test {

let row = conn
.fetch_one("select cast(1 as tinyint) as foo, cast('hello' as nvarchar(50)) as bar")
.await
.unwrap();
.await?;
let row = SerMapRow::from(row);
let row = serde_json::to_string(&row).unwrap();
assert_eq!(row, r#"{"foo":1,"bar":"hello"}"#);
Expand Down
8 changes: 7 additions & 1 deletion crates/wick/wick-packet/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use serde_json::Value;
use wick_interface_types::Type;

use crate::PacketError;

/// Errors originating from WASM components.
Expand Down Expand Up @@ -45,7 +48,7 @@ pub enum Error {

/// Returned when trying to decode a non-JSON object into [crate::RuntimeConfig].
#[error("Can only convert JSON Objects to a operation and component configuration, got '{0}'")]
BadJson(serde_json::Value),
BadJson(Value),

/// Couldn't retrieve a complete set of packets from a [crate::StreamMap]
#[error("Could not retrieve a complete set of packets. Stream '{0}' failed to provide a packet: '{1}'")]
Expand All @@ -65,6 +68,9 @@ pub enum Error {
#[cfg(feature = "datetime")]
#[error("Error parsing date '{0}', date must be milliseconds from the UNIX epoch")]
ParseDateMillis(u64),

#[error("Could not coerce value {value} to a {desired}")]
Coersion { value: Value, desired: Type },
}

impl Error {
Expand Down
61 changes: 20 additions & 41 deletions crates/wick/wick-packet/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,8 @@ use wasmrs::{BoxFlux, Metadata, Payload, PayloadError, RawPayload};
use wick_interface_types::Type;

use crate::metadata::DONE_FLAG;
use crate::{
Base64Bytes,
ComponentReference,
Error,
PacketStream,
TypeWrapper,
WickMetadata,
CLOSE_BRACKET,
OPEN_BRACKET,
};
use crate::wrapped_type::coerce;
use crate::{Base64Bytes, Error, PacketStream, TypeWrapper, WickMetadata, CLOSE_BRACKET, OPEN_BRACKET};

#[derive(Debug, Clone, Serialize, Deserialize)]
#[must_use]
Expand Down Expand Up @@ -295,37 +287,8 @@ impl PacketPayload {

/// Partially process a [Packet] as [Type].
pub fn type_wrapper(self, sig: Type) -> Result<TypeWrapper, Error> {
let val = match sig {
Type::I8 => TypeWrapper::new(sig, self.decode::<i8>()?.into()),
Type::I16 => TypeWrapper::new(sig, self.decode::<i16>()?.into()),
Type::I32 => TypeWrapper::new(sig, self.decode::<i32>()?.into()),
Type::I64 => TypeWrapper::new(sig, self.decode::<i64>()?.into()),
Type::U8 => TypeWrapper::new(sig, self.decode::<u8>()?.into()),
Type::U16 => TypeWrapper::new(sig, self.decode::<u16>()?.into()),
Type::U32 => TypeWrapper::new(sig, self.decode::<u32>()?.into()),
Type::U64 => TypeWrapper::new(sig, self.decode::<u64>()?.into()),
Type::F32 => TypeWrapper::new(sig, self.decode::<f32>()?.into()),
Type::F64 => TypeWrapper::new(sig, self.decode::<f64>()?.into()),
Type::Bool => TypeWrapper::new(sig, self.decode::<bool>()?.into()),
Type::String => TypeWrapper::new(sig, self.decode::<String>()?.into()),
Type::Datetime => TypeWrapper::new(sig, self.decode::<String>()?.into()),
Type::Bytes => TypeWrapper::new(sig, self.decode::<Vec<u8>>()?.into()),
Type::Named(_) => TypeWrapper::new(sig, self.decode::<serde_json::Value>()?),
Type::List { .. } => TypeWrapper::new(sig, self.decode::<Vec<serde_json::Value>>()?.into()),
Type::Optional { .. } => TypeWrapper::new(sig, self.decode::<Option<serde_json::Value>>()?.into()),
Type::Map { .. } => TypeWrapper::new(
sig,
serde_json::Value::Object(self.decode::<serde_json::Map<String, serde_json::Value>>()?),
),
#[allow(deprecated)]
Type::Link { .. } => TypeWrapper::new(
sig,
serde_json::Value::String(self.decode::<ComponentReference>()?.to_string()),
),
Type::Object => TypeWrapper::new(sig, self.decode::<serde_json::Value>()?),
Type::AnonymousStruct(_) => unimplemented!(),
};
Ok(val)
let val = coerce(self.decode::<serde_json::Value>()?, &sig)?;
Ok(TypeWrapper::new(sig, val))
}

pub fn bytes(&self) -> Option<&Base64Bytes> {
Expand Down Expand Up @@ -495,7 +458,9 @@ impl From<Packet> for Result<RawPayload, PayloadError> {
mod test {
use anyhow::Result;
use serde_json::Value;
use wick_interface_types::Type;

use super::PacketPayload;
use crate::{Base64Bytes, Packet};

#[test]
Expand Down Expand Up @@ -523,6 +488,20 @@ mod test {
Ok(())
}

#[rstest::rstest]
#[case("2", Type::String, Value::String("2".into()))]
#[case(2, Type::String, Value::String("2".into()))]
fn test_type_wrapper<T>(#[case] value: T, #[case] ty: Type, #[case] expected: Value) -> Result<()>
where
T: serde::Serialize + std::fmt::Debug,
{
let packet = PacketPayload::encode(value);
println!("{:?}", packet);
let wrapper = packet.type_wrapper(ty)?;
assert_eq!(wrapper.into_inner(), expected);
Ok(())
}

#[rstest::rstest]
#[case("dGVzdA==", b"test")]
fn test_from_b64(#[case] value: &str, #[case] expected: &[u8]) -> Result<()> {
Expand Down
Loading

0 comments on commit 6d1949b

Please sign in to comment.