diff --git a/plugins/parsers/avro/README.md b/plugins/parsers/avro/README.md index a36e05915918a..c44dd13fac27b 100644 --- a/plugins/parsers/avro/README.md +++ b/plugins/parsers/avro/README.md @@ -31,10 +31,15 @@ The message is supposed to be encoded as follows: ## Supported values are "binary" (default) and "json" # avro_format = "binary" - ## Url of the schema registry; exactly one of schema registry and - ## schema must be set + ## URL of the schema registry which may contain username and password in the + ## form http[s]://[username[:password]@][:port] + ## NOTE: Exactly one of schema registry and schema must be set avro_schema_registry = "http://localhost:8081" + ## Path to the schema registry certificate. Should be specified only if + ## required for connection to the schema registry. + # avro_schema_registry_cert = "/etc/telegraf/ca_cert.crt" + ## Schema string; exactly one of schema registry and schema must be set #avro_schema = ''' # { diff --git a/plugins/parsers/avro/parser.go b/plugins/parsers/avro/parser.go index 83aebc2007779..1a792b7ebd875 100644 --- a/plugins/parsers/avro/parser.go +++ b/plugins/parsers/avro/parser.go @@ -26,6 +26,7 @@ import ( type Parser struct { MetricName string `toml:"metric_name"` SchemaRegistry string `toml:"avro_schema_registry"` + CaCertPath string `toml:"avro_schema_registry_cert"` Schema string `toml:"avro_schema"` Format string `toml:"avro_format"` Measurement string `toml:"avro_measurement"` @@ -62,7 +63,11 @@ func (p *Parser) Init() error { return fmt.Errorf("invalid timestamp format '%v'", p.TimestampFormat) } if p.SchemaRegistry != "" { - p.registryObj = newSchemaRegistry(p.SchemaRegistry) + registry, err := newSchemaRegistry(p.SchemaRegistry, p.CaCertPath) + if err != nil { + return fmt.Errorf("error connecting to the schema registry %q: %w", p.SchemaRegistry, err) + } + p.registryObj = registry } return nil diff --git a/plugins/parsers/avro/schema_registry.go b/plugins/parsers/avro/schema_registry.go index 8b352b7ab6cf1..45e725d19a2b7 100644 --- a/plugins/parsers/avro/schema_registry.go +++ b/plugins/parsers/avro/schema_registry.go @@ -1,9 +1,14 @@ package avro import ( + "crypto/tls" + "crypto/x509" "encoding/json" "fmt" "net/http" + "net/url" + "os" + "time" "github.com/linkedin/goavro/v2" ) @@ -14,21 +19,75 @@ type schemaAndCodec struct { } type schemaRegistry struct { - url string - cache map[int]*schemaAndCodec + url string + username string + password string + cache map[int]*schemaAndCodec + client *http.Client } const schemaByID = "%s/schemas/ids/%d" -func newSchemaRegistry(url string) *schemaRegistry { - return &schemaRegistry{url: url, cache: make(map[int]*schemaAndCodec)} +func newSchemaRegistry(addr string, caCertPath string) (*schemaRegistry, error) { + caCert, err := os.ReadFile(caCertPath) + if err != nil { + return nil, err + } + + var client *http.Client + var tlsCfg *tls.Config + if caCertPath != "" { + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + tlsCfg = &tls.Config{ + RootCAs: caCertPool, + } + } + client = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + MaxIdleConns: 10, + IdleConnTimeout: 90 * time.Second, + }, + } + + u, err := url.Parse(addr) + if err != nil { + return nil, fmt.Errorf("parsing registry URL failed: %w", err) + } + + var username, password string + if u.User != nil { + username = u.User.Username() + password, _ = u.User.Password() + } + + registry := &schemaRegistry{ + url: u.String(), + username: username, + password: password, + cache: make(map[int]*schemaAndCodec), + client: client, + } + + return registry, nil } func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) { if v, ok := sr.cache[id]; ok { return v, nil } - resp, err := http.Get(fmt.Sprintf(schemaByID, sr.url, id)) + + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf(schemaByID, sr.url, id), nil) + if err != nil { + return nil, err + } + + if sr.username != "" { + req.SetBasicAuth(sr.username, sr.password) + } + + resp, err := sr.client.Do(req) if err != nil { return nil, err }