add priority actor message selection

This commit is contained in:
vyzo 2020-08-07 18:24:36 +03:00
parent c4251912dd
commit 56c2894c80

View File

@ -62,6 +62,15 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
log.Infof("message selection took %s", time.Since(start))
}()
// 0b. Select all priority messages that fit in the block
minGas := int64(gasguess.MinGas)
result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts)
// have we filled the block?
if gasLimit < minGas {
return result, nil
}
// 1. Create a list of dependent message chains with maximal gas reward per limit consumed
var chains []*msgChain
for actor, mset := range pending {
@ -81,9 +90,6 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
// 3. Merge the head chains to produce the list of messages selected for inclusion, subject to
// the block gas limit.
result := make([]*types.SignedMessage, 0, mp.cfg.SizeLimitLow)
gasLimit := int64(build.BlockGasLimit)
minGas := int64(gasguess.MinGas)
last := len(chains)
for i, chain := range chains {
// does it fit in the block?
@ -112,7 +118,7 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
tailLoop:
for gasLimit >= minGas && last < len(chains) {
// trim
chains[last].Trim(gasLimit, mp, baseFee, ts)
chains[last].Trim(gasLimit, mp, baseFee, ts, false)
// push down if it hasn't been invalidated
if chains[last].valid {
@ -155,6 +161,86 @@ tailLoop:
return result, nil
}
func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) {
result := make([]*types.SignedMessage, 0, mp.cfg.SizeLimitLow)
gasLimit := int64(build.BlockGasLimit)
minGas := int64(gasguess.MinGas)
// 1. Get priority actor chains
var chains []*msgChain
priority := mp.cfg.PriorityAddrs
for _, actor := range priority {
mset, ok := pending[actor]
if ok {
// remove actor from pending set as we are already processed these messages
delete(pending, actor)
// create chains for the priority actor
next := mp.createMessageChains(actor, mset, baseFee, ts)
chains = append(chains, next...)
}
}
// 2. Sort the chains
sort.Slice(chains, func(i, j int) bool {
return chains[i].Before(chains[j])
})
// 3. Merge chains until the block limit; we are willing to include negative performing chains
// as these are messages from our own miners
last := len(chains)
for i, chain := range chains {
if chain.gasLimit <= gasLimit {
gasLimit -= chain.gasLimit
result = append(result, chain.msgs...)
continue
}
// we can't fit this chain because of block gasLimit -- we are at the edge
last = i
break
}
tailLoop:
for gasLimit >= minGas && last < len(chains) {
// trim, without discarding negative performing messages
chains[last].Trim(gasLimit, mp, baseFee, ts, true)
// push down if it hasn't been invalidated
if chains[last].valid {
for i := last; i < len(chains)-1; i++ {
if chains[i].Before(chains[i+1]) {
break
}
chains[i], chains[i+1] = chains[i+1], chains[i]
}
}
// select the next (valid and fitting) chain for inclusion
for i, chain := range chains[last:] {
// has the chain been invalidated
if !chain.valid {
continue
}
// does it fit in the bock?
if chain.gasLimit <= gasLimit {
gasLimit -= chain.gasLimit
result = append(result, chain.msgs...)
continue
}
// this chain needs to be trimmed
last += i
continue tailLoop
}
// the merge loop ended after processing all the chains and we probably still have gas to spare
// -- mark the end.
last = len(chains)
}
return result, gasLimit
}
func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.Address]map[uint64]*types.SignedMessage, error) {
result := make(map[address.Address]map[uint64]*types.SignedMessage)
haveCids := make(map[cid.Cid]struct{})
@ -435,9 +521,9 @@ func (mc *msgChain) Before(other *msgChain) bool {
(mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
}
func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt, ts *types.TipSet) {
func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt, ts *types.TipSet, priority bool) {
i := len(mc.msgs) - 1
for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0) {
for i >= 0 && (mc.gasLimit > gasLimit || (!priority && mc.gasPerf < 0)) {
gasLimit -= mc.msgs[i].Message.GasLimit
gasReward := mp.getGasReward(mc.msgs[i], baseFee, ts)
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)