fix lame deadlock and revert handling

This commit is contained in:
whyrusleeping 2019-12-02 12:46:25 -08:00
parent 68e25b36f1
commit e366db00fe
2 changed files with 34 additions and 11 deletions

View File

@ -62,7 +62,7 @@ type MessagePool struct {
pending map[address.Address]*msgSet pending map[address.Address]*msgSet
pendingCount int pendingCount int
curTsLk sync.RWMutex curTsLk sync.Mutex
curTs *types.TipSet curTs *types.TipSet
api Provider api Provider
@ -112,6 +112,7 @@ type Provider interface {
StateGetActor(address.Address, *types.TipSet) (*types.Actor, error) StateGetActor(address.Address, *types.TipSet) (*types.Actor, error)
MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
MessagesForTipset(*types.TipSet) ([]store.ChainMsg, error) MessagesForTipset(*types.TipSet) ([]store.ChainMsg, error)
LoadTipSet(cids []cid.Cid) (*types.TipSet, error)
} }
type mpoolProvider struct { type mpoolProvider struct {
@ -147,6 +148,10 @@ func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg,
return mpp.sm.ChainStore().MessagesForTipset(ts) return mpp.sm.ChainStore().MessagesForTipset(ts)
} }
func (mpp *mpoolProvider) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) {
return mpp.sm.ChainStore().LoadTipSet(cids)
}
func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) { func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) {
cache, _ := lru.New2Q(build.BlsSignatureCacheSize) cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
mp := &MessagePool{ mp := &MessagePool{
@ -354,12 +359,23 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
return stateNonce, nil return stateNonce, nil
} }
func (mp *MessagePool) setCurTipset(ts *types.TipSet) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.curTs = ts
}
func (mp *MessagePool) getCurTipset() *types.TipSet {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
return mp.curTs
}
func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
// TODO: this method probably should be cached // TODO: this method probably should be cached
mp.curTsLk.RLock()
defer mp.curTsLk.RUnlock()
act, err := mp.api.StateGetActor(addr, mp.curTs) curTs := mp.getCurTipset()
act, err := mp.api.StateGetActor(addr, curTs)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -369,11 +385,11 @@ func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
// TODO: the correct thing to do here is probably to set curTs to chain.head // TODO: the correct thing to do here is probably to set curTs to chain.head
// but since we have an accurate view of the world until a head change occurs, // but since we have an accurate view of the world until a head change occurs,
// this should be fine // this should be fine
if mp.curTs == nil { if curTs == nil {
return baseNonce, nil return baseNonce, nil
} }
msgs, err := mp.api.MessagesForTipset(mp.curTs) msgs, err := mp.api.MessagesForTipset(curTs)
if err != nil { if err != nil {
return 0, xerrors.Errorf("failed to check messages for tipset: %w", err) return 0, xerrors.Errorf("failed to check messages for tipset: %w", err)
} }
@ -382,7 +398,7 @@ func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
msg := m.VMMessage() msg := m.VMMessage()
if msg.From == addr { if msg.From == addr {
if msg.Nonce != baseNonce { if msg.Nonce != baseNonce {
return 0, xerrors.Errorf("tipset %s has bad nonce ordering", mp.curTs) return 0, xerrors.Errorf("tipset %s has bad nonce ordering", curTs)
} }
baseNonce++ baseNonce++
} }
@ -500,10 +516,14 @@ func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
} }
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
for _, ts := range revert { for _, ts := range revert {
pts, err := mp.api.LoadTipSet(ts.Parents())
if err != nil {
return err
}
mp.setCurTipset(pts)
for _, b := range ts.Blocks() { for _, b := range ts.Blocks() {
bmsgs, smsgs, err := mp.api.MessagesForBlock(b) bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
if err != nil { if err != nil {
@ -526,7 +546,6 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
} }
} }
} }
mp.curTs = ts
} }
for _, ts := range apply { for _, ts := range apply {
@ -543,7 +562,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
mp.Remove(msg.From, msg.Nonce) mp.Remove(msg.From, msg.Nonce)
} }
} }
mp.curTs = ts mp.setCurTipset(ts)
} }
return nil return nil

View File

@ -86,6 +86,10 @@ func (tma *testMpoolApi) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg,
return out, nil return out, nil
} }
func (tma *testMpoolApi) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) {
panic("dont call me unless you implement me")
}
func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) { func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) {
t.Helper() t.Helper()
n, err := mp.GetNonce(addr) n, err := mp.GetNonce(addr)