Petera/handle resourceexhaused ingestion #686

Merged (6 commits) on Jan 13, 2025
Changes from 5 commits
67 changes: 63 additions & 4 deletions bootstrap/bootstrap.go
@@ -8,7 +8,6 @@ import (
"time"

pebbleDB "github.com/cockroachdb/pebble"
"github.com/onflow/flow-evm-gateway/metrics"
"github.com/onflow/flow-go-sdk/access"
"github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow-go/fvm/environment"
@@ -21,9 +20,12 @@ import (
"github.com/rs/zerolog"
"github.com/sethvargo/go-limiter/memorystore"
grpcOpts "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-evm-gateway/api"
"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/metrics"
"github.com/onflow/flow-evm-gateway/models"
errs "github.com/onflow/flow-evm-gateway/models/errors"
"github.com/onflow/flow-evm-gateway/services/ingestion"
@@ -33,6 +35,19 @@ import (
"github.com/onflow/flow-evm-gateway/storage/pebble"
)

const (
// DefaultMaxMessageSize is the default maximum message size for gRPC responses
DefaultMaxMessageSize = 1024 * 1024 * 1024

// DefaultResourceExhaustedRetryDelay is the default delay between retries when the server returns
// a ResourceExhausted error.
DefaultResourceExhaustedRetryDelay = 100 * time.Millisecond

// DefaultResourceExhaustedMaxRetryDelay is the default max request duration when retrying server
// ResourceExhausted errors.
DefaultResourceExhaustedMaxRetryDelay = 30 * time.Second
)

type Storages struct {
Storage *pebble.Storage
Registers *pebble.RegisterStorage
@@ -452,7 +467,13 @@ func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*reques
// create access client with cross-spork capabilities
currentSporkClient, err := grpc.NewClient(
config.AccessNodeHost,
grpc.WithGRPCDialOptions(grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(1024*1024*1024))),
grpc.WithGRPCDialOptions(
grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)),
grpcOpts.WithUnaryInterceptor(retryInterceptor(
DefaultResourceExhaustedMaxRetryDelay,
DefaultResourceExhaustedRetryDelay,
)),
),
)
if err != nil {
return nil, fmt.Errorf(
@@ -487,6 +508,44 @@ func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*reques
return client, nil
}

// retryInterceptor is a gRPC client interceptor that retries the request when the server returns
// a ResourceExhausted error
func retryInterceptor(maxDuration, pauseDuration time.Duration) grpcOpts.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpcOpts.ClientConn,
invoker grpcOpts.UnaryInvoker,
opts ...grpcOpts.CallOption,
) error {
start := time.Now()
attempts := 0
for {
err := invoker(ctx, method, req, reply, cc, opts...)
if err == nil {
return nil
}

if status.Code(err) != codes.ResourceExhausted {
return err
}

attempts++
duration := time.Since(start)
if duration >= maxDuration {
return fmt.Errorf("request failed (attempts: %d, duration: %v): %w", attempts, duration, err)
}

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(pauseDuration):
}
}
}
}

// setupStorage creates storage and initializes it with configured starting cadence height
// in case such a height doesn't already exist in the database.
func setupStorage(
@@ -570,9 +629,9 @@ func setupStorage(
Stringer("fvm_address_for_evm_storage_account", storageAddress).
Msgf("database initialized with cadence height: %d", cadenceHeight)
}
//else {
// else {
// // TODO(JanezP): verify storage account owner is correct
//}
// }

return db, &Storages{
Storage: store,
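
As an aside, the interceptor added in this file is ordinary grpc-go unary middleware, so the same retry behavior can be attached to any unary client, not just the flow-go-sdk access client used above. The following is a minimal sketch under that assumption: the target address and insecure credentials are placeholders rather than anything from this PR, it presumes the code sits in the same package as retryInterceptor and the new constants, and it uses grpcOpts.NewClient (grpc-go v1.63+).

// Sketch only: wiring the PR's retry interceptor into a plain grpc-go client.
// The target and credentials below are placeholders, not part of this PR.
package bootstrap

import (
	"log"
	"time"

	grpcOpts "google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func exampleClient() {
	conn, err := grpcOpts.NewClient(
		"access.example.org:9000", // placeholder target
		grpcOpts.WithTransportCredentials(insecure.NewCredentials()),
		grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)),
		grpcOpts.WithUnaryInterceptor(retryInterceptor(
			DefaultResourceExhaustedMaxRetryDelay, // give up after ~30s of retries
			DefaultResourceExhaustedRetryDelay,    // wait 100ms between attempts
		)),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	// Every unary RPC on conn now transparently retries ResourceExhausted
	// responses until the retry budget is exhausted, then surfaces the error.
	_ = time.Second
}
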
88 changes: 88 additions & 0 deletions bootstrap/bootstrap_test.go
@@ -0,0 +1,88 @@
package bootstrap

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestRetryInterceptor(t *testing.T) {
expecterErr := status.Error(codes.ResourceExhausted, "resource exhausted")
interceptor := retryInterceptor(100*time.Millisecond, 10*time.Millisecond)

testCases := []struct {
name string
invoker func(callCount int) error
maxRequestTime time.Duration
callCount int // expect exact count
minCallCount int // min, for when using a timeout
expectedErr error
}{
{
name: "no error",
invoker: func(callCount int) error {
return nil
},
maxRequestTime: 10 * time.Millisecond,
callCount: 1,
expectedErr: nil,
},
{
name: "succeeds on 3rd attempt",
invoker: func(callCount int) error {
if callCount >= 3 {
return nil
}
return expecterErr
},
maxRequestTime: 40 * time.Millisecond,
callCount: 3,
expectedErr: nil,
},
{
name: "fails after timeout",
invoker: func(callCount int) error {
return expecterErr
},
maxRequestTime: 150 * time.Millisecond, // add a buffer for test slowness
minCallCount: 10,
expectedErr: expecterErr,
},
}

for _, tc := range testCases {
tc := tc
peterargue (Contributor, Author) commented on Jan 2, 2025:
Not including this was causing lint errors, which is strange, since the loop-variable scoping change has been part of the language since Go 1.22.

Collaborator replied:
This is due to the golangci-lint version that we use: https://github.com/onflow/flow-evm-gateway/blob/main/.github/workflows/ci.yml#L41. I tried with the latest ([email protected]), and it doesn't produce this lint error. I will open up a new PR to update it.

Collaborator replied:
Opened #718 .
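
For illustration only (not part of the PR), this standalone sketch shows the capture behavior that the tc := tc line guards against on toolchains older than Go 1.22; the names in it are made up for the example.

// Standalone illustration: the closure-capture behavior that made the
// `x := x` shadowing idiom necessary before Go 1.22.
package main

import "fmt"

func main() {
	cases := []string{"a", "b", "c"}
	var runs []func()
	for _, tc := range cases {
		// tc := tc // required before Go 1.22 so each closure keeps its own copy
		runs = append(runs, func() { fmt.Println(tc) })
	}
	for _, run := range runs {
		run() // Go 1.22+: prints a, b, c; older versions without the shadow: c, c, c
	}
}
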

t.Run(tc.name, func(t *testing.T) {
t.Parallel()

callCount := 0
invoker := func(context.Context, string, any, any, *grpc.ClientConn, ...grpc.CallOption) error {
callCount++
return tc.invoker(callCount)
}

start := time.Now()
err := interceptor(
context.Background(), "", nil, nil, nil,
invoker,
)
if tc.expectedErr != nil {
assert.ErrorIs(t, err, tc.expectedErr)
} else {
assert.NoError(t, err)
}

if tc.minCallCount > 0 {
assert.GreaterOrEqual(t, callCount, tc.minCallCount)
} else {
assert.Equal(t, callCount, tc.callCount)
}
assert.LessOrEqual(t, time.Since(start), tc.maxRequestTime)
})
}
}
Comment on lines +58 to +88
coderabbitai bot (Contributor) commented on Jan 2, 2025:

⚠️ Potential issue

Fix potential race condition in parallel tests.

Since tests are running in parallel (t.Parallel()), the callCount variable should be protected against concurrent access. While this might not cause issues in the current implementation since each test case has its own callCount, it's better to be explicit about thread safety.

 t.Run(tc.name, func(t *testing.T) {
     t.Parallel()
 
-    callCount := 0
+    var callCount atomic.Int32
     invoker := func(context.Context, string, any, any, *grpc.ClientConn, ...grpc.CallOption) error {
-        callCount++
+        currentCount := callCount.Add(1)
-        return tc.invoker(callCount)
+        return tc.invoker(int(currentCount))
     }

Don't forget to add "sync/atomic" to your imports.

Committable suggestion skipped: line range outside the PR's diff.

peterargue (Contributor, Author) replied:
Is this really needed? It looks like the variable is scoped to a single test run.

coderabbitai bot (Contributor) replied:
@peterargue, you’re correct that each subtest has its own callCount scoped to that test’s goroutine, so there’s little risk of a race condition here. Since callCount isn’t shared across multiple subtests or invoked concurrently outside the test function, using an atomic variable isn’t strictly necessary. Keeping it as an integer is sufficient if you’re confident there’s no other concurrent access to callCount.

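
As a closing illustration (again not from the PR), the case where atomic protection genuinely is needed is when a single counter is shared by several parallel subtests. A contrived sketch, with the test name invented for the example:

// Contrived sketch: a counter shared across parallel subtests must be atomic,
// unlike the per-subtest callCount discussed above.
package bootstrap

import (
	"sync/atomic"
	"testing"
)

func TestSharedCounter(t *testing.T) {
	var total atomic.Int32 // shared by all subtests below
	for _, name := range []string{"a", "b", "c"} {
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			total.Add(1) // concurrent increments, hence atomic
		})
	}
	// Cleanup on the parent runs only after all parallel subtests finish.
	t.Cleanup(func() {
		if got := total.Load(); got != 3 {
			t.Errorf("expected 3 increments, got %d", got)
		}
	})
}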