feat(store/v2): pruning manager (#20430)

This commit is contained in:
cool-developer 2024-05-29 09:53:47 -04:00 committed by GitHub
parent c7dc9156c3
commit b8c8482903
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 436 additions and 134 deletions

View File

@ -8,11 +8,15 @@ import (
"cosmossdk.io/core/log"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
dbm "cosmossdk.io/store/v2/db"
)
var _ commitment.Tree = (*IavlTree)(nil)
var (
_ commitment.Tree = (*IavlTree)(nil)
_ store.PausablePruner = (*IavlTree)(nil)
)
// IavlTree is a wrapper around iavl.MutableTree.
type IavlTree struct {
@ -21,7 +25,7 @@ type IavlTree struct {
// NewIavlTree creates a new IavlTree instance.
func NewIavlTree(db corestore.KVStoreWithBatch, logger log.Logger, cfg *Config) *IavlTree {
tree := iavl.NewMutableTree(dbm.NewWrapper(db), cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger)
tree := iavl.NewMutableTree(dbm.NewWrapper(db), cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger, iavl.AsyncPruningOption(true))
return &IavlTree{
tree: tree,
}
@ -98,6 +102,15 @@ func (t *IavlTree) Prune(version uint64) error {
return t.tree.DeleteVersionsTo(int64(version))
}
// PausePruning pauses the pruning process.
func (t *IavlTree) PausePruning(pause bool) {
if pause {
t.tree.SetCommitting()
} else {
t.tree.UnsetCommitting()
}
}
// Export exports the tree exporter at the given version.
func (t *IavlTree) Export(version uint64) (commitment.Exporter, error) {
tree, err := t.tree.GetImmutable(int64(version))

View File

@ -2,27 +2,27 @@ package iavl
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"cosmossdk.io/core/log"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
dbm "cosmossdk.io/store/v2/db"
)
func TestCommitterSuite(t *testing.T) {
s := &commitment.CommitStoreTestSuite{
NewStore: func(db corestore.KVStoreWithBatch, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*commitment.CommitStore, error) {
NewStore: func(db corestore.KVStoreWithBatch, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) {
multiTrees := make(map[string]commitment.Tree)
cfg := DefaultConfig()
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(db, []byte(storeKey))
multiTrees[storeKey] = NewIavlTree(prefixDB, logger, cfg)
}
return commitment.NewCommitStore(multiTrees, db, pruneOpts, logger)
return commitment.NewCommitStore(multiTrees, db, logger)
},
}
@ -100,8 +100,14 @@ func TestIavlTree(t *testing.T) {
err = tree.Prune(1)
require.NoError(t, err)
require.Equal(t, uint64(3), tree.GetLatestVersion())
err = tree.LoadVersion(1)
require.Error(t, err)
// async pruning check
checkErr := func() bool {
if _, err := tree.tree.LoadVersion(1); err != nil {
return true
}
return false
}
require.Eventually(t, checkErr, 2*time.Second, 100*time.Millisecond)
// load version 2
err = tree.LoadVersion(2)

View File

@ -28,6 +28,7 @@ const (
var (
_ store.Committer = (*CommitStore)(nil)
_ snapshots.CommitSnapshotter = (*CommitStore)(nil)
_ store.PausablePruner = (*CommitStore)(nil)
)
// CommitStore is a wrapper around multiple Tree objects mapped by a unique store
@ -39,26 +40,18 @@ type CommitStore struct {
logger log.Logger
db corestore.KVStoreWithBatch
multiTrees map[string]Tree
// pruneOptions is the pruning configuration.
pruneOptions *store.PruneOptions // TODO are there no default prune options?
}
// NewCommitStore creates a new CommitStore instance.
func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error) {
if pruneOpts == nil {
pruneOpts = store.DefaultPruneOptions()
}
func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, logger log.Logger) (*CommitStore, error) {
return &CommitStore{
logger: logger,
db: db,
multiTrees: trees,
pruneOptions: pruneOpts,
logger: logger,
db: db,
multiTrees: trees,
}, nil
}
func (c *CommitStore) WriteBatch(cs *corestore.Changeset) error {
func (c *CommitStore) WriteChangeset(cs *corestore.Changeset) error {
for _, pairs := range cs.Changes {
key := conv.UnsafeBytesToStr(pairs.Actor)
@ -237,13 +230,6 @@ func (c *CommitStore) Commit(version uint64) (*proof.CommitInfo, error) {
return nil, err
}
// Prune the old versions.
if prune, pruneVersion := c.pruneOptions.ShouldPrune(version); prune {
if err := c.Prune(pruneVersion); err != nil {
c.logger.Info("failed to prune SC", "prune_version", pruneVersion, "err", err)
}
}
return cInfo, nil
}
@ -297,6 +283,7 @@ func (c *CommitStore) Get(storeKey []byte, version uint64, key []byte) ([]byte,
return bz, nil
}
// Prune implements store.Pruner.
func (c *CommitStore) Prune(version uint64) (ferr error) {
// prune the metadata
batch := c.db.NewBatch()
@ -322,6 +309,15 @@ func (c *CommitStore) Prune(version uint64) (ferr error) {
return ferr
}
// PausePruning implements store.PausablePruner.
func (c *CommitStore) PausePruning(pause bool) {
for _, tree := range c.multiTrees {
if pruner, ok := tree.(store.PausablePruner); ok {
pruner.PausePruning(pause)
}
}
}
// Snapshot implements snapshotstypes.CommitSnapshotter.
func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error {
if version == 0 {

View File

@ -25,12 +25,12 @@ const (
type CommitStoreTestSuite struct {
suite.Suite
NewStore func(db corestore.KVStoreWithBatch, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error)
NewStore func(db corestore.KVStoreWithBatch, storeKeys []string, logger log.Logger) (*CommitStore, error)
}
func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
storeKeys := []string{storeKey1, storeKey2}
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger())
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)
latestVersion := uint64(10)
@ -45,7 +45,7 @@ func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
kvPairs[storeKey] = append(kvPairs[storeKey], corestore.KVPair{Key: key, Value: value})
}
}
s.Require().NoError(commitStore.WriteBatch(corestore.NewChangesetWithPairs(kvPairs)))
s.Require().NoError(commitStore.WriteChangeset(corestore.NewChangesetWithPairs(kvPairs)))
_, err = commitStore.Commit(i)
s.Require().NoError(err)
@ -64,7 +64,7 @@ func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
},
}
targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger())
targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)
chunks := make(chan io.ReadCloser, kvCount*int(latestVersion))
@ -129,7 +129,7 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() {
KeepRecent: 10,
Interval: 5,
}
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, pruneOpts, log.NewNopLogger())
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)
latestVersion := uint64(100)
@ -144,10 +144,15 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() {
kvPairs[storeKey] = append(kvPairs[storeKey], corestore.KVPair{Key: key, Value: value})
}
}
s.Require().NoError(commitStore.WriteBatch(corestore.NewChangesetWithPairs(kvPairs)))
s.Require().NoError(commitStore.WriteChangeset(corestore.NewChangesetWithPairs(kvPairs)))
_, err = commitStore.Commit(i)
s.Require().NoError(err)
if prune, pruneVersion := pruneOpts.ShouldPrune(i); prune {
s.Require().NoError(commitStore.Prune(pruneVersion))
}
}
pruneVersion := latestVersion - pruneOpts.KeepRecent - 1

