// lotus/chain/sub/incoming.go

package sub
import (
	"bytes"
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/xerrors"

	address "github.com/filecoin-project/go-address"
	miner "github.com/filecoin-project/specs-actors/actors/builtin/miner"
	"github.com/filecoin-project/specs-actors/actors/util/adt"
	lru "github.com/hashicorp/golang-lru"
	blocks "github.com/ipfs/go-block-format"
	bserv "github.com/ipfs/go-blockservice"
	"github.com/ipfs/go-cid"
	cbor "github.com/ipfs/go-ipld-cbor"
	logging "github.com/ipfs/go-log/v2"
	connmgr "github.com/libp2p/go-libp2p-core/connmgr"
	"github.com/libp2p/go-libp2p-core/peer"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	cbg "github.com/whyrusleeping/cbor-gen"
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"

	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/chain"
	"github.com/filecoin-project/lotus/chain/messagepool"
	"github.com/filecoin-project/lotus/chain/state"
	"github.com/filecoin-project/lotus/chain/stmgr"
	"github.com/filecoin-project/lotus/chain/store"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/lib/blockstore"
	"github.com/filecoin-project/lotus/lib/bufbstore"
	"github.com/filecoin-project/lotus/lib/sigs"
	"github.com/filecoin-project/lotus/metrics"
)
var log = logging.Logger("sub")
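
// HandleIncomingBlocks reads BlockMsgs from the given pubsub subscription
// (each already vetted by BlockValidator), fetches the referenced BLS and
// secp messages from the block service, and hands the assembled FullBlock to
// the syncer. Peers that propagate new blocks to us are tagged with the
// connection manager so they are less likely to be pruned.
//
// Illustrative wiring (the real setup lives in the node DI modules; variable
// names here are placeholders):
//
//	bsub, err := ps.Subscribe(build.BlocksTopic(netName))
//	if err != nil {
//		return err
//	}
//	go sub.HandleIncomingBlocks(ctx, bsub, syncer, blockService, connManager)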
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bserv bserv.BlockService, cmgr connmgr.ConnManager) {
for {
msg, err := bsub.Next(ctx)
if err != nil {
if ctx.Err() != nil {
log.Warn("quitting HandleIncomingBlocks loop")
return
}
log.Error("error from block subscription: ", err)
continue
}
blk, ok := msg.ValidatorData.(*types.BlockMsg)
if !ok {
log.Warnf("pubsub block validator passed on wrong type: %#v", msg.ValidatorData)
return
}
src := msg.GetFrom()
go func() {
start := build.Clock.Now()
log.Debug("about to fetch messages for block from pubsub")
bmsgs, err := FetchMessagesByCids(context.TODO(), bserv, blk.BlsMessages)
if err != nil {
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; source: %s", err, src)
return
}
smsgs, err := FetchSignedMessagesByCids(context.TODO(), bserv, blk.SecpkMessages)
if err != nil {
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; source: %s", err, src)
return
}
took := build.Clock.Since(start)
log.Infow("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
if delay := build.Clock.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
log.Warnf("Received block with large delay %d from miner %s", delay, blk.Header.Miner)
}
if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
Header: blk.Header,
BlsMessages: bmsgs,
SecpkMessages: smsgs,
}) {
cmgr.TagPeer(msg.ReceivedFrom, "blkprop", 5)
}
}()
}
}
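
// FetchMessagesByCids fetches the BLS messages with the given CIDs from the
// block service and decodes them; the returned slice preserves the order of
// `cids`.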
func FetchMessagesByCids(
ctx context.Context,
bserv bserv.BlockService,
cids []cid.Cid,
) ([]*types.Message, error) {
out := make([]*types.Message, len(cids))
err := fetchCids(ctx, bserv, cids, func(i int, b blocks.Block) error {
msg, err := types.DecodeMessage(b.RawData())
if err != nil {
return err
}
// FIXME: `fetchCids` already tracks each CID's index; passing the index
// through the callback just to detect duplicates duplicates work.
if out[i] != nil {
return fmt.Errorf("received duplicate message")
}
out[i] = msg
return nil
})
if err != nil {
return nil, err
}
return out, nil
}
// FIXME: Duplicate of above.
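// FetchSignedMessagesByCids is the secp (signed message) analogue of
// FetchMessagesByCids.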
func FetchSignedMessagesByCids(
ctx context.Context,
bserv bserv.BlockService,
cids []cid.Cid,
) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, len(cids))
err := fetchCids(ctx, bserv, cids, func(i int, b blocks.Block) error {
smsg, err := types.DecodeSignedMessage(b.RawData())
if err != nil {
return err
}
if out[i] != nil {
return fmt.Errorf("received duplicate message")
}
out[i] = smsg
return nil
})
if err != nil {
return nil, err
}
return out, nil
}
// fetchCids fetches `cids` from the block service and applies `cb` to each
// block, passing the index of the block's CID within `cids`. It is used by
// the fetch-message functions above. We verify that every requested block is
// received exactly once and that we receive no blocks we did not request.
func fetchCids(
ctx context.Context,
bserv bserv.BlockService,
cids []cid.Cid,
cb func(int, blocks.Block) error,
) error {
fetchedBlocks := bserv.GetBlocks(ctx, cids)
cidIndex := make(map[cid.Cid]int)
for i, c := range cids {
cidIndex[c] = i
}
for i := 0; i < len(cids); i++ {
select {
case block, ok := <-fetchedBlocks:
if !ok {
// Closed channel, no more blocks fetched, check if we have all
// of the CIDs requested.
// FIXME: Review this check. We don't call the callback on the
// last index?
if i == len(cids)-1 {
break
}
return fmt.Errorf("failed to fetch all messages")
}
ix, ok := cidIndex[block.Cid()]
if !ok {
return fmt.Errorf("received message we didnt ask for")
}
if err := cb(ix, block); err != nil {
return err
}
}
}
return nil
}
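
// BlockValidator is a gossipsub validator for the blocks topic. It combines
// stateless checks (decoding, message count, signature presence) with
// stateful ones (miner power and worker key lookup, signature verification),
// and flags peers that repeatedly send invalid blocks so they can be
// blacklisted.
//
// A hypothetical registration sketch (the actual lotus wiring differs):
//
//	v := sub.NewBlockValidator(host.ID(), chainStore, stateManager, blacklist)
//	if err := ps.RegisterTopicValidator(build.BlocksTopic(netName), v.Validate); err != nil {
//		return err
//	}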
type BlockValidator struct {
self peer.ID
peers *lru.TwoQueueCache
killThresh int
recvBlocks *blockReceiptCache
blacklist func(peer.ID)
// necessary for block validation
chain *store.ChainStore
stmgr *stmgr.StateManager
mx sync.Mutex
keycache map[string]address.Address
}
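
// NewBlockValidator creates a BlockValidator that, via the supplied blacklist
// callback, bans peers after killThresh invalid blocks.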
func NewBlockValidator(self peer.ID, chain *store.ChainStore, stmgr *stmgr.StateManager, blacklist func(peer.ID)) *BlockValidator {
p, _ := lru.New2Q(4096)
return &BlockValidator{
self: self,
peers: p,
killThresh: 10,
blacklist: blacklist,
recvBlocks: newBlockReceiptCache(),
chain: chain,
stmgr: stmgr,
keycache: make(map[string]address.Address),
}
}
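
// flagPeer bumps the peer's invalid-block count and blacklists the peer once
// the count reaches killThresh.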
func (bv *BlockValidator) flagPeer(p peer.ID) {
v, ok := bv.peers.Get(p)
if !ok {
bv.peers.Add(p, int(1))
return
}
val := v.(int)
if val >= bv.killThresh {
log.Warnf("blacklisting peer %s", p)
bv.blacklist(p)
return
}
bv.peers.Add(p, val+1)
}
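
// Validate is the pubsub validation pipeline for incoming blocks: decode and
// sanity-check the block, validate the message metadata root, check that the
// miner is known and has minimum power, verify the block signature and win
// count, then deduplicate. Blocks we published ourselves take the lighter
// validateLocalBlock path.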
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
if pid == bv.self {
return bv.validateLocalBlock(ctx, msg)
}
// track validation time
begin := build.Clock.Now()
defer func() {
log.Debugf("block validation time: %s", build.Clock.Since(begin))
}()
stats.Record(ctx, metrics.BlockReceived.M(1))
recordFailure := func(what string) {
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, what))
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
bv.flagPeer(pid)
}
blk, what, err := bv.decodeAndCheckBlock(msg)
if err != nil {
log.Error("got invalid block over pubsub: ", err)
recordFailure(what)
return pubsub.ValidationReject
}
// validate the block meta: the Message CID in the header must match the included messages
err = bv.validateMsgMeta(ctx, blk)
if err != nil {
log.Warnf("error validating message metadata: %s", err)
recordFailure("invalid_block_meta")
return pubsub.ValidationReject
}
// We want to ensure that it is a block from a known miner; we reject blocks
// from unknown miners to prevent spam attacks.
// The logic works as follows: we look up the miner's worker key in the chain.
// If we can find it, it's a known miner and we can validate the signature.
// If we can't find it, we check whether our chain is (near) synced.
// If we are not synced, we cannot validate the block and must ignore it.
// If we are synced and the miner is unknown, the block is rejected.
key, err := bv.checkPowerAndGetWorkerKey(ctx, blk.Header)
if err != nil {
if bv.isChainNearSynced() {
log.Warnf("received block from unknown miner or miner that doesn't meet min power over pubsub; rejecting message")
recordFailure("unknown_miner")
return pubsub.ValidationReject
}
log.Warnf("cannot validate block message; unknown miner or miner that doesn't meet min power in unsynced chain")
return pubsub.ValidationIgnore
}
err = sigs.CheckBlockSignature(ctx, blk.Header, key)
if err != nil {
log.Errorf("block signature verification failed: %s", err)
recordFailure("signature_verification_failed")
return pubsub.ValidationReject
}
if blk.Header.ElectionProof.WinCount < 1 {
log.Errorf("block is not claiming to be winning")
recordFailure("not_winning")
return pubsub.ValidationReject
}
// it's a good block! make sure we've only seen it once
if bv.recvBlocks.add(blk.Header.Cid()) > 0 {
// TODO: once these changes propagate to the network, we can consider
// dropping peers who send us the same block multiple times
return pubsub.ValidationIgnore
}
// all good, accept the block
msg.ValidatorData = blk
stats.Record(ctx, metrics.BlockValidationSuccess.M(1))
return pubsub.ValidationAccept
}
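
// validateLocalBlock applies only the cheap stateless checks to blocks we
// published ourselves; failures result in Ignore rather than Reject since
// they indicate a local problem, not a misbehaving peer.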
func (bv *BlockValidator) validateLocalBlock(ctx context.Context, msg *pubsub.Message) pubsub.ValidationResult {
stats.Record(ctx, metrics.BlockPublished.M(1))
blk, what, err := bv.decodeAndCheckBlock(msg)
if err != nil {
log.Errorf("got invalid local block: %s", err)
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, what))
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
return pubsub.ValidationIgnore
}
if count := bv.recvBlocks.add(blk.Header.Cid()); count > 0 {
log.Warnf("local block has been seen %d times; ignoring", count)
return pubsub.ValidationIgnore
}
msg.ValidatorData = blk
stats.Record(ctx, metrics.BlockValidationSuccess.M(1))
return pubsub.ValidationAccept
}
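
// decodeAndCheckBlock decodes a pubsub message into a BlockMsg and applies
// stateless sanity checks, returning a failure-type label for metrics when a
// check fails.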
func (bv *BlockValidator) decodeAndCheckBlock(msg *pubsub.Message) (*types.BlockMsg, string, error) {
blk, err := types.DecodeBlockMsg(msg.GetData())
if err != nil {
return nil, "invalid", xerrors.Errorf("error decoding block: %w", err)
}
if count := len(blk.BlsMessages) + len(blk.SecpkMessages); count > build.BlockMessageLimit {
return nil, "too_many_messages", fmt.Errorf("block contains too many messages (%d)", count)
}
// make sure we have a signature
if blk.Header.BlockSig == nil {
return nil, "missing_signature", fmt.Errorf("block without a signature")
}
return blk, "", nil
}
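
// isChainNearSynced reports whether the heaviest tipset we know of is less
// than six hours old. Only when near synced can a failed miner lookup be
// treated as evidence of an unknown miner (and hence grounds for rejection).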
func (bv *BlockValidator) isChainNearSynced() bool {
	ts := bv.chain.GetHeaviestTipSet()
	// Block header timestamps are Unix seconds; convert before comparing
	// against the wall clock.
	timestamp := time.Unix(int64(ts.MinTimestamp()), 0)
	return build.Clock.Since(timestamp) < 6*time.Hour
}
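
// validateMsgMeta recomputes the MsgMeta CID from the block's message CIDs
// (using a throwaway in-memory blockstore) and checks that it matches the
// Messages root committed to in the header.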
func (bv *BlockValidator) validateMsgMeta(ctx context.Context, msg *types.BlockMsg) error {
// TODO there has to be a simpler way to do this without the blockstore dance
store := adt.WrapStore(ctx, cbor.NewCborStore(blockstore.NewTemporary()))
bmArr := adt.MakeEmptyArray(store)
smArr := adt.MakeEmptyArray(store)
for i, m := range msg.BlsMessages {
c := cbg.CborCid(m)
if err := bmArr.Set(uint64(i), &c); err != nil {
return err
}
}
for i, m := range msg.SecpkMessages {
c := cbg.CborCid(m)
if err := smArr.Set(uint64(i), &c); err != nil {
return err
}
}
bmroot, err := bmArr.Root()
if err != nil {
return err
}
smroot, err := smArr.Root()
if err != nil {
return err
}
mrcid, err := store.Put(store.Context(), &types.MsgMeta{
BlsMessages: bmroot,
SecpkMessages: smroot,
})
if err != nil {
return err
}
if msg.Header.Messages != mrcid {
return fmt.Errorf("messages didn't match root cid in header")
}
return nil
}
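
// checkPowerAndGetWorkerKey resolves the block miner's worker key, caching it
// per miner address (note: the cache is never invalidated, so a worker key
// change is not picked up), and verifies that the miner meets the minimum
// power requirement at the lookback tipset for the block's round.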
func (bv *BlockValidator) checkPowerAndGetWorkerKey(ctx context.Context, bh *types.BlockHeader) (address.Address, error) {
addr := bh.Miner
bv.mx.Lock()
key, ok := bv.keycache[addr.String()]
bv.mx.Unlock()
if !ok {
// TODO I have a feeling all this can be simplified by cleverer DI to use the API
ts := bv.chain.GetHeaviestTipSet()
st, _, err := bv.stmgr.TipSetState(ctx, ts)
if err != nil {
return address.Undef, err
}
buf := bufbstore.NewBufferedBstore(bv.chain.Blockstore())
cst := cbor.NewCborStore(buf)
state, err := state.LoadStateTree(cst, st)
if err != nil {
return address.Undef, err
}
act, err := state.GetActor(addr)
if err != nil {
return address.Undef, err
}
blk, err := bv.chain.Blockstore().Get(act.Head)
if err != nil {
return address.Undef, err
}
aso := blk.RawData()
var mst miner.State
err = mst.UnmarshalCBOR(bytes.NewReader(aso))
if err != nil {
return address.Undef, err
}
info, err := mst.GetInfo(adt.WrapStore(ctx, cst))
if err != nil {
return address.Undef, err
}
worker := info.Worker
key, err = bv.stmgr.ResolveToKeyAddress(ctx, worker, ts)
if err != nil {
return address.Undef, err
}
bv.mx.Lock()
bv.keycache[addr.String()] = key
bv.mx.Unlock()
}
// we check that the miner met the minimum power at the lookback tipset
baseTs := bv.chain.GetHeaviestTipSet()
lbts, err := stmgr.GetLookbackTipSetForRound(ctx, bv.stmgr, baseTs, bh.Height)
if err != nil {
log.Warnf("failed to load lookback tipset for incoming block")
return address.Undef, err
}
hmp, err := stmgr.MinerHasMinPower(ctx, bv.stmgr, bh.Miner, lbts)
if err != nil {
log.Warnf("failed to determine if incoming block's miner has minimum power")
return address.Undef, err
}
if !hmp {
log.Warnf("incoming block's miner does not have minimum power")
return address.Undef, xerrors.New("incoming block's miner does not have minimum power")
}
return key, nil
}
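
// blockReceiptCache counts how many times each block CID has been seen so
// that duplicate receipts can be ignored.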
type blockReceiptCache struct {
blocks *lru.TwoQueueCache
}
func newBlockReceiptCache() *blockReceiptCache {
c, _ := lru.New2Q(8192)
return &blockReceiptCache{
blocks: c,
}
}
func (brc *blockReceiptCache) add(bcid cid.Cid) int {
val, ok := brc.blocks.Get(bcid)
if !ok {
brc.blocks.Add(bcid, int(1))
return 0
}
brc.blocks.Add(bcid, val.(int)+1)
return val.(int)
}
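
// MessageValidator is a gossipsub validator for the messages topic: a message
// is valid precisely when the message pool accepts it.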
type MessageValidator struct {
self peer.ID
mpool *messagepool.MessagePool
}
func NewMessageValidator(self peer.ID, mp *messagepool.MessagePool) *MessageValidator {
return &MessageValidator{self: self, mpool: mp}
}
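
// Validate decodes an incoming signed message and attempts to add it to the
// message pool. Failures that don't indicate misbehavior (nonce too low, RBF
// premium too low, or "broadcast anyway" errors) are ignored without
// penalizing the sender; everything else is rejected.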
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
if pid == mv.self {
return mv.validateLocalMessage(ctx, msg)
}
stats.Record(ctx, metrics.MessageReceived.M(1))
m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil {
log.Warnf("failed to decode incoming message: %s", err)
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "decode"))
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return pubsub.ValidationReject
}
if err := mv.mpool.Add(m); err != nil {
log.Debugf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
ctx, _ = tag.New(
ctx,
tag.Insert(metrics.FailureType, "add"),
)
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
switch {
case xerrors.Is(err, messagepool.ErrBroadcastAnyway):
fallthrough
case xerrors.Is(err, messagepool.ErrRBFTooLowPremium):
fallthrough
case xerrors.Is(err, messagepool.ErrNonceTooLow):
return pubsub.ValidationIgnore
default:
return pubsub.ValidationReject
}
}
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
return pubsub.ValidationAccept
}
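
// validateLocalMessage applies lightweight checks (size, destination, value,
// signature) to messages we publish ourselves; unlike remote messages, they
// are not added to the message pool here.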
func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsub.Message) pubsub.ValidationResult {
// do some lightweight validation
stats.Record(ctx, metrics.MessagePublished.M(1))
m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil {
log.Warnf("failed to decode local message: %s", err)
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return pubsub.ValidationIgnore
}
if m.Size() > 32*1024 {
log.Warnf("local message is too large! (%dB)", m.Size())
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return pubsub.ValidationIgnore
}
if m.Message.To == address.Undef {
log.Warn("local message has invalid destination address")
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return pubsub.ValidationIgnore
}
if !m.Message.Value.LessThan(types.TotalFilecoinInt) {
log.Warnf("local messages has too high value: %s", m.Message.Value)
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return pubsub.ValidationIgnore
}
if err := mv.mpool.VerifyMsgSig(m); err != nil {
log.Warnf("signature verification failed for local message: %s", err)
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return pubsub.ValidationIgnore
}
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
return pubsub.ValidationAccept
}
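
// HandleIncomingMessages drains the message subscription. All of the real
// work (validation and mpool insertion) happens in MessageValidator, so this
// loop only keeps the subscription alive and exits on shutdown.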
func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {
for {
_, err := msub.Next(ctx)
if err != nil {
	if ctx.Err() != nil {
		log.Warn("quitting HandleIncomingMessages loop")
		return
	}
	log.Warn("error from message subscription: ", err)
	continue
}
// Do nothing... everything happens in validate
}
}