Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Implement gRPC streaming endpoint SubscribeAccountStatuses #5406

Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
853e433
Added new API method in interface
AndriiDiachuk Dec 26, 2023
eea461e
Added implementation for newly added method in API interface, generat…
AndriiDiachuk Jan 3, 2024
47654c9
Added test cases for new API method
AndriiDiachuk Jan 5, 2024
9e4721c
Merged and fixed errors
AndriiDiachuk Feb 13, 2024
18eeb78
Added message index to response of account statuses sub
AndriiDiachuk Feb 15, 2024
e7d6860
Passing address to response
AndriiDiachuk Feb 16, 2024
f737644
Added small description
AndriiDiachuk Feb 20, 2024
db9de36
Merged
AndriiDiachuk Feb 27, 2024
d0e19e9
Removed empty lines
AndriiDiachuk Feb 27, 2024
ededd30
added pointers
AndriiDiachuk Feb 27, 2024
d85591e
Added godoc and error description for account statuses backend function
AndriiDiachuk Feb 27, 2024
71e8b39
Created var instead of calling index value two times
AndriiDiachuk Feb 27, 2024
97d1f4f
Added missing godocs
AndriiDiachuk Feb 27, 2024
05c5936
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
franklywatson Feb 27, 2024
7fe7ed7
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
kc1116 Feb 28, 2024
363d543
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
franklywatson Feb 28, 2024
790a5fc
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
franklywatson Feb 28, 2024
21795af
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
franklywatson Feb 29, 2024
423de94
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
franklywatson Mar 4, 2024
24011ac
Added filtering for address
AndriiDiachuk Mar 9, 2024
4827412
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
franklywatson Mar 11, 2024
05a77b0
fixed tests and field filters init
AndriiDiachuk Mar 12, 2024
44ac85f
Merge branch 'AndriiDiachuk/access-grpc-streaming-endpoint-SubscribeA…
AndriiDiachuk Mar 12, 2024
82e7ee4
Merged
AndriiDiachuk Mar 15, 2024
ce18edd
Generated mocks
AndriiDiachuk Mar 15, 2024
0f195f4
fixed issue with replace proto
AndriiDiachuk Mar 15, 2024
b99c04f
Updated replace commit
AndriiDiachuk Mar 15, 2024
57986e3
Added logging and error
AndriiDiachuk Mar 15, 2024
2a4b72a
Fixed rest of the comments, linted
AndriiDiachuk Mar 15, 2024
6633027
Fixed issues from comments
AndriiDiachuk Mar 18, 2024
4b8d9c6
Merge branch 'master' of https://github.com/AndriiDiachuk/flow-go int…
AndriiDiachuk Mar 18, 2024
896ceda
Fixed test
AndriiDiachuk Mar 20, 2024
d2d4f2a
Fixed remarks from comment
AndriiDiachuk Mar 20, 2024
339247b
Added godoc
AndriiDiachuk Mar 20, 2024
d7a0dd5
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into Andrii…
AndriiDiachuk Mar 21, 2024
41cd112
Fixed remarks from comments
AndriiDiachuk Mar 22, 2024
98a5df6
Linted
AndriiDiachuk Mar 22, 2024
125db0e
Merged
AndriiDiachuk Mar 25, 2024
d373009
Added more test cases to test
AndriiDiachuk Mar 26, 2024
64d5162
Fixed test
AndriiDiachuk Mar 26, 2024
7aba525
Added max account addresses to config
AndriiDiachuk Mar 27, 2024
9c782ae
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into Andrii…
AndriiDiachuk Mar 27, 2024
a6043ae
Changed var creation, fixed godoc for struct
AndriiDiachuk Mar 27, 2024
63460b4
Removed empty line
AndriiDiachuk Mar 27, 2024
e7aaf68
Added godoc for EventsRetriever
AndriiDiachuk Mar 27, 2024
360b7e5
Refactored AccountStatuses suite
AndriiDiachuk Mar 27, 2024
296f22b
Created commons functions for both test suites
AndriiDiachuk Mar 27, 2024
32320a5
Linted
AndriiDiachuk Mar 27, 2024
d53dcf9
Added values check in test
AndriiDiachuk Mar 27, 2024
7d6bebd
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into Andrii…
AndriiDiachuk Mar 27, 2024
cd92820
Fixed remarks, done some refactoring
AndriiDiachuk Mar 29, 2024
2482500
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into Andrii…
AndriiDiachuk Mar 29, 2024
897b34b
Added test cases, fixed unexpected behaviour when empty account addre…
AndriiDiachuk Mar 29, 2024
03dd4cc
Updated flow project version
AndriiDiachuk Mar 29, 2024
d43b121
Refactored function
AndriiDiachuk Mar 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions engine/access/state_stream/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type GetStartHeightFunc func(flow.Identifier, uint64) (uint64, error)
type StateStreamBackend struct {
ExecutionDataBackend
EventsBackend
AccountStatusesBackend

log zerolog.Logger
state protocol.State
Expand Down Expand Up @@ -154,6 +155,16 @@ func New(
eventsIndex: eventsIndex,
}

b.AccountStatusesBackend = AccountStatusesBackend{
log: logger,
broadcaster: broadcaster,
sendTimeout: config.ClientSendTimeout,
responseLimit: config.ResponseLimit,
sendBufferSize: int(config.ClientSendBufferSize),
getExecutionData: b.getExecutionData,
getStartHeight: b.getStartHeight,
}

return b, nil
}

