Add drand topic scores

Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Jakub Sztandera 2020-06-03 15:48:22 +02:00
parent 366359803a
commit e91cc9cd08
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA

View File

@ -2,6 +2,7 @@ package lp2p
import (
"context"
"encoding/json"
"time"
host "github.com/libp2p/go-libp2p-core/host"
@ -11,6 +12,7 @@ import (
blake2b "github.com/minio/blake2b-simd"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/node/config"
@ -28,17 +30,42 @@ func init() {
pubsub.GossipSubDlazy = 12
pubsub.GossipSubDirectConnectInitialDelay = 30 * time.Second
}
func ScoreKeeper() *dtypes.ScoreKeeper {
return new(dtypes.ScoreKeeper)
}
func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtypes.NetworkName, bp dtypes.BootstrapPeers, cfg *config.Pubsub, sk *dtypes.ScoreKeeper) (service *pubsub.PubSub, err error) {
type GossipIn struct {
fx.In
Mctx helpers.MetricsCtx
Lc fx.Lifecycle
Host host.Host
Nn dtypes.NetworkName
Bp dtypes.BootstrapPeers
Cfg *config.Pubsub
Sk *dtypes.ScoreKeeper
}
func getDrandTopic() (string, error) {
var drandInfo = struct {
Hash string `json:"hash"`
}{}
err := json.Unmarshal([]byte(build.DrandChain), &drandInfo)
if err != nil {
return "", xerrors.Errorf("could not unmarshal drand chain info: %w", err)
}
return "/drand/pubsub/v0.0.0/" + drandInfo.Hash, nil
}
func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
bootstrappers := make(map[peer.ID]struct{})
for _, pi := range bp {
for _, pi := range in.Bp {
bootstrappers[pi.ID] = struct{}{}
}
isBootstrapNode := cfg.Bootstrapper
isBootstrapNode := in.Cfg.Bootstrapper
drandTopic, err := getDrandTopic()
if err != nil {
return nil, err
}
options := []pubsub.Option{
// Gossipsubv1.1 configuration
@ -80,7 +107,48 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
// topic parameters
Topics: map[string]*pubsub.TopicScoreParams{
build.BlocksTopic(nn): {
drandTopic: {
// expected 2 beaconsn/min
TopicWeight: 0.5, // 5x block topic
// 1 tick per second, maxes at 1 after 1 hour
TimeInMeshWeight: 0.00027, // ~1/3600
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 1,
// deliveries decay after 1 hour, cap at 100 blocks
FirstMessageDeliveriesWeight: 5, // max value is 500
FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
FirstMessageDeliveriesCap: 100, // 100 blocks in an hour
// Mesh Delivery Failure is currently turned off for blocks
// This is on purpose as
// - the traffic is very low for meaningful distribution of incoming edges.
// - the reaction time needs to be very slow -- in the order of 10 min at least
// so we might as well let opportunistic grafting repair the mesh on its own
// pace.
// - the network is too small, so large asymmetries can be expected between mesh
// edges.
// We should revisit this once the network grows.
//
// // tracks deliveries in the last minute
// // penalty activates at 1 minute and expects ~0.4 blocks
// MeshMessageDeliveriesWeight: -576, // max penalty is -100
// MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute),
// MeshMessageDeliveriesCap: 10, // 10 blocks in a minute
// MeshMessageDeliveriesThreshold: 0.41666, // 10/12/2 blocks/min
// MeshMessageDeliveriesWindow: 10 * time.Millisecond,
// MeshMessageDeliveriesActivation: time.Minute,
//
// // decays after 15 min
// MeshFailurePenaltyWeight: -576,
// MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(15 * time.Minute),
// invalid messages decay after 1 hour
InvalidMessageDeliveriesWeight: -1000,
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
},
build.BlocksTopic(in.Nn): {
// expected 10 blocks/min
TopicWeight: 0.1, // max is 50, max mesh penalty is -10, single invalid message is -100
@ -121,7 +189,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
InvalidMessageDeliveriesWeight: -1000,
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
},
build.MessagesTopic(nn): {
build.MessagesTopic(in.Nn): {
// expected > 1 tx/second
TopicWeight: 0.05, // max is 25, max mesh penalty is -5, single invalid message is -100
@ -166,7 +234,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
OpportunisticGraftThreshold: 5,
},
),
pubsub.WithPeerScoreInspect(sk.Update, 10*time.Second),
pubsub.WithPeerScoreInspect(in.Sk.Update, 10*time.Second),
}
// enable Peer eXchange on bootstrappers
@ -184,10 +252,10 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
}
// direct peers
if cfg.DirectPeers != nil {
if in.Cfg.DirectPeers != nil {
var directPeerInfo []peer.AddrInfo
for _, addr := range cfg.DirectPeers {
for _, addr := range in.Cfg.DirectPeers {
a, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
@ -205,8 +273,8 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
}
// tracer
if cfg.RemoteTracer != "" {
a, err := ma.NewMultiaddr(cfg.RemoteTracer)
if in.Cfg.RemoteTracer != "" {
a, err := ma.NewMultiaddr(in.Cfg.RemoteTracer)
if err != nil {
return nil, err
}
@ -216,7 +284,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
return nil, err
}
tr, err := pubsub.NewRemoteTracer(context.TODO(), host, *pi)
tr, err := pubsub.NewRemoteTracer(context.TODO(), in.Host, *pi)
if err != nil {
return nil, err
}
@ -229,7 +297,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
// in peer scores for debugging purposes -- this might be trigged by metrics collection
// options = append(options, pubsub.WithPeerScoreInspect(XXX, time.Second))
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, options...)
return pubsub.NewGossipSub(helpers.LifecycleCtx(in.Mctx, in.Lc), in.Host, options...)
}
func HashMsgId(m *pubsub_pb.Message) string {