Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and reintroduce "Allow multiple ES outputs as long as they are the same ES" #1879

Merged
merged 6 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/pkg/dl/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
FieldPolicyRevisionIdx = "policy_revision_idx"
FieldRevisionIdx = "revision_idx"
FieldUnenrolledReason = "unenrolled_reason"
FiledType = "type"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
FiledType = "type"
FieldType = "type"


FieldActive = "active"
FieldUpdatedAt = "updated_at"
Expand Down
22 changes: 15 additions & 7 deletions internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func (p *Output) prepareElasticsearch(
return ErrNoOutputPerms
}

output, ok := agent.Outputs[p.Name]
if !ok {
output, foundOutput := agent.Outputs[p.Name]
if !foundOutput {
if agent.Outputs == nil {
agent.Outputs = map[string]*model.PolicyOutput{}
}
Expand Down Expand Up @@ -120,11 +120,6 @@ func (p *Output) prepareElasticsearch(
return fmt.Errorf("failed generate output API key: %w", err)
}

output.Type = OutputTypeElasticsearch
output.APIKey = outputAPIKey.Agent()
output.APIKeyID = outputAPIKey.ID
output.PermissionsHash = p.Role.Sha2 // for the sake of consistency

// When a new keys is generated we need to update the Agent record,
// this will need to be updated when multiples remote Elasticsearch output
// are supported.
Expand All @@ -138,6 +133,10 @@ func (p *Output) prepareElasticsearch(
dl.FieldPolicyOutputAPIKeyID: outputAPIKey.ID,
dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2,
}

if !foundOutput {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the fix for the bug? I see this if statement added in the commit that contains a fix, along with the block below being moved later, but it isn't actually clear to me why this fixes the problem.

		// Now that all is done, we can update the output on the agent variable
		// Right not it's more for consistency and to ensure the in-memory agent
		// data is correct and in sync with ES, so it can be safely used after
		// this method returns.
		output.Type = OutputTypeElasticsearch
		output.APIKey = outputAPIKey.Agent()
		output.APIKeyID = outputAPIKey.ID
		output.PermissionsHash = p.Role.Sha2 // for the sake of consistency

fields[dl.FiledType] = OutputTypeElasticsearch
}
if output.APIKeyID != "" {
fields[dl.FieldPolicyOutputToRetireAPIKeyIDs] = model.ToRetireAPIKeyIdsItems{
ID: output.APIKeyID,
Expand All @@ -155,6 +154,15 @@ func (p *Output) prepareElasticsearch(
zlog.Error().Err(err).Msg("fail update agent record")
return fmt.Errorf("fail update agent record: %w", err)
}

// Now that all is done, we can update the output on the agent variable
// Right not it's more for consistency and to ensure the in-memory agent
// data is correct and in sync with ES, so it can be safely used after
// this method returns.
output.Type = OutputTypeElasticsearch
output.APIKey = outputAPIKey.Agent()
output.APIKeyID = outputAPIKey.ID
output.PermissionsHash = p.Role.Sha2 // for the sake of consistency
}

// Always insert the `api_key` as part of the output block, this is required
Expand Down
73 changes: 73 additions & 0 deletions internal/pkg/policy/policy_output_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"time"

"github.com/gofrs/uuid"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
)

Expand Down Expand Up @@ -125,3 +127,74 @@ func TestRenderUpdatePainlessScript(t *testing.T) {
})
}
}

func TestPolicyOutputESPrepareRealES(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this new test reproduce the API key invalidation problem?

index, bulker := ftesting.SetupCleanIndex(context.Background(), t, dl.FleetAgents)

agentID := createAgent(t, index, bulker)
agent, err := dl.FindAgent(
context.Background(), bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index))
if err != nil {
require.NoError(t, err, "failed to find agent ID %q", agentID)
}

output := Output{
Type: OutputTypeElasticsearch,
Name: "test output",
Role: &RoleT{
Sha2: "new-hash",
Raw: TestPayload,
},
}
policyMap := smap.Map{
"test output": map[string]interface{}{},
}

err = output.prepareElasticsearch(
context.Background(), zerolog.Nop(), bulker, &agent, policyMap)
require.NoError(t, err)

// need to wait a bit before querying the agent again
// TODO: find a better way to query the updated agent
time.Sleep(time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use require.Eventually to poll for the state you need. There will still technically be a timeout, but it will allow the test to complete faster than the timeout by polling with shorter intervals.


got, err := dl.FindAgent(
context.Background(), bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index))
if err != nil {
require.NoError(t, err, "failed to find agent ID %q", agentID)
}

gotOutput, ok := got.Outputs[output.Name]
require.True(t, ok, "no '%s' output fouled on agent document", output.Name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
require.True(t, ok, "no '%s' output fouled on agent document", output.Name)
require.True(t, ok, "no '%s' output failed on agent document", output.Name)


assert.Empty(t, gotOutput.ToRetireAPIKeyIds)
assert.Equal(t, gotOutput.Type, OutputTypeElasticsearch)
assert.Equal(t, gotOutput.PermissionsHash, output.Role.Sha2)
assert.NotEmpty(t, gotOutput.APIKey)
assert.NotEmpty(t, gotOutput.APIKeyID)
}

func createAgent(t *testing.T, index string, bulker bulk.Bulk) string {
const nowStr = "2022-08-12T16:50:05Z"

agentID := uuid.Must(uuid.NewV4()).String()
policyID := uuid.Must(uuid.NewV4()).String()

agentModel := model.Agent{
PolicyID: policyID,
Active: true,
LastCheckin: nowStr,
LastCheckinStatus: "",
UpdatedAt: nowStr,
EnrolledAt: nowStr,
}

body, err := json.Marshal(agentModel)
require.NoError(t, err)

_, err = bulker.Create(
context.Background(), index, agentID, body, bulk.WithRefresh())
require.NoError(t, err)

return agentID
}