Skip to content

Commit

Permalink
Remove global logger from outputs, common.transport and monitoring (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
simitt committed Mar 24, 2020
1 parent e0b817d commit 6c18b2f
Show file tree
Hide file tree
Showing 32 changed files with 242 additions and 202 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- The type `memqueue.Broker` is no longer exported; instead of `memqueue.NewBroker`, call `memqueue.NewQueue` (which provides the same public interface). {pull}16667[16667]
- The disk spool types `spool.Spool` and `spool.Settings` have been renamed to the internal types `spool.diskSpool` and `spool.settings`. {pull}16693[16693]
- `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691]
- Require logger as first parameter for `outputs.transport.transport#ProxyDialer` and `outputs.elasticsearch.client#BulkReadItemStatus`. {pull}16761[16761]

- The `libbeat/outputs/transport` package has been moved to `libbeat/common/transport`. {pull}16734[16734]
- The `libbeat/outputs/tls.go` file has been removed. All exported symbols in that file (`libbeat/outputs.*`) are now available as `libbeat/common/tlscommon.*`. {pull}16734[16734]
- The newly generated Beats are using go modules to manage dependencies. {pull}16288[16288]

==== Bugfixes

Expand Down
3 changes: 2 additions & 1 deletion heartbeat/monitors/active/dialchain/socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/logp"
)

// SOCKS5Layer configures a SOCKS5 proxy layer in a DialerChain.
Expand All @@ -38,7 +39,7 @@ func SOCKS5Layer(config *transport.ProxyConfig) Layer {
return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
var timer timer

dialer, err := transport.ProxyDialer(config, startTimerAfterDial(&timer, next))
dialer, err := transport.ProxyDialer(logp.NewLogger("socks5Layer"), config, startTimerAfterDial(&timer, next))
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions libbeat/common/transport/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/testing"
)

