From 7b6d76389a18830260292b1c0dfedf12fa1f4aa7 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 30 Jan 2024 21:02:44 +1100 Subject: [PATCH 1/6] test: DDO onboarding non-market verified data --- itests/direct_data_onboard_test.go | 218 +++++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) diff --git a/itests/direct_data_onboard_test.go b/itests/direct_data_onboard_test.go index 90dde120767..123221f2fee 100644 --- a/itests/direct_data_onboard_test.go +++ b/itests/direct_data_onboard_test.go @@ -4,25 +4,37 @@ import ( "bytes" "context" "crypto/rand" + "fmt" "testing" "time" "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-commp-utils/nonffi" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/builtin" + minertypes13 "github.com/filecoin-project/go-state-types/builtin/v13/miner" + verifregtypes13 "github.com/filecoin-project/go-state-types/builtin/v13/verifreg" + datacap2 "github.com/filecoin-project/go-state-types/builtin/v9/datacap" market2 "github.com/filecoin-project/go-state-types/builtin/v9/market" "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/go-state-types/network" + "github.com/filecoin-project/lotus/api" + lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/actors/builtin/datacap" "github.com/filecoin-project/lotus/chain/actors/builtin/market" minertypes "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/actors/builtin/verifreg" "github.com/filecoin-project/lotus/chain/consensus/filcns" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/wallet/key" "github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/lib/must" "github.com/filecoin-project/lotus/node/config" @@ -102,6 +114,212 @@ func TestOnboardRawPiece(t *testing.T) { require.Equal(t, dc.PieceCID, *si.CommD) } +func TestOnboardRawPieceVerified(t *testing.T) { + kit.QuietMiningLogs() + + var ( + blocktime = 2 * time.Millisecond + ctx = context.Background() + ) + + rootKey, err := key.GenerateKey(types.KTSecp256k1) + require.NoError(t, err) + + verifier1Key, err := key.GenerateKey(types.KTSecp256k1) + require.NoError(t, err) + + verifiedClientKey, err := key.GenerateKey(types.KTBLS) + require.NoError(t, err) + + bal, err := types.ParseFIL("100fil") + require.NoError(t, err) + + client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), + kit.RootVerifier(rootKey, abi.NewTokenAmount(bal.Int64())), + kit.Account(verifier1Key, abi.NewTokenAmount(bal.Int64())), + kit.Account(verifiedClientKey, abi.NewTokenAmount(bal.Int64())), + ) + + ens.InterconnectAll().BeginMiningMustPost(blocktime) + + miner.PledgeSectors(ctx, 1, 0, nil) + sl, err := miner.SectorsListNonGenesis(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + + snum := sl[0] + + maddr, err := miner.ActorAddress(ctx) + require.NoError(t, err) + + client.WaitForSectorActive(ctx, t, snum, maddr) + + pieceSize := abi.PaddedPieceSize(2048).Unpadded() + pieceData := make([]byte, pieceSize) + _, _ = rand.Read(pieceData) + + dc, err := miner.ComputeDataCid(ctx, pieceSize, bytes.NewReader(pieceData)) + require.NoError(t, err) + + // get VRH + vrh, err := client.StateVerifiedRegistryRootKey(ctx, types.TipSetKey{}) + fmt.Println(vrh.String()) + require.NoError(t, err) + + // import the root key. + rootAddr, err := client.WalletImport(ctx, &rootKey.KeyInfo) + require.NoError(t, err) + + // import the verifiers' keys. + verifier1Addr, err := client.WalletImport(ctx, &verifier1Key.KeyInfo) + require.NoError(t, err) + + // import the verified client's key. + verifiedClientAddr, err := client.WalletImport(ctx, &verifiedClientKey.KeyInfo) + require.NoError(t, err) + + // make the 2 verifiers + + mkVerifier(ctx, t, client.FullNode.(*api.FullNodeStruct), rootAddr, verifier1Addr) + + // assign datacap to a client + initialDatacap := big.NewInt(10000) + + params, err := actors.SerializeParams(&verifregtypes13.AddVerifiedClientParams{Address: verifiedClientAddr, Allowance: initialDatacap}) + require.NoError(t, err) + + msg := &types.Message{ + From: verifier1Addr, + To: verifreg.Address, + Method: verifreg.Methods.AddVerifiedClient, + Params: params, + Value: big.Zero(), + } + + sm, err := client.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + + res, err := client.StateWaitMsg(ctx, sm.Cid(), 1, lapi.LookbackNoLimit, true) + require.NoError(t, err) + require.EqualValues(t, 0, res.Receipt.ExitCode) + + minerId, err := address.IDFromAddress(miner.ActorAddr) + require.NoError(t, err) + + allocationRequest := verifregtypes13.AllocationRequest{ + Provider: abi.ActorID(minerId), + Data: dc.PieceCID, + Size: dc.Size, + TermMin: verifregtypes13.MinimumVerifiedAllocationTerm, + TermMax: verifregtypes13.MaximumVerifiedAllocationTerm, + Expiration: verifregtypes13.MaximumVerifiedAllocationExpiration, + } + + allocationRequests := verifregtypes13.AllocationRequests{ + Allocations: []verifregtypes13.AllocationRequest{allocationRequest}, + } + + receiverParams, err := actors.SerializeParams(&allocationRequests) + require.NoError(t, err) + + transferParams, err := actors.SerializeParams(&datacap2.TransferParams{ + To: builtin.VerifiedRegistryActorAddr, + Amount: big.Mul(big.NewInt(int64(dc.Size)), builtin.TokenPrecision), + OperatorData: receiverParams, + }) + require.NoError(t, err) + + msg = &types.Message{ + To: builtin.DatacapActorAddr, + From: verifiedClientAddr, + Method: datacap.Methods.TransferExported, + Params: transferParams, + Value: big.Zero(), + } + + sm, err = client.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + + res, err = client.StateWaitMsg(ctx, sm.Cid(), 1, lapi.LookbackNoLimit, true) + require.NoError(t, err) + require.EqualValues(t, 0, res.Receipt.ExitCode) + + allocations, err := client.StateGetAllocations(ctx, verifiedClientAddr, types.EmptyTSK) + require.NoError(t, err) + + require.Equal(t, 1, len(allocations)) + + var allocationId verifregtypes13.AllocationId + var clientId abi.ActorID + for key, value := range allocations { + allocationId = verifregtypes13.AllocationId(key) + clientId = value.Client + break + } + + head, err := client.ChainHead(ctx) + require.NoError(t, err) + + so, err := miner.SectorAddPieceToAny(ctx, pieceSize, bytes.NewReader(pieceData), piece.PieceDealInfo{ + PublishCid: nil, + DealID: 0, + DealProposal: nil, + DealSchedule: piece.DealSchedule{ + StartEpoch: head.Height() + 2880*2, + EndEpoch: head.Height() + 2880*400, + }, + KeepUnsealed: true, + PieceActivationManifest: &minertypes.PieceActivationManifest{ + CID: dc.PieceCID, + Size: dc.Size, + VerifiedAllocationKey: &minertypes13.VerifiedAllocationKey{Client: clientId, ID: allocationId}, + Notify: nil, + }, + }) + require.NoError(t, err) + + // wait for sector to commit + miner.WaitSectorsProving(ctx, map[abi.SectorNumber]struct{}{ + so.Sector: {}, + }) + + si, err := miner.SectorsStatus(ctx, so.Sector, true) + require.NoError(t, err) + require.Equal(t, dc.PieceCID, *si.CommD) + + require.Equal(t, si.DealWeight, big.Zero()) + require.Equal(t, si.VerifiedDealWeight, big.Mul(big.NewInt(int64(dc.Size)), big.NewInt(int64(si.Expiration-si.Activation)))) + + allocations, err = client.StateGetAllocations(ctx, verifiedClientAddr, types.EmptyTSK) + require.NoError(t, err) + require.Len(t, allocations, 0) +} + +func mkVerifier(ctx context.Context, t *testing.T, api *api.FullNodeStruct, rootAddr address.Address, addr address.Address) { + allowance := big.NewInt(100000000000) + params, aerr := actors.SerializeParams(&verifregtypes13.AddVerifierParams{Address: addr, Allowance: allowance}) + require.NoError(t, aerr) + + msg := &types.Message{ + From: rootAddr, + To: verifreg.Address, + Method: verifreg.Methods.AddVerifier, + Params: params, + Value: big.Zero(), + } + + sm, err := api.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err, "AddVerifier failed") + + res, err := api.StateWaitMsg(ctx, sm.Cid(), 1, lapi.LookbackNoLimit, true) + require.NoError(t, err) + require.EqualValues(t, 0, res.Receipt.ExitCode) + + verifierAllowance, err := api.StateVerifierStatus(ctx, addr, types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, allowance, *verifierAllowance) +} + func makeMarketDealProposal(t *testing.T, client *kit.TestFullNode, miner *kit.TestMiner, data cid.Cid, ps abi.PaddedPieceSize, start, end abi.ChainEpoch) market2.ClientDealProposal { ca, err := client.WalletDefaultAddress(context.Background()) require.NoError(t, err) From ff17d7f5edec963cca0b273611214f2193c6ef0d Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 1 Feb 2024 19:50:45 +1100 Subject: [PATCH 2/6] TEMP: extract events --- itests/direct_data_onboard_test.go | 102 +++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/itests/direct_data_onboard_test.go b/itests/direct_data_onboard_test.go index 123221f2fee..4dfb9084028 100644 --- a/itests/direct_data_onboard_test.go +++ b/itests/direct_data_onboard_test.go @@ -5,10 +5,14 @@ import ( "context" "crypto/rand" "fmt" + "os" "testing" "time" "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/codec/dagjson" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" @@ -140,6 +144,23 @@ func TestOnboardRawPieceVerified(t *testing.T) { kit.Account(verifiedClientKey, abi.NewTokenAmount(bal.Int64())), ) + evtChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ + Filter: types.ActorEventFilter{ + MinEpoch: -1, + MaxEpoch: -1, + }, + Prefill: true, + }) + require.NoError(t, err) + + events := make([]types.ActorEvent, 0) + go func() { + for e := range evtChan { + fmt.Printf("%s Got ActorEvent: %+v", time.Now().Format(time.StampMilli), e) + events = append(events, *e) + } + }() + ens.InterconnectAll().BeginMiningMustPost(blocktime) miner.PledgeSectors(ctx, 1, 0, nil) @@ -293,6 +314,87 @@ func TestOnboardRawPieceVerified(t *testing.T) { allocations, err = client.StateGetAllocations(ctx, verifiedClientAddr, types.EmptyTSK) require.NoError(t, err) require.Len(t, allocations, 0) + + evts, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{ + MinEpoch: -1, + MaxEpoch: -1, + }) + require.NoError(t, err) + + for _, evt := range evts { + fmt.Printf("Got ActorEvent: %+v", evt) + } + + blockOutFile, err := os.Create("block.out") + require.NoError(t, err) + write := func(s string) { + _, err := blockOutFile.WriteString(s) + require.NoError(t, err) + } + + head, err = miner.FullNode.ChainHead(ctx) + require.NoError(t, err) + for height := 0; height < int(head.Height()); height++ { + // for each tipset + ts, err := miner.FullNode.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(height), types.EmptyTSK) + require.NoError(t, err) + for _, b := range ts.Blocks() { + if b.Miner == miner.ActorAddr { + // for each block + // alternative here is to go straight to receipts: miner.FullNode.ChainGetParentReceipts(ctx, b.Cid()) + messages, err := miner.FullNode.ChainGetParentMessages(ctx, b.Cid()) + require.NoError(t, err) + if len(messages) == 0 { + continue + } + write(fmt.Sprintf("Height=%d Block=%s:", height, b.Cid())) + for _, parent := range b.Parents { + write(fmt.Sprintf(" %s", parent)) + } + write("\n") + for _, m := range messages { + // for each message + write(fmt.Sprintf(" Message=%s: %s -> %s, %d\n", m.Cid, m.Message.From, m.Message.To, m.Message.Method)) + receipt, err := miner.FullNode.StateSearchMsg(ctx, ts.Key(), m.Cid, -1, false) + require.NoError(t, err) + // receipt + write(fmt.Sprintf(" Receipt Exit=%d, Gas=%d, Return=0x%x\n", receipt.Receipt.ExitCode, receipt.Receipt.GasUsed, receipt.Receipt.Return)) + if receipt.Receipt.EventsRoot == nil { + write(fmt.Sprintln(" No events")) + } else { + // receipt + events, err := miner.FullNode.ChainGetEvents(ctx, *receipt.Receipt.EventsRoot) + require.NoError(t, err) + for ii, evt := range events { + // for each event + addr, err := address.NewIDAddress(uint64(evt.Emitter)) + require.NoError(t, err) + write(fmt.Sprintf(" Event=%d (%s):\n", ii, addr)) + for _, e := range evt.Entries { + // for each event entry + write(fmt.Sprintf(" Entry=0x%x, 0x%x, %s=%s\n", e.Codec, e.Flags, e.Key, toDagJson(t, e.Codec, e.Value))) + } + } + } + } + } + } + } + require.NoError(t, blockOutFile.Close()) + fmt.Println("Wrote block.out") +} + +func toDagJson(t *testing.T, codec uint64, data []byte) string { + switch codec { + case 0x51: + nd, err := ipld.Decode(data, dagcbor.Decode) + require.NoError(t, err) + byts, err := ipld.Encode(nd, dagjson.Encode) + require.NoError(t, err) + return string(byts) + default: + return fmt.Sprintf("0x%x", data) + } } func mkVerifier(ctx context.Context, t *testing.T, api *api.FullNodeStruct, rootAddr address.Address, addr address.Address) { From b54d1112d2d753d728d880c0d302536815eb8ea0 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 2 Feb 2024 17:43:34 +1100 Subject: [PATCH 3/6] Extract buildActorEventsFromMessages and separate event printing code --- itests/direct_data_onboard_test.go | 149 ++++++++++++++++++++--------- 1 file changed, 106 insertions(+), 43 deletions(-) diff --git a/itests/direct_data_onboard_test.go b/itests/direct_data_onboard_test.go index 4dfb9084028..8377caecd23 100644 --- a/itests/direct_data_onboard_test.go +++ b/itests/direct_data_onboard_test.go @@ -6,6 +6,7 @@ import ( "crypto/rand" "fmt" "os" + "strings" "testing" "time" @@ -13,6 +14,8 @@ import ( "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/codec/dagcbor" "github.com/ipld/go-ipld-prime/codec/dagjson" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/ipld/go-ipld-prime/node/bindnode" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" @@ -25,11 +28,13 @@ import ( verifregtypes13 "github.com/filecoin-project/go-state-types/builtin/v13/verifreg" datacap2 "github.com/filecoin-project/go-state-types/builtin/v9/datacap" market2 "github.com/filecoin-project/go-state-types/builtin/v9/market" + verifregtypes9 "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/api" lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin/datacap" "github.com/filecoin-project/lotus/chain/actors/builtin/market" @@ -320,68 +325,126 @@ func TestOnboardRawPieceVerified(t *testing.T) { MaxEpoch: -1, }) require.NoError(t, err) - for _, evt := range evts { fmt.Printf("Got ActorEvent: %+v", evt) } - blockOutFile, err := os.Create("block.out") - require.NoError(t, err) - write := func(s string) { - _, err := blockOutFile.WriteString(s) - require.NoError(t, err) + eventsFromMessages := buildActorEventsFromMessages(t, ctx, miner.FullNode) + writeEventsToFile(t, ctx, miner.FullNode, eventsFromMessages) + for _, evt := range evts { + fmt.Printf("Got ActorEvent from messages: %+v", evt) } - head, err = miner.FullNode.ChainHead(ctx) + // TODO: compare GetActorEvents & SubscribeActorEvents & eventsFromMessages for equality +} + +func buildActorEventsFromMessages(t *testing.T, ctx context.Context, node v1api.FullNode) []types.ActorEvent { + actorEvents := make([]types.ActorEvent, 0) + + head, err := node.ChainHead(ctx) require.NoError(t, err) for height := 0; height < int(head.Height()); height++ { // for each tipset - ts, err := miner.FullNode.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(height), types.EmptyTSK) + ts, err := node.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(height), types.EmptyTSK) require.NoError(t, err) for _, b := range ts.Blocks() { - if b.Miner == miner.ActorAddr { - // for each block - // alternative here is to go straight to receipts: miner.FullNode.ChainGetParentReceipts(ctx, b.Cid()) - messages, err := miner.FullNode.ChainGetParentMessages(ctx, b.Cid()) + // for each block + // alternative here is to go straight to receipts, but we need the message CID for our event + // list: node.ChainGetParentReceipts(ctx, b.Cid()) + messages, err := node.ChainGetParentMessages(ctx, b.Cid()) + require.NoError(t, err) + if len(messages) == 0 { + continue + } + for _, m := range messages { + receipt, err := node.StateSearchMsg(ctx, ts.Key(), m.Cid, -1, false) require.NoError(t, err) - if len(messages) == 0 { - continue - } - write(fmt.Sprintf("Height=%d Block=%s:", height, b.Cid())) - for _, parent := range b.Parents { - write(fmt.Sprintf(" %s", parent)) - } - write("\n") - for _, m := range messages { - // for each message - write(fmt.Sprintf(" Message=%s: %s -> %s, %d\n", m.Cid, m.Message.From, m.Message.To, m.Message.Method)) - receipt, err := miner.FullNode.StateSearchMsg(ctx, ts.Key(), m.Cid, -1, false) + // receipt + if receipt.Receipt.EventsRoot != nil { + events, err := node.ChainGetEvents(ctx, *receipt.Receipt.EventsRoot) require.NoError(t, err) - // receipt - write(fmt.Sprintf(" Receipt Exit=%d, Gas=%d, Return=0x%x\n", receipt.Receipt.ExitCode, receipt.Receipt.GasUsed, receipt.Receipt.Return)) - if receipt.Receipt.EventsRoot == nil { - write(fmt.Sprintln(" No events")) - } else { - // receipt - events, err := miner.FullNode.ChainGetEvents(ctx, *receipt.Receipt.EventsRoot) + for _, evt := range events { + // for each event + addr, err := address.NewIDAddress(uint64(evt.Emitter)) + require.NoError(t, err) + tsCid, err := ts.Key().Cid() require.NoError(t, err) - for ii, evt := range events { - // for each event - addr, err := address.NewIDAddress(uint64(evt.Emitter)) - require.NoError(t, err) - write(fmt.Sprintf(" Event=%d (%s):\n", ii, addr)) - for _, e := range evt.Entries { - // for each event entry - write(fmt.Sprintf(" Entry=0x%x, 0x%x, %s=%s\n", e.Codec, e.Flags, e.Key, toDagJson(t, e.Codec, e.Value))) - } - } + + actorEvents = append(actorEvents, types.ActorEvent{ + Entries: evt.Entries, + EmitterAddr: addr, + Reverted: false, + Height: abi.ChainEpoch(height), + TipSetKey: tsCid, + MsgCid: m.Cid, + }) } } } } } - require.NoError(t, blockOutFile.Close()) - fmt.Println("Wrote block.out") + return actorEvents +} + +func writeEventsToFile(t *testing.T, ctx context.Context, node v1api.FullNode, events []types.ActorEvent) { + file, err := os.Create("block.out") + require.NoError(t, err) + defer func() { + require.NoError(t, file.Close()) + }() + write := func(s string) { + _, err := file.WriteString(s) + require.NoError(t, err) + } + claimKeyCbor, err := ipld.Encode(basicnode.NewString("claim"), dagcbor.Encode) + require.NoError(t, err) + + for _, event := range events { + entryStrings := []string{ + fmt.Sprintf("height=%d", event.Height), + fmt.Sprintf("msg=%s", event.MsgCid), + fmt.Sprintf("emitter=%s", event.EmitterAddr), + fmt.Sprintf("reverted=%t", event.Reverted), + } + claims := make([]*verifregtypes9.Claim, 0) + var isClaim bool + var claimId int64 = -1 + var providerId int64 = -1 + + for _, e := range event.Entries { + // for each event entry + entryStrings = append(entryStrings, fmt.Sprintf("%s=%s", e.Key, toDagJson(t, e.Codec, e.Value))) + if e.Key == "$type" && bytes.Equal(e.Value, claimKeyCbor) { + isClaim = true + } else if isClaim && e.Key == "id" { + nd, err := ipld.DecodeUsingPrototype([]byte(e.Value), dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil)) + require.NoError(t, err) + claimId = *bindnode.Unwrap(nd).(*int64) + } else if isClaim && e.Key == "provider" { + nd, err := ipld.DecodeUsingPrototype([]byte(e.Value), dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil)) + require.NoError(t, err) + providerId = *bindnode.Unwrap(nd).(*int64) + } + if isClaim && claimId != -1 && providerId != -1 { + provider, err := address.NewIDAddress(uint64(providerId)) + require.NoError(t, err) + claim, err := node.StateGetClaim(ctx, provider, verifregtypes9.ClaimId(claimId), types.EmptyTSK) + require.NoError(t, err) + claims = append(claims, claim) + } + } + write(fmt.Sprintf("Event<%s>\n", strings.Join(entryStrings, ", "))) + if len(claims) > 0 { + for _, claim := range claims { + p, err := address.NewIDAddress(uint64(claim.Provider)) + require.NoError(t, err) + c, err := address.NewIDAddress(uint64(claim.Client)) + require.NoError(t, err) + write(fmt.Sprintf(" Claim\n", + p, c, claim.Data, claim.Size, claim.TermMin, claim.TermMax, claim.TermStart, claim.Sector)) + } + } + } } func toDagJson(t *testing.T, codec uint64, data []byte) string { From 3f2c3a31d69327f78e1c525a89d33114d648b3d1 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Mon, 5 Feb 2024 18:39:51 +0400 Subject: [PATCH 4/6] solve actor resolution bug --- node/modules/actorevent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index e71ce4e31a2..4400c19af45 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -137,7 +137,7 @@ func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.Loc actor, err := sm.LoadActor(ctx, idAddr, ts) if err != nil || actor.Address == nil { - return idAddr, false + return idAddr, true } return *actor.Address, true From a694690f0df3d6be13b22f3ae75d683a4f74add6 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Wed, 7 Feb 2024 12:17:46 +0400 Subject: [PATCH 5/6] tests for events API --- .circleci/config.yml | 6 - cmd/lotus-shed/indexes.go | 14 +-- documentation/en/default-lotus-config.toml | 8 +- itests/actor_events_filter_test.go | 92 -------------- itests/direct_data_onboard_test.go | 134 +++++++++++++++++---- itests/kit/node_opts.go | 3 + 6 files changed, 123 insertions(+), 134 deletions(-) delete mode 100644 itests/actor_events_filter_test.go diff --git a/.circleci/config.yml b/.circleci/config.yml index e5234d50245..c8187fd87cf 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -542,12 +542,6 @@ workflows: - docs-check: requires: - build - - test: - name: test-itest-actor_events_filter - requires: - - build - suite: itest-actor_events_filter - target: "./itests/actor_events_filter_test.go" - test: name: test-itest-api requires: diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index be7d43e0513..620933e25f8 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -9,13 +9,11 @@ import ( "strings" "github.com/mitchellh/go-homedir" - "github.com/multiformats/go-varint" "github.com/urfave/cli/v2" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" - builtintypes "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/exitcode" @@ -109,6 +107,7 @@ var backfillEventsCmd = &cli.Command{ addressLookups := make(map[abi.ActorID]address.Address) + // TODO: We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands resolveFn := func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { // we only want to match using f4 addresses idAddr, err := address.NewIDAddress(uint64(emitter)) @@ -118,18 +117,9 @@ var backfillEventsCmd = &cli.Command{ actor, err := api.StateGetActor(ctx, idAddr, ts.Key()) if err != nil || actor.Address == nil { - return address.Undef, false - } - - // if robust address is not f4 then we won't match against it so bail early - if actor.Address.Protocol() != address.Delegated { - return address.Undef, false + return idAddr, true } - // we have an f4 address, make sure it's assigned by the EAM - if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID { - return address.Undef, false - } return *actor.Address, true } diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index c97ce0fe1d9..a403a580e5e 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -330,6 +330,9 @@ # env var: LOTUS_FEVM_ENABLEETHRPC #EnableEthRPC = false + # EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors). + # This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above. + # # type: bool # env var: LOTUS_FEVM_ENABLEACTOREVENTSAPI #EnableActorEventsAPI = false @@ -342,9 +345,8 @@ #EthTxHashMappingLifetimeDays = 0 [Fevm.Events] - # EnableEthRPC enables APIs that # DisableRealTimeFilterAPI will disable the RealTimeFilterAPI that can create and query filters for actor events as they are emitted. - # The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag. + # The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag. # # type: bool # env var: LOTUS_FEVM_EVENTS_DISABLEREALTIMEFILTERAPI @@ -352,7 +354,7 @@ # DisableHistoricFilterAPI will disable the HistoricFilterAPI that can create and query filters for actor events # that occurred in the past. HistoricFilterAPI maintains a queryable index of events. - # The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag. + # The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag. # # type: bool # env var: LOTUS_FEVM_EVENTS_DISABLEHISTORICFILTERAPI diff --git a/itests/actor_events_filter_test.go b/itests/actor_events_filter_test.go deleted file mode 100644 index 987f5f9a8b7..00000000000 --- a/itests/actor_events_filter_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package itests - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/go-address" - - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/itests/kit" -) - -func TestGetActorEvents(t *testing.T) { - t.Skip("skipping for now") - //require := require.New(t) - kit.QuietAllLogsExcept("events", "messagepool") - - blockTime := 100 * time.Millisecond - - client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC()) - ens.InterconnectAll().BeginMining(blockTime) - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - // Set up the test fixture with a standard list of invocations - contract1, contract2, invocations := prepareEventMatrixInvocations(ctx, t, client) - fmt.Printf("contract1:%s; contract2:%s\n", contract1, contract2) - - cf1, err := contract1.ToFilecoinAddress() - if err != nil { - panic(err) - } - - cf2, err := contract2.ToFilecoinAddress() - if err != nil { - panic(err) - } - - fmt.Printf("contract1 f4 is:%s; contract2 f4 is:%s\n", cf1.String(), cf2.String()) - - testCases := getCombinationFilterTestCases(contract1, contract2, "0x0") - - messages := invokeAndWaitUntilAllOnChain(t, client, invocations) - - // f410fiy2dwcbbvc5c6xwwrhlwgi2dby4rzgamxllpgva - - for _, tc := range testCases { - tc := tc // appease the lint despot - t.Run(tc.name, func(t *testing.T) { - - res, err := client.EthGetLogs(ctx, tc.spec) - require.NoError(t, err) - - /*ch, _ := client.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ - Prefill: true, - ActorEventFilter: types.ActorEventFilter{ - MinEpoch: 0, - MaxEpoch: 1000, - }, - }) - - for i := range ch { - fmt.Println("Hello Chan", i.Entries[0].Key, i.Entries[0].Codec, i.EmitterAddr.String()) - }*/ - - res2, _ := client.GetActorEvents(ctx, &types.ActorEventFilter{ - Addresses: []address.Address{cf2}, - //EthAddresses: []ethtypes.EthAddress{ - // contract1, - //}, - }) - for _, res := range res2 { - res := res - fmt.Println("Emitter Address is", res.EmitterAddr.String()) - for _, entry := range res.Entries { - fmt.Println("Hello", entry.Key, entry.Codec, string(entry.Value)) - } - - } - fmt.Println("Hello", res2[0].Entries[0].Key, res2[0].Entries[0].Codec, res2[0].EmitterAddr.String()) - - elogs, err := parseEthLogsFromFilterResult(res) - require.NoError(t, err) - AssertEthLogs(t, elogs, tc.expected, messages) - }) - } -} diff --git a/itests/direct_data_onboard_test.go b/itests/direct_data_onboard_test.go index 8377caecd23..e64bd63e5b0 100644 --- a/itests/direct_data_onboard_test.go +++ b/itests/direct_data_onboard_test.go @@ -4,8 +4,10 @@ import ( "bytes" "context" "crypto/rand" + "encoding/json" "fmt" "os" + "sort" "strings" "testing" "time" @@ -149,22 +151,28 @@ func TestOnboardRawPieceVerified(t *testing.T) { kit.Account(verifiedClientKey, abi.NewTokenAmount(bal.Int64())), ) - evtChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ + minerEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ Filter: types.ActorEventFilter{ - MinEpoch: -1, - MaxEpoch: -1, + Addresses: []address.Address{miner.ActorAddr}, }, Prefill: true, }) require.NoError(t, err) - events := make([]types.ActorEvent, 0) - go func() { - for e := range evtChan { - fmt.Printf("%s Got ActorEvent: %+v", time.Now().Format(time.StampMilli), e) - events = append(events, *e) - } - }() + // only consume and match sector-activated events + sectorActivatedCbor, err := ipld.Encode(basicnode.NewString("sector-activated"), dagcbor.Encode) + require.NoError(t, err) + sectorActivatedEvtsCh, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ + Filter: types.ActorEventFilter{ + Fields: map[string][]types.ActorEventBlock{ + "$type": { + {Codec: 0x51, Value: sectorActivatedCbor}, + }, + }, + }, + Prefill: true, + }) + require.NoError(t, err) ens.InterconnectAll().BeginMiningMustPost(blocktime) @@ -319,23 +327,107 @@ func TestOnboardRawPieceVerified(t *testing.T) { allocations, err = client.StateGetAllocations(ctx, verifiedClientAddr, types.EmptyTSK) require.NoError(t, err) require.Len(t, allocations, 0) + eventsFromMessages := buildActorEventsFromMessages(t, ctx, miner.FullNode) + writeEventsToFile(t, ctx, miner.FullNode, eventsFromMessages) + + /* --- Tests for the Actor events API --- */ + pstring := func(s string) *string { return &s } + // Match events from Get API and receipts + allEvtsFromGetAPI, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{ + FromBlock: pstring("earliest"), + ToBlock: pstring("latest"), + }) + require.NoError(t, err) + matchEvents(t, eventsFromMessages, getEventsArray(allEvtsFromGetAPI)) + + // match Miner Actor events from subscription channel and Miner Actor events obtained from receipts + var subMinerEvts []types.ActorEvent + for evt := range minerEvtsChan { + subMinerEvts = append(subMinerEvts, *evt) + if len(subMinerEvts) == 4 { + break + } + } + var allMinerEvts []types.ActorEvent + for _, evt := range eventsFromMessages { + if evt.EmitterAddr == miner.ActorAddr { + allMinerEvts = append(allMinerEvts, evt) + } + } + matchEvents(t, allMinerEvts, subMinerEvts) + + // Match pre-filled events from sector activated channel and events obtained from receipts + var prefillSectorActivatedEvts []types.ActorEvent + for evt := range sectorActivatedEvtsCh { + prefillSectorActivatedEvts = append(prefillSectorActivatedEvts, *evt) + if len(prefillSectorActivatedEvts) == 2 { + break + } + } + require.Len(t, prefillSectorActivatedEvts, 2) + var sectorActivatedEvts []types.ActorEvent + for _, evt := range eventsFromMessages { + for _, entry := range evt.Entries { + if entry.Key == "$type" && bytes.Equal(entry.Value, sectorActivatedCbor) { + sectorActivatedEvts = append(sectorActivatedEvts, evt) + break + } + } + } + matchEvents(t, sectorActivatedEvts, prefillSectorActivatedEvts) - evts, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{ - MinEpoch: -1, - MaxEpoch: -1, + // Match pre-filled events from subscription channel and events obtained from receipts + allEvtsCh, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ + Filter: types.ActorEventFilter{ + FromBlock: pstring("earliest"), + ToBlock: pstring("latest"), + }, + Prefill: true, }) require.NoError(t, err) - for _, evt := range evts { - fmt.Printf("Got ActorEvent: %+v", evt) + var prefillEvts []types.ActorEvent + for evt := range allEvtsCh { + prefillEvts = append(prefillEvts, *evt) + if len(prefillEvts) == len(eventsFromMessages) { + break + } } + matchEvents(t, eventsFromMessages, prefillEvts) +} - eventsFromMessages := buildActorEventsFromMessages(t, ctx, miner.FullNode) - writeEventsToFile(t, ctx, miner.FullNode, eventsFromMessages) - for _, evt := range evts { - fmt.Printf("Got ActorEvent from messages: %+v", evt) +func getEventsArray(ptr []*types.ActorEvent) []types.ActorEvent { + var evts []types.ActorEvent + for _, evt := range ptr { + evts = append(evts, *evt) + } + return evts +} + +func matchEvents(t *testing.T, exp []types.ActorEvent, actual []types.ActorEvent) { + // height and tipset cid can mismatch because expected events are sourced using APIs that can put in different tipsets + for i := range exp { + exp[i].Height = 0 + exp[i].TipSetKey = cid.Undef + } + for i := range actual { + actual[i].Height = 0 + actual[i].TipSetKey = cid.Undef } - // TODO: compare GetActorEvents & SubscribeActorEvents & eventsFromMessages for equality + require.Equal(t, len(exp), len(actual)) + // marshal both arrays to json, sort by json, and compare + bz1, err := json.Marshal(exp) + require.NoError(t, err) + sort.Slice(bz1, func(i, j int) bool { + return bz1[i] <= bz1[j] + }) + + bz2, err := json.Marshal(actual) + require.NoError(t, err) + sort.Slice(bz2, func(i, j int) bool { + return bz2[i] <= bz2[j] + }) + require.True(t, bytes.Equal(bz1, bz2)) } func buildActorEventsFromMessages(t *testing.T, ctx context.Context, node v1api.FullNode) []types.ActorEvent { @@ -374,7 +466,7 @@ func buildActorEventsFromMessages(t *testing.T, ctx context.Context, node v1api. Entries: evt.Entries, EmitterAddr: addr, Reverted: false, - Height: abi.ChainEpoch(height), + Height: ts.Height(), TipSetKey: tsCid, MsgCid: m.Cid, }) diff --git a/itests/kit/node_opts.go b/itests/kit/node_opts.go index 51db8d8b2b2..0253342037f 100644 --- a/itests/kit/node_opts.go +++ b/itests/kit/node_opts.go @@ -1,6 +1,8 @@ package kit import ( + "math" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" @@ -64,6 +66,7 @@ var DefaultNodeOpts = nodeOpts{ cfg.Fevm.EnableEthRPC = true cfg.Fevm.EnableActorEventsAPI = true + cfg.Fevm.Events.MaxFilterHeightRange = math.MaxInt64 return nil }, }, From f9c891b329252e28b53cb411571e31dca14c7829 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Wed, 7 Feb 2024 15:08:18 +0400 Subject: [PATCH 6/6] lint --- itests/direct_data_onboard_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/itests/direct_data_onboard_test.go b/itests/direct_data_onboard_test.go index e64bd63e5b0..112efe7b8b8 100644 --- a/itests/direct_data_onboard_test.go +++ b/itests/direct_data_onboard_test.go @@ -327,8 +327,8 @@ func TestOnboardRawPieceVerified(t *testing.T) { allocations, err = client.StateGetAllocations(ctx, verifiedClientAddr, types.EmptyTSK) require.NoError(t, err) require.Len(t, allocations, 0) - eventsFromMessages := buildActorEventsFromMessages(t, ctx, miner.FullNode) - writeEventsToFile(t, ctx, miner.FullNode, eventsFromMessages) + eventsFromMessages := buildActorEventsFromMessages(ctx, t, miner.FullNode) + writeEventsToFile(ctx, t, miner.FullNode, eventsFromMessages) /* --- Tests for the Actor events API --- */ pstring := func(s string) *string { return &s } @@ -430,7 +430,7 @@ func matchEvents(t *testing.T, exp []types.ActorEvent, actual []types.ActorEvent require.True(t, bytes.Equal(bz1, bz2)) } -func buildActorEventsFromMessages(t *testing.T, ctx context.Context, node v1api.FullNode) []types.ActorEvent { +func buildActorEventsFromMessages(ctx context.Context, t *testing.T, node v1api.FullNode) []types.ActorEvent { actorEvents := make([]types.ActorEvent, 0) head, err := node.ChainHead(ctx) @@ -478,7 +478,7 @@ func buildActorEventsFromMessages(t *testing.T, ctx context.Context, node v1api. return actorEvents } -func writeEventsToFile(t *testing.T, ctx context.Context, node v1api.FullNode, events []types.ActorEvent) { +func writeEventsToFile(ctx context.Context, t *testing.T, node v1api.FullNode, events []types.ActorEvent) { file, err := os.Create("block.out") require.NoError(t, err) defer func() { @@ -509,11 +509,11 @@ func writeEventsToFile(t *testing.T, ctx context.Context, node v1api.FullNode, e if e.Key == "$type" && bytes.Equal(e.Value, claimKeyCbor) { isClaim = true } else if isClaim && e.Key == "id" { - nd, err := ipld.DecodeUsingPrototype([]byte(e.Value), dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil)) + nd, err := ipld.DecodeUsingPrototype(e.Value, dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil)) require.NoError(t, err) claimId = *bindnode.Unwrap(nd).(*int64) } else if isClaim && e.Key == "provider" { - nd, err := ipld.DecodeUsingPrototype([]byte(e.Value), dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil)) + nd, err := ipld.DecodeUsingPrototype(e.Value, dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil)) require.NoError(t, err) providerId = *bindnode.Unwrap(nd).(*int64) }