diff --git a/chain/messagepool/config.go b/chain/messagepool/config.go index a06a4a27d..a55fb649d 100644 --- a/chain/messagepool/config.go +++ b/chain/messagepool/config.go @@ -1,6 +1,8 @@ package messagepool import ( + "time" + "github.com/filecoin-project/lotus/chain/types" ) @@ -8,6 +10,7 @@ var ( ReplaceByFeeRatioDefault = 1.25 MemPoolSizeLimitHiDefault = 30000 MemPoolSizeLimitLoDefault = 20000 + PruneCooldownDefault = time.Minute ) func (mp *MessagePool) GetConfig() *types.MpoolConfig { @@ -29,5 +32,6 @@ func DefaultConfig() *types.MpoolConfig { SizeLimitHigh: MemPoolSizeLimitHiDefault, SizeLimitLow: MemPoolSizeLimitLoDefault, ReplaceByFeeRatio: ReplaceByFeeRatioDefault, + PruneCooldown: PruneCooldownDefault, } } diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 56fa74f94..3ca1b3966 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -91,6 +91,9 @@ type MessagePool struct { // pruneTrigger is a channel used to trigger a mempool pruning pruneTrigger chan struct{} + // pruneCooldown is a channel used to allow a cooldown time between prunes + pruneCooldown chan struct{} + blsSigCache *lru.TwoQueueCache changes *lps.PubSub @@ -209,23 +212,27 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa cfg := DefaultConfig() mp := &MessagePool{ - closer: make(chan struct{}), - repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second), - localAddrs: make(map[address.Address]struct{}), - pending: make(map[address.Address]*msgSet), - minGasPrice: types.NewInt(0), - pruneTrigger: make(chan struct{}, 1), - blsSigCache: cache, - sigValCache: verifcache, - changes: lps.New(50), - localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)), - api: api, - netName: netName, - cfg: cfg, - rbfNum: types.NewInt(uint64((cfg.ReplaceByFeeRatio - 1) * RbfDenom)), - rbfDenom: types.NewInt(RbfDenom), + closer: make(chan struct{}), + repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second), + localAddrs: make(map[address.Address]struct{}), + pending: make(map[address.Address]*msgSet), + minGasPrice: types.NewInt(0), + pruneTrigger: make(chan struct{}, 1), + pruneCooldown: make(chan struct{}, 1), + blsSigCache: cache, + sigValCache: verifcache, + changes: lps.New(50), + localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)), + api: api, + netName: netName, + cfg: cfg, + rbfNum: types.NewInt(uint64((cfg.ReplaceByFeeRatio - 1) * RbfDenom)), + rbfDenom: types.NewInt(RbfDenom), } + // enable initial prunes + mp.pruneCooldown <- struct{}{} + if err := mp.loadLocal(); err != nil { log.Errorf("loading local messages: %+v", err) } @@ -249,10 +256,12 @@ func (mp *MessagePool) Close() error { } func (mp *MessagePool) Prune() { - //so, its a single slot buffered channel. The first send fills the channel, - //the second send goes through when the pruning starts, - //and the third send goes through (and noops) after the pruning finishes - //and goes through the loop again + // this magic incantation of triggering prune thrice is here to make the Prune method + // synchronous: + // so, its a single slot buffered channel. The first send fills the channel, + // the second send goes through when the pruning starts, + // and the third send goes through (and noops) after the pruning finishes + // and goes through the loop again mp.pruneTrigger <- struct{}{} mp.pruneTrigger <- struct{}{} mp.pruneTrigger <- struct{}{} diff --git a/chain/messagepool/pruning.go b/chain/messagepool/pruning.go index a39b7d266..143dd029e 100644 --- a/chain/messagepool/pruning.go +++ b/chain/messagepool/pruning.go @@ -23,7 +23,17 @@ func (mp *MessagePool) pruneExcessMessages() error { return nil } - return mp.pruneMessages(context.TODO(), ts) + select { + case <-mp.pruneCooldown: + err := mp.pruneMessages(context.TODO(), ts) + go func() { + time.Sleep(mp.cfg.PruneCooldown) + mp.pruneCooldown <- struct{}{} + }() + return err + default: + return xerrors.New("cannot prune before cooldown") + } } func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) error { diff --git a/chain/types/mpool.go b/chain/types/mpool.go index a55a835ca..cd6a99a47 100644 --- a/chain/types/mpool.go +++ b/chain/types/mpool.go @@ -1,6 +1,8 @@ package types import ( + "time" + "github.com/filecoin-project/go-address" ) @@ -9,6 +11,7 @@ type MpoolConfig struct { SizeLimitHigh int SizeLimitLow int ReplaceByFeeRatio float64 + PruneCooldown time.Duration } func (mc *MpoolConfig) Clone() *MpoolConfig {