Merge pull request #4754 from filecoin-project/fix/issue-92

emit events for peer disconnections and act upon them in the blocksync tracker
This commit is contained in:
Łukasz Magiera 2020-11-20 18:45:35 +01:00 committed by GitHub
commit 3113b15217
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 15 deletions

View File

@ -38,20 +38,26 @@ func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeer
pmgr: pmgr,
}
sub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer))
evtSub, err := h.EventBus().Subscribe(new(peermgr.FilPeerEvt))
if err != nil {
panic(err)
}
go func() {
for newPeer := range sub.Out() {
bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id)
for evt := range evtSub.Out() {
pEvt := evt.(peermgr.FilPeerEvt)
switch pEvt.Type {
case peermgr.AddFilPeerEvt:
bsPt.addPeer(pEvt.ID)
case peermgr.RemoveFilPeerEvt:
bsPt.removePeer(pEvt.ID)
}
}
}()
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return sub.Close()
return evtSub.Close()
},
})

View File

@ -54,15 +54,23 @@ type PeerMgr struct {
dht *dht.IpfsDHT
notifee *net.NotifyBundle
filPeerEmitter event.Emitter
emitter event.Emitter
done chan struct{}
}
type NewFilPeer struct {
Id peer.ID
type FilPeerEvt struct {
Type FilPeerEvtType
ID peer.ID
}
type FilPeerEvtType int
const (
AddFilPeerEvt FilPeerEvtType = iota
RemoveFilPeerEvt
)
func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) {
pm := &PeerMgr{
h: h,
@ -77,16 +85,16 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
done: make(chan struct{}),
}
emitter, err := h.EventBus().Emitter(new(NewFilPeer))
emitter, err := h.EventBus().Emitter(new(FilPeerEvt))
if err != nil {
return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err)
return nil, xerrors.Errorf("creating FilPeerEvt emitter: %w", err)
}
pm.filPeerEmitter = emitter
pm.emitter = emitter
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return multierr.Combine(
pm.filPeerEmitter.Close(),
pm.emitter.Close(),
pm.Stop(ctx),
)
},
@ -104,7 +112,7 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
}
func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
_ = pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck
_ = pmgr.emitter.Emit(FilPeerEvt{Type: AddFilPeerEvt, ID: p}) //nolint:errcheck
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
pmgr.peers[p] = time.Duration(0)
@ -127,11 +135,20 @@ func (pmgr *PeerMgr) SetPeerLatency(p peer.ID, latency time.Duration) {
}
func (pmgr *PeerMgr) Disconnect(p peer.ID) {
disconnected := false
if pmgr.h.Network().Connectedness(p) == net.NotConnected {
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
_, disconnected = pmgr.peers[p]
if disconnected {
delete(pmgr.peers, p)
}
pmgr.peersLk.Unlock()
}
if disconnected {
_ = pmgr.emitter.Emit(FilPeerEvt{Type: RemoveFilPeerEvt, ID: p}) //nolint:errcheck
}
}
func (pmgr *PeerMgr) Stop(ctx context.Context) error {