From e087cc2e7a5b28be0dbe5b8abe26657be03a11b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 20 Aug 2019 18:48:33 +0200 Subject: [PATCH] impl: Split full node implementation --- node/impl/full.go | 695 +-------------------------------------- node/impl/full/chain.go | 181 ++++++++++ node/impl/full/client.go | 183 +++++++++++ node/impl/full/mpool.go | 41 +++ node/impl/full/paych.go | 248 ++++++++++++++ node/impl/full/state.go | 139 ++++++++ node/impl/full/wallet.go | 70 ++++ 7 files changed, 874 insertions(+), 683 deletions(-) create mode 100644 node/impl/full/chain.go create mode 100644 node/impl/full/client.go create mode 100644 node/impl/full/mpool.go create mode 100644 node/impl/full/paych.go create mode 100644 node/impl/full/state.go create mode 100644 node/impl/full/wallet.go diff --git a/node/impl/full.go b/node/impl/full.go index 77c22a7c4..c617b57b6 100644 --- a/node/impl/full.go +++ b/node/impl/full.go @@ -2,701 +2,30 @@ package impl import ( "context" - "fmt" - "strconv" - - "github.com/filecoin-project/go-lotus/lib/bufbstore" "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/chain" - "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/address" - "github.com/filecoin-project/go-lotus/chain/deals" - "github.com/filecoin-project/go-lotus/chain/gen" - "github.com/filecoin-project/go-lotus/chain/state" - "github.com/filecoin-project/go-lotus/chain/store" - "github.com/filecoin-project/go-lotus/chain/types" - "github.com/filecoin-project/go-lotus/chain/vm" - "github.com/filecoin-project/go-lotus/chain/wallet" "github.com/filecoin-project/go-lotus/miner" - "github.com/filecoin-project/go-lotus/node/client" - "github.com/filecoin-project/go-lotus/paych" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-hamt-ipld" - cbor "github.com/ipfs/go-ipld-cbor" + "github.com/filecoin-project/go-lotus/node/impl/full" logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-core/peer" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "golang.org/x/xerrors" ) var log = logging.Logger("node") -type FullNodeAPI struct { - client.LocalStorage - +type API struct { CommonAPI + full.ChainAPI + full.ClientAPI + full.MpoolAPI + full.PaychAPI + full.StateAPI + full.WalletAPI - DealClient *deals.Client - Chain *store.ChainStore - PubSub *pubsub.PubSub - Mpool *chain.MessagePool - Wallet *wallet.Wallet - PaychMgr *paych.Manager + Miner *miner.Miner } -func (a *FullNodeAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) { - // TODO: make this a param - self, err := a.WalletDefaultAddress(ctx) - if err != nil { - return nil, err - } - - // get miner peerID - msg := &types.Message{ - To: miner, - From: miner, - Method: actors.MAMethods.GetPeerID, - } - - r, err := a.ChainCall(ctx, msg, nil) - if err != nil { - return nil, err - } - pid, err := peer.IDFromBytes(r.Return) - if err != nil { - return nil, err - } - - vd, err := a.DealClient.VerifyParams(ctx, data) - if err != nil { - return nil, err - } - - voucherData, err := cbor.DumpObject(vd) - if err != nil { - return nil, err - } - - // setup payments - total := types.BigMul(price, types.NewInt(blocksDuration)) - - // TODO: at least ping the miner before creating paych / locking the money - paych, paychMsg, err := a.paychCreate(ctx, self, miner, total) - if err != nil { - return nil, err - } - - voucher := types.SignedVoucher{ - // TimeLock: 0, // TODO: do we want to use this somehow? - Extra: &types.ModVerifyParams{ - Actor: miner, - Method: actors.MAMethods.PaymentVerifyInclusion, - Data: voucherData, - }, - Lane: 0, - Amount: total, - MinCloseHeight: blocksDuration, // TODO: some way to start this after initial piece inclusion by actor? (also, at least add current height) - } - - sv, err := a.paychVoucherCreate(ctx, paych, voucher) - if err != nil { - return nil, err - } - - proposal := deals.ClientDealProposal{ - Data: data, - TotalPrice: total, - Duration: blocksDuration, - Payment: actors.PaymentInfo{ - PayChActor: paych, - Payer: self, - ChannelMessage: paychMsg, - Vouchers: []types.SignedVoucher{*sv}, - }, - MinerAddress: miner, - ClientAddress: self, - MinerID: pid, - } - - c, err := a.DealClient.Start(ctx, proposal, vd) - // TODO: send updated voucher with PaymentVerifySector for cheaper validation (validate the sector the miner sent us first!) - return &c, err +func (a *API) MinerRegister(ctx context.Context, addr address.Address) error { + return a.Miner.Register(addr) } -func (a *FullNodeAPI) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) { - return a.Chain.SubHeadChanges(ctx), nil -} - -func (a *FullNodeAPI) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { - if err := a.Chain.AddBlock(blk.Header); err != nil { - return err - } - - b, err := blk.Serialize() - if err != nil { - return err - } - - // TODO: anything else to do here? - return a.PubSub.Publish("/fil/blocks", b) -} - -func (a *FullNodeAPI) ChainHead(context.Context) (*types.TipSet, error) { - return a.Chain.GetHeaviestTipSet(), nil -} - -func (a *FullNodeAPI) ChainGetRandomness(ctx context.Context, pts *types.TipSet) ([]byte, error) { - // TODO: this needs to look back in the chain for the right random beacon value - return []byte("foo bar random"), nil -} - -func (a *FullNodeAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) { - blkcid, recpt, err := a.Chain.WaitForMessage(ctx, msg) - if err != nil { - return nil, err - } - - return &api.MsgWait{ - InBlock: blkcid, - Receipt: *recpt, - }, nil -} - -func (a *FullNodeAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.BlockHeader, error) { - return a.Chain.GetBlock(msg) -} - -func (a *FullNodeAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) { - b, err := a.Chain.GetBlock(msg) - if err != nil { - return nil, err - } - - bmsgs, smsgs, err := a.Chain.MessagesForBlock(b) - if err != nil { - return nil, err - } - - return &api.BlockMessages{ - BlsMessages: bmsgs, - SecpkMessages: smsgs, - }, nil -} - -func (a *FullNodeAPI) ChainGetBlockReceipts(ctx context.Context, bcid cid.Cid) ([]*types.MessageReceipt, error) { - b, err := a.Chain.GetBlock(bcid) - if err != nil { - return nil, err - } - - // TODO: need to get the number of messages better than this - bm, sm, err := a.Chain.MessagesForBlock(b) - if err != nil { - return nil, err - } - - var out []*types.MessageReceipt - for i := 0; i < len(bm)+len(sm); i++ { - r, err := a.Chain.GetReceipt(b, i) - if err != nil { - return nil, err - } - - out = append(out, r) - } - - return out, nil -} - -func (a *FullNodeAPI) ChainCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) { - return vm.Call(ctx, a.Chain, msg, ts) -} - -func (a *FullNodeAPI) stateForTs(ts *types.TipSet) (*state.StateTree, error) { - if ts == nil { - ts = a.Chain.GetHeaviestTipSet() - } - - st, err := a.Chain.TipSetState(ts.Cids()) - if err != nil { - return nil, err - } - - buf := bufbstore.NewBufferedBstore(a.Chain.Blockstore()) - cst := hamt.CSTFromBstore(buf) - return state.LoadStateTree(cst, st) -} - -func (a *FullNodeAPI) ChainGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) { - state, err := a.stateForTs(ts) - if err != nil { - return nil, err - } - - return state.GetActor(actor) -} - -func (a *FullNodeAPI) ChainReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) { - state, err := a.stateForTs(ts) - if err != nil { - return nil, err - } - - blk, err := state.Store.Blocks.GetBlock(ctx, act.Head) - if err != nil { - return nil, err - } - - oif, err := vm.DumpActorState(act.Code, blk.RawData()) - if err != nil { - return nil, err - } - - return &api.ActorState{ - Balance: act.Balance, - State: oif, - }, nil -} - -func (a *FullNodeAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) { - // TODO: need to make sure we don't return messages that were already included in the referenced chain - // also need to accept ts == nil just fine, assume nil == chain.Head() - return a.Mpool.Pending(), nil -} - -func (a *FullNodeAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) error { - msgb, err := smsg.Serialize() - if err != nil { - return err - } - if err := a.Mpool.Add(smsg); err != nil { - return err - } - - return a.PubSub.Publish("/fil/messages", msgb) -} - -func (a *FullNodeAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { - return a.Mpool.GetNonce(addr) -} - -func (a *FullNodeAPI) MinerStart(ctx context.Context, addr address.Address) error { - // hrm... - m := miner.NewMiner(a, addr) - - go m.Mine(context.TODO()) - - return nil -} - -func (a *FullNodeAPI) MinerCreateBlock(ctx context.Context, addr address.Address, parents *types.TipSet, tickets []*types.Ticket, proof types.ElectionProof, msgs []*types.SignedMessage) (*chain.BlockMsg, error) { - fblk, err := gen.MinerCreateBlock(ctx, a.Chain, addr, parents, tickets, proof, msgs) - if err != nil { - return nil, err - } - - var out chain.BlockMsg - out.Header = fblk.Header - for _, msg := range fblk.BlsMessages { - out.BlsMessages = append(out.BlsMessages, msg.Cid()) - } - for _, msg := range fblk.SecpkMessages { - out.SecpkMessages = append(out.SecpkMessages, msg.Cid()) - } - - return &out, nil -} - -func (a *FullNodeAPI) WalletNew(ctx context.Context, typ string) (address.Address, error) { - return a.Wallet.GenerateKey(typ) -} - -func (a *FullNodeAPI) WalletHas(ctx context.Context, addr address.Address) (bool, error) { - return a.Wallet.HasKey(addr) -} - -func (a *FullNodeAPI) WalletList(ctx context.Context) ([]address.Address, error) { - return a.Wallet.ListAddrs() -} - -func (a *FullNodeAPI) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) { - return a.Chain.GetBalance(addr) -} - -func (a *FullNodeAPI) WalletSign(ctx context.Context, k address.Address, msg []byte) (*types.Signature, error) { - return a.Wallet.Sign(ctx, k, msg) -} - -func (a *FullNodeAPI) WalletSignMessage(ctx context.Context, k address.Address, msg *types.Message) (*types.SignedMessage, error) { - msgbytes, err := msg.Serialize() - if err != nil { - return nil, err - } - - sig, err := a.WalletSign(ctx, k, msgbytes) - if err != nil { - return nil, xerrors.Errorf("failed to sign message: %w", err) - } - - return &types.SignedMessage{ - Message: *msg, - Signature: *sig, - }, nil -} - -func (a *FullNodeAPI) WalletDefaultAddress(ctx context.Context) (address.Address, error) { - addrs, err := a.Wallet.ListAddrs() - if err != nil { - return address.Undef, err - } - if len(addrs) == 0 { - return address.Undef, xerrors.New("no addresses in wallet") - } - - // TODO: store a default address in the config or 'wallet' portion of the repo - return addrs[0], nil -} - -func (a *FullNodeAPI) StateMinerSectors(ctx context.Context, addr address.Address) ([]*api.SectorInfo, error) { - ts := a.Chain.GetHeaviestTipSet() - - stc, err := a.Chain.TipSetState(ts.Cids()) - if err != nil { - return nil, err - } - - cst := hamt.CSTFromBstore(a.Chain.Blockstore()) - - st, err := state.LoadStateTree(cst, stc) - if err != nil { - return nil, err - } - - act, err := st.GetActor(addr) - if err != nil { - return nil, err - } - - var minerState actors.StorageMinerActorState - if err := cst.Get(ctx, act.Head, &minerState); err != nil { - return nil, err - } - - nd, err := hamt.LoadNode(ctx, cst, minerState.Sectors) - if err != nil { - return nil, err - } - - log.Info("miner sector count: ", minerState.SectorSetSize) - - var sinfos []*api.SectorInfo - // Note to self: the hamt isnt a great data structure to use here... need to implement the sector set - err = nd.ForEach(ctx, func(k string, val interface{}) error { - sid, err := strconv.ParseUint(k, 10, 64) - if err != nil { - return err - } - - bval, ok := val.([]byte) - if !ok { - return fmt.Errorf("expected to get bytes in sector set hamt") - } - - var comms [][]byte - if err := cbor.DecodeInto(bval, &comms); err != nil { - return err - } - - sinfos = append(sinfos, &api.SectorInfo{ - SectorID: sid, - CommR: comms[0], - CommD: comms[1], - }) - return nil - }) - return sinfos, nil -} - -func (a *FullNodeAPI) StateMinerProvingSet(ctx context.Context, addr address.Address) ([]*api.SectorInfo, error) { - ts := a.Chain.GetHeaviestTipSet() - - stc, err := a.Chain.TipSetState(ts.Cids()) - if err != nil { - return nil, err - } - - cst := hamt.CSTFromBstore(a.Chain.Blockstore()) - - st, err := state.LoadStateTree(cst, stc) - if err != nil { - return nil, err - } - - act, err := st.GetActor(addr) - if err != nil { - return nil, err - } - - var minerState actors.StorageMinerActorState - if err := cst.Get(ctx, act.Head, &minerState); err != nil { - return nil, err - } - - nd, err := hamt.LoadNode(ctx, cst, minerState.ProvingSet) - if err != nil { - return nil, err - } - - var sinfos []*api.SectorInfo - // Note to self: the hamt isnt a great data structure to use here... need to implement the sector set - err = nd.ForEach(ctx, func(k string, val interface{}) error { - sid, err := strconv.ParseUint(k, 10, 64) - if err != nil { - return err - } - - bval, ok := val.([]byte) - if !ok { - return fmt.Errorf("expected to get bytes in sector set hamt") - } - - var comms [][]byte - if err := cbor.DecodeInto(bval, &comms); err != nil { - return err - } - - sinfos = append(sinfos, &api.SectorInfo{ - SectorID: sid, - CommR: comms[0], - CommD: comms[1], - }) - return nil - }) - return sinfos, nil -} - -func (a *FullNodeAPI) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, error) { - act, _, err := a.paychCreate(ctx, from, to, amt) - return act, err -} - -func (a *FullNodeAPI) paychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) { - params, aerr := actors.SerializeParams(&actors.PCAConstructorParams{To: to}) - if aerr != nil { - return address.Undef, cid.Undef, aerr - } - - nonce, err := a.MpoolGetNonce(ctx, from) - if err != nil { - return address.Undef, cid.Undef, err - } - - enc, err := actors.SerializeParams(&actors.ExecParams{ - Params: params, - Code: actors.PaymentChannelActorCodeCid, - }) - - msg := &types.Message{ - To: actors.InitActorAddress, - From: from, - Value: amt, - Nonce: nonce, - Method: actors.IAMethods.Exec, - Params: enc, - GasLimit: types.NewInt(1000), - GasPrice: types.NewInt(0), - } - - ser, err := msg.Serialize() - if err != nil { - return address.Undef, cid.Undef, err - } - - sig, err := a.WalletSign(ctx, from, ser) - if err != nil { - return address.Undef, cid.Undef, err - } - - smsg := &types.SignedMessage{ - Message: *msg, - Signature: *sig, - } - - if err := a.MpoolPush(ctx, smsg); err != nil { - return address.Undef, cid.Undef, err - } - - mwait, err := a.ChainWaitMsg(ctx, smsg.Cid()) - if err != nil { - return address.Undef, cid.Undef, err - } - - if mwait.Receipt.ExitCode != 0 { - return address.Undef, cid.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode) - } - - paychaddr, err := address.NewFromBytes(mwait.Receipt.Return) - if err != nil { - return address.Undef, cid.Undef, err - } - - if err := a.PaychMgr.TrackOutboundChannel(ctx, paychaddr); err != nil { - return address.Undef, cid.Undef, err - } - - return paychaddr, msg.Cid(), nil -} - -func (a *FullNodeAPI) PaychList(ctx context.Context) ([]address.Address, error) { - return a.PaychMgr.ListChannels() -} - -func (a *FullNodeAPI) PaychStatus(ctx context.Context, pch address.Address) (*api.PaychStatus, error) { - panic("nyi") -} - -func (a *FullNodeAPI) PaychClose(ctx context.Context, addr address.Address) (cid.Cid, error) { - ci, err := a.PaychMgr.GetChannelInfo(addr) - if err != nil { - return cid.Undef, err - } - - nonce, err := a.MpoolGetNonce(ctx, ci.ControlAddr) - if err != nil { - return cid.Undef, err - } - - msg := &types.Message{ - To: addr, - From: ci.ControlAddr, - Value: types.NewInt(0), - Method: actors.PCAMethods.Close, - Nonce: nonce, - - GasLimit: types.NewInt(500), - GasPrice: types.NewInt(0), - } - - smsg, err := a.WalletSignMessage(ctx, ci.ControlAddr, msg) - if err != nil { - return cid.Undef, err - } - - if err := a.MpoolPush(ctx, smsg); err != nil { - return cid.Undef, err - } - - return smsg.Cid(), nil -} - -func (a *FullNodeAPI) PaychVoucherCheckValid(ctx context.Context, ch address.Address, sv *types.SignedVoucher) error { - return a.PaychMgr.CheckVoucherValid(ctx, ch, sv) -} - -func (a *FullNodeAPI) PaychVoucherCheckSpendable(ctx context.Context, ch address.Address, sv *types.SignedVoucher, secret []byte, proof []byte) (bool, error) { - return a.PaychMgr.CheckVoucherSpendable(ctx, ch, sv, secret, proof) -} - -func (a *FullNodeAPI) PaychVoucherAdd(ctx context.Context, ch address.Address, sv *types.SignedVoucher) error { - if err := a.PaychVoucherCheckValid(ctx, ch, sv); err != nil { - return err - } - - return a.PaychMgr.AddVoucher(ctx, ch, sv) -} - -// PaychVoucherCreate creates a new signed voucher on the given payment channel -// with the given lane and amount. The value passed in is exactly the value -// that will be used to create the voucher, so if previous vouchers exist, the -// actual additional value of this voucher will only be the difference between -// the two. -func (a *FullNodeAPI) PaychVoucherCreate(ctx context.Context, pch address.Address, amt types.BigInt, lane uint64) (*types.SignedVoucher, error) { - return a.paychVoucherCreate(ctx, pch, types.SignedVoucher{Amount: amt, Lane: lane}) -} - -func (a *FullNodeAPI) paychVoucherCreate(ctx context.Context, pch address.Address, voucher types.SignedVoucher) (*types.SignedVoucher, error) { - ci, err := a.PaychMgr.GetChannelInfo(pch) - if err != nil { - return nil, err - } - - nonce, err := a.PaychMgr.NextNonceForLane(ctx, pch, voucher.Lane) - if err != nil { - return nil, err - } - - sv := &voucher - sv.Nonce = nonce - - vb, err := sv.SigningBytes() - if err != nil { - return nil, err - } - - sig, err := a.WalletSign(ctx, ci.ControlAddr, vb) - if err != nil { - return nil, err - } - - sv.Signature = sig - - if err := a.PaychMgr.AddVoucher(ctx, pch, sv); err != nil { - return nil, xerrors.Errorf("failed to persist voucher: %w", err) - } - - return sv, nil -} - -func (a *FullNodeAPI) PaychVoucherList(ctx context.Context, pch address.Address) ([]*types.SignedVoucher, error) { - return a.PaychMgr.ListVouchers(ctx, pch) -} - -func (a *FullNodeAPI) PaychVoucherSubmit(ctx context.Context, ch address.Address, sv *types.SignedVoucher) (cid.Cid, error) { - ci, err := a.PaychMgr.GetChannelInfo(ch) - if err != nil { - return cid.Undef, err - } - - nonce, err := a.MpoolGetNonce(ctx, ci.ControlAddr) - if err != nil { - return cid.Undef, err - } - - if sv.Extra != nil || len(sv.SecretPreimage) > 0 { - return cid.Undef, fmt.Errorf("cant handle more advanced payment channel stuff yet") - } - - enc, err := actors.SerializeParams(&actors.PCAUpdateChannelStateParams{ - Sv: *sv, - }) - if err != nil { - return cid.Undef, err - } - - msg := &types.Message{ - From: ci.ControlAddr, - To: ch, - Value: types.NewInt(0), - Nonce: nonce, - Method: actors.PCAMethods.UpdateChannelState, - Params: enc, - GasLimit: types.NewInt(100000), - GasPrice: types.NewInt(0), - } - - smsg, err := a.WalletSignMessage(ctx, ci.ControlAddr, msg) - if err != nil { - return cid.Undef, err - } - - if err := a.MpoolPush(ctx, smsg); err != nil { - return cid.Undef, err - } - - // TODO: should we wait for it...? - return smsg.Cid(), nil -} - -var _ api.FullNode = &FullNodeAPI{} +var _ api.FullNode = &API{} diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go new file mode 100644 index 000000000..28fb01b62 --- /dev/null +++ b/node/impl/full/chain.go @@ -0,0 +1,181 @@ +package full + +import ( + "context" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain" + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/gen" + "github.com/filecoin-project/go-lotus/chain/state" + "github.com/filecoin-project/go-lotus/chain/store" + "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/go-lotus/chain/vm" + "github.com/filecoin-project/go-lotus/lib/bufbstore" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-hamt-ipld" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "go.uber.org/fx" +) + +type ChainAPI struct { + fx.In + + Chain *store.ChainStore + PubSub *pubsub.PubSub +} + +func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) { + return a.Chain.SubHeadChanges(ctx), nil +} + +func (a *ChainAPI) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { + if err := a.Chain.AddBlock(blk.Header); err != nil { + return err + } + + b, err := blk.Serialize() + if err != nil { + return err + } + + // TODO: anything else to do here? + return a.PubSub.Publish("/fil/blocks", b) +} + +func (a *ChainAPI) ChainHead(context.Context) (*types.TipSet, error) { + return a.Chain.GetHeaviestTipSet(), nil +} + +func (a *ChainAPI) ChainGetRandomness(ctx context.Context, pts *types.TipSet) ([]byte, error) { + // TODO: this needs to look back in the chain for the right random beacon value + return []byte("foo bar random"), nil +} + +func (a *ChainAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) { + blkcid, recpt, err := a.Chain.WaitForMessage(ctx, msg) + if err != nil { + return nil, err + } + + return &api.MsgWait{ + InBlock: blkcid, + Receipt: *recpt, + }, nil +} + +func (a *ChainAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.BlockHeader, error) { + return a.Chain.GetBlock(msg) +} + +func (a *ChainAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) { + b, err := a.Chain.GetBlock(msg) + if err != nil { + return nil, err + } + + bmsgs, smsgs, err := a.Chain.MessagesForBlock(b) + if err != nil { + return nil, err + } + + return &api.BlockMessages{ + BlsMessages: bmsgs, + SecpkMessages: smsgs, + }, nil +} + +func (a *ChainAPI) ChainGetBlockReceipts(ctx context.Context, bcid cid.Cid) ([]*types.MessageReceipt, error) { + b, err := a.Chain.GetBlock(bcid) + if err != nil { + return nil, err + } + + // TODO: need to get the number of messages better than this + bm, sm, err := a.Chain.MessagesForBlock(b) + if err != nil { + return nil, err + } + + var out []*types.MessageReceipt + for i := 0; i < len(bm)+len(sm); i++ { + r, err := a.Chain.GetReceipt(b, i) + if err != nil { + return nil, err + } + + out = append(out, r) + } + + return out, nil +} + +func (a *ChainAPI) ChainCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) { + return vm.Call(ctx, a.Chain, msg, ts) +} + +func (a *ChainAPI) stateForTs(ts *types.TipSet) (*state.StateTree, error) { + if ts == nil { + ts = a.Chain.GetHeaviestTipSet() + } + + st, err := a.Chain.TipSetState(ts.Cids()) + if err != nil { + return nil, err + } + + buf := bufbstore.NewBufferedBstore(a.Chain.Blockstore()) + cst := hamt.CSTFromBstore(buf) + return state.LoadStateTree(cst, st) +} + +func (a *ChainAPI) ChainGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) { + state, err := a.stateForTs(ts) + if err != nil { + return nil, err + } + + return state.GetActor(actor) +} + +func (a *ChainAPI) ChainReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) { + state, err := a.stateForTs(ts) + if err != nil { + return nil, err + } + + blk, err := state.Store.Blocks.GetBlock(ctx, act.Head) + if err != nil { + return nil, err + } + + oif, err := vm.DumpActorState(act.Code, blk.RawData()) + if err != nil { + return nil, err + } + + return &api.ActorState{ + Balance: act.Balance, + State: oif, + }, nil +} + +// This is on ChainAPI because miner.Miner requires this, and MinerAPI requires miner.Miner +func (a *ChainAPI) MinerCreateBlock(ctx context.Context, addr address.Address, parents *types.TipSet, tickets []*types.Ticket, proof types.ElectionProof, msgs []*types.SignedMessage) (*chain.BlockMsg, error) { + fblk, err := gen.MinerCreateBlock(ctx, a.Chain, addr, parents, tickets, proof, msgs) + if err != nil { + return nil, err + } + + var out chain.BlockMsg + out.Header = fblk.Header + for _, msg := range fblk.BlsMessages { + out.BlsMessages = append(out.BlsMessages, msg.Cid()) + } + for _, msg := range fblk.SecpkMessages { + out.SecpkMessages = append(out.SecpkMessages, msg.Cid()) + } + + return &out, nil +} diff --git a/node/impl/full/client.go b/node/impl/full/client.go new file mode 100644 index 000000000..49bf4b0aa --- /dev/null +++ b/node/impl/full/client.go @@ -0,0 +1,183 @@ +package full + +import ( + "context" + "errors" + "os" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain/actors" + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/deals" + "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/go-lotus/node/modules/dtypes" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-filestore" + chunker "github.com/ipfs/go-ipfs-chunker" + files "github.com/ipfs/go-ipfs-files" + cbor "github.com/ipfs/go-ipld-cbor" + ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-unixfs/importer/balanced" + ihelper "github.com/ipfs/go-unixfs/importer/helpers" + "github.com/libp2p/go-libp2p-core/peer" + "go.uber.org/fx" +) + +type ClientAPI struct { + fx.In + + ChainAPI + WalletAPI + PaychAPI + + DealClient *deals.Client + + LocalDAG dtypes.ClientDAG + Filestore dtypes.ClientFilestore `optional:"true"` +} + +func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) { + // TODO: make this a param + self, err := a.WalletDefaultAddress(ctx) + if err != nil { + return nil, err + } + + // get miner peerID + msg := &types.Message{ + To: miner, + From: miner, + Method: actors.MAMethods.GetPeerID, + } + + r, err := a.ChainCall(ctx, msg, nil) + if err != nil { + return nil, err + } + pid, err := peer.IDFromBytes(r.Return) + if err != nil { + return nil, err + } + + vd, err := a.DealClient.VerifyParams(ctx, data) + if err != nil { + return nil, err + } + + voucherData, err := cbor.DumpObject(vd) + if err != nil { + return nil, err + } + + // setup payments + total := types.BigMul(price, types.NewInt(blocksDuration)) + + // TODO: at least ping the miner before creating paych / locking the money + paych, paychMsg, err := a.paychCreate(ctx, self, miner, total) + if err != nil { + return nil, err + } + + voucher := types.SignedVoucher{ + // TimeLock: 0, // TODO: do we want to use this somehow? + Extra: &types.ModVerifyParams{ + Actor: miner, + Method: actors.MAMethods.PaymentVerifyInclusion, + Data: voucherData, + }, + Lane: 0, + Amount: total, + MinCloseHeight: blocksDuration, // TODO: some way to start this after initial piece inclusion by actor? (also, at least add current height) + } + + sv, err := a.paychVoucherCreate(ctx, paych, voucher) + if err != nil { + return nil, err + } + + proposal := deals.ClientDealProposal{ + Data: data, + TotalPrice: total, + Duration: blocksDuration, + Payment: actors.PaymentInfo{ + PayChActor: paych, + Payer: self, + ChannelMessage: paychMsg, + Vouchers: []types.SignedVoucher{*sv}, + }, + MinerAddress: miner, + ClientAddress: self, + MinerID: pid, + } + + c, err := a.DealClient.Start(ctx, proposal, vd) + // TODO: send updated voucher with PaymentVerifySector for cheaper validation (validate the sector the miner sent us first!) + return &c, err +} + +func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, error) { + f, err := os.Open(path) + if err != nil { + return cid.Undef, err + } + stat, err := f.Stat() + if err != nil { + return cid.Undef, err + } + + file, err := files.NewReaderPathFile(path, f, stat) + if err != nil { + return cid.Undef, err + } + + bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG) + + params := ihelper.DagBuilderParams{ + Maxlinks: ihelper.DefaultLinksPerBlock, + RawLeaves: true, + CidBuilder: nil, + Dagserv: bufferedDS, + NoCopy: true, + } + + db, err := params.New(chunker.DefaultSplitter(file)) + if err != nil { + return cid.Undef, err + } + nd, err := balanced.Layout(db) + if err != nil { + return cid.Undef, err + } + + return nd.Cid(), bufferedDS.Commit() +} + +func (a *ClientAPI) ClientListImports(ctx context.Context) ([]api.Import, error) { + if a.Filestore == nil { + return nil, errors.New("listing imports is not supported with in-memory dag yet") + } + next, err := filestore.ListAll(a.Filestore, false) + if err != nil { + return nil, err + } + + // TODO: make this less very bad by tracking root cids instead of using ListAll + + out := make([]api.Import, 0) + for { + r := next() + if r == nil { + return out, nil + } + if r.Offset != 0 { + continue + } + out = append(out, api.Import{ + Status: r.Status, + Key: r.Key, + FilePath: r.FilePath, + Size: r.Size, + }) + } +} diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go new file mode 100644 index 000000000..5afc41d9b --- /dev/null +++ b/node/impl/full/mpool.go @@ -0,0 +1,41 @@ +package full + +import ( + "context" + "go.uber.org/fx" + + "github.com/filecoin-project/go-lotus/chain" + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/types" + + pubsub "github.com/libp2p/go-libp2p-pubsub" +) + +type MpoolAPI struct { + fx.In + + PubSub *pubsub.PubSub + Mpool *chain.MessagePool +} + +func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) { + // TODO: need to make sure we don't return messages that were already included in the referenced chain + // also need to accept ts == nil just fine, assume nil == chain.Head() + return a.Mpool.Pending(), nil +} + +func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) error { + msgb, err := smsg.Serialize() + if err != nil { + return err + } + if err := a.Mpool.Add(smsg); err != nil { + return err + } + + return a.PubSub.Publish("/fil/messages", msgb) +} + +func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { + return a.Mpool.GetNonce(addr) +} diff --git a/node/impl/full/paych.go b/node/impl/full/paych.go new file mode 100644 index 000000000..8922ba8d1 --- /dev/null +++ b/node/impl/full/paych.go @@ -0,0 +1,248 @@ +package full + +import ( + "context" + "fmt" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain/actors" + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/go-lotus/paych" + + "github.com/ipfs/go-cid" + "go.uber.org/fx" + "golang.org/x/xerrors" +) + +type PaychAPI struct { + fx.In + + MpoolAPI + WalletAPI + ChainAPI + + PaychMgr *paych.Manager +} + +func (a *PaychAPI) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, error) { + act, _, err := a.paychCreate(ctx, from, to, amt) + return act, err +} + +func (a *PaychAPI) paychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) { + params, aerr := actors.SerializeParams(&actors.PCAConstructorParams{To: to}) + if aerr != nil { + return address.Undef, cid.Undef, aerr + } + + nonce, err := a.MpoolGetNonce(ctx, from) + if err != nil { + return address.Undef, cid.Undef, err + } + + enc, err := actors.SerializeParams(&actors.ExecParams{ + Params: params, + Code: actors.PaymentChannelActorCodeCid, + }) + + msg := &types.Message{ + To: actors.InitActorAddress, + From: from, + Value: amt, + Nonce: nonce, + Method: actors.IAMethods.Exec, + Params: enc, + GasLimit: types.NewInt(1000), + GasPrice: types.NewInt(0), + } + + ser, err := msg.Serialize() + if err != nil { + return address.Undef, cid.Undef, err + } + + sig, err := a.WalletSign(ctx, from, ser) + if err != nil { + return address.Undef, cid.Undef, err + } + + smsg := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + + if err := a.MpoolPush(ctx, smsg); err != nil { + return address.Undef, cid.Undef, err + } + + mwait, err := a.ChainWaitMsg(ctx, smsg.Cid()) + if err != nil { + return address.Undef, cid.Undef, err + } + + if mwait.Receipt.ExitCode != 0 { + return address.Undef, cid.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode) + } + + paychaddr, err := address.NewFromBytes(mwait.Receipt.Return) + if err != nil { + return address.Undef, cid.Undef, err + } + + if err := a.PaychMgr.TrackOutboundChannel(ctx, paychaddr); err != nil { + return address.Undef, cid.Undef, err + } + + return paychaddr, msg.Cid(), nil +} + +func (a *PaychAPI) PaychList(ctx context.Context) ([]address.Address, error) { + return a.PaychMgr.ListChannels() +} + +func (a *PaychAPI) PaychStatus(ctx context.Context, pch address.Address) (*api.PaychStatus, error) { + panic("nyi") +} + +func (a *PaychAPI) PaychClose(ctx context.Context, addr address.Address) (cid.Cid, error) { + ci, err := a.PaychMgr.GetChannelInfo(addr) + if err != nil { + return cid.Undef, err + } + + nonce, err := a.MpoolGetNonce(ctx, ci.ControlAddr) + if err != nil { + return cid.Undef, err + } + + msg := &types.Message{ + To: addr, + From: ci.ControlAddr, + Value: types.NewInt(0), + Method: actors.PCAMethods.Close, + Nonce: nonce, + + GasLimit: types.NewInt(500), + GasPrice: types.NewInt(0), + } + + smsg, err := a.WalletSignMessage(ctx, ci.ControlAddr, msg) + if err != nil { + return cid.Undef, err + } + + if err := a.MpoolPush(ctx, smsg); err != nil { + return cid.Undef, err + } + + return smsg.Cid(), nil +} + +func (a *PaychAPI) PaychVoucherCheckValid(ctx context.Context, ch address.Address, sv *types.SignedVoucher) error { + return a.PaychMgr.CheckVoucherValid(ctx, ch, sv) +} + +func (a *PaychAPI) PaychVoucherCheckSpendable(ctx context.Context, ch address.Address, sv *types.SignedVoucher, secret []byte, proof []byte) (bool, error) { + return a.PaychMgr.CheckVoucherSpendable(ctx, ch, sv, secret, proof) +} + +func (a *PaychAPI) PaychVoucherAdd(ctx context.Context, ch address.Address, sv *types.SignedVoucher) error { + if err := a.PaychVoucherCheckValid(ctx, ch, sv); err != nil { + return err + } + + return a.PaychMgr.AddVoucher(ctx, ch, sv) +} + +// PaychVoucherCreate creates a new signed voucher on the given payment channel +// with the given lane and amount. The value passed in is exactly the value +// that will be used to create the voucher, so if previous vouchers exist, the +// actual additional value of this voucher will only be the difference between +// the two. +func (a *PaychAPI) PaychVoucherCreate(ctx context.Context, pch address.Address, amt types.BigInt, lane uint64) (*types.SignedVoucher, error) { + return a.paychVoucherCreate(ctx, pch, types.SignedVoucher{Amount: amt, Lane: lane}) +} + +func (a *PaychAPI) paychVoucherCreate(ctx context.Context, pch address.Address, voucher types.SignedVoucher) (*types.SignedVoucher, error) { + ci, err := a.PaychMgr.GetChannelInfo(pch) + if err != nil { + return nil, err + } + + nonce, err := a.PaychMgr.NextNonceForLane(ctx, pch, voucher.Lane) + if err != nil { + return nil, err + } + + sv := &voucher + sv.Nonce = nonce + + vb, err := sv.SigningBytes() + if err != nil { + return nil, err + } + + sig, err := a.WalletSign(ctx, ci.ControlAddr, vb) + if err != nil { + return nil, err + } + + sv.Signature = sig + + if err := a.PaychMgr.AddVoucher(ctx, pch, sv); err != nil { + return nil, xerrors.Errorf("failed to persist voucher: %w", err) + } + + return sv, nil +} + +func (a *PaychAPI) PaychVoucherList(ctx context.Context, pch address.Address) ([]*types.SignedVoucher, error) { + return a.PaychMgr.ListVouchers(ctx, pch) +} + +func (a *PaychAPI) PaychVoucherSubmit(ctx context.Context, ch address.Address, sv *types.SignedVoucher) (cid.Cid, error) { + ci, err := a.PaychMgr.GetChannelInfo(ch) + if err != nil { + return cid.Undef, err + } + + nonce, err := a.MpoolGetNonce(ctx, ci.ControlAddr) + if err != nil { + return cid.Undef, err + } + + if sv.Extra != nil || len(sv.SecretPreimage) > 0 { + return cid.Undef, fmt.Errorf("cant handle more advanced payment channel stuff yet") + } + + enc, err := actors.SerializeParams(&actors.PCAUpdateChannelStateParams{ + Sv: *sv, + }) + if err != nil { + return cid.Undef, err + } + + msg := &types.Message{ + From: ci.ControlAddr, + To: ch, + Value: types.NewInt(0), + Nonce: nonce, + Method: actors.PCAMethods.UpdateChannelState, + Params: enc, + GasLimit: types.NewInt(100000), + GasPrice: types.NewInt(0), + } + + smsg, err := a.WalletSignMessage(ctx, ci.ControlAddr, msg) + if err != nil { + return cid.Undef, err + } + + if err := a.MpoolPush(ctx, smsg); err != nil { + return cid.Undef, err + } + + // TODO: should we wait for it...? + return smsg.Cid(), nil +} diff --git a/node/impl/full/state.go b/node/impl/full/state.go new file mode 100644 index 000000000..5a557948b --- /dev/null +++ b/node/impl/full/state.go @@ -0,0 +1,139 @@ +package full + +import ( + "context" + "fmt" + "strconv" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain/actors" + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/state" + "github.com/filecoin-project/go-lotus/chain/store" + + "github.com/ipfs/go-hamt-ipld" + cbor "github.com/ipfs/go-ipld-cbor" + "go.uber.org/fx" +) + +type StateAPI struct { + fx.In + + Chain *store.ChainStore +} + +func (a *StateAPI) StateMinerSectors(ctx context.Context, addr address.Address) ([]*api.SectorInfo, error) { + ts := a.Chain.GetHeaviestTipSet() + + stc, err := a.Chain.TipSetState(ts.Cids()) + if err != nil { + return nil, err + } + + cst := hamt.CSTFromBstore(a.Chain.Blockstore()) + + st, err := state.LoadStateTree(cst, stc) + if err != nil { + return nil, err + } + + act, err := st.GetActor(addr) + if err != nil { + return nil, err + } + + var minerState actors.StorageMinerActorState + if err := cst.Get(ctx, act.Head, &minerState); err != nil { + return nil, err + } + + nd, err := hamt.LoadNode(ctx, cst, minerState.Sectors) + if err != nil { + return nil, err + } + + var sinfos []*api.SectorInfo + // Note to self: the hamt isnt a great data structure to use here... need to implement the sector set + err = nd.ForEach(ctx, func(k string, val interface{}) error { + sid, err := strconv.ParseUint(k, 10, 64) + if err != nil { + return err + } + + bval, ok := val.([]byte) + if !ok { + return fmt.Errorf("expected to get bytes in sector set hamt") + } + + var comms [][]byte + if err := cbor.DecodeInto(bval, &comms); err != nil { + return err + } + + sinfos = append(sinfos, &api.SectorInfo{ + SectorID: sid, + CommR: comms[0], + CommD: comms[1], + }) + return nil + }) + return sinfos, nil +} + +func (a *StateAPI) StateMinerProvingSet(ctx context.Context, addr address.Address) ([]*api.SectorInfo, error) { + ts := a.Chain.GetHeaviestTipSet() + + stc, err := a.Chain.TipSetState(ts.Cids()) + if err != nil { + return nil, err + } + + cst := hamt.CSTFromBstore(a.Chain.Blockstore()) + + st, err := state.LoadStateTree(cst, stc) + if err != nil { + return nil, err + } + + act, err := st.GetActor(addr) + if err != nil { + return nil, err + } + + var minerState actors.StorageMinerActorState + if err := cst.Get(ctx, act.Head, &minerState); err != nil { + return nil, err + } + + nd, err := hamt.LoadNode(ctx, cst, minerState.ProvingSet) + if err != nil { + return nil, err + } + + var sinfos []*api.SectorInfo + // Note to self: the hamt isnt a great data structure to use here... need to implement the sector set + err = nd.ForEach(ctx, func(k string, val interface{}) error { + sid, err := strconv.ParseUint(k, 10, 64) + if err != nil { + return err + } + + bval, ok := val.([]byte) + if !ok { + return fmt.Errorf("expected to get bytes in sector set hamt") + } + + var comms [][]byte + if err := cbor.DecodeInto(bval, &comms); err != nil { + return err + } + + sinfos = append(sinfos, &api.SectorInfo{ + SectorID: sid, + CommR: comms[0], + CommD: comms[1], + }) + return nil + }) + return sinfos, nil +} diff --git a/node/impl/full/wallet.go b/node/impl/full/wallet.go new file mode 100644 index 000000000..979740301 --- /dev/null +++ b/node/impl/full/wallet.go @@ -0,0 +1,70 @@ +package full + +import ( + "context" + + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/store" + "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/go-lotus/chain/wallet" + + "go.uber.org/fx" + "golang.org/x/xerrors" +) + +type WalletAPI struct { + fx.In + + Chain *store.ChainStore + Wallet *wallet.Wallet +} + +func (a *WalletAPI) WalletNew(ctx context.Context, typ string) (address.Address, error) { + return a.Wallet.GenerateKey(typ) +} + +func (a *WalletAPI) WalletHas(ctx context.Context, addr address.Address) (bool, error) { + return a.Wallet.HasKey(addr) +} + +func (a *WalletAPI) WalletList(ctx context.Context) ([]address.Address, error) { + return a.Wallet.ListAddrs() +} + +func (a *WalletAPI) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) { + return a.Chain.GetBalance(addr) +} + +func (a *WalletAPI) WalletSign(ctx context.Context, k address.Address, msg []byte) (*types.Signature, error) { + return a.Wallet.Sign(ctx, k, msg) +} + +func (a *WalletAPI) WalletSignMessage(ctx context.Context, k address.Address, msg *types.Message) (*types.SignedMessage, error) { + msgbytes, err := msg.Serialize() + if err != nil { + return nil, err + } + + sig, err := a.WalletSign(ctx, k, msgbytes) + if err != nil { + return nil, xerrors.Errorf("failed to sign message: %w", err) + } + + return &types.SignedMessage{ + Message: *msg, + Signature: *sig, + }, nil +} + +func (a *WalletAPI) WalletDefaultAddress(ctx context.Context) (address.Address, error) { + addrs, err := a.Wallet.ListAddrs() + if err != nil { + return address.Undef, err + } + if len(addrs) == 0 { + return address.Undef, xerrors.New("no addresses in wallet") + } + + // TODO: store a default address in the config or 'wallet' portion of the repo + return addrs[0], nil +}