paych: option to force off-chain get

This commit is contained in:
Łukasz Magiera 2022-01-06 16:04:39 +01:00
parent 5b585c0285
commit 8b19b84140
16 changed files with 468 additions and 81 deletions

View File

@ -690,10 +690,14 @@ type FullNode interface {
// The Paych methods are for interacting with and managing payment channels // The Paych methods are for interacting with and managing payment channels
// PaychGet gets or creates a payment channel between address pair // PaychGet gets or creates a payment channel between address pair
// - If reserve is false, the specified amount will be added to the channel through on-chain send for future use // - If opts.Reserve is false, the specified amount will be added to the channel through on-chain send for future use
// - If reserve is true, the specified amount will be reserved for use. If there aren't enough non-reserved funds // - If opts.Reserve is true, the specified amount will be reserved for use. If there aren't enough non-reserved funds
// available, funds will be added through an on-chain message. // available, funds will be added through an on-chain message.
PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt, reserve bool) (*ChannelInfo, error) //perm:sign // - When opts.OffChain is true, this call will not cause any messages to be sent to the chain (no automatic
// channel creation/funds adding). If the operation can't be performed without sending a message an error will be
// returned. Note that even when this option is specified, this call can be blocked by previous operations on the
// channel waiting for on-chain operations.
PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt, opts PaychGetOpts) (*ChannelInfo, error) //perm:sign
PaychGetWaitReady(context.Context, cid.Cid) (address.Address, error) //perm:sign PaychGetWaitReady(context.Context, cid.Cid) (address.Address, error) //perm:sign
PaychAvailableFunds(ctx context.Context, ch address.Address) (*ChannelAvailableFunds, error) //perm:sign PaychAvailableFunds(ctx context.Context, ch address.Address) (*ChannelAvailableFunds, error) //perm:sign
PaychAvailableFundsByFromTo(ctx context.Context, from, to address.Address) (*ChannelAvailableFunds, error) //perm:sign PaychAvailableFundsByFromTo(ctx context.Context, from, to address.Address) (*ChannelAvailableFunds, error) //perm:sign
@ -832,6 +836,11 @@ const (
PCHOutbound PCHOutbound
) )
type PaychGetOpts struct {
Reserve bool
OffChain bool
}
type PaychStatus struct { type PaychStatus struct {
ControlAddr address.Address ControlAddr address.Address
Direction PCHDir Direction PCHDir

View File

@ -1976,7 +1976,7 @@ func (mr *MockFullNodeMockRecorder) PaychCollect(arg0, arg1 interface{}) *gomock
} }
// PaychGet mocks base method. // PaychGet mocks base method.
func (m *MockFullNode) PaychGet(arg0 context.Context, arg1, arg2 address.Address, arg3 big.Int, arg4 bool) (*api.ChannelInfo, error) { func (m *MockFullNode) PaychGet(arg0 context.Context, arg1, arg2 address.Address, arg3 big.Int, arg4 api.PaychGetOpts) (*api.ChannelInfo, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PaychGet", arg0, arg1, arg2, arg3, arg4) ret := m.ctrl.Call(m, "PaychGet", arg0, arg1, arg2, arg3, arg4)
ret0, _ := ret[0].(*api.ChannelInfo) ret0, _ := ret[0].(*api.ChannelInfo)

View File

@ -306,7 +306,7 @@ type FullNodeStruct struct {
PaychCollect func(p0 context.Context, p1 address.Address) (cid.Cid, error) `perm:"sign"` PaychCollect func(p0 context.Context, p1 address.Address) (cid.Cid, error) `perm:"sign"`
PaychGet func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 bool) (*ChannelInfo, error) `perm:"sign"` PaychGet func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 PaychGetOpts) (*ChannelInfo, error) `perm:"sign"`
PaychGetWaitReady func(p0 context.Context, p1 cid.Cid) (address.Address, error) `perm:"sign"` PaychGetWaitReady func(p0 context.Context, p1 cid.Cid) (address.Address, error) `perm:"sign"`
@ -2179,14 +2179,14 @@ func (s *FullNodeStub) PaychCollect(p0 context.Context, p1 address.Address) (cid
return *new(cid.Cid), ErrNotSupported return *new(cid.Cid), ErrNotSupported
} }
func (s *FullNodeStruct) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 bool) (*ChannelInfo, error) { func (s *FullNodeStruct) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 PaychGetOpts) (*ChannelInfo, error) {
if s.Internal.PaychGet == nil { if s.Internal.PaychGet == nil {
return nil, ErrNotSupported return nil, ErrNotSupported
} }
return s.Internal.PaychGet(p0, p1, p2, p3, p4) return s.Internal.PaychGet(p0, p1, p2, p3, p4)
} }
func (s *FullNodeStub) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 bool) (*ChannelInfo, error) { func (s *FullNodeStub) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 PaychGetOpts) (*ChannelInfo, error) {
return nil, ErrNotSupported return nil, ErrNotSupported
} }

View File

@ -338,8 +338,10 @@ func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder
} }
func (w *WrapperV1Full) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) { func (w *WrapperV1Full) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
// v0 always reserves return w.FullNode.PaychGet(ctx, from, to, amt, api.PaychGetOpts{
return w.FullNode.PaychGet(ctx, from, to, amt, true) Reserve: true, // v0 always reserves
OffChain: false,
})
} }
var _ FullNode = &WrapperV1Full{} var _ FullNode = &WrapperV1Full{}

View File

