Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
7 changed files with 109 additions and 44 deletions
Showing only changes of commit 3da42f85d9 - Show all commits

View File

@ -170,7 +170,14 @@ func pruneState(ctx *cli.Context) error {
defer stack.Close()
chaindb := utils.MakeChainDatabase(ctx, stack, false)
pruner, err := pruner.NewPruner(chaindb, stack.ResolvePath(""), stack.ResolvePath(config.Eth.TrieCleanCacheJournal), ctx.Uint64(utils.BloomFilterSizeFlag.Name))
defer chaindb.Close()
prunerconfig := pruner.Config{
Datadir: stack.ResolvePath(""),
Cachedir: stack.ResolvePath(config.Eth.TrieCleanCacheJournal),
BloomSize: ctx.Uint64(utils.BloomFilterSizeFlag.Name),
}
pruner, err := pruner.NewPruner(chaindb, prunerconfig)
if err != nil {
log.Error("Failed to open snapshot tree", "err", err)
return err
@ -199,12 +206,20 @@ func verifyState(ctx *cli.Context) error {
defer stack.Close()
chaindb := utils.MakeChainDatabase(ctx, stack, true)
defer chaindb.Close()
headBlock := rawdb.ReadHeadBlock(chaindb)
if headBlock == nil {
log.Error("Failed to load head block")
return errors.New("no head block")
}
snaptree, err := snapshot.New(chaindb, trie.NewDatabase(chaindb), 256, headBlock.Root(), false, false, false)
snapconfig := snapshot.Config{
CacheSize: 256,
Recovery: false,
NoBuild: true,
AsyncBuild: false,
}
snaptree, err := snapshot.New(snapconfig, chaindb, trie.NewDatabase(chaindb), headBlock.Root())
if err != nil {
log.Error("Failed to open snapshot tree", "err", err)
return err
@ -479,7 +494,13 @@ func dumpState(ctx *cli.Context) error {
if err != nil {
return err
}
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, root, false, false, false)
snapConfig := snapshot.Config{
CacheSize: 256,
Recovery: false,
NoBuild: true,
AsyncBuild: false,
}
snaptree, err := snapshot.New(snapConfig, db, trie.NewDatabase(db), root)
if err != nil {
return err
}

View File

@ -2198,6 +2198,9 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (*core.BlockChain, ethdb.Data
if !ctx.Bool(SnapshotFlag.Name) {
cache.SnapshotLimit = 0 // Disabled
}
// Disable snapshot generation/wiping by default
cache.SnapshotNoBuild = true
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheTrieFlag.Name) {
cache.TrieCleanLimit = ctx.Int(CacheFlag.Name) * ctx.Int(CacheTrieFlag.Name) / 100
}
@ -2206,7 +2209,6 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (*core.BlockChain, ethdb.Data
}
vmcfg := vm.Config{EnablePreimageRecording: ctx.Bool(VMEnableDebugFlag.Name)}
// TODO(rjl493456442) disable snapshot generation/wiping if the chain is read only.
// Disable transaction indexing/unindexing by default.
chain, err := core.NewBlockChain(chainDb, cache, gspec, nil, engine, vmcfg, nil, nil)
if err != nil {

View File

@ -135,7 +135,8 @@ type CacheConfig struct {
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
Preimages bool // Whether to store preimage of trie key to the disk
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
}
// defaultCacheConfig are the default caching values if none are specified by the
@ -399,7 +400,13 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
log.Warn("Enabling snapshot recovery", "chainhead", head.NumberU64(), "diskbase", *layer)
recover = true
}
bc.snaps, _ = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, head.Root(), !bc.cacheConfig.SnapshotWait, true, recover)
snapconfig := snapshot.Config{
CacheSize: bc.cacheConfig.SnapshotLimit,
Recovery: recover,
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.stateCache.TrieDB(), head.Root())
}
// Start future block processor.

View File

@ -63,6 +63,13 @@ var (
emptyCode = crypto.Keccak256(nil)
)
// Config includes all the configurations for pruning.
type Config struct {
Datadir string // The directory of the state database
Cachedir string // The directory of state clean cache
BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
}
// Pruner is an offline tool to prune the stale state with the
// help of the snapshot. The workflow of pruner is very simple:
//
@ -75,40 +82,44 @@ var (
// periodically in order to release the disk usage and improve the
// disk read performance to some extent.
type Pruner struct {
db ethdb.Database
stateBloom *stateBloom
datadir string
trieCachePath string
headHeader *types.Header
snaptree *snapshot.Tree
config Config
chainHeader *types.Header
db ethdb.Database
stateBloom *stateBloom
snaptree *snapshot.Tree
}
// NewPruner creates the pruner instance.
func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize uint64) (*Pruner, error) {
func NewPruner(db ethdb.Database, config Config) (*Pruner, error) {
headBlock := rawdb.ReadHeadBlock(db)
if headBlock == nil {
return nil, errors.New("Failed to load head block")
}
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headBlock.Root(), false, false, false)
snapconfig := snapshot.Config{
CacheSize: 256,
Recovery: false,
NoBuild: true,
AsyncBuild: false,
}
snaptree, err := snapshot.New(snapconfig, db, trie.NewDatabase(db), headBlock.Root())
if err != nil {
return nil, err // The relevant snapshot(s) might not exist
}
// Sanitize the bloom filter size if it's too small.
if bloomSize < 256 {
log.Warn("Sanitizing bloomfilter size", "provided(MB)", bloomSize, "updated(MB)", 256)
bloomSize = 256
if config.BloomSize < 256 {
log.Warn("Sanitizing bloomfilter size", "provided(MB)", config.BloomSize, "updated(MB)", 256)
config.BloomSize = 256
}
stateBloom, err := newStateBloomWithSize(bloomSize)
stateBloom, err := newStateBloomWithSize(config.BloomSize)
if err != nil {
return nil, err
}
return &Pruner{
db: db,
stateBloom: stateBloom,
datadir: datadir,
trieCachePath: trieCachePath,
headHeader: headBlock.Header(),
snaptree: snaptree,
config: config,
chainHeader: headBlock.Header(),
db: db,
stateBloom: stateBloom,
snaptree: snaptree,
}, nil
}
@ -236,12 +247,12 @@ func (p *Pruner) Prune(root common.Hash) error {
// reuse it for pruning instead of generating a new one. It's
// mandatory because a part of state may already be deleted,
// the recovery procedure is necessary.
_, stateBloomRoot, err := findBloomFilter(p.datadir)
_, stateBloomRoot, err := findBloomFilter(p.config.Datadir)
if err != nil {
return err
}
if stateBloomRoot != (common.Hash{}) {
return RecoverPruning(p.datadir, p.db, p.trieCachePath)
return RecoverPruning(p.config.Datadir, p.db, p.config.Cachedir)
}
// If the target state root is not specified, use the HEAD-127 as the
// target. The reason for picking it is:
@ -252,7 +263,7 @@ func (p *Pruner) Prune(root common.Hash) error {
// Retrieve all snapshot layers from the current HEAD.
// In theory there are 128 difflayers + 1 disk layer present,
// so 128 diff layers are expected to be returned.
layers = p.snaptree.Snapshots(p.headHeader.Root, 128, true)
layers = p.snaptree.Snapshots(p.chainHeader.Root, 128, true)
if len(layers) != 128 {
// Reject if the accumulated diff layers are less than 128. It
// means in most of normal cases, there is no associated state
@ -294,7 +305,7 @@ func (p *Pruner) Prune(root common.Hash) error {
}
} else {
if len(layers) > 0 {
log.Info("Selecting bottom-most difflayer as the pruning target", "root", root, "height", p.headHeader.Number.Uint64()-127)
log.Info("Selecting bottom-most difflayer as the pruning target", "root", root, "height", p.chainHeader.Number.Uint64()-127)
} else {
log.Info("Selecting user-specified state as the pruning target", "root", root)
}
@ -303,7 +314,7 @@ func (p *Pruner) Prune(root common.Hash) error {
// It's necessary otherwise in the next restart we will hit the
// deleted state root in the "clean cache" so that the incomplete
// state is picked for usage.
deleteCleanTrieCache(p.trieCachePath)
deleteCleanTrieCache(p.config.Cachedir)
// All the state roots of the middle layer should be forcibly pruned,
// otherwise the dangling state will be left.
@ -325,7 +336,7 @@ func (p *Pruner) Prune(root common.Hash) error {
if err := extractGenesis(p.db, p.stateBloom); err != nil {
return err
}
filterName := bloomFilterName(p.datadir, root)
filterName := bloomFilterName(p.config.Datadir, root)
log.Info("Writing state bloom to disk", "name", filterName)
if err := p.stateBloom.Commit(filterName, filterName+stateBloomFileTempSuffix); err != nil {
@ -362,7 +373,13 @@ func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) err
// - The state HEAD is rewound already because of multiple incomplete `prune-state`
// In this case, even the state HEAD is not exactly matched with snapshot, it
// still feasible to recover the pruning correctly.
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headBlock.Root(), false, false, true)
snapconfig := snapshot.Config{
CacheSize: 256,
Recovery: true,
NoBuild: true,
AsyncBuild: false,
}
snaptree, err := snapshot.New(snapconfig, db, trie.NewDatabase(db), headBlock.Root())
if err != nil {
return err // The relevant snapshot(s) might not exist
}

View File

@ -120,7 +120,7 @@ func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, jou
}
// loadSnapshot loads a pre-existing state snapshot backed by a key-value store.
func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery bool) (snapshot, bool, error) {
func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, root common.Hash, cache int, recovery bool, noBuild bool) (snapshot, bool, error) {
// If snapshotting is disabled (initial sync in progress), don't do anything,
// wait for the chain to permit us to do something meaningful
if rawdb.ReadSnapshotDisabled(diskdb) {
@ -140,7 +140,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
}
snapshot, generator, err := loadAndParseJournal(diskdb, base)
if err != nil {
log.Warn("Failed to load new-format journal", "error", err)
log.Warn("Failed to load journal", "error", err)
return nil, false, err
}
// Entire snapshot journal loaded, sanity check the head. If the loaded
@ -164,13 +164,16 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
// disk layer.
log.Warn("Snapshot is not continuous with chain", "snaproot", head, "chainroot", root)
}
// Everything loaded correctly, resume any suspended operations
// Load the disk layer status from the generator if it's not complete
if !generator.Done {
// Whether or not wiping was in progress, load any generator progress too
base.genMarker = generator.Marker
if base.genMarker == nil {
base.genMarker = []byte{}
}
}
// Everything loaded correctly, resume any suspended operations
// if the background generation is allowed
if !generator.Done && !noBuild {
base.genPending = make(chan struct{})
base.genAbort = make(chan chan *generatorStats)

View File

@ -148,6 +148,14 @@ type snapshot interface {
StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool)
}
// Config includes the configurations for snapshots.
type Config struct {
CacheSize int // Megabytes permitted to use for read caches
Recovery bool // Indicator that the snapshots is in the recovery mode
NoBuild bool // Indicator that the snapshots generation is disallowed
AsyncBuild bool // The snapshot generation is allowed to be constructed asynchronously
}
// Tree is an Ethereum state snapshot tree. It consists of one persistent base
// layer backed by a key-value store, on top of which arbitrarily many in-memory
// diff layers are topped. The memory diffs can form a tree with branching, but
@ -158,9 +166,9 @@ type snapshot interface {
// storage data to avoid expensive multi-level trie lookups; and to allow sorted,
// cheap iteration of the account/storage tries for sync aid.
type Tree struct {
config Config // Snapshots configurations
diskdb ethdb.KeyValueStore // Persistent database to store the snapshot
triedb *trie.Database // In-memory cache to access the trie through
cache int // Megabytes permitted to use for read caches
layers map[common.Hash]snapshot // Collection of all known layers
lock sync.RWMutex
@ -183,26 +191,27 @@ type Tree struct {
// This case happens when the snapshot is 'ahead' of the state trie.
// - otherwise, the entire snapshot is considered invalid and will be recreated on
// a background thread.
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, async bool, rebuild bool, recovery bool) (*Tree, error) {
func New(config Config, diskdb ethdb.KeyValueStore, triedb *trie.Database, root common.Hash) (*Tree, error) {
// Create a new, empty snapshot tree
snap := &Tree{
config: config,
diskdb: diskdb,
triedb: triedb,
cache: cache,
layers: make(map[common.Hash]snapshot),
}
if !async {
// Create the building waiter iff the background generation is allowed
if !config.NoBuild && !config.AsyncBuild {
defer snap.waitBuild()
}
// Attempt to load a previously persisted snapshot and rebuild one if failed
head, disabled, err := loadSnapshot(diskdb, triedb, cache, root, recovery)
head, disabled, err := loadSnapshot(diskdb, triedb, root, config.CacheSize, config.Recovery, config.NoBuild)
if disabled {
log.Warn("Snapshot maintenance disabled (syncing)")
return snap, nil
}
if err != nil {
if rebuild {
log.Warn("Failed to load snapshot, regenerating", "err", err)
log.Warn("Failed to load snapshot", "err", err)
if !config.NoBuild {
snap.Rebuild(root)
return snap, nil
}
@ -727,7 +736,7 @@ func (t *Tree) Rebuild(root common.Hash) {
// generator will run a wiper first if there's not one running right now.
log.Info("Rebuilding state snapshot")
t.layers = map[common.Hash]snapshot{
root: generateSnapshot(t.diskdb, t.triedb, t.cache, root),
root: generateSnapshot(t.diskdb, t.triedb, t.config.CacheSize, root),
}
}

View File

@ -267,7 +267,13 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc, snapshotter boo
var snaps *snapshot.Tree
if snapshotter {
snaps, _ = snapshot.New(db, sdb.TrieDB(), 1, root, false, true, false)
snapconfig := snapshot.Config{
CacheSize: 1,
Recovery: false,
NoBuild: false,
AsyncBuild: false,
}
snaps, _ = snapshot.New(snapconfig, db, sdb.TrieDB(), root)
}
statedb, _ = state.New(root, sdb, snaps)
return snaps, statedb