refactor(store): extract (*Store).saveChunk method (#15144)
This commit is contained in:
parent
747c29e8fa
commit
caf94106cb
@ -3,6 +3,7 @@ package snapshots
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"hash"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
@ -256,37 +257,27 @@ func (s *Store) Save(
|
||||
Height: height,
|
||||
Format: format,
|
||||
}
|
||||
|
||||
dirCreated := false
|
||||
index := uint32(0)
|
||||
snapshotHasher := sha256.New()
|
||||
chunkHasher := sha256.New()
|
||||
for chunkBody := range chunks {
|
||||
defer chunkBody.Close() //nolint:staticcheck
|
||||
dir := s.pathSnapshot(height, format)
|
||||
err = os.MkdirAll(dir, 0o755)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create snapshot directory %q", dir)
|
||||
}
|
||||
path := s.pathChunk(height, format, index)
|
||||
file, err := os.Create(path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create snapshot chunk file %q", path)
|
||||
}
|
||||
defer file.Close() //nolint:staticcheck
|
||||
// Only create the snapshot directory on encountering the first chunk.
|
||||
// If the directory disappears during chunk saving,
|
||||
// the whole operation will fail anyway.
|
||||
if !dirCreated {
|
||||
dir := s.pathSnapshot(height, format)
|
||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create snapshot directory %q", dir)
|
||||
}
|
||||
|
||||
chunkHasher.Reset()
|
||||
_, err = io.Copy(io.MultiWriter(file, chunkHasher, snapshotHasher), chunkBody)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to generate snapshot chunk %v", index)
|
||||
dirCreated = true
|
||||
}
|
||||
err = file.Close()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to close snapshot chunk %v", index)
|
||||
|
||||
if err := s.saveChunk(chunkBody, index, snapshot, chunkHasher, snapshotHasher); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = chunkBody.Close()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to close snapshot chunk %v", index)
|
||||
}
|
||||
snapshot.Metadata.ChunkHashes = append(snapshot.Metadata.ChunkHashes, chunkHasher.Sum(nil))
|
||||
index++
|
||||
}
|
||||
snapshot.Chunks = index
|
||||
@ -294,6 +285,36 @@ func (s *Store) Save(
|
||||
return snapshot, s.saveSnapshot(snapshot)
|
||||
}
|
||||
|
||||
// saveChunk saves the given chunkBody with the given index to its appropriate path on disk.
|
||||
// The hash of the chunk is appended to the snapshot's metadata,
|
||||
// and the overall snapshot hash is updated with the chunk content too.
|
||||
func (s *Store) saveChunk(chunkBody io.ReadCloser, index uint32, snapshot *types.Snapshot, chunkHasher, snapshotHasher hash.Hash) error {
|
||||
defer chunkBody.Close()
|
||||
|
||||
path := s.pathChunk(snapshot.Height, snapshot.Format, index)
|
||||
chunkFile, err := os.Create(path)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to create snapshot chunk file %q", path)
|
||||
}
|
||||
defer chunkFile.Close()
|
||||
|
||||
chunkHasher.Reset()
|
||||
if _, err := io.Copy(io.MultiWriter(chunkFile, chunkHasher, snapshotHasher), chunkBody); err != nil {
|
||||
return errors.Wrapf(err, "failed to generate snapshot chunk %d", index)
|
||||
}
|
||||
|
||||
if err := chunkFile.Close(); err != nil {
|
||||
return errors.Wrapf(err, "failed to close snapshot chunk file %d", index)
|
||||
}
|
||||
|
||||
if err := chunkBody.Close(); err != nil {
|
||||
return errors.Wrapf(err, "failed to close snapshot chunk body %d", index)
|
||||
}
|
||||
|
||||
snapshot.Metadata.ChunkHashes = append(snapshot.Metadata.ChunkHashes, chunkHasher.Sum(nil))
|
||||
return nil
|
||||
}
|
||||
|
||||
// saveSnapshot saves snapshot metadata to the database.
|
||||
func (s *Store) saveSnapshot(snapshot *types.Snapshot) error {
|
||||
value, err := proto.Marshal(snapshot)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user