diff --git a/api/server_test.go b/api/server_test.go index cafec68d9e8..d269a2b957d 100644 --- a/api/server_test.go +++ b/api/server_test.go @@ -82,7 +82,7 @@ func TestWithEngine(t *testing.T) { logger.SetOutput(testutils.NewTestOutput(t)) execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) rw := httptest.NewRecorder() diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go index e2a30db52cf..e74539e0585 100644 --- a/api/v1/group_routes_test.go +++ b/api/v1/group_routes_test.go @@ -51,7 +51,7 @@ func TestGetGroups(t *testing.T) { execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Group: g0}, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) t.Run("list", func(t *testing.T) { diff --git a/api/v1/metric_routes_test.go b/api/v1/metric_routes_test.go index 423a5eb5c5e..b5a21d49d4f 100644 --- a/api/v1/metric_routes_test.go +++ b/api/v1/metric_routes_test.go @@ -45,7 +45,7 @@ func TestGetMetrics(t *testing.T) { logger.SetOutput(testutils.NewTestOutput(t)) execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) engine.Metrics = map[string]*stats.Metric{ @@ -88,7 +88,7 @@ func TestGetMetric(t *testing.T) { logger.SetOutput(testutils.NewTestOutput(t)) execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) engine.Metrics = map[string]*stats.Metric{ diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index bb9c5ad083e..a4b0214d814 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -155,7 +155,7 @@ func TestSetupData(t *testing.T) { }) execScheduler, err := local.NewExecutionScheduler(runner, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) globalCtx, globalCancel := context.WithCancel(context.Background()) diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index 1378f97c5a9..e102d3397b5 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -47,7 +47,7 @@ func TestGetStatus(t *testing.T) { logger.SetOutput(testutils.NewTestOutput(t)) execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) rw := httptest.NewRecorder() @@ -101,7 +101,7 @@ func TestPatchStatus(t *testing.T) { t.Run(name, func(t *testing.T) { execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Options: options}, logger) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, logger) + engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, nil, logger) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() diff --git a/cloudapi/config.go b/cloudapi/config.go index be5bdd0fd79..e113ff59645 100644 --- a/cloudapi/config.go +++ b/cloudapi/config.go @@ -26,6 +26,7 @@ import ( "gopkg.in/guregu/null.v3" + "github.com/kelseyhightower/envconfig" "github.com/loadimpact/k6/lib/types" ) @@ -241,7 +242,8 @@ func (c Config) Apply(cfg Config) Config { return c } -// MergeFromExternal merges three fields from json in a loadimpact key of the provided external map +// MergeFromExternal merges three fields from the JSON in a loadimpact key of +// the provided external map. Used for options.ext.loadimpact settings. func MergeFromExternal(external map[string]json.RawMessage, conf *Config) error { if val, ok := external["loadimpact"]; ok { // TODO: Important! Separate configs and fix the whole 2 configs mess! @@ -262,3 +264,29 @@ func MergeFromExternal(external map[string]json.RawMessage, conf *Config) error } return nil } + +// GetConsolidatedConfig combines the default config values with the JSON config +// values and environment variables and returns the final result. +func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, configArg string) (Config, error) { + result := NewConfig() + if jsonRawConf != nil { + jsonConf := Config{} + if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil { + return result, err + } + result = result.Apply(jsonConf) + } + + envConfig := Config{} + if err := envconfig.Process("", &envConfig); err != nil { + // TODO: get rid of envconfig and actually use the env parameter... + return result, err + } + result = result.Apply(envConfig) + + if configArg != "" { + result.Name = null.StringFrom(configArg) + } + + return result, nil +} diff --git a/cmd/cloud.go b/cmd/cloud.go index 939b583c839..680168a746e 100644 --- a/cmd/cloud.go +++ b/cmd/cloud.go @@ -33,7 +33,6 @@ import ( "syscall" "time" - "github.com/kelseyhightower/envconfig" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/afero" @@ -105,7 +104,8 @@ This will execute the test on the k6 cloud service. Use "k6 login cloud" to auth return err } - runtimeOptions, err := getRuntimeOptions(cmd.Flags(), buildEnvMap(os.Environ())) + osEnvironment := buildEnvMap(os.Environ()) + runtimeOptions, err := getRuntimeOptions(cmd.Flags(), osEnvironment) if err != nil { return err } @@ -141,8 +141,8 @@ This will execute the test on the k6 cloud service. Use "k6 login cloud" to auth } // Cloud config - cloudConfig := cloudapi.NewConfig().Apply(derivedConf.Collectors.Cloud) - if err = envconfig.Process("", &cloudConfig); err != nil { + cloudConfig, err := cloudapi.GetConsolidatedConfig(derivedConf.Collectors["cloud"], osEnvironment, "") + if err != nil { return err } if !cloudConfig.Token.Valid { @@ -153,8 +153,8 @@ This will execute the test on the k6 cloud service. Use "k6 login cloud" to auth arc := r.MakeArchive() // TODO: Fix this // We reuse cloud.Config for parsing options.ext.loadimpact, but this probably shouldn't be - // done as the idea of options.ext is that they are extensible without touching k6. But in - // order for this to happen we shouldn't actually marshall cloud.Config on top of it because + // done, as the idea of options.ext is that they are extensible without touching k6. But in + // order for this to happen, we shouldn't actually marshall cloud.Config on top of it, because // it will be missing some fields that aren't actually mentioned in the struct. // So in order for use to copy the fields that we need for loadimpact's api we unmarshal in // map[string]interface{} and copy what we need if it isn't set already diff --git a/cmd/collectors.go b/cmd/collectors.go deleted file mode 100644 index 158048e2794..00000000000 --- a/cmd/collectors.go +++ /dev/null @@ -1,175 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package cmd - -import ( - "fmt" - "strings" - - "gopkg.in/guregu/null.v3" - - "github.com/kelseyhightower/envconfig" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "github.com/spf13/afero" - - "github.com/loadimpact/k6/cloudapi" - "github.com/loadimpact/k6/lib" - "github.com/loadimpact/k6/lib/consts" - "github.com/loadimpact/k6/loader" - "github.com/loadimpact/k6/stats" - "github.com/loadimpact/k6/stats/cloud" - "github.com/loadimpact/k6/stats/csv" - "github.com/loadimpact/k6/stats/datadog" - "github.com/loadimpact/k6/stats/influxdb" - jsonc "github.com/loadimpact/k6/stats/json" - "github.com/loadimpact/k6/stats/kafka" - "github.com/loadimpact/k6/stats/statsd" -) - -const ( - collectorInfluxDB = "influxdb" - collectorJSON = "json" - collectorKafka = "kafka" - collectorCloud = "cloud" - collectorStatsD = "statsd" - collectorDatadog = "datadog" - collectorCSV = "csv" -) - -func parseCollector(s string) (t, arg string) { - parts := strings.SplitN(s, "=", 2) - switch len(parts) { - case 0: - return "", "" - case 1: - return parts[0], "" - default: - return parts[0], parts[1] - } -} - -// TODO: totally refactor this... -func getCollector( - logger logrus.FieldLogger, - collectorName, arg string, src *loader.SourceData, conf Config, executionPlan []lib.ExecutionStep, -) (lib.Collector, error) { - switch collectorName { - case collectorJSON: - return jsonc.New(logger, afero.NewOsFs(), arg) - case collectorInfluxDB: - config := influxdb.NewConfig().Apply(conf.Collectors.InfluxDB) - if err := envconfig.Process("", &config); err != nil { - return nil, err - } - urlConfig, err := influxdb.ParseURL(arg) - if err != nil { - return nil, err - } - config = config.Apply(urlConfig) - - return influxdb.New(logger, config) - case collectorCloud: - config := cloudapi.NewConfig().Apply(conf.Collectors.Cloud) - if err := envconfig.Process("", &config); err != nil { - return nil, err - } - if arg != "" { - config.Name = null.StringFrom(arg) - } - - return cloud.New(logger, config, src, conf.Options, executionPlan, consts.Version) - case collectorKafka: - config := kafka.NewConfig().Apply(conf.Collectors.Kafka) - if err := envconfig.Process("", &config); err != nil { - return nil, err - } - if arg != "" { - cmdConfig, err := kafka.ParseArg(arg) - if err != nil { - return nil, err - } - config = config.Apply(cmdConfig) - } - - return kafka.New(logger, config) - case collectorStatsD: - config := statsd.NewConfig().Apply(conf.Collectors.StatsD) - if err := envconfig.Process("k6_statsd", &config); err != nil { - return nil, err - } - - return statsd.New(logger, config) - case collectorDatadog: - config := datadog.NewConfig().Apply(conf.Collectors.Datadog) - if err := envconfig.Process("k6_datadog", &config); err != nil { - return nil, err - } - - return datadog.New(logger, config) - case collectorCSV: - config := csv.NewConfig().Apply(conf.Collectors.CSV) - if err := envconfig.Process("", &config); err != nil { - return nil, err - } - if arg != "" { - cmdConfig, err := csv.ParseArg(arg) - if err != nil { - return nil, err - } - - config = config.Apply(cmdConfig) - } - - return csv.New(logger, afero.NewOsFs(), conf.SystemTags.Map(), config) - - default: - return nil, errors.Errorf("unknown output type: %s", collectorName) - } -} - -func newCollector( - logger logrus.FieldLogger, - collectorName, arg string, src *loader.SourceData, conf Config, executionPlan []lib.ExecutionStep, -) (lib.Collector, error) { - collector, err := getCollector(logger, collectorName, arg, src, conf, executionPlan) - if err != nil { - return collector, err - } - - // Check if all required tags are present - missingRequiredTags := []string{} - requiredTags := collector.GetRequiredSystemTags() - for _, tag := range stats.SystemTagSetValues() { - if requiredTags.Has(tag) && !conf.SystemTags.Has(tag) { - missingRequiredTags = append(missingRequiredTags, tag.String()) - } - } - if len(missingRequiredTags) > 0 { - return collector, fmt.Errorf( - "the specified collector '%s' needs the following system tags enabled: %s", - collectorName, - strings.Join(missingRequiredTags, ", "), - ) - } - - return collector, nil -} diff --git a/cmd/config.go b/cmd/config.go index 23035befcd0..8ad10b75faf 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -33,16 +33,10 @@ import ( "github.com/spf13/pflag" "gopkg.in/guregu/null.v3" - "github.com/loadimpact/k6/cloudapi" "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/lib/executor" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" - "github.com/loadimpact/k6/stats/csv" - "github.com/loadimpact/k6/stats/datadog" - "github.com/loadimpact/k6/stats/influxdb" - "github.com/loadimpact/k6/stats/kafka" - "github.com/loadimpact/k6/stats/statsd" ) // configFlagSet returns a FlagSet with the default run configuration flags. @@ -62,14 +56,8 @@ type Config struct { Linger null.Bool `json:"linger" envconfig:"K6_LINGER"` NoUsageReport null.Bool `json:"noUsageReport" envconfig:"K6_NO_USAGE_REPORT"` - Collectors struct { - InfluxDB influxdb.Config `json:"influxdb"` - Kafka kafka.Config `json:"kafka"` - Cloud cloudapi.Config `json:"cloud"` - StatsD statsd.Config `json:"statsd"` - Datadog datadog.Config `json:"datadog"` - CSV csv.Config `json:"csv"` - } `json:"collectors"` + // TODO: deprecate + Collectors map[string]json.RawMessage `json:"collectors"` } // Validate checks if all of the specified options make sense @@ -92,12 +80,6 @@ func (c Config) Apply(cfg Config) Config { if cfg.NoUsageReport.Valid { c.NoUsageReport = cfg.NoUsageReport } - c.Collectors.InfluxDB = c.Collectors.InfluxDB.Apply(cfg.Collectors.InfluxDB) - c.Collectors.Cloud = c.Collectors.Cloud.Apply(cfg.Collectors.Cloud) - c.Collectors.Kafka = c.Collectors.Kafka.Apply(cfg.Collectors.Kafka) - c.Collectors.StatsD = c.Collectors.StatsD.Apply(cfg.Collectors.StatsD) - c.Collectors.Datadog = c.Collectors.Datadog.Apply(cfg.Collectors.Datadog) - c.Collectors.CSV = c.Collectors.CSV.Apply(cfg.Collectors.CSV) return c } @@ -167,19 +149,11 @@ func writeDiskConfig(fs afero.Fs, configPath string, conf Config) error { } // Reads configuration variables from the environment. -func readEnvConfig() (conf Config, err error) { - // TODO: replace envconfig and refactor the whole configuration from the groun up :/ - for _, err := range []error{ - envconfig.Process("", &conf), - envconfig.Process("", &conf.Collectors.Cloud), - envconfig.Process("", &conf.Collectors.InfluxDB), - envconfig.Process("", &conf.Collectors.Kafka), - envconfig.Process("k6_statsd", &conf.Collectors.StatsD), - envconfig.Process("k6_datadog", &conf.Collectors.Datadog), - } { - return conf, err - } - return conf, nil +func readEnvConfig() (Config, error) { + // TODO: replace envconfig and refactor the whole configuration from the ground up :/ + conf := Config{} + err := envconfig.Process("", &conf) + return conf, err } // Assemble the final consolidated configuration from all of the different sources: @@ -192,12 +166,6 @@ func readEnvConfig() (conf Config, err error) { // TODO: add better validation, more explicit default values and improve consistency between formats // TODO: accumulate all errors and differentiate between the layers? func getConsolidatedConfig(fs afero.Fs, cliConf Config, runner lib.Runner) (conf Config, err error) { - cliConf.Collectors.InfluxDB = influxdb.NewConfig().Apply(cliConf.Collectors.InfluxDB) - cliConf.Collectors.Cloud = cloudapi.NewConfig().Apply(cliConf.Collectors.Cloud) - cliConf.Collectors.Kafka = kafka.NewConfig().Apply(cliConf.Collectors.Kafka) - cliConf.Collectors.StatsD = statsd.NewConfig().Apply(cliConf.Collectors.StatsD) - cliConf.Collectors.Datadog = datadog.NewConfig().Apply(cliConf.Collectors.Datadog) - fileConf, _, err := readDiskConfig(fs) if err != nil { return conf, err diff --git a/cmd/convert.go b/cmd/convert.go index 39350ccbc65..ececf3ceb07 100644 --- a/cmd/convert.go +++ b/cmd/convert.go @@ -33,8 +33,10 @@ import ( "github.com/loadimpact/k6/lib" ) +// TODO: fix this... or remove k6 convert +//nolint: gochecknoglobals var ( - output string + convertOutput string optionsFilePath string minSleep uint maxSleep uint @@ -107,12 +109,12 @@ func getConvertCmd() *cobra.Command { } // Write script content to stdout or file - if output == "" || output == "-" { //nolint:nestif + if convertOutput == "" || convertOutput == "-" { //nolint:nestif if _, err := io.WriteString(defaultWriter, script); err != nil { return err } } else { - f, err := defaultFs.Create(output) + f, err := defaultFs.Create(convertOutput) if err != nil { return err } @@ -131,8 +133,14 @@ func getConvertCmd() *cobra.Command { } convertCmd.Flags().SortFlags = false - convertCmd.Flags().StringVarP(&output, "output", "O", output, "k6 script output filename (stdout by default)") - convertCmd.Flags().StringVarP(&optionsFilePath, "options", "", output, "path to a JSON file with options that would be injected in the output script") + convertCmd.Flags().StringVarP( + &convertOutput, "output", "O", convertOutput, + "k6 script output filename (stdout by default)", + ) + convertCmd.Flags().StringVarP( + &optionsFilePath, "options", "", optionsFilePath, + "path to a JSON file with options that would be injected in the output script", + ) convertCmd.Flags().StringSliceVarP(&only, "only", "", []string{}, "include only requests from the given domains") convertCmd.Flags().StringSliceVarP(&skip, "skip", "", []string{}, "skip requests from the given domains") convertCmd.Flags().UintVarP(&threshold, "batch-threshold", "", 500, "batch request idle time threshold (see example)") diff --git a/cmd/login_cloud.go b/cmd/login_cloud.go index 1e7a0aca7aa..a22e81e40d7 100644 --- a/cmd/login_cloud.go +++ b/cmd/login_cloud.go @@ -21,6 +21,7 @@ package cmd import ( + "encoding/json" "os" "syscall" @@ -58,22 +59,33 @@ This will set the default token used when just "k6 run -o cloud" is passed.`, RunE: func(cmd *cobra.Command, args []string) error { fs := afero.NewOsFs() - k6Conf, err := getConsolidatedConfig(fs, Config{}, nil) + currentDiskConf, configPath, err := readDiskConfig(fs) if err != nil { return err } - currentDiskConf, configPath, err := readDiskConfig(fs) + currentJSONConfig := cloudapi.Config{} + currentJSONConfigRaw := currentDiskConf.Collectors["cloud"] + if currentJSONConfigRaw != nil { + // We only want to modify this config, see comment below + if jsonerr := json.Unmarshal(currentJSONConfigRaw, ¤tJSONConfig); jsonerr != nil { + return jsonerr + } + } + + // We want to use this fully consolidated config for things like + // host addresses, so users can overwrite them with env vars. + consolidatedCurrentConfig, err := cloudapi.GetConsolidatedConfig(currentJSONConfigRaw, buildEnvMap(os.Environ()), "") if err != nil { return err } + // But we don't want to save them back to the JSON file, we only + // want to save what already existed there and the login details. + newCloudConf := currentJSONConfig show := getNullBool(cmd.Flags(), "show") reset := getNullBool(cmd.Flags(), "reset") token := getNullString(cmd.Flags(), "token") - - newCloudConf := cloudapi.NewConfig().Apply(currentDiskConf.Collectors.Cloud) - switch { case reset.Valid: newCloudConf.Token = null.StringFromPtr(nil) @@ -104,7 +116,7 @@ This will set the default token used when just "k6 run -o cloud" is passed.`, email := vals["Email"].(string) password := vals["Password"].(string) - client := cloudapi.NewClient(logger, "", k6Conf.Collectors.Cloud.Host.String, consts.Version) + client := cloudapi.NewClient(logger, "", consolidatedCurrentConfig.Host.String, consts.Version) res, err := client.Login(email, password) if err != nil { return err @@ -117,7 +129,10 @@ This will set the default token used when just "k6 run -o cloud" is passed.`, newCloudConf.Token = null.StringFrom(res.Token) } - currentDiskConf.Collectors.Cloud = newCloudConf + currentDiskConf.Collectors["cloud"], err = json.Marshal(newCloudConf) + if err != nil { + return err + } if err := writeDiskConfig(fs, configPath, currentDiskConf); err != nil { return err } diff --git a/cmd/login_influxdb.go b/cmd/login_influxdb.go index 9e85bcf8ffd..264d5083b33 100644 --- a/cmd/login_influxdb.go +++ b/cmd/login_influxdb.go @@ -21,6 +21,7 @@ package cmd import ( + "encoding/json" "os" "syscall" "time" @@ -52,7 +53,15 @@ This will set the default server used when just "-o influxdb" is passed.`, return err } - conf := influxdb.NewConfig().Apply(config.Collectors.InfluxDB) + conf := influxdb.NewConfig() + jsonConf := config.Collectors["influxdb"] + if jsonConf != nil { + jsonConfParsed, jsonerr := influxdb.ParseJSON(jsonConf) + if jsonerr != nil { + return jsonerr + } + conf = conf.Apply(jsonConfParsed) + } if len(args) > 0 { urlConf, err := influxdb.ParseURL(args[0]) //nolint:govet if err != nil { @@ -112,7 +121,10 @@ This will set the default server used when just "-o influxdb" is passed.`, return err } - config.Collectors.InfluxDB = conf + config.Collectors["influxdb"], err = json.Marshal(conf) + if err != nil { + return err + } return writeDiskConfig(fs, configPath, config) }, } diff --git a/cmd/outputs.go b/cmd/outputs.go new file mode 100644 index 00000000000..02fb2ed9de6 --- /dev/null +++ b/cmd/outputs.go @@ -0,0 +1,273 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package cmd + +import ( + "context" + "fmt" + "sort" + "strings" + + "github.com/sirupsen/logrus" + "github.com/spf13/afero" + + "github.com/loadimpact/k6/cloudapi" + "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/consts" + "github.com/loadimpact/k6/loader" + "github.com/loadimpact/k6/output" + "github.com/loadimpact/k6/output/json" + "github.com/loadimpact/k6/stats" + "github.com/loadimpact/k6/stats/cloud" + "github.com/loadimpact/k6/stats/csv" + "github.com/loadimpact/k6/stats/datadog" + "github.com/loadimpact/k6/stats/influxdb" + "github.com/loadimpact/k6/stats/kafka" + "github.com/loadimpact/k6/stats/statsd" +) + +// TODO: move this to an output sub-module after we get rid of the old collectors? +//nolint: funlen +func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, error), error) { + // Start with the built-in outputs + result := map[string]func(output.Params) (output.Output, error){ + "json": json.New, + + // TODO: remove all of these + "influxdb": func(params output.Params) (output.Output, error) { + conf, err := influxdb.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) + if err != nil { + return nil, err + } + influxc, err := influxdb.New(params.Logger, conf) + if err != nil { + return nil, err + } + return newCollectorAdapter(params, influxc) + }, + "cloud": func(params output.Params) (output.Output, error) { + conf, err := cloudapi.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) + if err != nil { + return nil, err + } + cloudc, err := cloud.New( + params.Logger, conf, params.ScriptPath, params.ScriptOptions, params.ExecutionPlan, consts.Version, + ) + if err != nil { + return nil, err + } + return newCollectorAdapter(params, cloudc) + }, + "kafka": func(params output.Params) (output.Output, error) { + conf, err := kafka.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) + if err != nil { + return nil, err + } + kafkac, err := kafka.New(params.Logger, conf) + if err != nil { + return nil, err + } + return newCollectorAdapter(params, kafkac) + }, + "statsd": func(params output.Params) (output.Output, error) { + conf, err := statsd.GetConsolidatedConfig(params.JSONConfig, params.Environment) + if err != nil { + return nil, err + } + statsdc, err := statsd.New(params.Logger, conf) + if err != nil { + return nil, err + } + return newCollectorAdapter(params, statsdc) + }, + "datadog": func(params output.Params) (output.Output, error) { + conf, err := datadog.GetConsolidatedConfig(params.JSONConfig, params.Environment) + if err != nil { + return nil, err + } + datadogc, err := datadog.New(params.Logger, conf) + if err != nil { + return nil, err + } + return newCollectorAdapter(params, datadogc) + }, + "csv": func(params output.Params) (output.Output, error) { + conf, err := csv.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) + if err != nil { + return nil, err + } + csvc, err := csv.New(params.Logger, params.FS, params.ScriptOptions.SystemTags.Map(), conf) + if err != nil { + return nil, err + } + return newCollectorAdapter(params, csvc) + }, + } + + exts := output.GetExtensions() + for k, v := range exts { + if _, ok := result[k]; ok { + return nil, fmt.Errorf("invalid output extension %s, built-in output with the same type already exists", k) + } + result[k] = v + } + + return result, nil +} + +func getPossibleIDList(constrs map[string]func(output.Params) (output.Output, error)) string { + res := make([]string, 0, len(constrs)) + for k := range constrs { + res = append(res, k) + } + sort.Strings(res) + return strings.Join(res, ", ") +} + +func createOutputs( + outputFullArguments []string, src *loader.SourceData, conf Config, rtOpts lib.RuntimeOptions, + executionPlan []lib.ExecutionStep, osEnvironment map[string]string, logger logrus.FieldLogger, +) ([]output.Output, error) { + outputConstructors, err := getAllOutputConstructors() + if err != nil { + return nil, err + } + baseParams := output.Params{ + ScriptPath: src.URL, + Logger: logger, + Environment: osEnvironment, + StdOut: stdout, + StdErr: stderr, + FS: afero.NewOsFs(), + ScriptOptions: conf.Options, + RuntimeOptions: rtOpts, + ExecutionPlan: executionPlan, + } + result := make([]output.Output, 0, len(outputFullArguments)) + + for _, outputFullArg := range outputFullArguments { + outputType, outputArg := parseOutputArgument(outputFullArg) + outputConstructor, ok := outputConstructors[outputType] + if !ok { + return nil, fmt.Errorf( + "invalid output type '%s', available types are: %s", + outputType, getPossibleIDList(outputConstructors), + ) + } + + params := baseParams + params.OutputType = outputType + params.ConfigArgument = outputArg + params.JSONConfig = conf.Collectors[outputType] + + output, err := outputConstructor(params) + if err != nil { + return nil, fmt.Errorf("could not create the '%s' output: %w", outputType, err) + } + result = append(result, output) + } + + return result, nil +} + +func parseOutputArgument(s string) (t, arg string) { + parts := strings.SplitN(s, "=", 2) + switch len(parts) { + case 0: + return "", "" + case 1: + return parts[0], "" + default: + return parts[0], parts[1] + } +} + +// TODO: remove this after we transition every collector to the output interface + +func newCollectorAdapter(params output.Params, collector lib.Collector) (output.Output, error) { + // Check if all required tags are present + missingRequiredTags := []string{} + requiredTags := collector.GetRequiredSystemTags() + for _, tag := range stats.SystemTagSetValues() { + if requiredTags.Has(tag) && !params.ScriptOptions.SystemTags.Has(tag) { + missingRequiredTags = append(missingRequiredTags, tag.String()) + } + } + if len(missingRequiredTags) > 0 { + return nil, fmt.Errorf( + "the specified output '%s' needs the following system tags enabled: %s", + params.OutputType, strings.Join(missingRequiredTags, ", "), + ) + } + + return &collectorAdapter{ + outputType: params.OutputType, + collector: collector, + stopCh: make(chan struct{}), + }, nil +} + +// collectorAdapter is a _temporary_ fix until we move all of the old +// "collectors" to the new output interface +type collectorAdapter struct { + collector lib.Collector + outputType string + runCtx context.Context + runCtxCancel func() + stopCh chan struct{} +} + +func (ca *collectorAdapter) Description() string { + link := ca.collector.Link() + if link != "" { + return fmt.Sprintf("%s (%s)", ca.outputType, link) + } + return ca.outputType +} + +func (ca *collectorAdapter) Start() error { + if err := ca.collector.Init(); err != nil { + return err + } + ca.runCtx, ca.runCtxCancel = context.WithCancel(context.Background()) + go func() { + ca.collector.Run(ca.runCtx) + close(ca.stopCh) + }() + return nil +} + +func (ca *collectorAdapter) AddMetricSamples(samples []stats.SampleContainer) { + ca.collector.Collect(samples) +} + +func (ca *collectorAdapter) SetRunStatus(latestStatus lib.RunStatus) { + ca.collector.SetRunStatus(latestStatus) +} + +// Stop implements the new output interface. +func (ca *collectorAdapter) Stop() error { + ca.runCtxCancel() + <-ca.stopCh + return nil +} + +var _ output.WithRunStatusUpdates = &collectorAdapter{} diff --git a/cmd/run.go b/cmd/run.go index 81f05cce4ae..708799e6767 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -116,7 +116,8 @@ a commandline interface for interacting with it.`, return err } - runtimeOptions, err := getRuntimeOptions(cmd.Flags(), buildEnvMap(os.Environ())) + osEnvironment := buildEnvMap(os.Environ()) + runtimeOptions, err := getRuntimeOptions(cmd.Flags(), osEnvironment) if err != nil { return err } @@ -171,8 +172,6 @@ a commandline interface for interacting with it.`, return err } - executionState := execScheduler.GetState() - // This is manually triggered after the Engine's Run() has completed, // and things like a single Ctrl+C don't affect it. We use it to make // sure that the progressbars finish updating with the latest execution @@ -191,26 +190,18 @@ a commandline interface for interacting with it.`, progressBarWG.Done() }() - // Create an engine. - initBar.Modify(pb.WithConstProgress(0, "Init engine")) - engine, err := core.NewEngine(execScheduler, conf.Options, runtimeOptions, logger) + // Create all outputs. + executionPlan := execScheduler.GetExecutionPlan() + outputs, err := createOutputs(conf.Out, src, conf, runtimeOptions, executionPlan, osEnvironment, logger) if err != nil { return err } - executionPlan := execScheduler.GetExecutionPlan() - // Create a collector and assign it to the engine if requested. - initBar.Modify(pb.WithConstProgress(0, "Init metric outputs")) - for _, out := range conf.Out { - t, arg := parseCollector(out) - collector, cerr := newCollector(logger, t, arg, src, conf, executionPlan) - if cerr != nil { - return cerr - } - if cerr = collector.Init(); cerr != nil { - return cerr - } - engine.Collectors = append(engine.Collectors, collector) + // Create the engine. + initBar.Modify(pb.WithConstProgress(0, "Init engine")) + engine, err := core.NewEngine(execScheduler, conf.Options, runtimeOptions, outputs, logger) + if err != nil { + return err } // Spin up the REST API server, if not disabled. @@ -230,9 +221,17 @@ a commandline interface for interacting with it.`, }() } + // We do this here so we can get any output URLs below. + initBar.Modify(pb.WithConstProgress(0, "Starting outputs")) + err = engine.StartOutputs() + if err != nil { + return err + } + defer engine.StopOutputs() + printExecutionDescription( "local", filename, "", conf, execScheduler.GetState().ExecutionTuple, - executionPlan, engine.Collectors) + executionPlan, outputs) // Trap Interrupts, SIGINTs and SIGTERMs. sigC := make(chan os.Signal, 1) @@ -286,6 +285,7 @@ a commandline interface for interacting with it.`, progressCancel() progressBarWG.Wait() + executionState := execScheduler.GetState() // Warn if no iterations could be completed. if executionState.GetFullIterationCount() == 0 { logger.Warn("No script iterations finished, consider making the test duration longer") diff --git a/cmd/ui.go b/cmd/ui.go index 6021145d959..3d00c8eed65 100644 --- a/cmd/ui.go +++ b/cmd/ui.go @@ -36,6 +36,7 @@ import ( "golang.org/x/crypto/ssh/terminal" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/output" "github.com/loadimpact/k6/ui" "github.com/loadimpact/k6/ui/pb" ) @@ -111,31 +112,25 @@ func modifyAndPrintBar(bar *pb.ProgressBar, options ...pb.ProgressBarOption) { // Print execution description for both cloud and local execution. // TODO: Clean this up as part of #1499 or #1427 func printExecutionDescription( - execution, filename, output string, conf Config, et *lib.ExecutionTuple, - execPlan []lib.ExecutionStep, collectors []lib.Collector, + execution, filename, outputOverride string, conf Config, et *lib.ExecutionTuple, + execPlan []lib.ExecutionStep, outputs []output.Output, ) { fprintf(stdout, " execution: %s\n", ui.ValueColor.Sprint(execution)) fprintf(stdout, " script: %s\n", ui.ValueColor.Sprint(filename)) - if execution == "local" { - out := "-" - link := "" - - for idx, collector := range collectors { - if out != "-" { - out = out + "; " + conf.Out[idx] - } else { - out = conf.Out[idx] - } - - if l := collector.Link(); l != "" { - link = link + " (" + l + ")" - } + var outputDescriptions []string + switch { + case outputOverride != "": + outputDescriptions = []string{outputOverride} + case len(outputs) == 0: + outputDescriptions = []string{"-"} + default: + for _, out := range outputs { + outputDescriptions = append(outputDescriptions, out.Description()) } - fprintf(stdout, " output: %s%s\n", ui.ValueColor.Sprint(out), ui.ExtraColor.Sprint(link)) - } else { - fprintf(stdout, " output: %s\n", ui.ValueColor.Sprint(output)) } + + fprintf(stdout, " output: %s\n", ui.ValueColor.Sprint(strings.Join(outputDescriptions, ", "))) fprintf(stdout, "\n") maxDuration, _ := lib.GetEndOffset(execPlan) diff --git a/core/engine.go b/core/engine.go index 9953e018d7b..daf5bed3505 100644 --- a/core/engine.go +++ b/core/engine.go @@ -32,6 +32,7 @@ import ( "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/lib/metrics" + "github.com/loadimpact/k6/output" "github.com/loadimpact/k6/stats" ) @@ -56,7 +57,7 @@ type Engine struct { Options lib.Options runtimeOptions lib.RuntimeOptions - Collectors []lib.Collector + outputs []output.Output logger *logrus.Entry stopOnce sync.Once @@ -77,7 +78,7 @@ type Engine struct { // NewEngine instantiates a new Engine, without doing any heavy initialization. func NewEngine( - ex lib.ExecutionScheduler, opts lib.Options, rtOpts lib.RuntimeOptions, logger *logrus.Logger, + ex lib.ExecutionScheduler, opts lib.Options, rtOpts lib.RuntimeOptions, outputs []output.Output, logger *logrus.Logger, ) (*Engine, error) { if ex == nil { return nil, errors.New("missing ExecutionScheduler instance") @@ -89,6 +90,7 @@ func NewEngine( Options: opts, runtimeOptions: rtOpts, + outputs: outputs, Metrics: make(map[string]*stats.Metric), Samples: make(chan stats.SampleContainer, opts.MetricSamplesBufferSize.Int64), stopChan: make(chan struct{}), @@ -109,10 +111,47 @@ func NewEngine( return e, nil } +// StartOutputs spins up all configured outputs, giving the thresholds to any +// that can accept them. And if some output fails, stop the already started +// ones. This may take some time, since some outputs make initial network +// requests to set up whatever remote services are going to listen to them. +// +// TODO: this doesn't really need to be in the Engine, so take it out? +func (e *Engine) StartOutputs() error { + e.logger.Debugf("Starting %d outputs...", len(e.outputs)) + for i, out := range e.outputs { + if thresholdOut, ok := out.(output.WithThresholds); ok { + thresholdOut.SetThresholds(e.thresholds) + } + + if err := out.Start(); err != nil { + e.stopOutputs(i) + return err + } + } + return nil +} + +// StopOutputs stops all configured outputs. +func (e *Engine) StopOutputs() { + e.stopOutputs(len(e.outputs)) +} + +func (e *Engine) stopOutputs(upToID int) { + e.logger.Debugf("Stopping %d outputs...", upToID) + for i := 0; i < upToID; i++ { + if err := e.outputs[i].Stop(); err != nil { + e.logger.WithError(err).Errorf("Stopping output %d failed", i) + } + } +} + // Init is used to initialize the execution scheduler and all metrics processing // in the engine. The first is a costly operation, since it initializes all of -// the planned VUs and could potentially take a long time. It either returns an -// error immediately, or it returns test run() and wait() functions. +// the planned VUs and could potentially take a long time. +// +// This method either returns an error immediately, or it returns test run() and +// wait() functions. // // Things to note: // - The first lambda, Run(), synchronously executes the actual load test. @@ -130,7 +169,6 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait } // TODO: move all of this in a separate struct? see main TODO above - runSubCtx, runSubCancel := context.WithCancel(runCtx) resultCh := make(chan error) @@ -154,6 +192,7 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait return err } + waitFn := e.startBackgroundProcesses(globalCtx, runCtx, resultCh, runSubCancel, processMetricsAfterRun) return runFn, waitFn, nil } @@ -162,20 +201,11 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait // test run status when it ends. It returns a function that can be used after // the provided context is called, to wait for the complete winding down of all // started goroutines. -func (e *Engine) startBackgroundProcesses( //nolint:funlen +func (e *Engine) startBackgroundProcesses( globalCtx, runCtx context.Context, runResult <-chan error, runSubCancel func(), processMetricsAfterRun chan struct{}, ) (wait func()) { processes := new(sync.WaitGroup) - // Spin up all configured collectors - for _, collector := range e.Collectors { - processes.Add(1) - go func(collector lib.Collector) { - collector.Run(globalCtx) - processes.Done() - }(collector) - } - // Siphon and handle all produced metric samples processes.Add(1) go func() { @@ -300,8 +330,10 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu } func (e *Engine) setRunStatus(status lib.RunStatus) { - for _, c := range e.Collectors { - c.SetRunStatus(status) + for _, out := range e.outputs { + if statUpdOut, ok := out.(output.WithRunStatusUpdates); ok { + statUpdOut.SetRunStatus(status) + } } } @@ -443,7 +475,7 @@ func (e *Engine) processSamples(sampleContainers []stats.SampleContainer) { e.processSamplesForMetrics(sampleContainers) } - for _, collector := range e.Collectors { - collector.Collect(sampleContainers) + for _, out := range e.outputs { + out.AddMetricSamples(sampleContainers) } } diff --git a/core/engine_test.go b/core/engine_test.go index a6765b459cb..157eced789c 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -41,17 +41,18 @@ import ( "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/lib/testutils/httpmultibin" "github.com/loadimpact/k6/lib/testutils/minirunner" + "github.com/loadimpact/k6/lib/testutils/mockoutput" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/loader" + "github.com/loadimpact/k6/output" "github.com/loadimpact/k6/stats" - "github.com/loadimpact/k6/stats/dummy" ) const isWindows = runtime.GOOS == "windows" // Wrapper around NewEngine that applies a logger and manages the options. func newTestEngine( //nolint:golint - t *testing.T, runCtx context.Context, runner lib.Runner, collectors []lib.Collector, opts lib.Options, + t *testing.T, runCtx context.Context, runner lib.Runner, outputs []output.Output, opts lib.Options, ) (engine *Engine, run func() error, wait func()) { if runner == nil { runner = &minirunner.MiniRunner{} @@ -76,11 +77,9 @@ func newTestEngine( //nolint:golint execScheduler, err := local.NewExecutionScheduler(runner, logger) require.NoError(t, err) - engine, err = NewEngine(execScheduler, opts, lib.RuntimeOptions{}, logger) + engine, err = NewEngine(execScheduler, opts, lib.RuntimeOptions{}, outputs, logger) require.NoError(t, err) - engine.Collectors = collectors - run, waitFn, err := engine.Init(globalCtx, runCtx) require.NoError(t, err) @@ -142,10 +141,9 @@ func TestEngineRun(t *testing.T) { return nil }} - c := &dummy.Collector{} - + mockOutput := mockoutput.New() ctx, cancel := context.WithCancel(context.Background()) - _, run, wait := newTestEngine(t, ctx, runner, []lib.Collector{c}, lib.Options{ + _, run, wait := newTestEngine(t, ctx, runner, []output.Output{mockOutput}, lib.Options{ VUs: null.IntFrom(1), Iterations: null.IntFrom(1), }) @@ -158,7 +156,7 @@ func TestEngineRun(t *testing.T) { wait() found := 0 - for _, s := range c.Samples { + for _, s := range mockOutput.Samples { if s.Metric != testMetric { continue } @@ -197,7 +195,7 @@ func TestEngineStopped(t *testing.T) { e.Stop() // test that a second stop doesn't panic } -func TestEngineCollector(t *testing.T) { +func TestEngineOutput(t *testing.T) { testMetric := stats.New("test_metric", stats.Trend) runner := &minirunner.MiniRunner{Fn: func(ctx context.Context, out chan<- stats.SampleContainer) error { @@ -205,8 +203,8 @@ func TestEngineCollector(t *testing.T) { return nil }} - c := &dummy.Collector{} - e, run, wait := newTestEngine(t, nil, runner, []lib.Collector{c}, lib.Options{ + mockOutput := mockoutput.New() + e, run, wait := newTestEngine(t, nil, runner, []output.Output{mockOutput}, lib.Options{ VUs: null.IntFrom(1), Iterations: null.IntFrom(1), }) @@ -215,7 +213,7 @@ func TestEngineCollector(t *testing.T) { wait() cSamples := []stats.Sample{} - for _, sample := range c.Samples { + for _, sample := range mockOutput.Samples { if sample.Metric == testMetric { cSamples = append(cSamples, sample) } @@ -224,9 +222,9 @@ func TestEngineCollector(t *testing.T) { if assert.NotNil(t, metric) { sink := metric.Sink.(*stats.TrendSink) if assert.NotNil(t, sink) { - numCollectorSamples := len(cSamples) + numOutputSamples := len(cSamples) numEngineSamples := len(sink.Values) - assert.Equal(t, numEngineSamples, numCollectorSamples) + assert.Equal(t, numEngineSamples, numOutputSamples) } } } @@ -361,8 +359,8 @@ func TestEngine_processThresholds(t *testing.T) { } } -func getMetricSum(collector *dummy.Collector, name string) (result float64) { - for _, sc := range collector.SampleContainers { +func getMetricSum(mo *mockoutput.MockOutput, name string) (result float64) { + for _, sc := range mo.SampleContainers { for _, s := range sc.GetSamples() { if s.Metric.Name == name { result += s.Value @@ -372,8 +370,8 @@ func getMetricSum(collector *dummy.Collector, name string) (result float64) { return } -func getMetricCount(collector *dummy.Collector, name string) (result uint) { - for _, sc := range collector.SampleContainers { +func getMetricCount(mo *mockoutput.MockOutput, name string) (result uint) { + for _, sc := range mo.SampleContainers { for _, s := range sc.GetSamples() { if s.Metric.Name == name { result++ @@ -448,8 +446,8 @@ func TestSentReceivedMetrics(t *testing.T) { ) require.NoError(t, err) - collector := &dummy.Collector{} - _, run, wait := newTestEngine(t, nil, r, []lib.Collector{collector}, lib.Options{ + mockOutput := mockoutput.New() + _, run, wait := newTestEngine(t, nil, r, []output.Output{mockOutput}, lib.Options{ Iterations: null.IntFrom(tc.Iterations), VUs: null.IntFrom(tc.VUs), Hosts: tb.Dialer.Hosts, @@ -470,7 +468,7 @@ func TestSentReceivedMetrics(t *testing.T) { wait() checkData := func(name string, expected int64) float64 { - data := getMetricSum(collector, name) + data := getMetricSum(mockOutput, name) expectedDataMin := float64(expected * tc.Iterations) expectedDataMax := float64((expected + ts.NumRequests*expectedHeaderMaxLength) * tc.Iterations) @@ -582,8 +580,8 @@ func TestRunTags(t *testing.T) { ) require.NoError(t, err) - collector := &dummy.Collector{} - _, run, wait := newTestEngine(t, nil, r, []lib.Collector{collector}, lib.Options{ + mockOutput := mockoutput.New() + _, run, wait := newTestEngine(t, nil, r, []output.Output{mockOutput}, lib.Options{ Iterations: null.IntFrom(3), VUs: null.IntFrom(2), Hosts: tb.Dialer.Hosts, @@ -617,7 +615,7 @@ func TestRunTags(t *testing.T) { return "the rainbow" } - for _, s := range collector.Samples { + for _, s := range mockOutput.Samples { for key, expVal := range runTagsMap { val, ok := s.Tags.Get(key) @@ -738,8 +736,8 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) { ) require.NoError(t, err) - collector := &dummy.Collector{} - engine, run, wait := newTestEngine(t, nil, runner, []lib.Collector{collector}, lib.Options{}) + mockOutput := mockoutput.New() + engine, run, wait := newTestEngine(t, nil, runner, []output.Output{mockOutput}, lib.Options{}) errC := make(chan error) go func() { errC <- run() }() @@ -756,11 +754,11 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) { // The 3.1 sleep in the default function would cause the first VU to complete 2 full iterations // and stat executing its third one, while the second VU will only fully complete 1 iteration // and will be canceled in the middle of its second one. - assert.Equal(t, 3.0, getMetricSum(collector, metrics.Iterations.Name)) + assert.Equal(t, 3.0, getMetricSum(mockOutput, metrics.Iterations.Name)) // That means that we expect to see 8 HTTP requests in total, 3*2=6 from the complete iterations // and one each from the two iterations that would be canceled in the middle of their execution - assert.Equal(t, 8.0, getMetricSum(collector, metrics.HTTPReqs.Name)) + assert.Equal(t, 8.0, getMetricSum(mockOutput, metrics.HTTPReqs.Name)) // And we expect to see the data_received for all 8 of those requests. Previously, the data for // the 8th request (the 3rd one in the first VU before the test ends) was cut off by the engine @@ -769,7 +767,7 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) { // it was interrupted. dataReceivedExpectedMin := 15000.0 * 8 dataReceivedExpectedMax := (15000.0 + expectedHeaderMaxLength) * 8 - dataReceivedActual := getMetricSum(collector, metrics.DataReceived.Name) + dataReceivedActual := getMetricSum(mockOutput, metrics.DataReceived.Name) if dataReceivedActual < dataReceivedExpectedMin || dataReceivedActual > dataReceivedExpectedMax { t.Errorf( "The data_received sum should be in the interval [%f, %f] but was %f", @@ -779,9 +777,9 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) { // Also, the interrupted iterations shouldn't affect the average iteration_duration in any way, only // complete iterations should be taken into account - durationCount := float64(getMetricCount(collector, metrics.IterationDuration.Name)) + durationCount := float64(getMetricCount(mockOutput, metrics.IterationDuration.Name)) assert.Equal(t, 3.0, durationCount) - durationSum := getMetricSum(collector, metrics.IterationDuration.Name) + durationSum := getMetricSum(mockOutput, metrics.IterationDuration.Name) assert.InDelta(t, 3.35, durationSum/(1000*durationCount), 0.25) } @@ -843,8 +841,8 @@ func TestMetricsEmission(t *testing.T) { ) require.NoError(t, err) - collector := &dummy.Collector{} - engine, run, wait := newTestEngine(t, nil, runner, []lib.Collector{collector}, runner.GetOptions()) + mockOutput := mockoutput.New() + engine, run, wait := newTestEngine(t, nil, runner, []output.Output{mockOutput}, runner.GetOptions()) errC := make(chan error) go func() { errC <- run() }() @@ -858,8 +856,8 @@ func TestMetricsEmission(t *testing.T) { require.False(t, engine.IsTainted()) } - assert.Equal(t, tc.expIters, getMetricSum(collector, metrics.Iterations.Name)) - assert.Equal(t, tc.expCount, getMetricSum(collector, "testcounter")) + assert.Equal(t, tc.expIters, getMetricSum(mockOutput, metrics.Iterations.Name)) + assert.Equal(t, tc.expCount, getMetricSum(mockOutput, "testcounter")) }) } } @@ -961,8 +959,8 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { }, } - c := &dummy.Collector{} - _, run, wait := newTestEngine(t, ctx, runner, []lib.Collector{c}, lib.Options{ + mockOutput := mockoutput.New() + _, run, wait := newTestEngine(t, ctx, runner, []output.Output{mockOutput}, lib.Options{ VUs: null.IntFrom(1), Iterations: null.IntFrom(1), }) @@ -970,7 +968,7 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { wait() var count float64 - for _, sample := range c.Samples { + for _, sample := range mockOutput.Samples { if sample.Metric == testMetric { count += sample.Value } diff --git a/js/runner_test.go b/js/runner_test.go index af9fca2275e..910196b39a0 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -58,10 +58,11 @@ import ( "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/lib/testutils/httpmultibin" + "github.com/loadimpact/k6/lib/testutils/mockoutput" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/loader" + "github.com/loadimpact/k6/output" "github.com/loadimpact/k6/stats" - "github.com/loadimpact/k6/stats/dummy" ) func TestRunnerNew(t *testing.T) { @@ -295,15 +296,17 @@ func TestSetupDataIsolation(t *testing.T) { execScheduler, err := local.NewExecutionScheduler(runner, testutils.NewLogger(t)) require.NoError(t, err) - engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, testutils.NewLogger(t)) + + mockOutput := mockoutput.New() + engine, err := core.NewEngine( + execScheduler, options, lib.RuntimeOptions{}, []output.Output{mockOutput}, testutils.NewLogger(t), + ) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) run, wait, err := engine.Init(ctx, ctx) require.NoError(t, err) - collector := &dummy.Collector{} - engine.Collectors = []lib.Collector{collector} require.Empty(t, runner.defaultGroup.Groups) errC := make(chan error) @@ -322,7 +325,7 @@ func TestSetupDataIsolation(t *testing.T) { require.Contains(t, runner.defaultGroup.Groups, "setup") require.Contains(t, runner.defaultGroup.Groups, "teardown") var count int - for _, s := range collector.Samples { + for _, s := range mockOutput.Samples { if s.Metric.Name == "mycounter" { count += int(s.Value) } diff --git a/lib/collector.go b/lib/collector.go index d3700ad0d11..408f517002e 100644 --- a/lib/collector.go +++ b/lib/collector.go @@ -26,6 +26,8 @@ import ( "github.com/loadimpact/k6/stats" ) +// TODO: move to some other package - types? models? + // RunStatus values can be used by k6 to denote how a script run ends // and by the cloud executor and collector so that k6 knows the current // status of a particular script run. @@ -46,6 +48,8 @@ const ( RunStatusAbortedThreshold RunStatus = 8 ) +// TODO: remove + // A Collector abstracts the process of funneling samples to an external storage backend, // such as an InfluxDB instance. type Collector interface { diff --git a/lib/testutils/mockoutput/mockoutput.go b/lib/testutils/mockoutput/mockoutput.go new file mode 100644 index 00000000000..b03273f23bd --- /dev/null +++ b/lib/testutils/mockoutput/mockoutput.go @@ -0,0 +1,83 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package mockoutput + +import ( + "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/output" + "github.com/loadimpact/k6/stats" +) + +// New exists so that the usage from tests avoids repetition, i.e. is +// mockoutput.New() instead of &mockoutput.MockOutput{} +func New() *MockOutput { + return &MockOutput{} +} + +// MockOutput can be used in tests to mock an actual output. +type MockOutput struct { + SampleContainers []stats.SampleContainer + Samples []stats.Sample + RunStatus lib.RunStatus + + DescFn func() string + StartFn func() error + StopFn func() error +} + +var _ output.WithRunStatusUpdates = &MockOutput{} + +// AddMetricSamples just saves the results in memory. +func (mo *MockOutput) AddMetricSamples(scs []stats.SampleContainer) { + mo.SampleContainers = append(mo.SampleContainers, scs...) + for _, sc := range scs { + mo.Samples = append(mo.Samples, sc.GetSamples()...) + } +} + +// SetRunStatus updates the RunStatus property. +func (mo *MockOutput) SetRunStatus(latestStatus lib.RunStatus) { + mo.RunStatus = latestStatus +} + +// Description calls the supplied DescFn callback, if available. +func (mo *MockOutput) Description() string { + if mo.DescFn != nil { + return mo.DescFn() + } + return "mock" +} + +// Start calls the supplied StartFn callback, if available. +func (mo *MockOutput) Start() error { + if mo.StartFn != nil { + return mo.StartFn() + } + return nil +} + +// Stop calls the supplied StopFn callback, if available. +func (mo *MockOutput) Stop() error { + if mo.StopFn != nil { + return mo.StopFn() + } + return nil +} diff --git a/output/extensions.go b/output/extensions.go new file mode 100644 index 00000000000..d136b136234 --- /dev/null +++ b/output/extensions.go @@ -0,0 +1,55 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package output + +import ( + "fmt" + "sync" +) + +//nolint:gochecknoglobals +var ( + extensions = make(map[string]func(Params) (Output, error)) + mx sync.RWMutex +) + +// GetExtensions returns all registered extensions. +func GetExtensions() map[string]func(Params) (Output, error) { + mx.RLock() + defer mx.RUnlock() + res := make(map[string]func(Params) (Output, error), len(extensions)) + for k, v := range extensions { + res[k] = v + } + return res +} + +// RegisterExtension registers the given output extension constructor. This +// function panics if a module with the same name is already registered. +func RegisterExtension(name string, mod func(Params) (Output, error)) { + mx.Lock() + defer mx.Unlock() + + if _, ok := extensions[name]; ok { + panic(fmt.Sprintf("output extension already registered: %s", name)) + } + extensions[name] = mod +} diff --git a/output/helpers.go b/output/helpers.go new file mode 100644 index 00000000000..647ccd76f0f --- /dev/null +++ b/output/helpers.go @@ -0,0 +1,125 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package output + +import ( + "fmt" + "sync" + "time" + + "github.com/loadimpact/k6/stats" +) + +// SampleBuffer is a simple thread-safe buffer for metric samples. It should be +// used by most outputs, since we generally want to flush metric samples to the +// remote service asynchronously. We want to do it only every several seconds, +// and we don't want to block the Engine in the meantime. +type SampleBuffer struct { + sync.Mutex + buffer []stats.SampleContainer + maxLen int +} + +// AddMetricSamples adds the given metric samples to the internal buffer. +func (sc *SampleBuffer) AddMetricSamples(samples []stats.SampleContainer) { + if len(samples) == 0 { + return + } + sc.Lock() + sc.buffer = append(sc.buffer, samples...) + sc.Unlock() +} + +// GetBufferedSamples returns the currently buffered metric samples and makes a +// new internal buffer with some hopefully realistic size. If the internal +// buffer is empty, it will return nil. +func (sc *SampleBuffer) GetBufferedSamples() []stats.SampleContainer { + sc.Lock() + defer sc.Unlock() + + buffered, bufferedLen := sc.buffer, len(sc.buffer) + if bufferedLen == 0 { + return nil + } + if bufferedLen > sc.maxLen { + sc.maxLen = bufferedLen + } + // Make the new buffer halfway between the previously allocated size and the + // maximum buffer size we've seen so far, to hopefully reduce copying a bit. + sc.buffer = make([]stats.SampleContainer, 0, (bufferedLen+sc.maxLen)/2) + + return buffered +} + +// PeriodicFlusher is a small helper for asynchronously flushing buffered metric +// samples on regular intervals. The biggest benefit is having a Stop() method +// that waits for one last flush before it returns. +type PeriodicFlusher struct { + period time.Duration + flushCallback func() + stop chan struct{} + stopped chan struct{} + once *sync.Once +} + +func (pf *PeriodicFlusher) run() { + ticker := time.NewTicker(pf.period) + defer ticker.Stop() + for { + select { + case <-ticker.C: + pf.flushCallback() + case <-pf.stop: + pf.flushCallback() + close(pf.stopped) + return + } + } +} + +// Stop waits for the periodic flusher flush one last time and exit. You can +// safely call Stop() multiple times from different goroutines, you just can't +// call it from inside of the flushing function. +func (pf *PeriodicFlusher) Stop() { + pf.once.Do(func() { + close(pf.stop) + }) + <-pf.stopped +} + +// NewPeriodicFlusher creates a new PeriodicFlusher and starts its goroutine. +func NewPeriodicFlusher(period time.Duration, flushCallback func()) (*PeriodicFlusher, error) { + if period <= 0 { + return nil, fmt.Errorf("metric flush period should be positive but was %s", period) + } + + pf := &PeriodicFlusher{ + period: period, + flushCallback: flushCallback, + stop: make(chan struct{}), + stopped: make(chan struct{}), + once: &sync.Once{}, + } + + go pf.run() + + return pf, nil +} diff --git a/output/helpers_test.go b/output/helpers_test.go new file mode 100644 index 00000000000..61e66f9a268 --- /dev/null +++ b/output/helpers_test.go @@ -0,0 +1,183 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package output + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/loadimpact/k6/stats" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSampleBufferBasics(t *testing.T) { + t.Parallel() + single := stats.Sample{ + Time: time.Now(), + Metric: stats.New("my_metric", stats.Rate), + Value: float64(123), + Tags: stats.NewSampleTags(map[string]string{"tag1": "val1"}), + } + connected := stats.ConnectedSamples{Samples: []stats.Sample{single, single}, Time: single.Time} + buffer := SampleBuffer{} + + assert.Empty(t, buffer.GetBufferedSamples()) + buffer.AddMetricSamples([]stats.SampleContainer{single, single}) + buffer.AddMetricSamples([]stats.SampleContainer{single, connected, single}) + assert.Equal(t, []stats.SampleContainer{single, single, single, connected, single}, buffer.GetBufferedSamples()) + assert.Empty(t, buffer.GetBufferedSamples()) + + // Verify some internals + assert.Equal(t, cap(buffer.buffer), 5) + buffer.AddMetricSamples([]stats.SampleContainer{single, connected}) + buffer.AddMetricSamples(nil) + buffer.AddMetricSamples([]stats.SampleContainer{}) + buffer.AddMetricSamples([]stats.SampleContainer{single}) + assert.Equal(t, []stats.SampleContainer{single, connected, single}, buffer.GetBufferedSamples()) + assert.Equal(t, cap(buffer.buffer), 4) + buffer.AddMetricSamples([]stats.SampleContainer{single}) + assert.Equal(t, []stats.SampleContainer{single}, buffer.GetBufferedSamples()) + assert.Equal(t, cap(buffer.buffer), 3) + assert.Empty(t, buffer.GetBufferedSamples()) +} + +func TestSampleBufferConcurrently(t *testing.T) { + t.Parallel() + + seed := time.Now().UnixNano() + r := rand.New(rand.NewSource(seed)) //nolint:gosec + t.Logf("Random source seeded with %d\n", seed) + + producersCount := 50 + r.Intn(50) + sampleCount := 10 + r.Intn(10) + sleepModifier := 10 + r.Intn(10) + buffer := SampleBuffer{} + + wg := make(chan struct{}) + fillBuffer := func() { + for i := 0; i < sampleCount; i++ { + buffer.AddMetricSamples([]stats.SampleContainer{stats.Sample{ + Time: time.Unix(1562324644, 0), + Metric: stats.New("my_metric", stats.Gauge), + Value: float64(i), + Tags: stats.NewSampleTags(map[string]string{"tag1": "val1"}), + }}) + time.Sleep(time.Duration(i*sleepModifier) * time.Microsecond) + } + wg <- struct{}{} + } + for i := 0; i < producersCount; i++ { + go fillBuffer() + } + + timer := time.NewTicker(5 * time.Millisecond) + timeout := time.After(5 * time.Second) + defer timer.Stop() + readSamples := make([]stats.SampleContainer, 0, sampleCount*producersCount) + finishedProducers := 0 +loop: + for { + select { + case <-timer.C: + readSamples = append(readSamples, buffer.GetBufferedSamples()...) + case <-wg: + finishedProducers++ + if finishedProducers == producersCount { + readSamples = append(readSamples, buffer.GetBufferedSamples()...) + break loop + } + case <-timeout: + t.Fatalf("test timed out") + } + } + assert.Equal(t, sampleCount*producersCount, len(readSamples)) + for _, s := range readSamples { + require.NotNil(t, s) + ss := s.GetSamples() + require.Len(t, ss, 1) + assert.Equal(t, "my_metric", ss[0].Metric.Name) + } +} + +func TestPeriodicFlusherBasics(t *testing.T) { + t.Parallel() + + f, err := NewPeriodicFlusher(-1*time.Second, func() {}) + assert.Error(t, err) + assert.Nil(t, f) + f, err = NewPeriodicFlusher(0, func() {}) + assert.Error(t, err) + assert.Nil(t, f) + + count := 0 + wg := &sync.WaitGroup{} + wg.Add(1) + f, err = NewPeriodicFlusher(100*time.Millisecond, func() { + count++ + if count == 2 { + wg.Done() + } + }) + assert.NotNil(t, f) + assert.Nil(t, err) + wg.Wait() + f.Stop() + assert.Equal(t, 3, count) +} + +func TestPeriodicFlusherConcurrency(t *testing.T) { + t.Parallel() + + seed := time.Now().UnixNano() + r := rand.New(rand.NewSource(seed)) //nolint:gosec + randStops := 10 + r.Intn(10) + t.Logf("Random source seeded with %d\n", seed) + + count := 0 + wg := &sync.WaitGroup{} + wg.Add(1) + f, err := NewPeriodicFlusher(1000*time.Microsecond, func() { + // Sleep intentionally may be longer than the flush period. Also, this + // should never happen concurrently, so it's intentionally not locked. + time.Sleep(time.Duration(700+r.Intn(1000)) * time.Microsecond) + count++ + if count == 100 { + wg.Done() + } + }) + assert.NotNil(t, f) + assert.Nil(t, err) + wg.Wait() + + stopWG := &sync.WaitGroup{} + stopWG.Add(randStops) + for i := 0; i < randStops; i++ { + go func() { + f.Stop() + stopWG.Done() + }() + } + stopWG.Wait() + assert.True(t, count >= 101) // due to the short intervals, we might not get exactly 101 +} diff --git a/output/json/json.go b/output/json/json.go new file mode 100644 index 00000000000..4e1c74d440b --- /dev/null +++ b/output/json/json.go @@ -0,0 +1,153 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package json + +import ( + "compress/gzip" + stdlibjson "encoding/json" + "fmt" + "strings" + "time" + + "github.com/loadimpact/k6/output" + "github.com/loadimpact/k6/stats" + "github.com/sirupsen/logrus" +) + +// TODO: add option for emitting proper JSON files (https://github.com/loadimpact/k6/issues/1052) +const flushPeriod = 200 * time.Millisecond // TODO: make this configurable + +// Output funnels all passed metrics to an (optionally gzipped) JSON file. +type Output struct { + output.SampleBuffer + + params output.Params + periodicFlusher *output.PeriodicFlusher + + logger logrus.FieldLogger + filename string + encoder *stdlibjson.Encoder + closeFn func() error + seenMetrics map[string]struct{} +} + +// New returns a new JSON output. +func New(params output.Params) (output.Output, error) { + return &Output{ + params: params, + filename: params.ConfigArgument, + logger: params.Logger.WithFields(logrus.Fields{ + "output": "json", + "filename": params.ConfigArgument, + }), + seenMetrics: make(map[string]struct{}), + }, nil +} + +// Description returns a human-readable description of the output. +func (o *Output) Description() string { + if o.filename == "" || o.filename == "-" { + return "json(stdout)" + } + return fmt.Sprintf("json (%s)", o.filename) +} + +// Start opens the tries to specified JSON file and starts the goroutine for +// metric flushing. +func (o *Output) Start() error { + o.logger.Debug("Starting...") + + if o.filename == "" || o.filename == "-" { + o.encoder = stdlibjson.NewEncoder(o.params.StdOut) + o.closeFn = func() error { + return nil + } + } else { + logfile, err := o.params.FS.Create(o.filename) + if err != nil { + return err + } + + if strings.HasSuffix(o.filename, ".gz") { + outfile := gzip.NewWriter(logfile) + + o.closeFn = func() error { + _ = outfile.Close() + return logfile.Close() + } + o.encoder = stdlibjson.NewEncoder(outfile) + } else { + o.closeFn = logfile.Close + o.encoder = stdlibjson.NewEncoder(logfile) + } + } + + pf, err := output.NewPeriodicFlusher(flushPeriod, o.flushMetrics) + if err != nil { + return err + } + o.logger.Debug("Started!") + o.periodicFlusher = pf + + return nil +} + +// Stop flushes any remaining metrics and stops the goroutine. +func (o *Output) Stop() error { + o.logger.Debug("Stopping...") + defer o.logger.Debug("Stopped!") + o.periodicFlusher.Stop() + return o.closeFn() +} + +func (o *Output) flushMetrics() { + samples := o.GetBufferedSamples() + start := time.Now() + var count int + for _, sc := range samples { + samples := sc.GetSamples() + count += len(samples) + for _, sample := range samples { + sample := sample + o.handleMetric(sample.Metric) + err := o.encoder.Encode(WrapSample(sample)) + if err != nil { + // Skip metric if it can't be made into JSON or envelope is null. + o.logger.WithError(err).Error("Sample couldn't be marshalled to JSON") + } + } + } + if count > 0 { + o.logger.WithField("t", time.Since(start)).WithField("count", count).Debug("Wrote metrics to JSON") + } +} + +func (o *Output) handleMetric(m *stats.Metric) { + if _, ok := o.seenMetrics[m.Name]; ok { + return + } + o.seenMetrics[m.Name] = struct{}{} + + err := o.encoder.Encode(wrapMetric(m)) + if err != nil { + o.logger.WithError(err).Error("Metric couldn't be marshalled to JSON") + } +} diff --git a/output/json/json_test.go b/output/json/json_test.go new file mode 100644 index 00000000000..da104f95261 --- /dev/null +++ b/output/json/json_test.go @@ -0,0 +1,187 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package json + +import ( + "bufio" + "bytes" + "compress/gzip" + "io" + "testing" + "time" + + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/loadimpact/k6/lib/testutils" + "github.com/loadimpact/k6/output" + "github.com/loadimpact/k6/stats" +) + +func getValidator(t *testing.T, expected []string) func(io.Reader) { + return func(rawJSONLines io.Reader) { + s := bufio.NewScanner(rawJSONLines) + i := 0 + for s.Scan() { + i++ + if i > len(expected) { + t.Errorf("Read unexpected line number %d, expected only %d entries", i, len(expected)) + continue + } + assert.JSONEq(t, expected[i-1], string(s.Bytes())) + } + assert.NoError(t, s.Err()) + assert.Equal(t, len(expected), i) + } +} + +func generateTestMetricSamples(t *testing.T) ([]stats.SampleContainer, func(io.Reader)) { + metric1 := stats.New("my_metric1", stats.Gauge) + metric2 := stats.New("my_metric2", stats.Counter, stats.Data) + time1 := time.Date(2021, time.February, 24, 13, 37, 10, 0, time.UTC) + time2 := time1.Add(10 * time.Second) + time3 := time2.Add(10 * time.Second) + connTags := stats.NewSampleTags(map[string]string{"key": "val"}) + + samples := []stats.SampleContainer{ + stats.Sample{Time: time1, Metric: metric1, Value: float64(1), Tags: stats.NewSampleTags(map[string]string{"tag1": "val1"})}, + stats.Sample{Time: time1, Metric: metric1, Value: float64(2), Tags: stats.NewSampleTags(map[string]string{"tag2": "val2"})}, + stats.ConnectedSamples{Samples: []stats.Sample{ + {Time: time2, Metric: metric2, Value: float64(3), Tags: connTags}, + {Time: time2, Metric: metric1, Value: float64(4), Tags: connTags}, + }, Time: time2, Tags: connTags}, + stats.Sample{Time: time3, Metric: metric2, Value: float64(5), Tags: stats.NewSampleTags(map[string]string{"tag3": "val3"})}, + } + // TODO: fix JSON thresholds (https://github.com/loadimpact/k6/issues/1052) + expected := []string{ + `{"type":"Metric","data":{"name":"my_metric1","type":"gauge","contains":"default","tainted":null,"thresholds":[],"submetrics":null,"sub":{"name":"","parent":"","suffix":"","tags":null}},"metric":"my_metric1"}`, + `{"type":"Point","data":{"time":"2021-02-24T13:37:10Z","value":1,"tags":{"tag1":"val1"}},"metric":"my_metric1"}`, + `{"type":"Point","data":{"time":"2021-02-24T13:37:10Z","value":2,"tags":{"tag2":"val2"}},"metric":"my_metric1"}`, + `{"type":"Metric","data":{"name":"my_metric2","type":"counter","contains":"data","tainted":null,"thresholds":[],"submetrics":null,"sub":{"name":"","parent":"","suffix":"","tags":null}},"metric":"my_metric2"}`, + `{"type":"Point","data":{"time":"2021-02-24T13:37:20Z","value":3,"tags":{"key":"val"}},"metric":"my_metric2"}`, + `{"type":"Point","data":{"time":"2021-02-24T13:37:20Z","value":4,"tags":{"key":"val"}},"metric":"my_metric1"}`, + `{"type":"Point","data":{"time":"2021-02-24T13:37:30Z","value":5,"tags":{"tag3":"val3"}},"metric":"my_metric2"}`, + } + + return samples, getValidator(t, expected) +} + +func TestJsonOutputStdout(t *testing.T) { + t.Parallel() + + stdout := new(bytes.Buffer) + out, err := New(output.Params{ + Logger: testutils.NewLogger(t), + StdOut: stdout, + }) + require.NoError(t, err) + require.NoError(t, out.Start()) + + samples, validateResults := generateTestMetricSamples(t) + out.AddMetricSamples(samples[:2]) + out.AddMetricSamples(samples[2:]) + require.NoError(t, out.Stop()) + validateResults(stdout) +} + +func TestJsonOutputFileError(t *testing.T) { + t.Parallel() + + stdout := new(bytes.Buffer) + fs := afero.NewReadOnlyFs(afero.NewMemMapFs()) + out, err := New(output.Params{ + Logger: testutils.NewLogger(t), + StdOut: stdout, + FS: fs, + ConfigArgument: "/json-output", + }) + require.NoError(t, err) + assert.Error(t, out.Start()) +} + +func TestJsonOutputFile(t *testing.T) { + t.Parallel() + + stdout := new(bytes.Buffer) + fs := afero.NewMemMapFs() + out, err := New(output.Params{ + Logger: testutils.NewLogger(t), + StdOut: stdout, + FS: fs, + ConfigArgument: "/json-output", + }) + require.NoError(t, err) + require.NoError(t, out.Start()) + + samples, validateResults := generateTestMetricSamples(t) + out.AddMetricSamples(samples[:2]) + out.AddMetricSamples(samples[2:]) + require.NoError(t, out.Stop()) + + assert.Empty(t, stdout.Bytes()) + file, err := fs.Open("/json-output") + require.NoError(t, err) + validateResults(file) + assert.NoError(t, file.Close()) +} + +func TestJsonOutputFileGzipped(t *testing.T) { + t.Parallel() + + stdout := new(bytes.Buffer) + fs := afero.NewMemMapFs() + out, err := New(output.Params{ + Logger: testutils.NewLogger(t), + StdOut: stdout, + FS: fs, + ConfigArgument: "/json-output.gz", + }) + require.NoError(t, err) + require.NoError(t, out.Start()) + + samples, validateResults := generateTestMetricSamples(t) + out.AddMetricSamples(samples[:2]) + out.AddMetricSamples(samples[2:]) + require.NoError(t, out.Stop()) + + assert.Empty(t, stdout.Bytes()) + file, err := fs.Open("/json-output.gz") + require.NoError(t, err) + reader, err := gzip.NewReader(file) + require.NoError(t, err) + validateResults(reader) + assert.NoError(t, file.Close()) +} + +func TestWrapSampleWithSamplePointer(t *testing.T) { + t.Parallel() + out := WrapSample(stats.Sample{ + Metric: &stats.Metric{}, + }) + assert.NotEqual(t, out, (*Envelope)(nil)) +} + +func TestWrapMetricWithMetricPointer(t *testing.T) { + t.Parallel() + out := wrapMetric(&stats.Metric{}) + assert.NotEqual(t, out, (*Envelope)(nil)) +} diff --git a/stats/json/wrapper.go b/output/json/wrapper.go similarity index 72% rename from stats/json/wrapper.go rename to output/json/wrapper.go index d0dc77b8056..9885386de1e 100644 --- a/stats/json/wrapper.go +++ b/output/json/wrapper.go @@ -26,38 +26,40 @@ import ( "github.com/loadimpact/k6/stats" ) +// Envelope is the data format we use to export both metrics and metric samples +// to the JSON file. type Envelope struct { Type string `json:"type"` Data interface{} `json:"data"` Metric string `json:"metric,omitempty"` } -type JSONSample struct { +// Sample is the data format for metric sample data in the JSON file. +type Sample struct { Time time.Time `json:"time"` Value float64 `json:"value"` Tags *stats.SampleTags `json:"tags"` } -func NewJSONSample(sample *stats.Sample) *JSONSample { - return &JSONSample{ +func newJSONSample(sample stats.Sample) Sample { + return Sample{ Time: sample.Time, Value: sample.Value, Tags: sample.Tags, } } -func WrapSample(sample *stats.Sample) *Envelope { - if sample == nil { - return nil - } - return &Envelope{ +// WrapSample is used to package a metric sample in a way that's nice to export +// to JSON. +func WrapSample(sample stats.Sample) Envelope { + return Envelope{ Type: "Point", Metric: sample.Metric.Name, - Data: NewJSONSample(sample), + Data: newJSONSample(sample), } } -func WrapMetric(metric *stats.Metric) *Envelope { +func wrapMetric(metric *stats.Metric) *Envelope { if metric == nil { return nil } diff --git a/output/types.go b/output/types.go new file mode 100644 index 00000000000..efac5014aeb --- /dev/null +++ b/output/types.go @@ -0,0 +1,90 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package output + +import ( + "encoding/json" + "io" + "net/url" + + "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/stats" + "github.com/sirupsen/logrus" + "github.com/spf13/afero" +) + +// Params contains all possible constructor parameters an output may need. +type Params struct { + OutputType string // --out $OutputType=$ConfigArgument, K6_OUT="$OutputType=$ConfigArgument" + ConfigArgument string + JSONConfig json.RawMessage + + Logger logrus.FieldLogger + Environment map[string]string + StdOut io.Writer + StdErr io.Writer + FS afero.Fs + ScriptPath *url.URL + ScriptOptions lib.Options + RuntimeOptions lib.RuntimeOptions + ExecutionPlan []lib.ExecutionStep +} + +// An Output abstracts the process of funneling samples to an external storage +// backend, such as a file or something like an InfluxDB instance. +// +// N.B: All outputs should have non-blocking AddMetricSamples() methods and +// should spawn their own goroutine to flush metrics asynchronously. +type Output interface { + // Returns a human-readable description of the output that will be shown in + // `k6 run`. For extensions it probably should include the version as well. + Description() string + + // Start is called before the Engine tries to use the output and should be + // used for any long initialization tasks, as well as for starting a + // goroutine to asynchronously flush metrics to the output. + Start() error + + // A method to receive the latest metric samples from the Engine. This + // method is never called concurrently, so do not do anything blocking here + // that might take a long time. Preferably, just use the SampleBuffer or + // something like it to buffer metrics until they are flushed. + AddMetricSamples(samples []stats.SampleContainer) + + // Flush all remaining metrics and finalize the test run. + Stop() error +} + +// WithThresholds is an output that requires the Engine to give it the +// thresholds before it can be started. +type WithThresholds interface { + Output + SetThresholds(map[string]stats.Thresholds) +} + +// TODO: add some way for outputs to report mid-test errors and potentially +// abort the whole test run + +// WithRunStatusUpdates means the output can receivetest run status updates. +type WithRunStatusUpdates interface { + Output + SetRunStatus(latestStatus lib.RunStatus) +} diff --git a/stats/cloud/bench_test.go b/stats/cloud/bench_test.go index 5473fa51191..27a674feb46 100644 --- a/stats/cloud/bench_test.go +++ b/stats/cloud/bench_test.go @@ -42,16 +42,10 @@ import ( "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/lib/testutils/httpmultibin" "github.com/loadimpact/k6/lib/types" - "github.com/loadimpact/k6/loader" "github.com/loadimpact/k6/stats" ) func BenchmarkAggregateHTTP(b *testing.B) { - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -61,7 +55,7 @@ func BenchmarkAggregateHTTP(b *testing.B) { AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200), AggregationPeriod: types.NullDurationFrom(time.Millisecond * 200), }) - collector, err := New(testutils.NewLogger(b), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(b, err) now := time.Now() collector.referenceID = "something" @@ -295,11 +289,6 @@ func generateHTTPExtTrail(now time.Time, i time.Duration, tags *stats.SampleTags } func BenchmarkHTTPPush(b *testing.B) { - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -323,7 +312,7 @@ func BenchmarkHTTPPush(b *testing.B) { AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200), AggregationPeriod: types.NullDurationFrom(time.Millisecond * 200), }) - collector, err := New(testutils.NewLogger(b), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(b, err) collector.referenceID = "fake" diff --git a/stats/cloud/collector.go b/stats/cloud/collector.go index 7b41a37e901..a36889a2195 100644 --- a/stats/cloud/collector.go +++ b/stats/cloud/collector.go @@ -44,7 +44,6 @@ import ( "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/netext" "github.com/loadimpact/k6/lib/netext/httpext" - "github.com/loadimpact/k6/loader" "github.com/loadimpact/k6/stats" ) @@ -91,7 +90,7 @@ var _ lib.Collector = &Collector{} // New creates a new cloud collector func New( logger logrus.FieldLogger, - conf cloudapi.Config, src *loader.SourceData, opts lib.Options, executionPlan []lib.ExecutionStep, version string, + conf cloudapi.Config, scriptURL fmt.Stringer, opts lib.Options, executionPlan []lib.ExecutionStep, version string, ) (*Collector, error) { if err := cloudapi.MergeFromExternal(opts.External, &conf); err != nil { return nil, err @@ -102,7 +101,7 @@ func New( } if !conf.Name.Valid || conf.Name.String == "" { - conf.Name = null.StringFrom(filepath.Base(src.URL.String())) + conf.Name = null.StringFrom(filepath.Base(scriptURL.String())) } if conf.Name.String == "-" { conf.Name = null.StringFrom(TestName) diff --git a/stats/cloud/collector_test.go b/stats/cloud/collector_test.go index 44c57eb27ea..343e2138798 100644 --- a/stats/cloud/collector_test.go +++ b/stats/cloud/collector_test.go @@ -50,7 +50,6 @@ import ( "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/lib/testutils/httpmultibin" "github.com/loadimpact/k6/lib/types" - "github.com/loadimpact/k6/loader" "github.com/loadimpact/k6/stats" ) @@ -177,11 +176,6 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) { })) defer tb.Cleanup() - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -190,7 +184,7 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) { Host: null.StringFrom(tb.ServerHTTP.URL), NoCompress: null.BoolFrom(true), }) - collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) assert.True(t, collector.config.Host.Valid) @@ -335,11 +329,6 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) { })) defer tb.Cleanup() - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -348,7 +337,7 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) { Host: null.StringFrom(tb.ServerHTTP.URL), NoCompress: null.BoolFrom(true), }) - collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) now := time.Now() tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"}) @@ -427,11 +416,6 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) { })) defer tb.Cleanup() - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -441,7 +425,7 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) { NoCompress: null.BoolFrom(true), MaxMetricSamplesPerPackage: null.IntFrom(50), }) - collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: ""}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) now := time.Now() tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"}) @@ -548,11 +532,6 @@ func TestCloudCollectorAggregationPeriodZeroNoBlock(t *testing.T) { })) defer tb.Cleanup() - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -561,7 +540,7 @@ func TestCloudCollectorAggregationPeriodZeroNoBlock(t *testing.T) { Host: null.StringFrom(tb.ServerHTTP.URL), NoCompress: null.BoolFrom(true), }) - collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) assert.True(t, collector.config.Host.Valid) @@ -607,11 +586,6 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) { })) defer tb.Cleanup() - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } - options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -620,7 +594,7 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) { Host: null.StringFrom(tb.ServerHTTP.URL), NoCompress: null.BoolFrom(true), }) - collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "path/to/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) gotIterations := false @@ -730,10 +704,7 @@ func TestNewName(t *testing.T) { testCase := testCase t.Run(testCase.url.String(), func(t *testing.T) { - script := &loader.SourceData{ - URL: testCase.url, - } - collector, err := New(testutils.NewLogger(t), cloudapi.NewConfig(), script, lib.Options{ + collector, err := New(testutils.NewLogger(t), cloudapi.NewConfig(), testCase.url, lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), }, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) @@ -766,10 +737,6 @@ func TestPublishMetric(t *testing.T) { })) defer server.Close() - script := &loader.SourceData{ - Data: []byte(""), - URL: &url.URL{Path: "/script.js"}, - } options := lib.Options{ Duration: types.NullDurationFrom(1 * time.Second), } @@ -777,7 +744,7 @@ func TestPublishMetric(t *testing.T) { Host: null.StringFrom(server.URL), NoCompress: null.BoolFrom(true), }) - collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0") + collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) samples := []*Sample{ diff --git a/stats/csv/config.go b/stats/csv/config.go index 95914d2a749..f75f8b1bd45 100644 --- a/stats/csv/config.go +++ b/stats/csv/config.go @@ -21,12 +21,14 @@ package csv import ( + "encoding/json" "fmt" "strings" "time" "gopkg.in/guregu/null.v3" + "github.com/kelseyhightower/envconfig" "github.com/loadimpact/k6/lib/types" ) @@ -87,3 +89,33 @@ func ParseArg(arg string) (Config, error) { return c, nil } + +// GetConsolidatedConfig combines {default config values + JSON config + +// environment vars + arg config values}, and returns the final result. +func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, arg string) (Config, error) { + result := NewConfig() + if jsonRawConf != nil { + jsonConf := Config{} + if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil { + return result, err + } + result = result.Apply(jsonConf) + } + + envConfig := Config{} + if err := envconfig.Process("", &envConfig); err != nil { + // TODO: get rid of envconfig and actually use the env parameter... + return result, err + } + result = result.Apply(envConfig) + + if arg != "" { + urlConf, err := ParseArg(arg) + if err != nil { + return result, err + } + result = result.Apply(urlConf) + } + + return result, nil +} diff --git a/stats/datadog/collector.go b/stats/datadog/collector.go index 415adff68ee..24586278888 100644 --- a/stats/datadog/collector.go +++ b/stats/datadog/collector.go @@ -21,8 +21,10 @@ package datadog import ( + "encoding/json" "time" + "github.com/kelseyhightower/envconfig" "github.com/sirupsen/logrus" "gopkg.in/guregu/null.v3" @@ -115,3 +117,25 @@ func New(logger logrus.FieldLogger, conf Config) (*common.Collector, error) { Logger: logger, }, nil } + +// GetConsolidatedConfig combines {default config values + JSON config + +// environment vars}, and returns the final result. +func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string) (Config, error) { + result := NewConfig() + if jsonRawConf != nil { + jsonConf := Config{} + if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil { + return result, err + } + result = result.Apply(jsonConf) + } + + envConfig := Config{} + if err := envconfig.Process("", &envConfig); err != nil { + // TODO: get rid of envconfig and actually use the env parameter... + return result, err + } + result = result.Apply(envConfig) + + return result, nil +} diff --git a/stats/dummy/collector.go b/stats/dummy/collector.go deleted file mode 100644 index fa147c699ab..00000000000 --- a/stats/dummy/collector.go +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package dummy - -import ( - "context" - - "github.com/loadimpact/k6/lib" - "github.com/loadimpact/k6/stats" -) - -// Collector implements the lib.Collector interface and should be used only for testing -type Collector struct { - RunStatus lib.RunStatus - - SampleContainers []stats.SampleContainer - Samples []stats.Sample -} - -// Verify that Collector implements lib.Collector -var _ lib.Collector = &Collector{} - -// Init does nothing, it's only included to satisfy the lib.Collector interface -func (c *Collector) Init() error { return nil } - -// MakeConfig does nothing, it's only included to satisfy the lib.Collector interface -func (c *Collector) MakeConfig() interface{} { return nil } - -// Run just blocks until the context is done -func (c *Collector) Run(ctx context.Context) { - <-ctx.Done() -} - -// Collect just appends all of the samples passed to it to the internal sample slice. -// According to the the lib.Collector interface, it should never be called concurrently, -// so there's no locking on purpose - that way Go's race condition detector can actually -// detect incorrect usage. -// Also, theoretically the collector doesn't have to actually Run() before samples start -// being collected, it only has to be initialized. -func (c *Collector) Collect(scs []stats.SampleContainer) { - for _, sc := range scs { - c.SampleContainers = append(c.SampleContainers, sc) - c.Samples = append(c.Samples, sc.GetSamples()...) - } -} - -// Link returns a dummy string, it's only included to satisfy the lib.Collector interface -func (c *Collector) Link() string { - return "http://example.com/" -} - -// GetRequiredSystemTags returns which sample tags are needed by this collector -func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet { - return stats.SystemTagSet(0) // There are no required tags for this collector -} - -// SetRunStatus just saves the passed status for later inspection -func (c *Collector) SetRunStatus(status lib.RunStatus) { - c.RunStatus = status -} diff --git a/stats/dummy/collector_test.go b/stats/dummy/collector_test.go deleted file mode 100644 index 148f1b43988..00000000000 --- a/stats/dummy/collector_test.go +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package dummy - -import ( - "context" - "sync" - "testing" - - "github.com/loadimpact/k6/stats" - "github.com/stretchr/testify/assert" -) - -func TestCollectorRun(t *testing.T) { - var wg sync.WaitGroup - c := &Collector{} - ctx, cancel := context.WithCancel(context.Background()) - wg.Add(1) - go func() { - defer wg.Done() - c.Run(ctx) - }() - - c.SetRunStatus(1) - - cancel() - wg.Wait() -} - -func TestCollectorCollect(t *testing.T) { - c := &Collector{} - c.Collect([]stats.SampleContainer{stats.Sample{}}) - assert.Len(t, c.Samples, 1) -} diff --git a/stats/influxdb/collector_test.go b/stats/influxdb/collector_test.go index 8fc7231bd0b..a9ce7f294f9 100644 --- a/stats/influxdb/collector_test.go +++ b/stats/influxdb/collector_test.go @@ -42,21 +42,21 @@ func TestBadConcurrentWrites(t *testing.T) { logger := testutils.NewLogger(t) t.Run("0", func(t *testing.T) { c.ConcurrentWrites = null.IntFrom(0) - _, err := New(logger, *c) + _, err := New(logger, c) require.Error(t, err) require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a positive number") }) t.Run("-2", func(t *testing.T) { c.ConcurrentWrites = null.IntFrom(-2) - _, err := New(logger, *c) + _, err := New(logger, c) require.Error(t, err) require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a positive number") }) t.Run("2", func(t *testing.T) { c.ConcurrentWrites = null.IntFrom(2) - _, err := New(logger, *c) + _, err := New(logger, c) require.NoError(t, err) }) } @@ -83,7 +83,7 @@ func testCollectorCycle(t testing.TB, handler http.HandlerFunc, body func(testin config := NewConfig() config.Addr = null.StringFrom("http://" + l.Addr().String()) - c, err := New(testutils.NewLogger(t), *config) + c, err := New(testutils.NewLogger(t), config) require.NoError(t, err) require.NoError(t, c.Init()) @@ -149,7 +149,7 @@ func TestExtractTagsToValues(t *testing.T) { "floatField:float", "intField:int", } - collector, err := New(testutils.NewLogger(t), *c) + collector, err := New(testutils.NewLogger(t), c) require.NoError(t, err) tags := map[string]string{ "stringField": "string", diff --git a/stats/influxdb/config.go b/stats/influxdb/config.go index fe4416bb06b..53dea280a53 100644 --- a/stats/influxdb/config.go +++ b/stats/influxdb/config.go @@ -21,11 +21,13 @@ package influxdb import ( + "encoding/json" "net/url" "strconv" "strings" "time" + "github.com/kelseyhightower/envconfig" "github.com/kubernetes/helm/pkg/strvals" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" @@ -52,8 +54,9 @@ type Config struct { TagsAsFields []string `json:"tagsAsFields,omitempty" envconfig:"K6_INFLUXDB_TAGS_AS_FIELDS"` } -func NewConfig() *Config { - c := &Config{ +// NewConfig creates a new InfluxDB output config with some default values. +func NewConfig() Config { + c := Config{ Addr: null.NewString("http://localhost:8086", false), DB: null.NewString("k6", false), TagsAsFields: []string{"vu", "iter", "url"}, @@ -135,6 +138,14 @@ func ParseMap(m map[string]interface{}) (Config, error) { return c, err } +// ParseJSON parses the supplied JSON into a Config. +func ParseJSON(data json.RawMessage) (Config, error) { + conf := Config{} + err := json.Unmarshal(data, &conf) + return conf, err +} + +// ParseURL parses the supplied URL into a Config. func ParseURL(text string) (Config, error) { c := Config{} u, err := url.Parse(text) @@ -198,3 +209,33 @@ func ParseURL(text string) (Config, error) { } return c, err } + +// GetConsolidatedConfig combines {default config values + JSON config + +// environment vars + URL config values}, and returns the final result. +func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, url string) (Config, error) { + result := NewConfig() + if jsonRawConf != nil { + jsonConf, err := ParseJSON(jsonRawConf) + if err != nil { + return result, err + } + result = result.Apply(jsonConf) + } + + envConfig := Config{} + if err := envconfig.Process("", &envConfig); err != nil { + // TODO: get rid of envconfig and actually use the env parameter... + return result, err + } + result = result.Apply(envConfig) + + if url != "" { + urlConf, err := ParseURL(url) + if err != nil { + return result, err + } + result = result.Apply(urlConf) + } + + return result, nil +} diff --git a/stats/influxdb/util_test.go b/stats/influxdb/util_test.go index 2a98dcb2ef0..6faf48b4a06 100644 --- a/stats/influxdb/util_test.go +++ b/stats/influxdb/util_test.go @@ -53,22 +53,22 @@ func TestFieldKinds(t *testing.T) { // Error case 1 (duplicated bool fields) conf.TagsAsFields = []string{"vu", "iter", "url", "boolField:bool", "boolField:bool"} - _, err = MakeFieldKinds(*conf) + _, err = MakeFieldKinds(conf) require.Error(t, err) // Error case 2 (duplicated fields in bool and float ields) conf.TagsAsFields = []string{"vu", "iter", "url", "boolField:bool", "boolField:float"} - _, err = MakeFieldKinds(*conf) + _, err = MakeFieldKinds(conf) require.Error(t, err) // Error case 3 (duplicated fields in BoolFields and IntFields) conf.TagsAsFields = []string{"vu", "iter", "url", "boolField:bool", "floatField:float", "boolField:int"} - _, err = MakeFieldKinds(*conf) + _, err = MakeFieldKinds(conf) require.Error(t, err) // Normal case conf.TagsAsFields = []string{"vu", "iter", "url", "boolField:bool", "floatField:float", "intField:int"} - fieldKinds, err = MakeFieldKinds(*conf) + fieldKinds, err = MakeFieldKinds(conf) require.NoError(t, err) require.Equal(t, fieldKinds["boolField"], Bool) diff --git a/stats/json/collector.go b/stats/json/collector.go deleted file mode 100644 index f8e006ac536..00000000000 --- a/stats/json/collector.go +++ /dev/null @@ -1,178 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package json - -import ( - "compress/gzip" - "context" - "encoding/json" - "os" - "strings" - "sync" - "time" - - "github.com/sirupsen/logrus" - "github.com/spf13/afero" - - "github.com/loadimpact/k6/lib" - "github.com/loadimpact/k6/stats" -) - -type Collector struct { - closeFn func() error - fname string - seenMetrics []string - logger logrus.FieldLogger - - encoder *json.Encoder - - buffer []stats.Sample - bufferLock sync.Mutex -} - -// Verify that Collector implements lib.Collector -var _ lib.Collector = &Collector{} - -func (c *Collector) HasSeenMetric(str string) bool { - for _, n := range c.seenMetrics { - if n == str { - return true - } - } - return false -} - -// New return new JSON collector -func New(logger logrus.FieldLogger, fs afero.Fs, fname string) (*Collector, error) { - c := &Collector{ - fname: fname, - logger: logger, - } - if fname == "" || fname == "-" { - c.encoder = json.NewEncoder(os.Stdout) - c.closeFn = func() error { - return nil - } - return c, nil - } - logfile, err := fs.Create(c.fname) - if err != nil { - return nil, err - } - - if strings.HasSuffix(c.fname, ".gz") { - outfile := gzip.NewWriter(logfile) - - c.closeFn = func() error { - _ = outfile.Close() - return logfile.Close() - } - c.encoder = json.NewEncoder(outfile) - } else { - c.closeFn = logfile.Close - c.encoder = json.NewEncoder(logfile) - } - - return c, nil -} - -func (c *Collector) Init() error { - return nil -} - -func (c *Collector) SetRunStatus(status lib.RunStatus) {} - -func (c *Collector) Run(ctx context.Context) { - const timeout = 200 - c.logger.Debug("JSON output: Running!") - ticker := time.NewTicker(time.Millisecond * timeout) - defer func() { - _ = c.closeFn() - }() - for { - select { - case <-ticker.C: - c.commit() - case <-ctx.Done(): - c.commit() - return - } - } -} - -func (c *Collector) HandleMetric(m *stats.Metric) { - if c.HasSeenMetric(m.Name) { - return - } - - c.seenMetrics = append(c.seenMetrics, m.Name) - err := c.encoder.Encode(WrapMetric(m)) - if err != nil { - c.logger.WithField("filename", c.fname).WithError(err).Warning( - "JSON: Envelope is nil or Metric couldn't be marshalled to JSON") - return - } -} - -func (c *Collector) Collect(scs []stats.SampleContainer) { - c.bufferLock.Lock() - defer c.bufferLock.Unlock() - for _, sc := range scs { - c.buffer = append(c.buffer, sc.GetSamples()...) - } -} - -func (c *Collector) commit() { - c.bufferLock.Lock() - samples := c.buffer - c.buffer = nil - c.bufferLock.Unlock() - start := time.Now() - var count int - for _, sc := range samples { - samples := sc.GetSamples() - count += len(samples) - for _, sample := range sc.GetSamples() { - sample := sample - c.HandleMetric(sample.Metric) - err := c.encoder.Encode(WrapSample(&sample)) - if err != nil { - // Skip metric if it can't be made into JSON or envelope is null. - c.logger.WithField("filename", c.fname).WithError(err).Warning( - "JSON: Sample couldn't be marshalled to JSON") - continue - } - } - } - if count > 0 { - c.logger.WithField("filename", c.fname).WithField("t", time.Since(start)). - WithField("count", count).Debug("JSON: Wrote JSON metrics") - } -} - -func (c *Collector) Link() string { - return "" -} - -// GetRequiredSystemTags returns which sample tags are needed by this collector -func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet { - return stats.SystemTagSet(0) // There are no required tags for this collector -} diff --git a/stats/json/collector_test.go b/stats/json/collector_test.go deleted file mode 100644 index 2a256fcef2a..00000000000 --- a/stats/json/collector_test.go +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package json - -import ( - "os" - "testing" - - "github.com/loadimpact/k6/lib/testutils" - "github.com/spf13/afero" - "github.com/stretchr/testify/assert" -) - -func TestNew(t *testing.T) { - testdata := map[string]bool{ - "/nonexistent/badplacetolog.log": false, - "./okplacetolog.log": true, - "okplacetolog.log": true, - } - - for path, succ := range testdata { - path, succ := path, succ - t.Run("path="+path, func(t *testing.T) { - defer func() { _ = os.Remove(path) }() - - collector, err := New(testutils.NewLogger(t), afero.NewOsFs(), path) - if succ { - assert.NoError(t, err) - assert.NotNil(t, collector) - } else { - assert.Error(t, err) - assert.Nil(t, collector) - } - }) - } -} diff --git a/stats/json/wrapper_test.go b/stats/json/wrapper_test.go deleted file mode 100644 index c50e8dadc7a..00000000000 --- a/stats/json/wrapper_test.go +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package json - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/loadimpact/k6/stats" -) - -func TestWrapersWithNilArg(t *testing.T) { - out := WrapSample(nil) - assert.Equal(t, out, (*Envelope)(nil)) - out = WrapMetric(nil) - assert.Equal(t, out, (*Envelope)(nil)) -} - -func TestWrapSampleWithSamplePointer(t *testing.T) { - out := WrapSample(&stats.Sample{ - Metric: &stats.Metric{}, - }) - assert.NotEqual(t, out, (*Envelope)(nil)) -} - -func TestWrapMetricWithMetricPointer(t *testing.T) { - out := WrapMetric(&stats.Metric{}) - assert.NotEqual(t, out, (*Envelope)(nil)) -} diff --git a/stats/kafka/collector.go b/stats/kafka/collector.go index c3da5baa5de..db402cf553a 100644 --- a/stats/kafka/collector.go +++ b/stats/kafka/collector.go @@ -30,9 +30,9 @@ import ( "github.com/sirupsen/logrus" "github.com/loadimpact/k6/lib" + jsono "github.com/loadimpact/k6/output/json" "github.com/loadimpact/k6/stats" "github.com/loadimpact/k6/stats/influxdb" - jsonc "github.com/loadimpact/k6/stats/json" ) // Collector implements the lib.Collector interface and should be used only for testing @@ -125,7 +125,7 @@ func (c *Collector) formatSamples(samples stats.Samples) ([]string, error) { } default: for _, sample := range samples { - env := jsonc.WrapSample(&sample) + env := jsono.WrapSample(sample) metric, err := json.Marshal(env) if err != nil { return nil, err diff --git a/stats/kafka/config.go b/stats/kafka/config.go index 9d391155e5e..7b11f36a627 100644 --- a/stats/kafka/config.go +++ b/stats/kafka/config.go @@ -21,8 +21,10 @@ package kafka import ( + "encoding/json" "time" + "github.com/kelseyhightower/envconfig" "github.com/kubernetes/helm/pkg/strvals" "github.com/mitchellh/mapstructure" "gopkg.in/guregu/null.v3" @@ -120,3 +122,33 @@ func ParseArg(arg string) (Config, error) { return c, nil } + +// GetConsolidatedConfig combines {default config values + JSON config + +// environment vars + arg config values}, and returns the final result. +func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, arg string) (Config, error) { + result := NewConfig() + if jsonRawConf != nil { + jsonConf := Config{} + if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil { + return result, err + } + result = result.Apply(jsonConf) + } + + envConfig := Config{} + if err := envconfig.Process("", &envConfig); err != nil { + // TODO: get rid of envconfig and actually use the env parameter... + return result, err + } + result = result.Apply(envConfig) + + if arg != "" { + urlConf, err := ParseArg(arg) + if err != nil { + return result, err + } + result = result.Apply(urlConf) + } + + return result, nil +} diff --git a/stats/statsd/collector.go b/stats/statsd/collector.go index b8d5f2a62b7..c49f8d2d9ea 100644 --- a/stats/statsd/collector.go +++ b/stats/statsd/collector.go @@ -21,8 +21,10 @@ package statsd import ( + "encoding/json" "time" + "github.com/kelseyhightower/envconfig" "github.com/sirupsen/logrus" "gopkg.in/guregu/null.v3" @@ -96,3 +98,25 @@ func New(logger logrus.FieldLogger, conf common.Config) (*common.Collector, erro Logger: logger, }, nil } + +// GetConsolidatedConfig combines {default config values + JSON config + +// environment vars}, and returns the final result. +func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string) (Config, error) { + result := NewConfig() + if jsonRawConf != nil { + jsonConf := Config{} + if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil { + return result, err + } + result = result.Apply(jsonConf) + } + + envConfig := Config{} + if err := envconfig.Process("", &envConfig); err != nil { + // TODO: get rid of envconfig and actually use the env parameter... + return result, err + } + result = result.Apply(envConfig) + + return result, nil +}