2019-12-01 23:11:43 +00:00
|
|
|
package messagepool
|
2019-07-05 14:46:21 +00:00
|
|
|
|
|
|
|
import (
|
2019-11-23 19:01:56 +00:00
|
|
|
"bytes"
|
2019-11-17 07:44:06 +00:00
|
|
|
"context"
|
|
|
|
"errors"
|
2020-05-15 17:56:38 +00:00
|
|
|
"fmt"
|
2020-03-09 22:46:00 +00:00
|
|
|
"math"
|
2019-11-13 22:41:39 +00:00
|
|
|
"sort"
|
2019-07-05 14:46:21 +00:00
|
|
|
"sync"
|
2019-11-13 21:53:18 +00:00
|
|
|
"time"
|
2019-07-08 12:51:45 +00:00
|
|
|
|
2020-07-29 16:30:08 +00:00
|
|
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
2020-02-12 23:52:36 +00:00
|
|
|
"github.com/filecoin-project/specs-actors/actors/crypto"
|
2020-08-21 17:17:30 +00:00
|
|
|
"github.com/hashicorp/go-multierror"
|
2019-11-12 07:16:42 +00:00
|
|
|
lru "github.com/hashicorp/golang-lru"
|
2019-12-01 22:22:10 +00:00
|
|
|
"github.com/ipfs/go-cid"
|
2019-11-23 19:01:56 +00:00
|
|
|
"github.com/ipfs/go-datastore"
|
|
|
|
"github.com/ipfs/go-datastore/namespace"
|
|
|
|
"github.com/ipfs/go-datastore/query"
|
2020-01-08 19:10:57 +00:00
|
|
|
logging "github.com/ipfs/go-log/v2"
|
2020-08-10 16:53:56 +00:00
|
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
2019-11-17 07:44:06 +00:00
|
|
|
lps "github.com/whyrusleeping/pubsub"
|
2019-09-20 09:01:49 +00:00
|
|
|
"golang.org/x/xerrors"
|
2019-09-18 02:50:03 +00:00
|
|
|
|
2019-12-19 20:13:17 +00:00
|
|
|
"github.com/filecoin-project/go-address"
|
2020-07-10 14:43:14 +00:00
|
|
|
|
2019-11-17 07:44:06 +00:00
|
|
|
"github.com/filecoin-project/lotus/api"
|
2019-11-12 07:16:42 +00:00
|
|
|
"github.com/filecoin-project/lotus/build"
|
2020-08-25 00:21:03 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/store"
|
2019-10-18 04:47:41 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/types"
|
2020-07-29 16:30:08 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/vm"
|
2020-01-30 23:48:25 +00:00
|
|
|
"github.com/filecoin-project/lotus/lib/sigs"
|
2019-11-23 19:01:56 +00:00
|
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
2020-07-10 14:43:14 +00:00
|
|
|
|
|
|
|
"github.com/raulk/clock"
|
2019-07-05 14:46:21 +00:00
|
|
|
)
|
|
|
|
|
2019-12-01 23:11:43 +00:00
|
|
|
var log = logging.Logger("messagepool")
|
|
|
|
|
2020-03-09 22:46:00 +00:00
|
|
|
const futureDebug = false
|
|
|
|
|
2020-08-12 18:24:35 +00:00
|
|
|
var rbfNumBig = types.NewInt(uint64((ReplaceByFeeRatioDefault - 1) * RbfDenom))
|
|
|
|
var rbfDenomBig = types.NewInt(RbfDenom)
|
|
|
|
|
2020-08-07 14:33:55 +00:00
|
|
|
const RbfDenom = 256
|
2020-04-01 20:26:14 +00:00
|
|
|
|
2020-08-10 16:53:56 +00:00
|
|
|
var RepublishInterval = pubsub.TimeCacheDuration + time.Duration(5*build.BlockDelaySecs+build.PropagationDelaySecs)*time.Second
|
2020-08-10 08:18:12 +00:00
|
|
|
|
2019-10-13 13:03:15 +00:00
|
|
|
// Sentinel errors produced by the message pool. They are usually returned
// wrapped with extra context, so match them with errors.Is / xerrors.Is.
var (
	// ErrMessageTooBig is returned by checkMessage for oversized messages.
	ErrMessageTooBig = errors.New("message too big")

	// ErrMessageValueTooHigh is returned by checkMessage when the message
	// value is not below the total filecoin supply.
	ErrMessageValueTooHigh = errors.New("cannot send more filecoin than will ever exist")

	// ErrNonceTooLow is returned by addTs when the sender's state nonce is
	// already past the message nonce.
	ErrNonceTooLow = errors.New("message nonce too low")

	// ErrNotEnoughFunds is returned by addTs when the sender's balance is
	// below the funds required by the message.
	ErrNotEnoughFunds = errors.New("not enough funds to execute transaction")

	// ErrInvalidToAddr is returned by checkMessage for an undefined recipient.
	ErrInvalidToAddr = errors.New("message had invalid to address")

	// ErrBroadcastAnyway wraps state lookup failures in addTs.
	ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail")

	// ErrRBFTooLowPremium is returned when a same-nonce replacement does not
	// raise the gas premium enough.
	ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")

	// ErrTryAgain is returned when the head or nonce changed while a push was
	// in flight; the caller should simply retry.
	ErrTryAgain = errors.New("state inconsistency while pushing message; please try again")
)
|
|
|
|
|
2019-11-13 21:53:18 +00:00
|
|
|
const (
	// localMsgsDs is the datastore key prefix under which locally pushed
	// messages are persisted.
	localMsgsDs = "/mpool/local"

	// localUpdates is the topic on which mpool add/remove updates are
	// published through the changes pubsub.
	localUpdates = "update"
)
|
|
|
|
|
2019-07-05 14:46:21 +00:00
|
|
|
type MessagePool struct {
|
|
|
|
lk sync.Mutex
|
|
|
|
|
2020-08-07 17:10:09 +00:00
|
|
|
ds dtypes.MetadataDS
|
|
|
|
|
2020-08-12 07:38:40 +00:00
|
|
|
addSema chan struct{}
|
|
|
|
|
2020-08-17 07:03:39 +00:00
|
|
|
closer chan struct{}
|
|
|
|
|
|
|
|
repubTk *clock.Ticker
|
|
|
|
repubTrigger chan struct{}
|
|
|
|
|
|
|
|
republished map[cid.Cid]struct{}
|
2019-11-13 21:53:18 +00:00
|
|
|
|
|
|
|
localAddrs map[address.Address]struct{}
|
|
|
|
|
2019-12-05 02:04:09 +00:00
|
|
|
pending map[address.Address]*msgSet
|
2019-07-05 14:46:21 +00:00
|
|
|
|
2019-12-03 19:33:29 +00:00
|
|
|
curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
|
2019-12-01 22:22:10 +00:00
|
|
|
curTs *types.TipSet
|
2019-09-16 14:17:08 +00:00
|
|
|
|
2020-08-07 14:33:55 +00:00
|
|
|
cfgLk sync.Mutex
|
|
|
|
cfg *types.MpoolConfig
|
|
|
|
|
2019-12-02 19:39:50 +00:00
|
|
|
api Provider
|
2019-10-13 13:03:15 +00:00
|
|
|
|
|
|
|
minGasPrice types.BigInt
|
|
|
|
|
2020-08-07 14:33:55 +00:00
|
|
|
currentSize int
|
2020-07-16 22:28:35 +00:00
|
|
|
|
|
|
|
// pruneTrigger is a channel used to trigger a mempool pruning
|
|
|
|
pruneTrigger chan struct{}
|
2019-11-12 07:16:42 +00:00
|
|
|
|
2020-08-07 16:50:10 +00:00
|
|
|
// pruneCooldown is a channel used to allow a cooldown time between prunes
|
|
|
|
pruneCooldown chan struct{}
|
|
|
|
|
2019-11-12 07:16:42 +00:00
|
|
|
blsSigCache *lru.TwoQueueCache
|
2019-11-17 07:44:06 +00:00
|
|
|
|
|
|
|
changes *lps.PubSub
|
2019-11-23 19:01:56 +00:00
|
|
|
|
|
|
|
localMsgs datastore.Datastore
|
2020-03-31 23:13:37 +00:00
|
|
|
|
|
|
|
netName dtypes.NetworkName
|
2020-05-15 17:56:38 +00:00
|
|
|
|
|
|
|
sigValCache *lru.TwoQueueCache
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type msgSet struct {
|
2019-12-05 02:04:09 +00:00
|
|
|
msgs map[uint64]*types.SignedMessage
|
|
|
|
nextNonce uint64
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func newMsgSet() *msgSet {
|
|
|
|
return &msgSet{
|
2019-07-25 22:15:03 +00:00
|
|
|
msgs: make(map[uint64]*types.SignedMessage),
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-07 14:33:55 +00:00
|
|
|
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool) (bool, error) {
|
2019-09-20 09:01:49 +00:00
|
|
|
if len(ms.msgs) == 0 || m.Message.Nonce >= ms.nextNonce {
|
|
|
|
ms.nextNonce = m.Message.Nonce + 1
|
|
|
|
}
|
2020-04-01 20:26:14 +00:00
|
|
|
exms, has := ms.msgs[m.Message.Nonce]
|
|
|
|
if has {
|
|
|
|
if m.Cid() != exms.Cid() {
|
|
|
|
// check if RBF passes
|
2020-08-06 21:08:42 +00:00
|
|
|
minPrice := exms.Message.GasPremium
|
2020-08-12 18:24:35 +00:00
|
|
|
minPrice = types.BigAdd(minPrice, types.BigDiv(types.BigMul(minPrice, rbfNumBig), rbfDenomBig))
|
2020-04-01 20:26:14 +00:00
|
|
|
minPrice = types.BigAdd(minPrice, types.NewInt(1))
|
2020-08-06 21:08:42 +00:00
|
|
|
if types.BigCmp(m.Message.GasPremium, minPrice) >= 0 {
|
|
|
|
log.Infow("add with RBF", "oldpremium", exms.Message.GasPremium,
|
|
|
|
"newpremium", m.Message.GasPremium, "addr", m.Message.From, "nonce", m.Message.Nonce)
|
2020-04-01 20:26:14 +00:00
|
|
|
} else {
|
|
|
|
log.Info("add with duplicate nonce")
|
2020-07-16 22:28:35 +00:00
|
|
|
return false, xerrors.Errorf("message from %s with nonce %d already in mpool,"+
|
2020-08-09 01:17:40 +00:00
|
|
|
" increase GasPremium to %s from %s to trigger replace by fee: %w",
|
|
|
|
m.Message.From, m.Message.Nonce, minPrice, m.Message.GasPremium,
|
|
|
|
ErrRBFTooLowPremium)
|
2020-04-01 20:26:14 +00:00
|
|
|
}
|
2019-09-20 09:01:49 +00:00
|
|
|
}
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
|
|
|
ms.msgs[m.Message.Nonce] = m
|
2019-09-20 09:01:49 +00:00
|
|
|
|
2020-07-16 22:28:35 +00:00
|
|
|
return !has, nil
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
|
|
|
|
2020-03-31 23:13:37 +00:00
|
|
|
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
|
2019-11-12 07:16:42 +00:00
|
|
|
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
2020-05-15 17:56:38 +00:00
|
|
|
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
|
2020-08-07 17:10:09 +00:00
|
|
|
|
|
|
|
cfg, err := loadConfig(ds)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return nil, xerrors.Errorf("error loading mpool config: %w", err)
|
|
|
|
}
|
|
|
|
}
|
2020-05-15 17:56:38 +00:00
|
|
|
|
2019-07-05 14:46:21 +00:00
|
|
|
mp := &MessagePool{
|
2020-08-07 17:10:09 +00:00
|
|
|
ds: ds,
|
2020-08-12 07:38:40 +00:00
|
|
|
addSema: make(chan struct{}, 1),
|
2020-08-07 16:50:10 +00:00
|
|
|
closer: make(chan struct{}),
|
2020-08-10 08:18:12 +00:00
|
|
|
repubTk: build.Clock.Ticker(RepublishInterval),
|
2020-08-17 07:03:39 +00:00
|
|
|
repubTrigger: make(chan struct{}, 1),
|
2020-08-07 16:50:10 +00:00
|
|
|
localAddrs: make(map[address.Address]struct{}),
|
|
|
|
pending: make(map[address.Address]*msgSet),
|
|
|
|
minGasPrice: types.NewInt(0),
|
|
|
|
pruneTrigger: make(chan struct{}, 1),
|
|
|
|
pruneCooldown: make(chan struct{}, 1),
|
|
|
|
blsSigCache: cache,
|
|
|
|
sigValCache: verifcache,
|
|
|
|
changes: lps.New(50),
|
|
|
|
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
|
|
|
|
api: api,
|
|
|
|
netName: netName,
|
|
|
|
cfg: cfg,
|
2019-11-23 19:01:56 +00:00
|
|
|
}
|
|
|
|
|
2020-08-07 16:50:10 +00:00
|
|
|
// enable initial prunes
|
|
|
|
mp.pruneCooldown <- struct{}{}
|
|
|
|
|
2020-08-25 05:49:15 +00:00
|
|
|
// load the current tipset and subscribe to head changes _before_ loading local messages
|
2019-12-03 19:33:29 +00:00
|
|
|
mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
|
2019-11-13 14:48:57 +00:00
|
|
|
err := mp.HeadChange(rev, app)
|
|
|
|
if err != nil {
|
|
|
|
log.Errorf("mpool head notif handler error: %+v", err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
})
|
2019-07-05 14:46:21 +00:00
|
|
|
|
2020-08-25 05:49:15 +00:00
|
|
|
if err := mp.loadLocal(); err != nil {
|
|
|
|
log.Errorf("loading local messages: %+v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
go mp.runLoop()
|
|
|
|
|
2019-11-23 19:01:56 +00:00
|
|
|
return mp, nil
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
|
|
|
|
2019-11-13 21:53:18 +00:00
|
|
|
func (mp *MessagePool) Close() error {
|
|
|
|
close(mp.closer)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-08-01 22:54:21 +00:00
|
|
|
func (mp *MessagePool) Prune() {
|
2020-08-07 16:50:10 +00:00
|
|
|
// this magic incantation of triggering prune thrice is here to make the Prune method
|
|
|
|
// synchronous:
|
|
|
|
// so, its a single slot buffered channel. The first send fills the channel,
|
|
|
|
// the second send goes through when the pruning starts,
|
|
|
|
// and the third send goes through (and noops) after the pruning finishes
|
|
|
|
// and goes through the loop again
|
2020-08-01 22:54:21 +00:00
|
|
|
mp.pruneTrigger <- struct{}{}
|
|
|
|
mp.pruneTrigger <- struct{}{}
|
|
|
|
mp.pruneTrigger <- struct{}{}
|
|
|
|
}
|
|
|
|
|
2020-07-16 22:28:35 +00:00
|
|
|
func (mp *MessagePool) runLoop() {
|
2019-11-13 21:53:18 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-mp.repubTk.C:
|
2020-08-10 08:07:36 +00:00
|
|
|
if err := mp.republishPendingMessages(); err != nil {
|
|
|
|
log.Errorf("error while republishing messages: %s", err)
|
2019-11-13 21:53:18 +00:00
|
|
|
}
|
2020-08-17 07:03:39 +00:00
|
|
|
case <-mp.repubTrigger:
|
|
|
|
if err := mp.republishPendingMessages(); err != nil {
|
|
|
|
log.Errorf("error while republishing messages: %s", err)
|
|
|
|
}
|
2020-07-16 22:28:35 +00:00
|
|
|
case <-mp.pruneTrigger:
|
|
|
|
if err := mp.pruneExcessMessages(); err != nil {
|
|
|
|
log.Errorf("failed to prune excess messages from mempool: %s", err)
|
|
|
|
}
|
2019-11-13 21:53:18 +00:00
|
|
|
case <-mp.closer:
|
|
|
|
mp.repubTk.Stop()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-23 19:01:56 +00:00
|
|
|
func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error {
|
|
|
|
mp.localAddrs[m.Message.From] = struct{}{}
|
|
|
|
|
|
|
|
if err := mp.localMsgs.Put(datastore.NewKey(string(m.Cid().Bytes())), msgb); err != nil {
|
|
|
|
return xerrors.Errorf("persisting local message: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2019-11-13 21:53:18 +00:00
|
|
|
}
|
|
|
|
|
2020-07-29 16:30:08 +00:00
|
|
|
func (mp *MessagePool) verifyMsgBeforePush(m *types.SignedMessage, epoch abi.ChainEpoch) error {
|
|
|
|
minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength())
|
|
|
|
|
|
|
|
if err := m.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
|
|
|
|
return xerrors.Errorf("message will not be included in a block: %w", err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-01-07 16:44:55 +00:00
|
|
|
func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
2020-08-18 07:19:46 +00:00
|
|
|
err := mp.checkMessage(m)
|
|
|
|
if err != nil {
|
|
|
|
return cid.Undef, err
|
|
|
|
}
|
|
|
|
|
2020-08-12 07:38:40 +00:00
|
|
|
// serialize push access to reduce lock contention
|
|
|
|
mp.addSema <- struct{}{}
|
|
|
|
defer func() {
|
|
|
|
<-mp.addSema
|
|
|
|
}()
|
|
|
|
|
2020-07-29 16:30:08 +00:00
|
|
|
mp.curTsLk.Lock()
|
2020-08-18 07:19:46 +00:00
|
|
|
curTs := mp.curTs
|
|
|
|
epoch := curTs.Height()
|
2020-07-29 16:30:08 +00:00
|
|
|
mp.curTsLk.Unlock()
|
|
|
|
if err := mp.verifyMsgBeforePush(m, epoch); err != nil {
|
|
|
|
return cid.Undef, err
|
|
|
|
}
|
|
|
|
|
2019-09-16 14:17:08 +00:00
|
|
|
msgb, err := m.Serialize()
|
|
|
|
if err != nil {
|
2020-01-07 16:44:55 +00:00
|
|
|
return cid.Undef, err
|
2019-09-16 14:17:08 +00:00
|
|
|
}
|
|
|
|
|
2020-08-18 07:19:46 +00:00
|
|
|
mp.curTsLk.Lock()
|
|
|
|
if mp.curTs != curTs {
|
|
|
|
mp.curTsLk.Unlock()
|
|
|
|
return cid.Undef, ErrTryAgain
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := mp.addTs(m, mp.curTs); err != nil {
|
|
|
|
mp.curTsLk.Unlock()
|
2020-01-07 16:44:55 +00:00
|
|
|
return cid.Undef, err
|
2019-09-16 14:17:08 +00:00
|
|
|
}
|
2020-08-18 07:19:46 +00:00
|
|
|
mp.curTsLk.Unlock()
|
2019-09-16 14:17:08 +00:00
|
|
|
|
2019-11-13 21:53:18 +00:00
|
|
|
mp.lk.Lock()
|
2019-11-23 19:01:56 +00:00
|
|
|
if err := mp.addLocal(m, msgb); err != nil {
|
|
|
|
mp.lk.Unlock()
|
2020-01-07 16:44:55 +00:00
|
|
|
return cid.Undef, err
|
2019-11-23 19:01:56 +00:00
|
|
|
}
|
2019-11-13 21:53:18 +00:00
|
|
|
mp.lk.Unlock()
|
|
|
|
|
2020-03-31 23:13:37 +00:00
|
|
|
return m.Cid(), mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
|
2019-09-16 14:17:08 +00:00
|
|
|
}
|
|
|
|
|
2020-08-18 07:19:46 +00:00
|
|
|
func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
|
2019-10-13 13:03:15 +00:00
|
|
|
// big messages are bad, anti DOS
|
|
|
|
if m.Size() > 32*1024 {
|
2019-11-13 14:48:57 +00:00
|
|
|
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
|
2019-10-13 13:03:15 +00:00
|
|
|
}
|
|
|
|
|
2020-08-25 01:13:43 +00:00
|
|
|
// Perform syntaxtic validation, minGas=0 as we check if correctly in select messages
|
2020-08-24 23:01:16 +00:00
|
|
|
if err := m.Message.ValidForBlockInclusion(0); err != nil {
|
2020-08-25 01:14:52 +00:00
|
|
|
return xerrors.Errorf("message not valid for block inclusion: %w", err)
|
2020-08-24 23:01:16 +00:00
|
|
|
}
|
|
|
|
|
2019-10-14 03:28:19 +00:00
|
|
|
if m.Message.To == address.Undef {
|
|
|
|
return ErrInvalidToAddr
|
|
|
|
}
|
|
|
|
|
2019-10-13 13:03:15 +00:00
|
|
|
if !m.Message.Value.LessThan(types.TotalFilecoinInt) {
|
|
|
|
return ErrMessageValueTooHigh
|
|
|
|
}
|
|
|
|
|
2020-05-15 17:56:38 +00:00
|
|
|
if err := mp.VerifyMsgSig(m); err != nil {
|
2019-10-13 13:03:15 +00:00
|
|
|
log.Warnf("mpooladd signature verification failed: %s", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-08-18 07:19:46 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
|
|
|
err := mp.checkMessage(m)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-08-12 07:38:40 +00:00
|
|
|
// serialize push access to reduce lock contention
|
|
|
|
mp.addSema <- struct{}{}
|
|
|
|
defer func() {
|
|
|
|
<-mp.addSema
|
|
|
|
}()
|
|
|
|
|
2020-05-15 17:56:38 +00:00
|
|
|
mp.curTsLk.Lock()
|
|
|
|
defer mp.curTsLk.Unlock()
|
|
|
|
return mp.addTs(m, mp.curTs)
|
|
|
|
}
|
|
|
|
|
|
|
|
func sigCacheKey(m *types.SignedMessage) (string, error) {
|
|
|
|
switch m.Signature.Type {
|
|
|
|
case crypto.SigTypeBLS:
|
|
|
|
if len(m.Signature.Data) < 90 {
|
|
|
|
return "", fmt.Errorf("bls signature too short")
|
|
|
|
}
|
|
|
|
|
|
|
|
return string(m.Cid().Bytes()) + string(m.Signature.Data[64:]), nil
|
|
|
|
case crypto.SigTypeSecp256k1:
|
|
|
|
return string(m.Cid().Bytes()), nil
|
|
|
|
default:
|
|
|
|
return "", xerrors.Errorf("unrecognized signature type: %d", m.Signature.Type)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error {
|
|
|
|
sck, err := sigCacheKey(m)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
_, ok := mp.sigValCache.Get(sck)
|
|
|
|
if ok {
|
|
|
|
// already validated, great
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := sigs.Verify(&m.Signature, m.Message.From, m.Message.Cid().Bytes()); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
mp.sigValCache.Add(sck, struct{}{})
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error {
|
2019-12-03 19:33:29 +00:00
|
|
|
snonce, err := mp.getStateNonce(m.Message.From, curTs)
|
2019-10-13 13:03:15 +00:00
|
|
|
if err != nil {
|
2020-03-01 04:15:02 +00:00
|
|
|
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrBroadcastAnyway)
|
2019-10-13 13:03:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if snonce > m.Message.Nonce {
|
2019-11-13 14:48:57 +00:00
|
|
|
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
|
2019-10-13 13:03:15 +00:00
|
|
|
}
|
|
|
|
|
2019-12-07 11:41:30 +00:00
|
|
|
balance, err := mp.getStateBalance(m.Message.From, curTs)
|
2019-10-13 13:03:15 +00:00
|
|
|
if err != nil {
|
2020-03-01 04:15:02 +00:00
|
|
|
return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrBroadcastAnyway)
|
2019-10-13 13:03:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if balance.LessThan(m.Message.RequiredFunds()) {
|
2019-11-13 14:48:57 +00:00
|
|
|
return xerrors.Errorf("not enough funds (required: %s, balance: %s): %w", types.FIL(m.Message.RequiredFunds()), types.FIL(balance), ErrNotEnoughFunds)
|
2019-10-13 13:03:15 +00:00
|
|
|
}
|
|
|
|
|
2019-07-05 14:46:21 +00:00
|
|
|
mp.lk.Lock()
|
|
|
|
defer mp.lk.Unlock()
|
|
|
|
|
2019-09-16 14:17:08 +00:00
|
|
|
return mp.addLocked(m)
|
|
|
|
}
|
|
|
|
|
2019-12-07 11:41:30 +00:00
|
|
|
func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
|
|
|
|
mp.lk.Lock()
|
|
|
|
defer mp.lk.Unlock()
|
|
|
|
|
|
|
|
return mp.addLocked(m)
|
|
|
|
}
|
|
|
|
|
2019-09-16 14:17:08 +00:00
|
|
|
func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
|
2020-03-09 22:46:00 +00:00
|
|
|
log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce)
|
2020-02-12 23:52:36 +00:00
|
|
|
if m.Signature.Type == crypto.SigTypeBLS {
|
2019-11-12 07:16:42 +00:00
|
|
|
mp.blsSigCache.Add(m.Cid(), m.Signature)
|
|
|
|
}
|
2019-08-09 15:59:12 +00:00
|
|
|
|
2020-05-13 05:36:43 +00:00
|
|
|
if m.Message.GasLimit > build.BlockGasLimit {
|
|
|
|
return xerrors.Errorf("given message has too high of a gas limit")
|
|
|
|
}
|
|
|
|
|
2019-12-01 22:22:10 +00:00
|
|
|
if _, err := mp.api.PutMessage(m); err != nil {
|
2019-09-20 09:01:49 +00:00
|
|
|
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
2019-07-05 14:46:21 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-12-01 22:22:10 +00:00
|
|
|
if _, err := mp.api.PutMessage(&m.Message); err != nil {
|
2019-11-24 16:35:50 +00:00
|
|
|
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-07-05 14:46:21 +00:00
|
|
|
mset, ok := mp.pending[m.Message.From]
|
|
|
|
if !ok {
|
|
|
|
mset = newMsgSet()
|
|
|
|
mp.pending[m.Message.From] = mset
|
|
|
|
}
|
|
|
|
|
2020-08-07 14:33:55 +00:00
|
|
|
incr, err := mset.add(m, mp)
|
2020-07-16 22:28:35 +00:00
|
|
|
if err != nil {
|
2020-03-10 20:06:58 +00:00
|
|
|
log.Info(err)
|
2020-08-09 01:17:40 +00:00
|
|
|
return err
|
2020-07-16 22:28:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if incr {
|
|
|
|
mp.currentSize++
|
2020-08-07 14:33:55 +00:00
|
|
|
if mp.currentSize > mp.cfg.SizeLimitHigh {
|
2020-07-16 22:28:35 +00:00
|
|
|
// send signal to prune messages if it hasnt already been sent
|
|
|
|
select {
|
|
|
|
case mp.pruneTrigger <- struct{}{}:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
2019-11-23 19:01:56 +00:00
|
|
|
}
|
2019-11-17 07:44:06 +00:00
|
|
|
|
|
|
|
mp.changes.Pub(api.MpoolUpdate{
|
|
|
|
Type: api.MpoolAdd,
|
|
|
|
Message: m,
|
|
|
|
}, localUpdates)
|
2019-07-05 14:46:21 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-07-17 06:05:11 +00:00
|
|
|
func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
|
2019-12-03 19:33:29 +00:00
|
|
|
mp.curTsLk.Lock()
|
2019-12-03 21:09:39 +00:00
|
|
|
defer mp.curTsLk.Unlock()
|
2019-12-03 19:33:29 +00:00
|
|
|
|
2019-07-17 06:05:11 +00:00
|
|
|
mp.lk.Lock()
|
|
|
|
defer mp.lk.Unlock()
|
|
|
|
|
2019-12-03 19:33:29 +00:00
|
|
|
return mp.getNonceLocked(addr, mp.curTs)
|
2019-09-16 14:17:08 +00:00
|
|
|
}
|
|
|
|
|
2019-12-03 19:33:29 +00:00
|
|
|
func (mp *MessagePool) getNonceLocked(addr address.Address, curTs *types.TipSet) (uint64, error) {
|
|
|
|
stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check
|
2019-11-23 01:26:32 +00:00
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
2019-07-17 06:05:11 +00:00
|
|
|
mset, ok := mp.pending[addr]
|
|
|
|
if ok {
|
2019-11-23 01:26:32 +00:00
|
|
|
if stateNonce > mset.nextNonce {
|
2019-11-24 16:35:50 +00:00
|
|
|
log.Errorf("state nonce was larger than mset.nextNonce (%d > %d)", stateNonce, mset.nextNonce)
|
2019-11-23 01:26:32 +00:00
|
|
|
|
|
|
|
return stateNonce, nil
|
|
|
|
}
|
|
|
|
|
2019-09-20 09:01:49 +00:00
|
|
|
return mset.nextNonce, nil
|
2019-07-17 06:05:11 +00:00
|
|
|
}
|
|
|
|
|
2019-11-23 01:26:32 +00:00
|
|
|
return stateNonce, nil
|
2019-10-13 13:03:15 +00:00
|
|
|
}
|
|
|
|
|
2019-12-03 19:33:29 +00:00
|
|
|
func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) (uint64, error) {
|
2020-08-25 02:02:06 +00:00
|
|
|
act, err := mp.api.GetActorAfter(addr, curTs)
|
2019-07-17 06:05:11 +00:00
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
2020-08-25 02:02:06 +00:00
|
|
|
return act.Nonce, nil
|
2019-07-17 06:05:11 +00:00
|
|
|
}
|
|
|
|
|
2019-12-07 11:41:30 +00:00
|
|
|
func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) {
|
2020-08-25 02:02:06 +00:00
|
|
|
act, err := mp.api.GetActorAfter(addr, ts)
|
2019-10-13 13:03:15 +00:00
|
|
|
if err != nil {
|
|
|
|
return types.EmptyInt, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return act.Balance, nil
|
|
|
|
}
|
|
|
|
|
2020-04-16 21:06:31 +00:00
|
|
|
func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address, cb func(address.Address, uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
|
2020-08-12 07:38:40 +00:00
|
|
|
// serialize push access to reduce lock contention
|
|
|
|
mp.addSema <- struct{}{}
|
|
|
|
defer func() {
|
|
|
|
<-mp.addSema
|
|
|
|
}()
|
2019-12-03 19:33:29 +00:00
|
|
|
|
|
|
|
mp.curTsLk.Lock()
|
2019-09-16 14:17:08 +00:00
|
|
|
mp.lk.Lock()
|
2020-08-12 17:26:58 +00:00
|
|
|
|
|
|
|
curTs := mp.curTs
|
2020-04-16 21:06:31 +00:00
|
|
|
|
|
|
|
fromKey := addr
|
|
|
|
if fromKey.Protocol() == address.ID {
|
2020-04-18 00:25:43 +00:00
|
|
|
var err error
|
2020-04-16 21:06:31 +00:00
|
|
|
fromKey, err = mp.api.StateAccountKey(ctx, fromKey, mp.curTs)
|
|
|
|
if err != nil {
|
2020-08-12 17:26:58 +00:00
|
|
|
mp.lk.Unlock()
|
|
|
|
mp.curTsLk.Unlock()
|
2020-04-16 21:06:31 +00:00
|
|
|
return nil, xerrors.Errorf("resolving sender key: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-18 00:25:43 +00:00
|
|
|
nonce, err := mp.getNonceLocked(fromKey, mp.curTs)
|
|
|
|
if err != nil {
|
2020-08-12 17:26:58 +00:00
|
|
|
mp.lk.Unlock()
|
|
|
|
mp.curTsLk.Unlock()
|
2020-04-18 00:25:43 +00:00
|
|
|
return nil, xerrors.Errorf("get nonce locked failed: %w", err)
|
|
|
|
}
|
|
|
|
|
2020-08-12 17:26:58 +00:00
|
|
|
// release the locks for signing
|
|
|
|
mp.lk.Unlock()
|
|
|
|
mp.curTsLk.Unlock()
|
|
|
|
|
2020-04-16 21:06:31 +00:00
|
|
|
msg, err := cb(fromKey, nonce)
|
2019-09-16 14:17:08 +00:00
|
|
|
if err != nil {
|
2019-09-17 08:15:26 +00:00
|
|
|
return nil, err
|
2019-09-16 14:17:08 +00:00
|
|
|
}
|
|
|
|
|
2020-08-12 17:26:58 +00:00
|
|
|
// reacquire the locks and check state for consistency
|
|
|
|
mp.curTsLk.Lock()
|
|
|
|
defer mp.curTsLk.Unlock()
|
|
|
|
|
|
|
|
if mp.curTs != curTs {
|
|
|
|
return nil, ErrTryAgain
|
|
|
|
}
|
|
|
|
|
|
|
|
mp.lk.Lock()
|
|
|
|
defer mp.lk.Unlock()
|
|
|
|
|
|
|
|
nonce2, err := mp.getNonceLocked(fromKey, mp.curTs)
|
|
|
|
if err != nil {
|
|
|
|
return nil, xerrors.Errorf("get nonce locked failed: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if nonce2 != nonce {
|
|
|
|
return nil, ErrTryAgain
|
|
|
|
}
|
|
|
|
|
2020-07-29 16:30:08 +00:00
|
|
|
if err := mp.verifyMsgBeforePush(msg, mp.curTs.Height()); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2019-09-16 14:17:08 +00:00
|
|
|
msgb, err := msg.Serialize()
|
|
|
|
if err != nil {
|
2019-09-17 08:15:26 +00:00
|
|
|
return nil, err
|
2019-09-16 14:17:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if err := mp.addLocked(msg); err != nil {
|
2020-04-01 01:34:23 +00:00
|
|
|
return nil, xerrors.Errorf("add locked failed: %w", err)
|
2019-09-16 14:17:08 +00:00
|
|
|
}
|
2019-11-24 16:35:50 +00:00
|
|
|
if err := mp.addLocal(msg, msgb); err != nil {
|
|
|
|
log.Errorf("addLocal failed: %+v", err)
|
|
|
|
}
|
2019-09-16 14:17:08 +00:00
|
|
|
|
2020-03-31 23:13:37 +00:00
|
|
|
return msg, mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
|
2019-09-16 14:17:08 +00:00
|
|
|
}
|
|
|
|
|
2019-08-14 04:43:29 +00:00
|
|
|
func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
|
2019-07-05 14:46:21 +00:00
|
|
|
mp.lk.Lock()
|
|
|
|
defer mp.lk.Unlock()
|
|
|
|
|
2020-08-01 22:54:21 +00:00
|
|
|
mp.remove(from, nonce)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mp *MessagePool) remove(from address.Address, nonce uint64) {
|
2019-08-14 04:43:29 +00:00
|
|
|
mset, ok := mp.pending[from]
|
2019-07-05 14:46:21 +00:00
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2019-11-19 21:26:25 +00:00
|
|
|
if m, ok := mset.msgs[nonce]; ok {
|
|
|
|
mp.changes.Pub(api.MpoolUpdate{
|
|
|
|
Type: api.MpoolRemove,
|
|
|
|
Message: m,
|
|
|
|
}, localUpdates)
|
2020-07-16 22:28:35 +00:00
|
|
|
|
|
|
|
mp.currentSize--
|
2019-11-19 21:26:25 +00:00
|
|
|
}
|
2019-11-17 07:44:06 +00:00
|
|
|
|
2019-07-05 14:46:21 +00:00
|
|
|
// NB: This deletes any message with the given nonce. This makes sense
|
|
|
|
// as two messages with the same sender cannot have the same nonce
|
2019-08-14 04:43:29 +00:00
|
|
|
delete(mset.msgs, nonce)
|
2019-07-05 14:46:21 +00:00
|
|
|
|
|
|
|
if len(mset.msgs) == 0 {
|
2019-11-23 01:26:32 +00:00
|
|
|
delete(mp.pending, from)
|
2019-09-20 09:01:49 +00:00
|
|
|
} else {
|
|
|
|
var max uint64
|
|
|
|
for nonce := range mset.msgs {
|
|
|
|
if max < nonce {
|
|
|
|
max = nonce
|
|
|
|
}
|
|
|
|
}
|
2019-11-27 12:18:22 +00:00
|
|
|
if max < nonce {
|
2019-11-24 16:35:50 +00:00
|
|
|
max = nonce // we could have not seen the removed message before
|
|
|
|
}
|
|
|
|
|
2019-09-20 09:01:49 +00:00
|
|
|
mset.nextNonce = max + 1
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-12-03 19:33:29 +00:00
|
|
|
func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) {
|
|
|
|
mp.curTsLk.Lock()
|
|
|
|
defer mp.curTsLk.Unlock()
|
|
|
|
|
2019-07-05 14:46:21 +00:00
|
|
|
mp.lk.Lock()
|
|
|
|
defer mp.lk.Unlock()
|
2019-12-03 19:33:29 +00:00
|
|
|
|
2019-09-06 22:32:42 +00:00
|
|
|
out := make([]*types.SignedMessage, 0)
|
2019-11-13 21:53:18 +00:00
|
|
|
for a := range mp.pending {
|
|
|
|
out = append(out, mp.pendingFor(a)...)
|
|
|
|
}
|
2019-09-20 09:01:49 +00:00
|
|
|
|
2019-12-03 19:33:29 +00:00
|
|
|
return out, mp.curTs
|
2019-11-13 21:53:18 +00:00
|
|
|
}
|
2020-07-22 15:46:13 +00:00
|
|
|
func (mp *MessagePool) PendingFor(a address.Address) ([]*types.SignedMessage, *types.TipSet) {
|
|
|
|
mp.curTsLk.Lock()
|
|
|
|
defer mp.curTsLk.Unlock()
|
|
|
|
|
|
|
|
mp.lk.Lock()
|
|
|
|
defer mp.lk.Unlock()
|
|
|
|
return mp.pendingFor(a), mp.curTs
|
|
|
|
}
|
2019-09-20 09:01:49 +00:00
|
|
|
|
2019-11-13 21:53:18 +00:00
|
|
|
func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
|
|
|
|
mset := mp.pending[a]
|
|
|
|
if mset == nil || len(mset.msgs) == 0 {
|
|
|
|
return nil
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
|
|
|
|
2019-11-13 22:41:39 +00:00
|
|
|
set := make([]*types.SignedMessage, 0, len(mset.msgs))
|
2019-11-13 21:53:18 +00:00
|
|
|
|
2019-11-13 22:41:39 +00:00
|
|
|
for _, m := range mset.msgs {
|
|
|
|
set = append(set, m)
|
2019-11-13 21:53:18 +00:00
|
|
|
}
|
2019-11-13 22:41:39 +00:00
|
|
|
|
|
|
|
sort.Slice(set, func(i, j int) bool {
|
|
|
|
return set[i].Message.Nonce < set[j].Message.Nonce
|
|
|
|
})
|
|
|
|
|
|
|
|
return set
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
|
|
|
|
2019-07-26 04:54:22 +00:00
|
|
|
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
|
2019-12-03 19:33:29 +00:00
|
|
|
mp.curTsLk.Lock()
|
|
|
|
defer mp.curTsLk.Unlock()
|
2019-12-01 22:22:10 +00:00
|
|
|
|
2020-08-17 07:03:39 +00:00
|
|
|
repubTrigger := false
|
2019-12-07 11:41:30 +00:00
|
|
|
rmsgs := make(map[address.Address]map[uint64]*types.SignedMessage)
|
|
|
|
add := func(m *types.SignedMessage) {
|
|
|
|
s, ok := rmsgs[m.Message.From]
|
|
|
|
if !ok {
|
|
|
|
s = make(map[uint64]*types.SignedMessage)
|
|
|
|
rmsgs[m.Message.From] = s
|
|
|
|
}
|
|
|
|
s[m.Message.Nonce] = m
|
|
|
|
}
|
|
|
|
rm := func(from address.Address, nonce uint64) {
|
|
|
|
s, ok := rmsgs[from]
|
|
|
|
if !ok {
|
|
|
|
mp.Remove(from, nonce)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, ok := s[nonce]; ok {
|
|
|
|
delete(s, nonce)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
mp.Remove(from, nonce)
|
|
|
|
}
|
|
|
|
|
2020-08-17 07:03:39 +00:00
|
|
|
maybeRepub := func(cid cid.Cid) {
|
|
|
|
if !repubTrigger {
|
|
|
|
mp.lk.Lock()
|
|
|
|
_, republished := mp.republished[cid]
|
|
|
|
mp.lk.Unlock()
|
|
|
|
if republished {
|
|
|
|
repubTrigger = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-21 17:17:30 +00:00
|
|
|
var merr error
|
|
|
|
|
2019-07-05 14:46:21 +00:00
|
|
|
for _, ts := range revert {
|
2019-12-02 20:46:25 +00:00
|
|
|
pts, err := mp.api.LoadTipSet(ts.Parents())
|
|
|
|
if err != nil {
|
2020-08-21 17:17:30 +00:00
|
|
|
log.Errorf("error loading reverted tipset parent: %s", err)
|
|
|
|
merr = multierror.Append(merr, err)
|
|
|
|
continue
|
2019-12-02 20:46:25 +00:00
|
|
|
}
|
|
|
|
|
2020-08-21 17:17:30 +00:00
|
|
|
mp.curTs = pts
|
|
|
|
|
2019-12-03 19:33:29 +00:00
|
|
|
msgs, err := mp.MessagesForBlocks(ts.Blocks())
|
|
|
|
if err != nil {
|
2020-08-21 17:17:30 +00:00
|
|
|
log.Errorf("error retrieving messages for reverted block: %s", err)
|
|
|
|
merr = multierror.Append(merr, err)
|
|
|
|
continue
|
2019-12-03 19:33:29 +00:00
|
|
|
}
|
2019-08-01 20:40:47 +00:00
|
|
|
|
2019-12-03 19:33:29 +00:00
|
|
|
for _, msg := range msgs {
|
2019-12-07 11:41:30 +00:00
|
|
|
add(msg)
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, ts := range apply {
|
2020-08-21 17:17:30 +00:00
|
|
|
mp.curTs = ts
|
|
|
|
|
2019-07-05 14:46:21 +00:00
|
|
|
for _, b := range ts.Blocks() {
|
2019-12-01 22:22:10 +00:00
|
|
|
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
|
2019-07-05 14:46:21 +00:00
|
|
|
if err != nil {
|
2020-08-21 17:17:30 +00:00
|
|
|
xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
|
|
|
log.Errorf("error retrieving messages for block: %s", xerr)
|
|
|
|
merr = multierror.Append(merr, xerr)
|
|
|
|
continue
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
2020-08-21 17:17:30 +00:00
|
|
|
|
2019-08-01 20:40:47 +00:00
|
|
|
for _, msg := range smsgs {
|
2019-12-07 11:41:30 +00:00
|
|
|
rm(msg.Message.From, msg.Message.Nonce)
|
2020-08-17 07:03:39 +00:00
|
|
|
maybeRepub(msg.Cid())
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
2019-08-01 20:40:47 +00:00
|
|
|
|
|
|
|
for _, msg := range bmsgs {
|
2019-12-07 11:41:30 +00:00
|
|
|
rm(msg.From, msg.Nonce)
|
2020-08-17 07:03:39 +00:00
|
|
|
maybeRepub(msg.Cid())
|
2019-08-01 20:40:47 +00:00
|
|
|
}
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-17 07:03:39 +00:00
|
|
|
if repubTrigger {
|
|
|
|
select {
|
|
|
|
case mp.repubTrigger <- struct{}{}:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-12-07 11:41:30 +00:00
|
|
|
for _, s := range rmsgs {
|
|
|
|
for _, msg := range s {
|
|
|
|
if err := mp.addSkipChecks(msg); err != nil {
|
|
|
|
log.Errorf("Failed to readd message from reorg to mpool: %s", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-09 22:46:00 +00:00
|
|
|
if len(revert) > 0 && futureDebug {
|
|
|
|
msgs, ts := mp.Pending()
|
|
|
|
|
|
|
|
buckets := map[address.Address]*statBucket{}
|
|
|
|
|
|
|
|
for _, v := range msgs {
|
|
|
|
bkt, ok := buckets[v.Message.From]
|
|
|
|
if !ok {
|
|
|
|
bkt = &statBucket{
|
|
|
|
msgs: map[uint64]*types.SignedMessage{},
|
|
|
|
}
|
|
|
|
buckets[v.Message.From] = bkt
|
|
|
|
}
|
|
|
|
|
|
|
|
bkt.msgs[v.Message.Nonce] = v
|
|
|
|
}
|
|
|
|
|
|
|
|
for a, bkt := range buckets {
|
2020-08-25 02:02:06 +00:00
|
|
|
// TODO that might not be correct with GatActorAfter but it is only debug code
|
|
|
|
act, err := mp.api.GetActorAfter(a, ts)
|
2020-03-09 22:46:00 +00:00
|
|
|
if err != nil {
|
|
|
|
log.Debugf("%s, err: %s\n", a, err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
var cmsg *types.SignedMessage
|
|
|
|
var ok bool
|
|
|
|
|
|
|
|
cur := act.Nonce
|
|
|
|
for {
|
|
|
|
cmsg, ok = bkt.msgs[cur]
|
|
|
|
if !ok {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
cur++
|
|
|
|
}
|
|
|
|
|
|
|
|
ff := uint64(math.MaxUint64)
|
|
|
|
for k := range bkt.msgs {
|
|
|
|
if k > cur && k < ff {
|
|
|
|
ff = k
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if ff != math.MaxUint64 {
|
|
|
|
m := bkt.msgs[ff]
|
|
|
|
|
|
|
|
// cmsg can be nil if no messages from the current nonce are in the mpool
|
|
|
|
ccid := "nil"
|
|
|
|
if cmsg != nil {
|
|
|
|
ccid = cmsg.Cid().String()
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Debugw("Nonce gap",
|
|
|
|
"actor", a,
|
|
|
|
"future_cid", m.Cid(),
|
|
|
|
"future_nonce", ff,
|
|
|
|
"current_cid", ccid,
|
|
|
|
"current_nonce", cur,
|
|
|
|
"revert_tipset", revert[0].Key(),
|
|
|
|
"new_head", ts.Key(),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-21 17:17:30 +00:00
|
|
|
return merr
|
2019-07-05 14:46:21 +00:00
|
|
|
}
|
2019-08-01 20:40:47 +00:00
|
|
|
|
2020-08-25 00:21:03 +00:00
|
|
|
func (mp *MessagePool) runHeadChange(from *types.TipSet, to *types.TipSet, rmsgs map[address.Address]map[uint64]*types.SignedMessage) error {
|
|
|
|
add := func(m *types.SignedMessage) {
|
|
|
|
s, ok := rmsgs[m.Message.From]
|
|
|
|
if !ok {
|
|
|
|
s = make(map[uint64]*types.SignedMessage)
|
|
|
|
rmsgs[m.Message.From] = s
|
|
|
|
}
|
|
|
|
s[m.Message.Nonce] = m
|
|
|
|
}
|
|
|
|
rm := func(from address.Address, nonce uint64) {
|
|
|
|
s, ok := rmsgs[from]
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, ok := s[nonce]; ok {
|
|
|
|
delete(s, nonce)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
revert, apply, err := store.ReorgOps(mp.api.LoadTipSet, from, to)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("failed to compute reorg ops for mpool pending messages: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
var merr error
|
|
|
|
|
|
|
|
for _, ts := range revert {
|
|
|
|
msgs, err := mp.MessagesForBlocks(ts.Blocks())
|
|
|
|
if err != nil {
|
|
|
|
log.Errorf("error retrieving messages for reverted block: %s", err)
|
|
|
|
merr = multierror.Append(merr, err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, msg := range msgs {
|
|
|
|
add(msg)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, ts := range apply {
|
|
|
|
mp.curTs = ts
|
|
|
|
|
|
|
|
for _, b := range ts.Blocks() {
|
|
|
|
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
|
|
|
|
if err != nil {
|
|
|
|
xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
|
|
|
log.Errorf("error retrieving messages for block: %s", xerr)
|
|
|
|
merr = multierror.Append(merr, xerr)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, msg := range smsgs {
|
|
|
|
rm(msg.Message.From, msg.Message.Nonce)
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, msg := range bmsgs {
|
|
|
|
rm(msg.From, msg.Nonce)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return merr
|
|
|
|
}
|
|
|
|
|
2020-03-09 22:46:00 +00:00
|
|
|
type statBucket struct {
|
|
|
|
msgs map[uint64]*types.SignedMessage
|
|
|
|
}
|
|
|
|
|
2019-12-03 19:33:29 +00:00
|
|
|
func (mp *MessagePool) MessagesForBlocks(blks []*types.BlockHeader) ([]*types.SignedMessage, error) {
|
|
|
|
out := make([]*types.SignedMessage, 0)
|
|
|
|
|
|
|
|
for _, b := range blks {
|
|
|
|
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
|
|
|
|
if err != nil {
|
|
|
|
return nil, xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
|
|
|
}
|
2019-12-05 02:04:09 +00:00
|
|
|
out = append(out, smsgs...)
|
2019-12-03 19:33:29 +00:00
|
|
|
|
|
|
|
for _, msg := range bmsgs {
|
|
|
|
smsg := mp.RecoverSig(msg)
|
|
|
|
if smsg != nil {
|
|
|
|
out = append(out, smsg)
|
|
|
|
} else {
|
|
|
|
log.Warnf("could not recover signature for bls message %s", msg.Cid())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return out, nil
|
|
|
|
}
|
|
|
|
|
2019-08-01 20:40:47 +00:00
|
|
|
func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
|
2019-11-12 07:16:42 +00:00
|
|
|
val, ok := mp.blsSigCache.Get(msg.Cid())
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
2020-02-12 23:52:36 +00:00
|
|
|
sig, ok := val.(crypto.Signature)
|
2019-11-12 07:16:42 +00:00
|
|
|
if !ok {
|
2019-11-24 16:35:50 +00:00
|
|
|
log.Errorf("value in signature cache was not a signature (got %T)", val)
|
2019-11-12 07:16:42 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return &types.SignedMessage{
|
|
|
|
Message: *msg,
|
2019-11-12 11:42:19 +00:00
|
|
|
Signature: sig,
|
2019-11-12 07:16:42 +00:00
|
|
|
}
|
2019-08-01 20:40:47 +00:00
|
|
|
}
|
2019-11-17 07:44:06 +00:00
|
|
|
|
|
|
|
func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, error) {
|
|
|
|
out := make(chan api.MpoolUpdate, 20)
|
|
|
|
sub := mp.changes.Sub(localUpdates)
|
|
|
|
|
|
|
|
go func() {
|
2019-12-08 15:00:45 +00:00
|
|
|
defer mp.changes.Unsub(sub, localUpdates)
|
2019-11-19 19:49:11 +00:00
|
|
|
|
2019-11-17 07:44:06 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case u := <-sub:
|
|
|
|
select {
|
|
|
|
case out <- u.(api.MpoolUpdate):
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return out, nil
|
|
|
|
}
|
2019-11-23 19:01:56 +00:00
|
|
|
|
|
|
|
func (mp *MessagePool) loadLocal() error {
|
|
|
|
res, err := mp.localMsgs.Query(query.Query{})
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("query local messages: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
for r := range res.Next() {
|
|
|
|
if r.Error != nil {
|
|
|
|
return xerrors.Errorf("r.Error: %w", r.Error)
|
|
|
|
}
|
|
|
|
|
|
|
|
var sm types.SignedMessage
|
|
|
|
if err := sm.UnmarshalCBOR(bytes.NewReader(r.Value)); err != nil {
|
|
|
|
return xerrors.Errorf("unmarshaling local message: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := mp.Add(&sm); err != nil {
|
|
|
|
if xerrors.Is(err, ErrNonceTooLow) {
|
|
|
|
continue // todo: drop the message from local cache (if above certain confidence threshold)
|
|
|
|
}
|
|
|
|
|
2019-12-10 19:42:09 +00:00
|
|
|
log.Errorf("adding local message: %+v", err)
|
2019-11-23 19:01:56 +00:00
|
|
|
}
|
2020-08-07 22:41:57 +00:00
|
|
|
|
|
|
|
mp.localAddrs[sm.Message.From] = struct{}{}
|
2019-11-23 19:01:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2020-08-21 17:28:45 +00:00
|
|
|
|
2020-08-21 20:24:53 +00:00
|
|
|
func (mp *MessagePool) Clear(local bool) {
|
2020-08-21 17:28:45 +00:00
|
|
|
mp.lk.Lock()
|
|
|
|
defer mp.lk.Unlock()
|
|
|
|
|
2020-08-21 20:24:53 +00:00
|
|
|
// remove everything if local is true, including removing local messages from
|
|
|
|
// the datastore
|
|
|
|
if local {
|
|
|
|
for a := range mp.localAddrs {
|
|
|
|
mset, ok := mp.pending[a]
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
2020-08-21 17:51:03 +00:00
|
|
|
|
2020-08-21 20:24:53 +00:00
|
|
|
for _, m := range mset.msgs {
|
|
|
|
err := mp.localMsgs.Delete(datastore.NewKey(string(m.Cid().Bytes())))
|
|
|
|
if err != nil {
|
|
|
|
log.Warnf("error deleting local message: %s", err)
|
|
|
|
}
|
2020-08-21 17:51:03 +00:00
|
|
|
}
|
|
|
|
}
|
2020-08-21 17:59:40 +00:00
|
|
|
|
|
|
|
mp.pending = make(map[address.Address]*msgSet)
|
2020-08-21 20:24:53 +00:00
|
|
|
mp.republished = nil
|
|
|
|
|
|
|
|
return
|
2020-08-21 17:59:40 +00:00
|
|
|
}
|
|
|
|
|
2020-08-21 20:24:53 +00:00
|
|
|
// remove everything except the local messages
|
|
|
|
for a := range mp.pending {
|
|
|
|
_, isLocal := mp.localAddrs[a]
|
|
|
|
if isLocal {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
delete(mp.pending, a)
|
|
|
|
}
|
2020-08-21 17:28:45 +00:00
|
|
|
}
|