diff --git a/api/api_full.go b/api/api_full.go index 71e3987df..2d8a4e515 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -184,6 +184,9 @@ type FullNode interface { MpoolGetNonce(context.Context, address.Address) (uint64, error) MpoolSub(context.Context) (<-chan MpoolUpdate, error) + // MpoolClear clears pending messages from the mpool + MpoolClear(context.Context, bool) error + // MpoolGetConfig returns (a copy of) the current mpool config MpoolGetConfig(context.Context) (*types.MpoolConfig, error) // MpoolSetConfig sets the mpool config to (a copy of) the supplied config diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 5f027543e..0b8ba00e4 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -106,7 +106,9 @@ type FullNodeStruct struct { MpoolSelect func(context.Context, types.TipSetKey, float64) ([]*types.SignedMessage, error) `perm:"read"` - MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"` + MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"` + MpoolClear func(context.Context, bool) error `perm:"write"` + MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"` MpoolPushMessage func(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"` MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"` @@ -494,6 +496,10 @@ func (c *FullNodeStruct) MpoolPending(ctx context.Context, tsk types.TipSetKey) return c.Internal.MpoolPending(ctx, tsk) } +func (c *FullNodeStruct) MpoolClear(ctx context.Context, local bool) error { + return c.Internal.MpoolClear(ctx, local) +} + func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { return c.Internal.MpoolPush(ctx, smsg) } diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index d55b45e14..644a9104f 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/hashicorp/go-multierror" lru "github.com/hashicorp/golang-lru" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -744,30 +745,42 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) } } + var merr error + for _, ts := range revert { pts, err := mp.api.LoadTipSet(ts.Parents()) if err != nil { - return err - } - - msgs, err := mp.MessagesForBlocks(ts.Blocks()) - if err != nil { - return err + log.Errorf("error loading reverted tipset parent: %s", err) + merr = multierror.Append(merr, err) + continue } mp.curTs = pts + msgs, err := mp.MessagesForBlocks(ts.Blocks()) + if err != nil { + log.Errorf("error retrieving messages for reverted block: %s", err) + merr = multierror.Append(merr, err) + continue + } + for _, msg := range msgs { add(msg) } } for _, ts := range apply { + mp.curTs = ts + for _, b := range ts.Blocks() { bmsgs, smsgs, err := mp.api.MessagesForBlock(b) if err != nil { - return xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err) + xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err) + log.Errorf("error retrieving messages for block: %s", xerr) + merr = multierror.Append(merr, xerr) + continue } + for _, msg := range smsgs { rm(msg.Message.From, msg.Message.Nonce) maybeRepub(msg.Cid()) @@ -778,8 +791,6 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) maybeRepub(msg.Cid()) } } - - mp.curTs = ts } if repubTrigger { @@ -862,7 +873,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) } } - return nil + return merr } type statBucket struct { @@ -962,3 +973,40 @@ func (mp *MessagePool) loadLocal() error { return nil } + +func (mp *MessagePool) Clear(local bool) { + mp.lk.Lock() + defer mp.lk.Unlock() + + // remove everything if local is true, including removing local messages from + // the datastore + if local { + for a := range mp.localAddrs { + mset, ok := mp.pending[a] + if !ok { + continue + } + + for _, m := range mset.msgs { + err := mp.localMsgs.Delete(datastore.NewKey(string(m.Cid().Bytes()))) + if err != nil { + log.Warnf("error deleting local message: %s", err) + } + } + } + + mp.pending = make(map[address.Address]*msgSet) + mp.republished = nil + + return + } + + // remove everything except the local messages + for a := range mp.pending { + _, isLocal := mp.localAddrs[a] + if isLocal { + continue + } + delete(mp.pending, a) + } +} diff --git a/cli/mpool.go b/cli/mpool.go index 62ab06dae..4562ef398 100644 --- a/cli/mpool.go +++ b/cli/mpool.go @@ -20,6 +20,7 @@ var mpoolCmd = &cli.Command{ Usage: "Manage message pool", Subcommands: []*cli.Command{ mpoolPending, + mpoolClear, mpoolSub, mpoolStat, mpoolReplaceCmd, @@ -83,6 +84,38 @@ var mpoolPending = &cli.Command{ }, } +var mpoolClear = &cli.Command{ + Name: "clear", + Usage: "Clear all pending messages from the mpool (USE WITH CARE)", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "local", + Usage: "also clear local messages", + }, + &cli.BoolFlag{ + Name: "really-do-it", + Usage: "must be specified for the action to take effect", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + + really := cctx.Bool("really-do-it") + if !really { + return fmt.Errorf("--really-do-it must be specified for this action to have an effect; you have been warned.") + } + + local := cctx.Bool("local") + + ctx := ReqContext(cctx) + return api.MpoolClear(ctx, local) + }, +} + var mpoolSub = &cli.Command{ Name: "sub", Usage: "Subscribe to mpool changes", diff --git a/documentation/en/mpool.md b/documentation/en/mpool.md index 0425b04e0..bbb7f2440 100644 --- a/documentation/en/mpool.md +++ b/documentation/en/mpool.md @@ -20,6 +20,7 @@ The full node API defines the following methods for interacting with the mpool: MpoolSub(context.Context) (<-chan MpoolUpdate, error) MpoolGetConfig(context.Context) (*types.MpoolConfig, error) MpoolSetConfig(context.Context, *types.MpoolConfig) error + MpoolClear(context.Context, local bool) error ``` ### MpoolPending @@ -61,6 +62,13 @@ Returns (a copy of) the current mpool configuration. Sets the mpool configuration to (a copy of) the supplied configuration object. +### MpoolClear + +Clears pending messages from the mpool; if `local` is `true` then local messages are also cleared and removed from the datastore. + +This should be used with extreme care and only in the case of errors during head changes that +would leave the mpool in an inconsistent state. + ## Command Line Interfae @@ -75,6 +83,7 @@ lotus mpool stat [--local] lotus mpool replace [--gas-feecap ] [--gas-premium ] [--gas-limit ] [from] [nonce] lotus mpool find [--from
] [--to
] [--method ] lotus mpool config [] +lotus mpool clear [--local] ``` ### lotus mpool pending @@ -98,6 +107,12 @@ Searches for messages in the mpool. ### lotus mpool config Gets or sets the current mpool configuration. +### lotus mpool clear +Unconditionally clears pending messages from the mpool. +If the `--local` flag is passed, then local messages are also cleared; otherwise local messages are retained. + +*Warning*: this command should only be used in the case of head change errors leaving the mpool in an state. + ## Configuration The mpool a few parameters that can be configured by the user, either through the API diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index bde7d4f81..fe2b054dd 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -105,6 +105,11 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty } } +func (a *MpoolAPI) MpoolClear(ctx context.Context, local bool) error { + a.Mpool.Clear(local) + return nil +} + func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { return a.Mpool.Push(smsg) }