paych: API to pre-fund channels

This commit is contained in:
Łukasz Magiera 2022-01-04 20:33:49 +01:00
parent c315111bc7
commit 8e46b9ea5d
9 changed files with 168 additions and 33 deletions

View File

@ -689,7 +689,11 @@ type FullNode interface {
// MethodGroup: Paych
// The Paych methods are for interacting with and managing payment channels
PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) //perm:sign
// PaychGet gets or creates a payment channel between address pair
// - If reserve is false, the specified amount will be added to the channel through on-chain send for future use
// - If reserve is true, the specified amount will be reserved for use. If there aren't enough non-reserved funds
// available, funds will be added through an on-chain message.
PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt, reserve bool) (*ChannelInfo, error) //perm:sign
PaychGetWaitReady(context.Context, cid.Cid) (address.Address, error) //perm:sign
PaychAvailableFunds(ctx context.Context, ch address.Address) (*ChannelAvailableFunds, error) //perm:sign
PaychAvailableFundsByFromTo(ctx context.Context, from, to address.Address) (*ChannelAvailableFunds, error) //perm:sign

View File

@ -306,7 +306,7 @@ type FullNodeStruct struct {
PaychCollect func(p0 context.Context, p1 address.Address) (cid.Cid, error) `perm:"sign"`
PaychGet func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) `perm:"sign"`
PaychGet func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 bool) (*ChannelInfo, error) `perm:"sign"`
PaychGetWaitReady func(p0 context.Context, p1 cid.Cid) (address.Address, error) `perm:"sign"`
@ -2179,14 +2179,14 @@ func (s *FullNodeStub) PaychCollect(p0 context.Context, p1 address.Address) (cid
return *new(cid.Cid), ErrNotSupported
}
func (s *FullNodeStruct) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) {
func (s *FullNodeStruct) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 bool) (*ChannelInfo, error) {
if s.Internal.PaychGet == nil {
return nil, ErrNotSupported
}
return s.Internal.PaychGet(p0, p1, p2, p3)
return s.Internal.PaychGet(p0, p1, p2, p3, p4)
}
func (s *FullNodeStub) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) {
func (s *FullNodeStub) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 bool) (*ChannelInfo, error) {
return nil, ErrNotSupported
}

View File

@ -337,4 +337,9 @@ func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder
finish(w.ClientExport(ctx, eref, *ref))
}
func (w *WrapperV1Full) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
// v0 always reserves
return w.FullNode.PaychGet(ctx, from, to, amt, true)
}
var _ FullNode = &WrapperV1Full{}

View File

@ -34,7 +34,7 @@ func NewRetrievalClientNode(payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stat
func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, cid.Cid, error) {
// TODO: respect the provided TipSetToken (a serialized TipSetKey) when
// querying the chain
ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable)
ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable, true)
if err != nil {
return address.Undef, cid.Undef, err
}

View File

@ -22,8 +22,8 @@ type PaychAPI struct {
PaychMgr *paychmgr.Manager
}
func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt)
func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt, reserve bool) (*api.ChannelInfo, error) {
ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt, reserve)
if err != nil {
return nil, err
}
@ -55,7 +55,7 @@ func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address
// TODO: Fix free fund tracking in PaychGet
// TODO: validate voucher spec before locking funds
ch, err := a.PaychGet(ctx, from, to, amount)
ch, err := a.PaychGet(ctx, from, to, amount, true)
if err != nil {
return nil, err
}

View File

