Integrate multiple drand networks with pubsub
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
parent
878da78ed8
commit
c5323e4e03
@ -10,6 +10,7 @@ import (
|
|||||||
|
|
||||||
"github.com/gbrlsnchs/jwt/v3"
|
"github.com/gbrlsnchs/jwt/v3"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||||
record "github.com/libp2p/go-libp2p-record"
|
record "github.com/libp2p/go-libp2p-record"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -93,14 +94,18 @@ func BuiltinBootstrap() (dtypes.BootstrapPeers, error) {
|
|||||||
return build.BuiltinBootstrap()
|
return build.BuiltinBootstrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
func DrandBootstrap(d dtypes.DrandSchedule) (dtypes.DrandBootstrap, error) {
|
func DrandBootstrap(ds dtypes.DrandSchedule) (dtypes.DrandBootstrap, error) {
|
||||||
// TODO: retry resolving, don't fail if at least one resolve succeeds
|
// TODO: retry resolving, don't fail if at least one resolve succeeds
|
||||||
addrs, err := addrutil.ParseAddresses(context.TODO(), d[0].Config.Relays)
|
res := []peer.AddrInfo{}
|
||||||
|
for _, d := range ds {
|
||||||
|
addrs, err := addrutil.ParseAddresses(context.TODO(), d.Config.Relays)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("reoslving drand relays addresses: %+v", err)
|
log.Errorf("reoslving drand relays addresses: %+v", err)
|
||||||
return nil, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
return addrs, nil
|
res = append(res, addrs...)
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetupJournal(lr repo.LockedRepo) error {
|
func SetupJournal(lr repo.LockedRepo) error {
|
||||||
|
@ -74,58 +74,8 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
isBootstrapNode := in.Cfg.Bootstrapper
|
isBootstrapNode := in.Cfg.Bootstrapper
|
||||||
drandTopic, err := getDrandTopic(in.Dr[0].Config.ChainInfoJSON)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
options := []pubsub.Option{
|
drandTopicParams := &pubsub.TopicScoreParams{
|
||||||
// Gossipsubv1.1 configuration
|
|
||||||
pubsub.WithFloodPublish(true),
|
|
||||||
pubsub.WithMessageIdFn(HashMsgId),
|
|
||||||
pubsub.WithPeerScore(
|
|
||||||
&pubsub.PeerScoreParams{
|
|
||||||
AppSpecificScore: func(p peer.ID) float64 {
|
|
||||||
// return a heavy positive score for bootstrappers so that we don't unilaterally prune
|
|
||||||
// them and accept PX from them.
|
|
||||||
// we don't do that in the bootstrappers themselves to avoid creating a closed mesh
|
|
||||||
// between them (however we might want to consider doing just that)
|
|
||||||
_, ok := bootstrappers[p]
|
|
||||||
if ok && !isBootstrapNode {
|
|
||||||
return 2500
|
|
||||||
}
|
|
||||||
|
|
||||||
_, ok = drandBootstrappers[p]
|
|
||||||
if ok && !isBootstrapNode {
|
|
||||||
return 1500
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: we want to plug the application specific score to the node itself in order
|
|
||||||
// to provide feedback to the pubsub system based on observed behaviour
|
|
||||||
return 0
|
|
||||||
},
|
|
||||||
AppSpecificWeight: 1,
|
|
||||||
|
|
||||||
// This sets the IP colocation threshold to 1 peer per
|
|
||||||
IPColocationFactorThreshold: 1,
|
|
||||||
IPColocationFactorWeight: -100,
|
|
||||||
// TODO we want to whitelist IPv6 /64s that belong to datacenters etc
|
|
||||||
// IPColocationFactorWhitelist: map[string]struct{}{},
|
|
||||||
|
|
||||||
// P7: behavioural penalties, decay after 1hr
|
|
||||||
BehaviourPenaltyThreshold: 6,
|
|
||||||
BehaviourPenaltyWeight: -10,
|
|
||||||
BehaviourPenaltyDecay: pubsub.ScoreParameterDecay(time.Hour),
|
|
||||||
|
|
||||||
DecayInterval: pubsub.DefaultDecayInterval,
|
|
||||||
DecayToZero: pubsub.DefaultDecayToZero,
|
|
||||||
|
|
||||||
// this retains non-positive scores for 6 hours
|
|
||||||
RetainScore: 6 * time.Hour,
|
|
||||||
|
|
||||||
// topic parameters
|
|
||||||
Topics: map[string]*pubsub.TopicScoreParams{
|
|
||||||
drandTopic: {
|
|
||||||
// expected 2 beaconsn/min
|
// expected 2 beaconsn/min
|
||||||
TopicWeight: 0.5, // 5x block topic; max cap is 62.5
|
TopicWeight: 0.5, // 5x block topic; max cap is 62.5
|
||||||
|
|
||||||
@ -152,7 +102,9 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
|||||||
// invalid messages decay after 1 hour
|
// invalid messages decay after 1 hour
|
||||||
InvalidMessageDeliveriesWeight: -1000,
|
InvalidMessageDeliveriesWeight: -1000,
|
||||||
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
|
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
|
||||||
},
|
}
|
||||||
|
|
||||||
|
topicParams := map[string]*pubsub.TopicScoreParams{
|
||||||
build.BlocksTopic(in.Nn): {
|
build.BlocksTopic(in.Nn): {
|
||||||
// expected 10 blocks/min
|
// expected 10 blocks/min
|
||||||
TopicWeight: 0.1, // max cap is 50, max mesh penalty is -10, single invalid message is -100
|
TopicWeight: 0.1, // max cap is 50, max mesh penalty is -10, single invalid message is -100
|
||||||
@ -228,7 +180,68 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
|||||||
InvalidMessageDeliveriesWeight: -1000,
|
InvalidMessageDeliveriesWeight: -1000,
|
||||||
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
|
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
|
||||||
},
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pgTopicWeights := map[string]float64{
|
||||||
|
build.BlocksTopic(in.Nn): 10,
|
||||||
|
build.MessagesTopic(in.Nn): 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, d := range in.Dr {
|
||||||
|
topic, err := getDrandTopic(d.Config.ChainInfoJSON)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
topicParams[topic] = drandTopicParams
|
||||||
|
pgTopicWeights[topic] = 5
|
||||||
|
}
|
||||||
|
|
||||||
|
options := []pubsub.Option{
|
||||||
|
// Gossipsubv1.1 configuration
|
||||||
|
pubsub.WithFloodPublish(true),
|
||||||
|
pubsub.WithMessageIdFn(HashMsgId),
|
||||||
|
pubsub.WithPeerScore(
|
||||||
|
&pubsub.PeerScoreParams{
|
||||||
|
AppSpecificScore: func(p peer.ID) float64 {
|
||||||
|
// return a heavy positive score for bootstrappers so that we don't unilaterally prune
|
||||||
|
// them and accept PX from them.
|
||||||
|
// we don't do that in the bootstrappers themselves to avoid creating a closed mesh
|
||||||
|
// between them (however we might want to consider doing just that)
|
||||||
|
_, ok := bootstrappers[p]
|
||||||
|
if ok && !isBootstrapNode {
|
||||||
|
return 2500
|
||||||
|
}
|
||||||
|
|
||||||
|
_, ok = drandBootstrappers[p]
|
||||||
|
if ok && !isBootstrapNode {
|
||||||
|
return 1500
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: we want to plug the application specific score to the node itself in order
|
||||||
|
// to provide feedback to the pubsub system based on observed behaviour
|
||||||
|
return 0
|
||||||
},
|
},
|
||||||
|
AppSpecificWeight: 1,
|
||||||
|
|
||||||
|
// This sets the IP colocation threshold to 1 peer per
|
||||||
|
IPColocationFactorThreshold: 1,
|
||||||
|
IPColocationFactorWeight: -100,
|
||||||
|
// TODO we want to whitelist IPv6 /64s that belong to datacenters etc
|
||||||
|
// IPColocationFactorWhitelist: map[string]struct{}{},
|
||||||
|
|
||||||
|
// P7: behavioural penalties, decay after 1hr
|
||||||
|
BehaviourPenaltyThreshold: 6,
|
||||||
|
BehaviourPenaltyWeight: -10,
|
||||||
|
BehaviourPenaltyDecay: pubsub.ScoreParameterDecay(time.Hour),
|
||||||
|
|
||||||
|
DecayInterval: pubsub.DefaultDecayInterval,
|
||||||
|
DecayToZero: pubsub.DefaultDecayToZero,
|
||||||
|
|
||||||
|
// this retains non-positive scores for 6 hours
|
||||||
|
RetainScore: 6 * time.Hour,
|
||||||
|
|
||||||
|
// topic parameters
|
||||||
|
Topics: topicParams,
|
||||||
},
|
},
|
||||||
&pubsub.PeerScoreThresholds{
|
&pubsub.PeerScoreThresholds{
|
||||||
GossipThreshold: -500,
|
GossipThreshold: -500,
|
||||||
@ -278,11 +291,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// validation queue RED
|
// validation queue RED
|
||||||
pgTopicWeights := map[string]float64{
|
|
||||||
drandTopic: 5,
|
|
||||||
build.BlocksTopic(in.Nn): 10,
|
|
||||||
build.MessagesTopic(in.Nn): 1,
|
|
||||||
}
|
|
||||||
var pgParams *pubsub.PeerGaterParams
|
var pgParams *pubsub.PeerGaterParams
|
||||||
|
|
||||||
if isBootstrapNode {
|
if isBootstrapNode {
|
||||||
|
Loading…
Reference in New Issue
Block a user