Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace all logrus to internal log #436

Merged
merged 1 commit into from
Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/interceptor/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"

"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
"github.com/projecteru2/core/log"

"google.golang.org/grpc"
)
Expand All @@ -32,7 +32,7 @@ func NewStreamRetry(retryOpts RetryOptions) grpc.StreamClientInterceptor {
if _, ok := RPCNeedRetry[method]; !ok {
return stream, err
}
log.Debugf("[NewStreamRetry] return retryStreawm for method %s", method)
log.Debugf(ctx, "[NewStreamRetry] return retryStreawm for method %s", method)
return &retryStream{
ctx: ctx,
ClientStream: stream,
Expand All @@ -57,7 +57,7 @@ func (s *retryStream) RecvMsg(m interface{}) (err error) {
}

return backoff.Retry(func() error {
log.Debug("[retryStream] retry on new stream")
log.Debug(context.TODO(), "[retryStream] retry on new stream")
stream, err := s.newStream()
if err != nil {
// even io.EOF triggers retry, and it's what we want!
Expand Down
8 changes: 4 additions & 4 deletions client/resolver/eru/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"

"github.com/projecteru2/core/client/servicediscovery"
log "github.com/sirupsen/logrus"
"github.com/projecteru2/core/log"

"github.com/projecteru2/core/types"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -44,20 +44,20 @@ func (r *Resolver) Close() {
}

func (r *Resolver) sync() {
log.Debug("[EruResolver] start sync service discovery")
log.Debug(context.TODO(), "[EruResolver] start sync service discovery")
ctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel
defer cancel()

ch, err := r.discovery.Watch(ctx)
if err != nil {
log.Errorf("[EruResolver] failed to watch service status: %v", err)
log.Errorf(ctx, "[EruResolver] failed to watch service status: %v", err)
return
}
for {
select {
case <-ctx.Done():
log.Errorf("[EruResolver] watch interrupted: %v", ctx.Err())
log.Errorf(ctx, "[EruResolver] watch interrupted: %v", ctx.Err())
return
case endpoints, ok := <-ch:
if !ok {
Expand Down
8 changes: 4 additions & 4 deletions client/servicediscovery/eru_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/projecteru2/core/auth"
"github.com/projecteru2/core/client/interceptor"
"github.com/projecteru2/core/client/utils"
log "github.com/sirupsen/logrus"
"github.com/projecteru2/core/log"

pb "github.com/projecteru2/core/rpc/gen"
"github.com/projecteru2/core/types"
Expand All @@ -34,7 +34,7 @@ func New(endpoint string, authConfig types.AuthConfig) *EruServiceDiscovery {
func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err error) {
cc, err := w.dial(ctx, w.endpoint, w.authConfig)
if err != nil {
log.Errorf("[EruServiceWatch] dial failed: %v", err)
log.Errorf(ctx, "[EruServiceWatch] dial failed: %v", err)
return
}
client := pb.NewCoreRPCClient(cc)
Expand All @@ -48,7 +48,7 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
watchCtx, cancelWatch := context.WithCancel(ctx)
stream, err := client.WatchServiceStatus(watchCtx, &pb.Empty{})
if err != nil {
log.Errorf("[EruServiceWatch] watch failed, try later: %v", err)
log.Errorf(ctx, "[EruServiceWatch] watch failed, try later: %v", err)
time.Sleep(10 * time.Second)
continue
}
Expand All @@ -69,7 +69,7 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
status, err := stream.Recv()
close(cancelTimer)
if err != nil {
log.Errorf("[EruServiceWatch] recv failed: %v", err)
log.Errorf(ctx, "[EruServiceWatch] recv failed: %v", err)
break
}
expectedInterval = time.Duration(status.GetIntervalInSecond())
Expand Down
16 changes: 8 additions & 8 deletions client/utils/servicepusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/go-ping/ping"
log "github.com/sirupsen/logrus"
"github.com/projecteru2/core/log"
)

// EndpointPusher pushes endpoints to registered channels if the ep is L3 reachable
Expand Down Expand Up @@ -53,7 +53,7 @@ func (p *EndpointPusher) delOutdated(endpoints []string) {
if _, ok := newEps[ep]; !ok {
cancel()
p.pendingEndpoints.Delete(ep)
log.Debugf("[EruResolver] pending endpoint deleted: %s", ep)
log.Debugf(context.TODO(), "[EruResolver] pending endpoint deleted: %s", ep)
}
return true
})
Expand All @@ -66,7 +66,7 @@ func (p *EndpointPusher) delOutdated(endpoints []string) {
}
if _, ok := newEps[ep]; !ok {
p.availableEndpoints.Delete(ep)
log.Debugf("[EruResolver] available endpoint deleted: %s", ep)
log.Debugf(context.TODO(), "[EruResolver] available endpoint deleted: %s", ep)
}
return true
})
Expand All @@ -84,20 +84,20 @@ func (p *EndpointPusher) addCheck(endpoints []string) {
ctx, cancel := context.WithCancel(context.Background())
p.pendingEndpoints.Store(endpoint, cancel)
go p.pollReachability(ctx, endpoint)
log.Debugf("[EruResolver] pending endpoint added: %s", endpoint)
log.Debugf(ctx, "[EruResolver] pending endpoint added: %s", endpoint)
}
}
func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string) {
parts := strings.Split(endpoint, ":")
if len(parts) != 2 {
log.Errorf("[EruResolver] wrong format of endpoint: %s", endpoint)
log.Errorf(context.TODO(), "[EruResolver] wrong format of endpoint: %s", endpoint)
return
}

for {
select {
case <-ctx.Done():
log.Debugf("[EruResolver] reachability goroutine ends: %s", endpoint)
log.Debugf(ctx, "[EruResolver] reachability goroutine ends: %s", endpoint)
return
default:
}
Expand All @@ -110,15 +110,15 @@ func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string)
p.pendingEndpoints.Delete(endpoint)
p.availableEndpoints.Store(endpoint, struct{}{})
p.pushEndpoints()
log.Debugf("[EruResolver] available endpoint added: %s", endpoint)
log.Debugf(ctx, "[EruResolver] available endpoint added: %s", endpoint)
return
}
}

func (p *EndpointPusher) checkReachability(host string) (err error) {
pinger, err := ping.NewPinger(host)
if err != nil {
log.Errorf("[EruResolver] failed to create pinger: %+v", err)
log.Errorf(context.TODO(), "[EruResolver] failed to create pinger: %+v", err)
return
}
defer pinger.Stop()
Expand Down
6 changes: 3 additions & 3 deletions log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ func Infof(ctx context.Context, format string, args ...interface{}) {

// Debug is Debug
func Debug(ctx context.Context, args ...interface{}) {
a := []interface{}{}
a = append(a, interface{}(getTracingInfo(ctx)))
log.Debug(append(a, args...))
a := []interface{}{getTracingInfo(ctx)}
a = append(a, args...)
log.Debug(a)
}

// Debugf is Debugf
Expand Down
9 changes: 5 additions & 4 deletions wal/hydro.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package wal

import (
"context"
"encoding/json"
"sync"
"time"

log "github.com/sirupsen/logrus"
"github.com/projecteru2/core/log"

coretypes "github.com/projecteru2/core/types"
"github.com/projecteru2/core/wal/kv"
Expand Down Expand Up @@ -48,7 +49,7 @@ func (h *Hydro) Recover() {
for ent := range ch {
ev, err := h.decodeEvent(ent)
if err != nil {
log.Errorf("[Recover] decode event error: %v", err)
log.Errorf(context.TODO(), "[Recover] decode event error: %v", err)
continue
}
events = append(events, ev)
Expand All @@ -57,12 +58,12 @@ func (h *Hydro) Recover() {
for _, ev := range events {
handler, ok := h.getEventHandler(ev.Type)
if !ok {
log.Errorf("[Recover] no such event handler for %s", ev.Type)
log.Errorf(context.TODO(), "[Recover] no such event handler for %s", ev.Type)
continue
}

if err := h.recover(handler, ev); err != nil {
log.Errorf("[Recover] handle event %d (%s) failed: %v", ev.ID, ev.Type, err)
log.Errorf(context.TODO(), "[Recover] handle event %d (%s) failed: %v", ev.ID, ev.Type, err)
continue
}
}
Expand Down