@ -8,7 +8,7 @@ import (
"sort" "sort"
"strings" "strings"
"github.com/filecoin-project/lotus/api" lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/paychmgr" "github.com/filecoin-project/lotus/paychmgr"
@ -79,7 +79,10 @@ var paychAddFundsCmd = &cli.Command{
// Send a message to chain to create channel / add funds to existing // Send a message to chain to create channel / add funds to existing
// channel // channel
info, err := api.PaychGet(ctx, from, to, types.BigInt(amt), cctx.Bool("reserve")) info, err := api.PaychGet(ctx, from, to, types.BigInt(amt), lapi.PaychGetOpts{
Reserve: cctx.Bool("reserve"),
OffChain: false,
})
if err != nil { if err != nil {
return err return err
} }
@ -166,7 +169,7 @@ var paychStatusCmd = &cli.Command{
}, },
} }
func paychStatus(writer io.Writer, avail *api.ChannelAvailableFunds) { func paychStatus(writer io.Writer, avail *lapi.ChannelAvailableFunds) {
if avail.Channel == nil { if avail.Channel == nil {
if avail.PendingWaitSentinel != nil { if avail.PendingWaitSentinel != nil {
fmt.Fprint(writer, "Creating channel\n") fmt.Fprint(writer, "Creating channel\n")

View File

@ -4514,9 +4514,13 @@ Response:
### PaychGet ### PaychGet
PaychGet gets or creates a payment channel between address pair PaychGet gets or creates a payment channel between address pair
- If reserve is false, the specified amount will be added to the channel through on-chain send for future use - If opts.Reserve is false, the specified amount will be added to the channel through on-chain send for future use
- If reserve is true, the specified amount will be reserved for use. If there aren't enough non-reserved funds - If opts.Reserve is true, the specified amount will be reserved for use. If there aren't enough non-reserved funds
available, funds will be added through an on-chain message. available, funds will be added through an on-chain message.
- When opts.OffChain is true, this call will not cause any messages to be sent to the chain (no automatic
channel creation/funds adding). If the operation can't be performed without sending a message an error will be
returned. Note that even when this option is specified, this call can be blocked by previous operations on the
channel waiting for on-chain operations.
Perms: sign Perms: sign
@ -4527,7 +4531,10 @@ Inputs:
"f01234", "f01234",
"f01234", "f01234",
"0", "0",
true {
"Reserve": true,
"OffChain": true
}
] ]
``` ```

View File

@ -58,7 +58,10 @@ func TestPaymentChannelsAPI(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
channelAmt := int64(7000) channelAmt := int64(7000)
channelInfo, err := paymentCreator.PaychGet(ctx, createrAddr, receiverAddr, abi.NewTokenAmount(channelAmt), true) channelInfo, err := paymentCreator.PaychGet(ctx, createrAddr, receiverAddr, abi.NewTokenAmount(channelAmt), api.PaychGetOpts{
Reserve: true,
OffChain: false,
})
require.NoError(t, err) require.NoError(t, err)
channel, err := paymentCreator.PaychGetWaitReady(ctx, channelInfo.WaitSentinel) channel, err := paymentCreator.PaychGetWaitReady(ctx, channelInfo.WaitSentinel)

View File

@ -12,6 +12,7 @@ import (
mh "github.com/multiformats/go-multihash" mh "github.com/multiformats/go-multihash"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/paych" "github.com/filecoin-project/lotus/chain/actors/builtin/paych"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/full"
@ -57,7 +58,10 @@ func NewRetrievalClientNode(payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stat
func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, cid.Cid, error) { func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, cid.Cid, error) {
// TODO: respect the provided TipSetToken (a serialized TipSetKey) when // TODO: respect the provided TipSetToken (a serialized TipSetKey) when
// querying the chain // querying the chain
ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable, true) ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable, api.PaychGetOpts{
Reserve: true,
OffChain: false,
})
if err != nil { if err != nil {
return address.Undef, cid.Undef, err return address.Undef, cid.Undef, err
} }

View File

@ -22,8 +22,8 @@ type PaychAPI struct {
PaychMgr *paychmgr.Manager PaychMgr *paychmgr.Manager
} }
func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt, reserve bool) (*api.ChannelInfo, error) { func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt, opts api.PaychGetOpts) (*api.ChannelInfo, error) {
ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt, reserve) ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt, opts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -55,7 +55,10 @@ func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address
// TODO: Fix free fund tracking in PaychGet // TODO: Fix free fund tracking in PaychGet
// TODO: validate voucher spec before locking funds // TODO: validate voucher spec before locking funds
ch, err := a.PaychGet(ctx, from, to, amount, true) ch, err := a.PaychGet(ctx, from, to, amount, api.PaychGetOpts{
Reserve: true,
OffChain: false,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -101,13 +101,17 @@ func (pm *Manager) Stop() error {
return nil return nil
} }
func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt types.BigInt, reserve bool) (address.Address, cid.Cid, error) { func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt types.BigInt, opts api.PaychGetOpts) (address.Address, cid.Cid, error) {
if !opts.Reserve && opts.OffChain {
return address.Undef, cid.Undef, xerrors.Errorf("can't fund payment channels without on-chain operations")
}
chanAccessor, err := pm.accessorByFromTo(from, to) chanAccessor, err := pm.accessorByFromTo(from, to)
if err != nil { if err != nil {
return address.Undef, cid.Undef, err return address.Undef, cid.Undef, err
} }
return chanAccessor.getPaych(ctx, amt, reserve) return chanAccessor.getPaych(ctx, amt, opts)
} }
func (pm *Manager) AvailableFunds(ctx context.Context, ch address.Address) (*api.ChannelAvailableFunds, error) { func (pm *Manager) AvailableFunds(ctx context.Context, ch address.Address) (*api.ChannelAvailableFunds, error) {

View File

@ -19,12 +19,30 @@ import (
init2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/init" init2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/init"
tutils "github.com/filecoin-project/specs-actors/v2/support/testing" tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
"github.com/filecoin-project/lotus/api"
lotusinit "github.com/filecoin-project/lotus/chain/actors/builtin/init" lotusinit "github.com/filecoin-project/lotus/chain/actors/builtin/init"
"github.com/filecoin-project/lotus/chain/actors/builtin/paych" "github.com/filecoin-project/lotus/chain/actors/builtin/paych"
paychmock "github.com/filecoin-project/lotus/chain/actors/builtin/paych/mock" paychmock "github.com/filecoin-project/lotus/chain/actors/builtin/paych/mock"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
var onChainReserve = api.PaychGetOpts{
Reserve: true,
OffChain: false,
}
var onChainNoReserve = api.PaychGetOpts{
Reserve: false,
OffChain: false,
}
var offChainReserve = api.PaychGetOpts{
Reserve: true,
OffChain: true,
}
var offChainNoReserve = api.PaychGetOpts{
Reserve: false,
OffChain: true,
}
func testChannelResponse(t *testing.T, ch address.Address) types.MessageReceipt { func testChannelResponse(t *testing.T, ch address.Address) types.MessageReceipt {
createChannelRet := init2.ExecReturn{ createChannelRet := init2.ExecReturn{
IDAddress: ch, IDAddress: ch,
@ -55,7 +73,7 @@ func TestPaychGetCreateChannelMsg(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
amt := big.NewInt(10) amt := big.NewInt(10)
ch, mcid, err := mgr.GetPaych(ctx, from, to, amt, true) ch, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, address.Undef, ch) require.Equal(t, address.Undef, ch)
@ -65,6 +83,42 @@ func TestPaychGetCreateChannelMsg(t *testing.T) {
require.Equal(t, amt, pushedMsg.Message.Value) require.Equal(t, amt, pushedMsg.Message.Value)
} }
func TestPaychGetOffchainNoReserveFails(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(store, mock)
require.NoError(t, err)
amt := big.NewInt(10)
_, _, err = mgr.GetPaych(ctx, from, to, amt, offChainNoReserve)
require.Error(t, err)
}
func TestPaychGetCreateOffchainReserveFails(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(store, mock)
require.NoError(t, err)
amt := big.NewInt(10)
_, _, err = mgr.GetPaych(ctx, from, to, amt, offChainReserve)
require.Error(t, err)
}
// TestPaychGetCreateChannelThenAddFunds tests creating a channel and then // TestPaychGetCreateChannelThenAddFunds tests creating a channel and then
// adding funds to it // adding funds to it
func TestPaychGetCreateChannelThenAddFunds(t *testing.T) { func TestPaychGetCreateChannelThenAddFunds(t *testing.T) {
@ -83,7 +137,7 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) {
// Send create message for a channel with value 10 // Send create message for a channel with value 10
amt := big.NewInt(10) amt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, true) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Should have no channels yet (message sent but channel not created) // Should have no channels yet (message sent but channel not created)
@ -100,7 +154,7 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) {
// 2. Request add funds - should block until create channel has completed // 2. Request add funds - should block until create channel has completed
amt2 := big.NewInt(5) amt2 := big.NewInt(5)
ch2, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2, true) ch2, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
// 4. This GetPaych should return after create channel from first // 4. This GetPaych should return after create channel from first
// GetPaych completes // GetPaych completes
@ -170,7 +224,7 @@ func TestPaychGetCreatePrefundedChannelThenAddFunds(t *testing.T) {
// Send create message for a channel with value 10 // Send create message for a channel with value 10
amt := big.NewInt(10) amt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, false) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, onChainNoReserve)
require.NoError(t, err) require.NoError(t, err)
// Should have no channels yet (message sent but channel not created) // Should have no channels yet (message sent but channel not created)
@ -187,7 +241,7 @@ func TestPaychGetCreatePrefundedChannelThenAddFunds(t *testing.T) {
// 2. Request add funds - shouldn't block // 2. Request add funds - shouldn't block
amt2 := big.NewInt(3) amt2 := big.NewInt(3)
ch2, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2, true) ch2, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2, offChainReserve)
// 4. This GetPaych should return after create channel from first // 4. This GetPaych should return after create channel from first
// GetPaych completes // GetPaych completes
@ -240,7 +294,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) {
// Send create message for a channel // Send create message for a channel
amt := big.NewInt(10) amt := big.NewInt(10)
_, mcid1, err := mgr.GetPaych(ctx, from, to, amt, true) _, mcid1, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// 1. Set up create channel response (sent in response to WaitForMsg()) // 1. Set up create channel response (sent in response to WaitForMsg())
@ -258,7 +312,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) {
// Because first channel create fails, this request // Because first channel create fails, this request
// should be for channel create again. // should be for channel create again.
amt2 := big.NewInt(5) amt2 := big.NewInt(5)
ch2, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, true) ch2, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, address.Undef, ch2) require.Equal(t, address.Undef, ch2)
@ -305,7 +359,7 @@ func TestPaychGetRecoverAfterError(t *testing.T) {
// Send create message for a channel // Send create message for a channel
amt := big.NewInt(10) amt := big.NewInt(10)
_, mcid, err := mgr.GetPaych(ctx, from, to, amt, true) _, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Send error create channel response // Send error create channel response
@ -316,7 +370,7 @@ func TestPaychGetRecoverAfterError(t *testing.T) {
// Send create message for a channel again // Send create message for a channel again
amt2 := big.NewInt(7) amt2 := big.NewInt(7)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, true) _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Send success create channel response // Send success create channel response
@ -357,7 +411,7 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
// Send create message for a channel // Send create message for a channel
amt := big.NewInt(10) amt := big.NewInt(10)
_, mcid1, err := mgr.GetPaych(ctx, from, to, amt, true) _, mcid1, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Send success create channel response // Send success create channel response
@ -366,7 +420,7 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
// Send add funds message for channel // Send add funds message for channel
amt2 := big.NewInt(5) amt2 := big.NewInt(5)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, true) _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Send error add funds response // Send error add funds response
@ -393,7 +447,7 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
// Send add funds message for channel again // Send add funds message for channel again
amt3 := big.NewInt(2) amt3 := big.NewInt(2)
_, mcid3, err := mgr.GetPaych(ctx, from, to, amt3, true) _, mcid3, err := mgr.GetPaych(ctx, from, to, amt3, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Send success add funds response // Send success add funds response
@ -438,7 +492,7 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) {
// Send create message for a channel with value 10 // Send create message for a channel with value 10
amt := big.NewInt(10) amt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, true) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Simulate shutting down system // Simulate shutting down system
@ -465,7 +519,7 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) {
// 2. Request add funds - should block until create channel has completed // 2. Request add funds - should block until create channel has completed
amt2 := big.NewInt(5) amt2 := big.NewInt(5)
ch2, addFundsMsgCid, err := mgr2.GetPaych(ctx, from, to, amt2, true) ch2, addFundsMsgCid, err := mgr2.GetPaych(ctx, from, to, amt2, onChainReserve)
// 4. This GetPaych should return after create channel from first // 4. This GetPaych should return after create channel from first
// GetPaych completes // GetPaych completes
@ -517,7 +571,7 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
// Send create message for a channel // Send create message for a channel
amt := big.NewInt(10) amt := big.NewInt(10)
_, mcid1, err := mgr.GetPaych(ctx, from, to, amt, true) _, mcid1, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Send success create channel response // Send success create channel response
@ -526,7 +580,7 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
// Send add funds message for channel // Send add funds message for channel
amt2 := big.NewInt(5) amt2 := big.NewInt(5)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, true) _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Simulate shutting down system // Simulate shutting down system
@ -580,7 +634,7 @@ func TestPaychGetWait(t *testing.T) {
// 1. Get // 1. Get
amt := big.NewInt(10) amt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, true) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
expch := tutils.NewIDAddr(t, 100) expch := tutils.NewIDAddr(t, 100)
@ -603,7 +657,7 @@ func TestPaychGetWait(t *testing.T) {
// Request add funds // Request add funds
amt2 := big.NewInt(15) amt2 := big.NewInt(15)
_, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2, true) _, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
go func() { go func() {
@ -637,7 +691,7 @@ func TestPaychGetWaitErr(t *testing.T) {
// 1. Create channel // 1. Create channel
amt := big.NewInt(10) amt := big.NewInt(10)
_, mcid, err := mgr.GetPaych(ctx, from, to, amt, true) _, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
done := make(chan address.Address) done := make(chan address.Address)
@ -683,7 +737,7 @@ func TestPaychGetWaitCtx(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
amt := big.NewInt(10) amt := big.NewInt(10)
_, mcid, err := mgr.GetPaych(ctx, from, to, amt, true) _, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// When the context is cancelled, should unblock wait // When the context is cancelled, should unblock wait
@ -714,7 +768,7 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
// Send create message for a channel with value 10 // Send create message for a channel with value 10
createAmt := big.NewInt(10) createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, true) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Queue up two add funds requests behind create channel // Queue up two add funds requests behind create channel
@ -732,7 +786,7 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
// Request add funds - should block until create channel has completed // Request add funds - should block until create channel has completed
var err error var err error
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, true) addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
}() }()
@ -741,7 +795,7 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
// Request add funds again - should merge with waiting add funds request // Request add funds again - should merge with waiting add funds request
var err error var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, true) addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
}() }()
// Wait for add funds requests to be queued up // Wait for add funds requests to be queued up
@ -810,7 +864,7 @@ func TestPaychGetMergePrefundAndReserve(t *testing.T) {
// Send create message for a channel with value 10 // Send create message for a channel with value 10
createAmt := big.NewInt(10) createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, true) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Queue up two add funds requests behind create channel // Queue up two add funds requests behind create channel
@ -828,7 +882,7 @@ func TestPaychGetMergePrefundAndReserve(t *testing.T) {
// Request add funds - should block until create channel has completed // Request add funds - should block until create channel has completed
var err error var err error
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, false) addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, onChainNoReserve)
require.NoError(t, err) require.NoError(t, err)
}() }()
@ -837,7 +891,7 @@ func TestPaychGetMergePrefundAndReserve(t *testing.T) {
// Request add funds again - should merge with waiting add funds request // Request add funds again - should merge with waiting add funds request
var err error var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, true) addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
}() }()
// Wait for add funds requests to be queued up // Wait for add funds requests to be queued up
@ -906,7 +960,7 @@ func TestPaychGetMergePrefundAndReservePrefunded(t *testing.T) {
// Send create message for a channel with value 10 // Send create message for a channel with value 10
createAmt := big.NewInt(10) createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, false) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainNoReserve)
require.NoError(t, err) require.NoError(t, err)
// Queue up two add funds requests behind create channel // Queue up two add funds requests behind create channel
@ -924,7 +978,7 @@ func TestPaychGetMergePrefundAndReservePrefunded(t *testing.T) {
// Request add funds - should block until create channel has completed // Request add funds - should block until create channel has completed
var err error var err error
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, false) addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, onChainNoReserve)
require.NoError(t, err) require.NoError(t, err)
}() }()
@ -933,7 +987,7 @@ func TestPaychGetMergePrefundAndReservePrefunded(t *testing.T) {
// Request add funds again - should merge with waiting add funds request // Request add funds again - should merge with waiting add funds request
var err error var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, true) addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
}() }()
// Wait for add funds requests to be queued up // Wait for add funds requests to be queued up
@ -987,6 +1041,247 @@ func TestPaychGetMergePrefundAndReservePrefunded(t *testing.T) {
require.Equal(t, addFundsAmt1, addFundsMsg.Message.Value) require.Equal(t, addFundsAmt1, addFundsMsg.Message.Value)
} }
func TestPaychGetMergePrefundAndReservePrefundedOneOffchain(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainNoReserve)
require.NoError(t, err)
// Queue up two add funds requests behind create channel
var addFundsSent sync.WaitGroup
addFundsSent.Add(2)
addFundsAmt1 := big.NewInt(5) // 1 reserves
addFundsAmt2 := big.NewInt(3) // 2 reserves
var addFundsCh1 address.Address
var addFundsCh2 address.Address
var addFundsMcid1 cid.Cid
var addFundsMcid2 cid.Cid
go func() {
defer addFundsSent.Done()
// Request add funds - should block until create channel has completed
var err error
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, offChainReserve)
require.NoError(t, err)
}()
go func() {
defer addFundsSent.Done()
// Request add funds again - should merge with waiting add funds request
var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve)
require.NoError(t, err)
}()
// Wait for add funds requests to be queued up
waitForQueueSize(t, mgr, from, to, 2)
// Send create channel response
response := testChannelResponse(t, ch)
mock.receiveMsgResponse(createMsgCid, response)
// Wait for create channel response
chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid)
require.NoError(t, err)
require.Equal(t, ch, chres)
// Wait for add funds requests to be sent
addFundsSent.Wait()
// Expect add funds requests to have same channel as create channel and
// same message cid as each other (because they should have been merged)
require.Equal(t, ch, addFundsCh1)
require.Equal(t, ch, addFundsCh2)
require.Equal(t, cid.Undef, addFundsMcid1)
require.Equal(t, cid.Undef, addFundsMcid2)
// Make sure that one create channel message was sent
require.Equal(t, 1, mock.pushedMessageCount())
// Check create message amount is correct
createMsg := mock.pushedMessages(createMsgCid)
require.Equal(t, from, createMsg.Message.From)
require.Equal(t, lotusinit.Address, createMsg.Message.To)
require.Equal(t, createAmt, createMsg.Message.Value)
}
func TestPaychGetMergePrefundAndReservePrefundedBothOffchainOneFail(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainNoReserve)
require.NoError(t, err)
// Queue up two add funds requests behind create channel
var addFundsSent sync.WaitGroup
addFundsSent.Add(2)
addFundsAmt1 := big.NewInt(5) // 1 reserves
addFundsAmt2 := big.NewInt(6) // 2 reserves too much
var addFundsCh1 address.Address
var addFundsCh2 address.Address
var addFundsMcid1 cid.Cid
var addFundsMcid2 cid.Cid
go func() {
defer addFundsSent.Done()
// Request add funds - should block until create channel has completed
var err error
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, offChainReserve)
require.NoError(t, err)
}()
go func() {
defer addFundsSent.Done()
// Request add funds again - should merge with waiting add funds request
var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, offChainReserve)
require.Error(t, err)
}()
// Wait for add funds requests to be queued up
waitForQueueSize(t, mgr, from, to, 2)
// Send create channel response
response := testChannelResponse(t, ch)
mock.receiveMsgResponse(createMsgCid, response)
// Wait for create channel response
chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid)
require.NoError(t, err)
require.Equal(t, ch, chres)
// Wait for add funds requests to be sent
addFundsSent.Wait()
// Expect add funds requests to have same channel as create channel and
// same message cid as each other (because they should have been merged)
require.Equal(t, ch, addFundsCh1)
require.Equal(t, ch, addFundsCh2)
require.Equal(t, cid.Undef, addFundsMcid1)
require.Equal(t, cid.Undef, addFundsMcid2)
// Make sure that one create channel message was sent
require.Equal(t, 1, mock.pushedMessageCount())
// Check create message amount is correct
createMsg := mock.pushedMessages(createMsgCid)
require.Equal(t, from, createMsg.Message.From)
require.Equal(t, lotusinit.Address, createMsg.Message.To)
require.Equal(t, createAmt, createMsg.Message.Value)
}
func TestPaychGetMergePrefundAndReserveOneOffchainOneFail(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err)
// Queue up two add funds requests behind create channel
var addFundsSent sync.WaitGroup
addFundsSent.Add(2)
addFundsAmt1 := big.NewInt(5) // 1 reserves
addFundsAmt2 := big.NewInt(6) // 2 reserves
var addFundsCh1 address.Address
var addFundsCh2 address.Address
var addFundsMcid1 cid.Cid
var addFundsMcid2 cid.Cid
go func() {
defer addFundsSent.Done()
// Request add funds - should block until create channel has completed
var err error
addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, onChainReserve)
require.NoError(t, err)
}()
go func() {
defer addFundsSent.Done()
// Request add funds again - should merge with waiting add funds request
var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, offChainReserve)
require.Error(t, err)
}()
// Wait for add funds requests to be queued up
waitForQueueSize(t, mgr, from, to, 2)
// Send create channel response
response := testChannelResponse(t, ch)
mock.receiveMsgResponse(createMsgCid, response)
// Wait for create channel response
chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid)
require.NoError(t, err)
require.Equal(t, ch, chres)
// Wait for add funds requests to be sent
addFundsSent.Wait()
// Expect add funds requests to have same channel as create channel and
// same message cid as each other (because they should have been merged)
require.Equal(t, ch, addFundsCh1)
require.Equal(t, ch, addFundsCh2)
require.NotEqual(t, cid.Undef, addFundsMcid1)
require.Equal(t, cid.Undef, addFundsMcid2)
// Make sure that one create channel message was sent
require.Equal(t, 2, mock.pushedMessageCount())
// Check create message amount is correct
createMsg := mock.pushedMessages(createMsgCid)
require.Equal(t, from, createMsg.Message.From)
require.Equal(t, lotusinit.Address, createMsg.Message.To)
require.Equal(t, createAmt, createMsg.Message.Value)
// Check merged add funds amount is the sum of the individual
// amounts
addFundsMsg := mock.pushedMessages(addFundsMcid1)
require.Equal(t, from, addFundsMsg.Message.From)
require.Equal(t, ch, addFundsMsg.Message.To)
require.Equal(t, addFundsAmt1, addFundsMsg.Message.Value)
}
// TestPaychGetMergeAddFundsCtxCancelOne tests that when a queued add funds // TestPaychGetMergeAddFundsCtxCancelOne tests that when a queued add funds
// request is cancelled, its amount is removed from the total merged add funds // request is cancelled, its amount is removed from the total merged add funds
func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) { func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) {
@ -1005,7 +1300,7 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) {
// Send create message for a channel with value 10 // Send create message for a channel with value 10
createAmt := big.NewInt(10) createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, true) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Queue up two add funds requests behind create channel // Queue up two add funds requests behind create channel
@ -1022,7 +1317,7 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) {
defer addFundsSent.Done() defer addFundsSent.Done()
// Request add funds - should block until create channel has completed // Request add funds - should block until create channel has completed
_, _, addFundsErr1 = mgr.GetPaych(addFundsCtx1, from, to, addFundsAmt1, true) _, _, addFundsErr1 = mgr.GetPaych(addFundsCtx1, from, to, addFundsAmt1, onChainReserve)
}() }()
go func() { go func() {
@ -1030,7 +1325,7 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) {
// Request add funds again - should merge with waiting add funds request // Request add funds again - should merge with waiting add funds request
var err error var err error
addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, true) addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
}() }()
// Wait for add funds requests to be queued up // Wait for add funds requests to be queued up
@ -1102,7 +1397,7 @@ func TestPaychGetMergeAddFundsCtxCancelAll(t *testing.T) {
// Send create message for a channel with value 10 // Send create message for a channel with value 10
createAmt := big.NewInt(10) createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, true) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Queue up two add funds requests behind create channel // Queue up two add funds requests behind create channel
@ -1117,14 +1412,14 @@ func TestPaychGetMergeAddFundsCtxCancelAll(t *testing.T) {
defer addFundsSent.Done() defer addFundsSent.Done()
// Request add funds - should block until create channel has completed // Request add funds - should block until create channel has completed
_, _, addFundsErr1 = mgr.GetPaych(addFundsCtx1, from, to, big.NewInt(5), true) _, _, addFundsErr1 = mgr.GetPaych(addFundsCtx1, from, to, big.NewInt(5), onChainReserve)
}() }()
go func() { go func() {
defer addFundsSent.Done() defer addFundsSent.Done()
// Request add funds again - should merge with waiting add funds request // Request add funds again - should merge with waiting add funds request
_, _, addFundsErr2 = mgr.GetPaych(addFundsCtx2, from, to, big.NewInt(3), true) _, _, addFundsErr2 = mgr.GetPaych(addFundsCtx2, from, to, big.NewInt(3), onChainReserve)
}() }()
// Wait for add funds requests to be queued up // Wait for add funds requests to be queued up
waitForQueueSize(t, mgr, from, to, 2) waitForQueueSize(t, mgr, from, to, 2)
@ -1189,7 +1484,7 @@ func TestPaychAvailableFunds(t *testing.T) {
// Send create message for a channel with value 10 // Send create message for a channel with value 10
createAmt := big.NewInt(10) createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, true) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Available funds should reflect create channel message sent // Available funds should reflect create channel message sent
@ -1214,7 +1509,7 @@ func TestPaychAvailableFunds(t *testing.T) {
// Request add funds - should block until create channel has completed // Request add funds - should block until create channel has completed
var err error var err error
_, addFundsMcid, err = mgr.GetPaych(ctx, from, to, addFundsAmt, true) _, addFundsMcid, err = mgr.GetPaych(ctx, from, to, addFundsAmt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
}() }()

