diff --git a/api/api_common.go b/api/api_common.go index a49397247..7f0514fdf 100644 --- a/api/api_common.go +++ b/api/api_common.go @@ -25,6 +25,7 @@ type Common interface { NetAddrsListen(context.Context) (peer.AddrInfo, error) NetDisconnect(context.Context, peer.ID) error NetFindPeer(context.Context, peer.ID) (peer.AddrInfo, error) + NetPubsubScores(context.Context) ([]PubsubScore, error) // ID returns peerID of libp2p node backing this API ID(context.Context) (peer.ID, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 1f2191ae8..8532cad11 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -42,6 +42,7 @@ type CommonStruct struct { NetAddrsListen func(context.Context) (peer.AddrInfo, error) `perm:"read"` NetDisconnect func(context.Context, peer.ID) error `perm:"write"` NetFindPeer func(context.Context, peer.ID) (peer.AddrInfo, error) `perm:"read"` + NetPubsubScores func(context.Context) ([]api.PubsubScore, error) `perm:"read"` ID func(context.Context) (peer.ID, error) `perm:"read"` Version func(context.Context) (api.Version, error) `perm:"read"` @@ -256,6 +257,9 @@ func (c *CommonStruct) AuthNew(ctx context.Context, perms []auth.Permission) ([] return c.Internal.AuthNew(ctx, perms) } +func (c *CommonStruct) NetPubsubScores(ctx context.Context) ([]api.PubsubScore, error) { + return c.Internal.NetPubsubScores(ctx) +} func (c *CommonStruct) NetConnectedness(ctx context.Context, pid peer.ID) (network.Connectedness, error) { return c.Internal.NetConnectedness(ctx, pid) } diff --git a/api/types.go b/api/types.go index 26d0695e1..fcc013441 100644 --- a/api/types.go +++ b/api/types.go @@ -3,6 +3,7 @@ package api import ( "encoding/json" + "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" ) @@ -33,3 +34,8 @@ type ObjStat struct { Size uint64 Links uint64 } + +type PubsubScore struct { + ID peer.ID + Score float64 +} diff --git a/cli/net.go b/cli/net.go index b49d9604f..366162c3f 100644 --- a/cli/net.go +++ b/cli/net.go @@ -21,6 +21,7 @@ var netCmd = &cli.Command{ netListen, netId, netFindPeer, + netScores, }, } @@ -44,7 +45,30 @@ var netPeers = &cli.Command{ }) for _, peer := range peers { - fmt.Println(peer) + fmt.Printf("%s, %s\n", peer.ID, peer.Addrs) + } + + return nil + }, +} + +var netScores = &cli.Command{ + Name: "scores", + Usage: "Print peers' pubsub scores", + Action: func(cctx *cli.Context) error { + api, closer, err := GetAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + scores, err := api.NetPubsubScores(ctx) + if err != nil { + return err + } + + for _, peer := range scores { + fmt.Printf("%s, %f\n", peer.ID, peer.Score) } return nil diff --git a/node/builder.go b/node/builder.go index 3b8e0d62f..292cf39e6 100644 --- a/node/builder.go +++ b/node/builder.go @@ -180,6 +180,7 @@ func libp2p() Option { Override(ConnectionManagerKey, lp2p.ConnectionManager(50, 200, 20*time.Second, nil)), Override(AutoNATSvcKey, lp2p.AutoNATService), + Override(new(*dtypes.ScoreKeeper), lp2p.ScoreKeeper), Override(new(*pubsub.PubSub), lp2p.GossipSub), Override(new(*config.Pubsub), func(bs dtypes.Bootstrapper) *config.Pubsub { return &config.Pubsub{ diff --git a/node/impl/common/common.go b/node/impl/common/common.go index 505466a8e..6de137d9f 100644 --- a/node/impl/common/common.go +++ b/node/impl/common/common.go @@ -2,6 +2,8 @@ package common import ( "context" + "sort" + "strings" logging "github.com/ipfs/go-log/v2" @@ -28,6 +30,7 @@ type CommonAPI struct { APISecret *dtypes.APIAlg Host host.Host Router lp2p.BaseIpfsRouting + Sk *dtypes.ScoreKeeper } type jwtPayload struct { @@ -54,6 +57,21 @@ func (a *CommonAPI) AuthNew(ctx context.Context, perms []auth.Permission) ([]byt func (a *CommonAPI) NetConnectedness(ctx context.Context, pid peer.ID) (network.Connectedness, error) { return a.Host.Network().Connectedness(pid), nil } +func (a *CommonAPI) NetPubsubScores(context.Context) ([]api.PubsubScore, error) { + scores := a.Sk.Get() + out := make([]api.PubsubScore, len(scores)) + i := 0 + for k, v := range scores { + out[i] = api.PubsubScore{k, v} + i++ + } + + sort.Slice(out, func(i, j int) bool { + return strings.Compare(string(out[i].ID), string(out[j].ID)) > 0 + }) + + return out, nil +} func (a *CommonAPI) NetPeers(context.Context) ([]peer.AddrInfo, error) { conns := a.Host.Network().Conns() diff --git a/node/modules/dtypes/scorekeeper.go b/node/modules/dtypes/scorekeeper.go new file mode 100644 index 000000000..221821bf8 --- /dev/null +++ b/node/modules/dtypes/scorekeeper.go @@ -0,0 +1,24 @@ +package dtypes + +import ( + "sync" + + peer "github.com/libp2p/go-libp2p-peer" +) + +type ScoreKeeper struct { + lk sync.Mutex + scores map[peer.ID]float64 +} + +func (sk *ScoreKeeper) Update(scores map[peer.ID]float64) { + sk.lk.Lock() + sk.scores = scores + sk.lk.Unlock() +} + +func (sk *ScoreKeeper) Get() map[peer.ID]float64 { + sk.lk.Lock() + defer sk.lk.Unlock() + return sk.scores +} diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 35285f5c1..739bbd115 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -28,7 +28,11 @@ func init() { pubsub.GossipSubDirectConnectInitialDelay = 30 * time.Second } -func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtypes.NetworkName, bp dtypes.BootstrapPeers, cfg *config.Pubsub) (service *pubsub.PubSub, err error) { +func ScoreKeeper() *dtypes.ScoreKeeper { + return new(dtypes.ScoreKeeper) +} + +func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtypes.NetworkName, bp dtypes.BootstrapPeers, cfg *config.Pubsub, sk *dtypes.ScoreKeeper) (service *pubsub.PubSub, err error) { bootstrappers := make(map[peer.ID]struct{}) for _, pi := range bp { bootstrappers[pi.ID] = struct{}{} @@ -161,6 +165,7 @@ func GossipSub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, nn dtyp OpportunisticGraftThreshold: 5, }, ), + pubsub.WithPeerScoreInspect(sk.Update, 10*time.Second), } // enable Peer eXchange on bootstrappers