Merge pull request #196 from filecoin-project/feat/retrieval-payments

Retrieval Payments
This commit is contained in:
Łukasz Magiera 2019-09-17 10:50:10 +02:00 committed by GitHub
commit fc5c455cf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 756 additions and 349 deletions

View File

@ -58,6 +58,7 @@ type FullNode interface {
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
MpoolPush(context.Context, *types.SignedMessage) error
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) // get nonce, sign, push
MpoolGetNonce(context.Context, address.Address) (uint64, error)
// FullNodeStruct
@ -104,20 +105,22 @@ type FullNode interface {
StateMinerProvingSet(context.Context, address.Address) ([]*SectorInfo, error)
StateMinerPower(context.Context, address.Address, *types.TipSet) (MinerPower, error)
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error)
// if tipset is nil, we'll use heaviest
StateCall(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error)
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*ActorState, error)
PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error)
PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error)
PaychList(context.Context) ([]address.Address, error)
PaychStatus(context.Context, address.Address) (*PaychStatus, error)
PaychClose(context.Context, address.Address) (cid.Cid, error)
PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error)
PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error)
PaychVoucherCheckValid(context.Context, address.Address, *types.SignedVoucher) error
PaychVoucherCheckSpendable(context.Context, address.Address, *types.SignedVoucher, []byte, []byte) (bool, error)
PaychVoucherCreate(context.Context, address.Address, types.BigInt, uint64) (*types.SignedVoucher, error)
PaychVoucherAdd(context.Context, address.Address, *types.SignedVoucher, []byte) error
PaychVoucherAdd(context.Context, address.Address, *types.SignedVoucher, []byte, types.BigInt) (types.BigInt, error)
PaychVoucherList(context.Context, address.Address) ([]*types.SignedVoucher, error)
PaychVoucherSubmit(context.Context, address.Address, *types.SignedVoucher) (cid.Cid, error)
}
@ -246,8 +249,10 @@ type QueryOffer struct {
func (o *QueryOffer) Order() RetrievalOrder {
return RetrievalOrder{
Root: o.Root,
Size: o.Size,
Root: o.Root,
Size: o.Size,
Total: o.MinPrice,
Miner: o.Miner,
MinerPeerID: o.MinerPeerID,
}
@ -258,7 +263,9 @@ type RetrievalOrder struct {
Root cid.Cid
Size uint64
// TODO: support offset
Total types.BigInt
Client address.Address
Miner address.Address
MinerPeerID peer.ID
}

View File

@ -48,8 +48,9 @@ type FullNodeStruct struct {
ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error) `perm:"read"`
ChainGetBlockReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"`
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
MpoolPushMessage func(context.Context, *types.Message) (*types.SignedMessage, error) `perm:"sign"`
MinerRegister func(context.Context, address.Address) error `perm:"admin"`
MinerUnregister func(context.Context, address.Address) error `perm:"admin"`
@ -78,19 +79,21 @@ type FullNodeStruct struct {
StateMinerProvingSet func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
StateMinerPower func(context.Context, address.Address, *types.TipSet) (MinerPower, error) `perm:"read"`
StateMinerWorker func(context.Context, address.Address, *types.TipSet) (address.Address, error) `perm:"read"`
StateMinerPeerID func(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) `perm:"read"`
StateCall func(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"`
StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"`
StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"`
PaychCreate func(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) `perm:"sign"`
PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) `perm:"sign"`
PaychList func(context.Context) ([]address.Address, error) `perm:"read"`
PaychStatus func(context.Context, address.Address) (*PaychStatus, error) `perm:"read"`
PaychClose func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"`
PaychAllocateLane func(context.Context, address.Address) (uint64, error) `perm:"sign"`
PaychNewPayment func(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error) `perm:"sign"`
PaychVoucherCheck func(context.Context, *types.SignedVoucher) error `perm:"read"`
PaychVoucherCheckValid func(context.Context, address.Address, *types.SignedVoucher) error `perm:"read"`
PaychVoucherCheckSpendable func(context.Context, address.Address, *types.SignedVoucher, []byte, []byte) (bool, error) `perm:"read"`
PaychVoucherAdd func(context.Context, address.Address, *types.SignedVoucher, []byte) error `perm:"write"`
PaychVoucherAdd func(context.Context, address.Address, *types.SignedVoucher, []byte, types.BigInt) (types.BigInt, error) `perm:"write"`
PaychVoucherCreate func(context.Context, address.Address, types.BigInt, uint64) (*types.SignedVoucher, error) `perm:"sign"`
PaychVoucherList func(context.Context, address.Address) ([]*types.SignedVoucher, error) `perm:"write"`
PaychVoucherSubmit func(context.Context, address.Address, *types.SignedVoucher) (cid.Cid, error) `perm:"sign"`
@ -191,6 +194,10 @@ func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessag
return c.Internal.MpoolPush(ctx, smsg)
}
func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
return c.Internal.MpoolPushMessage(ctx, msg)
}
func (c *FullNodeStruct) MinerRegister(ctx context.Context, addr address.Address) error {
return c.Internal.MinerRegister(ctx, addr)
}
@ -287,6 +294,10 @@ func (c *FullNodeStruct) StateMinerWorker(ctx context.Context, m address.Address
return c.Internal.StateMinerWorker(ctx, m, ts)
}
func (c *FullNodeStruct) StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) {
return c.Internal.StateMinerPeerID(ctx, m, ts)
}
func (c *FullNodeStruct) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) {
return c.Internal.StateCall(ctx, msg, ts)
}
@ -299,8 +310,8 @@ func (c *FullNodeStruct) StateReadState(ctx context.Context, act *types.Actor, t
return c.Internal.StateReadState(ctx, act, ts)
}
func (c *FullNodeStruct) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) {
return c.Internal.PaychCreate(ctx, from, to, amt)
func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) {
return c.Internal.PaychGet(ctx, from, to, ensureFunds)
}
func (c *FullNodeStruct) PaychList(ctx context.Context) ([]address.Address, error) {
@ -319,8 +330,8 @@ func (c *FullNodeStruct) PaychVoucherCheckSpendable(ctx context.Context, addr ad
return c.Internal.PaychVoucherCheckSpendable(ctx, addr, sv, secret, proof)
}
func (c *FullNodeStruct) PaychVoucherAdd(ctx context.Context, addr address.Address, sv *types.SignedVoucher, proof []byte) error {
return c.Internal.PaychVoucherAdd(ctx, addr, sv, proof)
func (c *FullNodeStruct) PaychVoucherAdd(ctx context.Context, addr address.Address, sv *types.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error) {
return c.Internal.PaychVoucherAdd(ctx, addr, sv, proof, minDelta)
}
func (c *FullNodeStruct) PaychVoucherCreate(ctx context.Context, pch address.Address, amt types.BigInt, lane uint64) (*types.SignedVoucher, error) {
@ -335,6 +346,10 @@ func (c *FullNodeStruct) PaychClose(ctx context.Context, a address.Address) (cid
return c.Internal.PaychClose(ctx, a)
}
func (c *FullNodeStruct) PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error) {
return c.Internal.PaychAllocateLane(ctx, ch)
}
func (c *FullNodeStruct) PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error) {
return c.Internal.PaychNewPayment(ctx, from, to, amount, extra, tl, minClose)
}

View File

@ -18,10 +18,12 @@ const (
DealSealing
DealComplete
// Client specific
// Internal
DealError // deal failed with an unexpected error
DealExpired
DealNoUpdate = DealUnknown
)
// TODO: check if this exists anywhere else

View File

@ -32,6 +32,7 @@ func init() {
cbor.RegisterCborType(types.ModVerifyParams{})
cbor.RegisterCborType(types.Signature{})
cbor.RegisterCborType(actors.PaymentInfo{})
cbor.RegisterCborType(api.PaymentInfo{})
cbor.RegisterCborType(actors.InclusionProof{})
}

View File

@ -17,6 +17,7 @@ import (
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/storage/commitment"
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
)
@ -45,6 +46,7 @@ type Handler struct {
askLk sync.Mutex
secst *sectorblocks.SectorBlocks
commt *commitment.Tracker
full api.FullNode
// TODO: Use a custom protocol or graphsync in the future
@ -71,7 +73,7 @@ type minerDealUpdate struct {
mut func(*MinerDeal)
}
func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) {
func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, commt *commitment.Tracker, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) {
addr, err := ds.Get(datastore.NewKey("miner-address"))
if err != nil {
return nil, err
@ -83,6 +85,7 @@ func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtyp
h := &Handler{
secst: secst,
commt: commt,
dag: dag,
full: fullNode,
@ -186,6 +189,8 @@ func (h *Handler) onUpdated(ctx context.Context, update minerDealUpdate) {
h.handle(ctx, deal, h.staged, api.DealSealing)
case api.DealSealing:
h.handle(ctx, deal, h.sealing, api.DealComplete)
case api.DealComplete:
h.handle(ctx, deal, h.complete, api.DealNoUpdate)
}
}

View File

@ -22,6 +22,11 @@ type minerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal
func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb minerHandlerFunc, next api.DealState) {
go func() {
mut, err := cb(ctx, deal)
if err == nil && next == api.DealNoUpdate {
return
}
select {
case h.updated <- minerDealUpdate{
newState: next,
@ -119,7 +124,8 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal),
}
for i, voucher := range deal.Proposal.Payment.Vouchers {
if err := h.full.PaychVoucherAdd(ctx, deal.Proposal.Payment.PayChActor, voucher, nil); err != nil {
// TODO: Set correct minAmount
if _, err := h.full.PaychVoucherAdd(ctx, deal.Proposal.Payment.PayChActor, voucher, nil, types.NewInt(0)); err != nil {
return nil, xerrors.Errorf("consuming payment voucher %d: %w", i, err)
}
}
@ -190,8 +196,8 @@ func getInclusionProof(ref string, status sectorbuilder.SectorSealingStatus) (Pi
return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID)
}
func (h *Handler) waitSealed(deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) {
status, err := h.secst.WaitSeal(context.TODO(), deal.SectorID)
func (h *Handler) waitSealed(ctx context.Context, deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) {
status, err := h.secst.WaitSeal(ctx, deal.SectorID)
if err != nil {
return sectorbuilder.SectorSealingStatus{}, err
}
@ -212,7 +218,7 @@ func (h *Handler) waitSealed(deal MinerDeal) (sectorbuilder.SectorSealingStatus,
}
func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
status, err := h.waitSealed(deal)
status, err := h.waitSealed(ctx, deal)
if err != nil {
return nil, err
}
@ -235,7 +241,8 @@ func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
// store proofs for channels
for i, v := range deal.Proposal.Payment.Vouchers {
if v.Extra.Method == actors.MAMethods.PaymentVerifyInclusion {
if err := h.full.PaychVoucherAdd(ctx, deal.Proposal.Payment.PayChActor, v, proofB); err != nil {
// TODO: Set correct minAmount
if _, err := h.full.PaychVoucherAdd(ctx, deal.Proposal.Payment.PayChActor, v, proofB, types.NewInt(0)); err != nil {
return nil, xerrors.Errorf("storing payment voucher %d proof: %w", i, err)
}
}
@ -253,3 +260,22 @@ func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
return nil, nil
}
func (h *Handler) complete(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
mcid, err := h.commt.WaitCommit(ctx, deal.Proposal.MinerAddress, deal.SectorID)
if err != nil {
log.Warnf("Waiting for sector commitment message: %s", err)
}
err = h.sendSignedResponse(StorageDealResponse{
State: api.DealComplete,
Proposal: deal.ProposalCid,
SectorCommitMessage: &mcid,
})
if err != nil {
log.Warnf("Sending deal response failed: %s", err)
}
return nil, nil
}

View File

@ -2,6 +2,7 @@ package chain
import (
"encoding/base64"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"sync"
"github.com/filecoin-project/go-lotus/chain/address"
@ -16,6 +17,8 @@ type MessagePool struct {
pending map[address.Address]*msgSet
sm *stmgr.StateManager
ps *pubsub.PubSub
}
type msgSet struct {
@ -36,20 +39,38 @@ func (ms *msgSet) add(m *types.SignedMessage) {
ms.msgs[m.Message.Nonce] = m
}
func NewMessagePool(sm *stmgr.StateManager) *MessagePool {
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
mp := &MessagePool{
pending: make(map[address.Address]*msgSet),
sm: sm,
ps: ps,
}
sm.ChainStore().SubscribeHeadChanges(mp.HeadChange)
return mp
}
func (mp *MessagePool) Push(m *types.SignedMessage) error {
msgb, err := m.Serialize()
if err != nil {
return err
}
if err := mp.Add(m); err != nil {
return err
}
return mp.ps.Publish("/fil/messages", msgb)
}
func (mp *MessagePool) Add(m *types.SignedMessage) error {
mp.lk.Lock()
defer mp.lk.Unlock()
return mp.addLocked(m)
}
func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
data, err := m.Message.Serialize()
if err != nil {
return err
@ -79,6 +100,10 @@ func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
mp.lk.Lock()
defer mp.lk.Unlock()
return mp.getNonceLocked(addr)
}
func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
mset, ok := mp.pending[addr]
if ok {
return mset.startNonce + uint64(len(mset.msgs)), nil
@ -92,6 +117,32 @@ func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
return act.Nonce, nil
}
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
mp.lk.Lock()
defer mp.lk.Unlock()
nonce, err := mp.getNonceLocked(addr)
if err != nil {
return nil, err
}
msg, err := cb(nonce)
if err != nil {
return nil, err
}
msgb, err := msg.Serialize()
if err != nil {
return nil, err
}
if err := mp.addLocked(msg); err != nil {
return nil, err
}
return msg, mp.ps.Publish("/fil/messages", msgb)
}
func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
mp.lk.Lock()
defer mp.lk.Unlock()

View File

@ -2,6 +2,7 @@ package stmgr
import (
"context"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
@ -104,3 +105,20 @@ func GetPower(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr add
return mpow, tpow, nil
}
func GetMinerPeerID(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (peer.ID, error) {
recp, err := sm.Call(ctx, &types.Message{
To: maddr,
From: maddr,
Method: actors.MAMethods.GetPeerID,
}, ts)
if err != nil {
return "", xerrors.Errorf("callRaw failed: %w", err)
}
if recp.ExitCode != 0 {
return "", xerrors.Errorf("getting miner peer ID failed (exit code %d)", recp.ExitCode)
}
return peer.IDFromBytes(recp.Return)
}

View File

@ -165,23 +165,39 @@ var clientFindCmd = &cli.Command{
var clientRetrieveCmd = &cli.Command{
Name: "retrieve",
Usage: "retrieve data from network",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "address",
Usage: "address to use for transactions",
},
},
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 2 {
fmt.Println("Usage: retrieve [CID] [outfile]")
return nil
}
file, err := cid.Parse(cctx.Args().First())
if err != nil {
return err
}
api, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
ctx := ReqContext(cctx)
var payer address.Address
if cctx.String("address") != "" {
payer, err = address.NewFromString(cctx.String("address"))
} else {
payer, err = api.WalletDefaultAddress(ctx)
}
if err != nil {
return err
}
file, err := cid.Parse(cctx.Args().Get(0))
if err != nil {
return err
}
// Check if we already have this data locally
/*has, err := api.ClientHasLocal(ctx, file)
@ -202,6 +218,8 @@ var clientRetrieveCmd = &cli.Command{
// TODO: parse offer strings from `client find`, make this smarter
order := offers[0].Order()
order.Client = payer
err = api.ClientRetrieve(ctx, order, cctx.Args().Get(1))
if err == nil {
fmt.Println("Success")

View File

@ -12,18 +12,18 @@ var paychCmd = &cli.Command{
Name: "paych",
Usage: "Manage payment channels",
Subcommands: []*cli.Command{
paychCreateCmd,
paychGetCmd,
paychListCmd,
paychVoucherCmd,
},
}
var paychCreateCmd = &cli.Command{
Name: "create",
Usage: "Create a new payment channel",
var paychGetCmd = &cli.Command{
Name: "get",
Usage: "Create a new payment channel or get existing one",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 3 {
return fmt.Errorf("must pass three arguments: <from> <to> <amount>")
return fmt.Errorf("must pass three arguments: <from> <to> <available funds>")
}
from, err := address.NewFromString(cctx.Args().Get(0))
@ -48,7 +48,7 @@ var paychCreateCmd = &cli.Command{
ctx := ReqContext(cctx)
info, err := api.PaychCreate(ctx, from, to, amt)
info, err := api.PaychGet(ctx, from, to, amt)
if err != nil {
return err
}
@ -203,7 +203,7 @@ var paychVoucherAddCmd = &cli.Command{
ctx := ReqContext(cctx)
// TODO: allow passing proof bytes
if err := api.PaychVoucherAdd(ctx, ch, sv, nil); err != nil {
if _, err := api.PaychVoucherAdd(ctx, ch, sv, nil, types.NewInt(0)); err != nil {
return err
}

View File

@ -2,9 +2,8 @@ package cli
import (
"fmt"
"github.com/filecoin-project/go-lotus/chain/address"
types "github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/types"
"gopkg.in/urfave/cli.v2"
)
@ -56,39 +55,19 @@ var sendCmd = &cli.Command{
fromAddr = addr
}
nonce, err := api.MpoolGetNonce(ctx, fromAddr)
if err != nil {
return err
}
msg := &types.Message{
From: fromAddr,
To: toAddr,
Value: val,
Nonce: nonce,
GasLimit: types.NewInt(1000),
GasPrice: types.NewInt(0),
}
sermsg, err := msg.Serialize()
_, err = api.MpoolPushMessage(ctx, msg)
if err != nil {
return err
}
sig, err := api.WalletSign(ctx, fromAddr, sermsg)
if err != nil {
return err
}
smsg := &types.SignedMessage{
Message: *msg,
Signature: *sig,
}
if err := api.MpoolPush(ctx, smsg); err != nil {
return err
}
return nil
},
}

View File

@ -166,31 +166,21 @@ func configureStorageMiner(ctx context.Context, api api.FullNode, addr address.A
return err
}
nonce, err := api.MpoolGetNonce(ctx, waddr)
if err != nil {
return err
}
msg := &types.Message{
To: addr,
From: waddr,
Method: actors.MAMethods.UpdatePeerID,
Params: enc,
Nonce: nonce,
Value: types.NewInt(0),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(1000000),
}
smsg, err := api.WalletSignMessage(ctx, waddr, msg)
smsg, err := api.MpoolPushMessage(ctx, msg)
if err != nil {
return err
}
if err := api.MpoolPush(ctx, smsg); err != nil {
return err
}
log.Info("Waiting for message: ", smsg.Cid())
ret, err := api.ChainWaitMsg(ctx, smsg.Cid())
if err != nil {
@ -212,11 +202,6 @@ func createStorageMiner(ctx context.Context, api api.FullNode, peerid peer.ID) (
return address.Undef, err
}
nonce, err := api.MpoolGetNonce(ctx, defOwner)
if err != nil {
return address.Undef, err
}
k, err := api.WalletNew(ctx, types.KTBLS)
if err != nil {
return address.Undef, err
@ -234,10 +219,9 @@ func createStorageMiner(ctx context.Context, api api.FullNode, peerid peer.ID) (
return address.Undef, err
}
createStorageMinerMsg := types.Message{
createStorageMinerMsg := &types.Message{
To: actors.StorageMarketAddress,
From: defOwner,
Nonce: nonce,
Value: collateral,
Method: actors.SMAMethods.CreateStorageMiner,
@ -247,30 +231,12 @@ func createStorageMiner(ctx context.Context, api api.FullNode, peerid peer.ID) (
GasPrice: types.NewInt(0),
}
unsigned, err := createStorageMinerMsg.Serialize()
if err != nil {
return address.Undef, err
}
log.Info("Signing StorageMarket.CreateStorageMiner")
sig, err := api.WalletSign(ctx, defOwner, unsigned)
if err != nil {
return address.Undef, err
}
signed := &types.SignedMessage{
Message: createStorageMinerMsg,
Signature: *sig,
}
log.Infof("Pushing %s to Mpool", signed.Cid())
err = api.MpoolPush(ctx, signed)
signed, err := api.MpoolPushMessage(ctx, createStorageMinerMsg)
if err != nil {
return address.Undef, err
}
log.Infof("Pushed StorageMarket.CreateStorageMiner, %s to Mpool", signed.Cid())
log.Infof("Waiting for confirmation")
mw, err := api.ChainWaitMsg(ctx, signed.Cid())

2
go.mod
View File

@ -85,6 +85,8 @@ require (
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect
)
replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v1.18.0
replace github.com/filecoin-project/go-bls-sigs => ./extern/go-bls-sigs
replace github.com/filecoin-project/go-sectorbuilder => ./extern/go-sectorbuilder

25
go.sum
View File

@ -9,7 +9,7 @@ github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOv
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/OpenPeeDeeP/depguard v0.0.0-20180806142446-a69c782687b2/go.mod h1:7/4sitnI9YlQgTLLk734QlzXT8DuHVnAyztLplQjk+o=
github.com/OpenPeeDeeP/depguard v1.0.0/go.mod h1:7/4sitnI9YlQgTLLk734QlzXT8DuHVnAyztLplQjk+o=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
@ -78,7 +78,7 @@ github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 h1:EzDjxMg43q1tA2c0MV3tNbaontnHLplHyFF
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1/go.mod h1:0eHX/BVySxPc6SE2mZRoppGq7qcEagxdmQnA3dzork8=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
github.com/go-critic/go-critic v0.0.0-20181204210945-1df300866540/go.mod h1:+sE8vrLDS2M0pZkBk0wy6+nLdKexVDrl/jBqQOTDThA=
github.com/go-critic/go-critic v0.3.5-0.20190526074819-1df300866540/go.mod h1:+sE8vrLDS2M0pZkBk0wy6+nLdKexVDrl/jBqQOTDThA=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-lintpack/lintpack v0.5.2/go.mod h1:NwZuYi2nUHho8XEIZ6SIxihrnPoqBTDqfpXvXAN0sXM=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
@ -118,16 +118,16 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2/go.mod h1:k9Qvh+8juN+UKMCS/3jFtGICgW8O96FVaZsaxdzDkR4=
github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a/go.mod h1:ryS0uhF+x9jgbj/N71xsEqODy9BN81/GonCZiOzirOk=
github.com/golangci/errcheck v0.0.0-20181003203344-ef45e06d44b6/go.mod h1:DbHgvLiFKX1Sh2T1w8Q/h4NAI8MHIpzCdnBUDTXU3I0=
github.com/golangci/errcheck v0.0.0-20181223084120-ef45e06d44b6/go.mod h1:DbHgvLiFKX1Sh2T1w8Q/h4NAI8MHIpzCdnBUDTXU3I0=
github.com/golangci/go-misc v0.0.0-20180628070357-927a3d87b613/go.mod h1:SyvUF2NxV+sN8upjjeVYr5W7tyxaT1JVtvhKhOn2ii8=
github.com/golangci/go-tools v0.0.0-20180109140146-af6baa5dc196/go.mod h1:unzUULGw35sjyOYjUt0jMTXqHlZPpPc6e+xfO4cd6mM=
github.com/golangci/go-tools v0.0.0-20190318055746-e32c54105b7c/go.mod h1:unzUULGw35sjyOYjUt0jMTXqHlZPpPc6e+xfO4cd6mM=
github.com/golangci/goconst v0.0.0-20180610141641-041c5f2b40f3/go.mod h1:JXrF4TWy4tXYn62/9x8Wm/K/dm06p8tCKwFRDPZG/1o=
github.com/golangci/gocyclo v0.0.0-20180528134321-2becd97e67ee/go.mod h1:ozx7R9SIwqmqf5pRP90DhR2Oay2UIjGuKheCBCNwAYU=
github.com/golangci/gofmt v0.0.0-20181105071733-0b8337e80d98/go.mod h1:9qCChq59u/eW8im404Q2WWTrnBUQKjpNYKMbU4M7EFU=
github.com/golangci/golangci-lint v1.17.1/go.mod h1:+5sJSl2h3aly+fpmL2meSP8CaSKua2E4Twi9LPy7b1g=
github.com/golangci/gosec v0.0.0-20180901114220-66fb7fc33547/go.mod h1:0qUabqiIQgfmlAmulqxyiGkkyF6/tOGSnY2cnPVwrzU=
github.com/golangci/ineffassign v0.0.0-20180808204949-42439a7714cc/go.mod h1:e5tpTHCfVze+7EpLEozzMB3eafxo2KT5veNg1k6byQU=
github.com/golangci/lint-1 v0.0.0-20180610141402-ee948d087217/go.mod h1:66R6K6P6VWk9I95jvqGxkqJxVWGFy9XlDwLwVz1RCFg=
github.com/golangci/gofmt v0.0.0-20181222123516-0b8337e80d98/go.mod h1:9qCChq59u/eW8im404Q2WWTrnBUQKjpNYKMbU4M7EFU=
github.com/golangci/golangci-lint v1.18.0/go.mod h1:kaqo8l0OZKYPtjNmG4z4HrWLgcYNIJ9B9q3LWri9uLg=
github.com/golangci/gosec v0.0.0-20190211064107-66fb7fc33547/go.mod h1:0qUabqiIQgfmlAmulqxyiGkkyF6/tOGSnY2cnPVwrzU=
github.com/golangci/ineffassign v0.0.0-20190609212857-42439a7714cc/go.mod h1:e5tpTHCfVze+7EpLEozzMB3eafxo2KT5veNg1k6byQU=
github.com/golangci/lint-1 v0.0.0-20190420132249-ee948d087217/go.mod h1:66R6K6P6VWk9I95jvqGxkqJxVWGFy9XlDwLwVz1RCFg=
github.com/golangci/maligned v0.0.0-20180506175553-b1d89398deca/go.mod h1:tvlJhZqDe4LMs4ZHD0oMUlt9G2LWuDGoisJTBzLMV9o=
github.com/golangci/misspell v0.0.0-20180809174111-950f5d19e770/go.mod h1:dEbvlSfYbMQDtrpRMQU675gSDLDNa8sCPPChZ7PhiVA=
github.com/golangci/prealloc v0.0.0-20180630174525-215b22d4de21/go.mod h1:tf5+bzsHdTM0bsB7+8mt0GUMvjCgwLpTapNZHU8AajI=
@ -586,8 +586,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/timakin/bodyclose v0.0.0-20190407043127-4a873e97b2bb/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk=
github.com/timakin/bodyclose v0.0.0-20190721030226-87058b9bfcec/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ultraware/funlen v0.0.1/go.mod h1:Dp4UiAus7Wdb9KUZsYWZEWiRzGuM2kXM1lPbfaF6xhA=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
@ -682,7 +683,6 @@ golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190313220215-9f648a60d977/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@ -752,6 +752,7 @@ golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBn
golang.org/x/tools v0.0.0-20190521203540-521d6ed310dd/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190813142322-97f12d73768f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190909030654-5b82db07426d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -801,5 +802,5 @@ launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbc
launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM=
mvdan.cc/interfacer v0.0.0-20180901003855-c20040233aed/go.mod h1:Xkxe497xwlCKkIaQYRfC7CSLworTXY9RMqwhhCm+8Nc=
mvdan.cc/lint v0.0.0-20170908181259-adc824a0674b/go.mod h1:2odslEg/xrtNQqCYg2/jCoyKnw3vv5biOc3JnIcYfL4=
mvdan.cc/unparam v0.0.0-20190124213536-fbb59629db34/go.mod h1:H6SUd1XjIs+qQCyskXg5OFSrilMRUkD8ePJpHKDPaeY=
mvdan.cc/unparam v0.0.0-20190209190245-fbb59629db34/go.mod h1:H6SUd1XjIs+qQCyskXg5OFSrilMRUkD8ePJpHKDPaeY=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=

View File

@ -115,10 +115,10 @@ func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rp
h.handle(ctx, req, wf, rpcError, func(bool) {}, nil)
}
func doCall(f reflect.Value, params []reflect.Value) (out []reflect.Value, err error) {
func doCall(methodName string, f reflect.Value, params []reflect.Value) (out []reflect.Value, err error) {
defer func() {
if i := recover(); i != nil {
err = xerrors.Errorf("panic in rpc method: %s", i)
err = xerrors.Errorf("panic in rpc method '%s': %s", methodName, i)
log.Error(err)
}
}()
@ -193,7 +193,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer
///////////////////
callResult, err := doCall(handler.handlerFunc, callParams)
callResult, err := doCall(req.Method, handler.handlerFunc, callParams)
if err != nil {
rpcError(w, &req, 0, xerrors.Errorf("fatal error calling '%s': %w", req.Method, err))
return
@ -212,7 +212,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer
if handler.errOut != -1 {
err := callResult[handler.errOut].Interface()
if err != nil {
log.Warnf("error in RPC call: %s", err)
log.Warnf("error in RPC call to '%s': %s", req.Method, err)
resp.Error = &respError{
Code: 1,
Message: err.(error).Error(),

View File

@ -36,6 +36,7 @@ class Block extends React.Component {
<div>
<Address client={this.props.conn} addr={m.From} mountWindow={this.props.mountWindow}/><b>&nbsp;=>&nbsp;</b>
<Address client={this.props.conn} addr={m.To} mountWindow={this.props.mountWindow} transfer={m.Value} method={m.Method}/>
<span>&nbsp;N{m.Nonce}</span>
<span>&nbsp;{m.receipt.GasUsed}Gas</span>
{m.receipt.ExitCode !== 0 ? <span>&nbsp;<b>EXIT:{m.receipt.ExitCode}</b></span> : <span/>}
</div>
@ -56,7 +57,7 @@ class Block extends React.Component {
)
}
return (<Cristal initialSize={{width: 700, height: 200}} onClose={this.props.onClose} title={`Block ${this.props.cid['/']}`}>
return (<Cristal className="CristalScroll" initialSize={{width: 700, height: 400}} onClose={this.props.onClose} title={`Block ${this.props.cid['/']}`}>
{content}
</Cristal>)
}

View File

@ -49,6 +49,23 @@ class Client extends React.Component {
console.log("deal cid: ", dealcid)
}
retrieve = (deal) => async () => {
console.log(deal)
let client = await this.props.client.call('Filecoin.WalletDefaultAddress', [])
let order = {
Root: deal.PieceRef,
Size: deal.Size,
// TODO: support offset
Total: String(deal.Size * 2),
Client: client,
Miner: deal.Miner
}
await this.props.client.call('Filecoin.ClientRetrieve', [order, '/dev/null'])
}
render() {
let ppb = Math.round(this.state.total / this.state.blocks * 100) / 100
let ppmbb = Math.round(ppb / (this.state.kbs / 1000) * 100) / 100
@ -67,6 +84,7 @@ class Client extends React.Component {
let deals = this.state.deals.map((deal, i) => <div key={i}>
<ul>
<li>{i}. Proposal: {deal.ProposalCid['/'].substr(0, 18)}... <Address nobalance={true} client={this.props.client} addr={deal.Miner} mountWindow={this.props.mountWindow}/>: <b>{dealStates[deal.State]}</b>
{dealStates[deal.State] === 'Complete' ? <span>&nbsp;<a href="#" onClick={this.retrieve(deal)}>[Retrieve]</a></span> : <span/> }
<ul>
<li>Data: {deal.PieceRef['/']}, <b>{deal.Size}</b>B; Duration: <b>{deal.Duration}</b>Blocks</li>
<li>Total: <b>{deal.TotalPrice}</b>FIL; Per Block: <b>{Math.round(deal.TotalPrice / deal.Duration * 100) / 100}</b>FIL; PerMbyteByteBlock: <b>{Math.round(deal.TotalPrice / deal.Duration / (deal.Size / 1000000) * 100) / 100}</b>FIL</li>

View File

@ -132,7 +132,7 @@ class FullNode extends React.Component {
extra = <span>Verif: &lt;<b><Address nobalance={true} client={this.props.client} addr={voucher.Extra.Actor} method={voucher.Extra.Method} mountWindow={this.props.mountWindow}/></b>&gt;</span>
}
return <div key={voucher.Nonce} className="FullNode-voucher">
return <div key={`${addr} ${voucher.Lane} ${voucher.Nonce}`} className="FullNode-voucher">
Voucher Nonce:<b>{voucher.Nonce}</b> Lane:<b>{voucher.Lane}</b> Amt:<b>{voucher.Amount}</b> TL:<b>{voucher.TimeLock}</b> MinCl:<b>{voucher.MinCloseHeight}</b> {extra}
</div>
})

View File

@ -104,11 +104,11 @@ class StorageNode extends React.Component {
<div>
<Address client={this.props.fullConn} addr={this.state.actor} mountWindow={this.props.mountWindow}/>
</div>
<div>{this.state.statusCounts.map((c, i) => <span>{sealCodes[i]}: {c} | </span>)}</div>
<div>{this.state.statusCounts.map((c, i) => <span key={i}>{sealCodes[i]}: {c} | </span>)}</div>
<div>
{this.state.staged ? this.state.staged.map(s => (
<div>{s.SectorID} {sealCodes[s.SealStatusCode]}</div>
)) : <div></div>}
{this.state.staged ? this.state.staged.map((s, i) => (
<div key={i}>{s.SectorID} {sealCodes[s.SealStatusCode]}</div>
)) : <div/>}
</div>
</div>

View File

@ -19,8 +19,6 @@ async function pushMessage(client, from, inmsg) {
inmsg.Method = 0
}
inmsg.Nonce = await client.call('Filecoin.MpoolGetNonce', [from])
/* const msg = [
inmsg.To,
inmsg.From,
@ -36,11 +34,9 @@ async function pushMessage(client, from, inmsg) {
Buffer.from(inmsg.Params, 'base64'),
]*/
const signed = await client.call('Filecoin.WalletSignMessage', [from, inmsg])
console.log(inmsg)
console.log(signed)
await client.call('Filecoin.MpoolPush', [signed])
await client.call('Filecoin.MpoolPushMessage', [inmsg])
}
export default pushMessage

View File

@ -1,6 +1,7 @@
package main
import (
"bufio"
"fmt"
"github.com/gorilla/websocket"
"github.com/opentracing/opentracing-go/log"
@ -44,14 +45,19 @@ func newWsMux() *outmux {
func (m *outmux) msgsToChan(r *io.PipeReader, ch chan []byte) {
defer close(ch)
br := bufio.NewReader(r)
for {
buf := make([]byte, 1)
n, err := r.Read(buf)
buf, _, err := br.ReadLine()
if err != nil {
return
}
out := make([]byte, len(buf)+1)
copy(out, buf)
out[len(out)-1] = '\n'
select {
case ch <- buf[:n]:
case ch <- out:
case <-m.stop:
return
}

View File

@ -39,6 +39,7 @@ import (
"github.com/filecoin-project/go-lotus/retrieval"
"github.com/filecoin-project/go-lotus/retrieval/discovery"
"github.com/filecoin-project/go-lotus/storage"
"github.com/filecoin-project/go-lotus/storage/commitment"
"github.com/filecoin-project/go-lotus/storage/sector"
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
)
@ -232,8 +233,8 @@ func Online() Option {
Override(new(*deals.Client), deals.NewClient),
Override(RunDealClientKey, modules.RunDealClient),
Override(new(*paych.Store), modules.PaychStore),
Override(new(*paych.Manager), modules.PaymentChannelManager),
Override(new(*paych.Store), paych.NewStore),
Override(new(*paych.Manager), paych.NewManager),
Override(new(*miner.Miner), miner.NewMiner),
),
@ -243,6 +244,7 @@ func Online() Option {
Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New),
Override(new(*sector.Store), sector.NewStore),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
Override(new(*commitment.Tracker), commitment.NewTracker),
Override(new(*storage.Miner), modules.StorageMiner),
Override(new(dtypes.StagingDAG), modules.StagingDAG),

View File

@ -1,8 +1,9 @@
package full
package client
import (
"context"
"errors"
"golang.org/x/xerrors"
"os"
"github.com/ipfs/go-blockservice"
@ -26,18 +27,20 @@ import (
"github.com/filecoin-project/go-lotus/chain/deals"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/node/impl/full"
"github.com/filecoin-project/go-lotus/node/impl/paych"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/retrieval"
"github.com/filecoin-project/go-lotus/retrieval/discovery"
)
type ClientAPI struct {
type API struct {
fx.In
ChainAPI
StateAPI
WalletAPI
PaychAPI
full.ChainAPI
full.StateAPI
full.WalletAPI
paych.PaychAPI
DealClient *deals.Client
RetDiscovery discovery.PeerResolver
@ -49,7 +52,7 @@ type ClientAPI struct {
Filestore dtypes.ClientFilestore `optional:"true"`
}
func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) {
func (a *API) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) {
// TODO: make this a param
self, err := a.WalletDefaultAddress(ctx)
if err != nil {
@ -93,6 +96,7 @@ func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner add
}
head := a.Chain.GetHeaviestTipSet()
payment, err := a.PaychNewPayment(ctx, self, miner, total, extra, head.Height()+blocksDuration, head.Height()+blocksDuration)
if err != nil {
return nil, err
@ -118,7 +122,7 @@ func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner add
return &c, err
}
func (a *ClientAPI) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
deals, err := a.DealClient.List()
if err != nil {
return nil, err
@ -143,7 +147,7 @@ func (a *ClientAPI) ClientListDeals(ctx context.Context) ([]api.DealInfo, error)
return out, nil
}
func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
// TODO: check if we have the ENTIRE dag
offExch := merkledag.NewDAGService(blockservice.New(a.Blockstore, offline.Exchange(a.Blockstore)))
@ -157,7 +161,7 @@ func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, err
return true, nil
}
func (a *ClientAPI) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) {
func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) {
peers, err := a.RetDiscovery.GetPeers(root)
if err != nil {
return nil, err
@ -171,7 +175,7 @@ func (a *ClientAPI) ClientFindData(ctx context.Context, root cid.Cid) ([]api.Que
return out, nil
}
func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, error) {
func (a *API) ClientImport(ctx context.Context, path string) (cid.Cid, error) {
f, err := os.Open(path)
if err != nil {
return cid.Undef, err
@ -208,7 +212,7 @@ func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, err
return nd.Cid(), bufferedDS.Commit()
}
func (a *ClientAPI) ClientListImports(ctx context.Context) ([]api.Import, error) {
func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
if a.Filestore == nil {
return nil, errors.New("listing imports is not supported with in-memory dag yet")
}
@ -237,21 +241,30 @@ func (a *ClientAPI) ClientListImports(ctx context.Context) ([]api.Import, error)
}
}
func (a *ClientAPI) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path string) error {
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path string) error {
if order.MinerPeerID == "" {
pid, err := a.StateMinerPeerID(ctx, order.Miner, nil)
if err != nil {
return err
}
order.MinerPeerID = pid
}
outFile, err := os.OpenFile(path, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0777)
if err != nil {
return err
}
err = a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.MinerPeerID, order.Miner, outFile)
err = a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.Total, order.MinerPeerID, order.Client, order.Miner, outFile)
if err != nil {
_ = outFile.Close()
return err
return xerrors.Errorf("RetrieveUnixfs: %w", err)
}
return outFile.Close()
}
func (a *ClientAPI) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) {
func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) {
return a.DealClient.QueryAsk(ctx, p, miner)
}

View File

@ -3,10 +3,6 @@ package impl
import (
"context"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/gbrlsnchs/jwt/v3"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
@ -14,6 +10,10 @@ import (
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
)
type CommonAPI struct {

View File

@ -2,12 +2,15 @@ package impl
import (
"context"
"github.com/filecoin-project/go-lotus/node/impl/client"
"github.com/filecoin-project/go-lotus/node/impl/paych"
logging "github.com/ipfs/go-log"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/miner"
"github.com/filecoin-project/go-lotus/node/impl/full"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("node")
@ -15,9 +18,9 @@ var log = logging.Logger("node")
type FullNodeAPI struct {
CommonAPI
full.ChainAPI
full.ClientAPI
client.API
full.MpoolAPI
full.PaychAPI
paych.PaychAPI
full.StateAPI
full.WalletAPI

View File

@ -3,19 +3,19 @@ package full
import (
"context"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
type MpoolAPI struct {
fx.In
PubSub *pubsub.PubSub
Mpool *chain.MessagePool
WalletAPI
Mpool *chain.MessagePool
}
func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
@ -25,15 +25,18 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types
}
func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) error {
msgb, err := smsg.Serialize()
if err != nil {
return err
}
if err := a.Mpool.Add(smsg); err != nil {
return err
return a.Mpool.Push(smsg)
}
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
if msg.Nonce != 0 {
return nil, xerrors.Errorf("MpoolPushMessage expects message nonce to be 0, was %d", msg.Nonce)
}
return a.PubSub.Publish("/fil/messages", msgb)
return a.Mpool.PushWithNonce(msg.From, func(nonce uint64) (*types.SignedMessage, error) {
msg.Nonce = nonce
return a.WalletSignMessage(ctx, msg.From, msg)
})
}
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {

View File

@ -5,24 +5,24 @@ import (
"fmt"
"strconv"
"github.com/ipfs/go-hamt-ipld"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/gen"
"github.com/filecoin-project/go-lotus/chain/state"
"github.com/filecoin-project/go-lotus/chain/stmgr"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/vm"
"github.com/filecoin-project/go-lotus/chain/wallet"
"github.com/filecoin-project/go-lotus/lib/bufbstore"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/state"
"github.com/ipfs/go-hamt-ipld"
cbor "github.com/ipfs/go-ipld-cbor"
"go.uber.org/fx"
)
type StateAPI struct {
@ -186,6 +186,10 @@ func (a *StateAPI) StateMinerWorker(ctx context.Context, m address.Address, ts *
return w, nil
}
func (a *StateAPI) StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) {
return stmgr.GetMinerPeerID(ctx, a.StateManager, ts, m)
}
func (a *StateAPI) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) {
return a.StateManager.Call(ctx, msg, ts)
}

View File

@ -1,151 +1,60 @@
package full
package paych
import (
"context"
"fmt"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
full "github.com/filecoin-project/go-lotus/node/impl/full"
"github.com/filecoin-project/go-lotus/paych"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
"golang.org/x/xerrors"
)
type PaychAPI struct {
fx.In
MpoolAPI
WalletAPI
ChainAPI
full.MpoolAPI
full.WalletAPI
full.ChainAPI
PaychMgr *paych.Manager
}
func (a *PaychAPI) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
params, aerr := actors.SerializeParams(&actors.PCAConstructorParams{To: to})
if aerr != nil {
return nil, aerr
}
nonce, err := a.MpoolGetNonce(ctx, from)
func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*api.ChannelInfo, error) {
ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, ensureFunds)
if err != nil {
return nil, err
}
enc, err := actors.SerializeParams(&actors.ExecParams{
Params: params,
Code: actors.PaymentChannelActorCodeCid,
})
msg := &types.Message{
To: actors.InitActorAddress,
From: from,
Value: amt,
Nonce: nonce,
Method: actors.IAMethods.Exec,
Params: enc,
GasLimit: types.NewInt(1000000),
GasPrice: types.NewInt(0),
}
smsg, err := a.WalletSignMessage(ctx, from, msg)
if err != nil {
return nil, err
}
if err := a.MpoolPush(ctx, smsg); err != nil {
return nil, err
}
mcid := smsg.Cid()
mwait, err := a.ChainWaitMsg(ctx, mcid)
if err != nil {
return nil, err
}
if mwait.Receipt.ExitCode != 0 {
return nil, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
}
paychaddr, err := address.NewFromBytes(mwait.Receipt.Return)
if err != nil {
return nil, err
}
if err := a.PaychMgr.TrackOutboundChannel(ctx, paychaddr); err != nil {
return nil, err
}
return &api.ChannelInfo{
Channel: paychaddr,
Channel: ch,
ChannelMessage: mcid,
}, nil
}
func (a *PaychAPI) PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error) {
return a.PaychMgr.AllocateLane(ch)
}
func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*api.PaymentInfo, error) {
ch, err := a.PaychMgr.OutboundChanTo(from, to)
if err != nil {
return nil, err
}
var chMsg *cid.Cid
if ch == address.Undef {
// don't have matching channel, open new
// TODO: this should be more atomic
chInfo, err := a.PaychCreate(ctx, from, to, amount)
if err != nil {
return nil, err
}
ch = chInfo.Channel
chMsg = &chInfo.ChannelMessage
} else {
// already have chanel to the destination, add funds, and open a new lane
// TODO: track free funds in channel
nonce, err := a.MpoolGetNonce(ctx, from)
if err != nil {
return nil, err
}
msg := &types.Message{
To: ch,
From: from,
Value: amount,
Nonce: nonce,
Method: 0,
GasLimit: types.NewInt(1000000),
GasPrice: types.NewInt(0),
}
smsg, err := a.WalletSignMessage(ctx, from, msg)
if err != nil {
return nil, err
}
if err := a.MpoolPush(ctx, smsg); err != nil {
return nil, err
}
mwait, err := a.ChainWaitMsg(ctx, smsg.Cid())
if err != nil {
return nil, err
}
if mwait.Receipt.ExitCode != 0 {
return nil, fmt.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode)
}
}
lane, err := a.PaychMgr.AllocateLane(ch)
// TODO: Fix free fund tracking in PaychGet
ch, err := a.PaychGet(ctx, from, to, amount)
if err != nil {
return nil, err
}
sv, err := a.paychVoucherCreate(ctx, ch, types.SignedVoucher{
lane, err := a.PaychMgr.AllocateLane(ch.Channel)
if err != nil {
return nil, err
}
sv, err := a.paychVoucherCreate(ctx, ch.Channel, types.SignedVoucher{
Amount: amount,
Lane: lane,
@ -156,10 +65,14 @@ func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address
if err != nil {
return nil, err
}
var pchCid *cid.Cid
if ch.ChannelMessage != cid.Undef {
pchCid = &ch.ChannelMessage
}
return &api.PaymentInfo{
Channel: ch,
ChannelMessage: chMsg,
Channel: ch.Channel,
ChannelMessage: pchCid,
Voucher: sv,
}, nil
}
@ -221,14 +134,14 @@ func (a *PaychAPI) PaychVoucherCheckSpendable(ctx context.Context, ch address.Ad
return a.PaychMgr.CheckVoucherSpendable(ctx, ch, sv, secret, proof)
}
func (a *PaychAPI) PaychVoucherAdd(ctx context.Context, ch address.Address, sv *types.SignedVoucher, proof []byte) error {
func (a *PaychAPI) PaychVoucherAdd(ctx context.Context, ch address.Address, sv *types.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error) {
_ = a.PaychMgr.TrackInboundChannel(ctx, ch) // TODO: expose those calls
if err := a.PaychVoucherCheckValid(ctx, ch, sv); err != nil {
return err
return types.NewInt(0), err
}
return a.PaychMgr.AddVoucher(ctx, ch, sv, proof)
return a.PaychMgr.AddVoucher(ctx, ch, sv, proof, minDelta)
}
// PaychVoucherCreate creates a new signed voucher on the given payment channel
@ -266,7 +179,7 @@ func (a *PaychAPI) paychVoucherCreate(ctx context.Context, pch address.Address,
sv.Signature = sig
if err := a.PaychMgr.AddVoucher(ctx, pch, sv, nil); err != nil {
if _, err := a.PaychMgr.AddVoucher(ctx, pch, sv, nil, types.NewInt(0)); err != nil {
return nil, xerrors.Errorf("failed to persist voucher: %w", err)
}

View File

@ -1,15 +0,0 @@
package modules
import (
"github.com/filecoin-project/go-lotus/chain/stmgr"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/paych"
)
func PaychStore(ds dtypes.MetadataDS) *paych.Store {
return paych.NewStore(ds)
}
func PaymentChannelManager(sm *stmgr.StateManager, store *paych.Store) (*paych.Manager, error) {
return paych.NewManager(sm, store), nil
}

View File

@ -25,6 +25,7 @@ import (
"github.com/filecoin-project/go-lotus/node/repo"
"github.com/filecoin-project/go-lotus/retrieval"
"github.com/filecoin-project/go-lotus/storage"
"github.com/filecoin-project/go-lotus/storage/commitment"
"github.com/filecoin-project/go-lotus/storage/sector"
)
@ -65,13 +66,13 @@ func SectorBuilderConfig(storagePath string) func(dtypes.MetadataDS) (*sectorbui
}
}
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store) (*storage.Miner, error) {
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store, commt *commitment.Tracker) (*storage.Miner, error) {
maddr, err := minerAddrFromDS(ds)
if err != nil {
return nil, err
}
sm, err := storage.NewMiner(api, maddr, h, ds, secst)
sm, err := storage.NewMiner(api, maddr, h, ds, secst, commt)
if err != nil {
return nil, err
}

View File

@ -7,24 +7,42 @@ import (
"strconv"
logging "github.com/ipfs/go-log"
"go.uber.org/fx"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/stmgr"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/node/impl/full"
)
var log = logging.Logger("paych")
type ManagerApi struct {
fx.In
full.MpoolAPI
full.WalletAPI
full.ChainAPI
}
type Manager struct {
store *Store
sm *stmgr.StateManager
mpool full.MpoolAPI
wallet full.WalletAPI
chain full.ChainAPI
}
func NewManager(sm *stmgr.StateManager, pchstore *Store) *Manager {
func NewManager(sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manager {
return &Manager{
store: pchstore,
sm: sm,
mpool: api.MpoolAPI,
wallet: api.WalletAPI,
chain: api.ChainAPI,
}
}
@ -63,25 +81,34 @@ func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address)
})
}
func (pm *Manager) TrackOutboundChannel(ctx context.Context, ch address.Address) error {
func (pm *Manager) loadOutboundChannelInfo(ctx context.Context, ch address.Address) (*ChannelInfo, error) {
_, st, err := pm.loadPaychState(ctx, ch)
if err != nil {
return err
return nil, err
}
maxLane, err := maxLaneFromState(st)
if err != nil {
return err
return nil, err
}
return pm.store.TrackChannel(&ChannelInfo{
return &ChannelInfo{
Channel: ch,
Control: st.From,
Target: st.To,
Direction: DirOutbound,
NextLane: maxLane + 1,
})
}, nil
}
func (pm *Manager) TrackOutboundChannel(ctx context.Context, ch address.Address) error {
ci, err := pm.loadOutboundChannelInfo(ctx, ch)
if err != nil {
return err
}
return pm.store.TrackChannel(ci)
}
func (pm *Manager) ListChannels() ([]address.Address, error) {
@ -221,12 +248,12 @@ func (pm *Manager) getPaychOwner(ctx context.Context, ch address.Address) (addre
return address.NewFromBytes(ret.Return)
}
func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *types.SignedVoucher, proof []byte) error {
func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *types.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error) {
if err := pm.CheckVoucherValid(ctx, ch, sv); err != nil {
return err
return types.NewInt(0), err
}
return pm.store.AddVoucher(ch, sv, proof)
return pm.store.AddVoucher(ch, sv, proof, minDelta)
}
func (pm *Manager) AllocateLane(ch address.Address) (uint64, error) {
@ -240,6 +267,9 @@ func (pm *Manager) ListVouchers(ctx context.Context, ch address.Address) ([]*Vou
}
func (pm *Manager) OutboundChanTo(from, to address.Address) (address.Address, error) {
pm.store.lk.Lock()
defer pm.store.lk.Unlock()
return pm.store.findChan(func(ci *ChannelInfo) bool {
if ci.Direction != DirOutbound {
return false

119
paych/simple.go Normal file
View File

@ -0,0 +1,119 @@
package paych
import (
"context"
"fmt"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
)
func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) {
params, aerr := actors.SerializeParams(&actors.PCAConstructorParams{To: to})
if aerr != nil {
return address.Undef, cid.Undef, aerr
}
enc, aerr := actors.SerializeParams(&actors.ExecParams{
Params: params,
Code: actors.PaymentChannelActorCodeCid,
})
if aerr != nil {
return address.Undef, cid.Undef, aerr
}
msg := &types.Message{
To: actors.InitActorAddress,
From: from,
Value: amt,
Method: actors.IAMethods.Exec,
Params: enc,
GasLimit: types.NewInt(1000000),
GasPrice: types.NewInt(0),
}
smsg, err := pm.mpool.MpoolPushMessage(ctx, msg)
if err != nil {
return address.Undef, cid.Undef, err
}
mcid := smsg.Cid()
// TODO: wait outside the store lock!
// (tricky because we need to setup channel tracking before we know it's address)
mwait, err := pm.chain.ChainWaitMsg(ctx, mcid)
if err != nil {
return address.Undef, cid.Undef, err
}
if mwait.Receipt.ExitCode != 0 {
return address.Undef, cid.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
}
paychaddr, err := address.NewFromBytes(mwait.Receipt.Return)
if err != nil {
return address.Undef, cid.Undef, err
}
ci, err := pm.loadOutboundChannelInfo(ctx, paychaddr)
if err != nil {
return address.Undef, cid.Undef, err
}
if err := pm.store.trackChannel(ci); err != nil {
return address.Undef, cid.Undef, err
}
return paychaddr, mcid, nil
}
func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from address.Address, amt types.BigInt) error {
msg := &types.Message{
To: ch,
From: from,
Value: amt,
Method: 0,
GasLimit: types.NewInt(1000000),
GasPrice: types.NewInt(0),
}
smsg, err := pm.mpool.MpoolPushMessage(ctx, msg)
if err != nil {
return err
}
mwait, err := pm.chain.ChainWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock!
if err != nil {
return err
}
if mwait.Receipt.ExitCode != 0 {
return fmt.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode)
}
return nil
}
func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, ensureFree types.BigInt) (address.Address, cid.Cid, error) {
pm.store.lk.Lock()
defer pm.store.lk.Unlock()
ch, err := pm.store.findChan(func(ci *ChannelInfo) bool {
if ci.Direction != DirOutbound {
return false
}
return ci.Control == from && ci.Target == to
})
if err != nil {
return address.Undef, cid.Undef, err
}
if ch != address.Undef {
// TODO: Track available funds
return ch, cid.Undef, pm.addFunds(ctx, ch, from, ensureFree)
}
return pm.createPaych(ctx, from, to, ensureFree)
}

View File

@ -4,17 +4,19 @@ import (
"bytes"
"errors"
"fmt"
"math"
"strings"
"sync"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
dsq "github.com/ipfs/go-datastore/query"
cbor "github.com/ipfs/go-ipld-cbor"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
)
var ErrChannelNotTracked = errors.New("channel not tracked")
@ -30,7 +32,7 @@ type Store struct {
ds datastore.Batching
}
func NewStore(ds datastore.Batching) *Store {
func NewStore(ds dtypes.MetadataDS) *Store {
ds = namespace.Wrap(ds, datastore.NewKey("/paych/"))
return &Store{
ds: ds,
@ -95,6 +97,10 @@ func (ps *Store) TrackChannel(ch *ChannelInfo) error {
ps.lk.Lock()
defer ps.lk.Unlock()
return ps.trackChannel(ch)
}
func (ps *Store) trackChannel(ch *ChannelInfo) error {
_, err := ps.getChannelInfo(ch.Channel)
switch err {
default:
@ -139,9 +145,6 @@ func (ps *Store) ListChannels() ([]address.Address, error) {
}
func (ps *Store) findChan(filter func(*ChannelInfo) bool) (address.Address, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
res, err := ps.ds.Query(dsq.Query{})
if err != nil {
return address.Undef, err
@ -179,17 +182,24 @@ func (ps *Store) findChan(filter func(*ChannelInfo) bool) (address.Address, erro
return address.Undef, nil
}
func (ps *Store) AddVoucher(ch address.Address, sv *types.SignedVoucher, proof []byte) error {
func (ps *Store) AddVoucher(ch address.Address, sv *types.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
ci, err := ps.getChannelInfo(ch)
if err != nil {
return err
return types.NewInt(0), err
}
var bestAmount types.BigInt
var bestNonce uint64 = math.MaxUint64
// look for duplicates
for i, v := range ci.Vouchers {
if v.Voucher.Lane == sv.Lane && v.Voucher.Nonce+1 > bestNonce+1 {
bestNonce = v.Voucher.Nonce
bestAmount = v.Voucher.Amount
}
if !sv.Equals(v.Voucher) {
continue
}
@ -199,7 +209,7 @@ func (ps *Store) AddVoucher(ch address.Address, sv *types.SignedVoucher, proof [
break
}
log.Warnf("AddVoucher: voucher re-added with matching proof")
return nil
return types.NewInt(0), nil
}
log.Warnf("AddVoucher: adding proof to stored voucher")
@ -208,7 +218,16 @@ func (ps *Store) AddVoucher(ch address.Address, sv *types.SignedVoucher, proof [
Proof: proof,
}
return ps.putChannelInfo(ci)
return types.NewInt(0), ps.putChannelInfo(ci)
}
if bestAmount == (types.BigInt{}) {
bestAmount = types.NewInt(0)
}
delta := types.BigSub(sv.Amount, bestAmount)
if types.BigCmp(minDelta, delta) > 0 {
return delta, xerrors.Errorf("addVoucher: supplied token amount too low; minD=%s, D=%s; bestAmt=%s; v.Amt=%s", minDelta, delta, bestAmount, sv.Amount)
}
ci.Vouchers = append(ci.Vouchers, &VoucherInfo{
@ -220,7 +239,7 @@ func (ps *Store) AddVoucher(ch address.Address, sv *types.SignedVoucher, proof [
ci.NextLane = sv.Lane + 1
}
return ps.putChannelInfo(ci)
return delta, ps.putChannelInfo(ci)
}
func (ps *Store) AllocateLane(ch address.Address) (uint64, error) {

View File

@ -17,7 +17,10 @@ import (
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
payapi "github.com/filecoin-project/go-lotus/node/impl/paych"
"github.com/filecoin-project/go-lotus/paych"
"github.com/filecoin-project/go-lotus/retrieval/discovery"
)
@ -25,10 +28,13 @@ var log = logging.Logger("retrieval")
type Client struct {
h host.Host
pmgr *paych.Manager
payapi payapi.PaychAPI
}
func NewClient(h host.Host) *Client {
return &Client{h: h}
func NewClient(h host.Host, pmgr *paych.Manager, payapi payapi.PaychAPI) *Client {
return &Client{h: h, pmgr: pmgr, payapi: payapi}
}
func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.Cid) api.QueryOffer {
@ -70,11 +76,18 @@ func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.
}
type clientStream struct {
payapi payapi.PaychAPI
stream network.Stream
root cid.Cid
size types.BigInt
offset uint64
paych address.Address
lane uint64
total types.BigInt
transferred types.BigInt
windowSize uint64 // how much we "trust" the peer
verifier BlockVerifier
}
@ -91,7 +104,7 @@ type clientStream struct {
// < ..Blocks
// > DealProposal(...)
// < ...
func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, miner peer.ID, minerAddr address.Address, out io.Writer) error {
func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr address.Address, out io.Writer) error {
s, err := c.h.NewStream(ctx, miner, ProtocolID)
if err != nil {
return err
@ -102,12 +115,28 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
// TODO: Support in handler
// TODO: Allow client to specify this
paych, _, err := c.pmgr.GetPaych(ctx, client, minerAddr, total)
if err != nil {
return xerrors.Errorf("getting payment channel: %w", err)
}
lane, err := c.pmgr.AllocateLane(paych)
if err != nil {
return xerrors.Errorf("allocating payment lane: %w", err)
}
cst := clientStream{
payapi: c.payapi,
stream: s,
root: root,
size: types.NewInt(size),
offset: initialOffset,
paych: paych,
lane: lane,
total: total,
transferred: types.NewInt(0),
windowSize: build.UnixfsChunkSize,
verifier: &UnixFs0Verifier{Root: root},
}
@ -119,9 +148,9 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
}
log.Infof("Retrieve %dB @%d", toFetch, cst.offset)
err := cst.doOneExchange(toFetch, out)
err := cst.doOneExchange(ctx, toFetch, out)
if err != nil {
return err
return xerrors.Errorf("retrieval exchange: %w", err)
}
cst.offset += toFetch
@ -130,9 +159,17 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
return nil
}
func (cst *clientStream) doOneExchange(toFetch uint64, out io.Writer) error {
func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64, out io.Writer) error {
payAmount := types.BigDiv(types.BigMul(cst.total, types.NewInt(toFetch)), cst.size)
payment, err := cst.setupPayment(ctx, payAmount)
if err != nil {
return xerrors.Errorf("setting up retrieval payment: %w", err)
}
deal := DealProposal{
Ref: cst.root,
Payment: payment,
Ref: cst.root,
Params: RetParams{
Unixfs0: &Unixfs0Offer{
Offset: cst.offset,
@ -179,12 +216,12 @@ func (cst *clientStream) fetchBlocks(toFetch uint64, out io.Writer) error {
var block Block
if err := cborrpc.ReadCborRPC(cst.stream, &block); err != nil {
return err
return xerrors.Errorf("reading fetchBlock response: %w", err)
}
dataBlocks, err := cst.consumeBlockMessage(block, out)
if err != nil {
return err
return xerrors.Errorf("consuming retrieved blocks: %w", err)
}
i += dataBlocks
@ -221,3 +258,20 @@ func (cst *clientStream) consumeBlockMessage(block Block, out io.Writer) (uint64
return 1, nil
}
func (cst *clientStream) setupPayment(ctx context.Context, toSend types.BigInt) (api.PaymentInfo, error) {
amount := types.BigAdd(cst.transferred, toSend)
sv, err := cst.payapi.PaychVoucherCreate(ctx, cst.paych, amount, cst.lane)
if err != nil {
return api.PaymentInfo{}, err
}
cst.transferred = amount
return api.PaymentInfo{
Channel: cst.paych,
ChannelMessage: nil,
Voucher: sv,
}, nil
}

View File

@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
@ -19,14 +20,17 @@ import (
type Miner struct {
sectorBlocks *sectorblocks.SectorBlocks
full api.FullNode
pricePerByte types.BigInt
// TODO: Unseal price
}
func NewMiner(sblks *sectorblocks.SectorBlocks) *Miner {
func NewMiner(sblks *sectorblocks.SectorBlocks, full api.FullNode) *Miner {
return &Miner{
sectorBlocks: sblks,
full: full,
pricePerByte: types.NewInt(2), // TODO: allow setting
}
}
@ -118,8 +122,10 @@ func (hnd *handlerDeal) handleNext() (bool, error) {
unixfs0 := deal.Params.Unixfs0
// TODO: Verify payment, check how much we can send based on that
// Or reject (possibly returning the payment to retain reputation with the client)
expPayment := types.BigMul(hnd.m.pricePerByte, types.NewInt(deal.Params.Unixfs0.Size))
if _, err := hnd.m.full.PaychVoucherAdd(context.TODO(), deal.Payment.Channel, deal.Payment.Voucher, nil, expPayment); err != nil {
return false, xerrors.Errorf("processing retrieval payment: %w", err)
}
// If the file isn't open (new deal stream), isn't the right file, or isn't
// at the right offset, (re)open it

View File

@ -1,6 +1,7 @@
package retrieval
import (
"github.com/filecoin-project/go-lotus/api"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
@ -61,9 +62,10 @@ type RetParams struct {
}
type DealProposal struct {
Payment api.PaymentInfo
Ref cid.Cid
Params RetParams
// TODO: payment
}
type DealResponse struct {

View File

@ -0,0 +1,140 @@
package commitment
import (
"context"
"fmt"
"sync"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
)
var log = logging.Logger("commitment")
func init() {
cbor.RegisterCborType(commitment{})
}
var commitmentDsPrefix = datastore.NewKey("/commitments")
type Tracker struct {
commitDs datastore.Datastore
lk sync.Mutex
waits map[datastore.Key]chan struct{}
}
func NewTracker(ds dtypes.MetadataDS) *Tracker {
return &Tracker{
commitDs: namespace.Wrap(ds, commitmentDsPrefix),
waits: map[datastore.Key]chan struct{}{},
}
}
type commitment struct {
Msg cid.Cid
}
func commitmentKey(miner address.Address, sectorId uint64) datastore.Key {
return commitmentDsPrefix.ChildString(miner.String()).ChildString(fmt.Sprintf("%d", sectorId))
}
func (ct *Tracker) TrackCommitSectorMsg(miner address.Address, sectorId uint64, mcid cid.Cid) error {
key := commitmentKey(miner, sectorId)
ct.lk.Lock()
defer ct.lk.Unlock()
tracking, err := ct.commitDs.Get(key)
switch err {
case datastore.ErrNotFound:
var comm commitment
if err := cbor.DecodeInto(tracking, &comm); err != nil {
return err
}
if !comm.Msg.Equals(mcid) {
return xerrors.Errorf("commitment tracking for miner %s, sector %d: already tracking %s, got another commitment message: %s", miner, sectorId, comm.Msg, mcid)
}
log.Warnf("commitment.TrackCommitSectorMsg called more than once for miner %s, sector %d, message %s", miner, sectorId, mcid)
return nil
case nil:
break
default:
return err
}
comm := &commitment{Msg: mcid}
commB, err := cbor.DumpObject(comm)
if err != nil {
return err
}
if err := ct.commitDs.Put(key, commB); err != nil {
return err
}
waits, ok := ct.waits[key]
if ok {
close(waits)
delete(ct.waits, key)
}
return nil
}
func (ct *Tracker) WaitCommit(ctx context.Context, miner address.Address, sectorId uint64) (cid.Cid, error) {
key := commitmentKey(miner, sectorId)
ct.lk.Lock()
tracking, err := ct.commitDs.Get(key)
if err != datastore.ErrNotFound {
ct.lk.Unlock()
if err != nil {
return cid.Undef, err
}
var comm commitment
if err := cbor.DecodeInto(tracking, &comm); err != nil {
return cid.Undef, err
}
return comm.Msg, nil
}
wait, ok := ct.waits[key]
if !ok {
wait = make(chan struct{})
ct.waits[key] = wait
}
ct.lk.Unlock()
select {
case <-wait:
tracking, err := ct.commitDs.Get(key)
if err != nil {
return cid.Undef, xerrors.Errorf("failed to get commitment after waiting: %w", err)
}
var comm commitment
if err := cbor.DecodeInto(tracking, &comm); err != nil {
return cid.Undef, err
}
return comm.Msg, nil
case <-ctx.Done():
return cid.Undef, ctx.Err()
}
}

View File

@ -17,6 +17,7 @@ import (
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
"github.com/filecoin-project/go-lotus/storage/commitment"
"github.com/filecoin-project/go-lotus/storage/sector"
)
@ -26,6 +27,7 @@ type Miner struct {
api storageMinerApi
secst *sector.Store
commt *commitment.Tracker
maddr address.Address
@ -54,13 +56,14 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error)
}
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store) (*Miner, error) {
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store, commt *commitment.Tracker) (*Miner, error) {
return &Miner{
api: api,
maddr: addr,
h: h,
ds: ds,
secst: secst,
commt: commt,
}, nil
}
@ -159,13 +162,11 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
return errors.Wrap(err, "pushing commit sector message to mpool")
}
m.trackCommitSectorMessage(smsg)
return nil
}
if err := m.commt.TrackCommitSectorMsg(m.maddr, sinfo.SectorID, smsg.Cid()); err != nil {
return errors.Wrap(err, "tracking sector commitment")
}
// make sure the message gets included in the chain successfully
func (m *Miner) trackCommitSectorMessage(smsg *types.SignedMessage) {
log.Warning("not currently tracking commit sector messages")
return nil
}
func (m *Miner) runPoSt(ctx context.Context) {