2020-07-16 22:28:35 +00:00
package messagepool
import (
2020-08-01 22:54:21 +00:00
"bytes"
"context"
big2 "math/big"
2020-08-01 21:56:43 +00:00
"sort"
2020-07-16 22:28:35 +00:00
"time"
2020-08-01 22:54:21 +00:00
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
2020-08-01 21:56:43 +00:00
"github.com/filecoin-project/lotus/chain/types"
2020-08-01 22:54:21 +00:00
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/ipfs/go-cid"
2020-07-16 22:28:35 +00:00
)
func ( mp * MessagePool ) pruneExcessMessages ( ) error {
2020-08-01 23:25:13 +00:00
2020-07-16 22:28:35 +00:00
start := time . Now ( )
defer func ( ) {
log . Infow ( "message pruning complete" , "took" , time . Since ( start ) )
} ( )
2020-08-01 23:25:13 +00:00
mp . curTsLk . Lock ( )
ts := mp . curTs
mp . curTsLk . Unlock ( )
2020-07-16 22:28:35 +00:00
mp . lk . Lock ( )
defer mp . lk . Unlock ( )
if mp . currentSize < mp . maxTxPoolSizeHi {
return nil
}
2020-08-01 23:25:13 +00:00
return mp . pruneMessages ( context . TODO ( ) , ts )
2020-08-01 21:56:43 +00:00
}
2020-08-01 22:54:21 +00:00
// just copied from miner/ SelectMessages
2020-08-01 23:25:13 +00:00
func ( mp * MessagePool ) pruneMessages ( ctx context . Context , ts * types . TipSet ) error {
2020-08-01 22:54:21 +00:00
al := func ( ctx context . Context , addr address . Address , tsk types . TipSetKey ) ( * types . Actor , error ) {
2020-08-01 23:25:13 +00:00
return mp . api . StateGetActor ( addr , ts )
2020-08-01 22:54:21 +00:00
}
msgs := make ( [ ] * types . SignedMessage , 0 , mp . currentSize )
for a := range mp . pending {
msgs = append ( msgs , mp . pendingFor ( a ) ... )
}
type senderMeta struct {
lastReward abi . TokenAmount
lastGasLimit int64
gasReward [ ] abi . TokenAmount
gasLimit [ ] int64
msgs [ ] * types . SignedMessage
}
inclNonces := make ( map [ address . Address ] uint64 )
inclBalances := make ( map [ address . Address ] big . Int )
outBySender := make ( map [ address . Address ] * senderMeta )
tooLowFundMsgs := 0
tooHighNonceMsgs := 0
start := build . Clock . Now ( )
vmValid := time . Duration ( 0 )
getbal := time . Duration ( 0 )
guessGasDur := time . Duration ( 0 )
sort . Slice ( msgs , func ( i , j int ) bool {
return msgs [ i ] . Message . Nonce < msgs [ j ] . Message . Nonce
} )
for _ , msg := range msgs {
vmstart := build . Clock . Now ( )
2020-08-01 23:25:13 +00:00
minGas := vm . PricelistByEpoch ( ts . Height ( ) ) . OnChainMessage ( msg . ChainLength ( ) ) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that
2020-08-01 22:54:21 +00:00
if err := msg . VMMessage ( ) . ValidForBlockInclusion ( minGas . Total ( ) ) ; err != nil {
log . Warnf ( "invalid message in message pool: %s" , err )
continue
}
vmValid += build . Clock . Since ( vmstart )
// TODO: this should be in some more general 'validate message' call
if msg . Message . GasLimit > build . BlockGasLimit {
log . Warnf ( "message in mempool had too high of a gas limit (%d)" , msg . Message . GasLimit )
continue
}
if msg . Message . To == address . Undef {
log . Warnf ( "message in mempool had bad 'To' address" )
continue
}
from := msg . Message . From
getBalStart := build . Clock . Now ( )
if _ , ok := inclNonces [ from ] ; ! ok {
act , err := mp . api . StateGetActor ( from , nil )
if err != nil {
log . Warnf ( "failed to check message sender balance, skipping message: %+v" , err )
continue
}
inclNonces [ from ] = act . Nonce
inclBalances [ from ] = act . Balance
}
getbal += build . Clock . Since ( getBalStart )
if inclBalances [ from ] . LessThan ( msg . Message . RequiredFunds ( ) ) {
tooLowFundMsgs ++
// todo: drop from mpool
continue
}
if msg . Message . Nonce > inclNonces [ from ] {
tooHighNonceMsgs ++
continue
}
if msg . Message . Nonce < inclNonces [ from ] {
continue
}
inclNonces [ from ] = msg . Message . Nonce + 1
inclBalances [ from ] = types . BigSub ( inclBalances [ from ] , msg . Message . RequiredFunds ( ) )
sm := outBySender [ from ]
if sm == nil {
sm = & senderMeta {
lastReward : big . Zero ( ) ,
}
}
sm . gasLimit = append ( sm . gasLimit , sm . lastGasLimit + msg . Message . GasLimit )
sm . lastGasLimit = sm . gasLimit [ len ( sm . gasLimit ) - 1 ]
guessGasStart := build . Clock . Now ( )
guessedGas , err := gasguess . GuessGasUsed ( ctx , types . EmptyTSK , msg , al )
guessGasDur += build . Clock . Since ( guessGasStart )
if err != nil {
log . Infow ( "failed to guess gas" , "to" , msg . Message . To , "method" , msg . Message . Method , "err" , err )
}
estimatedReward := big . Mul ( types . NewInt ( uint64 ( guessedGas ) ) , msg . Message . GasPrice )
sm . gasReward = append ( sm . gasReward , big . Add ( sm . lastReward , estimatedReward ) )
sm . lastReward = sm . gasReward [ len ( sm . gasReward ) - 1 ]
sm . msgs = append ( sm . msgs , msg )
outBySender [ from ] = sm
}
orderedSenders := make ( [ ] address . Address , 0 , len ( outBySender ) )
for k := range outBySender {
orderedSenders = append ( orderedSenders , k )
}
sort . Slice ( orderedSenders , func ( i , j int ) bool {
return bytes . Compare ( orderedSenders [ i ] . Bytes ( ) , orderedSenders [ j ] . Bytes ( ) ) == - 1
} )
2020-08-01 23:39:16 +00:00
out := make ( [ ] * types . SignedMessage , 0 , mp . maxTxPoolSizeLo )
2020-08-01 22:54:21 +00:00
{
for {
var bestSender address . Address
var nBest int
var bestGasToReward float64
// TODO: This is O(n^2)-ish, could use something like container/heap to cache this math
for _ , sender := range orderedSenders {
meta , ok := outBySender [ sender ]
if ! ok {
continue
}
for n := range meta . msgs {
2020-08-01 23:39:02 +00:00
if n + len ( out ) >= mp . maxTxPoolSizeLo {
2020-08-01 22:54:21 +00:00
break
}
gasToReward , _ := new ( big2 . Float ) . SetInt ( meta . gasReward [ n ] . Int ) . Float64 ( )
gasToReward /= float64 ( meta . gasLimit [ n ] )
if gasToReward >= bestGasToReward {
bestSender = sender
nBest = n + 1
bestGasToReward = gasToReward
}
}
}
if nBest == 0 {
break // block gas limit reached
}
{
out = append ( out , outBySender [ bestSender ] . msgs [ : nBest ] ... )
outBySender [ bestSender ] . msgs = outBySender [ bestSender ] . msgs [ nBest : ]
outBySender [ bestSender ] . gasLimit = outBySender [ bestSender ] . gasLimit [ nBest : ]
outBySender [ bestSender ] . gasReward = outBySender [ bestSender ] . gasReward [ nBest : ]
if len ( outBySender [ bestSender ] . msgs ) == 0 {
delete ( outBySender , bestSender )
}
}
2020-08-01 23:25:13 +00:00
if len ( out ) >= mp . maxTxPoolSizeLo {
2020-08-01 22:54:21 +00:00
break
}
}
}
if tooLowFundMsgs > 0 {
log . Warnf ( "%d messages in mempool does not have enough funds" , tooLowFundMsgs )
}
if tooHighNonceMsgs > 0 {
log . Warnf ( "%d messages in mempool had too high nonce" , tooHighNonceMsgs )
}
sm := build . Clock . Now ( )
if sm . Sub ( start ) > time . Second {
log . Warnw ( "SelectMessages took a long time" ,
"duration" , sm . Sub ( start ) ,
"vmvalidate" , vmValid ,
"getbalance" , getbal ,
"guessgas" , guessGasDur ,
"msgs" , len ( msgs ) )
}
good := make ( map [ cid . Cid ] bool )
for _ , m := range out {
good [ m . Cid ( ) ] = true
}
for _ , m := range msgs {
if ! good [ m . Cid ( ) ] {
mp . remove ( m . Message . From , m . Message . Nonce )
}
}
return nil
}