Correctly handle seqno check

This commit is contained in:
gammazero 2022-02-09 16:21:05 -08:00
parent 3ff209d95d
commit 681ce94a34

View File

@ -1,8 +1,8 @@
package sub package sub
import ( import (
"bytes"
"context" "context"
"encoding/binary"
"sync" "sync"
"time" "time"
@ -452,7 +452,7 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType
type peerMsgInfo struct { type peerMsgInfo struct {
peerID peer.ID peerID peer.ID
lastCid cid.Cid lastCid cid.Cid
lastSeqno []byte lastSeqno uint64
rateLimit *ratelimit.Window rateLimit *ratelimit.Window
mutex sync.Mutex mutex sync.Mutex
} }
@ -517,11 +517,10 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg
if ok { if ok {
// Reject replayed messages. // Reject replayed messages.
seqno := msg.Message.GetSeqno() seqno := binary.BigEndian.Uint64(msg.Message.GetSeqno())
if bytes.Equal(msgInfo.lastSeqno, seqno) { if seqno <= msgInfo.lastSeqno {
log.Warnf("rejecting replayed indexer message") log.Debugf("ignoring replayed indexer message")
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) return pubsub.ValidationIgnore
return pubsub.ValidationReject
} }
msgInfo.lastSeqno = seqno msgInfo.lastSeqno = seqno
} }