Expand Down
79 changes: 79 additions & 0 deletions engine/access/state_stream/backend/backend_account_statuses.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package backend

import (
"context"
"fmt"
"time"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/counters"
"github.com/onflow/flow-go/utils/logging"
)

type AccountStatusesResponse struct {
BlockID flow.Identifier
Events flow.EventsList
MessageIndex uint64
}

type AccountStatusesBackend struct {
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
log zerolog.Logger
broadcaster *engine.Broadcaster
sendTimeout time.Duration
responseLimit float64
sendBufferSize int

getExecutionData GetExecutionDataFunc
getStartHeight GetStartHeightFunc
}

func (b AccountStatusesBackend) SubscribeAccountStatuses(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.StatusFilter) state_stream.Subscription {
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
nextHeight, err := b.getStartHeight(startBlockID, startHeight)
if err != nil {
return NewFailedSubscription(err, "could not get start height")
}

messageIndex := counters.NewMonotonousCounter(0)

AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
sub := NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getAccountStatusResponseFactory(&messageIndex, filter))

AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
go NewStreamer(b.log, b.broadcaster, b.sendTimeout, b.responseLimit, sub).Stream(ctx)

return sub
}

// getAccountStatusResponseFactory returns a function function that returns the account statuses response for a given height.
func (b AccountStatusesBackend) getAccountStatusResponseFactory(messageIndex *counters.StrictMonotonousCounter, filter state_stream.StatusFilter) GetDataByHeightFunc {
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
return func(ctx context.Context, height uint64) (interface{}, error) {
executionData, err := b.getExecutionData(ctx, height)
if err != nil {
return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err)
}

events := []flow.Event{}
for _, chunkExecutionData := range executionData.ChunkExecutionDatas {
events = append(events, filter.Filter(chunkExecutionData.Events)...)
}

b.log.Trace().
Hex("block_id", logging.ID(executionData.BlockID)).
Uint64("height", height).
Msgf("sending %d events", len(events))
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved

response := &AccountStatusesResponse{
BlockID: executionData.BlockID,
Events: events,
MessageIndex: messageIndex.Value(),
}

if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
b.log.Debug().Msg("message index already incremented")
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
}

return response, nil
}
}
214 changes: 214 additions & 0 deletions engine/access/state_stream/backend/backend_account_statuses_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package backend

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/unittest"
)

var testCoreEventTypes = []flow.EventType{
"flow.AccountCreated",
"flow.AccountKeyAdded",
"flow.AccountKeyRemoved",
}

type BackendAccountStatusesSuite struct {
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
BackendExecutionDataSuite
}

func TestBackendAccountStatusesSuite(t *testing.T) {
suite.Run(t, new(BackendAccountStatusesSuite))
}

func (s *BackendAccountStatusesSuite) SetupTest() {
s.BackendExecutionDataSuite.SetupTest()
}

// TestSubscribeAccountStatuses tests the SubscribeAccountStatuses method happy path
func (s *BackendAccountStatusesSuite) TestSubscribeAccountStatuses() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var err error

type testType struct {
name string
highestBackfill int
startBlockID flow.Identifier
startHeight uint64
filters state_stream.StatusFilter
}

baseTests := []testType{
{
name: "happy path - all new blocks",
highestBackfill: -1, // no backfill
startBlockID: flow.ZeroID,
startHeight: 0,
},
{
name: "happy path - partial backfill",
highestBackfill: 2, // backfill the first 3 blocks
startBlockID: flow.ZeroID,
startHeight: s.blocks[0].Header.Height,
},
{
name: "happy path - complete backfill",
highestBackfill: len(s.blocks) - 1, // backfill all blocks
startBlockID: s.blocks[0].ID(),
startHeight: 0,
},
{
name: "happy path - start from root block by height",
highestBackfill: len(s.blocks) - 1, // backfill all blocks
startBlockID: flow.ZeroID,
startHeight: s.backend.rootBlockHeight, // start from root block
},
{
name: "happy path - start from root block by id",
highestBackfill: len(s.blocks) - 1, // backfill all blocks
startBlockID: s.backend.rootBlockID, // start from root block
startHeight: 0,
},
}

