Merge pull request #1708 from filecoin-project/feat/block-validator
Improved pubsub block validator
This commit is contained in:
commit
70657e35ba
@ -1,30 +1,45 @@
|
||||
package sub
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
address "github.com/filecoin-project/go-address"
|
||||
amt "github.com/filecoin-project/go-amt-ipld/v2"
|
||||
miner "github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/ipfs/go-cid"
|
||||
dstore "github.com/ipfs/go-datastore"
|
||||
bstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"go.opencensus.io/stats"
|
||||
"go.opencensus.io/tag"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain"
|
||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||
"github.com/filecoin-project/lotus/chain/state"
|
||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/bufbstore"
|
||||
"github.com/filecoin-project/lotus/lib/sigs"
|
||||
"github.com/filecoin-project/lotus/metrics"
|
||||
)
|
||||
|
||||
var log = logging.Logger("sub")
|
||||
|
||||
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, cmgr connmgr.ConnManager, bv *BlockValidator) {
|
||||
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, cmgr connmgr.ConnManager) {
|
||||
for {
|
||||
msg, err := bsub.Next(ctx)
|
||||
if err != nil {
|
||||
@ -52,15 +67,13 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
|
||||
log.Debug("about to fetch messages for block from pubsub")
|
||||
bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
|
||||
if err != nil {
|
||||
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; flagging source %s", err, src)
|
||||
bv.flagPeer(src)
|
||||
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; source: %s", err, src)
|
||||
return
|
||||
}
|
||||
|
||||
smsgs, err := s.Bsync.FetchSignedMessagesByCids(context.TODO(), blk.SecpkMessages)
|
||||
if err != nil {
|
||||
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; flagging source %s", err, src)
|
||||
bv.flagPeer(src)
|
||||
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; source: %s", err, src)
|
||||
return
|
||||
}
|
||||
|
||||
@ -89,15 +102,25 @@ type BlockValidator struct {
|
||||
recvBlocks *blockReceiptCache
|
||||
|
||||
blacklist func(peer.ID)
|
||||
|
||||
// necessary for block validation
|
||||
chain *store.ChainStore
|
||||
stmgr *stmgr.StateManager
|
||||
|
||||
mx sync.Mutex
|
||||
keycache map[string]address.Address
|
||||
}
|
||||
|
||||
func NewBlockValidator(blacklist func(peer.ID)) *BlockValidator {
|
||||
func NewBlockValidator(chain *store.ChainStore, stmgr *stmgr.StateManager, blacklist func(peer.ID)) *BlockValidator {
|
||||
p, _ := lru.New2Q(4096)
|
||||
return &BlockValidator{
|
||||
peers: p,
|
||||
killThresh: 10,
|
||||
blacklist: blacklist,
|
||||
recvBlocks: newBlockReceiptCache(),
|
||||
chain: chain,
|
||||
stmgr: stmgr,
|
||||
keycache: make(map[string]address.Address),
|
||||
}
|
||||
}
|
||||
|
||||
@ -121,34 +144,183 @@ func (bv *BlockValidator) flagPeer(p peer.ID) {
|
||||
|
||||
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
||||
stats.Record(ctx, metrics.BlockReceived.M(1))
|
||||
|
||||
recordFailure := func(what string) {
|
||||
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, what))
|
||||
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
|
||||
bv.flagPeer(pid)
|
||||
}
|
||||
|
||||
// make sure the block can be decoded
|
||||
blk, err := types.DecodeBlockMsg(msg.GetData())
|
||||
if err != nil {
|
||||
log.Error("got invalid block over pubsub: ", err)
|
||||
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "invalid"))
|
||||
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
|
||||
bv.flagPeer(pid)
|
||||
recordFailure("invalid")
|
||||
return pubsub.ValidationReject
|
||||
}
|
||||
|
||||
// check the message limit constraints
|
||||
if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit {
|
||||
log.Warnf("received block with too many messages over pubsub")
|
||||
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "too_many_messages"))
|
||||
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
|
||||
bv.flagPeer(pid)
|
||||
recordFailure("too_many_messages")
|
||||
return pubsub.ValidationReject
|
||||
}
|
||||
|
||||
// make sure we have a signature
|
||||
if blk.Header.BlockSig == nil {
|
||||
log.Warnf("received block without a signature over pubsub")
|
||||
recordFailure("missing_signature")
|
||||
return pubsub.ValidationReject
|
||||
}
|
||||
|
||||
// validate the block meta: the Message CID in the header must match the included messages
|
||||
err = bv.validateMsgMeta(ctx, blk)
|
||||
if err != nil {
|
||||
log.Warnf("error validating message metadata: %s", err)
|
||||
recordFailure("invalid_block_meta")
|
||||
return pubsub.ValidationReject
|
||||
}
|
||||
|
||||
// we want to ensure that it is a block from a known miner; we reject blocks from unknown miners
|
||||
// to prevent spam attacks.
|
||||
// the logic works as follows: we lookup the miner in the chain for its key.
|
||||
// if we can find it then it's a known miner and we can validate the signature.
|
||||
// if we can't find it, we check whether we are (near) synced in the chain.
|
||||
// if we are not synced we cannot validate the block and we must ignore it.
|
||||
// if we are synced and the miner is unknown, then the block is rejcected.
|
||||
key, err := bv.getMinerWorkerKey(ctx, blk)
|
||||
if err != nil {
|
||||
if bv.isChainNearSynced() {
|
||||
log.Warnf("received block message from unknown miner over pubsub; rejecting message")
|
||||
recordFailure("unknown_miner")
|
||||
return pubsub.ValidationReject
|
||||
} else {
|
||||
log.Warnf("cannot validate block message; unknown miner in unsynced chain")
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
}
|
||||
|
||||
err = sigs.CheckBlockSignature(blk.Header, ctx, key)
|
||||
if err != nil {
|
||||
log.Errorf("block signature verification failed: %s", err)
|
||||
recordFailure("signature_verification_failed")
|
||||
return pubsub.ValidationReject
|
||||
}
|
||||
|
||||
// it's a good block! make sure we've only seen it once
|
||||
if bv.recvBlocks.add(blk.Header.Cid()) > 0 {
|
||||
// TODO: once these changes propagate to the network, we can consider
|
||||
// dropping peers who send us the same block multiple times
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
|
||||
// all good, accept the block
|
||||
msg.ValidatorData = blk
|
||||
stats.Record(ctx, metrics.BlockValidationSuccess.M(1))
|
||||
return pubsub.ValidationAccept
|
||||
}
|
||||
|
||||
func (bv *BlockValidator) isChainNearSynced() bool {
|
||||
ts := bv.chain.GetHeaviestTipSet()
|
||||
timestamp := ts.MinTimestamp()
|
||||
now := time.Now().UnixNano()
|
||||
cutoff := uint64(now) - uint64(6*time.Hour)
|
||||
return timestamp > cutoff
|
||||
}
|
||||
|
||||
func (bv *BlockValidator) validateMsgMeta(ctx context.Context, msg *types.BlockMsg) error {
|
||||
var bcids, scids []cbg.CBORMarshaler
|
||||
for _, m := range msg.BlsMessages {
|
||||
c := cbg.CborCid(m)
|
||||
bcids = append(bcids, &c)
|
||||
}
|
||||
|
||||
for _, m := range msg.SecpkMessages {
|
||||
c := cbg.CborCid(m)
|
||||
scids = append(scids, &c)
|
||||
}
|
||||
|
||||
// TODO there has to be a simpler way to do this without the blockstore dance
|
||||
bs := cbor.NewCborStore(bstore.NewBlockstore(dstore.NewMapDatastore()))
|
||||
|
||||
bmroot, err := amt.FromArray(ctx, bs, bcids)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
smroot, err := amt.FromArray(ctx, bs, scids)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mrcid, err := bs.Put(ctx, &types.MsgMeta{
|
||||
BlsMessages: bmroot,
|
||||
SecpkMessages: smroot,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if msg.Header.Messages != mrcid {
|
||||
return fmt.Errorf("messages didn't match root cid in header")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bv *BlockValidator) getMinerWorkerKey(ctx context.Context, msg *types.BlockMsg) (address.Address, error) {
|
||||
addr := msg.Header.Miner
|
||||
|
||||
bv.mx.Lock()
|
||||
key, ok := bv.keycache[addr.String()]
|
||||
bv.mx.Unlock()
|
||||
if ok {
|
||||
return key, nil
|
||||
}
|
||||
|
||||
// TODO I have a feeling all this can be simplified by cleverer DI to use the API
|
||||
ts := bv.chain.GetHeaviestTipSet()
|
||||
st, _, err := bv.stmgr.TipSetState(ctx, ts)
|
||||
if err != nil {
|
||||
return address.Undef, err
|
||||
}
|
||||
buf := bufbstore.NewBufferedBstore(bv.chain.Blockstore())
|
||||
cst := cbor.NewCborStore(buf)
|
||||
state, err := state.LoadStateTree(cst, st)
|
||||
if err != nil {
|
||||
return address.Undef, err
|
||||
}
|
||||
act, err := state.GetActor(addr)
|
||||
if err != nil {
|
||||
return address.Undef, err
|
||||
}
|
||||
|
||||
blk, err := bv.chain.Blockstore().Get(act.Head)
|
||||
if err != nil {
|
||||
return address.Undef, err
|
||||
}
|
||||
aso := blk.RawData()
|
||||
|
||||
var mst miner.State
|
||||
err = mst.UnmarshalCBOR(bytes.NewReader(aso))
|
||||
if err != nil {
|
||||
return address.Undef, err
|
||||
}
|
||||
|
||||
worker := mst.Info.Worker
|
||||
key, err = bv.stmgr.ResolveToKeyAddress(ctx, worker, ts)
|
||||
if err != nil {
|
||||
return address.Undef, err
|
||||
}
|
||||
|
||||
bv.mx.Lock()
|
||||
bv.keycache[addr.String()] = key
|
||||
bv.mx.Unlock()
|
||||
|
||||
return key, nil
|
||||
}
|
||||
|
||||
type blockReceiptCache struct {
|
||||
blocks *lru.TwoQueueCache
|
||||
}
|
||||
@ -197,10 +369,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
|
||||
tag.Insert(metrics.FailureType, "add"),
|
||||
)
|
||||
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
|
||||
if xerrors.Is(err, messagepool.ErrBroadcastAnyway) {
|
||||
return pubsub.ValidationAccept
|
||||
switch {
|
||||
case xerrors.Is(err, messagepool.ErrBroadcastAnyway):
|
||||
return pubsub.ValidationIgnore
|
||||
default:
|
||||
return pubsub.ValidationReject
|
||||
}
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
|
||||
return pubsub.ValidationAccept
|
||||
|
@ -69,6 +69,9 @@ type BlockHeader struct {
|
||||
BlockSig *crypto.Signature // 13
|
||||
|
||||
ForkSignaling uint64 // 14
|
||||
|
||||
// internal
|
||||
validated bool // true if the signature has been validated
|
||||
}
|
||||
|
||||
func (b *BlockHeader) ToStorageBlock() (block.Block, error) {
|
||||
@ -124,6 +127,14 @@ func (blk *BlockHeader) SigningBytes() ([]byte, error) {
|
||||
return blkcopy.Serialize()
|
||||
}
|
||||
|
||||
func (blk *BlockHeader) SetValidated() {
|
||||
blk.validated = true
|
||||
}
|
||||
|
||||
func (blk *BlockHeader) IsValidated() bool {
|
||||
return blk.validated
|
||||
}
|
||||
|
||||
type MsgMeta struct {
|
||||
BlsMessages cid.Cid
|
||||
SecpkMessages cid.Cid
|
||||
|
2
go.mod
2
go.mod
@ -78,7 +78,7 @@ require (
|
||||
github.com/libp2p/go-libp2p-mplex v0.2.3
|
||||
github.com/libp2p/go-libp2p-peer v0.2.0
|
||||
github.com/libp2p/go-libp2p-peerstore v0.2.3
|
||||
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200505181014-5bbe37191afb
|
||||
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200513065812-9de0241df138
|
||||
github.com/libp2p/go-libp2p-quic-transport v0.1.1
|
||||
github.com/libp2p/go-libp2p-record v0.1.2
|
||||
github.com/libp2p/go-libp2p-routing-helpers v0.2.1
|
||||
|
4
go.sum
4
go.sum
@ -691,8 +691,8 @@ github.com/libp2p/go-libp2p-protocol v0.1.0 h1:HdqhEyhg0ToCaxgMhnOmUO8snQtt/kQlc
|
||||
github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.1.1/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.2.6/go.mod h1:5jEp7R3ItQ0pgcEMrPZYE9DQTg/H3CTc7Mu1j2G4Y5o=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200505181014-5bbe37191afb h1:kVOUUt/ipiYgqB7t8qsUfs36o5ySZbShFL2BXICuRdU=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200505181014-5bbe37191afb/go.mod h1:tFvkRgsW96JilTvYwe1X/lYqpruTXBqEatNXq3/MqBw=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200513065812-9de0241df138 h1:6ZjQGfnsky0lcXE7uLVOX8/zMYYfEjfe/3+BKTcFjf0=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200513065812-9de0241df138/go.mod h1:tFvkRgsW96JilTvYwe1X/lYqpruTXBqEatNXq3/MqBw=
|
||||
github.com/libp2p/go-libp2p-quic-transport v0.1.1 h1:MFMJzvsxIEDEVKzO89BnB/FgvMj9WI4GDGUW2ArDPUA=
|
||||
github.com/libp2p/go-libp2p-quic-transport v0.1.1/go.mod h1:wqG/jzhF3Pu2NrhJEvE+IE0NTHNXslOPn9JQzyCAxzU=
|
||||
github.com/libp2p/go-libp2p-record v0.0.1/go.mod h1:grzqg263Rug/sRex85QrDOLntdFAymLDLm7lxMgU79Q=
|
||||
|
@ -72,6 +72,10 @@ func CheckBlockSignature(blk *types.BlockHeader, ctx context.Context, worker add
|
||||
_, span := trace.StartSpan(ctx, "checkBlockSignature")
|
||||
defer span.End()
|
||||
|
||||
if blk.IsValidated() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if blk.BlockSig == nil {
|
||||
return xerrors.New("block signature not present")
|
||||
}
|
||||
@ -81,8 +85,12 @@ func CheckBlockSignature(blk *types.BlockHeader, ctx context.Context, worker add
|
||||
return xerrors.Errorf("failed to get block signing bytes: %w", err)
|
||||
}
|
||||
|
||||
_ = sigb
|
||||
return Verify(blk.BlockSig, worker, sigb)
|
||||
err = Verify(blk.BlockSig, worker, sigb)
|
||||
if err == nil {
|
||||
blk.SetValidated()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// SigShim is used for introducing signature functions
|
||||
|
@ -62,6 +62,10 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
|
||||
// TODO we want to whitelist IPv6 /64s that belong to datacenters etc
|
||||
// IPColocationFactorWhitelist: map[string]struct{}{},
|
||||
|
||||
// P7: behavioural penalties, decay after 1hr
|
||||
BehaviourPenaltyWeight: -10,
|
||||
BehaviourPenaltyDecay: pubsub.ScoreParameterDecay(time.Hour),
|
||||
|
||||
DecayInterval: pubsub.DefaultDecayInterval,
|
||||
DecayToZero: pubsub.DefaultDecayToZero,
|
||||
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/beacon/drand"
|
||||
"github.com/filecoin-project/lotus/chain/blocksync"
|
||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/sub"
|
||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||
@ -58,7 +59,7 @@ func RunBlockSync(h host.Host, svc *blocksync.BlockSyncService) {
|
||||
h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream)
|
||||
}
|
||||
|
||||
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, h host.Host, nn dtypes.NetworkName) {
|
||||
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
|
||||
blocksub, err := ps.Subscribe(build.BlocksTopic(nn))
|
||||
@ -66,16 +67,18 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
||||
panic(err)
|
||||
}
|
||||
|
||||
v := sub.NewBlockValidator(func(p peer.ID) {
|
||||
ps.BlacklistPeer(p)
|
||||
h.ConnManager().TagPeer(p, "badblock", -1000)
|
||||
})
|
||||
v := sub.NewBlockValidator(
|
||||
chain, stmgr,
|
||||
func(p peer.ID) {
|
||||
ps.BlacklistPeer(p)
|
||||
h.ConnManager().TagPeer(p, "badblock", -1000)
|
||||
})
|
||||
|
||||
if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
go sub.HandleIncomingBlocks(ctx, blocksub, s, h.ConnManager(), v)
|
||||
go sub.HandleIncomingBlocks(ctx, blocksub, s, h.ConnManager())
|
||||
}
|
||||
|
||||
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, nn dtypes.NetworkName) {
|
||||
|
Loading…
Reference in New Issue
Block a user