Fix message nonce tracking during head changes

This commit is contained in:
whyrusleeping 2019-12-01 16:22:10 -06:00
parent 923748e551
commit 8da3cc875e
6 changed files with 269 additions and 18 deletions

View File

@ -9,6 +9,7 @@ import (
"time" "time"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query" "github.com/ipfs/go-datastore/query"
@ -21,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
) )
@ -56,9 +58,10 @@ type MessagePool struct {
pending map[address.Address]*msgSet pending map[address.Address]*msgSet
pendingCount int pendingCount int
sm *stmgr.StateManager curTsLk sync.RWMutex
curTs *types.TipSet
ps *pubsub.PubSub api MpoolProvider
minGasPrice types.BigInt minGasPrice types.BigInt
@ -98,20 +101,61 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
return nil return nil
} }
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*MessagePool, error) { type MpoolProvider interface {
SubscribeHeadChanges(func(rev, app []*types.TipSet) error)
PutMessage(m store.ChainMsg) (cid.Cid, error)
PubSubPublish(string, []byte) error
StateGetActor(address.Address, *types.TipSet) (*types.Actor, error)
MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
MessagesForTipset(*types.TipSet) ([]store.ChainMsg, error)
}
type mpoolProvider struct {
sm *stmgr.StateManager
ps *pubsub.PubSub
}
func NewMpoolProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) MpoolProvider {
return &mpoolProvider{sm, ps}
}
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) {
mpp.sm.ChainStore().SubscribeHeadChanges(cb)
}
func (mpp *mpoolProvider) PutMessage(m store.ChainMsg) (cid.Cid, error) {
return mpp.sm.ChainStore().PutMessage(m)
}
func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error {
return mpp.ps.Publish(k, v)
}
func (mpp *mpoolProvider) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
return mpp.sm.GetActor(addr, ts)
}
func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
return mpp.sm.ChainStore().MessagesForBlock(h)
}
func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg, error) {
return mpp.sm.ChainStore().MessagesForTipset(ts)
}
func NewMessagePool(api MpoolProvider, ds dtypes.MetadataDS) (*MessagePool, error) {
cache, _ := lru.New2Q(build.BlsSignatureCacheSize) cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
mp := &MessagePool{ mp := &MessagePool{
closer: make(chan struct{}), closer: make(chan struct{}),
repubTk: time.NewTicker(build.BlockDelay * 10 * time.Second), repubTk: time.NewTicker(build.BlockDelay * 10 * time.Second),
localAddrs: make(map[address.Address]struct{}), localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet), pending: make(map[address.Address]*msgSet),
sm: sm,
ps: ps,
minGasPrice: types.NewInt(0), minGasPrice: types.NewInt(0),
maxTxPoolSize: 5000, maxTxPoolSize: 5000,
blsSigCache: cache, blsSigCache: cache,
changes: lps.New(50), changes: lps.New(50),
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)), localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
api: api,
} }
if err := mp.loadLocal(); err != nil { if err := mp.loadLocal(); err != nil {
@ -120,7 +164,7 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.Metadat
go mp.repubLocal() go mp.repubLocal()
sm.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error { api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
err := mp.HeadChange(rev, app) err := mp.HeadChange(rev, app)
if err != nil { if err != nil {
log.Errorf("mpool head notif handler error: %+v", err) log.Errorf("mpool head notif handler error: %+v", err)
@ -155,7 +199,7 @@ func (mp *MessagePool) repubLocal() {
continue continue
} }
err = mp.ps.Publish(msgTopic, msgb) err = mp.api.PubSubPublish(msgTopic, msgb)
if err != nil { if err != nil {
errout = multierr.Append(errout, xerrors.Errorf("could not publish: %w", err)) errout = multierr.Append(errout, xerrors.Errorf("could not publish: %w", err))
continue continue
@ -200,7 +244,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error {
} }
mp.lk.Unlock() mp.lk.Unlock()
return mp.ps.Publish(msgTopic, msgb) return mp.api.PubSubPublish(msgTopic, msgb)
} }
func (mp *MessagePool) Add(m *types.SignedMessage) error { func (mp *MessagePool) Add(m *types.SignedMessage) error {
@ -252,12 +296,12 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
mp.blsSigCache.Add(m.Cid(), m.Signature) mp.blsSigCache.Add(m.Cid(), m.Signature)
} }
if _, err := mp.sm.ChainStore().PutMessage(m); err != nil { if _, err := mp.api.PutMessage(m); err != nil {
log.Warnf("mpooladd cs.PutMessage failed: %s", err) log.Warnf("mpooladd cs.PutMessage failed: %s", err)
return err return err
} }
if _, err := mp.sm.ChainStore().PutMessage(&m.Message); err != nil { if _, err := mp.api.PutMessage(&m.Message); err != nil {
log.Warnf("mpooladd cs.PutMessage failed: %s", err) log.Warnf("mpooladd cs.PutMessage failed: %s", err)
return err return err
} }
@ -307,16 +351,44 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
} }
func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
act, err := mp.sm.GetActor(addr, nil) // TODO: this method probably should be cached
mp.curTsLk.RLock()
defer mp.curTsLk.RUnlock()
act, err := mp.api.StateGetActor(addr, mp.curTs)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return act.Nonce, nil baseNonce := act.Nonce
// TODO: the correct thing to do here is probably to set curTs to chain.head
// but since we have an accurate view of the world until a head change occurs,
// this should be fine
if mp.curTs == nil {
return baseNonce, nil
}
msgs, err := mp.api.MessagesForTipset(mp.curTs)
if err != nil {
return 0, xerrors.Errorf("failed to check messages for tipset: %w", err)
}
for _, m := range msgs {
msg := m.VMMessage()
if msg.From == addr {
if msg.Nonce != baseNonce {
return 0, xerrors.Errorf("tipset %s has bad nonce ordering", mp.curTs)
}
baseNonce++
}
}
return baseNonce, nil
} }
func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, error) { func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, error) {
act, err := mp.sm.GetActor(addr, nil) act, err := mp.api.StateGetActor(addr, nil)
if err != nil { if err != nil {
return types.EmptyInt, err return types.EmptyInt, err
} }
@ -327,6 +399,9 @@ func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, erro
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) { func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
mp.lk.Lock() mp.lk.Lock()
defer mp.lk.Unlock() defer mp.lk.Unlock()
if addr.Protocol() == address.ID {
log.Warnf("Called pushWithNonce with ID address (%s) this might not be handled properly yet", addr)
}
nonce, err := mp.getNonceLocked(addr) nonce, err := mp.getNonceLocked(addr)
if err != nil { if err != nil {
@ -350,7 +425,7 @@ func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*typ
log.Errorf("addLocal failed: %+v", err) log.Errorf("addLocal failed: %+v", err)
} }
return msg, mp.ps.Publish(msgTopic, msgb) return msg, mp.api.PubSubPublish(msgTopic, msgb)
} }
func (mp *MessagePool) Remove(from address.Address, nonce uint64) { func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
@ -421,9 +496,12 @@ func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
} }
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
for _, ts := range revert { for _, ts := range revert {
for _, b := range ts.Blocks() { for _, b := range ts.Blocks() {
bmsgs, smsgs, err := mp.sm.ChainStore().MessagesForBlock(b) bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
if err != nil { if err != nil {
return xerrors.Errorf("failed to get messages for revert block %s(height %d): %w", b.Cid(), b.Height, err) return xerrors.Errorf("failed to get messages for revert block %s(height %d): %w", b.Cid(), b.Height, err)
} }
@ -444,11 +522,12 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
} }
} }
} }
mp.curTs = ts
} }
for _, ts := range apply { for _, ts := range apply {
for _, b := range ts.Blocks() { for _, b := range ts.Blocks() {
bmsgs, smsgs, err := mp.sm.ChainStore().MessagesForBlock(b) bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
if err != nil { if err != nil {
return xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err) return xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
} }
@ -460,6 +539,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
mp.Remove(msg.From, msg.Nonce) mp.Remove(msg.From, msg.Nonce)
} }
} }
mp.curTs = ts
} }
return nil return nil

