Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
8 changed files with 397 additions and 1 deletions
Showing only changes of commit fd4f60f49b - Show all commits

View File

@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/scwallet" "github.com/ethereum/go-ethereum/accounts/scwallet"
"github.com/ethereum/go-ethereum/accounts/usbwallet" "github.com/ethereum/go-ethereum/accounts/usbwallet"
"github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
@ -161,7 +162,24 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
if ctx.GlobalIsSet(utils.OverrideTerminalTotalDifficulty.Name) { if ctx.GlobalIsSet(utils.OverrideTerminalTotalDifficulty.Name) {
cfg.Eth.OverrideTerminalTotalDifficulty = new(big.Int).SetUint64(ctx.GlobalUint64(utils.OverrideTerminalTotalDifficulty.Name)) cfg.Eth.OverrideTerminalTotalDifficulty = new(big.Int).SetUint64(ctx.GlobalUint64(utils.OverrideTerminalTotalDifficulty.Name))
} }
backend, _ := utils.RegisterEthService(stack, &cfg.Eth) backend, eth := utils.RegisterEthService(stack, &cfg.Eth)
// Warn users to migrate if they have a legacy freezer format.
if eth != nil {
firstIdx := uint64(0)
// Hack to speed up check for mainnet because we know
// the first non-empty block.
ghash := rawdb.ReadCanonicalHash(eth.ChainDb(), 0)
if cfg.Eth.NetworkId == 1 && ghash == params.MainnetGenesisHash {
firstIdx = 46147
}
isLegacy, _, err := dbHasLegacyReceipts(eth.ChainDb(), firstIdx)
if err != nil {
utils.Fatalf("Failed to check db for legacy receipts: %v", err)
}
if isLegacy {
log.Warn("Database has receipts with a legacy format. Please run `geth db freezer-migrate`.")
}
}
// Configure GraphQL if requested // Configure GraphQL if requested
if ctx.GlobalIsSet(utils.GraphQLEnabledFlag.Name) { if ctx.GlobalIsSet(utils.GraphQLEnabledFlag.Name) {

View File

@ -35,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/console/prompt" "github.com/ethereum/go-ethereum/console/prompt"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
@ -72,6 +73,7 @@ Remove blockchain and state databases`,
dbImportCmd, dbImportCmd,
dbExportCmd, dbExportCmd,
dbMetadataCmd, dbMetadataCmd,
dbMigrateFreezerCmd,
}, },
} }
dbInspectCmd = cli.Command{ dbInspectCmd = cli.Command{
@ -251,6 +253,23 @@ WARNING: This is a low-level operation which may cause database corruption!`,
}, },
Description: "Shows metadata about the chain status.", Description: "Shows metadata about the chain status.",
} }
dbMigrateFreezerCmd = cli.Command{
Action: utils.MigrateFlags(freezerMigrate),
Name: "freezer-migrate",
Usage: "Migrate legacy parts of the freezer. (WARNING: may take a long time)",
ArgsUsage: "",
Flags: []cli.Flag{
utils.DataDirFlag,
utils.SyncModeFlag,
utils.MainnetFlag,
utils.RopstenFlag,
utils.SepoliaFlag,
utils.RinkebyFlag,
utils.GoerliFlag,
},
Description: `The freezer-migrate command checks your database for receipts in a legacy format and updates those.
WARNING: please back-up the receipt files in your ancients before running this command.`,
}
) )
func removeDB(ctx *cli.Context) error { func removeDB(ctx *cli.Context) error {
@ -750,3 +769,88 @@ func showMetaData(ctx *cli.Context) error {
table.Render() table.Render()
return nil return nil
} }
func freezerMigrate(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, false)
defer db.Close()
// Check first block for legacy receipt format
numAncients, err := db.Ancients()
if err != nil {
return err
}
if numAncients < 1 {
log.Info("No receipts in freezer to migrate")
return nil
}
isFirstLegacy, firstIdx, err := dbHasLegacyReceipts(db, 0)
if err != nil {
return err
}
if !isFirstLegacy {
log.Info("No legacy receipts to migrate")
return nil
}
log.Info("Starting migration", "ancients", numAncients, "firstLegacy", firstIdx)
start := time.Now()
if err := db.MigrateTable("receipts", types.ConvertLegacyStoredReceipts); err != nil {
return err
}
if err := db.Close(); err != nil {
return err
}
log.Info("Migration finished", "duration", time.Since(start))
return nil
}
// dbHasLegacyReceipts checks freezer entries for legacy receipts. It stops at the first
// non-empty receipt and checks its format. The index of this first non-empty element is
// the second return parameter.
func dbHasLegacyReceipts(db ethdb.Database, firstIdx uint64) (bool, uint64, error) {
// Check first block for legacy receipt format
numAncients, err := db.Ancients()
if err != nil {
return false, 0, err
}
if numAncients < 1 {
return false, 0, nil
}
if firstIdx >= numAncients {
return false, firstIdx, nil
}
var (
legacy bool
blob []byte
emptyRLPList = []byte{192}
)
// Find first block with non-empty receipt, only if
// the index is not already provided.
if firstIdx == 0 {
for i := uint64(0); i < numAncients; i++ {
blob, err = db.Ancient("receipts", i)
if err != nil {
return false, 0, err
}
if len(blob) == 0 {
continue
}
if !bytes.Equal(blob, emptyRLPList) {
firstIdx = i
break
}
}
}
// Is first non-empty receipt legacy?
first, err := db.Ancient("receipts", firstIdx)
if err != nil {
return false, 0, err
}
legacy, err = types.IsLegacyStoredReceipts(first)
return legacy, firstIdx, err
}

