Merge pull request #2939 from filecoin-project/feat/mpool-repub
Update messagepool republish logic
This commit is contained in:
commit
c68b023d3f
@ -20,14 +20,12 @@ import (
|
|||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
lps "github.com/whyrusleeping/pubsub"
|
lps "github.com/whyrusleeping/pubsub"
|
||||||
"go.uber.org/multierr"
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/chain/vm"
|
"github.com/filecoin-project/lotus/chain/vm"
|
||||||
"github.com/filecoin-project/lotus/lib/sigs"
|
"github.com/filecoin-project/lotus/lib/sigs"
|
||||||
@ -40,10 +38,10 @@ var log = logging.Logger("messagepool")
|
|||||||
|
|
||||||
const futureDebug = false
|
const futureDebug = false
|
||||||
|
|
||||||
const repubMsgLimit = 5
|
|
||||||
|
|
||||||
const RbfDenom = 256
|
const RbfDenom = 256
|
||||||
|
|
||||||
|
var RepublishInterval = pubsub.TimeCacheDuration + time.Duration(5*build.BlockDelaySecs+build.PropagationDelaySecs)*time.Second
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrMessageTooBig = errors.New("message too big")
|
ErrMessageTooBig = errors.New("message too big")
|
||||||
|
|
||||||
@ -147,69 +145,6 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool) (bool, error) {
|
|||||||
return !has, nil
|
return !has, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type Provider interface {
|
|
||||||
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
|
|
||||||
PutMessage(m types.ChainMsg) (cid.Cid, error)
|
|
||||||
PubSubPublish(string, []byte) error
|
|
||||||
StateGetActor(address.Address, *types.TipSet) (*types.Actor, error)
|
|
||||||
StateAccountKey(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
|
||||||
MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
|
||||||
MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error)
|
|
||||||
LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error)
|
|
||||||
ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type mpoolProvider struct {
|
|
||||||
sm *stmgr.StateManager
|
|
||||||
ps *pubsub.PubSub
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
|
|
||||||
return &mpoolProvider{sm: sm, ps: ps}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
|
|
||||||
mpp.sm.ChainStore().SubscribeHeadChanges(cb)
|
|
||||||
return mpp.sm.ChainStore().GetHeaviestTipSet()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) PutMessage(m types.ChainMsg) (cid.Cid, error) {
|
|
||||||
return mpp.sm.ChainStore().PutMessage(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error {
|
|
||||||
return mpp.ps.Publish(k, v)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
|
||||||
var act types.Actor
|
|
||||||
return &act, mpp.sm.WithParentState(ts, mpp.sm.WithActor(addr, stmgr.GetActor(&act)))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
|
|
||||||
return mpp.sm.ResolveToKeyAddress(ctx, addr, ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
|
||||||
return mpp.sm.ChainStore().MessagesForBlock(h)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]types.ChainMsg, error) {
|
|
||||||
return mpp.sm.ChainStore().MessagesForTipset(ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) {
|
|
||||||
return mpp.sm.ChainStore().LoadTipSet(tsk)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) {
|
|
||||||
baseFee, err := mpp.sm.ChainStore().ComputeBaseFee(ctx, ts)
|
|
||||||
if err != nil {
|
|
||||||
return types.NewInt(0), xerrors.Errorf("computing base fee at %s: %w", ts, err)
|
|
||||||
}
|
|
||||||
return baseFee, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
|
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
|
||||||
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
||||||
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
|
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
|
||||||
@ -224,7 +159,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
|
|||||||
mp := &MessagePool{
|
mp := &MessagePool{
|
||||||
ds: ds,
|
ds: ds,
|
||||||
closer: make(chan struct{}),
|
closer: make(chan struct{}),
|
||||||
repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * time.Second),
|
repubTk: build.Clock.Ticker(RepublishInterval),
|
||||||
localAddrs: make(map[address.Address]struct{}),
|
localAddrs: make(map[address.Address]struct{}),
|
||||||
pending: make(map[address.Address]*msgSet),
|
pending: make(map[address.Address]*msgSet),
|
||||||
minGasPrice: types.NewInt(0),
|
minGasPrice: types.NewInt(0),
|
||||||
@ -282,63 +217,8 @@ func (mp *MessagePool) runLoop() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-mp.repubTk.C:
|
case <-mp.repubTk.C:
|
||||||
mp.lk.Lock()
|
if err := mp.republishPendingMessages(); err != nil {
|
||||||
|
log.Errorf("error while republishing messages: %s", err)
|
||||||
msgsForAddr := make(map[address.Address][]*types.SignedMessage)
|
|
||||||
for a := range mp.localAddrs {
|
|
||||||
msgsForAddr[a] = mp.pendingFor(a)
|
|
||||||
}
|
|
||||||
|
|
||||||
mp.lk.Unlock()
|
|
||||||
|
|
||||||
var errout error
|
|
||||||
outputMsgs := []*types.SignedMessage{}
|
|
||||||
|
|
||||||
for a, msgs := range msgsForAddr {
|
|
||||||
a, err := mp.api.StateGetActor(a, nil)
|
|
||||||
if err != nil {
|
|
||||||
errout = multierr.Append(errout, xerrors.Errorf("could not get actor state: %w", err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
curNonce := a.Nonce
|
|
||||||
for _, m := range msgs {
|
|
||||||
if m.Message.Nonce < curNonce {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if m.Message.Nonce != curNonce {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
outputMsgs = append(outputMsgs, m)
|
|
||||||
curNonce++
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(outputMsgs) != 0 {
|
|
||||||
log.Infow("republishing local messages", "n", len(outputMsgs))
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(outputMsgs) > repubMsgLimit {
|
|
||||||
outputMsgs = outputMsgs[:repubMsgLimit]
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, msg := range outputMsgs {
|
|
||||||
msgb, err := msg.Serialize()
|
|
||||||
if err != nil {
|
|
||||||
errout = multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
|
|
||||||
if err != nil {
|
|
||||||
errout = multierr.Append(errout, xerrors.Errorf("could not publish: %w", err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if errout != nil {
|
|
||||||
log.Errorf("errors while republishing: %+v", errout)
|
|
||||||
}
|
}
|
||||||
case <-mp.pruneTrigger:
|
case <-mp.pruneTrigger:
|
||||||
if err := mp.pruneExcessMessages(); err != nil {
|
if err := mp.pruneExcessMessages(); err != nil {
|
||||||
@ -349,7 +229,6 @@ func (mp *MessagePool) runLoop() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error {
|
func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error {
|
||||||
|
76
chain/messagepool/provider.go
Normal file
76
chain/messagepool/provider.go
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
package messagepool
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Provider interface {
|
||||||
|
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
|
||||||
|
PutMessage(m types.ChainMsg) (cid.Cid, error)
|
||||||
|
PubSubPublish(string, []byte) error
|
||||||
|
StateGetActor(address.Address, *types.TipSet) (*types.Actor, error)
|
||||||
|
StateAccountKey(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
||||||
|
MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
||||||
|
MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error)
|
||||||
|
LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error)
|
||||||
|
ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type mpoolProvider struct {
|
||||||
|
sm *stmgr.StateManager
|
||||||
|
ps *pubsub.PubSub
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
|
||||||
|
return &mpoolProvider{sm: sm, ps: ps}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
|
||||||
|
mpp.sm.ChainStore().SubscribeHeadChanges(cb)
|
||||||
|
return mpp.sm.ChainStore().GetHeaviestTipSet()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) PutMessage(m types.ChainMsg) (cid.Cid, error) {
|
||||||
|
return mpp.sm.ChainStore().PutMessage(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error {
|
||||||
|
return mpp.ps.Publish(k, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
||||||
|
var act types.Actor
|
||||||
|
return &act, mpp.sm.WithParentState(ts, mpp.sm.WithActor(addr, stmgr.GetActor(&act)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
|
||||||
|
return mpp.sm.ResolveToKeyAddress(ctx, addr, ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
||||||
|
return mpp.sm.ChainStore().MessagesForBlock(h)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]types.ChainMsg, error) {
|
||||||
|
return mpp.sm.ChainStore().MessagesForTipset(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) {
|
||||||
|
return mpp.sm.ChainStore().LoadTipSet(tsk)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) {
|
||||||
|
baseFee, err := mpp.sm.ChainStore().ComputeBaseFee(ctx, ts)
|
||||||
|
if err != nil {
|
||||||
|
return types.NewInt(0), xerrors.Errorf("computing base fee at %s: %w", ts, err)
|
||||||
|
}
|
||||||
|
return baseFee, nil
|
||||||
|
}
|
132
chain/messagepool/repub.go
Normal file
132
chain/messagepool/repub.go
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
package messagepool
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
const repubMsgLimit = 30
|
||||||
|
|
||||||
|
func (mp *MessagePool) republishPendingMessages() error {
|
||||||
|
mp.curTsLk.Lock()
|
||||||
|
ts := mp.curTs
|
||||||
|
|
||||||
|
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
||||||
|
if err != nil {
|
||||||
|
mp.curTsLk.Unlock()
|
||||||
|
return xerrors.Errorf("computing basefee: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pending := make(map[address.Address]map[uint64]*types.SignedMessage)
|
||||||
|
mp.lk.Lock()
|
||||||
|
for actor := range mp.localAddrs {
|
||||||
|
mset, ok := mp.pending[actor]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(mset.msgs) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// we need to copy this while holding the lock to avoid races with concurrent modification
|
||||||
|
pend := make(map[uint64]*types.SignedMessage, len(mset.msgs))
|
||||||
|
for nonce, m := range mset.msgs {
|
||||||
|
pend[nonce] = m
|
||||||
|
}
|
||||||
|
pending[actor] = pend
|
||||||
|
}
|
||||||
|
mp.lk.Unlock()
|
||||||
|
mp.curTsLk.Unlock()
|
||||||
|
|
||||||
|
if len(pending) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var chains []*msgChain
|
||||||
|
for actor, mset := range pending {
|
||||||
|
next := mp.createMessageChains(actor, mset, baseFee, ts)
|
||||||
|
chains = append(chains, next...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(chains) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(chains, func(i, j int) bool {
|
||||||
|
return chains[i].Before(chains[j])
|
||||||
|
})
|
||||||
|
|
||||||
|
// we don't republish negative performing chains; this is an error that will be screamed
|
||||||
|
// at the user
|
||||||
|
if chains[0].gasPerf < 0 {
|
||||||
|
return xerrors.Errorf("skipping republish: all message chains have negative gas performance; best gas performance: %f", chains[0].gasPerf)
|
||||||
|
}
|
||||||
|
|
||||||
|
gasLimit := int64(build.BlockGasLimit)
|
||||||
|
minGas := int64(gasguess.MinGas)
|
||||||
|
var msgs []*types.SignedMessage
|
||||||
|
for i := 0; i < len(chains); {
|
||||||
|
chain := chains[i]
|
||||||
|
|
||||||
|
// we can exceed this if we have picked (some) longer chain already
|
||||||
|
if len(msgs) > repubMsgLimit {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// there is not enough gas for any message
|
||||||
|
if gasLimit <= minGas {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// we don't republish negative performing chains, as they won't be included in
|
||||||
|
// a block anyway
|
||||||
|
if chain.gasPerf < 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// has the chain been invalidated?
|
||||||
|
if !chain.valid {
|
||||||
|
i++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// does it fit in a block?
|
||||||
|
if chain.gasLimit <= gasLimit {
|
||||||
|
gasLimit -= chain.gasLimit
|
||||||
|
msgs = append(msgs, chain.msgs...)
|
||||||
|
i++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// we can't fit the current chain but there is gas to spare
|
||||||
|
// trim it and push it down
|
||||||
|
chain.Trim(gasLimit, mp, baseFee, ts, false)
|
||||||
|
for j := i; j < len(chains)-1; j++ {
|
||||||
|
if chains[j].Before(chains[j+1]) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
chains[j], chains[j+1] = chains[j+1], chains[j]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("republishing %d messages", len(msgs))
|
||||||
|
for _, m := range msgs {
|
||||||
|
mb, err := m.Serialize()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("cannot serialize message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), mb)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("cannot publish: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user