From fc7c7ddd97a4053f7c81f6c780521d6b7c9ba4ea Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 17 Oct 2019 17:57:56 +0900 Subject: [PATCH] Initial pass at a peer manager --- chain/vm/vm.go | 21 +++--- node/builder.go | 5 ++ node/hello/hello.go | 9 ++- node/modules/services.go | 5 ++ peermgr/peermgr.go | 135 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 162 insertions(+), 13 deletions(-) create mode 100644 peermgr/peermgr.go diff --git a/chain/vm/vm.go b/chain/vm/vm.go index 03faebc29..be9bf6eb0 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -5,17 +5,6 @@ import ( "fmt" "math/big" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/actors/aerrors" - "github.com/filecoin-project/lotus/chain/address" - "github.com/filecoin-project/lotus/chain/state" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/bufbstore" - - cbg "github.com/whyrusleeping/cbor-gen" - "go.opencensus.io/trace" - block "github.com/ipfs/go-block-format" bserv "github.com/ipfs/go-blockservice" cid "github.com/ipfs/go-cid" @@ -24,7 +13,17 @@ import ( ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" dag "github.com/ipfs/go-merkledag" + cbg "github.com/whyrusleeping/cbor-gen" + "go.opencensus.io/trace" "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/actors/aerrors" + "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/state" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/bufbstore" ) var log = logging.Logger("vm") diff --git a/node/builder.go b/node/builder.go index 00ab37328..df49c11b1 100644 --- a/node/builder.go +++ b/node/builder.go @@ -37,6 +37,7 @@ import ( "github.com/filecoin-project/lotus/node/modules/testing" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/paych" + "github.com/filecoin-project/lotus/peermgr" "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval/discovery" "github.com/filecoin-project/lotus/storage" @@ -78,6 +79,7 @@ const ( RunHelloKey RunBlockSyncKey + RunPeerMgrKey HandleIncomingBlocksKey HandleIncomingMessagesKey @@ -227,8 +229,11 @@ func Online() Option { Override(new(*hello.Service), hello.NewHelloService), Override(new(*chain.BlockSyncService), chain.NewBlockSyncService), + Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr), + Override(RunHelloKey, modules.RunHello), Override(RunBlockSyncKey, modules.RunBlockSync), + Override(RunPeerMgrKey, modules.RunPeerMgr), Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), Override(HeadMetricsKey, metrics.SendHeadNotifs("")), diff --git a/node/hello/hello.go b/node/hello/hello.go index 3d63dbc1d..53e32af52 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -3,7 +3,6 @@ package hello import ( "context" "fmt" - "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" @@ -14,7 +13,9 @@ import ( "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/cborrpc" + "github.com/filecoin-project/lotus/peermgr" ) const ProtocolID = "/fil/hello/1.0.0" @@ -36,14 +37,16 @@ type Service struct { cs *store.ChainStore syncer *chain.Syncer + pmgr *peermgr.PeerMgr } -func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer) *Service { +func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr *peermgr.PeerMgr) *Service { return &Service{ newStream: h.NewStream, cs: cs, syncer: syncer, + pmgr: pmgr, } } @@ -74,6 +77,7 @@ func (hs *Service) HandleStream(s inet.Stream) { log.Infof("Got new tipset through Hello: %s from %s", ts.Cids(), s.Conn().RemotePeer()) hs.syncer.InformNewHead(s.Conn().RemotePeer(), ts) + hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer()) } func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { @@ -88,6 +92,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { if err != nil { return err } + gen, err := hs.cs.GetGenesis() if err != nil { return err diff --git a/node/modules/services.go b/node/modules/services.go index ef6795daa..3c38f4461 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -13,6 +13,7 @@ import ( "github.com/filecoin-project/lotus/chain/sub" "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/peermgr" "github.com/filecoin-project/lotus/retrieval/discovery" "github.com/filecoin-project/lotus/storage/sector" ) @@ -33,6 +34,10 @@ func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello. h.Network().Notify(&bundle) } +func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) { + go pmgr.Run(helpers.LifecycleCtx(mctx, lc)) +} + func RunBlockSync(h host.Host, svc *chain.BlockSyncService) { h.SetStreamHandler(chain.BlockSyncProtocolID, svc.HandleStream) } diff --git a/peermgr/peermgr.go b/peermgr/peermgr.go new file mode 100644 index 000000000..8ddc3a40e --- /dev/null +++ b/peermgr/peermgr.go @@ -0,0 +1,135 @@ +package peermgr + +import ( + "context" + "sync" + "time" + + host "github.com/libp2p/go-libp2p-core/host" + net "github.com/libp2p/go-libp2p-core/network" + peer "github.com/libp2p/go-libp2p-core/peer" + dht "github.com/libp2p/go-libp2p-kad-dht" + + logging "github.com/ipfs/go-log" +) + +var log = logging.Logger("peermgr") + +const ( + MaxFilPeers = 32 + MinFilPeers = 8 +) + +type PeerMgr struct { + bootstrappers []peer.AddrInfo + + // peerLeads is a set of peers we hear about through the network + // and who may be good peers to connect to for expanding our peer set + peerLeads map[peer.ID]time.Time + + peersLk sync.Mutex + peers map[peer.ID]struct{} + + maxFilPeers int + minFilPeers int + + expanding bool + + h host.Host + dht *dht.IpfsDHT + + notifee *net.NotifyBundle +} + +func NewPeerMgr(h host.Host, dht *dht.IpfsDHT) *PeerMgr { + pm := &PeerMgr{ + peers: make(map[peer.ID]struct{}), + maxFilPeers: MaxFilPeers, + minFilPeers: MinFilPeers, + h: h, + } + + pm.notifee = &net.NotifyBundle{ + DisconnectedF: func(_ net.Network, c net.Conn) { + pm.Disconnect(c.RemotePeer()) + }, + } + + h.Network().Notify(pm.notifee) + + return pm +} + +func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) { + pmgr.peersLk.Lock() + defer pmgr.peersLk.Unlock() + pmgr.peers[p] = struct{}{} +} + +func (pmgr *PeerMgr) Disconnect(p peer.ID) { + if pmgr.h.Network().Connectedness(p) == net.NotConnected { + pmgr.peersLk.Lock() + defer pmgr.peersLk.Unlock() + delete(pmgr.peers, p) + } +} + +func (pmgr *PeerMgr) Run(ctx context.Context) { + tick := time.NewTicker(time.Second * 5) + for { + select { + case <-tick.C: + pcount := pmgr.getPeerCount() + if pcount < pmgr.minFilPeers { + pmgr.expandPeers() + } else if pcount > pmgr.maxFilPeers { + log.Infof("peer count about threshold: %d > %d", pcount, pmgr.maxFilPeers) + } + } + } +} + +func (pmgr *PeerMgr) getPeerCount() int { + pmgr.peersLk.Lock() + defer pmgr.peersLk.Unlock() + return len(pmgr.peers) +} + +func (pmgr *PeerMgr) expandPeers() { + if pmgr.expanding { + return + } + pmgr.expanding = true + go func() { + defer func() { + pmgr.expanding = false + }() + ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30) + defer cancel() + + pmgr.doExpand(ctx) + }() +} + +func (pmgr *PeerMgr) doExpand(ctx context.Context) { + pcount := pmgr.getPeerCount() + if pcount == 0 { + if len(pmgr.bootstrappers) == 0 { + log.Warn("no peers connected, and no bootstrappers configured") + return + } + + log.Info("connecting to bootstrap peers") + for _, bsp := range pmgr.bootstrappers { + if err := pmgr.h.Connect(ctx, bsp); err != nil { + log.Warnf("failed to connect to bootstrap peer: %s", err) + } + } + return + } + + // if we already have some peers and need more, the dht is really good at connecting to most peers. Use that for now until something better comes along. + if err := pmgr.dht.Bootstrap(ctx); err != nil { + log.Warnf("dht bootstrapping failed: %s", err) + } +}