eth: clean up pending announce download map, polish logs
This commit is contained in:
parent
9ed166c196
commit
8216bb901c
@ -362,7 +362,7 @@ func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int)
|
|||||||
_, chainHead, _ := pm.chainman.Status()
|
_, chainHead, _ := pm.chainman.Status()
|
||||||
jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
|
jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
|
||||||
BlockHash: hash.Hex(),
|
BlockHash: hash.Hex(),
|
||||||
BlockNumber: block.Number(), // this surely must be zero
|
BlockNumber: block.Number(),
|
||||||
ChainHeadHash: chainHead.Hex(),
|
ChainHeadHash: chainHead.Hex(),
|
||||||
BlockPrevHash: block.ParentHash().Hex(),
|
BlockPrevHash: block.ParentHash().Hex(),
|
||||||
RemoteId: p.ID().String(),
|
RemoteId: p.ID().String(),
|
||||||
|
38
eth/sync.go
38
eth/sync.go
@ -43,7 +43,7 @@ func (pm *ProtocolManager) fetcher() {
|
|||||||
select {
|
select {
|
||||||
case notifications := <-pm.newHashCh:
|
case notifications := <-pm.newHashCh:
|
||||||
// A batch of hashes the notified, schedule them for retrieval
|
// A batch of hashes the notified, schedule them for retrieval
|
||||||
glog.V(logger.Detail).Infof("Scheduling %d hash announces from %s", len(notifications), notifications[0].peer.id)
|
glog.V(logger.Debug).Infof("Scheduling %d hash announcements from %s", len(notifications), notifications[0].peer.id)
|
||||||
for _, announce := range notifications {
|
for _, announce := range notifications {
|
||||||
announces[announce.hash] = announce
|
announces[announce.hash] = announce
|
||||||
}
|
}
|
||||||
@ -70,13 +70,13 @@ func (pm *ProtocolManager) fetcher() {
|
|||||||
}
|
}
|
||||||
// Send out all block requests
|
// Send out all block requests
|
||||||
for peer, hashes := range request {
|
for peer, hashes := range request {
|
||||||
glog.V(logger.Detail).Infof("Fetching specific %d blocks from %s", len(hashes), peer.id)
|
glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id)
|
||||||
peer.requestBlocks(hashes)
|
peer.requestBlocks(hashes)
|
||||||
}
|
}
|
||||||
request = make(map[*peer][]common.Hash)
|
request = make(map[*peer][]common.Hash)
|
||||||
|
|
||||||
case filter := <-pm.newBlockCh:
|
case filter := <-pm.newBlockCh:
|
||||||
// Blocks arrived, extract any explicit requests, return all else
|
// Blocks arrived, extract any explicit fetches, return all else
|
||||||
var blocks types.Blocks
|
var blocks types.Blocks
|
||||||
select {
|
select {
|
||||||
case blocks = <-filter:
|
case blocks = <-filter:
|
||||||
@ -84,26 +84,38 @@ func (pm *ProtocolManager) fetcher() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
fetch, sync := []*types.Block{}, []*types.Block{}
|
explicit, download := []*types.Block{}, []*types.Block{}
|
||||||
for _, block := range blocks {
|
for _, block := range blocks {
|
||||||
hash := block.Hash()
|
hash := block.Hash()
|
||||||
|
|
||||||
|
// Filter explicitly requested blocks from hash announcements
|
||||||
if _, ok := pending[hash]; ok {
|
if _, ok := pending[hash]; ok {
|
||||||
fetch = append(fetch, block)
|
// Discard if already imported by other means
|
||||||
|
if !pm.chainman.HasBlock(hash) {
|
||||||
|
explicit = append(explicit, block)
|
||||||
|
} else {
|
||||||
|
delete(pending, hash)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
sync = append(sync, block)
|
download = append(download, block)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case filter <- sync:
|
case filter <- download:
|
||||||
case <-pm.quitSync:
|
case <-pm.quitSync:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// If any explicit fetches were replied to, import them
|
// If any explicit fetches were replied to, import them
|
||||||
if len(fetch) > 0 {
|
if count := len(explicit); count > 0 {
|
||||||
|
glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", count)
|
||||||
go func() {
|
go func() {
|
||||||
for _, block := range fetch {
|
for _, block := range explicit {
|
||||||
if announce := pending[block.Hash()]; announce != nil {
|
hash := block.Hash()
|
||||||
|
|
||||||
|
// Make sure there's still something pending to import
|
||||||
|
if announce := pending[hash]; announce != nil {
|
||||||
|
delete(pending, hash)
|
||||||
if err := pm.importBlock(announce.peer, block, nil); err != nil {
|
if err := pm.importBlock(announce.peer, block, nil); err != nil {
|
||||||
glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
|
glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
|
||||||
return
|
return
|
||||||
@ -207,15 +219,15 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Get the hashes from the peer (synchronously)
|
// Get the hashes from the peer (synchronously)
|
||||||
glog.V(logger.Debug).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash)
|
glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash)
|
||||||
|
|
||||||
err := pm.downloader.Synchronise(peer.id, peer.recentHash)
|
err := pm.downloader.Synchronise(peer.id, peer.recentHash)
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
glog.V(logger.Debug).Infof("Synchronisation completed")
|
glog.V(logger.Detail).Infof("Synchronisation completed")
|
||||||
|
|
||||||
case downloader.ErrBusy:
|
case downloader.ErrBusy:
|
||||||
glog.V(logger.Debug).Infof("Synchronisation already in progress")
|
glog.V(logger.Detail).Infof("Synchronisation already in progress")
|
||||||
|
|
||||||
case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrEmptyHashSet, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed:
|
case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrEmptyHashSet, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed:
|
||||||
glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err)
|
glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err)
|
||||||
|
Loading…
Reference in New Issue
Block a user