From ca803d99fe0947df10b64aac23f7b06962743b4a Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Aug 2020 12:33:17 +0300 Subject: [PATCH 01/17] nearly optimal message selection for a given ticket quality Signed-off-by: Jakub Sztandera --- chain/messagepool/selection.go | 319 ++++++++++++++++++++++++++-- chain/messagepool/selection_test.go | 8 +- 2 files changed, 300 insertions(+), 27 deletions(-) diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index 47923fd6f..2e44c896e 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -3,6 +3,7 @@ package messagepool import ( "context" "math/big" + "math/rand" "sort" "time" @@ -19,26 +20,283 @@ import ( var bigBlockGasLimit = big.NewInt(build.BlockGasLimit) +const MaxBlocks = 15 + type msgChain struct { msgs []*types.SignedMessage gasReward *big.Int gasLimit int64 gasPerf float64 + effPerf float64 valid bool + merged bool next *msgChain + prev *msgChain } -func (mp *MessagePool) SelectMessages(ts *types.TipSet) ([]*types.SignedMessage, error) { +func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() mp.lk.Lock() defer mp.lk.Unlock() - return mp.selectMessages(mp.curTs, ts) + // if the ticket quality is high enough that the first block has higher probability + // than any other block, then we don't bother with optimal selection because the + // first block will always have higher effective performance + if tq > 0.84 { + return mp.selectMessagesGreedy(mp.curTs, ts) + } + + return mp.selectMessagesOptimal(mp.curTs, ts, tq) } -func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { +func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { + start := time.Now() + + baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) + if err != nil { + return nil, xerrors.Errorf("computing basefee: %w", err) + } + + // 0. Load messages from the targeti tipset; if it is the same as the current tipset in + // the mpoll, then this is just the pending messages + pending, err := mp.getPendingMessages(curTs, ts) + if err != nil { + return nil, err + } + + if len(pending) == 0 { + return nil, nil + } + + // defer only here so if we have no pending messages we don't spam + defer func() { + log.Infow("message selection done", "took", time.Since(start)) + }() + + // 0b. Select all priority messages that fit in the block + minGas := int64(gasguess.MinGas) + result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts) + + // have we filled the block? + if gasLimit < minGas { + return result, nil + } + + // 1. Create a list of dependent message chains with maximal gas reward per limit consumed + startChains := time.Now() + var chains []*msgChain + for actor, mset := range pending { + next := mp.createMessageChains(actor, mset, baseFee, ts) + chains = append(chains, next...) + } + log.Infow("create message chains done", "took", time.Since(startChains)) + + // 2. Sort the chains + sort.Slice(chains, func(i, j int) bool { + return chains[i].Before(chains[j]) + }) + + if len(chains) != 0 && chains[0].gasPerf < 0 { + log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf) + return nil, nil + } + + // 3. Parition chains into blocks (without trimming) + // we use the residual gas limit from the priority message selection as those message + // will be unconditionally included + residualGasLimit := gasLimit + nextChain := 0 + partitions := make([][]*msgChain, MaxBlocks) + for i := 0; i < MaxBlocks && nextChain < len(chains); i++ { + gasLimit := residualGasLimit + for nextChain < len(chains) { + chain := chains[nextChain] + partitions[i] = append(partitions[i], chain) + nextChain++ + gasLimit -= chain.gasLimit + if gasLimit < minGas { + break + } + } + } + + // 4. Compute effective performance for each chain, based on the partition they fall into + // The effective performance is the gasPerf of the chain * block probability + // Note that we don't have to do anything special about residues that didn't fit in any + // partition because these will already have an effective perf of 0 and will be pushed + // to the end by virtue of smaller gasPerf. + blockProb := mp.blockProbabilities(tq) + for i := 0; i < MaxBlocks; i++ { + for _, chain := range partitions[i] { + chain.SetEffectivePerf(blockProb[i]) + } + } + + // 5. Resort the chains based on effective performance + sort.Slice(chains, func(i, j int) bool { + return chains[i].BeforeEffective(chains[j]) + }) + + // 6. Merge the head chains to produce the list of messages selected for inclusion + // subject to the residual gas limit + // When a chain is merged in, all its previous dependent chains *must* also be + // merged in or we'll have a broken block + startMerge := time.Now() + last := len(chains) + for i, chain := range chains { + // did we run out of performing chains? + if chain.gasPerf < 0 { + break + } + + // has it already been merged? + if chain.merged { + continue + } + + // compute the dependencies that must be merged and the gas limit including deps + chainGasLimit := chain.gasLimit + var chainDeps []*msgChain + for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { + chainDeps = append(chainDeps, curChain) + chainGasLimit += curChain.gasLimit + } + + // does it all fit in the block? + if chainGasLimit <= gasLimit { + // include it together with all dependencies + for i := len(chainDeps) - 1; i >= 0; i-- { + curChain := chainDeps[i] + curChain.merged = true + result = append(result, curChain.msgs...) + } + + chain.merged = true + result = append(result, chain.msgs...) + gasLimit -= chainGasLimit + continue + } + + // we can't fit this chain and its dependencies because of block gasLimit -- we are + // at the edge + last = i + break + } + log.Infow("merge message chains done", "took", time.Since(startMerge)) + + // 7. We have reached the edge of what can fit wholesale; if we still hae available + // gasLimit to pack some more chains, then trim the last chain and push it down. + // Trimming invalidaates subsequent dependent chains so that they can't be selected + // as their dependency cannot be (fully) included. + // We do this in a loop because the blocker might have been inordinately large and + // we might have to do it multiple times to satisfy tail packing + startTail := time.Now() +tailLoop: + for gasLimit >= minGas && last < len(chains) { + // trim if necessary + if chains[last].gasLimit > gasLimit { + chains[last].Trim(gasLimit, mp, baseFee, ts, false) + } + + // push down if it hasn't been invalidated + if chains[last].valid { + for i := last; i < len(chains)-1; i++ { + if chains[i].BeforeEffective(chains[i+1]) { + break + } + chains[i], chains[i+1] = chains[i+1], chains[i] + } + } + + // select the next (valid and fitting) chain and its dependencies for inclusion + for i, chain := range chains[last:] { + // has the chain been invalidated? + if !chain.valid { + continue + } + + // has it already been merged? + if chain.merged { + continue + } + + // if gasPerf < 0 we have no more profitable chains + if chain.gasPerf < 0 { + break tailLoop + } + + // compute the dependencies that must be merged and the gas limit including deps + chainGasLimit := chain.gasLimit + depGasLimit := int64(0) + var chainDeps []*msgChain + for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { + chainDeps = append(chainDeps, curChain) + chainGasLimit += curChain.gasLimit + depGasLimit += curChain.gasLimit + } + + // does it all fit in the bock + if chainGasLimit <= gasLimit { + // include it together with all dependencies + for i := len(chainDeps) - 1; i >= 0; i-- { + curChain := chainDeps[i] + curChain.merged = true + result = append(result, curChain.msgs...) + } + + chain.merged = true + result = append(result, chain.msgs...) + gasLimit -= chainGasLimit + continue + } + + // it doesn't all fit; now we have to take into account the dependent chains before + // making a decision about trimming or invalidating. + // if the dependencies exceed the gas limit, then we must invalidate the chain + // as it can never be included. + // Otherwise we can just trim and continue + if depGasLimit > gasLimit { + chain.Invalidate() + last += i + 1 + continue tailLoop + } + + // dependencies fit, just trim it + chain.Trim(gasLimit-depGasLimit, mp, baseFee, ts, false) + last += i + continue tailLoop + } + + // the merge loop ended after processing all the chains and we we probably have still + // gas to spare; end the loop. + break + } + log.Infow("pack tail chains done", "took", time.Since(startTail)) + + return result, nil +} + +func (mp *MessagePool) blockProbabilities(tq float64) []float64 { + // TODO FIXME fit in the actual probability distribution + // this just makes a dummy random distribution for testing purposes + bps := make([]float64, MaxBlocks) + norm := 0.0 + for i := 0; i < MaxBlocks; i++ { + p := rand.Float64() + bps[i] = p + norm += p + } + // normalize to make it a distribution + for i := 0; i < MaxBlocks; i++ { + bps[i] /= norm + } + + return bps +} + +func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { start := time.Now() baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) @@ -95,18 +353,18 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM startMerge := time.Now() last := len(chains) for i, chain := range chains { - // does it fit in the block? - if chain.gasLimit <= gasLimit && chain.gasPerf >= 0 { - gasLimit -= chain.gasLimit - result = append(result, chain.msgs...) - continue - } - // did we run out of performing chains? if chain.gasPerf < 0 { break } + // does it fit in the block? + if chain.gasLimit <= gasLimit { + gasLimit -= chain.gasLimit + result = append(result, chain.msgs...) + continue + } + // we can't fit this chain because of block gasLimit -- we are at the edge last = i break @@ -137,30 +395,31 @@ tailLoop: // select the next (valid and fitting) chain for inclusion for i, chain := range chains[last:] { - // has the chain been invalidated + // has the chain been invalidated? if !chain.valid { continue } - // does it fit in the bock? - if chain.gasLimit <= gasLimit && chain.gasPerf >= 0 { - gasLimit -= chain.gasLimit - result = append(result, chain.msgs...) - continue - } // if gasPerf < 0 we have no more profitable chains if chain.gasPerf < 0 { break tailLoop } + // does it fit in the bock? + if chain.gasLimit <= gasLimit { + gasLimit -= chain.gasLimit + result = append(result, chain.msgs...) + continue + } + // this chain needs to be trimmed last += i continue tailLoop } - // the merge loop ended after processing all the chains and we probably still have gas to spare - // -- mark the end. - last = len(chains) + // the merge loop ended after processing all the chains and we probably still have + // gas to spare; end the loop + break } log.Infow("pack tail chains done", "took", time.Since(startTail)) @@ -533,6 +792,10 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 chains[i].next = chains[i+1] } + for i := len(chains) - 1; i > 0; i-- { + chains[i].prev = chains[i-1] + } + return chains } @@ -563,16 +826,26 @@ func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt, } if mc.next != nil { - mc.next.invalidate() + mc.next.Invalidate() mc.next = nil } } -func (mc *msgChain) invalidate() { +func (mc *msgChain) Invalidate() { mc.valid = false mc.msgs = nil if mc.next != nil { - mc.next.invalidate() + mc.next.Invalidate() mc.next = nil } } + +func (mc *msgChain) SetEffectivePerf(bp float64) { + mc.effPerf = mc.gasPerf * bp +} + +func (mc *msgChain) BeforeEffective(other *msgChain) bool { + return mc.effPerf > other.effPerf || + (mc.effPerf == other.effPerf && mc.gasPerf > other.gasPerf) || + (mc.effPerf == other.effPerf && mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0) +} diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 177c62dc2..02205d6e9 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -403,7 +403,7 @@ func TestBasicMessageSelection(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts) + msgs, err := mp.SelectMessages(ts, 1.0) if err != nil { t.Fatal(err) } @@ -471,7 +471,7 @@ func TestBasicMessageSelection(t *testing.T) { tma.setStateNonce(a1, 10) tma.setStateNonce(a2, 10) - msgs, err = mp.SelectMessages(ts3) + msgs, err = mp.SelectMessages(ts3, 1.0) if err != nil { t.Fatal(err) } @@ -545,7 +545,7 @@ func TestMessageSelectionTrimming(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts) + msgs, err := mp.SelectMessages(ts, 1.0) if err != nil { t.Fatal(err) } @@ -609,7 +609,7 @@ func TestPriorityMessageSelection(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts) + msgs, err := mp.SelectMessages(ts, 1.0) if err != nil { t.Fatal(err) } From 28fe602a9b8e0abb9f8e071c871652a59861408f Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Aug 2020 12:35:14 +0300 Subject: [PATCH 02/17] pass a ticket quality to mpool.SelectMessages -- temporary Signed-off-by: Jakub Sztandera --- node/impl/full/mpool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index caf4255f3..346f3e2c6 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -46,7 +46,8 @@ func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*typ return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) } - return a.Mpool.SelectMessages(ts) + // TODO FIXME compute (or pass in) the actual ticket quality! + return a.Mpool.SelectMessages(ts, 1.0) } func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { From 8eff3a25f9f04a769e2fea2fa8d14b459b755d5e Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Aug 2020 12:43:44 +0300 Subject: [PATCH 03/17] use BlockGasLimit instead of residual for partitioning Signed-off-by: Jakub Sztandera --- chain/messagepool/selection.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index 2e44c896e..aba54cd96 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -104,13 +104,12 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64 } // 3. Parition chains into blocks (without trimming) - // we use the residual gas limit from the priority message selection as those message - // will be unconditionally included - residualGasLimit := gasLimit + // we use the full blockGasLimit (as opposed to the residual gas limit from the + // priority message selection) as we have to account for what other miners are doing nextChain := 0 partitions := make([][]*msgChain, MaxBlocks) for i := 0; i < MaxBlocks && nextChain < len(chains); i++ { - gasLimit := residualGasLimit + gasLimit := int64(build.BlockGasLimit) for nextChain < len(chains) { chain := chains[nextChain] partitions[i] = append(partitions[i], chain) From 080614098d0f4cde26684a07c53eba682253e6fd Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Aug 2020 12:50:10 +0300 Subject: [PATCH 04/17] fix edge case of effective performance for chains that dont fit in any partition Signed-off-by: Jakub Sztandera --- chain/messagepool/selection.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index aba54cd96..6cc19b7f3 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -123,14 +123,18 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64 // 4. Compute effective performance for each chain, based on the partition they fall into // The effective performance is the gasPerf of the chain * block probability - // Note that we don't have to do anything special about residues that didn't fit in any - // partition because these will already have an effective perf of 0 and will be pushed - // to the end by virtue of smaller gasPerf. blockProb := mp.blockProbabilities(tq) + effChains := 0 for i := 0; i < MaxBlocks; i++ { for _, chain := range partitions[i] { chain.SetEffectivePerf(blockProb[i]) } + effChains += len(partitions[i]) + } + + // nullify the effective performance of chains that don't fit in any partition + for _, chain := range chains[effChains:] { + chain.SetNullEffectivePerf() } // 5. Resort the chains based on effective performance @@ -843,6 +847,14 @@ func (mc *msgChain) SetEffectivePerf(bp float64) { mc.effPerf = mc.gasPerf * bp } +func (mc *msgChain) SetNullEffectivePerf() { + if mc.gasPerf < 0 { + mc.effPerf = mc.gasPerf + } else { + mc.effPerf = 0 + } +} + func (mc *msgChain) BeforeEffective(other *msgChain) bool { return mc.effPerf > other.effPerf || (mc.effPerf == other.effPerf && mc.gasPerf > other.gasPerf) || From d3c8f295c2b4ecae300c5bcf236652ebea471933 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 11 Aug 2020 12:24:00 +0200 Subject: [PATCH 05/17] Use real blockProbabilities function Signed-off-by: Jakub Sztandera --- chain/messagepool/selection.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index 6cc19b7f3..0627a4638 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -3,7 +3,6 @@ package messagepool import ( "context" "math/big" - "math/rand" "sort" "time" @@ -281,24 +280,6 @@ tailLoop: return result, nil } -func (mp *MessagePool) blockProbabilities(tq float64) []float64 { - // TODO FIXME fit in the actual probability distribution - // this just makes a dummy random distribution for testing purposes - bps := make([]float64, MaxBlocks) - norm := 0.0 - for i := 0; i < MaxBlocks; i++ { - p := rand.Float64() - bps[i] = p - norm += p - } - // normalize to make it a distribution - for i := 0; i < MaxBlocks; i++ { - bps[i] /= norm - } - - return bps -} - func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { start := time.Now() From 2057433f48a949175ae5a27e8b29715b017dc54f Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 11 Aug 2020 12:26:54 +0200 Subject: [PATCH 06/17] Add needed files Signed-off-by: Jakub Sztandera --- chain/messagepool/block_proba.go | 71 +++++++++++++++++++++++++++ chain/messagepool/block_proba_test.go | 8 +++ 2 files changed, 79 insertions(+) create mode 100644 chain/messagepool/block_proba.go create mode 100644 chain/messagepool/block_proba_test.go diff --git a/chain/messagepool/block_proba.go b/chain/messagepool/block_proba.go new file mode 100644 index 000000000..403507352 --- /dev/null +++ b/chain/messagepool/block_proba.go @@ -0,0 +1,71 @@ +package messagepool + +import "math" + +func noWinnersProb() []float64 { + poissPdf := func(x float64) float64 { + const Mu = 5 + lg, _ := math.Lgamma(x + 1) + result := math.Exp((math.Log(Mu) * x) - lg - Mu) + return result + } + + out := make([]float64, 0, MaxBlocks) + for i := 0; i < MaxBlocks; i++ { + out = append(out, poissPdf(float64(i))) + } + return out +} + +func binomialCoefficient(n, k float64) float64 { + if k > n { + return math.NaN() + } + r := 1.0 + for d := 1.0; d <= k; d++ { + r *= n + r /= d + n -= 1 + } + return r +} + +func (mp *MessagePool) blockProbabilities(tq float64) []float64 { + noWinners := noWinnersProb() // cache this + + p := 1 - tq + binoPdf := func(x, trials float64) float64 { + // based on https://github.com/atgjack/prob + if x > trials { + return 0 + } + if p == 0 { + if x == 0 { + return 1.0 + } + return 0.0 + } + if p == 1 { + if x == trials { + return 1.0 + } + return 0.0 + } + coef := binomialCoefficient(trials, x) + pow := math.Pow(p, x) * math.Pow(1-p, trials-x) + if math.IsInf(coef, 0) { + return 0 + } + return coef * pow + } + + out := make([]float64, 0, MaxBlocks) + for place := 0; place < MaxBlocks; place++ { + var pPlace float64 + for otherWinners, pCase := range noWinners { + pPlace += pCase * binoPdf(float64(place), float64(otherWinners+1)) + } + out = append(out, pPlace) + } + return out +} diff --git a/chain/messagepool/block_proba_test.go b/chain/messagepool/block_proba_test.go new file mode 100644 index 000000000..2cc2ecc2a --- /dev/null +++ b/chain/messagepool/block_proba_test.go @@ -0,0 +1,8 @@ +package messagepool + +import "testing" + +func TestBlockProbability(t *testing.T) { + mp := &MessagePool{} + t.Logf("%+v\n", mp.blockProbabilities(1-0.15)) +} From b309e80e4175685b0e6e04d16e2065e7f8b5f720 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 11 Aug 2020 13:05:04 +0200 Subject: [PATCH 07/17] Wire in Ticket Quality to MpoolSelect Signed-off-by: Jakub Sztandera --- api/api_full.go | 2 +- api/apistruct/struct.go | 12 +++++++----- cli/state.go | 2 +- cmd/lotus-shed/mpool.go | 8 +++++++- cmd/lotus/debug_advance.go | 2 +- miner/miner.go | 10 +++++++++- node/impl/full/mpool.go | 4 ++-- 7 files changed, 28 insertions(+), 12 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 184805698..42c5017de 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -159,7 +159,7 @@ type FullNode interface { MpoolPending(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) // MpoolSelect returns a list of pending messages for inclusion in the next block - MpoolSelect(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) + MpoolSelect(context.Context, types.TipSetKey, float64) ([]*types.SignedMessage, error) // MpoolPush pushes a signed message to mempool. MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index a571e4564..c7f1314fd 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -97,9 +97,11 @@ type FullNodeStruct struct { SyncMarkBad func(ctx context.Context, bcid cid.Cid) error `perm:"admin"` SyncCheckBad func(ctx context.Context, bcid cid.Cid) (string, error) `perm:"read"` - MpoolGetConfig func(context.Context) (*types.MpoolConfig, error) `perm:"read"` - MpoolSetConfig func(context.Context, *types.MpoolConfig) error `perm:"write"` - MpoolSelect func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"` + MpoolGetConfig func(context.Context) (*types.MpoolConfig, error) `perm:"read"` + MpoolSetConfig func(context.Context, *types.MpoolConfig) error `perm:"write"` + + MpoolSelect func(context.Context, types.TipSetKey, float64) ([]*types.SignedMessage, error) `perm:"read"` + MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"` MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"` MpoolPushMessage func(context.Context, *types.Message) (*types.SignedMessage, error) `perm:"sign"` @@ -456,8 +458,8 @@ func (c *FullNodeStruct) MpoolSetConfig(ctx context.Context, cfg *types.MpoolCon return c.Internal.MpoolSetConfig(ctx, cfg) } -func (c *FullNodeStruct) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { - return c.Internal.MpoolSelect(ctx, tsk) +func (c *FullNodeStruct) MpoolSelect(ctx context.Context, tsk types.TipSetKey, tq float64) ([]*types.SignedMessage, error) { + return c.Internal.MpoolSelect(ctx, tsk, tq) } func (c *FullNodeStruct) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { diff --git a/cli/state.go b/cli/state.go index 3072d4c01..5b8a10a7a 100644 --- a/cli/state.go +++ b/cli/state.go @@ -858,7 +858,7 @@ var stateComputeStateCmd = &cli.Command{ var msgs []*types.Message if cctx.Bool("apply-mpool-messages") { - pmsgs, err := api.MpoolSelect(ctx, ts.Key()) + pmsgs, err := api.MpoolSelect(ctx, ts.Key(), 1) if err != nil { return err } diff --git a/cmd/lotus-shed/mpool.go b/cmd/lotus-shed/mpool.go index c7a2e9b39..d3660db69 100644 --- a/cmd/lotus-shed/mpool.go +++ b/cmd/lotus-shed/mpool.go @@ -20,6 +20,12 @@ var mpoolCmd = &cli.Command{ var minerSelectMsgsCmd = &cli.Command{ Name: "miner-select-msgs", + Flags: []cli.Flag{ + &cli.Float64Flag{ + Name: "ticket-quality", + Value: 1, + }, + }, Action: func(cctx *cli.Context) error { api, closer, err := lcli.GetFullNodeAPI(cctx) if err != nil { @@ -34,7 +40,7 @@ var minerSelectMsgsCmd = &cli.Command{ return err } - msgs, err := api.MpoolSelect(ctx, head.Key()) + msgs, err := api.MpoolSelect(ctx, head.Key(), cctx.Float64("ticket-quality")) if err != nil { return err } diff --git a/cmd/lotus/debug_advance.go b/cmd/lotus/debug_advance.go index 17d63b6de..72c833bb6 100644 --- a/cmd/lotus/debug_advance.go +++ b/cmd/lotus/debug_advance.go @@ -33,7 +33,7 @@ func init() { if err != nil { return err } - msgs, err := api.MpoolSelect(ctx, head.Key()) + msgs, err := api.MpoolSelect(ctx, head.Key(), 1) if err != nil { return err } diff --git a/miner/miner.go b/miner/miner.go index b8bb9e562..077e18638 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -6,10 +6,12 @@ import ( "crypto/rand" "encoding/binary" "fmt" + "math/big" "sync" "time" "github.com/filecoin-project/lotus/chain/gen/slashfilter" + "github.com/minio/blake2b-simd" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" @@ -390,8 +392,14 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, return nil, xerrors.Errorf("failed to compute winning post proof: %w", err) } + ticketHash := blake2b.Sum256(ticket.VRFProof) + ticketNum := types.BigFromBytes(ticketHash[:]).Int + ticketDenu := big.NewInt(1) + ticketDenu.Lsh(ticketDenu, 256) + tq, _ := new(big.Rat).SetFrac(ticketNum, ticketDenu).Float64() + tq = 1 - tq // get pending messages early, - msgs, err := m.api.MpoolSelect(context.TODO(), base.TipSet.Key()) + msgs, err := m.api.MpoolSelect(context.TODO(), base.TipSet.Key(), tq) if err != nil { return nil, xerrors.Errorf("failed to select messages for block: %w", err) } diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 346f3e2c6..20ca271c0 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -40,14 +40,14 @@ func (a *MpoolAPI) MpoolSetConfig(ctx context.Context, cfg *types.MpoolConfig) e return nil } -func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { +func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey, ticketQuality float64) ([]*types.SignedMessage, error) { ts, err := a.Chain.GetTipSetFromKey(tsk) if err != nil { return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) } // TODO FIXME compute (or pass in) the actual ticket quality! - return a.Mpool.SelectMessages(ts, 1.0) + return a.Mpool.SelectMessages(ts, ticketQuality) } func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { From 268d435ce0a675dd776d2d0e1b9951d71d523b5e Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Aug 2020 14:22:01 +0300 Subject: [PATCH 08/17] add some optimal selection tests Signed-off-by: Jakub Sztandera --- chain/messagepool/selection_test.go | 233 ++++++++++++++++++++++++++-- 1 file changed, 223 insertions(+), 10 deletions(-) diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 02205d6e9..8940d3f88 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -59,7 +59,7 @@ func TestMessageChains(t *testing.T) { t.Fatal(err) } - a1, err := w1.GenerateKey(crypto.SigTypeBLS) + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -69,7 +69,7 @@ func TestMessageChains(t *testing.T) { t.Fatal(err) } - a2, err := w2.GenerateKey(crypto.SigTypeBLS) + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -297,7 +297,7 @@ func TestMessageChainSkipping(t *testing.T) { t.Fatal(err) } - a1, err := w1.GenerateKey(crypto.SigTypeBLS) + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -307,7 +307,7 @@ func TestMessageChainSkipping(t *testing.T) { t.Fatal(err) } - a2, err := w2.GenerateKey(crypto.SigTypeBLS) + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -367,7 +367,7 @@ func TestBasicMessageSelection(t *testing.T) { t.Fatal(err) } - a1, err := w1.GenerateKey(crypto.SigTypeBLS) + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -377,7 +377,7 @@ func TestBasicMessageSelection(t *testing.T) { t.Fatal(err) } - a2, err := w2.GenerateKey(crypto.SigTypeBLS) + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -511,7 +511,7 @@ func TestMessageSelectionTrimming(t *testing.T) { t.Fatal(err) } - a1, err := w1.GenerateKey(crypto.SigTypeBLS) + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -521,7 +521,7 @@ func TestMessageSelectionTrimming(t *testing.T) { t.Fatal(err) } - a2, err := w2.GenerateKey(crypto.SigTypeBLS) + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -574,7 +574,7 @@ func TestPriorityMessageSelection(t *testing.T) { t.Fatal(err) } - a1, err := w1.GenerateKey(crypto.SigTypeBLS) + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -584,7 +584,7 @@ func TestPriorityMessageSelection(t *testing.T) { t.Fatal(err) } - a2, err := w2.GenerateKey(crypto.SigTypeBLS) + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) if err != nil { t.Fatal(err) } @@ -643,3 +643,216 @@ func TestPriorityMessageSelection(t *testing.T) { nextNonce++ } } + +func TestOptimalMessageSelection1(t *testing.T) { + // this test uses just a single actor sending messages with a low tq + // the chain depenent merging algorithm should pick messages from the actor + // from the start + mp, tma := makeTestMpool() + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + block := mock.MkBlock(nil, 1, 1) + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}] + + tma.setBalance(a1, 1) // in FIL + tma.setBalance(a2, 1) // in FIL + + nMessages := int(10 * build.BlockGasLimit / gasLimit) + for i := 0; i < nMessages; i++ { + bias := (nMessages - i) / 3 + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(1+i%3+bias)) + mustAdd(t, mp, m) + } + + msgs, err := mp.SelectMessages(ts, 0.25) + if err != nil { + t.Fatal(err) + } + + expectedMsgs := int(build.BlockGasLimit / gasLimit) + if len(msgs) != expectedMsgs { + t.Fatalf("expected %d messages, but got %d", expectedMsgs, len(msgs)) + } + + nextNonce := uint64(0) + for _, m := range msgs { + if m.Message.From != a1 { + t.Fatal("expected message from a1") + } + + if m.Message.Nonce != nextNonce { + t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce) + } + nextNonce++ + } +} + +func TestOptimalMessageSelection2(t *testing.T) { + // this test uses two actors sending messages to each other, with the first + // actor paying (much) higher gas premium than the second. + // We select with a low ticket quality; the chain depenent merging algorithm should pick + // messages from the second actor from the start + mp, tma := makeTestMpool() + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + block := mock.MkBlock(nil, 1, 1) + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}] + + tma.setBalance(a1, 1) // in FIL + tma.setBalance(a2, 1) // in FIL + + nMessages := int(5 * build.BlockGasLimit / gasLimit) + for i := 0; i < nMessages; i++ { + bias := (nMessages - i) / 3 + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(10000+i%3+bias)) + mustAdd(t, mp, m) + m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(1+i%3+bias)) + mustAdd(t, mp, m) + } + + msgs, err := mp.SelectMessages(ts, 0.1) + if err != nil { + t.Fatal(err) + } + + expectedMsgs := int(build.BlockGasLimit / gasLimit) + if len(msgs) != expectedMsgs { + t.Fatalf("expected %d messages, but got %d", expectedMsgs, len(msgs)) + } + + nextNonce := uint64(0) + for _, m := range msgs { + if m.Message.From != a2 { + t.Fatal("expected message from a2") + } + + if m.Message.Nonce != nextNonce { + t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce) + } + nextNonce++ + } +} + +func TestOptimalMessageSelection3(t *testing.T) { + // this test uses 10 actors sending a block of messages to each other, with the the first + // actors paying higher gas premium than the subsequent actors. + // We select with a low ticket quality; the chain depenent merging algorithm should pick + // messages from the median actor from the start + mp, tma := makeTestMpool() + + nActors := 10 + // the actors + var actors []address.Address + var wallets []*wallet.Wallet + + for i := 0; i < nActors; i++ { + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a, err := w.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + actors = append(actors, a) + wallets = append(wallets, w) + } + + block := mock.MkBlock(nil, 1, 1) + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}] + + for _, a := range actors { + tma.setBalance(a, 1) // in FIL + tma.setBalance(a, 1) // in FIL + } + + nMessages := int(build.BlockGasLimit/gasLimit) + 1 + for i := 0; i < nMessages; i++ { + for j := 0; j < nActors; j++ { + bias := (nActors-j)*nMessages + (nMessages+2-i)/(3*nActors) + i%3 + m := makeTestMessage(wallets[j], actors[j], actors[j%nActors], uint64(i), gasLimit, uint64(1+bias)) + mustAdd(t, mp, m) + } + } + + msgs, err := mp.SelectMessages(ts, 0.1) + if err != nil { + t.Fatal(err) + } + + expectedMsgs := int(build.BlockGasLimit / gasLimit) + if len(msgs) != expectedMsgs { + t.Fatalf("expected %d messages, but got %d", expectedMsgs, len(msgs)) + } + + nextNonce := uint64(0) + a := actors[len(actors)/2-1] + for _, m := range msgs { + if m.Message.From != a { + who := 0 + for i, a := range actors { + if a == m.Message.From { + who = i + break + } + } + t.Fatalf("expected message from last actor, but got from %d instead", who) + } + + if m.Message.Nonce != nextNonce { + t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce) + } + nextNonce++ + } +} From 0091f3a9cea610314bd6936915c9b5551b013b47 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Aug 2020 15:05:55 +0300 Subject: [PATCH 09/17] add competitive selection test Signed-off-by: Jakub Sztandera --- chain/messagepool/selection_test.go | 107 +++++++++++++++++++++++++++- 1 file changed, 106 insertions(+), 1 deletion(-) diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 8940d3f88..7f58ab074 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -2,6 +2,8 @@ package messagepool import ( "context" + "math/big" + "math/rand" "testing" "github.com/filecoin-project/go-address" @@ -12,6 +14,7 @@ import ( "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" _ "github.com/filecoin-project/lotus/lib/sigs/bls" @@ -814,7 +817,6 @@ func TestOptimalMessageSelection3(t *testing.T) { for _, a := range actors { tma.setBalance(a, 1) // in FIL - tma.setBalance(a, 1) // in FIL } nMessages := int(build.BlockGasLimit/gasLimit) + 1 @@ -856,3 +858,106 @@ func TestOptimalMessageSelection3(t *testing.T) { nextNonce++ } } + +func testCompetitiveMessageSelection(t *testing.T) { + // in this test we use 100 actors and send 10 blocks of messages. + // actors send with an exponentially decreasing premium. + // a number of miners select with varying ticket quality and we compare the + // capacity and rewards of greedy selection -vs- optimal selection + + mp, tma := makeTestMpool() + + nActors := 100 + // the actors + var actors []address.Address + var wallets []*wallet.Wallet + + for i := 0; i < nActors; i++ { + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a, err := w.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + actors = append(actors, a) + wallets = append(wallets, w) + } + + block := mock.MkBlock(nil, 1, 1) + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}] + baseFee := types.NewInt(0) + + for _, a := range actors { + tma.setBalance(a, 1) // in FIL + } + + nMessages := 10 * int(build.BlockGasLimit/gasLimit) + nonces := make([]uint64, nActors) + for i := 0; i < nMessages; i++ { + from := rand.Intn(nActors) + to := rand.Intn(nActors) + premium := 1 + rand.Intn(1000) + nonce := nonces[from] + nonces[from]++ + m := makeTestMessage(wallets[from], actors[from], actors[to], uint64(nonce), gasLimit, uint64(premium)) + mustAdd(t, mp, m) + } + + // 1. greedy selection + greedyMsgs, err := mp.selectMessagesGreedy(ts, ts) + if err != nil { + t.Fatal(err) + } + + // 2. optimal selection + nMiners := 10 + optMsgs := make(map[cid.Cid]*types.SignedMessage) + for i := 0; i < nMiners; i++ { + tq := rand.Float64() + msgs, err := mp.SelectMessages(ts, tq) + if err != nil { + t.Fatal(err) + } + for _, m := range msgs { + optMsgs[m.Cid()] = m + } + } + + t.Log("greedy capacity", len(greedyMsgs)) + t.Log("optimal capacity", len(optMsgs)) + if len(greedyMsgs) > len(optMsgs) { + t.Fatal("greedy capacity higher than optimal capacity; wtf") + } + + greedyReward := big.NewInt(0) + for _, m := range greedyMsgs { + greedyReward.Add(greedyReward, mp.getGasReward(m, baseFee, ts)) + } + + optReward := big.NewInt(0) + for _, m := range optMsgs { + optReward.Add(optReward, mp.getGasReward(m, baseFee, ts)) + } + + t.Log("greedy reward", greedyReward) + t.Log("optimal reward", optReward) + if greedyReward.Cmp(optReward) > 0 { + t.Fatal("greedy reward higher than optimal reward; booh") + } +} + +func TestCompetitiveMessageSelection(t *testing.T) { + seeds := []int64{1947, 1976, 2020, 2100, 10000} + for _, seed := range seeds { + t.Log("running competitve message selection with seed", seed) + rand.Seed(seed) + testCompetitiveMessageSelection(t) + } +} From 4da3aedacbb90da690189db498cacf122c085caf Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 11 Aug 2020 14:59:27 +0200 Subject: [PATCH 10/17] Work on competitve test a bit Signed-off-by: Jakub Sztandera --- chain/messagepool/selection_test.go | 44 ++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 7f58ab074..90ac0e734 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -2,6 +2,7 @@ package messagepool import ( "context" + "math" "math/big" "math/rand" "testing" @@ -859,7 +860,7 @@ func TestOptimalMessageSelection3(t *testing.T) { } } -func testCompetitiveMessageSelection(t *testing.T) { +func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand) { // in this test we use 100 actors and send 10 blocks of messages. // actors send with an exponentially decreasing premium. // a number of miners select with varying ticket quality and we compare the @@ -867,7 +868,7 @@ func testCompetitiveMessageSelection(t *testing.T) { mp, tma := makeTestMpool() - nActors := 100 + nActors := 300 // the actors var actors []address.Address var wallets []*wallet.Wallet @@ -899,11 +900,12 @@ func testCompetitiveMessageSelection(t *testing.T) { } nMessages := 10 * int(build.BlockGasLimit/gasLimit) + t.Log("nMessages", nMessages) nonces := make([]uint64, nActors) for i := 0; i < nMessages; i++ { - from := rand.Intn(nActors) - to := rand.Intn(nActors) - premium := 1 + rand.Intn(1000) + from := rng.Intn(nActors) + to := rng.Intn(nActors) + premium := 20000*math.Exp(-3.*rand.Float64()) + 5000 nonce := nonces[from] nonces[from]++ m := makeTestMessage(wallets[from], actors[from], actors[to], uint64(nonce), gasLimit, uint64(premium)) @@ -917,10 +919,20 @@ func testCompetitiveMessageSelection(t *testing.T) { } // 2. optimal selection - nMiners := 10 + minersRand := rng.Float64() + winerProba := noWinnersProb() + i := 0 + for ; i < MaxBlocks && minersRand > 0; i++ { + minersRand -= winerProba[i] + } + nMiners := i + if nMiners == 0 { + nMiners = 1 + } + optMsgs := make(map[cid.Cid]*types.SignedMessage) for i := 0; i < nMiners; i++ { - tq := rand.Float64() + tq := rng.Float64() msgs, err := mp.SelectMessages(ts, tq) if err != nil { t.Fatal(err) @@ -930,8 +942,9 @@ func testCompetitiveMessageSelection(t *testing.T) { } } - t.Log("greedy capacity", len(greedyMsgs)) - t.Log("optimal capacity", len(optMsgs)) + t.Logf("nMiners: %d", nMiners) + t.Logf("greedy capacity %d, optimal capacity %d (x%.1f)", len(greedyMsgs), + len(optMsgs), float64(len(optMsgs))/float64(len(greedyMsgs))) if len(greedyMsgs) > len(optMsgs) { t.Fatal("greedy capacity higher than optimal capacity; wtf") } @@ -946,10 +959,14 @@ func testCompetitiveMessageSelection(t *testing.T) { optReward.Add(optReward, mp.getGasReward(m, baseFee, ts)) } - t.Log("greedy reward", greedyReward) - t.Log("optimal reward", optReward) + nMinersBig := big.NewInt(int64(nMiners)) + greedyAvgReward, _ := new(big.Rat).SetFrac(greedyReward, nMinersBig).Float64() + optimalAvgReward, _ := new(big.Rat).SetFrac(optReward, nMinersBig).Float64() + t.Logf("greedy reward: %f, optimal reward: %f (x%.1f)", greedyAvgReward, + optimalAvgReward, optimalAvgReward/greedyAvgReward) + if greedyReward.Cmp(optReward) > 0 { - t.Fatal("greedy reward higher than optimal reward; booh") + t.Fatal("greedy reward raw higher than optimal reward; booh") } } @@ -957,7 +974,6 @@ func TestCompetitiveMessageSelection(t *testing.T) { seeds := []int64{1947, 1976, 2020, 2100, 10000} for _, seed := range seeds { t.Log("running competitve message selection with seed", seed) - rand.Seed(seed) - testCompetitiveMessageSelection(t) + testCompetitiveMessageSelection(t, rand.New(rand.NewSource(seed))) } } From 56e3f7da7d7ea64bdcbeb8d66393bdd0fb8b6814 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 11 Aug 2020 17:18:46 +0200 Subject: [PATCH 11/17] Fix getGasReward and Trim Signed-off-by: Jakub Sztandera --- chain/messagepool/selection.go | 17 +++++-- chain/messagepool/selection_test.go | 75 +++++++++++++++-------------- 2 files changed, 52 insertions(+), 40 deletions(-) diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index 0627a4638..fdcb71c54 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -111,13 +111,14 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64 gasLimit := int64(build.BlockGasLimit) for nextChain < len(chains) { chain := chains[nextChain] - partitions[i] = append(partitions[i], chain) nextChain++ + partitions[i] = append(partitions[i], chain) gasLimit -= chain.gasLimit if gasLimit < minGas { break } } + } // 4. Compute effective performance for each chain, based on the partition they fall into @@ -606,11 +607,11 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address. } func (mp *MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *big.Int { - gasReward := abig.Mul(msg.Message.GasPremium, types.NewInt(uint64(msg.Message.GasLimit))) - maxReward := types.BigSub(msg.Message.GasFeeCap, baseFee) - if types.BigCmp(maxReward, gasReward) < 0 { - gasReward = maxReward + maxPremium := types.BigSub(msg.Message.GasFeeCap, baseFee) + if types.BigCmp(maxPremium, msg.Message.GasPremium) < 0 { + maxPremium = msg.Message.GasPremium } + gasReward := abig.Mul(maxPremium, types.NewInt(uint64(msg.Message.GasLimit))) return gasReward.Int } @@ -795,7 +796,13 @@ func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt, mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward) mc.gasLimit -= mc.msgs[i].Message.GasLimit if mc.gasLimit > 0 { + bp := 1.0 + if mc.effPerf != 0 { + bp = mc.effPerf / mc.gasPerf + } + mc.gasPerf = mp.getGasPerf(mc.gasReward, mc.gasLimit) + mc.effPerf = bp * mc.gasPerf } else { mc.gasPerf = 0 } diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 90ac0e734..5e93476ab 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -20,6 +20,7 @@ import ( _ "github.com/filecoin-project/lotus/lib/sigs/bls" _ "github.com/filecoin-project/lotus/lib/sigs/secp" + logging "github.com/ipfs/go-log" ) func makeTestMessage(w *wallet.Wallet, from, to address.Address, nonce uint64, gasLimit int64, gasPrice uint64) *types.SignedMessage { @@ -930,44 +931,48 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand) { nMiners = 1 } - optMsgs := make(map[cid.Cid]*types.SignedMessage) - for i := 0; i < nMiners; i++ { - tq := rng.Float64() - msgs, err := mp.SelectMessages(ts, tq) - if err != nil { - t.Fatal(err) + logging.SetLogLevel("messagepool", "error") + for i := 0; i < 1; i++ { + optMsgs := make(map[cid.Cid]*types.SignedMessage) + for j := 0; j < nMiners; j++ { + tq := rng.Float64() + msgs, err := mp.SelectMessages(ts, tq) + if err != nil { + t.Fatal(err) + } + for _, m := range msgs { + optMsgs[m.Cid()] = m + } } - for _, m := range msgs { - optMsgs[m.Cid()] = m + + t.Logf("nMiners: %d", nMiners) + t.Logf("greedy capacity %d, optimal capacity %d (x%.1f)", len(greedyMsgs), + len(optMsgs), float64(len(optMsgs))/float64(len(greedyMsgs))) + if len(greedyMsgs) > len(optMsgs) { + t.Fatal("greedy capacity higher than optimal capacity; wtf") + } + + greedyReward := big.NewInt(0) + for _, m := range greedyMsgs { + greedyReward.Add(greedyReward, mp.getGasReward(m, baseFee, ts)) + } + + optReward := big.NewInt(0) + for _, m := range optMsgs { + optReward.Add(optReward, mp.getGasReward(m, baseFee, ts)) + } + + nMinersBig := big.NewInt(int64(nMiners)) + greedyAvgReward, _ := new(big.Rat).SetFrac(greedyReward, nMinersBig).Float64() + optimalAvgReward, _ := new(big.Rat).SetFrac(optReward, nMinersBig).Float64() + t.Logf("greedy reward: %.0f, optimal reward: %.0f (x%.1f)", greedyAvgReward, + optimalAvgReward, optimalAvgReward/greedyAvgReward) + + if greedyReward.Cmp(optReward) > 0 { + t.Fatal("greedy reward raw higher than optimal reward; booh") } } - - t.Logf("nMiners: %d", nMiners) - t.Logf("greedy capacity %d, optimal capacity %d (x%.1f)", len(greedyMsgs), - len(optMsgs), float64(len(optMsgs))/float64(len(greedyMsgs))) - if len(greedyMsgs) > len(optMsgs) { - t.Fatal("greedy capacity higher than optimal capacity; wtf") - } - - greedyReward := big.NewInt(0) - for _, m := range greedyMsgs { - greedyReward.Add(greedyReward, mp.getGasReward(m, baseFee, ts)) - } - - optReward := big.NewInt(0) - for _, m := range optMsgs { - optReward.Add(optReward, mp.getGasReward(m, baseFee, ts)) - } - - nMinersBig := big.NewInt(int64(nMiners)) - greedyAvgReward, _ := new(big.Rat).SetFrac(greedyReward, nMinersBig).Float64() - optimalAvgReward, _ := new(big.Rat).SetFrac(optReward, nMinersBig).Float64() - t.Logf("greedy reward: %f, optimal reward: %f (x%.1f)", greedyAvgReward, - optimalAvgReward, optimalAvgReward/greedyAvgReward) - - if greedyReward.Cmp(optReward) > 0 { - t.Fatal("greedy reward raw higher than optimal reward; booh") - } + logging.SetLogLevel("messagepool", "info") } func TestCompetitiveMessageSelection(t *testing.T) { From d3baf2b9ebbff68f7e6ff6317ec5c30ee863e9c9 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Aug 2020 19:32:28 +0300 Subject: [PATCH 12/17] fix edge case in chain creation --- chain/messagepool/selection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index fdcb71c54..64d845e02 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -693,7 +693,7 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 } // check we have a sane set of messages to construct the chains - if i > 0 { + if i > skip { msgs = msgs[skip:i] } else { return nil From 2aaf14b5588aa9fc1917c32f1edde6cbe1dd6fbb Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Aug 2020 20:00:51 +0300 Subject: [PATCH 13/17] mute timing logs if they are under 1ms --- chain/messagepool/selection.go | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index 64d845e02..4d84df2fe 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -90,7 +90,9 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64 next := mp.createMessageChains(actor, mset, baseFee, ts) chains = append(chains, next...) } - log.Infow("create message chains done", "took", time.Since(startChains)) + if dt := time.Since(startChains); dt > time.Millisecond { + log.Infow("create message chains done", "took", dt) + } // 2. Sort the chains sort.Slice(chains, func(i, j int) bool { @@ -187,7 +189,9 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64 last = i break } - log.Infow("merge message chains done", "took", time.Since(startMerge)) + if dt := time.Since(startMerge); dt > time.Millisecond { + log.Infow("merge message chains done", "took", dt) + } // 7. We have reached the edge of what can fit wholesale; if we still hae available // gasLimit to pack some more chains, then trim the last chain and push it down. @@ -276,7 +280,9 @@ tailLoop: // gas to spare; end the loop. break } - log.Infow("pack tail chains done", "took", time.Since(startTail)) + if dt := time.Since(startTail); dt > time.Millisecond { + log.Infow("pack tail chains done", "took", dt) + } return result, nil } @@ -321,7 +327,9 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S next := mp.createMessageChains(actor, mset, baseFee, ts) chains = append(chains, next...) } - log.Infow("create message chains done", "took", time.Since(startChains)) + if dt := time.Since(startChains); dt > time.Millisecond { + log.Infow("create message chains done", "took", dt) + } // 2. Sort the chains sort.Slice(chains, func(i, j int) bool { @@ -354,7 +362,9 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S last = i break } - log.Infow("merge message chains done", "took", time.Since(startMerge)) + if dt := time.Since(startMerge); dt > time.Millisecond { + log.Infow("merge message chains done", "took", dt) + } // 4. We have reached the edge of what we can fit wholesale; if we still have available gasLimit // to pack some more chains, then trim the last chain and push it down. @@ -406,7 +416,9 @@ tailLoop: // gas to spare; end the loop break } - log.Infow("pack tail chains done", "took", time.Since(startTail)) + if dt := time.Since(startTail); dt > time.Millisecond { + log.Infow("pack tail chains done", "took", dt) + } return result, nil } @@ -414,7 +426,9 @@ tailLoop: func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) { start := time.Now() defer func() { - log.Infow("select priority messages done", "took", time.Since(start)) + if dt := time.Since(start); dt > time.Millisecond { + log.Infow("select priority messages done", "took", dt) + } }() result := make([]*types.SignedMessage, 0, mp.cfg.SizeLimitLow) @@ -502,8 +516,8 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address. result := make(map[address.Address]map[uint64]*types.SignedMessage) haveCids := make(map[cid.Cid]struct{}) defer func() { - if time.Since(start) > time.Millisecond { - log.Infow("get pending messages done", "took", time.Since(start)) + if dt := time.Since(start); dt > time.Millisecond { + log.Infow("get pending messages done", "took", dt) } }() From a467deede6e18411b2189fffb2cb83983a953e74 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Aug 2020 20:20:59 +0300 Subject: [PATCH 14/17] fix typos in comment --- chain/messagepool/selection.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index 4d84df2fe..bed084aad 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -58,8 +58,8 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64 return nil, xerrors.Errorf("computing basefee: %w", err) } - // 0. Load messages from the targeti tipset; if it is the same as the current tipset in - // the mpoll, then this is just the pending messages + // 0. Load messages from the target tipset; if it is the same as the current tipset in + // the mpool, then this is just the pending messages pending, err := mp.getPendingMessages(curTs, ts) if err != nil { return nil, err From a45febc06566ea460ad48c620ef2d32849500a12 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 11 Aug 2020 19:32:31 +0200 Subject: [PATCH 15/17] Fix MpoolLocker Signed-off-by: Jakub Sztandera --- node/builder.go | 1 + node/impl/full/mpool.go | 31 ++++++------------------------- node/modules/dtypes/mpool.go | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 25 deletions(-) create mode 100644 node/modules/dtypes/mpool.go diff --git a/node/builder.go b/node/builder.go index 671170e53..aa0a18626 100644 --- a/node/builder.go +++ b/node/builder.go @@ -255,6 +255,7 @@ func Online() Option { Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr), Override(new(dtypes.Graphsync), modules.Graphsync), + Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)), Override(RunHelloKey, modules.RunHello), Override(RunBlockSyncKey, modules.RunBlockSync), diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 20ca271c0..b1d9a58cb 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -2,7 +2,6 @@ package full import ( "context" - "sync" "github.com/ipfs/go-cid" "go.uber.org/fx" @@ -13,6 +12,7 @@ import ( "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/modules/dtypes" ) type MpoolAPI struct { @@ -25,10 +25,7 @@ type MpoolAPI struct { Mpool *messagepool.MessagePool - PushLocks struct { - m map[address.Address]chan struct{} - sync.Mutex - } `name:"verymuchunique" optional:"true"` + PushLocks *dtypes.MpoolLocker } func (a *MpoolAPI) MpoolGetConfig(context.Context) (*types.MpoolConfig, error) { @@ -118,27 +115,11 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t if err != nil { return nil, xerrors.Errorf("getting key address: %w", err) } - - a.PushLocks.Lock() - if a.PushLocks.m == nil { - a.PushLocks.m = make(map[address.Address]chan struct{}) + done, err := a.PushLocks.TakeLock(ctx, fromA) + if err != nil { + return nil, xerrors.Errorf("taking lock: %w", err) } - lk, ok := a.PushLocks.m[fromA] - if !ok { - lk = make(chan struct{}, 1) - a.PushLocks.m[msg.From] = lk - } - a.PushLocks.Unlock() - - select { - case lk <- struct{}{}: - case <-ctx.Done(): - return nil, ctx.Err() - } - - defer func() { - <-lk - }() + defer done() } if msg.Nonce != 0 { diff --git a/node/modules/dtypes/mpool.go b/node/modules/dtypes/mpool.go new file mode 100644 index 000000000..1c64449f8 --- /dev/null +++ b/node/modules/dtypes/mpool.go @@ -0,0 +1,35 @@ +package dtypes + +import ( + "context" + "sync" + + "github.com/filecoin-project/go-address" +) + +type MpoolLocker struct { + m map[address.Address]chan struct{} + lk sync.Mutex +} + +func (ml *MpoolLocker) TakeLock(ctx context.Context, a address.Address) (func(), error) { + ml.lk.Lock() + if ml.m == nil { + ml.m = make(map[address.Address]chan struct{}) + } + lk, ok := ml.m[a] + if !ok { + lk = make(chan struct{}, 1) + ml.m[a] = lk + } + ml.lk.Unlock() + + select { + case lk <- struct{}{}: + case <-ctx.Done(): + return nil, ctx.Err() + } + return func() { + <-lk + }, nil +} From 9032163c5a080a15167cbe3a0d5ae632173ba8d1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 11 Aug 2020 20:47:50 +0300 Subject: [PATCH 16/17] turn probability eyeballing into an actual test --- chain/messagepool/block_proba_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/chain/messagepool/block_proba_test.go b/chain/messagepool/block_proba_test.go index 2cc2ecc2a..b39ea587e 100644 --- a/chain/messagepool/block_proba_test.go +++ b/chain/messagepool/block_proba_test.go @@ -4,5 +4,12 @@ import "testing" func TestBlockProbability(t *testing.T) { mp := &MessagePool{} - t.Logf("%+v\n", mp.blockProbabilities(1-0.15)) + bp := mp.blockProbabilities(1 - 0.15) + t.Logf("%+v\n", bp) + for i := 0; i < len(bp)-1; i++ { + if bp[i] < bp[i+1] { + t.Fatalf("expected decreasing block probabilities for this quality: %d %f %f", + i, bp[i], bp[i+1]) + } + } } From f018e870dc4348ba7940e11faa5916930adbb38e Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 11 Aug 2020 23:35:06 +0200 Subject: [PATCH 17/17] Address review Signed-off-by: Jakub Sztandera --- chain/types/blockheader.go | 10 ++++++++++ miner/miner.go | 10 +--------- node/impl/full/mpool.go | 1 - 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/chain/types/blockheader.go b/chain/types/blockheader.go index 8950fd91a..36b43c012 100644 --- a/chain/types/blockheader.go +++ b/chain/types/blockheader.go @@ -22,6 +22,16 @@ type Ticket struct { VRFProof []byte } +func (t *Ticket) Quality() float64 { + ticketHash := blake2b.Sum256(t.VRFProof) + ticketNum := BigFromBytes(ticketHash[:]).Int + ticketDenu := big.NewInt(1) + ticketDenu.Lsh(ticketDenu, 256) + tv, _ := new(big.Rat).SetFrac(ticketNum, ticketDenu).Float64() + tq := 1 - tv + return tq +} + type BeaconEntry struct { Round uint64 Data []byte diff --git a/miner/miner.go b/miner/miner.go index 077e18638..27d3c040d 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -6,12 +6,10 @@ import ( "crypto/rand" "encoding/binary" "fmt" - "math/big" "sync" "time" "github.com/filecoin-project/lotus/chain/gen/slashfilter" - "github.com/minio/blake2b-simd" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" @@ -392,14 +390,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, return nil, xerrors.Errorf("failed to compute winning post proof: %w", err) } - ticketHash := blake2b.Sum256(ticket.VRFProof) - ticketNum := types.BigFromBytes(ticketHash[:]).Int - ticketDenu := big.NewInt(1) - ticketDenu.Lsh(ticketDenu, 256) - tq, _ := new(big.Rat).SetFrac(ticketNum, ticketDenu).Float64() - tq = 1 - tq // get pending messages early, - msgs, err := m.api.MpoolSelect(context.TODO(), base.TipSet.Key(), tq) + msgs, err := m.api.MpoolSelect(context.TODO(), base.TipSet.Key(), ticket.Quality()) if err != nil { return nil, xerrors.Errorf("failed to select messages for block: %w", err) } diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index b1d9a58cb..cd6adef6d 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -43,7 +43,6 @@ func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey, ticketQ return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) } - // TODO FIXME compute (or pass in) the actual ticket quality! return a.Mpool.SelectMessages(ts, ticketQuality) }