Merge pull request #5350 from GFZRZK/GFZRZK/optmize/avoid_race_in_mpool_cfg
avoid use mp.cfg directly to avoid race
This commit is contained in:
commit
7c7301f701
@ -48,9 +48,13 @@ func saveConfig(cfg *types.MpoolConfig, ds dtypes.MetadataDS) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) GetConfig() *types.MpoolConfig {
|
func (mp *MessagePool) GetConfig() *types.MpoolConfig {
|
||||||
mp.cfgLk.Lock()
|
return mp.getConfig().Clone()
|
||||||
defer mp.cfgLk.Unlock()
|
}
|
||||||
return mp.cfg.Clone()
|
|
||||||
|
func (mp *MessagePool) getConfig() *types.MpoolConfig {
|
||||||
|
mp.cfgLk.RLock()
|
||||||
|
defer mp.cfgLk.RUnlock()
|
||||||
|
return mp.cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateConfg(cfg *types.MpoolConfig) error {
|
func validateConfg(cfg *types.MpoolConfig) error {
|
||||||
|
@ -133,7 +133,7 @@ type MessagePool struct {
|
|||||||
curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
|
curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
|
||||||
curTs *types.TipSet
|
curTs *types.TipSet
|
||||||
|
|
||||||
cfgLk sync.Mutex
|
cfgLk sync.RWMutex
|
||||||
cfg *types.MpoolConfig
|
cfg *types.MpoolConfig
|
||||||
|
|
||||||
api Provider
|
api Provider
|
||||||
@ -781,7 +781,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool)
|
|||||||
|
|
||||||
if incr {
|
if incr {
|
||||||
mp.currentSize++
|
mp.currentSize++
|
||||||
if mp.currentSize > mp.cfg.SizeLimitHigh {
|
if mp.currentSize > mp.getConfig().SizeLimitHigh {
|
||||||
// send signal to prune messages if it hasnt already been sent
|
// send signal to prune messages if it hasnt already been sent
|
||||||
select {
|
select {
|
||||||
case mp.pruneTrigger <- struct{}{}:
|
case mp.pruneTrigger <- struct{}{}:
|
||||||
|
@ -19,7 +19,8 @@ func (mp *MessagePool) pruneExcessMessages() error {
|
|||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
|
||||||
if mp.currentSize < mp.cfg.SizeLimitHigh {
|
mpCfg := mp.getConfig()
|
||||||
|
if mp.currentSize < mpCfg.SizeLimitHigh {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -27,7 +28,7 @@ func (mp *MessagePool) pruneExcessMessages() error {
|
|||||||
case <-mp.pruneCooldown:
|
case <-mp.pruneCooldown:
|
||||||
err := mp.pruneMessages(context.TODO(), ts)
|
err := mp.pruneMessages(context.TODO(), ts)
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(mp.cfg.PruneCooldown)
|
time.Sleep(mpCfg.PruneCooldown)
|
||||||
mp.pruneCooldown <- struct{}{}
|
mp.pruneCooldown <- struct{}{}
|
||||||
}()
|
}()
|
||||||
return err
|
return err
|
||||||
@ -53,8 +54,9 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
|
|||||||
// protected actors -- not pruned
|
// protected actors -- not pruned
|
||||||
protected := make(map[address.Address]struct{})
|
protected := make(map[address.Address]struct{})
|
||||||
|
|
||||||
|
mpCfg := mp.getConfig()
|
||||||
// we never prune priority addresses
|
// we never prune priority addresses
|
||||||
for _, actor := range mp.cfg.PriorityAddrs {
|
for _, actor := range mpCfg.PriorityAddrs {
|
||||||
protected[actor] = struct{}{}
|
protected[actor] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,7 +92,7 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
|
|||||||
})
|
})
|
||||||
|
|
||||||
// Keep messages (remove them from pruneMsgs) from chains while we are under the low water mark
|
// Keep messages (remove them from pruneMsgs) from chains while we are under the low water mark
|
||||||
loWaterMark := mp.cfg.SizeLimitLow
|
loWaterMark := mpCfg.SizeLimitLow
|
||||||
keepLoop:
|
keepLoop:
|
||||||
for _, chain := range chains {
|
for _, chain := range chains {
|
||||||
for _, m := range chain.msgs {
|
for _, m := range chain.msgs {
|
||||||
|
@ -532,14 +532,14 @@ func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[ui
|
|||||||
log.Infow("select priority messages done", "took", dt)
|
log.Infow("select priority messages done", "took", dt)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
mpCfg := mp.getConfig()
|
||||||
result := make([]*types.SignedMessage, 0, mp.cfg.SizeLimitLow)
|
result := make([]*types.SignedMessage, 0, mpCfg.SizeLimitLow)
|
||||||
gasLimit := int64(build.BlockGasLimit)
|
gasLimit := int64(build.BlockGasLimit)
|
||||||
minGas := int64(gasguess.MinGas)
|
minGas := int64(gasguess.MinGas)
|
||||||
|
|
||||||
// 1. Get priority actor chains
|
// 1. Get priority actor chains
|
||||||
var chains []*msgChain
|
var chains []*msgChain
|
||||||
priority := mp.cfg.PriorityAddrs
|
priority := mpCfg.PriorityAddrs
|
||||||
for _, actor := range priority {
|
for _, actor := range priority {
|
||||||
mset, ok := pending[actor]
|
mset, ok := pending[actor]
|
||||||
if ok {
|
if ok {
|
||||||
|
Loading…
Reference in New Issue
Block a user