Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add agent stream configuration #302

Merged
merged 6 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
49 changes: 49 additions & 0 deletions dev/import-beats/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# import-beats

The script is responsible for importing existing beats modules and transforming
them into integration packages compatible with Elastic Package Registry (EPR).

The `import-beats` script depends on an active Kibana instance, which is used to
migrate existing dashboards to a newer version.

## Usage

```bash
$ go run dev/import-beats/*.go -help
Usage of /var/folders/gz/dht4sjdx5w9f72knybys10zw0000gn/T/go-build249388773/b001/exe/agent:
-beatsDir string
Path to the beats repository (default "../beats")
-euiDir string
Path to the Elastic UI framework repository (default "../eui")
-kibanaDir string
Path to the kibana repository (default "../kibana")
-kibanaHostPort string
Kibana host and port (default "http://localhost:5601")
-outputDir string
Path to the output directory (default "dev/packages/beats")
-skipKibana
Skip storing Kibana objects
```

## Import all packages

1. Make sure that the following repositories have been fetched locally:
https://github.com/elastic/beats
https://github.com/elastic/eui
2. Start Kibana server (make sure the endpoint is accessible: http://localhost:5601/)
3. Run the importing procedure with the following command:

```bash
$ mage ImportBeats
```

## Troubleshooting

*Importing process takes too long.*

While developing, you can speed up the process by skipping the migration of all Kibana objects,
as this is the most time-consuming part of the whole process:

```bash
$ SKIP_KIBANA=true mage ImportBeats
```
212 changes: 212 additions & 0 deletions dev/import-beats/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package main

import (
"bufio"
"bytes"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/pkg/errors"

"github.com/elastic/package-registry/util"
)

// agentContent holds the agent stream configuration files generated for a
// single dataset.
type agentContent struct {
	streams []streamContent
}

// streamContent represents a single agent stream configuration file:
// the target file name (written under the dataset's agent/stream directory)
// and the rendered file body.
type streamContent struct {
	targetFileName string
	body           []byte
}

// createAgentContent builds the agent stream configuration for a dataset,
// dispatching to the logs- or metrics-specific builder based on the beat type.
// An unsupported beat type yields an error.
func createAgentContent(modulePath, moduleName, datasetName, beatType string, streams []util.Stream) (agentContent, error) {
	if beatType == "logs" {
		return createAgentContentForLogs(modulePath, datasetName)
	}
	if beatType == "metrics" {
		return createAgentContentForMetrics(modulePath, moduleName, datasetName, streams)
	}
	return agentContent{}, fmt.Errorf("invalid beat type: %s", beatType)
}

// createAgentContentForLogs builds the agent stream configuration for a logs
// dataset by concatenating all transformed config files found under
// <modulePath>/<datasetName>/config/*.yml. When more than one config file
// exists, each file's content is wrapped in a {{#if input == <name>}} block so
// the right section is selected at runtime.
func createAgentContentForLogs(modulePath, datasetName string) (agentContent, error) {
	configFilePaths, err := filepath.Glob(filepath.Join(modulePath, datasetName, "config", "*.yml"))
	if err != nil {
		// Fix: "location" -> "locating", consistent with the same message in streams.go.
		return agentContent{}, errors.Wrapf(err, "locating config files failed (modulePath: %s, datasetName: %s)", modulePath, datasetName)
	}

	if len(configFilePaths) == 0 {
		return agentContent{}, fmt.Errorf("expected at least one config file (modulePath: %s, datasetName: %s)", modulePath, datasetName)
	}

	// Conditional wrappers are only needed when multiple inputs are available.
	multipleConfigs := len(configFilePaths) > 1

	var buffer bytes.Buffer
	for _, configFilePath := range configFilePaths {
		configFile, err := transformAgentConfigFile(configFilePath)
		if err != nil {
			return agentContent{}, errors.Wrapf(err, "loading config file failed (modulePath: %s, datasetName: %s)", modulePath, datasetName)
		}

		inputConfigName := extractInputConfigName(configFilePath)
		if multipleConfigs {
			buffer.WriteString(fmt.Sprintf("{{#if input == %s}}\n", inputConfigName))
		}
		buffer.Write(configFile)
		if multipleConfigs {
			buffer.WriteString("{{/if}}\n")
		}
	}
	return agentContent{
		streams: []streamContent{
			{
				targetFileName: "stream.yml",
				body:           buffer.Bytes(),
			},
		},
	}, nil
}

// extractInputConfigName returns the input name encoded in a config file path:
// the file name without directory and without anything from the first dot on,
// e.g. "module/dataset/config/syslog.yml" -> "syslog".
func extractInputConfigName(configFilePath string) string {
	// Fix: use filepath.Base instead of splitting on "/" so paths produced by
	// filepath.Glob work on every OS (Windows uses backslashes).
	fileName := filepath.Base(configFilePath)
	// Fix: guard against a missing dot; the original sliced with index -1 and
	// would panic on an extensionless file name.
	if i := strings.Index(fileName, "."); i >= 0 {
		return fileName[:i]
	}
	return fileName
}

// transformAgentConfigFile loads a Beats config file and rewrites its Go
// template expressions into the handlebars-like syntax used by agent stream
// configurations. Empty lines are dropped, "type:" entries are renamed to
// "input:", "eq" conditions become infix "==", and "range" loops are collapsed
// into a fixed {{#each}} block (the loop body is discarded).
func transformAgentConfigFile(configFilePath string) ([]byte, error) {
	configFile, err := os.Open(configFilePath)
	if err != nil {
		return nil, errors.Wrapf(err, "opening agent config file failed (path: %s)", configFilePath)
	}
	// Fix: the original never closed the file, leaking a descriptor per call.
	defer configFile.Close()

	var buffer bytes.Buffer

	scanner := bufio.NewScanner(configFile)
	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}

		// The agent configuration identifies inputs with "input", not "type".
		if strings.HasPrefix(line, "type: ") {
			line = strings.ReplaceAll(line, "type: ", "input: ")
		}

		// simple cases: if, range, -}}
		// NOTE(review): "{{/end}}" is not a standard handlebars closing tag —
		// confirm the downstream templating engine accepts it.
		line = strings.ReplaceAll(line, "{{ ", "{{")
		line = strings.ReplaceAll(line, " }}", "}}")
		line = strings.ReplaceAll(line, "{{if .", "{{if this.")
		line = strings.ReplaceAll(line, "{{if", "{{#if")
		line = strings.ReplaceAll(line, "{{end}}", "{{/end}}")
		line = strings.ReplaceAll(line, "{{.", "{{this.")
		line = strings.ReplaceAll(line, "{{range .", "{{#each this.")
		line = strings.ReplaceAll(line, ".}}", "}}")
		line = strings.ReplaceAll(line, " -}}", "}}") // no support for cleaning trailing white characters?
		line = strings.ReplaceAll(line, "{{- ", "{{") // no support for cleaning trailing white characters?

		// if/else if eq: turn prefix "eq a b" conditions into infix "a == b".
		if strings.Contains(line, " eq ") {
			line = strings.ReplaceAll(line, " eq .", " ")
			line = strings.ReplaceAll(line, " eq ", " ")

			skipSpaces := 1
			if strings.HasPrefix(line, "{{else") {
				skipSpaces = 2
			}

			splitCondition := strings.SplitN(line, " ", skipSpaces+2)
			line = strings.Join(splitCondition[:len(splitCondition)-1], " ") + " == " +
				splitCondition[len(splitCondition)-1]
		}

		// Replace a whole range loop with a canonical {{#each}} block listing
		// each element; the original loop body cannot be translated and is
		// skipped up to its "{{ end }}" marker.
		if strings.Contains(line, "{{range ") || strings.Contains(line, " range ") {
			loopedVar, err := extractRangeVar(line)
			if err != nil {
				return nil, errors.Wrapf(err, "extracting range var failed")
			}

			line = fmt.Sprintf("{{#each %s}}\n", loopedVar)
			line += " - {{this}}\n"
			line += "{{/each}}"

			for scanner.Scan() { // skip all lines inside range
				rangeLine := scanner.Text()
				if strings.Contains(rangeLine, "{{ end }}") {
					break
				}
			}
		}

		buffer.WriteString(line)
		buffer.WriteString("\n")
	}
	// Fix: surface scanner errors (e.g. lines exceeding the buffer size)
	// instead of silently returning truncated content.
	if err := scanner.Err(); err != nil {
		return nil, errors.Wrapf(err, "reading agent config file failed (path: %s)", configFilePath)
	}
	return buffer.Bytes(), nil
}

