Skip to content

Commit

Permalink
WIP: Add regex capture and rename capture_bytes to capture
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Dec 24, 2021
1 parent a31ec4a commit c3e6877
Show file tree
Hide file tree
Showing 12 changed files with 273 additions and 153 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ tryhard = "0.4.0"
eyre = "0.6.5"
stable-eyre = "0.2.2"
ipnetwork = "0.18.0"
regex = "1.5.4"
serde_regex = "1.1.0"

[target.'cfg(target_os = "linux")'.dependencies]
sys-info = "0.9.0"
Expand Down
2 changes: 1 addition & 1 deletion src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod write;
pub(crate) mod chain;
pub(crate) mod manager;

pub mod capture_bytes;
pub mod capture;
pub mod compress;
pub mod concatenate_bytes;
pub mod debug;
Expand Down
105 changes: 55 additions & 50 deletions src/filters/capture_bytes.rs → src/filters/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,69 +14,69 @@
* limitations under the License.
*/

mod capture;
mod affix;
mod config;
mod metrics;
mod proto;
mod regex;

use std::sync::Arc;

use tracing::warn;
use crate::{filters::prelude::*, metadata::{DynamicMetadata, Value}};

use crate::{filters::prelude::*, metadata::Value};

use capture::Capture;
use metrics::Metrics;
use proto::quilkin::extensions::filters::capture_bytes::v1alpha1::CaptureBytes as ProtoConfig;
use self::{
affix::{Suffix, Prefix},
metrics::Metrics,
proto::quilkin::extensions::filters::capture_bytes::v1alpha1::CaptureBytes as ProtoConfig,
regex::Regex,
};

pub use config::{Config, Strategy};

pub const NAME: &str = "quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes";
pub const NAME: &str = "quilkin.extensions.filters.capture.v1alpha1.Capture";

/// Creates a new factory for generating capture filters.
pub fn factory() -> DynFilterFactory {
Box::from(CaptureBytesFactory::new())
}

