Skip to content

Commit

Permalink
enhancement(lua transform): Add version configuration option (#2056)
Browse files Browse the repository at this point in the history
* enhancement(lua transform): Add "version" configuration option

Signed-off-by: Alexander Rodin <[email protected]>

* Don't expose internal typetag names `lua_v1` and `lua_v1` to users

Signed-off-by: Alexander Rodin <[email protected]>

* Add `version` configuration option to the documentation

Signed-off-by: Alexander Rodin <[email protected]>

* Use independent implementations of `rlua::UserData` for `Event`

Signed-off-by: Alexander Rodin <[email protected]>

* Fix benchmarks

Signed-off-by: Alexander Rodin <[email protected]>

* Benchmark both version 1 and version 2

Signed-off-by: Alexander Rodin <[email protected]>

* Don't show version 2 in the docs

Signed-off-by: Alexander Rodin <[email protected]>

* Add behavior tests for `version` configuration option

Signed-off-by: Alexander Rodin <[email protected]>

* Update CODEOWNERS to match the new directory structure

Signed-off-by: Alexander Rodin <[email protected]>
  • Loading branch information
a-rodin authored Mar 16, 2020
1 parent e410454 commit 874c0c0
Show file tree
Hide file tree
Showing 9 changed files with 755 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
/src/transforms/grok_parser.rs @lukesteensen
/src/transforms/json_parser.rs @LucioFranco
/src/transforms/log_to_metric.rs @lukesteensen
/src/transforms/lua.rs @a-rodin
/src/transforms/lua/ @a-rodin
/src/transforms/merge.rs @MOZGIII
/src/transforms/regex_parser.rs @lukesteensen
/src/transforms/remove_fields.rs @LucioFranco
Expand Down
8 changes: 8 additions & 0 deletions .meta/transforms/lua.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ requirements = {}

<%= render("_partials/_component_options.toml", type: "transform", name: "lua") %>

[transforms.lua.options.version]
type = "string"
common = true
required = false
description = "transform API version"
default = "1"
enum = { 1 = "transform API version 1" }

[transforms.lua.options.source]
type = "string"
common = true
Expand Down
60 changes: 50 additions & 10 deletions benches/lua.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ fn add_fields(c: &mut Criterion) {
let key = "the key";
let value = "this is the value";

let key_atom = key.into();
let value_bytes = value.into();
let key_atom2 = key.into();
let value_bytes2 = value.into();
let key_atom_native = key.into();
let value_bytes_native = value.into();
let key_atom_v1 = key.into();
let value_bytes_v1 = value.into();
let key_atom_v2 = key.into();
let value_bytes_v2 = value.into();

c.bench(
"lua_add_fields",
Expand All @@ -30,22 +32,37 @@ fn add_fields(c: &mut Criterion) {
for _ in 0..num_events {
let event = Event::new_empty_log();
let event = transform.transform(event).unwrap();
assert_eq!(event.as_log()[&key_atom], value_bytes);
assert_eq!(event.as_log()[&key_atom_native], value_bytes_native);
}
},
)
})
.with_function("lua", move |b| {
.with_function("v1", move |b| {
b.iter_with_setup(
|| {
let source = format!("event['{}'] = '{}'", key, value);
transforms::lua::Lua::new(&source, vec![]).unwrap()
transforms::lua::v1::Lua::new(&source, vec![]).unwrap()
},
|mut transform| {
for _ in 0..num_events {
let event = Event::new_empty_log();
let event = transform.transform(event).unwrap();
assert_eq!(event.as_log()[&key_atom2], value_bytes2);
assert_eq!(event.as_log()[&key_atom_v1], value_bytes_v1);
}
},
)
})
.with_function("v2", move |b| {
b.iter_with_setup(
|| {
let source = format!("event['{}'] = '{}'", key, value);
transforms::lua::v2::Lua::new(&source, vec![]).unwrap()
},
|mut transform| {
for _ in 0..num_events {
let event = Event::new_empty_log();
let event = transform.transform(event).unwrap();
assert_eq!(event.as_log()[&key_atom_v2], value_bytes_v2);
}
},
)
Expand Down Expand Up @@ -83,15 +100,38 @@ fn field_filter(c: &mut Criterion) {
},
)
})
.with_function("lua", move |b| {
.with_function("v1", move |b| {
b.iter_with_setup(
|| {
let source = r#"
if event["the_field"] ~= "0" then
event = nil
end
"#;
transforms::lua::v1::Lua::new(&source, vec![]).unwrap()
},
|mut transform| {
let num = (0..num_events)
.map(|i| {
let mut event = Event::new_empty_log();
event.as_mut_log().insert("the_field", (i % 10).to_string());
event
})
.filter_map(|r| transform.transform(r))
.count();
assert_eq!(num, num_events / 10);
},
)
})
.with_function("v2", move |b| {
b.iter_with_setup(
|| {
let source = r#"
if event["the_field"] ~= "0" then
event = nil
end
"#;
transforms::lua::Lua::new(&source, vec![]).unwrap()
transforms::lua::v2::Lua::new(&source, vec![]).unwrap()
},
|mut transform| {
let num = (0..num_events)
Expand Down
8 changes: 8 additions & 0 deletions config/vector.spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1869,6 +1869,14 @@ end
# * type: [string]
search_dirs = ["/etc/vector/lua"]

# transform API version
#
# * optional
# * default: "1"
# * type: string
# * must be: "1" (if supplied)
version = "1"

# Accepts and outputs `log` events allowing you to merge partial log events into a single event.
[transforms.merge]
# The component type. This is a required field that tells Vector which
Expand Down
78 changes: 78 additions & 0 deletions src/transforms/lua/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
pub mod v1;
pub mod v2;

use crate::{
topology::config::{DataType, TransformConfig, TransformContext, TransformDescription},
transforms::Transform,
};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
enum V1 {
#[serde(rename = "1")]
V1,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct LuaConfigV1 {
version: Option<V1>,
#[serde(flatten)]
config: v1::LuaConfig,
}

#[derive(Serialize, Deserialize, Debug)]
enum V2 {
#[serde(rename = "2")]
V2,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct LuaConfigV2 {
version: V2,
#[serde(flatten)]
config: v2::LuaConfig,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum LuaConfig {
V1(LuaConfigV1),
V2(LuaConfigV2),
}

inventory::submit! {
TransformDescription::new_without_default::<LuaConfig>("lua")
}

#[typetag::serde(name = "lua")]
impl TransformConfig for LuaConfig {
fn build(&self, cx: TransformContext) -> crate::Result<Box<dyn Transform>> {
match self {
LuaConfig::V1(v1) => v1.config.build(cx),
LuaConfig::V2(v2) => v2.config.build(cx),
}
}

fn input_type(&self) -> DataType {
match self {
LuaConfig::V1(v1) => v1.config.input_type(),
LuaConfig::V2(v2) => v2.config.input_type(),
}
}

fn output_type(&self) -> DataType {
match self {
LuaConfig::V1(v1) => v1.config.output_type(),
LuaConfig::V2(v2) => v2.config.output_type(),
}
}

fn transform_type(&self) -> &'static str {
match self {
LuaConfig::V1(v1) => v1.config.transform_type(),
LuaConfig::V2(v2) => v2.config.transform_type(),
}
}
}
68 changes: 41 additions & 27 deletions src/transforms/lua.rs → src/transforms/lua/v1/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::Transform;
use crate::{
event::{Event, Value},
topology::config::{DataType, TransformConfig, TransformContext, TransformDescription},
topology::config::{DataType, TransformContext},
transforms::Transform,
};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
Expand All @@ -20,28 +20,29 @@ pub struct LuaConfig {
search_dirs: Vec<String>,
}

inventory::submit! {
TransformDescription::new_without_default::<LuaConfig>("lua")
}

#[typetag::serde(name = "lua")]
impl TransformConfig for LuaConfig {
fn build(&self, _cx: TransformContext) -> crate::Result<Box<dyn Transform>> {
// Implementation of methods from `TransformConfig`
// Note that they are implemented as struct methods instead of trait implementation methods
// because `TransformConfig` trait requires specification of a unique `typetag::serde` name.
// Specifying some name (for example, "lua_v*") results in this name being listed among
// possible configuration options for `transforms` section, but such internal name should not
// be exposed to users.
impl LuaConfig {
pub fn build(&self, _cx: TransformContext) -> crate::Result<Box<dyn Transform>> {
Lua::new(&self.source, self.search_dirs.clone()).map(|l| {
let b: Box<dyn Transform> = Box::new(l);
b
})
}

fn input_type(&self) -> DataType {
pub fn input_type(&self) -> DataType {
DataType::Log
}

fn output_type(&self) -> DataType {
pub fn output_type(&self) -> DataType {
DataType::Log
}

fn transform_type(&self) -> &'static str {
pub fn transform_type(&self) -> &'static str {
"lua"
}
}
Expand All @@ -59,6 +60,13 @@ pub struct Lua {
invocations_after_gc: usize,
}

// This wrapping structure is added in order to make it possible to have independent implementations
// of `rlua::UserData` trait for event in version 1 and version 2 of the transform.
#[derive(Clone)]
struct LuaEvent {
inner: Event,
}

impl Lua {
pub fn new(source: &str, search_dirs: Vec<String>) -> crate::Result<Self> {
let lua = rlua::Lua::new();
Expand Down Expand Up @@ -95,11 +103,13 @@ impl Lua {
let result = self.lua.context(|ctx| {
let globals = ctx.globals();

globals.set("event", event)?;
globals.set("event", LuaEvent { inner: event })?;

let func = ctx.named_registry_value::<_, rlua::Function<'_>>("vector_func")?;
func.call(())?;
globals.get::<_, Option<Event>>("event")
globals
.get::<_, Option<LuaEvent>>("event")
.map(|option| option.map(|lua_event| lua_event.inner))
});
self.invocations_after_gc += 1;
if self.invocations_after_gc % GC_INTERVAL == 0 {
Expand All @@ -122,26 +132,26 @@ impl Transform for Lua {
}
}

impl rlua::UserData for Event {
impl rlua::UserData for LuaEvent {
fn add_methods<'lua, M: rlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method_mut(
rlua::MetaMethod::NewIndex,
|_ctx, this, (key, value): (String, Option<rlua::Value<'lua>>)| {
match value {
Some(rlua::Value::String(string)) => {
this.as_mut_log().insert(key, string.as_bytes());
this.inner.as_mut_log().insert(key, string.as_bytes());
}
Some(rlua::Value::Integer(integer)) => {
this.as_mut_log().insert(key, Value::Integer(integer));
this.inner.as_mut_log().insert(key, Value::Integer(integer));
}
Some(rlua::Value::Number(number)) => {
this.as_mut_log().insert(key, Value::Float(number));
this.inner.as_mut_log().insert(key, Value::Float(number));
}
Some(rlua::Value::Boolean(boolean)) => {
this.as_mut_log().insert(key, Value::Boolean(boolean));
this.inner.as_mut_log().insert(key, Value::Boolean(boolean));
}
Some(rlua::Value::Nil) | None => {
this.as_mut_log().remove(&key.into());
this.inner.as_mut_log().remove(&key.into());
}
_ => {
info!(
Expand All @@ -150,7 +160,7 @@ impl rlua::UserData for Event {
field = key.as_str(),
rate_limit_secs = 30
);
this.as_mut_log().remove(&key.into());
this.inner.as_mut_log().remove(&key.into());
}
}

Expand All @@ -159,29 +169,33 @@ impl rlua::UserData for Event {
);

methods.add_meta_method(rlua::MetaMethod::Index, |ctx, this, key: String| {
if let Some(value) = this.as_log().get(&key.into()) {
if let Some(value) = this.inner.as_log().get(&key.into()) {
let string = ctx.create_string(&value.as_bytes())?;
Ok(Some(string))
} else {
Ok(None)
}
});

methods.add_meta_function(rlua::MetaMethod::Pairs, |ctx, event: Event| {
methods.add_meta_function(rlua::MetaMethod::Pairs, |ctx, event: LuaEvent| {
let state = ctx.create_table()?;
{
let keys =
ctx.create_table_from(event.as_log().keys().map(|k| (k.to_string(), true)))?;
let keys = ctx.create_table_from(
event.inner.as_log().keys().map(|k| (k.to_string(), true)),
)?;
state.set("event", event)?;
state.set("keys", keys)?;
}
let function =
ctx.create_function(|ctx, (state, prev): (rlua::Table, Option<String>)| {
let event: Event = state.get("event")?;
let event: LuaEvent = state.get("event")?;
let keys: rlua::Table = state.get("keys")?;
let next: rlua::Function = ctx.globals().get("next")?;
let key: Option<String> = next.call((keys, prev))?;
match key.clone().and_then(|k| event.as_log().get(&k.into())) {
match key
.clone()
.and_then(|k| event.inner.as_log().get(&k.into()))
{
Some(value) => Ok((key, Some(ctx.create_string(&value.as_bytes())?))),
None => Ok((None, None)),
}
Expand Down
Loading

0 comments on commit 874c0c0

Please sign in to comment.