From 65a945a190bc8264c008c1b9e2db74a212c7867e Mon Sep 17 00:00:00 2001 From: Erin Power Date: Fri, 24 Dec 2021 18:57:47 +0100 Subject: [PATCH] Add regex capture and rename capture_bytes to capture --- Cargo.toml | 2 + build.rs | 3 +- docs/src/SUMMARY.md | 2 +- docs/src/filters.md | 4 +- .../filters/{capture_bytes.md => capture.md} | 34 ++- docs/src/filters/token_router.md | 13 +- macros/src/include.rs | 7 +- .../v1alpha1/capture.proto} | 29 +- src/filters.rs | 2 +- src/filters/{capture_bytes.rs => capture.rs} | 218 +++++++++------ src/filters/capture/affix.rs | 64 +++++ src/filters/capture/config.rs | 252 ++++++++++++++++++ .../{capture_bytes => capture}/metrics.rs | 4 +- src/filters/capture/regex.rs | 33 +++ src/filters/capture_bytes/capture.rs | 37 --- src/filters/capture_bytes/config.rs | 177 ------------ src/filters/capture_bytes/proto.rs | 29 -- src/filters/metadata.rs | 4 +- src/filters/read.rs | 1 + src/filters/set.rs | 4 +- src/lib.rs | 2 +- tests/capture.rs | 88 ++++++ tests/matches.rs | 11 +- tests/token_router.rs | 11 +- 24 files changed, 649 insertions(+), 382 deletions(-) rename docs/src/filters/{capture_bytes.md => capture.md} (74%) rename proto/quilkin/extensions/filters/{capture_bytes/v1alpha1/capture_bytes.proto => capture/v1alpha1/capture.proto} (59%) rename src/filters/{capture_bytes.rs => capture.rs} (52%) create mode 100644 src/filters/capture/affix.rs create mode 100644 src/filters/capture/config.rs rename src/filters/{capture_bytes => capture}/metrics.rs (93%) create mode 100644 src/filters/capture/regex.rs delete mode 100644 src/filters/capture_bytes/capture.rs delete mode 100644 src/filters/capture_bytes/config.rs delete mode 100644 src/filters/capture_bytes/proto.rs create mode 100644 tests/capture.rs diff --git a/Cargo.toml b/Cargo.toml index 6bbbbd8c92..0de4677733 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ prost-types = "=0.9.0" rand = "0.8.4" serde = { version = "1.0.130", features = ["derive", "rc"] } serde_json = "1.0.68" +serde_regex = "1.1.0" serde_yaml = "0.8.21" snap = "1.0.5" tokio = { version = "1.16.1", features = ["rt-multi-thread", "signal", "test-util", "parking_lot"] } @@ -70,6 +71,7 @@ eyre = "0.6.5" stable-eyre = "0.2.2" ipnetwork = "0.18.0" futures = "0.3.17" +regex = "1.5.4" [target.'cfg(target_os = "linux")'.dependencies] sys-info = "0.9.0" diff --git a/build.rs b/build.rs index 63b01e49b7..f4f6807e6f 100644 --- a/build.rs +++ b/build.rs @@ -28,10 +28,11 @@ fn main() -> Result<(), Box> { "proto/data-plane-api/envoy/type/metadata/v3/metadata.proto", "proto/data-plane-api/envoy/type/tracing/v3/custom_tag.proto", "proto/udpa/xds/core/v3/resource_name.proto", - "proto/quilkin/extensions/filters/capture_bytes/v1alpha1/capture_bytes.proto", + "proto/quilkin/extensions/filters/capture/v1alpha1/capture.proto", "proto/quilkin/extensions/filters/compress/v1alpha1/compress.proto", "proto/quilkin/extensions/filters/concatenate_bytes/v1alpha1/concatenate_bytes.proto", "proto/quilkin/extensions/filters/debug/v1alpha1/debug.proto", + "proto/quilkin/extensions/filters/debug/v1alpha1/debug.proto", "proto/quilkin/extensions/filters/firewall/v1alpha1/firewall.proto", "proto/quilkin/extensions/filters/load_balancer/v1alpha1/load_balancer.proto", "proto/quilkin/extensions/filters/local_rate_limit/v1alpha1/local_rate_limit.proto", diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index e8b919d62f..8b8b02743a 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -10,7 +10,7 @@ - [Proxy](./proxy.md) - [Proxy Configuration](./proxy-configuration.md) - [Filters](./filters.md) - - [Capture Bytes](./filters/capture_bytes.md) + - [Capture](./filters/capture.md) - [Concatenate Bytes](./filters/concatenate_bytes.md) - [Compress](./filters/compress.md) - [Debug](./filters/debug.md) diff --git a/docs/src/filters.md b/docs/src/filters.md index 6070315b04..5d109b9537 100644 --- a/docs/src/filters.md +++ b/docs/src/filters.md @@ -96,7 +96,7 @@ The following metadata are currently used by Quilkin core and built-in filters. | Name | Type | Description | |------|------|-------------| -| `quilkin.dev/captured_bytes` | `Bytes` | The default key under which the [CaptureBytes] filter puts the byte slices it extracts from each packet. | +| `quilkin.dev/captured` | `Bytes` | The default key under which the [Capture] filter puts the byte slices it extracts from each packet. | ### Built-in filters Quilkin includes several filters out of the box. @@ -132,7 +132,7 @@ properties: required: [ 'name', 'config' ] ``` -[CaptureBytes]: ./filters/capture_bytes.md +[Capture]: ./filters/capture.md [TokenRouter]: ./filters/token_router.md [Debug]: ./filters/debug.md [LocalRateLimit]: ./filters/local_rate_limit.md diff --git a/docs/src/filters/capture_bytes.md b/docs/src/filters/capture.md similarity index 74% rename from docs/src/filters/capture_bytes.md rename to docs/src/filters/capture.md index e36fd333bd..c5fd428606 100644 --- a/docs/src/filters/capture_bytes.md +++ b/docs/src/filters/capture.md @@ -8,9 +8,25 @@ This is often used as a way of retrieving authentication tokens from a packet, a [ConcatenateBytes](./concatenate_bytes.md) and [TokenRouter](token_router.md) filter to provide common packet routing utilities. +### Capture strategies + +There are multiple strategies for capturing bytes from the packet. + +#### Suffix +Captures bytes from the end of the packet. + +#### Prefix +Captures bytes from the start of the packet. + +#### Regex +Captures bytes using a regular expression. Unlike other capture strategies, +the regular expression can return one or many values if there are +multiple matches. + + #### Filter name ```text -quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes +quilkin.extensions.filters.capture.v1alpha1.Capture ``` ### Configuration Examples @@ -19,12 +35,12 @@ quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes version: v1alpha1 static: filters: - - name: quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes + - name: quilkin.extensions.filters.capture.v1alpha1.Capture config: - strategy: PREFIX metadataKey: myapp.com/myownkey - size: 3 - remove: false + prefix: + size: 3 + remove: false endpoints: - address: 127.0.0.1:7001 # "; @@ -33,12 +49,12 @@ static: # quilkin::Builder::from(std::sync::Arc::new(config)).validate().unwrap(); ``` -### Configuration Options ([Rust Doc](../../api/quilkin/filters/capture_bytes/struct.Config.html)) +### Configuration Options ([Rust Doc](../../api/quilkin/filters/capture/struct.Config.html)) ```yaml properties: strategy: - type: string + type: object description: | The selected strategy for capturing the series of bytes from the incoming packet. - SUFFIX: Retrieve bytes from the end of the packet. @@ -47,7 +63,7 @@ properties: enum: ['PREFIX', 'SUFFIX'] metadataKey: type: string - default: quilkin.dev/captured_bytes + default: quilkin.dev/captured description: | The key under which the captured bytes are stored in the Filter invocation values. size: @@ -65,7 +81,7 @@ properties: ### Metrics -* `quilkin_filter_CaptureBytes_packets_dropped` +* `quilkin_filter_Capture_packets_dropped` A counter of the total number of packets that have been dropped due to their length being less than the configured `size`. diff --git a/docs/src/filters/token_router.md b/docs/src/filters/token_router.md index 6ab77151d6..814da5cbf1 100644 --- a/docs/src/filters/token_router.md +++ b/docs/src/filters/token_router.md @@ -38,7 +38,7 @@ static: # quilkin::Builder::from(std::sync::Arc::new(config)).validate().unwrap(); ``` -View the [CaptureBytes](./capture_bytes.md) filter documentation for more details. +View the [CaptureBytes](./capture.md) filter documentation for more details. ### Configuration Options ([Rust Doc](../../api/quilkin/filters/token_router/struct.Config.html)) @@ -46,7 +46,7 @@ View the [CaptureBytes](./capture_bytes.md) filter documentation for more detail properties: metadataKey: type: string - default: quilkin.dev/captured_bytes + default: quilkin.dev/captured description: | The key under which the token is stored in the Filter dynamic metadata. ``` @@ -68,7 +68,7 @@ properties: In combination with several other filters, the `TokenRouter` can be utilised as an authentication and access control mechanism for all incoming packets. -Capturing the authentication token from an incoming packet can be implemented via the [CaptureByte](./capture_bytes.md) +Capturing the authentication token from an incoming packet can be implemented via the [CaptureByte](./capture.md) filter, with an example outlined below, or any other filter that populates the configured dynamic metadata key for the authentication token to reside. @@ -82,10 +82,11 @@ For example, a configuration would look like: version: v1alpha1 static: filters: - - name: quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes # Capture and remove the authentication token + - name: quilkin.extensions.filters.capture.v1alpha1.Capture # Capture and remove the authentication token config: - size: 3 - remove: true + suffix: + size: 3 + remove: true - name: quilkin.extensions.filters.token_router.v1alpha1.TokenRouter endpoints: - address: 127.0.0.1:26000 diff --git a/macros/src/include.rs b/macros/src/include.rs index bb608c2c95..443a9e6f98 100644 --- a/macros/src/include.rs +++ b/macros/src/include.rs @@ -42,12 +42,9 @@ impl ToTokens for IncludeProto { fn to_tokens(&self, tokens: &mut TokenStream) { let id = &self.id; - let doc_hidden: syn::Attribute = syn::parse_quote!(#![doc(hidden)]); let tonic_include_proto: syn::Stmt = syn::parse_quote!(tonic::include_proto!(#id);); - let items: Vec = vec![ - syn::Item::Verbatim(doc_hidden.to_token_stream()), - syn::Item::Verbatim(tonic_include_proto.to_token_stream()), - ]; + let items: Vec = + vec![syn::Item::Verbatim(tonic_include_proto.to_token_stream())]; let module = id.split('.').rev().fold::, _>(items, |acc, module| { let module = syn::Ident::new(module, Span::mixed_site()); diff --git a/proto/quilkin/extensions/filters/capture_bytes/v1alpha1/capture_bytes.proto b/proto/quilkin/extensions/filters/capture/v1alpha1/capture.proto similarity index 59% rename from proto/quilkin/extensions/filters/capture_bytes/v1alpha1/capture_bytes.proto rename to proto/quilkin/extensions/filters/capture/v1alpha1/capture.proto index 909c121a6d..9ea6e711d3 100644 --- a/proto/quilkin/extensions/filters/capture_bytes/v1alpha1/capture_bytes.proto +++ b/proto/quilkin/extensions/filters/capture/v1alpha1/capture.proto @@ -16,23 +16,30 @@ syntax = "proto3"; -package quilkin.extensions.filters.capture_bytes.v1alpha1; +package quilkin.extensions.filters.capture.v1alpha1; import "google/protobuf/wrappers.proto"; -message CaptureBytes { - enum Strategy { - Prefix = 0; - Suffix = 1; +message Capture { + message Suffix { + uint32 size = 1; + google.protobuf.BoolValue remove = 2; } - message StrategyValue { - Strategy value = 1; + message Prefix { + uint32 size = 1; + google.protobuf.BoolValue remove = 2; } - StrategyValue strategy = 1; - uint32 size = 2; - google.protobuf.StringValue metadata_key = 3; - google.protobuf.BoolValue remove = 4; + message Regex { + google.protobuf.StringValue regex = 1; + } + + google.protobuf.StringValue metadata_key = 1; + oneof strategy { + Prefix prefix = 2; + Suffix suffix = 3; + Regex regex = 4; + } } diff --git a/src/filters.rs b/src/filters.rs index 74f2aa7ca6..2fb809dddb 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -26,7 +26,7 @@ mod write; pub(crate) mod chain; pub(crate) mod manager; -pub mod capture_bytes; +pub mod capture; pub mod compress; pub mod concatenate_bytes; pub mod debug; diff --git a/src/filters/capture_bytes.rs b/src/filters/capture.rs similarity index 52% rename from src/filters/capture_bytes.rs rename to src/filters/capture.rs index 60c5be458d..00998575cb 100644 --- a/src/filters/capture_bytes.rs +++ b/src/filters/capture.rs @@ -14,82 +14,84 @@ * limitations under the License. */ -mod capture; +mod affix; mod config; mod metrics; -mod proto; +mod regex; -use std::sync::Arc; +crate::include_proto!("quilkin.extensions.filters.capture.v1alpha1"); -use tracing::warn; +use std::sync::Arc; use crate::{filters::prelude::*, metadata::Value}; -use capture::Capture; -use metrics::Metrics; -use proto::quilkin::extensions::filters::capture_bytes::v1alpha1::CaptureBytes as ProtoConfig; +use self::{ + affix::{Prefix, Suffix}, + metrics::Metrics, + regex::Regex, +}; +use self::quilkin::extensions::filters::capture::v1alpha1 as proto; pub use config::{Config, Strategy}; -pub const NAME: &str = "quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes"; +pub const NAME: &str = "quilkin.extensions.filters.capture.v1alpha1.Capture"; /// Creates a new factory for generating capture filters. pub fn factory() -> DynFilterFactory { - Box::from(CaptureBytesFactory::new()) + Box::from(CaptureFactory::new()) } -struct CaptureBytes { - capture: Box, +/// Trait to implement different strategies for capturing packet data. +pub trait CaptureStrategy { + /// Capture packet data from the contents, and optionally returns a value if + /// anything was captured. + fn capture(&self, contents: &mut Vec, metrics: &Metrics) -> Option; +} + +struct Capture { + capture: Box, /// metrics reporter for this filter. metrics: Metrics, metadata_key: Arc, - size: usize, - remove: bool, + is_present_key: Arc, } -impl CaptureBytes { +impl Capture { fn new(config: Config, metrics: Metrics) -> Self { - CaptureBytes { - capture: config.strategy.as_capture(), + Self { + capture: config.strategy.into_capture(), metrics, + is_present_key: Arc::new(config.metadata_key.clone() + "/is_present"), metadata_key: Arc::new(config.metadata_key), - size: config.size, - remove: config.remove, } } } -impl Filter for CaptureBytes { +impl Filter for Capture { #[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))] fn read(&self, mut ctx: ReadContext) -> Option { - // if the capture size is bigger than the packet size, then we drop the packet, - // and occasionally warn - if ctx.contents.len() < self.size { - if self.metrics.packets_dropped_total.get() % 1000 == 0 { - warn!(count = ?self.metrics.packets_dropped_total.get(), "Packets are being dropped due to their length being less than {} bytes", self.size); - } - self.metrics.packets_dropped_total.inc(); - return None; - } - let token = self - .capture - .capture(&mut ctx.contents, self.size, self.remove); - + let capture = self.capture.capture(&mut ctx.contents, &self.metrics); ctx.metadata - .insert(self.metadata_key.clone(), Value::Bytes(token.into())); + .insert(self.is_present_key.clone(), Value::Bool(capture.is_some())); - Some(ctx.into()) + if let Some(value) = capture { + ctx.metadata.insert(self.metadata_key.clone(), value); + Some(ctx.into()) + } else { + None + } } } -struct CaptureBytesFactory {} -impl CaptureBytesFactory { +struct CaptureFactory; + +impl CaptureFactory { pub fn new() -> Self { - CaptureBytesFactory {} + CaptureFactory } } -impl FilterFactory for CaptureBytesFactory { +impl FilterFactory for CaptureFactory { fn name(&self) -> &'static str { NAME } @@ -97,8 +99,8 @@ impl FilterFactory for CaptureBytesFactory { fn create_filter(&self, args: CreateFilterArgs) -> Result { let (config_json, config) = self .require_config(args.config)? - .deserialize::(self.name())?; - let filter = CaptureBytes::new(config, Metrics::new(&args.metrics_registry)?); + .deserialize::(self.name())?; + let filter = Capture::new(config, Metrics::new(&args.metrics_registry)?); Ok(FilterInstance::new( config_json, Box::new(filter) as Box, @@ -111,46 +113,51 @@ mod tests { use std::sync::Arc; use prometheus::Registry; - use serde_yaml::{Mapping, Value}; - - use crate::endpoint::{Endpoint, Endpoints}; - use crate::test_utils::assert_write_no_change; - - use super::{CaptureBytes, CaptureBytesFactory, Config, Metrics, Strategy}; - - use super::capture::{Capture, Prefix, Suffix}; + use serde_yaml::{Mapping, Value as YamlValue}; + + use crate::{ + endpoint::{Endpoint, Endpoints}, + filters::metadata::CAPTURED_BYTES, + filters::{prelude::*, FilterRegistry}, + metadata::Value, + test_utils::assert_write_no_change, + }; - use crate::filters::{ - metadata::CAPTURED_BYTES, CreateFilterArgs, Filter, FilterFactory, FilterRegistry, - ReadContext, + use super::{ + Capture, CaptureFactory, CaptureStrategy, Config, Metrics, Prefix, Regex, Strategy, Suffix, }; const TOKEN_KEY: &str = "TOKEN"; - fn capture_bytes(config: Config) -> CaptureBytes { - CaptureBytes::new(config, Metrics::new(&Registry::default()).unwrap()) + fn capture_bytes(config: Config) -> Capture { + Capture::new(config, Metrics::new(&Registry::default()).unwrap()) } #[test] fn factory_valid_config_all() { - let factory = CaptureBytesFactory::new(); + let factory = CaptureFactory::new(); let mut map = Mapping::new(); map.insert( - Value::String("strategy".into()), - Value::String("SUFFIX".into()), + YamlValue::String("metadataKey".into()), + YamlValue::String(TOKEN_KEY.into()), ); map.insert( - Value::String("metadataKey".into()), - Value::String(TOKEN_KEY.into()), + YamlValue::String("suffix".into()), + YamlValue::Mapping({ + let mut map = Mapping::new(); + + map.insert("size".into(), YamlValue::Number(3.into())); + map.insert("remove".into(), YamlValue::Bool(true)); + + map + }), ); - map.insert(Value::String("size".into()), Value::Number(3.into())); - map.insert(Value::String("remove".into()), Value::Bool(true)); let filter = factory .create_filter(CreateFilterArgs::fixed( FilterRegistry::default(), Registry::default(), - Some(Value::Mapping(map)), + Some(YamlValue::Mapping(map)), )) .unwrap() .filter; @@ -159,14 +166,22 @@ mod tests { #[test] fn factory_valid_config_defaults() { - let factory = CaptureBytesFactory::new(); + let factory = CaptureFactory::new(); let mut map = Mapping::new(); - map.insert(Value::String("size".into()), Value::Number(3.into())); + map.insert("suffix".into(), { + let mut map = Mapping::new(); + map.insert( + YamlValue::String("size".into()), + YamlValue::Number(3.into()), + ); + map.into() + }); + let filter = factory .create_filter(CreateFilterArgs::fixed( FilterRegistry::default(), Registry::default(), - Some(Value::Mapping(map)), + Some(YamlValue::Mapping(map)), )) .unwrap() .filter; @@ -175,14 +190,17 @@ mod tests { #[test] fn factory_invalid_config() { - let factory = CaptureBytesFactory::new(); + let factory = CaptureFactory::new(); let mut map = Mapping::new(); - map.insert(Value::String("size".into()), Value::String("WRONG".into())); + map.insert( + YamlValue::String("size".into()), + YamlValue::String("WRONG".into()), + ); let result = factory.create_filter(CreateFilterArgs::fixed( FilterRegistry::default(), Registry::default(), - Some(Value::Mapping(map)), + Some(YamlValue::Mapping(map)), )); assert!(result.is_err(), "Should be an error"); } @@ -190,11 +208,13 @@ mod tests { #[test] fn read() { let config = Config { - strategy: Strategy::Suffix, metadata_key: TOKEN_KEY.into(), - size: 3, - remove: true, + strategy: Strategy::Suffix(Suffix { + size: 3, + remove: true, + }), }; + let filter = capture_bytes(config); assert_end_strategy(&filter, TOKEN_KEY, true); } @@ -202,10 +222,11 @@ mod tests { #[test] fn read_overflow_capture_size() { let config = Config { - strategy: Strategy::Suffix, metadata_key: TOKEN_KEY.into(), - size: 99, - remove: true, + strategy: Strategy::Suffix(Suffix { + size: 99, + remove: true, + }), }; let filter = capture_bytes(config); let endpoints = vec![Endpoint::new("127.0.0.1:81".parse().unwrap())]; @@ -223,39 +244,64 @@ mod tests { #[test] fn write() { let config = Config { - strategy: Strategy::Suffix, + strategy: Strategy::Suffix(Suffix { + size: 0, + remove: false, + }), metadata_key: TOKEN_KEY.into(), - size: 0, - remove: false, }; let filter = capture_bytes(config); assert_write_no_change(&filter); } + #[test] + fn regex_capture() { + let metrics = Metrics::new(&Registry::default()).unwrap(); + let end = Regex { + pattern: regex::bytes::Regex::new(".{3}$").unwrap(), + }; + let mut contents = b"helloabc".to_vec(); + let result = end.capture(&mut contents, &metrics).unwrap(); + assert_eq!(Value::Bytes(b"abc".to_vec().into()), result); + assert_eq!(b"helloabc".to_vec(), contents); + } + #[test] fn end_capture() { - let end = Suffix {}; + let metrics = Metrics::new(&Registry::default()).unwrap(); + let mut end = Suffix { + size: 3, + remove: false, + }; let mut contents = b"helloabc".to_vec(); - let result = end.capture(&mut contents, 3, false); - assert_eq!(b"abc".to_vec(), result); + let result = end.capture(&mut contents, &metrics).unwrap(); + assert_eq!(Value::Bytes(b"abc".to_vec().into()), result); assert_eq!(b"helloabc".to_vec(), contents); - let result = end.capture(&mut contents, 3, true); - assert_eq!(b"abc".to_vec(), result); + end.remove = true; + + let result = end.capture(&mut contents, &metrics).unwrap(); + assert_eq!(Value::Bytes(b"abc".to_vec().into()), result); assert_eq!(b"hello".to_vec(), contents); } #[test] fn beginning_capture() { - let beg = Prefix {}; + let metrics = Metrics::new(&Registry::default()).unwrap(); + let mut beg = Prefix { + size: 3, + remove: false, + }; let mut contents = b"abchello".to_vec(); - let result = beg.capture(&mut contents, 3, false); - assert_eq!(b"abc".to_vec(), result); + let result = beg.capture(&mut contents, &metrics); + assert_eq!(Some(Value::Bytes(b"abc".to_vec().into())), result); assert_eq!(b"abchello".to_vec(), contents); - let result = beg.capture(&mut contents, 3, true); - assert_eq!(b"abc".to_vec(), result); + beg.remove = true; + + let result = beg.capture(&mut contents, &metrics); + assert_eq!(Some(Value::Bytes(b"abc".to_vec().into())), result); assert_eq!(b"hello".to_vec(), contents); } diff --git a/src/filters/capture/affix.rs b/src/filters/capture/affix.rs new file mode 100644 index 0000000000..d1ae743ec0 --- /dev/null +++ b/src/filters/capture/affix.rs @@ -0,0 +1,64 @@ +use crate::metadata::Value; + +use super::Metrics; + +fn is_valid_size(contents: &[u8], size: u32, metrics: &Metrics) -> bool { + // if the capture size is bigger than the packet size, then we drop the packet, + // and occasionally warn + if contents.len() < size as usize { + if metrics.packets_dropped_total.get() % 1000 == 0 { + tracing::warn!(count = ?metrics.packets_dropped_total.get(), "Packets are being dropped due to their length being less than {} bytes", size); + } + metrics.packets_dropped_total.inc(); + + false + } else { + true + } +} + +/// Capture from the start of the packet. +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)] +pub struct Prefix { + /// Whether captured bytes are removed from the original packet. + #[serde(default)] + pub remove: bool, + /// The number of bytes to capture. + pub size: u32, +} + +impl super::CaptureStrategy for Prefix { + fn capture(&self, contents: &mut Vec, metrics: &Metrics) -> Option { + is_valid_size(contents, self.size, metrics).then(|| { + if self.remove { + Value::Bytes(contents.drain(..self.size as usize).collect()) + } else { + Value::Bytes(contents.iter().take(self.size as usize).copied().collect()) + } + }) + } +} + +/// Capture from the end of the packet. +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)] +pub struct Suffix { + /// Whether captured bytes are removed from the original packet. + pub size: u32, + /// The number of bytes to capture. + #[serde(default)] + pub remove: bool, +} + +impl super::CaptureStrategy for Suffix { + fn capture(&self, contents: &mut Vec, metrics: &Metrics) -> Option { + is_valid_size(contents, self.size, metrics).then(|| { + let index = contents.len() - self.size as usize; + + if self.remove { + Value::Bytes(contents.split_off(index).into()) + } else { + Value::Bytes(contents.iter().skip(index).copied().collect()) + } + }) + } +} diff --git a/src/filters/capture/config.rs b/src/filters/capture/config.rs new file mode 100644 index 0000000000..139b826f65 --- /dev/null +++ b/src/filters/capture/config.rs @@ -0,0 +1,252 @@ +/* + * Copyright 2021 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::convert::TryFrom; + +use serde::{Deserialize, Serialize}; + +use super::{proto, Prefix, Regex, Suffix}; +use crate::filters::{metadata::CAPTURED_BYTES, ConvertProtoConfigError}; + +/// Strategy to apply for acquiring a set of bytes in the UDP packet +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(tag = "kind")] +pub enum Strategy { + /// Looks for the set of bytes at the beginning of the packet + #[serde(rename = "PREFIX")] + Prefix(Prefix), + /// Look for the set of bytes at the end of the packet + #[serde(rename = "SUFFIX")] + Suffix(Suffix), + /// Look for the set of bytes at the end of the packet + #[serde(rename = "REGEX")] + Regex(Regex), +} + +impl Strategy { + pub fn into_capture(self) -> Box { + match self { + Self::Prefix(value) => Box::from(value), + Self::Suffix(value) => Box::from(value), + Self::Regex(value) => Box::from(value), + } + } +} + +#[derive(Debug, PartialEq)] +pub struct Config { + /// The key to use when storing the captured value in the filter context. + /// If a match was found it is available + /// under `{{metadata_key}}/is_present`. + pub metadata_key: String, + /// The capture strategy. + pub strategy: Strategy, +} + +impl Serialize for Config { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + + let mut s = serializer.serialize_struct("Config", 2)?; + s.serialize_field("metadataKey", &self.metadata_key)?; + match &self.strategy { + Strategy::Prefix(value) => s.serialize_field("prefix", value)?, + Strategy::Suffix(value) => s.serialize_field("suffix", value)?, + Strategy::Regex(value) => s.serialize_field("regex", value)?, + } + + s.end() + } +} + +impl<'de> serde::Deserialize<'de> for Config { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(field_identifier, rename_all = "lowercase")] + enum Field { + #[serde(rename = "metadataKey")] + MetadataKey, + Prefix, + Suffix, + Regex, + } + + struct ConfigVisitor; + + impl<'de> serde::de::Visitor<'de> for ConfigVisitor { + type Value = Config; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("Capture config") + } + + fn visit_map(self, mut map: V) -> Result + where + V: serde::de::MapAccess<'de>, + { + let mut metadata_key = None; + let mut strategy = None; + let strategy_exists_err = || { + Err(serde::de::Error::custom( + "Multiple strategies found, only one capture strategy is permitted", + )) + }; + + while let Some(key) = map.next_key()? { + match key { + Field::MetadataKey => { + if metadata_key.is_some() { + return Err(serde::de::Error::duplicate_field("metadataKey")); + } + + metadata_key = Some(map.next_value()?); + } + + Field::Prefix => { + if strategy.is_some() { + return (strategy_exists_err)(); + } + + strategy = Some(Strategy::Prefix(map.next_value()?)); + } + + Field::Suffix => { + if strategy.is_some() { + return (strategy_exists_err)(); + } + + strategy = Some(Strategy::Suffix(map.next_value()?)); + } + + Field::Regex => { + if strategy.is_some() { + return (strategy_exists_err)(); + } + + strategy = Some(Strategy::Regex(map.next_value()?)); + } + } + } + + let metadata_key = metadata_key.unwrap_or_else(|| CAPTURED_BYTES.into()); + let strategy = strategy.ok_or_else(|| { + serde::de::Error::custom( + "Capture strategy of `regex`, `suffix`, or `prefix` is required", + ) + })?; + + Ok(Config { + metadata_key, + strategy, + }) + } + } + + deserializer.deserialize_map(ConfigVisitor) + } +} + +impl TryFrom for Config { + type Error = ConvertProtoConfigError; + + fn try_from(p: proto::Capture) -> Result { + let strategy = p + .strategy + .ok_or_else(|| ConvertProtoConfigError::new("Missing", Some("strategy".into())))?; + + Ok(Self { + metadata_key: p.metadata_key.ok_or_else(|| { + ConvertProtoConfigError::new("Missing", Some("metadata_key".into())) + })?, + strategy: strategy.try_into()?, + }) + } +} + +impl TryFrom for Strategy { + type Error = ConvertProtoConfigError; + + fn try_from(p: proto::capture::Strategy) -> Result { + use proto::capture; + + Ok(match p { + capture::Strategy::Prefix(prefix) => Self::Prefix(Prefix { + size: prefix.size, + remove: prefix.remove.unwrap_or_default(), + }), + capture::Strategy::Suffix(suffix) => Self::Suffix(Suffix { + size: suffix.size, + remove: suffix.remove.unwrap_or_default(), + }), + capture::Strategy::Regex(regex) => { + let regex = regex.regex.ok_or_else(|| { + ConvertProtoConfigError::new("Missing", Some("Regex.regex".into())) + })?; + Self::Regex(Regex { + pattern: regex.parse().map_err(|error: regex::Error| { + ConvertProtoConfigError::new(error.to_string(), Some("Regex.regex".into())) + })?, + }) + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::convert::TryFrom; + + #[test] + fn convert_proto_config() { + let test_cases = vec![( + "should succeed when all valid values are provided", + proto::Capture { + strategy: Some(proto::capture::Strategy::Suffix(proto::capture::Suffix { + size: 42, + remove: Some(true), + })), + metadata_key: Some("foobar".into()), + }, + Some(Config { + metadata_key: "foobar".into(), + strategy: Strategy::Suffix(Suffix { + size: 42, + remove: true, + }), + }), + )]; + + for (name, proto_config, expected) in test_cases { + let result = Config::try_from(proto_config); + assert_eq!( + result.is_err(), + expected.is_none(), + "{}: error expectation does not match", + name + ); + if let Some(expected) = expected { + assert_eq!(expected, result.unwrap(), "{}", name); + } + } + } +} diff --git a/src/filters/capture_bytes/metrics.rs b/src/filters/capture/metrics.rs similarity index 93% rename from src/filters/capture_bytes/metrics.rs rename to src/filters/capture/metrics.rs index 75393fe836..b400ee723f 100644 --- a/src/filters/capture_bytes/metrics.rs +++ b/src/filters/capture/metrics.rs @@ -20,8 +20,8 @@ use prometheus::{IntCounter, Registry}; use crate::metrics::{filter_opts, CollectorExt}; /// Register and manage metrics for this filter -pub(super) struct Metrics { - pub(super) packets_dropped_total: GenericCounter, +pub struct Metrics { + pub packets_dropped_total: GenericCounter, } impl Metrics { diff --git a/src/filters/capture/regex.rs b/src/filters/capture/regex.rs new file mode 100644 index 0000000000..e8ee3da42f --- /dev/null +++ b/src/filters/capture/regex.rs @@ -0,0 +1,33 @@ +use crate::metadata::Value; + +use super::Metrics; + +/// Capture from the start of the packet. +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct Regex { + /// The regular expression to use for capture. + #[serde(with = "serde_regex")] + pub pattern: regex::bytes::Regex, +} + +impl super::CaptureStrategy for Regex { + fn capture(&self, contents: &mut Vec, _metrics: &Metrics) -> Option { + let matches = self + .pattern + .find_iter(contents) + .map(|mat| Value::Bytes(bytes::Bytes::copy_from_slice(mat.as_bytes()))) + .collect::>(); + + if matches.len() > 1 { + Some(Value::List(matches)) + } else { + matches.into_iter().next() + } + } +} + +impl PartialEq for Regex { + fn eq(&self, rhs: &Self) -> bool { + self.pattern.as_str() == rhs.pattern.as_str() + } +} diff --git a/src/filters/capture_bytes/capture.rs b/src/filters/capture_bytes/capture.rs deleted file mode 100644 index cf8c4abaad..0000000000 --- a/src/filters/capture_bytes/capture.rs +++ /dev/null @@ -1,37 +0,0 @@ -/// Trait to implement different strategies for capturing packet data -pub trait Capture { - /// Capture the packet data from the contents. If remove is true, contents will be altered to - /// not have the retrieved set of bytes. - /// Returns the captured bytes. - fn capture(&self, contents: &mut Vec, size: usize, remove: bool) -> Vec; -} - -/// Capture from the end of the packet. -pub struct Suffix; - -impl Capture for Suffix { - fn capture(&self, contents: &mut Vec, size: usize, remove: bool) -> Vec { - if remove { - return contents.split_off(contents.len() - size); - } - - contents - .iter() - .skip(contents.len() - size) - .cloned() - .collect::>() - } -} - -/// Capture from the start of the packet. -pub struct Prefix; - -impl Capture for Prefix { - fn capture(&self, contents: &mut Vec, size: usize, remove: bool) -> Vec { - if remove { - return contents.drain(..size).collect(); - } - - contents.iter().cloned().take(size).collect() - } -} diff --git a/src/filters/capture_bytes/config.rs b/src/filters/capture_bytes/config.rs deleted file mode 100644 index 4c904b7a57..0000000000 --- a/src/filters/capture_bytes/config.rs +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Copyright 2021 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::convert::TryFrom; - -use serde::{Deserialize, Serialize}; - -use super::proto::quilkin::extensions::filters::capture_bytes::v1alpha1::{ - capture_bytes::Strategy as ProtoStrategy, CaptureBytes as ProtoConfig, -}; -use crate::filters::{metadata::CAPTURED_BYTES, ConvertProtoConfigError}; -use crate::map_proto_enum; - -use super::capture::{Capture, Prefix, Suffix}; - -#[derive(Serialize, Deserialize, Debug, PartialEq)] -/// Strategy to apply for acquiring a set of bytes in the UDP packet -pub enum Strategy { - #[serde(rename = "PREFIX")] - /// Looks for the set of bytes at the beginning of the packet - Prefix, - #[serde(rename = "SUFFIX")] - /// Look for the set of bytes at the end of the packet - Suffix, -} - -impl Strategy { - pub(crate) fn as_capture(&self) -> Box { - match self { - Self::Prefix => Box::new(Prefix {}), - Self::Suffix => Box::new(Suffix {}), - } - } -} - -#[derive(Serialize, Deserialize, Debug, PartialEq)] -pub struct Config { - #[serde(default)] - pub strategy: Strategy, - /// the number of bytes to capture - #[serde(rename = "size")] - pub size: usize, - /// the key to use when storing the captured bytes in the filter context - #[serde(rename = "metadataKey")] - #[serde(default = "default_metadata_key")] - pub metadata_key: String, - /// whether or not to remove the set of the bytes from the packet once captured - #[serde(default = "default_remove")] - pub remove: bool, -} - -/// default value for [`Config::remove`]. -fn default_remove() -> bool { - false -} - -/// default value for the context key in the Config -fn default_metadata_key() -> String { - CAPTURED_BYTES.into() -} - -impl Default for Strategy { - fn default() -> Self { - Strategy::Suffix - } -} - -impl TryFrom for Config { - type Error = ConvertProtoConfigError; - - fn try_from(p: ProtoConfig) -> Result { - let strategy = p - .strategy - .map(|strategy| { - map_proto_enum!( - value = strategy.value, - field = "strategy", - proto_enum_type = ProtoStrategy, - target_enum_type = Strategy, - variants = [Suffix, Prefix] - ) - }) - .transpose()? - .unwrap_or_default(); - - Ok(Self { - strategy, - size: p.size as usize, - metadata_key: p.metadata_key.unwrap_or_else(default_metadata_key), - remove: p.remove.unwrap_or_else(default_remove), - }) - } -} - -#[cfg(test)] -mod tests { - use std::convert::TryFrom; - - use super::super::proto::quilkin::extensions::filters::capture_bytes::v1alpha1::{ - capture_bytes::{Strategy as ProtoStrategy, StrategyValue}, - CaptureBytes as ProtoConfig, - }; - use super::*; - - #[test] - fn convert_proto_config() { - let test_cases = vec![ - ( - "should succeed when all valid values are provided", - ProtoConfig { - strategy: Some(StrategyValue { - value: ProtoStrategy::Suffix as i32, - }), - size: 42, - metadata_key: Some("foobar".into()), - remove: Some(true), - }, - Some(Config { - strategy: Strategy::Suffix, - size: 42, - metadata_key: "foobar".into(), - remove: true, - }), - ), - ( - "should fail when invalid strategy is provided", - ProtoConfig { - strategy: Some(StrategyValue { value: 42 }), - size: 42, - metadata_key: Some("foobar".into()), - remove: Some(true), - }, - None, - ), - ( - "should use correct default values", - ProtoConfig { - strategy: None, - size: 42, - metadata_key: None, - remove: None, - }, - Some(Config { - strategy: Strategy::default(), - size: 42, - metadata_key: default_metadata_key(), - remove: default_remove(), - }), - ), - ]; - for (name, proto_config, expected) in test_cases { - let result = Config::try_from(proto_config); - assert_eq!( - result.is_err(), - expected.is_none(), - "{}: error expectation does not match", - name - ); - if let Some(expected) = expected { - assert_eq!(expected, result.unwrap(), "{}", name); - } - } - } -} diff --git a/src/filters/capture_bytes/proto.rs b/src/filters/capture_bytes/proto.rs deleted file mode 100644 index 2c33e22d33..0000000000 --- a/src/filters/capture_bytes/proto.rs +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/// Protobuf config for this filter. -pub(super) mod quilkin { - pub mod extensions { - pub mod filters { - pub mod capture_bytes { - pub mod v1alpha1 { - #![doc(hidden)] - tonic::include_proto!("quilkin.extensions.filters.capture_bytes.v1alpha1"); - } - } - } - } -} diff --git a/src/filters/metadata.rs b/src/filters/metadata.rs index f956c60af7..803d6ba967 100644 --- a/src/filters/metadata.rs +++ b/src/filters/metadata.rs @@ -16,7 +16,7 @@ //! Well known dynamic metadata used by Quilkin. -/// The default key under which the [`super::capture_bytes`] filter puts the +/// The default key under which the [`super::capture`] filter puts the /// byte slices it extracts from each packet. /// - **Type** `Vec` -pub const CAPTURED_BYTES: &str = "quilkin.dev/captured_bytes"; +pub const CAPTURED_BYTES: &str = "quilkin.dev/capture"; diff --git a/src/filters/read.rs b/src/filters/read.rs index 57cf07f4bd..e98b459d0f 100644 --- a/src/filters/read.rs +++ b/src/filters/read.rs @@ -76,6 +76,7 @@ impl From for ReadResponse { /// Some(ctx.into()) /// } /// ``` +#[derive(Debug)] #[non_exhaustive] pub struct ReadResponse { /// The upstream endpoints that the packet should be forwarded to. diff --git a/src/filters/set.rs b/src/filters/set.rs index 7752fe7496..75fd683157 100644 --- a/src/filters/set.rs +++ b/src/filters/set.rs @@ -37,7 +37,7 @@ impl FilterSet { /// - [`local_rate_limit`][filters::local_rate_limit] /// - [`concatenate_bytes`][filters::concatenate_bytes] /// - [`load_balancer`][filters::load_balancer] - /// - [`capture_bytes`][filters::capture_bytes] + /// - [`capture`][filters::capture] /// - [`token_router`][filters::token_router] /// - [`compress`][filters::compress] pub fn default() -> Self { @@ -52,7 +52,7 @@ impl FilterSet { pub fn default_with(filters: impl IntoIterator) -> Self { Self::with( std::array::IntoIter::new([ - filters::capture_bytes::factory(), + filters::capture::factory(), filters::compress::factory(), filters::concatenate_bytes::factory(), filters::debug::factory(), diff --git a/src/lib.rs b/src/lib.rs index 50662c32d6..df02462b57 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,7 +50,7 @@ mod external_doc_tests { #![doc = include_str!("../docs/src/filters/local_rate_limit.md")] #![doc = include_str!("../docs/src/filters/debug.md")] #![doc = include_str!("../docs/src/filters/concatenate_bytes.md")] - #![doc = include_str!("../docs/src/filters/capture_bytes.md")] + #![doc = include_str!("../docs/src/filters/capture.md")] #![doc = include_str!("../docs/src/filters/token_router.md")] #![doc = include_str!("../docs/src/filters/compress.md")] #![doc = include_str!("../docs/src/filters/firewall.md")] diff --git a/tests/capture.rs b/tests/capture.rs new file mode 100644 index 0000000000..ac22ed0aac --- /dev/null +++ b/tests/capture.rs @@ -0,0 +1,88 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +use tokio::time::{timeout, Duration}; + +use quilkin::{ + config::{Builder, Filter}, + endpoint::Endpoint, + filters::{capture, token_router}, + metadata::MetadataView, + test_utils::TestHelper, +}; + +/// This test covers both token_router and capture filters, +/// since they work in concert together. +#[tokio::test] +async fn token_router() { + let mut t = TestHelper::default(); + let echo = t.run_echo_server().await; + + let capture_yaml = r#" +regex: + pattern: .{3}$ +"#; + let endpoint_metadata = " +quilkin.dev: + tokens: + - YWJj # abc + "; + let server_port = 12348; + let server_config = Builder::empty() + .with_port(server_port) + .with_static( + vec![ + Filter { + name: capture::factory().name().into(), + config: serde_yaml::from_str(capture_yaml).unwrap(), + }, + Filter { + name: token_router::factory().name().into(), + config: None, + }, + ], + vec![Endpoint::with_metadata( + echo, + serde_yaml::from_str::>(endpoint_metadata).unwrap(), + )], + ) + .build(); + t.run_server_with_config(server_config); + + // valid packet + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + + let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port); + let msg = b"helloabc"; + socket.send_to(msg, &local_addr).await.unwrap(); + + assert_eq!( + "helloabc", + timeout(Duration::from_secs(5), recv_chan.recv()) + .await + .expect("should have received a packet") + .unwrap() + ); + + // send an invalid packet + let msg = b"helloxyz"; + socket.send_to(msg, &local_addr).await.unwrap(); + + let result = timeout(Duration::from_secs(3), recv_chan.recv()).await; + assert!(result.is_err(), "should not have received a packet"); +} diff --git a/tests/matches.rs b/tests/matches.rs index d54d64bf0d..d033d839eb 100644 --- a/tests/matches.rs +++ b/tests/matches.rs @@ -21,7 +21,7 @@ use tokio::time::{timeout, Duration}; use quilkin::{ config::{Builder, Filter}, endpoint::Endpoint, - filters::{capture_bytes, matches}, + filters::{capture, matches}, test_utils::TestHelper, }; @@ -31,13 +31,14 @@ async fn matches() { let echo = t.run_echo_server().await; let capture_yaml = " -size: 3 -remove: true +suffix: + size: 3 + remove: true "; let matches_yaml = " on_read: - metadataKey: quilkin.dev/captured_bytes + metadataKey: quilkin.dev/capture fallthrough: filter: quilkin.extensions.filters.concatenate_bytes.v1alpha1.ConcatenateBytes config: @@ -61,7 +62,7 @@ on_read: .with_static( vec![ Filter { - name: capture_bytes::factory().name().into(), + name: capture::factory().name().into(), config: serde_yaml::from_str(capture_yaml).unwrap(), }, Filter { diff --git a/tests/token_router.rs b/tests/token_router.rs index 08ebffee99..c45f573ea6 100644 --- a/tests/token_router.rs +++ b/tests/token_router.rs @@ -21,12 +21,12 @@ use tokio::time::{timeout, Duration}; use quilkin::{ config::{Builder, Filter}, endpoint::Endpoint, - filters::{capture_bytes, token_router}, + filters::{capture, token_router}, metadata::MetadataView, test_utils::TestHelper, }; -/// This test covers both token_router and capture_bytes filters, +/// This test covers both token_router and capture filters, /// since they work in concert together. #[tokio::test] async fn token_router() { @@ -34,8 +34,9 @@ async fn token_router() { let echo = t.run_echo_server().await; let capture_yaml = " -size: 3 -remove: true +suffix: + size: 3 + remove: true "; let endpoint_metadata = " quilkin.dev: @@ -48,7 +49,7 @@ quilkin.dev: .with_static( vec![ Filter { - name: capture_bytes::factory().name().into(), + name: capture::factory().name().into(), config: serde_yaml::from_str(capture_yaml).unwrap(), }, Filter {