From 322de1967f861f29fe41f9c8e45836455a8c1b1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Thu, 13 Jul 2017 08:54:02 +0200 Subject: [PATCH] More consistent logging with x.Print* functions fmt.Print* and log.Print* functions are replaced by x.Print* functions. It uses logger (from log package) that can be configured, and makes it possible to control logging from Dgraph. --- posting/lists.go | 3 +-- posting/lru.go | 3 +-- query/query.go | 3 +-- raftwal/wal.go | 7 +++---- schema/schema.go | 2 +- worker/conn.go | 7 +++---- worker/draft.go | 27 ++++++++++++++------------- worker/export.go | 2 +- worker/groups.go | 16 ++++++++-------- worker/predicate_test.go | 2 +- worker/worker.go | 4 ++-- x/init.go | 27 +++++---------------------- x/log.go | 38 ++++++++++++++++++++++++++++++++++++++ x/tls_helper.go | 5 ++--- 14 files changed, 81 insertions(+), 65 deletions(-) create mode 100644 x/log.go diff --git a/posting/lists.go b/posting/lists.go index 5f7f799a2dc..15e8e8341fe 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -20,7 +20,6 @@ package posting import ( "crypto/md5" "fmt" - "log" "math" "runtime" "sync" @@ -309,7 +308,7 @@ func commitOne(l *List) { return } if _, err := l.SyncIfDirty(); err != nil { - log.Printf("Error while committing dirty list: %v\n", err) + x.Printf("Error while committing dirty list: %v\n", err) } } diff --git a/posting/lru.go b/posting/lru.go index 55b931a80c3..9f68dafc66c 100644 --- a/posting/lru.go +++ b/posting/lru.go @@ -22,7 +22,6 @@ package posting import ( "container/list" "context" - "fmt" "sync" "github.com/dgraph-io/dgraph/x" @@ -69,7 +68,7 @@ func (c *listCache) UpdateMaxSize() { defer c.Unlock() if c.curSize < (50 << 20) { c.MaxSize = 50 << 20 - fmt.Println("LRU cache max size is being set to 50 MB") + x.Println("LRU cache max size is being set to 50 MB") return } x.LcacheCapacity.Set(int64(50 << 0)) diff --git a/query/query.go b/query/query.go index 8572a4adeaa..b37ce479a79 100644 --- a/query/query.go +++ b/query/query.go @@ -22,7 +22,6 @@ import ( "context" "errors" "fmt" - "log" "sort" "strconv" "strings" @@ -459,7 +458,7 @@ func (sg *SubGraph) preTraverse(uid uint64, dst, parent outputNode) error { continue // next UID. } // Some other error. - log.Printf("Error while traversal: %v", rerr) + x.Printf("Error while traversal: %v", rerr) return rerr } diff --git a/raftwal/wal.go b/raftwal/wal.go index 4ece8da6582..c7974461b12 100644 --- a/raftwal/wal.go +++ b/raftwal/wal.go @@ -20,7 +20,6 @@ package raftwal import ( "bytes" "encoding/binary" - "fmt" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" @@ -82,7 +81,7 @@ func (w *Wal) StoreSnapshot(gid uint32, s raftpb.Snapshot) error { if err := w.wals.Set(w.snapshotKey(gid), data); err != nil { return err } - fmt.Printf("Writing snapshot to WAL: %+v\n", s) + x.Printf("Writing snapshot to WAL: %+v\n", s) go func() { // Delete all entries before this snapshot to save disk space. @@ -105,11 +104,11 @@ func (w *Wal) StoreSnapshot(gid uint32, s raftpb.Snapshot) error { // Failure to delete entries is not a fatal error, so should be // ok to ignore if err := w.wals.BatchSet(wb); err != nil { - fmt.Printf("Error while deleting entries %v\n", err) + x.Printf("Error while deleting entries %v\n", err) } for _, wbe := range wb { if err := wbe.Error; err != nil { - fmt.Printf("Error while deleting entries %v\n", err) + x.Printf("Error while deleting entries %v\n", err) } } }() diff --git a/schema/schema.go b/schema/schema.go index 0a12381e030..f98b5899e65 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -107,7 +107,7 @@ func (s *stateGroup) update(se SyncEntry) { syncCh <- se s.elog.Printf("Setting schema type for attr %s: %v, tokenizer: %v, directive: %v\n", se.Attr, types.TypeID(se.Schema.ValueType).Name(), se.Schema.Tokenizer, se.Schema.Directive) - fmt.Printf("Setting schema type for attr %s: %v, tokenizer: %v, directive: %v\n", se.Attr, + x.Printf("Setting schema type for attr %s: %v, tokenizer: %v, directive: %v\n", se.Attr, types.TypeID(se.Schema.ValueType).Name(), se.Schema.Tokenizer, se.Schema.Directive) } diff --git a/worker/conn.go b/worker/conn.go index 08377d60b20..0cbc696f057 100644 --- a/worker/conn.go +++ b/worker/conn.go @@ -22,7 +22,6 @@ import ( "context" "crypto/rand" "fmt" - "log" "sync" "sync/atomic" @@ -101,7 +100,7 @@ func (p *poolsi) release(pl *pool) { func destroyPool(pl *pool) { err := pl.conn.Close() if err != nil { - log.Printf("Error closing cluster connection: %v\n", err.Error()) + x.Printf("Error closing cluster connection: %v\n", err.Error()) } } @@ -142,11 +141,11 @@ func (p *poolsi) connect(addr string) (*pool, bool) { defer p.release(pool) err = TestConnection(pool) if err != nil { - log.Printf("Connection to %q fails, got error: %v\n", addr, err) + x.Printf("Connection to %q fails, got error: %v\n", addr, err) // Don't return -- let's still put the empty pool in the map. Its users // have to handle errors later anyway. } else { - fmt.Printf("Connection with %q healthy.\n", addr) + x.Printf("Connection with %q healthy.\n", addr) } }() diff --git a/worker/draft.go b/worker/draft.go index 4917807c131..3553f0a6799 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -216,7 +216,7 @@ func (n *node) ConfState() *raftpb.ConfState { } func newNode(gid uint32, id uint64, myAddr string) *node { - fmt.Printf("Node with GroupID: %v, ID: %v\n", gid, id) + x.Printf("Node with GroupID: %v, ID: %v\n", gid, id) peers := peerPool{ peers: make(map[uint64]peerPoolEntry), @@ -244,6 +244,7 @@ func newNode(gid uint32, id uint64, myAddr string) *node { Storage: store, MaxSizePerMsg: 4096, MaxInflightMsgs: 256, + Logger: &raft.DefaultLogger{Logger: x.Logger}, }, applyCh: make(chan raftpb.Entry, numPendingMutations), peers: peers, @@ -293,7 +294,7 @@ func (n *node) Connect(pid uint64, addr string) { p, ok := pools().connect(addr) if !ok { // TODO: Note this fact in more general peer health info somehow. - log.Printf("Peer %d claims same host as me\n", pid) + x.Printf("Peer %d claims same host as me\n", pid) } n.SetPeer(pid, addr, p) } @@ -418,7 +419,7 @@ func (n *node) send(m raftpb.Message) { data, err := m.Marshal() x.Check(err) if m.Type != raftpb.MsgHeartbeat && m.Type != raftpb.MsgHeartbeatResp { - fmt.Printf("\t\tSENDING: %v %v-->%v\n", m.Type, m.From, m.To) + x.Printf("\t\tSENDING: %v %v-->%v\n", m.Type, m.From, m.To) } select { case n.messages <- sendmsg{to: m.To, data: data}: @@ -748,11 +749,11 @@ func (n *node) Run() { x.Check(rc.Unmarshal(rd.Snapshot.Data)) x.AssertTrue(rc.Group == n.gid) if rc.Id != n.id { - fmt.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id) + x.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id) n.retrieveSnapshot(rc.Id) - fmt.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid) + x.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid) } else { - fmt.Printf("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id) + x.Printf("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id) } } if len(rd.CommittedEntries) > 0 { @@ -879,7 +880,7 @@ func (n *node) joinPeers() { // Get leader information for MY group. pid, paddr := groups().Leader(n.gid) n.Connect(pid, paddr) - fmt.Printf("joinPeers connected with: %q with peer id: %d\n", paddr, pid) + x.Printf("joinPeers connected with: %q with peer id: %d\n", paddr, pid) pool, err := pools().get(paddr) if err != nil { @@ -912,7 +913,7 @@ func (n *node) initFromWal(wal *raftwal.Wal) (restart bool, rerr error) { } var term, idx uint64 if !raft.IsEmptySnap(sp) { - fmt.Printf("Found Snapshot: %+v\n", sp) + x.Printf("Found Snapshot: %+v\n", sp) restart = true if rerr = n.store.ApplySnapshot(sp); rerr != nil { return @@ -929,7 +930,7 @@ func (n *node) initFromWal(wal *raftwal.Wal) (restart bool, rerr error) { return } if !raft.IsEmptyHardState(hd) { - fmt.Printf("Found hardstate: %+v\n", sp) + x.Printf("Found hardstate: %+v\n", sp) restart = true if rerr = n.store.SetHardState(hd); rerr != nil { return @@ -941,7 +942,7 @@ func (n *node) initFromWal(wal *raftwal.Wal) (restart bool, rerr error) { if rerr != nil { return } - fmt.Printf("Group %d found %d entries\n", n.gid, len(es)) + x.Printf("Group %d found %d entries\n", n.gid, len(es)) if len(es) > 0 { restart = true } @@ -955,11 +956,11 @@ func (n *node) InitAndStartNode(wal *raftwal.Wal) { x.Check(err) if restart { - fmt.Printf("Restarting node for group: %d\n", n.gid) + x.Printf("Restarting node for group: %d\n", n.gid) n.SetRaft(raft.RestartNode(n.cfg)) } else { - fmt.Printf("New Node for group: %d\n", n.gid) + x.Printf("New Node for group: %d\n", n.gid) if groups().HasPeer(n.gid) { n.joinPeers() n.SetRaft(raft.StartNode(n.cfg, nil)) @@ -1034,7 +1035,7 @@ func (w *grpcWorker) RaftMessage(ctx context.Context, query *protos.Payload) (*p x.Check(err) } if msg.Type != raftpb.MsgHeartbeat && msg.Type != raftpb.MsgHeartbeatResp { - fmt.Printf("RECEIVED: %v %v-->%v\n", msg.Type, msg.From, msg.To) + x.Printf("RECEIVED: %v %v-->%v\n", msg.Type, msg.From, msg.To) } if err := applyMessage(ctx, msg); err != nil { return &protos.Payload{}, err diff --git a/worker/export.go b/worker/export.go index 91de7d22f94..d9f6ff56829 100644 --- a/worker/export.go +++ b/worker/export.go @@ -194,7 +194,7 @@ func export(gid uint32, bdir string) error { time.Now().Format("2006-01-02-15-04"))) fspath := path.Join(bdir, fmt.Sprintf("dgraph-schema-%d-%s.rdf.gz", gid, time.Now().Format("2006-01-02-15-04"))) - fmt.Printf("Exporting to: %v, schema at %v\n", fpath, fspath) + x.Printf("Exporting to: %v, schema at %v\n", fpath, fspath) chb := make(chan []byte, 1000) errChan := make(chan error, 2) go func() { diff --git a/worker/groups.go b/worker/groups.go index 2857cbe0825..d7466768865 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -140,10 +140,10 @@ func StartRaftNodes(walStore *badger.KV) { gr.syncMemberships() for gr.LastUpdate() == 0 { time.Sleep(time.Second) - fmt.Println("Last update raft index for membership information is zero. Syncing...") + x.Println("Last update raft index for membership information is zero. Syncing...") gr.syncMemberships() } - fmt.Printf("Last update is now: %d\n", gr.LastUpdate()) + x.Printf("Last update is now: %d\n", gr.LastUpdate()) }() } @@ -453,7 +453,7 @@ func (g *groupi) syncMemberships() { pl, err = pools().any() } if err == errNoConnection { - fmt.Println("Unable to sync memberships. No valid connection") + x.Println("Unable to sync memberships. No valid connection") return } x.Check(err) @@ -482,7 +482,7 @@ func (g *groupi) syncMemberships() { if len(addr) == 0 { return } - fmt.Printf("Got redirect for: %q\n", addr) + x.Printf("Got redirect for: %q\n", addr) var ok bool pl, ok = pools().connect(addr) if !ok { @@ -536,9 +536,9 @@ func (g *groupi) applyMembershipUpdate(raftIdx uint64, mm *protos.Membership) { x.AssertTrue(ok) } - fmt.Println("----------------------------") - fmt.Printf("====== APPLYING MEMBERSHIP UPDATE: %+v\n", update) - fmt.Println("----------------------------") + x.Println("----------------------------") + x.Printf("====== APPLYING MEMBERSHIP UPDATE: %+v\n", update) + x.Println("----------------------------") g.Lock() defer g.Unlock() @@ -553,7 +553,7 @@ func (g *groupi) applyMembershipUpdate(raftIdx uint64, mm *protos.Membership) { // Print out the entire list. for gid, sl := range g.all { - fmt.Printf("Group: %v. List: %+v\n", gid, sl.list) + x.Printf("Group: %v. List: %+v\n", gid, sl.list) } } diff --git a/worker/predicate_test.go b/worker/predicate_test.go index 05ba82bb3de..726becc39b9 100644 --- a/worker/predicate_test.go +++ b/worker/predicate_test.go @@ -69,7 +69,7 @@ func newServer(port string) (*grpc.Server, net.Listener, error) { log.Fatalf("While running server: %v", err) return nil, nil, err } - log.Printf("Worker listening at address: %v", ln.Addr()) + x.Printf("Worker listening at address: %v", ln.Addr()) s := grpc.NewServer() return s, ln, nil diff --git a/worker/worker.go b/worker/worker.go index 98e43f9bfd1..82e43d1e90c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -97,7 +97,7 @@ func RunServer(bindall bool) { if bindall { laddr = "0.0.0.0" } else if len(Config.MyAddr) > 0 { - fmt.Printf("--my flag is provided without bindall, Did you forget to specify bindall?\n") + x.Printf("--my flag is provided without bindall, Did you forget to specify bindall?\n") } var err error ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", laddr, workerPort())) @@ -105,7 +105,7 @@ func RunServer(bindall bool) { log.Fatalf("While running server: %v", err) return } - log.Printf("Worker listening at address: %v", ln.Addr()) + x.Printf("Worker listening at address: %v", ln.Addr()) protos.RegisterWorkerServer(workerServer, &grpcWorker{}) workerServer.Serve(ln) diff --git a/x/init.go b/x/init.go index 036a98b5c4c..0671f9e983c 100644 --- a/x/init.go +++ b/x/init.go @@ -20,7 +20,6 @@ import ( "flag" "fmt" "io/ioutil" - "log" "os" yaml "gopkg.in/yaml.v2" @@ -28,7 +27,6 @@ import ( var ( initFunc []func() - logger *log.Logger isTest bool // These variables are set using -ldflags @@ -54,21 +52,16 @@ func AddInit(f func()) { // Init initializes flags and run all functions in initFunc. func Init() { - log.SetFlags(log.Lshortfile | log.Flags()) - printVersionOnly() // Lets print the details of the current build on startup. printBuildDetails() if Config.ConfigFile != "" { - log.Println("Loading configuration from file:", Config.ConfigFile) + Println("Loading configuration from file:", Config.ConfigFile) loadConfigFromYAML() } - logger = log.New(os.Stderr, "", log.Lshortfile|log.Flags()) - AssertTrue(logger != nil) - // Next, run all the init functions that have been added. for _, f := range initFunc { f() @@ -84,7 +77,7 @@ func loadConfigFromYAML() { Checkf(yaml.Unmarshal(bs, &m), "Error while parsing config file: %v", Config.ConfigFile) for k, v := range m { - fmt.Printf("Picked flag from config: [%q = %v]\n", k, v) + Printf("Picked flag from config: [%q = %v]\n", k, v) flag.Set(k, v) } } @@ -94,7 +87,7 @@ func printBuildDetails() { return } - fmt.Printf(fmt.Sprintf(` + Printf(fmt.Sprintf(` Dgraph version : %v Commit SHA-1 : %v Commit timestamp : %v @@ -106,8 +99,8 @@ Branch : %v`, func printVersionOnly() { if Config.Version { printBuildDetails() - fmt.Println("Copyright 2017 Dgraph Labs, Inc.") - fmt.Println(` + Println("Copyright 2017 Dgraph Labs, Inc.") + Println(` Licensed under AGPLv3. For Dgraph official documentation, visit https://docs.dgraph.io. For discussions about Dgraph , visit https://discuss.dgraph.io. @@ -120,13 +113,3 @@ To say hi to the community , visit https://dgraph.slack.com. func Version() string { return dgraphVersion } - -// Printf does a log.Printf. We often do printf for debugging but has to keep -// adding import "fmt" or "log" and removing them after we are done. -// Let's add Printf to "x" and include "x" almost everywhere. Caution: Do remember -// to call x.Init. For tests, you need a TestMain that calls x.Init. -func Printf(format string, args ...interface{}) { - AssertTruef(logger != nil, "Logger is not defined. Have you called x.Init?") - // Call depth is one higher than default. - logger.Output(2, fmt.Sprintf(format, args...)) -} diff --git a/x/log.go b/x/log.go new file mode 100644 index 00000000000..38fc5ff7739 --- /dev/null +++ b/x/log.go @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2017 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package x + +import ( + "fmt" + "log" + "os" +) + +var ( + Logger = log.New(os.Stderr, "", log.Lshortfile|log.Flags()) +) + +// Printf does a log.Printf. We often do printf for debugging but has to keep +// adding import "fmt" or "log" and removing them after we are done. +// Let's add Printf to "x" and include "x" almost everywhere. Caution: Do remember +// to call x.Init. For tests, you need a TestMain that calls x.Init. +func Printf(format string, args ...interface{}) { + Logger.Output(2, fmt.Sprintf(format, args...)) +} + +func Println(args ...interface{}) { + Logger.Output(2, fmt.Sprintln(args...)) +} diff --git a/x/tls_helper.go b/x/tls_helper.go index 9a999beb938..d9249f0a2cb 100644 --- a/x/tls_helper.go +++ b/x/tls_helper.go @@ -22,7 +22,6 @@ import ( "encoding/pem" "fmt" "io/ioutil" - "log" "strings" "sync" "time" @@ -339,7 +338,7 @@ func (c *wrapperTLSConfig) reloadConfig() { // Loading new certificate cert, err := parseCertificate(c.helperConfig.CertRequired, c.helperConfig.Cert, c.helperConfig.Key, c.helperConfig.KeyPassphrase) if err != nil { - log.Printf("Error reloading certificate. %s\nUsing current certificate\n", err.Error()) + Printf("Error reloading certificate. %s\nUsing current certificate\n", err.Error()) } else if cert != nil { if c.helperConfig.ConfigType == TLSServerConfig { c.cert.Lock() @@ -352,7 +351,7 @@ func (c *wrapperTLSConfig) reloadConfig() { if len(c.helperConfig.ClientCACerts) > 0 || c.helperConfig.UseSystemClientCACerts { pool, err := generateCertPool(c.helperConfig.ClientCACerts, c.helperConfig.UseSystemClientCACerts) if err != nil { - log.Printf("Error reloading CAs. %s\nUsing current Client CAs\n", err.Error()) + Printf("Error reloading CAs. %s\nUsing current Client CAs\n", err.Error()) } else { c.clientCAPool.Lock() c.clientCAPool.pool = pool