pubsub-specific configuration

This commit is contained in:
vyzo 2020-05-04 18:30:54 +03:00
parent cfcf946556
commit 112aca7896
3 changed files with 94 additions and 38 deletions

View File

@ -178,7 +178,7 @@ func libp2p() Option {
Override(ConnectionManagerKey, lp2p.ConnectionManager(50, 200, 20*time.Second, nil)), Override(ConnectionManagerKey, lp2p.ConnectionManager(50, 200, 20*time.Second, nil)),
Override(AutoNATSvcKey, lp2p.AutoNATService), Override(AutoNATSvcKey, lp2p.AutoNATService),
Override(new(*pubsub.PubSub), lp2p.GossipSub()), Override(new(*pubsub.PubSub), lp2p.GossipSub(&config.Pubsub{})),
Override(PstoreAddSelfKeysKey, lp2p.PstoreAddSelfKeys), Override(PstoreAddSelfKeysKey, lp2p.PstoreAddSelfKeys),
Override(StartListeningKey, lp2p.StartListening(config.DefaultFullNode().Libp2p.ListenAddresses)), Override(StartListeningKey, lp2p.StartListening(config.DefaultFullNode().Libp2p.ListenAddresses)),
@ -354,6 +354,7 @@ func ConfigCommon(cfg *config.Common) Option {
cfg.Libp2p.ConnMgrHigh, cfg.Libp2p.ConnMgrHigh,
time.Duration(cfg.Libp2p.ConnMgrGrace), time.Duration(cfg.Libp2p.ConnMgrGrace),
cfg.Libp2p.ProtectedPeers)), cfg.Libp2p.ProtectedPeers)),
Override(new(*pubsub.PubSub), lp2p.GossipSub(&cfg.Pubsub)),
ApplyIf(func(s *Settings) bool { return len(cfg.Libp2p.BootstrapPeers) > 0 }, ApplyIf(func(s *Settings) bool { return len(cfg.Libp2p.BootstrapPeers) > 0 },
Override(new(dtypes.BootstrapPeers), modules.ConfigBootstrap(cfg.Libp2p.BootstrapPeers)), Override(new(dtypes.BootstrapPeers), modules.ConfigBootstrap(cfg.Libp2p.BootstrapPeers)),
@ -377,9 +378,6 @@ func ConfigFullNode(c interface{}) Option {
If(cfg.Metrics.HeadNotifs, If(cfg.Metrics.HeadNotifs,
Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)), Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
), ),
If(cfg.Metrics.PubsubTracing,
Override(new(*pubsub.PubSub), lp2p.GossipSub(lp2p.PubsubTracer())),
),
) )
} }

View File

@ -11,6 +11,7 @@ import (
type Common struct { type Common struct {
API API API API
Libp2p Libp2p Libp2p Libp2p
Pubsub Pubsub
} }
// FullNode is a full node config // FullNode is a full node config
@ -47,12 +48,17 @@ type Libp2p struct {
ConnMgrGrace Duration ConnMgrGrace Duration
} }
type Pubsub struct {
Bootstrapper bool
DirectPeers []string
RemoteTracer string
}
// // Full Node // // Full Node
type Metrics struct { type Metrics struct {
Nickname string Nickname string
HeadNotifs bool HeadNotifs bool
PubsubTracing bool
} }
type Client struct { type Client struct {
@ -75,6 +81,11 @@ func defCommon() Common {
ConnMgrHigh: 180, ConnMgrHigh: 180,
ConnMgrGrace: Duration(20 * time.Second), ConnMgrGrace: Duration(20 * time.Second),
}, },
Pubsub: Pubsub{
Bootstrapper: false,
DirectPeers: nil,
RemoteTracer: "/ip4/147.75.67.199/tcp/4001/p2p/QmTd6UvR47vUidRNZ1ZKXHrAFhqTJAD27rKL9XYghEKgKX",
},
} }
} }

View File

