2019-12-01 23:11:43 +00:00
package messagepool
2019-07-05 14:46:21 +00:00
import (
2019-11-23 19:01:56 +00:00
"bytes"
2019-11-17 07:44:06 +00:00
"context"
"errors"
2020-05-15 17:56:38 +00:00
"fmt"
2020-03-09 22:46:00 +00:00
"math"
2020-08-26 11:38:26 +00:00
stdbig "math/big"
2019-11-13 22:41:39 +00:00
"sort"
2019-07-05 14:46:21 +00:00
"sync"
2019-11-13 21:53:18 +00:00
"time"
2019-07-08 12:51:45 +00:00
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/crypto"
2020-08-21 17:17:30 +00:00
"github.com/hashicorp/go-multierror"
2019-11-12 07:16:42 +00:00
lru "github.com/hashicorp/golang-lru"
2019-12-01 22:22:10 +00:00
"github.com/ipfs/go-cid"
2019-11-23 19:01:56 +00:00
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
2020-01-08 19:10:57 +00:00
logging "github.com/ipfs/go-log/v2"
2020-08-10 16:53:56 +00:00
pubsub "github.com/libp2p/go-libp2p-pubsub"
2019-11-17 07:44:06 +00:00
lps "github.com/whyrusleeping/pubsub"
2019-09-20 09:01:49 +00:00
"golang.org/x/xerrors"
2019-09-18 02:50:03 +00:00
2019-12-19 20:13:17 +00:00
"github.com/filecoin-project/go-address"
2020-07-10 14:43:14 +00:00
2019-11-17 07:44:06 +00:00
"github.com/filecoin-project/lotus/api"
2019-11-12 07:16:42 +00:00
"github.com/filecoin-project/lotus/build"
2020-08-25 00:21:03 +00:00
"github.com/filecoin-project/lotus/chain/store"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/chain/types"
2020-07-29 16:30:08 +00:00
"github.com/filecoin-project/lotus/chain/vm"
2020-01-30 23:48:25 +00:00
"github.com/filecoin-project/lotus/lib/sigs"
2019-11-23 19:01:56 +00:00
"github.com/filecoin-project/lotus/node/modules/dtypes"
2020-07-10 14:43:14 +00:00
"github.com/raulk/clock"
2019-07-05 14:46:21 +00:00
)
2019-12-01 23:11:43 +00:00
var log = logging . Logger ( "messagepool" )
2020-08-25 10:03:50 +00:00
var futureDebug = false
2020-03-09 22:46:00 +00:00
2020-08-12 18:24:35 +00:00
var rbfNumBig = types . NewInt ( uint64 ( ( ReplaceByFeeRatioDefault - 1 ) * RbfDenom ) )
var rbfDenomBig = types . NewInt ( RbfDenom )
2020-08-07 14:33:55 +00:00
const RbfDenom = 256
2020-04-01 20:26:14 +00:00
2020-09-06 10:36:25 +00:00
var RepublishInterval = time . Duration ( 10 * build . BlockDelaySecs + build . PropagationDelaySecs ) * time . Second
2020-08-10 08:18:12 +00:00
2020-08-26 18:37:32 +00:00
var minimumBaseFee = types . NewInt ( uint64 ( build . MinimumBaseFee ) )
2020-09-05 17:09:46 +00:00
var baseFeeLowerBoundFactor = types . NewInt ( 10 )
2020-08-26 18:23:28 +00:00
2020-08-26 19:40:10 +00:00
var MaxActorPendingMessages = 1000
2020-09-01 22:17:22 +00:00
var MaxNonceGap = uint64 ( 4 )
2020-09-01 14:57:44 +00:00
2019-10-13 13:03:15 +00:00
var (
2019-11-13 14:48:57 +00:00
ErrMessageTooBig = errors . New ( "message too big" )
2019-10-13 13:03:15 +00:00
2019-11-13 14:48:57 +00:00
ErrMessageValueTooHigh = errors . New ( "cannot send more filecoin than will ever exist" )
2019-10-13 13:03:15 +00:00
2019-11-13 14:48:57 +00:00
ErrNonceTooLow = errors . New ( "message nonce too low" )
2019-10-13 13:03:15 +00:00
2020-08-26 18:23:28 +00:00
ErrGasFeeCapTooLow = errors . New ( "gas fee cap too low" )
2019-11-13 14:48:57 +00:00
ErrNotEnoughFunds = errors . New ( "not enough funds to execute transaction" )
2019-10-14 03:28:19 +00:00
2019-11-13 14:48:57 +00:00
ErrInvalidToAddr = errors . New ( "message had invalid to address" )
2020-03-01 04:15:02 +00:00
2020-08-26 19:48:03 +00:00
ErrSoftValidationFailure = errors . New ( "validation failure" )
ErrRBFTooLowPremium = errors . New ( "replace by fee has too low GasPremium" )
ErrTooManyPendingMessages = errors . New ( "too many pending messages for actor" )
2020-09-01 14:57:44 +00:00
ErrNonceGap = errors . New ( "unfulfilled nonce gap" )
2020-08-12 17:26:58 +00:00
2020-08-18 07:19:46 +00:00
ErrTryAgain = errors . New ( "state inconsistency while pushing message; please try again" )
2019-10-13 13:03:15 +00:00
)
2019-11-13 21:53:18 +00:00
const (
2019-11-23 19:01:56 +00:00
localMsgsDs = "/mpool/local"
2019-11-17 07:44:06 +00:00
localUpdates = "update"
2019-11-13 21:53:18 +00:00
)
2020-09-07 19:15:17 +00:00
// this is *temporary* mutilation until we have implemented uncapped miner penalties -- it will go
// away in the next fork.
2020-09-07 19:28:23 +00:00
var strictBaseFeeValidation = false
2020-09-07 19:15:17 +00:00
2020-09-06 10:36:25 +00:00
func init ( ) {
// if the republish interval is too short compared to the pubsub timecache, adjust it
minInterval := pubsub . TimeCacheDuration + time . Duration ( build . PropagationDelaySecs )
if RepublishInterval < minInterval {
RepublishInterval = minInterval
}
}
2019-07-05 14:46:21 +00:00
type MessagePool struct {
lk sync . Mutex
2020-08-07 17:10:09 +00:00
ds dtypes . MetadataDS
2020-08-12 07:38:40 +00:00
addSema chan struct { }
2020-08-17 07:03:39 +00:00
closer chan struct { }
repubTk * clock . Ticker
repubTrigger chan struct { }
republished map [ cid . Cid ] struct { }
2019-11-13 21:53:18 +00:00
localAddrs map [ address . Address ] struct { }
2019-12-05 02:04:09 +00:00
pending map [ address . Address ] * msgSet
2019-07-05 14:46:21 +00:00
2019-12-03 19:33:29 +00:00
curTsLk sync . Mutex // DO NOT LOCK INSIDE lk
2019-12-01 22:22:10 +00:00
curTs * types . TipSet
2019-09-16 14:17:08 +00:00
2020-08-07 14:33:55 +00:00
cfgLk sync . Mutex
cfg * types . MpoolConfig
2019-12-02 19:39:50 +00:00
api Provider
2019-10-13 13:03:15 +00:00
minGasPrice types . BigInt
2020-08-07 14:33:55 +00:00
currentSize int
2020-07-16 22:28:35 +00:00
// pruneTrigger is a channel used to trigger a mempool pruning
pruneTrigger chan struct { }
2019-11-12 07:16:42 +00:00
2020-08-07 16:50:10 +00:00
// pruneCooldown is a channel used to allow a cooldown time between prunes
pruneCooldown chan struct { }
2019-11-12 07:16:42 +00:00
blsSigCache * lru . TwoQueueCache
2019-11-17 07:44:06 +00:00
changes * lps . PubSub
2019-11-23 19:01:56 +00:00
localMsgs datastore . Datastore
2020-03-31 23:13:37 +00:00
netName dtypes . NetworkName
2020-05-15 17:56:38 +00:00
sigValCache * lru . TwoQueueCache
2019-07-05 14:46:21 +00:00
}
type msgSet struct {
2020-08-26 11:38:26 +00:00
msgs map [ uint64 ] * types . SignedMessage
nextNonce uint64
requiredFunds * stdbig . Int
2019-07-05 14:46:21 +00:00
}
2020-09-01 14:57:44 +00:00
func newMsgSet ( nonce uint64 ) * msgSet {
2019-07-05 14:46:21 +00:00
return & msgSet {
2020-08-26 11:38:26 +00:00
msgs : make ( map [ uint64 ] * types . SignedMessage ) ,
2020-09-01 14:57:44 +00:00
nextNonce : nonce ,
2020-08-26 11:38:26 +00:00
requiredFunds : stdbig . NewInt ( 0 ) ,
2019-07-05 14:46:21 +00:00
}
}
2020-09-01 14:57:44 +00:00
func ( ms * msgSet ) add ( m * types . SignedMessage , mp * MessagePool , strict bool ) ( bool , error ) {
nextNonce := ms . nextNonce
nonceGap := false
switch {
case m . Message . Nonce == nextNonce :
nextNonce ++
// advance if we are filling a gap
for _ , fillGap := ms . msgs [ nextNonce ] ; fillGap ; _ , fillGap = ms . msgs [ nextNonce ] {
nextNonce ++
}
case strict && m . Message . Nonce > nextNonce + MaxNonceGap :
return false , xerrors . Errorf ( "message nonce has too big a gap from expected nonce (Nonce: %d, nextNonce: %d): %w" , m . Message . Nonce , nextNonce , ErrNonceGap )
case m . Message . Nonce > nextNonce :
nonceGap = true
2019-09-20 09:01:49 +00:00
}
2020-09-01 14:57:44 +00:00
2020-04-01 20:26:14 +00:00
exms , has := ms . msgs [ m . Message . Nonce ]
if has {
2020-09-01 14:57:44 +00:00
// refuse RBF if we have a gap
if strict && nonceGap {
return false , xerrors . Errorf ( "rejecting replace by fee because of nonce gap (Nonce: %d, nextNonce: %d): %w" , m . Message . Nonce , nextNonce , ErrNonceGap )
}
2020-04-01 20:26:14 +00:00
if m . Cid ( ) != exms . Cid ( ) {
// check if RBF passes
2020-08-06 21:08:42 +00:00
minPrice := exms . Message . GasPremium
2020-08-12 18:24:35 +00:00
minPrice = types . BigAdd ( minPrice , types . BigDiv ( types . BigMul ( minPrice , rbfNumBig ) , rbfDenomBig ) )
2020-04-01 20:26:14 +00:00
minPrice = types . BigAdd ( minPrice , types . NewInt ( 1 ) )
2020-08-06 21:08:42 +00:00
if types . BigCmp ( m . Message . GasPremium , minPrice ) >= 0 {
log . Infow ( "add with RBF" , "oldpremium" , exms . Message . GasPremium ,
"newpremium" , m . Message . GasPremium , "addr" , m . Message . From , "nonce" , m . Message . Nonce )
2020-04-01 20:26:14 +00:00
} else {
log . Info ( "add with duplicate nonce" )
2020-07-16 22:28:35 +00:00
return false , xerrors . Errorf ( "message from %s with nonce %d already in mpool," +
2020-08-09 01:17:40 +00:00
" increase GasPremium to %s from %s to trigger replace by fee: %w" ,
m . Message . From , m . Message . Nonce , minPrice , m . Message . GasPremium ,
ErrRBFTooLowPremium )
2020-04-01 20:26:14 +00:00
}
2020-09-03 06:00:03 +00:00
} else {
return false , xerrors . Errorf ( "message from %s with nonce %d already in mpool: %w" ,
m . Message . From , m . Message . Nonce , ErrSoftValidationFailure )
2019-09-20 09:01:49 +00:00
}
2020-08-26 11:38:26 +00:00
ms . requiredFunds . Sub ( ms . requiredFunds , exms . Message . RequiredFunds ( ) . Int )
2020-08-26 18:10:46 +00:00
//ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int)
2019-07-05 14:46:21 +00:00
}
2020-08-26 11:38:26 +00:00
2020-09-01 14:57:44 +00:00
if ! has && strict && len ( ms . msgs ) > MaxActorPendingMessages {
2020-08-26 19:53:28 +00:00
log . Errorf ( "too many pending messages from actor %s" , m . Message . From )
2020-08-26 19:48:03 +00:00
return false , ErrTooManyPendingMessages
2020-08-26 19:40:10 +00:00
}
2020-09-01 19:46:36 +00:00
if strict && nonceGap {
log . Warnf ( "adding nonce-gapped message from %s (nonce: %d, nextNonce: %d)" ,
m . Message . From , m . Message . Nonce , nextNonce )
}
2020-09-01 14:57:44 +00:00
ms . nextNonce = nextNonce
2019-07-05 14:46:21 +00:00
ms . msgs [ m . Message . Nonce ] = m
2020-08-26 11:38:26 +00:00
ms . requiredFunds . Add ( ms . requiredFunds , m . Message . RequiredFunds ( ) . Int )
2020-08-26 18:10:46 +00:00
//ms.requiredFunds.Add(ms.requiredFunds, m.Message.Value.Int)
2019-09-20 09:01:49 +00:00
2020-07-16 22:28:35 +00:00
return ! has , nil
2019-07-05 14:46:21 +00:00
}
2020-09-01 14:57:44 +00:00
func ( ms * msgSet ) rm ( nonce uint64 , applied bool ) {
2020-08-26 11:38:26 +00:00
m , has := ms . msgs [ nonce ]
2020-09-01 14:57:44 +00:00
if ! has {
if applied && nonce >= ms . nextNonce {
// we removed a message we did not know about because it was applied
// we need to adjust the nonce and check if we filled a gap
ms . nextNonce = nonce + 1
for _ , fillGap := ms . msgs [ ms . nextNonce ] ; fillGap ; _ , fillGap = ms . msgs [ ms . nextNonce ] {
ms . nextNonce ++
}
}
return
}
ms . requiredFunds . Sub ( ms . requiredFunds , m . Message . RequiredFunds ( ) . Int )
//ms.requiredFunds.Sub(ms.requiredFunds, m.Message.Value.Int)
delete ( ms . msgs , nonce )
// adjust next nonce
if applied {
// we removed a (known) message because it was applied in a tipset
// we can't possibly have filled a gap in this case
if nonce >= ms . nextNonce {
ms . nextNonce = nonce + 1
}
return
}
// we removed a message because it was pruned
// we have to adjust the nonce if it creates a gap or rewinds state
if nonce < ms . nextNonce {
ms . nextNonce = nonce
2020-08-26 11:38:26 +00:00
}
}
2020-08-26 12:27:09 +00:00
func ( ms * msgSet ) getRequiredFunds ( nonce uint64 ) types . BigInt {
2020-08-26 12:37:42 +00:00
requiredFunds := new ( stdbig . Int ) . Set ( ms . requiredFunds )
2020-08-26 12:27:09 +00:00
m , has := ms . msgs [ nonce ]
if has {
requiredFunds . Sub ( requiredFunds , m . Message . RequiredFunds ( ) . Int )
2020-08-26 18:10:46 +00:00
//requiredFunds.Sub(requiredFunds, m.Message.Value.Int)
2020-08-26 12:27:09 +00:00
}
2020-08-26 12:37:42 +00:00
return types . BigInt { Int : requiredFunds }
2020-08-26 12:27:09 +00:00
}
2020-03-31 23:13:37 +00:00
func New ( api Provider , ds dtypes . MetadataDS , netName dtypes . NetworkName ) ( * MessagePool , error ) {
2019-11-12 07:16:42 +00:00
cache , _ := lru . New2Q ( build . BlsSignatureCacheSize )
2020-05-15 17:56:38 +00:00
verifcache , _ := lru . New2Q ( build . VerifSigCacheSize )
2020-08-07 17:10:09 +00:00
cfg , err := loadConfig ( ds )
if err != nil {
2020-08-31 07:11:45 +00:00
return nil , xerrors . Errorf ( "error loading mpool config: %w" , err )
2020-08-07 17:10:09 +00:00
}
2020-05-15 17:56:38 +00:00
2019-07-05 14:46:21 +00:00
mp := & MessagePool {
2020-08-07 17:10:09 +00:00
ds : ds ,
2020-08-12 07:38:40 +00:00
addSema : make ( chan struct { } , 1 ) ,
2020-08-07 16:50:10 +00:00
closer : make ( chan struct { } ) ,
2020-08-10 08:18:12 +00:00
repubTk : build . Clock . Ticker ( RepublishInterval ) ,
2020-08-17 07:03:39 +00:00
repubTrigger : make ( chan struct { } , 1 ) ,
2020-08-07 16:50:10 +00:00
localAddrs : make ( map [ address . Address ] struct { } ) ,
pending : make ( map [ address . Address ] * msgSet ) ,
minGasPrice : types . NewInt ( 0 ) ,
pruneTrigger : make ( chan struct { } , 1 ) ,
pruneCooldown : make ( chan struct { } , 1 ) ,
blsSigCache : cache ,
sigValCache : verifcache ,
changes : lps . New ( 50 ) ,
localMsgs : namespace . Wrap ( ds , datastore . NewKey ( localMsgsDs ) ) ,
api : api ,
netName : netName ,
cfg : cfg ,
2019-11-23 19:01:56 +00:00
}
2020-08-07 16:50:10 +00:00
// enable initial prunes
mp . pruneCooldown <- struct { } { }
2020-08-25 05:49:15 +00:00
// load the current tipset and subscribe to head changes _before_ loading local messages
2019-12-03 19:33:29 +00:00
mp . curTs = api . SubscribeHeadChanges ( func ( rev , app [ ] * types . TipSet ) error {
2019-11-13 14:48:57 +00:00
err := mp . HeadChange ( rev , app )
if err != nil {
log . Errorf ( "mpool head notif handler error: %+v" , err )
}
return err
} )
2019-07-05 14:46:21 +00:00
2020-08-25 05:49:15 +00:00
if err := mp . loadLocal ( ) ; err != nil {
log . Errorf ( "loading local messages: %+v" , err )
}
go mp . runLoop ( )
2019-11-23 19:01:56 +00:00
return mp , nil
2019-07-05 14:46:21 +00:00
}
2019-11-13 21:53:18 +00:00
func ( mp * MessagePool ) Close ( ) error {
close ( mp . closer )
return nil
}
2020-08-01 22:54:21 +00:00
func ( mp * MessagePool ) Prune ( ) {
2020-08-07 16:50:10 +00:00
// this magic incantation of triggering prune thrice is here to make the Prune method
// synchronous:
// so, its a single slot buffered channel. The first send fills the channel,
// the second send goes through when the pruning starts,
// and the third send goes through (and noops) after the pruning finishes
// and goes through the loop again
2020-08-01 22:54:21 +00:00
mp . pruneTrigger <- struct { } { }
mp . pruneTrigger <- struct { } { }
mp . pruneTrigger <- struct { } { }
}
2020-07-16 22:28:35 +00:00
func ( mp * MessagePool ) runLoop ( ) {
2019-11-13 21:53:18 +00:00
for {
select {
case <- mp . repubTk . C :
2020-08-10 08:07:36 +00:00
if err := mp . republishPendingMessages ( ) ; err != nil {
log . Errorf ( "error while republishing messages: %s" , err )
2019-11-13 21:53:18 +00:00
}
2020-08-17 07:03:39 +00:00
case <- mp . repubTrigger :
if err := mp . republishPendingMessages ( ) ; err != nil {
log . Errorf ( "error while republishing messages: %s" , err )
}
2020-07-16 22:28:35 +00:00
case <- mp . pruneTrigger :
if err := mp . pruneExcessMessages ( ) ; err != nil {
log . Errorf ( "failed to prune excess messages from mempool: %s" , err )
}
2019-11-13 21:53:18 +00:00
case <- mp . closer :
mp . repubTk . Stop ( )
return
}
}
}
2019-11-23 19:01:56 +00:00
func ( mp * MessagePool ) addLocal ( m * types . SignedMessage , msgb [ ] byte ) error {
mp . localAddrs [ m . Message . From ] = struct { } { }
if err := mp . localMsgs . Put ( datastore . NewKey ( string ( m . Cid ( ) . Bytes ( ) ) ) , msgb ) ; err != nil {
return xerrors . Errorf ( "persisting local message: %w" , err )
}
return nil
2019-11-13 21:53:18 +00:00
}
2020-09-07 17:26:51 +00:00
// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusio
// and whether the message has enough funds to be included in the next 20 blocks.
// If the message is not valid for block inclusion, it returns an error.
// For local messages, if the message can be included in the next 20 blocks, it returns true to
// signal that it should be immediately published. If the message cannot be included in the next 20
// blocks, it returns false so that the message doesn't immediately get published (and ignored by our
// peers); instead it will be published through the republish loop, once the base fee has fallen
// sufficiently.
// For non local messages, if the message cannot be included in the next 20 blocks it returns
// a (soft) validation error.
2020-09-06 10:48:26 +00:00
func ( mp * MessagePool ) verifyMsgBeforeAdd ( m * types . SignedMessage , curTs * types . TipSet , local bool ) ( bool , error ) {
2020-09-05 17:09:46 +00:00
epoch := curTs . Height ( )
2020-07-29 16:30:08 +00:00
minGas := vm . PricelistByEpoch ( epoch ) . OnChainMessage ( m . ChainLength ( ) )
if err := m . VMMessage ( ) . ValidForBlockInclusion ( minGas . Total ( ) ) ; err != nil {
2020-09-06 10:48:26 +00:00
return false , xerrors . Errorf ( "message will not be included in a block: %w" , err )
2020-07-29 16:30:08 +00:00
}
2020-09-05 17:09:46 +00:00
// this checks if the GasFeeCap is suffisciently high for inclusion in the next 20 blocks
// if the GasFeeCap is too low, we soft reject the message (Ignore in pubsub) and rely
// on republish to push it through later, if the baseFee has fallen.
// this is a defensive check that stops minimum baseFee spam attacks from overloading validation
// queues.
2020-09-06 10:48:26 +00:00
// Note that for local messages, we always add them so that they can be accepted and republished
// automatically.
publish := local
2020-09-07 19:28:23 +00:00
if strictBaseFeeValidation && len ( curTs . Blocks ( ) ) > 0 {
2020-09-05 18:03:22 +00:00
baseFee := curTs . Blocks ( ) [ 0 ] . ParentBaseFee
2020-09-05 17:31:16 +00:00
baseFeeLowerBound := types . BigDiv ( baseFee , baseFeeLowerBoundFactor )
if m . Message . GasFeeCap . LessThan ( baseFeeLowerBound ) {
2020-09-06 10:48:26 +00:00
if local {
log . Warnf ( "local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s)" ,
m . Message . GasFeeCap , baseFeeLowerBound )
publish = false
} else {
return false , xerrors . Errorf ( "GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s): %w" ,
m . Message . GasFeeCap , baseFeeLowerBound , ErrSoftValidationFailure )
}
2020-09-05 17:31:16 +00:00
}
2020-09-05 17:09:46 +00:00
}
2020-09-06 10:48:26 +00:00
return publish , nil
2020-07-29 16:30:08 +00:00
}
2020-01-07 16:44:55 +00:00
func ( mp * MessagePool ) Push ( m * types . SignedMessage ) ( cid . Cid , error ) {
2020-08-18 07:19:46 +00:00
err := mp . checkMessage ( m )
if err != nil {
return cid . Undef , err
}
2020-08-12 07:38:40 +00:00
// serialize push access to reduce lock contention
mp . addSema <- struct { } { }
defer func ( ) {
<- mp . addSema
} ( )
2019-09-16 14:17:08 +00:00
msgb , err := m . Serialize ( )
if err != nil {
2020-01-07 16:44:55 +00:00
return cid . Undef , err
2019-09-16 14:17:08 +00:00
}
2020-08-18 07:19:46 +00:00
mp . curTsLk . Lock ( )
2020-09-06 10:48:26 +00:00
publish , err := mp . addTs ( m , mp . curTs , true )
if err != nil {
2020-08-18 07:19:46 +00:00
mp . curTsLk . Unlock ( )
2020-01-07 16:44:55 +00:00
return cid . Undef , err
2019-09-16 14:17:08 +00:00
}
2020-08-18 07:19:46 +00:00
mp . curTsLk . Unlock ( )
2019-09-16 14:17:08 +00:00
2019-11-13 21:53:18 +00:00
mp . lk . Lock ( )
2019-11-23 19:01:56 +00:00
if err := mp . addLocal ( m , msgb ) ; err != nil {
mp . lk . Unlock ( )
2020-01-07 16:44:55 +00:00
return cid . Undef , err
2019-11-23 19:01:56 +00:00
}
2019-11-13 21:53:18 +00:00
mp . lk . Unlock ( )
2020-09-06 10:48:26 +00:00
if publish {
err = mp . api . PubSubPublish ( build . MessagesTopic ( mp . netName ) , msgb )
}
return m . Cid ( ) , err
2019-09-16 14:17:08 +00:00
}
2020-08-18 07:19:46 +00:00
func ( mp * MessagePool ) checkMessage ( m * types . SignedMessage ) error {
2019-10-13 13:03:15 +00:00
// big messages are bad, anti DOS
if m . Size ( ) > 32 * 1024 {
2019-11-13 14:48:57 +00:00
return xerrors . Errorf ( "mpool message too large (%dB): %w" , m . Size ( ) , ErrMessageTooBig )
2019-10-13 13:03:15 +00:00
}
2020-08-26 16:37:27 +00:00
// Perform syntactic validation, minGas=0 as we check the actual mingas before we add it
2020-08-24 23:01:16 +00:00
if err := m . Message . ValidForBlockInclusion ( 0 ) ; err != nil {
2020-08-25 01:14:52 +00:00
return xerrors . Errorf ( "message not valid for block inclusion: %w" , err )
2020-08-24 23:01:16 +00:00
}
2019-10-14 03:28:19 +00:00
if m . Message . To == address . Undef {
return ErrInvalidToAddr
}
2019-10-13 13:03:15 +00:00
if ! m . Message . Value . LessThan ( types . TotalFilecoinInt ) {
return ErrMessageValueTooHigh
}
2020-08-26 18:23:28 +00:00
if m . Message . GasFeeCap . LessThan ( minimumBaseFee ) {
return ErrGasFeeCapTooLow
}
2020-05-15 17:56:38 +00:00
if err := mp . VerifyMsgSig ( m ) ; err != nil {
2020-08-26 11:13:46 +00:00
log . Warnf ( "signature verification failed: %s" , err )
2019-10-13 13:03:15 +00:00
return err
}
2020-08-18 07:19:46 +00:00
return nil
}
func ( mp * MessagePool ) Add ( m * types . SignedMessage ) error {
err := mp . checkMessage ( m )
if err != nil {
return err
}
2020-08-12 07:38:40 +00:00
// serialize push access to reduce lock contention
mp . addSema <- struct { } { }
defer func ( ) {
<- mp . addSema
} ( )
2020-05-15 17:56:38 +00:00
mp . curTsLk . Lock ( )
defer mp . curTsLk . Unlock ( )
2020-09-06 10:48:26 +00:00
_ , err = mp . addTs ( m , mp . curTs , false )
return err
2020-05-15 17:56:38 +00:00
}
func sigCacheKey ( m * types . SignedMessage ) ( string , error ) {
switch m . Signature . Type {
case crypto . SigTypeBLS :
if len ( m . Signature . Data ) < 90 {
return "" , fmt . Errorf ( "bls signature too short" )
}
return string ( m . Cid ( ) . Bytes ( ) ) + string ( m . Signature . Data [ 64 : ] ) , nil
case crypto . SigTypeSecp256k1 :
return string ( m . Cid ( ) . Bytes ( ) ) , nil
default :
return "" , xerrors . Errorf ( "unrecognized signature type: %d" , m . Signature . Type )
}
}
func ( mp * MessagePool ) VerifyMsgSig ( m * types . SignedMessage ) error {
sck , err := sigCacheKey ( m )
if err != nil {
return err
}
_ , ok := mp . sigValCache . Get ( sck )
if ok {
// already validated, great
return nil
}
if err := sigs . Verify ( & m . Signature , m . Message . From , m . Message . Cid ( ) . Bytes ( ) ) ; err != nil {
return err
}
mp . sigValCache . Add ( sck , struct { } { } )
return nil
}
2020-08-26 11:23:34 +00:00
func ( mp * MessagePool ) checkBalance ( m * types . SignedMessage , curTs * types . TipSet ) error {
balance , err := mp . getStateBalance ( m . Message . From , curTs )
2019-10-13 13:03:15 +00:00
if err != nil {
2020-08-26 12:16:04 +00:00
return xerrors . Errorf ( "failed to check sender balance: %s: %w" , err , ErrSoftValidationFailure )
2019-10-13 13:03:15 +00:00
}
2020-08-26 12:13:50 +00:00
requiredFunds := m . Message . RequiredFunds ( )
2020-08-26 11:58:26 +00:00
if balance . LessThan ( requiredFunds ) {
return xerrors . Errorf ( "not enough funds (required: %s, balance: %s): %w" , types . FIL ( requiredFunds ) , types . FIL ( balance ) , ErrNotEnoughFunds )
}
2020-08-26 12:13:50 +00:00
// add Value for soft failure check
2020-08-26 18:10:46 +00:00
//requiredFunds = types.BigAdd(requiredFunds, m.Message.Value)
2020-08-26 12:13:50 +00:00
2020-08-26 11:58:26 +00:00
mset , ok := mp . pending [ m . Message . From ]
if ok {
2020-08-26 12:27:09 +00:00
requiredFunds = types . BigAdd ( requiredFunds , mset . getRequiredFunds ( m . Message . Nonce ) )
2020-08-26 12:13:50 +00:00
}
if balance . LessThan ( requiredFunds ) {
2020-08-26 12:16:04 +00:00
// Note: we fail here for ErrSoftValidationFailure to signal a soft failure because we might
2020-08-26 12:13:50 +00:00
// be out of sync.
2020-08-26 12:16:04 +00:00
return xerrors . Errorf ( "not enough funds including pending messages (required: %s, balance: %s): %w" , types . FIL ( requiredFunds ) , types . FIL ( balance ) , ErrSoftValidationFailure )
2019-10-13 13:03:15 +00:00
}
2020-08-26 11:23:34 +00:00
return nil
}
2020-09-06 10:48:26 +00:00
func ( mp * MessagePool ) addTs ( m * types . SignedMessage , curTs * types . TipSet , local bool ) ( bool , error ) {
2020-08-26 11:23:34 +00:00
snonce , err := mp . getStateNonce ( m . Message . From , curTs )
2019-10-13 13:03:15 +00:00
if err != nil {
2020-09-06 10:48:26 +00:00
return false , xerrors . Errorf ( "failed to look up actor state nonce: %s: %w" , err , ErrSoftValidationFailure )
2019-10-13 13:03:15 +00:00
}
2020-08-26 11:23:34 +00:00
if snonce > m . Message . Nonce {
2020-09-06 10:48:26 +00:00
return false , xerrors . Errorf ( "minimum expected nonce is %d: %w" , snonce , ErrNonceTooLow )
2019-10-13 13:03:15 +00:00
}
2019-07-05 14:46:21 +00:00
mp . lk . Lock ( )
defer mp . lk . Unlock ( )
2020-09-06 10:48:26 +00:00
publish , err := mp . verifyMsgBeforeAdd ( m , curTs , local )
if err != nil {
return false , err
2020-08-26 16:37:27 +00:00
}
2020-08-26 11:23:34 +00:00
if err := mp . checkBalance ( m , curTs ) ; err != nil {
2020-09-06 10:48:26 +00:00
return false , err
2020-08-26 11:23:34 +00:00
}
2020-09-06 10:48:26 +00:00
return publish , mp . addLocked ( m , true )
2019-09-16 14:17:08 +00:00
}
2020-09-01 14:57:44 +00:00
func ( mp * MessagePool ) addLoaded ( m * types . SignedMessage ) error {
err := mp . checkMessage ( m )
if err != nil {
return err
}
mp . curTsLk . Lock ( )
defer mp . curTsLk . Unlock ( )
curTs := mp . curTs
snonce , err := mp . getStateNonce ( m . Message . From , curTs )
if err != nil {
return xerrors . Errorf ( "failed to look up actor state nonce: %s: %w" , err , ErrSoftValidationFailure )
}
if snonce > m . Message . Nonce {
return xerrors . Errorf ( "minimum expected nonce is %d: %w" , snonce , ErrNonceTooLow )
}
mp . lk . Lock ( )
defer mp . lk . Unlock ( )
2020-09-06 10:48:26 +00:00
_ , err = mp . verifyMsgBeforeAdd ( m , curTs , true )
if err != nil {
2020-09-01 14:57:44 +00:00
return err
}
if err := mp . checkBalance ( m , curTs ) ; err != nil {
return err
}
return mp . addLocked ( m , false )
}
2019-12-07 11:41:30 +00:00
func ( mp * MessagePool ) addSkipChecks ( m * types . SignedMessage ) error {
mp . lk . Lock ( )
defer mp . lk . Unlock ( )
2020-08-27 21:04:21 +00:00
return mp . addLocked ( m , false )
2019-12-07 11:41:30 +00:00
}
2020-09-01 14:57:44 +00:00
func ( mp * MessagePool ) addLocked ( m * types . SignedMessage , strict bool ) error {
2020-03-09 22:46:00 +00:00
log . Debugf ( "mpooladd: %s %d" , m . Message . From , m . Message . Nonce )
2020-02-12 23:52:36 +00:00
if m . Signature . Type == crypto . SigTypeBLS {
2019-11-12 07:16:42 +00:00
mp . blsSigCache . Add ( m . Cid ( ) , m . Signature )
}
2019-08-09 15:59:12 +00:00
2019-12-01 22:22:10 +00:00
if _ , err := mp . api . PutMessage ( m ) ; err != nil {
2019-09-20 09:01:49 +00:00
log . Warnf ( "mpooladd cs.PutMessage failed: %s" , err )
2019-07-05 14:46:21 +00:00
return err
}
2019-12-01 22:22:10 +00:00
if _ , err := mp . api . PutMessage ( & m . Message ) ; err != nil {
2019-11-24 16:35:50 +00:00
log . Warnf ( "mpooladd cs.PutMessage failed: %s" , err )
return err
}
2019-07-05 14:46:21 +00:00
mset , ok := mp . pending [ m . Message . From ]
if ! ok {
2020-09-01 14:57:44 +00:00
nonce , err := mp . getStateNonce ( m . Message . From , mp . curTs )
if err != nil {
return xerrors . Errorf ( "failed to get initial actor nonce: %w" , err )
}
mset = newMsgSet ( nonce )
2019-07-05 14:46:21 +00:00
mp . pending [ m . Message . From ] = mset
}
2020-09-01 14:57:44 +00:00
incr , err := mset . add ( m , mp , strict )
2020-07-16 22:28:35 +00:00
if err != nil {
2020-09-04 21:28:13 +00:00
log . Debug ( err )
2020-08-09 01:17:40 +00:00
return err
2020-07-16 22:28:35 +00:00
}
if incr {
mp . currentSize ++
2020-08-07 14:33:55 +00:00
if mp . currentSize > mp . cfg . SizeLimitHigh {
2020-07-16 22:28:35 +00:00
// send signal to prune messages if it hasnt already been sent
select {
case mp . pruneTrigger <- struct { } { } :
default :
}
}
2019-11-23 19:01:56 +00:00
}
2019-11-17 07:44:06 +00:00
mp . changes . Pub ( api . MpoolUpdate {
Type : api . MpoolAdd ,
Message : m ,
} , localUpdates )
2019-07-05 14:46:21 +00:00
return nil
}
2019-07-17 06:05:11 +00:00
func ( mp * MessagePool ) GetNonce ( addr address . Address ) ( uint64 , error ) {
2019-12-03 19:33:29 +00:00
mp . curTsLk . Lock ( )
2019-12-03 21:09:39 +00:00
defer mp . curTsLk . Unlock ( )
2019-12-03 19:33:29 +00:00
2019-07-17 06:05:11 +00:00
mp . lk . Lock ( )
defer mp . lk . Unlock ( )
2019-12-03 19:33:29 +00:00
return mp . getNonceLocked ( addr , mp . curTs )
2019-09-16 14:17:08 +00:00
}
2019-12-03 19:33:29 +00:00
func ( mp * MessagePool ) getNonceLocked ( addr address . Address , curTs * types . TipSet ) ( uint64 , error ) {
stateNonce , err := mp . getStateNonce ( addr , curTs ) // sanity check
2019-11-23 01:26:32 +00:00
if err != nil {
return 0 , err
}
2019-07-17 06:05:11 +00:00
mset , ok := mp . pending [ addr ]
if ok {
2019-11-23 01:26:32 +00:00
if stateNonce > mset . nextNonce {
2019-11-24 16:35:50 +00:00
log . Errorf ( "state nonce was larger than mset.nextNonce (%d > %d)" , stateNonce , mset . nextNonce )
2019-11-23 01:26:32 +00:00
return stateNonce , nil
}
2019-09-20 09:01:49 +00:00
return mset . nextNonce , nil
2019-07-17 06:05:11 +00:00
}
2019-11-23 01:26:32 +00:00
return stateNonce , nil
2019-10-13 13:03:15 +00:00
}
2019-12-03 19:33:29 +00:00
func ( mp * MessagePool ) getStateNonce ( addr address . Address , curTs * types . TipSet ) ( uint64 , error ) {
2020-08-25 02:02:06 +00:00
act , err := mp . api . GetActorAfter ( addr , curTs )
2019-07-17 06:05:11 +00:00
if err != nil {
return 0 , err
}
2020-08-25 02:02:06 +00:00
return act . Nonce , nil
2019-07-17 06:05:11 +00:00
}
2019-12-07 11:41:30 +00:00
func ( mp * MessagePool ) getStateBalance ( addr address . Address , ts * types . TipSet ) ( types . BigInt , error ) {
2020-08-25 02:02:06 +00:00
act , err := mp . api . GetActorAfter ( addr , ts )
2019-10-13 13:03:15 +00:00
if err != nil {
return types . EmptyInt , err
}
return act . Balance , nil
}
2020-04-16 21:06:31 +00:00
func ( mp * MessagePool ) PushWithNonce ( ctx context . Context , addr address . Address , cb func ( address . Address , uint64 ) ( * types . SignedMessage , error ) ) ( * types . SignedMessage , error ) {
2020-08-12 07:38:40 +00:00
// serialize push access to reduce lock contention
mp . addSema <- struct { } { }
defer func ( ) {
<- mp . addSema
} ( )
2019-12-03 19:33:29 +00:00
mp . curTsLk . Lock ( )
2019-09-16 14:17:08 +00:00
mp . lk . Lock ( )
2020-08-12 17:26:58 +00:00
curTs := mp . curTs
2020-04-16 21:06:31 +00:00
fromKey := addr
if fromKey . Protocol ( ) == address . ID {
2020-04-18 00:25:43 +00:00
var err error
2020-04-16 21:06:31 +00:00
fromKey , err = mp . api . StateAccountKey ( ctx , fromKey , mp . curTs )
if err != nil {
2020-08-12 17:26:58 +00:00
mp . lk . Unlock ( )
mp . curTsLk . Unlock ( )
2020-04-16 21:06:31 +00:00
return nil , xerrors . Errorf ( "resolving sender key: %w" , err )
}
}
2020-04-18 00:25:43 +00:00
nonce , err := mp . getNonceLocked ( fromKey , mp . curTs )
if err != nil {
2020-08-12 17:26:58 +00:00
mp . lk . Unlock ( )
mp . curTsLk . Unlock ( )
2020-04-18 00:25:43 +00:00
return nil , xerrors . Errorf ( "get nonce locked failed: %w" , err )
}
2020-08-12 17:26:58 +00:00
// release the locks for signing
mp . lk . Unlock ( )
mp . curTsLk . Unlock ( )
2020-04-16 21:06:31 +00:00
msg , err := cb ( fromKey , nonce )
2019-09-16 14:17:08 +00:00
if err != nil {
2019-09-17 08:15:26 +00:00
return nil , err
2019-09-16 14:17:08 +00:00
}
2020-08-26 11:13:46 +00:00
err = mp . checkMessage ( msg )
if err != nil {
return nil , err
}
2020-08-26 16:42:56 +00:00
msgb , err := msg . Serialize ( )
if err != nil {
return nil , err
}
2020-08-12 17:26:58 +00:00
// reacquire the locks and check state for consistency
mp . curTsLk . Lock ( )
defer mp . curTsLk . Unlock ( )
if mp . curTs != curTs {
return nil , ErrTryAgain
}
mp . lk . Lock ( )
defer mp . lk . Unlock ( )
nonce2 , err := mp . getNonceLocked ( fromKey , mp . curTs )
if err != nil {
return nil , xerrors . Errorf ( "get nonce locked failed: %w" , err )
}
if nonce2 != nonce {
return nil , ErrTryAgain
}
2020-09-06 10:48:26 +00:00
publish , err := mp . verifyMsgBeforeAdd ( msg , curTs , true )
if err != nil {
2020-08-26 11:23:34 +00:00
return nil , err
}
2020-08-26 16:37:27 +00:00
if err := mp . checkBalance ( msg , curTs ) ; err != nil {
2020-07-29 16:30:08 +00:00
return nil , err
}
2020-08-27 21:04:21 +00:00
if err := mp . addLocked ( msg , true ) ; err != nil {
2020-04-01 01:34:23 +00:00
return nil , xerrors . Errorf ( "add locked failed: %w" , err )
2019-09-16 14:17:08 +00:00
}
2019-11-24 16:35:50 +00:00
if err := mp . addLocal ( msg , msgb ) ; err != nil {
log . Errorf ( "addLocal failed: %+v" , err )
}
2019-09-16 14:17:08 +00:00
2020-09-06 10:48:26 +00:00
if publish {
err = mp . api . PubSubPublish ( build . MessagesTopic ( mp . netName ) , msgb )
}
return msg , err
2019-09-16 14:17:08 +00:00
}
2020-09-01 14:57:44 +00:00
func ( mp * MessagePool ) Remove ( from address . Address , nonce uint64 , applied bool ) {
2019-07-05 14:46:21 +00:00
mp . lk . Lock ( )
defer mp . lk . Unlock ( )
2020-09-01 14:57:44 +00:00
mp . remove ( from , nonce , applied )
2020-08-01 22:54:21 +00:00
}
2020-09-01 14:57:44 +00:00
func ( mp * MessagePool ) remove ( from address . Address , nonce uint64 , applied bool ) {
2019-08-14 04:43:29 +00:00
mset , ok := mp . pending [ from ]
2019-07-05 14:46:21 +00:00
if ! ok {
return
}
2019-11-19 21:26:25 +00:00
if m , ok := mset . msgs [ nonce ] ; ok {
mp . changes . Pub ( api . MpoolUpdate {
Type : api . MpoolRemove ,
Message : m ,
} , localUpdates )
2020-07-16 22:28:35 +00:00
mp . currentSize --
2019-11-19 21:26:25 +00:00
}
2019-11-17 07:44:06 +00:00
2019-07-05 14:46:21 +00:00
// NB: This deletes any message with the given nonce. This makes sense
// as two messages with the same sender cannot have the same nonce
2020-09-01 14:57:44 +00:00
mset . rm ( nonce , applied )
2019-07-05 14:46:21 +00:00
if len ( mset . msgs ) == 0 {
2019-11-23 01:26:32 +00:00
delete ( mp . pending , from )
2019-07-05 14:46:21 +00:00
}
}
2019-12-03 19:33:29 +00:00
func ( mp * MessagePool ) Pending ( ) ( [ ] * types . SignedMessage , * types . TipSet ) {
mp . curTsLk . Lock ( )
defer mp . curTsLk . Unlock ( )
2019-07-05 14:46:21 +00:00
mp . lk . Lock ( )
defer mp . lk . Unlock ( )
2019-12-03 19:33:29 +00:00
2020-08-25 10:03:50 +00:00
return mp . allPending ( )
}
func ( mp * MessagePool ) allPending ( ) ( [ ] * types . SignedMessage , * types . TipSet ) {
2019-09-06 22:32:42 +00:00
out := make ( [ ] * types . SignedMessage , 0 )
2019-11-13 21:53:18 +00:00
for a := range mp . pending {
out = append ( out , mp . pendingFor ( a ) ... )
}
2019-09-20 09:01:49 +00:00
2019-12-03 19:33:29 +00:00
return out , mp . curTs
2019-11-13 21:53:18 +00:00
}
2020-08-25 10:03:50 +00:00
2020-07-22 15:46:13 +00:00
func ( mp * MessagePool ) PendingFor ( a address . Address ) ( [ ] * types . SignedMessage , * types . TipSet ) {
mp . curTsLk . Lock ( )
defer mp . curTsLk . Unlock ( )
mp . lk . Lock ( )
defer mp . lk . Unlock ( )
return mp . pendingFor ( a ) , mp . curTs
}
2019-09-20 09:01:49 +00:00
2019-11-13 21:53:18 +00:00
func ( mp * MessagePool ) pendingFor ( a address . Address ) [ ] * types . SignedMessage {
mset := mp . pending [ a ]
if mset == nil || len ( mset . msgs ) == 0 {
return nil
2019-07-05 14:46:21 +00:00
}
2019-11-13 22:41:39 +00:00
set := make ( [ ] * types . SignedMessage , 0 , len ( mset . msgs ) )
2019-11-13 21:53:18 +00:00
2019-11-13 22:41:39 +00:00
for _ , m := range mset . msgs {
set = append ( set , m )
2019-11-13 21:53:18 +00:00
}
2019-11-13 22:41:39 +00:00
sort . Slice ( set , func ( i , j int ) bool {
return set [ i ] . Message . Nonce < set [ j ] . Message . Nonce
} )
return set
2019-07-05 14:46:21 +00:00
}
2019-07-26 04:54:22 +00:00
func ( mp * MessagePool ) HeadChange ( revert [ ] * types . TipSet , apply [ ] * types . TipSet ) error {
2019-12-03 19:33:29 +00:00
mp . curTsLk . Lock ( )
defer mp . curTsLk . Unlock ( )
2019-12-01 22:22:10 +00:00
2020-08-17 07:03:39 +00:00
repubTrigger := false
2019-12-07 11:41:30 +00:00
rmsgs := make ( map [ address . Address ] map [ uint64 ] * types . SignedMessage )
add := func ( m * types . SignedMessage ) {
s , ok := rmsgs [ m . Message . From ]
if ! ok {
s = make ( map [ uint64 ] * types . SignedMessage )
rmsgs [ m . Message . From ] = s
}
s [ m . Message . Nonce ] = m
}
rm := func ( from address . Address , nonce uint64 ) {
s , ok := rmsgs [ from ]
if ! ok {
2020-09-01 14:57:44 +00:00
mp . Remove ( from , nonce , true )
2019-12-07 11:41:30 +00:00
return
}
if _ , ok := s [ nonce ] ; ok {
delete ( s , nonce )
return
}
2020-09-01 14:57:44 +00:00
mp . Remove ( from , nonce , true )
2019-12-07 11:41:30 +00:00
}
2020-08-17 07:03:39 +00:00
maybeRepub := func ( cid cid . Cid ) {
if ! repubTrigger {
mp . lk . Lock ( )
_ , republished := mp . republished [ cid ]
mp . lk . Unlock ( )
if republished {
repubTrigger = true
}
}
}
2020-08-21 17:17:30 +00:00
var merr error
2019-07-05 14:46:21 +00:00
for _ , ts := range revert {
2019-12-02 20:46:25 +00:00
pts , err := mp . api . LoadTipSet ( ts . Parents ( ) )
if err != nil {
2020-08-21 17:17:30 +00:00
log . Errorf ( "error loading reverted tipset parent: %s" , err )
merr = multierror . Append ( merr , err )
continue
2019-12-02 20:46:25 +00:00
}
2020-08-21 17:17:30 +00:00
mp . curTs = pts
2019-12-03 19:33:29 +00:00
msgs , err := mp . MessagesForBlocks ( ts . Blocks ( ) )
if err != nil {
2020-08-21 17:17:30 +00:00
log . Errorf ( "error retrieving messages for reverted block: %s" , err )
merr = multierror . Append ( merr , err )
continue
2019-12-03 19:33:29 +00:00
}
2019-08-01 20:40:47 +00:00
2019-12-03 19:33:29 +00:00
for _ , msg := range msgs {
2019-12-07 11:41:30 +00:00
add ( msg )
2019-07-05 14:46:21 +00:00
}
}
for _ , ts := range apply {
2020-08-21 17:17:30 +00:00
mp . curTs = ts
2019-07-05 14:46:21 +00:00
for _ , b := range ts . Blocks ( ) {
2019-12-01 22:22:10 +00:00
bmsgs , smsgs , err := mp . api . MessagesForBlock ( b )
2019-07-05 14:46:21 +00:00
if err != nil {
2020-08-21 17:17:30 +00:00
xerr := xerrors . Errorf ( "failed to get messages for apply block %s(height %d) (msgroot = %s): %w" , b . Cid ( ) , b . Height , b . Messages , err )
log . Errorf ( "error retrieving messages for block: %s" , xerr )
merr = multierror . Append ( merr , xerr )
continue
2019-07-05 14:46:21 +00:00
}
2020-08-21 17:17:30 +00:00
2019-08-01 20:40:47 +00:00
for _ , msg := range smsgs {
2019-12-07 11:41:30 +00:00
rm ( msg . Message . From , msg . Message . Nonce )
2020-08-17 07:03:39 +00:00
maybeRepub ( msg . Cid ( ) )
2019-07-05 14:46:21 +00:00
}
2019-08-01 20:40:47 +00:00
for _ , msg := range bmsgs {
2019-12-07 11:41:30 +00:00
rm ( msg . From , msg . Nonce )
2020-08-17 07:03:39 +00:00
maybeRepub ( msg . Cid ( ) )
2019-08-01 20:40:47 +00:00
}
2019-07-05 14:46:21 +00:00
}
}
2020-08-17 07:03:39 +00:00
if repubTrigger {
select {
case mp . repubTrigger <- struct { } { } :
default :
}
}
2019-12-07 11:41:30 +00:00
for _ , s := range rmsgs {
for _ , msg := range s {
if err := mp . addSkipChecks ( msg ) ; err != nil {
log . Errorf ( "Failed to readd message from reorg to mpool: %s" , err )
}
}
}
2020-03-09 22:46:00 +00:00
if len ( revert ) > 0 && futureDebug {
2020-08-25 10:03:50 +00:00
mp . lk . Lock ( )
msgs , ts := mp . allPending ( )
mp . lk . Unlock ( )
2020-03-09 22:46:00 +00:00
buckets := map [ address . Address ] * statBucket { }
for _ , v := range msgs {
bkt , ok := buckets [ v . Message . From ]
if ! ok {
bkt = & statBucket {
msgs : map [ uint64 ] * types . SignedMessage { } ,
}
buckets [ v . Message . From ] = bkt
}
bkt . msgs [ v . Message . Nonce ] = v
}
for a , bkt := range buckets {
2020-08-25 02:02:06 +00:00
// TODO that might not be correct with GatActorAfter but it is only debug code
act , err := mp . api . GetActorAfter ( a , ts )
2020-03-09 22:46:00 +00:00
if err != nil {
log . Debugf ( "%s, err: %s\n" , a , err )
continue
}
var cmsg * types . SignedMessage
var ok bool
cur := act . Nonce
for {
cmsg , ok = bkt . msgs [ cur ]
if ! ok {
break
}
cur ++
}
ff := uint64 ( math . MaxUint64 )
for k := range bkt . msgs {
if k > cur && k < ff {
ff = k
}
}
if ff != math . MaxUint64 {
m := bkt . msgs [ ff ]
// cmsg can be nil if no messages from the current nonce are in the mpool
ccid := "nil"
if cmsg != nil {
ccid = cmsg . Cid ( ) . String ( )
}
log . Debugw ( "Nonce gap" ,
"actor" , a ,
"future_cid" , m . Cid ( ) ,
"future_nonce" , ff ,
"current_cid" , ccid ,
"current_nonce" , cur ,
"revert_tipset" , revert [ 0 ] . Key ( ) ,
"new_head" , ts . Key ( ) ,
)
}
}
}
2020-08-21 17:17:30 +00:00
return merr
2019-07-05 14:46:21 +00:00
}
2019-08-01 20:40:47 +00:00
2020-08-25 00:21:03 +00:00
func ( mp * MessagePool ) runHeadChange ( from * types . TipSet , to * types . TipSet , rmsgs map [ address . Address ] map [ uint64 ] * types . SignedMessage ) error {
add := func ( m * types . SignedMessage ) {
s , ok := rmsgs [ m . Message . From ]
if ! ok {
s = make ( map [ uint64 ] * types . SignedMessage )
rmsgs [ m . Message . From ] = s
}
s [ m . Message . Nonce ] = m
}
rm := func ( from address . Address , nonce uint64 ) {
s , ok := rmsgs [ from ]
if ! ok {
return
}
if _ , ok := s [ nonce ] ; ok {
delete ( s , nonce )
return
}
}
revert , apply , err := store . ReorgOps ( mp . api . LoadTipSet , from , to )
if err != nil {
return xerrors . Errorf ( "failed to compute reorg ops for mpool pending messages: %w" , err )
}
var merr error
for _ , ts := range revert {
msgs , err := mp . MessagesForBlocks ( ts . Blocks ( ) )
if err != nil {
log . Errorf ( "error retrieving messages for reverted block: %s" , err )
merr = multierror . Append ( merr , err )
continue
}
for _ , msg := range msgs {
add ( msg )
}
}
for _ , ts := range apply {
for _ , b := range ts . Blocks ( ) {
bmsgs , smsgs , err := mp . api . MessagesForBlock ( b )
if err != nil {
xerr := xerrors . Errorf ( "failed to get messages for apply block %s(height %d) (msgroot = %s): %w" , b . Cid ( ) , b . Height , b . Messages , err )
log . Errorf ( "error retrieving messages for block: %s" , xerr )
merr = multierror . Append ( merr , xerr )
continue
}
for _ , msg := range smsgs {
rm ( msg . Message . From , msg . Message . Nonce )
}
for _ , msg := range bmsgs {
rm ( msg . From , msg . Nonce )
}
}
}
return merr
}
2020-03-09 22:46:00 +00:00
type statBucket struct {
msgs map [ uint64 ] * types . SignedMessage
}
2019-12-03 19:33:29 +00:00
func ( mp * MessagePool ) MessagesForBlocks ( blks [ ] * types . BlockHeader ) ( [ ] * types . SignedMessage , error ) {
out := make ( [ ] * types . SignedMessage , 0 )
for _ , b := range blks {
bmsgs , smsgs , err := mp . api . MessagesForBlock ( b )
if err != nil {
return nil , xerrors . Errorf ( "failed to get messages for apply block %s(height %d) (msgroot = %s): %w" , b . Cid ( ) , b . Height , b . Messages , err )
}
2019-12-05 02:04:09 +00:00
out = append ( out , smsgs ... )
2019-12-03 19:33:29 +00:00
for _ , msg := range bmsgs {
smsg := mp . RecoverSig ( msg )
if smsg != nil {
out = append ( out , smsg )
} else {
log . Warnf ( "could not recover signature for bls message %s" , msg . Cid ( ) )
}
}
}
return out , nil
}
2019-08-01 20:40:47 +00:00
func ( mp * MessagePool ) RecoverSig ( msg * types . Message ) * types . SignedMessage {
2019-11-12 07:16:42 +00:00
val , ok := mp . blsSigCache . Get ( msg . Cid ( ) )
if ! ok {
return nil
}
2020-02-12 23:52:36 +00:00
sig , ok := val . ( crypto . Signature )
2019-11-12 07:16:42 +00:00
if ! ok {
2019-11-24 16:35:50 +00:00
log . Errorf ( "value in signature cache was not a signature (got %T)" , val )
2019-11-12 07:16:42 +00:00
return nil
}
return & types . SignedMessage {
Message : * msg ,
2019-11-12 11:42:19 +00:00
Signature : sig ,
2019-11-12 07:16:42 +00:00
}
2019-08-01 20:40:47 +00:00
}
2019-11-17 07:44:06 +00:00
func ( mp * MessagePool ) Updates ( ctx context . Context ) ( <- chan api . MpoolUpdate , error ) {
out := make ( chan api . MpoolUpdate , 20 )
sub := mp . changes . Sub ( localUpdates )
go func ( ) {
2019-12-08 15:00:45 +00:00
defer mp . changes . Unsub ( sub , localUpdates )
2020-08-25 10:29:09 +00:00
defer close ( out )
2019-11-19 19:49:11 +00:00
2019-11-17 07:44:06 +00:00
for {
select {
case u := <- sub :
select {
case out <- u . ( api . MpoolUpdate ) :
case <- ctx . Done ( ) :
return
2020-08-25 10:29:09 +00:00
case <- mp . closer :
return
2019-11-17 07:44:06 +00:00
}
case <- ctx . Done ( ) :
return
2020-08-25 10:29:09 +00:00
case <- mp . closer :
return
2019-11-17 07:44:06 +00:00
}
}
} ( )
return out , nil
}
2019-11-23 19:01:56 +00:00
func ( mp * MessagePool ) loadLocal ( ) error {
res , err := mp . localMsgs . Query ( query . Query { } )
if err != nil {
return xerrors . Errorf ( "query local messages: %w" , err )
}
for r := range res . Next ( ) {
if r . Error != nil {
return xerrors . Errorf ( "r.Error: %w" , r . Error )
}
var sm types . SignedMessage
if err := sm . UnmarshalCBOR ( bytes . NewReader ( r . Value ) ) ; err != nil {
return xerrors . Errorf ( "unmarshaling local message: %w" , err )
}
2020-09-01 14:57:44 +00:00
if err := mp . addLoaded ( & sm ) ; err != nil {
2019-11-23 19:01:56 +00:00
if xerrors . Is ( err , ErrNonceTooLow ) {
continue // todo: drop the message from local cache (if above certain confidence threshold)
}
2019-12-10 19:42:09 +00:00
log . Errorf ( "adding local message: %+v" , err )
2019-11-23 19:01:56 +00:00
}
2020-08-07 22:41:57 +00:00
mp . localAddrs [ sm . Message . From ] = struct { } { }
2019-11-23 19:01:56 +00:00
}
return nil
}
2020-08-21 17:28:45 +00:00
2020-08-21 20:24:53 +00:00
func ( mp * MessagePool ) Clear ( local bool ) {
2020-08-21 17:28:45 +00:00
mp . lk . Lock ( )
defer mp . lk . Unlock ( )
2020-08-21 20:24:53 +00:00
// remove everything if local is true, including removing local messages from
// the datastore
if local {
for a := range mp . localAddrs {
mset , ok := mp . pending [ a ]
if ! ok {
continue
}
2020-08-21 17:51:03 +00:00
2020-08-21 20:24:53 +00:00
for _ , m := range mset . msgs {
err := mp . localMsgs . Delete ( datastore . NewKey ( string ( m . Cid ( ) . Bytes ( ) ) ) )
if err != nil {
log . Warnf ( "error deleting local message: %s" , err )
}
2020-08-21 17:51:03 +00:00
}
}
2020-08-21 17:59:40 +00:00
mp . pending = make ( map [ address . Address ] * msgSet )
2020-08-21 20:24:53 +00:00
mp . republished = nil
return
2020-08-21 17:59:40 +00:00
}
2020-08-21 20:24:53 +00:00
// remove everything except the local messages
for a := range mp . pending {
_ , isLocal := mp . localAddrs [ a ]
if isLocal {
continue
}
delete ( mp . pending , a )
}
2020-08-21 17:28:45 +00:00
}