fix: track payment channel by ID instead of from/to
This commit is contained in:
parent
63bb09553a
commit
1ef9113ff0
@ -8,8 +8,6 @@ import (
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
xerrors "golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
@ -103,42 +101,9 @@ func HandleManager(lc fx.Lifecycle, pm *Manager) {
|
||||
})
|
||||
}
|
||||
|
||||
// Start checks the datastore to see if there are any channels that have
|
||||
// outstanding add funds messages, and if so, waits on the messages.
|
||||
// Outstanding messages can occur if an add funds message was sent
|
||||
// and then lotus was shut down or crashed before the result was
|
||||
// received.
|
||||
// Start restarts tracking of any messages that were sent to chain.
|
||||
func (pm *Manager) Start() error {
|
||||
cis, err := pm.store.WithPendingAddFunds()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
group := errgroup.Group{}
|
||||
for _, chanInfo := range cis {
|
||||
ci := chanInfo
|
||||
if ci.CreateMsg != nil {
|
||||
group.Go(func() error {
|
||||
ca, err := pm.accessorByFromTo(ci.Control, ci.Target)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error initializing payment channel manager %s -> %s: %s", ci.Control, ci.Target, err)
|
||||
}
|
||||
go ca.waitForPaychCreateMsg(ci.Control, ci.Target, *ci.CreateMsg, nil)
|
||||
return nil
|
||||
})
|
||||
} else if ci.AddFundsMsg != nil {
|
||||
group.Go(func() error {
|
||||
ca, err := pm.accessorByAddress(*ci.Channel)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error initializing payment channel manager %s: %s", ci.Channel, err)
|
||||
}
|
||||
go ca.waitForAddFundsMsg(ci.Control, ci.Target, *ci.AddFundsMsg, nil)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return group.Wait()
|
||||
return pm.restartPending()
|
||||
}
|
||||
|
||||
// Stop shuts down any processes used by the manager
|
||||
|
@ -284,7 +284,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) {
|
||||
|
||||
// 2. Should block until create channel has completed.
|
||||
// Because first channel create fails, this request
|
||||
// should be for channel create.
|
||||
// should be for channel create again.
|
||||
amt2 := big.NewInt(5)
|
||||
ch2, mcid2, err := mgr.GetPaych(ctx, from, to, amt2)
|
||||
require.NoError(t, err)
|
||||
|
@ -69,4 +69,9 @@ func TestPaychSettle(t *testing.T) {
|
||||
ch2, err := mgr.GetPaychWaitReady(ctx, mcid2)
|
||||
require.NoError(t, err)
|
||||
require.NotEqual(t, ch, ch2)
|
||||
|
||||
// There should now be two channels
|
||||
cis, err := mgr.ListChannels()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, cis, 2)
|
||||
}
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
|
||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||
@ -191,9 +193,7 @@ func (ca *channelAccessor) processTask(
|
||||
}
|
||||
|
||||
// If a channel has not yet been created, create one.
|
||||
// Note that if the previous attempt to create the channel failed because of a VM error
|
||||
// (eg not enough gas), both channelInfo.Channel and channelInfo.CreateMsg will be nil.
|
||||
if channelInfo == nil || channelInfo.Channel == nil && channelInfo.CreateMsg == nil {
|
||||
if channelInfo == nil {
|
||||
mcid, err := ca.createPaych(ctx, from, to, amt, onComplete)
|
||||
if err != nil {
|
||||
return &paychFundsRes{err: err}
|
||||
@ -218,7 +218,7 @@ func (ca *channelAccessor) processTask(
|
||||
|
||||
// We need to add more funds, so send an add funds message to
|
||||
// cover the amount for this request
|
||||
mcid, err := ca.addFunds(ctx, from, to, amt, onComplete)
|
||||
mcid, err := ca.addFunds(ctx, channelInfo, amt, onComplete)
|
||||
if err != nil {
|
||||
return &paychFundsRes{err: err}
|
||||
}
|
||||
@ -257,43 +257,45 @@ func (ca *channelAccessor) createPaych(ctx context.Context, from, to address.Add
|
||||
mcid := smsg.Cid()
|
||||
|
||||
// Create a new channel in the store
|
||||
if _, err := ca.store.createChannel(from, to, mcid, amt); err != nil {
|
||||
ci, err := ca.store.CreateChannel(from, to, mcid, amt)
|
||||
if err != nil {
|
||||
log.Errorf("creating channel: %s", err)
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
// Wait for the channel to be created on chain
|
||||
go ca.waitForPaychCreateMsg(from, to, mcid, cb)
|
||||
go ca.waitForPaychCreateMsg(ci.ChannelID, mcid, cb)
|
||||
|
||||
return mcid, nil
|
||||
}
|
||||
|
||||
// waitForPaychCreateMsg waits for mcid to appear on chain and stores the robust address of the
|
||||
// created payment channel
|
||||
func (ca *channelAccessor) waitForPaychCreateMsg(from address.Address, to address.Address, mcid cid.Cid, cb onCompleteFn) {
|
||||
err := ca.waitPaychCreateMsg(from, to, mcid)
|
||||
func (ca *channelAccessor) waitForPaychCreateMsg(channelID string, mcid cid.Cid, cb onCompleteFn) {
|
||||
err := ca.waitPaychCreateMsg(channelID, mcid)
|
||||
ca.msgWaitComplete(mcid, err, cb)
|
||||
}
|
||||
|
||||
func (ca *channelAccessor) waitPaychCreateMsg(from address.Address, to address.Address, mcid cid.Cid) error {
|
||||
func (ca *channelAccessor) waitPaychCreateMsg(channelID string, mcid cid.Cid) error {
|
||||
mwait, err := ca.api.StateWaitMsg(ca.waitCtx, mcid, build.MessageConfidence)
|
||||
if err != nil {
|
||||
log.Errorf("wait msg: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// If channel creation failed
|
||||
if mwait.Receipt.ExitCode != 0 {
|
||||
err := xerrors.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
|
||||
log.Error(err)
|
||||
|
||||
ca.lk.Lock()
|
||||
defer ca.lk.Unlock()
|
||||
|
||||
ca.mutateChannelInfo(from, to, func(channelInfo *ChannelInfo) {
|
||||
channelInfo.PendingAmount = big.NewInt(0)
|
||||
channelInfo.CreateMsg = nil
|
||||
})
|
||||
// Channel creation failed, so remove the channel from the datastore
|
||||
dserr := ca.store.RemoveChannel(channelID)
|
||||
if dserr != nil {
|
||||
log.Errorf("failed to remove channel %s: %s", channelID, dserr)
|
||||
}
|
||||
|
||||
err := xerrors.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -308,7 +310,7 @@ func (ca *channelAccessor) waitPaychCreateMsg(from address.Address, to address.A
|
||||
defer ca.lk.Unlock()
|
||||
|
||||
// Store robust address of channel
|
||||
ca.mutateChannelInfo(from, to, func(channelInfo *ChannelInfo) {
|
||||
ca.mutateChannelInfo(channelID, func(channelInfo *ChannelInfo) {
|
||||
channelInfo.Channel = &decodedReturn.RobustAddress
|
||||
channelInfo.Amount = channelInfo.PendingAmount
|
||||
channelInfo.PendingAmount = big.NewInt(0)
|
||||
@ -319,12 +321,7 @@ func (ca *channelAccessor) waitPaychCreateMsg(from address.Address, to address.A
|
||||
}
|
||||
|
||||
// addFunds sends a message to add funds to the channel and returns the message cid
|
||||
func (ca *channelAccessor) addFunds(ctx context.Context, from address.Address, to address.Address, amt types.BigInt, cb onCompleteFn) (*cid.Cid, error) {
|
||||
channelInfo, err := ca.store.OutboundActiveByFromTo(from, to)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt types.BigInt, cb onCompleteFn) (*cid.Cid, error) {
|
||||
msg := &types.Message{
|
||||
To: *channelInfo.Channel,
|
||||
From: channelInfo.Control,
|
||||
@ -341,7 +338,7 @@ func (ca *channelAccessor) addFunds(ctx context.Context, from address.Address, t
|
||||
mcid := smsg.Cid()
|
||||
|
||||
// Store the add funds message CID on the channel
|
||||
ca.mutateChannelInfo(from, to, func(ci *ChannelInfo) {
|
||||
ca.mutateChannelInfo(channelInfo.ChannelID, func(ci *ChannelInfo) {
|
||||
ci.PendingAmount = amt
|
||||
ci.AddFundsMsg = &mcid
|
||||
})
|
||||
@ -353,18 +350,18 @@ func (ca *channelAccessor) addFunds(ctx context.Context, from address.Address, t
|
||||
log.Errorf("saving add funds message CID %s: %s", mcid, err)
|
||||
}
|
||||
|
||||
go ca.waitForAddFundsMsg(from, to, mcid, cb)
|
||||
go ca.waitForAddFundsMsg(channelInfo.ChannelID, mcid, cb)
|
||||
|
||||
return &mcid, nil
|
||||
}
|
||||
|
||||
// waitForAddFundsMsg waits for mcid to appear on chain and returns error, if any
|
||||
func (ca *channelAccessor) waitForAddFundsMsg(from address.Address, to address.Address, mcid cid.Cid, cb onCompleteFn) {
|
||||
err := ca.waitAddFundsMsg(from, to, mcid)
|
||||
func (ca *channelAccessor) waitForAddFundsMsg(channelID string, mcid cid.Cid, cb onCompleteFn) {
|
||||
err := ca.waitAddFundsMsg(channelID, mcid)
|
||||
ca.msgWaitComplete(mcid, err, cb)
|
||||
}
|
||||
|
||||
func (ca *channelAccessor) waitAddFundsMsg(from address.Address, to address.Address, mcid cid.Cid) error {
|
||||
func (ca *channelAccessor) waitAddFundsMsg(channelID string, mcid cid.Cid) error {
|
||||
mwait, err := ca.api.StateWaitMsg(ca.waitCtx, mcid, build.MessageConfidence)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
@ -378,7 +375,7 @@ func (ca *channelAccessor) waitAddFundsMsg(from address.Address, to address.Addr
|
||||
ca.lk.Lock()
|
||||
defer ca.lk.Unlock()
|
||||
|
||||
ca.mutateChannelInfo(from, to, func(channelInfo *ChannelInfo) {
|
||||
ca.mutateChannelInfo(channelID, func(channelInfo *ChannelInfo) {
|
||||
channelInfo.PendingAmount = big.NewInt(0)
|
||||
channelInfo.AddFundsMsg = nil
|
||||
})
|
||||
@ -390,7 +387,7 @@ func (ca *channelAccessor) waitAddFundsMsg(from address.Address, to address.Addr
|
||||
defer ca.lk.Unlock()
|
||||
|
||||
// Store updated amount
|
||||
ca.mutateChannelInfo(from, to, func(channelInfo *ChannelInfo) {
|
||||
ca.mutateChannelInfo(channelID, func(channelInfo *ChannelInfo) {
|
||||
channelInfo.Amount = types.BigAdd(channelInfo.Amount, channelInfo.PendingAmount)
|
||||
channelInfo.PendingAmount = big.NewInt(0)
|
||||
channelInfo.AddFundsMsg = nil
|
||||
@ -400,8 +397,8 @@ func (ca *channelAccessor) waitAddFundsMsg(from address.Address, to address.Addr
|
||||
}
|
||||
|
||||
// Change the state of the channel in the store
|
||||
func (ca *channelAccessor) mutateChannelInfo(from address.Address, to address.Address, mutate func(*ChannelInfo)) {
|
||||
channelInfo, err := ca.store.OutboundActiveByFromTo(from, to)
|
||||
func (ca *channelAccessor) mutateChannelInfo(channelID string, mutate func(*ChannelInfo)) {
|
||||
channelInfo, err := ca.store.ByChannelID(channelID)
|
||||
|
||||
// If there's an error reading or writing to the store just log an error.
|
||||
// For now we're assuming it's unlikely to happen in practice.
|
||||
@ -420,6 +417,44 @@ func (ca *channelAccessor) mutateChannelInfo(from address.Address, to address.Ad
|
||||
}
|
||||
}
|
||||
|
||||
// restartPending checks the datastore to see if there are any channels that
|
||||
// have outstanding create / add funds messages, and if so, waits on the
|
||||
// messages.
|
||||
// Outstanding messages can occur if a create / add funds message was sent and
|
||||
// then the system was shut down or crashed before the result was received.
|
||||
func (pm *Manager) restartPending() error {
|
||||
cis, err := pm.store.WithPendingAddFunds()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
group := errgroup.Group{}
|
||||
for _, chanInfo := range cis {
|
||||
ci := chanInfo
|
||||
if ci.CreateMsg != nil {
|
||||
group.Go(func() error {
|
||||
ca, err := pm.accessorByFromTo(ci.Control, ci.Target)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error initializing payment channel manager %s -> %s: %s", ci.Control, ci.Target, err)
|
||||
}
|
||||
go ca.waitForPaychCreateMsg(ci.ChannelID, *ci.CreateMsg, nil)
|
||||
return nil
|
||||
})
|
||||
} else if ci.AddFundsMsg != nil {
|
||||
group.Go(func() error {
|
||||
ca, err := pm.accessorByAddress(*ci.Channel)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error initializing payment channel manager %s: %s", ci.Channel, err)
|
||||
}
|
||||
go ca.waitForAddFundsMsg(ci.ChannelID, *ci.AddFundsMsg, nil)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return group.Wait()
|
||||
}
|
||||
|
||||
// getPaychWaitReady waits for a the response to the message with the given cid
|
||||
func (ca *channelAccessor) getPaychWaitReady(ctx context.Context, mcid cid.Cid) (address.Address, error) {
|
||||
ca.lk.Lock()
|
||||
|
@ -150,7 +150,7 @@ func (ps *Store) findChans(filter func(*ChannelInfo) bool, max int) ([]ChannelIn
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ci, err := unmarshallChannelInfo(&stored, res)
|
||||
ci, err := unmarshallChannelInfo(&stored, res.Value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -313,11 +313,25 @@ func (ps *Store) WithPendingAddFunds() ([]ChannelInfo, error) {
|
||||
}, 0)
|
||||
}
|
||||
|
||||
// createChannel creates an outbound channel for the given from / to, ensuring
|
||||
// ByChannelID gets channel info by channel ID
|
||||
func (ps *Store) ByChannelID(channelID string) (*ChannelInfo, error) {
|
||||
var stored ChannelInfo
|
||||
|
||||
res, err := ps.ds.Get(dskeyForChannel(channelID))
|
||||
if err != nil {
|
||||
if err == datastore.ErrNotFound {
|
||||
return nil, ErrChannelNotTracked
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return unmarshallChannelInfo(&stored, res)
|
||||
}
|
||||
|
||||
// CreateChannel creates an outbound channel for the given from / to, ensuring
|
||||
// it has a higher sequence number than any existing channel with the same from / to
|
||||
func (ps *Store) createChannel(from address.Address, to address.Address, createMsgCid cid.Cid, amt types.BigInt) (*ChannelInfo, error) {
|
||||
func (ps *Store) CreateChannel(from address.Address, to address.Address, createMsgCid cid.Cid, amt types.BigInt) (*ChannelInfo, error) {
|
||||
ci := &ChannelInfo{
|
||||
ChannelID: uuid.New().String(),
|
||||
Direction: DirOutbound,
|
||||
NextLane: 0,
|
||||
Control: from,
|
||||
@ -341,15 +355,22 @@ func (ps *Store) createChannel(from address.Address, to address.Address, createM
|
||||
return ci, err
|
||||
}
|
||||
|
||||
// RemoveChannel removes the channel with the given channel ID
|
||||
func (ps *Store) RemoveChannel(channelID string) error {
|
||||
return ps.ds.Delete(dskeyForChannel(channelID))
|
||||
}
|
||||
|
||||
// The datastore key used to identify the channel info
|
||||
func dskeyForChannel(ci *ChannelInfo) datastore.Key {
|
||||
chanKey := fmt.Sprintf("%s->%s", ci.Control.String(), ci.Target.String())
|
||||
return datastore.KeyWithNamespaces([]string{dsKeyChannelInfo, chanKey})
|
||||
func dskeyForChannel(channelID string) datastore.Key {
|
||||
return datastore.KeyWithNamespaces([]string{dsKeyChannelInfo, channelID})
|
||||
}
|
||||
|
||||
// putChannelInfo stores the channel info in the datastore
|
||||
func (ps *Store) putChannelInfo(ci *ChannelInfo) error {
|
||||
k := dskeyForChannel(ci)
|
||||
if len(ci.ChannelID) == 0 {
|
||||
ci.ChannelID = uuid.New().String()
|
||||
}
|
||||
k := dskeyForChannel(ci.ChannelID)
|
||||
|
||||
b, err := marshallChannelInfo(ci)
|
||||
if err != nil {
|
||||
@ -380,8 +401,8 @@ func marshallChannelInfo(ci *ChannelInfo) ([]byte, error) {
|
||||
return cborrpc.Dump(ci)
|
||||
}
|
||||
|
||||
func unmarshallChannelInfo(stored *ChannelInfo, res dsq.Result) (*ChannelInfo, error) {
|
||||
if err := stored.UnmarshalCBOR(bytes.NewReader(res.Value)); err != nil {
|
||||
func unmarshallChannelInfo(stored *ChannelInfo, value []byte) (*ChannelInfo, error) {
|
||||
if err := stored.UnmarshalCBOR(bytes.NewReader(value)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user