Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Implement gRPC streaming endpoint SubscribeAccountStatuses #5406

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
853e433
Added new API method in interface
AndriiDiachuk Dec 26, 2023
eea461e
Added implementation for newly added method in API interface, generat…
AndriiDiachuk Jan 3, 2024
47654c9
Added test cases for new API method
AndriiDiachuk Jan 5, 2024
9e4721c
Merged and fixed errors
AndriiDiachuk Feb 13, 2024
18eeb78
Added message index to response of account statuses sub
AndriiDiachuk Feb 15, 2024
e7d6860
Passing address to response
AndriiDiachuk Feb 16, 2024
f737644
Added small description
AndriiDiachuk Feb 20, 2024
db9de36
Merged
AndriiDiachuk Feb 27, 2024
d0e19e9
Removed empty lines
AndriiDiachuk Feb 27, 2024
ededd30
added pointers
AndriiDiachuk Feb 27, 2024
d85591e
Added godoc and error description for account statuses backend function
AndriiDiachuk Feb 27, 2024
71e8b39
Created var instead of calling index value two times
AndriiDiachuk Feb 27, 2024
97d1f4f
Added missing godocs
AndriiDiachuk Feb 27, 2024
05c5936
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
franklywatson Feb 27, 2024
7fe7ed7
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
kc1116 Feb 28, 2024
363d543
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
franklywatson Feb 28, 2024
790a5fc
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
franklywatson Feb 28, 2024
21795af
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
franklywatson Feb 29, 2024
423de94
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
franklywatson Mar 4, 2024
24011ac
Added filtering for address
AndriiDiachuk Mar 9, 2024
4827412
Merge branch 'master' into AndriiDiachuk/access-grpc-streaming-endpoi…
franklywatson Mar 11, 2024
05a77b0
fixed tests and field filters init
AndriiDiachuk Mar 12, 2024
44ac85f
Merge branch 'AndriiDiachuk/access-grpc-streaming-endpoint-SubscribeA…
AndriiDiachuk Mar 12, 2024
82e7ee4
Merged
AndriiDiachuk Mar 15, 2024
ce18edd
Generated mocks
AndriiDiachuk Mar 15, 2024
0f195f4
fixed issue with replace proto
AndriiDiachuk Mar 15, 2024
b99c04f
Updated replace commit
AndriiDiachuk Mar 15, 2024
57986e3
Added logging and error
AndriiDiachuk Mar 15, 2024
2a4b72a
Fixed rest of the comments, linted
AndriiDiachuk Mar 15, 2024
6633027
Fixed issues from comments
AndriiDiachuk Mar 18, 2024
4b8d9c6
Merge branch 'master' of https://github.com/AndriiDiachuk/flow-go int…
AndriiDiachuk Mar 18, 2024
896ceda
Fixed test
AndriiDiachuk Mar 20, 2024
d2d4f2a
Fixed remarks from comment
AndriiDiachuk Mar 20, 2024
339247b
Added godoc
AndriiDiachuk Mar 20, 2024
d7a0dd5
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into Andrii…
AndriiDiachuk Mar 21, 2024
41cd112
Fixed remarks from comments
AndriiDiachuk Mar 22, 2024
98a5df6
Linted
AndriiDiachuk Mar 22, 2024
125db0e
Merged
AndriiDiachuk Mar 25, 2024
d373009
Added more test cases to test
AndriiDiachuk Mar 26, 2024
64d5162
Fixed test
AndriiDiachuk Mar 26, 2024
7aba525
Added max account addresses to config
AndriiDiachuk Mar 27, 2024
9c782ae
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into Andrii…
AndriiDiachuk Mar 27, 2024
a6043ae
Changed var creation, fixed godoc for struct
AndriiDiachuk Mar 27, 2024
63460b4
Removed empty line
AndriiDiachuk Mar 27, 2024
e7aaf68
Added godoc for EventsRetriever
AndriiDiachuk Mar 27, 2024
360b7e5
Refactored AccountStatuses suite
AndriiDiachuk Mar 27, 2024
296f22b
Created commons functions for both test suites
AndriiDiachuk Mar 27, 2024
32320a5
Linted
AndriiDiachuk Mar 27, 2024
d53dcf9
Added values check in test
AndriiDiachuk Mar 27, 2024
7d6bebd
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into Andrii…
AndriiDiachuk Mar 27, 2024
cd92820
Fixed remarks, done some refactoring
AndriiDiachuk Mar 29, 2024
2482500
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into Andrii…
AndriiDiachuk Mar 29, 2024
897b34b
Added test cases, fixed unexpected behaviour when empty account addre…
AndriiDiachuk Mar 29, 2024
03dd4cc
Updated flow project version
AndriiDiachuk Mar 29, 2024
d43b121
Refactored function
AndriiDiachuk Mar 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.stateStreamConf.MaxAddresses = value
case "Contracts":
builder.stateStreamConf.MaxContracts = value
case "AccountAddresses":
builder.stateStreamConf.MaxAccountAddress = value
}
}
builder.stateStreamConf.RpcMetricsEnabled = builder.rpcMetricsEnabled
Expand Down Expand Up @@ -1223,17 +1225,17 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
if builder.stateStreamConf.ClientSendBufferSize == 0 {
return errors.New("state-stream-send-buffer-size must be greater than 0")
}
if len(builder.stateStreamFilterConf) > 3 {
return errors.New("state-stream-event-filter-limits must have at most 3 keys (EventTypes, Addresses, Contracts)")
if len(builder.stateStreamFilterConf) > 4 {
return errors.New("state-stream-event-filter-limits must have at most 3 keys (EventTypes, Addresses, Contracts, AccountAddresses)")
}
for key, value := range builder.stateStreamFilterConf {
switch key {
case "EventTypes", "Addresses", "Contracts":
case "EventTypes", "Addresses", "Contracts", "AccountAddresses":
if value <= 0 {
return fmt.Errorf("state-stream-event-filter-limits %s must be greater than 0", key)
}
default:
return errors.New("state-stream-event-filter-limits may only contain the keys EventTypes, Addresses, Contracts")
return errors.New("state-stream-event-filter-limits may only contain the keys EventTypes, Addresses, Contracts, AccountAddresses")
}
}
if builder.stateStreamConf.ResponseLimit < 0 {
Expand Down
10 changes: 6 additions & 4 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,17 +761,17 @@ func (builder *ObserverServiceBuilder) extraFlags() {
if builder.stateStreamConf.ClientSendBufferSize == 0 {
return errors.New("state-stream-send-buffer-size must be greater than 0")
}
if len(builder.stateStreamFilterConf) > 3 {
return errors.New("state-stream-event-filter-limits must have at most 3 keys (EventTypes, Addresses, Contracts)")
if len(builder.stateStreamFilterConf) > 4 {
return errors.New("state-stream-event-filter-limits must have at most 4 keys (EventTypes, Addresses, Contracts, AccountAddresses)")
}
for key, value := range builder.stateStreamFilterConf {
switch key {
case "EventTypes", "Addresses", "Contracts":
case "EventTypes", "Addresses", "Contracts", "AccountAddresses":
if value <= 0 {
return fmt.Errorf("state-stream-event-filter-limits %s must be greater than 0", key)
}
default:
return errors.New("state-stream-event-filter-limits may only contain the keys EventTypes, Addresses, Contracts")
return errors.New("state-stream-event-filter-limits may only contain the keys EventTypes, Addresses, Contracts, AccountAddresses")
}
}
if builder.stateStreamConf.ResponseLimit < 0 {
Expand Down Expand Up @@ -1373,6 +1373,8 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
builder.stateStreamConf.MaxAddresses = value
case "Contracts":
builder.stateStreamConf.MaxContracts = value
case "AccountAddresses":
builder.stateStreamConf.MaxAccountAddress = value
}
}
builder.stateStreamConf.RpcMetricsEnabled = builder.rpcMetricsEnabled
Expand Down
226 changes: 226 additions & 0 deletions engine/access/state_stream/account_status_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package state_stream

import (
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/model/flow"
)

// Core event types based on documentation https://cadence-lang.org/docs/language/core-events
const (
// CoreEventAccountCreated is emitted when a new account gets created
CoreEventAccountCreated = "flow.AccountCreated"

// CoreEventAccountKeyAdded is emitted when a key gets added to an account
CoreEventAccountKeyAdded = "flow.AccountKeyAdded"

// CoreEventAccountKeyRemoved is emitted when a key gets removed from an account
CoreEventAccountKeyRemoved = "flow.AccountKeyRemoved"

// CoreEventAccountContractAdded is emitted when a contract gets deployed to an account
CoreEventAccountContractAdded = "flow.AccountContractAdded"

// CoreEventAccountContractUpdated is emitted when a contract gets updated on an account
CoreEventAccountContractUpdated = "flow.AccountContractUpdated"

// CoreEventAccountContractRemoved is emitted when a contract gets removed from an account
CoreEventAccountContractRemoved = "flow.AccountContractRemoved"

// CoreEventInboxValuePublished is emitted when a Capability is published from an account
CoreEventInboxValuePublished = "flow.InboxValuePublished"

// CoreEventInboxValueUnpublished is emitted when a Capability is unpublished from an account
CoreEventInboxValueUnpublished = "flow.InboxValueUnpublished"

// CoreEventInboxValueClaimed is emitted when a Capability is claimed by an account
CoreEventInboxValueClaimed = "flow.InboxValueClaimed"
)

var defaultCoreEventsMap map[string]map[string]struct{}

func init() {
defaultCoreEventsMap = make(map[string]map[string]struct{}, len(DefaultCoreEvents))

addFilter := func(eventType, field string) {
if _, ok := defaultCoreEventsMap[eventType]; !ok {
defaultCoreEventsMap[eventType] = make(map[string]struct{})
}
defaultCoreEventsMap[eventType][field] = struct{}{}
}

for _, eventType := range DefaultCoreEvents {
switch eventType {
case CoreEventAccountCreated,
CoreEventAccountKeyAdded,
CoreEventAccountKeyRemoved,
CoreEventAccountContractAdded,
CoreEventAccountContractUpdated,
CoreEventAccountContractRemoved:
addFilter(eventType, "address")
Copy link
Contributor

@peterargue peterargue Mar 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is only run once, you can simplify this

switch eventType {
	...
	case CoreEventAccountContractRemoved:
		defaultCoreEventsMap[eventType] = map[string]struct{}{
			"address": {},
		}
}

case CoreEventInboxValuePublished,
CoreEventInboxValueClaimed:
addFilter(eventType, "provider")
addFilter(eventType, "recipient")
case CoreEventInboxValueUnpublished:
addFilter(eventType, "provider")
default:
panic(fmt.Errorf("unsupported event type: %s", eventType))
}
}
}

// DefaultCoreEvents is an array containing all default core event types.
var DefaultCoreEvents = []string{
CoreEventAccountCreated,
CoreEventAccountKeyAdded,
CoreEventAccountKeyRemoved,
CoreEventAccountContractAdded,
CoreEventAccountContractUpdated,
CoreEventAccountContractRemoved,
CoreEventInboxValuePublished,
CoreEventInboxValueUnpublished,
CoreEventInboxValueClaimed,
}

// AccountStatusFilter defines a specific filter for account statuses.
// It embeds the EventFilter type to inherit its functionality.
type AccountStatusFilter struct {
*EventFilter
}

// NewAccountStatusFilter creates a new AccountStatusFilter based on the provided configuration.
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
// Expected errors:
// - error: An error, if any, encountered during core event type validating, check for max account addresses
// or validating account addresses.
func NewAccountStatusFilter(
config EventFilterConfig,
chain flow.Chain,
eventTypes []string,
accountAddresses []string,
) (AccountStatusFilter, error) {
if len(accountAddresses) == 0 {
// If `accountAddresses` is empty, the validation on `addCoreEventFieldFilter` would not happen.
// Therefore, event types are validated with `validateCoreEventTypes` to fail at the beginning of filter creation.
err := validateCoreEventTypes(eventTypes)
if err != nil {
return AccountStatusFilter{}, err
}
} else if len(accountAddresses) > DefaultMaxAccountAddresses {
// If `accountAddresses` exceeds the `DefaultAccountAddressesLimit`, it returns an error.
return AccountStatusFilter{}, fmt.Errorf("account limit exceeds, the limit is %d", DefaultMaxAccountAddresses)
}

// If `eventTypes` is empty, the filter returns all core events for any accounts.
if len(eventTypes) == 0 {
eventTypes = DefaultCoreEvents
}

// It's important to only set eventTypes if there are no addresses passed.
var filterEventTypes []string
if len(accountAddresses) == 0 {
filterEventTypes = eventTypes
}

// Creates an `EventFilter` with the provided `eventTypes`.
filter, err := NewEventFilter(config, chain, filterEventTypes, []string{}, []string{})
if err != nil {
return AccountStatusFilter{}, err
}

accountStatusFilter := AccountStatusFilter{
EventFilter: &filter,
}

for _, address := range accountAddresses {
// Validate account address
addr := flow.HexToAddress(address)
if err := validateAddress(addr, chain); err != nil {
return AccountStatusFilter{}, err
}

// If there are non-core event types at this stage, it returns an error from `addCoreEventFieldFilter`.
for _, eventType := range eventTypes {
// use the hex with prefix address to make sure it will match the cadence address
err = accountStatusFilter.addCoreEventFieldFilter(flow.EventType(eventType), addr.HexWithPrefix())
if err != nil {
return AccountStatusFilter{}, err
}
}
}

// We need to set hasFilters here if filterEventTypes was empty
accountStatusFilter.hasFilters = len(accountStatusFilter.EventFieldFilters) > 0 || len(eventTypes) > 0

return accountStatusFilter, nil
}

// GroupCoreEventsByAccountAddress extracts account-related core events from the provided list of events.
// It filters events based on the account field specified by the event type and organizes them by account address.
// Parameters:
// - events: The list of events to extract account-related core events from.
// - log: The logger to log errors encountered during event decoding and processing.
// Returns:
// - A map[string]flow.EventsList: A map where the key is the account address and the value is a list of
// account-related core events associated with that address.
func (f *AccountStatusFilter) GroupCoreEventsByAccountAddress(events flow.EventsList, log zerolog.Logger) map[string]flow.EventsList {
allAccountProtocolEvents := make(map[string]flow.EventsList)

for _, event := range events {
fields, fieldValues, err := getEventFields(&event)
if err != nil {
log.Info().Err(err).Msg("could not get event fields")
continue
}

//accountField := f.EventFieldFilters[event.Type]
accountField := defaultCoreEventsMap[string(event.Type)]
for i, field := range fields {
_, ok := accountField[field.Identifier]
if ok {
address := fieldValues[i].String()
allAccountProtocolEvents[address] = append(allAccountProtocolEvents[address], event)
}
}
}

return allAccountProtocolEvents
}

// addCoreEventFieldFilter adds a field filter for each core event type
func (f *AccountStatusFilter) addCoreEventFieldFilter(eventType flow.EventType, address string) error {
// Get the field associated with the event type from the defaultCoreEventsMap
fields, ok := defaultCoreEventsMap[string(eventType)]
if !ok {
return fmt.Errorf("unsupported event type: %s", eventType)
}

// Add the field filter for each field associated with the event type
for field := range fields {
if _, ok := f.EventFieldFilters[eventType]; !ok {
f.EventFieldFilters[eventType] = make(FieldFilter)
}
if _, ok := f.EventFieldFilters[eventType][field]; !ok {
f.EventFieldFilters[eventType][field] = make(map[string]struct{})
}
f.EventFieldFilters[eventType][field][address] = struct{}{}
}

return nil
}

// validateCoreEventTypes validates the provided event types against the default core event types.
// It returns an error if any of the provided event types are not in the default core event types list. Note, an empty
// event types array is also valid.
func validateCoreEventTypes(eventTypes []string) error {
for _, eventType := range eventTypes {
_, ok := defaultCoreEventsMap[eventType]
// If the provided event type does not match any of the default core event types, return an error
if !ok {
return fmt.Errorf("invalid provided event types for filter")
}
}

return nil // All provided event types are valid core event types or event types are empty
}
Loading
Loading