core/state/snapshot: detect and clean up dangling storage snapshot in generation (#24811)

* core/state/snapshot: check dangling storages when generating snapshot

* core/state/snapshot: polish

* core/state/snapshot: wipe the last part of the dangling storages

* core/state/snapshot: fix and add tests

* core/state/snapshot: fix comment

* README: remove mentions of fast sync (#24656)

Co-authored-by: Marius van der Wijden <m.vanderwijden@live.de>

* core, cmd: expose dangling storage detector for wider usage

* core/state/snapshot: rename variable

* core, ethdb: use global iterators for snapshot generation

* core/state/snapshot: polish

* cmd, core/state/snapshot: polish

* core/state/snapshot: polish

* Update core/state/snapshot/generate.go

Co-authored-by: Martin Holst Swende <martin@swende.se>

* ethdb: extend db test suite and fix memorydb iterator

* ethdb/dbtest: rollback changes

* ethdb/memorydb: simplify iteration

* core/state/snapshot: update dangling counter

* core/state/snapshot: release iterators

* core/state/snapshot: update metrics

* core/state/snapshot: update time metrics

* metrics/influxdb: temp solution to present counter meaningfully, remove it

* add debug log, revert later

* core/state/snapshot: fix iterator panic

* all: customized snapshot iterator for backward iteration

* core, ethdb: polish

* core/state/snapshot: remove debug log

* core/state/snapshot: address comments from peter

* core/state/snapshot: reopen the iterator at the next position

* ethdb, core/state/snapshot: address comment from peter

* core/state/snapshot: reopen exhausted iterators

Co-authored-by: Tbnoapi <63448616+nuoomnoy02@users.noreply.github.com>
Co-authored-by: Marius van der Wijden <m.vanderwijden@live.de>
Co-authored-by: Martin Holst Swende <martin@swende.se>
This commit is contained in:
rjl493456442 2022-05-23 18:26:22 +08:00 committed by GitHub
parent 2b0d0ce8b0
commit 59ac229f87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 966 additions and 510 deletions

View File

@ -20,7 +20,6 @@ import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"time"
@ -32,7 +31,6 @@ import (
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
@ -223,15 +221,7 @@ func verifyState(ctx *cli.Context) error {
return err
}
log.Info("Verified the state", "root", root)
if err := checkDanglingDiskStorage(chaindb); err != nil {
log.Error("Dangling snap disk-storage check failed", "root", root, "err", err)
return err
}
if err := checkDanglingMemStorage(chaindb); err != nil {
log.Error("Dangling snap mem-storage check failed", "root", root, "err", err)
return err
}
return nil
return snapshot.CheckDanglingStorage(chaindb)
}
// checkDanglingStorage iterates the snap storage data, and verifies that all
@ -240,56 +230,7 @@ func checkDanglingStorage(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
chaindb := utils.MakeChainDatabase(ctx, stack, true)
if err := checkDanglingDiskStorage(chaindb); err != nil {
return err
}
return checkDanglingMemStorage(chaindb)
}
// checkDanglingDiskStorage checks if there is any 'dangling' storage data in the
// disk-backed snapshot layer.
func checkDanglingDiskStorage(chaindb ethdb.Database) error {
log.Info("Checking dangling snapshot disk storage")
var (
lastReport = time.Now()
start = time.Now()
lastKey []byte
it = rawdb.NewKeyLengthIterator(chaindb.NewIterator(rawdb.SnapshotStoragePrefix, nil), 1+2*common.HashLength)
)
defer it.Release()
for it.Next() {
k := it.Key()
accKey := k[1:33]
if bytes.Equal(accKey, lastKey) {
// No need to look up for every slot
continue
}
lastKey = common.CopyBytes(accKey)
if time.Since(lastReport) > time.Second*8 {
log.Info("Iterating snap storage", "at", fmt.Sprintf("%#x", accKey), "elapsed", common.PrettyDuration(time.Since(start)))
lastReport = time.Now()
}
if data := rawdb.ReadAccountSnapshot(chaindb, common.BytesToHash(accKey)); len(data) == 0 {
log.Error("Dangling storage - missing account", "account", fmt.Sprintf("%#x", accKey), "storagekey", fmt.Sprintf("%#x", k))
return fmt.Errorf("dangling snapshot storage account %#x", accKey)
}
}
log.Info("Verified the snapshot disk storage", "time", common.PrettyDuration(time.Since(start)), "err", it.Error())
return nil
}
// checkDanglingMemStorage checks if there is any 'dangling' storage in the journalled
// snapshot difflayers.
func checkDanglingMemStorage(chaindb ethdb.Database) error {
start := time.Now()
log.Info("Checking dangling snapshot difflayer journalled storage")
if err := snapshot.CheckJournalStorage(chaindb); err != nil {
return err
}
log.Info("Verified the snapshot journalled storage", "time", common.PrettyDuration(time.Since(start)))
return nil
return snapshot.CheckDanglingStorage(utils.MakeChainDatabase(ctx, stack, true))
}
// traverseState is a helper function used for pruning verification.

View File

@ -0,0 +1,241 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snapshot
import (
"bytes"
"encoding/binary"
"errors"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log"
)
const (
snapAccount = "account" // Identifier of account snapshot generation
snapStorage = "storage" // Identifier of storage snapshot generation
)
// generatorStats is a collection of statistics gathered by the snapshot generator
// for logging purposes.
type generatorStats struct {
origin uint64 // Origin prefix where generation started
start time.Time // Timestamp when generation started
accounts uint64 // Number of accounts indexed(generated or recovered)
slots uint64 // Number of storage slots indexed(generated or recovered)
dangling uint64 // Number of dangling storage slots
storage common.StorageSize // Total account and storage slot size(generation or recovery)
}
// Log creates an contextual log with the given message and the context pulled
// from the internally maintained statistics.
func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) {
var ctx []interface{}
if root != (common.Hash{}) {
ctx = append(ctx, []interface{}{"root", root}...)
}
// Figure out whether we're after or within an account
switch len(marker) {
case common.HashLength:
ctx = append(ctx, []interface{}{"at", common.BytesToHash(marker)}...)
case 2 * common.HashLength:
ctx = append(ctx, []interface{}{
"in", common.BytesToHash(marker[:common.HashLength]),
"at", common.BytesToHash(marker[common.HashLength:]),
}...)
}
// Add the usual measurements
ctx = append(ctx, []interface{}{
"accounts", gs.accounts,
"slots", gs.slots,
"storage", gs.storage,
"dangling", gs.dangling,
"elapsed", common.PrettyDuration(time.Since(gs.start)),
}...)
// Calculate the estimated indexing time based on current stats
if len(marker) > 0 {
if done := binary.BigEndian.Uint64(marker[:8]) - gs.origin; done > 0 {
left := math.MaxUint64 - binary.BigEndian.Uint64(marker[:8])
speed := done/uint64(time.Since(gs.start)/time.Millisecond+1) + 1 // +1s to avoid division by zero
ctx = append(ctx, []interface{}{
"eta", common.PrettyDuration(time.Duration(left/speed) * time.Millisecond),
}...)
}
}
log.Info(msg, ctx...)
}
// generatorContext carries a few global values to be shared by all generation functions.
type generatorContext struct {
stats *generatorStats // Generation statistic collection
db ethdb.KeyValueStore // Key-value store containing the snapshot data
account *holdableIterator // Iterator of account snapshot data
storage *holdableIterator // Iterator of storage snapshot data
batch ethdb.Batch // Database batch for writing batch data atomically
logged time.Time // The timestamp when last generation progress was displayed
}
// newGeneratorContext initializes the context for generation.
func newGeneratorContext(stats *generatorStats, db ethdb.KeyValueStore, accMarker []byte, storageMarker []byte) *generatorContext {
ctx := &generatorContext{
stats: stats,
db: db,
batch: db.NewBatch(),
logged: time.Now(),
}
ctx.openIterator(snapAccount, accMarker)
ctx.openIterator(snapStorage, storageMarker)
return ctx
}
// openIterator constructs global account and storage snapshot iterators
// at the interrupted position. These iterators should be reopened from time
// to time to avoid blocking leveldb compaction for a long time.
func (ctx *generatorContext) openIterator(kind string, start []byte) {
if kind == snapAccount {
iter := ctx.db.NewIterator(rawdb.SnapshotAccountPrefix, start)
ctx.account = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, 1+common.HashLength))
return
}
iter := ctx.db.NewIterator(rawdb.SnapshotStoragePrefix, start)
ctx.storage = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, 1+2*common.HashLength))
}
// reopenIterator releases the specified snapshot iterator and re-open it
// in the next position. It's aimed for not blocking leveldb compaction.
func (ctx *generatorContext) reopenIterator(kind string) {
// Shift iterator one more step, so that we can reopen
// the iterator at the right position.
var iter = ctx.account
if kind == snapStorage {
iter = ctx.storage
}
hasNext := iter.Next()
if !hasNext {
// Iterator exhausted, release forever and create an already exhausted virtual iterator
iter.Release()
if kind == snapAccount {
ctx.account = newHoldableIterator(memorydb.New().NewIterator(nil, nil))
return
}
ctx.storage = newHoldableIterator(memorydb.New().NewIterator(nil, nil))
return
}
next := iter.Key()
iter.Release()
ctx.openIterator(kind, next[1:])
}
// close releases all the held resources.
func (ctx *generatorContext) close() {
ctx.account.Release()
ctx.storage.Release()
}
// iterator returns the corresponding iterator specified by the kind.
func (ctx *generatorContext) iterator(kind string) *holdableIterator {
if kind == snapAccount {
return ctx.account
}
return ctx.storage
}
// removeStorageBefore deletes all storage entries which are located before
// the specified account. When the iterator touches the storage entry which
// is located in or outside the given account, it stops and holds the current
// iterated element locally.
func (ctx *generatorContext) removeStorageBefore(account common.Hash) {
var (
count uint64
start = time.Now()
iter = ctx.storage
)
for iter.Next() {
key := iter.Key()
if bytes.Compare(key[1:1+common.HashLength], account.Bytes()) >= 0 {
iter.Hold()
break
}
count++
ctx.batch.Delete(key)
if ctx.batch.ValueSize() > ethdb.IdealBatchSize {
ctx.batch.Write()
ctx.batch.Reset()
}
}
ctx.stats.dangling += count
snapStorageCleanCounter.Inc(time.Since(start).Nanoseconds())
}
// removeStorageAt deletes all storage entries which are located in the specified
// account. When the iterator touches the storage entry which is outside the given
// account, it stops and holds the current iterated element locally. An error will
// be returned if the initial position of iterator is not in the given account.
func (ctx *generatorContext) removeStorageAt(account common.Hash) error {
var (
count int64
start = time.Now()
iter = ctx.storage
)
for iter.Next() {
key := iter.Key()
cmp := bytes.Compare(key[1:1+common.HashLength], account.Bytes())
if cmp < 0 {
return errors.New("invalid iterator position")
}
if cmp > 0 {
iter.Hold()
break
}
count++
ctx.batch.Delete(key)
if ctx.batch.ValueSize() > ethdb.IdealBatchSize {
ctx.batch.Write()
ctx.batch.Reset()
}
}
snapWipedStorageMeter.Mark(count)
snapStorageCleanCounter.Inc(time.Since(start).Nanoseconds())
return nil
}
// removeStorageLeft deletes all storage entries which are located after
// the current iterator position.
func (ctx *generatorContext) removeStorageLeft() {
var (
count uint64
start = time.Now()
iter = ctx.storage
)
for iter.Next() {
count++
ctx.batch.Delete(iter.Key())
if ctx.batch.ValueSize() > ethdb.IdealBatchSize {
ctx.batch.Write()
ctx.batch.Reset()
}
}
ctx.stats.dangling += count
snapDanglingStorageMeter.Mark(int64(count))
snapStorageCleanCounter.Inc(time.Since(start).Nanoseconds())
}

