diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 5cb2f3048..92f29b8ba 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -75,8 +75,12 @@ type MessagePool struct { addSema chan struct{} - closer chan struct{} - repubTk *clock.Ticker + closer chan struct{} + + repubTk *clock.Ticker + repubTrigger chan struct{} + + republished map[cid.Cid]struct{} localAddrs map[address.Address]struct{} @@ -166,6 +170,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa addSema: make(chan struct{}, 1), closer: make(chan struct{}), repubTk: build.Clock.Ticker(RepublishInterval), + repubTrigger: make(chan struct{}, 1), localAddrs: make(map[address.Address]struct{}), pending: make(map[address.Address]*msgSet), minGasPrice: types.NewInt(0), @@ -224,6 +229,10 @@ func (mp *MessagePool) runLoop() { if err := mp.republishPendingMessages(); err != nil { log.Errorf("error while republishing messages: %s", err) } + case <-mp.repubTrigger: + if err := mp.republishPendingMessages(); err != nil { + log.Errorf("error while republishing messages: %s", err) + } case <-mp.pruneTrigger: if err := mp.pruneExcessMessages(); err != nil { log.Errorf("failed to prune excess messages from mempool: %s", err) @@ -676,6 +685,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) mp.curTsLk.Lock() defer mp.curTsLk.Unlock() + repubTrigger := false rmsgs := make(map[address.Address]map[uint64]*types.SignedMessage) add := func(m *types.SignedMessage) { s, ok := rmsgs[m.Message.From] @@ -700,6 +710,17 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) mp.Remove(from, nonce) } + maybeRepub := func(cid cid.Cid) { + if !repubTrigger { + mp.lk.Lock() + _, republished := mp.republished[cid] + mp.lk.Unlock() + if republished { + repubTrigger = true + } + } + } + for _, ts := range revert { pts, err := mp.api.LoadTipSet(ts.Parents()) if err != nil { @@ -726,16 +747,25 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) } for _, msg := range smsgs { rm(msg.Message.From, msg.Message.Nonce) + maybeRepub(msg.Cid()) } for _, msg := range bmsgs { rm(msg.From, msg.Nonce) + maybeRepub(msg.Cid()) } } mp.curTs = ts } + if repubTrigger { + select { + case mp.repubTrigger <- struct{}{}: + default: + } + } + for _, s := range rmsgs { for _, msg := range s { if err := mp.addSkipChecks(msg); err != nil { diff --git a/chain/messagepool/repub.go b/chain/messagepool/repub.go index a55bc4f3f..ba375cab8 100644 --- a/chain/messagepool/repub.go +++ b/chain/messagepool/repub.go @@ -10,6 +10,7 @@ import ( "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/messagepool/gasguess" "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" ) const repubMsgLimit = 30 @@ -26,6 +27,7 @@ func (mp *MessagePool) republishPendingMessages() error { pending := make(map[address.Address]map[uint64]*types.SignedMessage) mp.lk.Lock() + mp.republished = nil // clear this to avoid races triggering an early republish for actor := range mp.localAddrs { mset, ok := mp.pending[actor] if !ok { @@ -115,6 +117,7 @@ func (mp *MessagePool) republishPendingMessages() error { } } + count := 0 log.Infof("republishing %d messages", len(msgs)) for _, m := range msgs { mb, err := m.Serialize() @@ -126,7 +129,20 @@ func (mp *MessagePool) republishPendingMessages() error { if err != nil { return xerrors.Errorf("cannot publish: %w", err) } + + count++ } + // track most recently republished messages + republished := make(map[cid.Cid]struct{}) + for _, m := range msgs[:count] { + republished[m.Cid()] = struct{}{} + } + + mp.lk.Lock() + // update the republished set so that we can trigger early republish from head changes + mp.republished = republished + mp.lk.Unlock() + return nil }