miner: Create miner in DI

This commit is contained in:
Łukasz Magiera 2019-08-20 18:50:17 +02:00
parent e087cc2e7a
commit 4431dffc39
7 changed files with 46 additions and 137 deletions

View File

@ -65,7 +65,7 @@ type FullNode interface {
// miner // miner
MinerStart(context.Context, address.Address) error MinerRegister(context.Context, address.Address) error
MinerCreateBlock(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage) (*chain.BlockMsg, error) MinerCreateBlock(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage) (*chain.BlockMsg, error)
// // UX ? // // UX ?

View File

@ -54,7 +54,7 @@ type FullNodeStruct struct {
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"` MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"` MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
MinerStart func(context.Context, address.Address) error `perm:"admin"` MinerRegister func(context.Context, address.Address) error `perm:"admin"`
MinerCreateBlock func(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage) (*chain.BlockMsg, error) `perm:"write"` MinerCreateBlock func(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage) (*chain.BlockMsg, error) `perm:"write"`
WalletNew func(context.Context, string) (address.Address, error) `perm:"write"` WalletNew func(context.Context, string) (address.Address, error) `perm:"write"`
@ -159,8 +159,8 @@ func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessag
return c.Internal.MpoolPush(ctx, smsg) return c.Internal.MpoolPush(ctx, smsg)
} }
func (c *FullNodeStruct) MinerStart(ctx context.Context, addr address.Address) error { func (c *FullNodeStruct) MinerRegister(ctx context.Context, addr address.Address) error {
return c.Internal.MinerStart(ctx, addr) return c.Internal.MinerRegister(ctx, addr)
} }
func (c *FullNodeStruct) MinerCreateBlock(ctx context.Context, addr address.Address, base *types.TipSet, tickets []*types.Ticket, eproof types.ElectionProof, msgs []*types.SignedMessage) (*chain.BlockMsg, error) { func (c *FullNodeStruct) MinerCreateBlock(ctx context.Context, addr address.Address, base *types.TipSet, tickets []*types.Ticket, eproof types.ElectionProof, msgs []*types.SignedMessage) (*chain.BlockMsg, error) {

View File

@ -194,7 +194,7 @@ func (tu *syncTestUtil) submitSourceBlock(to int, h int) {
// -1 to match block.Height // -1 to match block.Height
b.Header = tu.blocks[h-1].Header b.Header = tu.blocks[h-1].Header
for _, msg := range tu.blocks[h-1].SecpkMessages { for _, msg := range tu.blocks[h-1].SecpkMessages {
c, err := tu.nds[to].(*impl.FullNodeAPI).Chain.PutMessage(msg) c, err := tu.nds[to].(*impl.API).Chain.PutMessage(msg)
require.NoError(tu.t, err) require.NoError(tu.t, err)
b.SecpkMessages = append(b.SecpkMessages, c) b.SecpkMessages = append(b.SecpkMessages, c)

View File

@ -36,7 +36,7 @@ var minerStart = &cli.Command{
return err return err
} }
if err := api.MinerStart(ctx, maddr); err != nil { if err := api.MinerRegister(ctx, maddr); err != nil {
return err return err
} }

View File

@ -4,62 +4,46 @@ import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"math/big" "math/big"
"sync"
"time" "time"
logging "github.com/ipfs/go-log"
"github.com/pkg/errors"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
chain "github.com/filecoin-project/go-lotus/chain" chain "github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/gen" "github.com/filecoin-project/go-lotus/chain/gen"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/vdf" "github.com/filecoin-project/go-lotus/lib/vdf"
"github.com/filecoin-project/go-lotus/node/impl/full"
logging "github.com/ipfs/go-log"
"github.com/pkg/errors"
"go.opencensus.io/trace"
"go.uber.org/fx"
"golang.org/x/xerrors"
) )
var log = logging.Logger("miner") var log = logging.Logger("miner")
type api interface { type api struct {
ChainCall(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) fx.In
ChainSubmitBlock(context.Context, *chain.BlockMsg) error full.ChainAPI
full.MpoolAPI
// returns a set of messages that havent been included in the chain as of full.WalletAPI
// the given tipset
MpoolPending(ctx context.Context, base *types.TipSet) ([]*types.SignedMessage, error)
// Returns the best tipset for the miner to mine on top of.
// TODO: Not sure this feels right (including the messages api). Miners
// will likely want to have more control over exactly which blocks get
// mined on, and which messages are included.
ChainHead(context.Context) (*types.TipSet, error)
// returns the lookback randomness from the chain used for the election
ChainGetRandomness(context.Context, *types.TipSet) ([]byte, error)
// create a block
// it seems realllllly annoying to do all the actions necessary to build a
// block through the API. so, we just add the block creation to the API
// now, all the 'miner' does is check if they win, and call create block
MinerCreateBlock(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage) (*chain.BlockMsg, error)
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
} }
func NewMiner(api api, addr address.Address) *Miner { func NewMiner(api api) *Miner {
return &Miner{ return &Miner{
api: api, api: api,
address: addr, Delay: time.Second * 4,
Delay: time.Second * 4,
} }
} }
type Miner struct { type Miner struct {
api api api api
address address.Address lk sync.Mutex
addresses []address.Address
// time between blocks, network parameter // time between blocks, network parameter
Delay time.Duration Delay time.Duration
@ -67,7 +51,21 @@ type Miner struct {
lastWork *MiningBase lastWork *MiningBase
} }
func (m *Miner) Mine(ctx context.Context) { func (m *Miner) Register(addr address.Address) error {
m.lk.Lock()
defer m.lk.Unlock()
if len(m.addresses) > 0 {
return errors.New("mining with more than one storage miner instance not supported yet") // TODO !
}
m.addresses = append(m.addresses, addr)
go m.mine(context.TODO())
return nil
}
func (m *Miner) mine(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "/mine") ctx, span := trace.StartSpan(ctx, "/mine")
defer span.End() defer span.End()
for { for {
@ -151,7 +149,7 @@ func (m *Miner) submitNullTicket(base *MiningBase, ticket *types.Ticket) {
} }
func (m *Miner) computeVRF(ctx context.Context, input []byte) ([]byte, error) { func (m *Miner) computeVRF(ctx context.Context, input []byte) ([]byte, error) {
w, err := m.getMinerWorker(ctx, m.address, nil) w, err := m.getMinerWorker(ctx, m.addresses[0], nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -192,7 +190,7 @@ func (m *Miner) isWinnerNextRound(ctx context.Context, base *MiningBase) (bool,
return false, nil, xerrors.Errorf("failed to compute VRF: %w", err) return false, nil, xerrors.Errorf("failed to compute VRF: %w", err)
} }
mpow, totpow, err := m.getPowerForTipset(ctx, m.address, base.ts) mpow, totpow, err := m.getPowerForTipset(ctx, m.addresses[0], base.ts)
if err != nil { if err != nil {
return false, nil, xerrors.Errorf("failed to check power: %w", err) return false, nil, xerrors.Errorf("failed to check power: %w", err)
} }
@ -302,7 +300,7 @@ func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types.
msgs := m.selectMessages(pending) msgs := m.selectMessages(pending)
// why even return this? that api call could just submit it for us // why even return this? that api call could just submit it for us
return m.api.MinerCreateBlock(context.TODO(), m.address, base.ts, append(base.tickets, ticket), proof, msgs) return m.api.MinerCreateBlock(context.TODO(), m.addresses[0], base.ts, append(base.tickets, ticket), proof, msgs)
} }
func (m *Miner) selectMessages(msgs []*types.SignedMessage) []*types.SignedMessage { func (m *Miner) selectMessages(msgs []*types.SignedMessage) []*types.SignedMessage {

View File

@ -24,6 +24,7 @@ import (
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/wallet" "github.com/filecoin-project/go-lotus/chain/wallet"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/lib/sectorbuilder"
"github.com/filecoin-project/go-lotus/miner"
"github.com/filecoin-project/go-lotus/node/config" "github.com/filecoin-project/go-lotus/node/config"
"github.com/filecoin-project/go-lotus/node/hello" "github.com/filecoin-project/go-lotus/node/hello"
"github.com/filecoin-project/go-lotus/node/impl" "github.com/filecoin-project/go-lotus/node/impl"
@ -222,6 +223,8 @@ func Online() Option {
Override(new(*paych.Store), modules.PaychStore), Override(new(*paych.Store), modules.PaychStore),
Override(new(*paych.Manager), modules.PaymentChannelManager), Override(new(*paych.Manager), modules.PaymentChannelManager),
Override(new(*miner.Miner), miner.NewMiner),
), ),
// Storage miner // Storage miner
@ -309,7 +312,7 @@ func Repo(r repo.Repo) Option {
func FullAPI(out *api.FullNode) Option { func FullAPI(out *api.FullNode) Option {
return func(s *Settings) error { return func(s *Settings) error {
resAPI := &impl.FullNodeAPI{} resAPI := &impl.API{}
s.invokes[ExtractApiKey] = fx.Extract(resAPI) s.invokes[ExtractApiKey] = fx.Extract(resAPI)
*out = resAPI *out = resAPI
return nil return nil

View File

@ -1,93 +1 @@
package client package client
import (
"context"
"errors"
"os"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/ipfs/go-filestore"
"go.uber.org/fx"
"github.com/ipfs/go-cid"
chunker "github.com/ipfs/go-ipfs-chunker"
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
)
type LocalStorage struct {
fx.In
LocalDAG dtypes.ClientDAG
Filestore dtypes.ClientFilestore `optional:"true"`
}
func (s *LocalStorage) ClientImport(ctx context.Context, path string) (cid.Cid, error) {
f, err := os.Open(path)
if err != nil {
return cid.Undef, err
}
stat, err := f.Stat()
if err != nil {
return cid.Undef, err
}
file, err := files.NewReaderPathFile(path, f, stat)
if err != nil {
return cid.Undef, err
}
bufferedDS := ipld.NewBufferedDAG(ctx, s.LocalDAG)
params := ihelper.DagBuilderParams{
Maxlinks: ihelper.DefaultLinksPerBlock,
RawLeaves: true,
CidBuilder: nil,
Dagserv: bufferedDS,
NoCopy: true,
}
db, err := params.New(chunker.DefaultSplitter(file))
if err != nil {
return cid.Undef, err
}
nd, err := balanced.Layout(db)
if err != nil {
return cid.Undef, err
}
return nd.Cid(), bufferedDS.Commit()
}
func (s *LocalStorage) ClientListImports(ctx context.Context) ([]api.Import, error) {
if s.Filestore == nil {
return nil, errors.New("listing imports is not supported with in-memory dag yet")
}
next, err := filestore.ListAll(s.Filestore, false)
if err != nil {
return nil, err
}
// TODO: make this less very bad by tracking root cids instead of using ListAll
out := make([]api.Import, 0)
for {
r := next()
if r == nil {
return out, nil
}
if r.Offset != 0 {
continue
}
out = append(out, api.Import{
Status: r.Status,
Key: r.Key,
FilePath: r.FilePath,
Size: r.Size,
})
}
}