Merge pull request #7883 from filecoin-project/feat/paych-avail-reuse

feat: paychmgr: Support paych funding (a.k.a. fast paid retrieval)
This commit is contained in:
Łukasz Magiera 2022-03-02 13:28:12 +00:00 committed by GitHub
commit b8473aeedd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 1204 additions and 136 deletions

View File

@ -689,7 +689,17 @@ type FullNode interface {
// MethodGroup: Paych
// The Paych methods are for interacting with and managing payment channels
PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) //perm:sign
// PaychGet gets or creates a payment channel between address pair
// The specified amount will be reserved for use. If there aren't enough non-reserved funds
// available, funds will be added through an on-chain message.
// - When opts.OffChain is true, this call will not cause any messages to be sent to the chain (no automatic
// channel creation/funds adding). If the operation can't be performed without sending a message an error will be
// returned. Note that even when this option is specified, this call can be blocked by previous operations on the
// channel waiting for on-chain operations.
PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt, opts PaychGetOpts) (*ChannelInfo, error) //perm:sign
// PaychFund gets or creates a payment channel between address pair.
// The specified amount will be added to the channel through on-chain send for future use
PaychFund(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) //perm:sign
PaychGetWaitReady(context.Context, cid.Cid) (address.Address, error) //perm:sign
PaychAvailableFunds(ctx context.Context, ch address.Address) (*ChannelAvailableFunds, error) //perm:sign
PaychAvailableFundsByFromTo(ctx context.Context, from, to address.Address) (*ChannelAvailableFunds, error) //perm:sign
@ -828,6 +838,10 @@ const (
PCHOutbound
)
type PaychGetOpts struct {
OffChain bool
}
type PaychStatus struct {
ControlAddr address.Address
Direction PCHDir
@ -845,16 +859,23 @@ type ChannelAvailableFunds struct {
From address.Address
// To is the to address of the channel
To address.Address
// ConfirmedAmt is the amount of funds that have been confirmed on-chain
// for the channel
// ConfirmedAmt is the total amount of funds that have been confirmed on-chain for the channel
ConfirmedAmt types.BigInt
// PendingAmt is the amount of funds that are pending confirmation on-chain
PendingAmt types.BigInt
// NonReservedAmt is part of ConfirmedAmt that is available for use (e.g. when the payment channel was pre-funded)
NonReservedAmt types.BigInt
// PendingAvailableAmt is the amount of funds that are pending confirmation on-chain that will become available once confirmed
PendingAvailableAmt types.BigInt
// PendingWaitSentinel can be used with PaychGetWaitReady to wait for
// confirmation of pending funds
PendingWaitSentinel *cid.Cid
// QueuedAmt is the amount that is queued up behind a pending request
QueuedAmt types.BigInt
// VoucherRedeemedAmt is the amount that is redeemed by vouchers on-chain
// and in the local datastore
VoucherReedeemedAmt types.BigInt

View File

@ -1975,19 +1975,34 @@ func (mr *MockFullNodeMockRecorder) PaychCollect(arg0, arg1 interface{}) *gomock
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PaychCollect", reflect.TypeOf((*MockFullNode)(nil).PaychCollect), arg0, arg1)
}
// PaychGet mocks base method.
func (m *MockFullNode) PaychGet(arg0 context.Context, arg1, arg2 address.Address, arg3 big.Int) (*api.ChannelInfo, error) {
// PaychFund mocks base method.
func (m *MockFullNode) PaychFund(arg0 context.Context, arg1, arg2 address.Address, arg3 big.Int) (*api.ChannelInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PaychGet", arg0, arg1, arg2, arg3)
ret := m.ctrl.Call(m, "PaychFund", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(*api.ChannelInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// PaychFund indicates an expected call of PaychFund.
func (mr *MockFullNodeMockRecorder) PaychFund(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PaychFund", reflect.TypeOf((*MockFullNode)(nil).PaychFund), arg0, arg1, arg2, arg3)
}
// PaychGet mocks base method.
func (m *MockFullNode) PaychGet(arg0 context.Context, arg1, arg2 address.Address, arg3 big.Int, arg4 api.PaychGetOpts) (*api.ChannelInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PaychGet", arg0, arg1, arg2, arg3, arg4)
ret0, _ := ret[0].(*api.ChannelInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// PaychGet indicates an expected call of PaychGet.
func (mr *MockFullNodeMockRecorder) PaychGet(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
func (mr *MockFullNodeMockRecorder) PaychGet(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PaychGet", reflect.TypeOf((*MockFullNode)(nil).PaychGet), arg0, arg1, arg2, arg3)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PaychGet", reflect.TypeOf((*MockFullNode)(nil).PaychGet), arg0, arg1, arg2, arg3, arg4)
}
// PaychGetWaitReady mocks base method.

View File

@ -306,7 +306,9 @@ type FullNodeStruct struct {
PaychCollect func(p0 context.Context, p1 address.Address) (cid.Cid, error) `perm:"sign"`
PaychGet func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) `perm:"sign"`
PaychFund func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) `perm:"sign"`
PaychGet func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 PaychGetOpts) (*ChannelInfo, error) `perm:"sign"`
PaychGetWaitReady func(p0 context.Context, p1 cid.Cid) (address.Address, error) `perm:"sign"`
@ -2187,14 +2189,25 @@ func (s *FullNodeStub) PaychCollect(p0 context.Context, p1 address.Address) (cid
return *new(cid.Cid), ErrNotSupported
}
func (s *FullNodeStruct) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) {
func (s *FullNodeStruct) PaychFund(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) {
if s.Internal.PaychFund == nil {
return nil, ErrNotSupported
}
return s.Internal.PaychFund(p0, p1, p2, p3)
}
func (s *FullNodeStub) PaychFund(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) {
return nil, ErrNotSupported
}
func (s *FullNodeStruct) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 PaychGetOpts) (*ChannelInfo, error) {
if s.Internal.PaychGet == nil {
return nil, ErrNotSupported
}
return s.Internal.PaychGet(p0, p1, p2, p3)
return s.Internal.PaychGet(p0, p1, p2, p3, p4)
}
func (s *FullNodeStub) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) {
func (s *FullNodeStub) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 PaychGetOpts) (*ChannelInfo, error) {
return nil, ErrNotSupported
}

View File

@ -337,4 +337,8 @@ func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder
finish(w.ClientExport(ctx, eref, *ref))
}
func (w *WrapperV1Full) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
return w.FullNode.PaychFund(ctx, from, to, amt)
}
var _ FullNode = &WrapperV1Full{}

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -8,7 +8,7 @@ import (
"sort"
"strings"
"github.com/filecoin-project/lotus/api"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/paychmgr"
@ -39,12 +39,15 @@ var paychAddFundsCmd = &cli.Command{
Usage: "Add funds to the payment channel between fromAddress and toAddress. Creates the payment channel if it doesn't already exist.",
ArgsUsage: "[fromAddress toAddress amount]",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "restart-retrievals",
Usage: "restart stalled retrieval deals on this payment channel",
Value: true,
},
&cli.BoolFlag{
Name: "reserve",
Usage: "mark funds as reserved",
},
},
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 3 {
@ -66,7 +69,7 @@ var paychAddFundsCmd = &cli.Command{
return ShowHelp(cctx, fmt.Errorf("parsing amount failed: %s", err))
}
api, closer, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
@ -76,7 +79,14 @@ var paychAddFundsCmd = &cli.Command{
// Send a message to chain to create channel / add funds to existing
// channel
info, err := api.PaychGet(ctx, from, to, types.BigInt(amt))
var info *lapi.ChannelInfo
if cctx.Bool("reserve") {
info, err = api.PaychGet(ctx, from, to, types.BigInt(amt), lapi.PaychGetOpts{
OffChain: false,
})
} else {
info, err = api.PaychFund(ctx, from, to, types.BigInt(amt))
}
if err != nil {
return err
}
@ -163,13 +173,13 @@ var paychStatusCmd = &cli.Command{
},
}
func paychStatus(writer io.Writer, avail *api.ChannelAvailableFunds) {
func paychStatus(writer io.Writer, avail *lapi.ChannelAvailableFunds) {
if avail.Channel == nil {
if avail.PendingWaitSentinel != nil {
fmt.Fprint(writer, "Creating channel\n")
fmt.Fprintf(writer, " From: %s\n", avail.From)
fmt.Fprintf(writer, " To: %s\n", avail.To)
fmt.Fprintf(writer, " Pending Amt: %d\n", avail.PendingAmt)
fmt.Fprintf(writer, " Pending Amt: %s\n", types.FIL(avail.PendingAmt))
fmt.Fprintf(writer, " Wait Sentinel: %s\n", avail.PendingWaitSentinel)
return
}
@ -189,10 +199,12 @@ func paychStatus(writer io.Writer, avail *api.ChannelAvailableFunds) {
{"Channel", avail.Channel.String()},
{"From", avail.From.String()},
{"To", avail.To.String()},
{"Confirmed Amt", fmt.Sprintf("%d", avail.ConfirmedAmt)},
{"Pending Amt", fmt.Sprintf("%d", avail.PendingAmt)},
{"Queued Amt", fmt.Sprintf("%d", avail.QueuedAmt)},
{"Voucher Redeemed Amt", fmt.Sprintf("%d", avail.VoucherReedeemedAmt)},
{"Confirmed Amt", fmt.Sprintf("%s", types.FIL(avail.ConfirmedAmt))},
{"Available Amt", fmt.Sprintf("%s", types.FIL(avail.NonReservedAmt))},
{"Voucher Redeemed Amt", fmt.Sprintf("%s", types.FIL(avail.VoucherReedeemedAmt))},
{"Pending Amt", fmt.Sprintf("%s", types.FIL(avail.PendingAmt))},
{"Pending Available Amt", fmt.Sprintf("%s", types.FIL(avail.PendingAvailableAmt))},
{"Queued Amt", fmt.Sprintf("%s", types.FIL(avail.QueuedAmt))},
}
if avail.PendingWaitSentinel != nil {
nameValues = append(nameValues, []string{
@ -576,7 +588,7 @@ func outputVoucher(w io.Writer, v *paych.SignedVoucher, export bool) error {
}
}
fmt.Fprintf(w, "Lane %d, Nonce %d: %s", v.Lane, v.Nonce, v.Amount.String())
fmt.Fprintf(w, "Lane %d, Nonce %d: %s", v.Lane, v.Nonce, types.FIL(v.Amount))
if export {
fmt.Fprintf(w, "; %s", enc)
}

View File

@ -4061,6 +4061,8 @@ Response:
"To": "f01234",
"ConfirmedAmt": "0",
"PendingAmt": "0",
"NonReservedAmt": "0",
"PendingAvailableAmt": "0",
"PendingWaitSentinel": null,
"QueuedAmt": "0",
"VoucherReedeemedAmt": "0"
@ -4088,6 +4090,8 @@ Response:
"To": "f01234",
"ConfirmedAmt": "0",
"PendingAmt": "0",
"NonReservedAmt": "0",
"PendingAvailableAmt": "0",
"PendingWaitSentinel": null,
"QueuedAmt": "0",
"VoucherReedeemedAmt": "0"

View File

@ -147,6 +147,7 @@
* [PaychAvailableFunds](#PaychAvailableFunds)
* [PaychAvailableFundsByFromTo](#PaychAvailableFundsByFromTo)
* [PaychCollect](#PaychCollect)
* [PaychFund](#PaychFund)
* [PaychGet](#PaychGet)
* [PaychGetWaitReady](#PaychGetWaitReady)
* [PaychList](#PaychList)
@ -4456,6 +4457,8 @@ Response:
"To": "f01234",
"ConfirmedAmt": "0",
"PendingAmt": "0",
"NonReservedAmt": "0",
"PendingAvailableAmt": "0",
"PendingWaitSentinel": null,
"QueuedAmt": "0",
"VoucherReedeemedAmt": "0"
@ -4483,6 +4486,8 @@ Response:
"To": "f01234",
"ConfirmedAmt": "0",
"PendingAmt": "0",
"NonReservedAmt": "0",
"PendingAvailableAmt": "0",
"PendingWaitSentinel": null,
"QueuedAmt": "0",
"VoucherReedeemedAmt": "0"
@ -4508,8 +4513,10 @@ Response:
}
```
### PaychGet
There are not yet any comments for this method.
### PaychFund
PaychFund gets or creates a payment channel between address pair.
The specified amount will be added to the channel through on-chain send for future use
Perms: sign
@ -4532,6 +4539,40 @@ Response:
}
```
### PaychGet
PaychGet gets or creates a payment channel between address pair
The specified amount will be reserved for use. If there aren't enough non-reserved funds
available, funds will be added through an on-chain message.
- When opts.OffChain is true, this call will not cause any messages to be sent to the chain (no automatic
channel creation/funds adding). If the operation can't be performed without sending a message an error will be
returned. Note that even when this option is specified, this call can be blocked by previous operations on the
channel waiting for on-chain operations.
Perms: sign
Inputs:
```json
[
"f01234",
"f01234",
"0",
{
"OffChain": true
}
]
```
Response:
```json
{
"Channel": "f01234",
"WaitSentinel": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
}
```
### PaychGetWaitReady

View File

@ -1363,6 +1363,7 @@ USAGE:
OPTIONS:
--restart-retrievals restart stalled retrieval deals on this payment channel (default: true)
--reserve mark funds as reserved (default: false)
--help, -h show help (default: false)
```

View File

@ -121,6 +121,14 @@
# env var: LOTUS_CLIENT_SIMULTANEOUSTRANSFERSFORRETRIEVAL
#SimultaneousTransfersForRetrieval = 20
# Require that retrievals perform no on-chain operations. Paid retrievals
# without existing payment channels with available funds will fail instead
# of automatically performing on-chain operations.
#
# type: bool
# env var: LOTUS_CLIENT_OFFCHAINRETRIEVAL
#OffChainRetrieval = false
[Wallet]
# type: string

2
go.mod
View File

@ -38,7 +38,7 @@ require (
github.com/filecoin-project/go-data-transfer v1.14.0
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.19.0
github.com/filecoin-project/go-fil-markets v1.19.1
github.com/filecoin-project/go-jsonrpc v0.1.5
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.4

4
go.sum
View File

@ -327,8 +327,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0 h1:3R4ds1A9r6cr8mvZBfMYxTS88Oq
github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
github.com/filecoin-project/go-fil-markets v1.19.0 h1:kap2q2wTM6tfkVO5gMA5DD9GUeTvkDhMfhjCtEwMDM8=
github.com/filecoin-project/go-fil-markets v1.19.0/go.mod h1:qsb3apmo4RSJYCEq40QxVdU7UZospN6nFJLOBHuaIbc=
github.com/filecoin-project/go-fil-markets v1.19.1 h1:o5sziAp8zCsvIg3KYMgIpwm8gyOl4MDzEKEf0Qq5L3U=
github.com/filecoin-project/go-fil-markets v1.19.1/go.mod h1:qsb3apmo4RSJYCEq40QxVdU7UZospN6nFJLOBHuaIbc=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=

View File

@ -65,7 +65,9 @@ func TestPaymentChannelsAPI(t *testing.T) {
require.NoError(t, err)
channelAmt := int64(7000)
channelInfo, err := paymentCreator.PaychGet(ctx, createrAddr, receiverAddr, abi.NewTokenAmount(channelAmt))
channelInfo, err := paymentCreator.PaychGet(ctx, createrAddr, receiverAddr, abi.NewTokenAmount(channelAmt), api.PaychGetOpts{
OffChain: false,
})
require.NoError(t, err)
channel, err := paymentCreator.PaychGetWaitReady(ctx, channelInfo.WaitSentinel)

View File

@ -144,10 +144,10 @@ func TestPaymentChannelStatus(t *testing.T) {
require.True(t, stateCreating || stateCreated)
channelAmtAtto := types.BigMul(types.NewInt(channelAmt), types.NewInt(build.FilecoinPrecision))
channelAmtStr := fmt.Sprintf("%d", channelAmtAtto)
channelAmtStr := fmt.Sprintf("%s", types.FIL(channelAmtAtto))
if stateCreating {
// If we're in the creating state (most likely) the amount should be pending
require.Regexp(t, regexp.MustCompile("Pending.*"+channelAmtStr), out)
require.Regexp(t, regexp.MustCompile("Pending Amt.*"+channelAmtStr), out)
}
// Wait for create channel to complete
@ -170,7 +170,7 @@ func TestPaymentChannelStatus(t *testing.T) {
out = creatorCLI.RunCmd("paych", "status", chstr)
fmt.Println(out)
voucherAmtAtto := types.BigMul(types.NewInt(voucherAmt), types.NewInt(build.FilecoinPrecision))
voucherAmtStr := fmt.Sprintf("%d", voucherAmtAtto)
voucherAmtStr := fmt.Sprintf("%s", types.FIL(voucherAmtAtto))
// Output should include voucher amount
require.Regexp(t, regexp.MustCompile("Voucher.*"+voucherAmtStr), out)
}

View File

@ -10,6 +10,7 @@ import (
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multiaddr"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
@ -17,6 +18,8 @@ import (
)
type retrievalClientNode struct {
forceOffChain bool
chainAPI full.ChainAPI
payAPI payapi.PaychAPI
stateAPI full.StateAPI
@ -24,8 +27,13 @@ type retrievalClientNode struct {
// NewRetrievalClientNode returns a new node adapter for a retrieval client that talks to the
// Lotus Node
func NewRetrievalClientNode(payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stateAPI full.StateAPI) retrievalmarket.RetrievalClientNode {
return &retrievalClientNode{payAPI: payAPI, chainAPI: chainAPI, stateAPI: stateAPI}
func NewRetrievalClientNode(forceOffChain bool, payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stateAPI full.StateAPI) retrievalmarket.RetrievalClientNode {
return &retrievalClientNode{
forceOffChain: forceOffChain,
chainAPI: chainAPI,
payAPI: payAPI,
stateAPI: stateAPI,
}
}
// GetOrCreatePaymentChannel sets up a new payment channel if one does not exist
@ -34,10 +42,14 @@ func NewRetrievalClientNode(payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stat
func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, cid.Cid, error) {
// TODO: respect the provided TipSetToken (a serialized TipSetKey) when
// querying the chain
ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable)
ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable, api.PaychGetOpts{
OffChain: rcn.forceOffChain,
})
if err != nil {
log.Errorw("paych get failed", "error", err)
return address.Undef, cid.Undef, err
}
return ci.Channel, ci.WaitSentinel, nil
}

View File

@ -121,7 +121,7 @@ var ChainNode = Options(
// Markets (retrieval)
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
Override(new(retrievalmarket.BlockstoreAccessor), modules.RetrievalBlockstoreAccessor),
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient),
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient(false)),
Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer),
// Markets (storage)
@ -221,6 +221,8 @@ func ConfigFullNode(c interface{}) Option {
),
Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfersForStorage, cfg.Client.SimultaneousTransfersForRetrieval)),
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient(cfg.Client.OffChainRetrieval)),
If(cfg.Wallet.RemoteBackend != "",
Override(new(*remotewallet.RemoteWallet), remotewallet.SetupRemoteWallet(cfg.Wallet.RemoteBackend)),
),

View File

@ -105,6 +105,14 @@ and storage providers for storage deals`,
Comment: `The maximum number of simultaneous data transfers between the client
and storage providers for retrieval deals`,
},
{
Name: "OffChainRetrieval",
Type: "bool",
Comment: `Require that retrievals perform no on-chain operations. Paid retrievals
without existing payment channels with available funds will fail instead
of automatically performing on-chain operations.`,
},
},
"Common": []DocField{
{

View File

@ -387,6 +387,11 @@ type Client struct {
// The maximum number of simultaneous data transfers between the client
// and storage providers for retrieval deals
SimultaneousTransfersForRetrieval uint64
// Require that retrievals perform no on-chain operations. Paid retrievals
// without existing payment channels with available funds will fail instead
// of automatically performing on-chain operations.
OffChainRetrieval bool
}
type Wallet struct {

View File

@ -22,8 +22,26 @@ type PaychAPI struct {
PaychMgr *paychmgr.Manager
}
func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt)
func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt, opts api.PaychGetOpts) (*api.ChannelInfo, error) {
ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt, paychmgr.GetOpts{
Reserve: true,
OffChain: opts.OffChain,
})
if err != nil {
return nil, err
}
return &api.ChannelInfo{
Channel: ch,
WaitSentinel: mcid,
}, nil
}
func (a *PaychAPI) PaychFund(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt, paychmgr.GetOpts{
Reserve: false,
OffChain: false,
})
if err != nil {
return nil, err
}
@ -55,7 +73,7 @@ func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address
// TODO: Fix free fund tracking in PaychGet
// TODO: validate voucher spec before locking funds
ch, err := a.PaychGet(ctx, from, to, amount)
ch, err := a.PaychGet(ctx, from, to, amount, api.PaychGetOpts{OffChain: false})
if err != nil {
return nil, err
}

View File

@ -202,26 +202,28 @@ func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataT
}
// RetrievalClient creates a new retrieval client attached to the client blockstore
func RetrievalClient(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
func RetrievalClient(forceOffChain bool) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
return func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
adapter := retrievaladapter.NewRetrievalClientNode(forceOffChain, payAPI, chainAPI, stateAPI)
network := rmnet.NewFromLibp2pHost(h)
ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client"))
client, err := retrievalimpl.NewClient(network, dt, adapter, resolver, ds, accessor)
if err != nil {
return nil, err
}
client.OnReady(marketevents.ReadyLogger("retrieval client"))
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
client.SubscribeToEvents(marketevents.RetrievalClientLogger)
adapter := retrievaladapter.NewRetrievalClientNode(payAPI, chainAPI, stateAPI)
network := rmnet.NewFromLibp2pHost(h)
ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client"))
client, err := retrievalimpl.NewClient(network, dt, adapter, resolver, ds, accessor)
if err != nil {
return nil, err
evtType := j.RegisterEventType("markets/retrieval/client", "state_change")
client.SubscribeToEvents(markets.RetrievalClientJournaler(j, evtType))
return client.Start(ctx)
},
})
return client, nil
}
client.OnReady(marketevents.ReadyLogger("retrieval client"))
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
client.SubscribeToEvents(marketevents.RetrievalClientLogger)
evtType := j.RegisterEventType("markets/retrieval/client", "state_change")
client.SubscribeToEvents(markets.RetrievalClientJournaler(j, evtType))
return client.Start(ctx)
},
})
return client, nil
}

View File

@ -196,7 +196,7 @@ func (t *ChannelInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{172}); err != nil {
if _, err := w.Write([]byte{174}); err != nil {
return err
}
@ -346,6 +346,38 @@ func (t *ChannelInfo) MarshalCBOR(w io.Writer) error {
return err
}
// t.AvailableAmount (big.Int) (struct)
if len("AvailableAmount") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"AvailableAmount\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("AvailableAmount"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("AvailableAmount")); err != nil {
return err
}
if err := t.AvailableAmount.MarshalCBOR(w); err != nil {
return err
}
// t.PendingAvailableAmount (big.Int) (struct)
if len("PendingAvailableAmount") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"PendingAvailableAmount\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("PendingAvailableAmount"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("PendingAvailableAmount")); err != nil {
return err
}
if err := t.PendingAvailableAmount.MarshalCBOR(w); err != nil {
return err
}
// t.PendingAmount (big.Int) (struct)
if len("PendingAmount") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"PendingAmount\" was too long")
@ -577,6 +609,26 @@ func (t *ChannelInfo) UnmarshalCBOR(r io.Reader) error {
return xerrors.Errorf("unmarshaling t.Amount: %w", err)
}
}
// t.AvailableAmount (big.Int) (struct)
case "AvailableAmount":
{
if err := t.AvailableAmount.UnmarshalCBOR(br); err != nil {
return xerrors.Errorf("unmarshaling t.AvailableAmount: %w", err)
}
}
// t.PendingAvailableAmount (big.Int) (struct)
case "PendingAvailableAmount":
{
if err := t.PendingAvailableAmount.UnmarshalCBOR(br); err != nil {
return xerrors.Errorf("unmarshaling t.PendingAvailableAmount: %w", err)
}
}
// t.PendingAmount (big.Int) (struct)
case "PendingAmount":

View File

@ -101,13 +101,22 @@ func (pm *Manager) Stop() error {
return nil
}
func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) {
type GetOpts struct {
Reserve bool
OffChain bool
}
func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt types.BigInt, opts GetOpts) (address.Address, cid.Cid, error) {
if !opts.Reserve && opts.OffChain {
return address.Undef, cid.Undef, xerrors.Errorf("can't fund payment channels without on-chain operations")
}
chanAccessor, err := pm.accessorByFromTo(from, to)
if err != nil {
return address.Undef, cid.Undef, err
}
return chanAccessor.getPaych(ctx, amt)
return chanAccessor.getPaych(ctx, amt, opts)
}
func (pm *Manager) AvailableFunds(ctx context.Context, ch address.Address) (*api.ChannelAvailableFunds, error) {
@ -142,6 +151,8 @@ func (pm *Manager) AvailableFundsByFromTo(ctx context.Context, from address.Addr
To: to,
ConfirmedAmt: types.NewInt(0),
PendingAmt: types.NewInt(0),
NonReservedAmt: types.NewInt(0),
PendingAvailableAmt: types.NewInt(0),
PendingWaitSentinel: nil,
QueuedAmt: types.NewInt(0),
VoucherReedeemedAmt: types.NewInt(0),

View File

@ -106,7 +106,7 @@ func (ca *channelAccessor) outboundActiveByFromTo(ctx context.Context, from, to
ca.lk.Lock()
defer ca.lk.Unlock()
return ca.store.OutboundActiveByFromTo(ctx, from, to)
return ca.store.OutboundActiveByFromTo(ctx, ca.api, from, to)
}
// createVoucher creates a voucher with the given specification, setting its

View File

@ -475,18 +475,18 @@ func TestAddVoucherInboundWalletKey(t *testing.T) {
toAcct := tutils.NewActorAddr(t, "toAct")
// Create an actor for the channel in state
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock := newMockManagerAPI()
mock.setAccountAddress(fromAcct, from)
mock.setAccountAddress(toAcct, to)
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
// Create a manager

View File

@ -25,6 +25,23 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
var onChainReserve = GetOpts{
Reserve: true,
OffChain: false,
}
var onChainNoReserve = GetOpts{
Reserve: false,
OffChain: false,
}
var offChainReserve = GetOpts{
Reserve: true,
OffChain: true,
}
var offChainNoReserve = GetOpts{
Reserve: false,
OffChain: true,
}
func testChannelResponse(t *testing.T, ch address.Address) types.MessageReceipt {
createChannelRet := init2.ExecReturn{
IDAddress: ch,
@ -55,7 +72,7 @@ func TestPaychGetCreateChannelMsg(t *testing.T) {
require.NoError(t, err)
amt := big.NewInt(10)
ch, mcid, err := mgr.GetPaych(ctx, from, to, amt)
ch, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err)
require.Equal(t, address.Undef, ch)
@ -65,6 +82,42 @@ func TestPaychGetCreateChannelMsg(t *testing.T) {
require.Equal(t, amt, pushedMsg.Message.Value)
}
func TestPaychGetOffchainNoReserveFails(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(store, mock)
require.NoError(t, err)
amt := big.NewInt(10)
_, _, err = mgr.GetPaych(ctx, from, to, amt, offChainNoReserve)
require.Error(t, err)
}
func TestPaychGetCreateOffchainReserveFails(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(store, mock)
require.NoError(t, err)
amt := big.NewInt(10)
_, _, err = mgr.GetPaych(ctx, from, to, amt, offChainReserve)
require.Error(t, err)
}
// TestPaychGetCreateChannelThenAddFunds tests creating a channel and then
// adding funds to it
func TestPaychGetCreateChannelThenAddFunds(t *testing.T) {
@ -79,12 +132,20 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) {
mock := newMockManagerAPI()
defer mock.close()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
amt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err)
// Should have no channels yet (message sent but channel not created)
@ -101,7 +162,7 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) {
// 2. Request add funds - should block until create channel has completed
amt2 := big.NewInt(5)
ch2, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2)
ch2, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
// 4. This GetPaych should return after create channel from first
// GetPaych completes
@ -155,6 +216,82 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) {
<-done
}
func TestPaychGetCreatePrefundedChannelThenAddFunds(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
amt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, onChainNoReserve)
require.NoError(t, err)
// Should have no channels yet (message sent but channel not created)
cis, err := mgr.ListChannels(ctx)
require.NoError(t, err)
require.Len(t, cis, 0)
// 1. Set up create channel response (sent in response to WaitForMsg())
response := testChannelResponse(t, ch)
done := make(chan struct{})
go func() {
defer close(done)
// 2. Request add funds - shouldn't block
amt2 := big.NewInt(3)
ch2, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2, offChainReserve)
// 4. This GetPaych should return after create channel from first
// GetPaych completes
require.NoError(t, err)
// Expect the channel to be the same
require.Equal(t, ch, ch2)
require.Equal(t, cid.Undef, addFundsMsgCid)
// Should have one channel, whose address is the channel that was created
cis, err := mgr.ListChannels(ctx)
require.NoError(t, err)
require.Len(t, cis, 1)
require.Equal(t, ch, cis[0])
// Amount should be amount sent to first GetPaych (to create
// channel).
// PendingAmount should be zero, AvailableAmount should be Amount minus what we requested
ci, err := mgr.GetChannelInfo(ctx, ch)
require.NoError(t, err)
require.EqualValues(t, 10, ci.Amount.Int64())
require.EqualValues(t, 0, ci.PendingAmount.Int64())
require.EqualValues(t, 7, ci.AvailableAmount.Int64())
require.Nil(t, ci.CreateMsg)
require.Nil(t, ci.AddFundsMsg)
}()
// 3. Send create channel response
mock.receiveMsgResponse(createMsgCid, response)
<-done
}
// TestPaychGetCreateChannelWithErrorThenCreateAgain tests that if an
// operation is queued up behind a create channel operation, and the create
// channel fails, then the waiting operation can succeed.
@ -174,7 +311,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) {
// Send create message for a channel
amt := big.NewInt(10)
_, mcid1, err := mgr.GetPaych(ctx, from, to, amt)
_, mcid1, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err)
// 1. Set up create channel response (sent in response to WaitForMsg())
@ -192,7 +329,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) {
// Because first channel create fails, this request
// should be for channel create again.
amt2 := big.NewInt(5)
ch2, mcid2, err := mgr.GetPaych(ctx, from, to, amt2)
ch2, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
require.NoError(t, err)
require.Equal(t, address.Undef, ch2)
@ -240,7 +377,7 @@ func TestPaychGetRecoverAfterError(t *testing.T) {
// Send create message for a channel
amt := big.NewInt(10)
_, mcid, err := mgr.GetPaych(ctx, from, to, amt)
_, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err)
// Send error create channel response
@ -251,7 +388,7 @@ func TestPaychGetRecoverAfterError(t *testing.T) {
// Send create message for a channel again
amt2 := big.NewInt(7)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
require.NoError(t, err)
// Send success create channel response
@ -288,12 +425,20 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
mock := newMockManagerAPI()
defer mock.close()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel
amt := big.NewInt(10)
_, mcid1, err := mgr.GetPaych(ctx, from, to, amt)
_, mcid1, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err)
// Send success create channel response
@ -302,7 +447,7 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
// Send add funds message for channel
amt2 := big.NewInt(5)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
require.NoError(t, err)
// Send error add funds response
@ -329,7 +474,7 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
// Send add funds message for channel again
amt3 := big.NewInt(2)
_, mcid3, err := mgr.GetPaych(ctx, from, to, amt3)
_, mcid3, err := mgr.GetPaych(ctx, from, to, amt3, onChainReserve)
require.NoError(t, err)
// Send success add funds response
@ -375,7 +520,7 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) {
// Send create message for a channel with value 10
amt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err)
// Simulate shutting down system
@ -385,6 +530,14 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) {
mock2 := newMockManagerAPI()
defer mock2.close()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock2.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr2, err := newManager(store, mock2)
require.NoError(t, err)
@ -402,7 +555,7 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) {
// 2. Request add funds - should block until create channel has completed
amt2 := big.NewInt(5)
ch2, addFundsMsgCid, err := mgr2.GetPaych(ctx, from, to, amt2)
ch2, addFundsMsgCid, err := mgr2.GetPaych(ctx, from, to, amt2, onChainReserve)
// 4. This GetPaych should return after create channel from first
// GetPaych completes
@ -450,12 +603,20 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
mock := newMockManagerAPI()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel
amt := big.NewInt(10)
_, mcid1, err := mgr.GetPaych(ctx, from, to, amt)
_, mcid1, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err)
// Send success create channel response
@ -464,7 +625,7 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
// Send add funds message for channel
amt2 := big.NewInt(5)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
require.NoError(t, err)
// Simulate shutting down system
@ -474,6 +635,8 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
mock2 := newMockManagerAPI()
defer mock2.close()
mock2.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr2, err := newManager(store, mock2)
require.NoError(t, err)
@ -510,19 +673,27 @@ func TestPaychGetWait(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
expch := tutils.NewIDAddr(t, 100)
mock := newMockManagerAPI()
defer mock.close()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(expch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// 1. Get
amt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err)
expch := tutils.NewIDAddr(t, 100)
go func() {
// 3. Send response
response := testChannelResponse(t, expch)
@ -542,7 +713,7 @@ func TestPaychGetWait(t *testing.T) {
// Request add funds
amt2 := big.NewInt(15)
_, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2)
_, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
require.NoError(t, err)
go func() {
@ -577,7 +748,7 @@ func TestPaychGetWaitErr(t *testing.T) {
// 1. Create channel
amt := big.NewInt(10)
_, mcid, err := mgr.GetPaych(ctx, from, to, amt)
_, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err)
done := make(chan address.Address)
@ -624,7 +795,7 @@ func TestPaychGetWaitCtx(t *testing.T) {
require.NoError(t, err)
amt := big.NewInt(10)
_, mcid, err := mgr.GetPaych(ctx, from, to, amt)
_, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err)
// When the context is cancelled, should unblock wait
@ -651,12 +822,20 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
mock := newMockManagerAPI()
defer mock.close()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err)
// Queue up two add funds requests behind create channel
@ -674,7 +853,7 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
// Request add funds - should block until create channel has completed
var err error
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1)
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, onChainReserve)
require.NoError(t, err)
}()
@ -683,7 +862,7 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
// Request add funds again - should merge with waiting add funds request
var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2)
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve)
require.NoError(t, err)
}()
// Wait for add funds requests to be queued up
@ -736,6 +915,480 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
require.Equal(t, types.BigAdd(addFundsAmt1, addFundsAmt2), addFundsMsg.Message.Value)
}
func TestPaychGetMergePrefundAndReserve(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err)
// Queue up two add funds requests behind create channel
var addFundsSent sync.WaitGroup
addFundsSent.Add(2)
addFundsAmt1 := big.NewInt(5) // 1 prefunds
addFundsAmt2 := big.NewInt(3) // 2 reserves
var addFundsCh1 address.Address
var addFundsCh2 address.Address
var addFundsMcid1 cid.Cid
var addFundsMcid2 cid.Cid
go func() {
defer addFundsSent.Done()
// Request add funds - should block until create channel has completed
var err error
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, onChainNoReserve)
require.NoError(t, err)
}()
go func() {
defer addFundsSent.Done()
// Request add funds again - should merge with waiting add funds request
var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve)
require.NoError(t, err)
}()
// Wait for add funds requests to be queued up
waitForQueueSize(t, mgr, from, to, 2)
// Send create channel response
response := testChannelResponse(t, ch)
mock.receiveMsgResponse(createMsgCid, response)
// Wait for create channel response
chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid)
require.NoError(t, err)
require.Equal(t, ch, chres)
// Wait for add funds requests to be sent
addFundsSent.Wait()
// Expect add funds requests to have same channel as create channel and
// same message cid as each other (because they should have been merged)
require.Equal(t, ch, addFundsCh1)
require.Equal(t, ch, addFundsCh2)
require.Equal(t, addFundsMcid1, addFundsMcid2)
// Send success add funds response
mock.receiveMsgResponse(addFundsMcid1, types.MessageReceipt{
ExitCode: 0,
Return: []byte{},
})
// Wait for add funds response
addFundsCh, err := mgr.GetPaychWaitReady(ctx, addFundsMcid1)
require.NoError(t, err)
require.Equal(t, ch, addFundsCh)
// Make sure that one create channel message and one add funds message was
// sent
require.Equal(t, 2, mock.pushedMessageCount())
// Check create message amount is correct
createMsg := mock.pushedMessages(createMsgCid)
require.Equal(t, from, createMsg.Message.From)
require.Equal(t, lotusinit.Address, createMsg.Message.To)
require.Equal(t, createAmt, createMsg.Message.Value)
// Check merged add funds amount is the sum of the individual
// amounts
addFundsMsg := mock.pushedMessages(addFundsMcid1)
require.Equal(t, from, addFundsMsg.Message.From)
require.Equal(t, ch, addFundsMsg.Message.To)
require.Equal(t, types.BigAdd(addFundsAmt1, addFundsAmt2), addFundsMsg.Message.Value)
}
func TestPaychGetMergePrefundAndReservePrefunded(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainNoReserve)
require.NoError(t, err)
// Queue up two add funds requests behind create channel
var addFundsSent sync.WaitGroup
addFundsSent.Add(2)
addFundsAmt1 := big.NewInt(5) // 1 prefunds
addFundsAmt2 := big.NewInt(3) // 2 reserves
var addFundsCh1 address.Address
var addFundsCh2 address.Address
var addFundsMcid1 cid.Cid
var addFundsMcid2 cid.Cid
go func() {
defer addFundsSent.Done()
// Request add funds - should block until create channel has completed
var err error
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, onChainNoReserve)
require.NoError(t, err)
}()
go func() {
defer addFundsSent.Done()
// Request add funds again - should merge with waiting add funds request
var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve)
require.NoError(t, err)
}()
// Wait for add funds requests to be queued up
waitForQueueSize(t, mgr, from, to, 2)
// Send create channel response
response := testChannelResponse(t, ch)
mock.receiveMsgResponse(createMsgCid, response)
// Wait for create channel response
chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid)
require.NoError(t, err)
require.Equal(t, ch, chres)
// Wait for add funds requests to be sent
addFundsSent.Wait()
// Expect add funds requests to have same channel as create channel and
// same message cid as each other (because they should have been merged)
require.Equal(t, ch, addFundsCh1)
require.Equal(t, ch, addFundsCh2)
require.NotEqual(t, cid.Undef, addFundsMcid1)
require.Equal(t, cid.Undef, addFundsMcid2)
// Send success add funds response
mock.receiveMsgResponse(addFundsMcid1, types.MessageReceipt{
ExitCode: 0,
Return: []byte{},
})
// Wait for add funds response
addFundsCh, err := mgr.GetPaychWaitReady(ctx, addFundsMcid1)
require.NoError(t, err)
require.Equal(t, ch, addFundsCh)
// Make sure that one create channel message and one add funds message was
// sent
require.Equal(t, 2, mock.pushedMessageCount())
// Check create message amount is correct
createMsg := mock.pushedMessages(createMsgCid)
require.Equal(t, from, createMsg.Message.From)
require.Equal(t, lotusinit.Address, createMsg.Message.To)
require.Equal(t, createAmt, createMsg.Message.Value)
// Check merged add funds amount is the sum of the individual
// amounts
addFundsMsg := mock.pushedMessages(addFundsMcid1)
require.Equal(t, from, addFundsMsg.Message.From)
require.Equal(t, ch, addFundsMsg.Message.To)
require.Equal(t, addFundsAmt1, addFundsMsg.Message.Value)
}
func TestPaychGetMergePrefundAndReservePrefundedOneOffchain(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainNoReserve)
require.NoError(t, err)
// Queue up two add funds requests behind create channel
var addFundsSent sync.WaitGroup
addFundsSent.Add(2)
addFundsAmt1 := big.NewInt(5) // 1 reserves
addFundsAmt2 := big.NewInt(3) // 2 reserves
var addFundsCh1 address.Address
var addFundsCh2 address.Address
var addFundsMcid1 cid.Cid
var addFundsMcid2 cid.Cid
go func() {
defer addFundsSent.Done()
// Request add funds - should block until create channel has completed
var err error
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, offChainReserve)
require.NoError(t, err)
}()
go func() {
defer addFundsSent.Done()
// Request add funds again - should merge with waiting add funds request
var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve)
require.NoError(t, err)
}()
// Wait for add funds requests to be queued up
waitForQueueSize(t, mgr, from, to, 2)
// Send create channel response
response := testChannelResponse(t, ch)
mock.receiveMsgResponse(createMsgCid, response)
// Wait for create channel response
chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid)
require.NoError(t, err)
require.Equal(t, ch, chres)
// Wait for add funds requests to be sent
addFundsSent.Wait()
// Expect add funds requests to have same channel as create channel and
// same message cid as each other (because they should have been merged)
require.Equal(t, ch, addFundsCh1)
require.Equal(t, ch, addFundsCh2)
require.Equal(t, cid.Undef, addFundsMcid1)
require.Equal(t, cid.Undef, addFundsMcid2)
// Make sure that one create channel message was sent
require.Equal(t, 1, mock.pushedMessageCount())
// Check create message amount is correct
createMsg := mock.pushedMessages(createMsgCid)
require.Equal(t, from, createMsg.Message.From)
require.Equal(t, lotusinit.Address, createMsg.Message.To)
require.Equal(t, createAmt, createMsg.Message.Value)
}
func TestPaychGetMergePrefundAndReservePrefundedBothOffchainOneFail(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainNoReserve)
require.NoError(t, err)
// Queue up two add funds requests behind create channel
var addFundsSent sync.WaitGroup
addFundsSent.Add(2)
addFundsAmt1 := big.NewInt(5) // 1 reserves
addFundsAmt2 := big.NewInt(6) // 2 reserves too much
var addFundsCh1 address.Address
var addFundsCh2 address.Address
var addFundsMcid1 cid.Cid
var addFundsMcid2 cid.Cid
go func() {
defer addFundsSent.Done()
// Request add funds - should block until create channel has completed
var err error
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, offChainReserve)
require.NoError(t, err)
}()
go func() {
defer addFundsSent.Done()
// Request add funds again - should merge with waiting add funds request
var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, offChainReserve)
require.Error(t, err)
}()
// Wait for add funds requests to be queued up
waitForQueueSize(t, mgr, from, to, 2)
// Send create channel response
response := testChannelResponse(t, ch)
mock.receiveMsgResponse(createMsgCid, response)
// Wait for create channel response
chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid)
require.NoError(t, err)
require.Equal(t, ch, chres)
// Wait for add funds requests to be sent
addFundsSent.Wait()
// Expect add funds requests to have same channel as create channel and
// same message cid as each other (because they should have been merged)
require.Equal(t, ch, addFundsCh1)
require.Equal(t, ch, addFundsCh2)
require.Equal(t, cid.Undef, addFundsMcid1)
require.Equal(t, cid.Undef, addFundsMcid2)
// Make sure that one create channel message was sent
require.Equal(t, 1, mock.pushedMessageCount())
// Check create message amount is correct
createMsg := mock.pushedMessages(createMsgCid)
require.Equal(t, from, createMsg.Message.From)
require.Equal(t, lotusinit.Address, createMsg.Message.To)
require.Equal(t, createAmt, createMsg.Message.Value)
}
func TestPaychGetMergePrefundAndReserveOneOffchainOneFail(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err)
// Queue up two add funds requests behind create channel
var addFundsSent sync.WaitGroup
addFundsSent.Add(2)
addFundsAmt1 := big.NewInt(5) // 1 reserves
addFundsAmt2 := big.NewInt(6) // 2 reserves
var addFundsCh1 address.Address
var addFundsCh2 address.Address
var addFundsMcid1 cid.Cid
var addFundsMcid2 cid.Cid
go func() {
defer addFundsSent.Done()
// Request add funds - should block until create channel has completed
var err error
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, onChainReserve)
require.NoError(t, err)
}()
go func() {
defer addFundsSent.Done()
// Request add funds again - should merge with waiting add funds request
var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, offChainReserve)
require.Error(t, err)
}()
// Wait for add funds requests to be queued up
waitForQueueSize(t, mgr, from, to, 2)
// Send create channel response
response := testChannelResponse(t, ch)
mock.receiveMsgResponse(createMsgCid, response)
// Wait for create channel response
chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid)
require.NoError(t, err)
require.Equal(t, ch, chres)
// Wait for add funds requests to be sent
addFundsSent.Wait()
// Expect add funds requests to have same channel as create channel and
// same message cid as each other (because they should have been merged)
require.Equal(t, ch, addFundsCh1)
require.Equal(t, ch, addFundsCh2)
require.NotEqual(t, cid.Undef, addFundsMcid1)
require.Equal(t, cid.Undef, addFundsMcid2)
// Make sure that one create channel message was sent
require.Equal(t, 2, mock.pushedMessageCount())
// Check create message amount is correct
createMsg := mock.pushedMessages(createMsgCid)
require.Equal(t, from, createMsg.Message.From)
require.Equal(t, lotusinit.Address, createMsg.Message.To)
require.Equal(t, createAmt, createMsg.Message.Value)
// Check merged add funds amount is the sum of the individual
// amounts
addFundsMsg := mock.pushedMessages(addFundsMcid1)
require.Equal(t, from, addFundsMsg.Message.From)
require.Equal(t, ch, addFundsMsg.Message.To)
require.Equal(t, addFundsAmt1, addFundsMsg.Message.Value)
}
// TestPaychGetMergeAddFundsCtxCancelOne tests that when a queued add funds
// request is cancelled, its amount is removed from the total merged add funds
func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) {
@ -750,12 +1403,20 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) {
mock := newMockManagerAPI()
defer mock.close()
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: types.NewInt(20),
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err)
// Queue up two add funds requests behind create channel
@ -772,7 +1433,7 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) {
defer addFundsSent.Done()
// Request add funds - should block until create channel has completed
_, _, addFundsErr1 = mgr.GetPaych(addFundsCtx1, from, to, addFundsAmt1)
_, _, addFundsErr1 = mgr.GetPaych(addFundsCtx1, from, to, addFundsAmt1, onChainReserve)
}()
go func() {
@ -780,7 +1441,7 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) {
// Request add funds again - should merge with waiting add funds request
var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2)
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve)
require.NoError(t, err)
}()
// Wait for add funds requests to be queued up
@ -853,7 +1514,7 @@ func TestPaychGetMergeAddFundsCtxCancelAll(t *testing.T) {
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err)
// Queue up two add funds requests behind create channel
@ -868,14 +1529,14 @@ func TestPaychGetMergeAddFundsCtxCancelAll(t *testing.T) {
defer addFundsSent.Done()
// Request add funds - should block until create channel has completed
_, _, addFundsErr1 = mgr.GetPaych(addFundsCtx1, from, to, big.NewInt(5))
_, _, addFundsErr1 = mgr.GetPaych(addFundsCtx1, from, to, big.NewInt(5), onChainReserve)
}()
go func() {
defer addFundsSent.Done()
// Request add funds again - should merge with waiting add funds request
_, _, addFundsErr2 = mgr.GetPaych(addFundsCtx2, from, to, big.NewInt(3))
_, _, addFundsErr2 = mgr.GetPaych(addFundsCtx2, from, to, big.NewInt(3), onChainReserve)
}()
// Wait for add funds requests to be queued up
waitForQueueSize(t, mgr, from, to, 2)
@ -941,7 +1602,7 @@ func TestPaychAvailableFunds(t *testing.T) {
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err)
// Available funds should reflect create channel message sent
@ -966,7 +1627,7 @@ func TestPaychAvailableFunds(t *testing.T) {
// Request add funds - should block until create channel has completed
var err error
_, addFundsMcid, err = mgr.GetPaych(ctx, from, to, addFundsAmt)
_, addFundsMcid, err = mgr.GetPaych(ctx, from, to, addFundsAmt, onChainReserve)
require.NoError(t, err)
}()

