swarm/network/stream: fix a goroutine leak in Registry (#19139)

* swarm/network/stream: fix a goroutine leak in Registry

* swarm/network, swamr/network/stream: Kademlia close addr count and depth change chans

* swarm/network/stream: rename close channel to quit

* swarm/network/stream: fix sync between NewRegistry goroutine and Close method
This commit is contained in:
Janoš Guljaš 2019-02-20 14:45:25 +01:00 committed by Viktor Trón
parent f49f95e2b0
commit ba2dfa5ce4
2 changed files with 60 additions and 9 deletions

View File

@ -333,6 +333,18 @@ func (k *Kademlia) NeighbourhoodDepthC() <-chan int {
return k.nDepthC return k.nDepthC
} }
// CloseNeighbourhoodDepthC closes the channel returned by
// NeighbourhoodDepthC and stops sending neighbourhood change.
func (k *Kademlia) CloseNeighbourhoodDepthC() {
k.lock.Lock()
defer k.lock.Unlock()
if k.nDepthC != nil {
close(k.nDepthC)
k.nDepthC = nil
}
}
// sendNeighbourhoodDepthChange sends new neighbourhood depth to k.nDepth channel // sendNeighbourhoodDepthChange sends new neighbourhood depth to k.nDepth channel
// if it is initialized. // if it is initialized.
func (k *Kademlia) sendNeighbourhoodDepthChange() { func (k *Kademlia) sendNeighbourhoodDepthChange() {
@ -362,6 +374,18 @@ func (k *Kademlia) AddrCountC() <-chan int {
return k.addrCountC return k.addrCountC
} }
// CloseAddrCountC closes the channel returned by
// AddrCountC and stops sending address count change.
func (k *Kademlia) CloseAddrCountC() {
k.lock.Lock()
defer k.lock.Unlock()
if k.addrCountC != nil {
close(k.addrCountC)
k.addrCountC = nil
}
}
// Off removes a peer from among live peers // Off removes a peer from among live peers
func (k *Kademlia) Off(p *Peer) { func (k *Kademlia) Off(p *Peer) {
k.lock.Lock() k.lock.Lock()

View File

@ -95,6 +95,7 @@ type Registry struct {
spec *protocols.Spec //this protocol's spec spec *protocols.Spec //this protocol's spec
balance protocols.Balance //implements protocols.Balance, for accounting balance protocols.Balance //implements protocols.Balance, for accounting
prices protocols.Prices //implements protocols.Prices, provides prices to accounting prices protocols.Prices //implements protocols.Prices, provides prices to accounting
quit chan struct{} // terminates registry goroutines
} }
// RegistryOptions holds optional values for NewRegistry constructor. // RegistryOptions holds optional values for NewRegistry constructor.
@ -117,6 +118,8 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
// check if retrieval has been disabled // check if retrieval has been disabled
retrieval := options.Retrieval != RetrievalDisabled retrieval := options.Retrieval != RetrievalDisabled
quit := make(chan struct{})
streamer := &Registry{ streamer := &Registry{
addr: localID, addr: localID,
skipCheck: options.SkipCheck, skipCheck: options.SkipCheck,
@ -128,6 +131,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
autoRetrieval: retrieval, autoRetrieval: retrieval,
maxPeerServers: options.MaxPeerServers, maxPeerServers: options.MaxPeerServers,
balance: balance, balance: balance,
quit: quit,
} }
streamer.setupSpec() streamer.setupSpec()
@ -172,25 +176,41 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
go func() { go func() {
defer close(out) defer close(out)
for i := range in { for {
select { select {
case <-out: case i, ok := <-in:
default: if !ok {
return
}
select {
case <-out:
default:
}
out <- i
case <-quit:
return
} }
out <- i
} }
}() }()
return out return out
} }
kad := streamer.delivery.kad
// get notification channels from Kademlia before returning
// from this function to avoid race with Close method and
// the goroutine created below
depthC := latestIntC(kad.NeighbourhoodDepthC())
addressBookSizeC := latestIntC(kad.AddrCountC())
go func() { go func() {
// wait for kademlia table to be healthy // wait for kademlia table to be healthy
time.Sleep(options.SyncUpdateDelay) // but return if Registry is closed before
select {
kad := streamer.delivery.kad case <-time.After(options.SyncUpdateDelay):
depthC := latestIntC(kad.NeighbourhoodDepthC()) case <-quit:
addressBookSizeC := latestIntC(kad.AddrCountC()) return
}
// initial requests for syncing subscription to peers // initial requests for syncing subscription to peers
streamer.updateSyncing() streamer.updateSyncing()
@ -229,6 +249,8 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
<-timer.C <-timer.C
} }
timer.Reset(options.SyncUpdateDelay) timer.Reset(options.SyncUpdateDelay)
case <-quit:
break loop
} }
} }
timer.Stop() timer.Stop()
@ -398,6 +420,11 @@ func (r *Registry) Quit(peerId enode.ID, s Stream) error {
} }
func (r *Registry) Close() error { func (r *Registry) Close() error {
// Stop sending neighborhood depth change and address count
// change from Kademlia that were initiated in NewRegistry constructor.
r.delivery.kad.CloseNeighbourhoodDepthC()
r.delivery.kad.CloseAddrCountC()
close(r.quit)
return r.intervalsStore.Close() return r.intervalsStore.Close()
} }