From 1ef9113ff0e0be8e2b5630bf65339bbb3a5e3102 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 6 Aug 2020 08:47:48 -0400 Subject: [PATCH] fix: track payment channel by ID instead of from/to --- paychmgr/manager.go | 39 +-------------- paychmgr/paychget_test.go | 2 +- paychmgr/settle_test.go | 5 ++ paychmgr/simple.go | 99 ++++++++++++++++++++++++++------------- paychmgr/store.go | 41 ++++++++++++---- 5 files changed, 106 insertions(+), 80 deletions(-) diff --git a/paychmgr/manager.go b/paychmgr/manager.go index 79ccfe51a..7f47640f1 100644 --- a/paychmgr/manager.go +++ b/paychmgr/manager.go @@ -8,8 +8,6 @@ import ( "github.com/ipfs/go-datastore" - "golang.org/x/sync/errgroup" - xerrors "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" @@ -103,42 +101,9 @@ func HandleManager(lc fx.Lifecycle, pm *Manager) { }) } -// Start checks the datastore to see if there are any channels that have -// outstanding add funds messages, and if so, waits on the messages. -// Outstanding messages can occur if an add funds message was sent -// and then lotus was shut down or crashed before the result was -// received. +// Start restarts tracking of any messages that were sent to chain. func (pm *Manager) Start() error { - cis, err := pm.store.WithPendingAddFunds() - if err != nil { - return err - } - - group := errgroup.Group{} - for _, chanInfo := range cis { - ci := chanInfo - if ci.CreateMsg != nil { - group.Go(func() error { - ca, err := pm.accessorByFromTo(ci.Control, ci.Target) - if err != nil { - return xerrors.Errorf("error initializing payment channel manager %s -> %s: %s", ci.Control, ci.Target, err) - } - go ca.waitForPaychCreateMsg(ci.Control, ci.Target, *ci.CreateMsg, nil) - return nil - }) - } else if ci.AddFundsMsg != nil { - group.Go(func() error { - ca, err := pm.accessorByAddress(*ci.Channel) - if err != nil { - return xerrors.Errorf("error initializing payment channel manager %s: %s", ci.Channel, err) - } - go ca.waitForAddFundsMsg(ci.Control, ci.Target, *ci.AddFundsMsg, nil) - return nil - }) - } - } - - return group.Wait() + return pm.restartPending() } // Stop shuts down any processes used by the manager diff --git a/paychmgr/paychget_test.go b/paychmgr/paychget_test.go index 11e6683b2..c19fadb7f 100644 --- a/paychmgr/paychget_test.go +++ b/paychmgr/paychget_test.go @@ -284,7 +284,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) { // 2. Should block until create channel has completed. // Because first channel create fails, this request - // should be for channel create. + // should be for channel create again. amt2 := big.NewInt(5) ch2, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) require.NoError(t, err) diff --git a/paychmgr/settle_test.go b/paychmgr/settle_test.go index 88f44d848..292b139ea 100644 --- a/paychmgr/settle_test.go +++ b/paychmgr/settle_test.go @@ -69,4 +69,9 @@ func TestPaychSettle(t *testing.T) { ch2, err := mgr.GetPaychWaitReady(ctx, mcid2) require.NoError(t, err) require.NotEqual(t, ch, ch2) + + // There should now be two channels + cis, err := mgr.ListChannels() + require.NoError(t, err) + require.Len(t, cis, 2) } diff --git a/paychmgr/simple.go b/paychmgr/simple.go index 0113f7061..67b5a4f41 100644 --- a/paychmgr/simple.go +++ b/paychmgr/simple.go @@ -5,6 +5,8 @@ import ( "context" "fmt" + "golang.org/x/sync/errgroup" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/specs-actors/actors/abi/big" @@ -191,9 +193,7 @@ func (ca *channelAccessor) processTask( } // If a channel has not yet been created, create one. - // Note that if the previous attempt to create the channel failed because of a VM error - // (eg not enough gas), both channelInfo.Channel and channelInfo.CreateMsg will be nil. - if channelInfo == nil || channelInfo.Channel == nil && channelInfo.CreateMsg == nil { + if channelInfo == nil { mcid, err := ca.createPaych(ctx, from, to, amt, onComplete) if err != nil { return &paychFundsRes{err: err} @@ -218,7 +218,7 @@ func (ca *channelAccessor) processTask( // We need to add more funds, so send an add funds message to // cover the amount for this request - mcid, err := ca.addFunds(ctx, from, to, amt, onComplete) + mcid, err := ca.addFunds(ctx, channelInfo, amt, onComplete) if err != nil { return &paychFundsRes{err: err} } @@ -257,43 +257,45 @@ func (ca *channelAccessor) createPaych(ctx context.Context, from, to address.Add mcid := smsg.Cid() // Create a new channel in the store - if _, err := ca.store.createChannel(from, to, mcid, amt); err != nil { + ci, err := ca.store.CreateChannel(from, to, mcid, amt) + if err != nil { log.Errorf("creating channel: %s", err) return cid.Undef, err } // Wait for the channel to be created on chain - go ca.waitForPaychCreateMsg(from, to, mcid, cb) + go ca.waitForPaychCreateMsg(ci.ChannelID, mcid, cb) return mcid, nil } // waitForPaychCreateMsg waits for mcid to appear on chain and stores the robust address of the // created payment channel -func (ca *channelAccessor) waitForPaychCreateMsg(from address.Address, to address.Address, mcid cid.Cid, cb onCompleteFn) { - err := ca.waitPaychCreateMsg(from, to, mcid) +func (ca *channelAccessor) waitForPaychCreateMsg(channelID string, mcid cid.Cid, cb onCompleteFn) { + err := ca.waitPaychCreateMsg(channelID, mcid) ca.msgWaitComplete(mcid, err, cb) } -func (ca *channelAccessor) waitPaychCreateMsg(from address.Address, to address.Address, mcid cid.Cid) error { +func (ca *channelAccessor) waitPaychCreateMsg(channelID string, mcid cid.Cid) error { mwait, err := ca.api.StateWaitMsg(ca.waitCtx, mcid, build.MessageConfidence) if err != nil { log.Errorf("wait msg: %w", err) return err } + // If channel creation failed if mwait.Receipt.ExitCode != 0 { - err := xerrors.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode) - log.Error(err) - ca.lk.Lock() defer ca.lk.Unlock() - ca.mutateChannelInfo(from, to, func(channelInfo *ChannelInfo) { - channelInfo.PendingAmount = big.NewInt(0) - channelInfo.CreateMsg = nil - }) + // Channel creation failed, so remove the channel from the datastore + dserr := ca.store.RemoveChannel(channelID) + if dserr != nil { + log.Errorf("failed to remove channel %s: %s", channelID, dserr) + } + err := xerrors.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode) + log.Error(err) return err } @@ -308,7 +310,7 @@ func (ca *channelAccessor) waitPaychCreateMsg(from address.Address, to address.A defer ca.lk.Unlock() // Store robust address of channel - ca.mutateChannelInfo(from, to, func(channelInfo *ChannelInfo) { + ca.mutateChannelInfo(channelID, func(channelInfo *ChannelInfo) { channelInfo.Channel = &decodedReturn.RobustAddress channelInfo.Amount = channelInfo.PendingAmount channelInfo.PendingAmount = big.NewInt(0) @@ -319,12 +321,7 @@ func (ca *channelAccessor) waitPaychCreateMsg(from address.Address, to address.A } // addFunds sends a message to add funds to the channel and returns the message cid -func (ca *channelAccessor) addFunds(ctx context.Context, from address.Address, to address.Address, amt types.BigInt, cb onCompleteFn) (*cid.Cid, error) { - channelInfo, err := ca.store.OutboundActiveByFromTo(from, to) - if err != nil { - return nil, err - } - +func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt types.BigInt, cb onCompleteFn) (*cid.Cid, error) { msg := &types.Message{ To: *channelInfo.Channel, From: channelInfo.Control, @@ -341,7 +338,7 @@ func (ca *channelAccessor) addFunds(ctx context.Context, from address.Address, t mcid := smsg.Cid() // Store the add funds message CID on the channel - ca.mutateChannelInfo(from, to, func(ci *ChannelInfo) { + ca.mutateChannelInfo(channelInfo.ChannelID, func(ci *ChannelInfo) { ci.PendingAmount = amt ci.AddFundsMsg = &mcid }) @@ -353,18 +350,18 @@ func (ca *channelAccessor) addFunds(ctx context.Context, from address.Address, t log.Errorf("saving add funds message CID %s: %s", mcid, err) } - go ca.waitForAddFundsMsg(from, to, mcid, cb) + go ca.waitForAddFundsMsg(channelInfo.ChannelID, mcid, cb) return &mcid, nil } // waitForAddFundsMsg waits for mcid to appear on chain and returns error, if any -func (ca *channelAccessor) waitForAddFundsMsg(from address.Address, to address.Address, mcid cid.Cid, cb onCompleteFn) { - err := ca.waitAddFundsMsg(from, to, mcid) +func (ca *channelAccessor) waitForAddFundsMsg(channelID string, mcid cid.Cid, cb onCompleteFn) { + err := ca.waitAddFundsMsg(channelID, mcid) ca.msgWaitComplete(mcid, err, cb) } -func (ca *channelAccessor) waitAddFundsMsg(from address.Address, to address.Address, mcid cid.Cid) error { +func (ca *channelAccessor) waitAddFundsMsg(channelID string, mcid cid.Cid) error { mwait, err := ca.api.StateWaitMsg(ca.waitCtx, mcid, build.MessageConfidence) if err != nil { log.Error(err) @@ -378,7 +375,7 @@ func (ca *channelAccessor) waitAddFundsMsg(from address.Address, to address.Addr ca.lk.Lock() defer ca.lk.Unlock() - ca.mutateChannelInfo(from, to, func(channelInfo *ChannelInfo) { + ca.mutateChannelInfo(channelID, func(channelInfo *ChannelInfo) { channelInfo.PendingAmount = big.NewInt(0) channelInfo.AddFundsMsg = nil }) @@ -390,7 +387,7 @@ func (ca *channelAccessor) waitAddFundsMsg(from address.Address, to address.Addr defer ca.lk.Unlock() // Store updated amount - ca.mutateChannelInfo(from, to, func(channelInfo *ChannelInfo) { + ca.mutateChannelInfo(channelID, func(channelInfo *ChannelInfo) { channelInfo.Amount = types.BigAdd(channelInfo.Amount, channelInfo.PendingAmount) channelInfo.PendingAmount = big.NewInt(0) channelInfo.AddFundsMsg = nil @@ -400,8 +397,8 @@ func (ca *channelAccessor) waitAddFundsMsg(from address.Address, to address.Addr } // Change the state of the channel in the store -func (ca *channelAccessor) mutateChannelInfo(from address.Address, to address.Address, mutate func(*ChannelInfo)) { - channelInfo, err := ca.store.OutboundActiveByFromTo(from, to) +func (ca *channelAccessor) mutateChannelInfo(channelID string, mutate func(*ChannelInfo)) { + channelInfo, err := ca.store.ByChannelID(channelID) // If there's an error reading or writing to the store just log an error. // For now we're assuming it's unlikely to happen in practice. @@ -420,6 +417,44 @@ func (ca *channelAccessor) mutateChannelInfo(from address.Address, to address.Ad } } +// restartPending checks the datastore to see if there are any channels that +// have outstanding create / add funds messages, and if so, waits on the +// messages. +// Outstanding messages can occur if a create / add funds message was sent and +// then the system was shut down or crashed before the result was received. +func (pm *Manager) restartPending() error { + cis, err := pm.store.WithPendingAddFunds() + if err != nil { + return err + } + + group := errgroup.Group{} + for _, chanInfo := range cis { + ci := chanInfo + if ci.CreateMsg != nil { + group.Go(func() error { + ca, err := pm.accessorByFromTo(ci.Control, ci.Target) + if err != nil { + return xerrors.Errorf("error initializing payment channel manager %s -> %s: %s", ci.Control, ci.Target, err) + } + go ca.waitForPaychCreateMsg(ci.ChannelID, *ci.CreateMsg, nil) + return nil + }) + } else if ci.AddFundsMsg != nil { + group.Go(func() error { + ca, err := pm.accessorByAddress(*ci.Channel) + if err != nil { + return xerrors.Errorf("error initializing payment channel manager %s: %s", ci.Channel, err) + } + go ca.waitForAddFundsMsg(ci.ChannelID, *ci.AddFundsMsg, nil) + return nil + }) + } + } + + return group.Wait() +} + // getPaychWaitReady waits for a the response to the message with the given cid func (ca *channelAccessor) getPaychWaitReady(ctx context.Context, mcid cid.Cid) (address.Address, error) { ca.lk.Lock() diff --git a/paychmgr/store.go b/paychmgr/store.go index bf7b3ee9b..d7c6e82e7 100644 --- a/paychmgr/store.go +++ b/paychmgr/store.go @@ -150,7 +150,7 @@ func (ps *Store) findChans(filter func(*ChannelInfo) bool, max int) ([]ChannelIn return nil, err } - ci, err := unmarshallChannelInfo(&stored, res) + ci, err := unmarshallChannelInfo(&stored, res.Value) if err != nil { return nil, err } @@ -313,11 +313,25 @@ func (ps *Store) WithPendingAddFunds() ([]ChannelInfo, error) { }, 0) } -// createChannel creates an outbound channel for the given from / to, ensuring +// ByChannelID gets channel info by channel ID +func (ps *Store) ByChannelID(channelID string) (*ChannelInfo, error) { + var stored ChannelInfo + + res, err := ps.ds.Get(dskeyForChannel(channelID)) + if err != nil { + if err == datastore.ErrNotFound { + return nil, ErrChannelNotTracked + } + return nil, err + } + + return unmarshallChannelInfo(&stored, res) +} + +// CreateChannel creates an outbound channel for the given from / to, ensuring // it has a higher sequence number than any existing channel with the same from / to -func (ps *Store) createChannel(from address.Address, to address.Address, createMsgCid cid.Cid, amt types.BigInt) (*ChannelInfo, error) { +func (ps *Store) CreateChannel(from address.Address, to address.Address, createMsgCid cid.Cid, amt types.BigInt) (*ChannelInfo, error) { ci := &ChannelInfo{ - ChannelID: uuid.New().String(), Direction: DirOutbound, NextLane: 0, Control: from, @@ -341,15 +355,22 @@ func (ps *Store) createChannel(from address.Address, to address.Address, createM return ci, err } +// RemoveChannel removes the channel with the given channel ID +func (ps *Store) RemoveChannel(channelID string) error { + return ps.ds.Delete(dskeyForChannel(channelID)) +} + // The datastore key used to identify the channel info -func dskeyForChannel(ci *ChannelInfo) datastore.Key { - chanKey := fmt.Sprintf("%s->%s", ci.Control.String(), ci.Target.String()) - return datastore.KeyWithNamespaces([]string{dsKeyChannelInfo, chanKey}) +func dskeyForChannel(channelID string) datastore.Key { + return datastore.KeyWithNamespaces([]string{dsKeyChannelInfo, channelID}) } // putChannelInfo stores the channel info in the datastore func (ps *Store) putChannelInfo(ci *ChannelInfo) error { - k := dskeyForChannel(ci) + if len(ci.ChannelID) == 0 { + ci.ChannelID = uuid.New().String() + } + k := dskeyForChannel(ci.ChannelID) b, err := marshallChannelInfo(ci) if err != nil { @@ -380,8 +401,8 @@ func marshallChannelInfo(ci *ChannelInfo) ([]byte, error) { return cborrpc.Dump(ci) } -func unmarshallChannelInfo(stored *ChannelInfo, res dsq.Result) (*ChannelInfo, error) { - if err := stored.UnmarshalCBOR(bytes.NewReader(res.Value)); err != nil { +func unmarshallChannelInfo(stored *ChannelInfo, value []byte) (*ChannelInfo, error) { + if err := stored.UnmarshalCBOR(bytes.NewReader(value)); err != nil { return nil, err }