View File

@ -46,7 +46,7 @@ func TestPaychAddVoucherAfterAddFunds(t *testing.T) {
// Send create message for a channel with value 10 // Send create message for a channel with value 10
createAmt := big.NewInt(10) createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, true) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Send create channel response // Send create channel response
@ -82,7 +82,7 @@ func TestPaychAddVoucherAfterAddFunds(t *testing.T) {
require.Equal(t, res.Shortfall, excessAmt) require.Equal(t, res.Shortfall, excessAmt)
// Add funds so as to cover the voucher shortfall // Add funds so as to cover the voucher shortfall
_, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, excessAmt, true) _, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, excessAmt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Trigger add funds confirmation // Trigger add funds confirmation

View File

@ -29,7 +29,7 @@ func TestPaychSettle(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
amt := big.NewInt(10) amt := big.NewInt(10)
_, mcid, err := mgr.GetPaych(ctx, from, to, amt, true) _, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
// Send channel create response // Send channel create response
@ -49,7 +49,7 @@ func TestPaychSettle(t *testing.T) {
// (should create a new channel because the previous channel // (should create a new channel because the previous channel
// is settling) // is settling)
amt2 := big.NewInt(5) amt2 := big.NewInt(5)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, true) _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve)
require.NoError(t, err) require.NoError(t, err)
require.NotEqual(t, cid.Undef, mcid2) require.NotEqual(t, cid.Undef, mcid2)

