Skip to content

Commit

Permalink
Add experimental event log reader with increased performance
Browse files Browse the repository at this point in the history
This PR adds a new event log reader implementation that's behind a
feature flag for now. It achieves higher event throughput than the current
reader by not using XML and by heavily caching static metadata about events.
To enable it add `api` to each event log reader.

```
winlogbeat.event_logs:
- name: Security
  api: wineventlog-experimental
```

The existing reader requests each event as XML and then must unmarshal the XML
document. EvtFormatMessage is used to get the XML document from Windows. Then the
Go stdlib encoder/xml package is used to parse it. Both of these operations are
relatively slow (see golang/go#21823 about encoding/xml).

This new reader utilizes the publisher metadata APIs to fetch and cache metadata
about all event IDs associated with a provider. It does this the first time it
encounters a provider ID while reading events. __Risk: Caching this info could
lead to having stale information in memory if metadata changes via software
update (see Edge Cases).__ It caches the names of the event data parameters
and a templatized version of the message string.

To get the data for an event this reader receives EVT_VARIANT structs containing
the parameters rather than receiving and parsing XML. This is more efficient because
there are fewer and smaller memory allocations and no XML encoding or decoding.

To get the message for an event it utilizes the cached text/template it has
for the event ID and passes it the list of parameter values.

Edge Cases

There is no provider metadata installed on the host. Could happen for forwarded
events or reading from .evtx files.
- Mitigate by falling back to getting parameter names by the event XML and rendering
  the message with EvtFormatMessage for each event.

Software is updated and an event ID changes it's event data parameters. Saw this
between Sysmon versions 9 and 10 with event ID 5.
- Mitigate by fingerprinting the number of event data parameters and their types.
- If the fingerprint changes, fetch the XML for the event and store the parameter
  names.

Benchmark Comparison

Comparing batch_size 500, that's a 1396% increase in events/sec, a -81% reduction in bytes allocated per event, and -86% decrease in the number of allocations.

PS C:\Gopath\src\github.com\elastic\beats\winlogbeat\eventlog> go test -run TestBenchmarkRead -benchmem -benchtime 10s -benchtest -v .
--- PASS: TestBenchmarkRead (231.68s)
    --- PASS: TestBenchmarkRead/api=wineventlog (53.57s)
        --- PASS: TestBenchmarkRead/api=wineventlog/batch_size=10 (12.19s)
            bench_test.go:128: 2067.28 events/sec        18283 B/event   182836 B/batch  251 allocs/event        2516 allocs/batch
        --- PASS: TestBenchmarkRead/api=wineventlog/batch_size=100 (16.73s)
            bench_test.go:128: 2144.50 events/sec        17959 B/event   1795989 B/batch         250 allocs/event        25020 allocs/batch
        --- PASS: TestBenchmarkRead/api=wineventlog/batch_size=500 (13.48s)
            bench_test.go:128: 1888.40 events/sec        17648 B/event   8824455 B/batch         250 allocs/event        125018 allocs/batch
        --- PASS: TestBenchmarkRead/api=wineventlog/batch_size=1000 (11.18s)
            bench_test.go:128: 2064.14 events/sec        17650 B/event   17650459 B/batch        250 allocs/event        250012 allocs/batch
    --- PASS: TestBenchmarkRead/api=wineventlog-experimental (98.28s)
        --- PASS: TestBenchmarkRead/api=wineventlog-experimental/batch_size=10 (18.72s)
            bench_test.go:128: 16813.52 events/sec       3974 B/event    39744 B/batch   34 allocs/event         344 allocs/batch
        --- PASS: TestBenchmarkRead/api=wineventlog-experimental/batch_size=100 (25.39s)
            bench_test.go:128: 28300.30 events/sec       3634 B/event    363498 B/batch  33 allocs/event         3324 allocs/batch
        --- PASS: TestBenchmarkRead/api=wineventlog-experimental/batch_size=500 (26.40s)
            bench_test.go:128: 28266.73 events/sec       3332 B/event    1666041 B/batch         33 allocs/event         16597 allocs/batch
        --- PASS: TestBenchmarkRead/api=wineventlog-experimental/batch_size=1000 (27.77s)
            bench_test.go:128: 28387.74 events/sec       3330 B/event    3330690 B/batch         33 allocs/event         33127 allocs/batch
    --- PASS: TestBenchmarkRead/api=eventlogging (13.29s)
        bench_test.go:128: 56243.80 events/sec   8043 B/event    6513053 B/batch         31 allocs/event         25151 allocs/batch
PASS
ok      github.com/elastic/beats/v7/winlogbeat/eventlog 231.932s
  • Loading branch information
andrewkroh committed Mar 12, 2020
1 parent 1d36da7 commit 52092de
Show file tree
Hide file tree
Showing 33 changed files with 4,607 additions and 448 deletions.
5 changes: 5 additions & 0 deletions Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ Vagrant.configure(2) do |config|
config.vm.define "win2019", primary: true do |c|
c.vm.box = "StefanScherer/windows_2019"
c.vm.provision "shell", inline: $winPsProvision, privileged: false

c.vm.provider :virtualbox do |vbox|
vbox.memory = 4096
vbox.cpus = 4
end
end

# Solaris 11.2
Expand Down
135 changes: 76 additions & 59 deletions winlogbeat/eventlog/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,95 +22,112 @@ package eventlog
import (
"bytes"
"flag"
"fmt"
"math/rand"
"os/exec"
"strconv"
"strings"
"testing"
"time"

elog "github.com/andrewkroh/sys/windows/svc/eventlog"
"github.com/dustin/go-humanize"
"golang.org/x/sys/windows/svc/eventlog"

"github.com/elastic/beats/v7/libbeat/common"
)

// Benchmark tests with customized output. (`go test -v -benchtime 10s -benchtest .`)
const gigabyte = 1 << 30

var (
benchTest = flag.Bool("benchtest", false, "Run benchmarks for the eventlog package")
injectAmount = flag.Int("inject", 50000, "Number of events to inject before running benchmarks")
benchTest = flag.Bool("benchtest", false, "Run benchmarks for the eventlog package.")
injectAmount = flag.Int("inject", 1E6, "Number of events to inject before running benchmarks.")
)

// TestBenchmarkBatchReadSize tests the performance of different
// batch_read_size values.
func TestBenchmarkBatchReadSize(t *testing.T) {
// TestBenchmarkRead benchmarks each event log reader implementation with
// different batch sizes.
//
// Recommended usage:
// go test -run TestBenchmarkRead -benchmem -benchtime 10s -benchtest -v .
func TestBenchmarkRead(t *testing.T) {
if !*benchTest {
t.Skip("-benchtest not enabled")
}

log, err := initLog(providerName, sourceName, eventCreateMsgFile)
if err != nil {
t.Fatal(err)
}
defer func() {
err := uninstallLog(providerName, sourceName, log)
if err != nil {
t.Fatal(err)
}
}()
writer, teardown := createLog(t)
defer teardown()

// Increase the log size so that it can hold these large events.
output, err := exec.Command("wevtutil.exe", "sl", "/ms:1073741824", providerName).CombinedOutput()
if err != nil {
t.Fatal(err, string(output))
}
setLogSize(t, providerName, gigabyte)

// Publish test messages:
for i := 0; i < *injectAmount; i++ {
err = log.Report(elog.Info, uint32(rand.Int63()%1000), []string{strconv.Itoa(i) + " " + randomSentence(256)})
err := writer.Report(eventlog.Info, uint32(rand.Int63()%1000), []string{strconv.Itoa(i) + " " + randomSentence(256)})
if err != nil {
t.Fatal("ReportEvent error", err)
t.Fatal(err)
}
}

benchTest := func(batchSize int) {
var err error
result := testing.Benchmark(func(b *testing.B) {
eventlog, tearDown := setupWinEventLog(t, 0, map[string]interface{}{
"name": providerName,
"batch_read_size": batchSize,
})
defer tearDown()
b.ResetTimer()

// Each iteration reads one batch.
for i := 0; i < b.N; i++ {
_, err = eventlog.Read()
if err != nil {
return
}
for _, api := range []string{winEventLogAPIName, winEventLogExpAPIName} {
t.Run("api="+api, func(t *testing.T) {
for _, batchSize := range []int{10, 100, 500, 1000} {
t.Run(fmt.Sprintf("batch_size=%d", batchSize), func(t *testing.T) {
result := testing.Benchmark(benchmarkEventLog(api, batchSize))
outputBenchmarkResults(t, result)
})
}
})
}

if err != nil {
t.Fatal(err)
return
t.Run("api="+eventLoggingAPIName, func(t *testing.T) {
result := testing.Benchmark(benchmarkEventLog(eventLoggingAPIName, -1))
outputBenchmarkResults(t, result)
})
}

func benchmarkEventLog(api string, batchSize int) func(b *testing.B) {
return func(b *testing.B) {
conf := common.MapStr{
"name": providerName,
}
if strings.HasPrefix(api, "wineventlog") {
conf.Put("batch_read_size", batchSize)
conf.Put("no_more_events", "stop")
}

t.Logf("batch_size=%v, total_events=%v, batch_time=%v, events_per_sec=%v, bytes_alloced_per_event=%v, total_allocs=%v",
batchSize,
result.N*batchSize,
time.Duration(result.NsPerOp()),
float64(batchSize)/time.Duration(result.NsPerOp()).Seconds(),
humanize.Bytes(result.MemBytes/(uint64(result.N)*uint64(batchSize))),
result.MemAllocs)
}
log := openLog(b, api, nil, conf)
defer log.Close()

events := 0
b.ResetTimer()

// Each iteration reads one batch.
for i := 0; i < b.N; i++ {
records, err := log.Read()
if err != nil {
b.Fatal(err)
return
}
events += len(records)
}

b.StopTimer()

benchTest(10)
benchTest(100)
benchTest(500)
benchTest(1000)
b.ReportMetric(float64(events), "events")
b.ReportMetric(float64(batchSize), "batch_size")
}
}

// Utility Functions
func outputBenchmarkResults(t testing.TB, result testing.BenchmarkResult) {
totalBatches := result.N
totalEvents := int(result.Extra["events"])
totalBytes := result.MemBytes
totalAllocs := result.MemAllocs

eventsPerSec := float64(totalEvents) / result.T.Seconds()
bytesPerEvent := float64(totalBytes) / float64(totalEvents)
bytesPerBatch := float64(totalBytes) / float64(totalBatches)
allocsPerEvent := float64(totalAllocs) / float64(totalEvents)
allocsPerBatch := float64(totalAllocs) / float64(totalBatches)

t.Logf("%.2f events/sec\t %d B/event\t %d B/batch\t %d allocs/event\t %d allocs/batch",
eventsPerSec, int(bytesPerEvent), int(bytesPerBatch), int(allocsPerEvent), int(allocsPerBatch))
}

var randomWords = []string{
"recover",
Expand Down
3 changes: 3 additions & 0 deletions winlogbeat/eventlog/eventlogging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/joeshaw/multierror"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/winlogbeat/checkpoint"
"github.com/elastic/beats/v7/winlogbeat/sys"
Expand Down Expand Up @@ -277,6 +278,8 @@ func (l *eventLogging) ignoreOlder(r *Record) bool {
// newEventLogging creates and returns a new EventLog for reading event logs
// using the Event Logging API.
func newEventLogging(options *common.Config) (EventLog, error) {
cfgwarn.Deprecate("8.0", "The eventlogging API reader is deprecated.")

c := eventLoggingConfig{
ReadBufferSize: win.MaxEventBufferSize,
FormatBufferSize: win.MaxFormatMessageBufferSize,
Expand Down
Loading

0 comments on commit 52092de

Please sign in to comment.