diff --git a/endpoint.go b/endpoint.go deleted file mode 100644 index 7e3823c..0000000 --- a/endpoint.go +++ /dev/null @@ -1,73 +0,0 @@ -package goload - -import ( - "context" - "time" -) - -type EndpointOptions struct { - RequestsPerMinute *int32 - Timeout time.Duration -} - -func (options *EndpointOptions) Apply(opts ...EndpointOption) { - for _, opt := range opts { - opt(options) - } -} - -type EndpointOption func(options *EndpointOptions) - -func WithRequestsPerMinute(requestsPerMinute int32) EndpointOption { - return func(options *EndpointOptions) { - options.RequestsPerMinute = &requestsPerMinute - } -} - -func WithTimeout(timeout time.Duration) EndpointOption { - return func(options *EndpointOptions) { - options.Timeout = timeout - } -} - -type Endpoint interface { - // Do performs one request and is executed in a separate goroutine. - // The context is used to cancel the request on timeout. - Execute(ctx context.Context) error - - Name() string - - Options() *EndpointOptions -} - -type endpoint struct { - name string - handler func(ctx context.Context) error - options *EndpointOptions -} - -func (e *endpoint) Execute(ctx context.Context) error { - return e.handler(ctx) -} - -func (e *endpoint) Name() string { - return e.name -} - -func (e *endpoint) Options() *EndpointOptions { - return e.options -} - -func NewEndpoint(name string, handler func(ctx context.Context) error, opts ...EndpointOption) Endpoint { - options := &EndpointOptions{ - Timeout: 0, - RequestsPerMinute: nil, - } - options.Apply(opts...) - - return &endpoint{ - name: name, - handler: handler, - options: options, - } -} diff --git a/endpoint_randomizer.go b/endpoint_randomizer.go deleted file mode 100644 index af99716..0000000 --- a/endpoint_randomizer.go +++ /dev/null @@ -1,61 +0,0 @@ -package goload - -import ( - "fmt" - "math/rand" - "time" -) - -type randomizedEndpoint struct { - start, end int32 - endpoint Endpoint -} - -type EndpointRandomizer struct { - total int32 - endpoints []randomizedEndpoint - rand *rand.Rand -} - -func NewEndpointRandomizer(endpoints []Endpoint, overrides map[string]int32) (*EndpointRandomizer, error) { - randomizedEndpoints := make([]randomizedEndpoint, len(endpoints)) - var total int32 - - for i, endpoint := range endpoints { - var requestPerMinute int32 - - if rpmOverride, ok := overrides[endpoint.Name()]; ok { - requestPerMinute = rpmOverride - } else if endpoint.Options().RequestsPerMinute != nil { - requestPerMinute = *endpoint.Options().RequestsPerMinute - } else { - return nil, fmt.Errorf("Missing request per minute config for endpoint: %s", endpoint.Name()) - } - - randomizedEndpoints[i] = randomizedEndpoint{ - start: total + 1, - end: total + requestPerMinute, - endpoint: endpoint, - } - - total += requestPerMinute - } - - return &EndpointRandomizer{ - endpoints: randomizedEndpoints, - total: total, - rand: rand.New(rand.NewSource(time.Now().Unix())), - }, nil -} - -func (r *EndpointRandomizer) PickRandomEndpoint() Endpoint { - pickedRange := r.rand.Int31n(r.total) + 1 - - for _, endpoint := range r.endpoints { - if endpoint.start <= pickedRange && pickedRange <= endpoint.end { - return endpoint.endpoint - } - } - - return nil -} diff --git a/examples/dummy_executor.go b/examples/dummy_executor.go new file mode 100644 index 0000000..ff76bba --- /dev/null +++ b/examples/dummy_executor.go @@ -0,0 +1,32 @@ +package main + +import ( + "context" + "fmt" + "github.com/HenriBeck/goload" +) + +type DummyExecutor struct { + text string + name string +} + +func (d DummyExecutor) Execute(ctx context.Context) goload.ExecutionResponse { + fmt.Println(d.text) + return goload.ExecutionResponse{ + Identifier: d.Name(), + Err: nil, + AdditionalData: nil, + } +} + +func (d DummyExecutor) Name() string { + return d.name +} + +func (d DummyExecutor) Options() *goload.ExecutorOptions { + return &goload.ExecutorOptions{ + Weight: 1, + Timeout: 0, + } +} diff --git a/examples/main.go b/examples/main.go index 886ec62..4442ed2 100644 --- a/examples/main.go +++ b/examples/main.go @@ -1,31 +1,46 @@ package main import ( - "time" - + "fmt" "github.com/HenriBeck/goload" goload_http "github.com/HenriBeck/goload/http" + "github.com/HenriBeck/goload/http/url_builder" + "github.com/HenriBeck/goload/pacer" + "time" ) func main() { - goload.RunLoadtest( + goload.RunLoadTest( goload.WithDuration(5*time.Minute), - goload.WithEndpoints( + //goload.WithLinearRampUpPacer(pacer.Rate{Freq: 30, Per: time.Minute}, pacer.Rate{Freq: 2, Per: time.Second}, 1*time.Minute), + goload.WithConstantPacer(pacer.Rate{Freq: 2, Per: time.Second}), + goload_http.WithBasePath("http://test.k6.io"), + goload.WithExecutors( goload_http.NewEndpoint( - goload_http.WithURLString("http://test.k6.io"), - goload_http.WithRequestsPerMinute(15), + goload_http.WithName("test"), + goload_http.WithURL("/"), + goload_http.WithValidateResponse(goload_http.Status2xxResponseValidation), ), goload_http.NewEndpoint( - goload_http.WithURLString("http://test.k6.io/news.php"), - goload_http.WithRequestsPerMinute(10), + goload_http.WithName("pi"), + goload_http.WithURLBuilder( + url_builder.WithRawURL("/pi.php"), + url_builder.WithQueryParams( + url_builder.NewQueryParameter( + url_builder.WithParamName("decimals"), + url_builder.WithRandomNumberParamValue(1, 20), + ), + ), + ), + goload_http.WithValidateResponse(goload_http.Status2xxResponseValidation), ), ), - goload.WithRampUpRPM( - []goload.Step{ - {Minute: 1, RPM: 10}, - {Minute: 2, RPM: 12}, - {Minute: 3, RPM: 6}, - }, - ), + goload.WithAdditionalResultHandler(func(_ *goload.LoadTest, result *goload.Result) { + fmt.Printf("result: %+v\n", result) + }), + goload.WithWeightOverrides(map[string]int{ + "test": 1, + "pi": 2, + }), ) } diff --git a/executor.go b/executor.go new file mode 100644 index 0000000..2c6a152 --- /dev/null +++ b/executor.go @@ -0,0 +1,24 @@ +package goload + +import ( + "context" + "time" +) + +type ExecutionResponse struct { + Identifier string + Err error + AdditionalData any +} + +type ExecutorOptions struct { + Weight int + Timeout time.Duration +} + +type Executor interface { + Execute(ctx context.Context) ExecutionResponse + Name() string + // TODO: remove pointer and maybe split into two functions + Options() *ExecutorOptions +} diff --git a/go.mod b/go.mod index cf92da3..abb94c3 100644 --- a/go.mod +++ b/go.mod @@ -3,27 +3,21 @@ module github.com/HenriBeck/goload go 1.21 require ( - github.com/charmbracelet/lipgloss v0.8.0 - github.com/spenczar/tdigest v2.1.0+incompatible - github.com/stretchr/testify v1.8.4 + github.com/LENSHOOD/go-lock-free-ring-buffer v0.2.0 + github.com/mroth/weightedrand/v2 v2.1.0 + github.com/paulbellamy/ratecounter v0.2.0 + github.com/rs/zerolog v1.33.0 + golang.org/x/sync v0.3.0 google.golang.org/grpc v1.58.2 ) require ( - github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/lucasb-eyer/go-colorful v1.2.0 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect - github.com/mattn/go-runewidth v0.0.15 // indirect - github.com/muesli/reflow v0.3.0 // indirect - github.com/muesli/termenv v0.15.2 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rivo/uniseg v0.4.4 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect golang.org/x/net v0.15.0 // indirect - golang.org/x/sys v0.12.0 // indirect + golang.org/x/sys v0.24.0 // indirect golang.org/x/text v0.13.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect google.golang.org/protobuf v1.31.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index f617969..f47a7a3 100644 --- a/go.sum +++ b/go.sum @@ -1,45 +1,40 @@ -github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= -github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= -github.com/charmbracelet/lipgloss v0.8.0 h1:IS00fk4XAHcf8uZKc3eHeMUTCxUH6NkaTrdyCQk84RU= -github.com/charmbracelet/lipgloss v0.8.0/go.mod h1:p4eYUZZJ/0oXTuCQKFF8mqyKCz0ja6y+7DniDDw5KKU= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/LENSHOOD/go-lock-free-ring-buffer v0.2.0 h1:oxVFVKYrxWno7RwwnTvVYqdHmCCeYhMBpqjHevAXasM= +github.com/LENSHOOD/go-lock-free-ring-buffer v0.2.0/go.mod h1:jNNtDmtE7fiSWNrNyKtCOTAZxbgUkisQRQ/mmHIljoI= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= -github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= -github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= -github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= -github.com/muesli/reflow v0.3.0 h1:IFsN6K9NfGtjeggFP+68I4chLZV2yIKsXJFNZ+eWh6s= -github.com/muesli/reflow v0.3.0/go.mod h1:pbwTDkVPibjO2kyvBQRBxTWEEGDGq0FlB1BIKtnHY/8= -github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo= -github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= -github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= -github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= -github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= -github.com/spenczar/tdigest v2.1.0+incompatible h1:fW2Amo+VWvKXzANU0TyeOGi3xS0jDcM8y5gC5CHhSHM= -github.com/spenczar/tdigest v2.1.0+incompatible/go.mod h1:taEJf1IAhnY3KPrPBSUP4dNFwy4XSWs83kbY/FzdtSU= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= -golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mroth/weightedrand/v2 v2.1.0 h1:o1ascnB1CIVzsqlfArQQjeMy1U0NcIbBO5rfd5E/OeU= +github.com/mroth/weightedrand/v2 v2.1.0/go.mod h1:f2faGsfOGOwc1p94wzHKKZyTpcJUW7OJ/9U4yfiNAOU= +github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs= +github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= -golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -51,7 +46,5 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/group.go b/group.go new file mode 100644 index 0000000..385dd4b --- /dev/null +++ b/group.go @@ -0,0 +1,29 @@ +package goload + +import ( + "context" + "github.com/mroth/weightedrand/v2" + "time" +) + +type executorGroup struct { + name string + chooser *weightedrand.Chooser[Executor, int] + weight int + timeout time.Duration +} + +func (e *executorGroup) Execute(ctx context.Context) ExecutionResponse { + return e.chooser.Pick().Execute(ctx) +} + +func (e *executorGroup) Name() string { + return e.name +} + +func (e *executorGroup) Options() *ExecutorOptions { + return &ExecutorOptions{ + Weight: e.weight, + Timeout: e.timeout, + } +} diff --git a/group_options.go b/group_options.go new file mode 100644 index 0000000..95c8087 --- /dev/null +++ b/group_options.go @@ -0,0 +1,76 @@ +package goload + +import ( + "github.com/mroth/weightedrand/v2" + "github.com/rs/zerolog/log" + "time" +) + +type GroupOptions struct { + name string + weight int + timeout time.Duration + executors []Executor +} + +type GroupOption func(*GroupOptions) + +func WithGroup(opts ...GroupOption) Executor { + options := &GroupOptions{} + for _, opt := range opts { + opt(options) + } + + if len(options.executors) == 0 { + log.Fatal().Msg("group can't be empty") + } + choises := make([]weightedrand.Choice[Executor, int], 0, len(options.executors)) + for _, exec := range options.executors { + choises = append(choises, weightedrand.NewChoice(exec, exec.Options().Weight)) + } + chooser, err := weightedrand.NewChooser( + choises..., + ) + if err != nil { + log.Fatal().Err(err).Msg("can't create chooser") + } + + if options.weight == 0 { + weightSum := 0 + for _, exec := range options.executors { + weightSum += exec.Options().Weight + } + options.weight = weightSum + } + + return &executorGroup{ + name: options.name, + chooser: chooser, + weight: options.weight, + timeout: options.timeout, + } +} + +func WithGroupWeight(weight int) GroupOption { + return func(options *GroupOptions) { + options.weight = weight + } +} + +func WithGroupTimeout(timeout time.Duration) GroupOption { + return func(options *GroupOptions) { + options.timeout = timeout + } +} + +func WithGroupExecutors(executors ...Executor) GroupOption { + return func(options *GroupOptions) { + options.executors = append(options.executors, executors...) + } +} + +func WithGroupName(name string) GroupOption { + return func(options *GroupOptions) { + options.name = name + } +} diff --git a/http/http_endpoint.go b/http/http_endpoint.go index 0eb3b12..b6b8a50 100644 --- a/http/http_endpoint.go +++ b/http/http_endpoint.go @@ -3,169 +3,128 @@ package goload_http import ( "context" "fmt" + "github.com/HenriBeck/goload" + "github.com/rs/zerolog/log" "io" "net/http" "net/url" + "os" "time" - - "github.com/HenriBeck/goload" ) -type EndpointOptions struct { - name string - endpointOpts []goload.EndpointOption +var basePath *string - client *http.Client +type EndpointOption func(ep *endpoint) - getUrl func() url.URL - method string - body io.Reader - header http.Header +func NewEndpoint(opts ...EndpointOption) goload.Executor { + endpoint, err := renderAndValidateOptions(opts) + if err != nil { + fmt.Printf("Invalid Endpoint options: %v\n", err) + os.Exit(1) + } - validateResponse func(response *http.Response) error + return endpoint } -func (options *EndpointOptions) Name() string { - // If an explicit endpoint name is provided we have dynamic URLs per request - if options.name != "" { - return options.name - } +type endpoint struct { + name string + weight int + timeout time.Duration + + client *http.Client - // Otherwise this will resolve to a static URL which we will use as the endpoints name - uri := options.getUrl() + urlFunc func() (*url.URL, error) + methodFunc func() (string, error) + bodyFunc func() (io.Reader, error) + headerFunc func() (http.Header, error) - return fmt.Sprintf("%s %s", options.method, uri.String()) + validateResponse func(response *http.Response) error } -type HTTPEndpointOption func(options *EndpointOptions) - -// WithHTTPClient configures a static http defaultClient to be used for the loadtest endpoint. -func WithHTTPClient(client *http.Client) HTTPEndpointOption { - return func(options *EndpointOptions) { - options.client = client +func (e *endpoint) Execute(ctx context.Context) goload.ExecutionResponse { + response := goload.ExecutionResponse{ + Identifier: e.name, } -} -// WithHTTPMethod sets the HTTP method used for the requests. -// -// By default, the `Endpoint` will use an `GET` request. -func WithHTTPMethod(method string) HTTPEndpointOption { - return func(options *EndpointOptions) { - options.method = method + var body io.Reader + if e.bodyFunc != nil { + var err error + body, err = e.bodyFunc() + if err != nil { + response.Err = err + log.Error().Err(err).Msg("failed to get body") + return response + } } -} -func WithHeader(key string, value string) HTTPEndpointOption { - return func(options *EndpointOptions) { - options.header.Add(key, value) + targetURL, err := e.urlFunc() + if err != nil { + response.Err = err + log.Error().Err(err).Msg("failed to get target URL") + return response } -} + targetURLStr := targetURL.String() -// WithRequestsPerMinute configures the targeted requests per minute compared to other endpoints. -func WithRequestsPerMinute(rpm int32) HTTPEndpointOption { - return func(options *EndpointOptions) { - options.endpointOpts = append(options.endpointOpts, goload.WithRequestsPerMinute(rpm)) + method, err := e.methodFunc() + if err != nil { + response.Err = err + log.Error().Err(err).Msg("failed to get method") + return response } -} -// WithTimeout configures a specific timeout duration for the endpoint overriding the global config. -func WithTimeout(timeout time.Duration) HTTPEndpointOption { - return func(options *EndpointOptions) { - options.endpointOpts = append(options.endpointOpts, goload.WithTimeout(timeout)) + req, err := http.NewRequestWithContext(ctx, method, targetURLStr, body) + if err != nil { + response.Err = err + log.Error().Err(err).Msg("failed to create request") + return response } -} -// WithURL allows setting a static URL for the loadtest endpoint. -func WithURL(uri url.URL) HTTPEndpointOption { - return func(options *EndpointOptions) { - options.getUrl = func() url.URL { - return uri + if e.headerFunc != nil { + headers, err := e.headerFunc() + if err != nil { + response.Err = err + log.Error().Err(err).Msg("failed to get headers") + return response } + req.Header = headers } -} -// WithURLFunc allows for a dynamic creation of the URL per request made in the loadtest. -// -// An explicit endpoint name needs to be provided here for reporting purposes as an identifier. -func WithURLFunc(endpointName string, getUrl func() url.URL) HTTPEndpointOption { - return func(options *EndpointOptions) { - options.name = endpointName - options.getUrl = getUrl + res, err := e.client.Do(req) + if err != nil { + response.Err = err + log.Error().Err(err).Msg("failed to execute request") + return response } -} -// WithURLString allows setting a static string URL for the loadtest endpoint. -func WithURLString(uri string) HTTPEndpointOption { - parsedUri, err := url.Parse(uri) - if err != nil { - panic(err) + defer res.Body.Close() + + response.AdditionalData = map[string]string{ + "url": targetURLStr, } - return func(options *EndpointOptions) { - options.getUrl = func() url.URL { - return *parsedUri + if e.validateResponse != nil { + if err := e.validateResponse(res); err != nil { + response.Err = err } } + + return response } -// WithValidateResponse allows to configure a custom check if the request should be counted as successful or not. -// -// By default, the request is successful if it returns a 2xx status code. -func WithValidateResponse(validate func(res *http.Response) error) HTTPEndpointOption { - return func(options *EndpointOptions) { - options.validateResponse = validate - } +func (e *endpoint) Name() string { + return e.name } -// WithName allows manually setting the endpoint name -func WithName(name string) HTTPEndpointOption { - return func(options *EndpointOptions) { - options.name = name +func (e *endpoint) Options() *goload.ExecutorOptions { + return &goload.ExecutorOptions{ + Weight: e.weight, + Timeout: e.timeout, } } -// NewEndpoint creates a new HTTP based loadtest endpoint. -// -// To configure it you can use the functional options. -func NewEndpoint(opts ...HTTPEndpointOption) goload.Endpoint { - options := &EndpointOptions{ - method: http.MethodGet, - body: http.NoBody, - client: defaultClient, - header: make(http.Header), - validateResponse: func(res *http.Response) error { - if res.StatusCode < 200 || res.StatusCode > 299 { - return fmt.Errorf("received non 200 status code from the server") - } - - return nil - }, - } - for _, config := range opts { - config(options) +func Status2xxResponseValidation(response *http.Response) error { + if response.StatusCode < 200 && response.StatusCode >= 300 { + return fmt.Errorf("non 2xx status code: %d", response.StatusCode) } - - return goload.NewEndpoint( - options.Name(), - func(ctx context.Context) error { - uri := options.getUrl() - - req, err := http.NewRequestWithContext(ctx, options.method, uri.String(), options.body) - if err != nil { - return err - } - - req.Header = options.header - - res, err := options.client.Do(req) - if err != nil { - return err - } - - defer res.Body.Close() - - return options.validateResponse(res) - }, - options.endpointOpts..., - ) + return nil } diff --git a/http/options.go b/http/options.go new file mode 100644 index 0000000..08186bc --- /dev/null +++ b/http/options.go @@ -0,0 +1,149 @@ +package goload_http + +import ( + "errors" + "github.com/HenriBeck/goload" + "github.com/HenriBeck/goload/http/url_builder" + "io" + "net/http" + "net/url" + "time" +) + +func renderAndValidateOptions(opts []EndpointOption) (*endpoint, error) { + endpoint := endpoint{ + name: "", + weight: 1, + timeout: 0, + client: defaultClient, + urlFunc: nil, + methodFunc: func() (string, error) { + return http.MethodGet, nil + }, + bodyFunc: nil, + headerFunc: nil, + validateResponse: nil, + } + + for _, opt := range opts { + opt(&endpoint) + } + + if endpoint.urlFunc == nil { + return nil, errors.New("urlFunc is required") + } + + if endpoint.name == "" { + targetURL, err := endpoint.urlFunc() + if err != nil { + return nil, err + } + endpoint.name = targetURL.Path + } + + return &endpoint, nil +} + +func WithName(name string) EndpointOption { + return func(ep *endpoint) { + ep.name = name + } +} + +func WithWeight(weight int) EndpointOption { + return func(ep *endpoint) { + ep.weight = weight + } +} + +func WithTimeout(timeout time.Duration) EndpointOption { + return func(ep *endpoint) { + ep.timeout = timeout + } +} + +func WithClient(client http.Client) EndpointOption { + return func(ep *endpoint) { + ep.client = &client + } +} + +func WithURL(rawURL string) EndpointOption { + return func(ep *endpoint) { + ep.urlFunc = func() (*url.URL, error) { + if basePath != nil { + var err error + rawURL, err = url.JoinPath(*basePath, rawURL) + if err != nil { + return nil, err + } + } + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + return u, nil + } + } +} + +func WithURLFunc(urlFunc func() (*url.URL, error)) EndpointOption { + return func(ep *endpoint) { + ep.urlFunc = urlFunc + } +} + +func WithMethod(method string) EndpointOption { + return func(ep *endpoint) { + ep.methodFunc = func() (string, error) { + return method, nil + } + } +} + +func WithMethodFunc(methodFunc func() (string, error)) EndpointOption { + return func(ep *endpoint) { + ep.methodFunc = methodFunc + } +} + +func WithBodyFunc(bodyFunc func() (io.Reader, error)) EndpointOption { + return func(ep *endpoint) { + ep.bodyFunc = bodyFunc + } +} + +func WithHeader(header http.Header) EndpointOption { + return func(ep *endpoint) { + ep.headerFunc = func() (http.Header, error) { + return header, nil + } + } +} + +func WithHeaderFunc(headerFunc func() (http.Header, error)) EndpointOption { + return func(ep *endpoint) { + ep.headerFunc = headerFunc + } +} + +func WithValidateResponse(validationFunc func(response *http.Response) error) EndpointOption { + return func(ep *endpoint) { + ep.validateResponse = validationFunc + } +} + +func WithURLBuilder(opts ...url_builder.URLBuilderOption) EndpointOption { + builder := url_builder.NewURLBuilder(opts) + return func(ep *endpoint) { + ep.urlFunc = func() (*url.URL, error) { + return builder.Build(basePath) + } + } +} + +func WithBasePath(path string) goload.LoadTestOption { + return func(_ *goload.LoadTestOptions) { + basePath = &path + } +} diff --git a/http/url_builder/options.go b/http/url_builder/options.go new file mode 100644 index 0000000..f80fae2 --- /dev/null +++ b/http/url_builder/options.go @@ -0,0 +1,19 @@ +package url_builder + +func WithRawURL(rawURL string) URLBuilderOption { + return func(builder *URLBuilder) { + builder.rawURL = rawURL + } +} + +func WithQueryParams(queryParams ...QueryParamBuilder) URLBuilderOption { + return func(builder *URLBuilder) { + builder.queryParams = append(builder.queryParams, queryParams...) + } +} + +func WithURLParam(key string, values []string) URLBuilderOption { + return func(builder *URLBuilder) { + builder.urlParameterRandomizers = append(builder.urlParameterRandomizers, URLParameterRandomizer{key, values}) + } +} diff --git a/http/url_builder/query_param.go b/http/url_builder/query_param.go new file mode 100644 index 0000000..bffeefb --- /dev/null +++ b/http/url_builder/query_param.go @@ -0,0 +1,99 @@ +package url_builder + +import ( + "github.com/HenriBeck/goload/utils/random" + "github.com/mroth/weightedrand/v2" + "github.com/rs/zerolog/log" + "net/url" +) + +type QueryParamBuilder interface { + Build() url.Values +} + +type NameFn func() string +type ShouldBeUsedFn func() bool +type ValuesFn func() []string + +type QueryParameter struct { + Name NameFn + ShouldBeUsed ShouldBeUsedFn + Value ValuesFn +} + +type QueryParameterOption func(param *QueryParameter) + +func NewQueryParameter(opts ...QueryParameterOption) *QueryParameter { + param := &QueryParameter{ + //set defaults + ShouldBeUsed: UseAlways(), + } + for _, opt := range opts { + opt(param) + } + if param.Name == nil || param.Value == nil { + log.Fatal().Msg("NewQueryParameter must contain opts for name and value") + } + return param +} + +func (s *QueryParameter) Build() url.Values { + if !s.ShouldBeUsed() { + return url.Values{} + } + values := s.Value() + + return url.Values{ + s.Name(): values, + } +} + +func UseAlways() ShouldBeUsedFn { + return func() bool { + return true + } +} + +type oneOfParam struct { + params []QueryParamBuilder +} + +func WithOneOfParam(params ...QueryParamBuilder) QueryParamBuilder { + if len(params) == 0 { + log.Fatal().Msg("WithOneOfParam must contain at least one parameter") + } + return &oneOfParam{params: params} +} + +func (p *oneOfParam) Build() url.Values { + index := random.Number(0, int64(len(p.params)-1)) + return p.params[index].Build() +} + +type chanceParam struct { + chance int + param QueryParamBuilder + r *weightedrand.Chooser[bool, int] +} + +func NewParamWithUsageChange(chance int, param QueryParamBuilder) QueryParamBuilder { + if chance > 100 || chance < 0 { + log.Fatal().Msg("NewParamWithUsageChange chance value must be between 0 and 100") + } + r, err := weightedrand.NewChooser( + weightedrand.NewChoice(true, chance), + weightedrand.NewChoice(true, 100-chance), + ) + if err != nil { + log.Fatal().Err(err).Msg("can't create chooser") + } + + return &chanceParam{chance: chance, param: param, r: r} +} + +func (p *chanceParam) Build() url.Values { + if p.r.Pick() { + return p.param.Build() + } + return url.Values{} +} diff --git a/http/url_builder/query_param_options.go b/http/url_builder/query_param_options.go new file mode 100644 index 0000000..f136463 --- /dev/null +++ b/http/url_builder/query_param_options.go @@ -0,0 +1,100 @@ +package url_builder + +import ( + "github.com/HenriBeck/goload/utils/random" + "github.com/mroth/weightedrand/v2" + "github.com/rs/zerolog/log" + "strconv" +) + +func WithParamName(name string) QueryParameterOption { + return func(param *QueryParameter) { + param.Name = func() string { + return name + } + } +} + +func WithParamUsagePercentage(pct int) QueryParameterOption { + if pct > 100 || pct < 0 { + log.Fatal().Msg("WithParamUsagePercentage pct must be between 0 and 100") + } + r, err := weightedrand.NewChooser( + weightedrand.NewChoice(true, pct), + weightedrand.NewChoice(true, 100-pct), + ) + if err != nil { + log.Fatal().Err(err).Msg("can't create chooser") + } + + return func(param *QueryParameter) { + param.ShouldBeUsed = func() bool { + return r.Pick() + } + } +} + +func WithParamValue(value string) QueryParameterOption { + return func(param *QueryParameter) { + param.Value = func() []string { + return []string{value} + } + } +} + +func WithSampledParamValues(min int64, max int64, opts []string) QueryParameterOption { + sampler := random.NewSampler(opts) + return func(param *QueryParameter) { + n := random.Number(min, max) + param.Value = func() []string { + return sampler.Get(int(n)) + } + } +} + +type WeightedValueOpt struct { + Value string + Weight int +} + +func WithWeightedParamValue(opts ...WeightedValueOpt) QueryParameterOption { + if len(opts) == 0 { + log.Fatal().Msg("WithWeightedParamValue must have at least one option") + } + values := make([]weightedrand.Choice[string, int], 0, len(opts)) + for _, opt := range opts { + values = append(values, weightedrand.NewChoice(opt.Value, opt.Weight)) + } + + r, err := weightedrand.NewChooser(values...) + if err != nil { + log.Fatal().Err(err).Msg("can't create chooser") + } + + return func(param *QueryParameter) { + param.Value = func() []string { + return []string{r.Pick()} + } + } +} + +func WithOneOfParamValue(values []string) QueryParameterOption { + if len(values) == 0 { + log.Fatal().Msg("WithOneOfParamValue must have at least one value") + } + return func(param *QueryParameter) { + param.Value = func() []string { + index := random.Number(0, int64(len(values)-1)) + return []string{values[index]} + } + } +} + +func WithRandomNumberParamValue(min int64, max int64) QueryParameterOption { + return func(param *QueryParameter) { + param.Value = func() []string { + number := random.Number(min, max) + return []string{strconv.FormatInt(number, 10)} + } + } +} diff --git a/http/url_builder/url_builder.go b/http/url_builder/url_builder.go new file mode 100644 index 0000000..927ef8f --- /dev/null +++ b/http/url_builder/url_builder.go @@ -0,0 +1,84 @@ +package url_builder + +import ( + "fmt" + "github.com/HenriBeck/goload/utils/random" + "github.com/rs/zerolog/log" + "net/url" + "strings" +) + +type URLBuilder struct { + rawURL string + urlParameterRandomizers []URLParameterRandomizer + queryParams []QueryParamBuilder +} + +type URLBuilderOption func(*URLBuilder) + +func NewURLBuilder(opts []URLBuilderOption) *URLBuilder { + urlBuilder := URLBuilder{} + + for _, opt := range opts { + opt(&urlBuilder) + } + + if urlBuilder.rawURL == "" { + log.Fatal().Msg("NewURLBuilder must include WithRawURL option") + } + + return &urlBuilder +} + +func (builder *URLBuilder) Build(basePath *string) (*url.URL, error) { + q := url.Values{} + + for _, param := range builder.queryParams { + for key, values := range param.Build() { + for _, value := range values { + q.Add(key, value) + } + } + } + + query := q.Encode() + + rawURL := builder.rawURL + if basePath != nil { + var err error + rawURL, err = url.JoinPath(*basePath, builder.rawURL) + if err != nil { + return nil, err + } + } + + for _, u := range builder.urlParameterRandomizers { + v, err := u.GetValue() + if err != nil { + return nil, err + } + rawURL = strings.Replace(rawURL, u.key, v, 1) + } + + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + u.RawQuery = query + + return u, nil +} + +type URLParameterRandomizer struct { + key string + values []string +} + +func (u *URLParameterRandomizer) GetValue() (string, error) { + if len(u.values) == 0 { + return "", fmt.Errorf("empty values for URLParameterRandomizer") + } + index := random.Number(0, int64(len(u.values)-1)) + return u.values[index], nil +} diff --git a/loadtest.go b/loadtest.go index 182fa97..7b90248 100644 --- a/loadtest.go +++ b/loadtest.go @@ -3,174 +3,130 @@ package goload import ( "context" "fmt" + "github.com/HenriBeck/goload/pacer" + ctx_utils "github.com/HenriBeck/goload/utils/ctx" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "math" "os" - "os/signal" - "sync" - "syscall" "time" ) +type resultHandler func(lt *LoadTest, result *Result) + type LoadTest struct { - UI *UI - Options *LoadTestOptions - Results *LoadTestResults + Pacer pacer.Pacer + Runner *Runner + Executors []Executor - done chan bool - ticker *time.Ticker -} + duration time.Duration -func RunLoadtest(configs ...LoadTestConfig) { - options := &LoadTestOptions{} - for _, config := range configs { - config(options) - } + resultHandlers []resultHandler + resultAggregator *resultAggregator + reportInterval time.Duration - ui := NewUI(os.Stdout) + done chan struct{} +} - ui.PrintStartMessage() +type LoadTestOptions struct { + pacer pacer.Pacer + executors []Executor + duration time.Duration + initialWorkers int + maxWorkers int + resultHandlers []resultHandler + weightOverrides map[string]int + reportInterval time.Duration + ctxModifier func(ctx context.Context) context.Context + defaultTimeout time.Duration +} +type LoadTestOption func(*LoadTestOptions) - loadtest := &LoadTest{ - Options: options, - Results: NewResults(options.Endpoints), - UI: ui, +func RunLoadTest(opts ...LoadTestOption) { + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout}) - done: make(chan bool), - ticker: initializeTicker(options, ui), + // set default options + options, err := renderAndValidateOptions(opts) + if err != nil { + fmt.Printf("Invalid options: %v\n", err) + os.Exit(1) } - loadtest.WaitForLoadTestEnd() - loadtest.ListenForAbort() - - loadtest.Run() - - ui.ReportResults(loadtest.Results) -} - -func (loadtest *LoadTest) WaitForLoadTestEnd() { - duration := loadtest.Options.LoadTestDuration - if duration.Nanoseconds() == 0 { - return + resultAggregator := newResultAggregator() + options.resultHandlers = append(options.resultHandlers, resultAggregator.resultAggregationHandler) + + loadTest := &LoadTest{ + Pacer: options.pacer, + Runner: NewRunner(options), + Executors: options.executors, + duration: options.duration, + resultHandlers: options.resultHandlers, + resultAggregator: resultAggregator, + reportInterval: options.reportInterval, + done: make(chan struct{}), } - // Cancel the timer after the duration of the loadtest has elapsed - go func() { - time.Sleep(duration) - loadtest.done <- true - loadtest.ticker.Stop() - }() -} - -func (loadtest *LoadTest) ListenForAbort() { - // Cancel the loadtest if the program is stopped using OS signals - go func() { - ch := make(chan os.Signal, 1) - signal.Notify(ch, os.Interrupt, os.Kill, syscall.SIGTERM) - - <-ch - - fmt.Println() - loadtest.UI.PrintAbortMessage() + ctx := ctx_utils.ContextWithInterrupt(context.Background()) - loadtest.done <- true - loadtest.ticker.Stop() - }() + loadTest.Run(ctx) } -func initializeTicker(options *LoadTestOptions, ui *UI) *time.Ticker { - initialRPM := options.RPMStrategy.GetRPMForMinute(0) - ticker := time.NewTicker( - time.Minute / time.Duration(initialRPM), - ) +func (lt *LoadTest) Run(ctx context.Context) { + resultChan := lt.Runner.Run(ctx, lt.Executors, lt.Pacer, lt.duration) + lt.runReporter(ctx) - ui.ReportInitialRPM(initialRPM) + for result := range resultChan { + for _, handler := range lt.resultHandlers { + handler(lt, result) + } + } + close(lt.done) +} +func (lt *LoadTest) runReporter(ctx context.Context) { go func() { - minute := int32(0) - previousRPM := initialRPM - t := time.NewTicker(time.Minute) - - for range t.C { - minute++ - rpm := options.RPMStrategy.GetRPMForMinute(minute) - - if previousRPM == rpm { - continue - } - - if previousRPM < rpm { - ui.ReportIncreaseInRPM(rpm) - } else { - ui.ReportDecreaseInRPM(rpm) + ticker := time.NewTicker(lt.reportInterval) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + fmt.Printf("expected pace: %.2f/s\n", lt.Pacer.Rate(time.Now().Sub(*lt.Runner.startedAt))) + fmt.Printf("actual pace: %.2f/s\n", float64(lt.resultAggregator.rateCounter.Rate())/10) + fmt.Printf("total hits: %d\n", lt.resultAggregator.total.Load()) + fmt.Printf("total failures: %d\n", lt.resultAggregator.failures.Load()) } - - ticker.Reset( - time.Minute / time.Duration(rpm), - ) - previousRPM = rpm } }() - - return ticker } -func (loadtest *LoadTest) Run() { - endpointRandomizer, err := NewEndpointRandomizer( - loadtest.Options.Endpoints, - loadtest.Options.RequestPerMinutePerEndpoint, - ) - if err != nil { - panic(err) +var defaultResultHandlers []resultHandler + +func renderAndValidateOptions(opts []LoadTestOption) (LoadTestOptions, error) { + options := LoadTestOptions{ + pacer: nil, + executors: nil, + duration: 0, + initialWorkers: 10, + maxWorkers: math.MaxInt, + resultHandlers: defaultResultHandlers, + weightOverrides: nil, + reportInterval: 10 * time.Second, } - g := new(sync.WaitGroup) - -loop: - for { - select { - case <-loadtest.done: - break loop - - case <-loadtest.ticker.C: - go func() { - g.Add(1) - defer g.Done() - - endpoint := endpointRandomizer.PickRandomEndpoint() - - ctx := context.Background() - for _, fn := range loadtest.Options.ContextModifiers { - ctx = fn(ctx) - } - - var timeout time.Duration - switch { - case endpoint.Options().Timeout.Nanoseconds() > 0: - timeout = endpoint.Options().Timeout - case loadtest.Options.DefaultEndpointTimeout.Nanoseconds() > 0: - timeout = loadtest.Options.DefaultEndpointTimeout - } - - if timeout.Nanoseconds() > 0 { - _ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - ctx = _ctx - } - - startTime := time.Now() - err := endpoint.Execute(ctx) - duration := time.Since(startTime) - - loadtest.Results.SaveEndpointResult( - endpoint, - EndpointResult{ - Failed: err != nil, - Duration: duration, - }, - ) - }() - } + + for _, opt := range opts { + opt(&options) + } + + if options.pacer == nil { + return LoadTestOptions{}, fmt.Errorf("pacer is required") + } + if len(options.executors) == 0 { + return LoadTestOptions{}, fmt.Errorf("should define at least one executor") + } + if options.initialWorkers == 0 || options.maxWorkers == 0 { + return LoadTestOptions{}, fmt.Errorf("inital and max workers must be > 0") } - // Wait until all requests have finished - g.Wait() + return options, nil } diff --git a/options.go b/options.go index 7f32a0b..2946477 100644 --- a/options.go +++ b/options.go @@ -2,67 +2,72 @@ package goload import ( "context" + "github.com/HenriBeck/goload/pacer" "time" ) -type LoadTestOptions struct { - Endpoints []Endpoint - // A config which allows to set the requests per minute for each registered endpoint at a central place - // - // This overrides any local endpoint setting where a default RPM value was set - RequestPerMinutePerEndpoint map[string]int32 - - DefaultEndpointTimeout time.Duration - - LoadTestDuration time.Duration - - RPMStrategy RPMStrategy +func WithDuration(loadTestDuration time.Duration) LoadTestOption { + return func(options *LoadTestOptions) { + options.duration = loadTestDuration + } +} - ContextModifiers []func(ctx context.Context) context.Context +func WithExecutors(executors ...Executor) LoadTestOption { + return func(options *LoadTestOptions) { + options.executors = append(options.executors, executors...) + } } -type LoadTestConfig func(options *LoadTestOptions) +func WithLinearRampUpPacer(startRate, endRate pacer.Rate, rampUpDuration time.Duration) LoadTestOption { + return func(options *LoadTestOptions) { + options.pacer = pacer.NewLinearRampUpPacer(startRate, endRate, rampUpDuration) + } +} -func WithEndpoints(endpoints ...Endpoint) LoadTestConfig { +func WithConstantPacer(rate pacer.Rate) LoadTestOption { return func(options *LoadTestOptions) { - options.Endpoints = append(options.Endpoints, endpoints...) + options.pacer = pacer.NewConstantPacer(rate) } } -func WithRequestsPerMinuteForEndpoints(data map[string]int32) LoadTestConfig { +func WithReportInterval(reportInterval time.Duration) LoadTestOption { return func(options *LoadTestOptions) { - options.RequestPerMinutePerEndpoint = data + options.reportInterval = reportInterval } } -func WithContextModifier(fn func(ctx context.Context) context.Context) LoadTestConfig { +func WithInitialWorkerCount(count int) LoadTestOption { return func(options *LoadTestOptions) { - options.ContextModifiers = append(options.ContextModifiers, fn) + options.initialWorkers = count } } -func WithDefaultEndpointTimeout(timeout time.Duration) LoadTestConfig { +func WithMaxWorkerCount(count int) LoadTestOption { return func(options *LoadTestOptions) { - options.DefaultEndpointTimeout = timeout + options.maxWorkers = count } } -func WithDuration(loadTestDuration time.Duration) LoadTestConfig { +func WithAdditionalResultHandler(handler resultHandler) LoadTestOption { return func(options *LoadTestOptions) { - options.LoadTestDuration = loadTestDuration + options.resultHandlers = append(options.resultHandlers, handler) } } -func WithRPMStrategy(strategy RPMStrategy) LoadTestConfig { +func WithContextModifier(fn func(ctx context.Context) context.Context) LoadTestOption { return func(options *LoadTestOptions) { - options.RPMStrategy = strategy + options.ctxModifier = fn } } -func WithStaticRPM(rpm int32) LoadTestConfig { - return WithRPMStrategy(NewStaticRPMStrategy(rpm)) +func WithDefaultTimeout(timeout time.Duration) LoadTestOption { + return func(options *LoadTestOptions) { + options.defaultTimeout = timeout + } } -func WithRampUpRPM(steps []Step) LoadTestConfig { - return WithRPMStrategy(NewRampUpRPMStrategy(steps)) +func WithWeightOverrides(overrides map[string]int) LoadTestOption { + return func(options *LoadTestOptions) { + options.weightOverrides = overrides + } } diff --git a/pacer/constant.go b/pacer/constant.go new file mode 100644 index 0000000..23ba143 --- /dev/null +++ b/pacer/constant.go @@ -0,0 +1,49 @@ +package pacer + +import ( + "math" + "time" +) + +func NewConstantPacer(rate Rate) Pacer { + return constantPacer{ + rate: rate, + } +} + +type constantPacer struct { + rate Rate +} + +func (p constantPacer) Pace(elapsed time.Duration, hits uint64) time.Duration { + expectedHits := p.expectedHits(elapsed) + if hits == 0 || hits < uint64(expectedHits) { + // Running behind, send next hit immediately. + return 0 + } + + rate := p.Rate(elapsed) + interval := math.Round(1e9 / rate) + + if n := uint64(interval); n != 0 && math.MaxInt64/n < hits { + // We would overflow wait if we continued, so stop the execution. + return 0 + } + + delta := float64(hits+1) - expectedHits + wait := time.Duration(interval * delta) + + return wait +} + +func (p constantPacer) Rate(elapsed time.Duration) float64 { + return p.rate.hitsPerSec() +} + +func (p constantPacer) expectedHits(t time.Duration) float64 { + if t < 0 { + return 0 + } + + return p.rate.hitsPerSec() * t.Seconds() +} diff --git a/pacer/linear_ramp_up.go b/pacer/linear_ramp_up.go new file mode 100644 index 0000000..533e9e7 --- /dev/null +++ b/pacer/linear_ramp_up.go @@ -0,0 +1,87 @@ +package pacer + +import ( + "github.com/rs/zerolog/log" + "math" + "time" +) + +// NewLinearRampUpPacer creates a linear increase from the start to Target rate within the given RampUpDuration +func NewLinearRampUpPacer(startRate Rate, targetRate Rate, rampUpDuration time.Duration) Pacer { + diff := targetRate.hitsPerSec() - startRate.hitsPerSec() + + return linearRampUpPacer{ + StartRate: startRate, + TargetRate: targetRate, + RampUpDuration: rampUpDuration, + a: diff / rampUpDuration.Seconds(), + b: startRate.hitsPerSec(), + } +} + +type linearRampUpPacer struct { + StartRate Rate + TargetRate Rate + RampUpDuration time.Duration + + a float64 + b float64 +} + +// Pace determines the length of time to sleep until the next hit is sent. +func (p linearRampUpPacer) Pace(elapsed time.Duration, hits uint64) time.Duration { + switch { + case p.StartRate.Per == 0 || p.StartRate.Freq == 0: + return 0 // Zero value = infinite rate + case p.StartRate.Per < 0 || p.StartRate.Freq < 0: + log.Fatal().Msg("rate can't be negative") + } + + expectedHits := p.expectedHits(elapsed) + if hits == 0 || hits < uint64(expectedHits) { + // Running behind, send next hit immediately. + return 0 + } + + rate := p.Rate(elapsed) + interval := math.Round(1e9 / rate) + + if n := uint64(interval); n != 0 && math.MaxInt64/n < hits { + // We would overflow wait if we continued, so stop the execution. + return 0 + } + + delta := float64(hits+1) - expectedHits + wait := time.Duration(interval * delta) + + return wait +} + +func (p linearRampUpPacer) Rate(elapsed time.Duration) float64 { + if elapsed > p.RampUpDuration { + return p.TargetRate.hitsPerSec() + } + + x := elapsed.Seconds() + + return p.a*x + p.b +} + +func (p linearRampUpPacer) expectedHits(t time.Duration) float64 { + if t < 0 { + return 0 + } + + x := min(t.Seconds(), p.RampUpDuration.Seconds()) + + hitsInSlope := (p.a*math.Pow(x, 2))/2 + p.b*x + + if t < p.RampUpDuration { + return hitsInSlope + } + + linearDuration := t.Seconds() - p.RampUpDuration.Seconds() + targetRate := p.TargetRate.hitsPerSec() + + return hitsInSlope + linearDuration*targetRate +} diff --git a/pacer/pacer.go b/pacer/pacer.go new file mode 100644 index 0000000..6e1f9f7 --- /dev/null +++ b/pacer/pacer.go @@ -0,0 +1,28 @@ +package pacer + +import ( + "time" +) + +// Pacer idea copied from https://github.com/tsenart/vegeta + +// A Pacer defines the rate of hits during an Attack. +type Pacer interface { + // Pace returns the duration an Runner should wait until + // hitting the next Target, given an already elapsed duration and + // completed hits. + Pace(elapsed time.Duration, hits uint64) (wait time.Duration) + + // Rate returns a Pacer's instantaneous hit rate (per seconds) + // at the given elapsed duration of an execution. + Rate(elapsed time.Duration) float64 +} + +type Rate struct { + Freq int // Frequency (number of occurrences) per ... + Per time.Duration // Time unit, usually 1s +} + +func (cp Rate) hitsPerSec() float64 { + return (float64(cp.Freq) / float64(cp.Per)) * 1e9 +} diff --git a/result.go b/result.go new file mode 100644 index 0000000..757c810 --- /dev/null +++ b/result.go @@ -0,0 +1,35 @@ +package goload + +import ( + "github.com/paulbellamy/ratecounter" + "sync/atomic" + "time" +) + +type Result struct { + Identifier string + Timestamp time.Time + Latency time.Duration + Err error + AdditionalData any +} + +type resultAggregator struct { + rateCounter *ratecounter.RateCounter + total atomic.Int64 + failures atomic.Int64 +} + +func newResultAggregator() *resultAggregator { + return &resultAggregator{ + rateCounter: ratecounter.NewRateCounter(10 * time.Second), + } +} + +func (ra *resultAggregator) resultAggregationHandler(_ *LoadTest, result *Result) { + ra.rateCounter.Incr(1) + ra.total.Add(1) + if result.Err != nil { + ra.failures.Add(1) + } +} diff --git a/results.go b/results.go deleted file mode 100644 index 8d886cb..0000000 --- a/results.go +++ /dev/null @@ -1,74 +0,0 @@ -package goload - -import ( - "sort" - "sync/atomic" - "time" -) - -type EndpointResult struct { - Failed bool - Duration time.Duration -} - -type EndpointResults struct { - Name string - total uint64 - failed uint64 - totalDuration uint64 -} - -func (e *EndpointResults) GetTotalRequests() uint64 { - return atomic.LoadUint64(&e.total) -} - -func (e *EndpointResults) GetTotalFailedRequests() uint64 { - return atomic.LoadUint64(&e.failed) -} - -func (e *EndpointResults) GetAverageDuration() float64 { - return float64(atomic.LoadUint64(&e.totalDuration)) / float64(atomic.LoadUint64(&e.total)) -} - -type LoadTestResults struct { - endpoints map[string]*EndpointResults -} - -func NewResults(endpoints []Endpoint) *LoadTestResults { - results := &LoadTestResults{ - endpoints: map[string]*EndpointResults{}, - } - - for _, endpoint := range endpoints { - results.endpoints[endpoint.Name()] = &EndpointResults{ - Name: endpoint.Name(), - } - } - - return results -} - -func (results *LoadTestResults) Iter() []*EndpointResults { - endpoints := []*EndpointResults{} - for _, endpoint := range results.endpoints { - endpoints = append(endpoints, endpoint) - } - - sort.SliceStable(endpoints, func(i, j int) bool { - return endpoints[i].Name < endpoints[j].Name - }) - - return endpoints -} - -func (results *LoadTestResults) SaveEndpointResult(Endpoint Endpoint, result EndpointResult) { - atomic.AddUint64(&results.endpoints[Endpoint.Name()].total, 1) - if result.Failed { - atomic.AddUint64(&results.endpoints[Endpoint.Name()].failed, 1) - } - - atomic.AddUint64( - &results.endpoints[Endpoint.Name()].totalDuration, - uint64(result.Duration.Milliseconds()), - ) -} diff --git a/rpm_strategies.go b/rpm_strategies.go deleted file mode 100644 index 1423c0e..0000000 --- a/rpm_strategies.go +++ /dev/null @@ -1,104 +0,0 @@ -package goload - -import ( - "math" - "sort" -) - -// An RPM strategy defines how many requests per minute will be sent to the target. -// -// The GetRPMForMinute function will be called every minute and the loadtest will be adjusted as such. -type RPMStrategy interface { - GetRPMForMinute(minute int32) int32 -} - -// A static rpm strategy which outputs a constant request number to the target. -type StaticRPMStrategy struct { - rpm int32 -} - -func NewStaticRPMStrategy(rpm int32) RPMStrategy { - return &StaticRPMStrategy{ - rpm: rpm, - } -} - -func (strategy *StaticRPMStrategy) GetRPMForMinute(minute int32) int32 { - return strategy.rpm -} - -type RampUpRPMStrategy struct { - steps []Step -} - -func NewRampUpRPMStrategy(steps []Step) RPMStrategy { - isSorted := sort.SliceIsSorted(steps, func(i, j int) bool { - return steps[i].Minute < steps[j].Minute - }) - - if !isSorted { - panic("config isn't sorted") - } - - return &RampUpRPMStrategy{ - steps: steps, - } -} - -type Step struct { - Minute int32 - RPM int32 -} - -func (strategy *RampUpRPMStrategy) GetRPMForMinute(minute int32) int32 { - // In case there is a direct match we just return the RPM count for it - for _, step := range strategy.steps { - if step.Minute == minute { - return step.RPM - } - } - - // If the `minute` is before our first step we just return the starting RPM - firstStep := strategy.steps[0] - if firstStep.Minute > minute { - return firstStep.RPM - } - - // Otherwise we look for the steps which encloses the current `minute` - for index := 0; index < len(strategy.steps)-1; index++ { - nextIndex := index + 1 - step := strategy.steps[index] - nextStep := strategy.steps[nextIndex] - - // Check if the `step` is before the `minute` and if the `nextStep` is after the `minute` - if step.Minute < minute && minute < nextStep.Minute { - // We need to calculate the correct interpolation between the two steps and our wanted minute. - // - // As an example: - // Step Minute 1 has 2 RPM - // We are currently at Minute 2 - // Step for Minute 3 has 8 RPM - // - // In this case we need to take the difference between our two steps, - // and then interpolate it in relation to the `minute`. - // We then calculate how much RPM we need to add on top of our previous `step`. - // - // The example above would result in 5 RPM. (2 + 6 * 1 / 2) - // - // This also works for the reverse in case this is used with a ramp down steps. - rpmDiffBetweenPoints := float64(nextStep.RPM) - float64(step.RPM) - timeDiffBetweenPoints := float64(nextStep.Minute) - float64(step.Minute) - timeDiffToPreviousPoint := float64(minute) - float64(step.Minute) - - return int32( - float64(step.RPM) + math.Floor(rpmDiffBetweenPoints*timeDiffToPreviousPoint/timeDiffBetweenPoints), - ) - } - } - - // If we don't find steps that encloses the current `minute`, - // it means that the `minute` is larger than our last step - // so we just return the RPM for it. - lastPoint := strategy.steps[len(strategy.steps)-1] - return lastPoint.RPM -} diff --git a/rpm_strategies_test.go b/rpm_strategies_test.go deleted file mode 100644 index 8166274..0000000 --- a/rpm_strategies_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package goload - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestStaticRPMStrategy(t *testing.T) { - t.Run("GetRPMForMinute should return the static RPM for any minute", func(t *testing.T) { - strategy := NewStaticRPMStrategy(50) - - assert.Equal( - t, - int32(50), - strategy.GetRPMForMinute(0), - ) - assert.Equal( - t, - int32(50), - strategy.GetRPMForMinute(1), - ) - assert.Equal( - t, - int32(50), - strategy.GetRPMForMinute(5), - ) - }) -} - -func TestRampUpRPMStrategy(t *testing.T) { - t.Run("should panic if the points are not sorted properly", func(t *testing.T) { - assert.PanicsWithValue( - t, - "config isn't sorted", - func() { - NewRampUpRPMStrategy( - []Step{ - {Minute: 12}, - {Minute: 5}, - }, - ) - }, - ) - }) -} diff --git a/runner.go b/runner.go new file mode 100644 index 0000000..ca5bc6e --- /dev/null +++ b/runner.go @@ -0,0 +1,180 @@ +package goload + +import ( + "context" + "github.com/HenriBeck/goload/pacer" + "github.com/mroth/weightedrand/v2" + "github.com/rs/zerolog/log" + "sync" + "time" +) + +type Runner struct { + stopch chan struct{} + stopOnce sync.Once + workers int + maxWorkers int + weightOverrides map[string]int + + ctxModifier func(ctx context.Context) context.Context + defaultTimeout time.Duration + + startedAt *time.Time +} + +func NewRunner(loadTestOptions LoadTestOptions) *Runner { + a := &Runner{ + stopch: make(chan struct{}), + stopOnce: sync.Once{}, + workers: loadTestOptions.initialWorkers, + maxWorkers: loadTestOptions.maxWorkers, + weightOverrides: loadTestOptions.weightOverrides, + ctxModifier: loadTestOptions.ctxModifier, + defaultTimeout: loadTestOptions.defaultTimeout, + startedAt: nil, + } + + return a +} + +func (r *Runner) Run(ctx context.Context, exs []Executor, p pacer.Pacer, du time.Duration) <-chan *Result { + var wg sync.WaitGroup + + workers := r.workers + if workers > r.maxWorkers { + workers = r.maxWorkers + } + + chooser := r.getExecutorChooser(exs) + + now := time.Now() + r.startedAt = &now + + results := make(chan *Result) + ticks := make(chan struct{}) + for i := 0; i < workers; i++ { + wg.Add(1) + go r.run(chooser, now, &wg, ticks, results) + } + + go func() { + <-ctx.Done() + r.Stop() + }() + + go func() { + defer func() { + close(ticks) + wg.Wait() + close(results) + r.Stop() + }() + + count := uint64(0) + for { + elapsed := time.Since(now) + if du > 0 && elapsed > du { + return + } + + wait := p.Pace(elapsed, count) + + time.Sleep(wait) + + if workers < r.maxWorkers { + select { + case ticks <- struct{}{}: + count++ + continue + case <-r.stopch: + return + default: + // all workers are blocked. start one more and try again + workers++ + wg.Add(1) + go r.run(chooser, now, &wg, ticks, results) + } + } + + select { + case ticks <- struct{}{}: + count++ + case <-r.stopch: + return + } + } + }() + + return results +} + +func (r *Runner) getExecutorChooser(exs []Executor) *weightedrand.Chooser[Executor, int] { + choices := make([]weightedrand.Choice[Executor, int], 0, len(exs)) + for _, ex := range exs { + weight := ex.Options().Weight + if override, ok := r.weightOverrides[ex.Name()]; ok { + weight = override + } + choices = append(choices, weightedrand.NewChoice(ex, weight)) + } + chooser, err := weightedrand.NewChooser(choices...) + if err != nil { + log.Fatal().Err(err).Msg("can't create chooser") + } + return chooser +} + +// Stop stops the current execution. The return value indicates whether this call +// has signalled the execution to stop (`true` for the first call) or whether it +// was a noop because it has been previously signalled to stop (`false` for any +// subsequent calls). +func (r *Runner) Stop() bool { + select { + case <-r.stopch: + return false + default: + r.stopOnce.Do(func() { close(r.stopch) }) + return true + } +} + +func (r *Runner) run(chooser *weightedrand.Chooser[Executor, int], began time.Time, workers *sync.WaitGroup, ticks <-chan struct{}, results chan<- *Result) { + defer workers.Done() + + for range ticks { + results <- r.hit(chooser.Pick(), began) + } +} + +func (r *Runner) hit(ex Executor, began time.Time) *Result { + res := Result{ + Timestamp: began.Add(time.Since(began)), + } + + defer func() { + res.Latency = time.Since(res.Timestamp) + }() + + ctx := context.Background() + if ex.Options().Timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), ex.Options().Timeout) + defer cancel() + } else if r.defaultTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), ex.Options().Timeout) + defer cancel() + } + + if r.ctxModifier != nil { + ctx = r.ctxModifier(ctx) + } + + resp := ex.Execute(ctx) + + res.Identifier = resp.Identifier + res.AdditionalData = resp.AdditionalData + res.Err = resp.Err + + return &res +} diff --git a/ui.go b/ui.go deleted file mode 100644 index 0220d5f..0000000 --- a/ui.go +++ /dev/null @@ -1,182 +0,0 @@ -package goload - -import ( - "fmt" - "io" - "math" - "strings" - - "github.com/charmbracelet/lipgloss" -) - -type UI struct { - output io.Writer -} - -func NewUI(output io.Writer) *UI { - return &UI{ - output: output, - } -} - -func (ui *UI) PrintStartMessage() { - style := lipgloss.NewStyle().Foreground(lipgloss.Color("#48d597")) - - fmt.Fprintf( - ui.output, - "Starting a %s Test\n", - style.Render("GoLoad"), - ) - fmt.Fprintln(ui.output) -} - -func (ui *UI) PrintAbortMessage() { - fmt.Fprintln(ui.output) - fmt.Fprintln(ui.output, "Interrupt received.") - fmt.Fprintln(ui.output, "Waiting for existing requests to finish...") - fmt.Fprintln(ui.output) -} - -func (ui *UI) ReportInitialRPM(rpm int32) { - fmt.Fprintf(ui.output, "Starting load test with initial %d RPM\n", rpm) -} - -func (ui *UI) ReportIncreaseInRPM(rpm int32) { - fmt.Fprintf(ui.output, "Increasing RPM to %d\n", rpm) -} - -func (ui *UI) ReportDecreaseInRPM(rpm int32) { - fmt.Fprintf(ui.output, "Decreasing RPM to %d\n", rpm) -} - -func (ui *UI) ReportResults(results *LoadTestResults) { - fmt.Fprintln(ui.output) - - rows := generateResultRows(results) - - addTotalRequestsColumn(rows, results) - addFailedRequestsColumn(rows, results) - addAverageResponseTimeColumn(rows, results) - - for _, row := range rows { - fmt.Fprintln( - ui.output, - strings.Join(row, " "), - ) - } -} - -func generateResultRows(results *LoadTestResults) [][]string { - maxLen := 0 - for _, endpoint := range results.Iter() { - if maxLen < len(endpoint.Name) { - maxLen = len(endpoint.Name) - } - } - - columns := [][]string{} - for _, endpoint := range results.Iter() { - columns = append( - columns, - []string{ - fmt.Sprintf( - "%s%s", - endpoint.Name, - lipgloss. - NewStyle(). - Foreground(lipgloss.Color("#8C8FA3")). - Render( - fmt.Sprintf( - "%s:", - strings.Repeat(".", maxLen-len(endpoint.Name)+3), - ), - ), - ), - }, - ) - } - - return columns -} - -func addTotalRequestsColumn(columns [][]string, results *LoadTestResults) { - items := []string{} - for _, endpoint := range results.Iter() { - items = append( - items, - fmt.Sprintf("total=%d", endpoint.GetTotalRequests()), - ) - } - - maxLen := 0 - for _, item := range items { - if maxLen < len(item) { - maxLen = len(item) - } - } - - for index, item := range items { - columns[index] = append( - columns[index], - fmt.Sprintf("%s%s", item, strings.Repeat(" ", maxLen-len(item))), - ) - } -} - -func addFailedRequestsColumn(columns [][]string, results *LoadTestResults) { - items := []string{} - for _, endpoint := range results.Iter() { - items = append( - items, - fmt.Sprintf("failed=%d", endpoint.GetTotalFailedRequests()), - ) - } - - maxLen := 0 - for _, item := range items { - if maxLen < len(item) { - maxLen = len(item) - } - } - - for index, item := range items { - columns[index] = append( - columns[index], - fmt.Sprintf("%s%s", item, strings.Repeat(" ", maxLen-len(item))), - ) - } -} - -func addAverageResponseTimeColumn(columns [][]string, results *LoadTestResults) { - items := []string{} - for _, endpoint := range results.Iter() { - averageDuration := endpoint.GetAverageDuration() - if math.IsNaN(averageDuration) { - items = append(items, "") - continue - } - - items = append( - items, - fmt.Sprintf("avg=%.2fms", endpoint.GetAverageDuration()), - ) - } - - maxLen := 0 - for _, item := range items { - if maxLen < len(item) { - maxLen = len(item) - } - } - - if maxLen == 0 { - return - } - - for index, item := range items { - columns[index] = append( - columns[index], - fmt.Sprintf("%s%s", item, strings.Repeat(" ", maxLen-len(item))), - ) - } -} diff --git a/utils/ctx/ctx.go b/utils/ctx/ctx.go new file mode 100644 index 0000000..5eef77c --- /dev/null +++ b/utils/ctx/ctx.go @@ -0,0 +1,29 @@ +package ctx_utils + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" +) + +func ContextWithInterrupt(ctx context.Context) context.Context { + newCtx, cancel := context.WithCancel(ctx) + + go func() { + sigint := make(chan os.Signal, 1) + + signal.Notify(sigint, os.Interrupt) + signal.Notify(sigint, syscall.SIGTERM) + signal.Notify(sigint, syscall.SIGHUP) + + <-sigint + + fmt.Println("received interrupt signal: canceling context") + + cancel() + }() + + return newCtx +} diff --git a/utils/random/random.go b/utils/random/random.go new file mode 100644 index 0000000..d59ba5c --- /dev/null +++ b/utils/random/random.go @@ -0,0 +1,7 @@ +package random + +import "math/rand" + +func Number(min int64, max int64) int64 { + return rand.Int63n(max+1-min) + min +} diff --git a/utils/random/sampler.go b/utils/random/sampler.go new file mode 100644 index 0000000..17bf45a --- /dev/null +++ b/utils/random/sampler.go @@ -0,0 +1,46 @@ +package random + +import ( + lfring "github.com/LENSHOOD/go-lock-free-ring-buffer" + "golang.org/x/sync/singleflight" + "math/rand" +) + +type Sampler[T any] struct { + Values []T + rb lfring.RingBuffer[T] + sfg *singleflight.Group +} + +func NewSampler[T any](values []T) *Sampler[T] { + return &Sampler[T]{ + Values: values, + rb: lfring.New[T](lfring.NodeBased, uint64(len(values)*2)), + sfg: &singleflight.Group{}, + } +} + +// Get returns n 'random' items +// because the values gets preshuffled in a ring buffer there is no guaranty that there are no duplicate items +func (s *Sampler[T]) Get(n int) []T { + values := make([]T, 0, n) + for len(values) < n { + val, ok := s.rb.Poll() + if !ok { + _, _, _ = s.sfg.Do("fillBuffer", func() (interface{}, error) { + s.fillBuffer() + return nil, nil + }) + continue + } + values = append(values, val) + } + return values +} + +func (s *Sampler[T]) fillBuffer() { + rand.Shuffle(len(s.Values), func(i, j int) { s.Values[i], s.Values[j] = s.Values[j], s.Values[i] }) + for _, value := range s.Values { + s.rb.Offer(value) + } +}