lpwinning: implement WinPostTask.Do
This commit is contained in:
parent
49c56e4d1c
commit
b9625daf2e
@ -3,8 +3,17 @@ create table mining_tasks
|
||||
task_id bigint not null
|
||||
constraint mining_tasks_pk
|
||||
primary key,
|
||||
sp_id bigint,
|
||||
epoch bigint,
|
||||
sp_id bigint not null,
|
||||
epoch bigint not null,
|
||||
base_compute_time timestamp not null,
|
||||
|
||||
won bool not null default false,
|
||||
mined_cid text,
|
||||
mined_header jsonb,
|
||||
mined_at timestamp,
|
||||
|
||||
submitted_at timestamp,
|
||||
|
||||
constraint mining_tasks_sp_epoch
|
||||
unique (sp_id, epoch)
|
||||
);
|
||||
|
@ -1,18 +1,27 @@
|
||||
package lpwinning
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
"github.com/filecoin-project/go-state-types/network"
|
||||
prooftypes "github.com/filecoin-project/go-state-types/proof"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/gen"
|
||||
lrand "github.com/filecoin-project/lotus/chain/rand"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/resources"
|
||||
"github.com/filecoin-project/lotus/lib/promise"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"golang.org/x/xerrors"
|
||||
"time"
|
||||
@ -22,9 +31,8 @@ var log = logging.Logger("lpwinning")
|
||||
|
||||
type WinPostTask struct {
|
||||
max int
|
||||
|
||||
// lastWork holds the last MiningBase we built upon.
|
||||
lastWork *MiningBase
|
||||
db *harmonydb.DB
|
||||
epp gen.WinningPoStProver
|
||||
|
||||
api WinPostAPI
|
||||
actors []dtypes.MinerAddress
|
||||
@ -35,20 +43,314 @@ type WinPostTask struct {
|
||||
type WinPostAPI interface {
|
||||
ChainHead(context.Context) (*types.TipSet, error)
|
||||
ChainTipSetWeight(context.Context, types.TipSetKey) (types.BigInt, error)
|
||||
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
|
||||
|
||||
StateGetBeaconEntry(context.Context, abi.ChainEpoch) (*types.BeaconEntry, error)
|
||||
SyncSubmitBlock(context.Context, *types.BlockMsg) error
|
||||
StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
|
||||
StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
|
||||
StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error)
|
||||
|
||||
MinerGetBaseInfo(context.Context, address.Address, abi.ChainEpoch, types.TipSetKey) (*api.MiningBaseInfo, error)
|
||||
MinerCreateBlock(context.Context, *api.BlockTemplate) (*types.BlockMsg, error)
|
||||
MpoolSelect(context.Context, types.TipSetKey, float64) ([]*types.SignedMessage, error)
|
||||
|
||||
WalletSign(context.Context, address.Address, []byte) (*crypto.Signature, error)
|
||||
}
|
||||
|
||||
func NewWinPostTask(max abi.SectorNumber) *WinPostTask {
|
||||
// todo run warmup
|
||||
func NewWinPostTask(max int, db *harmonydb.DB, epp gen.WinningPoStProver, api WinPostAPI, actors []dtypes.MinerAddress) *WinPostTask {
|
||||
return &WinPostTask{
|
||||
max: max,
|
||||
db: db,
|
||||
epp: epp,
|
||||
api: api,
|
||||
actors: actors,
|
||||
}
|
||||
// TODO: run warmup
|
||||
}
|
||||
|
||||
func (t *WinPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
|
||||
// TODO THIS WILL BASICALLY BE A mineOne() function
|
||||
ctx := context.TODO()
|
||||
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
type BlockCID struct {
|
||||
CID string
|
||||
}
|
||||
|
||||
type MiningTaskDetails struct {
|
||||
SpID uint64
|
||||
Epoch uint64
|
||||
BlockCIDs []BlockCID
|
||||
CompTime time.Time
|
||||
}
|
||||
|
||||
var details MiningTaskDetails
|
||||
|
||||
// First query to fetch from mining_tasks
|
||||
err = t.db.QueryRow(ctx, `SELECT sp_id, epoch, base_compute_time FROM mining_tasks WHERE task_id = $1`, taskID).Scan(&details.SpID, &details.Epoch, &details.CompTime)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Second query to fetch from mining_base_block
|
||||
rows, err := t.db.Query(ctx, `SELECT block_cid FROM mining_base_block WHERE task_id = $1`, taskID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var cid BlockCID
|
||||
if err := rows.Scan(&cid.CID); err != nil {
|
||||
return false, err
|
||||
}
|
||||
details.BlockCIDs = append(details.BlockCIDs, cid)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// construct base
|
||||
maddr, err := address.NewIDAddress(details.SpID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
var bcids []cid.Cid
|
||||
for _, c := range details.BlockCIDs {
|
||||
bcid, err := cid.Parse(c.CID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
bcids = append(bcids, bcid)
|
||||
}
|
||||
|
||||
tsk := types.NewTipSetKey(bcids...)
|
||||
baseTs, err := t.api.ChainGetTipSet(ctx, tsk)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("loading base tipset: %w", err)
|
||||
}
|
||||
|
||||
base := MiningBase{
|
||||
TipSet: baseTs,
|
||||
AddRounds: abi.ChainEpoch(details.Epoch) - baseTs.Height() - 1,
|
||||
ComputeTime: details.CompTime,
|
||||
}
|
||||
|
||||
// ensure we have a beacon entry for the epoch we're mining on
|
||||
round := base.epoch()
|
||||
|
||||
_ = retry1(func() (*types.BeaconEntry, error) {
|
||||
return t.api.StateGetBeaconEntry(ctx, round)
|
||||
})
|
||||
|
||||
// MAKE A MINING ATTEMPT!!
|
||||
log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids()))
|
||||
|
||||
mbi, err := t.api.MinerGetBaseInfo(ctx, maddr, round, base.TipSet.Key())
|
||||
if err != nil {
|
||||
err = xerrors.Errorf("failed to get mining base info: %w", err)
|
||||
return false, err
|
||||
}
|
||||
if mbi == nil {
|
||||
// not elloigible to mine on this base, we're done here
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if !mbi.EligibleForMining {
|
||||
// slashed or just have no power yet, we're done here
|
||||
return true, nil
|
||||
}
|
||||
|
||||
var rbase types.BeaconEntry
|
||||
var bvals []types.BeaconEntry
|
||||
var eproof *types.ElectionProof
|
||||
|
||||
// winner check
|
||||
{
|
||||
bvals = mbi.BeaconEntries
|
||||
rbase = mbi.PrevBeaconEntry
|
||||
if len(bvals) > 0 {
|
||||
rbase = bvals[len(bvals)-1]
|
||||
}
|
||||
|
||||
eproof, err = gen.IsRoundWinner(ctx, round, maddr, rbase, mbi, t.api)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to check if we win next round: %w", err)
|
||||
}
|
||||
|
||||
if eproof == nil {
|
||||
// not a winner, we're done here
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// winning PoSt
|
||||
var wpostProof []prooftypes.PoStProof
|
||||
{
|
||||
buf := new(bytes.Buffer)
|
||||
if err := maddr.MarshalCBOR(buf); err != nil {
|
||||
err = xerrors.Errorf("failed to marshal miner address: %w", err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
rand, err := lrand.DrawRandomnessFromBase(rbase.Data, crypto.DomainSeparationTag_WinningPoStChallengeSeed, round, buf.Bytes())
|
||||
if err != nil {
|
||||
err = xerrors.Errorf("failed to get randomness for winning post: %w", err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
prand := abi.PoStRandomness(rand)
|
||||
|
||||
nv, err := t.api.StateNetworkVersion(ctx, base.TipSet.Key())
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
wpostProof, err = t.epp.ComputeProof(ctx, mbi.Sectors, prand, round, nv)
|
||||
if err != nil {
|
||||
err = xerrors.Errorf("failed to compute winning post proof: %w", err)
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
ticket, err := t.computeTicket(ctx, maddr, &rbase, round, base.TipSet.MinTicket(), mbi)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("scratching ticket failed: %w", err)
|
||||
}
|
||||
|
||||
// get pending messages early,
|
||||
msgs, err := t.api.MpoolSelect(ctx, base.TipSet.Key(), ticket.Quality())
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to select messages for block: %w", err)
|
||||
}
|
||||
|
||||
// equivocation handling
|
||||
{
|
||||
// This next block exists to "catch" equivocating miners,
|
||||
// who submit 2 blocks at the same height at different times in order to split the network.
|
||||
// To safeguard against this, we make sure it's been EquivocationDelaySecs since our base was calculated,
|
||||
// then re-calculate it.
|
||||
// If the daemon detected equivocated blocks, those blocks will no longer be in the new base.
|
||||
time.Sleep(time.Until(base.ComputeTime.Add(time.Duration(build.EquivocationDelaySecs) * time.Second)))
|
||||
|
||||
bestTs, err := t.api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to get chain head: %w", err)
|
||||
}
|
||||
|
||||
headWeight, err := t.api.ChainTipSetWeight(ctx, bestTs.Key())
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to get chain head weight: %w", err)
|
||||
}
|
||||
|
||||
baseWeight, err := t.api.ChainTipSetWeight(ctx, base.TipSet.Key())
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to get base weight: %w", err)
|
||||
}
|
||||
if types.BigCmp(headWeight, baseWeight) <= 0 {
|
||||
bestTs = base.TipSet
|
||||
}
|
||||
|
||||
// If the base has changed, we take the _intersection_ of our old base and new base,
|
||||
// thus ejecting blocks from any equivocating miners, without taking any new blocks.
|
||||
if bestTs.Height() == base.TipSet.Height() && !bestTs.Equals(base.TipSet) {
|
||||
log.Warnf("base changed from %s to %s, taking intersection", base.TipSet.Key(), bestTs.Key())
|
||||
newBaseMap := map[cid.Cid]struct{}{}
|
||||
for _, newBaseBlk := range bestTs.Cids() {
|
||||
newBaseMap[newBaseBlk] = struct{}{}
|
||||
}
|
||||
|
||||
refreshedBaseBlocks := make([]*types.BlockHeader, 0, len(base.TipSet.Cids()))
|
||||
for _, baseBlk := range base.TipSet.Blocks() {
|
||||
if _, ok := newBaseMap[baseBlk.Cid()]; ok {
|
||||
refreshedBaseBlocks = append(refreshedBaseBlocks, baseBlk)
|
||||
}
|
||||
}
|
||||
|
||||
if len(refreshedBaseBlocks) != 0 && len(refreshedBaseBlocks) != len(base.TipSet.Blocks()) {
|
||||
refreshedBase, err := types.NewTipSet(refreshedBaseBlocks)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to create new tipset when refreshing: %w", err)
|
||||
}
|
||||
|
||||
if !base.TipSet.MinTicket().Equals(refreshedBase.MinTicket()) {
|
||||
log.Warn("recomputing ticket due to base refresh")
|
||||
|
||||
ticket, err = t.computeTicket(ctx, maddr, &rbase, round, refreshedBase.MinTicket(), mbi)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to refresh ticket: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Warn("re-selecting messages due to base refresh")
|
||||
// refresh messages, as the selected messages may no longer be valid
|
||||
msgs, err = t.api.MpoolSelect(ctx, refreshedBase.Key(), ticket.Quality())
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to re-select messages for block: %w", err)
|
||||
}
|
||||
|
||||
base.TipSet = refreshedBase
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// block construction
|
||||
var blockMsg *types.BlockMsg
|
||||
{
|
||||
uts := base.TipSet.MinTimestamp() + build.BlockDelaySecs*(uint64(base.AddRounds)+1)
|
||||
|
||||
blockMsg, err = t.api.MinerCreateBlock(context.TODO(), &api.BlockTemplate{
|
||||
Miner: maddr,
|
||||
Parents: base.TipSet.Key(),
|
||||
Ticket: ticket,
|
||||
Eproof: eproof,
|
||||
BeaconValues: bvals,
|
||||
Messages: msgs,
|
||||
Epoch: round,
|
||||
Timestamp: uts,
|
||||
WinningPoStProof: wpostProof,
|
||||
})
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to create block: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// persist in db
|
||||
{
|
||||
bhjson, err := json.Marshal(blockMsg.Header)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to marshal block header: %w", err)
|
||||
}
|
||||
|
||||
_, err = t.db.Exec(ctx, `UPDATE mining_tasks
|
||||
SET won = true, mined_cid = $2, mined_header = $3, mined_at = $4
|
||||
WHERE task_id = $1`, taskID, blockMsg.Header.Cid(), string(bhjson), time.Now().UTC())
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to update mining task: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// submit block!!
|
||||
{
|
||||
if err := t.api.SyncSubmitBlock(ctx, blockMsg); err != nil {
|
||||
return false, xerrors.Errorf("failed to submit block: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Infow("mined a block", "tipset", types.LogCids(blockMsg.Header.Parents), "height", blockMsg.Header.Height, "miner", maddr, "cid", blockMsg.Header.Cid())
|
||||
|
||||
// persist that we've submitted the block
|
||||
{
|
||||
_, err = t.db.Exec(ctx, `UPDATE mining_tasks
|
||||
SET submitted_at = $2
|
||||
WHERE task_id = $1`, taskID, time.Now().UTC())
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to update mining task: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (t *WinPostTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
|
||||
@ -188,7 +490,7 @@ func (t *WinPostTask) mineBasic(ctx context.Context) {
|
||||
}
|
||||
|
||||
taskFn(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
|
||||
_, err := tx.Exec(`INSERT INTO mining_tasks (task_id, sp_id, epoch) VALUES ($1, $2, $3)`, id, spID, workBase.epoch())
|
||||
_, err := tx.Exec(`INSERT INTO mining_tasks (task_id, sp_id, epoch, base_compute_time) VALUES ($1, $2, $3, $4)`, id, spID, workBase.epoch(), workBase.ComputeTime.UTC())
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("inserting mining_tasks: %w", err)
|
||||
}
|
||||
@ -376,6 +678,31 @@ func (t *WinPostTask) mineBasic(ctx context.Context) {
|
||||
}
|
||||
*/
|
||||
|
||||
func (t *WinPostTask) computeTicket(ctx context.Context, maddr address.Address, brand *types.BeaconEntry, round abi.ChainEpoch, chainRand *types.Ticket, mbi *api.MiningBaseInfo) (*types.Ticket, error) {
|
||||
buf := new(bytes.Buffer)
|
||||
if err := maddr.MarshalCBOR(buf); err != nil {
|
||||
return nil, xerrors.Errorf("failed to marshal address to cbor: %w", err)
|
||||
}
|
||||
|
||||
if round > build.UpgradeSmokeHeight {
|
||||
buf.Write(chainRand.VRFProof)
|
||||
}
|
||||
|
||||
input, err := lrand.DrawRandomnessFromBase(brand.Data, crypto.DomainSeparationTag_TicketProduction, round-build.TicketRandomnessLookback, buf.Bytes())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
vrfOut, err := gen.ComputeVRF(ctx, t.api.WalletSign, mbi.WorkerKey, input)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &types.Ticket{
|
||||
VRFProof: vrfOut,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func randTimeOffset(width time.Duration) time.Duration {
|
||||
buf := make([]byte, 8)
|
||||
rand.Reader.Read(buf) //nolint:errcheck
|
||||
|
Loading…
Reference in New Issue
Block a user