lotus/chain/sub/incoming.go

440 lines
12 KiB
Go
Raw Normal View History

2019-07-08 14:07:09 +00:00
package sub
import (
"bytes"
2019-07-08 14:07:09 +00:00
"context"
2020-05-12 19:26:25 +00:00
"fmt"
2020-05-12 18:13:30 +00:00
"sync"
2019-12-07 10:49:05 +00:00
"time"
"golang.org/x/xerrors"
address "github.com/filecoin-project/go-address"
miner "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/util/adt"
2020-02-17 05:51:18 +00:00
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-cid"
2020-05-12 19:26:25 +00:00
dstore "github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/peer"
2019-07-08 14:07:09 +00:00
pubsub "github.com/libp2p/go-libp2p-pubsub"
2020-05-12 19:26:25 +00:00
cbg "github.com/whyrusleeping/cbor-gen"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
2019-12-01 23:11:43 +00:00
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/bufbstore"
2020-05-12 19:26:25 +00:00
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/metrics"
2019-07-08 14:07:09 +00:00
)
// log is the package-level logger, created under the subsystem name "sub".
var log = logging.Logger("sub")
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, cmgr connmgr.ConnManager) {
2019-07-08 14:07:09 +00:00
for {
msg, err := bsub.Next(ctx)
if err != nil {
2019-09-17 14:23:08 +00:00
if ctx.Err() != nil {
log.Warn("quitting HandleIncomingBlocks loop")
return
}
log.Error("error from block subscription: ", err)
2019-07-08 14:07:09 +00:00
continue
}
2020-02-17 05:51:18 +00:00
blk, ok := msg.ValidatorData.(*types.BlockMsg)
if !ok {
log.Warnf("pubsub block validator passed on wrong type: %#v", msg.ValidatorData)
2020-05-05 13:35:03 +00:00
return
}
src := msg.GetFrom()
2019-07-08 14:07:09 +00:00
go func() {
2020-07-10 14:43:14 +00:00
start := build.Clock.Now()
2019-08-27 18:45:21 +00:00
log.Debug("about to fetch messages for block from pubsub")
bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
2019-07-08 14:07:09 +00:00
if err != nil {
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; source: %s", err, src)
2019-07-08 14:07:09 +00:00
return
}
smsgs, err := s.Bsync.FetchSignedMessagesByCids(context.TODO(), blk.SecpkMessages)
if err != nil {
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; source: %s", err, src)
return
}
2020-07-10 14:43:14 +00:00
took := build.Clock.Since(start)
2019-12-11 20:41:24 +00:00
log.Infow("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
2020-07-10 14:43:14 +00:00
if delay := build.Clock.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
2019-12-07 10:49:05 +00:00
log.Warnf("Received block with large delay %d from miner %s", delay, blk.Header.Miner)
}
if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
Header: blk.Header,
BlsMessages: bmsgs,
SecpkMessages: smsgs,
}) {
cmgr.TagPeer(msg.ReceivedFrom, "blkprop", 5)
}
2019-07-08 14:07:09 +00:00
}()
}
}
2020-02-17 05:51:18 +00:00
type BlockValidator struct {
peers *lru.TwoQueueCache
killThresh int
recvBlocks *blockReceiptCache
blacklist func(peer.ID)
// necessary for block validation
chain *store.ChainStore
stmgr *stmgr.StateManager
2020-05-12 18:13:30 +00:00
mx sync.Mutex
keycache map[string]address.Address
2020-02-17 05:51:18 +00:00
}
func NewBlockValidator(chain *store.ChainStore, stmgr *stmgr.StateManager, blacklist func(peer.ID)) *BlockValidator {
2020-02-17 05:51:18 +00:00
p, _ := lru.New2Q(4096)
return &BlockValidator{
peers: p,
killThresh: 10,
2020-02-17 05:51:18 +00:00
blacklist: blacklist,
recvBlocks: newBlockReceiptCache(),
chain: chain,
stmgr: stmgr,
2020-05-12 19:37:01 +00:00
keycache: make(map[string]address.Address),
2020-02-17 05:51:18 +00:00
}
}
func (bv *BlockValidator) flagPeer(p peer.ID) {
v, ok := bv.peers.Get(p)
if !ok {
bv.peers.Add(p, int(1))
return
}
val := v.(int)
if val >= bv.killThresh {
log.Warnf("blacklisting peer %s", p)
2020-02-17 05:51:18 +00:00
bv.blacklist(p)
return
2020-02-17 05:51:18 +00:00
}
bv.peers.Add(p, v.(int)+1)
}
2020-05-05 13:35:03 +00:00
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
2020-05-22 14:21:37 +00:00
// track validation time
2020-07-10 14:43:14 +00:00
begin := build.Clock.Now()
2020-05-22 14:21:37 +00:00
defer func() {
2020-07-10 14:43:14 +00:00
log.Debugf("block validation time: %s", build.Clock.Since(begin))
2020-05-22 14:21:37 +00:00
}()
stats.Record(ctx, metrics.BlockReceived.M(1))
2020-05-12 19:26:25 +00:00
recordFailure := func(what string) {
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, what))
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
bv.flagPeer(pid)
}
// make sure the block can be decoded
2020-02-17 05:51:18 +00:00
blk, err := types.DecodeBlockMsg(msg.GetData())
if err != nil {
log.Error("got invalid block over pubsub: ", err)
2020-05-12 19:26:25 +00:00
recordFailure("invalid")
2020-05-05 13:35:03 +00:00
return pubsub.ValidationReject
2020-02-17 05:51:18 +00:00
}
// check the message limit constraints
2020-02-17 05:51:18 +00:00
if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit {
log.Warnf("received block with too many messages over pubsub")
2020-05-12 19:26:25 +00:00
recordFailure("too_many_messages")
return pubsub.ValidationReject
}
// make sure we have a signature
if blk.Header.BlockSig == nil {
log.Warnf("received block without a signature over pubsub")
recordFailure("missing_signature")
return pubsub.ValidationReject
}
// validate the block meta: the Message CID in the header must match the included messages
err = bv.validateMsgMeta(ctx, blk)
if err != nil {
log.Warnf("error validating message metadata: %s", err)
recordFailure("invalid_block_meta")
2020-05-05 13:35:03 +00:00
return pubsub.ValidationReject
2020-02-17 05:51:18 +00:00
}
// we want to ensure that it is a block from a known miner; we reject blocks from unknown miners
// to prevent spam attacks.
// the logic works as follows: we lookup the miner in the chain for its key.
// if we can find it then it's a known miner and we can validate the signature.
// if we can't find it, we check whether we are (near) synced in the chain.
// if we are not synced we cannot validate the block and we must ignore it.
2020-05-12 19:26:25 +00:00
// if we are synced and the miner is unknown, then the block is rejcected.
key, err := bv.checkPowerAndGetWorkerKey(ctx, blk.Header)
2020-05-12 19:26:25 +00:00
if err != nil {
if bv.isChainNearSynced() {
log.Warnf("received block from unknown miner or miner that doesn't meet min power over pubsub; rejecting message")
2020-05-12 19:26:25 +00:00
recordFailure("unknown_miner")
return pubsub.ValidationReject
} else {
log.Warnf("cannot validate block message; unknown miner or miner that doesn't meet min power in unsynced chain")
2020-05-12 19:26:25 +00:00
return pubsub.ValidationIgnore
}
}
err = sigs.CheckBlockSignature(ctx, blk.Header, key)
2020-05-12 19:26:25 +00:00
if err != nil {
log.Errorf("block signature verification failed: %s", err)
recordFailure("signature_verification_failed")
return pubsub.ValidationReject
}
if blk.Header.ElectionProof.WinCount < 1 {
log.Errorf("block is not claiming to be winning")
recordFailure("not_winning")
return pubsub.ValidationReject
}
// it's a good block! make sure we've only seen it once
2020-02-17 05:51:18 +00:00
if bv.recvBlocks.add(blk.Header.Cid()) > 0 {
// TODO: once these changes propagate to the network, we can consider
// dropping peers who send us the same block multiple times
2020-05-05 13:35:03 +00:00
return pubsub.ValidationIgnore
2020-02-17 05:51:18 +00:00
}
// all good, accept the block
2020-02-17 05:51:18 +00:00
msg.ValidatorData = blk
stats.Record(ctx, metrics.BlockValidationSuccess.M(1))
2020-05-05 13:35:03 +00:00
return pubsub.ValidationAccept
2020-02-17 05:51:18 +00:00
}
2020-05-12 19:26:25 +00:00
func (bv *BlockValidator) isChainNearSynced() bool {
ts := bv.chain.GetHeaviestTipSet()
timestamp := ts.MinTimestamp()
2020-07-10 14:43:14 +00:00
now := build.Clock.Now().UnixNano()
2020-05-12 19:26:25 +00:00
cutoff := uint64(now) - uint64(6*time.Hour)
return timestamp > cutoff
}
func (bv *BlockValidator) validateMsgMeta(ctx context.Context, msg *types.BlockMsg) error {
// TODO there has to be a simpler way to do this without the blockstore dance
store := adt.WrapStore(ctx, cbor.NewCborStore(bstore.NewBlockstore(dstore.NewMapDatastore())))
bmArr := adt.MakeEmptyArray(store)
smArr := adt.MakeEmptyArray(store)
for i, m := range msg.BlsMessages {
2020-05-12 19:26:25 +00:00
c := cbg.CborCid(m)
if err := bmArr.Set(uint64(i), &c); err != nil {
return err
}
2020-05-12 19:26:25 +00:00
}
for i, m := range msg.SecpkMessages {
2020-05-12 19:26:25 +00:00
c := cbg.CborCid(m)
if err := smArr.Set(uint64(i), &c); err != nil {
return err
}
2020-05-12 19:26:25 +00:00
}
bmroot, err := bmArr.Root()
2020-05-12 19:26:25 +00:00
if err != nil {
return err
}
smroot, err := smArr.Root()
2020-05-12 19:26:25 +00:00
if err != nil {
return err
}
mrcid, err := store.Put(store.Context(), &types.MsgMeta{
2020-05-12 19:26:25 +00:00
BlsMessages: bmroot,
SecpkMessages: smroot,
})
if err != nil {
return err
}
if msg.Header.Messages != mrcid {
return fmt.Errorf("messages didn't match root cid in header")
}
return nil
}
func (bv *BlockValidator) checkPowerAndGetWorkerKey(ctx context.Context, bh *types.BlockHeader) (address.Address, error) {
addr := bh.Miner
2020-05-12 18:13:30 +00:00
bv.mx.Lock()
key, ok := bv.keycache[addr.String()]
bv.mx.Unlock()
if !ok {
// TODO I have a feeling all this can be simplified by cleverer DI to use the API
ts := bv.chain.GetHeaviestTipSet()
st, _, err := bv.stmgr.TipSetState(ctx, ts)
if err != nil {
return address.Undef, err
}
buf := bufbstore.NewBufferedBstore(bv.chain.Blockstore())
cst := cbor.NewCborStore(buf)
state, err := state.LoadStateTree(cst, st)
if err != nil {
return address.Undef, err
}
act, err := state.GetActor(addr)
if err != nil {
return address.Undef, err
}
blk, err := bv.chain.Blockstore().Get(act.Head)
if err != nil {
return address.Undef, err
}
aso := blk.RawData()
var mst miner.State
err = mst.UnmarshalCBOR(bytes.NewReader(aso))
if err != nil {
return address.Undef, err
}
info, err := mst.GetInfo(adt.WrapStore(ctx, cst))
if err != nil {
return address.Undef, err
}
worker := info.Worker
key, err = bv.stmgr.ResolveToKeyAddress(ctx, worker, ts)
if err != nil {
return address.Undef, err
}
bv.mx.Lock()
bv.keycache[addr.String()] = key
bv.mx.Unlock()
}
// we check that the miner met the minimum power at the lookback tipset
baseTs, err := bv.stmgr.ChainStore().LoadTipSet(types.NewTipSetKey(bh.Parents...))
if err != nil {
log.Warnf("failed to load parent tipset of incoming block")
return address.Undef, err
}
lbts, err := stmgr.GetLookbackTipSetForRound(ctx, bv.stmgr, baseTs, bh.Height)
2020-07-01 11:47:40 +00:00
if err != nil {
log.Warnf("failed to load lookback tipset for incoming block")
2020-07-01 11:47:40 +00:00
return address.Undef, err
}
hmp, err := stmgr.MinerHasMinPower(ctx, bv.stmgr, bh.Miner, lbts)
if err != nil {
log.Warnf("failed to determine if incoming block's miner has minimum power")
return address.Undef, err
}
if !hmp {
log.Warnf("incoming block's miner does not have minimum power")
return address.Undef, xerrors.New("incoming block's miner does not have minimum power")
}
2020-05-12 18:13:30 +00:00
return key, nil
}
2020-02-17 05:51:18 +00:00
// blockReceiptCache counts how many times each block CID has been received.
type blockReceiptCache struct {
	// blocks maps a block CID to its receipt count (stored as int).
	blocks *lru.TwoQueueCache
}
// newBlockReceiptCache returns an empty receipt cache with room for 8192
// block CIDs. The constructor error of the underlying cache is discarded.
func newBlockReceiptCache() *blockReceiptCache {
	cache, _ := lru.New2Q(8192)
	return &blockReceiptCache{blocks: cache}
}
// add registers one more receipt of block bcid and returns how many times the
// block had been received before this call (0 for a block not in the cache).
func (brc *blockReceiptCache) add(bcid cid.Cid) int {
	prev, seen := brc.blocks.Get(bcid)
	if seen {
		count := prev.(int)
		brc.blocks.Add(bcid, count+1)
		return count
	}

	// first sighting: store a count of one, report zero earlier receipts
	brc.blocks.Add(bcid, int(1))
	return 0
}
2020-02-28 01:39:07 +00:00
// MessageValidator validates signed messages received over pubsub by adding
// them to the message pool.
type MessageValidator struct {
	// mpool is the pool incoming messages are added to.
	mpool *messagepool.MessagePool
}
// NewMessageValidator returns a MessageValidator that feeds incoming messages
// into the message pool mp.
func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator {
	return &MessageValidator{
		mpool: mp,
	}
}
2020-05-05 13:35:03 +00:00
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
stats.Record(ctx, metrics.MessageReceived.M(1))
2020-02-28 01:39:07 +00:00
m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil {
log.Warnf("failed to decode incoming message: %s", err)
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "decode"))
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
2020-05-05 13:35:03 +00:00
return pubsub.ValidationReject
2020-02-28 01:39:07 +00:00
}
if err := mv.mpool.Add(m); err != nil {
log.Debugf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
ctx, _ = tag.New(
ctx,
tag.Insert(metrics.FailureType, "add"),
)
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
switch {
case xerrors.Is(err, messagepool.ErrBroadcastAnyway):
return pubsub.ValidationIgnore
default:
return pubsub.ValidationReject
2020-05-05 13:35:03 +00:00
}
2020-02-28 01:39:07 +00:00
}
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
2020-05-05 13:35:03 +00:00
return pubsub.ValidationAccept
2020-02-28 01:39:07 +00:00
}
2019-12-01 23:11:43 +00:00
func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {
2019-07-08 14:07:09 +00:00
for {
2020-02-28 01:39:07 +00:00
_, err := msub.Next(ctx)
2019-07-08 14:07:09 +00:00
if err != nil {
2019-09-17 14:23:08 +00:00
log.Warn("error from message subscription: ", err)
if ctx.Err() != nil {
log.Warn("quitting HandleIncomingMessages loop")
return
}
2019-07-08 14:07:09 +00:00
continue
}
2020-02-28 01:39:07 +00:00
// Do nothing... everything happens in validate
2019-07-08 14:07:09 +00:00
}
}