les: close all connected les-server when shutdown (#21426)

* les: close all connected les-server when shutdown

* les: linter nitpick

Co-authored-by: Martin Holst Swende <martin@swende.se>
This commit is contained in:
gary rong 2020-08-07 21:33:00 +08:00 committed by GitHub
parent 8f24097836
commit e401f5ff10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 0 deletions

View File

@ -1288,3 +1288,42 @@ func (ps *serverPeerSet) close() {
} }
ps.closed = true ps.closed = true
} }
// serverSet is a special set which contains all connected les servers.
// Les servers will also be discovered by discovery protocol because they
// also run the LES protocol. We can't drop them although they are useless
// for us(server) but for other protocols(e.g. ETH) upon the devp2p they
// may be useful.
type serverSet struct {
lock sync.Mutex
set map[string]*clientPeer
closed bool
}
func newServerSet() *serverSet {
return &serverSet{set: make(map[string]*clientPeer)}
}
func (s *serverSet) register(peer *clientPeer) error {
s.lock.Lock()
defer s.lock.Unlock()
if s.closed {
return errClosed
}
if _, exist := s.set[peer.id]; exist {
return errAlreadyRegistered
}
s.set[peer.id] = peer
return nil
}
func (s *serverSet) close() {
s.lock.Lock()
defer s.lock.Unlock()
for _, p := range s.set {
p.Disconnect(p2p.DiscQuitting)
}
s.closed = true
}

View File

@ -39,6 +39,7 @@ type LesServer struct {
archiveMode bool // Flag whether the ethereum node runs in archive mode. archiveMode bool // Flag whether the ethereum node runs in archive mode.
peers *clientPeerSet peers *clientPeerSet
serverset *serverSet
handler *serverHandler handler *serverHandler
lesTopics []discv5.Topic lesTopics []discv5.Topic
privateKey *ecdsa.PrivateKey privateKey *ecdsa.PrivateKey
@ -83,6 +84,7 @@ func NewLesServer(node *node.Node, e *eth.Ethereum, config *eth.Config) (*LesSer
}, },
archiveMode: e.ArchiveMode(), archiveMode: e.ArchiveMode(),
peers: newClientPeerSet(), peers: newClientPeerSet(),
serverset: newServerSet(),
lesTopics: lesTopics, lesTopics: lesTopics,
fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}), fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}),
servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100), servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
@ -196,6 +198,9 @@ func (s *LesServer) Start() error {
func (s *LesServer) Stop() error { func (s *LesServer) Stop() error {
close(s.closeCh) close(s.closeCh)
// Disconnect existing connections with other LES servers.
s.serverset.close()
// Disconnect existing sessions. // Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set. // This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to pm.peers yet // sessions which are already established but not added to pm.peers yet

View File

@ -123,6 +123,9 @@ func (h *serverHandler) handle(p *clientPeer) error {
return err return err
} }
if p.server { if p.server {
if err := h.server.serverset.register(p); err != nil {
return err
}
// connected to another server, no messages expected, just wait for disconnection // connected to another server, no messages expected, just wait for disconnection
_, err := p.rw.ReadMsg() _, err := p.rw.ReadMsg()
return err return err