package peermgr import ( "context" "sync" "time" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/event" host "github.com/libp2p/go-libp2p-core/host" net "github.com/libp2p/go-libp2p-core/network" peer "github.com/libp2p/go-libp2p-core/peer" dht "github.com/libp2p/go-libp2p-kad-dht" "go.opencensus.io/stats" "go.uber.org/fx" "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/modules/dtypes" ) var log = logging.Logger("peermgr") const ( MaxFilPeers = 32 MinFilPeers = 12 ) type MaybePeerMgr struct { fx.In Mgr *PeerMgr `optional:"true"` } type PeerMgr struct { bootstrappers []peer.AddrInfo // peerLeads is a set of peers we hear about through the network // and who may be good peers to connect to for expanding our peer set //peerLeads map[peer.ID]time.Time // TODO: unused peersLk sync.Mutex peers map[peer.ID]time.Duration maxFilPeers int minFilPeers int expanding chan struct{} h host.Host dht *dht.IpfsDHT notifee *net.NotifyBundle emitter event.Emitter done chan struct{} } type FilPeerEvt struct { Type FilPeerEvtType ID peer.ID } type FilPeerEvtType int const ( AddFilPeerEvt FilPeerEvtType = iota RemoveFilPeerEvt ) func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) { pm := &PeerMgr{ h: h, dht: dht, bootstrappers: bootstrap, peers: make(map[peer.ID]time.Duration), expanding: make(chan struct{}, 1), maxFilPeers: MaxFilPeers, minFilPeers: MinFilPeers, done: make(chan struct{}), } emitter, err := h.EventBus().Emitter(new(FilPeerEvt)) if err != nil { return nil, xerrors.Errorf("creating FilPeerEvt emitter: %w", err) } pm.emitter = emitter lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return multierr.Combine( pm.emitter.Close(), pm.Stop(ctx), ) }, }) pm.notifee = &net.NotifyBundle{ DisconnectedF: func(_ net.Network, c net.Conn) { pm.Disconnect(c.RemotePeer()) }, } h.Network().Notify(pm.notifee) return pm, nil } func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) { _ = pmgr.emitter.Emit(FilPeerEvt{Type: AddFilPeerEvt, ID: p}) //nolint:errcheck pmgr.peersLk.Lock() defer pmgr.peersLk.Unlock() pmgr.peers[p] = time.Duration(0) } func (pmgr *PeerMgr) GetPeerLatency(p peer.ID) (time.Duration, bool) { pmgr.peersLk.Lock() defer pmgr.peersLk.Unlock() dur, ok := pmgr.peers[p] return dur, ok } func (pmgr *PeerMgr) SetPeerLatency(p peer.ID, latency time.Duration) { pmgr.peersLk.Lock() defer pmgr.peersLk.Unlock() if _, ok := pmgr.peers[p]; ok { pmgr.peers[p] = latency } } func (pmgr *PeerMgr) Disconnect(p peer.ID) { disconnected := false if pmgr.h.Network().Connectedness(p) == net.NotConnected { pmgr.peersLk.Lock() _, disconnected = pmgr.peers[p] if disconnected { delete(pmgr.peers, p) } pmgr.peersLk.Unlock() } if disconnected { _ = pmgr.emitter.Emit(FilPeerEvt{Type: RemoveFilPeerEvt, ID: p}) //nolint:errcheck } } func (pmgr *PeerMgr) Stop(ctx context.Context) error { log.Warn("closing peermgr done") close(pmgr.done) return nil } func (pmgr *PeerMgr) Run(ctx context.Context) { tick := build.Clock.Ticker(time.Second * 5) for { select { case <-tick.C: pcount := pmgr.getPeerCount() if pcount < pmgr.minFilPeers { pmgr.expandPeers() } else if pcount > pmgr.maxFilPeers { log.Debugf("peer count about threshold: %d > %d", pcount, pmgr.maxFilPeers) } stats.Record(ctx, metrics.PeerCount.M(int64(pmgr.getPeerCount()))) case <-pmgr.done: log.Warn("exiting peermgr run") return } } } func (pmgr *PeerMgr) getPeerCount() int { pmgr.peersLk.Lock() defer pmgr.peersLk.Unlock() return len(pmgr.peers) } func (pmgr *PeerMgr) expandPeers() { select { case pmgr.expanding <- struct{}{}: default: return } go func() { ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30) defer cancel() pmgr.doExpand(ctx) <-pmgr.expanding }() } func (pmgr *PeerMgr) doExpand(ctx context.Context) { pcount := pmgr.getPeerCount() if pcount == 0 { if len(pmgr.bootstrappers) == 0 { log.Warn("no peers connected, and no bootstrappers configured") return } log.Info("connecting to bootstrap peers") wg := sync.WaitGroup{} for _, bsp := range pmgr.bootstrappers { wg.Add(1) go func(bsp peer.AddrInfo) { defer wg.Done() if err := pmgr.h.Connect(ctx, bsp); err != nil { log.Warnf("failed to connect to bootstrap peer: %s", err) } }(bsp) } wg.Wait() return } // if we already have some peers and need more, the dht is really good at connecting to most peers. Use that for now until something better comes along. if err := pmgr.dht.Bootstrap(ctx); err != nil { log.Warnf("dht bootstrapping failed: %s", err) } }