diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index fc2b7baff..6afd48c39 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -530,8 +530,8 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg msgCid := idxrMsg.Cid var msgInfo *peerMsgInfo - msgInfo, ok := v.peerCache.Get(minerAddr) - if !ok { + msgInfo, cached := v.peerCache.Get(minerAddr) + if !cached { msgInfo = &peerMsgInfo{} } @@ -539,17 +539,17 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg msgInfo.mutex.Lock() defer msgInfo.mutex.Unlock() - if ok { + var seqno uint64 + if cached { // Reject replayed messages. - seqno := binary.BigEndian.Uint64(msg.Message.GetSeqno()) + seqno = binary.BigEndian.Uint64(msg.Message.GetSeqno()) if seqno <= msgInfo.lastSeqno { log.Debugf("ignoring replayed indexer message") return pubsub.ValidationIgnore } - msgInfo.lastSeqno = seqno } - if !ok || originPeer != msgInfo.peerID { + if !cached || originPeer != msgInfo.peerID { // Check that the miner ID maps to the peer that sent the message. err = v.authenticateMessage(ctx, minerAddr, originPeer) if err != nil { @@ -558,7 +558,7 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg return pubsub.ValidationReject } msgInfo.peerID = originPeer - if !ok { + if !cached { // Add msgInfo to cache only after being authenticated. If two // messages from the same peer are handled concurrently, there is a // small chance that one msgInfo could replace the other here when @@ -567,6 +567,9 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg } } + // Update message info cache with the latest message's sequence number. + msgInfo.lastSeqno = seqno + // See if message needs to be ignored due to rate limiting. if v.rateLimitPeer(msgInfo, msgCid) { return pubsub.ValidationIgnore