wip mpool fixes

This commit is contained in:
Łukasz Magiera 2019-11-23 02:26:32 +01:00
parent 2deae35dcc
commit 6af2e946d1
3 changed files with 87 additions and 4 deletions

View File

@ -99,7 +99,7 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
sm: sm, sm: sm,
ps: ps, ps: ps,
minGasPrice: types.NewInt(0), minGasPrice: types.NewInt(0),
maxTxPoolSize: 100000, maxTxPoolSize: 5000,
blsSigCache: cache, blsSigCache: cache,
changes: lps.New(50), changes: lps.New(50),
} }
@ -254,12 +254,23 @@ func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
} }
func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
stateNonce, err := mp.getStateNonce(addr) // sanity check
if err != nil {
return 0, err
}
mset, ok := mp.pending[addr] mset, ok := mp.pending[addr]
if ok { if ok {
if stateNonce > mset.nextNonce {
log.Errorf("state nonce was larger than mset.nextNonce")
return stateNonce, nil
}
return mset.nextNonce, nil return mset.nextNonce, nil
} }
return mp.getStateNonce(addr) return stateNonce, nil
} }
func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
@ -328,8 +339,7 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
delete(mset.msgs, nonce) delete(mset.msgs, nonce)
if len(mset.msgs) == 0 { if len(mset.msgs) == 0 {
// FIXME: This is racy delete(mp.pending, from)
//delete(mp.pending, from)
} else { } else {
var max uint64 var max uint64
for nonce := range mset.msgs { for nonce := range mset.msgs {

View File

@ -0,0 +1,7 @@
package chain
import "testing"
func TestMessagePool(t *testing.T) {
}

View File

@ -4,7 +4,11 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2" "gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
) )
var mpoolCmd = &cli.Command{ var mpoolCmd = &cli.Command{
@ -13,6 +17,7 @@ var mpoolCmd = &cli.Command{
Subcommands: []*cli.Command{ Subcommands: []*cli.Command{
mpoolPending, mpoolPending,
mpoolSub, mpoolSub,
mpoolStat,
}, },
} }
@ -76,3 +81,64 @@ var mpoolSub = &cli.Command{
} }
}, },
} }
type statBucket struct {
msgs map[uint64]*types.SignedMessage
}
var mpoolStat = &cli.Command{
Name: "stat",
Usage: "print mempool stats",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
ts, err := api.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("getting chain head: %w", err)
}
msgs, err := api.MpoolPending(ctx, nil)
if err != nil {
return err
}
buckets := map[address.Address]*statBucket{}
for _, v := range msgs {
bkt, ok := buckets[v.Message.From]
if !ok {
bkt = &statBucket{
msgs: map[uint64]*types.SignedMessage{},
}
buckets[v.Message.From] = bkt
}
bkt.msgs[v.Message.Nonce] = v
}
for a, bkt := range buckets {
act, err := api.StateGetActor(ctx, a, ts)
if err != nil {
return err
}
cur := act.Nonce
for {
_, ok := bkt.msgs[cur]
if !ok {
break
}
cur++
}
fmt.Printf("%s, cur %d\n", a, cur-act.Nonce)
}
return nil
},
}