Skip to content

Commit

Permalink
Reworking integration tests, vol.2
Browse files Browse the repository at this point in the history
  • Loading branch information
tyranron committed Nov 8, 2023
1 parent e2561c7 commit 8ba8de5
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 68 deletions.
2 changes: 1 addition & 1 deletion juniper/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ pub mod tests {

#[allow(missing_docs)]
pub async fn run_test_suite<T: WsIntegration>(integration: &T) {
println!("Running `graphql-ws` test suite for integration");
println!("Running `graphql-transport-ws` test suite for integration");

println!(" - graphql_ws::test_simple_subscription");
test_simple_subscription(integration).await;
Expand Down
2 changes: 1 addition & 1 deletion juniper_axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ anyhow = "1.0"
axum = { version = "0.6", features = ["macros"] }
hyper = "0.14"
juniper = { version = "0.16.0-dev", path = "../juniper", features = ["expose-test-schema"] }
tokio = { version = "1.20", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.20", features = ["macros", "rt-multi-thread", "time"] }
tokio-stream = "0.1"
tokio-tungstenite = "0.20"
tracing = "0.1"
Expand Down
116 changes: 50 additions & 66 deletions juniper_axum/tests/ws_test_suite.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
net::{SocketAddr, TcpListener},
sync::Arc,
time::Duration,
};

use anyhow::anyhow;
Expand All @@ -14,8 +13,7 @@ use juniper::{
};
use juniper_axum::subscriptions;
use juniper_graphql_ws::ConnectionConfig;
use serde_json::Value;
use tokio::net::TcpStream;
use tokio::{net::TcpStream, time::timeout};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};

type Schema = RootNode<'static, Query, EmptyMutation<Database>, Subscription>;
Expand Down Expand Up @@ -65,11 +63,59 @@ impl TestApp {
.unwrap();

for msg in messages {
process_message(&mut websocket, msg).await?;
Self::process_message(&mut websocket, msg).await?;
}

Ok(())
}

async fn process_message(
websocket: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
message: WsIntegrationMessage,
) -> Result<(), anyhow::Error> {
match message {
WsIntegrationMessage::Send(msg) => websocket
.send(Message::Text(msg.to_string()))
.await
.map_err(|e| anyhow!("Could not send message: {e}"))
.map(drop),

WsIntegrationMessage::Expect(expected, duration) => {
let message = timeout(duration, websocket.next())
.await
.map_err(|e| anyhow!("Timed out receiving message. Elapsed: {e}"))?;
match message {
None => Err(anyhow!("No message received")),
Some(Err(e)) => Err(anyhow!("WebSocket error: {e}")),
Some(Ok(Message::Text(json))) => {
let actual: serde_json::Value = serde_json::from_str(&json)
.map_err(|e| anyhow!("Cannot deserialize received message: {e}"))?;
if actual != expected {
return Err(anyhow!(
"Expected message: {expected}. \
Received message: {actual}",
));
}
Ok(())
}
Some(Ok(Message::Close(Some(frame)))) => {
let actual = serde_json::json!({
"code": u16::from(frame.code),
"description": frame.reason,
});
if actual != expected {
return Err(anyhow!(
"Expected message: {expected}. \
Received message: {actual}",
));
}
Ok(())
}
Some(Ok(msg)) => Err(anyhow!("Received non-text message: {msg:?}")),
}
}
}
}
}

impl WsIntegration for TestApp {
Expand All @@ -81,68 +127,6 @@ impl WsIntegration for TestApp {
}
}

async fn process_message(
mut websocket: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
message: WsIntegrationMessage,
) -> Result<(), anyhow::Error> {
match message {
WsIntegrationMessage::Send(msg) => websocket.send(Message::Text(msg.to_string())).await
.map_err(|e| anyhow!("Could not send message: {e}"))
.map(drop),
WsIntegrationMessage::Expect(expected, timeout) => {
receive_message_from_socket_and_test(&mut websocket, &expected, timeout).await
}
}
}



async fn receive_message_from_socket_and_test(
websocket: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
expected: &String,
timeout: u64,
) -> Result<(), anyhow::Error> {
let message = tokio::time::timeout(Duration::from_millis(timeout), websocket.next())
.await
.map_err(|e| anyhow!("Timed out receiving message. Elapsed: {e}"))?;

match message {
None => Err(anyhow!("No Message received")),
Some(Err(e)) => Err(anyhow!("Websocket error: {:?}", e)),
Some(Ok(message)) => equals_received_text_message(&expected, message),
}
}

fn equals_received_text_message(expected: &String, message: Message) -> Result<(), anyhow::Error> {
match message {
Message::Text(received) => is_the_same(&expected, &received),
Message::Binary(_) => Err(anyhow!("Received binary message, but expected text")),
Message::Ping(_) => Err(anyhow!("Received ping message, but expected text")),
Message::Pong(_) => Err(anyhow!("Received pong message, but expected text")),
Message::Close(_) => Err(anyhow!("Received close message, but expected text")),
Message::Frame(_) => Err(anyhow!("Received frame message, but expected text")),
}
}

/// Check if expected == received by transforming both to a JSON value
fn is_the_same(expected: &String, received: &String) -> Result<(), anyhow::Error> {
let expected: Value =
serde_json::from_str(&expected).map_err(|e| anyhow::anyhow!("Serde error: {e:?}"))?;

let received: Value =
serde_json::from_str(&received).map_err(|e| anyhow::anyhow!("Serde error: {e:?}"))?;

if received != expected {
return Err(anyhow!(
"Expected: {:?}\nReceived: {:?}",
expected,
received
));
}

Ok(())
}

#[tokio::test]
async fn test_graphql_ws_integration() {
let app = TestApp::new("graphql-ws");
Expand Down

0 comments on commit 8ba8de5

Please sign in to comment.