Add replay rejection
This commit is contained in:
parent
a62e027002
commit
3ff209d95d
@ -1,6 +1,7 @@
|
|||||||
package sub
|
package sub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -451,6 +452,7 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType
|
|||||||
type peerMsgInfo struct {
|
type peerMsgInfo struct {
|
||||||
peerID peer.ID
|
peerID peer.ID
|
||||||
lastCid cid.Cid
|
lastCid cid.Cid
|
||||||
|
lastSeqno []byte
|
||||||
rateLimit *ratelimit.Window
|
rateLimit *ratelimit.Window
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
}
|
}
|
||||||
@ -513,6 +515,17 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg
|
|||||||
msgInfo.mutex.Lock()
|
msgInfo.mutex.Lock()
|
||||||
defer msgInfo.mutex.Unlock()
|
defer msgInfo.mutex.Unlock()
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
// Reject replayed messages.
|
||||||
|
seqno := msg.Message.GetSeqno()
|
||||||
|
if bytes.Equal(msgInfo.lastSeqno, seqno) {
|
||||||
|
log.Warnf("rejecting replayed indexer message")
|
||||||
|
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
|
||||||
|
return pubsub.ValidationReject
|
||||||
|
}
|
||||||
|
msgInfo.lastSeqno = seqno
|
||||||
|
}
|
||||||
|
|
||||||
if !ok || originPeer != msgInfo.peerID {
|
if !ok || originPeer != msgInfo.peerID {
|
||||||
// Check that the miner ID maps to the peer that sent the message.
|
// Check that the miner ID maps to the peer that sent the message.
|
||||||
err = v.authenticateMessage(ctx, minerID, originPeer)
|
err = v.authenticateMessage(ctx, minerID, originPeer)
|
||||||
|
Loading…
Reference in New Issue
Block a user