diff --git a/api/server_test.go b/api/server_test.go
index cafec68d9e8f..d269a2b957d0 100644
--- a/api/server_test.go
+++ b/api/server_test.go
@@ -82,7 +82,7 @@ func TestWithEngine(t *testing.T) {
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
- engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger)
+ engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger)
require.NoError(t, err)
rw := httptest.NewRecorder()
diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go
index e2a30db52cfe..e74539e0585b 100644
--- a/api/v1/group_routes_test.go
+++ b/api/v1/group_routes_test.go
@@ -51,7 +51,7 @@ func TestGetGroups(t *testing.T) {
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Group: g0}, logger)
require.NoError(t, err)
- engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger)
+ engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger)
require.NoError(t, err)
t.Run("list", func(t *testing.T) {
diff --git a/api/v1/metric_routes_test.go b/api/v1/metric_routes_test.go
index 423a5eb5c5e0..b5a21d49d4f6 100644
--- a/api/v1/metric_routes_test.go
+++ b/api/v1/metric_routes_test.go
@@ -45,7 +45,7 @@ func TestGetMetrics(t *testing.T) {
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
- engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger)
+ engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger)
require.NoError(t, err)
engine.Metrics = map[string]*stats.Metric{
@@ -88,7 +88,7 @@ func TestGetMetric(t *testing.T) {
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
- engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger)
+ engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger)
require.NoError(t, err)
engine.Metrics = map[string]*stats.Metric{
diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go
index bb9c5ad083e5..a4b0214d814d 100644
--- a/api/v1/setup_teardown_routes_test.go
+++ b/api/v1/setup_teardown_routes_test.go
@@ -155,7 +155,7 @@ func TestSetupData(t *testing.T) {
})
execScheduler, err := local.NewExecutionScheduler(runner, logger)
require.NoError(t, err)
- engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, logger)
+ engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, nil, logger)
require.NoError(t, err)
globalCtx, globalCancel := context.WithCancel(context.Background())
diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go
index 1378f97c5a95..e102d3397b5b 100644
--- a/api/v1/status_routes_test.go
+++ b/api/v1/status_routes_test.go
@@ -47,7 +47,7 @@ func TestGetStatus(t *testing.T) {
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
- engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, logger)
+ engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger)
require.NoError(t, err)
rw := httptest.NewRecorder()
@@ -101,7 +101,7 @@ func TestPatchStatus(t *testing.T) {
t.Run(name, func(t *testing.T) {
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Options: options}, logger)
require.NoError(t, err)
- engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, logger)
+ engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, nil, logger)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
diff --git a/cloudapi/config.go b/cloudapi/config.go
index be5bdd0fd79a..e113ff596453 100644
--- a/cloudapi/config.go
+++ b/cloudapi/config.go
@@ -26,6 +26,7 @@ import (
"gopkg.in/guregu/null.v3"
+ "github.com/kelseyhightower/envconfig"
"github.com/loadimpact/k6/lib/types"
)
@@ -241,7 +242,8 @@ func (c Config) Apply(cfg Config) Config {
return c
}
-// MergeFromExternal merges three fields from json in a loadimpact key of the provided external map
+// MergeFromExternal merges three fields from the JSON in a loadimpact key of
+// the provided external map. Used for options.ext.loadimpact settings.
func MergeFromExternal(external map[string]json.RawMessage, conf *Config) error {
if val, ok := external["loadimpact"]; ok {
// TODO: Important! Separate configs and fix the whole 2 configs mess!
@@ -262,3 +264,29 @@ func MergeFromExternal(external map[string]json.RawMessage, conf *Config) error
}
return nil
}
+
+// GetConsolidatedConfig combines the default config values with the JSON config
+// values and environment variables and returns the final result.
+func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, configArg string) (Config, error) {
+ result := NewConfig()
+ if jsonRawConf != nil {
+ jsonConf := Config{}
+ if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil {
+ return result, err
+ }
+ result = result.Apply(jsonConf)
+ }
+
+ envConfig := Config{}
+ if err := envconfig.Process("", &envConfig); err != nil {
+ // TODO: get rid of envconfig and actually use the env parameter...
+ return result, err
+ }
+ result = result.Apply(envConfig)
+
+ if configArg != "" {
+ result.Name = null.StringFrom(configArg)
+ }
+
+ return result, nil
+}
diff --git a/cmd/cloud.go b/cmd/cloud.go
index 939b583c8399..680168a746e5 100644
--- a/cmd/cloud.go
+++ b/cmd/cloud.go
@@ -33,7 +33,6 @@ import (
"syscall"
"time"
- "github.com/kelseyhightower/envconfig"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/afero"
@@ -105,7 +104,8 @@ This will execute the test on the k6 cloud service. Use "k6 login cloud" to auth
return err
}
- runtimeOptions, err := getRuntimeOptions(cmd.Flags(), buildEnvMap(os.Environ()))
+ osEnvironment := buildEnvMap(os.Environ())
+ runtimeOptions, err := getRuntimeOptions(cmd.Flags(), osEnvironment)
if err != nil {
return err
}
@@ -141,8 +141,8 @@ This will execute the test on the k6 cloud service. Use "k6 login cloud" to auth
}
// Cloud config
- cloudConfig := cloudapi.NewConfig().Apply(derivedConf.Collectors.Cloud)
- if err = envconfig.Process("", &cloudConfig); err != nil {
+ cloudConfig, err := cloudapi.GetConsolidatedConfig(derivedConf.Collectors["cloud"], osEnvironment, "")
+ if err != nil {
return err
}
if !cloudConfig.Token.Valid {
@@ -153,8 +153,8 @@ This will execute the test on the k6 cloud service. Use "k6 login cloud" to auth
arc := r.MakeArchive()
// TODO: Fix this
// We reuse cloud.Config for parsing options.ext.loadimpact, but this probably shouldn't be
- // done as the idea of options.ext is that they are extensible without touching k6. But in
- // order for this to happen we shouldn't actually marshall cloud.Config on top of it because
+ // done, as the idea of options.ext is that they are extensible without touching k6. But in
+ // order for this to happen, we shouldn't actually marshall cloud.Config on top of it, because
// it will be missing some fields that aren't actually mentioned in the struct.
// So in order for use to copy the fields that we need for loadimpact's api we unmarshal in
// map[string]interface{} and copy what we need if it isn't set already
diff --git a/cmd/collectors.go b/cmd/collectors.go
deleted file mode 100644
index 158048e27945..000000000000
--- a/cmd/collectors.go
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- *
- * k6 - a next-generation load testing tool
- * Copyright (C) 2016 Load Impact
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- *
- */
-
-package cmd
-
-import (
- "fmt"
- "strings"
-
- "gopkg.in/guregu/null.v3"
-
- "github.com/kelseyhightower/envconfig"
- "github.com/pkg/errors"
- "github.com/sirupsen/logrus"
- "github.com/spf13/afero"
-
- "github.com/loadimpact/k6/cloudapi"
- "github.com/loadimpact/k6/lib"
- "github.com/loadimpact/k6/lib/consts"
- "github.com/loadimpact/k6/loader"
- "github.com/loadimpact/k6/stats"
- "github.com/loadimpact/k6/stats/cloud"
- "github.com/loadimpact/k6/stats/csv"
- "github.com/loadimpact/k6/stats/datadog"
- "github.com/loadimpact/k6/stats/influxdb"
- jsonc "github.com/loadimpact/k6/stats/json"
- "github.com/loadimpact/k6/stats/kafka"
- "github.com/loadimpact/k6/stats/statsd"
-)
-
-const (
- collectorInfluxDB = "influxdb"
- collectorJSON = "json"
- collectorKafka = "kafka"
- collectorCloud = "cloud"
- collectorStatsD = "statsd"
- collectorDatadog = "datadog"
- collectorCSV = "csv"
-)
-
-func parseCollector(s string) (t, arg string) {
- parts := strings.SplitN(s, "=", 2)
- switch len(parts) {
- case 0:
- return "", ""
- case 1:
- return parts[0], ""
- default:
- return parts[0], parts[1]
- }
-}
-
-// TODO: totally refactor this...
-func getCollector(
- logger logrus.FieldLogger,
- collectorName, arg string, src *loader.SourceData, conf Config, executionPlan []lib.ExecutionStep,
-) (lib.Collector, error) {
- switch collectorName {
- case collectorJSON:
- return jsonc.New(logger, afero.NewOsFs(), arg)
- case collectorInfluxDB:
- config := influxdb.NewConfig().Apply(conf.Collectors.InfluxDB)
- if err := envconfig.Process("", &config); err != nil {
- return nil, err
- }
- urlConfig, err := influxdb.ParseURL(arg)
- if err != nil {
- return nil, err
- }
- config = config.Apply(urlConfig)
-
- return influxdb.New(logger, config)
- case collectorCloud:
- config := cloudapi.NewConfig().Apply(conf.Collectors.Cloud)
- if err := envconfig.Process("", &config); err != nil {
- return nil, err
- }
- if arg != "" {
- config.Name = null.StringFrom(arg)
- }
-
- return cloud.New(logger, config, src, conf.Options, executionPlan, consts.Version)
- case collectorKafka:
- config := kafka.NewConfig().Apply(conf.Collectors.Kafka)
- if err := envconfig.Process("", &config); err != nil {
- return nil, err
- }
- if arg != "" {
- cmdConfig, err := kafka.ParseArg(arg)
- if err != nil {
- return nil, err
- }
- config = config.Apply(cmdConfig)
- }
-
- return kafka.New(logger, config)
- case collectorStatsD:
- config := statsd.NewConfig().Apply(conf.Collectors.StatsD)
- if err := envconfig.Process("k6_statsd", &config); err != nil {
- return nil, err
- }
-
- return statsd.New(logger, config)
- case collectorDatadog:
- config := datadog.NewConfig().Apply(conf.Collectors.Datadog)
- if err := envconfig.Process("k6_datadog", &config); err != nil {
- return nil, err
- }
-
- return datadog.New(logger, config)
- case collectorCSV:
- config := csv.NewConfig().Apply(conf.Collectors.CSV)
- if err := envconfig.Process("", &config); err != nil {
- return nil, err
- }
- if arg != "" {
- cmdConfig, err := csv.ParseArg(arg)
- if err != nil {
- return nil, err
- }
-
- config = config.Apply(cmdConfig)
- }
-
- return csv.New(logger, afero.NewOsFs(), conf.SystemTags.Map(), config)
-
- default:
- return nil, errors.Errorf("unknown output type: %s", collectorName)
- }
-}
-
-func newCollector(
- logger logrus.FieldLogger,
- collectorName, arg string, src *loader.SourceData, conf Config, executionPlan []lib.ExecutionStep,
-) (lib.Collector, error) {
- collector, err := getCollector(logger, collectorName, arg, src, conf, executionPlan)
- if err != nil {
- return collector, err
- }
-
- // Check if all required tags are present
- missingRequiredTags := []string{}
- requiredTags := collector.GetRequiredSystemTags()
- for _, tag := range stats.SystemTagSetValues() {
- if requiredTags.Has(tag) && !conf.SystemTags.Has(tag) {
- missingRequiredTags = append(missingRequiredTags, tag.String())
- }
- }
- if len(missingRequiredTags) > 0 {
- return collector, fmt.Errorf(
- "the specified collector '%s' needs the following system tags enabled: %s",
- collectorName,
- strings.Join(missingRequiredTags, ", "),
- )
- }
-
- return collector, nil
-}
diff --git a/cmd/config.go b/cmd/config.go
index 23035befcd0b..8ad10b75faff 100644
--- a/cmd/config.go
+++ b/cmd/config.go
@@ -33,16 +33,10 @@ import (
"github.com/spf13/pflag"
"gopkg.in/guregu/null.v3"
- "github.com/loadimpact/k6/cloudapi"
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/executor"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
- "github.com/loadimpact/k6/stats/csv"
- "github.com/loadimpact/k6/stats/datadog"
- "github.com/loadimpact/k6/stats/influxdb"
- "github.com/loadimpact/k6/stats/kafka"
- "github.com/loadimpact/k6/stats/statsd"
)
// configFlagSet returns a FlagSet with the default run configuration flags.
@@ -62,14 +56,8 @@ type Config struct {
Linger null.Bool `json:"linger" envconfig:"K6_LINGER"`
NoUsageReport null.Bool `json:"noUsageReport" envconfig:"K6_NO_USAGE_REPORT"`
- Collectors struct {
- InfluxDB influxdb.Config `json:"influxdb"`
- Kafka kafka.Config `json:"kafka"`
- Cloud cloudapi.Config `json:"cloud"`
- StatsD statsd.Config `json:"statsd"`
- Datadog datadog.Config `json:"datadog"`
- CSV csv.Config `json:"csv"`
- } `json:"collectors"`
+ // TODO: deprecate
+ Collectors map[string]json.RawMessage `json:"collectors"`
}
// Validate checks if all of the specified options make sense
@@ -92,12 +80,6 @@ func (c Config) Apply(cfg Config) Config {
if cfg.NoUsageReport.Valid {
c.NoUsageReport = cfg.NoUsageReport
}
- c.Collectors.InfluxDB = c.Collectors.InfluxDB.Apply(cfg.Collectors.InfluxDB)
- c.Collectors.Cloud = c.Collectors.Cloud.Apply(cfg.Collectors.Cloud)
- c.Collectors.Kafka = c.Collectors.Kafka.Apply(cfg.Collectors.Kafka)
- c.Collectors.StatsD = c.Collectors.StatsD.Apply(cfg.Collectors.StatsD)
- c.Collectors.Datadog = c.Collectors.Datadog.Apply(cfg.Collectors.Datadog)
- c.Collectors.CSV = c.Collectors.CSV.Apply(cfg.Collectors.CSV)
return c
}
@@ -167,19 +149,11 @@ func writeDiskConfig(fs afero.Fs, configPath string, conf Config) error {
}
// Reads configuration variables from the environment.
-func readEnvConfig() (conf Config, err error) {
- // TODO: replace envconfig and refactor the whole configuration from the groun up :/
- for _, err := range []error{
- envconfig.Process("", &conf),
- envconfig.Process("", &conf.Collectors.Cloud),
- envconfig.Process("", &conf.Collectors.InfluxDB),
- envconfig.Process("", &conf.Collectors.Kafka),
- envconfig.Process("k6_statsd", &conf.Collectors.StatsD),
- envconfig.Process("k6_datadog", &conf.Collectors.Datadog),
- } {
- return conf, err
- }
- return conf, nil
+func readEnvConfig() (Config, error) {
+ // TODO: replace envconfig and refactor the whole configuration from the ground up :/
+ conf := Config{}
+ err := envconfig.Process("", &conf)
+ return conf, err
}
// Assemble the final consolidated configuration from all of the different sources:
@@ -192,12 +166,6 @@ func readEnvConfig() (conf Config, err error) {
// TODO: add better validation, more explicit default values and improve consistency between formats
// TODO: accumulate all errors and differentiate between the layers?
func getConsolidatedConfig(fs afero.Fs, cliConf Config, runner lib.Runner) (conf Config, err error) {
- cliConf.Collectors.InfluxDB = influxdb.NewConfig().Apply(cliConf.Collectors.InfluxDB)
- cliConf.Collectors.Cloud = cloudapi.NewConfig().Apply(cliConf.Collectors.Cloud)
- cliConf.Collectors.Kafka = kafka.NewConfig().Apply(cliConf.Collectors.Kafka)
- cliConf.Collectors.StatsD = statsd.NewConfig().Apply(cliConf.Collectors.StatsD)
- cliConf.Collectors.Datadog = datadog.NewConfig().Apply(cliConf.Collectors.Datadog)
-
fileConf, _, err := readDiskConfig(fs)
if err != nil {
return conf, err
diff --git a/cmd/convert.go b/cmd/convert.go
index 39350ccbc65e..ececf3ceb076 100644
--- a/cmd/convert.go
+++ b/cmd/convert.go
@@ -33,8 +33,10 @@ import (
"github.com/loadimpact/k6/lib"
)
+// TODO: fix this... or remove k6 convert
+//nolint: gochecknoglobals
var (
- output string
+ convertOutput string
optionsFilePath string
minSleep uint
maxSleep uint
@@ -107,12 +109,12 @@ func getConvertCmd() *cobra.Command {
}
// Write script content to stdout or file
- if output == "" || output == "-" { //nolint:nestif
+ if convertOutput == "" || convertOutput == "-" { //nolint:nestif
if _, err := io.WriteString(defaultWriter, script); err != nil {
return err
}
} else {
- f, err := defaultFs.Create(output)
+ f, err := defaultFs.Create(convertOutput)
if err != nil {
return err
}
@@ -131,8 +133,14 @@ func getConvertCmd() *cobra.Command {
}
convertCmd.Flags().SortFlags = false
- convertCmd.Flags().StringVarP(&output, "output", "O", output, "k6 script output filename (stdout by default)")
- convertCmd.Flags().StringVarP(&optionsFilePath, "options", "", output, "path to a JSON file with options that would be injected in the output script")
+ convertCmd.Flags().StringVarP(
+ &convertOutput, "output", "O", convertOutput,
+ "k6 script output filename (stdout by default)",
+ )
+ convertCmd.Flags().StringVarP(
+ &optionsFilePath, "options", "", optionsFilePath,
+ "path to a JSON file with options that would be injected in the output script",
+ )
convertCmd.Flags().StringSliceVarP(&only, "only", "", []string{}, "include only requests from the given domains")
convertCmd.Flags().StringSliceVarP(&skip, "skip", "", []string{}, "skip requests from the given domains")
convertCmd.Flags().UintVarP(&threshold, "batch-threshold", "", 500, "batch request idle time threshold (see example)")
diff --git a/cmd/login_cloud.go b/cmd/login_cloud.go
index 1e7a0aca7aa2..a22e81e40d7c 100644
--- a/cmd/login_cloud.go
+++ b/cmd/login_cloud.go
@@ -21,6 +21,7 @@
package cmd
import (
+ "encoding/json"
"os"
"syscall"
@@ -58,22 +59,33 @@ This will set the default token used when just "k6 run -o cloud" is passed.`,
RunE: func(cmd *cobra.Command, args []string) error {
fs := afero.NewOsFs()
- k6Conf, err := getConsolidatedConfig(fs, Config{}, nil)
+ currentDiskConf, configPath, err := readDiskConfig(fs)
if err != nil {
return err
}
- currentDiskConf, configPath, err := readDiskConfig(fs)
+ currentJSONConfig := cloudapi.Config{}
+ currentJSONConfigRaw := currentDiskConf.Collectors["cloud"]
+ if currentJSONConfigRaw != nil {
+ // We only want to modify this config, see comment below
+ if jsonerr := json.Unmarshal(currentJSONConfigRaw, ¤tJSONConfig); jsonerr != nil {
+ return jsonerr
+ }
+ }
+
+ // We want to use this fully consolidated config for things like
+ // host addresses, so users can overwrite them with env vars.
+ consolidatedCurrentConfig, err := cloudapi.GetConsolidatedConfig(currentJSONConfigRaw, buildEnvMap(os.Environ()), "")
if err != nil {
return err
}
+ // But we don't want to save them back to the JSON file, we only
+ // want to save what already existed there and the login details.
+ newCloudConf := currentJSONConfig
show := getNullBool(cmd.Flags(), "show")
reset := getNullBool(cmd.Flags(), "reset")
token := getNullString(cmd.Flags(), "token")
-
- newCloudConf := cloudapi.NewConfig().Apply(currentDiskConf.Collectors.Cloud)
-
switch {
case reset.Valid:
newCloudConf.Token = null.StringFromPtr(nil)
@@ -104,7 +116,7 @@ This will set the default token used when just "k6 run -o cloud" is passed.`,
email := vals["Email"].(string)
password := vals["Password"].(string)
- client := cloudapi.NewClient(logger, "", k6Conf.Collectors.Cloud.Host.String, consts.Version)
+ client := cloudapi.NewClient(logger, "", consolidatedCurrentConfig.Host.String, consts.Version)
res, err := client.Login(email, password)
if err != nil {
return err
@@ -117,7 +129,10 @@ This will set the default token used when just "k6 run -o cloud" is passed.`,
newCloudConf.Token = null.StringFrom(res.Token)
}
- currentDiskConf.Collectors.Cloud = newCloudConf
+ currentDiskConf.Collectors["cloud"], err = json.Marshal(newCloudConf)
+ if err != nil {
+ return err
+ }
if err := writeDiskConfig(fs, configPath, currentDiskConf); err != nil {
return err
}
diff --git a/cmd/login_influxdb.go b/cmd/login_influxdb.go
index 9e85bcf8ffdc..264d5083b33a 100644
--- a/cmd/login_influxdb.go
+++ b/cmd/login_influxdb.go
@@ -21,6 +21,7 @@
package cmd
import (
+ "encoding/json"
"os"
"syscall"
"time"
@@ -52,7 +53,15 @@ This will set the default server used when just "-o influxdb" is passed.`,
return err
}
- conf := influxdb.NewConfig().Apply(config.Collectors.InfluxDB)
+ conf := influxdb.NewConfig()
+ jsonConf := config.Collectors["influxdb"]
+ if jsonConf != nil {
+ jsonConfParsed, jsonerr := influxdb.ParseJSON(jsonConf)
+ if jsonerr != nil {
+ return jsonerr
+ }
+ conf = conf.Apply(jsonConfParsed)
+ }
if len(args) > 0 {
urlConf, err := influxdb.ParseURL(args[0]) //nolint:govet
if err != nil {
@@ -112,7 +121,10 @@ This will set the default server used when just "-o influxdb" is passed.`,
return err
}
- config.Collectors.InfluxDB = conf
+ config.Collectors["influxdb"], err = json.Marshal(conf)
+ if err != nil {
+ return err
+ }
return writeDiskConfig(fs, configPath, config)
},
}
diff --git a/cmd/outputs.go b/cmd/outputs.go
new file mode 100644
index 000000000000..02fb2ed9de62
--- /dev/null
+++ b/cmd/outputs.go
@@ -0,0 +1,273 @@
+/*
+ *
+ * k6 - a next-generation load testing tool
+ * Copyright (C) 2021 Load Impact
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package cmd
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "strings"
+
+ "github.com/sirupsen/logrus"
+ "github.com/spf13/afero"
+
+ "github.com/loadimpact/k6/cloudapi"
+ "github.com/loadimpact/k6/lib"
+ "github.com/loadimpact/k6/lib/consts"
+ "github.com/loadimpact/k6/loader"
+ "github.com/loadimpact/k6/output"
+ "github.com/loadimpact/k6/output/json"
+ "github.com/loadimpact/k6/stats"
+ "github.com/loadimpact/k6/stats/cloud"
+ "github.com/loadimpact/k6/stats/csv"
+ "github.com/loadimpact/k6/stats/datadog"
+ "github.com/loadimpact/k6/stats/influxdb"
+ "github.com/loadimpact/k6/stats/kafka"
+ "github.com/loadimpact/k6/stats/statsd"
+)
+
+// TODO: move this to an output sub-module after we get rid of the old collectors?
+//nolint: funlen
+func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, error), error) {
+ // Start with the built-in outputs
+ result := map[string]func(output.Params) (output.Output, error){
+ "json": json.New,
+
+ // TODO: remove all of these
+ "influxdb": func(params output.Params) (output.Output, error) {
+ conf, err := influxdb.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
+ if err != nil {
+ return nil, err
+ }
+ influxc, err := influxdb.New(params.Logger, conf)
+ if err != nil {
+ return nil, err
+ }
+ return newCollectorAdapter(params, influxc)
+ },
+ "cloud": func(params output.Params) (output.Output, error) {
+ conf, err := cloudapi.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
+ if err != nil {
+ return nil, err
+ }
+ cloudc, err := cloud.New(
+ params.Logger, conf, params.ScriptPath, params.ScriptOptions, params.ExecutionPlan, consts.Version,
+ )
+ if err != nil {
+ return nil, err
+ }
+ return newCollectorAdapter(params, cloudc)
+ },
+ "kafka": func(params output.Params) (output.Output, error) {
+ conf, err := kafka.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
+ if err != nil {
+ return nil, err
+ }
+ kafkac, err := kafka.New(params.Logger, conf)
+ if err != nil {
+ return nil, err
+ }
+ return newCollectorAdapter(params, kafkac)
+ },
+ "statsd": func(params output.Params) (output.Output, error) {
+ conf, err := statsd.GetConsolidatedConfig(params.JSONConfig, params.Environment)
+ if err != nil {
+ return nil, err
+ }
+ statsdc, err := statsd.New(params.Logger, conf)
+ if err != nil {
+ return nil, err
+ }
+ return newCollectorAdapter(params, statsdc)
+ },
+ "datadog": func(params output.Params) (output.Output, error) {
+ conf, err := datadog.GetConsolidatedConfig(params.JSONConfig, params.Environment)
+ if err != nil {
+ return nil, err
+ }
+ datadogc, err := datadog.New(params.Logger, conf)
+ if err != nil {
+ return nil, err
+ }
+ return newCollectorAdapter(params, datadogc)
+ },
+ "csv": func(params output.Params) (output.Output, error) {
+ conf, err := csv.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
+ if err != nil {
+ return nil, err
+ }
+ csvc, err := csv.New(params.Logger, params.FS, params.ScriptOptions.SystemTags.Map(), conf)
+ if err != nil {
+ return nil, err
+ }
+ return newCollectorAdapter(params, csvc)
+ },
+ }
+
+ exts := output.GetExtensions()
+ for k, v := range exts {
+ if _, ok := result[k]; ok {
+ return nil, fmt.Errorf("invalid output extension %s, built-in output with the same type already exists", k)
+ }
+ result[k] = v
+ }
+
+ return result, nil
+}
+
+func getPossibleIDList(constrs map[string]func(output.Params) (output.Output, error)) string {
+ res := make([]string, 0, len(constrs))
+ for k := range constrs {
+ res = append(res, k)
+ }
+ sort.Strings(res)
+ return strings.Join(res, ", ")
+}
+
+func createOutputs(
+ outputFullArguments []string, src *loader.SourceData, conf Config, rtOpts lib.RuntimeOptions,
+ executionPlan []lib.ExecutionStep, osEnvironment map[string]string, logger logrus.FieldLogger,
+) ([]output.Output, error) {
+ outputConstructors, err := getAllOutputConstructors()
+ if err != nil {
+ return nil, err
+ }
+ baseParams := output.Params{
+ ScriptPath: src.URL,
+ Logger: logger,
+ Environment: osEnvironment,
+ StdOut: stdout,
+ StdErr: stderr,
+ FS: afero.NewOsFs(),
+ ScriptOptions: conf.Options,
+ RuntimeOptions: rtOpts,
+ ExecutionPlan: executionPlan,
+ }
+ result := make([]output.Output, 0, len(outputFullArguments))
+
+ for _, outputFullArg := range outputFullArguments {
+ outputType, outputArg := parseOutputArgument(outputFullArg)
+ outputConstructor, ok := outputConstructors[outputType]
+ if !ok {
+ return nil, fmt.Errorf(
+ "invalid output type '%s', available types are: %s",
+ outputType, getPossibleIDList(outputConstructors),
+ )
+ }
+
+ params := baseParams
+ params.OutputType = outputType
+ params.ConfigArgument = outputArg
+ params.JSONConfig = conf.Collectors[outputType]
+
+ output, err := outputConstructor(params)
+ if err != nil {
+ return nil, fmt.Errorf("could not create the '%s' output: %w", outputType, err)
+ }
+ result = append(result, output)
+ }
+
+ return result, nil
+}
+
+func parseOutputArgument(s string) (t, arg string) {
+ parts := strings.SplitN(s, "=", 2)
+ switch len(parts) {
+ case 0:
+ return "", ""
+ case 1:
+ return parts[0], ""
+ default:
+ return parts[0], parts[1]
+ }
+}
+
+// TODO: remove this after we transition every collector to the output interface
+
+func newCollectorAdapter(params output.Params, collector lib.Collector) (output.Output, error) {
+ // Check if all required tags are present
+ missingRequiredTags := []string{}
+ requiredTags := collector.GetRequiredSystemTags()
+ for _, tag := range stats.SystemTagSetValues() {
+ if requiredTags.Has(tag) && !params.ScriptOptions.SystemTags.Has(tag) {
+ missingRequiredTags = append(missingRequiredTags, tag.String())
+ }
+ }
+ if len(missingRequiredTags) > 0 {
+ return nil, fmt.Errorf(
+ "the specified output '%s' needs the following system tags enabled: %s",
+ params.OutputType, strings.Join(missingRequiredTags, ", "),
+ )
+ }
+
+ return &collectorAdapter{
+ outputType: params.OutputType,
+ collector: collector,
+ stopCh: make(chan struct{}),
+ }, nil
+}
+
+// collectorAdapter is a _temporary_ fix until we move all of the old
+// "collectors" to the new output interface
+type collectorAdapter struct {
+ collector lib.Collector
+ outputType string
+ runCtx context.Context
+ runCtxCancel func()
+ stopCh chan struct{}
+}
+
+func (ca *collectorAdapter) Description() string {
+ link := ca.collector.Link()
+ if link != "" {
+ return fmt.Sprintf("%s (%s)", ca.outputType, link)
+ }
+ return ca.outputType
+}
+
+func (ca *collectorAdapter) Start() error {
+ if err := ca.collector.Init(); err != nil {
+ return err
+ }
+ ca.runCtx, ca.runCtxCancel = context.WithCancel(context.Background())
+ go func() {
+ ca.collector.Run(ca.runCtx)
+ close(ca.stopCh)
+ }()
+ return nil
+}
+
+func (ca *collectorAdapter) AddMetricSamples(samples []stats.SampleContainer) {
+ ca.collector.Collect(samples)
+}
+
+func (ca *collectorAdapter) SetRunStatus(latestStatus lib.RunStatus) {
+ ca.collector.SetRunStatus(latestStatus)
+}
+
+// Stop implements the new output interface.
+func (ca *collectorAdapter) Stop() error {
+ ca.runCtxCancel()
+ <-ca.stopCh
+ return nil
+}
+
+var _ output.WithRunStatusUpdates = &collectorAdapter{}
diff --git a/cmd/run.go b/cmd/run.go
index 81f05cce4ae9..708799e67679 100644
--- a/cmd/run.go
+++ b/cmd/run.go
@@ -116,7 +116,8 @@ a commandline interface for interacting with it.`,
return err
}
- runtimeOptions, err := getRuntimeOptions(cmd.Flags(), buildEnvMap(os.Environ()))
+ osEnvironment := buildEnvMap(os.Environ())
+ runtimeOptions, err := getRuntimeOptions(cmd.Flags(), osEnvironment)
if err != nil {
return err
}
@@ -171,8 +172,6 @@ a commandline interface for interacting with it.`,
return err
}
- executionState := execScheduler.GetState()
-
// This is manually triggered after the Engine's Run() has completed,
// and things like a single Ctrl+C don't affect it. We use it to make
// sure that the progressbars finish updating with the latest execution
@@ -191,26 +190,18 @@ a commandline interface for interacting with it.`,
progressBarWG.Done()
}()
- // Create an engine.
- initBar.Modify(pb.WithConstProgress(0, "Init engine"))
- engine, err := core.NewEngine(execScheduler, conf.Options, runtimeOptions, logger)
+ // Create all outputs.
+ executionPlan := execScheduler.GetExecutionPlan()
+ outputs, err := createOutputs(conf.Out, src, conf, runtimeOptions, executionPlan, osEnvironment, logger)
if err != nil {
return err
}
- executionPlan := execScheduler.GetExecutionPlan()
- // Create a collector and assign it to the engine if requested.
- initBar.Modify(pb.WithConstProgress(0, "Init metric outputs"))
- for _, out := range conf.Out {
- t, arg := parseCollector(out)
- collector, cerr := newCollector(logger, t, arg, src, conf, executionPlan)
- if cerr != nil {
- return cerr
- }
- if cerr = collector.Init(); cerr != nil {
- return cerr
- }
- engine.Collectors = append(engine.Collectors, collector)
+ // Create the engine.
+ initBar.Modify(pb.WithConstProgress(0, "Init engine"))
+ engine, err := core.NewEngine(execScheduler, conf.Options, runtimeOptions, outputs, logger)
+ if err != nil {
+ return err
}
// Spin up the REST API server, if not disabled.
@@ -230,9 +221,17 @@ a commandline interface for interacting with it.`,
}()
}
+ // We do this here so we can get any output URLs below.
+ initBar.Modify(pb.WithConstProgress(0, "Starting outputs"))
+ err = engine.StartOutputs()
+ if err != nil {
+ return err
+ }
+ defer engine.StopOutputs()
+
printExecutionDescription(
"local", filename, "", conf, execScheduler.GetState().ExecutionTuple,
- executionPlan, engine.Collectors)
+ executionPlan, outputs)
// Trap Interrupts, SIGINTs and SIGTERMs.
sigC := make(chan os.Signal, 1)
@@ -286,6 +285,7 @@ a commandline interface for interacting with it.`,
progressCancel()
progressBarWG.Wait()
+ executionState := execScheduler.GetState()
// Warn if no iterations could be completed.
if executionState.GetFullIterationCount() == 0 {
logger.Warn("No script iterations finished, consider making the test duration longer")
diff --git a/cmd/ui.go b/cmd/ui.go
index 6021145d9595..3d00c8eed657 100644
--- a/cmd/ui.go
+++ b/cmd/ui.go
@@ -36,6 +36,7 @@ import (
"golang.org/x/crypto/ssh/terminal"
"github.com/loadimpact/k6/lib"
+ "github.com/loadimpact/k6/output"
"github.com/loadimpact/k6/ui"
"github.com/loadimpact/k6/ui/pb"
)
@@ -111,31 +112,25 @@ func modifyAndPrintBar(bar *pb.ProgressBar, options ...pb.ProgressBarOption) {
// Print execution description for both cloud and local execution.
// TODO: Clean this up as part of #1499 or #1427
func printExecutionDescription(
- execution, filename, output string, conf Config, et *lib.ExecutionTuple,
- execPlan []lib.ExecutionStep, collectors []lib.Collector,
+ execution, filename, outputOverride string, conf Config, et *lib.ExecutionTuple,
+ execPlan []lib.ExecutionStep, outputs []output.Output,
) {
fprintf(stdout, " execution: %s\n", ui.ValueColor.Sprint(execution))
fprintf(stdout, " script: %s\n", ui.ValueColor.Sprint(filename))
- if execution == "local" {
- out := "-"
- link := ""
-
- for idx, collector := range collectors {
- if out != "-" {
- out = out + "; " + conf.Out[idx]
- } else {
- out = conf.Out[idx]
- }
-
- if l := collector.Link(); l != "" {
- link = link + " (" + l + ")"
- }
+ var outputDescriptions []string
+ switch {
+ case outputOverride != "":
+ outputDescriptions = []string{outputOverride}
+ case len(outputs) == 0:
+ outputDescriptions = []string{"-"}
+ default:
+ for _, out := range outputs {
+ outputDescriptions = append(outputDescriptions, out.Description())
}
- fprintf(stdout, " output: %s%s\n", ui.ValueColor.Sprint(out), ui.ExtraColor.Sprint(link))
- } else {
- fprintf(stdout, " output: %s\n", ui.ValueColor.Sprint(output))
}
+
+ fprintf(stdout, " output: %s\n", ui.ValueColor.Sprint(strings.Join(outputDescriptions, ", ")))
fprintf(stdout, "\n")
maxDuration, _ := lib.GetEndOffset(execPlan)
diff --git a/core/engine.go b/core/engine.go
index 9953e018d7b4..daf5bed35052 100644
--- a/core/engine.go
+++ b/core/engine.go
@@ -32,6 +32,7 @@ import (
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/metrics"
+ "github.com/loadimpact/k6/output"
"github.com/loadimpact/k6/stats"
)
@@ -56,7 +57,7 @@ type Engine struct {
Options lib.Options
runtimeOptions lib.RuntimeOptions
- Collectors []lib.Collector
+ outputs []output.Output
logger *logrus.Entry
stopOnce sync.Once
@@ -77,7 +78,7 @@ type Engine struct {
// NewEngine instantiates a new Engine, without doing any heavy initialization.
func NewEngine(
- ex lib.ExecutionScheduler, opts lib.Options, rtOpts lib.RuntimeOptions, logger *logrus.Logger,
+ ex lib.ExecutionScheduler, opts lib.Options, rtOpts lib.RuntimeOptions, outputs []output.Output, logger *logrus.Logger,
) (*Engine, error) {
if ex == nil {
return nil, errors.New("missing ExecutionScheduler instance")
@@ -89,6 +90,7 @@ func NewEngine(
Options: opts,
runtimeOptions: rtOpts,
+ outputs: outputs,
Metrics: make(map[string]*stats.Metric),
Samples: make(chan stats.SampleContainer, opts.MetricSamplesBufferSize.Int64),
stopChan: make(chan struct{}),
@@ -109,10 +111,47 @@ func NewEngine(
return e, nil
}
+// StartOutputs spins up all configured outputs, giving the thresholds to any
+// that can accept them. And if some output fails, stop the already started
+// ones. This may take some time, since some outputs make initial network
+// requests to set up whatever remote services are going to listen to them.
+//
+// TODO: this doesn't really need to be in the Engine, so take it out?
+func (e *Engine) StartOutputs() error {
+ e.logger.Debugf("Starting %d outputs...", len(e.outputs))
+ for i, out := range e.outputs {
+ if thresholdOut, ok := out.(output.WithThresholds); ok {
+ thresholdOut.SetThresholds(e.thresholds)
+ }
+
+ if err := out.Start(); err != nil {
+ e.stopOutputs(i)
+ return err
+ }
+ }
+ return nil
+}
+
+// StopOutputs stops all configured outputs.
+func (e *Engine) StopOutputs() {
+ e.stopOutputs(len(e.outputs))
+}
+
+func (e *Engine) stopOutputs(upToID int) {
+ e.logger.Debugf("Stopping %d outputs...", upToID)
+ for i := 0; i < upToID; i++ {
+ if err := e.outputs[i].Stop(); err != nil {
+ e.logger.WithError(err).Errorf("Stopping output %d failed", i)
+ }
+ }
+}
+
// Init is used to initialize the execution scheduler and all metrics processing
// in the engine. The first is a costly operation, since it initializes all of
-// the planned VUs and could potentially take a long time. It either returns an
-// error immediately, or it returns test run() and wait() functions.
+// the planned VUs and could potentially take a long time.
+//
+// This method either returns an error immediately, or it returns test run() and
+// wait() functions.
//
// Things to note:
// - The first lambda, Run(), synchronously executes the actual load test.
@@ -130,7 +169,6 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait
}
// TODO: move all of this in a separate struct? see main TODO above
-
runSubCtx, runSubCancel := context.WithCancel(runCtx)
resultCh := make(chan error)
@@ -154,6 +192,7 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait
return err
}
+
waitFn := e.startBackgroundProcesses(globalCtx, runCtx, resultCh, runSubCancel, processMetricsAfterRun)
return runFn, waitFn, nil
}
@@ -162,20 +201,11 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait
// test run status when it ends. It returns a function that can be used after
// the provided context is called, to wait for the complete winding down of all
// started goroutines.
-func (e *Engine) startBackgroundProcesses( //nolint:funlen
+func (e *Engine) startBackgroundProcesses(
globalCtx, runCtx context.Context, runResult <-chan error, runSubCancel func(), processMetricsAfterRun chan struct{},
) (wait func()) {
processes := new(sync.WaitGroup)
- // Spin up all configured collectors
- for _, collector := range e.Collectors {
- processes.Add(1)
- go func(collector lib.Collector) {
- collector.Run(globalCtx)
- processes.Done()
- }(collector)
- }
-
// Siphon and handle all produced metric samples
processes.Add(1)
go func() {
@@ -300,8 +330,10 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu
}
func (e *Engine) setRunStatus(status lib.RunStatus) {
- for _, c := range e.Collectors {
- c.SetRunStatus(status)
+ for _, out := range e.outputs {
+ if statUpdOut, ok := out.(output.WithRunStatusUpdates); ok {
+ statUpdOut.SetRunStatus(status)
+ }
}
}
@@ -443,7 +475,7 @@ func (e *Engine) processSamples(sampleContainers []stats.SampleContainer) {
e.processSamplesForMetrics(sampleContainers)
}
- for _, collector := range e.Collectors {
- collector.Collect(sampleContainers)
+ for _, out := range e.outputs {
+ out.AddMetricSamples(sampleContainers)
}
}
diff --git a/core/engine_test.go b/core/engine_test.go
index a6765b459cb1..157eced789c8 100644
--- a/core/engine_test.go
+++ b/core/engine_test.go
@@ -41,17 +41,18 @@ import (
"github.com/loadimpact/k6/lib/testutils"
"github.com/loadimpact/k6/lib/testutils/httpmultibin"
"github.com/loadimpact/k6/lib/testutils/minirunner"
+ "github.com/loadimpact/k6/lib/testutils/mockoutput"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/loader"
+ "github.com/loadimpact/k6/output"
"github.com/loadimpact/k6/stats"
- "github.com/loadimpact/k6/stats/dummy"
)
const isWindows = runtime.GOOS == "windows"
// Wrapper around NewEngine that applies a logger and manages the options.
func newTestEngine( //nolint:golint
- t *testing.T, runCtx context.Context, runner lib.Runner, collectors []lib.Collector, opts lib.Options,
+ t *testing.T, runCtx context.Context, runner lib.Runner, outputs []output.Output, opts lib.Options,
) (engine *Engine, run func() error, wait func()) {
if runner == nil {
runner = &minirunner.MiniRunner{}
@@ -76,11 +77,9 @@ func newTestEngine( //nolint:golint
execScheduler, err := local.NewExecutionScheduler(runner, logger)
require.NoError(t, err)
- engine, err = NewEngine(execScheduler, opts, lib.RuntimeOptions{}, logger)
+ engine, err = NewEngine(execScheduler, opts, lib.RuntimeOptions{}, outputs, logger)
require.NoError(t, err)
- engine.Collectors = collectors
-
run, waitFn, err := engine.Init(globalCtx, runCtx)
require.NoError(t, err)
@@ -142,10 +141,9 @@ func TestEngineRun(t *testing.T) {
return nil
}}
- c := &dummy.Collector{}
-
+ mockOutput := mockoutput.New()
ctx, cancel := context.WithCancel(context.Background())
- _, run, wait := newTestEngine(t, ctx, runner, []lib.Collector{c}, lib.Options{
+ _, run, wait := newTestEngine(t, ctx, runner, []output.Output{mockOutput}, lib.Options{
VUs: null.IntFrom(1),
Iterations: null.IntFrom(1),
})
@@ -158,7 +156,7 @@ func TestEngineRun(t *testing.T) {
wait()
found := 0
- for _, s := range c.Samples {
+ for _, s := range mockOutput.Samples {
if s.Metric != testMetric {
continue
}
@@ -197,7 +195,7 @@ func TestEngineStopped(t *testing.T) {
e.Stop() // test that a second stop doesn't panic
}
-func TestEngineCollector(t *testing.T) {
+func TestEngineOutput(t *testing.T) {
testMetric := stats.New("test_metric", stats.Trend)
runner := &minirunner.MiniRunner{Fn: func(ctx context.Context, out chan<- stats.SampleContainer) error {
@@ -205,8 +203,8 @@ func TestEngineCollector(t *testing.T) {
return nil
}}
- c := &dummy.Collector{}
- e, run, wait := newTestEngine(t, nil, runner, []lib.Collector{c}, lib.Options{
+ mockOutput := mockoutput.New()
+ e, run, wait := newTestEngine(t, nil, runner, []output.Output{mockOutput}, lib.Options{
VUs: null.IntFrom(1),
Iterations: null.IntFrom(1),
})
@@ -215,7 +213,7 @@ func TestEngineCollector(t *testing.T) {
wait()
cSamples := []stats.Sample{}
- for _, sample := range c.Samples {
+ for _, sample := range mockOutput.Samples {
if sample.Metric == testMetric {
cSamples = append(cSamples, sample)
}
@@ -224,9 +222,9 @@ func TestEngineCollector(t *testing.T) {
if assert.NotNil(t, metric) {
sink := metric.Sink.(*stats.TrendSink)
if assert.NotNil(t, sink) {
- numCollectorSamples := len(cSamples)
+ numOutputSamples := len(cSamples)
numEngineSamples := len(sink.Values)
- assert.Equal(t, numEngineSamples, numCollectorSamples)
+ assert.Equal(t, numEngineSamples, numOutputSamples)
}
}
}
@@ -361,8 +359,8 @@ func TestEngine_processThresholds(t *testing.T) {
}
}
-func getMetricSum(collector *dummy.Collector, name string) (result float64) {
- for _, sc := range collector.SampleContainers {
+func getMetricSum(mo *mockoutput.MockOutput, name string) (result float64) {
+ for _, sc := range mo.SampleContainers {
for _, s := range sc.GetSamples() {
if s.Metric.Name == name {
result += s.Value
@@ -372,8 +370,8 @@ func getMetricSum(collector *dummy.Collector, name string) (result float64) {
return
}
-func getMetricCount(collector *dummy.Collector, name string) (result uint) {
- for _, sc := range collector.SampleContainers {
+func getMetricCount(mo *mockoutput.MockOutput, name string) (result uint) {
+ for _, sc := range mo.SampleContainers {
for _, s := range sc.GetSamples() {
if s.Metric.Name == name {
result++
@@ -448,8 +446,8 @@ func TestSentReceivedMetrics(t *testing.T) {
)
require.NoError(t, err)
- collector := &dummy.Collector{}
- _, run, wait := newTestEngine(t, nil, r, []lib.Collector{collector}, lib.Options{
+ mockOutput := mockoutput.New()
+ _, run, wait := newTestEngine(t, nil, r, []output.Output{mockOutput}, lib.Options{
Iterations: null.IntFrom(tc.Iterations),
VUs: null.IntFrom(tc.VUs),
Hosts: tb.Dialer.Hosts,
@@ -470,7 +468,7 @@ func TestSentReceivedMetrics(t *testing.T) {
wait()
checkData := func(name string, expected int64) float64 {
- data := getMetricSum(collector, name)
+ data := getMetricSum(mockOutput, name)
expectedDataMin := float64(expected * tc.Iterations)
expectedDataMax := float64((expected + ts.NumRequests*expectedHeaderMaxLength) * tc.Iterations)
@@ -582,8 +580,8 @@ func TestRunTags(t *testing.T) {
)
require.NoError(t, err)
- collector := &dummy.Collector{}
- _, run, wait := newTestEngine(t, nil, r, []lib.Collector{collector}, lib.Options{
+ mockOutput := mockoutput.New()
+ _, run, wait := newTestEngine(t, nil, r, []output.Output{mockOutput}, lib.Options{
Iterations: null.IntFrom(3),
VUs: null.IntFrom(2),
Hosts: tb.Dialer.Hosts,
@@ -617,7 +615,7 @@ func TestRunTags(t *testing.T) {
return "the rainbow"
}
- for _, s := range collector.Samples {
+ for _, s := range mockOutput.Samples {
for key, expVal := range runTagsMap {
val, ok := s.Tags.Get(key)
@@ -738,8 +736,8 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) {
)
require.NoError(t, err)
- collector := &dummy.Collector{}
- engine, run, wait := newTestEngine(t, nil, runner, []lib.Collector{collector}, lib.Options{})
+ mockOutput := mockoutput.New()
+ engine, run, wait := newTestEngine(t, nil, runner, []output.Output{mockOutput}, lib.Options{})
errC := make(chan error)
go func() { errC <- run() }()
@@ -756,11 +754,11 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) {
// The 3.1 sleep in the default function would cause the first VU to complete 2 full iterations
// and stat executing its third one, while the second VU will only fully complete 1 iteration
// and will be canceled in the middle of its second one.
- assert.Equal(t, 3.0, getMetricSum(collector, metrics.Iterations.Name))
+ assert.Equal(t, 3.0, getMetricSum(mockOutput, metrics.Iterations.Name))
// That means that we expect to see 8 HTTP requests in total, 3*2=6 from the complete iterations
// and one each from the two iterations that would be canceled in the middle of their execution
- assert.Equal(t, 8.0, getMetricSum(collector, metrics.HTTPReqs.Name))
+ assert.Equal(t, 8.0, getMetricSum(mockOutput, metrics.HTTPReqs.Name))
// And we expect to see the data_received for all 8 of those requests. Previously, the data for
// the 8th request (the 3rd one in the first VU before the test ends) was cut off by the engine
@@ -769,7 +767,7 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) {
// it was interrupted.
dataReceivedExpectedMin := 15000.0 * 8
dataReceivedExpectedMax := (15000.0 + expectedHeaderMaxLength) * 8
- dataReceivedActual := getMetricSum(collector, metrics.DataReceived.Name)
+ dataReceivedActual := getMetricSum(mockOutput, metrics.DataReceived.Name)
if dataReceivedActual < dataReceivedExpectedMin || dataReceivedActual > dataReceivedExpectedMax {
t.Errorf(
"The data_received sum should be in the interval [%f, %f] but was %f",
@@ -779,9 +777,9 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) {
// Also, the interrupted iterations shouldn't affect the average iteration_duration in any way, only
// complete iterations should be taken into account
- durationCount := float64(getMetricCount(collector, metrics.IterationDuration.Name))
+ durationCount := float64(getMetricCount(mockOutput, metrics.IterationDuration.Name))
assert.Equal(t, 3.0, durationCount)
- durationSum := getMetricSum(collector, metrics.IterationDuration.Name)
+ durationSum := getMetricSum(mockOutput, metrics.IterationDuration.Name)
assert.InDelta(t, 3.35, durationSum/(1000*durationCount), 0.25)
}
@@ -843,8 +841,8 @@ func TestMetricsEmission(t *testing.T) {
)
require.NoError(t, err)
- collector := &dummy.Collector{}
- engine, run, wait := newTestEngine(t, nil, runner, []lib.Collector{collector}, runner.GetOptions())
+ mockOutput := mockoutput.New()
+ engine, run, wait := newTestEngine(t, nil, runner, []output.Output{mockOutput}, runner.GetOptions())
errC := make(chan error)
go func() { errC <- run() }()
@@ -858,8 +856,8 @@ func TestMetricsEmission(t *testing.T) {
require.False(t, engine.IsTainted())
}
- assert.Equal(t, tc.expIters, getMetricSum(collector, metrics.Iterations.Name))
- assert.Equal(t, tc.expCount, getMetricSum(collector, "testcounter"))
+ assert.Equal(t, tc.expIters, getMetricSum(mockOutput, metrics.Iterations.Name))
+ assert.Equal(t, tc.expCount, getMetricSum(mockOutput, "testcounter"))
})
}
}
@@ -961,8 +959,8 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) {
},
}
- c := &dummy.Collector{}
- _, run, wait := newTestEngine(t, ctx, runner, []lib.Collector{c}, lib.Options{
+ mockOutput := mockoutput.New()
+ _, run, wait := newTestEngine(t, ctx, runner, []output.Output{mockOutput}, lib.Options{
VUs: null.IntFrom(1), Iterations: null.IntFrom(1),
})
@@ -970,7 +968,7 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) {
wait()
var count float64
- for _, sample := range c.Samples {
+ for _, sample := range mockOutput.Samples {
if sample.Metric == testMetric {
count += sample.Value
}
diff --git a/js/runner_test.go b/js/runner_test.go
index af9fca2275e5..910196b39a0f 100644
--- a/js/runner_test.go
+++ b/js/runner_test.go
@@ -58,10 +58,11 @@ import (
"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/testutils"
"github.com/loadimpact/k6/lib/testutils/httpmultibin"
+ "github.com/loadimpact/k6/lib/testutils/mockoutput"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/loader"
+ "github.com/loadimpact/k6/output"
"github.com/loadimpact/k6/stats"
- "github.com/loadimpact/k6/stats/dummy"
)
func TestRunnerNew(t *testing.T) {
@@ -295,15 +296,17 @@ func TestSetupDataIsolation(t *testing.T) {
execScheduler, err := local.NewExecutionScheduler(runner, testutils.NewLogger(t))
require.NoError(t, err)
- engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, testutils.NewLogger(t))
+
+ mockOutput := mockoutput.New()
+ engine, err := core.NewEngine(
+ execScheduler, options, lib.RuntimeOptions{}, []output.Output{mockOutput}, testutils.NewLogger(t),
+ )
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
run, wait, err := engine.Init(ctx, ctx)
require.NoError(t, err)
- collector := &dummy.Collector{}
- engine.Collectors = []lib.Collector{collector}
require.Empty(t, runner.defaultGroup.Groups)
errC := make(chan error)
@@ -322,7 +325,7 @@ func TestSetupDataIsolation(t *testing.T) {
require.Contains(t, runner.defaultGroup.Groups, "setup")
require.Contains(t, runner.defaultGroup.Groups, "teardown")
var count int
- for _, s := range collector.Samples {
+ for _, s := range mockOutput.Samples {
if s.Metric.Name == "mycounter" {
count += int(s.Value)
}
diff --git a/lib/collector.go b/lib/collector.go
index d3700ad0d113..408f517002e0 100644
--- a/lib/collector.go
+++ b/lib/collector.go
@@ -26,6 +26,8 @@ import (
"github.com/loadimpact/k6/stats"
)
+// TODO: move to some other package - types? models?
+
// RunStatus values can be used by k6 to denote how a script run ends
// and by the cloud executor and collector so that k6 knows the current
// status of a particular script run.
@@ -46,6 +48,8 @@ const (
RunStatusAbortedThreshold RunStatus = 8
)
+// TODO: remove
+
// A Collector abstracts the process of funneling samples to an external storage backend,
// such as an InfluxDB instance.
type Collector interface {
diff --git a/lib/testutils/mockoutput/mockoutput.go b/lib/testutils/mockoutput/mockoutput.go
new file mode 100644
index 000000000000..b03273f23bd8
--- /dev/null
+++ b/lib/testutils/mockoutput/mockoutput.go
@@ -0,0 +1,83 @@
+/*
+ *
+ * k6 - a next-generation load testing tool
+ * Copyright (C) 2021 Load Impact
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package mockoutput
+
+import (
+ "github.com/loadimpact/k6/lib"
+ "github.com/loadimpact/k6/output"
+ "github.com/loadimpact/k6/stats"
+)
+
+// New exists so that the usage from tests avoids repetition, i.e. is
+// mockoutput.New() instead of &mockoutput.MockOutput{}
+func New() *MockOutput {
+ return &MockOutput{}
+}
+
+// MockOutput can be used in tests to mock an actual output.
+type MockOutput struct {
+ SampleContainers []stats.SampleContainer
+ Samples []stats.Sample
+ RunStatus lib.RunStatus
+
+ DescFn func() string
+ StartFn func() error
+ StopFn func() error
+}
+
+var _ output.WithRunStatusUpdates = &MockOutput{}
+
+// AddMetricSamples just saves the results in memory.
+func (mo *MockOutput) AddMetricSamples(scs []stats.SampleContainer) {
+ mo.SampleContainers = append(mo.SampleContainers, scs...)
+ for _, sc := range scs {
+ mo.Samples = append(mo.Samples, sc.GetSamples()...)
+ }
+}
+
+// SetRunStatus updates the RunStatus property.
+func (mo *MockOutput) SetRunStatus(latestStatus lib.RunStatus) {
+ mo.RunStatus = latestStatus
+}
+
+// Description calls the supplied DescFn callback, if available.
+func (mo *MockOutput) Description() string {
+ if mo.DescFn != nil {
+ return mo.DescFn()
+ }
+ return "mock"
+}
+
+// Start calls the supplied StartFn callback, if available.
+func (mo *MockOutput) Start() error {
+ if mo.StartFn != nil {
+ return mo.StartFn()
+ }
+ return nil
+}
+
+// Stop calls the supplied StopFn callback, if available.
+func (mo *MockOutput) Stop() error {
+ if mo.StopFn != nil {
+ return mo.StopFn()
+ }
+ return nil
+}
diff --git a/output/extensions.go b/output/extensions.go
new file mode 100644
index 000000000000..d136b136234f
--- /dev/null
+++ b/output/extensions.go
@@ -0,0 +1,55 @@
+/*
+ *
+ * k6 - a next-generation load testing tool
+ * Copyright (C) 2021 Load Impact
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package output
+
+import (
+ "fmt"
+ "sync"
+)
+
+//nolint:gochecknoglobals
+var (
+ extensions = make(map[string]func(Params) (Output, error))
+ mx sync.RWMutex
+)
+
+// GetExtensions returns all registered extensions.
+func GetExtensions() map[string]func(Params) (Output, error) {
+ mx.RLock()
+ defer mx.RUnlock()
+ res := make(map[string]func(Params) (Output, error), len(extensions))
+ for k, v := range extensions {
+ res[k] = v
+ }
+ return res
+}
+
+// RegisterExtension registers the given output extension constructor. This
+// function panics if a module with the same name is already registered.
+func RegisterExtension(name string, mod func(Params) (Output, error)) {
+ mx.Lock()
+ defer mx.Unlock()
+
+ if _, ok := extensions[name]; ok {
+ panic(fmt.Sprintf("output extension already registered: %s", name))
+ }
+ extensions[name] = mod
+}
diff --git a/output/helpers.go b/output/helpers.go
new file mode 100644
index 000000000000..647ccd76f0f0
--- /dev/null
+++ b/output/helpers.go
@@ -0,0 +1,125 @@
+/*
+ *
+ * k6 - a next-generation load testing tool
+ * Copyright (C) 2021 Load Impact
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package output
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/loadimpact/k6/stats"
+)
+
+// SampleBuffer is a simple thread-safe buffer for metric samples. It should be
+// used by most outputs, since we generally want to flush metric samples to the
+// remote service asynchronously. We want to do it only every several seconds,
+// and we don't want to block the Engine in the meantime.
+type SampleBuffer struct {
+ sync.Mutex
+ buffer []stats.SampleContainer
+ maxLen int
+}
+
+// AddMetricSamples adds the given metric samples to the internal buffer.
+func (sc *SampleBuffer) AddMetricSamples(samples []stats.SampleContainer) {
+ if len(samples) == 0 {
+ return
+ }
+ sc.Lock()
+ sc.buffer = append(sc.buffer, samples...)
+ sc.Unlock()
+}
+
+// GetBufferedSamples returns the currently buffered metric samples and makes a
+// new internal buffer with some hopefully realistic size. If the internal
+// buffer is empty, it will return nil.
+func (sc *SampleBuffer) GetBufferedSamples() []stats.SampleContainer {
+ sc.Lock()
+ defer sc.Unlock()
+
+ buffered, bufferedLen := sc.buffer, len(sc.buffer)
+ if bufferedLen == 0 {
+ return nil
+ }
+ if bufferedLen > sc.maxLen {
+ sc.maxLen = bufferedLen
+ }
+ // Make the new buffer halfway between the previously allocated size and the
+ // maximum buffer size we've seen so far, to hopefully reduce copying a bit.
+ sc.buffer = make([]stats.SampleContainer, 0, (bufferedLen+sc.maxLen)/2)
+
+ return buffered
+}
+
+// PeriodicFlusher is a small helper for asynchronously flushing buffered metric
+// samples on regular intervals. The biggest benefit is having a Stop() method
+// that waits for one last flush before it returns.
+type PeriodicFlusher struct {
+ period time.Duration
+ flushCallback func()
+ stop chan struct{}
+ stopped chan struct{}
+ once *sync.Once
+}
+
+func (pf *PeriodicFlusher) run() {
+ ticker := time.NewTicker(pf.period)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ pf.flushCallback()
+ case <-pf.stop:
+ pf.flushCallback()
+ close(pf.stopped)
+ return
+ }
+ }
+}
+
+// Stop waits for the periodic flusher flush one last time and exit. You can
+// safely call Stop() multiple times from different goroutines, you just can't
+// call it from inside of the flushing function.
+func (pf *PeriodicFlusher) Stop() {
+ pf.once.Do(func() {
+ close(pf.stop)
+ })
+ <-pf.stopped
+}
+
+// NewPeriodicFlusher creates a new PeriodicFlusher and starts its goroutine.
+func NewPeriodicFlusher(period time.Duration, flushCallback func()) (*PeriodicFlusher, error) {
+ if period <= 0 {
+ return nil, fmt.Errorf("metric flush period should be positive but was %s", period)
+ }
+
+ pf := &PeriodicFlusher{
+ period: period,
+ flushCallback: flushCallback,
+ stop: make(chan struct{}),
+ stopped: make(chan struct{}),
+ once: &sync.Once{},
+ }
+
+ go pf.run()
+
+ return pf, nil
+}
diff --git a/output/helpers_test.go b/output/helpers_test.go
new file mode 100644
index 000000000000..61e66f9a268e
--- /dev/null
+++ b/output/helpers_test.go
@@ -0,0 +1,183 @@
+/*
+ *
+ * k6 - a next-generation load testing tool
+ * Copyright (C) 2021 Load Impact
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package output
+
+import (
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/loadimpact/k6/stats"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestSampleBufferBasics(t *testing.T) {
+ t.Parallel()
+ single := stats.Sample{
+ Time: time.Now(),
+ Metric: stats.New("my_metric", stats.Rate),
+ Value: float64(123),
+ Tags: stats.NewSampleTags(map[string]string{"tag1": "val1"}),
+ }
+ connected := stats.ConnectedSamples{Samples: []stats.Sample{single, single}, Time: single.Time}
+ buffer := SampleBuffer{}
+
+ assert.Empty(t, buffer.GetBufferedSamples())
+ buffer.AddMetricSamples([]stats.SampleContainer{single, single})
+ buffer.AddMetricSamples([]stats.SampleContainer{single, connected, single})
+ assert.Equal(t, []stats.SampleContainer{single, single, single, connected, single}, buffer.GetBufferedSamples())
+ assert.Empty(t, buffer.GetBufferedSamples())
+
+ // Verify some internals
+ assert.Equal(t, cap(buffer.buffer), 5)
+ buffer.AddMetricSamples([]stats.SampleContainer{single, connected})
+ buffer.AddMetricSamples(nil)
+ buffer.AddMetricSamples([]stats.SampleContainer{})
+ buffer.AddMetricSamples([]stats.SampleContainer{single})
+ assert.Equal(t, []stats.SampleContainer{single, connected, single}, buffer.GetBufferedSamples())
+ assert.Equal(t, cap(buffer.buffer), 4)
+ buffer.AddMetricSamples([]stats.SampleContainer{single})
+ assert.Equal(t, []stats.SampleContainer{single}, buffer.GetBufferedSamples())
+ assert.Equal(t, cap(buffer.buffer), 3)
+ assert.Empty(t, buffer.GetBufferedSamples())
+}
+
+func TestSampleBufferConcurrently(t *testing.T) {
+ t.Parallel()
+
+ seed := time.Now().UnixNano()
+ r := rand.New(rand.NewSource(seed)) //nolint:gosec
+ t.Logf("Random source seeded with %d\n", seed)
+
+ producersCount := 50 + r.Intn(50)
+ sampleCount := 10 + r.Intn(10)
+ sleepModifier := 10 + r.Intn(10)
+ buffer := SampleBuffer{}
+
+ wg := make(chan struct{})
+ fillBuffer := func() {
+ for i := 0; i < sampleCount; i++ {
+ buffer.AddMetricSamples([]stats.SampleContainer{stats.Sample{
+ Time: time.Unix(1562324644, 0),
+ Metric: stats.New("my_metric", stats.Gauge),
+ Value: float64(i),
+ Tags: stats.NewSampleTags(map[string]string{"tag1": "val1"}),
+ }})
+ time.Sleep(time.Duration(i*sleepModifier) * time.Microsecond)
+ }
+ wg <- struct{}{}
+ }
+ for i := 0; i < producersCount; i++ {
+ go fillBuffer()
+ }
+
+ timer := time.NewTicker(5 * time.Millisecond)
+ timeout := time.After(5 * time.Second)
+ defer timer.Stop()
+ readSamples := make([]stats.SampleContainer, 0, sampleCount*producersCount)
+ finishedProducers := 0
+loop:
+ for {
+ select {
+ case <-timer.C:
+ readSamples = append(readSamples, buffer.GetBufferedSamples()...)
+ case <-wg:
+ finishedProducers++
+ if finishedProducers == producersCount {
+ readSamples = append(readSamples, buffer.GetBufferedSamples()...)
+ break loop
+ }
+ case <-timeout:
+ t.Fatalf("test timed out")
+ }
+ }
+ assert.Equal(t, sampleCount*producersCount, len(readSamples))
+ for _, s := range readSamples {
+ require.NotNil(t, s)
+ ss := s.GetSamples()
+ require.Len(t, ss, 1)
+ assert.Equal(t, "my_metric", ss[0].Metric.Name)
+ }
+}
+
+func TestPeriodicFlusherBasics(t *testing.T) {
+ t.Parallel()
+
+ f, err := NewPeriodicFlusher(-1*time.Second, func() {})
+ assert.Error(t, err)
+ assert.Nil(t, f)
+ f, err = NewPeriodicFlusher(0, func() {})
+ assert.Error(t, err)
+ assert.Nil(t, f)
+
+ count := 0
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ f, err = NewPeriodicFlusher(100*time.Millisecond, func() {
+ count++
+ if count == 2 {
+ wg.Done()
+ }
+ })
+ assert.NotNil(t, f)
+ assert.Nil(t, err)
+ wg.Wait()
+ f.Stop()
+ assert.Equal(t, 3, count)
+}
+
+func TestPeriodicFlusherConcurrency(t *testing.T) {
+ t.Parallel()
+
+ seed := time.Now().UnixNano()
+ r := rand.New(rand.NewSource(seed)) //nolint:gosec
+ randStops := 10 + r.Intn(10)
+ t.Logf("Random source seeded with %d\n", seed)
+
+ count := 0
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ f, err := NewPeriodicFlusher(1000*time.Microsecond, func() {
+ // Sleep intentionally may be longer than the flush period. Also, this
+ // should never happen concurrently, so it's intentionally not locked.
+ time.Sleep(time.Duration(700+r.Intn(1000)) * time.Microsecond)
+ count++
+ if count == 100 {
+ wg.Done()
+ }
+ })
+ assert.NotNil(t, f)
+ assert.Nil(t, err)
+ wg.Wait()
+
+ stopWG := &sync.WaitGroup{}
+ stopWG.Add(randStops)
+ for i := 0; i < randStops; i++ {
+ go func() {
+ f.Stop()
+ stopWG.Done()
+ }()
+ }
+ stopWG.Wait()
+ assert.True(t, count >= 101) // due to the short intervals, we might not get exactly 101
+}
diff --git a/output/json/json.go b/output/json/json.go
new file mode 100644
index 000000000000..4e1c74d440b1
--- /dev/null
+++ b/output/json/json.go
@@ -0,0 +1,153 @@
+/*
+ *
+ * k6 - a next-generation load testing tool
+ * Copyright (C) 2021 Load Impact
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package json
+
+import (
+ "compress/gzip"
+ stdlibjson "encoding/json"
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/loadimpact/k6/output"
+ "github.com/loadimpact/k6/stats"
+ "github.com/sirupsen/logrus"
+)
+
+// TODO: add option for emitting proper JSON files (https://github.com/loadimpact/k6/issues/1052)
+const flushPeriod = 200 * time.Millisecond // TODO: make this configurable
+
+// Output funnels all passed metrics to an (optionally gzipped) JSON file.
+type Output struct {
+ output.SampleBuffer
+
+ params output.Params
+ periodicFlusher *output.PeriodicFlusher
+
+ logger logrus.FieldLogger
+ filename string
+ encoder *stdlibjson.Encoder
+ closeFn func() error
+ seenMetrics map[string]struct{}
+}
+
+// New returns a new JSON output.
+func New(params output.Params) (output.Output, error) {
+ return &Output{
+ params: params,
+ filename: params.ConfigArgument,
+ logger: params.Logger.WithFields(logrus.Fields{
+ "output": "json",
+ "filename": params.ConfigArgument,
+ }),
+ seenMetrics: make(map[string]struct{}),
+ }, nil
+}
+
+// Description returns a human-readable description of the output.
+func (o *Output) Description() string {
+ if o.filename == "" || o.filename == "-" {
+ return "json(stdout)"
+ }
+ return fmt.Sprintf("json (%s)", o.filename)
+}
+
+// Start opens the tries to specified JSON file and starts the goroutine for
+// metric flushing.
+func (o *Output) Start() error {
+ o.logger.Debug("Starting...")
+
+ if o.filename == "" || o.filename == "-" {
+ o.encoder = stdlibjson.NewEncoder(o.params.StdOut)
+ o.closeFn = func() error {
+ return nil
+ }
+ } else {
+ logfile, err := o.params.FS.Create(o.filename)
+ if err != nil {
+ return err
+ }
+
+ if strings.HasSuffix(o.filename, ".gz") {
+ outfile := gzip.NewWriter(logfile)
+
+ o.closeFn = func() error {
+ _ = outfile.Close()
+ return logfile.Close()
+ }
+ o.encoder = stdlibjson.NewEncoder(outfile)
+ } else {
+ o.closeFn = logfile.Close
+ o.encoder = stdlibjson.NewEncoder(logfile)
+ }
+ }
+
+ pf, err := output.NewPeriodicFlusher(flushPeriod, o.flushMetrics)
+ if err != nil {
+ return err
+ }
+ o.logger.Debug("Started!")
+ o.periodicFlusher = pf
+
+ return nil
+}
+
+// Stop flushes any remaining metrics and stops the goroutine.
+func (o *Output) Stop() error {
+ o.logger.Debug("Stopping...")
+ defer o.logger.Debug("Stopped!")
+ o.periodicFlusher.Stop()
+ return o.closeFn()
+}
+
+func (o *Output) flushMetrics() {
+ samples := o.GetBufferedSamples()
+ start := time.Now()
+ var count int
+ for _, sc := range samples {
+ samples := sc.GetSamples()
+ count += len(samples)
+ for _, sample := range samples {
+ sample := sample
+ o.handleMetric(sample.Metric)
+ err := o.encoder.Encode(WrapSample(sample))
+ if err != nil {
+ // Skip metric if it can't be made into JSON or envelope is null.
+ o.logger.WithError(err).Error("Sample couldn't be marshalled to JSON")
+ }
+ }
+ }
+ if count > 0 {
+ o.logger.WithField("t", time.Since(start)).WithField("count", count).Debug("Wrote metrics to JSON")
+ }
+}
+
+func (o *Output) handleMetric(m *stats.Metric) {
+ if _, ok := o.seenMetrics[m.Name]; ok {
+ return
+ }
+ o.seenMetrics[m.Name] = struct{}{}
+
+ err := o.encoder.Encode(wrapMetric(m))
+ if err != nil {
+ o.logger.WithError(err).Error("Metric couldn't be marshalled to JSON")
+ }
+}
diff --git a/output/json/json_test.go b/output/json/json_test.go
new file mode 100644
index 000000000000..da104f95261b
--- /dev/null
+++ b/output/json/json_test.go
@@ -0,0 +1,187 @@
+/*
+ *
+ * k6 - a next-generation load testing tool
+ * Copyright (C) 2021 Load Impact
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package json
+
+import (
+ "bufio"
+ "bytes"
+ "compress/gzip"
+ "io"
+ "testing"
+ "time"
+
+ "github.com/spf13/afero"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/loadimpact/k6/lib/testutils"
+ "github.com/loadimpact/k6/output"
+ "github.com/loadimpact/k6/stats"
+)
+
+func getValidator(t *testing.T, expected []string) func(io.Reader) {
+ return func(rawJSONLines io.Reader) {
+ s := bufio.NewScanner(rawJSONLines)
+ i := 0
+ for s.Scan() {
+ i++
+ if i > len(expected) {
+ t.Errorf("Read unexpected line number %d, expected only %d entries", i, len(expected))
+ continue
+ }
+ assert.JSONEq(t, expected[i-1], string(s.Bytes()))
+ }
+ assert.NoError(t, s.Err())
+ assert.Equal(t, len(expected), i)
+ }
+}
+
+func generateTestMetricSamples(t *testing.T) ([]stats.SampleContainer, func(io.Reader)) {
+ metric1 := stats.New("my_metric1", stats.Gauge)
+ metric2 := stats.New("my_metric2", stats.Counter, stats.Data)
+ time1 := time.Date(2021, time.February, 24, 13, 37, 10, 0, time.UTC)
+ time2 := time1.Add(10 * time.Second)
+ time3 := time2.Add(10 * time.Second)
+ connTags := stats.NewSampleTags(map[string]string{"key": "val"})
+
+ samples := []stats.SampleContainer{
+ stats.Sample{Time: time1, Metric: metric1, Value: float64(1), Tags: stats.NewSampleTags(map[string]string{"tag1": "val1"})},
+ stats.Sample{Time: time1, Metric: metric1, Value: float64(2), Tags: stats.NewSampleTags(map[string]string{"tag2": "val2"})},
+ stats.ConnectedSamples{Samples: []stats.Sample{
+ {Time: time2, Metric: metric2, Value: float64(3), Tags: connTags},
+ {Time: time2, Metric: metric1, Value: float64(4), Tags: connTags},
+ }, Time: time2, Tags: connTags},
+ stats.Sample{Time: time3, Metric: metric2, Value: float64(5), Tags: stats.NewSampleTags(map[string]string{"tag3": "val3"})},
+ }
+ // TODO: fix JSON thresholds (https://github.com/loadimpact/k6/issues/1052)
+ expected := []string{
+ `{"type":"Metric","data":{"name":"my_metric1","type":"gauge","contains":"default","tainted":null,"thresholds":[],"submetrics":null,"sub":{"name":"","parent":"","suffix":"","tags":null}},"metric":"my_metric1"}`,
+ `{"type":"Point","data":{"time":"2021-02-24T13:37:10Z","value":1,"tags":{"tag1":"val1"}},"metric":"my_metric1"}`,
+ `{"type":"Point","data":{"time":"2021-02-24T13:37:10Z","value":2,"tags":{"tag2":"val2"}},"metric":"my_metric1"}`,
+ `{"type":"Metric","data":{"name":"my_metric2","type":"counter","contains":"data","tainted":null,"thresholds":[],"submetrics":null,"sub":{"name":"","parent":"","suffix":"","tags":null}},"metric":"my_metric2"}`,
+ `{"type":"Point","data":{"time":"2021-02-24T13:37:20Z","value":3,"tags":{"key":"val"}},"metric":"my_metric2"}`,
+ `{"type":"Point","data":{"time":"2021-02-24T13:37:20Z","value":4,"tags":{"key":"val"}},"metric":"my_metric1"}`,
+ `{"type":"Point","data":{"time":"2021-02-24T13:37:30Z","value":5,"tags":{"tag3":"val3"}},"metric":"my_metric2"}`,
+ }
+
+ return samples, getValidator(t, expected)
+}
+
+func TestJsonOutputStdout(t *testing.T) {
+ t.Parallel()
+
+ stdout := new(bytes.Buffer)
+ out, err := New(output.Params{
+ Logger: testutils.NewLogger(t),
+ StdOut: stdout,
+ })
+ require.NoError(t, err)
+ require.NoError(t, out.Start())
+
+ samples, validateResults := generateTestMetricSamples(t)
+ out.AddMetricSamples(samples[:2])
+ out.AddMetricSamples(samples[2:])
+ require.NoError(t, out.Stop())
+ validateResults(stdout)
+}
+
+func TestJsonOutputFileError(t *testing.T) {
+ t.Parallel()
+
+ stdout := new(bytes.Buffer)
+ fs := afero.NewReadOnlyFs(afero.NewMemMapFs())
+ out, err := New(output.Params{
+ Logger: testutils.NewLogger(t),
+ StdOut: stdout,
+ FS: fs,
+ ConfigArgument: "/json-output",
+ })
+ require.NoError(t, err)
+ assert.Error(t, out.Start())
+}
+
+func TestJsonOutputFile(t *testing.T) {
+ t.Parallel()
+
+ stdout := new(bytes.Buffer)
+ fs := afero.NewMemMapFs()
+ out, err := New(output.Params{
+ Logger: testutils.NewLogger(t),
+ StdOut: stdout,
+ FS: fs,
+ ConfigArgument: "/json-output",
+ })
+ require.NoError(t, err)
+ require.NoError(t, out.Start())
+
+ samples, validateResults := generateTestMetricSamples(t)
+ out.AddMetricSamples(samples[:2])
+ out.AddMetricSamples(samples[2:])
+ require.NoError(t, out.Stop())
+
+ assert.Empty(t, stdout.Bytes())
+ file, err := fs.Open("/json-output")
+ require.NoError(t, err)
+ validateResults(file)
+ assert.NoError(t, file.Close())
+}
+
+func TestJsonOutputFileGzipped(t *testing.T) {
+ t.Parallel()
+
+ stdout := new(bytes.Buffer)
+ fs := afero.NewMemMapFs()
+ out, err := New(output.Params{
+ Logger: testutils.NewLogger(t),
+ StdOut: stdout,
+ FS: fs,
+ ConfigArgument: "/json-output.gz",
+ })
+ require.NoError(t, err)
+ require.NoError(t, out.Start())
+
+ samples, validateResults := generateTestMetricSamples(t)
+ out.AddMetricSamples(samples[:2])
+ out.AddMetricSamples(samples[2:])
+ require.NoError(t, out.Stop())
+
+ assert.Empty(t, stdout.Bytes())
+ file, err := fs.Open("/json-output.gz")
+ require.NoError(t, err)
+ reader, err := gzip.NewReader(file)
+ require.NoError(t, err)
+ validateResults(reader)
+ assert.NoError(t, file.Close())
+}
+
+func TestWrapSampleWithSamplePointer(t *testing.T) {
+ t.Parallel()
+ out := WrapSample(stats.Sample{
+ Metric: &stats.Metric{},
+ })
+ assert.NotEqual(t, out, (*Envelope)(nil))
+}
+
+func TestWrapMetricWithMetricPointer(t *testing.T) {
+ t.Parallel()
+ out := wrapMetric(&stats.Metric{})
+ assert.NotEqual(t, out, (*Envelope)(nil))
+}
diff --git a/stats/json/wrapper.go b/output/json/wrapper.go
similarity index 72%
rename from stats/json/wrapper.go
rename to output/json/wrapper.go
index d0dc77b80567..9885386de1e1 100644
--- a/stats/json/wrapper.go
+++ b/output/json/wrapper.go
@@ -26,38 +26,40 @@ import (
"github.com/loadimpact/k6/stats"
)
+// Envelope is the data format we use to export both metrics and metric samples
+// to the JSON file.
type Envelope struct {
Type string `json:"type"`
Data interface{} `json:"data"`
Metric string `json:"metric,omitempty"`
}
-type JSONSample struct {
+// Sample is the data format for metric sample data in the JSON file.
+type Sample struct {
Time time.Time `json:"time"`
Value float64 `json:"value"`
Tags *stats.SampleTags `json:"tags"`
}
-func NewJSONSample(sample *stats.Sample) *JSONSample {
- return &JSONSample{
+func newJSONSample(sample stats.Sample) Sample {
+ return Sample{
Time: sample.Time,
Value: sample.Value,
Tags: sample.Tags,
}
}
-func WrapSample(sample *stats.Sample) *Envelope {
- if sample == nil {
- return nil
- }
- return &Envelope{
+// WrapSample is used to package a metric sample in a way that's nice to export
+// to JSON.
+func WrapSample(sample stats.Sample) Envelope {
+ return Envelope{
Type: "Point",
Metric: sample.Metric.Name,
- Data: NewJSONSample(sample),
+ Data: newJSONSample(sample),
}
}
-func WrapMetric(metric *stats.Metric) *Envelope {
+func wrapMetric(metric *stats.Metric) *Envelope {
if metric == nil {
return nil
}
diff --git a/output/types.go b/output/types.go
new file mode 100644
index 000000000000..efac5014aeb7
--- /dev/null
+++ b/output/types.go
@@ -0,0 +1,90 @@
+/*
+ *
+ * k6 - a next-generation load testing tool
+ * Copyright (C) 2021 Load Impact
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package output
+
+import (
+ "encoding/json"
+ "io"
+ "net/url"
+
+ "github.com/loadimpact/k6/lib"
+ "github.com/loadimpact/k6/stats"
+ "github.com/sirupsen/logrus"
+ "github.com/spf13/afero"
+)
+
+// Params contains all possible constructor parameters an output may need.
+type Params struct {
+ OutputType string // --out $OutputType=$ConfigArgument, K6_OUT="$OutputType=$ConfigArgument"
+ ConfigArgument string
+ JSONConfig json.RawMessage
+
+ Logger logrus.FieldLogger
+ Environment map[string]string
+ StdOut io.Writer
+ StdErr io.Writer
+ FS afero.Fs
+ ScriptPath *url.URL
+ ScriptOptions lib.Options
+ RuntimeOptions lib.RuntimeOptions
+ ExecutionPlan []lib.ExecutionStep
+}
+
+// An Output abstracts the process of funneling samples to an external storage
+// backend, such as a file or something like an InfluxDB instance.
+//
+// N.B: All outputs should have non-blocking AddMetricSamples() methods and
+// should spawn their own goroutine to flush metrics asynchronously.
+type Output interface {
+ // Returns a human-readable description of the output that will be shown in
+ // `k6 run`. For extensions it probably should include the version as well.
+ Description() string
+
+ // Start is called before the Engine tries to use the output and should be
+ // used for any long initialization tasks, as well as for starting a
+ // goroutine to asynchronously flush metrics to the output.
+ Start() error
+
+ // A method to receive the latest metric samples from the Engine. This
+ // method is never called concurrently, so do not do anything blocking here
+ // that might take a long time. Preferably, just use the SampleBuffer or
+ // something like it to buffer metrics until they are flushed.
+ AddMetricSamples(samples []stats.SampleContainer)
+
+ // Flush all remaining metrics and finalize the test run.
+ Stop() error
+}
+
+// WithThresholds is an output that requires the Engine to give it the
+// thresholds before it can be started.
+type WithThresholds interface {
+ Output
+ SetThresholds(map[string]stats.Thresholds)
+}
+
+// TODO: add some way for outputs to report mid-test errors and potentially
+// abort the whole test run
+
+// WithRunStatusUpdates means the output can receivetest run status updates.
+type WithRunStatusUpdates interface {
+ Output
+ SetRunStatus(latestStatus lib.RunStatus)
+}
diff --git a/stats/cloud/bench_test.go b/stats/cloud/bench_test.go
index 5473fa511917..27a674feb46a 100644
--- a/stats/cloud/bench_test.go
+++ b/stats/cloud/bench_test.go
@@ -42,16 +42,10 @@ import (
"github.com/loadimpact/k6/lib/testutils"
"github.com/loadimpact/k6/lib/testutils/httpmultibin"
"github.com/loadimpact/k6/lib/types"
- "github.com/loadimpact/k6/loader"
"github.com/loadimpact/k6/stats"
)
func BenchmarkAggregateHTTP(b *testing.B) {
- script := &loader.SourceData{
- Data: []byte(""),
- URL: &url.URL{Path: "/script.js"},
- }
-
options := lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}
@@ -61,7 +55,7 @@ func BenchmarkAggregateHTTP(b *testing.B) {
AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200),
AggregationPeriod: types.NullDurationFrom(time.Millisecond * 200),
})
- collector, err := New(testutils.NewLogger(b), config, script, options, []lib.ExecutionStep{}, "1.0")
+ collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
require.NoError(b, err)
now := time.Now()
collector.referenceID = "something"
@@ -295,11 +289,6 @@ func generateHTTPExtTrail(now time.Time, i time.Duration, tags *stats.SampleTags
}
func BenchmarkHTTPPush(b *testing.B) {
- script := &loader.SourceData{
- Data: []byte(""),
- URL: &url.URL{Path: "/script.js"},
- }
-
options := lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}
@@ -323,7 +312,7 @@ func BenchmarkHTTPPush(b *testing.B) {
AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200),
AggregationPeriod: types.NullDurationFrom(time.Millisecond * 200),
})
- collector, err := New(testutils.NewLogger(b), config, script, options, []lib.ExecutionStep{}, "1.0")
+ collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
require.NoError(b, err)
collector.referenceID = "fake"
diff --git a/stats/cloud/collector.go b/stats/cloud/collector.go
index 7b41a37e9019..a36889a2195a 100644
--- a/stats/cloud/collector.go
+++ b/stats/cloud/collector.go
@@ -44,7 +44,6 @@ import (
"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/netext"
"github.com/loadimpact/k6/lib/netext/httpext"
- "github.com/loadimpact/k6/loader"
"github.com/loadimpact/k6/stats"
)
@@ -91,7 +90,7 @@ var _ lib.Collector = &Collector{}
// New creates a new cloud collector
func New(
logger logrus.FieldLogger,
- conf cloudapi.Config, src *loader.SourceData, opts lib.Options, executionPlan []lib.ExecutionStep, version string,
+ conf cloudapi.Config, scriptURL fmt.Stringer, opts lib.Options, executionPlan []lib.ExecutionStep, version string,
) (*Collector, error) {
if err := cloudapi.MergeFromExternal(opts.External, &conf); err != nil {
return nil, err
@@ -102,7 +101,7 @@ func New(
}
if !conf.Name.Valid || conf.Name.String == "" {
- conf.Name = null.StringFrom(filepath.Base(src.URL.String()))
+ conf.Name = null.StringFrom(filepath.Base(scriptURL.String()))
}
if conf.Name.String == "-" {
conf.Name = null.StringFrom(TestName)
diff --git a/stats/cloud/collector_test.go b/stats/cloud/collector_test.go
index 44c57eb27ea2..343e2138798a 100644
--- a/stats/cloud/collector_test.go
+++ b/stats/cloud/collector_test.go
@@ -50,7 +50,6 @@ import (
"github.com/loadimpact/k6/lib/testutils"
"github.com/loadimpact/k6/lib/testutils/httpmultibin"
"github.com/loadimpact/k6/lib/types"
- "github.com/loadimpact/k6/loader"
"github.com/loadimpact/k6/stats"
)
@@ -177,11 +176,6 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) {
}))
defer tb.Cleanup()
- script := &loader.SourceData{
- Data: []byte(""),
- URL: &url.URL{Path: "/script.js"},
- }
-
options := lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}
@@ -190,7 +184,7 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) {
Host: null.StringFrom(tb.ServerHTTP.URL),
NoCompress: null.BoolFrom(true),
})
- collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0")
+ collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
require.NoError(t, err)
assert.True(t, collector.config.Host.Valid)
@@ -335,11 +329,6 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) {
}))
defer tb.Cleanup()
- script := &loader.SourceData{
- Data: []byte(""),
- URL: &url.URL{Path: "/script.js"},
- }
-
options := lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}
@@ -348,7 +337,7 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) {
Host: null.StringFrom(tb.ServerHTTP.URL),
NoCompress: null.BoolFrom(true),
})
- collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0")
+ collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
require.NoError(t, err)
now := time.Now()
tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"})
@@ -427,11 +416,6 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) {
}))
defer tb.Cleanup()
- script := &loader.SourceData{
- Data: []byte(""),
- URL: &url.URL{Path: "/script.js"},
- }
-
options := lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}
@@ -441,7 +425,7 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) {
NoCompress: null.BoolFrom(true),
MaxMetricSamplesPerPackage: null.IntFrom(50),
})
- collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0")
+ collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: ""}, options, []lib.ExecutionStep{}, "1.0")
require.NoError(t, err)
now := time.Now()
tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"})
@@ -548,11 +532,6 @@ func TestCloudCollectorAggregationPeriodZeroNoBlock(t *testing.T) {
}))
defer tb.Cleanup()
- script := &loader.SourceData{
- Data: []byte(""),
- URL: &url.URL{Path: "/script.js"},
- }
-
options := lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}
@@ -561,7 +540,7 @@ func TestCloudCollectorAggregationPeriodZeroNoBlock(t *testing.T) {
Host: null.StringFrom(tb.ServerHTTP.URL),
NoCompress: null.BoolFrom(true),
})
- collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0")
+ collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
require.NoError(t, err)
assert.True(t, collector.config.Host.Valid)
@@ -607,11 +586,6 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) {
}))
defer tb.Cleanup()
- script := &loader.SourceData{
- Data: []byte(""),
- URL: &url.URL{Path: "/script.js"},
- }
-
options := lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}
@@ -620,7 +594,7 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) {
Host: null.StringFrom(tb.ServerHTTP.URL),
NoCompress: null.BoolFrom(true),
})
- collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0")
+ collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "path/to/script.js"}, options, []lib.ExecutionStep{}, "1.0")
require.NoError(t, err)
gotIterations := false
@@ -730,10 +704,7 @@ func TestNewName(t *testing.T) {
testCase := testCase
t.Run(testCase.url.String(), func(t *testing.T) {
- script := &loader.SourceData{
- URL: testCase.url,
- }
- collector, err := New(testutils.NewLogger(t), cloudapi.NewConfig(), script, lib.Options{
+ collector, err := New(testutils.NewLogger(t), cloudapi.NewConfig(), testCase.url, lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}, []lib.ExecutionStep{}, "1.0")
require.NoError(t, err)
@@ -766,10 +737,6 @@ func TestPublishMetric(t *testing.T) {
}))
defer server.Close()
- script := &loader.SourceData{
- Data: []byte(""),
- URL: &url.URL{Path: "/script.js"},
- }
options := lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}
@@ -777,7 +744,7 @@ func TestPublishMetric(t *testing.T) {
Host: null.StringFrom(server.URL),
NoCompress: null.BoolFrom(true),
})
- collector, err := New(testutils.NewLogger(t), config, script, options, []lib.ExecutionStep{}, "1.0")
+ collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
require.NoError(t, err)
samples := []*Sample{
diff --git a/stats/csv/config.go b/stats/csv/config.go
index 95914d2a7495..f75f8b1bd459 100644
--- a/stats/csv/config.go
+++ b/stats/csv/config.go
@@ -21,12 +21,14 @@
package csv
import (
+ "encoding/json"
"fmt"
"strings"
"time"
"gopkg.in/guregu/null.v3"
+ "github.com/kelseyhightower/envconfig"
"github.com/loadimpact/k6/lib/types"
)
@@ -87,3 +89,33 @@ func ParseArg(arg string) (Config, error) {
return c, nil
}
+
+// GetConsolidatedConfig combines {default config values + JSON config +
+// environment vars + arg config values}, and returns the final result.
+func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, arg string) (Config, error) {
+ result := NewConfig()
+ if jsonRawConf != nil {
+ jsonConf := Config{}
+ if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil {
+ return result, err
+ }
+ result = result.Apply(jsonConf)
+ }
+
+ envConfig := Config{}
+ if err := envconfig.Process("", &envConfig); err != nil {
+ // TODO: get rid of envconfig and actually use the env parameter...
+ return result, err
+ }
+ result = result.Apply(envConfig)
+
+ if arg != "" {
+ urlConf, err := ParseArg(arg)
+ if err != nil {
+ return result, err
+ }
+ result = result.Apply(urlConf)
+ }
+
+ return result, nil
+}
diff --git a/stats/datadog/collector.go b/stats/datadog/collector.go
index 415adff68ee3..245862788883 100644
--- a/stats/datadog/collector.go
+++ b/stats/datadog/collector.go
@@ -21,8 +21,10 @@
package datadog
import (
+ "encoding/json"
"time"
+ "github.com/kelseyhightower/envconfig"
"github.com/sirupsen/logrus"
"gopkg.in/guregu/null.v3"
@@ -115,3 +117,25 @@ func New(logger logrus.FieldLogger, conf Config) (*common.Collector, error) {
Logger: logger,
}, nil
}
+
+// GetConsolidatedConfig combines {default config values + JSON config +
+// environment vars}, and returns the final result.
+func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string) (Config, error) {
+ result := NewConfig()
+ if jsonRawConf != nil {
+ jsonConf := Config{}
+ if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil {
+ return result, err
+ }
+ result = result.Apply(jsonConf)
+ }
+
+ envConfig := Config{}
+ if err := envconfig.Process("", &envConfig); err != nil {
+ // TODO: get rid of envconfig and actually use the env parameter...
+ return result, err
+ }
+ result = result.Apply(envConfig)
+
+ return result, nil
+}
diff --git a/stats/dummy/collector.go b/stats/dummy/collector.go
deleted file mode 100644
index fa147c699ab6..000000000000
--- a/stats/dummy/collector.go
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * k6 - a next-generation load testing tool
- * Copyright (C) 2016 Load Impact
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- *
- */
-
-package dummy
-
-import (
- "context"
-
- "github.com/loadimpact/k6/lib"
- "github.com/loadimpact/k6/stats"
-)
-
-// Collector implements the lib.Collector interface and should be used only for testing
-type Collector struct {
- RunStatus lib.RunStatus
-
- SampleContainers []stats.SampleContainer
- Samples []stats.Sample
-}
-
-// Verify that Collector implements lib.Collector
-var _ lib.Collector = &Collector{}
-
-// Init does nothing, it's only included to satisfy the lib.Collector interface
-func (c *Collector) Init() error { return nil }
-
-// MakeConfig does nothing, it's only included to satisfy the lib.Collector interface
-func (c *Collector) MakeConfig() interface{} { return nil }
-
-// Run just blocks until the context is done
-func (c *Collector) Run(ctx context.Context) {
- <-ctx.Done()
-}
-
-// Collect just appends all of the samples passed to it to the internal sample slice.
-// According to the the lib.Collector interface, it should never be called concurrently,
-// so there's no locking on purpose - that way Go's race condition detector can actually
-// detect incorrect usage.
-// Also, theoretically the collector doesn't have to actually Run() before samples start
-// being collected, it only has to be initialized.
-func (c *Collector) Collect(scs []stats.SampleContainer) {
- for _, sc := range scs {
- c.SampleContainers = append(c.SampleContainers, sc)
- c.Samples = append(c.Samples, sc.GetSamples()...)
- }
-}
-
-// Link returns a dummy string, it's only included to satisfy the lib.Collector interface
-func (c *Collector) Link() string {
- return "http://example.com/"
-}
-
-// GetRequiredSystemTags returns which sample tags are needed by this collector
-func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet {
- return stats.SystemTagSet(0) // There are no required tags for this collector
-}
-
-// SetRunStatus just saves the passed status for later inspection
-func (c *Collector) SetRunStatus(status lib.RunStatus) {
- c.RunStatus = status
-}
diff --git a/stats/dummy/collector_test.go b/stats/dummy/collector_test.go
deleted file mode 100644
index 148f1b43988d..000000000000
--- a/stats/dummy/collector_test.go
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * k6 - a next-generation load testing tool
- * Copyright (C) 2016 Load Impact
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- *
- */
-
-package dummy
-
-import (
- "context"
- "sync"
- "testing"
-
- "github.com/loadimpact/k6/stats"
- "github.com/stretchr/testify/assert"
-)
-
-func TestCollectorRun(t *testing.T) {
- var wg sync.WaitGroup
- c := &Collector{}
- ctx, cancel := context.WithCancel(context.Background())
- wg.Add(1)
- go func() {
- defer wg.Done()
- c.Run(ctx)
- }()
-
- c.SetRunStatus(1)
-
- cancel()
- wg.Wait()
-}
-
-func TestCollectorCollect(t *testing.T) {
- c := &Collector{}
- c.Collect([]stats.SampleContainer{stats.Sample{}})
- assert.Len(t, c.Samples, 1)
-}
diff --git a/stats/influxdb/collector_test.go b/stats/influxdb/collector_test.go
index 8fc7231bd0b4..a9ce7f294f9a 100644
--- a/stats/influxdb/collector_test.go
+++ b/stats/influxdb/collector_test.go
@@ -42,21 +42,21 @@ func TestBadConcurrentWrites(t *testing.T) {
logger := testutils.NewLogger(t)
t.Run("0", func(t *testing.T) {
c.ConcurrentWrites = null.IntFrom(0)
- _, err := New(logger, *c)
+ _, err := New(logger, c)
require.Error(t, err)
require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a positive number")
})
t.Run("-2", func(t *testing.T) {
c.ConcurrentWrites = null.IntFrom(-2)
- _, err := New(logger, *c)
+ _, err := New(logger, c)
require.Error(t, err)
require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a positive number")
})
t.Run("2", func(t *testing.T) {
c.ConcurrentWrites = null.IntFrom(2)
- _, err := New(logger, *c)
+ _, err := New(logger, c)
require.NoError(t, err)
})
}
@@ -83,7 +83,7 @@ func testCollectorCycle(t testing.TB, handler http.HandlerFunc, body func(testin
config := NewConfig()
config.Addr = null.StringFrom("http://" + l.Addr().String())
- c, err := New(testutils.NewLogger(t), *config)
+ c, err := New(testutils.NewLogger(t), config)
require.NoError(t, err)
require.NoError(t, c.Init())
@@ -149,7 +149,7 @@ func TestExtractTagsToValues(t *testing.T) {
"floatField:float",
"intField:int",
}
- collector, err := New(testutils.NewLogger(t), *c)
+ collector, err := New(testutils.NewLogger(t), c)
require.NoError(t, err)
tags := map[string]string{
"stringField": "string",
diff --git a/stats/influxdb/config.go b/stats/influxdb/config.go
index fe4416bb06bd..53dea280a532 100644
--- a/stats/influxdb/config.go
+++ b/stats/influxdb/config.go
@@ -21,11 +21,13 @@
package influxdb
import (
+ "encoding/json"
"net/url"
"strconv"
"strings"
"time"
+ "github.com/kelseyhightower/envconfig"
"github.com/kubernetes/helm/pkg/strvals"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
@@ -52,8 +54,9 @@ type Config struct {
TagsAsFields []string `json:"tagsAsFields,omitempty" envconfig:"K6_INFLUXDB_TAGS_AS_FIELDS"`
}
-func NewConfig() *Config {
- c := &Config{
+// NewConfig creates a new InfluxDB output config with some default values.
+func NewConfig() Config {
+ c := Config{
Addr: null.NewString("http://localhost:8086", false),
DB: null.NewString("k6", false),
TagsAsFields: []string{"vu", "iter", "url"},
@@ -135,6 +138,14 @@ func ParseMap(m map[string]interface{}) (Config, error) {
return c, err
}
+// ParseJSON parses the supplied JSON into a Config.
+func ParseJSON(data json.RawMessage) (Config, error) {
+ conf := Config{}
+ err := json.Unmarshal(data, &conf)
+ return conf, err
+}
+
+// ParseURL parses the supplied URL into a Config.
func ParseURL(text string) (Config, error) {
c := Config{}
u, err := url.Parse(text)
@@ -198,3 +209,33 @@ func ParseURL(text string) (Config, error) {
}
return c, err
}
+
+// GetConsolidatedConfig combines {default config values + JSON config +
+// environment vars + URL config values}, and returns the final result.
+func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, url string) (Config, error) {
+ result := NewConfig()
+ if jsonRawConf != nil {
+ jsonConf, err := ParseJSON(jsonRawConf)
+ if err != nil {
+ return result, err
+ }
+ result = result.Apply(jsonConf)
+ }
+
+ envConfig := Config{}
+ if err := envconfig.Process("", &envConfig); err != nil {
+ // TODO: get rid of envconfig and actually use the env parameter...
+ return result, err
+ }
+ result = result.Apply(envConfig)
+
+ if url != "" {
+ urlConf, err := ParseURL(url)
+ if err != nil {
+ return result, err
+ }
+ result = result.Apply(urlConf)
+ }
+
+ return result, nil
+}
diff --git a/stats/influxdb/util_test.go b/stats/influxdb/util_test.go
index 2a98dcb2ef03..6faf48b4a060 100644
--- a/stats/influxdb/util_test.go
+++ b/stats/influxdb/util_test.go
@@ -53,22 +53,22 @@ func TestFieldKinds(t *testing.T) {
// Error case 1 (duplicated bool fields)
conf.TagsAsFields = []string{"vu", "iter", "url", "boolField:bool", "boolField:bool"}
- _, err = MakeFieldKinds(*conf)
+ _, err = MakeFieldKinds(conf)
require.Error(t, err)
// Error case 2 (duplicated fields in bool and float ields)
conf.TagsAsFields = []string{"vu", "iter", "url", "boolField:bool", "boolField:float"}
- _, err = MakeFieldKinds(*conf)
+ _, err = MakeFieldKinds(conf)
require.Error(t, err)
// Error case 3 (duplicated fields in BoolFields and IntFields)
conf.TagsAsFields = []string{"vu", "iter", "url", "boolField:bool", "floatField:float", "boolField:int"}
- _, err = MakeFieldKinds(*conf)
+ _, err = MakeFieldKinds(conf)
require.Error(t, err)
// Normal case
conf.TagsAsFields = []string{"vu", "iter", "url", "boolField:bool", "floatField:float", "intField:int"}
- fieldKinds, err = MakeFieldKinds(*conf)
+ fieldKinds, err = MakeFieldKinds(conf)
require.NoError(t, err)
require.Equal(t, fieldKinds["boolField"], Bool)
diff --git a/stats/json/collector.go b/stats/json/collector.go
deleted file mode 100644
index f8e006ac5368..000000000000
--- a/stats/json/collector.go
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- *
- * k6 - a next-generation load testing tool
- * Copyright (C) 2016 Load Impact
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- *
- */
-
-package json
-
-import (
- "compress/gzip"
- "context"
- "encoding/json"
- "os"
- "strings"
- "sync"
- "time"
-
- "github.com/sirupsen/logrus"
- "github.com/spf13/afero"
-
- "github.com/loadimpact/k6/lib"
- "github.com/loadimpact/k6/stats"
-)
-
-type Collector struct {
- closeFn func() error
- fname string
- seenMetrics []string
- logger logrus.FieldLogger
-
- encoder *json.Encoder
-
- buffer []stats.Sample
- bufferLock sync.Mutex
-}
-
-// Verify that Collector implements lib.Collector
-var _ lib.Collector = &Collector{}
-
-func (c *Collector) HasSeenMetric(str string) bool {
- for _, n := range c.seenMetrics {
- if n == str {
- return true
- }
- }
- return false
-}
-
-// New return new JSON collector
-func New(logger logrus.FieldLogger, fs afero.Fs, fname string) (*Collector, error) {
- c := &Collector{
- fname: fname,
- logger: logger,
- }
- if fname == "" || fname == "-" {
- c.encoder = json.NewEncoder(os.Stdout)
- c.closeFn = func() error {
- return nil
- }
- return c, nil
- }
- logfile, err := fs.Create(c.fname)
- if err != nil {
- return nil, err
- }
-
- if strings.HasSuffix(c.fname, ".gz") {
- outfile := gzip.NewWriter(logfile)
-
- c.closeFn = func() error {
- _ = outfile.Close()
- return logfile.Close()
- }
- c.encoder = json.NewEncoder(outfile)
- } else {
- c.closeFn = logfile.Close
- c.encoder = json.NewEncoder(logfile)
- }
-
- return c, nil
-}
-
-func (c *Collector) Init() error {
- return nil
-}
-
-func (c *Collector) SetRunStatus(status lib.RunStatus) {}
-
-func (c *Collector) Run(ctx context.Context) {
- const timeout = 200
- c.logger.Debug("JSON output: Running!")
- ticker := time.NewTicker(time.Millisecond * timeout)
- defer func() {
- _ = c.closeFn()
- }()
- for {
- select {
- case <-ticker.C:
- c.commit()
- case <-ctx.Done():
- c.commit()
- return
- }
- }
-}
-
-func (c *Collector) HandleMetric(m *stats.Metric) {
- if c.HasSeenMetric(m.Name) {
- return
- }
-
- c.seenMetrics = append(c.seenMetrics, m.Name)
- err := c.encoder.Encode(WrapMetric(m))
- if err != nil {
- c.logger.WithField("filename", c.fname).WithError(err).Warning(
- "JSON: Envelope is nil or Metric couldn't be marshalled to JSON")
- return
- }
-}
-
-func (c *Collector) Collect(scs []stats.SampleContainer) {
- c.bufferLock.Lock()
- defer c.bufferLock.Unlock()
- for _, sc := range scs {
- c.buffer = append(c.buffer, sc.GetSamples()...)
- }
-}
-
-func (c *Collector) commit() {
- c.bufferLock.Lock()
- samples := c.buffer
- c.buffer = nil
- c.bufferLock.Unlock()
- start := time.Now()
- var count int
- for _, sc := range samples {
- samples := sc.GetSamples()
- count += len(samples)
- for _, sample := range sc.GetSamples() {
- sample := sample
- c.HandleMetric(sample.Metric)
- err := c.encoder.Encode(WrapSample(&sample))
- if err != nil {
- // Skip metric if it can't be made into JSON or envelope is null.
- c.logger.WithField("filename", c.fname).WithError(err).Warning(
- "JSON: Sample couldn't be marshalled to JSON")
- continue
- }
- }
- }
- if count > 0 {
- c.logger.WithField("filename", c.fname).WithField("t", time.Since(start)).
- WithField("count", count).Debug("JSON: Wrote JSON metrics")
- }
-}
-
-func (c *Collector) Link() string {
- return ""
-}
-
-// GetRequiredSystemTags returns which sample tags are needed by this collector
-func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet {
- return stats.SystemTagSet(0) // There are no required tags for this collector
-}
diff --git a/stats/json/collector_test.go b/stats/json/collector_test.go
deleted file mode 100644
index 2a256fcef2ad..000000000000
--- a/stats/json/collector_test.go
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * k6 - a next-generation load testing tool
- * Copyright (C) 2016 Load Impact
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- *
- */
-
-package json
-
-import (
- "os"
- "testing"
-
- "github.com/loadimpact/k6/lib/testutils"
- "github.com/spf13/afero"
- "github.com/stretchr/testify/assert"
-)
-
-func TestNew(t *testing.T) {
- testdata := map[string]bool{
- "/nonexistent/badplacetolog.log": false,
- "./okplacetolog.log": true,
- "okplacetolog.log": true,
- }
-
- for path, succ := range testdata {
- path, succ := path, succ
- t.Run("path="+path, func(t *testing.T) {
- defer func() { _ = os.Remove(path) }()
-
- collector, err := New(testutils.NewLogger(t), afero.NewOsFs(), path)
- if succ {
- assert.NoError(t, err)
- assert.NotNil(t, collector)
- } else {
- assert.Error(t, err)
- assert.Nil(t, collector)
- }
- })
- }
-}
diff --git a/stats/json/wrapper_test.go b/stats/json/wrapper_test.go
deleted file mode 100644
index c50e8dadc7aa..000000000000
--- a/stats/json/wrapper_test.go
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * k6 - a next-generation load testing tool
- * Copyright (C) 2016 Load Impact
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- *
- */
-
-package json
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-
- "github.com/loadimpact/k6/stats"
-)
-
-func TestWrapersWithNilArg(t *testing.T) {
- out := WrapSample(nil)
- assert.Equal(t, out, (*Envelope)(nil))
- out = WrapMetric(nil)
- assert.Equal(t, out, (*Envelope)(nil))
-}
-
-func TestWrapSampleWithSamplePointer(t *testing.T) {
- out := WrapSample(&stats.Sample{
- Metric: &stats.Metric{},
- })
- assert.NotEqual(t, out, (*Envelope)(nil))
-}
-
-func TestWrapMetricWithMetricPointer(t *testing.T) {
- out := WrapMetric(&stats.Metric{})
- assert.NotEqual(t, out, (*Envelope)(nil))
-}
diff --git a/stats/kafka/collector.go b/stats/kafka/collector.go
index c3da5baa5de8..db402cf553a6 100644
--- a/stats/kafka/collector.go
+++ b/stats/kafka/collector.go
@@ -30,9 +30,9 @@ import (
"github.com/sirupsen/logrus"
"github.com/loadimpact/k6/lib"
+ jsono "github.com/loadimpact/k6/output/json"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/stats/influxdb"
- jsonc "github.com/loadimpact/k6/stats/json"
)
// Collector implements the lib.Collector interface and should be used only for testing
@@ -125,7 +125,7 @@ func (c *Collector) formatSamples(samples stats.Samples) ([]string, error) {
}
default:
for _, sample := range samples {
- env := jsonc.WrapSample(&sample)
+ env := jsono.WrapSample(sample)
metric, err := json.Marshal(env)
if err != nil {
return nil, err
diff --git a/stats/kafka/config.go b/stats/kafka/config.go
index 9d391155e5ec..7b11f36a627d 100644
--- a/stats/kafka/config.go
+++ b/stats/kafka/config.go
@@ -21,8 +21,10 @@
package kafka
import (
+ "encoding/json"
"time"
+ "github.com/kelseyhightower/envconfig"
"github.com/kubernetes/helm/pkg/strvals"
"github.com/mitchellh/mapstructure"
"gopkg.in/guregu/null.v3"
@@ -120,3 +122,33 @@ func ParseArg(arg string) (Config, error) {
return c, nil
}
+
+// GetConsolidatedConfig combines {default config values + JSON config +
+// environment vars + arg config values}, and returns the final result.
+func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, arg string) (Config, error) {
+ result := NewConfig()
+ if jsonRawConf != nil {
+ jsonConf := Config{}
+ if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil {
+ return result, err
+ }
+ result = result.Apply(jsonConf)
+ }
+
+ envConfig := Config{}
+ if err := envconfig.Process("", &envConfig); err != nil {
+ // TODO: get rid of envconfig and actually use the env parameter...
+ return result, err
+ }
+ result = result.Apply(envConfig)
+
+ if arg != "" {
+ urlConf, err := ParseArg(arg)
+ if err != nil {
+ return result, err
+ }
+ result = result.Apply(urlConf)
+ }
+
+ return result, nil
+}
diff --git a/stats/statsd/collector.go b/stats/statsd/collector.go
index b8d5f2a62b75..c49f8d2d9ea6 100644
--- a/stats/statsd/collector.go
+++ b/stats/statsd/collector.go
@@ -21,8 +21,10 @@
package statsd
import (
+ "encoding/json"
"time"
+ "github.com/kelseyhightower/envconfig"
"github.com/sirupsen/logrus"
"gopkg.in/guregu/null.v3"
@@ -96,3 +98,25 @@ func New(logger logrus.FieldLogger, conf common.Config) (*common.Collector, erro
Logger: logger,
}, nil
}
+
+// GetConsolidatedConfig combines {default config values + JSON config +
+// environment vars}, and returns the final result.
+func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string) (Config, error) {
+ result := NewConfig()
+ if jsonRawConf != nil {
+ jsonConf := Config{}
+ if err := json.Unmarshal(jsonRawConf, &jsonConf); err != nil {
+ return result, err
+ }
+ result = result.Apply(jsonConf)
+ }
+
+ envConfig := Config{}
+ if err := envconfig.Process("", &envConfig); err != nil {
+ // TODO: get rid of envconfig and actually use the env parameter...
+ return result, err
+ }
+ result = result.Apply(envConfig)
+
+ return result, nil
+}