feat(store): remove RawDB dependency from snapshot store (#19608)
This commit is contained in:
parent
3d59013709
commit
55370b0198
@ -31,7 +31,7 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {
|
||||
commitStore, err := commitment.NewCommitStore(multiTrees, db, nil, log.NewNopLogger())
|
||||
require.NoError(t, err)
|
||||
|
||||
snapshotsStore, err := snapshots.NewStore(db, t.TempDir())
|
||||
snapshotsStore, err := snapshots.NewStore(t.TempDir())
|
||||
require.NoError(t, err)
|
||||
|
||||
snapshotsManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), commitStore, nil, nil, log.NewNopLogger())
|
||||
|
||||
@ -17,7 +17,6 @@ import (
|
||||
errorsmod "cosmossdk.io/errors"
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/store/v2"
|
||||
dbm "cosmossdk.io/store/v2/db"
|
||||
"cosmossdk.io/store/v2/snapshots"
|
||||
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
|
||||
)
|
||||
@ -189,7 +188,7 @@ func (m *mockErrorCommitSnapshotter) SupportedFormats() []uint32 {
|
||||
// The snapshot will complete when the returned closer is called.
|
||||
func setupBusyManager(t *testing.T) *snapshots.Manager {
|
||||
t.Helper()
|
||||
store, err := snapshots.NewStore(dbm.NewMemDB(), t.TempDir())
|
||||
store, err := snapshots.NewStore(t.TempDir())
|
||||
require.NoError(t, err)
|
||||
hung := newHungCommitSnapshotter()
|
||||
mgr := snapshots.NewManager(store, opts, hung, &mockStorageSnapshotter{}, nil, log.NewNopLogger())
|
||||
@ -292,6 +291,7 @@ func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadR
|
||||
|
||||
// GetTempDir returns a writable temporary director for the test to use.
|
||||
func GetTempDir(tb testing.TB) string {
|
||||
//return "/tmp/snapshots"
|
||||
tb.Helper()
|
||||
// os.MkDir() is used instead of testing.T.TempDir()
|
||||
// see https://github.com/cosmos/cosmos-sdk/pull/8475 and
|
||||
|
||||
@ -599,7 +599,4 @@ func (m *Manager) snapshot(height int64) {
|
||||
}
|
||||
|
||||
// Close the snapshot database.
|
||||
func (m *Manager) Close() error {
|
||||
m.logger.Info("snapshotManager Close Database")
|
||||
return m.store.db.Close()
|
||||
}
|
||||
func (m *Manager) Close() error { return nil }
|
||||
|
||||
@ -8,7 +8,6 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
dbm "cosmossdk.io/store/v2/db"
|
||||
"cosmossdk.io/store/v2/snapshots"
|
||||
"cosmossdk.io/store/v2/snapshots/types"
|
||||
)
|
||||
@ -237,7 +236,7 @@ func TestManager_Restore(t *testing.T) {
|
||||
|
||||
func TestManager_TakeError(t *testing.T) {
|
||||
snapshotter := &mockErrorCommitSnapshotter{}
|
||||
store, err := snapshots.NewStore(dbm.NewMemDB(), GetTempDir(t))
|
||||
store, err := snapshots.NewStore(GetTempDir(t))
|
||||
require.NoError(t, err)
|
||||
manager := snapshots.NewManager(store, opts, snapshotter, &mockStorageSnapshotter{}, nil, log.NewNopLogger())
|
||||
|
||||
|
||||
@ -3,12 +3,15 @@ package snapshots
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
@ -25,7 +28,6 @@ const (
|
||||
|
||||
// Store is a snapshot store, containing snapshot metadata and binary chunks.
|
||||
type Store struct {
|
||||
db store.RawDB
|
||||
dir string
|
||||
|
||||
mtx sync.Mutex
|
||||
@ -33,7 +35,7 @@ type Store struct {
|
||||
}
|
||||
|
||||
// NewStore creates a new snapshot store.
|
||||
func NewStore(db store.RawDB, dir string) (*Store, error) {
|
||||
func NewStore(dir string) (*Store, error) {
|
||||
if dir == "" {
|
||||
return nil, errors.Wrap(store.ErrLogic, "snapshot directory not given")
|
||||
}
|
||||
@ -41,9 +43,12 @@ func NewStore(db store.RawDB, dir string) (*Store, error) {
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create snapshot directory %q", dir)
|
||||
}
|
||||
err = os.MkdirAll(filepath.Join(dir, "metadata"), 0o750)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create snapshot metadata directory %q", dir)
|
||||
}
|
||||
|
||||
return &Store{
|
||||
db: db,
|
||||
dir: dir,
|
||||
saving: make(map[uint64]bool),
|
||||
}, nil
|
||||
@ -58,32 +63,25 @@ func (s *Store) Delete(height uint64, format uint32) error {
|
||||
return errors.Wrapf(store.ErrConflict,
|
||||
"snapshot for height %v format %v is currently being saved", height, format)
|
||||
}
|
||||
b := s.db.NewBatch()
|
||||
defer b.Close()
|
||||
if err := b.Delete(encodeKey(height, format)); err != nil {
|
||||
return errors.Wrapf(err, "failed to delete item in the batch")
|
||||
}
|
||||
if err := b.WriteSync(); err != nil {
|
||||
return errors.Wrapf(err, "failed to delete snapshot for height %v format %v",
|
||||
height, format)
|
||||
}
|
||||
if err := os.RemoveAll(s.pathSnapshot(height, format)); err != nil {
|
||||
return errors.Wrapf(err, "failed to delete snapshot chunks for height %v format %v",
|
||||
height, format)
|
||||
return errors.Wrapf(err, "failed to delete snapshot chunks for height %v format %v", height, format)
|
||||
}
|
||||
if err := os.RemoveAll(s.pathMetadata(height, format)); err != nil {
|
||||
return errors.Wrapf(err, "failed to delete snapshot metadata for height %v format %v", height, format)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get fetches snapshot info from the database.
|
||||
func (s *Store) Get(height uint64, format uint32) (*types.Snapshot, error) {
|
||||
bytes, err := s.db.Get(encodeKey(height, format))
|
||||
if _, err := os.Stat(s.pathMetadata(height, format)); os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
}
|
||||
bytes, err := os.ReadFile(s.pathMetadata(height, format))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to fetch snapshot metadata for height %v format %v",
|
||||
height, format)
|
||||
}
|
||||
if bytes == nil {
|
||||
return nil, nil
|
||||
}
|
||||
snapshot := &types.Snapshot{}
|
||||
err = proto.Unmarshal(bytes, snapshot)
|
||||
if err != nil {
|
||||
@ -96,44 +94,62 @@ func (s *Store) Get(height uint64, format uint32) (*types.Snapshot, error) {
|
||||
return snapshot, nil
|
||||
}
|
||||
|
||||
// Get fetches the latest snapshot from the database, if any.
|
||||
// GetLatest fetches the latest snapshot from the database, if any.
|
||||
func (s *Store) GetLatest() (*types.Snapshot, error) {
|
||||
iter, err := s.db.ReverseIterator(encodeKey(0, 0), encodeKey(uint64(math.MaxUint64), math.MaxUint32))
|
||||
metadata, err := os.ReadDir(s.pathMetadataDir())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to find latest snapshot")
|
||||
return nil, errors.Wrap(err, "failed to list snapshot metadata")
|
||||
}
|
||||
defer iter.Close()
|
||||
if len(metadata) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
// file system may not guarantee the order of the files, so we sort them lexically
|
||||
sort.Slice(metadata, func(i, j int) bool { return metadata[i].Name() < metadata[j].Name() })
|
||||
|
||||
var snapshot *types.Snapshot
|
||||
if iter.Valid() {
|
||||
snapshot = &types.Snapshot{}
|
||||
err := proto.Unmarshal(iter.Value(), snapshot)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to decode latest snapshot")
|
||||
}
|
||||
path := filepath.Join(s.pathMetadataDir(), metadata[len(metadata)-1].Name())
|
||||
if err := s.validateMetadataPath(path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = iter.Error()
|
||||
return snapshot, errors.Wrap(err, "failed to find latest snapshot")
|
||||
bz, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to read latest snapshot metadata %s", path)
|
||||
}
|
||||
|
||||
snapshot := &types.Snapshot{}
|
||||
err = proto.Unmarshal(bz, snapshot)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to decode latest snapshot metadata %s", path)
|
||||
}
|
||||
return snapshot, nil
|
||||
}
|
||||
|
||||
// List lists snapshots, in reverse order (newest first).
|
||||
func (s *Store) List() ([]*types.Snapshot, error) {
|
||||
iter, err := s.db.ReverseIterator(encodeKey(0, 0), encodeKey(uint64(math.MaxUint64), math.MaxUint32))
|
||||
metadata, err := os.ReadDir(s.pathMetadataDir())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to list snapshots")
|
||||
return nil, errors.Wrap(err, "failed to list snapshot metadata")
|
||||
}
|
||||
defer iter.Close()
|
||||
// file system may not guarantee the order of the files, so we sort them lexically
|
||||
sort.Slice(metadata, func(i, j int) bool { return metadata[i].Name() < metadata[j].Name() })
|
||||
|
||||
snapshots := make([]*types.Snapshot, 0)
|
||||
for ; iter.Valid(); iter.Next() {
|
||||
snapshot := &types.Snapshot{}
|
||||
err := proto.Unmarshal(iter.Value(), snapshot)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to decode snapshot info")
|
||||
snapshots := make([]*types.Snapshot, len(metadata))
|
||||
for i, entry := range metadata {
|
||||
path := filepath.Join(s.pathMetadataDir(), entry.Name())
|
||||
if err := s.validateMetadataPath(path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
snapshots = append(snapshots, snapshot)
|
||||
bz, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to read snapshot metadata %s", entry.Name())
|
||||
}
|
||||
snapshot := &types.Snapshot{}
|
||||
err = proto.Unmarshal(bz, snapshot)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to decode snapshot metadata %s", entry.Name())
|
||||
}
|
||||
snapshots[len(metadata)-1-i] = snapshot
|
||||
}
|
||||
return snapshots, iter.Error()
|
||||
return snapshots, nil
|
||||
}
|
||||
|
||||
// Load loads a snapshot (both metadata and binary chunks). The chunks must be consumed and closed.
|
||||
@ -188,25 +204,25 @@ func (s *Store) loadChunkFile(height uint64, format, chunk uint32) (io.ReadClose
|
||||
|
||||
// Prune removes old snapshots. The given number of most recent heights (regardless of format) are retained.
|
||||
func (s *Store) Prune(retain uint32) (uint64, error) {
|
||||
iter, err := s.db.ReverseIterator(encodeKey(0, 0), encodeKey(uint64(math.MaxUint64), math.MaxUint32))
|
||||
metadata, err := os.ReadDir(s.pathMetadataDir())
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "failed to prune snapshots")
|
||||
return 0, errors.Wrap(err, "failed to list snapshot metadata")
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
pruned := uint64(0)
|
||||
prunedHeights := make(map[uint64]bool)
|
||||
skip := make(map[uint64]bool)
|
||||
for ; iter.Valid(); iter.Next() {
|
||||
height, format, err := decodeKey(iter.Key())
|
||||
for i := len(metadata) - 1; i >= 0; i-- {
|
||||
height, format, err := s.parseMetadataFilename(metadata[i].Name())
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "failed to prune snapshots")
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if skip[height] || uint32(len(skip)) < retain {
|
||||
skip[height] = true
|
||||
continue
|
||||
}
|
||||
err = s.Delete(height, format)
|
||||
err = s.Delete(height, uint32(format))
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "failed to prune snapshots")
|
||||
}
|
||||
@ -223,7 +239,7 @@ func (s *Store) Prune(retain uint32) (uint64, error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
return pruned, iter.Error()
|
||||
return pruned, nil
|
||||
}
|
||||
|
||||
// Save saves a snapshot to disk, returning it.
|
||||
@ -249,37 +265,24 @@ func (s *Store) Save(
|
||||
s.mtx.Unlock()
|
||||
}()
|
||||
|
||||
exists, err := s.db.Has(encodeKey(height, format))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if exists {
|
||||
return nil, errors.Wrapf(store.ErrConflict,
|
||||
"snapshot already exists for height %v format %v", height, format)
|
||||
}
|
||||
|
||||
snapshot := &types.Snapshot{
|
||||
Height: height,
|
||||
Format: format,
|
||||
}
|
||||
|
||||
dirCreated := false
|
||||
// create height directory or do nothing
|
||||
if err := os.MkdirAll(s.pathHeight(height), 0o750); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create snapshot directory for height %v", height)
|
||||
}
|
||||
// create format directory or fail (if for example the format directory already exists)
|
||||
if err := os.Mkdir(s.pathSnapshot(height, format), 0o750); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create snapshot directory for height %v format %v", height, format)
|
||||
}
|
||||
|
||||
index := uint32(0)
|
||||
snapshotHasher := sha256.New()
|
||||
chunkHasher := sha256.New()
|
||||
for chunkBody := range chunks {
|
||||
// Only create the snapshot directory on encountering the first chunk.
|
||||
// If the directory disappears during chunk saving,
|
||||
// the whole operation will fail anyway.
|
||||
if !dirCreated {
|
||||
dir := s.pathSnapshot(height, format)
|
||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create snapshot directory %q", dir)
|
||||
}
|
||||
|
||||
dirCreated = true
|
||||
}
|
||||
|
||||
if err := s.saveChunk(chunkBody, index, snapshot, chunkHasher, snapshotHasher); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -332,13 +335,9 @@ func (s *Store) saveSnapshot(snapshot *types.Snapshot) error {
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to encode snapshot metadata")
|
||||
}
|
||||
b := s.db.NewBatch()
|
||||
defer b.Close()
|
||||
if err := b.Set(encodeKey(snapshot.Height, snapshot.Format), value); err != nil {
|
||||
return errors.Wrap(err, "failed to set snapshot in batch")
|
||||
}
|
||||
if err := b.WriteSync(); err != nil {
|
||||
return errors.Wrap(err, "failed to store snapshot")
|
||||
err = os.WriteFile(s.pathMetadata(snapshot.Height, snapshot.Format), value, 0o600)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to write snapshot metadata")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -353,13 +352,52 @@ func (s *Store) pathSnapshot(height uint64, format uint32) string {
|
||||
return filepath.Join(s.pathHeight(height), strconv.FormatUint(uint64(format), 10))
|
||||
}
|
||||
|
||||
func (s *Store) pathMetadataDir() string {
|
||||
return filepath.Join(s.dir, "metadata")
|
||||
}
|
||||
|
||||
// pathMetadata generates a snapshot metadata path.
|
||||
func (s *Store) pathMetadata(height uint64, format uint32) string {
|
||||
return filepath.Join(s.pathMetadataDir(), fmt.Sprintf("%020d-%08d", height, format))
|
||||
}
|
||||
|
||||
// PathChunk generates a snapshot chunk path.
|
||||
func (s *Store) PathChunk(height uint64, format, chunk uint32) string {
|
||||
return filepath.Join(s.pathSnapshot(height, format), strconv.FormatUint(uint64(chunk), 10))
|
||||
}
|
||||
|
||||
// decodeKey decodes a snapshot key.
|
||||
func decodeKey(k []byte) (uint64, uint32, error) {
|
||||
func (s *Store) parseMetadataFilename(filename string) (height uint64, format uint32, err error) {
|
||||
parts := strings.Split(filename, "-")
|
||||
if len(parts) != 2 {
|
||||
return 0, 0, fmt.Errorf("invalid snapshot metadata filename %s", filename)
|
||||
}
|
||||
height, err = strconv.ParseUint(parts[0], 10, 64)
|
||||
if err != nil {
|
||||
return 0, 0, errors.Wrapf(err, "invalid snapshot metadata filename %s", filename)
|
||||
}
|
||||
var f uint64
|
||||
f, err = strconv.ParseUint(parts[1], 10, 32)
|
||||
if err != nil {
|
||||
return 0, 0, errors.Wrapf(err, "invalid snapshot metadata filename %s", filename)
|
||||
}
|
||||
format = uint32(f)
|
||||
if filename != filepath.Base(s.pathMetadata(height, uint32(format))) {
|
||||
return 0, 0, fmt.Errorf("invalid snapshot metadata filename %s", filename)
|
||||
}
|
||||
return height, format, nil
|
||||
}
|
||||
|
||||
func (s *Store) validateMetadataPath(path string) error {
|
||||
dir, f := filepath.Split(path)
|
||||
if dir != fmt.Sprintf("%s/", s.pathMetadataDir()) {
|
||||
return fmt.Errorf("invalid snapshot metadata path %s", path)
|
||||
}
|
||||
_, _, err := s.parseMetadataFilename(f)
|
||||
return err
|
||||
}
|
||||
|
||||
// legacyV1DecodeKey decodes a legacy snapshot key used in a raw kv store.
|
||||
func legacyV1DecodeKey(k []byte) (uint64, uint32, error) {
|
||||
if len(k) != 13 {
|
||||
return 0, 0, errors.Wrapf(store.ErrLogic, "invalid snapshot key with length %v", len(k))
|
||||
}
|
||||
@ -372,11 +410,29 @@ func decodeKey(k []byte) (uint64, uint32, error) {
|
||||
return height, format, nil
|
||||
}
|
||||
|
||||
// encodeKey encodes a snapshot key.
|
||||
func encodeKey(height uint64, format uint32) []byte {
|
||||
// legacyV1EncodeKey encodes a snapshot key for use in a raw kv store.
|
||||
func legacyV1EncodeKey(height uint64, format uint32) []byte {
|
||||
k := make([]byte, 13)
|
||||
k[0] = keyPrefixSnapshot
|
||||
binary.BigEndian.PutUint64(k[1:], height)
|
||||
binary.BigEndian.PutUint32(k[9:], format)
|
||||
return k
|
||||
}
|
||||
|
||||
func (s *Store) MigrateFromV1(db store.RawDB) error {
|
||||
itr, err := db.Iterator(legacyV1EncodeKey(0, 0), legacyV1EncodeKey(math.MaxUint64, math.MaxUint32))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer itr.Close()
|
||||
for ; itr.Valid(); itr.Next() {
|
||||
height, format, err := legacyV1DecodeKey(itr.Key())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.WriteFile(s.pathMetadata(height, format), itr.Value(), 0o600); err != nil {
|
||||
return errors.Wrapf(err, "failed to write snapshot metadata %q", s.pathMetadata(height, format))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -10,14 +10,13 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
dbm "cosmossdk.io/store/v2/db"
|
||||
"cosmossdk.io/store/v2/snapshots"
|
||||
"cosmossdk.io/store/v2/snapshots/types"
|
||||
)
|
||||
|
||||
func setupStore(t *testing.T) *snapshots.Store {
|
||||
t.Helper()
|
||||
store, err := snapshots.NewStore(dbm.NewMemDB(), GetTempDir(t))
|
||||
store, err := snapshots.NewStore(GetTempDir(t))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = store.Save(1, 1, makeChunks([][]byte{
|
||||
@ -42,13 +41,13 @@ func setupStore(t *testing.T) *snapshots.Store {
|
||||
|
||||
func TestNewStore(t *testing.T) {
|
||||
tempdir := GetTempDir(t)
|
||||
_, err := snapshots.NewStore(dbm.NewMemDB(), tempdir)
|
||||
_, err := snapshots.NewStore(tempdir)
|
||||
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestNewStore_ErrNoDir(t *testing.T) {
|
||||
_, err := snapshots.NewStore(dbm.NewMemDB(), "")
|
||||
_, err := snapshots.NewStore("")
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user