forked from cerc-io/plugeth
snapshot: iteration and buffering optimizations
This commit is contained in:
parent
d7d81d7c12
commit
cdf3f016df
@ -227,9 +227,6 @@ func (dl *diffLayer) flatten() snapshot {
|
|||||||
// This is meant to be used during shutdown to persist the snapshot without
|
// This is meant to be used during shutdown to persist the snapshot without
|
||||||
// flattening everything down (bad for reorgs).
|
// flattening everything down (bad for reorgs).
|
||||||
func (dl *diffLayer) Journal() error {
|
func (dl *diffLayer) Journal() error {
|
||||||
dl.lock.RLock()
|
|
||||||
defer dl.lock.RUnlock()
|
|
||||||
|
|
||||||
writer, err := dl.journal()
|
writer, err := dl.journal()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
package snapshot
|
package snapshot
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
@ -105,12 +106,22 @@ func (dl *diffLayer) journal() (io.WriteCloser, error) {
|
|||||||
}
|
}
|
||||||
writer = file
|
writer = file
|
||||||
}
|
}
|
||||||
|
dl.lock.RLock()
|
||||||
|
defer dl.lock.RUnlock()
|
||||||
|
|
||||||
|
if dl.stale {
|
||||||
|
writer.Close()
|
||||||
|
return nil, ErrSnapshotStale
|
||||||
|
}
|
||||||
|
buf := bufio.NewWriter(writer)
|
||||||
// Everything below was journalled, persist this layer too
|
// Everything below was journalled, persist this layer too
|
||||||
if err := rlp.Encode(writer, dl.number); err != nil {
|
if err := rlp.Encode(buf, dl.number); err != nil {
|
||||||
|
buf.Flush()
|
||||||
writer.Close()
|
writer.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := rlp.Encode(writer, dl.root); err != nil {
|
if err := rlp.Encode(buf, dl.root); err != nil {
|
||||||
|
buf.Flush()
|
||||||
writer.Close()
|
writer.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -118,7 +129,8 @@ func (dl *diffLayer) journal() (io.WriteCloser, error) {
|
|||||||
for hash, blob := range dl.accountData {
|
for hash, blob := range dl.accountData {
|
||||||
accounts = append(accounts, journalAccount{Hash: hash, Blob: blob})
|
accounts = append(accounts, journalAccount{Hash: hash, Blob: blob})
|
||||||
}
|
}
|
||||||
if err := rlp.Encode(writer, accounts); err != nil {
|
if err := rlp.Encode(buf, accounts); err != nil {
|
||||||
|
buf.Flush()
|
||||||
writer.Close()
|
writer.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -132,9 +144,11 @@ func (dl *diffLayer) journal() (io.WriteCloser, error) {
|
|||||||
}
|
}
|
||||||
storage = append(storage, journalStorage{Hash: hash, Keys: keys, Vals: vals})
|
storage = append(storage, journalStorage{Hash: hash, Keys: keys, Vals: vals})
|
||||||
}
|
}
|
||||||
if err := rlp.Encode(writer, storage); err != nil {
|
if err := rlp.Encode(buf, storage); err != nil {
|
||||||
|
buf.Flush()
|
||||||
writer.Close()
|
writer.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
buf.Flush()
|
||||||
return writer, nil
|
return writer, nil
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,8 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"math/big"
|
"math/big"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
@ -340,3 +342,46 @@ func BenchmarkFlatten(b *testing.B) {
|
|||||||
b.StopTimer()
|
b.StopTimer()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This test writes ~324M of diff layers to disk, spread over
|
||||||
|
// - 128 individual layers,
|
||||||
|
// - each with 200 accounts
|
||||||
|
// - containing 200 slots
|
||||||
|
//
|
||||||
|
// BenchmarkJournal-6 1 1471373923 ns/ops
|
||||||
|
// BenchmarkJournal-6 1 1208083335 ns/op // bufio writer
|
||||||
|
func BenchmarkJournal(b *testing.B) {
|
||||||
|
fill := func(parent snapshot, blocknum int) *diffLayer {
|
||||||
|
accounts := make(map[common.Hash][]byte)
|
||||||
|
storage := make(map[common.Hash]map[common.Hash][]byte)
|
||||||
|
|
||||||
|
for i := 0; i < 200; i++ {
|
||||||
|
accountKey := randomHash()
|
||||||
|
accounts[accountKey] = randomAccount()
|
||||||
|
|
||||||
|
accStorage := make(map[common.Hash][]byte)
|
||||||
|
for i := 0; i < 200; i++ {
|
||||||
|
value := make([]byte, 32)
|
||||||
|
rand.Read(value)
|
||||||
|
accStorage[randomHash()] = value
|
||||||
|
|
||||||
|
}
|
||||||
|
storage[accountKey] = accStorage
|
||||||
|
}
|
||||||
|
return newDiffLayer(parent, uint64(blocknum), common.Hash{}, accounts, storage)
|
||||||
|
}
|
||||||
|
|
||||||
|
var layer snapshot
|
||||||
|
layer = &diskLayer{
|
||||||
|
journal: path.Join(os.TempDir(), "difflayer_journal.tmp"),
|
||||||
|
}
|
||||||
|
for i := 1; i < 128; i++ {
|
||||||
|
layer = fill(layer, i)
|
||||||
|
}
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
f, _ := layer.(*diffLayer).journal()
|
||||||
|
f.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -160,13 +160,15 @@ func (st *SnapshotTree) Update(blockRoot common.Hash, parentRoot common.Hash, ac
|
|||||||
// are flattened downwards.
|
// are flattened downwards.
|
||||||
func (st *SnapshotTree) Cap(blockRoot common.Hash, layers int, memory uint64) error {
|
func (st *SnapshotTree) Cap(blockRoot common.Hash, layers int, memory uint64) error {
|
||||||
// Retrieve the head snapshot to cap from
|
// Retrieve the head snapshot to cap from
|
||||||
snap := st.Snapshot(blockRoot).(snapshot)
|
var snap snapshot
|
||||||
if snap == nil {
|
if s := st.Snapshot(blockRoot); s == nil {
|
||||||
return fmt.Errorf("snapshot [%#x] missing", blockRoot)
|
return fmt.Errorf("snapshot [%#x] missing", blockRoot)
|
||||||
|
} else {
|
||||||
|
snap = s.(snapshot)
|
||||||
}
|
}
|
||||||
diff, ok := snap.(*diffLayer)
|
diff, ok := snap.(*diffLayer)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("snapshot [%#x] is base layer", blockRoot)
|
return fmt.Errorf("snapshot [%#x] is disk layer", blockRoot)
|
||||||
}
|
}
|
||||||
// Run the internal capping and discard all stale layers
|
// Run the internal capping and discard all stale layers
|
||||||
st.lock.Lock()
|
st.lock.Lock()
|
||||||
@ -228,13 +230,14 @@ func (st *SnapshotTree) Cap(blockRoot common.Hash, layers int, memory uint64) er
|
|||||||
// block numbers for the disk layer and first diff layer are returned for GC.
|
// block numbers for the disk layer and first diff layer are returned for GC.
|
||||||
func (st *SnapshotTree) cap(diff *diffLayer, layers int, memory uint64) (uint64, uint64) {
|
func (st *SnapshotTree) cap(diff *diffLayer, layers int, memory uint64) (uint64, uint64) {
|
||||||
// Dive until we run out of layers or reach the persistent database
|
// Dive until we run out of layers or reach the persistent database
|
||||||
if layers > 2 {
|
for ; layers > 2; layers-- {
|
||||||
// If we still have diff layers below, recurse
|
// If we still have diff layers below, continue down
|
||||||
if parent, ok := diff.parent.(*diffLayer); ok {
|
if parent, ok := diff.parent.(*diffLayer); ok {
|
||||||
return st.cap(parent, layers-1, memory)
|
diff = parent
|
||||||
|
} else {
|
||||||
|
// Diff stack too shallow, return block numbers without modifications
|
||||||
|
return diff.parent.(*diskLayer).number, diff.number
|
||||||
}
|
}
|
||||||
// Diff stack too shallow, return block numbers without modifications
|
|
||||||
return diff.parent.(*diskLayer).number, diff.number
|
|
||||||
}
|
}
|
||||||
// We're out of layers, flatten anything below, stopping if it's the disk or if
|
// We're out of layers, flatten anything below, stopping if it's the disk or if
|
||||||
// the memory limit is not yet exceeded.
|
// the memory limit is not yet exceeded.
|
||||||
@ -356,9 +359,11 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
|
|||||||
// flattening everything down (bad for reorgs).
|
// flattening everything down (bad for reorgs).
|
||||||
func (st *SnapshotTree) Journal(blockRoot common.Hash) error {
|
func (st *SnapshotTree) Journal(blockRoot common.Hash) error {
|
||||||
// Retrieve the head snapshot to journal from
|
// Retrieve the head snapshot to journal from
|
||||||
snap := st.Snapshot(blockRoot).(snapshot)
|
var snap snapshot
|
||||||
if snap == nil {
|
if s := st.Snapshot(blockRoot); s == nil {
|
||||||
return fmt.Errorf("snapshot [%#x] missing", blockRoot)
|
return fmt.Errorf("snapshot [%#x] missing", blockRoot)
|
||||||
|
} else {
|
||||||
|
snap = s.(snapshot)
|
||||||
}
|
}
|
||||||
// Run the journaling
|
// Run the journaling
|
||||||
st.lock.Lock()
|
st.lock.Lock()
|
||||||
|
@ -205,6 +205,15 @@ func TestDiffLayerExternalInvalidationPartialFlatten(t *testing.T) {
|
|||||||
}
|
}
|
||||||
ref := snaps.Snapshot(common.HexToHash("0x02"))
|
ref := snaps.Snapshot(common.HexToHash("0x02"))
|
||||||
|
|
||||||
|
// Doing a Cap operation with many allowed layers should be a no-op
|
||||||
|
exp := len(snaps.layers)
|
||||||
|
if err := snaps.Cap(common.HexToHash("0x04"), 2000, 1024*1024); err != nil {
|
||||||
|
t.Fatalf("failed to flatten diff layer into accumulator: %v", err)
|
||||||
|
}
|
||||||
|
if got := len(snaps.layers); got != exp {
|
||||||
|
t.Errorf("layers modified, got %d exp %d", got, exp)
|
||||||
|
}
|
||||||
|
|
||||||
// Flatten the diff layer into the bottom accumulator
|
// Flatten the diff layer into the bottom accumulator
|
||||||
if err := snaps.Cap(common.HexToHash("0x04"), 2, 1024*1024); err != nil {
|
if err := snaps.Cap(common.HexToHash("0x04"), 2, 1024*1024); err != nil {
|
||||||
t.Fatalf("failed to flatten diff layer into accumulator: %v", err)
|
t.Fatalf("failed to flatten diff layer into accumulator: %v", err)
|
||||||
@ -277,6 +286,10 @@ func TestPostCapBasicDataAccess(t *testing.T) {
|
|||||||
if err := checkExist(snap, "0xb3"); err != nil {
|
if err := checkExist(snap, "0xb3"); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
// Cap to a bad root should fail
|
||||||
|
if err := snaps.Cap(common.HexToHash("0x1337"), 0, 1024); err == nil {
|
||||||
|
t.Errorf("expected error, got none")
|
||||||
|
}
|
||||||
// Now, merge the a-chain
|
// Now, merge the a-chain
|
||||||
snaps.Cap(common.HexToHash("0xa3"), 0, 1024)
|
snaps.Cap(common.HexToHash("0xa3"), 0, 1024)
|
||||||
|
|
||||||
@ -300,4 +313,9 @@ func TestPostCapBasicDataAccess(t *testing.T) {
|
|||||||
if err := shouldErr(snap, "0xa3"); err != nil {
|
if err := shouldErr(snap, "0xa3"); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
// Now, merge it again, just for fun. It should now error, since a3
|
||||||
|
// is a disk layer
|
||||||
|
if err := snaps.Cap(common.HexToHash("0xa3"), 0, 1024); err == nil {
|
||||||
|
t.Error("expected error capping the disk layer, got none")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user