a8a2b2a488
When a parent is missing in the block list an attempt should be made to fetch the missing parent and grandparents.
131 lines
2.8 KiB
Go
131 lines
2.8 KiB
Go
package downloader
|
|
|
|
import (
|
|
"math"
|
|
"math/big"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"gopkg.in/fatih/set.v0"
|
|
)
|
|
|
|
// queue represents hashes that are either need fetching or are being fetched
|
|
type queue struct {
|
|
hashPool *set.Set
|
|
fetchPool *set.Set
|
|
blockHashes *set.Set
|
|
|
|
mu sync.Mutex
|
|
fetching map[string]*chunk
|
|
blocks []*types.Block
|
|
}
|
|
|
|
func newqueue() *queue {
|
|
return &queue{
|
|
hashPool: set.New(),
|
|
fetchPool: set.New(),
|
|
blockHashes: set.New(),
|
|
fetching: make(map[string]*chunk),
|
|
}
|
|
}
|
|
|
|
// reserve a `max` set of hashes for `p` peer.
|
|
func (c *queue) get(p *peer, max int) *chunk {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// return nothing if the pool has been depleted
|
|
if c.hashPool.Size() == 0 {
|
|
return nil
|
|
}
|
|
|
|
limit := int(math.Min(float64(max), float64(c.hashPool.Size())))
|
|
// Create a new set of hashes
|
|
hashes, i := set.New(), 0
|
|
c.hashPool.Each(func(v interface{}) bool {
|
|
if i == limit {
|
|
return false
|
|
}
|
|
|
|
hashes.Add(v)
|
|
i++
|
|
|
|
return true
|
|
})
|
|
// remove the fetchable hashes from hash pool
|
|
c.hashPool.Separate(hashes)
|
|
c.fetchPool.Merge(hashes)
|
|
|
|
// Create a new chunk for the seperated hashes. The time is being used
|
|
// to reset the chunk (timeout)
|
|
chunk := &chunk{hashes, time.Now()}
|
|
// register as 'fetching' state
|
|
c.fetching[p.id] = chunk
|
|
|
|
// create new chunk for peer
|
|
return chunk
|
|
}
|
|
|
|
func (c *queue) has(hash common.Hash) bool {
|
|
return c.hashPool.Has(hash) || c.fetchPool.Has(hash)
|
|
}
|
|
|
|
func (c *queue) addBlock(id string, block *types.Block, td *big.Int) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// when adding a block make sure it doesn't already exist
|
|
if !c.blockHashes.Has(block.Hash()) {
|
|
c.hashPool.Remove(block.Hash())
|
|
c.blocks = append(c.blocks, block)
|
|
}
|
|
}
|
|
|
|
// deliver delivers a chunk to the queue that was requested of the peer
|
|
func (c *queue) deliver(id string, blocks []*types.Block) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
chunk := c.fetching[id]
|
|
// If the chunk was never requested simply ignore it
|
|
if chunk != nil {
|
|
delete(c.fetching, id)
|
|
|
|
// seperate the blocks and the hashes
|
|
blockHashes := chunk.fetchedHashes(blocks)
|
|
// merge block hashes
|
|
c.blockHashes.Merge(blockHashes)
|
|
// Add the blocks
|
|
c.blocks = append(c.blocks, blocks...)
|
|
|
|
// Add back whatever couldn't be delivered
|
|
c.hashPool.Merge(chunk.hashes)
|
|
c.fetchPool.Separate(chunk.hashes)
|
|
}
|
|
}
|
|
|
|
// puts puts sets of hashes on to the queue for fetching
|
|
func (c *queue) put(hashes *set.Set) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
c.hashPool.Merge(hashes)
|
|
}
|
|
|
|
type chunk struct {
|
|
hashes *set.Set
|
|
itime time.Time
|
|
}
|
|
|
|
func (ch *chunk) fetchedHashes(blocks []*types.Block) *set.Set {
|
|
fhashes := set.New()
|
|
for _, block := range blocks {
|
|
fhashes.Add(block.Hash())
|
|
}
|
|
ch.hashes.Separate(fhashes)
|
|
|
|
return fhashes
|
|
}
|