Merge pull request #3023 from filecoin-project/feat/mpool-optimimal-selection-redux

Further improvements for optimal message selection
This commit is contained in:
Jakub Sztandera 2020-08-13 15:46:51 +02:00 committed by GitHub
commit 428e315d2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 127 additions and 70 deletions

View File

@ -1,20 +1,29 @@
package messagepool package messagepool
import "math" import (
"math"
"sync"
)
var noWinnersProbCache []float64
var noWinnersProbOnce sync.Once
func noWinnersProb() []float64 { func noWinnersProb() []float64 {
poissPdf := func(x float64) float64 { noWinnersProbOnce.Do(func() {
const Mu = 5 poissPdf := func(x float64) float64 {
lg, _ := math.Lgamma(x + 1) const Mu = 5
result := math.Exp((math.Log(Mu) * x) - lg - Mu) lg, _ := math.Lgamma(x + 1)
return result result := math.Exp((math.Log(Mu) * x) - lg - Mu)
} return result
}
out := make([]float64, 0, MaxBlocks) out := make([]float64, 0, MaxBlocks)
for i := 0; i < MaxBlocks; i++ { for i := 0; i < MaxBlocks; i++ {
out = append(out, poissPdf(float64(i))) out = append(out, poissPdf(float64(i)))
} }
return out noWinnersProbCache = out
})
return noWinnersProbCache
} }
func binomialCoefficient(n, k float64) float64 { func binomialCoefficient(n, k float64) float64 {

View File

@ -22,15 +22,17 @@ var bigBlockGasLimit = big.NewInt(build.BlockGasLimit)
const MaxBlocks = 15 const MaxBlocks = 15
type msgChain struct { type msgChain struct {
msgs []*types.SignedMessage msgs []*types.SignedMessage
gasReward *big.Int gasReward *big.Int
gasLimit int64 gasLimit int64
gasPerf float64 gasPerf float64
effPerf float64 effPerf float64
valid bool bp float64
merged bool parentOffset float64
next *msgChain valid bool
prev *msgChain merged bool
next *msgChain
prev *msgChain
} }
func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
@ -175,12 +177,24 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64
for i := len(chainDeps) - 1; i >= 0; i-- { for i := len(chainDeps) - 1; i >= 0; i-- {
curChain := chainDeps[i] curChain := chainDeps[i]
curChain.merged = true curChain.merged = true
// adjust the next chain for the parent, which is being merged
if next := curChain.next; next != nil && next.effPerf > 0 {
next.effPerf += next.parentOffset
}
result = append(result, curChain.msgs...) result = append(result, curChain.msgs...)
} }
chain.merged = true chain.merged = true
if next := chain.next; next != nil && next.effPerf > 0 {
next.effPerf += next.parentOffset
}
result = append(result, chain.msgs...) result = append(result, chain.msgs...)
gasLimit -= chainGasLimit gasLimit -= chainGasLimit
// resort to account for already merged chains and effective performance adjustments
sort.Slice(chains[i+1:], func(i, j int) bool {
return chains[i].BeforeEffective(chains[j])
})
continue continue
} }
@ -810,18 +824,13 @@ func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt,
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward) mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)
mc.gasLimit -= mc.msgs[i].Message.GasLimit mc.gasLimit -= mc.msgs[i].Message.GasLimit
if mc.gasLimit > 0 { if mc.gasLimit > 0 {
bp := 1.0
if mc.gasPerf != 0 { // prevent div by 0
bp = mc.effPerf / mc.gasPerf
}
mc.gasPerf = mp.getGasPerf(mc.gasReward, mc.gasLimit) mc.gasPerf = mp.getGasPerf(mc.gasReward, mc.gasLimit)
if mc.bp != 0 {
if mc.effPerf != 0 { // keep effPerf 0 if it is 0 mc.setEffPerf()
mc.effPerf = bp * mc.gasPerf
} }
} else { } else {
mc.gasPerf = 0 mc.gasPerf = 0
mc.effPerf = 0
} }
i-- i--
} }
@ -849,7 +858,19 @@ func (mc *msgChain) Invalidate() {
} }
func (mc *msgChain) SetEffectivePerf(bp float64) { func (mc *msgChain) SetEffectivePerf(bp float64) {
mc.effPerf = mc.gasPerf * bp mc.bp = bp
mc.setEffPerf()
}
func (mc *msgChain) setEffPerf() {
effPerf := mc.gasPerf * mc.bp
if effPerf > 0 && mc.prev != nil {
effPerfWithParent := (effPerf*float64(mc.gasLimit) + mc.prev.effPerf*float64(mc.prev.gasLimit)) / float64(mc.gasLimit+mc.prev.gasLimit)
mc.parentOffset = effPerf - effPerfWithParent
effPerf = effPerfWithParent
}
mc.effPerf = effPerf
} }
func (mc *msgChain) SetNullEffectivePerf() { func (mc *msgChain) SetNullEffectivePerf() {
@ -861,7 +882,8 @@ func (mc *msgChain) SetNullEffectivePerf() {
} }
func (mc *msgChain) BeforeEffective(other *msgChain) bool { func (mc *msgChain) BeforeEffective(other *msgChain) bool {
return mc.effPerf > other.effPerf || // move merged chains to the front so we can discard them earlier
return (mc.merged && !other.merged) || mc.effPerf > other.effPerf ||
(mc.effPerf == other.effPerf && mc.gasPerf > other.gasPerf) || (mc.effPerf == other.effPerf && mc.gasPerf > other.gasPerf) ||
(mc.effPerf == other.effPerf && mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0) (mc.effPerf == other.effPerf && mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
} }

View File

@ -755,9 +755,9 @@ func TestOptimalMessageSelection2(t *testing.T) {
nMessages := int(5 * build.BlockGasLimit / gasLimit) nMessages := int(5 * build.BlockGasLimit / gasLimit)
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
bias := (nMessages - i) / 3 bias := (nMessages - i) / 3
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(10000+i%3+bias)) m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(200000+i%3+bias))
mustAdd(t, mp, m) mustAdd(t, mp, m)
m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(1+i%3+bias)) m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(190000+i%3+bias))
mustAdd(t, mp, m) mustAdd(t, mp, m)
} }
@ -824,8 +824,8 @@ func TestOptimalMessageSelection3(t *testing.T) {
nMessages := int(build.BlockGasLimit/gasLimit) + 1 nMessages := int(build.BlockGasLimit/gasLimit) + 1
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
for j := 0; j < nActors; j++ { for j := 0; j < nActors; j++ {
bias := (nActors-j)*nMessages + (nMessages+2-i)/(3*nActors) + i%3 premium := 500000 + 20000*(nActors-j) + (nMessages+2-i)/(3*nActors) + i%3
m := makeTestMessage(wallets[j], actors[j], actors[j%nActors], uint64(i), gasLimit, uint64(1+bias)) m := makeTestMessage(wallets[j], actors[j], actors[j%nActors], uint64(i), gasLimit, uint64(premium))
mustAdd(t, mp, m) mustAdd(t, mp, m)
} }
} }
@ -840,28 +840,31 @@ func TestOptimalMessageSelection3(t *testing.T) {
t.Fatalf("expected %d messages, but got %d", expectedMsgs, len(msgs)) t.Fatalf("expected %d messages, but got %d", expectedMsgs, len(msgs))
} }
nextNonce := uint64(0) whoIs := func(a address.Address) int {
a := actors[len(actors)/2-1] for i, aa := range actors {
for _, m := range msgs { if a == aa {
if m.Message.From != a { return i
who := 0
for i, a := range actors {
if a == m.Message.From {
who = i
break
}
} }
t.Fatalf("expected message from last actor, but got from %d instead", who) }
return -1
}
nonces := make([]uint64, nActors)
for _, m := range msgs {
who := whoIs(m.Message.From)
if who < 3 {
t.Fatalf("got message from %dth actor", who)
} }
nextNonce := nonces[who]
if m.Message.Nonce != nextNonce { if m.Message.Nonce != nextNonce {
t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce) t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce)
} }
nextNonce++ nonces[who]++
} }
} }
func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand) { func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand) (float64, float64) {
// in this test we use 100 actors and send 10 blocks of messages. // in this test we use 100 actors and send 10 blocks of messages.
// actors send with an exponentially decreasing premium. // actors send with an exponentially decreasing premium.
// a number of miners select with varying ticket quality and we compare the // a number of miners select with varying ticket quality and we compare the
@ -913,26 +916,30 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand) {
mustAdd(t, mp, m) mustAdd(t, mp, m)
} }
logging.SetLogLevel("messagepool", "error")
// 1. greedy selection // 1. greedy selection
greedyMsgs, err := mp.selectMessagesGreedy(ts, ts) greedyMsgs, err := mp.selectMessagesGreedy(ts, ts)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// 2. optimal selection capacityBoost := 0.0
minersRand := rng.Float64() rewardBoost := 0.0
winerProba := noWinnersProb() const runs = 1
i := 0 for i := 0; i < runs; i++ {
for ; i < MaxBlocks && minersRand > 0; i++ { // 2. optimal selection
minersRand -= winerProba[i] minersRand := rng.Float64()
} winerProba := noWinnersProb()
nMiners := i i := 0
if nMiners == 0 { for ; i < MaxBlocks && minersRand > 0; i++ {
nMiners = 1 minersRand -= winerProba[i]
} }
nMiners := i
if nMiners == 0 {
nMiners = 1
}
logging.SetLogLevel("messagepool", "error")
for i := 0; i < 1; i++ {
optMsgs := make(map[cid.Cid]*types.SignedMessage) optMsgs := make(map[cid.Cid]*types.SignedMessage)
for j := 0; j < nMiners; j++ { for j := 0; j < nMiners; j++ {
tq := rng.Float64() tq := rng.Float64()
@ -945,9 +952,12 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand) {
} }
} }
boost := float64(len(optMsgs)) / float64(len(greedyMsgs))
capacityBoost += boost
t.Logf("nMiners: %d", nMiners) t.Logf("nMiners: %d", nMiners)
t.Logf("greedy capacity %d, optimal capacity %d (x%.1f)", len(greedyMsgs), t.Logf("greedy capacity %d, optimal capacity %d (x %.1f )", len(greedyMsgs),
len(optMsgs), float64(len(optMsgs))/float64(len(greedyMsgs))) len(optMsgs), boost)
if len(greedyMsgs) > len(optMsgs) { if len(greedyMsgs) > len(optMsgs) {
t.Fatal("greedy capacity higher than optimal capacity; wtf") t.Fatal("greedy capacity higher than optimal capacity; wtf")
} }
@ -965,20 +975,36 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand) {
nMinersBig := big.NewInt(int64(nMiners)) nMinersBig := big.NewInt(int64(nMiners))
greedyAvgReward, _ := new(big.Rat).SetFrac(greedyReward, nMinersBig).Float64() greedyAvgReward, _ := new(big.Rat).SetFrac(greedyReward, nMinersBig).Float64()
optimalAvgReward, _ := new(big.Rat).SetFrac(optReward, nMinersBig).Float64() optimalAvgReward, _ := new(big.Rat).SetFrac(optReward, nMinersBig).Float64()
t.Logf("greedy reward: %.0f, optimal reward: %.0f (x%.1f)", greedyAvgReward,
optimalAvgReward, optimalAvgReward/greedyAvgReward)
if greedyReward.Cmp(optReward) > 0 { boost = optimalAvgReward / greedyAvgReward
t.Fatal("greedy reward raw higher than optimal reward; booh") rewardBoost += boost
} t.Logf("greedy reward: %.0f, optimal reward: %.0f (x %.1f )", greedyAvgReward,
optimalAvgReward, boost)
} }
capacityBoost /= runs
rewardBoost /= runs
t.Logf("Average capacity boost: %f", capacityBoost)
t.Logf("Average reward boost: %f", rewardBoost)
logging.SetLogLevel("messagepool", "info") logging.SetLogLevel("messagepool", "info")
return capacityBoost, rewardBoost
} }
func TestCompetitiveMessageSelection(t *testing.T) { func TestCompetitiveMessageSelection(t *testing.T) {
seeds := []int64{1947, 1976, 2020, 2100, 10000} var capacityBoost, rewardBoost float64
seeds := []int64{1947, 1976, 2020, 2100, 10000, 143324, 432432, 131, 32, 45}
for _, seed := range seeds { for _, seed := range seeds {
t.Log("running competitve message selection with seed", seed) t.Log("running competitve message selection with seed", seed)
testCompetitiveMessageSelection(t, rand.New(rand.NewSource(seed))) cb, rb := testCompetitiveMessageSelection(t, rand.New(rand.NewSource(seed)))
capacityBoost += cb
rewardBoost += rb
} }
capacityBoost /= float64(len(seeds))
rewardBoost /= float64(len(seeds))
t.Logf("Average capacity boost across all seeds: %f", capacityBoost)
t.Logf("Average reward boost across all seeds: %f", rewardBoost)
} }