Skip to content

Commit

Permalink
[flyteadmin] Refactor panic recovery into middleware (flyteorg#5546)
Browse files Browse the repository at this point in the history
* Refactor panic handling to middleware

Signed-off-by: Jason Parraga <[email protected]>

* Remove registration of old panicCounter

Signed-off-by: Jason Parraga <[email protected]>

* Add test coverage

Signed-off-by: Jason Parraga <[email protected]>

---------

Signed-off-by: Jason Parraga <[email protected]>
Signed-off-by: Bugra Gedik <[email protected]>
  • Loading branch information
Sovietaced authored and bgedik committed Aug 15, 2024
1 parent 7306683 commit dd2957b
Show file tree
Hide file tree
Showing 17 changed files with 177 additions and 119 deletions.
10 changes: 0 additions & 10 deletions flyteadmin/pkg/rpc/adminservice/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

func (m *AdminService) UpdateWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesUpdateRequest) (
*admin.WorkflowAttributesUpdateResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -30,7 +29,6 @@ func (m *AdminService) UpdateWorkflowAttributes(ctx context.Context, request *ad

func (m *AdminService) GetWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesGetRequest) (
*admin.WorkflowAttributesGetResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -48,7 +46,6 @@ func (m *AdminService) GetWorkflowAttributes(ctx context.Context, request *admin

func (m *AdminService) DeleteWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesDeleteRequest) (
*admin.WorkflowAttributesDeleteResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -66,7 +63,6 @@ func (m *AdminService) DeleteWorkflowAttributes(ctx context.Context, request *ad

func (m *AdminService) UpdateProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesUpdateRequest) (
*admin.ProjectDomainAttributesUpdateResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -84,7 +80,6 @@ func (m *AdminService) UpdateProjectDomainAttributes(ctx context.Context, reques

func (m *AdminService) GetProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesGetRequest) (
*admin.ProjectDomainAttributesGetResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -102,7 +97,6 @@ func (m *AdminService) GetProjectDomainAttributes(ctx context.Context, request *

func (m *AdminService) DeleteProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesDeleteRequest) (
*admin.ProjectDomainAttributesDeleteResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -121,7 +115,6 @@ func (m *AdminService) DeleteProjectDomainAttributes(ctx context.Context, reques
func (m *AdminService) UpdateProjectAttributes(ctx context.Context, request *admin.ProjectAttributesUpdateRequest) (
*admin.ProjectAttributesUpdateResponse, error) {

defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -140,7 +133,6 @@ func (m *AdminService) UpdateProjectAttributes(ctx context.Context, request *adm
func (m *AdminService) GetProjectAttributes(ctx context.Context, request *admin.ProjectAttributesGetRequest) (
*admin.ProjectAttributesGetResponse, error) {

defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -159,7 +151,6 @@ func (m *AdminService) GetProjectAttributes(ctx context.Context, request *admin.
func (m *AdminService) DeleteProjectAttributes(ctx context.Context, request *admin.ProjectAttributesDeleteRequest) (
*admin.ProjectAttributesDeleteResponse, error) {

defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -177,7 +168,6 @@ func (m *AdminService) DeleteProjectAttributes(ctx context.Context, request *adm

func (m *AdminService) ListMatchableAttributes(ctx context.Context, request *admin.ListMatchableAttributesRequest) (
*admin.ListMatchableAttributesResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand Down
13 changes: 0 additions & 13 deletions flyteadmin/pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"runtime/debug"

"github.com/golang/protobuf/proto"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent"
eventWriter "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/implementations"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications"
Expand Down Expand Up @@ -44,17 +42,6 @@ type AdminService struct {
Metrics AdminMetrics
}

// Intercepts all admin requests to handle panics during execution.
func (m *AdminService) interceptPanic(ctx context.Context, request proto.Message) {
err := recover()
if err == nil {
return
}

m.Metrics.PanicCounter.Inc()
logger.Fatalf(ctx, "panic-ed for request: [%+v] with err: %v with Stack: %v", request, err, string(debug.Stack()))
}

const defaultRetries = 3

func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, configuration runtimeIfaces.Configuration,
Expand Down
40 changes: 0 additions & 40 deletions flyteadmin/pkg/rpc/adminservice/base_test.go

This file was deleted.

2 changes: 0 additions & 2 deletions flyteadmin/pkg/rpc/adminservice/description_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
)

func (m *AdminService) GetDescriptionEntity(ctx context.Context, request *admin.ObjectGetRequest) (*admin.DescriptionEntity, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -36,7 +35,6 @@ func (m *AdminService) GetDescriptionEntity(ctx context.Context, request *admin.
}

func (m *AdminService) ListDescriptionEntities(ctx context.Context, request *admin.DescriptionEntityListRequest) (*admin.DescriptionEntityList, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand Down
10 changes: 0 additions & 10 deletions flyteadmin/pkg/rpc/adminservice/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

func (m *AdminService) CreateExecution(
ctx context.Context, request *admin.ExecutionCreateRequest) (*admin.ExecutionCreateResponse, error) {
defer m.interceptPanic(ctx, request)
requestedAt := time.Now()
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
Expand All @@ -32,7 +31,6 @@ func (m *AdminService) CreateExecution(

func (m *AdminService) RelaunchExecution(
ctx context.Context, request *admin.ExecutionRelaunchRequest) (*admin.ExecutionCreateResponse, error) {
defer m.interceptPanic(ctx, request)
requestedAt := time.Now()
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
Expand All @@ -51,7 +49,6 @@ func (m *AdminService) RelaunchExecution(

func (m *AdminService) RecoverExecution(
ctx context.Context, request *admin.ExecutionRecoverRequest) (*admin.ExecutionCreateResponse, error) {
defer m.interceptPanic(ctx, request)
requestedAt := time.Now()
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
Expand All @@ -70,7 +67,6 @@ func (m *AdminService) RecoverExecution(

func (m *AdminService) CreateWorkflowEvent(
ctx context.Context, request *admin.WorkflowExecutionEventRequest) (*admin.WorkflowExecutionEventResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -89,7 +85,6 @@ func (m *AdminService) CreateWorkflowEvent(

func (m *AdminService) GetExecution(
ctx context.Context, request *admin.WorkflowExecutionGetRequest) (*admin.Execution, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -107,7 +102,6 @@ func (m *AdminService) GetExecution(

func (m *AdminService) UpdateExecution(
ctx context.Context, request *admin.ExecutionUpdateRequest) (*admin.ExecutionUpdateResponse, error) {
defer m.interceptPanic(ctx, request)
requestedAt := time.Now()
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
Expand All @@ -126,7 +120,6 @@ func (m *AdminService) UpdateExecution(

func (m *AdminService) GetExecutionData(
ctx context.Context, request *admin.WorkflowExecutionGetDataRequest) (*admin.WorkflowExecutionGetDataResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -144,7 +137,6 @@ func (m *AdminService) GetExecutionData(

func (m *AdminService) GetExecutionMetrics(
ctx context.Context, request *admin.WorkflowExecutionGetMetricsRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -162,7 +154,6 @@ func (m *AdminService) GetExecutionMetrics(

func (m *AdminService) ListExecutions(
ctx context.Context, request *admin.ResourceListRequest) (*admin.ExecutionList, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -180,7 +171,6 @@ func (m *AdminService) ListExecutions(

func (m *AdminService) TerminateExecution(
ctx context.Context, request *admin.ExecutionTerminateRequest) (*admin.ExecutionTerminateResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand Down
7 changes: 0 additions & 7 deletions flyteadmin/pkg/rpc/adminservice/launch_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

func (m *AdminService) CreateLaunchPlan(
ctx context.Context, request *admin.LaunchPlanCreateRequest) (*admin.LaunchPlanCreateResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -31,7 +30,6 @@ func (m *AdminService) CreateLaunchPlan(
}

func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectGetRequest) (*admin.LaunchPlan, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -55,7 +53,6 @@ func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectG
}

func (m *AdminService) GetActiveLaunchPlan(ctx context.Context, request *admin.ActiveLaunchPlanRequest) (*admin.LaunchPlan, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -73,7 +70,6 @@ func (m *AdminService) GetActiveLaunchPlan(ctx context.Context, request *admin.A

func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.LaunchPlanUpdateRequest) (
*admin.LaunchPlanUpdateResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -97,7 +93,6 @@ func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.Laun

func (m *AdminService) ListLaunchPlans(ctx context.Context, request *admin.ResourceListRequest) (
*admin.LaunchPlanList, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.")
}
Expand All @@ -116,7 +111,6 @@ func (m *AdminService) ListLaunchPlans(ctx context.Context, request *admin.Resou

func (m *AdminService) ListActiveLaunchPlans(ctx context.Context, request *admin.ActiveLaunchPlanListRequest) (
*admin.LaunchPlanList, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.")
}
Expand All @@ -135,7 +129,6 @@ func (m *AdminService) ListActiveLaunchPlans(ctx context.Context, request *admin

func (m *AdminService) ListLaunchPlanIds(ctx context.Context, request *admin.NamedEntityIdentifierListRequest) (
*admin.NamedEntityIdentifierList, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.")
}
Expand Down
7 changes: 1 addition & 6 deletions flyteadmin/pkg/rpc/adminservice/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
package adminservice

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flyte/flyteadmin/pkg/rpc/adminservice/util"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)
Expand Down Expand Up @@ -115,8 +113,7 @@ type descriptionEntityEndpointMetrics struct {
}

type AdminMetrics struct {
Scope promutils.Scope
PanicCounter prometheus.Counter
Scope promutils.Scope

executionEndpointMetrics executionEndpointMetrics
launchPlanEndpointMetrics launchPlanEndpointMetrics
Expand All @@ -137,8 +134,6 @@ type AdminMetrics struct {
func InitMetrics(adminScope promutils.Scope) AdminMetrics {
return AdminMetrics{
Scope: adminScope,
PanicCounter: adminScope.MustNewCounter("handler_panic",
"panics encountered while handling requests to the admin service"),

executionEndpointMetrics: executionEndpointMetrics{
scope: adminScope,
Expand Down
61 changes: 61 additions & 0 deletions flyteadmin/pkg/rpc/adminservice/middleware/recovery_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package middleware

import (
"context"
"runtime/debug"

"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

// RecoveryInterceptor is a struct for creating gRPC interceptors that handle panics in go
type RecoveryInterceptor struct {
panicCounter prometheus.Counter
}

// NewRecoveryInterceptor creates a new RecoveryInterceptor with metrics under the provided scope
func NewRecoveryInterceptor(adminScope promutils.Scope) *RecoveryInterceptor {
panicCounter := adminScope.MustNewCounter("handler_panic", "panics encountered while handling gRPC requests")
return &RecoveryInterceptor{
panicCounter: panicCounter,
}
}

// UnaryServerInterceptor returns a new unary server interceptor for panic recovery.
func (ri *RecoveryInterceptor) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ any, err error) {

defer func() {
if r := recover(); r != nil {
ri.panicCounter.Inc()
logger.Errorf(ctx, "panic-ed for request: [%+v] to %s with err: %v with Stack: %v", req, info.FullMethod, r, string(debug.Stack()))
// Return INTERNAL to client with no info as to not leak implementation details
err = status.Errorf(codes.Internal, "")
}
}()

return handler(ctx, req)
}
}

// StreamServerInterceptor returns a new streaming server interceptor for panic recovery.
func (ri *RecoveryInterceptor) StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {

defer func() {
if r := recover(); r != nil {
ri.panicCounter.Inc()
logger.Errorf(stream.Context(), "panic-ed for stream to %s with err: %v with Stack: %v", info.FullMethod, r, string(debug.Stack()))
// Return INTERNAL to client with no info as to not leak implementation details
err = status.Errorf(codes.Internal, "")
}
}()

return handler(srv, stream)
}
}
Loading

0 comments on commit dd2957b

Please sign in to comment.