@ -196,7 +196,7 @@ func (t *ChannelInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{172}); err != nil {
if _, err := w.Write([]byte{174}); err != nil {
return err
}
@ -346,6 +346,38 @@ func (t *ChannelInfo) MarshalCBOR(w io.Writer) error {
return err
}
// t.AvailableAmount (big.Int) (struct)
if len("AvailableAmount") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"AvailableAmount\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("AvailableAmount"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("AvailableAmount")); err != nil {
return err
}
if err := t.AvailableAmount.MarshalCBOR(w); err != nil {
return err
}
// t.PendingAvailableAmount (big.Int) (struct)
if len("PendingAvailableAmount") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"PendingAvailableAmount\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("PendingAvailableAmount"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("PendingAvailableAmount")); err != nil {
return err
}
if err := t.PendingAvailableAmount.MarshalCBOR(w); err != nil {
return err
}
// t.PendingAmount (big.Int) (struct)
if len("PendingAmount") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"PendingAmount\" was too long")
@ -577,6 +609,26 @@ func (t *ChannelInfo) UnmarshalCBOR(r io.Reader) error {
return xerrors.Errorf("unmarshaling t.Amount: %w", err)
}
}
// t.AvailableAmount (big.Int) (struct)
case "AvailableAmount":
{
if err := t.AvailableAmount.UnmarshalCBOR(br); err != nil {
return xerrors.Errorf("unmarshaling t.AvailableAmount: %w", err)
}
}
// t.PendingAvailableAmount (big.Int) (struct)
case "PendingAvailableAmount":
{
if err := t.PendingAvailableAmount.UnmarshalCBOR(br); err != nil {
return xerrors.Errorf("unmarshaling t.PendingAvailableAmount: %w", err)
}
}
// t.PendingAmount (big.Int) (struct)
case "PendingAmount":

View File

