Skip to content

Commit

Permalink
Add agent stream configuration (#302)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtojek authored Mar 26, 2020
1 parent a92c6c5 commit 82df862
Show file tree
Hide file tree
Showing 346 changed files with 5,219 additions and 38 deletions.
49 changes: 49 additions & 0 deletions dev/import-beats/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# import-beats

The script is responsible for importing existing beats modules and transforming
them into integration packages compatible with Elastic Package Registry (EPR).

The `import-beats` script depends on active Kibana instance, which is used to
migrate existing dashboards to a newer version.

## Usage

```bash
$ go run dev/import-beats/*.go -help
Usage of /var/folders/gz/dht4sjdx5w9f72knybys10zw0000gn/T/go-build249388773/b001/exe/agent:
-beatsDir string
Path to the beats repository (default "../beats")
-euiDir string
Path to the Elastic UI framework repository (default "../eui")
-kibanaDir string
Path to the kibana repository (default "../kibana")
-kibanaHostPort string
Kibana host and port (default "http://localhost:5601")
-outputDir string
Path to the output directory (default "dev/packages/beats")
-skipKibana
Skip storing Kibana objects
```

## Import all packages

1. Make sure that the following repositories have been fetched locally:
https://github.com/elastic/beats
https://github.com/elastic/eui
2. Start Kibana server (make sure the endpoint is accessible: http://localhost:5601/)
2. Run the importing procedure with the following command:

```bash
$ mage ImportBeats
```

## Troubleshooting

*Importing process takes too long.*

While developeing, you can try to perform the migration with skipping migration of all Kibana objects,
as this is the most time consuming part of whole process:

```bash
$ SKIP_KIBANA=true mage ImportBeats
```
212 changes: 212 additions & 0 deletions dev/import-beats/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package main

import (
"bufio"
"bytes"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/pkg/errors"

"github.com/elastic/package-registry/util"
)

type agentContent struct {
streams []streamContent
}

type streamContent struct {
targetFileName string
body []byte
}

func createAgentContent(modulePath, moduleName, datasetName, beatType string, streams []util.Stream) (agentContent, error) {
switch beatType {
case "logs":
return createAgentContentForLogs(modulePath, datasetName)
case "metrics":
return createAgentContentForMetrics(modulePath, moduleName, datasetName, streams)
}
return agentContent{}, fmt.Errorf("invalid beat type: %s", beatType)
}

func createAgentContentForLogs(modulePath, datasetName string) (agentContent, error) {
configFilePaths, err := filepath.Glob(filepath.Join(modulePath, datasetName, "config", "*.yml"))
if err != nil {
return agentContent{}, errors.Wrapf(err, "location config files failed (modulePath: %s, datasetName: %s)", modulePath, datasetName)
}

if len(configFilePaths) == 0 {
return agentContent{}, fmt.Errorf("expected at least one config file (modulePath: %s, datasetName: %s)", modulePath, datasetName)
}

var buffer bytes.Buffer

for _, configFilePath := range configFilePaths {
configFile, err := transformAgentConfigFile(configFilePath)
if err != nil {
return agentContent{}, errors.Wrapf(err, "loading config file failed (modulePath: %s, datasetName: %s)", modulePath, datasetName)
}

inputConfigName := extractInputConfigName(configFilePath)
if len(configFilePaths) > 1 {
buffer.WriteString(fmt.Sprintf("{{#if input == %s}}\n", inputConfigName))
}
buffer.Write(configFile)
if len(configFilePaths) > 1 {
buffer.WriteString("{{/if}}\n")
}
}
return agentContent{
streams: []streamContent{
{
targetFileName: "stream.yml",
body: buffer.Bytes(),
},
},
}, nil
}

func extractInputConfigName(configFilePath string) string {
i := strings.LastIndex(configFilePath, "/")
inputConfigName := configFilePath[i+1:]
j := strings.Index(inputConfigName, ".")
return inputConfigName[:j]
}

func transformAgentConfigFile(configFilePath string) ([]byte, error) {
var buffer bytes.Buffer

configFile, err := os.Open(configFilePath)
if err != nil {
return nil, errors.Wrapf(err, "opening agent config file failed (path: %s)", configFilePath)
}

scanner := bufio.NewScanner(configFile)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}

if strings.HasPrefix(line, "type: ") {
line = strings.ReplaceAll(line, "type: ", "input: ")
}

// simple cases: if, range, -}}
line = strings.ReplaceAll(line, "{{ ", "{{")
line = strings.ReplaceAll(line, " }}", "}}")
line = strings.ReplaceAll(line, "{{if .", "{{if this.")
line = strings.ReplaceAll(line, "{{if", "{{#if")
line = strings.ReplaceAll(line, "{{end}}", "{{/end}}")
line = strings.ReplaceAll(line, "{{.", "{{this.")
line = strings.ReplaceAll(line, "{{range .", "{{#each this.")
line = strings.ReplaceAll(line, ".}}", "}}")
line = strings.ReplaceAll(line, " -}}", "}}") // no support for cleaning trailing white characters?
line = strings.ReplaceAll(line, "{{- ", "{{") // no support for cleaning trailing white characters?

// if/else if eq
if strings.Contains(line, " eq ") {
line = strings.ReplaceAll(line, " eq .", " ")
line = strings.ReplaceAll(line, " eq ", " ")

skipSpaces := 1
if strings.HasPrefix(line, "{{else") {
skipSpaces = 2
}

splitCondition := strings.SplitN(line, " ", skipSpaces+2)
line = strings.Join(splitCondition[:len(splitCondition)-1], " ") + " == " +
splitCondition[len(splitCondition)-1]
}

if strings.Contains(line, "{{range ") || strings.Contains(line, " range ") {
loopedVar, err := extractRangeVar(line)
if err != nil {
return nil, errors.Wrapf(err, "extracting range var failed")
}

line = fmt.Sprintf("{{#each %s}}\n", loopedVar)
line += " - {{this}}\n"
line += "{{/each}}"

for scanner.Scan() { // skip all lines inside range
rangeLine := scanner.Text()
if strings.Contains(rangeLine, "{{ end }}") {
break
}
}
}

buffer.WriteString(line)
buffer.WriteString("\n")
}
return buffer.Bytes(), nil
}