// extractRangeVar extracts the looped variable name from a template range
// expression, e.g. "{{range $i, $v := .paths}}" -> "paths". The error result
// is always nil today but is kept for interface stability.
func extractRangeVar(line string) (string, error) {
	line = line[strings.Index(line, "range")+1:]
	i := strings.Index(line, ":=")
	var sliced string
	if i >= 0 {
		// Assignment form: the ranged expression follows ":= ".
		line = strings.TrimSpace(line[i+3:])
		split := strings.Split(line, " ")
		sliced = split[0]
	} else {
		// Plain form: the ranged expression is the second token.
		split := strings.Split(line, " ")
		sliced = split[1]
	}

	sliced = strings.TrimPrefix(sliced, ".")
	// Fix: strip the closing template braces; the original kept them, which
	// leaked into generated output as "{{#each paths}}}}".
	if j := strings.Index(sliced, "}}"); j >= 0 {
		sliced = sliced[:j]
	}
	return sliced, nil
}

// createAgentContentForMetrics builds the agent stream configuration for a
// metrics dataset: the input name, the metricset, and one line per stream
// variable. Optional variables are wrapped in an {{#if ...}} guard so they are
// only emitted when configured. modulePath is currently unused but kept for
// signature symmetry with the logs builder.
func createAgentContentForMetrics(modulePath, moduleName, datasetName string, streams []util.Stream) (agentContent, error) {
	inputName := moduleName + "/metrics"
	vars := extractVarsFromStream(streams, inputName)

	var buffer bytes.Buffer
	buffer.WriteString(fmt.Sprintf("input: %s\n", inputName))
	buffer.WriteString(fmt.Sprintf("metricsets: [\"%s\"]\n", datasetName))

	for _, aVar := range vars {
		variableName := aVar["name"].(string)

		optional := !isAgentConfigOptionRequired(variableName)
		if optional {
			buffer.WriteString(fmt.Sprintf("{{#if %s}}\n", variableName))
		}
		buffer.WriteString(fmt.Sprintf("%s: {{%s}}\n", variableName, variableName))
		if optional {
			// Fix: close the condition with {{/if}}; the original wrote a
			// second opening {{#if ...}}, producing unbalanced templates
			// (visible in the generated stream.yml files).
			buffer.WriteString("{{/if}}\n")
		}
	}
	return agentContent{
		streams: []streamContent{
			{
				targetFileName: "stream.yml",
				body:           buffer.Bytes(),
			},
		},
	}, nil
}

