diff --git a/.circleci/config.yml b/.circleci/config.yml index b35d2ceb4..3b8c78592 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -8,7 +8,6 @@ commands: steps: - go/install-ssh - go/install: {package: git} - - go/install: {package: bzr} prepare: steps: - checkout diff --git a/api/api.go b/api/api.go index ff6017571..d2c2ad424 100644 --- a/api/api.go +++ b/api/api.go @@ -26,6 +26,11 @@ type Import struct { Size uint64 } +type MsgWait struct { + InBlock cid.Cid + Receipt types.MessageReceipt +} + // API is a low-level interface to the Filecoin network type API interface { // chain @@ -33,6 +38,7 @@ type API interface { ChainHead(context.Context) (*chain.TipSet, error) // TODO: check serialization ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error // TODO: check serialization ChainGetRandomness(context.Context, *chain.TipSet) ([]byte, error) + ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error) // messages @@ -42,6 +48,7 @@ type API interface { // // mpool // // // ls / show / rm MpoolPending(context.Context, *chain.TipSet) ([]*chain.SignedMessage, error) + MpoolPush(context.Context, *chain.SignedMessage) error // dag @@ -74,6 +81,12 @@ type API interface { WalletNew(context.Context, string) (address.Address, error) WalletList(context.Context) ([]address.Address, error) WalletBalance(context.Context, address.Address) (types.BigInt, error) + WalletSign(context.Context, address.Address, []byte) (*chain.Signature, error) + WalletDefaultAddress(context.Context) (address.Address, error) + + // Really not sure where this belongs. It could go on the wallet, or the message pool, or the chain... + MpoolGetNonce(context.Context, address.Address) (uint64, error) + // // import // // export // // (on cli - cmd to list associations) diff --git a/api/struct.go b/api/struct.go index 8a44f193d..7783bcabc 100644 --- a/api/struct.go +++ b/api/struct.go @@ -20,15 +20,20 @@ type Struct struct { ChainSubmitBlock func(ctx context.Context, blk *chain.BlockMsg) error ChainHead func(context.Context) (*chain.TipSet, error) ChainGetRandomness func(context.Context, *chain.TipSet) ([]byte, error) + ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) MpoolPending func(context.Context, *chain.TipSet) ([]*chain.SignedMessage, error) + MpoolPush func(context.Context, *chain.SignedMessage) error MinerStart func(context.Context, address.Address) error MinerCreateBlock func(context.Context, address.Address, *chain.TipSet, []chain.Ticket, chain.ElectionProof, []*chain.SignedMessage) (*chain.BlockMsg, error) - WalletNew func(context.Context, string) (address.Address, error) - WalletList func(context.Context) ([]address.Address, error) - WalletBalance func(context.Context, address.Address) (types.BigInt, error) + WalletNew func(context.Context, string) (address.Address, error) + WalletList func(context.Context) ([]address.Address, error) + WalletBalance func(context.Context, address.Address) (types.BigInt, error) + WalletSign func(context.Context, address.Address, []byte) (*chain.Signature, error) + WalletDefaultAddress func(context.Context) (address.Address, error) + MpoolGetNonce func(context.Context, address.Address) (uint64, error) ClientImport func(ctx context.Context, path string) (cid.Cid, error) ClientListImports func(ctx context.Context) ([]Import, error) @@ -51,6 +56,10 @@ func (c *Struct) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.S return c.Internal.MpoolPending(ctx, ts) } +func (c *Struct) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error { + return c.Internal.MpoolPush(ctx, smsg) +} + func (c *Struct) MinerStart(ctx context.Context, addr address.Address) error { return c.Internal.MinerStart(ctx, addr) } @@ -83,6 +92,10 @@ func (c *Struct) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]b return c.Internal.ChainGetRandomness(ctx, pts) } +func (c *Struct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) { + return c.Internal.ChainWaitMsg(ctx, msgc) +} + // ID implements API.ID func (c *Struct) ID(ctx context.Context) (peer.ID, error) { return c.Internal.ID(ctx) @@ -105,4 +118,16 @@ func (c *Struct) WalletBalance(ctx context.Context, a address.Address) (types.Bi return c.Internal.WalletBalance(ctx, a) } +func (c *Struct) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) { + return c.Internal.WalletSign(ctx, k, msg) +} + +func (c *Struct) WalletDefaultAddress(ctx context.Context) (address.Address, error) { + return c.Internal.WalletDefaultAddress(ctx) +} + +func (c *Struct) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { + return c.Internal.MpoolGetNonce(ctx, addr) +} + var _ API = &Struct{} diff --git a/chain/chain.go b/chain/chain.go index a971b0e4d..9c2e5f200 100644 --- a/chain/chain.go +++ b/chain/chain.go @@ -2,6 +2,7 @@ package chain import ( "context" + "encoding/json" "fmt" "sync" @@ -220,7 +221,7 @@ type ChainStore struct { bestTips *pubsub.PubSub - headChange func(rev, app []*TipSet) error + headChangeNotifs []func(rev, app []*TipSet) error } func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore { @@ -231,8 +232,42 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore { } } -func (cs *ChainStore) SubNewTips() chan interface{} { - return cs.bestTips.Sub("best") +func (cs *ChainStore) SubNewTips() chan *TipSet { + subch := cs.bestTips.Sub("best") + out := make(chan *TipSet) + go func() { + defer close(out) + for val := range subch { + out <- val.(*TipSet) + } + }() + return out +} + +const ( + HCRevert = "revert" + HCApply = "apply" +) + +type HeadChange struct { + Type string + Val *TipSet +} + +func (cs *ChainStore) SubHeadChanges() chan *HeadChange { + subch := cs.bestTips.Sub("headchange") + out := make(chan *HeadChange, 16) + go func() { + defer close(out) + for val := range subch { + out <- val.(*HeadChange) + } + }() + return out +} + +func (cs *ChainStore) SubscribeHeadChanges(f func(rev, app []*TipSet) error) { + cs.headChangeNotifs = append(cs.headChangeNotifs, f) } func (cs *ChainStore) SetGenesis(b *BlockHeader) error { @@ -275,7 +310,9 @@ func (cs *ChainStore) maybeTakeHeavierTipSet(ts *TipSet) error { if err != nil { return err } - cs.headChange(revert, apply) + for _, hcf := range cs.headChangeNotifs { + hcf(revert, apply) + } log.Infof("New heaviest tipset! %s", ts.Cids()) cs.heaviest = ts } @@ -470,7 +507,7 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*SignedMessage, error) { return DecodeSignedMessage(sb.RawData()) } -func (cs *ChainStore) MessagesForBlock(b *BlockHeader) ([]*SignedMessage, error) { +func (cs *ChainStore) MessageCidsForBlock(b *BlockHeader) ([]cid.Cid, error) { cst := hamt.CSTFromBstore(cs.bs) shar, err := sharray.Load(context.TODO(), b.Messages, 4, cst) if err != nil { @@ -491,9 +528,43 @@ func (cs *ChainStore) MessagesForBlock(b *BlockHeader) ([]*SignedMessage, error) return nil, err } + return cids, nil +} + +func (cs *ChainStore) MessagesForBlock(b *BlockHeader) ([]*SignedMessage, error) { + cids, err := cs.MessageCidsForBlock(b) + if err != nil { + return nil, err + } + return cs.LoadMessagesFromCids(cids) } +func (cs *ChainStore) GetReceipt(b *BlockHeader, i int) (*types.MessageReceipt, error) { + cst := hamt.CSTFromBstore(cs.bs) + shar, err := sharray.Load(context.TODO(), b.MessageReceipts, 4, cst) + if err != nil { + return nil, errors.Wrap(err, "sharray load") + } + + ival, err := shar.Get(context.TODO(), i) + if err != nil { + return nil, err + } + + // @warpfork, @EricMyhre help me. save me. + out, err := json.Marshal(ival) + if err != nil { + return nil, err + } + var r types.MessageReceipt + if err := json.Unmarshal(out, &r); err != nil { + return nil, err + } + + return &r, nil +} + func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*SignedMessage, error) { msgs := make([]*SignedMessage, 0, len(cids)) for _, c := range cids { @@ -528,3 +599,66 @@ func (cs *ChainStore) GetBalance(addr address.Address) (types.BigInt, error) { return act.Balance, nil } + +func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (cid.Cid, *types.MessageReceipt, error) { + tsub := cs.SubHeadChanges() + + head := cs.GetHeaviestTipSet() + + bc, r, err := cs.tipsetContainsMsg(head, mcid) + if err != nil { + return cid.Undef, nil, err + } + + if r != nil { + return bc, r, nil + } + + for { + select { + case val := <-tsub: + switch val.Type { + case HCRevert: + continue + case HCApply: + bc, r, err := cs.tipsetContainsMsg(val.Val, mcid) + if err != nil { + return cid.Undef, nil, err + } + if r != nil { + return bc, r, nil + } + } + case <-ctx.Done(): + return cid.Undef, nil, ctx.Err() + } + } +} + +func (cs *ChainStore) tipsetContainsMsg(ts *TipSet, msg cid.Cid) (cid.Cid, *types.MessageReceipt, error) { + for _, b := range ts.Blocks() { + r, err := cs.blockContainsMsg(b, msg) + if err != nil { + return cid.Undef, nil, err + } + if r != nil { + return b.Cid(), r, nil + } + } + return cid.Undef, nil, nil +} + +func (cs *ChainStore) blockContainsMsg(blk *BlockHeader, msg cid.Cid) (*types.MessageReceipt, error) { + msgs, err := cs.MessageCidsForBlock(blk) + if err != nil { + return nil, err + } + + for i, mc := range msgs { + if mc == msg { + return cs.GetReceipt(blk, i) + } + } + + return nil, nil +} diff --git a/chain/messagepool.go b/chain/messagepool.go index 19d3154e6..84af33e41 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/filecoin-project/go-lotus/chain/address" + hamt "github.com/ipfs/go-hamt-ipld" ) type MessagePool struct { @@ -37,7 +38,7 @@ func NewMessagePool(cs *ChainStore) *MessagePool { pending: make(map[address.Address]*msgSet), cs: cs, } - cs.headChange = mp.HeadChange + cs.SubscribeHeadChanges(mp.HeadChange) return mp } @@ -74,6 +75,36 @@ func (mp *MessagePool) Add(m *SignedMessage) error { return nil } +func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) { + mp.lk.Lock() + defer mp.lk.Unlock() + + mset, ok := mp.pending[addr] + if ok { + return mset.startNonce + uint64(len(mset.msgs)), nil + } + + head := mp.cs.GetHeaviestTipSet() + + state, err := mp.cs.TipSetState(head.Cids()) + if err != nil { + return 0, err + } + + cst := hamt.CSTFromBstore(mp.cs.bs) + st, err := LoadStateTree(cst, state) + if err != nil { + return 0, err + } + + act, err := st.GetActor(addr) + if err != nil { + return 0, err + } + + return act.Nonce, nil +} + func (mp *MessagePool) Remove(m *SignedMessage) { mp.lk.Lock() defer mp.lk.Unlock() diff --git a/chain/types/bigint.go b/chain/types/bigint.go index ae2efc470..5e5b54dc5 100644 --- a/chain/types/bigint.go +++ b/chain/types/bigint.go @@ -71,6 +71,9 @@ func (bi *BigInt) UnmarshalJSON(b []byte) error { i, ok := big.NewInt(0).SetString(s, 10) if !ok { + if string(s) == "" { + return nil + } return fmt.Errorf("failed to parse bigint string") } diff --git a/chain/types_test.go b/chain/types_test.go new file mode 100644 index 000000000..99bbb8212 --- /dev/null +++ b/chain/types_test.go @@ -0,0 +1,36 @@ +package chain + +import ( + "encoding/json" + "testing" + + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/types" +) + +func TestSignedMessageJsonRoundtrip(t *testing.T) { + to, _ := address.NewIDAddress(5234623) + from, _ := address.NewIDAddress(603911192) + smsg := &SignedMessage{ + Message: types.Message{ + To: to, + From: from, + Params: []byte("some bytes, idk"), + Method: 1235126, + Value: types.NewInt(123123), + GasPrice: types.NewInt(1234), + GasLimit: types.NewInt(9992969384), + Nonce: 123123, + }, + } + + out, err := json.Marshal(smsg) + if err != nil { + t.Fatal(err) + } + + var osmsg SignedMessage + if err := json.Unmarshal(out, &osmsg); err != nil { + t.Fatal(err) + } +} diff --git a/cli/cmd.go b/cli/cmd.go index eae207d06..ddba0a542 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -66,4 +66,5 @@ var Commands = []*cli.Command{ mpoolCmd, minerCmd, walletCmd, + createMinerCmd, } diff --git a/cli/createminer.go b/cli/createminer.go new file mode 100644 index 000000000..f22931627 --- /dev/null +++ b/cli/createminer.go @@ -0,0 +1,121 @@ +package cli + +import ( + "fmt" + "strconv" + + "golang.org/x/xerrors" + "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/go-lotus/chain" + actors "github.com/filecoin-project/go-lotus/chain/actors" + address "github.com/filecoin-project/go-lotus/chain/address" + types "github.com/filecoin-project/go-lotus/chain/types" + + "github.com/libp2p/go-libp2p-core/peer" +) + +var createMinerCmd = &cli.Command{ + Name: "createminer", + Usage: "Create a new storage market actor", + Action: func(cctx *cli.Context) error { + if cctx.Args().Len() != 4 { + return fmt.Errorf("must pass four arguments: worker address, owner address, sector size, peer ID") + } + + api, err := getAPI(cctx) + if err != nil { + return err + } + + args := cctx.Args().Slice() + + worker, err := address.NewFromString(args[0]) + if err != nil { + return err + } + + owner, err := address.NewFromString(args[1]) + if err != nil { + return err + } + + ssize, err := strconv.ParseUint(args[2], 10, 64) + if err != nil { + return err + } + + pid, err := peer.IDB58Decode(args[3]) + if err != nil { + return err + } + + createMinerArgs := actors.CreateStorageMinerParams{ + Worker: worker, + Owner: owner, + SectorSize: types.NewInt(ssize), + PeerID: pid, + } + + ctx := reqContext(cctx) + addr, err := api.WalletDefaultAddress(ctx) + if err != nil { + return xerrors.Errorf("failed to get default address: %w", err) + } + + params, err := actors.SerializeParams(createMinerArgs) + if err != nil { + return err + } + + nonce, err := api.MpoolGetNonce(ctx, addr) + if err != nil { + return xerrors.Errorf("failed to get account nonce: %w", err) + } + + msg := types.Message{ + To: actors.StorageMarketAddress, + From: addr, + Method: 1, // TODO: constants pls + Params: params, + Value: types.NewInt(0), + Nonce: nonce, + GasPrice: types.NewInt(1), + GasLimit: types.NewInt(1), + } + + msgbytes, err := msg.Serialize() + if err != nil { + return err + } + + sig, err := api.WalletSign(ctx, addr, msgbytes) + if err != nil { + return xerrors.Errorf("failed to sign message: %w", err) + } + + smsg := &chain.SignedMessage{ + Message: msg, + Signature: *sig, + } + + if err := api.MpoolPush(ctx, smsg); err != nil { + return xerrors.Errorf("failed to push signed message: %w", err) + } + + mwait, err := api.ChainWaitMsg(ctx, smsg.Cid()) + if err != nil { + return xerrors.Errorf("failed waiting for message inclusion: %w", err) + } + + maddr, err := address.NewFromBytes(mwait.Receipt.Return) + if err != nil { + return err + } + + fmt.Printf("miner created in block %s\n", mwait.InBlock) + fmt.Printf("new miner address: %s\n", maddr) + + return nil + }, +} diff --git a/go.mod b/go.mod index 451de0c25..052eacd27 100644 --- a/go.mod +++ b/go.mod @@ -59,7 +59,7 @@ require ( github.com/stretchr/testify v1.3.0 github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d - github.com/whyrusleeping/sharray v0.0.0-20190520213710-bd32aab369f8 + github.com/whyrusleeping/sharray v0.0.0-20190718051354-e41931821e33 go.opencensus.io v0.22.0 // indirect go.uber.org/dig v1.7.0 // indirect go.uber.org/fx v1.9.0 diff --git a/go.sum b/go.sum index e7a0cdfd0..a6aa7f2a8 100644 --- a/go.sum +++ b/go.sum @@ -418,8 +418,8 @@ github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1: github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d h1:wnjWu1N8UTNf2zzF5FWlEyNNbNw5GMVHaHaaLdvdTdA= github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d/go.mod h1:g7ckxrjiFh8mi1AY7ox23PZD0g6QU/TxW3U3unX7I3A= -github.com/whyrusleeping/sharray v0.0.0-20190520213710-bd32aab369f8 h1:n89ErB+0d4SBbyD8ykr7Q/j+C41ysUttZG3l9/2ufC4= -github.com/whyrusleeping/sharray v0.0.0-20190520213710-bd32aab369f8/go.mod h1:c1pwhNePDPlcYJZinQlfLTOKwTmVf45nfdTg73yOOcA= +github.com/whyrusleeping/sharray v0.0.0-20190718051354-e41931821e33 h1:7Bsg3GZnFAhdadeyRie9ReenkK2XbC2FlOpJQgTzpbA= +github.com/whyrusleeping/sharray v0.0.0-20190718051354-e41931821e33/go.mod h1:c1pwhNePDPlcYJZinQlfLTOKwTmVf45nfdTg73yOOcA= github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= diff --git a/lib/jsonrpc/util.go b/lib/jsonrpc/util.go index ece0af7de..03b0bf7bd 100644 --- a/lib/jsonrpc/util.go +++ b/lib/jsonrpc/util.go @@ -2,6 +2,7 @@ package jsonrpc import ( "encoding/json" + "fmt" "reflect" ) @@ -42,7 +43,8 @@ func processFuncOut(funcType reflect.Type) (valOut int, errOut int, n int) { panic("expected error as second return value") } default: - panic("too many error values") + errstr := fmt.Sprintf("too many return values: %s", funcType) + panic(errstr) } return diff --git a/node/api.go b/node/api.go index e2700fb8f..ee0954ee6 100644 --- a/node/api.go +++ b/node/api.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/go-lotus/miner" "github.com/filecoin-project/go-lotus/node/client" + "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -50,6 +51,10 @@ func (a *API) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]byte return []byte("foo bar random"), nil } +func (a *API) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) { + panic("TODO") +} + func (a *API) ID(context.Context) (peer.ID, error) { return a.Host.ID(), nil } @@ -66,6 +71,19 @@ func (a *API) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.Sign return a.Mpool.Pending(), nil } +func (a *API) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error { + msgb, err := smsg.Serialize() + if err != nil { + return err + } + + return a.PubSub.Publish("/fil/messages", msgb) +} + +func (a *API) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { + return a.Mpool.GetNonce(addr) +} + func (a *API) MinerStart(ctx context.Context, addr address.Address) error { // hrm... m := miner.NewMiner(a, addr) @@ -118,6 +136,20 @@ func (a *API) WalletBalance(ctx context.Context, addr address.Address) (types.Bi return a.Chain.GetBalance(addr) } +func (a *API) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) { + return a.Wallet.Sign(k, msg) +} + +func (a *API) WalletDefaultAddress(ctx context.Context) (address.Address, error) { + addrs, err := a.Wallet.ListAddrs() + if err != nil { + return address.Undef, err + } + + // TODO: store a default address in the config or 'wallet' portion of the repo + return addrs[0], nil +} + func (a *API) NetConnect(ctx context.Context, p peer.AddrInfo) error { return a.Host.Connect(ctx, p) }