@ -101,13 +101,13 @@ func (pm *Manager) Stop() error {
return nil
}
func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) {
func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt types.BigInt, reserve bool) (address.Address, cid.Cid, error) {
chanAccessor, err := pm.accessorByFromTo(from, to)
if err != nil {
return address.Undef, cid.Undef, err
}
return chanAccessor.getPaych(ctx, amt)
return chanAccessor.getPaych(ctx, amt, reserve)
}
func (pm *Manager) AvailableFunds(ctx context.Context, ch address.Address) (*api.ChannelAvailableFunds, error) {

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"sort"
"sync"
"github.com/ipfs/go-cid"
@ -32,18 +33,20 @@ type fundsReq struct {
ctx context.Context
promise chan *paychFundsRes
amt types.BigInt
reserve bool
lk sync.Mutex
// merge parent, if this req is part of a merge
merge *mergedFundsReq
}
func newFundsReq(ctx context.Context, amt types.BigInt) *fundsReq {
func newFundsReq(ctx context.Context, amt types.BigInt, reserve bool) *fundsReq {
promise := make(chan *paychFundsRes)
return &fundsReq{
ctx: ctx,
promise: promise,
amt: amt,
reserve: reserve,
}
}
@ -104,6 +107,15 @@ func newMergedFundsReq(reqs []*fundsReq) *mergedFundsReq {
r.setMergeParent(m)
}
sort.Slice(m.reqs, func(i, j int) bool {
if m.reqs[i].reserve != m.reqs[j].reserve { // non-reserve first
return m.reqs[j].reserve
}
// sort by amount asc (reducing latency for smaller requests)
return m.reqs[i].amt.LessThan(m.reqs[j].amt)
})
// If the requests were all cancelled while being added, cancel the context
// immediately
m.checkActive()
@ -135,18 +147,24 @@ func (m *mergedFundsReq) onComplete(res *paychFundsRes) {
}
// sum is the sum of the amounts in all requests in the merge
func (m *mergedFundsReq) sum() types.BigInt {
func (m *mergedFundsReq) sum() (types.BigInt, types.BigInt) {
sum := types.NewInt(0)
avail := types.NewInt(0)
for _, r := range m.reqs {
if r.isActive() {
sum = types.BigAdd(sum, r.amt)
if !r.reserve {
avail = types.BigAdd(avail, r.amt)
}
}
}
return sum
return sum, avail
}
// getPaych ensures that a channel exists between the from and to addresses,
// and adds the given amount of funds.
// and reserves (or adds as available) the given amount of funds.
// If the channel does not exist a create channel message is sent and the
// message CID is returned.
// If the channel does exist an add funds message is sent and both the channel
@ -156,9 +174,9 @@ func (m *mergedFundsReq) sum() types.BigInt {
// address and the CID of the new add funds message.
// If an operation returns an error, subsequent waiting operations will still
// be attempted.
func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt) (address.Address, cid.Cid, error) {
func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt, reserve bool) (address.Address, cid.Cid, error) {
// Add the request to add funds to a queue and wait for the result
freq := newFundsReq(ctx, amt)
freq := newFundsReq(ctx, amt, reserve)
ca.enqueue(ctx, freq)
select {
case res := <-freq.promise:
@ -195,14 +213,57 @@ func (ca *channelAccessor) processQueue(ctx context.Context, channelID string) (
// For example if there are pending requests for 3, 2, 4 then
// amt = 3 + 2 + 4 = 9
merged := newMergedFundsReq(ca.fundsReqQueue)
amt := merged.sum()
amt, avail := merged.sum()
if amt.IsZero() {
// Note: The amount can be zero if requests are cancelled as we're
// building the mergedFundsReq
return ca.currentAvailableFunds(ctx, channelID, amt)
}
res := ca.processTask(merged.ctx, amt)
{
toReserve := types.BigSub(amt, avail)
avail := types.NewInt(0)
// reserve at most what we need
ca.mutateChannelInfo(ctx, channelID, func(ci *ChannelInfo) {
avail = ci.AvailableAmount
if avail.GreaterThan(toReserve) {
avail = toReserve
}
ci.AvailableAmount = big.Sub(ci.AvailableAmount, avail)
})
used := types.NewInt(0)
next := 0
for i, r := range merged.reqs {
if !r.reserve {
// non-reserving request are put after reserving requests, so we are done here
break
}
if r.amt.GreaterThan(types.BigSub(avail, used)) {
// requests are sorted by amount ascending, so if we hit this, there aren't any more requests we can fill
}
// don't try to fill inactive requests
if !r.isActive() {
continue
}
used = types.BigAdd(used, r.amt)
r.onComplete(&paychFundsRes{})
next = i + 1
}
merged.reqs = merged.reqs[next:]
// return any unused reserved funds (e.g. from cancelled requests)
ca.mutateChannelInfo(ctx, channelID, func(ci *ChannelInfo) {
ci.AvailableAmount = types.BigAdd(ci.AvailableAmount, types.BigSub(avail, used))
})
}
res := ca.processTask(merged.ctx, amt, avail)
// If the task is waiting on an external event (eg something to appear on
// chain) it will return nil
@ -333,7 +394,7 @@ func (ca *channelAccessor) currentAvailableFunds(ctx context.Context, channelID
// Note that processTask may be called repeatedly in the same state, and should
// return nil if there is no state change to be made (eg when waiting for a
// message to be confirmed on chain)
func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *paychFundsRes {
func (ca *channelAccessor) processTask(ctx context.Context, amt, avail types.BigInt) *paychFundsRes {
// Get the payment channel for the from/to addresses.
// Note: It's ok if we get ErrChannelNotTracked. It just means we need to
// create a channel.
@ -344,7 +405,7 @@ func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *p
// If a channel has not yet been created, create one.
if channelInfo == nil {
mcid, err := ca.createPaych(ctx, amt)
mcid, err := ca.createPaych(ctx, amt, avail)
if err != nil {
return &paychFundsRes{err: err}
}
@ -368,7 +429,7 @@ func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *p
// We need to add more funds, so send an add funds message to
// cover the amount for this request
mcid, err := ca.addFunds(ctx, channelInfo, amt)
mcid, err := ca.addFunds(ctx, channelInfo, amt, avail)
if err != nil {
return &paychFundsRes{err: err}
}
@ -376,7 +437,7 @@ func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *p
}
// createPaych sends a message to create the channel and returns the message cid
func (ca *channelAccessor) createPaych(ctx context.Context, amt types.BigInt) (cid.Cid, error) {
func (ca *channelAccessor) createPaych(ctx context.Context, amt, avail types.BigInt) (cid.Cid, error) {
mb, err := ca.messageBuilder(ctx, ca.from)
if err != nil {
return cid.Undef, err
@ -393,7 +454,7 @@ func (ca *channelAccessor) createPaych(ctx context.Context, amt types.BigInt) (c
mcid := smsg.Cid()
// Create a new channel in the store
ci, err := ca.store.CreateChannel(ctx, ca.from, ca.to, mcid, amt)
ci, err := ca.store.CreateChannel(ctx, ca.from, ca.to, mcid, amt, avail)
if err != nil {
log.Errorf("creating channel: %s", err)
return cid.Undef, err
@ -452,7 +513,9 @@ func (ca *channelAccessor) waitPaychCreateMsg(ctx context.Context, channelID str
ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) {
channelInfo.Channel = &decodedReturn.RobustAddress
channelInfo.Amount = channelInfo.PendingAmount
channelInfo.AvailableAmount = channelInfo.PendingAvailableAmount
channelInfo.PendingAmount = big.NewInt(0)
channelInfo.PendingAvailableAmount = big.NewInt(0)
channelInfo.CreateMsg = nil
})
@ -460,7 +523,7 @@ func (ca *channelAccessor) waitPaychCreateMsg(ctx context.Context, channelID str
}
// addFunds sends a message to add funds to the channel and returns the message cid
func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt types.BigInt) (*cid.Cid, error) {
func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) {
msg := &types.Message{
To: *channelInfo.Channel,
From: channelInfo.Control,
@ -477,6 +540,7 @@ func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInf
// Store the add funds message CID on the channel
ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) {
ci.PendingAmount = amt
ci.PendingAvailableAmount = avail
ci.AddFundsMsg = &mcid
})
@ -492,6 +556,8 @@ func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInf
return &mcid, nil
}
// TODO func (ca *channelAccessor) freeFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) {
// waitForAddFundsMsg waits for mcid to appear on chain and returns error, if any
func (ca *channelAccessor) waitForAddFundsMsg(ctx context.Context, channelID string, mcid cid.Cid) {
err := ca.waitAddFundsMsg(ctx, channelID, mcid)
@ -514,6 +580,7 @@ func (ca *channelAccessor) waitAddFundsMsg(ctx context.Context, channelID string
ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) {
channelInfo.PendingAmount = big.NewInt(0)
channelInfo.PendingAvailableAmount = big.NewInt(0)
channelInfo.AddFundsMsg = nil
})
@ -526,7 +593,9 @@ func (ca *channelAccessor) waitAddFundsMsg(ctx context.Context, channelID string
// Store updated amount
ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) {
channelInfo.Amount = types.BigAdd(channelInfo.Amount, channelInfo.PendingAmount)
channelInfo.AvailableAmount = types.BigAdd(channelInfo.AvailableAmount, channelInfo.PendingAvailableAmount)
channelInfo.PendingAmount = big.NewInt(0)
channelInfo.PendingAvailableAmount = big.NewInt(0)
channelInfo.AddFundsMsg = nil
})

View File

@ -74,6 +74,10 @@ type ChannelInfo struct {
// has locally been added to the channel. It should reflect the channel's
// Balance on chain as long as all operations occur on the same datastore.
Amount types.BigInt
// AvailableAmount indicates how much afil is non-reverved
AvailableAmount types.BigInt
// PendingAvailableAmount is available amount that we're awaiting confirmation of
PendingAvailableAmount types.BigInt
// PendingAmount is the amount that we're awaiting confirmation of
PendingAmount types.BigInt
// CreateMsg is the CID of a pending create message (while waiting for confirmation)
@ -416,14 +420,15 @@ func (ps *Store) ByChannelID(ctx context.Context, channelID string) (*ChannelInf
}
// CreateChannel creates an outbound channel for the given from / to
func (ps *Store) CreateChannel(ctx context.Context, from address.Address, to address.Address, createMsgCid cid.Cid, amt types.BigInt) (*ChannelInfo, error) {
func (ps *Store) CreateChannel(ctx context.Context, from address.Address, to address.Address, createMsgCid cid.Cid, amt, avail types.BigInt) (*ChannelInfo, error) {
ci := &ChannelInfo{
Direction: DirOutbound,
NextLane: 0,
Control: from,
Target: to,
CreateMsg: &createMsgCid,
PendingAmount: amt,
Direction: DirOutbound,
NextLane: 0,
Control: from,
Target: to,
CreateMsg: &createMsgCid,
PendingAmount: amt,
PendingAvailableAmount: avail,
}
// Save the new channel