View File

@ -47,7 +47,7 @@ func TestPaychAddVoucherAfterAddFunds(t *testing.T) {
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err)
// Send create channel response
@ -83,7 +83,7 @@ func TestPaychAddVoucherAfterAddFunds(t *testing.T) {
require.Equal(t, res.Shortfall, excessAmt)
// Add funds so as to cover the voucher shortfall
_, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, excessAmt)
_, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, excessAmt, onChainReserve)
require.NoError(t, err)
// Trigger add funds confirmation

View File

@ -30,7 +30,7 @@ func TestPaychSettle(t *testing.T) {
require.NoError(t, err)
amt := big.NewInt(10)
_, mcid, err := mgr.GetPaych(ctx, from, to, amt)
_, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err)
// Send channel create response
@ -50,7 +50,7 @@ func TestPaychSettle(t *testing.T) {
// (should create a new channel because the previous channel
// is settling)
amt2 := big.NewInt(5)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
require.NoError(t, err)
require.NotEqual(t, cid.Undef, mcid2)

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"sort"
"sync"
"github.com/ipfs/go-cid"
@ -32,18 +33,20 @@ type fundsReq struct {
ctx context.Context
promise chan *paychFundsRes
amt types.BigInt
opts GetOpts
lk sync.Mutex
// merge parent, if this req is part of a merge
merge *mergedFundsReq
}
func newFundsReq(ctx context.Context, amt types.BigInt) *fundsReq {
promise := make(chan *paychFundsRes)
func newFundsReq(ctx context.Context, amt types.BigInt, opts GetOpts) *fundsReq {
promise := make(chan *paychFundsRes, 1)
return &fundsReq{
ctx: ctx,
promise: promise,
amt: amt,
opts: opts,
}
}
@ -104,6 +107,19 @@ func newMergedFundsReq(reqs []*fundsReq) *mergedFundsReq {
r.setMergeParent(m)
}
sort.Slice(m.reqs, func(i, j int) bool {
if m.reqs[i].opts.OffChain != m.reqs[j].opts.OffChain { // off-chain first
return m.reqs[i].opts.OffChain
}
if m.reqs[i].opts.Reserve != m.reqs[j].opts.Reserve { // non-reserve after off-chain
return m.reqs[i].opts.Reserve
}
// sort by amount asc (reducing latency for smaller requests)
return m.reqs[i].amt.LessThan(m.reqs[j].amt)
})
// If the requests were all cancelled while being added, cancel the context
// immediately
m.checkActive()
@ -135,18 +151,97 @@ func (m *mergedFundsReq) onComplete(res *paychFundsRes) {
}
// sum is the sum of the amounts in all requests in the merge
func (m *mergedFundsReq) sum() types.BigInt {
func (m *mergedFundsReq) sum() (types.BigInt, types.BigInt) {
sum := types.NewInt(0)
avail := types.NewInt(0)
for _, r := range m.reqs {
if r.isActive() {
sum = types.BigAdd(sum, r.amt)
if !r.opts.Reserve {
avail = types.BigAdd(avail, r.amt)
}
}
}
return sum
return sum, avail
}
// completeAmount completes first non-reserving requests up to the available amount
func (m *mergedFundsReq) completeAmount(avail types.BigInt, channelInfo *ChannelInfo) (*paychFundsRes, types.BigInt, types.BigInt) {
used, failed := types.NewInt(0), types.NewInt(0)
next := 0
// order: [offchain+reserve, !offchain+reserve, !offchain+!reserve]
for i, r := range m.reqs {
if !r.opts.Reserve {
// non-reserving request are put after reserving requests, so we are done here
break
}
// don't try to fill inactive requests
if !r.isActive() {
continue
}
if r.amt.GreaterThan(types.BigSub(avail, used)) {
// requests are sorted by amount ascending, so if we hit this, there aren't any more requests we can fill
if r.opts.OffChain {
// can't fill, so OffChain want an error
if r.isActive() {
failed = types.BigAdd(failed, r.amt)
r.onComplete(&paychFundsRes{
channel: *channelInfo.Channel,
err: xerrors.Errorf("not enough funds available in the payment channel %s; add funds with 'lotus paych add-funds %s %s %s'", channelInfo.Channel, channelInfo.from(), channelInfo.to(), types.FIL(r.amt).Unitless()),
})
}
next = i + 1
continue
}
break
}
used = types.BigAdd(used, r.amt)
r.onComplete(&paychFundsRes{channel: *channelInfo.Channel})
next = i + 1
}
m.reqs = m.reqs[next:]
if len(m.reqs) == 0 {
return &paychFundsRes{channel: *channelInfo.Channel}, used, failed
}
return nil, used, failed
}
func (m *mergedFundsReq) failOffChainNoChannel(from, to address.Address) (*paychFundsRes, types.BigInt) {
next := 0
freed := types.NewInt(0)
for i, r := range m.reqs {
if !r.opts.OffChain {
break
}
freed = types.BigAdd(freed, r.amt)
if !r.isActive() {
continue
}
r.onComplete(&paychFundsRes{err: xerrors.Errorf("payment channel doesn't exist, create with 'lotus paych add-funds %s %s %s'", from, to, types.FIL(r.amt).Unitless())})
next = i + 1
}
m.reqs = m.reqs[next:]
if len(m.reqs) == 0 {
return &paychFundsRes{err: xerrors.Errorf("payment channel doesn't exist, create with 'lotus paych add-funds %s %s 0'", from, to)}, freed
}
return nil, freed
}
// getPaych ensures that a channel exists between the from and to addresses,
// and adds the given amount of funds.
// and reserves (or adds as available) the given amount of funds.
// If the channel does not exist a create channel message is sent and the
// message CID is returned.
// If the channel does exist an add funds message is sent and both the channel
@ -156,9 +251,9 @@ func (m *mergedFundsReq) sum() types.BigInt {
// address and the CID of the new add funds message.
// If an operation returns an error, subsequent waiting operations will still
// be attempted.
func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt) (address.Address, cid.Cid, error) {
func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt, opts GetOpts) (address.Address, cid.Cid, error) {
// Add the request to add funds to a queue and wait for the result
freq := newFundsReq(ctx, amt)
freq := newFundsReq(ctx, amt, opts)
ca.enqueue(ctx, freq)
select {
case res := <-freq.promise:
@ -195,14 +290,14 @@ func (ca *channelAccessor) processQueue(ctx context.Context, channelID string) (
// For example if there are pending requests for 3, 2, 4 then
// amt = 3 + 2 + 4 = 9
merged := newMergedFundsReq(ca.fundsReqQueue)
amt := merged.sum()
amt, avail := merged.sum()
if amt.IsZero() {
// Note: The amount can be zero if requests are cancelled as we're
// building the mergedFundsReq
return ca.currentAvailableFunds(ctx, channelID, amt)
}
res := ca.processTask(merged.ctx, amt)
res := ca.processTask(merged, amt, avail)
// If the task is waiting on an external event (eg something to appear on
// chain) it will return nil
@ -322,6 +417,8 @@ func (ca *channelAccessor) currentAvailableFunds(ctx context.Context, channelID
To: channelInfo.to(),
ConfirmedAmt: channelInfo.Amount,
PendingAmt: channelInfo.PendingAmount,
NonReservedAmt: channelInfo.AvailableAmount,
PendingAvailableAmt: channelInfo.PendingAvailableAmount,
PendingWaitSentinel: waitSentinel,
QueuedAmt: queuedAmt,
VoucherReedeemedAmt: totalRedeemed,
@ -333,18 +430,26 @@ func (ca *channelAccessor) currentAvailableFunds(ctx context.Context, channelID
// Note that processTask may be called repeatedly in the same state, and should
// return nil if there is no state change to be made (eg when waiting for a
// message to be confirmed on chain)
func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *paychFundsRes {
func (ca *channelAccessor) processTask(merged *mergedFundsReq, amt, avail types.BigInt) *paychFundsRes {
ctx := merged.ctx
// Get the payment channel for the from/to addresses.
// Note: It's ok if we get ErrChannelNotTracked. It just means we need to
// create a channel.
channelInfo, err := ca.store.OutboundActiveByFromTo(ctx, ca.from, ca.to)
channelInfo, err := ca.store.OutboundActiveByFromTo(ctx, ca.api, ca.from, ca.to)
if err != nil && err != ErrChannelNotTracked {
return &paychFundsRes{err: err}
}
// If a channel has not yet been created, create one.
if channelInfo == nil {
mcid, err := ca.createPaych(ctx, amt)
res, freed := merged.failOffChainNoChannel(ca.from, ca.to)
if res != nil {
return res
}
amt = types.BigSub(amt, freed)
mcid, err := ca.createPaych(ctx, amt, avail)
if err != nil {
return &paychFundsRes{err: err}
}
@ -366,9 +471,16 @@ func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *p
return nil
}
// Try to fill requests using available funds, without going to the chain
res, amt := ca.completeAvailable(ctx, merged, channelInfo, amt, avail)
if res != nil || amt.LessThanEqual(types.NewInt(0)) {
return res
}
// We need to add more funds, so send an add funds message to
// cover the amount for this request
mcid, err := ca.addFunds(ctx, channelInfo, amt)
mcid, err := ca.addFunds(ctx, channelInfo, amt, avail)
if err != nil {
return &paychFundsRes{err: err}
}
@ -376,7 +488,7 @@ func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *p
}
// createPaych sends a message to create the channel and returns the message cid
func (ca *channelAccessor) createPaych(ctx context.Context, amt types.BigInt) (cid.Cid, error) {
func (ca *channelAccessor) createPaych(ctx context.Context, amt, avail types.BigInt) (cid.Cid, error) {
mb, err := ca.messageBuilder(ctx, ca.from)
if err != nil {
return cid.Undef, err
@ -393,7 +505,7 @@ func (ca *channelAccessor) createPaych(ctx context.Context, amt types.BigInt) (c
mcid := smsg.Cid()
// Create a new channel in the store
ci, err := ca.store.CreateChannel(ctx, ca.from, ca.to, mcid, amt)
ci, err := ca.store.CreateChannel(ctx, ca.from, ca.to, mcid, amt, avail)
if err != nil {
log.Errorf("creating channel: %s", err)
return cid.Undef, err
@ -452,15 +564,41 @@ func (ca *channelAccessor) waitPaychCreateMsg(ctx context.Context, channelID str
ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) {
channelInfo.Channel = &decodedReturn.RobustAddress
channelInfo.Amount = channelInfo.PendingAmount
channelInfo.AvailableAmount = channelInfo.PendingAvailableAmount
channelInfo.PendingAmount = big.NewInt(0)
channelInfo.PendingAvailableAmount = big.NewInt(0)
channelInfo.CreateMsg = nil
})
return nil
}
// completeAvailable fills reserving fund requests using already available funds, without interacting with the chain
func (ca *channelAccessor) completeAvailable(ctx context.Context, merged *mergedFundsReq, channelInfo *ChannelInfo, amt, av types.BigInt) (*paychFundsRes, types.BigInt) {
toReserve := types.BigSub(amt, av)
avail := types.NewInt(0)
// reserve at most what we need
ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) {
avail = ci.AvailableAmount
if avail.GreaterThan(toReserve) {
avail = toReserve
}
ci.AvailableAmount = big.Sub(ci.AvailableAmount, avail)
})
res, used, failed := merged.completeAmount(avail, channelInfo)
// return any unused reserved funds (e.g. from cancelled requests)
ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) {
ci.AvailableAmount = types.BigAdd(ci.AvailableAmount, types.BigSub(avail, used))
})
return res, types.BigSub(amt, types.BigAdd(used, failed))
}
// addFunds sends a message to add funds to the channel and returns the message cid
func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt types.BigInt) (*cid.Cid, error) {
func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) {
msg := &types.Message{
To: *channelInfo.Channel,
From: channelInfo.Control,
@ -477,6 +615,7 @@ func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInf
// Store the add funds message CID on the channel
ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) {
ci.PendingAmount = amt
ci.PendingAvailableAmount = avail
ci.AddFundsMsg = &mcid
})
@ -492,6 +631,8 @@ func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInf
return &mcid, nil
}
// TODO func (ca *channelAccessor) freeFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) {
// waitForAddFundsMsg waits for mcid to appear on chain and returns error, if any
func (ca *channelAccessor) waitForAddFundsMsg(ctx context.Context, channelID string, mcid cid.Cid) {
err := ca.waitAddFundsMsg(ctx, channelID, mcid)
@ -514,6 +655,7 @@ func (ca *channelAccessor) waitAddFundsMsg(ctx context.Context, channelID string
ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) {
channelInfo.PendingAmount = big.NewInt(0)
channelInfo.PendingAvailableAmount = big.NewInt(0)
channelInfo.AddFundsMsg = nil
})
@ -526,7 +668,9 @@ func (ca *channelAccessor) waitAddFundsMsg(ctx context.Context, channelID string
// Store updated amount
ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) {
channelInfo.Amount = types.BigAdd(channelInfo.Amount, channelInfo.PendingAmount)
channelInfo.AvailableAmount = types.BigAdd(channelInfo.AvailableAmount, channelInfo.PendingAvailableAmount)
channelInfo.PendingAmount = big.NewInt(0)
channelInfo.PendingAvailableAmount = big.NewInt(0)
channelInfo.AddFundsMsg = nil
})

View File

@ -6,9 +6,8 @@ import (
"errors"
"fmt"
"golang.org/x/xerrors"
"github.com/google/uuid"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
@ -74,6 +73,10 @@ type ChannelInfo struct {
// has locally been added to the channel. It should reflect the channel's
// Balance on chain as long as all operations occur on the same datastore.
Amount types.BigInt
// AvailableAmount indicates how much afil is non-reserved
AvailableAmount types.BigInt
// PendingAvailableAmount is available amount that we're awaiting confirmation of
PendingAvailableAmount types.BigInt
// PendingAmount is the amount that we're awaiting confirmation of
PendingAmount types.BigInt
// CreateMsg is the CID of a pending create message (while waiting for confirmation)
@ -376,7 +379,7 @@ func (ps *Store) GetMessage(ctx context.Context, mcid cid.Cid) (*MsgInfo, error)
// OutboundActiveByFromTo looks for outbound channels that have not been
// settled, with the given from / to addresses
func (ps *Store) OutboundActiveByFromTo(ctx context.Context, from address.Address, to address.Address) (*ChannelInfo, error) {
func (ps *Store) OutboundActiveByFromTo(ctx context.Context, sma stateManagerAPI, from address.Address, to address.Address) (*ChannelInfo, error) {
return ps.findChan(ctx, func(ci *ChannelInfo) bool {
if ci.Direction != DirOutbound {
return false
@ -384,6 +387,21 @@ func (ps *Store) OutboundActiveByFromTo(ctx context.Context, from address.Addres
if ci.Settling {
return false
}
if ci.Channel != nil {
_, st, err := sma.GetPaychState(ctx, *ci.Channel, nil)
if err != nil {
return false
}
sat, err := st.SettlingAt()
if err != nil {
return false
}
if sat != 0 {
return false
}
}
return ci.Control == from && ci.Target == to
})
}
@ -416,14 +434,15 @@ func (ps *Store) ByChannelID(ctx context.Context, channelID string) (*ChannelInf
}
// CreateChannel creates an outbound channel for the given from / to
func (ps *Store) CreateChannel(ctx context.Context, from address.Address, to address.Address, createMsgCid cid.Cid, amt types.BigInt) (*ChannelInfo, error) {
func (ps *Store) CreateChannel(ctx context.Context, from address.Address, to address.Address, createMsgCid cid.Cid, amt, avail types.BigInt) (*ChannelInfo, error) {
ci := &ChannelInfo{
Direction: DirOutbound,
NextLane: 0,
Control: from,
Target: to,
CreateMsg: &createMsgCid,
PendingAmount: amt,
Direction: DirOutbound,
NextLane: 0,
Control: from,
Target: to,
CreateMsg: &createMsgCid,
PendingAmount: amt,
PendingAvailableAmount: avail,
}
// Save the new channel
@ -497,5 +516,11 @@ func unmarshallChannelInfo(stored *ChannelInfo, value []byte) (*ChannelInfo, err
stored.Channel = nil
}
// backwards compat
if stored.AvailableAmount.Int == nil {
stored.AvailableAmount = types.NewInt(0)
stored.PendingAvailableAmount = types.NewInt(0)
}
return stored, nil
}

View File

@ -207,7 +207,9 @@ func initPaymentChannel(t *testkit.TestEnvironment, ctx context.Context, cl *tes
t.RecordMessage("my balance: %d", balance)
t.RecordMessage("creating payment channel; from=%s, to=%s, funds=%d", cl.Wallet.Address, recv.WalletAddr, balance)
channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, balance)
channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, balance, api.PaychGetOpts{
OffChain: false,
})
if err != nil {
return fmt.Errorf("failed to create payment channel: %w", err)
}
@ -230,7 +232,9 @@ func initPaymentChannel(t *testkit.TestEnvironment, ctx context.Context, cl *tes
// we wait for 2 confirmations, so we have the assurance the channel is tracked.
t.RecordMessage("reloading paych; now it should have an address")
channel, err = cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, big.Zero())
channel, err = cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, big.Zero(), api.PaychGetOpts{
OffChain: false,
})
if err != nil {
return fmt.Errorf("failed to reload payment channel: %w", err)
}

View File

@ -9,7 +9,7 @@ require (
github.com/drand/drand v1.3.0
github.com/filecoin-project/go-address v0.0.6
github.com/filecoin-project/go-data-transfer v1.14.0
github.com/filecoin-project/go-fil-markets v1.19.0
github.com/filecoin-project/go-fil-markets v1.19.1
github.com/filecoin-project/go-jsonrpc v0.1.5
github.com/filecoin-project/go-state-types v0.1.3
github.com/filecoin-project/go-storedcounter v0.1.0

View File

@ -418,8 +418,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0 h1:3R4ds1A9r6cr8mvZBfMYxTS88Oq
github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
github.com/filecoin-project/go-fil-markets v1.19.0 h1:kap2q2wTM6tfkVO5gMA5DD9GUeTvkDhMfhjCtEwMDM8=
github.com/filecoin-project/go-fil-markets v1.19.0/go.mod h1:qsb3apmo4RSJYCEq40QxVdU7UZospN6nFJLOBHuaIbc=
github.com/filecoin-project/go-fil-markets v1.19.1 h1:o5sziAp8zCsvIg3KYMgIpwm8gyOl4MDzEKEf0Qq5L3U=
github.com/filecoin-project/go-fil-markets v1.19.1/go.mod h1:qsb3apmo4RSJYCEq40QxVdU7UZospN6nFJLOBHuaIbc=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
@ -1411,8 +1411,9 @@ github.com/libp2p/go-libp2p-record v0.1.2/go.mod h1:pal0eNcT5nqZaTV7UGhqeGqxFgGd
github.com/libp2p/go-libp2p-record v0.1.3 h1:R27hoScIhQf/A8XJZ8lYpnqh9LatJ5YbHs28kCIfql0=
github.com/libp2p/go-libp2p-record v0.1.3/go.mod h1:yNUff/adKIfPnYQXgp6FQmNu3gLJ6EMg7+/vv2+9pY4=
github.com/libp2p/go-libp2p-resource-manager v0.1.0/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-resource-manager v0.1.3 h1:Umf0tW6WNXSb6Uoma0YT56azB5iikL/aeGAP7s7+f5o=
github.com/libp2p/go-libp2p-resource-manager v0.1.3/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-resource-manager v0.1.4 h1:RcxMD0pytOUimx3BqTVs6IqItb3H5Qg44SD7XyT68lw=
github.com/libp2p/go-libp2p-resource-manager v0.1.4/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-routing v0.0.1/go.mod h1:N51q3yTr4Zdr7V8Jt2JIktVU+3xBBylx1MZeVA6t1Ys=
github.com/libp2p/go-libp2p-routing v0.1.0/go.mod h1:zfLhI1RI8RLEzmEaaPwzonRvXeeSHddONWkcTcB54nE=
github.com/libp2p/go-libp2p-routing-helpers v0.2.3 h1:xY61alxJ6PurSi+MXbywZpelvuU4U4p/gPTxjqCqTzY=
@ -2941,4 +2942,4 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=

View File

@ -124,7 +124,9 @@ func runSender(ctx context.Context, t *testkit.TestEnvironment, clients []*testk
time.Sleep(20 * time.Second)
channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, channelAmt)
channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, channelAmt, api.PaychGetOpts{
OffChain: false,
})
if err != nil {
return fmt.Errorf("failed to create payment channel: %w", err)
}