diff --git a/les/distributor.go b/les/distributor.go index 4d2be1b8f..97d2ccdfe 100644 --- a/les/distributor.go +++ b/les/distributor.go @@ -22,6 +22,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/les/utils" ) // requestDistributor implements a mechanism that distributes requests to @@ -194,7 +195,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { elem := d.reqQueue.Front() var ( bestWait time.Duration - sel *weightedRandomSelect + sel *utils.WeightedRandomSelect ) d.peerLock.RLock() @@ -219,9 +220,9 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { wait, bufRemain := peer.waitBefore(cost) if wait == 0 { if sel == nil { - sel = newWeightedRandomSelect() + sel = utils.NewWeightedRandomSelect() } - sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1}) + sel.Update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1}) } else { if bestWait == 0 || wait < bestWait { bestWait = wait @@ -239,7 +240,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { } if sel != nil { - c := sel.choose().(selectPeerItem) + c := sel.Choose().(selectPeerItem) return c.peer, c.req, 0 } return nil, nil, bestWait diff --git a/les/peer.go b/les/peer.go index d308fd249..c2f4235eb 100644 --- a/les/peer.go +++ b/les/peer.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/les/flowcontrol" + "github.com/ethereum/go-ethereum/les/utils" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" @@ -135,7 +136,7 @@ type peerCommons struct { headInfo blockInfo // Latest block information. // Background task queue for caching peer tasks and executing in order. - sendQueue *execQueue + sendQueue *utils.ExecQueue // Flow control agreement. fcParams flowcontrol.ServerParams // The config for token bucket. @@ -153,13 +154,13 @@ func (p *peerCommons) isFrozen() bool { // canQueue returns an indicator whether the peer can queue a operation. func (p *peerCommons) canQueue() bool { - return p.sendQueue.canQueue() && !p.isFrozen() + return p.sendQueue.CanQueue() && !p.isFrozen() } // queueSend caches a peer operation in the background task queue. // Please ensure to check `canQueue` before call this function func (p *peerCommons) queueSend(f func()) bool { - return p.sendQueue.queue(f) + return p.sendQueue.Queue(f) } // mustQueueSend starts a for loop and retry the caching if failed. @@ -337,7 +338,7 @@ func (p *peerCommons) handshake(td *big.Int, head common.Hash, headNum uint64, g // close closes the channel and notifies all background routines to exit. func (p *peerCommons) close() { close(p.closeCh) - p.sendQueue.quit() + p.sendQueue.Quit() } // serverPeer represents each node to which the client is connected. @@ -375,7 +376,7 @@ func newServerPeer(version int, network uint64, trusted bool, p *p2p.Peer, rw p2 id: peerIdToString(p.ID()), version: version, network: network, - sendQueue: newExecQueue(100), + sendQueue: utils.NewExecQueue(100), closeCh: make(chan struct{}), }, trusted: trusted, @@ -407,7 +408,7 @@ func (p *serverPeer) rejectUpdate(size uint64) bool { // frozen. func (p *serverPeer) freeze() { if atomic.CompareAndSwapUint32(&p.frozen, 0, 1) { - p.sendQueue.clear() + p.sendQueue.Clear() } } @@ -652,7 +653,7 @@ func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWrite id: peerIdToString(p.ID()), version: version, network: network, - sendQueue: newExecQueue(100), + sendQueue: utils.NewExecQueue(100), closeCh: make(chan struct{}), }, errCh: make(chan error, 1), diff --git a/les/serverpool.go b/les/serverpool.go index ec99a2d98..d1c53295a 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/les/utils" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discv5" @@ -129,7 +130,7 @@ type serverPool struct { adjustStats chan poolStatAdjust knownQueue, newQueue poolEntryQueue - knownSelect, newSelect *weightedRandomSelect + knownSelect, newSelect *utils.WeightedRandomSelect knownSelected, newSelected int fastDiscover bool connCh chan *connReq @@ -152,8 +153,8 @@ func newServerPool(db ethdb.Database, ulcServers []string) *serverPool { disconnCh: make(chan *disconnReq), registerCh: make(chan *registerReq), closeCh: make(chan struct{}), - knownSelect: newWeightedRandomSelect(), - newSelect: newWeightedRandomSelect(), + knownSelect: utils.NewWeightedRandomSelect(), + newSelect: utils.NewWeightedRandomSelect(), fastDiscover: true, trustedNodes: parseTrustedNodes(ulcServers), } @@ -402,8 +403,8 @@ func (pool *serverPool) eventLoop() { entry.lastConnected = addr entry.addr = make(map[string]*poolEntryAddress) entry.addr[addr.strKey()] = addr - entry.addrSelect = *newWeightedRandomSelect() - entry.addrSelect.update(addr) + entry.addrSelect = *utils.NewWeightedRandomSelect() + entry.addrSelect.Update(addr) req.result <- entry } @@ -459,7 +460,7 @@ func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry { entry = &poolEntry{ node: node, addr: make(map[string]*poolEntryAddress), - addrSelect: *newWeightedRandomSelect(), + addrSelect: *utils.NewWeightedRandomSelect(), shortRetry: shortRetryCnt, } pool.entries[node.ID()] = entry @@ -477,7 +478,7 @@ func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry { entry.addr[addr.strKey()] = addr } addr.lastSeen = now - entry.addrSelect.update(addr) + entry.addrSelect.Update(addr) if !entry.known { pool.newQueue.setLatest(entry) } @@ -505,7 +506,7 @@ func (pool *serverPool) loadNodes() { pool.entries[e.node.ID()] = e if pool.trustedNodes[e.node.ID()] == nil { pool.knownQueue.setLatest(e) - pool.knownSelect.update((*knownEntry)(e)) + pool.knownSelect.Update((*knownEntry)(e)) } } } @@ -556,8 +557,8 @@ func (pool *serverPool) saveNodes() { // Note that it is called by the new/known queues from which the entry has already // been removed so removing it from the queues is not necessary. func (pool *serverPool) removeEntry(entry *poolEntry) { - pool.newSelect.remove((*discoveredEntry)(entry)) - pool.knownSelect.remove((*knownEntry)(entry)) + pool.newSelect.Remove((*discoveredEntry)(entry)) + pool.knownSelect.Remove((*knownEntry)(entry)) entry.removed = true delete(pool.entries, entry.node.ID()) } @@ -586,8 +587,8 @@ func (pool *serverPool) setRetryDial(entry *poolEntry) { // updateCheckDial is called when an entry can potentially be dialed again. It updates // its selection weights and checks if new dials can/should be made. func (pool *serverPool) updateCheckDial(entry *poolEntry) { - pool.newSelect.update((*discoveredEntry)(entry)) - pool.knownSelect.update((*knownEntry)(entry)) + pool.newSelect.Update((*discoveredEntry)(entry)) + pool.knownSelect.Update((*knownEntry)(entry)) pool.checkDial() } @@ -596,7 +597,7 @@ func (pool *serverPool) updateCheckDial(entry *poolEntry) { func (pool *serverPool) checkDial() { fillWithKnownSelects := !pool.fastDiscover for pool.knownSelected < targetKnownSelect { - entry := pool.knownSelect.choose() + entry := pool.knownSelect.Choose() if entry == nil { fillWithKnownSelects = false break @@ -604,7 +605,7 @@ func (pool *serverPool) checkDial() { pool.dial((*poolEntry)(entry.(*knownEntry)), true) } for pool.knownSelected+pool.newSelected < targetServerCount { - entry := pool.newSelect.choose() + entry := pool.newSelect.Choose() if entry == nil { break } @@ -615,7 +616,7 @@ func (pool *serverPool) checkDial() { // is over, we probably won't find more in the near future so select more // known entries if possible for pool.knownSelected < targetServerCount { - entry := pool.knownSelect.choose() + entry := pool.knownSelect.Choose() if entry == nil { break } @@ -636,7 +637,7 @@ func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) { } else { pool.newSelected++ } - addr := entry.addrSelect.choose().(*poolEntryAddress) + addr := entry.addrSelect.Choose().(*poolEntryAddress) log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected) entry.dialed = addr go func() { @@ -684,7 +685,7 @@ type poolEntry struct { addr map[string]*poolEntryAddress node *enode.Node lastConnected, dialed *poolEntryAddress - addrSelect weightedRandomSelect + addrSelect utils.WeightedRandomSelect lastDiscovered mclock.AbsTime known, knownSelected, trusted bool @@ -734,8 +735,8 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port)) e.addr = make(map[string]*poolEntryAddress) e.addr[addr.strKey()] = addr - e.addrSelect = *newWeightedRandomSelect() - e.addrSelect.update(addr) + e.addrSelect = *utils.NewWeightedRandomSelect() + e.addrSelect.Update(addr) e.lastConnected = addr e.connectStats = entry.CStat e.delayStats = entry.DStat diff --git a/les/execqueue.go b/les/utils/exec_queue.go similarity index 71% rename from les/execqueue.go rename to les/utils/exec_queue.go index e0c88a990..a8f9b84ac 100644 --- a/les/execqueue.go +++ b/les/utils/exec_queue.go @@ -14,35 +14,35 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package les +package utils import "sync" -// execQueue implements a queue that executes function calls in a single thread, +// ExecQueue implements a queue that executes function calls in a single thread, // in the same order as they have been queued. -type execQueue struct { +type ExecQueue struct { mu sync.Mutex cond *sync.Cond funcs []func() closeWait chan struct{} } -// newExecQueue creates a new execution queue. -func newExecQueue(capacity int) *execQueue { - q := &execQueue{funcs: make([]func(), 0, capacity)} +// NewExecQueue creates a new execution Queue. +func NewExecQueue(capacity int) *ExecQueue { + q := &ExecQueue{funcs: make([]func(), 0, capacity)} q.cond = sync.NewCond(&q.mu) go q.loop() return q } -func (q *execQueue) loop() { +func (q *ExecQueue) loop() { for f := q.waitNext(false); f != nil; f = q.waitNext(true) { f() } close(q.closeWait) } -func (q *execQueue) waitNext(drop bool) (f func()) { +func (q *ExecQueue) waitNext(drop bool) (f func()) { q.mu.Lock() if drop && len(q.funcs) > 0 { // Remove the function that just executed. We do this here instead of when @@ -60,20 +60,20 @@ func (q *execQueue) waitNext(drop bool) (f func()) { return f } -func (q *execQueue) isClosed() bool { +func (q *ExecQueue) isClosed() bool { return q.closeWait != nil } -// canQueue returns true if more function calls can be added to the execution queue. -func (q *execQueue) canQueue() bool { +// CanQueue returns true if more function calls can be added to the execution Queue. +func (q *ExecQueue) CanQueue() bool { q.mu.Lock() ok := !q.isClosed() && len(q.funcs) < cap(q.funcs) q.mu.Unlock() return ok } -// queue adds a function call to the execution queue. Returns true if successful. -func (q *execQueue) queue(f func()) bool { +// Queue adds a function call to the execution Queue. Returns true if successful. +func (q *ExecQueue) Queue(f func()) bool { q.mu.Lock() ok := !q.isClosed() && len(q.funcs) < cap(q.funcs) if ok { @@ -84,16 +84,17 @@ func (q *execQueue) queue(f func()) bool { return ok } -// clear drops all queued functions -func (q *execQueue) clear() { +// Clear drops all queued functions. +func (q *ExecQueue) Clear() { q.mu.Lock() q.funcs = q.funcs[:0] q.mu.Unlock() } -// quit stops the exec queue. -// quit waits for the current execution to finish before returning. -func (q *execQueue) quit() { +// Quit stops the exec Queue. +// +// Quit waits for the current execution to finish before returning. +func (q *ExecQueue) Quit() { q.mu.Lock() if !q.isClosed() { q.closeWait = make(chan struct{}) diff --git a/les/execqueue_test.go b/les/utils/exec_queue_test.go similarity index 83% rename from les/execqueue_test.go rename to les/utils/exec_queue_test.go index cd45b03f2..98601c448 100644 --- a/les/execqueue_test.go +++ b/les/utils/exec_queue_test.go @@ -14,21 +14,19 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package les +package utils -import ( - "testing" -) +import "testing" func TestExecQueue(t *testing.T) { var ( N = 10000 - q = newExecQueue(N) + q = NewExecQueue(N) counter int execd = make(chan int) testexit = make(chan struct{}) ) - defer q.quit() + defer q.Quit() defer close(testexit) check := func(state string, wantOK bool) { @@ -40,11 +38,11 @@ func TestExecQueue(t *testing.T) { case <-testexit: } } - if q.canQueue() != wantOK { - t.Fatalf("canQueue() == %t for %s", !wantOK, state) + if q.CanQueue() != wantOK { + t.Fatalf("CanQueue() == %t for %s", !wantOK, state) } - if q.queue(qf) != wantOK { - t.Fatalf("canQueue() == %t for %s", !wantOK, state) + if q.Queue(qf) != wantOK { + t.Fatalf("Queue() == %t for %s", !wantOK, state) } } @@ -57,6 +55,6 @@ func TestExecQueue(t *testing.T) { t.Fatal("execution out of order") } } - q.quit() + q.Quit() check("closed queue", false) } diff --git a/les/randselect.go b/les/utils/weighted_select.go similarity index 82% rename from les/randselect.go rename to les/utils/weighted_select.go index 8efe0c94d..fbf1f37d6 100644 --- a/les/randselect.go +++ b/les/utils/weighted_select.go @@ -14,43 +14,30 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package les +package utils -import ( - "math/rand" -) +import "math/rand" // wrsItem interface should be implemented by any entries that are to be selected from -// a weightedRandomSelect set. Note that recalculating monotonously decreasing item -// weights on-demand (without constantly calling update) is allowed +// a WeightedRandomSelect set. Note that recalculating monotonously decreasing item +// weights on-demand (without constantly calling Update) is allowed type wrsItem interface { Weight() int64 } -// weightedRandomSelect is capable of weighted random selection from a set of items -type weightedRandomSelect struct { +// WeightedRandomSelect is capable of weighted random selection from a set of items +type WeightedRandomSelect struct { root *wrsNode idx map[wrsItem]int } -// newWeightedRandomSelect returns a new weightedRandomSelect structure -func newWeightedRandomSelect() *weightedRandomSelect { - return &weightedRandomSelect{root: &wrsNode{maxItems: wrsBranches}, idx: make(map[wrsItem]int)} -} - -// update updates an item's weight, adds it if it was non-existent or removes it if -// the new weight is zero. Note that explicitly updating decreasing weights is not necessary. -func (w *weightedRandomSelect) update(item wrsItem) { - w.setWeight(item, item.Weight()) -} - -// remove removes an item from the set -func (w *weightedRandomSelect) remove(item wrsItem) { - w.setWeight(item, 0) +// NewWeightedRandomSelect returns a new WeightedRandomSelect structure +func NewWeightedRandomSelect() *WeightedRandomSelect { + return &WeightedRandomSelect{root: &wrsNode{maxItems: wrsBranches}, idx: make(map[wrsItem]int)} } // setWeight sets an item's weight to a specific value (removes it if zero) -func (w *weightedRandomSelect) setWeight(item wrsItem, weight int64) { +func (w *WeightedRandomSelect) setWeight(item wrsItem, weight int64) { idx, ok := w.idx[item] if ok { w.root.setWeight(idx, weight) @@ -71,11 +58,22 @@ func (w *weightedRandomSelect) setWeight(item wrsItem, weight int64) { } } -// choose randomly selects an item from the set, with a chance proportional to its +// Update updates an item's weight, adds it if it was non-existent or removes it if +// the new weight is zero. Note that explicitly updating decreasing weights is not necessary. +func (w *WeightedRandomSelect) Update(item wrsItem) { + w.setWeight(item, item.Weight()) +} + +// Remove removes an item from the set +func (w *WeightedRandomSelect) Remove(item wrsItem) { + w.setWeight(item, 0) +} + +// Choose randomly selects an item from the set, with a chance proportional to its // current weight. If the weight of the chosen element has been decreased since the // last stored value, returns it with a newWeight/oldWeight chance, otherwise just // updates its weight and selects another one -func (w *weightedRandomSelect) choose() wrsItem { +func (w *WeightedRandomSelect) Choose() wrsItem { for { if w.root.sumWeight == 0 { return nil @@ -154,7 +152,7 @@ func (n *wrsNode) setWeight(idx int, weight int64) int64 { return diff } -// choose recursively selects an item from the tree and returns it along with its weight +// Choose recursively selects an item from the tree and returns it along with its weight func (n *wrsNode) choose(val int64) (wrsItem, int64) { for i, w := range n.weights { if val < w { diff --git a/les/randselect_test.go b/les/utils/weighted_select_test.go similarity index 93% rename from les/randselect_test.go rename to les/utils/weighted_select_test.go index 9ae7726dd..e1969e1a6 100644 --- a/les/randselect_test.go +++ b/les/utils/weighted_select_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package les +package utils import ( "math/rand" @@ -36,15 +36,15 @@ func (t *testWrsItem) Weight() int64 { func TestWeightedRandomSelect(t *testing.T) { testFn := func(cnt int) { - s := newWeightedRandomSelect() + s := NewWeightedRandomSelect() w := -1 list := make([]testWrsItem, cnt) for i := range list { list[i] = testWrsItem{idx: i, widx: &w} - s.update(&list[i]) + s.Update(&list[i]) } w = rand.Intn(cnt) - c := s.choose() + c := s.Choose() if c == nil { t.Errorf("expected item, got nil") } else { @@ -53,7 +53,7 @@ func TestWeightedRandomSelect(t *testing.T) { } } w = -2 - if s.choose() != nil { + if s.Choose() != nil { t.Errorf("expected nil, got item") } }