diff --git a/libbeat/processors/actions/add_fields.go b/libbeat/processors/actions/add_fields.go index 3a9f3625cee8..710810d2a52b 100644 --- a/libbeat/processors/actions/add_fields.go +++ b/libbeat/processors/actions/add_fields.go @@ -36,12 +36,13 @@ const FieldsKey = "fields" func init() { processors.RegisterPlugin("add_fields", - configChecked(createAddFields, + configChecked(CreateAddFields, requireFields(FieldsKey), allowedFields(FieldsKey, "target", "when"))) } -func createAddFields(c *common.Config) (processors.Processor, error) { +// CreateAddFields constructs an add_fields processor from config. +func CreateAddFields(c *common.Config) (processors.Processor, error) { config := struct { Fields common.MapStr `config:"fields" validate:"required"` Target *string `config:"target"` diff --git a/libbeat/processors/actions/copy_fields.go b/libbeat/processors/actions/copy_fields.go index 95f39d5137b6..c57447ac6ee8 100644 --- a/libbeat/processors/actions/copy_fields.go +++ b/libbeat/processors/actions/copy_fields.go @@ -40,13 +40,14 @@ type copyFieldsConfig struct { func init() { processors.RegisterPlugin("copy_fields", - configChecked(newCopyFields, + configChecked(NewCopyFields, requireFields("fields"), ), ) } -func newCopyFields(c *common.Config) (processors.Processor, error) { +// NewCopyFields returns a new copy_fields processor. +func NewCopyFields(c *common.Config) (processors.Processor, error) { config := copyFieldsConfig{ IgnoreMissing: false, FailOnError: true, diff --git a/libbeat/processors/actions/rename.go b/libbeat/processors/actions/rename.go index 7eded9bf23fe..2fc578e3cb82 100644 --- a/libbeat/processors/actions/rename.go +++ b/libbeat/processors/actions/rename.go @@ -45,11 +45,12 @@ type fromTo struct { func init() { processors.RegisterPlugin("rename", - configChecked(newRenameFields, + configChecked(NewRenameFields, requireFields("fields"))) } -func newRenameFields(c *common.Config) (processors.Processor, error) { +// NewRenameFields returns a new rename processor. +func NewRenameFields(c *common.Config) (processors.Processor, error) { config := renameFieldsConfig{ IgnoreMissing: false, FailOnError: true, diff --git a/libbeat/processors/actions/truncate_fields.go b/libbeat/processors/actions/truncate_fields.go index b6b5db6b69c3..70244b2cfa12 100644 --- a/libbeat/processors/actions/truncate_fields.go +++ b/libbeat/processors/actions/truncate_fields.go @@ -48,14 +48,15 @@ type truncater func(*truncateFields, []byte) ([]byte, bool, error) func init() { processors.RegisterPlugin("truncate_fields", - configChecked(newTruncateFields, + configChecked(NewTruncateFields, requireFields("fields"), mutuallyExclusiveRequiredFields("max_bytes", "max_characters"), ), ) } -func newTruncateFields(c *common.Config) (processors.Processor, error) { +// NewTruncateFields returns a new truncate_fields processor. +func NewTruncateFields(c *common.Config) (processors.Processor, error) { var config truncateFieldsConfig err := c.Unpack(&config) if err != nil { diff --git a/libbeat/processors/script/javascript/module/processor/chain.go b/libbeat/processors/script/javascript/module/processor/chain.go new file mode 100644 index 000000000000..f86300c213b4 --- /dev/null +++ b/libbeat/processors/script/javascript/module/processor/chain.go @@ -0,0 +1,183 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package processor + +import ( + "github.com/dop251/goja" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/script/javascript" +) + +// chainBuilder builds a new processor chain. +type chainBuilder struct { + chain + runtime *goja.Runtime + this *goja.Object +} + +// newChainBuilder returns a javascript constructor that constructs a +// chainBuilder. +func newChainBuilder(runtime *goja.Runtime) func(call goja.ConstructorCall) *goja.Object { + return func(call goja.ConstructorCall) *goja.Object { + if len(call.Arguments) > 0 { + panic(runtime.NewGoError(errors.New("Chain accepts no arguments"))) + } + + c := &chainBuilder{runtime: runtime, this: call.This} + for name, fn := range constructors { + c.this.Set(name, c.makeBuilderFunc(fn)) + } + call.This.Set("Add", c.Add) + call.This.Set("Build", c.Build) + + return nil + } +} + +// makeBuilderFunc returns a javascript function that constructs a new native +// beat processor and adds it to the chain. +func (b *chainBuilder) makeBuilderFunc(constructor processors.Constructor) func(call goja.FunctionCall) goja.Value { + return func(call goja.FunctionCall) goja.Value { + return b.addProcessor(constructor, call) + } +} + +// addProcessor constructs a new native beat processor and adds it to the chain. +func (b *chainBuilder) addProcessor(constructor processors.Constructor, call goja.FunctionCall) *goja.Object { + p, err := newNativeProcessor(constructor, call) + if err != nil { + panic(b.runtime.NewGoError(err)) + } + + b.procs = append(b.procs, p) + return b.this +} + +// Add adds a processor to the chain. It requires one argument that can be +// either a beat processor or a javascript function. +func (b *chainBuilder) Add(call goja.FunctionCall) goja.Value { + a0 := call.Argument(0) + if goja.IsUndefined(a0) { + panic(b.runtime.NewGoError(errors.New("Add requires a processor object parameter, but got undefined"))) + } + + switch v := a0.Export().(type) { + case *beatProcessor: + b.procs = append(b.procs, v.p) + case func(goja.FunctionCall) goja.Value: + b.procs = append(b.procs, &jsProcessor{fn: v}) + default: + panic(b.runtime.NewGoError(errors.Errorf("arg0 must be a processor object, but got %T", a0.Export()))) + } + + return b.this +} + +// Build returns a processor consisting of the previously added processors. +func (b *chainBuilder) Build(call goja.FunctionCall) goja.Value { + if len(b.procs) == 0 { + b.runtime.NewGoError(errors.New("no processors have been added to the chain")) + } + + p := &beatProcessor{b.runtime, &b.chain} + o := b.runtime.NewObject() + o.Set("Run", p.Run) + return o +} + +type gojaCall interface { + Argument(idx int) goja.Value +} + +type jsFunction func(call goja.FunctionCall) goja.Value + +type processor interface { + run(event javascript.Event) error +} + +// jsProcessor is a javascript function that accepts the event as a parameter. +type jsProcessor struct { + fn jsFunction + call goja.FunctionCall +} + +func (p *jsProcessor) run(event javascript.Event) error { + p.call.Arguments = p.call.Arguments[0:] + p.call.Arguments = append(p.call.Arguments, event.JSObject()) + p.fn(p.call) + return nil +} + +// nativeProcessor is a normal Beat processor. +type nativeProcessor struct { + processors.Processor +} + +func newNativeProcessor(constructor processors.Constructor, call gojaCall) (processor, error) { + var config *common.Config + + if a0 := call.Argument(0); !goja.IsUndefined(a0) { + var err error + config, err = common.NewConfigFrom(a0.Export()) + if err != nil { + return nil, err + } + } else { + // No config so use an empty config. + config = common.NewConfig() + } + + p, err := constructor(config) + if err != nil { + return nil, err + } + return &nativeProcessor{p}, nil +} + +func (p *nativeProcessor) run(event javascript.Event) error { + out, err := p.Processor.Run(event.Wrapped()) + if err != nil { + return err + } + if out == nil { + event.Cancel() + } + return nil +} + +// chain is a list of processors to run serially to process an event. +type chain struct { + procs []processor +} + +func (c *chain) run(event javascript.Event) error { + for _, p := range c.procs { + if event.IsCancelled() { + return nil + } + + if err := p.run(event); err != nil { + return err + } + } + + return nil +} diff --git a/libbeat/processors/script/javascript/module/processor/processor.go b/libbeat/processors/script/javascript/module/processor/processor.go index 0c48764eec63..9acb38cfed5f 100644 --- a/libbeat/processors/script/javascript/module/processor/processor.go +++ b/libbeat/processors/script/javascript/module/processor/processor.go @@ -22,7 +22,6 @@ import ( "github.com/dop251/goja_nodejs/require" "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/processors/actions" "github.com/elastic/beats/libbeat/processors/add_cloud_metadata" @@ -37,91 +36,103 @@ import ( "github.com/elastic/beats/libbeat/processors/script/javascript" ) -// newConstructor returns a JS constructor function. The constructor wraps a -// beat processor constructor. The javascript constructor must be passed a value -// that can be treated as the processor's config. -func newConstructor( - s *goja.Runtime, - constructor processors.Constructor, -) func(call goja.ConstructorCall) *goja.Object { - return func(call goja.ConstructorCall) *goja.Object { - a0 := call.Argument(0) - if a0 == nil { - panic(s.ToValue("constructor requires a configuration arg")) - } - - commonConfig, err := common.NewConfigFrom(a0.Export()) - if err != nil { - panic(s.NewGoError(err)) - } - - p, err := constructor(commonConfig) - if err != nil { - panic(s.NewGoError(err)) - } - - bp := &beatProcessor{s, p} - call.This.Set("Run", bp.Run) - return nil - } +// Create constructors for most of the Beat processors. +// Note that script is omitted to avoid nesting. +var constructors = map[string]processors.Constructor{ + "AddCloudMetadata": add_cloud_metadata.New, + "AddDockerMetadata": add_docker_metadata.New, + "AddFields": actions.CreateAddFields, + "AddHostMetadata": add_host_metadata.New, + "AddKubernetesMetadata": add_kubernetes_metadata.New, + "AddLocale": add_locale.New, + "AddProcessMetadata": add_process_metadata.New, + "CommunityID": communityid.New, + "CopyFields": actions.NewCopyFields, + "DecodeJSONFields": actions.NewDecodeJSONFields, + "Dissect": dissect.NewProcessor, + "DNS": dns.New, + "Rename": actions.NewRenameFields, + "TruncateFields": actions.NewTruncateFields, } -// beatProcessor wraps a beat processor for javascript. +// beatProcessor wraps a processor for javascript. type beatProcessor struct { - runtime *goja.Runtime - p processors.Processor + rt *goja.Runtime + p processor } func (bp *beatProcessor) Run(call goja.FunctionCall) goja.Value { if len(call.Arguments) != 1 { - panic(bp.runtime.NewGoError(errors.New("Run requires one argument"))) + panic(bp.rt.NewGoError(errors.New("Run requires one argument"))) } - e, ok := call.Argument(0).ToObject(bp.runtime).Get("_private").Export().(javascript.Event) + e, ok := call.Argument(0).ToObject(bp.rt).Get("_private").Export().(javascript.Event) if !ok { - panic(bp.runtime.NewGoError(errors.New("arg 0 must be an Event"))) + panic(bp.rt.NewGoError(errors.New("arg 0 must be an Event"))) } if e.IsCancelled() { return goja.Null() } - beatEvent, err := bp.p.Run(e.Wrapped()) + err := bp.p.run(e) if err != nil { - panic(bp.runtime.NewGoError(err)) + panic(bp.rt.NewGoError(err)) } - if beatEvent == nil { - e.Cancel() + if e.IsCancelled() { return goja.Null() } return e.JSObject() } +// newConstructor returns a Javascript constructor function that constructs a +// Beat processor. The constructor wraps a beat processor constructor. The +// javascript constructor must be passed a value that can be treated as the +// processor's config. +func newConstructor( + runtime *goja.Runtime, + constructor processors.Constructor, +) func(call goja.ConstructorCall) *goja.Object { + return func(call goja.ConstructorCall) *goja.Object { + p, err := newNativeProcessor(constructor, call) + if err != nil { + panic(runtime.NewGoError(err)) + } + + bp := &beatProcessor{runtime, p} + return runtime.ToValue(bp).ToObject(nil) + } +} + // Require registers the processor module that exposes constructors for beat // processors from javascript. // // // javascript // var processor = require('processor'); +// +// // Construct a single processor. // var chopLog = new processor.Dissect({tokenizer: "%{key}: %{value}"}); // +// // Construct/compose a processor chain. +// var mutateLog = new processor.Chain() +// .Add(chopLog) +// .AddProcessMetadata({match_pids: [process.pid]}) +// .Add(function(evt) { +// evt.Put("hello", "world"); +// }) +// .Build(); +// func Require(runtime *goja.Runtime, module *goja.Object) { o := module.Get("exports").(*goja.Object) - // Create constructors for most of the Beat processors. - // Note that script to avoid nesting. And some of the actions like rename - // and add_tags are omitted because those can be done natively in JS. - o.Set("AddCloudMetadata", newConstructor(runtime, add_cloud_metadata.New)) - o.Set("AddDockerMetadata", newConstructor(runtime, add_docker_metadata.New)) - o.Set("AddHostMetadata", newConstructor(runtime, add_host_metadata.New)) - o.Set("AddKubernetesMetadata", newConstructor(runtime, add_kubernetes_metadata.New)) - o.Set("AddLocale", newConstructor(runtime, add_locale.New)) - o.Set("AddProcessMetadata", newConstructor(runtime, add_process_metadata.New)) - o.Set("CommunityID", newConstructor(runtime, communityid.New)) - o.Set("DecodeJSONFields", newConstructor(runtime, actions.NewDecodeJSONFields)) - o.Set("Dissect", newConstructor(runtime, dissect.NewProcessor)) - o.Set("DNS", newConstructor(runtime, dns.New)) + for name, fn := range constructors { + o.Set(name, newConstructor(runtime, fn)) + } + + // Chain returns a builder for creating a chain of processors. + o.Set("Chain", newChainBuilder(runtime)) } // Enable adds path to the given runtime. diff --git a/libbeat/processors/script/javascript/module/processor/processor_test.go b/libbeat/processors/script/javascript/module/processor/processor_test.go index 679398300e56..8a6162bfeec4 100644 --- a/libbeat/processors/script/javascript/module/processor/processor_test.go +++ b/libbeat/processors/script/javascript/module/processor/processor_test.go @@ -158,6 +158,36 @@ function process(evt) { assert.Equal(t, "1:15+Ly6HsDg0sJdTmNktf6rko+os=", id) } +func TestNewCopyFields(t *testing.T) { + const script = ` +var processor = require('processor'); + +var copy = new processor.CopyFields({ + fields: [ + {from: "message", to: "log.original"}, + ], +}); + +function process(evt) { + copy.Run(evt); +} +` + + logp.TestingSetup() + p, err := javascript.NewFromConfig(javascript.Config{Source: script}, nil) + if err != nil { + t.Fatal(err) + } + + evt, err := p.Run(testEvent()) + if err != nil { + t.Fatal(err) + } + + _, err = evt.GetValue("log.original") + assert.NoError(t, err) +} + func TestNewProcessorDecodeJSONFields(t *testing.T) { const script = ` var processor = require('processor'); @@ -239,7 +269,7 @@ var dns = new processor.DNS({ function process(evt) { dns.Run(evt); if (evt.Get().tags[0] !== "_dns_reverse_lookup_failed") { - throw "missing tag" + throw "missing tag"; } } ` @@ -255,3 +285,104 @@ function process(evt) { t.Fatal(err) } } + +func TestNewRename(t *testing.T) { + const script = ` +var processor = require('processor'); + +var rename = new processor.Rename({ + fields: [ + {from: "message", to: "log.original"}, + ], +}); + +function process(evt) { + rename.Run(evt); +} +` + + logp.TestingSetup() + p, err := javascript.NewFromConfig(javascript.Config{Source: script}, nil) + if err != nil { + t.Fatal(err) + } + + evt, err := p.Run(testEvent()) + if err != nil { + t.Fatal(err) + } + + _, err = evt.GetValue("log.original") + assert.NoError(t, err) +} + +func TestNewTruncateFields(t *testing.T) { + const script = ` +var processor = require('processor'); + +var truncate = new processor.TruncateFields({ + fields: [ + "message", + ], + max_characters: 4, +}); + +function process(evt) { + truncate.Run(evt); +} +` + + logp.TestingSetup() + p, err := javascript.NewFromConfig(javascript.Config{Source: script}, nil) + if err != nil { + t.Fatal(err) + } + + evt, err := p.Run(testEvent()) + if err != nil { + t.Fatal(err) + } + + msg, _ := evt.GetValue("message") + assert.Equal(t, "key=", msg) +} + +func TestNewProcessorChain(t *testing.T) { + const script = ` +var processor = require('processor'); + +var localeProcessor = new processor.AddLocale(); + +var chain = new processor.Chain() + .Add(localeProcessor) + .Rename({ + fields: [ + {from: "event.timezone", to: "timezone"}, + ], + }) + .Add(function(evt) { + evt.Put("hello", "world"); + }) + .Build(); + +function process(evt) { + chain.Run(evt); +} +` + + logp.TestingSetup() + p, err := javascript.NewFromConfig(javascript.Config{Source: script}, nil) + if err != nil { + t.Fatal(err) + } + + evt, err := p.Run(testEvent()) + if err != nil { + t.Fatal(err) + } + + _, err = evt.GetValue("timezone") + assert.NoError(t, err) + v, _ := evt.GetValue("hello") + assert.Equal(t, "world", v) +}