Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing context to api ingest pipelines #2137

Merged
merged 3 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/benchrunner/runners/pipeline/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (r *runner) SetUp(ctx context.Context) error {
return errors.New("data stream root not found")
}

r.entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(r.options.API, dataStreamPath)
r.entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(ctx, r.options.API, dataStreamPath)
if err != nil {
return fmt.Errorf("installing ingest pipelines failed: %w", err)
}
Expand Down
30 changes: 17 additions & 13 deletions internal/elasticsearch/ingest/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package ingest

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -45,7 +46,7 @@ type RerouteProcessor struct {
Namespace []string `yaml:"namespace"`
}

func InstallDataStreamPipelines(api *elasticsearch.API, dataStreamPath string) (string, []Pipeline, error) {
func InstallDataStreamPipelines(ctx context.Context, api *elasticsearch.API, dataStreamPath string) (string, []Pipeline, error) {
dataStreamManifest, err := packages.ReadDataStreamManifest(filepath.Join(dataStreamPath, packages.DataStreamManifestFile))
if err != nil {
return "", nil, fmt.Errorf("reading data stream manifest failed: %w", err)
Expand All @@ -59,7 +60,7 @@ func InstallDataStreamPipelines(api *elasticsearch.API, dataStreamPath string) (
return "", nil, fmt.Errorf("loading ingest pipeline files failed: %w", err)
}

err = installPipelinesInElasticsearch(api, pipelines)
err = installPipelinesInElasticsearch(ctx, api, pipelines)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -232,9 +233,9 @@ func convertValue(value interface{}, label string) ([]string, error) {
}
}

func installPipelinesInElasticsearch(api *elasticsearch.API, pipelines []Pipeline) error {
func installPipelinesInElasticsearch(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {
for _, p := range pipelines {
if err := installPipeline(api, p); err != nil {
if err := installPipeline(ctx, api, p); err != nil {
return err
}
}
Expand All @@ -251,20 +252,22 @@ func pipelineError(err error, pipeline Pipeline, format string, args ...interfac
return fmt.Errorf("%s: %w", errorStr, err)
}

func installPipeline(api *elasticsearch.API, pipeline Pipeline) error {
if err := putIngestPipeline(api, pipeline); err != nil {
func installPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pipeline) error {
if err := putIngestPipeline(ctx, api, pipeline); err != nil {
return err
}
// Just to be sure the pipeline has been uploaded.
return getIngestPipeline(api, pipeline)
return getIngestPipeline(ctx, api, pipeline)
}

func putIngestPipeline(api *elasticsearch.API, pipeline Pipeline) error {
func putIngestPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pipeline) error {
source, err := pipeline.MarshalJSON()
if err != nil {
return err
}
r, err := api.Ingest.PutPipeline(pipeline.Name, bytes.NewReader(source))
r, err := api.Ingest.PutPipeline(pipeline.Name, bytes.NewReader(source),
api.Ingest.PutPipeline.WithContext(ctx),
)
if err != nil {
return pipelineError(err, pipeline, "PutPipeline API call failed")
}
Expand All @@ -283,10 +286,11 @@ func putIngestPipeline(api *elasticsearch.API, pipeline Pipeline) error {
return nil
}

func getIngestPipeline(api *elasticsearch.API, pipeline Pipeline) error {
r, err := api.Ingest.GetPipeline(func(request *elasticsearch.IngestGetPipelineRequest) {
request.PipelineID = pipeline.Name
})
Comment on lines -287 to -289
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be replaced by

// WithPipelineID - comma separated list of pipeline ids. wildcards supported.
func (f IngestGetPipeline) WithPipelineID(v string) func(*IngestGetPipelineRequest) {
    return func(r *IngestGetPipelineRequest) {
        r.PipelineID = v
    }
}

func getIngestPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pipeline) error {
r, err := api.Ingest.GetPipeline(
api.Ingest.GetPipeline.WithContext(ctx),
api.Ingest.GetPipeline.WithPipelineID(pipeline.Name),
)
if err != nil {
return pipelineError(err, pipeline, "GetPipeline API call failed")
}
Expand Down
4 changes: 1 addition & 3 deletions internal/testrunner/runners/pipeline/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (r *tester) run(ctx context.Context) ([]testrunner.TestResult, error) {

startTesting := time.Now()
var entryPipeline string
entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(r.esAPI, dataStreamPath)
entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(ctx, r.esAPI, dataStreamPath)
if err != nil {
return nil, fmt.Errorf("installing ingest pipelines failed: %w", err)
}
Expand Down Expand Up @@ -278,7 +278,6 @@ func (r *tester) checkElasticsearchLogs(ctx context.Context, startTesting time.T

return nil
})

if err != nil {
return nil, fmt.Errorf("error at parsing logs of elasticseach: %w", err)
}
Expand All @@ -297,7 +296,6 @@ func (r *tester) checkElasticsearchLogs(ctx context.Context, startTesting time.T
}

return []testrunner.TestResult{tr}, nil

}

func (r *tester) runTestCase(ctx context.Context, testCaseFile string, dsPath string, dsType string, pipeline string, validatorOptions []fields.ValidatorOption) ([]testrunner.TestResult, error) {
Expand Down