Merge pull request #3100 from filecoin-project/feat/mpool-improve-repub

trigger early republish from head changes
This commit is contained in:
Łukasz Magiera 2020-08-17 09:46:46 +02:00 committed by GitHub
commit eb616bff7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 2 deletions

View File

@ -75,8 +75,12 @@ type MessagePool struct {
addSema chan struct{}
closer chan struct{}
repubTk *clock.Ticker
closer chan struct{}
repubTk *clock.Ticker
repubTrigger chan struct{}
republished map[cid.Cid]struct{}
localAddrs map[address.Address]struct{}
@ -166,6 +170,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
addSema: make(chan struct{}, 1),
closer: make(chan struct{}),
repubTk: build.Clock.Ticker(RepublishInterval),
repubTrigger: make(chan struct{}, 1),
localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet),
minGasPrice: types.NewInt(0),
@ -224,6 +229,10 @@ func (mp *MessagePool) runLoop() {
if err := mp.republishPendingMessages(); err != nil {
log.Errorf("error while republishing messages: %s", err)
}
case <-mp.repubTrigger:
if err := mp.republishPendingMessages(); err != nil {
log.Errorf("error while republishing messages: %s", err)
}
case <-mp.pruneTrigger:
if err := mp.pruneExcessMessages(); err != nil {
log.Errorf("failed to prune excess messages from mempool: %s", err)
@ -676,6 +685,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
repubTrigger := false
rmsgs := make(map[address.Address]map[uint64]*types.SignedMessage)
add := func(m *types.SignedMessage) {
s, ok := rmsgs[m.Message.From]
@ -700,6 +710,17 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
mp.Remove(from, nonce)
}
maybeRepub := func(cid cid.Cid) {
if !repubTrigger {
mp.lk.Lock()
_, republished := mp.republished[cid]
mp.lk.Unlock()
if republished {
repubTrigger = true
}
}
}
for _, ts := range revert {
pts, err := mp.api.LoadTipSet(ts.Parents())
if err != nil {
@ -726,16 +747,25 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
}
for _, msg := range smsgs {
rm(msg.Message.From, msg.Message.Nonce)
maybeRepub(msg.Cid())
}
for _, msg := range bmsgs {
rm(msg.From, msg.Nonce)
maybeRepub(msg.Cid())
}
}
mp.curTs = ts
}
if repubTrigger {
select {
case mp.repubTrigger <- struct{}{}:
default:
}
}
for _, s := range rmsgs {
for _, msg := range s {
if err := mp.addSkipChecks(msg); err != nil {

View File

@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)
const repubMsgLimit = 30
@ -26,6 +27,7 @@ func (mp *MessagePool) republishPendingMessages() error {
pending := make(map[address.Address]map[uint64]*types.SignedMessage)
mp.lk.Lock()
mp.republished = nil // clear this to avoid races triggering an early republish
for actor := range mp.localAddrs {
mset, ok := mp.pending[actor]
if !ok {
@ -115,6 +117,7 @@ func (mp *MessagePool) republishPendingMessages() error {
}
}
count := 0
log.Infof("republishing %d messages", len(msgs))
for _, m := range msgs {
mb, err := m.Serialize()
@ -126,7 +129,20 @@ func (mp *MessagePool) republishPendingMessages() error {
if err != nil {
return xerrors.Errorf("cannot publish: %w", err)
}
count++
}
// track most recently republished messages
republished := make(map[cid.Cid]struct{})
for _, m := range msgs[:count] {
republished[m.Cid()] = struct{}{}
}
mp.lk.Lock()
// update the republished set so that we can trigger early republish from head changes
mp.republished = republished
mp.lk.Unlock()
return nil
}