147
chain/messagepool_test.go Normal file
View File

@ -0,0 +1,147 @@
package chain
import (
"testing"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
)
type testMpoolApi struct {
cb func(rev, app []*types.TipSet) error
bmsgs map[cid.Cid][]*types.SignedMessage
statenonce map[address.Address]uint64
}
func newTestMpoolApi() *testMpoolApi {
return &testMpoolApi{
bmsgs: make(map[cid.Cid][]*types.SignedMessage),
statenonce: make(map[address.Address]uint64),
}
}
func (tma *testMpoolApi) applyBlock(t *testing.T, b *types.BlockHeader) {
t.Helper()
if err := tma.cb(nil, []*types.TipSet{mock.TipSet(b)}); err != nil {
t.Fatal(err)
}
}
func (tma *testMpoolApi) setStateNonce(addr address.Address, v uint64) {
tma.statenonce[addr] = v
}
func (tma *testMpoolApi) setBlockMessages(h *types.BlockHeader, msgs ...*types.SignedMessage) {
tma.bmsgs[h.Cid()] = msgs
}
func (tma *testMpoolApi) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) {
tma.cb = cb
}
func (tma *testMpoolApi) PutMessage(m store.ChainMsg) (cid.Cid, error) {
return cid.Undef, nil
}
func (tma *testMpoolApi) PubSubPublish(string, []byte) error {
return nil
}
func (tma *testMpoolApi) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
return &types.Actor{
Nonce: tma.statenonce[addr],
Balance: types.NewInt(90000000),
}, nil
}
func (tma *testMpoolApi) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
return nil, tma.bmsgs[h.Cid()], nil
}
func (tma *testMpoolApi) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg, error) {
if len(ts.Blocks()) != 1 {
panic("cant deal with multiblock tipsets in this test")
}
bm, sm, err := tma.MessagesForBlock(ts.Blocks()[0])
if err != nil {
return nil, err
}
var out []store.ChainMsg
for _, m := range bm {
out = append(out, m)
}
for _, m := range sm {
out = append(out, m)
}
return out, nil
}
func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) {
t.Helper()
n, err := mp.GetNonce(addr)
if err != nil {
t.Fatal(err)
}
if n != val {
t.Fatalf("expected nonce of %d, got %d", val, n)
}
}
func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) {
t.Helper()
if err := mp.Add(msg); err != nil {
t.Fatal(err)
}
}
func TestMessagePool(t *testing.T) {
tma := newTestMpoolApi()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
ds := datastore.NewMapDatastore()
mp, err := NewMessagePool(tma, ds)
if err != nil {
t.Fatal(err)
}
a := mock.MkBlock(nil, 1, 1)
sender, err := w.GenerateKey(types.KTBLS)
if err != nil {
t.Fatal(err)
}
target := mock.Address(1001)
var msgs []*types.SignedMessage
for i := 0; i < 5; i++ {
msgs = append(msgs, mock.MkMessage(sender, target, uint64(i), w))
}
tma.setStateNonce(sender, 0)
assertNonce(t, mp, sender, 0)
mustAdd(t, mp, msgs[0])
assertNonce(t, mp, sender, 1)
mustAdd(t, mp, msgs[1])
assertNonce(t, mp, sender, 2)
tma.setBlockMessages(a, msgs[0], msgs[1])
tma.applyBlock(t, a)
assertNonce(t, mp, sender, 2)
}

