diff --git a/cmd/root.go b/cmd/root.go index 21e7408d3666..9628f0a764d2 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -4,10 +4,11 @@ import ( "context" "errors" "fmt" - "io/ioutil" + "io" stdlog "log" "strconv" "strings" + "sync" "time" "github.com/sirupsen/logrus" @@ -20,20 +21,22 @@ import ( "go.k6.io/k6/log" ) -const waitRemoteLoggerTimeout = time.Second * 5 +const waitLoggerCloseTimeout = time.Second * 5 // This is to keep all fields needed for the main/root k6 command type rootCommand struct { globalState *state.GlobalState cmd *cobra.Command - loggerStopped <-chan struct{} + stopLoggersCh chan struct{} + loggersWg sync.WaitGroup loggerIsRemote bool } func newRootCommand(gs *state.GlobalState) *rootCommand { c := &rootCommand{ - globalState: gs, + globalState: gs, + stopLoggersCh: make(chan struct{}), } // the base command when called without any subcommands. rootCmd := &cobra.Command{ @@ -66,37 +69,31 @@ func newRootCommand(gs *state.GlobalState) *rootCommand { } func (c *rootCommand) persistentPreRunE(cmd *cobra.Command, args []string) error { - var err error - - c.loggerStopped, err = c.setupLoggers() + err := c.setupLoggers(c.stopLoggersCh) if err != nil { return err } - select { - case <-c.loggerStopped: - default: - c.loggerIsRemote = true - } - - stdlog.SetOutput(c.globalState.Logger.Writer()) c.globalState.Logger.Debugf("k6 version: v%s", consts.FullVersion()) return nil } func (c *rootCommand) execute() { ctx, cancel := context.WithCancel(c.globalState.Ctx) - defer cancel() c.globalState.Ctx = ctx + exitCode := -1 + defer func() { + cancel() + c.stopLoggers() + c.globalState.OSExit(exitCode) + }() + err := c.cmd.Execute() if err == nil { - cancel() - c.waitRemoteLogger() - // TODO: explicitly call c.globalState.osExit(0), for simpler tests and clarity? + exitCode = 0 return } - exitCode := -1 var ecerr errext.HasExitCode if errors.As(err, &ecerr) { exitCode = int(ecerr.ExitCode()) @@ -117,11 +114,7 @@ func (c *rootCommand) execute() { c.globalState.Logger.WithFields(fields).Error(errText) if c.loggerIsRemote { c.globalState.FallbackLogger.WithFields(fields).Error(errText) - cancel() - c.waitRemoteLogger() } - - c.globalState.OSExit(exitCode) } // Execute adds all child commands to the root command sets flags appropriately. @@ -138,13 +131,17 @@ func ExecuteWithGlobalState(gs *state.GlobalState) { newRootCommand(gs).execute() } -func (c *rootCommand) waitRemoteLogger() { - if c.loggerIsRemote { - select { - case <-c.loggerStopped: - case <-time.After(waitRemoteLoggerTimeout): - c.globalState.FallbackLogger.Errorf("Remote logger didn't stop in %s", waitRemoteLoggerTimeout) - } +func (c *rootCommand) stopLoggers() { + done := make(chan struct{}) + go func() { + c.loggersWg.Wait() + close(done) + }() + close(c.stopLoggersCh) + select { + case <-done: + case <-time.After(waitLoggerCloseTimeout): + c.globalState.FallbackLogger.Errorf("The logger didn't stop in %s", waitLoggerCloseTimeout) } } @@ -201,14 +198,16 @@ func (f RawFormatter) Format(entry *logrus.Entry) ([]byte, error) { // The returned channel will be closed when the logger has finished flushing and pushing logs after // the provided context is closed. It is closed if the logger isn't buffering and sending messages // Asynchronously -func (c *rootCommand) setupLoggers() (<-chan struct{}, error) { - ch := make(chan struct{}) - close(ch) - +func (c *rootCommand) setupLoggers(stop <-chan struct{}) error { if c.globalState.Flags.Verbose { c.globalState.Logger.SetLevel(logrus.DebugLevel) } + var ( + hook log.AsyncHook + err error + ) + loggerForceColors := false // disable color by default switch line := c.globalState.Flags.LogOutput; { case line == "stderr": @@ -218,33 +217,24 @@ func (c *rootCommand) setupLoggers() (<-chan struct{}, error) { loggerForceColors = !c.globalState.Flags.NoColor && c.globalState.Stdout.IsTTY c.globalState.Logger.SetOutput(c.globalState.Stdout) case line == "none": - c.globalState.Logger.SetOutput(ioutil.Discard) - + c.globalState.Logger.SetOutput(io.Discard) case strings.HasPrefix(line, "loki"): - ch = make(chan struct{}) // TODO: refactor, get it from the constructor - hook, err := log.LokiFromConfigLine(c.globalState.Ctx, c.globalState.FallbackLogger, line, ch) + c.loggerIsRemote = true + hook, err = log.LokiFromConfigLine(c.globalState.FallbackLogger, line) if err != nil { - return nil, err + return err } - c.globalState.Logger.AddHook(hook) - c.globalState.Logger.SetOutput(ioutil.Discard) // don't output to anywhere else c.globalState.Flags.LogFormat = "raw" - case strings.HasPrefix(line, "file"): - ch = make(chan struct{}) // TODO: refactor, get it from the constructor - hook, err := log.FileHookFromConfigLine( - c.globalState.Ctx, c.globalState.FS, c.globalState.Getwd, - c.globalState.FallbackLogger, line, ch, + hook, err = log.FileHookFromConfigLine( + c.globalState.FS, c.globalState.Getwd, + c.globalState.FallbackLogger, line, ) if err != nil { - return nil, err + return err } - - c.globalState.Logger.AddHook(hook) - c.globalState.Logger.SetOutput(ioutil.Discard) - default: - return nil, fmt.Errorf("unsupported log output '%s'", line) + return fmt.Errorf("unsupported log output '%s'", line) } switch c.globalState.Flags.LogFormat { @@ -260,5 +250,36 @@ func (c *rootCommand) setupLoggers() (<-chan struct{}, error) { }) c.globalState.Logger.Debug("Logger format: TEXT") } - return ch, nil + + cancel := func() {} // noop as default + if hook != nil { + ctx := context.Background() + ctx, cancel = context.WithCancel(ctx) + c.setLoggerHook(ctx, hook) + } + + // Sometimes the Go runtime uses the standard log output to + // log some messages directly. + // It does when an invalid char is found in a Cookie. + // Check for details https://github.com/grafana/k6/issues/711#issue-341414887 + w := c.globalState.Logger.Writer() + stdlog.SetOutput(w) + c.loggersWg.Add(1) + go func() { + <-stop + cancel() + _ = w.Close() + c.loggersWg.Done() + }() + return nil +} + +func (c *rootCommand) setLoggerHook(ctx context.Context, h log.AsyncHook) { + c.loggersWg.Add(1) + go func() { + h.Listen(ctx) + c.loggersWg.Done() + }() + c.globalState.Logger.AddHook(h) + c.globalState.Logger.SetOutput(io.Discard) // don't output to anywhere else } diff --git a/cmd/stdlog_integration_test.go b/cmd/stdlog_integration_test.go new file mode 100644 index 000000000000..c47e771d8f6a --- /dev/null +++ b/cmd/stdlog_integration_test.go @@ -0,0 +1,45 @@ +package cmd + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.k6.io/k6/cmd/tests" + "go.k6.io/k6/lib/testutils/httpmultibin" +) + +// SetOutput sets the global log so it is racy with other tests +// +//nolint:paralleltest +func TestStdLogOutputIsSet(t *testing.T) { + tb := httpmultibin.NewHTTPMultiBin(t) + ts := tests.NewGlobalTestState(t) + // Sometimes the Go runtime uses the standard log output to + // log some messages directly. + // It does when an invalid char is found in a Cookie. + // Check for details https://github.com/grafana/k6/issues/711#issue-341414887 + ts.Stdin = bytes.NewReader([]byte(tb.Replacer.Replace(` +import http from 'k6/http'; +export const options = { + hosts: { + "HTTPSBIN_DOMAIN": "HTTPSBIN_IP", + }, + insecureSkipTLSVerify: true, +} +export default function() { + http.get("HTTPSBIN_URL/get", { + "cookies": { + "test": "\"" + }, + }) +}`))) + + ts.CmdArgs = []string{"k6", "run", "-i", "1", "-"} + newRootCommand(ts.GlobalState).execute() + + entries := ts.LoggerHook.Drain() + require.Len(t, entries, 1) + assert.Contains(t, entries[0].Message, "Cookie.Value; dropping invalid bytes") +} diff --git a/cmd/tests/cmd_run_test.go b/cmd/tests/cmd_run_test.go index 5e75e8e0cde4..2c67d3a89214 100644 --- a/cmd/tests/cmd_run_test.go +++ b/cmd/tests/cmd_run_test.go @@ -1716,11 +1716,9 @@ func TestPrometheusRemoteWriteOutput(t *testing.T) { t.Parallel() ts := NewGlobalTestState(t) + ts.Env["K6_PROMETHEUS_RW_SERVER_URL"] = "http://a-fake-url-for-fail" ts.CmdArgs = []string{"k6", "run", "--out", "experimental-prometheus-rw", "-"} - ts.Stdin = bytes.NewBufferString(` - import exec from 'k6/execution'; - export default function () {}; - `) + ts.Stdin = bytes.NewBufferString(`export default function () {};`) cmd.ExecuteWithGlobalState(ts.GlobalState) ts.OutMutex.Lock() @@ -1881,3 +1879,26 @@ func TestRunStaticArchives(t *testing.T) { }) } } + +func TestBadLogOutput(t *testing.T) { + t.Parallel() + + cases := map[string]string{ + "NotExist": "badout", + "FileBadConfig": "file=,levels=bad", + "LokiBadConfig": "loki=,levels=bad", + } + + for name, tc := range cases { + name := name + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + ts := NewGlobalTestState(t) + ts.CmdArgs = []string{"k6", "run", "--log-output", tc, "-"} + ts.Stdin = bytes.NewBufferString(`export default function () {};`) + ts.ExpectedExitCode = -1 + cmd.ExecuteWithGlobalState(ts.GlobalState) + }) + } +} diff --git a/cmd/tests/tests.go b/cmd/tests/tests.go index 3d3ca9edd069..44bbaae6e3c3 100644 --- a/cmd/tests/tests.go +++ b/cmd/tests/tests.go @@ -54,10 +54,7 @@ func Main(m *testing.M) { }() defer func() { - // TODO: figure out why logrus' `Entry.WriterLevel` goroutine sticks - // around and remove this exception. - opt := goleak.IgnoreTopFunction("io.(*pipe).read") - if err := goleak.Find(opt); err != nil { + if err := goleak.Find(); err != nil { fmt.Println(err) exitCode = 3 } diff --git a/cmd/tests/tests_test.go b/cmd/tests/tests_test.go index efa11ef661d0..c52ec80cd284 100644 --- a/cmd/tests/tests_test.go +++ b/cmd/tests/tests_test.go @@ -3,8 +3,33 @@ package tests import ( "testing" + + "github.com/stretchr/testify/assert" + "go.k6.io/k6/cmd" ) func TestMain(m *testing.M) { Main(m) } + +func TestRootCommand(t *testing.T) { + t.Parallel() + + cases := map[string][]string{ + "Just root": {"k6"}, + "Help flag": {"k6", "--help"}, + } + + helptxt := "Usage:\n k6 [command]\n\nAvailable Commands" + for name, args := range cases { + name, args := name, args + t.Run(name, func(t *testing.T) { + t.Parallel() + ts := NewGlobalTestState(t) + ts.CmdArgs = args + cmd.ExecuteWithGlobalState(ts.GlobalState) + assert.Len(t, ts.LoggerHook.Drain(), 0) + assert.Contains(t, ts.Stdout.String(), helptxt) + }) + } +} diff --git a/js/modules/k6/ws/ws_test.go b/js/modules/k6/ws/ws_test.go index bc8d73a683f6..0f667e595075 100644 --- a/js/modules/k6/ws/ws_test.go +++ b/js/modules/k6/ws/ws_test.go @@ -262,17 +262,16 @@ func TestSessionTimeout(t *testing.T) { test := newTestState(t) _, err := test.VU.Runtime().RunString(sr(` var start = new Date().getTime(); - var ellapsed = new Date().getTime() - start; + var elapsed = new Date().getTime() - start; var res = ws.connect("WSBIN_URL/ws-echo", function(socket){ socket.setTimeout(function () { - ellapsed = new Date().getTime() - start; + elapsed = new Date().getTime() - start; socket.close(); }, 500); }); - if (ellapsed > 3000 || ellapsed < 500) { - throw new Error ("setTimeout occurred after " + ellapsed + "ms, expected 500 3000 || elapsed < 500) { + throw new Error ("setTimeout occurred after " + elapsed + "ms, expected 500 0) { + if h.limit < 1 { return fmt.Errorf("loki limit needs to be a positive number, is %d", h.limit) } case "msgMaxSize": @@ -120,7 +110,7 @@ func (h *lokiHook) parseArgs(line string) error { if err != nil { return fmt.Errorf("couldn't parse the loki msgMaxSize as a number %w", err) } - if !(h.msgMaxSize > 0) { + if h.msgMaxSize < 1 { return fmt.Errorf("loki msgMaxSize needs to be a positive number, is %d", h.msgMaxSize) } case "level": @@ -145,10 +135,13 @@ func (h *lokiHook) parseArgs(line string) error { return nil } -// fill one of two equally sized slices with entries and then push it while filling the other one +// Listen fills one of two equally sized slices +// with entries and then push it while filling the other one. +// // TODO benchmark this +// //nolint:funlen -func (h *lokiHook) loop() { +func (h *lokiHook) Listen(ctx context.Context) { var ( msgs = make([]tmpMsg, h.limit) msgsToPush = make([]tmpMsg, h.limit) @@ -162,8 +155,6 @@ func (h *lokiHook) loop() { defer close(pushCh) go func() { - defer close(h.lokiStopped) - oldLogs := make([]tmpMsg, 0, h.limit*2) for ch := range pushCh { msgsToPush, msgs = msgs, msgsToPush @@ -254,12 +245,11 @@ func (h *lokiHook) loop() { pushCh <- ch ch <- t.Add(-(h.pushPeriod / 2)).UnixNano() <-ch - case <-h.ctx.Done(): + case <-ctx.Done(): ch := make(chan int64) pushCh <- ch ch <- time.Now().Add(time.Second).UnixNano() <-ch - return } } @@ -307,7 +297,7 @@ func sortAndSplitMsgs(msgs []tmpMsg, cutOff int64) int { }) cutOffIndex := sort.Search(len(msgs), func(i int) bool { - return !(msgs[i].t < cutOff) + return msgs[i].t >= cutOff }) return cutOffIndex @@ -398,31 +388,33 @@ type tmpMsg struct { msg string } +// Fire implements logrus.Hook. func (h *lokiHook) Fire(entry *logrus.Entry) error { h.ch <- entry return nil } +// Levels implements logrus.Hook. func (h *lokiHook) Levels() []logrus.Level { return h.levels } /* -{ - "streams": [ - { - "stream": { - "label1": "value1" - "label2": "value2" - }, - "values": [ // the nanoseconds need to be in order - [ "", "" ], - [ "", "" ] - ] - } - ] -} + { + "streams": [ + { + "stream": { + "label1": "value1" + "label2": "value2" + }, + "values": [ // the nanoseconds need to be in order + [ "", "" ], + [ "", "" ] + ] + } + ] + } */ type lokiPushMessage struct { Streams []*stream `json:"streams"` diff --git a/log/loki_test.go b/log/loki_test.go index 8cfa8f582c12..050cc10559dc 100644 --- a/log/loki_test.go +++ b/log/loki_test.go @@ -1,7 +1,6 @@ package log import ( - "context" "encoding/json" "fmt" "testing" @@ -22,7 +21,6 @@ func TestSyslogFromConfigLine(t *testing.T) { { line: "loki", // default settings res: lokiHook{ - ctx: context.Background(), addr: "http://127.0.0.1:3100/loki/api/v1/push", limit: 100, pushPeriod: time.Second * 1, @@ -35,7 +33,6 @@ func TestSyslogFromConfigLine(t *testing.T) { { line: "loki=somewhere:1233,label.something=else,label.foo=bar,limit=32,level=info,allowedLabels=[something],pushPeriod=5m32s,msgMaxSize=1231", res: lokiHook{ - ctx: context.Background(), addr: "somewhere:1233", limit: 32, pushPeriod: time.Minute*5 + time.Second*32, @@ -74,16 +71,15 @@ func TestSyslogFromConfigLine(t *testing.T) { t.Run(test.line, func(t *testing.T) { // no parallel because this is way too fast and parallel will only slow it down - res, err := LokiFromConfigLine(context.Background(), nil, test.line, make(chan struct{})) - + res, err := LokiFromConfigLine(nil, test.line) if test.err { require.Error(t, err) return } require.NoError(t, err) + test.res.client = res.(*lokiHook).client test.res.ch = res.(*lokiHook).ch - test.res.lokiStopped = res.(*lokiHook).lokiStopped require.Equal(t, &test.res, res) }) }