eth, eth/fetcher: cache future propagated blocks too
This commit is contained in:
parent
057bc237ad
commit
11c8f83a58
@ -56,6 +56,7 @@ type inject struct {
|
|||||||
type Fetcher struct {
|
type Fetcher struct {
|
||||||
// Various event channels
|
// Various event channels
|
||||||
notify chan *announce
|
notify chan *announce
|
||||||
|
insert chan *inject
|
||||||
filter chan chan []*types.Block
|
filter chan chan []*types.Block
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
|
|
||||||
@ -69,6 +70,7 @@ type Fetcher struct {
|
|||||||
func New(hasBlock hashCheckFn, importBlock blockImporterFn, chainHeight chainHeightFn) *Fetcher {
|
func New(hasBlock hashCheckFn, importBlock blockImporterFn, chainHeight chainHeightFn) *Fetcher {
|
||||||
return &Fetcher{
|
return &Fetcher{
|
||||||
notify: make(chan *announce),
|
notify: make(chan *announce),
|
||||||
|
insert: make(chan *inject),
|
||||||
filter: make(chan chan []*types.Block),
|
filter: make(chan chan []*types.Block),
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
hasBlock: hasBlock,
|
hasBlock: hasBlock,
|
||||||
@ -106,6 +108,20 @@ func (f *Fetcher) Notify(peer string, hash common.Hash, time time.Time, fetcher
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Enqueue tries to fill gaps the the fetcher's future import queue.
|
||||||
|
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
|
||||||
|
op := &inject{
|
||||||
|
origin: peer,
|
||||||
|
block: block,
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case f.insert <- op:
|
||||||
|
return nil
|
||||||
|
case <-f.quit:
|
||||||
|
return errTerminated
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Filter extracts all the blocks that were explicitly requested by the fetcher,
|
// Filter extracts all the blocks that were explicitly requested by the fetcher,
|
||||||
// returning those that should be handled differently.
|
// returning those that should be handled differently.
|
||||||
func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks {
|
func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks {
|
||||||
@ -192,6 +208,11 @@ func (f *Fetcher) loop() {
|
|||||||
}
|
}
|
||||||
announced[notification.hash] = append(announced[notification.hash], notification)
|
announced[notification.hash] = append(announced[notification.hash], notification)
|
||||||
|
|
||||||
|
case op := <-f.insert:
|
||||||
|
// A direct block insertion was requested, try and fill any pending gaps
|
||||||
|
queued.Push(op, -float32(op.block.NumberU64()))
|
||||||
|
glog.V(logger.Detail).Infof("Peer %s: filled block %x, total %v", op.origin, op.block.Hash().Bytes()[:4], queued.Size())
|
||||||
|
|
||||||
case hash := <-done:
|
case hash := <-done:
|
||||||
// A pending import finished, remove all traces of the notification
|
// A pending import finished, remove all traces of the notification
|
||||||
delete(announced, hash)
|
delete(announced, hash)
|
||||||
|
@ -271,3 +271,31 @@ func TestRandomArrivalImport(t *testing.T) {
|
|||||||
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
|
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests that direct block enqueues (due to block propagation vs. hash announce)
|
||||||
|
// are correctly schedule, filling and import queue gaps.
|
||||||
|
func TestQueueGapFill(t *testing.T) {
|
||||||
|
// Create a chain of blocks to import, and choose one to not announce at all
|
||||||
|
targetBlocks := 24
|
||||||
|
hashes := createHashes(targetBlocks, knownHash)
|
||||||
|
blocks := createBlocksFromHashes(hashes)
|
||||||
|
skip := targetBlocks / 2
|
||||||
|
|
||||||
|
tester := newTester()
|
||||||
|
fetcher := tester.makeFetcher(blocks)
|
||||||
|
|
||||||
|
// Iteratively announce blocks, skipping one entry
|
||||||
|
for i := len(hashes) - 1; i >= 0; i-- {
|
||||||
|
if i != skip {
|
||||||
|
tester.fetcher.Notify("valid", hashes[i], time.Now().Add(-arriveTimeout), fetcher)
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Fill the missing block directly as if propagated
|
||||||
|
tester.fetcher.Enqueue("valid", blocks[hashes[skip]])
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
|
if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
|
||||||
|
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -340,6 +340,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
|
|||||||
}
|
}
|
||||||
request.Block.ReceivedAt = msg.ReceivedAt
|
request.Block.ReceivedAt = msg.ReceivedAt
|
||||||
|
|
||||||
|
// Try to import the propagated block, also making it fill any fetcher gaps
|
||||||
|
self.fetcher.Enqueue(p.id, request.Block)
|
||||||
if err := self.importBlock(p, request.Block, request.TD); err != nil {
|
if err := self.importBlock(p, request.Block, request.TD); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user