lotus/miner/miner.go

373 lines
8.6 KiB
Go
Raw Normal View History

package miner
import (
"context"
2019-08-15 02:30:21 +00:00
"crypto/sha256"
"math/big"
2019-08-20 16:50:17 +00:00
"sync"
"time"
chain "github.com/filecoin-project/go-lotus/chain"
2019-08-15 02:30:21 +00:00
"github.com/filecoin-project/go-lotus/chain/actors"
2019-07-11 02:36:43 +00:00
"github.com/filecoin-project/go-lotus/chain/address"
2019-08-16 04:40:59 +00:00
"github.com/filecoin-project/go-lotus/chain/gen"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/vdf"
2019-08-20 16:50:17 +00:00
"github.com/filecoin-project/go-lotus/node/impl/full"
logging "github.com/ipfs/go-log"
"github.com/pkg/errors"
"go.opencensus.io/trace"
"go.uber.org/fx"
"golang.org/x/xerrors"
)
// log is the package-level logger, created under the name "miner".
var log = logging.Logger("miner")
2019-08-20 16:50:17 +00:00
type api struct {
fx.In
2019-08-20 16:50:17 +00:00
full.ChainAPI
full.MpoolAPI
full.WalletAPI
2019-07-11 02:36:43 +00:00
}
2019-08-20 16:50:17 +00:00
func NewMiner(api api) *Miner {
2019-07-11 02:36:43 +00:00
return &Miner{
2019-08-20 16:50:17 +00:00
api: api,
Delay: time.Second * 4,
2019-07-11 02:36:43 +00:00
}
}
type Miner struct {
api api
2019-08-20 16:50:17 +00:00
lk sync.Mutex
addresses []address.Address
2019-08-20 18:05:17 +00:00
stop chan struct{}
stopping chan struct{}
2019-07-11 02:36:43 +00:00
// time between blocks, network parameter
Delay time.Duration
lastWork *MiningBase
}
2019-08-20 16:50:17 +00:00
func (m *Miner) Register(addr address.Address) error {
m.lk.Lock()
defer m.lk.Unlock()
if len(m.addresses) > 0 {
if len(m.addresses) > 1 || m.addresses[0] != addr {
return errors.New("mining with more than one storage miner instance not supported yet") // TODO !
}
log.Warnf("miner.Register called more than once for actor '%s'", addr)
2019-08-20 16:50:17 +00:00
}
m.addresses = append(m.addresses, addr)
2019-08-20 18:05:17 +00:00
m.stop = make(chan struct{})
2019-08-20 16:50:17 +00:00
go m.mine(context.TODO())
return nil
}
2019-08-20 18:05:17 +00:00
// Unregister removes addr, which must be the single registered address, and
// stops the mining loop before doing so.
//
// It returns an error when nothing is registered, when more than one address
// is registered (not implemented), or when addr is not the registered one.
// While waiting for the mining loop to exit it honours ctx: if ctx is done
// first, ctx.Err() is returned and the address stays registered.
func (m *Miner) Unregister(ctx context.Context, addr address.Address) error {
	m.lk.Lock()

	// Every rejecting branch releases the lock itself before returning.
	switch {
	case len(m.addresses) == 0:
		m.lk.Unlock()
		return xerrors.New("no addresses registered")
	case len(m.addresses) > 1:
		m.lk.Unlock()
		log.Errorf("UNREGISTER NOT IMPLEMENTED FOR MORE THAN ONE ADDRESS!")
		return xerrors.New("can't unregister when more than one actor is registered: not implemented")
	case m.addresses[0] != addr:
		m.lk.Unlock()
		return xerrors.New("unregister: address not found")
	}

	// Unregistering last address, stop mining first
	if m.stop != nil {
		// Only the first caller signals the loop; later callers just wait on
		// the same stopping channel.
		if m.stopping == nil {
			m.stopping = make(chan struct{})
			close(m.stop)
		}
		done := m.stopping

		// The mining loop takes the lock while shutting down, so it must be
		// released for the duration of the wait.
		m.lk.Unlock()

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-done:
		}

		m.lk.Lock()
	}

	m.addresses = []address.Address{}
	m.lk.Unlock()

	return nil
}
2019-08-20 16:50:17 +00:00
func (m *Miner) mine(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "/mine")
defer span.End()
2019-08-20 18:05:17 +00:00
for {
2019-08-20 18:05:17 +00:00
select {
case <-m.stop:
m.lk.Lock()
2019-08-20 18:05:17 +00:00
close(m.stopping)
m.stop = nil
m.stopping = nil
2019-08-20 18:05:17 +00:00
m.lk.Unlock()
return
2019-08-20 18:05:17 +00:00
default:
}
base, err := m.GetBestMiningCandidate()
if err != nil {
log.Errorf("failed to get best mining candidate: %s", err)
continue
}
b, err := m.mineOne(ctx, base)
if err != nil {
log.Errorf("mining block failed: %s", err)
log.Warn("waiting 400ms before attempting to mine a block")
time.Sleep(400 * time.Millisecond)
continue
}
if b != nil {
2019-07-11 02:36:43 +00:00
if err := m.api.ChainSubmitBlock(ctx, b); err != nil {
log.Errorf("failed to submit newly mined block: %s", err)
}
}
}
}
type MiningBase struct {
2019-07-26 04:54:22 +00:00
ts *types.TipSet
2019-08-15 02:30:21 +00:00
tickets []*types.Ticket
}
func (m *Miner) GetBestMiningCandidate() (*MiningBase, error) {
2019-07-11 02:36:43 +00:00
bts, err := m.api.ChainHead(context.TODO())
if err != nil {
return nil, err
}
if m.lastWork != nil {
if m.lastWork.ts.Equals(bts) {
return m.lastWork, nil
}
if bts.Weight() <= m.lastWork.ts.Weight() {
return m.lastWork, nil
}
}
return &MiningBase{
ts: bts,
}, nil
}
func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*chain.BlockMsg, error) {
log.Info("attempting to mine a block on:", base.ts.Cids())
ticket, err := m.scratchTicket(ctx, base)
if err != nil {
return nil, errors.Wrap(err, "scratching ticket failed")
}
2019-08-15 02:30:21 +00:00
win, proof, err := m.isWinnerNextRound(ctx, base)
if err != nil {
return nil, errors.Wrap(err, "failed to check if we win next round")
}
if !win {
m.submitNullTicket(base, ticket)
return nil, nil
}
b, err := m.createBlock(base, ticket, proof)
if err != nil {
return nil, errors.Wrap(err, "failed to create block")
}
log.Infof("mined new block: %s", b.Cid())
return b, nil
}
2019-08-15 02:30:21 +00:00
// submitNullTicket records a losing round: ticket is appended to the base's
// ticket list and the base is remembered as m.lastWork so the next round can
// continue from it.
func (m *Miner) submitNullTicket(base *MiningBase, ticket *types.Ticket) {
	// base is a pointer, so m.lastWork observes the append below as well.
	m.lastWork = base
	base.tickets = append(base.tickets, ticket)
}
2019-08-15 02:30:21 +00:00
func (m *Miner) computeVRF(ctx context.Context, input []byte) ([]byte, error) {
2019-08-20 16:50:17 +00:00
w, err := m.getMinerWorker(ctx, m.addresses[0], nil)
2019-08-15 02:30:21 +00:00
if err != nil {
return nil, err
}
2019-08-16 04:40:59 +00:00
return gen.ComputeVRF(ctx, m.api.WalletSign, w, input)
2019-08-15 02:30:21 +00:00
}
// getMinerWorker looks up the worker address of the miner actor addr by
// calling the actor's GetWorkerAddr method against tipset ts.
//
// It returns address.Undef and an error when the call fails, when the call
// exits with a non-zero code, or when the returned bytes do not parse as an
// address.
func (m *Miner) getMinerWorker(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
	msg := &types.Message{
		From:   addr,
		To:     addr,
		Method: actors.MAMethods.GetWorkerAddr,
	}

	res, err := m.api.ChainCall(ctx, msg, ts)
	if err != nil {
		return address.Undef, xerrors.Errorf("failed to get miner worker addr: %w", err)
	}

	if res.ExitCode != 0 {
		return address.Undef, xerrors.Errorf("failed to get miner worker addr (exit code %d)", res.ExitCode)
	}

	worker, err := address.NewFromBytes(res.Return)
	if err != nil {
		return address.Undef, xerrors.Errorf("GetWorkerAddr returned malformed address: %w", err)
	}

	return worker, nil
}
func (m *Miner) isWinnerNextRound(ctx context.Context, base *MiningBase) (bool, types.ElectionProof, error) {
2019-07-11 02:36:43 +00:00
r, err := m.api.ChainGetRandomness(context.TODO(), base.ts)
if err != nil {
return false, nil, err
}
2019-08-15 02:30:21 +00:00
vrfout, err := m.computeVRF(ctx, r)
if err != nil {
return false, nil, xerrors.Errorf("failed to compute VRF: %w", err)
}
2019-08-20 16:50:17 +00:00
mpow, totpow, err := m.getPowerForTipset(ctx, m.addresses[0], base.ts)
2019-08-15 02:30:21 +00:00
if err != nil {
return false, nil, xerrors.Errorf("failed to check power: %w", err)
}
return powerCmp(vrfout, mpow, totpow), vrfout, nil
}
// powerCmp reports whether the VRF output wins for the given power share.
//
// The condition is
//
//	h(vrfout) / 2^256 < minerPower / totalPower
//
// evaluated in integers as h(vrfout) < (2^256 * minerPower) / totalPower,
// where h is SHA-256.
//
// NOTE(review): totpow is used as a divisor without a zero check — confirm
// how types.BigDiv behaves for a zero total power.
func powerCmp(vrfout []byte, mpow, totpow types.BigInt) bool {
	digest := sha256.Sum256(vrfout)

	// 2^256, written as 1 << 256.
	twoTo256 := types.BigInt{new(big.Int).Lsh(big.NewInt(1), 256)}

	threshold := types.BigDiv(types.BigMul(twoTo256, mpow), totpow)
	hashed := types.BigFromBytes(digest[:])

	return types.BigCmp(hashed, threshold) < 0
}
func (m *Miner) getPowerForTipset(ctx context.Context, maddr address.Address, ts *types.TipSet) (types.BigInt, types.BigInt, error) {
var err error
enc, err := actors.SerializeParams(&actors.PowerLookupParams{maddr})
if err != nil {
return types.EmptyInt, types.EmptyInt, err
}
ret, err := m.api.ChainCall(ctx, &types.Message{
From: maddr,
To: actors.StorageMarketAddress,
Method: actors.SMAMethods.PowerLookup,
Params: enc,
}, ts)
if err != nil {
return types.EmptyInt, types.EmptyInt, xerrors.Errorf("failed to get miner power from chain: %w", err)
}
if ret.ExitCode != 0 {
return types.EmptyInt, types.EmptyInt, xerrors.Errorf("failed to get miner power from chain (exit code %d)", ret.ExitCode)
}
mpow := types.BigFromBytes(ret.Return)
2019-08-15 02:30:21 +00:00
ret, err = m.api.ChainCall(ctx, &types.Message{
From: maddr,
To: actors.StorageMarketAddress,
Method: actors.SMAMethods.GetTotalStorage,
}, ts)
if err != nil {
return types.EmptyInt, types.EmptyInt, xerrors.Errorf("failed to get total power from chain: %w", err)
}
if ret.ExitCode != 0 {
return types.EmptyInt, types.EmptyInt, xerrors.Errorf("failed to get total power from chain (exit code %d)", ret.ExitCode)
}
tpow := types.BigFromBytes(ret.Return)
return mpow, tpow, nil
}
2019-08-15 02:30:21 +00:00
func (m *Miner) runVDF(ctx context.Context, input []byte) ([]byte, []byte, error) {
select {
case <-ctx.Done():
2019-08-15 02:30:21 +00:00
return nil, nil, ctx.Err()
case <-time.After(m.Delay):
}
return vdf.Run(input)
2019-08-15 02:30:21 +00:00
}
func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Ticket, error) {
var lastTicket *types.Ticket
if len(base.tickets) > 0 {
lastTicket = base.tickets[len(base.tickets)-1]
} else {
lastTicket = base.ts.MinTicket()
2019-08-15 02:30:21 +00:00
}
vrfOut, err := m.computeVRF(ctx, lastTicket.VDFResult)
if err != nil {
return nil, err
}
res, proof, err := m.runVDF(ctx, vrfOut)
if err != nil {
return nil, err
}
return &types.Ticket{
VRFProof: vrfOut,
VDFResult: res,
VDFProof: proof,
}, nil
}
2019-08-15 02:30:21 +00:00
func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types.ElectionProof) (*chain.BlockMsg, error) {
2019-07-11 02:36:43 +00:00
pending, err := m.api.MpoolPending(context.TODO(), base.ts)
if err != nil {
return nil, errors.Wrapf(err, "failed to get pending messages")
}
2019-07-11 02:36:43 +00:00
msgs := m.selectMessages(pending)
// why even return this? that api call could just submit it for us
2019-08-20 16:50:17 +00:00
return m.api.MinerCreateBlock(context.TODO(), m.addresses[0], base.ts, append(base.tickets, ticket), proof, msgs)
}
// selectMessages picks the messages to include in a block. For now it is
// the identity function: the input slice is returned unchanged.
func (m *Miner) selectMessages(msgs []*types.SignedMessage) []*types.SignedMessage {
	// TODO: filter and select 'best' message if too many to fit in one block
	return msgs
}