View File

@ -145,6 +145,12 @@ func (db *nofreezedb) ReadAncients(fn func(reader ethdb.AncientReader) error) (e
return fn(db) return fn(db)
} }
// MigrateTable processes the entries in a given table in sequence
// converting them to a new format if they're of an old format.
func (db *nofreezedb) MigrateTable(kind string, convert convertLegacyFn) error {
return errNotSupported
}
// NewDatabase creates a high level database on top of a given key-value data // NewDatabase creates a high level database on top of a given key-value data
// store without a freezer moving immutable chain segments into cold storage. // store without a freezer moving immutable chain segments into cold storage.
func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {

View File

@ -19,6 +19,7 @@ package rawdb
import ( import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"math" "math"
"os" "os"
"path/filepath" "path/filepath"
@ -617,3 +618,116 @@ func (f *freezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []
return hashes, err return hashes, err
} }
// convertLegacyFn takes a raw freezer entry in an older format and
// returns it in the new format.
type convertLegacyFn = func([]byte) ([]byte, error)
// MigrateTable processes the entries in a given table in sequence
// converting them to a new format if they're of an old format.
func (f *freezer) MigrateTable(kind string, convert convertLegacyFn) error {
if f.readonly {
return errReadOnly
}
f.writeLock.Lock()
defer f.writeLock.Unlock()
table, ok := f.tables[kind]
if !ok {
return errUnknownTable
}
// forEach iterates every entry in the table serially and in order, calling `fn`
// with the item as argument. If `fn` returns an error the iteration stops
// and that error will be returned.
forEach := func(t *freezerTable, offset uint64, fn func(uint64, []byte) error) error {
var (
items = atomic.LoadUint64(&t.items)
batchSize = uint64(1024)
maxBytes = uint64(1024 * 1024)
)
for i := offset; i < items; {
if i+batchSize > items {
batchSize = items - i
}
data, err := t.RetrieveItems(i, batchSize, maxBytes)
if err != nil {
return err
}
for j, item := range data {
if err := fn(i+uint64(j), item); err != nil {
return err
}
}
i += uint64(len(data))
}
return nil
}
// TODO(s1na): This is a sanity-check since as of now no process does tail-deletion. But the migration
// process assumes no deletion at tail and needs to be modified to account for that.
if table.itemOffset > 0 || table.itemHidden > 0 {
return fmt.Errorf("migration not supported for tail-deleted freezers")
}
ancientsPath := filepath.Dir(table.index.Name())
// Set up new dir for the migrated table, the content of which
// we'll at the end move over to the ancients dir.
migrationPath := filepath.Join(ancientsPath, "migration")
newTable, err := NewFreezerTable(migrationPath, kind, FreezerNoSnappy[kind], false)
if err != nil {
return err
}
var (
batch = newTable.newBatch()
out []byte
start = time.Now()
logged = time.Now()
offset = newTable.items
)
if offset > 0 {
log.Info("found previous migration attempt", "migrated", offset)
}
// Iterate through entries and transform them
if err := forEach(table, offset, func(i uint64, blob []byte) error {
if i%10000 == 0 && time.Since(logged) > 16*time.Second {
log.Info("Processing legacy elements", "count", i, "elapsed", common.PrettyDuration(time.Since(start)))
logged = time.Now()
}
out, err = convert(blob)
if err != nil {
return err
}
if err := batch.AppendRaw(i, out); err != nil {
return err
}
return nil
}); err != nil {
return err
}
if err := batch.commit(); err != nil {
return err
}
log.Info("Replacing old table files with migrated ones", "elapsed", common.PrettyDuration(time.Since(start)))
// Release and delete old table files. Note this won't
// delete the index file.
table.releaseFilesAfter(0, true)
if err := newTable.Close(); err != nil {
return err
}
files, err := ioutil.ReadDir(migrationPath)
if err != nil {
return err
}
// Move migrated files to ancients dir.
for _, f := range files {
// This will replace the old index file as a side-effect.
if err := os.Rename(filepath.Join(migrationPath, f.Name()), filepath.Join(ancientsPath, f.Name())); err != nil {
return err
}
}
// Delete by now empty dir.
if err := os.Remove(migrationPath); err != nil {
return err
}
return nil
}

View File

@ -24,6 +24,7 @@ import (
"math/big" "math/big"
"math/rand" "math/rand"
"os" "os"
"path"
"sync" "sync"
"testing" "testing"
@ -337,3 +338,92 @@ func checkAncientCount(t *testing.T, f *freezer, kind string, n uint64) {
t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err) t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err)
} }
} }
func TestRenameWindows(t *testing.T) {
var (
fname = "file.bin"
fname2 = "file2.bin"
data = []byte{1, 2, 3, 4}
data2 = []byte{2, 3, 4, 5}
data3 = []byte{3, 5, 6, 7}
dataLen = 4
)
// Create 2 temp dirs
dir1, err := os.MkdirTemp("", "rename-test")
if err != nil {
t.Fatal(err)
}
defer os.Remove(dir1)
dir2, err := os.MkdirTemp("", "rename-test")
if err != nil {
t.Fatal(err)
}
defer os.Remove(dir2)
// Create file in dir1 and fill with data
f, err := os.Create(path.Join(dir1, fname))
if err != nil {
t.Fatal(err)
}
f2, err := os.Create(path.Join(dir1, fname2))
if err != nil {
t.Fatal(err)
}
f3, err := os.Create(path.Join(dir2, fname2))
if err != nil {
t.Fatal(err)
}
if _, err := f.Write(data); err != nil {
t.Fatal(err)
}
if _, err := f2.Write(data2); err != nil {
t.Fatal(err)
}
if _, err := f3.Write(data3); err != nil {
t.Fatal(err)
}
if err := f.Close(); err != nil {
t.Fatal(err)
}
if err := f2.Close(); err != nil {
t.Fatal(err)
}
if err := f3.Close(); err != nil {
t.Fatal(err)
}
if err := os.Rename(f.Name(), path.Join(dir2, fname)); err != nil {
t.Fatal(err)
}
if err := os.Rename(f2.Name(), path.Join(dir2, fname2)); err != nil {
t.Fatal(err)
}
// Check file contents
f, err = os.Open(path.Join(dir2, fname))
if err != nil {
t.Fatal(err)
}
defer f.Close()
defer os.Remove(f.Name())
buf := make([]byte, dataLen)
if _, err := f.Read(buf); err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, data) {
t.Errorf("unexpected file contents. Got %v\n", buf)
}
f, err = os.Open(path.Join(dir2, fname2))
if err != nil {
t.Fatal(err)
}
defer f.Close()
defer os.Remove(f.Name())
if _, err := f.Read(buf); err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, data2) {
t.Errorf("unexpected file contents. Got %v\n", buf)
}
}

