package downloader

import (
	"math"
	"math/big"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"gopkg.in/fatih/set.v0"
)

// queue tracks the hashes that still need fetching, the hashes that have been
// handed out to peers, and the blocks received so far.
type queue struct {
	hashPool    *set.Set // hashes waiting to be reserved through get
	fetchPool   *set.Set // hashes that have been reserved by a peer
	blockHashes *set.Set // hashes of blocks received through addBlock or deliver

	mu       sync.Mutex        // held by every method of queue
	fetching map[string]*chunk // outstanding chunk per peer id

	blocks []*types.Block // received blocks, in arrival order
}

// newqueue returns an empty queue that is ready for use.
func newqueue() *queue {
	return &queue{
		hashPool:    set.New(),
		fetchPool:   set.New(),
		blockHashes: set.New(),
		fetching:    make(map[string]*chunk),
	}
}

// reset drops all pending hashes, outstanding chunks and received blocks.
func (q *queue) reset() {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.hashPool.Clear()
	q.fetchPool.Clear()
	q.blockHashes.Clear()
	q.blocks = nil
	q.fetching = make(map[string]*chunk)
}

// get reserves at most max hashes from the hash pool for peer p and returns
// them as a chunk. It returns nil when the hash pool is empty. A max of zero
// or less yields (and registers) an empty chunk. Any chunk previously
// registered for the same peer id is replaced.
func (q *queue) get(p *peer, max int) *chunk {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Return nothing if the pool has been depleted.
	if q.hashPool.Size() == 0 {
		return nil
	}

	limit := int(math.Min(float64(max), float64(q.hashPool.Size())))

	// Collect up to limit hashes. The >= comparison (rather than ==) makes
	// sure a negative max stops immediately instead of draining the pool.
	hashes, i := set.New(), 0
	q.hashPool.Each(func(v interface{}) bool {
		if i >= limit {
			return false
		}
		hashes.Add(v)
		i++
		return true
	})

	// Move the reserved hashes from the hash pool to the fetch pool.
	q.hashPool.Separate(hashes)
	q.fetchPool.Merge(hashes)

	// Create a new chunk for the separated hashes. The creation time is
	// recorded on the chunk.
	// NOTE(review): presumably used elsewhere to time the chunk out - confirm.
	ch := &chunk{hashes: hashes, itime: time.Now()}

	// Register the chunk as being fetched by this peer.
	q.fetching[p.id] = ch

	return ch
}

// has reports whether hash is either waiting to be fetched or has been
// reserved by a peer.
func (q *queue) has(hash common.Hash) bool {
	// Take the lock so that a hash which get is moving from hashPool to
	// fetchPool is never observed as missing from both.
	q.mu.Lock()
	defer q.mu.Unlock()

	return q.hashPool.Has(hash) || q.fetchPool.Has(hash)
}

// addBlock stores block unless a block with the same hash has already been
// recorded. The id and td parameters are currently unused.
func (q *queue) addBlock(id string, block *types.Block, td *big.Int) {
	q.mu.Lock()
	defer q.mu.Unlock()

	hash := block.Hash()

	// When adding a block make sure it doesn't already exist.
	if q.blockHashes.Has(hash) {
		return
	}
	// Record the hash so that a later addBlock for the same block is ignored.
	q.blockHashes.Add(hash)
	q.hashPool.Remove(hash)
	q.blocks = append(q.blocks, block)
}

// deliver hands the blocks received from peer id to the queue. The delivery
// is ignored when no chunk is registered for that peer.
func (q *queue) deliver(id string, blocks []*types.Block) {
	q.mu.Lock()
	defer q.mu.Unlock()

	ch := q.fetching[id]
	// If the chunk was never requested simply ignore it.
	if ch == nil {
		return
	}
	delete(q.fetching, id)

	// Separate the delivered hashes from the chunk; afterwards ch.hashes only
	// holds what the peer failed to deliver.
	delivered := ch.fetchedHashes(blocks)

	// Record the delivered hashes and blocks.
	q.blockHashes.Merge(delivered)
	q.blocks = append(q.blocks, blocks...)

	// Add back whatever couldn't be delivered so it can be reserved again.
	q.hashPool.Merge(ch.hashes)
	// Only the undelivered hashes are removed from the fetch pool; delivered
	// hashes stay in it.
	// NOTE(review): confirm that keeping delivered hashes in fetchPool (so
	// that has keeps reporting them) is intended.
	q.fetchPool.Separate(ch.hashes)
}

// put adds a set of hashes to the queue for fetching.
func (q *queue) put(hashes *set.Set) {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.hashPool.Merge(hashes)
}

// chunk is a set of hashes reserved through queue.get, together with the time
// at which it was created.
type chunk struct {
	hashes *set.Set
	itime  time.Time
}

// fetchedHashes returns the set of hashes of the given blocks and removes
// those hashes from the chunk, leaving only the hashes that were not covered
// by blocks.
func (ch *chunk) fetchedHashes(blocks []*types.Block) *set.Set {
	fetched := set.New()
	for _, block := range blocks {
		fetched.Add(block.Hash())
	}
	ch.hashes.Separate(fetched)

	return fetched
}