forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
142 lines (118 loc) · 3.28 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package postgresql
import (
"database/sql"
"fmt"
"net"
"net/url"
"regexp"
"sort"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
)
// pulled from lib/pq
// ParseURL no longer needs to be used by clients of this library since supplying a URL as a
// connection string to sql.Open() is now supported:
//
// sql.Open("postgres", "postgres://bob:[email protected]:5432/mydb?sslmode=verify-full")
//
// It remains exported here for backwards-compatibility.
//
// ParseURL converts a url to a connection string for driver.Open.
// Example:
//
// "postgres://bob:[email protected]:5432/mydb?sslmode=verify-full"
//
// converts to:
//
// "user=bob password=secret host=1.2.3.4 port=5432 dbname=mydb sslmode=verify-full"
//
// A minimal example:
//
// "postgres://"
//
// This will be blank, causing driver.Open to use all of the defaults
func parseURL(uri string) (string, error) {
u, err := url.Parse(uri)
if err != nil {
return "", err
}
if u.Scheme != "postgres" && u.Scheme != "postgresql" {
return "", fmt.Errorf("invalid connection protocol: %s", u.Scheme)
}
var kvs []string
escaper := strings.NewReplacer(` `, `\ `, `'`, `\'`, `\`, `\\`)
accrue := func(k, v string) {
if v != "" {
kvs = append(kvs, k+"="+escaper.Replace(v))
}
}
if u.User != nil {
v := u.User.Username()
accrue("user", v)
v, _ = u.User.Password()
accrue("password", v)
}
if host, port, err := net.SplitHostPort(u.Host); err != nil {
accrue("host", u.Host)
} else {
accrue("host", host)
accrue("port", port)
}
if u.Path != "" {
accrue("dbname", u.Path[1:])
}
q := u.Query()
for k := range q {
accrue(k, q.Get(k))
}
sort.Strings(kvs) // Makes testing easier (not a performance concern)
return strings.Join(kvs, " "), nil
}
// Service common functionality shared between the postgresql and postgresql_extensible
// packages.
type Service struct {
Address string
Outputaddress string
MaxIdle int
MaxOpen int
MaxLifetime internal.Duration
DB *sql.DB
}
// Start starts the ServiceInput's service, whatever that may be
func (p *Service) Start(telegraf.Accumulator) (err error) {
const localhost = "host=localhost sslmode=disable"
if p.Address == "" || p.Address == "localhost" {
p.Address = localhost
}
if p.DB, err = sql.Open("pgx", p.Address); err != nil {
return err
}
p.DB.SetMaxOpenConns(p.MaxOpen)
p.DB.SetMaxIdleConns(p.MaxIdle)
p.DB.SetConnMaxLifetime(p.MaxLifetime.Duration)
return nil
}
// Stop stops the services and closes any necessary channels and connections
func (p *Service) Stop() {
p.DB.Close()
}
var kvMatcher, _ = regexp.Compile("(password|sslcert|sslkey|sslmode|sslrootcert)=\\S+ ?")
// SanitizedAddress utility function to strip sensitive information from the connection string.
func (p *Service) SanitizedAddress() (sanitizedAddress string, err error) {
var (
canonicalizedAddress string
)
if p.Outputaddress != "" {
return p.Outputaddress, nil
}
if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") {
if canonicalizedAddress, err = parseURL(p.Address); err != nil {
return sanitizedAddress, err
}
} else {
canonicalizedAddress = p.Address
}
sanitizedAddress = kvMatcher.ReplaceAllString(canonicalizedAddress, "")
return sanitizedAddress, err
}