diff --git a/chain/exchange/peer_tracker.go b/chain/exchange/peer_tracker.go index 902baadce..cc8bd4be9 100644 --- a/chain/exchange/peer_tracker.go +++ b/chain/exchange/peer_tracker.go @@ -11,6 +11,7 @@ import ( host "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "go.uber.org/fx" + "go.uber.org/multierr" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/lib/peermgr" @@ -38,20 +39,34 @@ func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeer pmgr: pmgr, } - sub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer)) + addSub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer)) if err != nil { panic(err) } go func() { - for newPeer := range sub.Out() { + for newPeer := range addSub.Out() { bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id) } }() + rmSub, err := h.EventBus().Subscribe(new(peermgr.RemoveFilPeer)) + if err != nil { + panic(err) + } + + go func() { + for rmPeer := range rmSub.Out() { + bsPt.removePeer(rmPeer.(peermgr.RemoveFilPeer).Id) + } + }() + lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { - return sub.Close() + return multierr.Combine( + addSub.Close(), + rmSub.Close(), + ) }, }) diff --git a/lib/peermgr/peermgr.go b/lib/peermgr/peermgr.go index 2fe54caea..1847c11f8 100644 --- a/lib/peermgr/peermgr.go +++ b/lib/peermgr/peermgr.go @@ -54,7 +54,8 @@ type PeerMgr struct { dht *dht.IpfsDHT notifee *net.NotifyBundle - filPeerEmitter event.Emitter + addPeerEmitter event.Emitter + rmPeerEmitter event.Emitter done chan struct{} } @@ -63,6 +64,10 @@ type NewFilPeer struct { Id peer.ID } +type RemoveFilPeer struct { + Id peer.ID +} + func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) { pm := &PeerMgr{ h: h, @@ -81,12 +86,19 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes if err != nil { return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err) } - pm.filPeerEmitter = emitter + pm.addPeerEmitter = emitter + + emitter, err = h.EventBus().Emitter(new(RemoveFilPeer)) + if err != nil { + return nil, xerrors.Errorf("creating RemoveFilPeer emitter: %w", err) + } + pm.rmPeerEmitter = emitter lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return multierr.Combine( - pm.filPeerEmitter.Close(), + pm.addPeerEmitter.Close(), + pm.rmPeerEmitter.Close(), pm.Stop(ctx), ) }, @@ -104,7 +116,7 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes } func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) { - _ = pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck + _ = pmgr.addPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck pmgr.peersLk.Lock() defer pmgr.peersLk.Unlock() pmgr.peers[p] = time.Duration(0) @@ -127,10 +139,19 @@ func (pmgr *PeerMgr) SetPeerLatency(p peer.ID, latency time.Duration) { } func (pmgr *PeerMgr) Disconnect(p peer.ID) { + disconnected := false + if pmgr.h.Network().Connectedness(p) == net.NotConnected { pmgr.peersLk.Lock() - defer pmgr.peersLk.Unlock() - delete(pmgr.peers, p) + _, disconnected = pmgr.peers[p] + if disconnected { + delete(pmgr.peers, p) + } + pmgr.peersLk.Unlock() + } + + if disconnected { + _ = pmgr.rmPeerEmitter.Emit(RemoveFilPeer{Id: p}) //nolint:errcheck } }