Improve messagepool reorg checks
This commit is contained in:
parent
bbc61a8f86
commit
74ed6455b0
@ -289,7 +289,7 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error
|
|||||||
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
|
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
|
||||||
}
|
}
|
||||||
|
|
||||||
balance, err := mp.getStateBalance(m.Message.From)
|
balance, err := mp.getStateBalance(m.Message.From, curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to check sender balance: %w", err)
|
return xerrors.Errorf("failed to check sender balance: %w", err)
|
||||||
}
|
}
|
||||||
@ -304,6 +304,13 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error
|
|||||||
return mp.addLocked(m)
|
return mp.addLocked(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
|
||||||
|
mp.lk.Lock()
|
||||||
|
defer mp.lk.Unlock()
|
||||||
|
|
||||||
|
return mp.addLocked(m)
|
||||||
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
|
func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
|
||||||
log.Debugf("mpooladd: %s %s", m.Message.From, m.Message.Nonce)
|
log.Debugf("mpooladd: %s %s", m.Message.From, m.Message.Nonce)
|
||||||
if m.Signature.Type == types.KTBLS {
|
if m.Signature.Type == types.KTBLS {
|
||||||
@ -402,8 +409,8 @@ func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet)
|
|||||||
return baseNonce, nil
|
return baseNonce, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, error) {
|
func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) {
|
||||||
act, err := mp.api.StateGetActor(addr, nil)
|
act, err := mp.api.StateGetActor(addr, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.EmptyInt, err
|
return types.EmptyInt, err
|
||||||
}
|
}
|
||||||
@ -521,6 +528,30 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
defer mp.curTsLk.Unlock()
|
defer mp.curTsLk.Unlock()
|
||||||
|
|
||||||
|
rmsgs := make(map[address.Address]map[uint64]*types.SignedMessage)
|
||||||
|
add := func(m *types.SignedMessage) {
|
||||||
|
s, ok := rmsgs[m.Message.From]
|
||||||
|
if !ok {
|
||||||
|
s = make(map[uint64]*types.SignedMessage)
|
||||||
|
rmsgs[m.Message.From] = s
|
||||||
|
}
|
||||||
|
s[m.Message.Nonce] = m
|
||||||
|
}
|
||||||
|
rm := func(from address.Address, nonce uint64) {
|
||||||
|
s, ok := rmsgs[from]
|
||||||
|
if !ok {
|
||||||
|
mp.Remove(from, nonce)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := s[nonce]; ok {
|
||||||
|
delete(s, nonce)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
mp.Remove(from, nonce)
|
||||||
|
}
|
||||||
|
|
||||||
for _, ts := range revert {
|
for _, ts := range revert {
|
||||||
pts, err := mp.api.LoadTipSet(ts.Parents())
|
pts, err := mp.api.LoadTipSet(ts.Parents())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -535,9 +566,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
mp.curTs = pts
|
mp.curTs = pts
|
||||||
|
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
if err := mp.addTs(msg, pts); err != nil {
|
add(msg)
|
||||||
log.Error(err) // TODO: probably lots of spam in multi-block tsets
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -548,17 +577,25 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
return xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
return xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
||||||
}
|
}
|
||||||
for _, msg := range smsgs {
|
for _, msg := range smsgs {
|
||||||
mp.Remove(msg.Message.From, msg.Message.Nonce)
|
rm(msg.Message.From, msg.Message.Nonce)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, msg := range bmsgs {
|
for _, msg := range bmsgs {
|
||||||
mp.Remove(msg.From, msg.Nonce)
|
rm(msg.From, msg.Nonce)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mp.curTs = ts
|
mp.curTs = ts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, s := range rmsgs {
|
||||||
|
for _, msg := range s {
|
||||||
|
if err := mp.addSkipChecks(msg); err != nil {
|
||||||
|
log.Errorf("Failed to readd message from reorg to mpool: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user