From 8e46b9ea5d294e44035a47fe763c9aba4d07d42e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 4 Jan 2022 20:33:49 +0100 Subject: [PATCH] paych: API to pre-fund channels --- api/api_full.go | 6 +- api/proxy_gen.go | 8 +-- api/v0api/v1_wrapper.go | 5 ++ markets/retrievaladapter/client.go | 2 +- node/impl/paych/paych.go | 6 +- paychmgr/cbor_gen.go | 54 ++++++++++++++++- paychmgr/manager.go | 4 +- paychmgr/simple.go | 97 +++++++++++++++++++++++++----- paychmgr/store.go | 19 +++--- 9 files changed, 168 insertions(+), 33 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index cf58a3cc6..cf9a1f22e 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -689,7 +689,11 @@ type FullNode interface { // MethodGroup: Paych // The Paych methods are for interacting with and managing payment channels - PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) //perm:sign + // PaychGet gets or creates a payment channel between address pair + // - If reserve is false, the specified amount will be added to the channel through on-chain send for future use + // - If reserve is true, the specified amount will be reserved for use. If there aren't enough non-reserved funds + // available, funds will be added through an on-chain message. + PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt, reserve bool) (*ChannelInfo, error) //perm:sign PaychGetWaitReady(context.Context, cid.Cid) (address.Address, error) //perm:sign PaychAvailableFunds(ctx context.Context, ch address.Address) (*ChannelAvailableFunds, error) //perm:sign PaychAvailableFundsByFromTo(ctx context.Context, from, to address.Address) (*ChannelAvailableFunds, error) //perm:sign diff --git a/api/proxy_gen.go b/api/proxy_gen.go index e353a7c6e..9a134cec9 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -306,7 +306,7 @@ type FullNodeStruct struct { PaychCollect func(p0 context.Context, p1 address.Address) (cid.Cid, error) `perm:"sign"` - PaychGet func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) `perm:"sign"` + PaychGet func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 bool) (*ChannelInfo, error) `perm:"sign"` PaychGetWaitReady func(p0 context.Context, p1 cid.Cid) (address.Address, error) `perm:"sign"` @@ -2179,14 +2179,14 @@ func (s *FullNodeStub) PaychCollect(p0 context.Context, p1 address.Address) (cid return *new(cid.Cid), ErrNotSupported } -func (s *FullNodeStruct) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) { +func (s *FullNodeStruct) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 bool) (*ChannelInfo, error) { if s.Internal.PaychGet == nil { return nil, ErrNotSupported } - return s.Internal.PaychGet(p0, p1, p2, p3) + return s.Internal.PaychGet(p0, p1, p2, p3, p4) } -func (s *FullNodeStub) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) { +func (s *FullNodeStub) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 bool) (*ChannelInfo, error) { return nil, ErrNotSupported } diff --git a/api/v0api/v1_wrapper.go b/api/v0api/v1_wrapper.go index 7e0d7a94a..605b27b0c 100644 --- a/api/v0api/v1_wrapper.go +++ b/api/v0api/v1_wrapper.go @@ -337,4 +337,9 @@ func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder finish(w.ClientExport(ctx, eref, *ref)) } +func (w *WrapperV1Full) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) { + // v0 always reserves + return w.FullNode.PaychGet(ctx, from, to, amt, true) +} + var _ FullNode = &WrapperV1Full{} diff --git a/markets/retrievaladapter/client.go b/markets/retrievaladapter/client.go index 1bef23e12..4ed2e905a 100644 --- a/markets/retrievaladapter/client.go +++ b/markets/retrievaladapter/client.go @@ -34,7 +34,7 @@ func NewRetrievalClientNode(payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stat func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, cid.Cid, error) { // TODO: respect the provided TipSetToken (a serialized TipSetKey) when // querying the chain - ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable) + ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable, true) if err != nil { return address.Undef, cid.Undef, err } diff --git a/node/impl/paych/paych.go b/node/impl/paych/paych.go index df3b1e3e4..d308f6248 100644 --- a/node/impl/paych/paych.go +++ b/node/impl/paych/paych.go @@ -22,8 +22,8 @@ type PaychAPI struct { PaychMgr *paychmgr.Manager } -func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) { - ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt) +func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt, reserve bool) (*api.ChannelInfo, error) { + ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt, reserve) if err != nil { return nil, err } @@ -55,7 +55,7 @@ func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address // TODO: Fix free fund tracking in PaychGet // TODO: validate voucher spec before locking funds - ch, err := a.PaychGet(ctx, from, to, amount) + ch, err := a.PaychGet(ctx, from, to, amount, true) if err != nil { return nil, err } diff --git a/paychmgr/cbor_gen.go b/paychmgr/cbor_gen.go index caa4143a2..428c09a9e 100644 --- a/paychmgr/cbor_gen.go +++ b/paychmgr/cbor_gen.go @@ -196,7 +196,7 @@ func (t *ChannelInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{172}); err != nil { + if _, err := w.Write([]byte{174}); err != nil { return err } @@ -346,6 +346,38 @@ func (t *ChannelInfo) MarshalCBOR(w io.Writer) error { return err } + // t.AvailableAmount (big.Int) (struct) + if len("AvailableAmount") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"AvailableAmount\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("AvailableAmount"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("AvailableAmount")); err != nil { + return err + } + + if err := t.AvailableAmount.MarshalCBOR(w); err != nil { + return err + } + + // t.PendingAvailableAmount (big.Int) (struct) + if len("PendingAvailableAmount") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"PendingAvailableAmount\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("PendingAvailableAmount"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("PendingAvailableAmount")); err != nil { + return err + } + + if err := t.PendingAvailableAmount.MarshalCBOR(w); err != nil { + return err + } + // t.PendingAmount (big.Int) (struct) if len("PendingAmount") > cbg.MaxLength { return xerrors.Errorf("Value in field \"PendingAmount\" was too long") @@ -577,6 +609,26 @@ func (t *ChannelInfo) UnmarshalCBOR(r io.Reader) error { return xerrors.Errorf("unmarshaling t.Amount: %w", err) } + } + // t.AvailableAmount (big.Int) (struct) + case "AvailableAmount": + + { + + if err := t.AvailableAmount.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.AvailableAmount: %w", err) + } + + } + // t.PendingAvailableAmount (big.Int) (struct) + case "PendingAvailableAmount": + + { + + if err := t.PendingAvailableAmount.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.PendingAvailableAmount: %w", err) + } + } // t.PendingAmount (big.Int) (struct) case "PendingAmount": diff --git a/paychmgr/manager.go b/paychmgr/manager.go index e0fcd7a75..eed475547 100644 --- a/paychmgr/manager.go +++ b/paychmgr/manager.go @@ -101,13 +101,13 @@ func (pm *Manager) Stop() error { return nil } -func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) { +func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt types.BigInt, reserve bool) (address.Address, cid.Cid, error) { chanAccessor, err := pm.accessorByFromTo(from, to) if err != nil { return address.Undef, cid.Undef, err } - return chanAccessor.getPaych(ctx, amt) + return chanAccessor.getPaych(ctx, amt, reserve) } func (pm *Manager) AvailableFunds(ctx context.Context, ch address.Address) (*api.ChannelAvailableFunds, error) { diff --git a/paychmgr/simple.go b/paychmgr/simple.go index 502338e29..fd849d3ae 100644 --- a/paychmgr/simple.go +++ b/paychmgr/simple.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sort" "sync" "github.com/ipfs/go-cid" @@ -32,18 +33,20 @@ type fundsReq struct { ctx context.Context promise chan *paychFundsRes amt types.BigInt + reserve bool lk sync.Mutex // merge parent, if this req is part of a merge merge *mergedFundsReq } -func newFundsReq(ctx context.Context, amt types.BigInt) *fundsReq { +func newFundsReq(ctx context.Context, amt types.BigInt, reserve bool) *fundsReq { promise := make(chan *paychFundsRes) return &fundsReq{ ctx: ctx, promise: promise, amt: amt, + reserve: reserve, } } @@ -104,6 +107,15 @@ func newMergedFundsReq(reqs []*fundsReq) *mergedFundsReq { r.setMergeParent(m) } + sort.Slice(m.reqs, func(i, j int) bool { + if m.reqs[i].reserve != m.reqs[j].reserve { // non-reserve first + return m.reqs[j].reserve + } + + // sort by amount asc (reducing latency for smaller requests) + return m.reqs[i].amt.LessThan(m.reqs[j].amt) + }) + // If the requests were all cancelled while being added, cancel the context // immediately m.checkActive() @@ -135,18 +147,24 @@ func (m *mergedFundsReq) onComplete(res *paychFundsRes) { } // sum is the sum of the amounts in all requests in the merge -func (m *mergedFundsReq) sum() types.BigInt { +func (m *mergedFundsReq) sum() (types.BigInt, types.BigInt) { sum := types.NewInt(0) + avail := types.NewInt(0) + for _, r := range m.reqs { if r.isActive() { sum = types.BigAdd(sum, r.amt) + if !r.reserve { + avail = types.BigAdd(avail, r.amt) + } } } - return sum + + return sum, avail } // getPaych ensures that a channel exists between the from and to addresses, -// and adds the given amount of funds. +// and reserves (or adds as available) the given amount of funds. // If the channel does not exist a create channel message is sent and the // message CID is returned. // If the channel does exist an add funds message is sent and both the channel @@ -156,9 +174,9 @@ func (m *mergedFundsReq) sum() types.BigInt { // address and the CID of the new add funds message. // If an operation returns an error, subsequent waiting operations will still // be attempted. -func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt) (address.Address, cid.Cid, error) { +func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt, reserve bool) (address.Address, cid.Cid, error) { // Add the request to add funds to a queue and wait for the result - freq := newFundsReq(ctx, amt) + freq := newFundsReq(ctx, amt, reserve) ca.enqueue(ctx, freq) select { case res := <-freq.promise: @@ -195,14 +213,57 @@ func (ca *channelAccessor) processQueue(ctx context.Context, channelID string) ( // For example if there are pending requests for 3, 2, 4 then // amt = 3 + 2 + 4 = 9 merged := newMergedFundsReq(ca.fundsReqQueue) - amt := merged.sum() + amt, avail := merged.sum() if amt.IsZero() { // Note: The amount can be zero if requests are cancelled as we're // building the mergedFundsReq return ca.currentAvailableFunds(ctx, channelID, amt) } - res := ca.processTask(merged.ctx, amt) + { + toReserve := types.BigSub(amt, avail) + avail := types.NewInt(0) + + // reserve at most what we need + ca.mutateChannelInfo(ctx, channelID, func(ci *ChannelInfo) { + avail = ci.AvailableAmount + if avail.GreaterThan(toReserve) { + avail = toReserve + } + ci.AvailableAmount = big.Sub(ci.AvailableAmount, avail) + }) + + used := types.NewInt(0) + + next := 0 + for i, r := range merged.reqs { + if !r.reserve { + // non-reserving request are put after reserving requests, so we are done here + break + } + + if r.amt.GreaterThan(types.BigSub(avail, used)) { + // requests are sorted by amount ascending, so if we hit this, there aren't any more requests we can fill + } + + // don't try to fill inactive requests + if !r.isActive() { + continue + } + + used = types.BigAdd(used, r.amt) + r.onComplete(&paychFundsRes{}) + next = i + 1 + } + merged.reqs = merged.reqs[next:] + + // return any unused reserved funds (e.g. from cancelled requests) + ca.mutateChannelInfo(ctx, channelID, func(ci *ChannelInfo) { + ci.AvailableAmount = types.BigAdd(ci.AvailableAmount, types.BigSub(avail, used)) + }) + } + + res := ca.processTask(merged.ctx, amt, avail) // If the task is waiting on an external event (eg something to appear on // chain) it will return nil @@ -333,7 +394,7 @@ func (ca *channelAccessor) currentAvailableFunds(ctx context.Context, channelID // Note that processTask may be called repeatedly in the same state, and should // return nil if there is no state change to be made (eg when waiting for a // message to be confirmed on chain) -func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *paychFundsRes { +func (ca *channelAccessor) processTask(ctx context.Context, amt, avail types.BigInt) *paychFundsRes { // Get the payment channel for the from/to addresses. // Note: It's ok if we get ErrChannelNotTracked. It just means we need to // create a channel. @@ -344,7 +405,7 @@ func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *p // If a channel has not yet been created, create one. if channelInfo == nil { - mcid, err := ca.createPaych(ctx, amt) + mcid, err := ca.createPaych(ctx, amt, avail) if err != nil { return &paychFundsRes{err: err} } @@ -368,7 +429,7 @@ func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *p // We need to add more funds, so send an add funds message to // cover the amount for this request - mcid, err := ca.addFunds(ctx, channelInfo, amt) + mcid, err := ca.addFunds(ctx, channelInfo, amt, avail) if err != nil { return &paychFundsRes{err: err} } @@ -376,7 +437,7 @@ func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *p } // createPaych sends a message to create the channel and returns the message cid -func (ca *channelAccessor) createPaych(ctx context.Context, amt types.BigInt) (cid.Cid, error) { +func (ca *channelAccessor) createPaych(ctx context.Context, amt, avail types.BigInt) (cid.Cid, error) { mb, err := ca.messageBuilder(ctx, ca.from) if err != nil { return cid.Undef, err @@ -393,7 +454,7 @@ func (ca *channelAccessor) createPaych(ctx context.Context, amt types.BigInt) (c mcid := smsg.Cid() // Create a new channel in the store - ci, err := ca.store.CreateChannel(ctx, ca.from, ca.to, mcid, amt) + ci, err := ca.store.CreateChannel(ctx, ca.from, ca.to, mcid, amt, avail) if err != nil { log.Errorf("creating channel: %s", err) return cid.Undef, err @@ -452,7 +513,9 @@ func (ca *channelAccessor) waitPaychCreateMsg(ctx context.Context, channelID str ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) { channelInfo.Channel = &decodedReturn.RobustAddress channelInfo.Amount = channelInfo.PendingAmount + channelInfo.AvailableAmount = channelInfo.PendingAvailableAmount channelInfo.PendingAmount = big.NewInt(0) + channelInfo.PendingAvailableAmount = big.NewInt(0) channelInfo.CreateMsg = nil }) @@ -460,7 +523,7 @@ func (ca *channelAccessor) waitPaychCreateMsg(ctx context.Context, channelID str } // addFunds sends a message to add funds to the channel and returns the message cid -func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt types.BigInt) (*cid.Cid, error) { +func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) { msg := &types.Message{ To: *channelInfo.Channel, From: channelInfo.Control, @@ -477,6 +540,7 @@ func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInf // Store the add funds message CID on the channel ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) { ci.PendingAmount = amt + ci.PendingAvailableAmount = avail ci.AddFundsMsg = &mcid }) @@ -492,6 +556,8 @@ func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInf return &mcid, nil } +// TODO func (ca *channelAccessor) freeFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) { + // waitForAddFundsMsg waits for mcid to appear on chain and returns error, if any func (ca *channelAccessor) waitForAddFundsMsg(ctx context.Context, channelID string, mcid cid.Cid) { err := ca.waitAddFundsMsg(ctx, channelID, mcid) @@ -514,6 +580,7 @@ func (ca *channelAccessor) waitAddFundsMsg(ctx context.Context, channelID string ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) { channelInfo.PendingAmount = big.NewInt(0) + channelInfo.PendingAvailableAmount = big.NewInt(0) channelInfo.AddFundsMsg = nil }) @@ -526,7 +593,9 @@ func (ca *channelAccessor) waitAddFundsMsg(ctx context.Context, channelID string // Store updated amount ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) { channelInfo.Amount = types.BigAdd(channelInfo.Amount, channelInfo.PendingAmount) + channelInfo.AvailableAmount = types.BigAdd(channelInfo.AvailableAmount, channelInfo.PendingAvailableAmount) channelInfo.PendingAmount = big.NewInt(0) + channelInfo.PendingAvailableAmount = big.NewInt(0) channelInfo.AddFundsMsg = nil }) diff --git a/paychmgr/store.go b/paychmgr/store.go index 62849e6be..bbc549b86 100644 --- a/paychmgr/store.go +++ b/paychmgr/store.go @@ -74,6 +74,10 @@ type ChannelInfo struct { // has locally been added to the channel. It should reflect the channel's // Balance on chain as long as all operations occur on the same datastore. Amount types.BigInt + // AvailableAmount indicates how much afil is non-reverved + AvailableAmount types.BigInt + // PendingAvailableAmount is available amount that we're awaiting confirmation of + PendingAvailableAmount types.BigInt // PendingAmount is the amount that we're awaiting confirmation of PendingAmount types.BigInt // CreateMsg is the CID of a pending create message (while waiting for confirmation) @@ -416,14 +420,15 @@ func (ps *Store) ByChannelID(ctx context.Context, channelID string) (*ChannelInf } // CreateChannel creates an outbound channel for the given from / to -func (ps *Store) CreateChannel(ctx context.Context, from address.Address, to address.Address, createMsgCid cid.Cid, amt types.BigInt) (*ChannelInfo, error) { +func (ps *Store) CreateChannel(ctx context.Context, from address.Address, to address.Address, createMsgCid cid.Cid, amt, avail types.BigInt) (*ChannelInfo, error) { ci := &ChannelInfo{ - Direction: DirOutbound, - NextLane: 0, - Control: from, - Target: to, - CreateMsg: &createMsgCid, - PendingAmount: amt, + Direction: DirOutbound, + NextLane: 0, + Control: from, + Target: to, + CreateMsg: &createMsgCid, + PendingAmount: amt, + PendingAvailableAmount: avail, } // Save the new channel