Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter] moved mergeBatchFunc and mergeBatchSplitFunc to request #11459

Merged
merged 8 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/merge-function-as-requet-method.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter
sfc-gh-sili marked this conversation as resolved.
Show resolved Hide resolved

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Made mergeFunc and mergeSplitFunc required methods of exporter.Request

# One or more tracking issues or pull requests related to the change
issues: [10368]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  mergeFunc and mergeSplitFunc used to be part of the configuration passed to the exporter. Now they are
  changed to be methods of the request.
sfc-gh-sili marked this conversation as resolved.
Show resolved Hide resolved

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
24 changes: 0 additions & 24 deletions exporter/exporterbatcher/batch_func.go

This file was deleted.

8 changes: 0 additions & 8 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,3 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
func WithBatcher(cfg exporterbatcher.Config) Option {
return internal.WithBatcher(cfg)
}

// WithBatchFuncs enables setting custom batch merge functions.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request],
msf exporterbatcher.BatchMergeSplitFunc[Request]) Option {
return internal.WithBatchFuncs(mf, msf)
}
2 changes: 1 addition & 1 deletion exporter/exporterhelper/exporterhelperprofiles/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
go.opentelemetry.io/collector/exporter v0.111.0
go.opentelemetry.io/collector/exporter/exporterprofiles v0.111.0
go.opentelemetry.io/collector/exporter/exportertest v0.111.0
go.opentelemetry.io/collector/pdata v1.17.0
go.opentelemetry.io/collector/pdata/pprofile v0.111.0
go.opentelemetry.io/collector/pdata/testdata v0.111.0
go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.0.0-20241021162523-3193106bf4b1
Expand All @@ -38,6 +37,7 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.111.0 // indirect
go.opentelemetry.io/collector/extension v0.111.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.111.0 // indirect
go.opentelemetry.io/collector/pdata v1.17.0 // indirect
go.opentelemetry.io/collector/pipeline v0.111.0 // indirect
go.opentelemetry.io/collector/receiver v0.111.0 // indirect
go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.0 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func NewProfilesExporter(
}
profilesOpts := []exporterhelper.Option{
internal.WithMarshaler(profilesRequestMarshaler), internal.WithUnmarshaler(newProfileRequestUnmarshalerFunc(pusher)),
internal.WithBatchFuncs(mergeProfiles, mergeSplitProfiles),
}
return NewProfilesRequestExporter(ctx, set, requestFromProfiles(pusher), append(profilesOpts, options...)...)
}
Expand Down
21 changes: 10 additions & 11 deletions exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,28 @@ import (
"go.opentelemetry.io/collector/pdata/pprofile"
)

