feat(store/v2): remove the pruning manager (#19411)
Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
This commit is contained in:
parent
92eb6de6e3
commit
8fb9ca87b2
@ -14,14 +14,14 @@ import (
|
||||
|
||||
func TestCommitterSuite(t *testing.T) {
|
||||
s := &commitment.CommitStoreTestSuite{
|
||||
NewStore: func(db store.RawDB, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) {
|
||||
NewStore: func(db store.RawDB, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*commitment.CommitStore, error) {
|
||||
multiTrees := make(map[string]commitment.Tree)
|
||||
cfg := DefaultConfig()
|
||||
for _, storeKey := range storeKeys {
|
||||
prefixDB := dbm.NewPrefixDB(db, []byte(storeKey))
|
||||
multiTrees[storeKey] = NewIavlTree(prefixDB, logger, cfg)
|
||||
}
|
||||
return commitment.NewCommitStore(multiTrees, db, logger)
|
||||
return commitment.NewCommitStore(multiTrees, db, pruneOpts, logger)
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@ -36,14 +36,22 @@ type CommitStore struct {
|
||||
logger log.Logger
|
||||
db store.RawDB
|
||||
multiTrees map[string]Tree
|
||||
|
||||
// pruneOptions is the pruning configuration.
|
||||
pruneOptions *store.PruneOptions
|
||||
}
|
||||
|
||||
// NewCommitStore creates a new CommitStore instance.
|
||||
func NewCommitStore(multiTrees map[string]Tree, db store.RawDB, logger log.Logger) (*CommitStore, error) {
|
||||
func NewCommitStore(multiTrees map[string]Tree, db store.RawDB, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error) {
|
||||
if pruneOpts == nil {
|
||||
pruneOpts = store.DefaultPruneOptions()
|
||||
}
|
||||
|
||||
return &CommitStore{
|
||||
logger: logger,
|
||||
db: db,
|
||||
multiTrees: multiTrees,
|
||||
logger: logger,
|
||||
db: db,
|
||||
multiTrees: multiTrees,
|
||||
pruneOptions: pruneOpts,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -216,6 +224,13 @@ func (c *CommitStore) Commit(version uint64) (*proof.CommitInfo, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Prune the old versions.
|
||||
if prune, pruneVersion := c.pruneOptions.ShouldPrune(version); prune {
|
||||
if err := c.Prune(pruneVersion); err != nil {
|
||||
c.logger.Info("failed to prune SC", "prune_version", pruneVersion, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
return cInfo, nil
|
||||
}
|
||||
|
||||
|
||||
@ -23,12 +23,12 @@ const (
|
||||
type CommitStoreTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
NewStore func(db store.RawDB, storeKeys []string, logger log.Logger) (*CommitStore, error)
|
||||
NewStore func(db store.RawDB, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error)
|
||||
}
|
||||
|
||||
func (s *CommitStoreTestSuite) TestSnapshotter() {
|
||||
func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
|
||||
storeKeys := []string{storeKey1, storeKey2}
|
||||
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
|
||||
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger())
|
||||
s.Require().NoError(err)
|
||||
|
||||
latestVersion := uint64(10)
|
||||
@ -62,7 +62,7 @@ func (s *CommitStoreTestSuite) TestSnapshotter() {
|
||||
},
|
||||
}
|
||||
|
||||
targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
|
||||
targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger())
|
||||
s.Require().NoError(err)
|
||||
|
||||
chunks := make(chan io.ReadCloser, kvCount*int(latestVersion))
|
||||
@ -118,3 +118,42 @@ func (s *CommitStoreTestSuite) TestSnapshotter() {
|
||||
s.Require().True(matched)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *CommitStoreTestSuite) TestStore_Pruning() {
|
||||
storeKeys := []string{storeKey1, storeKey2}
|
||||
pruneOpts := &store.PruneOptions{
|
||||
KeepRecent: 10,
|
||||
Interval: 5,
|
||||
}
|
||||
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, pruneOpts, log.NewNopLogger())
|
||||
s.Require().NoError(err)
|
||||
|
||||
latestVersion := uint64(100)
|
||||
kvCount := 10
|
||||
for i := uint64(1); i <= latestVersion; i++ {
|
||||
kvPairs := make(map[string]store.KVPairs)
|
||||
for _, storeKey := range storeKeys {
|
||||
kvPairs[storeKey] = store.KVPairs{}
|
||||
for j := 0; j < kvCount; j++ {
|
||||
key := []byte(fmt.Sprintf("key-%d-%d", i, j))
|
||||
value := []byte(fmt.Sprintf("value-%d-%d", i, j))
|
||||
kvPairs[storeKey] = append(kvPairs[storeKey], store.KVPair{Key: key, Value: value})
|
||||
}
|
||||
}
|
||||
s.Require().NoError(commitStore.WriteBatch(store.NewChangesetWithPairs(kvPairs)))
|
||||
|
||||
_, err = commitStore.Commit(i)
|
||||
s.Require().NoError(err)
|
||||
}
|
||||
|
||||
pruneVersion := latestVersion - pruneOpts.KeepRecent - 1
|
||||
// check the store
|
||||
for i := uint64(1); i <= latestVersion; i++ {
|
||||
commitInfo, _ := commitStore.GetCommitInfo(i)
|
||||
if i <= pruneVersion {
|
||||
s.Require().Nil(commitInfo)
|
||||
} else {
|
||||
s.Require().NotNil(commitInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -148,10 +148,3 @@ type RawDB interface {
|
||||
// This will does the same thing as NewBatch if the batch implementation doesn't support pre-allocation.
|
||||
NewBatchWithSize(int) RawBatch
|
||||
}
|
||||
|
||||
type (
|
||||
// Options defines the interface of a database options.
|
||||
Options interface {
|
||||
Get(string) interface{}
|
||||
}
|
||||
)
|
||||
|
||||
@ -25,7 +25,7 @@ type GoLevelDB struct {
|
||||
|
||||
var _ store.RawDB = (*GoLevelDB)(nil)
|
||||
|
||||
func NewGoLevelDB(name, dir string, opts store.Options) (*GoLevelDB, error) {
|
||||
func NewGoLevelDB(name, dir string, opts store.DBOptions) (*GoLevelDB, error) {
|
||||
defaultOpts := &opt.Options{
|
||||
Filter: filter.NewBloomFilter(10), // by default, goleveldb doesn't use a bloom filter.
|
||||
}
|
||||
|
||||
@ -28,7 +28,7 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {
|
||||
multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
|
||||
}
|
||||
|
||||
commitStore, err := commitment.NewCommitStore(multiTrees, db, log.NewNopLogger())
|
||||
commitStore, err := commitment.NewCommitStore(multiTrees, db, nil, log.NewNopLogger())
|
||||
require.NoError(t, err)
|
||||
|
||||
snapshotsStore, err := snapshots.NewStore(db, t.TempDir())
|
||||
@ -38,7 +38,7 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {
|
||||
|
||||
storageDB, err := pebbledb.New(t.TempDir())
|
||||
require.NoError(t, err)
|
||||
newStorageStore := storage.NewStorageStore(storageDB) // for store/v2
|
||||
newStorageStore := storage.NewStorageStore(storageDB, nil, log.NewNopLogger()) // for store/v2
|
||||
|
||||
db1 := dbm.NewMemDB()
|
||||
multiTrees1 := make(map[string]commitment.Tree)
|
||||
@ -47,7 +47,7 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {
|
||||
multiTrees1[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
|
||||
}
|
||||
|
||||
newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, log.NewNopLogger()) // for store/v2
|
||||
newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2
|
||||
require.NoError(t, err)
|
||||
|
||||
return NewManager(snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore
|
||||
|
||||
44
store/options.go
Normal file
44
store/options.go
Normal file
@ -0,0 +1,44 @@
|
||||
package store
|
||||
|
||||
// PruneOptions defines the pruning configuration.
|
||||
type PruneOptions struct {
|
||||
// KeepRecent sets the number of recent versions to keep.
|
||||
KeepRecent uint64
|
||||
|
||||
// Interval sets the number of how often to prune.
|
||||
// If set to 0, no pruning will be done.
|
||||
Interval uint64
|
||||
}
|
||||
|
||||
// DefaultPruneOptions returns the default pruning options.
|
||||
// Interval is set to 0, which means no pruning will be done.
|
||||
func DefaultPruneOptions() *PruneOptions {
|
||||
return &PruneOptions{
|
||||
KeepRecent: 0,
|
||||
Interval: 0,
|
||||
}
|
||||
}
|
||||
|
||||
// ShouldPrune returns true if the given version should be pruned.
|
||||
// If true, it also returns the version to prune up to.
|
||||
// NOTE: The current version is not pruned.
|
||||
func (opts *PruneOptions) ShouldPrune(version uint64) (bool, uint64) {
|
||||
if opts.Interval == 0 {
|
||||
return false, 0
|
||||
}
|
||||
|
||||
if version <= opts.KeepRecent {
|
||||
return false, 0
|
||||
}
|
||||
|
||||
if version%opts.Interval == 0 {
|
||||
return true, version - opts.KeepRecent - 1
|
||||
}
|
||||
|
||||
return false, 0
|
||||
}
|
||||
|
||||
// DBOptions defines the interface of a database options.
|
||||
type DBOptions interface {
|
||||
Get(string) interface{}
|
||||
}
|
||||
@ -1,15 +0,0 @@
|
||||
# Pruning
|
||||
|
||||
## Overview
|
||||
|
||||
Pruning is the mechanism for deleting old versions data from both state storage and commitment. The pruning operation is triggered periodically.
|
||||
|
||||
## Pruning Options
|
||||
|
||||
Generally, there are three configurable parameters for pruning options:
|
||||
|
||||
- `pruning-keep-recent`: the number of recent versions to keep.
|
||||
- `pruning-interval`: the interval between two pruning operations.
|
||||
- `pruning-sync`: the flag to sync/async the pruning operation.
|
||||
|
||||
Different options will be applied to the state storage and commitment. The pruning option have an effect on the snapshot operation, but it will not manage the conflict resolution in SDK, it is the responsibility of the dedicated backend.
|
||||
@ -1,166 +0,0 @@
|
||||
package pruning
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/store/v2"
|
||||
)
|
||||
|
||||
// Manager is an abstraction to handle pruning of SS and SC backends.
|
||||
type Manager struct {
|
||||
mtx sync.Mutex
|
||||
isStarted bool
|
||||
|
||||
stateStorage store.VersionedDatabase
|
||||
stateCommitment store.Committer
|
||||
|
||||
logger log.Logger
|
||||
storageOpts Options
|
||||
commitmentOpts Options
|
||||
|
||||
chStorage chan struct{}
|
||||
chCommitment chan struct{}
|
||||
}
|
||||
|
||||
// NewManager creates a new Manager instance.
|
||||
func NewManager(
|
||||
logger log.Logger,
|
||||
ss store.VersionedDatabase,
|
||||
sc store.Committer,
|
||||
) *Manager {
|
||||
return &Manager{
|
||||
stateStorage: ss,
|
||||
stateCommitment: sc,
|
||||
logger: logger,
|
||||
storageOpts: DefaultOptions(),
|
||||
commitmentOpts: DefaultOptions(),
|
||||
}
|
||||
}
|
||||
|
||||
// SetStorageOptions sets the state storage options.
|
||||
func (m *Manager) SetStorageOptions(opts Options) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
m.storageOpts = opts
|
||||
}
|
||||
|
||||
// SetCommitmentOptions sets the state commitment options.
|
||||
func (m *Manager) SetCommitmentOptions(opts Options) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
m.commitmentOpts = opts
|
||||
}
|
||||
|
||||
// Start starts the manager.
|
||||
func (m *Manager) Start() {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.isStarted {
|
||||
return
|
||||
}
|
||||
m.isStarted = true
|
||||
|
||||
if !m.storageOpts.Sync {
|
||||
m.chStorage = make(chan struct{}, 1)
|
||||
m.chStorage <- struct{}{}
|
||||
}
|
||||
if !m.commitmentOpts.Sync {
|
||||
m.chCommitment = make(chan struct{}, 1)
|
||||
m.chCommitment <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops the manager and waits for all goroutines to finish.
|
||||
func (m *Manager) Stop() {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if !m.isStarted {
|
||||
return
|
||||
}
|
||||
m.isStarted = false
|
||||
|
||||
if !m.storageOpts.Sync {
|
||||
<-m.chStorage
|
||||
close(m.chStorage)
|
||||
}
|
||||
if !m.commitmentOpts.Sync {
|
||||
<-m.chCommitment
|
||||
close(m.chCommitment)
|
||||
}
|
||||
}
|
||||
|
||||
// Prune prunes the state storage and state commitment.
|
||||
// It will check the pruning conditions and prune if necessary.
|
||||
func (m *Manager) Prune(height uint64) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if !m.isStarted {
|
||||
return
|
||||
}
|
||||
|
||||
// storage pruning
|
||||
if m.storageOpts.Interval > 0 && height > m.storageOpts.KeepRecent && height%m.storageOpts.Interval == 0 {
|
||||
pruneHeight := height - m.storageOpts.KeepRecent - 1
|
||||
if m.storageOpts.Sync {
|
||||
m.pruneStorage(pruneHeight)
|
||||
} else {
|
||||
// it will not block if the previous pruning is still running
|
||||
select {
|
||||
case _, stillOpen := <-m.chStorage:
|
||||
if stillOpen {
|
||||
go func() {
|
||||
m.pruneStorage(pruneHeight)
|
||||
m.chStorage <- struct{}{}
|
||||
}()
|
||||
}
|
||||
|
||||
default:
|
||||
m.logger.Debug("storage pruning is still running; skipping", "version", pruneHeight)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// commitment pruning
|
||||
if m.commitmentOpts.Interval > 0 && height > m.commitmentOpts.KeepRecent && height%m.commitmentOpts.Interval == 0 {
|
||||
pruneHeight := height - m.commitmentOpts.KeepRecent - 1
|
||||
if m.commitmentOpts.Sync {
|
||||
m.pruneCommitment(pruneHeight)
|
||||
} else {
|
||||
// it will not block if the previous pruning is still running
|
||||
select {
|
||||
case _, stillOpen := <-m.chCommitment:
|
||||
if stillOpen {
|
||||
go func() {
|
||||
m.pruneCommitment(pruneHeight)
|
||||
m.chCommitment <- struct{}{}
|
||||
}()
|
||||
}
|
||||
|
||||
default:
|
||||
m.logger.Debug("commitment pruning is still running; skipping", "version", pruneHeight)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) pruneStorage(height uint64) {
|
||||
m.logger.Debug("pruning state storage", "height", height)
|
||||
|
||||
if err := m.stateStorage.Prune(height); err != nil {
|
||||
m.logger.Error("failed to prune state storage", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) pruneCommitment(height uint64) {
|
||||
m.logger.Debug("pruning state commitment", "height", height)
|
||||
|
||||
if err := m.stateCommitment.Prune(height); err != nil {
|
||||
m.logger.Error("failed to prune state commitment", "err", err)
|
||||
}
|
||||
}
|
||||
@ -1,105 +0,0 @@
|
||||
package pruning
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/store/v2"
|
||||
"cosmossdk.io/store/v2/commitment"
|
||||
"cosmossdk.io/store/v2/commitment/iavl"
|
||||
dbm "cosmossdk.io/store/v2/db"
|
||||
"cosmossdk.io/store/v2/storage"
|
||||
"cosmossdk.io/store/v2/storage/sqlite"
|
||||
)
|
||||
|
||||
const defaultStoreKey = "default"
|
||||
|
||||
type PruningTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
manager *Manager
|
||||
ss store.VersionedDatabase
|
||||
sc store.Committer
|
||||
}
|
||||
|
||||
func TestPruningTestSuite(t *testing.T) {
|
||||
suite.Run(t, &PruningTestSuite{})
|
||||
}
|
||||
|
||||
func (s *PruningTestSuite) SetupTest() {
|
||||
logger := log.NewNopLogger()
|
||||
if testing.Verbose() {
|
||||
logger = log.NewTestLogger(s.T())
|
||||
}
|
||||
|
||||
sqliteDB, err := sqlite.New(s.T().TempDir())
|
||||
s.Require().NoError(err)
|
||||
ss := storage.NewStorageStore(sqliteDB)
|
||||
|
||||
tree := iavl.NewIavlTree(dbm.NewMemDB(), log.NewNopLogger(), iavl.DefaultConfig())
|
||||
sc, err := commitment.NewCommitStore(map[string]commitment.Tree{"default": tree}, dbm.NewMemDB(), logger)
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.manager = NewManager(logger, ss, sc)
|
||||
s.ss = ss
|
||||
s.sc = sc
|
||||
}
|
||||
|
||||
func (s *PruningTestSuite) TearDownTest() {
|
||||
s.manager.Start()
|
||||
s.manager.Stop()
|
||||
}
|
||||
|
||||
func (s *PruningTestSuite) TestPruning() {
|
||||
s.manager.SetCommitmentOptions(Options{4, 2, true})
|
||||
s.manager.SetStorageOptions(Options{3, 3, true})
|
||||
s.manager.Start()
|
||||
|
||||
latestVersion := uint64(100)
|
||||
|
||||
// write batches
|
||||
for i := uint64(0); i < latestVersion; i++ {
|
||||
version := i + 1
|
||||
|
||||
cs := store.NewChangesetWithPairs(map[string]store.KVPairs{defaultStoreKey: {}})
|
||||
cs.AddKVPair(defaultStoreKey, store.KVPair{
|
||||
Key: []byte("key"),
|
||||
Value: []byte(fmt.Sprintf("value%d", version)),
|
||||
})
|
||||
err := s.sc.WriteBatch(cs)
|
||||
s.Require().NoError(err)
|
||||
|
||||
_, err = s.sc.Commit(version)
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = s.ss.ApplyChangeset(version, cs)
|
||||
s.Require().NoError(err)
|
||||
s.manager.Prune(version)
|
||||
}
|
||||
|
||||
// wait for pruning to finish
|
||||
s.manager.Stop()
|
||||
|
||||
// check the store for the version 96
|
||||
val, err := s.ss.Get(defaultStoreKey, latestVersion-4, []byte("key"))
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal([]byte("value96"), val)
|
||||
|
||||
// check the store for the version 50
|
||||
val, err = s.ss.Get(defaultStoreKey, 50, []byte("key"))
|
||||
s.Require().Error(err)
|
||||
s.Require().Nil(val)
|
||||
|
||||
// check the commitment for the version 96
|
||||
proofOps, err := s.sc.GetProof(defaultStoreKey, latestVersion-4, []byte("key"))
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(proofOps, 2)
|
||||
|
||||
// check the commitment for the version 95
|
||||
proofOps, err = s.sc.GetProof(defaultStoreKey, latestVersion-5, []byte("key"))
|
||||
s.Require().Error(err)
|
||||
s.Require().Nil(proofOps)
|
||||
}
|
||||
@ -1,25 +0,0 @@
|
||||
package pruning
|
||||
|
||||
// Options defines the pruning configuration.
|
||||
type Options struct {
|
||||
// KeepRecent sets the number of recent versions to keep.
|
||||
KeepRecent uint64
|
||||
|
||||
// Interval sets the number of how often to prune.
|
||||
// If set to 0, no pruning will be done.
|
||||
Interval uint64
|
||||
|
||||
// Sync when set to true ensure that pruning will be performed
|
||||
// synchronously, otherwise by default it will be done asynchronously.
|
||||
Sync bool
|
||||
}
|
||||
|
||||
// DefaultOptions returns the default pruning options.
|
||||
// Interval is set to 0, which means no pruning will be done.
|
||||
func DefaultOptions() Options {
|
||||
return Options{
|
||||
KeepRecent: 0,
|
||||
Interval: 0,
|
||||
Sync: false,
|
||||
}
|
||||
}
|
||||
@ -14,7 +14,6 @@ import (
|
||||
"cosmossdk.io/store/v2"
|
||||
"cosmossdk.io/store/v2/metrics"
|
||||
"cosmossdk.io/store/v2/proof"
|
||||
"cosmossdk.io/store/v2/pruning"
|
||||
)
|
||||
|
||||
var _ store.RootStore = (*Store)(nil)
|
||||
@ -42,9 +41,6 @@ type Store struct {
|
||||
// workingHash defines the current (yet to be committed) hash
|
||||
workingHash []byte
|
||||
|
||||
// pruningManager manages pruning of the SS and SC backends
|
||||
pruningManager *pruning.Manager
|
||||
|
||||
// telemetry reflects a telemetry agent responsible for emitting metrics (if any)
|
||||
telemetry metrics.StoreMetrics
|
||||
}
|
||||
@ -53,20 +49,13 @@ func New(
|
||||
logger log.Logger,
|
||||
ss store.VersionedDatabase,
|
||||
sc store.Committer,
|
||||
ssOpts, scOpts pruning.Options,
|
||||
m metrics.StoreMetrics,
|
||||
) (store.RootStore, error) {
|
||||
pruningManager := pruning.NewManager(logger, ss, sc)
|
||||
pruningManager.SetStorageOptions(ssOpts)
|
||||
pruningManager.SetCommitmentOptions(scOpts)
|
||||
pruningManager.Start()
|
||||
|
||||
return &Store{
|
||||
logger: logger.With("module", "root_store"),
|
||||
initialVersion: 1,
|
||||
stateStore: ss,
|
||||
stateCommitment: sc,
|
||||
pruningManager: pruningManager,
|
||||
telemetry: m,
|
||||
}, nil
|
||||
}
|
||||
@ -82,8 +71,6 @@ func (s *Store) Close() (err error) {
|
||||
s.lastCommitInfo = nil
|
||||
s.commitHeader = nil
|
||||
|
||||
s.pruningManager.Stop()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@ -172,7 +159,7 @@ func (s *Store) GetLatestVersion() (uint64, error) {
|
||||
func (s *Store) Query(storeKey string, version uint64, key []byte, prove bool) (store.QueryResult, error) {
|
||||
if s.telemetry != nil {
|
||||
now := time.Now()
|
||||
s.telemetry.MeasureSince(now, "root_store", "query")
|
||||
defer s.telemetry.MeasureSince(now, "root_store", "query")
|
||||
}
|
||||
|
||||
val, err := s.stateStore.Get(storeKey, version, key)
|
||||
@ -214,7 +201,7 @@ func (s *Store) Query(storeKey string, version uint64, key []byte, prove bool) (
|
||||
func (s *Store) LoadLatestVersion() error {
|
||||
if s.telemetry != nil {
|
||||
now := time.Now()
|
||||
s.telemetry.MeasureSince(now, "root_store", "load_latest_version")
|
||||
defer s.telemetry.MeasureSince(now, "root_store", "load_latest_version")
|
||||
}
|
||||
|
||||
lv, err := s.GetLatestVersion()
|
||||
@ -228,7 +215,7 @@ func (s *Store) LoadLatestVersion() error {
|
||||
func (s *Store) LoadVersion(version uint64) error {
|
||||
if s.telemetry != nil {
|
||||
now := time.Now()
|
||||
s.telemetry.MeasureSince(now, "root_store", "load_version")
|
||||
defer s.telemetry.MeasureSince(now, "root_store", "load_version")
|
||||
}
|
||||
|
||||
return s.loadVersion(version)
|
||||
@ -264,7 +251,7 @@ func (s *Store) SetCommitHeader(h *coreheader.Info) {
|
||||
func (s *Store) WorkingHash(cs *store.Changeset) ([]byte, error) {
|
||||
if s.telemetry != nil {
|
||||
now := time.Now()
|
||||
s.telemetry.MeasureSince(now, "root_store", "working_hash")
|
||||
defer s.telemetry.MeasureSince(now, "root_store", "working_hash")
|
||||
}
|
||||
|
||||
if s.workingHash == nil {
|
||||
@ -286,7 +273,7 @@ func (s *Store) WorkingHash(cs *store.Changeset) ([]byte, error) {
|
||||
func (s *Store) Commit(cs *store.Changeset) ([]byte, error) {
|
||||
if s.telemetry != nil {
|
||||
now := time.Now()
|
||||
s.telemetry.MeasureSince(now, "root_store", "commit")
|
||||
defer s.telemetry.MeasureSince(now, "root_store", "commit")
|
||||
}
|
||||
|
||||
if s.workingHash == nil {
|
||||
@ -329,12 +316,27 @@ func (s *Store) Commit(cs *store.Changeset) ([]byte, error) {
|
||||
|
||||
s.workingHash = nil
|
||||
|
||||
// prune SS and SC
|
||||
s.pruningManager.Prune(version)
|
||||
|
||||
return s.lastCommitInfo.Hash(), nil
|
||||
}
|
||||
|
||||
// Prune prunes the root store to the provided version.
|
||||
func (s *Store) Prune(version uint64) error {
|
||||
if s.telemetry != nil {
|
||||
now := time.Now()
|
||||
defer s.telemetry.MeasureSince(now, "root_store", "prune")
|
||||
}
|
||||
|
||||
if err := s.stateStore.Prune(version); err != nil {
|
||||
return fmt.Errorf("failed to prune SS store: %w", err)
|
||||
}
|
||||
|
||||
if err := s.stateCommitment.Prune(version); err != nil {
|
||||
return fmt.Errorf("failed to prune SC store: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeSC accepts a Changeset and writes that as a batch to the underlying SC
|
||||
// tree, which allows us to retrieve the working hash of the SC tree. Finally,
|
||||
// we construct a *CommitInfo and set that as lastCommitInfo. Note, this should
|
||||
|
||||
@ -12,7 +12,6 @@ import (
|
||||
"cosmossdk.io/store/v2/commitment"
|
||||
"cosmossdk.io/store/v2/commitment/iavl"
|
||||
dbm "cosmossdk.io/store/v2/db"
|
||||
"cosmossdk.io/store/v2/pruning"
|
||||
"cosmossdk.io/store/v2/storage"
|
||||
"cosmossdk.io/store/v2/storage/sqlite"
|
||||
)
|
||||
@ -38,15 +37,15 @@ func (s *RootStoreTestSuite) SetupTest() {
|
||||
|
||||
sqliteDB, err := sqlite.New(s.T().TempDir())
|
||||
s.Require().NoError(err)
|
||||
ss := storage.NewStorageStore(sqliteDB)
|
||||
ss := storage.NewStorageStore(sqliteDB, nil, noopLog)
|
||||
|
||||
tree := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig())
|
||||
tree2 := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig())
|
||||
tree3 := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig())
|
||||
sc, err := commitment.NewCommitStore(map[string]commitment.Tree{testStoreKey: tree, testStoreKey2: tree2, testStoreKey3: tree3}, dbm.NewMemDB(), noopLog)
|
||||
sc, err := commitment.NewCommitStore(map[string]commitment.Tree{testStoreKey: tree, testStoreKey2: tree2, testStoreKey3: tree3}, dbm.NewMemDB(), nil, noopLog)
|
||||
s.Require().NoError(err)
|
||||
|
||||
rs, err := New(noopLog, ss, sc, pruning.DefaultOptions(), pruning.DefaultOptions(), nil)
|
||||
rs, err := New(noopLog, ss, sc, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.rootStore = rs
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/store/v2"
|
||||
"cosmossdk.io/store/v2/storage"
|
||||
)
|
||||
@ -19,7 +20,7 @@ func TestStorageTestSuite(t *testing.T) {
|
||||
db.SetSync(false)
|
||||
}
|
||||
|
||||
return storage.NewStorageStore(db), err
|
||||
return storage.NewStorageStore(db, nil, log.NewNopLogger()), err
|
||||
},
|
||||
EmptyBatchSize: 12,
|
||||
}
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/store/v2"
|
||||
"cosmossdk.io/store/v2/storage"
|
||||
)
|
||||
@ -22,7 +23,7 @@ func TestStorageTestSuite(t *testing.T) {
|
||||
s := &storage.StorageTestSuite{
|
||||
NewDB: func(dir string) (store.VersionedDatabase, error) {
|
||||
db, err := New(dir)
|
||||
return storage.NewStorageStore(db), err
|
||||
return storage.NewStorageStore(db, nil, log.NewNopLogger()), err
|
||||
},
|
||||
EmptyBatchSize: 12,
|
||||
}
|
||||
|
||||
@ -8,6 +8,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/store/v2"
|
||||
"cosmossdk.io/store/v2/storage"
|
||||
)
|
||||
@ -20,7 +21,7 @@ func TestStorageTestSuite(t *testing.T) {
|
||||
s := &storage.StorageTestSuite{
|
||||
NewDB: func(dir string) (store.VersionedDatabase, error) {
|
||||
db, err := New(dir)
|
||||
return storage.NewStorageStore(db), err
|
||||
return storage.NewStorageStore(db, nil, log.NewNopLogger()), err
|
||||
},
|
||||
EmptyBatchSize: 0,
|
||||
}
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/store/v2"
|
||||
"cosmossdk.io/store/v2/storage"
|
||||
"cosmossdk.io/store/v2/storage/pebbledb"
|
||||
@ -27,15 +28,15 @@ var (
|
||||
backends = map[string]func(dataDir string) (store.VersionedDatabase, error){
|
||||
"rocksdb_versiondb_opts": func(dataDir string) (store.VersionedDatabase, error) {
|
||||
db, err := rocksdb.New(dataDir)
|
||||
return storage.NewStorageStore(db), err
|
||||
return storage.NewStorageStore(db, nil, log.NewNopLogger()), err
|
||||
},
|
||||
"pebbledb_default_opts": func(dataDir string) (store.VersionedDatabase, error) {
|
||||
db, err := pebbledb.New(dataDir)
|
||||
return storage.NewStorageStore(db), err
|
||||
return storage.NewStorageStore(db, nil, log.NewNopLogger()), err
|
||||
},
|
||||
"btree_sqlite": func(dataDir string) (store.VersionedDatabase, error) {
|
||||
db, err := sqlite.New(dataDir)
|
||||
return storage.NewStorageStore(db), err
|
||||
return storage.NewStorageStore(db, nil, log.NewNopLogger()), err
|
||||
},
|
||||
}
|
||||
rng = rand.New(rand.NewSource(567320))
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
corestore "cosmossdk.io/core/store"
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/store/v2"
|
||||
"cosmossdk.io/store/v2/snapshots"
|
||||
)
|
||||
@ -20,13 +21,23 @@ var (
|
||||
|
||||
// StorageStore is a wrapper around the store.VersionedDatabase interface.
|
||||
type StorageStore struct {
|
||||
db Database
|
||||
logger log.Logger
|
||||
db Database
|
||||
|
||||
// pruneOptions defines the pruning configuration.
|
||||
pruneOptions *store.PruneOptions
|
||||
}
|
||||
|
||||
// NewStorageStore returns a reference to a new StorageStore.
|
||||
func NewStorageStore(db Database) *StorageStore {
|
||||
func NewStorageStore(db Database, pruneOpts *store.PruneOptions, logger log.Logger) *StorageStore {
|
||||
if pruneOpts == nil {
|
||||
pruneOpts = store.DefaultPruneOptions()
|
||||
}
|
||||
|
||||
return &StorageStore{
|
||||
db: db,
|
||||
logger: logger,
|
||||
db: db,
|
||||
pruneOptions: pruneOpts,
|
||||
}
|
||||
}
|
||||
|
||||
@ -61,7 +72,17 @@ func (ss *StorageStore) ApplyChangeset(version uint64, cs *store.Changeset) erro
|
||||
}
|
||||
}
|
||||
|
||||
return b.Write()
|
||||
if err := b.Write(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if prune, pruneVersion := ss.pruneOptions.ShouldPrune(version); prune {
|
||||
if err := ss.Prune(pruneVersion); err != nil {
|
||||
ss.logger.Info("failed to prune SS", "prune_version", pruneVersion, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetLatestVersion returns the latest version of the store.
|
||||
|
||||
@ -68,6 +68,10 @@ type RootStore interface {
|
||||
// LastCommitID returns a CommitID pertaining to the last commitment.
|
||||
LastCommitID() (proof.CommitID, error)
|
||||
|
||||
// Prune prunes the RootStore to the provided version. It is used to remove
|
||||
// old versions of the RootStore by the CLI.
|
||||
Prune(version uint64) error
|
||||
|
||||
// SetMetrics sets the telemetry handler on the RootStore.
|
||||
SetMetrics(m metrics.Metrics)
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user