From ed9dcb414c5fd5f4adbb78785a318c9a9a40b492 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Fri, 22 Dec 2023 16:35:46 -0800 Subject: [PATCH] [exporterhelper] Add WithRequestQueue option to the exporter The new configuration interface for the end users provides a new `queue_size_items` option to limit the queue by a number of spans, log records, or metric data points. The previous way to limit the queue by number of requests is preserved under the same field, `queue_size,` which will later be deprecated through a longer transition process. --- .chloggen/exporter-helper-v2.yaml | 32 ++++ Makefile | 1 + cmd/builder/test/core.builder.yaml | 1 + cmd/mdatagen/go.mod | 2 + cmd/otelcorecol/builder-config.yaml | 1 + cmd/otelcorecol/go.mod | 3 + config/configgrpc/go.mod | 3 + config/confighttp/go.mod | 2 + config/configqueue/Makefile | 1 + config/configqueue/go.mod | 48 +++++ config/configqueue/go.sum | 90 +++++++++ config/configqueue/queue.go | 69 +++++++ config/configqueue/queue_test.go | 30 +++ connector/forwardconnector/go.mod | 6 +- connector/go.mod | 2 + consumer/go.mod | 2 + exporter/debugexporter/go.mod | 7 +- exporter/exporterhelper/common.go | 46 ++++- .../internal/bounded_memory_queue.go | 6 +- .../internal/bounded_memory_queue_test.go | 20 +- .../internal/persistent_queue.go | 14 +- .../internal/persistent_queue_test.go | 36 +++- exporter/exporterhelper/internal/queue.go | 8 + exporter/exporterhelper/queue.go | 52 ++++++ exporter/exporterhelper/queue_sender.go | 15 +- exporter/exporterhelper/queue_sender_test.go | 171 +++++++++++++----- exporter/go.mod | 3 + exporter/loggingexporter/go.mod | 7 +- exporter/otlpexporter/go.mod | 7 +- exporter/otlphttpexporter/go.mod | 7 +- extension/ballastextension/go.mod | 6 +- extension/zpagesextension/go.mod | 4 +- go.mod | 3 + internal/e2e/go.mod | 3 + otelcol/go.mod | 2 + processor/batchprocessor/go.mod | 4 +- processor/go.mod | 3 + processor/memorylimiterprocessor/go.mod | 6 +- receiver/go.mod | 3 + receiver/otlpreceiver/go.mod | 2 + service/go.mod | 2 + 41 files changed, 627 insertions(+), 103 deletions(-) create mode 100644 .chloggen/exporter-helper-v2.yaml create mode 100644 config/configqueue/Makefile create mode 100644 config/configqueue/go.mod create mode 100644 config/configqueue/go.sum create mode 100644 config/configqueue/queue.go create mode 100644 config/configqueue/queue_test.go create mode 100644 exporter/exporterhelper/queue.go diff --git a/.chloggen/exporter-helper-v2.yaml b/.chloggen/exporter-helper-v2.yaml new file mode 100644 index 000000000000..f177699edbda --- /dev/null +++ b/.chloggen/exporter-helper-v2.yaml @@ -0,0 +1,32 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add API for enabling queue in the new exporter helpers. + +# One or more tracking issues or pull requests related to the change +issues: [7874] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The following experimental API is introduced in exporter/exporterhelper package: + - `WithRequestQueue`: a new exporter helper option for using a queue. + - `Queue`: an interface for queue implementations. + - `QueueFactory`: a queue factory interface, implementations of this interface are intended to be used with WithRequestQueue option. + - `QueueCreateSettings`: queue factory settings. + - `QueueConfig`: common configuration for queue implementations. + - `NewDefaultQueueConfig`: a function for creating a default queue configuration. + - `NewMemoryQueueFactory`: a new factory for creating a memory queue. + - `NewPersistentQueueFactory: a factory for creating a persistent queue. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/Makefile b/Makefile index 925cb18a53a3..d33ae5bffff3 100644 --- a/Makefile +++ b/Makefile @@ -252,6 +252,7 @@ check-contrib: @$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/confighttp=$(CURDIR)/config/confighttp" @$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/confignet=$(CURDIR)/config/confignet" @$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/configopaque=$(CURDIR)/config/configopaque" + @$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/configqueue=$(CURDIR)/config/configqueue" @$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/configretry=$(CURDIR)/config/configretry" @$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/configtelemetry=$(CURDIR)/config/configtelemetry" @$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/configtls=$(CURDIR)/config/configtls" diff --git a/cmd/builder/test/core.builder.yaml b/cmd/builder/test/core.builder.yaml index 34017f6fe41d..bbc7c4f4a511 100644 --- a/cmd/builder/test/core.builder.yaml +++ b/cmd/builder/test/core.builder.yaml @@ -26,6 +26,7 @@ replaces: - go.opentelemetry.io/collector/config/confighttp => ${WORKSPACE_DIR}/config/confighttp - go.opentelemetry.io/collector/config/confignet => ${WORKSPACE_DIR}/config/confignet - go.opentelemetry.io/collector/config/configopaque => ${WORKSPACE_DIR}/config/configopaque + - go.opentelemetry.io/collector/config/configqueue => ${WORKSPACE_DIR}/config/configqueue - go.opentelemetry.io/collector/config/configretry => ${WORKSPACE_DIR}/config/configretry - go.opentelemetry.io/collector/config/configtelemetry => ${WORKSPACE_DIR}/config/configtelemetry - go.opentelemetry.io/collector/config/configtls => ${WORKSPACE_DIR}/config/configtls diff --git a/cmd/mdatagen/go.mod b/cmd/mdatagen/go.mod index 610f2dec11d3..3b29914cfc0f 100644 --- a/cmd/mdatagen/go.mod +++ b/cmd/mdatagen/go.mod @@ -75,3 +75,5 @@ replace go.opentelemetry.io/collector/exporter => ../../exporter replace go.opentelemetry.io/collector/semconv => ../../semconv replace go.opentelemetry.io/collector/receiver => ../../receiver + +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue diff --git a/cmd/otelcorecol/builder-config.yaml b/cmd/otelcorecol/builder-config.yaml index 4da64de2440c..40a0d0701055 100644 --- a/cmd/otelcorecol/builder-config.yaml +++ b/cmd/otelcorecol/builder-config.yaml @@ -31,6 +31,7 @@ replaces: - go.opentelemetry.io/collector/config/confighttp => ../../config/confighttp - go.opentelemetry.io/collector/config/confignet => ../../config/confignet - go.opentelemetry.io/collector/config/configopaque => ../../config/configopaque + - go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue - go.opentelemetry.io/collector/config/configretry => ../../config/configretry - go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry - go.opentelemetry.io/collector/config/configtls => ../../config/configtls diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index c09f035d74a6..b07c1185aedf 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -84,6 +84,7 @@ require ( go.opentelemetry.io/collector/config/confighttp v0.91.0 // indirect go.opentelemetry.io/collector/config/confignet v0.91.0 // indirect go.opentelemetry.io/collector/config/configopaque v0.91.0 // indirect + go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/config/configretry v0.0.0-20231221085427-9027a8d9cc3f // indirect go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect go.opentelemetry.io/collector/config/configtls v0.91.0 // indirect @@ -146,6 +147,8 @@ replace go.opentelemetry.io/collector/config/confignet => ../../config/confignet replace go.opentelemetry.io/collector/config/configopaque => ../../config/configopaque +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue + replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry diff --git a/config/configgrpc/go.mod b/config/configgrpc/go.mod index 6496e81808db..65ae1848555b 100644 --- a/config/configgrpc/go.mod +++ b/config/configgrpc/go.mod @@ -55,6 +55,7 @@ require ( github.com/prometheus/procfs v0.11.1 // indirect github.com/prometheus/statsd_exporter v0.22.7 // indirect go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect go.opentelemetry.io/collector/confmap v0.91.0 // indirect go.opentelemetry.io/collector/extension v0.91.0 // indirect @@ -86,6 +87,8 @@ replace go.opentelemetry.io/collector/config/configopaque => ../configopaque replace go.opentelemetry.io/collector/config/configtls => ../configtls +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue + replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry replace go.opentelemetry.io/collector/config/configtelemetry => ../configtelemetry diff --git a/config/confighttp/go.mod b/config/confighttp/go.mod index 87ee7497387f..aca4520ddcc6 100644 --- a/config/confighttp/go.mod +++ b/config/confighttp/go.mod @@ -88,3 +88,5 @@ replace go.opentelemetry.io/collector/component => ../../component replace go.opentelemetry.io/collector/consumer => ../../consumer replace go.opentelemetry.io/collector/config/configretry => ../configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../configqueue diff --git a/config/configqueue/Makefile b/config/configqueue/Makefile new file mode 100644 index 000000000000..ded7a36092dc --- /dev/null +++ b/config/configqueue/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/config/configqueue/go.mod b/config/configqueue/go.mod new file mode 100644 index 000000000000..677af473deb2 --- /dev/null +++ b/config/configqueue/go.mod @@ -0,0 +1,48 @@ +module go.opentelemetry.io/collector/config/configqueue + +go 1.20 + +require ( + github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/collector/component v0.91.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/hashicorp/go-version v1.6.0 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/knadh/koanf/providers/confmap v0.1.0 // indirect + github.com/knadh/koanf/v2 v2.0.1 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect + go.opentelemetry.io/collector/confmap v0.91.0 // indirect + go.opentelemetry.io/collector/featuregate v1.0.0 // indirect + go.opentelemetry.io/collector/pdata v1.0.0 // indirect + go.opentelemetry.io/otel v1.21.0 // indirect + go.opentelemetry.io/otel/metric v1.21.0 // indirect + go.opentelemetry.io/otel/trace v1.21.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.26.0 // indirect + golang.org/x/net v0.18.0 // indirect + golang.org/x/sys v0.14.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect + google.golang.org/grpc v1.60.1 // indirect + google.golang.org/protobuf v1.31.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/collector/component => ../../component + +replace go.opentelemetry.io/collector/pdata => ../../pdata + +replace go.opentelemetry.io/collector/featuregate => ../../featuregate + +replace go.opentelemetry.io/collector/confmap => ../../confmap + +replace go.opentelemetry.io/collector/config/configtelemetry => ../configtelemetry diff --git a/config/configqueue/go.sum b/config/configqueue/go.sum new file mode 100644 index 000000000000..3becee0c47ac --- /dev/null +++ b/config/configqueue/go.sum @@ -0,0 +1,90 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= +github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= +github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU= +github.com/knadh/koanf/v2 v2.0.1 h1:1dYGITt1I23x8cfx8ZnldtezdyaZtfAuRtIFOiRzK7g= +github.com/knadh/koanf/v2 v2.0.1/go.mod h1:ZeiIlIDXTE7w1lMT6UVcNiRAS2/rCeLn/GdLNvY1Dus= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 h1:BpfhmLKZf+SjVanKKhCgf3bg+511DmU9eDQTen7LLbY= +github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= +go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= +go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= +go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= +go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= +go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 h1:6GQBEOdGkX6MMTLT9V+TjtIRZCw9VPD5Z+yHY9wMgS0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97/go.mod h1:v7nGkzlmW8P3n/bKmWBn2WpBjpOEx8Q6gMueudAmKfY= +google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= +google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/config/configqueue/queue.go b/config/configqueue/queue.go new file mode 100644 index 000000000000..13b7bf843de1 --- /dev/null +++ b/config/configqueue/queue.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package configqueue // import "go.opentelemetry.io/collector/config/configqueue" + +import ( + "errors" + + "go.opentelemetry.io/collector/component" +) + +// QueueConfig defines configuration for queueing requests before exporting. +// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type QueueConfig struct { + // Enabled indicates whether to not enqueue batches before exporting. + Enabled bool `mapstructure:"enabled"` + // NumConsumers is the number of consumers from the queue. + NumConsumers int `mapstructure:"num_consumers"` + // QueueSizeItems is the maximum number of items (spans, metric data points or log records) + // allowed in queue at any given time. + QueueSizeItems int `mapstructure:"queue_size_items"` + // QueueSize is the maximum number of requests allowed in queue at any given time. + // This option is left for backward compatibility and will be deprecated in the future. + // It's recommended to use QueueSizeItems instead. + QueueSize int `mapstructure:"queue_size"` +} + +// NewDefaultQueueConfig returns the default Config. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewDefaultQueueConfig() QueueConfig { + return QueueConfig{ + Enabled: true, + NumConsumers: 10, + QueueSizeItems: 100_000, + } +} + +// Validate checks if the QueueSettings configuration is valid +func (qCfg *QueueConfig) Validate() error { + if !qCfg.Enabled { + return nil + } + if qCfg.NumConsumers <= 0 { + return errors.New("number of consumers must be positive") + } + if qCfg.QueueSizeItems > 0 && qCfg.QueueSize > 0 { + return errors.New("only one of 'queue_size' and 'queue_size_items' can be specified") + } + if qCfg.QueueSizeItems <= 0 && qCfg.QueueSize <= 0 { + return errors.New("queue size must be positive") + } + return nil +} + +// PersistentQueueConfig defines configuration for queueing requests in a persistent storage. +// The struct is provided to be added in the exporter configuration as one struct under the "sending_queue" key. +// The exporter helper Go interface requires the fields to be provided separately to WithRequestQueue and +// NewPersistentQueueFactory. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type PersistentQueueConfig struct { + QueueConfig `mapstructure:",squash"` + // StorageID if not empty, enables the persistent storage and uses the component specified + // as a storage extension for the persistent queue + StorageID *component.ID `mapstructure:"storage"` +} diff --git a/config/configqueue/queue_test.go b/config/configqueue/queue_test.go new file mode 100644 index 000000000000..ce88519a3c07 --- /dev/null +++ b/config/configqueue/queue_test.go @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package configqueue + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestQueueConfig_Validate(t *testing.T) { + qCfg := NewDefaultQueueConfig() + assert.NoError(t, qCfg.Validate()) + + qCfg.NumConsumers = 0 + assert.EqualError(t, qCfg.Validate(), "number of consumers must be positive") + + qCfg = NewDefaultQueueConfig() + qCfg.QueueSizeItems = 0 + assert.EqualError(t, qCfg.Validate(), "queue size must be positive") + + qCfg.QueueSize = 1 + qCfg.QueueSizeItems = 1 + assert.EqualError(t, qCfg.Validate(), "only one of 'queue_size' and 'queue_size_items' can be specified") + + // Confirm Validate doesn't return error with invalid config when feature is disabled + qCfg.Enabled = false + assert.NoError(t, qCfg.Validate()) +} diff --git a/connector/forwardconnector/go.mod b/connector/forwardconnector/go.mod index 10271f0c88a1..fde32bc7f36b 100644 --- a/connector/forwardconnector/go.mod +++ b/connector/forwardconnector/go.mod @@ -8,6 +8,8 @@ require ( go.opentelemetry.io/collector/connector v0.91.0 go.opentelemetry.io/collector/consumer v0.91.0 go.opentelemetry.io/collector/pdata v1.0.0 + go.opentelemetry.io/otel/metric v1.21.0 + go.opentelemetry.io/otel/trace v1.21.0 ) require ( @@ -30,8 +32,6 @@ require ( go.opentelemetry.io/collector/confmap v0.91.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0 // indirect go.opentelemetry.io/otel v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect - go.opentelemetry.io/otel/trace v1.21.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/net v0.18.0 // indirect @@ -73,3 +73,5 @@ retract ( replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue diff --git a/connector/go.mod b/connector/go.mod index d9bc232bea5c..147d989a369a 100644 --- a/connector/go.mod +++ b/connector/go.mod @@ -65,3 +65,5 @@ replace go.opentelemetry.io/collector/receiver => ../receiver replace go.opentelemetry.io/collector/exporter => ../exporter replace go.opentelemetry.io/collector/config/configretry => ../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../config/configqueue diff --git a/consumer/go.mod b/consumer/go.mod index 509dd9b6726f..a5918933d07a 100644 --- a/consumer/go.mod +++ b/consumer/go.mod @@ -53,3 +53,5 @@ retract ( replace go.opentelemetry.io/collector/config/configtelemetry => ../config/configtelemetry replace go.opentelemetry.io/collector/config/configretry => ../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../config/configqueue diff --git a/exporter/debugexporter/go.mod b/exporter/debugexporter/go.mod index bf74262655b6..46b380dfd793 100644 --- a/exporter/debugexporter/go.mod +++ b/exporter/debugexporter/go.mod @@ -8,6 +8,8 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.91.0 go.opentelemetry.io/collector/confmap v0.91.0 go.opentelemetry.io/collector/exporter v0.91.0 + go.opentelemetry.io/otel/metric v1.21.0 + go.opentelemetry.io/otel/trace v1.21.0 ) require ( @@ -28,6 +30,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector v0.91.0 // indirect + go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/config/configretry v0.0.0-20231221085427-9027a8d9cc3f // indirect go.opentelemetry.io/collector/consumer v0.91.0 // indirect go.opentelemetry.io/collector/extension v0.91.0 // indirect @@ -35,8 +38,6 @@ require ( go.opentelemetry.io/collector/pdata v1.0.0 // indirect go.opentelemetry.io/collector/receiver v0.91.0 // indirect go.opentelemetry.io/otel v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect - go.opentelemetry.io/otel/trace v1.21.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/net v0.18.0 // indirect @@ -71,3 +72,5 @@ replace go.opentelemetry.io/collector/processor => ../../processor replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 2c5a4e96692e..121f48da35f3 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -10,9 +10,11 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configqueue" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" ) // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). @@ -102,7 +104,7 @@ func WithRetry(config configretry.BackOffConfig) Option { func WithQueue(config QueueSettings) Option { return func(o *baseExporter) { if o.requestExporter { - panic("queueing is not available for the new request exporters yet") + panic("WithQueue option is not available for the new request exporters, use WithRequestQueue instead") } if !config.Enabled { o.queueSender = &errorLoggingRequestSender{ @@ -111,7 +113,47 @@ func WithQueue(config QueueSettings) Option { } return } - o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler) + var queue internal.Queue[Request] + set := internal.QueueCreateSettings[Request]{ + CreateSettings: o.set, + Sizer: &internal.RequestSizer[Request]{}, + Capacity: config.QueueSize, + DataType: o.signal, + } + if config.StorageID != nil { + queue = internal.NewPersistentQueue[Request](set, *config.StorageID, o.marshaler, o.unmarshaler) + } else { + queue = internal.NewBoundedMemoryQueue[Request](set) + } + o.queueSender = newQueueSender(queue, o.set, config.NumConsumers) + } +} + +// WithRequestQueue enables queueing for an exporter. +// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func WithRequestQueue(cfg configqueue.QueueConfig, queueFactory QueueFactory) Option { + return func(o *baseExporter) { + if !cfg.Enabled { + o.queueSender = &errorLoggingRequestSender{ + logger: o.set.Logger, + message: "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.", + } + return + } + set := QueueCreateSettings{ + CreateSettings: o.set, + DataType: o.signal, + } + if cfg.QueueSizeItems > 0 { + set.Sizer = &internal.ItemsSizer[Request]{} + set.Capacity = cfg.QueueSizeItems + } else { + set.Sizer = &internal.RequestSizer[Request]{} + set.Capacity = cfg.QueueSize + } + o.queueSender = newQueueSender(queueFactory(set), o.set, cfg.NumConsumers) } } diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/exporterhelper/internal/bounded_memory_queue.go index 0390495c5107..7cf2166e3849 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue.go @@ -22,10 +22,10 @@ type boundedMemoryQueue[T any] struct { // NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional // callback for dropped items (e.g. useful to emit metrics). -func NewBoundedMemoryQueue[T any](sizer Sizer[T], capacity int) Queue[T] { +func NewBoundedMemoryQueue[T any](set QueueCreateSettings[T]) Queue[T] { return &boundedMemoryQueue[T]{ - queueCapacityLimiter: newQueueCapacityLimiter[T](sizer, capacity), - items: make(chan queueRequest[T], capacity), + queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity), + items: make(chan queueRequest[T], set.Capacity), } } diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go index 436854dcfa43..82fa095c73e0 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go @@ -22,7 +22,10 @@ import ( // We want to test the overflow behavior, so we block the consumer // by holding a startLock before submitting items to the queue. func TestBoundedQueue(t *testing.T) { - q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 1) + q := NewBoundedMemoryQueue[string](QueueCreateSettings[string]{ + Sizer: &RequestSizer[string]{}, + Capacity: 1, + }) assert.NoError(t, q.Offer(context.Background(), "a")) @@ -72,7 +75,10 @@ func TestBoundedQueue(t *testing.T) { // only after Stop will mean the consumers are still locked while // trying to perform the final consumptions. func TestShutdownWhileNotEmpty(t *testing.T) { - q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 1000) + q := NewBoundedMemoryQueue[string](QueueCreateSettings[string]{ + Sizer: &RequestSizer[string]{}, + Capacity: 1000, + }) assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) for i := 0; i < 10; i++ { @@ -153,7 +159,10 @@ func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], capacity int, numCo func queueUsage(tb testing.TB, sizer Sizer[fakeReq], capacity int, numConsumers int, numberOfItems int) { var wg sync.WaitGroup wg.Add(numberOfItems) - q := NewBoundedMemoryQueue[fakeReq](sizer, capacity) + q := NewBoundedMemoryQueue[fakeReq](QueueCreateSettings[fakeReq]{ + Sizer: sizer, + Capacity: capacity, + }) consumers := NewQueueConsumers(q, numConsumers, func(context.Context, fakeReq) error { wg.Done() return nil @@ -170,7 +179,10 @@ func queueUsage(tb testing.TB, sizer Sizer[fakeReq], capacity int, numConsumers } func TestZeroSizeNoConsumers(t *testing.T) { - q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 0) + q := NewBoundedMemoryQueue[string](QueueCreateSettings[string]{ + Sizer: &RequestSizer[string]{}, + Capacity: 0, + }) err := q.Start(context.Background(), componenttest.NewNopHost()) assert.NoError(t, err) diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index f058a9177dae..52026214545b 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -87,19 +87,19 @@ var ( ) // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage -func NewPersistentQueue[T any](sizer Sizer[T], capacity int, dataType component.DataType, storageID component.ID, - marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error), set exporter.CreateSettings) Queue[T] { - _, isRequestSized := sizer.(*RequestSizer[T]) +func NewPersistentQueue[T any](set QueueCreateSettings[T], storageID component.ID, marshaler func(req T) ([]byte, + error), unmarshaler func([]byte) (T, error)) Queue[T] { + _, isRequestSized := set.Sizer.(*RequestSizer[T]) return &persistentQueue[T]{ - queueCapacityLimiter: newQueueCapacityLimiter[T](sizer, capacity), - set: set, + queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity), + set: set.CreateSettings, storageID: storageID, - dataType: dataType, + dataType: set.DataType, unmarshaler: unmarshaler, marshaler: marshaler, initQueueSize: &atomic.Uint64{}, isRequestSized: isRequestSized, - putChan: make(chan struct{}, capacity), + putChan: make(chan struct{}, set.Capacity), } } diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 1b4fccdeac7d..d0c2bb5617cd 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -55,8 +55,13 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component { // createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers. func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest], capacity int, numConsumers int, consumeFunc func(_ context.Context, item tracesRequest) error) Queue[tracesRequest] { - pq := NewPersistentQueue[tracesRequest](sizer, capacity, component.DataTypeTraces, component.ID{}, - marshalTracesRequest, unmarshalTracesRequest, exportertest.NewNopCreateSettings()) + set := QueueCreateSettings[tracesRequest]{ + Sizer: sizer, + Capacity: capacity, + DataType: component.DataTypeTraces, + CreateSettings: exportertest.NewNopCreateSettings(), + } + pq := NewPersistentQueue[tracesRequest](set, component.ID{}, marshalTracesRequest, unmarshalTracesRequest) host := &mockHost{ext: map[component.ID]component.Component{ {}: NewMockStorageExtension(nil), }} @@ -69,8 +74,13 @@ func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest], } func createTestPersistentQueueWithClient(client storage.Client) *persistentQueue[tracesRequest] { - pq := NewPersistentQueue[tracesRequest](&RequestSizer[tracesRequest]{}, 1000, component.DataTypeTraces, - component.ID{}, marshalTracesRequest, unmarshalTracesRequest, exportertest.NewNopCreateSettings()).(*persistentQueue[tracesRequest]) + set := QueueCreateSettings[tracesRequest]{ + Sizer: &RequestSizer[tracesRequest]{}, + Capacity: 1000, + DataType: component.DataTypeTraces, + CreateSettings: exportertest.NewNopCreateSettings(), + } + pq := NewPersistentQueue[tracesRequest](set, component.ID{}, marshalTracesRequest, unmarshalTracesRequest).(*persistentQueue[tracesRequest]) pq.initClient(context.Background(), client) return pq } @@ -85,8 +95,13 @@ func createTestPersistentQueueWithItemsCapacity(t testing.TB, ext storage.Extens func createTestPersistentQueueWithCapacityLimiter(t testing.TB, ext storage.Extension, sizer Sizer[tracesRequest], capacity int) *persistentQueue[tracesRequest] { - pq := NewPersistentQueue[tracesRequest](sizer, capacity, component.DataTypeTraces, component.ID{}, - marshalTracesRequest, unmarshalTracesRequest, exportertest.NewNopCreateSettings()).(*persistentQueue[tracesRequest]) + set := QueueCreateSettings[tracesRequest]{ + Sizer: sizer, + Capacity: capacity, + DataType: component.DataTypeTraces, + CreateSettings: exportertest.NewNopCreateSettings(), + } + pq := NewPersistentQueue[tracesRequest](set, component.ID{}, marshalTracesRequest, unmarshalTracesRequest).(*persistentQueue[tracesRequest]) require.NoError(t, pq.Start(context.Background(), &mockHost{ext: map[component.ID]component.Component{{}: ext}})) return pq } @@ -320,8 +335,13 @@ func TestInvalidStorageExtensionType(t *testing.T) { } func TestPersistentQueue_StopAfterBadStart(t *testing.T) { - pq := NewPersistentQueue[tracesRequest](&RequestSizer[tracesRequest]{}, 1, component.DataTypeTraces, component.ID{}, - marshalTracesRequest, unmarshalTracesRequest, exportertest.NewNopCreateSettings()) + set := QueueCreateSettings[tracesRequest]{ + Sizer: &RequestSizer[tracesRequest]{}, + Capacity: 1, + DataType: component.DataTypeTraces, + CreateSettings: exportertest.NewNopCreateSettings(), + } + pq := NewPersistentQueue[tracesRequest](set, component.ID{}, marshalTracesRequest, unmarshalTracesRequest) // verify that stopping a un-start/started w/error queue does not panic assert.NoError(t, pq.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/internal/queue.go b/exporter/exporterhelper/internal/queue.go index 8bd8879a9402..5ccc9609e798 100644 --- a/exporter/exporterhelper/internal/queue.go +++ b/exporter/exporterhelper/internal/queue.go @@ -10,6 +10,7 @@ import ( "errors" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" ) var ( @@ -34,3 +35,10 @@ type Queue[T any] interface { // Capacity returns the capacity of the queue. Capacity() int } + +type QueueCreateSettings[T any] struct { + CreateSettings exporter.CreateSettings + Sizer Sizer[T] + Capacity int + DataType component.DataType +} diff --git a/exporter/exporterhelper/queue.go b/exporter/exporterhelper/queue.go new file mode 100644 index 000000000000..463e7b7f460b --- /dev/null +++ b/exporter/exporterhelper/queue.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +) + +// Queue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue +// (boundedMemoryQueue) or via a disk-based queue (persistentQueue) +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Queue = internal.Queue[Request] + +// QueueCreateSettings defines settings for creating a queue. +type QueueCreateSettings = internal.QueueCreateSettings[Request] + +type QueueFactory func(QueueCreateSettings) Queue + +// NewMemoryQueueFactory returns a factory to create a new memory queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewMemoryQueueFactory() QueueFactory { + return func(set QueueCreateSettings) Queue { + return internal.NewBoundedMemoryQueue[Request](set) + } +} + +// PersistentQueueSettings defines developer settings for the persistent queue factory. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type PersistentQueueSettings struct { + // RequestMarshaler is used to serialize requests before storing them in the persistent storage. + RequestMarshaler + // RequestUnmarshaler is used to deserialize requests after reading them from the persistent storage. + RequestUnmarshaler +} + +// NewPersistentQueueFactory returns a factory to create a new persistent queue. +// If cfg.StorageID is nil then it falls back to memory queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewPersistentQueueFactory(storageID *component.ID, factorySettings PersistentQueueSettings) QueueFactory { + if storageID == nil { + return NewMemoryQueueFactory() + } + return func(set QueueCreateSettings) Queue { + return internal.NewPersistentQueue[Request](set, *storageID, factorySettings.RequestMarshaler, factorySettings.RequestUnmarshaler) + } +} diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index c68918258511..554f22a44465 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -85,18 +85,7 @@ type queueSender struct { metricSize otelmetric.Int64ObservableGauge } -func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType, - marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *queueSender { - - isPersistent := config.StorageID != nil - var queue internal.Queue[Request] - queueSizer := &internal.RequestSizer[Request]{} - if isPersistent { - queue = internal.NewPersistentQueue[Request](queueSizer, config.QueueSize, signal, *config.StorageID, marshaler, - unmarshaler, set) - } else { - queue = internal.NewBoundedMemoryQueue[Request](queueSizer, config.QueueSize) - } +func newQueueSender(queue Queue, set exporter.CreateSettings, numConsumers int) *queueSender { qs := &queueSender{ fullName: set.ID.String(), queue: queue, @@ -104,7 +93,7 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co logger: set.TelemetrySettings.Logger, meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), } - qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume) + qs.consumers = internal.NewQueueConsumers(queue, numConsumers, qs.consume) return qs } diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index fe0633a605d5..7ae226349a14 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configqueue" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" @@ -96,41 +97,85 @@ func TestQueuedRetry_DropOnFull(t *testing.T) { } func TestQueuedRetryHappyPath(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry(defaultID) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - qCfg := NewDefaultQueueSettings() - rCfg := configretry.NewDefaultBackOffConfig() - set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings, BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - wantRequests := 10 - reqs := make([]*mockRequest, 0, 10) - for i := 0; i < wantRequests; i++ { - ocs.run(func() { - req := newMockRequest(2, nil) - reqs = append(reqs, req) - require.NoError(t, be.send(context.Background(), req)) - }) + tests := []struct { + name string + queueOption Option + }{ + { + name: "WithQueue", + queueOption: WithQueue(QueueSettings{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, + }), + }, + { + name: "WithRequestQueue/MemoryQueueFactory", + queueOption: WithRequestQueue(configqueue.QueueConfig{ + Enabled: true, + QueueSizeItems: 21, + NumConsumers: 1, + }, NewMemoryQueueFactory()), + }, + { + name: "WithRequestQueue/PersistentQueueFactory", + queueOption: WithRequestQueue(configqueue.QueueConfig{ + Enabled: true, + QueueSizeItems: 21, + NumConsumers: 1, + }, NewPersistentQueueFactory(nil, PersistentQueueSettings{})), + }, + { + name: "WithRequestQueue/PersistentQueueFactory/RequestsLimit", + queueOption: WithRequestQueue(configqueue.QueueConfig{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, + }, NewPersistentQueueFactory(nil, PersistentQueueSettings{})), + }, } - - // Wait until all batches received - ocs.awaitAsyncProcessing() - - require.Len(t, reqs, wantRequests) - for _, req := range reqs { - req.checkNumRequests(t, 1) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tel, err := obsreporttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) + + rCfg := configretry.NewDefaultBackOffConfig() + set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tel.TelemetrySettings, BuildInfo: component.NewDefaultBuildInfo()} + be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), tt.queueOption) + require.NoError(t, err) + ocs := be.obsrepSender.(*observabilityConsumerSender) + + wantRequests := 10 + reqs := make([]*mockRequest, 0, 10) + for i := 0; i < wantRequests; i++ { + ocs.run(func() { + req := newMockRequest(2, nil) + reqs = append(reqs, req) + require.NoError(t, be.send(context.Background(), req)) + }) + } + + // expect queue to be full + require.Error(t, be.send(context.Background(), newMockRequest(2, nil))) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + // Wait until all batches received + ocs.awaitAsyncProcessing() + + require.Len(t, reqs, wantRequests) + for _, req := range reqs { + req.checkNumRequests(t, 1) + } + + ocs.checkSendItemsCount(t, 2*wantRequests) + ocs.checkDroppedItemsCount(t, 0) + }) } - - ocs.checkSendItemsCount(t, 2*wantRequests) - ocs.checkDroppedItemsCount(t, 0) } // Force the state of feature gate for a test @@ -222,22 +267,47 @@ func TestQueueSettings_Validate(t *testing.T) { } func TestQueueRetryWithDisabledQueue(t *testing.T) { - qs := NewDefaultQueueSettings() - qs.Enabled = false - be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithQueue(qs)) - require.IsType(t, &errorLoggingRequestSender{}, be.queueSender) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - ocs := be.obsrepSender.(*observabilityConsumerSender) - mockR := newMockRequest(2, errors.New("some error")) - ocs.run(func() { - require.Error(t, be.send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 0) - ocs.checkDroppedItemsCount(t, 2) - require.NoError(t, be.Shutdown(context.Background())) + tests := []struct { + name string + queueOption Option + }{ + { + name: "WithQueue", + queueOption: func() Option { + qs := NewDefaultQueueSettings() + qs.Enabled = false + return WithQueue(qs) + }(), + }, + { + name: "WithRequestQueue", + queueOption: func() Option { + qs := configqueue.NewDefaultQueueConfig() + qs.Enabled = false + return WithRequestQueue(qs, NewMemoryQueueFactory()) + }(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, tt.queueOption) + require.IsType(t, &errorLoggingRequestSender{}, be.queueSender) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ocs := be.obsrepSender.(*observabilityConsumerSender) + mockR := newMockRequest(2, errors.New("some error")) + ocs.run(func() { + require.Error(t, be.send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + require.NoError(t, be.Shutdown(context.Background())) + }) + } + } func TestQueuedRetryPersistenceEnabled(t *testing.T) { @@ -331,7 +401,8 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { } func TestQueueSenderNoStartShutdown(t *testing.T) { - qs := newQueueSender(NewDefaultQueueSettings(), exportertest.NewNopCreateSettings(), "", nil, nil) + queue := internal.NewBoundedMemoryQueue[Request](QueueCreateSettings{}) + qs := newQueueSender(queue, exportertest.NewNopCreateSettings(), 1) assert.NoError(t, qs.Shutdown(context.Background())) } diff --git a/exporter/go.mod b/exporter/go.mod index 0d5bbcd7ebd7..b88dd0a57389 100644 --- a/exporter/go.mod +++ b/exporter/go.mod @@ -8,6 +8,7 @@ require ( go.opencensus.io v0.24.0 go.opentelemetry.io/collector v0.91.0 go.opentelemetry.io/collector/component v0.91.0 + go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 go.opentelemetry.io/collector/config/configretry v0.0.0-20231221085427-9027a8d9cc3f go.opentelemetry.io/collector/config/configtelemetry v0.91.0 go.opentelemetry.io/collector/consumer v0.91.0 @@ -88,3 +89,5 @@ retract v0.76.0 // Depends on retracted pdata v1.0.0-rc10 module replace go.opentelemetry.io/collector/config/configretry => ../config/configretry replace go.opentelemetry.io/collector/config/configtelemetry => ../config/configtelemetry + +replace go.opentelemetry.io/collector/config/configqueue => ../config/configqueue diff --git a/exporter/loggingexporter/go.mod b/exporter/loggingexporter/go.mod index 27614d667681..77b250da1139 100644 --- a/exporter/loggingexporter/go.mod +++ b/exporter/loggingexporter/go.mod @@ -9,6 +9,8 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.91.0 go.opentelemetry.io/collector/confmap v0.91.0 go.opentelemetry.io/collector/exporter v0.91.0 + go.opentelemetry.io/otel/metric v1.21.0 + go.opentelemetry.io/otel/trace v1.21.0 go.uber.org/zap v1.26.0 ) @@ -30,6 +32,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector v0.91.0 // indirect + go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/config/configretry v0.0.0-20231221085427-9027a8d9cc3f // indirect go.opentelemetry.io/collector/consumer v0.91.0 // indirect go.opentelemetry.io/collector/extension v0.91.0 // indirect @@ -37,8 +40,6 @@ require ( go.opentelemetry.io/collector/pdata v1.0.0 // indirect go.opentelemetry.io/collector/receiver v0.91.0 // indirect go.opentelemetry.io/otel v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect - go.opentelemetry.io/otel/trace v1.21.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.15.0 // indirect @@ -77,3 +78,5 @@ retract ( replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue diff --git a/exporter/otlpexporter/go.mod b/exporter/otlpexporter/go.mod index 90807134649a..bccdf2090f57 100644 --- a/exporter/otlpexporter/go.mod +++ b/exporter/otlpexporter/go.mod @@ -16,6 +16,8 @@ require ( go.opentelemetry.io/collector/consumer v0.91.0 go.opentelemetry.io/collector/exporter v0.91.0 go.opentelemetry.io/collector/pdata v1.0.0 + go.opentelemetry.io/otel/metric v1.21.0 + go.opentelemetry.io/otel/trace v1.21.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 google.golang.org/grpc v1.60.1 google.golang.org/protobuf v1.31.0 @@ -46,6 +48,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector/config/confignet v0.91.0 // indirect + go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect go.opentelemetry.io/collector/config/internal v0.91.0 // indirect go.opentelemetry.io/collector/extension v0.91.0 // indirect @@ -55,10 +58,8 @@ require ( go.opentelemetry.io/contrib/config v0.1.1 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect go.opentelemetry.io/otel v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect go.opentelemetry.io/otel/sdk v1.21.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect - go.opentelemetry.io/otel/trace v1.21.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/net v0.19.0 // indirect @@ -111,3 +112,5 @@ retract ( replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue diff --git a/exporter/otlphttpexporter/go.mod b/exporter/otlphttpexporter/go.mod index aa019ca1e854..a82c1d1ea29e 100644 --- a/exporter/otlphttpexporter/go.mod +++ b/exporter/otlphttpexporter/go.mod @@ -15,6 +15,8 @@ require ( go.opentelemetry.io/collector/consumer v0.91.0 go.opentelemetry.io/collector/exporter v0.91.0 go.opentelemetry.io/collector/pdata v1.0.0 + go.opentelemetry.io/otel/metric v1.21.0 + go.opentelemetry.io/otel/trace v1.21.0 go.uber.org/zap v1.26.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 google.golang.org/grpc v1.60.1 @@ -46,6 +48,7 @@ require ( github.com/rs/cors v1.10.1 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector/config/configauth v0.91.0 // indirect + go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect go.opentelemetry.io/collector/config/internal v0.91.0 // indirect go.opentelemetry.io/collector/extension v0.91.0 // indirect @@ -55,10 +58,8 @@ require ( go.opentelemetry.io/contrib/config v0.1.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect go.opentelemetry.io/otel v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect go.opentelemetry.io/otel/sdk v1.21.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect - go.opentelemetry.io/otel/trace v1.21.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/sys v0.15.0 // indirect @@ -108,3 +109,5 @@ retract ( ) replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue diff --git a/extension/ballastextension/go.mod b/extension/ballastextension/go.mod index a51dc33eea48..cb0ca366f1b6 100644 --- a/extension/ballastextension/go.mod +++ b/extension/ballastextension/go.mod @@ -9,6 +9,8 @@ require ( go.opentelemetry.io/collector/component v0.91.0 go.opentelemetry.io/collector/confmap v0.91.0 go.opentelemetry.io/collector/extension v0.91.0 + go.opentelemetry.io/otel/metric v1.21.0 + go.opentelemetry.io/otel/trace v1.21.0 go.uber.org/zap v1.26.0 ) @@ -35,8 +37,6 @@ require ( go.opentelemetry.io/collector/featuregate v1.0.0 // indirect go.opentelemetry.io/collector/pdata v1.0.0 // indirect go.opentelemetry.io/otel v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect - go.opentelemetry.io/otel/trace v1.21.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.15.0 // indirect @@ -75,3 +75,5 @@ replace go.opentelemetry.io/collector/processor => ../../processor replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue diff --git a/extension/zpagesextension/go.mod b/extension/zpagesextension/go.mod index c13974f7d032..fd6cfd2a0bef 100644 --- a/extension/zpagesextension/go.mod +++ b/extension/zpagesextension/go.mod @@ -10,6 +10,7 @@ require ( go.opentelemetry.io/collector/confmap v0.91.0 go.opentelemetry.io/collector/extension v0.91.0 go.opentelemetry.io/contrib/zpages v0.46.1 + go.opentelemetry.io/otel/metric v1.21.0 go.opentelemetry.io/otel/sdk v1.21.0 go.opentelemetry.io/otel/trace v1.21.0 go.uber.org/zap v1.26.0 @@ -34,7 +35,6 @@ require ( go.opentelemetry.io/collector/pdata v1.0.0 // indirect go.opentelemetry.io/contrib/config v0.1.1 // indirect go.opentelemetry.io/otel v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.18.0 // indirect @@ -76,3 +76,5 @@ retract ( replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue diff --git a/go.mod b/go.mod index 4a7266f5ee58..8ab71ab8283f 100644 --- a/go.mod +++ b/go.mod @@ -58,6 +58,7 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect + go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/config/configretry v0.0.0-20231221085427-9027a8d9cc3f // indirect go.opentelemetry.io/collector/confmap v0.91.0 // indirect go.opentelemetry.io/collector/extension v0.91.0 // indirect @@ -104,3 +105,5 @@ retract ( v0.57.0 // Release failed, use v0.57.2 v0.32.0 // Contains incomplete metrics transition to proto 0.9.0, random components are not working. ) + +replace go.opentelemetry.io/collector/config/configqueue => ./config/configqueue diff --git a/internal/e2e/go.mod b/internal/e2e/go.mod index ae76c77e270a..07b8ea53ccec 100644 --- a/internal/e2e/go.mod +++ b/internal/e2e/go.mod @@ -49,6 +49,7 @@ require ( go.opentelemetry.io/collector/config/configcompression v0.91.0 // indirect go.opentelemetry.io/collector/config/confignet v0.91.0 // indirect go.opentelemetry.io/collector/config/configopaque v0.91.0 // indirect + go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect go.opentelemetry.io/collector/config/internal v0.91.0 // indirect go.opentelemetry.io/collector/confmap v0.91.0 // indirect @@ -123,3 +124,5 @@ replace go.opentelemetry.io/collector/exporter => ../../exporter replace go.opentelemetry.io/collector/featuregate => ../../featuregate replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry + +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue diff --git a/otelcol/go.mod b/otelcol/go.mod index e92acc663e2e..fafd48824144 100644 --- a/otelcol/go.mod +++ b/otelcol/go.mod @@ -128,3 +128,5 @@ replace go.opentelemetry.io/collector/featuregate => ../featuregate replace go.opentelemetry.io/collector/config/confignet => ../config/confignet replace go.opentelemetry.io/collector/config/configretry => ../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../config/configqueue diff --git a/processor/batchprocessor/go.mod b/processor/batchprocessor/go.mod index 553e9da6874e..367dbcf2e53b 100644 --- a/processor/batchprocessor/go.mod +++ b/processor/batchprocessor/go.mod @@ -21,6 +21,7 @@ require ( go.opentelemetry.io/otel/metric v1.21.0 go.opentelemetry.io/otel/sdk v1.21.0 go.opentelemetry.io/otel/sdk/metric v1.21.0 + go.opentelemetry.io/otel/trace v1.21.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.26.0 ) @@ -51,7 +52,6 @@ require ( github.com/prometheus/procfs v0.11.1 // indirect github.com/prometheus/statsd_exporter v0.22.7 // indirect go.opentelemetry.io/collector/featuregate v1.0.0 // indirect - go.opentelemetry.io/otel/trace v1.21.0 // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect @@ -90,3 +90,5 @@ retract ( replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue diff --git a/processor/go.mod b/processor/go.mod index 7dc0c95705e7..b5a4cf3bdc50 100644 --- a/processor/go.mod +++ b/processor/go.mod @@ -47,6 +47,7 @@ require ( github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.11.1 // indirect github.com/prometheus/statsd_exporter v0.22.7 // indirect + go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/confmap v0.91.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.44.1-0.20231201153405-6027c1ae76f2 // indirect go.opentelemetry.io/otel/sdk v1.21.0 // indirect @@ -82,3 +83,5 @@ replace go.opentelemetry.io/collector/receiver => ../receiver replace go.opentelemetry.io/collector/config/configtelemetry => ../config/configtelemetry replace go.opentelemetry.io/collector/config/configretry => ../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../config/configqueue diff --git a/processor/memorylimiterprocessor/go.mod b/processor/memorylimiterprocessor/go.mod index 660055143949..82685c066589 100644 --- a/processor/memorylimiterprocessor/go.mod +++ b/processor/memorylimiterprocessor/go.mod @@ -11,6 +11,8 @@ require ( go.opentelemetry.io/collector/consumer v0.91.0 go.opentelemetry.io/collector/pdata v1.0.0 go.opentelemetry.io/collector/processor v0.91.0 + go.opentelemetry.io/otel/metric v1.21.0 + go.opentelemetry.io/otel/trace v1.21.0 go.uber.org/zap v1.26.0 ) @@ -39,8 +41,6 @@ require ( go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0 // indirect go.opentelemetry.io/otel v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect - go.opentelemetry.io/otel/trace v1.21.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.15.0 // indirect @@ -79,3 +79,5 @@ retract ( replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue diff --git a/receiver/go.mod b/receiver/go.mod index 75bd10a7a98c..81d42499cf58 100644 --- a/receiver/go.mod +++ b/receiver/go.mod @@ -48,6 +48,7 @@ require ( github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.11.1 // indirect github.com/prometheus/statsd_exporter v0.22.7 // indirect + go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/confmap v0.91.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.44.1-0.20231201153405-6027c1ae76f2 // indirect go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect @@ -84,3 +85,5 @@ retract v0.76.0 // Depends on retracted pdata v1.0.0-rc10 module replace go.opentelemetry.io/collector/config/configtelemetry => ../config/configtelemetry replace go.opentelemetry.io/collector/config/configretry => ../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../config/configqueue diff --git a/receiver/otlpreceiver/go.mod b/receiver/otlpreceiver/go.mod index be75ffde6e84..ec5ab6afc8ce 100644 --- a/receiver/otlpreceiver/go.mod +++ b/receiver/otlpreceiver/go.mod @@ -131,3 +131,5 @@ retract ( ) replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue diff --git a/service/go.mod b/service/go.mod index aaf1d95716c8..28af9367342f 100644 --- a/service/go.mod +++ b/service/go.mod @@ -125,3 +125,5 @@ replace go.opentelemetry.io/collector/featuregate => ../featuregate replace go.opentelemetry.io/collector/config/confignet => ../config/confignet replace go.opentelemetry.io/collector/config/configretry => ../config/configretry + +replace go.opentelemetry.io/collector/config/configqueue => ../config/configqueue