Merge pull request #2924 from filecoin-project/feat/mpool-cfg-overesitm
Add GasLimitOverestimation to mpool config
This commit is contained in:
commit
0d1174cb81
@ -44,16 +44,11 @@ func TestPledgeSector(t *testing.T, b APIBuilder, blocktime time.Duration, nSect
|
||||
|
||||
mine := true
|
||||
done := make(chan struct{})
|
||||
blockNotif := make(chan struct{}, 1)
|
||||
go func() {
|
||||
defer close(done)
|
||||
for mine {
|
||||
build.Clock.Sleep(blocktime)
|
||||
if err := sn[0].MineOne(ctx, bminer.MineReq{Done: func(bool, error) {
|
||||
select {
|
||||
case blockNotif <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
}}); err != nil {
|
||||
t.Error(err)
|
||||
@ -61,7 +56,7 @@ func TestPledgeSector(t *testing.T, b APIBuilder, blocktime time.Duration, nSect
|
||||
}
|
||||
}()
|
||||
|
||||
pledgeSectors(t, ctx, miner, nSectors, 0, blockNotif)
|
||||
pledgeSectors(t, ctx, miner, nSectors, 0, nil)
|
||||
|
||||
mine = false
|
||||
<-done
|
||||
|
@ -14,6 +14,7 @@ var (
|
||||
MemPoolSizeLimitHiDefault = 30000
|
||||
MemPoolSizeLimitLoDefault = 20000
|
||||
PruneCooldownDefault = time.Minute
|
||||
GasLimitOverestimation = 1.25
|
||||
|
||||
ConfigKey = datastore.NewKey("/mpool/config")
|
||||
)
|
||||
@ -34,6 +35,10 @@ func loadConfig(ds dtypes.MetadataDS) (*types.MpoolConfig, error) {
|
||||
}
|
||||
cfg := new(types.MpoolConfig)
|
||||
err = json.Unmarshal(cfgBytes, cfg)
|
||||
if cfg.GasLimitOverestimation == 0 {
|
||||
// TODO: remove in next reset
|
||||
cfg.GasLimitOverestimation = GasLimitOverestimation
|
||||
}
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
@ -65,9 +70,10 @@ func (mp *MessagePool) SetConfig(cfg *types.MpoolConfig) {
|
||||
|
||||
func DefaultConfig() *types.MpoolConfig {
|
||||
return &types.MpoolConfig{
|
||||
SizeLimitHigh: MemPoolSizeLimitHiDefault,
|
||||
SizeLimitLow: MemPoolSizeLimitLoDefault,
|
||||
ReplaceByFeeRatio: ReplaceByFeeRatioDefault,
|
||||
PruneCooldown: PruneCooldownDefault,
|
||||
SizeLimitHigh: MemPoolSizeLimitHiDefault,
|
||||
SizeLimitLow: MemPoolSizeLimitLoDefault,
|
||||
ReplaceByFeeRatio: ReplaceByFeeRatioDefault,
|
||||
PruneCooldown: PruneCooldownDefault,
|
||||
GasLimitOverestimation: GasLimitOverestimation,
|
||||
}
|
||||
}
|
||||
|
@ -55,7 +55,8 @@ var (
|
||||
|
||||
ErrInvalidToAddr = errors.New("message had invalid to address")
|
||||
|
||||
ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail")
|
||||
ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail")
|
||||
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
|
||||
)
|
||||
|
||||
const (
|
||||
@ -135,8 +136,9 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool) (bool, error) {
|
||||
} else {
|
||||
log.Info("add with duplicate nonce")
|
||||
return false, xerrors.Errorf("message from %s with nonce %d already in mpool,"+
|
||||
" increase GasPremium to %s from %s to trigger replace by fee",
|
||||
m.Message.From, m.Message.Nonce, minPrice, m.Message.GasPremium)
|
||||
" increase GasPremium to %s from %s to trigger replace by fee: %w",
|
||||
m.Message.From, m.Message.Nonce, minPrice, m.Message.GasPremium,
|
||||
ErrRBFTooLowPremium)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -517,7 +519,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
|
||||
incr, err := mset.add(m, mp)
|
||||
if err != nil {
|
||||
log.Info(err)
|
||||
return err // TODO(review): this error return was dropped at some point, was it on purpose?
|
||||
return err
|
||||
}
|
||||
|
||||
if incr {
|
||||
|
@ -254,12 +254,14 @@ tailLoop:
|
||||
|
||||
func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.Address]map[uint64]*types.SignedMessage, error) {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
log.Infow("get pending messages done", "took", time.Since(start))
|
||||
}()
|
||||
|
||||
result := make(map[address.Address]map[uint64]*types.SignedMessage)
|
||||
haveCids := make(map[cid.Cid]struct{})
|
||||
defer func() {
|
||||
if time.Since(start) > time.Millisecond {
|
||||
log.Infow("get pending messages done", "took", time.Since(start))
|
||||
}
|
||||
}()
|
||||
|
||||
// are we in sync?
|
||||
inSync := false
|
||||
|
@ -108,7 +108,10 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
|
||||
ts = sm.cs.GetHeaviestTipSet()
|
||||
}
|
||||
|
||||
state := ts.ParentState()
|
||||
state, _, err := sm.TipSetState(ctx, ts)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("computing tipset state: %w", err)
|
||||
}
|
||||
|
||||
r := store.NewChainRand(sm.cs, ts.Cids(), ts.Height())
|
||||
|
||||
@ -122,7 +125,7 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
|
||||
|
||||
vmopt := &vm.VMOpts{
|
||||
StateBase: state,
|
||||
Epoch: ts.Height(),
|
||||
Epoch: ts.Height() + 1,
|
||||
Rand: r,
|
||||
Bstore: sm.cs.Blockstore(),
|
||||
Syscalls: sm.cs.VMSys(),
|
||||
|
@ -510,7 +510,7 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
|
||||
)
|
||||
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
|
||||
switch {
|
||||
case xerrors.Is(err, messagepool.ErrBroadcastAnyway):
|
||||
case xerrors.Is(err, messagepool.ErrBroadcastAnyway) || xerrors.Is(err, messagepool.ErrRBFTooLowPremium):
|
||||
return pubsub.ValidationIgnore
|
||||
default:
|
||||
return pubsub.ValidationReject
|
||||
|
@ -7,11 +7,12 @@ import (
|
||||
)
|
||||
|
||||
type MpoolConfig struct {
|
||||
PriorityAddrs []address.Address
|
||||
SizeLimitHigh int
|
||||
SizeLimitLow int
|
||||
ReplaceByFeeRatio float64
|
||||
PruneCooldown time.Duration
|
||||
PriorityAddrs []address.Address
|
||||
SizeLimitHigh int
|
||||
SizeLimitLow int
|
||||
ReplaceByFeeRatio float64
|
||||
PruneCooldown time.Duration
|
||||
GasLimitOverestimation float64
|
||||
}
|
||||
|
||||
func (mc *MpoolConfig) Clone() *MpoolConfig {
|
||||
|
@ -29,12 +29,12 @@ func ComputeGasOverestimationBurn(gasUsed, gasLimit int64) (int64, int64) {
|
||||
return 0, gasLimit
|
||||
}
|
||||
|
||||
// over = gasLimit/gasUsed - 1 - 0.3
|
||||
// over = gasLimit/gasUsed - 1 - 0.1
|
||||
// over = min(over, 1)
|
||||
// gasToBurn = (gasLimit - gasUsed) * over
|
||||
|
||||
// so to factor out division from `over`
|
||||
// over*gasUsed = min(gasLimit - (13*gasUsed)/10, gasUsed)
|
||||
// over*gasUsed = min(gasLimit - (11*gasUsed)/10, gasUsed)
|
||||
// gasToBurn = ((gasLimit - gasUsed)*over*gasUsed) / gasUsed
|
||||
over := gasLimit - (gasOveruseNum*gasUsed)/gasOveruseDenom
|
||||
if over < 0 {
|
||||
|
@ -2,6 +2,7 @@ package full
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"go.uber.org/fx"
|
||||
@ -23,6 +24,11 @@ type MpoolAPI struct {
|
||||
Chain *store.ChainStore
|
||||
|
||||
Mpool *messagepool.MessagePool
|
||||
|
||||
PushLocks struct {
|
||||
m map[address.Address]chan struct{}
|
||||
sync.Mutex
|
||||
} `name:"verymuchunique" optional:"true"`
|
||||
}
|
||||
|
||||
func (a *MpoolAPI) MpoolGetConfig(context.Context) (*types.MpoolConfig, error) {
|
||||
@ -105,10 +111,35 @@ func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (ci
|
||||
return a.Mpool.Push(smsg)
|
||||
}
|
||||
|
||||
// GasMargin sets by how much should gas used be increased over test execution
|
||||
var GasMargin = 1.5
|
||||
|
||||
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
|
||||
{
|
||||
fromA, err := a.Stmgr.ResolveToKeyAddress(ctx, msg.From, nil)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting key address: %w", err)
|
||||
}
|
||||
|
||||
a.PushLocks.Lock()
|
||||
if a.PushLocks.m == nil {
|
||||
a.PushLocks.m = make(map[address.Address]chan struct{})
|
||||
}
|
||||
lk, ok := a.PushLocks.m[fromA]
|
||||
if !ok {
|
||||
lk = make(chan struct{}, 1)
|
||||
a.PushLocks.m[msg.From] = lk
|
||||
}
|
||||
a.PushLocks.Unlock()
|
||||
|
||||
select {
|
||||
case lk <- struct{}{}:
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
defer func() {
|
||||
<-lk
|
||||
}()
|
||||
}
|
||||
|
||||
if msg.Nonce != 0 {
|
||||
return nil, xerrors.Errorf("MpoolPushMessage expects message nonce to be 0, was %d", msg.Nonce)
|
||||
}
|
||||
@ -117,7 +148,7 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("estimating gas used: %w", err)
|
||||
}
|
||||
msg.GasLimit = int64(float64(gasLimit) * GasMargin)
|
||||
msg.GasLimit = int64(float64(gasLimit) * a.Mpool.GetConfig().GasLimitOverestimation)
|
||||
}
|
||||
|
||||
if msg.GasPremium == types.EmptyInt || types.BigCmp(msg.GasPremium, types.NewInt(0)) == 0 {
|
||||
@ -129,7 +160,7 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t
|
||||
}
|
||||
|
||||
if msg.GasFeeCap == types.EmptyInt || types.BigCmp(msg.GasFeeCap, types.NewInt(0)) == 0 {
|
||||
feeCap, err := a.GasEstimateFeeCap(ctx, msg, 20, types.EmptyTSK)
|
||||
feeCap, err := a.GasEstimateFeeCap(ctx, msg, 10, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("estimating fee cap: %w", err)
|
||||
}
|
||||
|
11
scripts/dev/api.bash
Normal file
11
scripts/dev/api.bash
Normal file
@ -0,0 +1,11 @@
|
||||
#!/bin/bash
|
||||
# vim: set expandtab ts=2 sw=2:
|
||||
|
||||
token=$(lotus auth create-token --perm admin)
|
||||
|
||||
runAPI() {
|
||||
curl -X POST \
|
||||
-H "Content-Type: application/json" \
|
||||
--data '{"jsonrpc":"2.0","id":2,"method":"Filecoin.'"$1"'","params":'"${2:-null}"'}' \
|
||||
'http://127.0.0.1:1234/rpc/v0?token='"$token"
|
||||
}
|
Loading…
Reference in New Issue
Block a user