Merge pull request #10700 from filecoin-project/10538-opt-ethgettransactioncount
perf: Address performance of EthGetTransactionCount
This commit is contained in:
commit
9d5d6a5f3d
@ -169,13 +169,13 @@ type MessagePool struct {
|
|||||||
|
|
||||||
sigValCache *lru.TwoQueueCache[string, struct{}]
|
sigValCache *lru.TwoQueueCache[string, struct{}]
|
||||||
|
|
||||||
nonceCache *lru.Cache[nonceCacheKey, uint64]
|
stateNonceCache *lru.Cache[stateNonceCacheKey, uint64]
|
||||||
|
|
||||||
evtTypes [3]journal.EventType
|
evtTypes [3]journal.EventType
|
||||||
journal journal.Journal
|
journal journal.Journal
|
||||||
}
|
}
|
||||||
|
|
||||||
type nonceCacheKey struct {
|
type stateNonceCacheKey struct {
|
||||||
tsk types.TipSetKey
|
tsk types.TipSetKey
|
||||||
addr address.Address
|
addr address.Address
|
||||||
}
|
}
|
||||||
@ -371,7 +371,7 @@ func (ms *msgSet) toSlice() []*types.SignedMessage {
|
|||||||
func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.UpgradeSchedule, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) {
|
func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.UpgradeSchedule, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) {
|
||||||
cache, _ := lru.New2Q[cid.Cid, crypto.Signature](build.BlsSignatureCacheSize)
|
cache, _ := lru.New2Q[cid.Cid, crypto.Signature](build.BlsSignatureCacheSize)
|
||||||
verifcache, _ := lru.New2Q[string, struct{}](build.VerifSigCacheSize)
|
verifcache, _ := lru.New2Q[string, struct{}](build.VerifSigCacheSize)
|
||||||
noncecache, _ := lru.New[nonceCacheKey, uint64](256)
|
stateNonceCache, _ := lru.New[stateNonceCacheKey, uint64](32768) // 32k * ~200 bytes = 6MB
|
||||||
keycache, _ := lru.New[address.Address, address.Address](1_000_000)
|
keycache, _ := lru.New[address.Address, address.Address](1_000_000)
|
||||||
|
|
||||||
cfg, err := loadConfig(ctx, ds)
|
cfg, err := loadConfig(ctx, ds)
|
||||||
@ -384,26 +384,26 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.Upgra
|
|||||||
}
|
}
|
||||||
|
|
||||||
mp := &MessagePool{
|
mp := &MessagePool{
|
||||||
ds: ds,
|
ds: ds,
|
||||||
addSema: make(chan struct{}, 1),
|
addSema: make(chan struct{}, 1),
|
||||||
closer: make(chan struct{}),
|
closer: make(chan struct{}),
|
||||||
repubTk: build.Clock.Ticker(RepublishInterval),
|
repubTk: build.Clock.Ticker(RepublishInterval),
|
||||||
repubTrigger: make(chan struct{}, 1),
|
repubTrigger: make(chan struct{}, 1),
|
||||||
localAddrs: make(map[address.Address]struct{}),
|
localAddrs: make(map[address.Address]struct{}),
|
||||||
pending: make(map[address.Address]*msgSet),
|
pending: make(map[address.Address]*msgSet),
|
||||||
keyCache: keycache,
|
keyCache: keycache,
|
||||||
minGasPrice: types.NewInt(0),
|
minGasPrice: types.NewInt(0),
|
||||||
getNtwkVersion: us.GetNtwkVersion,
|
getNtwkVersion: us.GetNtwkVersion,
|
||||||
pruneTrigger: make(chan struct{}, 1),
|
pruneTrigger: make(chan struct{}, 1),
|
||||||
pruneCooldown: make(chan struct{}, 1),
|
pruneCooldown: make(chan struct{}, 1),
|
||||||
blsSigCache: cache,
|
blsSigCache: cache,
|
||||||
sigValCache: verifcache,
|
sigValCache: verifcache,
|
||||||
nonceCache: noncecache,
|
stateNonceCache: stateNonceCache,
|
||||||
changes: lps.New(50),
|
changes: lps.New(50),
|
||||||
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
|
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
|
||||||
api: api,
|
api: api,
|
||||||
netName: netName,
|
netName: netName,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
evtTypes: [...]journal.EventType{
|
evtTypes: [...]journal.EventType{
|
||||||
evtTypeMpoolAdd: j.RegisterEventType("mpool", "add"),
|
evtTypeMpoolAdd: j.RegisterEventType("mpool", "add"),
|
||||||
evtTypeMpoolRemove: j.RegisterEventType("mpool", "remove"),
|
evtTypeMpoolRemove: j.RegisterEventType("mpool", "remove"),
|
||||||
@ -1068,24 +1068,52 @@ func (mp *MessagePool) getStateNonce(ctx context.Context, addr address.Address,
|
|||||||
done := metrics.Timer(ctx, metrics.MpoolGetNonceDuration)
|
done := metrics.Timer(ctx, metrics.MpoolGetNonceDuration)
|
||||||
defer done()
|
defer done()
|
||||||
|
|
||||||
nk := nonceCacheKey{
|
nk := stateNonceCacheKey{
|
||||||
tsk: ts.Key(),
|
tsk: ts.Key(),
|
||||||
addr: addr,
|
addr: addr,
|
||||||
}
|
}
|
||||||
|
|
||||||
n, ok := mp.nonceCache.Get(nk)
|
n, ok := mp.stateNonceCache.Get(nk)
|
||||||
if ok {
|
if ok {
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
act, err := mp.api.GetActorAfter(addr, ts)
|
// get the nonce from the actor before ts
|
||||||
|
actor, err := mp.api.GetActorBefore(addr, ts)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
nextNonce := actor.Nonce
|
||||||
|
|
||||||
|
raddr, err := mp.resolveToKey(ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
mp.nonceCache.Add(nk, act.Nonce)
|
// loop over all messages sent by 'addr' and find the highest nonce
|
||||||
|
messages, err := mp.api.MessagesForTipset(ctx, ts)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
for _, message := range messages {
|
||||||
|
msg := message.VMMessage()
|
||||||
|
|
||||||
return act.Nonce, nil
|
maddr, err := mp.resolveToKey(ctx, msg.From)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to resolve message from address: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if maddr == raddr {
|
||||||
|
if n := msg.Nonce + 1; n > nextNonce {
|
||||||
|
nextNonce = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mp.stateNonceCache.Add(nk, nextNonce)
|
||||||
|
|
||||||
|
return nextNonce, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getStateBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (types.BigInt, error) {
|
func (mp *MessagePool) getStateBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (types.BigInt, error) {
|
||||||
|
@ -120,6 +120,22 @@ func (tma *testMpoolAPI) PubSubPublish(string, []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tma *testMpoolAPI) GetActorBefore(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
||||||
|
balance, ok := tma.balance[addr]
|
||||||
|
if !ok {
|
||||||
|
balance = types.NewInt(1000e6)
|
||||||
|
tma.balance[addr] = balance
|
||||||
|
}
|
||||||
|
|
||||||
|
nonce := tma.statenonce[addr]
|
||||||
|
|
||||||
|
return &types.Actor{
|
||||||
|
Code: builtin2.AccountActorCodeID,
|
||||||
|
Nonce: nonce,
|
||||||
|
Balance: balance,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (tma *testMpoolAPI) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
func (tma *testMpoolAPI) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
||||||
// regression check for load bug
|
// regression check for load bug
|
||||||
if ts == nil {
|
if ts == nil {
|
||||||
|
@ -2,6 +2,7 @@ package messagepool
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
@ -27,6 +28,7 @@ type Provider interface {
|
|||||||
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
|
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
|
||||||
PutMessage(ctx context.Context, m types.ChainMsg) (cid.Cid, error)
|
PutMessage(ctx context.Context, m types.ChainMsg) (cid.Cid, error)
|
||||||
PubSubPublish(string, []byte) error
|
PubSubPublish(string, []byte) error
|
||||||
|
GetActorBefore(address.Address, *types.TipSet) (*types.Actor, error)
|
||||||
GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error)
|
GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error)
|
||||||
StateDeterministicAddressAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
StateDeterministicAddressAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
||||||
StateNetworkVersion(context.Context, abi.ChainEpoch) network.Version
|
StateNetworkVersion(context.Context, abi.ChainEpoch) network.Version
|
||||||
@ -58,6 +60,23 @@ func (mpp *mpoolProvider) IsLite() bool {
|
|||||||
return mpp.lite != nil
|
return mpp.lite != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) getActorLite(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
||||||
|
if !mpp.IsLite() {
|
||||||
|
return nil, errors.New("should not use getActorLite on non lite Provider")
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := mpp.lite.GetNonce(context.TODO(), addr, ts.Key())
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("getting nonce over lite: %w", err)
|
||||||
|
}
|
||||||
|
a, err := mpp.lite.GetActor(context.TODO(), addr, ts.Key())
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("getting actor over lite: %w", err)
|
||||||
|
}
|
||||||
|
a.Nonce = n
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
|
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
|
||||||
mpp.sm.ChainStore().SubscribeHeadChanges(
|
mpp.sm.ChainStore().SubscribeHeadChanges(
|
||||||
store.WrapHeadChangeCoalescer(
|
store.WrapHeadChangeCoalescer(
|
||||||
@ -77,18 +96,17 @@ func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error {
|
|||||||
return mpp.ps.Publish(k, v) // nolint
|
return mpp.ps.Publish(k, v) // nolint
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) GetActorBefore(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
||||||
|
if mpp.IsLite() {
|
||||||
|
return mpp.getActorLite(addr, ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
return mpp.sm.LoadActor(context.TODO(), addr, ts)
|
||||||
|
}
|
||||||
|
|
||||||
func (mpp *mpoolProvider) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
func (mpp *mpoolProvider) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
||||||
if mpp.IsLite() {
|
if mpp.IsLite() {
|
||||||
n, err := mpp.lite.GetNonce(context.TODO(), addr, ts.Key())
|
return mpp.getActorLite(addr, ts)
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("getting nonce over lite: %w", err)
|
|
||||||
}
|
|
||||||
a, err := mpp.lite.GetActor(context.TODO(), addr, ts.Key())
|
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("getting actor over lite: %w", err)
|
|
||||||
}
|
|
||||||
a.Nonce = n
|
|
||||||
return a, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stcid, _, err := mpp.sm.TipSetState(context.TODO(), ts)
|
stcid, _, err := mpp.sm.TipSetState(context.TODO(), ts)
|
||||||
|
Loading…
Reference in New Issue
Block a user