diff --git a/paychmgr/settler/settler.go b/paychmgr/settler/settler.go index 9d48fdb3b..636684118 100644 --- a/paychmgr/settler/settler.go +++ b/paychmgr/settler/settler.go @@ -2,6 +2,17 @@ package settler import ( "context" + "sync" + + "go.uber.org/fx" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/paych" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" @@ -9,12 +20,10 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/impl/full" payapi "github.com/filecoin-project/lotus/node/impl/paych" - "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/specs-actors/actors/builtin" - "github.com/filecoin-project/specs-actors/actors/builtin/paych" - "go.uber.org/fx" ) +var log = logging.Logger("payment-channel-settler") + // API are the dependencies need to run the payment channel settler type API struct { fx.In @@ -24,14 +33,18 @@ type API struct { payapi.PaychAPI } -type paymentChannelSettler struct { - ctx context.Context - paych payapi.PaychAPI +type settlerAPI interface { + PaychList(context.Context) ([]address.Address, error) + PaychStatus(context.Context, address.Address) (*api.PaychStatus, error) + PaychVoucherCheckSpendable(context.Context, address.Address, *paych.SignedVoucher, []byte, []byte) (bool, error) + PaychVoucherList(context.Context, address.Address) ([]*paych.SignedVoucher, error) + PaychVoucherSubmit(context.Context, address.Address, *paych.SignedVoucher) (cid.Cid, error) + StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) } -type eventAPI struct { - full.ChainAPI - full.StateAPI +type paymentChannelSettler struct { + ctx context.Context + api settlerAPI } // SettlePaymentChannels checks the chain for events related to payment channels settling and @@ -39,31 +52,34 @@ type eventAPI struct { func SettlePaymentChannels(lc fx.Lifecycle, api API) error { lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - pcs := &paymentChannelSettler{ - ctx: ctx, - paych: api.PaychAPI, - } - - ev := events.NewEvents(ctx, &eventAPI{api.ChainAPI, api.StateAPI}) + pcs := newPaymentChannelSettler(ctx, &api) + ev := events.NewEvents(ctx, &api) return ev.Called(pcs.check, pcs.messageHandler, pcs.revertHandler, int(build.MessageConfidence+1), events.NoTimeout, pcs.matcher) }, }) return nil } +func newPaymentChannelSettler(ctx context.Context, api settlerAPI) *paymentChannelSettler { + return &paymentChannelSettler{ + ctx: ctx, + api: api, + } +} + func (pcs *paymentChannelSettler) check(ts *types.TipSet) (done bool, more bool, err error) { return false, true, nil } func (pcs *paymentChannelSettler) messageHandler(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) { - vouchers, err := pcs.paych.PaychVoucherList(pcs.ctx, msg.To) + vouchers, err := pcs.api.PaychVoucherList(pcs.ctx, msg.To) if err != nil { return true, err } bestByLane := make(map[uint64]*paych.SignedVoucher) for _, voucher := range vouchers { - spendable, err := pcs.paych.PaychVoucherCheckSpendable(pcs.ctx, msg.To, voucher, nil, nil) + spendable, err := pcs.api.PaychVoucherCheckSpendable(pcs.ctx, msg.To, voucher, nil, nil) if err != nil { return true, err } @@ -73,18 +89,29 @@ func (pcs *paymentChannelSettler) messageHandler(msg *types.Message, rec *types. } } } + var wg sync.WaitGroup + wg.Add(len(bestByLane)) for _, voucher := range bestByLane { - _, err := pcs.paych.PaychVoucherSubmit(pcs.ctx, msg.To, voucher) + submitMessageCID, err := pcs.api.PaychVoucherSubmit(pcs.ctx, msg.To, voucher) if err != nil { return true, err } - // TODO: StateWaitMsg? + go func(voucher *paych.SignedVoucher, submitMessageCID cid.Cid) { + defer wg.Done() + msgLookup, err := pcs.api.StateWaitMsg(pcs.ctx, submitMessageCID, build.MessageConfidence) + if err != nil { + log.Errorf("submitting voucher: %s", err.Error()) + } + if msgLookup.Receipt.ExitCode != 0 { + log.Errorf("failed submitting voucher: %+v", voucher) + } + }(voucher, submitMessageCID) } + wg.Wait() return true, nil } func (pcs *paymentChannelSettler) revertHandler(ctx context.Context, ts *types.TipSet) error { - // TODO: fill in return nil } @@ -93,15 +120,15 @@ func (pcs *paymentChannelSettler) matcher(msg *types.Message) (matchOnce bool, m if msg.Method != builtin.MethodsPaych.Settle { return false, false, nil } - // Check if this payment channel is of concern to this miner (i.e. tracked in payment channel store), + // Check if this payment channel is of concern to this node (i.e. tracked in payment channel store), // and its inbound (i.e. we're getting vouchers that we may need to redeem) - trackedAddresses, err := pcs.paych.PaychList(pcs.ctx) + trackedAddresses, err := pcs.api.PaychList(pcs.ctx) if err != nil { return false, false, err } for _, addr := range trackedAddresses { if msg.To == addr { - status, err := pcs.paych.PaychStatus(pcs.ctx, addr) + status, err := pcs.api.PaychStatus(pcs.ctx, addr) if err != nil { return false, false, err }