Skip to content

Commit

Permalink
WIP: Plumb error info to x-ray
Browse files Browse the repository at this point in the history
  • Loading branch information
bmoffatt committed Jan 4, 2024
1 parent 752114b commit 4f81bad
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 5 deletions.
34 changes: 32 additions & 2 deletions lambda/invoke_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,16 @@ func handleInvoke(invoke *invoke, handler *handlerOptions) error {
}

func reportFailure(invoke *invoke, invokeErr *messages.InvokeResponse_Error) error {
errorPayload := safeMarshal(invokeErr)
errorForXRay := makeErrorForXRay(invokeErr)
errorPayload := errorForXRay.Exceptions[0]
log.Printf("%s", errorPayload)
if err := invoke.failure(bytes.NewReader(errorPayload), contentTypeJSON); err != nil {

causeForXRay, err := json.Marshal(errorForXRay)
if err != nil {
return fmt.Errorf("unexpected error occured when serializing the function error cause for X-Ray: %v", err)
}

if err := invoke.xfailure(bytes.NewReader(errorPayload), contentTypeJSON, causeForXRay); err != nil {
return fmt.Errorf("unexpected error occurred when sending the function error to the API: %v", err)
}
return nil
Expand Down Expand Up @@ -166,3 +173,26 @@ func safeMarshal(v interface{}) []byte {
}
return payload
}

type errorForXRay struct {
WorkingDirectory string `json:"working_directory"`
Exceptions []json.RawMessage `json:"exceptions"` // returned as bytes to avoid double-serializing
Paths []string `json:"paths"`
}

func makeErrorForXRay(invokeResponseError *messages.InvokeResponse_Error) *errorForXRay {
pathSet := make(map[string]struct{}, len(invokeResponseError.StackTrace))
for _, frame := range invokeResponseError.StackTrace {
pathSet[frame.Path] = struct{}{}
}
paths := make([]string, 0, len(pathSet))
for path := range pathSet {
paths = append(paths, path)
}
cwd, _ := os.Getwd()
return &errorForXRay{
WorkingDirectory: cwd,
Paths: paths,
Exceptions: []json.RawMessage{safeMarshal(invokeResponseError)},
}
}
43 changes: 43 additions & 0 deletions lambda/invoke_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,47 @@ func TestCustomErrorMarshaling(t *testing.T) {
}
}

func TestXRayCausePlumbing(t *testing.T) {
errors := []error{
messages.InvokeResponse_Error{
Type: "yolo",
Message: "hello",
StackTrace: []*messages.InvokeResponse_Error_StackFrame{
{Label: "yolo", Path: "yolo", Line: 2},
},
},
}
wd, _ := os.Getwd()
expected := []string{
`{
"working_directory":"` + wd + `",
"paths": ["yolo"],
"exceptions": [{
"errorType": "yolo",
"errorMessage": "hello",
"stackTrace": [
{"label": "yolo", "path": "yolo", "line": 2}
]
}]
}`,
}
require.Equal(t, len(errors), len(expected))
ts, record := runtimeAPIServer(``, len(errors))
defer ts.Close()
n := 0
handler := NewHandler(func() error {
defer func() { n++ }()
return errors[n]
})
endpoint := strings.Split(ts.URL, "://")[1]
expectedError := fmt.Sprintf("failed to GET http://%s/2018-06-01/runtime/invocation/next: got unexpected status code: 410", endpoint)
assert.EqualError(t, startRuntimeAPILoop(endpoint, handler), expectedError)
for i := range errors {
assert.JSONEq(t, expected[i], string(record.xrayCauses[i]))
}

}

