lotus/miner/miner.go

448 lines
12 KiB
Go
Raw Normal View History

package miner
import (
"context"
"fmt"
2019-08-20 16:50:17 +00:00
"sync"
"time"
"github.com/filecoin-project/go-address"
2020-02-12 22:12:11 +00:00
"github.com/filecoin-project/specs-actors/actors/abi"
2020-02-23 20:00:47 +00:00
"github.com/filecoin-project/specs-actors/actors/crypto"
2020-02-12 22:12:11 +00:00
lru "github.com/hashicorp/golang-lru"
2019-11-25 04:45:13 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/types"
2019-08-20 16:50:17 +00:00
logging "github.com/ipfs/go-log/v2"
2019-08-20 16:50:17 +00:00
"go.opencensus.io/trace"
"golang.org/x/xerrors"
)
var log = logging.Logger("miner")
2019-12-03 20:00:04 +00:00
type waitFunc func(ctx context.Context, baseTime uint64) error
2019-09-23 15:27:30 +00:00
2019-11-25 04:45:13 +00:00
func NewMiner(api api.FullNode, epp gen.ElectionPoStProver) *Miner {
2020-01-17 07:10:47 +00:00
arc, err := lru.NewARC(10000)
if err != nil {
panic(err)
}
2019-07-11 02:36:43 +00:00
return &Miner{
2019-09-23 15:27:30 +00:00
api: api,
2019-11-25 04:45:13 +00:00
epp: epp,
2019-12-03 20:00:04 +00:00
waitFunc: func(ctx context.Context, baseTime uint64) error {
2019-10-09 04:38:59 +00:00
// Wait around for half the block time in case other parents come in
2019-12-03 20:00:04 +00:00
deadline := baseTime + build.PropagationDelay
time.Sleep(time.Until(time.Unix(int64(deadline), 0)))
2019-10-09 04:38:59 +00:00
return nil
},
minedBlockHeights: arc,
2019-07-11 02:36:43 +00:00
}
}
type Miner struct {
2019-11-25 04:45:13 +00:00
api api.FullNode
2019-11-21 22:21:45 +00:00
epp gen.ElectionPoStProver
2019-08-20 16:50:17 +00:00
lk sync.Mutex
addresses []address.Address
2019-08-20 18:05:17 +00:00
stop chan struct{}
stopping chan struct{}
2019-07-11 02:36:43 +00:00
2019-10-09 04:38:59 +00:00
waitFunc waitFunc
lastWork *MiningBase
minedBlockHeights *lru.ARCCache
}
2019-08-21 15:14:38 +00:00
func (m *Miner) Addresses() ([]address.Address, error) {
m.lk.Lock()
defer m.lk.Unlock()
out := make([]address.Address, len(m.addresses))
copy(out, m.addresses)
return out, nil
}
2019-08-20 16:50:17 +00:00
func (m *Miner) Register(addr address.Address) error {
m.lk.Lock()
defer m.lk.Unlock()
if len(m.addresses) > 0 {
for _, a := range m.addresses {
if a == addr {
log.Warnf("miner.Register called more than once for actor '%s'", addr)
return xerrors.Errorf("miner.Register called more than once for actor '%s'", addr)
}
}
2019-08-20 16:50:17 +00:00
}
m.addresses = append(m.addresses, addr)
if len(m.addresses) == 1 {
m.stop = make(chan struct{})
go m.mine(context.TODO())
}
2019-08-20 16:50:17 +00:00
return nil
}
2019-08-20 18:05:17 +00:00
func (m *Miner) Unregister(ctx context.Context, addr address.Address) error {
m.lk.Lock()
defer m.lk.Unlock()
2019-08-20 18:05:17 +00:00
if len(m.addresses) == 0 {
return xerrors.New("no addresses registered")
}
idx := -1
2019-08-20 18:05:17 +00:00
for i, a := range m.addresses {
if a == addr {
idx = i
break
}
}
if idx == -1 {
2019-08-20 18:05:17 +00:00
return xerrors.New("unregister: address not found")
}
m.addresses[idx] = m.addresses[len(m.addresses)-1]
m.addresses = m.addresses[:len(m.addresses)-1]
2019-08-20 18:05:17 +00:00
// Unregistering last address, stop mining first
if len(m.addresses) == 0 && m.stop != nil {
m.stopping = make(chan struct{})
2019-08-20 18:05:17 +00:00
stopping := m.stopping
close(m.stop)
2019-08-20 18:05:17 +00:00
select {
case <-stopping:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
2019-08-20 16:50:17 +00:00
func (m *Miner) mine(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "/mine")
defer span.End()
2019-08-20 18:05:17 +00:00
2019-10-10 02:03:42 +00:00
var lastBase MiningBase
eventLoop:
for {
2019-08-20 18:05:17 +00:00
select {
case <-m.stop:
stopping := m.stopping
2019-08-20 18:05:17 +00:00
m.stop = nil
m.stopping = nil
close(stopping)
return
2019-08-20 18:05:17 +00:00
default:
}
m.lk.Lock()
addrs := m.addresses
m.lk.Unlock()
2019-12-03 20:00:04 +00:00
prebase, err := m.GetBestMiningCandidate(ctx)
if err != nil {
log.Errorf("failed to get best mining candidate: %s", err)
time.Sleep(time.Second * 5)
continue
}
// Wait until propagation delay period after block we plan to mine on
if err := m.waitFunc(ctx, prebase.ts.MinTimestamp()); err != nil {
2019-10-09 04:38:59 +00:00
log.Error(err)
return
}
2019-08-20 18:05:17 +00:00
2019-10-15 05:00:30 +00:00
base, err := m.GetBestMiningCandidate(ctx)
if err != nil {
log.Errorf("failed to get best mining candidate: %s", err)
continue
}
if base.ts.Equals(lastBase.ts) && lastBase.nullRounds == base.nullRounds {
log.Warnf("BestMiningCandidate from the previous round: %s (nulls:%d)", lastBase.ts.Cids(), lastBase.nullRounds)
2019-10-10 02:03:42 +00:00
time.Sleep(build.BlockDelay * time.Second)
continue
}
2019-10-10 02:03:42 +00:00
lastBase = *base
blks := make([]*types.BlockMsg, 0)
for _, addr := range addrs {
b, err := m.mineOne(ctx, addr, base)
if err != nil {
log.Errorf("mining block failed: %+v", err)
continue
}
if b != nil {
blks = append(blks, b)
}
}
if len(blks) != 0 {
btime := time.Unix(int64(blks[0].Header.Timestamp), 0)
2019-10-09 09:11:41 +00:00
if time.Now().Before(btime) {
time.Sleep(time.Until(btime))
2019-10-10 00:20:47 +00:00
} else {
log.Warnw("mined block in the past", "block-time", btime,
"time", time.Now(), "duration", time.Since(btime))
2019-10-09 09:11:41 +00:00
}
mWon := make(map[address.Address]struct{})
for _, b := range blks {
_, notOk := mWon[b.Header.Miner]
if notOk {
log.Errorw("2 blocks for the same miner. Throwing hands in the air. Report this. It is important.", "bloks", blks)
continue eventLoop
}
mWon[b.Header.Miner] = struct{}{}
}
for _, b := range blks {
// TODO: this code was written to handle creating blocks for multiple miners.
// However, we don't use that, and we probably never will. So even though this code will
// never see different miners, i'm going to handle the caching as if it was going to.
// We can clean it up later when we remove all the multiple miner logic.
blkKey := fmt.Sprintf("%s-%d", b.Header.Miner, b.Header.Height)
if _, ok := m.minedBlockHeights.Get(blkKey); ok {
log.Warnw("Created a block at the same height as another block we've created", "height", b.Header.Height, "miner", b.Header.Miner, "parents", b.Header.Parents)
continue
}
m.minedBlockHeights.Add(blkKey, true)
if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
log.Errorf("failed to submit newly mined block: %s", err)
}
}
} else {
nextRound := time.Unix(int64(base.ts.MinTimestamp()+uint64(build.BlockDelay*base.nullRounds)), 0)
select {
case <-time.After(time.Until(nextRound)):
case <-m.stop:
stopping := m.stopping
m.stop = nil
m.stopping = nil
close(stopping)
return
}
}
}
}
type MiningBase struct {
ts *types.TipSet
2020-02-08 02:18:32 +00:00
nullRounds abi.ChainEpoch
}
2019-10-15 05:00:30 +00:00
func (m *Miner) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error) {
bts, err := m.api.ChainHead(ctx)
if err != nil {
return nil, err
}
if m.lastWork != nil {
if m.lastWork.ts.Equals(bts) {
return m.lastWork, nil
}
btsw, err := m.api.ChainTipSetWeight(ctx, bts.Key())
2019-10-15 05:00:30 +00:00
if err != nil {
return nil, err
}
ltsw, err := m.api.ChainTipSetWeight(ctx, m.lastWork.ts.Key())
2019-10-15 05:00:30 +00:00
if err != nil {
return nil, err
}
if types.BigCmp(btsw, ltsw) <= 0 {
return m.lastWork, nil
}
}
2019-12-03 07:58:38 +00:00
m.lastWork = &MiningBase{ts: bts}
return m.lastWork, nil
}
func (m *Miner) hasPower(ctx context.Context, addr address.Address, ts *types.TipSet) (bool, error) {
power, err := m.api.StateMinerPower(ctx, addr, ts.Key())
2019-11-29 20:18:34 +00:00
if err != nil {
return false, err
}
2019-12-05 05:49:11 +00:00
return !power.MinerPower.Equals(types.NewInt(0)), nil
2019-11-29 20:18:34 +00:00
}
func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningBase) (*types.BlockMsg, error) {
2019-11-07 00:18:06 +00:00
log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.ts.Cids()))
2019-11-27 14:18:51 +00:00
start := time.Now()
hasPower, err := m.hasPower(ctx, addr, base.ts)
2019-11-29 20:18:34 +00:00
if err != nil {
return nil, xerrors.Errorf("checking if miner is slashed: %w", err)
}
2019-12-05 05:49:11 +00:00
if !hasPower {
// slashed or just have no power yet
2019-11-29 20:18:34 +00:00
base.nullRounds++
return nil, nil
}
log.Infof("Time delta between now and our mining base: %ds (nulls: %d)", uint64(time.Now().Unix())-base.ts.MinTimestamp(), base.nullRounds)
2019-11-21 22:21:45 +00:00
ticket, err := m.computeTicket(ctx, addr, base)
if err != nil {
return nil, xerrors.Errorf("scratching ticket failed: %w", err)
}
2019-12-03 21:27:07 +00:00
proofin, err := gen.IsRoundWinner(ctx, base.ts, int64(base.ts.Height()+base.nullRounds+1), addr, m.epp, m.api)
if err != nil {
2019-11-25 04:45:13 +00:00
return nil, xerrors.Errorf("failed to check if we win next round: %w", err)
}
2019-12-03 21:27:07 +00:00
if proofin == nil {
base.nullRounds++
return nil, nil
}
2019-12-03 18:25:56 +00:00
// get pending messages early,
pending, err := m.api.MpoolPending(context.TODO(), base.ts.Key())
2019-12-03 18:25:56 +00:00
if err != nil {
return nil, xerrors.Errorf("failed to get pending messages: %w", err)
}
proof, err := gen.ComputeProof(ctx, m.epp, proofin)
if err != nil {
return nil, xerrors.Errorf("computing election proof: %w", err)
}
b, err := m.createBlock(base, addr, ticket, proof, pending)
if err != nil {
return nil, xerrors.Errorf("failed to create block: %w", err)
}
2019-12-03 20:00:04 +00:00
dur := time.Since(start)
log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height, "took", dur)
2019-12-03 00:08:08 +00:00
if dur > time.Second*build.BlockDelay {
log.Warn("CAUTION: block production took longer than the block delay. Your computer may not be fast enough to keep up")
}
2019-11-27 14:18:51 +00:00
return b, nil
}
2020-02-23 20:00:47 +00:00
func (m *Miner) computeTicket(ctx context.Context, addr address.Address, base *MiningBase) (*types.Ticket, error) {
w, err := m.api.StateMinerWorker(ctx, addr, types.EmptyTSK)
2019-08-15 02:30:21 +00:00
if err != nil {
return nil, err
}
2020-02-23 20:00:47 +00:00
input, err := m.api.ChainGetRandomness(ctx, base.ts.Key(), crypto.DomainSeparationTag_TicketProduction, base.ts.Height(), addr.Bytes())
if err != nil {
return nil, err
}
2020-02-23 20:00:47 +00:00
vrfOut, err := gen.ComputeVRF(ctx, m.api.WalletSign, w, input)
2019-08-15 02:30:21 +00:00
if err != nil {
return nil, err
}
return &types.Ticket{
2019-10-09 04:38:59 +00:00
VRFProof: vrfOut,
2019-08-15 02:30:21 +00:00
}, nil
}
2019-12-03 18:25:56 +00:00
func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof *types.EPostProof, pending []*types.SignedMessage) (*types.BlockMsg, error) {
msgs, err := SelectMessages(context.TODO(), m.api.StateGetActor, base.ts, pending)
if err != nil {
return nil, xerrors.Errorf("message filtering failed: %w", err)
}
2019-07-11 02:36:43 +00:00
if len(msgs) > build.BlockMessageLimit {
log.Error("SelectMessages returned too many messages: ", len(msgs))
msgs = msgs[:build.BlockMessageLimit]
}
uts := base.ts.MinTimestamp() + uint64(build.BlockDelay*(base.nullRounds+1))
nheight := base.ts.Height() + base.nullRounds + 1
2019-09-06 17:44:09 +00:00
// why even return this? that api call could just submit it for us
return m.api.MinerCreateBlock(context.TODO(), addr, base.ts.Key(), ticket, proof, msgs, nheight, uint64(uts))
}
type ActorLookup func(context.Context, address.Address, types.TipSetKey) (*types.Actor, error)
2019-12-03 18:25:56 +00:00
func countFrom(msgs []*types.SignedMessage, from address.Address) (out int) {
for _, msg := range msgs {
if msg.Message.From == from {
out++
}
}
return out
}
func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs []*types.SignedMessage) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, 0, build.BlockMessageLimit)
inclNonces := make(map[address.Address]uint64)
inclBalances := make(map[address.Address]types.BigInt)
2019-12-03 20:00:04 +00:00
inclCount := make(map[address.Address]int)
2019-12-03 18:25:56 +00:00
for _, msg := range msgs {
if msg.Message.To == address.Undef {
log.Warnf("message in mempool had bad 'To' address")
continue
}
from := msg.Message.From
if _, ok := inclNonces[from]; !ok {
act, err := al(ctx, from, ts.Key())
if err != nil {
log.Warnf("failed to check message sender balance, skipping message: %+v", err)
continue
}
2019-12-03 20:05:54 +00:00
inclNonces[from] = act.Nonce
inclBalances[from] = act.Balance
}
if inclBalances[from].LessThan(msg.Message.RequiredFunds()) {
log.Warnf("message in mempool does not have enough funds: %s", msg.Cid())
continue
}
if msg.Message.Nonce > inclNonces[from] {
log.Debugf("message in mempool has too high of a nonce (%d > %d, from %s, inclcount %d) %s (%d pending for orig)", msg.Message.Nonce, inclNonces[from], from, inclCount[from], msg.Cid(), countFrom(msgs, from))
continue
}
if msg.Message.Nonce < inclNonces[from] {
2019-12-03 18:25:56 +00:00
log.Warnf("message in mempool has already used nonce (%d < %d), from %s, to %s, %s (%d pending for)", msg.Message.Nonce, inclNonces[from], msg.Message.From, msg.Message.To, msg.Cid(), countFrom(msgs, from))
continue
}
inclNonces[from] = msg.Message.Nonce + 1
inclBalances[from] = types.BigSub(inclBalances[from], msg.Message.RequiredFunds())
2019-12-03 20:00:04 +00:00
inclCount[from]++
out = append(out, msg)
if len(out) >= build.BlockMessageLimit {
2019-12-04 06:18:02 +00:00
break
}
}
return out, nil
}