Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PLATFORM-9039: webhooks reliability #98

Merged
merged 4 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions driver/registry_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type RegistryDefault struct {
rwl sync.RWMutex
l *logrusx.Logger
c *config.Config
rc map[string]*retryablehttp.Client
rc *sync.Map

ctxer contextx.Contextualizer

Expand Down Expand Up @@ -237,7 +237,7 @@ func (m *RegistryDefault) RegisterRoutes(ctx context.Context, public *x.RouterPu
func NewRegistryDefault() *RegistryDefault {
return &RegistryDefault{
trc: otelx.NewNoop(nil, new(otelx.Config)),
rc: map[string]*retryablehttp.Client{},
rc: new(sync.Map),
}
}

Expand Down Expand Up @@ -815,14 +815,8 @@ func (m *RegistryDefault) PrometheusManager() *prometheus.MetricsManager {
}

func (m *RegistryDefault) NamedHTTPClient(ctx context.Context, name string, opts ...httpx.ResilientOptions) *retryablehttp.Client {
var rc *retryablehttp.Client
if cl, ok := m.rc[name]; ok {
rc = cl
} else {
rc = m.HTTPClient(ctx, opts...)
m.rc[name] = rc
}
return rc
res, _ := m.rc.LoadOrStore(name, m.HTTPClient(ctx, opts...))
return res.(*retryablehttp.Client)
}

func (m *RegistryDefault) HTTPClient(ctx context.Context, opts ...httpx.ResilientOptions) *retryablehttp.Client {
Expand Down
68 changes: 0 additions & 68 deletions identity/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ func (h *Handler) RegisterAdminRoutes(admin *x.RouterAdmin) {
admin.POST(RouteCollection, h.create)
// fandom-start - add API to validate email before saving user in UCP
admin.POST(RouteValidate, h.validate)
// allow admins to remove multifactor authentication for an identity
admin.DELETE(RouteMultifactor, h.deleteMultifactorCredential)
// fandom-end
admin.PATCH(RouteCollection, h.batchPatchIdentities)
admin.PUT(RouteItem, h.update)
Expand Down Expand Up @@ -736,72 +734,6 @@ func (h *Handler) validate(w http.ResponseWriter, r *http.Request, _ httprouter.
}

// fandom-end
// fandom start - allow admins to remove multifactor authentication for an identity

// swagger:parameters adminDeleteIdentityCredential
// nolint:deadcode,unused
type adminDeleteIdentityCredential struct {
// ID is the identity's ID.
//
// required: true
// in: path
ID string `json:"id"`
// credentialType is the type of credential
//
// required: true
// in: path
CredentialType string `json:"credentialType"`
}

// swagger:route DELETE /identities/{id}/credential/{credentialType} v0alpha2 adminDeleteIdentityCredential
//
// # Update an Identity
//
// Allow admins to remove multifactor authentication for an identity
//
// Consumes:
// - application/json
//
// Produces:
// - application/json
//
// Schemes: http, https
//
// Security:
// oryAccessToken:
//
// Responses:
// 204: emptyResponse
// 404: jsonError
// 500: jsonError
func (h *Handler) deleteMultifactorCredential(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

identity, err := h.r.PrivilegedIdentityPool().GetIdentityConfidential(r.Context(), x.ParseUUID(ps.ByName("id")))
if err != nil {
h.r.Writer().WriteError(w, r, errors.WithStack(herodot.ErrBadRequest.WithReasonf("Invalid identity")))
return
}

credentialType := ps.ByName("credentialType")
if credentialType != CredentialsTypeTOTP.String() && credentialType != CredentialsTypeLookup.String() &&
credentialType != CredentialsTypeWebAuthn.String() {
h.r.Writer().WriteError(w, r, errors.WithStack(herodot.ErrBadRequest.WithReasonf("Invalid credential type: %s", credentialType)))
return
}

identity.DeleteCredentialsType(CredentialsType(credentialType))
if err := h.r.IdentityManager().Update(
r.Context(),
identity,
ManagerAllowWriteProtectedTraits,
); err != nil {
h.r.Writer().WriteError(w, r, err)
return
}
w.WriteHeader(http.StatusAccepted)
h.r.Writer().Write(w, r, "OK")
}

// fandom - end

// Delete Identity Parameters
//
Expand Down
80 changes: 23 additions & 57 deletions selfservice/hook/web_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,7 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
if resp.StatusCode >= http.StatusBadRequest {
span.SetStatus(codes.Error, "HTTP status code >= 400")
if canInterrupt || parseResponse {
// TODO: double-check if we could use upstream `parseWebhookResponse`
if err := e.parseResponse(resp); err != nil {
if err := e.parseWebhookResponse(resp, data.Identity); err != nil {
return err
}
}
Expand All @@ -524,8 +523,7 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
}

if parseResponse {
// TODO: double-check if we could use upstream `parseWebhookResponse`
return e.parseResponse(resp)
return e.parseWebhookResponse(resp, data.Identity)
}
return nil
}
Expand All @@ -540,17 +538,28 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
return nil
}

func parseWebhookResponse(resp *http.Response, id *identity.Identity) (err error) {
func (e *WebHook) parseWebhookResponse(resp *http.Response, id *identity.Identity) (err error) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's try to push it to the upstream

Copy link
Author

@korzepadawid korzepadawid Feb 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otherwise, we won't have an access to the logger

Copy link
Author

@korzepadawid korzepadawid Feb 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the logger is only available through e.deps

if resp == nil {
return errors.Errorf("empty response provided from the webhook")
}

// fandom-start
body, err := io.ReadAll(resp.Body)
if err != nil {
return errors.Wrap(err, "could not read response body")
}

e.deps.Logger().WithField("response", string(body)).WithField("status_code", resp.StatusCode).Debug("webhook: received response")
// fandom-end

if resp.StatusCode == http.StatusOK {
var hookResponse struct {
Identity *identity.Identity `json:"identity"`
}

if err := json.NewDecoder(resp.Body).Decode(&hookResponse); err != nil {
// todo: replace it with json.NewDecoder(resp.Body).Decode(&hookResponse) to be consistent with upstream
// used json.Unmarshal for now, because json.NewDecoder(resp.Body).Decode(&hookResponse) panics
if err := json.Unmarshal(body, &hookResponse); err != nil {
return errors.Wrap(err, "webhook response could not be unmarshalled properly from JSON")
}

Expand Down Expand Up @@ -595,7 +604,9 @@ func parseWebhookResponse(resp *http.Response, id *identity.Identity) (err error
return nil
} else if resp.StatusCode >= http.StatusBadRequest {
var hookResponse rawHookResponse
if err := json.NewDecoder(resp.Body).Decode(&hookResponse); err != nil {
// todo: replace it with json.NewDecoder(resp.Body).Decode(&hookResponse) to be consistent with upstream
// used json.Unmarshal for now, because json.NewDecoder(resp.Body).Decode(&hookResponse) panics
if err = json.Unmarshal(body, &hookResponse); err != nil {
return errors.Wrap(err, "webhook response could not be unmarshalled properly from JSON")
}

Expand All @@ -618,12 +629,16 @@ func parseWebhookResponse(resp *http.Response, id *identity.Identity) (err error
}
validationErrs = append(validationErrs, schema.NewHookValidationError(msg.InstancePtr, "a webhook target returned an error", messages))
}
// fandom-start
validationErr := schema.NewValidationListError(validationErrs)

if len(validationErrs) == 0 {
e.deps.Logger().WithField("validations", validationErr).Debug("webhook: parsed validations")
return errors.New("error while parsing webhook response: got no validation errors")
}

return schema.NewValidationListError(validationErrs)
return errors.WithStack(validationErr)
// fandom-end
}

return nil
Expand All @@ -633,52 +648,3 @@ func isTimeoutError(err error) bool {
var te interface{ Timeout() bool }
return errors.As(err, &te) && te.Timeout() || errors.Is(err, context.DeadlineExceeded)
}

//nolint:deadcode,unused
func (e *WebHook) parseResponse(resp *http.Response) (err error) {
if resp == nil {
return fmt.Errorf("empty response provided from the webhook")
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return errors.Wrap(err, "could not read response body")
}

hookResponse := &rawHookResponse{
Messages: []errorMessage{},
}

// fandom-start
e.deps.Logger().WithField("response", string(body)).WithField("status_code", resp.StatusCode).Debug("webhook: received response")
// fandom-end

if err = json.Unmarshal(body, &hookResponse); err != nil {
return errors.Wrap(err, "hook response could not be unmarshalled properly")
}

var validationErrs []*schema.ValidationError

for _, msg := range hookResponse.Messages {
messages := text.Messages{}
for _, detail := range msg.DetailedMessages {
messages.Add(&text.Message{
ID: text.ID(detail.ID),
Text: detail.Text,
Type: text.UITextType(detail.Type),
Context: detail.Context,
})
}
validationErrs = append(validationErrs, schema.NewHookValidationError(msg.InstancePtr, msg.DetailedMessages[0].Text, messages))
}
validationErr := schema.NewValidationListError(validationErrs)

if len(validationErrs) == 0 {
// fandom-start
e.deps.Logger().WithField("validations", validationErr).Debug("webhook: parsed validations")
// fandom-end
return errors.New("error while parsing hook response: got no validation errors")
}

return errors.WithStack(validationErr)
}
8 changes: 7 additions & 1 deletion selfservice/hook/web_hook_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@ func TestWebHooks(t *testing.T) {
whDeps := struct {
x.SimpleLoggerWithClient
*jsonnetsecure.TestProvider
x.ResilientClientProvider
}{
x.SimpleLoggerWithClient{L: logger, C: reg.HTTPClient(context.Background()), T: otelx.NewNoop(logger, &otelx.Config{ServiceName: "kratos"})},
jsonnetsecure.NewTestProvider(t),
reg,
}
type WebHookRequest struct {
Body string
Expand Down Expand Up @@ -934,8 +936,8 @@ func TestWebHooks(t *testing.T) {
// error.

var wg sync.WaitGroup
wg.Add(3) // HTTP client does 3 attempts
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ts := newServer(func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
wg.Add(1)
defer wg.Done()
w.WriteHeader(500)
_, _ = w.Write([]byte(`{"error":"some error"}`))
Expand Down Expand Up @@ -1011,9 +1013,11 @@ func TestDisallowPrivateIPRanges(t *testing.T) {
whDeps := struct {
x.SimpleLoggerWithClient
*jsonnetsecure.TestProvider
x.ResilientClientProvider
}{
x.SimpleLoggerWithClient{L: logger, C: reg.HTTPClient(context.Background()), T: otelx.NewNoop(logger, &otelx.Config{ServiceName: "kratos"})},
jsonnetsecure.NewTestProvider(t),
reg,
}

req := &http.Request{
Expand Down Expand Up @@ -1081,9 +1085,11 @@ func TestAsyncWebhook(t *testing.T) {
whDeps := struct {
x.SimpleLoggerWithClient
*jsonnetsecure.TestProvider
x.ResilientClientProvider
}{
x.SimpleLoggerWithClient{L: logger, C: reg.HTTPClient(context.Background()), T: otelx.NewNoop(logger, &otelx.Config{ServiceName: "kratos"})},
jsonnetsecure.NewTestProvider(t),
reg,
}

req := &http.Request{
Expand Down
Loading