Merge branch 'feat/sturdypost' into fix-sturdy-tests

This commit is contained in:
Andrew Jackson (Ajax) 2023-11-20 18:08:40 -06:00 committed by GitHub
commit 31ed5cc8f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 796 additions and 29 deletions

View File

@ -11,10 +11,13 @@ import (
"strings" "strings"
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
"github.com/ipfs/go-datastore"
"github.com/samber/lo" "github.com/samber/lo"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
cliutil "github.com/filecoin-project/lotus/cli/util" cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
@ -33,6 +36,12 @@ var configMigrateCmd = &cli.Command{
Value: "~/.lotusminer", Value: "~/.lotusminer",
Usage: fmt.Sprintf("Specify miner repo path. flag(%s) and env(LOTUS_STORAGE_PATH) are DEPRECATION, will REMOVE SOON", FlagMinerRepoDeprecation), Usage: fmt.Sprintf("Specify miner repo path. flag(%s) and env(LOTUS_STORAGE_PATH) are DEPRECATION, will REMOVE SOON", FlagMinerRepoDeprecation),
}, },
&cli.StringFlag{
Name: "repo",
EnvVars: []string{"LOTUS_PATH"},
Hidden: true,
Value: "~/.lotus",
},
&cli.StringFlag{ &cli.StringFlag{
Name: "to-layer", Name: "to-layer",
Aliases: []string{"t"}, Aliases: []string{"t"},
@ -117,14 +126,20 @@ func fromMiner(cctx *cli.Context) (err error) {
} }
// Populate Miner Address // Populate Miner Address
sm, cc, err := cliutil.GetStorageMinerAPI(cctx) mmeta, err := lr.Datastore(ctx, "/metadata")
if err != nil { if err != nil {
return fmt.Errorf("could not get storageMiner API: %w", err) return xerrors.Errorf("opening miner metadata datastore: %w", err)
} }
defer cc() defer mmeta.Close()
addr, err := sm.ActorAddress(ctx)
maddrBytes, err := mmeta.Get(ctx, datastore.NewKey("miner-address"))
if err != nil { if err != nil {
return fmt.Errorf("could not read actor address: %w", err) return xerrors.Errorf("getting miner address datastore entry: %w", err)
}
addr, err := address.NewFromBytes(maddrBytes)
if err != nil {
return xerrors.Errorf("parsing miner actor address: %w", err)
} }
lpCfg.Addresses.MinerAddresses = []string{addr.String()} lpCfg.Addresses.MinerAddresses = []string{addr.String()}
@ -137,7 +152,8 @@ func fromMiner(cctx *cli.Context) (err error) {
if err != nil { if err != nil {
return xerrors.Errorf("error getting JWTSecretName: %w", err) return xerrors.Errorf("error getting JWTSecretName: %w", err)
} }
lpCfg.Apis.StorageRPCSecret = base64.RawStdEncoding.EncodeToString(js.PrivateKey)
lpCfg.Apis.StorageRPCSecret = base64.StdEncoding.EncodeToString(js.PrivateKey)
// Populate API Key // Populate API Key
_, header, err := cliutil.GetRawAPI(cctx, repo.FullNode, "v0") _, header, err := cliutil.GetRawAPI(cctx, repo.FullNode, "v0")
@ -145,7 +161,11 @@ func fromMiner(cctx *cli.Context) (err error) {
return fmt.Errorf("cannot read API: %w", err) return fmt.Errorf("cannot read API: %w", err)
} }
lpCfg.Apis.ChainApiInfo = []string{header.Get("Authorization")[7:]} ainfo, err := cliutil.GetAPIInfo(&cli.Context{}, repo.FullNode)
if err != nil {
return xerrors.Errorf("could not get API info for FullNode: %w", err)
}
lpCfg.Apis.ChainApiInfo = []string{header.Get("Authorization")[7:] + ":" + ainfo.Addr}
// Enable WindowPoSt // Enable WindowPoSt
lpCfg.Subsystems.EnableWindowPost = true lpCfg.Subsystems.EnableWindowPost = true
@ -166,6 +186,7 @@ environment variable LOTUS_WORKER_WINDOWPOST.
return xerrors.Errorf("Cannot get default config: %w", err) return xerrors.Errorf("Cannot get default config: %w", err)
} }
_, err = db.Exec(ctx, "INSERT INTO harmony_config (title, config) VALUES ('base', '$1')", cfg) _, err = db.Exec(ctx, "INSERT INTO harmony_config (title, config) VALUES ('base', '$1')", cfg)
if err != nil { if err != nil {
return err return err
} }

View File

