Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More consistent logging with x.Print* functions #1190

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package posting
import (
"crypto/md5"
"fmt"
"log"
"math"
"runtime"
"sync"
Expand Down Expand Up @@ -309,7 +308,7 @@ func commitOne(l *List) {
return
}
if _, err := l.SyncIfDirty(); err != nil {
log.Printf("Error while committing dirty list: %v\n", err)
x.Printf("Error while committing dirty list: %v\n", err)
}
}

Expand Down
3 changes: 1 addition & 2 deletions posting/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package posting
import (
"container/list"
"context"
"fmt"
"sync"

"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -69,7 +68,7 @@ func (c *listCache) UpdateMaxSize() {
defer c.Unlock()
if c.curSize < (50 << 20) {
c.MaxSize = 50 << 20
fmt.Println("LRU cache max size is being set to 50 MB")
x.Println("LRU cache max size is being set to 50 MB")
return
}
x.LcacheCapacity.Set(int64(50 << 0))
Expand Down
3 changes: 1 addition & 2 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"
"errors"
"fmt"
"log"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -459,7 +458,7 @@ func (sg *SubGraph) preTraverse(uid uint64, dst, parent outputNode) error {
continue // next UID.
}
// Some other error.
log.Printf("Error while traversal: %v", rerr)
x.Printf("Error while traversal: %v", rerr)
return rerr
}

