Merge pull request #5162 from filecoin-project/ref/mpool
Small message pool refactors
This commit is contained in:
commit
9990d440e9
14
api/types.go
14
api/types.go
@ -6,7 +6,6 @@ import (
|
|||||||
|
|
||||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/lotus/build"
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
@ -51,19 +50,6 @@ type MessageSendSpec struct {
|
|||||||
MaxFee abi.TokenAmount
|
MaxFee abi.TokenAmount
|
||||||
}
|
}
|
||||||
|
|
||||||
var DefaultMessageSendSpec = MessageSendSpec{
|
|
||||||
// MaxFee of 0.1FIL
|
|
||||||
MaxFee: abi.NewTokenAmount(int64(build.FilecoinPrecision) / 10),
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ms *MessageSendSpec) Get() MessageSendSpec {
|
|
||||||
if ms == nil {
|
|
||||||
return DefaultMessageSendSpec
|
|
||||||
}
|
|
||||||
|
|
||||||
return *ms
|
|
||||||
}
|
|
||||||
|
|
||||||
type DataTransferChannel struct {
|
type DataTransferChannel struct {
|
||||||
TransferID datatransfer.TransferID
|
TransferID datatransfer.TransferID
|
||||||
Status datatransfer.Status
|
Status datatransfer.Status
|
||||||
|
@ -181,7 +181,20 @@ func ComputeMinRBF(curPrem abi.TokenAmount) abi.TokenAmount {
|
|||||||
return types.BigAdd(minPrice, types.NewInt(1))
|
return types.BigAdd(minPrice, types.NewInt(1))
|
||||||
}
|
}
|
||||||
|
|
||||||
func CapGasFee(mff dtypes.DefaultMaxFeeFunc, msg *types.Message, maxFee abi.TokenAmount) {
|
func CapGasFee(mff dtypes.DefaultMaxFeeFunc, msg *types.Message, sendSepc *api.MessageSendSpec) {
|
||||||
|
var maxFee abi.TokenAmount
|
||||||
|
if sendSepc != nil {
|
||||||
|
maxFee = sendSepc.MaxFee
|
||||||
|
}
|
||||||
|
if maxFee.Int == nil || maxFee.Equals(big.Zero()) {
|
||||||
|
mf, err := mff()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to get default max gas fee: %+v", err)
|
||||||
|
mf = big.Zero()
|
||||||
|
}
|
||||||
|
maxFee = mf
|
||||||
|
}
|
||||||
|
|
||||||
if maxFee.Equals(big.Zero()) {
|
if maxFee.Equals(big.Zero()) {
|
||||||
mf, err := mff()
|
mf, err := mff()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -100,7 +100,7 @@ loop:
|
|||||||
// check the baseFee lower bound -- only republish messages that can be included in the chain
|
// check the baseFee lower bound -- only republish messages that can be included in the chain
|
||||||
// within the next 20 blocks.
|
// within the next 20 blocks.
|
||||||
for _, m := range chain.msgs {
|
for _, m := range chain.msgs {
|
||||||
if !allowNegativeChains(ts.Height()) && m.Message.GasFeeCap.LessThan(baseFeeLowerBound) {
|
if m.Message.GasFeeCap.LessThan(baseFeeLowerBound) {
|
||||||
chain.Invalidate()
|
chain.Invalidate()
|
||||||
continue loop
|
continue loop
|
||||||
}
|
}
|
||||||
@ -115,7 +115,7 @@ loop:
|
|||||||
|
|
||||||
// we can't fit the current chain but there is gas to spare
|
// we can't fit the current chain but there is gas to spare
|
||||||
// trim it and push it down
|
// trim it and push it down
|
||||||
chain.Trim(gasLimit, mp, baseFee, true)
|
chain.Trim(gasLimit, mp, baseFee)
|
||||||
for j := i; j < len(chains)-1; j++ {
|
for j := i; j < len(chains)-1; j++ {
|
||||||
if chains[j].Before(chains[j+1]) {
|
if chains[j].Before(chains[j+1]) {
|
||||||
break
|
break
|
||||||
|
@ -10,7 +10,6 @@ import (
|
|||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
|
||||||
tbig "github.com/filecoin-project/go-state-types/big"
|
tbig "github.com/filecoin-project/go-state-types/big"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
@ -23,12 +22,6 @@ var bigBlockGasLimit = big.NewInt(build.BlockGasLimit)
|
|||||||
|
|
||||||
var MaxBlockMessages = 16000
|
var MaxBlockMessages = 16000
|
||||||
|
|
||||||
// this is *temporary* mutilation until we have implemented uncapped miner penalties -- it will go
|
|
||||||
// away in the next fork.
|
|
||||||
func allowNegativeChains(epoch abi.ChainEpoch) bool {
|
|
||||||
return epoch < build.UpgradeBreezeHeight+5
|
|
||||||
}
|
|
||||||
|
|
||||||
const MaxBlocks = 15
|
const MaxBlocks = 15
|
||||||
|
|
||||||
type msgChain struct {
|
type msgChain struct {
|
||||||
@ -121,7 +114,7 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64
|
|||||||
return chains[i].Before(chains[j])
|
return chains[i].Before(chains[j])
|
||||||
})
|
})
|
||||||
|
|
||||||
if !allowNegativeChains(curTs.Height()) && len(chains) != 0 && chains[0].gasPerf < 0 {
|
if len(chains) != 0 && chains[0].gasPerf < 0 {
|
||||||
log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf)
|
log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf)
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
@ -174,7 +167,7 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64
|
|||||||
last := len(chains)
|
last := len(chains)
|
||||||
for i, chain := range chains {
|
for i, chain := range chains {
|
||||||
// did we run out of performing chains?
|
// did we run out of performing chains?
|
||||||
if !allowNegativeChains(curTs.Height()) && chain.gasPerf < 0 {
|
if chain.gasPerf < 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -240,7 +233,7 @@ tailLoop:
|
|||||||
for gasLimit >= minGas && last < len(chains) {
|
for gasLimit >= minGas && last < len(chains) {
|
||||||
// trim if necessary
|
// trim if necessary
|
||||||
if chains[last].gasLimit > gasLimit {
|
if chains[last].gasLimit > gasLimit {
|
||||||
chains[last].Trim(gasLimit, mp, baseFee, allowNegativeChains(curTs.Height()))
|
chains[last].Trim(gasLimit, mp, baseFee)
|
||||||
}
|
}
|
||||||
|
|
||||||
// push down if it hasn't been invalidated
|
// push down if it hasn't been invalidated
|
||||||
@ -266,7 +259,7 @@ tailLoop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// if gasPerf < 0 we have no more profitable chains
|
// if gasPerf < 0 we have no more profitable chains
|
||||||
if !allowNegativeChains(curTs.Height()) && chain.gasPerf < 0 {
|
if chain.gasPerf < 0 {
|
||||||
break tailLoop
|
break tailLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -307,7 +300,7 @@ tailLoop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// dependencies fit, just trim it
|
// dependencies fit, just trim it
|
||||||
chain.Trim(gasLimit-depGasLimit, mp, baseFee, allowNegativeChains(curTs.Height()))
|
chain.Trim(gasLimit-depGasLimit, mp, baseFee)
|
||||||
last += i
|
last += i
|
||||||
continue tailLoop
|
continue tailLoop
|
||||||
}
|
}
|
||||||
@ -340,7 +333,7 @@ tailLoop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// is it negative?
|
// is it negative?
|
||||||
if !allowNegativeChains(curTs.Height()) && chain.gasPerf < 0 {
|
if chain.gasPerf < 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -362,7 +355,7 @@ tailLoop:
|
|||||||
|
|
||||||
// do they fit as is? if it doesn't, trim to make it fit if possible
|
// do they fit as is? if it doesn't, trim to make it fit if possible
|
||||||
if chainGasLimit > gasLimit {
|
if chainGasLimit > gasLimit {
|
||||||
chain.Trim(gasLimit-depGasLimit, mp, baseFee, allowNegativeChains(curTs.Height()))
|
chain.Trim(gasLimit-depGasLimit, mp, baseFee)
|
||||||
|
|
||||||
if !chain.valid {
|
if !chain.valid {
|
||||||
continue
|
continue
|
||||||
@ -445,7 +438,7 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S
|
|||||||
return chains[i].Before(chains[j])
|
return chains[i].Before(chains[j])
|
||||||
})
|
})
|
||||||
|
|
||||||
if !allowNegativeChains(curTs.Height()) && len(chains) != 0 && chains[0].gasPerf < 0 {
|
if len(chains) != 0 && chains[0].gasPerf < 0 {
|
||||||
log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf)
|
log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf)
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
@ -456,7 +449,7 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S
|
|||||||
last := len(chains)
|
last := len(chains)
|
||||||
for i, chain := range chains {
|
for i, chain := range chains {
|
||||||
// did we run out of performing chains?
|
// did we run out of performing chains?
|
||||||
if !allowNegativeChains(curTs.Height()) && chain.gasPerf < 0 {
|
if chain.gasPerf < 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -485,7 +478,7 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S
|
|||||||
tailLoop:
|
tailLoop:
|
||||||
for gasLimit >= minGas && last < len(chains) {
|
for gasLimit >= minGas && last < len(chains) {
|
||||||
// trim
|
// trim
|
||||||
chains[last].Trim(gasLimit, mp, baseFee, allowNegativeChains(curTs.Height()))
|
chains[last].Trim(gasLimit, mp, baseFee)
|
||||||
|
|
||||||
// push down if it hasn't been invalidated
|
// push down if it hasn't been invalidated
|
||||||
if chains[last].valid {
|
if chains[last].valid {
|
||||||
@ -505,7 +498,7 @@ tailLoop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// if gasPerf < 0 we have no more profitable chains
|
// if gasPerf < 0 we have no more profitable chains
|
||||||
if !allowNegativeChains(curTs.Height()) && chain.gasPerf < 0 {
|
if chain.gasPerf < 0 {
|
||||||
break tailLoop
|
break tailLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -567,7 +560,7 @@ func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[ui
|
|||||||
return chains[i].Before(chains[j])
|
return chains[i].Before(chains[j])
|
||||||
})
|
})
|
||||||
|
|
||||||
if !allowNegativeChains(ts.Height()) && len(chains) != 0 && chains[0].gasPerf < 0 {
|
if len(chains) != 0 && chains[0].gasPerf < 0 {
|
||||||
log.Warnw("all priority messages in mpool have negative gas performance", "bestGasPerf", chains[0].gasPerf)
|
log.Warnw("all priority messages in mpool have negative gas performance", "bestGasPerf", chains[0].gasPerf)
|
||||||
return nil, gasLimit
|
return nil, gasLimit
|
||||||
}
|
}
|
||||||
@ -575,7 +568,7 @@ func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[ui
|
|||||||
// 3. Merge chains until the block limit, as long as they have non-negative gas performance
|
// 3. Merge chains until the block limit, as long as they have non-negative gas performance
|
||||||
last := len(chains)
|
last := len(chains)
|
||||||
for i, chain := range chains {
|
for i, chain := range chains {
|
||||||
if !allowNegativeChains(ts.Height()) && chain.gasPerf < 0 {
|
if chain.gasPerf < 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -593,7 +586,7 @@ func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[ui
|
|||||||
tailLoop:
|
tailLoop:
|
||||||
for gasLimit >= minGas && last < len(chains) {
|
for gasLimit >= minGas && last < len(chains) {
|
||||||
// trim, discarding negative performing messages
|
// trim, discarding negative performing messages
|
||||||
chains[last].Trim(gasLimit, mp, baseFee, allowNegativeChains(ts.Height()))
|
chains[last].Trim(gasLimit, mp, baseFee)
|
||||||
|
|
||||||
// push down if it hasn't been invalidated
|
// push down if it hasn't been invalidated
|
||||||
if chains[last].valid {
|
if chains[last].valid {
|
||||||
@ -613,7 +606,7 @@ tailLoop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// if gasPerf < 0 we have no more profitable chains
|
// if gasPerf < 0 we have no more profitable chains
|
||||||
if !allowNegativeChains(ts.Height()) && chain.gasPerf < 0 {
|
if chain.gasPerf < 0 {
|
||||||
break tailLoop
|
break tailLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -689,6 +682,10 @@ func (*MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt)
|
|||||||
}
|
}
|
||||||
|
|
||||||
gasReward := tbig.Mul(maxPremium, types.NewInt(uint64(msg.Message.GasLimit)))
|
gasReward := tbig.Mul(maxPremium, types.NewInt(uint64(msg.Message.GasLimit)))
|
||||||
|
if gasReward.Sign() == -1 {
|
||||||
|
// penalty multiplier
|
||||||
|
gasReward = tbig.Mul(gasReward, types.NewInt(3))
|
||||||
|
}
|
||||||
return gasReward.Int
|
return gasReward.Int
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -764,9 +761,6 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
|
|||||||
balance = new(big.Int).Sub(balance, required)
|
balance = new(big.Int).Sub(balance, required)
|
||||||
|
|
||||||
value := m.Message.Value.Int
|
value := m.Message.Value.Int
|
||||||
if balance.Cmp(value) < 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
balance = new(big.Int).Sub(balance, value)
|
balance = new(big.Int).Sub(balance, value)
|
||||||
|
|
||||||
gasReward := mp.getGasReward(m, baseFee)
|
gasReward := mp.getGasReward(m, baseFee)
|
||||||
@ -870,9 +864,9 @@ func (mc *msgChain) Before(other *msgChain) bool {
|
|||||||
(mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
|
(mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt, allowNegative bool) {
|
func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt) {
|
||||||
i := len(mc.msgs) - 1
|
i := len(mc.msgs) - 1
|
||||||
for i >= 0 && (mc.gasLimit > gasLimit || (!allowNegative && mc.gasPerf < 0)) {
|
for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0) {
|
||||||
gasReward := mp.getGasReward(mc.msgs[i], baseFee)
|
gasReward := mp.getGasReward(mc.msgs[i], baseFee)
|
||||||
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)
|
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)
|
||||||
mc.gasLimit -= mc.msgs[i].Message.GasLimit
|
mc.gasLimit -= mc.msgs[i].Message.GasLimit
|
||||||
|
@ -736,8 +736,6 @@ func TestPriorityMessageSelection2(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPriorityMessageSelection3(t *testing.T) {
|
func TestPriorityMessageSelection3(t *testing.T) {
|
||||||
t.Skip("reenable after removing allow negative")
|
|
||||||
|
|
||||||
mp, tma := makeTestMpool()
|
mp, tma := makeTestMpool()
|
||||||
|
|
||||||
// the actors
|
// the actors
|
||||||
@ -1241,6 +1239,9 @@ func TestCompetitiveMessageSelectionExp(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestCompetitiveMessageSelectionZipf(t *testing.T) {
|
func TestCompetitiveMessageSelectionZipf(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping in short mode")
|
||||||
|
}
|
||||||
var capacityBoost, rewardBoost, tqReward float64
|
var capacityBoost, rewardBoost, tqReward float64
|
||||||
seeds := []int64{1947, 1976, 2020, 2100, 10000, 143324, 432432, 131, 32, 45}
|
seeds := []int64{1947, 1976, 2020, 2100, 10000, 143324, 432432, 131, 32, 45}
|
||||||
for _, seed := range seeds {
|
for _, seed := range seeds {
|
||||||
@ -1268,9 +1269,9 @@ func TestGasReward(t *testing.T) {
|
|||||||
GasReward int64
|
GasReward int64
|
||||||
}{
|
}{
|
||||||
{Premium: 100, FeeCap: 200, BaseFee: 100, GasReward: 100},
|
{Premium: 100, FeeCap: 200, BaseFee: 100, GasReward: 100},
|
||||||
{Premium: 100, FeeCap: 200, BaseFee: 210, GasReward: -10},
|
{Premium: 100, FeeCap: 200, BaseFee: 210, GasReward: -10 * 3},
|
||||||
{Premium: 200, FeeCap: 250, BaseFee: 210, GasReward: 40},
|
{Premium: 200, FeeCap: 250, BaseFee: 210, GasReward: 40},
|
||||||
{Premium: 200, FeeCap: 250, BaseFee: 2000, GasReward: -1750},
|
{Premium: 200, FeeCap: 250, BaseFee: 2000, GasReward: -1750 * 3},
|
||||||
}
|
}
|
||||||
|
|
||||||
mp := new(MessagePool)
|
mp := new(MessagePool)
|
||||||
|
@ -446,7 +446,7 @@ var mpoolReplaceCmd = &cli.Command{
|
|||||||
return abi.TokenAmount(config.DefaultDefaultMaxFee), nil
|
return abi.TokenAmount(config.DefaultDefaultMaxFee), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
messagepool.CapGasFee(mff, &msg, mss.Get().MaxFee)
|
messagepool.CapGasFee(mff, &msg, mss)
|
||||||
} else {
|
} else {
|
||||||
if cctx.IsSet("gas-limit") {
|
if cctx.IsSet("gas-limit") {
|
||||||
msg.GasLimit = cctx.Int64("gas-limit")
|
msg.GasLimit = cctx.Int64("gas-limit")
|
||||||
|
@ -301,7 +301,7 @@ func (m *GasModule) GasEstimateMessageGas(ctx context.Context, msg *types.Messag
|
|||||||
msg.GasFeeCap = feeCap
|
msg.GasFeeCap = feeCap
|
||||||
}
|
}
|
||||||
|
|
||||||
messagepool.CapGasFee(m.GetMaxFee, msg, spec.Get().MaxFee)
|
messagepool.CapGasFee(m.GetMaxFee, msg, spec)
|
||||||
|
|
||||||
return msg, nil
|
return msg, nil
|
||||||
}
|
}
|
||||||
|
@ -812,7 +812,7 @@ func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message,
|
|||||||
return msg.RequiredFunds(), nil
|
return msg.RequiredFunds(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
messagepool.CapGasFee(mff, msg, big.Min(big.Sub(avail, msg.Value), msg.RequiredFunds()))
|
messagepool.CapGasFee(mff, msg, &api.MessageSendSpec{MaxFee: big.Min(big.Sub(avail, msg.Value), msg.RequiredFunds())})
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user