Skip to content

Commit

Permalink
config: add support to file:// and http(s):// URIs
Browse files Browse the repository at this point in the history
Extends the current plugin config to use instead a URI. In the case
of `file://` the behavior is the same as it is currently.
In the case of `http(s)://` it will fetch the URI and try to
evaluate it as a wasm payload.

This PR is based on earlier work on `dapr/component-contrib`.
See: dapr/components-contrib#3005

Signed-off-by: Edoardo Vacchi <[email protected]>
  • Loading branch information
evacchi committed Jul 28, 2023
1 parent ce8959d commit 2799a90
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 101 deletions.
2 changes: 1 addition & 1 deletion internal/e2e/profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func main() {
)

// Pass the profiling context to the plugin.
plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: guestPath})
plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: "file://" + guestPath})
if err != nil {
log.Panicln("failed to create plugin:", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/e2e/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
func TestCycleStateCoherence(t *testing.T) {
ctx := context.Background()

plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: test.PathTestCycleState})
plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: test.URLTestCycleState})
if err != nil {
t.Fatalf("failed to create plugin: %v", err)
}
Expand Down Expand Up @@ -116,7 +116,7 @@ func BenchmarkExample_NodeNumber(b *testing.B) {

func newNodeNumberPlugin(ctx context.Context, t e2e.Testing, reverse bool) framework.Plugin {
plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{
GuestPath: test.PathExampleNodeNumber,
GuestURL: test.URLExampleNodeNumber,
GuestConfig: fmt.Sprintf(`{"reverse": %v}`, reverse),
})
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions scheduler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ replace (

require (
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.8.1
github.com/tetratelabs/wazero v1.3.1
k8s.io/api v0.27.3
k8s.io/apimachinery v0.27.3
Expand Down Expand Up @@ -90,6 +91,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/selinux v1.10.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions scheduler/plugin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package wasm

type PluginConfig struct {
// GuestPath is the path to the guest wasm.
GuestPath string `json:"guestPath"`
// GuestURL is the URL to the guest wasm.
// Valid schemes are file:// for a local file or http[s]:// for one
// retrieved via HTTP.
GuestURL string `json:"guestURL"`

// GuestConfig is any configuration to give to the guest.
GuestConfig string `json:"guestConfig"`
Expand Down
49 changes: 49 additions & 0 deletions scheduler/plugin/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package wasm

import (
"context"
"fmt"
"io"
"net/http"
"net/url"
)

// httpClient decorates an http.Client with convenience methods.
type httpClient struct {
c http.Client
}

// newHTTPFetcher is a constructor for httpFetcher.
//
// It is possible to plug a custom http.RoundTripper to handle other concerns (e.g. retries)
// Compression is handled transparently and automatically by http.Client.
func newHTTPCLient(transport http.RoundTripper) *httpClient {
return &httpClient{
c: http.Client{Transport: transport},
}
}

// fetch returns a byte slice of the wasm module found at the given URL, or an error otherwise.
func (f *httpClient) get(ctx context.Context, u *url.URL) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}
resp, err := f.c.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
io.Copy(io.Discard, resp.Body) //nolint
resp.Body.Close()
return nil, fmt.Errorf("received %v status code from %q", resp.StatusCode, u)
}

bytes, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
}
return bytes, nil
}
73 changes: 73 additions & 0 deletions scheduler/plugin/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package wasm

import (
"compress/gzip"
"context"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/stretchr/testify/require"
)

var wasmMagicNumber = []byte{0x00, 0x61, 0x73, 0x6d}

func TestWasmHTTPFetch(t *testing.T) {
wasmBinary := wasmMagicNumber
wasmBinary = append(wasmBinary, 0x00, 0x00, 0x00, 0x00)
cases := []struct {
name string
handler http.HandlerFunc
expectedError string
}{
{
name: "plain wasm binary",
handler: func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write(wasmBinary)
},
},
// Compressed payloads are handled automatically by http.Client.
{
name: "compressed payload",
handler: func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Encoding", "gzip")

gw := gzip.NewWriter(w)
defer gw.Close()
_, _ = gw.Write(wasmBinary)
},
},
{
name: "http error",
handler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
},
expectedError: "received 500 status code",
},
}

for _, proto := range []string{"http", "https"} {
t.Run(proto, func(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
ts := httptest.NewServer(tc.handler)
defer ts.Close()
c := newHTTPCLient(http.DefaultTransport)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
parse, err := url.Parse(ts.URL)
require.NoError(t, err)
_, err = c.get(ctx, parse)
if tc.expectedError != "" {
require.ErrorContains(t, err, tc.expectedError)
return
}
require.NoError(t, err, "Wasm download got an unexpected error: %v", err)
})
}
})
}
}
22 changes: 20 additions & 2 deletions scheduler/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package wasm
import (
"context"
"fmt"
"net/http"
"net/url"
"os"
"sync/atomic"
"time"
Expand Down Expand Up @@ -51,9 +53,9 @@ func New(configuration runtime.Object, frameworkHandle framework.Handle) (framew
// NewFromConfig is like New, except it allows us to explicitly provide the
// context and configuration of the plugin. This allows flexibility in tests.
func NewFromConfig(ctx context.Context, config PluginConfig) (framework.Plugin, error) {
guestBin, err := os.ReadFile(config.GuestPath)
guestBin, err := readFromURI(ctx, config.GuestURL)
if err != nil {
return nil, fmt.Errorf("wasm: error reading guest binary at %s: %w", config.GuestPath, err)
return nil, fmt.Errorf("wasm: error reading guest binary at %s: %w", config.GuestURL, err)
}

runtime, guestModule, err := prepareRuntime(ctx, guestBin, config.GuestConfig)
Expand All @@ -77,6 +79,22 @@ func NewFromConfig(ctx context.Context, config PluginConfig) (framework.Plugin,
}
}

func readFromURI(ctx context.Context, u string) ([]byte, error) {
uri, err := url.ParseRequestURI(u)
if err != nil {
return nil, err
}
switch uri.Scheme {
case "file":
return os.ReadFile(uri.Path)
case "http", "https":
c := newHTTPCLient(http.DefaultTransport)
return c.get(ctx, uri)
default:
return nil, fmt.Errorf("unsupported URL scheme: %s", uri.Scheme)
}
}

// newWasmPlugin is extracted to prevent small bugs: The caller must close the
// wazero.Runtime to avoid leaking mmapped files.
func newWasmPlugin(ctx context.Context, runtime wazero.Runtime, guestModule wazero.CompiledModule, config PluginConfig) (*wasmPlugin, error) {
Expand Down
Loading

0 comments on commit 2799a90

Please sign in to comment.