View File

@ -0,0 +1,155 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snapshot
import (
"bytes"
"errors"
"fmt"
"io"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
// CheckDanglingStorage iterates the snap storage data, and verifies that all
// storage also has corresponding account data.
func CheckDanglingStorage(chaindb ethdb.KeyValueStore) error {
if err := checkDanglingDiskStorage(chaindb); err != nil {
return err
}
return checkDanglingMemStorage(chaindb)
}
// checkDanglingDiskStorage checks if there is any 'dangling' storage data in the
// disk-backed snapshot layer.
func checkDanglingDiskStorage(chaindb ethdb.KeyValueStore) error {
var (
lastReport = time.Now()
start = time.Now()
lastKey []byte
it = rawdb.NewKeyLengthIterator(chaindb.NewIterator(rawdb.SnapshotStoragePrefix, nil), 1+2*common.HashLength)
)
log.Info("Checking dangling snapshot disk storage")
defer it.Release()
for it.Next() {
k := it.Key()
accKey := k[1:33]
if bytes.Equal(accKey, lastKey) {
// No need to look up for every slot
continue
}
lastKey = common.CopyBytes(accKey)
if time.Since(lastReport) > time.Second*8 {
log.Info("Iterating snap storage", "at", fmt.Sprintf("%#x", accKey), "elapsed", common.PrettyDuration(time.Since(start)))
lastReport = time.Now()
}
if data := rawdb.ReadAccountSnapshot(chaindb, common.BytesToHash(accKey)); len(data) == 0 {
log.Warn("Dangling storage - missing account", "account", fmt.Sprintf("%#x", accKey), "storagekey", fmt.Sprintf("%#x", k))
return fmt.Errorf("dangling snapshot storage account %#x", accKey)
}
}
log.Info("Verified the snapshot disk storage", "time", common.PrettyDuration(time.Since(start)), "err", it.Error())
return nil
}
// checkDanglingMemStorage checks if there is any 'dangling' storage in the journalled
// snapshot difflayers.
func checkDanglingMemStorage(db ethdb.KeyValueStore) error {
var (
start = time.Now()
journal = rawdb.ReadSnapshotJournal(db)
)
if len(journal) == 0 {
log.Warn("Loaded snapshot journal", "diffs", "missing")
return nil
}
r := rlp.NewStream(bytes.NewReader(journal), 0)
// Firstly, resolve the first element as the journal version
version, err := r.Uint()
if err != nil {
log.Warn("Failed to resolve the journal version", "error", err)
return nil
}
if version != journalVersion {
log.Warn("Discarded the snapshot journal with wrong version", "required", journalVersion, "got", version)
return nil
}
// Secondly, resolve the disk layer root, ensure it's continuous
// with disk layer. Note now we can ensure it's the snapshot journal
// correct version, so we expect everything can be resolved properly.
var root common.Hash
if err := r.Decode(&root); err != nil {
return errors.New("missing disk layer root")
}
// The diff journal is not matched with disk, discard them.
// It can happen that Geth crashes without persisting the latest
// diff journal.
// Load all the snapshot diffs from the journal
if err := checkDanglingJournalStorage(r); err != nil {
return err
}
log.Info("Verified the snapshot journalled storage", "time", common.PrettyDuration(time.Since(start)))
return nil
}
// loadDiffLayer reads the next sections of a snapshot journal, reconstructing a new
// diff and verifying that it can be linked to the requested parent.
func checkDanglingJournalStorage(r *rlp.Stream) error {
for {
// Read the next diff journal entry
var root common.Hash
if err := r.Decode(&root); err != nil {
// The first read may fail with EOF, marking the end of the journal
if err == io.EOF {
return nil
}
return fmt.Errorf("load diff root: %v", err)
}
var destructs []journalDestruct
if err := r.Decode(&destructs); err != nil {
return fmt.Errorf("load diff destructs: %v", err)
}
var accounts []journalAccount
if err := r.Decode(&accounts); err != nil {
return fmt.Errorf("load diff accounts: %v", err)
}
accountData := make(map[common.Hash][]byte)
for _, entry := range accounts {
if len(entry.Blob) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that
accountData[entry.Hash] = entry.Blob
} else {
accountData[entry.Hash] = nil
}
}
var storage []journalStorage
if err := r.Decode(&storage); err != nil {
return fmt.Errorf("load diff storage: %v", err)
}
for _, entry := range storage {
if _, ok := accountData[entry.Hash]; !ok {
log.Error("Dangling storage - missing account", "account", fmt.Sprintf("%#x", entry.Hash), "root", root)
return fmt.Errorf("dangling journal snapshot storage account %#x", entry.Hash)
}
}
}
}