View File

@ -33,20 +33,20 @@ type fundsReq struct {
ctx context.Context ctx context.Context
promise chan *paychFundsRes promise chan *paychFundsRes
amt types.BigInt amt types.BigInt
reserve bool opts api.PaychGetOpts
lk sync.Mutex lk sync.Mutex
// merge parent, if this req is part of a merge // merge parent, if this req is part of a merge
merge *mergedFundsReq merge *mergedFundsReq
} }
func newFundsReq(ctx context.Context, amt types.BigInt, reserve bool) *fundsReq { func newFundsReq(ctx context.Context, amt types.BigInt, opts api.PaychGetOpts) *fundsReq {
promise := make(chan *paychFundsRes, 1) promise := make(chan *paychFundsRes, 1)
return &fundsReq{ return &fundsReq{
ctx: ctx, ctx: ctx,
promise: promise, promise: promise,
amt: amt, amt: amt,
reserve: reserve, opts: opts,
} }
} }
@ -108,8 +108,12 @@ func newMergedFundsReq(reqs []*fundsReq) *mergedFundsReq {
} }
sort.Slice(m.reqs, func(i, j int) bool { sort.Slice(m.reqs, func(i, j int) bool {
if m.reqs[i].reserve != m.reqs[j].reserve { // non-reserve first if m.reqs[i].opts.OffChain != m.reqs[j].opts.OffChain { // off-chain first
return m.reqs[i].reserve return m.reqs[i].opts.OffChain
}
if m.reqs[i].opts.Reserve != m.reqs[j].opts.Reserve { // non-reserve after off-chain
return m.reqs[i].opts.Reserve
} }
// sort by amount asc (reducing latency for smaller requests) // sort by amount asc (reducing latency for smaller requests)
@ -154,7 +158,7 @@ func (m *mergedFundsReq) sum() (types.BigInt, types.BigInt) {
for _, r := range m.reqs { for _, r := range m.reqs {
if r.isActive() { if r.isActive() {
sum = types.BigAdd(sum, r.amt) sum = types.BigAdd(sum, r.amt)
if !r.reserve { if !r.opts.Reserve {
avail = types.BigAdd(avail, r.amt) avail = types.BigAdd(avail, r.amt)
} }
} }
@ -164,17 +168,30 @@ func (m *mergedFundsReq) sum() (types.BigInt, types.BigInt) {
} }
// completeAmount completes first non-reserving requests up to the available amount // completeAmount completes first non-reserving requests up to the available amount
func (m *mergedFundsReq) completeAmount(avail types.BigInt, channelInfo *ChannelInfo) (*paychFundsRes, types.BigInt) { func (m *mergedFundsReq) completeAmount(avail types.BigInt, channelInfo *ChannelInfo) (*paychFundsRes, types.BigInt, types.BigInt) {
used := types.NewInt(0) used, failed := types.NewInt(0), types.NewInt(0)
next := 0 next := 0
// order: [offchain+reserve, !offchain+reserve, !offchain+!reserve]
for i, r := range m.reqs { for i, r := range m.reqs {
if !r.reserve { if !r.opts.Reserve {
// non-reserving request are put after reserving requests, so we are done here // non-reserving request are put after reserving requests, so we are done here
break break
} }
if r.amt.GreaterThan(types.BigSub(avail, used)) { if r.amt.GreaterThan(types.BigSub(avail, used)) {
// requests are sorted by amount ascending, so if we hit this, there aren't any more requests we can fill // requests are sorted by amount ascending, so if we hit this, there aren't any more requests we can fill
if r.opts.OffChain {
// can't fill, so OffChain want an error
if r.isActive() {
failed = types.BigAdd(failed, r.amt)
r.onComplete(&paychFundsRes{channel: *channelInfo.Channel, err: xerrors.Errorf("not enough available funds in the payment channel")})
}
next = i + 1
continue
}
break break
} }
@ -190,9 +207,34 @@ func (m *mergedFundsReq) completeAmount(avail types.BigInt, channelInfo *Channel
m.reqs = m.reqs[next:] m.reqs = m.reqs[next:]
if len(m.reqs) == 0 { if len(m.reqs) == 0 {
return &paychFundsRes{channel: *channelInfo.Channel}, used return &paychFundsRes{channel: *channelInfo.Channel}, used, failed
} }
return nil, used return nil, used, failed
}
func (m *mergedFundsReq) failOffChain(msg string) (*paychFundsRes, types.BigInt) {
next := 0
freed := types.NewInt(0)
for i, r := range m.reqs {
if !r.opts.OffChain {
break
}
freed = types.BigAdd(freed, r.amt)
if !r.isActive() {
continue
}
r.onComplete(&paychFundsRes{err: xerrors.New(msg)})
next = i + 1
}
m.reqs = m.reqs[next:]
if len(m.reqs) == 0 {
return &paychFundsRes{err: xerrors.New(msg)}, freed
}
return nil, freed
} }
// getPaych ensures that a channel exists between the from and to addresses, // getPaych ensures that a channel exists between the from and to addresses,
@ -206,9 +248,9 @@ func (m *mergedFundsReq) completeAmount(avail types.BigInt, channelInfo *Channel
// address and the CID of the new add funds message. // address and the CID of the new add funds message.
// If an operation returns an error, subsequent waiting operations will still // If an operation returns an error, subsequent waiting operations will still
// be attempted. // be attempted.
func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt, reserve bool) (address.Address, cid.Cid, error) { func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt, opts api.PaychGetOpts) (address.Address, cid.Cid, error) {
// Add the request to add funds to a queue and wait for the result // Add the request to add funds to a queue and wait for the result
freq := newFundsReq(ctx, amt, reserve) freq := newFundsReq(ctx, amt, opts)
ca.enqueue(ctx, freq) ca.enqueue(ctx, freq)
select { select {
case res := <-freq.promise: case res := <-freq.promise:
@ -398,6 +440,12 @@ func (ca *channelAccessor) processTask(merged *mergedFundsReq, amt, avail types.
// If a channel has not yet been created, create one. // If a channel has not yet been created, create one.
if channelInfo == nil { if channelInfo == nil {
res, freed := merged.failOffChain("payment channel doesn't exist")
if res != nil {
return res
}
amt = types.BigSub(amt, freed)
mcid, err := ca.createPaych(ctx, amt, avail) mcid, err := ca.createPaych(ctx, amt, avail)
if err != nil { if err != nil {
return &paychFundsRes{err: err} return &paychFundsRes{err: err}
@ -536,14 +584,14 @@ func (ca *channelAccessor) completeAvailable(ctx context.Context, merged *merged
ci.AvailableAmount = big.Sub(ci.AvailableAmount, avail) ci.AvailableAmount = big.Sub(ci.AvailableAmount, avail)
}) })
res, used := merged.completeAmount(avail, channelInfo) res, used, failed := merged.completeAmount(avail, channelInfo)
// return any unused reserved funds (e.g. from cancelled requests) // return any unused reserved funds (e.g. from cancelled requests)
ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) { ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) {
ci.AvailableAmount = types.BigAdd(ci.AvailableAmount, types.BigSub(avail, used)) ci.AvailableAmount = types.BigAdd(ci.AvailableAmount, types.BigSub(avail, used))
}) })
return res, types.BigSub(amt, used) return res, types.BigSub(amt, types.BigAdd(used, failed))
} }
// addFunds sends a message to add funds to the channel and returns the message cid // addFunds sends a message to add funds to the channel and returns the message cid

View File

@ -207,7 +207,10 @@ func initPaymentChannel(t *testkit.TestEnvironment, ctx context.Context, cl *tes
t.RecordMessage("my balance: %d", balance) t.RecordMessage("my balance: %d", balance)
t.RecordMessage("creating payment channel; from=%s, to=%s, funds=%d", cl.Wallet.Address, recv.WalletAddr, balance) t.RecordMessage("creating payment channel; from=%s, to=%s, funds=%d", cl.Wallet.Address, recv.WalletAddr, balance)
channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, balance, true) channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, balance, api.PaychGetOpts{
Reserve: true,
OffChain: false,
})
if err != nil { if err != nil {
return fmt.Errorf("failed to create payment channel: %w", err) return fmt.Errorf("failed to create payment channel: %w", err)
} }
@ -230,7 +233,10 @@ func initPaymentChannel(t *testkit.TestEnvironment, ctx context.Context, cl *tes
// we wait for 2 confirmations, so we have the assurance the channel is tracked. // we wait for 2 confirmations, so we have the assurance the channel is tracked.
t.RecordMessage("reloading paych; now it should have an address") t.RecordMessage("reloading paych; now it should have an address")
channel, err = cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, big.Zero(), true) channel, err = cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, big.Zero(), api.PaychGetOpts{
Reserve: true,
OffChain: false,
})
if err != nil { if err != nil {
return fmt.Errorf("failed to reload payment channel: %w", err) return fmt.Errorf("failed to reload payment channel: %w", err)
} }

View File

@ -124,7 +124,10 @@ func runSender(ctx context.Context, t *testkit.TestEnvironment, clients []*testk
time.Sleep(20 * time.Second) time.Sleep(20 * time.Second)
channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, channelAmt, true) channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, channelAmt, api.PaychGetOpts{
Reserve: true,
OffChain: false,
})
if err != nil { if err != nil {
return fmt.Errorf("failed to create payment channel: %w", err) return fmt.Errorf("failed to create payment channel: %w", err)
} }