cosmos-sdk/snapshots/stream.go
Julien Robert 1fe59eb22a
ci: improve error checking (errcheck linter) (#11195)
## Description

Implements part of #7258

Check some of currently unchecked errors.

- [x] baseapp
- [x] client
- [x] codec
- [x] crypto
- [x] server
- [x] simapp
- [ ] snapshots
- [ ] store
- [x] testutil
- [ ] types
- [ ] modules

---

### Author Checklist

*All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.*

I have...

- [ ] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] added `!` to the type prefix if API or client breaking change
- [ ] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting))
- [ ] provided a link to the relevant issue or specification
- [ ] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules)
- [ ] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing)
- [ ] added a changelog entry to `CHANGELOG.md`
- [ ] included comments for [documenting Go code](https://blog.golang.org/godoc)
- [ ] updated the relevant documentation or specification
- [ ] reviewed "Files changed" and left comments if necessary
- [ ] confirmed all CI checks have passed

### Reviewers Checklist

*All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.*

I have...

- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] confirmed `!` in the type prefix if API or client breaking change
- [ ] confirmed all author checklist items have been addressed 
- [ ] reviewed state machine logic
- [ ] reviewed API design and naming
- [ ] reviewed documentation is accurate
- [ ] reviewed tests and test coverage
- [ ] manually tested (if applicable)
2022-04-13 06:46:51 +00:00

118 lines
3.2 KiB
Go

package snapshots
import (
"bufio"
"compress/zlib"
"io"
protoio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)
const (
// Do not change chunk size without new snapshot format (must be uniform across nodes)
snapshotChunkSize = uint64(10e6)
snapshotBufferSize = int(snapshotChunkSize)
// Do not change compression level without new snapshot format (must be uniform across nodes)
snapshotCompressionLevel = 7
)
// StreamWriter set up a stream pipeline to serialize snapshot nodes:
// Exported Items -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser
type StreamWriter struct {
chunkWriter *ChunkWriter
bufWriter *bufio.Writer
zWriter *zlib.Writer
protoWriter protoio.WriteCloser
}
// NewStreamWriter set up a stream pipeline to serialize snapshot DB records.
func NewStreamWriter(ch chan<- io.ReadCloser) *StreamWriter {
chunkWriter := NewChunkWriter(ch, snapshotChunkSize)
bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize)
zWriter, err := zlib.NewWriterLevel(bufWriter, snapshotCompressionLevel)
if err != nil {
chunkWriter.CloseWithError(sdkerrors.Wrap(err, "zlib failure"))
return nil
}
protoWriter := protoio.NewDelimitedWriter(zWriter)
return &StreamWriter{
chunkWriter: chunkWriter,
bufWriter: bufWriter,
zWriter: zWriter,
protoWriter: protoWriter,
}
}
// WriteMsg implements protoio.Write interface
func (sw *StreamWriter) WriteMsg(msg proto.Message) error {
return sw.protoWriter.WriteMsg(msg)
}
// Close implements io.Closer interface
func (sw *StreamWriter) Close() error {
if err := sw.protoWriter.Close(); err != nil {
sw.chunkWriter.CloseWithError(err)
return err
}
if err := sw.zWriter.Close(); err != nil {
sw.chunkWriter.CloseWithError(err)
return err
}
if err := sw.bufWriter.Flush(); err != nil {
sw.chunkWriter.CloseWithError(err)
return err
}
return sw.chunkWriter.Close()
}
// CloseWithError pass error to chunkWriter
func (sw *StreamWriter) CloseWithError(err error) {
sw.chunkWriter.CloseWithError(err)
}
// StreamReader set up a restore stream pipeline
// chan io.ReadCloser -> chunkReader -> zlib -> delimited Protobuf -> ExportNode
type StreamReader struct {
chunkReader *ChunkReader
zReader io.ReadCloser
protoReader protoio.ReadCloser
}
// NewStreamReader set up a restore stream pipeline.
func NewStreamReader(chunks <-chan io.ReadCloser) (*StreamReader, error) {
chunkReader := NewChunkReader(chunks)
zReader, err := zlib.NewReader(chunkReader)
if err != nil {
return nil, sdkerrors.Wrap(err, "zlib failure")
}
protoReader := protoio.NewDelimitedReader(zReader, snapshotMaxItemSize)
return &StreamReader{
chunkReader: chunkReader,
zReader: zReader,
protoReader: protoReader,
}, nil
}
// ReadMsg implements protoio.Reader interface
func (sr *StreamReader) ReadMsg(msg proto.Message) error {
return sr.protoReader.ReadMsg(msg)
}
// Close implements io.Closer interface
func (sr *StreamReader) Close() error {
var err error
if err1 := sr.protoReader.Close(); err1 != nil {
err = err1
}
if err2 := sr.zReader.Close(); err2 != nil {
err = err2
}
if err3 := sr.chunkReader.Close(); err3 != nil {
err = err3
}
return err
}