lotus/paychmgr/simple.go

773 lines
21 KiB
Go
Raw Normal View History

package paychmgr
2019-09-16 13:46:05 +00:00
import (
"bytes"
2019-09-16 13:46:05 +00:00
"context"
2020-07-28 23:16:47 +00:00
"fmt"
2022-01-04 19:33:49 +00:00
"sort"
"sync"
2020-07-28 23:16:47 +00:00
"github.com/ipfs/go-cid"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/big"
2020-09-30 17:04:10 +00:00
2020-10-05 20:27:34 +00:00
init2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/init"
2020-02-12 23:52:19 +00:00
"github.com/filecoin-project/lotus/api"
2020-06-04 13:54:37 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
2019-09-16 13:46:05 +00:00
)
2020-07-28 23:16:47 +00:00
// paychFundsRes is the response to a create channel or add funds request
type paychFundsRes struct {
	// channel is the address of the payment channel. It is left unset when
	// the response reports a newly sent create channel message.
	channel address.Address
	// mcid is the CID of the create channel / add funds message that was
	// sent. It is left unset when a request is filled without sending a
	// message.
	mcid cid.Cid
	// err is non-nil if the request failed.
	err error
}
// fundsReq is a request to create a channel or add funds to a channel
type fundsReq struct {
ctx context.Context
promise chan *paychFundsRes
amt types.BigInt
2022-01-04 19:33:49 +00:00
reserve bool
lk sync.Mutex
// merge parent, if this req is part of a merge
merge *mergedFundsReq
}
2022-01-04 19:33:49 +00:00
func newFundsReq(ctx context.Context, amt types.BigInt, reserve bool) *fundsReq {
promise := make(chan *paychFundsRes)
return &fundsReq{
ctx: ctx,
promise: promise,
amt: amt,
2022-01-04 19:33:49 +00:00
reserve: reserve,
}
}
// onComplete hands the result of an executed funds request to whoever is
// waiting on the promise. If the request's context is cancelled before the
// result can be delivered, the result is dropped.
func (r *fundsReq) onComplete(res *paychFundsRes) {
	cancelled := r.ctx.Done()
	select {
	case r.promise <- res:
		// Delivered to the waiting caller.
	case <-cancelled:
		// Nobody is listening any more.
	}
}
// cancel is invoked once the request's context has been cancelled. When the
// request belongs to a merge, the merge parent is asked to re-check whether
// any of its requests are still active.
func (r *fundsReq) cancel() {
	r.lk.Lock()
	defer r.lk.Unlock()

	parent := r.merge
	if parent == nil {
		// Not part of a merge, nothing to notify.
		return
	}
	parent.checkActive()
}
// isActive reports whether the request is still live, i.e. its context has
// not been cancelled.
func (r *fundsReq) isActive() bool {
	ctxErr := r.ctx.Err()
	return ctxErr == nil
}
// setMergeParent records m as the merge that this request belongs to. The
// assignment is made under the request's lock.
func (r *fundsReq) setMergeParent(m *mergedFundsReq) {
	r.lk.Lock()
	r.merge = m
	r.lk.Unlock()
}
// mergedFundsReq merges together multiple add funds requests that are queued
// up, so that only one message is sent for all the requests (instead of one
// message for each request)
type mergedFundsReq struct {
	// ctx is the context used while processing the merged request. It is
	// cancelled by checkActive once none of reqs is active any more.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
	// reqs are the individual requests that make up this merge.
	reqs []*fundsReq
}
func newMergedFundsReq(reqs []*fundsReq) *mergedFundsReq {
ctx, cancel := context.WithCancel(context.Background())
2020-10-26 10:09:56 +00:00
rqs := make([]*fundsReq, len(reqs))
copy(rqs, reqs)
m := &mergedFundsReq{
ctx: ctx,
cancel: cancel,
2020-10-26 10:09:56 +00:00
reqs: rqs,
}
for _, r := range m.reqs {
r.setMergeParent(m)
}
2022-01-04 19:33:49 +00:00
sort.Slice(m.reqs, func(i, j int) bool {
if m.reqs[i].reserve != m.reqs[j].reserve { // non-reserve first
2022-01-04 20:34:27 +00:00
return m.reqs[i].reserve
2022-01-04 19:33:49 +00:00
}
// sort by amount asc (reducing latency for smaller requests)
return m.reqs[i].amt.LessThan(m.reqs[j].amt)
})
// If the requests were all cancelled while being added, cancel the context
// immediately
m.checkActive()
return m
}
// checkActive is called when a fundsReq is cancelled. It cancels the merge's
// context unless at least one of the merged requests is still active.
func (m *mergedFundsReq) checkActive() {
	anyActive := false
	for _, r := range m.reqs {
		if r.isActive() {
			anyActive = true
			break
		}
	}

	if !anyActive {
		// Every fundsReq has been cancelled, so there is nobody left to
		// do the work for.
		m.cancel()
	}
}
// onComplete is called when the queue has executed the mergedFundsReq. The
// result is forwarded to every merged fundsReq that is still active;
// cancelled requests are skipped.
func (m *mergedFundsReq) onComplete(res *paychFundsRes) {
	for _, r := range m.reqs {
		if !r.isActive() {
			continue
		}
		r.onComplete(res)
	}
}
// sum is the sum of the amounts in all requests in the merge
2022-01-04 19:33:49 +00:00
func (m *mergedFundsReq) sum() (types.BigInt, types.BigInt) {
sum := types.NewInt(0)
2022-01-04 19:33:49 +00:00
avail := types.NewInt(0)
for _, r := range m.reqs {
if r.isActive() {
sum = types.BigAdd(sum, r.amt)
2022-01-04 19:33:49 +00:00
if !r.reserve {
avail = types.BigAdd(avail, r.amt)
}
}
}
2022-01-04 19:33:49 +00:00
return sum, avail
2020-07-28 23:16:47 +00:00
}
// getPaych ensures that a channel exists between the from and to addresses,
2022-01-04 19:33:49 +00:00
// and reserves (or adds as available) the given amount of funds.
2020-07-28 23:16:47 +00:00
// If the channel does not exist a create channel message is sent and the
// message CID is returned.
// If the channel does exist an add funds message is sent and both the channel
// address and message CID are returned.
// If there is an in progress operation (create channel / add funds), getPaych
// blocks until the previous operation completes, then returns both the channel
// address and the CID of the new add funds message.
// If an operation returns an error, subsequent waiting operations will still
// be attempted.
2022-01-04 19:33:49 +00:00
func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt, reserve bool) (address.Address, cid.Cid, error) {
2020-07-28 23:16:47 +00:00
// Add the request to add funds to a queue and wait for the result
2022-01-04 19:33:49 +00:00
freq := newFundsReq(ctx, amt, reserve)
2021-12-11 21:03:00 +00:00
ca.enqueue(ctx, freq)
2020-07-28 23:16:47 +00:00
select {
case res := <-freq.promise:
2020-07-28 23:16:47 +00:00
return res.channel, res.mcid, res.err
case <-ctx.Done():
freq.cancel()
2020-07-28 23:16:47 +00:00
return address.Undef, cid.Undef, ctx.Err()
}
}
2020-09-01 14:33:44 +00:00
// Queue up an add funds operation
2021-12-11 21:03:00 +00:00
func (ca *channelAccessor) enqueue(ctx context.Context, task *fundsReq) {
2020-07-28 23:16:47 +00:00
ca.lk.Lock()
defer ca.lk.Unlock()
ca.fundsReqQueue = append(ca.fundsReqQueue, task)
2021-12-11 21:03:00 +00:00
go ca.processQueue(ctx, "") // nolint: errcheck
2020-07-28 23:16:47 +00:00
}
// Run the operations in the queue
2021-12-11 21:03:00 +00:00
func (ca *channelAccessor) processQueue(ctx context.Context, channelID string) (*api.ChannelAvailableFunds, error) {
2020-07-28 23:16:47 +00:00
ca.lk.Lock()
defer ca.lk.Unlock()
// Remove cancelled requests
ca.filterQueue()
// If there's nothing in the queue, bail out
2020-07-28 23:16:47 +00:00
if len(ca.fundsReqQueue) == 0 {
2021-12-11 21:03:00 +00:00
return ca.currentAvailableFunds(ctx, channelID, types.NewInt(0))
2020-07-28 23:16:47 +00:00
}
// Merge all pending requests into one.
// For example if there are pending requests for 3, 2, 4 then
// amt = 3 + 2 + 4 = 9
2020-10-26 10:09:56 +00:00
merged := newMergedFundsReq(ca.fundsReqQueue)
2022-01-04 19:33:49 +00:00
amt, avail := merged.sum()
if amt.IsZero() {
// Note: The amount can be zero if requests are cancelled as we're
// building the mergedFundsReq
2021-12-11 21:03:00 +00:00
return ca.currentAvailableFunds(ctx, channelID, amt)
}
2022-01-04 20:34:27 +00:00
res := ca.processTask(merged, amt, avail)
2020-07-28 23:16:47 +00:00
// If the task is waiting on an external event (eg something to appear on
// chain) it will return nil
if res == nil {
// Stop processing the fundsReqQueue and wait. When the event occurs it will
// call processQueue() again
2021-12-11 21:03:00 +00:00
return ca.currentAvailableFunds(ctx, channelID, amt)
2020-07-28 23:16:47 +00:00
}
// Finished processing so clear the queue
ca.fundsReqQueue = nil
2020-07-28 23:16:47 +00:00
// Call the task callback with its results
merged.onComplete(res)
2020-09-01 14:33:44 +00:00
2021-12-11 21:03:00 +00:00
return ca.currentAvailableFunds(ctx, channelID, types.NewInt(0))
}
2020-07-28 23:16:47 +00:00
// filterQueue filters cancelled requests out of the queue
func (ca *channelAccessor) filterQueue() {
if len(ca.fundsReqQueue) == 0 {
return
}
// Remove cancelled requests
i := 0
for _, r := range ca.fundsReqQueue {
if r.isActive() {
ca.fundsReqQueue[i] = r
i++
}
}
// Allow GC of remaining slice elements
for rem := i; rem < len(ca.fundsReqQueue); rem++ {
ca.fundsReqQueue[i] = nil
2020-07-28 23:16:47 +00:00
}
// Resize slice
ca.fundsReqQueue = ca.fundsReqQueue[:i]
}
// queueSize is the size of the funds request queue (used by tests)
func (ca *channelAccessor) queueSize() int {
ca.lk.Lock()
defer ca.lk.Unlock()
return len(ca.fundsReqQueue)
2020-07-28 23:16:47 +00:00
}
// msgWaitComplete is called when the message for a previous task is confirmed
// or there is an error.
2021-12-11 21:03:00 +00:00
func (ca *channelAccessor) msgWaitComplete(ctx context.Context, mcid cid.Cid, err error) {
2020-07-28 23:16:47 +00:00
ca.lk.Lock()
defer ca.lk.Unlock()
// Save the message result to the store
2021-12-11 21:03:00 +00:00
dserr := ca.store.SaveMessageResult(ctx, mcid, err)
2020-07-28 23:16:47 +00:00
if dserr != nil {
log.Errorf("saving message result: %s", dserr)
}
// Inform listeners that the message has completed
ca.msgListeners.fireMsgComplete(mcid, err)
// The queue may have been waiting for msg completion to proceed, so
// process the next queue item
if len(ca.fundsReqQueue) > 0 {
2021-12-11 21:03:00 +00:00
go ca.processQueue(ctx, "") // nolint: errcheck
2020-07-28 23:16:47 +00:00
}
}
2021-12-11 21:03:00 +00:00
func (ca *channelAccessor) currentAvailableFunds(ctx context.Context, channelID string, queuedAmt types.BigInt) (*api.ChannelAvailableFunds, error) {
if len(channelID) == 0 {
return nil, nil
2020-09-01 14:33:44 +00:00
}
2021-12-11 21:03:00 +00:00
channelInfo, err := ca.store.ByChannelID(ctx, channelID)
if err != nil {
return nil, err
}
2020-09-01 14:33:44 +00:00
// The channel may have a pending create or add funds message
waitSentinel := channelInfo.CreateMsg
if waitSentinel == nil {
waitSentinel = channelInfo.AddFundsMsg
}
// Get the total amount redeemed by vouchers.
// This includes vouchers that have been submitted, and vouchers that are
// in the datastore but haven't yet been submitted.
totalRedeemed := types.NewInt(0)
if channelInfo.Channel != nil {
ch := *channelInfo.Channel
_, pchState, err := ca.sa.loadPaychActorState(ca.chctx, ch)
if err != nil {
return nil, err
}
2021-12-11 21:03:00 +00:00
laneStates, err := ca.laneState(ctx, pchState, ch)
2020-09-01 14:33:44 +00:00
if err != nil {
return nil, err
}
for _, ls := range laneStates {
r, err := ls.Redeemed()
if err != nil {
return nil, err
}
totalRedeemed = types.BigAdd(totalRedeemed, r)
2020-09-01 14:33:44 +00:00
}
}
return &api.ChannelAvailableFunds{
Channel: channelInfo.Channel,
From: channelInfo.from(),
To: channelInfo.to(),
2020-09-01 14:33:44 +00:00
ConfirmedAmt: channelInfo.Amount,
PendingAmt: channelInfo.PendingAmount,
PendingWaitSentinel: waitSentinel,
QueuedAmt: queuedAmt,
VoucherReedeemedAmt: totalRedeemed,
}, nil
}
2020-07-28 23:16:47 +00:00
// processTask checks the state of the channel and takes appropriate action
// (see description of getPaych).
// Note that processTask may be called repeatedly in the same state, and should
// return nil if there is no state change to be made (eg when waiting for a
// message to be confirmed on chain)
2022-01-04 20:34:27 +00:00
func (ca *channelAccessor) processTask(merged *mergedFundsReq, amt, avail types.BigInt) *paychFundsRes {
ctx := merged.ctx
2020-07-28 23:16:47 +00:00
// Get the payment channel for the from/to addresses.
// Note: It's ok if we get ErrChannelNotTracked. It just means we need to
// create a channel.
2021-12-11 21:03:00 +00:00
channelInfo, err := ca.store.OutboundActiveByFromTo(ctx, ca.from, ca.to)
2020-07-28 23:16:47 +00:00
if err != nil && err != ErrChannelNotTracked {
return &paychFundsRes{err: err}
}
// If a channel has not yet been created, create one.
if channelInfo == nil {
2022-01-04 19:33:49 +00:00
mcid, err := ca.createPaych(ctx, amt, avail)
2020-07-28 23:16:47 +00:00
if err != nil {
return &paychFundsRes{err: err}
}
return &paychFundsRes{mcid: mcid}
}
// If the create channel message has been sent but the channel hasn't
// been created on chain yet
if channelInfo.CreateMsg != nil {
// Wait for the channel to be created before trying again
return nil
}
// If an add funds message was sent to the chain but hasn't been confirmed
// on chain yet
if channelInfo.AddFundsMsg != nil {
// Wait for the add funds message to be confirmed before trying again
return nil
}
2022-01-04 20:34:27 +00:00
{
toReserve := types.BigSub(amt, avail)
avail := types.NewInt(0)
// reserve at most what we need
ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) {
avail = ci.AvailableAmount
if avail.GreaterThan(toReserve) {
avail = toReserve
}
ci.AvailableAmount = big.Sub(ci.AvailableAmount, avail)
})
used := types.NewInt(0)
next := 0
for i, r := range merged.reqs {
if !r.reserve {
// non-reserving request are put after reserving requests, so we are done here
break
}
if r.amt.GreaterThan(types.BigSub(avail, used)) {
// requests are sorted by amount ascending, so if we hit this, there aren't any more requests we can fill
break
}
// don't try to fill inactive requests
if !r.isActive() {
continue
}
used = types.BigAdd(used, r.amt)
r.onComplete(&paychFundsRes{channel: *channelInfo.Channel})
next = i + 1
}
merged.reqs = merged.reqs[next:]
// return any unused reserved funds (e.g. from cancelled requests)
ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) {
ci.AvailableAmount = types.BigAdd(ci.AvailableAmount, types.BigSub(avail, used))
})
amt = types.BigSub(amt, used)
}
if amt.LessThanEqual(types.NewInt(0)) {
return nil
}
2020-07-28 23:16:47 +00:00
// We need to add more funds, so send an add funds message to
// cover the amount for this request
2022-01-04 19:33:49 +00:00
mcid, err := ca.addFunds(ctx, channelInfo, amt, avail)
2020-07-28 23:16:47 +00:00
if err != nil {
return &paychFundsRes{err: err}
}
return &paychFundsRes{channel: *channelInfo.Channel, mcid: *mcid}
}
// createPaych sends a message to create the channel and returns the message cid
2022-01-04 19:33:49 +00:00
func (ca *channelAccessor) createPaych(ctx context.Context, amt, avail types.BigInt) (cid.Cid, error) {
mb, err := ca.messageBuilder(ctx, ca.from)
2020-09-30 17:04:10 +00:00
if err != nil {
return cid.Undef, err
2019-09-17 08:15:26 +00:00
}
msg, err := mb.Create(ca.to, amt)
2020-09-30 17:04:10 +00:00
if err != nil {
return cid.Undef, err
2019-09-16 13:46:05 +00:00
}
smsg, err := ca.api.MpoolPushMessage(ctx, msg, nil)
2019-09-16 13:46:05 +00:00
if err != nil {
return cid.Undef, xerrors.Errorf("initializing paych actor: %w", err)
2019-09-16 13:46:05 +00:00
}
mcid := smsg.Cid()
2020-07-28 23:16:47 +00:00
// Create a new channel in the store
2022-01-04 19:33:49 +00:00
ci, err := ca.store.CreateChannel(ctx, ca.from, ca.to, mcid, amt, avail)
if err != nil {
2020-07-28 23:16:47 +00:00
log.Errorf("creating channel: %s", err)
return cid.Undef, err
}
// Wait for the channel to be created on chain
2021-12-11 21:03:00 +00:00
go ca.waitForPaychCreateMsg(ctx, ci.ChannelID, mcid)
2020-07-28 23:16:47 +00:00
return mcid, nil
}
2019-09-16 13:46:05 +00:00
2020-07-28 23:16:47 +00:00
// waitForPaychCreateMsg waits for mcid to appear on chain and stores the robust address of the
// created payment channel
2021-12-11 21:03:00 +00:00
func (ca *channelAccessor) waitForPaychCreateMsg(ctx context.Context, channelID string, mcid cid.Cid) {
err := ca.waitPaychCreateMsg(ctx, channelID, mcid)
ca.msgWaitComplete(ctx, mcid, err)
2020-07-28 23:16:47 +00:00
}
2021-12-11 21:03:00 +00:00
func (ca *channelAccessor) waitPaychCreateMsg(ctx context.Context, channelID string, mcid cid.Cid) error {
2021-04-05 17:56:53 +00:00
mwait, err := ca.api.StateWaitMsg(ca.chctx, mcid, build.MessageConfidence, api.LookbackNoLimit, true)
2019-09-16 13:46:05 +00:00
if err != nil {
2021-02-11 11:00:26 +00:00
log.Errorf("wait msg: %v", err)
2020-07-28 23:16:47 +00:00
return err
2019-09-16 13:46:05 +00:00
}
// If channel creation failed
2019-09-16 13:46:05 +00:00
if mwait.Receipt.ExitCode != 0 {
2020-07-28 23:16:47 +00:00
ca.lk.Lock()
defer ca.lk.Unlock()
// Channel creation failed, so remove the channel from the datastore
2021-12-11 21:03:00 +00:00
dserr := ca.store.RemoveChannel(ctx, channelID)
if dserr != nil {
log.Errorf("failed to remove channel %s: %s", channelID, dserr)
}
2020-07-28 23:16:47 +00:00
err := xerrors.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
log.Error(err)
2020-07-28 23:16:47 +00:00
return err
2019-09-16 13:46:05 +00:00
}
2020-09-30 17:04:10 +00:00
// TODO: ActorUpgrade abstract over this.
// This "works" because it hasn't changed from v0 to v2, but we still
// need an abstraction here.
var decodedReturn init2.ExecReturn
err = decodedReturn.UnmarshalCBOR(bytes.NewReader(mwait.Receipt.Return))
2019-09-16 13:46:05 +00:00
if err != nil {
log.Error(err)
2020-07-28 23:16:47 +00:00
return err
2019-09-16 13:46:05 +00:00
}
2020-07-28 23:16:47 +00:00
ca.lk.Lock()
defer ca.lk.Unlock()
2019-09-16 17:23:48 +00:00
2020-07-28 23:16:47 +00:00
// Store robust address of channel
2021-12-11 21:03:00 +00:00
ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) {
2020-07-28 23:16:47 +00:00
channelInfo.Channel = &decodedReturn.RobustAddress
channelInfo.Amount = channelInfo.PendingAmount
2022-01-04 19:33:49 +00:00
channelInfo.AvailableAmount = channelInfo.PendingAvailableAmount
2020-07-28 23:16:47 +00:00
channelInfo.PendingAmount = big.NewInt(0)
2022-01-04 19:33:49 +00:00
channelInfo.PendingAvailableAmount = big.NewInt(0)
2020-07-28 23:16:47 +00:00
channelInfo.CreateMsg = nil
})
return nil
2019-09-16 13:46:05 +00:00
}
2020-07-28 23:16:47 +00:00
// addFunds sends a message to add funds to the channel and returns the message cid
2022-01-04 19:33:49 +00:00
func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) {
2019-09-16 13:46:05 +00:00
msg := &types.Message{
To: *channelInfo.Channel,
From: channelInfo.Control,
Value: amt,
Method: 0,
2019-09-16 13:46:05 +00:00
}
smsg, err := ca.api.MpoolPushMessage(ctx, msg, nil)
2019-09-16 13:46:05 +00:00
if err != nil {
2020-07-28 23:16:47 +00:00
return nil, err
2019-09-16 13:46:05 +00:00
}
mcid := smsg.Cid()
2020-07-28 23:16:47 +00:00
// Store the add funds message CID on the channel
2021-12-11 21:03:00 +00:00
ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) {
2020-07-28 23:16:47 +00:00
ci.PendingAmount = amt
2022-01-04 19:33:49 +00:00
ci.PendingAvailableAmount = avail
2020-07-28 23:16:47 +00:00
ci.AddFundsMsg = &mcid
})
// Store a reference from the message CID to the channel, so that we can
// look up the channel from the message CID
2021-12-11 21:03:00 +00:00
err = ca.store.SaveNewMessage(ctx, channelInfo.ChannelID, mcid)
2020-07-28 23:16:47 +00:00
if err != nil {
log.Errorf("saving add funds message CID %s: %s", mcid, err)
}
2021-12-11 21:03:00 +00:00
go ca.waitForAddFundsMsg(ctx, channelInfo.ChannelID, mcid)
2020-07-28 23:16:47 +00:00
return &mcid, nil
}
2022-01-04 19:33:49 +00:00
// TODO func (ca *channelAccessor) freeFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) {
2020-07-28 23:16:47 +00:00
// waitForAddFundsMsg waits for mcid to appear on chain and returns error, if any
2021-12-11 21:03:00 +00:00
func (ca *channelAccessor) waitForAddFundsMsg(ctx context.Context, channelID string, mcid cid.Cid) {
err := ca.waitAddFundsMsg(ctx, channelID, mcid)
ca.msgWaitComplete(ctx, mcid, err)
}
2021-12-11 21:03:00 +00:00
func (ca *channelAccessor) waitAddFundsMsg(ctx context.Context, channelID string, mcid cid.Cid) error {
2021-04-05 17:56:53 +00:00
mwait, err := ca.api.StateWaitMsg(ca.chctx, mcid, build.MessageConfidence, api.LookbackNoLimit, true)
2019-09-16 13:46:05 +00:00
if err != nil {
log.Error(err)
2020-07-28 23:16:47 +00:00
return err
2019-09-16 13:46:05 +00:00
}
if mwait.Receipt.ExitCode != 0 {
2020-07-28 23:16:47 +00:00
err := xerrors.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode)
log.Error(err)
ca.lk.Lock()
defer ca.lk.Unlock()
2021-12-11 21:03:00 +00:00
ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) {
2020-07-28 23:16:47 +00:00
channelInfo.PendingAmount = big.NewInt(0)
2022-01-04 19:33:49 +00:00
channelInfo.PendingAvailableAmount = big.NewInt(0)
2020-07-28 23:16:47 +00:00
channelInfo.AddFundsMsg = nil
})
return err
2019-09-16 13:46:05 +00:00
}
2020-07-28 23:16:47 +00:00
ca.lk.Lock()
defer ca.lk.Unlock()
// Store updated amount
2021-12-11 21:03:00 +00:00
ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) {
2020-07-28 23:16:47 +00:00
channelInfo.Amount = types.BigAdd(channelInfo.Amount, channelInfo.PendingAmount)
2022-01-04 19:33:49 +00:00
channelInfo.AvailableAmount = types.BigAdd(channelInfo.AvailableAmount, channelInfo.PendingAvailableAmount)
2020-07-28 23:16:47 +00:00
channelInfo.PendingAmount = big.NewInt(0)
2022-01-04 19:33:49 +00:00
channelInfo.PendingAvailableAmount = big.NewInt(0)
2020-07-28 23:16:47 +00:00
channelInfo.AddFundsMsg = nil
2019-09-16 13:46:05 +00:00
})
2020-07-28 23:16:47 +00:00
return nil
}
// Change the state of the channel in the store
2021-12-11 21:03:00 +00:00
func (ca *channelAccessor) mutateChannelInfo(ctx context.Context, channelID string, mutate func(*ChannelInfo)) {
channelInfo, err := ca.store.ByChannelID(ctx, channelID)
2020-07-28 23:16:47 +00:00
// If there's an error reading or writing to the store just log an error.
// For now we're assuming it's unlikely to happen in practice.
// Later we may want to implement a transactional approach, whereby
// we record to the store that we're going to send a message, send
// the message, and then record that the message was sent.
2019-09-16 13:46:05 +00:00
if err != nil {
2020-07-28 23:16:47 +00:00
log.Errorf("Error reading channel info from store: %s", err)
return
2019-09-16 13:46:05 +00:00
}
2020-07-28 23:16:47 +00:00
mutate(channelInfo)
2021-12-11 21:03:00 +00:00
err = ca.store.putChannelInfo(ctx, channelInfo)
2020-07-28 23:16:47 +00:00
if err != nil {
log.Errorf("Error writing channel info to store: %s", err)
2019-09-16 13:46:05 +00:00
}
2020-07-28 23:16:47 +00:00
}
// restartPending checks the datastore to see if there are any channels that
// have outstanding create / add funds messages, and if so, waits on the
// messages.
// Outstanding messages can occur if a create / add funds message was sent and
// then the system was shut down or crashed before the result was received.
2021-12-11 21:03:00 +00:00
func (pm *Manager) restartPending(ctx context.Context) error {
cis, err := pm.store.WithPendingAddFunds(ctx)
if err != nil {
return err
}
group := errgroup.Group{}
for _, chanInfo := range cis {
ci := chanInfo
if ci.CreateMsg != nil {
group.Go(func() error {
ca, err := pm.accessorByFromTo(ci.Control, ci.Target)
if err != nil {
return xerrors.Errorf("error initializing payment channel manager %s -> %s: %s", ci.Control, ci.Target, err)
}
2021-12-11 21:03:00 +00:00
go ca.waitForPaychCreateMsg(ctx, ci.ChannelID, *ci.CreateMsg)
return nil
})
} else if ci.AddFundsMsg != nil {
group.Go(func() error {
2021-12-11 21:03:00 +00:00
ca, err := pm.accessorByAddress(ctx, *ci.Channel)
if err != nil {
return xerrors.Errorf("error initializing payment channel manager %s: %s", ci.Channel, err)
}
2021-12-11 21:03:00 +00:00
go ca.waitForAddFundsMsg(ctx, ci.ChannelID, *ci.AddFundsMsg)
return nil
})
}
}
return group.Wait()
}
2020-07-28 23:16:47 +00:00
// getPaychWaitReady waits for a the response to the message with the given cid
func (ca *channelAccessor) getPaychWaitReady(ctx context.Context, mcid cid.Cid) (address.Address, error) {
ca.lk.Lock()
// First check if the message has completed
2021-12-11 21:03:00 +00:00
msgInfo, err := ca.store.GetMessage(ctx, mcid)
2020-04-22 20:58:26 +00:00
if err != nil {
2020-07-28 23:16:47 +00:00
ca.lk.Unlock()
return address.Undef, err
}
// If the create channel / add funds message failed, return an error
if len(msgInfo.Err) > 0 {
ca.lk.Unlock()
return address.Undef, xerrors.New(msgInfo.Err)
}
// If the message has completed successfully
if msgInfo.Received {
ca.lk.Unlock()
// Get the channel address
2021-12-11 21:03:00 +00:00
ci, err := ca.store.ByMessageCid(ctx, mcid)
2020-07-28 23:16:47 +00:00
if err != nil {
return address.Undef, err
}
if ci.Channel == nil {
panic(fmt.Sprintf("create / add funds message %s succeeded but channelInfo.Channel is nil", mcid))
}
return *ci.Channel, nil
2020-04-22 20:58:26 +00:00
}
2020-07-28 23:16:47 +00:00
// The message hasn't completed yet so wait for it to complete
promise := ca.msgPromise(ctx, mcid)
// Unlock while waiting
ca.lk.Unlock()
select {
case res := <-promise:
return res.channel, res.err
case <-ctx.Done():
return address.Undef, ctx.Err()
}
}
// onMsgRes is the result delivered on the channel returned by msgPromise
type onMsgRes struct {
	// channel is the address of the channel associated with the message;
	// it is only set when err is nil.
	channel address.Address
	// err is the message's error, or the error from looking up the channel.
	err error
}
// msgPromise returns a channel that receives the result of the message with
// the given CID
func (ca *channelAccessor) msgPromise(ctx context.Context, mcid cid.Cid) chan onMsgRes {
promise := make(chan onMsgRes)
triggerUnsub := make(chan struct{})
unsub := ca.msgListeners.onMsgComplete(mcid, func(err error) {
2020-07-28 23:16:47 +00:00
close(triggerUnsub)
// Use a go-routine so as not to block the event handler loop
go func() {
res := onMsgRes{err: err}
if res.err == nil {
// Get the channel associated with the message cid
2021-12-11 21:03:00 +00:00
ci, err := ca.store.ByMessageCid(ctx, mcid)
2020-07-28 23:16:47 +00:00
if err != nil {
res.err = err
} else {
res.channel = *ci.Channel
}
}
// Pass the result to the caller
select {
case promise <- res:
case <-ctx.Done():
}
}()
})
// Unsubscribe when the message is received or the context is done
go func() {
select {
case <-ctx.Done():
case <-triggerUnsub:
}
unsub()
2020-07-28 23:16:47 +00:00
}()
return promise
2019-09-16 13:46:05 +00:00
}
2020-09-01 14:33:44 +00:00
2021-12-11 21:03:00 +00:00
func (ca *channelAccessor) availableFunds(ctx context.Context, channelID string) (*api.ChannelAvailableFunds, error) {
return ca.processQueue(ctx, channelID)
2020-09-01 14:33:44 +00:00
}