Merge pull request #1911 from filecoin-project/feat/drand-score

Score drand topic
Jakub Sztandera 2020-06-08 15:12:25 +02:00 committed by GitHub
commit f257f4febb
5 changed files with 107 additions and 13 deletions


@@ -34,3 +34,12 @@ func BuiltinBootstrap() ([]peer.AddrInfo, error) {
 	})
 	return out, err
 }
+
+func DrandBootstrap() ([]peer.AddrInfo, error) {
+	addrs := []string{
+		"/dnsaddr/pl-eu.testnet.drand.sh/",
+		"/dnsaddr/pl-us.testnet.drand.sh/",
+		"/dnsaddr/pl-sin.testnet.drand.sh/",
+	}
+	return addrutil.ParseAddresses(context.TODO(), addrs)
+}
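The /dnsaddr entries above are resolved into concrete peer records by addrutil.ParseAddresses. A minimal sketch of consuming the new helper -- the printing is illustrative, not part of this change:

package main

import (
	"fmt"
	"log"

	"github.com/filecoin-project/lotus/build"
)

func main() {
	// Resolve the /dnsaddr records into dialable peer.AddrInfo entries.
	peers, err := build.DrandBootstrap()
	if err != nil {
		log.Fatalf("resolving drand bootstrappers: %s", err)
	}
	for _, pi := range peers {
		fmt.Printf("drand bootstrap peer %s: %v\n", pi.ID, pi.Addrs)
	}
}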


@@ -218,6 +218,7 @@ func Online() Option {
 		// TODO: Fix offline mode
 		Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap),
+		Override(new(dtypes.DrandBootstrap), modules.DrandBootstrap),
 		Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages),


@@ -89,3 +89,7 @@ func ConfigBootstrap(peers []string) func() (dtypes.BootstrapPeers, error) {
 func BuiltinBootstrap() (dtypes.BootstrapPeers, error) {
 	return build.BuiltinBootstrap()
 }
+
+func DrandBootstrap() (dtypes.DrandBootstrap, error) {
+	return build.DrandBootstrap()
+}


@@ -3,5 +3,6 @@ package dtypes
 import "github.com/libp2p/go-libp2p-core/peer"
 
 type BootstrapPeers []peer.AddrInfo
+type DrandBootstrap []peer.AddrInfo
 
 type Bootstrapper bool
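DrandBootstrap deliberately gets its own named type even though it shares the underlying []peer.AddrInfo with BootstrapPeers: the DI container resolves dependencies by type, so the alias is what keeps the two peer lists injectable side by side. A hedged sketch of the distinction (countPeers is a hypothetical consumer, not lotus code):

package example

import "github.com/filecoin-project/lotus/node/modules/dtypes"

// countPeers receives each list as its own dependency; without the named
// types, the injector could not tell the two []peer.AddrInfo values apart.
func countPeers(bp dtypes.BootstrapPeers, db dtypes.DrandBootstrap) (chain, drand int) {
	return len(bp), len(db)
}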


@@ -2,6 +2,7 @@ package lp2p
 
 import (
 	"context"
+	"encoding/json"
 	"time"
 
 	host "github.com/libp2p/go-libp2p-core/host"
@@ -11,6 +12,7 @@ import (
 	blake2b "github.com/minio/blake2b-simd"
 	ma "github.com/multiformats/go-multiaddr"
 	"go.uber.org/fx"
+	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/lotus/build"
 	"github.com/filecoin-project/lotus/node/config"
@@ -28,17 +30,48 @@ func init() {
 	pubsub.GossipSubDlazy = 12
 	pubsub.GossipSubDirectConnectInitialDelay = 30 * time.Second
 }
 
 func ScoreKeeper() *dtypes.ScoreKeeper {
 	return new(dtypes.ScoreKeeper)
 }
 
-func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtypes.NetworkName, bp dtypes.BootstrapPeers, cfg *config.Pubsub, sk *dtypes.ScoreKeeper) (service *pubsub.PubSub, err error) {
+type GossipIn struct {
+	fx.In
+	Mctx helpers.MetricsCtx
+	Lc   fx.Lifecycle
+	Host host.Host
+	Nn   dtypes.NetworkName
+	Bp   dtypes.BootstrapPeers
+	Db   dtypes.DrandBootstrap
+	Cfg  *config.Pubsub
+	Sk   *dtypes.ScoreKeeper
+}
+
+func getDrandTopic() (string, error) {
+	var drandInfo = struct {
+		Hash string `json:"hash"`
+	}{}
+	err := json.Unmarshal([]byte(build.DrandChain), &drandInfo)
+	if err != nil {
+		return "", xerrors.Errorf("could not unmarshal drand chain info: %w", err)
+	}
+	return "/drand/pubsub/v0.0.0/" + drandInfo.Hash, nil
+}
+
+func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
 	bootstrappers := make(map[peer.ID]struct{})
-	for _, pi := range bp {
+	for _, pi := range in.Bp {
 		bootstrappers[pi.ID] = struct{}{}
 	}
-	isBootstrapNode := cfg.Bootstrapper
+	drandBootstrappers := make(map[peer.ID]struct{})
+	for _, pi := range in.Db {
+		drandBootstrappers[pi.ID] = struct{}{}
+	}
+
+	isBootstrapNode := in.Cfg.Bootstrapper
+
+	drandTopic, err := getDrandTopic()
+	if err != nil {
+		return nil, err
+	}
 
 	options := []pubsub.Option{
 		// GossipSub v1.1 configuration
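For reference, getDrandTopic above expects build.DrandChain to be a JSON blob carrying at least a hash field that identifies the drand chain; the value below is a placeholder, not the real testnet hash:

package example

// Placeholder chain info; only the "hash" field is consumed.
const exampleDrandChain = `{"hash": "<chain-hash>"}`

// With that input, getDrandTopic returns "/drand/pubsub/v0.0.0/<chain-hash>",
// the pubsub topic on which drand beacons are distributed.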
@@ -56,6 +89,11 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
 						return 2500
 					}
+
+					_, ok = drandBootstrappers[p]
+					if ok && !isBootstrapNode {
+						return 1500
+					}
 
 					// TODO: we want to plug the application specific score to the node itself in order
 					// to provide feedback to the pubsub system based on observed behaviour
 					return 0
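Pulling the branches together: on a regular node, chain bootstrappers receive a fixed application-specific score of 2500 and drand bootstrappers 1500, pinning both tiers above any behavioural penalty, while bootstrap nodes themselves hand out 0 so they do not preferentially mesh with each other. A condensed restatement of the closure's logic as a standalone function -- a sketch only; the bootstrappers lookup sits just above the lines shown in this hunk:

package example

import "github.com/libp2p/go-libp2p-core/peer"

// scoreTiers mirrors the AppSpecificScore logic in one place; the three
// extra inputs correspond to the closure's captured variables.
func scoreTiers(p peer.ID, isBootstrapNode bool,
	bootstrappers, drandBootstrappers map[peer.ID]struct{}) float64 {
	if isBootstrapNode {
		return 0 // bootstrap nodes score every peer neutrally
	}
	if _, ok := bootstrappers[p]; ok {
		return 2500 // chain bootstrappers: highest fixed tier
	}
	if _, ok := drandBootstrappers[p]; ok {
		return 1500 // drand bootstrappers: smaller fixed boost
	}
	return 0 // everyone else rises or falls on observed behaviour
}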
@@ -80,7 +118,48 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
 				// topic parameters
 				Topics: map[string]*pubsub.TopicScoreParams{
-					build.BlocksTopic(nn): {
+					drandTopic: {
+						// expected 2 beacons/min
+						TopicWeight: 0.5, // 5x block topic
+
+						// 1 tick per second, maxes at 1 after 1 hour
+						TimeInMeshWeight:  0.00027, // ~1/3600
+						TimeInMeshQuantum: time.Second,
+						TimeInMeshCap:     1,
+
+						// deliveries decay after 1 hour, cap at 100 blocks
+						FirstMessageDeliveriesWeight: 5, // max value is 500
+						FirstMessageDeliveriesDecay:  pubsub.ScoreParameterDecay(time.Hour),
+						FirstMessageDeliveriesCap:    100, // 100 blocks in an hour
+
+						// Mesh Delivery Failure is currently turned off for blocks
+						// This is on purpose as
+						// - the traffic is very low for meaningful distribution of incoming edges.
+						// - the reaction time needs to be very slow -- on the order of 10 min at least,
+						//   so we might as well let opportunistic grafting repair the mesh at its
+						//   own pace.
+						// - the network is too small, so large asymmetries can be expected between
+						//   mesh edges.
+						// We should revisit this once the network grows.
+						//
+						// // tracks deliveries in the last minute
+						// // penalty activates at 1 minute and expects ~0.4 blocks
+						// MeshMessageDeliveriesWeight: -576, // max penalty is -100
+						// MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute),
+						// MeshMessageDeliveriesCap: 10, // 10 blocks in a minute
+						// MeshMessageDeliveriesThreshold: 0.41666, // 10/12/2 blocks/min
+						// MeshMessageDeliveriesWindow: 10 * time.Millisecond,
+						// MeshMessageDeliveriesActivation: time.Minute,
+						//
+						// // decays after 15 min
+						// MeshFailurePenaltyWeight: -576,
+						// MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(15 * time.Minute),
+
+						// invalid messages decay after 1 hour
+						InvalidMessageDeliveriesWeight: -1000,
+						InvalidMessageDeliveriesDecay:  pubsub.ScoreParameterDecay(time.Hour),
+					},
+					build.BlocksTopic(in.Nn): {
 						// expected 10 blocks/min
 						TopicWeight: 0.1, // max is 50, max mesh penalty is -10, single invalid message is -100
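As a sanity check on the drand topic parameters: gossipsub scales a topic's counters by its TopicWeight, so the maximum positive contribution is roughly 0.5 * (5 * 100 + 0.00027 * 1) ≈ 250 points from first deliveries plus time in mesh, while a single invalid message costs 0.5 * (-1000) = -500 and takes an hour to decay. A small sketch of that arithmetic, with the constants copied from the parameters above:

package main

import "fmt"

func main() {
	const (
		topicWeight        = 0.5
		firstDeliveriesW   = 5.0
		firstDeliveriesCap = 100.0
		timeInMeshW        = 0.00027
		timeInMeshCap      = 1.0
		invalidW           = -1000.0
	)
	best := topicWeight * (firstDeliveriesW*firstDeliveriesCap + timeInMeshW*timeInMeshCap)
	oneInvalid := topicWeight * invalidW // an invalid counter of 1, squared, is still 1
	fmt.Printf("max positive ≈ %.2f, single invalid message = %.0f\n", best, oneInvalid)
}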
@@ -121,7 +200,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
 						InvalidMessageDeliveriesWeight: -1000,
 						InvalidMessageDeliveriesDecay:  pubsub.ScoreParameterDecay(time.Hour),
 					},
-					build.MessagesTopic(nn): {
+					build.MessagesTopic(in.Nn): {
 						// expected > 1 tx/second
 						TopicWeight: 0.05, // max is 25, max mesh penalty is -5, single invalid message is -100
@@ -166,7 +245,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
 				OpportunisticGraftThreshold: 5,
 			},
 		),
-		pubsub.WithPeerScoreInspect(sk.Update, 10*time.Second),
+		pubsub.WithPeerScoreInspect(in.Sk.Update, 10*time.Second),
 	}
 
 	// enable Peer eXchange on bootstrappers
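The move from a long positional parameter list to the fx.In-tagged GossipIn struct means the injector fills each exported field by type, so adding Db dtypes.DrandBootstrap is a one-line change rather than a signature break at every call site. A toy sketch of the mechanism in isolation -- stand-in types, not the lotus builder:

package main

import (
	"fmt"

	"go.uber.org/fx"
)

type PeerList []string // toy stand-in for a dependency like dtypes.DrandBootstrap

type In struct {
	fx.In
	Peers PeerList // filled from the graph because of the embedded fx.In
}

func Consume(in In) {
	fmt.Println("injected", len(in.Peers), "peers")
}

func main() {
	app := fx.New(
		fx.Provide(func() PeerList { return PeerList{"a", "b"} }),
		fx.Invoke(Consume), // runs during fx.New, printing "injected 2 peers"
	)
	if err := app.Err(); err != nil {
		panic(err)
	}
}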
@@ -184,10 +263,10 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
 	}
 
 	// direct peers
-	if cfg.DirectPeers != nil {
+	if in.Cfg.DirectPeers != nil {
 		var directPeerInfo []peer.AddrInfo
 
-		for _, addr := range cfg.DirectPeers {
+		for _, addr := range in.Cfg.DirectPeers {
 			a, err := ma.NewMultiaddr(addr)
 			if err != nil {
 				return nil, err
@@ -205,8 +284,8 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
 	}
 
 	// tracer
-	if cfg.RemoteTracer != "" {
-		a, err := ma.NewMultiaddr(cfg.RemoteTracer)
+	if in.Cfg.RemoteTracer != "" {
+		a, err := ma.NewMultiaddr(in.Cfg.RemoteTracer)
 		if err != nil {
 			return nil, err
 		}
@@ -216,7 +295,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
 			return nil, err
 		}
 
-		tr, err := pubsub.NewRemoteTracer(context.TODO(), host, *pi)
+		tr, err := pubsub.NewRemoteTracer(context.TODO(), in.Host, *pi)
 		if err != nil {
 			return nil, err
 		}
@@ -229,7 +308,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
 	// in peer scores for debugging purposes -- this might be triggered by metrics collection
 	// options = append(options, pubsub.WithPeerScoreInspect(XXX, time.Second))
 
-	return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, options...)
+	return pubsub.NewGossipSub(helpers.LifecycleCtx(in.Mctx, in.Lc), in.Host, options...)
 }
 
 func HashMsgId(m *pubsub_pb.Message) string {