Merge pull request #1904 from filecoin-project/feat/peer-scores

Add `lotus net scores` for pubsub score visibility
This commit is contained in:
Jakub Sztandera 2020-06-03 18:04:16 +02:00 committed by GitHub
commit 4c5d84d106
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 85 additions and 2 deletions

View File

@ -25,6 +25,7 @@ type Common interface {
NetAddrsListen(context.Context) (peer.AddrInfo, error)
NetDisconnect(context.Context, peer.ID) error
NetFindPeer(context.Context, peer.ID) (peer.AddrInfo, error)
NetPubsubScores(context.Context) ([]PubsubScore, error)
// ID returns peerID of libp2p node backing this API
ID(context.Context) (peer.ID, error)

View File

@ -42,6 +42,7 @@ type CommonStruct struct {
NetAddrsListen func(context.Context) (peer.AddrInfo, error) `perm:"read"`
NetDisconnect func(context.Context, peer.ID) error `perm:"write"`
NetFindPeer func(context.Context, peer.ID) (peer.AddrInfo, error) `perm:"read"`
NetPubsubScores func(context.Context) ([]api.PubsubScore, error) `perm:"read"`
ID func(context.Context) (peer.ID, error) `perm:"read"`
Version func(context.Context) (api.Version, error) `perm:"read"`
@ -256,6 +257,9 @@ func (c *CommonStruct) AuthNew(ctx context.Context, perms []auth.Permission) ([]
return c.Internal.AuthNew(ctx, perms)
}
func (c *CommonStruct) NetPubsubScores(ctx context.Context) ([]api.PubsubScore, error) {
return c.Internal.NetPubsubScores(ctx)
}
func (c *CommonStruct) NetConnectedness(ctx context.Context, pid peer.ID) (network.Connectedness, error) {
return c.Internal.NetConnectedness(ctx, pid)
}

View File

@ -3,6 +3,7 @@ package api
import (
"encoding/json"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
@ -33,3 +34,8 @@ type ObjStat struct {
Size uint64
Links uint64
}
type PubsubScore struct {
ID peer.ID
Score float64
}

View File

@ -21,6 +21,7 @@ var netCmd = &cli.Command{
netListen,
netId,
netFindPeer,
netScores,
},
}
@ -44,7 +45,30 @@ var netPeers = &cli.Command{
})
for _, peer := range peers {
fmt.Println(peer)
fmt.Printf("%s, %s\n", peer.ID, peer.Addrs)
}
return nil
},
}
var netScores = &cli.Command{
Name: "scores",
Usage: "Print peers' pubsub scores",
Action: func(cctx *cli.Context) error {
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
scores, err := api.NetPubsubScores(ctx)
if err != nil {
return err
}
for _, peer := range scores {
fmt.Printf("%s, %f\n", peer.ID, peer.Score)
}
return nil

View File

@ -180,6 +180,7 @@ func libp2p() Option {
Override(ConnectionManagerKey, lp2p.ConnectionManager(50, 200, 20*time.Second, nil)),
Override(AutoNATSvcKey, lp2p.AutoNATService),
Override(new(*dtypes.ScoreKeeper), lp2p.ScoreKeeper),
Override(new(*pubsub.PubSub), lp2p.GossipSub),
Override(new(*config.Pubsub), func(bs dtypes.Bootstrapper) *config.Pubsub {
return &config.Pubsub{

View File

@ -2,6 +2,8 @@ package common
import (
"context"
"sort"
"strings"
logging "github.com/ipfs/go-log/v2"
@ -28,6 +30,7 @@ type CommonAPI struct {
APISecret *dtypes.APIAlg
Host host.Host
Router lp2p.BaseIpfsRouting
Sk *dtypes.ScoreKeeper
}
type jwtPayload struct {
@ -54,6 +57,21 @@ func (a *CommonAPI) AuthNew(ctx context.Context, perms []auth.Permission) ([]byt
func (a *CommonAPI) NetConnectedness(ctx context.Context, pid peer.ID) (network.Connectedness, error) {
return a.Host.Network().Connectedness(pid), nil
}
func (a *CommonAPI) NetPubsubScores(context.Context) ([]api.PubsubScore, error) {
scores := a.Sk.Get()
out := make([]api.PubsubScore, len(scores))
i := 0
for k, v := range scores {
out[i] = api.PubsubScore{ID: k, Score: v}
i++
}
sort.Slice(out, func(i, j int) bool {
return strings.Compare(string(out[i].ID), string(out[j].ID)) > 0
})
return out, nil
}
func (a *CommonAPI) NetPeers(context.Context) ([]peer.AddrInfo, error) {
conns := a.Host.Network().Conns()

View File

@ -0,0 +1,24 @@
package dtypes
import (
"sync"
peer "github.com/libp2p/go-libp2p-core/peer"
)
type ScoreKeeper struct {
lk sync.Mutex
scores map[peer.ID]float64
}
func (sk *ScoreKeeper) Update(scores map[peer.ID]float64) {
sk.lk.Lock()
sk.scores = scores
sk.lk.Unlock()
}
func (sk *ScoreKeeper) Get() map[peer.ID]float64 {
sk.lk.Lock()
defer sk.lk.Unlock()
return sk.scores
}

View File

@ -28,7 +28,11 @@ func init() {
pubsub.GossipSubDirectConnectInitialDelay = 30 * time.Second
}
func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtypes.NetworkName, bp dtypes.BootstrapPeers, cfg *config.Pubsub) (service *pubsub.PubSub, err error) {
func ScoreKeeper() *dtypes.ScoreKeeper {
return new(dtypes.ScoreKeeper)
}
func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtypes.NetworkName, bp dtypes.BootstrapPeers, cfg *config.Pubsub, sk *dtypes.ScoreKeeper) (service *pubsub.PubSub, err error) {
bootstrappers := make(map[peer.ID]struct{})
for _, pi := range bp {
bootstrappers[pi.ID] = struct{}{}
@ -161,6 +165,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp
OpportunisticGraftThreshold: 5,
},
),
pubsub.WithPeerScoreInspect(sk.Update, 10*time.Second),
}
// enable Peer eXchange on bootstrappers