Implement 'storage miner' module, wire up a few bits it needs to start
This commit is contained in:
parent
2576853426
commit
ee224e5b21
@ -73,6 +73,7 @@ type FullNode interface {
|
||||
ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error)
|
||||
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
|
||||
ChainGetBlockMessages(context.Context, cid.Cid) ([]*types.SignedMessage, error)
|
||||
ChainCall(context.Context, *types.Message) (*types.MessageReceipt, error)
|
||||
|
||||
// messages
|
||||
|
||||
|
@ -39,13 +39,14 @@ type FullNodeStruct struct {
|
||||
CommonStruct
|
||||
|
||||
Internal struct {
|
||||
ChainNotify func(context.Context) (<-chan *store.HeadChange, error) `perm:"read"`
|
||||
ChainSubmitBlock func(ctx context.Context, blk *chain.BlockMsg) error `perm:"write"`
|
||||
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
|
||||
ChainGetRandomness func(context.Context, *types.TipSet) ([]byte, error) `perm:"read"`
|
||||
ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"`
|
||||
ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"`
|
||||
ChainGetBlockMessages func(context.Context, cid.Cid) ([]*types.SignedMessage, error) `perm:"read"`
|
||||
ChainNotify func(context.Context) (<-chan *store.HeadChange, error) `perm:"read"`
|
||||
ChainSubmitBlock func(ctx context.Context, blk *chain.BlockMsg) error `perm:"write"`
|
||||
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
|
||||
ChainGetRandomness func(context.Context, *types.TipSet) ([]byte, error) `perm:"read"`
|
||||
ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"`
|
||||
ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"`
|
||||
ChainGetBlockMessages func(context.Context, cid.Cid) ([]*types.SignedMessage, error) `perm:"read"`
|
||||
ChainCall func(context.Context, *types.Message) (*types.MessageReceipt, error) `perm:"read"`
|
||||
|
||||
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
|
||||
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
|
||||
@ -155,6 +156,10 @@ func (c *FullNodeStruct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWa
|
||||
return c.Internal.ChainWaitMsg(ctx, msgc)
|
||||
}
|
||||
|
||||
func (c *FullNodeStruct) ChainCall(ctx context.Context, msg *types.Message) (*types.MessageReceipt, error) {
|
||||
return c.Internal.ChainCall(ctx, msg)
|
||||
}
|
||||
|
||||
func (c *FullNodeStruct) WalletNew(ctx context.Context, typ string) (address.Address, error) {
|
||||
return c.Internal.WalletNew(ctx, typ)
|
||||
}
|
||||
|
@ -118,7 +118,7 @@ func (sma StorageMinerActor) Exports() []interface{} {
|
||||
//6: sma.ArbitrateDeal,
|
||||
//7: sma.DePledge,
|
||||
//8: sma.GetOwner,
|
||||
//9: sma.GetWorkerAddr,
|
||||
9: sma.GetWorkerAddr,
|
||||
10: sma.GetPower,
|
||||
//11: sma.GetPeerID,
|
||||
//12: sma.GetSectorSize,
|
||||
@ -275,3 +275,12 @@ func CollateralForPower(power types.BigInt) types.BigInt {
|
||||
return collateralRequired
|
||||
*/
|
||||
}
|
||||
|
||||
func (sma StorageMinerActor) GetWorkerAddr(act *types.Actor, vmctx types.VMContext, params *struct{}) ([]byte, ActorError) {
|
||||
var self StorageMinerActorState
|
||||
state := vmctx.Storage().GetHead()
|
||||
if err := vmctx.Storage().Get(state, &self); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return self.Worker.Bytes(), nil
|
||||
}
|
||||
|
@ -62,10 +62,15 @@ const (
|
||||
Actor
|
||||
// BLS represents the address BLS protocol.
|
||||
BLS
|
||||
|
||||
Unknown = Protocol(255)
|
||||
)
|
||||
|
||||
// Protocol returns the protocol used by the address.
|
||||
func (a Address) Protocol() Protocol {
|
||||
if len(a.str) == 0 {
|
||||
return Unknown
|
||||
}
|
||||
return a.str[0]
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@ package state
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
hamt "github.com/ipfs/go-hamt-ipld"
|
||||
@ -79,6 +80,10 @@ func (st *StateTree) lookupID(addr address.Address) (address.Address, error) {
|
||||
}
|
||||
|
||||
func (st *StateTree) GetActor(addr address.Address) (*types.Actor, error) {
|
||||
if addr == address.Undef {
|
||||
return nil, fmt.Errorf("GetActor called on undefined address")
|
||||
}
|
||||
|
||||
if addr.Protocol() != address.ID {
|
||||
iaddr, err := st.lookupID(addr)
|
||||
if err != nil {
|
||||
|
@ -26,6 +26,8 @@ func init() {
|
||||
Complete())
|
||||
}
|
||||
|
||||
var EmptyInt = BigInt{}
|
||||
|
||||
type BigInt struct {
|
||||
*big.Int
|
||||
}
|
||||
|
@ -153,6 +153,14 @@ func (w *Wallet) GenerateKey(typ string) (address.Address, error) {
|
||||
return k.Address, nil
|
||||
}
|
||||
|
||||
func (w *Wallet) HasKey(addr address.Address) (bool, error) {
|
||||
k, err := w.findKey(addr)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return k != nil, nil
|
||||
}
|
||||
|
||||
type Key struct {
|
||||
types.KeyInfo
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"golang.org/x/xerrors"
|
||||
"gopkg.in/urfave/cli.v2"
|
||||
@ -166,9 +167,16 @@ var initCmd = &cli.Command{
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: persist this address in the storage-miner repo
|
||||
log.Infof("New storage miners address is: %s", addr)
|
||||
|
||||
ds, err := lr.Datastore("/metadata")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ds.Put(datastore.NewKey("miner-address"), addr.Bytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: Point to setting storage price, maybe do it interactively or something
|
||||
log.Info("Storage miner successfully created, you can now start it with 'lotus-storage-miner run'")
|
||||
|
||||
|
@ -74,6 +74,7 @@ var runCmd = &cli.Command{
|
||||
return lr.SetAPIEndpoint(apima)
|
||||
}),
|
||||
node.Override(new(*sectorbuilder.SectorBuilderConfig), modules.SectorBuilderConfig(storageRepoPath)),
|
||||
node.Override(new(api.FullNode), func() api.FullNode { return nodeApi }),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -37,6 +37,7 @@ import (
|
||||
"github.com/filecoin-project/go-lotus/node/modules/lp2p"
|
||||
"github.com/filecoin-project/go-lotus/node/modules/testing"
|
||||
"github.com/filecoin-project/go-lotus/node/repo"
|
||||
"github.com/filecoin-project/go-lotus/storage"
|
||||
)
|
||||
|
||||
// special is a type used to give keys to modules which
|
||||
@ -180,6 +181,9 @@ func Online() Option {
|
||||
|
||||
libp2p(),
|
||||
|
||||
// common
|
||||
Override(new(*wallet.Wallet), wallet.NewWallet),
|
||||
|
||||
// Full node
|
||||
|
||||
ApplyIf(func(s *Settings) bool { return s.nodeType == nodeFull },
|
||||
@ -198,7 +202,6 @@ func Online() Option {
|
||||
// Filecoin services
|
||||
Override(new(*chain.Syncer), chain.NewSyncer),
|
||||
Override(new(*chain.BlockSync), chain.NewBlockSyncClient),
|
||||
Override(new(*wallet.Wallet), wallet.NewWallet),
|
||||
Override(new(*chain.MessagePool), chain.NewMessagePool),
|
||||
|
||||
Override(new(modules.Genesis), modules.ErrorGenesis),
|
||||
@ -214,6 +217,7 @@ func Online() Option {
|
||||
// Storage miner
|
||||
ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner },
|
||||
Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New),
|
||||
Override(new(*storage.Miner), modules.StorageMiner),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/filecoin-project/go-lotus/chain/gen"
|
||||
"github.com/filecoin-project/go-lotus/chain/store"
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
"github.com/filecoin-project/go-lotus/chain/vm"
|
||||
"github.com/filecoin-project/go-lotus/chain/wallet"
|
||||
"github.com/filecoin-project/go-lotus/miner"
|
||||
"github.com/filecoin-project/go-lotus/node/client"
|
||||
@ -85,6 +86,32 @@ func (a *FullNodeAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) ([
|
||||
return a.Chain.MessagesForBlock(b)
|
||||
}
|
||||
|
||||
func (a *FullNodeAPI) ChainCall(ctx context.Context, msg *types.Message) (*types.MessageReceipt, error) {
|
||||
hts := a.Chain.GetHeaviestTipSet()
|
||||
state, err := a.Chain.TipSetState(hts.Cids())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
vmi, err := vm.NewVM(state, hts.Height(), hts.Blocks()[0].Miner, a.Chain)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to set up vm: %w", err)
|
||||
}
|
||||
|
||||
if msg.GasLimit == types.EmptyInt {
|
||||
msg.GasLimit = types.NewInt(10000000000)
|
||||
}
|
||||
if msg.GasPrice == types.EmptyInt {
|
||||
msg.GasPrice = types.NewInt(0)
|
||||
}
|
||||
if msg.Value == types.EmptyInt {
|
||||
msg.Value = types.NewInt(0)
|
||||
}
|
||||
|
||||
// TODO: maybe just use the invoker directly?
|
||||
return vmi.ApplyMessage(ctx, msg)
|
||||
}
|
||||
|
||||
func (a *FullNodeAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
|
||||
// TODO: need to make sure we don't return messages that were already included in the referenced chain
|
||||
// also need to accept ts == nil just fine, assume nil == chain.Head()
|
||||
|
@ -8,12 +8,15 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-lotus/api"
|
||||
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
||||
"github.com/filecoin-project/go-lotus/storage"
|
||||
)
|
||||
|
||||
type StorageMinerAPI struct {
|
||||
CommonAPI
|
||||
|
||||
SectorBuilder *sectorbuilder.SectorBuilder
|
||||
|
||||
Miner *storage.Miner
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) {
|
||||
|
@ -34,9 +34,11 @@ import (
|
||||
"github.com/filecoin-project/go-lotus/chain/address"
|
||||
"github.com/filecoin-project/go-lotus/chain/store"
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
"github.com/filecoin-project/go-lotus/chain/wallet"
|
||||
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
||||
"github.com/filecoin-project/go-lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/go-lotus/node/repo"
|
||||
"github.com/filecoin-project/go-lotus/storage"
|
||||
)
|
||||
|
||||
var log = logging.Logger("modules")
|
||||
@ -237,12 +239,55 @@ func SectorBuilderConfig(storagePath string) func() (*sectorbuilder.SectorBuilde
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return §orbuilder.SectorBuilderConfig{
|
||||
sb := §orbuilder.SectorBuilderConfig{
|
||||
Miner: minerAddr,
|
||||
SectorSize: 1024,
|
||||
MetadataDir: metadata,
|
||||
SealedDir: sealed,
|
||||
StagedDir: staging,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return sb, nil
|
||||
}
|
||||
}
|
||||
|
||||
func SectorBuilder(lc fx.Lifecycle, sbc *sectorbuilder.SectorBuilderConfig) (*sectorbuilder.SectorBuilder, error) {
|
||||
sb, err := sectorbuilder.New(sbc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
sb.Run(ctx)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
return sb, nil
|
||||
}
|
||||
|
||||
func StorageMiner(lc fx.Lifecycle, api api.FullNode, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder, w *wallet.Wallet) (*storage.Miner, error) {
|
||||
maddrb, err := ds.Get(datastore.NewKey("miner-address"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
maddr, err := address.NewFromBytes(maddrb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sm, err := storage.NewMiner(api, maddr, h, ds, sb, w)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
return sm.Run(ctx)
|
||||
},
|
||||
})
|
||||
|
||||
return sm, nil
|
||||
}
|
||||
|
@ -2,12 +2,14 @@ package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/api"
|
||||
"github.com/filecoin-project/go-lotus/chain/actors"
|
||||
"github.com/filecoin-project/go-lotus/chain/address"
|
||||
"github.com/filecoin-project/go-lotus/chain/store"
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
"github.com/filecoin-project/go-lotus/chain/wallet"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
logging "github.com/ipfs/go-log"
|
||||
@ -24,6 +26,8 @@ type Miner struct {
|
||||
|
||||
sb *sectorbuilder.SectorBuilder
|
||||
|
||||
w *wallet.Wallet
|
||||
|
||||
maddr address.Address
|
||||
|
||||
worker address.Address
|
||||
@ -38,24 +42,23 @@ type storageMinerApi interface {
|
||||
//ReadState(ctx context.Context, addr address.Address) (????, error)
|
||||
|
||||
// Call a read only method on actors (no interaction with the chain required)
|
||||
CallMethod(ctx context.Context, addr address.Address, method uint64, params []byte) ([]byte, error)
|
||||
ChainCall(ctx context.Context, msg *types.Message) (*types.MessageReceipt, error)
|
||||
|
||||
MpoolPush(context.Context, *types.SignedMessage) error
|
||||
MpoolGetNonce(context.Context, address.Address) (uint64, error)
|
||||
|
||||
ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
|
||||
ChainNotifs(context.Context) (<-chan *store.HeadChange, error)
|
||||
|
||||
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
|
||||
ChainNotify(context.Context) (<-chan *store.HeadChange, error)
|
||||
}
|
||||
|
||||
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder) (*Miner, error) {
|
||||
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder, w *wallet.Wallet) (*Miner, error) {
|
||||
return &Miner{
|
||||
api: api,
|
||||
maddr: addr,
|
||||
h: h,
|
||||
ds: ds,
|
||||
sb: sb,
|
||||
w: w,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -80,30 +83,36 @@ func (m *Miner) handlePostingSealedSectors(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := m.commitSector(ctx, sinfo); err != nil {
|
||||
log.Errorf("failed to commit sector: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
log.Warning("exiting seal posting routine")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSealingStatus) error {
|
||||
params := &actors.CommitSectorParams{
|
||||
SectorId: types.NewInt(sinfo.SectorID),
|
||||
CommD: sinfo.CommD,
|
||||
CommR: sinfo.CommR,
|
||||
CommRStar: sinfo.CommRStar,
|
||||
CommD: sinfo.CommD[:],
|
||||
CommR: sinfo.CommR[:],
|
||||
CommRStar: sinfo.CommRStar[:],
|
||||
Proof: sinfo.Proof,
|
||||
}
|
||||
enc, err := actors.SerializeParams(params)
|
||||
if err != nil {
|
||||
return errors.wrap(err, "could not serialize commit sector parameters")
|
||||
enc, aerr := actors.SerializeParams(params)
|
||||
if aerr != nil {
|
||||
return errors.Wrap(aerr, "could not serialize commit sector parameters")
|
||||
}
|
||||
|
||||
msg := &types.Message{
|
||||
msg := types.Message{
|
||||
To: m.maddr,
|
||||
From: m.worker,
|
||||
Method: actors.MAMethods.CommitSector,
|
||||
Params: params,
|
||||
Params: enc,
|
||||
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
||||
GasLimit: types.NewInt(10000 /* i dont know help */),
|
||||
GasPrice: types.NewInt(1),
|
||||
@ -121,14 +130,14 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
|
||||
return errors.Wrap(err, "serializing commit sector message")
|
||||
}
|
||||
|
||||
sig, err := m.api.WalletSign(ctx, m.worker, data)
|
||||
sig, err := m.w.Sign(m.worker, data)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "signing commit sector message")
|
||||
}
|
||||
|
||||
smsg := &types.SignedMessage{
|
||||
Message: msg,
|
||||
Signature: sig,
|
||||
Signature: *sig,
|
||||
}
|
||||
|
||||
if err := m.api.MpoolPush(ctx, smsg); err != nil {
|
||||
@ -136,6 +145,7 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
|
||||
}
|
||||
|
||||
m.trackCommitSectorMessage(smsg)
|
||||
return nil
|
||||
}
|
||||
|
||||
// make sure the message gets included in the chain successfully
|
||||
@ -156,18 +166,35 @@ func (m *Miner) runPreflightChecks(ctx context.Context) error {
|
||||
m.worker = worker
|
||||
|
||||
// try signing something with that key to make sure we can
|
||||
if _, err := m.api.WalletSign(ctx, worker, []byte("sign me")); err != nil {
|
||||
return errors.Wrap(err, "failed to verify ablity to sign with worker key")
|
||||
has, err := m.w.HasKey(worker)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to check wallet for worker key")
|
||||
}
|
||||
|
||||
if !has {
|
||||
return errors.New("key for worker not found in local wallet")
|
||||
}
|
||||
|
||||
log.Infof("starting up miner %s, worker addr %s", m.maddr, m.worker)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Miner) getWorkerAddr(ctx context.Context) (address.Address, error) {
|
||||
ret, err := m.api.CallMethod(ctx, m.maddr, actors.MAMethods.GetWorkerAddr, actors.EmptyStructCBOR)
|
||||
msg := &types.Message{
|
||||
To: m.maddr,
|
||||
From: m.maddr, // it doesnt like it if we dont give it a from... probably should fix that
|
||||
Method: actors.MAMethods.GetWorkerAddr,
|
||||
Params: actors.EmptyStructCBOR,
|
||||
}
|
||||
|
||||
recpt, err := m.api.ChainCall(ctx, msg)
|
||||
if err != nil {
|
||||
return address.Undef, errors.Wrapf(err, "calling getWorker(%s)", m.maddr)
|
||||
}
|
||||
|
||||
return address.NewFromBytes(ret)
|
||||
if recpt.ExitCode != 0 {
|
||||
return address.Undef, fmt.Errorf("failed to call getWorker(%s): return %d", m.maddr, recpt.ExitCode)
|
||||
}
|
||||
|
||||
return address.NewFromBytes(recpt.Return)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user