new message republishing logic

This commit is contained in:
vyzo 2020-08-10 11:07:36 +03:00
parent 42951d05a5
commit e876617c82
2 changed files with 108 additions and 61 deletions

View File

@ -19,7 +19,6 @@ import (
"github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log/v2"
lps "github.com/whyrusleeping/pubsub"
"go.uber.org/multierr"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
@ -38,8 +37,6 @@ var log = logging.Logger("messagepool")
const futureDebug = false
const repubMsgLimit = 5
const RbfDenom = 256
var (
@ -217,63 +214,8 @@ func (mp *MessagePool) runLoop() {
for {
select {
case <-mp.repubTk.C:
mp.lk.Lock()
msgsForAddr := make(map[address.Address][]*types.SignedMessage)
for a := range mp.localAddrs {
msgsForAddr[a] = mp.pendingFor(a)
}
mp.lk.Unlock()
var errout error
outputMsgs := []*types.SignedMessage{}
for a, msgs := range msgsForAddr {
a, err := mp.api.StateGetActor(a, nil)
if err != nil {
errout = multierr.Append(errout, xerrors.Errorf("could not get actor state: %w", err))
continue
}
curNonce := a.Nonce
for _, m := range msgs {
if m.Message.Nonce < curNonce {
continue
}
if m.Message.Nonce != curNonce {
break
}
outputMsgs = append(outputMsgs, m)
curNonce++
}
}
if len(outputMsgs) != 0 {
log.Infow("republishing local messages", "n", len(outputMsgs))
}
if len(outputMsgs) > repubMsgLimit {
outputMsgs = outputMsgs[:repubMsgLimit]
}
for _, msg := range outputMsgs {
msgb, err := msg.Serialize()
if err != nil {
errout = multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err))
continue
}
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
if err != nil {
errout = multierr.Append(errout, xerrors.Errorf("could not publish: %w", err))
continue
}
}
if errout != nil {
log.Errorf("errors while republishing: %+v", errout)
if err := mp.republishPendingMessages(); err != nil {
log.Errorf("error while republishing messages: %s", err)
}
case <-mp.pruneTrigger:
if err := mp.pruneExcessMessages(); err != nil {
@ -284,7 +226,6 @@ func (mp *MessagePool) runLoop() {
return
}
}
}
func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error {

106
chain/messagepool/repub.go Normal file
View File

@ -0,0 +1,106 @@
package messagepool
import (
"context"
"sort"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
)
const repubMsgLimit = 5
func (mp *MessagePool) republishPendingMessages() error {
mp.curTsLk.Lock()
ts := mp.curTs
mp.curTsLk.Unlock()
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
if err != nil {
return xerrors.Errorf("computing basefee: %w", err)
}
pending := make(map[address.Address]map[uint64]*types.SignedMessage)
mp.lk.Lock()
for actor := range mp.localAddrs {
mset, ok := mp.pending[actor]
if !ok {
continue
}
if len(mset.msgs) == 0 {
continue
}
// we need to copy this while holding the lock to avoid races with concurrent modification
pend := make(map[uint64]*types.SignedMessage, len(mset.msgs))
for nonce, m := range mset.msgs {
pend[nonce] = m
}
pending[actor] = pend
}
mp.lk.Unlock()
if len(pending) == 0 {
return nil
}
var chains []*msgChain
for actor, mset := range pending {
next := mp.createMessageChains(actor, mset, baseFee, ts)
chains = append(chains, next...)
}
if len(chains) == 0 {
return nil
}
sort.Slice(chains, func(i, j int) bool {
return chains[i].Before(chains[j])
})
// we don't republish negative performing chains; this is an error that will be screamed
// at the user
if chains[0].gasPerf < 0 {
return xerrors.Errorf("skipping republish: all message chains have negative gas performance; best gas performance: %f", chains[0].gasPerf)
}
gasLimit := int64(build.BlockGasLimit)
var msgs []*types.SignedMessage
for _, chain := range chains {
// we can exceed this if we have picked (some) longer chain already
if len(msgs) > repubMsgLimit {
break
}
// we don't republish negative performing chains, as they won't be included in
// a block anyway
if chain.gasPerf < 0 {
break
}
// we don't exceed the block gasLimit in our republish endeavor
if chain.gasLimit > gasLimit {
break
}
gasLimit -= chain.gasLimit
msgs = append(msgs, chain.msgs...)
}
log.Infof("republishing %d messages", len(msgs))
for _, m := range msgs {
mb, err := m.Serialize()
if err != nil {
return xerrors.Errorf("cannot serialize message: %w", err)
}
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), mb)
if err != nil {
return xerrors.Errorf("cannot publish: %w", err)
}
}
return nil
}