core/rawdb: support freezer batch read with no size limit (#27687)

This change adds the ability to perform reads from freezer without size limitation. This can be useful in cases where callers are certain that out-of-memory will not happen (e.g. reading only a few elements). 

The previous API was designed to behave both optimally and secure while servicing a request from a peer, whereas this change should _not_ be used when an untrusted peer can influence the query size.
This commit is contained in:
rjl493456442 2023-07-12 15:19:01 +08:00 committed by GitHub
parent cecd22143b
commit 0b1f97e151
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 86 additions and 23 deletions

View File

@ -197,9 +197,10 @@ func (f *Freezer) Ancient(kind string, number uint64) ([]byte, error) {
// AncientRange retrieves multiple items in sequence, starting from the index 'start'. // AncientRange retrieves multiple items in sequence, starting from the index 'start'.
// It will return // It will return
// - at most 'max' items, // - at most 'count' items,
// - at least 1 item (even if exceeding the maxByteSize), but will otherwise // - if maxBytes is specified: at least 1 item (even if exceeding the maxByteSize),
// return as many items as fit into maxByteSize. // but will otherwise return as many items as fit into maxByteSize.
// - if maxBytes is not specified, 'count' items will be returned if they are present.
func (f *Freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) { func (f *Freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
if table := f.tables[kind]; table != nil { if table := f.tables[kind]; table != nil {
return table.RetrieveItems(start, count, maxBytes) return table.RetrieveItems(start, count, maxBytes)

View File

@ -712,7 +712,7 @@ func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, e
if !t.noCompression { if !t.noCompression {
decompressedSize, _ = snappy.DecodedLen(item) decompressedSize, _ = snappy.DecodedLen(item)
} }
if i > 0 && uint64(outputSize+decompressedSize) > maxBytes { if i > 0 && maxBytes != 0 && uint64(outputSize+decompressedSize) > maxBytes {
break break
} }
if !t.noCompression { if !t.noCompression {
@ -730,8 +730,10 @@ func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, e
} }
// retrieveItems reads up to 'count' items from the table. It reads at least // retrieveItems reads up to 'count' items from the table. It reads at least
// one item, but otherwise avoids reading more than maxBytes bytes. // one item, but otherwise avoids reading more than maxBytes bytes. Freezer
// It returns the (potentially compressed) data, and the sizes. // will ignore the size limitation and continuously allocate memory to store
// data if maxBytes is 0. It returns the (potentially compressed) data, and
// the sizes.
func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) { func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {
t.lock.RLock() t.lock.RLock()
defer t.lock.RUnlock() defer t.lock.RUnlock()
@ -752,25 +754,22 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
if start+count > items { if start+count > items {
count = items - start count = items - start
} }
var ( var output []byte // Buffer to read data into
output = make([]byte, maxBytes) // Buffer to read data into if maxBytes != 0 {
outputSize int // Used size of that buffer output = make([]byte, 0, maxBytes)
) } else {
output = make([]byte, 0, 1024) // initial buffer cap
}
// readData is a helper method to read a single data item from disk. // readData is a helper method to read a single data item from disk.
readData := func(fileId, start uint32, length int) error { readData := func(fileId, start uint32, length int) error {
// In case a small limit is used, and the elements are large, may need to output = grow(output, length)
// realloc the read-buffer when reading the first (and only) item.
if len(output) < length {
output = make([]byte, length)
}
dataFile, exist := t.files[fileId] dataFile, exist := t.files[fileId]
if !exist { if !exist {
return fmt.Errorf("missing data file %d", fileId) return fmt.Errorf("missing data file %d", fileId)
} }
if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil { if _, err := dataFile.ReadAt(output[len(output)-length:], int64(start)); err != nil {
return err return err
} }
outputSize += length
return nil return nil
} }
// Read all the indexes in one go // Read all the indexes in one go
@ -801,7 +800,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
} }
readStart = 0 readStart = 0
} }
if i > 0 && uint64(totalSize+size) > maxBytes { if i > 0 && uint64(totalSize+size) > maxBytes && maxBytes != 0 {
// About to break out due to byte limit being exceeded. We don't // About to break out due to byte limit being exceeded. We don't
// read this last item, but we need to do the deferred reads now. // read this last item, but we need to do the deferred reads now.
if unreadSize > 0 { if unreadSize > 0 {
@ -815,7 +814,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
unreadSize += size unreadSize += size
totalSize += size totalSize += size
sizes = append(sizes, size) sizes = append(sizes, size)
if i == len(indices)-2 || uint64(totalSize) > maxBytes { if i == len(indices)-2 || (uint64(totalSize) > maxBytes && maxBytes != 0) {
// Last item, need to do the read now // Last item, need to do the read now
if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil { if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
return nil, nil, err return nil, nil, err
@ -826,7 +825,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
// Update metrics. // Update metrics.
t.readMeter.Mark(int64(totalSize)) t.readMeter.Mark(int64(totalSize))
return output[:outputSize], sizes, nil return output, sizes, nil
} }
// has returns an indicator whether the specified number data is still accessible // has returns an indicator whether the specified number data is still accessible

View File

@ -994,6 +994,52 @@ func TestSequentialReadByteLimit(t *testing.T) {
} }
} }
// TestSequentialReadNoByteLimit tests the batch-read if maxBytes is not specified.
// Freezer should return the requested items regardless the size limitation.
func TestSequentialReadNoByteLimit(t *testing.T) {
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("batchread-3-%d", rand.Uint64())
{ // Fill table
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true, false)
if err != nil {
t.Fatal(err)
}
// Write 10 bytes 30 times,
// Splitting it at every 100 bytes (10 items)
writeChunks(t, f, 30, 10)
f.Close()
}
for i, tc := range []struct {
items uint64
want int
}{
{1, 1},
{30, 30},
{31, 30},
} {
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true, false)
if err != nil {
t.Fatal(err)
}
items, err := f.RetrieveItems(0, tc.items, 0)
if err != nil {
t.Fatal(err)
}
if have, want := len(items), tc.want; have != want {
t.Fatalf("test %d: want %d items, have %d ", i, want, have)
}
for ii, have := range items {
want := getChunk(10, ii)
if !bytes.Equal(want, have) {
t.Fatalf("test %d: data corruption item %d: have\n%x\n, want \n%x\n", i, ii, have, want)
}
}
f.Close()
}
}
}
func TestFreezerReadonly(t *testing.T) { func TestFreezerReadonly(t *testing.T) {
tmpdir := os.TempDir() tmpdir := os.TempDir()
// Case 1: Check it fails on non-existent file. // Case 1: Check it fails on non-existent file.

View File

@ -117,3 +117,19 @@ func truncateFreezerFile(file *os.File, size int64) error {
} }
return nil return nil
} }
// grow prepares the slice space for new item, and doubles the slice capacity
// if space is not enough.
func grow(buf []byte, n int) []byte {
if cap(buf)-len(buf) < n {
newcap := 2 * cap(buf)
if newcap-len(buf) < n {
newcap = len(buf) + n
}
nbuf := make([]byte, len(buf), newcap)
copy(nbuf, buf)
buf = nbuf
}
buf = buf[:len(buf)+n]
return buf
}

View File

@ -79,9 +79,10 @@ type AncientReaderOp interface {
// AncientRange retrieves multiple items in sequence, starting from the index 'start'. // AncientRange retrieves multiple items in sequence, starting from the index 'start'.
// It will return // It will return
// - at most 'count' items, // - at most 'count' items,
// - at least 1 item (even if exceeding the maxBytes), but will otherwise // - if maxBytes is specified: at least 1 item (even if exceeding the maxByteSize),
// return as many items as fit into maxBytes. // but will otherwise return as many items as fit into maxByteSize.
// - if maxBytes is not specified, 'count' items will be returned if they are present
AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error)
// Ancients returns the ancient item numbers in the ancient store. // Ancients returns the ancient item numbers in the ancient store.