Skip to content

Commit

Permalink
Use clickhouse.ParseDSN for clickhouse URL parsing (#294)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored Apr 19, 2024
1 parent 40de98c commit 076a33a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 87 deletions.
31 changes: 6 additions & 25 deletions exporter/clickhouselogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/json"
"errors"
"fmt"
"net/url"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -540,34 +539,16 @@ const (
func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, error) {
// use empty database to create database
ctx := context.Background()
dsnURL, err := url.Parse(cfg.DSN)
options, err := clickhouse.ParseDSN(cfg.DSN)
if err != nil {
return nil, err
}

// setting maxOpenIdleConnections = numConsumers + 1 to avoid `prepareBatch:clickhouse: acquire conn timeout` error
maxOpenIdleConnections := cfg.QueueSettings.NumConsumers + 1

options := &clickhouse.Options{
Addr: []string{dsnURL.Host},
MaxOpenConns: maxOpenIdleConnections + 5,
MaxIdleConns: maxOpenIdleConnections,
}

if dsnURL.Query().Get("username") != "" {
auth := clickhouse.Auth{
Username: dsnURL.Query().Get("username"),
Password: dsnURL.Query().Get("password"),
}
options.Auth = auth
}

if dsnURL.Query().Get("dial_timeout") != "" {
dialTimeout, err := time.ParseDuration(dsnURL.Query().Get("dial_timeout"))
if err != nil {
return nil, fmt.Errorf("failed to parse dial_timeout from dsn: %w", err)
}
options.DialTimeout = dialTimeout
// setting maxIdleConnections = numConsumers + 1 to avoid `prepareBatch:clickhouse: acquire conn timeout` error
maxIdleConnections := cfg.QueueSettings.NumConsumers + 1
if options.MaxIdleConns < maxIdleConnections {
options.MaxIdleConns = maxIdleConnections
options.MaxOpenConns = maxIdleConnections + 5
}

db, err := clickhouse.Open(options)
Expand Down
30 changes: 10 additions & 20 deletions exporter/clickhousemetricsexporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"fmt"
"math"
"net/url"
"runtime/pprof"
"strings"
"sync"
Expand Down Expand Up @@ -93,31 +92,22 @@ type ClickHouseParams struct {
func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
l := logrus.WithField("component", "clickhouse")

dsnURL, err := url.Parse(params.DSN)
options, err := clickhouse.ParseDSN(params.DSN)

if err != nil {
return nil, err
}
database := dsnURL.Query().Get("database")
if database == "" {
return nil, fmt.Errorf("database should be set in ClickHouse DSN")
}

options := &clickhouse.Options{
Addr: []string{dsnURL.Host},
MaxIdleConns: params.MaxIdleConns,
MaxOpenConns: params.MaxOpenConns,
DialTimeout: 1 * time.Minute,
if options.MaxIdleConns < params.MaxIdleConns {
options.MaxIdleConns = params.MaxIdleConns
}
if dsnURL.Query().Get("username") != "" {
auth := clickhouse.Auth{
// Database: "",
Username: dsnURL.Query().Get("username"),
Password: dsnURL.Query().Get("password"),
}

options.Auth = auth
if options.MaxOpenConns < params.MaxOpenConns {
options.MaxOpenConns = params.MaxOpenConns
}
if options.DialTimeout < 1*time.Minute {
options.DialTimeout = 1 * time.Minute
}

conn, err := clickhouse.Open(options)
if err != nil {
return nil, fmt.Errorf("could not connect to clickhouse: %s", err)
Expand All @@ -126,7 +116,7 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
ch := &clickHouse{
conn: conn,
l: l,
database: database,
database: options.Auth.Database,
maxTimeSeriesInQuery: params.MaxTimeSeriesInQuery,

timeSeries: make(map[uint64]struct{}, 8192),
Expand Down
27 changes: 12 additions & 15 deletions exporter/clickhousetracesexporter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"flag"
"fmt"
"net/url"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/google/uuid"
Expand Down Expand Up @@ -91,21 +90,19 @@ type Connector func(cfg *namespaceConfig) (clickhouse.Conn, error)

func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) {
ctx := context.Background()
options, err := clickhouse.ParseDSN(cfg.Datasource)

if err != nil {
return nil, err
}

// setting maxOpenIdleConnections = numConsumers + 1 to avoid `prepareBatch:clickhouse: acquire conn timeout`
// error when using multiple consumers along with usage exporter
maxOpenIdleConnections := cfg.NumConsumers + 1
dsnURL, err := url.Parse(cfg.Datasource)
options := &clickhouse.Options{
Addr: []string{dsnURL.Host},
MaxOpenConns: maxOpenIdleConnections + 5,
MaxIdleConns: maxOpenIdleConnections,
}
if dsnURL.Query().Get("username") != "" {
auth := clickhouse.Auth{
Username: dsnURL.Query().Get("username"),
Password: dsnURL.Query().Get("password"),
}
options.Auth = auth
maxIdleConnections := cfg.NumConsumers + 1

if options.MaxIdleConns < maxIdleConnections {
options.MaxIdleConns = maxIdleConnections
options.MaxOpenConns = maxIdleConnections + 5
}
db, err := clickhouse.Open(options)
if err != nil {
Expand All @@ -116,7 +113,7 @@ func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) {
return nil, err
}

query := fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s ON CLUSTER %s`, dsnURL.Query().Get("database"), cfg.Cluster)
query := fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s ON CLUSTER %s`, options.Auth.Database, cfg.Cluster)
if err := db.Exec(ctx, query); err != nil {
return nil, err
}
Expand Down
33 changes: 6 additions & 27 deletions migrationmanager/migrators/basemigrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package basemigrator
import (
"context"
"fmt"
"net/url"
"strings"

"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -86,51 +85,31 @@ func (m *BaseMigrator) runSqlMigrations(ctx context.Context, migrationFolder, da

func (m *BaseMigrator) buildClickhouseMigrateURL(database string) (string, error) {
var clickhouseUrl, migrationsTableEngine string
parsedURL, err := url.Parse(m.Cfg.DSN)
options, err := clickhouse.ParseDSN(m.Cfg.DSN)
if err != nil {
return "", err
}
host := parsedURL.Host
if host == "" {
return "", fmt.Errorf("unable to parse host")

}
paramMap, err := url.ParseQuery(parsedURL.RawQuery)
if err != nil {
return "", err
}
username := paramMap["username"]
password := paramMap["password"]

if m.Cfg.ReplicationEnabled {
migrationsTableEngine = "ReplicatedMergeTree"
} else {
migrationsTableEngine = "MergeTree"
}

if len(username) > 0 && len(password) > 0 {
clickhouseUrl = fmt.Sprintf("clickhouse://%s:%s@%s/%s?x-multi-statement=true&x-cluster-name=%s&x-migrations-table=schema_migrations&x-migrations-table-engine=%s", username[0], password[0], host, database, m.Cfg.ClusterName, migrationsTableEngine)
if len(options.Auth.Username) > 0 && len(options.Auth.Password) > 0 {
clickhouseUrl = fmt.Sprintf("clickhouse://%s:%s@%s/%s?x-multi-statement=true&x-cluster-name=%s&x-migrations-table=schema_migrations&x-migrations-table-engine=%s", options.Auth.Username, options.Auth.Password, options.Addr[0], database, m.Cfg.ClusterName, migrationsTableEngine)
} else {
clickhouseUrl = fmt.Sprintf("clickhouse://%s/%s?x-multi-statement=true&x-cluster-name=%s&x-migrations-table=schema_migrations&x-migrations-table-engine=%s", host, database, m.Cfg.ClusterName, migrationsTableEngine)
clickhouseUrl = fmt.Sprintf("clickhouse://%s/%s?x-multi-statement=true&x-cluster-name=%s&x-migrations-table=schema_migrations&x-migrations-table-engine=%s", options.Addr[0], database, m.Cfg.ClusterName, migrationsTableEngine)
}
return clickhouseUrl, nil
}

func createClickhouseConnection(dsn string) (driver.Conn, error) {
dsnURL, err := url.Parse(dsn)
options, err := clickhouse.ParseDSN(dsn)
if err != nil {
return nil, fmt.Errorf("failed to parse dsn: %w", err)
}
options := &clickhouse.Options{
Addr: []string{dsnURL.Host},
}
if dsnURL.Query().Get("username") != "" {
auth := clickhouse.Auth{
Username: dsnURL.Query().Get("username"),
Password: dsnURL.Query().Get("password"),
}
options.Auth = auth
}

db, err := clickhouse.Open(options)
if err != nil {
return nil, fmt.Errorf("failed to open clickhouse connection: %w", err)
Expand Down

0 comments on commit 076a33a

Please sign in to comment.