Merge pull request #3448 from filecoin-project/feat/paych-get-avail-funds

paych: Add PaychAvailableFunds API method
This commit is contained in:
Whyrusleeping 2020-09-03 15:26:25 -07:00 committed by GitHub
commit 021f4a881c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 692 additions and 259 deletions

View File

@ -421,6 +421,7 @@ type FullNode interface {
PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error)
PaychGetWaitReady(context.Context, cid.Cid) (address.Address, error) PaychGetWaitReady(context.Context, cid.Cid) (address.Address, error)
PaychAvailableFunds(from, to address.Address) (*ChannelAvailableFunds, error)
PaychList(context.Context) ([]address.Address, error) PaychList(context.Context) ([]address.Address, error)
PaychStatus(context.Context, address.Address) (*PaychStatus, error) PaychStatus(context.Context, address.Address) (*PaychStatus, error)
PaychSettle(context.Context, address.Address) (cid.Cid, error) PaychSettle(context.Context, address.Address) (cid.Cid, error)
@ -429,7 +430,7 @@ type FullNode interface {
PaychNewPayment(ctx context.Context, from, to address.Address, vouchers []VoucherSpec) (*PaymentInfo, error) PaychNewPayment(ctx context.Context, from, to address.Address, vouchers []VoucherSpec) (*PaymentInfo, error)
PaychVoucherCheckValid(context.Context, address.Address, *paych.SignedVoucher) error PaychVoucherCheckValid(context.Context, address.Address, *paych.SignedVoucher) error
PaychVoucherCheckSpendable(context.Context, address.Address, *paych.SignedVoucher, []byte, []byte) (bool, error) PaychVoucherCheckSpendable(context.Context, address.Address, *paych.SignedVoucher, []byte, []byte) (bool, error)
PaychVoucherCreate(context.Context, address.Address, types.BigInt, uint64) (*paych.SignedVoucher, error) PaychVoucherCreate(context.Context, address.Address, types.BigInt, uint64) (*VoucherCreateResult, error)
PaychVoucherAdd(context.Context, address.Address, *paych.SignedVoucher, []byte, types.BigInt) (types.BigInt, error) PaychVoucherAdd(context.Context, address.Address, *paych.SignedVoucher, []byte, types.BigInt) (types.BigInt, error)
PaychVoucherList(context.Context, address.Address) ([]*paych.SignedVoucher, error) PaychVoucherList(context.Context, address.Address) ([]*paych.SignedVoucher, error)
PaychVoucherSubmit(context.Context, address.Address, *paych.SignedVoucher, []byte, []byte) (cid.Cid, error) PaychVoucherSubmit(context.Context, address.Address, *paych.SignedVoucher, []byte, []byte) (cid.Cid, error)
@ -538,6 +539,23 @@ type ChannelInfo struct {
WaitSentinel cid.Cid WaitSentinel cid.Cid
} }
type ChannelAvailableFunds struct {
Channel *address.Address
// ConfirmedAmt is the amount of funds that have been confirmed on-chain
// for the channel
ConfirmedAmt types.BigInt
// PendingAmt is the amount of funds that are pending confirmation on-chain
PendingAmt types.BigInt
// PendingWaitSentinel can be used with PaychGetWaitReady to wait for
// confirmation of pending funds
PendingWaitSentinel *cid.Cid
// QueuedAmt is the amount that is queued up behind a pending request
QueuedAmt types.BigInt
// VoucherRedeemedAmt is the amount that is redeemed by vouchers on-chain
// and in the local datastore
VoucherReedeemedAmt types.BigInt
}
type PaymentInfo struct { type PaymentInfo struct {
Channel address.Address Channel address.Address
WaitSentinel cid.Cid WaitSentinel cid.Cid
@ -553,6 +571,16 @@ type VoucherSpec struct {
Extra *paych.ModVerifyParams Extra *paych.ModVerifyParams
} }
// VoucherCreateResult is the response to calling PaychVoucherCreate
type VoucherCreateResult struct {
// Voucher that was created, or nil if there was an error or if there
// were insufficient funds in the channel
Voucher *paych.SignedVoucher
// Shortfall is the additional amount that would be needed in the channel
// in order to be able to create the voucher
Shortfall types.BigInt
}
type MinerPower struct { type MinerPower struct {
MinerPower power.Claim MinerPower power.Claim
TotalPower power.Claim TotalPower power.Claim

View File

@ -208,6 +208,7 @@ type FullNodeStruct struct {
PaychGet func(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) `perm:"sign"` PaychGet func(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) `perm:"sign"`
PaychGetWaitReady func(context.Context, cid.Cid) (address.Address, error) `perm:"sign"` PaychGetWaitReady func(context.Context, cid.Cid) (address.Address, error) `perm:"sign"`
PaychAvailableFunds func(address.Address, address.Address) (*api.ChannelAvailableFunds, error) `perm:"sign"`
PaychList func(context.Context) ([]address.Address, error) `perm:"read"` PaychList func(context.Context) ([]address.Address, error) `perm:"read"`
PaychStatus func(context.Context, address.Address) (*api.PaychStatus, error) `perm:"read"` PaychStatus func(context.Context, address.Address) (*api.PaychStatus, error) `perm:"read"`
PaychSettle func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"` PaychSettle func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"`
@ -218,7 +219,7 @@ type FullNodeStruct struct {
PaychVoucherCheckValid func(context.Context, address.Address, *paych.SignedVoucher) error `perm:"read"` PaychVoucherCheckValid func(context.Context, address.Address, *paych.SignedVoucher) error `perm:"read"`
PaychVoucherCheckSpendable func(context.Context, address.Address, *paych.SignedVoucher, []byte, []byte) (bool, error) `perm:"read"` PaychVoucherCheckSpendable func(context.Context, address.Address, *paych.SignedVoucher, []byte, []byte) (bool, error) `perm:"read"`
PaychVoucherAdd func(context.Context, address.Address, *paych.SignedVoucher, []byte, types.BigInt) (types.BigInt, error) `perm:"write"` PaychVoucherAdd func(context.Context, address.Address, *paych.SignedVoucher, []byte, types.BigInt) (types.BigInt, error) `perm:"write"`
PaychVoucherCreate func(context.Context, address.Address, big.Int, uint64) (*paych.SignedVoucher, error) `perm:"sign"` PaychVoucherCreate func(context.Context, address.Address, big.Int, uint64) (*api.VoucherCreateResult, error) `perm:"sign"`
PaychVoucherList func(context.Context, address.Address) ([]*paych.SignedVoucher, error) `perm:"write"` PaychVoucherList func(context.Context, address.Address) ([]*paych.SignedVoucher, error) `perm:"write"`
PaychVoucherSubmit func(context.Context, address.Address, *paych.SignedVoucher, []byte, []byte) (cid.Cid, error) `perm:"sign"` PaychVoucherSubmit func(context.Context, address.Address, *paych.SignedVoucher, []byte, []byte) (cid.Cid, error) `perm:"sign"`
} }
@ -899,6 +900,10 @@ func (c *FullNodeStruct) PaychGetWaitReady(ctx context.Context, sentinel cid.Cid
return c.Internal.PaychGetWaitReady(ctx, sentinel) return c.Internal.PaychGetWaitReady(ctx, sentinel)
} }
func (c *FullNodeStruct) PaychAvailableFunds(from address.Address, to address.Address) (*api.ChannelAvailableFunds, error) {
return c.Internal.PaychAvailableFunds(from, to)
}
func (c *FullNodeStruct) PaychList(ctx context.Context) ([]address.Address, error) { func (c *FullNodeStruct) PaychList(ctx context.Context) ([]address.Address, error) {
return c.Internal.PaychList(ctx) return c.Internal.PaychList(ctx)
} }
@ -919,7 +924,7 @@ func (c *FullNodeStruct) PaychVoucherAdd(ctx context.Context, addr address.Addre
return c.Internal.PaychVoucherAdd(ctx, addr, sv, proof, minDelta) return c.Internal.PaychVoucherAdd(ctx, addr, sv, proof, minDelta)
} }
func (c *FullNodeStruct) PaychVoucherCreate(ctx context.Context, pch address.Address, amt types.BigInt, lane uint64) (*paych.SignedVoucher, error) { func (c *FullNodeStruct) PaychVoucherCreate(ctx context.Context, pch address.Address, amt types.BigInt, lane uint64) (*api.VoucherCreateResult, error) {
return c.Internal.PaychVoucherCreate(ctx, pch, amt, lane) return c.Internal.PaychVoucherCreate(ctx, pch, amt, lane)
} }

View File

@ -96,18 +96,24 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if vouch1.Voucher == nil {
t.Fatal(fmt.Errorf("Not enough funds to create voucher: missing %d", vouch1.Shortfall))
}
vouch2, err := paymentCreator.PaychVoucherCreate(ctx, channel, abi.NewTokenAmount(2000), lane) vouch2, err := paymentCreator.PaychVoucherCreate(ctx, channel, abi.NewTokenAmount(2000), lane)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
delta1, err := paymentReceiver.PaychVoucherAdd(ctx, channel, vouch1, nil, abi.NewTokenAmount(1000)) if vouch2.Voucher == nil {
t.Fatal(fmt.Errorf("Not enough funds to create voucher: missing %d", vouch2.Shortfall))
}
delta1, err := paymentReceiver.PaychVoucherAdd(ctx, channel, vouch1.Voucher, nil, abi.NewTokenAmount(1000))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if !delta1.Equals(abi.NewTokenAmount(1000)) { if !delta1.Equals(abi.NewTokenAmount(1000)) {
t.Fatal("voucher didn't have the right amount") t.Fatal("voucher didn't have the right amount")
} }
delta2, err := paymentReceiver.PaychVoucherAdd(ctx, channel, vouch2, nil, abi.NewTokenAmount(1000)) delta2, err := paymentReceiver.PaychVoucherAdd(ctx, channel, vouch2.Voucher, nil, abi.NewTokenAmount(1000))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -217,9 +217,9 @@ var paychVoucherCreateCmd = &cli.Command{
return err return err
} }
amt, err := types.BigFromString(cctx.Args().Get(1)) amt, err := types.ParseFIL(cctx.Args().Get(1))
if err != nil { if err != nil {
return err return ShowHelp(cctx, fmt.Errorf("parsing amount failed: %s", err))
} }
lane := cctx.Int("lane") lane := cctx.Int("lane")
@ -232,12 +232,16 @@ var paychVoucherCreateCmd = &cli.Command{
ctx := ReqContext(cctx) ctx := ReqContext(cctx)
sv, err := api.PaychVoucherCreate(ctx, ch, amt, uint64(lane)) v, err := api.PaychVoucherCreate(ctx, ch, types.BigInt(amt), uint64(lane))
if err != nil { if err != nil {
return err return err
} }
enc, err := EncodedString(sv) if v.Voucher == nil {
return fmt.Errorf("Could not create voucher: insufficient funds in channel, shortfall: %d", v.Shortfall)
}
enc, err := EncodedString(v.Voucher)
if err != nil { if err != nil {
return err return err
} }

View File

@ -6,6 +6,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"os" "os"
"regexp"
"strconv" "strconv"
"strings" "strings"
"testing" "testing"
@ -248,6 +249,50 @@ func TestPaymentChannelVouchers(t *testing.T) {
checkVoucherOutput(t, bestSpendable, bestVouchers) checkVoucherOutput(t, bestSpendable, bestVouchers)
} }
// TestPaymentChannelVoucherCreateShortfall verifies that if a voucher amount
// is greater than what's left in the channel, voucher create fails
func TestPaymentChannelVoucherCreateShortfall(t *testing.T) {
_ = os.Setenv("BELLMAN_NO_GPU", "1")
blocktime := 5 * time.Millisecond
ctx := context.Background()
nodes, addrs := startTwoNodesOneMiner(ctx, t, blocktime)
paymentCreator := nodes[0]
creatorAddr := addrs[0]
receiverAddr := addrs[1]
// Create mock CLI
mockCLI := newMockCLI(t)
creatorCLI := mockCLI.client(paymentCreator.ListenAddr)
// creator: paych get <creator> <receiver> <amount>
channelAmt := 100
cmd := []string{creatorAddr.String(), receiverAddr.String(), fmt.Sprintf("%d", channelAmt)}
chstr := creatorCLI.runCmd(paychGetCmd, cmd)
chAddr, err := address.NewFromString(chstr)
require.NoError(t, err)
// creator: paych voucher create <channel> <amount> --lane=1
voucherAmt1 := 60
lane1 := "--lane=1"
cmd = []string{lane1, chAddr.String(), strconv.Itoa(voucherAmt1)}
voucher1 := creatorCLI.runCmd(paychVoucherCreateCmd, cmd)
fmt.Println(voucher1)
// creator: paych voucher create <channel> <amount> --lane=2
lane2 := "--lane=2"
voucherAmt2 := 70
cmd = []string{lane2, chAddr.String(), strconv.Itoa(voucherAmt2)}
_, err = creatorCLI.runCmdRaw(paychVoucherCreateCmd, cmd)
// Should fail because channel doesn't have required amount
require.Error(t, err)
shortfall := voucherAmt1 + voucherAmt2 - channelAmt
require.Regexp(t, regexp.MustCompile(fmt.Sprintf("shortfall: %d", shortfall)), err.Error())
}
func checkVoucherOutput(t *testing.T, list string, vouchers []voucherSpec) { func checkVoucherOutput(t *testing.T, list string, vouchers []voucherSpec) {
lines := strings.Split(list, "\n") lines := strings.Split(list, "\n")
listVouchers := make(map[string]string) listVouchers := make(map[string]string)
@ -350,6 +395,13 @@ type mockCLIClient struct {
} }
func (c *mockCLIClient) runCmd(cmd *cli.Command, input []string) string { func (c *mockCLIClient) runCmd(cmd *cli.Command, input []string) string {
out, err := c.runCmdRaw(cmd, input)
require.NoError(c.t, err)
return out
}
func (c *mockCLIClient) runCmdRaw(cmd *cli.Command, input []string) (string, error) {
// prepend --api=<node api listener address> // prepend --api=<node api listener address>
apiFlag := "--api=" + c.addr.String() apiFlag := "--api=" + c.addr.String()
input = append([]string{apiFlag}, input...) input = append([]string{apiFlag}, input...)
@ -359,12 +411,11 @@ func (c *mockCLIClient) runCmd(cmd *cli.Command, input []string) string {
require.NoError(c.t, err) require.NoError(c.t, err)
err = cmd.Action(cli.NewContext(c.cctx.App, fs, c.cctx)) err = cmd.Action(cli.NewContext(c.cctx.App, fs, c.cctx))
require.NoError(c.t, err)
// Get the output // Get the output
str := strings.TrimSpace(c.out.String()) str := strings.TrimSpace(c.out.String())
c.out.Reset() c.out.Reset()
return str return str, err
} }
func (c *mockCLIClient) flagSet(cmd *cli.Command) *flag.FlagSet { func (c *mockCLIClient) flagSet(cmd *cli.Command) *flag.FlagSet {

View File

@ -3,11 +3,14 @@ package retrievaladapter
import ( import (
"context" "context"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
@ -56,7 +59,10 @@ func (rcn *retrievalClientNode) CreatePaymentVoucher(ctx context.Context, paymen
if err != nil { if err != nil {
return nil, err return nil, err
} }
return voucher, nil if voucher.Voucher == nil {
return nil, xerrors.Errorf("Could not create voucher - shortfall: %d", voucher.Shortfall)
}
return voucher.Voucher, nil
} }
func (rcn *retrievalClientNode) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) { func (rcn *retrievalClientNode) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {

View File

@ -3,9 +3,10 @@ package paych
import ( import (
"context" "context"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/builtin/paych" "github.com/filecoin-project/specs-actors/actors/builtin/paych"
@ -38,6 +39,10 @@ func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt t
}, nil }, nil
} }
func (a *PaychAPI) PaychAvailableFunds(from, to address.Address) (*api.ChannelAvailableFunds, error) {
return a.PaychMgr.AvailableFunds(from, to)
}
func (a *PaychAPI) PaychGetWaitReady(ctx context.Context, sentinel cid.Cid) (address.Address, error) { func (a *PaychAPI) PaychGetWaitReady(ctx context.Context, sentinel cid.Cid) (address.Address, error) {
return a.PaychMgr.GetPaychWaitReady(ctx, sentinel) return a.PaychMgr.GetPaychWaitReady(ctx, sentinel)
} }
@ -64,9 +69,7 @@ func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address
svs := make([]*paych.SignedVoucher, len(vouchers)) svs := make([]*paych.SignedVoucher, len(vouchers))
for i, v := range vouchers { for i, v := range vouchers {
sv, err := a.paychVoucherCreate(ctx, ch.Channel, paych.SignedVoucher{ sv, err := a.PaychMgr.CreateVoucher(ctx, ch.Channel, paych.SignedVoucher{
ChannelAddr: ch.Channel,
Amount: v.Amount, Amount: v.Amount,
Lane: lane, Lane: lane,
@ -78,8 +81,11 @@ func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address
if err != nil { if err != nil {
return nil, err return nil, err
} }
if sv.Voucher == nil {
return nil, xerrors.Errorf("Could not create voucher - shortfall of %d", sv.Shortfall)
}
svs[i] = sv svs[i] = sv.Voucher
} }
return &api.PaymentInfo{ return &api.PaymentInfo{
@ -129,41 +135,10 @@ func (a *PaychAPI) PaychVoucherAdd(ctx context.Context, ch address.Address, sv *
// that will be used to create the voucher, so if previous vouchers exist, the // that will be used to create the voucher, so if previous vouchers exist, the
// actual additional value of this voucher will only be the difference between // actual additional value of this voucher will only be the difference between
// the two. // the two.
func (a *PaychAPI) PaychVoucherCreate(ctx context.Context, pch address.Address, amt types.BigInt, lane uint64) (*paych.SignedVoucher, error) { // If there are insufficient funds in the channel to create the voucher,
return a.paychVoucherCreate(ctx, pch, paych.SignedVoucher{ChannelAddr: pch, Amount: amt, Lane: lane}) // returns a nil voucher and the shortfall.
} func (a *PaychAPI) PaychVoucherCreate(ctx context.Context, pch address.Address, amt types.BigInt, lane uint64) (*api.VoucherCreateResult, error) {
return a.PaychMgr.CreateVoucher(ctx, pch, paych.SignedVoucher{Amount: amt, Lane: lane})
func (a *PaychAPI) paychVoucherCreate(ctx context.Context, pch address.Address, voucher paych.SignedVoucher) (*paych.SignedVoucher, error) {
ci, err := a.PaychMgr.GetChannelInfo(pch)
if err != nil {
return nil, xerrors.Errorf("get channel info: %w", err)
}
nonce, err := a.PaychMgr.NextNonceForLane(ctx, pch, voucher.Lane)
if err != nil {
return nil, xerrors.Errorf("getting next nonce for lane: %w", err)
}
sv := &voucher
sv.Nonce = nonce
vb, err := sv.SigningBytes()
if err != nil {
return nil, err
}
sig, err := a.WalletSign(ctx, ci.Control, vb)
if err != nil {
return nil, err
}
sv.Signature = sig
if _, err := a.PaychMgr.AddVoucherOutbound(ctx, pch, sv, nil, types.NewInt(0)); err != nil {
return nil, xerrors.Errorf("failed to persist voucher: %w", err)
}
return sv, nil
} }
func (a *PaychAPI) PaychVoucherList(ctx context.Context, pch address.Address) ([]*paych.SignedVoucher, error) { func (a *PaychAPI) PaychVoucherList(ctx context.Context, pch address.Address) ([]*paych.SignedVoucher, error) {

View File

@ -60,7 +60,7 @@ func (pm *Manager) accessorCacheKey(from address.Address, to address.Address) st
// access a channel use the same lock (the lock on the accessor) // access a channel use the same lock (the lock on the accessor)
func (pm *Manager) addAccessorToCache(from address.Address, to address.Address) *channelAccessor { func (pm *Manager) addAccessorToCache(from address.Address, to address.Address) *channelAccessor {
key := pm.accessorCacheKey(from, to) key := pm.accessorCacheKey(from, to)
ca := newChannelAccessor(pm) ca := newChannelAccessor(pm, from, to)
// TODO: Use LRU // TODO: Use LRU
pm.channels[key] = ca pm.channels[key] = ca
return ca return ca

View File

@ -4,6 +4,8 @@ import (
"context" "context"
"sync" "sync"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
@ -49,6 +51,7 @@ type paychAPI interface {
StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
MpoolPushMessage(ctx context.Context, msg *types.Message, maxFee *api.MessageSendSpec) (*types.SignedMessage, error) MpoolPushMessage(ctx context.Context, msg *types.Message, maxFee *api.MessageSendSpec) (*types.SignedMessage, error)
WalletHas(ctx context.Context, addr address.Address) (bool, error) WalletHas(ctx context.Context, addr address.Address) (bool, error)
WalletSign(ctx context.Context, k address.Address, msg []byte) (*crypto.Signature, error)
} }
// managerAPI defines all methods needed by the manager // managerAPI defines all methods needed by the manager
@ -135,7 +138,16 @@ func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt t
return address.Undef, cid.Undef, err return address.Undef, cid.Undef, err
} }
return chanAccessor.getPaych(ctx, from, to, amt) return chanAccessor.getPaych(ctx, amt)
}
func (pm *Manager) AvailableFunds(from address.Address, to address.Address) (*api.ChannelAvailableFunds, error) {
chanAccessor, err := pm.accessorByFromTo(from, to)
if err != nil {
return nil, err
}
return chanAccessor.availableFunds()
} }
// GetPaychWaitReady waits until the create channel / add funds message with the // GetPaychWaitReady waits until the create channel / add funds message with the
@ -179,6 +191,15 @@ func (pm *Manager) GetChannelInfo(addr address.Address) (*ChannelInfo, error) {
return ca.getChannelInfo(addr) return ca.getChannelInfo(addr)
} }
func (pm *Manager) CreateVoucher(ctx context.Context, ch address.Address, voucher paych.SignedVoucher) (*api.VoucherCreateResult, error) {
ca, err := pm.accessorByAddress(ch)
if err != nil {
return nil, err
}
return ca.createVoucher(ctx, ch, voucher)
}
// CheckVoucherValid checks if the given voucher is valid (is or could become spendable at some point). // CheckVoucherValid checks if the given voucher is valid (is or could become spendable at some point).
// If the channel is not in the store, fetches the channel from state (and checks that // If the channel is not in the store, fetches the channel from state (and checks that
// the channel To address is owned by the wallet). // the channel To address is owned by the wallet).
@ -309,14 +330,6 @@ func (pm *Manager) ListVouchers(ctx context.Context, ch address.Address) ([]*Vou
return ca.listVouchers(ctx, ch) return ca.listVouchers(ctx, ch)
} }
func (pm *Manager) NextNonceForLane(ctx context.Context, ch address.Address, lane uint64) (uint64, error) {
ca, err := pm.accessorByAddress(ch)
if err != nil {
return 0, err
}
return ca.nextNonceForLane(ctx, ch, lane)
}
func (pm *Manager) Settle(ctx context.Context, addr address.Address) (cid.Cid, error) { func (pm *Manager) Settle(ctx context.Context, addr address.Address) (cid.Cid, error) {
ca, err := pm.accessorByAddress(addr) ca, err := pm.accessorByAddress(addr)
if err != nil { if err != nil {

View File

@ -5,6 +5,10 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/specs-actors/actors/crypto"
cbornode "github.com/ipfs/go-ipld-cbor" cbornode "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/specs-actors/actors/util/adt"
@ -132,6 +136,7 @@ type mockPaychAPI struct {
waitingCalls map[cid.Cid]*waitingCall waitingCalls map[cid.Cid]*waitingCall
waitingResponses map[cid.Cid]*waitingResponse waitingResponses map[cid.Cid]*waitingResponse
wallet map[address.Address]struct{} wallet map[address.Address]struct{}
signingKey []byte
} }
func newMockPaychAPI() *mockPaychAPI { func newMockPaychAPI() *mockPaychAPI {
@ -240,3 +245,17 @@ func (pchapi *mockPaychAPI) addWalletAddress(addr address.Address) {
pchapi.wallet[addr] = struct{}{} pchapi.wallet[addr] = struct{}{}
} }
func (pchapi *mockPaychAPI) WalletSign(ctx context.Context, k address.Address, msg []byte) (*crypto.Signature, error) {
pchapi.lk.Lock()
defer pchapi.lk.Unlock()
return sigs.Sign(crypto.SigTypeSecp256k1, pchapi.signingKey, msg)
}
func (pchapi *mockPaychAPI) addSigningKey(key []byte) {
pchapi.lk.Lock()
defer pchapi.lk.Unlock()
pchapi.signingKey = key
}

View File

@ -5,6 +5,8 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -21,11 +23,36 @@ import (
xerrors "golang.org/x/xerrors" xerrors "golang.org/x/xerrors"
) )
// insufficientFundsErr indicates that there are not enough funds in the
// channel to create a voucher
type insufficientFundsErr interface {
Shortfall() types.BigInt
}
type ErrInsufficientFunds struct {
shortfall types.BigInt
}
func newErrInsufficientFunds(shortfall types.BigInt) *ErrInsufficientFunds {
return &ErrInsufficientFunds{shortfall: shortfall}
}
func (e *ErrInsufficientFunds) Error() string {
return fmt.Sprintf("not enough funds in channel to cover voucher - shortfall: %d", e.shortfall)
}
func (e *ErrInsufficientFunds) Shortfall() types.BigInt {
return e.shortfall
}
// channelAccessor is used to simplify locking when accessing a channel // channelAccessor is used to simplify locking when accessing a channel
type channelAccessor struct { type channelAccessor struct {
// waitCtx is used by processes that wait for things to be confirmed from address.Address
// on chain to address.Address
waitCtx context.Context
// chctx is used by background processes (eg when waiting for things to be
// confirmed on chain)
chctx context.Context
sa *stateAccessor sa *stateAccessor
api managerAPI api managerAPI
store *Store store *Store
@ -34,14 +61,16 @@ type channelAccessor struct {
msgListeners msgListeners msgListeners msgListeners
} }
func newChannelAccessor(pm *Manager) *channelAccessor { func newChannelAccessor(pm *Manager, from address.Address, to address.Address) *channelAccessor {
return &channelAccessor{ return &channelAccessor{
lk: &channelLock{globalLock: &pm.lk}, from: from,
to: to,
chctx: pm.ctx,
sa: pm.sa, sa: pm.sa,
api: pm.pchapi, api: pm.pchapi,
store: pm.store, store: pm.store,
lk: &channelLock{globalLock: &pm.lk},
msgListeners: newMsgListeners(), msgListeners: newMsgListeners(),
waitCtx: pm.ctx,
} }
} }
@ -52,6 +81,69 @@ func (ca *channelAccessor) getChannelInfo(addr address.Address) (*ChannelInfo, e
return ca.store.ByAddress(addr) return ca.store.ByAddress(addr)
} }
// createVoucher creates a voucher with the given specification, setting its
// nonce, signing the voucher and storing it in the local datastore.
// If there are not enough funds in the channel to create the voucher, returns
// the shortfall in funds.
func (ca *channelAccessor) createVoucher(ctx context.Context, ch address.Address, voucher paych.SignedVoucher) (*api.VoucherCreateResult, error) {
ca.lk.Lock()
defer ca.lk.Unlock()
// Find the channel for the voucher
ci, err := ca.store.ByAddress(ch)
if err != nil {
return nil, xerrors.Errorf("failed to get channel info by address: %w", err)
}
// Set the voucher channel
sv := &voucher
sv.ChannelAddr = ch
// Get the next nonce on the given lane
sv.Nonce = ca.nextNonceForLane(ci, voucher.Lane)
// Sign the voucher
vb, err := sv.SigningBytes()
if err != nil {
return nil, xerrors.Errorf("failed to get voucher signing bytes: %w", err)
}
sig, err := ca.api.WalletSign(ctx, ci.Control, vb)
if err != nil {
return nil, xerrors.Errorf("failed to sign voucher: %w", err)
}
sv.Signature = sig
// Store the voucher
if _, err := ca.addVoucherUnlocked(ctx, ch, sv, nil, types.NewInt(0)); err != nil {
// If there are not enough funds in the channel to cover the voucher,
// return a voucher create result with the shortfall
var ife insufficientFundsErr
if xerrors.As(err, &ife) {
return &api.VoucherCreateResult{
Shortfall: ife.Shortfall(),
}, nil
}
return nil, xerrors.Errorf("failed to persist voucher: %w", err)
}
return &api.VoucherCreateResult{Voucher: sv, Shortfall: types.NewInt(0)}, nil
}
func (ca *channelAccessor) nextNonceForLane(ci *ChannelInfo, lane uint64) uint64 {
var maxnonce uint64
for _, v := range ci.Vouchers {
if v.Voucher.Lane == lane {
if v.Voucher.Nonce > maxnonce {
maxnonce = v.Voucher.Nonce
}
}
}
return maxnonce + 1
}
func (ca *channelAccessor) checkVoucherValid(ctx context.Context, ch address.Address, sv *paych.SignedVoucher) (map[uint64]*paych.LaneState, error) { func (ca *channelAccessor) checkVoucherValid(ctx context.Context, ch address.Address, sv *paych.SignedVoucher) (map[uint64]*paych.LaneState, error) {
ca.lk.Lock() ca.lk.Lock()
defer ca.lk.Unlock() defer ca.lk.Unlock()
@ -133,7 +225,7 @@ func (ca *channelAccessor) checkVoucherValidUnlocked(ctx context.Context, ch add
// Must not exceed actor balance // Must not exceed actor balance
newTotal := types.BigAdd(totalRedeemed, pchState.ToSend) newTotal := types.BigAdd(totalRedeemed, pchState.ToSend)
if act.Balance.LessThan(newTotal) { if act.Balance.LessThan(newTotal) {
return nil, fmt.Errorf("not enough funds in channel to cover voucher") return nil, newErrInsufficientFunds(types.BigSub(newTotal, act.Balance))
} }
if len(sv.Merges) != 0 { if len(sv.Merges) != 0 {
@ -221,6 +313,10 @@ func (ca *channelAccessor) addVoucher(ctx context.Context, ch address.Address, s
ca.lk.Lock() ca.lk.Lock()
defer ca.lk.Unlock() defer ca.lk.Unlock()
return ca.addVoucherUnlocked(ctx, ch, sv, proof, minDelta)
}
func (ca *channelAccessor) addVoucherUnlocked(ctx context.Context, ch address.Address, sv *paych.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error) {
ci, err := ca.store.ByAddress(ch) ci, err := ca.store.ByAddress(ch)
if err != nil { if err != nil {
return types.BigInt{}, err return types.BigInt{}, err
@ -382,28 +478,6 @@ func (ca *channelAccessor) listVouchers(ctx context.Context, ch address.Address)
return ca.store.VouchersForPaych(ch) return ca.store.VouchersForPaych(ch)
} }
func (ca *channelAccessor) nextNonceForLane(ctx context.Context, ch address.Address, lane uint64) (uint64, error) {
ca.lk.Lock()
defer ca.lk.Unlock()
// TODO: should this take into account lane state?
vouchers, err := ca.store.VouchersForPaych(ch)
if err != nil {
return 0, err
}
var maxnonce uint64
for _, v := range vouchers {
if v.Voucher.Lane == lane {
if v.Voucher.Nonce > maxnonce {
maxnonce = v.Voucher.Nonce
}
}
}
return maxnonce + 1, nil
}
// laneState gets the LaneStates from chain, then applies all vouchers in // laneState gets the LaneStates from chain, then applies all vouchers in
// the data store over the chain state // the data store over the chain state
func (ca *channelAccessor) laneState(ctx context.Context, state *paych.State, ch address.Address) (map[uint64]*paych.LaneState, error) { func (ca *channelAccessor) laneState(ctx context.Context, state *paych.State, ch address.Address) (map[uint64]*paych.LaneState, error) {
@ -479,7 +553,7 @@ func (ca *channelAccessor) totalRedeemedWithVoucher(laneStates map[uint64]*paych
lane, ok := laneStates[sv.Lane] lane, ok := laneStates[sv.Lane]
if ok { if ok {
// If the voucher is for an existing lane, and the voucher nonce // If the voucher is for an existing lane, and the voucher nonce
// and is higher than the lane nonce // is higher than the lane nonce
if sv.Nonce > lane.Nonce { if sv.Nonce > lane.Nonce {
// Add the delta between the redeemed amount and the voucher // Add the delta between the redeemed amount and the voucher
// amount to the total // amount to the total

View File

@ -381,11 +381,76 @@ func TestCheckVoucherValidCountingAllLanes(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func TestCreateVoucher(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
s := testSetupMgrWithChannel(ctx, t)
// Create a voucher in lane 1
voucherLane1Amt := big.NewInt(5)
voucher := paych.SignedVoucher{
Lane: 1,
Amount: voucherLane1Amt,
}
res, err := s.mgr.CreateVoucher(ctx, s.ch, voucher)
require.NoError(t, err)
require.NotNil(t, res.Voucher)
require.Equal(t, s.ch, res.Voucher.ChannelAddr)
require.Equal(t, voucherLane1Amt, res.Voucher.Amount)
require.EqualValues(t, 0, res.Shortfall.Int64())
nonce := res.Voucher.Nonce
// Create a voucher in lane 1 again, with a higher amount
voucherLane1Amt = big.NewInt(8)
voucher = paych.SignedVoucher{
Lane: 1,
Amount: voucherLane1Amt,
}
res, err = s.mgr.CreateVoucher(ctx, s.ch, voucher)
require.NoError(t, err)
require.NotNil(t, res.Voucher)
require.Equal(t, s.ch, res.Voucher.ChannelAddr)
require.Equal(t, voucherLane1Amt, res.Voucher.Amount)
require.EqualValues(t, 0, res.Shortfall.Int64())
require.Equal(t, nonce+1, res.Voucher.Nonce)
// Create a voucher in lane 2 that covers all the remaining funds
// in the channel
voucherLane2Amt := big.Sub(s.amt, voucherLane1Amt)
voucher = paych.SignedVoucher{
Lane: 2,
Amount: voucherLane2Amt,
}
res, err = s.mgr.CreateVoucher(ctx, s.ch, voucher)
require.NoError(t, err)
require.NotNil(t, res.Voucher)
require.Equal(t, s.ch, res.Voucher.ChannelAddr)
require.Equal(t, voucherLane2Amt, res.Voucher.Amount)
require.EqualValues(t, 0, res.Shortfall.Int64())
// Create a voucher in lane 2 that exceeds the remaining funds in the
// channel
voucherLane2Amt = big.Add(voucherLane2Amt, big.NewInt(1))
voucher = paych.SignedVoucher{
Lane: 2,
Amount: voucherLane2Amt,
}
res, err = s.mgr.CreateVoucher(ctx, s.ch, voucher)
require.NoError(t, err)
// Expect a shortfall value equal to the amount required to add the voucher
// to the channel
require.Nil(t, res.Voucher)
require.EqualValues(t, 1, res.Shortfall.Int64())
}
func TestAddVoucherDelta(t *testing.T) { func TestAddVoucherDelta(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Set up a manager with a single payment channel // Set up a manager with a single payment channel
mgr, _, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t) s := testSetupMgrWithChannel(ctx, t)
voucherLane := uint64(1) voucherLane := uint64(1)
@ -393,23 +458,23 @@ func TestAddVoucherDelta(t *testing.T) {
minDelta := big.NewInt(2) minDelta := big.NewInt(2)
nonce := uint64(1) nonce := uint64(1)
voucherAmount := big.NewInt(1) voucherAmount := big.NewInt(1)
sv := createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) sv := createTestVoucher(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
_, err := mgr.AddVoucherOutbound(ctx, ch, sv, nil, minDelta) _, err := s.mgr.AddVoucherOutbound(ctx, s.ch, sv, nil, minDelta)
require.Error(t, err) require.Error(t, err)
// Expect success when adding a voucher whose amount is equal to minDelta // Expect success when adding a voucher whose amount is equal to minDelta
nonce++ nonce++
voucherAmount = big.NewInt(2) voucherAmount = big.NewInt(2)
sv = createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) sv = createTestVoucher(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
delta, err := mgr.AddVoucherOutbound(ctx, ch, sv, nil, minDelta) delta, err := s.mgr.AddVoucherOutbound(ctx, s.ch, sv, nil, minDelta)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, delta.Int64(), 2) require.EqualValues(t, delta.Int64(), 2)
// Check that delta is correct when there's an existing voucher // Check that delta is correct when there's an existing voucher
nonce++ nonce++
voucherAmount = big.NewInt(5) voucherAmount = big.NewInt(5)
sv = createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) sv = createTestVoucher(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
delta, err = mgr.AddVoucherOutbound(ctx, ch, sv, nil, minDelta) delta, err = s.mgr.AddVoucherOutbound(ctx, s.ch, sv, nil, minDelta)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, delta.Int64(), 3) require.EqualValues(t, delta.Int64(), 3)
@ -417,8 +482,8 @@ func TestAddVoucherDelta(t *testing.T) {
nonce = uint64(1) nonce = uint64(1)
voucherAmount = big.NewInt(6) voucherAmount = big.NewInt(6)
voucherLane = uint64(2) voucherLane = uint64(2)
sv = createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) sv = createTestVoucher(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
delta, err = mgr.AddVoucherOutbound(ctx, ch, sv, nil, minDelta) delta, err = s.mgr.AddVoucherOutbound(ctx, s.ch, sv, nil, minDelta)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, delta.Int64(), 6) require.EqualValues(t, delta.Int64(), 6)
} }
@ -427,7 +492,7 @@ func TestAddVoucherNextLane(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Set up a manager with a single payment channel // Set up a manager with a single payment channel
mgr, _, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t) s := testSetupMgrWithChannel(ctx, t)
minDelta := big.NewInt(0) minDelta := big.NewInt(0)
voucherAmount := big.NewInt(2) voucherAmount := big.NewInt(2)
@ -435,40 +500,40 @@ func TestAddVoucherNextLane(t *testing.T) {
// Add a voucher in lane 2 // Add a voucher in lane 2
nonce := uint64(1) nonce := uint64(1)
voucherLane := uint64(2) voucherLane := uint64(2)
sv := createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) sv := createTestVoucher(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
_, err := mgr.AddVoucherOutbound(ctx, ch, sv, nil, minDelta) _, err := s.mgr.AddVoucherOutbound(ctx, s.ch, sv, nil, minDelta)
require.NoError(t, err) require.NoError(t, err)
ci, err := mgr.GetChannelInfo(ch) ci, err := s.mgr.GetChannelInfo(s.ch)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, ci.NextLane, 3) require.EqualValues(t, ci.NextLane, 3)
// Allocate a lane (should be lane 3) // Allocate a lane (should be lane 3)
lane, err := mgr.AllocateLane(ch) lane, err := s.mgr.AllocateLane(s.ch)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, lane, 3) require.EqualValues(t, lane, 3)
ci, err = mgr.GetChannelInfo(ch) ci, err = s.mgr.GetChannelInfo(s.ch)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, ci.NextLane, 4) require.EqualValues(t, ci.NextLane, 4)
// Add a voucher in lane 1 // Add a voucher in lane 1
voucherLane = uint64(1) voucherLane = uint64(1)
sv = createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) sv = createTestVoucher(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
_, err = mgr.AddVoucherOutbound(ctx, ch, sv, nil, minDelta) _, err = s.mgr.AddVoucherOutbound(ctx, s.ch, sv, nil, minDelta)
require.NoError(t, err) require.NoError(t, err)
ci, err = mgr.GetChannelInfo(ch) ci, err = s.mgr.GetChannelInfo(s.ch)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, ci.NextLane, 4) require.EqualValues(t, ci.NextLane, 4)
// Add a voucher in lane 7 // Add a voucher in lane 7
voucherLane = uint64(7) voucherLane = uint64(7)
sv = createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) sv = createTestVoucher(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
_, err = mgr.AddVoucherOutbound(ctx, ch, sv, nil, minDelta) _, err = s.mgr.AddVoucherOutbound(ctx, s.ch, sv, nil, minDelta)
require.NoError(t, err) require.NoError(t, err)
ci, err = mgr.GetChannelInfo(ch) ci, err = s.mgr.GetChannelInfo(s.ch)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, ci.NextLane, 8) require.EqualValues(t, ci.NextLane, 8)
} }
@ -477,15 +542,15 @@ func TestAllocateLane(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Set up a manager with a single payment channel // Set up a manager with a single payment channel
mgr, _, ch, _ := testSetupMgrWithChannel(ctx, t) s := testSetupMgrWithChannel(ctx, t)
// First lane should be 0 // First lane should be 0
lane, err := mgr.AllocateLane(ch) lane, err := s.mgr.AllocateLane(s.ch)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, lane, 0) require.EqualValues(t, lane, 0)
// Next lane should be 1 // Next lane should be 1
lane, err = mgr.AllocateLane(ch) lane, err = s.mgr.AllocateLane(s.ch)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, lane, 1) require.EqualValues(t, lane, 1)
} }
@ -553,7 +618,7 @@ func TestAddVoucherProof(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Set up a manager with a single payment channel // Set up a manager with a single payment channel
mgr, _, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t) s := testSetupMgrWithChannel(ctx, t)
nonce := uint64(1) nonce := uint64(1)
voucherAmount := big.NewInt(1) voucherAmount := big.NewInt(1)
@ -563,34 +628,34 @@ func TestAddVoucherProof(t *testing.T) {
// Add a voucher with no proof // Add a voucher with no proof
var proof []byte var proof []byte
sv := createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) sv := createTestVoucher(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
_, err := mgr.AddVoucherOutbound(ctx, ch, sv, nil, minDelta) _, err := s.mgr.AddVoucherOutbound(ctx, s.ch, sv, nil, minDelta)
require.NoError(t, err) require.NoError(t, err)
// Expect one voucher with no proof // Expect one voucher with no proof
ci, err := mgr.GetChannelInfo(ch) ci, err := s.mgr.GetChannelInfo(s.ch)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, ci.Vouchers, 1) require.Len(t, ci.Vouchers, 1)
require.Len(t, ci.Vouchers[0].Proof, 0) require.Len(t, ci.Vouchers[0].Proof, 0)
// Add same voucher with no proof // Add same voucher with no proof
voucherLane = uint64(1) voucherLane = uint64(1)
_, err = mgr.AddVoucherOutbound(ctx, ch, sv, proof, minDelta) _, err = s.mgr.AddVoucherOutbound(ctx, s.ch, sv, proof, minDelta)
require.NoError(t, err) require.NoError(t, err)
// Expect one voucher with no proof // Expect one voucher with no proof
ci, err = mgr.GetChannelInfo(ch) ci, err = s.mgr.GetChannelInfo(s.ch)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, ci.Vouchers, 1) require.Len(t, ci.Vouchers, 1)
require.Len(t, ci.Vouchers[0].Proof, 0) require.Len(t, ci.Vouchers[0].Proof, 0)
// Add same voucher with proof // Add same voucher with proof
proof = []byte{1} proof = []byte{1}
_, err = mgr.AddVoucherOutbound(ctx, ch, sv, proof, minDelta) _, err = s.mgr.AddVoucherOutbound(ctx, s.ch, sv, proof, minDelta)
require.NoError(t, err) require.NoError(t, err)
// Should add proof to existing voucher // Should add proof to existing voucher
ci, err = mgr.GetChannelInfo(ch) ci, err = s.mgr.GetChannelInfo(s.ch)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, ci.Vouchers, 1) require.Len(t, ci.Vouchers, 1)
require.Len(t, ci.Vouchers[0].Proof, 1) require.Len(t, ci.Vouchers[0].Proof, 1)
@ -663,47 +728,47 @@ func TestBestSpendable(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Set up a manager with a single payment channel // Set up a manager with a single payment channel
mgr, mock, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t) s := testSetupMgrWithChannel(ctx, t)
// Add vouchers to lane 1 with amounts: [1, 2, 3] // Add vouchers to lane 1 with amounts: [1, 2, 3]
voucherLane := uint64(1) voucherLane := uint64(1)
minDelta := big.NewInt(0) minDelta := big.NewInt(0)
nonce := uint64(1) nonce := uint64(1)
voucherAmount := big.NewInt(1) voucherAmount := big.NewInt(1)
svL1V1 := createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) svL1V1 := createTestVoucher(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
_, err := mgr.AddVoucherInbound(ctx, ch, svL1V1, nil, minDelta) _, err := s.mgr.AddVoucherInbound(ctx, s.ch, svL1V1, nil, minDelta)
require.NoError(t, err) require.NoError(t, err)
nonce++ nonce++
voucherAmount = big.NewInt(2) voucherAmount = big.NewInt(2)
svL1V2 := createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) svL1V2 := createTestVoucher(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
_, err = mgr.AddVoucherInbound(ctx, ch, svL1V2, nil, minDelta) _, err = s.mgr.AddVoucherInbound(ctx, s.ch, svL1V2, nil, minDelta)
require.NoError(t, err) require.NoError(t, err)
nonce++ nonce++
voucherAmount = big.NewInt(3) voucherAmount = big.NewInt(3)
svL1V3 := createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) svL1V3 := createTestVoucher(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
_, err = mgr.AddVoucherInbound(ctx, ch, svL1V3, nil, minDelta) _, err = s.mgr.AddVoucherInbound(ctx, s.ch, svL1V3, nil, minDelta)
require.NoError(t, err) require.NoError(t, err)
// Add voucher to lane 2 with amounts: [2] // Add voucher to lane 2 with amounts: [2]
voucherLane = uint64(2) voucherLane = uint64(2)
nonce = uint64(1) nonce = uint64(1)
voucherAmount = big.NewInt(2) voucherAmount = big.NewInt(2)
svL2V1 := createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) svL2V1 := createTestVoucher(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
_, err = mgr.AddVoucherInbound(ctx, ch, svL2V1, nil, minDelta) _, err = s.mgr.AddVoucherInbound(ctx, s.ch, svL2V1, nil, minDelta)
require.NoError(t, err) require.NoError(t, err)
// Return success exit code from calls to check if voucher is spendable // Return success exit code from calls to check if voucher is spendable
bsapi := newMockBestSpendableAPI(mgr) bsapi := newMockBestSpendableAPI(s.mgr)
mock.setCallResponse(&api.InvocResult{ s.mock.setCallResponse(&api.InvocResult{
MsgRct: &types.MessageReceipt{ MsgRct: &types.MessageReceipt{
ExitCode: 0, ExitCode: 0,
}, },
}) })
// Verify best spendable vouchers on each lane // Verify best spendable vouchers on each lane
vouchers, err := BestSpendableByLane(ctx, bsapi, ch) vouchers, err := BestSpendableByLane(ctx, bsapi, s.ch)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, vouchers, 2) require.Len(t, vouchers, 2)
@ -716,21 +781,21 @@ func TestBestSpendable(t *testing.T) {
require.EqualValues(t, 2, vchr.Amount.Int64()) require.EqualValues(t, 2, vchr.Amount.Int64())
// Submit voucher from lane 2 // Submit voucher from lane 2
_, err = mgr.SubmitVoucher(ctx, ch, svL2V1, nil, nil) _, err = s.mgr.SubmitVoucher(ctx, s.ch, svL2V1, nil, nil)
require.NoError(t, err) require.NoError(t, err)
// Best spendable voucher should no longer include lane 2 // Best spendable voucher should no longer include lane 2
// (because voucher has not been submitted) // (because voucher has not been submitted)
vouchers, err = BestSpendableByLane(ctx, bsapi, ch) vouchers, err = BestSpendableByLane(ctx, bsapi, s.ch)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, vouchers, 1) require.Len(t, vouchers, 1)
// Submit first voucher from lane 1 // Submit first voucher from lane 1
_, err = mgr.SubmitVoucher(ctx, ch, svL1V1, nil, nil) _, err = s.mgr.SubmitVoucher(ctx, s.ch, svL1V1, nil, nil)
require.NoError(t, err) require.NoError(t, err)
// Best spendable voucher for lane 1 should still be highest value voucher // Best spendable voucher for lane 1 should still be highest value voucher
vouchers, err = BestSpendableByLane(ctx, bsapi, ch) vouchers, err = BestSpendableByLane(ctx, bsapi, s.ch)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, vouchers, 1) require.Len(t, vouchers, 1)
@ -743,18 +808,18 @@ func TestCheckSpendable(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Set up a manager with a single payment channel // Set up a manager with a single payment channel
mgr, mock, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t) s := testSetupMgrWithChannel(ctx, t)
// Create voucher with Extra // Create voucher with Extra
voucherLane := uint64(1) voucherLane := uint64(1)
nonce := uint64(1) nonce := uint64(1)
voucherAmount := big.NewInt(1) voucherAmount := big.NewInt(1)
voucher := createTestVoucherWithExtra(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) voucher := createTestVoucherWithExtra(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
// Add voucher with proof // Add voucher with proof
minDelta := big.NewInt(0) minDelta := big.NewInt(0)
proof := []byte("proof") proof := []byte("proof")
_, err := mgr.AddVoucherInbound(ctx, ch, voucher, proof, minDelta) _, err := s.mgr.AddVoucherInbound(ctx, s.ch, voucher, proof, minDelta)
require.NoError(t, err) require.NoError(t, err)
// Return success exit code from VM call, which indicates that voucher is // Return success exit code from VM call, which indicates that voucher is
@ -764,17 +829,17 @@ func TestCheckSpendable(t *testing.T) {
ExitCode: 0, ExitCode: 0,
}, },
} }
mock.setCallResponse(successResponse) s.mock.setCallResponse(successResponse)
// Check that spendable is true // Check that spendable is true
secret := []byte("secret") secret := []byte("secret")
otherProof := []byte("other proof") otherProof := []byte("other proof")
spendable, err := mgr.CheckVoucherSpendable(ctx, ch, voucher, secret, otherProof) spendable, err := s.mgr.CheckVoucherSpendable(ctx, s.ch, voucher, secret, otherProof)
require.NoError(t, err) require.NoError(t, err)
require.True(t, spendable) require.True(t, spendable)
// Check that the secret and proof were passed through correctly // Check that the secret and proof were passed through correctly
lastCall := mock.getLastCall() lastCall := s.mock.getLastCall()
var p paych.UpdateChannelStateParams var p paych.UpdateChannelStateParams
err = p.UnmarshalCBOR(bytes.NewReader(lastCall.Params)) err = p.UnmarshalCBOR(bytes.NewReader(lastCall.Params))
require.NoError(t, err) require.NoError(t, err)
@ -784,11 +849,11 @@ func TestCheckSpendable(t *testing.T) {
// Check that if no proof is supplied, the proof supplied to add voucher // Check that if no proof is supplied, the proof supplied to add voucher
// above is used // above is used
secret2 := []byte("secret2") secret2 := []byte("secret2")
spendable, err = mgr.CheckVoucherSpendable(ctx, ch, voucher, secret2, nil) spendable, err = s.mgr.CheckVoucherSpendable(ctx, s.ch, voucher, secret2, nil)
require.NoError(t, err) require.NoError(t, err)
require.True(t, spendable) require.True(t, spendable)
lastCall = mock.getLastCall() lastCall = s.mock.getLastCall()
var p2 paych.UpdateChannelStateParams var p2 paych.UpdateChannelStateParams
err = p2.UnmarshalCBOR(bytes.NewReader(lastCall.Params)) err = p2.UnmarshalCBOR(bytes.NewReader(lastCall.Params))
require.NoError(t, err) require.NoError(t, err)
@ -796,26 +861,26 @@ func TestCheckSpendable(t *testing.T) {
require.Equal(t, secret2, p2.Secret) require.Equal(t, secret2, p2.Secret)
// Check that if VM call returns non-success exit code, spendable is false // Check that if VM call returns non-success exit code, spendable is false
mock.setCallResponse(&api.InvocResult{ s.mock.setCallResponse(&api.InvocResult{
MsgRct: &types.MessageReceipt{ MsgRct: &types.MessageReceipt{
ExitCode: 1, ExitCode: 1,
}, },
}) })
spendable, err = mgr.CheckVoucherSpendable(ctx, ch, voucher, secret, nil) spendable, err = s.mgr.CheckVoucherSpendable(ctx, s.ch, voucher, secret, nil)
require.NoError(t, err) require.NoError(t, err)
require.False(t, spendable) require.False(t, spendable)
// Return success exit code (indicating voucher is spendable) // Return success exit code (indicating voucher is spendable)
mock.setCallResponse(successResponse) s.mock.setCallResponse(successResponse)
spendable, err = mgr.CheckVoucherSpendable(ctx, ch, voucher, secret, nil) spendable, err = s.mgr.CheckVoucherSpendable(ctx, s.ch, voucher, secret, nil)
require.NoError(t, err) require.NoError(t, err)
require.True(t, spendable) require.True(t, spendable)
// Check that voucher is no longer spendable once it has been submitted // Check that voucher is no longer spendable once it has been submitted
_, err = mgr.SubmitVoucher(ctx, ch, voucher, nil, nil) _, err = s.mgr.SubmitVoucher(ctx, s.ch, voucher, nil, nil)
require.NoError(t, err) require.NoError(t, err)
spendable, err = mgr.CheckVoucherSpendable(ctx, ch, voucher, secret, nil) spendable, err = s.mgr.CheckVoucherSpendable(ctx, s.ch, voucher, secret, nil)
require.NoError(t, err) require.NoError(t, err)
require.False(t, spendable) require.False(t, spendable)
} }
@ -824,28 +889,28 @@ func TestSubmitVoucher(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Set up a manager with a single payment channel // Set up a manager with a single payment channel
mgr, mock, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t) s := testSetupMgrWithChannel(ctx, t)
// Create voucher with Extra // Create voucher with Extra
voucherLane := uint64(1) voucherLane := uint64(1)
nonce := uint64(1) nonce := uint64(1)
voucherAmount := big.NewInt(1) voucherAmount := big.NewInt(1)
voucher := createTestVoucherWithExtra(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) voucher := createTestVoucherWithExtra(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
// Add voucher with proof // Add voucher with proof
minDelta := big.NewInt(0) minDelta := big.NewInt(0)
addVoucherProof := []byte("proof") addVoucherProof := []byte("proof")
_, err := mgr.AddVoucherInbound(ctx, ch, voucher, addVoucherProof, minDelta) _, err := s.mgr.AddVoucherInbound(ctx, s.ch, voucher, addVoucherProof, minDelta)
require.NoError(t, err) require.NoError(t, err)
// Submit voucher // Submit voucher
secret := []byte("secret") secret := []byte("secret")
submitProof := []byte("submit proof") submitProof := []byte("submit proof")
submitCid, err := mgr.SubmitVoucher(ctx, ch, voucher, secret, submitProof) submitCid, err := s.mgr.SubmitVoucher(ctx, s.ch, voucher, secret, submitProof)
require.NoError(t, err) require.NoError(t, err)
// Check that the secret and proof were passed through correctly // Check that the secret and proof were passed through correctly
msg := mock.pushedMessages(submitCid) msg := s.mock.pushedMessages(submitCid)
var p paych.UpdateChannelStateParams var p paych.UpdateChannelStateParams
err = p.UnmarshalCBOR(bytes.NewReader(msg.Message.Params)) err = p.UnmarshalCBOR(bytes.NewReader(msg.Message.Params))
require.NoError(t, err) require.NoError(t, err)
@ -858,14 +923,14 @@ func TestSubmitVoucher(t *testing.T) {
voucherAmount = big.NewInt(2) voucherAmount = big.NewInt(2)
addVoucherProof2 := []byte("proof2") addVoucherProof2 := []byte("proof2")
secret2 := []byte("secret2") secret2 := []byte("secret2")
voucher = createTestVoucherWithExtra(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) voucher = createTestVoucherWithExtra(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
_, err = mgr.AddVoucherInbound(ctx, ch, voucher, addVoucherProof2, minDelta) _, err = s.mgr.AddVoucherInbound(ctx, s.ch, voucher, addVoucherProof2, minDelta)
require.NoError(t, err) require.NoError(t, err)
submitCid, err = mgr.SubmitVoucher(ctx, ch, voucher, secret2, nil) submitCid, err = s.mgr.SubmitVoucher(ctx, s.ch, voucher, secret2, nil)
require.NoError(t, err) require.NoError(t, err)
msg = mock.pushedMessages(submitCid) msg = s.mock.pushedMessages(submitCid)
var p2 paych.UpdateChannelStateParams var p2 paych.UpdateChannelStateParams
err = p2.UnmarshalCBOR(bytes.NewReader(msg.Message.Params)) err = p2.UnmarshalCBOR(bytes.NewReader(msg.Message.Params))
require.NoError(t, err) require.NoError(t, err)
@ -877,11 +942,11 @@ func TestSubmitVoucher(t *testing.T) {
voucherAmount = big.NewInt(3) voucherAmount = big.NewInt(3)
secret3 := []byte("secret2") secret3 := []byte("secret2")
proof3 := []byte("proof3") proof3 := []byte("proof3")
voucher = createTestVoucherWithExtra(t, ch, voucherLane, nonce, voucherAmount, fromKeyPrivate) voucher = createTestVoucherWithExtra(t, s.ch, voucherLane, nonce, voucherAmount, s.fromKeyPrivate)
submitCid, err = mgr.SubmitVoucher(ctx, ch, voucher, secret3, proof3) submitCid, err = s.mgr.SubmitVoucher(ctx, s.ch, voucher, secret3, proof3)
require.NoError(t, err) require.NoError(t, err)
msg = mock.pushedMessages(submitCid) msg = s.mock.pushedMessages(submitCid)
var p3 paych.UpdateChannelStateParams var p3 paych.UpdateChannelStateParams
err = p3.UnmarshalCBOR(bytes.NewReader(msg.Message.Params)) err = p3.UnmarshalCBOR(bytes.NewReader(msg.Message.Params))
require.NoError(t, err) require.NoError(t, err)
@ -889,7 +954,7 @@ func TestSubmitVoucher(t *testing.T) {
require.Equal(t, secret3, p3.Secret) require.Equal(t, secret3, p3.Secret)
// Verify that vouchers are marked as submitted // Verify that vouchers are marked as submitted
vis, err := mgr.ListVouchers(ctx, ch) vis, err := s.mgr.ListVouchers(ctx, s.ch)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, vis, 3) require.Len(t, vis, 3)
@ -898,55 +963,20 @@ func TestSubmitVoucher(t *testing.T) {
} }
// Attempting to submit the same voucher again should fail // Attempting to submit the same voucher again should fail
_, err = mgr.SubmitVoucher(ctx, ch, voucher, secret2, nil) _, err = s.mgr.SubmitVoucher(ctx, s.ch, voucher, secret2, nil)
require.Error(t, err) require.Error(t, err)
} }
func TestNextNonceForLane(t *testing.T) { type testScaffold struct {
ctx := context.Background() mgr *Manager
mock *mockManagerAPI
// Set up a manager with a single payment channel ch address.Address
mgr, _, ch, key := testSetupMgrWithChannel(ctx, t) amt big.Int
fromAcct address.Address
// Expect next nonce for non-existent lane to be 1 fromKeyPrivate []byte
next, err := mgr.NextNonceForLane(ctx, ch, 1)
require.NoError(t, err)
require.EqualValues(t, next, 1)
voucherAmount := big.NewInt(1)
minDelta := big.NewInt(0)
voucherAmount = big.NewInt(2)
// Add vouchers such that we have
// lane 1: nonce 2
// lane 1: nonce 4
voucherLane := uint64(1)
for _, nonce := range []uint64{2, 4} {
voucherAmount = big.Add(voucherAmount, big.NewInt(1))
sv := createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, key)
_, err := mgr.AddVoucherOutbound(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
}
// lane 2: nonce 7
voucherLane = uint64(2)
nonce := uint64(7)
sv := createTestVoucher(t, ch, voucherLane, nonce, voucherAmount, key)
_, err = mgr.AddVoucherOutbound(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
// Expect next nonce for lane 1 to be 5
next, err = mgr.NextNonceForLane(ctx, ch, 1)
require.NoError(t, err)
require.EqualValues(t, next, 5)
// Expect next nonce for lane 2 to be 8
next, err = mgr.NextNonceForLane(ctx, ch, 2)
require.NoError(t, err)
require.EqualValues(t, next, 8)
} }
func testSetupMgrWithChannel(ctx context.Context, t *testing.T) (*Manager, *mockManagerAPI, address.Address, []byte) { func testSetupMgrWithChannel(ctx context.Context, t *testing.T) *testScaffold {
fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t) fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t)
ch := tutils.NewIDAddr(t, 100) ch := tutils.NewIDAddr(t, 100)
@ -962,11 +992,12 @@ func testSetupMgrWithChannel(ctx context.Context, t *testing.T) (*Manager, *mock
mock.setAccountState(toAcct, account.State{Address: to}) mock.setAccountState(toAcct, account.State{Address: to})
// Create channel in state // Create channel in state
balance := big.NewInt(20)
act := &types.Actor{ act := &types.Actor{
Code: builtin.AccountActorCodeID, Code: builtin.AccountActorCodeID,
Head: cid.Cid{}, Head: cid.Cid{},
Nonce: 0, Nonce: 0,
Balance: big.NewInt(20), Balance: balance,
} }
mock.setPaychState(ch, act, paych.State{ mock.setPaychState(ch, act, paych.State{
From: fromAcct, From: fromAcct,
@ -991,7 +1022,17 @@ func testSetupMgrWithChannel(ctx context.Context, t *testing.T) (*Manager, *mock
err = mgr.store.putChannelInfo(ci) err = mgr.store.putChannelInfo(ci)
require.NoError(t, err) require.NoError(t, err)
return mgr, mock, ch, fromKeyPrivate // Add the from signing key to the wallet
mock.addSigningKey(fromKeyPrivate)
return &testScaffold{
mgr: mgr,
mock: mock,
ch: ch,
amt: balance,
fromAcct: fromAcct,
fromKeyPrivate: fromKeyPrivate,
}
} }
func testGenerateKeyPair(t *testing.T) ([]byte, []byte) { func testGenerateKeyPair(t *testing.T) ([]byte, []byte) {

View File

@ -6,6 +6,12 @@ import (
"testing" "testing"
"time" "time"
"github.com/filecoin-project/specs-actors/actors/builtin/account"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
cborrpc "github.com/filecoin-project/go-cbor-util" cborrpc "github.com/filecoin-project/go-cbor-util"
init_ "github.com/filecoin-project/specs-actors/actors/builtin/init" init_ "github.com/filecoin-project/specs-actors/actors/builtin/init"
@ -650,8 +656,6 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Queue up two add funds requests behind create channel // Queue up two add funds requests behind create channel
//var addFundsQueuedUp sync.WaitGroup
//addFundsQueuedUp.Add(2)
var addFundsSent sync.WaitGroup var addFundsSent sync.WaitGroup
addFundsSent.Add(2) addFundsSent.Add(2)
@ -662,7 +666,6 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
var addFundsMcid1 cid.Cid var addFundsMcid1 cid.Cid
var addFundsMcid2 cid.Cid var addFundsMcid2 cid.Cid
go func() { go func() {
//go addFundsQueuedUp.Done()
defer addFundsSent.Done() defer addFundsSent.Done()
// Request add funds - should block until create channel has completed // Request add funds - should block until create channel has completed
@ -671,7 +674,6 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
}() }()
go func() { go func() {
//go addFundsQueuedUp.Done()
defer addFundsSent.Done() defer addFundsSent.Done()
// Request add funds again - should merge with waiting add funds request // Request add funds again - should merge with waiting add funds request
@ -899,6 +901,168 @@ func TestPaychGetMergeAddFundsCtxCancelAll(t *testing.T) {
require.Equal(t, createAmt, createMsg.Message.Value) require.Equal(t, createAmt, createMsg.Message.Value)
} }
// TestPaychAvailableFunds tests that PaychAvailableFunds returns the correct
// channel state
func TestPaychAvailableFunds(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t)
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic))
to := tutils.NewIDAddr(t, 102)
fromAcct := tutils.NewActorAddr(t, "fromAct")
toAcct := tutils.NewActorAddr(t, "toAct")
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(store, mock)
require.NoError(t, err)
// No channel created yet so available funds should be all zeroes
av, err := mgr.AvailableFunds(from, to)
require.NoError(t, err)
require.Nil(t, av.Channel)
require.Nil(t, av.PendingWaitSentinel)
require.EqualValues(t, 0, av.ConfirmedAmt.Int64())
require.EqualValues(t, 0, av.PendingAmt.Int64())
require.EqualValues(t, 0, av.QueuedAmt.Int64())
require.EqualValues(t, 0, av.VoucherReedeemedAmt.Int64())
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt)
require.NoError(t, err)
// Available funds should reflect create channel message sent
av, err = mgr.AvailableFunds(from, to)
require.NoError(t, err)
require.Nil(t, av.Channel)
require.EqualValues(t, 0, av.ConfirmedAmt.Int64())
require.EqualValues(t, createAmt, av.PendingAmt)
require.EqualValues(t, 0, av.QueuedAmt.Int64())
require.EqualValues(t, 0, av.VoucherReedeemedAmt.Int64())
// Should now have a pending wait sentinel
require.NotNil(t, av.PendingWaitSentinel)
// Queue up an add funds request behind create channel
var addFundsSent sync.WaitGroup
addFundsSent.Add(1)
addFundsAmt := big.NewInt(5)
var addFundsMcid cid.Cid
go func() {
defer addFundsSent.Done()
// Request add funds - should block until create channel has completed
_, addFundsMcid, err = mgr.GetPaych(ctx, from, to, addFundsAmt)
require.NoError(t, err)
}()
// Wait for add funds request to be queued up
waitForQueueSize(t, mgr, from, to, 1)
// Available funds should now include queued funds
av, err = mgr.AvailableFunds(from, to)
require.NoError(t, err)
require.Nil(t, av.Channel)
require.NotNil(t, av.PendingWaitSentinel)
require.EqualValues(t, 0, av.ConfirmedAmt.Int64())
// create amount is still pending
require.EqualValues(t, createAmt, av.PendingAmt)
// queued amount now includes add funds amount
require.EqualValues(t, addFundsAmt, av.QueuedAmt)
require.EqualValues(t, 0, av.VoucherReedeemedAmt.Int64())
// Create channel in state
arr, err := adt.MakeEmptyArray(mock.store).Root()
require.NoError(t, err)
mock.setAccountState(fromAcct, account.State{Address: from})
mock.setAccountState(toAcct, account.State{Address: to})
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: createAmt,
}
mock.setPaychState(ch, act, paych.State{
From: fromAcct,
To: toAcct,
ToSend: big.NewInt(0),
SettlingAt: abi.ChainEpoch(0),
MinSettleHeight: abi.ChainEpoch(0),
LaneStates: arr,
})
// Send create channel response
response := testChannelResponse(t, ch)
mock.receiveMsgResponse(createMsgCid, response)
// Wait for create channel response
chres, err := mgr.GetPaychWaitReady(ctx, *av.PendingWaitSentinel)
require.NoError(t, err)
require.Equal(t, ch, chres)
// Wait for add funds request to be sent
addFundsSent.Wait()
// Available funds should now include the channel and also a wait sentinel
// for the add funds message
av, err = mgr.AvailableFunds(from, to)
require.NoError(t, err)
require.NotNil(t, av.Channel)
require.NotNil(t, av.PendingWaitSentinel)
// create amount is now confirmed
require.EqualValues(t, createAmt, av.ConfirmedAmt)
// add funds amount it now pending
require.EqualValues(t, addFundsAmt, av.PendingAmt)
require.EqualValues(t, 0, av.QueuedAmt.Int64())
require.EqualValues(t, 0, av.VoucherReedeemedAmt.Int64())
// Send success add funds response
mock.receiveMsgResponse(addFundsMcid, types.MessageReceipt{
ExitCode: 0,
Return: []byte{},
})
// Wait for add funds response
_, err = mgr.GetPaychWaitReady(ctx, *av.PendingWaitSentinel)
require.NoError(t, err)
// Available funds should no longer have a wait sentinel
av, err = mgr.AvailableFunds(from, to)
require.NoError(t, err)
require.NotNil(t, av.Channel)
require.Nil(t, av.PendingWaitSentinel)
// confirmed amount now includes create and add funds amounts
require.EqualValues(t, types.BigAdd(createAmt, addFundsAmt), av.ConfirmedAmt)
require.EqualValues(t, 0, av.PendingAmt.Int64())
require.EqualValues(t, 0, av.QueuedAmt.Int64())
require.EqualValues(t, 0, av.VoucherReedeemedAmt.Int64())
// Add some vouchers
voucherAmt1 := types.NewInt(3)
voucher := createTestVoucher(t, ch, 1, 1, voucherAmt1, fromKeyPrivate)
_, err = mgr.AddVoucherOutbound(ctx, ch, voucher, nil, types.NewInt(0))
require.NoError(t, err)
voucherAmt2 := types.NewInt(2)
voucher = createTestVoucher(t, ch, 2, 1, voucherAmt2, fromKeyPrivate)
_, err = mgr.AddVoucherOutbound(ctx, ch, voucher, nil, types.NewInt(0))
require.NoError(t, err)
av, err = mgr.AvailableFunds(from, to)
require.NoError(t, err)
require.NotNil(t, av.Channel)
require.Nil(t, av.PendingWaitSentinel)
require.EqualValues(t, types.BigAdd(createAmt, addFundsAmt), av.ConfirmedAmt)
require.EqualValues(t, 0, av.PendingAmt.Int64())
require.EqualValues(t, 0, av.QueuedAmt.Int64())
// voucher redeemed amount now includes vouchers
require.EqualValues(t, types.BigAdd(voucherAmt1, voucherAmt2), av.VoucherReedeemedAmt)
}
// waitForQueueSize waits for the funds request queue to be of the given size // waitForQueueSize waits for the funds request queue to be of the given size
func waitForQueueSize(t *testing.T, mgr *Manager, from address.Address, to address.Address, size int) { func waitForQueueSize(t *testing.T, mgr *Manager, from address.Address, to address.Address, size int) {
ca, err := mgr.accessorByFromTo(from, to) ca, err := mgr.accessorByFromTo(from, to)

View File

@ -6,6 +6,8 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/filecoin-project/lotus/api"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
@ -34,8 +36,6 @@ type paychFundsRes struct {
type fundsReq struct { type fundsReq struct {
ctx context.Context ctx context.Context
promise chan *paychFundsRes promise chan *paychFundsRes
from address.Address
to address.Address
amt types.BigInt amt types.BigInt
lk sync.Mutex lk sync.Mutex
@ -45,13 +45,11 @@ type fundsReq struct {
active bool active bool
} }
func newFundsReq(ctx context.Context, from address.Address, to address.Address, amt types.BigInt) *fundsReq { func newFundsReq(ctx context.Context, amt types.BigInt) *fundsReq {
promise := make(chan *paychFundsRes) promise := make(chan *paychFundsRes)
return &fundsReq{ return &fundsReq{
ctx: ctx, ctx: ctx,
promise: promise, promise: promise,
from: from,
to: to,
amt: amt, amt: amt,
active: true, active: true,
} }
@ -148,14 +146,6 @@ func (m *mergedFundsReq) onComplete(res *paychFundsRes) {
} }
} }
func (m *mergedFundsReq) from() address.Address {
return m.reqs[0].from
}
func (m *mergedFundsReq) to() address.Address {
return m.reqs[0].to
}
// sum is the sum of the amounts in all requests in the merge // sum is the sum of the amounts in all requests in the merge
func (m *mergedFundsReq) sum() types.BigInt { func (m *mergedFundsReq) sum() types.BigInt {
sum := types.NewInt(0) sum := types.NewInt(0)
@ -178,9 +168,9 @@ func (m *mergedFundsReq) sum() types.BigInt {
// address and the CID of the new add funds message. // address and the CID of the new add funds message.
// If an operation returns an error, subsequent waiting operations will still // If an operation returns an error, subsequent waiting operations will still
// be attempted. // be attempted.
func (ca *channelAccessor) getPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) { func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt) (address.Address, cid.Cid, error) {
// Add the request to add funds to a queue and wait for the result // Add the request to add funds to a queue and wait for the result
freq := newFundsReq(ctx, from, to, amt) freq := newFundsReq(ctx, amt)
ca.enqueue(freq) ca.enqueue(freq)
select { select {
case res := <-freq.promise: case res := <-freq.promise:
@ -191,17 +181,17 @@ func (ca *channelAccessor) getPaych(ctx context.Context, from, to address.Addres
} }
} }
// Queue up an add funds operations // Queue up an add funds operation
func (ca *channelAccessor) enqueue(task *fundsReq) { func (ca *channelAccessor) enqueue(task *fundsReq) {
ca.lk.Lock() ca.lk.Lock()
defer ca.lk.Unlock() defer ca.lk.Unlock()
ca.fundsReqQueue = append(ca.fundsReqQueue, task) ca.fundsReqQueue = append(ca.fundsReqQueue, task)
go ca.processQueue() go ca.processQueue() // nolint: errcheck
} }
// Run the operations in the queue // Run the operations in the queue
func (ca *channelAccessor) processQueue() { func (ca *channelAccessor) processQueue() (*api.ChannelAvailableFunds, error) {
ca.lk.Lock() ca.lk.Lock()
defer ca.lk.Unlock() defer ca.lk.Unlock()
@ -210,7 +200,7 @@ func (ca *channelAccessor) processQueue() {
// If there's nothing in the queue, bail out // If there's nothing in the queue, bail out
if len(ca.fundsReqQueue) == 0 { if len(ca.fundsReqQueue) == 0 {
return return ca.currentAvailableFunds(types.NewInt(0))
} }
// Merge all pending requests into one. // Merge all pending requests into one.
@ -221,17 +211,17 @@ func (ca *channelAccessor) processQueue() {
if amt.IsZero() { if amt.IsZero() {
// Note: The amount can be zero if requests are cancelled as we're // Note: The amount can be zero if requests are cancelled as we're
// building the mergedFundsReq // building the mergedFundsReq
return return ca.currentAvailableFunds(amt)
} }
res := ca.processTask(merged.ctx, merged.from(), merged.to(), amt) res := ca.processTask(merged.ctx, amt)
// If the task is waiting on an external event (eg something to appear on // If the task is waiting on an external event (eg something to appear on
// chain) it will return nil // chain) it will return nil
if res == nil { if res == nil {
// Stop processing the fundsReqQueue and wait. When the event occurs it will // Stop processing the fundsReqQueue and wait. When the event occurs it will
// call processQueue() again // call processQueue() again
return return ca.currentAvailableFunds(amt)
} }
// Finished processing so clear the queue // Finished processing so clear the queue
@ -239,6 +229,8 @@ func (ca *channelAccessor) processQueue() {
// Call the task callback with its results // Call the task callback with its results
merged.onComplete(res) merged.onComplete(res)
return ca.currentAvailableFunds(types.NewInt(0))
} }
// filterQueue filters cancelled requests out of the queue // filterQueue filters cancelled requests out of the queue
@ -291,32 +283,83 @@ func (ca *channelAccessor) msgWaitComplete(mcid cid.Cid, err error) {
// The queue may have been waiting for msg completion to proceed, so // The queue may have been waiting for msg completion to proceed, so
// process the next queue item // process the next queue item
if len(ca.fundsReqQueue) > 0 { if len(ca.fundsReqQueue) > 0 {
go ca.processQueue() go ca.processQueue() // nolint: errcheck
} }
} }
func (ca *channelAccessor) currentAvailableFunds(queuedAmt types.BigInt) (*api.ChannelAvailableFunds, error) {
channelInfo, err := ca.store.OutboundActiveByFromTo(ca.from, ca.to)
if err == ErrChannelNotTracked {
// If the channel does not exist we still want to return an empty
// ChannelAvailableFunds, so that clients can check for the existence
// of a channel between from / to without getting an error.
return &api.ChannelAvailableFunds{
Channel: nil,
ConfirmedAmt: types.NewInt(0),
PendingAmt: types.NewInt(0),
PendingWaitSentinel: nil,
QueuedAmt: queuedAmt,
VoucherReedeemedAmt: types.NewInt(0),
}, nil
}
if err != nil {
return nil, err
}
// The channel may have a pending create or add funds message
waitSentinel := channelInfo.CreateMsg
if waitSentinel == nil {
waitSentinel = channelInfo.AddFundsMsg
}
// Get the total amount redeemed by vouchers.
// This includes vouchers that have been submitted, and vouchers that are
// in the datastore but haven't yet been submitted.
totalRedeemed := types.NewInt(0)
if channelInfo.Channel != nil {
ch := *channelInfo.Channel
_, pchState, err := ca.sa.loadPaychActorState(ca.chctx, ch)
if err != nil {
return nil, err
}
laneStates, err := ca.laneState(ca.chctx, pchState, ch)
if err != nil {
return nil, err
}
for _, ls := range laneStates {
totalRedeemed = types.BigAdd(totalRedeemed, ls.Redeemed)
}
}
return &api.ChannelAvailableFunds{
Channel: channelInfo.Channel,
ConfirmedAmt: channelInfo.Amount,
PendingAmt: channelInfo.PendingAmount,
PendingWaitSentinel: waitSentinel,
QueuedAmt: queuedAmt,
VoucherReedeemedAmt: totalRedeemed,
}, nil
}
// processTask checks the state of the channel and takes appropriate action // processTask checks the state of the channel and takes appropriate action
// (see description of getPaych). // (see description of getPaych).
// Note that processTask may be called repeatedly in the same state, and should // Note that processTask may be called repeatedly in the same state, and should
// return nil if there is no state change to be made (eg when waiting for a // return nil if there is no state change to be made (eg when waiting for a
// message to be confirmed on chain) // message to be confirmed on chain)
func (ca *channelAccessor) processTask( func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *paychFundsRes {
ctx context.Context,
from address.Address,
to address.Address,
amt types.BigInt,
) *paychFundsRes {
// Get the payment channel for the from/to addresses. // Get the payment channel for the from/to addresses.
// Note: It's ok if we get ErrChannelNotTracked. It just means we need to // Note: It's ok if we get ErrChannelNotTracked. It just means we need to
// create a channel. // create a channel.
channelInfo, err := ca.store.OutboundActiveByFromTo(from, to) channelInfo, err := ca.store.OutboundActiveByFromTo(ca.from, ca.to)
if err != nil && err != ErrChannelNotTracked { if err != nil && err != ErrChannelNotTracked {
return &paychFundsRes{err: err} return &paychFundsRes{err: err}
} }
// If a channel has not yet been created, create one. // If a channel has not yet been created, create one.
if channelInfo == nil { if channelInfo == nil {
mcid, err := ca.createPaych(ctx, from, to, amt) mcid, err := ca.createPaych(ctx, amt)
if err != nil { if err != nil {
return &paychFundsRes{err: err} return &paychFundsRes{err: err}
} }
@ -348,8 +391,8 @@ func (ca *channelAccessor) processTask(
} }
// createPaych sends a message to create the channel and returns the message cid // createPaych sends a message to create the channel and returns the message cid
func (ca *channelAccessor) createPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (cid.Cid, error) { func (ca *channelAccessor) createPaych(ctx context.Context, amt types.BigInt) (cid.Cid, error) {
params, aerr := actors.SerializeParams(&paych.ConstructorParams{From: from, To: to}) params, aerr := actors.SerializeParams(&paych.ConstructorParams{From: ca.from, To: ca.to})
if aerr != nil { if aerr != nil {
return cid.Undef, aerr return cid.Undef, aerr
} }
@ -364,7 +407,7 @@ func (ca *channelAccessor) createPaych(ctx context.Context, from, to address.Add
msg := &types.Message{ msg := &types.Message{
To: builtin.InitActorAddr, To: builtin.InitActorAddr,
From: from, From: ca.from,
Value: amt, Value: amt,
Method: builtin.MethodsInit.Exec, Method: builtin.MethodsInit.Exec,
Params: enc, Params: enc,
@ -377,7 +420,7 @@ func (ca *channelAccessor) createPaych(ctx context.Context, from, to address.Add
mcid := smsg.Cid() mcid := smsg.Cid()
// Create a new channel in the store // Create a new channel in the store
ci, err := ca.store.CreateChannel(from, to, mcid, amt) ci, err := ca.store.CreateChannel(ca.from, ca.to, mcid, amt)
if err != nil { if err != nil {
log.Errorf("creating channel: %s", err) log.Errorf("creating channel: %s", err)
return cid.Undef, err return cid.Undef, err
@ -397,7 +440,7 @@ func (ca *channelAccessor) waitForPaychCreateMsg(channelID string, mcid cid.Cid)
} }
func (ca *channelAccessor) waitPaychCreateMsg(channelID string, mcid cid.Cid) error { func (ca *channelAccessor) waitPaychCreateMsg(channelID string, mcid cid.Cid) error {
mwait, err := ca.api.StateWaitMsg(ca.waitCtx, mcid, build.MessageConfidence) mwait, err := ca.api.StateWaitMsg(ca.chctx, mcid, build.MessageConfidence)
if err != nil { if err != nil {
log.Errorf("wait msg: %w", err) log.Errorf("wait msg: %w", err)
return err return err
@ -480,7 +523,7 @@ func (ca *channelAccessor) waitForAddFundsMsg(channelID string, mcid cid.Cid) {
} }
func (ca *channelAccessor) waitAddFundsMsg(channelID string, mcid cid.Cid) error { func (ca *channelAccessor) waitAddFundsMsg(channelID string, mcid cid.Cid) error {
mwait, err := ca.api.StateWaitMsg(ca.waitCtx, mcid, build.MessageConfidence) mwait, err := ca.api.StateWaitMsg(ca.chctx, mcid, build.MessageConfidence)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return err return err
@ -669,3 +712,7 @@ func (ca *channelAccessor) msgPromise(ctx context.Context, mcid cid.Cid) chan on
return promise return promise
} }
func (ca *channelAccessor) availableFunds() (*api.ChannelAvailableFunds, error) {
return ca.processQueue()
}

View File

@ -59,12 +59,12 @@ type ChannelInfo struct {
ChannelID string ChannelID string
// Channel address - may be nil if the channel hasn't been created yet // Channel address - may be nil if the channel hasn't been created yet
Channel *address.Address Channel *address.Address
// Control is the address of the account that created the channel // Control is the address of the local node
Control address.Address Control address.Address
// Target is the address of the account on the other end of the channel // Target is the address of the remote node (on the other end of the channel)
Target address.Address Target address.Address
// Direction indicates if the channel is inbound (this node is the Target) // Direction indicates if the channel is inbound (Control is the "to" address)
// or outbound (this node is the Control) // or outbound (Control is the "from" address)
Direction uint64 Direction uint64
// Vouchers is a list of all vouchers sent on the channel // Vouchers is a list of all vouchers sent on the channel
Vouchers []*VoucherInfo Vouchers []*VoucherInfo