Merge pull request #48 from filecoin-project/feat/create-miner

create miner command, stubbed out methods
This commit is contained in:
Whyrusleeping 2019-07-22 12:13:11 -07:00 committed by GitHub
commit 19979a5ab7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 411 additions and 14 deletions

View File

@ -8,7 +8,6 @@ commands:
steps:
- go/install-ssh
- go/install: {package: git}
- go/install: {package: bzr}
prepare:
steps:
- checkout

View File

@ -26,6 +26,11 @@ type Import struct {
Size uint64
}
type MsgWait struct {
InBlock cid.Cid
Receipt types.MessageReceipt
}
// API is a low-level interface to the Filecoin network
type API interface {
// chain
@ -33,6 +38,7 @@ type API interface {
ChainHead(context.Context) (*chain.TipSet, error) // TODO: check serialization
ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error // TODO: check serialization
ChainGetRandomness(context.Context, *chain.TipSet) ([]byte, error)
ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error)
// messages
@ -42,6 +48,7 @@ type API interface {
// // mpool
// // // ls / show / rm
MpoolPending(context.Context, *chain.TipSet) ([]*chain.SignedMessage, error)
MpoolPush(context.Context, *chain.SignedMessage) error
// dag
@ -74,6 +81,12 @@ type API interface {
WalletNew(context.Context, string) (address.Address, error)
WalletList(context.Context) ([]address.Address, error)
WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletSign(context.Context, address.Address, []byte) (*chain.Signature, error)
WalletDefaultAddress(context.Context) (address.Address, error)
// Really not sure where this belongs. It could go on the wallet, or the message pool, or the chain...
MpoolGetNonce(context.Context, address.Address) (uint64, error)
// // import
// // export
// // (on cli - cmd to list associations)

View File

@ -20,15 +20,20 @@ type Struct struct {
ChainSubmitBlock func(ctx context.Context, blk *chain.BlockMsg) error
ChainHead func(context.Context) (*chain.TipSet, error)
ChainGetRandomness func(context.Context, *chain.TipSet) ([]byte, error)
ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error)
MpoolPending func(context.Context, *chain.TipSet) ([]*chain.SignedMessage, error)
MpoolPush func(context.Context, *chain.SignedMessage) error
MinerStart func(context.Context, address.Address) error
MinerCreateBlock func(context.Context, address.Address, *chain.TipSet, []chain.Ticket, chain.ElectionProof, []*chain.SignedMessage) (*chain.BlockMsg, error)
WalletNew func(context.Context, string) (address.Address, error)
WalletList func(context.Context) ([]address.Address, error)
WalletBalance func(context.Context, address.Address) (types.BigInt, error)
WalletNew func(context.Context, string) (address.Address, error)
WalletList func(context.Context) ([]address.Address, error)
WalletBalance func(context.Context, address.Address) (types.BigInt, error)
WalletSign func(context.Context, address.Address, []byte) (*chain.Signature, error)
WalletDefaultAddress func(context.Context) (address.Address, error)
MpoolGetNonce func(context.Context, address.Address) (uint64, error)
ClientImport func(ctx context.Context, path string) (cid.Cid, error)
ClientListImports func(ctx context.Context) ([]Import, error)
@ -51,6 +56,10 @@ func (c *Struct) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.S
return c.Internal.MpoolPending(ctx, ts)
}
func (c *Struct) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error {
return c.Internal.MpoolPush(ctx, smsg)
}
func (c *Struct) MinerStart(ctx context.Context, addr address.Address) error {
return c.Internal.MinerStart(ctx, addr)
}
@ -83,6 +92,10 @@ func (c *Struct) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]b
return c.Internal.ChainGetRandomness(ctx, pts)
}
func (c *Struct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) {
return c.Internal.ChainWaitMsg(ctx, msgc)
}
// ID implements API.ID
func (c *Struct) ID(ctx context.Context) (peer.ID, error) {
return c.Internal.ID(ctx)
@ -105,4 +118,16 @@ func (c *Struct) WalletBalance(ctx context.Context, a address.Address) (types.Bi
return c.Internal.WalletBalance(ctx, a)
}
func (c *Struct) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) {
return c.Internal.WalletSign(ctx, k, msg)
}
func (c *Struct) WalletDefaultAddress(ctx context.Context) (address.Address, error) {
return c.Internal.WalletDefaultAddress(ctx)
}
func (c *Struct) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
return c.Internal.MpoolGetNonce(ctx, addr)
}
var _ API = &Struct{}

View File

