Paych improvements; Retrieval payments

This commit is contained in:
Łukasz Magiera 2019-09-16 15:46:05 +02:00
parent c31151684b
commit 385e0cfd48
13 changed files with 285 additions and 162 deletions

View File

@ -109,10 +109,11 @@ type FullNode interface {
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*ActorState, error)
PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error)
PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error)
PaychList(context.Context) ([]address.Address, error)
PaychStatus(context.Context, address.Address) (*PaychStatus, error)
PaychClose(context.Context, address.Address) (cid.Cid, error)
PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error)
PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error)
PaychVoucherCheckValid(context.Context, address.Address, *types.SignedVoucher) error
PaychVoucherCheckSpendable(context.Context, address.Address, *types.SignedVoucher, []byte, []byte) (bool, error)
@ -246,8 +247,10 @@ type QueryOffer struct {
func (o *QueryOffer) Order() RetrievalOrder {
return RetrievalOrder{
Root: o.Root,
Size: o.Size,
Root: o.Root,
Size: o.Size,
Total: o.MinPrice,
Miner: o.Miner,
MinerPeerID: o.MinerPeerID,
}
@ -258,7 +261,9 @@ type RetrievalOrder struct {
Root cid.Cid
Size uint64
// TODO: support offset
Total types.BigInt
Client address.Address
Miner address.Address
MinerPeerID peer.ID
}

View File

@ -82,10 +82,11 @@ type FullNodeStruct struct {
StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"`
StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"`
PaychCreate func(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) `perm:"sign"`
PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) `perm:"sign"`
PaychList func(context.Context) ([]address.Address, error) `perm:"read"`
PaychStatus func(context.Context, address.Address) (*PaychStatus, error) `perm:"read"`
PaychClose func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"`
PaychAllocateLane func(context.Context, address.Address) (uint64, error) `perm:"sign"`
PaychNewPayment func(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error) `perm:"sign"`
PaychVoucherCheck func(context.Context, *types.SignedVoucher) error `perm:"read"`
PaychVoucherCheckValid func(context.Context, address.Address, *types.SignedVoucher) error `perm:"read"`
@ -299,8 +300,8 @@ func (c *FullNodeStruct) StateReadState(ctx context.Context, act *types.Actor, t
return c.Internal.StateReadState(ctx, act, ts)
}
func (c *FullNodeStruct) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) {
return c.Internal.PaychCreate(ctx, from, to, amt)
func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) {
return c.Internal.PaychGet(ctx, from, to, ensureFunds)
}
func (c *FullNodeStruct) PaychList(ctx context.Context) ([]address.Address, error) {
@ -335,6 +336,10 @@ func (c *FullNodeStruct) PaychClose(ctx context.Context, a address.Address) (cid
return c.Internal.PaychClose(ctx, a)
}
func (c *FullNodeStruct) PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error) {
return c.Internal.PaychAllocateLane(ctx, ch)
}
func (c *FullNodeStruct) PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error) {
return c.Internal.PaychNewPayment(ctx, from, to, amount, extra, tl, minClose)
}

View File

