Skip to content

Commit

Permalink
Fixing logic to keep list of unique cluster UUIDs (#22808) (#22816)
Browse files Browse the repository at this point in the history
* Fixing logic to keep list of unique cluster UUIDs

* Adding CHANGELOG entry

* Use common.StringSet

* Adding more test cases

* Adding back logic to broadly override cluster UUID for all pipelines, if set

* Removing ToSlice()

* Fixing loop
  • Loading branch information
ycombinator authored Dec 1, 2020
1 parent 711762b commit 552c68f
Show file tree
Hide file tree
Showing 6 changed files with 620 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix failiures caused by custom beat names with more than 15 characters {pull}22550[22550]
- Stop generating NaN values from Cloud Foundry module to avoid errors in outputs. {pull}22634[22634]
- Update NATS dashboards to leverage connection and route metricsets {pull}22646[22646]
- Fix `logstash` module when `xpack.enabled: true` is set from emitting redundant events. {pull}22808[22808]

*Packetbeat*

Expand Down
10 changes: 5 additions & 5 deletions metricbeat/module/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ type MetricSet struct {
XPack bool
}

type graph struct {
type Graph struct {
Vertices []map[string]interface{} `json:"vertices"`
Edges []map[string]interface{} `json:"edges"`
}

type graphContainer struct {
Graph *graph `json:"graph,omitempty"`
type GraphContainer struct {
Graph *Graph `json:"graph,omitempty"`
Type string `json:"type"`
Version string `json:"version"`
Hash string `json:"hash"`
Expand All @@ -74,8 +74,8 @@ type PipelineState struct {
ID string `json:"id"`
Hash string `json:"hash"`
EphemeralID string `json:"ephemeral_id"`
Graph *graphContainer `json:"graph,omitempty"`
Representation *graphContainer `json:"representation"`
Graph *GraphContainer `json:"graph,omitempty"`
Representation *GraphContainer `json:"representation"`
BatchSize int `json:"batch_size"`
Workers int `json:"workers"`
}
Expand Down
13 changes: 9 additions & 4 deletions metricbeat/module/logstash/node/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,26 @@ func makeClusterToPipelinesMap(pipelines []logstash.PipelineState, overrideClust
var clusterToPipelinesMap map[string][]logstash.PipelineState
clusterToPipelinesMap = make(map[string][]logstash.PipelineState)

if overrideClusterUUID != "" {
clusterToPipelinesMap[overrideClusterUUID] = pipelines
return clusterToPipelinesMap
}

for _, pipeline := range pipelines {
var clusterUUIDs []string
clusterUUIDs := common.StringSet{}
for _, vertex := range pipeline.Graph.Graph.Vertices {
clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID)
if clusterUUID != "" {
clusterUUIDs = append(clusterUUIDs, clusterUUID)
clusterUUIDs.Add(clusterUUID)
}
}

// If no cluster UUID was found in this pipeline, assign it a blank one
if len(clusterUUIDs) == 0 {
clusterUUIDs = []string{""}
clusterUUIDs.Add("")
}

for _, clusterUUID := range clusterUUIDs {
for clusterUUID := range clusterUUIDs {
clusterPipelines := clusterToPipelinesMap[clusterUUID]
if clusterPipelines == nil {
clusterToPipelinesMap[clusterUUID] = []logstash.PipelineState{}
Expand Down
328 changes: 328 additions & 0 deletions metricbeat/module/logstash/node/data_xpack_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,328 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !integration

package node

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/metricbeat/module/logstash"
)

func TestMakeClusterToPipelinesMap(t *testing.T) {
tests := map[string]struct {
pipelines []logstash.PipelineState
overrideClusterUUID string
expectedMap map[string][]logstash.PipelineState
}{
"no_vertex_cluster_id": {
pipelines: []logstash.PipelineState{
{
ID: "test_pipeline",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_1",
},
{
"id": "vertex_2",
},
{
"id": "vertex_3",
},
},
},
},
},
},
overrideClusterUUID: "prod_cluster_id",
expectedMap: map[string][]logstash.PipelineState{
"prod_cluster_id": {
{
ID: "test_pipeline",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_1",
},
{
"id": "vertex_2",
},
{
"id": "vertex_3",
},
},
},
},
},
},
},
},
"one_vertex_cluster_id": {
pipelines: []logstash.PipelineState{
{
ID: "test_pipeline",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_1",
"cluster_uuid": "es_1",
},
{
"id": "vertex_2",
},
{
"id": "vertex_3",
},
},
},
},
},
},
overrideClusterUUID: "prod_cluster_id",
expectedMap: map[string][]logstash.PipelineState{
"prod_cluster_id": {
{
ID: "test_pipeline",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_1",
"cluster_uuid": "es_1",
},
{
"id": "vertex_2",
},
{
"id": "vertex_3",
},
},
},
},
},
},
},
},
"two_pipelines": {
pipelines: []logstash.PipelineState{
{
ID: "test_pipeline_1",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_1_1",
"cluster_uuid": "es_1",
},
{
"id": "vertex_1_2",
},
{
"id": "vertex_1_3",
},
},
},
},
},
{
ID: "test_pipeline_2",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_2_1",
},
{
"id": "vertex_2_2",
},
{
"id": "vertex_2_3",
},
},
},
},
},
},
overrideClusterUUID: "prod_cluster_id",
expectedMap: map[string][]logstash.PipelineState{
"prod_cluster_id": {
{
ID: "test_pipeline_1",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_1_1",
"cluster_uuid": "es_1",
},
{
"id": "vertex_1_2",
},
{
"id": "vertex_1_3",
},
},
},
},
},
{
ID: "test_pipeline_2",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_2_1",
},
{
"id": "vertex_2_2",
},
{
"id": "vertex_2_3",
},
},
},
},
},
},
},
},
"no_override_cluster_id": {
pipelines: []logstash.PipelineState{
{
ID: "test_pipeline_1",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_1_1",
"cluster_uuid": "es_1",
},
{
"id": "vertex_1_2",
"cluster_uuid": "es_2",
},
{
"id": "vertex_1_3",
},
},
},
},
},
{
ID: "test_pipeline_2",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_2_1",
},
{
"id": "vertex_2_2",
},
{
"id": "vertex_2_3",
},
},
},
},
},
},
overrideClusterUUID: "",
expectedMap: map[string][]logstash.PipelineState{
"es_1": {
{
ID: "test_pipeline_1",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_1_1",
"cluster_uuid": "es_1",
},
{
"id": "vertex_1_2",
"cluster_uuid": "es_2",
},
{
"id": "vertex_1_3",
},
},
},
},
},
},
"es_2": {
{
ID: "test_pipeline_1",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_1_1",
"cluster_uuid": "es_1",
},
{
"id": "vertex_1_2",
"cluster_uuid": "es_2",
},
{
"id": "vertex_1_3",
},
},
},
},
},
},
"": {
{
ID: "test_pipeline_2",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_2_1",
},
{
"id": "vertex_2_2",
},
{
"id": "vertex_2_3",
},
},
},
},
},
},
},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
actualMap := makeClusterToPipelinesMap(test.pipelines, test.overrideClusterUUID)
require.Equal(t, test.expectedMap, actualMap)
})
}
}
Loading

0 comments on commit 552c68f

Please sign in to comment.