add cooldown to message pruning

This commit is contained in:
vyzo 2020-08-07 19:50:10 +03:00
parent c735d0da0e
commit 1b56b88132
4 changed files with 46 additions and 20 deletions

View File

@ -1,6 +1,8 @@
package messagepool package messagepool
import ( import (
"time"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
@ -8,6 +10,7 @@ var (
ReplaceByFeeRatioDefault = 1.25 ReplaceByFeeRatioDefault = 1.25
MemPoolSizeLimitHiDefault = 30000 MemPoolSizeLimitHiDefault = 30000
MemPoolSizeLimitLoDefault = 20000 MemPoolSizeLimitLoDefault = 20000
PruneCooldownDefault = time.Minute
) )
func (mp *MessagePool) GetConfig() *types.MpoolConfig { func (mp *MessagePool) GetConfig() *types.MpoolConfig {
@ -29,5 +32,6 @@ func DefaultConfig() *types.MpoolConfig {
SizeLimitHigh: MemPoolSizeLimitHiDefault, SizeLimitHigh: MemPoolSizeLimitHiDefault,
SizeLimitLow: MemPoolSizeLimitLoDefault, SizeLimitLow: MemPoolSizeLimitLoDefault,
ReplaceByFeeRatio: ReplaceByFeeRatioDefault, ReplaceByFeeRatio: ReplaceByFeeRatioDefault,
PruneCooldown: PruneCooldownDefault,
} }
} }

View File

@ -91,6 +91,9 @@ type MessagePool struct {
// pruneTrigger is a channel used to trigger a mempool pruning // pruneTrigger is a channel used to trigger a mempool pruning
pruneTrigger chan struct{} pruneTrigger chan struct{}
// pruneCooldown is a channel used to allow a cooldown time between prunes
pruneCooldown chan struct{}
blsSigCache *lru.TwoQueueCache blsSigCache *lru.TwoQueueCache
changes *lps.PubSub changes *lps.PubSub
@ -209,23 +212,27 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
cfg := DefaultConfig() cfg := DefaultConfig()
mp := &MessagePool{ mp := &MessagePool{
closer: make(chan struct{}), closer: make(chan struct{}),
repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second), repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second),
localAddrs: make(map[address.Address]struct{}), localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet), pending: make(map[address.Address]*msgSet),
minGasPrice: types.NewInt(0), minGasPrice: types.NewInt(0),
pruneTrigger: make(chan struct{}, 1), pruneTrigger: make(chan struct{}, 1),
blsSigCache: cache, pruneCooldown: make(chan struct{}, 1),
sigValCache: verifcache, blsSigCache: cache,
changes: lps.New(50), sigValCache: verifcache,
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)), changes: lps.New(50),
api: api, localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
netName: netName, api: api,
cfg: cfg, netName: netName,
rbfNum: types.NewInt(uint64((cfg.ReplaceByFeeRatio - 1) * RbfDenom)), cfg: cfg,
rbfDenom: types.NewInt(RbfDenom), rbfNum: types.NewInt(uint64((cfg.ReplaceByFeeRatio - 1) * RbfDenom)),
rbfDenom: types.NewInt(RbfDenom),
} }
// enable initial prunes
mp.pruneCooldown <- struct{}{}
if err := mp.loadLocal(); err != nil { if err := mp.loadLocal(); err != nil {
log.Errorf("loading local messages: %+v", err) log.Errorf("loading local messages: %+v", err)
} }
@ -249,10 +256,12 @@ func (mp *MessagePool) Close() error {
} }
func (mp *MessagePool) Prune() { func (mp *MessagePool) Prune() {
//so, its a single slot buffered channel. The first send fills the channel, // this magic incantation of triggering prune thrice is here to make the Prune method
//the second send goes through when the pruning starts, // synchronous:
//and the third send goes through (and noops) after the pruning finishes // so, its a single slot buffered channel. The first send fills the channel,
//and goes through the loop again // the second send goes through when the pruning starts,
// and the third send goes through (and noops) after the pruning finishes
// and goes through the loop again
mp.pruneTrigger <- struct{}{} mp.pruneTrigger <- struct{}{}
mp.pruneTrigger <- struct{}{} mp.pruneTrigger <- struct{}{}
mp.pruneTrigger <- struct{}{} mp.pruneTrigger <- struct{}{}

View File

@ -23,7 +23,17 @@ func (mp *MessagePool) pruneExcessMessages() error {
return nil return nil
} }
return mp.pruneMessages(context.TODO(), ts) select {
case <-mp.pruneCooldown:
err := mp.pruneMessages(context.TODO(), ts)
go func() {
time.Sleep(mp.cfg.PruneCooldown)
mp.pruneCooldown <- struct{}{}
}()
return err
default:
return xerrors.New("cannot prune before cooldown")
}
} }
func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) error { func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) error {

View File

@ -1,6 +1,8 @@
package types package types
import ( import (
"time"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
) )
@ -9,6 +11,7 @@ type MpoolConfig struct {
SizeLimitHigh int SizeLimitHigh int
SizeLimitLow int SizeLimitLow int
ReplaceByFeeRatio float64 ReplaceByFeeRatio float64
PruneCooldown time.Duration
} }
func (mc *MpoolConfig) Clone() *MpoolConfig { func (mc *MpoolConfig) Clone() *MpoolConfig {