diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index fdb4dc028..76387dd1d 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -2,6 +2,7 @@ package lp2p import ( "context" + "encoding/json" "time" host "github.com/libp2p/go-libp2p-core/host" @@ -11,6 +12,7 @@ import ( blake2b "github.com/minio/blake2b-simd" ma "github.com/multiformats/go-multiaddr" "go.uber.org/fx" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/node/config" @@ -28,17 +30,42 @@ func init() { pubsub.GossipSubDlazy = 12 pubsub.GossipSubDirectConnectInitialDelay = 30 * time.Second } - func ScoreKeeper() *dtypes.ScoreKeeper { return new(dtypes.ScoreKeeper) } -func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtypes.NetworkName, bp dtypes.BootstrapPeers, cfg *config.Pubsub, sk *dtypes.ScoreKeeper) (service *pubsub.PubSub, err error) { +type GossipIn struct { + fx.In + Mctx helpers.MetricsCtx + Lc fx.Lifecycle + Host host.Host + Nn dtypes.NetworkName + Bp dtypes.BootstrapPeers + Cfg *config.Pubsub + Sk *dtypes.ScoreKeeper +} + +func getDrandTopic() (string, error) { + var drandInfo = struct { + Hash string `json:"hash"` + }{} + err := json.Unmarshal([]byte(build.DrandChain), &drandInfo) + if err != nil { + return "", xerrors.Errorf("could not unmarshal drand chain info: %w", err) + } + return "/drand/pubsub/v0.0.0/" + drandInfo.Hash, nil +} + +func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { bootstrappers := make(map[peer.ID]struct{}) - for _, pi := range bp { + for _, pi := range in.Bp { bootstrappers[pi.ID] = struct{}{} } - isBootstrapNode := cfg.Bootstrapper + isBootstrapNode := in.Cfg.Bootstrapper + drandTopic, err := getDrandTopic() + if err != nil { + return nil, err + } options := []pubsub.Option{ // Gossipsubv1.1 configuration @@ -80,7 +107,48 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp // topic parameters Topics: map[string]*pubsub.TopicScoreParams{ - build.BlocksTopic(nn): { + drandTopic: { + // expected 2 beaconsn/min + TopicWeight: 0.5, // 5x block topic + + // 1 tick per second, maxes at 1 after 1 hour + TimeInMeshWeight: 0.00027, // ~1/3600 + TimeInMeshQuantum: time.Second, + TimeInMeshCap: 1, + + // deliveries decay after 1 hour, cap at 100 blocks + FirstMessageDeliveriesWeight: 5, // max value is 500 + FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), + FirstMessageDeliveriesCap: 100, // 100 blocks in an hour + + // Mesh Delivery Failure is currently turned off for blocks + // This is on purpose as + // - the traffic is very low for meaningful distribution of incoming edges. + // - the reaction time needs to be very slow -- in the order of 10 min at least + // so we might as well let opportunistic grafting repair the mesh on its own + // pace. + // - the network is too small, so large asymmetries can be expected between mesh + // edges. + // We should revisit this once the network grows. + // + // // tracks deliveries in the last minute + // // penalty activates at 1 minute and expects ~0.4 blocks + // MeshMessageDeliveriesWeight: -576, // max penalty is -100 + // MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute), + // MeshMessageDeliveriesCap: 10, // 10 blocks in a minute + // MeshMessageDeliveriesThreshold: 0.41666, // 10/12/2 blocks/min + // MeshMessageDeliveriesWindow: 10 * time.Millisecond, + // MeshMessageDeliveriesActivation: time.Minute, + // + // // decays after 15 min + // MeshFailurePenaltyWeight: -576, + // MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(15 * time.Minute), + + // invalid messages decay after 1 hour + InvalidMessageDeliveriesWeight: -1000, + InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), + }, + build.BlocksTopic(in.Nn): { // expected 10 blocks/min TopicWeight: 0.1, // max is 50, max mesh penalty is -10, single invalid message is -100 @@ -121,7 +189,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp InvalidMessageDeliveriesWeight: -1000, InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), }, - build.MessagesTopic(nn): { + build.MessagesTopic(in.Nn): { // expected > 1 tx/second TopicWeight: 0.05, // max is 25, max mesh penalty is -5, single invalid message is -100 @@ -166,7 +234,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp OpportunisticGraftThreshold: 5, }, ), - pubsub.WithPeerScoreInspect(sk.Update, 10*time.Second), + pubsub.WithPeerScoreInspect(in.Sk.Update, 10*time.Second), } // enable Peer eXchange on bootstrappers @@ -184,10 +252,10 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp } // direct peers - if cfg.DirectPeers != nil { + if in.Cfg.DirectPeers != nil { var directPeerInfo []peer.AddrInfo - for _, addr := range cfg.DirectPeers { + for _, addr := range in.Cfg.DirectPeers { a, err := ma.NewMultiaddr(addr) if err != nil { return nil, err @@ -205,8 +273,8 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp } // tracer - if cfg.RemoteTracer != "" { - a, err := ma.NewMultiaddr(cfg.RemoteTracer) + if in.Cfg.RemoteTracer != "" { + a, err := ma.NewMultiaddr(in.Cfg.RemoteTracer) if err != nil { return nil, err } @@ -216,7 +284,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp return nil, err } - tr, err := pubsub.NewRemoteTracer(context.TODO(), host, *pi) + tr, err := pubsub.NewRemoteTracer(context.TODO(), in.Host, *pi) if err != nil { return nil, err } @@ -229,7 +297,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp // in peer scores for debugging purposes -- this might be trigged by metrics collection // options = append(options, pubsub.WithPeerScoreInspect(XXX, time.Second)) - return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, options...) + return pubsub.NewGossipSub(helpers.LifecycleCtx(in.Mctx, in.Lc), in.Host, options...) } func HashMsgId(m *pubsub_pb.Message) string {