lotus/miner/miner.go

578 lines
15 KiB
Go
Raw Normal View History

package miner
import (
"bytes"
"context"
"fmt"
2019-08-20 16:50:17 +00:00
"sync"
"time"
address "github.com/filecoin-project/go-address"
2020-02-12 22:12:11 +00:00
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/crypto"
2020-02-12 22:12:11 +00:00
lru "github.com/hashicorp/golang-lru"
2019-11-25 04:45:13 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/gen"
2020-04-29 22:25:48 +00:00
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
2019-08-20 16:50:17 +00:00
logging "github.com/ipfs/go-log/v2"
2019-08-20 16:50:17 +00:00
"go.opencensus.io/trace"
"golang.org/x/xerrors"
)
// log is the package-level logger, registered under the "miner" name.
var log = logging.Logger("miner")
2020-04-23 21:12:42 +00:00
// waitFunc is invoked by the mining loop before each mining attempt, with the
// minimum timestamp of the tipset the loop plans to mine on. It returns a
// callback that the loop calls to report whether a block was mined in this
// round.
type waitFunc func(ctx context.Context, baseTime uint64) (func(bool), error)
2019-09-23 15:27:30 +00:00
func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address) *Miner {
2020-01-17 07:10:47 +00:00
arc, err := lru.NewARC(10000)
if err != nil {
panic(err)
}
2019-07-11 02:36:43 +00:00
return &Miner{
api: api,
epp: epp,
address: addr,
2020-04-23 21:12:42 +00:00
waitFunc: func(ctx context.Context, baseTime uint64) (func(bool), error) {
2019-10-09 04:38:59 +00:00
// Wait around for half the block time in case other parents come in
2019-12-03 20:00:04 +00:00
deadline := baseTime + build.PropagationDelay
time.Sleep(time.Until(time.Unix(int64(deadline), 0)))
2020-04-23 21:12:42 +00:00
return func(bool) {}, nil
2019-10-09 04:38:59 +00:00
},
minedBlockHeights: arc,
2019-07-11 02:36:43 +00:00
}
}
type Miner struct {
2019-11-25 04:45:13 +00:00
api api.FullNode
epp gen.WinningPoStProver
2019-11-21 22:21:45 +00:00
lk sync.Mutex
address address.Address
stop chan struct{}
stopping chan struct{}
2019-07-11 02:36:43 +00:00
2019-10-09 04:38:59 +00:00
waitFunc waitFunc
lastWork *MiningBase
minedBlockHeights *lru.ARCCache
}
func (m *Miner) Address() address.Address {
2019-08-21 15:14:38 +00:00
m.lk.Lock()
defer m.lk.Unlock()
return m.address
2019-08-21 15:14:38 +00:00
}
func (m *Miner) Start(ctx context.Context) error {
2019-08-20 16:50:17 +00:00
m.lk.Lock()
defer m.lk.Unlock()
if m.stop != nil {
return fmt.Errorf("miner already started")
2019-08-20 16:50:17 +00:00
}
m.stop = make(chan struct{})
go m.mine(context.TODO())
2019-08-20 16:50:17 +00:00
return nil
}
func (m *Miner) Stop(ctx context.Context) error {
2019-08-20 18:05:17 +00:00
m.lk.Lock()
defer m.lk.Unlock()
2019-08-20 18:05:17 +00:00
m.stopping = make(chan struct{})
stopping := m.stopping
close(m.stop)
2019-08-20 18:05:17 +00:00
select {
case <-stopping:
return nil
case <-ctx.Done():
return ctx.Err()
2019-08-20 18:05:17 +00:00
}
}
// niceSleep blocks for d, or until the miner's stop channel fires. It returns
// true when the full duration elapsed and false when the wait was cut short
// by a stop request.
func (m *Miner) niceSleep(d time.Duration) bool {
	elapsed := time.After(d)

	select {
	case <-m.stop:
		return false
	case <-elapsed:
		return true
	}
}
2019-08-20 16:50:17 +00:00
func (m *Miner) mine(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "/mine")
defer span.End()
2019-08-20 18:05:17 +00:00
2019-10-10 02:03:42 +00:00
var lastBase MiningBase
for {
2019-08-20 18:05:17 +00:00
select {
case <-m.stop:
stopping := m.stopping
2019-08-20 18:05:17 +00:00
m.stop = nil
m.stopping = nil
close(stopping)
return
2019-08-20 18:05:17 +00:00
default:
}
2019-12-03 20:00:04 +00:00
prebase, err := m.GetBestMiningCandidate(ctx)
if err != nil {
log.Errorf("failed to get best mining candidate: %s", err)
m.niceSleep(time.Second * 5)
2019-12-03 20:00:04 +00:00
continue
}
// Wait until propagation delay period after block we plan to mine on
2020-04-23 21:12:42 +00:00
onDone, err := m.waitFunc(ctx, prebase.TipSet.MinTimestamp())
if err != nil {
2019-10-09 04:38:59 +00:00
log.Error(err)
2020-06-28 10:20:56 +00:00
continue
2019-10-09 04:38:59 +00:00
}
2019-08-20 18:05:17 +00:00
2019-10-15 05:00:30 +00:00
base, err := m.GetBestMiningCandidate(ctx)
if err != nil {
log.Errorf("failed to get best mining candidate: %s", err)
continue
}
2020-04-23 21:12:42 +00:00
if base.TipSet.Equals(lastBase.TipSet) && lastBase.NullRounds == base.NullRounds {
log.Warnf("BestMiningCandidate from the previous round: %s (nulls:%d)", lastBase.TipSet.Cids(), lastBase.NullRounds)
m.niceSleep(time.Duration(build.BlockDelaySecs) * time.Second)
continue
}
b, err := m.mineOne(ctx, base)
if err != nil {
log.Errorf("mining block failed: %+v", err)
m.niceSleep(time.Second)
continue
}
lastBase = *base
onDone(b != nil)
2020-04-23 21:12:42 +00:00
if b != nil {
btime := time.Unix(int64(b.Header.Timestamp), 0)
2019-10-09 09:11:41 +00:00
if time.Now().Before(btime) {
if !m.niceSleep(time.Until(btime)) {
log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out")
time.Sleep(time.Until(btime))
}
2019-10-10 00:20:47 +00:00
} else {
log.Warnw("mined block in the past", "block-time", btime,
"time", time.Now(), "duration", time.Since(btime))
2019-10-09 09:11:41 +00:00
}
2020-05-05 19:39:43 +00:00
// TODO: should do better 'anti slash' protection here
blkKey := fmt.Sprintf("%d", b.Header.Height)
if _, ok := m.minedBlockHeights.Get(blkKey); ok {
log.Warnw("Created a block at the same height as another block we've created", "height", b.Header.Height, "miner", b.Header.Miner, "parents", b.Header.Parents)
continue
}
m.minedBlockHeights.Add(blkKey, true)
if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
log.Errorf("failed to submit newly mined block: %s", err)
}
} else {
2020-06-12 00:11:38 +00:00
base.NullRounds++
// Wait until the next epoch, plus the propagation delay, so a new tipset
// has enough time to form.
//
// See: https://github.com/filecoin-project/lotus/issues/1845
nextRound := time.Unix(int64(base.TipSet.MinTimestamp()+build.BlockDelaySecs*uint64(base.NullRounds))+int64(build.PropagationDelay), 0)
select {
case <-time.After(time.Until(nextRound)):
case <-m.stop:
stopping := m.stopping
m.stop = nil
m.stopping = nil
close(stopping)
return
}
}
}
}
type MiningBase struct {
2020-04-23 21:12:42 +00:00
TipSet *types.TipSet
NullRounds abi.ChainEpoch
}
2019-10-15 05:00:30 +00:00
func (m *Miner) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error) {
m.lk.Lock()
defer m.lk.Unlock()
2019-10-15 05:00:30 +00:00
bts, err := m.api.ChainHead(ctx)
if err != nil {
return nil, err
}
if m.lastWork != nil {
2020-04-23 21:12:42 +00:00
if m.lastWork.TipSet.Equals(bts) {
return m.lastWork, nil
}
btsw, err := m.api.ChainTipSetWeight(ctx, bts.Key())
2019-10-15 05:00:30 +00:00
if err != nil {
return nil, err
}
2020-04-23 21:12:42 +00:00
ltsw, err := m.api.ChainTipSetWeight(ctx, m.lastWork.TipSet.Key())
2019-10-15 05:00:30 +00:00
if err != nil {
return nil, err
}
if types.BigCmp(btsw, ltsw) <= 0 {
return m.lastWork, nil
}
}
2020-04-23 21:12:42 +00:00
m.lastWork = &MiningBase{TipSet: bts}
2019-12-03 07:58:38 +00:00
return m.lastWork, nil
}
func (m *Miner) hasPower(ctx context.Context, addr address.Address, ts *types.TipSet) (bool, error) {
mpower, err := m.api.StateMinerPower(ctx, addr, ts.Key())
2019-11-29 20:18:34 +00:00
if err != nil {
return false, err
}
return mpower.MinerPower.QualityAdjPower.GreaterThanEqual(power.ConsensusMinerMinPower), nil
2019-11-29 20:18:34 +00:00
}
// mineOne attempts to mine a single block, and does so synchronously, if and
// only if we are eligible to mine.
2020-06-23 21:51:25 +00:00
//
// {hint/landmark}: This method coordinates all the steps involved in mining a
// block, including the condition of whether mine or not at all depending on
// whether we win the round or not.
//
// This method does the following:
//
// 1.
func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) {
2020-04-23 21:12:42 +00:00
log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids()))
2019-11-27 14:18:51 +00:00
start := time.Now()
2020-04-23 21:12:42 +00:00
round := base.TipSet.Height() + base.NullRounds + 1
mbi, err := m.api.MinerGetBaseInfo(ctx, m.address, round, base.TipSet.Key())
2020-04-08 15:11:42 +00:00
if err != nil {
2020-04-09 17:13:09 +00:00
return nil, xerrors.Errorf("failed to get mining base info: %w", err)
2020-04-08 15:11:42 +00:00
}
2020-04-23 21:12:42 +00:00
if mbi == nil {
return nil, nil
}
2020-04-08 15:11:42 +00:00
2020-05-15 09:17:13 +00:00
tMBI := time.Now()
2020-04-09 17:13:09 +00:00
beaconPrev := mbi.PrevBeaconEntry
2020-05-15 09:17:13 +00:00
tDrand := time.Now()
bvals := mbi.BeaconEntries
2020-05-15 09:17:13 +00:00
hasPower, err := m.hasPower(ctx, m.address, base.TipSet)
2019-11-29 20:18:34 +00:00
if err != nil {
return nil, xerrors.Errorf("checking if miner is slashed: %w", err)
}
2019-12-05 05:49:11 +00:00
if !hasPower {
// slashed or just have no power yet
2019-11-29 20:18:34 +00:00
return nil, nil
}
2020-05-15 09:17:13 +00:00
tPowercheck := time.Now()
2020-04-23 21:12:42 +00:00
log.Infof("Time delta between now and our mining base: %ds (nulls: %d)", uint64(time.Now().Unix())-base.TipSet.MinTimestamp(), base.NullRounds)
2020-04-09 17:13:09 +00:00
rbase := beaconPrev
2020-04-08 15:11:42 +00:00
if len(bvals) > 0 {
rbase = bvals[len(bvals)-1]
}
ticket, err := m.computeTicket(ctx, &rbase, base, len(bvals) > 0)
if err != nil {
return nil, xerrors.Errorf("scratching ticket failed: %w", err)
}
winner, err := gen.IsRoundWinner(ctx, base.TipSet, round, m.address, rbase, mbi, m.api)
if err != nil {
2019-11-25 04:45:13 +00:00
return nil, xerrors.Errorf("failed to check if we win next round: %w", err)
}
if winner == nil {
return nil, nil
}
2020-05-15 09:17:13 +00:00
tTicket := time.Now()
2020-04-30 20:21:46 +00:00
buf := new(bytes.Buffer)
if err := m.address.MarshalCBOR(buf); err != nil {
2020-04-30 20:21:46 +00:00
return nil, xerrors.Errorf("failed to marshal miner address: %w", err)
}
rand, err := store.DrawRandomness(rbase.Data, crypto.DomainSeparationTag_WinningPoStChallengeSeed, base.TipSet.Height()+base.NullRounds+1, buf.Bytes())
if err != nil {
return nil, xerrors.Errorf("failed to get randomness for winning post: %w", err)
}
prand := abi.PoStRandomness(rand)
2020-05-15 09:17:13 +00:00
tSeed := time.Now()
postProof, err := m.epp.ComputeProof(ctx, mbi.Sectors, prand)
if err != nil {
return nil, xerrors.Errorf("failed to compute winning post proof: %w", err)
}
2019-12-03 18:25:56 +00:00
// get pending messages early,
2020-04-23 21:12:42 +00:00
pending, err := m.api.MpoolPending(context.TODO(), base.TipSet.Key())
2019-12-03 18:25:56 +00:00
if err != nil {
return nil, xerrors.Errorf("failed to get pending messages: %w", err)
}
2020-05-15 09:17:13 +00:00
tPending := time.Now()
2020-04-08 15:11:42 +00:00
// TODO: winning post proof
b, err := m.createBlock(base, m.address, ticket, winner, bvals, postProof, pending)
if err != nil {
return nil, xerrors.Errorf("failed to create block: %w", err)
}
2020-05-15 09:17:13 +00:00
tCreateBlock := time.Now()
dur := tCreateBlock.Sub(start)
2019-12-03 20:00:04 +00:00
log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height, "took", dur)
if dur > time.Second*time.Duration(build.BlockDelaySecs) {
2019-12-03 00:08:08 +00:00
log.Warn("CAUTION: block production took longer than the block delay. Your computer may not be fast enough to keep up")
2020-05-15 09:17:13 +00:00
log.Warnw("tMinerBaseInfo ", "duration", tMBI.Sub(start))
log.Warnw("tDrand ", "duration", tDrand.Sub(tMBI))
log.Warnw("tPowercheck ", "duration", tPowercheck.Sub(tDrand))
log.Warnw("tTicket ", "duration", tTicket.Sub(tPowercheck))
log.Warnw("tSeed ", "duration", tSeed.Sub(tTicket))
log.Warnw("tPending ", "duration", tPending.Sub(tSeed))
log.Warnw("tCreateBlock ", "duration", tCreateBlock.Sub(tPending))
2019-12-03 00:08:08 +00:00
}
2019-11-27 14:18:51 +00:00
return b, nil
}
func (m *Miner) computeTicket(ctx context.Context, brand *types.BeaconEntry, base *MiningBase, haveNewEntries bool) (*types.Ticket, error) {
mi, err := m.api.StateMinerInfo(ctx, m.address, types.EmptyTSK)
2019-08-15 02:30:21 +00:00
if err != nil {
return nil, err
}
2020-04-16 20:38:42 +00:00
worker, err := m.api.StateAccountKey(ctx, mi.Worker, types.EmptyTSK)
if err != nil {
return nil, err
}
2019-08-15 02:30:21 +00:00
buf := new(bytes.Buffer)
if err := m.address.MarshalCBOR(buf); err != nil {
return nil, xerrors.Errorf("failed to marshal address to cbor: %w", err)
}
2020-04-29 22:25:48 +00:00
if !haveNewEntries {
buf.Write(base.TipSet.MinTicket().VRFProof)
}
input, err := store.DrawRandomness(brand.Data, crypto.DomainSeparationTag_TicketProduction, base.TipSet.Height()+base.NullRounds+1-build.TicketRandomnessLookback, buf.Bytes())
2020-02-23 20:00:47 +00:00
if err != nil {
return nil, err
}
2020-04-16 20:38:42 +00:00
vrfOut, err := gen.ComputeVRF(ctx, m.api.WalletSign, worker, input)
2019-08-15 02:30:21 +00:00
if err != nil {
return nil, err
}
return &types.Ticket{
2019-10-09 04:38:59 +00:00
VRFProof: vrfOut,
2019-08-15 02:30:21 +00:00
}, nil
}
func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket,
eproof *types.ElectionProof, bvals []types.BeaconEntry, wpostProof []abi.PoStProof, pending []*types.SignedMessage) (*types.BlockMsg, error) {
2020-04-23 21:12:42 +00:00
msgs, err := SelectMessages(context.TODO(), m.api.StateGetActor, base.TipSet, pending)
if err != nil {
return nil, xerrors.Errorf("message filtering failed: %w", err)
}
2019-07-11 02:36:43 +00:00
if len(msgs) > build.BlockMessageLimit {
log.Error("SelectMessages returned too many messages: ", len(msgs))
msgs = msgs[:build.BlockMessageLimit]
}
uts := base.TipSet.MinTimestamp() + build.BlockDelaySecs*(uint64(base.NullRounds)+1)
2020-04-23 21:12:42 +00:00
nheight := base.TipSet.Height() + base.NullRounds + 1
2019-09-06 17:44:09 +00:00
// why even return this? that api call could just submit it for us
2020-04-09 00:24:10 +00:00
return m.api.MinerCreateBlock(context.TODO(), &api.BlockTemplate{
Miner: addr,
2020-04-23 21:12:42 +00:00
Parents: base.TipSet.Key(),
Ticket: ticket,
Eproof: eproof,
BeaconValues: bvals,
Messages: msgs,
Epoch: nheight,
Timestamp: uts,
WinningPoStProof: wpostProof,
2020-04-09 00:24:10 +00:00
})
}
2020-05-15 09:17:13 +00:00
// actCacheEntry is one memoized actor lookup result: the actor and the error
// that were returned together by the underlying lookup.
type actCacheEntry struct {
act *types.Actor
err error
}
// cachedActorLookup wraps an ActorLookup and memoizes its results for a
// single tipset key.
type cachedActorLookup struct {
// tsk is the only tipset key for which results are cached.
tsk types.TipSetKey
// cache maps an address to the result (actor and error) of its lookup at tsk.
cache map[address.Address]actCacheEntry
// fallback performs the real lookup on a cache miss or for any other tipset key.
fallback ActorLookup
}
// StateGetActor looks up the actor at address a for tipset key tsk.
//
// Lookups for the cached tipset key are served from the cache when present;
// otherwise the fallback is called and its result, including any error, is
// stored. Lookups for any other tipset key go straight to the fallback and
// are not cached.
func (c *cachedActorLookup) StateGetActor(ctx context.Context, a address.Address, tsk types.TipSetKey) (*types.Actor, error) {
	if c.tsk != tsk {
		return c.fallback(ctx, a, tsk)
	}

	if hit, ok := c.cache[a]; ok {
		return hit.act, hit.err
	}

	act, err := c.fallback(ctx, a, tsk)
	c.cache[a] = actCacheEntry{act: act, err: err}
	return act, err
}
// ActorLookup fetches the actor at an address for a given tipset key.
// createBlock passes the node API's StateGetActor as the implementation.
type ActorLookup func(context.Context, address.Address, types.TipSetKey) (*types.Actor, error)
2019-12-03 18:25:56 +00:00
// countFrom returns how many messages in msgs have from as their sender.
func countFrom(msgs []*types.SignedMessage, from address.Address) (out int) {
	for i := range msgs {
		if msgs[i].Message.From != from {
			continue
		}
		out++
	}
	return out
}
func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs []*types.SignedMessage) ([]*types.SignedMessage, error) {
2020-05-15 09:17:13 +00:00
al = (&cachedActorLookup{
tsk: ts.Key(),
cache: map[address.Address]actCacheEntry{},
fallback: al,
}).StateGetActor
out := make([]*types.SignedMessage, 0, build.BlockMessageLimit)
inclNonces := make(map[address.Address]uint64)
inclBalances := make(map[address.Address]types.BigInt)
2019-12-03 20:00:04 +00:00
inclCount := make(map[address.Address]int)
2019-12-03 18:25:56 +00:00
2020-05-15 09:17:13 +00:00
tooLowFundMsgs := 0
tooHighNonceMsgs := 0
start := time.Now()
vmValid := time.Duration(0)
getbal := time.Duration(0)
for _, msg := range msgs {
2020-05-15 09:17:13 +00:00
vmstart := time.Now()
minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that
if err := msg.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
log.Warnf("invalid message in message pool: %s", err)
continue
}
2020-05-15 09:17:13 +00:00
vmValid += time.Since(vmstart)
// TODO: this should be in some more general 'validate message' call
if msg.Message.GasLimit > build.BlockGasLimit {
log.Warnf("message in mempool had too high of a gas limit (%d)", msg.Message.GasLimit)
continue
}
if msg.Message.To == address.Undef {
log.Warnf("message in mempool had bad 'To' address")
continue
}
from := msg.Message.From
2020-05-15 09:17:13 +00:00
getBalStart := time.Now()
if _, ok := inclNonces[from]; !ok {
act, err := al(ctx, from, ts.Key())
if err != nil {
log.Warnf("failed to check message sender balance, skipping message: %+v", err)
continue
}
2019-12-03 20:05:54 +00:00
inclNonces[from] = act.Nonce
inclBalances[from] = act.Balance
}
2020-05-15 09:17:13 +00:00
getbal += time.Since(getBalStart)
if inclBalances[from].LessThan(msg.Message.RequiredFunds()) {
2020-05-15 09:17:13 +00:00
tooLowFundMsgs++
// todo: drop from mpool
continue
}
if msg.Message.Nonce > inclNonces[from] {
2020-05-15 09:17:13 +00:00
tooHighNonceMsgs++
continue
}
if msg.Message.Nonce < inclNonces[from] {
2019-12-03 18:25:56 +00:00
log.Warnf("message in mempool has already used nonce (%d < %d), from %s, to %s, %s (%d pending for)", msg.Message.Nonce, inclNonces[from], msg.Message.From, msg.Message.To, msg.Cid(), countFrom(msgs, from))
continue
}
inclNonces[from] = msg.Message.Nonce + 1
inclBalances[from] = types.BigSub(inclBalances[from], msg.Message.RequiredFunds())
2019-12-03 20:00:04 +00:00
inclCount[from]++
out = append(out, msg)
if len(out) >= build.BlockMessageLimit {
2019-12-04 06:18:02 +00:00
break
}
}
2020-05-15 09:17:13 +00:00
if tooLowFundMsgs > 0 {
log.Warnf("%d messages in mempool does not have enough funds", tooLowFundMsgs)
}
if tooHighNonceMsgs > 0 {
log.Warnf("%d messages in mempool had too high nonce", tooLowFundMsgs)
}
sm := time.Now()
if sm.Sub(start) > time.Second {
log.Warnw("SelectMessages took a long time",
"duration", sm.Sub(start),
"vmvalidate", vmValid,
"getbalance", getbal,
"msgs", len(msgs))
}
return out, nil
}