Skip to content

Commit

Permalink
chore: code review
Browse files Browse the repository at this point in the history
  • Loading branch information
ory-bot committed Dec 20, 2022
1 parent 134295e commit 8b54913
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 21 deletions.
50 changes: 29 additions & 21 deletions selfservice/hook/web_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,29 +274,36 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
attribute.String("webhook.identity.nid", data.Identity.NID.String()),
)
}

var (
httpClient = e.deps.HTTPClient(ctx)
async = gjson.GetBytes(e.conf, "response.ignore").Bool()
parseResponse = gjson.GetBytes(e.conf, "can_interrupt").Bool()
tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks")
cancel context.CancelFunc = func() {}
spanOpts = []trace.SpanStartOption{trace.WithAttributes(attrs...)}
errChan = make(chan error, 1)
httpClient = e.deps.HTTPClient(ctx)
ignoreResponse = gjson.GetBytes(e.conf, "response.ignore").Bool()
canInterrupt = gjson.GetBytes(e.conf, "can_interrupt").Bool()
tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks")
spanOpts = []trace.SpanStartOption{trace.WithAttributes(attrs...)}
errChan = make(chan error, 1)
)
if async {
// dissociate the context from the one passed into this function
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
spanOpts = append(spanOpts, trace.WithNewRoot())

req = req.WithContext(ctx)
if ignoreResponse {
// This is one of the few places where spawning a context.Background() is ok. We need to do this
// because the function runs asynchronously and we don't want to cancel the request if the
// incoming request context is cancelled.
//
// The webhook will still cancel after 30 seconds as that is the configured timeout for the HTTP client.
req = req.WithContext(context.Background())
// spanOpts = append(spanOpts, trace.WithNewRoot())
}

ctx, span := tracer.Start(ctx, "Webhook", spanOpts...)
e.deps.Logger().WithRequest(req.Request).Info("Dispatching webhook")
t0 := time.Now()

startTime := time.Now()
go func() {
defer close(errChan)
defer cancel()
defer span.End()

resp, err := httpClient.Do(req.WithContext(ctx))
resp, err := httpClient.Do(req)
if err != nil {
span.SetStatus(codes.Error, err.Error())
errChan <- errors.WithStack(err)
Expand All @@ -307,7 +314,7 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {

if resp.StatusCode >= http.StatusBadRequest {
span.SetStatus(codes.Error, "HTTP status code >= 400")
if parseResponse {
if canInterrupt {
if err := parseWebhookResponse(resp); err != nil {
span.SetStatus(codes.Error, err.Error())
errChan <- err
Expand All @@ -320,16 +327,17 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
errChan <- nil
}()

if async {
if ignoreResponse {
traceID, spanID := span.SpanContext().TraceID(), span.SpanContext().SpanID()
logger := e.deps.Logger().WithField("otel", map[string]string{
"trace_id": traceID.String(),
"span_id": spanID.String(),
})
go func() {
if err := <-errChan; err != nil {
e.deps.Logger().WithField("otel", map[string]string{
"trace_id": traceID.String(),
"span_id": spanID.String(),
}).WithError(err).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.")
logger.WithField("duration", time.Since(startTime)).WithError(err).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.")
} else {
e.deps.Logger().WithField("duration", time.Since(t0)).Info("Webhook request succeeded")
logger.WithField("duration", time.Since(startTime)).Info("Webhook request succeeded")
}
}()
return nil
Expand Down
2 changes: 2 additions & 0 deletions selfservice/hook/web_hook_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,13 +861,15 @@ func TestAsyncWebhook(t *testing.T) {
URL: &url.URL{Path: "/some_end_point"},
Method: http.MethodPost,
}

incomingCtx, incomingCancel := context.WithCancel(context.Background())
if deadline, ok := t.Deadline(); ok {
// cancel this context one second before test timeout for clean shutdown
var cleanup context.CancelFunc
incomingCtx, cleanup = context.WithDeadline(incomingCtx, deadline.Add(-time.Second))
defer cleanup()
}

req = req.WithContext(incomingCtx)
s := &session.Session{ID: x.NewUUID(), Identity: &identity.Identity{ID: x.NewUUID()}}
f := &login.Flow{ID: x.NewUUID()}
Expand Down

0 comments on commit 8b54913

Please sign in to comment.