From fc7c7ddd97a4053f7c81f6c780521d6b7c9ba4ea Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 17 Oct 2019 17:57:56 +0900 Subject: [PATCH 1/3] Initial pass at a peer manager --- chain/vm/vm.go | 21 +++--- node/builder.go | 5 ++ node/hello/hello.go | 9 ++- node/modules/services.go | 5 ++ peermgr/peermgr.go | 135 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 162 insertions(+), 13 deletions(-) create mode 100644 peermgr/peermgr.go diff --git a/chain/vm/vm.go b/chain/vm/vm.go index 03faebc29..be9bf6eb0 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -5,17 +5,6 @@ import ( "fmt" "math/big" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/actors/aerrors" - "github.com/filecoin-project/lotus/chain/address" - "github.com/filecoin-project/lotus/chain/state" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/bufbstore" - - cbg "github.com/whyrusleeping/cbor-gen" - "go.opencensus.io/trace" - block "github.com/ipfs/go-block-format" bserv "github.com/ipfs/go-blockservice" cid "github.com/ipfs/go-cid" @@ -24,7 +13,17 @@ import ( ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" dag "github.com/ipfs/go-merkledag" + cbg "github.com/whyrusleeping/cbor-gen" + "go.opencensus.io/trace" "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/actors/aerrors" + "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/state" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/bufbstore" ) var log = logging.Logger("vm") diff --git a/node/builder.go b/node/builder.go index 00ab37328..df49c11b1 100644 --- a/node/builder.go +++ b/node/builder.go @@ -37,6 +37,7 @@ import ( "github.com/filecoin-project/lotus/node/modules/testing" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/paych" + "github.com/filecoin-project/lotus/peermgr" "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval/discovery" "github.com/filecoin-project/lotus/storage" @@ -78,6 +79,7 @@ const ( RunHelloKey RunBlockSyncKey + RunPeerMgrKey HandleIncomingBlocksKey HandleIncomingMessagesKey @@ -227,8 +229,11 @@ func Online() Option { Override(new(*hello.Service), hello.NewHelloService), Override(new(*chain.BlockSyncService), chain.NewBlockSyncService), + Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr), + Override(RunHelloKey, modules.RunHello), Override(RunBlockSyncKey, modules.RunBlockSync), + Override(RunPeerMgrKey, modules.RunPeerMgr), Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), Override(HeadMetricsKey, metrics.SendHeadNotifs("")), diff --git a/node/hello/hello.go b/node/hello/hello.go index 3d63dbc1d..53e32af52 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -3,7 +3,6 @@ package hello import ( "context" "fmt" - "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" @@ -14,7 +13,9 @@ import ( "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/cborrpc" + "github.com/filecoin-project/lotus/peermgr" ) const ProtocolID = "/fil/hello/1.0.0" @@ -36,14 +37,16 @@ type Service struct { cs *store.ChainStore syncer *chain.Syncer + pmgr *peermgr.PeerMgr } -func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer) *Service { +func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr *peermgr.PeerMgr) *Service { return &Service{ newStream: h.NewStream, cs: cs, syncer: syncer, + pmgr: pmgr, } } @@ -74,6 +77,7 @@ func (hs *Service) HandleStream(s inet.Stream) { log.Infof("Got new tipset through Hello: %s from %s", ts.Cids(), s.Conn().RemotePeer()) hs.syncer.InformNewHead(s.Conn().RemotePeer(), ts) + hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer()) } func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { @@ -88,6 +92,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { if err != nil { return err } + gen, err := hs.cs.GetGenesis() if err != nil { return err diff --git a/node/modules/services.go b/node/modules/services.go index ef6795daa..3c38f4461 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -13,6 +13,7 @@ import ( "github.com/filecoin-project/lotus/chain/sub" "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/peermgr" "github.com/filecoin-project/lotus/retrieval/discovery" "github.com/filecoin-project/lotus/storage/sector" ) @@ -33,6 +34,10 @@ func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello. h.Network().Notify(&bundle) } +func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) { + go pmgr.Run(helpers.LifecycleCtx(mctx, lc)) +} + func RunBlockSync(h host.Host, svc *chain.BlockSyncService) { h.SetStreamHandler(chain.BlockSyncProtocolID, svc.HandleStream) } diff --git a/peermgr/peermgr.go b/peermgr/peermgr.go new file mode 100644 index 000000000..8ddc3a40e --- /dev/null +++ b/peermgr/peermgr.go @@ -0,0 +1,135 @@ +package peermgr + +import ( + "context" + "sync" + "time" + + host "github.com/libp2p/go-libp2p-core/host" + net "github.com/libp2p/go-libp2p-core/network" + peer "github.com/libp2p/go-libp2p-core/peer" + dht "github.com/libp2p/go-libp2p-kad-dht" + + logging "github.com/ipfs/go-log" +) + +var log = logging.Logger("peermgr") + +const ( + MaxFilPeers = 32 + MinFilPeers = 8 +) + +type PeerMgr struct { + bootstrappers []peer.AddrInfo + + // peerLeads is a set of peers we hear about through the network + // and who may be good peers to connect to for expanding our peer set + peerLeads map[peer.ID]time.Time + + peersLk sync.Mutex + peers map[peer.ID]struct{} + + maxFilPeers int + minFilPeers int + + expanding bool + + h host.Host + dht *dht.IpfsDHT + + notifee *net.NotifyBundle +} + +func NewPeerMgr(h host.Host, dht *dht.IpfsDHT) *PeerMgr { + pm := &PeerMgr{ + peers: make(map[peer.ID]struct{}), + maxFilPeers: MaxFilPeers, + minFilPeers: MinFilPeers, + h: h, + } + + pm.notifee = &net.NotifyBundle{ + DisconnectedF: func(_ net.Network, c net.Conn) { + pm.Disconnect(c.RemotePeer()) + }, + } + + h.Network().Notify(pm.notifee) + + return pm +} + +func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) { + pmgr.peersLk.Lock() + defer pmgr.peersLk.Unlock() + pmgr.peers[p] = struct{}{} +} + +func (pmgr *PeerMgr) Disconnect(p peer.ID) { + if pmgr.h.Network().Connectedness(p) == net.NotConnected { + pmgr.peersLk.Lock() + defer pmgr.peersLk.Unlock() + delete(pmgr.peers, p) + } +} + +func (pmgr *PeerMgr) Run(ctx context.Context) { + tick := time.NewTicker(time.Second * 5) + for { + select { + case <-tick.C: + pcount := pmgr.getPeerCount() + if pcount < pmgr.minFilPeers { + pmgr.expandPeers() + } else if pcount > pmgr.maxFilPeers { + log.Infof("peer count about threshold: %d > %d", pcount, pmgr.maxFilPeers) + } + } + } +} + +func (pmgr *PeerMgr) getPeerCount() int { + pmgr.peersLk.Lock() + defer pmgr.peersLk.Unlock() + return len(pmgr.peers) +} + +func (pmgr *PeerMgr) expandPeers() { + if pmgr.expanding { + return + } + pmgr.expanding = true + go func() { + defer func() { + pmgr.expanding = false + }() + ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30) + defer cancel() + + pmgr.doExpand(ctx) + }() +} + +func (pmgr *PeerMgr) doExpand(ctx context.Context) { + pcount := pmgr.getPeerCount() + if pcount == 0 { + if len(pmgr.bootstrappers) == 0 { + log.Warn("no peers connected, and no bootstrappers configured") + return + } + + log.Info("connecting to bootstrap peers") + for _, bsp := range pmgr.bootstrappers { + if err := pmgr.h.Connect(ctx, bsp); err != nil { + log.Warnf("failed to connect to bootstrap peer: %s", err) + } + } + return + } + + // if we already have some peers and need more, the dht is really good at connecting to most peers. Use that for now until something better comes along. + if err := pmgr.dht.Bootstrap(ctx); err != nil { + log.Warnf("dht bootstrapping failed: %s", err) + } +} From 1d1f468c98bfe2e713cd973aaeedaf2fc588a33a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 23 Oct 2019 13:02:00 +0200 Subject: [PATCH 2/3] peer manager: Disable in tests --- chain/sync_test.go | 2 ++ node/builder.go | 33 +++------------------------------ node/hello/hello.go | 19 ++++++++++++++++--- node/node_test.go | 2 ++ node/options.go | 39 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 62 insertions(+), 33 deletions(-) diff --git a/chain/sync_test.go b/chain/sync_test.go index 81bcbf387..24b8fbd10 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -211,6 +211,7 @@ func (tu *syncTestUtil) addSourceNode(gen int) { node.Online(), node.Repo(sourceRepo), node.MockHost(tu.mn), + node.Test(), node.Override(new(modules.Genesis), modules.LoadGenesis(genesis)), ) @@ -242,6 +243,7 @@ func (tu *syncTestUtil) addClientNode() int { node.Online(), node.Repo(repo.NewMemory(nil)), node.MockHost(tu.mn), + node.Test(), node.Override(new(modules.Genesis), modules.LoadGenesis(tu.genesis)), ) diff --git a/node/builder.go b/node/builder.go index df49c11b1..f0bc09348 100644 --- a/node/builder.go +++ b/node/builder.go @@ -3,7 +3,6 @@ package node import ( "context" "errors" - "reflect" "time" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -124,26 +123,6 @@ type Settings struct { Config bool // Config option applied } -// Override option changes constructor for a given type -func Override(typ, constructor interface{}) Option { - return func(s *Settings) error { - if i, ok := typ.(invoke); ok { - s.invokes[i] = fx.Invoke(constructor) - return nil - } - - if c, ok := typ.(special); ok { - s.modules[c] = fx.Provide(constructor) - return nil - } - ctor := as(constructor, typ) - rt := reflect.TypeOf(typ).Elem() - - s.modules[rt] = fx.Provide(ctor) - return nil - } -} - var defConf = config.Default() func defaults() []Option { @@ -399,15 +378,9 @@ func New(ctx context.Context, opts ...Option) (StopFunc, error) { // In-memory / testing -func randomIdentity() Option { - sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) - if err != nil { - return Error(err) - } - +func Test() Option { return Options( - Override(new(ci.PrivKey), sk), - Override(new(ci.PubKey), pk), - Override(new(peer.ID), peer.IDFromPublicKey), + Unset(RunPeerMgrKey), + Unset(new(*peermgr.PeerMgr)), ) } diff --git a/node/hello/hello.go b/node/hello/hello.go index 53e32af52..d512d1d27 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -3,6 +3,7 @@ package hello import ( "context" "fmt" + "go.uber.org/fx" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" @@ -40,13 +41,23 @@ type Service struct { pmgr *peermgr.PeerMgr } -func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr *peermgr.PeerMgr) *Service { +type MaybePeerMgr struct { + fx.In + + Mgr *peermgr.PeerMgr `optional:"true"` +} + +func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr MaybePeerMgr) *Service { + if pmgr.Mgr == nil { + log.Warn("running without peer manager") + } + return &Service{ newStream: h.NewStream, cs: cs, syncer: syncer, - pmgr: pmgr, + pmgr: pmgr.Mgr, } } @@ -77,7 +88,9 @@ func (hs *Service) HandleStream(s inet.Stream) { log.Infof("Got new tipset through Hello: %s from %s", ts.Cids(), s.Conn().RemotePeer()) hs.syncer.InformNewHead(s.Conn().RemotePeer(), ts) - hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer()) + if hs.pmgr != nil { + hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer()) + } } func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { diff --git a/node/node_test.go b/node/node_test.go index e3ad41ea9..06f1fc1c6 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -91,6 +91,7 @@ func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, a node.StorageMiner(&minerapi), node.Online(), node.Repo(r), + node.Test(), node.Override(new(*sectorbuilder.SectorBuilderConfig), modules.SectorBuilderConfig(secbpath)), node.Override(new(api.FullNode), tnd), @@ -133,6 +134,7 @@ func builder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test.Te node.Online(), node.Repo(repo.NewMemory(nil)), node.MockHost(mn), + node.Test(), node.Override(new(*miner.Miner), miner.NewTestMiner(mineBlock)), diff --git a/node/options.go b/node/options.go index 4d2ed9ced..58576a950 100644 --- a/node/options.go +++ b/node/options.go @@ -1,6 +1,7 @@ package node import ( + "go.uber.org/fx" "reflect" ) @@ -38,6 +39,44 @@ func ApplyIf(check func(s *Settings) bool, opts ...Option) Option { } } +// Override option changes constructor for a given type +func Override(typ, constructor interface{}) Option { + return func(s *Settings) error { + if i, ok := typ.(invoke); ok { + s.invokes[i] = fx.Invoke(constructor) + return nil + } + + if c, ok := typ.(special); ok { + s.modules[c] = fx.Provide(constructor) + return nil + } + ctor := as(constructor, typ) + rt := reflect.TypeOf(typ).Elem() + + s.modules[rt] = fx.Provide(ctor) + return nil + } +} + +func Unset(typ interface{}) Option { + return func(s *Settings) error { + if i, ok := typ.(invoke); ok { + s.invokes[i] = nil + return nil + } + + if c, ok := typ.(special); ok { + delete(s.modules, c) + return nil + } + rt := reflect.TypeOf(typ).Elem() + + delete(s.modules, rt) + return nil + } +} + // from go-ipfs // as casts input constructor to a given interface (if a value is given, it // wraps it into a constructor). From 5175d8541efef7b1ecd548931eed2843e33e7e83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 23 Oct 2019 13:11:18 +0200 Subject: [PATCH 3/3] peer manager: Handle bootstrap in peermgr --- cmd/lotus/daemon.go | 6 ++++-- cmd/lotus/main.go | 1 + node/modules/core.go | 50 ++++---------------------------------------- peermgr/peermgr.go | 11 +++++++--- 4 files changed, 17 insertions(+), 51 deletions(-) diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 08441285b..c9b736c72 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -4,6 +4,7 @@ package main import ( "context" + "github.com/filecoin-project/lotus/peermgr" "io/ioutil" "github.com/multiformats/go-multiaddr" @@ -95,8 +96,9 @@ var DaemonCmd = &cli.Command{ return lr.SetAPIEndpoint(apima) }), - node.ApplyIf(func(s *node.Settings) bool { return cctx.Bool("bootstrap") }, - node.Override(node.BootstrapKey, modules.Bootstrap), + node.ApplyIf(func(s *node.Settings) bool { return !cctx.Bool("bootstrap") }, + node.Unset(node.RunPeerMgrKey), + node.Unset(new(*peermgr.PeerMgr)), ), ) if err != nil { diff --git a/cmd/lotus/main.go b/cmd/lotus/main.go index 7834ff423..e7cffe186 100644 --- a/cmd/lotus/main.go +++ b/cmd/lotus/main.go @@ -15,6 +15,7 @@ import ( func main() { logging.SetLogLevel("*", "INFO") + logging.SetLogLevel("dht", "ERROR") local := []*cli.Command{ DaemonCmd, } diff --git a/node/modules/core.go b/node/modules/core.go index bde386d67..afc1b682a 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -3,24 +3,19 @@ package modules import ( "context" "crypto/rand" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/addrutil" - "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" "github.com/gbrlsnchs/jwt/v3" logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peerstore" record "github.com/libp2p/go-libp2p-record" - "go.uber.org/fx" "golang.org/x/xerrors" "io" "io/ioutil" - "time" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/node/repo" ) var log = logging.Logger("modules") @@ -86,40 +81,3 @@ func ConfigBootstrap(peers []string) func() (dtypes.BootstrapPeers, error) { func BuiltinBootstrap() (dtypes.BootstrapPeers, error) { return build.BuiltinBootstrap() } - -func Bootstrap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, pinfos dtypes.BootstrapPeers) { - ctx, cancel := context.WithCancel(mctx) - - lc.Append(fx.Hook{ - OnStart: func(_ context.Context) error { - go func() { - for { - sctx, cancel := context.WithTimeout(ctx, 2*time.Second) - <-sctx.Done() - cancel() - - if ctx.Err() != nil { - return - } - - if len(host.Network().Conns()) > 0 { - continue - } - - log.Warn("No peers connected, performing automatic bootstrap") - - for _, pi := range pinfos { - if err := host.Connect(ctx, pi); err != nil { - log.Warn("bootstrap connect failed: ", err) - } - } - } - }() - return nil - }, - OnStop: func(_ context.Context) error { - cancel() - return nil - }, - }) -} diff --git a/peermgr/peermgr.go b/peermgr/peermgr.go index 8ddc3a40e..d8a0bea77 100644 --- a/peermgr/peermgr.go +++ b/peermgr/peermgr.go @@ -2,6 +2,7 @@ package peermgr import ( "context" + "github.com/filecoin-project/lotus/node/modules/dtypes" "sync" "time" @@ -41,12 +42,16 @@ type PeerMgr struct { notifee *net.NotifyBundle } -func NewPeerMgr(h host.Host, dht *dht.IpfsDHT) *PeerMgr { +func NewPeerMgr(h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) *PeerMgr { pm := &PeerMgr{ - peers: make(map[peer.ID]struct{}), + h: h, + dht: dht, + bootstrappers: bootstrap, + + peers: make(map[peer.ID]struct{}), + maxFilPeers: MaxFilPeers, minFilPeers: MinFilPeers, - h: h, } pm.notifee = &net.NotifyBundle{