Expand Down
7 changes: 3 additions & 4 deletions raftwal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package raftwal
import (
"bytes"
"encoding/binary"
"fmt"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
Expand Down Expand Up @@ -82,7 +81,7 @@ func (w *Wal) StoreSnapshot(gid uint32, s raftpb.Snapshot) error {
if err := w.wals.Set(w.snapshotKey(gid), data); err != nil {
return err
}
fmt.Printf("Writing snapshot to WAL: %+v\n", s)
x.Printf("Writing snapshot to WAL: %+v\n", s)

go func() {
// Delete all entries before this snapshot to save disk space.
Expand All @@ -105,11 +104,11 @@ func (w *Wal) StoreSnapshot(gid uint32, s raftpb.Snapshot) error {
// Failure to delete entries is not a fatal error, so should be
// ok to ignore
if err := w.wals.BatchSet(wb); err != nil {
fmt.Printf("Error while deleting entries %v\n", err)
x.Printf("Error while deleting entries %v\n", err)
}
for _, wbe := range wb {
if err := wbe.Error; err != nil {
fmt.Printf("Error while deleting entries %v\n", err)
x.Printf("Error while deleting entries %v\n", err)
}
}
}()
Expand Down
2 changes: 1 addition & 1 deletion schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *stateGroup) update(se SyncEntry) {
syncCh <- se
s.elog.Printf("Setting schema type for attr %s: %v, tokenizer: %v, directive: %v\n", se.Attr,
types.TypeID(se.Schema.ValueType).Name(), se.Schema.Tokenizer, se.Schema.Directive)
fmt.Printf("Setting schema type for attr %s: %v, tokenizer: %v, directive: %v\n", se.Attr,
x.Printf("Setting schema type for attr %s: %v, tokenizer: %v, directive: %v\n", se.Attr,
types.TypeID(se.Schema.ValueType).Name(), se.Schema.Tokenizer, se.Schema.Directive)
}

Expand Down
7 changes: 3 additions & 4 deletions worker/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"
"crypto/rand"
"fmt"
"log"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -101,7 +100,7 @@ func (p *poolsi) release(pl *pool) {
func destroyPool(pl *pool) {
err := pl.conn.Close()
if err != nil {
log.Printf("Error closing cluster connection: %v\n", err.Error())
x.Printf("Error closing cluster connection: %v\n", err.Error())
}
}

Expand Down Expand Up @@ -142,11 +141,11 @@ func (p *poolsi) connect(addr string) (*pool, bool) {
defer p.release(pool)
err = TestConnection(pool)
if err != nil {
log.Printf("Connection to %q fails, got error: %v\n", addr, err)
x.Printf("Connection to %q fails, got error: %v\n", addr, err)
// Don't return -- let's still put the empty pool in the map. Its users
// have to handle errors later anyway.
} else {
fmt.Printf("Connection with %q healthy.\n", addr)
x.Printf("Connection with %q healthy.\n", addr)
}
}()

Expand Down
27 changes: 14 additions & 13 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (n *node) ConfState() *raftpb.ConfState {
}

func newNode(gid uint32, id uint64, myAddr string) *node {
fmt.Printf("Node with GroupID: %v, ID: %v\n", gid, id)
x.Printf("Node with GroupID: %v, ID: %v\n", gid, id)

peers := peerPool{
peers: make(map[uint64]peerPoolEntry),
Expand Down Expand Up @@ -244,6 +244,7 @@ func newNode(gid uint32, id uint64, myAddr string) *node {
Storage: store,
MaxSizePerMsg: 4096,
MaxInflightMsgs: 256,
Logger: &raft.DefaultLogger{Logger: x.Logger},
},
applyCh: make(chan raftpb.Entry, numPendingMutations),
peers: peers,
Expand Down Expand Up @@ -293,7 +294,7 @@ func (n *node) Connect(pid uint64, addr string) {
p, ok := pools().connect(addr)
if !ok {
// TODO: Note this fact in more general peer health info somehow.
log.Printf("Peer %d claims same host as me\n", pid)
x.Printf("Peer %d claims same host as me\n", pid)
}
n.SetPeer(pid, addr, p)
}
Expand Down Expand Up @@ -418,7 +419,7 @@ func (n *node) send(m raftpb.Message) {
data, err := m.Marshal()
x.Check(err)
if m.Type != raftpb.MsgHeartbeat && m.Type != raftpb.MsgHeartbeatResp {
fmt.Printf("\t\tSENDING: %v %v-->%v\n", m.Type, m.From, m.To)
x.Printf("\t\tSENDING: %v %v-->%v\n", m.Type, m.From, m.To)
}
select {
case n.messages <- sendmsg{to: m.To, data: data}:
Expand Down Expand Up @@ -748,11 +749,11 @@ func (n *node) Run() {
x.Check(rc.Unmarshal(rd.Snapshot.Data))
x.AssertTrue(rc.Group == n.gid)
if rc.Id != n.id {
fmt.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id)
x.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id)
n.retrieveSnapshot(rc.Id)
fmt.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid)
x.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid)
} else {
fmt.Printf("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id)
x.Printf("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id)
}
}
if len(rd.CommittedEntries) > 0 {
Expand Down Expand Up @@ -879,7 +880,7 @@ func (n *node) joinPeers() {
// Get leader information for MY group.
pid, paddr := groups().Leader(n.gid)
n.Connect(pid, paddr)
fmt.Printf("joinPeers connected with: %q with peer id: %d\n", paddr, pid)
x.Printf("joinPeers connected with: %q with peer id: %d\n", paddr, pid)

pool, err := pools().get(paddr)
if err != nil {
Expand Down Expand Up @@ -912,7 +913,7 @@ func (n *node) initFromWal(wal *raftwal.Wal) (restart bool, rerr error) {
}
var term, idx uint64
if !raft.IsEmptySnap(sp) {
fmt.Printf("Found Snapshot: %+v\n", sp)
x.Printf("Found Snapshot: %+v\n", sp)
restart = true
if rerr = n.store.ApplySnapshot(sp); rerr != nil {
return
Expand All @@ -929,7 +930,7 @@ func (n *node) initFromWal(wal *raftwal.Wal) (restart bool, rerr error) {
return
}
if !raft.IsEmptyHardState(hd) {
fmt.Printf("Found hardstate: %+v\n", sp)
x.Printf("Found hardstate: %+v\n", sp)
restart = true
if rerr = n.store.SetHardState(hd); rerr != nil {
return
Expand All @@ -941,7 +942,7 @@ func (n *node) initFromWal(wal *raftwal.Wal) (restart bool, rerr error) {
if rerr != nil {
return
}
fmt.Printf("Group %d found %d entries\n", n.gid, len(es))
x.Printf("Group %d found %d entries\n", n.gid, len(es))
if len(es) > 0 {
restart = true
}
Expand All @@ -955,11 +956,11 @@ func (n *node) InitAndStartNode(wal *raftwal.Wal) {
x.Check(err)

if restart {
fmt.Printf("Restarting node for group: %d\n", n.gid)
x.Printf("Restarting node for group: %d\n", n.gid)
n.SetRaft(raft.RestartNode(n.cfg))

} else {
fmt.Printf("New Node for group: %d\n", n.gid)
x.Printf("New Node for group: %d\n", n.gid)
if groups().HasPeer(n.gid) {
n.joinPeers()
n.SetRaft(raft.StartNode(n.cfg, nil))
Expand Down Expand Up @@ -1034,7 +1035,7 @@ func (w *grpcWorker) RaftMessage(ctx context.Context, query *protos.Payload) (*p
x.Check(err)
}
if msg.Type != raftpb.MsgHeartbeat && msg.Type != raftpb.MsgHeartbeatResp {
fmt.Printf("RECEIVED: %v %v-->%v\n", msg.Type, msg.From, msg.To)
x.Printf("RECEIVED: %v %v-->%v\n", msg.Type, msg.From, msg.To)
}
if err := applyMessage(ctx, msg); err != nil {
return &protos.Payload{}, err
Expand Down
2 changes: 1 addition & 1 deletion worker/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func export(gid uint32, bdir string) error {
time.Now().Format("2006-01-02-15-04")))
fspath := path.Join(bdir, fmt.Sprintf("dgraph-schema-%d-%s.rdf.gz", gid,
time.Now().Format("2006-01-02-15-04")))
fmt.Printf("Exporting to: %v, schema at %v\n", fpath, fspath)
x.Printf("Exporting to: %v, schema at %v\n", fpath, fspath)
chb := make(chan []byte, 1000)
errChan := make(chan error, 2)
go func() {
Expand Down
16 changes: 8 additions & 8 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ func StartRaftNodes(walStore *badger.KV) {
gr.syncMemberships()
for gr.LastUpdate() == 0 {
time.Sleep(time.Second)
fmt.Println("Last update raft index for membership information is zero. Syncing...")
x.Println("Last update raft index for membership information is zero. Syncing...")
gr.syncMemberships()
}
fmt.Printf("Last update is now: %d\n", gr.LastUpdate())
x.Printf("Last update is now: %d\n", gr.LastUpdate())
}()
}

Expand Down Expand Up @@ -454,7 +454,7 @@ func (g *groupi) syncMemberships() {
pl, err = pools().any()
}
if err == errNoConnection {
fmt.Println("Unable to sync memberships. No valid connection")
x.Println("Unable to sync memberships. No valid connection")
return
}
x.Check(err)
Expand Down Expand Up @@ -483,7 +483,7 @@ func (g *groupi) syncMemberships() {
if len(addr) == 0 {
return
}
fmt.Printf("Got redirect for: %q\n", addr)
x.Printf("Got redirect for: %q\n", addr)
var ok bool
pl, ok = pools().connect(addr)
if !ok {
Expand Down Expand Up @@ -537,9 +537,9 @@ func (g *groupi) applyMembershipUpdate(raftIdx uint64, mm *protos.Membership) {
x.AssertTrue(ok)
}

fmt.Println("----------------------------")
fmt.Printf("====== APPLYING MEMBERSHIP UPDATE: %+v\n", update)
fmt.Println("----------------------------")
x.Println("----------------------------")
x.Printf("====== APPLYING MEMBERSHIP UPDATE: %+v\n", update)
x.Println("----------------------------")
g.Lock()
defer g.Unlock()

Expand All @@ -554,7 +554,7 @@ func (g *groupi) applyMembershipUpdate(raftIdx uint64, mm *protos.Membership) {

// Print out the entire list.
for gid, sl := range g.all {
fmt.Printf("Group: %v. List: %+v\n", gid, sl.list)
x.Printf("Group: %v. List: %+v\n", gid, sl.list)
}
}

Expand Down
2 changes: 1 addition & 1 deletion worker/predicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func newServer(port string) (*grpc.Server, net.Listener, error) {
log.Fatalf("While running server: %v", err)
return nil, nil, err
}
log.Printf("Worker listening at address: %v", ln.Addr())
x.Printf("Worker listening at address: %v", ln.Addr())

s := grpc.NewServer()
return s, ln, nil
Expand Down
4 changes: 2 additions & 2 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ func RunServer(bindall bool) {
if bindall {
laddr = "0.0.0.0"
} else if len(Config.MyAddr) > 0 {
fmt.Printf("--my flag is provided without bindall, Did you forget to specify bindall?\n")
x.Printf("--my flag is provided without bindall, Did you forget to specify bindall?\n")
}
var err error
ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", laddr, workerPort()))
if err != nil {
log.Fatalf("While running server: %v", err)
return
}
log.Printf("Worker listening at address: %v", ln.Addr())
x.Printf("Worker listening at address: %v", ln.Addr())

protos.RegisterWorkerServer(workerServer, &grpcWorker{})
workerServer.Serve(ln)
Expand Down
Loading