diff --git a/core/rawdb/database.go b/core/rawdb/database.go index c8bfdbace..90619169a 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -89,6 +89,11 @@ func (db *nofreezedb) Ancient(kind string, number uint64) ([]byte, error) { return nil, errNotSupported } +// ReadAncients returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) { + return nil, errNotSupported +} + // Ancients returns an error as we don't have a backing chain freezer. func (db *nofreezedb) Ancients() (uint64, error) { return 0, errNotSupported diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index ff8919b59..253de9f7c 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -180,6 +180,18 @@ func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) { return nil, errUnknownTable } +// ReadAncients retrieves multiple items in sequence, starting from the index 'start'. +// It will return +// - at most 'max' items, +// - at least 1 item (even if exceeding the maxByteSize), but will otherwise +// return as many items as fit into maxByteSize. +func (f *freezer) ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) { + if table := f.tables[kind]; table != nil { + return table.RetrieveItems(start, count, maxBytes) + } + return nil, errUnknownTable +} + // Ancients returns the length of the frozen items. func (f *freezer) Ancients() (uint64, error) { return atomic.LoadUint64(&f.frozen), nil diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index d7bfe18e0..9d052f7cd 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -70,6 +70,19 @@ func (i *indexEntry) marshallBinary() []byte { return b } +// bounds returns the start- and end- offsets, and the file number of where to +// read there data item marked by the two index entries. The two entries are +// assumed to be sequential. +func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) { + if start.filenum != end.filenum { + // If a piece of data 'crosses' a data-file, + // it's actually in one piece on the second data-file. + // We return a zero-indexEntry for the second file as start + return 0, end.offset, end.filenum + } + return start.offset, end.offset, end.filenum +} + // freezerTable represents a single chained data table within the freezer (e.g. blocks). // It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry // file (uncompressed 64 bit indices into the data file). @@ -546,84 +559,183 @@ func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool return false, nil } -// getBounds returns the indexes for the item -// returns start, end, filenumber and error -func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { - buffer := make([]byte, indexEntrySize) - var startIdx, endIdx indexEntry - // Read second index - if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil { - return 0, 0, 0, err +// getIndices returns the index entries for the given from-item, covering 'count' items. +// N.B: The actual number of returned indices for N items will always be N+1 (unless an +// error is returned). +// OBS: This method assumes that the caller has already verified (and/or trimmed) the range +// so that the items are within bounds. If this method is used to read out of bounds, +// it will return error. +func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { + // Apply the table-offset + from = from - uint64(t.itemOffset) + // For reading N items, we need N+1 indices. + buffer := make([]byte, (count+1)*indexEntrySize) + if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil { + return nil, err } - endIdx.unmarshalBinary(buffer) - // Read first index (unless it's the very first item) - if item != 0 { - if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil { - return 0, 0, 0, err - } - startIdx.unmarshalBinary(buffer) - } else { + var ( + indices []*indexEntry + offset int + ) + for i := from; i <= from+count; i++ { + index := new(indexEntry) + index.unmarshalBinary(buffer[offset:]) + offset += indexEntrySize + indices = append(indices, index) + } + if from == 0 { // Special case if we're reading the first item in the freezer. We assume that // the first item always start from zero(regarding the deletion, we // only support deletion by files, so that the assumption is held). // This means we can use the first item metadata to carry information about // the 'global' offset, for the deletion-case - return 0, endIdx.offset, endIdx.filenum, nil + indices[0].offset = 0 + indices[0].filenum = indices[1].filenum } - if startIdx.filenum != endIdx.filenum { - // If a piece of data 'crosses' a data-file, - // it's actually in one piece on the second data-file. - // We return a zero-indexEntry for the second file as start - return 0, endIdx.offset, endIdx.filenum, nil - } - return startIdx.offset, endIdx.offset, endIdx.filenum, nil + return indices, nil } // Retrieve looks up the data offset of an item with the given number and retrieves // the raw binary blob from the data file. func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { - blob, err := t.retrieve(item) + items, err := t.RetrieveItems(item, 1, 0) if err != nil { return nil, err } - if t.noCompression { - return blob, nil - } - return snappy.Decode(nil, blob) + return items[0], nil } -// retrieve looks up the data offset of an item with the given number and retrieves -// the raw binary blob from the data file. OBS! This method does not decode -// compressed data. -func (t *freezerTable) retrieve(item uint64) ([]byte, error) { +// RetrieveItems returns multiple items in sequence, starting from the index 'start'. +// It will return at most 'max' items, but will abort earlier to respect the +// 'maxBytes' argument. However, if the 'maxBytes' is smaller than the size of one +// item, it _will_ return one element and possibly overflow the maxBytes. +func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, error) { + // First we read the 'raw' data, which might be compressed. + diskData, sizes, err := t.retrieveItems(start, count, maxBytes) + if err != nil { + return nil, err + } + var ( + output = make([][]byte, 0, count) + offset int // offset for reading + outputSize int // size of uncompressed data + ) + // Now slice up the data and decompress. + for i, diskSize := range sizes { + item := diskData[offset : offset+diskSize] + offset += diskSize + decompressedSize := diskSize + if !t.noCompression { + decompressedSize, _ = snappy.DecodedLen(item) + } + if i > 0 && uint64(outputSize+decompressedSize) > maxBytes { + break + } + if !t.noCompression { + data, err := snappy.Decode(nil, item) + if err != nil { + return nil, err + } + output = append(output, data) + } else { + output = append(output, item) + } + outputSize += decompressedSize + } + return output, nil +} + +// retrieveItems reads up to 'count' items from the table. It reads at least +// one item, but otherwise avoids reading more than maxBytes bytes. +// It returns the (potentially compressed) data, and the sizes. +func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) { t.lock.RLock() defer t.lock.RUnlock() // Ensure the table and the item is accessible if t.index == nil || t.head == nil { - return nil, errClosed + return nil, nil, errClosed } - if atomic.LoadUint64(&t.items) <= item { - return nil, errOutOfBounds + itemCount := atomic.LoadUint64(&t.items) // max number + // Ensure the start is written, not deleted from the tail, and that the + // caller actually wants something + if itemCount <= start || uint64(t.itemOffset) > start || count == 0 { + return nil, nil, errOutOfBounds } - // Ensure the item was not deleted from the tail either - if uint64(t.itemOffset) > item { - return nil, errOutOfBounds + if start+count > itemCount { + count = itemCount - start } - startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset)) + var ( + output = make([]byte, maxBytes) // Buffer to read data into + outputSize int // Used size of that buffer + ) + // readData is a helper method to read a single data item from disk. + readData := func(fileId, start uint32, length int) error { + // In case a small limit is used, and the elements are large, may need to + // realloc the read-buffer when reading the first (and only) item. + if len(output) < length { + output = make([]byte, length) + } + dataFile, exist := t.files[fileId] + if !exist { + return fmt.Errorf("missing data file %d", fileId) + } + if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil { + return err + } + outputSize += length + return nil + } + // Read all the indexes in one go + indices, err := t.getIndices(start, count) if err != nil { - return nil, err + return nil, nil, err } - dataFile, exist := t.files[filenum] - if !exist { - return nil, fmt.Errorf("missing data file %d", filenum) + var ( + sizes []int // The sizes for each element + totalSize = 0 // The total size of all data read so far + readStart = indices[0].offset // Where, in the file, to start reading + unreadSize = 0 // The size of the as-yet-unread data + ) + + for i, firstIndex := range indices[:len(indices)-1] { + secondIndex := indices[i+1] + // Determine the size of the item. + offset1, offset2, _ := firstIndex.bounds(secondIndex) + size := int(offset2 - offset1) + // Crossing a file boundary? + if secondIndex.filenum != firstIndex.filenum { + // If we have unread data in the first file, we need to do that read now. + if unreadSize > 0 { + if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil { + return nil, nil, err + } + unreadSize = 0 + } + readStart = 0 + } + if i > 0 && uint64(totalSize+size) > maxBytes { + // About to break out due to byte limit being exceeded. We don't + // read this last item, but we need to do the deferred reads now. + if unreadSize > 0 { + if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil { + return nil, nil, err + } + } + break + } + // Defer the read for later + unreadSize += size + totalSize += size + sizes = append(sizes, size) + if i == len(indices)-2 || uint64(totalSize) > maxBytes { + // Last item, need to do the read now + if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil { + return nil, nil, err + } + break + } } - // Retrieve the data itself, decompress and return - blob := make([]byte, endOffset-startOffset) - if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil { - return nil, err - } - t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize)) - return blob, nil + return output[:outputSize], sizes, nil } // has returns an indicator whether the specified number data diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 0df28f236..e8a8b5c46 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -74,7 +74,7 @@ func TestFreezerBasics(t *testing.T) { exp := getChunk(15, y) got, err := f.Retrieve(uint64(y)) if err != nil { - t.Fatal(err) + t.Fatalf("reading item %d: %v", y, err) } if !bytes.Equal(got, exp) { t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) @@ -692,3 +692,118 @@ func TestAppendTruncateParallel(t *testing.T) { } } } + +// TestSequentialRead does some basic tests on the RetrieveItems. +func TestSequentialRead(t *testing.T) { + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("batchread-%d", rand.Uint64()) + { // Fill table + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + if err != nil { + t.Fatal(err) + } + // Write 15 bytes 30 times + for x := 0; x < 30; x++ { + data := getChunk(15, x) + f.Append(uint64(x), data) + } + f.DumpIndex(0, 30) + f.Close() + } + { // Open it, iterate, verify iteration + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + if err != nil { + t.Fatal(err) + } + items, err := f.RetrieveItems(0, 10000, 100000) + if err != nil { + t.Fatal(err) + } + if have, want := len(items), 30; have != want { + t.Fatalf("want %d items, have %d ", want, have) + } + for i, have := range items { + want := getChunk(15, i) + if !bytes.Equal(want, have) { + t.Fatalf("data corruption: have\n%x\n, want \n%x\n", have, want) + } + } + f.Close() + } + { // Open it, iterate, verify byte limit. The byte limit is less than item + // size, so each lookup should only return one item + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + items, err := f.RetrieveItems(0, 10000, 10) + if err != nil { + t.Fatal(err) + } + if have, want := len(items), 1; have != want { + t.Fatalf("want %d items, have %d ", want, have) + } + for i, have := range items { + want := getChunk(15, i) + if !bytes.Equal(want, have) { + t.Fatalf("data corruption: have\n%x\n, want \n%x\n", have, want) + } + } + f.Close() + } +} + +// TestSequentialReadByteLimit does some more advanced tests on batch reads. +// These tests check that when the byte limit hits, we correctly abort in time, +// but also properly do all the deferred reads for the previous data, regardless +// of whether the data crosses a file boundary or not. +func TestSequentialReadByteLimit(t *testing.T) { + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("batchread-2-%d", rand.Uint64()) + { // Fill table + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true) + if err != nil { + t.Fatal(err) + } + // Write 10 bytes 30 times, + // Splitting it at every 100 bytes (10 items) + for x := 0; x < 30; x++ { + data := getChunk(10, x) + f.Append(uint64(x), data) + } + f.Close() + } + for i, tc := range []struct { + items uint64 + limit uint64 + want int + }{ + {9, 89, 8}, + {10, 99, 9}, + {11, 109, 10}, + {100, 89, 8}, + {100, 99, 9}, + {100, 109, 10}, + } { + { + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true) + if err != nil { + t.Fatal(err) + } + items, err := f.RetrieveItems(0, tc.items, tc.limit) + if err != nil { + t.Fatal(err) + } + if have, want := len(items), tc.want; have != want { + t.Fatalf("test %d: want %d items, have %d ", i, want, have) + } + for ii, have := range items { + want := getChunk(10, ii) + if !bytes.Equal(want, have) { + t.Fatalf("test %d: data corruption item %d: have\n%x\n, want \n%x\n", i, ii, have, want) + } + } + f.Close() + } + } +} diff --git a/core/rawdb/table.go b/core/rawdb/table.go index d5ef60ae5..586451c06 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -62,6 +62,12 @@ func (t *table) Ancient(kind string, number uint64) ([]byte, error) { return t.db.Ancient(kind, number) } +// ReadAncients is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) { + return t.db.ReadAncients(kind, start, count, maxBytes) +} + // Ancients is a noop passthrough that just forwards the request to the underlying // database. func (t *table) Ancients() (uint64, error) { diff --git a/ethdb/database.go b/ethdb/database.go index 0dc14624b..bdc09d5e9 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -76,6 +76,13 @@ type AncientReader interface { // Ancient retrieves an ancient binary blob from the append-only immutable files. Ancient(kind string, number uint64) ([]byte, error) + // ReadAncients retrieves multiple items in sequence, starting from the index 'start'. + // It will return + // - at most 'count' items, + // - at least 1 item (even if exceeding the maxBytes), but will otherwise + // return as many items as fit into maxBytes. + ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) + // Ancients returns the ancient item numbers in the ancient store. Ancients() (uint64, error)