Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ReadableStream support to k6 #3696

Merged
merged 22 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d7fa621
Add base implementation of ReadableStream (Streams API)
joanlopez Apr 19, 2024
b5e0124
Base tooling for Web Platform Tests
joanlopez Apr 19, 2024
bed700b
Web Platform Tests for ReadableStream
joanlopez Apr 19, 2024
5461ef1
Expose k6/experimental/streams
joanlopez Apr 19, 2024
d546366
Add k6/experimental/streams example
joanlopez Apr 19, 2024
347a26d
Chore: Fix linter errors
joanlopez Apr 19, 2024
b881b97
Resolve import cycle
joanlopez Apr 19, 2024
b73c40f
Remove handmade WPTs
joanlopez Apr 24, 2024
3ecab30
Remove test specifics from shared modulestest
joanlopez Apr 24, 2024
7401502
Adjust ReadableStream WPTs to use checked out code
joanlopez Apr 24, 2024
c691261
Add GH Workflow to run WPTs for Streams
joanlopez Apr 24, 2024
60e087d
Merge remote-tracking branch 'upstream/master' into readable-stream
joanlopez Apr 24, 2024
1e06c44
Apply Pull Request suggestions
oleiade Apr 25, 2024
cf4f439
Apply Pull Request suggestions
oleiade Apr 25, 2024
31a2554
Apply code review suggestions
joanlopez Apr 25, 2024
08824fa
Merge branch 'readable-stream' of github.com:grafana/k6 into readable…
joanlopez Apr 25, 2024
2a90895
Fix Web Platform Tests execution for streams
joanlopez Apr 26, 2024
c301edc
Apply code review suggestions
joanlopez Apr 26, 2024
386dd73
Merge remote-tracking branch 'upstream/master' into readable-stream
joanlopez Apr 26, 2024
c028e12
Fix linting issue
joanlopez Apr 26, 2024
a35dad0
Apply pull request review suggestions
oleiade Apr 29, 2024
89302f2
Merge branch 'master' into readable-stream
oleiade Apr 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .github/workflows/wpt.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: Web Platform Tests
on:
workflow_dispatch:
pull_request:

defaults:
run:
shell: bash

jobs:
streams:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: 1.22.x
check-latest: true
- name: Run tests
run: |
set -x
cd js/modules/k6/experimental/streams/tests
sh checkout.sh
go test ../... -tags=wpt
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/dist
/pkg-build
/js/tc39/TestTC39
/js/modules/k6/experimental/streams/tests/wpt

.vscode
*.sublime-workspace
Expand Down
34 changes: 34 additions & 0 deletions examples/experimental/streams.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { ReadableStream } from 'k6/experimental/streams'
import { setTimeout } from 'k6/timers'

function numbersStream() {
let currentNumber = 0

return new ReadableStream({
start(controller) {
const fn = () => {
if (currentNumber < 5) {
controller.enqueue(++currentNumber)
setTimeout(fn, 1000)
return;
}

controller.close()
}
setTimeout(fn, 1000)
},
})
}

