Integrate multiple drand networks with pubsub

Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Jakub Sztandera 2020-09-10 12:41:29 +02:00 committed by Aayush Rajasekaran
parent 564d0ae974
commit c71e1adc93
2 changed files with 132 additions and 119 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/gbrlsnchs/jwt/v3" "github.com/gbrlsnchs/jwt/v3"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/peerstore"
record "github.com/libp2p/go-libp2p-record" record "github.com/libp2p/go-libp2p-record"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -93,14 +94,18 @@ func BuiltinBootstrap() (dtypes.BootstrapPeers, error) {
return build.BuiltinBootstrap() return build.BuiltinBootstrap()
} }
func DrandBootstrap(d dtypes.DrandSchedule) (dtypes.DrandBootstrap, error) { func DrandBootstrap(ds dtypes.DrandSchedule) (dtypes.DrandBootstrap, error) {
// TODO: retry resolving, don't fail if at least one resolve succeeds // TODO: retry resolving, don't fail if at least one resolve succeeds
addrs, err := addrutil.ParseAddresses(context.TODO(), d[0].Config.Relays) res := []peer.AddrInfo{}
for _, d := range ds {
addrs, err := addrutil.ParseAddresses(context.TODO(), d.Config.Relays)
if err != nil { if err != nil {
log.Errorf("reoslving drand relays addresses: %+v", err) log.Errorf("reoslving drand relays addresses: %+v", err)
return nil, nil return res, nil
} }
return addrs, nil res = append(res, addrs...)
}
return res, nil
} }
func SetupJournal(lr repo.LockedRepo) error { func SetupJournal(lr repo.LockedRepo) error {

View File

@ -74,58 +74,8 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
} }
isBootstrapNode := in.Cfg.Bootstrapper isBootstrapNode := in.Cfg.Bootstrapper
drandTopic, err := getDrandTopic(in.Dr[0].Config.ChainInfoJSON)
if err != nil {
return nil, err
}
options := []pubsub.Option{ drandTopicParams := &pubsub.TopicScoreParams{
// Gossipsubv1.1 configuration
pubsub.WithFloodPublish(true),
pubsub.WithMessageIdFn(HashMsgId),
pubsub.WithPeerScore(
&pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
// return a heavy positive score for bootstrappers so that we don't unilaterally prune
// them and accept PX from them.
// we don't do that in the bootstrappers themselves to avoid creating a closed mesh
// between them (however we might want to consider doing just that)
_, ok := bootstrappers[p]
if ok && !isBootstrapNode {
return 2500
}
_, ok = drandBootstrappers[p]
if ok && !isBootstrapNode {
return 1500
}
// TODO: we want to plug the application specific score to the node itself in order
// to provide feedback to the pubsub system based on observed behaviour
return 0
},
AppSpecificWeight: 1,
// This sets the IP colocation threshold to 1 peer per
IPColocationFactorThreshold: 1,
IPColocationFactorWeight: -100,
// TODO we want to whitelist IPv6 /64s that belong to datacenters etc
// IPColocationFactorWhitelist: map[string]struct{}{},
// P7: behavioural penalties, decay after 1hr
BehaviourPenaltyThreshold: 6,
BehaviourPenaltyWeight: -10,
BehaviourPenaltyDecay: pubsub.ScoreParameterDecay(time.Hour),
DecayInterval: pubsub.DefaultDecayInterval,
DecayToZero: pubsub.DefaultDecayToZero,
// this retains non-positive scores for 6 hours
RetainScore: 6 * time.Hour,
// topic parameters
Topics: map[string]*pubsub.TopicScoreParams{
drandTopic: {
// expected 2 beaconsn/min // expected 2 beaconsn/min
TopicWeight: 0.5, // 5x block topic; max cap is 62.5 TopicWeight: 0.5, // 5x block topic; max cap is 62.5
@ -152,7 +102,9 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
// invalid messages decay after 1 hour // invalid messages decay after 1 hour
InvalidMessageDeliveriesWeight: -1000, InvalidMessageDeliveriesWeight: -1000,
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
}, }
topicParams := map[string]*pubsub.TopicScoreParams{
build.BlocksTopic(in.Nn): { build.BlocksTopic(in.Nn): {
// expected 10 blocks/min // expected 10 blocks/min
TopicWeight: 0.1, // max cap is 50, max mesh penalty is -10, single invalid message is -100 TopicWeight: 0.1, // max cap is 50, max mesh penalty is -10, single invalid message is -100
@ -228,7 +180,68 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
InvalidMessageDeliveriesWeight: -1000, InvalidMessageDeliveriesWeight: -1000,
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
}, },
}
pgTopicWeights := map[string]float64{
build.BlocksTopic(in.Nn): 10,
build.MessagesTopic(in.Nn): 1,
}
for _, d := range in.Dr {
topic, err := getDrandTopic(d.Config.ChainInfoJSON)
if err != nil {
return nil, err
}
topicParams[topic] = drandTopicParams
pgTopicWeights[topic] = 5
}
options := []pubsub.Option{
// Gossipsubv1.1 configuration
pubsub.WithFloodPublish(true),
pubsub.WithMessageIdFn(HashMsgId),
pubsub.WithPeerScore(
&pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
// return a heavy positive score for bootstrappers so that we don't unilaterally prune
// them and accept PX from them.
// we don't do that in the bootstrappers themselves to avoid creating a closed mesh
// between them (however we might want to consider doing just that)
_, ok := bootstrappers[p]
if ok && !isBootstrapNode {
return 2500
}
_, ok = drandBootstrappers[p]
if ok && !isBootstrapNode {
return 1500
}
// TODO: we want to plug the application specific score to the node itself in order
// to provide feedback to the pubsub system based on observed behaviour
return 0
}, },
AppSpecificWeight: 1,
// This sets the IP colocation threshold to 1 peer per
IPColocationFactorThreshold: 1,
IPColocationFactorWeight: -100,
// TODO we want to whitelist IPv6 /64s that belong to datacenters etc
// IPColocationFactorWhitelist: map[string]struct{}{},
// P7: behavioural penalties, decay after 1hr
BehaviourPenaltyThreshold: 6,
BehaviourPenaltyWeight: -10,
BehaviourPenaltyDecay: pubsub.ScoreParameterDecay(time.Hour),
DecayInterval: pubsub.DefaultDecayInterval,
DecayToZero: pubsub.DefaultDecayToZero,
// this retains non-positive scores for 6 hours
RetainScore: 6 * time.Hour,
// topic parameters
Topics: topicParams,
}, },
&pubsub.PeerScoreThresholds{ &pubsub.PeerScoreThresholds{
GossipThreshold: -500, GossipThreshold: -500,
@ -278,11 +291,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
} }
// validation queue RED // validation queue RED
pgTopicWeights := map[string]float64{
drandTopic: 5,
build.BlocksTopic(in.Nn): 10,
build.MessagesTopic(in.Nn): 1,
}
var pgParams *pubsub.PeerGaterParams var pgParams *pubsub.PeerGaterParams
if isBootstrapNode { if isBootstrapNode {