Merge pull request #778 from filecoin-project/feat/better-messagepool-reorg

Improve messagepool reorg checks
This commit is contained in:
Whyrusleeping 2019-12-07 16:00:08 +01:00 committed by GitHub
commit 5bd3dd00b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -289,7 +289,7 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
}
balance, err := mp.getStateBalance(m.Message.From)
balance, err := mp.getStateBalance(m.Message.From, curTs)
if err != nil {
return xerrors.Errorf("failed to check sender balance: %w", err)
}
@ -304,6 +304,13 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error
return mp.addLocked(m)
}
func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
mp.lk.Lock()
defer mp.lk.Unlock()
return mp.addLocked(m)
}
func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
log.Debugf("mpooladd: %s %s", m.Message.From, m.Message.Nonce)
if m.Signature.Type == types.KTBLS {
@ -402,8 +409,8 @@ func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet)
return baseNonce, nil
}
func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, error) {
act, err := mp.api.StateGetActor(addr, nil)
func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) {
act, err := mp.api.StateGetActor(addr, ts)
if err != nil {
return types.EmptyInt, err
}
@ -521,6 +528,30 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
rmsgs := make(map[address.Address]map[uint64]*types.SignedMessage)
add := func(m *types.SignedMessage) {
s, ok := rmsgs[m.Message.From]
if !ok {
s = make(map[uint64]*types.SignedMessage)
rmsgs[m.Message.From] = s
}
s[m.Message.Nonce] = m
}
rm := func(from address.Address, nonce uint64) {
s, ok := rmsgs[from]
if !ok {
mp.Remove(from, nonce)
return
}
if _, ok := s[nonce]; ok {
delete(s, nonce)
return
}
mp.Remove(from, nonce)
}
for _, ts := range revert {
pts, err := mp.api.LoadTipSet(ts.Parents())
if err != nil {
@ -535,9 +566,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
mp.curTs = pts
for _, msg := range msgs {
if err := mp.addTs(msg, pts); err != nil {
log.Error(err) // TODO: probably lots of spam in multi-block tsets
}
add(msg)
}
}
@ -548,17 +577,25 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
return xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
}
for _, msg := range smsgs {
mp.Remove(msg.Message.From, msg.Message.Nonce)
rm(msg.Message.From, msg.Message.Nonce)
}
for _, msg := range bmsgs {
mp.Remove(msg.From, msg.Nonce)
rm(msg.From, msg.Nonce)
}
}
mp.curTs = ts
}
for _, s := range rmsgs {
for _, msg := range s {
if err := mp.addSkipChecks(msg); err != nil {
log.Errorf("Failed to readd message from reorg to mpool: %s", err)
}
}
}
return nil
}