lotus/node/modules/lp2p/pubsub.go

261 lines
8.9 KiB
Go

package lp2p
import (
"context"
"time"
host "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
blake2b "github.com/minio/blake2b-simd"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
)
func init() {
// configure larger overlay parameters
pubsub.GossipSubD = 8
pubsub.GossipSubDscore = 6
pubsub.GossipSubDlo = 6
pubsub.GossipSubDhi = 12
pubsub.GossipSubDlazy = 12
pubsub.GossipSubDirectConnectInitialDelay = 30 * time.Second
}
func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtypes.NetworkName, bp dtypes.BootstrapPeers, cfg *config.Pubsub) (service *pubsub.PubSub, err error) {
bootstrappers := make(map[peer.ID]struct{})
for _, pi := range bp {
bootstrappers[pi.ID] = struct{}{}
}
isBootstrapNode := cfg.Bootstrapper
options := []pubsub.Option{
// Gossipsubv1.1 configuration
pubsub.WithFloodPublish(true),
pubsub.WithPeerScore(
&pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
// return a heavy positive score for bootstrappers so that we don't unilaterally prune
// them and accept PX from them.
// we don't do that in the bootstrappers themselves to avoid creating a closed mesh
// between them (however we might want to consider doing just that)
_, ok := bootstrappers[p]
if ok && !isBootstrapNode {
return 2500
}
// TODO: we want to plug the application specific score to the node itself in order
// to provide feedback to the pubsub system based on observed behaviour
return 0
},
AppSpecificWeight: 1,
// This sets the IP colocation threshold to 1 peer per
IPColocationFactorThreshold: 1,
IPColocationFactorWeight: -100,
// TODO we want to whitelist IPv6 /64s that belong to datacenters etc
// IPColocationFactorWhitelist: map[string]struct{}{},
// P7: behavioural penalties, decay after 1hr
BehaviourPenaltyWeight: -10,
BehaviourPenaltyDecay: pubsub.ScoreParameterDecay(time.Hour),
DecayInterval: pubsub.DefaultDecayInterval,
DecayToZero: pubsub.DefaultDecayToZero,
// this retains non-positive scores for 6 hours
RetainScore: 6 * time.Hour,
// topic parameters
Topics: map[string]*pubsub.TopicScoreParams{
build.BlocksTopic(nn): {
// expected 10 blocks/min
TopicWeight: 0.1, // max is 50, max mesh penalty is -10, single invalid message is -100
// 1 tick per second, maxes at 1 after 1 hour
TimeInMeshWeight: 0.00027, // ~1/3600
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 1,
// deliveries decay after 1 hour, cap at 100 blocks
FirstMessageDeliveriesWeight: 5, // max value is 500
FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
FirstMessageDeliveriesCap: 100, // 100 blocks in an hour
// Mesh Delivery Failure is currently turned off for blocks
// This is on purpose as
// - the traffic is very low for meaningful distribution of incoming edges.
// - the reaction time needs to be very slow -- in the order of 10 min at least
// so we might as well let opportunistic grafting repair the mesh on its own
// pace.
// - the network is too small, so large asymmetries can be expected between mesh
// edges.
// We should revisit this once the network grows.
//
// // tracks deliveries in the last minute
// // penalty activates at 1 minute and expects ~0.4 blocks
// MeshMessageDeliveriesWeight: -576, // max penalty is -100
// MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute),
// MeshMessageDeliveriesCap: 10, // 10 blocks in a minute
// MeshMessageDeliveriesThreshold: 0.41666, // 10/12/2 blocks/min
// MeshMessageDeliveriesWindow: 10 * time.Millisecond,
// MeshMessageDeliveriesActivation: time.Minute,
//
// // decays after 15 min
// MeshFailurePenaltyWeight: -576,
// MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(15 * time.Minute),
// invalid messages decay after 1 hour
InvalidMessageDeliveriesWeight: -1000,
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
},
build.MessagesTopic(nn): {
// expected > 1 tx/second
TopicWeight: 0.05, // max is 25, max mesh penalty is -5, single invalid message is -100
// 1 tick per second, maxes at 1 hour
TimeInMeshWeight: 0.0002778, // ~1/3600
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 1,
// deliveries decay after 10min, cap at 1000 tx
FirstMessageDeliveriesWeight: 0.5, // max value is 500
FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(10 * time.Minute),
FirstMessageDeliveriesCap: 1000,
// Mesh Delivery Failure is currently turned off for messages
// This is on purpose as the network is still too small, which results in
// asymmetries and potential unmeshing from negative scores.
// // tracks deliveries in the last minute
// // penalty activates at 1 min and expects 2.5 txs
// MeshMessageDeliveriesWeight: -16, // max penalty is -100
// MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute),
// MeshMessageDeliveriesCap: 100, // 100 txs in a minute
// MeshMessageDeliveriesThreshold: 2.5, // 60/12/2 txs/minute
// MeshMessageDeliveriesWindow: 10 * time.Millisecond,
// MeshMessageDeliveriesActivation: time.Minute,
// // decays after 5min
// MeshFailurePenaltyWeight: -16,
// MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(5 * time.Minute),
// invalid messages decay after 1 hour
InvalidMessageDeliveriesWeight: -2000,
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
},
},
},
&pubsub.PeerScoreThresholds{
GossipThreshold: -500,
PublishThreshold: -1000,
GraylistThreshold: -2500,
AcceptPXThreshold: 1000,
OpportunisticGraftThreshold: 5,
},
),
}
// enable Peer eXchange on bootstrappers
if isBootstrapNode {
// turn off the mesh in bootstrappers -- only do gossip and PX
pubsub.GossipSubD = 0
pubsub.GossipSubDscore = 0
pubsub.GossipSubDlo = 0
pubsub.GossipSubDhi = 0
pubsub.GossipSubDout = 0
pubsub.GossipSubDlazy = 1024
pubsub.GossipSubGossipFactor = 0.5
// turn on PX
options = append(options, pubsub.WithPeerExchange(true))
}
// direct peers
if cfg.DirectPeers != nil {
var directPeerInfo []peer.AddrInfo
for _, addr := range cfg.DirectPeers {
a, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
pi, err := peer.AddrInfoFromP2pAddr(a)
if err != nil {
return nil, err
}
directPeerInfo = append(directPeerInfo, *pi)
}
options = append(options, pubsub.WithDirectPeers(directPeerInfo))
}
// tracer
if cfg.RemoteTracer != "" {
a, err := ma.NewMultiaddr(cfg.RemoteTracer)
if err != nil {
return nil, err
}
pi, err := peer.AddrInfoFromP2pAddr(a)
if err != nil {
return nil, err
}
tr, err := pubsub.NewRemoteTracer(context.TODO(), host, *pi)
if err != nil {
return nil, err
}
trw := newTracerWrapper(tr)
options = append(options, pubsub.WithEventTracer(trw))
}
// TODO: we want to hook the peer score inspector so that we can gain visibility
// in peer scores for debugging purposes -- this might be trigged by metrics collection
// options = append(options, pubsub.WithPeerScoreInspect(XXX, time.Second))
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, options...)
}
func HashMsgId(m *pubsub_pb.Message) string {
hash := blake2b.Sum256(m.Data)
return string(hash[:])
}
func newTracerWrapper(tr pubsub.EventTracer) pubsub.EventTracer {
return &tracerWrapper{tr: tr}
}
type tracerWrapper struct {
tr pubsub.EventTracer
}
func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
// this filters the trace events reported to the remote tracer to include only
// JOIN/LEAVE/GRAFT/PRUNE/PUBLISH/DELIVER. This significantly reduces bandwidth usage and still
// collects enough data to recover the state of the mesh and compute message delivery latency
// distributions.
// TODO: hook all events into local metrics for inspection through the dashboard
switch evt.GetType() {
case pubsub_pb.TraceEvent_PUBLISH_MESSAGE:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_JOIN:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_LEAVE:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_GRAFT:
trw.tr.Trace(evt)
case pubsub_pb.TraceEvent_PRUNE:
trw.tr.Trace(evt)
}
}