diff --git a/.circleci/config.yml b/.circleci/config.yml index eae5e07c8ff..85bcd045cdd 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1018,7 +1018,6 @@ workflows: suite: utest-unit-cli target: "./cli/... ./cmd/... ./api/..." get-params: true - executor: golang-2xl - test: name: test-unit-node requires: @@ -1026,7 +1025,6 @@ workflows: suite: utest-unit-node target: "./node/..." - - test: name: test-unit-rest requires: @@ -1034,7 +1032,6 @@ workflows: suite: utest-unit-rest target: "./blockstore/... ./build/... ./chain/... ./conformance/... ./gateway/... ./journal/... ./lib/... ./markets/... ./paychmgr/... ./provider/... ./tools/..." - executor: golang-2xl - test: name: test-unit-storage requires: @@ -1042,7 +1039,6 @@ workflows: suite: utest-unit-storage target: "./storage/... ./extern/..." - - test: go-test-flags: "-run=TestMulticoreSDR" requires: diff --git a/.circleci/template.yml b/.circleci/template.yml index 7bc84c2187f..2bfba956483 100644 --- a/.circleci/template.yml +++ b/.circleci/template.yml @@ -567,8 +567,6 @@ workflows: suite: utest-[[ $suite ]] target: "[[ $pkgs ]]" [[if eq $suite "unit-cli"]]get-params: true[[end]] - [[if eq $suite "unit-cli"]]executor: golang-2xl[[end]] - [[- if eq $suite "unit-rest"]]executor: golang-2xl[[end]] [[- end]] - test: go-test-flags: "-run=TestMulticoreSDR" diff --git a/cli/util/api.go b/cli/util/api.go index 7b06efdf324..3602b752de2 100644 --- a/cli/util/api.go +++ b/cli/util/api.go @@ -119,7 +119,7 @@ func GetAPIInfoMulti(ctx *cli.Context, t repo.RepoType) ([]APIInfo, error) { } } - return []APIInfo{}, fmt.Errorf("could not determine API endpoint for node type: %v", t.Type()) + return []APIInfo{}, fmt.Errorf("could not determine API endpoint for node type: %v. Try setting environment variable: %s", t.Type(), primaryEnv) } func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { @@ -168,7 +168,7 @@ func GetRawAPIMultiV2(ctx *cli.Context, ainfoCfg []string, version string) ([]Ht var httpHeads []HttpHead if len(ainfoCfg) == 0 { - return httpHeads, xerrors.Errorf("could not get API info: none configured. \nConsider getting base.toml with './lotus-provider config get base >base.toml' \nthen adding \n[APIs] \n FULLNODE_API_INFO = [\" result_from lotus auth api-info --perm=admin \"]\n and updating it with './lotus-provider config set base.toml'") + return httpHeads, xerrors.Errorf("could not get API info: none configured. \nConsider getting base.toml with './lotus-provider config get base >/tmp/base.toml' \nthen adding \n[APIs] \n ChainApiInfo = [\" result_from lotus auth api-info --perm=admin \"]\n and updating it with './lotus-provider config set /tmp/base.toml'") } for _, i := range ainfoCfg { ainfo := ParseApiInfo(i) diff --git a/cmd/lotus-provider/migrate.go b/cmd/lotus-provider/migrate.go index 47095c398af..674a537dfd3 100644 --- a/cmd/lotus-provider/migrate.go +++ b/cmd/lotus-provider/migrate.go @@ -11,10 +11,13 @@ import ( "strings" "github.com/BurntSushi/toml" + "github.com/ipfs/go-datastore" "github.com/samber/lo" "github.com/urfave/cli/v2" "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" + cliutil "github.com/filecoin-project/lotus/cli/util" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/node/config" @@ -33,6 +36,12 @@ var configMigrateCmd = &cli.Command{ Value: "~/.lotusminer", Usage: fmt.Sprintf("Specify miner repo path. flag(%s) and env(LOTUS_STORAGE_PATH) are DEPRECATION, will REMOVE SOON", FlagMinerRepoDeprecation), }, + &cli.StringFlag{ + Name: "repo", + EnvVars: []string{"LOTUS_PATH"}, + Hidden: true, + Value: "~/.lotus", + }, &cli.StringFlag{ Name: "to-layer", Aliases: []string{"t"}, @@ -117,14 +126,20 @@ func fromMiner(cctx *cli.Context) (err error) { } // Populate Miner Address - sm, cc, err := cliutil.GetStorageMinerAPI(cctx) + mmeta, err := lr.Datastore(ctx, "/metadata") + if err != nil { + return xerrors.Errorf("opening miner metadata datastore: %w", err) + } + defer mmeta.Close() + + maddrBytes, err := mmeta.Get(ctx, datastore.NewKey("miner-address")) if err != nil { - return fmt.Errorf("could not get storageMiner API: %w", err) + return xerrors.Errorf("getting miner address datastore entry: %w", err) } - defer cc() - addr, err := sm.ActorAddress(ctx) + + addr, err := address.NewFromBytes(maddrBytes) if err != nil { - return fmt.Errorf("could not read actor address: %w", err) + return xerrors.Errorf("parsing miner actor address: %w", err) } lpCfg.Addresses.MinerAddresses = []string{addr.String()} @@ -137,7 +152,8 @@ func fromMiner(cctx *cli.Context) (err error) { if err != nil { return xerrors.Errorf("error getting JWTSecretName: %w", err) } - lpCfg.Apis.StorageRPCSecret = base64.RawStdEncoding.EncodeToString(js.PrivateKey) + + lpCfg.Apis.StorageRPCSecret = base64.StdEncoding.EncodeToString(js.PrivateKey) // Populate API Key _, header, err := cliutil.GetRawAPI(cctx, repo.FullNode, "v0") @@ -145,7 +161,11 @@ func fromMiner(cctx *cli.Context) (err error) { return fmt.Errorf("cannot read API: %w", err) } - lpCfg.Apis.ChainApiInfo = []string{header.Get("Authorization")[7:]} + ainfo, err := cliutil.GetAPIInfo(&cli.Context{}, repo.FullNode) + if err != nil { + return xerrors.Errorf("could not get API info for FullNode: %w", err) + } + lpCfg.Apis.ChainApiInfo = []string{header.Get("Authorization")[7:] + ":" + ainfo.Addr} // Enable WindowPoSt lpCfg.Subsystems.EnableWindowPost = true @@ -161,7 +181,7 @@ environment variable LOTUS_WORKER_WINDOWPOST. } if !lo.Contains(titles, "base") { - _, err = db.Exec(ctx, "INSERT INTO harmony_config (title, config) VALUES ('base', '')", "base") + _, err = db.Exec(ctx, "INSERT INTO harmony_config (title, config) VALUES ('base', '')") if err != nil { return err } diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index 005cb9ebf2d..2d1bc99cd21 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -40,6 +40,7 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/provider" + "github.com/filecoin-project/lotus/provider/lpwinning" "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" @@ -204,7 +205,10 @@ var runCmd = &cli.Command{ sa, err := StorageAuth(cfg.Apis.StorageRPCSecret) if err != nil { - return xerrors.Errorf("parsing Apis.StorageRPCSecret config: %w", err) + return xerrors.Errorf(`'%w' while parsing the config toml's + [Apis] + StorageRPCSecret=%v +Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, err, cfg.Apis.StorageRPCSecret) } al := alerting.NewAlertingSystem(j) @@ -251,6 +255,11 @@ var runCmd = &cli.Command{ } activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask, derlareRecoverTask) } + + if cfg.Subsystems.EnableWinningPost { + winPoStTask := lpwinning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, lw, verif, full, maddrs) + activeTasks = append(activeTasks, winPoStTask) + } } taskEngine, err := harmonytask.New(db, activeTasks, listenAddr) if err != nil { diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index f7699e635e1..a65e82e9504 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -176,6 +176,16 @@ # env var: LOTUS_SUBSYSTEMS_DISABLEWINDOWPOST #DisableWindowPoSt = false + # When winning post is disabled, the miner process will NOT attempt to mine + # blocks. This should only be set when there's an external process mining + # blocks on behalf of the miner. + # When disabled and no external block producers are configured, all potential + # block rewards will be missed! + # + # type: bool + # env var: LOTUS_SUBSYSTEMS_DISABLEWINNINGPOST + #DisableWinningPoSt = false + [Dealmaking] # When enabled, the miner can accept online deals diff --git a/documentation/en/default-lotus-provider-config.toml b/documentation/en/default-lotus-provider-config.toml index cb42c7f5fb5..91606e503fd 100644 --- a/documentation/en/default-lotus-provider-config.toml +++ b/documentation/en/default-lotus-provider-config.toml @@ -8,6 +8,9 @@ # type: bool #EnableWinningPost = false + # type: int + #WinningPostMaxTasks = 0 + [Fees] # type: types.FIL diff --git a/go.sum b/go.sum index aee413de4ff..fff87909239 100644 --- a/go.sum +++ b/go.sum @@ -344,7 +344,6 @@ github.com/filecoin-project/go-state-types v0.0.0-20201102161440-c8033295a1fc/go github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g= github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= -github.com/filecoin-project/go-state-types v0.11.1/go.mod h1:SyNPwTsU7I22gL2r0OAPcImvLoTVfgRwdK/Y5rR1zz8= github.com/filecoin-project/go-state-types v0.11.2-0.20230712101859-8f37624fa540/go.mod h1:SyNPwTsU7I22gL2r0OAPcImvLoTVfgRwdK/Y5rR1zz8= github.com/filecoin-project/go-state-types v0.12.5 h1:VQ2N2T3JeUDdIHEo/xhjnT7Q218Wl0UYIyglqT7Z9Ck= github.com/filecoin-project/go-state-types v0.12.5/go.mod h1:iJTqGdWDvzXhuVf64Lw0hzt4TIoitMo0VgHdxdjNDZI= diff --git a/lib/harmony/harmonydb/sql/20230823.sql b/lib/harmony/harmonydb/sql/20230823.sql index 1865f6afe0f..c6f993d7664 100644 --- a/lib/harmony/harmonydb/sql/20230823.sql +++ b/lib/harmony/harmonydb/sql/20230823.sql @@ -37,12 +37,12 @@ create table wdpost_proofs create table wdpost_recovery_tasks ( task_id bigint not null - constraint wdpost_partition_tasks_pk + constraint wdpost_recovery_tasks_pk primary key, sp_id bigint not null, proving_period_start bigint not null, deadline_index bigint not null, partition_index bigint not null, - constraint wdpost_partition_tasks_identity_key + constraint wdpost_recovery_tasks_identity_key unique (sp_id, proving_period_start, deadline_index, partition_index) ); \ No newline at end of file diff --git a/lib/harmony/harmonydb/sql/20231110.sql b/lib/harmony/harmonydb/sql/20231110.sql new file mode 100644 index 00000000000..15b478f4dd1 --- /dev/null +++ b/lib/harmony/harmonydb/sql/20231110.sql @@ -0,0 +1,39 @@ +create table mining_tasks +( + task_id bigint not null + constraint mining_tasks_pk + primary key, + sp_id bigint not null, + epoch bigint not null, + base_compute_time timestamp not null, + + won bool not null default false, + mined_cid text, + mined_header jsonb, + mined_at timestamp, + + submitted_at timestamp, + + constraint mining_tasks_sp_epoch + unique (sp_id, epoch) +); + +create table mining_base_block +( + id bigserial not null + constraint mining_base_block_pk + primary key, + task_id bigint not null + constraint mining_base_block_mining_tasks_task_id_fk + references mining_tasks + on delete cascade, + sp_id bigint, + block_cid text not null, + + no_win bool not null default false, + + constraint mining_base_block_pk2 + unique (sp_id, task_id, block_cid) +); + +CREATE UNIQUE INDEX mining_base_block_cid_k ON mining_base_block (sp_id, block_cid) WHERE no_win = false; diff --git a/lib/harmony/harmonydb/sql/20231113.sql b/lib/harmony/harmonydb/sql/20231113.sql new file mode 100644 index 00000000000..7a71d98aead --- /dev/null +++ b/lib/harmony/harmonydb/sql/20231113.sql @@ -0,0 +1 @@ +ALTER TABLE harmony_task_history ADD COLUMN completed_by_host_and_port varchar(300) NOT NULL; \ No newline at end of file diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go index bb25be54962..595e5b63a61 100644 --- a/lib/harmony/harmonytask/harmonytask.go +++ b/lib/harmony/harmonytask/harmonytask.go @@ -106,6 +106,7 @@ type TaskEngine struct { follows map[string][]followStruct lastFollowTime time.Time lastCleanup atomic.Value + hostAndPort string } type followStruct struct { f func(TaskID, AddTaskFunc) (bool, error) @@ -129,13 +130,14 @@ func New( } ctx, grace := context.WithCancel(context.Background()) e := &TaskEngine{ - ctx: ctx, - grace: grace, - db: db, - reg: reg, - ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort" - taskMap: make(map[string]*taskTypeHandler, len(impls)), - follows: make(map[string][]followStruct), + ctx: ctx, + grace: grace, + db: db, + reg: reg, + ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort" + taskMap: make(map[string]*taskTypeHandler, len(impls)), + follows: make(map[string][]followStruct), + hostAndPort: hostnameAndPort, } e.lastCleanup.Store(time.Now()) for _, c := range impls { diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 09540913d6b..7ec47d32a7c 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -198,8 +198,8 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done } } _, err = tx.Exec(`INSERT INTO harmony_task_history - (task_id, name, posted, work_start, work_end, result, err) - VALUES ($1, $2, $3, $4, $5, $6, $7)`, tID, h.Name, postedTime, workStart, workEnd, done, result) + (task_id, name, posted, work_start, work_end, result, by_host_and_port, err) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, workEnd, done, h.TaskEngine.hostAndPort, result) if err != nil { return false, fmt.Errorf("could not write history: %w", err) } diff --git a/miner/miner.go b/miner/miner.go index 329f3417190..d11e9d4aa04 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -71,7 +71,7 @@ func NewMiner(api v1api.FullNode, epp gen.WinningPoStProver, addr address.Addres api: api, epp: epp, address: addr, - waitFunc: func(ctx context.Context, baseTime uint64) (func(bool, abi.ChainEpoch, error), abi.ChainEpoch, error) { + propagationWaitFunc: func(ctx context.Context, baseTime uint64) (func(bool, abi.ChainEpoch, error), abi.ChainEpoch, error) { // wait around for half the block time in case other parents come in // // if we're mining a block in the past via catch-up/rush mining, @@ -114,7 +114,7 @@ type Miner struct { stop chan struct{} stopping chan struct{} - waitFunc waitFunc + propagationWaitFunc waitFunc // lastWork holds the last MiningBase we built upon. lastWork *MiningBase @@ -205,15 +205,21 @@ func (m *Miner) mine(ctx context.Context) { ctx, span := trace.StartSpan(ctx, "/mine") defer span.End() + // Perform the Winning PoSt warmup in a separate goroutine. go m.doWinPoStWarmup(ctx) var lastBase MiningBase + + // Start the main mining loop. minerLoop: for { + // Prepare a context for a single node operation. ctx := cliutil.OnSingleNode(ctx) + // Handle stop signals. select { case <-m.stop: + // If a stop signal is received, clean up and exit the mining loop. stopping := m.stopping m.stop = nil m.stopping = nil @@ -223,10 +229,11 @@ minerLoop: default: } - var base *MiningBase + var base *MiningBase // NOTE: This points to m.lastWork; Incrementing nulls here will increment it there. var onDone func(bool, abi.ChainEpoch, error) var injectNulls abi.ChainEpoch + // Look for the best mining candidate. for { prebase, err := m.GetBestMiningCandidate(ctx) if err != nil { @@ -237,6 +244,7 @@ minerLoop: continue } + // Check if we have a new base or if the current base is still valid. if base != nil && base.TipSet.Height() == prebase.TipSet.Height() && base.NullRounds == prebase.NullRounds { base = prebase break @@ -253,13 +261,13 @@ minerLoop: // best mining candidate at that time. // Wait until propagation delay period after block we plan to mine on - onDone, injectNulls, err = m.waitFunc(ctx, prebase.TipSet.MinTimestamp()) + onDone, injectNulls, err = m.propagationWaitFunc(ctx, prebase.TipSet.MinTimestamp()) if err != nil { log.Error(err) continue } - // just wait for the beacon entry to become available before we select our final mining base + // Ensure the beacon entry is available before finalizing the mining base. _, err = m.api.StateGetBeaconEntry(ctx, prebase.TipSet.Height()+prebase.NullRounds+1) if err != nil { log.Errorf("failed getting beacon entry: %s", err) @@ -272,8 +280,9 @@ minerLoop: base = prebase } - base.NullRounds += injectNulls // testing + base.NullRounds += injectNulls // Adjust for testing purposes. + // Check for repeated mining candidates and handle sleep for the next round. if base.TipSet.Equals(lastBase.TipSet) && lastBase.NullRounds == base.NullRounds { log.Warnf("BestMiningCandidate from the previous round: %s (nulls:%d)", lastBase.TipSet.Cids(), lastBase.NullRounds) if !m.niceSleep(time.Duration(build.BlockDelaySecs) * time.Second) { @@ -282,6 +291,7 @@ minerLoop: continue } + // Attempt to mine a block. b, err := m.mineOne(ctx, base) if err != nil { log.Errorf("mining block failed: %+v", err) @@ -299,9 +309,12 @@ minerLoop: } onDone(b != nil, h, nil) + // Process the mined block. if b != nil { + // Record the event of mining a block. m.journal.RecordEvent(m.evtTypes[evtTypeBlockMined], func() interface{} { return map[string]interface{}{ + // Data about the mined block. "parents": base.TipSet.Cids(), "nulls": base.NullRounds, "epoch": b.Header.Height, @@ -312,19 +325,23 @@ minerLoop: btime := time.Unix(int64(b.Header.Timestamp), 0) now := build.Clock.Now() + // Handle timing for broadcasting the block. switch { case btime == now: // block timestamp is perfectly aligned with time. case btime.After(now): + // Wait until it's time to broadcast the block. if !m.niceSleep(build.Clock.Until(btime)) { log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out") build.Clock.Sleep(build.Clock.Until(btime)) } default: + // Log if the block was mined in the past. log.Warnw("mined block in the past", "block-time", btime, "time", build.Clock.Now(), "difference", build.Clock.Since(btime)) } + // Check for slash filter conditions. if os.Getenv("LOTUS_MINER_NO_SLASHFILTER") != "_yes_i_know_i_can_and_probably_will_lose_all_my_fil_and_power_" && !build.IsNearUpgrade(base.TipSet.Height(), build.UpgradeWatermelonFixHeight) { witness, fault, err := m.sf.MinedBlock(ctx, b.Header, base.TipSet.Height()+base.NullRounds) if err != nil { @@ -339,25 +356,27 @@ minerLoop: } } + // Check for blocks created at the same height. if _, ok := m.minedBlockHeights.Get(b.Header.Height); ok { log.Warnw("Created a block at the same height as another block we've created", "height", b.Header.Height, "miner", b.Header.Miner, "parents", b.Header.Parents) continue } + // Add the block height to the mined block heights. m.minedBlockHeights.Add(b.Header.Height, true) + // Submit the newly mined block. if err := m.api.SyncSubmitBlock(ctx, b); err != nil { log.Errorf("failed to submit newly mined block: %+v", err) } } else { + // If no block was mined, increase the null rounds and wait for the next epoch. base.NullRounds++ - // Wait until the next epoch, plus the propagation delay, so a new tipset - // has enough time to form. - // - // See: https://github.com/filecoin-project/lotus/issues/1845 + // Calculate the time for the next round. nextRound := time.Unix(int64(base.TipSet.MinTimestamp()+build.BlockDelaySecs*uint64(base.NullRounds))+int64(build.PropagationDelaySecs), 0) + // Wait for the next round or stop signal. select { case <-build.Clock.After(build.Clock.Until(nextRound)): case <-m.stop: diff --git a/miner/testminer.go b/miner/testminer.go index f1d11bae0a6..e23b26ae29f 100644 --- a/miner/testminer.go +++ b/miner/testminer.go @@ -28,13 +28,13 @@ func NewTestMiner(nextCh <-chan MineReq, addr address.Address) func(v1api.FullNo } m := &Miner{ - api: api, - waitFunc: chanWaiter(nextCh), - epp: epp, - minedBlockHeights: arc, - address: addr, - sf: slashfilter.New(ds.NewMapDatastore()), - journal: journal.NilJournal(), + api: api, + propagationWaitFunc: chanWaiter(nextCh), + epp: epp, + minedBlockHeights: arc, + address: addr, + sf: slashfilter.New(ds.NewMapDatastore()), + journal: journal.NilJournal(), } if err := m.Start(context.TODO()); err != nil { diff --git a/node/builder_miner.go b/node/builder_miner.go index 4ba5f05b57f..dd35c6becf9 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -121,8 +121,12 @@ func ConfigStorageMiner(c interface{}) Option { // Mining / proving Override(new(*slashfilter.SlashFilter), modules.NewSlashFilter), - Override(new(*miner.Miner), modules.SetupBlockProducer), - Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver), + + If(!cfg.Subsystems.DisableWinningPoSt, + Override(new(*miner.Miner), modules.SetupBlockProducer), + Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver), + ), + Override(PreflightChecksKey, modules.PreflightChecks), Override(new(*sealing.Sealing), modules.SealingPipeline(cfg.Fees)), diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index a65bd874665..13ecb2706e3 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -31,10 +31,10 @@ var Doc = map[string][]DocField{ }, "ApisConfig": { { - Name: "FULLNODE_API_INFO", + Name: "ChainApiInfo", Type: "[]string", - Comment: `FULLNODE_API_INFO is the API endpoint for the Lotus daemon.`, + Comment: `ChainApiInfo is the API endpoint for the Lotus daemon.`, }, { Name: "StorageRPCSecret", @@ -977,6 +977,16 @@ This option will stop lotus-miner from performing any actions related to window post, including scheduling, submitting proofs, and recovering sectors.`, }, + { + Name: "DisableWinningPoSt", + Type: "bool", + + Comment: `When winning post is disabled, the miner process will NOT attempt to mine +blocks. This should only be set when there's an external process mining +blocks on behalf of the miner. +When disabled and no external block producers are configured, all potential +block rewards will be missed!`, + }, }, "ProviderSubsystemsConfig": { { @@ -995,6 +1005,12 @@ sectors.`, Name: "EnableWinningPost", Type: "bool", + Comment: ``, + }, + { + Name: "WinningPostMaxTasks", + Type: "int", + Comment: ``, }, }, diff --git a/node/config/types.go b/node/config/types.go index 120e5024826..2152e079569 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -92,9 +92,10 @@ type JournalConfig struct { } type ProviderSubsystemsConfig struct { - EnableWindowPost bool - WindowPostMaxTasks int - EnableWinningPost bool + EnableWindowPost bool + WindowPostMaxTasks int + EnableWinningPost bool + WinningPostMaxTasks int } type DAGStoreConfig struct { @@ -161,6 +162,13 @@ type MinerSubsystemConfig struct { // to window post, including scheduling, submitting proofs, and recovering // sectors. DisableWindowPoSt bool + + // When winning post is disabled, the miner process will NOT attempt to mine + // blocks. This should only be set when there's an external process mining + // blocks on behalf of the miner. + // When disabled and no external block producers are configured, all potential + // block rewards will be missed! + DisableWinningPoSt bool } type DealmakingConfig struct { diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index bd3e2fe2787..d8e41fb2bdf 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -346,7 +346,7 @@ func (fsr *FsRepo) APIEndpoint() (multiaddr.Multiaddr, error) { f, err := os.Open(p) if os.IsNotExist(err) { - return nil, ErrNoAPIEndpoint + return nil, xerrors.Errorf("No file (%s): %w", p, ErrNoAPIEndpoint) } else if err != nil { return nil, err } diff --git a/provider/lpwindow/compute_do.go b/provider/lpwindow/compute_do.go index d36541e8e1d..ba13cdc2b53 100644 --- a/provider/lpwindow/compute_do.go +++ b/provider/lpwindow/compute_do.go @@ -3,8 +3,6 @@ package lpwindow import ( "bytes" "context" - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - "github.com/filecoin-project/lotus/storage/sealer" "sort" "sync" "time" @@ -25,7 +23,9 @@ import ( proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" types "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) diff --git a/provider/lpwindow/recover_task.go b/provider/lpwindow/recover_task.go index d50fddc0e31..fafe76e569b 100644 --- a/provider/lpwindow/recover_task.go +++ b/provider/lpwindow/recover_task.go @@ -2,11 +2,15 @@ package lpwindow import ( "context" + + "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" @@ -21,7 +25,6 @@ import ( "github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/wdpost" - "golang.org/x/xerrors" ) type WdPostRecoverDeclareTask struct { diff --git a/provider/lpwinning/winning_task.go b/provider/lpwinning/winning_task.go new file mode 100644 index 00000000000..56d66692158 --- /dev/null +++ b/provider/lpwinning/winning_task.go @@ -0,0 +1,641 @@ +package lpwinning + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/binary" + "encoding/json" + "time" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/network" + prooftypes "github.com/filecoin-project/go-state-types/proof" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/gen" + lrand "github.com/filecoin-project/lotus/chain/rand" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/lib/promise" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +var log = logging.Logger("lpwinning") + +type WinPostTask struct { + max int + db *harmonydb.DB + + prover ProverWinningPoSt + verifier storiface.Verifier + + api WinPostAPI + actors []dtypes.MinerAddress + + mineTF promise.Promise[harmonytask.AddTaskFunc] +} + +type WinPostAPI interface { + ChainHead(context.Context) (*types.TipSet, error) + ChainTipSetWeight(context.Context, types.TipSetKey) (types.BigInt, error) + ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error) + + StateGetBeaconEntry(context.Context, abi.ChainEpoch) (*types.BeaconEntry, error) + SyncSubmitBlock(context.Context, *types.BlockMsg) error + StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) + StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) + StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error) + StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) + + MinerGetBaseInfo(context.Context, address.Address, abi.ChainEpoch, types.TipSetKey) (*api.MiningBaseInfo, error) + MinerCreateBlock(context.Context, *api.BlockTemplate) (*types.BlockMsg, error) + MpoolSelect(context.Context, types.TipSetKey, float64) ([]*types.SignedMessage, error) + + WalletSign(context.Context, address.Address, []byte) (*crypto.Signature, error) +} + +type ProverWinningPoSt interface { + GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, minerID abi.ActorID, sectorInfo []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]prooftypes.PoStProof, error) +} + +func NewWinPostTask(max int, db *harmonydb.DB, prover ProverWinningPoSt, verifier storiface.Verifier, api WinPostAPI, actors []dtypes.MinerAddress) *WinPostTask { + t := &WinPostTask{ + max: max, + db: db, + prover: prover, + verifier: verifier, + api: api, + actors: actors, + } + // TODO: run warmup + + go t.mineBasic(context.TODO()) + + return t +} + +func (t *WinPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + log.Debugw("WinPostTask.Do()", "taskID", taskID) + + ctx := context.TODO() + + type BlockCID struct { + CID string + } + + type MiningTaskDetails struct { + SpID uint64 + Epoch uint64 + BlockCIDs []BlockCID + CompTime time.Time + } + + var details MiningTaskDetails + + // First query to fetch from mining_tasks + err = t.db.QueryRow(ctx, `SELECT sp_id, epoch, base_compute_time FROM mining_tasks WHERE task_id = $1`, taskID).Scan(&details.SpID, &details.Epoch, &details.CompTime) + if err != nil { + return false, err + } + + // Second query to fetch from mining_base_block + rows, err := t.db.Query(ctx, `SELECT block_cid FROM mining_base_block WHERE task_id = $1`, taskID) + if err != nil { + return false, err + } + defer rows.Close() + + for rows.Next() { + var cid BlockCID + if err := rows.Scan(&cid.CID); err != nil { + return false, err + } + details.BlockCIDs = append(details.BlockCIDs, cid) + } + + if err := rows.Err(); err != nil { + return false, err + } + + // construct base + maddr, err := address.NewIDAddress(details.SpID) + if err != nil { + return false, err + } + + var bcids []cid.Cid + for _, c := range details.BlockCIDs { + bcid, err := cid.Parse(c.CID) + if err != nil { + return false, err + } + bcids = append(bcids, bcid) + } + + tsk := types.NewTipSetKey(bcids...) + baseTs, err := t.api.ChainGetTipSet(ctx, tsk) + if err != nil { + return false, xerrors.Errorf("loading base tipset: %w", err) + } + + base := MiningBase{ + TipSet: baseTs, + AddRounds: abi.ChainEpoch(details.Epoch) - baseTs.Height() - 1, + ComputeTime: details.CompTime, + } + + persistNoWin := func() error { + _, err := t.db.Exec(ctx, `UPDATE mining_base_block SET no_win = true WHERE task_id = $1`, taskID) + if err != nil { + return xerrors.Errorf("marking base as not-won: %w", err) + } + + return nil + } + + // ensure we have a beacon entry for the epoch we're mining on + round := base.epoch() + + _ = retry1(func() (*types.BeaconEntry, error) { + return t.api.StateGetBeaconEntry(ctx, round) + }) + + // MAKE A MINING ATTEMPT!! + log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids())) + + mbi, err := t.api.MinerGetBaseInfo(ctx, maddr, round, base.TipSet.Key()) + if err != nil { + return false, xerrors.Errorf("failed to get mining base info: %w", err) + } + if mbi == nil { + // not eligible to mine on this base, we're done here + log.Debugw("WinPoSt not eligible to mine on this base", "tipset", types.LogCids(base.TipSet.Cids())) + return true, persistNoWin() + } + + if !mbi.EligibleForMining { + // slashed or just have no power yet, we're done here + log.Debugw("WinPoSt not eligible for mining", "tipset", types.LogCids(base.TipSet.Cids())) + return true, persistNoWin() + } + + if len(mbi.Sectors) == 0 { + log.Warnw("WinPoSt no sectors to mine", "tipset", types.LogCids(base.TipSet.Cids())) + return false, xerrors.Errorf("no sectors selected for winning PoSt") + } + + var rbase types.BeaconEntry + var bvals []types.BeaconEntry + var eproof *types.ElectionProof + + // winner check + { + bvals = mbi.BeaconEntries + rbase = mbi.PrevBeaconEntry + if len(bvals) > 0 { + rbase = bvals[len(bvals)-1] + } + + eproof, err = gen.IsRoundWinner(ctx, round, maddr, rbase, mbi, t.api) + if err != nil { + log.Warnw("WinPoSt failed to check if we win next round", "error", err) + return false, xerrors.Errorf("failed to check if we win next round: %w", err) + } + + if eproof == nil { + // not a winner, we're done here + log.Debugw("WinPoSt not a winner", "tipset", types.LogCids(base.TipSet.Cids())) + return true, persistNoWin() + } + } + + // winning PoSt + var wpostProof []prooftypes.PoStProof + { + buf := new(bytes.Buffer) + if err := maddr.MarshalCBOR(buf); err != nil { + err = xerrors.Errorf("failed to marshal miner address: %w", err) + return false, err + } + + brand, err := lrand.DrawRandomnessFromBase(rbase.Data, crypto.DomainSeparationTag_WinningPoStChallengeSeed, round, buf.Bytes()) + if err != nil { + err = xerrors.Errorf("failed to get randomness for winning post: %w", err) + return false, err + } + + prand := abi.PoStRandomness(brand) + prand[31] &= 0x3f // make into fr + + sectorNums := make([]abi.SectorNumber, len(mbi.Sectors)) + for i, s := range mbi.Sectors { + sectorNums[i] = s.SectorNumber + } + + ppt, err := mbi.Sectors[0].SealProof.RegisteredWinningPoStProof() + if err != nil { + return false, xerrors.Errorf("mapping sector seal proof type to post proof type: %w", err) + } + + postChallenges, err := ffi.GeneratePoStFallbackSectorChallenges(ppt, abi.ActorID(details.SpID), prand, sectorNums) + if err != nil { + return false, xerrors.Errorf("generating election challenges: %v", err) + } + + sectorChallenges := make([]storiface.PostSectorChallenge, len(mbi.Sectors)) + for i, s := range mbi.Sectors { + sectorChallenges[i] = storiface.PostSectorChallenge{ + SealProof: s.SealProof, + SectorNumber: s.SectorNumber, + SealedCID: s.SealedCID, + Challenge: postChallenges.Challenges[s.SectorNumber], + Update: s.SectorKey != nil, + } + } + + wpostProof, err = t.prover.GenerateWinningPoSt(ctx, ppt, abi.ActorID(details.SpID), sectorChallenges, prand) + if err != nil { + err = xerrors.Errorf("failed to compute winning post proof: %w", err) + return false, err + } + } + + ticket, err := t.computeTicket(ctx, maddr, &rbase, round, base.TipSet.MinTicket(), mbi) + if err != nil { + return false, xerrors.Errorf("scratching ticket failed: %w", err) + } + + // get pending messages early, + msgs, err := t.api.MpoolSelect(ctx, base.TipSet.Key(), ticket.Quality()) + if err != nil { + return false, xerrors.Errorf("failed to select messages for block: %w", err) + } + + // equivocation handling + { + // This next block exists to "catch" equivocating miners, + // who submit 2 blocks at the same height at different times in order to split the network. + // To safeguard against this, we make sure it's been EquivocationDelaySecs since our base was calculated, + // then re-calculate it. + // If the daemon detected equivocated blocks, those blocks will no longer be in the new base. + time.Sleep(time.Until(base.ComputeTime.Add(time.Duration(build.EquivocationDelaySecs) * time.Second))) + + bestTs, err := t.api.ChainHead(ctx) + if err != nil { + return false, xerrors.Errorf("failed to get chain head: %w", err) + } + + headWeight, err := t.api.ChainTipSetWeight(ctx, bestTs.Key()) + if err != nil { + return false, xerrors.Errorf("failed to get chain head weight: %w", err) + } + + baseWeight, err := t.api.ChainTipSetWeight(ctx, base.TipSet.Key()) + if err != nil { + return false, xerrors.Errorf("failed to get base weight: %w", err) + } + if types.BigCmp(headWeight, baseWeight) <= 0 { + bestTs = base.TipSet + } + + // If the base has changed, we take the _intersection_ of our old base and new base, + // thus ejecting blocks from any equivocating miners, without taking any new blocks. + if bestTs.Height() == base.TipSet.Height() && !bestTs.Equals(base.TipSet) { + log.Warnf("base changed from %s to %s, taking intersection", base.TipSet.Key(), bestTs.Key()) + newBaseMap := map[cid.Cid]struct{}{} + for _, newBaseBlk := range bestTs.Cids() { + newBaseMap[newBaseBlk] = struct{}{} + } + + refreshedBaseBlocks := make([]*types.BlockHeader, 0, len(base.TipSet.Cids())) + for _, baseBlk := range base.TipSet.Blocks() { + if _, ok := newBaseMap[baseBlk.Cid()]; ok { + refreshedBaseBlocks = append(refreshedBaseBlocks, baseBlk) + } + } + + if len(refreshedBaseBlocks) != 0 && len(refreshedBaseBlocks) != len(base.TipSet.Blocks()) { + refreshedBase, err := types.NewTipSet(refreshedBaseBlocks) + if err != nil { + return false, xerrors.Errorf("failed to create new tipset when refreshing: %w", err) + } + + if !base.TipSet.MinTicket().Equals(refreshedBase.MinTicket()) { + log.Warn("recomputing ticket due to base refresh") + + ticket, err = t.computeTicket(ctx, maddr, &rbase, round, refreshedBase.MinTicket(), mbi) + if err != nil { + return false, xerrors.Errorf("failed to refresh ticket: %w", err) + } + } + + log.Warn("re-selecting messages due to base refresh") + // refresh messages, as the selected messages may no longer be valid + msgs, err = t.api.MpoolSelect(ctx, refreshedBase.Key(), ticket.Quality()) + if err != nil { + return false, xerrors.Errorf("failed to re-select messages for block: %w", err) + } + + base.TipSet = refreshedBase + } + } + } + + // block construction + var blockMsg *types.BlockMsg + { + uts := base.TipSet.MinTimestamp() + build.BlockDelaySecs*(uint64(base.AddRounds)+1) + + blockMsg, err = t.api.MinerCreateBlock(context.TODO(), &api.BlockTemplate{ + Miner: maddr, + Parents: base.TipSet.Key(), + Ticket: ticket, + Eproof: eproof, + BeaconValues: bvals, + Messages: msgs, + Epoch: round, + Timestamp: uts, + WinningPoStProof: wpostProof, + }) + if err != nil { + return false, xerrors.Errorf("failed to create block: %w", err) + } + } + + // persist in db + { + bhjson, err := json.Marshal(blockMsg.Header) + if err != nil { + return false, xerrors.Errorf("failed to marshal block header: %w", err) + } + + _, err = t.db.Exec(ctx, `UPDATE mining_tasks + SET won = true, mined_cid = $2, mined_header = $3, mined_at = $4 + WHERE task_id = $1`, taskID, blockMsg.Header.Cid(), string(bhjson), time.Now().UTC()) + if err != nil { + return false, xerrors.Errorf("failed to update mining task: %w", err) + } + } + + // wait until block timestamp + { + time.Sleep(time.Until(time.Unix(int64(blockMsg.Header.Timestamp), 0))) + } + + // submit block!! + { + if err := t.api.SyncSubmitBlock(ctx, blockMsg); err != nil { + return false, xerrors.Errorf("failed to submit block: %w", err) + } + } + + log.Infow("mined a block", "tipset", types.LogCids(blockMsg.Header.Parents), "height", blockMsg.Header.Height, "miner", maddr, "cid", blockMsg.Header.Cid()) + + // persist that we've submitted the block + { + _, err = t.db.Exec(ctx, `UPDATE mining_tasks + SET submitted_at = $2 + WHERE task_id = $1`, taskID, time.Now().UTC()) + if err != nil { + return false, xerrors.Errorf("failed to update mining task: %w", err) + } + } + + return true, nil +} + +func (t *WinPostTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + if len(ids) == 0 { + // probably can't happen, but panicking is bad + return nil, nil + } + + // select lowest epoch + var lowestEpoch abi.ChainEpoch + var lowestEpochID = ids[0] + for _, id := range ids { + var epoch uint64 + err := t.db.QueryRow(context.Background(), `SELECT epoch FROM mining_tasks WHERE task_id = $1`, id).Scan(&epoch) + if err != nil { + return nil, err + } + + if lowestEpoch == 0 || abi.ChainEpoch(epoch) < lowestEpoch { + lowestEpoch = abi.ChainEpoch(epoch) + lowestEpochID = id + } + } + + return &lowestEpochID, nil +} + +func (t *WinPostTask) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Name: "WinPost", + Max: t.max, + MaxFailures: 3, + Follows: nil, + Cost: resources.Resources{ + Cpu: 1, + + // todo set to something for 32/64G sector sizes? Technically windowPoSt is happy on a CPU + // but it will use a GPU if available + Gpu: 0, + + Ram: 1 << 30, // todo arbitrary number + }, + } +} + +func (t *WinPostTask) Adder(taskFunc harmonytask.AddTaskFunc) { + t.mineTF.Set(taskFunc) +} + +// MiningBase is the tipset on top of which we plan to construct our next block. +// Refer to godocs on GetBestMiningCandidate. +type MiningBase struct { + TipSet *types.TipSet + ComputeTime time.Time + AddRounds abi.ChainEpoch +} + +func (mb MiningBase) epoch() abi.ChainEpoch { + // return the epoch that will result from mining on this base + return mb.TipSet.Height() + mb.AddRounds + 1 +} + +func (mb MiningBase) baseTime() time.Time { + tsTime := time.Unix(int64(mb.TipSet.MinTimestamp()), 0) + roundDelay := build.BlockDelaySecs * uint64(mb.AddRounds+1) + tsTime = tsTime.Add(time.Duration(roundDelay) * time.Second) + return tsTime +} + +func (mb MiningBase) afterPropDelay() time.Time { + base := mb.baseTime() + base.Add(randTimeOffset(time.Second)) + return base +} + +func (t *WinPostTask) mineBasic(ctx context.Context) { + var workBase MiningBase + + taskFn := t.mineTF.Val(ctx) + + // initialize workbase + { + head := retry1(func() (*types.TipSet, error) { + return t.api.ChainHead(ctx) + }) + + workBase = MiningBase{ + TipSet: head, + AddRounds: 0, + ComputeTime: time.Now(), + } + } + + /* + + /- T+0 == workBase.baseTime + | + >--------*------*--------[wait until next round]-----> + | + |- T+PD == workBase.afterPropDelay+(~1s) + |- Here we acquire the new workBase, and start a new round task + \- Then we loop around, and wait for the next head + + time --> + */ + + for { + // limit the rate at which we mine blocks to at least EquivocationDelaySecs + // this is to prevent races on devnets in catch up mode. Acts as a minimum + // delay for the sleep below. + time.Sleep(time.Duration(build.EquivocationDelaySecs)*time.Second + time.Second) + + // wait for *NEXT* propagation delay + time.Sleep(time.Until(workBase.afterPropDelay())) + + // check current best candidate + maybeBase := retry1(func() (*types.TipSet, error) { + return t.api.ChainHead(ctx) + }) + + if workBase.TipSet.Equals(maybeBase) { + // workbase didn't change in the new round so we have a null round here + workBase.AddRounds++ + log.Debugw("workbase update", "tipset", workBase.TipSet.Cids(), "nulls", workBase.AddRounds, "lastUpdate", time.Since(workBase.ComputeTime), "type", "same-tipset") + } else { + btsw := retry1(func() (types.BigInt, error) { + return t.api.ChainTipSetWeight(ctx, maybeBase.Key()) + }) + + ltsw := retry1(func() (types.BigInt, error) { + return t.api.ChainTipSetWeight(ctx, workBase.TipSet.Key()) + }) + + if types.BigCmp(btsw, ltsw) <= 0 { + // new tipset for some reason has less weight than the old one, assume null round here + // NOTE: the backing node may have reorged, or manually changed head + workBase.AddRounds++ + log.Debugw("workbase update", "tipset", workBase.TipSet.Cids(), "nulls", workBase.AddRounds, "lastUpdate", time.Since(workBase.ComputeTime), "type", "prefer-local-weight") + } else { + // new tipset has more weight, so we should mine on it, no null round here + log.Debugw("workbase update", "tipset", workBase.TipSet.Cids(), "nulls", workBase.AddRounds, "lastUpdate", time.Since(workBase.ComputeTime), "type", "prefer-new-tipset") + + workBase = MiningBase{ + TipSet: maybeBase, + AddRounds: 0, + ComputeTime: time.Now(), + } + } + } + + // dispatch mining task + // (note equivocation prevention is handled by the mining code) + + for _, act := range t.actors { + spID, err := address.IDFromAddress(address.Address(act)) + if err != nil { + log.Errorf("failed to get spID from address %s: %s", act, err) + continue + } + + taskFn(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + _, err := tx.Exec(`INSERT INTO mining_tasks (task_id, sp_id, epoch, base_compute_time) VALUES ($1, $2, $3, $4)`, id, spID, workBase.epoch(), workBase.ComputeTime.UTC()) + if err != nil { + return false, xerrors.Errorf("inserting mining_tasks: %w", err) + } + + for _, c := range workBase.TipSet.Cids() { + _, err = tx.Exec(`INSERT INTO mining_base_block (task_id, sp_id, block_cid) VALUES ($1, $2, $3)`, id, spID, c) + if err != nil { + return false, xerrors.Errorf("inserting mining base blocks: %w", err) + } + } + + return true, nil // no errors, commit the transaction + }) + } + } +} + +func (t *WinPostTask) computeTicket(ctx context.Context, maddr address.Address, brand *types.BeaconEntry, round abi.ChainEpoch, chainRand *types.Ticket, mbi *api.MiningBaseInfo) (*types.Ticket, error) { + buf := new(bytes.Buffer) + if err := maddr.MarshalCBOR(buf); err != nil { + return nil, xerrors.Errorf("failed to marshal address to cbor: %w", err) + } + + if round > build.UpgradeSmokeHeight { + buf.Write(chainRand.VRFProof) + } + + input, err := lrand.DrawRandomnessFromBase(brand.Data, crypto.DomainSeparationTag_TicketProduction, round-build.TicketRandomnessLookback, buf.Bytes()) + if err != nil { + return nil, err + } + + vrfOut, err := gen.ComputeVRF(ctx, t.api.WalletSign, mbi.WorkerKey, input) + if err != nil { + return nil, err + } + + return &types.Ticket{ + VRFProof: vrfOut, + }, nil +} + +func randTimeOffset(width time.Duration) time.Duration { + buf := make([]byte, 8) + rand.Reader.Read(buf) //nolint:errcheck + val := time.Duration(binary.BigEndian.Uint64(buf) % uint64(width)) + + return val - (width / 2) +} + +func retry1[R any](f func() (R, error)) R { + for { + r, err := f() + if err == nil { + return r + } + + log.Errorw("error in mining loop, retrying", "error", err) + time.Sleep(time.Second) + } +} + +var _ harmonytask.TaskInterface = &WinPostTask{}