From 6af2e946d1e81ba182b533044b733340945c7e2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 23 Nov 2019 02:26:32 +0100 Subject: [PATCH] wip mpool fixes --- chain/messagepool.go | 18 ++++++++--- chain/messagepool_test.go | 7 +++++ cli/mpool.go | 66 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 4 deletions(-) create mode 100644 chain/messagepool_test.go diff --git a/chain/messagepool.go b/chain/messagepool.go index 57053c966..9f88acd22 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -99,7 +99,7 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool { sm: sm, ps: ps, minGasPrice: types.NewInt(0), - maxTxPoolSize: 100000, + maxTxPoolSize: 5000, blsSigCache: cache, changes: lps.New(50), } @@ -254,12 +254,23 @@ func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) { } func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { + stateNonce, err := mp.getStateNonce(addr) // sanity check + if err != nil { + return 0, err + } + mset, ok := mp.pending[addr] if ok { + if stateNonce > mset.nextNonce { + log.Errorf("state nonce was larger than mset.nextNonce") + + return stateNonce, nil + } + return mset.nextNonce, nil } - return mp.getStateNonce(addr) + return stateNonce, nil } func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { @@ -328,8 +339,7 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) { delete(mset.msgs, nonce) if len(mset.msgs) == 0 { - // FIXME: This is racy - //delete(mp.pending, from) + delete(mp.pending, from) } else { var max uint64 for nonce := range mset.msgs { diff --git a/chain/messagepool_test.go b/chain/messagepool_test.go new file mode 100644 index 000000000..36928ea28 --- /dev/null +++ b/chain/messagepool_test.go @@ -0,0 +1,7 @@ +package chain + +import "testing" + +func TestMessagePool(t *testing.T) { + +} diff --git a/cli/mpool.go b/cli/mpool.go index f01732884..62b7f84ff 100644 --- a/cli/mpool.go +++ b/cli/mpool.go @@ -4,7 +4,11 @@ import ( "encoding/json" "fmt" + "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/types" ) var mpoolCmd = &cli.Command{ @@ -13,6 +17,7 @@ var mpoolCmd = &cli.Command{ Subcommands: []*cli.Command{ mpoolPending, mpoolSub, + mpoolStat, }, } @@ -76,3 +81,64 @@ var mpoolSub = &cli.Command{ } }, } + +type statBucket struct { + msgs map[uint64]*types.SignedMessage +} + +var mpoolStat = &cli.Command{ + Name: "stat", + Usage: "print mempool stats", + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := ReqContext(cctx) + + ts, err := api.ChainHead(ctx) + if err != nil { + return xerrors.Errorf("getting chain head: %w", err) + } + + msgs, err := api.MpoolPending(ctx, nil) + if err != nil { + return err + } + + buckets := map[address.Address]*statBucket{} + + for _, v := range msgs { + bkt, ok := buckets[v.Message.From] + if !ok { + bkt = &statBucket{ + msgs: map[uint64]*types.SignedMessage{}, + } + buckets[v.Message.From] = bkt + } + + bkt.msgs[v.Message.Nonce] = v + } + for a, bkt := range buckets { + act, err := api.StateGetActor(ctx, a, ts) + if err != nil { + return err + } + + cur := act.Nonce + for { + _, ok := bkt.msgs[cur] + if !ok { + break + } + cur++ + } + + fmt.Printf("%s, cur %d\n", a, cur-act.Nonce) + } + + return nil + }, +}