From b07c045dab6a22f321aa24946f647dd9a41bf122 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Sun, 21 Jan 2018 21:11:44 +0100 Subject: [PATCH 1/2] Add support for headers and query params to workflow invocations Additionally: - Extracted typedvalue<->fission request logic - Refactored header input values from header_* to headers syntax --- pkg/fnenv/fission/envproxy.go | 31 ++-- pkg/fnenv/fission/httputil.go | 44 ----- pkg/fnenv/fission/request.go | 278 ++++++++++++++++++++++++++++++ pkg/fnenv/fission/request_test.go | 154 +++++++++++++++++ pkg/fnenv/fission/resolver.go | 1 + pkg/fnenv/fission/runtime.go | 172 ++---------------- pkg/types/types.go | 3 + 7 files changed, 468 insertions(+), 215 deletions(-) delete mode 100644 pkg/fnenv/fission/httputil.go create mode 100644 pkg/fnenv/fission/request.go create mode 100644 pkg/fnenv/fission/request_test.go diff --git a/pkg/fnenv/fission/envproxy.go b/pkg/fnenv/fission/envproxy.go index a9bba6b7..c067ef82 100644 --- a/pkg/fnenv/fission/envproxy.go +++ b/pkg/fnenv/fission/envproxy.go @@ -25,7 +25,8 @@ import ( "strings" ) -// Proxy between Fission and Workflow to ensure that workflowInvocations comply with Fission function interface +// Proxy between Fission and Workflow to ensure that workflowInvocations comply with Fission function interface. This +// ensures that workflows can be executed exactly like Fission functions are executed. type Proxy struct { invocationServer apiserver.WorkflowInvocationAPIServer workflowServer apiserver.WorkflowAPIServer @@ -77,49 +78,49 @@ func (fp *Proxy) handleRequest(w http.ResponseWriter, r *http.Request) { fp.fissionIds[fnId] = true } - // Map Inputs to function parameters + // Map request to workflow inputs inputs := map[string]*types.TypedValue{} - err := ParseRequest(r, inputs) + err := parseRequest(r, inputs) if err != nil { logrus.Errorf("Failed to parse inputs: %v", err) http.Error(w, "Failed to parse inputs", 400) return } + wfSpec := &types.WorkflowInvocationSpec{ + WorkflowId: fnId, + Inputs: inputs, + } + // Temporary: in case of query header 'X-Async' being present, make request async if len(r.Header.Get("X-Async")) > 0 { - invocatinId, err := fp.invocationServer.Invoke(ctx, &types.WorkflowInvocationSpec{ - WorkflowId: fnId, - Inputs: inputs, - }) + invocationId, err := fp.invocationServer.Invoke(ctx, wfSpec) if err != nil { logrus.Errorf("Failed to invoke: %v", err) http.Error(w, err.Error(), 500) return } w.WriteHeader(200) - w.Write([]byte(invocatinId.Id)) + w.Write([]byte(invocationId.Id)) return } // Otherwise, the request synchronous like other Fission functions - invocation, err := fp.invocationServer.InvokeSync(ctx, &types.WorkflowInvocationSpec{ - WorkflowId: fnId, - Inputs: inputs, - }) + invocation, err := fp.invocationServer.InvokeSync(ctx, wfSpec) if err != nil { logrus.Errorf("Failed to invoke: %v", err) - http.Error(w, err.Error(), 500) + http.Error(w, err.Error(), http.StatusInternalServerError) return } + // In case of an error, create an error response corresponding to Fission function errors if !invocation.Status.Status.Successful() { logrus.Errorf("Invocation not successful, was '%v'", invocation.Status.Status.String()) http.Error(w, invocation.Status.Status.String(), 500) return } - // TODO determine header based on the output value + // Otherwise, create a response corresponding to Fission function responses. var resp []byte if invocation.Status.Output != nil { resp = invocation.Status.Output.Value @@ -127,7 +128,7 @@ func (fp *Proxy) handleRequest(w http.ResponseWriter, r *http.Request) { } else { logrus.Infof("Invocation '%v' has no output.", fnId) } - w.WriteHeader(200) + w.WriteHeader(http.StatusOK) w.Write(resp) } diff --git a/pkg/fnenv/fission/httputil.go b/pkg/fnenv/fission/httputil.go deleted file mode 100644 index 7682cd36..00000000 --- a/pkg/fnenv/fission/httputil.go +++ /dev/null @@ -1,44 +0,0 @@ -package fission - -import ( - "net/http" - - "errors" - "io/ioutil" - - "encoding/json" - - "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/typedvalues" - "github.com/sirupsen/logrus" -) - -func ParseRequest(r *http.Request, target map[string]*types.TypedValue) error { - contentType := r.Header.Get("Content-Type") - logrus.WithField("url", r.URL).WithField("content-type", contentType).Info("Request content-type") - // Map Inputs to function parameters - body, err := ioutil.ReadAll(r.Body) - defer r.Body.Close() - if err != nil { - panic(err) - } - - var i interface{} = body - if len(body) > 0 { - err = json.Unmarshal(body, &i) - if err != nil { - logrus.WithField("body", len(body)).Infof("Input is not json: %v", err) - i = body - } - } - - parsedInput, err := typedvalues.Parse(i) - if err != nil { - logrus.Errorf("Failed to parse body: %v", err) - return errors.New("failed to parse body") - } - - logrus.WithField(types.INPUT_MAIN, parsedInput).Info("Parsed body") - target[types.INPUT_MAIN] = parsedInput - return nil -} diff --git a/pkg/fnenv/fission/request.go b/pkg/fnenv/fission/request.go new file mode 100644 index 00000000..31c7a0fa --- /dev/null +++ b/pkg/fnenv/fission/request.go @@ -0,0 +1,278 @@ +package fission + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "reflect" + "strings" + + "github.com/fission/fission-workflows/pkg/types" + "github.com/fission/fission-workflows/pkg/types/typedvalues" + "github.com/sirupsen/logrus" +) + +const ( + InputBody = "body" // or 'default' + InputHttpMethod = "method" + InputContentType = "content_type" // to force the content type + + defaultContentType = "text/plain" + headerContentType = "Content-Type" +) + +// Format maps values of the source map to the (Fission) request. +func formatRequest(r *http.Request, source map[string]*types.TypedValue) error { + // Map headers inputs to request + formatHeaders(r, source) // TODO move error handling here + + // Map HTTP method inputs to request, or use default + formatHttpMethod(r, source) + + // Map query inputs to request + formatQuery(r.URL, source) // TODO move error handling here + + // Set the Content-Type + r.Header.Set(headerContentType, formatContentType(source, defaultContentType)) + + // Map body inputs to request + r.Body = formatBody(source) + + return nil +} + +// Parse maps a (Fission) request to a target map. +func parseRequest(r *http.Request, target map[string]*types.TypedValue) error { + // Content-Type is a common problem, so log this for every request + contentType := r.Header.Get(headerContentType) + logrus.WithField("url", r.URL).WithField(headerContentType, contentType).Info("Request Content-Type") + + // Map body to "main" input + bodyInput, err := parseBody(r.Body, contentType) + defer r.Body.Close() + if err != nil { + return fmt.Errorf("failed to parse request: %v", err) + } + target[types.INPUT_MAIN] = bodyInput + + // Map query to "query.x" + err = parseQuery(r, target) + if err != nil { + return fmt.Errorf("failed to parse request: %v", err) + } + + // Map headers to "headers.x" + err = parseHeaders(r, target) + if err != nil { + return fmt.Errorf("failed to parse request: %v", err) + } + + // Map http method to "method" + err = parseMethod(r, target) + if err != nil { + return fmt.Errorf("failed to parse request: %v", err) + } + + return nil +} + +// parseBody maps the body from a request to the "main" key in the target map +func parseBody(b io.ReadCloser, contentType string) (*types.TypedValue, error) { + body, err := ioutil.ReadAll(b) + if err != nil { + return nil, errors.New("failed to read body") + } + + var i interface{} = body + // TODO fix this, remove the hardcoded JSON transform + if strings.Contains(contentType, "application/json") || strings.Contains(contentType, "text/json") { + err = json.Unmarshal(body, &i) + if err != nil { + logrus.WithField("body", len(body)).Infof("Input is not json: %v", err) + i = body + } + } + + parsedInput, err := typedvalues.Parse(i) + if err != nil { + logrus.Errorf("Failed to parse body: %v", err) + return parsedInput, errors.New("failed to parse body") + } + + return parsedInput, nil +} + +// parseHeaders maps the headers from a request to the "headers" key in the target map +func parseHeaders(r *http.Request, target map[string]*types.TypedValue) error { + // For now we do not support multi-valued headers + headers := flattenMultimap(r.Header) + + tv, err := typedvalues.Parse(headers) + if err != nil { + logrus.Errorf("Failed to parse headers: %v", err) + return fmt.Errorf("failed to parse headers: %v", err) + } + target[types.INPUT_HEADERS] = tv + return nil +} + +// parseQuery maps the query params from a request to the "query" key in the target map +func parseQuery(r *http.Request, target map[string]*types.TypedValue) error { + // For now we do not support multi-valued query params + query := flattenMultimap(r.URL.Query()) + + tv, err := typedvalues.Parse(query) + if err != nil { + logrus.Errorf("Failed to parse query: %v", err) + return fmt.Errorf("failed to parse query: %v", err) + } + target[types.INPUT_QUERY] = tv + return nil +} + +// parseMethod maps the method param from a request to the "method" key in the target map +func parseMethod(r *http.Request, target map[string]*types.TypedValue) error { + method, err := typedvalues.Parse(r.Method) + if err != nil { + logrus.Errorf("Failed to parse the http method: %v", err) + return errors.New("failed to parse http method") + } + target[types.INPUT_METHOD] = method + return nil +} + +func flattenMultimap(mm map[string][]string) map[string]interface{} { + target := map[string]interface{}{} + for k, v := range mm { + target[k] = v[0] + } + return target +} + +// formatting logic + +func formatHttpMethod(target *http.Request, inputs map[string]*types.TypedValue) { + _, tv := getFirstDefinedTypedValue(inputs, InputHttpMethod) + httpMethod := toString(tv) + if httpMethod != "" { + target.Method = httpMethod + } +} + +// TODO support multivalued query params at some point +func formatQuery(targetUrl *url.URL, inputs map[string]*types.TypedValue) { + queryInput := inputs[types.INPUT_QUERY] + if queryInput == nil { + return + } + + i, err := typedvalues.Format(queryInput) + if err != nil { + logrus.Errorf("Failed to format headers: %v", err) + } + + switch i.(type) { + case map[string]interface{}: + origQuery := targetUrl.Query() + for k, v := range i.(map[string]interface{}) { + origQuery.Add(k, fmt.Sprintf("%v", v)) + } + targetUrl.RawQuery = origQuery.Encode() + default: + logrus.Warnf("Ignoring invalid type of query input (expected map[string]interface{}, was %v)", + reflect.TypeOf(i)) + } +} + +// TODO support multi-headers at some point +func formatHeaders(target *http.Request, inputs map[string]*types.TypedValue) { + rawHeaders := inputs[types.INPUT_HEADERS] + if rawHeaders == nil { + return + } + + i, err := typedvalues.Format(rawHeaders) + if err != nil { + logrus.Errorf("Failed to format headers: %v", err) + } + + switch i.(type) { + case map[string]interface{}: + if target.Header == nil { + target.Header = http.Header{} + } + for k, v := range i.(map[string]interface{}) { + target.Header.Add(k, fmt.Sprintf("%v", v)) + } + default: + logrus.Warnf("Ignoring invalid type of headers input (expected map[string]interface{}, was %v)", + reflect.TypeOf(i)) + } +} + +func formatBody(inputs map[string]*types.TypedValue) io.ReadCloser { + var input []byte + _, mainInput := getFirstDefinedTypedValue(inputs, types.INPUT_MAIN, InputBody) + if mainInput != nil { + // TODO ensure that it is a byte-representation 1-1 of actual value not the representation in TypedValue + input = mainInput.Value + } + + return ioutil.NopCloser(bytes.NewReader(input)) +} + +func formatContentType(inputs map[string]*types.TypedValue, defaultContentType string) string { + // Check if content type is forced + _, tv := getFirstDefinedTypedValue(inputs, InputContentType) // TODO lookup in headers? + contentType := toString(tv) + if contentType != "" { + return contentType + } + return inferContentType(inputs[types.INPUT_MAIN], defaultContentType) +} + +func inferContentType(mainInput *types.TypedValue, defaultContentType string) string { + // Infer content type from main input (TODO Temporary solution) + if mainInput != nil && strings.HasPrefix(mainInput.Type, "json") { + return "application/json" + } + + // Use default content type + return defaultContentType +} + +// Util functions + +// getFirstDefinedTypedValue returns the first input and key of the inputs argument that matches a field in fields. +// For example, given inputs { a : b, c : d }, getFirstDefinedTypedValue(inputs, z, x, c, a) would return (c, d) +func getFirstDefinedTypedValue(inputs map[string]*types.TypedValue, fields ...string) (string, *types.TypedValue) { + var result *types.TypedValue + var key string + for _, key = range fields { + val, ok := inputs[key] + if ok { + result = val + break + } + } + return key, result +} + +// toString is a utility function to do an unsafe conversion of a TypedValue to a String. fmt.Sprintf is used to +// convert other types to their string representation. +func toString(tv *types.TypedValue) string { + if tv == nil { + return "" + } + i, err := typedvalues.Format(tv) + if err != nil { + logrus.Warn("Failed to format input: %v", err) + } + + return fmt.Sprintf("%v", i) +} diff --git a/pkg/fnenv/fission/request_test.go b/pkg/fnenv/fission/request_test.go new file mode 100644 index 00000000..30e79eb9 --- /dev/null +++ b/pkg/fnenv/fission/request_test.go @@ -0,0 +1,154 @@ +package fission + +import ( + "fmt" + "github.com/fission/fission-workflows/pkg/types" + "github.com/fission/fission-workflows/pkg/types/typedvalues" + "github.com/stretchr/testify/assert" + "io" + "io/ioutil" + "net/http" + "net/url" + "strings" + "testing" +) + +func TestFormatRequest(t *testing.T) { + body := "some body input" + query := map[string]interface{}{ + "queryKey": "queryVal", + } + headers := map[string]interface{}{ + "Header-Key": "headerVal", + } + method := http.MethodPost + reqUrl, err := url.Parse("http://bar.example") + if err != nil { + panic(err) + } + target := &http.Request{ + URL: reqUrl, + // TODO verify that existing headers, query params, etc stay in tact. + } + source := map[string]*types.TypedValue{ + types.INPUT_MAIN: unsafe(typedvalues.Parse(body)), + types.INPUT_QUERY: unsafe(typedvalues.Parse(query)), + types.INPUT_HEADERS: unsafe(typedvalues.Parse(headers)), + types.INPUT_METHOD: unsafe(typedvalues.Parse(method)), + } + + err = formatRequest(target, source) + assert.NoError(t, err) + + // Check body + bs, err := ioutil.ReadAll(target.Body) + assert.NoError(t, err) + assert.Equal(t, "\""+body+"\"", string(bs)) + + // Check headers + assert.Equal(t, headers["Header-Key"], target.Header["Header-Key"][0]) + + // Check query + fmt.Println(query) + fmt.Println(target.URL.Query()) + assert.Equal(t, query["queryKey"], target.URL.Query()["queryKey"][0]) + + // Check method + assert.Equal(t, method, target.Method) +} + +func TestParseRequestComplete(t *testing.T) { + body := "hello world!" + req := createRequest(http.MethodPut, "http://foo.example?a=b", map[string]string{ + "header1": "value1", + }, strings.NewReader("\""+body+"\"")) + req.Header.Set("Content-Type", "application/json") + target := map[string]*types.TypedValue{} + + err := parseRequest(req, target) + assert.NoError(t, err) + + // Check body + ibody, err := typedvalues.Format(target[types.INPUT_MAIN]) + assert.NoError(t, err) + assert.Equal(t, body, ibody) + + // Check method + method, err := typedvalues.Format(target[types.INPUT_METHOD]) + assert.NoError(t, err) + assert.Equal(t, http.MethodPut, method) + + // Check headers + rawHeader, err := typedvalues.Format(target[types.INPUT_HEADERS]) + assert.NoError(t, err) + headers := rawHeader.(map[string]interface{}) + assert.IsType(t, map[string]interface{}{}, rawHeader) + assert.Equal(t, req.Header["header1"][0], headers["header1"]) + assert.Equal(t, nil, headers["nonExistent"]) + + // Check query + rawQuery, err := typedvalues.Format(target[types.INPUT_QUERY]) + assert.NoError(t, err) + assert.IsType(t, map[string]interface{}{}, rawQuery) + query := rawQuery.(map[string]interface{}) + assert.Equal(t, req.URL.Query()["a"][0], query["a"]) + assert.Equal(t, nil, query["nonExistent"]) +} + +// Tests whether accessing non-existent headers/query will not error +func TestParseRequestMinimal(t *testing.T) { + body := "hello world!" + req := createRequest(http.MethodPut, "http://foo.example", map[string]string{}, + strings.NewReader("\""+body+"\"")) + req.Header.Set("Content-Type", "application/json") + target := map[string]*types.TypedValue{} + + err := parseRequest(req, target) + assert.NoError(t, err) + + // Check body + ibody, err := typedvalues.Format(target[types.INPUT_MAIN]) + assert.NoError(t, err) + assert.Equal(t, body, ibody) + + // Check method + method, err := typedvalues.Format(target[types.INPUT_METHOD]) + assert.NoError(t, err) + assert.Equal(t, http.MethodPut, method) + + // Check headers + rawHeader, err := typedvalues.Format(target[types.INPUT_HEADERS]) + assert.NoError(t, err) + assert.IsType(t, map[string]interface{}{}, rawHeader) + headers := rawHeader.(map[string]interface{}) + assert.Equal(t, nil, headers["nonExistent"]) + + // Check query + rawQuery, err := typedvalues.Format(target[types.INPUT_QUERY]) + assert.NoError(t, err) + assert.IsType(t, map[string]interface{}{}, rawQuery) + query := rawQuery.(map[string]interface{}) + assert.Equal(t, nil, query["nonExistent"]) +} + +func createRequest(method string, rawUrl string, headers map[string]string, bodyReader io.Reader) *http.Request { + mheaders := http.Header{} + for k, v := range headers { + mheaders[k] = []string{v} + } + requrl, _ := url.Parse(rawUrl) + body := ioutil.NopCloser(bodyReader) + return &http.Request{ + Method: method, + URL: requrl, + Header: mheaders, + Body: body, + } +} + +func unsafe(i *types.TypedValue, e error) *types.TypedValue { + if e != nil { + panic(e) + } + return i +} diff --git a/pkg/fnenv/fission/resolver.go b/pkg/fnenv/fission/resolver.go index ed8b76b9..4c02e440 100644 --- a/pkg/fnenv/fission/resolver.go +++ b/pkg/fnenv/fission/resolver.go @@ -7,6 +7,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// Resolver implements the resolver interface to allow functions to be resolved through Fission type Resolver struct { controller *client.Client } diff --git a/pkg/fnenv/fission/runtime.go b/pkg/fnenv/fission/runtime.go index 015fcf45..a6de3e6a 100644 --- a/pkg/fnenv/fission/runtime.go +++ b/pkg/fnenv/fission/runtime.go @@ -1,41 +1,30 @@ package fission import ( - "bytes" "fmt" "net/http" + "net/url" - "encoding/json" - "io/ioutil" + "github.com/sirupsen/logrus" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/typedvalues" - executor "github.com/fission/fission/executor/client" - "github.com/sirupsen/logrus" - - "strings" + executor "github.com/fission/fission/executor/client" "github.com/fission/fission/router" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8stypes "k8s.io/apimachinery/pkg/types" - "net/url" ) -// FunctionEnv adapts the Fission platform to the function execution runtime. +// FunctionEnv adapts the Fission platform to the function execution runtime. This allows the workflow engine +// to invoke Fission functions. type FunctionEnv struct { executor *executor.Client } const ( - InputBody = "body" // or 'default' - InputHttpMethod = "http_method" - InputHeaderPrefix = "header_" - InputQueryPrefix = "query_" - InputContentType = "content_type" // to force the content type - - defaultHttpMethod = http.MethodPost - defaultProtocol = "http" - defaultContentType = "text/plain" + defaultHttpMethod = http.MethodPost + defaultProtocol = "http" ) func NewFunctionEnv(executor *executor.Client) *FunctionEnv { @@ -52,11 +41,12 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca } logrus.WithFields(logrus.Fields{ "name": meta.Name, - "UID": meta.UID, + "uid": meta.UID, "ns": meta.Namespace, }).Info("Invoking Fission function.") // Get reqUrl + // TODO use router instead once we can route to a specific function uid serviceUrl, err := fe.executor.GetServiceForFunction(meta) if err != nil { logrus.WithFields(logrus.Fields{ @@ -72,52 +62,18 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca panic(err) } - // Map body parameter - var input []byte - mainInput, _ := getFirstDefinedTypedValue(spec.Inputs, types.INPUT_MAIN, InputBody) - var short string - if mainInput != nil { - input = mainInput.Value - short = mainInput.Short() - } - - r := bytes.NewReader(input) - logrus.Infof("[request][body]: %v", short) - - // Map HTTP method - httpMethod := httpMethod(spec.Inputs, defaultHttpMethod) - logrus.Infof("Using HTTP method: %v", httpMethod) - - // Map headers - headers := headers(spec.Inputs) - - // Determine ContentType - reqContentType := contentType(spec.Inputs, mainInput, defaultContentType) - - // Map query and add to url - query := query(spec.Inputs) - reqUrl.RawQuery = query.Encode() - // Construct request and add body - req, err := http.NewRequest(httpMethod, reqUrl.String(), nil) + req, err := http.NewRequest(defaultHttpMethod, reqUrl.String(), nil) if err != nil { panic(fmt.Errorf("failed to make request for '%s': %v", serviceUrl, err)) } - // Add body - req.Body = ioutil.NopCloser(r) - defer req.Body.Close() + // Map task inputs to request + formatRequest(req, spec.Inputs) // Add parameters normally added by Fission router.MetadataToHeaders(router.HEADERS_FISSION_FUNCTION_PREFIX, meta, req) - // Set headers - for k, v := range headers { - req.Header.Set(k, v) - // TODO check that no special headers are overwritten - } - req.Header.Set("Content-Type", reqContentType) - // Perform request resp, err := http.DefaultClient.Do(req) if err != nil { @@ -125,7 +81,10 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca } // Parse output - output := toTypedValue(resp) + output, err := parseBody(resp.Body, resp.Header.Get("Content-Type")) + if err != nil { + return nil, fmt.Errorf("failed to parse output: %v", err) + } logrus.Infof("[%s][Content-Type]: %v ", meta.Name, resp.Header.Get("Content-Type")) logrus.Infof("[%s][output]: %v", meta.Name, output.Short()) @@ -149,102 +108,3 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca Output: output, }, nil } - -func toTypedValue(resp *http.Response) *types.TypedValue { - contentType := strings.ToLower(resp.Header.Get("Content-Type")) - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - panic(err) - } - - var i interface{} = body - if strings.Contains(contentType, "application/json") || strings.Contains(contentType, "text/json") { - logrus.Info("Assuming JSON") - err := json.Unmarshal(body, &i) - if err != nil { - logrus.Warnf("Expected JSON response could not be parsed: %v", err) - } - } - - tv, err := typedvalues.Parse(i) - if err != nil { - panic(err) - } - return tv -} - -func getFirstDefinedTypedValue(inputs map[string]*types.TypedValue, fields ...string) (*types.TypedValue, string) { - var result *types.TypedValue - var f string - for _, f = range fields { - val, ok := inputs[f] - if ok { - result = val - break - } - } - return result, f -} - -func toString(tv *types.TypedValue) string { - if tv == nil { - return "" - } - i, err := typedvalues.Format(tv) - if err != nil { - logrus.Warn("Failed to format input: %v", err) - } - - return fmt.Sprintf("%v", i) -} - -func httpMethod(inputs map[string]*types.TypedValue, defaultHttpMethod string) string { - tv, _ := getFirstDefinedTypedValue(inputs, InputHttpMethod) - - httpMethod := toString(tv) - if httpMethod == "" { - return defaultHttpMethod - } - return httpMethod -} - -func contentType(inputs map[string]*types.TypedValue, mainInput *types.TypedValue, defaultContentType string) string { - // Check if content type is forced - tv, _ := getFirstDefinedTypedValue(inputs, InputContentType, InputHeaderPrefix+InputContentType) - contentType := toString(tv) - if contentType != "" { - return contentType - } - return inferContentType(mainInput, defaultContentType) -} - -func inferContentType(mainInput *types.TypedValue, defaultContentType string) string { - // Infer content type from main input (TODO Temporary solution) - if mainInput != nil && strings.HasPrefix(mainInput.Type, "json") { - return "application/json" - } - - // Use default content type - return defaultContentType -} - -func headers(inputs map[string]*types.TypedValue) map[string]string { // TODO support multi-headers at some point - result := map[string]string{} - for k, v := range inputs { - if strings.HasPrefix(k, InputHeaderPrefix) { - result[strings.TrimPrefix(k, InputHeaderPrefix)] = toString(v) - } - } - return result -} - -func query(inputs map[string]*types.TypedValue) url.Values { // TODO support multi-headers at some point - result := url.Values{} - for k, v := range inputs { - if strings.HasPrefix(k, InputQueryPrefix) { - result.Add(strings.TrimPrefix(k, InputQueryPrefix), toString(v)) - } - } - return result -} diff --git a/pkg/types/types.go b/pkg/types/types.go index 5f12afb1..d846559b 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -10,6 +10,9 @@ const ( SUBJECT_INVOCATION = "invocation" SUBJECT_WORKFLOW = "workflows" INPUT_MAIN = "default" + INPUT_HEADERS = "headers" + INPUT_QUERY = "query" + INPUT_METHOD = "method" typedValueShortMaxLen = 32 ) From a69e3002332fdaf8731dd0f896809bf2dd98f804 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Mon, 22 Jan 2018 14:40:21 +0100 Subject: [PATCH 2/2] Added examples on how to reference request metadata --- examples/misc/inputs.wf.yaml | 9 +++++++++ examples/whales/metadatawhale.wf.yaml | 4 +++- 2 files changed, 12 insertions(+), 1 deletion(-) create mode 100644 examples/misc/inputs.wf.yaml diff --git a/examples/misc/inputs.wf.yaml b/examples/misc/inputs.wf.yaml new file mode 100644 index 00000000..0f3932d1 --- /dev/null +++ b/examples/misc/inputs.wf.yaml @@ -0,0 +1,9 @@ +# The Inputs Workflow - simply prints the inputs it received, including body, query params and headers. +# +# Example: curl -XPUT -H "hello: world" http://$FISSION_ROUTER/fission-function/inputs?a=b +apiVersion: 1 +output: Printer +tasks: + Printer: + run: compose + inputs: "{$.Invocation.Inputs}" diff --git a/examples/whales/metadatawhale.wf.yaml b/examples/whales/metadatawhale.wf.yaml index 33f4006f..344995bc 100644 --- a/examples/whales/metadatawhale.wf.yaml +++ b/examples/whales/metadatawhale.wf.yaml @@ -5,4 +5,6 @@ tasks: PrefixedFortune: run: fortune inputs: - header_prefix: "Whale says:" + headers: + # If the 'prefix' header is non-empty, we use that. Otherwise we default to "whale says" + prefix: "{ $.Invocation.Inputs.headers.Prefix || 'Whale says: ' }"