eth: fetch announced hashes from origin, periodically
This commit is contained in:
parent
8c012e103f
commit
fdccce781e
@ -21,7 +21,8 @@ import (
|
||||
const (
|
||||
forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
|
||||
blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
|
||||
blockArrivalTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
|
||||
notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
|
||||
notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
|
||||
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
|
||||
blockProcAmount = 256
|
||||
)
|
||||
@ -57,6 +58,7 @@ type ProtocolManager struct {
|
||||
minedBlockSub event.Subscription
|
||||
|
||||
newPeerCh chan *peer
|
||||
newHashCh chan []*blockAnnounce
|
||||
quitSync chan struct{}
|
||||
// wait group is used for graceful shutdowns during downloading
|
||||
// and processing
|
||||
@ -74,6 +76,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
|
||||
downloader: downloader,
|
||||
peers: newPeerSet(),
|
||||
newPeerCh: make(chan *peer, 1),
|
||||
newHashCh: make(chan []*blockAnnounce, 1),
|
||||
quitSync: make(chan struct{}),
|
||||
}
|
||||
|
||||
@ -121,7 +124,8 @@ func (pm *ProtocolManager) Start() {
|
||||
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
|
||||
go pm.minedBroadcastLoop()
|
||||
|
||||
go pm.update()
|
||||
go pm.syncer()
|
||||
go pm.fetcher()
|
||||
}
|
||||
|
||||
func (pm *ProtocolManager) Stop() {
|
||||
@ -302,32 +306,24 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
|
||||
p.blockHashes.Add(hash)
|
||||
p.recentHash = hash
|
||||
}
|
||||
// Wait a bit for potentially receiving the blocks, fetch if not
|
||||
go func() {
|
||||
time.Sleep(blockArrivalTimeout)
|
||||
|
||||
// Drop all the hashes that are already known
|
||||
unknown := make([]common.Hash, 0, len(hashes))
|
||||
for _, hash := range hashes {
|
||||
if !self.chainman.HasBlock(hash) {
|
||||
unknown = append(unknown, hash)
|
||||
}
|
||||
// Schedule all the unknown hashes for retrieval
|
||||
unknown := make([]common.Hash, 0, len(hashes))
|
||||
for _, hash := range hashes {
|
||||
if !self.chainman.HasBlock(hash) {
|
||||
unknown = append(unknown, hash)
|
||||
}
|
||||
if len(unknown) == 0 {
|
||||
return
|
||||
}
|
||||
announces := make([]*blockAnnounce, len(unknown))
|
||||
for i, hash := range unknown {
|
||||
announces[i] = &blockAnnounce{
|
||||
hash: hash,
|
||||
peer: p,
|
||||
time: time.Now(),
|
||||
}
|
||||
// Retrieve all the unknown hashes
|
||||
if err := p.requestBlocks(unknown); err != nil {
|
||||
glog.V(logger.Debug).Infof("%s: failed to request blocks: %v", p.id, err)
|
||||
}
|
||||
if glog.V(logger.Detail) {
|
||||
hashes := make([]string, len(unknown))
|
||||
for i, hash := range unknown {
|
||||
hashes[i] = fmt.Sprintf("%x", hash[:4])
|
||||
}
|
||||
glog.Infof("%s: requested blocks explicitly: %v", p.id, hashes)
|
||||
}
|
||||
}()
|
||||
}
|
||||
if len(announces) > 0 {
|
||||
self.newHashCh <- announces
|
||||
}
|
||||
|
||||
case NewBlockMsg:
|
||||
var request newBlockMsgData
|
||||
@ -407,13 +403,13 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block)
|
||||
split := int(math.Sqrt(float64(len(peers))))
|
||||
|
||||
transfer := peers[:split]
|
||||
nofity := peers[split:]
|
||||
notify := peers[split:]
|
||||
|
||||
// Send out the data transfers and the notifications
|
||||
for _, peer := range nofity {
|
||||
for _, peer := range notify {
|
||||
peer.sendNewBlockHashes([]common.Hash{hash})
|
||||
}
|
||||
glog.V(logger.Detail).Infoln("broadcast hash to", len(nofity), "peers.")
|
||||
glog.V(logger.Detail).Infoln("broadcast hash to", len(notify), "peers.")
|
||||
|
||||
for _, peer := range transfer {
|
||||
peer.sendNewBlock(block)
|
||||
|
58
eth/sync.go
58
eth/sync.go
@ -5,15 +5,67 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/eth/downloader"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
)
|
||||
|
||||
// update periodically tries to synchronise with the network, both downloading
|
||||
// hashes and blocks as well as retrieving cached ones.
|
||||
func (pm *ProtocolManager) update() {
|
||||
// blockAnnounce is the hash notification of the availability of a new block in
|
||||
// the network.
|
||||
type blockAnnounce struct {
|
||||
hash common.Hash
|
||||
peer *peer
|
||||
time time.Time
|
||||
}
|
||||
|
||||
// fetcher is responsible for collecting hash notifications, and periodically
|
||||
// checking all unknown ones and individually fetching them.
|
||||
func (pm *ProtocolManager) fetcher() {
|
||||
announces := make(map[common.Hash]*blockAnnounce)
|
||||
request := make(map[*peer][]common.Hash)
|
||||
cycle := time.Tick(notifyCheckCycle)
|
||||
|
||||
// Iterate the block fetching until a quit is requested
|
||||
for {
|
||||
select {
|
||||
case notifications := <-pm.newHashCh:
|
||||
// A batch of hashes the notified, schedule them for retrieval
|
||||
glog.V(logger.Detail).Infof("Scheduling %d hash announces from %s", len(notifications), notifications[0].peer.id)
|
||||
for _, announce := range notifications {
|
||||
announces[announce.hash] = announce
|
||||
}
|
||||
|
||||
case <-cycle:
|
||||
// Check if any notified blocks failed to arrive
|
||||
for hash, announce := range announces {
|
||||
if time.Since(announce.time) > notifyArriveTimeout {
|
||||
if !pm.chainman.HasBlock(hash) {
|
||||
request[announce.peer] = append(request[announce.peer], hash)
|
||||
}
|
||||
delete(announces, hash)
|
||||
}
|
||||
}
|
||||
if len(request) == 0 {
|
||||
break
|
||||
}
|
||||
// Send out all block requests
|
||||
for peer, hashes := range request {
|
||||
glog.V(logger.Detail).Infof("Fetching specific %d blocks from %s", len(hashes), peer.id)
|
||||
peer.requestBlocks(hashes)
|
||||
}
|
||||
request = make(map[*peer][]common.Hash)
|
||||
|
||||
case <-pm.quitSync:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// syncer is responsible for periodically synchronising with the network, both
|
||||
// downloading hashes and blocks as well as retrieving cached ones.
|
||||
func (pm *ProtocolManager) syncer() {
|
||||
forceSync := time.Tick(forceSyncCycle)
|
||||
blockProc := time.Tick(blockProcCycle)
|
||||
blockProcPend := int32(0)
|
||||
|
Loading…
Reference in New Issue
Block a user