Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
8 changed files with 90 additions and 100 deletions
Showing only changes of commit 905a723fae - Show all commits

View File

@ -43,10 +43,7 @@ const (
// The background thread will keep moving ancient chain segments from key-value
// database to flat files for saving space on live database.
type chainFreezer struct {
// WARNING: The `threshold` field is accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
threshold atomic.Uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
*Freezer
quit chan struct{}
@ -60,12 +57,13 @@ func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFre
if err != nil {
return nil, err
}
return &chainFreezer{
Freezer: freezer,
threshold: params.FullImmutabilityThreshold,
quit: make(chan struct{}),
trigger: make(chan chan struct{}),
}, nil
cf := chainFreezer{
Freezer: freezer,
quit: make(chan struct{}),
trigger: make(chan chan struct{}),
}
cf.threshold.Store(params.FullImmutabilityThreshold)
return &cf, nil
}
// Close closes the chain freezer instance and terminates the background thread.
@ -124,8 +122,8 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
continue
}
number := ReadHeaderNumber(nfdb, hash)
threshold := atomic.LoadUint64(&f.threshold)
frozen := atomic.LoadUint64(&f.frozen)
threshold := f.threshold.Load()
frozen := f.frozen.Load()
switch {
case number == nil:
log.Error("Current full block number unavailable", "hash", hash)
@ -186,7 +184,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
// Wipe out side chains also and track dangling side chains
var dangling []common.Hash
frozen = atomic.LoadUint64(&f.frozen) // Needs reload after during freezeRange
frozen = f.frozen.Load() // Needs reload after during freezeRange
for number := first; number < frozen; number++ {
// Always keep the genesis block in active database
if number != 0 {

View File

@ -132,11 +132,12 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool
}
}
// process runs in parallel
nThreadsAlive := int32(threads)
var nThreadsAlive atomic.Int32
nThreadsAlive.Store(int32(threads))
process := func() {
defer func() {
// Last processor closes the result channel
if atomic.AddInt32(&nThreadsAlive, -1) == 0 {
if nThreadsAlive.Add(-1) == 0 {
close(hashesCh)
}
}()

View File

@ -24,7 +24,6 @@ import (
"path"
"path/filepath"
"strings"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@ -72,9 +71,9 @@ func (frdb *freezerdb) Freeze(threshold uint64) error {
}
// Set the freezer threshold to a temporary value
defer func(old uint64) {
atomic.StoreUint64(&frdb.AncientStore.(*chainFreezer).threshold, old)
}(atomic.LoadUint64(&frdb.AncientStore.(*chainFreezer).threshold))
atomic.StoreUint64(&frdb.AncientStore.(*chainFreezer).threshold, threshold)
frdb.AncientStore.(*chainFreezer).threshold.Store(old)
}(frdb.AncientStore.(*chainFreezer).threshold.Load())
frdb.AncientStore.(*chainFreezer).threshold.Store(threshold)
// Trigger a freeze cycle and block until it's done
trigger := make(chan struct{}, 1)

View File

@ -62,11 +62,8 @@ const freezerTableSize = 2 * 1000 * 1000 * 1000
// reserving it for go-ethereum. This would also reduce the memory requirements
// of Geth, and thus also GC overhead.
type Freezer struct {
// WARNING: The `frozen` and `tail` fields are accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
frozen uint64 // Number of blocks already frozen
tail uint64 // Number of the first stored item in the freezer
frozen atomic.Uint64 // Number of blocks already frozen
tail atomic.Uint64 // Number of the first stored item in the freezer
// This lock synchronizes writers and the truncate operation, as well as
// the "atomic" (batched) read operations.
@ -212,12 +209,12 @@ func (f *Freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]
// Ancients returns the length of the frozen items.
func (f *Freezer) Ancients() (uint64, error) {
return atomic.LoadUint64(&f.frozen), nil
return f.frozen.Load(), nil
}
// Tail returns the number of first stored item in the freezer.
func (f *Freezer) Tail() (uint64, error) {
return atomic.LoadUint64(&f.tail), nil
return f.tail.Load(), nil
}
// AncientSize returns the ancient size of the specified category.
@ -251,7 +248,7 @@ func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
defer f.writeLock.Unlock()
// Roll back all tables to the starting position in case of error.
prevItem := atomic.LoadUint64(&f.frozen)
prevItem := f.frozen.Load()
defer func() {
if err != nil {
// The write operation has failed. Go back to the previous item position.
@ -272,7 +269,7 @@ func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
if err != nil {
return 0, err
}
atomic.StoreUint64(&f.frozen, item)
f.frozen.Store(item)
return writeSize, nil
}
@ -284,7 +281,7 @@ func (f *Freezer) TruncateHead(items uint64) error {
f.writeLock.Lock()
defer f.writeLock.Unlock()
if atomic.LoadUint64(&f.frozen) <= items {
if f.frozen.Load() <= items {
return nil
}
for _, table := range f.tables {
@ -292,7 +289,7 @@ func (f *Freezer) TruncateHead(items uint64) error {
return err
}
}
atomic.StoreUint64(&f.frozen, items)
f.frozen.Store(items)
return nil
}
@ -304,7 +301,7 @@ func (f *Freezer) TruncateTail(tail uint64) error {
f.writeLock.Lock()
defer f.writeLock.Unlock()
if atomic.LoadUint64(&f.tail) >= tail {
if f.tail.Load() >= tail {
return nil
}
for _, table := range f.tables {
@ -312,7 +309,7 @@ func (f *Freezer) TruncateTail(tail uint64) error {
return err
}
}
atomic.StoreUint64(&f.tail, tail)
f.tail.Store(tail)
return nil
}
@ -343,22 +340,22 @@ func (f *Freezer) validate() error {
)
// Hack to get boundary of any table
for kind, table := range f.tables {
head = atomic.LoadUint64(&table.items)
tail = atomic.LoadUint64(&table.itemHidden)
head = table.items.Load()
tail = table.itemHidden.Load()
name = kind
break
}
// Now check every table against those boundaries.
for kind, table := range f.tables {
if head != atomic.LoadUint64(&table.items) {
return fmt.Errorf("freezer tables %s and %s have differing head: %d != %d", kind, name, atomic.LoadUint64(&table.items), head)
if head != table.items.Load() {
return fmt.Errorf("freezer tables %s and %s have differing head: %d != %d", kind, name, table.items.Load(), head)
}
if tail != atomic.LoadUint64(&table.itemHidden) {
return fmt.Errorf("freezer tables %s and %s have differing tail: %d != %d", kind, name, atomic.LoadUint64(&table.itemHidden), tail)
if tail != table.itemHidden.Load() {
return fmt.Errorf("freezer tables %s and %s have differing tail: %d != %d", kind, name, table.itemHidden.Load(), tail)
}
}
atomic.StoreUint64(&f.frozen, head)
atomic.StoreUint64(&f.tail, tail)
f.frozen.Store(head)
f.tail.Store(tail)
return nil
}
@ -369,11 +366,11 @@ func (f *Freezer) repair() error {
tail = uint64(0)
)
for _, table := range f.tables {
items := atomic.LoadUint64(&table.items)
items := table.items.Load()
if head > items {
head = items
}
hidden := atomic.LoadUint64(&table.itemHidden)
hidden := table.itemHidden.Load()
if hidden > tail {
tail = hidden
}
@ -386,8 +383,8 @@ func (f *Freezer) repair() error {
return err
}
}
atomic.StoreUint64(&f.frozen, head)
atomic.StoreUint64(&f.tail, tail)
f.frozen.Store(head)
f.tail.Store(tail)
return nil
}
@ -413,7 +410,7 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
// and that error will be returned.
forEach := func(t *freezerTable, offset uint64, fn func(uint64, []byte) error) error {
var (
items = atomic.LoadUint64(&t.items)
items = t.items.Load()
batchSize = uint64(1024)
maxBytes = uint64(1024 * 1024)
)
@ -436,7 +433,7 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
}
// TODO(s1na): This is a sanity-check since as of now no process does tail-deletion. But the migration
// process assumes no deletion at tail and needs to be modified to account for that.
if table.itemOffset > 0 || table.itemHidden > 0 {
if table.itemOffset.Load() > 0 || table.itemHidden.Load() > 0 {
return fmt.Errorf("migration not supported for tail-deleted freezers")
}
ancientsPath := filepath.Dir(table.index.Name())
@ -452,7 +449,7 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
out []byte
start = time.Now()
logged = time.Now()
offset = newTable.items
offset = newTable.items.Load()
)
if offset > 0 {
log.Info("found previous migration attempt", "migrated", offset)

View File

@ -18,7 +18,6 @@ package rawdb
import (
"fmt"
"sync/atomic"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/rlp"
@ -107,7 +106,7 @@ func (t *freezerTable) newBatch() *freezerTableBatch {
func (batch *freezerTableBatch) reset() {
batch.dataBuffer = batch.dataBuffer[:0]
batch.indexBuffer = batch.indexBuffer[:0]
batch.curItem = atomic.LoadUint64(&batch.t.items)
batch.curItem = batch.t.items.Load()
batch.totalBytes = 0
}
@ -201,7 +200,7 @@ func (batch *freezerTableBatch) commit() error {
// Update headBytes of table.
batch.t.headBytes += dataSize
atomic.StoreUint64(&batch.t.items, batch.curItem)
batch.t.items.Store(batch.curItem)
// Update metrics.
batch.t.sizeGauge.Inc(dataSize + indexSize)

View File

@ -88,18 +88,15 @@ func (i *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uin
// It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry
// file (uncompressed 64 bit indices into the data file).
type freezerTable struct {
// WARNING: The `items` field is accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
items uint64 // Number of items stored in the table (including items removed from tail)
itemOffset uint64 // Number of items removed from the table
items atomic.Uint64 // Number of items stored in the table (including items removed from tail)
itemOffset atomic.Uint64 // Number of items removed from the table
// itemHidden is the number of items marked as deleted. Tail deletion is
// only supported at file level which means the actual deletion will be
// delayed until the entire data file is marked as deleted. Before that
// these items will be hidden to prevent being visited again. The value
// should never be lower than itemOffset.
itemHidden uint64
itemHidden atomic.Uint64
noCompression bool // if true, disables snappy compression. Note: does not work retroactively
readonly bool
@ -241,14 +238,14 @@ func (t *freezerTable) repair() error {
// which is not enough in theory but enough in practice.
// TODO: use uint64 to represent total removed items.
t.tailId = firstIndex.filenum
t.itemOffset = uint64(firstIndex.offset)
t.itemOffset.Store(uint64(firstIndex.offset))
// Load metadata from the file
meta, err := loadMetadata(t.meta, t.itemOffset)
meta, err := loadMetadata(t.meta, t.itemOffset.Load())
if err != nil {
return err
}
t.itemHidden = meta.VirtualTail
t.itemHidden.Store(meta.VirtualTail)
// Read the last index, use the default value in case the freezer is empty
if offsetsSize == indexEntrySize {
@ -331,7 +328,7 @@ func (t *freezerTable) repair() error {
}
}
// Update the item and byte counters and return
t.items = t.itemOffset + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file
t.items.Store(t.itemOffset.Load() + uint64(offsetsSize/indexEntrySize-1)) // last indexEntry points to the end of the data file
t.headBytes = contentSize
t.headId = lastIndex.filenum
@ -346,9 +343,9 @@ func (t *freezerTable) repair() error {
return err
}
if verbose {
t.logger.Info("Chain freezer table opened", "items", t.items, "size", t.headBytes)
t.logger.Info("Chain freezer table opened", "items", t.items.Load(), "size", t.headBytes)
} else {
t.logger.Debug("Chain freezer table opened", "items", t.items, "size", common.StorageSize(t.headBytes))
t.logger.Debug("Chain freezer table opened", "items", t.items.Load(), "size", common.StorageSize(t.headBytes))
}
return nil
}
@ -382,11 +379,11 @@ func (t *freezerTable) truncateHead(items uint64) error {
defer t.lock.Unlock()
// Ensure the given truncate target falls in the correct range
existing := atomic.LoadUint64(&t.items)
existing := t.items.Load()
if existing <= items {
return nil
}
if items < atomic.LoadUint64(&t.itemHidden) {
if items < t.itemHidden.Load() {
return errors.New("truncation below tail")
}
// We need to truncate, save the old size for metrics tracking
@ -403,7 +400,7 @@ func (t *freezerTable) truncateHead(items uint64) error {
// Truncate the index file first, the tail position is also considered
// when calculating the new freezer table length.
length := items - atomic.LoadUint64(&t.itemOffset)
length := items - t.itemOffset.Load()
if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil {
return err
}
@ -438,7 +435,7 @@ func (t *freezerTable) truncateHead(items uint64) error {
}
// All data files truncated, set internal counters and return
t.headBytes = int64(expected.offset)
atomic.StoreUint64(&t.items, items)
t.items.Store(items)
// Retrieve the new size and update the total size counter
newSize, err := t.sizeNolock()
@ -455,10 +452,10 @@ func (t *freezerTable) truncateTail(items uint64) error {
defer t.lock.Unlock()
// Ensure the given truncate target falls in the correct range
if atomic.LoadUint64(&t.itemHidden) >= items {
if t.itemHidden.Load() >= items {
return nil
}
if atomic.LoadUint64(&t.items) < items {
if t.items.Load() < items {
return errors.New("truncation above head")
}
// Load the new tail index by the given new tail position
@ -466,10 +463,10 @@ func (t *freezerTable) truncateTail(items uint64) error {
newTailId uint32
buffer = make([]byte, indexEntrySize)
)
if atomic.LoadUint64(&t.items) == items {
if t.items.Load() == items {
newTailId = t.headId
} else {
offset := items - atomic.LoadUint64(&t.itemOffset)
offset := items - t.itemOffset.Load()
if _, err := t.index.ReadAt(buffer, int64((offset+1)*indexEntrySize)); err != nil {
return err
}
@ -478,7 +475,7 @@ func (t *freezerTable) truncateTail(items uint64) error {
newTailId = newTail.filenum
}
// Update the virtual tail marker and hidden these entries in table.
atomic.StoreUint64(&t.itemHidden, items)
t.itemHidden.Store(items)
if err := writeMetadata(t.meta, newMetadata(items)); err != nil {
return err
}
@ -501,7 +498,7 @@ func (t *freezerTable) truncateTail(items uint64) error {
// Count how many items can be deleted from the file.
var (
newDeleted = items
deleted = atomic.LoadUint64(&t.itemOffset)
deleted = t.itemOffset.Load()
)
for current := items - 1; current >= deleted; current -= 1 {
if _, err := t.index.ReadAt(buffer, int64((current-deleted+1)*indexEntrySize)); err != nil {
@ -541,7 +538,7 @@ func (t *freezerTable) truncateTail(items uint64) error {
}
// Release any files before the current tail
t.tailId = newTailId
atomic.StoreUint64(&t.itemOffset, newDeleted)
t.itemOffset.Store(newDeleted)
t.releaseFilesBefore(t.tailId, true)
// Retrieve the new size and update the total size counter
@ -654,7 +651,7 @@ func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) {
// it will return error.
func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) {
// Apply the table-offset
from = from - t.itemOffset
from = from - t.itemOffset.Load()
// For reading N items, we need N+1 indices.
buffer := make([]byte, (count+1)*indexEntrySize)
if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil {
@ -744,8 +741,8 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
return nil, nil, errClosed
}
var (
items = atomic.LoadUint64(&t.items) // the total items(head + 1)
hidden = atomic.LoadUint64(&t.itemHidden) // the number of hidden items
items = t.items.Load() // the total items(head + 1)
hidden = t.itemHidden.Load() // the number of hidden items
)
// Ensure the start is written, not deleted from the tail, and that the
// caller actually wants something
@ -832,7 +829,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
// has returns an indicator whether the specified number data is still accessible
// in the freezer table.
func (t *freezerTable) has(number uint64) bool {
return atomic.LoadUint64(&t.items) > number && atomic.LoadUint64(&t.itemHidden) <= number
return t.items.Load() > number && t.itemHidden.Load() <= number
}
// size returns the total data size in the freezer table.
@ -922,7 +919,7 @@ func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
return
}
fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n", meta.Version,
atomic.LoadUint64(&t.items), atomic.LoadUint64(&t.itemOffset), atomic.LoadUint64(&t.itemHidden))
t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load())
buf := make([]byte, indexEntrySize)

View File

@ -24,7 +24,6 @@ import (
"os"
"path/filepath"
"reflect"
"sync/atomic"
"testing"
"testing/quick"
@ -191,7 +190,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
writeChunks(t, f, 255, 15)
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
if _, err = f.Retrieve(f.items.Load() - 1); err != nil {
t.Fatal(err)
}
f.Close()
@ -317,7 +316,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
writeChunks(t, f, 9, 15)
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
if _, err = f.Retrieve(f.items.Load() - 1); err != nil {
f.Close()
t.Fatal(err)
}
@ -350,8 +349,8 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
t.Fatal(err)
}
defer f.Close()
if f.items != 7 {
t.Fatalf("expected %d items, got %d", 7, f.items)
if f.items.Load() != 7 {
t.Fatalf("expected %d items, got %d", 7, f.items.Load())
}
if err := assertFileSize(fileToCrop, 15); err != nil {
t.Fatal(err)
@ -374,7 +373,7 @@ func TestFreezerTruncate(t *testing.T) {
writeChunks(t, f, 30, 15)
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
if _, err = f.Retrieve(f.items.Load() - 1); err != nil {
t.Fatal(err)
}
f.Close()
@ -388,8 +387,8 @@ func TestFreezerTruncate(t *testing.T) {
}
defer f.Close()
f.truncateHead(10) // 150 bytes
if f.items != 10 {
t.Fatalf("expected %d items, got %d", 10, f.items)
if f.items.Load() != 10 {
t.Fatalf("expected %d items, got %d", 10, f.items.Load())
}
// 45, 45, 45, 15 -- bytes should be 15
if f.headBytes != 15 {
@ -444,9 +443,9 @@ func TestFreezerRepairFirstFile(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if f.items != 1 {
if f.items.Load() != 1 {
f.Close()
t.Fatalf("expected %d items, got %d", 0, f.items)
t.Fatalf("expected %d items, got %d", 0, f.items.Load())
}
// Write 40 bytes
@ -483,7 +482,7 @@ func TestFreezerReadAndTruncate(t *testing.T) {
writeChunks(t, f, 30, 15)
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
if _, err = f.Retrieve(f.items.Load() - 1); err != nil {
t.Fatal(err)
}
f.Close()
@ -495,9 +494,9 @@ func TestFreezerReadAndTruncate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if f.items != 30 {
if f.items.Load() != 30 {
f.Close()
t.Fatalf("expected %d items, got %d", 0, f.items)
t.Fatalf("expected %d items, got %d", 0, f.items.Load())
}
for y := byte(0); y < 30; y++ {
f.Retrieve(uint64(y))
@ -1210,13 +1209,13 @@ func runRandTest(rt randTest) bool {
rt[i].err = fmt.Errorf("failed to reload table %v", err)
}
case opCheckAll:
tail := atomic.LoadUint64(&f.itemHidden)
head := atomic.LoadUint64(&f.items)
tail := f.itemHidden.Load()
head := f.items.Load()
if tail == head {
continue
}
got, err := f.RetrieveItems(atomic.LoadUint64(&f.itemHidden), head-tail, 100000)
got, err := f.RetrieveItems(f.itemHidden.Load(), head-tail, 100000)
if err != nil {
rt[i].err = err
} else {
@ -1238,7 +1237,7 @@ func runRandTest(rt randTest) bool {
if len(step.items) == 0 {
continue
}
tail := atomic.LoadUint64(&f.itemHidden)
tail := f.itemHidden.Load()
for i := 0; i < len(step.items); i++ {
blobs = append(blobs, values[step.items[i]-tail])
}
@ -1254,7 +1253,7 @@ func runRandTest(rt randTest) bool {
case opTruncateHead:
f.truncateHead(step.target)
length := atomic.LoadUint64(&f.items) - atomic.LoadUint64(&f.itemHidden)
length := f.items.Load() - f.itemHidden.Load()
values = values[:length]
case opTruncateHeadAll:
@ -1262,10 +1261,10 @@ func runRandTest(rt randTest) bool {
values = nil
case opTruncateTail:
prev := atomic.LoadUint64(&f.itemHidden)
prev := f.itemHidden.Load()
f.truncateTail(step.target)
truncated := atomic.LoadUint64(&f.itemHidden) - prev
truncated := f.itemHidden.Load() - prev
values = values[truncated:]
case opTruncateTailAll:

View File

@ -267,10 +267,10 @@ func TestFreezerReadonlyValidate(t *testing.T) {
bBatch := f.tables["b"].newBatch()
require.NoError(t, bBatch.AppendRaw(0, item))
require.NoError(t, bBatch.commit())
if f.tables["a"].items != 3 {
if f.tables["a"].items.Load() != 3 {
t.Fatalf("unexpected number of items in table")
}
if f.tables["b"].items != 1 {
if f.tables["b"].items.Load() != 1 {
t.Fatalf("unexpected number of items in table")
}
require.NoError(t, f.Close())