Skip to content

Commit

Permalink
expose additional client options around timeouts and max connections …
Browse files Browse the repository at this point in the history
…per broker
  • Loading branch information
atoulme committed Mar 3, 2023
1 parent a7fce73 commit 797d80b
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 15 deletions.
8 changes: 7 additions & 1 deletion exporter/pulsarexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type Config struct {
// Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)
TLSAllowInsecureConnection bool `mapstructure:"tls_allow_insecure_connection"`
Authentication Authentication `mapstructure:"auth"`
OperationTimeout time.Duration `mapstructure:"operation_timeout"`
ConnectionTimeout time.Duration `mapstructure:"connection_timeout"`
MaxConnectionsPerBroker int `mapstructure:"map_connections_per_broker"`
}

type Authentication struct {
Expand Down Expand Up @@ -133,7 +136,10 @@ func (cfg *Config) auth() pulsar.Authentication {

func (cfg *Config) clientOptions() pulsar.ClientOptions {
options := pulsar.ClientOptions{
URL: cfg.Endpoint,
URL: cfg.Endpoint,
ConnectionTimeout: cfg.ConnectionTimeout,
OperationTimeout: cfg.OperationTimeout,
MaxConnectionsPerBroker: cfg.MaxConnectionsPerBroker,
}

options.TLSAllowInsecureConnection = cfg.TLSAllowInsecureConnection
Expand Down
22 changes: 14 additions & 8 deletions exporter/pulsarexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,14 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Endpoint: "pulsar://localhost:6650",
Topic: "spans",
Encoding: "otlp-spans",
TLSTrustCertsFilePath: "ca.pem",
Authentication: Authentication{TLS: &TLS{CertFile: "cert.pem", KeyFile: "key.pem"}},
Endpoint: "pulsar://localhost:6650",
Topic: "spans",
Encoding: "otlp-spans",
TLSTrustCertsFilePath: "ca.pem",
Authentication: Authentication{TLS: &TLS{CertFile: "cert.pem", KeyFile: "key.pem"}},
MaxConnectionsPerBroker: 1,
ConnectionTimeout: 5 * time.Second,
OperationTimeout: 30 * time.Second,
Producer: Producer{
MaxReconnectToBroker: nil,
HashingScheme: "java_string_hash",
Expand Down Expand Up @@ -108,9 +111,12 @@ func TestClientOptions(t *testing.T) {
options := cfg.(*Config).clientOptions()

assert.Equal(t, &pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
TLSTrustCertsFilePath: "ca.pem",
Authentication: pulsar.NewAuthenticationTLS("cert.pem", "key.pem"),
URL: "pulsar://localhost:6650",
TLSTrustCertsFilePath: "ca.pem",
Authentication: pulsar.NewAuthenticationTLS("cert.pem", "key.pem"),
ConnectionTimeout: 5 * time.Second,
OperationTimeout: 30 * time.Second,
MaxConnectionsPerBroker: 1,
}, &options)

}
10 changes: 7 additions & 3 deletions exporter/pulsarexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pulsarexporter // import "github.com/open-telemetry/opentelemetry-collec

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -73,9 +74,12 @@ func createDefaultConfig() component.Config {
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
Endpoint: defaultBroker,
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
Topic: "",
Encoding: defaultEncoding,
Authentication: Authentication{},
Topic: "",
Encoding: defaultEncoding,
Authentication: Authentication{},
MaxConnectionsPerBroker: 1,
ConnectionTimeout: 5 * time.Second,
OperationTimeout: 30 * time.Second,
}
}

Expand Down
10 changes: 7 additions & 3 deletions exporter/pulsarexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pulsarexporter
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -32,9 +33,12 @@ func Test_createDefaultConfig(t *testing.T) {
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
Endpoint: defaultBroker,
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
Topic: "",
Encoding: defaultEncoding,
Authentication: Authentication{},
Topic: "",
Encoding: defaultEncoding,
Authentication: Authentication{},
MaxConnectionsPerBroker: 1,
ConnectionTimeout: 5 * time.Second,
OperationTimeout: 30 * time.Second,
})
}

Expand Down

0 comments on commit 797d80b

Please sign in to comment.