172 lines
4.1 KiB
Go
172 lines
4.1 KiB
Go
package messagepool
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"time"
|
|
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-address"
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/journal"
|
|
"github.com/ipfs/go-cid"
|
|
)
|
|
|
|
// repubMsgLimit is the message count above which republishPendingMessages stops
// selecting further chains for a republish round. It is a soft cap: the check runs
// before a chain is added and chains are appended whole, so the final selection
// may exceed it.
const repubMsgLimit = 30
|
|
|
|
// RepublishBatchDelay is how long republishPendingMessages sleeps between two
// consecutive pubsub publishes within one republish round.
var RepublishBatchDelay = 200 * time.Millisecond
|
|
|
|
func (mp *MessagePool) republishPendingMessages() error {
|
|
mp.curTsLk.Lock()
|
|
ts := mp.curTs
|
|
|
|
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
|
if err != nil {
|
|
mp.curTsLk.Unlock()
|
|
return xerrors.Errorf("computing basefee: %w", err)
|
|
}
|
|
|
|
pending := make(map[address.Address]map[uint64]*types.SignedMessage)
|
|
mp.lk.Lock()
|
|
mp.republished = nil // clear this to avoid races triggering an early republish
|
|
for actor := range mp.localAddrs {
|
|
mset, ok := mp.pending[actor]
|
|
if !ok {
|
|
continue
|
|
}
|
|
if len(mset.msgs) == 0 {
|
|
continue
|
|
}
|
|
// we need to copy this while holding the lock to avoid races with concurrent modification
|
|
pend := make(map[uint64]*types.SignedMessage, len(mset.msgs))
|
|
for nonce, m := range mset.msgs {
|
|
pend[nonce] = m
|
|
}
|
|
pending[actor] = pend
|
|
}
|
|
mp.lk.Unlock()
|
|
mp.curTsLk.Unlock()
|
|
|
|
if len(pending) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var chains []*msgChain
|
|
for actor, mset := range pending {
|
|
next := mp.createMessageChains(actor, mset, baseFee, ts)
|
|
chains = append(chains, next...)
|
|
}
|
|
|
|
if len(chains) == 0 {
|
|
return nil
|
|
}
|
|
|
|
sort.Slice(chains, func(i, j int) bool {
|
|
return chains[i].Before(chains[j])
|
|
})
|
|
|
|
// we don't republish negative performing chains; this is an error that will be screamed
|
|
// at the user
|
|
if chains[0].gasPerf < 0 {
|
|
return xerrors.Errorf("skipping republish: all message chains have negative gas performance; best gas performance: %f", chains[0].gasPerf)
|
|
}
|
|
|
|
gasLimit := int64(build.BlockGasLimit)
|
|
minGas := int64(gasguess.MinGas)
|
|
var msgs []*types.SignedMessage
|
|
for i := 0; i < len(chains); {
|
|
chain := chains[i]
|
|
|
|
// we can exceed this if we have picked (some) longer chain already
|
|
if len(msgs) > repubMsgLimit {
|
|
break
|
|
}
|
|
|
|
// there is not enough gas for any message
|
|
if gasLimit <= minGas {
|
|
break
|
|
}
|
|
|
|
// we don't republish negative performing chains, as they won't be included in
|
|
// a block anyway
|
|
if chain.gasPerf < 0 {
|
|
break
|
|
}
|
|
|
|
// has the chain been invalidated?
|
|
if !chain.valid {
|
|
i++
|
|
continue
|
|
}
|
|
|
|
// does it fit in a block?
|
|
if chain.gasLimit <= gasLimit {
|
|
gasLimit -= chain.gasLimit
|
|
msgs = append(msgs, chain.msgs...)
|
|
i++
|
|
continue
|
|
}
|
|
|
|
// we can't fit the current chain but there is gas to spare
|
|
// trim it and push it down
|
|
chain.Trim(gasLimit, mp, baseFee, ts)
|
|
for j := i; j < len(chains)-1; j++ {
|
|
if chains[j].Before(chains[j+1]) {
|
|
break
|
|
}
|
|
chains[j], chains[j+1] = chains[j+1], chains[j]
|
|
}
|
|
}
|
|
|
|
count := 0
|
|
log.Infof("republishing %d messages", len(msgs))
|
|
for _, m := range msgs {
|
|
mb, err := m.Serialize()
|
|
if err != nil {
|
|
return xerrors.Errorf("cannot serialize message: %w", err)
|
|
}
|
|
|
|
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), mb)
|
|
if err != nil {
|
|
return xerrors.Errorf("cannot publish: %w", err)
|
|
}
|
|
|
|
count++
|
|
|
|
if count < len(msgs) {
|
|
// this delay is here to encourage the pubsub subsystem to process the messages serially
|
|
// and avoid creating nonce gaps because of concurrent validation.
|
|
time.Sleep(RepublishBatchDelay)
|
|
}
|
|
}
|
|
|
|
if len(msgs) > 0 {
|
|
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRepub], func() interface{} {
|
|
msgs := make([]MessagePoolEvtMessage, 0, len(msgs))
|
|
for _, m := range msgs {
|
|
msgs = append(msgs, MessagePoolEvtMessage{Message: m.Message, CID: m.Cid()})
|
|
}
|
|
return MessagePoolEvt{
|
|
Action: "repub",
|
|
Messages: msgs,
|
|
}
|
|
})
|
|
}
|
|
|
|
// track most recently republished messages
|
|
republished := make(map[cid.Cid]struct{})
|
|
for _, m := range msgs[:count] {
|
|
republished[m.Cid()] = struct{}{}
|
|
}
|
|
|
|
mp.lk.Lock()
|
|
// update the republished set so that we can trigger early republish from head changes
|
|
mp.republished = republished
|
|
mp.lk.Unlock()
|
|
|
|
return nil
|
|
}
|