type Client struct {
log *logp.Logger
dialer Dialer
network string
host string
Expand Down Expand Up @@ -75,6 +77,7 @@ func NewClientWithDialer(d Dialer, c Config, network, host string, defaultPort i
}

client := &Client{
log: logp.NewLogger(logSelector),
dialer: d,
network: network,
host: host,
Expand Down Expand Up @@ -112,7 +115,7 @@ func (c *Client) Close() error {
defer c.mutex.Unlock()

if c.conn != nil {
debugf("closing")
c.log.Debug("closing")
err := c.conn.Close()
c.conn = nil
return err
Expand Down Expand Up @@ -199,7 +202,7 @@ func (c *Client) SetWriteDeadline(t time.Time) error {

func (c *Client) handleError(err error) error {
if err != nil {
debugf("handle error: %v", err)
c.log.Debugf("handle error: %+v", err)

if nerr, ok := err.(net.Error); !(ok && (nerr.Temporary() || nerr.Timeout())) {
_ = c.Close()
Expand Down
6 changes: 3 additions & 3 deletions libbeat/common/transport/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *ProxyConfig) Validate() error {
return nil
}

func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) {
func ProxyDialer(log *logp.Logger, config *ProxyConfig, forward Dialer) (Dialer, error) {
if config == nil || config.URL == "" {
return forward, nil
}
Expand All @@ -67,7 +67,7 @@ func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) {
return nil, err
}

logp.Info("proxy host: '%s'", url.Host)
log.Infof("proxy host: '%s'", url.Host)
return DialerFunc(func(network, address string) (net.Conn, error) {
var err error
var addresses []string
Expand All @@ -80,7 +80,7 @@ func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) {
if config.LocalResolve {
addresses, err = net.LookupHost(host)
if err != nil {
logp.Warn(`DNS lookup failure "%s": %v`, host, err)
log.Warnf(`DNS lookup failure "%s": %+v`, host, err)
return nil, err
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/common/transport/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestNetDialer(d testing.Driver, timeout time.Duration) Dialer {
d.Fatal("dns lookup", err)
d.Info("addresses", strings.Join(addresses, ", "))
if err != nil {
logp.Warn(`DNS lookup failure "%s": %v`, host, err)
logp.NewLogger(logSelector).Warnf(`DNS lookup failure "%s": %+v`, host, err)
return nil, err
}

Expand Down
29 changes: 17 additions & 12 deletions libbeat/common/transport/tlscommon/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
)

const logSelector = "tls"

// LoadCertificate will load a certificate from disk and return a tls.Certificate or error
func LoadCertificate(config *CertificateConfig) (*tls.Certificate, error) {
certificate := config.Certificate
Expand All @@ -46,31 +48,33 @@ func LoadCertificate(config *CertificateConfig) (*tls.Certificate, error) {
return nil, nil
}

certPEM, err := ReadPEMFile(certificate, config.Passphrase)
log := logp.NewLogger(logSelector)

certPEM, err := ReadPEMFile(log, certificate, config.Passphrase)
if err != nil {
logp.Critical("Failed reading certificate file %v: %+v", certificate, err)
log.Errorf("Failed reading certificate file %v: %+v", certificate, err)
return nil, fmt.Errorf("%v %v", err, certificate)
}

keyPEM, err := ReadPEMFile(key, config.Passphrase)
keyPEM, err := ReadPEMFile(log, key, config.Passphrase)
if err != nil {
logp.Critical("Failed reading key file %v: %+v", key, err)
log.Errorf("Failed reading key file %v: %+v", key, err)
return nil, fmt.Errorf("%v %v", err, key)
}

cert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
logp.Critical("Failed loading client certificate %+v", err)
log.Errorf("Failed loading client certificate %+v", err)
return nil, err
}

logp.Debug("tls", "loading certificate: %v and key %v", certificate, key)
log.Debugf("tls", "loading certificate: %v and key %v", certificate, key)
return &cert, nil
}

// ReadPEMFile reads a PEM format file on disk and decrypt it with the privided password and
// return the raw content.
func ReadPEMFile(path, passphrase string) ([]byte, error) {
func ReadPEMFile(log *logp.Logger, path, passphrase string) ([]byte, error) {
pass := []byte(passphrase)
var blocks []*pem.Block

Expand Down Expand Up @@ -102,7 +106,7 @@ func ReadPEMFile(path, passphrase string) ([]byte, error) {
}

if err != nil {
logp.Err("Dropping encrypted pem '%v' block read from %v. %v",
log.Errorf("Dropping encrypted pem '%v' block read from %v. %+v",
block.Type, path, err)
continue
}
Expand Down Expand Up @@ -138,21 +142,22 @@ func LoadCertificateAuthorities(CAs []string) (*x509.CertPool, []error) {
return nil, nil
}

log := logp.NewLogger(logSelector)
roots := x509.NewCertPool()
for _, path := range CAs {
pemData, err := ioutil.ReadFile(path)
if err != nil {
logp.Critical("Failed reading CA certificate: %v", err)
log.Errorf("Failed reading CA certificate: %+v", err)
errors = append(errors, fmt.Errorf("%v reading %v", err, path))
continue
}

if ok := roots.AppendCertsFromPEM(pemData); !ok {
logp.Critical("Failed reading CA certificate: %v", err)
errors = append(errors, fmt.Errorf("%v adding %v", ErrNotACertificate, path))
log.Error("Failed to add CA to the cert pool, CA is not a valid PEM file")
errors = append(errors, fmt.Errorf("%v adding %v to the list of known CAs", ErrNotACertificate, path))
continue
}
logp.Debug("tls", "successfully loaded CA certificate: %v", path)
log.Debugf("tls", "successfully loaded CA certificate: %v", path)
}

return roots, errors
Expand Down
2 changes: 1 addition & 1 deletion libbeat/common/transport/tlscommon/tls_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c *TLSConfig) ToConfig() *tls.Config {
minVersion, maxVersion := extractMinMaxVersion(c.Versions)
insecure := c.Verification != VerifyFull
if insecure {
logp.Warn("SSL/TLS verifications disabled.")
logp.NewLogger("tls").Warn("SSL/TLS verifications disabled.")
}

// When we are usign the CAsha256 pin to validate the CA used to validate the chain
Expand Down
4 changes: 1 addition & 3 deletions libbeat/common/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type DialerFunc func(network, address string) (net.Conn, error)

var (
ErrNotConnected = errors.New("client is not connected")

debugf = logp.MakeDebug("transport")
)

func (d DialerFunc) Dial(network, address string) (net.Conn, error) {
Expand All @@ -51,7 +49,7 @@ func Dial(c Config, network, address string) (net.Conn, error) {
func MakeDialer(c Config) (Dialer, error) {
var err error
dialer := NetDialer(c.Timeout)
dialer, err = ProxyDialer(c.Proxy, dialer)
dialer, err = ProxyDialer(logp.NewLogger(logSelector), c.Proxy, dialer)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions libbeat/common/transport/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
)

const logSelector = "transport"

func fullAddress(host string, defaultPort int) string {
if _, _, err := net.SplitHostPort(host); err == nil {
return host
Expand Down
18 changes: 9 additions & 9 deletions libbeat/monitoring/adapter/go-metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
type GoMetricsRegistry struct {
mutex sync.Mutex

log *logp.Logger
reg *monitoring.Registry
filters *metricFilters

Expand All @@ -60,20 +61,19 @@ func GetGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFil
if v == nil {
return NewGoMetrics(parent, name, filters...)
}

reg := v.(*monitoring.Registry)
return &GoMetricsRegistry{
reg: reg,
shadow: metrics.NewRegistry(),
filters: makeFilters(filters...),
}
return newGoMetrics(v.(*monitoring.Registry), filters...)
}

// NewGoMetrics creates and registers a new GoMetricsRegistry with the parent
// registry.
func NewGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFilter) *GoMetricsRegistry {
return newGoMetrics(parent.NewRegistry(name, monitoring.IgnorePublishExpvar), filters...)
}

func newGoMetrics(reg *monitoring.Registry, filters ...MetricFilter) *GoMetricsRegistry {
return &GoMetricsRegistry{
reg: parent.NewRegistry(name, monitoring.IgnorePublishExpvar),
log: logp.NewLogger("monitoring"),
reg: reg,
shadow: metrics.NewRegistry(),
filters: makeFilters(filters...),
}
Expand Down Expand Up @@ -193,7 +193,7 @@ func (r *GoMetricsRegistry) UnregisterAll() {
r.shadow.UnregisterAll()
err := r.reg.Clear()
if err != nil {
logp.Err("Failed to clear registry: %v", err)
r.log.Errorf("Failed to clear registry: %+v", err)
}
}

Expand Down
24 changes: 13 additions & 11 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
var createDocPrivAvailableESVersion = common.MustNewVersion("7.5.0")

type publishClient struct {
log *logp.Logger
es *esout.Client
params map[string]string
format report.Format
Expand All @@ -47,6 +48,7 @@ func newPublishClient(
format report.Format,
) (*publishClient, error) {
p := &publishClient{
log: logp.NewLogger(selector),
es: es,
params: params,
format: format,
Expand All @@ -55,7 +57,7 @@ func newPublishClient(
}

func (c *publishClient) Connect() error {
debugf("Monitoring client: connect.")
c.log.Debug("Monitoring client: connect.")

err := c.es.Connect()
if err != nil {
Expand Down Expand Up @@ -86,11 +88,11 @@ func (c *publishClient) Connect() error {
}

if !resp.Features.Monitoring.Enabled {
debugf("XPack monitoring is disabled.")
c.log.Debug("XPack monitoring is disabled.")
return errNoMonitoring
}

debugf("XPack monitoring is enabled")
c.log.Debug("XPack monitoring is enabled")

return nil
}
Expand All @@ -108,13 +110,13 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
// Extract type
t, err := event.Content.Meta.GetValue("type")
if err != nil {
logp.Err("Type not available in monitoring reported. Please report this error: %s", err)
c.log.Errorf("Type not available in monitoring reported. Please report this error: %+v", err)
continue
}

typ, ok := t.(string)
if !ok {
logp.Err("monitoring type is not a string")
c.log.Error("monitoring type is not a string")
}

var params = map[string]string{}
Expand Down Expand Up @@ -235,7 +237,7 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
return err
}

logBulkFailures(result, []report.Event{document})
logBulkFailures(c.log, result, []report.Event{document})
return err
}

Expand All @@ -245,25 +247,25 @@ func getMonitoringIndexName() string {
return fmt.Sprintf(".monitoring-beats-%v-%s", version, date)
}

func logBulkFailures(result esout.BulkResult, events []report.Event) {
func logBulkFailures(log *logp.Logger, result esout.BulkResult, events []report.Event) {
reader := esout.NewJSONReader(result)
err := esout.BulkReadToItems(reader)
if err != nil {
logp.Err("failed to parse monitoring bulk items: %v", err)
log.Errorf("failed to parse monitoring bulk items: %+v", err)
return
}

for i := range events {
status, msg, err := esout.BulkReadItemStatus(reader)
status, msg, err := esout.BulkReadItemStatus(log, reader)
if err != nil {
logp.Err("failed to parse monitoring bulk item status: %v", err)
log.Errorf("failed to parse monitoring bulk item status: %+v", err)
return
}
switch {
case status < 300, status == http.StatusConflict:
continue
default:
logp.Warn("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
log.Warnf("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
}
}
}
Loading

0 comments on commit 6c18b2f

Please sign in to comment.