export default async function () {
const stream = numbersStream()
const reader = stream.getReader()

while (true) {
const { done, value } = await reader.read()
if (done) break
console.log(`received number ${value} from stream`)
}

console.log('we are done')
mstoykov marked this conversation as resolved.
Show resolved Hide resolved
}
2 changes: 2 additions & 0 deletions js/jsmodules.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.k6.io/k6/js/modules/k6/encoding"
"go.k6.io/k6/js/modules/k6/execution"
"go.k6.io/k6/js/modules/k6/experimental/fs"
"go.k6.io/k6/js/modules/k6/experimental/streams"
"go.k6.io/k6/js/modules/k6/experimental/tracing"
"go.k6.io/k6/js/modules/k6/grpc"
"go.k6.io/k6/js/modules/k6/html"
Expand All @@ -38,6 +39,7 @@ func getInternalJSModules() map[string]interface{} {
"k6/timers": timers.New(),
"k6/execution": execution.New(),
"k6/experimental/redis": redis.New(),
"k6/experimental/streams": streams.New(),
"k6/experimental/webcrypto": webcrypto.New(),
"k6/experimental/websockets": &expws.RootModule{},
"k6/experimental/timers": newWarnExperimentalModule(timers.New(),
Expand Down
119 changes: 119 additions & 0 deletions js/modules/k6/experimental/streams/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package streams

import "github.com/dop251/goja"

func newTypeError(rt *goja.Runtime, message string) *jsError {
return newJsError(rt, rt.Get("TypeError"), TypeError, message)
}

func newRangeError(rt *goja.Runtime, message string) *jsError {
return newJsError(rt, rt.Get("RangeError"), RangeError, message)
}

func newJsError(rt *goja.Runtime, base goja.Value, kind errorKind, message string) *jsError {
constructor, ok := goja.AssertConstructor(base)
if !ok {
throw(rt, newError(kind, message))
}

e, err := constructor(nil, rt.ToValue(message))
if err != nil {
throw(rt, newError(kind, message))
}

return &jsError{err: e, msg: message}
}

// jsError is a wrapper around a JS error object.
//
// We need to use it because whenever we need to return a [TypeError]
// or a [RangeError], we want to use original JS errors, which can be
// retrieved from Goja, for instance with: goja.Runtime.Get("TypeError").
//
// However, that is implemented as a [*goja.Object], but sometimes we
// need to return that error as a Go [error], or even keep the instance
// in memory to be returned/thrown later.
//
// So, we use this wrapper instead of returning the original JS error.
// Otherwise, we would need to replace everything typed as [error] with
// [any] to be compatible, and that would be a mess.
type jsError struct {
err *goja.Object
msg string
}

func (e *jsError) Error() string {
return e.msg
}

func (e *jsError) Err() *goja.Object {
return e.err
}

func newError(k errorKind, message string) *streamError {
return &streamError{
Name: k.String(),
Message: message,
kind: k,
}
}

//go:generate enumer -type=errorKind -output errors_gen.go
type errorKind uint8

const (
// TypeError is thrown when an argument is not of an expected type
TypeError errorKind = iota + 1

// RangeError is thrown when an argument is not within the expected range
RangeError

// RuntimeError is thrown when an error occurs that was caused by the JS runtime
// and is not likely caused by the user, but rather the implementation.
RuntimeError

// AssertionError is thrown when an assertion fails
AssertionError

// NotSupportedError is thrown when a feature is not supported, or not yet implemented
NotSupportedError
)

type streamError struct {
// Name contains the name of the error
Name string `json:"name"`

// Message contains the error message
Message string `json:"message"`

// kind contains the kind of error
kind errorKind
}

// Ensure that the fsError type implements the Go `error` interface
var _ error = (*streamError)(nil)

func (e *streamError) Error() string {
return e.Name + ":" + e.Message
}

func throw(rt *goja.Runtime, err any) {
if e, ok := err.(*jsError); ok {
panic(e.Err())
}

panic(errToObj(rt, err))
}

func errToObj(rt *goja.Runtime, err any) goja.Value {
// Undefined remains undefined.
if goja.IsUndefined(rt.ToValue(err)) {
return rt.ToValue(err)
}

if e, ok := err.(*goja.Exception); ok {
return e.Value().ToObject(rt)
}

return rt.ToValue(err).ToObject(rt)
}
53 changes: 53 additions & 0 deletions js/modules/k6/experimental/streams/errors_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 107 additions & 0 deletions js/modules/k6/experimental/streams/goja.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package streams

import (
"fmt"
"reflect"

"github.com/dop251/goja"
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
)

// newResolvedPromise instantiates a new resolved promise.
func newResolvedPromise(vu modules.VU, with goja.Value) *goja.Promise {
promise, resolve, _ := vu.Runtime().NewPromise()
resolve(with)
return promise
}

// newRejectedPromise instantiates a new rejected promise.
func newRejectedPromise(vu modules.VU, with any) *goja.Promise {
promise, _, reject := vu.Runtime().NewPromise()
reject(with)
return promise
}

// promiseThen facilitates instantiating a new promise and defining callbacks for to be executed
// on fulfillment as well as rejection, directly from Go.
func promiseThen(
rt *goja.Runtime,
promise *goja.Promise,
onFulfilled, onRejected func(goja.Value),
) (*goja.Promise, error) {
val, err := rt.RunString(
`(function(promise, onFulfilled, onRejected) { return promise.then(onFulfilled, onRejected) })`)
if err != nil {
return nil, newError(RuntimeError, "unable to initialize promiseThen internal helper function")
}

cal, ok := goja.AssertFunction(val)
if !ok {
return nil, newError(RuntimeError, "the internal promiseThen helper is not a function")
}

if onRejected == nil {
val, err = cal(goja.Undefined(), rt.ToValue(promise), rt.ToValue(onFulfilled))
} else {
val, err = cal(goja.Undefined(), rt.ToValue(promise), rt.ToValue(onFulfilled), rt.ToValue(onRejected))
}

if err != nil {
return nil, err
}

newPromise, ok := val.Export().(*goja.Promise)
if !ok {
return nil, newError(RuntimeError, "unable to cast the internal promiseThen helper's return value to a promise")
}

return newPromise, nil
}

// isNumber returns true if the given goja value holds a number
func isNumber(value goja.Value) bool {
_, isFloat := value.Export().(float64)
_, isInt := value.Export().(int64)

return isFloat || isInt
}

// isNonNegativeNumber implements the [IsNonNegativeNumber] algorithm.
//
// [IsNonNegativeNumber]: https://streams.spec.whatwg.org/#is-non-negative-number
func isNonNegativeNumber(value goja.Value) bool {
if common.IsNullish(value) {
return false
}

if !isNumber(value) {
return false
}

if value.ToFloat() < 0 || value.ToInteger() < 0 {
return false
}

return true
}

// setReadOnlyPropertyOf sets a read-only property on the given [goja.Object].
func setReadOnlyPropertyOf(obj *goja.Object, objName, propName string, propValue goja.Value) error {
err := obj.DefineDataProperty(propName,
propValue,
goja.FLAG_FALSE,
goja.FLAG_FALSE,
goja.FLAG_TRUE,
)
if err != nil {
return fmt.Errorf("unable to define %s read-only property on %s object; reason: %w", propName, objName, err)
}

return nil
}

// isObject determines whether the given [goja.Value] is a [goja.Object] or not.
func isObject(val goja.Value) bool {
return val != nil && val.ExportType() != nil && val.ExportType().Kind() == reflect.Map
}
Loading
Loading