diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 0d47e488e..c18f5f5ae 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -20,14 +20,12 @@ import ( logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" lps "github.com/whyrusleeping/pubsub" - "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/lib/sigs" @@ -40,10 +38,10 @@ var log = logging.Logger("messagepool") const futureDebug = false -const repubMsgLimit = 5 - const RbfDenom = 256 +var RepublishInterval = pubsub.TimeCacheDuration + time.Duration(5*build.BlockDelaySecs+build.PropagationDelaySecs)*time.Second + var ( ErrMessageTooBig = errors.New("message too big") @@ -147,69 +145,6 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool) (bool, error) { return !has, nil } -type Provider interface { - SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet - PutMessage(m types.ChainMsg) (cid.Cid, error) - PubSubPublish(string, []byte) error - StateGetActor(address.Address, *types.TipSet) (*types.Actor, error) - StateAccountKey(context.Context, address.Address, *types.TipSet) (address.Address, error) - MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) - MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error) - LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) - ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) -} - -type mpoolProvider struct { - sm *stmgr.StateManager - ps *pubsub.PubSub -} - -func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider { - return &mpoolProvider{sm: sm, ps: ps} -} - -func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { - mpp.sm.ChainStore().SubscribeHeadChanges(cb) - return mpp.sm.ChainStore().GetHeaviestTipSet() -} - -func (mpp *mpoolProvider) PutMessage(m types.ChainMsg) (cid.Cid, error) { - return mpp.sm.ChainStore().PutMessage(m) -} - -func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error { - return mpp.ps.Publish(k, v) -} - -func (mpp *mpoolProvider) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) { - var act types.Actor - return &act, mpp.sm.WithParentState(ts, mpp.sm.WithActor(addr, stmgr.GetActor(&act))) -} - -func (mpp *mpoolProvider) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { - return mpp.sm.ResolveToKeyAddress(ctx, addr, ts) -} - -func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) { - return mpp.sm.ChainStore().MessagesForBlock(h) -} - -func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]types.ChainMsg, error) { - return mpp.sm.ChainStore().MessagesForTipset(ts) -} - -func (mpp *mpoolProvider) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) { - return mpp.sm.ChainStore().LoadTipSet(tsk) -} - -func (mpp *mpoolProvider) ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) { - baseFee, err := mpp.sm.ChainStore().ComputeBaseFee(ctx, ts) - if err != nil { - return types.NewInt(0), xerrors.Errorf("computing base fee at %s: %w", ts, err) - } - return baseFee, nil -} - func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) { cache, _ := lru.New2Q(build.BlsSignatureCacheSize) verifcache, _ := lru.New2Q(build.VerifSigCacheSize) @@ -224,7 +159,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa mp := &MessagePool{ ds: ds, closer: make(chan struct{}), - repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * time.Second), + repubTk: build.Clock.Ticker(RepublishInterval), localAddrs: make(map[address.Address]struct{}), pending: make(map[address.Address]*msgSet), minGasPrice: types.NewInt(0), @@ -282,63 +217,8 @@ func (mp *MessagePool) runLoop() { for { select { case <-mp.repubTk.C: - mp.lk.Lock() - - msgsForAddr := make(map[address.Address][]*types.SignedMessage) - for a := range mp.localAddrs { - msgsForAddr[a] = mp.pendingFor(a) - } - - mp.lk.Unlock() - - var errout error - outputMsgs := []*types.SignedMessage{} - - for a, msgs := range msgsForAddr { - a, err := mp.api.StateGetActor(a, nil) - if err != nil { - errout = multierr.Append(errout, xerrors.Errorf("could not get actor state: %w", err)) - continue - } - - curNonce := a.Nonce - for _, m := range msgs { - if m.Message.Nonce < curNonce { - continue - } - if m.Message.Nonce != curNonce { - break - } - outputMsgs = append(outputMsgs, m) - curNonce++ - } - - } - - if len(outputMsgs) != 0 { - log.Infow("republishing local messages", "n", len(outputMsgs)) - } - - if len(outputMsgs) > repubMsgLimit { - outputMsgs = outputMsgs[:repubMsgLimit] - } - - for _, msg := range outputMsgs { - msgb, err := msg.Serialize() - if err != nil { - errout = multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err)) - continue - } - - err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) - if err != nil { - errout = multierr.Append(errout, xerrors.Errorf("could not publish: %w", err)) - continue - } - } - - if errout != nil { - log.Errorf("errors while republishing: %+v", errout) + if err := mp.republishPendingMessages(); err != nil { + log.Errorf("error while republishing messages: %s", err) } case <-mp.pruneTrigger: if err := mp.pruneExcessMessages(); err != nil { @@ -349,7 +229,6 @@ func (mp *MessagePool) runLoop() { return } } - } func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error { diff --git a/chain/messagepool/provider.go b/chain/messagepool/provider.go new file mode 100644 index 000000000..a6aa79ef6 --- /dev/null +++ b/chain/messagepool/provider.go @@ -0,0 +1,76 @@ +package messagepool + +import ( + "context" + + "github.com/ipfs/go-cid" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/types" +) + +type Provider interface { + SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet + PutMessage(m types.ChainMsg) (cid.Cid, error) + PubSubPublish(string, []byte) error + StateGetActor(address.Address, *types.TipSet) (*types.Actor, error) + StateAccountKey(context.Context, address.Address, *types.TipSet) (address.Address, error) + MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) + MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error) + LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) + ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) +} + +type mpoolProvider struct { + sm *stmgr.StateManager + ps *pubsub.PubSub +} + +func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider { + return &mpoolProvider{sm: sm, ps: ps} +} + +func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { + mpp.sm.ChainStore().SubscribeHeadChanges(cb) + return mpp.sm.ChainStore().GetHeaviestTipSet() +} + +func (mpp *mpoolProvider) PutMessage(m types.ChainMsg) (cid.Cid, error) { + return mpp.sm.ChainStore().PutMessage(m) +} + +func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error { + return mpp.ps.Publish(k, v) +} + +func (mpp *mpoolProvider) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) { + var act types.Actor + return &act, mpp.sm.WithParentState(ts, mpp.sm.WithActor(addr, stmgr.GetActor(&act))) +} + +func (mpp *mpoolProvider) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { + return mpp.sm.ResolveToKeyAddress(ctx, addr, ts) +} + +func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) { + return mpp.sm.ChainStore().MessagesForBlock(h) +} + +func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]types.ChainMsg, error) { + return mpp.sm.ChainStore().MessagesForTipset(ts) +} + +func (mpp *mpoolProvider) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) { + return mpp.sm.ChainStore().LoadTipSet(tsk) +} + +func (mpp *mpoolProvider) ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) { + baseFee, err := mpp.sm.ChainStore().ComputeBaseFee(ctx, ts) + if err != nil { + return types.NewInt(0), xerrors.Errorf("computing base fee at %s: %w", ts, err) + } + return baseFee, nil +} diff --git a/chain/messagepool/repub.go b/chain/messagepool/repub.go new file mode 100644 index 000000000..a55bc4f3f --- /dev/null +++ b/chain/messagepool/repub.go @@ -0,0 +1,132 @@ +package messagepool + +import ( + "context" + "sort" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/messagepool/gasguess" + "github.com/filecoin-project/lotus/chain/types" +) + +const repubMsgLimit = 30 + +func (mp *MessagePool) republishPendingMessages() error { + mp.curTsLk.Lock() + ts := mp.curTs + + baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) + if err != nil { + mp.curTsLk.Unlock() + return xerrors.Errorf("computing basefee: %w", err) + } + + pending := make(map[address.Address]map[uint64]*types.SignedMessage) + mp.lk.Lock() + for actor := range mp.localAddrs { + mset, ok := mp.pending[actor] + if !ok { + continue + } + if len(mset.msgs) == 0 { + continue + } + // we need to copy this while holding the lock to avoid races with concurrent modification + pend := make(map[uint64]*types.SignedMessage, len(mset.msgs)) + for nonce, m := range mset.msgs { + pend[nonce] = m + } + pending[actor] = pend + } + mp.lk.Unlock() + mp.curTsLk.Unlock() + + if len(pending) == 0 { + return nil + } + + var chains []*msgChain + for actor, mset := range pending { + next := mp.createMessageChains(actor, mset, baseFee, ts) + chains = append(chains, next...) + } + + if len(chains) == 0 { + return nil + } + + sort.Slice(chains, func(i, j int) bool { + return chains[i].Before(chains[j]) + }) + + // we don't republish negative performing chains; this is an error that will be screamed + // at the user + if chains[0].gasPerf < 0 { + return xerrors.Errorf("skipping republish: all message chains have negative gas performance; best gas performance: %f", chains[0].gasPerf) + } + + gasLimit := int64(build.BlockGasLimit) + minGas := int64(gasguess.MinGas) + var msgs []*types.SignedMessage + for i := 0; i < len(chains); { + chain := chains[i] + + // we can exceed this if we have picked (some) longer chain already + if len(msgs) > repubMsgLimit { + break + } + + // there is not enough gas for any message + if gasLimit <= minGas { + break + } + + // we don't republish negative performing chains, as they won't be included in + // a block anyway + if chain.gasPerf < 0 { + break + } + + // has the chain been invalidated? + if !chain.valid { + i++ + continue + } + + // does it fit in a block? + if chain.gasLimit <= gasLimit { + gasLimit -= chain.gasLimit + msgs = append(msgs, chain.msgs...) + i++ + continue + } + + // we can't fit the current chain but there is gas to spare + // trim it and push it down + chain.Trim(gasLimit, mp, baseFee, ts, false) + for j := i; j < len(chains)-1; j++ { + if chains[j].Before(chains[j+1]) { + break + } + chains[j], chains[j+1] = chains[j+1], chains[j] + } + } + + log.Infof("republishing %d messages", len(msgs)) + for _, m := range msgs { + mb, err := m.Serialize() + if err != nil { + return xerrors.Errorf("cannot serialize message: %w", err) + } + + err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), mb) + if err != nil { + return xerrors.Errorf("cannot publish: %w", err) + } + } + + return nil +}