Merge pull request #832 from filecoin-project/feat/smrt-rbc

Smarter message rebroadcast
This commit is contained in:
Łukasz Magiera 2019-12-10 16:18:04 +01:00 committed by GitHub
commit fe95c59158
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 4 deletions

View File

@ -90,7 +90,8 @@ func (c *Client) accepted(ctx context.Context, deal ClientDeal) (func(*ClientDea
dealIdx := -1
for i, storageDeal := range params.Deals {
// TODO: make it less hacky
eq, err := cborutil.Equals(&deal.Proposal, &storageDeal)
sd := storageDeal
eq, err := cborutil.Equals(&deal.Proposal, &sd)
if err != nil {
return nil, err
}

View File

@ -192,14 +192,43 @@ func (mp *MessagePool) repubLocal() {
select {
case <-mp.repubTk.C:
mp.lk.Lock()
msgs := make([]*types.SignedMessage, 0)
msgsForAddr := make(map[address.Address][]*types.SignedMessage)
for a := range mp.localAddrs {
msgs = append(msgs, mp.pendingFor(a)...)
msgsForAddr[a] = mp.pendingFor(a)
}
mp.lk.Unlock()
var errout error
for _, msg := range msgs {
outputMsgs := []*types.SignedMessage{}
for a, msgs := range msgsForAddr {
a, err := mp.api.StateGetActor(a, nil)
if err != nil {
errout = multierr.Append(errout, xerrors.Errorf("could not get actor state: %w", err))
continue
}
curNonce := a.Nonce
for _, m := range msgs {
if m.Message.Nonce < curNonce {
continue
}
if m.Message.Nonce != curNonce {
break
}
outputMsgs = append(outputMsgs, m)
curNonce++
}
}
if len(outputMsgs) != 0 {
log.Infow("republishing local messages", "n", len(outputMsgs))
}
for _, msg := range outputMsgs {
msgb, err := msg.Serialize()
if err != nil {
errout = multierr.Append(errout, xerrors.Errorf("could not serialize: %w", err))