Smarter message rebroadcast
License: MIT Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
parent
3b20e9e367
commit
44d8f4d6d4
@ -192,14 +192,40 @@ func (mp *MessagePool) repubLocal() {
|
||||
select {
|
||||
case <-mp.repubTk.C:
|
||||
mp.lk.Lock()
|
||||
msgs := make([]*types.SignedMessage, 0)
|
||||
|
||||
msgsForAddr := make(map[address.Address][]*types.SignedMessage)
|
||||
for a := range mp.localAddrs {
|
||||
msgs = append(msgs, mp.pendingFor(a)...)
|
||||
msgsForAddr[a] = mp.pendingFor(a)
|
||||
}
|
||||
|
||||
mp.lk.Unlock()
|
||||
|
||||
var errout error
|
||||
for _, msg := range msgs {
|
||||
outputMsgs := []*types.SignedMessage{}
|
||||
|
||||
for a, msgs := range msgsForAddr {
|
||||
a, err := mp.api.StateGetActor(a, nil)
|
||||
if err != nil {
|
||||
errout = multierr.Append(errout, xerrors.Errorf("could not get actor state: %w", err))
|
||||
continue
|
||||
}
|
||||
|
||||
curNonce := a.Nonce
|
||||
for _, m := range msgs {
|
||||
if m.Message.Nonce != curNonce {
|
||||
break
|
||||
}
|
||||
outputMsgs = append(outputMsgs, m)
|
||||
curNonce++
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if len(outputMsgs) != 0 {
|
||||
log.Infow("republishing local messages", "n", len(outputMsgs))
|
||||
}
|
||||
|
||||
for _, msg := range outputMsgs {
|
||||
msgb, err := msg.Serialize()
|
||||
if err != nil {
|
||||
errout = multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err))
|
||||
|
Loading…
Reference in New Issue
Block a user