From e366db00fe13f9faff4b7656f18d581016e535f7 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 2 Dec 2019 12:46:25 -0800 Subject: [PATCH] fix lame deadlock and revert handling --- chain/messagepool/messagepool.go | 41 ++++++++++++++++++++------- chain/messagepool/messagepool_test.go | 4 +++ 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 0ae7791d1..dfa82c3fa 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -62,7 +62,7 @@ type MessagePool struct { pending map[address.Address]*msgSet pendingCount int - curTsLk sync.RWMutex + curTsLk sync.Mutex curTs *types.TipSet api Provider @@ -112,6 +112,7 @@ type Provider interface { StateGetActor(address.Address, *types.TipSet) (*types.Actor, error) MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) MessagesForTipset(*types.TipSet) ([]store.ChainMsg, error) + LoadTipSet(cids []cid.Cid) (*types.TipSet, error) } type mpoolProvider struct { @@ -147,6 +148,10 @@ func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg, return mpp.sm.ChainStore().MessagesForTipset(ts) } +func (mpp *mpoolProvider) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) { + return mpp.sm.ChainStore().LoadTipSet(cids) +} + func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) { cache, _ := lru.New2Q(build.BlsSignatureCacheSize) mp := &MessagePool{ @@ -354,12 +359,23 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { return stateNonce, nil } +func (mp *MessagePool) setCurTipset(ts *types.TipSet) { + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() + mp.curTs = ts +} + +func (mp *MessagePool) getCurTipset() *types.TipSet { + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() + return mp.curTs +} + func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { // TODO: this method probably should be cached - mp.curTsLk.RLock() - defer mp.curTsLk.RUnlock() - act, err := mp.api.StateGetActor(addr, mp.curTs) + curTs := mp.getCurTipset() + act, err := mp.api.StateGetActor(addr, curTs) if err != nil { return 0, err } @@ -369,11 +385,11 @@ func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { // TODO: the correct thing to do here is probably to set curTs to chain.head // but since we have an accurate view of the world until a head change occurs, // this should be fine - if mp.curTs == nil { + if curTs == nil { return baseNonce, nil } - msgs, err := mp.api.MessagesForTipset(mp.curTs) + msgs, err := mp.api.MessagesForTipset(curTs) if err != nil { return 0, xerrors.Errorf("failed to check messages for tipset: %w", err) } @@ -382,7 +398,7 @@ func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { msg := m.VMMessage() if msg.From == addr { if msg.Nonce != baseNonce { - return 0, xerrors.Errorf("tipset %s has bad nonce ordering", mp.curTs) + return 0, xerrors.Errorf("tipset %s has bad nonce ordering", curTs) } baseNonce++ } @@ -500,10 +516,14 @@ func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage { } func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { - mp.curTsLk.Lock() - defer mp.curTsLk.Unlock() for _, ts := range revert { + pts, err := mp.api.LoadTipSet(ts.Parents()) + if err != nil { + return err + } + + mp.setCurTipset(pts) for _, b := range ts.Blocks() { bmsgs, smsgs, err := mp.api.MessagesForBlock(b) if err != nil { @@ -526,7 +546,6 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) } } } - mp.curTs = ts } for _, ts := range apply { @@ -543,7 +562,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) mp.Remove(msg.From, msg.Nonce) } } - mp.curTs = ts + mp.setCurTipset(ts) } return nil diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index 1c42e5b18..d830e9fd6 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -86,6 +86,10 @@ func (tma *testMpoolApi) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg, return out, nil } +func (tma *testMpoolApi) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) { + panic("dont call me unless you implement me") +} + func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) { t.Helper() n, err := mp.GetNonce(addr)