Mempool: Selection should respect CBOR limits
This commit is contained in:
parent
dd20cb7301
commit
ec00e73c9d
@ -121,7 +121,7 @@ loop:
|
|||||||
|
|
||||||
// we can't fit the current chain but there is gas to spare
|
// we can't fit the current chain but there is gas to spare
|
||||||
// trim it and push it down
|
// trim it and push it down
|
||||||
chain.Trim(gasLimit, mp, baseFee)
|
chain.Trim(gasLimit, repubMsgLimit, mp, baseFee)
|
||||||
for j := i; j < len(chains)-1; j++ {
|
for j := i; j < len(chains)-1; j++ {
|
||||||
if chains[j].Before(chains[j+1]) {
|
if chains[j].Before(chains[j+1]) {
|
||||||
break
|
break
|
||||||
@ -131,6 +131,10 @@ loop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
count := 0
|
count := 0
|
||||||
|
if len(msgs) > repubMsgLimit {
|
||||||
|
msgs = msgs[:repubMsgLimit]
|
||||||
|
}
|
||||||
|
|
||||||
log.Infof("republishing %d messages", len(msgs))
|
log.Infof("republishing %d messages", len(msgs))
|
||||||
for _, m := range msgs {
|
for _, m := range msgs {
|
||||||
mb, err := m.Serialize()
|
mb, err := m.Serialize()
|
||||||
|
@ -7,6 +7,10 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/crypto"
|
||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
@ -34,9 +38,10 @@ type msgChain struct {
|
|||||||
merged bool
|
merged bool
|
||||||
next *msgChain
|
next *msgChain
|
||||||
prev *msgChain
|
prev *msgChain
|
||||||
|
sigType crypto.SigType
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) {
|
func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
defer mp.curTsLk.Unlock()
|
defer mp.curTsLk.Unlock()
|
||||||
|
|
||||||
@ -46,24 +51,151 @@ func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq
|
|||||||
// if the ticket quality is high enough that the first block has higher probability
|
// if the ticket quality is high enough that the first block has higher probability
|
||||||
// than any other block, then we don't bother with optimal selection because the
|
// than any other block, then we don't bother with optimal selection because the
|
||||||
// first block will always have higher effective performance
|
// first block will always have higher effective performance
|
||||||
|
var sm *selectedMessages
|
||||||
|
var err error
|
||||||
if tq > 0.84 {
|
if tq > 0.84 {
|
||||||
msgs, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts)
|
sm, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts)
|
||||||
} else {
|
} else {
|
||||||
msgs, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq)
|
sm, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(msgs) > build.BlockMessageLimit {
|
// one last sanity check
|
||||||
msgs = msgs[:build.BlockMessageLimit]
|
if len(sm.msgs) > build.BlockMessageLimit {
|
||||||
|
sm.msgs = sm.msgs[:build.BlockMessageLimit]
|
||||||
}
|
}
|
||||||
|
|
||||||
return msgs, nil
|
return sm.msgs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
|
type selectedMessages struct {
|
||||||
|
msgs []*types.SignedMessage
|
||||||
|
gasLimit int64
|
||||||
|
secpLimit int
|
||||||
|
blsLimit int
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns false if chain can't be added due to block constraints
|
||||||
|
func (sm *selectedMessages) tryToAdd(mc *msgChain) bool {
|
||||||
|
l := len(mc.msgs)
|
||||||
|
|
||||||
|
if build.BlockMessageLimit < l+len(sm.msgs) || sm.gasLimit < mc.gasLimit {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if mc.sigType == crypto.SigTypeBLS {
|
||||||
|
if sm.blsLimit < l {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
sm.msgs = append(sm.msgs, mc.msgs...)
|
||||||
|
sm.blsLimit -= l
|
||||||
|
sm.gasLimit -= mc.gasLimit
|
||||||
|
} else if mc.sigType == crypto.SigTypeSecp256k1 {
|
||||||
|
if sm.secpLimit < l {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
sm.msgs = append(sm.msgs, mc.msgs...)
|
||||||
|
sm.secpLimit -= l
|
||||||
|
sm.gasLimit -= mc.gasLimit
|
||||||
|
}
|
||||||
|
|
||||||
|
// don't add the weird sigType msg, but otherwise proceed
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns false if messages can't be added due to block constraints
|
||||||
|
// will trim / invalidate chain as appropriate
|
||||||
|
func (sm *selectedMessages) tryToAddWithDeps(mc *msgChain, mp *MessagePool, baseFee types.BigInt) bool {
|
||||||
|
// compute the dependencies that must be merged and the gas limit including deps
|
||||||
|
chainGasLimit := mc.gasLimit
|
||||||
|
chainMsgLimit := len(mc.msgs)
|
||||||
|
depGasLimit := int64(0)
|
||||||
|
depMsgLimit := 0
|
||||||
|
smMsgLimit := 0
|
||||||
|
|
||||||
|
if mc.sigType == crypto.SigTypeBLS {
|
||||||
|
smMsgLimit = sm.blsLimit
|
||||||
|
} else if mc.sigType == crypto.SigTypeSecp256k1 {
|
||||||
|
smMsgLimit = sm.secpLimit
|
||||||
|
} else {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if smMsgLimit > build.BlockMessageLimit-len(sm.msgs) {
|
||||||
|
smMsgLimit = build.BlockMessageLimit - len(sm.msgs)
|
||||||
|
}
|
||||||
|
|
||||||
|
var chainDeps []*msgChain
|
||||||
|
for curChain := mc.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
||||||
|
chainDeps = append(chainDeps, curChain)
|
||||||
|
chainGasLimit += curChain.gasLimit
|
||||||
|
chainMsgLimit += len(curChain.msgs)
|
||||||
|
depGasLimit += curChain.gasLimit
|
||||||
|
depMsgLimit += len(curChain.msgs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// the chain doesn't fit as-is, so trim / invalidate it and return false
|
||||||
|
if chainGasLimit > sm.gasLimit || chainMsgLimit > smMsgLimit {
|
||||||
|
|
||||||
|
// it doesn't all fit; now we have to take into account the dependent chains before
|
||||||
|
// making a decision about trimming or invalidating.
|
||||||
|
// if the dependencies exceed the block limits, then we must invalidate the chain
|
||||||
|
// as it can never be included.
|
||||||
|
// Otherwise we can just trim and continue
|
||||||
|
if depGasLimit > sm.gasLimit || depMsgLimit >= smMsgLimit {
|
||||||
|
mc.Invalidate()
|
||||||
|
} else {
|
||||||
|
// dependencies fit, just trim it
|
||||||
|
mc.Trim(sm.gasLimit-depGasLimit, smMsgLimit-depMsgLimit, mp, baseFee)
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// the chain fits! include it together with all dependencies
|
||||||
|
for i := len(chainDeps) - 1; i >= 0; i-- {
|
||||||
|
curChain := chainDeps[i]
|
||||||
|
curChain.merged = true
|
||||||
|
sm.msgs = append(sm.msgs, curChain.msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
mc.merged = true
|
||||||
|
|
||||||
|
sm.msgs = append(sm.msgs, mc.msgs...)
|
||||||
|
sm.gasLimit -= chainGasLimit
|
||||||
|
|
||||||
|
if mc.sigType == crypto.SigTypeBLS {
|
||||||
|
sm.blsLimit -= chainMsgLimit
|
||||||
|
} else if mc.sigType == crypto.SigTypeSecp256k1 {
|
||||||
|
sm.secpLimit -= chainMsgLimit
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *selectedMessages) trimChain(mc *msgChain, mp *MessagePool, baseFee types.BigInt) {
|
||||||
|
msgLimit := build.BlockMessageLimit - len(sm.msgs)
|
||||||
|
if mc.sigType == crypto.SigTypeBLS {
|
||||||
|
if msgLimit > sm.blsLimit {
|
||||||
|
msgLimit = sm.blsLimit
|
||||||
|
}
|
||||||
|
} else if mc.sigType == crypto.SigTypeSecp256k1 {
|
||||||
|
if msgLimit > sm.secpLimit {
|
||||||
|
msgLimit = sm.secpLimit
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if mc.gasLimit > sm.gasLimit || len(mc.msgs) > msgLimit {
|
||||||
|
mc.Trim(sm.gasLimit, msgLimit, mp, baseFee)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) (*selectedMessages, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
||||||
@ -89,10 +221,10 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
|||||||
|
|
||||||
// 0b. Select all priority messages that fit in the block
|
// 0b. Select all priority messages that fit in the block
|
||||||
minGas := int64(gasguess.MinGas)
|
minGas := int64(gasguess.MinGas)
|
||||||
result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
result := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
||||||
|
|
||||||
// have we filled the block?
|
// have we filled the block?
|
||||||
if gasLimit < minGas || len(result) >= build.BlockMessageLimit {
|
if result.gasLimit < minGas || len(result.msgs) >= build.BlockMessageLimit {
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -176,26 +308,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// compute the dependencies that must be merged and the gas limit including deps
|
if result.tryToAddWithDeps(chain, mp, baseFee) {
|
||||||
chainGasLimit := chain.gasLimit
|
|
||||||
chainMsgLimit := len(chain.msgs)
|
|
||||||
var chainDeps []*msgChain
|
|
||||||
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
|
||||||
chainDeps = append(chainDeps, curChain)
|
|
||||||
chainGasLimit += curChain.gasLimit
|
|
||||||
chainMsgLimit += len(curChain.msgs)
|
|
||||||
}
|
|
||||||
|
|
||||||
// does it all fit in the block?
|
|
||||||
if chainGasLimit <= gasLimit && chainMsgLimit+len(result) <= build.BlockMessageLimit {
|
|
||||||
// include it together with all dependencies
|
|
||||||
for i := len(chainDeps) - 1; i >= 0; i-- {
|
|
||||||
curChain := chainDeps[i]
|
|
||||||
curChain.merged = true
|
|
||||||
result = append(result, curChain.msgs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
chain.merged = true
|
|
||||||
// adjust the effective performance for all subsequent chains
|
// adjust the effective performance for all subsequent chains
|
||||||
if next := chain.next; next != nil && next.effPerf > 0 {
|
if next := chain.next; next != nil && next.effPerf > 0 {
|
||||||
next.effPerf += next.parentOffset
|
next.effPerf += next.parentOffset
|
||||||
@ -203,10 +316,8 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
|||||||
next.setEffPerf()
|
next.setEffPerf()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result = append(result, chain.msgs...)
|
|
||||||
gasLimit -= chainGasLimit
|
|
||||||
|
|
||||||
// resort to account for already merged chains and effective performance adjustments
|
// re-sort to account for already merged chains and effective performance adjustments
|
||||||
// the sort *must* be stable or we end up getting negative gasPerfs pushed up.
|
// the sort *must* be stable or we end up getting negative gasPerfs pushed up.
|
||||||
sort.SliceStable(chains[i+1:], func(i, j int) bool {
|
sort.SliceStable(chains[i+1:], func(i, j int) bool {
|
||||||
return chains[i].BeforeEffective(chains[j])
|
return chains[i].BeforeEffective(chains[j])
|
||||||
@ -215,7 +326,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// we can't fit this chain and its dependencies because of block gasLimit -- we are
|
// we can't fit this chain and its dependencies because of block limits -- we are
|
||||||
// at the edge
|
// at the edge
|
||||||
last = i
|
last = i
|
||||||
break
|
break
|
||||||
@ -232,12 +343,16 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
|||||||
// we might have to do it multiple times to satisfy tail packing
|
// we might have to do it multiple times to satisfy tail packing
|
||||||
startTail := time.Now()
|
startTail := time.Now()
|
||||||
tailLoop:
|
tailLoop:
|
||||||
for gasLimit >= minGas && last < len(chains) {
|
for result.gasLimit >= minGas && last < len(chains) {
|
||||||
// trim if necessary
|
|
||||||
if chains[last].gasLimit > gasLimit {
|
if !chains[last].valid {
|
||||||
chains[last].Trim(gasLimit, build.BlockMessageLimit-len(result), mp, baseFee)
|
last++
|
||||||
|
continue tailLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// trim if necessary
|
||||||
|
result.trimChain(chains[last], mp, baseFee)
|
||||||
|
|
||||||
// push down if it hasn't been invalidated
|
// push down if it hasn't been invalidated
|
||||||
if chains[last].valid {
|
if chains[last].valid {
|
||||||
for i := last; i < len(chains)-1; i++ {
|
for i := last; i < len(chains)-1; i++ {
|
||||||
@ -249,7 +364,7 @@ tailLoop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// select the next (valid and fitting) chain and its dependencies for inclusion
|
// select the next (valid and fitting) chain and its dependencies for inclusion
|
||||||
for i, chain := range chains[last:] {
|
for _, chain := range chains[last:] {
|
||||||
// has the chain been invalidated?
|
// has the chain been invalidated?
|
||||||
if !chain.valid {
|
if !chain.valid {
|
||||||
continue
|
continue
|
||||||
@ -265,49 +380,10 @@ tailLoop:
|
|||||||
break tailLoop
|
break tailLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
// compute the dependencies that must be merged and the gas limit including deps
|
if result.tryToAddWithDeps(chain, mp, baseFee) {
|
||||||
chainGasLimit := chain.gasLimit
|
|
||||||
chainMsgLimit := len(chain.msgs)
|
|
||||||
depGasLimit := int64(0)
|
|
||||||
depMsgLimit := 0
|
|
||||||
var chainDeps []*msgChain
|
|
||||||
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
|
||||||
chainDeps = append(chainDeps, curChain)
|
|
||||||
chainGasLimit += curChain.gasLimit
|
|
||||||
chainMsgLimit += len(curChain.msgs)
|
|
||||||
depGasLimit += curChain.gasLimit
|
|
||||||
depMsgLimit += len(curChain.msgs)
|
|
||||||
}
|
|
||||||
|
|
||||||
// does it all fit in the bock
|
|
||||||
if chainGasLimit <= gasLimit && len(result)+chainMsgLimit <= build.BlockMessageLimit {
|
|
||||||
// include it together with all dependencies
|
|
||||||
for i := len(chainDeps) - 1; i >= 0; i-- {
|
|
||||||
curChain := chainDeps[i]
|
|
||||||
curChain.merged = true
|
|
||||||
result = append(result, curChain.msgs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
chain.merged = true
|
|
||||||
result = append(result, chain.msgs...)
|
|
||||||
gasLimit -= chainGasLimit
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// it doesn't all fit; now we have to take into account the dependent chains before
|
|
||||||
// making a decision about trimming or invalidating.
|
|
||||||
// if the dependencies exceed the block limits, then we must invalidate the chain
|
|
||||||
// as it can never be included.
|
|
||||||
// Otherwise we can just trim and continue
|
|
||||||
if depGasLimit > gasLimit || len(result)+depMsgLimit >= build.BlockMessageLimit {
|
|
||||||
chain.Invalidate()
|
|
||||||
last += i + 1
|
|
||||||
continue tailLoop
|
|
||||||
}
|
|
||||||
|
|
||||||
// dependencies fit, just trim it
|
|
||||||
chain.Trim(gasLimit-depGasLimit, build.BlockMessageLimit-len(result)-depMsgLimit, mp, baseFee)
|
|
||||||
last += i
|
|
||||||
continue tailLoop
|
continue tailLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -321,15 +397,15 @@ tailLoop:
|
|||||||
|
|
||||||
// if we have room to spare, pick some random (non-negative) chains to fill the block
|
// if we have room to spare, pick some random (non-negative) chains to fill the block
|
||||||
// we pick randomly so that we minimize the probability of duplication among all block producers
|
// we pick randomly so that we minimize the probability of duplication among all block producers
|
||||||
if gasLimit >= minGas && len(result) <= build.BlockMessageLimit {
|
if result.gasLimit >= minGas && len(result.msgs) <= build.BlockMessageLimit {
|
||||||
randomCount := 0
|
preRandomLength := len(result.msgs)
|
||||||
|
|
||||||
startRandom := time.Now()
|
startRandom := time.Now()
|
||||||
shuffleChains(chains)
|
shuffleChains(chains)
|
||||||
|
|
||||||
for _, chain := range chains {
|
for _, chain := range chains {
|
||||||
// have we filled the block
|
// have we filled the block
|
||||||
if gasLimit < minGas || len(result) >= build.BlockMessageLimit {
|
if result.gasLimit < minGas || len(result.msgs) >= build.BlockMessageLimit {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -343,63 +419,31 @@ tailLoop:
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// compute the dependencies that must be merged and the gas limit including deps
|
if result.tryToAddWithDeps(chain, mp, baseFee) {
|
||||||
chainGasLimit := chain.gasLimit
|
|
||||||
chainMsgLimit := len(chain.msgs)
|
|
||||||
depGasLimit := int64(0)
|
|
||||||
depMsgLimit := 0
|
|
||||||
var chainDeps []*msgChain
|
|
||||||
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
|
||||||
chainDeps = append(chainDeps, curChain)
|
|
||||||
chainGasLimit += curChain.gasLimit
|
|
||||||
chainMsgLimit += len(curChain.msgs)
|
|
||||||
depGasLimit += curChain.gasLimit
|
|
||||||
depMsgLimit += len(curChain.msgs)
|
|
||||||
}
|
|
||||||
|
|
||||||
// do the deps fit? if the deps won't fit, invalidate the chain
|
|
||||||
if depGasLimit > gasLimit || len(result)+depMsgLimit > build.BlockMessageLimit {
|
|
||||||
chain.Invalidate()
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// do they fit as is? if it doesn't, trim to make it fit if possible
|
if chain.valid {
|
||||||
if chainGasLimit > gasLimit || len(result)+chainMsgLimit > build.BlockMessageLimit {
|
// chain got trimmed on the previous call to tryToAddWithDeps, can now be included
|
||||||
chain.Trim(gasLimit-depGasLimit, build.BlockMessageLimit-len(result)-depMsgLimit, mp, baseFee)
|
result.tryToAddWithDeps(chain, mp, baseFee)
|
||||||
|
continue
|
||||||
if !chain.valid {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// include it together with all dependencies
|
|
||||||
for i := len(chainDeps) - 1; i >= 0; i-- {
|
|
||||||
curChain := chainDeps[i]
|
|
||||||
curChain.merged = true
|
|
||||||
result = append(result, curChain.msgs...)
|
|
||||||
randomCount += len(curChain.msgs)
|
|
||||||
}
|
|
||||||
|
|
||||||
chain.merged = true
|
|
||||||
result = append(result, chain.msgs...)
|
|
||||||
randomCount += len(chain.msgs)
|
|
||||||
gasLimit -= chainGasLimit
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if dt := time.Since(startRandom); dt > time.Millisecond {
|
if dt := time.Since(startRandom); dt > time.Millisecond {
|
||||||
log.Infow("pack random tail chains done", "took", dt)
|
log.Infow("pack random tail chains done", "took", dt)
|
||||||
}
|
}
|
||||||
|
|
||||||
if randomCount > 0 {
|
if len(result.msgs) != preRandomLength {
|
||||||
log.Warnf("optimal selection failed to pack a block; picked %d messages with random selection",
|
log.Warnf("optimal selection failed to pack a block; picked %d messages with random selection",
|
||||||
randomCount)
|
len(result.msgs)-preRandomLength)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
|
func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) (*selectedMessages, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
||||||
@ -425,10 +469,10 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type
|
|||||||
|
|
||||||
// 0b. Select all priority messages that fit in the block
|
// 0b. Select all priority messages that fit in the block
|
||||||
minGas := int64(gasguess.MinGas)
|
minGas := int64(gasguess.MinGas)
|
||||||
result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
result := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
||||||
|
|
||||||
// have we filled the block?
|
// have we filled the block?
|
||||||
if gasLimit < minGas || len(result) > build.BlockMessageLimit {
|
if result.gasLimit < minGas || len(result.msgs) > build.BlockMessageLimit {
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -464,9 +508,8 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type
|
|||||||
}
|
}
|
||||||
|
|
||||||
// does it fit in the block?
|
// does it fit in the block?
|
||||||
if chain.gasLimit <= gasLimit && len(result)+len(chain.msgs) <= build.BlockMessageLimit {
|
if result.tryToAdd(chain) {
|
||||||
gasLimit -= chain.gasLimit
|
// there was room, we added the chain, keep going
|
||||||
result = append(result, chain.msgs...)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -486,9 +529,9 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type
|
|||||||
// have to do it multiple times to satisfy tail packing.
|
// have to do it multiple times to satisfy tail packing.
|
||||||
startTail := time.Now()
|
startTail := time.Now()
|
||||||
tailLoop:
|
tailLoop:
|
||||||
for gasLimit >= minGas && last < len(chains) {
|
for result.gasLimit >= minGas && last < len(chains) {
|
||||||
// trim
|
// trim
|
||||||
chains[last].Trim(gasLimit, build.BlockMessageLimit-len(result), mp, baseFee)
|
result.trimChain(chains[last], mp, baseFee)
|
||||||
|
|
||||||
// push down if it hasn't been invalidated
|
// push down if it hasn't been invalidated
|
||||||
if chains[last].valid {
|
if chains[last].valid {
|
||||||
@ -513,9 +556,8 @@ tailLoop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// does it fit in the bock?
|
// does it fit in the bock?
|
||||||
if chain.gasLimit <= gasLimit && len(result)+len(chain.msgs) <= build.BlockMessageLimit {
|
if result.tryToAdd(chain) {
|
||||||
gasLimit -= chain.gasLimit
|
// there was room, we added the chain, keep going
|
||||||
result = append(result, chain.msgs...)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -535,7 +577,7 @@ tailLoop:
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) {
|
func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *selectedMessages {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
if dt := time.Since(start); dt > time.Millisecond {
|
if dt := time.Since(start); dt > time.Millisecond {
|
||||||
@ -543,8 +585,12 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
mpCfg := mp.getConfig()
|
mpCfg := mp.getConfig()
|
||||||
result := make([]*types.SignedMessage, 0, mpCfg.SizeLimitLow)
|
result := &selectedMessages{
|
||||||
gasLimit := int64(build.BlockGasLimit)
|
msgs: make([]*types.SignedMessage, 0, mpCfg.SizeLimitLow),
|
||||||
|
gasLimit: int64(build.BlockGasLimit),
|
||||||
|
blsLimit: cbg.MaxLength,
|
||||||
|
secpLimit: cbg.MaxLength,
|
||||||
|
}
|
||||||
minGas := int64(gasguess.MinGas)
|
minGas := int64(gasguess.MinGas)
|
||||||
|
|
||||||
// 1. Get priority actor chains
|
// 1. Get priority actor chains
|
||||||
@ -554,7 +600,7 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
|
|||||||
pk, err := mp.resolveToKey(ctx, actor)
|
pk, err := mp.resolveToKey(ctx, actor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debugf("mpooladdlocal failed to resolve sender: %s", err)
|
log.Debugf("mpooladdlocal failed to resolve sender: %s", err)
|
||||||
return nil, gasLimit
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
mset, ok := pending[pk]
|
mset, ok := pending[pk]
|
||||||
@ -566,9 +612,8 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
|
|||||||
chains = append(chains, next...)
|
chains = append(chains, next...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(chains) == 0 {
|
if len(chains) == 0 {
|
||||||
return nil, gasLimit
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Sort the chains
|
// 2. Sort the chains
|
||||||
@ -578,7 +623,7 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
|
|||||||
|
|
||||||
if len(chains) != 0 && chains[0].gasPerf < 0 {
|
if len(chains) != 0 && chains[0].gasPerf < 0 {
|
||||||
log.Warnw("all priority messages in mpool have negative gas performance", "bestGasPerf", chains[0].gasPerf)
|
log.Warnw("all priority messages in mpool have negative gas performance", "bestGasPerf", chains[0].gasPerf)
|
||||||
return nil, gasLimit
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Merge chains until the block limit, as long as they have non-negative gas performance
|
// 3. Merge chains until the block limit, as long as they have non-negative gas performance
|
||||||
@ -588,9 +633,8 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if chain.gasLimit <= gasLimit {
|
if result.tryToAdd(chain) {
|
||||||
gasLimit -= chain.gasLimit
|
// there was room, we added the chain, keep going
|
||||||
result = append(result, chain.msgs...)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -600,9 +644,10 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
|
|||||||
}
|
}
|
||||||
|
|
||||||
tailLoop:
|
tailLoop:
|
||||||
for gasLimit >= minGas && last < len(chains) {
|
for result.gasLimit >= minGas && last < len(chains) {
|
||||||
// trim, discarding negative performing messages
|
// trim, discarding negative performing messages
|
||||||
chains[last].Trim(gasLimit, mp, baseFee)
|
|
||||||
|
result.trimChain(chains[last], mp, baseFee)
|
||||||
|
|
||||||
// push down if it hasn't been invalidated
|
// push down if it hasn't been invalidated
|
||||||
if chains[last].valid {
|
if chains[last].valid {
|
||||||
@ -627,9 +672,8 @@ tailLoop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// does it fit in the bock?
|
// does it fit in the bock?
|
||||||
if chain.gasLimit <= gasLimit {
|
if result.tryToAdd(chain) {
|
||||||
gasLimit -= chain.gasLimit
|
// there was room, we added the chain, keep going
|
||||||
result = append(result, chain.msgs...)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -643,7 +687,7 @@ tailLoop:
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, gasLimit
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.Address]map[uint64]*types.SignedMessage, error) {
|
func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.Address]map[uint64]*types.SignedMessage, error) {
|
||||||
@ -811,6 +855,7 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
|
|||||||
chain.gasLimit = m.Message.GasLimit
|
chain.gasLimit = m.Message.GasLimit
|
||||||
chain.gasPerf = mp.getGasPerf(chain.gasReward, chain.gasLimit)
|
chain.gasPerf = mp.getGasPerf(chain.gasReward, chain.gasLimit)
|
||||||
chain.valid = true
|
chain.valid = true
|
||||||
|
chain.sigType = m.Signature.Type
|
||||||
return chain
|
return chain
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -910,6 +955,7 @@ func (mc *msgChain) Trim(gasLimit int64, msgLimit int, mp *MessagePool, baseFee
|
|||||||
mc.msgs = mc.msgs[:i+1]
|
mc.msgs = mc.msgs[:i+1]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: if the trim above is a no-op, this (may) needlessly invalidates the next chain
|
||||||
if mc.next != nil {
|
if mc.next != nil {
|
||||||
mc.next.Invalidate()
|
mc.next.Invalidate()
|
||||||
mc.next = nil
|
mc.next = nil
|
||||||
|
@ -577,7 +577,7 @@ func TestMessageSelectionTrimming(t *testing.T) {
|
|||||||
|
|
||||||
expected := int(build.BlockGasLimit / gasLimit)
|
expected := int(build.BlockGasLimit / gasLimit)
|
||||||
if len(msgs) != expected {
|
if len(msgs) != expected {
|
||||||
t.Fatalf("expected %d messages, bug got %d", expected, len(msgs))
|
t.Fatalf("expected %d messages, but got %d", expected, len(msgs))
|
||||||
}
|
}
|
||||||
|
|
||||||
mGasLimit := int64(0)
|
mGasLimit := int64(0)
|
||||||
@ -978,7 +978,7 @@ func TestOptimalMessageSelection2(t *testing.T) {
|
|||||||
func TestOptimalMessageSelection3(t *testing.T) {
|
func TestOptimalMessageSelection3(t *testing.T) {
|
||||||
// this test uses 10 actors sending a block of messages to each other, with the the first
|
// this test uses 10 actors sending a block of messages to each other, with the the first
|
||||||
// actors paying higher gas premium than the subsequent actors.
|
// actors paying higher gas premium than the subsequent actors.
|
||||||
// We select with a low ticket quality; the chain depenent merging algorithm should pick
|
// We select with a low ticket quality; the chain dependent merging algorithm should pick
|
||||||
// messages from the median actor from the start
|
// messages from the median actor from the start
|
||||||
mp, tma := makeTestMpool()
|
mp, tma := makeTestMpool()
|
||||||
|
|
||||||
@ -1109,11 +1109,13 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu
|
|||||||
logging.SetLogLevel("messagepool", "error")
|
logging.SetLogLevel("messagepool", "error")
|
||||||
|
|
||||||
// 1. greedy selection
|
// 1. greedy selection
|
||||||
greedyMsgs, err := mp.selectMessagesGreedy(context.Background(), ts, ts)
|
gm, err := mp.selectMessagesGreedy(context.Background(), ts, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
greedyMsgs := gm.msgs
|
||||||
|
|
||||||
totalGreedyCapacity := 0.0
|
totalGreedyCapacity := 0.0
|
||||||
totalGreedyReward := 0.0
|
totalGreedyReward := 0.0
|
||||||
totalOptimalCapacity := 0.0
|
totalOptimalCapacity := 0.0
|
||||||
|
Loading…
Reference in New Issue
Block a user