emit event for peer disconnectionsa and act upon them in the blocksync peer tracker

This commit is contained in:
vyzo 2020-11-06 18:26:14 +02:00
parent b10b6fdd11
commit 7aec500384
2 changed files with 45 additions and 9 deletions

View File

@ -11,6 +11,7 @@ import (
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/fx"
"go.uber.org/multierr"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/lib/peermgr"
@ -38,20 +39,34 @@ func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeer
pmgr: pmgr,
}
sub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer))
addSub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer))
if err != nil {
panic(err)
}
go func() {
for newPeer := range sub.Out() {
for newPeer := range addSub.Out() {
bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id)
}
}()
rmSub, err := h.EventBus().Subscribe(new(peermgr.RemoveFilPeer))
if err != nil {
panic(err)
}
go func() {
for rmPeer := range rmSub.Out() {
bsPt.removePeer(rmPeer.(peermgr.RemoveFilPeer).Id)
}
}()
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return sub.Close()
return multierr.Combine(
addSub.Close(),
rmSub.Close(),
)
},
})

View File

@ -54,7 +54,8 @@ type PeerMgr struct {
dht *dht.IpfsDHT
notifee *net.NotifyBundle
filPeerEmitter event.Emitter
addPeerEmitter event.Emitter
rmPeerEmitter event.Emitter
done chan struct{}
}
@ -63,6 +64,10 @@ type NewFilPeer struct {
Id peer.ID
}
type RemoveFilPeer struct {
Id peer.ID
}
func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) {
pm := &PeerMgr{
h: h,
@ -81,12 +86,19 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
if err != nil {
return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err)
}
pm.filPeerEmitter = emitter
pm.addPeerEmitter = emitter
emitter, err = h.EventBus().Emitter(new(RemoveFilPeer))
if err != nil {
return nil, xerrors.Errorf("creating RemoveFilPeer emitter: %w", err)
}
pm.rmPeerEmitter = emitter
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return multierr.Combine(
pm.filPeerEmitter.Close(),
pm.addPeerEmitter.Close(),
pm.rmPeerEmitter.Close(),
pm.Stop(ctx),
)
},
@ -104,7 +116,7 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
}
func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
_ = pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck
_ = pmgr.addPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
pmgr.peers[p] = time.Duration(0)
@ -127,10 +139,19 @@ func (pmgr *PeerMgr) SetPeerLatency(p peer.ID, latency time.Duration) {
}
func (pmgr *PeerMgr) Disconnect(p peer.ID) {
disconnected := false
if pmgr.h.Network().Connectedness(p) == net.NotConnected {
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
delete(pmgr.peers, p)
_, disconnected = pmgr.peers[p]
if disconnected {
delete(pmgr.peers, p)
}
pmgr.peersLk.Unlock()
}
if disconnected {
_ = pmgr.rmPeerEmitter.Emit(RemoveFilPeer{Id: p}) //nolint:errcheck
}
}