// create variations for each of the base test
tests := make([]testType, 0, len(baseTests)*3)
for _, test := range baseTests {
t1 := test
t1.name = fmt.Sprintf("%s - all events", test.name)
t1.filters = state_stream.StatusFilter{}
tests = append(tests, t1)

t2 := test
t2.name = fmt.Sprintf("%s - some events", test.name)
t2.filters, err = state_stream.NewStatusFilter([]string{string(testEventTypes[0])}, chainID.Chain())
require.NoError(s.T(), err)
tests = append(tests, t2)

t3 := test
t3.name = fmt.Sprintf("%s - no events", test.name)
t3.filters, err = state_stream.NewStatusFilter([]string{"A.0x1.NonExistent.Event"}, chainID.Chain())
require.NoError(s.T(), err)
tests = append(tests, t3)
}

for _, test := range tests {
s.Run(test.name, func() {
s.T().Logf("len(s.execDataMap) %d", len(s.execDataMap))

// add "backfill" block - blocks that are already in the database before the test starts
// this simulates a subscription on a past block
for i := 0; i <= test.highestBackfill; i++ {
s.T().Logf("backfilling block %d", i)
s.backend.setHighestHeight(s.blocks[i].Header.Height)
}

subCtx, subCancel := context.WithCancel(ctx)
sub := s.backend.SubscribeAccountStatuses(subCtx, test.startBlockID, test.startHeight, test.filters)

expectedMsgIndex := uint64(0)

// loop over all of the blocks
for i, b := range s.blocks {
s.T().Logf("checking block %d %v", i, b.ID())

// simulate new exec data received.
// exec data for all blocks with index <= highestBackfill were already received
if i > test.highestBackfill {
s.backend.setHighestHeight(b.Header.Height)
s.broadcaster.Publish()
}

expectedEvents := flow.EventsList{}
for _, event := range s.blockEvents[b.ID()] {
if test.filters.Match(event) {
expectedEvents = append(expectedEvents, event)
}
}

// consume execution data from subscription
unittest.RequireReturnsBefore(s.T(), func() {
v, ok := <-sub.Channel()
require.True(s.T(), ok, "channel closed while waiting for exec data for block %d %v: err: %v", b.Header.Height, b.ID(), sub.Err())

resp, ok := v.(*AccountStatusesResponse)
require.True(s.T(), ok, "unexpected response type: %T", v)

assert.Equal(s.T(), b.Header.ID(), resp.BlockID)
assert.Equal(s.T(), expectedEvents, resp.Events)
assert.Equal(s.T(), expectedMsgIndex, resp.MessageIndex)
}, time.Second, fmt.Sprintf("timed out waiting for exec data for block %d %v", b.Header.Height, b.ID()))

expectedMsgIndex++
}

// make sure there are no new messages waiting. the channel should be opened with nothing waiting
unittest.RequireNeverReturnBefore(s.T(), func() {
<-sub.Channel()
}, 100*time.Millisecond, "timed out waiting for subscription to shutdown")

// stop the subscription
subCancel()

// ensure subscription shuts down gracefully
unittest.RequireReturnsBefore(s.T(), func() {
v, ok := <-sub.Channel()
assert.Nil(s.T(), v)
assert.False(s.T(), ok)
assert.ErrorIs(s.T(), sub.Err(), context.Canceled)
}, 100*time.Millisecond, "timed out waiting for subscription to shutdown")
})
}
}

func (s *BackendExecutionDataSuite) TestSubscribeAccountStatusesHandlesErrors() {
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s.Run("returns error if both start blockID and start height are provided", func() {
subCtx, subCancel := context.WithCancel(ctx)
defer subCancel()

sub := s.backend.SubscribeAccountStatuses(subCtx, unittest.IdentifierFixture(), 1, state_stream.StatusFilter{})
assert.Equal(s.T(), codes.InvalidArgument, status.Code(sub.Err()))
})

s.Run("returns error for start height before root height", func() {
subCtx, subCancel := context.WithCancel(ctx)
defer subCancel()

sub := s.backend.SubscribeAccountStatuses(subCtx, flow.ZeroID, s.backend.rootBlockHeight-1, state_stream.StatusFilter{})
assert.Equal(s.T(), codes.InvalidArgument, status.Code(sub.Err()), "expected InvalidArgument, got %v: %v", status.Code(sub.Err()).String(), sub.Err())
})

s.Run("returns error for unindexed start blockID", func() {
subCtx, subCancel := context.WithCancel(ctx)
defer subCancel()

sub := s.backend.SubscribeAccountStatuses(subCtx, unittest.IdentifierFixture(), 0, state_stream.StatusFilter{})
assert.Equal(s.T(), codes.NotFound, status.Code(sub.Err()), "expected NotFound, got %v: %v", status.Code(sub.Err()).String(), sub.Err())
})

// make sure we're starting with a fresh cache
s.execDataHeroCache.Clear()

s.Run("returns error for unindexed start height", func() {
subCtx, subCancel := context.WithCancel(ctx)
defer subCancel()

sub := s.backend.SubscribeAccountStatuses(subCtx, flow.ZeroID, s.blocks[len(s.blocks)-1].Header.Height+10, state_stream.StatusFilter{})
assert.Equal(s.T(), codes.NotFound, status.Code(sub.Err()), "expected NotFound, got %v: %v", status.Code(sub.Err()).String(), sub.Err())
})
}
Loading
Loading