diff --git a/Cargo.toml b/Cargo.toml index 92a471b4f7..67036a92f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,8 @@ tryhard = "0.4.0" eyre = "0.6.5" stable-eyre = "0.2.2" ipnetwork = "0.18.0" +regex = "1.5.4" +serde_regex = "1.1.0" [target.'cfg(target_os = "linux")'.dependencies] sys-info = "0.9.0" diff --git a/build.rs b/build.rs index 72a86e4cfe..4c113601e8 100644 --- a/build.rs +++ b/build.rs @@ -29,7 +29,7 @@ fn main() -> Result<(), Box> { "proto/data-plane-api/envoy/type/tracing/v3/custom_tag.proto", "proto/udpa/xds/core/v3/resource_name.proto", "proto/quilkin/extensions/filters/debug/v1alpha1/debug.proto", - "proto/quilkin/extensions/filters/capture_bytes/v1alpha1/capture_bytes.proto", + "proto/quilkin/extensions/filters/capture/v1alpha1/capture.proto", "proto/quilkin/extensions/filters/compress/v1alpha1/compress.proto", "proto/quilkin/extensions/filters/concatenate_bytes/v1alpha1/concatenate_bytes.proto", "proto/quilkin/extensions/filters/load_balancer/v1alpha1/load_balancer.proto", diff --git a/docs/src/filters/capture_bytes.md b/docs/src/filters/capture_bytes.md index e36fd333bd..94b02af208 100644 --- a/docs/src/filters/capture_bytes.md +++ b/docs/src/filters/capture_bytes.md @@ -10,7 +10,7 @@ This is often used as a way of retrieving authentication tokens from a packet, a #### Filter name ```text -quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes +quilkin.extensions.filters.capture.v1alpha1.Capture ``` ### Configuration Examples @@ -19,12 +19,13 @@ quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes version: v1alpha1 static: filters: - - name: quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes + - name: quilkin.extensions.filters.capture.v1alpha1.Capture config: - strategy: PREFIX metadataKey: myapp.com/myownkey - size: 3 - remove: false + strategy: + kind: PREFIX + size: 3 + remove: false endpoints: - address: 127.0.0.1:7001 # "; @@ -33,12 +34,12 @@ static: # quilkin::Builder::from(std::sync::Arc::new(config)).validate().unwrap(); ``` -### Configuration Options ([Rust Doc](../../api/quilkin/filters/capture_bytes/struct.Config.html)) +### Configuration Options ([Rust Doc](../../api/quilkin/filters/capture/struct.Config.html)) ```yaml properties: strategy: - type: string + type: map description: | The selected strategy for capturing the series of bytes from the incoming packet. - SUFFIX: Retrieve bytes from the end of the packet. @@ -47,7 +48,7 @@ properties: enum: ['PREFIX', 'SUFFIX'] metadataKey: type: string - default: quilkin.dev/captured_bytes + default: quilkin.dev/captured description: | The key under which the captured bytes are stored in the Filter invocation values. size: @@ -65,7 +66,7 @@ properties: ### Metrics -* `quilkin_filter_CaptureBytes_packets_dropped` +* `quilkin_filter_Capture_packets_dropped` A counter of the total number of packets that have been dropped due to their length being less than the configured `size`. diff --git a/macros/src/include.rs b/macros/src/include.rs index bb608c2c95..443a9e6f98 100644 --- a/macros/src/include.rs +++ b/macros/src/include.rs @@ -42,12 +42,9 @@ impl ToTokens for IncludeProto { fn to_tokens(&self, tokens: &mut TokenStream) { let id = &self.id; - let doc_hidden: syn::Attribute = syn::parse_quote!(#![doc(hidden)]); let tonic_include_proto: syn::Stmt = syn::parse_quote!(tonic::include_proto!(#id);); - let items: Vec = vec![ - syn::Item::Verbatim(doc_hidden.to_token_stream()), - syn::Item::Verbatim(tonic_include_proto.to_token_stream()), - ]; + let items: Vec = + vec![syn::Item::Verbatim(tonic_include_proto.to_token_stream())]; let module = id.split('.').rev().fold::, _>(items, |acc, module| { let module = syn::Ident::new(module, Span::mixed_site()); diff --git a/proto/quilkin/extensions/filters/capture_bytes/v1alpha1/capture_bytes.proto b/proto/quilkin/extensions/filters/capture/v1alpha1/capture.proto similarity index 59% rename from proto/quilkin/extensions/filters/capture_bytes/v1alpha1/capture_bytes.proto rename to proto/quilkin/extensions/filters/capture/v1alpha1/capture.proto index 909c121a6d..9ea6e711d3 100644 --- a/proto/quilkin/extensions/filters/capture_bytes/v1alpha1/capture_bytes.proto +++ b/proto/quilkin/extensions/filters/capture/v1alpha1/capture.proto @@ -16,23 +16,30 @@ syntax = "proto3"; -package quilkin.extensions.filters.capture_bytes.v1alpha1; +package quilkin.extensions.filters.capture.v1alpha1; import "google/protobuf/wrappers.proto"; -message CaptureBytes { - enum Strategy { - Prefix = 0; - Suffix = 1; +message Capture { + message Suffix { + uint32 size = 1; + google.protobuf.BoolValue remove = 2; } - message StrategyValue { - Strategy value = 1; + message Prefix { + uint32 size = 1; + google.protobuf.BoolValue remove = 2; } - StrategyValue strategy = 1; - uint32 size = 2; - google.protobuf.StringValue metadata_key = 3; - google.protobuf.BoolValue remove = 4; + message Regex { + google.protobuf.StringValue regex = 1; + } + + google.protobuf.StringValue metadata_key = 1; + oneof strategy { + Prefix prefix = 2; + Suffix suffix = 3; + Regex regex = 4; + } } diff --git a/src/filters.rs b/src/filters.rs index 01d6f5415e..a84f38de2c 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -26,7 +26,7 @@ mod write; pub(crate) mod chain; pub(crate) mod manager; -pub mod capture_bytes; +pub mod capture; pub mod compress; pub mod concatenate_bytes; pub mod debug; diff --git a/src/filters/capture_bytes.rs b/src/filters/capture.rs similarity index 57% rename from src/filters/capture_bytes.rs rename to src/filters/capture.rs index b96ae89fc2..48b5a55c83 100644 --- a/src/filters/capture_bytes.rs +++ b/src/filters/capture.rs @@ -14,71 +14,73 @@ * limitations under the License. */ -mod capture; +mod affix; mod config; mod metrics; -mod proto; +mod regex; -use std::sync::Arc; +crate::include_proto!("quilkin.extensions.filters.capture.v1alpha1"); -use tracing::warn; +use std::sync::Arc; use crate::{filters::prelude::*, metadata::Value}; -use capture::Capture; -use metrics::Metrics; -use proto::quilkin::extensions::filters::capture_bytes::v1alpha1::CaptureBytes as ProtoConfig; +use self::{ + affix::{Prefix, Suffix}, + metrics::Metrics, + regex::Regex, +}; +use self::quilkin::extensions::filters::capture::v1alpha1 as proto; pub use config::{Config, Strategy}; -pub const NAME: &str = "quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes"; +pub const NAME: &str = "quilkin.extensions.filters.capture.v1alpha1.Capture"; /// Creates a new factory for generating capture filters. pub fn factory() -> DynFilterFactory { Box::from(CaptureBytesFactory::new()) } -struct CaptureBytes { - capture: Box, +/// Trait to implement different strategies for capturing packet data. +pub trait CaptureStrategy { + /// Capture packet data from the contents, and optionally returns a value if + /// anything was captured. + fn capture(&self, contents: &mut Vec, metrics: &Metrics) -> Option; +} + +struct Capture { + capture: Box, /// metrics reporter for this filter. metrics: Metrics, metadata_key: Arc, - size: usize, - remove: bool, } -impl CaptureBytes { +impl Capture { fn new(config: Config, metrics: Metrics) -> Self { - CaptureBytes { - capture: config.strategy.as_capture(), + Self { + capture: config.strategy.into_capture(), metrics, metadata_key: Arc::new(config.metadata_key), - size: config.size, - remove: config.remove, } } } -impl Filter for CaptureBytes { +impl Filter for Capture { #[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))] fn read(&self, mut ctx: ReadContext) -> Option { - // if the capture size is bigger than the packet size, then we drop the packet, - // and occasionally warn - if ctx.contents.len() < self.size { - if self.metrics.packets_dropped_total.get() % 1000 == 0 { - warn!(count = ?self.metrics.packets_dropped_total.get(), "Packets are being dropped due to their length being less than {} bytes", self.size); - } - self.metrics.packets_dropped_total.inc(); - return None; - } - let token = self - .capture - .capture(&mut ctx.contents, self.size, self.remove); + let capture = self.capture.capture(&mut ctx.contents, &self.metrics); + let mut capture_key = (*self.metadata_key).clone(); + capture_key += "/is_present"; ctx.metadata - .insert(self.metadata_key.clone(), Value::Bytes(token.into())); + .insert(Arc::from(capture_key), Value::Bool(capture.is_some())); - Some(ctx.into()) + if let Some(value) = capture { + ctx.metadata.insert(self.metadata_key.clone(), value); + Some(ctx.into()) + } else { + None + } } } struct CaptureBytesFactory {} @@ -97,8 +99,8 @@ impl FilterFactory for CaptureBytesFactory { fn create_filter(&self, args: CreateFilterArgs) -> Result { let (config_json, config) = self .require_config(args.config)? - .deserialize::(self.name())?; - let filter = CaptureBytes::new(config, Metrics::new(&args.metrics_registry)?); + .deserialize::(self.name())?; + let filter = Capture::new(config, Metrics::new(&args.metrics_registry)?); Ok(FilterInstance::new( config_json, Box::new(filter) as Box, @@ -111,23 +113,21 @@ mod tests { use std::sync::Arc; use prometheus::Registry; - use serde_yaml::{Mapping, Value}; + use serde_yaml::{Mapping, Value as YamlValue}; - use crate::endpoint::{Endpoint, Endpoints}; - use crate::test_utils::assert_write_no_change; - - use super::{CaptureBytes, CaptureBytesFactory, Config, Metrics, Strategy}; - - use super::capture::{Capture, Prefix, Suffix}; - - use crate::filters::{ - metadata::CAPTURED_BYTES, CreateFilterArgs, Filter, FilterFactory, ReadContext, + use crate::{ + endpoint::{Endpoint, Endpoints}, + filters::{metadata::CAPTURED_BYTES, CreateFilterArgs, Filter, FilterFactory, ReadContext}, + metadata::Value, + test_utils::assert_write_no_change, }; + use super::*; + const TOKEN_KEY: &str = "TOKEN"; - fn capture_bytes(config: Config) -> CaptureBytes { - CaptureBytes::new(config, Metrics::new(&Registry::default()).unwrap()) + fn capture_bytes(config: Config) -> Capture { + Capture::new(config, Metrics::new(&Registry::default()).unwrap()) } #[test] @@ -135,20 +135,26 @@ mod tests { let factory = CaptureBytesFactory::new(); let mut map = Mapping::new(); map.insert( - Value::String("strategy".into()), - Value::String("SUFFIX".into()), + YamlValue::String("metadataKey".into()), + YamlValue::String(TOKEN_KEY.into()), ); map.insert( - Value::String("metadataKey".into()), - Value::String(TOKEN_KEY.into()), + YamlValue::String("strategy".into()), + YamlValue::Mapping({ + let mut map = Mapping::new(); + + map.insert("kind".into(), YamlValue::String("SUFFIX".into())); + map.insert("size".into(), YamlValue::Number(3.into())); + map.insert("remove".into(), YamlValue::Bool(true)); + + map + }), ); - map.insert(Value::String("size".into()), Value::Number(3.into())); - map.insert(Value::String("remove".into()), Value::Bool(true)); let filter = factory .create_filter(CreateFilterArgs::fixed( Registry::default(), - Some(&Value::Mapping(map)), + Some(&YamlValue::Mapping(map)), )) .unwrap() .filter; @@ -159,11 +165,20 @@ mod tests { fn factory_valid_config_defaults() { let factory = CaptureBytesFactory::new(); let mut map = Mapping::new(); - map.insert(Value::String("size".into()), Value::Number(3.into())); + map.insert("strategy".into(), { + let mut map = Mapping::new(); + map.insert("kind".into(), "SUFFIX".into()); + map.insert( + YamlValue::String("size".into()), + YamlValue::Number(3.into()), + ); + map.into() + }); + let filter = factory .create_filter(CreateFilterArgs::fixed( Registry::default(), - Some(&Value::Mapping(map)), + Some(&YamlValue::Mapping(map)), )) .unwrap() .filter; @@ -174,11 +189,14 @@ mod tests { fn factory_invalid_config() { let factory = CaptureBytesFactory::new(); let mut map = Mapping::new(); - map.insert(Value::String("size".into()), Value::String("WRONG".into())); + map.insert( + YamlValue::String("size".into()), + YamlValue::String("WRONG".into()), + ); let result = factory.create_filter(CreateFilterArgs::fixed( Registry::default(), - Some(&Value::Mapping(map)), + Some(&YamlValue::Mapping(map)), )); assert!(result.is_err(), "Should be an error"); } @@ -186,11 +204,13 @@ mod tests { #[test] fn read() { let config = Config { - strategy: Strategy::Suffix, metadata_key: TOKEN_KEY.into(), - size: 3, - remove: true, + strategy: Strategy::Suffix(Suffix { + size: 3, + remove: true, + }), }; + let filter = capture_bytes(config); assert_end_strategy(&filter, TOKEN_KEY, true); } @@ -198,10 +218,11 @@ mod tests { #[test] fn read_overflow_capture_size() { let config = Config { - strategy: Strategy::Suffix, metadata_key: TOKEN_KEY.into(), - size: 99, - remove: true, + strategy: Strategy::Suffix(Suffix { + size: 99, + remove: true, + }), }; let filter = capture_bytes(config); let endpoints = vec![Endpoint::new("127.0.0.1:81".parse().unwrap())]; @@ -219,10 +240,11 @@ mod tests { #[test] fn write() { let config = Config { - strategy: Strategy::Suffix, + strategy: Strategy::Suffix(Suffix { + size: 0, + remove: false, + }), metadata_key: TOKEN_KEY.into(), - size: 0, - remove: false, }; let filter = capture_bytes(config); assert_write_no_change(&filter); @@ -230,28 +252,40 @@ mod tests { #[test] fn end_capture() { - let end = Suffix {}; + let metrics = Metrics::new(&Registry::default()).unwrap(); + let mut end = Suffix { + size: 3, + remove: false, + }; let mut contents = b"helloabc".to_vec(); - let result = end.capture(&mut contents, 3, false); - assert_eq!(b"abc".to_vec(), result); + let result = end.capture(&mut contents, &metrics).unwrap(); + assert_eq!(Value::Bytes(b"abc".to_vec().into()), result); assert_eq!(b"helloabc".to_vec(), contents); - let result = end.capture(&mut contents, 3, true); - assert_eq!(b"abc".to_vec(), result); + end.remove = true; + + let result = end.capture(&mut contents, &metrics).unwrap(); + assert_eq!(Value::Bytes(b"abc".to_vec().into()), result); assert_eq!(b"hello".to_vec(), contents); } #[test] fn beginning_capture() { - let beg = Prefix {}; + let metrics = Metrics::new(&Registry::default()).unwrap(); + let mut beg = Prefix { + size: 3, + remove: false, + }; let mut contents = b"abchello".to_vec(); - let result = beg.capture(&mut contents, 3, false); - assert_eq!(b"abc".to_vec(), result); + let result = beg.capture(&mut contents, &metrics); + assert_eq!(Some(Value::Bytes(b"abc".to_vec().into())), result); assert_eq!(b"abchello".to_vec(), contents); - let result = beg.capture(&mut contents, 3, true); - assert_eq!(b"abc".to_vec(), result); + beg.remove = true; + + let result = beg.capture(&mut contents, &metrics); + assert_eq!(Some(Value::Bytes(b"abc".to_vec().into())), result); assert_eq!(b"hello".to_vec(), contents); } diff --git a/src/filters/capture/affix.rs b/src/filters/capture/affix.rs new file mode 100644 index 0000000000..d1ae743ec0 --- /dev/null +++ b/src/filters/capture/affix.rs @@ -0,0 +1,64 @@ +use crate::metadata::Value; + +use super::Metrics; + +fn is_valid_size(contents: &[u8], size: u32, metrics: &Metrics) -> bool { + // if the capture size is bigger than the packet size, then we drop the packet, + // and occasionally warn + if contents.len() < size as usize { + if metrics.packets_dropped_total.get() % 1000 == 0 { + tracing::warn!(count = ?metrics.packets_dropped_total.get(), "Packets are being dropped due to their length being less than {} bytes", size); + } + metrics.packets_dropped_total.inc(); + + false + } else { + true + } +} + +/// Capture from the start of the packet. +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)] +pub struct Prefix { + /// Whether captured bytes are removed from the original packet. + #[serde(default)] + pub remove: bool, + /// The number of bytes to capture. + pub size: u32, +} + +impl super::CaptureStrategy for Prefix { + fn capture(&self, contents: &mut Vec, metrics: &Metrics) -> Option { + is_valid_size(contents, self.size, metrics).then(|| { + if self.remove { + Value::Bytes(contents.drain(..self.size as usize).collect()) + } else { + Value::Bytes(contents.iter().take(self.size as usize).copied().collect()) + } + }) + } +} + +/// Capture from the end of the packet. +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)] +pub struct Suffix { + /// Whether captured bytes are removed from the original packet. + pub size: u32, + /// The number of bytes to capture. + #[serde(default)] + pub remove: bool, +} + +impl super::CaptureStrategy for Suffix { + fn capture(&self, contents: &mut Vec, metrics: &Metrics) -> Option { + is_valid_size(contents, self.size, metrics).then(|| { + let index = contents.len() - self.size as usize; + + if self.remove { + Value::Bytes(contents.split_off(index).into()) + } else { + Value::Bytes(contents.iter().skip(index).copied().collect()) + } + }) + } +} diff --git a/src/filters/capture/config.rs b/src/filters/capture/config.rs new file mode 100644 index 0000000000..b96e2dbb3e --- /dev/null +++ b/src/filters/capture/config.rs @@ -0,0 +1,148 @@ +/* + * Copyright 2021 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::convert::TryFrom; + +use serde::{Deserialize, Serialize}; + +use super::{proto, Prefix, Regex, Suffix}; +use crate::filters::{metadata::CAPTURED_BYTES, ConvertProtoConfigError}; + +/// Strategy to apply for acquiring a set of bytes in the UDP packet +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(tag = "kind")] +pub enum Strategy { + /// Looks for the set of bytes at the beginning of the packet + #[serde(rename = "PREFIX")] + Prefix(Prefix), + /// Look for the set of bytes at the end of the packet + #[serde(rename = "SUFFIX")] + Suffix(Suffix), + /// Look for the set of bytes at the end of the packet + #[serde(rename = "REGEX")] + Regex(Regex), +} + +impl Strategy { + pub fn into_capture(self) -> Box { + match self { + Self::Prefix(value) => Box::from(value), + Self::Suffix(value) => Box::from(value), + Self::Regex(value) => Box::from(value), + } + } +} + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +pub struct Config { + /// the key to use when storing the captured bytes in the filter context + #[serde(rename = "metadataKey")] + #[serde(default = "default_metadata_key")] + pub metadata_key: String, + /// The capture strategy. + pub strategy: Strategy, +} + +/// default value for the context key in the Config +fn default_metadata_key() -> String { + CAPTURED_BYTES.into() +} + +impl TryFrom for Config { + type Error = ConvertProtoConfigError; + + fn try_from(p: proto::Capture) -> Result { + let strategy = p + .strategy + .ok_or_else(|| ConvertProtoConfigError::new("Missing", Some("strategy".into())))?; + + Ok(Self { + metadata_key: p.metadata_key.ok_or_else(|| { + ConvertProtoConfigError::new("Missing", Some("metadata_key".into())) + })?, + strategy: strategy.try_into()?, + }) + } +} + +impl TryFrom for Strategy { + type Error = ConvertProtoConfigError; + + fn try_from(p: proto::capture::Strategy) -> Result { + use proto::capture; + + Ok(match p { + capture::Strategy::Prefix(prefix) => Self::Prefix(Prefix { + size: prefix.size, + remove: prefix.remove.unwrap_or_default(), + }), + capture::Strategy::Suffix(suffix) => Self::Suffix(Suffix { + size: suffix.size, + remove: suffix.remove.unwrap_or_default(), + }), + capture::Strategy::Regex(regex) => { + let regex = regex.regex.ok_or_else(|| { + ConvertProtoConfigError::new("Missing", Some("Regex.regex".into())) + })?; + Self::Regex(Regex { + regex: regex.parse().map_err(|error: regex::Error| { + ConvertProtoConfigError::new(error.to_string(), Some("Regex.regex".into())) + })?, + }) + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::convert::TryFrom; + + #[test] + fn convert_proto_config() { + let test_cases = vec![( + "should succeed when all valid values are provided", + proto::Capture { + strategy: Some(proto::capture::Strategy::Suffix(proto::capture::Suffix { + size: 42, + remove: Some(true), + })), + metadata_key: Some("foobar".into()), + }, + Some(Config { + metadata_key: "foobar".into(), + strategy: Strategy::Suffix(Suffix { + size: 42, + remove: true, + }), + }), + )]; + + for (name, proto_config, expected) in test_cases { + let result = Config::try_from(proto_config); + assert_eq!( + result.is_err(), + expected.is_none(), + "{}: error expectation does not match", + name + ); + if let Some(expected) = expected { + assert_eq!(expected, result.unwrap(), "{}", name); + } + } + } +} diff --git a/src/filters/capture_bytes/metrics.rs b/src/filters/capture/metrics.rs similarity index 93% rename from src/filters/capture_bytes/metrics.rs rename to src/filters/capture/metrics.rs index 75393fe836..b400ee723f 100644 --- a/src/filters/capture_bytes/metrics.rs +++ b/src/filters/capture/metrics.rs @@ -20,8 +20,8 @@ use prometheus::{IntCounter, Registry}; use crate::metrics::{filter_opts, CollectorExt}; /// Register and manage metrics for this filter -pub(super) struct Metrics { - pub(super) packets_dropped_total: GenericCounter, +pub struct Metrics { + pub packets_dropped_total: GenericCounter, } impl Metrics { diff --git a/src/filters/capture/regex.rs b/src/filters/capture/regex.rs new file mode 100644 index 0000000000..89ec08c82d --- /dev/null +++ b/src/filters/capture/regex.rs @@ -0,0 +1,33 @@ +use crate::metadata::Value; + +use super::Metrics; + +/// Capture from the start of the packet. +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct Regex { + /// The regular expression to use for capture. + #[serde(with = "serde_regex")] + pub regex: regex::bytes::Regex, +} + +impl super::CaptureStrategy for Regex { + fn capture(&self, contents: &mut Vec, _metrics: &Metrics) -> Option { + let matches = self + .regex + .find_iter(contents) + .map(|mat| Value::Bytes(bytes::Bytes::copy_from_slice(mat.as_bytes()))) + .collect::>(); + + if matches.len() > 1 { + Some(Value::List(matches)) + } else { + matches.into_iter().next() + } + } +} + +impl PartialEq for Regex { + fn eq(&self, rhs: &Self) -> bool { + self.regex.as_str() == rhs.regex.as_str() + } +} diff --git a/src/filters/capture_bytes/capture.rs b/src/filters/capture_bytes/capture.rs deleted file mode 100644 index cf8c4abaad..0000000000 --- a/src/filters/capture_bytes/capture.rs +++ /dev/null @@ -1,37 +0,0 @@ -/// Trait to implement different strategies for capturing packet data -pub trait Capture { - /// Capture the packet data from the contents. If remove is true, contents will be altered to - /// not have the retrieved set of bytes. - /// Returns the captured bytes. - fn capture(&self, contents: &mut Vec, size: usize, remove: bool) -> Vec; -} - -/// Capture from the end of the packet. -pub struct Suffix; - -impl Capture for Suffix { - fn capture(&self, contents: &mut Vec, size: usize, remove: bool) -> Vec { - if remove { - return contents.split_off(contents.len() - size); - } - - contents - .iter() - .skip(contents.len() - size) - .cloned() - .collect::>() - } -} - -/// Capture from the start of the packet. -pub struct Prefix; - -impl Capture for Prefix { - fn capture(&self, contents: &mut Vec, size: usize, remove: bool) -> Vec { - if remove { - return contents.drain(..size).collect(); - } - - contents.iter().cloned().take(size).collect() - } -} diff --git a/src/filters/capture_bytes/config.rs b/src/filters/capture_bytes/config.rs deleted file mode 100644 index 4c904b7a57..0000000000 --- a/src/filters/capture_bytes/config.rs +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Copyright 2021 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::convert::TryFrom; - -use serde::{Deserialize, Serialize}; - -use super::proto::quilkin::extensions::filters::capture_bytes::v1alpha1::{ - capture_bytes::Strategy as ProtoStrategy, CaptureBytes as ProtoConfig, -}; -use crate::filters::{metadata::CAPTURED_BYTES, ConvertProtoConfigError}; -use crate::map_proto_enum; - -use super::capture::{Capture, Prefix, Suffix}; - -#[derive(Serialize, Deserialize, Debug, PartialEq)] -/// Strategy to apply for acquiring a set of bytes in the UDP packet -pub enum Strategy { - #[serde(rename = "PREFIX")] - /// Looks for the set of bytes at the beginning of the packet - Prefix, - #[serde(rename = "SUFFIX")] - /// Look for the set of bytes at the end of the packet - Suffix, -} - -impl Strategy { - pub(crate) fn as_capture(&self) -> Box { - match self { - Self::Prefix => Box::new(Prefix {}), - Self::Suffix => Box::new(Suffix {}), - } - } -} - -#[derive(Serialize, Deserialize, Debug, PartialEq)] -pub struct Config { - #[serde(default)] - pub strategy: Strategy, - /// the number of bytes to capture - #[serde(rename = "size")] - pub size: usize, - /// the key to use when storing the captured bytes in the filter context - #[serde(rename = "metadataKey")] - #[serde(default = "default_metadata_key")] - pub metadata_key: String, - /// whether or not to remove the set of the bytes from the packet once captured - #[serde(default = "default_remove")] - pub remove: bool, -} - -/// default value for [`Config::remove`]. -fn default_remove() -> bool { - false -} - -/// default value for the context key in the Config -fn default_metadata_key() -> String { - CAPTURED_BYTES.into() -} - -impl Default for Strategy { - fn default() -> Self { - Strategy::Suffix - } -} - -impl TryFrom for Config { - type Error = ConvertProtoConfigError; - - fn try_from(p: ProtoConfig) -> Result { - let strategy = p - .strategy - .map(|strategy| { - map_proto_enum!( - value = strategy.value, - field = "strategy", - proto_enum_type = ProtoStrategy, - target_enum_type = Strategy, - variants = [Suffix, Prefix] - ) - }) - .transpose()? - .unwrap_or_default(); - - Ok(Self { - strategy, - size: p.size as usize, - metadata_key: p.metadata_key.unwrap_or_else(default_metadata_key), - remove: p.remove.unwrap_or_else(default_remove), - }) - } -} - -#[cfg(test)] -mod tests { - use std::convert::TryFrom; - - use super::super::proto::quilkin::extensions::filters::capture_bytes::v1alpha1::{ - capture_bytes::{Strategy as ProtoStrategy, StrategyValue}, - CaptureBytes as ProtoConfig, - }; - use super::*; - - #[test] - fn convert_proto_config() { - let test_cases = vec![ - ( - "should succeed when all valid values are provided", - ProtoConfig { - strategy: Some(StrategyValue { - value: ProtoStrategy::Suffix as i32, - }), - size: 42, - metadata_key: Some("foobar".into()), - remove: Some(true), - }, - Some(Config { - strategy: Strategy::Suffix, - size: 42, - metadata_key: "foobar".into(), - remove: true, - }), - ), - ( - "should fail when invalid strategy is provided", - ProtoConfig { - strategy: Some(StrategyValue { value: 42 }), - size: 42, - metadata_key: Some("foobar".into()), - remove: Some(true), - }, - None, - ), - ( - "should use correct default values", - ProtoConfig { - strategy: None, - size: 42, - metadata_key: None, - remove: None, - }, - Some(Config { - strategy: Strategy::default(), - size: 42, - metadata_key: default_metadata_key(), - remove: default_remove(), - }), - ), - ]; - for (name, proto_config, expected) in test_cases { - let result = Config::try_from(proto_config); - assert_eq!( - result.is_err(), - expected.is_none(), - "{}: error expectation does not match", - name - ); - if let Some(expected) = expected { - assert_eq!(expected, result.unwrap(), "{}", name); - } - } - } -} diff --git a/src/filters/capture_bytes/proto.rs b/src/filters/capture_bytes/proto.rs deleted file mode 100644 index 2c33e22d33..0000000000 --- a/src/filters/capture_bytes/proto.rs +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/// Protobuf config for this filter. -pub(super) mod quilkin { - pub mod extensions { - pub mod filters { - pub mod capture_bytes { - pub mod v1alpha1 { - #![doc(hidden)] - tonic::include_proto!("quilkin.extensions.filters.capture_bytes.v1alpha1"); - } - } - } - } -} diff --git a/src/filters/metadata.rs b/src/filters/metadata.rs index f956c60af7..00a9f9de24 100644 --- a/src/filters/metadata.rs +++ b/src/filters/metadata.rs @@ -16,7 +16,7 @@ //! Well known dynamic metadata used by Quilkin. -/// The default key under which the [`super::capture_bytes`] filter puts the +/// The default key under which the [`super::capture`] filter puts the /// byte slices it extracts from each packet. /// - **Type** `Vec` -pub const CAPTURED_BYTES: &str = "quilkin.dev/captured_bytes"; +pub const CAPTURED_BYTES: &str = "quilkin.dev/captured"; diff --git a/src/filters/read.rs b/src/filters/read.rs index f84bed598d..53a5141ec5 100644 --- a/src/filters/read.rs +++ b/src/filters/read.rs @@ -76,6 +76,7 @@ impl From for ReadResponse { /// Some(ctx.into()) /// } /// ``` +#[derive(Debug)] #[non_exhaustive] pub struct ReadResponse { /// The upstream endpoints that the packet should be forwarded to. diff --git a/src/filters/set.rs b/src/filters/set.rs index 45d33699a6..dcccbc0ed1 100644 --- a/src/filters/set.rs +++ b/src/filters/set.rs @@ -52,14 +52,14 @@ impl FilterSet { pub fn default_with(filters: impl IntoIterator) -> Self { Self::with( std::array::IntoIter::new([ - filters::debug::factory(), - filters::local_rate_limit::factory(), + filters::capture::factory(), + filters::compress::factory(), filters::concatenate_bytes::factory(), + filters::debug::factory(), + filters::firewall::factory(), filters::load_balancer::factory(), - filters::capture_bytes::factory(), + filters::local_rate_limit::factory(), filters::token_router::factory(), - filters::compress::factory(), - filters::firewall::factory(), ]) .chain(filters), ) diff --git a/src/filters/token_router.rs b/src/filters/token_router.rs index f810e1305d..f3ffb5b3fd 100644 --- a/src/filters/token_router.rs +++ b/src/filters/token_router.rs @@ -100,7 +100,7 @@ impl FilterFactory for TokenRouterFactory { impl Filter for TokenRouter { #[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))] fn read(&self, mut ctx: ReadContext) -> Option { - match ctx.metadata.get(self.metadata_key.as_ref()) { + match dbg!(ctx.metadata.get(self.metadata_key.as_ref())) { None => { if self.metrics.packets_dropped_no_token_found.get() % LOG_SAMPLING_RATE == 0 { error!( diff --git a/tests/capture.rs b/tests/capture.rs new file mode 100644 index 0000000000..8e86a84eef --- /dev/null +++ b/tests/capture.rs @@ -0,0 +1,89 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +use tokio::time::{timeout, Duration}; + +use quilkin::{ + config::{Builder, Filter}, + endpoint::Endpoint, + filters::{capture, token_router}, + metadata::MetadataView, + test_utils::TestHelper, +}; + +/// This test covers both token_router and capture filters, +/// since they work in concert together. +#[tokio::test] +async fn token_router() { + let mut t = TestHelper::default(); + let echo = t.run_echo_server().await; + + let capture_yaml = r#" +strategy: + kind: REGEX + regex: .{3}$ +"#; + let endpoint_metadata = " +quilkin.dev: + tokens: + - YWJj # abc + "; + let server_port = 12348; + let server_config = Builder::empty() + .with_port(server_port) + .with_static( + vec![ + Filter { + name: capture::factory().name().into(), + config: serde_yaml::from_str(capture_yaml).unwrap(), + }, + Filter { + name: token_router::factory().name().into(), + config: None, + }, + ], + vec![Endpoint::with_metadata( + echo, + serde_yaml::from_str::>(endpoint_metadata).unwrap(), + )], + ) + .build(); + t.run_server_with_config(server_config); + + // valid packet + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + + let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port); + let msg = b"helloabc"; + socket.send_to(msg, &local_addr).await.unwrap(); + + assert_eq!( + "helloabc", + timeout(Duration::from_secs(5), recv_chan.recv()) + .await + .expect("should have received a packet") + .unwrap() + ); + + // send an invalid packet + let msg = b"helloxyz"; + socket.send_to(msg, &local_addr).await.unwrap(); + + let result = timeout(Duration::from_secs(3), recv_chan.recv()).await; + assert!(result.is_err(), "should not have received a packet"); +} diff --git a/tests/token_router.rs b/tests/token_router.rs index 08ebffee99..34b94a6e8c 100644 --- a/tests/token_router.rs +++ b/tests/token_router.rs @@ -21,12 +21,12 @@ use tokio::time::{timeout, Duration}; use quilkin::{ config::{Builder, Filter}, endpoint::Endpoint, - filters::{capture_bytes, token_router}, + filters::{capture, token_router}, metadata::MetadataView, test_utils::TestHelper, }; -/// This test covers both token_router and capture_bytes filters, +/// This test covers both token_router and capture filters, /// since they work in concert together. #[tokio::test] async fn token_router() { @@ -34,8 +34,10 @@ async fn token_router() { let echo = t.run_echo_server().await; let capture_yaml = " -size: 3 -remove: true +strategy: + kind: SUFFIX + size: 3 + remove: true "; let endpoint_metadata = " quilkin.dev: @@ -48,7 +50,7 @@ quilkin.dev: .with_static( vec![ Filter { - name: capture_bytes::factory().name().into(), + name: capture::factory().name().into(), config: serde_yaml::from_str(capture_yaml).unwrap(), }, Filter {