Merge pull request #4849 from filecoin-project/feat/conngater

Connection Gater support
This commit is contained in:
Łukasz Magiera 2020-11-20 16:25:02 +01:00 committed by GitHub
commit 94763c2aaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 497 additions and 0 deletions

View File

@ -46,6 +46,11 @@ type Common interface {
// usage and current rate per protocol
NetBandwidthStatsByProtocol(ctx context.Context) (map[protocol.ID]metrics.Stats, error)
// ConnectionGater API
NetBlockAdd(ctx context.Context, acl NetBlockList) error
NetBlockRemove(ctx context.Context, acl NetBlockList) error
NetBlockList(ctx context.Context) (NetBlockList, error)
// MethodGroup: Common
// ID returns peerID of libp2p node backing this API

View File

@ -60,6 +60,9 @@ type CommonStruct struct {
NetBandwidthStatsByPeer func(ctx context.Context) (map[string]metrics.Stats, error) `perm:"read"`
NetBandwidthStatsByProtocol func(ctx context.Context) (map[protocol.ID]metrics.Stats, error) `perm:"read"`
NetAgentVersion func(ctx context.Context, p peer.ID) (string, error) `perm:"read"`
NetBlockAdd func(ctx context.Context, acl api.NetBlockList) error `perm:"admin"`
NetBlockRemove func(ctx context.Context, acl api.NetBlockList) error `perm:"admin"`
NetBlockList func(ctx context.Context) (api.NetBlockList, error) `perm:"read"`
ID func(context.Context) (peer.ID, error) `perm:"read"`
Version func(context.Context) (api.Version, error) `perm:"read"`
@ -495,6 +498,18 @@ func (c *CommonStruct) NetBandwidthStatsByProtocol(ctx context.Context) (map[pro
return c.Internal.NetBandwidthStatsByProtocol(ctx)
}
func (c *CommonStruct) NetBlockAdd(ctx context.Context, acl api.NetBlockList) error {
return c.Internal.NetBlockAdd(ctx, acl)
}
func (c *CommonStruct) NetBlockRemove(ctx context.Context, acl api.NetBlockList) error {
return c.Internal.NetBlockRemove(ctx, acl)
}
func (c *CommonStruct) NetBlockList(ctx context.Context) (api.NetBlockList, error) {
return c.Internal.NetBlockList(ctx)
}
func (c *CommonStruct) NetAgentVersion(ctx context.Context, p peer.ID) (string, error) {
return c.Internal.NetAgentVersion(ctx, p)
}

View File

@ -107,3 +107,9 @@ func NewDataTransferChannel(hostID peer.ID, channelState datatransfer.ChannelSta
}
return channel
}
type NetBlockList struct {
Peers []peer.ID
IPAddrs []string
IPSubnets []string
}

View File

@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/go-address"
atypes "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/addrutil"
)
@ -34,6 +35,7 @@ var netCmd = &cli.Command{
netScores,
NetReachability,
NetBandwidthCmd,
NetBlockCmd,
},
}
@ -375,3 +377,202 @@ var NetBandwidthCmd = &cli.Command{
},
}
var NetBlockCmd = &cli.Command{
Name: "block",
Usage: "Manage network connection gating rules",
Subcommands: []*cli.Command{
NetBlockAddCmd,
NetBlockRemoveCmd,
NetBlockListCmd,
},
}
var NetBlockAddCmd = &cli.Command{
Name: "add",
Usage: "Add connection gating rules",
Subcommands: []*cli.Command{
NetBlockAddPeer,
NetBlockAddIP,
NetBlockAddSubnet,
},
}
var NetBlockAddPeer = &cli.Command{
Name: "peer",
Usage: "Block a peer",
ArgsUsage: "<Peer> ...",
Action: func(cctx *cli.Context) error {
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
var peers []peer.ID
for _, s := range cctx.Args().Slice() {
p, err := peer.Decode(s)
if err != nil {
return err
}
peers = append(peers, p)
}
return api.NetBlockAdd(ctx, atypes.NetBlockList{Peers: peers})
},
}
var NetBlockAddIP = &cli.Command{
Name: "ip",
Usage: "Block an IP address",
ArgsUsage: "<IP> ...",
Action: func(cctx *cli.Context) error {
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
return api.NetBlockAdd(ctx, atypes.NetBlockList{IPAddrs: cctx.Args().Slice()})
},
}
var NetBlockAddSubnet = &cli.Command{
Name: "subnet",
Usage: "Block an IP subnet",
ArgsUsage: "<CIDR> ...",
Action: func(cctx *cli.Context) error {
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
return api.NetBlockAdd(ctx, atypes.NetBlockList{IPSubnets: cctx.Args().Slice()})
},
}
var NetBlockRemoveCmd = &cli.Command{
Name: "remove",
Usage: "Remove connection gating rules",
Subcommands: []*cli.Command{
NetBlockRemovePeer,
NetBlockRemoveIP,
NetBlockRemoveSubnet,
},
}
var NetBlockRemovePeer = &cli.Command{
Name: "peer",
Usage: "Unblock a peer",
ArgsUsage: "<Peer> ...",
Action: func(cctx *cli.Context) error {
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
var peers []peer.ID
for _, s := range cctx.Args().Slice() {
p, err := peer.Decode(s)
if err != nil {
return err
}
peers = append(peers, p)
}
return api.NetBlockRemove(ctx, atypes.NetBlockList{Peers: peers})
},
}
var NetBlockRemoveIP = &cli.Command{
Name: "ip",
Usage: "Unblock an IP address",
ArgsUsage: "<IP> ...",
Action: func(cctx *cli.Context) error {
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
return api.NetBlockRemove(ctx, atypes.NetBlockList{IPAddrs: cctx.Args().Slice()})
},
}
var NetBlockRemoveSubnet = &cli.Command{
Name: "subnet",
Usage: "Unblock an IP subnet",
ArgsUsage: "<CIDR> ...",
Action: func(cctx *cli.Context) error {
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
return api.NetBlockRemove(ctx, atypes.NetBlockList{IPSubnets: cctx.Args().Slice()})
},
}
var NetBlockListCmd = &cli.Command{
Name: "list",
Usage: "list connection gating rules",
Action: func(cctx *cli.Context) error {
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
acl, err := api.NetBlockList(ctx)
if err != nil {
return err
}
if len(acl.Peers) != 0 {
sort.Slice(acl.Peers, func(i, j int) bool {
return strings.Compare(string(acl.Peers[i]), string(acl.Peers[j])) > 0
})
fmt.Println("Blocked Peers:")
for _, p := range acl.Peers {
fmt.Printf("\t%s\n", p)
}
}
if len(acl.IPAddrs) != 0 {
sort.Slice(acl.IPAddrs, func(i, j int) bool {
return strings.Compare(acl.IPAddrs[i], acl.IPAddrs[j]) < 0
})
fmt.Println("Blocked IPs:")
for _, a := range acl.IPAddrs {
fmt.Printf("\t%s\n", a)
}
}
if len(acl.IPSubnets) != 0 {
sort.Slice(acl.IPSubnets, func(i, j int) bool {
return strings.Compare(acl.IPSubnets[i], acl.IPSubnets[j]) < 0
})
fmt.Println("Blocked Subnets:")
for _, n := range acl.IPSubnets {
fmt.Printf("\t%s\n", n)
}
}
return nil
},
}

