From 3dfdccba73c6c91c72c7cbc32f5f85781b3abf22 Mon Sep 17 00:00:00 2001 From: kazk Date: Thu, 25 Feb 2021 21:17:09 -0800 Subject: [PATCH 1/2] Readd optional proxy support --- examples/Cargo.toml | 11 ++ examples/proxy.rs | 34 ++++++ kube/Cargo.toml | 4 + kube/src/config/file_config.rs | 2 + kube/src/config/mod.rs | 5 + kube/src/error.rs | 4 + kube/src/service/connector.rs | 190 +++++++++++++++++++++++++++++++++ kube/src/service/mod.rs | 49 ++++++--- kube/src/service/tls.rs | 174 ------------------------------ 9 files changed, 283 insertions(+), 190 deletions(-) create mode 100644 examples/proxy.rs create mode 100644 kube/src/service/connector.rs delete mode 100644 kube/src/service/tls.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index f8590df62..d967d425e 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -15,6 +15,9 @@ kubederive = ["kube/derive"] # by default import kube-derive with its default fe schema = ["kube-derive/schema"] # crd_derive_no_schema shows how to opt out native-tls = ["kube/native-tls", "kube-runtime/native-tls"] rustls-tls = ["kube/rustls-tls", "kube-runtime/rustls-tls"] +proxy = [] # To not compile proxy example without required feature set. +proxy-native-tls = ["proxy", "kube/proxy-native-tls"] # Enable this when trying proxy example +proxy-rustls-tls = ["proxy", "kube/proxy-rustls-tls"] # Enable this when trying proxy example ws = ["kube/ws"] [dev-dependencies] @@ -130,6 +133,14 @@ path = "pod_reflector.rs" name = "pod_watcher" path = "pod_watcher.rs" +# Use one of the following when trying proxy example: +# - `--no-default-features --features=proxy-native-tls` +# - `--no-default-features --features=proxy-rustls-tls` +[[example]] +name = "proxy" +path = "proxy.rs" +required-features = ["proxy"] + [[example]] name = "node_reflector" path = "node_reflector.rs" diff --git a/examples/proxy.rs b/examples/proxy.rs new file mode 100644 index 000000000..95ebecbfb --- /dev/null +++ b/examples/proxy.rs @@ -0,0 +1,34 @@ +#[macro_use] extern crate log; +use k8s_openapi::api::core::v1::Namespace; + +use kube::{ + api::{Api, ListParams}, + config::KubeConfigOptions, + Client, Config, +}; + +use std::convert::TryFrom; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + std::env::set_var("RUST_LOG", "info,kube=debug"); + env_logger::init(); + + let mut config = Config::from_kubeconfig(&KubeConfigOptions::default()).await?; + + if let Ok(proxy_url) = &std::env::var("PROXY_URL") { + info!("PROXY_URL is {}", proxy_url); + config.proxy_url = Some(proxy_url.to_owned()); + } else { + warn!("Running without PROXY_URL environment variable set"); + } + + let client = Client::try_from(config).unwrap(); + // Verify we can access kubernetes through proxy + let ns_api: Api = Api::all(client); + let namespaces = ns_api.list(&ListParams::default()).await?; + assert!(namespaces.items.len() > 0); + info!("Found {} namespaces", namespaces.items.len()); + + Ok(()) +} diff --git a/kube/Cargo.toml b/kube/Cargo.toml index 47744ee6b..c941200fc 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -24,6 +24,8 @@ jsonpatch = ["json-patch"] ws = ["tokio-tungstenite"] oauth = ["tame-oauth"] gzip = ["async-compression"] +proxy-native-tls = ["native-tls", "hyper-proxy/tls"] +proxy-rustls-tls = ["rustls-tls", "hyper-proxy/rustls-base"] [package.metadata.docs.rs] features = ["derive", "ws", "oauth", "jsonpatch"] @@ -64,6 +66,8 @@ async-compression = { version = "0.3.7", features = ["gzip", "tokio"], optional hyper-timeout = "0.4.1" tame-oauth = { version = "0.4.7", features = ["gcp"], optional = true } pin-project = "1.0.4" +hyper-proxy = { version = "0.9.0", default-features = false, optional = true } + [dependencies.k8s-openapi] version = "0.11.0" diff --git a/kube/src/config/file_config.rs b/kube/src/config/file_config.rs index f485c94ed..3a927cf4d 100644 --- a/kube/src/config/file_config.rs +++ b/kube/src/config/file_config.rs @@ -59,6 +59,8 @@ pub struct Cluster { #[serde(rename = "certificate-authority-data")] pub certificate_authority_data: Option, pub extensions: Option>, + #[serde(rename = "proxy-url")] + pub proxy_url: Option, } /// NamedAuthInfo associates name with authentication. diff --git a/kube/src/config/mod.rs b/kube/src/config/mod.rs index 59885f745..fc8d2f823 100644 --- a/kube/src/config/mod.rs +++ b/kube/src/config/mod.rs @@ -40,6 +40,8 @@ pub struct Config { pub(crate) identity: Option<(Vec, String)>, /// Stores information to tell the cluster who you are. pub(crate) auth_info: AuthInfo, + /// Proxy URL. Not used unless `proxy-native-tls` or `proxy-rustls-tls` feature is enabled. + pub proxy_url: Option, } impl Config { @@ -58,6 +60,7 @@ impl Config { accept_invalid_certs: false, identity: None, auth_info: AuthInfo::default(), + proxy_url: None, } } @@ -119,6 +122,7 @@ impl Config { token: Some(token), ..Default::default() }, + proxy_url: None, }) } @@ -182,6 +186,7 @@ impl Config { accept_invalid_certs, identity: identity_pem.map(|i| (i, String::from(IDENTITY_PASSWORD))), auth_info: loader.user, + proxy_url: loader.cluster.proxy_url, }) } } diff --git a/kube/src/error.rs b/kube/src/error.rs index 1a4d83632..d2cfdbd5b 100644 --- a/kube/src/error.rs +++ b/kube/src/error.rs @@ -50,6 +50,10 @@ pub enum Error { #[error("InternalUrlError: {0}")] InternalUrlError(#[from] url::ParseError), + /// Invalid Url + #[error("InvalidUrlError: {0}")] + InvalidUrlError(#[from] http::uri::InvalidUri), + /// Common error case when requesting parsing into own structs #[error("Error deserializing response")] SerdeError(#[from] serde_json::Error), diff --git a/kube/src/service/connector.rs b/kube/src/service/connector.rs new file mode 100644 index 000000000..132dfee4f --- /dev/null +++ b/kube/src/service/connector.rs @@ -0,0 +1,190 @@ +pub use inner::https_connector; +#[cfg(any(feature = "proxy-native-tls", feature = "proxy-rustls-tls"))] +pub use inner::proxy_connector; + +#[cfg(feature = "rustls-tls")] pub use inner::tls_config; +#[cfg(feature = "native-tls")] pub use inner::tls_connector; + +#[cfg(feature = "native-tls")] +mod inner { + use hyper::client::HttpConnector; + use hyper_tls::HttpsConnector; + use tokio_native_tls::{ + native_tls::{self, Certificate, Identity}, + TlsConnector as AsyncTlsConnector, + }; + + #[cfg(feature = "proxy-native-tls")] + use hyper_proxy::{Intercept, Proxy, ProxyConnector}; + + use crate::{Error, Result}; + + pub fn tls_connector( + identity: Option<(Vec, String)>, + root_cert: Option>>, + accept_invalid: bool, + ) -> Result { + let mut builder = native_tls::TlsConnector::builder(); + if let Some((pem, identity_password)) = identity.as_ref() { + let identity = pkcs12_from_pem(pem, identity_password)?; + builder.identity( + Identity::from_pkcs12(&identity, identity_password) + .map_err(|e| Error::SslError(format!("{}", e)))?, + ); + } + + if let Some(ders) = root_cert { + for der in ders { + builder.add_root_certificate( + Certificate::from_der(&der).map_err(|e| Error::SslError(format!("{}", e)))?, + ); + } + } + + if accept_invalid { + builder.danger_accept_invalid_certs(accept_invalid); + } + + builder.build().map_err(|e| Error::SslError(format!("{}", e))) + } + + pub fn https_connector(connector: native_tls::TlsConnector) -> HttpsConnector { + let mut http = HttpConnector::new(); + http.enforce_http(false); + HttpsConnector::from((http, AsyncTlsConnector::from(connector))) + } + + #[cfg(feature = "proxy-native-tls")] + pub fn proxy_connector( + connector: native_tls::TlsConnector, + proxy_url: Option, + ) -> hyper_proxy::ProxyConnector> { + let mut proxy = ProxyConnector::unsecured(https_connector(connector.clone())); + if let Some(proxy_url) = proxy_url { + proxy.add_proxy(Proxy::new(Intercept::All, proxy_url)); + } + proxy.set_tls(Some(connector)); + proxy + } + + fn pkcs12_from_pem(pem: &[u8], password: &str) -> Result> { + use openssl::{pkcs12::Pkcs12, pkey::PKey, x509::X509}; + let x509 = X509::from_pem(&pem)?; + let pkey = PKey::private_key_from_pem(&pem)?; + let p12 = Pkcs12::builder().build(password, "kubeconfig", &pkey, &x509)?; + let der = p12.to_der()?; + Ok(der) + } +} + +#[cfg(feature = "rustls-tls")] +mod inner { + use std::sync::Arc; + + use hyper::client::HttpConnector; + use hyper_rustls::HttpsConnector; + use tokio_rustls::{ + rustls::{self, Certificate, ClientConfig, ServerCertVerified, ServerCertVerifier}, + webpki::DNSNameRef, + }; + + #[cfg(feature = "proxy-rustls-tls")] + use hyper_proxy::{Intercept, Proxy, ProxyConnector}; + + use crate::{Error, Result}; + + pub fn tls_config( + identity: Option<(Vec, String)>, + root_cert: Option>>, + accept_invalid: bool, + ) -> Result { + use rustls::internal::pemfile; + use std::io::Cursor; + + // Based on code from `reqwest` + let mut client_config = ClientConfig::new(); + if let Some((buf, _)) = identity.as_ref() { + let (key, certs) = { + let mut pem = Cursor::new(buf); + let certs = pemfile::certs(&mut pem) + .map_err(|_| Error::SslError("No valid certificate was found".into()))?; + pem.set_position(0); + + let mut sk = pemfile::pkcs8_private_keys(&mut pem) + .and_then(|pkcs8_keys| { + if pkcs8_keys.is_empty() { + Err(()) + } else { + Ok(pkcs8_keys) + } + }) + .or_else(|_| { + pem.set_position(0); + pemfile::rsa_private_keys(&mut pem) + }) + .map_err(|_| Error::SslError("No valid private key was found".into()))?; + + if let (Some(sk), false) = (sk.pop(), certs.is_empty()) { + (sk, certs) + } else { + return Err(Error::SslError("private key or certificate not found".into())); + } + }; + + client_config + .set_single_client_cert(certs, key) + .map_err(|e| Error::SslError(format!("{}", e)))?; + } + + if let Some(ders) = root_cert { + for der in ders { + client_config + .root_store + .add(&Certificate(der)) + .map_err(|e| Error::SslError(format!("{}", e)))?; + } + } + + if accept_invalid { + client_config + .dangerous() + .set_certificate_verifier(Arc::new(NoCertificateVerification {})); + } + + Ok(client_config) + } + + pub fn https_connector(tls_config: Arc) -> HttpsConnector { + let mut http = HttpConnector::new(); + http.enforce_http(false); + HttpsConnector::from((http, tls_config)) + } + + #[cfg(feature = "proxy-rustls-tls")] + pub fn proxy_connector( + tls_config: Arc, + proxy_url: Option, + ) -> hyper_proxy::ProxyConnector> { + let mut connector = ProxyConnector::unsecured(https_connector(tls_config.clone())); + if let Some(proxy_url) = proxy_url { + connector.add_proxy(Proxy::new(Intercept::All, proxy_url)); + } + let tls = tokio_rustls::TlsConnector::from(tls_config); + connector.set_tls(Some(tls)); + connector + } + + struct NoCertificateVerification {} + + impl ServerCertVerifier for NoCertificateVerification { + fn verify_server_cert( + &self, + _roots: &rustls::RootCertStore, + _presented_certs: &[rustls::Certificate], + _dns_name: DNSNameRef<'_>, + _ocsp: &[u8], + ) -> Result { + Ok(ServerCertVerified::assertion()) + } + } +} diff --git a/kube/src/service/mod.rs b/kube/src/service/mod.rs index 28aedea90..443aa4d99 100644 --- a/kube/src/service/mod.rs +++ b/kube/src/service/mod.rs @@ -2,23 +2,22 @@ mod auth; #[cfg(feature = "gzip")] mod compression; +mod connector; mod headers; mod log; -mod tls; mod url; use self::{log::LogRequest, url::set_cluster_url}; use auth::AuthLayer; #[cfg(feature = "gzip")] use compression::{accept_compressed, maybe_decompress}; use headers::set_default_headers; -use tls::HttpsConnector; -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; use http::{HeaderValue, Request, Response}; use hyper::{Body, Client as HyperClient}; use hyper_timeout::TimeoutConnector; -use tower::{buffer::Buffer, util::BoxService, BoxError, ServiceBuilder}; +use tower::{buffer::Buffer, layer, util::BoxService, BoxError, ServiceBuilder}; use crate::{error::ConfigError, Config, Error, Result}; use auth::Authentication; @@ -104,22 +103,40 @@ impl TryFrom for Service { .map_response(maybe_decompress) .into_inner(); - let https: HttpsConnector<_> = config.try_into()?; - let mut connector = TimeoutConnector::new(https); - if let Some(timeout) = timeout { - // reqwest's timeout is applied from when the request stars connecting until - // the response body has finished. - // Setting both connect and read timeout should be close enough. - connector.set_connect_timeout(Some(timeout)); - // Timeout for reading the response. - connector.set_read_timeout(Some(timeout)); - } - let client: HyperClient<_, Body> = HyperClient::builder().build(connector); + let with_timeout = layer::layer_fn(|c| { + let mut connector = TimeoutConnector::new(c); + if let Some(timeout) = timeout { + // reqwest's timeout is applied from when the request stars connecting until + // the response body has finished. + // Setting both connect and read timeout should be close enough. + connector.set_connect_timeout(Some(timeout)); + // Timeout for reading the response. + connector.set_read_timeout(Some(timeout)); + } + connector + }); + + #[cfg(feature = "native-tls")] + let tls = connector::tls_connector(config.identity, config.root_cert, config.accept_invalid_certs)?; + #[cfg(feature = "rustls-tls")] + let tls = std::sync::Arc::new(connector::tls_config( + config.identity, + config.root_cert, + config.accept_invalid_certs, + )?); + + #[cfg(not(any(feature = "proxy-native-tls", feature = "proxy-rustls-tls")))] + let conn = connector::https_connector(tls); + #[cfg(any(feature = "proxy-native-tls", feature = "proxy-rustls-tls"))] + let conn = connector::proxy_connector(tls, config.proxy_url.map(|s| s.parse()).transpose()?); + + let conn = ServiceBuilder::new().layer(with_timeout).service(conn); + let client: HyperClient<_, Body> = HyperClient::builder().build(conn); let inner = ServiceBuilder::new() .layer(common) .option_layer(maybe_auth) - .layer(tower::layer::layer_fn(LogRequest::new)) + .layer(layer::layer_fn(LogRequest::new)) .service(client); Ok(Self::new(inner)) } diff --git a/kube/src/service/tls.rs b/kube/src/service/tls.rs deleted file mode 100644 index 5c26807a9..000000000 --- a/kube/src/service/tls.rs +++ /dev/null @@ -1,174 +0,0 @@ -// Create `HttpsConnector` from `Config`. -// - hyper_tls::HttpsConnector from (hyper::client::HttpConnector, tokio_native_tls::TlsConnector) -// - hyper_rustls::HttpsConnector from (hyper::client::HttpConnector, Arc) - -pub use connector::HttpsConnector; - -#[cfg(feature = "native-tls")] -mod connector { - use std::convert::{TryFrom, TryInto}; - - use hyper::client::HttpConnector; - use tokio_native_tls::native_tls::{Certificate, Identity, TlsConnector}; - - use crate::{Config, Error, Result}; - - pub use hyper_tls::HttpsConnector; - use tokio_native_tls::TlsConnector as AsyncTlsConnector; - - impl TryFrom for HttpsConnector { - type Error = Error; - - fn try_from(config: Config) -> Result { - let mut http = HttpConnector::new(); - http.enforce_http(false); - let tls: AsyncTlsConnector = config.try_into()?; - Ok(HttpsConnector::from((http, tls))) - } - } - - impl TryFrom for AsyncTlsConnector { - type Error = Error; - - fn try_from(config: Config) -> Result { - let mut builder = TlsConnector::builder(); - if let Some((pem, identity_password)) = config.identity.as_ref() { - let identity = pkcs12_from_pem(pem, identity_password)?; - builder.identity( - Identity::from_pkcs12(&identity, identity_password) - .map_err(|e| Error::SslError(format!("{}", e)))?, - ); - } - - if let Some(ders) = config.root_cert { - for der in ders { - builder.add_root_certificate( - Certificate::from_der(&der).map_err(|e| Error::SslError(format!("{}", e)))?, - ); - } - } - - if config.accept_invalid_certs { - builder.danger_accept_invalid_certs(config.accept_invalid_certs); - } - - let connector = builder.build().map_err(|e| Error::SslError(format!("{}", e)))?; - Ok(AsyncTlsConnector::from(connector)) - } - } - - fn pkcs12_from_pem(pem: &[u8], password: &str) -> Result> { - use openssl::{pkcs12::Pkcs12, pkey::PKey, x509::X509}; - let x509 = X509::from_pem(&pem)?; - let pkey = PKey::private_key_from_pem(&pem)?; - let p12 = Pkcs12::builder().build(password, "kubeconfig", &pkey, &x509)?; - let der = p12.to_der()?; - Ok(der) - } -} - -#[cfg(feature = "rustls-tls")] -mod connector { - use std::{ - convert::{TryFrom, TryInto}, - sync::Arc, - }; - - use hyper::client::HttpConnector; - use tokio_rustls::{ - rustls::{self, Certificate, ClientConfig, ServerCertVerified, ServerCertVerifier}, - webpki::DNSNameRef, - }; - - use crate::{config::Config, Error, Result}; - - pub use hyper_rustls::HttpsConnector; - - impl TryFrom for HttpsConnector { - type Error = Error; - - fn try_from(config: Config) -> Result { - let mut http = HttpConnector::new(); - http.enforce_http(false); - let client_config: ClientConfig = config.try_into()?; - let client_config = Arc::new(client_config); - - Ok(HttpsConnector::from((http, client_config))) - } - } - - impl TryFrom for ClientConfig { - type Error = Error; - - fn try_from(config: Config) -> Result { - use rustls::internal::pemfile; - use std::io::Cursor; - - // Based on code from `reqwest` - let mut client_config = ClientConfig::new(); - if let Some((buf, _)) = config.identity.as_ref() { - let (key, certs) = { - let mut pem = Cursor::new(buf); - let certs = pemfile::certs(&mut pem) - .map_err(|_| Error::SslError("No valid certificate was found".into()))?; - pem.set_position(0); - - let mut sk = pemfile::pkcs8_private_keys(&mut pem) - .and_then(|pkcs8_keys| { - if pkcs8_keys.is_empty() { - Err(()) - } else { - Ok(pkcs8_keys) - } - }) - .or_else(|_| { - pem.set_position(0); - pemfile::rsa_private_keys(&mut pem) - }) - .map_err(|_| Error::SslError("No valid private key was found".into()))?; - - if let (Some(sk), false) = (sk.pop(), certs.is_empty()) { - (sk, certs) - } else { - return Err(Error::SslError("private key or certificate not found".into())); - } - }; - - client_config - .set_single_client_cert(certs, key) - .map_err(|e| Error::SslError(format!("{}", e)))?; - } - - if let Some(ders) = config.root_cert { - for der in ders { - client_config - .root_store - .add(&Certificate(der)) - .map_err(|e| Error::SslError(format!("{}", e)))?; - } - } - - if config.accept_invalid_certs { - client_config - .dangerous() - .set_certificate_verifier(Arc::new(NoCertificateVerification {})); - } - - Ok(client_config) - } - } - - struct NoCertificateVerification {} - - impl ServerCertVerifier for NoCertificateVerification { - fn verify_server_cert( - &self, - _roots: &rustls::RootCertStore, - _presented_certs: &[rustls::Certificate], - _dns_name: DNSNameRef<'_>, - _ocsp: &[u8], - ) -> Result { - Ok(ServerCertVerified::assertion()) - } - } -} From 33dcc4a7256fddd25a14943dcfc665944314e72e Mon Sep 17 00:00:00 2001 From: kazk Date: Sun, 28 Feb 2021 00:56:44 -0800 Subject: [PATCH 2/2] Return error when connecting to proxy without the required feature --- kube/src/error.rs | 4 ++++ kube/src/service/mod.rs | 7 ++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/kube/src/error.rs b/kube/src/error.rs index d2cfdbd5b..cbdad93e0 100644 --- a/kube/src/error.rs +++ b/kube/src/error.rs @@ -21,6 +21,10 @@ pub enum Error { #[error("ConnectionError: {0}")] Connection(std::io::Error), + /// Returned when trying to connect to a proxy using `kube` without proxy support. + #[error("Proxy is not supported")] + ProxyNotSupported, + /// Hyper error #[error("HyperError: {0}")] HyperError(#[from] hyper::Error), diff --git a/kube/src/service/mod.rs b/kube/src/service/mod.rs index 443aa4d99..777106915 100644 --- a/kube/src/service/mod.rs +++ b/kube/src/service/mod.rs @@ -126,7 +126,12 @@ impl TryFrom for Service { )?); #[cfg(not(any(feature = "proxy-native-tls", feature = "proxy-rustls-tls")))] - let conn = connector::https_connector(tls); + let conn = { + if config.proxy_url.is_some() { + return Err(Error::ProxyNotSupported); + } + connector::https_connector(tls) + }; #[cfg(any(feature = "proxy-native-tls", feature = "proxy-rustls-tls"))] let conn = connector::proxy_connector(tls, config.proxy_url.map(|s| s.parse()).transpose()?);