View File

@ -601,6 +601,7 @@ func (cs *ChainStore) readAMTCids(root cid.Cid) ([]cid.Cid, error) {
type ChainMsg interface { type ChainMsg interface {
Cid() cid.Cid Cid() cid.Cid
VMMessage() *types.Message VMMessage() *types.Message
ToStorageBlock() (block.Block, error)
} }
func (cs *ChainStore) MessagesForTipset(ts *types.TipSet) ([]ChainMsg, error) { func (cs *ChainStore) MessagesForTipset(ts *types.TipSet) ([]ChainMsg, error) {

View File

@ -1,10 +1,12 @@
package mock package mock
import ( import (
"context"
"fmt" "fmt"
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
) )
@ -16,6 +18,26 @@ func Address(i uint64) address.Address {
return a return a
} }
func MkMessage(from, to address.Address, nonce uint64, w *wallet.Wallet) *types.SignedMessage {
msg := &types.Message{
To: to,
From: from,
Value: types.NewInt(1),
Nonce: nonce,
GasLimit: types.NewInt(1),
GasPrice: types.NewInt(0),
}
sig, err := w.Sign(context.TODO(), from, msg.Cid().Bytes())
if err != nil {
panic(err)
}
return &types.SignedMessage{
Message: *msg,
Signature: *sig,
}
}
func MkBlock(parents *types.TipSet, weightInc uint64, ticketNonce uint64) *types.BlockHeader { func MkBlock(parents *types.TipSet, weightInc uint64, ticketNonce uint64) *types.BlockHeader {
addr := Address(123561) addr := Address(123561)

View File

@ -279,7 +279,7 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to create block: %w", err) return nil, xerrors.Errorf("failed to create block: %w", err)
} }
log.Infow("mined new block", "cid", b.Cid()) log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height)
dur := time.Now().Sub(start) dur := time.Now().Sub(start)
log.Infof("Creating block took %s", dur) log.Infof("Creating block took %s", dur)

View File

@ -42,7 +42,8 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt
} }
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*chain.MessagePool, error) { func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*chain.MessagePool, error) {
mp, err := chain.NewMessagePool(sm, ps, ds) mpp := chain.NewMpoolProvider(sm, ps)
mp, err := chain.NewMessagePool(mpp, ds)
if err != nil { if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err) return nil, xerrors.Errorf("constructing mpool: %w", err)
} }