View File

@ -53,6 +53,9 @@
* [NetBandwidthStats](#NetBandwidthStats)
* [NetBandwidthStatsByPeer](#NetBandwidthStatsByPeer)
* [NetBandwidthStatsByProtocol](#NetBandwidthStatsByProtocol)
* [NetBlockAdd](#NetBlockAdd)
* [NetBlockList](#NetBlockList)
* [NetBlockRemove](#NetBlockRemove)
* [NetConnect](#NetConnect)
* [NetConnectedness](#NetConnectedness)
* [NetDisconnect](#NetDisconnect)
@ -798,6 +801,58 @@ Response:
}
```
### NetBlockAdd
Perms: admin
Inputs:
```json
[
{
"Peers": null,
"IPAddrs": null,
"IPSubnets": null
}
]
```
Response: `{}`
### NetBlockList
Perms: read
Inputs: `null`
Response:
```json
{
"Peers": null,
"IPAddrs": null,
"IPSubnets": null
}
```
### NetBlockRemove
Perms: admin
Inputs:
```json
[
{
"Peers": null,
"IPAddrs": null,
"IPSubnets": null
}
]
```
Response: `{}`
### NetConnect

View File

@ -110,6 +110,9 @@
* [NetBandwidthStats](#NetBandwidthStats)
* [NetBandwidthStatsByPeer](#NetBandwidthStatsByPeer)
* [NetBandwidthStatsByProtocol](#NetBandwidthStatsByProtocol)
* [NetBlockAdd](#NetBlockAdd)
* [NetBlockList](#NetBlockList)
* [NetBlockRemove](#NetBlockRemove)
* [NetConnect](#NetConnect)
* [NetConnectedness](#NetConnectedness)
* [NetDisconnect](#NetDisconnect)
@ -2639,6 +2642,58 @@ Response:
}
```
### NetBlockAdd
Perms: admin
Inputs:
```json
[
{
"Peers": null,
"IPAddrs": null,
"IPSubnets": null
}
]
```
Response: `{}`
### NetBlockList
Perms: read
Inputs: `null`
Response:
```json
{
"Peers": null,
"IPAddrs": null,
"IPSubnets": null
}
```
### NetBlockRemove
Perms: admin
Inputs:
```json
[
{
"Peers": null,
"IPAddrs": null,
"IPSubnets": null
}
]
```
Response: `{}`
### NetConnect

View File

@ -26,6 +26,7 @@ import (
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
pubsub "github.com/libp2p/go-libp2p-pubsub"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
"golang.org/x/xerrors"
@ -98,6 +99,7 @@ var (
ConnectionManagerKey = special{9} // Libp2p option
AutoNATSvcKey = special{10} // Libp2p option
BandwidthReporterKey = special{11} // Libp2p option
ConnGaterKey = special{12} // libp2p option
)
type invoke int
@ -220,6 +222,9 @@ func libp2p() Option {
Override(PstoreAddSelfKeysKey, lp2p.PstoreAddSelfKeys),
Override(StartListeningKey, lp2p.StartListening(config.DefaultFullNode().Libp2p.ListenAddresses)),
Override(new(*conngater.BasicConnectionGater), lp2p.ConnGater),
Override(ConnGaterKey, lp2p.ConnGaterOption),
)
}

View File

@ -15,6 +15,7 @@ import (
protocol "github.com/libp2p/go-libp2p-core/protocol"
swarm "github.com/libp2p/go-libp2p-swarm"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
"golang.org/x/xerrors"
@ -36,6 +37,7 @@ type CommonAPI struct {
RawHost lp2p.RawHost
Host host.Host
Router lp2p.BaseIpfsRouting
ConnGater *conngater.BasicConnectionGater
Reporter metrics.Reporter
Sk *dtypes.ScoreKeeper
ShutdownChan dtypes.ShutdownChan

View File

@ -0,0 +1,136 @@
package common
import (
"context"
"net"
"golang.org/x/xerrors"
logging "github.com/ipfs/go-log/v2"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/filecoin-project/lotus/api"
)
var cLog = logging.Logger("conngater")
func (a *CommonAPI) NetBlockAdd(ctx context.Context, acl api.NetBlockList) error {
for _, p := range acl.Peers {
err := a.ConnGater.BlockPeer(p)
if err != nil {
return xerrors.Errorf("error blocking peer %s: %w", p, err)
}
for _, c := range a.Host.Network().ConnsToPeer(p) {
err = c.Close()
if err != nil {
// just log this, don't fail
cLog.Warnf("error closing connection to %s: %s", p, err)
}
}
}
for _, addr := range acl.IPAddrs {
ip := net.ParseIP(addr)
if ip == nil {
return xerrors.Errorf("error parsing IP address %s", addr)
}
err := a.ConnGater.BlockAddr(ip)
if err != nil {
return xerrors.Errorf("error blocking IP address %s: %w", addr, err)
}
for _, c := range a.Host.Network().Conns() {
remote := c.RemoteMultiaddr()
remoteIP, err := manet.ToIP(remote)
if err != nil {
continue
}
if ip.Equal(remoteIP) {
err = c.Close()
if err != nil {
// just log this, don't fail
cLog.Warnf("error closing connection to %s: %s", remoteIP, err)
}
}
}
}
for _, subnet := range acl.IPSubnets {
_, cidr, err := net.ParseCIDR(subnet)
if err != nil {
return xerrors.Errorf("error parsing subnet %s: %w", subnet, err)
}
err = a.ConnGater.BlockSubnet(cidr)
if err != nil {
return xerrors.Errorf("error blocking subunet %s: %w", subnet, err)
}
for _, c := range a.Host.Network().Conns() {
remote := c.RemoteMultiaddr()
remoteIP, err := manet.ToIP(remote)
if err != nil {
continue
}
if cidr.Contains(remoteIP) {
err = c.Close()
if err != nil {
// just log this, don't fail
cLog.Warnf("error closing connection to %s: %s", remoteIP, err)
}
}
}
}
return nil
}
func (a *CommonAPI) NetBlockRemove(ctx context.Context, acl api.NetBlockList) error {
for _, p := range acl.Peers {
err := a.ConnGater.UnblockPeer(p)
if err != nil {
return xerrors.Errorf("error unblocking peer %s: %w", p, err)
}
}
for _, addr := range acl.IPAddrs {
ip := net.ParseIP(addr)
if ip == nil {
return xerrors.Errorf("error parsing IP address %s", addr)
}
err := a.ConnGater.UnblockAddr(ip)
if err != nil {
return xerrors.Errorf("error unblocking IP address %s: %w", addr, err)
}
}
for _, subnet := range acl.IPSubnets {
_, cidr, err := net.ParseCIDR(subnet)
if err != nil {
return xerrors.Errorf("error parsing subnet %s: %w", subnet, err)
}
err = a.ConnGater.UnblockSubnet(cidr)
if err != nil {
return xerrors.Errorf("error unblocking subunet %s: %w", subnet, err)
}
}
return nil
}
func (a *CommonAPI) NetBlockList(ctx context.Context) (result api.NetBlockList, err error) {
result.Peers = a.ConnGater.ListBlockedPeers()
for _, ip := range a.ConnGater.ListBlockedAddrs() {
result.IPAddrs = append(result.IPAddrs, ip.String())
}
for _, subnet := range a.ConnGater.ListBlockedSubnets() {
result.IPSubnets = append(result.IPSubnets, subnet.String())
}
return
}

View File

@ -0,0 +1,17 @@
package lp2p
import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
func ConnGater(ds dtypes.MetadataDS) (*conngater.BasicConnectionGater, error) {
return conngater.NewBasicConnectionGater(ds)
}
func ConnGaterOption(cg *conngater.BasicConnectionGater) (opts Libp2pOpts, err error) {
opts.Opts = append(opts.Opts, libp2p.ConnectionGater(cg))
return
}