View File

@ -18,7 +18,6 @@ package snapshot
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math/big"
@ -27,13 +26,11 @@ import (
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
@ -47,14 +44,14 @@ var (
// accountCheckRange is the upper limit of the number of accounts involved in
// each range check. This is a value estimated based on experience. If this
// value is too large, the failure rate of range prove will increase. Otherwise
// the value is too small, the efficiency of the state recovery will decrease.
// range is too large, the failure rate of range proof will increase. Otherwise,
// if the range is too small, the efficiency of the state recovery will decrease.
accountCheckRange = 128
// storageCheckRange is the upper limit of the number of storage slots involved
// in each range check. This is a value estimated based on experience. If this
// value is too large, the failure rate of range prove will increase. Otherwise
// the value is too small, the efficiency of the state recovery will decrease.
// range is too large, the failure rate of range proof will increase. Otherwise,
// if the range is too small, the efficiency of the state recovery will decrease.
storageCheckRange = 1024
// errMissingTrie is returned if the target trie is missing while the generation
@ -62,85 +59,6 @@ var (
errMissingTrie = errors.New("missing trie")
)
// Metrics in generation
var (
snapGeneratedAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/generated", nil)
snapRecoveredAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/recovered", nil)
snapWipedAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/wiped", nil)
snapMissallAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/missall", nil)
snapGeneratedStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/generated", nil)
snapRecoveredStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/recovered", nil)
snapWipedStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/wiped", nil)
snapMissallStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/missall", nil)
snapSuccessfulRangeProofMeter = metrics.NewRegisteredMeter("state/snapshot/generation/proof/success", nil)
snapFailedRangeProofMeter = metrics.NewRegisteredMeter("state/snapshot/generation/proof/failure", nil)
// snapAccountProveCounter measures time spent on the account proving
snapAccountProveCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/prove", nil)
// snapAccountTrieReadCounter measures time spent on the account trie iteration
snapAccountTrieReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/trieread", nil)
// snapAccountSnapReadCounter measues time spent on the snapshot account iteration
snapAccountSnapReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/snapread", nil)
// snapAccountWriteCounter measures time spent on writing/updating/deleting accounts
snapAccountWriteCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/write", nil)
// snapStorageProveCounter measures time spent on storage proving
snapStorageProveCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/prove", nil)
// snapStorageTrieReadCounter measures time spent on the storage trie iteration
snapStorageTrieReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/trieread", nil)
// snapStorageSnapReadCounter measures time spent on the snapshot storage iteration
snapStorageSnapReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/snapread", nil)
// snapStorageWriteCounter measures time spent on writing/updating/deleting storages
snapStorageWriteCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/write", nil)
)
// generatorStats is a collection of statistics gathered by the snapshot generator
// for logging purposes.
type generatorStats struct {
origin uint64 // Origin prefix where generation started
start time.Time // Timestamp when generation started
accounts uint64 // Number of accounts indexed(generated or recovered)
slots uint64 // Number of storage slots indexed(generated or recovered)
storage common.StorageSize // Total account and storage slot size(generation or recovery)
}
// Log creates an contextual log with the given message and the context pulled
// from the internally maintained statistics.
func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) {
var ctx []interface{}
if root != (common.Hash{}) {
ctx = append(ctx, []interface{}{"root", root}...)
}
// Figure out whether we're after or within an account
switch len(marker) {
case common.HashLength:
ctx = append(ctx, []interface{}{"at", common.BytesToHash(marker)}...)
case 2 * common.HashLength:
ctx = append(ctx, []interface{}{
"in", common.BytesToHash(marker[:common.HashLength]),
"at", common.BytesToHash(marker[common.HashLength:]),
}...)
}
// Add the usual measurements
ctx = append(ctx, []interface{}{
"accounts", gs.accounts,
"slots", gs.slots,
"storage", gs.storage,
"elapsed", common.PrettyDuration(time.Since(gs.start)),
}...)
// Calculate the estimated indexing time based on current stats
if len(marker) > 0 {
if done := binary.BigEndian.Uint64(marker[:8]) - gs.origin; done > 0 {
left := math.MaxUint64 - binary.BigEndian.Uint64(marker[:8])
speed := done/uint64(time.Since(gs.start)/time.Millisecond+1) + 1 // +1s to avoid division by zero
ctx = append(ctx, []interface{}{
"eta", common.PrettyDuration(time.Duration(left/speed) * time.Millisecond),
}...)
}
}
log.Info(msg, ctx...)
}
// generateSnapshot regenerates a brand new snapshot based on an existing state
// database and head block asynchronously. The snapshot is returned immediately
// and generation is continued in the background until done.
@ -248,25 +166,35 @@ func (result *proofResult) forEach(callback func(key []byte, val []byte) error)
//
// The proof result will be returned if the range proving is finished, otherwise
// the error will be returned to abort the entire procedure.
func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix []byte, kind string, origin []byte, max int, valueConvertFn func([]byte) ([]byte, error)) (*proofResult, error) {
func (dl *diskLayer) proveRange(ctx *generatorContext, root common.Hash, prefix []byte, kind string, origin []byte, max int, valueConvertFn func([]byte) ([]byte, error)) (*proofResult, error) {
var (
keys [][]byte
vals [][]byte
proof = rawdb.NewMemoryDatabase()
diskMore = false
iter = ctx.iterator(kind)
start = time.Now()
min = append(prefix, origin...)
)
iter := dl.diskdb.NewIterator(prefix, origin)
defer iter.Release()
var start = time.Now()
for iter.Next() {
// Ensure the iterated item is always equal or larger than the given origin.
key := iter.Key()
if len(key) != len(prefix)+common.HashLength {
continue
if bytes.Compare(key, min) < 0 {
return nil, errors.New("invalid iteration position")
}
// Ensure the iterated item still fall in the specified prefix. If
// not which means the items in the specified area are all visited.
// Move the iterator a step back since we iterate one extra element
// out.
if !bytes.Equal(key[:len(prefix)], prefix) {
iter.Hold()
break
}
if len(keys) == max {
// Break if we've reached the max size, and signal that we're not
// done yet.
// done yet. Move the iterator a step back since we iterate one
// extra element out.
if len(keys) == max {
iter.Hold()
diskMore = true
break
}
@ -282,7 +210,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
// generation to heal the invalid data.
//
// Here append the original value to ensure that the number of key and
// value are the same.
// value are aligned.
vals = append(vals, common.CopyBytes(iter.Value()))
log.Error("Failed to convert account state data", "err", err)
} else {
@ -291,13 +219,13 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
}
}
// Update metrics for database iteration and merkle proving
if kind == "storage" {
if kind == snapStorage {
snapStorageSnapReadCounter.Inc(time.Since(start).Nanoseconds())
} else {
snapAccountSnapReadCounter.Inc(time.Since(start).Nanoseconds())
}
defer func(start time.Time) {
if kind == "storage" {
if kind == snapStorage {
snapStorageProveCounter.Inc(time.Since(start).Nanoseconds())
} else {
snapAccountProveCounter.Inc(time.Since(start).Nanoseconds())
@ -322,7 +250,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
// Snap state is chunked, generate edge proofs for verification.
tr, err := trie.New(root, dl.triedb)
if err != nil {
stats.Log("Trie missing, state snapshotting paused", dl.root, dl.genMarker)
ctx.stats.Log("Trie missing, state snapshotting paused", dl.root, dl.genMarker)
return nil, errMissingTrie
}
// Firstly find out the key of last iterated element.
@ -371,19 +299,23 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
// onStateCallback is a function that is called by generateRange, when processing a range of
// accounts or storage slots. For each element, the callback is invoked.
// If 'delete' is true, then this element (and potential slots) needs to be deleted from the snapshot.
// If 'write' is true, then this element needs to be updated with the 'val'.
// If 'write' is false, then this element is already correct, and needs no update. However,
// for accounts, the storage trie of the account needs to be checked.
//
// - If 'delete' is true, then this element (and potential slots) needs to be deleted from the snapshot.
// - If 'write' is true, then this element needs to be updated with the 'val'.
// - If 'write' is false, then this element is already correct, and needs no update.
// The 'val' is the canonical encoding of the value (not the slim format for accounts)
//
// However, for accounts, the storage trie of the account needs to be checked. Also,
// dangling storages(storage exists but the corresponding account is missing) need to
// be cleaned up.
type onStateCallback func(key []byte, val []byte, write bool, delete bool) error
// generateRange generates the state segment with particular prefix. Generation can
// either verify the correctness of existing state through range-proof and skip
// generation, or iterate trie to regenerate state on demand.
func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, origin []byte, max int, stats *generatorStats, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) {
func (dl *diskLayer) generateRange(ctx *generatorContext, root common.Hash, prefix []byte, kind string, origin []byte, max int, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) {
// Use range prover to check the validity of the flat state in the range
result, err := dl.proveRange(stats, root, prefix, kind, origin, max, valueConvertFn)
result, err := dl.proveRange(ctx, root, prefix, kind, origin, max, valueConvertFn)
if err != nil {
return false, nil, err
}
@ -414,18 +346,17 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string,
snapFailedRangeProofMeter.Mark(1)
// Special case, the entire trie is missing. In the original trie scheme,
// all the duplicated subtries will be filter out(only one copy of data
// all the duplicated subtries will be filtered out (only one copy of data
// will be stored). While in the snapshot model, all the storage tries
// belong to different contracts will be kept even they are duplicated.
// Track it to a certain extent remove the noise data used for statistics.
if origin == nil && last == nil {
meter := snapMissallAccountMeter
if kind == "storage" {
if kind == snapStorage {
meter = snapMissallStorageMeter
}
meter.Mark(1)
}
// We use the snap data to build up a cache which can be used by the
// main account trie as a primary lookup when resolving hashes
var snapNodeCache ethdb.KeyValueStore
@ -439,15 +370,16 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string,
root, _, _ := snapTrie.Commit(nil)
snapTrieDb.Commit(root, false, nil)
}
// Construct the trie for state iteration, reuse the trie
// if it's already opened with some nodes resolved.
tr := result.tr
if tr == nil {
tr, err = trie.New(root, dl.triedb)
if err != nil {
stats.Log("Trie missing, state snapshotting paused", dl.root, dl.genMarker)
ctx.stats.Log("Trie missing, state snapshotting paused", dl.root, dl.genMarker)
return false, nil, errMissingTrie
}
}
var (
trieMore bool
nodeIt = tr.NodeIterator(origin)
@ -466,6 +398,7 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string,
internal time.Duration
)
nodeIt.AddResolver(snapNodeCache)
for iter.Next() {
if last != nil && bytes.Compare(iter.Key, last) > 0 {
trieMore = true
@ -519,7 +452,7 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string,
internal += time.Since(istart)
// Update metrics for counting trie iteration
if kind == "storage" {
if kind == snapStorage {
snapStorageTrieReadCounter.Inc((time.Since(start) - internal).Nanoseconds())
} else {
snapAccountTrieReadCounter.Inc((time.Since(start) - internal).Nanoseconds())
@ -534,66 +467,69 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string,
// checkAndFlush checks if an interruption signal is received or the
// batch size has exceeded the allowance.
func (dl *diskLayer) checkAndFlush(current []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
func (dl *diskLayer) checkAndFlush(ctx *generatorContext, current []byte) error {
var abort chan *generatorStats
select {
case abort = <-dl.genAbort:
default:
}
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
if ctx.batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
if bytes.Compare(current, dl.genMarker) < 0 {
log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", dl.genMarker))
}
// Flush out the batch anyway no matter it's empty or not.
// It's possible that all the states are recovered and the
// generation indeed makes progress.
journalProgress(batch, current, stats)
journalProgress(ctx.batch, current, ctx.stats)
if err := batch.Write(); err != nil {
if err := ctx.batch.Write(); err != nil {
return err
}
batch.Reset()
ctx.batch.Reset()
dl.lock.Lock()
dl.genMarker = current
dl.lock.Unlock()
if abort != nil {
stats.Log("Aborting state snapshot generation", dl.root, current)
ctx.stats.Log("Aborting state snapshot generation", dl.root, current)
return newAbortErr(abort) // bubble up an error for interruption
}
// Don't hold the iterators too long, release them to let compactor works
ctx.reopenIterator(snapAccount)
ctx.reopenIterator(snapStorage)
}
if time.Since(*logged) > 8*time.Second {
stats.Log("Generating state snapshot", dl.root, current)
*logged = time.Now()
if time.Since(ctx.logged) > 8*time.Second {
ctx.stats.Log("Generating state snapshot", dl.root, current)
ctx.logged = time.Now()
}
return nil
}
// generateStorages generates the missing storage slots of the specific contract.
// It's supposed to restart the generation from the given origin position.
func generateStorages(dl *diskLayer, account common.Hash, storageRoot common.Hash, storeMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
func generateStorages(ctx *generatorContext, dl *diskLayer, account common.Hash, storageRoot common.Hash, storeMarker []byte) error {
onStorage := func(key []byte, val []byte, write bool, delete bool) error {
defer func(start time.Time) {
snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds())
}(time.Now())
if delete {
rawdb.DeleteStorageSnapshot(batch, account, common.BytesToHash(key))
rawdb.DeleteStorageSnapshot(ctx.batch, account, common.BytesToHash(key))
snapWipedStorageMeter.Mark(1)
return nil
}
if write {
rawdb.WriteStorageSnapshot(batch, account, common.BytesToHash(key), val)
rawdb.WriteStorageSnapshot(ctx.batch, account, common.BytesToHash(key), val)
snapGeneratedStorageMeter.Mark(1)
} else {
snapRecoveredStorageMeter.Mark(1)
}
stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
stats.slots++
ctx.stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
ctx.stats.slots++
// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := dl.checkAndFlush(append(account[:], key...), batch, stats, logged); err != nil {
if err := dl.checkAndFlush(ctx, append(account[:], key...)); err != nil {
return err
}
return nil
@ -601,7 +537,7 @@ func generateStorages(dl *diskLayer, account common.Hash, storageRoot common.Has
// Loop for re-generating the missing storage slots.
var origin = common.CopyBytes(storeMarker)
for {
exhausted, last, err := dl.generateRange(storageRoot, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), "storage", origin, storageCheckRange, stats, onStorage, nil)
exhausted, last, err := dl.generateRange(ctx, storageRoot, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), snapStorage, origin, storageCheckRange, onStorage, nil)
if err != nil {
return err // The procedure it aborted, either by external signal or internal error.
}
@ -619,23 +555,19 @@ func generateStorages(dl *diskLayer, account common.Hash, storageRoot common.Has
// generateAccounts generates the missing snapshot accounts as well as their
// storage slots in the main trie. It's supposed to restart the generation
// from the given origin position.
func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
func generateAccounts(ctx *generatorContext, dl *diskLayer, accMarker []byte) error {
onAccount := func(key []byte, val []byte, write bool, delete bool) error {
var (
start = time.Now()
accountHash = common.BytesToHash(key)
)
if delete {
rawdb.DeleteAccountSnapshot(batch, accountHash)
snapWipedAccountMeter.Mark(1)
// Make sure to clear all dangling storages before this account
account := common.BytesToHash(key)
ctx.removeStorageBefore(account)
// Ensure that any previous snapshot storage values are cleared
prefix := append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...)
keyLen := len(rawdb.SnapshotStoragePrefix) + 2*common.HashLength
if err := wipeKeyRange(dl.diskdb, "storage", prefix, nil, nil, keyLen, snapWipedStorageMeter, false); err != nil {
return err
}
start := time.Now()
if delete {
rawdb.DeleteAccountSnapshot(ctx.batch, account)
snapWipedAccountMeter.Mark(1)
snapAccountWriteCounter.Inc(time.Since(start).Nanoseconds())
ctx.removeStorageAt(account)
return nil
}
// Retrieve the current account and flatten it into the internal format
@ -649,7 +581,7 @@ func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats
log.Crit("Invalid account encountered during snapshot creation", "err", err)
}
// If the account is not yet in-progress, write it out
if accMarker == nil || !bytes.Equal(accountHash[:], accMarker) {
if accMarker == nil || !bytes.Equal(account[:], accMarker) {
dataLen := len(val) // Approximate size, saves us a round of RLP-encoding
if !write {
if bytes.Equal(acc.CodeHash, emptyCode[:]) {
@ -662,44 +594,34 @@ func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats
} else {
data := SlimAccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash)
dataLen = len(data)
rawdb.WriteAccountSnapshot(batch, accountHash, data)
rawdb.WriteAccountSnapshot(ctx.batch, account, data)
snapGeneratedAccountMeter.Mark(1)
}
stats.storage += common.StorageSize(1 + common.HashLength + dataLen)
stats.accounts++
ctx.stats.storage += common.StorageSize(1 + common.HashLength + dataLen)
ctx.stats.accounts++
}
marker := accountHash[:]
// If the snap generation goes here after interrupted, genMarker may go backward
// when last genMarker is consisted of accountHash and storageHash
marker := account[:]
if accMarker != nil && bytes.Equal(marker, accMarker) && len(dl.genMarker) > common.HashLength {
marker = dl.genMarker[:]
}
// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := dl.checkAndFlush(marker, batch, stats, logged); err != nil {
if err := dl.checkAndFlush(ctx, marker); err != nil {
return err
}
snapAccountWriteCounter.Inc(time.Since(start).Nanoseconds()) // let's count flush time as well
// If the iterated account is the contract, create a further loop to
// verify or regenerate the contract storage.
if acc.Root == emptyRoot {
// If the root is empty, we still need to ensure that any previous snapshot
// storage values are cleared
// TODO: investigate if this can be avoided, this will be very costly since it
// affects every single EOA account
// - Perhaps we can avoid if where codeHash is emptyCode
prefix := append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...)
keyLen := len(rawdb.SnapshotStoragePrefix) + 2*common.HashLength
if err := wipeKeyRange(dl.diskdb, "storage", prefix, nil, nil, keyLen, snapWipedStorageMeter, false); err != nil {
return err
}
snapAccountWriteCounter.Inc(time.Since(start).Nanoseconds())
ctx.removeStorageAt(account)
} else {
snapAccountWriteCounter.Inc(time.Since(start).Nanoseconds())
var storeMarker []byte
if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength {
if accMarker != nil && bytes.Equal(account[:], accMarker) && len(dl.genMarker) > common.HashLength {
storeMarker = dl.genMarker[common.HashLength:]
}
if err := generateStorages(dl, accountHash, acc.Root, storeMarker, batch, stats, logged); err != nil {
if err := generateStorages(ctx, dl, account, acc.Root, storeMarker); err != nil {
return err
}
}
@ -707,25 +629,26 @@ func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats
accMarker = nil
return nil
}
// Always reset the initial account range as 1 whenever recover from the interruption.
// Always reset the initial account range as 1 whenever recover from the
// interruption. TODO(rjl493456442) can we remove it?
var accountRange = accountCheckRange
if len(accMarker) > 0 {
accountRange = 1
}
// Global loop for re-generating the account snapshots + all layered storage snapshots.
origin := common.CopyBytes(accMarker)
for {
exhausted, last, err := dl.generateRange(dl.root, rawdb.SnapshotAccountPrefix, "account", origin, accountRange, stats, onAccount, FullAccountRLP)
exhausted, last, err := dl.generateRange(ctx, dl.root, rawdb.SnapshotAccountPrefix, snapAccount, origin, accountRange, onAccount, FullAccountRLP)
if err != nil {
return err // The procedure it aborted, either by external signal or internal error.
}
// Abort the procedure if the entire snapshot is generated
if exhausted {
origin = increaseKey(last)
// Last step, cleanup the storages after the last account.
// All the left storages should be treated as dangling.
if origin == nil || exhausted {
ctx.removeStorageLeft()
break
}
if origin = increaseKey(last); origin == nil {
break // special case, the last is 0xffffffff...fff
}
accountRange = accountCheckRange
}
return nil
@ -736,19 +659,27 @@ func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats
// gathering and logging, since the method surfs the blocks as they arrive, often
// being restarted.
func (dl *diskLayer) generate(stats *generatorStats) {
var accMarker []byte
var (
accMarker []byte
abort chan *generatorStats
)
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
accMarker = dl.genMarker[:common.HashLength]
}
var (
batch = dl.diskdb.NewBatch()
logged = time.Now()
abort chan *generatorStats
)
stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)
// Generate the snapshot accounts from the point where they left off.
if err := generateAccounts(dl, accMarker, batch, stats, &logged); err != nil {
// Initialize the global generator context. The snapshot iterators are
// opened at the interrupted position because the assumption is held
// that all the snapshot data are generated correctly before the marker.
// Even if the snapshot data is updated during the interruption (before
// or at the marker), the assumption is still held.
// For the account or storage slot at the interruption, they will be
// processed twice by the generator(they are already processed in the
// last run) but it's fine.
ctx := newGeneratorContext(stats, dl.diskdb, accMarker, dl.genMarker)
defer ctx.close()
if err := generateAccounts(ctx, dl, accMarker); err != nil {
// Extract the received interruption signal if exists
if aerr, ok := err.(*abortErr); ok {
abort = aerr.abort
@ -763,18 +694,18 @@ func (dl *diskLayer) generate(stats *generatorStats) {
// Snapshot fully generated, set the marker to nil.
// Note even there is nothing to commit, persist the
// generator anyway to mark the snapshot is complete.
journalProgress(batch, nil, stats)
if err := batch.Write(); err != nil {
journalProgress(ctx.batch, nil, stats)
if err := ctx.batch.Write(); err != nil {
log.Error("Failed to flush batch", "err", err)
abort = <-dl.genAbort
abort <- stats
return
}
batch.Reset()
ctx.batch.Reset()
log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots,
"storage", stats.storage, "elapsed", common.PrettyDuration(time.Since(stats.start)))
"storage", stats.storage, "dangling", stats.dangling, "elapsed", common.PrettyDuration(time.Since(stats.start)))
dl.lock.Lock()
dl.genMarker = nil

View File

@ -148,8 +148,10 @@ func TestGenerateExistentState(t *testing.T) {
func checkSnapRoot(t *testing.T, snap *diskLayer, trieRoot common.Hash) {
t.Helper()
accIt := snap.AccountIterator(common.Hash{})
defer accIt.Release()
snapRoot, err := generateTrieRoot(nil, accIt, common.Hash{}, stackTrieGenerate,
func(db ethdb.KeyValueWriter, accountHash, codeHash common.Hash, stat *generateStats) (common.Hash, error) {
storageIt, _ := snap.StorageIterator(accountHash, common.Hash{})
@ -168,6 +170,9 @@ func checkSnapRoot(t *testing.T, snap *diskLayer, trieRoot common.Hash) {
if snapRoot != trieRoot {
t.Fatalf("snaproot: %#x != trieroot #%x", snapRoot, trieRoot)
}
if err := CheckDanglingStorage(snap.diskdb); err != nil {
t.Fatalf("Detected dangling storages %v", err)
}
}
type testHelper struct {
@ -831,3 +836,122 @@ func TestGenerateWithIncompleteStorage(t *testing.T) {
snap.genAbort <- stop
<-stop
}
func incKey(key []byte) []byte {
for i := len(key) - 1; i >= 0; i-- {
key[i]++
if key[i] != 0x0 {
break
}
}
return key
}
func decKey(key []byte) []byte {
for i := len(key) - 1; i >= 0; i-- {
key[i]--
if key[i] != 0xff {
break
}
}
return key
}
func populateDangling(disk ethdb.KeyValueStore) {
populate := func(accountHash common.Hash, keys []string, vals []string) {
for i, key := range keys {
rawdb.WriteStorageSnapshot(disk, accountHash, hashData([]byte(key)), []byte(vals[i]))
}
}
// Dangling storages of the "first" account
populate(common.Hash{}, []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages of the "last" account
populate(common.HexToHash("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages around the account 1
hash := decKey(hashData([]byte("acc-1")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
hash = incKey(hashData([]byte("acc-1")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages around the account 2
hash = decKey(hashData([]byte("acc-2")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
hash = incKey(hashData([]byte("acc-2")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages around the account 3
hash = decKey(hashData([]byte("acc-3")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
hash = incKey(hashData([]byte("acc-3")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages of the random account
populate(randomHash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
populate(randomHash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
populate(randomHash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
}
// Tests that snapshot generation with dangling storages. Dangling storage means
// the storage data is existent while the corresponding account data is missing.
//
// This test will populate some dangling storages to see if they can be cleaned up.
func TestGenerateCompleteSnapshotWithDanglingStorage(t *testing.T) {
var helper = newHelper()
stRoot := helper.makeStorageTrie([]string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
helper.addAccount("acc-1", &Account{Balance: big.NewInt(1), Root: stRoot, CodeHash: emptyCode.Bytes()})
helper.addAccount("acc-2", &Account{Balance: big.NewInt(1), Root: emptyRoot.Bytes(), CodeHash: emptyCode.Bytes()})
helper.addAccount("acc-3", &Account{Balance: big.NewInt(1), Root: stRoot, CodeHash: emptyCode.Bytes()})
helper.addSnapStorage("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
helper.addSnapStorage("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
populateDangling(helper.diskdb)
root, snap := helper.Generate()
select {
case <-snap.genPending:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
stop := make(chan *generatorStats)
snap.genAbort <- stop
<-stop
}
// Tests that snapshot generation with dangling storages. Dangling storage means
// the storage data is existent while the corresponding account data is missing.
//
// This test will populate some dangling storages to see if they can be cleaned up.
func TestGenerateBrokenSnapshotWithDanglingStorage(t *testing.T) {
var helper = newHelper()
stRoot := helper.makeStorageTrie([]string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
helper.addTrieAccount("acc-1", &Account{Balance: big.NewInt(1), Root: stRoot, CodeHash: emptyCode.Bytes()})
helper.addTrieAccount("acc-2", &Account{Balance: big.NewInt(2), Root: emptyRoot.Bytes(), CodeHash: emptyCode.Bytes()})
helper.addTrieAccount("acc-3", &Account{Balance: big.NewInt(3), Root: stRoot, CodeHash: emptyCode.Bytes()})
populateDangling(helper.diskdb)
root, snap := helper.Generate()
select {
case <-snap.genPending:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
stop := make(chan *generatorStats)
snap.genAbort <- stop
<-stop
}

View File

@ -0,0 +1,97 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>
package snapshot
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
)
// holdableIterator is a wrapper of underlying database iterator. It extends
// the basic iterator interface by adding Hold which can hold the element
// locally where the iterator is currently located and serve it up next time.
type holdableIterator struct {
it ethdb.Iterator
key []byte
val []byte
atHeld bool
}
// newHoldableIterator initializes the holdableIterator with the given iterator.
func newHoldableIterator(it ethdb.Iterator) *holdableIterator {
return &holdableIterator{it: it}
}
// Hold holds the element locally where the iterator is currently located which
// can be served up next time.
func (it *holdableIterator) Hold() {
if it.it.Key() == nil {
return // nothing to hold
}
it.key = common.CopyBytes(it.it.Key())
it.val = common.CopyBytes(it.it.Value())
it.atHeld = false
}
// Next moves the iterator to the next key/value pair. It returns whether the
// iterator is exhausted.
func (it *holdableIterator) Next() bool {
if !it.atHeld && it.key != nil {
it.atHeld = true
} else if it.atHeld {
it.atHeld = false
it.key = nil
it.val = nil
}
if it.key != nil {
return true // shifted to locally held value
}
return it.it.Next()
}
// Error returns any accumulated error. Exhausting all the key/value pairs
// is not considered to be an error.
func (it *holdableIterator) Error() error { return it.it.Error() }
// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error.
func (it *holdableIterator) Release() {
it.atHeld = false
it.key = nil
it.val = nil
it.it.Release()
}
// Key returns the key of the current key/value pair, or nil if done. The caller
// should not modify the contents of the returned slice, and its contents may
// change on the next call to Next.
func (it *holdableIterator) Key() []byte {
if it.key != nil {
return it.key
}
return it.it.Key()
}
// Value returns the value of the current key/value pair, or nil if done. The
// caller should not modify the contents of the returned slice, and its contents
// may change on the next call to Next.
func (it *holdableIterator) Value() []byte {
if it.val != nil {
return it.val
}
return it.it.Value()
}

View File

@ -0,0 +1,163 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>
package snapshot
import (
"bytes"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
)
func TestIteratorHold(t *testing.T) {
// Create the key-value data store
var (
content = map[string]string{"k1": "v1", "k2": "v2", "k3": "v3"}
order = []string{"k1", "k2", "k3"}
db = rawdb.NewMemoryDatabase()
)
for key, val := range content {
if err := db.Put([]byte(key), []byte(val)); err != nil {
t.Fatalf("failed to insert item %s:%s into database: %v", key, val, err)
}
}
// Iterate over the database with the given configs and verify the results
it, idx := newHoldableIterator(db.NewIterator(nil, nil)), 0
// Nothing should be affected for calling Discard on non-initialized iterator
it.Hold()
for it.Next() {
if len(content) <= idx {
t.Errorf("more items than expected: checking idx=%d (key %q), expecting len=%d", idx, it.Key(), len(order))
break
}
if !bytes.Equal(it.Key(), []byte(order[idx])) {
t.Errorf("item %d: key mismatch: have %s, want %s", idx, string(it.Key()), order[idx])
}
if !bytes.Equal(it.Value(), []byte(content[order[idx]])) {
t.Errorf("item %d: value mismatch: have %s, want %s", idx, string(it.Value()), content[order[idx]])
}
// Should be safe to call discard multiple times
it.Hold()
it.Hold()
// Shift iterator to the discarded element
it.Next()
if !bytes.Equal(it.Key(), []byte(order[idx])) {
t.Errorf("item %d: key mismatch: have %s, want %s", idx, string(it.Key()), order[idx])
}
if !bytes.Equal(it.Value(), []byte(content[order[idx]])) {
t.Errorf("item %d: value mismatch: have %s, want %s", idx, string(it.Value()), content[order[idx]])
}
// Discard/Next combo should work always
it.Hold()
it.Next()
if !bytes.Equal(it.Key(), []byte(order[idx])) {
t.Errorf("item %d: key mismatch: have %s, want %s", idx, string(it.Key()), order[idx])
}
if !bytes.Equal(it.Value(), []byte(content[order[idx]])) {
t.Errorf("item %d: value mismatch: have %s, want %s", idx, string(it.Value()), content[order[idx]])
}
idx++
}
if err := it.Error(); err != nil {
t.Errorf("iteration failed: %v", err)
}
if idx != len(order) {
t.Errorf("iteration terminated prematurely: have %d, want %d", idx, len(order))
}
db.Close()
}
func TestReopenIterator(t *testing.T) {
var (
content = map[common.Hash]string{
common.HexToHash("a1"): "v1",
common.HexToHash("a2"): "v2",
common.HexToHash("a3"): "v3",
common.HexToHash("a4"): "v4",
common.HexToHash("a5"): "v5",
common.HexToHash("a6"): "v6",
}
order = []common.Hash{
common.HexToHash("a1"),
common.HexToHash("a2"),
common.HexToHash("a3"),
common.HexToHash("a4"),
common.HexToHash("a5"),
common.HexToHash("a6"),
}
db = rawdb.NewMemoryDatabase()
)
for key, val := range content {
rawdb.WriteAccountSnapshot(db, key, []byte(val))
}
checkVal := func(it *holdableIterator, index int) {
if !bytes.Equal(it.Key(), append(rawdb.SnapshotAccountPrefix, order[index].Bytes()...)) {
t.Fatalf("Unexpected data entry key, want %v got %v", order[index], it.Key())
}
if !bytes.Equal(it.Value(), []byte(content[order[index]])) {
t.Fatalf("Unexpected data entry key, want %v got %v", []byte(content[order[index]]), it.Value())
}
}
// Iterate over the database with the given configs and verify the results
ctx, idx := newGeneratorContext(&generatorStats{}, db, nil, nil), -1
idx++
ctx.account.Next()
checkVal(ctx.account, idx)
ctx.reopenIterator(snapAccount)
idx++
ctx.account.Next()
checkVal(ctx.account, idx)
// reopen twice
ctx.reopenIterator(snapAccount)
ctx.reopenIterator(snapAccount)
idx++
ctx.account.Next()
checkVal(ctx.account, idx)
// reopen iterator with held value
ctx.account.Next()
ctx.account.Hold()
ctx.reopenIterator(snapAccount)
idx++
ctx.account.Next()
checkVal(ctx.account, idx)
// reopen twice iterator with held value
ctx.account.Next()
ctx.account.Hold()
ctx.reopenIterator(snapAccount)
ctx.reopenIterator(snapAccount)
idx++
ctx.account.Next()
checkVal(ctx.account, idx)
// shift to the end and reopen
ctx.account.Next() // the end
ctx.reopenIterator(snapAccount)
ctx.account.Next()
if ctx.account.Key() != nil {
t.Fatal("Unexpected iterated entry")
}
}

View File

@ -345,78 +345,3 @@ func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
log.Debug("Journalled diff layer", "root", dl.root, "parent", dl.parent.Root())
return base, nil
}
// CheckJournalStorage performs consistency-checks on the journalled
// difflayers.
func CheckJournalStorage(db ethdb.KeyValueStore) error {
journal := rawdb.ReadSnapshotJournal(db)
if len(journal) == 0 {
log.Warn("Loaded snapshot journal", "diffs", "missing")
return nil
}
r := rlp.NewStream(bytes.NewReader(journal), 0)
// Firstly, resolve the first element as the journal version
version, err := r.Uint()
if err != nil {
log.Warn("Failed to resolve the journal version", "error", err)
return nil
}
if version != journalVersion {
log.Warn("Discarded the snapshot journal with wrong version", "required", journalVersion, "got", version)
return nil
}
// Secondly, resolve the disk layer root, ensure it's continuous
// with disk layer. Note now we can ensure it's the snapshot journal
// correct version, so we expect everything can be resolved properly.
var root common.Hash
if err := r.Decode(&root); err != nil {
return errors.New("missing disk layer root")
}
// The diff journal is not matched with disk, discard them.
// It can happen that Geth crashes without persisting the latest
// diff journal.
// Load all the snapshot diffs from the journal
return checkDanglingJournalStorage(r)
}
// loadDiffLayer reads the next sections of a snapshot journal, reconstructing a new
// diff and verifying that it can be linked to the requested parent.
func checkDanglingJournalStorage(r *rlp.Stream) error {
for {
// Read the next diff journal entry
var root common.Hash
if err := r.Decode(&root); err != nil {
// The first read may fail with EOF, marking the end of the journal
if err == io.EOF {
return nil
}
return fmt.Errorf("load diff root: %v", err)
}
var destructs []journalDestruct
if err := r.Decode(&destructs); err != nil {
return fmt.Errorf("load diff destructs: %v", err)
}
var accounts []journalAccount
if err := r.Decode(&accounts); err != nil {
return fmt.Errorf("load diff accounts: %v", err)
}
accountData := make(map[common.Hash][]byte)
for _, entry := range accounts {
if len(entry.Blob) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that
accountData[entry.Hash] = entry.Blob
} else {
accountData[entry.Hash] = nil
}
}
var storage []journalStorage
if err := r.Decode(&storage); err != nil {
return fmt.Errorf("load diff storage: %v", err)
}
for _, entry := range storage {
if _, ok := accountData[entry.Hash]; !ok {
log.Error("Dangling storage - missing account", "account", fmt.Sprintf("%#x", entry.Hash), "root", root)
return fmt.Errorf("dangling journal snapshot storage account %#x", entry.Hash)
}
}
}
}

View File

@ -0,0 +1,53 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>
package snapshot
import "github.com/ethereum/go-ethereum/metrics"
// Metrics in generation
var (
snapGeneratedAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/generated", nil)
snapRecoveredAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/recovered", nil)
snapWipedAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/wiped", nil)
snapMissallAccountMeter = metrics.NewRegisteredMeter("state/snapshot/generation/account/missall", nil)
snapGeneratedStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/generated", nil)
snapRecoveredStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/recovered", nil)
snapWipedStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/wiped", nil)
snapMissallStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/missall", nil)
snapDanglingStorageMeter = metrics.NewRegisteredMeter("state/snapshot/generation/storage/dangling", nil)
snapSuccessfulRangeProofMeter = metrics.NewRegisteredMeter("state/snapshot/generation/proof/success", nil)
snapFailedRangeProofMeter = metrics.NewRegisteredMeter("state/snapshot/generation/proof/failure", nil)
// snapAccountProveCounter measures time spent on the account proving
snapAccountProveCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/prove", nil)
// snapAccountTrieReadCounter measures time spent on the account trie iteration
snapAccountTrieReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/trieread", nil)
// snapAccountSnapReadCounter measues time spent on the snapshot account iteration
snapAccountSnapReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/snapread", nil)
// snapAccountWriteCounter measures time spent on writing/updating/deleting accounts
snapAccountWriteCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/account/write", nil)
// snapStorageProveCounter measures time spent on storage proving
snapStorageProveCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/prove", nil)
// snapStorageTrieReadCounter measures time spent on the storage trie iteration
snapStorageTrieReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/trieread", nil)
// snapStorageSnapReadCounter measures time spent on the snapshot storage iteration
snapStorageSnapReadCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/snapread", nil)
// snapStorageWriteCounter measures time spent on writing/updating storages
snapStorageWriteCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/write", nil)
// snapStorageCleanCounter measures time spent on deleting storages
snapStorageCleanCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/clean", nil)
)

View File

@ -1,91 +0,0 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snapshot
import (
"bytes"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
// wipeKeyRange deletes a range of keys from the database starting with prefix
// and having a specific total key length. The start and limit is optional for
// specifying a particular key range for deletion.
//
// Origin is included for wiping and limit is excluded if they are specified.
func wipeKeyRange(db ethdb.KeyValueStore, kind string, prefix []byte, origin []byte, limit []byte, keylen int, meter metrics.Meter, report bool) error {
// Batch deletions together to avoid holding an iterator for too long
var (
batch = db.NewBatch()
items int
)
// Iterate over the key-range and delete all of them
start, logged := time.Now(), time.Now()
it := db.NewIterator(prefix, origin)
var stop []byte
if limit != nil {
stop = append(prefix, limit...)
}
for it.Next() {
// Skip any keys with the correct prefix but wrong length (trie nodes)
key := it.Key()
if !bytes.HasPrefix(key, prefix) {
break
}
if len(key) != keylen {
continue
}
if stop != nil && bytes.Compare(key, stop) >= 0 {
break
}
// Delete the key and periodically recreate the batch and iterator
batch.Delete(key)
items++
if items%10000 == 0 {
// Batch too large (or iterator too long lived, flush and recreate)
it.Release()
if err := batch.Write(); err != nil {
return err
}
batch.Reset()
seekPos := key[len(prefix):]
it = db.NewIterator(prefix, seekPos)
if time.Since(logged) > 8*time.Second && report {
log.Info("Deleting state snapshot leftovers", "kind", kind, "wiped", items, "elapsed", common.PrettyDuration(time.Since(start)))
logged = time.Now()
}
}
}
it.Release()
if err := batch.Write(); err != nil {
return err
}
if meter != nil {
meter.Mark(int64(items))
}
if report {
log.Info("Deleted state snapshot leftovers", "kind", kind, "wiped", items, "elapsed", common.PrettyDuration(time.Since(start)))
}
return nil
}

View File

@ -1,79 +0,0 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snapshot
import (
"math/rand"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
)
// Tests that given a database with random data content, all parts of a snapshot
// can be crrectly wiped without touching anything else.
func TestWipe(t *testing.T) {
// Create a database with some random snapshot data
db := memorydb.New()
for i := 0; i < 128; i++ {
rawdb.WriteAccountSnapshot(db, randomHash(), randomHash().Bytes())
}
// Add some random non-snapshot data too to make wiping harder
for i := 0; i < 500; i++ {
// Generate keys with wrong length for a state snapshot item
keysuffix := make([]byte, 31)
rand.Read(keysuffix)
db.Put(append(rawdb.SnapshotAccountPrefix, keysuffix...), randomHash().Bytes())
keysuffix = make([]byte, 33)
rand.Read(keysuffix)
db.Put(append(rawdb.SnapshotAccountPrefix, keysuffix...), randomHash().Bytes())
}
count := func() (items int) {
it := db.NewIterator(rawdb.SnapshotAccountPrefix, nil)
defer it.Release()
for it.Next() {
if len(it.Key()) == len(rawdb.SnapshotAccountPrefix)+common.HashLength {
items++
}
}
return items
}
// Sanity check that all the keys are present
if items := count(); items != 128 {
t.Fatalf("snapshot size mismatch: have %d, want %d", items, 128)
}
// Wipe the accounts
if err := wipeKeyRange(db, "accounts", rawdb.SnapshotAccountPrefix, nil, nil,
len(rawdb.SnapshotAccountPrefix)+common.HashLength, snapWipedAccountMeter, true); err != nil {
t.Fatal(err)
}
// Iterate over the database end ensure no snapshot information remains
if items := count(); items != 0 {
t.Fatalf("snapshot size mismatch: have %d, want %d", items, 0)
}
// Iterate over the database and ensure miscellaneous items are present
items := 0
it := db.NewIterator(nil, nil)
defer it.Release()
for it.Next() {
items++
}
if items != 1000 {
t.Fatalf("misc item count mismatch: have %d, want %d", items, 1000)
}
}

View File

@ -169,6 +169,7 @@ func (db *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
values = append(values, db.db[key])
}
return &iterator{
index: -1,
keys: keys,
values: values,
}
@ -279,7 +280,7 @@ func (b *batch) Replay(w ethdb.KeyValueWriter) error {
// value store. Internally it is a deep copy of the entire iterated state,
// sorted by keys.
type iterator struct {
inited bool
index int
keys []string
values [][]byte
}
@ -287,17 +288,12 @@ type iterator struct {
// Next moves the iterator to the next key/value pair. It returns whether the
// iterator is exhausted.
func (it *iterator) Next() bool {
// If the iterator was not yet initialized, do it now
if !it.inited {
it.inited = true
return len(it.keys) > 0
// Short circuit if iterator is already exhausted in the forward direction.
if it.index >= len(it.keys) {
return false
}
// Iterator already initialize, advance it
if len(it.keys) > 0 {
it.keys = it.keys[1:]
it.values = it.values[1:]
}
return len(it.keys) > 0
it.index += 1
return it.index < len(it.keys)
}
// Error returns any accumulated error. Exhausting all the key/value pairs
@ -310,26 +306,28 @@ func (it *iterator) Error() error {
// should not modify the contents of the returned slice, and its contents may
// change on the next call to Next.
func (it *iterator) Key() []byte {
if len(it.keys) > 0 {
return []byte(it.keys[0])
}
// Short circuit if iterator is not in a valid position
if it.index < 0 || it.index >= len(it.keys) {
return nil
}
return []byte(it.keys[it.index])
}
// Value returns the value of the current key/value pair, or nil if done. The
// caller should not modify the contents of the returned slice, and its contents
// may change on the next call to Next.
func (it *iterator) Value() []byte {
if len(it.values) > 0 {
return it.values[0]
}
// Short circuit if iterator is not in a valid position
if it.index < 0 || it.index >= len(it.keys) {
return nil
}
return it.values[it.index]
}
// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error.
func (it *iterator) Release() {
it.keys, it.values = nil, nil
it.index, it.keys, it.values = -1, nil, nil
}
// snapshot wraps a batch of key-value entries deep copied from the in-memory

View File

@ -129,17 +129,15 @@ func (r *reporter) send() error {
switch metric := i.(type) {
case metrics.Counter:
v := metric.Count()
l := r.cache[name]
count := metric.Count()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.count", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"value": v - l,
"value": count,
},
Time: now,
})
r.cache[name] = v
case metrics.Gauge:
ms := metric.Snapshot()
pts = append(pts, client.Point{