struct CaptureBytes {
capture: Box<dyn Capture + Sync + Send>,
/// Trait to implement different strategies for capturing packet data.
pub trait CaptureStrategy {
/// Capture packet data from the contents, and optionally returns a value if
/// anything was captured.
fn capture(&self, contents: &mut Vec<u8>, metrics: &Metrics) -> Option<Value>;
}

struct Capture {
capture: Box<dyn CaptureStrategy + Sync + Send>,
/// metrics reporter for this filter.
metrics: Metrics,
metadata_key: Arc<String>,
size: usize,
remove: bool,
}

impl CaptureBytes {
impl Capture {
fn new(config: Config, metrics: Metrics) -> Self {
CaptureBytes {
capture: config.strategy.as_capture(),
Self {
capture: config.strategy.into_capture(),
metrics,
metadata_key: Arc::new(config.metadata_key),
size: config.size,
remove: config.remove,
}
}
}

impl Filter for CaptureBytes {
impl Filter for Capture {
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
// if the capture size is bigger than the packet size, then we drop the packet,
// and occasionally warn
if ctx.contents.len() < self.size {
if self.metrics.packets_dropped_total.get() % 1000 == 0 {
warn!(count = ?self.metrics.packets_dropped_total.get(), "Packets are being dropped due to their length being less than {} bytes", self.size);
}
self.metrics.packets_dropped_total.inc();
return None;
}
let token = self
.capture
.capture(&mut ctx.contents, self.size, self.remove);

ctx.metadata
.insert(self.metadata_key.clone(), Value::Bytes(token.into()));
let capture = self.capture.capture(&mut ctx.contents, &self.metrics);
let mut capture_key = (*self.metadata_key).clone();
capture_key += "/captured";

ctx.metadata.insert(Arc::from(capture_key), Value::Bool(capture.is_some()));

if let Some(value) = capture {
ctx.metadata.insert(self.metadata_key.clone(), value);
}

Some(ctx.into())
}
Expand All @@ -98,7 +98,7 @@ impl FilterFactory for CaptureBytesFactory {
let (config_json, config) = self
.require_config(args.config)?
.deserialize::<Config, ProtoConfig>(self.name())?;
let filter = CaptureBytes::new(config, Metrics::new(&args.metrics_registry)?);
let filter = Capture::new(config, Metrics::new(&args.metrics_registry)?);
Ok(FilterInstance::new(
config_json,
Box::new(filter) as Box<dyn Filter>,
Expand All @@ -116,18 +116,16 @@ mod tests {
use crate::endpoint::{Endpoint, Endpoints};
use crate::test_utils::assert_write_no_change;

use super::{CaptureBytes, CaptureBytesFactory, Config, Metrics, Strategy};

use super::capture::{Capture, Prefix, Suffix};
use super::*;

use crate::filters::{
metadata::CAPTURED_BYTES, CreateFilterArgs, Filter, FilterFactory, ReadContext,
};

const TOKEN_KEY: &str = "TOKEN";

fn capture_bytes(config: Config) -> CaptureBytes {
CaptureBytes::new(config, Metrics::new(&Registry::default()).unwrap())
fn capture_bytes(config: Config) -> Capture {
Capture::new(config, Metrics::new(&Registry::default()).unwrap())
}

#[test]
Expand Down Expand Up @@ -186,22 +184,25 @@ mod tests {
#[test]
fn read() {
let config = Config {
strategy: Strategy::Suffix,
metadata_key: TOKEN_KEY.into(),
size: 3,
remove: true,
strategy: Strategy::Suffix(Suffix {
size: 3,
remove: true,
}),
};

let filter = capture_bytes(config);
assert_end_strategy(&filter, TOKEN_KEY, true);
}

#[test]
fn read_overflow_capture_size() {
let config = Config {
strategy: Strategy::Suffix,
metadata_key: TOKEN_KEY.into(),
size: 99,
remove: true,
strategy: Strategy::Suffix(Suffix {
size: 99,
remove: true,
}),
};
let filter = capture_bytes(config);
let endpoints = vec![Endpoint::new("127.0.0.1:81".parse().unwrap())];
Expand All @@ -219,24 +220,28 @@ mod tests {
#[test]
fn write() {
let config = Config {
strategy: Strategy::Suffix,
strategy: Strategy::Suffix(Suffix { size: 0, remove: false }),
metadata_key: TOKEN_KEY.into(),
size: 0,
remove: false,
};
let filter = capture_bytes(config);
assert_write_no_change(&filter);
}

#[test]
fn end_capture() {
let end = Suffix {};
let metrics = Metrics::new(&Registry::default()).unwrap();
let mut end = Suffix {
size: 3,
remove: false,
};
let mut contents = b"helloabc".to_vec();
let result = end.capture(&mut contents, 3, false);
let result = end.capture(&mut contents, &metrics).unwrap();
assert_eq!(b"abc".to_vec(), result);
assert_eq!(b"helloabc".to_vec(), contents);

let result = end.capture(&mut contents, 3, true);
end.remove = true;

let result = end.capture(&mut contents, &metrics).unwrap();
assert_eq!(b"abc".to_vec(), result);
assert_eq!(b"hello".to_vec(), contents);
}
Expand Down
63 changes: 63 additions & 0 deletions src/filters/capture/affix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use crate::metadata::Value;

use super::Metrics;

fn is_valid_size(contents: &[u8], size: usize, metrics: &Metrics) -> bool {
// if the capture size is bigger than the packet size, then we drop the packet,
// and occasionally warn
if contents.len() < size {
if metrics.packets_dropped_total.get() % 1000 == 0 {
tracing::warn!(count = ?metrics.packets_dropped_total.get(), "Packets are being dropped due to their length being less than {} bytes", size);
}
metrics.packets_dropped_total.inc();

false
} else {
true
}
}

/// Capture from the start of the packet.
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
pub struct Prefix {
/// Whether captured bytes are removed from the original packet.
#[serde(default)]
pub remove: bool,
/// The number of bytes to capture.
pub size: usize,
}

impl super::CaptureStrategy for Prefix {
fn capture(&self, contents: &mut Vec<u8>, metrics: &Metrics) -> Option<Value> {
is_valid_size(contents, self.size, metrics).then(|| {
if self.remove {
Value::Bytes(contents.drain(..self.size).collect())
} else {
Value::Bytes(contents.iter().take(self.size).copied().collect())
}
})
}
}

/// Capture from the end of the packet.
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
pub struct Suffix {
/// Whether captured bytes are removed from the original packet.
pub size: usize,
/// The number of bytes to capture.
pub remove: bool,
}

impl super::CaptureStrategy for Suffix {
fn capture(&self, contents: &mut Vec<u8>, metrics: &Metrics) -> Option<Value> {
is_valid_size(contents, self.size, metrics).then(|| {
let index = contents.len() - self.size;

if self.remove {
Value::Bytes(contents.split_off(index).into())
} else {
Value::Bytes(contents.iter().skip(index).copied().collect())
}
})
}
}
Loading

0 comments on commit c3e6877

Please sign in to comment.