Merge pull request #3108 from homotopycolimit/FixChunkerWithBrokenReader
swarm/storage: fixes for tree chunker in the context of a broken reader
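
The diff below does two things: it propagates read errors from the data source into the chunker's error channel instead of ignoring them, and it adds a dedicated quit channel (quitC) that Split closes exactly once on error, so every split and hashWorker goroutine unblocks instead of all competing to receive from errC. The following standalone sketch is my own illustration of that close-to-broadcast shutdown pattern, not code from this PR; all names in it are made up.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// worker loosely mimics hashWorker: it consumes jobs until either an error is
// reported or quitC is closed.
func worker(jobs <-chan int, errC chan<- error, quitC <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case j := <-jobs:
			if j == 3 { // pretend the reader broke while producing this job
				select {
				case errC <- errors.New("broken reader"):
				case <-quitC:
				}
				return
			}
		case <-quitC: // a closed channel is readable by every goroutine at once
			return
		}
	}
}

func main() {
	jobs := make(chan int)
	errC := make(chan error, 1)
	quitC := make(chan struct{})
	wg := &sync.WaitGroup{}

	for i := 0; i < 4; i++ {
		wg.Add(1)
		go worker(jobs, errC, quitC, wg)
	}

	// producer: stands in for the recursive split call in the real chunker
	go func() {
		for j := 0; ; j++ {
			select {
			case jobs <- j:
			case <-quitC:
				return
			}
		}
	}()

	if err := <-errC; err != nil {
		close(quitC) // broadcast shutdown once; all goroutines observe it
		fmt.Println("aborting split:", err)
	}
	wg.Wait()
}

Closing quitC, rather than pushing a value to each goroutine, is what lets an arbitrary number of workers and recursive splitters shut down from a single error.
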
Commit: 30fb5c3e81
@@ -23,8 +23,6 @@ import (
	"hash"
	"io"
	"sync"
	// "github.com/ethereum/go-ethereum/logger"
	// "github.com/ethereum/go-ethereum/logger/glog"
)

/*
@@ -124,12 +122,13 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
	jobC := make(chan *hashJob, 2*processors)
	wg := &sync.WaitGroup{}
	errC := make(chan error)
	quitC := make(chan bool)

	// wwg = workers waitgroup keeps track of hashworkers spawned by this split call
	if wwg != nil {
		wwg.Add(1)
	}
	go self.hashWorker(jobC, chunkC, errC, swg, wwg)
	go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)

	depth := 0
	treeSize := self.chunkSize
@@ -141,11 +140,10 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
	}

	key := make([]byte, self.hashFunc().Size())
	// glog.V(logger.Detail).Infof("split request received for data (%v bytes, depth: %v)", size, depth)
	// this waitgroup member is released after the root hash is calculated
	wg.Add(1)
	//launch actual recursive function passing the waitgroups
	go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, wg, swg, wwg)
	go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, quitC, wg, swg, wwg)

	// closes internal error channel if all subprocesses in the workgroup finished
	go func() {
@@ -153,7 +151,6 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
		wg.Wait()
		// if storage waitgroup is non-nil, we wait for storage to finish too
		if swg != nil {
			// glog.V(logger.Detail).Infof("Waiting for storage to finish")
			swg.Wait()
		}
		close(errC)
@@ -162,14 +159,15 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
	select {
	case err := <-errC:
		if err != nil {
			close(quitC)
			return nil, err
		}
		//
		//TODO: add a timeout
	}
	return key, nil
}

func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, parentWg, swg, wwg *sync.WaitGroup) {
func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) {

	for depth > 0 && size < treeSize {
		treeSize /= self.branches
@@ -180,17 +178,20 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
		// leaf nodes -> content chunks
		chunkData := make([]byte, size+8)
		binary.LittleEndian.PutUint64(chunkData[0:8], uint64(size))
		data.Read(chunkData[8:])
		_, err := data.Read(chunkData[8:])
		if err != nil {
			errC <- err
			return
		}
		select {
		case jobC <- &hashJob{key, chunkData, size, parentWg}:
		case <-errC:
		case <-quitC:
		}
		// glog.V(logger.Detail).Infof("read %v", size)
		return
	}
	// depth > 0
	// intermediate chunk containing child nodes hashes
	branchCnt := int64((size + treeSize - 1) / treeSize)
	// glog.V(logger.Detail).Infof("intermediate node: setting branches: %v, depth: %v, max subtree size: %v, data size: %v", branches, depth, treeSize, size)

	var chunk []byte = make([]byte, branchCnt*self.hashSize+8)
	var pos, i int64
@@ -210,7 +211,7 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
		subTreeKey := chunk[8+i*self.hashSize : 8+(i+1)*self.hashSize]

		childrenWg.Add(1)
		self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, childrenWg, swg, wwg)
		self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, quitC, childrenWg, swg, wwg)

		i++
		pos += treeSize
@@ -224,15 +225,15 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
			wwg.Add(1)
		}
		self.workerCount++
		go self.hashWorker(jobC, chunkC, errC, swg, wwg)
		go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
	}
	select {
	case jobC <- &hashJob{key, chunk, size, parentWg}:
	case <-errC:
	case <-quitC:
	}
}

func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, swg, wwg *sync.WaitGroup) {
func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
	hasher := self.hashFunc()
	if wwg != nil {
		defer wwg.Done()
@@ -247,8 +248,7 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC
			// now we got the hashes in the chunk, then hash the chunks
			hasher.Reset()
			self.hashChunk(hasher, job, chunkC, swg)
			// glog.V(logger.Detail).Infof("hash chunk (%v)", job.size)
		case <-errC:
		case <-quitC:
			return
		}
	}
@@ -276,6 +276,7 @@ func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan *
		}
	}
	job.parentWg.Done()

	if chunkC != nil {
		chunkC <- newChunk
	}
@@ -328,7 +329,6 @@ func (self *LazyChunkReader) Size(quitC chan bool) (n int64, err error) {
func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
	// this is correct, a swarm doc cannot be zero length, so no EOF is expected
	if len(b) == 0 {
		// glog.V(logger.Detail).Infof("Size query for %v", chunk.Key.Log())
		return 0, nil
	}
	quitC := make(chan bool)
@@ -336,13 +336,10 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
	if err != nil {
		return 0, err
	}
	// glog.V(logger.Detail).Infof("readAt: len(b): %v, off: %v, size: %v ", len(b), off, size)

	errC := make(chan error)
	// glog.V(logger.Detail).Infof("readAt: reading %v into %d bytes at offset %d.", self.chunk.Key.Log(), len(b), off)

	// }
	// glog.V(logger.Detail).Infof("-> want: %v, off: %v size: %v ", want, off, self.size)
	var treeSize int64
	var depth int
	// calculate depth and max treeSize
@@ -364,22 +361,16 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {

		return 0, err
	}
	// glog.V(logger.Detail).Infof("ReadAt received %v", err)
	// glog.V(logger.Detail).Infof("end: len(b): %v, off: %v, size: %v ", len(b), off, size)
	if off+int64(len(b)) >= size {
		// glog.V(logger.Detail).Infof(" len(b): %v EOF", len(b))
		return len(b), io.EOF
	}
	// glog.V(logger.Detail).Infof("ReadAt returning at %d: %v", read, err)
	return len(b), nil
}

func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunk *Chunk, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
	defer parentWg.Done()
	// return NewDPA(&LocalStore{})
	// glog.V(logger.Detail).Infof("inh len(b): %v, off: %v eoff: %v ", len(b), off, eoff)

	// glog.V(logger.Detail).Infof("depth: %v, loff: %v, eoff: %v, chunk.Size: %v, treeSize: %v", depth, off, eoff, chunk.Size, treeSize)

	// chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))

@@ -391,7 +382,6 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr

	// leaf chunk found
	if depth == 0 {
		// glog.V(logger.Detail).Infof("depth: %v, len(b): %v, off: %v, eoff: %v, chunk.Size: %v %v, treeSize: %v", depth, len(b), off, eoff, chunk.Size, len(chunk.SData), treeSize)
		extra := 8 + eoff - int64(len(chunk.SData))
		if extra > 0 {
			eoff -= extra
@@ -406,7 +396,6 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr

	wg := &sync.WaitGroup{}
	defer wg.Wait()
	// glog.V(logger.Detail).Infof("start %v,end %v", start, end)

	for i := start; i < end; i++ {
		soff := i * treeSize
@@ -425,7 +414,6 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr
		wg.Add(1)
		go func(j int64) {
			childKey := chunk.SData[8+j*self.hashSize : 8+(j+1)*self.hashSize]
			// glog.V(logger.Detail).Infof("subtree ind.ex: %v -> %v", j, childKey.Log())
			chunk := retrieve(childKey, self.chunkC, quitC)
			if chunk == nil {
				select {
@@ -450,7 +438,6 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk {
		Key: key,
		C:   make(chan bool), // close channel to signal data delivery
	}
	// glog.V(logger.Detail).Infof("chunk data sent for %v (key interval in chunk %v-%v)", ch.Key.Log(), j*self.chunker.hashSize, (j+1)*self.chunker.hashSize)
	// submit chunk for retrieval
	select {
	case chunkC <- chunk: // submit retrieval request, someone should be listening on the other side (or we will time out globally)
@@ -464,7 +451,6 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk {
		// this is how we control process leakage (quitC is closed once join is finished (after timeout))
		return nil
	case <-chunk.C: // bells are ringing, data have been delivered
		// glog.V(logger.Detail).Infof("chunk data received")
	}
	if len(chunk.SData) == 0 {
		return nil // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
@@ -476,7 +462,6 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk {
// Read keeps a cursor so it cannot be called simultaneously, see ReadAt
func (self *LazyChunkReader) Read(b []byte) (read int, err error) {
	read, err = self.ReadAt(b, self.off)
	// glog.V(logger.Detail).Infof("read: %v, off: %v, error: %v", read, self.off, err)

	self.off += int64(read)
	return

@@ -18,6 +18,7 @@ package storage

import (
	"bytes"
	"crypto/rand"
	"encoding/binary"
	"fmt"
	"io"
@@ -27,6 +28,7 @@ import (
	"time"
)


/*
Tests TreeChunker by splitting and joining a random byte slice
*/
@@ -49,7 +51,7 @@ func (self *chunkerTester) checkChunks(t *testing.T, want int) {
	}
}

func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, chunkC chan *Chunk, swg *sync.WaitGroup) (key Key) {
func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, chunkC chan *Chunk, swg *sync.WaitGroup, expectedError error) (key Key) {
	// reset
	self.chunks = make(map[string]*Chunk)

@@ -65,14 +67,9 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c
				select {
				case <-timeout:
					self.t.Fatalf("Join timeout error")

				case chunk, ok := <-chunkC:
					if !ok {
						// glog.V(logger.Info).Infof("chunkC closed quitting")
						close(quitC)
						return
					}
					// glog.V(logger.Info).Infof("chunk %v received", len(self.chunks))
				case <-quitC:
					return
				case chunk := <-chunkC:
					// self.chunks = append(self.chunks, chunk)
					self.chunks[chunk.Key.String()] = chunk
					if chunk.wg != nil {
@@ -83,21 +80,16 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c
		}()
	}
	key, err := chunker.Split(data, size, chunkC, swg, nil)
	if err != nil {
	if err != nil && expectedError == nil {
		self.t.Fatalf("Split error: %v", err)
	} else if expectedError != nil && (err == nil || err.Error() != expectedError.Error()) {
		self.t.Fatalf("Not receiving the correct error! Expected %v, received %v", expectedError, err)
	}
	if chunkC != nil {
		if swg != nil {
			// glog.V(logger.Info).Infof("Waiting for storage to finish")
			swg.Wait()
			// glog.V(logger.Info).Infof("Storage finished")
		}
		close(chunkC)
	}
	if chunkC != nil {
		// glog.V(logger.Info).Infof("waiting for splitter finished")
		<-quitC
		// glog.V(logger.Info).Infof("Splitter finished")
		close(quitC)
	}
	return
}
@@ -105,11 +97,9 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c
func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Chunk, quitC chan bool) LazySectionReader {
	// reset but not the chunks

	// glog.V(logger.Info).Infof("Splitter finished")
	reader := chunker.Join(key, chunkC)

	timeout := time.After(600 * time.Second)
	// glog.V(logger.Info).Infof("Splitter finished")
	i := 0
	go func() {
		for {
@@ -122,15 +112,12 @@ func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Ch
					close(quitC)
					return
				}
				// glog.V(logger.Info).Infof("chunk %v: %v", i, chunk.Key.String())
				// this just mocks the behaviour of a chunk store retrieval
				stored, success := self.chunks[chunk.Key.String()]
				// glog.V(logger.Info).Infof("chunk %v, success: %v", chunk.Key.String(), success)
				if !success {
					self.t.Fatalf("not found")
					return
				}
				// glog.V(logger.Info).Infof("chunk %v: %v", i, chunk.Key.String())
				chunk.SData = stored.SData
				chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
				close(chunk.C)
@@ -141,6 +128,26 @@ func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Ch
	return reader
}

func testRandomBrokenData(splitter Splitter, n int, tester *chunkerTester) {
	data := io.LimitReader(rand.Reader, int64(n))
	brokendata := brokenLimitReader(data, n, n/2)

	buf := make([]byte, n)
	_, err := brokendata.Read(buf)
	if err == nil || err.Error() != "Broken reader" {
		tester.t.Fatalf("Broken reader is not broken, hence broken. Returns: %v", err)
	}

	data = io.LimitReader(rand.Reader, int64(n))
	brokendata = brokenLimitReader(data, n, n/2)

	chunkC := make(chan *Chunk, 1000)
	swg := &sync.WaitGroup{}

	key := tester.Split(splitter, brokendata, int64(n), chunkC, swg, fmt.Errorf("Broken reader"))
	tester.t.Logf(" Key = %v\n", key)
}

func testRandomData(splitter Splitter, n int, tester *chunkerTester) {
	if tester.inputs == nil {
		tester.inputs = make(map[uint64][]byte)
@@ -151,13 +158,13 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) {
		data, input = testDataReaderAndSlice(n)
		tester.inputs[uint64(n)] = input
	} else {
		data = limitReader(bytes.NewReader(input), n)
		data = io.LimitReader(bytes.NewReader(input), int64(n))
	}

	chunkC := make(chan *Chunk, 1000)
	swg := &sync.WaitGroup{}

	key := tester.Split(splitter, data, int64(n), chunkC, swg)
	key := tester.Split(splitter, data, int64(n), chunkC, swg, nil)
	tester.t.Logf(" Key = %v\n", key)

	chunkC = make(chan *Chunk, 1000)
@@ -166,9 +173,7 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) {
	chunker := NewTreeChunker(NewChunkerParams())
	reader := tester.Join(chunker, key, 0, chunkC, quitC)
	output := make([]byte, n)
	// glog.V(logger.Info).Infof(" Key = %v\n", key)
	r, err := reader.Read(output)
	// glog.V(logger.Info).Infof(" read = %v  %v\n", r, err)
	if r != n || err != io.EOF {
		tester.t.Fatalf("read error  read: %v  n = %v  err = %v\n", r, n, err)
	}
@@ -183,7 +188,7 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) {

func TestRandomData(t *testing.T) {
	// sizes := []int{123456}
	sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 123456}
	sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 123456, 2345678}
	tester := &chunkerTester{t: t}
	chunker := NewTreeChunker(NewChunkerParams())
	for _, s := range sizes {
@@ -195,6 +200,16 @@ func TestRandomData(t *testing.T) {
	}
}

func TestRandomBrokenData(t *testing.T) {
	sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 123456, 2345678}
	tester := &chunkerTester{t: t}
	chunker := NewTreeChunker(NewChunkerParams())
	for _, s := range sizes {
		testRandomBrokenData(chunker, s, tester)
		t.Logf("done size: %v", s)
	}
}

func readAll(reader LazySectionReader, result []byte) {
	size := int64(len(result))

@@ -227,7 +242,7 @@ func benchmarkJoin(n int, t *testing.B) {
		chunkC := make(chan *Chunk, 1000)
		swg := &sync.WaitGroup{}

		key := tester.Split(chunker, data, int64(n), chunkC, swg)
		key := tester.Split(chunker, data, int64(n), chunkC, swg, nil)
		// t.StartTimer()
		chunkC = make(chan *Chunk, 1000)
		quitC := make(chan bool)
@@ -248,8 +263,7 @@ func benchmarkSplitTree(n int, t *testing.B) {
		chunker := NewTreeChunker(NewChunkerParams())
		tester := &chunkerTester{t: t}
		data := testDataReader(n)
		// glog.V(logger.Info).Infof("splitting data of length %v", n)
		tester.Split(chunker, data, int64(n), nil, nil)
		tester.Split(chunker, data, int64(n), nil, nil, nil)
	}
	stats := new(runtime.MemStats)
	runtime.ReadMemStats(stats)
@@ -262,8 +276,7 @@ func benchmarkSplitPyramid(n int, t *testing.B) {
		splitter := NewPyramidChunker(NewChunkerParams())
		tester := &chunkerTester{t: t}
		data := testDataReader(n)
		// glog.V(logger.Info).Infof("splitting data of length %v", n)
		tester.Split(splitter, data, int64(n), nil, nil)
		tester.Split(splitter, data, int64(n), nil, nil, nil)
	}
	stats := new(runtime.MemStats)
	runtime.ReadMemStats(stats)

@@ -19,6 +19,7 @@ package storage
import (
	"bytes"
	"crypto/rand"
	"fmt"
	"io"
	"sync"
	"testing"
@@ -27,32 +28,31 @@ import (
	"github.com/ethereum/go-ethereum/logger/glog"
)

type limitedReader struct {
	r    io.Reader
	off  int64
	size int64
type brokenLimitedReader struct {
	lr    io.Reader
	errAt int
	off   int
	size  int
}

func limitReader(r io.Reader, size int) *limitedReader {
	return &limitedReader{r, 0, int64(size)}
}

func (self *limitedReader) Read(buf []byte) (int, error) {
	limit := int64(len(buf))
	left := self.size - self.off
	if limit >= left {
		limit = left
func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader {
	return &brokenLimitedReader{
		lr:    data,
		errAt: errAt,
		size:  size,
	}
	n, err := self.r.Read(buf[:limit])
	if err == nil && limit == left {
		err = io.EOF
	}
	self.off += int64(n)
	return n, err
}

func testDataReader(l int) (r io.Reader) {
	return limitReader(rand.Reader, l)
	return io.LimitReader(rand.Reader, int64(l))
}

func (self *brokenLimitedReader) Read(buf []byte) (int, error) {
	if self.off+len(buf) > self.errAt {
		return 0, fmt.Errorf("Broken reader")
	}
	self.off += len(buf)
	return self.lr.Read(buf)
}

func testDataReaderAndSlice(l int) (r io.Reader, slice []byte) {
@@ -60,7 +60,7 @@ func testDataReaderAndSlice(l int) (r io.Reader, slice []byte) {
	if _, err := rand.Read(slice); err != nil {
		panic("rand error")
	}
	r = limitReader(bytes.NewReader(slice), l)
	r = io.LimitReader(bytes.NewReader(slice), int64(l))
	return
}

@@ -81,7 +81,6 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk

	chunks := (size + self.chunkSize - 1) / self.chunkSize
	depth := int(math.Ceil(math.Log(float64(chunks))/math.Log(float64(self.branches)))) + 1
	// glog.V(logger.Detail).Infof("chunks: %v, depth: %v", chunks, depth)

	results := Tree{
		Chunks: chunks,
@@ -99,26 +98,24 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk
		go self.processor(pend, swg, tasks, chunkC, &results)
	}
	// Feed the chunks into the task pool
	read := 0
	for index := 0; ; index++ {
		buffer := make([]byte, self.chunkSize+8)
		n, err := data.Read(buffer[8:])
		last := err == io.ErrUnexpectedEOF || err == io.EOF
		// glog.V(logger.Detail).Infof("n: %v, index: %v, depth: %v", n, index, depth)
		read += n
		last := int64(read) == size || err == io.ErrUnexpectedEOF || err == io.EOF
		if err != nil && !last {
			// glog.V(logger.Info).Infof("error: %v", err)
			close(abortC)
			break
		}
		binary.LittleEndian.PutUint64(buffer[:8], uint64(n))
		pend.Add(1)
		// glog.V(logger.Info).Infof("-> task %v (%v)", index, n)
		select {
		case tasks <- &Task{Index: int64(index), Size: uint64(n), Data: buffer[:n+8], Last: last}:
		case <-abortC:
			return nil, err
		}
		if last {
			// glog.V(logger.Info).Infof("last task %v (%v)", index, n)
			break
		}
	}
@@ -126,7 +123,6 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk
	close(tasks)
	pend.Wait()

	// glog.V(logger.Info).Infof("len: %v", results.Levels[0][0])
	key := results.Levels[0][0].Children[0][:]
	return key, nil
}
@@ -134,12 +130,10 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk
func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Task, chunkC chan *Chunk, results *Tree) {
	defer pend.Done()

	// glog.V(logger.Info).Infof("processor started")
	// Start processing leaf chunks ad infinitum
	hasher := self.hashFunc()
	for task := range tasks {
		depth, pow := len(results.Levels)-1, self.branches
		// glog.V(logger.Info).Infof("task: %v, last: %v", task.Index, task.Last)
		size := task.Size
		data := task.Data
		var node *Node
@@ -171,10 +165,8 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas
				}
				node = &Node{pending, 0, make([]common.Hash, pending), last}
				results.Levels[depth][task.Index/pow] = node
				// glog.V(logger.Info).Infof("create node %v, %v (%v children, all pending)", depth, task.Index/pow, pending)
			}
			node.Pending--
			// glog.V(logger.Info).Infof("pending now: %v", node.Pending)
			i := task.Index / (pow / self.branches) % self.branches
			if last {
				node.Last = true
			}
			copy(node.Children[i][:], hash)
			node.Size += size
			left := node.Pending
			// glog.V(logger.Info).Infof("left pending now: %v, node size: %v", left, node.Size)
			if chunkC != nil {
				if swg != nil {
					swg.Add(1)
@@ -198,7 +189,6 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas

			results.Lock.Unlock()
			// If there's more work to be done, leave for others
			// glog.V(logger.Info).Infof("left %v", left)
			if left > 0 {
				break
			}
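
One detail worth noting in the PyramidChunker hunk above: the feed loop now counts the bytes it has read and treats a chunk as the last one when that count reaches the expected size, not only when the reader has already reported io.EOF. An io.Reader (including io.LimitReader, which the tests switch to) may hand back the final bytes with a nil error and signal io.EOF only on the next call, so the EOF-only check would cost an extra loop iteration before the end of input is noticed. The small standalone program below is my own illustration of that reader behaviour, not code from this PR:

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	const size = 8
	// io.LimitReader only reports io.EOF once its remaining budget is zero,
	// so the read that delivers the final bytes returns a nil error.
	r := io.LimitReader(bytes.NewReader(make([]byte, size)), size)

	buf := make([]byte, size)
	read := 0
	for {
		n, err := r.Read(buf)
		read += n
		// Mirrors the fixed condition: last once all expected bytes arrived,
		// or once the reader reports EOF, whichever comes first.
		last := read == size || err == io.EOF
		fmt.Printf("n=%d err=%v last=%v\n", n, err, last)
		if last || err != nil {
			break
		}
	}
}

Running this prints "n=8 err=<nil> last=true": the data is exhausted one call before io.EOF would appear, which is exactly the case the byte-count check handles.
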