diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index fe4011629..2a889c68d 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -19,7 +19,6 @@ import ( "github.com/ipfs/go-datastore/query" logging "github.com/ipfs/go-log/v2" lps "github.com/whyrusleeping/pubsub" - "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -38,8 +37,6 @@ var log = logging.Logger("messagepool") const futureDebug = false -const repubMsgLimit = 5 - const RbfDenom = 256 var ( @@ -217,63 +214,8 @@ func (mp *MessagePool) runLoop() { for { select { case <-mp.repubTk.C: - mp.lk.Lock() - - msgsForAddr := make(map[address.Address][]*types.SignedMessage) - for a := range mp.localAddrs { - msgsForAddr[a] = mp.pendingFor(a) - } - - mp.lk.Unlock() - - var errout error - outputMsgs := []*types.SignedMessage{} - - for a, msgs := range msgsForAddr { - a, err := mp.api.StateGetActor(a, nil) - if err != nil { - errout = multierr.Append(errout, xerrors.Errorf("could not get actor state: %w", err)) - continue - } - - curNonce := a.Nonce - for _, m := range msgs { - if m.Message.Nonce < curNonce { - continue - } - if m.Message.Nonce != curNonce { - break - } - outputMsgs = append(outputMsgs, m) - curNonce++ - } - - } - - if len(outputMsgs) != 0 { - log.Infow("republishing local messages", "n", len(outputMsgs)) - } - - if len(outputMsgs) > repubMsgLimit { - outputMsgs = outputMsgs[:repubMsgLimit] - } - - for _, msg := range outputMsgs { - msgb, err := msg.Serialize() - if err != nil { - errout = multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err)) - continue - } - - err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) - if err != nil { - errout = multierr.Append(errout, xerrors.Errorf("could not publish: %w", err)) - continue - } - } - - if errout != nil { - log.Errorf("errors while republishing: %+v", errout) + if err := mp.republishPendingMessages(); err != nil { + log.Errorf("error while republishing messages: %s", err) } case <-mp.pruneTrigger: if err := mp.pruneExcessMessages(); err != nil { @@ -284,7 +226,6 @@ func (mp *MessagePool) runLoop() { return } } - } func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error { diff --git a/chain/messagepool/repub.go b/chain/messagepool/repub.go new file mode 100644 index 000000000..935ff17ec --- /dev/null +++ b/chain/messagepool/repub.go @@ -0,0 +1,106 @@ +package messagepool + +import ( + "context" + "sort" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/types" +) + +const repubMsgLimit = 5 + +func (mp *MessagePool) republishPendingMessages() error { + mp.curTsLk.Lock() + ts := mp.curTs + mp.curTsLk.Unlock() + + baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) + if err != nil { + return xerrors.Errorf("computing basefee: %w", err) + } + + pending := make(map[address.Address]map[uint64]*types.SignedMessage) + mp.lk.Lock() + for actor := range mp.localAddrs { + mset, ok := mp.pending[actor] + if !ok { + continue + } + if len(mset.msgs) == 0 { + continue + } + // we need to copy this while holding the lock to avoid races with concurrent modification + pend := make(map[uint64]*types.SignedMessage, len(mset.msgs)) + for nonce, m := range mset.msgs { + pend[nonce] = m + } + pending[actor] = pend + } + mp.lk.Unlock() + + if len(pending) == 0 { + return nil + } + + var chains []*msgChain + for actor, mset := range pending { + next := mp.createMessageChains(actor, mset, baseFee, ts) + chains = append(chains, next...) + } + + if len(chains) == 0 { + return nil + } + + sort.Slice(chains, func(i, j int) bool { + return chains[i].Before(chains[j]) + }) + + // we don't republish negative performing chains; this is an error that will be screamed + // at the user + if chains[0].gasPerf < 0 { + return xerrors.Errorf("skipping republish: all message chains have negative gas performance; best gas performance: %f", chains[0].gasPerf) + } + + gasLimit := int64(build.BlockGasLimit) + var msgs []*types.SignedMessage + for _, chain := range chains { + // we can exceed this if we have picked (some) longer chain already + if len(msgs) > repubMsgLimit { + break + } + + // we don't republish negative performing chains, as they won't be included in + // a block anyway + if chain.gasPerf < 0 { + break + } + + // we don't exceed the block gasLimit in our republish endeavor + if chain.gasLimit > gasLimit { + break + } + + gasLimit -= chain.gasLimit + msgs = append(msgs, chain.msgs...) + } + + log.Infof("republishing %d messages", len(msgs)) + for _, m := range msgs { + mb, err := m.Serialize() + if err != nil { + return xerrors.Errorf("cannot serialize message: %w", err) + } + + err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), mb) + if err != nil { + return xerrors.Errorf("cannot publish: %w", err) + } + } + + return nil +}