Merge pull request #2979 from filecoin-project/feat/mpool-optimal-seletion
nearly optimal message pool seletion
This commit is contained in:
commit
d6f9383528
@ -159,7 +159,7 @@ type FullNode interface {
|
|||||||
MpoolPending(context.Context, types.TipSetKey) ([]*types.SignedMessage, error)
|
MpoolPending(context.Context, types.TipSetKey) ([]*types.SignedMessage, error)
|
||||||
|
|
||||||
// MpoolSelect returns a list of pending messages for inclusion in the next block
|
// MpoolSelect returns a list of pending messages for inclusion in the next block
|
||||||
MpoolSelect(context.Context, types.TipSetKey) ([]*types.SignedMessage, error)
|
MpoolSelect(context.Context, types.TipSetKey, float64) ([]*types.SignedMessage, error)
|
||||||
|
|
||||||
// MpoolPush pushes a signed message to mempool.
|
// MpoolPush pushes a signed message to mempool.
|
||||||
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)
|
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)
|
||||||
|
@ -99,7 +99,9 @@ type FullNodeStruct struct {
|
|||||||
|
|
||||||
MpoolGetConfig func(context.Context) (*types.MpoolConfig, error) `perm:"read"`
|
MpoolGetConfig func(context.Context) (*types.MpoolConfig, error) `perm:"read"`
|
||||||
MpoolSetConfig func(context.Context, *types.MpoolConfig) error `perm:"write"`
|
MpoolSetConfig func(context.Context, *types.MpoolConfig) error `perm:"write"`
|
||||||
MpoolSelect func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
|
|
||||||
|
MpoolSelect func(context.Context, types.TipSetKey, float64) ([]*types.SignedMessage, error) `perm:"read"`
|
||||||
|
|
||||||
MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
|
MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
|
||||||
MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
|
MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
|
||||||
MpoolPushMessage func(context.Context, *types.Message) (*types.SignedMessage, error) `perm:"sign"`
|
MpoolPushMessage func(context.Context, *types.Message) (*types.SignedMessage, error) `perm:"sign"`
|
||||||
@ -456,8 +458,8 @@ func (c *FullNodeStruct) MpoolSetConfig(ctx context.Context, cfg *types.MpoolCon
|
|||||||
return c.Internal.MpoolSetConfig(ctx, cfg)
|
return c.Internal.MpoolSetConfig(ctx, cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FullNodeStruct) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
|
func (c *FullNodeStruct) MpoolSelect(ctx context.Context, tsk types.TipSetKey, tq float64) ([]*types.SignedMessage, error) {
|
||||||
return c.Internal.MpoolSelect(ctx, tsk)
|
return c.Internal.MpoolSelect(ctx, tsk, tq)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FullNodeStruct) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
|
func (c *FullNodeStruct) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
|
||||||
|
71
chain/messagepool/block_proba.go
Normal file
71
chain/messagepool/block_proba.go
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
package messagepool
|
||||||
|
|
||||||
|
import "math"
|
||||||
|
|
||||||
|
func noWinnersProb() []float64 {
|
||||||
|
poissPdf := func(x float64) float64 {
|
||||||
|
const Mu = 5
|
||||||
|
lg, _ := math.Lgamma(x + 1)
|
||||||
|
result := math.Exp((math.Log(Mu) * x) - lg - Mu)
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
out := make([]float64, 0, MaxBlocks)
|
||||||
|
for i := 0; i < MaxBlocks; i++ {
|
||||||
|
out = append(out, poissPdf(float64(i)))
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func binomialCoefficient(n, k float64) float64 {
|
||||||
|
if k > n {
|
||||||
|
return math.NaN()
|
||||||
|
}
|
||||||
|
r := 1.0
|
||||||
|
for d := 1.0; d <= k; d++ {
|
||||||
|
r *= n
|
||||||
|
r /= d
|
||||||
|
n -= 1
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) blockProbabilities(tq float64) []float64 {
|
||||||
|
noWinners := noWinnersProb() // cache this
|
||||||
|
|
||||||
|
p := 1 - tq
|
||||||
|
binoPdf := func(x, trials float64) float64 {
|
||||||
|
// based on https://github.com/atgjack/prob
|
||||||
|
if x > trials {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if p == 0 {
|
||||||
|
if x == 0 {
|
||||||
|
return 1.0
|
||||||
|
}
|
||||||
|
return 0.0
|
||||||
|
}
|
||||||
|
if p == 1 {
|
||||||
|
if x == trials {
|
||||||
|
return 1.0
|
||||||
|
}
|
||||||
|
return 0.0
|
||||||
|
}
|
||||||
|
coef := binomialCoefficient(trials, x)
|
||||||
|
pow := math.Pow(p, x) * math.Pow(1-p, trials-x)
|
||||||
|
if math.IsInf(coef, 0) {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return coef * pow
|
||||||
|
}
|
||||||
|
|
||||||
|
out := make([]float64, 0, MaxBlocks)
|
||||||
|
for place := 0; place < MaxBlocks; place++ {
|
||||||
|
var pPlace float64
|
||||||
|
for otherWinners, pCase := range noWinners {
|
||||||
|
pPlace += pCase * binoPdf(float64(place), float64(otherWinners+1))
|
||||||
|
}
|
||||||
|
out = append(out, pPlace)
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
15
chain/messagepool/block_proba_test.go
Normal file
15
chain/messagepool/block_proba_test.go
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
package messagepool
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestBlockProbability(t *testing.T) {
|
||||||
|
mp := &MessagePool{}
|
||||||
|
bp := mp.blockProbabilities(1 - 0.15)
|
||||||
|
t.Logf("%+v\n", bp)
|
||||||
|
for i := 0; i < len(bp)-1; i++ {
|
||||||
|
if bp[i] < bp[i+1] {
|
||||||
|
t.Fatalf("expected decreasing block probabilities for this quality: %d %f %f",
|
||||||
|
i, bp[i], bp[i+1])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -19,26 +19,275 @@ import (
|
|||||||
|
|
||||||
var bigBlockGasLimit = big.NewInt(build.BlockGasLimit)
|
var bigBlockGasLimit = big.NewInt(build.BlockGasLimit)
|
||||||
|
|
||||||
|
const MaxBlocks = 15
|
||||||
|
|
||||||
type msgChain struct {
|
type msgChain struct {
|
||||||
msgs []*types.SignedMessage
|
msgs []*types.SignedMessage
|
||||||
gasReward *big.Int
|
gasReward *big.Int
|
||||||
gasLimit int64
|
gasLimit int64
|
||||||
gasPerf float64
|
gasPerf float64
|
||||||
|
effPerf float64
|
||||||
valid bool
|
valid bool
|
||||||
|
merged bool
|
||||||
next *msgChain
|
next *msgChain
|
||||||
|
prev *msgChain
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) SelectMessages(ts *types.TipSet) ([]*types.SignedMessage, error) {
|
func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
defer mp.curTsLk.Unlock()
|
defer mp.curTsLk.Unlock()
|
||||||
|
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
|
||||||
return mp.selectMessages(mp.curTs, ts)
|
// if the ticket quality is high enough that the first block has higher probability
|
||||||
|
// than any other block, then we don't bother with optimal selection because the
|
||||||
|
// first block will always have higher effective performance
|
||||||
|
if tq > 0.84 {
|
||||||
|
return mp.selectMessagesGreedy(mp.curTs, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
|
return mp.selectMessagesOptimal(mp.curTs, ts, tq)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("computing basefee: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 0. Load messages from the target tipset; if it is the same as the current tipset in
|
||||||
|
// the mpool, then this is just the pending messages
|
||||||
|
pending, err := mp.getPendingMessages(curTs, ts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pending) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// defer only here so if we have no pending messages we don't spam
|
||||||
|
defer func() {
|
||||||
|
log.Infow("message selection done", "took", time.Since(start))
|
||||||
|
}()
|
||||||
|
|
||||||
|
// 0b. Select all priority messages that fit in the block
|
||||||
|
minGas := int64(gasguess.MinGas)
|
||||||
|
result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts)
|
||||||
|
|
||||||
|
// have we filled the block?
|
||||||
|
if gasLimit < minGas {
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. Create a list of dependent message chains with maximal gas reward per limit consumed
|
||||||
|
startChains := time.Now()
|
||||||
|
var chains []*msgChain
|
||||||
|
for actor, mset := range pending {
|
||||||
|
next := mp.createMessageChains(actor, mset, baseFee, ts)
|
||||||
|
chains = append(chains, next...)
|
||||||
|
}
|
||||||
|
if dt := time.Since(startChains); dt > time.Millisecond {
|
||||||
|
log.Infow("create message chains done", "took", dt)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Sort the chains
|
||||||
|
sort.Slice(chains, func(i, j int) bool {
|
||||||
|
return chains[i].Before(chains[j])
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(chains) != 0 && chains[0].gasPerf < 0 {
|
||||||
|
log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Parition chains into blocks (without trimming)
|
||||||
|
// we use the full blockGasLimit (as opposed to the residual gas limit from the
|
||||||
|
// priority message selection) as we have to account for what other miners are doing
|
||||||
|
nextChain := 0
|
||||||
|
partitions := make([][]*msgChain, MaxBlocks)
|
||||||
|
for i := 0; i < MaxBlocks && nextChain < len(chains); i++ {
|
||||||
|
gasLimit := int64(build.BlockGasLimit)
|
||||||
|
for nextChain < len(chains) {
|
||||||
|
chain := chains[nextChain]
|
||||||
|
nextChain++
|
||||||
|
partitions[i] = append(partitions[i], chain)
|
||||||
|
gasLimit -= chain.gasLimit
|
||||||
|
if gasLimit < minGas {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Compute effective performance for each chain, based on the partition they fall into
|
||||||
|
// The effective performance is the gasPerf of the chain * block probability
|
||||||
|
blockProb := mp.blockProbabilities(tq)
|
||||||
|
effChains := 0
|
||||||
|
for i := 0; i < MaxBlocks; i++ {
|
||||||
|
for _, chain := range partitions[i] {
|
||||||
|
chain.SetEffectivePerf(blockProb[i])
|
||||||
|
}
|
||||||
|
effChains += len(partitions[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
// nullify the effective performance of chains that don't fit in any partition
|
||||||
|
for _, chain := range chains[effChains:] {
|
||||||
|
chain.SetNullEffectivePerf()
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Resort the chains based on effective performance
|
||||||
|
sort.Slice(chains, func(i, j int) bool {
|
||||||
|
return chains[i].BeforeEffective(chains[j])
|
||||||
|
})
|
||||||
|
|
||||||
|
// 6. Merge the head chains to produce the list of messages selected for inclusion
|
||||||
|
// subject to the residual gas limit
|
||||||
|
// When a chain is merged in, all its previous dependent chains *must* also be
|
||||||
|
// merged in or we'll have a broken block
|
||||||
|
startMerge := time.Now()
|
||||||
|
last := len(chains)
|
||||||
|
for i, chain := range chains {
|
||||||
|
// did we run out of performing chains?
|
||||||
|
if chain.gasPerf < 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// has it already been merged?
|
||||||
|
if chain.merged {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// compute the dependencies that must be merged and the gas limit including deps
|
||||||
|
chainGasLimit := chain.gasLimit
|
||||||
|
var chainDeps []*msgChain
|
||||||
|
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
||||||
|
chainDeps = append(chainDeps, curChain)
|
||||||
|
chainGasLimit += curChain.gasLimit
|
||||||
|
}
|
||||||
|
|
||||||
|
// does it all fit in the block?
|
||||||
|
if chainGasLimit <= gasLimit {
|
||||||
|
// include it together with all dependencies
|
||||||
|
for i := len(chainDeps) - 1; i >= 0; i-- {
|
||||||
|
curChain := chainDeps[i]
|
||||||
|
curChain.merged = true
|
||||||
|
result = append(result, curChain.msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
chain.merged = true
|
||||||
|
result = append(result, chain.msgs...)
|
||||||
|
gasLimit -= chainGasLimit
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// we can't fit this chain and its dependencies because of block gasLimit -- we are
|
||||||
|
// at the edge
|
||||||
|
last = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if dt := time.Since(startMerge); dt > time.Millisecond {
|
||||||
|
log.Infow("merge message chains done", "took", dt)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 7. We have reached the edge of what can fit wholesale; if we still hae available
|
||||||
|
// gasLimit to pack some more chains, then trim the last chain and push it down.
|
||||||
|
// Trimming invalidaates subsequent dependent chains so that they can't be selected
|
||||||
|
// as their dependency cannot be (fully) included.
|
||||||
|
// We do this in a loop because the blocker might have been inordinately large and
|
||||||
|
// we might have to do it multiple times to satisfy tail packing
|
||||||
|
startTail := time.Now()
|
||||||
|
tailLoop:
|
||||||
|
for gasLimit >= minGas && last < len(chains) {
|
||||||
|
// trim if necessary
|
||||||
|
if chains[last].gasLimit > gasLimit {
|
||||||
|
chains[last].Trim(gasLimit, mp, baseFee, ts, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// push down if it hasn't been invalidated
|
||||||
|
if chains[last].valid {
|
||||||
|
for i := last; i < len(chains)-1; i++ {
|
||||||
|
if chains[i].BeforeEffective(chains[i+1]) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
chains[i], chains[i+1] = chains[i+1], chains[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// select the next (valid and fitting) chain and its dependencies for inclusion
|
||||||
|
for i, chain := range chains[last:] {
|
||||||
|
// has the chain been invalidated?
|
||||||
|
if !chain.valid {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// has it already been merged?
|
||||||
|
if chain.merged {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// if gasPerf < 0 we have no more profitable chains
|
||||||
|
if chain.gasPerf < 0 {
|
||||||
|
break tailLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
// compute the dependencies that must be merged and the gas limit including deps
|
||||||
|
chainGasLimit := chain.gasLimit
|
||||||
|
depGasLimit := int64(0)
|
||||||
|
var chainDeps []*msgChain
|
||||||
|
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
||||||
|
chainDeps = append(chainDeps, curChain)
|
||||||
|
chainGasLimit += curChain.gasLimit
|
||||||
|
depGasLimit += curChain.gasLimit
|
||||||
|
}
|
||||||
|
|
||||||
|
// does it all fit in the bock
|
||||||
|
if chainGasLimit <= gasLimit {
|
||||||
|
// include it together with all dependencies
|
||||||
|
for i := len(chainDeps) - 1; i >= 0; i-- {
|
||||||
|
curChain := chainDeps[i]
|
||||||
|
curChain.merged = true
|
||||||
|
result = append(result, curChain.msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
chain.merged = true
|
||||||
|
result = append(result, chain.msgs...)
|
||||||
|
gasLimit -= chainGasLimit
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// it doesn't all fit; now we have to take into account the dependent chains before
|
||||||
|
// making a decision about trimming or invalidating.
|
||||||
|
// if the dependencies exceed the gas limit, then we must invalidate the chain
|
||||||
|
// as it can never be included.
|
||||||
|
// Otherwise we can just trim and continue
|
||||||
|
if depGasLimit > gasLimit {
|
||||||
|
chain.Invalidate()
|
||||||
|
last += i + 1
|
||||||
|
continue tailLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
// dependencies fit, just trim it
|
||||||
|
chain.Trim(gasLimit-depGasLimit, mp, baseFee, ts, false)
|
||||||
|
last += i
|
||||||
|
continue tailLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
// the merge loop ended after processing all the chains and we we probably have still
|
||||||
|
// gas to spare; end the loop.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if dt := time.Since(startTail); dt > time.Millisecond {
|
||||||
|
log.Infow("pack tail chains done", "took", dt)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
||||||
@ -78,7 +327,9 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
|
|||||||
next := mp.createMessageChains(actor, mset, baseFee, ts)
|
next := mp.createMessageChains(actor, mset, baseFee, ts)
|
||||||
chains = append(chains, next...)
|
chains = append(chains, next...)
|
||||||
}
|
}
|
||||||
log.Infow("create message chains done", "took", time.Since(startChains))
|
if dt := time.Since(startChains); dt > time.Millisecond {
|
||||||
|
log.Infow("create message chains done", "took", dt)
|
||||||
|
}
|
||||||
|
|
||||||
// 2. Sort the chains
|
// 2. Sort the chains
|
||||||
sort.Slice(chains, func(i, j int) bool {
|
sort.Slice(chains, func(i, j int) bool {
|
||||||
@ -95,23 +346,25 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
|
|||||||
startMerge := time.Now()
|
startMerge := time.Now()
|
||||||
last := len(chains)
|
last := len(chains)
|
||||||
for i, chain := range chains {
|
for i, chain := range chains {
|
||||||
// does it fit in the block?
|
|
||||||
if chain.gasLimit <= gasLimit && chain.gasPerf >= 0 {
|
|
||||||
gasLimit -= chain.gasLimit
|
|
||||||
result = append(result, chain.msgs...)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// did we run out of performing chains?
|
// did we run out of performing chains?
|
||||||
if chain.gasPerf < 0 {
|
if chain.gasPerf < 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// does it fit in the block?
|
||||||
|
if chain.gasLimit <= gasLimit {
|
||||||
|
gasLimit -= chain.gasLimit
|
||||||
|
result = append(result, chain.msgs...)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// we can't fit this chain because of block gasLimit -- we are at the edge
|
// we can't fit this chain because of block gasLimit -- we are at the edge
|
||||||
last = i
|
last = i
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
log.Infow("merge message chains done", "took", time.Since(startMerge))
|
if dt := time.Since(startMerge); dt > time.Millisecond {
|
||||||
|
log.Infow("merge message chains done", "took", dt)
|
||||||
|
}
|
||||||
|
|
||||||
// 4. We have reached the edge of what we can fit wholesale; if we still have available gasLimit
|
// 4. We have reached the edge of what we can fit wholesale; if we still have available gasLimit
|
||||||
// to pack some more chains, then trim the last chain and push it down.
|
// to pack some more chains, then trim the last chain and push it down.
|
||||||
@ -137,32 +390,35 @@ tailLoop:
|
|||||||
|
|
||||||
// select the next (valid and fitting) chain for inclusion
|
// select the next (valid and fitting) chain for inclusion
|
||||||
for i, chain := range chains[last:] {
|
for i, chain := range chains[last:] {
|
||||||
// has the chain been invalidated
|
// has the chain been invalidated?
|
||||||
if !chain.valid {
|
if !chain.valid {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// does it fit in the bock?
|
|
||||||
if chain.gasLimit <= gasLimit && chain.gasPerf >= 0 {
|
|
||||||
gasLimit -= chain.gasLimit
|
|
||||||
result = append(result, chain.msgs...)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// if gasPerf < 0 we have no more profitable chains
|
// if gasPerf < 0 we have no more profitable chains
|
||||||
if chain.gasPerf < 0 {
|
if chain.gasPerf < 0 {
|
||||||
break tailLoop
|
break tailLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// does it fit in the bock?
|
||||||
|
if chain.gasLimit <= gasLimit {
|
||||||
|
gasLimit -= chain.gasLimit
|
||||||
|
result = append(result, chain.msgs...)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// this chain needs to be trimmed
|
// this chain needs to be trimmed
|
||||||
last += i
|
last += i
|
||||||
continue tailLoop
|
continue tailLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
// the merge loop ended after processing all the chains and we probably still have gas to spare
|
// the merge loop ended after processing all the chains and we probably still have
|
||||||
// -- mark the end.
|
// gas to spare; end the loop
|
||||||
last = len(chains)
|
break
|
||||||
|
}
|
||||||
|
if dt := time.Since(startTail); dt > time.Millisecond {
|
||||||
|
log.Infow("pack tail chains done", "took", dt)
|
||||||
}
|
}
|
||||||
log.Infow("pack tail chains done", "took", time.Since(startTail))
|
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
@ -170,7 +426,9 @@ tailLoop:
|
|||||||
func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) {
|
func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("select priority messages done", "took", time.Since(start))
|
if dt := time.Since(start); dt > time.Millisecond {
|
||||||
|
log.Infow("select priority messages done", "took", dt)
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
result := make([]*types.SignedMessage, 0, mp.cfg.SizeLimitLow)
|
result := make([]*types.SignedMessage, 0, mp.cfg.SizeLimitLow)
|
||||||
@ -258,8 +516,8 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.
|
|||||||
result := make(map[address.Address]map[uint64]*types.SignedMessage)
|
result := make(map[address.Address]map[uint64]*types.SignedMessage)
|
||||||
haveCids := make(map[cid.Cid]struct{})
|
haveCids := make(map[cid.Cid]struct{})
|
||||||
defer func() {
|
defer func() {
|
||||||
if time.Since(start) > time.Millisecond {
|
if dt := time.Since(start); dt > time.Millisecond {
|
||||||
log.Infow("get pending messages done", "took", time.Since(start))
|
log.Infow("get pending messages done", "took", dt)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -363,11 +621,11 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *big.Int {
|
func (mp *MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *big.Int {
|
||||||
gasReward := abig.Mul(msg.Message.GasPremium, types.NewInt(uint64(msg.Message.GasLimit)))
|
maxPremium := types.BigSub(msg.Message.GasFeeCap, baseFee)
|
||||||
maxReward := types.BigSub(msg.Message.GasFeeCap, baseFee)
|
if types.BigCmp(maxPremium, msg.Message.GasPremium) < 0 {
|
||||||
if types.BigCmp(maxReward, gasReward) < 0 {
|
maxPremium = msg.Message.GasPremium
|
||||||
gasReward = maxReward
|
|
||||||
}
|
}
|
||||||
|
gasReward := abig.Mul(maxPremium, types.NewInt(uint64(msg.Message.GasLimit)))
|
||||||
return gasReward.Int
|
return gasReward.Int
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -449,7 +707,7 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check we have a sane set of messages to construct the chains
|
// check we have a sane set of messages to construct the chains
|
||||||
if i > 0 {
|
if i > skip {
|
||||||
msgs = msgs[skip:i]
|
msgs = msgs[skip:i]
|
||||||
} else {
|
} else {
|
||||||
return nil
|
return nil
|
||||||
@ -533,6 +791,10 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
|
|||||||
chains[i].next = chains[i+1]
|
chains[i].next = chains[i+1]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for i := len(chains) - 1; i > 0; i-- {
|
||||||
|
chains[i].prev = chains[i-1]
|
||||||
|
}
|
||||||
|
|
||||||
return chains
|
return chains
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -548,7 +810,13 @@ func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt,
|
|||||||
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)
|
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)
|
||||||
mc.gasLimit -= mc.msgs[i].Message.GasLimit
|
mc.gasLimit -= mc.msgs[i].Message.GasLimit
|
||||||
if mc.gasLimit > 0 {
|
if mc.gasLimit > 0 {
|
||||||
|
bp := 1.0
|
||||||
|
if mc.effPerf != 0 {
|
||||||
|
bp = mc.effPerf / mc.gasPerf
|
||||||
|
}
|
||||||
|
|
||||||
mc.gasPerf = mp.getGasPerf(mc.gasReward, mc.gasLimit)
|
mc.gasPerf = mp.getGasPerf(mc.gasReward, mc.gasLimit)
|
||||||
|
mc.effPerf = bp * mc.gasPerf
|
||||||
} else {
|
} else {
|
||||||
mc.gasPerf = 0
|
mc.gasPerf = 0
|
||||||
}
|
}
|
||||||
@ -563,16 +831,34 @@ func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if mc.next != nil {
|
if mc.next != nil {
|
||||||
mc.next.invalidate()
|
mc.next.Invalidate()
|
||||||
mc.next = nil
|
mc.next = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *msgChain) invalidate() {
|
func (mc *msgChain) Invalidate() {
|
||||||
mc.valid = false
|
mc.valid = false
|
||||||
mc.msgs = nil
|
mc.msgs = nil
|
||||||
if mc.next != nil {
|
if mc.next != nil {
|
||||||
mc.next.invalidate()
|
mc.next.Invalidate()
|
||||||
mc.next = nil
|
mc.next = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mc *msgChain) SetEffectivePerf(bp float64) {
|
||||||
|
mc.effPerf = mc.gasPerf * bp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *msgChain) SetNullEffectivePerf() {
|
||||||
|
if mc.gasPerf < 0 {
|
||||||
|
mc.effPerf = mc.gasPerf
|
||||||
|
} else {
|
||||||
|
mc.effPerf = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *msgChain) BeforeEffective(other *msgChain) bool {
|
||||||
|
return mc.effPerf > other.effPerf ||
|
||||||
|
(mc.effPerf == other.effPerf && mc.gasPerf > other.gasPerf) ||
|
||||||
|
(mc.effPerf == other.effPerf && mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
|
||||||
|
}
|
||||||
|
@ -2,6 +2,9 @@ package messagepool
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math"
|
||||||
|
"math/big"
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
@ -12,10 +15,12 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/wallet"
|
"github.com/filecoin-project/lotus/chain/wallet"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
"github.com/filecoin-project/specs-actors/actors/crypto"
|
"github.com/filecoin-project/specs-actors/actors/crypto"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
|
|
||||||
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
|
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
|
||||||
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
|
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
)
|
)
|
||||||
|
|
||||||
func makeTestMessage(w *wallet.Wallet, from, to address.Address, nonce uint64, gasLimit int64, gasPrice uint64) *types.SignedMessage {
|
func makeTestMessage(w *wallet.Wallet, from, to address.Address, nonce uint64, gasLimit int64, gasPrice uint64) *types.SignedMessage {
|
||||||
@ -59,7 +64,7 @@ func TestMessageChains(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
a1, err := w1.GenerateKey(crypto.SigTypeBLS)
|
a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -69,7 +74,7 @@ func TestMessageChains(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
a2, err := w2.GenerateKey(crypto.SigTypeBLS)
|
a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -297,7 +302,7 @@ func TestMessageChainSkipping(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
a1, err := w1.GenerateKey(crypto.SigTypeBLS)
|
a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -307,7 +312,7 @@ func TestMessageChainSkipping(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
a2, err := w2.GenerateKey(crypto.SigTypeBLS)
|
a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -367,7 +372,7 @@ func TestBasicMessageSelection(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
a1, err := w1.GenerateKey(crypto.SigTypeBLS)
|
a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -377,7 +382,7 @@ func TestBasicMessageSelection(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
a2, err := w2.GenerateKey(crypto.SigTypeBLS)
|
a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -403,7 +408,7 @@ func TestBasicMessageSelection(t *testing.T) {
|
|||||||
mustAdd(t, mp, m)
|
mustAdd(t, mp, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs, err := mp.SelectMessages(ts)
|
msgs, err := mp.SelectMessages(ts, 1.0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -471,7 +476,7 @@ func TestBasicMessageSelection(t *testing.T) {
|
|||||||
tma.setStateNonce(a1, 10)
|
tma.setStateNonce(a1, 10)
|
||||||
tma.setStateNonce(a2, 10)
|
tma.setStateNonce(a2, 10)
|
||||||
|
|
||||||
msgs, err = mp.SelectMessages(ts3)
|
msgs, err = mp.SelectMessages(ts3, 1.0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -511,7 +516,7 @@ func TestMessageSelectionTrimming(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
a1, err := w1.GenerateKey(crypto.SigTypeBLS)
|
a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -521,7 +526,7 @@ func TestMessageSelectionTrimming(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
a2, err := w2.GenerateKey(crypto.SigTypeBLS)
|
a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -545,7 +550,7 @@ func TestMessageSelectionTrimming(t *testing.T) {
|
|||||||
mustAdd(t, mp, m)
|
mustAdd(t, mp, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs, err := mp.SelectMessages(ts)
|
msgs, err := mp.SelectMessages(ts, 1.0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -574,7 +579,7 @@ func TestPriorityMessageSelection(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
a1, err := w1.GenerateKey(crypto.SigTypeBLS)
|
a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -584,7 +589,7 @@ func TestPriorityMessageSelection(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
a2, err := w2.GenerateKey(crypto.SigTypeBLS)
|
a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -609,7 +614,7 @@ func TestPriorityMessageSelection(t *testing.T) {
|
|||||||
mustAdd(t, mp, m)
|
mustAdd(t, mp, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs, err := mp.SelectMessages(ts)
|
msgs, err := mp.SelectMessages(ts, 1.0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -643,3 +648,337 @@ func TestPriorityMessageSelection(t *testing.T) {
|
|||||||
nextNonce++
|
nextNonce++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestOptimalMessageSelection1(t *testing.T) {
|
||||||
|
// this test uses just a single actor sending messages with a low tq
|
||||||
|
// the chain depenent merging algorithm should pick messages from the actor
|
||||||
|
// from the start
|
||||||
|
mp, tma := makeTestMpool()
|
||||||
|
|
||||||
|
// the actors
|
||||||
|
w1, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
w2, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
block := mock.MkBlock(nil, 1, 1)
|
||||||
|
ts := mock.TipSet(block)
|
||||||
|
tma.applyBlock(t, block)
|
||||||
|
|
||||||
|
gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}]
|
||||||
|
|
||||||
|
tma.setBalance(a1, 1) // in FIL
|
||||||
|
tma.setBalance(a2, 1) // in FIL
|
||||||
|
|
||||||
|
nMessages := int(10 * build.BlockGasLimit / gasLimit)
|
||||||
|
for i := 0; i < nMessages; i++ {
|
||||||
|
bias := (nMessages - i) / 3
|
||||||
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(1+i%3+bias))
|
||||||
|
mustAdd(t, mp, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := mp.SelectMessages(ts, 0.25)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedMsgs := int(build.BlockGasLimit / gasLimit)
|
||||||
|
if len(msgs) != expectedMsgs {
|
||||||
|
t.Fatalf("expected %d messages, but got %d", expectedMsgs, len(msgs))
|
||||||
|
}
|
||||||
|
|
||||||
|
nextNonce := uint64(0)
|
||||||
|
for _, m := range msgs {
|
||||||
|
if m.Message.From != a1 {
|
||||||
|
t.Fatal("expected message from a1")
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Message.Nonce != nextNonce {
|
||||||
|
t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce)
|
||||||
|
}
|
||||||
|
nextNonce++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOptimalMessageSelection2(t *testing.T) {
|
||||||
|
// this test uses two actors sending messages to each other, with the first
|
||||||
|
// actor paying (much) higher gas premium than the second.
|
||||||
|
// We select with a low ticket quality; the chain depenent merging algorithm should pick
|
||||||
|
// messages from the second actor from the start
|
||||||
|
mp, tma := makeTestMpool()
|
||||||
|
|
||||||
|
// the actors
|
||||||
|
w1, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
w2, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
block := mock.MkBlock(nil, 1, 1)
|
||||||
|
ts := mock.TipSet(block)
|
||||||
|
tma.applyBlock(t, block)
|
||||||
|
|
||||||
|
gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}]
|
||||||
|
|
||||||
|
tma.setBalance(a1, 1) // in FIL
|
||||||
|
tma.setBalance(a2, 1) // in FIL
|
||||||
|
|
||||||
|
nMessages := int(5 * build.BlockGasLimit / gasLimit)
|
||||||
|
for i := 0; i < nMessages; i++ {
|
||||||
|
bias := (nMessages - i) / 3
|
||||||
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(10000+i%3+bias))
|
||||||
|
mustAdd(t, mp, m)
|
||||||
|
m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(1+i%3+bias))
|
||||||
|
mustAdd(t, mp, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := mp.SelectMessages(ts, 0.1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedMsgs := int(build.BlockGasLimit / gasLimit)
|
||||||
|
if len(msgs) != expectedMsgs {
|
||||||
|
t.Fatalf("expected %d messages, but got %d", expectedMsgs, len(msgs))
|
||||||
|
}
|
||||||
|
|
||||||
|
nextNonce := uint64(0)
|
||||||
|
for _, m := range msgs {
|
||||||
|
if m.Message.From != a2 {
|
||||||
|
t.Fatal("expected message from a2")
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Message.Nonce != nextNonce {
|
||||||
|
t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce)
|
||||||
|
}
|
||||||
|
nextNonce++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOptimalMessageSelection3(t *testing.T) {
|
||||||
|
// this test uses 10 actors sending a block of messages to each other, with the the first
|
||||||
|
// actors paying higher gas premium than the subsequent actors.
|
||||||
|
// We select with a low ticket quality; the chain depenent merging algorithm should pick
|
||||||
|
// messages from the median actor from the start
|
||||||
|
mp, tma := makeTestMpool()
|
||||||
|
|
||||||
|
nActors := 10
|
||||||
|
// the actors
|
||||||
|
var actors []address.Address
|
||||||
|
var wallets []*wallet.Wallet
|
||||||
|
|
||||||
|
for i := 0; i < nActors; i++ {
|
||||||
|
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a, err := w.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
actors = append(actors, a)
|
||||||
|
wallets = append(wallets, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
block := mock.MkBlock(nil, 1, 1)
|
||||||
|
ts := mock.TipSet(block)
|
||||||
|
tma.applyBlock(t, block)
|
||||||
|
|
||||||
|
gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}]
|
||||||
|
|
||||||
|
for _, a := range actors {
|
||||||
|
tma.setBalance(a, 1) // in FIL
|
||||||
|
}
|
||||||
|
|
||||||
|
nMessages := int(build.BlockGasLimit/gasLimit) + 1
|
||||||
|
for i := 0; i < nMessages; i++ {
|
||||||
|
for j := 0; j < nActors; j++ {
|
||||||
|
bias := (nActors-j)*nMessages + (nMessages+2-i)/(3*nActors) + i%3
|
||||||
|
m := makeTestMessage(wallets[j], actors[j], actors[j%nActors], uint64(i), gasLimit, uint64(1+bias))
|
||||||
|
mustAdd(t, mp, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := mp.SelectMessages(ts, 0.1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedMsgs := int(build.BlockGasLimit / gasLimit)
|
||||||
|
if len(msgs) != expectedMsgs {
|
||||||
|
t.Fatalf("expected %d messages, but got %d", expectedMsgs, len(msgs))
|
||||||
|
}
|
||||||
|
|
||||||
|
nextNonce := uint64(0)
|
||||||
|
a := actors[len(actors)/2-1]
|
||||||
|
for _, m := range msgs {
|
||||||
|
if m.Message.From != a {
|
||||||
|
who := 0
|
||||||
|
for i, a := range actors {
|
||||||
|
if a == m.Message.From {
|
||||||
|
who = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.Fatalf("expected message from last actor, but got from %d instead", who)
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Message.Nonce != nextNonce {
|
||||||
|
t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce)
|
||||||
|
}
|
||||||
|
nextNonce++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand) {
|
||||||
|
// in this test we use 100 actors and send 10 blocks of messages.
|
||||||
|
// actors send with an exponentially decreasing premium.
|
||||||
|
// a number of miners select with varying ticket quality and we compare the
|
||||||
|
// capacity and rewards of greedy selection -vs- optimal selection
|
||||||
|
|
||||||
|
mp, tma := makeTestMpool()
|
||||||
|
|
||||||
|
nActors := 300
|
||||||
|
// the actors
|
||||||
|
var actors []address.Address
|
||||||
|
var wallets []*wallet.Wallet
|
||||||
|
|
||||||
|
for i := 0; i < nActors; i++ {
|
||||||
|
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a, err := w.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
actors = append(actors, a)
|
||||||
|
wallets = append(wallets, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
block := mock.MkBlock(nil, 1, 1)
|
||||||
|
ts := mock.TipSet(block)
|
||||||
|
tma.applyBlock(t, block)
|
||||||
|
|
||||||
|
gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}]
|
||||||
|
baseFee := types.NewInt(0)
|
||||||
|
|
||||||
|
for _, a := range actors {
|
||||||
|
tma.setBalance(a, 1) // in FIL
|
||||||
|
}
|
||||||
|
|
||||||
|
nMessages := 10 * int(build.BlockGasLimit/gasLimit)
|
||||||
|
t.Log("nMessages", nMessages)
|
||||||
|
nonces := make([]uint64, nActors)
|
||||||
|
for i := 0; i < nMessages; i++ {
|
||||||
|
from := rng.Intn(nActors)
|
||||||
|
to := rng.Intn(nActors)
|
||||||
|
premium := 20000*math.Exp(-3.*rand.Float64()) + 5000
|
||||||
|
nonce := nonces[from]
|
||||||
|
nonces[from]++
|
||||||
|
m := makeTestMessage(wallets[from], actors[from], actors[to], uint64(nonce), gasLimit, uint64(premium))
|
||||||
|
mustAdd(t, mp, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. greedy selection
|
||||||
|
greedyMsgs, err := mp.selectMessagesGreedy(ts, ts)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. optimal selection
|
||||||
|
minersRand := rng.Float64()
|
||||||
|
winerProba := noWinnersProb()
|
||||||
|
i := 0
|
||||||
|
for ; i < MaxBlocks && minersRand > 0; i++ {
|
||||||
|
minersRand -= winerProba[i]
|
||||||
|
}
|
||||||
|
nMiners := i
|
||||||
|
if nMiners == 0 {
|
||||||
|
nMiners = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
logging.SetLogLevel("messagepool", "error")
|
||||||
|
for i := 0; i < 1; i++ {
|
||||||
|
optMsgs := make(map[cid.Cid]*types.SignedMessage)
|
||||||
|
for j := 0; j < nMiners; j++ {
|
||||||
|
tq := rng.Float64()
|
||||||
|
msgs, err := mp.SelectMessages(ts, tq)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
for _, m := range msgs {
|
||||||
|
optMsgs[m.Cid()] = m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("nMiners: %d", nMiners)
|
||||||
|
t.Logf("greedy capacity %d, optimal capacity %d (x%.1f)", len(greedyMsgs),
|
||||||
|
len(optMsgs), float64(len(optMsgs))/float64(len(greedyMsgs)))
|
||||||
|
if len(greedyMsgs) > len(optMsgs) {
|
||||||
|
t.Fatal("greedy capacity higher than optimal capacity; wtf")
|
||||||
|
}
|
||||||
|
|
||||||
|
greedyReward := big.NewInt(0)
|
||||||
|
for _, m := range greedyMsgs {
|
||||||
|
greedyReward.Add(greedyReward, mp.getGasReward(m, baseFee, ts))
|
||||||
|
}
|
||||||
|
|
||||||
|
optReward := big.NewInt(0)
|
||||||
|
for _, m := range optMsgs {
|
||||||
|
optReward.Add(optReward, mp.getGasReward(m, baseFee, ts))
|
||||||
|
}
|
||||||
|
|
||||||
|
nMinersBig := big.NewInt(int64(nMiners))
|
||||||
|
greedyAvgReward, _ := new(big.Rat).SetFrac(greedyReward, nMinersBig).Float64()
|
||||||
|
optimalAvgReward, _ := new(big.Rat).SetFrac(optReward, nMinersBig).Float64()
|
||||||
|
t.Logf("greedy reward: %.0f, optimal reward: %.0f (x%.1f)", greedyAvgReward,
|
||||||
|
optimalAvgReward, optimalAvgReward/greedyAvgReward)
|
||||||
|
|
||||||
|
if greedyReward.Cmp(optReward) > 0 {
|
||||||
|
t.Fatal("greedy reward raw higher than optimal reward; booh")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logging.SetLogLevel("messagepool", "info")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCompetitiveMessageSelection(t *testing.T) {
|
||||||
|
seeds := []int64{1947, 1976, 2020, 2100, 10000}
|
||||||
|
for _, seed := range seeds {
|
||||||
|
t.Log("running competitve message selection with seed", seed)
|
||||||
|
testCompetitiveMessageSelection(t, rand.New(rand.NewSource(seed)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -22,6 +22,16 @@ type Ticket struct {
|
|||||||
VRFProof []byte
|
VRFProof []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *Ticket) Quality() float64 {
|
||||||
|
ticketHash := blake2b.Sum256(t.VRFProof)
|
||||||
|
ticketNum := BigFromBytes(ticketHash[:]).Int
|
||||||
|
ticketDenu := big.NewInt(1)
|
||||||
|
ticketDenu.Lsh(ticketDenu, 256)
|
||||||
|
tv, _ := new(big.Rat).SetFrac(ticketNum, ticketDenu).Float64()
|
||||||
|
tq := 1 - tv
|
||||||
|
return tq
|
||||||
|
}
|
||||||
|
|
||||||
type BeaconEntry struct {
|
type BeaconEntry struct {
|
||||||
Round uint64
|
Round uint64
|
||||||
Data []byte
|
Data []byte
|
||||||
|
@ -858,7 +858,7 @@ var stateComputeStateCmd = &cli.Command{
|
|||||||
|
|
||||||
var msgs []*types.Message
|
var msgs []*types.Message
|
||||||
if cctx.Bool("apply-mpool-messages") {
|
if cctx.Bool("apply-mpool-messages") {
|
||||||
pmsgs, err := api.MpoolSelect(ctx, ts.Key())
|
pmsgs, err := api.MpoolSelect(ctx, ts.Key(), 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,12 @@ var mpoolCmd = &cli.Command{
|
|||||||
|
|
||||||
var minerSelectMsgsCmd = &cli.Command{
|
var minerSelectMsgsCmd = &cli.Command{
|
||||||
Name: "miner-select-msgs",
|
Name: "miner-select-msgs",
|
||||||
|
Flags: []cli.Flag{
|
||||||
|
&cli.Float64Flag{
|
||||||
|
Name: "ticket-quality",
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
Action: func(cctx *cli.Context) error {
|
Action: func(cctx *cli.Context) error {
|
||||||
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -34,7 +40,7 @@ var minerSelectMsgsCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs, err := api.MpoolSelect(ctx, head.Key())
|
msgs, err := api.MpoolSelect(ctx, head.Key(), cctx.Float64("ticket-quality"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,7 @@ func init() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
msgs, err := api.MpoolSelect(ctx, head.Key())
|
msgs, err := api.MpoolSelect(ctx, head.Key(), 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -391,7 +391,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// get pending messages early,
|
// get pending messages early,
|
||||||
msgs, err := m.api.MpoolSelect(context.TODO(), base.TipSet.Key())
|
msgs, err := m.api.MpoolSelect(context.TODO(), base.TipSet.Key(), ticket.Quality())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to select messages for block: %w", err)
|
return nil, xerrors.Errorf("failed to select messages for block: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -255,6 +255,7 @@ func Online() Option {
|
|||||||
Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr),
|
Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr),
|
||||||
|
|
||||||
Override(new(dtypes.Graphsync), modules.Graphsync),
|
Override(new(dtypes.Graphsync), modules.Graphsync),
|
||||||
|
Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),
|
||||||
|
|
||||||
Override(RunHelloKey, modules.RunHello),
|
Override(RunHelloKey, modules.RunHello),
|
||||||
Override(RunBlockSyncKey, modules.RunBlockSync),
|
Override(RunBlockSyncKey, modules.RunBlockSync),
|
||||||
|
@ -2,7 +2,6 @@ package full
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
@ -13,6 +12,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MpoolAPI struct {
|
type MpoolAPI struct {
|
||||||
@ -25,10 +25,7 @@ type MpoolAPI struct {
|
|||||||
|
|
||||||
Mpool *messagepool.MessagePool
|
Mpool *messagepool.MessagePool
|
||||||
|
|
||||||
PushLocks struct {
|
PushLocks *dtypes.MpoolLocker
|
||||||
m map[address.Address]chan struct{}
|
|
||||||
sync.Mutex
|
|
||||||
} `name:"verymuchunique" optional:"true"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *MpoolAPI) MpoolGetConfig(context.Context) (*types.MpoolConfig, error) {
|
func (a *MpoolAPI) MpoolGetConfig(context.Context) (*types.MpoolConfig, error) {
|
||||||
@ -40,13 +37,13 @@ func (a *MpoolAPI) MpoolSetConfig(ctx context.Context, cfg *types.MpoolConfig) e
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
|
func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey, ticketQuality float64) ([]*types.SignedMessage, error) {
|
||||||
ts, err := a.Chain.GetTipSetFromKey(tsk)
|
ts, err := a.Chain.GetTipSetFromKey(tsk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
|
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return a.Mpool.SelectMessages(ts)
|
return a.Mpool.SelectMessages(ts, ticketQuality)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
|
func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
|
||||||
@ -117,27 +114,11 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("getting key address: %w", err)
|
return nil, xerrors.Errorf("getting key address: %w", err)
|
||||||
}
|
}
|
||||||
|
done, err := a.PushLocks.TakeLock(ctx, fromA)
|
||||||
a.PushLocks.Lock()
|
if err != nil {
|
||||||
if a.PushLocks.m == nil {
|
return nil, xerrors.Errorf("taking lock: %w", err)
|
||||||
a.PushLocks.m = make(map[address.Address]chan struct{})
|
|
||||||
}
|
}
|
||||||
lk, ok := a.PushLocks.m[fromA]
|
defer done()
|
||||||
if !ok {
|
|
||||||
lk = make(chan struct{}, 1)
|
|
||||||
a.PushLocks.m[msg.From] = lk
|
|
||||||
}
|
|
||||||
a.PushLocks.Unlock()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case lk <- struct{}{}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil, ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
<-lk
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if msg.Nonce != 0 {
|
if msg.Nonce != 0 {
|
||||||
|
35
node/modules/dtypes/mpool.go
Normal file
35
node/modules/dtypes/mpool.go
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package dtypes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MpoolLocker struct {
|
||||||
|
m map[address.Address]chan struct{}
|
||||||
|
lk sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ml *MpoolLocker) TakeLock(ctx context.Context, a address.Address) (func(), error) {
|
||||||
|
ml.lk.Lock()
|
||||||
|
if ml.m == nil {
|
||||||
|
ml.m = make(map[address.Address]chan struct{})
|
||||||
|
}
|
||||||
|
lk, ok := ml.m[a]
|
||||||
|
if !ok {
|
||||||
|
lk = make(chan struct{}, 1)
|
||||||
|
ml.m[a] = lk
|
||||||
|
}
|
||||||
|
ml.lk.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case lk <- struct{}{}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
return func() {
|
||||||
|
<-lk
|
||||||
|
}, nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user