diff --git a/api/api.go b/api/api.go index 6066af3e6..153843535 100644 --- a/api/api.go +++ b/api/api.go @@ -109,10 +109,11 @@ type FullNode interface { StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*ActorState, error) - PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) + PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) PaychList(context.Context) ([]address.Address, error) PaychStatus(context.Context, address.Address) (*PaychStatus, error) PaychClose(context.Context, address.Address) (cid.Cid, error) + PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error) PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error) PaychVoucherCheckValid(context.Context, address.Address, *types.SignedVoucher) error PaychVoucherCheckSpendable(context.Context, address.Address, *types.SignedVoucher, []byte, []byte) (bool, error) @@ -246,8 +247,10 @@ type QueryOffer struct { func (o *QueryOffer) Order() RetrievalOrder { return RetrievalOrder{ - Root: o.Root, - Size: o.Size, + Root: o.Root, + Size: o.Size, + Total: o.MinPrice, + Miner: o.Miner, MinerPeerID: o.MinerPeerID, } @@ -258,7 +261,9 @@ type RetrievalOrder struct { Root cid.Cid Size uint64 // TODO: support offset + Total types.BigInt + Client address.Address Miner address.Address MinerPeerID peer.ID } diff --git a/api/struct.go b/api/struct.go index cd5d7b844..97f7ba9fb 100644 --- a/api/struct.go +++ b/api/struct.go @@ -82,10 +82,11 @@ type FullNodeStruct struct { StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"` StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"` - PaychCreate func(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) `perm:"sign"` + PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) `perm:"sign"` PaychList func(context.Context) ([]address.Address, error) `perm:"read"` PaychStatus func(context.Context, address.Address) (*PaychStatus, error) `perm:"read"` PaychClose func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"` + PaychAllocateLane func(context.Context, address.Address) (uint64, error) `perm:"sign"` PaychNewPayment func(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error) `perm:"sign"` PaychVoucherCheck func(context.Context, *types.SignedVoucher) error `perm:"read"` PaychVoucherCheckValid func(context.Context, address.Address, *types.SignedVoucher) error `perm:"read"` @@ -299,8 +300,8 @@ func (c *FullNodeStruct) StateReadState(ctx context.Context, act *types.Actor, t return c.Internal.StateReadState(ctx, act, ts) } -func (c *FullNodeStruct) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) { - return c.Internal.PaychCreate(ctx, from, to, amt) +func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) { + return c.Internal.PaychGet(ctx, from, to, ensureFunds) } func (c *FullNodeStruct) PaychList(ctx context.Context) ([]address.Address, error) { @@ -335,6 +336,10 @@ func (c *FullNodeStruct) PaychClose(ctx context.Context, a address.Address) (cid return c.Internal.PaychClose(ctx, a) } +func (c *FullNodeStruct) PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error) { + return c.Internal.PaychAllocateLane(ctx, ch) +} + func (c *FullNodeStruct) PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error) { return c.Internal.PaychNewPayment(ctx, from, to, amount, extra, tl, minClose) } diff --git a/cli/client.go b/cli/client.go index 567f02f1f..ce867d96b 100644 --- a/cli/client.go +++ b/cli/client.go @@ -166,12 +166,17 @@ var clientRetrieveCmd = &cli.Command{ Name: "retrieve", Usage: "retrieve data from network", Action: func(cctx *cli.Context) error { - if cctx.NArg() != 2 { - fmt.Println("Usage: retrieve [CID] [outfile]") + if cctx.NArg() != 3 { + fmt.Println("Usage: retrieve [client address] [CID] [outfile]") return nil } - file, err := cid.Parse(cctx.Args().First()) + payer, err := address.NewFromString(cctx.Args().Get(0)) + if err != nil { + return err + } + + file, err := cid.Parse(cctx.Args().Get(1)) if err != nil { return err } @@ -202,7 +207,9 @@ var clientRetrieveCmd = &cli.Command{ // TODO: parse offer strings from `client find`, make this smarter order := offers[0].Order() - err = api.ClientRetrieve(ctx, order, cctx.Args().Get(1)) + order.Client = payer + + err = api.ClientRetrieve(ctx, order, cctx.Args().Get(2)) if err == nil { fmt.Println("Success") } diff --git a/cli/paych.go b/cli/paych.go index 206cbfb3d..e79d14141 100644 --- a/cli/paych.go +++ b/cli/paych.go @@ -12,18 +12,18 @@ var paychCmd = &cli.Command{ Name: "paych", Usage: "Manage payment channels", Subcommands: []*cli.Command{ - paychCreateCmd, + paychGetCmd, paychListCmd, paychVoucherCmd, }, } -var paychCreateCmd = &cli.Command{ - Name: "create", - Usage: "Create a new payment channel", +var paychGetCmd = &cli.Command{ + Name: "get", + Usage: "Create a new payment channel or get existing one", Action: func(cctx *cli.Context) error { if cctx.Args().Len() != 3 { - return fmt.Errorf("must pass three arguments: ") + return fmt.Errorf("must pass three arguments: ") } from, err := address.NewFromString(cctx.Args().Get(0)) @@ -48,7 +48,7 @@ var paychCreateCmd = &cli.Command{ ctx := ReqContext(cctx) - info, err := api.PaychCreate(ctx, from, to, amt) + info, err := api.PaychGet(ctx, from, to, amt) if err != nil { return err } diff --git a/node/impl/full/client.go b/node/impl/client/client.go similarity index 85% rename from node/impl/full/client.go rename to node/impl/client/client.go index dab07ad17..82ee9e4e9 100644 --- a/node/impl/full/client.go +++ b/node/impl/client/client.go @@ -1,4 +1,4 @@ -package full +package client import ( "context" @@ -26,18 +26,20 @@ import ( "github.com/filecoin-project/go-lotus/chain/deals" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/go-lotus/node/impl/full" + "github.com/filecoin-project/go-lotus/node/impl/paych" "github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/filecoin-project/go-lotus/retrieval" "github.com/filecoin-project/go-lotus/retrieval/discovery" ) -type ClientAPI struct { +type API struct { fx.In - ChainAPI - StateAPI - WalletAPI - PaychAPI + full.ChainAPI + full.StateAPI + full.WalletAPI + paych.PaychAPI DealClient *deals.Client RetDiscovery discovery.PeerResolver @@ -49,7 +51,7 @@ type ClientAPI struct { Filestore dtypes.ClientFilestore `optional:"true"` } -func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) { +func (a *API) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) { // TODO: make this a param self, err := a.WalletDefaultAddress(ctx) if err != nil { @@ -118,7 +120,7 @@ func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner add return &c, err } -func (a *ClientAPI) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) { +func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) { deals, err := a.DealClient.List() if err != nil { return nil, err @@ -143,7 +145,7 @@ func (a *ClientAPI) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) return out, nil } -func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) { +func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) { // TODO: check if we have the ENTIRE dag offExch := merkledag.NewDAGService(blockservice.New(a.Blockstore, offline.Exchange(a.Blockstore))) @@ -157,7 +159,7 @@ func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, err return true, nil } -func (a *ClientAPI) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) { +func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) { peers, err := a.RetDiscovery.GetPeers(root) if err != nil { return nil, err @@ -171,7 +173,7 @@ func (a *ClientAPI) ClientFindData(ctx context.Context, root cid.Cid) ([]api.Que return out, nil } -func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, error) { +func (a *API) ClientImport(ctx context.Context, path string) (cid.Cid, error) { f, err := os.Open(path) if err != nil { return cid.Undef, err @@ -208,7 +210,7 @@ func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, err return nd.Cid(), bufferedDS.Commit() } -func (a *ClientAPI) ClientListImports(ctx context.Context) ([]api.Import, error) { +func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) { if a.Filestore == nil { return nil, errors.New("listing imports is not supported with in-memory dag yet") } @@ -237,13 +239,13 @@ func (a *ClientAPI) ClientListImports(ctx context.Context) ([]api.Import, error) } } -func (a *ClientAPI) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path string) error { +func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path string) error { outFile, err := os.OpenFile(path, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return err } - err = a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.MinerPeerID, order.Miner, outFile) + err = a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.Total, order.MinerPeerID, order.Client, order.Miner, outFile) if err != nil { _ = outFile.Close() return err diff --git a/node/impl/common.go b/node/impl/common.go index 6c564f739..571760c5e 100644 --- a/node/impl/common.go +++ b/node/impl/common.go @@ -3,10 +3,6 @@ package impl import ( "context" - "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/build" - "github.com/filecoin-project/go-lotus/node/modules/dtypes" - "github.com/gbrlsnchs/jwt/v3" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" @@ -14,6 +10,10 @@ import ( ma "github.com/multiformats/go-multiaddr" "go.uber.org/fx" "golang.org/x/xerrors" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/build" + "github.com/filecoin-project/go-lotus/node/modules/dtypes" ) type CommonAPI struct { diff --git a/node/impl/full.go b/node/impl/full.go index c8c788064..7092f42b8 100644 --- a/node/impl/full.go +++ b/node/impl/full.go @@ -2,12 +2,15 @@ package impl import ( "context" + "github.com/filecoin-project/go-lotus/node/impl/client" + "github.com/filecoin-project/go-lotus/node/impl/paych" + + logging "github.com/ipfs/go-log" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/miner" "github.com/filecoin-project/go-lotus/node/impl/full" - logging "github.com/ipfs/go-log" ) var log = logging.Logger("node") @@ -15,9 +18,9 @@ var log = logging.Logger("node") type FullNodeAPI struct { CommonAPI full.ChainAPI - full.ClientAPI + client.API full.MpoolAPI - full.PaychAPI + paych.PaychAPI full.StateAPI full.WalletAPI diff --git a/node/impl/full/paych.go b/node/impl/paych/paych.go similarity index 67% rename from node/impl/full/paych.go rename to node/impl/paych/paych.go index 18d72e243..5cce7c79a 100644 --- a/node/impl/full/paych.go +++ b/node/impl/paych/paych.go @@ -1,151 +1,60 @@ -package full +package paych import ( "context" "fmt" + "github.com/ipfs/go-cid" + "go.uber.org/fx" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/types" + full "github.com/filecoin-project/go-lotus/node/impl/full" "github.com/filecoin-project/go-lotus/paych" - - "github.com/ipfs/go-cid" - "go.uber.org/fx" - "golang.org/x/xerrors" ) type PaychAPI struct { fx.In - MpoolAPI - WalletAPI - ChainAPI + full.MpoolAPI + full.WalletAPI + full.ChainAPI PaychMgr *paych.Manager } -func (a *PaychAPI) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) { - params, aerr := actors.SerializeParams(&actors.PCAConstructorParams{To: to}) - if aerr != nil { - return nil, aerr - } - - nonce, err := a.MpoolGetNonce(ctx, from) +func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*api.ChannelInfo, error) { + ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, ensureFunds) if err != nil { return nil, err } - enc, err := actors.SerializeParams(&actors.ExecParams{ - Params: params, - Code: actors.PaymentChannelActorCodeCid, - }) - - msg := &types.Message{ - To: actors.InitActorAddress, - From: from, - Value: amt, - Nonce: nonce, - Method: actors.IAMethods.Exec, - Params: enc, - GasLimit: types.NewInt(1000000), - GasPrice: types.NewInt(0), - } - - smsg, err := a.WalletSignMessage(ctx, from, msg) - if err != nil { - return nil, err - } - - if err := a.MpoolPush(ctx, smsg); err != nil { - return nil, err - } - - mcid := smsg.Cid() - mwait, err := a.ChainWaitMsg(ctx, mcid) - if err != nil { - return nil, err - } - - if mwait.Receipt.ExitCode != 0 { - return nil, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode) - } - - paychaddr, err := address.NewFromBytes(mwait.Receipt.Return) - if err != nil { - return nil, err - } - - if err := a.PaychMgr.TrackOutboundChannel(ctx, paychaddr); err != nil { - return nil, err - } - return &api.ChannelInfo{ - Channel: paychaddr, + Channel: ch, ChannelMessage: mcid, }, nil } +func (a *PaychAPI) PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error) { + return a.PaychMgr.AllocateLane(ch) +} + func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*api.PaymentInfo, error) { - ch, err := a.PaychMgr.OutboundChanTo(from, to) - if err != nil { - return nil, err - } - var chMsg *cid.Cid - if ch == address.Undef { - // don't have matching channel, open new - - // TODO: this should be more atomic - chInfo, err := a.PaychCreate(ctx, from, to, amount) - if err != nil { - return nil, err - } - ch = chInfo.Channel - chMsg = &chInfo.ChannelMessage - } else { - // already have chanel to the destination, add funds, and open a new lane - // TODO: track free funds in channel - - nonce, err := a.MpoolGetNonce(ctx, from) - if err != nil { - return nil, err - } - - msg := &types.Message{ - To: ch, - From: from, - Value: amount, - Nonce: nonce, - Method: 0, - GasLimit: types.NewInt(1000000), - GasPrice: types.NewInt(0), - } - - smsg, err := a.WalletSignMessage(ctx, from, msg) - if err != nil { - return nil, err - } - - if err := a.MpoolPush(ctx, smsg); err != nil { - return nil, err - } - - mwait, err := a.ChainWaitMsg(ctx, smsg.Cid()) - if err != nil { - return nil, err - } - - if mwait.Receipt.ExitCode != 0 { - return nil, fmt.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode) - } - } - - lane, err := a.PaychMgr.AllocateLane(ch) + // TODO: Fix free fund tracking in PaychGet, pass amount + ch, err := a.PaychGet(ctx, from, to, types.NewInt(0)) if err != nil { return nil, err } - sv, err := a.paychVoucherCreate(ctx, ch, types.SignedVoucher{ + lane, err := a.PaychMgr.AllocateLane(ch.Channel) + if err != nil { + return nil, err + } + + sv, err := a.paychVoucherCreate(ctx, ch.Channel, types.SignedVoucher{ Amount: amount, Lane: lane, @@ -158,8 +67,8 @@ func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address } return &api.PaymentInfo{ - Channel: ch, - ChannelMessage: chMsg, + Channel: ch.Channel, + ChannelMessage: &ch.ChannelMessage, Voucher: sv, }, nil } diff --git a/paych/paych.go b/paych/paych.go index af12e686f..3f1954714 100644 --- a/paych/paych.go +++ b/paych/paych.go @@ -3,6 +3,7 @@ package paych import ( "context" "fmt" + "github.com/filecoin-project/go-lotus/node/impl/full" "math" "strconv" @@ -19,6 +20,10 @@ var log = logging.Logger("paych") type Manager struct { store *Store sm *stmgr.StateManager + + mpool *full.MpoolAPI + wallet *full.WalletAPI + chain *full.ChainAPI } func NewManager(sm *stmgr.StateManager, pchstore *Store) *Manager { @@ -240,6 +245,9 @@ func (pm *Manager) ListVouchers(ctx context.Context, ch address.Address) ([]*Vou } func (pm *Manager) OutboundChanTo(from, to address.Address) (address.Address, error) { + pm.store.lk.Lock() + defer pm.store.lk.Unlock() + return pm.store.findChan(func(ci *ChannelInfo) bool { if ci.Direction != DirOutbound { return false diff --git a/paych/simple.go b/paych/simple.go new file mode 100644 index 000000000..9c7b64845 --- /dev/null +++ b/paych/simple.go @@ -0,0 +1,131 @@ +package paych + +import ( + "context" + "fmt" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-lotus/chain/actors" + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/types" +) + +func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) { + params, aerr := actors.SerializeParams(&actors.PCAConstructorParams{To: to}) + if aerr != nil { + return address.Undef, cid.Undef, aerr + } + + nonce, err := pm.mpool.MpoolGetNonce(ctx, from) + if err != nil { + return address.Undef, cid.Undef, err + } + + enc, err := actors.SerializeParams(&actors.ExecParams{ + Params: params, + Code: actors.PaymentChannelActorCodeCid, + }) + + msg := &types.Message{ + To: actors.InitActorAddress, + From: from, + Value: amt, + Nonce: nonce, + Method: actors.IAMethods.Exec, + Params: enc, + GasLimit: types.NewInt(1000000), + GasPrice: types.NewInt(0), + } + + smsg, err := pm.wallet.WalletSignMessage(ctx, from, msg) + if err != nil { + return address.Undef, cid.Undef, err + } + + if err := pm.mpool.MpoolPush(ctx, smsg); err != nil { + return address.Undef, cid.Undef, err + } + + mcid := smsg.Cid() + + // TODO: wait outside the store lock! + // (tricky because we need to setup channel tracking before we know it's address) + mwait, err := pm.chain.ChainWaitMsg(ctx, mcid) + if err != nil { + return address.Undef, cid.Undef, err + } + + if mwait.Receipt.ExitCode != 0 { + return address.Undef, cid.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode) + } + + paychaddr, err := address.NewFromBytes(mwait.Receipt.Return) + if err != nil { + return address.Undef, cid.Undef, err + } + + if err := pm.TrackOutboundChannel(ctx, paychaddr); err != nil { + return address.Undef, cid.Undef, err + } + + return paychaddr, mcid, nil +} + +func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from address.Address, amt types.BigInt) error { + nonce, err := pm.mpool.MpoolGetNonce(ctx, from) + if err != nil { + return err + } + + msg := &types.Message{ + To: ch, + From: from, + Value: amt, + Nonce: nonce, + Method: 0, + GasLimit: types.NewInt(1000000), + GasPrice: types.NewInt(0), + } + + smsg, err := pm.wallet.WalletSignMessage(ctx, from, msg) + if err != nil { + return err + } + + if err := pm.mpool.MpoolPush(ctx, smsg); err != nil { + return err + } + + mwait, err := pm.chain.ChainWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock! + if err != nil { + return err + } + + if mwait.Receipt.ExitCode != 0 { + return fmt.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode) + } + + return nil +} + +func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, ensureFree types.BigInt) (address.Address, cid.Cid, error) { + pm.store.lk.Lock() + defer pm.store.lk.Unlock() + + ch, err := pm.store.findChan(func(ci *ChannelInfo) bool { + if ci.Direction != DirOutbound { + return false + } + return ci.Control == from && ci.Target == to + }) + if err != nil { + return address.Undef, cid.Undef, err + } + if ch != address.Undef { + // TODO: Track available funds + return ch, cid.Undef, pm.addFunds(ctx, ch, from, ensureFree) + } + + return pm.createPaych(ctx, from, to, ensureFree) +} diff --git a/paych/store.go b/paych/store.go index a033c6d6b..29fcc4da5 100644 --- a/paych/store.go +++ b/paych/store.go @@ -139,9 +139,6 @@ func (ps *Store) ListChannels() ([]address.Address, error) { } func (ps *Store) findChan(filter func(*ChannelInfo) bool) (address.Address, error) { - ps.lk.Lock() - defer ps.lk.Unlock() - res, err := ps.ds.Query(dsq.Query{}) if err != nil { return address.Undef, err diff --git a/retrieval/client.go b/retrieval/client.go index d46d9620c..4874c76c7 100644 --- a/retrieval/client.go +++ b/retrieval/client.go @@ -17,7 +17,10 @@ import ( "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/lib/cborrpc" + payapi "github.com/filecoin-project/go-lotus/node/impl/paych" + "github.com/filecoin-project/go-lotus/paych" "github.com/filecoin-project/go-lotus/retrieval/discovery" ) @@ -25,6 +28,9 @@ var log = logging.Logger("retrieval") type Client struct { h host.Host + + pmgr *paych.Manager + payapi payapi.PaychAPI } func NewClient(h host.Host) *Client { @@ -70,11 +76,18 @@ func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid. } type clientStream struct { + payapi payapi.PaychAPI stream network.Stream root cid.Cid + size types.BigInt offset uint64 + paych address.Address + lane uint64 + total types.BigInt + transferred types.BigInt + windowSize uint64 // how much we "trust" the peer verifier BlockVerifier } @@ -91,7 +104,7 @@ type clientStream struct { // < ..Blocks // > DealProposal(...) // < ... -func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, miner peer.ID, minerAddr address.Address, out io.Writer) error { +func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr address.Address, out io.Writer) error { s, err := c.h.NewStream(ctx, miner, ProtocolID) if err != nil { return err @@ -102,12 +115,28 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, // TODO: Support in handler // TODO: Allow client to specify this + paych, _, err := c.pmgr.GetPaych(ctx, client, minerAddr, total) + if err != nil { + return err + } + lane, err := c.pmgr.AllocateLane(paych) + if err != nil { + return err + } + cst := clientStream{ + payapi: c.payapi, stream: s, root: root, + size: types.NewInt(size), offset: initialOffset, + paych: paych, + lane: lane, + total: total, + transferred: types.NewInt(0), + windowSize: build.UnixfsChunkSize, verifier: &UnixFs0Verifier{Root: root}, } @@ -119,7 +148,7 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, } log.Infof("Retrieve %dB @%d", toFetch, cst.offset) - err := cst.doOneExchange(toFetch, out) + err := cst.doOneExchange(ctx, toFetch, out) if err != nil { return err } @@ -130,9 +159,17 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, return nil } -func (cst *clientStream) doOneExchange(toFetch uint64, out io.Writer) error { +func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64, out io.Writer) error { + payAmount := types.BigDiv(types.BigMul(cst.total, types.NewInt(toFetch)), cst.size) + + payment, err := cst.setupPayment(ctx, payAmount) + if err != nil { + return err + } + deal := DealProposal{ - Ref: cst.root, + Payment: payment, + Ref: cst.root, Params: RetParams{ Unixfs0: &Unixfs0Offer{ Offset: cst.offset, @@ -221,3 +258,20 @@ func (cst *clientStream) consumeBlockMessage(block Block, out io.Writer) (uint64 return 1, nil } + +func (cst *clientStream) setupPayment(ctx context.Context, toSend types.BigInt) (api.PaymentInfo, error) { + amount := types.BigAdd(cst.transferred, toSend) + + sv, err := cst.payapi.PaychVoucherCreate(ctx, cst.paych, amount, cst.lane) + if err != nil { + return api.PaymentInfo{}, err + } + + cst.transferred = amount + + return api.PaymentInfo{ + Channel: cst.paych, + ChannelMessage: nil, + Voucher: sv, + }, nil +} diff --git a/retrieval/types.go b/retrieval/types.go index 145b06732..60ccc90e1 100644 --- a/retrieval/types.go +++ b/retrieval/types.go @@ -1,6 +1,7 @@ package retrieval import ( + "github.com/filecoin-project/go-lotus/api" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" @@ -61,9 +62,10 @@ type RetParams struct { } type DealProposal struct { + Payment api.PaymentInfo + Ref cid.Cid Params RetParams - // TODO: payment } type DealResponse struct {