From 2e04dc908cf579898fe71e180d8924047483d1fa Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sat, 1 Aug 2020 16:25:13 -0700 Subject: [PATCH] clean up after review --- chain/messagepool/messagepool_test.go | 5 +- chain/messagepool/pruning.go | 86 ++++----------------------- 2 files changed, 17 insertions(+), 74 deletions(-) diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index 678b3122a..010157066 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -249,6 +249,9 @@ func TestPruningSimple(t *testing.T) { t.Fatal(err) } + a := mock.MkBlock(nil, 1, 1) + tma.applyBlock(t, a) + sender, err := w.GenerateKey(crypto.SigTypeBLS) if err != nil { t.Fatal(err) @@ -276,6 +279,6 @@ func TestPruningSimple(t *testing.T) { msgs, _ := mp.Pending() if len(msgs) != 5 { - t.Fatal("expected only 5 messages in pool") + t.Fatal("expected only 5 messages in pool, got: ", len(msgs)) } } diff --git a/chain/messagepool/pruning.go b/chain/messagepool/pruning.go index fc18915e8..8a91b8b6a 100644 --- a/chain/messagepool/pruning.go +++ b/chain/messagepool/pruning.go @@ -3,6 +3,7 @@ package messagepool import ( "bytes" "context" + "fmt" big2 "math/big" "sort" "time" @@ -15,15 +16,19 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/ipfs/go-cid" - "golang.org/x/xerrors" ) func (mp *MessagePool) pruneExcessMessages() error { + start := time.Now() defer func() { log.Infow("message pruning complete", "took", time.Since(start)) }() + mp.curTsLk.Lock() + ts := mp.curTs + mp.curTsLk.Unlock() + mp.lk.Lock() defer mp.lk.Unlock() @@ -31,29 +36,13 @@ func (mp *MessagePool) pruneExcessMessages() error { return nil } - pruneCount := mp.currentSize - mp.maxTxPoolSizeLo - - // Step 1. Remove all 'future' messages (those with a nonce gap) - npruned, err := mp.pruneFutureMessages() - if err != nil { - return xerrors.Errorf("failed to prune future messages: %w", err) - } - - pruneCount -= npruned - if pruneCount <= 0 { - return nil - } - - // Step 2. prune messages with the lowest gas prices - - log.Warnf("still need to prune %d messages", pruneCount) - return nil + return mp.pruneMessages(context.TODO(), ts) } // just copied from miner/ SelectMessages -func (mp *MessagePool) pruneMessages(ctx context.Context) error { +func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) error { al := func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) { - return mp.api.StateGetActor(addr, mp.curTs) + return mp.api.StateGetActor(addr, ts) } msgs := make([]*types.SignedMessage, 0, mp.currentSize) @@ -83,7 +72,6 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error { getbal := time.Duration(0) guessGasDur := time.Duration(0) - mp.Pending() sort.Slice(msgs, func(i, j int) bool { return msgs[i].Message.Nonce < msgs[j].Message.Nonce }) @@ -91,7 +79,7 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error { for _, msg := range msgs { vmstart := build.Clock.Now() - minGas := vm.PricelistByEpoch(mp.curTs.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that + minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that if err := msg.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil { log.Warnf("invalid message in message pool: %s", err) continue @@ -169,8 +157,6 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error { outBySender[from] = sm } - gasLimitLeft := int64(build.BlockGasLimit) - orderedSenders := make([]address.Address, 0, len(outBySender)) for k := range outBySender { orderedSenders = append(orderedSenders, k) @@ -193,11 +179,8 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error { continue } for n := range meta.msgs { - if meta.gasLimit[n] > gasLimitLeft { - break - } - if n+len(out) > build.BlockMessageLimit { + if n+len(out) > mp.maxTxPoolSizeLo { break } @@ -218,7 +201,6 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error { { out = append(out, outBySender[bestSender].msgs[:nBest]...) - gasLimitLeft -= outBySender[bestSender].gasLimit[nBest-1] outBySender[bestSender].msgs = outBySender[bestSender].msgs[nBest:] outBySender[bestSender].gasLimit = outBySender[bestSender].gasLimit[nBest:] @@ -229,7 +211,7 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error { } } - if len(out) >= build.BlockMessageLimit { + if len(out) >= mp.maxTxPoolSizeLo { break } } @@ -253,10 +235,7 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error { "msgs", len(msgs)) } - if len(out) > mp.maxTxPoolSizeLo { - out = out[:mp.maxTxPoolSizeLo] - } - + fmt.Println("GOOD: ", len(out)) good := make(map[cid.Cid]bool) for _, m := range out { good[m.Cid()] = true @@ -270,42 +249,3 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error { return nil } - -func (mp *MessagePool) pruneFutureMessages() (int, error) { - var pruned int - for addr, ms := range mp.pending { - if _, ok := mp.localAddrs[addr]; ok { - continue - } - - act, err := mp.api.StateGetActor(addr, nil) - if err != nil { - return 0, err - } - - allmsgs := make([]*types.SignedMessage, 0, len(ms.msgs)) - for _, m := range ms.msgs { - allmsgs = append(allmsgs, m) - } - - sort.Slice(allmsgs, func(i, j int) bool { - return allmsgs[i].Message.Nonce < allmsgs[j].Message.Nonce - }) - - start := act.Nonce - for i := 0; i < len(allmsgs); i++ { - if allmsgs[i].Message.Nonce == start { - start++ - } else { - ms.nextNonce = start - for ; i < len(allmsgs); i++ { - pruned++ - delete(ms.msgs, allmsgs[i].Message.Nonce) - } - break - } - } - - } - return pruned, nil -}