gammazero c084130d3e Lotus chain nodes relay indexer pubsub messages
Content providers announce the availability of indexer data using gossip pubsub. The content providers are not connected directly to indexers, so the pubsub messages are relayed to indexers via chain nodes. This PR makes chain nodes relay gossip pubsub messages on the /indexer/ingest/<netname> topic.
2022-02-03 14:56:21 -08:00

468 lines
12 KiB

package sub
import (
address "github.com/filecoin-project/go-address"
lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
bserv "github.com/ipfs/go-blockservice"
logging "github.com/ipfs/go-log/v2"
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
pubsub "github.com/libp2p/go-libp2p-pubsub"
// log is the package-level logger, created with the name "sub".
var log = logging.Logger("sub")
// msgCidPrefix is the CID prefix that fetchCids requires of every message CID
// it is asked to fetch: version 1, the DagCBOR codec, the client package's
// default hash function, and a multihash length of 32.
var msgCidPrefix = cid.Prefix{
Version: 1,
Codec: cid.DagCBOR,
MhType: client.DefaultHashFunction,
MhLength: 32,
// HandleIncomingBlocks reads messages from the block pubsub subscription in a
// loop. For each message it starts a goroutine that fetches the block's BLS
// and secpk messages through the block service and hands the assembled
// types.FullBlock to the syncer via s.InformNewBlock.
//
// msg.ValidatorData is expected to hold a *types.BlockMsg; any other type is
// logged as a warning. When InformNewBlock returns true, the peer the message
// was received from is tagged "blkprop" with a value of 5 in cmgr.
//
// NOTE(review): the statements that leave the loop or skip to the next message
// on the error paths are not visible in this listing — confirm which paths
// return and which continue.
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
// Timeout after (block time + propagation delay). This is useless at
// this point.
timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
for {
msg, err := bsub.Next(ctx)
if err != nil {
// A non-nil ctx.Err() is reported as the loop quitting; any other
// subscription error is logged as an error.
if ctx.Err() != nil {
log.Warn("quitting HandleIncomingBlocks loop")
log.Error("error from block subscription: ", err)
blk, ok := msg.ValidatorData.(*types.BlockMsg)
if !ok {
log.Warnf("pubsub block validator passed on wrong type: %#v", msg.ValidatorData)
// Captured for the fetch-failure log messages inside the goroutine.
src := msg.GetFrom()
go func() {
// Bound the message fetch for this block by the timeout computed above.
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
// NOTE: we could also share a single session between
// all requests but that may have other consequences.
ses := bserv.NewSession(ctx, bs)
start := build.Clock.Now()
log.Debug("about to fetch messages for block from pubsub")
bmsgs, err := FetchMessagesByCids(ctx, ses, blk.BlsMessages)
if err != nil {
log.Errorf("failed to fetch all bls messages for block received over pubsub: %s; source: %s", err, src)
smsgs, err := FetchSignedMessagesByCids(ctx, ses, blk.SecpkMessages)
if err != nil {
log.Errorf("failed to fetch all secpk messages for block received over pubsub: %s; source: %s", err, src)
// took is the time elapsed since the fetch started; more than 3 seconds is
// logged as a slow fetch.
took := build.Clock.Since(start)
log.Debugw("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
if took > 3*time.Second {
log.Warnw("Slow msg fetch", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
// delay is the local clock's Unix time minus the block header timestamp; a
// value above 5 is recorded with the miner ID as a tag and logged.
// NOTE(review): the measurement argument to RecordWithTags is not visible in
// this listing — confirm which metric is recorded.
if delay := build.Clock.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
_ = stats.RecordWithTags(ctx,
[]tag.Mutator{tag.Insert(metrics.MinerID, blk.Header.Miner.String())},
log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
// Hand the full block to the syncer; tag the delivering peer when the syncer
// reports true.
if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
Header: blk.Header,
BlsMessages: bmsgs,
SecpkMessages: smsgs,
}) {
cmgr.TagPeer(msg.ReceivedFrom, "blkprop", 5)
// FetchMessagesByCids fetches the blocks identified by cids from bserv and
// decodes each one with types.DecodeMessage. The result slice has the same
// length as cids, and out[i] holds the message decoded from the block for
// cids[i]. An error from fetchCids or from decoding any block is returned
// together with a nil slice.
func FetchMessagesByCids(
ctx context.Context,
bserv bserv.BlockGetter,
cids []cid.Cid,
) ([]*types.Message, error) {
out := make([]*types.Message, len(cids))
// The callback receives the index of the block's CID within cids, so results
// land in request order regardless of arrival order.
err := fetchCids(ctx, bserv, cids, func(i int, b blocks.Block) error {
msg, err := types.DecodeMessage(b.RawData())
if err != nil {
return err
out[i] = msg
return nil
if err != nil {
return nil, err
return out, nil
// FetchSignedMessagesByCids fetches the blocks identified by cids from bserv
// and decodes each one with types.DecodeSignedMessage. The result slice has
// the same length as cids, and out[i] holds the signed message decoded from
// the block for cids[i]. An error from fetchCids or from decoding any block is
// returned together with a nil slice.
//
// FIXME: Duplicate of above.
func FetchSignedMessagesByCids(
ctx context.Context,
bserv bserv.BlockGetter,
cids []cid.Cid,
) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, len(cids))
// The callback receives the index of the block's CID within cids, so results
// land in request order regardless of arrival order.
err := fetchCids(ctx, bserv, cids, func(i int, b blocks.Block) error {
smsg, err := types.DecodeSignedMessage(b.RawData())
if err != nil {
return err
out[i] = smsg
return nil
if err != nil {
return nil, err
return out, nil
// fetchCids fetches `cids` from the block service and applies `cb` to each
// block received, passing the index of the block's CID within `cids`. Used
// by the fetch message functions above.
//
// Every CID must carry msgCidPrefix and appear only once in `cids`; otherwise
// an error is returned before anything is requested. We check that each block
// is received only once and that we do not receive blocks we did not request.
// An error from `cb` is returned as-is. If the blocks stop arriving while some
// CIDs are still outstanding, the context's error is returned, or a generic
// error when the context has none.
func fetchCids(
ctx context.Context,
bserv bserv.BlockGetter,
cids []cid.Cid,
cb func(int, blocks.Block) error,
) error {
// The derived context is cancelled when fetchCids returns, including on the
// early error returns below.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// cidIndex maps each requested CID to its index in cids; entries are removed
// as the corresponding blocks arrive.
cidIndex := make(map[cid.Cid]int)
for i, c := range cids {
if c.Prefix() != msgCidPrefix {
return fmt.Errorf("invalid msg CID: %s", c)
cidIndex[c] = i
// Repeated CIDs collapse into one map entry, so a size mismatch means the
// input contained duplicates.
if len(cids) != len(cidIndex) {
return fmt.Errorf("duplicate CIDs in fetchCids input")
for block := range bserv.GetBlocks(ctx, cids) {
ix, ok := cidIndex[block.Cid()]
if !ok {
// Ignore duplicate/unexpected blocks. This shouldn't
// happen, but we can be safe.
log.Errorw("received duplicate/unexpected block when syncing", "cid", block.Cid())
// Record that we've received the block.
delete(cidIndex, block.Cid())
if err := cb(ix, block); err != nil {
return err
// Anything still in cidIndex was requested but never delivered.
if len(cidIndex) > 0 {
err := ctx.Err()
if err == nil {
err = fmt.Errorf("failed to fetch %d messages for unknown reasons", len(cidIndex))
return err
return nil
// BlockValidator validates block messages received over pubsub. It holds the
// local peer ID, a per-peer counter cache used by flagPeer, a cache of block
// CIDs that have already been received, and the consensus implementation that
// performs the actual block checks.
type BlockValidator struct {
// self is compared against the sending peer in Validate to detect blocks
// published by this node.
self peer.ID
// peers maps a peer ID to an int counter maintained by flagPeer.
peers *lru.TwoQueueCache
// killThresh is the counter value at which flagPeer logs that it is
// blacklisting the peer.
killThresh int
// recvBlocks counts how many times each accepted block CID has been seen.
recvBlocks *blockReceiptCache
// blacklist is the callback supplied to NewBlockValidator.
// NOTE(review): no call to it is visible in this listing — confirm where it
// is invoked.
blacklist func(peer.ID)
// necessary for block validation
chain *store.ChainStore
consensus consensus.Consensus
// NewBlockValidator builds a BlockValidator for the local peer self. It uses a
// 2Q cache created with size 4096 for the per-peer counters, a kill threshold
// of 10, a fresh block receipt cache, and the given chain store, consensus
// implementation and blacklist callback.
func NewBlockValidator(self peer.ID, chain *store.ChainStore, cns consensus.Consensus, blacklist func(peer.ID)) *BlockValidator {
// The error from New2Q is discarded; presumably safe because the size is a
// positive constant — confirm against the golang-lru documentation.
p, _ := lru.New2Q(4096)
return &BlockValidator{
self: self,
peers: p,
killThresh: 10,
blacklist: blacklist,
recvBlocks: newBlockReceiptCache(),
chain: chain,
consensus: cns,
// flagPeer bumps the counter kept for peer p in bv.peers. A peer that is not
// yet in the cache is stored with a count of 1. Once the stored count has
// reached bv.killThresh, a "blacklisting peer" warning is logged; otherwise
// the count is stored again incremented by one.
//
// NOTE(review): in this listing there is no return after the first-time
// insert (so the type assertion on v would run with a missing entry), and no
// call to bv.blacklist or return after the warning — confirm against the full
// file.
func (bv *BlockValidator) flagPeer(p peer.ID) {
v, ok := bv.peers.Get(p)
if !ok {
bv.peers.Add(p, int(1))
// Values in bv.peers are stored as int (see the Add calls in this method).
val := v.(int)
if val >= bv.killThresh {
log.Warnf("blacklisting peer %s", p)
bv.peers.Add(p, v.(int)+1)
// Validate validates a block message received over pubsub. It delegates to
// bv.consensus.ValidateBlockPubsub, telling it whether pid is this node, and
// then:
//   - on ValidationAccept, registers the block CID in bv.recvBlocks; a CID
//     that was already registered leads to ValidationIgnore, with a warning
//     logged when the block came from this node;
//   - on any other result, records a BlockValidationFailure metric using the
//     reason string returned by the consensus validator.
//
// A panic raised during validation is recovered, recorded as a
// BlockValidationFailure, and converted into ValidationReject through the
// named result res.
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) (res pubsub.ValidationResult) {
defer func() {
if rerr := recover(); rerr != nil {
err := xerrors.Errorf("validate block: %s", rerr)
recordFailure(ctx, metrics.BlockValidationFailure, err.Error())
// Assigning the named result changes what the caller receives.
res = pubsub.ValidationReject
var what string
res, what = bv.consensus.ValidateBlockPubsub(ctx, pid == bv.self, msg)
if res == pubsub.ValidationAccept {
// it's a good block! make sure we've only seen it once
// add returns the number of times the CID was registered before this call.
if count := bv.recvBlocks.add(msg.ValidatorData.(*types.BlockMsg).Cid()); count > 0 {
if pid == bv.self {
log.Warnf("local block has been seen %d times; ignoring", count)
// TODO: once these changes propagate to the network, we can consider
// dropping peers who send us the same block multiple times
return pubsub.ValidationIgnore
} else {
recordFailure(ctx, metrics.BlockValidationFailure, what)
return res
// blockReceiptCache tracks how many times each block CID has been registered
// through add.
type blockReceiptCache struct {
// blocks is keyed by block CID and stores an int count per CID.
blocks *lru.TwoQueueCache
// newBlockReceiptCache returns a blockReceiptCache backed by a 2Q cache
// created with size 8192.
func newBlockReceiptCache() *blockReceiptCache {
// The error from New2Q is discarded; presumably safe because the size is a
// positive constant — confirm against the golang-lru documentation.
c, _ := lru.New2Q(8192)
return &blockReceiptCache{
blocks: c,
// add registers one more receipt of bcid and returns how many times it had
// been registered before this call: 0 when bcid is not currently in the
// cache, otherwise the previously stored count.
func (brc *blockReceiptCache) add(bcid cid.Cid) int {
val, ok := brc.blocks.Get(bcid)
if !ok {
// First sighting: store a count of 1 and report no earlier receipts.
brc.blocks.Add(bcid, int(1))
return 0
// Seen before: store the incremented count and report the old one.
brc.blocks.Add(bcid, val.(int)+1)
return val.(int)
// MessageValidator validates signed messages received over pubsub, using the
// message pool to decide whether a message is acceptable.
type MessageValidator struct {
// self is compared against the sending peer in Validate to detect messages
// published by this node.
self peer.ID
// mpool is the pool that network messages are added to and that verifies
// signatures for local messages.
mpool *messagepool.MessagePool
// NewMessageValidator returns a MessageValidator for the local peer self that
// validates against the message pool mp.
func NewMessageValidator(self peer.ID, mp *messagepool.MessagePool) *MessageValidator {
return &MessageValidator{self: self, mpool: mp}
// Validate validates a signed message received over pubsub. A message whose
// pid is this node is handed to validateLocalMessage. Any other message is
// decoded and added to the message pool:
//   - a message that fails to decode is rejected;
//   - a message the pool refuses is ignored or rejected depending on the
//     error returned by mpool.Add;
//   - a message the pool accepts yields ValidationAccept.
//
// Metrics are recorded for receipt, failure, success and validation duration.
//
// NOTE(review): several single-token lines are not visible in this listing:
// the context argument to the multi-line tag.New calls, the bodies of the
// switch cases, and the clause guarding the final ValidationReject. Go cases
// do not fall through implicitly, so confirm how the first four errors reach
// the ValidationIgnore return.
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
if pid == mv.self {
return mv.validateLocalMessage(ctx, msg)
start := time.Now()
defer func() {
// Elapsed time is taken in microseconds and reported divided by 1000.
ms := time.Now().Sub(start).Microseconds()
stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000))
stats.Record(ctx, metrics.MessageReceived.M(1))
m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil {
log.Warnf("failed to decode incoming message: %s", err)
// The error from tag.New is discarded here.
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "decode"))
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return pubsub.ValidationReject
if err := mv.mpool.Add(ctx, m); err != nil {
log.Debugf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
ctx, _ = tag.New(
tag.Upsert(metrics.Local, "false"),
recordFailure(ctx, metrics.MessageValidationFailure, "add")
// Classify the pool error to choose between ignoring and rejecting.
switch {
case xerrors.Is(err, messagepool.ErrSoftValidationFailure):
case xerrors.Is(err, messagepool.ErrRBFTooLowPremium):
case xerrors.Is(err, messagepool.ErrTooManyPendingMessages):
case xerrors.Is(err, messagepool.ErrNonceGap):
case xerrors.Is(err, messagepool.ErrNonceTooLow):
return pubsub.ValidationIgnore
return pubsub.ValidationReject
ctx, _ = tag.New(
tag.Upsert(metrics.MsgValid, "true"),
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
return pubsub.ValidationAccept
// validateLocalMessage performs lightweight checks on a message published by
// this node. The message must decode, be no larger than
// messagepool.MaxMessageSize, have a destination other than address.Undef,
// carry a value less than types.TotalFilecoinInt, and pass
// mv.mpool.VerifyMsgSig. Each failed check is logged, recorded as a
// MessageValidationFailure with a check-specific failure type, and results in
// ValidationIgnore (not ValidationReject). A message that passes every check
// is recorded as a success and accepted.
//
// NOTE(review): the context argument to the multi-line tag.New calls is not
// visible in this listing — confirm against the full file.
func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsub.Message) pubsub.ValidationResult {
ctx, _ = tag.New(
tag.Upsert(metrics.Local, "true"),
start := time.Now()
defer func() {
// Elapsed time is taken in microseconds and reported divided by 1000.
ms := time.Now().Sub(start).Microseconds()
stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000))
// do some lightweight validation
stats.Record(ctx, metrics.MessagePublished.M(1))
m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil {
log.Warnf("failed to decode local message: %s", err)
recordFailure(ctx, metrics.MessageValidationFailure, "decode")
return pubsub.ValidationIgnore
if m.Size() > messagepool.MaxMessageSize {
log.Warnf("local message is too large! (%dB)", m.Size())
recordFailure(ctx, metrics.MessageValidationFailure, "oversize")
return pubsub.ValidationIgnore
if m.Message.To == address.Undef {
log.Warn("local message has invalid destination address")
recordFailure(ctx, metrics.MessageValidationFailure, "undef-addr")
return pubsub.ValidationIgnore
// The value must be strictly less than TotalFilecoinInt.
if !m.Message.Value.LessThan(types.TotalFilecoinInt) {
log.Warnf("local messages has too high value: %s", m.Message.Value)
recordFailure(ctx, metrics.MessageValidationFailure, "value-too-high")
return pubsub.ValidationIgnore
if err := mv.mpool.VerifyMsgSig(m); err != nil {
log.Warnf("signature verification failed for local message: %s", err)
recordFailure(ctx, metrics.MessageValidationFailure, "verify-sig")
return pubsub.ValidationIgnore
ctx, _ = tag.New(
tag.Upsert(metrics.MsgValid, "true"),
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
return pubsub.ValidationAccept
// HandleIncomingMessages drains the message subscription in a loop. The
// received messages are discarded because all processing happens in the
// validator; errors from the subscription are logged as warnings.
//
// NOTE(review): mpool is not referenced in the visible body, and the
// statement that leaves the loop after the "quitting" log is not visible in
// this listing — confirm against the full file.
func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {
for {
_, err := msub.Next(ctx)
if err != nil {
log.Warn("error from message subscription: ", err)
// A non-nil ctx.Err() is reported as the loop quitting.
if ctx.Err() != nil {
log.Warn("quitting HandleIncomingMessages loop")
// Do nothing... everything happens in validate
// recordFailure records one occurrence of metric, with failureType set under
// the metrics.FailureType tag key. The error from tag.New is discarded.
//
// NOTE(review): the context argument to tag.New is not visible in this
// listing — confirm against the full file.
func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType string) {
ctx, _ = tag.New(
tag.Upsert(metrics.FailureType, failureType),
stats.Record(ctx, metric.M(1))
// IndexerMessageValidator validates indexer pubsub messages that this node
// relays.
type IndexerMessageValidator struct {
// self is this node's peer ID; messages whose pid equals it are rejected by
// Validate.
self peer.ID
// NewIndexerMessageValidator returns an IndexerMessageValidator for the local
// peer self.
func NewIndexerMessageValidator(self peer.ID) *IndexerMessageValidator {
return &IndexerMessageValidator{self: self}
// Validate decides whether an indexer pubsub message may be relayed. A message
// whose pid is this node's own peer ID is rejected and counted as an
// IndexerMessageValidationFailure; every other message is accepted and counted
// as an IndexerMessageValidationSuccess. The message contents are not
// inspected.
func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
// This chain-node should not be publishing its own messages. These are
// relayed from miner-nodes or index publishers. If a node appears to be
// local, reject it.
if pid == v.self {
log.Warnf("refusing to relay indexer message from self")
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
return pubsub.ValidationReject
stats.Record(ctx, metrics.IndexerMessageValidationSuccess.M(1))
return pubsub.ValidationAccept