From 43901c92825389b694fb5488c520cf5122f022de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 7 May 2015 14:40:50 +0300 Subject: [PATCH] eth/downloader: fix priority queue reset, add throttling test --- Godeps/Godeps.json | 2 +- .../cookiejar.v2/collections/prque/prque.go | 2 +- .../collections/prque/prque_test.go | 43 ++++++++++++++--- .../cookiejar.v2/collections/prque/sstack.go | 7 +-- .../collections/prque/sstack_test.go | 30 +++++++++--- eth/downloader/downloader_test.go | 48 +++++++++++++++++++ 6 files changed, 111 insertions(+), 21 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index a5b27e76c..012475c08 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -100,7 +100,7 @@ }, { "ImportPath": "gopkg.in/karalabe/cookiejar.v2/collections/prque", - "Rev": "cf5d8079df7c4501217638e1e3a6e43f94822548" + "Rev": "0b2e270613f5d7ba262a5749b9e32270131497a2" }, { "ImportPath": "gopkg.in/qml.v1/cdata", diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go index 8225e8c53..a1009f3be 100644 --- a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go @@ -71,5 +71,5 @@ func (p *Prque) Size() int { // Clears the contents of the priority queue. func (p *Prque) Reset() { - p.cont.Reset() + *p = *New() } diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque_test.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque_test.go index 811c53c73..daba691e1 100644 --- a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque_test.go +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque_test.go @@ -61,16 +61,45 @@ func TestPrque(t *testing.T) { } func TestReset(t *testing.T) { - // Fill the queue with some random data + // Generate a batch of random data and a specific priority order size := 16 * blockSize - queue := New() + prio := rand.Perm(size) + data := make([]int, size) for i := 0; i < size; i++ { - queue.Push(rand.Int(), rand.Float32()) + data[i] = rand.Int() } - // Reset and ensure it's empty - queue.Reset() - if !queue.Empty() { - t.Errorf("priority queue not empty after reset: %v", queue) + queue := New() + for rep := 0; rep < 2; rep++ { + // Fill a priority queue with the above data + for i := 0; i < size; i++ { + queue.Push(data[i], float32(prio[i])) + if queue.Size() != i+1 { + t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1) + } + } + // Create a map the values to the priorities for easier verification + dict := make(map[float32]int) + for i := 0; i < size; i++ { + dict[float32(prio[i])] = data[i] + } + // Pop out half the elements in priority order and verify them + prevPrio := float32(size + 1) + for i := 0; i < size/2; i++ { + val, prio := queue.Pop() + if prio > prevPrio { + t.Errorf("invalid priority order: %v after %v.", prio, prevPrio) + } + prevPrio = prio + if val != dict[prio] { + t.Errorf("push/pop mismatch: have %v, want %v.", val, dict[prio]) + } + delete(dict, prio) + } + // Reset and ensure it's empty + queue.Reset() + if !queue.Empty() { + t.Errorf("priority queue not empty after reset: %v", queue) + } } } diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go index 55375a091..c11347f9d 100644 --- a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go @@ -88,7 +88,7 @@ func (s *sstack) Less(i, j int) bool { return s.blocks[i/blockSize][i%blockSize].priority > s.blocks[j/blockSize][j%blockSize].priority } -// Swapts two elements in the stack. Required by sort.Interface. +// Swaps two elements in the stack. Required by sort.Interface. func (s *sstack) Swap(i, j int) { ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize s.blocks[ib][io], s.blocks[jb][jo] = s.blocks[jb][jo], s.blocks[ib][io] @@ -96,8 +96,5 @@ func (s *sstack) Swap(i, j int) { // Resets the stack, effectively clearing its contents. func (s *sstack) Reset() { - s.size = 0 - s.offset = 0 - s.active = s.blocks[0] - s.capacity = blockSize + *s = *newSstack() } diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack_test.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack_test.go index 7bdc08bf5..bcb5b830b 100644 --- a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack_test.go +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack_test.go @@ -79,15 +79,31 @@ func TestSstackSort(t *testing.T) { } func TestSstackReset(t *testing.T) { - // Push some stuff onto the stack + // Create some initial data size := 16 * blockSize - stack := newSstack() + data := make([]*item, size) for i := 0; i < size; i++ { - stack.Push(&item{i, float32(i)}) + data[i] = &item{rand.Int(), rand.Float32()} } - // Clear and verify - stack.Reset() - if stack.Len() != 0 { - t.Errorf("stack not empty after reset: %v", stack) + stack := newSstack() + for rep := 0; rep < 2; rep++ { + // Push all the data into the stack, pop out every second + secs := []*item{} + for i := 0; i < size; i++ { + stack.Push(data[i]) + if i%2 == 0 { + secs = append(secs, stack.Pop().(*item)) + } + } + // Reset and verify both pulled and stack contents + stack.Reset() + if stack.Len() != 0 { + t.Errorf("stack not empty after reset: %v", stack) + } + for i := 0; i < size; i++ { + if i%2 == 0 && data[i] != secs[i/2] { + t.Errorf("push/pop mismatch: have %v, want %v.", secs[i/2], data[i]) + } + } } } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 11834d788..bd439d96a 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -181,3 +181,51 @@ func TestTaking(t *testing.T) { t.Error("expected to take 1000, got", len(bs1)) } } + +func TestThrottling(t *testing.T) { + minDesiredPeerCount = 4 + blockTtl = 1 * time.Second + + targetBlocks := 4 * blockCacheLimit + hashes := createHashes(0, targetBlocks) + blocks := createBlocksFromHashes(hashes) + tester := newTester(t, hashes, blocks) + + tester.newPeer("peer1", big.NewInt(10000), hashes[0]) + tester.newPeer("peer2", big.NewInt(0), common.Hash{}) + tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{}) + tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{}) + + // Concurrently download and take the blocks + errc := make(chan error, 1) + go func() { + errc <- tester.sync("peer1", hashes[0]) + }() + + done := make(chan struct{}) + took := []*types.Block{} + go func() { + for { + select { + case <-done: + took = append(took, tester.downloader.TakeBlocks()...) + done <- struct{}{} + return + default: + took = append(took, tester.downloader.TakeBlocks()...) + } + } + }() + + // Synchronize the two threads and verify + err := <-errc + done <- struct{}{} + <-done + + if err != nil { + t.Fatalf("failed to synchronize blocks: %v", err) + } + if len(took) != targetBlocks { + t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks) + } +}