From ca803d99fe0947df10b64aac23f7b06962743b4a Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Aug 2020 12:33:17 +0300 Subject: [PATCH] nearly optimal message selection for a given ticket quality Signed-off-by: Jakub Sztandera --- chain/messagepool/selection.go | 319 ++++++++++++++++++++++++++-- chain/messagepool/selection_test.go | 8 +- 2 files changed, 300 insertions(+), 27 deletions(-) diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index 47923fd6f..2e44c896e 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -3,6 +3,7 @@ package messagepool import ( "context" "math/big" + "math/rand" "sort" "time" @@ -19,26 +20,283 @@ import ( var bigBlockGasLimit = big.NewInt(build.BlockGasLimit) +const MaxBlocks = 15 + type msgChain struct { msgs []*types.SignedMessage gasReward *big.Int gasLimit int64 gasPerf float64 + effPerf float64 valid bool + merged bool next *msgChain + prev *msgChain } -func (mp *MessagePool) SelectMessages(ts *types.TipSet) ([]*types.SignedMessage, error) { +func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() mp.lk.Lock() defer mp.lk.Unlock() - return mp.selectMessages(mp.curTs, ts) + // if the ticket quality is high enough that the first block has higher probability + // than any other block, then we don't bother with optimal selection because the + // first block will always have higher effective performance + if tq > 0.84 { + return mp.selectMessagesGreedy(mp.curTs, ts) + } + + return mp.selectMessagesOptimal(mp.curTs, ts, tq) } -func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { +func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { + start := time.Now() + + baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) + if err != nil { + return nil, xerrors.Errorf("computing basefee: %w", err) + } + + // 0. Load messages from the targeti tipset; if it is the same as the current tipset in + // the mpoll, then this is just the pending messages + pending, err := mp.getPendingMessages(curTs, ts) + if err != nil { + return nil, err + } + + if len(pending) == 0 { + return nil, nil + } + + // defer only here so if we have no pending messages we don't spam + defer func() { + log.Infow("message selection done", "took", time.Since(start)) + }() + + // 0b. Select all priority messages that fit in the block + minGas := int64(gasguess.MinGas) + result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts) + + // have we filled the block? + if gasLimit < minGas { + return result, nil + } + + // 1. Create a list of dependent message chains with maximal gas reward per limit consumed + startChains := time.Now() + var chains []*msgChain + for actor, mset := range pending { + next := mp.createMessageChains(actor, mset, baseFee, ts) + chains = append(chains, next...) + } + log.Infow("create message chains done", "took", time.Since(startChains)) + + // 2. Sort the chains + sort.Slice(chains, func(i, j int) bool { + return chains[i].Before(chains[j]) + }) + + if len(chains) != 0 && chains[0].gasPerf < 0 { + log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf) + return nil, nil + } + + // 3. Parition chains into blocks (without trimming) + // we use the residual gas limit from the priority message selection as those message + // will be unconditionally included + residualGasLimit := gasLimit + nextChain := 0 + partitions := make([][]*msgChain, MaxBlocks) + for i := 0; i < MaxBlocks && nextChain < len(chains); i++ { + gasLimit := residualGasLimit + for nextChain < len(chains) { + chain := chains[nextChain] + partitions[i] = append(partitions[i], chain) + nextChain++ + gasLimit -= chain.gasLimit + if gasLimit < minGas { + break + } + } + } + + // 4. Compute effective performance for each chain, based on the partition they fall into + // The effective performance is the gasPerf of the chain * block probability + // Note that we don't have to do anything special about residues that didn't fit in any + // partition because these will already have an effective perf of 0 and will be pushed + // to the end by virtue of smaller gasPerf. + blockProb := mp.blockProbabilities(tq) + for i := 0; i < MaxBlocks; i++ { + for _, chain := range partitions[i] { + chain.SetEffectivePerf(blockProb[i]) + } + } + + // 5. Resort the chains based on effective performance + sort.Slice(chains, func(i, j int) bool { + return chains[i].BeforeEffective(chains[j]) + }) + + // 6. Merge the head chains to produce the list of messages selected for inclusion + // subject to the residual gas limit + // When a chain is merged in, all its previous dependent chains *must* also be + // merged in or we'll have a broken block + startMerge := time.Now() + last := len(chains) + for i, chain := range chains { + // did we run out of performing chains? + if chain.gasPerf < 0 { + break + } + + // has it already been merged? + if chain.merged { + continue + } + + // compute the dependencies that must be merged and the gas limit including deps + chainGasLimit := chain.gasLimit + var chainDeps []*msgChain + for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { + chainDeps = append(chainDeps, curChain) + chainGasLimit += curChain.gasLimit + } + + // does it all fit in the block? + if chainGasLimit <= gasLimit { + // include it together with all dependencies + for i := len(chainDeps) - 1; i >= 0; i-- { + curChain := chainDeps[i] + curChain.merged = true + result = append(result, curChain.msgs...) + } + + chain.merged = true + result = append(result, chain.msgs...) + gasLimit -= chainGasLimit + continue + } + + // we can't fit this chain and its dependencies because of block gasLimit -- we are + // at the edge + last = i + break + } + log.Infow("merge message chains done", "took", time.Since(startMerge)) + + // 7. We have reached the edge of what can fit wholesale; if we still hae available + // gasLimit to pack some more chains, then trim the last chain and push it down. + // Trimming invalidaates subsequent dependent chains so that they can't be selected + // as their dependency cannot be (fully) included. + // We do this in a loop because the blocker might have been inordinately large and + // we might have to do it multiple times to satisfy tail packing + startTail := time.Now() +tailLoop: + for gasLimit >= minGas && last < len(chains) { + // trim if necessary + if chains[last].gasLimit > gasLimit { + chains[last].Trim(gasLimit, mp, baseFee, ts, false) + } + + // push down if it hasn't been invalidated + if chains[last].valid { + for i := last; i < len(chains)-1; i++ { + if chains[i].BeforeEffective(chains[i+1]) { + break + } + chains[i], chains[i+1] = chains[i+1], chains[i] + } + } + + // select the next (valid and fitting) chain and its dependencies for inclusion + for i, chain := range chains[last:] { + // has the chain been invalidated? + if !chain.valid { + continue + } + + // has it already been merged? + if chain.merged { + continue + } + + // if gasPerf < 0 we have no more profitable chains + if chain.gasPerf < 0 { + break tailLoop + } + + // compute the dependencies that must be merged and the gas limit including deps + chainGasLimit := chain.gasLimit + depGasLimit := int64(0) + var chainDeps []*msgChain + for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { + chainDeps = append(chainDeps, curChain) + chainGasLimit += curChain.gasLimit + depGasLimit += curChain.gasLimit + } + + // does it all fit in the bock + if chainGasLimit <= gasLimit { + // include it together with all dependencies + for i := len(chainDeps) - 1; i >= 0; i-- { + curChain := chainDeps[i] + curChain.merged = true + result = append(result, curChain.msgs...) + } + + chain.merged = true + result = append(result, chain.msgs...) + gasLimit -= chainGasLimit + continue + } + + // it doesn't all fit; now we have to take into account the dependent chains before + // making a decision about trimming or invalidating. + // if the dependencies exceed the gas limit, then we must invalidate the chain + // as it can never be included. + // Otherwise we can just trim and continue + if depGasLimit > gasLimit { + chain.Invalidate() + last += i + 1 + continue tailLoop + } + + // dependencies fit, just trim it + chain.Trim(gasLimit-depGasLimit, mp, baseFee, ts, false) + last += i + continue tailLoop + } + + // the merge loop ended after processing all the chains and we we probably have still + // gas to spare; end the loop. + break + } + log.Infow("pack tail chains done", "took", time.Since(startTail)) + + return result, nil +} + +func (mp *MessagePool) blockProbabilities(tq float64) []float64 { + // TODO FIXME fit in the actual probability distribution + // this just makes a dummy random distribution for testing purposes + bps := make([]float64, MaxBlocks) + norm := 0.0 + for i := 0; i < MaxBlocks; i++ { + p := rand.Float64() + bps[i] = p + norm += p + } + // normalize to make it a distribution + for i := 0; i < MaxBlocks; i++ { + bps[i] /= norm + } + + return bps +} + +func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { start := time.Now() baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) @@ -95,18 +353,18 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM startMerge := time.Now() last := len(chains) for i, chain := range chains { - // does it fit in the block? - if chain.gasLimit <= gasLimit && chain.gasPerf >= 0 { - gasLimit -= chain.gasLimit - result = append(result, chain.msgs...) - continue - } - // did we run out of performing chains? if chain.gasPerf < 0 { break } + // does it fit in the block? + if chain.gasLimit <= gasLimit { + gasLimit -= chain.gasLimit + result = append(result, chain.msgs...) + continue + } + // we can't fit this chain because of block gasLimit -- we are at the edge last = i break @@ -137,30 +395,31 @@ tailLoop: // select the next (valid and fitting) chain for inclusion for i, chain := range chains[last:] { - // has the chain been invalidated + // has the chain been invalidated? if !chain.valid { continue } - // does it fit in the bock? - if chain.gasLimit <= gasLimit && chain.gasPerf >= 0 { - gasLimit -= chain.gasLimit - result = append(result, chain.msgs...) - continue - } // if gasPerf < 0 we have no more profitable chains if chain.gasPerf < 0 { break tailLoop } + // does it fit in the bock? + if chain.gasLimit <= gasLimit { + gasLimit -= chain.gasLimit + result = append(result, chain.msgs...) + continue + } + // this chain needs to be trimmed last += i continue tailLoop } - // the merge loop ended after processing all the chains and we probably still have gas to spare - // -- mark the end. - last = len(chains) + // the merge loop ended after processing all the chains and we probably still have + // gas to spare; end the loop + break } log.Infow("pack tail chains done", "took", time.Since(startTail)) @@ -533,6 +792,10 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 chains[i].next = chains[i+1] } + for i := len(chains) - 1; i > 0; i-- { + chains[i].prev = chains[i-1] + } + return chains } @@ -563,16 +826,26 @@ func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt, } if mc.next != nil { - mc.next.invalidate() + mc.next.Invalidate() mc.next = nil } } -func (mc *msgChain) invalidate() { +func (mc *msgChain) Invalidate() { mc.valid = false mc.msgs = nil if mc.next != nil { - mc.next.invalidate() + mc.next.Invalidate() mc.next = nil } } + +func (mc *msgChain) SetEffectivePerf(bp float64) { + mc.effPerf = mc.gasPerf * bp +} + +func (mc *msgChain) BeforeEffective(other *msgChain) bool { + return mc.effPerf > other.effPerf || + (mc.effPerf == other.effPerf && mc.gasPerf > other.gasPerf) || + (mc.effPerf == other.effPerf && mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0) +} diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 177c62dc2..02205d6e9 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -403,7 +403,7 @@ func TestBasicMessageSelection(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts) + msgs, err := mp.SelectMessages(ts, 1.0) if err != nil { t.Fatal(err) } @@ -471,7 +471,7 @@ func TestBasicMessageSelection(t *testing.T) { tma.setStateNonce(a1, 10) tma.setStateNonce(a2, 10) - msgs, err = mp.SelectMessages(ts3) + msgs, err = mp.SelectMessages(ts3, 1.0) if err != nil { t.Fatal(err) } @@ -545,7 +545,7 @@ func TestMessageSelectionTrimming(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts) + msgs, err := mp.SelectMessages(ts, 1.0) if err != nil { t.Fatal(err) } @@ -609,7 +609,7 @@ func TestPriorityMessageSelection(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts) + msgs, err := mp.SelectMessages(ts, 1.0) if err != nil { t.Fatal(err) }