Merge pull request #2478 from filecoin-project/move-mutex
Graceful shutdown of PeerMgr
This commit is contained in:
commit
f38cbb6134
@ -51,9 +51,11 @@ type PeerMgr struct {
|
|||||||
dht *dht.IpfsDHT
|
dht *dht.IpfsDHT
|
||||||
|
|
||||||
notifee *net.NotifyBundle
|
notifee *net.NotifyBundle
|
||||||
|
|
||||||
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPeerMgr(h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) *PeerMgr {
|
func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) *PeerMgr {
|
||||||
pm := &PeerMgr{
|
pm := &PeerMgr{
|
||||||
h: h,
|
h: h,
|
||||||
dht: dht,
|
dht: dht,
|
||||||
@ -64,8 +66,16 @@ func NewPeerMgr(h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers)
|
|||||||
|
|
||||||
maxFilPeers: MaxFilPeers,
|
maxFilPeers: MaxFilPeers,
|
||||||
minFilPeers: MinFilPeers,
|
minFilPeers: MinFilPeers,
|
||||||
|
|
||||||
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lc.Append(fx.Hook{
|
||||||
|
OnStop: func(ctx context.Context) error {
|
||||||
|
return pm.Stop(ctx)
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
pm.notifee = &net.NotifyBundle{
|
pm.notifee = &net.NotifyBundle{
|
||||||
DisconnectedF: func(_ net.Network, c net.Conn) {
|
DisconnectedF: func(_ net.Network, c net.Conn) {
|
||||||
pm.Disconnect(c.RemotePeer())
|
pm.Disconnect(c.RemotePeer())
|
||||||
@ -107,6 +117,12 @@ func (pmgr *PeerMgr) Disconnect(p peer.ID) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pmgr *PeerMgr) Stop(ctx context.Context) error {
|
||||||
|
log.Warn("closing peermgr done")
|
||||||
|
close(pmgr.done)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (pmgr *PeerMgr) Run(ctx context.Context) {
|
func (pmgr *PeerMgr) Run(ctx context.Context) {
|
||||||
tick := build.Clock.Ticker(time.Second * 5)
|
tick := build.Clock.Ticker(time.Second * 5)
|
||||||
for {
|
for {
|
||||||
@ -119,6 +135,9 @@ func (pmgr *PeerMgr) Run(ctx context.Context) {
|
|||||||
log.Debug("peer count about threshold: %d > %d", pcount, pmgr.maxFilPeers)
|
log.Debug("peer count about threshold: %d > %d", pcount, pmgr.maxFilPeers)
|
||||||
}
|
}
|
||||||
stats.Record(ctx, metrics.PeerCount.M(int64(pmgr.getPeerCount())))
|
stats.Record(ctx, metrics.PeerCount.M(int64(pmgr.getPeerCount())))
|
||||||
|
case <-pmgr.done:
|
||||||
|
log.Warn("exiting peermgr run")
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -91,12 +91,13 @@ func (m *Miner) Start(ctx context.Context) error {
|
|||||||
|
|
||||||
func (m *Miner) Stop(ctx context.Context) error {
|
func (m *Miner) Stop(ctx context.Context) error {
|
||||||
m.lk.Lock()
|
m.lk.Lock()
|
||||||
defer m.lk.Unlock()
|
|
||||||
|
|
||||||
m.stopping = make(chan struct{})
|
m.stopping = make(chan struct{})
|
||||||
stopping := m.stopping
|
stopping := m.stopping
|
||||||
close(m.stop)
|
close(m.stop)
|
||||||
|
|
||||||
|
m.lk.Unlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-stopping:
|
case <-stopping:
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
Reference in New Issue
Block a user