func TestRuntimeAPIContextPlumbing(t *testing.T) {
handler := NewHandler(func(ctx context.Context) (interface{}, error) {
lc, _ := lambdacontext.FromContext(ctx)
Expand Down Expand Up @@ -271,6 +312,7 @@ type requestRecord struct {
nPosts int
responses [][]byte
contentTypes []string
xrayCauses []string
}

type eventMetadata struct {
Expand Down Expand Up @@ -336,6 +378,7 @@ func runtimeAPIServer(eventPayload string, failAfter int, overrides ...eventMeta
w.WriteHeader(http.StatusAccepted)
record.responses = append(record.responses, response.Bytes())
record.contentTypes = append(record.contentTypes, r.Header.Get("Content-Type"))
record.xrayCauses = append(record.xrayCauses, r.Header.Get(headerXRayErrorCause))
default:
w.WriteHeader(http.StatusBadRequest)
}
Expand Down
24 changes: 21 additions & 3 deletions lambda/runtime_api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"log"
"net/http"
"runtime"

"github.com/aws/aws-lambda-go/lambda/messages"
)

const (
Expand All @@ -22,11 +24,13 @@ const (
headerCognitoIdentity = "Lambda-Runtime-Cognito-Identity"
headerClientContext = "Lambda-Runtime-Client-Context"
headerInvokedFunctionARN = "Lambda-Runtime-Invoked-Function-Arn"
headerXRayErrorCause = "Lambda-Runtime-Function-Xray-Error-Cause"
trailerLambdaErrorType = "Lambda-Runtime-Function-Error-Type"
trailerLambdaErrorBody = "Lambda-Runtime-Function-Error-Body"
contentTypeJSON = "application/json"
contentTypeBytes = "application/octet-stream"
apiVersion = "2018-06-01"
xrayErrorCauseMaxSize = 1024 * 1024
)

type runtimeAPIClient struct {
Expand All @@ -52,12 +56,18 @@ type invoke struct {
client *runtimeAPIClient
}

type failure struct {

Check failure on line 59 in lambda/runtime_api_client.go

View workflow job for this annotation

GitHub Actions / run golangci-golint on the project

type `failure` is unused (unused)
WorkingDirectory string `json:"working_directory"`
Exceptions []messages.InvokeResponse_Error `json:"exceptions"`
Paths []string `json:"paths"`
}

// success sends the response payload for an in-progress invocation.
// Notes:
// - An invoke is not complete until next() is called again!
func (i *invoke) success(body io.Reader, contentType string) error {
url := i.client.baseURL + i.id + "/response"
return i.client.post(url, body, contentType)
return i.client.post(url, body, contentType, nil)
}

// failure sends the payload to the Runtime API. This marks the function's invoke as a failure.
Expand All @@ -66,8 +76,12 @@ func (i *invoke) success(body io.Reader, contentType string) error {
// - A Lambda Function continues to be re-used for future invokes even after a failure.
// If the error is fatal (panic, unrecoverable state), exit the process immediately after calling failure()
func (i *invoke) failure(body io.Reader, contentType string) error {
return i.xfailure(body, contentType, nil)
}

func (i *invoke) xfailure(body io.Reader, contentType string, causeForXRay []byte) error {
url := i.client.baseURL + i.id + "/error"
return i.client.post(url, body, contentType)
return i.client.post(url, body, contentType, causeForXRay)
}

// next connects to the Runtime API and waits for a new invoke Request to be available.
Expand Down Expand Up @@ -108,7 +122,7 @@ func (c *runtimeAPIClient) next() (*invoke, error) {
}, nil
}

func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string) error {
func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string, xrayErrorCause []byte) error {
b := newErrorCapturingReader(body)
req, err := http.NewRequest(http.MethodPost, url, b)
if err != nil {
Expand All @@ -118,6 +132,10 @@ func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string)
req.Header.Set("User-Agent", c.userAgent)
req.Header.Set("Content-Type", contentType)

if xrayErrorCause != nil && len(xrayErrorCause) < xrayErrorCauseMaxSize {
req.Header.Set(headerXRayErrorCause, string(xrayErrorCause))
}

resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to POST to %s: %v", url, err)
Expand Down

0 comments on commit 4f81bad

Please sign in to comment.