From 8da3cc875ef80d369dacd36937a9b96a8eec8e9f Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sun, 1 Dec 2019 16:22:10 -0600 Subject: [PATCH 1/5] Fix message nonce tracking during head changes --- chain/messagepool.go | 112 ++++++++++++++++++++++++----- chain/messagepool_test.go | 147 ++++++++++++++++++++++++++++++++++++++ chain/store/store.go | 1 + chain/types/mock/chain.go | 22 ++++++ miner/miner.go | 2 +- node/modules/chain.go | 3 +- 6 files changed, 269 insertions(+), 18 deletions(-) create mode 100644 chain/messagepool_test.go diff --git a/chain/messagepool.go b/chain/messagepool.go index f0a3ed383..79b0ac315 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -9,6 +9,7 @@ import ( "time" lru "github.com/hashicorp/golang-lru" + "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/query" @@ -21,6 +22,7 @@ import ( "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/modules/dtypes" ) @@ -56,9 +58,10 @@ type MessagePool struct { pending map[address.Address]*msgSet pendingCount int - sm *stmgr.StateManager + curTsLk sync.RWMutex + curTs *types.TipSet - ps *pubsub.PubSub + api MpoolProvider minGasPrice types.BigInt @@ -98,20 +101,61 @@ func (ms *msgSet) add(m *types.SignedMessage) error { return nil } -func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*MessagePool, error) { +type MpoolProvider interface { + SubscribeHeadChanges(func(rev, app []*types.TipSet) error) + PutMessage(m store.ChainMsg) (cid.Cid, error) + PubSubPublish(string, []byte) error + StateGetActor(address.Address, *types.TipSet) (*types.Actor, error) + MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) + MessagesForTipset(*types.TipSet) ([]store.ChainMsg, error) +} + +type mpoolProvider struct { + sm *stmgr.StateManager + ps *pubsub.PubSub +} + +func NewMpoolProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) MpoolProvider { + return &mpoolProvider{sm, ps} +} + +func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) { + mpp.sm.ChainStore().SubscribeHeadChanges(cb) +} + +func (mpp *mpoolProvider) PutMessage(m store.ChainMsg) (cid.Cid, error) { + return mpp.sm.ChainStore().PutMessage(m) +} + +func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error { + return mpp.ps.Publish(k, v) +} + +func (mpp *mpoolProvider) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) { + return mpp.sm.GetActor(addr, ts) +} + +func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) { + return mpp.sm.ChainStore().MessagesForBlock(h) +} + +func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg, error) { + return mpp.sm.ChainStore().MessagesForTipset(ts) +} + +func NewMessagePool(api MpoolProvider, ds dtypes.MetadataDS) (*MessagePool, error) { cache, _ := lru.New2Q(build.BlsSignatureCacheSize) mp := &MessagePool{ closer: make(chan struct{}), repubTk: time.NewTicker(build.BlockDelay * 10 * time.Second), localAddrs: make(map[address.Address]struct{}), pending: make(map[address.Address]*msgSet), - sm: sm, - ps: ps, minGasPrice: types.NewInt(0), maxTxPoolSize: 5000, blsSigCache: cache, changes: lps.New(50), localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)), + api: api, } if err := mp.loadLocal(); err != nil { @@ -120,7 +164,7 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.Metadat go mp.repubLocal() - sm.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error { + api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error { err := mp.HeadChange(rev, app) if err != nil { log.Errorf("mpool head notif handler error: %+v", err) @@ -155,7 +199,7 @@ func (mp *MessagePool) repubLocal() { continue } - err = mp.ps.Publish(msgTopic, msgb) + err = mp.api.PubSubPublish(msgTopic, msgb) if err != nil { errout = multierr.Append(errout, xerrors.Errorf("could not publish: %w", err)) continue @@ -200,7 +244,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error { } mp.lk.Unlock() - return mp.ps.Publish(msgTopic, msgb) + return mp.api.PubSubPublish(msgTopic, msgb) } func (mp *MessagePool) Add(m *types.SignedMessage) error { @@ -252,12 +296,12 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error { mp.blsSigCache.Add(m.Cid(), m.Signature) } - if _, err := mp.sm.ChainStore().PutMessage(m); err != nil { + if _, err := mp.api.PutMessage(m); err != nil { log.Warnf("mpooladd cs.PutMessage failed: %s", err) return err } - if _, err := mp.sm.ChainStore().PutMessage(&m.Message); err != nil { + if _, err := mp.api.PutMessage(&m.Message); err != nil { log.Warnf("mpooladd cs.PutMessage failed: %s", err) return err } @@ -307,16 +351,44 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { } func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { - act, err := mp.sm.GetActor(addr, nil) + // TODO: this method probably should be cached + mp.curTsLk.RLock() + defer mp.curTsLk.RUnlock() + + act, err := mp.api.StateGetActor(addr, mp.curTs) if err != nil { return 0, err } - return act.Nonce, nil + baseNonce := act.Nonce + + // TODO: the correct thing to do here is probably to set curTs to chain.head + // but since we have an accurate view of the world until a head change occurs, + // this should be fine + if mp.curTs == nil { + return baseNonce, nil + } + + msgs, err := mp.api.MessagesForTipset(mp.curTs) + if err != nil { + return 0, xerrors.Errorf("failed to check messages for tipset: %w", err) + } + + for _, m := range msgs { + msg := m.VMMessage() + if msg.From == addr { + if msg.Nonce != baseNonce { + return 0, xerrors.Errorf("tipset %s has bad nonce ordering", mp.curTs) + } + baseNonce++ + } + } + + return baseNonce, nil } func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, error) { - act, err := mp.sm.GetActor(addr, nil) + act, err := mp.api.StateGetActor(addr, nil) if err != nil { return types.EmptyInt, err } @@ -327,6 +399,9 @@ func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, erro func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) { mp.lk.Lock() defer mp.lk.Unlock() + if addr.Protocol() == address.ID { + log.Warnf("Called pushWithNonce with ID address (%s) this might not be handled properly yet", addr) + } nonce, err := mp.getNonceLocked(addr) if err != nil { @@ -350,7 +425,7 @@ func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*typ log.Errorf("addLocal failed: %+v", err) } - return msg, mp.ps.Publish(msgTopic, msgb) + return msg, mp.api.PubSubPublish(msgTopic, msgb) } func (mp *MessagePool) Remove(from address.Address, nonce uint64) { @@ -421,9 +496,12 @@ func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage { } func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() + for _, ts := range revert { for _, b := range ts.Blocks() { - bmsgs, smsgs, err := mp.sm.ChainStore().MessagesForBlock(b) + bmsgs, smsgs, err := mp.api.MessagesForBlock(b) if err != nil { return xerrors.Errorf("failed to get messages for revert block %s(height %d): %w", b.Cid(), b.Height, err) } @@ -444,11 +522,12 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) } } } + mp.curTs = ts } for _, ts := range apply { for _, b := range ts.Blocks() { - bmsgs, smsgs, err := mp.sm.ChainStore().MessagesForBlock(b) + bmsgs, smsgs, err := mp.api.MessagesForBlock(b) if err != nil { return xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err) } @@ -460,6 +539,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) mp.Remove(msg.From, msg.Nonce) } } + mp.curTs = ts } return nil diff --git a/chain/messagepool_test.go b/chain/messagepool_test.go new file mode 100644 index 000000000..f76ed5422 --- /dev/null +++ b/chain/messagepool_test.go @@ -0,0 +1,147 @@ +package chain + +import ( + "testing" + + "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/mock" + "github.com/filecoin-project/lotus/chain/wallet" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" +) + +type testMpoolApi struct { + cb func(rev, app []*types.TipSet) error + + bmsgs map[cid.Cid][]*types.SignedMessage + statenonce map[address.Address]uint64 +} + +func newTestMpoolApi() *testMpoolApi { + return &testMpoolApi{ + bmsgs: make(map[cid.Cid][]*types.SignedMessage), + statenonce: make(map[address.Address]uint64), + } +} + +func (tma *testMpoolApi) applyBlock(t *testing.T, b *types.BlockHeader) { + t.Helper() + if err := tma.cb(nil, []*types.TipSet{mock.TipSet(b)}); err != nil { + t.Fatal(err) + } +} + +func (tma *testMpoolApi) setStateNonce(addr address.Address, v uint64) { + tma.statenonce[addr] = v +} + +func (tma *testMpoolApi) setBlockMessages(h *types.BlockHeader, msgs ...*types.SignedMessage) { + tma.bmsgs[h.Cid()] = msgs +} + +func (tma *testMpoolApi) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) { + tma.cb = cb +} + +func (tma *testMpoolApi) PutMessage(m store.ChainMsg) (cid.Cid, error) { + return cid.Undef, nil +} + +func (tma *testMpoolApi) PubSubPublish(string, []byte) error { + return nil +} + +func (tma *testMpoolApi) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) { + return &types.Actor{ + Nonce: tma.statenonce[addr], + Balance: types.NewInt(90000000), + }, nil +} + +func (tma *testMpoolApi) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) { + return nil, tma.bmsgs[h.Cid()], nil +} + +func (tma *testMpoolApi) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg, error) { + if len(ts.Blocks()) != 1 { + panic("cant deal with multiblock tipsets in this test") + } + + bm, sm, err := tma.MessagesForBlock(ts.Blocks()[0]) + if err != nil { + return nil, err + } + + var out []store.ChainMsg + for _, m := range bm { + out = append(out, m) + } + + for _, m := range sm { + out = append(out, m) + } + + return out, nil +} + +func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) { + t.Helper() + n, err := mp.GetNonce(addr) + if err != nil { + t.Fatal(err) + } + + if n != val { + t.Fatalf("expected nonce of %d, got %d", val, n) + } +} + +func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) { + t.Helper() + if err := mp.Add(msg); err != nil { + t.Fatal(err) + } +} + +func TestMessagePool(t *testing.T) { + tma := newTestMpoolApi() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + ds := datastore.NewMapDatastore() + + mp, err := NewMessagePool(tma, ds) + if err != nil { + t.Fatal(err) + } + + a := mock.MkBlock(nil, 1, 1) + + sender, err := w.GenerateKey(types.KTBLS) + if err != nil { + t.Fatal(err) + } + target := mock.Address(1001) + + var msgs []*types.SignedMessage + for i := 0; i < 5; i++ { + msgs = append(msgs, mock.MkMessage(sender, target, uint64(i), w)) + } + + tma.setStateNonce(sender, 0) + assertNonce(t, mp, sender, 0) + mustAdd(t, mp, msgs[0]) + assertNonce(t, mp, sender, 1) + mustAdd(t, mp, msgs[1]) + assertNonce(t, mp, sender, 2) + + tma.setBlockMessages(a, msgs[0], msgs[1]) + tma.applyBlock(t, a) + + assertNonce(t, mp, sender, 2) +} diff --git a/chain/store/store.go b/chain/store/store.go index 879f16c32..9f977b161 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -601,6 +601,7 @@ func (cs *ChainStore) readAMTCids(root cid.Cid) ([]cid.Cid, error) { type ChainMsg interface { Cid() cid.Cid VMMessage() *types.Message + ToStorageBlock() (block.Block, error) } func (cs *ChainStore) MessagesForTipset(ts *types.TipSet) ([]ChainMsg, error) { diff --git a/chain/types/mock/chain.go b/chain/types/mock/chain.go index 37d2e8a53..f51573d03 100644 --- a/chain/types/mock/chain.go +++ b/chain/types/mock/chain.go @@ -1,10 +1,12 @@ package mock import ( + "context" "fmt" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/wallet" "github.com/ipfs/go-cid" ) @@ -16,6 +18,26 @@ func Address(i uint64) address.Address { return a } +func MkMessage(from, to address.Address, nonce uint64, w *wallet.Wallet) *types.SignedMessage { + msg := &types.Message{ + To: to, + From: from, + Value: types.NewInt(1), + Nonce: nonce, + GasLimit: types.NewInt(1), + GasPrice: types.NewInt(0), + } + + sig, err := w.Sign(context.TODO(), from, msg.Cid().Bytes()) + if err != nil { + panic(err) + } + return &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } +} + func MkBlock(parents *types.TipSet, weightInc uint64, ticketNonce uint64) *types.BlockHeader { addr := Address(123561) diff --git a/miner/miner.go b/miner/miner.go index 370014777..f15b8e013 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -279,7 +279,7 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB if err != nil { return nil, xerrors.Errorf("failed to create block: %w", err) } - log.Infow("mined new block", "cid", b.Cid()) + log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height) dur := time.Now().Sub(start) log.Infof("Creating block took %s", dur) diff --git a/node/modules/chain.go b/node/modules/chain.go index b6fb29c8c..4e0b9d1e8 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -42,7 +42,8 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt } func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*chain.MessagePool, error) { - mp, err := chain.NewMessagePool(sm, ps, ds) + mpp := chain.NewMpoolProvider(sm, ps) + mp, err := chain.NewMessagePool(mpp, ds) if err != nil { return nil, xerrors.Errorf("constructing mpool: %w", err) } From b58e7344e87234816c82ae3b69723e5860b50881 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sun, 1 Dec 2019 16:11:43 -0700 Subject: [PATCH 2/5] pull messagepool into separate package --- chain/{ => messagepool}/messagepool.go | 8 ++++++-- chain/{ => messagepool}/messagepool_test.go | 2 +- chain/sub/incoming.go | 3 ++- chain/sync.go | 8 ++++---- node/builder.go | 3 ++- node/impl/full/mpool.go | 4 ++-- node/modules/chain.go | 7 ++++--- node/modules/services.go | 3 ++- 8 files changed, 23 insertions(+), 15 deletions(-) rename chain/{ => messagepool}/messagepool.go (98%) rename chain/{ => messagepool}/messagepool_test.go (99%) diff --git a/chain/messagepool.go b/chain/messagepool/messagepool.go similarity index 98% rename from chain/messagepool.go rename to chain/messagepool/messagepool.go index 79b0ac315..586e71432 100644 --- a/chain/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -1,4 +1,4 @@ -package chain +package messagepool import ( "bytes" @@ -13,6 +13,7 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/query" + logging "github.com/ipfs/go-log" pubsub "github.com/libp2p/go-libp2p-pubsub" lps "github.com/whyrusleeping/pubsub" "go.uber.org/multierr" @@ -20,6 +21,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" @@ -27,6 +29,8 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" ) +var log = logging.Logger("messagepool") + var ( ErrMessageTooBig = errors.New("message too big") @@ -567,7 +571,7 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err sub := mp.changes.Sub(localUpdates) go func() { - defer mp.changes.Unsub(sub, localIncoming) + defer mp.changes.Unsub(sub, chain.LocalIncoming) for { select { diff --git a/chain/messagepool_test.go b/chain/messagepool/messagepool_test.go similarity index 99% rename from chain/messagepool_test.go rename to chain/messagepool/messagepool_test.go index f76ed5422..d7c462d3e 100644 --- a/chain/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -1,4 +1,4 @@ -package chain +package messagepool import ( "testing" diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index a2d46a824..b09d0c227 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -7,6 +7,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/filecoin-project/lotus/chain" + "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/types" ) @@ -54,7 +55,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha } } -func HandleIncomingMessages(ctx context.Context, mpool *chain.MessagePool, msub *pubsub.Subscription) { +func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) { for { msg, err := msub.Next(ctx) if err != nil { diff --git a/chain/sync.go b/chain/sync.go index dbf974cfa..774f18f95 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -39,7 +39,7 @@ import ( var log = logging.Logger("chain") -var localIncoming = "incoming" +var LocalIncoming = "incoming" type Syncer struct { // The heaviest known tipset in the network. @@ -119,7 +119,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { } } - syncer.incoming.Pub(fts.TipSet().Blocks(), localIncoming) + syncer.incoming.Pub(fts.TipSet().Blocks(), LocalIncoming) if from == syncer.self { // TODO: this is kindof a hack... @@ -152,11 +152,11 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { } func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) { - sub := syncer.incoming.Sub(localIncoming) + sub := syncer.incoming.Sub(LocalIncoming) out := make(chan *types.BlockHeader, 10) go func() { - defer syncer.incoming.Unsub(sub, localIncoming) + defer syncer.incoming.Unsub(sub, LocalIncoming) for { select { diff --git a/node/builder.go b/node/builder.go index 412730fab..bc8dbea59 100644 --- a/node/builder.go +++ b/node/builder.go @@ -24,6 +24,7 @@ import ( "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/market" + "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/metrics" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" @@ -202,7 +203,7 @@ func Online() Option { // Filecoin services Override(new(*chain.Syncer), modules.NewSyncer), Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient), - Override(new(*chain.MessagePool), modules.MessagePool), + Override(new(*messagepool.MessagePool), modules.MessagePool), Override(new(modules.Genesis), modules.ErrorGenesis), Override(SetGenesisKey, modules.SetGenesis), diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index db318a083..9ea34a7d1 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -7,8 +7,8 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/types" ) @@ -17,7 +17,7 @@ type MpoolAPI struct { WalletAPI - Mpool *chain.MessagePool + Mpool *messagepool.MessagePool } func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) { diff --git a/node/modules/chain.go b/node/modules/chain.go index 4e0b9d1e8..093573d76 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/blocksync" + "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" @@ -41,9 +42,9 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt return exch } -func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*chain.MessagePool, error) { - mpp := chain.NewMpoolProvider(sm, ps) - mp, err := chain.NewMessagePool(mpp, ds) +func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*messagepool.MessagePool, error) { + mpp := messagepool.NewMpoolProvider(sm, ps) + mp, err := messagepool.NewMessagePool(mpp, ds) if err != nil { return nil, xerrors.Errorf("constructing mpool: %w", err) } diff --git a/node/modules/services.go b/node/modules/services.go index 8cf298d8a..12b3f02c5 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/blocksync" "github.com/filecoin-project/lotus/chain/deals" + "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/sub" "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/node/modules/helpers" @@ -53,7 +54,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubs go sub.HandleIncomingBlocks(ctx, blocksub, s) } -func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, mpool *chain.MessagePool) { +func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, mpool *messagepool.MessagePool) { ctx := helpers.LifecycleCtx(mctx, lc) msgsub, err := pubsub.Subscribe("/fil/messages") From 68e25b36f1bf2a18251f91fe85703b9f4bb7f8b9 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 2 Dec 2019 11:39:50 -0800 Subject: [PATCH 3/5] rename to avoid stuttering --- chain/messagepool/messagepool.go | 8 ++++---- chain/messagepool/messagepool_test.go | 2 +- node/modules/chain.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 586e71432..0ae7791d1 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -65,7 +65,7 @@ type MessagePool struct { curTsLk sync.RWMutex curTs *types.TipSet - api MpoolProvider + api Provider minGasPrice types.BigInt @@ -105,7 +105,7 @@ func (ms *msgSet) add(m *types.SignedMessage) error { return nil } -type MpoolProvider interface { +type Provider interface { SubscribeHeadChanges(func(rev, app []*types.TipSet) error) PutMessage(m store.ChainMsg) (cid.Cid, error) PubSubPublish(string, []byte) error @@ -119,7 +119,7 @@ type mpoolProvider struct { ps *pubsub.PubSub } -func NewMpoolProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) MpoolProvider { +func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider { return &mpoolProvider{sm, ps} } @@ -147,7 +147,7 @@ func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg, return mpp.sm.ChainStore().MessagesForTipset(ts) } -func NewMessagePool(api MpoolProvider, ds dtypes.MetadataDS) (*MessagePool, error) { +func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) { cache, _ := lru.New2Q(build.BlsSignatureCacheSize) mp := &MessagePool{ closer: make(chan struct{}), diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index d7c462d3e..1c42e5b18 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -115,7 +115,7 @@ func TestMessagePool(t *testing.T) { ds := datastore.NewMapDatastore() - mp, err := NewMessagePool(tma, ds) + mp, err := New(tma, ds) if err != nil { t.Fatal(err) } diff --git a/node/modules/chain.go b/node/modules/chain.go index 093573d76..728b4e7a1 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -43,8 +43,8 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt } func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*messagepool.MessagePool, error) { - mpp := messagepool.NewMpoolProvider(sm, ps) - mp, err := messagepool.NewMessagePool(mpp, ds) + mpp := messagepool.NewProvider(sm, ps) + mp, err := messagepool.New(mpp, ds) if err != nil { return nil, xerrors.Errorf("constructing mpool: %w", err) } From e366db00fe13f9faff4b7656f18d581016e535f7 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 2 Dec 2019 12:46:25 -0800 Subject: [PATCH 4/5] fix lame deadlock and revert handling --- chain/messagepool/messagepool.go | 41 ++++++++++++++++++++------- chain/messagepool/messagepool_test.go | 4 +++ 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 0ae7791d1..dfa82c3fa 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -62,7 +62,7 @@ type MessagePool struct { pending map[address.Address]*msgSet pendingCount int - curTsLk sync.RWMutex + curTsLk sync.Mutex curTs *types.TipSet api Provider @@ -112,6 +112,7 @@ type Provider interface { StateGetActor(address.Address, *types.TipSet) (*types.Actor, error) MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) MessagesForTipset(*types.TipSet) ([]store.ChainMsg, error) + LoadTipSet(cids []cid.Cid) (*types.TipSet, error) } type mpoolProvider struct { @@ -147,6 +148,10 @@ func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg, return mpp.sm.ChainStore().MessagesForTipset(ts) } +func (mpp *mpoolProvider) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) { + return mpp.sm.ChainStore().LoadTipSet(cids) +} + func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) { cache, _ := lru.New2Q(build.BlsSignatureCacheSize) mp := &MessagePool{ @@ -354,12 +359,23 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { return stateNonce, nil } +func (mp *MessagePool) setCurTipset(ts *types.TipSet) { + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() + mp.curTs = ts +} + +func (mp *MessagePool) getCurTipset() *types.TipSet { + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() + return mp.curTs +} + func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { // TODO: this method probably should be cached - mp.curTsLk.RLock() - defer mp.curTsLk.RUnlock() - act, err := mp.api.StateGetActor(addr, mp.curTs) + curTs := mp.getCurTipset() + act, err := mp.api.StateGetActor(addr, curTs) if err != nil { return 0, err } @@ -369,11 +385,11 @@ func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { // TODO: the correct thing to do here is probably to set curTs to chain.head // but since we have an accurate view of the world until a head change occurs, // this should be fine - if mp.curTs == nil { + if curTs == nil { return baseNonce, nil } - msgs, err := mp.api.MessagesForTipset(mp.curTs) + msgs, err := mp.api.MessagesForTipset(curTs) if err != nil { return 0, xerrors.Errorf("failed to check messages for tipset: %w", err) } @@ -382,7 +398,7 @@ func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { msg := m.VMMessage() if msg.From == addr { if msg.Nonce != baseNonce { - return 0, xerrors.Errorf("tipset %s has bad nonce ordering", mp.curTs) + return 0, xerrors.Errorf("tipset %s has bad nonce ordering", curTs) } baseNonce++ } @@ -500,10 +516,14 @@ func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage { } func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { - mp.curTsLk.Lock() - defer mp.curTsLk.Unlock() for _, ts := range revert { + pts, err := mp.api.LoadTipSet(ts.Parents()) + if err != nil { + return err + } + + mp.setCurTipset(pts) for _, b := range ts.Blocks() { bmsgs, smsgs, err := mp.api.MessagesForBlock(b) if err != nil { @@ -526,7 +546,6 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) } } } - mp.curTs = ts } for _, ts := range apply { @@ -543,7 +562,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) mp.Remove(msg.From, msg.Nonce) } } - mp.curTs = ts + mp.setCurTipset(ts) } return nil diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index 1c42e5b18..d830e9fd6 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -86,6 +86,10 @@ func (tma *testMpoolApi) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg, return out, nil } +func (tma *testMpoolApi) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) { + panic("dont call me unless you implement me") +} + func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) { t.Helper() n, err := mp.GetNonce(addr) From 481cc63181270af998dda7cbb62ea86bb59a540c Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 2 Dec 2019 14:28:28 -0800 Subject: [PATCH 5/5] add a test for the revert messages issue --- chain/messagepool/messagepool.go | 2 +- chain/messagepool/messagepool_test.go | 74 ++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index dfa82c3fa..38b7e1fc4 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -398,7 +398,7 @@ func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { msg := m.VMMessage() if msg.From == addr { if msg.Nonce != baseNonce { - return 0, xerrors.Errorf("tipset %s has bad nonce ordering", curTs) + return 0, xerrors.Errorf("tipset %s has bad nonce ordering (%d != %d)", curTs.Cids(), msg.Nonce, baseNonce) } baseNonce++ } diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index d830e9fd6..1cd8f8a75 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -1,6 +1,7 @@ package messagepool import ( + "fmt" "testing" "github.com/filecoin-project/lotus/chain/address" @@ -17,6 +18,8 @@ type testMpoolApi struct { bmsgs map[cid.Cid][]*types.SignedMessage statenonce map[address.Address]uint64 + + tipsets []*types.TipSet } func newTestMpoolApi() *testMpoolApi { @@ -33,12 +36,20 @@ func (tma *testMpoolApi) applyBlock(t *testing.T, b *types.BlockHeader) { } } +func (tma *testMpoolApi) revertBlock(t *testing.T, b *types.BlockHeader) { + t.Helper() + if err := tma.cb([]*types.TipSet{mock.TipSet(b)}, nil); err != nil { + t.Fatal(err) + } +} + func (tma *testMpoolApi) setStateNonce(addr address.Address, v uint64) { tma.statenonce[addr] = v } func (tma *testMpoolApi) setBlockMessages(h *types.BlockHeader, msgs ...*types.SignedMessage) { tma.bmsgs[h.Cid()] = msgs + tma.tipsets = append(tma.tipsets, mock.TipSet(h)) } func (tma *testMpoolApi) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) { @@ -87,7 +98,13 @@ func (tma *testMpoolApi) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg, } func (tma *testMpoolApi) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) { - panic("dont call me unless you implement me") + for _, ts := range tma.tipsets { + if types.CidArrsEqual(cids, ts.Cids()) { + return ts, nil + } + } + + return nil, fmt.Errorf("tipset not found") } func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) { @@ -149,3 +166,58 @@ func TestMessagePool(t *testing.T) { assertNonce(t, mp, sender, 2) } + +func TestRevertMessages(t *testing.T) { + tma := newTestMpoolApi() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds) + if err != nil { + t.Fatal(err) + } + + a := mock.MkBlock(nil, 1, 1) + b := mock.MkBlock(mock.TipSet(a), 1, 1) + + sender, err := w.GenerateKey(types.KTBLS) + if err != nil { + t.Fatal(err) + } + target := mock.Address(1001) + + var msgs []*types.SignedMessage + for i := 0; i < 5; i++ { + msgs = append(msgs, mock.MkMessage(sender, target, uint64(i), w)) + } + + tma.setBlockMessages(a, msgs[0]) + tma.setBlockMessages(b, msgs[1], msgs[2], msgs[3]) + + mustAdd(t, mp, msgs[0]) + mustAdd(t, mp, msgs[1]) + mustAdd(t, mp, msgs[2]) + mustAdd(t, mp, msgs[3]) + + tma.setStateNonce(sender, 0) + tma.applyBlock(t, a) + assertNonce(t, mp, sender, 4) + + tma.setStateNonce(sender, 1) + tma.applyBlock(t, b) + assertNonce(t, mp, sender, 4) + tma.setStateNonce(sender, 0) + tma.revertBlock(t, b) + + assertNonce(t, mp, sender, 4) + + if len(mp.Pending()) != 3 { + t.Fatal("expected three messages in mempool") + } + +}