Skip to content

Commit

Permalink
etcdmain: support structured logging for discovery service
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Apr 26, 2018
1 parent 6dff981 commit fd10577
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 29 deletions.
59 changes: 46 additions & 13 deletions etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ func startEtcdOrProxyV2() {
if err != nil {
lg := cfg.ec.GetLogger()
if lg != nil {
lg.Error("failed to verify flags", zap.Error(err))
lg.Warn("failed to verify flags", zap.Error(err))
} else {
plog.Errorf("error verifying flags, %v. See 'etcd --help'.", err)
}
switch err {
case embed.ErrUnsetAdvertiseClientURLsFlag:
if lg != nil {
lg.Error("advertise client URLs are not set", zap.Error(err))
lg.Warn("advertise client URLs are not set", zap.Error(err))
} else {
plog.Errorf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
}
Expand Down Expand Up @@ -143,7 +143,11 @@ func startEtcdOrProxyV2() {
which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
if which != dirEmpty {
if lg != nil {

lg.Info(
"server has been already initialized",
zap.String("data-dir", cfg.ec.Dir),
zap.String("dir-type", string(which)),
)
} else {
plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
}
Expand All @@ -153,7 +157,14 @@ func startEtcdOrProxyV2() {
case dirProxy:
err = startProxy(cfg)
default:
plog.Panicf("unhandled dir type %v", which)
if lg != nil {
lg.Panic(
"unknown directory type",
zap.String("dir-type", string(which)),
)
} else {
plog.Panicf("unhandled dir type %v", which)
}
}
} else {
shouldProxy := cfg.isProxy()
Expand All @@ -162,12 +173,20 @@ func startEtcdOrProxyV2() {
if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == discovery.ErrFullCluster {
if cfg.shouldFallbackToProxy() {
if lg != nil {

lg.Warn(
"discovery cluster is full, falling back to proxy",
zap.String("fallback-proxy", fallbackFlagProxy),
zap.Error(err),
)
} else {
plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy)
}
shouldProxy = true
}
} else if err != nil {
if lg != nil {
lg.Warn("failed to start etcd", zap.Error(err))
}
}
}
if shouldProxy {
Expand All @@ -180,13 +199,13 @@ func startEtcdOrProxyV2() {
switch derr.Err {
case discovery.ErrDuplicateID:
if lg != nil {
lg.Error(
lg.Warn(
"member has been registered with discovery service",
zap.String("name", cfg.ec.Name),
zap.String("discovery-token", cfg.ec.Durl),
zap.Error(derr.Err),
)
lg.Error(
lg.Warn(
"but could not find valid cluster configuration",
zap.String("data-dir", cfg.ec.Dir),
)
Expand All @@ -198,9 +217,10 @@ func startEtcdOrProxyV2() {
plog.Infof("Please check the given data dir path if the previous bootstrap succeeded")
plog.Infof("or use a new discovery token if the previous bootstrap failed.")
}

case discovery.ErrDuplicateName:
if lg != nil {
lg.Error(
lg.Warn(
"member with duplicated name has already been registered",
zap.String("discovery-token", cfg.ec.Durl),
zap.Error(derr.Err),
Expand All @@ -212,9 +232,10 @@ func startEtcdOrProxyV2() {
plog.Errorf("please check (cURL) the discovery token for more information.")
plog.Errorf("please do not reuse the discovery token and generate a new one to bootstrap the cluster.")
}

default:
if lg != nil {
lg.Error(
lg.Warn(
"failed to bootstrap; discovery token was already used",
zap.String("discovery-token", cfg.ec.Durl),
zap.Error(err),
Expand All @@ -231,7 +252,7 @@ func startEtcdOrProxyV2() {

if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") {
if lg != nil {
lg.Error("failed to start", zap.Error(err))
lg.Warn("failed to start", zap.Error(err))
} else {
plog.Infof("%v", err)
}
Expand Down Expand Up @@ -320,7 +341,12 @@ func startProxy(cfg *config) error {
clientTLSInfo.InsecureSkipVerify = cfg.ec.ClientAutoTLS
cfg.ec.PeerTLSInfo.InsecureSkipVerify = cfg.ec.PeerAutoTLS

pt, err := transport.NewTimeoutTransport(clientTLSInfo, time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond)
pt, err := transport.NewTimeoutTransport(
clientTLSInfo,
time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond,
time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond,
time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond,
)
if err != nil {
return err
}
Expand All @@ -333,7 +359,12 @@ func startProxy(cfg *config) error {
plog.Fatalf("could not get certs (%v)", err)
}
}
tr, err := transport.NewTimeoutTransport(cfg.ec.PeerTLSInfo, time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond)
tr, err := transport.NewTimeoutTransport(
cfg.ec.PeerTLSInfo,
time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond,
time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond,
time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -385,6 +416,7 @@ func startProxy(cfg *config) error {
} else {
plog.Infof("proxy: using peer urls %v from cluster file %q", peerURLs, clusterfile)
}

case os.IsNotExist(err):
var urlsmap types.URLsMap
urlsmap, _, err = cfg.ec.PeerURLsMapAndToken("proxy")
Expand All @@ -394,7 +426,7 @@ func startProxy(cfg *config) error {

if cfg.ec.Durl != "" {
var s string
s, err = discovery.GetCluster(cfg.ec.Durl, cfg.ec.Dproxy)
s, err = discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy)
if err != nil {
return err
}
Expand All @@ -408,6 +440,7 @@ func startProxy(cfg *config) error {
} else {
plog.Infof("proxy: using peer urls %v ", peerURLs)
}

default:
return err
}
Expand Down
19 changes: 9 additions & 10 deletions etcdmain/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
"os"
"time"

"go.uber.org/zap"

"github.com/coreos/etcd/proxy/tcpproxy"

"github.com/spf13/cobra"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -91,7 +90,14 @@ func stripSchema(eps []string) []string {
}

func startGateway(cmd *cobra.Command, args []string) {
srvs := discoverEndpoints(gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery)
var lg *zap.Logger
lg, err := zap.NewProduction()
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

srvs := discoverEndpoints(lg, gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery)
if len(srvs.Endpoints) == 0 {
// no endpoints discovered, fall back to provided endpoints
srvs.Endpoints = gatewayEndpoints
Expand All @@ -116,13 +122,6 @@ func startGateway(cmd *cobra.Command, args []string) {
os.Exit(1)
}

var lg *zap.Logger
lg, err := zap.NewProduction()
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

var l net.Listener
l, err = net.Listen("tcp", gatewayListenAddr)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion etcdmain/grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func checkArgs() {
}

func mustNewClient(lg *zap.Logger) *clientv3.Client {
srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
srvs := discoverEndpoints(lg, grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
eps := srvs.Endpoints
if len(eps) == 0 {
eps = grpcProxyEndpoints
Expand Down
51 changes: 46 additions & 5 deletions etcdmain/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (

"github.com/coreos/etcd/pkg/srv"
"github.com/coreos/etcd/pkg/transport"

"go.uber.org/zap"
)

func discoverEndpoints(dns string, ca string, insecure bool) (s srv.SRVClients) {
func discoverEndpoints(lg *zap.Logger, dns string, ca string, insecure bool) (s srv.SRVClients) {
if dns == "" {
return s
}
Expand All @@ -32,7 +34,17 @@ func discoverEndpoints(dns string, ca string, insecure bool) (s srv.SRVClients)
os.Exit(1)
}
endpoints := srvs.Endpoints
plog.Infof("discovered the cluster %s from %s", endpoints, dns)

if lg != nil {
lg.Info(
"discovered cluster from SRV",
zap.String("srv-server", dns),
zap.Strings("endpoints", endpoints),
)
} else {
plog.Infof("discovered the cluster %s from %s", endpoints, dns)
}

if insecure {
return *srvs
}
Expand All @@ -41,12 +53,41 @@ func discoverEndpoints(dns string, ca string, insecure bool) (s srv.SRVClients)
TrustedCAFile: ca,
ServerName: dns,
}
plog.Infof("validating discovered endpoints %v", endpoints)

if lg != nil {
lg.Info(
"validating discovered SRV endpoints",
zap.String("srv-server", dns),
zap.Strings("endpoints", endpoints),
)
} else {
plog.Infof("validating discovered endpoints %v", endpoints)
}

endpoints, err = transport.ValidateSecureEndpoints(tlsInfo, endpoints)
if err != nil {
plog.Warningf("%v", err)
if lg != nil {
lg.Warn(
"failed to validate discovered endpoints",
zap.String("srv-server", dns),
zap.Strings("endpoints", endpoints),
zap.Error(err),
)
} else {
plog.Warningf("%v", err)
}
} else {
if lg != nil {
lg.Info(
"using validated discovered SRV endpoints",
zap.String("srv-server", dns),
zap.Strings("endpoints", endpoints),
)
}
}
if lg == nil {
plog.Infof("using discovered endpoints %v", endpoints)
}
plog.Infof("using discovered endpoints %v", endpoints)

// map endpoints back to SRVClients struct with SRV data
eps := make(map[string]struct{})
Expand Down

0 comments on commit fd10577

Please sign in to comment.