lotus/chain/sub/incoming.go

package sub

import (
	"bytes"
	"context"
	"encoding/binary"
	"sync"
	"time"

	lru "github.com/hashicorp/golang-lru/v2"
	bserv "github.com/ipfs/boxo/blockservice"
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log/v2"
	"github.com/ipni/go-libipni/announce/message"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/connmgr"
	"github.com/libp2p/go-libp2p/core/peer"
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-address"

	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/chain"
	"github.com/filecoin-project/lotus/chain/consensus"
	"github.com/filecoin-project/lotus/chain/messagepool"
	"github.com/filecoin-project/lotus/chain/store"
	"github.com/filecoin-project/lotus/chain/sub/ratelimit"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/metrics"
	"github.com/filecoin-project/lotus/node/impl/client"
	"github.com/filecoin-project/lotus/node/impl/full"
)

var log = logging.Logger("sub")

var msgCidPrefix = cid.Prefix{
	Version:  1,
	Codec:    cid.DagCBOR,
	MhType:   client.DefaultHashFunction,
	MhLength: 32,
}

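// HandleIncomingBlocks reads BlockMsgs from the block pubsub subscription
// (already vetted by BlockValidator), fetches their BLS and secp messages
// from the network, and informs the syncer of the assembled FullBlock,
// tagging the delivering peer in the connection manager.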
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
	// Timeout after (block time + propagation delay). This is useless at
	// this point.
	timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second

	for {
		msg, err := bsub.Next(ctx)
		if err != nil {
			if ctx.Err() != nil {
				log.Warn("quitting HandleIncomingBlocks loop")
				return
			}
			log.Error("error from block subscription: ", err)
			continue
		}

		blk, ok := msg.ValidatorData.(*types.BlockMsg)
		if !ok {
			log.Warnf("pubsub block validator passed on wrong type: %#v", msg.ValidatorData)
			return
		}

		src := msg.GetFrom()

		go func() {
			ctx, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()

			// NOTE: we could also share a single session between
			// all requests but that may have other consequences.
			ses := bserv.NewSession(ctx, bs)

			start := build.Clock.Now()
			log.Debug("about to fetch messages for block from pubsub")
			bmsgs, err := FetchMessagesByCids(ctx, ses, blk.BlsMessages)
			if err != nil {
				log.Errorf("failed to fetch all bls messages for block received over pubsub: %s; source: %s", err, src)
				return
			}

			smsgs, err := FetchSignedMessagesByCids(ctx, ses, blk.SecpkMessages)
			if err != nil {
				log.Errorf("failed to fetch all secpk messages for block received over pubsub: %s; source: %s", err, src)
				return
			}

			took := build.Clock.Since(start)
			log.Debugw("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
			if took > 3*time.Second {
				log.Warnw("Slow msg fetch", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
			}

			if delay := build.Clock.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
				_ = stats.RecordWithTags(ctx,
					[]tag.Mutator{tag.Insert(metrics.MinerID, blk.Header.Miner.String())},
					metrics.BlockDelay.M(delay),
				)
				log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
			}

			if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
				Header:        blk.Header,
				BlsMessages:   bmsgs,
				SecpkMessages: smsgs,
			}) {
				cmgr.TagPeer(msg.ReceivedFrom, "blkprop", 5)
			}
		}()
	}
}

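// FetchMessagesByCids fetches the unsigned (BLS) messages with the given CIDs
// from the block service and decodes them, preserving the order of the input.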
func FetchMessagesByCids(
	ctx context.Context,
	bserv bserv.BlockGetter,
	cids []cid.Cid,
) ([]*types.Message, error) {
	out := make([]*types.Message, len(cids))

	err := fetchCids(ctx, bserv, cids, func(i int, b blocks.Block) error {
		msg, err := types.DecodeMessage(b.RawData())
		if err != nil {
			return err
		}
		out[i] = msg
		return nil
	})
	if err != nil {
		return nil, err
	}
	return out, nil
}

// FIXME: Duplicate of above.
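// FetchSignedMessagesByCids fetches the signed (secp) messages with the given
// CIDs from the block service and decodes them, preserving the order of the
// input.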
func FetchSignedMessagesByCids(
	ctx context.Context,
	bserv bserv.BlockGetter,
	cids []cid.Cid,
) ([]*types.SignedMessage, error) {
	out := make([]*types.SignedMessage, len(cids))

	err := fetchCids(ctx, bserv, cids, func(i int, b blocks.Block) error {
		smsg, err := types.DecodeSignedMessage(b.RawData())
		if err != nil {
			return err
		}
		out[i] = smsg
		return nil
	})
	if err != nil {
		return nil, err
	}
	return out, nil
}

// Fetch `cids` from the block service, apply `cb` on each of them. Used
// by the fetch message functions above.
//
// We check that each block is received only once and that we do not receive
// blocks we did not request.
func fetchCids(
	ctx context.Context,
	bserv bserv.BlockGetter,
	cids []cid.Cid,
	cb func(int, blocks.Block) error,
) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	cidIndex := make(map[cid.Cid]int)
	for i, c := range cids {
		if c.Prefix() != msgCidPrefix {
			return xerrors.Errorf("invalid msg CID: %s", c)
		}
		cidIndex[c] = i
	}
	if len(cids) != len(cidIndex) {
		return xerrors.Errorf("duplicate CIDs in fetchCids input")
	}

	for block := range bserv.GetBlocks(ctx, cids) {
		ix, ok := cidIndex[block.Cid()]
		if !ok {
			// Ignore duplicate/unexpected blocks. This shouldn't
			// happen, but we can be safe.
			log.Errorw("received duplicate/unexpected block when syncing", "cid", block.Cid())
			continue
		}

		// Record that we've received the block.
		delete(cidIndex, block.Cid())

		if err := cb(ix, block); err != nil {
			return err
		}
	}

	if len(cidIndex) > 0 {
		err := ctx.Err()
		if err == nil {
			err = xerrors.Errorf("failed to fetch %d messages for unknown reasons", len(cidIndex))
		}
		return err
	}

	return nil
}

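// BlockValidator is the pubsub validator for incoming blocks. It delegates the
// actual consensus checks to the configured consensus engine, counts
// validation failures per peer, and blacklists peers that reach killThresh.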
type BlockValidator struct {
	self       peer.ID
	peers      *lru.TwoQueueCache[peer.ID, int]
	killThresh int
	recvBlocks *blockReceiptCache
	blacklist  func(peer.ID)

	// necessary for block validation
	chain     *store.ChainStore
	consensus consensus.Consensus
}

func NewBlockValidator(self peer.ID, chain *store.ChainStore, cns consensus.Consensus, blacklist func(peer.ID)) *BlockValidator {
	p, _ := lru.New2Q[peer.ID, int](4096)
	return &BlockValidator{
		self:       self,
		peers:      p,
		killThresh: 10,
		blacklist:  blacklist,
		recvBlocks: newBlockReceiptCache(),
		chain:      chain,
		consensus:  cns,
	}
}

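// flagPeer bumps the validation-failure count for a peer and blacklists it
// once the count reaches killThresh.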
func (bv *BlockValidator) flagPeer(p peer.ID) {
	val, ok := bv.peers.Get(p)
	if !ok {
		bv.peers.Add(p, 1)
		return
	}

	if val >= bv.killThresh {
		log.Warnf("blacklisting peer %s", p)
		bv.blacklist(p)
		return
	}

	bv.peers.Add(p, val+1)
}

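// Validate runs the consensus pubsub checks on an incoming block. Accepted
// blocks that have already been seen are downgraded to ValidationIgnore, and a
// panic during validation is recovered, recorded, and treated as a rejection.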
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) (res pubsub.ValidationResult) {
	defer func() {
		if rerr := recover(); rerr != nil {
			err := xerrors.Errorf("validate block: %s", rerr)
			recordFailure(ctx, metrics.BlockValidationFailure, err.Error())
			bv.flagPeer(pid)
			res = pubsub.ValidationReject
			return
		}
	}()

	var what string
	res, what = consensus.ValidateBlockPubsub(ctx, bv.consensus, pid == bv.self, msg)
	if res == pubsub.ValidationAccept {
		// it's a good block! make sure we've only seen it once
		if count := bv.recvBlocks.add(msg.ValidatorData.(*types.BlockMsg).Cid()); count > 0 {
			if pid == bv.self {
				log.Warnf("local block has been seen %d times; ignoring", count)
			}

			// TODO: once these changes propagate to the network, we can consider
			// dropping peers who send us the same block multiple times
			return pubsub.ValidationIgnore
		}
	} else {
		recordFailure(ctx, metrics.BlockValidationFailure, what)
	}

	return res
}

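// blockReceiptCache remembers how many times each block CID has been received,
// so duplicate deliveries can be detected; add returns the previous count.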
type blockReceiptCache struct {
	blocks *lru.TwoQueueCache[cid.Cid, int]
}

func newBlockReceiptCache() *blockReceiptCache {
	c, _ := lru.New2Q[cid.Cid, int](8192)
	return &blockReceiptCache{
		blocks: c,
	}
}

func (brc *blockReceiptCache) add(bcid cid.Cid) int {
	val, ok := brc.blocks.Get(bcid)
	if !ok {
		brc.blocks.Add(bcid, 1)
		return 0
	}

	brc.blocks.Add(bcid, val+1)
	return val
}

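// MessageValidator is the pubsub validator for incoming chain messages. It
// validates a message by attempting to add it to the local message pool.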
type MessageValidator struct {
	self  peer.ID
	mpool *messagepool.MessagePool
}

func NewMessageValidator(self peer.ID, mp *messagepool.MessagePool) *MessageValidator {
	return &MessageValidator{self: self, mpool: mp}
}

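// Validate decodes a signed message received over pubsub and adds it to the
// message pool. Messages published by this node are checked by
// validateLocalMessage instead; mpool errors are mapped to ValidationIgnore
// for soft failures (e.g. nonce gaps, insufficient funds) and to
// ValidationReject for hard ones (e.g. oversized messages).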
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
	if pid == mv.self {
		return mv.validateLocalMessage(ctx, msg)
	}

	start := time.Now()
	defer func() {
		ms := time.Since(start).Microseconds()
		stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000))
	}()

	stats.Record(ctx, metrics.MessageReceived.M(1))
	m, err := types.DecodeSignedMessage(msg.Message.GetData())
	if err != nil {
		log.Warnf("failed to decode incoming message: %s", err)
		ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "decode"))
		stats.Record(ctx, metrics.MessageValidationFailure.M(1))
		return pubsub.ValidationReject
	}

	if err := mv.mpool.Add(ctx, m); err != nil {
		log.Debugf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
		ctx, _ = tag.New(
			ctx,
			tag.Upsert(metrics.Local, "false"),
		)
		recordFailure(ctx, metrics.MessageValidationFailure, "add")
		switch {
		case xerrors.Is(err, messagepool.ErrSoftValidationFailure):
			fallthrough
		case xerrors.Is(err, messagepool.ErrRBFTooLowPremium):
			fallthrough
		case xerrors.Is(err, messagepool.ErrTooManyPendingMessages):
			fallthrough
		case xerrors.Is(err, messagepool.ErrNonceGap):
			fallthrough
		case xerrors.Is(err, messagepool.ErrGasFeeCapTooLow):
			fallthrough
		case xerrors.Is(err, messagepool.ErrNonceTooLow):
			fallthrough
		case xerrors.Is(err, messagepool.ErrNotEnoughFunds):
			fallthrough
		case xerrors.Is(err, messagepool.ErrExistingNonce):
			return pubsub.ValidationIgnore

		case xerrors.Is(err, messagepool.ErrMessageTooBig):
			fallthrough
		case xerrors.Is(err, messagepool.ErrMessageValueTooHigh):
			fallthrough
		case xerrors.Is(err, messagepool.ErrInvalidToAddr):
			fallthrough
		default:
			return pubsub.ValidationReject
		}
	}

	ctx, _ = tag.New(
		ctx,
		tag.Upsert(metrics.MsgValid, "true"),
	)
	stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
	return pubsub.ValidationAccept
}

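// validateLocalMessage applies lightweight checks (size, destination address,
// value bound, signature) to messages published by this node, without going
// through the message pool; any failure results in ValidationIgnore.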
func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsub.Message) pubsub.ValidationResult {
	ctx, _ = tag.New(
		ctx,
		tag.Upsert(metrics.Local, "true"),
	)

	start := time.Now()
	defer func() {
		ms := time.Since(start).Microseconds()
		stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000))
	}()

	// do some lightweight validation
	stats.Record(ctx, metrics.MessagePublished.M(1))

	m, err := types.DecodeSignedMessage(msg.Message.GetData())
	if err != nil {
		log.Warnf("failed to decode local message: %s", err)
		recordFailure(ctx, metrics.MessageValidationFailure, "decode")
		return pubsub.ValidationIgnore
	}

	if m.Size() > messagepool.MaxMessageSize {
		log.Warnf("local message is too large! (%dB)", m.Size())
		recordFailure(ctx, metrics.MessageValidationFailure, "oversize")
		return pubsub.ValidationIgnore
	}

	if m.Message.To == address.Undef {
		log.Warn("local message has invalid destination address")
		recordFailure(ctx, metrics.MessageValidationFailure, "undef-addr")
		return pubsub.ValidationIgnore
	}

	if !m.Message.Value.LessThan(types.TotalFilecoinInt) {
		log.Warnf("local message has too high a value: %s", m.Message.Value)
		recordFailure(ctx, metrics.MessageValidationFailure, "value-too-high")
		return pubsub.ValidationIgnore
	}

	if err := mv.mpool.VerifyMsgSig(m); err != nil {
		log.Warnf("signature verification failed for local message: %s", err)
		recordFailure(ctx, metrics.MessageValidationFailure, "verify-sig")
		return pubsub.ValidationIgnore
	}

	ctx, _ = tag.New(
		ctx,
		tag.Upsert(metrics.MsgValid, "true"),
	)
	stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
	return pubsub.ValidationAccept
}

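// HandleIncomingMessages drains the message pubsub subscription. All real work
// happens in the validator (MessageValidator), so this loop only keeps the
// subscription alive and logs subscription errors.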
func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {
	for {
		_, err := msub.Next(ctx)
		if err != nil {
			log.Warn("error from message subscription: ", err)
			if ctx.Err() != nil {
				log.Warn("quitting HandleIncomingMessages loop")
				return
			}
			continue
		}

		// Do nothing... everything happens in validate
	}
}

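// recordFailure increments the given failure metric, tagging the context with
// the failure type.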
func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType string) {
	ctx, _ = tag.New(
		ctx,
		tag.Upsert(metrics.FailureType, failureType),
	)
	stats.Record(ctx, metric.M(1))
}

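// peerMsgInfo tracks, per miner address, the origin peer plus the last message
// CID and pubsub sequence number seen, together with a rate-limit window. The
// mutex serializes concurrent validations for the same miner.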
type peerMsgInfo struct {
	peerID    peer.ID
	lastCid   cid.Cid
	lastSeqno uint64
	rateLimit *ratelimit.Window
	mutex     sync.Mutex
}

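// IndexerMessageValidator is the pubsub validator for indexer (ingest)
// announcement messages relayed from market nodes. It authenticates the miner
// ID carried in each message against on-chain miner info and rate-limits
// messages per miner.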
type IndexerMessageValidator struct {
	self peer.ID

	peerCache *lru.TwoQueueCache[address.Address, *peerMsgInfo]
	chainApi  full.ChainModuleAPI
	stateApi  full.StateModuleAPI
}

func NewIndexerMessageValidator(self peer.ID, chainApi full.ChainModuleAPI, stateApi full.StateModuleAPI) *IndexerMessageValidator {
	peerCache, _ := lru.New2Q[address.Address, *peerMsgInfo](8192)

	return &IndexerMessageValidator{
		self:      self,
		peerCache: peerCache,
		chainApi:  chainApi,
		stateApi:  stateApi,
	}
}

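// Validate rejects indexer messages that appear to originate from this node,
// decodes the announcement, drops replays by pubsub sequence number, verifies
// that the miner ID in ExtraData maps to the publishing peer, and finally
// applies per-miner rate limiting.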
func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
	// This chain-node should not be publishing its own messages. These are
	// relayed from market-nodes. If a node appears to be local, reject it.
	if pid == v.self {
		log.Debug("ignoring indexer message from self")
		stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
		return pubsub.ValidationIgnore
	}
	originPeer := msg.GetFrom()
	if originPeer == v.self {
		log.Debug("ignoring indexer message originating from self")
		stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
		return pubsub.ValidationIgnore
	}

	idxrMsg := message.Message{}
	err := idxrMsg.UnmarshalCBOR(bytes.NewBuffer(msg.Data))
	if err != nil {
		log.Errorw("Could not decode indexer pubsub message", "err", err)
		return pubsub.ValidationReject
	}
	if len(idxrMsg.ExtraData) == 0 {
		log.Debugw("ignoring message missing miner id", "peer", originPeer)
		return pubsub.ValidationIgnore
	}

	// Get miner info from lotus
	minerAddr, err := address.NewFromBytes(idxrMsg.ExtraData)
	if err != nil {
		log.Warnw("cannot parse extra data as miner address", "err", err, "extraData", idxrMsg.ExtraData)
		return pubsub.ValidationReject
	}

	msgCid := idxrMsg.Cid

	msgInfo, cached := v.peerCache.Get(minerAddr)
	if !cached {
		msgInfo = &peerMsgInfo{}
	}

	// Lock this peer's message info.
	msgInfo.mutex.Lock()
	defer msgInfo.mutex.Unlock()

	var seqno uint64
	if cached {
		// Reject replayed messages.
		seqno = binary.BigEndian.Uint64(msg.Message.GetSeqno())
		if seqno <= msgInfo.lastSeqno {
			log.Debugf("ignoring replayed indexer message")
			return pubsub.ValidationIgnore
		}
	}

	if !cached || originPeer != msgInfo.peerID {
		// Check that the miner ID maps to the peer that sent the message.
		err = v.authenticateMessage(ctx, minerAddr, originPeer)
		if err != nil {
			log.Warnw("cannot authenticate message", "err", err, "peer", originPeer, "minerID", minerAddr)
			stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
			return pubsub.ValidationReject
		}
		msgInfo.peerID = originPeer
		if !cached {
			// Add msgInfo to cache only after being authenticated. If two
			// messages from the same peer are handled concurrently, there is a
			// small chance that one msgInfo could replace the other here when
			// the info is first cached. This is OK, so no need to prevent it.
			v.peerCache.Add(minerAddr, msgInfo)
		}
	}

	// Update message info cache with the latest message's sequence number.
	msgInfo.lastSeqno = seqno

	// See if message needs to be ignored due to rate limiting.
	if v.rateLimitPeer(msgInfo, msgCid) {
		return pubsub.ValidationIgnore
	}

	stats.Record(ctx, metrics.IndexerMessageValidationSuccess.M(1))
	return pubsub.ValidationAccept
}

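// rateLimitPeer reports whether a message should be ignored, either because it
// repeats the previous message CID within repeatTimeLimit or because the
// per-miner window of msgLimit messages per msgTimeLimit has been exhausted.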
func (v *IndexerMessageValidator) rateLimitPeer(msgInfo *peerMsgInfo, msgCid cid.Cid) bool {
	const (
		msgLimit        = 5
		msgTimeLimit    = 10 * time.Second
		repeatTimeLimit = 2 * time.Hour
	)

	timeWindow := msgInfo.rateLimit

	// Check overall message rate.
	if timeWindow == nil {
		timeWindow = ratelimit.NewWindow(msgLimit, msgTimeLimit)
		msgInfo.rateLimit = timeWindow
	} else if msgInfo.lastCid == msgCid {
		// Check if this is a repeat of the previous message data.
		if time.Since(timeWindow.Newest()) < repeatTimeLimit {
			log.Warnw("ignoring repeated indexer message", "sender", msgInfo.peerID)
			return true
		}
	}

	err := timeWindow.Add()
	if err != nil {
		log.Warnw("ignoring indexer message", "sender", msgInfo.peerID, "err", err)
		return true
	}

	msgInfo.lastCid = msgCid

	return false
}

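// authenticateMessage verifies, against the current chain head, that the peer
// ID registered on chain for the given miner matches the peer that published
// the message.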
func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, minerAddress address.Address, peerID peer.ID) error {
	ts, err := v.chainApi.ChainHead(ctx)
	if err != nil {
		return err
	}

	minerInfo, err := v.stateApi.StateMinerInfo(ctx, minerAddress, ts.Key())
	if err != nil {
		return err
	}

	if minerInfo.PeerId == nil {
		return xerrors.New("no peer id for miner")
	}
	if *minerInfo.PeerId != peerID {
		return xerrors.New("miner id does not map to peer that sent message")
	}

	return nil
}