Merge pull request #7321 from filecoin-project/asr/msg-limit
Mempool msg selection should respect block message limits
This commit is contained in:
commit
690be5bf7f
@ -65,7 +65,7 @@ func (filec *FilecoinEC) CreateBlock(ctx context.Context, w api.Wallet, bt *api.
|
||||
}
|
||||
|
||||
blsMsgCids = append(blsMsgCids, c)
|
||||
} else {
|
||||
} else if msg.Signature.Type == crypto.SigTypeSecp256k1 {
|
||||
c, err := filec.sm.ChainStore().PutMessage(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -74,6 +74,8 @@ func (filec *FilecoinEC) CreateBlock(ctx context.Context, w api.Wallet, bt *api.
|
||||
secpkMsgCids = append(secpkMsgCids, c)
|
||||
secpkMessages = append(secpkMessages, msg)
|
||||
|
||||
} else {
|
||||
return nil, xerrors.Errorf("unknown sig type: %d", msg.Signature.Type)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -121,7 +121,7 @@ loop:
|
||||
|
||||
// we can't fit the current chain but there is gas to spare
|
||||
// trim it and push it down
|
||||
chain.Trim(gasLimit, mp, baseFee)
|
||||
chain.Trim(gasLimit, repubMsgLimit, mp, baseFee)
|
||||
for j := i; j < len(chains)-1; j++ {
|
||||
if chains[j].Before(chains[j+1]) {
|
||||
break
|
||||
@ -131,6 +131,10 @@ loop:
|
||||
}
|
||||
|
||||
count := 0
|
||||
if len(msgs) > repubMsgLimit {
|
||||
msgs = msgs[:repubMsgLimit]
|
||||
}
|
||||
|
||||
log.Infof("republishing %d messages", len(msgs))
|
||||
for _, m := range msgs {
|
||||
mb, err := m.Serialize()
|
||||
|
@ -7,6 +7,10 @@ import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
@ -34,9 +38,10 @@ type msgChain struct {
|
||||
merged bool
|
||||
next *msgChain
|
||||
prev *msgChain
|
||||
sigType crypto.SigType
|
||||
}
|
||||
|
||||
func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) {
|
||||
func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
|
||||
mp.curTsLk.Lock()
|
||||
defer mp.curTsLk.Unlock()
|
||||
|
||||
@ -46,24 +51,156 @@ func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq
|
||||
// if the ticket quality is high enough that the first block has higher probability
|
||||
// than any other block, then we don't bother with optimal selection because the
|
||||
// first block will always have higher effective performance
|
||||
var sm *selectedMessages
|
||||
var err error
|
||||
if tq > 0.84 {
|
||||
msgs, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts)
|
||||
sm, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts)
|
||||
} else {
|
||||
msgs, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq)
|
||||
sm, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(msgs) > build.BlockMessageLimit {
|
||||
msgs = msgs[:build.BlockMessageLimit]
|
||||
if sm == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return msgs, nil
|
||||
// one last sanity check
|
||||
if len(sm.msgs) > build.BlockMessageLimit {
|
||||
log.Errorf("message selection chose too many messages %d > %d", len(sm.msgs), build.BlockMessageLimit)
|
||||
sm.msgs = sm.msgs[:build.BlockMessageLimit]
|
||||
}
|
||||
|
||||
func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
|
||||
return sm.msgs, nil
|
||||
}
|
||||
|
||||
type selectedMessages struct {
|
||||
msgs []*types.SignedMessage
|
||||
gasLimit int64
|
||||
secpLimit int
|
||||
blsLimit int
|
||||
}
|
||||
|
||||
// returns false if chain can't be added due to block constraints
|
||||
func (sm *selectedMessages) tryToAdd(mc *msgChain) bool {
|
||||
l := len(mc.msgs)
|
||||
|
||||
if build.BlockMessageLimit < l+len(sm.msgs) || sm.gasLimit < mc.gasLimit {
|
||||
return false
|
||||
}
|
||||
|
||||
if mc.sigType == crypto.SigTypeBLS {
|
||||
if sm.blsLimit < l {
|
||||
return false
|
||||
}
|
||||
|
||||
sm.msgs = append(sm.msgs, mc.msgs...)
|
||||
sm.blsLimit -= l
|
||||
sm.gasLimit -= mc.gasLimit
|
||||
} else if mc.sigType == crypto.SigTypeSecp256k1 {
|
||||
if sm.secpLimit < l {
|
||||
return false
|
||||
}
|
||||
|
||||
sm.msgs = append(sm.msgs, mc.msgs...)
|
||||
sm.secpLimit -= l
|
||||
sm.gasLimit -= mc.gasLimit
|
||||
}
|
||||
|
||||
// don't add the weird sigType msg, but otherwise proceed
|
||||
return true
|
||||
}
|
||||
|
||||
// returns false if messages can't be added due to block constraints
|
||||
// will trim / invalidate chain as appropriate
|
||||
func (sm *selectedMessages) tryToAddWithDeps(mc *msgChain, mp *MessagePool, baseFee types.BigInt) bool {
|
||||
// compute the dependencies that must be merged and the gas limit including deps
|
||||
chainGasLimit := mc.gasLimit
|
||||
chainMsgLimit := len(mc.msgs)
|
||||
depGasLimit := int64(0)
|
||||
depMsgLimit := 0
|
||||
smMsgLimit := 0
|
||||
|
||||
if mc.sigType == crypto.SigTypeBLS {
|
||||
smMsgLimit = sm.blsLimit
|
||||
} else if mc.sigType == crypto.SigTypeSecp256k1 {
|
||||
smMsgLimit = sm.secpLimit
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
|
||||
if smMsgLimit > build.BlockMessageLimit-len(sm.msgs) {
|
||||
smMsgLimit = build.BlockMessageLimit - len(sm.msgs)
|
||||
}
|
||||
|
||||
var chainDeps []*msgChain
|
||||
for curChain := mc.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
||||
chainDeps = append(chainDeps, curChain)
|
||||
chainGasLimit += curChain.gasLimit
|
||||
chainMsgLimit += len(curChain.msgs)
|
||||
depGasLimit += curChain.gasLimit
|
||||
depMsgLimit += len(curChain.msgs)
|
||||
}
|
||||
|
||||
// the chain doesn't fit as-is, so trim / invalidate it and return false
|
||||
if chainGasLimit > sm.gasLimit || chainMsgLimit > smMsgLimit {
|
||||
|
||||
// it doesn't all fit; now we have to take into account the dependent chains before
|
||||
// making a decision about trimming or invalidating.
|
||||
// if the dependencies exceed the block limits, then we must invalidate the chain
|
||||
// as it can never be included.
|
||||
// Otherwise we can just trim and continue
|
||||
if depGasLimit > sm.gasLimit || depMsgLimit >= smMsgLimit {
|
||||
mc.Invalidate()
|
||||
} else {
|
||||
// dependencies fit, just trim it
|
||||
mc.Trim(sm.gasLimit-depGasLimit, smMsgLimit-depMsgLimit, mp, baseFee)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// the chain fits! include it together with all dependencies
|
||||
for i := len(chainDeps) - 1; i >= 0; i-- {
|
||||
curChain := chainDeps[i]
|
||||
curChain.merged = true
|
||||
sm.msgs = append(sm.msgs, curChain.msgs...)
|
||||
}
|
||||
|
||||
mc.merged = true
|
||||
|
||||
sm.msgs = append(sm.msgs, mc.msgs...)
|
||||
sm.gasLimit -= chainGasLimit
|
||||
|
||||
if mc.sigType == crypto.SigTypeBLS {
|
||||
sm.blsLimit -= chainMsgLimit
|
||||
} else if mc.sigType == crypto.SigTypeSecp256k1 {
|
||||
sm.secpLimit -= chainMsgLimit
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (sm *selectedMessages) trimChain(mc *msgChain, mp *MessagePool, baseFee types.BigInt) {
|
||||
msgLimit := build.BlockMessageLimit - len(sm.msgs)
|
||||
if mc.sigType == crypto.SigTypeBLS {
|
||||
if msgLimit > sm.blsLimit {
|
||||
msgLimit = sm.blsLimit
|
||||
}
|
||||
} else if mc.sigType == crypto.SigTypeSecp256k1 {
|
||||
if msgLimit > sm.secpLimit {
|
||||
msgLimit = sm.secpLimit
|
||||
}
|
||||
}
|
||||
|
||||
if mc.gasLimit > sm.gasLimit || len(mc.msgs) > msgLimit {
|
||||
mc.Trim(sm.gasLimit, msgLimit, mp, baseFee)
|
||||
}
|
||||
}
|
||||
|
||||
func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) (*selectedMessages, error) {
|
||||
start := time.Now()
|
||||
|
||||
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
||||
@ -89,10 +226,10 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
||||
|
||||
// 0b. Select all priority messages that fit in the block
|
||||
minGas := int64(gasguess.MinGas)
|
||||
result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
||||
result := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
||||
|
||||
// have we filled the block?
|
||||
if gasLimit < minGas {
|
||||
if result.gasLimit < minGas || len(result.msgs) >= build.BlockMessageLimit {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@ -117,19 +254,21 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// 3. Parition chains into blocks (without trimming)
|
||||
// 3. Partition chains into blocks (without trimming)
|
||||
// we use the full blockGasLimit (as opposed to the residual gas limit from the
|
||||
// priority message selection) as we have to account for what other miners are doing
|
||||
// priority message selection) as we have to account for what other block providers are doing
|
||||
nextChain := 0
|
||||
partitions := make([][]*msgChain, MaxBlocks)
|
||||
for i := 0; i < MaxBlocks && nextChain < len(chains); i++ {
|
||||
gasLimit := int64(build.BlockGasLimit)
|
||||
msgLimit := build.BlockMessageLimit
|
||||
for nextChain < len(chains) {
|
||||
chain := chains[nextChain]
|
||||
nextChain++
|
||||
partitions[i] = append(partitions[i], chain)
|
||||
gasLimit -= chain.gasLimit
|
||||
if gasLimit < minGas {
|
||||
msgLimit -= len(chain.msgs)
|
||||
if gasLimit < minGas || msgLimit <= 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -158,7 +297,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
||||
})
|
||||
|
||||
// 6. Merge the head chains to produce the list of messages selected for inclusion
|
||||
// subject to the residual gas limit
|
||||
// subject to the residual block limits
|
||||
// When a chain is merged in, all its previous dependent chains *must* also be
|
||||
// merged in or we'll have a broken block
|
||||
startMerge := time.Now()
|
||||
@ -174,35 +313,16 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
||||
continue
|
||||
}
|
||||
|
||||
// compute the dependencies that must be merged and the gas limit including deps
|
||||
chainGasLimit := chain.gasLimit
|
||||
var chainDeps []*msgChain
|
||||
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
||||
chainDeps = append(chainDeps, curChain)
|
||||
chainGasLimit += curChain.gasLimit
|
||||
}
|
||||
|
||||
// does it all fit in the block?
|
||||
if chainGasLimit <= gasLimit {
|
||||
// include it together with all dependencies
|
||||
for i := len(chainDeps) - 1; i >= 0; i-- {
|
||||
curChain := chainDeps[i]
|
||||
curChain.merged = true
|
||||
result = append(result, curChain.msgs...)
|
||||
}
|
||||
|
||||
chain.merged = true
|
||||
// adjust the effective pefromance for all subsequent chains
|
||||
if result.tryToAddWithDeps(chain, mp, baseFee) {
|
||||
// adjust the effective performance for all subsequent chains
|
||||
if next := chain.next; next != nil && next.effPerf > 0 {
|
||||
next.effPerf += next.parentOffset
|
||||
for next = next.next; next != nil && next.effPerf > 0; next = next.next {
|
||||
next.setEffPerf()
|
||||
}
|
||||
}
|
||||
result = append(result, chain.msgs...)
|
||||
gasLimit -= chainGasLimit
|
||||
|
||||
// resort to account for already merged chains and effective performance adjustments
|
||||
// re-sort to account for already merged chains and effective performance adjustments
|
||||
// the sort *must* be stable or we end up getting negative gasPerfs pushed up.
|
||||
sort.SliceStable(chains[i+1:], func(i, j int) bool {
|
||||
return chains[i].BeforeEffective(chains[j])
|
||||
@ -211,7 +331,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
||||
continue
|
||||
}
|
||||
|
||||
// we can't fit this chain and its dependencies because of block gasLimit -- we are
|
||||
// we can't fit this chain and its dependencies because of block limits -- we are
|
||||
// at the edge
|
||||
last = i
|
||||
break
|
||||
@ -222,18 +342,22 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
||||
|
||||
// 7. We have reached the edge of what can fit wholesale; if we still hae available
|
||||
// gasLimit to pack some more chains, then trim the last chain and push it down.
|
||||
// Trimming invalidaates subsequent dependent chains so that they can't be selected
|
||||
// Trimming invalidates subsequent dependent chains so that they can't be selected
|
||||
// as their dependency cannot be (fully) included.
|
||||
// We do this in a loop because the blocker might have been inordinately large and
|
||||
// we might have to do it multiple times to satisfy tail packing
|
||||
startTail := time.Now()
|
||||
tailLoop:
|
||||
for gasLimit >= minGas && last < len(chains) {
|
||||
// trim if necessary
|
||||
if chains[last].gasLimit > gasLimit {
|
||||
chains[last].Trim(gasLimit, mp, baseFee)
|
||||
for result.gasLimit >= minGas && last < len(chains) {
|
||||
|
||||
if !chains[last].valid {
|
||||
last++
|
||||
continue tailLoop
|
||||
}
|
||||
|
||||
// trim if necessary
|
||||
result.trimChain(chains[last], mp, baseFee)
|
||||
|
||||
// push down if it hasn't been invalidated
|
||||
if chains[last].valid {
|
||||
for i := last; i < len(chains)-1; i++ {
|
||||
@ -245,7 +369,7 @@ tailLoop:
|
||||
}
|
||||
|
||||
// select the next (valid and fitting) chain and its dependencies for inclusion
|
||||
for i, chain := range chains[last:] {
|
||||
for _, chain := range chains[last:] {
|
||||
// has the chain been invalidated?
|
||||
if !chain.valid {
|
||||
continue
|
||||
@ -261,45 +385,10 @@ tailLoop:
|
||||
break tailLoop
|
||||
}
|
||||
|
||||
// compute the dependencies that must be merged and the gas limit including deps
|
||||
chainGasLimit := chain.gasLimit
|
||||
depGasLimit := int64(0)
|
||||
var chainDeps []*msgChain
|
||||
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
||||
chainDeps = append(chainDeps, curChain)
|
||||
chainGasLimit += curChain.gasLimit
|
||||
depGasLimit += curChain.gasLimit
|
||||
}
|
||||
|
||||
// does it all fit in the bock
|
||||
if chainGasLimit <= gasLimit {
|
||||
// include it together with all dependencies
|
||||
for i := len(chainDeps) - 1; i >= 0; i-- {
|
||||
curChain := chainDeps[i]
|
||||
curChain.merged = true
|
||||
result = append(result, curChain.msgs...)
|
||||
}
|
||||
|
||||
chain.merged = true
|
||||
result = append(result, chain.msgs...)
|
||||
gasLimit -= chainGasLimit
|
||||
if result.tryToAddWithDeps(chain, mp, baseFee) {
|
||||
continue
|
||||
}
|
||||
|
||||
// it doesn't all fit; now we have to take into account the dependent chains before
|
||||
// making a decision about trimming or invalidating.
|
||||
// if the dependencies exceed the gas limit, then we must invalidate the chain
|
||||
// as it can never be included.
|
||||
// Otherwise we can just trim and continue
|
||||
if depGasLimit > gasLimit {
|
||||
chain.Invalidate()
|
||||
last += i + 1
|
||||
continue tailLoop
|
||||
}
|
||||
|
||||
// dependencies fit, just trim it
|
||||
chain.Trim(gasLimit-depGasLimit, mp, baseFee)
|
||||
last += i
|
||||
continue tailLoop
|
||||
}
|
||||
|
||||
@ -311,17 +400,17 @@ tailLoop:
|
||||
log.Infow("pack tail chains done", "took", dt)
|
||||
}
|
||||
|
||||
// if we have gasLimit to spare, pick some random (non-negative) chains to fill the block
|
||||
// we pick randomly so that we minimize the probability of duplication among all miners
|
||||
if gasLimit >= minGas {
|
||||
randomCount := 0
|
||||
// if we have room to spare, pick some random (non-negative) chains to fill the block
|
||||
// we pick randomly so that we minimize the probability of duplication among all block producers
|
||||
if result.gasLimit >= minGas && len(result.msgs) <= build.BlockMessageLimit {
|
||||
preRandomLength := len(result.msgs)
|
||||
|
||||
startRandom := time.Now()
|
||||
shuffleChains(chains)
|
||||
|
||||
for _, chain := range chains {
|
||||
// have we filled the block
|
||||
if gasLimit < minGas {
|
||||
if result.gasLimit < minGas || len(result.msgs) >= build.BlockMessageLimit {
|
||||
break
|
||||
}
|
||||
|
||||
@ -335,59 +424,31 @@ tailLoop:
|
||||
continue
|
||||
}
|
||||
|
||||
// compute the dependencies that must be merged and the gas limit including deps
|
||||
chainGasLimit := chain.gasLimit
|
||||
depGasLimit := int64(0)
|
||||
var chainDeps []*msgChain
|
||||
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
||||
chainDeps = append(chainDeps, curChain)
|
||||
chainGasLimit += curChain.gasLimit
|
||||
depGasLimit += curChain.gasLimit
|
||||
}
|
||||
|
||||
// do the deps fit? if the deps won't fit, invalidate the chain
|
||||
if depGasLimit > gasLimit {
|
||||
chain.Invalidate()
|
||||
if result.tryToAddWithDeps(chain, mp, baseFee) {
|
||||
continue
|
||||
}
|
||||
|
||||
// do they fit as is? if it doesn't, trim to make it fit if possible
|
||||
if chainGasLimit > gasLimit {
|
||||
chain.Trim(gasLimit-depGasLimit, mp, baseFee)
|
||||
|
||||
if !chain.valid {
|
||||
if chain.valid {
|
||||
// chain got trimmed on the previous call to tryToAddWithDeps, can now be included
|
||||
result.tryToAddWithDeps(chain, mp, baseFee)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// include it together with all dependencies
|
||||
for i := len(chainDeps) - 1; i >= 0; i-- {
|
||||
curChain := chainDeps[i]
|
||||
curChain.merged = true
|
||||
result = append(result, curChain.msgs...)
|
||||
randomCount += len(curChain.msgs)
|
||||
}
|
||||
|
||||
chain.merged = true
|
||||
result = append(result, chain.msgs...)
|
||||
randomCount += len(chain.msgs)
|
||||
gasLimit -= chainGasLimit
|
||||
}
|
||||
|
||||
if dt := time.Since(startRandom); dt > time.Millisecond {
|
||||
log.Infow("pack random tail chains done", "took", dt)
|
||||
}
|
||||
|
||||
if randomCount > 0 {
|
||||
if len(result.msgs) != preRandomLength {
|
||||
log.Warnf("optimal selection failed to pack a block; picked %d messages with random selection",
|
||||
randomCount)
|
||||
len(result.msgs)-preRandomLength)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
|
||||
func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) (*selectedMessages, error) {
|
||||
start := time.Now()
|
||||
|
||||
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
||||
@ -413,10 +474,10 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type
|
||||
|
||||
// 0b. Select all priority messages that fit in the block
|
||||
minGas := int64(gasguess.MinGas)
|
||||
result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
||||
result := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
||||
|
||||
// have we filled the block?
|
||||
if gasLimit < minGas {
|
||||
if result.gasLimit < minGas || len(result.msgs) > build.BlockMessageLimit {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@ -442,7 +503,7 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type
|
||||
}
|
||||
|
||||
// 3. Merge the head chains to produce the list of messages selected for inclusion, subject to
|
||||
// the block gas limit.
|
||||
// the block gas and message limits.
|
||||
startMerge := time.Now()
|
||||
last := len(chains)
|
||||
for i, chain := range chains {
|
||||
@ -452,13 +513,12 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type
|
||||
}
|
||||
|
||||
// does it fit in the block?
|
||||
if chain.gasLimit <= gasLimit {
|
||||
gasLimit -= chain.gasLimit
|
||||
result = append(result, chain.msgs...)
|
||||
if result.tryToAdd(chain) {
|
||||
// there was room, we added the chain, keep going
|
||||
continue
|
||||
}
|
||||
|
||||
// we can't fit this chain because of block gasLimit -- we are at the edge
|
||||
// we can't fit this chain because of block limits -- we are at the edge
|
||||
last = i
|
||||
break
|
||||
}
|
||||
@ -474,9 +534,9 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type
|
||||
// have to do it multiple times to satisfy tail packing.
|
||||
startTail := time.Now()
|
||||
tailLoop:
|
||||
for gasLimit >= minGas && last < len(chains) {
|
||||
for result.gasLimit >= minGas && last < len(chains) {
|
||||
// trim
|
||||
chains[last].Trim(gasLimit, mp, baseFee)
|
||||
result.trimChain(chains[last], mp, baseFee)
|
||||
|
||||
// push down if it hasn't been invalidated
|
||||
if chains[last].valid {
|
||||
@ -501,9 +561,8 @@ tailLoop:
|
||||
}
|
||||
|
||||
// does it fit in the bock?
|
||||
if chain.gasLimit <= gasLimit {
|
||||
gasLimit -= chain.gasLimit
|
||||
result = append(result, chain.msgs...)
|
||||
if result.tryToAdd(chain) {
|
||||
// there was room, we added the chain, keep going
|
||||
continue
|
||||
}
|
||||
|
||||
@ -523,7 +582,7 @@ tailLoop:
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) {
|
||||
func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *selectedMessages {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
if dt := time.Since(start); dt > time.Millisecond {
|
||||
@ -531,8 +590,12 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
|
||||
}
|
||||
}()
|
||||
mpCfg := mp.getConfig()
|
||||
result := make([]*types.SignedMessage, 0, mpCfg.SizeLimitLow)
|
||||
gasLimit := int64(build.BlockGasLimit)
|
||||
result := &selectedMessages{
|
||||
msgs: make([]*types.SignedMessage, 0, mpCfg.SizeLimitLow),
|
||||
gasLimit: int64(build.BlockGasLimit),
|
||||
blsLimit: cbg.MaxLength,
|
||||
secpLimit: cbg.MaxLength,
|
||||
}
|
||||
minGas := int64(gasguess.MinGas)
|
||||
|
||||
// 1. Get priority actor chains
|
||||
@ -542,7 +605,7 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
|
||||
pk, err := mp.resolveToKey(ctx, actor)
|
||||
if err != nil {
|
||||
log.Debugf("mpooladdlocal failed to resolve sender: %s", err)
|
||||
return nil, gasLimit
|
||||
return result
|
||||
}
|
||||
|
||||
mset, ok := pending[pk]
|
||||
@ -554,9 +617,8 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
|
||||
chains = append(chains, next...)
|
||||
}
|
||||
}
|
||||
|
||||
if len(chains) == 0 {
|
||||
return nil, gasLimit
|
||||
return result
|
||||
}
|
||||
|
||||
// 2. Sort the chains
|
||||
@ -566,7 +628,7 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
|
||||
|
||||
if len(chains) != 0 && chains[0].gasPerf < 0 {
|
||||
log.Warnw("all priority messages in mpool have negative gas performance", "bestGasPerf", chains[0].gasPerf)
|
||||
return nil, gasLimit
|
||||
return result
|
||||
}
|
||||
|
||||
// 3. Merge chains until the block limit, as long as they have non-negative gas performance
|
||||
@ -576,9 +638,8 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
|
||||
break
|
||||
}
|
||||
|
||||
if chain.gasLimit <= gasLimit {
|
||||
gasLimit -= chain.gasLimit
|
||||
result = append(result, chain.msgs...)
|
||||
if result.tryToAdd(chain) {
|
||||
// there was room, we added the chain, keep going
|
||||
continue
|
||||
}
|
||||
|
||||
@ -588,9 +649,10 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
|
||||
}
|
||||
|
||||
tailLoop:
|
||||
for gasLimit >= minGas && last < len(chains) {
|
||||
for result.gasLimit >= minGas && last < len(chains) {
|
||||
// trim, discarding negative performing messages
|
||||
chains[last].Trim(gasLimit, mp, baseFee)
|
||||
|
||||
result.trimChain(chains[last], mp, baseFee)
|
||||
|
||||
// push down if it hasn't been invalidated
|
||||
if chains[last].valid {
|
||||
@ -615,9 +677,8 @@ tailLoop:
|
||||
}
|
||||
|
||||
// does it fit in the bock?
|
||||
if chain.gasLimit <= gasLimit {
|
||||
gasLimit -= chain.gasLimit
|
||||
result = append(result, chain.msgs...)
|
||||
if result.tryToAdd(chain) {
|
||||
// there was room, we added the chain, keep going
|
||||
continue
|
||||
}
|
||||
|
||||
@ -631,7 +692,7 @@ tailLoop:
|
||||
break
|
||||
}
|
||||
|
||||
return result, gasLimit
|
||||
return result
|
||||
}
|
||||
|
||||
func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.Address]map[uint64]*types.SignedMessage, error) {
|
||||
@ -778,11 +839,16 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
|
||||
return nil
|
||||
}
|
||||
|
||||
// if we have more messages from this sender than can fit in a block, drop the extra ones
|
||||
if len(msgs) > build.BlockMessageLimit {
|
||||
msgs = msgs[:build.BlockMessageLimit]
|
||||
}
|
||||
|
||||
// ok, now we can construct the chains using the messages we have
|
||||
// invariant: each chain has a bigger gasPerf than the next -- otherwise they can be merged
|
||||
// and increase the gasPerf of the first chain
|
||||
// We do this in two passes:
|
||||
// - in the first pass we create chains that aggreagate messages with non-decreasing gasPerf
|
||||
// - in the first pass we create chains that aggregate messages with non-decreasing gasPerf
|
||||
// - in the second pass we merge chains to maintain the invariant.
|
||||
var chains []*msgChain
|
||||
var curChain *msgChain
|
||||
@ -794,6 +860,7 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
|
||||
chain.gasLimit = m.Message.GasLimit
|
||||
chain.gasPerf = mp.getGasPerf(chain.gasReward, chain.gasLimit)
|
||||
chain.valid = true
|
||||
chain.sigType = m.Signature.Type
|
||||
return chain
|
||||
}
|
||||
|
||||
@ -808,7 +875,7 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
|
||||
gasLimit := curChain.gasLimit + m.Message.GasLimit
|
||||
gasPerf := mp.getGasPerf(gasReward, gasLimit)
|
||||
|
||||
// try to add the message to the current chain -- if it decreases the gasPerf, then make a
|
||||
// try to add the message to the current chain -- if it decreases the gasPerf, or then make a
|
||||
// new chain
|
||||
if gasPerf < curChain.gasPerf {
|
||||
chains = append(chains, curChain)
|
||||
@ -868,9 +935,9 @@ func (mc *msgChain) Before(other *msgChain) bool {
|
||||
(mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
|
||||
}
|
||||
|
||||
func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt) {
|
||||
func (mc *msgChain) Trim(gasLimit int64, msgLimit int, mp *MessagePool, baseFee types.BigInt) {
|
||||
i := len(mc.msgs) - 1
|
||||
for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0) {
|
||||
for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0 || i >= msgLimit) {
|
||||
gasReward := mp.getGasReward(mc.msgs[i], baseFee)
|
||||
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)
|
||||
mc.gasLimit -= mc.msgs[i].Message.GasLimit
|
||||
@ -893,6 +960,7 @@ func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt)
|
||||
mc.msgs = mc.msgs[:i+1]
|
||||
}
|
||||
|
||||
// TODO: if the trim above is a no-op, this (may) needlessly invalidates the next chain
|
||||
if mc.next != nil {
|
||||
mc.next.Invalidate()
|
||||
mc.next = nil
|
||||
|
@ -13,6 +13,10 @@ import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
@ -527,7 +531,7 @@ func TestBasicMessageSelection(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMessageSelectionTrimming(t *testing.T) {
|
||||
func TestMessageSelectionTrimmingGas(t *testing.T) {
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
// the actors
|
||||
@ -577,7 +581,7 @@ func TestMessageSelectionTrimming(t *testing.T) {
|
||||
|
||||
expected := int(build.BlockGasLimit / gasLimit)
|
||||
if len(msgs) != expected {
|
||||
t.Fatalf("expected %d messages, bug got %d", expected, len(msgs))
|
||||
t.Fatalf("expected %d messages, but got %d", expected, len(msgs))
|
||||
}
|
||||
|
||||
mGasLimit := int64(0)
|
||||
@ -590,6 +594,199 @@ func TestMessageSelectionTrimming(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) {
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
// the actors
|
||||
w1, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
a1, err := w1.WalletNew(context.Background(), types.KTSecp256k1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
block := tma.nextBlock()
|
||||
ts := mock.TipSet(block)
|
||||
tma.applyBlock(t, block)
|
||||
|
||||
tma.setBalance(a1, 1) // in FIL
|
||||
|
||||
// create a larger than selectable chain
|
||||
for i := 0; i < build.BlockMessageLimit; i++ {
|
||||
m := makeTestMessage(w1, a1, a1, uint64(i), 300000, 100)
|
||||
mustAdd(t, mp, m)
|
||||
}
|
||||
|
||||
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expected := cbg.MaxLength
|
||||
if len(msgs) != expected {
|
||||
t.Fatalf("expected %d messages, but got %d", expected, len(msgs))
|
||||
}
|
||||
|
||||
mGasLimit := int64(0)
|
||||
for _, m := range msgs {
|
||||
mGasLimit += m.Message.GasLimit
|
||||
}
|
||||
if mGasLimit > build.BlockGasLimit {
|
||||
t.Fatal("selected messages gas limit exceeds block gas limit!")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) {
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
// the actors
|
||||
w1, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
a1, err := w1.WalletNew(context.Background(), types.KTSecp256k1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
w2, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
a2, err := w2.WalletNew(context.Background(), types.KTBLS)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
block := tma.nextBlock()
|
||||
ts := mock.TipSet(block)
|
||||
tma.applyBlock(t, block)
|
||||
|
||||
tma.setBalance(a1, 1) // in FIL
|
||||
tma.setBalance(a2, 1) // in FIL
|
||||
|
||||
// create 2 larger than selectable chains
|
||||
for i := 0; i < build.BlockMessageLimit; i++ {
|
||||
m := makeTestMessage(w1, a1, a2, uint64(i), 300000, 100)
|
||||
mustAdd(t, mp, m)
|
||||
// a2's messages are preferred
|
||||
m = makeTestMessage(w2, a2, a1, uint64(i), 300000, 1000)
|
||||
mustAdd(t, mp, m)
|
||||
}
|
||||
|
||||
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
mGasLimit := int64(0)
|
||||
counts := make(map[crypto.SigType]uint)
|
||||
for _, m := range msgs {
|
||||
mGasLimit += m.Message.GasLimit
|
||||
counts[m.Signature.Type]++
|
||||
}
|
||||
|
||||
if mGasLimit > build.BlockGasLimit {
|
||||
t.Fatal("selected messages gas limit exceeds block gas limit!")
|
||||
}
|
||||
|
||||
expected := build.BlockMessageLimit
|
||||
if len(msgs) != expected {
|
||||
t.Fatalf("expected %d messages, but got %d", expected, len(msgs))
|
||||
}
|
||||
|
||||
if counts[crypto.SigTypeBLS] != cbg.MaxLength {
|
||||
t.Fatalf("expected %d bls messages, but got %d", cbg.MaxLength, len(msgs))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) {
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
// the actors
|
||||
w1, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
a1, err := w1.WalletNew(context.Background(), types.KTSecp256k1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
w2, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
a2, err := w2.WalletNew(context.Background(), types.KTBLS)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
block := tma.nextBlock()
|
||||
ts := mock.TipSet(block)
|
||||
tma.applyBlock(t, block)
|
||||
|
||||
tma.setBalance(a1, 1) // in FIL
|
||||
tma.setBalance(a2, 1) // in FIL
|
||||
|
||||
// create 2 almost max-length chains of equal value
|
||||
i := 0
|
||||
for i = 0; i < cbg.MaxLength-1; i++ {
|
||||
m := makeTestMessage(w1, a1, a2, uint64(i), 300000, 100)
|
||||
mustAdd(t, mp, m)
|
||||
// a2's messages are preferred
|
||||
m = makeTestMessage(w2, a2, a1, uint64(i), 300000, 100)
|
||||
mustAdd(t, mp, m)
|
||||
}
|
||||
|
||||
// a1's 8192th message is worth more than a2's
|
||||
m := makeTestMessage(w1, a1, a2, uint64(i), 300000, 1000)
|
||||
mustAdd(t, mp, m)
|
||||
|
||||
m = makeTestMessage(w2, a2, a1, uint64(i), 300000, 100)
|
||||
mustAdd(t, mp, m)
|
||||
|
||||
i++
|
||||
|
||||
// a2's (unselectable) 8193rd message is worth SO MUCH
|
||||
m = makeTestMessage(w2, a2, a1, uint64(i), 300000, 1000000)
|
||||
mustAdd(t, mp, m)
|
||||
|
||||
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
mGasLimit := int64(0)
|
||||
counts := make(map[crypto.SigType]uint)
|
||||
for _, m := range msgs {
|
||||
mGasLimit += m.Message.GasLimit
|
||||
counts[m.Signature.Type]++
|
||||
}
|
||||
|
||||
if mGasLimit > build.BlockGasLimit {
|
||||
t.Fatal("selected messages gas limit exceeds block gas limit!")
|
||||
}
|
||||
|
||||
expected := build.BlockMessageLimit
|
||||
if len(msgs) != expected {
|
||||
t.Fatalf("expected %d messages, but got %d", expected, len(msgs))
|
||||
}
|
||||
|
||||
// we should have taken the secp chain
|
||||
if counts[crypto.SigTypeSecp256k1] != cbg.MaxLength {
|
||||
t.Fatalf("expected %d bls messages, but got %d", cbg.MaxLength, len(msgs))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPriorityMessageSelection(t *testing.T) {
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
@ -978,7 +1175,7 @@ func TestOptimalMessageSelection2(t *testing.T) {
|
||||
func TestOptimalMessageSelection3(t *testing.T) {
|
||||
// this test uses 10 actors sending a block of messages to each other, with the the first
|
||||
// actors paying higher gas premium than the subsequent actors.
|
||||
// We select with a low ticket quality; the chain depenent merging algorithm should pick
|
||||
// We select with a low ticket quality; the chain dependent merging algorithm should pick
|
||||
// messages from the median actor from the start
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
@ -1109,11 +1306,13 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu
|
||||
logging.SetLogLevel("messagepool", "error")
|
||||
|
||||
// 1. greedy selection
|
||||
greedyMsgs, err := mp.selectMessagesGreedy(context.Background(), ts, ts)
|
||||
gm, err := mp.selectMessagesGreedy(context.Background(), ts, ts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
greedyMsgs := gm.msgs
|
||||
|
||||
totalGreedyCapacity := 0.0
|
||||
totalGreedyReward := 0.0
|
||||
totalOptimalCapacity := 0.0
|
||||
|
Loading…
Reference in New Issue
Block a user