@ -166,12 +166,17 @@ var clientRetrieveCmd = &cli.Command{
Name: "retrieve",
Usage: "retrieve data from network",
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 2 {
fmt.Println("Usage: retrieve [CID] [outfile]")
if cctx.NArg() != 3 {
fmt.Println("Usage: retrieve [client address] [CID] [outfile]")
return nil
}
file, err := cid.Parse(cctx.Args().First())
payer, err := address.NewFromString(cctx.Args().Get(0))
if err != nil {
return err
}
file, err := cid.Parse(cctx.Args().Get(1))
if err != nil {
return err
}
@ -202,7 +207,9 @@ var clientRetrieveCmd = &cli.Command{
// TODO: parse offer strings from `client find`, make this smarter
order := offers[0].Order()
err = api.ClientRetrieve(ctx, order, cctx.Args().Get(1))
order.Client = payer
err = api.ClientRetrieve(ctx, order, cctx.Args().Get(2))
if err == nil {
fmt.Println("Success")
}

View File

@ -12,18 +12,18 @@ var paychCmd = &cli.Command{
Name: "paych",
Usage: "Manage payment channels",
Subcommands: []*cli.Command{
paychCreateCmd,
paychGetCmd,
paychListCmd,
paychVoucherCmd,
},
}
var paychCreateCmd = &cli.Command{
Name: "create",
Usage: "Create a new payment channel",
var paychGetCmd = &cli.Command{
Name: "get",
Usage: "Create a new payment channel or get existing one",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 3 {
return fmt.Errorf("must pass three arguments: <from> <to> <amount>")
return fmt.Errorf("must pass three arguments: <from> <to> <available funds>")
}
from, err := address.NewFromString(cctx.Args().Get(0))
@ -48,7 +48,7 @@ var paychCreateCmd = &cli.Command{
ctx := ReqContext(cctx)
info, err := api.PaychCreate(ctx, from, to, amt)
info, err := api.PaychGet(ctx, from, to, amt)
if err != nil {
return err
}

View File

@ -1,4 +1,4 @@
package full
package client
import (
"context"
@ -26,18 +26,20 @@ import (
"github.com/filecoin-project/go-lotus/chain/deals"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/node/impl/full"
"github.com/filecoin-project/go-lotus/node/impl/paych"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/retrieval"
"github.com/filecoin-project/go-lotus/retrieval/discovery"
)
type ClientAPI struct {
type API struct {
fx.In
ChainAPI
StateAPI
WalletAPI
PaychAPI
full.ChainAPI
full.StateAPI
full.WalletAPI
paych.PaychAPI
DealClient *deals.Client
RetDiscovery discovery.PeerResolver
@ -49,7 +51,7 @@ type ClientAPI struct {
Filestore dtypes.ClientFilestore `optional:"true"`
}
func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) {
func (a *API) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) {
// TODO: make this a param
self, err := a.WalletDefaultAddress(ctx)
if err != nil {
@ -118,7 +120,7 @@ func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner add
return &c, err
}
func (a *ClientAPI) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
deals, err := a.DealClient.List()
if err != nil {
return nil, err
@ -143,7 +145,7 @@ func (a *ClientAPI) ClientListDeals(ctx context.Context) ([]api.DealInfo, error)
return out, nil
}
func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
// TODO: check if we have the ENTIRE dag
offExch := merkledag.NewDAGService(blockservice.New(a.Blockstore, offline.Exchange(a.Blockstore)))
@ -157,7 +159,7 @@ func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, err
return true, nil
}
func (a *ClientAPI) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) {
func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) {
peers, err := a.RetDiscovery.GetPeers(root)
if err != nil {
return nil, err
@ -171,7 +173,7 @@ func (a *ClientAPI) ClientFindData(ctx context.Context, root cid.Cid) ([]api.Que
return out, nil
}
func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, error) {
func (a *API) ClientImport(ctx context.Context, path string) (cid.Cid, error) {
f, err := os.Open(path)
if err != nil {
return cid.Undef, err
@ -208,7 +210,7 @@ func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, err
return nd.Cid(), bufferedDS.Commit()
}
func (a *ClientAPI) ClientListImports(ctx context.Context) ([]api.Import, error) {
func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
if a.Filestore == nil {
return nil, errors.New("listing imports is not supported with in-memory dag yet")
}
@ -237,13 +239,13 @@ func (a *ClientAPI) ClientListImports(ctx context.Context) ([]api.Import, error)
}
}
func (a *ClientAPI) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path string) error {
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path string) error {
outFile, err := os.OpenFile(path, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0777)
if err != nil {
return err
}
err = a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.MinerPeerID, order.Miner, outFile)
err = a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.Total, order.MinerPeerID, order.Client, order.Miner, outFile)
if err != nil {
_ = outFile.Close()
return err

View File

@ -3,10 +3,6 @@ package impl
import (
"context"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/gbrlsnchs/jwt/v3"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
@ -14,6 +10,10 @@ import (
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
)
type CommonAPI struct {

View File

@ -2,12 +2,15 @@ package impl
import (
"context"
"github.com/filecoin-project/go-lotus/node/impl/client"
"github.com/filecoin-project/go-lotus/node/impl/paych"
logging "github.com/ipfs/go-log"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/miner"
"github.com/filecoin-project/go-lotus/node/impl/full"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("node")
@ -15,9 +18,9 @@ var log = logging.Logger("node")
type FullNodeAPI struct {
CommonAPI
full.ChainAPI
full.ClientAPI
client.API
full.MpoolAPI
full.PaychAPI
paych.PaychAPI
full.StateAPI
full.WalletAPI

View File

@ -1,151 +1,60 @@
package full
package paych
import (
"context"
"fmt"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
full "github.com/filecoin-project/go-lotus/node/impl/full"
"github.com/filecoin-project/go-lotus/paych"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
"golang.org/x/xerrors"
)
type PaychAPI struct {
fx.In
MpoolAPI
WalletAPI
ChainAPI
full.MpoolAPI
full.WalletAPI
full.ChainAPI
PaychMgr *paych.Manager
}
func (a *PaychAPI) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
params, aerr := actors.SerializeParams(&actors.PCAConstructorParams{To: to})
if aerr != nil {
return nil, aerr
}
nonce, err := a.MpoolGetNonce(ctx, from)
func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*api.ChannelInfo, error) {
ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, ensureFunds)
if err != nil {
return nil, err
}
enc, err := actors.SerializeParams(&actors.ExecParams{
Params: params,
Code: actors.PaymentChannelActorCodeCid,
})
msg := &types.Message{
To: actors.InitActorAddress,
From: from,
Value: amt,
Nonce: nonce,
Method: actors.IAMethods.Exec,
Params: enc,
GasLimit: types.NewInt(1000000),
GasPrice: types.NewInt(0),
}
smsg, err := a.WalletSignMessage(ctx, from, msg)
if err != nil {
return nil, err
}
if err := a.MpoolPush(ctx, smsg); err != nil {
return nil, err
}
mcid := smsg.Cid()
mwait, err := a.ChainWaitMsg(ctx, mcid)
if err != nil {
return nil, err
}
if mwait.Receipt.ExitCode != 0 {
return nil, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
}
paychaddr, err := address.NewFromBytes(mwait.Receipt.Return)
if err != nil {
return nil, err
}
if err := a.PaychMgr.TrackOutboundChannel(ctx, paychaddr); err != nil {
return nil, err
}
return &api.ChannelInfo{
Channel: paychaddr,
Channel: ch,
ChannelMessage: mcid,
}, nil
}
func (a *PaychAPI) PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error) {
return a.PaychMgr.AllocateLane(ch)
}
func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*api.PaymentInfo, error) {
ch, err := a.PaychMgr.OutboundChanTo(from, to)
if err != nil {
return nil, err
}
var chMsg *cid.Cid
if ch == address.Undef {
// don't have matching channel, open new
// TODO: this should be more atomic
chInfo, err := a.PaychCreate(ctx, from, to, amount)
if err != nil {
return nil, err
}
ch = chInfo.Channel
chMsg = &chInfo.ChannelMessage
} else {
// already have chanel to the destination, add funds, and open a new lane
// TODO: track free funds in channel
nonce, err := a.MpoolGetNonce(ctx, from)
if err != nil {
return nil, err
}
msg := &types.Message{
To: ch,
From: from,
Value: amount,
Nonce: nonce,
Method: 0,
GasLimit: types.NewInt(1000000),
GasPrice: types.NewInt(0),
}
smsg, err := a.WalletSignMessage(ctx, from, msg)
if err != nil {
return nil, err
}
if err := a.MpoolPush(ctx, smsg); err != nil {
return nil, err
}
mwait, err := a.ChainWaitMsg(ctx, smsg.Cid())
if err != nil {
return nil, err
}
if mwait.Receipt.ExitCode != 0 {
return nil, fmt.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode)
}
}
lane, err := a.PaychMgr.AllocateLane(ch)
// TODO: Fix free fund tracking in PaychGet, pass amount
ch, err := a.PaychGet(ctx, from, to, types.NewInt(0))
if err != nil {
return nil, err
}
sv, err := a.paychVoucherCreate(ctx, ch, types.SignedVoucher{
lane, err := a.PaychMgr.AllocateLane(ch.Channel)
if err != nil {
return nil, err
}
sv, err := a.paychVoucherCreate(ctx, ch.Channel, types.SignedVoucher{
Amount: amount,
Lane: lane,
@ -158,8 +67,8 @@ func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address
}
return &api.PaymentInfo{
Channel: ch,
ChannelMessage: chMsg,
Channel: ch.Channel,
ChannelMessage: &ch.ChannelMessage,
Voucher: sv,
}, nil
}

View File

@ -3,6 +3,7 @@ package paych
import (
"context"
"fmt"
"github.com/filecoin-project/go-lotus/node/impl/full"
"math"
"strconv"
@ -19,6 +20,10 @@ var log = logging.Logger("paych")
type Manager struct {
store *Store
sm *stmgr.StateManager
mpool *full.MpoolAPI
wallet *full.WalletAPI
chain *full.ChainAPI
}
func NewManager(sm *stmgr.StateManager, pchstore *Store) *Manager {
@ -240,6 +245,9 @@ func (pm *Manager) ListVouchers(ctx context.Context, ch address.Address) ([]*Vou
}
func (pm *Manager) OutboundChanTo(from, to address.Address) (address.Address, error) {
pm.store.lk.Lock()
defer pm.store.lk.Unlock()
return pm.store.findChan(func(ci *ChannelInfo) bool {
if ci.Direction != DirOutbound {
return false

131
paych/simple.go Normal file
View File

@ -0,0 +1,131 @@
package paych
import (
"context"
"fmt"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
)
func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) {
params, aerr := actors.SerializeParams(&actors.PCAConstructorParams{To: to})
if aerr != nil {
return address.Undef, cid.Undef, aerr
}
nonce, err := pm.mpool.MpoolGetNonce(ctx, from)
if err != nil {
return address.Undef, cid.Undef, err
}
enc, err := actors.SerializeParams(&actors.ExecParams{
Params: params,
Code: actors.PaymentChannelActorCodeCid,
})
msg := &types.Message{
To: actors.InitActorAddress,
From: from,
Value: amt,
Nonce: nonce,
Method: actors.IAMethods.Exec,
Params: enc,
GasLimit: types.NewInt(1000000),
GasPrice: types.NewInt(0),
}
smsg, err := pm.wallet.WalletSignMessage(ctx, from, msg)
if err != nil {
return address.Undef, cid.Undef, err
}
if err := pm.mpool.MpoolPush(ctx, smsg); err != nil {
return address.Undef, cid.Undef, err
}
mcid := smsg.Cid()
// TODO: wait outside the store lock!
// (tricky because we need to setup channel tracking before we know it's address)
mwait, err := pm.chain.ChainWaitMsg(ctx, mcid)
if err != nil {
return address.Undef, cid.Undef, err
}
if mwait.Receipt.ExitCode != 0 {
return address.Undef, cid.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
}
paychaddr, err := address.NewFromBytes(mwait.Receipt.Return)
if err != nil {
return address.Undef, cid.Undef, err
}
if err := pm.TrackOutboundChannel(ctx, paychaddr); err != nil {
return address.Undef, cid.Undef, err
}
return paychaddr, mcid, nil
}
func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from address.Address, amt types.BigInt) error {
nonce, err := pm.mpool.MpoolGetNonce(ctx, from)
if err != nil {
return err
}
msg := &types.Message{
To: ch,
From: from,
Value: amt,
Nonce: nonce,
Method: 0,
GasLimit: types.NewInt(1000000),
GasPrice: types.NewInt(0),
}
smsg, err := pm.wallet.WalletSignMessage(ctx, from, msg)
if err != nil {
return err
}
if err := pm.mpool.MpoolPush(ctx, smsg); err != nil {
return err
}
mwait, err := pm.chain.ChainWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock!
if err != nil {
return err
}
if mwait.Receipt.ExitCode != 0 {
return fmt.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode)
}
return nil
}
func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, ensureFree types.BigInt) (address.Address, cid.Cid, error) {
pm.store.lk.Lock()
defer pm.store.lk.Unlock()
ch, err := pm.store.findChan(func(ci *ChannelInfo) bool {
if ci.Direction != DirOutbound {
return false
}
return ci.Control == from && ci.Target == to
})
if err != nil {
return address.Undef, cid.Undef, err
}
if ch != address.Undef {
// TODO: Track available funds
return ch, cid.Undef, pm.addFunds(ctx, ch, from, ensureFree)
}
return pm.createPaych(ctx, from, to, ensureFree)
}

View File

@ -139,9 +139,6 @@ func (ps *Store) ListChannels() ([]address.Address, error) {
}
func (ps *Store) findChan(filter func(*ChannelInfo) bool) (address.Address, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
res, err := ps.ds.Query(dsq.Query{})
if err != nil {
return address.Undef, err

View File

@ -17,7 +17,10 @@ import (
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
payapi "github.com/filecoin-project/go-lotus/node/impl/paych"
"github.com/filecoin-project/go-lotus/paych"
"github.com/filecoin-project/go-lotus/retrieval/discovery"
)
@ -25,6 +28,9 @@ var log = logging.Logger("retrieval")
type Client struct {
h host.Host
pmgr *paych.Manager
payapi payapi.PaychAPI
}
func NewClient(h host.Host) *Client {
@ -70,11 +76,18 @@ func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.
}
type clientStream struct {
payapi payapi.PaychAPI
stream network.Stream
root cid.Cid
size types.BigInt
offset uint64
paych address.Address
lane uint64
total types.BigInt
transferred types.BigInt
windowSize uint64 // how much we "trust" the peer
verifier BlockVerifier
}
@ -91,7 +104,7 @@ type clientStream struct {
// < ..Blocks
// > DealProposal(...)
// < ...
func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, miner peer.ID, minerAddr address.Address, out io.Writer) error {
func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr address.Address, out io.Writer) error {
s, err := c.h.NewStream(ctx, miner, ProtocolID)
if err != nil {
return err
@ -102,12 +115,28 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
// TODO: Support in handler
// TODO: Allow client to specify this
paych, _, err := c.pmgr.GetPaych(ctx, client, minerAddr, total)
if err != nil {
return err
}
lane, err := c.pmgr.AllocateLane(paych)
if err != nil {
return err
}
cst := clientStream{
payapi: c.payapi,
stream: s,
root: root,
size: types.NewInt(size),
offset: initialOffset,
paych: paych,
lane: lane,
total: total,
transferred: types.NewInt(0),
windowSize: build.UnixfsChunkSize,
verifier: &UnixFs0Verifier{Root: root},
}
@ -119,7 +148,7 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
}
log.Infof("Retrieve %dB @%d", toFetch, cst.offset)
err := cst.doOneExchange(toFetch, out)
err := cst.doOneExchange(ctx, toFetch, out)
if err != nil {
return err
}
@ -130,9 +159,17 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
return nil
}
func (cst *clientStream) doOneExchange(toFetch uint64, out io.Writer) error {
func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64, out io.Writer) error {
payAmount := types.BigDiv(types.BigMul(cst.total, types.NewInt(toFetch)), cst.size)
payment, err := cst.setupPayment(ctx, payAmount)
if err != nil {
return err
}
deal := DealProposal{
Ref: cst.root,
Payment: payment,
Ref: cst.root,
Params: RetParams{
Unixfs0: &Unixfs0Offer{
Offset: cst.offset,
@ -221,3 +258,20 @@ func (cst *clientStream) consumeBlockMessage(block Block, out io.Writer) (uint64
return 1, nil
}
func (cst *clientStream) setupPayment(ctx context.Context, toSend types.BigInt) (api.PaymentInfo, error) {
amount := types.BigAdd(cst.transferred, toSend)
sv, err := cst.payapi.PaychVoucherCreate(ctx, cst.paych, amount, cst.lane)
if err != nil {
return api.PaymentInfo{}, err
}
cst.transferred = amount
return api.PaymentInfo{
Channel: cst.paych,
ChannelMessage: nil,
Voucher: sv,
}, nil
}

View File

@ -1,6 +1,7 @@
package retrieval
import (
"github.com/filecoin-project/go-lotus/api"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
@ -61,9 +62,10 @@ type RetParams struct {
}
type DealProposal struct {
Payment api.PaymentInfo
Ref cid.Cid
Params RetParams
// TODO: payment
}
type DealResponse struct {