diff --git a/api/agent.go b/api/agent.go index 46979403c21..d372b8062bd 100644 --- a/api/agent.go +++ b/api/agent.go @@ -237,6 +237,57 @@ func (a *Agent) Health() (*AgentHealthResponse, error) { return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err) } +// Monitor returns a channel which will receive streaming logs from the agent +// Providing a non-nil stopCh can be used to close the connection and stop log streaming +func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { + errCh := make(chan error, 1) + r, err := a.client.newRequest("GET", "/v1/agent/monitor") + if err != nil { + errCh <- err + return nil, errCh + } + + r.setQueryOptions(q) + _, resp, err := requireOK(a.client.doRequest(r)) + if err != nil { + errCh <- err + return nil, errCh + } + + frames := make(chan *StreamFrame, 10) + go func() { + defer resp.Body.Close() + + dec := json.NewDecoder(resp.Body) + + for { + select { + case <-stopCh: + close(frames) + return + default: + } + + // Decode the next frame + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + close(frames) + errCh <- err + return + } + + // Discard heartbeat frame + if frame.IsHeartbeat() { + continue + } + + frames <- &frame + } + }() + + return frames, errCh +} + // joinResponse is used to decode the response we get while // sending a member join request. type joinResponse struct { diff --git a/api/agent_test.go b/api/agent_test.go index b8658ae08d2..d01cfdf27d0 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -1,9 +1,15 @@ package api import ( + "fmt" "reflect" "sort" + "strings" "testing" + "time" + + "github.com/kr/pretty" + "github.com/stretchr/testify/require" "github.com/hashicorp/nomad/api/internal/testutil" "github.com/stretchr/testify/assert" @@ -257,3 +263,116 @@ func TestAgent_Health(t *testing.T) { assert.Nil(err) assert.True(health.Server.Ok) } + +// TestAgent_MonitorWithNode tests the Monitor endpoint +// passing in a log level and node ie, which tests monitor +// functionality for a specific client node +func TestAgent_MonitorWithNode(t *testing.T) { + t.Parallel() + rpcPort := 0 + c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { + rpcPort = c.Ports.RPC + c.Client = &testutil.ClientConfig{ + Enabled: true, + } + }) + defer s.Stop() + + require.NoError(t, c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)})) + + agent := c.Agent() + + index := uint64(0) + var node *NodeListStub + // grab a node + testutil.WaitForResult(func() (bool, error) { + nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index}) + if err != nil { + return false, err + } + index = qm.LastIndex + if len(nodes) != 1 { + return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes)) + } + if nodes[0].Status != "ready" { + return false, fmt.Errorf("node not ready: %s", nodes[0].Status) + } + node = nodes[0] + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + doneCh := make(chan struct{}) + q := &QueryOptions{ + Params: map[string]string{ + "log_level": "debug", + "node_id": node.ID, + }, + } + + frames, errCh := agent.Monitor(doneCh, q) + defer close(doneCh) + + // make a request to generate some logs + _, err := agent.NodeName() + require.NoError(t, err) + + // Wait for a log message +OUTER: + for { + select { + case f := <-frames: + if strings.Contains(string(f.Data), "[DEBUG]") { + break OUTER + } + case err := <-errCh: + t.Errorf("Error: %v", err) + case <-time.After(2 * time.Second): + require.Fail(t, "failed to get a DEBUG log message") + } + } +} + +// TestAgent_Monitor tests the Monitor endpoint +// passing in only a log level, which tests the servers +// monitor functionality +func TestAgent_Monitor(t *testing.T) { + t.Parallel() + c, s := makeClient(t, nil, nil) + defer s.Stop() + + agent := c.Agent() + + q := &QueryOptions{ + Params: map[string]string{ + "log_level": "debug", + }, + } + + doneCh := make(chan struct{}) + frames, errCh := agent.Monitor(doneCh, q) + defer close(doneCh) + + // make a request to generate some logs + _, err := agent.Region() + require.NoError(t, err) + + // Wait for a log message +OUTER: + for { + select { + case log := <-frames: + if log == nil { + continue + } + if strings.Contains(string(log.Data), "[DEBUG]") { + break OUTER + } + case err := <-errCh: + t.Fatalf("error: %v", err) + case <-time.After(2 * time.Second): + require.Fail(t, "failed to get a DEBUG log message") + } + } +} diff --git a/client/agent_endpoint.go b/client/agent_endpoint.go new file mode 100644 index 00000000000..19df4732abf --- /dev/null +++ b/client/agent_endpoint.go @@ -0,0 +1,163 @@ +package client + +import ( + "bytes" + "context" + "errors" + "io" + "time" + + "github.com/hashicorp/nomad/command/agent/monitor" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/ugorji/go/codec" + + metrics "github.com/armon/go-metrics" + log "github.com/hashicorp/go-hclog" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" + cstructs "github.com/hashicorp/nomad/client/structs" +) + +type Agent struct { + c *Client +} + +func NewAgentEndpoint(c *Client) *Agent { + m := &Agent{c: c} + m.c.streamingRpcs.Register("Agent.Monitor", m.monitor) + return m +} + +func (m *Agent) monitor(conn io.ReadWriteCloser) { + defer metrics.MeasureSince([]string{"client", "agent", "monitor"}, time.Now()) + defer conn.Close() + + // Decode arguments + var args cstructs.MonitorRequest + decoder := codec.NewDecoder(conn, structs.MsgpackHandle) + encoder := codec.NewEncoder(conn, structs.MsgpackHandle) + + if err := decoder.Decode(&args); err != nil { + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + // Check acl + if aclObj, err := m.c.ResolveToken(args.AuthToken); err != nil { + handleStreamResultError(err, helper.Int64ToPtr(403), encoder) + return + } else if aclObj != nil && !aclObj.AllowAgentRead() { + handleStreamResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder) + return + } + + logLevel := log.LevelFromString(args.LogLevel) + if args.LogLevel == "" { + logLevel = log.LevelFromString("INFO") + } + + if logLevel == log.NoLevel { + handleStreamResultError(errors.New("Unknown log level"), helper.Int64ToPtr(400), encoder) + return + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{ + JSONFormat: args.LogJSON, + Level: logLevel, + }) + + frames := make(chan *sframer.StreamFrame, streamFramesBuffer) + errCh := make(chan error) + var buf bytes.Buffer + frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) + + framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024) + framer.Run() + defer framer.Destroy() + + // goroutine to detect remote side closing + go func() { + if _, err := conn.Read(nil); err != nil { + // One end of the pipe explicitly closed, exit + cancel() + return + } + select { + case <-ctx.Done(): + return + } + }() + + logCh := monitor.Start() + defer monitor.Stop() + initialOffset := int64(0) + + // receive logs and build frames + go func() { + defer framer.Destroy() + LOOP: + for { + select { + case log := <-logCh: + if err := framer.Send("", "log", log, initialOffset); err != nil { + select { + case errCh <- err: + case <-ctx.Done(): + } + break LOOP + } + case <-ctx.Done(): + break LOOP + } + } + }() + + var streamErr error +OUTER: + for { + select { + case frame, ok := <-frames: + if !ok { + // frame may have been closed when an error + // occurred. Check once more for an error. + select { + case streamErr = <-errCh: + // There was a pending error! + default: + // No error, continue on + } + + break OUTER + } + + var resp cstructs.StreamErrWrapper + if args.PlainText { + resp.Payload = frame.Data + } else { + if err := frameCodec.Encode(frame); err != nil { + streamErr = err + break OUTER + } + + resp.Payload = buf.Bytes() + buf.Reset() + } + + if err := encoder.Encode(resp); err != nil { + streamErr = err + break OUTER + } + encoder.Reset(conn) + case <-ctx.Done(): + break OUTER + } + } + + if streamErr != nil { + handleStreamResultError(streamErr, helper.Int64ToPtr(500), encoder) + return + } +} diff --git a/client/agent_endpoint_test.go b/client/agent_endpoint_test.go new file mode 100644 index 00000000000..14f53cc6a6a --- /dev/null +++ b/client/agent_endpoint_test.go @@ -0,0 +1,215 @@ +package client + +import ( + "encoding/json" + "fmt" + "io" + "net" + "strings" + "testing" + "time" + + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/client/config" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/ugorji/go/codec" +) + +func TestMonitor_Monitor(t *testing.T) { + t.Parallel() + require := require.New(t) + + // start server and client + s := nomad.TestServer(t, nil) + defer s.Shutdown() + testutil.WaitForLeader(t, s.RPC) + + c, cleanup := TestClient(t, func(c *config.Config) { + c.Servers = []string{s.GetConfig().RPCAddr.String()} + }) + defer cleanup() + + req := cstructs.MonitorRequest{ + LogLevel: "debug", + NodeID: c.NodeID(), + } + + handler, err := c.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) + + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(5 * time.Second) + expected := "[DEBUG]" + received := "" + +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for logs") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + var frame sframer.StreamFrame + err := json.Unmarshal(msg.Payload, &frame) + assert.NoError(t, err) + + received += string(frame.Data) + if strings.Contains(received, expected) { + require.Nil(p2.Close()) + break OUTER + } + } + } +} + +func TestMonitor_Monitor_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + + // start server + s, root := nomad.TestACLServer(t, nil) + defer s.Shutdown() + testutil.WaitForLeader(t, s.RPC) + + c, cleanup := TestClient(t, func(c *config.Config) { + c.ACLEnabled = true + c.Servers = []string{s.GetConfig().RPCAddr.String()} + }) + defer cleanup() + + policyBad := mock.NodePolicy(acl.PolicyDeny) + tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) + + policyGood := mock.AgentPolicy(acl.PolicyRead) + tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid", policyGood) + + cases := []struct { + Name string + Token string + ExpectedErr string + }{ + { + Name: "bad token", + Token: tokenBad.SecretID, + ExpectedErr: structs.ErrPermissionDenied.Error(), + }, + { + Name: "good token", + Token: tokenGood.SecretID, + ExpectedErr: "Unknown log level", + }, + { + Name: "root token", + Token: root.SecretID, + ExpectedErr: "Unknown log level", + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + req := &cstructs.MonitorRequest{ + LogLevel: "unknown", + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + Region: "global", + AuthToken: tc.Token, + }, + } + + handler, err := c.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) + + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(5 * time.Second) + OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error == nil { + continue + } + + if strings.Contains(msg.Error.Error(), tc.ExpectedErr) { + break OUTER + } else { + t.Fatalf("Bad error: %v", msg.Error) + } + } + } + }) + } +} diff --git a/client/client.go b/client/client.go index e5d688ef181..0b68e383628 100644 --- a/client/client.go +++ b/client/client.go @@ -163,7 +163,7 @@ type Client struct { configCopy *config.Config configLock sync.RWMutex - logger hclog.Logger + logger hclog.InterceptLogger rpcLogger hclog.Logger connPool *pool.ConnPool @@ -304,7 +304,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic } // Create the logger - logger := cfg.Logger.ResetNamed("client") + logger := cfg.Logger.ResetNamedIntercept("client") // Create the client c := &Client{ diff --git a/client/config/config.go b/client/config/config.go index 4783bea561f..d9cabf8d8dd 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -81,7 +81,7 @@ type Config struct { LogOutput io.Writer // Logger provides a logger to thhe client - Logger log.Logger + Logger log.InterceptLogger // Region is the clients region Region string diff --git a/client/rpc.go b/client/rpc.go index beaec6f2cd7..3388f6483d4 100644 --- a/client/rpc.go +++ b/client/rpc.go @@ -23,6 +23,7 @@ type rpcEndpoints struct { ClientStats *ClientStats FileSystem *FileSystem Allocations *Allocations + Agent *Agent } // ClientRPC is used to make a local, client only RPC call @@ -218,6 +219,7 @@ func (c *Client) setupClientRpc() { c.endpoints.ClientStats = &ClientStats{c} c.endpoints.FileSystem = NewFileSystemEndpoint(c) c.endpoints.Allocations = NewAllocationsEndpoint(c) + c.endpoints.Agent = NewAgentEndpoint(c) // Create the RPC Server c.rpcServer = rpc.NewServer() diff --git a/client/structs/structs.go b/client/structs/structs.go index eff8ceaf365..6a84e2d635d 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -34,6 +34,22 @@ type ClientStatsResponse struct { structs.QueryMeta } +type MonitorRequest struct { + // LogLevel is the log level filter we want to stream logs on + LogLevel string + + // LogJSON specifies if log format should be unstructured or json + LogJSON bool + + // NodeID is the node we want to track the logs of + NodeID string + + // PlainText disables base64 encoding. + PlainText bool + + structs.QueryOptions +} + // AllocFileInfo holds information about a file inside the AllocDir type AllocFileInfo struct { Name string diff --git a/command/agent/agent.go b/command/agent/agent.go index 1dce4d97677..26f9e00a87c 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -53,7 +53,7 @@ type Agent struct { config *Config configLock sync.Mutex - logger log.Logger + logger log.InterceptLogger httpLogger log.Logger logOutput io.Writer @@ -87,7 +87,7 @@ type Agent struct { } // NewAgent is used to create a new agent with the given configuration -func NewAgent(config *Config, logger log.Logger, logOutput io.Writer, inmem *metrics.InmemSink) (*Agent, error) { +func NewAgent(config *Config, logger log.InterceptLogger, logOutput io.Writer, inmem *metrics.InmemSink) (*Agent, error) { a := &Agent{ config: config, logOutput: logOutput, diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 7fe9eae6f75..56c3631b7d1 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -1,16 +1,25 @@ package agent import ( + "bytes" + "context" "encoding/json" + "fmt" + "io" "net" "net/http" "sort" + "strconv" "strings" + "github.com/docker/docker/pkg/ioutils" + log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/acl" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/serf/serf" "github.com/mitchellh/copystructure" + "github.com/ugorji/go/codec" ) type Member struct { @@ -145,6 +154,151 @@ func (s *HTTPServer) AgentMembersRequest(resp http.ResponseWriter, req *http.Req return out, nil } +func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var secret string + s.parseToken(req, &secret) + + // Check agent read permissions + if aclObj, err := s.agent.Server().ResolveToken(secret); err != nil { + return nil, err + } else if aclObj != nil && !aclObj.AllowAgentRead() { + return nil, structs.ErrPermissionDenied + } + + // Get the provided loglevel. + logLevel := req.URL.Query().Get("log_level") + if logLevel == "" { + logLevel = "INFO" + } + + if log.LevelFromString(logLevel) == log.NoLevel { + return nil, CodedError(400, fmt.Sprintf("Unknown log level: %s", logLevel)) + } + + // Determine if we are targeting a server or client + nodeID := req.URL.Query().Get("node_id") + + logJSON := false + logJSONStr := req.URL.Query().Get("log_json") + if logJSONStr != "" { + parsed, err := strconv.ParseBool(logJSONStr) + if err != nil { + return nil, CodedError(400, fmt.Sprintf("Unknown option for log json: %v", err)) + } + logJSON = parsed + } + + plainText := false + plainTextStr := req.URL.Query().Get("plain") + if plainTextStr != "" { + parsed, err := strconv.ParseBool(plainTextStr) + if err != nil { + return nil, CodedError(400, fmt.Sprintf("Unknown option for plain: %v", err)) + } + plainText = parsed + } + + // Build the request and parse the ACL token + args := cstructs.MonitorRequest{ + NodeID: nodeID, + LogLevel: logLevel, + LogJSON: logJSON, + PlainText: plainText, + } + + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) + + // Make the RPC + var handler structs.StreamingRpcHandler + var handlerErr error + if nodeID != "" { + // Determine the handler to use + useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForNode(nodeID) + if useLocalClient { + handler, handlerErr = s.agent.Client().StreamingRpcHandler("Agent.Monitor") + } else if useClientRPC { + handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler("Agent.Monitor") + } else if useServerRPC { + handler, handlerErr = s.agent.Server().StreamingRpcHandler("Agent.Monitor") + } else { + handlerErr = CodedError(400, "No local Node and node_id not provided") + } + } else { + // No node id we want to monitor this server + handler, handlerErr = s.agent.Server().StreamingRpcHandler("Agent.Monitor") + } + + if handlerErr != nil { + return nil, CodedError(500, handlerErr.Error()) + } + httpPipe, handlerPipe := net.Pipe() + decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle) + encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle) + + ctx, cancel := context.WithCancel(req.Context()) + go func() { + <-ctx.Done() + httpPipe.Close() + }() + + // Create an output that gets flushed on every write + output := ioutils.NewWriteFlusher(resp) + + // create an error channel to handle errors + errCh := make(chan HTTPCodedError, 2) + + // stream response + go func() { + defer cancel() + + // Send the request + if err := encoder.Encode(args); err != nil { + errCh <- CodedError(500, err.Error()) + return + } + + for { + select { + case <-ctx.Done(): + errCh <- nil + return + default: + } + + var res cstructs.StreamErrWrapper + if err := decoder.Decode(&res); err != nil { + errCh <- CodedError(500, err.Error()) + return + } + decoder.Reset(httpPipe) + + if err := res.Error; err != nil { + if err.Code != nil { + errCh <- CodedError(int(*err.Code), err.Error()) + return + } + } + + if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil { + errCh <- CodedError(500, err.Error()) + return + } + } + }() + + handler(handlerPipe) + cancel() + codedErr := <-errCh + + if codedErr != nil && + (codedErr == io.EOF || + strings.Contains(codedErr.Error(), "closed") || + strings.Contains(codedErr.Error(), "EOF")) { + codedErr = nil + } + return nil, codedErr +} + func (s *HTTPServer) AgentForceLeaveRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { if req.Method != "PUT" && req.Method != "POST" { return nil, CodedError(405, ErrInvalidMethod) diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index d9e26f186f8..572742fe0f1 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -4,10 +4,12 @@ import ( "bytes" "encoding/json" "fmt" + "io/ioutil" "net" "net/http" "net/http/httptest" "net/url" + "strings" "testing" "time" @@ -249,6 +251,162 @@ func TestHTTP_AgentMembers_ACL(t *testing.T) { }) } +func TestHTTP_AgentMonitor(t *testing.T) { + t.Parallel() + + httpTest(t, nil, func(s *TestAgent) { + // invalid log_json + { + req, err := http.NewRequest("GET", "/v1/agent/monitor?log_json=no", nil) + require.Nil(t, err) + resp := newClosableRecorder() + + // Make the request + _, err = s.Server.AgentMonitor(resp, req) + if err.(HTTPCodedError).Code() != 400 { + t.Fatalf("expected 400 response, got: %v", resp.Code) + } + } + + // unknown log_level + { + req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=unknown", nil) + require.Nil(t, err) + resp := newClosableRecorder() + + // Make the request + _, err = s.Server.AgentMonitor(resp, req) + if err.(HTTPCodedError).Code() != 400 { + t.Fatalf("expected 400 response, got: %v", resp.Code) + } + } + + // check for a specific log + { + req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn", nil) + require.Nil(t, err) + resp := newClosableRecorder() + defer resp.Close() + + go func() { + _, err = s.Server.AgentMonitor(resp, req) + require.NoError(t, err) + }() + + // send the same log until monitor sink is set up + maxLogAttempts := 10 + tried := 0 + testutil.WaitForResult(func() (bool, error) { + if tried < maxLogAttempts { + s.Server.logger.Warn("log that should be sent") + tried++ + } + + got := resp.Body.String() + want := `{"Data":"` + if strings.Contains(got, want) { + return true, nil + } + + return false, fmt.Errorf("missing expected log, got: %v, want: %v", got, want) + }, func(err error) { + require.Fail(t, err.Error()) + }) + } + + // plain param set to true + { + req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=debug&plain=true", nil) + require.Nil(t, err) + resp := newClosableRecorder() + defer resp.Close() + + go func() { + _, err = s.Server.AgentMonitor(resp, req) + require.NoError(t, err) + }() + + // send the same log until monitor sink is set up + maxLogAttempts := 10 + tried := 0 + testutil.WaitForResult(func() (bool, error) { + if tried < maxLogAttempts { + s.Server.logger.Debug("log that should be sent") + tried++ + } + + got := resp.Body.String() + want := `[DEBUG] http: log that should be sent` + if strings.Contains(got, want) { + return true, nil + } + + return false, fmt.Errorf("missing expected log, got: %v, want: %v", got, want) + }, func(err error) { + require.Fail(t, err.Error()) + }) + } + + // stream logs for a given node + { + req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn&node_id="+s.client.NodeID(), nil) + require.Nil(t, err) + resp := newClosableRecorder() + defer resp.Close() + + go func() { + _, err = s.Server.AgentMonitor(resp, req) + require.NoError(t, err) + }() + + // send the same log until monitor sink is set up + maxLogAttempts := 10 + tried := 0 + out := "" + testutil.WaitForResult(func() (bool, error) { + if tried < maxLogAttempts { + s.Server.logger.Debug("log that should not be sent") + s.Server.logger.Warn("log that should be sent") + tried++ + } + output, err := ioutil.ReadAll(resp.Body) + if err != nil { + return false, err + } + + out += string(output) + want := `{"Data":"` + if strings.Contains(out, want) { + return true, nil + } + + return false, fmt.Errorf("missing expected log, got: %v, want: %v", out, want) + }, func(err error) { + require.Fail(t, err.Error()) + }) + } + }) +} + +type closableRecorder struct { + *httptest.ResponseRecorder + closer chan bool +} + +func newClosableRecorder() *closableRecorder { + r := httptest.NewRecorder() + closer := make(chan bool) + return &closableRecorder{r, closer} +} + +func (r *closableRecorder) Close() { + close(r.closer) +} + +func (r *closableRecorder) CloseNotify() <-chan bool { + return r.closer +} + func TestHTTP_AgentForceLeave(t *testing.T) { t.Parallel() httpTest(t, nil, func(s *TestAgent) { diff --git a/command/agent/command.go b/command/agent/command.go index 49aa738a176..da3e40e909c 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -373,8 +373,8 @@ func (c *Command) isValidConfig(config, cmdConfig *Config) bool { return true } -// setupLoggers is used to setup the logGate, logWriter, and our logOutput -func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter, io.Writer) { +// setupLoggers is used to setup the logGate, and our logOutput +func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, io.Writer) { // Setup logging. First create the gated log writer, which will // store logs until we're ready to show them. Then create the level // filter, filtering logs of the specified level. @@ -389,19 +389,18 @@ func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter, c.Ui.Error(fmt.Sprintf( "Invalid log level: %s. Valid log levels are: %v", c.logFilter.MinLevel, c.logFilter.Levels)) - return nil, nil, nil + return nil, nil } // Create a log writer, and wrap a logOutput around it - logWriter := NewLogWriter(512) - writers := []io.Writer{c.logFilter, logWriter} + writers := []io.Writer{c.logFilter} // Check if syslog is enabled if config.EnableSyslog { l, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, config.SyslogFacility, "nomad") if err != nil { c.Ui.Error(fmt.Sprintf("Syslog setup failed: %v", err)) - return nil, nil, nil + return nil, nil } writers = append(writers, &SyslogWrapper{l, c.logFilter}) } @@ -421,7 +420,7 @@ func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter, duration, err := time.ParseDuration(config.LogRotateDuration) if err != nil { c.Ui.Error(fmt.Sprintf("Failed to parse log rotation duration: %v", err)) - return nil, nil, nil + return nil, nil } logRotateDuration = duration } else { @@ -443,11 +442,11 @@ func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter, c.logOutput = io.MultiWriter(writers...) log.SetOutput(c.logOutput) - return logGate, logWriter, c.logOutput + return logGate, c.logOutput } // setupAgent is used to start the agent and various interfaces -func (c *Command) setupAgent(config *Config, logger hclog.Logger, logOutput io.Writer, inmem *metrics.InmemSink) error { +func (c *Command) setupAgent(config *Config, logger hclog.InterceptLogger, logOutput io.Writer, inmem *metrics.InmemSink) error { c.Ui.Output("Starting Nomad agent...") agent, err := NewAgent(config, logger, logOutput, inmem) if err != nil { @@ -596,13 +595,13 @@ func (c *Command) Run(args []string) int { } // Setup the log outputs - logGate, _, logOutput := c.setupLoggers(config) + logGate, logOutput := c.setupLoggers(config) if logGate == nil { return 1 } // Create logger - logger := hclog.New(&hclog.LoggerOptions{ + logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{ Name: "agent", Level: hclog.LevelFromString(config.LogLevel), Output: logOutput, diff --git a/command/agent/http.go b/command/agent/http.go index 1bb673a2aab..756b9ea5a9c 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -183,6 +183,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/agent/servers", s.wrap(s.AgentServersRequest)) s.mux.HandleFunc("/v1/agent/keyring/", s.wrap(s.KeyringOperationRequest)) s.mux.HandleFunc("/v1/agent/health", s.wrap(s.HealthRequest)) + s.mux.HandleFunc("/v1/agent/monitor", s.wrap(s.AgentMonitor)) s.mux.HandleFunc("/v1/metrics", s.wrap(s.MetricsRequest)) diff --git a/command/agent/log_writer.go b/command/agent/log_writer.go deleted file mode 100644 index ebb96878bbc..00000000000 --- a/command/agent/log_writer.go +++ /dev/null @@ -1,83 +0,0 @@ -package agent - -import ( - "sync" -) - -// LogHandler interface is used for clients that want to subscribe -// to logs, for example to stream them over an IPC mechanism -type LogHandler interface { - HandleLog(string) -} - -// logWriter implements io.Writer so it can be used as a log sink. -// It maintains a circular buffer of logs, and a set of handlers to -// which it can stream the logs to. -type logWriter struct { - sync.Mutex - logs []string - index int - handlers map[LogHandler]struct{} -} - -// NewLogWriter creates a logWriter with the given buffer capacity -func NewLogWriter(buf int) *logWriter { - return &logWriter{ - logs: make([]string, buf), - index: 0, - handlers: make(map[LogHandler]struct{}), - } -} - -// RegisterHandler adds a log handler to receive logs, and sends -// the last buffered logs to the handler -func (l *logWriter) RegisterHandler(lh LogHandler) { - l.Lock() - defer l.Unlock() - - // Do nothing if already registered - if _, ok := l.handlers[lh]; ok { - return - } - - // Register - l.handlers[lh] = struct{}{} - - // Send the old logs - if l.logs[l.index] != "" { - for i := l.index; i < len(l.logs); i++ { - lh.HandleLog(l.logs[i]) - } - } - for i := 0; i < l.index; i++ { - lh.HandleLog(l.logs[i]) - } -} - -// DeregisterHandler removes a LogHandler and prevents more invocations -func (l *logWriter) DeregisterHandler(lh LogHandler) { - l.Lock() - defer l.Unlock() - delete(l.handlers, lh) -} - -// Write is used to accumulate new logs -func (l *logWriter) Write(p []byte) (n int, err error) { - l.Lock() - defer l.Unlock() - - // Strip off newlines at the end if there are any since we store - // individual log lines in the agent. - n = len(p) - if p[n-1] == '\n' { - p = p[:n-1] - } - - l.logs[l.index] = string(p) - l.index = (l.index + 1) % len(l.logs) - - for lh := range l.handlers { - lh.HandleLog(string(p)) - } - return -} diff --git a/command/agent/log_writer_test.go b/command/agent/log_writer_test.go deleted file mode 100644 index 19c23c573d7..00000000000 --- a/command/agent/log_writer_test.go +++ /dev/null @@ -1,52 +0,0 @@ -package agent - -import ( - "testing" -) - -type MockLogHandler struct { - logs []string -} - -func (m *MockLogHandler) HandleLog(l string) { - m.logs = append(m.logs, l) -} - -func TestLogWriter(t *testing.T) { - t.Parallel() - h := &MockLogHandler{} - w := NewLogWriter(4) - - // Write some logs - w.Write([]byte("one")) // Gets dropped! - w.Write([]byte("two")) - w.Write([]byte("three")) - w.Write([]byte("four")) - w.Write([]byte("five")) - - // Register a handler, sends old! - w.RegisterHandler(h) - - w.Write([]byte("six")) - w.Write([]byte("seven")) - - // Deregister - w.DeregisterHandler(h) - - w.Write([]byte("eight")) - w.Write([]byte("nine")) - - out := []string{ - "two", - "three", - "four", - "five", - "six", - "seven", - } - for idx := range out { - if out[idx] != h.logs[idx] { - t.Fatalf("mismatch %v", h.logs) - } - } -} diff --git a/command/agent/monitor/monitor.go b/command/agent/monitor/monitor.go new file mode 100644 index 00000000000..d788110c196 --- /dev/null +++ b/command/agent/monitor/monitor.go @@ -0,0 +1,172 @@ +package monitor + +import ( + "fmt" + "sync" + "time" + + log "github.com/hashicorp/go-hclog" +) + +// Monitor provides a mechanism to stream logs using go-hclog +// InterceptLogger and SinkAdapter. It allows streaming of logs +// at a different log level than what is set on the logger. +type Monitor interface { + // Start returns a channel of log messages which are sent + // ever time a log message occurs + Start() <-chan []byte + + // Stop de-registers the sink from the InterceptLogger + // and closes the log channels + Stop() +} + +// monitor implements the Monitor interface +type monitor struct { + // protects droppedCount and logCh + sync.Mutex + + sink log.SinkAdapter + + // logger is the logger we will be monitoring + logger log.InterceptLogger + + // logCh is a buffered chan where we send logs when streaming + logCh chan []byte + + // doneCh coordinates the shutdown of logCh + doneCh chan struct{} + + // droppedCount is the current count of messages + // that were dropped from the logCh buffer. + // only access under lock + droppedCount int + bufSize int + // droppedDuration is the amount of time we should + // wait to check for dropped messages. Defaults + // to 3 seconds + droppedDuration time.Duration +} + +// New creates a new Monitor. Start must be called in order to actually start +// streaming logs +func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) Monitor { + return new(buf, logger, opts) +} + +func new(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *monitor { + sw := &monitor{ + logger: logger, + logCh: make(chan []byte, buf), + doneCh: make(chan struct{}, 1), + bufSize: buf, + droppedDuration: 3 * time.Second, + } + + opts.Output = sw + sink := log.NewSinkAdapter(opts) + sw.sink = sink + + return sw +} + +// Stop deregisters the sink and stops the monitoring process +func (d *monitor) Stop() { + d.logger.DeregisterSink(d.sink) + close(d.doneCh) +} + +// Start registers a sink on the monitor's logger and starts sending +// received log messages over the returned channel. +func (d *monitor) Start() <-chan []byte { + // register our sink with the logger + d.logger.RegisterSink(d.sink) + + streamCh := make(chan []byte, d.bufSize) + + // run a go routine that listens for streamed + // log messages and sends them to streamCh + go func() { + defer close(streamCh) + + for { + select { + case log := <-d.logCh: + select { + case <-d.doneCh: + return + case streamCh <- log: + } + case <-d.doneCh: + return + } + } + }() + + // run a go routine that periodically checks for + // dropped messages and makes room on the logCh + // to add a dropped message count warning + go func() { + // loop and check for dropped messages + for { + select { + case <-d.doneCh: + return + case <-time.After(d.droppedDuration): + d.Lock() + + // Check if there have been any dropped messages. + if d.droppedCount > 0 { + dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount) + select { + case <-d.doneCh: + d.Unlock() + return + // Try sending dropped message count to logCh in case + // there is room in the buffer now. + case d.logCh <- []byte(dropped): + default: + // Drop a log message to make room for "Monitor dropped.." message + select { + case <-d.logCh: + d.droppedCount++ + dropped = fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount) + default: + } + d.logCh <- []byte(dropped) + } + d.droppedCount = 0 + } + // unlock after handling dropped message + d.Unlock() + } + } + }() + + return streamCh +} + +// Write attempts to send latest log to logCh +// it drops the log if channel is unavailable to receive +func (d *monitor) Write(p []byte) (n int, err error) { + d.Lock() + defer d.Unlock() + + // ensure logCh is still open + select { + case <-d.doneCh: + return + default: + } + + bytes := make([]byte, len(p)) + copy(bytes, p) + + select { + case d.logCh <- bytes: + default: + d.droppedCount++ + } + + return len(p), nil +} diff --git a/command/agent/monitor/monitor_test.go b/command/agent/monitor/monitor_test.go new file mode 100644 index 00000000000..697b5bdc58c --- /dev/null +++ b/command/agent/monitor/monitor_test.go @@ -0,0 +1,88 @@ +package monitor + +import ( + "fmt" + "strings" + "testing" + "time" + + log "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" +) + +func TestMonitor_Start(t *testing.T) { + t.Parallel() + + logger := log.NewInterceptLogger(&log.LoggerOptions{ + Level: log.Error, + }) + + m := New(512, logger, &log.LoggerOptions{ + Level: log.Debug, + }) + + logCh := m.Start() + defer m.Stop() + + go func() { + logger.Debug("test log") + time.Sleep(10 * time.Millisecond) + }() + + for { + select { + case log := <-logCh: + require.Contains(t, string(log), "[DEBUG] test log") + return + case <-time.After(3 * time.Second): + t.Fatal("Expected to receive from log channel") + } + } +} + +// Ensure number of dropped messages are logged +func TestMonitor_DroppedMessages(t *testing.T) { + t.Parallel() + + logger := log.NewInterceptLogger(&log.LoggerOptions{ + Level: log.Warn, + }) + + m := new(5, logger, &log.LoggerOptions{ + Level: log.Debug, + }) + m.droppedDuration = 5 * time.Millisecond + + doneCh := make(chan struct{}) + defer close(doneCh) + + logCh := m.Start() + + for i := 0; i <= 100; i++ { + logger.Debug(fmt.Sprintf("test message %d", i)) + } + + received := "" + passed := make(chan struct{}) + go func() { + for { + select { + case recv := <-logCh: + received += string(recv) + if strings.Contains(received, "[WARN] Monitor dropped") { + close(passed) + } + } + } + }() + +TEST: + for { + select { + case <-passed: + break TEST + case <-time.After(2 * time.Second): + require.Fail(t, "expected to see warn dropped messages") + } + } +} diff --git a/command/agent/testagent.go b/command/agent/testagent.go index 59d8eaeb83b..a36a461169e 100644 --- a/command/agent/testagent.go +++ b/command/agent/testagent.go @@ -216,7 +216,7 @@ func (a *TestAgent) start() (*Agent, error) { return nil, fmt.Errorf("unable to set up in memory metrics needed for agent initialization") } - logger := hclog.New(&hclog.LoggerOptions{ + logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{ Name: "agent", Level: hclog.LevelFromString(a.Config.LogLevel), Output: a.LogOutput, diff --git a/command/agent_monitor.go b/command/agent_monitor.go new file mode 100644 index 00000000000..4927c5784fe --- /dev/null +++ b/command/agent_monitor.go @@ -0,0 +1,134 @@ +package command + +import ( + "fmt" + "io" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + "github.com/hashicorp/nomad/api" + "github.com/mitchellh/cli" +) + +type MonitorCommand struct { + Meta +} + +func (c *MonitorCommand) Help() string { + helpText := ` +Usage: nomad monitor [options] + + Stream log messages of a nomad agent. The monitor command lets you + listen for log levels that may be filtered out of the Nomad agent. For + example your agent may only be logging at INFO level, but with the monitor + command you can set -log-level DEBUG + +General Options: + + ` + generalOptionsUsage() + ` + +Monitor Specific Options: + + -log-level + Sets the log level to monitor (default: INFO) + + -node-id + Sets the specific node to monitor + + -json + Sets log output to JSON format + ` + return strings.TrimSpace(helpText) +} + +func (c *MonitorCommand) Synopsis() string { + return "stream logs from a Nomad agent" +} + +func (c *MonitorCommand) Name() string { return "monitor" } + +func (c *MonitorCommand) Run(args []string) int { + c.Ui = &cli.PrefixedUi{ + OutputPrefix: " ", + InfoPrefix: " ", + ErrorPrefix: "==> ", + Ui: c.Ui, + } + + var logLevel string + var nodeID string + var logJSON bool + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.StringVar(&logLevel, "log-level", "", "") + flags.StringVar(&nodeID, "node-id", "", "") + flags.BoolVar(&logJSON, "json", false, "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + args = flags.Args() + if l := len(args); l != 0 { + c.Ui.Error("This command takes no arguments") + c.Ui.Error(commandErrorText(c)) + return 1 + } + + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + c.Ui.Error(commandErrorText(c)) + return 1 + } + + params := map[string]string{ + "log_level": logLevel, + "node_id": nodeID, + "log_json": strconv.FormatBool(logJSON), + } + + query := &api.QueryOptions{ + Params: params, + } + + eventDoneCh := make(chan struct{}) + frames, errCh := client.Agent().Monitor(eventDoneCh, query) + select { + case err := <-errCh: + c.Ui.Error(fmt.Sprintf("Error starting monitor: %s", err)) + c.Ui.Error(commandErrorText(c)) + return 1 + default: + } + + // Create a reader + var r io.ReadCloser + frameReader := api.NewFrameReader(frames, errCh, eventDoneCh) + frameReader.SetUnblockTime(500 * time.Millisecond) + r = frameReader + + defer r.Close() + + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) + + go func() { + <-signalCh + // End the streaming + r.Close() + }() + + _, err = io.Copy(os.Stdout, r) + if err != nil { + c.Ui.Error(fmt.Sprintf("error monitoring logs: %s", err)) + return 1 + } + + return 0 +} diff --git a/command/agent_monitor_test.go b/command/agent_monitor_test.go new file mode 100644 index 00000000000..24109b28048 --- /dev/null +++ b/command/agent_monitor_test.go @@ -0,0 +1,36 @@ +package command + +import ( + "strings" + "testing" + + "github.com/mitchellh/cli" +) + +func TestMonitorCommand_Implements(t *testing.T) { + t.Parallel() + var _ cli.Command = &MonitorCommand{} +} + +func TestMonitorCommand_Fails(t *testing.T) { + t.Parallel() + srv, _, _ := testServer(t, false, nil) + defer srv.Shutdown() + + ui := new(cli.MockUi) + cmd := &MonitorCommand{Meta: Meta{Ui: ui}} + + // Fails on misuse + if code := cmd.Run([]string{"some", "bad", "args"}); code != 1 { + t.Fatalf("exepected exit code 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, commandErrorText(cmd)) { + t.Fatalf("expected help output, got: %s", out) + } + + ui.ErrorWriter.Reset() + + if code := cmd.Run([]string{"-address=nope"}); code != 1 { + t.Fatalf("exepected exit code 1, got: %d", code) + } +} diff --git a/command/assets/example-short.nomad b/command/assets/example-short.nomad index ae3de97d310..d450ff7a1ac 100644 --- a/command/assets/example-short.nomad +++ b/command/assets/example-short.nomad @@ -19,7 +19,7 @@ job "example" { network { mbits = 10 - port "db" {} + port "db" {} } } } diff --git a/command/assets/example.nomad b/command/assets/example.nomad index 8f8d5fffab0..f78dc356bde 100644 --- a/command/assets/example.nomad +++ b/command/assets/example.nomad @@ -316,7 +316,7 @@ job "example" { network { mbits = 10 - port "db" {} + port "db" {} } } # The "service" stanza instructs Nomad to register this task as a service diff --git a/command/commands.go b/command/commands.go index 0eeade107cc..2c2d3052aca 100644 --- a/command/commands.go +++ b/command/commands.go @@ -366,6 +366,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "monitor": func() (cli.Command, error) { + return &MonitorCommand{ + Meta: meta, + }, nil + }, "namespace": func() (cli.Command, error) { return &NamespaceCommand{ Meta: meta, diff --git a/dev/docker-clients/client.nomad b/dev/docker-clients/client.nomad index 4689df77534..37248bfce05 100644 --- a/dev/docker-clients/client.nomad +++ b/dev/docker-clients/client.nomad @@ -23,7 +23,7 @@ job "client" { network { mbits = 10 - port "http"{} + port "http" {} } } diff --git a/e2e/consul/input/consul_example.nomad b/e2e/consul/input/consul_example.nomad index 18b02be7c10..24217b84260 100644 --- a/e2e/consul/input/consul_example.nomad +++ b/e2e/consul/input/consul_example.nomad @@ -49,7 +49,7 @@ job "consul-example" { network { mbits = 10 - port "db" {} + port "db" {} } } diff --git a/e2e/metrics/input/helloworld.nomad b/e2e/metrics/input/helloworld.nomad index bd8cfb44318..f8fed4ed8fe 100644 --- a/e2e/metrics/input/helloworld.nomad +++ b/e2e/metrics/input/helloworld.nomad @@ -29,7 +29,7 @@ job "hello" { network { mbits = 10 - port "web" {} + port "web" {} } } diff --git a/e2e/metrics/input/redis.nomad b/e2e/metrics/input/redis.nomad index 27d8a5d8429..2fedaed8755 100644 --- a/e2e/metrics/input/redis.nomad +++ b/e2e/metrics/input/redis.nomad @@ -39,7 +39,7 @@ job "redis" { network { mbits = 10 - port "db" {} + port "db" {} } } diff --git a/e2e/metrics/input/simpleweb.nomad b/e2e/metrics/input/simpleweb.nomad index 352f89bb020..92a20e1a36a 100644 --- a/e2e/metrics/input/simpleweb.nomad +++ b/e2e/metrics/input/simpleweb.nomad @@ -28,7 +28,7 @@ job "nginx" { network { mbits = 1 - port "http"{} + port "http" {} } } diff --git a/e2e/prometheus/prometheus.nomad b/e2e/prometheus/prometheus.nomad index c32d45d3347..85b64544331 100644 --- a/e2e/prometheus/prometheus.nomad +++ b/e2e/prometheus/prometheus.nomad @@ -64,7 +64,7 @@ EOH resources { network { mbits = 10 - port "prometheus_ui"{} + port "prometheus_ui" {} } } diff --git a/helper/testlog/testlog.go b/helper/testlog/testlog.go index d8502dd0ae0..7a343b50c2e 100644 --- a/helper/testlog/testlog.go +++ b/helper/testlog/testlog.go @@ -72,7 +72,7 @@ func Logger(t LogPrinter) *log.Logger { } //HCLogger returns a new test hc-logger. -func HCLogger(t LogPrinter) hclog.Logger { +func HCLogger(t LogPrinter) hclog.InterceptLogger { level := hclog.Trace envLogLevel := os.Getenv("NOMAD_TEST_LOG_LEVEL") if envLogLevel != "" { @@ -83,7 +83,7 @@ func HCLogger(t LogPrinter) hclog.Logger { Output: NewWriter(t), IncludeLocation: true, } - return hclog.New(opts) + return hclog.NewInterceptLogger(opts) } type prefixStdout struct { diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go new file mode 100644 index 00000000000..bb9281441f1 --- /dev/null +++ b/nomad/client_agent_endpoint.go @@ -0,0 +1,236 @@ +package nomad + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "net" + "time" + + log "github.com/hashicorp/go-hclog" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/command/agent/monitor" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" + + "github.com/ugorji/go/codec" +) + +type Agent struct { + srv *Server +} + +func (m *Agent) register() { + m.srv.streamingRpcs.Register("Agent.Monitor", m.monitor) +} + +func (m *Agent) monitor(conn io.ReadWriteCloser) { + defer conn.Close() + + // Decode args + var args cstructs.MonitorRequest + decoder := codec.NewDecoder(conn, structs.MsgpackHandle) + encoder := codec.NewEncoder(conn, structs.MsgpackHandle) + + if err := decoder.Decode(&args); err != nil { + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + // Check agent read permissions + if aclObj, err := m.srv.ResolveToken(args.AuthToken); err != nil { + handleStreamResultError(err, nil, encoder) + return + } else if aclObj != nil && !aclObj.AllowAgentRead() { + handleStreamResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder) + return + } + + logLevel := log.LevelFromString(args.LogLevel) + if args.LogLevel == "" { + logLevel = log.LevelFromString("INFO") + } + + if logLevel == log.NoLevel { + handleStreamResultError(errors.New("Unknown log level"), helper.Int64ToPtr(400), encoder) + return + } + + // Targeting a client so forward the request + if args.NodeID != "" { + m.forwardMonitor(conn, args, encoder, decoder) + } + + // NodeID was empty, so monitor this current server + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{ + Level: logLevel, + JSONFormat: args.LogJSON, + }) + + frames := make(chan *sframer.StreamFrame, 32) + errCh := make(chan error) + var buf bytes.Buffer + frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) + + framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024) + framer.Run() + defer framer.Destroy() + + // goroutine to detect remote side closing + go func() { + if _, err := conn.Read(nil); err != nil { + // One end of the pipe explicitly closed, exit + cancel() + return + } + select { + case <-ctx.Done(): + return + } + }() + + logCh := monitor.Start() + defer monitor.Stop() + initialOffset := int64(0) + + // receive logs and build frames + go func() { + defer framer.Destroy() + LOOP: + for { + select { + case log := <-logCh: + if err := framer.Send("", "log", log, initialOffset); err != nil { + select { + case errCh <- err: + case <-ctx.Done(): + } + break LOOP + } + case <-ctx.Done(): + break LOOP + } + } + }() + + var streamErr error +OUTER: + for { + select { + case frame, ok := <-frames: + if !ok { + // frame may have been closed when an error + // occurred. Check once more for an error. + select { + case streamErr = <-errCh: + // There was a pending error! + default: + // No error, continue on + } + + break OUTER + } + + var resp cstructs.StreamErrWrapper + if args.PlainText { + resp.Payload = frame.Data + } else { + if err := frameCodec.Encode(frame); err != nil { + streamErr = err + break OUTER + } + + resp.Payload = buf.Bytes() + buf.Reset() + } + + if err := encoder.Encode(resp); err != nil { + streamErr = err + break OUTER + } + encoder.Reset(conn) + case <-ctx.Done(): + break OUTER + } + } + + if streamErr != nil { + handleStreamResultError(streamErr, helper.Int64ToPtr(500), encoder) + return + } +} + +func (m *Agent) forwardMonitor(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { + nodeID := args.NodeID + + snap, err := m.srv.State().Snapshot() + if err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + node, err := snap.NodeByID(nil, nodeID) + if err != nil { + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + if node == nil { + err := fmt.Errorf("Unknown node %q", nodeID) + handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + + if err := nodeSupportsRpc(node); err != nil { + handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + + // Get the Connection to the client either by fowarding to another server + // or creating direct stream + var clientConn net.Conn + state, ok := m.srv.getNodeConn(nodeID) + if !ok { + // Determine the server that has a connection to the node + srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region()) + if err != nil { + var code *int64 + if structs.IsErrNoNodeConn(err) { + code = helper.Int64ToPtr(404) + } + handleStreamResultError(err, code, encoder) + return + } + conn, err := m.srv.streamingRpc(srv, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + clientConn = conn + } else { + stream, err := NodeStreamingRpc(state.Session, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, nil, encoder) + return + } + clientConn = stream + } + defer clientConn.Close() + + // Send the Request + outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle) + if err := outEncoder.Encode(args); err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + structs.Bridge(conn, clientConn) + return +} diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go new file mode 100644 index 00000000000..201e21fb937 --- /dev/null +++ b/nomad/client_agent_endpoint_test.go @@ -0,0 +1,307 @@ +package nomad + +import ( + "encoding/json" + "fmt" + "io" + "net" + "strings" + "testing" + "time" + + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/client" + "github.com/hashicorp/nomad/client/config" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/ugorji/go/codec" +) + +func TestMonitor_Monitor_Remote_Server(t *testing.T) { + t.Parallel() + require := require.New(t) + + // start server and client + s1 := TestServer(t, nil) + defer s1.Shutdown() + s2 := TestServer(t, func(c *Config) { + c.DevDisableBootstrap = true + }) + defer s2.Shutdown() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + c, cleanup := client.TestClient(t, func(c *config.Config) { + c.Servers = []string{s2.GetConfig().RPCAddr.String()} + }) + defer cleanup() + + testutil.WaitForResult(func() (bool, error) { + nodes := s2.connectedNodes() + return len(nodes) == 1, nil + }, func(err error) { + t.Fatalf("should have a clients") + }) + + // No node ID to monitor the remote server + req := cstructs.MonitorRequest{ + LogLevel: "debug", + NodeID: c.NodeID(), + } + + handler, err := s1.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) + + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(3 * time.Second) + expected := "[DEBUG]" + received := "" + +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for logs") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + var frame sframer.StreamFrame + err := json.Unmarshal(msg.Payload, &frame) + assert.NoError(t, err) + + received += string(frame.Data) + if strings.Contains(received, expected) { + require.Nil(p2.Close()) + break OUTER + } + } + } +} + +func TestMonitor_MonitorServer(t *testing.T) { + t.Parallel() + require := require.New(t) + + // start server + s := TestServer(t, nil) + defer s.Shutdown() + testutil.WaitForLeader(t, s.RPC) + + // No node ID to monitor the remote server + req := cstructs.MonitorRequest{ + LogLevel: "debug", + } + + handler, err := s.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) + + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(1 * time.Second) + expected := "[DEBUG]" + received := "" + + // send logs + go func() { + for { + s.logger.Debug("test log") + time.Sleep(100 * time.Millisecond) + } + }() + +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for logs") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + var frame sframer.StreamFrame + err := json.Unmarshal(msg.Payload, &frame) + assert.NoError(t, err) + + received += string(frame.Data) + if strings.Contains(received, expected) { + require.Nil(p2.Close()) + break OUTER + } + } + } +} + +func TestMonitor_Monitor_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + + // start server + s, root := TestACLServer(t, nil) + defer s.Shutdown() + testutil.WaitForLeader(t, s.RPC) + + policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS}) + tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) + + policyGood := mock.AgentPolicy(acl.PolicyRead) + tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid", policyGood) + + cases := []struct { + Name string + Token string + ExpectedErr string + }{ + { + Name: "bad token", + Token: tokenBad.SecretID, + ExpectedErr: structs.ErrPermissionDenied.Error(), + }, + { + Name: "good token", + Token: tokenGood.SecretID, + ExpectedErr: "Unknown log level", + }, + { + Name: "root token", + Token: root.SecretID, + ExpectedErr: "Unknown log level", + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + req := &cstructs.MonitorRequest{ + LogLevel: "unknown", + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + Region: "global", + AuthToken: tc.Token, + }, + } + + handler, err := s.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) + + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(5 * time.Second) + OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error == nil { + continue + } + + if strings.Contains(msg.Error.Error(), tc.ExpectedErr) { + break OUTER + } else { + t.Fatalf("Bad error: %v", msg.Error) + } + } + } + }) + } +} diff --git a/nomad/config.go b/nomad/config.go index cabf981eafd..053408eca2e 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -76,7 +76,7 @@ type Config struct { LogOutput io.Writer // Logger is the logger used by the server. - Logger log.Logger + Logger log.InterceptLogger // ProtocolVersion is the protocol version to speak. This must be between // ProtocolVersionMin and ProtocolVersionMax. diff --git a/nomad/rpc.go b/nomad/rpc.go index 29dfe78c86d..41d47fe731a 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -54,11 +54,11 @@ type rpcHandler struct { } func newRpcHandler(s *Server) *rpcHandler { - logger := s.logger.Named("rpc") + logger := s.logger.NamedIntercept("rpc") return &rpcHandler{ Server: s, logger: logger, - gologger: logger.StandardLogger(&log.StandardLoggerOptions{InferLevels: true}), + gologger: logger.StandardLoggerIntercept(&log.StandardLoggerOptions{InferLevels: true}), } } diff --git a/nomad/server.go b/nomad/server.go index 158d26a552f..e6c00c0e7c8 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -91,7 +91,7 @@ const ( type Server struct { config *Config - logger log.Logger + logger log.InterceptLogger // Connection pool to other Nomad servers connPool *pool.ConnPool @@ -252,6 +252,7 @@ type endpoints struct { // Client endpoints ClientStats *ClientStats FileSystem *FileSystem + Agent *Agent ClientAllocations *ClientAllocations } @@ -290,7 +291,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error) } // Create the logger - logger := config.Logger.ResetNamed("nomad") + logger := config.Logger.ResetNamedIntercept("nomad") // Create the server s := &Server{ @@ -1044,6 +1045,9 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { // Streaming endpoints s.staticEndpoints.FileSystem = &FileSystem{srv: s, logger: s.logger.Named("client_fs")} s.staticEndpoints.FileSystem.register() + + s.staticEndpoints.Agent = &Agent{srv: s} + s.staticEndpoints.Agent.register() } // Register the static handlers @@ -1102,7 +1106,7 @@ func (s *Server) setupRaft() error { s.raftTransport = trans // Make sure we set the Logger. - logger := s.logger.StandardLogger(&log.StandardLoggerOptions{InferLevels: true}) + logger := s.logger.StandardLoggerIntercept(&log.StandardLoggerOptions{InferLevels: true}) s.config.RaftConfig.Logger = logger s.config.RaftConfig.LogOutput = nil @@ -1272,7 +1276,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( if s.config.UpgradeVersion != "" { conf.Tags[AutopilotVersionTag] = s.config.UpgradeVersion } - logger := s.logger.StandardLogger(&log.StandardLoggerOptions{InferLevels: true}) + logger := s.logger.StandardLoggerIntercept(&log.StandardLoggerOptions{InferLevels: true}) conf.MemberlistConfig.Logger = logger conf.Logger = logger conf.MemberlistConfig.LogOutput = nil diff --git a/nomad/server_setup_oss.go b/nomad/server_setup_oss.go index b73d1fa4543..7436d9eb664 100644 --- a/nomad/server_setup_oss.go +++ b/nomad/server_setup_oss.go @@ -12,7 +12,7 @@ type EnterpriseState struct{} func (s *Server) setupEnterprise(config *Config) error { // Set up the OSS version of autopilot apDelegate := &AutopilotDelegate{s} - s.autopilot = autopilot.NewAutopilot(s.logger.StandardLogger(&log.StandardLoggerOptions{InferLevels: true}), apDelegate, config.AutopilotInterval, config.ServerHealthInterval) + s.autopilot = autopilot.NewAutopilot(s.logger.StandardLoggerIntercept(&log.StandardLoggerOptions{InferLevels: true}), apDelegate, config.AutopilotInterval, config.ServerHealthInterval) return nil } diff --git a/vendor/github.com/hashicorp/go-hclog/README.md b/vendor/github.com/hashicorp/go-hclog/README.md index 1153e28535e..9b6845e9887 100644 --- a/vendor/github.com/hashicorp/go-hclog/README.md +++ b/vendor/github.com/hashicorp/go-hclog/README.md @@ -128,6 +128,21 @@ stdLogger.Printf("[DEBUG] %+v", stdLogger) ... [DEBUG] my-app: &{mu:{state:0 sema:0} prefix: flag:0 out:0xc42000a0a0 buf:[]} ``` +Alternatively, you may configure the system-wide logger: + +```go +// log the standard logger from 'import "log"' +log.SetOutput(appLogger.Writer(&hclog.StandardLoggerOptions{InferLevels: true})) +log.SetPrefix("") +log.SetFlags(0) + +log.Printf("[DEBUG] %d", 42) +``` + +```text +... [DEBUG] my-app: 42 +``` + Notice that if `appLogger` is initialized with the `INFO` log level _and_ you specify `InferLevels: true`, you will not see any output here. You must change `appLogger` to `DEBUG` to see output. See the docs for more information. diff --git a/vendor/github.com/hashicorp/go-hclog/colorize_unix.go b/vendor/github.com/hashicorp/go-hclog/colorize_unix.go new file mode 100644 index 00000000000..44aa9bf2c62 --- /dev/null +++ b/vendor/github.com/hashicorp/go-hclog/colorize_unix.go @@ -0,0 +1,27 @@ +// +build !windows + +package hclog + +import ( + "github.com/mattn/go-isatty" +) + +// setColorization will mutate the values of this logger +// to approperately configure colorization options. It provides +// a wrapper to the output stream on Windows systems. +func (l *intLogger) setColorization(opts *LoggerOptions) { + switch opts.Color { + case ColorOff: + fallthrough + case ForceColor: + return + case AutoColor: + fi := l.checkWriterIsFile() + isUnixTerm := isatty.IsTerminal(fi.Fd()) + isCygwinTerm := isatty.IsCygwinTerminal(fi.Fd()) + isTerm := isUnixTerm || isCygwinTerm + if !isTerm { + l.writer.color = ColorOff + } + } +} diff --git a/vendor/github.com/hashicorp/go-hclog/colorize_windows.go b/vendor/github.com/hashicorp/go-hclog/colorize_windows.go new file mode 100644 index 00000000000..23486b6d74f --- /dev/null +++ b/vendor/github.com/hashicorp/go-hclog/colorize_windows.go @@ -0,0 +1,33 @@ +// +build windows + +package hclog + +import ( + "os" + + colorable "github.com/mattn/go-colorable" + "github.com/mattn/go-isatty" +) + +// setColorization will mutate the values of this logger +// to approperately configure colorization options. It provides +// a wrapper to the output stream on Windows systems. +func (l *intLogger) setColorization(opts *LoggerOptions) { + switch opts.Color { + case ColorOff: + return + case ForceColor: + fi := l.checkWriterIsFile() + l.writer.w = colorable.NewColorable(fi) + case AutoColor: + fi := l.checkWriterIsFile() + isUnixTerm := isatty.IsTerminal(os.Stdout.Fd()) + isCygwinTerm := isatty.IsCygwinTerminal(os.Stdout.Fd()) + isTerm := isUnixTerm || isCygwinTerm + if !isTerm { + l.writer.color = ColorOff + return + } + l.writer.w = colorable.NewColorable(fi) + } +} diff --git a/vendor/github.com/hashicorp/go-hclog/context.go b/vendor/github.com/hashicorp/go-hclog/context.go new file mode 100644 index 00000000000..7815f501942 --- /dev/null +++ b/vendor/github.com/hashicorp/go-hclog/context.go @@ -0,0 +1,38 @@ +package hclog + +import ( + "context" +) + +// WithContext inserts a logger into the context and is retrievable +// with FromContext. The optional args can be set with the same syntax as +// Logger.With to set fields on the inserted logger. This will not modify +// the logger argument in-place. +func WithContext(ctx context.Context, logger Logger, args ...interface{}) context.Context { + // While we could call logger.With even with zero args, we have this + // check to avoid unnecessary allocations around creating a copy of a + // logger. + if len(args) > 0 { + logger = logger.With(args...) + } + + return context.WithValue(ctx, contextKey, logger) +} + +// FromContext returns a logger from the context. This will return L() +// (the default logger) if no logger is found in the context. Therefore, +// this will never return a nil value. +func FromContext(ctx context.Context) Logger { + logger, _ := ctx.Value(contextKey).(Logger) + if logger == nil { + return L() + } + + return logger +} + +// Unexported new type so that our context key never collides with another. +type contextKeyType struct{} + +// contextKey is the key used for the context to store the logger. +var contextKey = contextKeyType{} diff --git a/vendor/github.com/hashicorp/go-hclog/global.go b/vendor/github.com/hashicorp/go-hclog/global.go index 55ce4396034..22ebc57d877 100644 --- a/vendor/github.com/hashicorp/go-hclog/global.go +++ b/vendor/github.com/hashicorp/go-hclog/global.go @@ -8,27 +8,55 @@ var ( protect sync.Once def Logger - // The options used to create the Default logger. These are - // read only when the Default logger is created, so set them - // as soon as the process starts. + // DefaultOptions is used to create the Default logger. These are read + // only when the Default logger is created, so set them as soon as the + // process starts. DefaultOptions = &LoggerOptions{ Level: DefaultLevel, Output: DefaultOutput, } ) -// Return a logger that is held globally. This can be a good starting +// Default returns a globally held logger. This can be a good starting // place, and then you can use .With() and .Name() to create sub-loggers // to be used in more specific contexts. +// The value of the Default logger can be set via SetDefault() or by +// changing the options in DefaultOptions. +// +// This method is goroutine safe, returning a global from memory, but +// cause should be used if SetDefault() is called it random times +// in the program as that may result in race conditions and an unexpected +// Logger being returned. func Default() Logger { protect.Do(func() { - def = New(DefaultOptions) + // If SetDefault was used before Default() was called, we need to + // detect that here. + if def == nil { + def = New(DefaultOptions) + } }) return def } -// A short alias for Default() +// L is a short alias for Default(). func L() Logger { return Default() } + +// SetDefault changes the logger to be returned by Default()and L() +// to the one given. This allows packages to use the default logger +// and have higher level packages change it to match the execution +// environment. It returns any old default if there is one. +// +// NOTE: This is expected to be called early in the program to setup +// a default logger. As such, it does not attempt to make itself +// not racy with regard to the value of the default logger. Ergo +// if it is called in goroutines, you may experience race conditions +// with other goroutines retrieving the default logger. Basically, +// don't do that. +func SetDefault(log Logger) Logger { + old := def + def = log + return old +} diff --git a/vendor/github.com/hashicorp/go-hclog/go.mod b/vendor/github.com/hashicorp/go-hclog/go.mod new file mode 100644 index 00000000000..b6698c0836f --- /dev/null +++ b/vendor/github.com/hashicorp/go-hclog/go.mod @@ -0,0 +1,12 @@ +module github.com/hashicorp/go-hclog + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/fatih/color v1.7.0 + github.com/mattn/go-colorable v0.1.4 + github.com/mattn/go-isatty v0.0.10 + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.2.2 +) + +go 1.13 diff --git a/vendor/github.com/hashicorp/go-hclog/go.sum b/vendor/github.com/hashicorp/go-hclog/go.sum new file mode 100644 index 00000000000..9cee2196c24 --- /dev/null +++ b/vendor/github.com/hashicorp/go-hclog/go.sum @@ -0,0 +1,16 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA= +github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.10 h1:qxFzApOv4WsAL965uUPIsXzAKCZxN2p9UqdhFS4ZW10= +github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/vendor/github.com/hashicorp/go-hclog/int.go b/vendor/github.com/hashicorp/go-hclog/int.go deleted file mode 100644 index 7d17d81cbc9..00000000000 --- a/vendor/github.com/hashicorp/go-hclog/int.go +++ /dev/null @@ -1,457 +0,0 @@ -package hclog - -import ( - "bufio" - "encoding" - "encoding/json" - "fmt" - "log" - "os" - "runtime" - "sort" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" -) - -var ( - _levelToBracket = map[Level]string{ - Debug: "[DEBUG]", - Trace: "[TRACE]", - Info: "[INFO ]", - Warn: "[WARN ]", - Error: "[ERROR]", - } -) - -// Given the options (nil for defaults), create a new Logger -func New(opts *LoggerOptions) Logger { - if opts == nil { - opts = &LoggerOptions{} - } - - output := opts.Output - if output == nil { - output = os.Stderr - } - - level := opts.Level - if level == NoLevel { - level = DefaultLevel - } - - mtx := opts.Mutex - if mtx == nil { - mtx = new(sync.Mutex) - } - - ret := &intLogger{ - m: mtx, - json: opts.JSONFormat, - caller: opts.IncludeLocation, - name: opts.Name, - timeFormat: TimeFormat, - w: bufio.NewWriter(output), - level: new(int32), - } - if opts.TimeFormat != "" { - ret.timeFormat = opts.TimeFormat - } - atomic.StoreInt32(ret.level, int32(level)) - return ret -} - -// The internal logger implementation. Internal in that it is defined entirely -// by this package. -type intLogger struct { - json bool - caller bool - name string - timeFormat string - - // this is a pointer so that it's shared by any derived loggers, since - // those derived loggers share the bufio.Writer as well. - m *sync.Mutex - w *bufio.Writer - level *int32 - - implied []interface{} -} - -// Make sure that intLogger is a Logger -var _ Logger = &intLogger{} - -// The time format to use for logging. This is a version of RFC3339 that -// contains millisecond precision -const TimeFormat = "2006-01-02T15:04:05.000Z0700" - -// Log a message and a set of key/value pairs if the given level is at -// or more severe that the threshold configured in the Logger. -func (z *intLogger) Log(level Level, msg string, args ...interface{}) { - if level < Level(atomic.LoadInt32(z.level)) { - return - } - - t := time.Now() - - z.m.Lock() - defer z.m.Unlock() - - if z.json { - z.logJson(t, level, msg, args...) - } else { - z.log(t, level, msg, args...) - } - - z.w.Flush() -} - -// Cleanup a path by returning the last 2 segments of the path only. -func trimCallerPath(path string) string { - // lovely borrowed from zap - // nb. To make sure we trim the path correctly on Windows too, we - // counter-intuitively need to use '/' and *not* os.PathSeparator here, - // because the path given originates from Go stdlib, specifically - // runtime.Caller() which (as of Mar/17) returns forward slashes even on - // Windows. - // - // See https://github.com/golang/go/issues/3335 - // and https://github.com/golang/go/issues/18151 - // - // for discussion on the issue on Go side. - // - - // Find the last separator. - // - idx := strings.LastIndexByte(path, '/') - if idx == -1 { - return path - } - - // Find the penultimate separator. - idx = strings.LastIndexByte(path[:idx], '/') - if idx == -1 { - return path - } - - return path[idx+1:] -} - -// Non-JSON logging format function -func (z *intLogger) log(t time.Time, level Level, msg string, args ...interface{}) { - z.w.WriteString(t.Format(z.timeFormat)) - z.w.WriteByte(' ') - - s, ok := _levelToBracket[level] - if ok { - z.w.WriteString(s) - } else { - z.w.WriteString("[UNKN ]") - } - - if z.caller { - if _, file, line, ok := runtime.Caller(3); ok { - z.w.WriteByte(' ') - z.w.WriteString(trimCallerPath(file)) - z.w.WriteByte(':') - z.w.WriteString(strconv.Itoa(line)) - z.w.WriteByte(':') - } - } - - z.w.WriteByte(' ') - - if z.name != "" { - z.w.WriteString(z.name) - z.w.WriteString(": ") - } - - z.w.WriteString(msg) - - args = append(z.implied, args...) - - var stacktrace CapturedStacktrace - - if args != nil && len(args) > 0 { - if len(args)%2 != 0 { - cs, ok := args[len(args)-1].(CapturedStacktrace) - if ok { - args = args[:len(args)-1] - stacktrace = cs - } else { - args = append(args, "") - } - } - - z.w.WriteByte(':') - - FOR: - for i := 0; i < len(args); i = i + 2 { - var val string - - switch st := args[i+1].(type) { - case string: - val = st - case int: - val = strconv.FormatInt(int64(st), 10) - case int64: - val = strconv.FormatInt(int64(st), 10) - case int32: - val = strconv.FormatInt(int64(st), 10) - case int16: - val = strconv.FormatInt(int64(st), 10) - case int8: - val = strconv.FormatInt(int64(st), 10) - case uint: - val = strconv.FormatUint(uint64(st), 10) - case uint64: - val = strconv.FormatUint(uint64(st), 10) - case uint32: - val = strconv.FormatUint(uint64(st), 10) - case uint16: - val = strconv.FormatUint(uint64(st), 10) - case uint8: - val = strconv.FormatUint(uint64(st), 10) - case CapturedStacktrace: - stacktrace = st - continue FOR - case Format: - val = fmt.Sprintf(st[0].(string), st[1:]...) - default: - val = fmt.Sprintf("%v", st) - } - - z.w.WriteByte(' ') - z.w.WriteString(args[i].(string)) - z.w.WriteByte('=') - - if strings.ContainsAny(val, " \t\n\r") { - z.w.WriteByte('"') - z.w.WriteString(val) - z.w.WriteByte('"') - } else { - z.w.WriteString(val) - } - } - } - - z.w.WriteString("\n") - - if stacktrace != "" { - z.w.WriteString(string(stacktrace)) - } -} - -// JSON logging function -func (z *intLogger) logJson(t time.Time, level Level, msg string, args ...interface{}) { - vals := map[string]interface{}{ - "@message": msg, - "@timestamp": t.Format("2006-01-02T15:04:05.000000Z07:00"), - } - - var levelStr string - switch level { - case Error: - levelStr = "error" - case Warn: - levelStr = "warn" - case Info: - levelStr = "info" - case Debug: - levelStr = "debug" - case Trace: - levelStr = "trace" - default: - levelStr = "all" - } - - vals["@level"] = levelStr - - if z.name != "" { - vals["@module"] = z.name - } - - if z.caller { - if _, file, line, ok := runtime.Caller(3); ok { - vals["@caller"] = fmt.Sprintf("%s:%d", file, line) - } - } - - args = append(z.implied, args...) - - if args != nil && len(args) > 0 { - if len(args)%2 != 0 { - cs, ok := args[len(args)-1].(CapturedStacktrace) - if ok { - args = args[:len(args)-1] - vals["stacktrace"] = cs - } else { - args = append(args, "") - } - } - - for i := 0; i < len(args); i = i + 2 { - if _, ok := args[i].(string); !ok { - // As this is the logging function not much we can do here - // without injecting into logs... - continue - } - val := args[i+1] - switch sv := val.(type) { - case error: - // Check if val is of type error. If error type doesn't - // implement json.Marshaler or encoding.TextMarshaler - // then set val to err.Error() so that it gets marshaled - switch sv.(type) { - case json.Marshaler, encoding.TextMarshaler: - default: - val = sv.Error() - } - case Format: - val = fmt.Sprintf(sv[0].(string), sv[1:]...) - } - - vals[args[i].(string)] = val - } - } - - err := json.NewEncoder(z.w).Encode(vals) - if err != nil { - panic(err) - } -} - -// Emit the message and args at DEBUG level -func (z *intLogger) Debug(msg string, args ...interface{}) { - z.Log(Debug, msg, args...) -} - -// Emit the message and args at TRACE level -func (z *intLogger) Trace(msg string, args ...interface{}) { - z.Log(Trace, msg, args...) -} - -// Emit the message and args at INFO level -func (z *intLogger) Info(msg string, args ...interface{}) { - z.Log(Info, msg, args...) -} - -// Emit the message and args at WARN level -func (z *intLogger) Warn(msg string, args ...interface{}) { - z.Log(Warn, msg, args...) -} - -// Emit the message and args at ERROR level -func (z *intLogger) Error(msg string, args ...interface{}) { - z.Log(Error, msg, args...) -} - -// Indicate that the logger would emit TRACE level logs -func (z *intLogger) IsTrace() bool { - return Level(atomic.LoadInt32(z.level)) == Trace -} - -// Indicate that the logger would emit DEBUG level logs -func (z *intLogger) IsDebug() bool { - return Level(atomic.LoadInt32(z.level)) <= Debug -} - -// Indicate that the logger would emit INFO level logs -func (z *intLogger) IsInfo() bool { - return Level(atomic.LoadInt32(z.level)) <= Info -} - -// Indicate that the logger would emit WARN level logs -func (z *intLogger) IsWarn() bool { - return Level(atomic.LoadInt32(z.level)) <= Warn -} - -// Indicate that the logger would emit ERROR level logs -func (z *intLogger) IsError() bool { - return Level(atomic.LoadInt32(z.level)) <= Error -} - -// Return a sub-Logger for which every emitted log message will contain -// the given key/value pairs. This is used to create a context specific -// Logger. -func (z *intLogger) With(args ...interface{}) Logger { - if len(args)%2 != 0 { - panic("With() call requires paired arguments") - } - - var nz intLogger = *z - - result := make(map[string]interface{}, len(z.implied)+len(args)) - keys := make([]string, 0, len(z.implied)+len(args)) - - // Read existing args, store map and key for consistent sorting - for i := 0; i < len(z.implied); i += 2 { - key := z.implied[i].(string) - keys = append(keys, key) - result[key] = z.implied[i+1] - } - // Read new args, store map and key for consistent sorting - for i := 0; i < len(args); i += 2 { - key := args[i].(string) - _, exists := result[key] - if !exists { - keys = append(keys, key) - } - result[key] = args[i+1] - } - - // Sort keys to be consistent - sort.Strings(keys) - - nz.implied = make([]interface{}, 0, len(z.implied)+len(args)) - for _, k := range keys { - nz.implied = append(nz.implied, k) - nz.implied = append(nz.implied, result[k]) - } - - return &nz -} - -// Create a new sub-Logger that a name decending from the current name. -// This is used to create a subsystem specific Logger. -func (z *intLogger) Named(name string) Logger { - var nz intLogger = *z - - if nz.name != "" { - nz.name = nz.name + "." + name - } else { - nz.name = name - } - - return &nz -} - -// Create a new sub-Logger with an explicit name. This ignores the current -// name. This is used to create a standalone logger that doesn't fall -// within the normal hierarchy. -func (z *intLogger) ResetNamed(name string) Logger { - var nz intLogger = *z - - nz.name = name - - return &nz -} - -// Update the logging level on-the-fly. This will affect all subloggers as -// well. -func (z *intLogger) SetLevel(level Level) { - atomic.StoreInt32(z.level, int32(level)) -} - -// Create a *log.Logger that will send it's data through this Logger. This -// allows packages that expect to be using the standard library log to actually -// use this logger. -func (z *intLogger) StandardLogger(opts *StandardLoggerOptions) *log.Logger { - if opts == nil { - opts = &StandardLoggerOptions{} - } - - return log.New(&stdlogAdapter{z, opts.InferLevels}, "", 0) -} diff --git a/vendor/github.com/hashicorp/go-hclog/interceptlogger.go b/vendor/github.com/hashicorp/go-hclog/interceptlogger.go new file mode 100644 index 00000000000..68f31e42d88 --- /dev/null +++ b/vendor/github.com/hashicorp/go-hclog/interceptlogger.go @@ -0,0 +1,216 @@ +package hclog + +import ( + "io" + "log" + "sync" + "sync/atomic" +) + +var _ Logger = &interceptLogger{} + +type interceptLogger struct { + Logger + + sync.Mutex + sinkCount *int32 + Sinks map[SinkAdapter]struct{} +} + +func NewInterceptLogger(opts *LoggerOptions) InterceptLogger { + intercept := &interceptLogger{ + Logger: New(opts), + sinkCount: new(int32), + Sinks: make(map[SinkAdapter]struct{}), + } + + atomic.StoreInt32(intercept.sinkCount, 0) + + return intercept +} + +// Emit the message and args at TRACE level to log and sinks +func (i *interceptLogger) Trace(msg string, args ...interface{}) { + i.Logger.Trace(msg, args...) + if atomic.LoadInt32(i.sinkCount) == 0 { + return + } + + i.Lock() + defer i.Unlock() + for s := range i.Sinks { + s.Accept(i.Name(), Trace, msg, i.retrieveImplied(args...)...) + } +} + +// Emit the message and args at DEBUG level to log and sinks +func (i *interceptLogger) Debug(msg string, args ...interface{}) { + i.Logger.Debug(msg, args...) + if atomic.LoadInt32(i.sinkCount) == 0 { + return + } + + i.Lock() + defer i.Unlock() + for s := range i.Sinks { + s.Accept(i.Name(), Debug, msg, i.retrieveImplied(args...)...) + } +} + +// Emit the message and args at INFO level to log and sinks +func (i *interceptLogger) Info(msg string, args ...interface{}) { + i.Logger.Info(msg, args...) + if atomic.LoadInt32(i.sinkCount) == 0 { + return + } + + i.Lock() + defer i.Unlock() + for s := range i.Sinks { + s.Accept(i.Name(), Info, msg, i.retrieveImplied(args...)...) + } +} + +// Emit the message and args at WARN level to log and sinks +func (i *interceptLogger) Warn(msg string, args ...interface{}) { + i.Logger.Warn(msg, args...) + if atomic.LoadInt32(i.sinkCount) == 0 { + return + } + + i.Lock() + defer i.Unlock() + for s := range i.Sinks { + s.Accept(i.Name(), Warn, msg, i.retrieveImplied(args...)...) + } +} + +// Emit the message and args at ERROR level to log and sinks +func (i *interceptLogger) Error(msg string, args ...interface{}) { + i.Logger.Error(msg, args...) + if atomic.LoadInt32(i.sinkCount) == 0 { + return + } + + i.Lock() + defer i.Unlock() + for s := range i.Sinks { + s.Accept(i.Name(), Error, msg, i.retrieveImplied(args...)...) + } +} + +func (i *interceptLogger) retrieveImplied(args ...interface{}) []interface{} { + top := i.Logger.ImpliedArgs() + + cp := make([]interface{}, len(top)+len(args)) + copy(cp, top) + copy(cp[len(top):], args) + + return cp +} + +// Create a new sub-Logger that a name decending from the current name. +// This is used to create a subsystem specific Logger. +// Registered sinks will subscribe to these messages as well. +func (i *interceptLogger) Named(name string) Logger { + var sub interceptLogger + + sub = *i + + sub.Logger = i.Logger.Named(name) + + return &sub +} + +// Create a new sub-Logger with an explicit name. This ignores the current +// name. This is used to create a standalone logger that doesn't fall +// within the normal hierarchy. Registered sinks will subscribe +// to these messages as well. +func (i *interceptLogger) ResetNamed(name string) Logger { + var sub interceptLogger + + sub = *i + + sub.Logger = i.Logger.ResetNamed(name) + + return &sub +} + +// Create a new sub-Logger that a name decending from the current name. +// This is used to create a subsystem specific Logger. +// Registered sinks will subscribe to these messages as well. +func (i *interceptLogger) NamedIntercept(name string) InterceptLogger { + var sub interceptLogger + + sub = *i + + sub.Logger = i.Logger.Named(name) + + return &sub +} + +// Create a new sub-Logger with an explicit name. This ignores the current +// name. This is used to create a standalone logger that doesn't fall +// within the normal hierarchy. Registered sinks will subscribe +// to these messages as well. +func (i *interceptLogger) ResetNamedIntercept(name string) InterceptLogger { + var sub interceptLogger + + sub = *i + + sub.Logger = i.Logger.ResetNamed(name) + + return &sub +} + +// Return a sub-Logger for which every emitted log message will contain +// the given key/value pairs. This is used to create a context specific +// Logger. +func (i *interceptLogger) With(args ...interface{}) Logger { + var sub interceptLogger + + sub = *i + + sub.Logger = i.Logger.With(args...) + + return &sub +} + +// RegisterSink attaches a SinkAdapter to interceptLoggers sinks. +func (i *interceptLogger) RegisterSink(sink SinkAdapter) { + i.Lock() + defer i.Unlock() + + i.Sinks[sink] = struct{}{} + + atomic.AddInt32(i.sinkCount, 1) +} + +// DeregisterSink removes a SinkAdapter from interceptLoggers sinks. +func (i *interceptLogger) DeregisterSink(sink SinkAdapter) { + i.Lock() + defer i.Unlock() + + delete(i.Sinks, sink) + + atomic.AddInt32(i.sinkCount, -1) +} + +// Create a *log.Logger that will send it's data through this Logger. This +// allows packages that expect to be using the standard library to log to +// actually use this logger, which will also send to any registered sinks. +func (l *interceptLogger) StandardLoggerIntercept(opts *StandardLoggerOptions) *log.Logger { + if opts == nil { + opts = &StandardLoggerOptions{} + } + + return log.New(l.StandardWriterIntercept(opts), "", 0) +} + +func (l *interceptLogger) StandardWriterIntercept(opts *StandardLoggerOptions) io.Writer { + return &stdlogAdapter{ + log: l, + inferLevels: opts.InferLevels, + forceLevel: opts.ForceLevel, + } +} diff --git a/vendor/github.com/hashicorp/go-hclog/intlogger.go b/vendor/github.com/hashicorp/go-hclog/intlogger.go new file mode 100644 index 00000000000..5882b87018d --- /dev/null +++ b/vendor/github.com/hashicorp/go-hclog/intlogger.go @@ -0,0 +1,588 @@ +package hclog + +import ( + "bytes" + "encoding" + "encoding/json" + "fmt" + "io" + "log" + "os" + "reflect" + "regexp" + "runtime" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/fatih/color" +) + +// TimeFormat to use for logging. This is a version of RFC3339 that contains +// contains millisecond precision +const TimeFormat = "2006-01-02T15:04:05.000Z0700" + +// errJsonUnsupportedTypeMsg is included in log json entries, if an arg cannot be serialized to json +const errJsonUnsupportedTypeMsg = "logging contained values that don't serialize to json" + +var ( + _levelToBracket = map[Level]string{ + Debug: "[DEBUG]", + Trace: "[TRACE]", + Info: "[INFO] ", + Warn: "[WARN] ", + Error: "[ERROR]", + } + + _levelToColor = map[Level]*color.Color{ + Debug: color.New(color.FgHiWhite), + Trace: color.New(color.FgHiGreen), + Info: color.New(color.FgHiBlue), + Warn: color.New(color.FgHiYellow), + Error: color.New(color.FgHiRed), + } +) + +// Make sure that intLogger is a Logger +var _ Logger = &intLogger{} + +// intLogger is an internal logger implementation. Internal in that it is +// defined entirely by this package. +type intLogger struct { + json bool + caller bool + name string + timeFormat string + + // This is a pointer so that it's shared by any derived loggers, since + // those derived loggers share the bufio.Writer as well. + mutex *sync.Mutex + writer *writer + level *int32 + + implied []interface{} +} + +// New returns a configured logger. +func New(opts *LoggerOptions) Logger { + return newLogger(opts) +} + +// NewSinkAdapter returns a SinkAdapter with configured settings +// defined by LoggerOptions +func NewSinkAdapter(opts *LoggerOptions) SinkAdapter { + return newLogger(opts) +} + +func newLogger(opts *LoggerOptions) *intLogger { + if opts == nil { + opts = &LoggerOptions{} + } + + output := opts.Output + if output == nil { + output = DefaultOutput + } + + level := opts.Level + if level == NoLevel { + level = DefaultLevel + } + + mutex := opts.Mutex + if mutex == nil { + mutex = new(sync.Mutex) + } + + l := &intLogger{ + json: opts.JSONFormat, + caller: opts.IncludeLocation, + name: opts.Name, + timeFormat: TimeFormat, + mutex: mutex, + writer: newWriter(output, opts.Color), + level: new(int32), + } + + l.setColorization(opts) + + if opts.TimeFormat != "" { + l.timeFormat = opts.TimeFormat + } + + atomic.StoreInt32(l.level, int32(level)) + + return l +} + +// Log a message and a set of key/value pairs if the given level is at +// or more severe that the threshold configured in the Logger. +func (l *intLogger) Log(name string, level Level, msg string, args ...interface{}) { + if level < Level(atomic.LoadInt32(l.level)) { + return + } + + t := time.Now() + + l.mutex.Lock() + defer l.mutex.Unlock() + + if l.json { + l.logJSON(t, name, level, msg, args...) + } else { + l.log(t, name, level, msg, args...) + } + + l.writer.Flush(level) +} + +// Cleanup a path by returning the last 2 segments of the path only. +func trimCallerPath(path string) string { + // lovely borrowed from zap + // nb. To make sure we trim the path correctly on Windows too, we + // counter-intuitively need to use '/' and *not* os.PathSeparator here, + // because the path given originates from Go stdlib, specifically + // runtime.Caller() which (as of Mar/17) returns forward slashes even on + // Windows. + // + // See https://github.com/golang/go/issues/3335 + // and https://github.com/golang/go/issues/18151 + // + // for discussion on the issue on Go side. + + // Find the last separator. + idx := strings.LastIndexByte(path, '/') + if idx == -1 { + return path + } + + // Find the penultimate separator. + idx = strings.LastIndexByte(path[:idx], '/') + if idx == -1 { + return path + } + + return path[idx+1:] +} + +var logImplFile = regexp.MustCompile(`github.com/hashicorp/go-hclog/.+logger.go$`) + +// Non-JSON logging format function +func (l *intLogger) log(t time.Time, name string, level Level, msg string, args ...interface{}) { + l.writer.WriteString(t.Format(l.timeFormat)) + l.writer.WriteByte(' ') + + s, ok := _levelToBracket[level] + if ok { + l.writer.WriteString(s) + } else { + l.writer.WriteString("[?????]") + } + + offset := 3 + if l.caller { + // Check if the caller is inside our package and inside + // a logger implementation file + if _, file, _, ok := runtime.Caller(3); ok { + match := logImplFile.MatchString(file) + if match { + offset = 4 + } + } + + if _, file, line, ok := runtime.Caller(offset); ok { + l.writer.WriteByte(' ') + l.writer.WriteString(trimCallerPath(file)) + l.writer.WriteByte(':') + l.writer.WriteString(strconv.Itoa(line)) + l.writer.WriteByte(':') + } + } + + l.writer.WriteByte(' ') + + if name != "" { + l.writer.WriteString(name) + l.writer.WriteString(": ") + } + + l.writer.WriteString(msg) + + args = append(l.implied, args...) + + var stacktrace CapturedStacktrace + + if args != nil && len(args) > 0 { + if len(args)%2 != 0 { + cs, ok := args[len(args)-1].(CapturedStacktrace) + if ok { + args = args[:len(args)-1] + stacktrace = cs + } else { + args = append(args, "") + } + } + + l.writer.WriteByte(':') + + FOR: + for i := 0; i < len(args); i = i + 2 { + var ( + val string + raw bool + ) + + switch st := args[i+1].(type) { + case string: + val = st + case int: + val = strconv.FormatInt(int64(st), 10) + case int64: + val = strconv.FormatInt(int64(st), 10) + case int32: + val = strconv.FormatInt(int64(st), 10) + case int16: + val = strconv.FormatInt(int64(st), 10) + case int8: + val = strconv.FormatInt(int64(st), 10) + case uint: + val = strconv.FormatUint(uint64(st), 10) + case uint64: + val = strconv.FormatUint(uint64(st), 10) + case uint32: + val = strconv.FormatUint(uint64(st), 10) + case uint16: + val = strconv.FormatUint(uint64(st), 10) + case uint8: + val = strconv.FormatUint(uint64(st), 10) + case CapturedStacktrace: + stacktrace = st + continue FOR + case Format: + val = fmt.Sprintf(st[0].(string), st[1:]...) + default: + v := reflect.ValueOf(st) + if v.Kind() == reflect.Slice { + val = l.renderSlice(v) + raw = true + } else { + val = fmt.Sprintf("%v", st) + } + } + + l.writer.WriteByte(' ') + l.writer.WriteString(args[i].(string)) + l.writer.WriteByte('=') + + if !raw && strings.ContainsAny(val, " \t\n\r") { + l.writer.WriteByte('"') + l.writer.WriteString(val) + l.writer.WriteByte('"') + } else { + l.writer.WriteString(val) + } + } + } + + l.writer.WriteString("\n") + + if stacktrace != "" { + l.writer.WriteString(string(stacktrace)) + } +} + +func (l *intLogger) renderSlice(v reflect.Value) string { + var buf bytes.Buffer + + buf.WriteRune('[') + + for i := 0; i < v.Len(); i++ { + if i > 0 { + buf.WriteString(", ") + } + + sv := v.Index(i) + + var val string + + switch sv.Kind() { + case reflect.String: + val = sv.String() + case reflect.Int, reflect.Int16, reflect.Int32, reflect.Int64: + val = strconv.FormatInt(sv.Int(), 10) + case reflect.Uint, reflect.Uint16, reflect.Uint32, reflect.Uint64: + val = strconv.FormatUint(sv.Uint(), 10) + default: + val = fmt.Sprintf("%v", sv.Interface()) + } + + if strings.ContainsAny(val, " \t\n\r") { + buf.WriteByte('"') + buf.WriteString(val) + buf.WriteByte('"') + } else { + buf.WriteString(val) + } + } + + buf.WriteRune(']') + + return buf.String() +} + +// JSON logging function +func (l *intLogger) logJSON(t time.Time, name string, level Level, msg string, args ...interface{}) { + vals := l.jsonMapEntry(t, name, level, msg) + args = append(l.implied, args...) + + if args != nil && len(args) > 0 { + if len(args)%2 != 0 { + cs, ok := args[len(args)-1].(CapturedStacktrace) + if ok { + args = args[:len(args)-1] + vals["stacktrace"] = cs + } else { + args = append(args, "") + } + } + + for i := 0; i < len(args); i = i + 2 { + if _, ok := args[i].(string); !ok { + // As this is the logging function not much we can do here + // without injecting into logs... + continue + } + val := args[i+1] + switch sv := val.(type) { + case error: + // Check if val is of type error. If error type doesn't + // implement json.Marshaler or encoding.TextMarshaler + // then set val to err.Error() so that it gets marshaled + switch sv.(type) { + case json.Marshaler, encoding.TextMarshaler: + default: + val = sv.Error() + } + case Format: + val = fmt.Sprintf(sv[0].(string), sv[1:]...) + } + + vals[args[i].(string)] = val + } + } + + err := json.NewEncoder(l.writer).Encode(vals) + if err != nil { + if _, ok := err.(*json.UnsupportedTypeError); ok { + plainVal := l.jsonMapEntry(t, name, level, msg) + plainVal["@warn"] = errJsonUnsupportedTypeMsg + + json.NewEncoder(l.writer).Encode(plainVal) + } + } +} + +func (l intLogger) jsonMapEntry(t time.Time, name string, level Level, msg string) map[string]interface{} { + vals := map[string]interface{}{ + "@message": msg, + "@timestamp": t.Format("2006-01-02T15:04:05.000000Z07:00"), + } + + var levelStr string + switch level { + case Error: + levelStr = "error" + case Warn: + levelStr = "warn" + case Info: + levelStr = "info" + case Debug: + levelStr = "debug" + case Trace: + levelStr = "trace" + default: + levelStr = "all" + } + + vals["@level"] = levelStr + + if name != "" { + vals["@module"] = name + } + + if l.caller { + if _, file, line, ok := runtime.Caller(4); ok { + vals["@caller"] = fmt.Sprintf("%s:%d", file, line) + } + } + return vals +} + +// Emit the message and args at DEBUG level +func (l *intLogger) Debug(msg string, args ...interface{}) { + l.Log(l.Name(), Debug, msg, args...) +} + +// Emit the message and args at TRACE level +func (l *intLogger) Trace(msg string, args ...interface{}) { + l.Log(l.Name(), Trace, msg, args...) +} + +// Emit the message and args at INFO level +func (l *intLogger) Info(msg string, args ...interface{}) { + l.Log(l.Name(), Info, msg, args...) +} + +// Emit the message and args at WARN level +func (l *intLogger) Warn(msg string, args ...interface{}) { + l.Log(l.Name(), Warn, msg, args...) +} + +// Emit the message and args at ERROR level +func (l *intLogger) Error(msg string, args ...interface{}) { + l.Log(l.Name(), Error, msg, args...) +} + +// Indicate that the logger would emit TRACE level logs +func (l *intLogger) IsTrace() bool { + return Level(atomic.LoadInt32(l.level)) == Trace +} + +// Indicate that the logger would emit DEBUG level logs +func (l *intLogger) IsDebug() bool { + return Level(atomic.LoadInt32(l.level)) <= Debug +} + +// Indicate that the logger would emit INFO level logs +func (l *intLogger) IsInfo() bool { + return Level(atomic.LoadInt32(l.level)) <= Info +} + +// Indicate that the logger would emit WARN level logs +func (l *intLogger) IsWarn() bool { + return Level(atomic.LoadInt32(l.level)) <= Warn +} + +// Indicate that the logger would emit ERROR level logs +func (l *intLogger) IsError() bool { + return Level(atomic.LoadInt32(l.level)) <= Error +} + +// Return a sub-Logger for which every emitted log message will contain +// the given key/value pairs. This is used to create a context specific +// Logger. +func (l *intLogger) With(args ...interface{}) Logger { + if len(args)%2 != 0 { + panic("With() call requires paired arguments") + } + + sl := *l + + result := make(map[string]interface{}, len(l.implied)+len(args)) + keys := make([]string, 0, len(l.implied)+len(args)) + + // Read existing args, store map and key for consistent sorting + for i := 0; i < len(l.implied); i += 2 { + key := l.implied[i].(string) + keys = append(keys, key) + result[key] = l.implied[i+1] + } + // Read new args, store map and key for consistent sorting + for i := 0; i < len(args); i += 2 { + key := args[i].(string) + _, exists := result[key] + if !exists { + keys = append(keys, key) + } + result[key] = args[i+1] + } + + // Sort keys to be consistent + sort.Strings(keys) + + sl.implied = make([]interface{}, 0, len(l.implied)+len(args)) + for _, k := range keys { + sl.implied = append(sl.implied, k) + sl.implied = append(sl.implied, result[k]) + } + + return &sl +} + +// Create a new sub-Logger that a name decending from the current name. +// This is used to create a subsystem specific Logger. +func (l *intLogger) Named(name string) Logger { + sl := *l + + if sl.name != "" { + sl.name = sl.name + "." + name + } else { + sl.name = name + } + + return &sl +} + +// Create a new sub-Logger with an explicit name. This ignores the current +// name. This is used to create a standalone logger that doesn't fall +// within the normal hierarchy. +func (l *intLogger) ResetNamed(name string) Logger { + sl := *l + + sl.name = name + + return &sl +} + +// Update the logging level on-the-fly. This will affect all subloggers as +// well. +func (l *intLogger) SetLevel(level Level) { + atomic.StoreInt32(l.level, int32(level)) +} + +// Create a *log.Logger that will send it's data through this Logger. This +// allows packages that expect to be using the standard library log to actually +// use this logger. +func (l *intLogger) StandardLogger(opts *StandardLoggerOptions) *log.Logger { + if opts == nil { + opts = &StandardLoggerOptions{} + } + + return log.New(l.StandardWriter(opts), "", 0) +} + +func (l *intLogger) StandardWriter(opts *StandardLoggerOptions) io.Writer { + return &stdlogAdapter{ + log: l, + inferLevels: opts.InferLevels, + forceLevel: opts.ForceLevel, + } +} + +// checks if the underlying io.Writer is a file, and +// panics if not. For use by colorization. +func (l *intLogger) checkWriterIsFile() *os.File { + fi, ok := l.writer.w.(*os.File) + if !ok { + panic("Cannot enable coloring of non-file Writers") + } + return fi +} + +// Accept implements the SinkAdapter interface +func (i *intLogger) Accept(name string, level Level, msg string, args ...interface{}) { + i.Log(name, level, msg, args...) +} + +// ImpliedArgs returns the loggers implied args +func (i *intLogger) ImpliedArgs() []interface{} { + return i.implied +} + +// Name returns the loggers name +func (i *intLogger) Name() string { + return i.name +} diff --git a/vendor/github.com/hashicorp/go-hclog/log.go b/vendor/github.com/hashicorp/go-hclog/logger.go similarity index 50% rename from vendor/github.com/hashicorp/go-hclog/log.go rename to vendor/github.com/hashicorp/go-hclog/logger.go index 894e8461bc8..48d608714f0 100644 --- a/vendor/github.com/hashicorp/go-hclog/log.go +++ b/vendor/github.com/hashicorp/go-hclog/logger.go @@ -9,38 +9,42 @@ import ( ) var ( - DefaultOutput = os.Stderr - DefaultLevel = Info + //DefaultOutput is used as the default log output. + DefaultOutput io.Writer = os.Stderr + + // DefaultLevel is used as the default log level. + DefaultLevel = Info ) +// Level represents a log level. type Level int32 const ( - // This is a special level used to indicate that no level has been + // NoLevel is a special level used to indicate that no level has been // set and allow for a default to be used. NoLevel Level = 0 - // The most verbose level. Intended to be used for the tracing of actions - // in code, such as function enters/exits, etc. + // Trace is the most verbose level. Intended to be used for the tracing + // of actions in code, such as function enters/exits, etc. Trace Level = 1 - // For programmer lowlevel analysis. + // Debug information for programmer lowlevel analysis. Debug Level = 2 - // For information about steady state operations. + // Info information about steady state operations. Info Level = 3 - // For information about rare but handled events. + // Warn information about rare but handled events. Warn Level = 4 - // For information about unrecoverable events. + // Error information about unrecoverable events. Error Level = 5 ) -// When processing a value of this type, the logger automatically treats the first -// argument as a Printf formatting string and passes the rest as the values to be -// formatted. For example: L.Info(Fmt{"%d beans/day", beans}). This is a simple -// convience type for when formatting is required. +// Format is a simple convience type for when formatting is required. When +// processing a value of this type, the logger automatically treats the first +// argument as a Printf formatting string and passes the rest as the values +// to be formatted. For example: L.Info(Fmt{"%d beans/day", beans}). type Format []interface{} // Fmt returns a Format type. This is a convience function for creating a Format @@ -49,11 +53,26 @@ func Fmt(str string, args ...interface{}) Format { return append(Format{str}, args...) } +// ColorOption expresses how the output should be colored, if at all. +type ColorOption uint8 + +const ( + // ColorOff is the default coloration, and does not + // inject color codes into the io.Writer. + ColorOff ColorOption = iota + // AutoColor checks if the io.Writer is a tty, + // and if so enables coloring. + AutoColor + // ForceColor will enable coloring, regardless of whether + // the io.Writer is a tty or not. + ForceColor +) + // LevelFromString returns a Level type for the named log level, or "NoLevel" if // the level string is invalid. This facilitates setting the log level via // config or environment variable by name in a predictable way. func LevelFromString(levelStr string) Level { - // We don't care about case. Accept "INFO" or "info" + // We don't care about case. Accept both "INFO" and "info". levelStr = strings.ToLower(strings.TrimSpace(levelStr)) switch levelStr { case "trace": @@ -71,7 +90,7 @@ func LevelFromString(levelStr string) Level { } } -// The main Logger interface. All code should code against this interface only. +// Logger describes the interface that must be implemeted by all loggers. type Logger interface { // Args are alternating key, val pairs // keys must be strings @@ -107,9 +126,15 @@ type Logger interface { // Indicate if ERROR logs would be emitted. This and the other Is* guards IsError() bool + // ImpliedArgs returns With key/value pairs + ImpliedArgs() []interface{} + // Creates a sublogger that will always have the given key/value pairs With(args ...interface{}) Logger + // Returns the Name of the logger + Name() string + // Create a logger that will prepend the name string on the front of all messages. // If the logger already has a name, the new value will be appended to the current // name. That way, a major subsystem can use this to decorate all it's own logs @@ -127,16 +152,27 @@ type Logger interface { // Return a value that conforms to the stdlib log.Logger interface StandardLogger(opts *StandardLoggerOptions) *log.Logger + + // Return a value that conforms to io.Writer, which can be passed into log.SetOutput() + StandardWriter(opts *StandardLoggerOptions) io.Writer } +// StandardLoggerOptions can be used to configure a new standard logger. type StandardLoggerOptions struct { // Indicate that some minimal parsing should be done on strings to try // and detect their level and re-emit them. // This supports the strings like [ERROR], [ERR] [TRACE], [WARN], [INFO], // [DEBUG] and strip it off before reapplying it. InferLevels bool + + // ForceLevel is used to force all output from the standard logger to be at + // the specified level. Similar to InferLevels, this will strip any level + // prefix contained in the logged string before applying the forced level. + // If set, this override InferLevels. + ForceLevel Level } +// LoggerOptions can be used to configure a new logger. type LoggerOptions struct { // Name of the subsystem to prefix logs with Name string @@ -144,7 +180,7 @@ type LoggerOptions struct { // The threshold for the logger. Anything less severe is supressed Level Level - // Where to write the logs to. Defaults to os.Stdout if nil + // Where to write the logs to. Defaults to os.Stderr if nil Output io.Writer // An optional mutex pointer in case Output is shared @@ -158,4 +194,47 @@ type LoggerOptions struct { // The time format to use instead of the default TimeFormat string + + // Color the output. On Windows, colored logs are only avaiable for io.Writers that + // are concretely instances of *os.File. + Color ColorOption +} + +// InterceptLogger describes the interface for using a logger +// that can register different output sinks. +// This is useful for sending lower level log messages +// to a different output while keeping the root logger +// at a higher one. +type InterceptLogger interface { + // Logger is the root logger for an InterceptLogger + Logger + + // RegisterSink adds a SinkAdapter to the InterceptLogger + RegisterSink(sink SinkAdapter) + + // DeregisterSink removes a SinkAdapter from the InterceptLogger + DeregisterSink(sink SinkAdapter) + + // Create a interceptlogger that will prepend the name string on the front of all messages. + // If the logger already has a name, the new value will be appended to the current + // name. That way, a major subsystem can use this to decorate all it's own logs + // without losing context. + NamedIntercept(name string) InterceptLogger + + // Create a interceptlogger that will prepend the name string on the front of all messages. + // This sets the name of the logger to the value directly, unlike Named which honor + // the current name as well. + ResetNamedIntercept(name string) InterceptLogger + + // Return a value that conforms to the stdlib log.Logger interface + StandardLoggerIntercept(opts *StandardLoggerOptions) *log.Logger + + // Return a value that conforms to io.Writer, which can be passed into log.SetOutput() + StandardWriterIntercept(opts *StandardLoggerOptions) io.Writer +} + +// SinkAdapter describes the interface that must be implemented +// in order to Register a new sink to an InterceptLogger +type SinkAdapter interface { + Accept(name string, level Level, msg string, args ...interface{}) } diff --git a/vendor/github.com/hashicorp/go-hclog/nulllogger.go b/vendor/github.com/hashicorp/go-hclog/nulllogger.go index 0942361a52d..4abdd5583e8 100644 --- a/vendor/github.com/hashicorp/go-hclog/nulllogger.go +++ b/vendor/github.com/hashicorp/go-hclog/nulllogger.go @@ -1,6 +1,7 @@ package hclog import ( + "io" "io/ioutil" "log" ) @@ -34,8 +35,12 @@ func (l *nullLogger) IsWarn() bool { return false } func (l *nullLogger) IsError() bool { return false } +func (l *nullLogger) ImpliedArgs() []interface{} { return []interface{}{} } + func (l *nullLogger) With(args ...interface{}) Logger { return l } +func (l *nullLogger) Name() string { return "" } + func (l *nullLogger) Named(name string) Logger { return l } func (l *nullLogger) ResetNamed(name string) Logger { return l } @@ -43,5 +48,9 @@ func (l *nullLogger) ResetNamed(name string) Logger { return l } func (l *nullLogger) SetLevel(level Level) {} func (l *nullLogger) StandardLogger(opts *StandardLoggerOptions) *log.Logger { - return log.New(ioutil.Discard, "", log.LstdFlags) + return log.New(l.StandardWriter(opts), "", log.LstdFlags) +} + +func (l *nullLogger) StandardWriter(opts *StandardLoggerOptions) io.Writer { + return ioutil.Discard } diff --git a/vendor/github.com/hashicorp/go-hclog/stacktrace.go b/vendor/github.com/hashicorp/go-hclog/stacktrace.go index 8af1a3be4c0..9b27bd3d3d9 100644 --- a/vendor/github.com/hashicorp/go-hclog/stacktrace.go +++ b/vendor/github.com/hashicorp/go-hclog/stacktrace.go @@ -40,12 +40,13 @@ var ( } ) -// A stacktrace gathered by a previous call to log.Stacktrace. If passed -// to a logging function, the stacktrace will be appended. +// CapturedStacktrace represents a stacktrace captured by a previous call +// to log.Stacktrace. If passed to a logging function, the stacktrace +// will be appended. type CapturedStacktrace string -// Gather a stacktrace of the current goroutine and return it to be passed -// to a logging function. +// Stacktrace captures a stacktrace of the current goroutine and returns +// it to be passed to a logging function. func Stacktrace() CapturedStacktrace { return CapturedStacktrace(takeStacktrace()) } diff --git a/vendor/github.com/hashicorp/go-hclog/stdlog.go b/vendor/github.com/hashicorp/go-hclog/stdlog.go index 2bb927fc90c..2cf0456a05a 100644 --- a/vendor/github.com/hashicorp/go-hclog/stdlog.go +++ b/vendor/github.com/hashicorp/go-hclog/stdlog.go @@ -9,39 +9,51 @@ import ( // and back into our Logger. This is basically the only way to // build upon *log.Logger. type stdlogAdapter struct { - hl Logger + log Logger inferLevels bool + forceLevel Level } // Take the data, infer the levels if configured, and send it through -// a regular Logger +// a regular Logger. func (s *stdlogAdapter) Write(data []byte) (int, error) { str := string(bytes.TrimRight(data, " \t\n")) - if s.inferLevels { + if s.forceLevel != NoLevel { + // Use pickLevel to strip log levels included in the line since we are + // forcing the level + _, str := s.pickLevel(str) + + // Log at the forced level + s.dispatch(str, s.forceLevel) + } else if s.inferLevels { level, str := s.pickLevel(str) - switch level { - case Trace: - s.hl.Trace(str) - case Debug: - s.hl.Debug(str) - case Info: - s.hl.Info(str) - case Warn: - s.hl.Warn(str) - case Error: - s.hl.Error(str) - default: - s.hl.Info(str) - } + s.dispatch(str, level) } else { - s.hl.Info(str) + s.log.Info(str) } return len(data), nil } -// Detect, based on conventions, what log level this is +func (s *stdlogAdapter) dispatch(str string, level Level) { + switch level { + case Trace: + s.log.Trace(str) + case Debug: + s.log.Debug(str) + case Info: + s.log.Info(str) + case Warn: + s.log.Warn(str) + case Error: + s.log.Error(str) + default: + s.log.Info(str) + } +} + +// Detect, based on conventions, what log level this is. func (s *stdlogAdapter) pickLevel(str string) (Level, string) { switch { case strings.HasPrefix(str, "[DEBUG]"): diff --git a/vendor/github.com/hashicorp/go-hclog/writer.go b/vendor/github.com/hashicorp/go-hclog/writer.go new file mode 100644 index 00000000000..421a1f06c0b --- /dev/null +++ b/vendor/github.com/hashicorp/go-hclog/writer.go @@ -0,0 +1,82 @@ +package hclog + +import ( + "bytes" + "io" +) + +type writer struct { + b bytes.Buffer + w io.Writer + color ColorOption +} + +func newWriter(w io.Writer, color ColorOption) *writer { + return &writer{w: w, color: color} +} + +func (w *writer) Flush(level Level) (err error) { + var unwritten = w.b.Bytes() + + if w.color != ColorOff { + color := _levelToColor[level] + unwritten = []byte(color.Sprintf("%s", unwritten)) + } + + if lw, ok := w.w.(LevelWriter); ok { + _, err = lw.LevelWrite(level, unwritten) + } else { + _, err = w.w.Write(unwritten) + } + w.b.Reset() + return err +} + +func (w *writer) Write(p []byte) (int, error) { + return w.b.Write(p) +} + +func (w *writer) WriteByte(c byte) error { + return w.b.WriteByte(c) +} + +func (w *writer) WriteString(s string) (int, error) { + return w.b.WriteString(s) +} + +// LevelWriter is the interface that wraps the LevelWrite method. +type LevelWriter interface { + LevelWrite(level Level, p []byte) (n int, err error) +} + +// LeveledWriter writes all log messages to the standard writer, +// except for log levels that are defined in the overrides map. +type LeveledWriter struct { + standard io.Writer + overrides map[Level]io.Writer +} + +// NewLeveledWriter returns an initialized LeveledWriter. +// +// standard will be used as the default writer for all log levels, +// except for log levels that are defined in the overrides map. +func NewLeveledWriter(standard io.Writer, overrides map[Level]io.Writer) *LeveledWriter { + return &LeveledWriter{ + standard: standard, + overrides: overrides, + } +} + +// Write implements io.Writer. +func (lw *LeveledWriter) Write(p []byte) (int, error) { + return lw.standard.Write(p) +} + +// LevelWrite implements LevelWriter. +func (lw *LeveledWriter) LevelWrite(level Level, p []byte) (int, error) { + w, ok := lw.overrides[level] + if !ok { + w = lw.standard + } + return w.Write(p) +} diff --git a/vendor/vendor.json b/vendor/vendor.json index fe000cbfc95..4f46cd6566e 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -209,7 +209,7 @@ {"path":"github.com/hashicorp/go-envparse","checksumSHA1":"FKmqR4DC3nCXtnT9pe02z5CLNWo=","revision":"310ca1881b22af3522e3a8638c0b426629886196","revisionTime":"2018-01-19T21:58:41Z"}, {"path":"github.com/hashicorp/go-getter","checksumSHA1":"d4brua17AGQqMNtngK4xKOUwboY=","revision":"f5101da0117392c6e7960c934f05a2fd689a5b5f","revisionTime":"2019-08-22T19:45:07Z"}, {"path":"github.com/hashicorp/go-getter/helper/url","checksumSHA1":"9J+kDr29yDrwsdu2ULzewmqGjpA=","revision":"b345bfcec894fb7ff3fdf9b21baf2f56ea423d98","revisionTime":"2018-04-10T17:49:45Z"}, - {"path":"github.com/hashicorp/go-hclog","checksumSHA1":"dOP7kCX3dACHc9mU79826N411QA=","revision":"ff2cf002a8dd750586d91dddd4470c341f981fe1","revisionTime":"2018-07-09T16:53:50Z"}, + {"path":"github.com/hashicorp/go-hclog","checksumSHA1":"CYpA7Nmx/oTmWKIXtvO0uRhIyGk=","revision":"234833755cb25ae46996d0ef823326f492f89243","revisionTime":"2019-10-25T21:19:05Z","version":"f-multi-sink","versionExact":"f-multi-sink"}, {"path":"github.com/hashicorp/go-immutable-radix","checksumSHA1":"Cas2nprG6pWzf05A2F/OlnjUu2Y=","revision":"8aac2701530899b64bdea735a1de8da899815220","revisionTime":"2017-07-25T22:12:15Z"}, {"path":"github.com/hashicorp/go-memdb","checksumSHA1":"FMAvwDar2bQyYAW4XMFhAt0J5xA=","revision":"20ff6434c1cc49b80963d45bf5c6aa89c78d8d57","revisionTime":"2017-08-31T20:15:40Z"}, {"path":"github.com/hashicorp/go-msgpack/codec","checksumSHA1":"CKGYNUDKre3Z2g4hHNVfp5nTcfA=","revision":"23165f7bc3c2dda1891434ebb9da1511a7bafc1c","revisionTime":"2019-09-27T12:33:13Z","version":"upstream-08f7b40","versionExact":"upstream-08f7b40"},