From a8b0db91e82430e1402e990afcb97b0b50a45ccb Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 5 Aug 2020 13:28:14 +0300 Subject: [PATCH] implement pruning using the chain selection logic --- chain/messagepool/pruning.go | 235 +++++---------------------------- chain/messagepool/selection.go | 10 +- 2 files changed, 36 insertions(+), 209 deletions(-) diff --git a/chain/messagepool/pruning.go b/chain/messagepool/pruning.go index 2fabf353a..343616111 100644 --- a/chain/messagepool/pruning.go +++ b/chain/messagepool/pruning.go @@ -1,19 +1,11 @@ package messagepool import ( - "bytes" "context" - big2 "math/big" "sort" "time" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/messagepool/gasguess" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/chain/vm" - "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/ipfs/go-cid" ) @@ -38,211 +30,46 @@ func (mp *MessagePool) pruneExcessMessages() error { return mp.pruneMessages(context.TODO(), ts) } -// just copied from miner/ SelectMessages func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) error { - al := func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) { - return mp.api.StateGetActor(addr, ts) + start := time.Now() + defer func() { + log.Infof("message pruning took %s", time.Since(start)) + }() + + // Collect all messages to track which ones to remove and create chains for block inclusion + pruneMsgs := make(map[cid.Cid]*types.SignedMessage, mp.currentSize) + var chains []*msgChain + for actor, mset := range mp.pending { + for _, m := range mset.msgs { + pruneMsgs[m.Message.Cid()] = m + } + actorChains := mp.createMessageChains(actor, mset, ts) + chains = append(chains, actorChains...) } - msgs := make([]*types.SignedMessage, 0, mp.currentSize) - for a := range mp.pending { - msgs = append(msgs, mp.pendingFor(a)...) - } - - type senderMeta struct { - lastReward abi.TokenAmount - lastGasLimit int64 - - gasReward []abi.TokenAmount - gasLimit []int64 - - msgs []*types.SignedMessage - } - - inclNonces := make(map[address.Address]uint64) - inclBalances := make(map[address.Address]big.Int) - outBySender := make(map[address.Address]*senderMeta) - - tooLowFundMsgs := 0 - tooHighNonceMsgs := 0 - - start := build.Clock.Now() - vmValid := time.Duration(0) - getbal := time.Duration(0) - guessGasDur := time.Duration(0) - - sort.Slice(msgs, func(i, j int) bool { - return msgs[i].Message.Nonce < msgs[j].Message.Nonce + // Sort the chains + sort.Slice(chains, func(i, j int) bool { + return chains[i].Before(chains[j]) }) - for _, msg := range msgs { - vmstart := build.Clock.Now() - - minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that - if err := msg.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil { - log.Warnf("invalid message in message pool: %s", err) - continue - } - - vmValid += build.Clock.Since(vmstart) - - // TODO: this should be in some more general 'validate message' call - if msg.Message.GasLimit > build.BlockGasLimit { - log.Warnf("message in mempool had too high of a gas limit (%d)", msg.Message.GasLimit) - continue - } - - if msg.Message.To == address.Undef { - log.Warnf("message in mempool had bad 'To' address") - continue - } - - from := msg.Message.From - - getBalStart := build.Clock.Now() - if _, ok := inclNonces[from]; !ok { - act, err := mp.api.StateGetActor(from, nil) - if err != nil { - log.Warnf("failed to check message sender balance, skipping message: %+v", err) - continue - } - - inclNonces[from] = act.Nonce - inclBalances[from] = act.Balance - } - getbal += build.Clock.Since(getBalStart) - - if inclBalances[from].LessThan(msg.Message.RequiredFunds()) { - tooLowFundMsgs++ - // todo: drop from mpool - continue - } - - if msg.Message.Nonce > inclNonces[from] { - tooHighNonceMsgs++ - continue - } - - if msg.Message.Nonce < inclNonces[from] { - continue - } - - inclNonces[from] = msg.Message.Nonce + 1 - inclBalances[from] = types.BigSub(inclBalances[from], msg.Message.RequiredFunds()) - sm := outBySender[from] - if sm == nil { - sm = &senderMeta{ - lastReward: big.Zero(), - } - } - - sm.gasLimit = append(sm.gasLimit, sm.lastGasLimit+msg.Message.GasLimit) - sm.lastGasLimit = sm.gasLimit[len(sm.gasLimit)-1] - - guessGasStart := build.Clock.Now() - guessedGas, err := gasguess.GuessGasUsed(ctx, types.EmptyTSK, msg, al) - guessGasDur += build.Clock.Since(guessGasStart) - if err != nil { - log.Infow("failed to guess gas", "to", msg.Message.To, "method", msg.Message.Method, "err", err) - } - - estimatedReward := big.Mul(types.NewInt(uint64(guessedGas)), msg.Message.GasPrice) - - sm.gasReward = append(sm.gasReward, big.Add(sm.lastReward, estimatedReward)) - sm.lastReward = sm.gasReward[len(sm.gasReward)-1] - - sm.msgs = append(sm.msgs, msg) - - outBySender[from] = sm - } - - orderedSenders := make([]address.Address, 0, len(outBySender)) - for k := range outBySender { - orderedSenders = append(orderedSenders, k) - } - sort.Slice(orderedSenders, func(i, j int) bool { - return bytes.Compare(orderedSenders[i].Bytes(), orderedSenders[j].Bytes()) == -1 - }) - - out := make([]*types.SignedMessage, 0, mp.maxTxPoolSizeLo) - { - for { - var bestSender address.Address - var nBest int - var bestGasToReward float64 - - // TODO: This is O(n^2)-ish, could use something like container/heap to cache this math - for _, sender := range orderedSenders { - meta, ok := outBySender[sender] - if !ok { - continue - } - for n := range meta.msgs { - - if n+len(out) >= mp.maxTxPoolSizeLo { - break - } - - gasToReward, _ := new(big2.Float).SetInt(meta.gasReward[n].Int).Float64() - gasToReward /= float64(meta.gasLimit[n]) - - if gasToReward >= bestGasToReward { - bestSender = sender - nBest = n + 1 - bestGasToReward = gasToReward - } - } - } - - if nBest == 0 { - break // block gas limit reached - } - - { - out = append(out, outBySender[bestSender].msgs[:nBest]...) - - outBySender[bestSender].msgs = outBySender[bestSender].msgs[nBest:] - outBySender[bestSender].gasLimit = outBySender[bestSender].gasLimit[nBest:] - outBySender[bestSender].gasReward = outBySender[bestSender].gasReward[nBest:] - - if len(outBySender[bestSender].msgs) == 0 { - delete(outBySender, bestSender) - } - } - - if len(out) >= mp.maxTxPoolSizeLo { - break + // Keep messages (remove them from pruneMsgs) from chains while we are under the low water mark + keepCount := 0 +keepLoop: + for _, chain := range chains { + for _, m := range chain.msgs { + if keepCount < MemPoolSizeLimitLoDefault { + delete(pruneMsgs, m.Message.Cid()) + keepCount++ + } else { + break keepLoop } } } - if tooLowFundMsgs > 0 { - log.Warnf("%d messages in mempool does not have enough funds", tooLowFundMsgs) - } - - if tooHighNonceMsgs > 0 { - log.Warnf("%d messages in mempool had too high nonce", tooHighNonceMsgs) - } - - sm := build.Clock.Now() - if sm.Sub(start) > time.Second { - log.Warnw("SelectMessages took a long time", - "duration", sm.Sub(start), - "vmvalidate", vmValid, - "getbalance", getbal, - "guessgas", guessGasDur, - "msgs", len(msgs)) - } - - good := make(map[cid.Cid]bool) - for _, m := range out { - good[m.Cid()] = true - } - - for _, m := range msgs { - if !good[m.Cid()] { - mp.remove(m.Message.From, m.Message.Nonce) - } + // and remove all messages that are still in pruneMsgs after processing the chains + log.Infof("Pruning %d messages", len(pruneMsgs)) + for _, m := range pruneMsgs { + mp.remove(m.Message.From, m.Message.Nonce) } return nil diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index 8b9d68f13..e363c6ccd 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -27,11 +27,6 @@ type msgChain struct { } func (mp *MessagePool) SelectMessages() []*types.SignedMessage { - start := time.Now() - defer func() { - log.Infof("message selection took %s", time.Since(start)) - }() - mp.curTsLk.Lock() ts := mp.curTs mp.curTsLk.Unlock() @@ -43,6 +38,11 @@ func (mp *MessagePool) SelectMessages() []*types.SignedMessage { } func (mp *MessagePool) selectMessages(ts *types.TipSet) []*types.SignedMessage { + start := time.Now() + defer func() { + log.Infof("message selection took %s", time.Since(start)) + }() + // 1. Create a list of dependent message chains with maximal gas reward per limit consumed var chains []*msgChain for actor, mset := range mp.pending {