From c5323e4e032448998bc65f7a73e1269f5d1c41f4 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Thu, 10 Sep 2020 12:41:29 +0200 Subject: [PATCH] Integrate multiple drand networks with pubsub Signed-off-by: Jakub Sztandera --- node/modules/core.go | 17 ++- node/modules/lp2p/pubsub.go | 234 +++++++++++++++++++----------------- 2 files changed, 132 insertions(+), 119 deletions(-) diff --git a/node/modules/core.go b/node/modules/core.go index c0f016ab6..65b5eecb2 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -10,6 +10,7 @@ import ( "github.com/gbrlsnchs/jwt/v3" logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" record "github.com/libp2p/go-libp2p-record" "golang.org/x/xerrors" @@ -93,14 +94,18 @@ func BuiltinBootstrap() (dtypes.BootstrapPeers, error) { return build.BuiltinBootstrap() } -func DrandBootstrap(d dtypes.DrandSchedule) (dtypes.DrandBootstrap, error) { +func DrandBootstrap(ds dtypes.DrandSchedule) (dtypes.DrandBootstrap, error) { // TODO: retry resolving, don't fail if at least one resolve succeeds - addrs, err := addrutil.ParseAddresses(context.TODO(), d[0].Config.Relays) - if err != nil { - log.Errorf("reoslving drand relays addresses: %+v", err) - return nil, nil + res := []peer.AddrInfo{} + for _, d := range ds { + addrs, err := addrutil.ParseAddresses(context.TODO(), d.Config.Relays) + if err != nil { + log.Errorf("reoslving drand relays addresses: %+v", err) + return res, nil + } + res = append(res, addrs...) } - return addrs, nil + return res, nil } func SetupJournal(lr repo.LockedRepo) error { diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 41027672c..aefb06d89 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -74,9 +74,126 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { } isBootstrapNode := in.Cfg.Bootstrapper - drandTopic, err := getDrandTopic(in.Dr[0].Config.ChainInfoJSON) - if err != nil { - return nil, err + + drandTopicParams := &pubsub.TopicScoreParams{ + // expected 2 beaconsn/min + TopicWeight: 0.5, // 5x block topic; max cap is 62.5 + + // 1 tick per second, maxes at 1 after 1 hour + TimeInMeshWeight: 0.00027, // ~1/3600 + TimeInMeshQuantum: time.Second, + TimeInMeshCap: 1, + + // deliveries decay after 1 hour, cap at 25 beacons + FirstMessageDeliveriesWeight: 5, // max value is 125 + FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), + FirstMessageDeliveriesCap: 25, // the maximum expected in an hour is ~26, including the decay + + // Mesh Delivery Failure is currently turned off for beacons + // This is on purpose as + // - the traffic is very low for meaningful distribution of incoming edges. + // - the reaction time needs to be very slow -- in the order of 10 min at least + // so we might as well let opportunistic grafting repair the mesh on its own + // pace. + // - the network is too small, so large asymmetries can be expected between mesh + // edges. + // We should revisit this once the network grows. + + // invalid messages decay after 1 hour + InvalidMessageDeliveriesWeight: -1000, + InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), + } + + topicParams := map[string]*pubsub.TopicScoreParams{ + build.BlocksTopic(in.Nn): { + // expected 10 blocks/min + TopicWeight: 0.1, // max cap is 50, max mesh penalty is -10, single invalid message is -100 + + // 1 tick per second, maxes at 1 after 1 hour + TimeInMeshWeight: 0.00027, // ~1/3600 + TimeInMeshQuantum: time.Second, + TimeInMeshCap: 1, + + // deliveries decay after 1 hour, cap at 100 blocks + FirstMessageDeliveriesWeight: 5, // max value is 500 + FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), + FirstMessageDeliveriesCap: 100, // 100 blocks in an hour + + // Mesh Delivery Failure is currently turned off for blocks + // This is on purpose as + // - the traffic is very low for meaningful distribution of incoming edges. + // - the reaction time needs to be very slow -- in the order of 10 min at least + // so we might as well let opportunistic grafting repair the mesh on its own + // pace. + // - the network is too small, so large asymmetries can be expected between mesh + // edges. + // We should revisit this once the network grows. + // + // // tracks deliveries in the last minute + // // penalty activates at 1 minute and expects ~0.4 blocks + // MeshMessageDeliveriesWeight: -576, // max penalty is -100 + // MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute), + // MeshMessageDeliveriesCap: 10, // 10 blocks in a minute + // MeshMessageDeliveriesThreshold: 0.41666, // 10/12/2 blocks/min + // MeshMessageDeliveriesWindow: 10 * time.Millisecond, + // MeshMessageDeliveriesActivation: time.Minute, + // + // // decays after 15 min + // MeshFailurePenaltyWeight: -576, + // MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(15 * time.Minute), + + // invalid messages decay after 1 hour + InvalidMessageDeliveriesWeight: -1000, + InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), + }, + build.MessagesTopic(in.Nn): { + // expected > 1 tx/second + TopicWeight: 0.1, // max cap is 5, single invalid message is -100 + + // 1 tick per second, maxes at 1 hour + TimeInMeshWeight: 0.0002778, // ~1/3600 + TimeInMeshQuantum: time.Second, + TimeInMeshCap: 1, + + // deliveries decay after 10min, cap at 100 tx + FirstMessageDeliveriesWeight: 0.5, // max value is 50 + FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(10 * time.Minute), + FirstMessageDeliveriesCap: 100, // 100 messages in 10 minutes + + // Mesh Delivery Failure is currently turned off for messages + // This is on purpose as the network is still too small, which results in + // asymmetries and potential unmeshing from negative scores. + // // tracks deliveries in the last minute + // // penalty activates at 1 min and expects 2.5 txs + // MeshMessageDeliveriesWeight: -16, // max penalty is -100 + // MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute), + // MeshMessageDeliveriesCap: 100, // 100 txs in a minute + // MeshMessageDeliveriesThreshold: 2.5, // 60/12/2 txs/minute + // MeshMessageDeliveriesWindow: 10 * time.Millisecond, + // MeshMessageDeliveriesActivation: time.Minute, + + // // decays after 5min + // MeshFailurePenaltyWeight: -16, + // MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(5 * time.Minute), + + // invalid messages decay after 1 hour + InvalidMessageDeliveriesWeight: -1000, + InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), + }, + } + + pgTopicWeights := map[string]float64{ + build.BlocksTopic(in.Nn): 10, + build.MessagesTopic(in.Nn): 1, + } + + for _, d := range in.Dr { + topic, err := getDrandTopic(d.Config.ChainInfoJSON) + if err != nil { + return nil, err + } + topicParams[topic] = drandTopicParams + pgTopicWeights[topic] = 5 } options := []pubsub.Option{ @@ -124,111 +241,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { RetainScore: 6 * time.Hour, // topic parameters - Topics: map[string]*pubsub.TopicScoreParams{ - drandTopic: { - // expected 2 beaconsn/min - TopicWeight: 0.5, // 5x block topic; max cap is 62.5 - - // 1 tick per second, maxes at 1 after 1 hour - TimeInMeshWeight: 0.00027, // ~1/3600 - TimeInMeshQuantum: time.Second, - TimeInMeshCap: 1, - - // deliveries decay after 1 hour, cap at 25 beacons - FirstMessageDeliveriesWeight: 5, // max value is 125 - FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), - FirstMessageDeliveriesCap: 25, // the maximum expected in an hour is ~26, including the decay - - // Mesh Delivery Failure is currently turned off for beacons - // This is on purpose as - // - the traffic is very low for meaningful distribution of incoming edges. - // - the reaction time needs to be very slow -- in the order of 10 min at least - // so we might as well let opportunistic grafting repair the mesh on its own - // pace. - // - the network is too small, so large asymmetries can be expected between mesh - // edges. - // We should revisit this once the network grows. - - // invalid messages decay after 1 hour - InvalidMessageDeliveriesWeight: -1000, - InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), - }, - build.BlocksTopic(in.Nn): { - // expected 10 blocks/min - TopicWeight: 0.1, // max cap is 50, max mesh penalty is -10, single invalid message is -100 - - // 1 tick per second, maxes at 1 after 1 hour - TimeInMeshWeight: 0.00027, // ~1/3600 - TimeInMeshQuantum: time.Second, - TimeInMeshCap: 1, - - // deliveries decay after 1 hour, cap at 100 blocks - FirstMessageDeliveriesWeight: 5, // max value is 500 - FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), - FirstMessageDeliveriesCap: 100, // 100 blocks in an hour - - // Mesh Delivery Failure is currently turned off for blocks - // This is on purpose as - // - the traffic is very low for meaningful distribution of incoming edges. - // - the reaction time needs to be very slow -- in the order of 10 min at least - // so we might as well let opportunistic grafting repair the mesh on its own - // pace. - // - the network is too small, so large asymmetries can be expected between mesh - // edges. - // We should revisit this once the network grows. - // - // // tracks deliveries in the last minute - // // penalty activates at 1 minute and expects ~0.4 blocks - // MeshMessageDeliveriesWeight: -576, // max penalty is -100 - // MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute), - // MeshMessageDeliveriesCap: 10, // 10 blocks in a minute - // MeshMessageDeliveriesThreshold: 0.41666, // 10/12/2 blocks/min - // MeshMessageDeliveriesWindow: 10 * time.Millisecond, - // MeshMessageDeliveriesActivation: time.Minute, - // - // // decays after 15 min - // MeshFailurePenaltyWeight: -576, - // MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(15 * time.Minute), - - // invalid messages decay after 1 hour - InvalidMessageDeliveriesWeight: -1000, - InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), - }, - build.MessagesTopic(in.Nn): { - // expected > 1 tx/second - TopicWeight: 0.1, // max cap is 5, single invalid message is -100 - - // 1 tick per second, maxes at 1 hour - TimeInMeshWeight: 0.0002778, // ~1/3600 - TimeInMeshQuantum: time.Second, - TimeInMeshCap: 1, - - // deliveries decay after 10min, cap at 100 tx - FirstMessageDeliveriesWeight: 0.5, // max value is 50 - FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(10 * time.Minute), - FirstMessageDeliveriesCap: 100, // 100 messages in 10 minutes - - // Mesh Delivery Failure is currently turned off for messages - // This is on purpose as the network is still too small, which results in - // asymmetries and potential unmeshing from negative scores. - // // tracks deliveries in the last minute - // // penalty activates at 1 min and expects 2.5 txs - // MeshMessageDeliveriesWeight: -16, // max penalty is -100 - // MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute), - // MeshMessageDeliveriesCap: 100, // 100 txs in a minute - // MeshMessageDeliveriesThreshold: 2.5, // 60/12/2 txs/minute - // MeshMessageDeliveriesWindow: 10 * time.Millisecond, - // MeshMessageDeliveriesActivation: time.Minute, - - // // decays after 5min - // MeshFailurePenaltyWeight: -16, - // MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(5 * time.Minute), - - // invalid messages decay after 1 hour - InvalidMessageDeliveriesWeight: -1000, - InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), - }, - }, + Topics: topicParams, }, &pubsub.PeerScoreThresholds{ GossipThreshold: -500, @@ -278,11 +291,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { } // validation queue RED - pgTopicWeights := map[string]float64{ - drandTopic: 5, - build.BlocksTopic(in.Nn): 10, - build.MessagesTopic(in.Nn): 1, - } var pgParams *pubsub.PeerGaterParams if isBootstrapNode {