@ -2,6 +2,7 @@ package chain
import (
"context"
"encoding/json"
"fmt"
"sync"
@ -220,7 +221,7 @@ type ChainStore struct {
bestTips *pubsub.PubSub
headChange func(rev, app []*TipSet) error
headChangeNotifs []func(rev, app []*TipSet) error
}
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
@ -231,8 +232,42 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
}
}
func (cs *ChainStore) SubNewTips() chan interface{} {
return cs.bestTips.Sub("best")
func (cs *ChainStore) SubNewTips() chan *TipSet {
subch := cs.bestTips.Sub("best")
out := make(chan *TipSet)
go func() {
defer close(out)
for val := range subch {
out <- val.(*TipSet)
}
}()
return out
}
const (
HCRevert = "revert"
HCApply = "apply"
)
type HeadChange struct {
Type string
Val *TipSet
}
func (cs *ChainStore) SubHeadChanges() chan *HeadChange {
subch := cs.bestTips.Sub("headchange")
out := make(chan *HeadChange, 16)
go func() {
defer close(out)
for val := range subch {
out <- val.(*HeadChange)
}
}()
return out
}
func (cs *ChainStore) SubscribeHeadChanges(f func(rev, app []*TipSet) error) {
cs.headChangeNotifs = append(cs.headChangeNotifs, f)
}
func (cs *ChainStore) SetGenesis(b *BlockHeader) error {
@ -275,7 +310,9 @@ func (cs *ChainStore) maybeTakeHeavierTipSet(ts *TipSet) error {
if err != nil {
return err
}
cs.headChange(revert, apply)
for _, hcf := range cs.headChangeNotifs {
hcf(revert, apply)
}
log.Infof("New heaviest tipset! %s", ts.Cids())
cs.heaviest = ts
}
@ -470,7 +507,7 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*SignedMessage, error) {
return DecodeSignedMessage(sb.RawData())
}
func (cs *ChainStore) MessagesForBlock(b *BlockHeader) ([]*SignedMessage, error) {
func (cs *ChainStore) MessageCidsForBlock(b *BlockHeader) ([]cid.Cid, error) {
cst := hamt.CSTFromBstore(cs.bs)
shar, err := sharray.Load(context.TODO(), b.Messages, 4, cst)
if err != nil {
@ -491,9 +528,43 @@ func (cs *ChainStore) MessagesForBlock(b *BlockHeader) ([]*SignedMessage, error)
return nil, err
}
return cids, nil
}
func (cs *ChainStore) MessagesForBlock(b *BlockHeader) ([]*SignedMessage, error) {
cids, err := cs.MessageCidsForBlock(b)
if err != nil {
return nil, err
}
return cs.LoadMessagesFromCids(cids)
}
func (cs *ChainStore) GetReceipt(b *BlockHeader, i int) (*types.MessageReceipt, error) {
cst := hamt.CSTFromBstore(cs.bs)
shar, err := sharray.Load(context.TODO(), b.MessageReceipts, 4, cst)
if err != nil {
return nil, errors.Wrap(err, "sharray load")
}
ival, err := shar.Get(context.TODO(), i)
if err != nil {
return nil, err
}
// @warpfork, @EricMyhre help me. save me.
out, err := json.Marshal(ival)
if err != nil {
return nil, err
}
var r types.MessageReceipt
if err := json.Unmarshal(out, &r); err != nil {
return nil, err
}
return &r, nil
}
func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*SignedMessage, error) {
msgs := make([]*SignedMessage, 0, len(cids))
for _, c := range cids {
@ -528,3 +599,66 @@ func (cs *ChainStore) GetBalance(addr address.Address) (types.BigInt, error) {
return act.Balance, nil
}
func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (cid.Cid, *types.MessageReceipt, error) {
tsub := cs.SubHeadChanges()
head := cs.GetHeaviestTipSet()
bc, r, err := cs.tipsetContainsMsg(head, mcid)
if err != nil {
return cid.Undef, nil, err
}
if r != nil {
return bc, r, nil
}
for {
select {
case val := <-tsub:
switch val.Type {
case HCRevert:
continue
case HCApply:
bc, r, err := cs.tipsetContainsMsg(val.Val, mcid)
if err != nil {
return cid.Undef, nil, err
}
if r != nil {
return bc, r, nil
}
}
case <-ctx.Done():
return cid.Undef, nil, ctx.Err()
}
}
}
func (cs *ChainStore) tipsetContainsMsg(ts *TipSet, msg cid.Cid) (cid.Cid, *types.MessageReceipt, error) {
for _, b := range ts.Blocks() {
r, err := cs.blockContainsMsg(b, msg)
if err != nil {
return cid.Undef, nil, err
}
if r != nil {
return b.Cid(), r, nil
}
}
return cid.Undef, nil, nil
}
func (cs *ChainStore) blockContainsMsg(blk *BlockHeader, msg cid.Cid) (*types.MessageReceipt, error) {
msgs, err := cs.MessageCidsForBlock(blk)
if err != nil {
return nil, err
}
for i, mc := range msgs {
if mc == msg {
return cs.GetReceipt(blk, i)
}
}
return nil, nil
}

View File

@ -4,6 +4,7 @@ import (
"sync"
"github.com/filecoin-project/go-lotus/chain/address"
hamt "github.com/ipfs/go-hamt-ipld"
)
type MessagePool struct {
@ -37,7 +38,7 @@ func NewMessagePool(cs *ChainStore) *MessagePool {
pending: make(map[address.Address]*msgSet),
cs: cs,
}
cs.headChange = mp.HeadChange
cs.SubscribeHeadChanges(mp.HeadChange)
return mp
}
@ -74,6 +75,36 @@ func (mp *MessagePool) Add(m *SignedMessage) error {
return nil
}
func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
mp.lk.Lock()
defer mp.lk.Unlock()
mset, ok := mp.pending[addr]
if ok {
return mset.startNonce + uint64(len(mset.msgs)), nil
}
head := mp.cs.GetHeaviestTipSet()
state, err := mp.cs.TipSetState(head.Cids())
if err != nil {
return 0, err
}
cst := hamt.CSTFromBstore(mp.cs.bs)
st, err := LoadStateTree(cst, state)
if err != nil {
return 0, err
}
act, err := st.GetActor(addr)
if err != nil {
return 0, err
}
return act.Nonce, nil
}
func (mp *MessagePool) Remove(m *SignedMessage) {
mp.lk.Lock()
defer mp.lk.Unlock()

View File

@ -71,6 +71,9 @@ func (bi *BigInt) UnmarshalJSON(b []byte) error {
i, ok := big.NewInt(0).SetString(s, 10)
if !ok {
if string(s) == "<nil>" {
return nil
}
return fmt.Errorf("failed to parse bigint string")
}

36
chain/types_test.go Normal file
View File

@ -0,0 +1,36 @@
package chain
import (
"encoding/json"
"testing"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
)
func TestSignedMessageJsonRoundtrip(t *testing.T) {
to, _ := address.NewIDAddress(5234623)
from, _ := address.NewIDAddress(603911192)
smsg := &SignedMessage{
Message: types.Message{
To: to,
From: from,
Params: []byte("some bytes, idk"),
Method: 1235126,
Value: types.NewInt(123123),
GasPrice: types.NewInt(1234),
GasLimit: types.NewInt(9992969384),
Nonce: 123123,
},
}
out, err := json.Marshal(smsg)
if err != nil {
t.Fatal(err)
}
var osmsg SignedMessage
if err := json.Unmarshal(out, &osmsg); err != nil {
t.Fatal(err)
}
}

View File

@ -66,4 +66,5 @@ var Commands = []*cli.Command{
mpoolCmd,
minerCmd,
walletCmd,
createMinerCmd,
}

121
cli/createminer.go Normal file
View File

@ -0,0 +1,121 @@
package cli
import (
"fmt"
"strconv"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/go-lotus/chain"
actors "github.com/filecoin-project/go-lotus/chain/actors"
address "github.com/filecoin-project/go-lotus/chain/address"
types "github.com/filecoin-project/go-lotus/chain/types"
"github.com/libp2p/go-libp2p-core/peer"
)
var createMinerCmd = &cli.Command{
Name: "createminer",
Usage: "Create a new storage market actor",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 4 {
return fmt.Errorf("must pass four arguments: worker address, owner address, sector size, peer ID")
}
api, err := getAPI(cctx)
if err != nil {
return err
}
args := cctx.Args().Slice()
worker, err := address.NewFromString(args[0])
if err != nil {
return err
}
owner, err := address.NewFromString(args[1])
if err != nil {
return err
}
ssize, err := strconv.ParseUint(args[2], 10, 64)
if err != nil {
return err
}
pid, err := peer.IDB58Decode(args[3])
if err != nil {
return err
}
createMinerArgs := actors.CreateStorageMinerParams{
Worker: worker,
Owner: owner,
SectorSize: types.NewInt(ssize),
PeerID: pid,
}
ctx := reqContext(cctx)
addr, err := api.WalletDefaultAddress(ctx)
if err != nil {
return xerrors.Errorf("failed to get default address: %w", err)
}
params, err := actors.SerializeParams(createMinerArgs)
if err != nil {
return err
}
nonce, err := api.MpoolGetNonce(ctx, addr)
if err != nil {
return xerrors.Errorf("failed to get account nonce: %w", err)
}
msg := types.Message{
To: actors.StorageMarketAddress,
From: addr,
Method: 1, // TODO: constants pls
Params: params,
Value: types.NewInt(0),
Nonce: nonce,
GasPrice: types.NewInt(1),
GasLimit: types.NewInt(1),
}
msgbytes, err := msg.Serialize()
if err != nil {
return err
}
sig, err := api.WalletSign(ctx, addr, msgbytes)
if err != nil {
return xerrors.Errorf("failed to sign message: %w", err)
}
smsg := &chain.SignedMessage{
Message: msg,
Signature: *sig,
}
if err := api.MpoolPush(ctx, smsg); err != nil {
return xerrors.Errorf("failed to push signed message: %w", err)
}
mwait, err := api.ChainWaitMsg(ctx, smsg.Cid())
if err != nil {
return xerrors.Errorf("failed waiting for message inclusion: %w", err)
}
maddr, err := address.NewFromBytes(mwait.Receipt.Return)
if err != nil {
return err
}
fmt.Printf("miner created in block %s\n", mwait.InBlock)
fmt.Printf("new miner address: %s\n", maddr)
return nil
},
}

2
go.mod
View File

@ -59,7 +59,7 @@ require (
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d
github.com/whyrusleeping/sharray v0.0.0-20190520213710-bd32aab369f8
github.com/whyrusleeping/sharray v0.0.0-20190718051354-e41931821e33
go.opencensus.io v0.22.0 // indirect
go.uber.org/dig v1.7.0 // indirect
go.uber.org/fx v1.9.0

4
go.sum
View File

@ -418,8 +418,8 @@ github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d h1:wnjWu1N8UTNf2zzF5FWlEyNNbNw5GMVHaHaaLdvdTdA=
github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d/go.mod h1:g7ckxrjiFh8mi1AY7ox23PZD0g6QU/TxW3U3unX7I3A=
github.com/whyrusleeping/sharray v0.0.0-20190520213710-bd32aab369f8 h1:n89ErB+0d4SBbyD8ykr7Q/j+C41ysUttZG3l9/2ufC4=
github.com/whyrusleeping/sharray v0.0.0-20190520213710-bd32aab369f8/go.mod h1:c1pwhNePDPlcYJZinQlfLTOKwTmVf45nfdTg73yOOcA=
github.com/whyrusleeping/sharray v0.0.0-20190718051354-e41931821e33 h1:7Bsg3GZnFAhdadeyRie9ReenkK2XbC2FlOpJQgTzpbA=
github.com/whyrusleeping/sharray v0.0.0-20190718051354-e41931821e33/go.mod h1:c1pwhNePDPlcYJZinQlfLTOKwTmVf45nfdTg73yOOcA=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=

View File

@ -2,6 +2,7 @@ package jsonrpc
import (
"encoding/json"
"fmt"
"reflect"
)
@ -42,7 +43,8 @@ func processFuncOut(funcType reflect.Type) (valOut int, errOut int, n int) {
panic("expected error as second return value")
}
default:
panic("too many error values")
errstr := fmt.Sprintf("too many return values: %s", funcType)
panic(errstr)
}
return

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/go-lotus/miner"
"github.com/filecoin-project/go-lotus/node/client"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -50,6 +51,10 @@ func (a *API) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]byte
return []byte("foo bar random"), nil
}
func (a *API) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) {
panic("TODO")
}
func (a *API) ID(context.Context) (peer.ID, error) {
return a.Host.ID(), nil
}
@ -66,6 +71,19 @@ func (a *API) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.Sign
return a.Mpool.Pending(), nil
}
func (a *API) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error {
msgb, err := smsg.Serialize()
if err != nil {
return err
}
return a.PubSub.Publish("/fil/messages", msgb)
}
func (a *API) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
return a.Mpool.GetNonce(addr)
}
func (a *API) MinerStart(ctx context.Context, addr address.Address) error {
// hrm...
m := miner.NewMiner(a, addr)
@ -118,6 +136,20 @@ func (a *API) WalletBalance(ctx context.Context, addr address.Address) (types.Bi
return a.Chain.GetBalance(addr)
}
func (a *API) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) {
return a.Wallet.Sign(k, msg)
}
func (a *API) WalletDefaultAddress(ctx context.Context) (address.Address, error) {
addrs, err := a.Wallet.ListAddrs()
if err != nil {
return address.Undef, err
}
// TODO: store a default address in the config or 'wallet' portion of the repo
return addrs[0], nil
}
func (a *API) NetConnect(ctx context.Context, p peer.AddrInfo) error {
return a.Host.Connect(ctx, p)
}