eth/fetcher: lower max cache size, add timeout slack

This commit is contained in:
Péter Szilágyi 2015-06-19 16:46:16 +03:00
parent d5871fc200
commit 8c4c7ea192

View File

@ -3,6 +3,7 @@ package fetcher
import ( import (
"errors" "errors"
"fmt"
"math/rand" "math/rand"
"time" "time"
@ -15,9 +16,10 @@ import (
const ( const (
arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
fetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block fetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
maxUncleDist = 7 // Maximum allowed backward distance from the chain head maxUncleDist = 7 // Maximum allowed backward distance from the chain head
maxQueueDist = 256 // Maximum allowed distance from the chain head to queue maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
) )
var ( var (
@ -239,7 +241,7 @@ func (f *Fetcher) loop() {
request := make(map[string][]common.Hash) request := make(map[string][]common.Hash)
for hash, announces := range f.announced { for hash, announces := range f.announced {
if time.Since(announces[0].time) > arriveTimeout { if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
announce := announces[rand.Intn(len(announces))] announce := announces[rand.Intn(len(announces))]
if f.getBlock(hash) == nil { if f.getBlock(hash) == nil {
request[announce.origin] = append(request[announce.origin], hash) request[announce.origin] = append(request[announce.origin], hash)
@ -249,7 +251,16 @@ func (f *Fetcher) loop() {
} }
} }
// Send out all block requests // Send out all block requests
for _, hashes := range request { for peer, hashes := range request {
if glog.V(logger.Detail) && len(hashes) > 0 {
list := "["
for _, hash := range hashes {
list += fmt.Sprintf("%x, ", hash[:4])
}
list = list[:len(list)-2] + "]"
glog.V(logger.Detail).Infof("Peer %s: fetching %s", peer, list)
}
go f.fetching[hashes[0]].fetch(hashes) go f.fetching[hashes[0]].fetch(hashes)
} }
// Schedule the next fetch if blocks are still pending // Schedule the next fetch if blocks are still pending
@ -319,7 +330,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
// Discard any past or too distant blocks // Discard any past or too distant blocks
if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
glog.V(logger.Detail).Infof("Peer %s: discarded block #%d [%x], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist) glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist)
return return
} }
// Schedule the block for future importing // Schedule the block for future importing