View File

@ -20,11 +20,6 @@ type VersionedDatabase interface {
ApplyChangeset(version uint64, cs *corestore.Changeset) error
// Prune attempts to prune all versions up to and including the provided
// version argument. The operation should be idempotent. An error should be
// returned upon failure.
Prune(version uint64) error
// Close releases associated resources. It should NOT be idempotent. It must
// only be called once and any call after may panic.
io.Closer
@ -32,8 +27,8 @@ type VersionedDatabase interface {
// Committer defines an API for committing state.
type Committer interface {
// WriteBatch writes a batch of key-value pairs to the tree.
WriteBatch(cs *corestore.Changeset) error
// WriteChangeset writes the changeset to the commitment state.
WriteChangeset(cs *corestore.Changeset) error
// WorkingCommitInfo returns the CommitInfo for the working tree.
WorkingCommitInfo(version uint64) *proof.CommitInfo
@ -62,11 +57,6 @@ type Committer interface {
// GetCommitInfo returns the CommitInfo for the given version.
GetCommitInfo(version uint64) (*proof.CommitInfo, error)
// Prune attempts to prune all versions up to and including the provided
// version argument. The operation should be idempotent. An error should be
// returned upon failure.
Prune(version uint64) error
// Close releases associated resources. It should NOT be idempotent. It must
// only be called once and any call after may panic.
io.Closer

View File

@ -8,7 +8,7 @@ require (
cosmossdk.io/log v1.3.1
github.com/cockroachdb/pebble v1.1.0
github.com/cosmos/gogoproto v1.4.12
github.com/cosmos/iavl v1.1.4
github.com/cosmos/iavl v1.2.0
github.com/cosmos/ics23/go v0.10.0
github.com/google/btree v1.1.2
github.com/hashicorp/go-metrics v0.5.3

View File

@ -36,8 +36,8 @@ github.com/cosmos/cosmos-db v1.0.2 h1:hwMjozuY1OlJs/uh6vddqnk9j7VamLv+0DBlbEXbAK
github.com/cosmos/cosmos-db v1.0.2/go.mod h1:Z8IXcFJ9PqKK6BIsVOB3QXtkKoqUOp1vRvPT39kOXEA=
github.com/cosmos/gogoproto v1.4.12 h1:vB6Lbe/rtnYGjQuFxkPiPYiCybqFT8QvLipDZP8JpFE=
github.com/cosmos/gogoproto v1.4.12/go.mod h1:LnZob1bXRdUoqMMtwYlcR3wjiElmlC+FkjaZRv1/eLY=
github.com/cosmos/iavl v1.1.4 h1:Z0cVVjeQqOUp78/nWt/uhQy83vYluWlAMGQ4zbH9G34=
github.com/cosmos/iavl v1.1.4/go.mod h1:vCYmRQUJU1wwj0oRD3wMEtOM9sJNDP+GFMaXmIxZ/rU=
github.com/cosmos/iavl v1.2.0 h1:kVxTmjTh4k0Dh1VNL046v6BXqKziqMDzxo93oh3kOfM=
github.com/cosmos/iavl v1.2.0/go.mod h1:HidWWLVAtODJqFD6Hbne2Y0q3SdxByJepHUOeoH4LiI=
github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM=
github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=

View File

@ -243,7 +243,7 @@ func (m *Manager) Sync() error {
return fmt.Errorf("failed to unmarshal changeset: %w", err)
}
if m.stateCommitment != nil {
if err := m.stateCommitment.WriteBatch(cs); err != nil {
if err := m.stateCommitment.WriteChangeset(cs); err != nil {
return fmt.Errorf("failed to write changeset to commitment: %w", err)
}
if _, err := m.stateCommitment.Commit(version); err != nil {

View File

@ -28,7 +28,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm
multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}
commitStore, err := commitment.NewCommitStore(multiTrees, db, nil, log.NewNopLogger())
commitStore, err := commitment.NewCommitStore(multiTrees, db, log.NewNopLogger())
require.NoError(t, err)
snapshotsStore, err := snapshots.NewStore(t.TempDir())
@ -38,7 +38,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm
storageDB, err := pebbledb.New(t.TempDir())
require.NoError(t, err)
newStorageStore := storage.NewStorageStore(storageDB, nil, log.NewNopLogger()) // for store/v2
newStorageStore := storage.NewStorageStore(storageDB, log.NewNopLogger()) // for store/v2
db1 := dbm.NewMemDB()
multiTrees1 := make(map[string]commitment.Tree)
@ -47,7 +47,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm
multiTrees1[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}
newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2
newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, log.NewNopLogger()) // for store/v2
require.NoError(t, err)
if noCommitStore {
newCommitStore = nil
@ -71,7 +71,7 @@ func TestMigrateState(t *testing.T) {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
}
}
require.NoError(t, orgCommitStore.WriteBatch(cs))
require.NoError(t, orgCommitStore.WriteChangeset(cs))
_, err := orgCommitStore.Commit(version)
require.NoError(t, err)
}

View File

@ -0,0 +1,52 @@
# Pruning Manager
The `pruning` package defines the `PruningManager` struct which is responsible for
pruning the state storage (SS) and the state commitment (SC) based on the current
height of the chain. The `PruneOptions` struct defines the configuration for pruning
and is passed to the `PruningManager` during initialization.
## Prune Options
The `PruneOptions` struct includes the following fields:
* `KeepRecent` (uint64): The number of recent heights to keep in the state.
* `Interval` (uint64): The interval of how often to prune the state. 0 means no pruning.
## Pausable Pruner
The `PausablePruner` interface defines the `PausePruning` method, which is used to pause
the pruning process. The `PruningManager` will check if the pruner is a `PausablePruner`
and call the `PausePruning` method before and after `Commit` to pause and resume pruning.
This is useful for when the pruning process is asynchronous and needs to be paused during
a commit to prevent parallel writes.
## Pruning Flow
```mermaid
sequenceDiagram
autonumber
participant A as RootStore
participant B as PruningManager
participant C as CommitmentStore
participant D as StorageStore
loop Commit
A->>B: SignalCommit(true, height)
alt SC is PausablePruner
B->>C: PausePruning(true)
else SS is PausablePruner
B->>D: PausePruing(true)
end
A->>C: Commit Changeset
A->>D: Write Changeset
A->>B: SignalCommit(false, height)
alt SC is PausablePruner
B->>C: PausePruning(false)
else SS is PausablePruner
B->>D: PausePruing(false)
end
B->>C: Prune(height)
B->>D: Prune(height)
end
```

View File

@ -0,0 +1,69 @@
package pruning
import "cosmossdk.io/store/v2"
// Manager is a struct that manages the pruning of old versions of the SC and SS.
type Manager struct {
// scPruner is the pruner for the SC.
scPruner store.Pruner
// scPruningOptions are the pruning options for the SC.
scPruningOptions *store.PruneOptions
// ssPruner is the pruner for the SS.
ssPruner store.Pruner
// ssPruningOptions are the pruning options for the SS.
ssPruningOptions *store.PruneOptions
}
// NewManager creates a new Pruning Manager.
func NewManager(scPruner, ssPruner store.Pruner, scPruningOptions, ssPruningOptions *store.PruneOptions) *Manager {
return &Manager{
scPruner: scPruner,
scPruningOptions: scPruningOptions,
ssPruner: ssPruner,
ssPruningOptions: ssPruningOptions,
}
}
// Prune prunes the SC and SS to the provided version.
//
// NOTE: It can be called outside of the store manually.
func (m *Manager) Prune(version uint64) error {
// Prune the SC.
if m.scPruningOptions != nil {
if prune, pruneTo := m.scPruningOptions.ShouldPrune(version); prune {
if err := m.scPruner.Prune(pruneTo); err != nil {
return err
}
}
}
// Prune the SS.
if m.ssPruningOptions != nil {
if prune, pruneTo := m.ssPruningOptions.ShouldPrune(version); prune {
if err := m.ssPruner.Prune(pruneTo); err != nil {
return err
}
}
}
return nil
}
// SignalCommit signals to the manager that a commit has started or finished.
// It is used to trigger the pruning of the SC and SS.
// It pauses or resumes the pruning of the SC and SS if the pruner implements
// the PausablePruner interface.
func (m *Manager) SignalCommit(start bool, version uint64) error {
if scPausablePruner, ok := m.scPruner.(store.PausablePruner); ok {
scPausablePruner.PausePruning(start)
}
if ssPausablePruner, ok := m.ssPruner.(store.PausablePruner); ok {
ssPausablePruner.PausePruning(start)
}
if !start {
return m.Prune(version)
}
return nil
}

View File

@ -0,0 +1,172 @@
package pruning
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
"cosmossdk.io/store/v2/commitment/iavl"
dbm "cosmossdk.io/store/v2/db"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/sqlite"
)
var storeKeys = []string{"store1", "store2", "store3"}
type PruningManagerTestSuite struct {
suite.Suite
manager *Manager
sc *commitment.CommitStore
ss *storage.StorageStore
}
func TestPruningManagerTestSuite(t *testing.T) {
suite.Run(t, &PruningManagerTestSuite{})
}
func (s *PruningManagerTestSuite) SetupTest() {
nopLog := log.NewNopLogger()
var err error
mdb := dbm.NewMemDB()
multiTrees := make(map[string]commitment.Tree)
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(mdb, []byte(storeKey))
multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig())
}
s.sc, err = commitment.NewCommitStore(multiTrees, mdb, nopLog)
s.Require().NoError(err)
sqliteDB, err := sqlite.New(s.T().TempDir())
s.Require().NoError(err)
s.ss = storage.NewStorageStore(sqliteDB, nopLog)
scPruneOptions := &store.PruneOptions{
KeepRecent: 0,
Interval: 1,
} // prune all
ssPruneOptions := &store.PruneOptions{
KeepRecent: 5,
Interval: 10,
} // prune some
s.manager = NewManager(s.sc, s.ss, scPruneOptions, ssPruneOptions)
}
func (s *PruningManagerTestSuite) TestPrune() {
// commit changesets with pruning
toVersion := uint64(100)
keyCount := 10
for version := uint64(1); version <= toVersion; version++ {
cs := corestore.NewChangeset()
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
}
}
s.Require().NoError(s.sc.WriteChangeset(cs))
_, err := s.sc.Commit(version)
s.Require().NoError(err)
s.Require().NoError(s.ss.ApplyChangeset(version, cs))
s.Require().NoError(s.manager.Prune(version))
}
// wait for the pruning to finish in the commitment store
checkSCPrune := func() bool {
count := 0
for _, storeKey := range storeKeys {
_, err := s.sc.GetProof([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", toVersion-1, 0)))
if err != nil {
count++
}
}
return count == len(storeKeys)
}
s.Require().Eventually(checkSCPrune, 10*time.Second, 1*time.Second)
// check the storage store
_, pruneVersion := s.manager.ssPruningOptions.ShouldPrune(toVersion)
for version := uint64(1); version <= toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
key := []byte(fmt.Sprintf("key-%d-%d", version, i))
value, err := s.ss.Get([]byte(storeKey), version, key)
if version <= pruneVersion {
s.Require().Nil(value)
s.Require().Error(err)
} else {
s.Require().NoError(err)
s.Require().Equal([]byte(fmt.Sprintf("value-%d-%d", version, i)), value)
}
}
}
}
}
func TestPruneOptions(t *testing.T) {
testCases := []struct {
name string
options *store.PruneOptions
version uint64
pruning bool
pruneVersion uint64
}{
{
name: "no pruning",
options: &store.PruneOptions{
KeepRecent: 100,
Interval: 0,
},
version: 100,
pruning: false,
pruneVersion: 0,
},
{
name: "prune all",
options: &store.PruneOptions{
KeepRecent: 0,
Interval: 1,
},
version: 19,
pruning: true,
pruneVersion: 18,
},
{
name: "prune none",
options: &store.PruneOptions{
KeepRecent: 100,
Interval: 10,
},
version: 19,
pruning: false,
pruneVersion: 0,
},
{
name: "prune some",
options: &store.PruneOptions{
KeepRecent: 10,
Interval: 50,
},
version: 100,
pruning: true,
pruneVersion: 89,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
pruning, pruneVersion := tc.options.ShouldPrune(tc.version)
require.Equal(t, tc.pruning, pruning)
require.Equal(t, tc.pruneVersion, pruneVersion)
})
}
}