func extractRangeVar(line string) (string, error) {
line = line[strings.Index(line, "range")+1:]
i := strings.Index(line, ":=")
var sliced string
if i >= 0 {
line = strings.TrimSpace(line[i+3:])
split := strings.Split(line, " ")
sliced = split[0]
} else {
split := strings.Split(line, " ")
sliced = split[1]
}

if strings.HasPrefix(sliced, ".") {
sliced = sliced[1:]
}
return sliced, nil
}

func createAgentContentForMetrics(modulePath, moduleName, datasetName string, streams []util.Stream) (agentContent, error) {
inputName := moduleName + "/metrics"
vars := extractVarsFromStream(streams, inputName)

var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf("input: %s\n", inputName))
buffer.WriteString(fmt.Sprintf("metricsets: [\"%s\"]\n", datasetName))

for _, aVar := range vars {
variableName := aVar["name"].(string)

if !isAgentConfigOptionRequired(variableName) {
buffer.WriteString(fmt.Sprintf("{{#if %s}}\n", variableName))
}
buffer.WriteString(fmt.Sprintf("%s: {{%s}}\n", variableName, variableName))
if !isAgentConfigOptionRequired(variableName) {
buffer.WriteString(fmt.Sprintf("{{#if %s}}\n", variableName))
}
}
return agentContent{
streams: []streamContent{
{
targetFileName: "stream.yml",
body: buffer.Bytes(),
},
},
}, nil
}

func extractVarsFromStream(streams []util.Stream, inputName string) []map[string]interface{} {
for _, stream := range streams {
if stream.Input == inputName {
return stream.Vars
}
}
return []map[string]interface{}{}
}

