lotus/chain/messagepool/repub.go

186 lines
4.5 KiB
Go
Raw Normal View History

2020-08-10 08:07:36 +00:00
package messagepool
import (
"context"
"sort"
"time"
2020-08-10 08:07:36 +00:00
2022-06-14 15:00:51 +00:00
"github.com/ipfs/go-cid"
2020-08-10 08:07:36 +00:00
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
2022-06-14 15:00:51 +00:00
2020-08-10 08:07:36 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
2020-08-10 08:07:36 +00:00
"github.com/filecoin-project/lotus/chain/types"
)
2020-08-11 11:26:06 +00:00
const repubMsgLimit = 30
2020-08-10 08:07:36 +00:00
var RepublishBatchDelay = 100 * time.Millisecond
func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
2020-08-10 08:07:36 +00:00
mp.curTsLk.Lock()
ts := mp.curTs
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
if err != nil {
2020-08-12 06:18:04 +00:00
mp.curTsLk.Unlock()
2020-08-10 08:07:36 +00:00
return xerrors.Errorf("computing basefee: %w", err)
}
baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)
2020-08-10 08:07:36 +00:00
pending := make(map[address.Address]map[uint64]*types.SignedMessage)
mp.lk.Lock()
mp.republished = nil // clear this to avoid races triggering an early republish
mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) {
mset, ok, err := mp.getPendingMset(ctx, actor)
if err != nil {
log.Debugf("failed to get mset: %w", err)
return
}
2020-08-10 08:07:36 +00:00
if !ok {
return
2020-08-10 08:07:36 +00:00
}
if len(mset.msgs) == 0 {
return
2020-08-10 08:07:36 +00:00
}
// we need to copy this while holding the lock to avoid races with concurrent modification
pend := make(map[uint64]*types.SignedMessage, len(mset.msgs))
for nonce, m := range mset.msgs {
pend[nonce] = m
}
pending[actor] = pend
})
2020-08-10 08:07:36 +00:00
mp.lk.Unlock()
2020-08-12 06:18:04 +00:00
mp.curTsLk.Unlock()
2020-08-10 08:07:36 +00:00
if len(pending) == 0 {
return nil
}
var chains []*msgChain
for actor, mset := range pending {
// We use the baseFee lower bound for createChange so that we optimistically include
// chains that might become profitable in the next 20 blocks.
// We still check the lowerBound condition for individual messages so that we don't send
// messages that will be rejected by the mpool spam protector, so this is safe to do.
next := mp.createMessageChains(actor, mset, baseFeeLowerBound, ts)
2020-08-10 08:07:36 +00:00
chains = append(chains, next...)
}
if len(chains) == 0 {
return nil
}
sort.Slice(chains, func(i, j int) bool {
return chains[i].Before(chains[j])
})
gasLimit := int64(build.BlockGasLimit)
minGas := int64(gasguess.MinGas)
2020-08-10 08:07:36 +00:00
var msgs []*types.SignedMessage
loop:
for i := 0; i < len(chains); {
chain := chains[i]
2020-08-10 08:07:36 +00:00
// we can exceed this if we have picked (some) longer chain already
if len(msgs) > repubMsgLimit {
break
}
// there is not enough gas for any message
if gasLimit <= minGas {
break
}
// has the chain been invalidated?
if !chain.valid {
i++
continue
2020-08-10 08:07:36 +00:00
}
// does it fit in a block?
if chain.gasLimit <= gasLimit {
// check the baseFee lower bound -- only republish messages that can be included in the chain
// within the next 20 blocks.
for _, m := range chain.msgs {
if m.Message.GasFeeCap.LessThan(baseFeeLowerBound) {
chain.Invalidate()
continue loop
}
gasLimit -= m.Message.GasLimit
msgs = append(msgs, m)
}
// we processed the whole chain, advance
i++
continue
}
// we can't fit the current chain but there is gas to spare
// trim it and push it down
chain.Trim(gasLimit, repubMsgLimit, mp, baseFee)
for j := i; j < len(chains)-1; j++ {
if chains[j].Before(chains[j+1]) {
break
}
chains[j], chains[j+1] = chains[j+1], chains[j]
}
2020-08-10 08:07:36 +00:00
}
count := 0
if len(msgs) > repubMsgLimit {
msgs = msgs[:repubMsgLimit]
}
2020-08-10 08:07:36 +00:00
log.Infof("republishing %d messages", len(msgs))
for _, m := range msgs {
mb, err := m.Serialize()
if err != nil {
return xerrors.Errorf("cannot serialize message: %w", err)
}
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), mb)
if err != nil {
return xerrors.Errorf("cannot publish: %w", err)
}
count++
if count < len(msgs) {
// this delay is here to encourage the pubsub subsystem to process the messages serially
// and avoid creating nonce gaps because of concurrent validation.
time.Sleep(RepublishBatchDelay)
}
}
if len(msgs) > 0 {
mp.journal.RecordEvent(mp.evtTypes[evtTypeMpoolRepub], func() interface{} {
msgsEv := make([]MessagePoolEvtMessage, 0, len(msgs))
for _, m := range msgs {
msgsEv = append(msgsEv, MessagePoolEvtMessage{Message: m.Message, CID: m.Cid()})
}
return MessagePoolEvt{
Action: "repub",
Messages: msgsEv,
}
})
}
// track most recently republished messages
republished := make(map[cid.Cid]struct{})
for _, m := range msgs[:count] {
republished[m.Cid()] = struct{}{}
2020-08-10 08:07:36 +00:00
}
mp.lk.Lock()
// update the republished set so that we can trigger early republish from head changes
mp.republished = republished
mp.lk.Unlock()
2020-08-10 08:07:36 +00:00
return nil
}