Use config for pubsub tracing

This commit is contained in:
Łukasz Magiera 2019-11-20 21:31:00 +01:00
parent 3a8100fd3c
commit c7c422abbd
4 changed files with 32 additions and 16 deletions

View File

@ -312,6 +312,9 @@ func ConfigFullNode(c interface{}) Option {
return Options( return Options(
ConfigCommon(&cfg.Common), ConfigCommon(&cfg.Common),
Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)), Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
If(cfg.Metrics.PubsubTracing,
Override(new(*pubsub.PubSub), lp2p.GossipSub(lp2p.PubsubTracer())),
),
) )
} }

View File

@ -42,6 +42,7 @@ type Libp2p struct {
type Metrics struct { type Metrics struct {
Nickname string Nickname string
PubsubTracing bool
} }
// // Storage Miner // // Storage Miner

View File

@ -12,7 +12,10 @@ import (
"github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/modules/helpers"
) )
func withTracer(host host.Host, opts []pubsub.Option) []pubsub.Option { type PubsubOpt func(host.Host) pubsub.Option
func PubsubTracer() PubsubOpt {
return func(host host.Host) pubsub.Option {
pi, err := peer.AddrInfoFromP2pAddr(ma.StringCast("/ip4/147.75.67.199/tcp/4001/p2p/QmTd6UvR47vUidRNZ1ZKXHrAFhqTJAD27rKL9XYghEKgKX")) pi, err := peer.AddrInfoFromP2pAddr(ma.StringCast("/ip4/147.75.67.199/tcp/4001/p2p/QmTd6UvR47vUidRNZ1ZKXHrAFhqTJAD27rKL9XYghEKgKX"))
if err != nil { if err != nil {
panic(err) panic(err)
@ -23,17 +26,20 @@ func withTracer(host host.Host, opts []pubsub.Option) []pubsub.Option {
panic(err) panic(err)
} }
return append(opts, pubsub.WithEventTracer(tr)) return pubsub.WithEventTracer(tr)
}
func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) {
return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
} }
} }
func GossipSub(pubsubOptions ...pubsub.Option) interface{} { func GossipSub(pubsubOptions ...PubsubOpt) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) {
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, withTracer(host, pubsubOptions)...) return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, paresOpts(host, pubsubOptions)...)
} }
} }
func paresOpts(host host.Host, in []PubsubOpt) []pubsub.Option {
out := make([]pubsub.Option, len(in))
for k, v := range in {
out[k] = v(host)
}
return out
}

View File

@ -40,6 +40,12 @@ func ApplyIf(check func(s *Settings) bool, opts ...Option) Option {
} }
} }
func If(b bool, opts ...Option) Option {
return ApplyIf(func(s *Settings) bool {
return b
}, opts...)
}
// Override option changes constructor for a given type // Override option changes constructor for a given type
func Override(typ, constructor interface{}) Option { func Override(typ, constructor interface{}) Option {
return func(s *Settings) error { return func(s *Settings) error {