View File

@ -12,6 +12,7 @@ import (
"cosmossdk.io/store/v2/commitment/mem"
"cosmossdk.io/store/v2/db"
"cosmossdk.io/store/v2/internal"
"cosmossdk.io/store/v2/pruning"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/pebbledb"
"cosmossdk.io/store/v2/storage/sqlite"
@ -31,14 +32,15 @@ const (
)
type FactoryOptions struct {
Logger log.Logger
RootDir string
SSType SSType
SCType SCType
PruneOptions *store.PruneOptions
IavlConfig *iavl.Config
StoreKeys []string
SCRawDB corestore.KVStoreWithBatch
Logger log.Logger
RootDir string
SSType SSType
SCType SCType
SSPruneOptions *store.PruneOptions
SCPruneOptions *store.PruneOptions
IavlConfig *iavl.Config
StoreKeys []string
SCRawDB corestore.KVStoreWithBatch
}
// CreateRootStore is a convenience function to create a root store based on the
@ -48,8 +50,8 @@ type FactoryOptions struct {
func CreateRootStore(opts *FactoryOptions) (store.RootStore, error) {
var (
ssDb storage.Database
ss store.VersionedDatabase
sc store.Committer
ss *storage.StorageStore
sc *commitment.CommitStore
err error
ensureDir = func(dir string) error {
if err := os.MkdirAll(dir, 0x0755); err != nil {
@ -71,7 +73,7 @@ func CreateRootStore(opts *FactoryOptions) (store.RootStore, error) {
if err = ensureDir(dir); err != nil {
return nil, err
}
ssDb, err = pebbledb.New(fmt.Sprintf("%s/data/ss/pebble", opts.RootDir))
ssDb, err = pebbledb.New(dir)
case SSTypeRocks:
// TODO: rocksdb requires build tags so is not supported here by default
return nil, fmt.Errorf("rocksdb not supported")
@ -79,7 +81,7 @@ func CreateRootStore(opts *FactoryOptions) (store.RootStore, error) {
if err != nil {
return nil, err
}
ss = storage.NewStorageStore(ssDb, opts.PruneOptions, opts.Logger)
ss = storage.NewStorageStore(ssDb, opts.Logger)
trees := make(map[string]commitment.Tree)
for _, key := range opts.StoreKeys {
@ -93,12 +95,13 @@ func CreateRootStore(opts *FactoryOptions) (store.RootStore, error) {
return nil, fmt.Errorf("iavl v2 not supported")
}
}
sc, err = commitment.NewCommitStore(trees, opts.SCRawDB, opts.PruneOptions, opts.Logger)
}
sc, err = commitment.NewCommitStore(trees, opts.SCRawDB, opts.Logger)
if err != nil {
return nil, err
}
return New(opts.Logger, ss, sc, nil, nil)
pm := pruning.NewManager(sc, ss, opts.SCPruneOptions, opts.SSPruneOptions)
return New(opts.Logger, ss, sc, pm, nil, nil)
}

View File

@ -15,6 +15,7 @@ import (
"cosmossdk.io/store/v2/commitment/iavl"
dbm "cosmossdk.io/store/v2/db"
"cosmossdk.io/store/v2/migration"
"cosmossdk.io/store/v2/pruning"
"cosmossdk.io/store/v2/snapshots"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/sqlite"
@ -42,7 +43,7 @@ func (s *MigrateStoreTestSuite) SetupTest() {
prefixDB := dbm.NewPrefixDB(mdb, []byte(storeKey))
multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig())
}
orgSC, err := commitment.NewCommitStore(multiTrees, mdb, nil, testLog)
orgSC, err := commitment.NewCommitStore(multiTrees, mdb, testLog)
s.Require().NoError(err)
// apply changeset against the original store
@ -55,7 +56,7 @@ func (s *MigrateStoreTestSuite) SetupTest() {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
}
}
s.Require().NoError(orgSC.WriteBatch(cs))
s.Require().NoError(orgSC.WriteChangeset(cs))
_, err = orgSC.Commit(version)
s.Require().NoError(err)
}
@ -63,22 +64,23 @@ func (s *MigrateStoreTestSuite) SetupTest() {
// create a new storage and commitment stores
sqliteDB, err := sqlite.New(s.T().TempDir())
s.Require().NoError(err)
ss := storage.NewStorageStore(sqliteDB, nil, testLog)
ss := storage.NewStorageStore(sqliteDB, testLog)
multiTrees1 := make(map[string]commitment.Tree)
for _, storeKey := range storeKeys {
multiTrees1[storeKey] = iavl.NewIavlTree(dbm.NewMemDB(), nopLog, iavl.DefaultConfig())
}
sc, err := commitment.NewCommitStore(multiTrees1, dbm.NewMemDB(), nil, testLog)
sc, err := commitment.NewCommitStore(multiTrees1, dbm.NewMemDB(), testLog)
s.Require().NoError(err)
snapshotsStore, err := snapshots.NewStore(s.T().TempDir())
s.Require().NoError(err)
snapshotManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), orgSC, nil, nil, testLog)
migrationManager := migration.NewManager(dbm.NewMemDB(), snapshotManager, ss, sc, testLog)
pm := pruning.NewManager(sc, ss, nil, nil)
// assume no storage store, simulate the migration process
s.rootStore, err = New(testLog, ss, orgSC, migrationManager, nil)
s.rootStore, err = New(testLog, ss, orgSC, pm, migrationManager, nil)
s.Require().NoError(err)
}
@ -143,10 +145,6 @@ func (s *MigrateStoreTestSuite) TestMigrateState() {
}
}
// prune the old versions
err = s.rootStore.Prune(latestVersion - 1)
s.Require().NoError(err)
// apply changeset against the migrated store
for version := latestVersion + 1; version <= latestVersion+10; version++ {
cs := corestore.NewChangeset()

View File

@ -16,6 +16,7 @@ import (
"cosmossdk.io/store/v2/metrics"
"cosmossdk.io/store/v2/migration"
"cosmossdk.io/store/v2/proof"
"cosmossdk.io/store/v2/pruning"
)
var _ store.RootStore = (*Store)(nil)
@ -43,6 +44,9 @@ type Store struct {
// telemetry reflects a telemetry agent responsible for emitting metrics (if any)
telemetry metrics.StoreMetrics
// pruningManager reflects the pruning manager used to prune state of the SS and SC backends
pruningManager *pruning.Manager
// Migration related fields
// migrationManager reflects the migration manager used to migrate state from v1 to v2
migrationManager *migration.Manager
@ -62,6 +66,7 @@ func New(
logger log.Logger,
ss store.VersionedDatabase,
sc store.Committer,
pm *pruning.Manager,
mm *migration.Manager,
m metrics.StoreMetrics,
) (store.RootStore, error) {
@ -70,6 +75,7 @@ func New(
initialVersion: 1,
stateStorage: ss,
stateCommitment: sc,
pruningManager: pm,
migrationManager: mm,
telemetry: m,
isMigrating: mm != nil,
@ -272,6 +278,13 @@ func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) {
s.logger.Debug("commit header and version mismatch", "header_height", s.commitHeader.Height, "version", version)
}
// signal to the pruning manager that a new version is about to be committed
// this may be required if the SS and SC backends implementation have the
// background pruning process which must be paused during the commit
if err := s.pruningManager.SignalCommit(true, version); err != nil {
s.logger.Error("failed to signal commit to pruning manager", "err", err)
}
eg := new(errgroup.Group)
// if we're migrating, we don't want to commit to the state storage to avoid
@ -300,6 +313,11 @@ func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) {
return nil, err
}
// signal to the pruning manager that the commit is done
if err := s.pruningManager.SignalCommit(false, version); err != nil {
s.logger.Error("failed to signal commit done to pruning manager", "err", err)
}
if s.commitHeader != nil {
s.lastCommitInfo.Timestamp = s.commitHeader.Time
}
@ -307,24 +325,6 @@ func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) {
return s.lastCommitInfo.Hash(), nil
}
// Prune prunes the root store to the provided version.
func (s *Store) Prune(version uint64) error {
if s.telemetry != nil {
now := time.Now()
defer s.telemetry.MeasureSince(now, "root_store", "prune")
}
if err := s.stateStorage.Prune(version); err != nil {
return fmt.Errorf("failed to prune SS store: %w", err)
}
if err := s.stateCommitment.Prune(version); err != nil {
return fmt.Errorf("failed to prune SC store: %w", err)
}
return nil
}
// startMigration starts a migration process to migrate the RootStore/v1 to the
// SS and SC backends of store/v2 and initializes the channels.
// It runs in a separate goroutine and replaces the current RootStore with the
@ -384,7 +384,7 @@ func (s *Store) writeSC(cs *corestore.Changeset) error {
}
}
if err := s.stateCommitment.WriteBatch(cs); err != nil {
if err := s.stateCommitment.WriteChangeset(cs); err != nil {
return fmt.Errorf("failed to write batch to SC store: %w", err)
}

View File

@ -13,6 +13,7 @@ import (
"cosmossdk.io/store/v2/commitment"
"cosmossdk.io/store/v2/commitment/iavl"
dbm "cosmossdk.io/store/v2/db"
"cosmossdk.io/store/v2/pruning"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/sqlite"
)
@ -44,15 +45,16 @@ func (s *RootStoreTestSuite) SetupTest() {
sqliteDB, err := sqlite.New(s.T().TempDir())
s.Require().NoError(err)
ss := storage.NewStorageStore(sqliteDB, nil, noopLog)
ss := storage.NewStorageStore(sqliteDB, noopLog)
tree := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig())
tree2 := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig())
tree3 := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig())
sc, err := commitment.NewCommitStore(map[string]commitment.Tree{testStoreKey: tree, testStoreKey2: tree2, testStoreKey3: tree3}, dbm.NewMemDB(), nil, noopLog)
sc, err := commitment.NewCommitStore(map[string]commitment.Tree{testStoreKey: tree, testStoreKey2: tree2, testStoreKey3: tree3}, dbm.NewMemDB(), noopLog)
s.Require().NoError(err)
rs, err := New(noopLog, ss, sc, nil, nil)
pm := pruning.NewManager(sc, ss, nil, nil)
rs, err := New(noopLog, ss, sc, pm, nil, nil)
s.Require().NoError(err)
s.rootStore = rs
@ -112,7 +114,7 @@ func (s *RootStoreTestSuite) TestGetFallback() {
cs := corestore.NewChangeset()
cs.Add(testStoreKeyBytes, []byte("foo"), []byte("bar"), false)
err := sc.WriteBatch(cs)
err := sc.WriteChangeset(cs)
s.Require().NoError(err)
ci := sc.WorkingCommitInfo(1)

View File

@ -6,13 +6,12 @@ import (
"github.com/stretchr/testify/suite"
"cosmossdk.io/core/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/storage"
)
func TestStorageTestSuite(t *testing.T) {
s := &storage.StorageTestSuite{
NewDB: func(dir string) (store.VersionedDatabase, error) {
NewDB: func(dir string) (*storage.StorageStore, error) {
db, err := New(dir)
if err == nil && db != nil {
// We set sync=false just to speed up CI tests. Operators should take
@ -20,7 +19,7 @@ func TestStorageTestSuite(t *testing.T) {
db.SetSync(false)
}
return storage.NewStorageStore(db, nil, log.NewNopLogger()), err
return storage.NewStorageStore(db, log.NewNopLogger()), err
},
EmptyBatchSize: 12,
}

View File

@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/suite"
"cosmossdk.io/core/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/storage"
)
@ -19,9 +18,9 @@ var storeKey1 = []byte("store1")
func TestStorageTestSuite(t *testing.T) {
s := &storage.StorageTestSuite{
NewDB: func(dir string) (store.VersionedDatabase, error) {
NewDB: func(dir string) (*storage.StorageStore, error) {
db, err := New(dir)
return storage.NewStorageStore(db, nil, log.NewNopLogger()), err
return storage.NewStorageStore(db, log.NewNopLogger()), err
},
EmptyBatchSize: 12,
}

View File

@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/suite"
"cosmossdk.io/core/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/storage"
)
@ -17,9 +16,9 @@ var storeKey1 = []byte("store1")
func TestStorageTestSuite(t *testing.T) {
s := &storage.StorageTestSuite{
NewDB: func(dir string) (store.VersionedDatabase, error) {
NewDB: func(dir string) (*storage.StorageStore, error) {
db, err := New(dir)
return storage.NewStorageStore(db, nil, log.NewNopLogger()), err
return storage.NewStorageStore(db, log.NewNopLogger()), err
},
EmptyBatchSize: 0,
}

View File

@ -27,7 +27,7 @@ var (
backends = map[string]func(dataDir string) (store.VersionedDatabase, error){
"rocksdb_versiondb_opts": func(dataDir string) (store.VersionedDatabase, error) {
db, err := rocksdb.New(dataDir)
return storage.NewStorageStore(db, nil, log.NewNopLogger()), err
return storage.NewStorageStore(db, log.NewNopLogger()), err
},
"pebbledb_default_opts": func(dataDir string) (store.VersionedDatabase, error) {
db, err := pebbledb.New(dataDir)
@ -35,11 +35,11 @@ var (
db.SetSync(false)
}
return storage.NewStorageStore(db, nil, log.NewNopLogger()), err
return storage.NewStorageStore(db, log.NewNopLogger()), err
},
"btree_sqlite": func(dataDir string) (store.VersionedDatabase, error) {
db, err := sqlite.New(dataDir)
return storage.NewStorageStore(db, nil, log.NewNopLogger()), err
return storage.NewStorageStore(db, log.NewNopLogger()), err
},
}
rng = rand.New(rand.NewSource(567320))

View File

@ -22,7 +22,7 @@ var storeKey1Bytes = []byte(storeKey1)
type StorageTestSuite struct {
suite.Suite
NewDB func(dir string) (store.VersionedDatabase, error)
NewDB func(dir string) (*StorageStore, error)
EmptyBatchSize int
SkipTests []string
}

View File

@ -17,27 +17,20 @@ const (
var (
_ store.VersionedDatabase = (*StorageStore)(nil)
_ snapshots.StorageSnapshotter = (*StorageStore)(nil)
_ store.Pruner = (*StorageStore)(nil)
)
// StorageStore is a wrapper around the store.VersionedDatabase interface.
type StorageStore struct {
logger log.Logger
db Database
// pruneOptions defines the pruning configuration.
pruneOptions *store.PruneOptions
}
// NewStorageStore returns a reference to a new StorageStore.
func NewStorageStore(db Database, pruneOpts *store.PruneOptions, logger log.Logger) *StorageStore {
if pruneOpts == nil {
pruneOpts = store.DefaultPruneOptions()
}
func NewStorageStore(db Database, logger log.Logger) *StorageStore {
return &StorageStore{
logger: logger,
db: db,
pruneOptions: pruneOpts,
logger: logger,
db: db,
}
}
@ -76,12 +69,6 @@ func (ss *StorageStore) ApplyChangeset(version uint64, cs *corestore.Changeset)
return err
}
if prune, pruneVersion := ss.pruneOptions.ShouldPrune(version); prune {
if err := ss.Prune(pruneVersion); err != nil {
ss.logger.Info("failed to prune SS", "prune_version", pruneVersion, "err", err)
}
}
return nil
}

View File

@ -59,10 +59,6 @@ type RootStore interface {
// LastCommitID returns a CommitID pertaining to the last commitment.
LastCommitID() (proof.CommitID, error)
// Prune prunes the RootStore to the provided version. It is used to remove
// old versions of the RootStore by the CLI.
Prune(version uint64) error
// SetMetrics sets the telemetry handler on the RootStore.
SetMetrics(m metrics.Metrics)
@ -83,6 +79,22 @@ type UpgradeableRootStore interface {
LoadVersionAndUpgrade(version uint64, upgrades *corestore.StoreUpgrades) error
}
// Pruner defines the interface for pruning old versions of the store or database.
type Pruner interface {
// Prune prunes the store to the provided version.
Prune(version uint64) error
}
// PausablePruner extends the Pruner interface to include the API for pausing
// the pruning process.
type PausablePruner interface {
Pruner
// PausePruning pauses or resumes the pruning process to avoid the parallel writes
// while committing the state.
PausePruning(pause bool)
}
// QueryResult defines the response type to performing a query on a RootStore.
type QueryResult struct {
Key []byte