Merge pull request #1661 from filecoin-project/feat/better-pubsub-integration

Improved pubsub integration
This commit is contained in:
Whyrusleeping 2020-05-05 10:41:51 -07:00 committed by GitHub
commit 1cd932fedd
7 changed files with 146 additions and 57 deletions

View File

@ -10,7 +10,7 @@ import (
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
@ -24,7 +24,7 @@ import (
var log = logging.Logger("sub")
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, cmgr connmgr.ConnManager) {
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, cmgr connmgr.ConnManager, bv *BlockValidator) {
for {
msg, err := bsub.Next(ctx)
if err != nil {
@ -39,8 +39,12 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
blk, ok := msg.ValidatorData.(*types.BlockMsg)
if !ok {
log.Warnf("pubsub block validator passed on wrong type: %#v", msg.ValidatorData)
return
}
//nolint:golint
src := peer.ID(msg.GetFrom())
go func() {
log.Infof("New block over pubsub: %s", blk.Cid())
@ -48,13 +52,15 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
log.Debug("about to fetch messages for block from pubsub")
bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
if err != nil {
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s", err)
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; flagging source %s", err, src)
bv.flagPeer(src)
return
}
smsgs, err := s.Bsync.FetchSignedMessagesByCids(context.TODO(), blk.SecpkMessages)
if err != nil {
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s", err)
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; flagging source %s", err, src)
bv.flagPeer(src)
return
}
@ -89,7 +95,7 @@ func NewBlockValidator(blacklist func(peer.ID)) *BlockValidator {
p, _ := lru.New2Q(4096)
return &BlockValidator{
peers: p,
killThresh: 5,
killThresh: 10,
blacklist: blacklist,
recvBlocks: newBlockReceiptCache(),
}
@ -105,13 +111,15 @@ func (bv *BlockValidator) flagPeer(p peer.ID) {
val := v.(int)
if val >= bv.killThresh {
log.Warnf("blacklisting peer %s", p)
bv.blacklist(p)
return
}
bv.peers.Add(p, v.(int)+1)
}
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool {
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
stats.Record(ctx, metrics.BlockReceived.M(1))
blk, err := types.DecodeBlockMsg(msg.GetData())
if err != nil {
@ -119,7 +127,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "invalid"))
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
bv.flagPeer(pid)
return false
return pubsub.ValidationReject
}
if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit {
@ -127,18 +135,18 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "too_many_messages"))
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
bv.flagPeer(pid)
return false
return pubsub.ValidationReject
}
if bv.recvBlocks.add(blk.Header.Cid()) > 0 {
// TODO: once these changes propagate to the network, we can consider
// dropping peers who send us the same block multiple times
return false
return pubsub.ValidationIgnore
}
msg.ValidatorData = blk
stats.Record(ctx, metrics.BlockValidationSuccess.M(1))
return true
return pubsub.ValidationAccept
}
type blockReceiptCache struct {
@ -172,14 +180,14 @@ func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator {
return &MessageValidator{mp}
}
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool {
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
stats.Record(ctx, metrics.MessageReceived.M(1))
m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil {
log.Warnf("failed to decode incoming message: %s", err)
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "decode"))
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return false
return pubsub.ValidationReject
}
if err := mv.mpool.Add(m); err != nil {
@ -189,10 +197,13 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
tag.Insert(metrics.FailureType, "add"),
)
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return xerrors.Is(err, messagepool.ErrBroadcastAnyway)
if xerrors.Is(err, messagepool.ErrBroadcastAnyway) {
return pubsub.ValidationAccept
}
return pubsub.ValidationIgnore
}
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
return true
return pubsub.ValidationAccept
}
func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {
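
The central change in this file: pubsub validators now return pubsub.ValidationResult (Accept, Ignore, or Reject) instead of a bool, so gossipsub v1.1 can penalize peers for rejected messages while letting duplicates pass harmlessly as Ignore, and the block handler can flag peers whose blocks reference unfetchable messages. A minimal sketch of how such a validator gets attached to a topic, assuming the registration sits alongside the validator; the topic string is an illustrative assumption, not taken from this diff:

package sub

import (
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// registerBlockValidator is a sketch only; Lotus wires this up elsewhere.
func registerBlockValidator(ps *pubsub.PubSub, netName string, bv *BlockValidator) error {
	// bv.Validate returns pubsub.ValidationAccept, ValidationIgnore or
	// ValidationReject, which RegisterTopicValidator accepts as an
	// extended (tri-state) validator.
	return ps.RegisterTopicValidator("/fil/blocks/"+netName, bv.Validate)
}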

go.mod
View File

@ -72,14 +72,14 @@ require (
github.com/libp2p/go-libp2p v0.8.1
github.com/libp2p/go-libp2p-circuit v0.2.1
github.com/libp2p/go-libp2p-connmgr v0.1.1
github.com/libp2p/go-libp2p-core v0.5.2
github.com/libp2p/go-libp2p-core v0.5.3
github.com/libp2p/go-libp2p-discovery v0.4.0
github.com/libp2p/go-libp2p-kad-dht v0.7.6
github.com/libp2p/go-libp2p-mplex v0.2.3
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.2.3
github.com/libp2p/go-libp2p-protocol v0.1.0
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200429194044-27b987071d1f
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200504160640-ed0d01f92b61
github.com/libp2p/go-libp2p-quic-transport v0.1.1
github.com/libp2p/go-libp2p-record v0.1.2
github.com/libp2p/go-libp2p-routing-helpers v0.2.1
@ -93,7 +93,7 @@ require (
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-dns v0.2.0
github.com/multiformats/go-multiaddr-net v0.1.4
github.com/multiformats/go-multiaddr-net v0.1.5
github.com/multiformats/go-multihash v0.0.13
github.com/opentracing/opentracing-go v1.1.0
github.com/stretchr/testify v1.5.1

go.sum
View File

@ -592,6 +592,8 @@ github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZas
github.com/libp2p/go-libp2p-core v0.5.1/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.5.2 h1:hevsCcdLiazurKBoeNn64aPYTVOPdY4phaEGeLtHOAs=
github.com/libp2p/go-libp2p-core v0.5.2/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.5.3 h1:b9W3w7AZR2n/YJhG8d0qPFGhGhCWKIvPuJgp4hhc4MM=
github.com/libp2p/go-libp2p-core v0.5.3/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE=
github.com/libp2p/go-libp2p-crypto v0.0.2/go.mod h1:eETI5OUfBnvARGOHrJz2eWNyTUxEGZnBxMcbUjfIj4I=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
@ -662,6 +664,10 @@ github.com/libp2p/go-libp2p-pubsub v0.1.1/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uz
github.com/libp2p/go-libp2p-pubsub v0.2.6/go.mod h1:5jEp7R3ItQ0pgcEMrPZYE9DQTg/H3CTc7Mu1j2G4Y5o=
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200429194044-27b987071d1f h1:eEc7A8rD/hvQJ1/RPIbq8QYDWQyK351Ukqx68z8hQ5s=
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200429194044-27b987071d1f/go.mod h1:HJFl7iqiGlTpRkYwA/zAlRJqj0xa+Jue0OLsYCz6bCQ=
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200504064220-8c96dc4bdbe9 h1:BwjFOl2n5nnVPnAvX5vvexyrw4kj5FGVC1viJ4S9f3Q=
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200504064220-8c96dc4bdbe9/go.mod h1:tFvkRgsW96JilTvYwe1X/lYqpruTXBqEatNXq3/MqBw=
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200504160640-ed0d01f92b61 h1:PC9c2pLPRAcKtIV0ClhOk0hKYUrYGL1ZuG5+D9XT9Dc=
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200504160640-ed0d01f92b61/go.mod h1:tFvkRgsW96JilTvYwe1X/lYqpruTXBqEatNXq3/MqBw=
github.com/libp2p/go-libp2p-quic-transport v0.1.1 h1:MFMJzvsxIEDEVKzO89BnB/FgvMj9WI4GDGUW2ArDPUA=
github.com/libp2p/go-libp2p-quic-transport v0.1.1/go.mod h1:wqG/jzhF3Pu2NrhJEvE+IE0NTHNXslOPn9JQzyCAxzU=
github.com/libp2p/go-libp2p-record v0.0.1/go.mod h1:grzqg263Rug/sRex85QrDOLntdFAymLDLm7lxMgU79Q=
@ -849,6 +855,8 @@ github.com/multiformats/go-multiaddr-net v0.1.2/go.mod h1:QsWt3XK/3hwvNxZJp92iMQ
github.com/multiformats/go-multiaddr-net v0.1.3/go.mod h1:ilNnaM9HbmVFqsb/qcNysjCu4PVONlrBZpHIrw/qQuA=
github.com/multiformats/go-multiaddr-net v0.1.4 h1:g6gwydsfADqFvrHoMkS0n9Ok9CG6F7ytOH/bJDkhIOY=
github.com/multiformats/go-multiaddr-net v0.1.4/go.mod h1:ilNnaM9HbmVFqsb/qcNysjCu4PVONlrBZpHIrw/qQuA=
github.com/multiformats/go-multiaddr-net v0.1.5 h1:QoRKvu0xHN1FCFJcMQLbG/yQE2z441L5urvG3+qyz7g=
github.com/multiformats/go-multiaddr-net v0.1.5/go.mod h1:ilNnaM9HbmVFqsb/qcNysjCu4PVONlrBZpHIrw/qQuA=
github.com/multiformats/go-multibase v0.0.1 h1:PN9/v21eLywrFWdFNsFKaU04kLJzuYzmrJR+ubhT9qA=
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=

View File

@ -178,7 +178,7 @@ func libp2p() Option {
Override(ConnectionManagerKey, lp2p.ConnectionManager(50, 200, 20*time.Second, nil)),
Override(AutoNATSvcKey, lp2p.AutoNATService),
Override(new(*pubsub.PubSub), lp2p.GossipSub()),
Override(new(*pubsub.PubSub), lp2p.GossipSub(&config.Pubsub{})),
Override(PstoreAddSelfKeysKey, lp2p.PstoreAddSelfKeys),
Override(StartListeningKey, lp2p.StartListening(config.DefaultFullNode().Libp2p.ListenAddresses)),
@ -354,6 +354,7 @@ func ConfigCommon(cfg *config.Common) Option {
cfg.Libp2p.ConnMgrHigh,
time.Duration(cfg.Libp2p.ConnMgrGrace),
cfg.Libp2p.ProtectedPeers)),
Override(new(*pubsub.PubSub), lp2p.GossipSub(&cfg.Pubsub)),
ApplyIf(func(s *Settings) bool { return len(cfg.Libp2p.BootstrapPeers) > 0 },
Override(new(dtypes.BootstrapPeers), modules.ConfigBootstrap(cfg.Libp2p.BootstrapPeers)),
@ -377,9 +378,6 @@ func ConfigFullNode(c interface{}) Option {
If(cfg.Metrics.HeadNotifs,
Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
),
If(cfg.Metrics.PubsubTracing,
Override(new(*pubsub.PubSub), lp2p.GossipSub(lp2p.PubsubTracer())),
),
)
}
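
With GossipSub now constructed from config, the dedicated PubsubTracing metrics toggle goes away and pubsub behaviour is driven entirely by cfg.Pubsub. A sketch of overriding the constructor with an explicit config (for example on a bootstrapper), using the same Override/lp2p helpers as above; the field values are examples, not defaults from this commit:

Override(new(*pubsub.PubSub), lp2p.GossipSub(&config.Pubsub{
	Bootstrapper: true, // run gossip and PX only, no mesh
	RemoteTracer: "",   // leave empty to disable remote tracing
})),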

View File

@ -11,6 +11,7 @@ import (
type Common struct {
API API
Libp2p Libp2p
Pubsub Pubsub
}
// FullNode is a full node config
@ -47,12 +48,17 @@ type Libp2p struct {
ConnMgrGrace Duration
}
type Pubsub struct {
Bootstrapper bool
DirectPeers []string
RemoteTracer string
}
// // Full Node
type Metrics struct {
Nickname string
HeadNotifs bool
PubsubTracing bool
Nickname string
HeadNotifs bool
}
type Client struct {
@ -75,6 +81,11 @@ func defCommon() Common {
ConnMgrHigh: 180,
ConnMgrGrace: Duration(20 * time.Second),
},
Pubsub: Pubsub{
Bootstrapper: false,
DirectPeers: nil,
RemoteTracer: "/ip4/147.75.67.199/tcp/4001/p2p/QmTd6UvR47vUidRNZ1ZKXHrAFhqTJAD27rKL9XYghEKgKX",
},
}
}
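
The new Pubsub section replaces both the hard-coded remote tracer and the old bootstrap-peer lookup: Bootstrapper is declared explicitly, DirectPeers pins peers into the gossip, and RemoteTracer can be pointed elsewhere or left empty to disable tracing. A sketch of filling it in by hand; DirectPeers and RemoteTracer entries must be full multiaddrs ending in /p2p/<peer-id>, since the GossipSub constructor parses them with peer.AddrInfoFromP2pAddr (the address below is a placeholder, not a real peer):

func examplePubsubConfig() Pubsub {
	return Pubsub{
		Bootstrapper: false,
		DirectPeers: []string{
			// placeholder peer; must include the /p2p/<peer-id> component
			"/ip4/10.0.0.2/tcp/1347/p2p/<peer-id>",
		},
		RemoteTracer: "", // empty disables the remote tracer
	}
}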

View File

@ -7,10 +7,13 @@ import (
host "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
blake2b "github.com/minio/blake2b-simd"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
)
@ -22,35 +25,18 @@ func init() {
pubsub.GossipSubDlo = 6
pubsub.GossipSubDhi = 12
pubsub.GossipSubDlazy = 12
pubsub.GossipSubDirectConnectInitialDelay = 30 * time.Second
}
type PubsubOpt func(host.Host) pubsub.Option
func PubsubTracer() PubsubOpt {
return func(host host.Host) pubsub.Option {
pi, err := peer.AddrInfoFromP2pAddr(ma.StringCast("/ip4/147.75.67.199/tcp/4001/p2p/QmTd6UvR47vUidRNZ1ZKXHrAFhqTJAD27rKL9XYghEKgKX"))
if err != nil {
panic(err)
}
tr, err := pubsub.NewRemoteTracer(context.TODO(), host, *pi)
if err != nil {
panic(err)
}
return pubsub.WithEventTracer(tr)
}
}
func GossipSub(pubsubOptions ...PubsubOpt) interface{} {
func GossipSub(cfg *config.Pubsub) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtypes.NetworkName, bp dtypes.BootstrapPeers) (service *pubsub.PubSub, err error) {
bootstrappers := make(map[peer.ID]struct{})
for _, pi := range bp {
bootstrappers[pi.ID] = struct{}{}
}
_, isBootstrapNode := bootstrappers[host.ID()]
isBootstrapNode := cfg.Bootstrapper
v11Options := []pubsub.Option{
options := []pubsub.Option{
// Gossipsubv1.1 configuration
pubsub.WithFloodPublish(true),
pubsub.WithPeerScore(
@ -161,23 +147,98 @@ func GossipSub(pubsubOptions ...PubsubOpt) interface{} {
// enable Peer eXchange on bootstrappers
if isBootstrapNode {
v11Options = append(v11Options, pubsub.WithPeerExchange(true))
// turn off the mesh in bootstrappers -- only do gossip and PX
pubsub.GossipSubD = 0
pubsub.GossipSubDscore = 0
pubsub.GossipSubDlo = 0
pubsub.GossipSubDhi = 0
pubsub.GossipSubDlazy = 1024
pubsub.GossipSubGossipFactor = 0.5
// turn on PX
options = append(options, pubsub.WithPeerExchange(true))
}
// direct peers
if cfg.DirectPeers != nil {
var directPeerInfo []peer.AddrInfo
for _, addr := range cfg.DirectPeers {
a, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
pi, err := peer.AddrInfoFromP2pAddr(a)
if err != nil {
return nil, err
}
directPeerInfo = append(directPeerInfo, *pi)
}
options = append(options, pubsub.WithDirectPeers(directPeerInfo))
}
// tracer
if cfg.RemoteTracer != "" {
a, err := ma.NewMultiaddr(cfg.RemoteTracer)
if err != nil {
return nil, err
}
pi, err := peer.AddrInfoFromP2pAddr(a)
if err != nil {
return nil, err
}
tr, err := pubsub.NewRemoteTracer(context.TODO(), host, *pi)
if err != nil {
return nil, err
}
trw := newTracerWrapper(tr)
options = append(options, pubsub.WithEventTracer(trw))
}
// TODO: we want to hook the peer score inspector so that we can gain visibility
// into peer scores for debugging purposes -- this might be triggered by metrics collection
// v11Options = append(v11Options, pubsub.WithPeerScoreInspect(XXX, time.Second))
options := append(v11Options, paresOpts(host, pubsubOptions)...)
// options = append(options, pubsub.WithPeerScoreInspect(XXX, time.Second))
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, options...)
}
}
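
On the peer score inspector TODO: a rough sketch of what such a hook could look like, slotted into the options construction above. The threshold, interval, and use of a package-level logger are assumptions; none of this is part of the commit:

scoreInspect := func(scores map[peer.ID]float64) {
	// surface badly scoring peers; a real hook would likely feed metrics instead
	for p, score := range scores {
		if score < -100 {
			log.Warnf("low pubsub score for peer %s: %f", p, score)
		}
	}
}
options = append(options, pubsub.WithPeerScoreInspect(scoreInspect, 10*time.Second))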
func paresOpts(host host.Host, in []PubsubOpt) []pubsub.Option {
out := make([]pubsub.Option, len(in))
for k, v := range in {
out[k] = v(host)
}
return out
func HashMsgId(m *pubsub_pb.Message) string {
hash := blake2b.Sum256(m.Data)
return string(hash[:])
}
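
HashMsgId derives the pubsub message ID from a blake2b hash of the payload rather than the default sender-plus-sequence-number pair, so identical messages deduplicate no matter who relayed them. It would be registered through pubsub.WithMessageIdFn; where this commit actually wires it up is not shown in the hunks above, so the snippet below is only a sketch:

// sketch: registering the content-addressed message ID function
options = append(options, pubsub.WithMessageIdFn(HashMsgId))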
func newTracerWrapper(tr pubsub.EventTracer) pubsub.EventTracer {
return &tracerWrapper{tr: tr}
}
type tracerWrapper struct {
tr pubsub.EventTracer
}
func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
// this filters the trace events reported to the remote tracer to include only
// JOIN/LEAVE/GRAFT/PRUNE/PUBLISH/DELIVER. This significantly reduces bandwidth usage and still
// collects enough data to recover the state of the mesh and compute message delivery latency
// distributions.
// TODO: hook all events into local metrics for inspection through the dashboard
switch evt.GetType() {
case pubsub_pb.TraceEvent_PUBLISH_MESSAGE:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_JOIN:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_LEAVE:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_GRAFT:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_PRUNE:
trw.tr.Trace(evt)
}
}

View File

@ -6,7 +6,7 @@ import (
eventbus "github.com/libp2p/go-eventbus"
event "github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"golang.org/x/xerrors"
@ -75,7 +75,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
panic(err)
}
go sub.HandleIncomingBlocks(ctx, blocksub, s, h.ConnManager())
go sub.HandleIncomingBlocks(ctx, blocksub, s, h.ConnManager(), v)
}
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, nn dtypes.NetworkName) {