From 44d8f4d6d45f9a6ac77afe48fa14537a0bf22621 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 10 Dec 2019 14:13:20 +0100 Subject: [PATCH] Smarter message rebroadcast License: MIT Signed-off-by: Jakub Sztandera --- chain/messagepool/messagepool.go | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 1a827c1bd..7367f1dbc 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -192,14 +192,40 @@ func (mp *MessagePool) repubLocal() { select { case <-mp.repubTk.C: mp.lk.Lock() - msgs := make([]*types.SignedMessage, 0) + + msgsForAddr := make(map[address.Address][]*types.SignedMessage) for a := range mp.localAddrs { - msgs = append(msgs, mp.pendingFor(a)...) + msgsForAddr[a] = mp.pendingFor(a) } + mp.lk.Unlock() var errout error - for _, msg := range msgs { + outputMsgs := []*types.SignedMessage{} + + for a, msgs := range msgsForAddr { + a, err := mp.api.StateGetActor(a, nil) + if err != nil { + errout = multierr.Append(errout, xerrors.Errorf("could not get actor state: %w", err)) + continue + } + + curNonce := a.Nonce + for _, m := range msgs { + if m.Message.Nonce != curNonce { + break + } + outputMsgs = append(outputMsgs, m) + curNonce++ + } + + } + + if len(outputMsgs) != 0 { + log.Infow("republishing local messages", "n", len(outputMsgs)) + } + + for _, msg := range outputMsgs { msgb, err := msg.Serialize() if err != nil { errout = multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err))