@ -7,10 +7,12 @@ import (
host "github.com/libp2p/go-libp2p-core/host" host "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"go.uber.org/fx" "go.uber.org/fx"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/modules/helpers"
) )
@ -22,35 +24,18 @@ func init() {
pubsub.GossipSubDlo = 6 pubsub.GossipSubDlo = 6
pubsub.GossipSubDhi = 12 pubsub.GossipSubDhi = 12
pubsub.GossipSubDlazy = 12 pubsub.GossipSubDlazy = 12
pubsub.GossipSubDirectConnectInitialDelay = 30 * time.Second
} }
type PubsubOpt func(host.Host) pubsub.Option func GossipSub(cfg *config.Pubsub) interface{} {
func PubsubTracer() PubsubOpt {
return func(host host.Host) pubsub.Option {
pi, err := peer.AddrInfoFromP2pAddr(ma.StringCast("/ip4/147.75.67.199/tcp/4001/p2p/QmTd6UvR47vUidRNZ1ZKXHrAFhqTJAD27rKL9XYghEKgKX"))
if err != nil {
panic(err)
}
tr, err := pubsub.NewRemoteTracer(context.TODO(), host, *pi)
if err != nil {
panic(err)
}
return pubsub.WithEventTracer(tr)
}
}
func GossipSub(pubsubOptions ...PubsubOpt) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtypes.NetworkName, bp dtypes.BootstrapPeers) (service *pubsub.PubSub, err error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtypes.NetworkName, bp dtypes.BootstrapPeers) (service *pubsub.PubSub, err error) {
bootstrappers := make(map[peer.ID]struct{}) bootstrappers := make(map[peer.ID]struct{})
for _, pi := range bp { for _, pi := range bp {
bootstrappers[pi.ID] = struct{}{} bootstrappers[pi.ID] = struct{}{}
} }
_, isBootstrapNode := bootstrappers[host.ID()] isBootstrapNode := cfg.Bootstrapper
v11Options := []pubsub.Option{ options := []pubsub.Option{
// Gossipsubv1.1 configuration // Gossipsubv1.1 configuration
pubsub.WithFloodPublish(true), pubsub.WithFloodPublish(true),
pubsub.WithPeerScore( pubsub.WithPeerScore(
@ -161,23 +146,85 @@ func GossipSub(pubsubOptions ...PubsubOpt) interface{} {
// enable Peer eXchange on bootstrappers // enable Peer eXchange on bootstrappers
if isBootstrapNode { if isBootstrapNode {
v11Options = append(v11Options, pubsub.WithPeerExchange(true)) options = append(options, pubsub.WithPeerExchange(true))
}
// direct peers
if cfg.DirectPeers != nil {
var directPeerInfo []peer.AddrInfo
for _, addr := range cfg.DirectPeers {
a, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
pi, err := peer.AddrInfoFromP2pAddr(a)
if err != nil {
return nil, err
}
directPeerInfo = append(directPeerInfo, *pi)
}
options = append(options, pubsub.WithDirectPeers(directPeerInfo))
}
// tracer
if cfg.RemoteTracer != "" {
a, err := ma.NewMultiaddr(cfg.RemoteTracer)
if err != nil {
return nil, err
}
pi, err := peer.AddrInfoFromP2pAddr(a)
if err != nil {
return nil, err
}
tr, err := pubsub.NewRemoteTracer(context.TODO(), host, *pi)
if err != nil {
return nil, err
}
trw := newTracerWrapper(tr)
options = append(options, pubsub.WithEventTracer(trw))
} }
// TODO: we want to hook the peer score inspector so that we can gain visibility // TODO: we want to hook the peer score inspector so that we can gain visibility
// in peer scores for debugging purposes -- this might be trigged by metrics collection // in peer scores for debugging purposes -- this might be trigged by metrics collection
// v11Options = append(v11Options, pubsub.WithPeerScoreInspect(XXX, time.Second)) // options = append(options, pubsub.WithPeerScoreInspect(XXX, time.Second))
options := append(v11Options, paresOpts(host, pubsubOptions)...)
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, options...) return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, options...)
} }
} }
func paresOpts(host host.Host, in []PubsubOpt) []pubsub.Option { func newTracerWrapper(tr pubsub.EventTracer) pubsub.EventTracer {
out := make([]pubsub.Option, len(in)) return &tracerWrapper{tr: tr}
for k, v := range in { }
out[k] = v(host)
} type tracerWrapper struct {
return out tr pubsub.EventTracer
}
func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
// this filters the trace events reported to the remote tracer to include only
// JOIN/LEAVE/GRAFT/PRUNE/PUBLISH/DELIVER. This significantly reduces bandwidth usage and still
// collects enough data to recover the state of the mesh and compute message delivery latency
// distributions.
// TODO: hook all events into local metrics for inspection through the dashboard
switch evt.GetType() {
case pubsub_pb.TraceEvent_PUBLISH_MESSAGE:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_JOIN:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_LEAVE:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_GRAFT:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_PRUNE:
trw.tr.Trace(evt)
}
} }