2020-02-13 00:28:23 +00:00
package paychmgr
2019-09-16 13:46:05 +00:00
import (
2020-02-29 03:23:55 +00:00
"bytes"
2019-09-16 13:46:05 +00:00
"context"
2022-06-09 08:36:50 +00:00
"errors"
2020-07-28 23:16:47 +00:00
"fmt"
2022-01-04 19:33:49 +00:00
"sort"
2020-08-10 15:25:58 +00:00
"sync"
2020-07-28 23:16:47 +00:00
2020-09-16 04:06:04 +00:00
"github.com/ipfs/go-cid"
2020-08-06 12:47:48 +00:00
"golang.org/x/sync/errgroup"
2020-09-16 04:06:04 +00:00
"golang.org/x/xerrors"
2020-08-06 12:47:48 +00:00
2020-09-16 04:06:04 +00:00
"github.com/filecoin-project/go-address"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/big"
2020-09-30 17:04:10 +00:00
2020-10-05 20:27:34 +00:00
init2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/init"
2020-02-12 23:52:19 +00:00
2020-09-16 04:06:04 +00:00
"github.com/filecoin-project/lotus/api"
2020-06-04 13:54:37 +00:00
"github.com/filecoin-project/lotus/build"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/chain/types"
2019-09-16 13:46:05 +00:00
)
2020-07-28 23:16:47 +00:00
// paychFundsRes carries the outcome of a create channel or add funds request
// back to whoever is waiting on it.
type paychFundsRes struct {
	// channel is the payment channel address; results that report only an
	// error or only a message CID leave it at its zero value
	channel address.Address
	// mcid is the CID of the message that was sent, when there is one
	mcid cid.Cid
	// err is non-nil when the request failed
	err error
}
// fundsReq is a request to create a channel or add funds to a channel
type fundsReq struct {
2020-08-10 15:25:58 +00:00
ctx context . Context
promise chan * paychFundsRes
amt types . BigInt
2022-02-14 19:16:30 +00:00
opts GetOpts
2020-08-10 15:25:58 +00:00
lk sync . Mutex
// merge parent, if this req is part of a merge
merge * mergedFundsReq
}
2022-02-14 19:16:30 +00:00
func newFundsReq ( ctx context . Context , amt types . BigInt , opts GetOpts ) * fundsReq {
2022-01-04 23:09:19 +00:00
promise := make ( chan * paychFundsRes , 1 )
2020-08-10 15:25:58 +00:00
return & fundsReq {
ctx : ctx ,
promise : promise ,
amt : amt ,
2022-01-06 15:04:39 +00:00
opts : opts ,
2020-08-10 15:25:58 +00:00
}
}
// onComplete hands the result of an executed funds request to the waiting
// caller. If the request's context is already done the result may be dropped
// instead of delivered.
func (r *fundsReq) onComplete(res *paychFundsRes) {
	done := r.ctx.Done()
	select {
	case r.promise <- res:
	case <-done:
	}
}
// cancel is called when the req's context is cancelled
func ( r * fundsReq ) cancel ( ) {
r . lk . Lock ( )
2020-11-11 13:40:48 +00:00
defer r . lk . Unlock ( )
2020-08-10 15:25:58 +00:00
// If there's a merge parent, tell the merge parent to check if it has any
// active reqs left
2020-11-11 13:40:48 +00:00
if r . merge != nil {
r . merge . checkActive ( )
2020-08-10 15:25:58 +00:00
}
}
// isActive indicates whether the req's context has been cancelled
func ( r * fundsReq ) isActive ( ) bool {
2020-11-11 13:40:48 +00:00
return r . ctx . Err ( ) == nil
2020-08-10 15:25:58 +00:00
}
// setMergeParent records m as the merge this req belongs to. The assignment
// is made while holding the req's lock.
func (r *fundsReq) setMergeParent(m *mergedFundsReq) {
	r.lk.Lock()
	r.merge = m
	r.lk.Unlock()
}
// mergedFundsReq bundles several queued add funds requests so that a single
// message can be sent on behalf of all of them, rather than one message per
// request.
type mergedFundsReq struct {
	// ctx is the context of the merge as a whole
	ctx context.Context
	// cancel cancels ctx; it is called once no request in reqs is active
	cancel context.CancelFunc
	// reqs are the requests that are part of this merge
	reqs []*fundsReq
}
func newMergedFundsReq ( reqs [ ] * fundsReq ) * mergedFundsReq {
ctx , cancel := context . WithCancel ( context . Background ( ) )
2020-10-26 10:09:56 +00:00
rqs := make ( [ ] * fundsReq , len ( reqs ) )
copy ( rqs , reqs )
2020-08-10 15:25:58 +00:00
m := & mergedFundsReq {
ctx : ctx ,
cancel : cancel ,
2020-10-26 10:09:56 +00:00
reqs : rqs ,
2020-08-10 15:25:58 +00:00
}
for _ , r := range m . reqs {
r . setMergeParent ( m )
}
2022-01-04 19:33:49 +00:00
sort . Slice ( m . reqs , func ( i , j int ) bool {
2022-01-06 15:04:39 +00:00
if m . reqs [ i ] . opts . OffChain != m . reqs [ j ] . opts . OffChain { // off-chain first
return m . reqs [ i ] . opts . OffChain
}
if m . reqs [ i ] . opts . Reserve != m . reqs [ j ] . opts . Reserve { // non-reserve after off-chain
return m . reqs [ i ] . opts . Reserve
2022-01-04 19:33:49 +00:00
}
// sort by amount asc (reducing latency for smaller requests)
return m . reqs [ i ] . amt . LessThan ( m . reqs [ j ] . amt )
} )
2020-08-10 15:25:58 +00:00
// If the requests were all cancelled while being added, cancel the context
// immediately
m . checkActive ( )
return m
}
// checkActive is called when a fundsReq is cancelled. It cancels the merge's
// context when not a single request in the merge is active any more, and does
// nothing otherwise.
func (m *mergedFundsReq) checkActive() {
	anyActive := false
	for idx := 0; idx < len(m.reqs) && !anyActive; idx++ {
		anyActive = m.reqs[idx].isActive()
	}

	if !anyActive {
		// All fundsReqs have been cancelled
		m.cancel()
	}
}
// onComplete is called once the queue has executed the mergedFundsReq. It
// forwards res to every request in the merge that is still active; inactive
// requests are skipped.
func (m *mergedFundsReq) onComplete(res *paychFundsRes) {
	for idx := range m.reqs {
		req := m.reqs[idx]
		if !req.isActive() {
			continue
		}
		req.onComplete(res)
	}
}
// sum is the sum of the amounts in all requests in the merge
2022-01-04 19:33:49 +00:00
func ( m * mergedFundsReq ) sum ( ) ( types . BigInt , types . BigInt ) {
2020-08-10 15:25:58 +00:00
sum := types . NewInt ( 0 )
2022-01-04 19:33:49 +00:00
avail := types . NewInt ( 0 )
2020-08-10 15:25:58 +00:00
for _ , r := range m . reqs {
if r . isActive ( ) {
sum = types . BigAdd ( sum , r . amt )
2022-01-06 15:04:39 +00:00
if ! r . opts . Reserve {
2022-01-04 19:33:49 +00:00
avail = types . BigAdd ( avail , r . amt )
}
2020-08-10 15:25:58 +00:00
}
}
2022-01-04 19:33:49 +00:00
return sum , avail
2020-07-28 23:16:47 +00:00
}
2022-01-05 15:11:32 +00:00
// completeAmount completes first non-reserving requests up to the available amount
2022-01-06 15:04:39 +00:00
func ( m * mergedFundsReq ) completeAmount ( avail types . BigInt , channelInfo * ChannelInfo ) ( * paychFundsRes , types . BigInt , types . BigInt ) {
used , failed := types . NewInt ( 0 ) , types . NewInt ( 0 )
2022-01-05 15:11:32 +00:00
next := 0
2022-01-06 15:04:39 +00:00
// order: [offchain+reserve, !offchain+reserve, !offchain+!reserve]
2022-01-05 15:11:32 +00:00
for i , r := range m . reqs {
2022-01-06 15:04:39 +00:00
if ! r . opts . Reserve {
2022-01-05 15:11:32 +00:00
// non-reserving request are put after reserving requests, so we are done here
break
}
2022-02-14 20:11:31 +00:00
// don't try to fill inactive requests
if ! r . isActive ( ) {
continue
}
2022-01-05 15:11:32 +00:00
if r . amt . GreaterThan ( types . BigSub ( avail , used ) ) {
// requests are sorted by amount ascending, so if we hit this, there aren't any more requests we can fill
2022-01-06 15:04:39 +00:00
if r . opts . OffChain {
// can't fill, so OffChain want an error
if r . isActive ( ) {
failed = types . BigAdd ( failed , r . amt )
2022-01-06 17:02:34 +00:00
r . onComplete ( & paychFundsRes {
channel : * channelInfo . Channel ,
err : xerrors . Errorf ( "not enough funds available in the payment channel %s; add funds with 'lotus paych add-funds %s %s %s'" , channelInfo . Channel , channelInfo . from ( ) , channelInfo . to ( ) , types . FIL ( r . amt ) . Unitless ( ) ) ,
} )
2022-01-06 15:04:39 +00:00
}
next = i + 1
continue
}
2022-01-05 15:11:32 +00:00
break
}
used = types . BigAdd ( used , r . amt )
r . onComplete ( & paychFundsRes { channel : * channelInfo . Channel } )
next = i + 1
}
m . reqs = m . reqs [ next : ]
2022-01-05 20:49:26 +00:00
if len ( m . reqs ) == 0 {
2022-01-06 15:04:39 +00:00
return & paychFundsRes { channel : * channelInfo . Channel } , used , failed
}
return nil , used , failed
}
2022-01-06 17:02:34 +00:00
func ( m * mergedFundsReq ) failOffChainNoChannel ( from , to address . Address ) ( * paychFundsRes , types . BigInt ) {
2022-01-06 15:04:39 +00:00
next := 0
freed := types . NewInt ( 0 )
for i , r := range m . reqs {
if ! r . opts . OffChain {
break
}
freed = types . BigAdd ( freed , r . amt )
if ! r . isActive ( ) {
continue
}
2022-01-06 17:02:34 +00:00
r . onComplete ( & paychFundsRes { err : xerrors . Errorf ( "payment channel doesn't exist, create with 'lotus paych add-funds %s %s %s'" , from , to , types . FIL ( r . amt ) . Unitless ( ) ) } )
2022-01-06 15:04:39 +00:00
next = i + 1
}
m . reqs = m . reqs [ next : ]
if len ( m . reqs ) == 0 {
2022-01-06 17:02:34 +00:00
return & paychFundsRes { err : xerrors . Errorf ( "payment channel doesn't exist, create with 'lotus paych add-funds %s %s 0'" , from , to ) } , freed
2022-01-05 20:49:26 +00:00
}
2022-01-06 15:04:39 +00:00
return nil , freed
2022-01-05 15:11:32 +00:00
}
2020-07-28 23:16:47 +00:00
// getPaych ensures that a channel exists between the from and to addresses,
2022-01-04 19:33:49 +00:00
// and reserves (or adds as available) the given amount of funds.
2020-07-28 23:16:47 +00:00
// If the channel does not exist a create channel message is sent and the
// message CID is returned.
// If the channel does exist an add funds message is sent and both the channel
// address and message CID are returned.
// If there is an in progress operation (create channel / add funds), getPaych
// blocks until the previous operation completes, then returns both the channel
// address and the CID of the new add funds message.
// If an operation returns an error, subsequent waiting operations will still
// be attempted.
2022-02-14 19:16:30 +00:00
func ( ca * channelAccessor ) getPaych ( ctx context . Context , amt types . BigInt , opts GetOpts ) ( address . Address , cid . Cid , error ) {
2020-07-28 23:16:47 +00:00
// Add the request to add funds to a queue and wait for the result
2022-01-06 15:04:39 +00:00
freq := newFundsReq ( ctx , amt , opts )
2021-12-11 21:03:00 +00:00
ca . enqueue ( ctx , freq )
2020-07-28 23:16:47 +00:00
select {
2020-08-10 15:25:58 +00:00
case res := <- freq . promise :
2020-07-28 23:16:47 +00:00
return res . channel , res . mcid , res . err
case <- ctx . Done ( ) :
2020-08-10 15:25:58 +00:00
freq . cancel ( )
2020-07-28 23:16:47 +00:00
return address . Undef , cid . Undef , ctx . Err ( )
}
}
2020-09-01 14:33:44 +00:00
// Queue up an add funds operation
2021-12-11 21:03:00 +00:00
func ( ca * channelAccessor ) enqueue ( ctx context . Context , task * fundsReq ) {
2020-07-28 23:16:47 +00:00
ca . lk . Lock ( )
defer ca . lk . Unlock ( )
ca . fundsReqQueue = append ( ca . fundsReqQueue , task )
2021-12-11 21:03:00 +00:00
go ca . processQueue ( ctx , "" ) // nolint: errcheck
2020-07-28 23:16:47 +00:00
}
2020-08-10 15:25:58 +00:00
// Run the operations in the queue
2021-12-11 21:03:00 +00:00
func ( ca * channelAccessor ) processQueue ( ctx context . Context , channelID string ) ( * api . ChannelAvailableFunds , error ) {
2020-07-28 23:16:47 +00:00
ca . lk . Lock ( )
defer ca . lk . Unlock ( )
2020-08-10 15:25:58 +00:00
// Remove cancelled requests
ca . filterQueue ( )
// If there's nothing in the queue, bail out
2020-07-28 23:16:47 +00:00
if len ( ca . fundsReqQueue ) == 0 {
2021-12-11 21:03:00 +00:00
return ca . currentAvailableFunds ( ctx , channelID , types . NewInt ( 0 ) )
2020-07-28 23:16:47 +00:00
}
2020-08-10 15:25:58 +00:00
// Merge all pending requests into one.
// For example if there are pending requests for 3, 2, 4 then
// amt = 3 + 2 + 4 = 9
2020-10-26 10:09:56 +00:00
merged := newMergedFundsReq ( ca . fundsReqQueue )
2022-01-04 19:33:49 +00:00
amt , avail := merged . sum ( )
2020-08-10 15:25:58 +00:00
if amt . IsZero ( ) {
// Note: The amount can be zero if requests are cancelled as we're
// building the mergedFundsReq
2021-12-11 21:03:00 +00:00
return ca . currentAvailableFunds ( ctx , channelID , amt )
2020-08-10 15:25:58 +00:00
}
2022-01-04 20:34:27 +00:00
res := ca . processTask ( merged , amt , avail )
2020-07-28 23:16:47 +00:00
// If the task is waiting on an external event (eg something to appear on
// chain) it will return nil
if res == nil {
// Stop processing the fundsReqQueue and wait. When the event occurs it will
2020-08-10 15:25:58 +00:00
// call processQueue() again
2021-12-11 21:03:00 +00:00
return ca . currentAvailableFunds ( ctx , channelID , amt )
2020-07-28 23:16:47 +00:00
}
2020-08-10 15:25:58 +00:00
// Finished processing so clear the queue
ca . fundsReqQueue = nil
2020-07-28 23:16:47 +00:00
// Call the task callback with its results
2020-08-10 15:25:58 +00:00
merged . onComplete ( res )
2020-09-01 14:33:44 +00:00
2021-12-11 21:03:00 +00:00
return ca . currentAvailableFunds ( ctx , channelID , types . NewInt ( 0 ) )
2020-08-10 15:25:58 +00:00
}
2020-07-28 23:16:47 +00:00
2020-08-10 15:25:58 +00:00
// filterQueue filters cancelled requests out of the queue
func ( ca * channelAccessor ) filterQueue ( ) {
if len ( ca . fundsReqQueue ) == 0 {
return
}
// Remove cancelled requests
i := 0
for _ , r := range ca . fundsReqQueue {
if r . isActive ( ) {
ca . fundsReqQueue [ i ] = r
i ++
}
}
// Allow GC of remaining slice elements
for rem := i ; rem < len ( ca . fundsReqQueue ) ; rem ++ {
ca . fundsReqQueue [ i ] = nil
2020-07-28 23:16:47 +00:00
}
2020-08-10 15:25:58 +00:00
// Resize slice
ca . fundsReqQueue = ca . fundsReqQueue [ : i ]
}
// queueSize is the size of the funds request queue (used by tests)
func ( ca * channelAccessor ) queueSize ( ) int {
ca . lk . Lock ( )
defer ca . lk . Unlock ( )
return len ( ca . fundsReqQueue )
2020-07-28 23:16:47 +00:00
}
// msgWaitComplete is called when the message for a previous task is confirmed
// or there is an error.
2021-12-11 21:03:00 +00:00
func ( ca * channelAccessor ) msgWaitComplete ( ctx context . Context , mcid cid . Cid , err error ) {
2022-06-09 08:36:50 +00:00
// if context is canceled, should Not mark message to 'bad', just return.
if errors . Is ( err , context . Canceled ) || errors . Is ( err , context . DeadlineExceeded ) {
return
}
2020-07-28 23:16:47 +00:00
ca . lk . Lock ( )
defer ca . lk . Unlock ( )
// Save the message result to the store
2021-12-11 21:03:00 +00:00
dserr := ca . store . SaveMessageResult ( ctx , mcid , err )
2020-07-28 23:16:47 +00:00
if dserr != nil {
log . Errorf ( "saving message result: %s" , dserr )
}
// Inform listeners that the message has completed
ca . msgListeners . fireMsgComplete ( mcid , err )
// The queue may have been waiting for msg completion to proceed, so
// process the next queue item
if len ( ca . fundsReqQueue ) > 0 {
2021-12-11 21:03:00 +00:00
go ca . processQueue ( ctx , "" ) // nolint: errcheck
2020-07-28 23:16:47 +00:00
}
}
2021-12-11 21:03:00 +00:00
func ( ca * channelAccessor ) currentAvailableFunds ( ctx context . Context , channelID string , queuedAmt types . BigInt ) ( * api . ChannelAvailableFunds , error ) {
2020-09-04 11:44:09 +00:00
if len ( channelID ) == 0 {
return nil , nil
2020-09-01 14:33:44 +00:00
}
2020-09-04 11:44:09 +00:00
2021-12-11 21:03:00 +00:00
channelInfo , err := ca . store . ByChannelID ( ctx , channelID )
2020-09-03 06:20:08 +00:00
if err != nil {
return nil , err
}
2020-09-01 14:33:44 +00:00
// The channel may have a pending create or add funds message
waitSentinel := channelInfo . CreateMsg
if waitSentinel == nil {
waitSentinel = channelInfo . AddFundsMsg
}
// Get the total amount redeemed by vouchers.
// This includes vouchers that have been submitted, and vouchers that are
// in the datastore but haven't yet been submitted.
totalRedeemed := types . NewInt ( 0 )
if channelInfo . Channel != nil {
ch := * channelInfo . Channel
_ , pchState , err := ca . sa . loadPaychActorState ( ca . chctx , ch )
if err != nil {
return nil , err
}
2021-12-11 21:03:00 +00:00
laneStates , err := ca . laneState ( ctx , pchState , ch )
2020-09-01 14:33:44 +00:00
if err != nil {
return nil , err
}
for _ , ls := range laneStates {
2020-09-23 05:19:43 +00:00
r , err := ls . Redeemed ( )
if err != nil {
return nil , err
}
totalRedeemed = types . BigAdd ( totalRedeemed , r )
2020-09-01 14:33:44 +00:00
}
}
return & api . ChannelAvailableFunds {
Channel : channelInfo . Channel ,
2020-09-04 11:44:09 +00:00
From : channelInfo . from ( ) ,
To : channelInfo . to ( ) ,
2020-09-01 14:33:44 +00:00
ConfirmedAmt : channelInfo . Amount ,
PendingAmt : channelInfo . PendingAmount ,
2022-02-16 19:39:43 +00:00
NonReservedAmt : channelInfo . AvailableAmount ,
2022-01-04 22:20:11 +00:00
PendingAvailableAmt : channelInfo . PendingAvailableAmount ,
2020-09-01 14:33:44 +00:00
PendingWaitSentinel : waitSentinel ,
QueuedAmt : queuedAmt ,
VoucherReedeemedAmt : totalRedeemed ,
} , nil
}
2020-07-28 23:16:47 +00:00
// processTask checks the state of the channel and takes appropriate action
// (see description of getPaych).
// Note that processTask may be called repeatedly in the same state, and should
// return nil if there is no state change to be made (eg when waiting for a
// message to be confirmed on chain)
2022-01-04 20:34:27 +00:00
func ( ca * channelAccessor ) processTask ( merged * mergedFundsReq , amt , avail types . BigInt ) * paychFundsRes {
ctx := merged . ctx
2020-07-28 23:16:47 +00:00
// Get the payment channel for the from/to addresses.
// Note: It's ok if we get ErrChannelNotTracked. It just means we need to
// create a channel.
2022-01-06 18:51:26 +00:00
channelInfo , err := ca . store . OutboundActiveByFromTo ( ctx , ca . api , ca . from , ca . to )
2020-07-28 23:16:47 +00:00
if err != nil && err != ErrChannelNotTracked {
return & paychFundsRes { err : err }
}
// If a channel has not yet been created, create one.
2020-08-06 12:47:48 +00:00
if channelInfo == nil {
2022-01-06 17:02:34 +00:00
res , freed := merged . failOffChainNoChannel ( ca . from , ca . to )
2022-01-06 15:04:39 +00:00
if res != nil {
return res
}
amt = types . BigSub ( amt , freed )
2022-01-04 19:33:49 +00:00
mcid , err := ca . createPaych ( ctx , amt , avail )
2020-07-28 23:16:47 +00:00
if err != nil {
return & paychFundsRes { err : err }
}
return & paychFundsRes { mcid : mcid }
}
// If the create channel message has been sent but the channel hasn't
// been created on chain yet
if channelInfo . CreateMsg != nil {
// Wait for the channel to be created before trying again
return nil
}
// If an add funds message was sent to the chain but hasn't been confirmed
// on chain yet
if channelInfo . AddFundsMsg != nil {
// Wait for the add funds message to be confirmed before trying again
return nil
}
2022-01-05 15:11:32 +00:00
// Try to fill requests using available funds, without going to the chain
2022-01-05 20:49:26 +00:00
res , amt := ca . completeAvailable ( ctx , merged , channelInfo , amt , avail )
2022-01-04 20:34:27 +00:00
2022-01-05 20:49:26 +00:00
if res != nil || amt . LessThanEqual ( types . NewInt ( 0 ) ) {
return res
2022-01-04 20:34:27 +00:00
}
2020-07-28 23:16:47 +00:00
// We need to add more funds, so send an add funds message to
// cover the amount for this request
2022-01-04 19:33:49 +00:00
mcid , err := ca . addFunds ( ctx , channelInfo , amt , avail )
2020-07-28 23:16:47 +00:00
if err != nil {
return & paychFundsRes { err : err }
}
return & paychFundsRes { channel : * channelInfo . Channel , mcid : * mcid }
}
// createPaych sends a message to create the channel and returns the message cid
2022-01-04 19:33:49 +00:00
func ( ca * channelAccessor ) createPaych ( ctx context . Context , amt , avail types . BigInt ) ( cid . Cid , error ) {
2020-09-30 17:58:34 +00:00
mb , err := ca . messageBuilder ( ctx , ca . from )
2020-09-30 17:04:10 +00:00
if err != nil {
return cid . Undef , err
2019-09-17 08:15:26 +00:00
}
2020-09-30 17:58:34 +00:00
msg , err := mb . Create ( ca . to , amt )
2020-09-30 17:04:10 +00:00
if err != nil {
return cid . Undef , err
2019-09-16 13:46:05 +00:00
}
2020-08-12 20:17:21 +00:00
smsg , err := ca . api . MpoolPushMessage ( ctx , msg , nil )
2019-09-16 13:46:05 +00:00
if err != nil {
2020-04-17 00:25:06 +00:00
return cid . Undef , xerrors . Errorf ( "initializing paych actor: %w" , err )
2019-09-16 13:46:05 +00:00
}
2020-04-21 20:48:43 +00:00
mcid := smsg . Cid ( )
2020-07-28 23:16:47 +00:00
// Create a new channel in the store
2022-01-04 19:33:49 +00:00
ci , err := ca . store . CreateChannel ( ctx , ca . from , ca . to , mcid , amt , avail )
2020-08-06 12:47:48 +00:00
if err != nil {
2020-07-28 23:16:47 +00:00
log . Errorf ( "creating channel: %s" , err )
return cid . Undef , err
}
// Wait for the channel to be created on chain
2021-12-11 21:03:00 +00:00
go ca . waitForPaychCreateMsg ( ctx , ci . ChannelID , mcid )
2020-07-28 23:16:47 +00:00
2020-04-21 20:48:43 +00:00
return mcid , nil
2020-04-17 00:25:06 +00:00
}
2019-09-16 13:46:05 +00:00
2020-07-28 23:16:47 +00:00
// waitForPaychCreateMsg waits for mcid to appear on chain and stores the robust address of the
2020-04-20 21:44:27 +00:00
// created payment channel
2021-12-11 21:03:00 +00:00
func ( ca * channelAccessor ) waitForPaychCreateMsg ( ctx context . Context , channelID string , mcid cid . Cid ) {
err := ca . waitPaychCreateMsg ( ctx , channelID , mcid )
ca . msgWaitComplete ( ctx , mcid , err )
2020-07-28 23:16:47 +00:00
}
2021-12-11 21:03:00 +00:00
func ( ca * channelAccessor ) waitPaychCreateMsg ( ctx context . Context , channelID string , mcid cid . Cid ) error {
2021-04-05 17:56:53 +00:00
mwait , err := ca . api . StateWaitMsg ( ca . chctx , mcid , build . MessageConfidence , api . LookbackNoLimit , true )
2019-09-16 13:46:05 +00:00
if err != nil {
2021-02-11 11:00:26 +00:00
log . Errorf ( "wait msg: %v" , err )
2020-07-28 23:16:47 +00:00
return err
2019-09-16 13:46:05 +00:00
}
2020-08-06 12:47:48 +00:00
// If channel creation failed
2019-09-16 13:46:05 +00:00
if mwait . Receipt . ExitCode != 0 {
2020-07-28 23:16:47 +00:00
ca . lk . Lock ( )
defer ca . lk . Unlock ( )
2020-08-06 12:47:48 +00:00
// Channel creation failed, so remove the channel from the datastore
2021-12-11 21:03:00 +00:00
dserr := ca . store . RemoveChannel ( ctx , channelID )
2020-08-06 12:47:48 +00:00
if dserr != nil {
log . Errorf ( "failed to remove channel %s: %s" , channelID , dserr )
}
2020-07-28 23:16:47 +00:00
2020-08-06 12:47:48 +00:00
err := xerrors . Errorf ( "payment channel creation failed (exit code %d)" , mwait . Receipt . ExitCode )
log . Error ( err )
2020-07-28 23:16:47 +00:00
return err
2019-09-16 13:46:05 +00:00
}
2020-09-30 17:04:10 +00:00
// TODO: ActorUpgrade abstract over this.
// This "works" because it hasn't changed from v0 to v2, but we still
// need an abstraction here.
var decodedReturn init2 . ExecReturn
2020-02-29 03:23:55 +00:00
err = decodedReturn . UnmarshalCBOR ( bytes . NewReader ( mwait . Receipt . Return ) )
2019-09-16 13:46:05 +00:00
if err != nil {
2020-04-21 20:48:43 +00:00
log . Error ( err )
2020-07-28 23:16:47 +00:00
return err
2019-09-16 13:46:05 +00:00
}
2020-07-28 23:16:47 +00:00
ca . lk . Lock ( )
defer ca . lk . Unlock ( )
2019-09-16 17:23:48 +00:00
2020-07-28 23:16:47 +00:00
// Store robust address of channel
2021-12-11 21:03:00 +00:00
ca . mutateChannelInfo ( ctx , channelID , func ( channelInfo * ChannelInfo ) {
2020-07-28 23:16:47 +00:00
channelInfo . Channel = & decodedReturn . RobustAddress
channelInfo . Amount = channelInfo . PendingAmount
2022-01-04 19:33:49 +00:00
channelInfo . AvailableAmount = channelInfo . PendingAvailableAmount
2020-07-28 23:16:47 +00:00
channelInfo . PendingAmount = big . NewInt ( 0 )
2022-01-04 19:33:49 +00:00
channelInfo . PendingAvailableAmount = big . NewInt ( 0 )
2020-07-28 23:16:47 +00:00
channelInfo . CreateMsg = nil
} )
return nil
2019-09-16 13:46:05 +00:00
}
2022-01-05 15:11:32 +00:00
// completeAvailable fills reserving fund requests using already available funds, without interacting with the chain
2022-01-05 20:49:26 +00:00
func ( ca * channelAccessor ) completeAvailable ( ctx context . Context , merged * mergedFundsReq , channelInfo * ChannelInfo , amt , av types . BigInt ) ( * paychFundsRes , types . BigInt ) {
2022-01-05 15:11:32 +00:00
toReserve := types . BigSub ( amt , av )
avail := types . NewInt ( 0 )
// reserve at most what we need
ca . mutateChannelInfo ( ctx , channelInfo . ChannelID , func ( ci * ChannelInfo ) {
avail = ci . AvailableAmount
if avail . GreaterThan ( toReserve ) {
avail = toReserve
}
ci . AvailableAmount = big . Sub ( ci . AvailableAmount , avail )
} )
2022-01-06 15:04:39 +00:00
res , used , failed := merged . completeAmount ( avail , channelInfo )
2022-01-05 15:11:32 +00:00
// return any unused reserved funds (e.g. from cancelled requests)
ca . mutateChannelInfo ( ctx , channelInfo . ChannelID , func ( ci * ChannelInfo ) {
ci . AvailableAmount = types . BigAdd ( ci . AvailableAmount , types . BigSub ( avail , used ) )
} )
2022-01-06 15:04:39 +00:00
return res , types . BigSub ( amt , types . BigAdd ( used , failed ) )
2022-01-05 15:11:32 +00:00
}
2020-07-28 23:16:47 +00:00
// addFunds sends a message to add funds to the channel and returns the message cid
2022-01-04 19:33:49 +00:00
func ( ca * channelAccessor ) addFunds ( ctx context . Context , channelInfo * ChannelInfo , amt , avail types . BigInt ) ( * cid . Cid , error ) {
2019-09-16 13:46:05 +00:00
msg := & types . Message {
2020-08-06 21:08:42 +00:00
To : * channelInfo . Channel ,
From : channelInfo . Control ,
Value : amt ,
Method : 0 ,
2019-09-16 13:46:05 +00:00
}
2020-08-12 20:17:21 +00:00
smsg , err := ca . api . MpoolPushMessage ( ctx , msg , nil )
2019-09-16 13:46:05 +00:00
if err != nil {
2020-07-28 23:16:47 +00:00
return nil , err
2019-09-16 13:46:05 +00:00
}
2020-04-21 20:48:43 +00:00
mcid := smsg . Cid ( )
2020-07-28 23:16:47 +00:00
// Store the add funds message CID on the channel
2021-12-11 21:03:00 +00:00
ca . mutateChannelInfo ( ctx , channelInfo . ChannelID , func ( ci * ChannelInfo ) {
2020-07-28 23:16:47 +00:00
ci . PendingAmount = amt
2022-01-04 19:33:49 +00:00
ci . PendingAvailableAmount = avail
2020-07-28 23:16:47 +00:00
ci . AddFundsMsg = & mcid
} )
// Store a reference from the message CID to the channel, so that we can
// look up the channel from the message CID
2021-12-11 21:03:00 +00:00
err = ca . store . SaveNewMessage ( ctx , channelInfo . ChannelID , mcid )
2020-07-28 23:16:47 +00:00
if err != nil {
log . Errorf ( "saving add funds message CID %s: %s" , mcid , err )
}
2021-12-11 21:03:00 +00:00
go ca . waitForAddFundsMsg ( ctx , channelInfo . ChannelID , mcid )
2020-07-28 23:16:47 +00:00
return & mcid , nil
}
2022-01-04 19:33:49 +00:00
// TODO func (ca *channelAccessor) freeFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) {
2020-07-28 23:16:47 +00:00
// waitForAddFundsMsg waits for mcid to appear on chain and returns error, if any
2021-12-11 21:03:00 +00:00
func ( ca * channelAccessor ) waitForAddFundsMsg ( ctx context . Context , channelID string , mcid cid . Cid ) {
err := ca . waitAddFundsMsg ( ctx , channelID , mcid )
ca . msgWaitComplete ( ctx , mcid , err )
2020-04-17 00:25:06 +00:00
}
2021-12-11 21:03:00 +00:00
func ( ca * channelAccessor ) waitAddFundsMsg ( ctx context . Context , channelID string , mcid cid . Cid ) error {
2021-04-05 17:56:53 +00:00
mwait , err := ca . api . StateWaitMsg ( ca . chctx , mcid , build . MessageConfidence , api . LookbackNoLimit , true )
2019-09-16 13:46:05 +00:00
if err != nil {
2020-04-21 20:48:43 +00:00
log . Error ( err )
2020-07-28 23:16:47 +00:00
return err
2019-09-16 13:46:05 +00:00
}
if mwait . Receipt . ExitCode != 0 {
2020-07-28 23:16:47 +00:00
err := xerrors . Errorf ( "voucher channel creation failed: adding funds (exit code %d)" , mwait . Receipt . ExitCode )
log . Error ( err )
ca . lk . Lock ( )
defer ca . lk . Unlock ( )
2021-12-11 21:03:00 +00:00
ca . mutateChannelInfo ( ctx , channelID , func ( channelInfo * ChannelInfo ) {
2020-07-28 23:16:47 +00:00
channelInfo . PendingAmount = big . NewInt ( 0 )
2022-01-04 19:33:49 +00:00
channelInfo . PendingAvailableAmount = big . NewInt ( 0 )
2020-07-28 23:16:47 +00:00
channelInfo . AddFundsMsg = nil
} )
return err
2019-09-16 13:46:05 +00:00
}
2020-07-28 23:16:47 +00:00
ca . lk . Lock ( )
defer ca . lk . Unlock ( )
// Store updated amount
2021-12-11 21:03:00 +00:00
ca . mutateChannelInfo ( ctx , channelID , func ( channelInfo * ChannelInfo ) {
2020-07-28 23:16:47 +00:00
channelInfo . Amount = types . BigAdd ( channelInfo . Amount , channelInfo . PendingAmount )
2022-01-04 19:33:49 +00:00
channelInfo . AvailableAmount = types . BigAdd ( channelInfo . AvailableAmount , channelInfo . PendingAvailableAmount )
2020-07-28 23:16:47 +00:00
channelInfo . PendingAmount = big . NewInt ( 0 )
2022-01-04 19:33:49 +00:00
channelInfo . PendingAvailableAmount = big . NewInt ( 0 )
2020-07-28 23:16:47 +00:00
channelInfo . AddFundsMsg = nil
2019-09-16 13:46:05 +00:00
} )
2020-07-28 23:16:47 +00:00
return nil
}
// Change the state of the channel in the store
2021-12-11 21:03:00 +00:00
func ( ca * channelAccessor ) mutateChannelInfo ( ctx context . Context , channelID string , mutate func ( * ChannelInfo ) ) {
channelInfo , err := ca . store . ByChannelID ( ctx , channelID )
2020-07-28 23:16:47 +00:00
// If there's an error reading or writing to the store just log an error.
// For now we're assuming it's unlikely to happen in practice.
// Later we may want to implement a transactional approach, whereby
// we record to the store that we're going to send a message, send
// the message, and then record that the message was sent.
2019-09-16 13:46:05 +00:00
if err != nil {
2020-07-28 23:16:47 +00:00
log . Errorf ( "Error reading channel info from store: %s" , err )
2020-08-06 18:01:38 +00:00
return
2019-09-16 13:46:05 +00:00
}
2020-07-28 23:16:47 +00:00
mutate ( channelInfo )
2021-12-11 21:03:00 +00:00
err = ca . store . putChannelInfo ( ctx , channelInfo )
2020-07-28 23:16:47 +00:00
if err != nil {
log . Errorf ( "Error writing channel info to store: %s" , err )
2019-09-16 13:46:05 +00:00
}
2020-07-28 23:16:47 +00:00
}
2020-08-06 12:47:48 +00:00
// restartPending checks the datastore to see if there are any channels that
// have outstanding create / add funds messages, and if so, waits on the
// messages.
// Outstanding messages can occur if a create / add funds message was sent and
// then the system was shut down or crashed before the result was received.
2021-12-11 21:03:00 +00:00
func ( pm * Manager ) restartPending ( ctx context . Context ) error {
cis , err := pm . store . WithPendingAddFunds ( ctx )
2020-08-06 12:47:48 +00:00
if err != nil {
return err
}
group := errgroup . Group { }
for _ , chanInfo := range cis {
ci := chanInfo
if ci . CreateMsg != nil {
group . Go ( func ( ) error {
ca , err := pm . accessorByFromTo ( ci . Control , ci . Target )
if err != nil {
return xerrors . Errorf ( "error initializing payment channel manager %s -> %s: %s" , ci . Control , ci . Target , err )
}
2021-12-11 21:03:00 +00:00
go ca . waitForPaychCreateMsg ( ctx , ci . ChannelID , * ci . CreateMsg )
2020-08-06 12:47:48 +00:00
return nil
} )
} else if ci . AddFundsMsg != nil {
group . Go ( func ( ) error {
2021-12-11 21:03:00 +00:00
ca , err := pm . accessorByAddress ( ctx , * ci . Channel )
2020-08-06 12:47:48 +00:00
if err != nil {
return xerrors . Errorf ( "error initializing payment channel manager %s: %s" , ci . Channel , err )
}
2021-12-11 21:03:00 +00:00
go ca . waitForAddFundsMsg ( ctx , ci . ChannelID , * ci . AddFundsMsg )
2020-08-06 12:47:48 +00:00
return nil
} )
}
}
return group . Wait ( )
}
2020-07-28 23:16:47 +00:00
// getPaychWaitReady waits for a the response to the message with the given cid
func ( ca * channelAccessor ) getPaychWaitReady ( ctx context . Context , mcid cid . Cid ) ( address . Address , error ) {
ca . lk . Lock ( )
// First check if the message has completed
2021-12-11 21:03:00 +00:00
msgInfo , err := ca . store . GetMessage ( ctx , mcid )
2020-04-22 20:58:26 +00:00
if err != nil {
2020-07-28 23:16:47 +00:00
ca . lk . Unlock ( )
return address . Undef , err
}
// If the create channel / add funds message failed, return an error
if len ( msgInfo . Err ) > 0 {
ca . lk . Unlock ( )
return address . Undef , xerrors . New ( msgInfo . Err )
}
// If the message has completed successfully
if msgInfo . Received {
ca . lk . Unlock ( )
// Get the channel address
2021-12-11 21:03:00 +00:00
ci , err := ca . store . ByMessageCid ( ctx , mcid )
2020-07-28 23:16:47 +00:00
if err != nil {
return address . Undef , err
}
if ci . Channel == nil {
panic ( fmt . Sprintf ( "create / add funds message %s succeeded but channelInfo.Channel is nil" , mcid ) )
}
return * ci . Channel , nil
2020-04-22 20:58:26 +00:00
}
2020-07-28 23:16:47 +00:00
// The message hasn't completed yet so wait for it to complete
promise := ca . msgPromise ( ctx , mcid )
// Unlock while waiting
ca . lk . Unlock ( )
select {
case res := <- promise :
return res . channel , res . err
case <- ctx . Done ( ) :
return address . Undef , ctx . Err ( )
}
}
// onMsgRes is the value delivered on the channel returned by msgPromise.
type onMsgRes struct {
	// channel is the address of the channel associated with the message; it
	// is only filled in when err is nil
	channel address.Address
	// err is the message's error, or the error from looking up the channel
	err error
}
// msgPromise returns a channel that receives the result of the message with
// the given CID
func ( ca * channelAccessor ) msgPromise ( ctx context . Context , mcid cid . Cid ) chan onMsgRes {
promise := make ( chan onMsgRes )
triggerUnsub := make ( chan struct { } )
2020-08-10 21:52:59 +00:00
unsub := ca . msgListeners . onMsgComplete ( mcid , func ( err error ) {
2020-07-28 23:16:47 +00:00
close ( triggerUnsub )
// Use a go-routine so as not to block the event handler loop
go func ( ) {
res := onMsgRes { err : err }
if res . err == nil {
// Get the channel associated with the message cid
2021-12-11 21:03:00 +00:00
ci , err := ca . store . ByMessageCid ( ctx , mcid )
2020-07-28 23:16:47 +00:00
if err != nil {
res . err = err
} else {
res . channel = * ci . Channel
}
}
// Pass the result to the caller
select {
case promise <- res :
case <- ctx . Done ( ) :
}
} ( )
} )
// Unsubscribe when the message is received or the context is done
go func ( ) {
select {
case <- ctx . Done ( ) :
case <- triggerUnsub :
}
2020-08-10 21:52:59 +00:00
unsub ( )
2020-07-28 23:16:47 +00:00
} ( )
return promise
2019-09-16 13:46:05 +00:00
}
2020-09-01 14:33:44 +00:00
2021-12-11 21:03:00 +00:00
func ( ca * channelAccessor ) availableFunds ( ctx context . Context , channelID string ) ( * api . ChannelAvailableFunds , error ) {
return ca . processQueue ( ctx , channelID )
2020-09-01 14:33:44 +00:00
}