trigger early republish from head changes
This commit is contained in:
parent
548e94d3fc
commit
3b6e2bdb7b
@ -75,8 +75,12 @@ type MessagePool struct {
|
|||||||
|
|
||||||
addSema chan struct{}
|
addSema chan struct{}
|
||||||
|
|
||||||
closer chan struct{}
|
closer chan struct{}
|
||||||
repubTk *clock.Ticker
|
|
||||||
|
repubTk *clock.Ticker
|
||||||
|
repubTrigger chan struct{}
|
||||||
|
|
||||||
|
republished map[cid.Cid]struct{}
|
||||||
|
|
||||||
localAddrs map[address.Address]struct{}
|
localAddrs map[address.Address]struct{}
|
||||||
|
|
||||||
@ -166,6 +170,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
|
|||||||
addSema: make(chan struct{}, 1),
|
addSema: make(chan struct{}, 1),
|
||||||
closer: make(chan struct{}),
|
closer: make(chan struct{}),
|
||||||
repubTk: build.Clock.Ticker(RepublishInterval),
|
repubTk: build.Clock.Ticker(RepublishInterval),
|
||||||
|
repubTrigger: make(chan struct{}, 1),
|
||||||
localAddrs: make(map[address.Address]struct{}),
|
localAddrs: make(map[address.Address]struct{}),
|
||||||
pending: make(map[address.Address]*msgSet),
|
pending: make(map[address.Address]*msgSet),
|
||||||
minGasPrice: types.NewInt(0),
|
minGasPrice: types.NewInt(0),
|
||||||
@ -224,6 +229,10 @@ func (mp *MessagePool) runLoop() {
|
|||||||
if err := mp.republishPendingMessages(); err != nil {
|
if err := mp.republishPendingMessages(); err != nil {
|
||||||
log.Errorf("error while republishing messages: %s", err)
|
log.Errorf("error while republishing messages: %s", err)
|
||||||
}
|
}
|
||||||
|
case <-mp.repubTrigger:
|
||||||
|
if err := mp.republishPendingMessages(); err != nil {
|
||||||
|
log.Errorf("error while republishing messages: %s", err)
|
||||||
|
}
|
||||||
case <-mp.pruneTrigger:
|
case <-mp.pruneTrigger:
|
||||||
if err := mp.pruneExcessMessages(); err != nil {
|
if err := mp.pruneExcessMessages(); err != nil {
|
||||||
log.Errorf("failed to prune excess messages from mempool: %s", err)
|
log.Errorf("failed to prune excess messages from mempool: %s", err)
|
||||||
@ -676,6 +685,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
defer mp.curTsLk.Unlock()
|
defer mp.curTsLk.Unlock()
|
||||||
|
|
||||||
|
repubTrigger := false
|
||||||
rmsgs := make(map[address.Address]map[uint64]*types.SignedMessage)
|
rmsgs := make(map[address.Address]map[uint64]*types.SignedMessage)
|
||||||
add := func(m *types.SignedMessage) {
|
add := func(m *types.SignedMessage) {
|
||||||
s, ok := rmsgs[m.Message.From]
|
s, ok := rmsgs[m.Message.From]
|
||||||
@ -700,6 +710,17 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
mp.Remove(from, nonce)
|
mp.Remove(from, nonce)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
maybeRepub := func(cid cid.Cid) {
|
||||||
|
if !repubTrigger {
|
||||||
|
mp.lk.Lock()
|
||||||
|
_, republished := mp.republished[cid]
|
||||||
|
mp.lk.Unlock()
|
||||||
|
if republished {
|
||||||
|
repubTrigger = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, ts := range revert {
|
for _, ts := range revert {
|
||||||
pts, err := mp.api.LoadTipSet(ts.Parents())
|
pts, err := mp.api.LoadTipSet(ts.Parents())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -726,16 +747,25 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
}
|
}
|
||||||
for _, msg := range smsgs {
|
for _, msg := range smsgs {
|
||||||
rm(msg.Message.From, msg.Message.Nonce)
|
rm(msg.Message.From, msg.Message.Nonce)
|
||||||
|
maybeRepub(msg.Cid())
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, msg := range bmsgs {
|
for _, msg := range bmsgs {
|
||||||
rm(msg.From, msg.Nonce)
|
rm(msg.From, msg.Nonce)
|
||||||
|
maybeRepub(msg.Cid())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mp.curTs = ts
|
mp.curTs = ts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if repubTrigger {
|
||||||
|
select {
|
||||||
|
case mp.repubTrigger <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, s := range rmsgs {
|
for _, s := range rmsgs {
|
||||||
for _, msg := range s {
|
for _, msg := range s {
|
||||||
if err := mp.addSkipChecks(msg); err != nil {
|
if err := mp.addSkipChecks(msg); err != nil {
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
|
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
)
|
)
|
||||||
|
|
||||||
const repubMsgLimit = 30
|
const repubMsgLimit = 30
|
||||||
@ -26,6 +27,7 @@ func (mp *MessagePool) republishPendingMessages() error {
|
|||||||
|
|
||||||
pending := make(map[address.Address]map[uint64]*types.SignedMessage)
|
pending := make(map[address.Address]map[uint64]*types.SignedMessage)
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
|
mp.republished = nil // clear this to avoid races triggering an early republish
|
||||||
for actor := range mp.localAddrs {
|
for actor := range mp.localAddrs {
|
||||||
mset, ok := mp.pending[actor]
|
mset, ok := mp.pending[actor]
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -115,6 +117,7 @@ func (mp *MessagePool) republishPendingMessages() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
count := 0
|
||||||
log.Infof("republishing %d messages", len(msgs))
|
log.Infof("republishing %d messages", len(msgs))
|
||||||
for _, m := range msgs {
|
for _, m := range msgs {
|
||||||
mb, err := m.Serialize()
|
mb, err := m.Serialize()
|
||||||
@ -126,7 +129,20 @@ func (mp *MessagePool) republishPendingMessages() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("cannot publish: %w", err)
|
return xerrors.Errorf("cannot publish: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
count++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// track most recently republished messages
|
||||||
|
republished := make(map[cid.Cid]struct{})
|
||||||
|
for _, m := range msgs[:count] {
|
||||||
|
republished[m.Cid()] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
mp.lk.Lock()
|
||||||
|
// update the republished set so that we can trigger early republish from head changes
|
||||||
|
mp.republished = republished
|
||||||
|
mp.lk.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user