@ -43,6 +43,7 @@ import (
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/provider" "github.com/filecoin-project/lotus/provider"
"github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/provider/lpwinning"
"github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
@ -159,6 +160,11 @@ var runCmd = &cli.Command{
} }
activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask, derlareRecoverTask) activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask, derlareRecoverTask)
} }
if cfg.Subsystems.EnableWinningPost {
winPoStTask := lpwinning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, lw, verif, full, maddrs)
activeTasks = append(activeTasks, winPoStTask)
}
} }
log.Infow("This lotus_provider instance handles", log.Infow("This lotus_provider instance handles",
"miner_addresses", maddrs, "miner_addresses", maddrs,

View File

@ -176,6 +176,16 @@
# env var: LOTUS_SUBSYSTEMS_DISABLEWINDOWPOST # env var: LOTUS_SUBSYSTEMS_DISABLEWINDOWPOST
#DisableWindowPoSt = false #DisableWindowPoSt = false
# When winning post is disabled, the miner process will NOT attempt to mine
# blocks. This should only be set when there's an external process mining
# blocks on behalf of the miner.
# When disabled and no external block producers are configured, all potential
# block rewards will be missed!
#
# type: bool
# env var: LOTUS_SUBSYSTEMS_DISABLEWINNINGPOST
#DisableWinningPoSt = false
[Dealmaking] [Dealmaking]
# When enabled, the miner can accept online deals # When enabled, the miner can accept online deals

View File

@ -8,6 +8,9 @@
# type: bool # type: bool
#EnableWinningPost = false #EnableWinningPost = false
# type: int
#WinningPostMaxTasks = 0
[Fees] [Fees]
# type: types.FIL # type: types.FIL

View File

@ -0,0 +1,39 @@
create table mining_tasks
(
task_id bigint not null
constraint mining_tasks_pk
primary key,
sp_id bigint not null,
epoch bigint not null,
base_compute_time timestamp not null,
won bool not null default false,
mined_cid text,
mined_header jsonb,
mined_at timestamp,
submitted_at timestamp,
constraint mining_tasks_sp_epoch
unique (sp_id, epoch)
);
create table mining_base_block
(
id bigserial not null
constraint mining_base_block_pk
primary key,
task_id bigint not null
constraint mining_base_block_mining_tasks_task_id_fk
references mining_tasks
on delete cascade,
sp_id bigint,
block_cid text not null,
no_win bool not null default false,
constraint mining_base_block_pk2
unique (sp_id, task_id, block_cid)
);
CREATE UNIQUE INDEX mining_base_block_cid_k ON mining_base_block (sp_id, block_cid) WHERE no_win = false;

View File

@ -71,7 +71,7 @@ func NewMiner(api v1api.FullNode, epp gen.WinningPoStProver, addr address.Addres
api: api, api: api,
epp: epp, epp: epp,
address: addr, address: addr,
waitFunc: func(ctx context.Context, baseTime uint64) (func(bool, abi.ChainEpoch, error), abi.ChainEpoch, error) { propagationWaitFunc: func(ctx context.Context, baseTime uint64) (func(bool, abi.ChainEpoch, error), abi.ChainEpoch, error) {
// wait around for half the block time in case other parents come in // wait around for half the block time in case other parents come in
// //
// if we're mining a block in the past via catch-up/rush mining, // if we're mining a block in the past via catch-up/rush mining,
@ -114,7 +114,7 @@ type Miner struct {
stop chan struct{} stop chan struct{}
stopping chan struct{} stopping chan struct{}
waitFunc waitFunc propagationWaitFunc waitFunc
// lastWork holds the last MiningBase we built upon. // lastWork holds the last MiningBase we built upon.
lastWork *MiningBase lastWork *MiningBase
@ -205,15 +205,21 @@ func (m *Miner) mine(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "/mine") ctx, span := trace.StartSpan(ctx, "/mine")
defer span.End() defer span.End()
// Perform the Winning PoSt warmup in a separate goroutine.
go m.doWinPoStWarmup(ctx) go m.doWinPoStWarmup(ctx)
var lastBase MiningBase var lastBase MiningBase
// Start the main mining loop.
minerLoop: minerLoop:
for { for {
// Prepare a context for a single node operation.
ctx := cliutil.OnSingleNode(ctx) ctx := cliutil.OnSingleNode(ctx)
// Handle stop signals.
select { select {
case <-m.stop: case <-m.stop:
// If a stop signal is received, clean up and exit the mining loop.
stopping := m.stopping stopping := m.stopping
m.stop = nil m.stop = nil
m.stopping = nil m.stopping = nil
@ -223,10 +229,11 @@ minerLoop:
default: default:
} }
var base *MiningBase var base *MiningBase // NOTE: This points to m.lastWork; Incrementing nulls here will increment it there.
var onDone func(bool, abi.ChainEpoch, error) var onDone func(bool, abi.ChainEpoch, error)
var injectNulls abi.ChainEpoch var injectNulls abi.ChainEpoch
// Look for the best mining candidate.
for { for {
prebase, err := m.GetBestMiningCandidate(ctx) prebase, err := m.GetBestMiningCandidate(ctx)
if err != nil { if err != nil {
@ -237,6 +244,7 @@ minerLoop:
continue continue
} }
// Check if we have a new base or if the current base is still valid.
if base != nil && base.TipSet.Height() == prebase.TipSet.Height() && base.NullRounds == prebase.NullRounds { if base != nil && base.TipSet.Height() == prebase.TipSet.Height() && base.NullRounds == prebase.NullRounds {
base = prebase base = prebase
break break
@ -253,13 +261,13 @@ minerLoop:
// best mining candidate at that time. // best mining candidate at that time.
// Wait until propagation delay period after block we plan to mine on // Wait until propagation delay period after block we plan to mine on
onDone, injectNulls, err = m.waitFunc(ctx, prebase.TipSet.MinTimestamp()) onDone, injectNulls, err = m.propagationWaitFunc(ctx, prebase.TipSet.MinTimestamp())
if err != nil { if err != nil {
log.Error(err) log.Error(err)
continue continue
} }
// just wait for the beacon entry to become available before we select our final mining base // Ensure the beacon entry is available before finalizing the mining base.
_, err = m.api.StateGetBeaconEntry(ctx, prebase.TipSet.Height()+prebase.NullRounds+1) _, err = m.api.StateGetBeaconEntry(ctx, prebase.TipSet.Height()+prebase.NullRounds+1)
if err != nil { if err != nil {
log.Errorf("failed getting beacon entry: %s", err) log.Errorf("failed getting beacon entry: %s", err)
@ -272,8 +280,9 @@ minerLoop:
base = prebase base = prebase
} }
base.NullRounds += injectNulls // testing base.NullRounds += injectNulls // Adjust for testing purposes.
// Check for repeated mining candidates and handle sleep for the next round.
if base.TipSet.Equals(lastBase.TipSet) && lastBase.NullRounds == base.NullRounds { if base.TipSet.Equals(lastBase.TipSet) && lastBase.NullRounds == base.NullRounds {
log.Warnf("BestMiningCandidate from the previous round: %s (nulls:%d)", lastBase.TipSet.Cids(), lastBase.NullRounds) log.Warnf("BestMiningCandidate from the previous round: %s (nulls:%d)", lastBase.TipSet.Cids(), lastBase.NullRounds)
if !m.niceSleep(time.Duration(build.BlockDelaySecs) * time.Second) { if !m.niceSleep(time.Duration(build.BlockDelaySecs) * time.Second) {
@ -282,6 +291,7 @@ minerLoop:
continue continue
} }
// Attempt to mine a block.
b, err := m.mineOne(ctx, base) b, err := m.mineOne(ctx, base)
if err != nil { if err != nil {
log.Errorf("mining block failed: %+v", err) log.Errorf("mining block failed: %+v", err)
@ -299,9 +309,12 @@ minerLoop:
} }
onDone(b != nil, h, nil) onDone(b != nil, h, nil)
// Process the mined block.
if b != nil { if b != nil {
// Record the event of mining a block.
m.journal.RecordEvent(m.evtTypes[evtTypeBlockMined], func() interface{} { m.journal.RecordEvent(m.evtTypes[evtTypeBlockMined], func() interface{} {
return map[string]interface{}{ return map[string]interface{}{
// Data about the mined block.
"parents": base.TipSet.Cids(), "parents": base.TipSet.Cids(),
"nulls": base.NullRounds, "nulls": base.NullRounds,
"epoch": b.Header.Height, "epoch": b.Header.Height,
@ -312,19 +325,23 @@ minerLoop:
btime := time.Unix(int64(b.Header.Timestamp), 0) btime := time.Unix(int64(b.Header.Timestamp), 0)
now := build.Clock.Now() now := build.Clock.Now()
// Handle timing for broadcasting the block.
switch { switch {
case btime == now: case btime == now:
// block timestamp is perfectly aligned with time. // block timestamp is perfectly aligned with time.
case btime.After(now): case btime.After(now):
// Wait until it's time to broadcast the block.
if !m.niceSleep(build.Clock.Until(btime)) { if !m.niceSleep(build.Clock.Until(btime)) {
log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out") log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out")
build.Clock.Sleep(build.Clock.Until(btime)) build.Clock.Sleep(build.Clock.Until(btime))
} }
default: default:
// Log if the block was mined in the past.
log.Warnw("mined block in the past", log.Warnw("mined block in the past",
"block-time", btime, "time", build.Clock.Now(), "difference", build.Clock.Since(btime)) "block-time", btime, "time", build.Clock.Now(), "difference", build.Clock.Since(btime))
} }
// Check for slash filter conditions.
if os.Getenv("LOTUS_MINER_NO_SLASHFILTER") != "_yes_i_know_i_can_and_probably_will_lose_all_my_fil_and_power_" && !build.IsNearUpgrade(base.TipSet.Height(), build.UpgradeWatermelonFixHeight) { if os.Getenv("LOTUS_MINER_NO_SLASHFILTER") != "_yes_i_know_i_can_and_probably_will_lose_all_my_fil_and_power_" && !build.IsNearUpgrade(base.TipSet.Height(), build.UpgradeWatermelonFixHeight) {
witness, fault, err := m.sf.MinedBlock(ctx, b.Header, base.TipSet.Height()+base.NullRounds) witness, fault, err := m.sf.MinedBlock(ctx, b.Header, base.TipSet.Height()+base.NullRounds)
if err != nil { if err != nil {
@ -339,25 +356,27 @@ minerLoop:
} }
} }
// Check for blocks created at the same height.
if _, ok := m.minedBlockHeights.Get(b.Header.Height); ok { if _, ok := m.minedBlockHeights.Get(b.Header.Height); ok {
log.Warnw("Created a block at the same height as another block we've created", "height", b.Header.Height, "miner", b.Header.Miner, "parents", b.Header.Parents) log.Warnw("Created a block at the same height as another block we've created", "height", b.Header.Height, "miner", b.Header.Miner, "parents", b.Header.Parents)
continue continue
} }
// Add the block height to the mined block heights.
m.minedBlockHeights.Add(b.Header.Height, true) m.minedBlockHeights.Add(b.Header.Height, true)
// Submit the newly mined block.
if err := m.api.SyncSubmitBlock(ctx, b); err != nil { if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
log.Errorf("failed to submit newly mined block: %+v", err) log.Errorf("failed to submit newly mined block: %+v", err)
} }
} else { } else {
// If no block was mined, increase the null rounds and wait for the next epoch.
base.NullRounds++ base.NullRounds++
// Wait until the next epoch, plus the propagation delay, so a new tipset // Calculate the time for the next round.
// has enough time to form.
//
// See: https://github.com/filecoin-project/lotus/issues/1845
nextRound := time.Unix(int64(base.TipSet.MinTimestamp()+build.BlockDelaySecs*uint64(base.NullRounds))+int64(build.PropagationDelaySecs), 0) nextRound := time.Unix(int64(base.TipSet.MinTimestamp()+build.BlockDelaySecs*uint64(base.NullRounds))+int64(build.PropagationDelaySecs), 0)
// Wait for the next round or stop signal.
select { select {
case <-build.Clock.After(build.Clock.Until(nextRound)): case <-build.Clock.After(build.Clock.Until(nextRound)):
case <-m.stop: case <-m.stop:

View File

@ -29,7 +29,7 @@ func NewTestMiner(nextCh <-chan MineReq, addr address.Address) func(v1api.FullNo
m := &Miner{ m := &Miner{
api: api, api: api,
waitFunc: chanWaiter(nextCh), propagationWaitFunc: chanWaiter(nextCh),
epp: epp, epp: epp,
minedBlockHeights: arc, minedBlockHeights: arc,
address: addr, address: addr,

View File

@ -121,8 +121,12 @@ func ConfigStorageMiner(c interface{}) Option {
// Mining / proving // Mining / proving
Override(new(*slashfilter.SlashFilter), modules.NewSlashFilter), Override(new(*slashfilter.SlashFilter), modules.NewSlashFilter),
If(!cfg.Subsystems.DisableWinningPoSt,
Override(new(*miner.Miner), modules.SetupBlockProducer), Override(new(*miner.Miner), modules.SetupBlockProducer),
Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver), Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver),
),
Override(PreflightChecksKey, modules.PreflightChecks), Override(PreflightChecksKey, modules.PreflightChecks),
Override(new(*sealing.Sealing), modules.SealingPipeline(cfg.Fees)), Override(new(*sealing.Sealing), modules.SealingPipeline(cfg.Fees)),

View File

@ -977,6 +977,16 @@ This option will stop lotus-miner from performing any actions related
to window post, including scheduling, submitting proofs, and recovering to window post, including scheduling, submitting proofs, and recovering
sectors.`, sectors.`,
}, },
{
Name: "DisableWinningPoSt",
Type: "bool",
Comment: `When winning post is disabled, the miner process will NOT attempt to mine
blocks. This should only be set when there's an external process mining
blocks on behalf of the miner.
When disabled and no external block producers are configured, all potential
block rewards will be missed!`,
},
}, },
"ProviderSubsystemsConfig": { "ProviderSubsystemsConfig": {
{ {
@ -995,6 +1005,12 @@ sectors.`,
Name: "EnableWinningPost", Name: "EnableWinningPost",
Type: "bool", Type: "bool",
Comment: ``,
},
{
Name: "WinningPostMaxTasks",
Type: "int",
Comment: ``, Comment: ``,
}, },
}, },

View File

@ -95,6 +95,7 @@ type ProviderSubsystemsConfig struct {
EnableWindowPost bool EnableWindowPost bool
WindowPostMaxTasks int WindowPostMaxTasks int
EnableWinningPost bool EnableWinningPost bool
WinningPostMaxTasks int
} }
type DAGStoreConfig struct { type DAGStoreConfig struct {
@ -161,6 +162,13 @@ type MinerSubsystemConfig struct {
// to window post, including scheduling, submitting proofs, and recovering // to window post, including scheduling, submitting proofs, and recovering
// sectors. // sectors.
DisableWindowPoSt bool DisableWindowPoSt bool
// When winning post is disabled, the miner process will NOT attempt to mine
// blocks. This should only be set when there's an external process mining
// blocks on behalf of the miner.
// When disabled and no external block producers are configured, all potential
// block rewards will be missed!
DisableWinningPoSt bool
} }
type DealmakingConfig struct { type DealmakingConfig struct {

View File

@ -0,0 +1,641 @@
package lpwinning
import (
"bytes"
"context"
"crypto/rand"
"encoding/binary"
"encoding/json"
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/network"
prooftypes "github.com/filecoin-project/go-state-types/proof"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/gen"
lrand "github.com/filecoin-project/lotus/chain/rand"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/promise"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var log = logging.Logger("lpwinning")
type WinPostTask struct {
max int
db *harmonydb.DB
prover ProverWinningPoSt
verifier storiface.Verifier
api WinPostAPI
actors []dtypes.MinerAddress
mineTF promise.Promise[harmonytask.AddTaskFunc]
}
type WinPostAPI interface {
ChainHead(context.Context) (*types.TipSet, error)
ChainTipSetWeight(context.Context, types.TipSetKey) (types.BigInt, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
StateGetBeaconEntry(context.Context, abi.ChainEpoch) (*types.BeaconEntry, error)
SyncSubmitBlock(context.Context, *types.BlockMsg) error
StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
MinerGetBaseInfo(context.Context, address.Address, abi.ChainEpoch, types.TipSetKey) (*api.MiningBaseInfo, error)
MinerCreateBlock(context.Context, *api.BlockTemplate) (*types.BlockMsg, error)
MpoolSelect(context.Context, types.TipSetKey, float64) ([]*types.SignedMessage, error)
WalletSign(context.Context, address.Address, []byte) (*crypto.Signature, error)
}
type ProverWinningPoSt interface {
GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, minerID abi.ActorID, sectorInfo []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]prooftypes.PoStProof, error)
}
func NewWinPostTask(max int, db *harmonydb.DB, prover ProverWinningPoSt, verifier storiface.Verifier, api WinPostAPI, actors []dtypes.MinerAddress) *WinPostTask {
t := &WinPostTask{
max: max,
db: db,
prover: prover,
verifier: verifier,
api: api,
actors: actors,
}
// TODO: run warmup
go t.mineBasic(context.TODO())
return t
}
func (t *WinPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
log.Debugw("WinPostTask.Do()", "taskID", taskID)
ctx := context.TODO()
type BlockCID struct {
CID string
}
type MiningTaskDetails struct {
SpID uint64
Epoch uint64
BlockCIDs []BlockCID
CompTime time.Time
}
var details MiningTaskDetails
// First query to fetch from mining_tasks
err = t.db.QueryRow(ctx, `SELECT sp_id, epoch, base_compute_time FROM mining_tasks WHERE task_id = $1`, taskID).Scan(&details.SpID, &details.Epoch, &details.CompTime)
if err != nil {
return false, err
}
// Second query to fetch from mining_base_block
rows, err := t.db.Query(ctx, `SELECT block_cid FROM mining_base_block WHERE task_id = $1`, taskID)
if err != nil {
return false, err
}
defer rows.Close()
for rows.Next() {
var cid BlockCID
if err := rows.Scan(&cid.CID); err != nil {
return false, err
}
details.BlockCIDs = append(details.BlockCIDs, cid)
}
if err := rows.Err(); err != nil {
return false, err
}
// construct base
maddr, err := address.NewIDAddress(details.SpID)
if err != nil {
return false, err
}
var bcids []cid.Cid
for _, c := range details.BlockCIDs {
bcid, err := cid.Parse(c.CID)
if err != nil {
return false, err
}
bcids = append(bcids, bcid)
}
tsk := types.NewTipSetKey(bcids...)
baseTs, err := t.api.ChainGetTipSet(ctx, tsk)
if err != nil {
return false, xerrors.Errorf("loading base tipset: %w", err)
}
base := MiningBase{
TipSet: baseTs,
AddRounds: abi.ChainEpoch(details.Epoch) - baseTs.Height() - 1,
ComputeTime: details.CompTime,
}
persistNoWin := func() error {
_, err := t.db.Exec(ctx, `UPDATE mining_base_block SET no_win = true WHERE task_id = $1`, taskID)
if err != nil {
return xerrors.Errorf("marking base as not-won: %w", err)
}
return nil
}
// ensure we have a beacon entry for the epoch we're mining on
round := base.epoch()
_ = retry1(func() (*types.BeaconEntry, error) {
return t.api.StateGetBeaconEntry(ctx, round)
})
// MAKE A MINING ATTEMPT!!
log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids()))
mbi, err := t.api.MinerGetBaseInfo(ctx, maddr, round, base.TipSet.Key())
if err != nil {
return false, xerrors.Errorf("failed to get mining base info: %w", err)
}
if mbi == nil {
// not eligible to mine on this base, we're done here
log.Debugw("WinPoSt not eligible to mine on this base", "tipset", types.LogCids(base.TipSet.Cids()))
return true, persistNoWin()
}
if !mbi.EligibleForMining {
// slashed or just have no power yet, we're done here
log.Debugw("WinPoSt not eligible for mining", "tipset", types.LogCids(base.TipSet.Cids()))
return true, persistNoWin()
}
if len(mbi.Sectors) == 0 {
log.Warnw("WinPoSt no sectors to mine", "tipset", types.LogCids(base.TipSet.Cids()))
return false, xerrors.Errorf("no sectors selected for winning PoSt")
}
var rbase types.BeaconEntry
var bvals []types.BeaconEntry
var eproof *types.ElectionProof
// winner check
{
bvals = mbi.BeaconEntries
rbase = mbi.PrevBeaconEntry
if len(bvals) > 0 {
rbase = bvals[len(bvals)-1]
}
eproof, err = gen.IsRoundWinner(ctx, round, maddr, rbase, mbi, t.api)
if err != nil {
log.Warnw("WinPoSt failed to check if we win next round", "error", err)
return false, xerrors.Errorf("failed to check if we win next round: %w", err)
}
if eproof == nil {
// not a winner, we're done here
log.Debugw("WinPoSt not a winner", "tipset", types.LogCids(base.TipSet.Cids()))
return true, persistNoWin()
}
}
// winning PoSt
var wpostProof []prooftypes.PoStProof
{
buf := new(bytes.Buffer)
if err := maddr.MarshalCBOR(buf); err != nil {
err = xerrors.Errorf("failed to marshal miner address: %w", err)
return false, err
}
brand, err := lrand.DrawRandomnessFromBase(rbase.Data, crypto.DomainSeparationTag_WinningPoStChallengeSeed, round, buf.Bytes())
if err != nil {
err = xerrors.Errorf("failed to get randomness for winning post: %w", err)
return false, err
}
prand := abi.PoStRandomness(brand)
prand[31] &= 0x3f // make into fr
sectorNums := make([]abi.SectorNumber, len(mbi.Sectors))
for i, s := range mbi.Sectors {
sectorNums[i] = s.SectorNumber
}
ppt, err := mbi.Sectors[0].SealProof.RegisteredWinningPoStProof()
if err != nil {
return false, xerrors.Errorf("mapping sector seal proof type to post proof type: %w", err)
}
postChallenges, err := ffi.GeneratePoStFallbackSectorChallenges(ppt, abi.ActorID(details.SpID), prand, sectorNums)
if err != nil {
return false, xerrors.Errorf("generating election challenges: %v", err)
}
sectorChallenges := make([]storiface.PostSectorChallenge, len(mbi.Sectors))
for i, s := range mbi.Sectors {
sectorChallenges[i] = storiface.PostSectorChallenge{
SealProof: s.SealProof,
SectorNumber: s.SectorNumber,
SealedCID: s.SealedCID,
Challenge: postChallenges.Challenges[s.SectorNumber],
Update: s.SectorKey != nil,
}
}
wpostProof, err = t.prover.GenerateWinningPoSt(ctx, ppt, abi.ActorID(details.SpID), sectorChallenges, prand)
if err != nil {
err = xerrors.Errorf("failed to compute winning post proof: %w", err)
return false, err
}
}
ticket, err := t.computeTicket(ctx, maddr, &rbase, round, base.TipSet.MinTicket(), mbi)
if err != nil {
return false, xerrors.Errorf("scratching ticket failed: %w", err)
}
// get pending messages early,
msgs, err := t.api.MpoolSelect(ctx, base.TipSet.Key(), ticket.Quality())
if err != nil {
return false, xerrors.Errorf("failed to select messages for block: %w", err)
}
// equivocation handling
{
// This next block exists to "catch" equivocating miners,
// who submit 2 blocks at the same height at different times in order to split the network.
// To safeguard against this, we make sure it's been EquivocationDelaySecs since our base was calculated,
// then re-calculate it.
// If the daemon detected equivocated blocks, those blocks will no longer be in the new base.
time.Sleep(time.Until(base.ComputeTime.Add(time.Duration(build.EquivocationDelaySecs) * time.Second)))
bestTs, err := t.api.ChainHead(ctx)
if err != nil {
return false, xerrors.Errorf("failed to get chain head: %w", err)
}
headWeight, err := t.api.ChainTipSetWeight(ctx, bestTs.Key())
if err != nil {
return false, xerrors.Errorf("failed to get chain head weight: %w", err)
}
baseWeight, err := t.api.ChainTipSetWeight(ctx, base.TipSet.Key())
if err != nil {
return false, xerrors.Errorf("failed to get base weight: %w", err)
}
if types.BigCmp(headWeight, baseWeight) <= 0 {
bestTs = base.TipSet
}
// If the base has changed, we take the _intersection_ of our old base and new base,
// thus ejecting blocks from any equivocating miners, without taking any new blocks.
if bestTs.Height() == base.TipSet.Height() && !bestTs.Equals(base.TipSet) {
log.Warnf("base changed from %s to %s, taking intersection", base.TipSet.Key(), bestTs.Key())
newBaseMap := map[cid.Cid]struct{}{}
for _, newBaseBlk := range bestTs.Cids() {
newBaseMap[newBaseBlk] = struct{}{}
}
refreshedBaseBlocks := make([]*types.BlockHeader, 0, len(base.TipSet.Cids()))
for _, baseBlk := range base.TipSet.Blocks() {
if _, ok := newBaseMap[baseBlk.Cid()]; ok {
refreshedBaseBlocks = append(refreshedBaseBlocks, baseBlk)
}
}
if len(refreshedBaseBlocks) != 0 && len(refreshedBaseBlocks) != len(base.TipSet.Blocks()) {
refreshedBase, err := types.NewTipSet(refreshedBaseBlocks)
if err != nil {
return false, xerrors.Errorf("failed to create new tipset when refreshing: %w", err)
}
if !base.TipSet.MinTicket().Equals(refreshedBase.MinTicket()) {
log.Warn("recomputing ticket due to base refresh")
ticket, err = t.computeTicket(ctx, maddr, &rbase, round, refreshedBase.MinTicket(), mbi)
if err != nil {
return false, xerrors.Errorf("failed to refresh ticket: %w", err)
}
}
log.Warn("re-selecting messages due to base refresh")
// refresh messages, as the selected messages may no longer be valid
msgs, err = t.api.MpoolSelect(ctx, refreshedBase.Key(), ticket.Quality())
if err != nil {
return false, xerrors.Errorf("failed to re-select messages for block: %w", err)
}
base.TipSet = refreshedBase
}
}
}
// block construction
var blockMsg *types.BlockMsg
{
uts := base.TipSet.MinTimestamp() + build.BlockDelaySecs*(uint64(base.AddRounds)+1)
blockMsg, err = t.api.MinerCreateBlock(context.TODO(), &api.BlockTemplate{
Miner: maddr,
Parents: base.TipSet.Key(),
Ticket: ticket,
Eproof: eproof,
BeaconValues: bvals,
Messages: msgs,
Epoch: round,
Timestamp: uts,
WinningPoStProof: wpostProof,
})
if err != nil {
return false, xerrors.Errorf("failed to create block: %w", err)
}
}
// persist in db
{
bhjson, err := json.Marshal(blockMsg.Header)
if err != nil {
return false, xerrors.Errorf("failed to marshal block header: %w", err)
}
_, err = t.db.Exec(ctx, `UPDATE mining_tasks
SET won = true, mined_cid = $2, mined_header = $3, mined_at = $4
WHERE task_id = $1`, taskID, blockMsg.Header.Cid(), string(bhjson), time.Now().UTC())
if err != nil {
return false, xerrors.Errorf("failed to update mining task: %w", err)
}
}
// wait until block timestamp
{
time.Sleep(time.Until(time.Unix(int64(blockMsg.Header.Timestamp), 0)))
}
// submit block!!
{
if err := t.api.SyncSubmitBlock(ctx, blockMsg); err != nil {
return false, xerrors.Errorf("failed to submit block: %w", err)
}
}
log.Infow("mined a block", "tipset", types.LogCids(blockMsg.Header.Parents), "height", blockMsg.Header.Height, "miner", maddr, "cid", blockMsg.Header.Cid())
// persist that we've submitted the block
{
_, err = t.db.Exec(ctx, `UPDATE mining_tasks
SET submitted_at = $2
WHERE task_id = $1`, taskID, time.Now().UTC())
if err != nil {
return false, xerrors.Errorf("failed to update mining task: %w", err)
}
}
return true, nil
}
func (t *WinPostTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
if len(ids) == 0 {
// probably can't happen, but panicking is bad
return nil, nil
}
// select lowest epoch
var lowestEpoch abi.ChainEpoch
var lowestEpochID = ids[0]
for _, id := range ids {
var epoch uint64
err := t.db.QueryRow(context.Background(), `SELECT epoch FROM mining_tasks WHERE task_id = $1`, id).Scan(&epoch)
if err != nil {
return nil, err
}
if lowestEpoch == 0 || abi.ChainEpoch(epoch) < lowestEpoch {
lowestEpoch = abi.ChainEpoch(epoch)
lowestEpochID = id
}
}
return &lowestEpochID, nil
}
func (t *WinPostTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Name: "WinPost",
Max: t.max,
MaxFailures: 3,
Follows: nil,
Cost: resources.Resources{
Cpu: 1,
// todo set to something for 32/64G sector sizes? Technically windowPoSt is happy on a CPU
// but it will use a GPU if available
Gpu: 0,
Ram: 1 << 30, // todo arbitrary number
},
}
}
func (t *WinPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
t.mineTF.Set(taskFunc)
}
// MiningBase is the tipset on top of which we plan to construct our next block.
// Refer to godocs on GetBestMiningCandidate.
type MiningBase struct {
TipSet *types.TipSet
ComputeTime time.Time
AddRounds abi.ChainEpoch
}
func (mb MiningBase) epoch() abi.ChainEpoch {
// return the epoch that will result from mining on this base
return mb.TipSet.Height() + mb.AddRounds + 1
}
func (mb MiningBase) baseTime() time.Time {
tsTime := time.Unix(int64(mb.TipSet.MinTimestamp()), 0)
roundDelay := build.BlockDelaySecs * uint64(mb.AddRounds+1)
tsTime = tsTime.Add(time.Duration(roundDelay) * time.Second)
return tsTime
}
func (mb MiningBase) afterPropDelay() time.Time {
base := mb.baseTime()
base.Add(randTimeOffset(time.Second))
return base
}
func (t *WinPostTask) mineBasic(ctx context.Context) {
var workBase MiningBase
taskFn := t.mineTF.Val(ctx)
// initialize workbase
{
head := retry1(func() (*types.TipSet, error) {
return t.api.ChainHead(ctx)
})
workBase = MiningBase{
TipSet: head,
AddRounds: 0,
ComputeTime: time.Now(),
}
}
/*
/- T+0 == workBase.baseTime
|
>--------*------*--------[wait until next round]----->
|
|- T+PD == workBase.afterPropDelay+(~1s)
|- Here we acquire the new workBase, and start a new round task
\- Then we loop around, and wait for the next head
time -->
*/
for {
// limit the rate at which we mine blocks to at least EquivocationDelaySecs
// this is to prevent races on devnets in catch up mode. Acts as a minimum
// delay for the sleep below.
time.Sleep(time.Duration(build.EquivocationDelaySecs)*time.Second + time.Second)
// wait for *NEXT* propagation delay
time.Sleep(time.Until(workBase.afterPropDelay()))
// check current best candidate
maybeBase := retry1(func() (*types.TipSet, error) {
return t.api.ChainHead(ctx)
})
if workBase.TipSet.Equals(maybeBase) {
// workbase didn't change in the new round so we have a null round here
workBase.AddRounds++
log.Debugw("workbase update", "tipset", workBase.TipSet.Cids(), "nulls", workBase.AddRounds, "lastUpdate", time.Since(workBase.ComputeTime), "type", "same-tipset")
} else {
btsw := retry1(func() (types.BigInt, error) {
return t.api.ChainTipSetWeight(ctx, maybeBase.Key())
})
ltsw := retry1(func() (types.BigInt, error) {
return t.api.ChainTipSetWeight(ctx, workBase.TipSet.Key())
})
if types.BigCmp(btsw, ltsw) <= 0 {
// new tipset for some reason has less weight than the old one, assume null round here
// NOTE: the backing node may have reorged, or manually changed head
workBase.AddRounds++
log.Debugw("workbase update", "tipset", workBase.TipSet.Cids(), "nulls", workBase.AddRounds, "lastUpdate", time.Since(workBase.ComputeTime), "type", "prefer-local-weight")
} else {
// new tipset has more weight, so we should mine on it, no null round here
log.Debugw("workbase update", "tipset", workBase.TipSet.Cids(), "nulls", workBase.AddRounds, "lastUpdate", time.Since(workBase.ComputeTime), "type", "prefer-new-tipset")
workBase = MiningBase{
TipSet: maybeBase,
AddRounds: 0,
ComputeTime: time.Now(),
}
}
}
// dispatch mining task
// (note equivocation prevention is handled by the mining code)
for _, act := range t.actors {
spID, err := address.IDFromAddress(address.Address(act))
if err != nil {
log.Errorf("failed to get spID from address %s: %s", act, err)
continue
}
taskFn(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
_, err := tx.Exec(`INSERT INTO mining_tasks (task_id, sp_id, epoch, base_compute_time) VALUES ($1, $2, $3, $4)`, id, spID, workBase.epoch(), workBase.ComputeTime.UTC())
if err != nil {
return false, xerrors.Errorf("inserting mining_tasks: %w", err)
}
for _, c := range workBase.TipSet.Cids() {
_, err = tx.Exec(`INSERT INTO mining_base_block (task_id, sp_id, block_cid) VALUES ($1, $2, $3)`, id, spID, c)
if err != nil {
return false, xerrors.Errorf("inserting mining base blocks: %w", err)
}
}
return true, nil // no errors, commit the transaction
})
}
}
}
func (t *WinPostTask) computeTicket(ctx context.Context, maddr address.Address, brand *types.BeaconEntry, round abi.ChainEpoch, chainRand *types.Ticket, mbi *api.MiningBaseInfo) (*types.Ticket, error) {
buf := new(bytes.Buffer)
if err := maddr.MarshalCBOR(buf); err != nil {
return nil, xerrors.Errorf("failed to marshal address to cbor: %w", err)
}
if round > build.UpgradeSmokeHeight {
buf.Write(chainRand.VRFProof)
}
input, err := lrand.DrawRandomnessFromBase(brand.Data, crypto.DomainSeparationTag_TicketProduction, round-build.TicketRandomnessLookback, buf.Bytes())
if err != nil {
return nil, err
}
vrfOut, err := gen.ComputeVRF(ctx, t.api.WalletSign, mbi.WorkerKey, input)
if err != nil {
return nil, err
}
return &types.Ticket{
VRFProof: vrfOut,
}, nil
}
func randTimeOffset(width time.Duration) time.Duration {
buf := make([]byte, 8)
rand.Reader.Read(buf) //nolint:errcheck
val := time.Duration(binary.BigEndian.Uint64(buf) % uint64(width))
return val - (width / 2)
}
func retry1[R any](f func() (R, error)) R {
for {
r, err := f()
if err == nil {
return r
}
log.Errorw("error in mining loop, retrying", "error", err)
time.Sleep(time.Second)
}
}
var _ harmonytask.TaskInterface = &WinPostTask{}