combine add and remove evts and use a single emitter
to reduce likelihood of races
This commit is contained in:
parent
7aec500384
commit
ef444676c2
@ -11,7 +11,6 @@ import (
|
|||||||
host "github.com/libp2p/go-libp2p-core/host"
|
host "github.com/libp2p/go-libp2p-core/host"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
"go.uber.org/multierr"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||||
@ -39,34 +38,26 @@ func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeer
|
|||||||
pmgr: pmgr,
|
pmgr: pmgr,
|
||||||
}
|
}
|
||||||
|
|
||||||
addSub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer))
|
evtSub, err := h.EventBus().Subscribe(new(peermgr.FilPeerEvt))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for newPeer := range addSub.Out() {
|
for evt := range evtSub.Out() {
|
||||||
bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id)
|
pEvt := evt.(peermgr.FilPeerEvt)
|
||||||
|
switch pEvt.Type {
|
||||||
|
case peermgr.AddFilPeerEvt:
|
||||||
|
bsPt.addPeer(pEvt.ID)
|
||||||
|
case peermgr.RemoveFilPeerEvt:
|
||||||
|
bsPt.removePeer(pEvt.ID)
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
|
|
||||||
rmSub, err := h.EventBus().Subscribe(new(peermgr.RemoveFilPeer))
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for rmPeer := range rmSub.Out() {
|
|
||||||
bsPt.removePeer(rmPeer.(peermgr.RemoveFilPeer).Id)
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStop: func(ctx context.Context) error {
|
OnStop: func(ctx context.Context) error {
|
||||||
return multierr.Combine(
|
return evtSub.Close()
|
||||||
addSub.Close(),
|
|
||||||
rmSub.Close(),
|
|
||||||
)
|
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -54,19 +54,22 @@ type PeerMgr struct {
|
|||||||
dht *dht.IpfsDHT
|
dht *dht.IpfsDHT
|
||||||
|
|
||||||
notifee *net.NotifyBundle
|
notifee *net.NotifyBundle
|
||||||
addPeerEmitter event.Emitter
|
emitter event.Emitter
|
||||||
rmPeerEmitter event.Emitter
|
|
||||||
|
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type NewFilPeer struct {
|
type FilPeerEvt struct {
|
||||||
Id peer.ID
|
Type FilPeerEvtType
|
||||||
|
ID peer.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
type RemoveFilPeer struct {
|
type FilPeerEvtType int
|
||||||
Id peer.ID
|
|
||||||
}
|
const (
|
||||||
|
AddFilPeerEvt FilPeerEvtType = iota
|
||||||
|
RemoveFilPeerEvt
|
||||||
|
)
|
||||||
|
|
||||||
func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) {
|
func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) {
|
||||||
pm := &PeerMgr{
|
pm := &PeerMgr{
|
||||||
@ -82,23 +85,16 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
|
|||||||
|
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
emitter, err := h.EventBus().Emitter(new(NewFilPeer))
|
emitter, err := h.EventBus().Emitter(new(FilPeerEvt))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err)
|
return nil, xerrors.Errorf("creating FilPeerEvt emitter: %w", err)
|
||||||
}
|
}
|
||||||
pm.addPeerEmitter = emitter
|
pm.emitter = emitter
|
||||||
|
|
||||||
emitter, err = h.EventBus().Emitter(new(RemoveFilPeer))
|
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("creating RemoveFilPeer emitter: %w", err)
|
|
||||||
}
|
|
||||||
pm.rmPeerEmitter = emitter
|
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStop: func(ctx context.Context) error {
|
OnStop: func(ctx context.Context) error {
|
||||||
return multierr.Combine(
|
return multierr.Combine(
|
||||||
pm.addPeerEmitter.Close(),
|
pm.emitter.Close(),
|
||||||
pm.rmPeerEmitter.Close(),
|
|
||||||
pm.Stop(ctx),
|
pm.Stop(ctx),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
@ -116,7 +112,7 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
|
func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
|
||||||
_ = pmgr.addPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck
|
_ = pmgr.emitter.Emit(FilPeerEvt{Type: AddFilPeerEvt, ID: p}) //nolint:errcheck
|
||||||
pmgr.peersLk.Lock()
|
pmgr.peersLk.Lock()
|
||||||
defer pmgr.peersLk.Unlock()
|
defer pmgr.peersLk.Unlock()
|
||||||
pmgr.peers[p] = time.Duration(0)
|
pmgr.peers[p] = time.Duration(0)
|
||||||
@ -151,7 +147,7 @@ func (pmgr *PeerMgr) Disconnect(p peer.ID) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if disconnected {
|
if disconnected {
|
||||||
_ = pmgr.rmPeerEmitter.Emit(RemoveFilPeer{Id: p}) //nolint:errcheck
|
_ = pmgr.emitter.Emit(FilPeerEvt{Type: RemoveFilPeerEvt, ID: p}) //nolint:errcheck
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user