953d56e216
We have observed that EthGetTransactionCount is one of the hotspots on Glif production notes, and we are seeing regular 10-20 second latencies when calling this rpc method. I tracked the high latency spikes and they were correlated when we were running ExecuteTipSet while following the chain. To address this, we should not rely on tipset computation to get nounce and instead look at the parent tipset and then count the messages sent from the 'addr'.
150 lines
4.9 KiB
Go
150 lines
4.9 KiB
Go
package messagepool
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-address"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
"github.com/filecoin-project/go-state-types/network"
|
|
|
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
|
"github.com/filecoin-project/lotus/chain/store"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
)
|
|
|
|
var (
|
|
HeadChangeCoalesceMinDelay = 2 * time.Second
|
|
HeadChangeCoalesceMaxDelay = 6 * time.Second
|
|
HeadChangeCoalesceMergeInterval = time.Second
|
|
)
|
|
|
|
type Provider interface {
|
|
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
|
|
PutMessage(ctx context.Context, m types.ChainMsg) (cid.Cid, error)
|
|
PubSubPublish(string, []byte) error
|
|
GetActorBefore(address.Address, *types.TipSet) (*types.Actor, error)
|
|
GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error)
|
|
StateDeterministicAddressAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
|
StateNetworkVersion(context.Context, abi.ChainEpoch) network.Version
|
|
MessagesForBlock(context.Context, *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
|
MessagesForTipset(context.Context, *types.TipSet) ([]types.ChainMsg, error)
|
|
LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
|
|
ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error)
|
|
IsLite() bool
|
|
}
|
|
|
|
type mpoolProvider struct {
|
|
sm *stmgr.StateManager
|
|
ps *pubsub.PubSub
|
|
|
|
lite MpoolNonceAPI
|
|
}
|
|
|
|
var _ Provider = (*mpoolProvider)(nil)
|
|
|
|
func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
|
|
return &mpoolProvider{sm: sm, ps: ps}
|
|
}
|
|
|
|
func NewProviderLite(sm *stmgr.StateManager, ps *pubsub.PubSub, noncer MpoolNonceAPI) Provider {
|
|
return &mpoolProvider{sm: sm, ps: ps, lite: noncer}
|
|
}
|
|
|
|
func (mpp *mpoolProvider) IsLite() bool {
|
|
return mpp.lite != nil
|
|
}
|
|
|
|
func (mpp *mpoolProvider) getActorLite(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
|
if !mpp.IsLite() {
|
|
return nil, errors.New("should not use getActorLite on non lite Provider")
|
|
}
|
|
|
|
n, err := mpp.lite.GetNonce(context.TODO(), addr, ts.Key())
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("getting nonce over lite: %w", err)
|
|
}
|
|
a, err := mpp.lite.GetActor(context.TODO(), addr, ts.Key())
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("getting actor over lite: %w", err)
|
|
}
|
|
a.Nonce = n
|
|
return a, nil
|
|
}
|
|
|
|
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
|
|
mpp.sm.ChainStore().SubscribeHeadChanges(
|
|
store.WrapHeadChangeCoalescer(
|
|
cb,
|
|
HeadChangeCoalesceMinDelay,
|
|
HeadChangeCoalesceMaxDelay,
|
|
HeadChangeCoalesceMergeInterval,
|
|
))
|
|
return mpp.sm.ChainStore().GetHeaviestTipSet()
|
|
}
|
|
|
|
func (mpp *mpoolProvider) PutMessage(ctx context.Context, m types.ChainMsg) (cid.Cid, error) {
|
|
return mpp.sm.ChainStore().PutMessage(ctx, m)
|
|
}
|
|
|
|
func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error {
|
|
return mpp.ps.Publish(k, v) // nolint
|
|
}
|
|
|
|
func (mpp *mpoolProvider) GetActorBefore(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
|
if mpp.IsLite() {
|
|
return mpp.getActorLite(addr, ts)
|
|
}
|
|
|
|
return mpp.sm.LoadActor(context.TODO(), addr, ts)
|
|
}
|
|
|
|
func (mpp *mpoolProvider) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
|
if mpp.IsLite() {
|
|
return mpp.getActorLite(addr, ts)
|
|
}
|
|
|
|
stcid, _, err := mpp.sm.TipSetState(context.TODO(), ts)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("computing tipset state for GetActor: %w", err)
|
|
}
|
|
st, err := mpp.sm.StateTree(stcid)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("failed to load state tree: %w", err)
|
|
}
|
|
return st.GetActor(addr)
|
|
}
|
|
|
|
func (mpp *mpoolProvider) StateDeterministicAddressAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
|
|
return mpp.sm.ResolveToDeterministicAddressAtFinality(ctx, addr, ts)
|
|
}
|
|
|
|
func (mpp *mpoolProvider) StateNetworkVersion(ctx context.Context, height abi.ChainEpoch) network.Version {
|
|
return mpp.sm.GetNetworkVersion(ctx, height)
|
|
}
|
|
|
|
func (mpp *mpoolProvider) MessagesForBlock(ctx context.Context, h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
|
return mpp.sm.ChainStore().MessagesForBlock(ctx, h)
|
|
}
|
|
|
|
func (mpp *mpoolProvider) MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error) {
|
|
return mpp.sm.ChainStore().MessagesForTipset(ctx, ts)
|
|
}
|
|
|
|
func (mpp *mpoolProvider) LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
|
|
return mpp.sm.ChainStore().LoadTipSet(ctx, tsk)
|
|
}
|
|
|
|
func (mpp *mpoolProvider) ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) {
|
|
baseFee, err := mpp.sm.ChainStore().ComputeBaseFee(ctx, ts)
|
|
if err != nil {
|
|
return types.NewInt(0), xerrors.Errorf("computing base fee at %s: %w", ts, err)
|
|
}
|
|
return baseFee, nil
|
|
}
|