commit 81ba019e5e
parent b500b0104f

feat: save restored snapshot locally (#16060)

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
@@ -30,10 +30,11 @@ Ref: https://keepachangelog.com/en/1.0.0/
 - [#15712](https://github.com/cosmos/cosmos-sdk/pull/15712) Add `WorkingHash` function to the store interface to get the current app hash before commit.
 * [#14645](https://github.com/cosmos/cosmos-sdk/pull/14645) Add limit to the length of key and value.
 * [#15683](https://github.com/cosmos/cosmos-sdk/pull/15683) `rootmulti.Store.CacheMultiStoreWithVersion` now can handle loading archival states that don't persist any of the module stores the current state has.
+* [#16060](https://github.com/cosmos/cosmos-sdk/pull/16060) Support saving the restoring snapshot locally.

 ## [v0.1.0-alpha.1](https://github.com/cosmos/cosmos-sdk/releases/tag/store%2Fv0.1.0-alpha.1) - 2023-03-17

 ### Features

 * [#14746](https://github.com/cosmos/cosmos-sdk/pull/14746) The `store` module is extracted to have a separate go.mod file which allows it be a standalone module.
 * [#14410](https://github.com/cosmos/cosmos-sdk/pull/14410) `rootmulti.Store.loadVersion` has validation to check if all the module stores' height is correct, it will error if any module store has incorrect height.
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io"
 	"math"
+	"os"
 	"sort"
 	"sync"

@@ -39,12 +40,12 @@ type Manager struct {
 	multistore types.Snapshotter
 	logger     log.Logger

-	mtx                sync.Mutex
-	operation          operation
-	chRestore          chan<- io.ReadCloser
-	chRestoreDone      <-chan restoreDone
-	restoreChunkHashes [][]byte
-	restoreChunkIndex  uint32
+	mtx               sync.Mutex
+	operation         operation
+	chRestore         chan<- uint32
+	chRestoreDone     <-chan restoreDone
+	restoreSnapshot   *types.Snapshot
+	restoreChunkIndex uint32
 }

 // operation represents a Manager operation. Only one operation can be in progress at a time.
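The struct change is the core of the commit: `chRestore` now carries chunk indices instead of raw readers, and the full snapshot (with its metadata) is retained so chunks can be verified, persisted, and later re-read from disk. For orientation, a self-contained sketch of that pattern, with hypothetical names rather than SDK code: the producer writes each chunk to disk and sends only its uint32 ID, and the consumer reopens chunks by ID.

package main

import (
    "fmt"
    "io"
    "os"
    "path/filepath"
)

// loadChunkFile is a stand-in for Store.loadChunkFile: reopen a chunk by index.
func loadChunkFile(dir string, id uint32) (io.ReadCloser, error) {
    return os.Open(filepath.Join(dir, fmt.Sprintf("%d", id)))
}

func main() {
    dir, err := os.MkdirTemp("", "chunks")
    if err != nil {
        panic(err)
    }
    defer os.RemoveAll(dir)

    // Producer side (as in RestoreChunk): persist the chunk, then send its ID.
    chIDs := make(chan uint32, 1024)
    for i, body := range []string{"chunk-0", "chunk-1"} {
        if err := os.WriteFile(filepath.Join(dir, fmt.Sprintf("%d", i)), []byte(body), 0o600); err != nil {
            panic(err)
        }
        chIDs <- uint32(i)
    }
    close(chIDs)

    // Consumer side (as in loadChunkStream): turn IDs back into readers.
    for id := range chIDs {
        rc, err := loadChunkFile(dir, id)
        if err != nil {
            panic(err)
        }
        data, _ := io.ReadAll(rc)
        rc.Close()
        fmt.Printf("chunk %d: %s\n", id, data)
    }
}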
@@ -62,7 +63,8 @@ const (
 	opPrune   operation = "prune"
 	opRestore operation = "restore"

-	chunkBufferSize = 4
+	chunkBufferSize   = 4
+	chunkIDBufferSize = 1024

 	snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
 )
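A toy illustration of why the two buffers can differ so much in depth (an assumed rationale, not stated in the PR): queued indices cost four bytes each, so `chunkIDBufferSize` can be deep without cost, while each buffered chunk reader holds real data or an open file, so `chunkBufferSize` stays small and applies backpressure.

package main

import "fmt"

func main() {
    ids := make(chan uint32, 1024) // deep buffer: IDs are cheap to queue
    chunks := make(chan []byte, 4) // shallow buffer: bounds chunks held in flight

    for i := uint32(0); i < 8; i++ {
        ids <- i // all eight sends complete without blocking
    }
    close(ids)

    go func() {
        defer close(chunks)
        for id := range ids {
            // This send blocks once four chunks sit unconsumed,
            // throttling the loader goroutine.
            chunks <- []byte{byte(id)}
        }
    }()

    for c := range chunks {
        fmt.Println(c)
    }
}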
@@ -135,7 +137,7 @@ func (m *Manager) endLocked() {
 		m.chRestore = nil
 	}
 	m.chRestoreDone = nil
-	m.restoreChunkHashes = nil
+	m.restoreSnapshot = nil
 	m.restoreChunkIndex = 0
 }

@@ -291,11 +293,18 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
 	}

 	// Start an asynchronous snapshot restoration, passing chunks and completion status via channels.
-	chChunks := make(chan io.ReadCloser, chunkBufferSize)
+	chChunkIDs := make(chan uint32, chunkIDBufferSize)
 	chDone := make(chan restoreDone, 1)

+	dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format)
+	if err := os.MkdirAll(dir, 0o750); err != nil {
+		return errorsmod.Wrapf(err, "failed to create snapshot directory %q", dir)
+	}
+
+	chChunks := m.loadChunkStream(snapshot.Height, snapshot.Format, chChunkIDs)
+
 	go func() {
-		err := m.restoreSnapshot(snapshot, chChunks)
+		err := m.doRestoreSnapshot(snapshot, chChunks)
 		chDone <- restoreDone{
 			complete: err == nil,
 			err:      err,
@@ -303,17 +312,39 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
 		close(chDone)
 	}()

-	m.chRestore = chChunks
+	m.chRestore = chChunkIDs
 	m.chRestoreDone = chDone
-	m.restoreChunkHashes = snapshot.Metadata.ChunkHashes
+	m.restoreSnapshot = &snapshot
 	m.restoreChunkIndex = 0
 	return nil
 }

-// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
-func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
-	var nextItem types.SnapshotItem
+func (m *Manager) loadChunkStream(height uint64, format uint32, chunkIDs <-chan uint32) <-chan io.ReadCloser {
+	chunks := make(chan io.ReadCloser, chunkBufferSize)
+	go func() {
+		defer close(chunks)
+
+		for chunkID := range chunkIDs {
+			chunk, err := m.store.loadChunkFile(height, format, chunkID)
+			if err != nil {
+				m.logger.Error("load chunk file failed", "height", height, "format", format, "chunk", chunkID, "err", err)
+				break
+			}
+			chunks <- chunk
+		}
+	}()
+
+	return chunks
+}
+
+// doRestoreSnapshot does the heavy work of snapshot restoration after preliminary checks on the request have passed.
+func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
+	dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format)
+	if err := os.MkdirAll(dir, 0o750); err != nil {
+		return errorsmod.Wrapf(err, "failed to create snapshot directory %q", dir)
+	}
+
+	var nextItem types.SnapshotItem
 	streamReader, err := NewStreamReader(chChunks)
 	if err != nil {
 		return err
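The loader goroutine in `loadChunkStream` owns its output channel: it defers the close and breaks on the first failed read, so the consumer sees a short stream rather than a hang. A minimal sketch of that shape, with generic names rather than SDK code:

package main

import (
    "errors"
    "fmt"
)

// stream converts IDs into loaded values, ending the stream early on error.
func stream(ids <-chan uint32, load func(uint32) (string, error)) <-chan string {
    out := make(chan string, 4)
    go func() {
        defer close(out) // the consumer always unblocks, even on failure
        for id := range ids {
            v, err := load(id)
            if err != nil {
                // Log-and-break, as loadChunkStream does: downstream sees
                // a short stream and treats the restore as ended prematurely.
                fmt.Println("load failed:", err)
                break
            }
            out <- v
        }
    }()
    return out
}

func main() {
    ids := make(chan uint32, 3)
    for _, id := range []uint32{0, 1, 2} {
        ids <- id
    }
    close(ids)

    load := func(id uint32) (string, error) {
        if id == 2 {
            return "", errors.New("missing chunk file")
        }
        return fmt.Sprintf("chunk-%d", id), nil
    }
    for v := range stream(ids, load) {
        fmt.Println(v)
    }
}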
@@ -375,7 +406,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
 		return false, errorsmod.Wrap(storetypes.ErrLogic, "no restore operation in progress")
 	}

-	if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
+	if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) {
 		return false, errorsmod.Wrap(storetypes.ErrLogic, "received unexpected chunk")
 	}

@@ -392,19 +423,30 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {

 	// Verify the chunk hash.
 	hash := sha256.Sum256(chunk)
-	expected := m.restoreChunkHashes[m.restoreChunkIndex]
+	expected := m.restoreSnapshot.Metadata.ChunkHashes[m.restoreChunkIndex]
 	if !bytes.Equal(hash[:], expected) {
 		return false, errorsmod.Wrapf(types.ErrChunkHashMismatch,
 			"expected %x, got %x", hash, expected)
 	}

+	if err := m.store.saveChunkContent(chunk, m.restoreChunkIndex, m.restoreSnapshot); err != nil {
+		return false, errorsmod.Wrapf(err, "save chunk content %d", m.restoreChunkIndex)
+	}
+
 	// Pass the chunk to the restore, and wait for completion if it was the final one.
-	m.chRestore <- io.NopCloser(bytes.NewReader(chunk))
+	m.chRestore <- m.restoreChunkIndex
 	m.restoreChunkIndex++

-	if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
+	if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) {
 		close(m.chRestore)
 		m.chRestore = nil

+		// The chunks are all written into files; we can save the snapshot to the db
+		// even if the restoration is not completed yet.
+		if err := m.store.saveSnapshot(m.restoreSnapshot); err != nil {
+			return false, errorsmod.Wrap(err, "save restoring snapshot")
+		}
+
 		done := <-m.chRestoreDone
 		m.endLocked()
 		if done.err != nil {
@@ -413,6 +455,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
 		if !done.complete {
 			return false, errorsmod.Wrap(storetypes.ErrLogic, "restore ended prematurely")
 		}
+
 		return true, nil
 	}
 	return false, nil
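For reference, the per-chunk integrity check above reduces to a SHA-256 comparison against the digest carried in the snapshot metadata, performed before the chunk is persisted. A minimal standalone version (illustrative, not the SDK function):

package main

import (
    "bytes"
    "crypto/sha256"
    "fmt"
)

// verifyChunk checks a chunk against its expected SHA-256 digest,
// mirroring the check in RestoreChunk.
func verifyChunk(chunk, expected []byte) error {
    hash := sha256.Sum256(chunk)
    if !bytes.Equal(hash[:], expected) {
        return fmt.Errorf("chunk hash mismatch: expected %x, got %x", expected, hash)
    }
    return nil
}

func main() {
    chunk := []byte("chunk payload")
    want := sha256.Sum256(chunk)

    fmt.Println(verifyChunk(chunk, want[:]))                          // <nil>
    fmt.Println(verifyChunk(chunk, make([]byte, sha256.Size)) != nil) // true: wrong digest rejected
}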
@@ -438,7 +481,7 @@ func (m *Manager) RestoreLocalSnapshot(height uint64, format uint32) error {
 	}
 	defer m.endLocked()

-	return m.restoreSnapshot(*snapshot, ch)
+	return m.doRestoreSnapshot(*snapshot, ch)
 }

 // sortedExtensionNames sort extension names for deterministic iteration.
@@ -213,6 +213,13 @@ func TestManager_Restore(t *testing.T) {
 	assert.Equal(t, expectItems, target.items)
 	assert.Equal(t, 10, len(extSnapshotter.state))

+	// The snapshot is saved in the local snapshot store.
+	snapshots, err := store.List()
+	require.NoError(t, err)
+	snapshot := snapshots[0]
+	require.Equal(t, uint64(3), snapshot.Height)
+	require.Equal(t, types.CurrentFormat, snapshot.Format)
+
 	// Starting a new restore should fail now, because the target already has contents.
 	err = manager.Restore(types.Snapshot{
 		Height: 3,
@@ -315,6 +315,12 @@ func (s *Store) saveChunk(chunkBody io.ReadCloser, index uint32, snapshot *types.Snapshot) error {
 	return nil
 }

+// saveChunkContent saves the chunk to disk.
+func (s *Store) saveChunkContent(chunk []byte, index uint32, snapshot *types.Snapshot) error {
+	path := s.PathChunk(snapshot.Height, snapshot.Format, index)
+	return os.WriteFile(path, chunk, 0o600)
+}
+
 // saveSnapshot saves snapshot metadata to the database.
 func (s *Store) saveSnapshot(snapshot *types.Snapshot) error {
 	value, err := proto.Marshal(snapshot)