## Description Closes: #7340 - Support registering multiple snapshotters in snapshot manager. - Append the extension snapshotters to existing snapshot stream. ~TODO: testing.~ - existing tests are fixed --- ### Author Checklist *All items are required. Please add a note to the item if the item is not applicable and please add links to any relevant follow up issues.* I have... - [ ] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title - [ ] added `!` to the type prefix if API or client breaking change - [ ] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting)) - [ ] provided a link to the relevant issue or specification - [ ] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules) - [ ] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing) - [ ] added a changelog entry to `CHANGELOG.md` - [ ] included comments for [documenting Go code](https://blog.golang.org/godoc) - [ ] updated the relevant documentation or specification - [ ] reviewed "Files changed" and left comments if necessary - [ ] confirmed all CI checks have passed ### Reviewers Checklist *All items are required. Please add a note if the item is not applicable and please add your handle next to the items reviewed if you only reviewed selected items.* I have... - [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title - [ ] confirmed `!` in the type prefix if API or client breaking change - [ ] confirmed all author checklist items have been addressed - [ ] reviewed state machine logic - [ ] reviewed API design and naming - [ ] reviewed documentation is accurate - [ ] reviewed tests and test coverage - [ ] manually tested (if applicable)
110 lines
3.1 KiB
Go
110 lines
3.1 KiB
Go
package snapshots
|
|
|
|
import (
|
|
"bufio"
|
|
"compress/zlib"
|
|
"io"
|
|
|
|
protoio "github.com/gogo/protobuf/io"
|
|
"github.com/gogo/protobuf/proto"
|
|
|
|
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
|
|
)
|
|
|
|
// Snapshot stream tuning parameters. These values are part of the snapshot
// format: changing any of them requires a new snapshot format, because they
// must be uniform across nodes.
const (
	// snapshotChunkSize is the chunk size handed to NewChunkWriter
	// (10e6, presumably bytes - confirm against ChunkWriter).
	snapshotChunkSize uint64 = 10e6

	// snapshotBufferSize is the size of the bufio buffer placed in front of
	// the chunk writer; it is the chunk size expressed as an int.
	snapshotBufferSize = int(snapshotChunkSize)

	// snapshotCompressionLevel is the level passed to zlib.NewWriterLevel.
	snapshotCompressionLevel = 7
)
|
|
|
|
// StreamWriter set up a stream pipeline to serialize snapshot nodes:
|
|
// Exported Items -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser
|
|
type StreamWriter struct {
|
|
chunkWriter *ChunkWriter
|
|
bufWriter *bufio.Writer
|
|
zWriter *zlib.Writer
|
|
protoWriter protoio.WriteCloser
|
|
}
|
|
|
|
// NewStreamWriter set up a stream pipeline to serialize snapshot DB records.
|
|
func NewStreamWriter(ch chan<- io.ReadCloser) *StreamWriter {
|
|
chunkWriter := NewChunkWriter(ch, snapshotChunkSize)
|
|
bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize)
|
|
zWriter, err := zlib.NewWriterLevel(bufWriter, snapshotCompressionLevel)
|
|
if err != nil {
|
|
chunkWriter.CloseWithError(sdkerrors.Wrap(err, "zlib failure"))
|
|
return nil
|
|
}
|
|
protoWriter := protoio.NewDelimitedWriter(zWriter)
|
|
return &StreamWriter{
|
|
chunkWriter: chunkWriter,
|
|
bufWriter: bufWriter,
|
|
zWriter: zWriter,
|
|
protoWriter: protoWriter,
|
|
}
|
|
}
|
|
|
|
// WriteMsg implements protoio.Write interface
|
|
func (sw *StreamWriter) WriteMsg(msg proto.Message) error {
|
|
return sw.protoWriter.WriteMsg(msg)
|
|
}
|
|
|
|
// Close implements io.Closer interface
|
|
func (sw *StreamWriter) Close() error {
|
|
if err := sw.protoWriter.Close(); err != nil {
|
|
sw.chunkWriter.CloseWithError(err)
|
|
return err
|
|
}
|
|
if err := sw.zWriter.Close(); err != nil {
|
|
sw.chunkWriter.CloseWithError(err)
|
|
return err
|
|
}
|
|
if err := sw.bufWriter.Flush(); err != nil {
|
|
sw.chunkWriter.CloseWithError(err)
|
|
return err
|
|
}
|
|
return sw.chunkWriter.Close()
|
|
}
|
|
|
|
// CloseWithError pass error to chunkWriter
|
|
func (sw *StreamWriter) CloseWithError(err error) {
|
|
sw.chunkWriter.CloseWithError(err)
|
|
}
|
|
|
|
// StreamReader set up a restore stream pipeline
|
|
// chan io.ReadCloser -> chunkReader -> zlib -> delimited Protobuf -> ExportNode
|
|
type StreamReader struct {
|
|
chunkReader *ChunkReader
|
|
zReader io.ReadCloser
|
|
protoReader protoio.ReadCloser
|
|
}
|
|
|
|
// NewStreamReader set up a restore stream pipeline.
|
|
func NewStreamReader(chunks <-chan io.ReadCloser) (*StreamReader, error) {
|
|
chunkReader := NewChunkReader(chunks)
|
|
zReader, err := zlib.NewReader(chunkReader)
|
|
if err != nil {
|
|
return nil, sdkerrors.Wrap(err, "zlib failure")
|
|
}
|
|
protoReader := protoio.NewDelimitedReader(zReader, snapshotMaxItemSize)
|
|
return &StreamReader{
|
|
chunkReader: chunkReader,
|
|
zReader: zReader,
|
|
protoReader: protoReader,
|
|
}, nil
|
|
}
|
|
|
|
// ReadMsg implements protoio.Reader interface
|
|
func (sr *StreamReader) ReadMsg(msg proto.Message) error {
|
|
return sr.protoReader.ReadMsg(msg)
|
|
}
|
|
|
|
// Close implements io.Closer interface
|
|
func (sr *StreamReader) Close() error {
|
|
sr.protoReader.Close()
|
|
sr.zReader.Close()
|
|
return sr.chunkReader.Close()
|
|
}
|