diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index 3177a877e..45983c97c 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -833,15 +833,17 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) { // internal state. func (f *BlockFetcher) forgetHash(hash common.Hash) { // Remove all pending announces and decrement DOS counters - for _, announce := range f.announced[hash] { - f.announces[announce.origin]-- - if f.announces[announce.origin] <= 0 { - delete(f.announces, announce.origin) + if announceMap, ok := f.announced[hash]; ok { + for _, announce := range announceMap { + f.announces[announce.origin]-- + if f.announces[announce.origin] <= 0 { + delete(f.announces, announce.origin) + } + } + delete(f.announced, hash) + if f.announceChangeHook != nil { + f.announceChangeHook(hash, false) } - } - delete(f.announced, hash) - if f.announceChangeHook != nil { - f.announceChangeHook(hash, false) } // Remove any pending fetches and decrement the DOS counters if announce := f.fetching[hash]; announce != nil { diff --git a/eth/fetcher/block_fetcher_test.go b/eth/fetcher/block_fetcher_test.go index 54b5b13fb..b6d1125b5 100644 --- a/eth/fetcher/block_fetcher_test.go +++ b/eth/fetcher/block_fetcher_test.go @@ -698,6 +698,7 @@ func testInvalidNumberAnnouncement(t *testing.T, light bool) { badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0) imported := make(chan interface{}) + announced := make(chan interface{}) tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { if light { if header == nil { @@ -712,9 +713,23 @@ func testInvalidNumberAnnouncement(t *testing.T, light bool) { } } // Announce a block with a bad number, check for immediate drop + tester.fetcher.announceChangeHook = func(hash common.Hash, b bool) { + announced <- nil + } tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher) + verifyAnnounce := func() { + for i := 0; i < 2; i++ { + select { + case <-announced: + continue + case <-time.After(1 * time.Second): + t.Fatal("announce timeout") + return + } + } + } + verifyAnnounce() verifyImportEvent(t, imported, false) - tester.lock.RLock() dropped := tester.drops["bad"] tester.lock.RUnlock() @@ -722,11 +737,11 @@ func testInvalidNumberAnnouncement(t *testing.T, light bool) { if !dropped { t.Fatalf("peer with invalid numbered announcement not dropped") } - goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack) goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0) // Make sure a good announcement passes without a drop tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher) + verifyAnnounce() verifyImportEvent(t, imported, true) tester.lock.RLock() diff --git a/trie/sync_bloom.go b/trie/sync_bloom.go index 1afcce21d..49986fcf0 100644 --- a/trie/sync_bloom.go +++ b/trie/sync_bloom.go @@ -45,11 +45,12 @@ var ( // provided disk database on creation in a background thread and will only start // returning live results once that's finished. type SyncBloom struct { - bloom *bloomfilter.Filter - inited uint32 - closer sync.Once - closed uint32 - pend sync.WaitGroup + bloom *bloomfilter.Filter + inited uint32 + closer sync.Once + closed uint32 + pend sync.WaitGroup + closeCh chan struct{} } // NewSyncBloom creates a new bloom filter of the given size (in megabytes) and @@ -64,7 +65,8 @@ func NewSyncBloom(memory uint64, database ethdb.Iteratee) *SyncBloom { // Assemble the fast sync bloom and init it from previous sessions b := &SyncBloom{ - bloom: bloom, + bloom: bloom, + closeCh: make(chan struct{}), } b.pend.Add(2) go func() { @@ -125,16 +127,15 @@ func (b *SyncBloom) init(database ethdb.Iteratee) { // meter periodically recalculates the false positive error rate of the bloom // filter and reports it in a metric. func (b *SyncBloom) meter() { + // check every second + tick := time.NewTicker(1 * time.Second) for { - // Report the current error ration. No floats, lame, scale it up. - bloomErrorGauge.Update(int64(b.bloom.FalsePosititveProbability() * 100000)) - - // Wait one second, but check termination more frequently - for i := 0; i < 10; i++ { - if atomic.LoadUint32(&b.closed) == 1 { - return - } - time.Sleep(100 * time.Millisecond) + select { + case <-tick.C: + // Report the current error ration. No floats, lame, scale it up. + bloomErrorGauge.Update(int64(b.bloom.FalsePosititveProbability() * 100000)) + case <-b.closeCh: + return } } } @@ -145,6 +146,7 @@ func (b *SyncBloom) Close() error { b.closer.Do(func() { // Ensure the initializer is stopped atomic.StoreUint32(&b.closed, 1) + close(b.closeCh) b.pend.Wait() // Wipe the bloom, but mark it "uninited" just in case someone attempts an access