From 0f431d9e2219245b49c4da1b78d89d2b904ea206 Mon Sep 17 00:00:00 2001 From: na-- Date: Fri, 26 Feb 2021 15:21:30 +0200 Subject: [PATCH] Use a new interface for Outputs and adapt old Collectors to it (#1869) This also should allow k6 to support xk6 extensions for outputs. So far the JSON output is the only one that has been fully transitioned to the new output.Output interface, the rest will be done in future PRs. --- api/server_test.go | 2 +- api/v1/group_routes_test.go | 2 +- api/v1/metric_routes_test.go | 4 +- api/v1/setup_teardown_routes_test.go | 2 +- api/v1/status_routes_test.go | 4 +- cloudapi/config.go | 30 ++- cmd/cloud.go | 12 +- cmd/collectors.go | 175 ---------------- cmd/config.go | 46 +---- cmd/convert.go | 18 +- cmd/login_cloud.go | 29 ++- cmd/login_influxdb.go | 16 +- cmd/outputs.go | 273 +++++++++++++++++++++++++ cmd/run.go | 40 ++-- cmd/ui.go | 33 ++- core/engine.go | 70 +++++-- core/engine_test.go | 76 ++++--- js/runner_test.go | 13 +- lib/collector.go | 4 + lib/testutils/mockoutput/mockoutput.go | 83 ++++++++ output/extensions.go | 55 +++++ output/helpers.go | 125 +++++++++++ output/helpers_test.go | 183 +++++++++++++++++ output/json/json.go | 153 ++++++++++++++ output/json/json_test.go | 187 +++++++++++++++++ {stats => output}/json/wrapper.go | 22 +- output/types.go | 90 ++++++++ stats/cloud/bench_test.go | 15 +- stats/cloud/collector.go | 5 +- stats/cloud/collector_test.go | 47 +---- stats/csv/config.go | 32 +++ stats/datadog/collector.go | 24 +++ stats/dummy/collector.go | 78 ------- stats/dummy/collector_test.go | 52 ----- stats/influxdb/collector_test.go | 10 +- stats/influxdb/config.go | 45 +++- stats/influxdb/util_test.go | 8 +- stats/json/collector.go | 178 ---------------- stats/json/collector_test.go | 54 ----- stats/json/wrapper_test.go | 48 ----- stats/kafka/collector.go | 4 +- stats/kafka/config.go | 32 +++ stats/statsd/collector.go | 24 +++ 43 files changed, 1570 insertions(+), 833 deletions(-) delete mode 100644 cmd/collectors.go create mode 100644 cmd/outputs.go create mode 100644 lib/testutils/mockoutput/mockoutput.go create mode 100644 output/extensions.go create mode 100644 output/helpers.go create mode 100644 output/helpers_test.go create mode 100644 output/json/json.go create mode 100644 output/json/json_test.go rename {stats => output}/json/wrapper.go (72%) create mode 100644 output/types.go delete mode 100644 stats/dummy/collector.go delete mode 100644 stats/dummy/collector_test.go delete mode 100644 stats/json/collector.go delete mode 100644 stats/json/collector_test.go delete mode 100644 stats/json/wrapper_test.go diff --git a/api/server_test.go b/api/server_test.go index cafec68d9e8..d269a2b957d 100644 --- a/api/server_test.go +++ b/api/server_test.go @@ -82,7 +82,7 @@ func TestWithEngine(t *testing.T) { logger.SetOutput(testutils.NewTestOutput(t)) execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) rw := httptest.NewRecorder() diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go index e2a30db52cf..e74539e0585 100644 --- a/api/v1/group_routes_test.go +++ b/api/v1/group_routes_test.go @@ -51,7 +51,7 @@ func TestGetGroups(t *testing.T) { execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Group: g0}, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) t.Run("list", func(t *testing.T) { diff --git a/api/v1/metric_routes_test.go b/api/v1/metric_routes_test.go index 423a5eb5c5e..b5a21d49d4f 100644 --- a/api/v1/metric_routes_test.go +++ b/api/v1/metric_routes_test.go @@ -45,7 +45,7 @@ func TestGetMetrics(t *testing.T) { logger.SetOutput(testutils.NewTestOutput(t)) execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) engine.Metrics = map[string]*stats.Metric{ @@ -88,7 +88,7 @@ func TestGetMetric(t *testing.T) { logger.SetOutput(testutils.NewTestOutput(t)) execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) engine.Metrics = map[string]*stats.Metric{ diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index bb9c5ad083e..a4b0214d814 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -155,7 +155,7 @@ func TestSetupData(t *testing.T) { }) execScheduler, err := local.NewExecutionScheduler(runner, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) globalCtx, globalCancel := context.WithCancel(context.Background()) diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index 1378f97c5a9..e102d3397b5 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -47,7 +47,7 @@ func TestGetStatus(t *testing.T) { logger.SetOutput(testutils.NewTestOutput(t)) execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) rw := httptest.NewRecorder() @@ -101,7 +101,7 @@ func TestPatchStatus(t *testing.T) { t.Run(name, func(t *testing.T) { execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Options: options}, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() diff --git a/cloudapi/config.go b/cloudapi/config.go index be5bdd0fd79..e113ff59645 100644 --- a/cloudapi/config.go +++ b/cloudapi/config.go @@ -26,6 +26,7 @@ import ( "gopkg.in/guregu/null.v3" + "github.com/kelseyhightower/envconfig" "github.com/loadimpact/k6/lib/types" ) @@ -241,7 +242,8 @@ func (c Config) Apply(cfg Config) Config { return c } -// MergeFromExternal merges three fields from json in a loadimpact key of the provided external map +// MergeFromExternal merges three fields from the JSON in a loadimpact key of +// the provided external map. Used for options.ext.loadimpact settings. func MergeFromExternal(external map[string]json.RawMessage, conf *Config) error { if val, ok := external["loadimpact"]; ok { // TODO: Important! Separate configs and fix the whole 2 configs mess! @@ -262,3 +264,29 @@ func MergeFromExternal(external map[string]json.RawMessage, conf *Config) error } return nil } + +// GetConsolidatedConfig combines the default config values with the JSON config +// values and environment variables and returns the final result. +func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, configArg string) (Config, error) { + result := NewConfig() + if jsonRawConf != nil { + jsonConf := Config{} + if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil { + return result, err + } + result = result.Apply(jsonConf) + } + + envConfig := Config{} + if err := envconfig.Process("", &envConfig); err != nil { + // TODO: get rid of envconfig and actually use the env parameter... + return result, err + } + result = result.Apply(envConfig) + + if configArg != "" { + result.Name = null.StringFrom(configArg) + } + + return result, nil +} diff --git a/cmd/cloud.go b/cmd/cloud.go index 939b583c839..680168a746e 100644 --- a/cmd/cloud.go +++ b/cmd/cloud.go @@ -33,7 +33,6 @@ import ( "syscall" "time" - "github.com/kelseyhightower/envconfig" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/afero" @@ -105,7 +104,8 @@ This will execute the test on the k6 cloud service. Use "k6 login cloud" to auth return err } - runtimeOptions, err := getRuntimeOptions(cmd.Flags(), buildEnvMap(os.Environ())) + osEnvironment := buildEnvMap(os.Environ()) + runtimeOptions, err := getRuntimeOptions(cmd.Flags(), osEnvironment) if err != nil { return err } @@ -141,8 +141,8 @@ This will execute the test on the k6 cloud service. Use "k6 login cloud" to auth } // Cloud config - cloudConfig := cloudapi.NewConfig().Apply(derivedConf.Collectors.Cloud) - if err = envconfig.Process("", &cloudConfig); err != nil { + cloudConfig, err := cloudapi.GetConsolidatedConfig(derivedConf.Collectors["cloud"], osEnvironment, "") + if err != nil { return err } if !cloudConfig.Token.Valid { @@ -153,8 +153,8 @@ This will execute the test on the k6 cloud service. Use "k6 login cloud" to auth arc := r.MakeArchive() // TODO: Fix this // We reuse cloud.Config for parsing options.ext.loadimpact, but this probably shouldn't be - // done as the idea of options.ext is that they are extensible without touching k6. But in - // order for this to happen we shouldn't actually marshall cloud.Config on top of it because + // done, as the idea of options.ext is that they are extensible without touching k6. But in + // order for this to happen, we shouldn't actually marshall cloud.Config on top of it, because // it will be missing some fields that aren't actually mentioned in the struct. // So in order for use to copy the fields that we need for loadimpact's api we unmarshal in // map[string]interface{} and copy what we need if it isn't set already diff --git a/cmd/collectors.go b/cmd/collectors.go deleted file mode 100644 index 158048e2794..00000000000 --- a/cmd/collectors.go +++ /dev/null @@ -1,175 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package cmd - -import ( - "fmt" - "strings" - - "gopkg.in/guregu/null.v3" - - "github.com/kelseyhightower/envconfig" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "github.com/spf13/afero" - - "github.com/loadimpact/k6/cloudapi" - "github.com/loadimpact/k6/lib" - "github.com/loadimpact/k6/lib/consts" - "github.com/loadimpact/k6/loader" - "github.com/loadimpact/k6/stats" - "github.com/loadimpact/k6/stats/cloud" - "github.com/loadimpact/k6/stats/csv" - "github.com/loadimpact/k6/stats/datadog" - "github.com/loadimpact/k6/stats/influxdb" - jsonc "github.com/loadimpact/k6/stats/json" - "github.com/loadimpact/k6/stats/kafka" - "github.com/loadimpact/k6/stats/statsd" -) - -const ( - collectorInfluxDB = "influxdb" - collectorJSON = "json" - collectorKafka = "kafka" - collectorCloud = "cloud" - collectorStatsD = "statsd" - collectorDatadog = "datadog" - collectorCSV = "csv" -) - -func parseCollector(s string) (t, arg string) { - parts := strings.SplitN(s, "=", 2) - switch len(parts) { - case 0: - return "", "" - case 1: - return parts[0], "" - default: - return parts[0], parts[1] - } -} - -// TODO: totally refactor this... -func getCollector( - logger logrus.FieldLogger, - collectorName, arg string, src *loader.SourceData, conf Config, executionPlan []lib.ExecutionStep, -) (lib.Collector, error) { - switch collectorName { - case collectorJSON: - return jsonc.New(logger, afero.NewOsFs(), arg) - case collectorInfluxDB: - config := influxdb.NewConfig().Apply(conf.Collectors.InfluxDB) - if err := envconfig.Process("", &config); err != nil { - return nil, err - } - urlConfig, err := influxdb.ParseURL(arg) - if err != nil { - return nil, err - } - config = config.Apply(urlConfig) - - return influxdb.New(logger, config) - case collectorCloud: - config := cloudapi.NewConfig().Apply(conf.Collectors.Cloud) - if err := envconfig.Process("", &config); err != nil { - return nil, err - } - if arg != "" { - config.Name = null.StringFrom(arg) - } - - return cloud.New(logger, config, src, conf.Options, executionPlan, consts.Version) - case collectorKafka: - config := kafka.NewConfig().Apply(conf.Collectors.Kafka) - if err := envconfig.Process("", &config); err != nil { - return nil, err - } - if arg != "" { - cmdConfig, err := kafka.ParseArg(arg) - if err != nil { - return nil, err - } - config = config.Apply(cmdConfig) - } - - return kafka.New(logger, config) - case collectorStatsD: - config := statsd.NewConfig().Apply(conf.Collectors.StatsD) - if err := envconfig.Process("k6_statsd", &config); err != nil { - return nil, err - } - - return statsd.New(logger, config) - case collectorDatadog: - config := datadog.NewConfig().Apply(conf.Collectors.Datadog) - if err := envconfig.Process("k6_datadog", &config); err != nil { - return nil, err - } - - return datadog.New(logger, config) - case collectorCSV: - config := csv.NewConfig().Apply(conf.Collectors.CSV) - if err := envconfig.Process("", &config); err != nil { - return nil, err - } - if arg != "" { - cmdConfig, err := csv.ParseArg(arg) - if err != nil { - return nil, err - } - - config = config.Apply(cmdConfig) - } - - return csv.New(logger, afero.NewOsFs(), conf.SystemTags.Map(), config) - - default: - return nil, errors.Errorf("unknown output type: %s", collectorName) - } -} - -func newCollector( - logger logrus.FieldLogger, - collectorName, arg string, src *loader.SourceData, conf Config, executionPlan []lib.ExecutionStep, -) (lib.Collector, error) { - collector, err := getCollector(logger, collectorName, arg, src, conf, executionPlan) - if err != nil { - return collector, err - } - - // Check if all required tags are present - missingRequiredTags := []string{} - requiredTags := collector.GetRequiredSystemTags() - for _, tag := range stats.SystemTagSetValues() { - if requiredTags.Has(tag) && !conf.SystemTags.Has(tag) { - missingRequiredTags = append(missingRequiredTags, tag.String()) - } - } - if len(missingRequiredTags) > 0 { - return collector, fmt.Errorf( - "the specified collector '%s' needs the following system tags enabled: %s", - collectorName, - strings.Join(missingRequiredTags, ", "), - ) - } - - return collector, nil -} diff --git a/cmd/config.go b/cmd/config.go index 23035befcd0..8ad10b75faf 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -33,16 +33,10 @@ import ( "github.com/spf13/pflag" "gopkg.in/guregu/null.v3" - "github.com/loadimpact/k6/cloudapi" "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/lib/executor" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" - "github.com/loadimpact/k6/stats/csv" - "github.com/loadimpact/k6/stats/datadog" - "github.com/loadimpact/k6/stats/influxdb" - "github.com/loadimpact/k6/stats/kafka" - "github.com/loadimpact/k6/stats/statsd" ) // configFlagSet returns a FlagSet with the default run configuration flags. @@ -62,14 +56,8 @@ type Config struct { Linger null.Bool `json:"linger" envconfig:"K6_LINGER"` NoUsageReport null.Bool `json:"noUsageReport" envconfig:"K6_NO_USAGE_REPORT"` - Collectors struct { - InfluxDB influxdb.Config `json:"influxdb"` - Kafka kafka.Config `json:"kafka"` - Cloud cloudapi.Config `json:"cloud"` - StatsD statsd.Config `json:"statsd"` - Datadog datadog.Config `json:"datadog"` - CSV csv.Config `json:"csv"` - } `json:"collectors"` + // TODO: deprecate + Collectors map[string]json.RawMessage `json:"collectors"` } // Validate checks if all of the specified options make sense @@ -92,12 +80,6 @@ func (c Config) Apply(cfg Config) Config { if cfg.NoUsageReport.Valid { c.NoUsageReport = cfg.NoUsageReport } - c.Collectors.InfluxDB = c.Collectors.InfluxDB.Apply(cfg.Collectors.InfluxDB) - c.Collectors.Cloud = c.Collectors.Cloud.Apply(cfg.Collectors.Cloud) - c.Collectors.Kafka = c.Collectors.Kafka.Apply(cfg.Collectors.Kafka) - c.Collectors.StatsD = c.Collectors.StatsD.Apply(cfg.Collectors.StatsD) - c.Collectors.Datadog = c.Collectors.Datadog.Apply(cfg.Collectors.Datadog) - c.Collectors.CSV = c.Collectors.CSV.Apply(cfg.Collectors.CSV) return c } @@ -167,19 +149,11 @@ func writeDiskConfig(fs afero.Fs, configPath string, conf Config) error { } // Reads configuration variables from the environment. -func readEnvConfig() (conf Config, err error) { - // TODO: replace envconfig and refactor the whole configuration from the groun up :/ - for _, err := range []error{ - envconfig.Process("", &conf), - envconfig.Process("", &conf.Collectors.Cloud), - envconfig.Process("", &conf.Collectors.InfluxDB), - envconfig.Process("", &conf.Collectors.Kafka), - envconfig.Process("k6_statsd", &conf.Collectors.StatsD), - envconfig.Process("k6_datadog", &conf.Collectors.Datadog), - } { - return conf, err - } - return conf, nil +func readEnvConfig() (Config, error) { + // TODO: replace envconfig and refactor the whole configuration from the ground up :/ + conf := Config{} + err := envconfig.Process("", &conf) + return conf, err } // Assemble the final consolidated configuration from all of the different sources: @@ -192,12 +166,6 @@ func readEnvConfig() (conf Config, err error) { // TODO: add better validation, more explicit default values and improve consistency between formats // TODO: accumulate all errors and differentiate between the layers? func getConsolidatedConfig(fs afero.Fs, cliConf Config, runner lib.Runner) (conf Config, err error) { - cliConf.Collectors.InfluxDB = influxdb.NewConfig().Apply(cliConf.Collectors.InfluxDB) - cliConf.Collectors.Cloud = cloudapi.NewConfig().Apply(cliConf.Collectors.Cloud) - cliConf.Collectors.Kafka = kafka.NewConfig().Apply(cliConf.Collectors.Kafka) - cliConf.Collectors.StatsD = statsd.NewConfig().Apply(cliConf.Collectors.StatsD) - cliConf.Collectors.Datadog = datadog.NewConfig().Apply(cliConf.Collectors.Datadog) - fileConf, _, err := readDiskConfig(fs) if err != nil { return conf, err diff --git a/cmd/convert.go b/cmd/convert.go index 39350ccbc65..ececf3ceb07 100644 --- a/cmd/convert.go +++ b/cmd/convert.go @@ -33,8 +33,10 @@ import ( "github.com/loadimpact/k6/lib" ) +// TODO: fix this... or remove k6 convert +//nolint: gochecknoglobals var ( - output string + convertOutput string optionsFilePath string minSleep uint maxSleep uint @@ -107,12 +109,12 @@ func getConvertCmd() *cobra.Command { } // Write script content to stdout or file - if output == "" || output == "-" { //nolint:nestif + if convertOutput == "" || convertOutput == "-" { //nolint:nestif if _, err := io.WriteString(defaultWriter, script); err != nil { return err } } else { - f, err := defaultFs.Create(output) + f, err := defaultFs.Create(convertOutput) if err != nil { return err } @@ -131,8 +133,14 @@ func getConvertCmd() *cobra.Command { } convertCmd.Flags().SortFlags = false - convertCmd.Flags().StringVarP(&output, "output", "O", output, "k6 script output filename (stdout by default)") - convertCmd.Flags().StringVarP(&optionsFilePath, "options", "", output, "path to a JSON file with options that would be injected in the output script") + convertCmd.Flags().StringVarP( + &convertOutput, "output", "O", convertOutput, + "k6 script output filename (stdout by default)", + ) + convertCmd.Flags().StringVarP( + &optionsFilePath, "options", "", optionsFilePath, + "path to a JSON file with options that would be injected in the output script", + ) convertCmd.Flags().StringSliceVarP(&only, "only", "", []string{}, "include only requests from the given domains") convertCmd.Flags().StringSliceVarP(&skip, "skip", "", []string{}, "skip requests from the given domains") convertCmd.Flags().UintVarP(&threshold, "batch-threshold", "", 500, "batch request idle time threshold (see example)") diff --git a/cmd/login_cloud.go b/cmd/login_cloud.go index 1e7a0aca7aa..a22e81e40d7 100644 --- a/cmd/login_cloud.go +++ b/cmd/login_cloud.go @@ -21,6 +21,7 @@ package cmd import ( + "encoding/json" "os" "syscall" @@ -58,22 +59,33 @@ This will set the default token used when just "k6 run -o cloud" is passed.`, RunE: func(cmd *cobra.Command, args []string) error { fs := afero.NewOsFs() - k6Conf, err := getConsolidatedConfig(fs, Config{}, nil) + currentDiskConf, configPath, err := readDiskConfig(fs) if err != nil { return err } - currentDiskConf, configPath, err := readDiskConfig(fs) + currentJSONConfig := cloudapi.Config{} + currentJSONConfigRaw := currentDiskConf.Collectors["cloud"] + if currentJSONConfigRaw != nil { + // We only want to modify this config, see comment below + if jsonerr := json.Unmarshal(currentJSONConfigRaw, ¤tJSONConfig); jsonerr != nil { + return jsonerr + } + } + + // We want to use this fully consolidated config for things like + // host addresses, so users can overwrite them with env vars. + consolidatedCurrentConfig, err := cloudapi.GetConsolidatedConfig(currentJSONConfigRaw, buildEnvMap(os.Environ()), "") if err != nil { return err } + // But we don't want to save them back to the JSON file, we only + // want to save what already existed there and the login details. + newCloudConf := currentJSONConfig show := getNullBool(cmd.Flags(), "show") reset := getNullBool(cmd.Flags(), "reset") token := getNullString(cmd.Flags(), "token") - - newCloudConf := cloudapi.NewConfig().Apply(currentDiskConf.Collectors.Cloud) - switch { case reset.Valid: newCloudConf.Token = null.StringFromPtr(nil) @@ -104,7 +116,7 @@ This will set the default token used when just "k6 run -o cloud" is passed.`, email := vals["Email"].(string) password := vals["Password"].(string) - client := cloudapi.NewClient(logger, "", k6Conf.Collectors.Cloud.Host.String, consts.Version) + client := cloudapi.NewClient(logger, "", consolidatedCurrentConfig.Host.String, consts.Version) res, err := client.Login(email, password) if err != nil { return err @@ -117,7 +129,10 @@ This will set the default token used when just "k6 run -o cloud" is passed.`, newCloudConf.Token = null.StringFrom(res.Token) } - currentDiskConf.Collectors.Cloud = newCloudConf + currentDiskConf.Collectors["cloud"], err = json.Marshal(newCloudConf) + if err != nil { + return err + } if err := writeDiskConfig(fs, configPath, currentDiskConf); err != nil { return err } diff --git a/cmd/login_influxdb.go b/cmd/login_influxdb.go index 9e85bcf8ffd..264d5083b33 100644 --- a/cmd/login_influxdb.go +++ b/cmd/login_influxdb.go @@ -21,6 +21,7 @@ package cmd import ( + "encoding/json" "os" "syscall" "time" @@ -52,7 +53,15 @@ This will set the default server used when just "-o influxdb" is passed.`, return err } - conf := influxdb.NewConfig().Apply(config.Collectors.InfluxDB) + conf := influxdb.NewConfig() + jsonConf := config.Collectors["influxdb"] + if jsonConf != nil { + jsonConfParsed, jsonerr := influxdb.ParseJSON(jsonConf) + if jsonerr != nil { + return jsonerr + } + conf = conf.Apply(jsonConfParsed) + } if len(args) > 0 { urlConf, err := influxdb.ParseURL(args[0]) //nolint:govet if err != nil { @@ -112,7 +121,10 @@ This will set the default server used when just "-o influxdb" is passed.`, return err } - config.Collectors.InfluxDB = conf + config.Collectors["influxdb"], err = json.Marshal(conf) + if err != nil { + return err + } return writeDiskConfig(fs, configPath, config) }, } diff --git a/cmd/outputs.go b/cmd/outputs.go new file mode 100644 index 00000000000..02fb2ed9de6 --- /dev/null +++ b/cmd/outputs.go @@ -0,0 +1,273 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package cmd + +import ( + "context" + "fmt" + "sort" + "strings" + + "github.com/sirupsen/logrus" + "github.com/spf13/afero" + + "github.com/loadimpact/k6/cloudapi" + "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/consts" + "github.com/loadimpact/k6/loader" + "github.com/loadimpact/k6/output" + "github.com/loadimpact/k6/output/json" + "github.com/loadimpact/k6/stats" + "github.com/loadimpact/k6/stats/cloud" + "github.com/loadimpact/k6/stats/csv" + "github.com/loadimpact/k6/stats/datadog" + "github.com/loadimpact/k6/stats/influxdb" + "github.com/loadimpact/k6/stats/kafka" + "github.com/loadimpact/k6/stats/statsd" +) + +// TODO: move this to an output sub-module after we get rid of the old collectors? +//nolint: funlen +func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, error), error) { + // Start with the built-in outputs + result := map[string]func(output.Params) (output.Output, error){ + "json": json.New, + + // TODO: remove all of these + "influxdb": func(params output.Params) (output.Output, error) { + conf, err := influxdb.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) + if err != nil { + return nil, err + } + influxc, err := influxdb.New(params.Logger, conf) + if err != nil { + return nil, err + } + return newCollectorAdapter(params, influxc) + }, + "cloud": func(params output.Params) (output.Output, error) { + conf, err := cloudapi.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) + if err != nil { + return nil, err + } + cloudc, err := cloud.New( + params.Logger, conf, params.ScriptPath, params.ScriptOptions, params.ExecutionPlan, consts.Version, + ) + if err != nil { + return nil, err + } + return newCollectorAdapter(params, cloudc) + }, + "kafka": func(params output.Params) (output.Output, error) { + conf, err := kafka.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) + if err != nil { + return nil, err + } + kafkac, err := kafka.New(params.Logger, conf) + if err != nil { + return nil, err + } + return newCollectorAdapter(params, kafkac) + }, + "statsd": func(params output.Params) (output.Output, error) { + conf, err := statsd.GetConsolidatedConfig(params.JSONConfig, params.Environment) + if err != nil { + return nil, err + } + statsdc, err := statsd.New(params.Logger, conf) + if err != nil { + return nil, err + } + return newCollectorAdapter(params, statsdc) + }, + "datadog": func(params output.Params) (output.Output, error) { + conf, err := datadog.GetConsolidatedConfig(params.JSONConfig, params.Environment) + if err != nil { + return nil, err + } + datadogc, err := datadog.New(params.Logger, conf) + if err != nil { + return nil, err + } + return newCollectorAdapter(params, datadogc) + }, + "csv": func(params output.Params) (output.Output, error) { + conf, err := csv.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) + if err != nil { + return nil, err + } + csvc, err := csv.New(params.Logger, params.FS, params.ScriptOptions.SystemTags.Map(), conf) + if err != nil { + return nil, err + } + return newCollectorAdapter(params, csvc) + }, + } + + exts := output.GetExtensions() + for k, v := range exts { + if _, ok := result[k]; ok { + return nil, fmt.Errorf("invalid output extension %s, built-in output with the same type already exists", k) + } + result[k] = v + } + + return result, nil +} + +func getPossibleIDList(constrs map[string]func(output.Params) (output.Output, error)) string { + res := make([]string, 0, len(constrs)) + for k := range constrs { + res = append(res, k) + } + sort.Strings(res) + return strings.Join(res, ", ") +} + +func createOutputs( + outputFullArguments []string, src *loader.SourceData, conf Config, rtOpts lib.RuntimeOptions, + executionPlan []lib.ExecutionStep, osEnvironment map[string]string, logger logrus.FieldLogger, +) ([]output.Output, error) { + outputConstructors, err := getAllOutputConstructors() + if err != nil { + return nil, err + } + baseParams := output.Params{ + ScriptPath: src.URL, + Logger: logger, + Environment: osEnvironment, + StdOut: stdout, + StdErr: stderr, + FS: afero.NewOsFs(), + ScriptOptions: conf.Options, + RuntimeOptions: rtOpts, + ExecutionPlan: executionPlan, + } + result := make([]output.Output, 0, len(outputFullArguments)) + + for _, outputFullArg := range outputFullArguments { + outputType, outputArg := parseOutputArgument(outputFullArg) + outputConstructor, ok := outputConstructors[outputType] + if !ok { + return nil, fmt.Errorf( + "invalid output type '%s', available types are: %s", + outputType, getPossibleIDList(outputConstructors), + ) + } + + params := baseParams + params.OutputType = outputType + params.ConfigArgument = outputArg + params.JSONConfig = conf.Collectors[outputType] + + output, err := outputConstructor(params) + if err != nil { + return nil, fmt.Errorf("could not create the '%s' output: %w", outputType, err) + } + result = append(result, output) + } + + return result, nil +} + +func parseOutputArgument(s string) (t, arg string) { + parts := strings.SplitN(s, "=", 2) + switch len(parts) { + case 0: + return "", "" + case 1: + return parts[0], "" + default: + return parts[0], parts[1] + } +} + +// TODO: remove this after we transition every collector to the output interface + +func newCollectorAdapter(params output.Params, collector lib.Collector) (output.Output, error) { + // Check if all required tags are present + missingRequiredTags := []string{} + requiredTags := collector.GetRequiredSystemTags() + for _, tag := range stats.SystemTagSetValues() { + if requiredTags.Has(tag) && !params.ScriptOptions.SystemTags.Has(tag) { + missingRequiredTags = append(missingRequiredTags, tag.String()) + } + } + if len(missingRequiredTags) > 0 { + return nil, fmt.Errorf( + "the specified output '%s' needs the following system tags enabled: %s", + params.OutputType, strings.Join(missingRequiredTags, ", "), + ) + } + + return &collectorAdapter{ + outputType: params.OutputType, + collector: collector, + stopCh: make(chan struct{}), + }, nil +} + +// collectorAdapter is a _temporary_ fix until we move all of the old +// "collectors" to the new output interface +type collectorAdapter struct { + collector lib.Collector + outputType string + runCtx context.Context + runCtxCancel func() + stopCh chan struct{} +} + +func (ca *collectorAdapter) Description() string { + link := ca.collector.Link() + if link != "" { + return fmt.Sprintf("%s (%s)", ca.outputType, link) + } + return ca.outputType +} + +func (ca *collectorAdapter) Start() error { + if err := ca.collector.Init(); err != nil { + return err + } + ca.runCtx, ca.runCtxCancel = context.WithCancel(context.Background()) + go func() { + ca.collector.Run(ca.runCtx) + close(ca.stopCh) + }() + return nil +} + +func (ca *collectorAdapter) AddMetricSamples(samples []stats.SampleContainer) { + ca.collector.Collect(samples) +} + +func (ca *collectorAdapter) SetRunStatus(latestStatus lib.RunStatus) { + ca.collector.SetRunStatus(latestStatus) +} + +// Stop implements the new output interface. +func (ca *collectorAdapter) Stop() error { + ca.runCtxCancel() + <-ca.stopCh + return nil +} + +var _ output.WithRunStatusUpdates = &collectorAdapter{} diff --git a/cmd/run.go b/cmd/run.go index 81f05cce4ae..708799e6767 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -116,7 +116,8 @@ a commandline interface for interacting with it.`, return err } - runtimeOptions, err := getRuntimeOptions(cmd.Flags(), buildEnvMap(os.Environ())) + osEnvironment := buildEnvMap(os.Environ()) + runtimeOptions, err := getRuntimeOptions(cmd.Flags(), osEnvironment) if err != nil { return err } @@ -171,8 +172,6 @@ a commandline interface for interacting with it.`, return err } - executionState := execScheduler.GetState() - // This is manually triggered after the Engine's Run() has completed, // and things like a single Ctrl+C don't affect it. We use it to make // sure that the progressbars finish updating with the latest execution @@ -191,26 +190,18 @@ a commandline interface for interacting with it.`, progressBarWG.Done() }() - // Create an engine. - initBar.Modify(pb.WithConstProgress(0, "Init engine")) - engine, err := core.NewEngine(execScheduler, conf.Options, runtimeOptions, logger) + // Create all outputs. + executionPlan := execScheduler.GetExecutionPlan() + outputs, err := createOutputs(conf.Out, src, conf, runtimeOptions, executionPlan, osEnvironment, logger) if err != nil { return err } - executionPlan := execScheduler.GetExecutionPlan() - // Create a collector and assign it to the engine if requested. - initBar.Modify(pb.WithConstProgress(0, "Init metric outputs")) - for _, out := range conf.Out { - t, arg := parseCollector(out) - collector, cerr := newCollector(logger, t, arg, src, conf, executionPlan) - if cerr != nil { - return cerr - } - if cerr = collector.Init(); cerr != nil { - return cerr - } - engine.Collectors = append(engine.Collectors, collector) + // Create the engine. + initBar.Modify(pb.WithConstProgress(0, "Init engine")) + engine, err := core.NewEngine(execScheduler, conf.Options, runtimeOptions, outputs, logger) + if err != nil { + return err } // Spin up the REST API server, if not disabled. @@ -230,9 +221,17 @@ a commandline interface for interacting with it.`, }() } + // We do this here so we can get any output URLs below. + initBar.Modify(pb.WithConstProgress(0, "Starting outputs")) + err = engine.StartOutputs() + if err != nil { + return err + } + defer engine.StopOutputs() + printExecutionDescription( "local", filename, "", conf, execScheduler.GetState().ExecutionTuple, - executionPlan, engine.Collectors) + executionPlan, outputs) // Trap Interrupts, SIGINTs and SIGTERMs. sigC := make(chan os.Signal, 1) @@ -286,6 +285,7 @@ a commandline interface for interacting with it.`, progressCancel() progressBarWG.Wait() + executionState := execScheduler.GetState() // Warn if no iterations could be completed. if executionState.GetFullIterationCount() == 0 { logger.Warn("No script iterations finished, consider making the test duration longer") diff --git a/cmd/ui.go b/cmd/ui.go index 6021145d959..3d00c8eed65 100644 --- a/cmd/ui.go +++ b/cmd/ui.go @@ -36,6 +36,7 @@ import ( "golang.org/x/crypto/ssh/terminal" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/output" "github.com/loadimpact/k6/ui" "github.com/loadimpact/k6/ui/pb" ) @@ -111,31 +112,25 @@ func modifyAndPrintBar(bar *pb.ProgressBar, options ...pb.ProgressBarOption) { // Print execution description for both cloud and local execution. // TODO: Clean this up as part of #1499 or #1427 func printExecutionDescription( - execution, filename, output string, conf Config, et *lib.ExecutionTuple, - execPlan []lib.ExecutionStep, collectors []lib.Collector, + execution, filename, outputOverride string, conf Config, et *lib.ExecutionTuple, + execPlan []lib.ExecutionStep, outputs []output.Output, ) { fprintf(stdout, " execution: %s\n", ui.ValueColor.Sprint(execution)) fprintf(stdout, " script: %s\n", ui.ValueColor.Sprint(filename)) - if execution == "local" { - out := "-" - link := "" - - for idx, collector := range collectors { - if out != "-" { - out = out + "; " + conf.Out[idx] - } else { - out = conf.Out[idx] - } - - if l := collector.Link(); l != "" { - link = link + " (" + l + ")" - } + var outputDescriptions []string + switch { + case outputOverride != "": + outputDescriptions = []string{outputOverride} + case len(outputs) == 0: + outputDescriptions = []string{"-"} + default: + for _, out := range outputs { + outputDescriptions = append(outputDescriptions, out.Description()) } - fprintf(stdout, " output: %s%s\n", ui.ValueColor.Sprint(out), ui.ExtraColor.Sprint(link)) - } else { - fprintf(stdout, " output: %s\n", ui.ValueColor.Sprint(output)) } + + fprintf(stdout, " output: %s\n", ui.ValueColor.Sprint(strings.Join(outputDescriptions, ", "))) fprintf(stdout, "\n") maxDuration, _ := lib.GetEndOffset(execPlan) diff --git a/core/engine.go b/core/engine.go index 9953e018d7b..daf5bed3505 100644 --- a/core/engine.go +++ b/core/engine.go @@ -32,6 +32,7 @@ import ( "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/lib/metrics" + "github.com/loadimpact/k6/output" "github.com/loadimpact/k6/stats" ) @@ -56,7 +57,7 @@ type Engine struct { Options lib.Options runtimeOptions lib.RuntimeOptions - Collectors []lib.Collector + outputs []output.Output logger *logrus.Entry stopOnce sync.Once @@ -77,7 +78,7 @@ type Engine struct { // NewEngine instantiates a new Engine, without doing any heavy initialization. func NewEngine( - ex lib.ExecutionScheduler, opts lib.Options, rtOpts lib.RuntimeOptions, logger *logrus.Logger, + ex lib.ExecutionScheduler, opts lib.Options, rtOpts lib.RuntimeOptions, outputs []output.Output, logger *logrus.Logger, ) (*Engine, error) { if ex == nil { return nil, errors.New("missing ExecutionScheduler instance") @@ -89,6 +90,7 @@ func NewEngine( Options: opts, runtimeOptions: rtOpts, + outputs: outputs, Metrics: make(map[string]*stats.Metric), Samples: make(chan stats.SampleContainer, opts.MetricSamplesBufferSize.Int64), stopChan: make(chan struct{}), @@ -109,10 +111,47 @@ func NewEngine( return e, nil } +// StartOutputs spins up all configured outputs, giving the thresholds to any +// that can accept them. And if some output fails, stop the already started +// ones. This may take some time, since some outputs make initial network +// requests to set up whatever remote services are going to listen to them. +// +// TODO: this doesn't really need to be in the Engine, so take it out? +func (e *Engine) StartOutputs() error { + e.logger.Debugf("Starting %d outputs...", len(e.outputs)) + for i, out := range e.outputs { + if thresholdOut, ok := out.(output.WithThresholds); ok { + thresholdOut.SetThresholds(e.thresholds) + } + + if err := out.Start(); err != nil { + e.stopOutputs(i) + return err + } + } + return nil +} + +// StopOutputs stops all configured outputs. +func (e *Engine) StopOutputs() { + e.stopOutputs(len(e.outputs)) +} + +func (e *Engine) stopOutputs(upToID int) { + e.logger.Debugf("Stopping %d outputs...", upToID) + for i := 0; i < upToID; i++ { + if err := e.outputs[i].Stop(); err != nil { + e.logger.WithError(err).Errorf("Stopping output %d failed", i) + } + } +} + // Init is used to initialize the execution scheduler and all metrics processing // in the engine. The first is a costly operation, since it initializes all of -// the planned VUs and could potentially take a long time. It either returns an -// error immediately, or it returns test run() and wait() functions. +// the planned VUs and could potentially take a long time. +// +// This method either returns an error immediately, or it returns test run() and +// wait() functions. // // Things to note: // - The first lambda, Run(), synchronously executes the actual load test. @@ -130,7 +169,6 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait } // TODO: move all of this in a separate struct? see main TODO above - runSubCtx, runSubCancel := context.WithCancel(runCtx) resultCh := make(chan error) @@ -154,6 +192,7 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait return err } + waitFn := e.startBackgroundProcesses(globalCtx, runCtx, resultCh, runSubCancel, processMetricsAfterRun) return runFn, waitFn, nil } @@ -162,20 +201,11 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait // test run status when it ends. It returns a function that can be used after // the provided context is called, to wait for the complete winding down of all // started goroutines. -func (e *Engine) startBackgroundProcesses( //nolint:funlen +func (e *Engine) startBackgroundProcesses( globalCtx, runCtx context.Context, runResult <-chan error, runSubCancel func(), processMetricsAfterRun chan struct{}, ) (wait func()) { processes := new(sync.WaitGroup) - // Spin up all configured collectors - for _, collector := range e.Collectors { - processes.Add(1) - go func(collector lib.Collector) { - collector.Run(globalCtx) - processes.Done() - }(collector) - } - // Siphon and handle all produced metric samples processes.Add(1) go func() { @@ -300,8 +330,10 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu } func (e *Engine) setRunStatus(status lib.RunStatus) { - for _, c := range e.Collectors { - c.SetRunStatus(status) + for _, out := range e.outputs { + if statUpdOut, ok := out.(output.WithRunStatusUpdates); ok { + statUpdOut.SetRunStatus(status) + } } } @@ -443,7 +475,7 @@ func (e *Engine) processSamples(sampleContainers []stats.SampleContainer) { e.processSamplesForMetrics(sampleContainers) } - for _, collector := range e.Collectors { - collector.Collect(sampleContainers) + for _, out := range e.outputs { + out.AddMetricSamples(sampleContainers) } } diff --git a/core/engine_test.go b/core/engine_test.go index a6765b459cb..157eced789c 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -41,17 +41,18 @@ import ( "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/lib/testutils/httpmultibin" "github.com/loadimpact/k6/lib/testutils/minirunner" + "github.com/loadimpact/k6/lib/testutils/mockoutput" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/loader" + "github.com/loadimpact/k6/output" "github.com/loadimpact/k6/stats" - "github.com/loadimpact/k6/stats/dummy" ) const isWindows = runtime.GOOS == "windows" // Wrapper around NewEngine that applies a logger and manages the options. func newTestEngine( //nolint:golint - t *testing.T, runCtx context.Context, runner lib.Runner, collectors []lib.Collector, opts lib.Options, + t *testing.T, runCtx context.Context, runner lib.Runner, outputs []output.Output, opts lib.Options, ) (engine *Engine, run func() error, wait func()) { if runner == nil { runner = &minirunner.MiniRunner{} @@ -76,11 +77,9 @@ func newTestEngine( //nolint:golint execScheduler, err := local.NewExecutionScheduler(runner, logger) require.NoError(t, err) - engine, err = NewEngine(execScheduler, opts, lib.RuntimeOptions{}, logger) + engine, err = NewEngine(execScheduler, opts, lib.RuntimeOptions{}, outputs, logger) require.NoError(t, err) - engine.Collectors = collectors - run, waitFn, err := engine.Init(globalCtx, runCtx) require.NoError(t, err) @@ -142,10 +141,9 @@ func TestEngineRun(t *testing.T) { return nil }} - c := &dummy.Collector{} - + mockOutput := mockoutput.New() ctx, cancel := context.WithCancel(context.Background()) - _, run, wait := newTestEngine(t, ctx, runner, []lib.Collector{c}, lib.Options{ + _, run, wait := newTestEngine(t, ctx, runner, []output.Output{mockOutput}, lib.Options{ VUs: null.IntFrom(1), Iterations: null.IntFrom(1), }) @@ -158,7 +156,7 @@ func TestEngineRun(t *testing.T) { wait() found := 0 - for _, s := range c.Samples { + for _, s := range mockOutput.Samples { if s.Metric != testMetric { continue } @@ -197,7 +195,7 @@ func TestEngineStopped(t *testing.T) { e.Stop() // test that a second stop doesn't panic } -func TestEngineCollector(t *testing.T) { +func TestEngineOutput(t *testing.T) { testMetric := stats.New("test_metric", stats.Trend) runner := &minirunner.MiniRunner{Fn: func(ctx context.Context, out chan<- stats.SampleContainer) error { @@ -205,8 +203,8 @@ func TestEngineCollector(t *testing.T) { return nil }} - c := &dummy.Collector{} - e, run, wait := newTestEngine(t, nil, runner, []lib.Collector{c}, lib.Options{ + mockOutput := mockoutput.New() + e, run, wait := newTestEngine(t, nil, runner, []output.Output{mockOutput}, lib.Options{ VUs: null.IntFrom(1), Iterations: null.IntFrom(1), }) @@ -215,7 +213,7 @@ func TestEngineCollector(t *testing.T) { wait() cSamples := []stats.Sample{} - for _, sample := range c.Samples { + for _, sample := range mockOutput.Samples { if sample.Metric == testMetric { cSamples = append(cSamples, sample) } @@ -224,9 +222,9 @@ func TestEngineCollector(t *testing.T) { if assert.NotNil(t, metric) { sink := metric.Sink.(*stats.TrendSink) if assert.NotNil(t, sink) { - numCollectorSamples := len(cSamples) + numOutputSamples := len(cSamples) numEngineSamples := len(sink.Values) - assert.Equal(t, numEngineSamples, numCollectorSamples) + assert.Equal(t, numEngineSamples, numOutputSamples) } } } @@ -361,8 +359,8 @@ func TestEngine_processThresholds(t *testing.T) { } } -func getMetricSum(collector *dummy.Collector, name string) (result float64) { - for _, sc := range collector.SampleContainers { +func getMetricSum(mo *mockoutput.MockOutput, name string) (result float64) { + for _, sc := range mo.SampleContainers { for _, s := range sc.GetSamples() { if s.Metric.Name == name { result += s.Value @@ -372,8 +370,8 @@ func getMetricSum(collector *dummy.Collector, name string) (result float64) { return } -func getMetricCount(collector *dummy.Collector, name string) (result uint) { - for _, sc := range collector.SampleContainers { +func getMetricCount(mo *mockoutput.MockOutput, name string) (result uint) { + for _, sc := range mo.SampleContainers { for _, s := range sc.GetSamples() { if s.Metric.Name == name { result++ @@ -448,8 +446,8 @@ func TestSentReceivedMetrics(t *testing.T) { ) require.NoError(t, err) - collector := &dummy.Collector{} - _, run, wait := newTestEngine(t, nil, r, []lib.Collector{collector}, lib.Options{ + mockOutput := mockoutput.New() + _, run, wait := newTestEngine(t, nil, r, []output.Output{mockOutput}, lib.Options{ Iterations: null.IntFrom(tc.Iterations), VUs: null.IntFrom(tc.VUs), Hosts: tb.Dialer.Hosts, @@ -470,7 +468,7 @@ func TestSentReceivedMetrics(t *testing.T) { wait() checkData := func(name string, expected int64) float64 { - data := getMetricSum(collector, name) + data := getMetricSum(mockOutput, name) expectedDataMin := float64(expected * tc.Iterations) expectedDataMax := float64((expected + ts.NumRequests*expectedHeaderMaxLength) * tc.Iterations) @@ -582,8 +580,8 @@ func TestRunTags(t *testing.T) { ) require.NoError(t, err) - collector := &dummy.Collector{} - _, run, wait := newTestEngine(t, nil, r, []lib.Collector{collector}, lib.Options{ + mockOutput := mockoutput.New() + _, run, wait := newTestEngine(t, nil, r, []output.Output{mockOutput}, lib.Options{ Iterations: null.IntFrom(3), VUs: null.IntFrom(2), Hosts: tb.Dialer.Hosts, @@ -617,7 +615,7 @@ func TestRunTags(t *testing.T) { return "the rainbow" } - for _, s := range collector.Samples { + for _, s := range mockOutput.Samples { for key, expVal := range runTagsMap { val, ok := s.Tags.Get(key) @@ -738,8 +736,8 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) { ) require.NoError(t, err) - collector := &dummy.Collector{} - engine, run, wait := newTestEngine(t, nil, runner, []lib.Collector{collector}, lib.Options{}) + mockOutput := mockoutput.New() + engine, run, wait := newTestEngine(t, nil, runner, []output.Output{mockOutput}, lib.Options{}) errC := make(chan error) go func() { errC <- run() }() @@ -756,11 +754,11 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) { // The 3.1 sleep in the default function would cause the first VU to complete 2 full iterations // and stat executing its third one, while the second VU will only fully complete 1 iteration // and will be canceled in the middle of its second one. - assert.Equal(t, 3.0, getMetricSum(collector, metrics.Iterations.Name)) + assert.Equal(t, 3.0, getMetricSum(mockOutput, metrics.Iterations.Name)) // That means that we expect to see 8 HTTP requests in total, 3*2=6 from the complete iterations // and one each from the two iterations that would be canceled in the middle of their execution - assert.Equal(t, 8.0, getMetricSum(collector, metrics.HTTPReqs.Name)) + assert.Equal(t, 8.0, getMetricSum(mockOutput, metrics.HTTPReqs.Name)) // And we expect to see the data_received for all 8 of those requests. Previously, the data for // the 8th request (the 3rd one in the first VU before the test ends) was cut off by the engine @@ -769,7 +767,7 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) { // it was interrupted. dataReceivedExpectedMin := 15000.0 * 8 dataReceivedExpectedMax := (15000.0 + expectedHeaderMaxLength) * 8 - dataReceivedActual := getMetricSum(collector, metrics.DataReceived.Name) + dataReceivedActual := getMetricSum(mockOutput, metrics.DataReceived.Name) if dataReceivedActual < dataReceivedExpectedMin || dataReceivedActual > dataReceivedExpectedMax { t.Errorf( "The data_received sum should be in the interval [%f, %f] but was %f", @@ -779,9 +777,9 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) { // Also, the interrupted iterations shouldn't affect the average iteration_duration in any way, only // complete iterations should be taken into account - durationCount := float64(getMetricCount(collector, metrics.IterationDuration.Name)) + durationCount := float64(getMetricCount(mockOutput, metrics.IterationDuration.Name)) assert.Equal(t, 3.0, durationCount) - durationSum := getMetricSum(collector, metrics.IterationDuration.Name) + durationSum := getMetricSum(mockOutput, metrics.IterationDuration.Name) assert.InDelta(t, 3.35, durationSum/(1000*durationCount), 0.25) } @@ -843,8 +841,8 @@ func TestMetricsEmission(t *testing.T) { ) require.NoError(t, err) - collector := &dummy.Collector{} - engine, run, wait := newTestEngine(t, nil, runner, []lib.Collector{collector}, runner.GetOptions()) + mockOutput := mockoutput.New() + engine, run, wait := newTestEngine(t, nil, runner, []output.Output{mockOutput}, runner.GetOptions()) errC := make(chan error) go func() { errC <- run() }() @@ -858,8 +856,8 @@ func TestMetricsEmission(t *testing.T) { require.False(t, engine.IsTainted()) } - assert.Equal(t, tc.expIters, getMetricSum(collector, metrics.Iterations.Name)) - assert.Equal(t, tc.expCount, getMetricSum(collector, "testcounter")) + assert.Equal(t, tc.expIters, getMetricSum(mockOutput, metrics.Iterations.Name)) + assert.Equal(t, tc.expCount, getMetricSum(mockOutput, "testcounter")) }) } } @@ -961,8 +959,8 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { }, } - c := &dummy.Collector{} - _, run, wait := newTestEngine(t, ctx, runner, []lib.Collector{c}, lib.Options{ + mockOutput := mockoutput.New() + _, run, wait := newTestEngine(t, ctx, runner, []output.Output{mockOutput}, lib.Options{ VUs: null.IntFrom(1), Iterations: null.IntFrom(1), }) @@ -970,7 +968,7 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { wait() var count float64 - for _, sample := range c.Samples { + for _, sample := range mockOutput.Samples { if sample.Metric == testMetric { count += sample.Value } diff --git a/js/runner_test.go b/js/runner_test.go index af9fca2275e..910196b39a0 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -58,10 +58,11 @@ import ( "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/lib/testutils/httpmultibin" + "github.com/loadimpact/k6/lib/testutils/mockoutput" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/loader" + "github.com/loadimpact/k6/output" "github.com/loadimpact/k6/stats" - "github.com/loadimpact/k6/stats/dummy" ) func TestRunnerNew(t *testing.T) { @@ -295,15 +296,17 @@ func TestSetupDataIsolation(t *testing.T) { execScheduler, err := local.NewExecutionScheduler(runner, testutils.NewLogger(t)) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, testutils.NewLogger(t)) + + mockOutput := mockoutput.New() + engine, err := core.NewEngine( + execScheduler, options, lib.RuntimeOptions{}, []output.Output{mockOutput}, testutils.NewLogger(t), + ) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) run, wait, err := engine.Init(ctx, ctx) require.NoError(t, err) - collector := &dummy.Collector{} - engine.Collectors = []lib.Collector{collector} require.Empty(t, runner.defaultGroup.Groups) errC := make(chan error) @@ -322,7 +325,7 @@ func TestSetupDataIsolation(t *testing.T) { require.Contains(t, runner.defaultGroup.Groups, "setup") require.Contains(t, runner.defaultGroup.Groups, "teardown") var count int - for _, s := range collector.Samples { + for _, s := range mockOutput.Samples { if s.Metric.Name == "mycounter" { count += int(s.Value) } diff --git a/lib/collector.go b/lib/collector.go index d3700ad0d11..408f517002e 100644 --- a/lib/collector.go +++ b/lib/collector.go @@ -26,6 +26,8 @@ import ( "github.com/loadimpact/k6/stats" ) +// TODO: move to some other package - types? models? + // RunStatus values can be used by k6 to denote how a script run ends // and by the cloud executor and collector so that k6 knows the current // status of a particular script run. @@ -46,6 +48,8 @@ const ( RunStatusAbortedThreshold RunStatus = 8 ) +// TODO: remove + // A Collector abstracts the process of funneling samples to an external storage backend, // such as an InfluxDB instance. type Collector interface { diff --git a/lib/testutils/mockoutput/mockoutput.go b/lib/testutils/mockoutput/mockoutput.go new file mode 100644 index 00000000000..b03273f23bd --- /dev/null +++ b/lib/testutils/mockoutput/mockoutput.go @@ -0,0 +1,83 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package mockoutput + +import ( + "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/output" + "github.com/loadimpact/k6/stats" +) + +// New exists so that the usage from tests avoids repetition, i.e. is +// mockoutput.New() instead of &mockoutput.MockOutput{} +func New() *MockOutput { + return &MockOutput{} +} + +// MockOutput can be used in tests to mock an actual output. +type MockOutput struct { + SampleContainers []stats.SampleContainer + Samples []stats.Sample + RunStatus lib.RunStatus + + DescFn func() string + StartFn func() error + StopFn func() error +} + +var _ output.WithRunStatusUpdates = &MockOutput{} + +// AddMetricSamples just saves the results in memory. +func (mo *MockOutput) AddMetricSamples(scs []stats.SampleContainer) { + mo.SampleContainers = append(mo.SampleContainers, scs...) + for _, sc := range scs { + mo.Samples = append(mo.Samples, sc.GetSamples()...) + } +} + +// SetRunStatus updates the RunStatus property. +func (mo *MockOutput) SetRunStatus(latestStatus lib.RunStatus) { + mo.RunStatus = latestStatus +} + +// Description calls the supplied DescFn callback, if available. +func (mo *MockOutput) Description() string { + if mo.DescFn != nil { + return mo.DescFn() + } + return "mock" +} + +// Start calls the supplied StartFn callback, if available. +func (mo *MockOutput) Start() error { + if mo.StartFn != nil { + return mo.StartFn() + } + return nil +} + +// Stop calls the supplied StopFn callback, if available. +func (mo *MockOutput) Stop() error { + if mo.StopFn != nil { + return mo.StopFn() + } + return nil +} diff --git a/output/extensions.go b/output/extensions.go new file mode 100644 index 00000000000..d136b136234 --- /dev/null +++ b/output/extensions.go @@ -0,0 +1,55 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package output + +import ( + "fmt" + "sync" +) + +//nolint:gochecknoglobals +var ( + extensions = make(map[string]func(Params) (Output, error)) + mx sync.RWMutex +) + +// GetExtensions returns all registered extensions. +func GetExtensions() map[string]func(Params) (Output, error) { + mx.RLock() + defer mx.RUnlock() + res := make(map[string]func(Params) (Output, error), len(extensions)) + for k, v := range extensions { + res[k] = v + } + return res +} + +// RegisterExtension registers the given output extension constructor. This +// function panics if a module with the same name is already registered. +func RegisterExtension(name string, mod func(Params) (Output, error)) { + mx.Lock() + defer mx.Unlock() + + if _, ok := extensions[name]; ok { + panic(fmt.Sprintf("output extension already registered: %s", name)) + } + extensions[name] = mod +} diff --git a/output/helpers.go b/output/helpers.go new file mode 100644 index 00000000000..647ccd76f0f --- /dev/null +++ b/output/helpers.go @@ -0,0 +1,125 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package output + +import ( + "fmt" + "sync" + "time" + + "github.com/loadimpact/k6/stats" +) + +// SampleBuffer is a simple thread-safe buffer for metric samples. It should be +// used by most outputs, since we generally want to flush metric samples to the +// remote service asynchronously. We want to do it only every several seconds, +// and we don't want to block the Engine in the meantime. +type SampleBuffer struct { + sync.Mutex + buffer []stats.SampleContainer + maxLen int +} + +// AddMetricSamples adds the given metric samples to the internal buffer. +func (sc *SampleBuffer) AddMetricSamples(samples []stats.SampleContainer) { + if len(samples) == 0 { + return + } + sc.Lock() + sc.buffer = append(sc.buffer, samples...) + sc.Unlock() +} + +// GetBufferedSamples returns the currently buffered metric samples and makes a +// new internal buffer with some hopefully realistic size. If the internal +// buffer is empty, it will return nil. +func (sc *SampleBuffer) GetBufferedSamples() []stats.SampleContainer { + sc.Lock() + defer sc.Unlock() + + buffered, bufferedLen := sc.buffer, len(sc.buffer) + if bufferedLen == 0 { + return nil + } + if bufferedLen > sc.maxLen { + sc.maxLen = bufferedLen + } + // Make the new buffer halfway between the previously allocated size and the + // maximum buffer size we've seen so far, to hopefully reduce copying a bit. + sc.buffer = make([]stats.SampleContainer, 0, (bufferedLen+sc.maxLen)/2) + + return buffered +} + +// PeriodicFlusher is a small helper for asynchronously flushing buffered metric +// samples on regular intervals. The biggest benefit is having a Stop() method +// that waits for one last flush before it returns. +type PeriodicFlusher struct { + period time.Duration + flushCallback func() + stop chan struct{} + stopped chan struct{} + once *sync.Once +} + +func (pf *PeriodicFlusher) run() { + ticker := time.NewTicker(pf.period) + defer ticker.Stop() + for { + select { + case <-ticker.C: + pf.flushCallback() + case <-pf.stop: + pf.flushCallback() + close(pf.stopped) + return + } + } +} + +// Stop waits for the periodic flusher flush one last time and exit. You can +// safely call Stop() multiple times from different goroutines, you just can't +// call it from inside of the flushing function. +func (pf *PeriodicFlusher) Stop() { + pf.once.Do(func() { + close(pf.stop) + }) + <-pf.stopped +} + +// NewPeriodicFlusher creates a new PeriodicFlusher and starts its goroutine. +func NewPeriodicFlusher(period time.Duration, flushCallback func()) (*PeriodicFlusher, error) { + if period <= 0 { + return nil, fmt.Errorf("metric flush period should be positive but was %s", period) + } + + pf := &PeriodicFlusher{ + period: period, + flushCallback: flushCallback, + stop: make(chan struct{}), + stopped: make(chan struct{}), + once: &sync.Once{}, + } + + go pf.run() + + return pf, nil +} diff --git a/output/helpers_test.go b/output/helpers_test.go new file mode 100644 index 00000000000..61e66f9a268 --- /dev/null +++ b/output/helpers_test.go @@ -0,0 +1,183 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package output + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/loadimpact/k6/stats" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSampleBufferBasics(t *testing.T) { + t.Parallel() + single := stats.Sample{ + Time: time.Now(), + Metric: stats.New("my_metric", stats.Rate), + Value: float64(123), + Tags: stats.NewSampleTags(map[string]string{"tag1": "val1"}), + } + connected := stats.ConnectedSamples{Samples: []stats.Sample{single, single}, Time: single.Time} + buffer := SampleBuffer{} + + assert.Empty(t, buffer.GetBufferedSamples()) + buffer.AddMetricSamples([]stats.SampleContainer{single, single}) + buffer.AddMetricSamples([]stats.SampleContainer{single, connected, single}) + assert.Equal(t, []stats.SampleContainer{single, single, single, connected, single}, buffer.GetBufferedSamples()) + assert.Empty(t, buffer.GetBufferedSamples()) + + // Verify some internals + assert.Equal(t, cap(buffer.buffer), 5) + buffer.AddMetricSamples([]stats.SampleContainer{single, connected}) + buffer.AddMetricSamples(nil) + buffer.AddMetricSamples([]stats.SampleContainer{}) + buffer.AddMetricSamples([]stats.SampleContainer{single}) + assert.Equal(t, []stats.SampleContainer{single, connected, single}, buffer.GetBufferedSamples()) + assert.Equal(t, cap(buffer.buffer), 4) + buffer.AddMetricSamples([]stats.SampleContainer{single}) + assert.Equal(t, []stats.SampleContainer{single}, buffer.GetBufferedSamples()) + assert.Equal(t, cap(buffer.buffer), 3) + assert.Empty(t, buffer.GetBufferedSamples()) +} + +func TestSampleBufferConcurrently(t *testing.T) { + t.Parallel() + + seed := time.Now().UnixNano() + r := rand.New(rand.NewSource(seed)) //nolint:gosec + t.Logf("Random source seeded with %d\n", seed) + + producersCount := 50 + r.Intn(50) + sampleCount := 10 + r.Intn(10) + sleepModifier := 10 + r.Intn(10) + buffer := SampleBuffer{} + + wg := make(chan struct{}) + fillBuffer := func() { + for i := 0; i < sampleCount; i++ { + buffer.AddMetricSamples([]stats.SampleContainer{stats.Sample{ + Time: time.Unix(1562324644, 0), + Metric: stats.New("my_metric", stats.Gauge), + Value: float64(i), + Tags: stats.NewSampleTags(map[string]string{"tag1": "val1"}), + }}) + time.Sleep(time.Duration(i*sleepModifier) * time.Microsecond) + } + wg <- struct{}{} + } + for i := 0; i < producersCount; i++ { + go fillBuffer() + } + + timer := time.NewTicker(5 * time.Millisecond) + timeout := time.After(5 * time.Second) + defer timer.Stop() + readSamples := make([]stats.SampleContainer, 0, sampleCount*producersCount) + finishedProducers := 0 +loop: + for { + select { + case <-timer.C: + readSamples = append(readSamples, buffer.GetBufferedSamples()...) + case <-wg: + finishedProducers++ + if finishedProducers == producersCount { + readSamples = append(readSamples, buffer.GetBufferedSamples()...) + break loop + } + case <-timeout: + t.Fatalf("test timed out") + } + } + assert.Equal(t, sampleCount*producersCount, len(readSamples)) + for _, s := range readSamples { + require.NotNil(t, s) + ss := s.GetSamples() + require.Len(t, ss, 1) + assert.Equal(t, "my_metric", ss[0].Metric.Name) + } +} + +func TestPeriodicFlusherBasics(t *testing.T) { + t.Parallel() + + f, err := NewPeriodicFlusher(-1*time.Second, func() {}) + assert.Error(t, err) + assert.Nil(t, f) + f, err = NewPeriodicFlusher(0, func() {}) + assert.Error(t, err) + assert.Nil(t, f) + + count := 0 + wg := &sync.WaitGroup{} + wg.Add(1) + f, err = NewPeriodicFlusher(100*time.Millisecond, func() { + count++ + if count == 2 { + wg.Done() + } + }) + assert.NotNil(t, f) + assert.Nil(t, err) + wg.Wait() + f.Stop() + assert.Equal(t, 3, count) +} + +func TestPeriodicFlusherConcurrency(t *testing.T) { + t.Parallel() + + seed := time.Now().UnixNano() + r := rand.New(rand.NewSource(seed)) //nolint:gosec + randStops := 10 + r.Intn(10) + t.Logf("Random source seeded with %d\n", seed) + + count := 0 + wg := &sync.WaitGroup{} + wg.Add(1) + f, err := NewPeriodicFlusher(1000*time.Microsecond, func() { + // Sleep intentionally may be longer than the flush period. Also, this + // should never happen concurrently, so it's intentionally not locked. + time.Sleep(time.Duration(700+r.Intn(1000)) * time.Microsecond) + count++ + if count == 100 { + wg.Done() + } + }) + assert.NotNil(t, f) + assert.Nil(t, err) + wg.Wait() + + stopWG := &sync.WaitGroup{} + stopWG.Add(randStops) + for i := 0; i < randStops; i++ { + go func() { + f.Stop() + stopWG.Done() + }() + } + stopWG.Wait() + assert.True(t, count >= 101) // due to the short intervals, we might not get exactly 101 +} diff --git a/output/json/json.go b/output/json/json.go new file mode 100644 index 00000000000..4e1c74d440b --- /dev/null +++ b/output/json/json.go @@ -0,0 +1,153 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package json + +import ( + "compress/gzip" + stdlibjson "encoding/json" + "fmt" + "strings" + "time" + + "github.com/loadimpact/k6/output" + "github.com/loadimpact/k6/stats" + "github.com/sirupsen/logrus" +) + +// TODO: add option for emitting proper JSON files (https://github.com/loadimpact/k6/issues/1052) +const flushPeriod = 200 * time.Millisecond // TODO: make this configurable + +// Output funnels all passed metrics to an (optionally gzipped) JSON file. +type Output struct { + output.SampleBuffer + + params output.Params + periodicFlusher *output.PeriodicFlusher + + logger logrus.FieldLogger + filename string + encoder *stdlibjson.Encoder + closeFn func() error + seenMetrics map[string]struct{} +} + +// New returns a new JSON output. +func New(params output.Params) (output.Output, error) { + return &Output{ + params: params, + filename: params.ConfigArgument, + logger: params.Logger.WithFields(logrus.Fields{ + "output": "json", + "filename": params.ConfigArgument, + }), + seenMetrics: make(map[string]struct{}), + }, nil +} + +// Description returns a human-readable description of the output. +func (o *Output) Description() string { + if o.filename == "" || o.filename == "-" { + return "json(stdout)" + } + return fmt.Sprintf("json (%s)", o.filename) +} + +// Start opens the tries to specified JSON file and starts the goroutine for +// metric flushing. +func (o *Output) Start() error { + o.logger.Debug("Starting...") + + if o.filename == "" || o.filename == "-" { + o.encoder = stdlibjson.NewEncoder(o.params.StdOut) + o.closeFn = func() error { + return nil + } + } else { + logfile, err := o.params.FS.Create(o.filename) + if err != nil { + return err + } + + if strings.HasSuffix(o.filename, ".gz") { + outfile := gzip.NewWriter(logfile) + + o.closeFn = func() error { + _ = outfile.Close() + return logfile.Close() + } + o.encoder = stdlibjson.NewEncoder(outfile) + } else { + o.closeFn = logfile.Close + o.encoder = stdlibjson.NewEncoder(logfile) + } + } + + pf, err := output.NewPeriodicFlusher(flushPeriod, o.flushMetrics) + if err != nil { + return err + } + o.logger.Debug("Started!") + o.periodicFlusher = pf + + return nil +} + +// Stop flushes any remaining metrics and stops the goroutine. +func (o *Output) Stop() error { + o.logger.Debug("Stopping...") + defer o.logger.Debug("Stopped!") + o.periodicFlusher.Stop() + return o.closeFn() +} + +func (o *Output) flushMetrics() { + samples := o.GetBufferedSamples() + start := time.Now() + var count int + for _, sc := range samples { + samples := sc.GetSamples() + count += len(samples) + for _, sample := range samples { + sample := sample + o.handleMetric(sample.Metric) + err := o.encoder.Encode(WrapSample(sample)) + if err != nil { + // Skip metric if it can't be made into JSON or envelope is null. + o.logger.WithError(err).Error("Sample couldn't be marshalled to JSON") + } + } + } + if count > 0 { + o.logger.WithField("t", time.Since(start)).WithField("count", count).Debug("Wrote metrics to JSON") + } +} + +func (o *Output) handleMetric(m *stats.Metric) { + if _, ok := o.seenMetrics[m.Name]; ok { + return + } + o.seenMetrics[m.Name] = struct{}{} + + err := o.encoder.Encode(wrapMetric(m)) + if err != nil { + o.logger.WithError(err).Error("Metric couldn't be marshalled to JSON") + } +} diff --git a/output/json/json_test.go b/output/json/json_test.go new file mode 100644 index 00000000000..da104f95261 --- /dev/null +++ b/output/json/json_test.go @@ -0,0 +1,187 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package json + +import ( + "bufio" + "bytes" + "compress/gzip" + "io" + "testing" + "time" + + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/loadimpact/k6/lib/testutils" + "github.com/loadimpact/k6/output" + "github.com/loadimpact/k6/stats" +) + +func getValidator(t *testing.T, expected []string) func(io.Reader) { + return func(rawJSONLines io.Reader) { + s := bufio.NewScanner(rawJSONLines) + i := 0 + for s.Scan() { + i++ + if i > len(expected) { + t.Errorf("Read unexpected line number %d, expected only %d entries", i, len(expected)) + continue + } + assert.JSONEq(t, expected[i-1], string(s.Bytes())) + } + assert.NoError(t, s.Err()) + assert.Equal(t, len(expected), i) + } +} + +func generateTestMetricSamples(t *testing.T) ([]stats.SampleContainer, func(io.Reader)) { + metric1 := stats.New("my_metric1", stats.Gauge) + metric2 := stats.New("my_metric2", stats.Counter, stats.Data) + time1 := time.Date(2021, time.February, 24, 13, 37, 10, 0, time.UTC) + time2 := time1.Add(10 * time.Second) + time3 := time2.Add(10 * time.Second) + connTags := stats.NewSampleTags(map[string]string{"key": "val"}) + + samples := []stats.SampleContainer{ + stats.Sample{Time: time1, Metric: metric1, Value: float64(1), Tags: stats.NewSampleTags(map[string]string{"tag1": "val1"})}, + stats.Sample{Time: time1, Metric: metric1, Value: float64(2), Tags: stats.NewSampleTags(map[string]string{"tag2": "val2"})}, + stats.ConnectedSamples{Samples: []stats.Sample{ + {Time: time2, Metric: metric2, Value: float64(3), Tags: connTags}, + {Time: time2, Metric: metric1, Value: float64(4), Tags: connTags}, + }, Time: time2, Tags: connTags}, + stats.Sample{Time: time3, Metric: metric2, Value: float64(5), Tags: stats.NewSampleTags(map[string]string{"tag3": "val3"})}, + } + // TODO: fix JSON thresholds (https://github.com/loadimpact/k6/issues/1052) + expected := []string{ + `{"type":"Metric","data":{"name":"my_metric1","type":"gauge","contains":"default","tainted":null,"thresholds":[],"submetrics":null,"sub":{"name":"","parent":"","suffix":"","tags":null}},"metric":"my_metric1"}`, + `{"type":"Point","data":{"time":"2021-02-24T13:37:10Z","value":1,"tags":{"tag1":"val1"}},"metric":"my_metric1"}`, + `{"type":"Point","data":{"time":"2021-02-24T13:37:10Z","value":2,"tags":{"tag2":"val2"}},"metric":"my_metric1"}`, + `{"type":"Metric","data":{"name":"my_metric2","type":"counter","contains":"data","tainted":null,"thresholds":[],"submetrics":null,"sub":{"name":"","parent":"","suffix":"","tags":null}},"metric":"my_metric2"}`, + `{"type":"Point","data":{"time":"2021-02-24T13:37:20Z","value":3,"tags":{"key":"val"}},"metric":"my_metric2"}`, + `{"type":"Point","data":{"time":"2021-02-24T13:37:20Z","value":4,"tags":{"key":"val"}},"metric":"my_metric1"}`, + `{"type":"Point","data":{"time":"2021-02-24T13:37:30Z","value":5,"tags":{"tag3":"val3"}},"metric":"my_metric2"}`, + } + + return samples, getValidator(t, expected) +} + +func TestJsonOutputStdout(t *testing.T) { + t.Parallel() + + stdout := new(bytes.Buffer) + out, err := New(output.Params{ + Logger: testutils.NewLogger(t), + StdOut: stdout, + }) + require.NoError(t, err) + require.NoError(t, out.Start()) + + samples, validateResults := generateTestMetricSamples(t) + out.AddMetricSamples(samples[:2]) + out.AddMetricSamples(samples[2:]) + require.NoError(t, out.Stop()) + validateResults(stdout) +} + +func TestJsonOutputFileError(t *testing.T) { + t.Parallel() + + stdout := new(bytes.Buffer) + fs := afero.NewReadOnlyFs(afero.NewMemMapFs()) + out, err := New(output.Params{ + Logger: testutils.NewLogger(t), + StdOut: stdout, + FS: fs, + ConfigArgument: "/json-output", + }) + require.NoError(t, err) + assert.Error(t, out.Start()) +} + +func TestJsonOutputFile(t *testing.T) { + t.Parallel() + + stdout := new(bytes.Buffer) + fs := afero.NewMemMapFs() + out, err := New(output.Params{ + Logger: testutils.NewLogger(t), + StdOut: stdout, + FS: fs, + ConfigArgument: "/json-output", + }) + require.NoError(t, err) + require.NoError(t, out.Start()) + + samples, validateResults := generateTestMetricSamples(t) + out.AddMetricSamples(samples[:2]) + out.AddMetricSamples(samples[2:]) + require.NoError(t, out.Stop()) + + assert.Empty(t, stdout.Bytes()) + file, err := fs.Open("/json-output") + require.NoError(t, err) + validateResults(file) + assert.NoError(t, file.Close()) +} + +func TestJsonOutputFileGzipped(t *testing.T) { + t.Parallel() + + stdout := new(bytes.Buffer) + fs := afero.NewMemMapFs() + out, err := New(output.Params{ + Logger: testutils.NewLogger(t), + StdOut: stdout, + FS: fs, + ConfigArgument: "/json-output.gz", + }) + require.NoError(t, err) + require.NoError(t, out.Start()) + + samples, validateResults := generateTestMetricSamples(t) + out.AddMetricSamples(samples[:2]) + out.AddMetricSamples(samples[2:]) + require.NoError(t, out.Stop()) + + assert.Empty(t, stdout.Bytes()) + file, err := fs.Open("/json-output.gz") + require.NoError(t, err) + reader, err := gzip.NewReader(file) + require.NoError(t, err) + validateResults(reader) + assert.NoError(t, file.Close()) +} + +func TestWrapSampleWithSamplePointer(t *testing.T) { + t.Parallel() + out := WrapSample(stats.Sample{ + Metric: &stats.Metric{}, + }) + assert.NotEqual(t, out, (*Envelope)(nil)) +} + +func TestWrapMetricWithMetricPointer(t *testing.T) { + t.Parallel() + out := wrapMetric(&stats.Metric{}) + assert.NotEqual(t, out, (*Envelope)(nil)) +} diff --git a/stats/json/wrapper.go b/output/json/wrapper.go similarity index 72% rename from stats/json/wrapper.go rename to output/json/wrapper.go index d0dc77b8056..9885386de1e 100644 --- a/stats/json/wrapper.go +++ b/output/json/wrapper.go @@ -26,38 +26,40 @@ import ( "github.com/loadimpact/k6/stats" ) +// Envelope is the data format we use to export both metrics and metric samples +// to the JSON file. type Envelope struct { Type string `json:"type"` Data interface{} `json:"data"` Metric string `json:"metric,omitempty"` } -type JSONSample struct { +// Sample is the data format for metric sample data in the JSON file. +type Sample struct { Time time.Time `json:"time"` Value float64 `json:"value"` Tags *stats.SampleTags `json:"tags"` } -func NewJSONSample(sample *stats.Sample) *JSONSample { - return &JSONSample{ +func newJSONSample(sample stats.Sample) Sample { + return Sample{ Time: sample.Time, Value: sample.Value, Tags: sample.Tags, } } -func WrapSample(sample *stats.Sample) *Envelope { - if sample == nil { - return nil - } - return &Envelope{ +// WrapSample is used to package a metric sample in a way that's nice to export +// to JSON. +func WrapSample(sample stats.Sample) Envelope { + return Envelope{ Type: "Point", Metric: sample.Metric.Name, - Data: NewJSONSample(sample), + Data: newJSONSample(sample), } } -func WrapMetric(metric *stats.Metric) *Envelope { +func wrapMetric(metric *stats.Metric) *Envelope { if metric == nil { return nil } diff --git a/output/types.go b/output/types.go new file mode 100644 index 00000000000..efac5014aeb --- /dev/null +++ b/output/types.go @@ -0,0 +1,90 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package output + +import ( + "encoding/json" + "io" + "net/url" + + "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/stats" + "github.com/sirupsen/logrus" + "github.com/spf13/afero" +) + +// Params contains all possible constructor parameters an output may need. +type Params struct { + OutputType string // --out $OutputType=$ConfigArgument, K6_OUT="$OutputType=$ConfigArgument" + ConfigArgument string + JSONConfig json.RawMessage + + Logger logrus.FieldLogger + Environment map[string]string + StdOut io.Writer + StdErr io.Writer + FS afero.Fs + ScriptPath *url.URL + ScriptOptions lib.Options + RuntimeOptions lib.RuntimeOptions + ExecutionPlan []lib.ExecutionStep +} + +// An Output abstracts the process of funneling samples to an external storage +// backend, such as a file or something like an InfluxDB instance. +// +// N.B: All outputs should have non-blocking AddMetricSamples() methods and +// should spawn their own goroutine to flush metrics asynchronously. +type Output interface { + // Returns a human-readable description of the output that will be shown in + // `k6 run`. For extensions it probably should include the version as well. + Description() string + + // Start is called before the Engine tries to use the output and should be + // used for any long initialization tasks, as well as for starting a + // goroutine to asynchronously flush metrics to the output. + Start() error + + // A method to receive the latest metric samples from the Engine. This + // method is never called concurrently, so do not do anything blocking here + // that might take a long time. Preferably, just use the SampleBuffer or + // something like it to buffer metrics until they are flushed. + AddMetricSamples(samples []stats.SampleContainer) + + // Flush all remaining metrics and finalize the test run. + Stop() error +} + +// WithThresholds is an output that requires the Engine to give it the +// thresholds before it can be started. +type WithThresholds interface { + Output + SetThresholds(map[string]stats.Thresholds) +} + +// TODO: add some way for outputs to report mid-test errors and potentially +// abort the whole test run + +// WithRunStatusUpdates means the output can receivetest run status updates. +type WithRunStatusUpdates interface { + Output + SetRunStatus(latestStatus lib.RunStatus) +} diff --git a/stats/cloud/bench_test.go b/stats/cloud/bench_test.go index 5473fa51191..27a674feb46 100644 --- a/stats/cloud/bench_test.go +++ b/stats/cloud/bench_test.go @@ -42,16 +42,10 @@ import ( "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/lib/testutils/httpmultibin" "github.com/loadimpact/k6/lib/types" - "github.com/loadimpact/k6/loader" "github.com/loadimpact/k6/stats" ) func BenchmarkAggregateHTTP(b *testing.B) { - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -61,7 +55,7 @@ func BenchmarkAggregateHTTP(b *testing.B) { AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200), AggregationPeriod: types.NullDurationFrom(time.Millisecond * 200), }) - collector, err := New(testutils.NewLogger(b), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(b, err) now := time.Now() collector.referenceID = "something" @@ -295,11 +289,6 @@ func generateHTTPExtTrail(now time.Time, i time.Duration, tags *stats.SampleTags } func BenchmarkHTTPPush(b *testing.B) { - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -323,7 +312,7 @@ func BenchmarkHTTPPush(b *testing.B) { AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200), AggregationPeriod: types.NullDurationFrom(time.Millisecond * 200), }) - collector, err := New(testutils.NewLogger(b), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(b, err) collector.referenceID = "fake" diff --git a/stats/cloud/collector.go b/stats/cloud/collector.go index 7b41a37e901..a36889a2195 100644 --- a/stats/cloud/collector.go +++ b/stats/cloud/collector.go @@ -44,7 +44,6 @@ import ( "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/netext" "github.com/loadimpact/k6/lib/netext/httpext" - "github.com/loadimpact/k6/loader" "github.com/loadimpact/k6/stats" ) @@ -91,7 +90,7 @@ var _ lib.Collector = &Collector{} // New creates a new cloud collector func New( logger logrus.FieldLogger, - conf cloudapi.Config, src *loader.SourceData, opts lib.Options, executionPlan []lib.ExecutionStep, version string, + conf cloudapi.Config, scriptURL fmt.Stringer, opts lib.Options, executionPlan []lib.ExecutionStep, version string, ) (*Collector, error) { if err := cloudapi.MergeFromExternal(opts.External, &conf); err != nil { return nil, err @@ -102,7 +101,7 @@ func New( } if !conf.Name.Valid || conf.Name.String == "" { - conf.Name = null.StringFrom(filepath.Base(src.URL.String())) + conf.Name = null.StringFrom(filepath.Base(scriptURL.String())) } if conf.Name.String == "-" { conf.Name = null.StringFrom(TestName) diff --git a/stats/cloud/collector_test.go b/stats/cloud/collector_test.go index 44c57eb27ea..343e2138798 100644 --- a/stats/cloud/collector_test.go +++ b/stats/cloud/collector_test.go @@ -50,7 +50,6 @@ import ( "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/lib/testutils/httpmultibin" "github.com/loadimpact/k6/lib/types" - "github.com/loadimpact/k6/loader" "github.com/loadimpact/k6/stats" ) @@ -177,11 +176,6 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) { })) defer tb.Cleanup() - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -190,7 +184,7 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) { Host: null.StringFrom(tb.ServerHTTP.URL), NoCompress: null.BoolFrom(true), }) - collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) assert.True(t, collector.config.Host.Valid) @@ -335,11 +329,6 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) { })) defer tb.Cleanup() - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -348,7 +337,7 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) { Host: null.StringFrom(tb.ServerHTTP.URL), NoCompress: null.BoolFrom(true), }) - collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) now := time.Now() tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"}) @@ -427,11 +416,6 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) { })) defer tb.Cleanup() - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -441,7 +425,7 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) { NoCompress: null.BoolFrom(true), MaxMetricSamplesPerPackage: null.IntFrom(50), }) - collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: ""}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) now := time.Now() tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"}) @@ -548,11 +532,6 @@ func TestCloudCollectorAggregationPeriodZeroNoBlock(t *testing.T) { })) defer tb.Cleanup() - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -561,7 +540,7 @@ func TestCloudCollectorAggregationPeriodZeroNoBlock(t *testing.T) { Host: null.StringFrom(tb.ServerHTTP.URL), NoCompress: null.BoolFrom(true), }) - collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) assert.True(t, collector.config.Host.Valid) @@ -607,11 +586,6 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) { })) defer tb.Cleanup() - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -620,7 +594,7 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) { Host: null.StringFrom(tb.ServerHTTP.URL), NoCompress: null.BoolFrom(true), }) - collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "path/to/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) gotIterations := false @@ -730,10 +704,7 @@ func TestNewName(t *testing.T) { testCase := testCase t.Run(testCase.url.String(), func(t *testing.T) { - script := &loader.SourceData{ - URL: testCase.url, - } - collector, err := New(testutils.NewLogger(t), cloudapi.NewConfig(), script, lib.Options{ + collector, err := New(testutils.NewLogger(t), cloudapi.NewConfig(), testCase.url, lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), }, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) @@ -766,10 +737,6 @@ func TestPublishMetric(t *testing.T) { })) defer server.Close() - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -777,7 +744,7 @@ func TestPublishMetric(t *testing.T) { Host: null.StringFrom(server.URL), NoCompress: null.BoolFrom(true), }) - collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) samples := []*Sample{ diff --git a/stats/csv/config.go b/stats/csv/config.go index 95914d2a749..f75f8b1bd45 100644 --- a/stats/csv/config.go +++ b/stats/csv/config.go @@ -21,12 +21,14 @@ package csv import ( + "encoding/json" "fmt" "strings" "time" "gopkg.in/guregu/null.v3" + "github.com/kelseyhightower/envconfig" "github.com/loadimpact/k6/lib/types" ) @@ -87,3 +89,33 @@ func ParseArg(arg string) (Config, error) { return c, nil } + +// GetConsolidatedConfig combines {default config values + JSON config + +// environment vars + arg config values}, and returns the final result. +func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, arg string) (Config, error) { + result := NewConfig() + if jsonRawConf != nil { + jsonConf := Config{} + if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil { + return result, err + } + result = result.Apply(jsonConf) + } + + envConfig := Config{} + if err := envconfig.Process("", &envConfig); err != nil { + // TODO: get rid of envconfig and actually use the env parameter... + return result, err + } + result = result.Apply(envConfig) + + if arg != "" { + urlConf, err := ParseArg(arg) + if err != nil { + return result, err + } + result = result.Apply(urlConf) + } + + return result, nil +} diff --git a/stats/datadog/collector.go b/stats/datadog/collector.go index 415adff68ee..24586278888 100644 --- a/stats/datadog/collector.go +++ b/stats/datadog/collector.go @@ -21,8 +21,10 @@ package datadog import ( + "encoding/json" "time" + "github.com/kelseyhightower/envconfig" "github.com/sirupsen/logrus" "gopkg.in/guregu/null.v3" @@ -115,3 +117,25 @@ func New(logger logrus.FieldLogger, conf Config) (*common.Collector, error) { Logger: logger, }, nil } + +// GetConsolidatedConfig combines {default config values + JSON config + +// environment vars}, and returns the final result. +func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string) (Config, error) { + result := NewConfig() + if jsonRawConf != nil { + jsonConf := Config{} + if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil { + return result, err + } + result = result.Apply(jsonConf) + } + + envConfig := Config{} + if err := envconfig.Process("", &envConfig); err != nil { + // TODO: get rid of envconfig and actually use the env parameter... + return result, err + } + result = result.Apply(envConfig) + + return result, nil +} diff --git a/stats/dummy/collector.go b/stats/dummy/collector.go deleted file mode 100644 index fa147c699ab..00000000000 --- a/stats/dummy/collector.go +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package dummy - -import ( - "context" - - "github.com/loadimpact/k6/lib" - "github.com/loadimpact/k6/stats" -) - -// Collector implements the lib.Collector interface and should be used only for testing -type Collector struct { - RunStatus lib.RunStatus - - SampleContainers []stats.SampleContainer - Samples []stats.Sample -} - -// Verify that Collector implements lib.Collector -var _ lib.Collector = &Collector{} - -// Init does nothing, it's only included to satisfy the lib.Collector interface -func (c *Collector) Init() error { return nil } - -// MakeConfig does nothing, it's only included to satisfy the lib.Collector interface -func (c *Collector) MakeConfig() interface{} { return nil } - -// Run just blocks until the context is done -func (c *Collector) Run(ctx context.Context) { - <-ctx.Done() -} - -// Collect just appends all of the samples passed to it to the internal sample slice. -// According to the the lib.Collector interface, it should never be called concurrently, -// so there's no locking on purpose - that way Go's race condition detector can actually -// detect incorrect usage. -// Also, theoretically the collector doesn't have to actually Run() before samples start -// being collected, it only has to be initialized. -func (c *Collector) Collect(scs []stats.SampleContainer) { - for _, sc := range scs { - c.SampleContainers = append(c.SampleContainers, sc) - c.Samples = append(c.Samples, sc.GetSamples()...) - } -} - -// Link returns a dummy string, it's only included to satisfy the lib.Collector interface -func (c *Collector) Link() string { - return "http://example.com/" -} - -// GetRequiredSystemTags returns which sample tags are needed by this collector -func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet { - return stats.SystemTagSet(0) // There are no required tags for this collector -} - -// SetRunStatus just saves the passed status for later inspection -func (c *Collector) SetRunStatus(status lib.RunStatus) { - c.RunStatus = status -} diff --git a/stats/dummy/collector_test.go b/stats/dummy/collector_test.go deleted file mode 100644 index 148f1b43988..00000000000 --- a/stats/dummy/collector_test.go +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package dummy - -import ( - "context" - "sync" - "testing" - - "github.com/loadimpact/k6/stats" - "github.com/stretchr/testify/assert" -) - -func TestCollectorRun(t *testing.T) { - var wg sync.WaitGroup - c := &Collector{} - ctx, cancel := context.WithCancel(context.Background()) - wg.Add(1) - go func() { - defer wg.Done() - c.Run(ctx) - }() - - c.SetRunStatus(1) - - cancel() - wg.Wait() -} - -func TestCollectorCollect(t *testing.T) { - c := &Collector{} - c.Collect([]stats.SampleContainer{stats.Sample{}}) - assert.Len(t, c.Samples, 1) -} diff --git a/stats/influxdb/collector_test.go b/stats/influxdb/collector_test.go index 8fc7231bd0b..a9ce7f294f9 100644 --- a/stats/influxdb/collector_test.go +++ b/stats/influxdb/collector_test.go @@ -42,21 +42,21 @@ func TestBadConcurrentWrites(t *testing.T) { logger := testutils.NewLogger(t) t.Run("0", func(t *testing.T) { c.ConcurrentWrites = null.IntFrom(0) - _, err := New(logger, *c) + _, err := New(logger, c) require.Error(t, err) require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a positive number") }) t.Run("-2", func(t *testing.T) { c.ConcurrentWrites = null.IntFrom(-2) - _, err := New(logger, *c) + _, err := New(logger, c) require.Error(t, err) require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a positive number") }) t.Run("2", func(t *testing.T) { c.ConcurrentWrites = null.IntFrom(2) - _, err := New(logger, *c) + _, err := New(logger, c) require.NoError(t, err) }) } @@ -83,7 +83,7 @@ func testCollectorCycle(t testing.TB, handler http.HandlerFunc, body func(testin config := NewConfig() config.Addr = null.StringFrom("http://" + l.Addr().String()) - c, err := New(testutils.NewLogger(t), *config) + c, err := New(testutils.NewLogger(t), config) require.NoError(t, err) require.NoError(t, c.Init()) @@ -149,7 +149,7 @@ func TestExtractTagsToValues(t *testing.T) { "floatField:float", "intField:int", } - collector, err := New(testutils.NewLogger(t), *c) + collector, err := New(testutils.NewLogger(t), c) require.NoError(t, err) tags := map[string]string{ "stringField": "string", diff --git a/stats/influxdb/config.go b/stats/influxdb/config.go index fe4416bb06b..53dea280a53 100644 --- a/stats/influxdb/config.go +++ b/stats/influxdb/config.go @@ -21,11 +21,13 @@ package influxdb import ( + "encoding/json" "net/url" "strconv" "strings" "time" + "github.com/kelseyhightower/envconfig" "github.com/kubernetes/helm/pkg/strvals" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" @@ -52,8 +54,9 @@ type Config struct { TagsAsFields []string `json:"tagsAsFields,omitempty" envconfig:"K6_INFLUXDB_TAGS_AS_FIELDS"` } -func NewConfig() *Config { - c := &Config{ +// NewConfig creates a new InfluxDB output config with some default values. +func NewConfig() Config { + c := Config{ Addr: null.NewString("http://localhost:8086", false), DB: null.NewString("k6", false), TagsAsFields: []string{"vu", "iter", "url"}, @@ -135,6 +138,14 @@ func ParseMap(m map[string]interface{}) (Config, error) { return c, err } +// ParseJSON parses the supplied JSON into a Config. +func ParseJSON(data json.RawMessage) (Config, error) { + conf := Config{} + err := json.Unmarshal(data, &conf) + return conf, err +} + +// ParseURL parses the supplied URL into a Config. func ParseURL(text string) (Config, error) { c := Config{} u, err := url.Parse(text) @@ -198,3 +209,33 @@ func ParseURL(text string) (Config, error) { } return c, err } + +// GetConsolidatedConfig combines {default config values + JSON config + +// environment vars + URL config values}, and returns the final result. +func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, url string) (Config, error) { + result := NewConfig() + if jsonRawConf != nil { + jsonConf, err := ParseJSON(jsonRawConf) + if err != nil { + return result, err + } + result = result.Apply(jsonConf) + } + + envConfig := Config{} + if err := envconfig.Process("", &envConfig); err != nil { + // TODO: get rid of envconfig and actually use the env parameter... + return result, err + } + result = result.Apply(envConfig) + + if url != "" { + urlConf, err := ParseURL(url) + if err != nil { + return result, err + } + result = result.Apply(urlConf) + } + + return result, nil +} diff --git a/stats/influxdb/util_test.go b/stats/influxdb/util_test.go index 2a98dcb2ef0..6faf48b4a06 100644 --- a/stats/influxdb/util_test.go +++ b/stats/influxdb/util_test.go @@ -53,22 +53,22 @@ func TestFieldKinds(t *testing.T) { // Error case 1 (duplicated bool fields) conf.TagsAsFields = []string{"vu", "iter", "url", "boolField:bool", "boolField:bool"} - _, err = MakeFieldKinds(*conf) + _, err = MakeFieldKinds(conf) require.Error(t, err) // Error case 2 (duplicated fields in bool and float ields) conf.TagsAsFields = []string{"vu", "iter", "url", "boolField:bool", "boolField:float"} - _, err = MakeFieldKinds(*conf) + _, err = MakeFieldKinds(conf) require.Error(t, err) // Error case 3 (duplicated fields in BoolFields and IntFields) conf.TagsAsFields = []string{"vu", "iter", "url", "boolField:bool", "floatField:float", "boolField:int"} - _, err = MakeFieldKinds(*conf) + _, err = MakeFieldKinds(conf) require.Error(t, err) // Normal case conf.TagsAsFields = []string{"vu", "iter", "url", "boolField:bool", "floatField:float", "intField:int"} - fieldKinds, err = MakeFieldKinds(*conf) + fieldKinds, err = MakeFieldKinds(conf) require.NoError(t, err) require.Equal(t, fieldKinds["boolField"], Bool) diff --git a/stats/json/collector.go b/stats/json/collector.go deleted file mode 100644 index f8e006ac536..00000000000 --- a/stats/json/collector.go +++ /dev/null @@ -1,178 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package json - -import ( - "compress/gzip" - "context" - "encoding/json" - "os" - "strings" - "sync" - "time" - - "github.com/sirupsen/logrus" - "github.com/spf13/afero" - - "github.com/loadimpact/k6/lib" - "github.com/loadimpact/k6/stats" -) - -type Collector struct { - closeFn func() error - fname string - seenMetrics []string - logger logrus.FieldLogger - - encoder *json.Encoder - - buffer []stats.Sample - bufferLock sync.Mutex -} - -// Verify that Collector implements lib.Collector -var _ lib.Collector = &Collector{} - -func (c *Collector) HasSeenMetric(str string) bool { - for _, n := range c.seenMetrics { - if n == str { - return true - } - } - return false -} - -// New return new JSON collector -func New(logger logrus.FieldLogger, fs afero.Fs, fname string) (*Collector, error) { - c := &Collector{ - fname: fname, - logger: logger, - } - if fname == "" || fname == "-" { - c.encoder = json.NewEncoder(os.Stdout) - c.closeFn = func() error { - return nil - } - return c, nil - } - logfile, err := fs.Create(c.fname) - if err != nil { - return nil, err - } - - if strings.HasSuffix(c.fname, ".gz") { - outfile := gzip.NewWriter(logfile) - - c.closeFn = func() error { - _ = outfile.Close() - return logfile.Close() - } - c.encoder = json.NewEncoder(outfile) - } else { - c.closeFn = logfile.Close - c.encoder = json.NewEncoder(logfile) - } - - return c, nil -} - -func (c *Collector) Init() error { - return nil -} - -func (c *Collector) SetRunStatus(status lib.RunStatus) {} - -func (c *Collector) Run(ctx context.Context) { - const timeout = 200 - c.logger.Debug("JSON output: Running!") - ticker := time.NewTicker(time.Millisecond * timeout) - defer func() { - _ = c.closeFn() - }() - for { - select { - case <-ticker.C: - c.commit() - case <-ctx.Done(): - c.commit() - return - } - } -} - -func (c *Collector) HandleMetric(m *stats.Metric) { - if c.HasSeenMetric(m.Name) { - return - } - - c.seenMetrics = append(c.seenMetrics, m.Name) - err := c.encoder.Encode(WrapMetric(m)) - if err != nil { - c.logger.WithField("filename", c.fname).WithError(err).Warning( - "JSON: Envelope is nil or Metric couldn't be marshalled to JSON") - return - } -} - -func (c *Collector) Collect(scs []stats.SampleContainer) { - c.bufferLock.Lock() - defer c.bufferLock.Unlock() - for _, sc := range scs { - c.buffer = append(c.buffer, sc.GetSamples()...) - } -} - -func (c *Collector) commit() { - c.bufferLock.Lock() - samples := c.buffer - c.buffer = nil - c.bufferLock.Unlock() - start := time.Now() - var count int - for _, sc := range samples { - samples := sc.GetSamples() - count += len(samples) - for _, sample := range sc.GetSamples() { - sample := sample - c.HandleMetric(sample.Metric) - err := c.encoder.Encode(WrapSample(&sample)) - if err != nil { - // Skip metric if it can't be made into JSON or envelope is null. - c.logger.WithField("filename", c.fname).WithError(err).Warning( - "JSON: Sample couldn't be marshalled to JSON") - continue - } - } - } - if count > 0 { - c.logger.WithField("filename", c.fname).WithField("t", time.Since(start)). - WithField("count", count).Debug("JSON: Wrote JSON metrics") - } -} - -func (c *Collector) Link() string { - return "" -} - -// GetRequiredSystemTags returns which sample tags are needed by this collector -func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet { - return stats.SystemTagSet(0) // There are no required tags for this collector -} diff --git a/stats/json/collector_test.go b/stats/json/collector_test.go deleted file mode 100644 index 2a256fcef2a..00000000000 --- a/stats/json/collector_test.go +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package json - -import ( - "os" - "testing" - - "github.com/loadimpact/k6/lib/testutils" - "github.com/spf13/afero" - "github.com/stretchr/testify/assert" -) - -func TestNew(t *testing.T) { - testdata := map[string]bool{ - "/nonexistent/badplacetolog.log": false, - "./okplacetolog.log": true, - "okplacetolog.log": true, - } - - for path, succ := range testdata { - path, succ := path, succ - t.Run("path="+path, func(t *testing.T) { - defer func() { _ = os.Remove(path) }() - - collector, err := New(testutils.NewLogger(t), afero.NewOsFs(), path) - if succ { - assert.NoError(t, err) - assert.NotNil(t, collector) - } else { - assert.Error(t, err) - assert.Nil(t, collector) - } - }) - } -} diff --git a/stats/json/wrapper_test.go b/stats/json/wrapper_test.go deleted file mode 100644 index c50e8dadc7a..00000000000 --- a/stats/json/wrapper_test.go +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package json - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/loadimpact/k6/stats" -) - -func TestWrapersWithNilArg(t *testing.T) { - out := WrapSample(nil) - assert.Equal(t, out, (*Envelope)(nil)) - out = WrapMetric(nil) - assert.Equal(t, out, (*Envelope)(nil)) -} - -func TestWrapSampleWithSamplePointer(t *testing.T) { - out := WrapSample(&stats.Sample{ - Metric: &stats.Metric{}, - }) - assert.NotEqual(t, out, (*Envelope)(nil)) -} - -func TestWrapMetricWithMetricPointer(t *testing.T) { - out := WrapMetric(&stats.Metric{}) - assert.NotEqual(t, out, (*Envelope)(nil)) -} diff --git a/stats/kafka/collector.go b/stats/kafka/collector.go index c3da5baa5de..db402cf553a 100644 --- a/stats/kafka/collector.go +++ b/stats/kafka/collector.go @@ -30,9 +30,9 @@ import ( "github.com/sirupsen/logrus" "github.com/loadimpact/k6/lib" + jsono "github.com/loadimpact/k6/output/json" "github.com/loadimpact/k6/stats" "github.com/loadimpact/k6/stats/influxdb" - jsonc "github.com/loadimpact/k6/stats/json" ) // Collector implements the lib.Collector interface and should be used only for testing @@ -125,7 +125,7 @@ func (c *Collector) formatSamples(samples stats.Samples) ([]string, error) { } default: for _, sample := range samples { - env := jsonc.WrapSample(&sample) + env := jsono.WrapSample(sample) metric, err := json.Marshal(env) if err != nil { return nil, err diff --git a/stats/kafka/config.go b/stats/kafka/config.go index 9d391155e5e..7b11f36a627 100644 --- a/stats/kafka/config.go +++ b/stats/kafka/config.go @@ -21,8 +21,10 @@ package kafka import ( + "encoding/json" "time" + "github.com/kelseyhightower/envconfig" "github.com/kubernetes/helm/pkg/strvals" "github.com/mitchellh/mapstructure" "gopkg.in/guregu/null.v3" @@ -120,3 +122,33 @@ func ParseArg(arg string) (Config, error) { return c, nil } + +// GetConsolidatedConfig combines {default config values + JSON config + +// environment vars + arg config values}, and returns the final result. +func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, arg string) (Config, error) { + result := NewConfig() + if jsonRawConf != nil { + jsonConf := Config{} + if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil { + return result, err + } + result = result.Apply(jsonConf) + } + + envConfig := Config{} + if err := envconfig.Process("", &envConfig); err != nil { + // TODO: get rid of envconfig and actually use the env parameter... + return result, err + } + result = result.Apply(envConfig) + + if arg != "" { + urlConf, err := ParseArg(arg) + if err != nil { + return result, err + } + result = result.Apply(urlConf) + } + + return result, nil +} diff --git a/stats/statsd/collector.go b/stats/statsd/collector.go index b8d5f2a62b7..c49f8d2d9ea 100644 --- a/stats/statsd/collector.go +++ b/stats/statsd/collector.go @@ -21,8 +21,10 @@ package statsd import ( + "encoding/json" "time" + "github.com/kelseyhightower/envconfig" "github.com/sirupsen/logrus" "gopkg.in/guregu/null.v3" @@ -96,3 +98,25 @@ func New(logger logrus.FieldLogger, conf common.Config) (*common.Collector, erro Logger: logger, }, nil } + +// GetConsolidatedConfig combines {default config values + JSON config + +// environment vars}, and returns the final result. +func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string) (Config, error) { + result := NewConfig() + if jsonRawConf != nil { + jsonConf := Config{} + if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil { + return result, err + } + result = result.Apply(jsonConf) + } + + envConfig := Config{} + if err := envconfig.Process("", &envConfig); err != nil { + // TODO: get rid of envconfig and actually use the env parameter... + return result, err + } + result = result.Apply(envConfig) + + return result, nil +}