Skip to content

Commit

Permalink
Fix tags
Browse files Browse the repository at this point in the history
- Move tags from L7 listeners to L7 frontends
- Keep tags on L4 listeners assuming a single frontend

Signed-off-by: Eloi DEMOLIS <[email protected]>
  • Loading branch information
Wonshtrum committed Feb 11, 2025
1 parent 8eee269 commit c4d45e7
Show file tree
Hide file tree
Showing 11 changed files with 266 additions and 325 deletions.
77 changes: 25 additions & 52 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
cell::RefCell,
collections::{hash_map::Entry, BTreeMap, HashMap},
collections::{hash_map::Entry, HashMap},
io::ErrorKind,
net::{Shutdown, SocketAddr},
os::unix::io::AsRawFd,
Expand All @@ -16,7 +16,6 @@ use mio::{
use rusty_ulid::Ulid;

use sozu_command::{
logging::CachedTags,
proto::command::{
request::RequestType, Cluster, HttpListenerConfig, ListenerType, RemoveListener,
RequestHttpFrontend, WorkerRequest, WorkerResponse,
Expand All @@ -38,9 +37,9 @@ use crate::{
server::{ListenToken, SessionManager},
socket::server_bind,
timer::TimeoutContainer,
AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed,
SessionMetrics, SessionResult, StateMachineBuilder, StateResult,
AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError, Protocol,
ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed, SessionMetrics,
SessionResult, StateMachineBuilder, StateResult,
};

#[derive(PartialEq, Eq)]
Expand All @@ -58,7 +57,7 @@ StateMachineBuilder! {
enum HttpStateMachine impl SessionState {
Expect(ExpectProxyProtocol<TcpStream>),
Http(Http<TcpStream, HttpListener>),
WebSocket(Pipe<TcpStream, HttpListener>),
WebSocket(Pipe<TcpStream>),
}
}

Expand Down Expand Up @@ -249,11 +248,11 @@ impl HttpSession {
http.request_stream.storage.buffer,
frontend_token,
http.frontend_socket,
self.listener.clone(),
Protocol::HTTP,
http.context.id,
http.context.session_address,
websocket_context,
http.context.tags,
);

pipe.frontend_readiness.event = http.frontend_readiness.event;
Expand All @@ -267,7 +266,7 @@ impl HttpSession {
Some(HttpStateMachine::WebSocket(pipe))
}

fn upgrade_websocket(&self, ws: Pipe<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
fn upgrade_websocket(&self, ws: Pipe<TcpStream>) -> Option<HttpStateMachine> {
// what do we do here?
error!("Upgrade called on WS, this should not happen");
Some(HttpStateMachine::WebSocket(ws))
Expand Down Expand Up @@ -398,27 +397,9 @@ pub struct HttpListener {
config: HttpListenerConfig,
fronts: Router,
listener: Option<MioTcpListener>,
tags: BTreeMap<String, CachedTags>,
token: Token,
}

impl ListenerHandler for HttpListener {
fn get_addr(&self) -> &SocketAddr {
&self.address
}

fn get_tags(&self, key: &str) -> Option<&CachedTags> {
self.tags.get(key)
}

fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
match tags {
Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
None => self.tags.remove(&key),
};
}
}

impl L7ListenerHandler for HttpListener {
fn get_sticky_name(&self) -> &str {
&self.config.sticky_name
Expand Down Expand Up @@ -599,13 +580,9 @@ impl HttpProxy {
.ok_or(ProxyError::NoListenerFound(front.address))?
.borrow_mut();

let hostname = front.hostname.to_owned();
let tags = front.tags.to_owned();

listener
.add_http_front(front)
.map_err(ProxyError::AddFrontend)?;
listener.set_tags(hostname, tags);
Ok(())
}

Expand All @@ -624,13 +601,10 @@ impl HttpProxy {
.ok_or(ProxyError::NoListenerFound(front.address))?
.borrow_mut();

let hostname = front.hostname.to_owned();

listener
.remove_http_front(front)
.map_err(ProxyError::RemoveFrontend)?;

listener.set_tags(hostname, None);
Ok(())
}

Expand Down Expand Up @@ -693,7 +667,6 @@ impl HttpListener {
config,
fronts: Router::new(),
listener: None,
tags: BTreeMap::new(),
token,
})
}
Expand Down Expand Up @@ -1012,6 +985,16 @@ pub mod testing {
mod tests {
extern crate tiny_http;

use std::{
collections::BTreeMap,
io::{Read, Write},
net::TcpStream,
str,
sync::{Arc, Barrier},
thread,
time::Duration,
};

use super::testing::start_http_worker;
use super::*;
use sozu_command::proto::command::{RedirectPolicy, RedirectScheme, SocketAddress};
Expand All @@ -1023,15 +1006,6 @@ mod tests {
response::{Backend, HttpFrontend},
};

use std::{
io::{Read, Write},
net::TcpStream,
str,
sync::{Arc, Barrier},
thread,
time::Duration,
};

/*
#[test]
#[cfg(target_pointer_width = "64")]
Expand Down Expand Up @@ -1370,7 +1344,6 @@ mod tests {
config: default_config,
token: Token(0),
active: true,
tags: BTreeMap::new(),
};

let frontend1 = listener.frontend_from_request("lolcatho.st", "/", &Method::Get);
Expand All @@ -1379,20 +1352,20 @@ mod tests {
let frontend4 = listener.frontend_from_request("lolcatho.st", "/yolo/swag", &Method::Get);
let frontend5 = listener.frontend_from_request("domain", "/", &Method::Get);
assert_eq!(
frontend1.expect("should find frontend"),
RouteResult::forward("cluster_1".to_string())
frontend1.expect("should find frontend").cluster_id,
Some("cluster_1".to_string())
);
assert_eq!(
frontend2.expect("should find frontend"),
RouteResult::forward("cluster_1".to_string())
frontend2.expect("should find frontend").cluster_id,
Some("cluster_1".to_string())
);
assert_eq!(
frontend3.expect("should find frontend"),
RouteResult::forward("cluster_2".to_string())
frontend3.expect("should find frontend").cluster_id,
Some("cluster_2".to_string())
);
assert_eq!(
frontend4.expect("should find frontend"),
RouteResult::forward("cluster_3".to_string())
frontend4.expect("should find frontend").cluster_id,
Some("cluster_3".to_string())
);
assert!(frontend5.is_err());
}
Expand Down
67 changes: 21 additions & 46 deletions lib/src/https.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
cell::RefCell,
collections::{hash_map::Entry, BTreeMap, HashMap},
collections::{hash_map::Entry, HashMap},
io::ErrorKind,
net::{Shutdown, SocketAddr as StdSocketAddr},
os::unix::io::AsRawFd,
Expand Down Expand Up @@ -64,9 +64,9 @@ use crate::{
timer::TimeoutContainer,
tls::MutexCertificateResolver,
util::UnwrapLog,
AcceptError, CachedTags, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed,
SessionMetrics, SessionResult, StateMachineBuilder, StateResult,
AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError, Protocol,
ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed, SessionMetrics,
SessionResult, StateMachineBuilder, StateResult,
};

// const SERVER_PROTOS: &[&str] = &["http/1.1", "h2"];
Expand All @@ -83,7 +83,7 @@ StateMachineBuilder! {
Expect(ExpectProxyProtocol<MioTcpStream>, ServerConnection),
Handshake(TlsHandshake),
Http(Http<FrontRustls, HttpsListener>),
WebSocket(Pipe<FrontRustls, HttpsListener>),
WebSocket(Pipe<FrontRustls>),
Http2(Http2<FrontRustls>) -> todo!("H2"),
}
}
Expand Down Expand Up @@ -369,11 +369,11 @@ impl HttpsSession {
http.request_stream.storage.buffer,
front_token,
http.frontend_socket,
self.listener.clone(),
Protocol::HTTPS,
http.context.id,
http.context.session_address,
websocket_context,
http.context.tags,
);

pipe.frontend_readiness.event = http.frontend_readiness.event;
Expand All @@ -391,10 +391,7 @@ impl HttpsSession {
todo!()
}

fn upgrade_websocket(
&self,
wss: Pipe<FrontRustls, HttpsListener>,
) -> Option<HttpsStateMachine> {
fn upgrade_websocket(&self, wss: Pipe<FrontRustls>) -> Option<HttpsStateMachine> {
// what do we do here?
error!("Upgrade called on WSS, this should not happen");
Some(HttpsStateMachine::WebSocket(wss))
Expand Down Expand Up @@ -533,27 +530,9 @@ pub struct HttpsListener {
listener: Option<MioTcpListener>,
resolver: Arc<MutexCertificateResolver>,
rustls_details: Arc<RustlsServerConfig>,
tags: BTreeMap<String, CachedTags>,
token: Token,
}

impl ListenerHandler for HttpsListener {
fn get_addr(&self) -> &StdSocketAddr {
&self.address
}

fn get_tags(&self, key: &str) -> Option<&CachedTags> {
self.tags.get(key)
}

fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
match tags {
Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
None => self.tags.remove(&key),
};
}
}

impl L7ListenerHandler for HttpsListener {
fn get_sticky_name(&self) -> &str {
&self.config.sticky_name
Expand Down Expand Up @@ -597,7 +576,6 @@ impl HttpsListener {
)?)),
config,
token,
tags: BTreeMap::new(),
})
}

Expand Down Expand Up @@ -1015,7 +993,6 @@ impl HttpsProxy {
.ok_or(ProxyError::NoListenerFound(front.address))?
.borrow_mut();

listener.set_tags(front.hostname.to_owned(), front.tags.to_owned());
listener
.add_https_front(front)
.map_err(ProxyError::AddFrontend)?;
Expand All @@ -1040,7 +1017,6 @@ impl HttpsProxy {
.ok_or(ProxyError::NoListenerFound(front.address))?
.borrow_mut();

listener.set_tags(front.hostname.to_owned(), None);
listener
.remove_https_front(front)
.map_err(ProxyError::RemoveFrontend)?;
Expand Down Expand Up @@ -1470,11 +1446,11 @@ pub mod testing {
mod tests {
use super::*;

use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};

use sozu_command::{config::ListenerBuilder, proto::command::SocketAddress};

use crate::router::{pattern_trie::TrieNode, MethodRule, PathRule, Route, Router};
use crate::router::{pattern_trie::TrieNode, Frontend, MethodRule, PathRule, Router};

/*
#[test]
Expand Down Expand Up @@ -1507,25 +1483,25 @@ mod tests {
"lolcatho.st".as_bytes(),
&PathRule::Prefix(uri1),
&MethodRule::new(None),
&Route::forward(cluster_id1.clone())
&Frontend::forward(cluster_id1.clone())
));
assert!(fronts.add_tree_rule(
"lolcatho.st".as_bytes(),
&PathRule::Prefix(uri2),
&MethodRule::new(None),
&Route::forward(cluster_id2)
&Frontend::forward(cluster_id2)
));
assert!(fronts.add_tree_rule(
"lolcatho.st".as_bytes(),
&PathRule::Prefix(uri3),
&MethodRule::new(None),
&Route::forward(cluster_id3)
&Frontend::forward(cluster_id3)
));
assert!(fronts.add_tree_rule(
"other.domain".as_bytes(),
&PathRule::Prefix("test".to_string()),
&MethodRule::new(None),
&Route::forward(cluster_id1)
&Frontend::forward(cluster_id1)
));

let address = SocketAddress::new_v4(127, 0, 0, 1, 1032);
Expand Down Expand Up @@ -1557,32 +1533,31 @@ mod tests {
config: default_config,
token: Token(0),
active: true,
tags: BTreeMap::new(),
};

println!("TEST {}", line!());
let frontend1 = listener.frontend_from_request("lolcatho.st", "/", &Method::Get);
assert_eq!(
frontend1.expect("should find a frontend"),
RouteResult::forward("cluster_1".to_string())
frontend1.expect("should find a frontend").cluster_id,
Some("cluster_1".to_string())
);
println!("TEST {}", line!());
let frontend2 = listener.frontend_from_request("lolcatho.st", "/test", &Method::Get);
assert_eq!(
frontend2.expect("should find a frontend"),
RouteResult::forward("cluster_1".to_string())
frontend2.expect("should find a frontend").cluster_id,
Some("cluster_1".to_string())
);
println!("TEST {}", line!());
let frontend3 = listener.frontend_from_request("lolcatho.st", "/yolo/test", &Method::Get);
assert_eq!(
frontend3.expect("should find a frontend"),
RouteResult::forward("cluster_2".to_string())
frontend3.expect("should find a frontend").cluster_id,
Some("cluster_2".to_string())
);
println!("TEST {}", line!());
let frontend4 = listener.frontend_from_request("lolcatho.st", "/yolo/swag", &Method::Get);
assert_eq!(
frontend4.expect("should find a frontend"),
RouteResult::forward("cluster_3".to_string())
frontend4.expect("should find a frontend").cluster_id,
Some("cluster_3".to_string())
);
println!("TEST {}", line!());
let frontend5 = listener.frontend_from_request("domain", "/", &Method::Get);
Expand Down
Loading

0 comments on commit c4d45e7

Please sign in to comment.