// extractVarsFromStream returns the vars of the first stream whose input
// matches inputName. When no stream matches, an empty (non-nil) slice is
// returned.
func extractVarsFromStream(streams []util.Stream, inputName string) []map[string]interface{} {
	for i := range streams {
		if streams[i].Input != inputName {
			continue
		}
		return streams[i].Vars
	}
	return []map[string]interface{}{}
}

// isAgentConfigOptionRequired reports whether the given agent configuration
// option must always be present, and therefore is rendered without an
// {{#if ...}} guard.
func isAgentConfigOptionRequired(optionName string) bool {
	switch optionName {
	case "hosts", "period":
		return true
	default:
		return false
	}
}
23 changes: 17 additions & 6 deletions dev/import-beats/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type datasetContent struct {

manifest util.DataSet

agent agentContent
elasticsearch elasticsearchContent
fields fieldsContent
}
Expand Down Expand Up @@ -89,6 +90,13 @@ func createDatasets(modulePath, moduleName, moduleRelease, beatType string) (dat
modulePath, datasetName)
}

fields := fieldsContent{
files: map[string][]byte{
"package-fields.yml": moduleFieldsFiles,
"fields.yml": fieldsFiles,
},
}

// elasticsearch
elasticsearch, err := loadElasticsearchContent(datasetPath)
if err != nil {
Expand All @@ -101,6 +109,13 @@ func createDatasets(modulePath, moduleName, moduleRelease, beatType string) (dat
return nil, errors.Wrapf(err, "creating streams failed (datasetPath: %s)", datasetPath)
}

// agent
agent, err := createAgentContent(modulePath, moduleName, datasetName, beatType, streams)
if err != nil {
return nil, errors.Wrapf(err, "creating agent content failed (modulePath: %s, datasetName: %s)",
modulePath, datasetName)
}

// manifest
manifest := util.DataSet{
ID: datasetName,
Expand All @@ -114,13 +129,9 @@ func createDatasets(modulePath, moduleName, moduleRelease, beatType string) (dat
name: datasetName,
beatType: beatType,
manifest: manifest,
agent: agent,
elasticsearch: elasticsearch,
fields: fieldsContent{
files: map[string][]byte{
"package-fields.yml": moduleFieldsFiles,
"fields.yml": fieldsFiles,
},
},
fields: fields,
})
}
return contents, nil
Expand Down
17 changes: 17 additions & 0 deletions dev/import-beats/packages.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func (r *packageRepository) save(outputDir string) error {
}
}

