fix(server): Fix pruning height calculation (#24583)
Co-authored-by: Alex | Interchain Labs <alex@interchainlabs.io> Co-authored-by: Avory <avorycorelli@gmail.com>
This commit is contained in:
parent
b71d0894f0
commit
98a2f679df
@ -108,6 +108,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* (server)[#24583](https://github.com/cosmos/cosmos-sdk/pull/24583) Fix height calculation in pruning manager and better restart handling.
|
||||
* (x/gov)[#24460](https://github.com/cosmos/cosmos-sdk/pull/24460) Do not call Remove during Walk in defaultCalculateVoteResultsAndVotingPower.
|
||||
* (baseapp) [24261](https://github.com/cosmos/cosmos-sdk/pull/24261) Fix post handler error always results in code 1
|
||||
* (server) [#24068](https://github.com/cosmos/cosmos-sdk/pull/24068) Allow align block header with skip check header in grpc server.
|
||||
|
||||
@ -1,8 +0,0 @@
|
||||
package pruning
|
||||
|
||||
var (
|
||||
PruneSnapshotHeightsKey = pruneSnapshotHeightsKey
|
||||
|
||||
Int64SliceToBytes = int64SliceToBytes
|
||||
LoadPruningSnapshotHeights = loadPruningSnapshotHeights
|
||||
)
|
||||
@ -3,6 +3,7 @@ package pruning
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
@ -21,12 +22,14 @@ type Manager struct {
|
||||
opts types.PruningOptions
|
||||
snapshotInterval uint64
|
||||
// Snapshots are taken in a separate goroutine from the regular execution
|
||||
// and can be delivered asynchrounously via HandleSnapshotHeight.
|
||||
// Therefore, we sync access to pruneSnapshotHeights with this mutex.
|
||||
// and can be delivered asynchronously via HandleSnapshotHeight.
|
||||
// Therefore, we sync access to pruneSnapshotHeights, inflightSnapshotHeights and initFromStore with this mutex.
|
||||
pruneSnapshotHeightsMx sync.RWMutex
|
||||
// These are the heights that are multiples of snapshotInterval and kept for state sync snapshots.
|
||||
// The heights are added to be pruned when a snapshot is complete.
|
||||
pruneSnapshotHeights []int64
|
||||
pruneSnapshotHeights []int64
|
||||
inflightSnapshotHeights []int64
|
||||
initFromStore bool
|
||||
}
|
||||
|
||||
// NegativeHeightsError is returned when a negative height is provided to the manager.
|
||||
@ -51,7 +54,7 @@ func NewManager(db dbm.DB, logger log.Logger) *Manager {
|
||||
db: db,
|
||||
logger: logger,
|
||||
opts: types.NewPruningOptions(types.PruningNothing),
|
||||
pruneSnapshotHeights: []int64{0},
|
||||
pruneSnapshotHeights: []int64{0}, // init with 0 block height
|
||||
}
|
||||
}
|
||||
|
||||
@ -65,6 +68,17 @@ func (m *Manager) GetOptions() types.PruningOptions {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
// AnnounceSnapshotHeight announces a new snapshot height for tracking and pruning.
|
||||
func (m *Manager) AnnounceSnapshotHeight(height int64) {
|
||||
if m.opts.GetPruningStrategy() == types.PruningNothing || height <= 0 {
|
||||
return
|
||||
}
|
||||
m.pruneSnapshotHeightsMx.Lock()
|
||||
defer m.pruneSnapshotHeightsMx.Unlock()
|
||||
// called in ascending order so no sorting required
|
||||
m.inflightSnapshotHeights = append(m.inflightSnapshotHeights, height)
|
||||
}
|
||||
|
||||
// HandleSnapshotHeight persists the snapshot height to be pruned at the next appropriate
|
||||
// height defined by the pruning strategy. It flushes the update to disk and panics if the flush fails.
|
||||
// The input height must be greater than 0, and the pruning strategy must not be set to pruning nothing.
|
||||
@ -74,63 +88,88 @@ func (m *Manager) HandleSnapshotHeight(height int64) {
|
||||
return
|
||||
}
|
||||
|
||||
m.logger.Debug("HandleSnapshotHeight", "height", height)
|
||||
|
||||
m.pruneSnapshotHeightsMx.Lock()
|
||||
defer m.pruneSnapshotHeightsMx.Unlock()
|
||||
|
||||
m.logger.Debug("HandleSnapshotHeight", "height", height)
|
||||
// remove from the in-flight list
|
||||
if position := slices.Index(m.inflightSnapshotHeights, height); position != -1 {
|
||||
m.inflightSnapshotHeights = append(m.inflightSnapshotHeights[:position], m.inflightSnapshotHeights[position+1:]...)
|
||||
}
|
||||
|
||||
if m.initFromStore {
|
||||
// drop the legacy state as it may belong to a different interval or an outdated snapshot
|
||||
// that is not in sequence with the current one
|
||||
m.pruneSnapshotHeights = m.pruneSnapshotHeights[1:]
|
||||
m.initFromStore = false
|
||||
}
|
||||
|
||||
m.pruneSnapshotHeights = append(m.pruneSnapshotHeights, height)
|
||||
sort.Slice(m.pruneSnapshotHeights, func(i, j int) bool { return m.pruneSnapshotHeights[i] < m.pruneSnapshotHeights[j] })
|
||||
|
||||
// in-flight snapshots may land out of order due to the concurrent nature of the snapshotter.
|
||||
// we need to detect them to prevent pruning their heights while the snapshots are still in progress.
|
||||
k := 1
|
||||
for ; k < len(m.pruneSnapshotHeights); k++ {
|
||||
if m.pruneSnapshotHeights[k] != m.pruneSnapshotHeights[k-1]+int64(m.snapshotInterval) {
|
||||
// gap detected, snapshot is in-flight
|
||||
break
|
||||
}
|
||||
}
|
||||
// compact the height list for the snapshots in sequence
|
||||
// the last snapshot height is used to allow pruning up to the next interval height
|
||||
m.pruneSnapshotHeights = m.pruneSnapshotHeights[k-1:]
|
||||
|
||||
// flush the updates to disk so that they are not lost if crash happens.
|
||||
if err := m.db.SetSync(pruneSnapshotHeightsKey, int64SliceToBytes(m.pruneSnapshotHeights)); err != nil {
|
||||
// flush the max height to store so that they are not lost if a crash happens.
|
||||
// only the max height matters as there are no in-flight snapshots after a restart
|
||||
if err := storePruningSnapshotHeight(m.db, slices.Max(m.pruneSnapshotHeights)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// SetSnapshotInterval sets the interval at which the snapshots are taken.
|
||||
// This value should be set on startup and not exceed max int64 (2^63-1). Concurrent modifications are not supported.
|
||||
func (m *Manager) SetSnapshotInterval(snapshotInterval uint64) {
|
||||
m.snapshotInterval = snapshotInterval
|
||||
}
|
||||
|
||||
// GetPruningHeight returns the height which can prune upto if it is able to prune at the given height.
|
||||
func (m *Manager) GetPruningHeight(height int64) int64 {
|
||||
if m.opts.GetPruningStrategy() == types.PruningNothing {
|
||||
return 0
|
||||
}
|
||||
if m.opts.Interval <= 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
if height%int64(m.opts.Interval) != 0 || height <= int64(m.opts.KeepRecent) {
|
||||
if m.opts.GetPruningStrategy() == types.PruningNothing ||
|
||||
m.opts.Interval <= 0 ||
|
||||
height <= int64(m.opts.KeepRecent) ||
|
||||
height%int64(m.opts.Interval) != 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Consider the snapshot height
|
||||
pruneHeight := height - 1 - int64(m.opts.KeepRecent) // we should keep the current height at least
|
||||
|
||||
m.pruneSnapshotHeightsMx.RLock()
|
||||
defer m.pruneSnapshotHeightsMx.RUnlock()
|
||||
|
||||
// snapshotInterval is zero, indicating that all heights can be pruned
|
||||
if m.snapshotInterval <= 0 {
|
||||
return pruneHeight
|
||||
}
|
||||
|
||||
if len(m.pruneSnapshotHeights) == 0 { // the length should be greater than zero
|
||||
m.pruneSnapshotHeightsMx.RLock()
|
||||
defer m.pruneSnapshotHeightsMx.RUnlock()
|
||||
|
||||
if len(m.pruneSnapshotHeights) == 0 { // do not prune before an initial snapshot
|
||||
return 0
|
||||
}
|
||||
|
||||
// the snapshot `m.pruneSnapshotHeights[0]` is already operated,
|
||||
// so we can prune upto `m.pruneSnapshotHeights[0] + int64(m.snapshotInterval) - 1`
|
||||
snHeight := m.pruneSnapshotHeights[0] + int64(m.snapshotInterval) - 1
|
||||
return min(snHeight, pruneHeight)
|
||||
// highest version based on completed snapshots
|
||||
snHeight := m.pruneSnapshotHeights[0] - 1
|
||||
if !m.initFromStore { // ensure non-legacy data
|
||||
// with no inflight snapshots, we may prune up to the next snap interval -1
|
||||
snHeight += int64(m.snapshotInterval)
|
||||
}
|
||||
if len(m.inflightSnapshotHeights) == 0 {
|
||||
return min(snHeight, pruneHeight)
|
||||
}
|
||||
// highest version based on started snapshots
|
||||
inFlightHeight := m.inflightSnapshotHeights[0] - 1
|
||||
return min(snHeight, pruneHeight, inFlightHeight)
|
||||
}
|
||||
|
||||
// LoadSnapshotHeights loads the snapshot heights from the database as a crash recovery.
|
||||
@ -139,20 +178,27 @@ func (m *Manager) LoadSnapshotHeights(db dbm.DB) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// loading list for backwards compatibility
|
||||
loadedPruneSnapshotHeights, err := loadPruningSnapshotHeights(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(loadedPruneSnapshotHeights) > 0 {
|
||||
m.pruneSnapshotHeightsMx.Lock()
|
||||
defer m.pruneSnapshotHeightsMx.Unlock()
|
||||
m.pruneSnapshotHeights = loadedPruneSnapshotHeights
|
||||
if len(loadedPruneSnapshotHeights) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
m.pruneSnapshotHeightsMx.Lock()
|
||||
defer m.pruneSnapshotHeightsMx.Unlock()
|
||||
// restore max only as there are no in-flight snapshots after a restart
|
||||
m.pruneSnapshotHeights = []int64{slices.Max(loadedPruneSnapshotHeights)}
|
||||
m.initFromStore = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func storePruningSnapshotHeight(db dbm.DB, val int64) error {
|
||||
return db.SetSync(pruneSnapshotHeightsKey, int64SliceToBytes(val))
|
||||
}
|
||||
|
||||
func loadPruningSnapshotHeights(db dbm.DB) ([]int64, error) {
|
||||
bz, err := db.Get(pruneSnapshotHeightsKey)
|
||||
if err != nil {
|
||||
@ -177,7 +223,7 @@ func loadPruningSnapshotHeights(db dbm.DB) ([]int64, error) {
|
||||
return pruneSnapshotHeights, nil
|
||||
}
|
||||
|
||||
func int64SliceToBytes(slice []int64) []byte {
|
||||
func int64SliceToBytes(slice ...int64) []byte {
|
||||
bz := make([]byte, 0, len(slice)*8)
|
||||
for _, ph := range slice {
|
||||
buf := make([]byte, 8)
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
package pruning_test
|
||||
package pruning
|
||||
|
||||
import (
|
||||
"errors"
|
||||
@ -6,19 +6,19 @@ import (
|
||||
"testing"
|
||||
|
||||
db "github.com/cosmos/cosmos-db"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/store/mock"
|
||||
"cosmossdk.io/store/pruning"
|
||||
"cosmossdk.io/store/pruning/types"
|
||||
)
|
||||
|
||||
const dbErr = "db error"
|
||||
|
||||
func TestNewManager(t *testing.T) {
|
||||
manager := pruning.NewManager(db.NewMemDB(), log.NewNopLogger())
|
||||
manager := NewManager(db.NewMemDB(), log.NewNopLogger())
|
||||
require.NotNil(t, manager)
|
||||
require.Equal(t, types.PruningNothing, manager.GetOptions().GetPruningStrategy())
|
||||
}
|
||||
@ -78,7 +78,7 @@ func TestStrategies(t *testing.T) {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
manager := pruning.NewManager(db.NewMemDB(), log.NewNopLogger())
|
||||
manager := NewManager(db.NewMemDB(), log.NewNopLogger())
|
||||
require.NotNil(t, manager)
|
||||
|
||||
curStrategy := tc.strategy
|
||||
@ -110,7 +110,9 @@ func TestStrategies(t *testing.T) {
|
||||
for curHeight := int64(0); curHeight < 110000; curHeight++ {
|
||||
if tc.snapshotInterval != 0 {
|
||||
if curHeight > int64(tc.snapshotInterval) && curHeight%int64(tc.snapshotInterval) == int64(tc.snapshotInterval)-1 {
|
||||
manager.HandleSnapshotHeight(curHeight - int64(tc.snapshotInterval) + 1)
|
||||
snapHeight := curHeight - int64(tc.snapshotInterval) + 1
|
||||
manager.AnnounceSnapshotHeight(snapHeight)
|
||||
manager.HandleSnapshotHeight(snapHeight)
|
||||
snHeight = curHeight
|
||||
}
|
||||
}
|
||||
@ -185,8 +187,7 @@ func TestPruningHeight_Inputs(t *testing.T) {
|
||||
|
||||
for name, tc := range testcases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
manager := pruning.NewManager(db.NewMemDB(), log.NewNopLogger())
|
||||
require.NotNil(t, manager)
|
||||
manager := NewManager(db.NewMemDB(), log.NewNopLogger())
|
||||
manager.SetOptions(types.NewPruningOptions(tc.strategy))
|
||||
|
||||
pruningHeightActual := manager.GetPruningHeight(tc.height)
|
||||
@ -195,6 +196,158 @@ func TestPruningHeight_Inputs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPruningHeight(t *testing.T) {
|
||||
specs := map[string]struct {
|
||||
initDBState int64
|
||||
opts types.PruningOptions
|
||||
setup func(manager *Manager)
|
||||
exp map[int64]int64
|
||||
}{
|
||||
"init from store - no snap": {
|
||||
initDBState: 10,
|
||||
opts: types.PruningOptions{KeepRecent: 5, Interval: 10, Strategy: types.PruningCustom},
|
||||
setup: func(mgr *Manager) {
|
||||
mgr.SetSnapshotInterval(15)
|
||||
},
|
||||
exp: map[int64]int64{
|
||||
20: 9, // initDBState - 1
|
||||
30: 9, // initDBState - 1
|
||||
45: 0, // not a prune height
|
||||
},
|
||||
},
|
||||
"init from store - snap landed": {
|
||||
initDBState: 10,
|
||||
opts: types.PruningOptions{KeepRecent: 5, Interval: 10, Strategy: types.PruningCustom},
|
||||
setup: func(mgr *Manager) {
|
||||
mgr.SetSnapshotInterval(15)
|
||||
mgr.AnnounceSnapshotHeight(15)
|
||||
mgr.HandleSnapshotHeight(15)
|
||||
},
|
||||
exp: map[int64]int64{
|
||||
10: 4, // 10 - 5 (keep) - 1
|
||||
15: 0, // not on prune interval
|
||||
20: 14, // 20 - 5 (keep) - 1
|
||||
30: 24, // 30 - 5 (keep) - 1
|
||||
40: 29, // 15 (last completed snap) + 15 (snap interval) - 1
|
||||
},
|
||||
},
|
||||
"init from store - snap in-flight": {
|
||||
initDBState: 10,
|
||||
opts: types.PruningOptions{KeepRecent: 5, Interval: 10, Strategy: types.PruningCustom},
|
||||
setup: func(mgr *Manager) {
|
||||
mgr.SetSnapshotInterval(15)
|
||||
mgr.AnnounceSnapshotHeight(15)
|
||||
},
|
||||
exp: map[int64]int64{
|
||||
10: 4, // 10 - 5 (keep) - 1
|
||||
20: 9, // 10 - 5 (keep) - 1
|
||||
},
|
||||
},
|
||||
"init from store - delayed in-flight snap": {
|
||||
initDBState: 10,
|
||||
opts: types.PruningOptions{KeepRecent: 5, Interval: 10, Strategy: types.PruningCustom},
|
||||
setup: func(mgr *Manager) {
|
||||
mgr.SetSnapshotInterval(15)
|
||||
mgr.AnnounceSnapshotHeight(15)
|
||||
mgr.AnnounceSnapshotHeight(30)
|
||||
mgr.HandleSnapshotHeight(30)
|
||||
},
|
||||
exp: map[int64]int64{
|
||||
10: 4, // 10 - 5 (keep) - 1
|
||||
20: 14, // 15 (in-flight) - 1
|
||||
30: 14, // 15 (in-flight) - 1
|
||||
40: 14, // 15 (in-flight) - 1
|
||||
},
|
||||
},
|
||||
"empty store": {
|
||||
opts: types.PruningOptions{KeepRecent: 5, Interval: 10, Strategy: types.PruningCustom},
|
||||
setup: func(mgr *Manager) {
|
||||
mgr.SetSnapshotInterval(15)
|
||||
},
|
||||
exp: map[int64]int64{
|
||||
10: 4, // 10 -5 (keep) -1
|
||||
20: 14, // 20 -5 (keep) -1
|
||||
},
|
||||
},
|
||||
"empty snap interval set": {
|
||||
initDBState: 10,
|
||||
opts: types.PruningOptions{KeepRecent: 5, Interval: 10, Strategy: types.PruningCustom},
|
||||
setup: func(mgr *Manager) {},
|
||||
exp: map[int64]int64{
|
||||
10: 4, // 10 -5 (keep) -1
|
||||
20: 14, // 20 -5 (keep) -1
|
||||
},
|
||||
},
|
||||
"prune nothing set": {
|
||||
initDBState: 10,
|
||||
opts: types.PruningOptions{Strategy: types.PruningNothing, Interval: 10, KeepRecent: 5},
|
||||
setup: func(mgr *Manager) {
|
||||
mgr.SetSnapshotInterval(15)
|
||||
},
|
||||
exp: map[int64]int64{
|
||||
10: 0, // nothing
|
||||
20: 0, // nothing
|
||||
30: 0, // nothing
|
||||
},
|
||||
},
|
||||
"empty prune interval": {
|
||||
initDBState: 10,
|
||||
opts: types.PruningOptions{Strategy: types.PruningCustom, KeepRecent: 5},
|
||||
setup: func(mgr *Manager) {
|
||||
mgr.SetSnapshotInterval(15)
|
||||
},
|
||||
exp: map[int64]int64{
|
||||
10: 0, // interval required
|
||||
20: 0, // interval required
|
||||
30: 0, // interval required
|
||||
},
|
||||
},
|
||||
"height <= keep": {
|
||||
initDBState: 10,
|
||||
opts: types.PruningOptions{Strategy: types.PruningCustom, Interval: 1, KeepRecent: 5},
|
||||
setup: func(mgr *Manager) {
|
||||
mgr.SetSnapshotInterval(15)
|
||||
},
|
||||
exp: map[int64]int64{
|
||||
0: 0, // interval required
|
||||
4: 0, // interval required
|
||||
5: 0, // interval required
|
||||
},
|
||||
},
|
||||
"height not on prune interval": {
|
||||
initDBState: 10,
|
||||
opts: types.PruningOptions{Strategy: types.PruningCustom, Interval: 2},
|
||||
setup: func(mgr *Manager) {
|
||||
mgr.SetSnapshotInterval(15)
|
||||
},
|
||||
exp: map[int64]int64{
|
||||
0: 0, // excluded
|
||||
1: 0, // not on prune interval
|
||||
2: 1, // 2 - 1
|
||||
3: 0, // not on prune interval
|
||||
4: 3, // 2 - 1
|
||||
},
|
||||
},
|
||||
}
|
||||
for name, spec := range specs {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
memDB := db.NewMemDB()
|
||||
if spec.initDBState != 0 {
|
||||
require.NoError(t, storePruningSnapshotHeight(memDB, spec.initDBState))
|
||||
}
|
||||
mgr2 := NewManager(memDB, log.NewNopLogger())
|
||||
mgr2.SetOptions(spec.opts)
|
||||
require.NoError(t, mgr2.LoadSnapshotHeights(memDB))
|
||||
spec.setup(mgr2)
|
||||
|
||||
for height, exp := range spec.exp {
|
||||
gotHeight := mgr2.GetPruningHeight(height)
|
||||
assert.Equal(t, exp, gotHeight, "height: %d", height)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSnapshotHeight_DbErr_Panic(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
|
||||
@ -203,7 +356,8 @@ func TestHandleSnapshotHeight_DbErr_Panic(t *testing.T) {
|
||||
|
||||
dbMock.EXPECT().SetSync(gomock.Any(), gomock.Any()).Return(errors.New(dbErr)).Times(1)
|
||||
|
||||
manager := pruning.NewManager(dbMock, log.NewNopLogger())
|
||||
manager := NewManager(dbMock, log.NewNopLogger())
|
||||
manager.SetSnapshotInterval(1)
|
||||
manager.SetOptions(types.NewPruningOptions(types.PruningEverything))
|
||||
require.NotNil(t, manager)
|
||||
|
||||
@ -221,7 +375,7 @@ func TestHandleSnapshotHeight_LoadFromDisk(t *testing.T) {
|
||||
|
||||
// Setup
|
||||
db := db.NewMemDB()
|
||||
manager := pruning.NewManager(db, log.NewNopLogger())
|
||||
manager := NewManager(db, log.NewNopLogger())
|
||||
require.NotNil(t, manager)
|
||||
|
||||
manager.SetOptions(types.NewPruningOptions(types.PruningEverything))
|
||||
@ -236,7 +390,7 @@ func TestHandleSnapshotHeight_LoadFromDisk(t *testing.T) {
|
||||
expected = 1
|
||||
}
|
||||
|
||||
loadedSnapshotHeights, err := pruning.LoadPruningSnapshotHeights(db)
|
||||
loadedSnapshotHeights, err := loadPruningSnapshotHeights(db)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, len(loadedSnapshotHeights), snapshotHeightStr)
|
||||
|
||||
@ -244,7 +398,7 @@ func TestHandleSnapshotHeight_LoadFromDisk(t *testing.T) {
|
||||
err = manager.LoadSnapshotHeights(db)
|
||||
require.NoError(t, err)
|
||||
|
||||
loadedSnapshotHeights, err = pruning.LoadPruningSnapshotHeights(db)
|
||||
loadedSnapshotHeights, err = loadPruningSnapshotHeights(db)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, len(loadedSnapshotHeights), snapshotHeightStr)
|
||||
}
|
||||
@ -252,7 +406,7 @@ func TestHandleSnapshotHeight_LoadFromDisk(t *testing.T) {
|
||||
|
||||
func TestLoadPruningSnapshotHeights(t *testing.T) {
|
||||
var (
|
||||
manager = pruning.NewManager(db.NewMemDB(), log.NewNopLogger())
|
||||
manager = NewManager(db.NewMemDB(), log.NewNopLogger())
|
||||
err error
|
||||
)
|
||||
require.NotNil(t, manager)
|
||||
@ -268,7 +422,7 @@ func TestLoadPruningSnapshotHeights(t *testing.T) {
|
||||
getFlushedPruningSnapshotHeights: func() []int64 {
|
||||
return []int64{5, -2, 3}
|
||||
},
|
||||
expectedResult: &pruning.NegativeHeightsError{Height: -2},
|
||||
expectedResult: &NegativeHeightsError{Height: -2},
|
||||
},
|
||||
"non-negative - success": {
|
||||
getFlushedPruningSnapshotHeights: func() []int64 {
|
||||
@ -282,7 +436,7 @@ func TestLoadPruningSnapshotHeights(t *testing.T) {
|
||||
db := db.NewMemDB()
|
||||
|
||||
if tc.getFlushedPruningSnapshotHeights != nil {
|
||||
err = db.Set(pruning.PruneSnapshotHeightsKey, pruning.Int64SliceToBytes(tc.getFlushedPruningSnapshotHeights()))
|
||||
err = db.Set(pruneSnapshotHeightsKey, int64SliceToBytes(tc.getFlushedPruningSnapshotHeights()...))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@ -293,7 +447,7 @@ func TestLoadPruningSnapshotHeights(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLoadSnapshotHeights_PruneNothing(t *testing.T) {
|
||||
manager := pruning.NewManager(db.NewMemDB(), log.NewNopLogger())
|
||||
manager := NewManager(db.NewMemDB(), log.NewNopLogger())
|
||||
require.NotNil(t, manager)
|
||||
|
||||
manager.SetOptions(types.NewPruningOptions(types.PruningNothing))
|
||||
|
||||
@ -3,6 +3,7 @@ package types
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
)
|
||||
|
||||
// PruningOptions defines the pruning strategy used when determining which
|
||||
@ -56,6 +57,7 @@ var (
|
||||
ErrPruningIntervalZero = errors.New("'pruning-interval' must not be 0. If you want to disable pruning, select pruning = \"nothing\"")
|
||||
ErrPruningIntervalTooSmall = fmt.Errorf("'pruning-interval' must not be less than %d. For the most aggressive pruning, select pruning = \"everything\"", pruneEverythingInterval)
|
||||
ErrPruningKeepRecentTooSmall = fmt.Errorf("'pruning-keep-recent' must not be less than %d. For the most aggressive pruning, select pruning = \"everything\"", pruneEverythingKeepRecent)
|
||||
ErrPruningKeepRecentTooBig = errors.New("'pruning-keep-recent' must not be greater than 2^63-1. Select pruning = \"nothing\"")
|
||||
)
|
||||
|
||||
func NewPruningOptions(pruningStrategy PruningStrategy) PruningOptions {
|
||||
@ -110,6 +112,9 @@ func (po PruningOptions) Validate() error {
|
||||
if po.KeepRecent < pruneEverythingKeepRecent {
|
||||
return ErrPruningKeepRecentTooSmall
|
||||
}
|
||||
if po.KeepRecent > math.MaxInt64 {
|
||||
return ErrPruningKeepRecentTooBig
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -21,6 +22,7 @@ func TestPruningOptions_Validate(t *testing.T) {
|
||||
{NewCustomPruningOptions(2, 9), ErrPruningIntervalTooSmall},
|
||||
{NewCustomPruningOptions(2, 0), ErrPruningIntervalZero},
|
||||
{NewCustomPruningOptions(2, 0), ErrPruningIntervalZero},
|
||||
{NewCustomPruningOptions(math.MaxInt64+1, 10), ErrPruningKeepRecentTooBig},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
|
||||
@ -83,8 +83,9 @@ type Store struct {
|
||||
}
|
||||
|
||||
var (
|
||||
_ types.CommitMultiStore = (*Store)(nil)
|
||||
_ types.Queryable = (*Store)(nil)
|
||||
_ types.CommitMultiStore = (*Store)(nil)
|
||||
_ types.Queryable = (*Store)(nil)
|
||||
_ snapshottypes.SnapshotAnnouncer = (*Store)(nil)
|
||||
)
|
||||
|
||||
// NewStore returns a reference to a new Store object with the provided DB. The
|
||||
@ -358,6 +359,10 @@ func (rs *Store) PruneSnapshotHeight(height int64) {
|
||||
rs.pruningManager.HandleSnapshotHeight(height)
|
||||
}
|
||||
|
||||
func (rs *Store) AnnounceSnapshotHeight(height int64) {
|
||||
rs.pruningManager.AnnounceSnapshotHeight(height)
|
||||
}
|
||||
|
||||
// SetInterBlockCache sets the Store's internal inter-block (persistent) cache.
|
||||
// When this is defined, all CommitKVStores will be wrapped with their respective
|
||||
// inter-block cache.
|
||||
|
||||
@ -4,10 +4,13 @@ import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
dbm "github.com/cosmos/cosmos-db"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"cosmossdk.io/errors"
|
||||
@ -635,6 +638,121 @@ func TestMultiStore_PruningRestart(t *testing.T) {
|
||||
require.Eventually(t, isPruned, 1*time.Second, 10*time.Millisecond, "expected error when loading pruned heights")
|
||||
}
|
||||
|
||||
func TestMultiStore_PruningWithIntervalUpdates(t *testing.T) {
|
||||
// scenarios
|
||||
// snap height in sync - interval not changed
|
||||
// snap height out of order - interval not changed
|
||||
// snap height in flight - interval not changed
|
||||
|
||||
// snap height in sync - interval modified
|
||||
// snap height out of order - interval modified
|
||||
// snap height in flight - interval modified
|
||||
|
||||
const (
|
||||
initialSnapshotInterval uint64 = 10
|
||||
initialPruneInterval uint64 = 10
|
||||
)
|
||||
|
||||
specs := map[string]struct {
|
||||
do func(t *testing.T, ms *Store, commitSnapN func(n int, snapshotInterval uint64) int64)
|
||||
expPruneHeight int64
|
||||
}{
|
||||
"snap height sequential - constant interval": {
|
||||
do: func(t *testing.T, ms *Store, commitSnapN func(n int, snapshotInterval uint64) int64) {
|
||||
t.Helper()
|
||||
commitSnapN(20, initialSnapshotInterval)
|
||||
},
|
||||
expPruneHeight: 14, // 20 - 5 (keep) -1
|
||||
},
|
||||
"snap out of order - constant interval": {
|
||||
do: func(t *testing.T, ms *Store, commitSnapN func(n int, snapshotInterval uint64) int64) {
|
||||
t.Helper()
|
||||
commitSnapN(20, initialSnapshotInterval)
|
||||
ms.pruningManager.HandleSnapshotHeight(10)
|
||||
},
|
||||
expPruneHeight: 14, // 20 - 5 (keep) -1
|
||||
},
|
||||
"snap height sequential - snap interval increased": {
|
||||
do: func(t *testing.T, ms *Store, commitSnapN func(n int, snapshotInterval uint64) int64) {
|
||||
t.Helper()
|
||||
commitSnapN(10, initialSnapshotInterval)
|
||||
currHeight := commitSnapN(10, 20)
|
||||
assert.Equal(t, int64(14), ms.pruningManager.GetPruningHeight(currHeight)) // 20 - 5 (keep) -1
|
||||
commitSnapN(10, 20)
|
||||
},
|
||||
expPruneHeight: 24, // 30 -5 (keep) -1
|
||||
},
|
||||
"snap out of order - snap interval increased": {
|
||||
do: func(t *testing.T, ms *Store, commitSnapN func(n int, snapshotInterval uint64) int64) {
|
||||
t.Helper()
|
||||
commitSnapN(10, initialSnapshotInterval)
|
||||
commitSnapN(30, 20)
|
||||
ms.pruningManager.HandleSnapshotHeight(10)
|
||||
},
|
||||
expPruneHeight: 29, // 10 (legacy state not cleared) + 20 - 1
|
||||
},
|
||||
"snap height sequential - snap interval decreased": {
|
||||
do: func(t *testing.T, ms *Store, commitSnapN func(n int, snapshotInterval uint64) int64) {
|
||||
t.Helper()
|
||||
commitSnapN(10, initialSnapshotInterval)
|
||||
commitSnapN(10, 6)
|
||||
},
|
||||
expPruneHeight: 14, // 20 -5 (keep) -1
|
||||
},
|
||||
"snap out of order - snap interval decreased": {
|
||||
do: func(t *testing.T, ms *Store, commitSnapN func(n int, snapshotInterval uint64) int64) {
|
||||
t.Helper()
|
||||
commitSnapN(10, initialSnapshotInterval)
|
||||
commitSnapN(10, 6)
|
||||
ms.pruningManager.HandleSnapshotHeight(10)
|
||||
},
|
||||
expPruneHeight: 14, // 20 -5 (keep) -1
|
||||
},
|
||||
}
|
||||
for name, spec := range specs {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
db := dbm.NewMemDB()
|
||||
ms := newMultiStoreWithMounts(db, pruningtypes.NewCustomPruningOptions(5, initialPruneInterval))
|
||||
ms.SetSnapshotInterval(initialSnapshotInterval)
|
||||
require.NoError(t, ms.LoadLatestVersion())
|
||||
rnd := rand.New(rand.NewSource(1))
|
||||
commitSnapN := func(n int, snapshotInterval uint64) int64 {
|
||||
ms.SetSnapshotInterval(snapshotInterval)
|
||||
var wg sync.WaitGroup
|
||||
for range n {
|
||||
height := ms.Commit().Version
|
||||
if height != 0 && snapshotInterval != 0 && uint64(height)%snapshotInterval == 0 {
|
||||
ms.pruningManager.AnnounceSnapshotHeight(height)
|
||||
wg.Add(1)
|
||||
go func() { // random completion order
|
||||
time.Sleep(time.Duration(rnd.Int31n(int32(time.Millisecond))))
|
||||
ms.pruningManager.HandleSnapshotHeight(height)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
return ms.LatestVersion()
|
||||
}
|
||||
spec.do(t, ms, commitSnapN)
|
||||
actualHeightToPrune := ms.pruningManager.GetPruningHeight(ms.LatestVersion())
|
||||
require.Equal(t, spec.expPruneHeight, actualHeightToPrune)
|
||||
|
||||
// Ensure async pruning is done
|
||||
isPruned := func() bool {
|
||||
ms.Commit() // to flush the batch with the pruned heights
|
||||
for v := int64(1); v <= actualHeightToPrune; v++ {
|
||||
if _, err := ms.CacheMultiStoreWithVersion(v); err == nil {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
require.Eventually(t, isPruned, 1*time.Second, 10*time.Millisecond, "expected error when loading pruned heights")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestUnevenStoresHeightCheck tests if loading root store correctly errors when
|
||||
// there's any module store with the wrong height
|
||||
func TestUnevenStoresHeightCheck(t *testing.T) {
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"compress/zlib"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
@ -108,10 +109,15 @@ func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]b
|
||||
|
||||
type mockSnapshotter struct {
|
||||
items [][]byte
|
||||
announcedHeights map[int64]struct{}
|
||||
prunedHeights map[int64]struct{}
|
||||
snapshotInterval uint64
|
||||
}
|
||||
|
||||
func (m *mockSnapshotter) AnnounceSnapshotHeight(height int64) {
|
||||
m.announcedHeights[height] = struct{}{}
|
||||
}
|
||||
|
||||
func (m *mockSnapshotter) Restore(
|
||||
height uint64, format uint32, protoReader protoio.Reader,
|
||||
) (snapshottypes.SnapshotItem, error) {
|
||||
@ -160,6 +166,9 @@ func (m *mockSnapshotter) SupportedFormats() []uint32 {
|
||||
}
|
||||
|
||||
func (m *mockSnapshotter) PruneSnapshotHeight(height int64) {
|
||||
if _, ok := m.announcedHeights[height]; !ok {
|
||||
panic(fmt.Sprintf("snap height %d was not announced", height))
|
||||
}
|
||||
m.prunedHeights[height] = struct{}{}
|
||||
}
|
||||
|
||||
@ -171,9 +180,12 @@ func (m *mockSnapshotter) SetSnapshotInterval(snapshotInterval uint64) {
|
||||
m.snapshotInterval = snapshotInterval
|
||||
}
|
||||
|
||||
var _ snapshottypes.Snapshotter = (*mockErrorSnapshotter)(nil)
|
||||
|
||||
type mockErrorSnapshotter struct{}
|
||||
|
||||
var _ snapshottypes.Snapshotter = (*mockErrorSnapshotter)(nil)
|
||||
func (m *mockErrorSnapshotter) AnnounceSnapshotHeight(height int64) {
|
||||
}
|
||||
|
||||
func (m *mockErrorSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
|
||||
return errors.New("mock snapshot error")
|
||||
@ -239,15 +251,17 @@ func setupBusyManager(t *testing.T) *snapshots.Manager {
|
||||
|
||||
// hungSnapshotter can be used to test operations in progress. Call close to end the snapshot.
|
||||
type hungSnapshotter struct {
|
||||
ch chan struct{}
|
||||
prunedHeights map[int64]struct{}
|
||||
snapshotInterval uint64
|
||||
ch chan struct{}
|
||||
announcedSnapHeights map[int64]struct{}
|
||||
prunedHeights map[int64]struct{}
|
||||
snapshotInterval uint64
|
||||
}
|
||||
|
||||
func newHungSnapshotter() *hungSnapshotter {
|
||||
return &hungSnapshotter{
|
||||
ch: make(chan struct{}),
|
||||
prunedHeights: make(map[int64]struct{}),
|
||||
ch: make(chan struct{}),
|
||||
announcedSnapHeights: make(map[int64]struct{}),
|
||||
prunedHeights: make(map[int64]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@ -260,7 +274,14 @@ func (m *hungSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) er
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *hungSnapshotter) AnnounceSnapshotHeight(height int64) {
|
||||
m.announcedSnapHeights[height] = struct{}{}
|
||||
}
|
||||
|
||||
func (m *hungSnapshotter) PruneSnapshotHeight(height int64) {
|
||||
if _, ok := m.announcedSnapHeights[height]; !ok {
|
||||
panic(fmt.Sprintf("snap height %d was not announced", height))
|
||||
}
|
||||
m.prunedHeights[height] = struct{}{}
|
||||
}
|
||||
|
||||
|
||||
@ -37,8 +37,9 @@ type Manager struct {
|
||||
store *Store
|
||||
opts types.SnapshotOptions
|
||||
// multistore is the store from which snapshots are taken.
|
||||
multistore types.Snapshotter
|
||||
logger log.Logger
|
||||
multistore types.Snapshotter
|
||||
snapAnnouncer types.SnapshotAnnouncer
|
||||
logger log.Logger
|
||||
|
||||
mtx sync.Mutex
|
||||
operation operation
|
||||
@ -76,12 +77,17 @@ func NewManager(store *Store, opts types.SnapshotOptions, multistore types.Snaps
|
||||
if extensions == nil {
|
||||
extensions = map[string]types.ExtensionSnapshotter{}
|
||||
}
|
||||
var snapAnnouncer types.SnapshotAnnouncer = noopSnapshotAnnouncer{}
|
||||
if v, ok := multistore.(types.SnapshotAnnouncer); ok {
|
||||
snapAnnouncer = v
|
||||
}
|
||||
return &Manager{
|
||||
store: store,
|
||||
opts: opts,
|
||||
multistore: multistore,
|
||||
extensions: extensions,
|
||||
logger: logger,
|
||||
store: store,
|
||||
opts: opts,
|
||||
multistore: multistore,
|
||||
snapAnnouncer: snapAnnouncer,
|
||||
extensions: extensions,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
@ -165,6 +171,7 @@ func (m *Manager) Create(height uint64) (*types.Snapshot, error) {
|
||||
return nil, errorsmod.Wrap(storetypes.ErrLogic, "no snapshot store configured")
|
||||
}
|
||||
|
||||
m.snapAnnouncer.AnnounceSnapshotHeight(int64(height))
|
||||
defer m.multistore.PruneSnapshotHeight(int64(height))
|
||||
|
||||
err := m.begin(opSnapshot)
|
||||
@ -549,3 +556,10 @@ func (m *Manager) snapshot(height int64) {
|
||||
func (m *Manager) Close() error {
|
||||
return m.store.db.Close()
|
||||
}
|
||||
|
||||
// noopSnapshotAnnouncer is a null object for snapshot announcer.
|
||||
type noopSnapshotAnnouncer struct{}
|
||||
|
||||
// AnnounceSnapshotHeight does nothing.
|
||||
func (n noopSnapshotAnnouncer) AnnounceSnapshotHeight(height int64) {
|
||||
}
|
||||
|
||||
@ -68,8 +68,9 @@ func TestManager_Take(t *testing.T) {
|
||||
{7, 8, 9},
|
||||
}
|
||||
snapshotter := &mockSnapshotter{
|
||||
items: items,
|
||||
prunedHeights: make(map[int64]struct{}),
|
||||
items: items,
|
||||
announcedHeights: make(map[int64]struct{}),
|
||||
prunedHeights: make(map[int64]struct{}),
|
||||
}
|
||||
extSnapshotter := newExtSnapshotter(10)
|
||||
|
||||
@ -138,7 +139,8 @@ func TestManager_Prune(t *testing.T) {
|
||||
func TestManager_Restore(t *testing.T) {
|
||||
store := setupStore(t)
|
||||
target := &mockSnapshotter{
|
||||
prunedHeights: make(map[int64]struct{}),
|
||||
announcedHeights: make(map[int64]struct{}),
|
||||
prunedHeights: make(map[int64]struct{}),
|
||||
}
|
||||
extSnapshotter := newExtSnapshotter(0)
|
||||
manager := snapshots.NewManager(store, opts, target, nil, log.NewNopLogger())
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
package types
|
||||
|
||||
import "math"
|
||||
|
||||
// SnapshotOptions defines the snapshot strategy used when determining which
|
||||
// heights are snapshotted for state sync.
|
||||
type SnapshotOptions struct {
|
||||
@ -10,7 +12,12 @@ type SnapshotOptions struct {
|
||||
KeepRecent uint32
|
||||
}
|
||||
|
||||
// NewSnapshotOptions creates and returns a new SnapshotOptions instance.
|
||||
// It panics if the interval exceeds the maximum value for int64.
|
||||
func NewSnapshotOptions(interval uint64, keepRecent uint32) SnapshotOptions {
|
||||
if interval > math.MaxInt64 {
|
||||
panic("interval must not exceed max int64")
|
||||
}
|
||||
return SnapshotOptions{
|
||||
Interval: interval,
|
||||
KeepRecent: keepRecent,
|
||||
|
||||
37
store/snapshots/types/options_test.go
Normal file
37
store/snapshots/types/options_test.go
Normal file
@ -0,0 +1,37 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestNewSnapshotOptions(t *testing.T) {
|
||||
specs := map[string]struct {
|
||||
srcInterval uint64
|
||||
expPanic bool
|
||||
}{
|
||||
"valid ": {
|
||||
srcInterval: 1,
|
||||
},
|
||||
"max interval ": {
|
||||
srcInterval: math.MaxInt64,
|
||||
},
|
||||
"exceeds max interval ": {
|
||||
srcInterval: math.MaxInt64 + 1,
|
||||
expPanic: true,
|
||||
},
|
||||
}
|
||||
for name, spec := range specs {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
if spec.expPanic {
|
||||
assert.Panics(t, func() {
|
||||
NewSnapshotOptions(spec.srcInterval, 2)
|
||||
})
|
||||
return
|
||||
}
|
||||
NewSnapshotOptions(spec.srcInterval, 10)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -4,6 +4,12 @@ import (
|
||||
protoio "github.com/cosmos/gogoproto/io"
|
||||
)
|
||||
|
||||
// SnapshotAnnouncer defines an interface for announcing snapshot initiation at a specified height.
|
||||
type SnapshotAnnouncer interface {
|
||||
// AnnounceSnapshotHeight informs the underlying system of a snapshot being initiated at a given height.
|
||||
AnnounceSnapshotHeight(height int64)
|
||||
}
|
||||
|
||||
// Snapshotter is something that can create and restore snapshots, consisting of streamed binary
|
||||
// chunks - all of which must be read from the channel and closed. If an unsupported format is
|
||||
// given, it must return ErrUnknownFormat (possibly wrapped with fmt.Errorf).
|
||||
|
||||
Loading…
Reference in New Issue
Block a user