View File

@ -113,6 +113,12 @@ func (t *table) Sync() error {
return t.db.Sync() return t.db.Sync()
} }
// MigrateTable processes the entries in a given table in sequence
// converting them to a new format if they're of an old format.
func (t *table) MigrateTable(kind string, convert convertLegacyFn) error {
return t.db.MigrateTable(kind, convert)
}
// Put inserts the given value into the database at a prefixed version of the // Put inserts the given value into the database at a prefixed version of the
// provided key. // provided key.
func (t *table) Put(key []byte, value []byte) error { func (t *table) Put(key []byte, value []byte) error {

53
core/types/legacy.go Normal file
View File

@ -0,0 +1,53 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package types
import (
"errors"
"github.com/ethereum/go-ethereum/rlp"
)
// IsLegacyStoredReceipts tries to parse the RLP-encoded blob
// first as an array of v3 stored receipt, then v4 stored receipt and
// returns true if successful.
func IsLegacyStoredReceipts(raw []byte) (bool, error) {
var v3 []v3StoredReceiptRLP
if err := rlp.DecodeBytes(raw, &v3); err == nil {
return true, nil
}
var v4 []v4StoredReceiptRLP
if err := rlp.DecodeBytes(raw, &v4); err == nil {
return true, nil
}
var v5 []storedReceiptRLP
// Check to see valid fresh stored receipt
if err := rlp.DecodeBytes(raw, &v5); err == nil {
return false, nil
}
return false, errors.New("value is not a valid receipt encoding")
}
// ConvertLegacyStoredReceipts takes the RLP encoding of an array of legacy
// stored receipts and returns a fresh RLP-encoded stored receipt.
func ConvertLegacyStoredReceipts(raw []byte) ([]byte, error) {
var receipts []ReceiptForStorage
if err := rlp.DecodeBytes(raw, &receipts); err != nil {
return nil, err
}
return rlp.EncodeToBytes(&receipts)
}

View File

@ -124,6 +124,11 @@ type AncientWriter interface {
// Sync flushes all in-memory ancient store data to disk. // Sync flushes all in-memory ancient store data to disk.
Sync() error Sync() error
// MigrateTable processes and migrates entries of a given table to a new format.
// The second argument is a function that takes a raw entry and returns it
// in the newest format.
MigrateTable(string, func([]byte) ([]byte, error)) error
} }
// AncientWriteOp is given to the function argument of ModifyAncients. // AncientWriteOp is given to the function argument of ModifyAncients.