Skip to content

Commit

Permalink
feat(parsers.avro): Allow connection to https schema registry (#13903)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Sep 11, 2023
1 parent a4631a2 commit 9db814d
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 8 deletions.
9 changes: 7 additions & 2 deletions plugins/parsers/avro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@ The message is supposed to be encoded as follows:
## Supported values are "binary" (default) and "json"
# avro_format = "binary"

## Url of the schema registry; exactly one of schema registry and
## schema must be set
## URL of the schema registry which may contain username and password in the
## form http[s]://[username[:password]@]<host>[:port]
## NOTE: Exactly one of schema registry and schema must be set
avro_schema_registry = "http://localhost:8081"

## Path to the schema registry certificate. Should be specified only if
## required for connection to the schema registry.
# avro_schema_registry_cert = "/etc/telegraf/ca_cert.crt"

## Schema string; exactly one of schema registry and schema must be set
#avro_schema = '''
# {
Expand Down
7 changes: 6 additions & 1 deletion plugins/parsers/avro/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
type Parser struct {
MetricName string `toml:"metric_name"`
SchemaRegistry string `toml:"avro_schema_registry"`
CaCertPath string `toml:"avro_schema_registry_cert"`
Schema string `toml:"avro_schema"`
Format string `toml:"avro_format"`
Measurement string `toml:"avro_measurement"`
Expand Down Expand Up @@ -62,7 +63,11 @@ func (p *Parser) Init() error {
return fmt.Errorf("invalid timestamp format '%v'", p.TimestampFormat)
}
if p.SchemaRegistry != "" {
p.registryObj = newSchemaRegistry(p.SchemaRegistry)
registry, err := newSchemaRegistry(p.SchemaRegistry, p.CaCertPath)
if err != nil {
return fmt.Errorf("error connecting to the schema registry %q: %w", p.SchemaRegistry, err)
}
p.registryObj = registry
}

return nil
Expand Down
69 changes: 64 additions & 5 deletions plugins/parsers/avro/schema_registry.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package avro

import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"time"

"github.com/linkedin/goavro/v2"
)
Expand All @@ -14,21 +19,75 @@ type schemaAndCodec struct {
}

type schemaRegistry struct {
url string
cache map[int]*schemaAndCodec
url string
username string
password string
cache map[int]*schemaAndCodec
client *http.Client
}

const schemaByID = "%s/schemas/ids/%d"

func newSchemaRegistry(url string) *schemaRegistry {
return &schemaRegistry{url: url, cache: make(map[int]*schemaAndCodec)}
func newSchemaRegistry(addr string, caCertPath string) (*schemaRegistry, error) {
caCert, err := os.ReadFile(caCertPath)
if err != nil {
return nil, err
}

var client *http.Client
var tlsCfg *tls.Config
if caCertPath != "" {
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsCfg = &tls.Config{
RootCAs: caCertPool,
}
}
client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
MaxIdleConns: 10,
IdleConnTimeout: 90 * time.Second,
},
}

u, err := url.Parse(addr)
if err != nil {
return nil, fmt.Errorf("parsing registry URL failed: %w", err)
}

var username, password string
if u.User != nil {
username = u.User.Username()
password, _ = u.User.Password()
}

registry := &schemaRegistry{
url: u.String(),
username: username,
password: password,
cache: make(map[int]*schemaAndCodec),
client: client,
}

return registry, nil
}

func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
if v, ok := sr.cache[id]; ok {
return v, nil
}
resp, err := http.Get(fmt.Sprintf(schemaByID, sr.url, id))

req, err := http.NewRequest(http.MethodGet, fmt.Sprintf(schemaByID, sr.url, id), nil)
if err != nil {
return nil, err
}

if sr.username != "" {
req.SetBasicAuth(sr.username, sr.password)
}

resp, err := sr.client.Do(req)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 9db814d

Please sign in to comment.