From 797d80bcce159d1875a4871ac40482f81e9780a8 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Fri, 3 Mar 2023 11:31:55 -0800 Subject: [PATCH] expose additional client options around timeouts and max connections per broker --- exporter/pulsarexporter/config.go | 8 +++++++- exporter/pulsarexporter/config_test.go | 22 ++++++++++++++-------- exporter/pulsarexporter/factory.go | 10 +++++++--- exporter/pulsarexporter/factory_test.go | 10 +++++++--- 4 files changed, 35 insertions(+), 15 deletions(-) diff --git a/exporter/pulsarexporter/config.go b/exporter/pulsarexporter/config.go index 8593edd11847..d8265838e19b 100644 --- a/exporter/pulsarexporter/config.go +++ b/exporter/pulsarexporter/config.go @@ -43,6 +43,9 @@ type Config struct { // Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false) TLSAllowInsecureConnection bool `mapstructure:"tls_allow_insecure_connection"` Authentication Authentication `mapstructure:"auth"` + OperationTimeout time.Duration `mapstructure:"operation_timeout"` + ConnectionTimeout time.Duration `mapstructure:"connection_timeout"` + MaxConnectionsPerBroker int `mapstructure:"map_connections_per_broker"` } type Authentication struct { @@ -133,7 +136,10 @@ func (cfg *Config) auth() pulsar.Authentication { func (cfg *Config) clientOptions() pulsar.ClientOptions { options := pulsar.ClientOptions{ - URL: cfg.Endpoint, + URL: cfg.Endpoint, + ConnectionTimeout: cfg.ConnectionTimeout, + OperationTimeout: cfg.OperationTimeout, + MaxConnectionsPerBroker: cfg.MaxConnectionsPerBroker, } options.TLSAllowInsecureConnection = cfg.TLSAllowInsecureConnection diff --git a/exporter/pulsarexporter/config_test.go b/exporter/pulsarexporter/config_test.go index 831b27bf6590..2c2a10b3871b 100644 --- a/exporter/pulsarexporter/config_test.go +++ b/exporter/pulsarexporter/config_test.go @@ -57,11 +57,14 @@ func TestLoadConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, - Endpoint: "pulsar://localhost:6650", - Topic: "spans", - Encoding: "otlp-spans", - TLSTrustCertsFilePath: "ca.pem", - Authentication: Authentication{TLS: &TLS{CertFile: "cert.pem", KeyFile: "key.pem"}}, + Endpoint: "pulsar://localhost:6650", + Topic: "spans", + Encoding: "otlp-spans", + TLSTrustCertsFilePath: "ca.pem", + Authentication: Authentication{TLS: &TLS{CertFile: "cert.pem", KeyFile: "key.pem"}}, + MaxConnectionsPerBroker: 1, + ConnectionTimeout: 5 * time.Second, + OperationTimeout: 30 * time.Second, Producer: Producer{ MaxReconnectToBroker: nil, HashingScheme: "java_string_hash", @@ -108,9 +111,12 @@ func TestClientOptions(t *testing.T) { options := cfg.(*Config).clientOptions() assert.Equal(t, &pulsar.ClientOptions{ - URL: "pulsar://localhost:6650", - TLSTrustCertsFilePath: "ca.pem", - Authentication: pulsar.NewAuthenticationTLS("cert.pem", "key.pem"), + URL: "pulsar://localhost:6650", + TLSTrustCertsFilePath: "ca.pem", + Authentication: pulsar.NewAuthenticationTLS("cert.pem", "key.pem"), + ConnectionTimeout: 5 * time.Second, + OperationTimeout: 30 * time.Second, + MaxConnectionsPerBroker: 1, }, &options) } diff --git a/exporter/pulsarexporter/factory.go b/exporter/pulsarexporter/factory.go index bc946985a62e..2511fcea3dae 100644 --- a/exporter/pulsarexporter/factory.go +++ b/exporter/pulsarexporter/factory.go @@ -16,6 +16,7 @@ package pulsarexporter // import "github.com/open-telemetry/opentelemetry-collec import ( "context" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -73,9 +74,12 @@ func createDefaultConfig() component.Config { QueueSettings: exporterhelper.NewDefaultQueueSettings(), Endpoint: defaultBroker, // using an empty topic to track when it has not been set by user, default is based on traces or metrics. - Topic: "", - Encoding: defaultEncoding, - Authentication: Authentication{}, + Topic: "", + Encoding: defaultEncoding, + Authentication: Authentication{}, + MaxConnectionsPerBroker: 1, + ConnectionTimeout: 5 * time.Second, + OperationTimeout: 30 * time.Second, } } diff --git a/exporter/pulsarexporter/factory_test.go b/exporter/pulsarexporter/factory_test.go index a9f679ed8481..d5adb3204aaa 100644 --- a/exporter/pulsarexporter/factory_test.go +++ b/exporter/pulsarexporter/factory_test.go @@ -17,6 +17,7 @@ package pulsarexporter import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -32,9 +33,12 @@ func Test_createDefaultConfig(t *testing.T) { QueueSettings: exporterhelper.NewDefaultQueueSettings(), Endpoint: defaultBroker, // using an empty topic to track when it has not been set by user, default is based on traces or metrics. - Topic: "", - Encoding: defaultEncoding, - Authentication: Authentication{}, + Topic: "", + Encoding: defaultEncoding, + Authentication: Authentication{}, + MaxConnectionsPerBroker: 1, + ConnectionTimeout: 5 * time.Second, + OperationTimeout: 30 * time.Second, }) }