clean up after review

This commit is contained in:
whyrusleeping 2020-08-01 16:25:13 -07:00
parent 255777a4a9
commit 2e04dc908c
2 changed files with 17 additions and 74 deletions

View File

@ -249,6 +249,9 @@ func TestPruningSimple(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
a := mock.MkBlock(nil, 1, 1)
tma.applyBlock(t, a)
sender, err := w.GenerateKey(crypto.SigTypeBLS) sender, err := w.GenerateKey(crypto.SigTypeBLS)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -276,6 +279,6 @@ func TestPruningSimple(t *testing.T) {
msgs, _ := mp.Pending() msgs, _ := mp.Pending()
if len(msgs) != 5 { if len(msgs) != 5 {
t.Fatal("expected only 5 messages in pool") t.Fatal("expected only 5 messages in pool, got: ", len(msgs))
} }
} }

View File

@ -3,6 +3,7 @@ package messagepool
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
big2 "math/big" big2 "math/big"
"sort" "sort"
"time" "time"
@ -15,15 +16,19 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors"
) )
func (mp *MessagePool) pruneExcessMessages() error { func (mp *MessagePool) pruneExcessMessages() error {
start := time.Now() start := time.Now()
defer func() { defer func() {
log.Infow("message pruning complete", "took", time.Since(start)) log.Infow("message pruning complete", "took", time.Since(start))
}() }()
mp.curTsLk.Lock()
ts := mp.curTs
mp.curTsLk.Unlock()
mp.lk.Lock() mp.lk.Lock()
defer mp.lk.Unlock() defer mp.lk.Unlock()
@ -31,29 +36,13 @@ func (mp *MessagePool) pruneExcessMessages() error {
return nil return nil
} }
pruneCount := mp.currentSize - mp.maxTxPoolSizeLo return mp.pruneMessages(context.TODO(), ts)
// Step 1. Remove all 'future' messages (those with a nonce gap)
npruned, err := mp.pruneFutureMessages()
if err != nil {
return xerrors.Errorf("failed to prune future messages: %w", err)
}
pruneCount -= npruned
if pruneCount <= 0 {
return nil
}
// Step 2. prune messages with the lowest gas prices
log.Warnf("still need to prune %d messages", pruneCount)
return nil
} }
// just copied from miner/ SelectMessages // just copied from miner/ SelectMessages
func (mp *MessagePool) pruneMessages(ctx context.Context) error { func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) error {
al := func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) { al := func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) {
return mp.api.StateGetActor(addr, mp.curTs) return mp.api.StateGetActor(addr, ts)
} }
msgs := make([]*types.SignedMessage, 0, mp.currentSize) msgs := make([]*types.SignedMessage, 0, mp.currentSize)
@ -83,7 +72,6 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error {
getbal := time.Duration(0) getbal := time.Duration(0)
guessGasDur := time.Duration(0) guessGasDur := time.Duration(0)
mp.Pending()
sort.Slice(msgs, func(i, j int) bool { sort.Slice(msgs, func(i, j int) bool {
return msgs[i].Message.Nonce < msgs[j].Message.Nonce return msgs[i].Message.Nonce < msgs[j].Message.Nonce
}) })
@ -91,7 +79,7 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error {
for _, msg := range msgs { for _, msg := range msgs {
vmstart := build.Clock.Now() vmstart := build.Clock.Now()
minGas := vm.PricelistByEpoch(mp.curTs.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that
if err := msg.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil { if err := msg.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
log.Warnf("invalid message in message pool: %s", err) log.Warnf("invalid message in message pool: %s", err)
continue continue
@ -169,8 +157,6 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error {
outBySender[from] = sm outBySender[from] = sm
} }
gasLimitLeft := int64(build.BlockGasLimit)
orderedSenders := make([]address.Address, 0, len(outBySender)) orderedSenders := make([]address.Address, 0, len(outBySender))
for k := range outBySender { for k := range outBySender {
orderedSenders = append(orderedSenders, k) orderedSenders = append(orderedSenders, k)
@ -193,11 +179,8 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error {
continue continue
} }
for n := range meta.msgs { for n := range meta.msgs {
if meta.gasLimit[n] > gasLimitLeft {
break
}
if n+len(out) > build.BlockMessageLimit { if n+len(out) > mp.maxTxPoolSizeLo {
break break
} }
@ -218,7 +201,6 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error {
{ {
out = append(out, outBySender[bestSender].msgs[:nBest]...) out = append(out, outBySender[bestSender].msgs[:nBest]...)
gasLimitLeft -= outBySender[bestSender].gasLimit[nBest-1]
outBySender[bestSender].msgs = outBySender[bestSender].msgs[nBest:] outBySender[bestSender].msgs = outBySender[bestSender].msgs[nBest:]
outBySender[bestSender].gasLimit = outBySender[bestSender].gasLimit[nBest:] outBySender[bestSender].gasLimit = outBySender[bestSender].gasLimit[nBest:]
@ -229,7 +211,7 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error {
} }
} }
if len(out) >= build.BlockMessageLimit { if len(out) >= mp.maxTxPoolSizeLo {
break break
} }
} }
@ -253,10 +235,7 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error {
"msgs", len(msgs)) "msgs", len(msgs))
} }
if len(out) > mp.maxTxPoolSizeLo { fmt.Println("GOOD: ", len(out))
out = out[:mp.maxTxPoolSizeLo]
}
good := make(map[cid.Cid]bool) good := make(map[cid.Cid]bool)
for _, m := range out { for _, m := range out {
good[m.Cid()] = true good[m.Cid()] = true
@ -270,42 +249,3 @@ func (mp *MessagePool) pruneMessages(ctx context.Context) error {
return nil return nil
} }
func (mp *MessagePool) pruneFutureMessages() (int, error) {
var pruned int
for addr, ms := range mp.pending {
if _, ok := mp.localAddrs[addr]; ok {
continue
}
act, err := mp.api.StateGetActor(addr, nil)
if err != nil {
return 0, err
}
allmsgs := make([]*types.SignedMessage, 0, len(ms.msgs))
for _, m := range ms.msgs {
allmsgs = append(allmsgs, m)
}
sort.Slice(allmsgs, func(i, j int) bool {
return allmsgs[i].Message.Nonce < allmsgs[j].Message.Nonce
})
start := act.Nonce
for i := 0; i < len(allmsgs); i++ {
if allmsgs[i].Message.Nonce == start {
start++
} else {
ms.nextNonce = start
for ; i < len(allmsgs); i++ {
pruned++
delete(ms.msgs, allmsgs[i].Message.Nonce)
}
break
}
}
}
return pruned, nil
}