diff --git a/app/node/builder.go b/app/node/builder.go index af606fc015..23f7f46ddd 100644 --- a/app/node/builder.go +++ b/app/node/builder.go @@ -15,6 +15,7 @@ import ( "github.com/libp2p/go-libp2p" "github.com/pkg/errors" + "github.com/filecoin-project/venus/app/submodule/actorevent" "github.com/filecoin-project/venus/app/submodule/blockstore" "github.com/filecoin-project/venus/app/submodule/chain" "github.com/filecoin-project/venus/app/submodule/common" @@ -171,6 +172,10 @@ func (b *Builder) build(ctx context.Context) (*Node, error) { return nil, err } + if nd.actorEvent, err = actorevent.NewActorEventSubModule(ctx, b.repo.Config(), nd.chain, nd.eth); err != nil { + return nil, err + } + apiBuilder := NewBuilder() apiBuilder.NameSpace("Filecoin") @@ -188,6 +193,7 @@ func (b *Builder) build(ctx context.Context) (*Node, error) { nd.market, nd.common, nd.eth, + nd.actorEvent, ) if err != nil { diff --git a/app/node/node.go b/app/node/node.go index 6134eef64a..8a55c3baac 100644 --- a/app/node/node.go +++ b/app/node/node.go @@ -11,6 +11,7 @@ import ( "github.com/awnumar/memguard" "github.com/etherlabsio/healthcheck/v2" "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/venus/app/submodule/actorevent" "github.com/filecoin-project/venus/app/submodule/blockstore" chain2 "github.com/filecoin-project/venus/app/submodule/chain" "github.com/filecoin-project/venus/app/submodule/common" @@ -100,7 +101,8 @@ type Node struct { common *common.CommonModule - eth *eth.EthSubModule + eth *eth.EthSubModule + actorEvent *actorevent.ActorEventSubModule // // Jsonrpc diff --git a/app/node/rpc.go b/app/node/rpc.go index 04a3a55fb2..549e48eecf 100644 --- a/app/node/rpc.go +++ b/app/node/rpc.go @@ -5,6 +5,7 @@ import ( "reflect" "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/venus/app/submodule/actorevent" "github.com/filecoin-project/venus/app/submodule/eth" v0api "github.com/filecoin-project/venus/venus-shared/api/chain/v0" v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" @@ -40,6 +41,7 @@ func (builder *RPCBuilder) AddServices(services ...RPCService) error { } var ethSubModuleTyp = reflect.TypeOf(ð.EthSubModule{}).Elem() +var actorEventSubModuleTyp = reflect.TypeOf(&actorevent.ActorEventSubModule{}).Elem() func skipV0API(in interface{}) bool { inT := reflect.TypeOf(in) @@ -47,7 +49,7 @@ func skipV0API(in interface{}) bool { inT = inT.Elem() } - return inT.AssignableTo(ethSubModuleTyp) + return inT.AssignableTo(ethSubModuleTyp) || inT.AssignableTo(actorEventSubModuleTyp) } func (builder *RPCBuilder) AddV0API(service RPCService) error { diff --git a/app/submodule/actorevent/actor_event.go b/app/submodule/actorevent/actor_event.go new file mode 100644 index 0000000000..26d0156b56 --- /dev/null +++ b/app/submodule/actorevent/actor_event.go @@ -0,0 +1,364 @@ +package actorevent + +import ( + "context" + "fmt" + "time" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "github.com/raulk/clock" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/venus/pkg/events/filter" + "github.com/filecoin-project/venus/venus-shared/api" + v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" + "github.com/filecoin-project/venus/venus-shared/types" +) + +var log = logging.Logger("actor_event") + +type ChainAccessor interface { + GetHead() *types.TipSet +} + +type EventFilterManager interface { + Install( + ctx context.Context, + minHeight, maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, + excludeReverted bool, + ) (filter.EventFilter, error) + Remove(ctx context.Context, id types.FilterID) error +} + +type ActorEventHandler struct { // nolint + chain ChainAccessor + eventFilterManager EventFilterManager + blockDelay time.Duration + maxFilterHeightRange abi.ChainEpoch + clock clock.Clock +} + +var _ v1api.IActorEvent = (*ActorEventHandler)(nil) + +func NewActorEventHandler( + chain ChainAccessor, + eventFilterManager EventFilterManager, + blockDelay time.Duration, + maxFilterHeightRange abi.ChainEpoch, +) *ActorEventHandler { + return &ActorEventHandler{ + chain: chain, + eventFilterManager: eventFilterManager, + blockDelay: blockDelay, + maxFilterHeightRange: maxFilterHeightRange, + clock: clock.New(), + } +} + +func NewActorEventHandlerWithClock( + chain ChainAccessor, + eventFilterManager EventFilterManager, + blockDelay time.Duration, + maxFilterHeightRange abi.ChainEpoch, + clock clock.Clock, +) *ActorEventHandler { + return &ActorEventHandler{ + chain: chain, + eventFilterManager: eventFilterManager, + blockDelay: blockDelay, + maxFilterHeightRange: maxFilterHeightRange, + clock: clock, + } +} + +func (a *ActorEventHandler) GetActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) ([]*types.ActorEvent, error) { + if a.eventFilterManager == nil { + return nil, api.ErrNotSupported + } + + if evtFilter == nil { + evtFilter = &types.ActorEventFilter{} + } + params, err := a.parseFilter(*evtFilter) + if err != nil { + return nil, err + } + + // Install a filter just for this call, collect events, remove the filter + tipSetCid, err := params.GetTipSetCid() + if err != nil { + return nil, fmt.Errorf("failed to get tipset cid: %w", err) + } + f, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false) + if err != nil { + return nil, err + } + defer func() { + // Remove the temporary filter regardless of the original context. + if err := a.eventFilterManager.Remove(context.Background(), f.ID()); err != nil { + log.Warnf("failed to remove filter: %s", err) + } + }() + return getCollected(ctx, f), nil +} + +type filterParams struct { + MinHeight abi.ChainEpoch + MaxHeight abi.ChainEpoch + TipSetKey types.TipSetKey +} + +func (fp filterParams) GetTipSetCid() (cid.Cid, error) { + if fp.TipSetKey.IsEmpty() { + return cid.Undef, nil + } + return fp.TipSetKey.Cid() +} + +func (a *ActorEventHandler) parseFilter(f types.ActorEventFilter) (*filterParams, error) { + if f.TipSetKey != nil && !f.TipSetKey.IsEmpty() { + if f.FromHeight != nil || f.ToHeight != nil { + return nil, fmt.Errorf("cannot specify both TipSetKey and FromHeight/ToHeight") + } + + return &filterParams{ + MinHeight: 0, + MaxHeight: 0, + TipSetKey: *f.TipSetKey, + }, nil + } + + min, max, err := parseHeightRange(a.chain.GetHead().Height(), f.FromHeight, f.ToHeight, a.maxFilterHeightRange) + if err != nil { + return nil, err + } + + return &filterParams{ + MinHeight: min, + MaxHeight: max, + TipSetKey: types.EmptyTSK, + }, nil +} + +// parseHeightRange is similar to eth's parseBlockRange but with slightly different semantics but +// results in equivalent values that we can plug in to the EventFilterManager. +// +// * Uses "height", allowing for nillable values rather than strings +// * No "latest" and "earliest", those are now represented by nil on the way in and -1 on the way out +// * No option for hex representation +func parseHeightRange(heaviest abi.ChainEpoch, fromHeight, toHeight *abi.ChainEpoch, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) { + if fromHeight != nil && *fromHeight < 0 { + return 0, 0, fmt.Errorf("range 'from' must be greater than or equal to 0") + } + if fromHeight == nil { + minHeight = -1 + } else { + minHeight = *fromHeight + } + if toHeight == nil { + maxHeight = -1 + } else { + maxHeight = *toHeight + } + + // Validate height ranges are within limits set by node operator + if minHeight == -1 && maxHeight > 0 { + // Here the client is looking for events between the head and some future height + if maxHeight-heaviest > maxRange { + return 0, 0, fmt.Errorf("invalid epoch range: 'to' height is too far in the future (maximum: %d)", maxRange) + } + } else if minHeight >= 0 && maxHeight == -1 { + // Here the client is looking for events between some time in the past and the current head + if heaviest-minHeight > maxRange { + return 0, 0, fmt.Errorf("invalid epoch range: 'from' height is too far in the past (maximum: %d)", maxRange) + } + } else if minHeight >= 0 && maxHeight >= 0 { + if minHeight > maxHeight { + return 0, 0, fmt.Errorf("invalid epoch range: 'to' height (%d) must be after 'from' height (%d)", minHeight, maxHeight) + } else if maxHeight-minHeight > maxRange { + return 0, 0, fmt.Errorf("invalid epoch range: range between to and 'from' heights is too large (maximum: %d)", maxRange) + } + } + return minHeight, maxHeight, nil +} + +func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) { + if a.eventFilterManager == nil { + return nil, api.ErrNotSupported + } + + if evtFilter == nil { + evtFilter = &types.ActorEventFilter{} + } + params, err := a.parseFilter(*evtFilter) + if err != nil { + return nil, err + } + + tipSetCid, err := params.GetTipSetCid() + if err != nil { + return nil, fmt.Errorf("failed to get tipset cid: %w", err) + } + fm, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false) + if err != nil { + return nil, err + } + + // The goal for the code below is to send events on the `out` channel as fast as possible and not + // let it get too far behind the rate at which the events are generated. + // For historical events, we aim to send all events within a single block's time (30s on mainnet). + // This ensures that the client can catch up quickly enough to start receiving new events. + // For ongoing events, we also aim to send all events within a single block's time, so we never + // want to be buffering events (approximately) more than one epoch behind the current head. + // It's approximate because we only update our notion of "current epoch" once per ~blocktime. + + out := make(chan *types.ActorEvent) + + // When we start sending real-time events, we want to make sure that we don't fall behind more + // than one epoch's worth of events (approximately). Capture this value now, before we send + // historical events to allow for a little bit of slack in the historical event sending. + minBacklogHeight := a.chain.GetHead().Height() - 1 + + go func() { + defer func() { + // tell the caller we're done + close(out) + fm.ClearSubChannel() + if err := a.eventFilterManager.Remove(ctx, fm.ID()); err != nil { + log.Warnf("failed to remove filter: %s", err) + } + }() + + // Handle any historical events that our filter may have picked up ----------------------------- + + evs := getCollected(ctx, fm) + if len(evs) > 0 { + // ensure we get all events out on the channel within one block's time (30s on mainnet) + timer := a.clock.Timer(a.blockDelay) + for _, ev := range evs { + select { + case out <- ev: + case <-timer.C: + log.Errorf("closing event subscription due to slow event sending rate") + timer.Stop() + return + case <-ctx.Done(): + timer.Stop() + return + } + } + timer.Stop() + } + + // for the case where we have a MaxHeight set, we don't get a signal from the filter when we + // reach that height, so we need to check it ourselves, do it now but also in the loop + if params.MaxHeight > 0 && minBacklogHeight+1 >= params.MaxHeight { + return + } + + // Handle ongoing events from the filter ------------------------------------------------------- + + in := make(chan interface{}, 256) + fm.SetSubChannel(in) + + var buffer []*types.ActorEvent + nextBacklogHeightUpdate := a.clock.Now().Add(a.blockDelay) + + collectEvent := func(ev interface{}) bool { + ce, ok := ev.(*filter.CollectedEvent) + if !ok { + log.Errorf("got unexpected value from event filter: %T", ev) + return false + } + + if ce.Height < minBacklogHeight { + // since we mostly care about buffer size, we only trigger a too-slow close when the buffer + // increases, i.e. we collect a new event + log.Errorf("closing event subscription due to slow event sending rate") + return false + } + + buffer = append(buffer, &types.ActorEvent{ + Entries: ce.Entries, + Emitter: ce.EmitterAddr, + Reverted: ce.Reverted, + Height: ce.Height, + TipSetKey: ce.TipSetKey, + MsgCid: ce.MsgCid, + }) + return true + } + + ticker := a.clock.Ticker(a.blockDelay) + defer ticker.Stop() + + for ctx.Err() == nil { + if len(buffer) > 0 { + select { + case ev, ok := <-in: // incoming event + if !ok || !collectEvent(ev) { + return + } + case out <- buffer[0]: // successful send + buffer[0] = nil + buffer = buffer[1:] + case <-ticker.C: + // check that our backlog isn't too big by looking at the oldest event + if buffer[0].Height < minBacklogHeight { + log.Errorf("closing event subscription due to slow event sending rate") + return + } + case <-ctx.Done(): + return + } + } else { + select { + case ev, ok := <-in: // incoming event + if !ok || !collectEvent(ev) { + return + } + case <-ctx.Done(): + return + case <-ticker.C: + currentHeight := a.chain.GetHead().Height() + if params.MaxHeight > 0 && currentHeight > params.MaxHeight { + // we've reached the filter's MaxHeight, we're done so we can close the channel + return + } + } + } + + if a.clock.Now().After(nextBacklogHeightUpdate) { + minBacklogHeight = a.chain.GetHead().Height() - 1 + nextBacklogHeightUpdate = a.clock.Now().Add(a.blockDelay) + } + } + }() + + return out, nil +} + +func getCollected(ctx context.Context, f filter.EventFilter) []*types.ActorEvent { + ces := f.TakeCollectedEvents(ctx) + + var out []*types.ActorEvent + + for _, e := range ces { + out = append(out, &types.ActorEvent{ + Entries: e.Entries, + Emitter: e.EmitterAddr, + Reverted: e.Reverted, + Height: e.Height, + TipSetKey: e.TipSetKey, + MsgCid: e.MsgCid, + }) + } + + return out +} diff --git a/app/submodule/actorevent/actor_event_module.go b/app/submodule/actorevent/actor_event_module.go new file mode 100644 index 0000000000..a1bfa700d4 --- /dev/null +++ b/app/submodule/actorevent/actor_event_module.go @@ -0,0 +1,59 @@ +package actorevent + +import ( + "context" + "time" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/venus/app/submodule/chain" + "github.com/filecoin-project/venus/app/submodule/eth" + "github.com/filecoin-project/venus/pkg/config" + v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" +) + +type ActorEventSubModule struct { // nolint + chainModule *chain.ChainSubmodule + ethModule *eth.EthSubModule + cfg *config.Config + actorEventHandler v1api.IActorEvent +} + +func NewActorEventSubModule(ctx context.Context, + cfg *config.Config, + chainModule *chain.ChainSubmodule, + ethModule *eth.EthSubModule, +) (*ActorEventSubModule, error) { + aem := &ActorEventSubModule{ + cfg: cfg, + ethModule: ethModule, + chainModule: chainModule, + actorEventHandler: &ActorEventDummy{}, + } + + if !cfg.EventsConfig.EnableActorEventsAPI { + return aem, nil + } + + fm := ethModule.GetEventFilterManager() + if cfg.FevmConfig.Event.DisableRealTimeFilterAPI { + fm = nil + } + + netParams, err := chainModule.API().StateGetNetworkParams(ctx) + if err != nil { + return nil, err + } + + aem.actorEventHandler = NewActorEventHandler( + chainModule.ChainReader, + fm, + time.Duration(netParams.BlockDelaySecs)*time.Second, + abi.ChainEpoch(cfg.FevmConfig.Event.MaxFilterHeightRange), + ) + + return aem, nil +} + +func (aem *ActorEventSubModule) API() v1api.IActorEvent { + return aem.actorEventHandler +} diff --git a/app/submodule/actorevent/actor_event_test.go b/app/submodule/actorevent/actor_event_test.go new file mode 100644 index 0000000000..063b68b76d --- /dev/null +++ b/app/submodule/actorevent/actor_event_test.go @@ -0,0 +1,780 @@ +package actorevent + +import ( + "context" + "fmt" + pseudo "math/rand" + "sync" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" + "github.com/raulk/clock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" + + "github.com/filecoin-project/venus/pkg/events/filter" + "github.com/filecoin-project/venus/venus-shared/types" +) + +var testCid = cid.MustParse("bafyreicmaj5hhoy5mgqvamfhgexxyergw7hdeshizghodwkjg6qmpoco7i") + +func TestParseHeightRange(t *testing.T) { + testCases := []struct { + name string + heaviest abi.ChainEpoch + from *abi.ChainEpoch + to *abi.ChainEpoch + maxRange abi.ChainEpoch + minOut abi.ChainEpoch + maxOut abi.ChainEpoch + errStr string + }{ + { + name: "fails when both are specified and range is greater than max allowed range", + heaviest: 100, + from: epochPtr(256), + to: epochPtr(512), + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "too large", + }, + { + name: "fails when min is specified and range is greater than max allowed range", + heaviest: 500, + from: epochPtr(16), + to: nil, + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "'from' height is too far in the past", + }, + { + name: "fails when max is specified and range is greater than max allowed range", + heaviest: 500, + from: nil, + to: epochPtr(65536), + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "'to' height is too far in the future", + }, + { + name: "fails when from is greater than to", + heaviest: 100, + from: epochPtr(512), + to: epochPtr(256), + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "must be after", + }, + { + name: "works when range is valid (nil from)", + heaviest: 500, + from: nil, + to: epochPtr(48), + maxRange: 1000, + minOut: -1, + maxOut: 48, + }, + { + name: "works when range is valid (nil to)", + heaviest: 500, + from: epochPtr(0), + to: nil, + maxRange: 1000, + minOut: 0, + maxOut: -1, + }, + { + name: "works when range is valid (nil from and to)", + heaviest: 500, + from: nil, + to: nil, + maxRange: 1000, + minOut: -1, + maxOut: -1, + }, + { + name: "works when range is valid and specified", + heaviest: 500, + from: epochPtr(16), + to: epochPtr(48), + maxRange: 1000, + minOut: 16, + maxOut: 48, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + req := require.New(t) + min, max, err := parseHeightRange(tc.heaviest, tc.from, tc.to, tc.maxRange) + req.Equal(tc.minOut, min) + req.Equal(tc.maxOut, max) + if tc.errStr != "" { + t.Log(err) + req.Error(err) + req.Contains(err.Error(), tc.errStr) + } else { + req.NoError(err) + } + }) + } +} + +func TestGetActorEvents(t *testing.T) { + ctx := context.Background() + req := require.New(t) + + const ( + seed = 984651320 + maxFilterHeightRange = 100 + ) + + t.Logf("seed: %d", seed) + rng := pseudo.New(pseudo.NewSource(seed)) + + minerAddr, err := address.NewIDAddress(uint64(rng.Int63())) + req.NoError(err) + + testCases := []struct { + name string + filter *types.ActorEventFilter + currentHeight int64 + installMinHeight int64 + installMaxHeight int64 + installTipSetKey cid.Cid + installAddresses []address.Address + installKeysWithCodec map[string][]types.ActorEventBlock + installExcludeReverted bool + expectErr string + }{ + { + name: "nil filter", + filter: nil, + installMinHeight: -1, + installMaxHeight: -1, + }, + { + name: "empty filter", + filter: &types.ActorEventFilter{}, + installMinHeight: -1, + installMaxHeight: -1, + }, + { + name: "basic height range filter", + filter: &types.ActorEventFilter{ + FromHeight: epochPtr(0), + ToHeight: epochPtr(maxFilterHeightRange), + }, + installMinHeight: 0, + installMaxHeight: maxFilterHeightRange, + }, + { + name: "from, no to height", + filter: &types.ActorEventFilter{ + FromHeight: epochPtr(0), + }, + currentHeight: maxFilterHeightRange - 1, + installMinHeight: 0, + installMaxHeight: -1, + }, + { + name: "to, no from height", + filter: &types.ActorEventFilter{ + ToHeight: epochPtr(maxFilterHeightRange - 1), + }, + installMinHeight: -1, + installMaxHeight: maxFilterHeightRange - 1, + }, + { + name: "from, no to height, too far", + filter: &types.ActorEventFilter{ + FromHeight: epochPtr(0), + }, + currentHeight: maxFilterHeightRange + 1, + expectErr: "invalid epoch range: 'from' height is too far in the past", + }, + { + name: "to, no from height, too far", + filter: &types.ActorEventFilter{ + ToHeight: epochPtr(maxFilterHeightRange + 1), + }, + currentHeight: 0, + expectErr: "invalid epoch range: 'to' height is too far in the future", + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + efm := newMockEventFilterManager(t) + collectedEvents := makeCollectedEvents(t, rng, 0, 1, 10) + filter := newMockFilter(ctx, t, rng, collectedEvents) + + if tc.expectErr == "" { + efm.expectInstall(abi.ChainEpoch(tc.installMinHeight), abi.ChainEpoch(tc.installMaxHeight), tc.installTipSetKey, tc.installAddresses, tc.installKeysWithCodec, tc.installExcludeReverted, filter) + } + + ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, tc.currentHeight)}) + req.NoError(err) + chain := newMockChainAccessor(t, ts) + + handler := NewActorEventHandler(chain, efm, 50*time.Millisecond, maxFilterHeightRange) + + gotEvents, err := handler.GetActorEvents(ctx, tc.filter) + if tc.expectErr != "" { + req.Error(err) + req.Contains(err.Error(), tc.expectErr) + } else { + req.NoError(err) + expectedEvents := collectedToActorEvents(collectedEvents) + req.Equal(expectedEvents, gotEvents) + efm.requireRemoved(filter.ID()) + } + }) + } +} + +func TestSubscribeActorEvents(t *testing.T) { + const ( + seed = 984651320 + maxFilterHeightRange = 100 + blockDelay = 30 * time.Second + filterStartHeight = 0 + currentHeight = 10 + finishHeight = 20 + eventsPerEpoch = 2 + ) + t.Logf("seed: %d", seed) + rng := pseudo.New(pseudo.NewSource(seed)) + mockClock := clock.NewMock() + + minerAddr, err := address.NewIDAddress(uint64(rng.Int63())) + require.NoError(t, err) + + for _, tc := range []struct { + name string + receiveSpeed time.Duration // how fast will we receive all events _per epoch_ + expectComplete bool // do we expect this to succeed? + endEpoch int // -1 for no end + }{ + {"fast", 0, true, -1}, + {"fast with end", 0, true, finishHeight}, + {"half block speed", blockDelay / 2, true, -1}, + {"half block speed with end", blockDelay / 2, true, finishHeight}, + // testing exactly blockDelay is a border case and will be flaky + {"1.5 block speed", blockDelay * 3 / 2, false, -1}, + {"twice block speed", blockDelay * 2, false, -1}, + } { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + tc := tc + t.Run(tc.name, func(t *testing.T) { + req := require.New(t) + + mockClock.Set(time.Now()) + mockFilterManager := newMockEventFilterManager(t) + allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, finishHeight) + historicalEvents := allEvents[0 : (currentHeight-filterStartHeight)*eventsPerEpoch] + mockFilter := newMockFilter(ctx, t, rng, historicalEvents) + mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(tc.endEpoch), cid.Undef, nil, nil, false, mockFilter) + + ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight)}) + req.NoError(err) + mockChain := newMockChainAccessor(t, ts) + + handler := NewActorEventHandlerWithClock(mockChain, mockFilterManager, blockDelay, maxFilterHeightRange, mockClock) + + aef := &types.ActorEventFilter{FromHeight: epochPtr(0)} + if tc.endEpoch >= 0 { + aef.ToHeight = epochPtr(tc.endEpoch) + } + eventChan, err := handler.SubscribeActorEvents(ctx, aef) + req.NoError(err) + + // assume we can cleanly pick up all historical events in one go + var gotEvents []*types.ActorEvent + for len(gotEvents) < len(historicalEvents) && ctx.Err() == nil { + select { + case e, ok := <-eventChan: + req.True(ok) + gotEvents = append(gotEvents, e) + case <-ctx.Done(): + t.Fatalf("timed out waiting for event") + } + } + req.Equal(collectedToActorEvents(historicalEvents), gotEvents) + + mockClock.Add(blockDelay) + nextReceiveTime := mockClock.Now() + + // Ticker to simulate both time and the chain advancing, including emitting events at + // the right time directly to the filter. + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for thisHeight := int64(currentHeight); ctx.Err() == nil; thisHeight++ { + ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, thisHeight)}) + req.NoError(err) + mockChain.setHeaviestTipSet(ts) + + var eventsThisEpoch []*filter.CollectedEvent + if thisHeight <= finishHeight { + eventsThisEpoch = allEvents[(thisHeight-filterStartHeight)*eventsPerEpoch : (thisHeight-filterStartHeight+2)*eventsPerEpoch] + } + for i := 0; i < eventsPerEpoch && ctx.Err() == nil; i++ { + if len(eventsThisEpoch) > 0 { + mockFilter.sendEventToChannel(eventsThisEpoch[0]) + eventsThisEpoch = eventsThisEpoch[1:] + } + select { + case <-time.After(2 * time.Millisecond): // allow everyone to catch a breath + mockClock.Add(blockDelay / eventsPerEpoch) + case <-ctx.Done(): + return + } + } + + if thisHeight == finishHeight+1 && tc.expectComplete && tc.endEpoch < 0 && ctx.Err() == nil { + // at finish+1, for the case where we expect clean completion and there is no ToEpoch + // set on the filter, if we send one more event at the next height so we end up with + // something uncollected in the buffer, causing a disconnect + evt := makeCollectedEvents(t, rng, finishHeight+1, 1, finishHeight+1)[0] + mockFilter.sendEventToChannel(evt) + } // else if endEpoch is set, we expect the chain advance to force closure + } + }() + + // Client collecting events off the channel + + var prematureEnd bool + for thisHeight := int64(currentHeight); thisHeight <= finishHeight && !prematureEnd && ctx.Err() == nil; thisHeight++ { + // delay to simulate latency + select { + case <-mockClock.After(nextReceiveTime.Sub(mockClock.Now())): + case <-ctx.Done(): + t.Fatalf("timed out simulating receive delay") + } + + // collect eventsPerEpoch more events + var newEvents []*types.ActorEvent + for len(newEvents) < eventsPerEpoch && !prematureEnd && ctx.Err() == nil { + select { + case e, ok := <-eventChan: // receive the events from the subscription + if ok { + newEvents = append(newEvents, e) + } else { + prematureEnd = true + } + case <-ctx.Done(): + t.Fatalf("timed out waiting for event") + } + nextReceiveTime = nextReceiveTime.Add(tc.receiveSpeed) + } + + if tc.expectComplete || !prematureEnd { + // sanity check that we got what we expected this epoch + req.Len(newEvents, eventsPerEpoch) + epochEvents := allEvents[(thisHeight)*eventsPerEpoch : (thisHeight+1)*eventsPerEpoch] + req.Equal(collectedToActorEvents(epochEvents), newEvents) + gotEvents = append(gotEvents, newEvents...) + } + } + + req.Equal(tc.expectComplete, !prematureEnd, "expected to complete") + if tc.expectComplete { + req.Len(gotEvents, len(allEvents)) + req.Equal(collectedToActorEvents(allEvents), gotEvents) + } else { + req.NotEqual(len(gotEvents), len(allEvents)) + } + + // cleanup + mockFilter.requireClearSubChannelCalledEventually(500 * time.Millisecond) + mockFilterManager.requireRemovedEventually(mockFilter.ID(), 500*time.Millisecond) + cancel() + wg.Wait() // wait for the chain to stop advancing + }) + } +} + +func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) { + // Similar to TestSubscribeActorEvents but we set an explicit end that caps out at the current height + const ( + seed = 984651320 + maxFilterHeightRange = 100 + blockDelay = 30 * time.Second + filterStartHeight = 0 + currentHeight = 10 + eventsPerEpoch = 2 + ) + t.Logf("seed: %d", seed) + rng := pseudo.New(pseudo.NewSource(seed)) + mockClock := clock.NewMock() + + minerAddr, err := address.NewIDAddress(uint64(rng.Int63())) + require.NoError(t, err) + + for _, tc := range []struct { + name string + blockTimeToComplete float64 // fraction of a block time that it takes to receive all events + expectComplete bool // do we expect this to succeed? + }{ + {"fast", 0, true}, + {"half block speed", 0.5, true}, + {"1.5 block speed", 1.5, false}, + {"twice block speed", 2, false}, + } { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + tc := tc + t.Run(tc.name, func(t *testing.T) { + req := require.New(t) + + mockClock.Set(time.Now()) + mockFilterManager := newMockEventFilterManager(t) + allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, currentHeight) + mockFilter := newMockFilter(ctx, t, rng, allEvents) + mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(currentHeight), cid.Undef, nil, nil, false, mockFilter) + + ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight)}) + req.NoError(err) + mockChain := newMockChainAccessor(t, ts) + + handler := NewActorEventHandlerWithClock(mockChain, mockFilterManager, blockDelay, maxFilterHeightRange, mockClock) + + aef := &types.ActorEventFilter{FromHeight: epochPtr(0), ToHeight: epochPtr(currentHeight)} + eventChan, err := handler.SubscribeActorEvents(ctx, aef) + req.NoError(err) + + var gotEvents []*types.ActorEvent + + // assume we can cleanly pick up all historical events in one go + receiveLoop: + for ctx.Err() == nil { + select { + case e, ok := <-eventChan: + if ok { + gotEvents = append(gotEvents, e) + mockClock.Add(time.Duration(float64(blockDelay) * tc.blockTimeToComplete / float64(len(allEvents)))) + // no need to advance the chain, we're also testing that's not necessary + time.Sleep(2 * time.Millisecond) // catch a breath + } else { + break receiveLoop + } + case <-ctx.Done(): + t.Fatalf("timed out waiting for event, got %d/%d events", len(gotEvents), len(allEvents)) + } + } + if tc.expectComplete { + req.Equal(collectedToActorEvents(allEvents), gotEvents) + } else { + req.NotEqual(len(gotEvents), len(allEvents)) + } + // advance the chain and observe cleanup + ts, err = types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight+1)}) + req.NoError(err) + mockChain.setHeaviestTipSet(ts) + mockClock.Add(blockDelay) + mockFilterManager.requireRemovedEventually(mockFilter.ID(), 1*time.Second) + }) + } +} + +var ( + _ ChainAccessor = (*mockChainAccessor)(nil) + _ filter.EventFilter = (*mockFilter)(nil) + _ EventFilterManager = (*mockEventFilterManager)(nil) +) + +type mockChainAccessor struct { + t *testing.T + ts *types.TipSet + lk sync.Mutex +} + +func newMockChainAccessor(t *testing.T, ts *types.TipSet) *mockChainAccessor { + return &mockChainAccessor{t: t, ts: ts} +} + +func (m *mockChainAccessor) setHeaviestTipSet(ts *types.TipSet) { + m.lk.Lock() + defer m.lk.Unlock() + m.ts = ts +} + +func (m *mockChainAccessor) GetHead() *types.TipSet { + m.lk.Lock() + defer m.lk.Unlock() + return m.ts +} + +type mockFilter struct { + t *testing.T + ctx context.Context + id types.FilterID + lastTaken time.Time + ch chan<- interface{} + historicalEvents []*filter.CollectedEvent + subChannelCalls int + clearSubChannelCalls int + lk sync.Mutex +} + +func newMockFilter(ctx context.Context, t *testing.T, rng *pseudo.Rand, historicalEvents []*filter.CollectedEvent) *mockFilter { + t.Helper() + var id [32]byte + _, err := rng.Read(id[:]) + require.NoError(t, err) + return &mockFilter{ + t: t, + ctx: ctx, + id: id, + historicalEvents: historicalEvents, + } +} + +func (m *mockFilter) sendEventToChannel(e *filter.CollectedEvent) { + m.lk.Lock() + defer m.lk.Unlock() + if m.ch != nil { + select { + case m.ch <- e: + case <-m.ctx.Done(): + } + } +} + +func (m *mockFilter) requireClearSubChannelCalledEventually(timeout time.Duration) { + m.t.Helper() + require.Eventually(m.t, + func() bool { + m.lk.Lock() + c := m.clearSubChannelCalls + m.lk.Unlock() + switch c { + case 0: + return false + case 1: + return true + default: + m.t.Fatalf("ClearSubChannel called more than once: %d", c) + return false + } + }, timeout, 10*time.Millisecond, "ClearSubChannel is not called exactly once") +} + +func (m *mockFilter) ID() types.FilterID { + return m.id +} + +func (m *mockFilter) LastTaken() time.Time { + return m.lastTaken +} + +func (m *mockFilter) SetSubChannel(ch chan<- interface{}) { + m.t.Helper() + m.lk.Lock() + defer m.lk.Unlock() + m.subChannelCalls++ + m.ch = ch +} + +func (m *mockFilter) ClearSubChannel() { + m.t.Helper() + m.lk.Lock() + defer m.lk.Unlock() + m.clearSubChannelCalls++ + m.ch = nil +} + +func (m *mockFilter) TakeCollectedEvents(context.Context) []*filter.CollectedEvent { + e := m.historicalEvents + m.historicalEvents = nil + m.lastTaken = time.Now() + return e +} + +func (m *mockFilter) CollectEvents(context.Context, *filter.TipSetEvents, bool, filter.AddressResolver) error { + m.t.Fatalf("unexpected call to CollectEvents") + return nil +} + +type filterManagerExpectation struct { + minHeight, maxHeight abi.ChainEpoch + tipsetCid cid.Cid + addresses []address.Address + keysWithCodec map[string][]types.ActorEventBlock + excludeReverted bool + returnFilter filter.EventFilter +} + +type mockEventFilterManager struct { + t *testing.T + expectations []filterManagerExpectation + removed []types.FilterID + lk sync.Mutex +} + +func newMockEventFilterManager(t *testing.T) *mockEventFilterManager { + return &mockEventFilterManager{t: t} +} + +func (m *mockEventFilterManager) expectInstall( + minHeight, maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, + excludeReverted bool, + returnFilter filter.EventFilter) { + + m.t.Helper() + m.expectations = append(m.expectations, filterManagerExpectation{ + minHeight: minHeight, + maxHeight: maxHeight, + tipsetCid: tipsetCid, + addresses: addresses, + keysWithCodec: keysWithCodec, + excludeReverted: excludeReverted, + returnFilter: returnFilter, + }) +} + +func (m *mockEventFilterManager) requireRemoved(id types.FilterID) { + m.t.Helper() + m.lk.Lock() + defer m.lk.Unlock() + require.Contains(m.t, m.removed, id) +} + +func (m *mockEventFilterManager) requireRemovedEventually(id types.FilterID, timeout time.Duration) { + m.t.Helper() + require.Eventuallyf(m.t, func() bool { + m.lk.Lock() + defer m.lk.Unlock() + if len(m.removed) == 0 { + return false + } + assert.Contains(m.t, m.removed, id) + return true + }, timeout, 10*time.Millisecond, "filter %x not removed", id) +} + +func (m *mockEventFilterManager) Install( + _ context.Context, + minHeight, maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, + excludeReverted bool, +) (filter.EventFilter, error) { + + require.True(m.t, len(m.expectations) > 0, "unexpected call to Install") + exp := m.expectations[0] + m.expectations = m.expectations[1:] + // check the expectation matches the call then return the attached filter + require.Equal(m.t, exp.minHeight, minHeight) + require.Equal(m.t, exp.maxHeight, maxHeight) + require.Equal(m.t, exp.tipsetCid, tipsetCid) + require.Equal(m.t, exp.addresses, addresses) + require.Equal(m.t, exp.keysWithCodec, keysWithCodec) + require.Equal(m.t, exp.excludeReverted, excludeReverted) + return exp.returnFilter, nil +} + +func (m *mockEventFilterManager) Remove(_ context.Context, id types.FilterID) error { + m.lk.Lock() + defer m.lk.Unlock() + m.removed = append(m.removed, id) + return nil +} + +func newBlockHeader(minerAddr address.Address, height int64) *types.BlockHeader { + return &types.BlockHeader{ + Miner: minerAddr, + Ticket: &types.Ticket{ + VRFProof: []byte("vrf proof0000000vrf proof0000000"), + }, + ElectionProof: &types.ElectionProof{ + VRFProof: []byte("vrf proof0000000vrf proof0000000"), + }, + Parents: []cid.Cid{testCid, testCid}, + ParentMessageReceipts: testCid, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS, Data: []byte("sign me up")}, + ParentWeight: types.NewInt(123125126212), + Messages: testCid, + Height: abi.ChainEpoch(height), + ParentStateRoot: testCid, + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS, Data: []byte("sign me up")}, + ParentBaseFee: types.NewInt(3432432843291), + } +} + +func epochPtr(i int) *abi.ChainEpoch { + e := abi.ChainEpoch(i) + return &e +} + +func collectedToActorEvents(collected []*filter.CollectedEvent) []*types.ActorEvent { + var out []*types.ActorEvent + for _, c := range collected { + out = append(out, &types.ActorEvent{ + Entries: c.Entries, + Emitter: c.EmitterAddr, + Reverted: c.Reverted, + Height: c.Height, + TipSetKey: c.TipSetKey, + MsgCid: c.MsgCid, + }) + } + return out +} + +func makeCollectedEvents(t *testing.T, rng *pseudo.Rand, eventStartHeight, eventsPerHeight, eventEndHeight int64) []*filter.CollectedEvent { + var out []*filter.CollectedEvent + for h := eventStartHeight; h <= eventEndHeight; h++ { + for i := int64(0); i < eventsPerHeight; i++ { + out = append(out, makeCollectedEvent(t, rng, types.NewTipSetKey(mkCid(t, fmt.Sprintf("h=%d", h))), abi.ChainEpoch(h))) + } + } + return out +} + +func makeCollectedEvent(t *testing.T, rng *pseudo.Rand, tsKey types.TipSetKey, height abi.ChainEpoch) *filter.CollectedEvent { + addr, err := address.NewIDAddress(uint64(rng.Int63())) + require.NoError(t, err) + + return &filter.CollectedEvent{ + Entries: []types.EventEntry{ + {Flags: 0x01, Key: "k1", Codec: cid.Raw, Value: []byte("v1")}, + {Flags: 0x01, Key: "k2", Codec: cid.Raw, Value: []byte("v2")}, + }, + EmitterAddr: addr, + EventIdx: 0, + Reverted: false, + Height: height, + TipSetKey: tsKey, + MsgIdx: 0, + MsgCid: testCid, + } +} + +func mkCid(t *testing.T, s string) cid.Cid { + h, err := multihash.Sum([]byte(s), multihash.SHA2_256, -1) + require.NoError(t, err) + return cid.NewCidV1(cid.Raw, h) +} diff --git a/app/submodule/actorevent/dummy.go b/app/submodule/actorevent/dummy.go new file mode 100644 index 0000000000..b0754b2db9 --- /dev/null +++ b/app/submodule/actorevent/dummy.go @@ -0,0 +1,23 @@ +package actorevent + +import ( + "context" + "errors" + + v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" + "github.com/filecoin-project/venus/venus-shared/types" +) + +var ErrActorEventModuleDisabled = errors.New("module disabled, enable with Fevm.EnableActorEventsAPI") + +type ActorEventDummy struct{} // nolint + +func (a *ActorEventDummy) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) { + return nil, ErrActorEventModuleDisabled +} + +func (a *ActorEventDummy) SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) { + return nil, ErrActorEventModuleDisabled +} + +var _ v1api.IActorEvent = &ActorEventDummy{} diff --git a/app/submodule/eth/eth_event_api.go b/app/submodule/eth/eth_event_api.go index db87a7dcf0..ee896c2173 100644 --- a/app/submodule/eth/eth_event_api.go +++ b/app/submodule/eth/eth_event_api.go @@ -13,7 +13,6 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-state-types/abi" - builtintypes "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/venus/pkg/chain" "github.com/filecoin-project/venus/pkg/events" "github.com/filecoin-project/venus/pkg/events/filter" @@ -22,7 +21,7 @@ import ( "github.com/filecoin-project/venus/venus-shared/types" "github.com/google/uuid" "github.com/ipfs/go-cid" - "github.com/multiformats/go-varint" + "github.com/multiformats/go-multicodec" "github.com/zyedidia/generic/queue" ) @@ -82,17 +81,9 @@ func newEthEventAPI(ctx context.Context, em *EthSubModule) (*ethEventAPI, error) actor, err := em.chainModule.Stmgr.GetActorAt(ctx, idAddr, ts) if err != nil || actor.Address == nil { - return address.Undef, false + return idAddr, true } - // if robust address is not f4 then we won't match against it so bail early - if actor.Address.Protocol() != address.Delegated { - return address.Undef, false - } - // we have an f4 address, make sure it's assigned by the EAM - if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID { - return address.Undef, false - } return *actor.Address, true }, @@ -212,7 +203,65 @@ func (e *ethEventAPI) EthGetFilterLogs(ctx context.Context, id types.EthFilterID return nil, fmt.Errorf("wrong filter type") } -func (e *ethEventAPI) installEthFilterSpec(ctx context.Context, filterSpec *types.EthFilterSpec) (*filter.EventFilter, error) { +// parseBlockRange is similar to actor event's parseHeightRange but with slightly different semantics +// +// * "block" instead of "height" +// * strings that can have "latest" and "earliest" and nil +// * hex strings for actual heights +func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) { + if fromBlock == nil || *fromBlock == "latest" || len(*fromBlock) == 0 { + minHeight = heaviest + } else if *fromBlock == "earliest" { + minHeight = 0 + } else { + if !strings.HasPrefix(*fromBlock, "0x") { + return 0, 0, fmt.Errorf("FromBlock is not a hex") + } + epoch, err := types.EthUint64FromHex(*fromBlock) + if err != nil { + return 0, 0, fmt.Errorf("invalid epoch") + } + minHeight = abi.ChainEpoch(epoch) + } + + if toBlock == nil || *toBlock == "latest" || len(*toBlock) == 0 { + // here latest means the latest at the time + maxHeight = -1 + } else if *toBlock == "earliest" { + maxHeight = 0 + } else { + if !strings.HasPrefix(*toBlock, "0x") { + return 0, 0, fmt.Errorf("ToBlock is not a hex") + } + epoch, err := types.EthUint64FromHex(*toBlock) + if err != nil { + return 0, 0, fmt.Errorf("invalid epoch") + } + maxHeight = abi.ChainEpoch(epoch) + } + + // Validate height ranges are within limits set by node operator + if minHeight == -1 && maxHeight > 0 { + // Here the client is looking for events between the head and some future height + if maxHeight-heaviest > maxRange { + return 0, 0, fmt.Errorf("invalid epoch range: to block is too far in the future (maximum: %d)", maxRange) + } + } else if minHeight >= 0 && maxHeight == -1 { + // Here the client is looking for events between some time in the past and the current head + if heaviest-minHeight > maxRange { + return 0, 0, fmt.Errorf("invalid epoch range: from block is too far in the past (maximum: %d)", maxRange) + } + } else if minHeight >= 0 && maxHeight >= 0 { + if minHeight > maxHeight { + return 0, 0, fmt.Errorf("invalid epoch range: to block (%d) must be after from block (%d)", minHeight, maxHeight) + } else if maxHeight-minHeight > maxRange { + return 0, 0, fmt.Errorf("invalid epoch range: range between to and from blocks is too large (maximum: %d)", maxRange) + } + } + return minHeight, maxHeight, nil +} + +func (e *ethEventAPI) installEthFilterSpec(ctx context.Context, filterSpec *types.EthFilterSpec) (filter.EventFilter, error) { var ( minHeight abi.ChainEpoch maxHeight abi.ChainEpoch @@ -228,70 +277,13 @@ func (e *ethEventAPI) installEthFilterSpec(ctx context.Context, filterSpec *type tipsetCid = filterSpec.BlockHash.ToCid() } else { - if filterSpec.FromBlock == nil || *filterSpec.FromBlock == "latest" { - ts, err := e.ChainAPI.ChainHead(ctx) - if err != nil { - return nil, fmt.Errorf("failed to got head %v", err) - } - minHeight = ts.Height() - } else if *filterSpec.FromBlock == "earliest" { - minHeight = 0 - } else if *filterSpec.FromBlock == "pending" { - return nil, api.ErrNotSupported - } else { - if !strings.HasPrefix(*filterSpec.FromBlock, "0x") { - return nil, fmt.Errorf("FromBlock is not a hex") - } - epoch, err := types.EthUint64FromHex(*filterSpec.FromBlock) - if err != nil { - return nil, fmt.Errorf("invalid epoch") - } - minHeight = abi.ChainEpoch(epoch) - } - - if filterSpec.ToBlock == nil || *filterSpec.ToBlock == "latest" { - // here latest means the latest at the time - maxHeight = -1 - } else if *filterSpec.ToBlock == "earliest" { - maxHeight = 0 - } else if *filterSpec.ToBlock == "pending" { - return nil, api.ErrNotSupported - } else { - if !strings.HasPrefix(*filterSpec.ToBlock, "0x") { - return nil, fmt.Errorf("ToBlock is not a hex") - } - epoch, err := types.EthUint64FromHex(*filterSpec.ToBlock) - if err != nil { - return nil, fmt.Errorf("invalid epoch") - } - maxHeight = abi.ChainEpoch(epoch) + head, err := e.ChainAPI.ChainHead(ctx) + if err != nil { + return nil, err } - - // Validate height ranges are within limits set by node operator - if minHeight == -1 && maxHeight > 0 { - // Here the client is looking for events between the head and some future height - ts, err := e.ChainAPI.ChainHead(ctx) - if err != nil { - return nil, fmt.Errorf("failed to got head %v", err) - } - if maxHeight-ts.Height() > e.MaxFilterHeightRange { - return nil, fmt.Errorf("invalid epoch range: to block is too far in the future (maximum: %d)", e.MaxFilterHeightRange) - } - } else if minHeight >= 0 && maxHeight == -1 { - // Here the client is looking for events between some time in the past and the current head - ts, err := e.ChainAPI.ChainHead(ctx) - if err != nil { - return nil, fmt.Errorf("failed to got head %v", err) - } - if ts.Height()-minHeight > e.MaxFilterHeightRange { - return nil, fmt.Errorf("invalid epoch range: from block is too far in the past (maximum: %d)", e.MaxFilterHeightRange) - } - } else if minHeight >= 0 && maxHeight >= 0 { - if minHeight > maxHeight { - return nil, fmt.Errorf("invalid epoch range: to block (%d) must be after from block (%d)", minHeight, maxHeight) - } else if maxHeight-minHeight > e.MaxFilterHeightRange { - return nil, fmt.Errorf("invalid epoch range: range between to and from blocks is too large (maximum: %d)", e.MaxFilterHeightRange) - } + minHeight, maxHeight, err = parseBlockRange(head.Height(), filterSpec.FromBlock, filterSpec.ToBlock, e.MaxFilterHeightRange) + if err != nil { + return nil, err } } @@ -309,7 +301,20 @@ func (e *ethEventAPI) installEthFilterSpec(ctx context.Context, filterSpec *type return nil, err } - return e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keys) + return e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keysToKeysWithCodec(keys), true) +} + +func keysToKeysWithCodec(keys map[string][][]byte) map[string][]types.ActorEventBlock { + keysWithCodec := make(map[string][]types.ActorEventBlock) + for k, v := range keys { + for _, vv := range v { + keysWithCodec[k] = append(keysWithCodec[k], types.ActorEventBlock{ + Codec: uint64(multicodec.Raw), // FEVM smart contract events are always encoded with the `raw` Codec. + Value: vv, + }) + } + } + return keysWithCodec } func (e *ethEventAPI) EthNewFilter(ctx context.Context, filterSpec *types.EthFilterSpec) (types.EthFilterID, error) { @@ -403,7 +408,7 @@ func (e *ethEventAPI) EthUninstallFilter(ctx context.Context, id types.EthFilter func (e *ethEventAPI) uninstallFilter(ctx context.Context, f filter.Filter) error { switch f.(type) { - case *filter.EventFilter: + case filter.EventFilter: err := e.EventFilterManager.Remove(ctx, f.ID()) if err != nil && !errors.Is(err, filter.ErrFilterNotFound) { return err @@ -485,7 +490,7 @@ func (e *ethEventAPI) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ty } } - f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, addresses, keys) + f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, addresses, keysToKeysWithCodec(keys), true) if err != nil { // clean up any previous filters added and stop the sub _, _ = e.EthUnsubscribe(ctx, sub.id) diff --git a/app/submodule/eth/eth_submodule.go b/app/submodule/eth/eth_submodule.go index 3a8d9cba59..caa4ddab2a 100644 --- a/app/submodule/eth/eth_submodule.go +++ b/app/submodule/eth/eth_submodule.go @@ -10,6 +10,7 @@ import ( "github.com/filecoin-project/venus/app/submodule/mpool" "github.com/filecoin-project/venus/pkg/config" "github.com/filecoin-project/venus/pkg/constants" + "github.com/filecoin-project/venus/pkg/events/filter" v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" ) @@ -92,6 +93,10 @@ func (em *EthSubModule) Close(ctx context.Context) error { return em.ethAPIAdapter.close() } +func (em *EthSubModule) GetEventFilterManager() *filter.EventFilterManager { + return em.ethEventAPI.EventFilterManager +} + type ethAPIAdapter interface { v1api.IETH start(ctx context.Context) error diff --git a/app/submodule/eth/eth_test.go b/app/submodule/eth/eth_test.go index ef250530e9..66a32f3ab7 100644 --- a/app/submodule/eth/eth_test.go +++ b/app/submodule/eth/eth_test.go @@ -3,6 +3,7 @@ package eth import ( "bytes" "encoding/hex" + "fmt" "testing" "github.com/ipfs/go-cid" @@ -10,11 +11,86 @@ import ( "github.com/stretchr/testify/require" cbg "github.com/whyrusleeping/cbor-gen" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/venus/pkg/messagepool" "github.com/filecoin-project/venus/venus-shared/types" ) +func TestParseBlockRange(t *testing.T) { + pstring := func(s string) *string { return &s } + + tcs := map[string]struct { + heaviest abi.ChainEpoch + from *string + to *string + maxRange abi.ChainEpoch + minOut abi.ChainEpoch + maxOut abi.ChainEpoch + errStr string + }{ + "fails when both are specified and range is greater than max allowed range": { + heaviest: 100, + from: pstring("0x100"), + to: pstring("0x200"), + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "too large", + }, + "fails when min is specified and range is greater than max allowed range": { + heaviest: 500, + from: pstring("0x10"), + to: pstring("latest"), + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "too far in the past", + }, + "fails when max is specified and range is greater than max allowed range": { + heaviest: 500, + from: pstring("earliest"), + to: pstring("0x10000"), + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "too large", + }, + "works when range is valid": { + heaviest: 500, + from: pstring("earliest"), + to: pstring("latest"), + maxRange: 1000, + minOut: 0, + maxOut: -1, + }, + "works when range is valid and specified": { + heaviest: 500, + from: pstring("0x10"), + to: pstring("0x30"), + maxRange: 1000, + minOut: 16, + maxOut: 48, + }, + } + + for name, tc := range tcs { + tc2 := tc + t.Run(name, func(t *testing.T) { + min, max, err := parseBlockRange(tc2.heaviest, tc2.from, tc2.to, tc2.maxRange) + require.Equal(t, tc2.minOut, min) + require.Equal(t, tc2.maxOut, max) + if tc2.errStr != "" { + fmt.Println(err) + require.Error(t, err) + require.Contains(t, err.Error(), tc2.errStr) + } else { + require.NoError(t, err) + } + }) + } +} + func TestEthLogFromEvent(t *testing.T) { // basic empty data, topics, ok := ethLogFromEvent(nil) diff --git a/go.mod b/go.mod index d5f7aaa065..1bae085524 100644 --- a/go.mod +++ b/go.mod @@ -246,7 +246,7 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/mattn/go-runewidth v0.0.10 // indirect - github.com/mattn/go-sqlite3 v1.14.16 + github.com/mattn/go-sqlite3 v1.14.22 github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/miekg/dns v1.1.55 // indirect github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect diff --git a/go.sum b/go.sum index dc4a239fa5..49e1a64b32 100644 --- a/go.sum +++ b/go.sum @@ -921,8 +921,8 @@ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.10 h1:CoZ3S2P7pvtP45xOtBw+/mDL2z0RKI576gSkzRRpdGg= github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= -github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= -github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= diff --git a/pkg/config/config.go b/pkg/config/config.go index eb4d7a5491..e9ab5f707f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -40,6 +40,7 @@ type Config struct { SlashFilterDs *SlashFilterDsConfig `json:"slashFilter"` RateLimitCfg *RateLimitCfg `json:"rateLimit"` FevmConfig *FevmConfig `json:"fevm"` + EventsConfig *EventsConfig `json:"events"` PubsubConfig *PubsubConfig `json:"pubsub"` FaultReporter *FaultReporterConfig `json:"faultReporter"` } @@ -425,14 +426,13 @@ func newRateLimitConfig() *RateLimitCfg { } type EventConfig struct { - // EnableEthRPC enables APIs that // DisableRealTimeFilterAPI will disable the RealTimeFilterAPI that can create and query filters for actor events as they are emitted. - // The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag. + // The API is enabled when EnableEthRPC or Events.EnableActorEventsAPI is true, but can be disabled selectively with this flag. DisableRealTimeFilterAPI bool `json:"disableRealTimeFilterAPI"` // DisableHistoricFilterAPI will disable the HistoricFilterAPI that can create and query filters for actor events // that occurred in the past. HistoricFilterAPI maintains a queryable index of events. - // The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag. + // The API is enabled when EnableEthRPC or Events.EnableActorEventsAPI is true, but can be disabled selectively with this flag. DisableHistoricFilterAPI bool `json:"disableHistoricFilterAPI"` // FilterTTL specifies the time to live for actor event filters. Filters that haven't been accessed longer than @@ -471,6 +471,14 @@ type FevmConfig struct { Event EventConfig `json:"event"` } +type EventsConfig struct { + // EnableActorEventsAPI enables the Actor events API that enables clients to consume events + // emitted by (smart contracts + built-in Actors). + // This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be + // disabled by setting their respective Disable* options in Fevm.Event. + EnableActorEventsAPI bool `json:"enableActorEventsAPI"` +} + func newFevmConfig() *FevmConfig { return &FevmConfig{ EnableEthRPC: false, @@ -486,6 +494,12 @@ func newFevmConfig() *FevmConfig { } } +func newEventsConfig() *EventsConfig { + return &EventsConfig{ + EnableActorEventsAPI: false, + } +} + type PubsubConfig struct { // Run the node in bootstrap-node mode Bootstrapper bool `json:"bootstrapper"` @@ -534,6 +548,7 @@ func NewDefaultConfig() *Config { SlashFilterDs: newDefaultSlashFilterDsConfig(), RateLimitCfg: newRateLimitConfig(), FevmConfig: newFevmConfig(), + EventsConfig: newEventsConfig(), PubsubConfig: newPubsubConfig(), FaultReporter: newFaultReporterConfig(), } diff --git a/pkg/events/filter/event.go b/pkg/events/filter/event.go index b7dc1167c0..da2adfff49 100644 --- a/pkg/events/filter/event.go +++ b/pkg/events/filter/event.go @@ -30,14 +30,24 @@ func isIndexedValue(b uint8) bool { return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0 } -type EventFilter struct { - id types.FilterID - minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum - maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum - tipsetCid cid.Cid - addresses []address.Address // list of f4 actor addresses that are extpected to emit the event - keys map[string][][]byte // map of key names to a list of alternate values that may match - maxResults int // maximum number of results to collect, 0 is unlimited +type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool) + +type EventFilter interface { + Filter + + TakeCollectedEvents(context.Context) []*CollectedEvent + CollectEvents(context.Context, *TipSetEvents, bool, AddressResolver) error +} + +type eventFilter struct { + id types.FilterID + minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum + maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum + tipsetCid cid.Cid + addresses []address.Address // list of actor addresses that are extpected to emit the event + + keysWithCodec map[string][]types.ActorEventBlock // map of key names to a list of alternate values that may match + maxResults int // maximum number of results to collect, 0 is unlimited mu sync.Mutex collected []*CollectedEvent @@ -45,11 +55,11 @@ type EventFilter struct { ch chan<- interface{} } -var _ Filter = (*EventFilter)(nil) +var _ Filter = (*eventFilter)(nil) type CollectedEvent struct { Entries []types.EventEntry - EmitterAddr address.Address // f4 address of emitter + EmitterAddr address.Address // address of emitter EventIdx int // index of the event within the list of emitted events Reverted bool Height abi.ChainEpoch @@ -58,24 +68,24 @@ type CollectedEvent struct { MsgCid cid.Cid // cid of message that produced event } -func (f *EventFilter) ID() types.FilterID { +func (f *eventFilter) ID() types.FilterID { return f.id } -func (f *EventFilter) SetSubChannel(ch chan<- interface{}) { +func (f *eventFilter) SetSubChannel(ch chan<- interface{}) { f.mu.Lock() defer f.mu.Unlock() f.ch = ch f.collected = nil } -func (f *EventFilter) ClearSubChannel() { +func (f *eventFilter) ClearSubChannel() { f.mu.Lock() defer f.mu.Unlock() f.ch = nil } -func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error { +func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver AddressResolver) error { if !f.matchTipset(te) { return nil } @@ -140,13 +150,13 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever return nil } -func (f *EventFilter) setCollectedEvents(ces []*CollectedEvent) { +func (f *eventFilter) setCollectedEvents(ces []*CollectedEvent) { f.mu.Lock() f.collected = ces f.mu.Unlock() } -func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent { +func (f *eventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent { f.mu.Lock() collected := f.collected f.collected = nil @@ -156,14 +166,14 @@ func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent return collected } -func (f *EventFilter) LastTaken() time.Time { +func (f *eventFilter) LastTaken() time.Time { f.mu.Lock() defer f.mu.Unlock() return f.lastTaken } // matchTipset reports whether this filter matches the given tipset -func (f *EventFilter) matchTipset(te *TipSetEvents) bool { +func (f *eventFilter) matchTipset(te *TipSetEvents) bool { if f.tipsetCid != cid.Undef { tsCid, err := te.Cid() if err != nil { @@ -181,7 +191,7 @@ func (f *EventFilter) matchTipset(te *TipSetEvents) bool { return true } -func (f *EventFilter) matchAddress(o address.Address) bool { +func (f *eventFilter) matchAddress(o address.Address) bool { if len(f.addresses) == 0 { return true } @@ -196,8 +206,8 @@ func (f *EventFilter) matchAddress(o address.Address) bool { return false } -func (f *EventFilter) matchKeys(ees []types.EventEntry) bool { - if len(f.keys) == 0 { +func (f *eventFilter) matchKeys(ees []types.EventEntry) bool { + if len(f.keysWithCodec) == 0 { return true } // TODO: optimize this naive algorithm @@ -219,19 +229,19 @@ func (f *EventFilter) matchKeys(ees []types.EventEntry) bool { continue } - wantlist, ok := f.keys[keyname] + wantlist, ok := f.keysWithCodec[keyname] if !ok || len(wantlist) == 0 { continue } for _, w := range wantlist { - if bytes.Equal(w, ee.Value) { + if bytes.Equal(w.Value, ee.Value) && w.Codec == ee.Codec { matched[keyname] = true break } } - if len(matched) == len(f.keys) { + if len(matched) == len(f.keysWithCodec) { // all keys have been matched return true } @@ -299,7 +309,7 @@ type EventFilterManager struct { EventIndex *EventIndex mu sync.Mutex // guards mutations to filters - filters map[types.FilterID]*EventFilter + filters map[types.FilterID]EventFilter currentHeight abi.ChainEpoch } @@ -365,7 +375,8 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) return nil } -func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, keys map[string][][]byte) (*EventFilter, error) { +func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) { m.mu.Lock() currentHeight := m.currentHeight m.mu.Unlock() @@ -379,26 +390,26 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a return nil, xerrors.Errorf("new filter id: %w", err) } - f := &EventFilter{ - id: id, - minHeight: minHeight, - maxHeight: maxHeight, - tipsetCid: tipsetCid, - addresses: addresses, - keys: keys, - maxResults: m.MaxFilterResults, + f := &eventFilter{ + id: id, + minHeight: minHeight, + maxHeight: maxHeight, + tipsetCid: tipsetCid, + addresses: addresses, + keysWithCodec: keysWithCodec, + maxResults: m.MaxFilterResults, } if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight { // Filter needs historic events - if err := m.EventIndex.PrefillFilter(ctx, f); err != nil { + if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil { return nil, err } } m.mu.Lock() if m.filters == nil { - m.filters = make(map[types.FilterID]*EventFilter) + m.filters = make(map[types.FilterID]EventFilter) } m.filters[id] = f m.mu.Unlock() diff --git a/pkg/events/filter/event_test.go b/pkg/events/filter/event_test.go index 093616ea41..92601f2c79 100644 --- a/pkg/events/filter/event_test.go +++ b/pkg/events/filter/event_test.go @@ -21,6 +21,19 @@ import ( "github.com/filecoin-project/venus/venus-shared/types" ) +func keysToKeysWithCodec(keys map[string][][]byte) map[string][]types.ActorEventBlock { + keysWithCodec := make(map[string][]types.ActorEventBlock) + for k, v := range keys { + for _, vv := range v { + keysWithCodec[k] = append(keysWithCodec[k], types.ActorEventBlock{ + Codec: cid.Raw, + Value: vv, + }) + } + } + return keysWithCodec +} + func TestEventFilterCollectEvents(t *testing.T) { rng := pseudo.New(pseudo.NewSource(299792458)) a1 := randomF4Addr(t, rng) @@ -72,13 +85,13 @@ func TestEventFilterCollectEvents(t *testing.T) { testCases := []struct { name string - filter *EventFilter + filter *eventFilter te *TipSetEvents want []*CollectedEvent }{ { name: "nomatch tipset min height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: 14001, maxHeight: -1, }, @@ -87,7 +100,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "nomatch tipset max height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: 13999, }, @@ -96,7 +109,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "match tipset min height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: 14000, maxHeight: -1, }, @@ -105,7 +118,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "match tipset cid", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, tipsetCid: cid14000, @@ -115,7 +128,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "nomatch address", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a2}, @@ -125,7 +138,7 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "match address", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a1}, @@ -135,124 +148,124 @@ func TestEventFilterCollectEvents(t *testing.T) { }, { name: "match one entry", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "type": { []byte("approval"), }, - }, + }), }, te: events14000, want: oneCollectedEvent, }, { name: "match one entry with alternate values", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "type": { []byte("cancel"), []byte("propose"), []byte("approval"), }, - }, + }), }, te: events14000, want: oneCollectedEvent, }, { name: "nomatch one entry by missing value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "type": { []byte("cancel"), []byte("propose"), }, - }, + }), }, te: events14000, want: noCollectedEvents, }, { name: "nomatch one entry by missing key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "method": { []byte("approval"), }, - }, + }), }, te: events14000, want: noCollectedEvents, }, { name: "match one entry with multiple keys", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "type": { []byte("approval"), }, "signer": { []byte("addr1"), }, - }, + }), }, te: events14000, want: oneCollectedEvent, }, { name: "nomatch one entry with one mismatching key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "type": { []byte("approval"), }, "approver": { []byte("addr1"), }, - }, + }), }, te: events14000, want: noCollectedEvents, }, { name: "nomatch one entry with one mismatching value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "type": { []byte("approval"), }, "signer": { []byte("addr2"), }, - }, + }), }, te: events14000, want: noCollectedEvents, }, { name: "nomatch one entry with one unindexed key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "amount": { []byte("2988181"), }, - }, + }), }, te: events14000, want: noCollectedEvents, diff --git a/pkg/events/filter/index.go b/pkg/events/filter/index.go index 8cc4a6a259..a536ba1e57 100644 --- a/pkg/events/filter/index.go +++ b/pkg/events/filter/index.go @@ -483,8 +483,8 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever return nil } -// PrefillFilter fills a filter's collection of events from the historic index -func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error { +// prefillFilter fills a filter's collection of events from the historic index +func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, excludeReverted bool) error { clauses := []string{} values := []any{} joins := []string{} @@ -512,9 +512,9 @@ func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error { clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") } - if len(f.keys) > 0 { + if len(f.keysWithCodec) > 0 { join := 0 - for key, vals := range f.keys { + for key, vals := range f.keysWithCodec { if len(vals) > 0 { join++ joinAlias := fmt.Sprintf("ee%d", join) @@ -523,8 +523,8 @@ func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error { values = append(values, key) subclauses := []string{} for _, val := range vals { - subclauses = append(subclauses, fmt.Sprintf("%s.value=?", joinAlias)) - values = append(values, val) + subclauses = append(subclauses, fmt.Sprintf("(%s.value=? AND %[1]s.codec=?)", joinAlias)) + values = append(values, val.Value, val.Codec) } clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") } diff --git a/pkg/events/filter/index_test.go b/pkg/events/filter/index_test.go index a2ac4f8339..e46395478e 100644 --- a/pkg/events/filter/index_test.go +++ b/pkg/events/filter/index_test.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/venus/venus-shared/types" ) @@ -81,13 +82,13 @@ func TestEventIndexPrefillFilter(t *testing.T) { testCases := []struct { name string - filter *EventFilter + filter *eventFilter te *TipSetEvents want []*CollectedEvent }{ { name: "nomatch tipset min height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: 14001, maxHeight: -1, }, @@ -96,7 +97,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "nomatch tipset max height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: 13999, }, @@ -105,7 +106,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "match tipset min height", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: 14000, maxHeight: -1, }, @@ -114,7 +115,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "match tipset cid", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, tipsetCid: cid14000, @@ -124,7 +125,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "nomatch address", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a2}, @@ -134,7 +135,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "match address", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, addresses: []address.Address{a1}, @@ -144,124 +145,124 @@ func TestEventIndexPrefillFilter(t *testing.T) { }, { name: "match one entry", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "type": { []byte("approval"), }, - }, + }), }, te: events14000, want: oneCollectedEvent, }, { name: "match one entry with alternate values", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "type": { []byte("cancel"), []byte("propose"), []byte("approval"), }, - }, + }), }, te: events14000, want: oneCollectedEvent, }, { name: "nomatch one entry by missing value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "type": { []byte("cancel"), []byte("propose"), }, - }, + }), }, te: events14000, want: noCollectedEvents, }, { name: "nomatch one entry by missing key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "method": { []byte("approval"), }, - }, + }), }, te: events14000, want: noCollectedEvents, }, { name: "match one entry with multiple keys", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "type": { []byte("approval"), }, "signer": { []byte("addr1"), }, - }, + }), }, te: events14000, want: oneCollectedEvent, }, { name: "nomatch one entry with one mismatching key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "type": { []byte("approval"), }, "approver": { []byte("addr1"), }, - }, + }), }, te: events14000, want: noCollectedEvents, }, { name: "nomatch one entry with one mismatching value", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "type": { []byte("approval"), }, "signer": { []byte("addr2"), }, - }, + }), }, te: events14000, want: noCollectedEvents, }, { name: "nomatch one entry with one unindexed key", - filter: &EventFilter{ + filter: &eventFilter{ minHeight: -1, maxHeight: -1, - keys: map[string][][]byte{ + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ "amount": { []byte("2988181"), }, - }, + }), }, te: events14000, want: noCollectedEvents, @@ -271,7 +272,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { for _, tc := range testCases { tc := tc // appease lint t.Run(tc.name, func(t *testing.T) { - if err := ei.PrefillFilter(context.Background(), tc.filter); err != nil { + if err := ei.prefillFilter(context.Background(), tc.filter, false); err != nil { require.NoError(t, err, "prefill filter events") } @@ -280,3 +281,619 @@ func TestEventIndexPrefillFilter(t *testing.T) { }) } } + +func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { + rng := pseudo.New(pseudo.NewSource(299792458)) + a1 := randomF4Addr(t, rng) + a2 := randomF4Addr(t, rng) + a3 := randomF4Addr(t, rng) + + a1ID := abi.ActorID(1) + a2ID := abi.ActorID(2) + + addrMap := addressMap{} + addrMap.add(a1ID, a1) + addrMap.add(a2ID, a2) + + ev1 := fakeEvent( + a1ID, + []kv{ + {k: "type", v: []byte("approval")}, + {k: "signer", v: []byte("addr1")}, + }, + []kv{ + {k: "amount", v: []byte("2988181")}, + }, + ) + ev2 := fakeEvent( + a2ID, + []kv{ + {k: "type", v: []byte("approval")}, + {k: "signer", v: []byte("addr2")}, + }, + []kv{ + {k: "amount", v: []byte("2988182")}, + }, + ) + + st := newStore() + events := []*types.Event{ev1} + revertedEvents := []*types.Event{ev2} + em := executedMessage{ + msg: fakeMessage(randomF4Addr(t, rng), randomF4Addr(t, rng)), + rct: fakeReceipt(t, rng, st, events), + evs: events, + } + revertedEm := executedMessage{ + msg: fakeMessage(randomF4Addr(t, rng), randomF4Addr(t, rng)), + rct: fakeReceipt(t, rng, st, revertedEvents), + evs: revertedEvents, + } + + events14000 := buildTipSetEvents(t, rng, 14000, em) + revertedEvents14000 := buildTipSetEvents(t, rng, 14000, revertedEm) + cid14000, err := events14000.msgTS.Key().Cid() + require.NoError(t, err, "tipset cid") + reveredCID14000, err := revertedEvents14000.msgTS.Key().Cid() + require.NoError(t, err, "tipset cid") + + noCollectedEvents := []*CollectedEvent{} + oneCollectedEvent := []*CollectedEvent{ + { + Entries: ev1.Entries, + EmitterAddr: a1, + EventIdx: 0, + Reverted: false, + Height: 14000, + TipSetKey: events14000.msgTS.Key(), + MsgIdx: 0, + MsgCid: em.msg.Cid(), + }, + } + twoCollectedEvent := []*CollectedEvent{ + { + Entries: ev1.Entries, + EmitterAddr: a1, + EventIdx: 0, + Reverted: false, + Height: 14000, + TipSetKey: events14000.msgTS.Key(), + MsgIdx: 0, + MsgCid: em.msg.Cid(), + }, + { + Entries: ev2.Entries, + EmitterAddr: a2, + EventIdx: 0, + Reverted: true, + Height: 14000, + TipSetKey: revertedEvents14000.msgTS.Key(), + MsgIdx: 0, + MsgCid: revertedEm.msg.Cid(), + }, + } + oneCollectedRevertedEvent := []*CollectedEvent{ + { + Entries: ev2.Entries, + EmitterAddr: a2, + EventIdx: 0, + Reverted: true, + Height: 14000, + TipSetKey: revertedEvents14000.msgTS.Key(), + MsgIdx: 0, + MsgCid: revertedEm.msg.Cid(), + }, + } + + workDir, err := os.MkdirTemp("", "lotusevents") + require.NoError(t, err, "create temporary work directory") + + defer func() { + _ = os.RemoveAll(workDir) + }() + t.Logf("using work dir %q", workDir) + + dbPath := filepath.Join(workDir, "actorevents.db") + + ei, err := NewEventIndex(context.Background(), dbPath, nil) + require.NoError(t, err, "create event index") + if err := ei.CollectEvents(context.Background(), revertedEvents14000, false, addrMap.ResolveAddress); err != nil { + require.NoError(t, err, "collect reverted events") + } + if err := ei.CollectEvents(context.Background(), revertedEvents14000, true, addrMap.ResolveAddress); err != nil { + require.NoError(t, err, "revert reverted events") + } + if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil { + require.NoError(t, err, "collect events") + } + + inclusiveTestCases := []struct { + name string + filter *eventFilter + te *TipSetEvents + want []*CollectedEvent + }{ + { + name: "nomatch tipset min height", + filter: &eventFilter{ + minHeight: 14001, + maxHeight: -1, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch tipset max height", + filter: &eventFilter{ + minHeight: -1, + maxHeight: 13999, + }, + te: events14000, + want: noCollectedEvents, + }, + // { + // name: "match tipset min height", + // filter: &eventFilter{ + // minHeight: 14000, + // maxHeight: -1, + // }, + // te: events14000, + // want: twoCollectedEvent, + // }, + { + name: "match tipset cid", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + tipsetCid: cid14000, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "match tipset cid", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + tipsetCid: reveredCID14000, + }, + te: revertedEvents14000, + want: oneCollectedRevertedEvent, + }, + { + name: "nomatch address", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a3}, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "match address 2", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a2}, + }, + te: revertedEvents14000, + want: oneCollectedRevertedEvent, + }, + { + name: "match address 1", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a1}, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "match one entry", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "type": { + []byte("approval"), + }, + }), + }, + te: events14000, + want: twoCollectedEvent, + }, + { + name: "match one entry with alternate values", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "type": { + []byte("cancel"), + []byte("propose"), + []byte("approval"), + }, + }), + }, + te: events14000, + want: twoCollectedEvent, + }, + { + name: "nomatch one entry by missing value", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "type": { + []byte("cancel"), + []byte("propose"), + }, + }), + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch one entry by missing key", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "method": { + []byte("approval"), + }, + }), + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "match one entry with multiple keys", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "type": { + []byte("approval"), + }, + "signer": { + []byte("addr1"), + }, + }), + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "match one entry with multiple keys", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "type": { + []byte("approval"), + }, + "signer": { + []byte("addr2"), + }, + }), + }, + te: revertedEvents14000, + want: oneCollectedRevertedEvent, + }, + { + name: "nomatch one entry with one mismatching key", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "type": { + []byte("approval"), + }, + "approver": { + []byte("addr1"), + }, + }), + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch one entry with one mismatching value", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "type": { + []byte("approval"), + }, + "signer": { + []byte("addr3"), + }, + }), + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch one entry with one unindexed key", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "amount": { + []byte("2988181"), + }, + }), + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch one entry with one unindexed key", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "amount": { + []byte("2988182"), + }, + }), + }, + te: events14000, + want: noCollectedEvents, + }, + } + + exclusiveTestCases := []struct { + name string + filter *eventFilter + te *TipSetEvents + want []*CollectedEvent + }{ + { + name: "nomatch tipset min height", + filter: &eventFilter{ + minHeight: 14001, + maxHeight: -1, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch tipset max height", + filter: &eventFilter{ + minHeight: -1, + maxHeight: 13999, + }, + te: events14000, + want: noCollectedEvents, + }, + // { + // name: "match tipset min height", + // filter: &eventFilter{ + // minHeight: 14000, + // maxHeight: -1, + // }, + // te: events14000, + // want: oneCollectedEvent, + // }, + { + name: "match tipset cid", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + tipsetCid: cid14000, + }, + te: events14000, + want: oneCollectedEvent, + }, + // { + // name: "match tipset cid but reverted", + // filter: &eventFilter{ + // minHeight: -1, + // maxHeight: -1, + // tipsetCid: reveredCID14000, + // }, + // te: revertedEvents14000, + // want: noCollectedEvents, + // }, + { + name: "nomatch address", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a3}, + }, + te: events14000, + want: noCollectedEvents, + }, + // { + // name: "nomatch address 2 but reverted", + // filter: &eventFilter{ + // minHeight: -1, + // maxHeight: -1, + // addresses: []address.Address{a2}, + // }, + // te: revertedEvents14000, + // want: noCollectedEvents, + // }, + { + name: "match address", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a1}, + }, + te: events14000, + want: oneCollectedEvent, + }, + // { + // name: "match one entry", + // filter: &eventFilter{ + // minHeight: -1, + // maxHeight: -1, + // keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + // "type": { + // []byte("approval"), + // }, + // }), + // }, + // te: events14000, + // want: oneCollectedEvent, + // }, + // { + // name: "match one entry with alternate values", + // filter: &eventFilter{ + // minHeight: -1, + // maxHeight: -1, + // keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + // "type": { + // []byte("cancel"), + // []byte("propose"), + // []byte("approval"), + // }, + // }), + // }, + // te: events14000, + // want: oneCollectedEvent, + // }, + { + name: "nomatch one entry by missing value", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "type": { + []byte("cancel"), + []byte("propose"), + }, + }), + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch one entry by missing key", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "method": { + []byte("approval"), + }, + }), + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "match one entry with multiple keys", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "type": { + []byte("approval"), + }, + "signer": { + []byte("addr1"), + }, + }), + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "nomatch one entry with one mismatching key", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "type": { + []byte("approval"), + }, + "approver": { + []byte("addr1"), + }, + }), + }, + te: events14000, + want: noCollectedEvents, + }, + // { + // name: "nomatch one entry with matching reverted value", + // filter: &eventFilter{ + // minHeight: -1, + // maxHeight: -1, + // keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + // "type": { + // []byte("approval"), + // }, + // "signer": { + // []byte("addr2"), + // }, + // }), + // }, + // te: events14000, + // want: noCollectedEvents, + // }, + { + name: "nomatch one entry with one mismatching value", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "type": { + []byte("approval"), + }, + "signer": { + []byte("addr3"), + }, + }), + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch one entry with one unindexed key", + filter: &eventFilter{ + minHeight: -1, + maxHeight: -1, + keysWithCodec: keysToKeysWithCodec(map[string][][]byte{ + "amount": { + []byte("2988181"), + }, + }), + }, + te: events14000, + want: noCollectedEvents, + }, + } + + for _, tc := range inclusiveTestCases { + tc := tc // appease lint + t.Run(tc.name, func(t *testing.T) { + if err := ei.prefillFilter(context.Background(), tc.filter, false); err != nil { + require.NoError(t, err, "prefill filter events") + } + + coll := tc.filter.TakeCollectedEvents(context.Background()) + require.ElementsMatch(t, coll, tc.want, tc.name) + }) + } + + for _, tc := range exclusiveTestCases { + tc := tc // appease lint + t.Run(tc.name, func(t *testing.T) { + if err := ei.prefillFilter(context.Background(), tc.filter, true); err != nil { + require.NoError(t, err, "prefill filter events") + } + + coll := tc.filter.TakeCollectedEvents(context.Background()) + require.ElementsMatch(t, coll, tc.want, tc.name) + }) + } +} diff --git a/venus-devtool/api-gen/example.go b/venus-devtool/api-gen/example.go index 4aa443b272..ed2e307a44 100644 --- a/venus-devtool/api-gen/example.go +++ b/venus-devtool/api-gen/example.go @@ -296,6 +296,25 @@ func init() { Address: []types.EthAddress{ethaddr}, }) + addExample(&types.ActorEventBlock{ + Codec: 0x51, + Value: []byte("ddata"), + }) + + addExample(&types.ActorEventFilter{ + Addresses: []address.Address{addr}, + Fields: map[string][]types.ActorEventBlock{ + "abc": { + { + Codec: 0x51, + Value: []byte("ddata"), + }, + }, + }, + FromHeight: epochPtr(1010), + ToHeight: epochPtr(1020), + }) + percent := types.Percent(123) addExample(percent) addExample(&percent) @@ -386,3 +405,8 @@ func shouldIgnoreField(f reflect.StructField, parentType reflect.Type) bool { return strings.Split(jtag, ",")[0] == "-" } + +func epochPtr(ei int64) *abi.ChainEpoch { + ep := abi.ChainEpoch(ei) + return &ep +} diff --git a/venus-devtool/go.mod b/venus-devtool/go.mod index 8f4466aedd..63832cf359 100644 --- a/venus-devtool/go.mod +++ b/venus-devtool/go.mod @@ -9,8 +9,9 @@ require ( github.com/filecoin-project/go-fil-markets v1.28.3 github.com/filecoin-project/go-jsonrpc v0.3.1 github.com/filecoin-project/go-state-types v0.13.0-rc.2 - github.com/filecoin-project/lotus v1.25.3-0.20240227185223-01ec166e3a9e + github.com/filecoin-project/lotus v1.26.0-rc1 github.com/filecoin-project/venus v0.0.0-00010101000000-000000000000 + github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 github.com/ipfs/go-block-format v0.1.2 github.com/ipfs/go-cid v0.4.1 @@ -92,7 +93,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.1.1 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/gopacket v1.1.19 // indirect diff --git a/venus-devtool/go.sum b/venus-devtool/go.sum index 5906b7686c..47fb39129c 100644 --- a/venus-devtool/go.sum +++ b/venus-devtool/go.sum @@ -264,8 +264,8 @@ github.com/filecoin-project/go-statestore v0.2.0 h1:cRRO0aPLrxKQCZ2UOQbzFGn4WDNd github.com/filecoin-project/go-statestore v0.2.0/go.mod h1:8sjBYbS35HwPzct7iT4lIXjLlYyPor80aU7t7a/Kspo= github.com/filecoin-project/kubo-api-client v0.0.1 h1:IR1b+sm+VYxSRvbgECVv9SbhIgygcXcSoN1Q7xsHDXg= github.com/filecoin-project/kubo-api-client v0.0.1/go.mod h1:c36PPMIVOkKfHDwDG5U05gUlPRY9wNuh/BePwo0e+6Y= -github.com/filecoin-project/lotus v1.25.3-0.20240227185223-01ec166e3a9e h1:P/dO3xDWRGOmmYSy4Baogg7Bd3m9O+69sIFym1JWCVA= -github.com/filecoin-project/lotus v1.25.3-0.20240227185223-01ec166e3a9e/go.mod h1:8TFqfu2kdzTjT/+e/o6SYBOxvGAQOMwX7UNu4BZDUTU= +github.com/filecoin-project/lotus v1.26.0-rc1 h1:GpqEmM3J+BX9ZvNF/KTdrvNMWPZs6vKFR6l5kAHpUwk= +github.com/filecoin-project/lotus v1.26.0-rc1/go.mod h1:6FC17T8tgqFKiTGXNAuy4HdsHcksuk0HyjWDF8my+aA= github.com/filecoin-project/pubsub v1.0.0 h1:ZTmT27U07e54qV1mMiQo4HDr0buo8I1LDHBYLXlsNXM= github.com/filecoin-project/pubsub v1.0.0/go.mod h1:GkpB33CcUtUNrLPhJgfdy4FDx4OMNR9k+46DHx/Lqrg= github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao= diff --git a/venus-shared/actors/types/eth.go b/venus-shared/actors/types/eth.go index 62eaf565a4..448527f13a 100644 --- a/venus-shared/actors/types/eth.go +++ b/venus-shared/actors/types/eth.go @@ -655,7 +655,7 @@ type EthFilterSpec struct { Topics EthTopicSpec `json:"topics"` // Restricts event logs returned to those emitted from messages contained in this tipset. - // If BlockHash is present in in the filter criteria, then neither FromBlock nor ToBlock are allowed. + // If BlockHash is present in the filter criteria, then neither FromBlock nor ToBlock are allowed. // Added in EIP-234 BlockHash *EthHash `json:"blockHash,omitempty"` } diff --git a/venus-shared/api/chain/v1/actor_event.go b/venus-shared/api/chain/v1/actor_event.go new file mode 100644 index 0000000000..aa1472079e --- /dev/null +++ b/venus-shared/api/chain/v1/actor_event.go @@ -0,0 +1,36 @@ +package v1 + +import ( + "context" + + "github.com/filecoin-project/venus/venus-shared/types" +) + +type IActorEvent interface { + // Actor events + + // GetActorEvents returns all user-programmed and built-in actor events that match the given + // filter. + // This is a request/response API. + // Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange + // configuration options and also the amount of historical data available in the node. + // + // This is an EXPERIMENTAL API and may be subject to change. + GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) //perm:read + + // SubscribeActorEvents returns a long-lived stream of all user-programmed and built-in actor + // events that match the given filter. + // Events that match the given filter are written to the stream in real-time as they are emitted + // from the FVM. + // The response stream is closed when the client disconnects, when a ToHeight is specified and is + // reached, or if there is an error while writing an event to the stream. + // This API also allows clients to read all historical events matching the given filter before any + // real-time events are written to the response stream if the filter specifies an earlier + // FromHeight. + // Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange + // configuration options and also the amount of historical data available in the node. + // + // Note: this API is only available via websocket connections. + // This is an EXPERIMENTAL API and may be subject to change. + SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) //perm:read +} diff --git a/venus-shared/api/chain/v1/fullnode.go b/venus-shared/api/chain/v1/fullnode.go index 8afc3effb4..9a363b9f88 100644 --- a/venus-shared/api/chain/v1/fullnode.go +++ b/venus-shared/api/chain/v1/fullnode.go @@ -12,4 +12,5 @@ type FullNode interface { IWallet ICommon FullETH + IActorEvent } diff --git a/venus-shared/api/chain/v1/method.md b/venus-shared/api/chain/v1/method.md index 462a04feca..06dc2d1a00 100644 --- a/venus-shared/api/chain/v1/method.md +++ b/venus-shared/api/chain/v1/method.md @@ -11,6 +11,9 @@ curl http://:/rpc/v1 -X POST -H "Content-Type: application/json" -H " * [Actor](#actor) * [ListActor](#listactor) * [StateGetActor](#stategetactor) +* [ActorEvent](#actorevent) + * [GetActorEvents](#getactorevents) + * [SubscribeActorEvents](#subscribeactorevents) * [BlockStore](#blockstore) * [ChainDeleteObj](#chaindeleteobj) * [ChainHasObj](#chainhasobj) @@ -325,6 +328,133 @@ Response: } ``` +## ActorEvent + +### GetActorEvents +Actor events + + +Perms: read + +Inputs: +```json +[ + { + "addresses": [ + "f01234" + ], + "fields": { + "abc": [ + { + "codec": 81, + "value": "ZGRhdGE=" + } + ] + }, + "fromHeight": 1010, + "toHeight": 1020 + } +] +``` + +Response: +```json +[ + { + "entries": [ + { + "Flags": 7, + "Key": "string value", + "Codec": 42, + "Value": "Ynl0ZSBhcnJheQ==" + } + ], + "emitter": "f01234", + "reverted": true, + "height": 10101, + "tipsetKey": [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], + "msgCid": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } + } +] +``` + +### SubscribeActorEvents +SubscribeActorEvents returns a long-lived stream of all user-programmed and built-in actor +events that match the given filter. +Events that match the given filter are written to the stream in real-time as they are emitted +from the FVM. +The response stream is closed when the client disconnects, when a ToHeight is specified and is +reached, or if there is an error while writing an event to the stream. +This API also allows clients to read all historical events matching the given filter before any +real-time events are written to the response stream if the filter specifies an earlier +FromHeight. +Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange +configuration options and also the amount of historical data available in the node. + +Note: this API is only available via websocket connections. +This is an EXPERIMENTAL API and may be subject to change. + + +Perms: read + +Inputs: +```json +[ + { + "addresses": [ + "f01234" + ], + "fields": { + "abc": [ + { + "codec": 81, + "value": "ZGRhdGE=" + } + ] + }, + "fromHeight": 1010, + "toHeight": 1020 + } +] +``` + +Response: +```json +{ + "entries": [ + { + "Flags": 7, + "Key": "string value", + "Codec": 42, + "Value": "Ynl0ZSBhcnJheQ==" + } + ], + "emitter": "f01234", + "reverted": true, + "height": 10101, + "tipsetKey": [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], + "msgCid": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } +} +``` + ## BlockStore ### ChainDeleteObj diff --git a/venus-shared/api/chain/v1/mock/mock_fullnode.go b/venus-shared/api/chain/v1/mock/mock_fullnode.go index 37462e3922..0af5ca0d13 100644 --- a/venus-shared/api/chain/v1/mock/mock_fullnode.go +++ b/venus-shared/api/chain/v1/mock/mock_fullnode.go @@ -1145,6 +1145,21 @@ func (mr *MockFullNodeMockRecorder) GetActor(arg0, arg1 interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActor", reflect.TypeOf((*MockFullNode)(nil).GetActor), arg0, arg1) } +// GetActorEvents mocks base method. +func (m *MockFullNode) GetActorEvents(arg0 context.Context, arg1 *types0.ActorEventFilter) ([]*types0.ActorEvent, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetActorEvents", arg0, arg1) + ret0, _ := ret[0].([]*types0.ActorEvent) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetActorEvents indicates an expected call of GetActorEvents. +func (mr *MockFullNodeMockRecorder) GetActorEvents(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActorEvents", reflect.TypeOf((*MockFullNode)(nil).GetActorEvents), arg0, arg1) +} + // GetEntry mocks base method. func (m *MockFullNode) GetEntry(arg0 context.Context, arg1 abi.ChainEpoch, arg2 uint64) (*types0.BeaconEntry, error) { m.ctrl.T.Helper() @@ -3230,6 +3245,21 @@ func (mr *MockFullNodeMockRecorder) StateWaitMsg(arg0, arg1, arg2, arg3, arg4 in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateWaitMsg", reflect.TypeOf((*MockFullNode)(nil).StateWaitMsg), arg0, arg1, arg2, arg3, arg4) } +// SubscribeActorEvents mocks base method. +func (m *MockFullNode) SubscribeActorEvents(arg0 context.Context, arg1 *types0.ActorEventFilter) (<-chan *types0.ActorEvent, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SubscribeActorEvents", arg0, arg1) + ret0, _ := ret[0].(<-chan *types0.ActorEvent) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SubscribeActorEvents indicates an expected call of SubscribeActorEvents. +func (mr *MockFullNodeMockRecorder) SubscribeActorEvents(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeActorEvents", reflect.TypeOf((*MockFullNode)(nil).SubscribeActorEvents), arg0, arg1) +} + // SyncIncomingBlocks mocks base method. func (m *MockFullNode) SyncIncomingBlocks(arg0 context.Context) (<-chan *types0.BlockHeader, error) { m.ctrl.T.Helper() diff --git a/venus-shared/api/chain/v1/proxy_gen.go b/venus-shared/api/chain/v1/proxy_gen.go index f26e017e13..1f9b96ede5 100644 --- a/venus-shared/api/chain/v1/proxy_gen.go +++ b/venus-shared/api/chain/v1/proxy_gen.go @@ -1048,6 +1048,20 @@ type FullETHStruct struct { IETHEventStruct } +type IActorEventStruct struct { + Internal struct { + GetActorEvents func(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) `perm:"read"` + SubscribeActorEvents func(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) `perm:"read"` + } +} + +func (s *IActorEventStruct) GetActorEvents(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) { + return s.Internal.GetActorEvents(p0, p1) +} +func (s *IActorEventStruct) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) { + return s.Internal.SubscribeActorEvents(p0, p1) +} + type FullNodeStruct struct { IBlockStoreStruct IChainStruct @@ -1060,4 +1074,5 @@ type FullNodeStruct struct { IWalletStruct ICommonStruct FullETHStruct + IActorEventStruct } diff --git a/venus-shared/compatible-checks/api-checksum.txt b/venus-shared/compatible-checks/api-checksum.txt index 809dc09beb..1261ae0c68 100644 --- a/venus-shared/compatible-checks/api-checksum.txt +++ b/venus-shared/compatible-checks/api-checksum.txt @@ -331,6 +331,7 @@ api.FullNode: GasEstimateGasLimit: In=3, Out=2, CheckSum=4d1bd57eef0ee90d4c2e89f097d0604d GasEstimateGasPremium: In=5, Out=2, CheckSum=550724ed37e2fdaa64e55147e82214b1 GasEstimateMessageGas: In=4, Out=2, CheckSum=6ff6179b579feed33897d96429504624 + GetActorEvents: In=2, Out=2, CheckSum=e0c47876fd090c7125f3610b99e0dd27 ID: In=1, Out=2, CheckSum=1635810444d2b13b381cbefece853ba7 LogAlerts: In=1, Out=2, CheckSum=c9262fa7c93e891ec80868e0b83a2222 LogList: In=1, Out=2, CheckSum=c6d763b6ec7190283b7c648e735725c0 @@ -488,6 +489,7 @@ api.FullNode: StateVerifiedRegistryRootKey: In=2, Out=2, CheckSum=5ad3a497ee24e321c780a69b8d2f0936 StateVerifierStatus: In=3, Out=2, CheckSum=e33ae4cd2315832f2d6f2aa74b68c34e StateWaitMsg: In=5, Out=2, CheckSum=561c18d1417310b5cd35cfffb0b75a00 + SubscribeActorEvents: In=2, Out=2, CheckSum=af75eb1b19217696987fdf472121561c SyncCheckBad: In=2, Out=2, CheckSum=ba06470da0ca1d6cc2f9ada7f0288a6c SyncCheckpoint: In=2, Out=1, CheckSum=cdfe593ac791e823186abb77bfad49a0 SyncIncomingBlocks: In=1, Out=2, CheckSum=f6ad051ba2ce73511f74f9c08032acc3 diff --git a/venus-shared/types/actor_event.go b/venus-shared/types/actor_event.go new file mode 100644 index 0000000000..bf95189e19 --- /dev/null +++ b/venus-shared/types/actor_event.go @@ -0,0 +1,67 @@ +package types + +import ( + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" +) + +type ActorEventBlock struct { + // The value codec to match when filtering event values. + Codec uint64 `json:"codec"` + + // The value to want to match on associated with the corresponding "event key" + // when filtering events. + // Should be a byte array encoded with the specified codec. + // Assumes base64 encoding when converting to/from JSON strings. + Value []byte `json:"value"` +} + +type ActorEventFilter struct { + // Matches events from one of these actors, or any actor if empty. + // For now, this MUST be a Filecoin address. + Addresses []address.Address `json:"addresses,omitempty"` + + // Matches events with the specified key/values, or all events if empty. + // If the value is an empty slice, the filter will match on the key only, accepting any value. + Fields map[string][]ActorEventBlock `json:"fields,omitempty"` + + // The height of the earliest tipset to include in the query. If empty, the query starts at the + // last finalized tipset. + // NOTE: In a future upgrade, this will be strict when set and will result in an error if a filter + // cannot be fulfilled by the depth of history available in the node. Currently, the node will + // nott return an error, but will return starting from the epoch it has data for. + FromHeight *abi.ChainEpoch `json:"fromHeight,omitempty"` + + // The height of the latest tipset to include in the query. If empty, the query ends at the + // latest tipset. + ToHeight *abi.ChainEpoch `json:"toHeight,omitempty"` + + // Restricts events returned to those emitted from messages contained in this tipset. + // If `TipSetKey` is legt empty in the filter criteria, then neither `FromHeight` nor `ToHeight` are allowed. + TipSetKey *TipSetKey `json:"tipsetKey,omitempty"` +} + +type ActorEvent struct { + // Event entries in log form. + Entries []EventEntry `json:"entries"` + + // Filecoin address of the actor that emitted this event. + // NOTE: In a future upgrade, this will change to always be an ID address. Currently this will be + // either the f4 address, or ID address if an f4 is not available for this actor. + Emitter address.Address `json:"emitter"` + + // Reverted is set to true if the message that produced this event was reverted because of a network re-org + // in that case, the event should be considered as reverted as well. + Reverted bool `json:"reverted"` + + // Height of the tipset that contained the message that produced this event. + Height abi.ChainEpoch `json:"height"` + + // The tipset that contained the message that produced this event. + TipSetKey TipSetKey `json:"tipsetKey"` + + // CID of message that produced this event. + MsgCid cid.Cid `json:"msgCid"` +} diff --git a/venus-shared/types/actor_event_test.go b/venus-shared/types/actor_event_test.go new file mode 100644 index 0000000000..29e30c9c78 --- /dev/null +++ b/venus-shared/types/actor_event_test.go @@ -0,0 +1,125 @@ +package types + +import ( + "encoding/json" + pseudo "math/rand" + "testing" + + "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + builtintypes "github.com/filecoin-project/go-state-types/builtin" +) + +func TestJSONMarshalling(t *testing.T) { + rng := pseudo.New(pseudo.NewSource(0)) + t.Run("actor event with entries", + testJsonMarshalling( + ActorEvent{ + Entries: []EventEntry{ + { + Key: "key1", + Codec: 0x51, + Value: []byte("value1"), + }, + { + Key: "key2", + Codec: 0x52, + Value: []byte("value2"), + }, + }, + Emitter: randomF4Addr(t, rng), + Reverted: false, + Height: 1001, + TipSetKey: NewTipSetKey(randomCid(t, rng)), + MsgCid: randomCid(t, rng), + }, + `{"entries":[{"Flags":0,"Key":"key1","Codec":81,"Value":"dmFsdWUx"},{"Flags":0,"Key":"key2","Codec":82,"Value":"dmFsdWUy"}],"emitter":"t410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","reverted":false,"height":1001,"tipsetKey":[{"/":"bafkqacx3dag26sfht3qlcdi"}],"msgCid":{"/":"bafkqacrziziykd6uuf4islq"}}`, + ), + ) + + t.Run("actor event filter", + testJsonMarshalling( + ActorEventFilter{ + Addresses: []address.Address{ + randomF4Addr(t, pseudo.New(pseudo.NewSource(0))), + randomF4Addr(t, pseudo.New(pseudo.NewSource(0))), + }, + Fields: map[string][]ActorEventBlock{ + "key1": { + { + Codec: 0x51, + Value: []byte("value1"), + }, + }, + "key2": { + { + Codec: 0x52, + Value: []byte("value2"), + }, + }, + }, + FromHeight: heightOf(0), + ToHeight: heightOf(100), + TipSetKey: randomTipSetKey(t, rng), + }, + `{"addresses":["t410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","t410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua"],"fields":{"key1":[{"codec":81,"value":"dmFsdWUx"}],"key2":[{"codec":82,"value":"dmFsdWUy"}]},"fromHeight":0,"toHeight":100,"tipsetKey":[{"/":"bafkqacxcqxwocuiukv4aq5i"}]}`, + ), + ) + t.Run("actor event block", + testJsonMarshalling( + ActorEventBlock{ + Codec: 1, + Value: []byte("test"), + }, + `{"codec":1,"value":"dGVzdA=="}`, + ), + ) +} + +func testJsonMarshalling[V ActorEvent | ActorEventBlock | ActorEventFilter](subject V, expect string) func(t *testing.T) { + return func(t *testing.T) { + gotMarshalled, err := json.Marshal(subject) + require.NoError(t, err) + require.JSONEqf(t, expect, string(gotMarshalled), "serialization mismatch") + var gotUnmarshalled V + require.NoError(t, json.Unmarshal([]byte(expect), &gotUnmarshalled)) + require.Equal(t, subject, gotUnmarshalled) + } +} + +func heightOf(h int64) *abi.ChainEpoch { + hp := abi.ChainEpoch(h) + return &hp +} + +func randomTipSetKey(tb testing.TB, rng *pseudo.Rand) *TipSetKey { + tb.Helper() + tk := NewTipSetKey(randomCid(tb, rng)) + return &tk +} + +func randomF4Addr(tb testing.TB, rng *pseudo.Rand) address.Address { + tb.Helper() + addr, err := address.NewDelegatedAddress(builtintypes.EthereumAddressManagerActorID, randomBytes(32, rng)) + require.NoError(tb, err) + + return addr +} + +func randomCid(tb testing.TB, rng *pseudo.Rand) cid.Cid { + tb.Helper() + cb := cid.V1Builder{Codec: cid.Raw, MhType: mh.IDENTITY} + c, err := cb.Sum(randomBytes(10, rng)) + require.NoError(tb, err) + return c +} + +func randomBytes(n int, rng *pseudo.Rand) []byte { + buf := make([]byte, n) + rng.Read(buf) + return buf +} diff --git a/venus-shared/types/event.go b/venus-shared/types/event.go index 106a120e21..5f6415d49e 100644 --- a/venus-shared/types/event.go +++ b/venus-shared/types/event.go @@ -28,7 +28,7 @@ type EventEntry struct { // The event value's codec Codec uint64 - // The event value + // The event value. It is encoded using the codec specified above Value []byte }