// dataset/elasticsearch
if len(dataset.elasticsearch.ingestPipelines) > 0 {
ingestPipelinePath := filepath.Join(datasetPath, "elasticsearch", "ingest-pipeline")
err := os.MkdirAll(ingestPipelinePath, 0755)
Expand All @@ -274,6 +275,22 @@ func (r *packageRepository) save(outputDir string) error {
}
}
}

// dataset/agent/stream
if len(dataset.agent.streams) > 0 {
agentStreamPath := filepath.Join(datasetPath, "agent", "stream")
err := os.MkdirAll(agentStreamPath, 0755)
if err != nil {
return errors.Wrapf(err, "cannot make directory for dataset agent stream: '%s'", agentStreamPath)
}

for _, agentStream := range dataset.agent.streams {
err := ioutil.WriteFile(path.Join(agentStreamPath, agentStream.targetFileName), agentStream.body, 0644)
if err != nil {
return errors.Wrapf(err, "writing agent stream file failed")
}
}
}
}

// img
Expand Down
2 changes: 1 addition & 1 deletion dev/import-beats/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func createMetricStreams(modulePath, moduleName, datasetName string) ([]util.Str
func mergeMetaConfigFiles(modulePath string) ([]byte, error) {
configFilePaths, err := filepath.Glob(filepath.Join(modulePath, "_meta", "config*.yml"))
if err != nil {
return nil, errors.Wrapf(err, "location config files failed (modulePath: %s)", modulePath)
return nil, errors.Wrapf(err, "locating config files failed (modulePath: %s)", modulePath)
}

var mergedConfig bytes.Buffer
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
input: log
paths:
{{#each paths}}
- {{this}}
{{/each}}
exclude_files: [".gz$"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
input: activemq/metrics
metricsets: ["broker"]
hosts: {{hosts}}
{{#if password}}
password: {{password}}
{{/if}}
{{#if path}}
path: {{path}}
{{/if}}
period: {{period}}
{{#if username}}
username: {{username}}
{{/if}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
input: log
paths:
{{#each paths}}
- {{this}}
{{/each}}
exclude_files: [".gz$"]
multiline:
pattern: '^\d{4}-\d{2}-\d{2} '
negate: true
match: after
processors:
- add_locale: ~
Loading