// mergeProfiles merges two profiles requests into one.
func mergeProfiles(_ context.Context, r1 exporterhelper.Request, r2 exporterhelper.Request) (exporterhelper.Request, error) {
tr1, ok1 := r1.(*profilesRequest)
// Merge merges two profiles requests into one.
func (req *profilesRequest) Merge(_ context.Context, r2 exporterhelper.Request) (exporterhelper.Request, error) {
tr2, ok2 := r2.(*profilesRequest)
if !ok1 || !ok2 {
if !ok2 {
return nil, errors.New("invalid input type")
}
tr2.pd.ResourceProfiles().MoveAndAppendTo(tr1.pd.ResourceProfiles())
return tr1, nil
tr2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles())
return req, nil
}

// mergeSplitProfiles splits and/or merges the profiles into multiple requests based on the MaxSizeConfig.
func mergeSplitProfiles(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 exporterhelper.Request, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
// MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig.
func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
var (
res []exporterhelper.Request
destReq *profilesRequest
capacityLeft = cfg.MaxSizeItems
)
for _, req := range []exporterhelper.Request{r1, r2} {
if req == nil {
for _, r := range []exporterhelper.Request{req, r2} {
if r == nil {
continue
}
srcReq, ok := req.(*profilesRequest)
srcReq, ok := r.(*profilesRequest)
if !ok {
return nil, errors.New("invalid input type")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,25 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/testdata"
)

func TestMergeProfiles(t *testing.T) {
pr1 := &profilesRequest{pd: testdata.GenerateProfiles(2)}
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
res, err := mergeProfiles(context.Background(), pr1, pr2)
res, err := pr1.Merge(context.Background(), pr2)
require.NoError(t, err)
fmt.Fprintf(os.Stdout, "%#v\n", res.(*profilesRequest).pd)
assert.Equal(t, 5, res.(*profilesRequest).pd.SampleCount())
}

func TestMergeProfilesInvalidInput(t *testing.T) {
pr1 := &tracesRequest{td: testdata.GenerateTraces(2)}
pr1 := &dummyRequest{}
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
_, err := mergeProfiles(context.Background(), pr1, pr2)
_, err := pr2.Merge(context.Background(), pr1)
assert.Error(t, err)
}

Expand All @@ -51,13 +49,6 @@ func TestMergeSplitProfiles(t *testing.T) {
pr2: &profilesRequest{pd: pprofile.NewProfiles()},
expected: []*profilesRequest{{pd: pprofile.NewProfiles()}},
},
{
name: "both_requests_nil",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: nil,
pr2: nil,
expected: []*profilesRequest{},
},
{
name: "first_request_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
Expand All @@ -66,17 +57,10 @@ func TestMergeSplitProfiles(t *testing.T) {
expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}},
},
{
name: "first_requests_nil",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: nil,
pr2: &profilesRequest{pd: testdata.GenerateProfiles(5)},
expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}},
},
{
name: "first_nil_second_empty",
name: "first_empty_second_nil",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10},
pr1: nil,
pr2: &profilesRequest{pd: pprofile.NewProfiles()},
pr1: &profilesRequest{pd: pprofile.NewProfiles()},
pr2: nil,
expected: []*profilesRequest{{pd: pprofile.NewProfiles()}},
},
{
Expand All @@ -93,8 +77,8 @@ func TestMergeSplitProfiles(t *testing.T) {
{
name: "split_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4},
pr1: nil,
pr2: &profilesRequest{pd: testdata.GenerateProfiles(10)},
pr1: &profilesRequest{pd: testdata.GenerateProfiles(10)},
pr2: nil,
expected: []*profilesRequest{
{pd: testdata.GenerateProfiles(4)},
{pd: testdata.GenerateProfiles(4)},
Expand Down Expand Up @@ -133,7 +117,7 @@ func TestMergeSplitProfiles(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res, err := mergeSplitProfiles(context.Background(), tt.cfg, tt.pr1, tt.pr2)
res, err := tt.pr1.MergeSplit(context.Background(), tt.cfg, tt.pr2)
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i, r := range res {
Expand All @@ -145,9 +129,9 @@ func TestMergeSplitProfiles(t *testing.T) {
}

func TestMergeSplitProfilesInvalidInput(t *testing.T) {
r1 := &tracesRequest{td: testdata.GenerateTraces(2)}
r1 := &dummyRequest{}
r2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
_, err := mergeSplitProfiles(context.Background(), exporterbatcher.MaxSizeConfig{}, r1, r2)
_, err := r2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r1)
assert.Error(t, err)
}

Expand All @@ -160,15 +144,23 @@ func TestExtractProfiles(t *testing.T) {
}
}

type tracesRequest struct {
td ptrace.Traces
pusher consumer.ConsumeTracesFunc
// dummyRequest implements Request. It is for checking that merging two request types would fail
type dummyRequest struct {
}

func (req *dummyRequest) Export(_ context.Context) error {
return nil
}

func (req *dummyRequest) ItemsCount() int {
return 1
}

func (req *tracesRequest) Export(ctx context.Context) error {
return req.pusher(ctx, req.td)
func (req *dummyRequest) Merge(_ context.Context, _ exporterhelper.Request) (exporterhelper.Request, error) {
return nil, nil
}

func (req *tracesRequest) ItemsCount() int {
return req.td.SpanCount()
func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.Request) (
[]exporterhelper.Request, error) {
return nil, nil
}
18 changes: 1 addition & 17 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ type BaseExporter struct {

Signal pipeline.Signal

BatchMergeFunc exporterbatcher.BatchMergeFunc[internal.Request]
BatchMergeSplitfunc exporterbatcher.BatchMergeSplitFunc[internal.Request]

Marshaler exporterqueue.Marshaler[internal.Request]
Unmarshaler exporterqueue.Unmarshaler[internal.Request]

Expand Down Expand Up @@ -104,10 +101,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
}

if be.BatcherCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc)
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
}
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
}

Expand Down Expand Up @@ -298,16 +292,6 @@ func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[internal.Request]) Op
}
}

// withBatchFuncs is used to set the functions for merging and splitting batches for OLTP-based exporters.
// It must be provided as the first option when creating a new exporter helper.
func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) Option {
return func(o *BaseExporter) error {
o.BatchMergeFunc = mf
o.BatchMergeSplitfunc = msf
return nil
}
}

func CheckStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
if err != nil {
require.Equal(t, codes.Error, sd.Status().Code, "SpanData %v", sd)
Expand Down
24 changes: 13 additions & 11 deletions exporter/exporterhelper/internal/batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
// - concurrencyLimit is reached.
type BatchSender struct {
BaseRequestSender
cfg exporterbatcher.Config
mergeFunc exporterbatcher.BatchMergeFunc[internal.Request]
mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[internal.Request]
cfg exporterbatcher.Config

// concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher.
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
Expand All @@ -46,14 +44,11 @@ type BatchSender struct {
}

// newBatchSender returns a new batch consumer component.
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings,
mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) *BatchSender {
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings) *BatchSender {
bs := &BatchSender{
activeBatch: newEmptyBatch(),
cfg: cfg,
logger: set.Logger,
mergeFunc: mf,
mergeSplitFunc: msf,
shutdownCh: nil,
shutdownCompleteCh: make(chan struct{}),
stopped: &atomic.Bool{},
Expand Down Expand Up @@ -156,10 +151,17 @@ func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error {
func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Request) error {
bs.mu.Lock()

reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req)
if err != nil || len(reqs) == 0 {
var reqs []internal.Request
var mergeSplitErr error
if bs.activeBatch.request == nil {
reqs, mergeSplitErr = req.MergeSplit(ctx, bs.cfg.MaxSizeConfig, nil)
} else {
reqs, mergeSplitErr = bs.activeBatch.request.MergeSplit(ctx, bs.cfg.MaxSizeConfig, req)
}

if mergeSplitErr != nil || len(reqs) == 0 {
bs.mu.Unlock()
return err
return mergeSplitErr
}

bs.activeRequests.Add(1)
Expand Down Expand Up @@ -201,7 +203,7 @@ func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request)

if bs.activeBatch.request != nil {
var err error
req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
req, err = bs.activeBatch.request.Merge(ctx, req)
if err != nil {
bs.mu.Unlock()
return err
Expand Down
Loading
Loading