Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle committed Nov 28, 2024
1 parent abdb1ef commit 219a843
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 49 deletions.
14 changes: 0 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ quilkin-proto.workspace = true

# Crates.io
arc-swap.workspace = true
async-channel.workspace = true
async-stream.workspace = true
base64.workspace = true
base64-serde = "0.8.0"
Expand Down Expand Up @@ -194,7 +193,6 @@ edition = "2021"

[workspace.dependencies]
arc-swap = { version = "1.7.1", features = ["serde"] }
async-channel = "2.3.1"
async-stream = "0.3.6"
base64 = "0.22.1"
cached = { version = "0.54", default-features = false }
Expand Down
1 change: 0 additions & 1 deletion crates/test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ publish = false
workspace = true

[dependencies]
async-channel.workspace = true
once_cell.workspace = true
quilkin.workspace = true
rand.workspace = true
Expand Down
47 changes: 19 additions & 28 deletions crates/test/tests/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use qt::*;
use quilkin::test::TestConfig;
use quilkin::{components::proxy, test::TestConfig};
use tracing::Instrument as _;

trace_test!(server, {
Expand Down Expand Up @@ -87,8 +87,7 @@ trace_test!(uring_receiver, {

let (mut packet_rx, endpoint) = sb.server("server");

let (error_sender, mut error_receiver) =
tokio::sync::mpsc::channel::<quilkin::components::proxy::ErrorMap>(20);
let (error_sender, mut error_receiver) = tokio::sync::mpsc::channel::<proxy::ErrorMap>(20);

tokio::task::spawn(
async move {
Expand All @@ -105,37 +104,32 @@ trace_test!(uring_receiver, {
config
.clusters
.modify(|clusters| clusters.insert_default([endpoint.into()].into()));
let (tx, rx) = async_channel::unbounded();
let (_shutdown_tx, shutdown_rx) =
quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing);

let socket = sb.client();
let (ws, addr) = sb.socket();

let pending_sends = proxy::PendingSends::new(1).unwrap();

// we'll test a single DownstreamReceiveWorkerConfig
let ready = quilkin::components::proxy::packet_router::DownstreamReceiveWorkerConfig {
proxy::packet_router::DownstreamReceiveWorkerConfig {
worker_id: 1,
port: addr.port(),
upstream_receiver: rx.clone(),
config: config.clone(),
error_sender,
buffer_pool: quilkin::test::BUFFER_POOL.clone(),
sessions: quilkin::components::proxy::SessionPool::new(
sessions: proxy::SessionPool::new(
config,
tx,
vec![pending_sends.0.clone()],
BUFFER_POOL.clone(),
shutdown_rx.clone(),
),
}
.spawn(shutdown_rx)
.spawn(pending_sends)
.await
.expect("failed to spawn task");

// Drop the socket, otherwise it can
drop(ws);

ready.recv().unwrap();

let msg = "hello-downstream";
tracing::debug!("sending packet");
socket.send_to(msg.as_bytes(), addr).await.unwrap();
Expand All @@ -158,36 +152,33 @@ trace_test!(
.clusters
.modify(|clusters| clusters.insert_default([endpoint.into()].into()));

let (tx, rx) = async_channel::unbounded();
let (_shutdown_tx, shutdown_rx) =
quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing);
let pending_sends: Vec<_> = [
proxy::PendingSends::new(1).unwrap(),
proxy::PendingSends::new(1).unwrap(),
proxy::PendingSends::new(1).unwrap(),
]
.into_iter()
.collect();

let sessions = quilkin::components::proxy::SessionPool::new(
let sessions = proxy::SessionPool::new(
config.clone(),
tx,
pending_sends.iter().map(|ps| ps.0.clone()).collect(),
BUFFER_POOL.clone(),
shutdown_rx.clone(),
);

const WORKER_COUNT: usize = 3;

let (socket, addr) = sb.socket();
let workers = quilkin::components::proxy::packet_router::spawn_receivers(
proxy::packet_router::spawn_receivers(
config,
socket,
WORKER_COUNT,
pending_sends,
&sessions,
rx,
BUFFER_POOL.clone(),
shutdown_rx,
)
.await
.unwrap();

for wn in workers {
wn.recv().unwrap();
}

let socket = std::sync::Arc::new(sb.client());
let msg = "recv-from";

Expand Down
2 changes: 1 addition & 1 deletion src/components/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct PendingSends {
}

impl PendingSends {
pub(crate) fn new(capacity: usize) -> std::io::Result<(Self, PacketSendReceiver)> {
pub fn new(capacity: usize) -> std::io::Result<(Self, PacketSendReceiver)> {
#[cfg(target_os = "linux")]
let (notify, rx) = {
let rx = io_uring_shared::EventFd::new()?;
Expand Down
2 changes: 1 addition & 1 deletion src/components/proxy/io_uring_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::{
///
/// We use eventfd to signal to io uring loops from async tasks, it is essentially
/// the equivalent of a signalling 64 bit cross-process atomic
pub(crate) struct EventFd {
pub struct EventFd {
fd: std::os::fd::OwnedFd,
val: u64,
}
Expand Down
2 changes: 1 addition & 1 deletion src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl DownstreamReceiveWorkerConfig {
/// This function also spawns the set of worker tasks responsible for consuming packets
/// off the aforementioned queue and processing them through the filter chain and session
/// pipeline.
pub(crate) async fn spawn_receivers(
pub async fn spawn_receivers(
config: Arc<Config>,
socket: socket2::Socket,
worker_sends: Vec<(super::PendingSends, super::PacketSendReceiver)>,
Expand Down
2 changes: 1 addition & 1 deletion src/components/proxy/packet_router/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::components::proxy;
use eyre::Context as _;

impl super::DownstreamReceiveWorkerConfig {
pub(crate) async fn spawn(
pub async fn spawn(
self,
pending_sends: (proxy::PendingSends, proxy::PacketSendReceiver),
) -> eyre::Result<()> {
Expand Down

0 comments on commit 219a843

Please sign in to comment.