diff --git a/api/api_full.go b/api/api_full.go index 5a3e4c6fa..21fadbaa6 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -159,7 +159,7 @@ type FullNode interface { MpoolPending(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) // MpoolSelect returns a list of pending messages for inclusion in the next block - MpoolSelect(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) + MpoolSelect(context.Context, types.TipSetKey, float64) ([]*types.SignedMessage, error) // MpoolPush pushes a signed message to mempool. MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index d5b7f7a97..8fa5c69af 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -97,9 +97,11 @@ type FullNodeStruct struct { SyncMarkBad func(ctx context.Context, bcid cid.Cid) error `perm:"admin"` SyncCheckBad func(ctx context.Context, bcid cid.Cid) (string, error) `perm:"read"` - MpoolGetConfig func(context.Context) (*types.MpoolConfig, error) `perm:"read"` - MpoolSetConfig func(context.Context, *types.MpoolConfig) error `perm:"write"` - MpoolSelect func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"` + MpoolGetConfig func(context.Context) (*types.MpoolConfig, error) `perm:"read"` + MpoolSetConfig func(context.Context, *types.MpoolConfig) error `perm:"write"` + + MpoolSelect func(context.Context, types.TipSetKey, float64) ([]*types.SignedMessage, error) `perm:"read"` + MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"` MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"` MpoolPushMessage func(context.Context, *types.Message) (*types.SignedMessage, error) `perm:"sign"` @@ -456,8 +458,8 @@ func (c *FullNodeStruct) MpoolSetConfig(ctx context.Context, cfg *types.MpoolCon return c.Internal.MpoolSetConfig(ctx, cfg) } -func (c *FullNodeStruct) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { - return c.Internal.MpoolSelect(ctx, tsk) +func (c *FullNodeStruct) MpoolSelect(ctx context.Context, tsk types.TipSetKey, tq float64) ([]*types.SignedMessage, error) { + return c.Internal.MpoolSelect(ctx, tsk, tq) } func (c *FullNodeStruct) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { diff --git a/chain/messagepool/block_proba.go b/chain/messagepool/block_proba.go new file mode 100644 index 000000000..403507352 --- /dev/null +++ b/chain/messagepool/block_proba.go @@ -0,0 +1,71 @@ +package messagepool + +import "math" + +func noWinnersProb() []float64 { + poissPdf := func(x float64) float64 { + const Mu = 5 + lg, _ := math.Lgamma(x + 1) + result := math.Exp((math.Log(Mu) * x) - lg - Mu) + return result + } + + out := make([]float64, 0, MaxBlocks) + for i := 0; i < MaxBlocks; i++ { + out = append(out, poissPdf(float64(i))) + } + return out +} + +func binomialCoefficient(n, k float64) float64 { + if k > n { + return math.NaN() + } + r := 1.0 + for d := 1.0; d <= k; d++ { + r *= n + r /= d + n -= 1 + } + return r +} + +func (mp *MessagePool) blockProbabilities(tq float64) []float64 { + noWinners := noWinnersProb() // cache this + + p := 1 - tq + binoPdf := func(x, trials float64) float64 { + // based on https://github.com/atgjack/prob + if x > trials { + return 0 + } + if p == 0 { + if x == 0 { + return 1.0 + } + return 0.0 + } + if p == 1 { + if x == trials { + return 1.0 + } + return 0.0 + } + coef := binomialCoefficient(trials, x) + pow := math.Pow(p, x) * math.Pow(1-p, trials-x) + if math.IsInf(coef, 0) { + return 0 + } + return coef * pow + } + + out := make([]float64, 0, MaxBlocks) + for place := 0; place < MaxBlocks; place++ { + var pPlace float64 + for otherWinners, pCase := range noWinners { + pPlace += pCase * binoPdf(float64(place), float64(otherWinners+1)) + } + out = append(out, pPlace) + } + return out +} diff --git a/chain/messagepool/block_proba_test.go b/chain/messagepool/block_proba_test.go new file mode 100644 index 000000000..b39ea587e --- /dev/null +++ b/chain/messagepool/block_proba_test.go @@ -0,0 +1,15 @@ +package messagepool + +import "testing" + +func TestBlockProbability(t *testing.T) { + mp := &MessagePool{} + bp := mp.blockProbabilities(1 - 0.15) + t.Logf("%+v\n", bp) + for i := 0; i < len(bp)-1; i++ { + if bp[i] < bp[i+1] { + t.Fatalf("expected decreasing block probabilities for this quality: %d %f %f", + i, bp[i], bp[i+1]) + } + } +} diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index 47923fd6f..bed084aad 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -19,26 +19,275 @@ import ( var bigBlockGasLimit = big.NewInt(build.BlockGasLimit) +const MaxBlocks = 15 + type msgChain struct { msgs []*types.SignedMessage gasReward *big.Int gasLimit int64 gasPerf float64 + effPerf float64 valid bool + merged bool next *msgChain + prev *msgChain } -func (mp *MessagePool) SelectMessages(ts *types.TipSet) ([]*types.SignedMessage, error) { +func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() mp.lk.Lock() defer mp.lk.Unlock() - return mp.selectMessages(mp.curTs, ts) + // if the ticket quality is high enough that the first block has higher probability + // than any other block, then we don't bother with optimal selection because the + // first block will always have higher effective performance + if tq > 0.84 { + return mp.selectMessagesGreedy(mp.curTs, ts) + } + + return mp.selectMessagesOptimal(mp.curTs, ts, tq) } -func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { +func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { + start := time.Now() + + baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) + if err != nil { + return nil, xerrors.Errorf("computing basefee: %w", err) + } + + // 0. Load messages from the target tipset; if it is the same as the current tipset in + // the mpool, then this is just the pending messages + pending, err := mp.getPendingMessages(curTs, ts) + if err != nil { + return nil, err + } + + if len(pending) == 0 { + return nil, nil + } + + // defer only here so if we have no pending messages we don't spam + defer func() { + log.Infow("message selection done", "took", time.Since(start)) + }() + + // 0b. Select all priority messages that fit in the block + minGas := int64(gasguess.MinGas) + result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts) + + // have we filled the block? + if gasLimit < minGas { + return result, nil + } + + // 1. Create a list of dependent message chains with maximal gas reward per limit consumed + startChains := time.Now() + var chains []*msgChain + for actor, mset := range pending { + next := mp.createMessageChains(actor, mset, baseFee, ts) + chains = append(chains, next...) + } + if dt := time.Since(startChains); dt > time.Millisecond { + log.Infow("create message chains done", "took", dt) + } + + // 2. Sort the chains + sort.Slice(chains, func(i, j int) bool { + return chains[i].Before(chains[j]) + }) + + if len(chains) != 0 && chains[0].gasPerf < 0 { + log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf) + return nil, nil + } + + // 3. Parition chains into blocks (without trimming) + // we use the full blockGasLimit (as opposed to the residual gas limit from the + // priority message selection) as we have to account for what other miners are doing + nextChain := 0 + partitions := make([][]*msgChain, MaxBlocks) + for i := 0; i < MaxBlocks && nextChain < len(chains); i++ { + gasLimit := int64(build.BlockGasLimit) + for nextChain < len(chains) { + chain := chains[nextChain] + nextChain++ + partitions[i] = append(partitions[i], chain) + gasLimit -= chain.gasLimit + if gasLimit < minGas { + break + } + } + + } + + // 4. Compute effective performance for each chain, based on the partition they fall into + // The effective performance is the gasPerf of the chain * block probability + blockProb := mp.blockProbabilities(tq) + effChains := 0 + for i := 0; i < MaxBlocks; i++ { + for _, chain := range partitions[i] { + chain.SetEffectivePerf(blockProb[i]) + } + effChains += len(partitions[i]) + } + + // nullify the effective performance of chains that don't fit in any partition + for _, chain := range chains[effChains:] { + chain.SetNullEffectivePerf() + } + + // 5. Resort the chains based on effective performance + sort.Slice(chains, func(i, j int) bool { + return chains[i].BeforeEffective(chains[j]) + }) + + // 6. Merge the head chains to produce the list of messages selected for inclusion + // subject to the residual gas limit + // When a chain is merged in, all its previous dependent chains *must* also be + // merged in or we'll have a broken block + startMerge := time.Now() + last := len(chains) + for i, chain := range chains { + // did we run out of performing chains? + if chain.gasPerf < 0 { + break + } + + // has it already been merged? + if chain.merged { + continue + } + + // compute the dependencies that must be merged and the gas limit including deps + chainGasLimit := chain.gasLimit + var chainDeps []*msgChain + for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { + chainDeps = append(chainDeps, curChain) + chainGasLimit += curChain.gasLimit + } + + // does it all fit in the block? + if chainGasLimit <= gasLimit { + // include it together with all dependencies + for i := len(chainDeps) - 1; i >= 0; i-- { + curChain := chainDeps[i] + curChain.merged = true + result = append(result, curChain.msgs...) + } + + chain.merged = true + result = append(result, chain.msgs...) + gasLimit -= chainGasLimit + continue + } + + // we can't fit this chain and its dependencies because of block gasLimit -- we are + // at the edge + last = i + break + } + if dt := time.Since(startMerge); dt > time.Millisecond { + log.Infow("merge message chains done", "took", dt) + } + + // 7. We have reached the edge of what can fit wholesale; if we still hae available + // gasLimit to pack some more chains, then trim the last chain and push it down. + // Trimming invalidaates subsequent dependent chains so that they can't be selected + // as their dependency cannot be (fully) included. + // We do this in a loop because the blocker might have been inordinately large and + // we might have to do it multiple times to satisfy tail packing + startTail := time.Now() +tailLoop: + for gasLimit >= minGas && last < len(chains) { + // trim if necessary + if chains[last].gasLimit > gasLimit { + chains[last].Trim(gasLimit, mp, baseFee, ts, false) + } + + // push down if it hasn't been invalidated + if chains[last].valid { + for i := last; i < len(chains)-1; i++ { + if chains[i].BeforeEffective(chains[i+1]) { + break + } + chains[i], chains[i+1] = chains[i+1], chains[i] + } + } + + // select the next (valid and fitting) chain and its dependencies for inclusion + for i, chain := range chains[last:] { + // has the chain been invalidated? + if !chain.valid { + continue + } + + // has it already been merged? + if chain.merged { + continue + } + + // if gasPerf < 0 we have no more profitable chains + if chain.gasPerf < 0 { + break tailLoop + } + + // compute the dependencies that must be merged and the gas limit including deps + chainGasLimit := chain.gasLimit + depGasLimit := int64(0) + var chainDeps []*msgChain + for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { + chainDeps = append(chainDeps, curChain) + chainGasLimit += curChain.gasLimit + depGasLimit += curChain.gasLimit + } + + // does it all fit in the bock + if chainGasLimit <= gasLimit { + // include it together with all dependencies + for i := len(chainDeps) - 1; i >= 0; i-- { + curChain := chainDeps[i] + curChain.merged = true + result = append(result, curChain.msgs...) + } + + chain.merged = true + result = append(result, chain.msgs...) + gasLimit -= chainGasLimit + continue + } + + // it doesn't all fit; now we have to take into account the dependent chains before + // making a decision about trimming or invalidating. + // if the dependencies exceed the gas limit, then we must invalidate the chain + // as it can never be included. + // Otherwise we can just trim and continue + if depGasLimit > gasLimit { + chain.Invalidate() + last += i + 1 + continue tailLoop + } + + // dependencies fit, just trim it + chain.Trim(gasLimit-depGasLimit, mp, baseFee, ts, false) + last += i + continue tailLoop + } + + // the merge loop ended after processing all the chains and we we probably have still + // gas to spare; end the loop. + break + } + if dt := time.Since(startTail); dt > time.Millisecond { + log.Infow("pack tail chains done", "took", dt) + } + + return result, nil +} + +func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { start := time.Now() baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) @@ -78,7 +327,9 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM next := mp.createMessageChains(actor, mset, baseFee, ts) chains = append(chains, next...) } - log.Infow("create message chains done", "took", time.Since(startChains)) + if dt := time.Since(startChains); dt > time.Millisecond { + log.Infow("create message chains done", "took", dt) + } // 2. Sort the chains sort.Slice(chains, func(i, j int) bool { @@ -95,23 +346,25 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM startMerge := time.Now() last := len(chains) for i, chain := range chains { - // does it fit in the block? - if chain.gasLimit <= gasLimit && chain.gasPerf >= 0 { - gasLimit -= chain.gasLimit - result = append(result, chain.msgs...) - continue - } - // did we run out of performing chains? if chain.gasPerf < 0 { break } + // does it fit in the block? + if chain.gasLimit <= gasLimit { + gasLimit -= chain.gasLimit + result = append(result, chain.msgs...) + continue + } + // we can't fit this chain because of block gasLimit -- we are at the edge last = i break } - log.Infow("merge message chains done", "took", time.Since(startMerge)) + if dt := time.Since(startMerge); dt > time.Millisecond { + log.Infow("merge message chains done", "took", dt) + } // 4. We have reached the edge of what we can fit wholesale; if we still have available gasLimit // to pack some more chains, then trim the last chain and push it down. @@ -137,32 +390,35 @@ tailLoop: // select the next (valid and fitting) chain for inclusion for i, chain := range chains[last:] { - // has the chain been invalidated + // has the chain been invalidated? if !chain.valid { continue } - // does it fit in the bock? - if chain.gasLimit <= gasLimit && chain.gasPerf >= 0 { - gasLimit -= chain.gasLimit - result = append(result, chain.msgs...) - continue - } // if gasPerf < 0 we have no more profitable chains if chain.gasPerf < 0 { break tailLoop } + // does it fit in the bock? + if chain.gasLimit <= gasLimit { + gasLimit -= chain.gasLimit + result = append(result, chain.msgs...) + continue + } + // this chain needs to be trimmed last += i continue tailLoop } - // the merge loop ended after processing all the chains and we probably still have gas to spare - // -- mark the end. - last = len(chains) + // the merge loop ended after processing all the chains and we probably still have + // gas to spare; end the loop + break + } + if dt := time.Since(startTail); dt > time.Millisecond { + log.Infow("pack tail chains done", "took", dt) } - log.Infow("pack tail chains done", "took", time.Since(startTail)) return result, nil } @@ -170,7 +426,9 @@ tailLoop: func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) { start := time.Now() defer func() { - log.Infow("select priority messages done", "took", time.Since(start)) + if dt := time.Since(start); dt > time.Millisecond { + log.Infow("select priority messages done", "took", dt) + } }() result := make([]*types.SignedMessage, 0, mp.cfg.SizeLimitLow) @@ -258,8 +516,8 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address. result := make(map[address.Address]map[uint64]*types.SignedMessage) haveCids := make(map[cid.Cid]struct{}) defer func() { - if time.Since(start) > time.Millisecond { - log.Infow("get pending messages done", "took", time.Since(start)) + if dt := time.Since(start); dt > time.Millisecond { + log.Infow("get pending messages done", "took", dt) } }() @@ -363,11 +621,11 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address. } func (mp *MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *big.Int { - gasReward := abig.Mul(msg.Message.GasPremium, types.NewInt(uint64(msg.Message.GasLimit))) - maxReward := types.BigSub(msg.Message.GasFeeCap, baseFee) - if types.BigCmp(maxReward, gasReward) < 0 { - gasReward = maxReward + maxPremium := types.BigSub(msg.Message.GasFeeCap, baseFee) + if types.BigCmp(maxPremium, msg.Message.GasPremium) < 0 { + maxPremium = msg.Message.GasPremium } + gasReward := abig.Mul(maxPremium, types.NewInt(uint64(msg.Message.GasLimit))) return gasReward.Int } @@ -449,7 +707,7 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 } // check we have a sane set of messages to construct the chains - if i > 0 { + if i > skip { msgs = msgs[skip:i] } else { return nil @@ -533,6 +791,10 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 chains[i].next = chains[i+1] } + for i := len(chains) - 1; i > 0; i-- { + chains[i].prev = chains[i-1] + } + return chains } @@ -548,7 +810,13 @@ func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt, mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward) mc.gasLimit -= mc.msgs[i].Message.GasLimit if mc.gasLimit > 0 { + bp := 1.0 + if mc.effPerf != 0 { + bp = mc.effPerf / mc.gasPerf + } + mc.gasPerf = mp.getGasPerf(mc.gasReward, mc.gasLimit) + mc.effPerf = bp * mc.gasPerf } else { mc.gasPerf = 0 } @@ -563,16 +831,34 @@ func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt, } if mc.next != nil { - mc.next.invalidate() + mc.next.Invalidate() mc.next = nil } } -func (mc *msgChain) invalidate() { +func (mc *msgChain) Invalidate() { mc.valid = false mc.msgs = nil if mc.next != nil { - mc.next.invalidate() + mc.next.Invalidate() mc.next = nil } } + +func (mc *msgChain) SetEffectivePerf(bp float64) { + mc.effPerf = mc.gasPerf * bp +} + +func (mc *msgChain) SetNullEffectivePerf() { + if mc.gasPerf < 0 { + mc.effPerf = mc.gasPerf + } else { + mc.effPerf = 0 + } +} + +func (mc *msgChain) BeforeEffective(other *msgChain) bool { + return mc.effPerf > other.effPerf || + (mc.effPerf == other.effPerf && mc.gasPerf > other.gasPerf) || + (mc.effPerf == other.effPerf && mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0) +} diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 177c62dc2..5e93476ab 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -2,6 +2,9 @@ package messagepool import ( "context" + "math" + "math/big" + "math/rand" "testing" "github.com/filecoin-project/go-address" @@ -12,10 +15,12 @@ import ( "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" _ "github.com/filecoin-project/lotus/lib/sigs/bls" _ "github.com/filecoin-project/lotus/lib/sigs/secp" + logging "github.com/ipfs/go-log" ) func makeTestMessage(w *wallet.Wallet, from, to address.Address, nonce uint64, gasLimit int64, gasPrice uint64) *types.SignedMessage { @@ -59,7 +64,7 @@ func TestMessageChains(t *testing.T) { t.Fatal(err) } - a1, err := w1.GenerateKey(crypto.SigTypeBLS) + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -69,7 +74,7 @@ func TestMessageChains(t *testing.T) { t.Fatal(err) } - a2, err := w2.GenerateKey(crypto.SigTypeBLS) + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -297,7 +302,7 @@ func TestMessageChainSkipping(t *testing.T) { t.Fatal(err) } - a1, err := w1.GenerateKey(crypto.SigTypeBLS) + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -307,7 +312,7 @@ func TestMessageChainSkipping(t *testing.T) { t.Fatal(err) } - a2, err := w2.GenerateKey(crypto.SigTypeBLS) + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -367,7 +372,7 @@ func TestBasicMessageSelection(t *testing.T) { t.Fatal(err) } - a1, err := w1.GenerateKey(crypto.SigTypeBLS) + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -377,7 +382,7 @@ func TestBasicMessageSelection(t *testing.T) { t.Fatal(err) } - a2, err := w2.GenerateKey(crypto.SigTypeBLS) + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -403,7 +408,7 @@ func TestBasicMessageSelection(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts) + msgs, err := mp.SelectMessages(ts, 1.0) if err != nil { t.Fatal(err) } @@ -471,7 +476,7 @@ func TestBasicMessageSelection(t *testing.T) { tma.setStateNonce(a1, 10) tma.setStateNonce(a2, 10) - msgs, err = mp.SelectMessages(ts3) + msgs, err = mp.SelectMessages(ts3, 1.0) if err != nil { t.Fatal(err) } @@ -511,7 +516,7 @@ func TestMessageSelectionTrimming(t *testing.T) { t.Fatal(err) } - a1, err := w1.GenerateKey(crypto.SigTypeBLS) + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -521,7 +526,7 @@ func TestMessageSelectionTrimming(t *testing.T) { t.Fatal(err) } - a2, err := w2.GenerateKey(crypto.SigTypeBLS) + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -545,7 +550,7 @@ func TestMessageSelectionTrimming(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts) + msgs, err := mp.SelectMessages(ts, 1.0) if err != nil { t.Fatal(err) } @@ -574,7 +579,7 @@ func TestPriorityMessageSelection(t *testing.T) { t.Fatal(err) } - a1, err := w1.GenerateKey(crypto.SigTypeBLS) + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -584,7 +589,7 @@ func TestPriorityMessageSelection(t *testing.T) { t.Fatal(err) } - a2, err := w2.GenerateKey(crypto.SigTypeBLS) + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -609,7 +614,7 @@ func TestPriorityMessageSelection(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts) + msgs, err := mp.SelectMessages(ts, 1.0) if err != nil { t.Fatal(err) } @@ -643,3 +648,337 @@ func TestPriorityMessageSelection(t *testing.T) { nextNonce++ } } + +func TestOptimalMessageSelection1(t *testing.T) { + // this test uses just a single actor sending messages with a low tq + // the chain depenent merging algorithm should pick messages from the actor + // from the start + mp, tma := makeTestMpool() + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + block := mock.MkBlock(nil, 1, 1) + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}] + + tma.setBalance(a1, 1) // in FIL + tma.setBalance(a2, 1) // in FIL + + nMessages := int(10 * build.BlockGasLimit / gasLimit) + for i := 0; i < nMessages; i++ { + bias := (nMessages - i) / 3 + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(1+i%3+bias)) + mustAdd(t, mp, m) + } + + msgs, err := mp.SelectMessages(ts, 0.25) + if err != nil { + t.Fatal(err) + } + + expectedMsgs := int(build.BlockGasLimit / gasLimit) + if len(msgs) != expectedMsgs { + t.Fatalf("expected %d messages, but got %d", expectedMsgs, len(msgs)) + } + + nextNonce := uint64(0) + for _, m := range msgs { + if m.Message.From != a1 { + t.Fatal("expected message from a1") + } + + if m.Message.Nonce != nextNonce { + t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce) + } + nextNonce++ + } +} + +func TestOptimalMessageSelection2(t *testing.T) { + // this test uses two actors sending messages to each other, with the first + // actor paying (much) higher gas premium than the second. + // We select with a low ticket quality; the chain depenent merging algorithm should pick + // messages from the second actor from the start + mp, tma := makeTestMpool() + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + block := mock.MkBlock(nil, 1, 1) + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}] + + tma.setBalance(a1, 1) // in FIL + tma.setBalance(a2, 1) // in FIL + + nMessages := int(5 * build.BlockGasLimit / gasLimit) + for i := 0; i < nMessages; i++ { + bias := (nMessages - i) / 3 + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(10000+i%3+bias)) + mustAdd(t, mp, m) + m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(1+i%3+bias)) + mustAdd(t, mp, m) + } + + msgs, err := mp.SelectMessages(ts, 0.1) + if err != nil { + t.Fatal(err) + } + + expectedMsgs := int(build.BlockGasLimit / gasLimit) + if len(msgs) != expectedMsgs { + t.Fatalf("expected %d messages, but got %d", expectedMsgs, len(msgs)) + } + + nextNonce := uint64(0) + for _, m := range msgs { + if m.Message.From != a2 { + t.Fatal("expected message from a2") + } + + if m.Message.Nonce != nextNonce { + t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce) + } + nextNonce++ + } +} + +func TestOptimalMessageSelection3(t *testing.T) { + // this test uses 10 actors sending a block of messages to each other, with the the first + // actors paying higher gas premium than the subsequent actors. + // We select with a low ticket quality; the chain depenent merging algorithm should pick + // messages from the median actor from the start + mp, tma := makeTestMpool() + + nActors := 10 + // the actors + var actors []address.Address + var wallets []*wallet.Wallet + + for i := 0; i < nActors; i++ { + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a, err := w.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + actors = append(actors, a) + wallets = append(wallets, w) + } + + block := mock.MkBlock(nil, 1, 1) + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}] + + for _, a := range actors { + tma.setBalance(a, 1) // in FIL + } + + nMessages := int(build.BlockGasLimit/gasLimit) + 1 + for i := 0; i < nMessages; i++ { + for j := 0; j < nActors; j++ { + bias := (nActors-j)*nMessages + (nMessages+2-i)/(3*nActors) + i%3 + m := makeTestMessage(wallets[j], actors[j], actors[j%nActors], uint64(i), gasLimit, uint64(1+bias)) + mustAdd(t, mp, m) + } + } + + msgs, err := mp.SelectMessages(ts, 0.1) + if err != nil { + t.Fatal(err) + } + + expectedMsgs := int(build.BlockGasLimit / gasLimit) + if len(msgs) != expectedMsgs { + t.Fatalf("expected %d messages, but got %d", expectedMsgs, len(msgs)) + } + + nextNonce := uint64(0) + a := actors[len(actors)/2-1] + for _, m := range msgs { + if m.Message.From != a { + who := 0 + for i, a := range actors { + if a == m.Message.From { + who = i + break + } + } + t.Fatalf("expected message from last actor, but got from %d instead", who) + } + + if m.Message.Nonce != nextNonce { + t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce) + } + nextNonce++ + } +} + +func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand) { + // in this test we use 100 actors and send 10 blocks of messages. + // actors send with an exponentially decreasing premium. + // a number of miners select with varying ticket quality and we compare the + // capacity and rewards of greedy selection -vs- optimal selection + + mp, tma := makeTestMpool() + + nActors := 300 + // the actors + var actors []address.Address + var wallets []*wallet.Wallet + + for i := 0; i < nActors; i++ { + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a, err := w.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + actors = append(actors, a) + wallets = append(wallets, w) + } + + block := mock.MkBlock(nil, 1, 1) + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}] + baseFee := types.NewInt(0) + + for _, a := range actors { + tma.setBalance(a, 1) // in FIL + } + + nMessages := 10 * int(build.BlockGasLimit/gasLimit) + t.Log("nMessages", nMessages) + nonces := make([]uint64, nActors) + for i := 0; i < nMessages; i++ { + from := rng.Intn(nActors) + to := rng.Intn(nActors) + premium := 20000*math.Exp(-3.*rand.Float64()) + 5000 + nonce := nonces[from] + nonces[from]++ + m := makeTestMessage(wallets[from], actors[from], actors[to], uint64(nonce), gasLimit, uint64(premium)) + mustAdd(t, mp, m) + } + + // 1. greedy selection + greedyMsgs, err := mp.selectMessagesGreedy(ts, ts) + if err != nil { + t.Fatal(err) + } + + // 2. optimal selection + minersRand := rng.Float64() + winerProba := noWinnersProb() + i := 0 + for ; i < MaxBlocks && minersRand > 0; i++ { + minersRand -= winerProba[i] + } + nMiners := i + if nMiners == 0 { + nMiners = 1 + } + + logging.SetLogLevel("messagepool", "error") + for i := 0; i < 1; i++ { + optMsgs := make(map[cid.Cid]*types.SignedMessage) + for j := 0; j < nMiners; j++ { + tq := rng.Float64() + msgs, err := mp.SelectMessages(ts, tq) + if err != nil { + t.Fatal(err) + } + for _, m := range msgs { + optMsgs[m.Cid()] = m + } + } + + t.Logf("nMiners: %d", nMiners) + t.Logf("greedy capacity %d, optimal capacity %d (x%.1f)", len(greedyMsgs), + len(optMsgs), float64(len(optMsgs))/float64(len(greedyMsgs))) + if len(greedyMsgs) > len(optMsgs) { + t.Fatal("greedy capacity higher than optimal capacity; wtf") + } + + greedyReward := big.NewInt(0) + for _, m := range greedyMsgs { + greedyReward.Add(greedyReward, mp.getGasReward(m, baseFee, ts)) + } + + optReward := big.NewInt(0) + for _, m := range optMsgs { + optReward.Add(optReward, mp.getGasReward(m, baseFee, ts)) + } + + nMinersBig := big.NewInt(int64(nMiners)) + greedyAvgReward, _ := new(big.Rat).SetFrac(greedyReward, nMinersBig).Float64() + optimalAvgReward, _ := new(big.Rat).SetFrac(optReward, nMinersBig).Float64() + t.Logf("greedy reward: %.0f, optimal reward: %.0f (x%.1f)", greedyAvgReward, + optimalAvgReward, optimalAvgReward/greedyAvgReward) + + if greedyReward.Cmp(optReward) > 0 { + t.Fatal("greedy reward raw higher than optimal reward; booh") + } + } + logging.SetLogLevel("messagepool", "info") +} + +func TestCompetitiveMessageSelection(t *testing.T) { + seeds := []int64{1947, 1976, 2020, 2100, 10000} + for _, seed := range seeds { + t.Log("running competitve message selection with seed", seed) + testCompetitiveMessageSelection(t, rand.New(rand.NewSource(seed))) + } +} diff --git a/chain/types/blockheader.go b/chain/types/blockheader.go index 8950fd91a..36b43c012 100644 --- a/chain/types/blockheader.go +++ b/chain/types/blockheader.go @@ -22,6 +22,16 @@ type Ticket struct { VRFProof []byte } +func (t *Ticket) Quality() float64 { + ticketHash := blake2b.Sum256(t.VRFProof) + ticketNum := BigFromBytes(ticketHash[:]).Int + ticketDenu := big.NewInt(1) + ticketDenu.Lsh(ticketDenu, 256) + tv, _ := new(big.Rat).SetFrac(ticketNum, ticketDenu).Float64() + tq := 1 - tv + return tq +} + type BeaconEntry struct { Round uint64 Data []byte diff --git a/cli/state.go b/cli/state.go index 3072d4c01..5b8a10a7a 100644 --- a/cli/state.go +++ b/cli/state.go @@ -858,7 +858,7 @@ var stateComputeStateCmd = &cli.Command{ var msgs []*types.Message if cctx.Bool("apply-mpool-messages") { - pmsgs, err := api.MpoolSelect(ctx, ts.Key()) + pmsgs, err := api.MpoolSelect(ctx, ts.Key(), 1) if err != nil { return err } diff --git a/cmd/lotus-shed/mpool.go b/cmd/lotus-shed/mpool.go index c7a2e9b39..d3660db69 100644 --- a/cmd/lotus-shed/mpool.go +++ b/cmd/lotus-shed/mpool.go @@ -20,6 +20,12 @@ var mpoolCmd = &cli.Command{ var minerSelectMsgsCmd = &cli.Command{ Name: "miner-select-msgs", + Flags: []cli.Flag{ + &cli.Float64Flag{ + Name: "ticket-quality", + Value: 1, + }, + }, Action: func(cctx *cli.Context) error { api, closer, err := lcli.GetFullNodeAPI(cctx) if err != nil { @@ -34,7 +40,7 @@ var minerSelectMsgsCmd = &cli.Command{ return err } - msgs, err := api.MpoolSelect(ctx, head.Key()) + msgs, err := api.MpoolSelect(ctx, head.Key(), cctx.Float64("ticket-quality")) if err != nil { return err } diff --git a/cmd/lotus/debug_advance.go b/cmd/lotus/debug_advance.go index 17d63b6de..72c833bb6 100644 --- a/cmd/lotus/debug_advance.go +++ b/cmd/lotus/debug_advance.go @@ -33,7 +33,7 @@ func init() { if err != nil { return err } - msgs, err := api.MpoolSelect(ctx, head.Key()) + msgs, err := api.MpoolSelect(ctx, head.Key(), 1) if err != nil { return err } diff --git a/miner/miner.go b/miner/miner.go index b8bb9e562..27d3c040d 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -391,7 +391,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, } // get pending messages early, - msgs, err := m.api.MpoolSelect(context.TODO(), base.TipSet.Key()) + msgs, err := m.api.MpoolSelect(context.TODO(), base.TipSet.Key(), ticket.Quality()) if err != nil { return nil, xerrors.Errorf("failed to select messages for block: %w", err) } diff --git a/node/builder.go b/node/builder.go index 671170e53..aa0a18626 100644 --- a/node/builder.go +++ b/node/builder.go @@ -255,6 +255,7 @@ func Online() Option { Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr), Override(new(dtypes.Graphsync), modules.Graphsync), + Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)), Override(RunHelloKey, modules.RunHello), Override(RunBlockSyncKey, modules.RunBlockSync), diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index caf4255f3..cd6adef6d 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -2,7 +2,6 @@ package full import ( "context" - "sync" "github.com/ipfs/go-cid" "go.uber.org/fx" @@ -13,6 +12,7 @@ import ( "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/modules/dtypes" ) type MpoolAPI struct { @@ -25,10 +25,7 @@ type MpoolAPI struct { Mpool *messagepool.MessagePool - PushLocks struct { - m map[address.Address]chan struct{} - sync.Mutex - } `name:"verymuchunique" optional:"true"` + PushLocks *dtypes.MpoolLocker } func (a *MpoolAPI) MpoolGetConfig(context.Context) (*types.MpoolConfig, error) { @@ -40,13 +37,13 @@ func (a *MpoolAPI) MpoolSetConfig(ctx context.Context, cfg *types.MpoolConfig) e return nil } -func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { +func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey, ticketQuality float64) ([]*types.SignedMessage, error) { ts, err := a.Chain.GetTipSetFromKey(tsk) if err != nil { return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) } - return a.Mpool.SelectMessages(ts) + return a.Mpool.SelectMessages(ts, ticketQuality) } func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { @@ -117,27 +114,11 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t if err != nil { return nil, xerrors.Errorf("getting key address: %w", err) } - - a.PushLocks.Lock() - if a.PushLocks.m == nil { - a.PushLocks.m = make(map[address.Address]chan struct{}) + done, err := a.PushLocks.TakeLock(ctx, fromA) + if err != nil { + return nil, xerrors.Errorf("taking lock: %w", err) } - lk, ok := a.PushLocks.m[fromA] - if !ok { - lk = make(chan struct{}, 1) - a.PushLocks.m[msg.From] = lk - } - a.PushLocks.Unlock() - - select { - case lk <- struct{}{}: - case <-ctx.Done(): - return nil, ctx.Err() - } - - defer func() { - <-lk - }() + defer done() } if msg.Nonce != 0 { diff --git a/node/modules/dtypes/mpool.go b/node/modules/dtypes/mpool.go new file mode 100644 index 000000000..1c64449f8 --- /dev/null +++ b/node/modules/dtypes/mpool.go @@ -0,0 +1,35 @@ +package dtypes + +import ( + "context" + "sync" + + "github.com/filecoin-project/go-address" +) + +type MpoolLocker struct { + m map[address.Address]chan struct{} + lk sync.Mutex +} + +func (ml *MpoolLocker) TakeLock(ctx context.Context, a address.Address) (func(), error) { + ml.lk.Lock() + if ml.m == nil { + ml.m = make(map[address.Address]chan struct{}) + } + lk, ok := ml.m[a] + if !ok { + lk = make(chan struct{}, 1) + ml.m[a] = lk + } + ml.lk.Unlock() + + select { + case lk <- struct{}{}: + case <-ctx.Done(): + return nil, ctx.Err() + } + return func() { + <-lk + }, nil +}