func isAgentConfigOptionRequired(optionName string) bool {
return optionName == "hosts" || optionName == "period"
}
23 changes: 17 additions & 6 deletions dev/import-beats/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type datasetContent struct {

manifest util.DataSet

agent agentContent
elasticsearch elasticsearchContent
fields fieldsContent
}
Expand Down Expand Up @@ -89,6 +90,13 @@ func createDatasets(modulePath, moduleName, moduleRelease, beatType string) (dat
modulePath, datasetName)
}

fields := fieldsContent{
files: map[string][]byte{
"package-fields.yml": moduleFieldsFiles,
"fields.yml": fieldsFiles,
},
}

// elasticsearch
elasticsearch, err := loadElasticsearchContent(datasetPath)
if err != nil {
Expand All @@ -101,6 +109,13 @@ func createDatasets(modulePath, moduleName, moduleRelease, beatType string) (dat
return nil, errors.Wrapf(err, "creating streams failed (datasetPath: %s)", datasetPath)
}

// agent
agent, err := createAgentContent(modulePath, moduleName, datasetName, beatType, streams)
if err != nil {
return nil, errors.Wrapf(err, "creating agent content failed (modulePath: %s, datasetName: %s)",
modulePath, datasetName)
}

// manifest
manifest := util.DataSet{
ID: datasetName,
Expand All @@ -114,13 +129,9 @@ func createDatasets(modulePath, moduleName, moduleRelease, beatType string) (dat
name: datasetName,
beatType: beatType,
manifest: manifest,
agent: agent,
elasticsearch: elasticsearch,
fields: fieldsContent{
files: map[string][]byte{
"package-fields.yml": moduleFieldsFiles,
"fields.yml": fieldsFiles,
},
},
fields: fields,
})
}
return contents, nil
Expand Down
17 changes: 17 additions & 0 deletions dev/import-beats/packages.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func (r *packageRepository) save(outputDir string) error {
}
}

// dataset/elasticsearch
if len(dataset.elasticsearch.ingestPipelines) > 0 {
ingestPipelinePath := filepath.Join(datasetPath, "elasticsearch", "ingest-pipeline")
err := os.MkdirAll(ingestPipelinePath, 0755)
Expand All @@ -274,6 +275,22 @@ func (r *packageRepository) save(outputDir string) error {
}
}
}

// dataset/agent/stream
if len(dataset.agent.streams) > 0 {
agentStreamPath := filepath.Join(datasetPath, "agent", "stream")
err := os.MkdirAll(agentStreamPath, 0755)
if err != nil {
return errors.Wrapf(err, "cannot make directory for dataset agent stream: '%s'", agentStreamPath)
}

for _, agentStream := range dataset.agent.streams {
err := ioutil.WriteFile(path.Join(agentStreamPath, agentStream.targetFileName), agentStream.body, 0644)
if err != nil {
return errors.Wrapf(err, "writing agent stream file failed")
}
}
}
}

// img
Expand Down
2 changes: 1 addition & 1 deletion dev/import-beats/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func createMetricStreams(modulePath, moduleName, datasetName string) ([]util.Str
func mergeMetaConfigFiles(modulePath string) ([]byte, error) {
configFilePaths, err := filepath.Glob(filepath.Join(modulePath, "_meta", "config*.yml"))
if err != nil {
return nil, errors.Wrapf(err, "location config files failed (modulePath: %s)", modulePath)
return nil, errors.Wrapf(err, "locating config files failed (modulePath: %s)", modulePath)
}

var mergedConfig bytes.Buffer
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
input: log
paths:
{{#each paths}}}}
- {{this}}
{{/each}}
exclude_files: [".gz$"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
input: activemq/metrics
metricsets: ["broker"]
hosts: {{hosts}}
{{#if password}}
password: {{password}}
{{#if password}}
{{#if path}}
path: {{path}}
{{#if path}}
period: {{period}}
{{#if username}}
username: {{username}}
{{#if username}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
input: log
paths:
{{#each paths}}}}
- {{this}}
{{/each}}
exclude_files: [".gz$"]
multiline:
pattern: '^\d{4}-\d{2}-\d{2} '
negate: true
match: after
processors:
- add_locale: ~
Loading

0 comments on commit 82df862

Please sign in to comment.