Merge pull request #397 from filecoin-project/feat/peermgr
Initial pass at a peer manager
This commit is contained in:
commit
3f991c0600
@ -211,6 +211,7 @@ func (tu *syncTestUtil) addSourceNode(gen int) {
|
||||
node.Online(),
|
||||
node.Repo(sourceRepo),
|
||||
node.MockHost(tu.mn),
|
||||
node.Test(),
|
||||
|
||||
node.Override(new(modules.Genesis), modules.LoadGenesis(genesis)),
|
||||
)
|
||||
@ -242,6 +243,7 @@ func (tu *syncTestUtil) addClientNode() int {
|
||||
node.Online(),
|
||||
node.Repo(repo.NewMemory(nil)),
|
||||
node.MockHost(tu.mn),
|
||||
node.Test(),
|
||||
|
||||
node.Override(new(modules.Genesis), modules.LoadGenesis(tu.genesis)),
|
||||
)
|
||||
|
@ -5,17 +5,6 @@ import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/actors/aerrors"
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/chain/state"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/bufbstore"
|
||||
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"go.opencensus.io/trace"
|
||||
|
||||
block "github.com/ipfs/go-block-format"
|
||||
bserv "github.com/ipfs/go-blockservice"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
@ -24,7 +13,17 @@ import (
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
logging "github.com/ipfs/go-log"
|
||||
dag "github.com/ipfs/go-merkledag"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/actors/aerrors"
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/chain/state"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/bufbstore"
|
||||
)
|
||||
|
||||
var log = logging.Logger("vm")
|
||||
|
@ -4,6 +4,7 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/filecoin-project/lotus/peermgr"
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
@ -95,8 +96,9 @@ var DaemonCmd = &cli.Command{
|
||||
return lr.SetAPIEndpoint(apima)
|
||||
}),
|
||||
|
||||
node.ApplyIf(func(s *node.Settings) bool { return cctx.Bool("bootstrap") },
|
||||
node.Override(node.BootstrapKey, modules.Bootstrap),
|
||||
node.ApplyIf(func(s *node.Settings) bool { return !cctx.Bool("bootstrap") },
|
||||
node.Unset(node.RunPeerMgrKey),
|
||||
node.Unset(new(*peermgr.PeerMgr)),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
|
||||
func main() {
|
||||
logging.SetLogLevel("*", "INFO")
|
||||
logging.SetLogLevel("dht", "ERROR")
|
||||
local := []*cli.Command{
|
||||
DaemonCmd,
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ package node
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
@ -37,6 +36,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/modules/testing"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/paych"
|
||||
"github.com/filecoin-project/lotus/peermgr"
|
||||
"github.com/filecoin-project/lotus/retrieval"
|
||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
@ -78,6 +78,7 @@ const (
|
||||
|
||||
RunHelloKey
|
||||
RunBlockSyncKey
|
||||
RunPeerMgrKey
|
||||
|
||||
HandleIncomingBlocksKey
|
||||
HandleIncomingMessagesKey
|
||||
@ -122,26 +123,6 @@ type Settings struct {
|
||||
Config bool // Config option applied
|
||||
}
|
||||
|
||||
// Override option changes constructor for a given type
|
||||
func Override(typ, constructor interface{}) Option {
|
||||
return func(s *Settings) error {
|
||||
if i, ok := typ.(invoke); ok {
|
||||
s.invokes[i] = fx.Invoke(constructor)
|
||||
return nil
|
||||
}
|
||||
|
||||
if c, ok := typ.(special); ok {
|
||||
s.modules[c] = fx.Provide(constructor)
|
||||
return nil
|
||||
}
|
||||
ctor := as(constructor, typ)
|
||||
rt := reflect.TypeOf(typ).Elem()
|
||||
|
||||
s.modules[rt] = fx.Provide(ctor)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
var defConf = config.Default()
|
||||
|
||||
func defaults() []Option {
|
||||
@ -227,8 +208,11 @@ func Online() Option {
|
||||
|
||||
Override(new(*hello.Service), hello.NewHelloService),
|
||||
Override(new(*chain.BlockSyncService), chain.NewBlockSyncService),
|
||||
Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr),
|
||||
|
||||
Override(RunHelloKey, modules.RunHello),
|
||||
Override(RunBlockSyncKey, modules.RunBlockSync),
|
||||
Override(RunPeerMgrKey, modules.RunPeerMgr),
|
||||
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
|
||||
Override(HeadMetricsKey, metrics.SendHeadNotifs("")),
|
||||
|
||||
@ -394,15 +378,9 @@ func New(ctx context.Context, opts ...Option) (StopFunc, error) {
|
||||
|
||||
// In-memory / testing
|
||||
|
||||
func randomIdentity() Option {
|
||||
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
|
||||
if err != nil {
|
||||
return Error(err)
|
||||
}
|
||||
|
||||
func Test() Option {
|
||||
return Options(
|
||||
Override(new(ci.PrivKey), sk),
|
||||
Override(new(ci.PubKey), pk),
|
||||
Override(new(peer.ID), peer.IDFromPublicKey),
|
||||
Unset(RunPeerMgrKey),
|
||||
Unset(new(*peermgr.PeerMgr)),
|
||||
)
|
||||
}
|
||||
|
@ -3,7 +3,7 @@ package hello
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"go.uber.org/fx"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
@ -14,7 +14,9 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/chain"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/cborrpc"
|
||||
"github.com/filecoin-project/lotus/peermgr"
|
||||
)
|
||||
|
||||
const ProtocolID = "/fil/hello/1.0.0"
|
||||
@ -36,14 +38,26 @@ type Service struct {
|
||||
|
||||
cs *store.ChainStore
|
||||
syncer *chain.Syncer
|
||||
pmgr *peermgr.PeerMgr
|
||||
}
|
||||
|
||||
func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer) *Service {
|
||||
type MaybePeerMgr struct {
|
||||
fx.In
|
||||
|
||||
Mgr *peermgr.PeerMgr `optional:"true"`
|
||||
}
|
||||
|
||||
func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr MaybePeerMgr) *Service {
|
||||
if pmgr.Mgr == nil {
|
||||
log.Warn("running without peer manager")
|
||||
}
|
||||
|
||||
return &Service{
|
||||
newStream: h.NewStream,
|
||||
|
||||
cs: cs,
|
||||
syncer: syncer,
|
||||
pmgr: pmgr.Mgr,
|
||||
}
|
||||
}
|
||||
|
||||
@ -74,6 +88,9 @@ func (hs *Service) HandleStream(s inet.Stream) {
|
||||
|
||||
log.Infof("Got new tipset through Hello: %s from %s", ts.Cids(), s.Conn().RemotePeer())
|
||||
hs.syncer.InformNewHead(s.Conn().RemotePeer(), ts)
|
||||
if hs.pmgr != nil {
|
||||
hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer())
|
||||
}
|
||||
}
|
||||
|
||||
func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
|
||||
@ -88,6 +105,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
gen, err := hs.cs.GetGenesis()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -3,24 +3,19 @@ package modules
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/addrutil"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/gbrlsnchs/jwt/v3"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
record "github.com/libp2p/go-libp2p-record"
|
||||
"go.uber.org/fx"
|
||||
"golang.org/x/xerrors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
var log = logging.Logger("modules")
|
||||
@ -86,40 +81,3 @@ func ConfigBootstrap(peers []string) func() (dtypes.BootstrapPeers, error) {
|
||||
func BuiltinBootstrap() (dtypes.BootstrapPeers, error) {
|
||||
return build.BuiltinBootstrap()
|
||||
}
|
||||
|
||||
func Bootstrap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, pinfos dtypes.BootstrapPeers) {
|
||||
ctx, cancel := context.WithCancel(mctx)
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(_ context.Context) error {
|
||||
go func() {
|
||||
for {
|
||||
sctx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||||
<-sctx.Done()
|
||||
cancel()
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if len(host.Network().Conns()) > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Warn("No peers connected, performing automatic bootstrap")
|
||||
|
||||
for _, pi := range pinfos {
|
||||
if err := host.Connect(ctx, pi); err != nil {
|
||||
log.Warn("bootstrap connect failed: ", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
},
|
||||
OnStop: func(_ context.Context) error {
|
||||
cancel()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/sub"
|
||||
"github.com/filecoin-project/lotus/node/hello"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/peermgr"
|
||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
||||
"github.com/filecoin-project/lotus/storage/sector"
|
||||
)
|
||||
@ -33,6 +34,10 @@ func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.
|
||||
h.Network().Notify(&bundle)
|
||||
}
|
||||
|
||||
func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
|
||||
go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
|
||||
}
|
||||
|
||||
func RunBlockSync(h host.Host, svc *chain.BlockSyncService) {
|
||||
h.SetStreamHandler(chain.BlockSyncProtocolID, svc.HandleStream)
|
||||
}
|
||||
|
@ -91,6 +91,7 @@ func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, a
|
||||
node.StorageMiner(&minerapi),
|
||||
node.Online(),
|
||||
node.Repo(r),
|
||||
node.Test(),
|
||||
|
||||
node.Override(new(*sectorbuilder.SectorBuilderConfig), modules.SectorBuilderConfig(secbpath)),
|
||||
node.Override(new(api.FullNode), tnd),
|
||||
@ -133,6 +134,7 @@ func builder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test.Te
|
||||
node.Online(),
|
||||
node.Repo(repo.NewMemory(nil)),
|
||||
node.MockHost(mn),
|
||||
node.Test(),
|
||||
|
||||
node.Override(new(*miner.Miner), miner.NewTestMiner(mineBlock)),
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"go.uber.org/fx"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
@ -38,6 +39,44 @@ func ApplyIf(check func(s *Settings) bool, opts ...Option) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Override option changes constructor for a given type
|
||||
func Override(typ, constructor interface{}) Option {
|
||||
return func(s *Settings) error {
|
||||
if i, ok := typ.(invoke); ok {
|
||||
s.invokes[i] = fx.Invoke(constructor)
|
||||
return nil
|
||||
}
|
||||
|
||||
if c, ok := typ.(special); ok {
|
||||
s.modules[c] = fx.Provide(constructor)
|
||||
return nil
|
||||
}
|
||||
ctor := as(constructor, typ)
|
||||
rt := reflect.TypeOf(typ).Elem()
|
||||
|
||||
s.modules[rt] = fx.Provide(ctor)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func Unset(typ interface{}) Option {
|
||||
return func(s *Settings) error {
|
||||
if i, ok := typ.(invoke); ok {
|
||||
s.invokes[i] = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
if c, ok := typ.(special); ok {
|
||||
delete(s.modules, c)
|
||||
return nil
|
||||
}
|
||||
rt := reflect.TypeOf(typ).Elem()
|
||||
|
||||
delete(s.modules, rt)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// from go-ipfs
|
||||
// as casts input constructor to a given interface (if a value is given, it
|
||||
// wraps it into a constructor).
|
||||
|
140
peermgr/peermgr.go
Normal file
140
peermgr/peermgr.go
Normal file
@ -0,0 +1,140 @@
|
||||
package peermgr
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
host "github.com/libp2p/go-libp2p-core/host"
|
||||
net "github.com/libp2p/go-libp2p-core/network"
|
||||
peer "github.com/libp2p/go-libp2p-core/peer"
|
||||
dht "github.com/libp2p/go-libp2p-kad-dht"
|
||||
|
||||
logging "github.com/ipfs/go-log"
|
||||
)
|
||||
|
||||
var log = logging.Logger("peermgr")
|
||||
|
||||
const (
|
||||
MaxFilPeers = 32
|
||||
MinFilPeers = 8
|
||||
)
|
||||
|
||||
type PeerMgr struct {
|
||||
bootstrappers []peer.AddrInfo
|
||||
|
||||
// peerLeads is a set of peers we hear about through the network
|
||||
// and who may be good peers to connect to for expanding our peer set
|
||||
peerLeads map[peer.ID]time.Time
|
||||
|
||||
peersLk sync.Mutex
|
||||
peers map[peer.ID]struct{}
|
||||
|
||||
maxFilPeers int
|
||||
minFilPeers int
|
||||
|
||||
expanding bool
|
||||
|
||||
h host.Host
|
||||
dht *dht.IpfsDHT
|
||||
|
||||
notifee *net.NotifyBundle
|
||||
}
|
||||
|
||||
func NewPeerMgr(h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) *PeerMgr {
|
||||
pm := &PeerMgr{
|
||||
h: h,
|
||||
dht: dht,
|
||||
bootstrappers: bootstrap,
|
||||
|
||||
peers: make(map[peer.ID]struct{}),
|
||||
|
||||
maxFilPeers: MaxFilPeers,
|
||||
minFilPeers: MinFilPeers,
|
||||
}
|
||||
|
||||
pm.notifee = &net.NotifyBundle{
|
||||
DisconnectedF: func(_ net.Network, c net.Conn) {
|
||||
pm.Disconnect(c.RemotePeer())
|
||||
},
|
||||
}
|
||||
|
||||
h.Network().Notify(pm.notifee)
|
||||
|
||||
return pm
|
||||
}
|
||||
|
||||
func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
|
||||
pmgr.peersLk.Lock()
|
||||
defer pmgr.peersLk.Unlock()
|
||||
pmgr.peers[p] = struct{}{}
|
||||
}
|
||||
|
||||
func (pmgr *PeerMgr) Disconnect(p peer.ID) {
|
||||
if pmgr.h.Network().Connectedness(p) == net.NotConnected {
|
||||
pmgr.peersLk.Lock()
|
||||
defer pmgr.peersLk.Unlock()
|
||||
delete(pmgr.peers, p)
|
||||
}
|
||||
}
|
||||
|
||||
func (pmgr *PeerMgr) Run(ctx context.Context) {
|
||||
tick := time.NewTicker(time.Second * 5)
|
||||
for {
|
||||
select {
|
||||
case <-tick.C:
|
||||
pcount := pmgr.getPeerCount()
|
||||
if pcount < pmgr.minFilPeers {
|
||||
pmgr.expandPeers()
|
||||
} else if pcount > pmgr.maxFilPeers {
|
||||
log.Infof("peer count about threshold: %d > %d", pcount, pmgr.maxFilPeers)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (pmgr *PeerMgr) getPeerCount() int {
|
||||
pmgr.peersLk.Lock()
|
||||
defer pmgr.peersLk.Unlock()
|
||||
return len(pmgr.peers)
|
||||
}
|
||||
|
||||
func (pmgr *PeerMgr) expandPeers() {
|
||||
if pmgr.expanding {
|
||||
return
|
||||
}
|
||||
pmgr.expanding = true
|
||||
go func() {
|
||||
defer func() {
|
||||
pmgr.expanding = false
|
||||
}()
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
|
||||
defer cancel()
|
||||
|
||||
pmgr.doExpand(ctx)
|
||||
}()
|
||||
}
|
||||
|
||||
func (pmgr *PeerMgr) doExpand(ctx context.Context) {
|
||||
pcount := pmgr.getPeerCount()
|
||||
if pcount == 0 {
|
||||
if len(pmgr.bootstrappers) == 0 {
|
||||
log.Warn("no peers connected, and no bootstrappers configured")
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("connecting to bootstrap peers")
|
||||
for _, bsp := range pmgr.bootstrappers {
|
||||
if err := pmgr.h.Connect(ctx, bsp); err != nil {
|
||||
log.Warnf("failed to connect to bootstrap peer: %s", err)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// if we already have some peers and need more, the dht is really good at connecting to most peers. Use that for now until something better comes along.
|
||||
if err := pmgr.dht.Bootstrap(ctx); err != nil {
|
||||
log.Warnf("dht bootstrapping failed: %s", err)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user