WIP: create miner command, stubbed out methods

This commit is contained in:
whyrusleeping 2019-07-16 20:05:55 -07:00
parent 2877cb5ba2
commit 9a7823ab84
9 changed files with 328 additions and 10 deletions

View File

@ -33,6 +33,7 @@ type API interface {
ChainHead(context.Context) (*chain.TipSet, error) // TODO: check serialization
ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error // TODO: check serialization
ChainGetRandomness(context.Context, *chain.TipSet) ([]byte, error)
ChainWaitMsg(context.Context, cid.Cid) (cid.Cid, *types.MessageReceipt, error)
// messages
@ -42,6 +43,7 @@ type API interface {
// // mpool
// // // ls / show / rm
MpoolPending(context.Context, *chain.TipSet) ([]*chain.SignedMessage, error)
MpoolPush(context.Context, *chain.SignedMessage) error
// dag
@ -74,6 +76,12 @@ type API interface {
WalletNew(context.Context, string) (address.Address, error)
WalletList(context.Context) ([]address.Address, error)
WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletSign(context.Context, address.Address, []byte) (*chain.Signature, error)
WalletDefaultAddress(context.Context) (address.Address, error)
// Really not sure where this belongs. It could go on the wallet, or the message pool, or the chain...
WalletGetNonce(context.Context, address.Address) (uint64, error)
// // import
// // export
// // (on cli - cmd to list associations)

View File

@ -20,15 +20,20 @@ type Struct struct {
ChainSubmitBlock func(ctx context.Context, blk *chain.BlockMsg) error
ChainHead func(context.Context) (*chain.TipSet, error)
ChainGetRandomness func(context.Context, *chain.TipSet) ([]byte, error)
ChainWaitMsg func(context.Context, cid.Cid) (cid.Cid, *types.MessageReceipt, error)
MpoolPending func(context.Context, *chain.TipSet) ([]*chain.SignedMessage, error)
MpoolPush func(context.Context, *chain.SignedMessage) error
MinerStart func(context.Context, address.Address) error
MinerCreateBlock func(context.Context, address.Address, *chain.TipSet, []chain.Ticket, chain.ElectionProof, []*chain.SignedMessage) (*chain.BlockMsg, error)
WalletNew func(context.Context, string) (address.Address, error)
WalletList func(context.Context) ([]address.Address, error)
WalletBalance func(context.Context, address.Address) (types.BigInt, error)
WalletNew func(context.Context, string) (address.Address, error)
WalletList func(context.Context) ([]address.Address, error)
WalletBalance func(context.Context, address.Address) (types.BigInt, error)
WalletSign func(context.Context, address.Address, []byte) (*chain.Signature, error)
WalletDefaultAddress func(context.Context) (address.Address, error)
WalletGetNonce func(context.Context, address.Address) (uint64, error)
ClientImport func(ctx context.Context, path string) (cid.Cid, error)
ClientListImports func(ctx context.Context) ([]Import, error)
@ -51,6 +56,10 @@ func (c *Struct) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.S
return c.Internal.MpoolPending(ctx, ts)
}
func (c *Struct) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error {
return c.MpoolPush(ctx, smsg)
}
func (c *Struct) MinerStart(ctx context.Context, addr address.Address) error {
return c.Internal.MinerStart(ctx, addr)
}
@ -83,6 +92,10 @@ func (c *Struct) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]b
return c.Internal.ChainGetRandomness(ctx, pts)
}
func (c *Struct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (cid.Cid, *types.MessageReceipt, error) {
return c.Internal.ChainWaitMsg(ctx, msgc)
}
// ID implements API.ID
func (c *Struct) ID(ctx context.Context) (peer.ID, error) {
return c.Internal.ID(ctx)
@ -105,4 +118,16 @@ func (c *Struct) WalletBalance(ctx context.Context, a address.Address) (types.Bi
return c.Internal.WalletBalance(ctx, a)
}
func (c *Struct) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) {
return c.Internal.WalletSign(ctx, k, msg)
}
func (c *Struct) WalletDefaultAddress(ctx context.Context) (address.Address, error) {
return c.Internal.WalletDefaultAddress(ctx)
}
func (c *Struct) WalletGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
return c.Internal.WalletGetNonce(ctx, addr)
}
var _ API = &Struct{}

View File

@ -2,6 +2,7 @@ package chain
import (
"context"
"encoding/json"
"fmt"
"sync"
@ -220,7 +221,7 @@ type ChainStore struct {
bestTips *pubsub.PubSub
headChange func(rev, app []*TipSet) error
headChangeNotifs []func(rev, app []*TipSet) error
}
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
@ -231,8 +232,42 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
}
}
func (cs *ChainStore) SubNewTips() chan interface{} {
return cs.bestTips.Sub("best")
func (cs *ChainStore) SubNewTips() chan *TipSet {
subch := cs.bestTips.Sub("best")
out := make(chan *TipSet)
go func() {
defer close(out)
for val := range subch {
out <- val.(*TipSet)
}
}()
return out
}
const (
HCRevert = "revert"
HCApply = "apply"
)
type HeadChange struct {
Type string
Val *TipSet
}
func (cs *ChainStore) SubHeadChanges() chan *HeadChange {
subch := cs.bestTips.Sub("headchange")
out := make(chan *HeadChange, 16)
go func() {
defer close(out)
for val := range subch {
out <- val.(*HeadChange)
}
}()
return out
}
func (cs *ChainStore) SubscribeHeadChanges(f func(rev, app []*TipSet) error) {
cs.headChangeNotifs = append(cs.headChangeNotifs, f)
}
func (cs *ChainStore) SetGenesis(b *BlockHeader) error {
@ -275,7 +310,9 @@ func (cs *ChainStore) maybeTakeHeavierTipSet(ts *TipSet) error {
if err != nil {
return err
}
cs.headChange(revert, apply)
for _, hcf := range cs.headChangeNotifs {
hcf(revert, apply)
}
log.Infof("New heaviest tipset! %s", ts.Cids())
cs.heaviest = ts
}
@ -470,7 +507,7 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*SignedMessage, error) {
return DecodeSignedMessage(sb.RawData())
}
func (cs *ChainStore) MessagesForBlock(b *BlockHeader) ([]*SignedMessage, error) {
func (cs *ChainStore) MessageCidsForBlock(b *BlockHeader) ([]cid.Cid, error) {
cst := hamt.CSTFromBstore(cs.bs)
shar, err := sharray.Load(context.TODO(), b.Messages, 4, cst)
if err != nil {
@ -491,9 +528,43 @@ func (cs *ChainStore) MessagesForBlock(b *BlockHeader) ([]*SignedMessage, error)
return nil, err
}
return cids, nil
}
func (cs *ChainStore) MessagesForBlock(b *BlockHeader) ([]*SignedMessage, error) {
cids, err := cs.MessageCidsForBlock(b)
if err != nil {
return nil, err
}
return cs.LoadMessagesFromCids(cids)
}
func (cs *ChainStore) GetReceipt(b *BlockHeader, i int) (*types.MessageReceipt, error) {
cst := hamt.CSTFromBstore(cs.bs)
shar, err := sharray.Load(context.TODO(), b.MessageReceipts, 4, cst)
if err != nil {
return nil, errors.Wrap(err, "sharray load")
}
ival, err := shar.Get(context.TODO(), i)
if err != nil {
return nil, err
}
// @warpfork, @EricMyhre help me. save me.
out, err := json.Marshal(ival)
if err != nil {
return nil, err
}
var r types.MessageReceipt
if err := json.Unmarshal(out, &r); err != nil {
return nil, err
}
return &r, nil
}
func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*SignedMessage, error) {
msgs := make([]*SignedMessage, 0, len(cids))
for _, c := range cids {
@ -528,3 +599,66 @@ func (cs *ChainStore) GetBalance(addr address.Address) (types.BigInt, error) {
return act.Balance, nil
}
func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (cid.Cid, *types.MessageReceipt, error) {
tsub := cs.SubHeadChanges()
head := cs.GetHeaviestTipSet()
bc, r, err := cs.tipsetContainsMsg(head, mcid)
if err != nil {
return cid.Undef, nil, err
}
if r != nil {
return bc, r, nil
}
for {
select {
case val := <-tsub:
switch val.Type {
case HCRevert:
continue
case HCApply:
bc, r, err := cs.tipsetContainsMsg(val.Val, mcid)
if err != nil {
return cid.Undef, nil, err
}
if r != nil {
return bc, r, nil
}
}
case <-ctx.Done():
return cid.Undef, nil, ctx.Err()
}
}
}
func (cs *ChainStore) tipsetContainsMsg(ts *TipSet, msg cid.Cid) (cid.Cid, *types.MessageReceipt, error) {
for _, b := range ts.Blocks() {
r, err := cs.blockContainsMsg(b, msg)
if err != nil {
return cid.Undef, nil, err
}
if r != nil {
return b.Cid(), r, nil
}
}
return cid.Undef, nil, nil
}
func (cs *ChainStore) blockContainsMsg(blk *BlockHeader, msg cid.Cid) (*types.MessageReceipt, error) {
msgs, err := cs.MessageCidsForBlock(blk)
if err != nil {
return nil, err
}
for i, mc := range msgs {
if mc == msg {
return cs.GetReceipt(blk, i)
}
}
return nil, nil
}

View File

@ -37,7 +37,7 @@ func NewMessagePool(cs *ChainStore) *MessagePool {
pending: make(map[address.Address]*msgSet),
cs: cs,
}
cs.headChange = mp.HeadChange
cs.SubscribeHeadChanges(mp.HeadChange)
return mp
}

View File

@ -66,4 +66,5 @@ var Commands = []*cli.Command{
mpoolCmd,
minerCmd,
walletCmd,
createMinerCmd,
}

119
cli/createminer.go Normal file
View File

@ -0,0 +1,119 @@
package cli
import (
"fmt"
"strconv"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/go-lotus/chain"
actors "github.com/filecoin-project/go-lotus/chain/actors"
address "github.com/filecoin-project/go-lotus/chain/address"
types "github.com/filecoin-project/go-lotus/chain/types"
"github.com/libp2p/go-libp2p-core/peer"
)
var createMinerCmd = &cli.Command{
Name: "createminer",
Usage: "Create a new storage market actor",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 4 {
return fmt.Errorf("must pass four arguments: worker address, owner address, sector size, peer ID")
}
api, err := getAPI(cctx)
if err != nil {
return err
}
args := cctx.Args().Slice()
worker, err := address.NewFromString(args[0])
if err != nil {
return err
}
owner, err := address.NewFromString(args[1])
if err != nil {
return err
}
ssize, err := strconv.ParseUint(args[2], 10, 64)
if err != nil {
return err
}
pid, err := peer.IDB58Decode(args[3])
if err != nil {
return err
}
createMinerArgs := actors.CreateStorageMinerParams{
Worker: worker,
Owner: owner,
SectorSize: types.NewInt(ssize),
PeerID: pid,
}
ctx := reqContext(cctx)
addr, err := api.WalletDefaultAddress(ctx)
if err != nil {
return err
}
params, err := actors.SerializeParams(createMinerArgs)
if err != nil {
return err
}
nonce, err := api.WalletGetNonce(ctx, addr)
if err != nil {
return err
}
msg := types.Message{
To: actors.StorageMarketAddress,
From: addr,
Method: 1, // TODO: constants pls
Params: params,
Nonce: nonce,
GasPrice: types.NewInt(1),
GasLimit: types.NewInt(1),
}
msgbytes, err := msg.Serialize()
if err != nil {
return err
}
sig, err := api.WalletSign(ctx, addr, msgbytes)
if err != nil {
return err
}
smsg := &chain.SignedMessage{
Message: msg,
Signature: *sig,
}
if err := api.MpoolPush(ctx, smsg); err != nil {
return err
}
inblk, rect, err := api.ChainWaitMsg(ctx, smsg.Cid())
if err != nil {
return err
}
maddr, err := address.NewFromBytes(rect.Return)
if err != nil {
return err
}
fmt.Printf("miner created in block %s\n", inblk)
fmt.Println("new miner address: %s\n", maddr)
return nil
},
}

3
go.mod
View File

@ -58,7 +58,7 @@ require (
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d
github.com/whyrusleeping/sharray v0.0.0-20190520213710-bd32aab369f8
github.com/whyrusleeping/sharray v0.0.0-20190718051354-e41931821e33
go.opencensus.io v0.22.0 // indirect
go.uber.org/dig v1.7.0 // indirect
go.uber.org/fx v1.9.0
@ -66,6 +66,7 @@ require (
go4.org v0.0.0-20190313082347-94abd6928b1d // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522
google.golang.org/appengine v1.4.0 // indirect
gopkg.in/urfave/cli.v2 v2.0.0-20180128182452-d3ae77c26ac8
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect
)

2
go.sum
View File

@ -420,6 +420,8 @@ github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d h1:wnjWu1N8UT
github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d/go.mod h1:g7ckxrjiFh8mi1AY7ox23PZD0g6QU/TxW3U3unX7I3A=
github.com/whyrusleeping/sharray v0.0.0-20190520213710-bd32aab369f8 h1:n89ErB+0d4SBbyD8ykr7Q/j+C41ysUttZG3l9/2ufC4=
github.com/whyrusleeping/sharray v0.0.0-20190520213710-bd32aab369f8/go.mod h1:c1pwhNePDPlcYJZinQlfLTOKwTmVf45nfdTg73yOOcA=
github.com/whyrusleeping/sharray v0.0.0-20190718051354-e41931821e33 h1:7Bsg3GZnFAhdadeyRie9ReenkK2XbC2FlOpJQgTzpbA=
github.com/whyrusleeping/sharray v0.0.0-20190718051354-e41931821e33/go.mod h1:c1pwhNePDPlcYJZinQlfLTOKwTmVf45nfdTg73yOOcA=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/go-lotus/miner"
"github.com/filecoin-project/go-lotus/node/client"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -50,6 +51,9 @@ func (a *API) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]byte
return []byte("foo bar random"), nil
}
func (a *API) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.WaitMsg, error) {
}
func (a *API) ID(context.Context) (peer.ID, error) {
return a.Host.ID(), nil
}
@ -66,6 +70,19 @@ func (a *API) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.Sign
return a.Mpool.Pending(), nil
}
func (a *API) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error {
msgb, err := smsg.Serialize()
if err != nil {
return err
}
return a.PubSub.Publish("/fil/messages", msgb)
}
func (a *API) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
return a.Mpool.GetNonce(addr)
}
func (a *API) MinerStart(ctx context.Context, addr address.Address) error {
// hrm...
m := miner.NewMiner(a, addr)
@ -118,6 +135,17 @@ func (a *API) WalletBalance(ctx context.Context, addr address.Address) (types.Bi
return a.Chain.GetBalance(addr)
}
func (a *API) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) {
return a.Wallet.Sign(k, msg)
}
func (a *API) WalletDefaultAddress(ctx context.Context) (address.Address, error) {
addrs := a.Wallet.ListAddrs()
// TODO: store a default address in the config or 'wallet' portion of the repo
return addrs[0], nil
}
func (a *API) NetConnect(ctx context.Context, p peer.AddrInfo) error {
return a.Host.Connect(ctx, p)
}