feat: Make extension snapshotter interface safer to use (#11825)

* Make extension snapshotter interface safer to use

Closes: #11824
Solution:
- Use new methods `SnapshotExtension`/`RestoreExtension` to handle payload stream specifically.
- Improve unit tests.

* update changelog

* Update snapshots/types/util.go

* changelog

* go linter

* Update CHANGELOG.md

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
This commit is contained in:
yihuang 2022-08-18 15:33:55 +08:00 committed by GitHub
parent 0ed7360921
commit e397434d9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 152 additions and 30 deletions

View File

@ -105,6 +105,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
A SendEnabled query has been added to both GRPC and CLI.
* (appModule) Remove `Route`, `QuerierRoute` and `LegacyQuerierHandler` from AppModule Interface.
* (x/modules) Remove all LegacyQueries and related code from modules
* (store) [#11825](https://github.com/cosmos/cosmos-sdk/pull/11825) Make extension snapshotter interface safer to use, renamed the util function `WriteExtensionItem` to `WriteExtensionPayload`.
### CLI Breaking Changes

View File

@ -3,10 +3,11 @@
## Changelog
- Jan 19, 2022: Initial Draft
- Apr 29, 2022: Safer extension snapshotter interface
## Status
Draft, Under Implementation
Implemented
## Abstract
@ -107,11 +108,16 @@ func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) e
On top of the existing `Snapshotter` interface for the `multistore`, we add `ExtensionSnapshotter` interface for the extension snapshotters. Three more function signatures: `SnapshotFormat()`, `SupportedFormats()` and `SnapshotName()` are added to `ExtensionSnapshotter`.
```go
// ExtensionPayloadReader read extension payloads,
// it returns io.EOF when reached either end of stream or the extension boundaries.
type ExtensionPayloadReader = func() ([]byte, error)
// ExtensionPayloadWriter is a helper to write extension payloads to underlying stream.
type ExtensionPayloadWriter = func([]byte) error
// ExtensionSnapshotter is an extension Snapshotter that is appended to the snapshot stream.
// ExtensionSnapshotter has an unique name and manages it's own internal formats.
type ExtensionSnapshotter interface {
Snapshotter
// SnapshotName returns the name of snapshotter, it should be unique in the manager.
SnapshotName() string
@ -120,6 +126,14 @@ type ExtensionSnapshotter interface {
// SupportedFormats returns a list of formats it can restore from.
SupportedFormats() []uint32
// SnapshotExtension writes extension payloads into the underlying protobuf stream.
SnapshotExtension(height uint64, payloadWriter ExtensionPayloadWriter) error
// RestoreExtension restores an extension state snapshot,
// the payload reader returns `io.EOF` when reached the extension boundaries.
RestoreExtension(height uint64, format uint32, payloadReader ExtensionPayloadReader) error
}
```

View File

@ -18,6 +18,7 @@ import (
"github.com/cosmos/cosmos-sdk/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/testutil"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)
@ -62,7 +63,7 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte {
}
// snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks.
func snapshotItems(items [][]byte) [][]byte {
func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]byte {
// copy the same parameters from the code
snapshotChunkSize := uint64(10e6)
snapshotBufferSize := int(snapshotChunkSize)
@ -74,8 +75,20 @@ func snapshotItems(items [][]byte) [][]byte {
zWriter, _ := zlib.NewWriterLevel(bufWriter, 7)
protoWriter := protoio.NewDelimitedWriter(zWriter)
for _, item := range items {
_ = snapshottypes.WriteExtensionItem(protoWriter, item)
_ = snapshottypes.WriteExtensionPayload(protoWriter, item)
}
// write extension metadata
_ = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_Extension{
Extension: &snapshottypes.SnapshotExtensionMeta{
Name: ext.SnapshotName(),
Format: ext.SnapshotFormat(),
},
},
})
_ = ext.SnapshotExtension(0, func(payload []byte) error {
return snapshottypes.WriteExtensionPayload(protoWriter, payload)
})
_ = protoWriter.Close()
_ = zWriter.Close()
_ = bufWriter.Flush()
@ -110,10 +123,11 @@ func (m *mockSnapshotter) Restore(
return snapshottypes.SnapshotItem{}, errors.New("already has contents")
}
var item snapshottypes.SnapshotItem
m.items = [][]byte{}
for {
item := &snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(item)
item.Reset()
err := protoReader.ReadMsg(&item)
if err == io.EOF {
break
} else if err != nil {
@ -121,17 +135,17 @@ func (m *mockSnapshotter) Restore(
}
payload := item.GetExtensionPayload()
if payload == nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
break
}
m.items = append(m.items, payload.Payload)
}
return snapshottypes.SnapshotItem{}, nil
return item, nil
}
func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
for _, item := range m.items {
if err := snapshottypes.WriteExtensionItem(protoWriter, item); err != nil {
if err := snapshottypes.WriteExtensionPayload(protoWriter, item); err != nil {
return err
}
}
@ -216,3 +230,52 @@ func (m *hungSnapshotter) Restore(
) (snapshottypes.SnapshotItem, error) {
panic("not implemented")
}
type extSnapshotter struct {
state []uint64
}
func newExtSnapshotter(count int) *extSnapshotter {
state := make([]uint64, 0, count)
for i := 0; i < count; i++ {
state = append(state, uint64(i))
}
return &extSnapshotter{
state,
}
}
func (s *extSnapshotter) SnapshotName() string {
return "mock"
}
func (s *extSnapshotter) SnapshotFormat() uint32 {
return 1
}
func (s *extSnapshotter) SupportedFormats() []uint32 {
return []uint32{1}
}
func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshottypes.ExtensionPayloadWriter) error {
for _, i := range s.state {
if err := payloadWriter(sdk.Uint64ToBigEndian(uint64(i))); err != nil {
return err
}
}
return nil
}
func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshottypes.ExtensionPayloadReader) error {
for {
payload, err := payloadReader()
if err == io.EOF {
break
} else if err != nil {
return err
}
s.state = append(s.state, sdk.BigEndianToUint64(payload))
}
// finalize restoration
return nil
}

View File

@ -84,6 +84,9 @@ func NewManager(store *Store, opts types.SnapshotOptions, multistore types.Snaps
// RegisterExtensions register extension snapshotters to manager
func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) error {
if m.extensions == nil {
m.extensions = make(map[string]types.ExtensionSnapshotter, len(extensions))
}
for _, extension := range extensions {
name := extension.SnapshotName()
if _, ok := m.extensions[name]; ok {
@ -215,7 +218,10 @@ func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) {
streamWriter.CloseWithError(err)
return
}
if err := extension.Snapshot(height, streamWriter); err != nil {
payloadWriter := func(payload []byte) error {
return types.WriteExtensionPayload(streamWriter, payload)
}
if err := extension.SnapshotExtension(height, payloadWriter); err != nil {
streamWriter.CloseWithError(err)
return
}
@ -305,24 +311,40 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
var nextItem types.SnapshotItem
streamReader, err := NewStreamReader(chChunks)
if err != nil {
return err
}
defer streamReader.Close()
next, err := m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader)
// payloadReader reads an extension payload for extension snapshotter, it returns `io.EOF` at extension boundaries.
payloadReader := func() ([]byte, error) {
nextItem.Reset()
if err := streamReader.ReadMsg(&nextItem); err != nil {
return nil, err
}
payload := nextItem.GetExtensionPayload()
if payload == nil {
return nil, io.EOF
}
return payload.Payload, nil
}
nextItem, err = m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader)
if err != nil {
return sdkerrors.Wrap(err, "multistore restore")
}
for {
if next.Item == nil {
if nextItem.Item == nil {
// end of stream
break
}
metadata := next.GetExtension()
metadata := nextItem.GetExtension()
if metadata == nil {
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", next.Item)
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", nextItem.Item)
}
extension, ok := m.extensions[metadata.Name]
if !ok {
@ -331,10 +353,14 @@ func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.Re
if !IsFormatSupported(extension, metadata.Format) {
return sdkerrors.Wrapf(types.ErrUnknownFormat, "format %v for extension %s", metadata.Format, metadata.Name)
}
next, err = extension.Restore(snapshot.Height, metadata.Format, streamReader)
if err != nil {
if err := extension.RestoreExtension(snapshot.Height, metadata.Format, payloadReader); err != nil {
return sdkerrors.Wrapf(err, "extension %s restore", metadata.Name)
}
if nextItem.GetExtensionPayload() != nil {
return sdkerrors.Wrapf(err, "extension %s don't exhausted payload stream", metadata.Name)
}
}
return nil
}

View File

@ -68,11 +68,15 @@ func TestManager_Take(t *testing.T) {
items: items,
prunedHeights: make(map[int64]struct{}),
}
expectChunks := snapshotItems(items)
extSnapshotter := newExtSnapshotter(10)
expectChunks := snapshotItems(items, extSnapshotter)
manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger())
err := manager.RegisterExtensions(extSnapshotter)
require.NoError(t, err)
// nil manager should return error
_, err := (*snapshots.Manager)(nil).Create(1)
_, err = (*snapshots.Manager)(nil).Create(1)
require.Error(t, err)
// creating a snapshot at a lower height than the latest should error
@ -91,7 +95,7 @@ func TestManager_Take(t *testing.T) {
Height: 5,
Format: snapshotter.SnapshotFormat(),
Chunks: 1,
Hash: []uint8{0xcd, 0x17, 0x9e, 0x7f, 0x28, 0xb6, 0x82, 0x90, 0xc7, 0x25, 0xf3, 0x42, 0xac, 0x65, 0x73, 0x50, 0xaa, 0xa0, 0x10, 0x5c, 0x40, 0x8c, 0xd5, 0x1, 0xed, 0x82, 0xb5, 0xca, 0x8b, 0xe0, 0x83, 0xa2},
Hash: []uint8{0x89, 0xfa, 0x18, 0xbc, 0x5a, 0xe3, 0xdc, 0x36, 0xa6, 0x95, 0x5, 0x17, 0xf9, 0x2, 0x1a, 0x55, 0x36, 0x16, 0x5d, 0x4b, 0x8b, 0x2b, 0x3d, 0xfd, 0xe, 0x2f, 0xb6, 0x40, 0x6b, 0xc3, 0xbc, 0x23},
Metadata: types.Metadata{
ChunkHashes: checksums(expectChunks),
},
@ -133,7 +137,10 @@ func TestManager_Restore(t *testing.T) {
target := &mockSnapshotter{
prunedHeights: make(map[int64]struct{}),
}
extSnapshotter := newExtSnapshotter(0)
manager := snapshots.NewManager(store, opts, target, nil, log.NewNopLogger())
err := manager.RegisterExtensions(extSnapshotter)
require.NoError(t, err)
expectItems := [][]byte{
{1, 2, 3},
@ -141,10 +148,10 @@ func TestManager_Restore(t *testing.T) {
{7, 8, 9},
}
chunks := snapshotItems(expectItems)
chunks := snapshotItems(expectItems, newExtSnapshotter(10))
// Restore errors on invalid format
err := manager.Restore(types.Snapshot{
err = manager.Restore(types.Snapshot{
Height: 3,
Format: 0,
Hash: []byte{1, 2, 3},
@ -204,6 +211,7 @@ func TestManager_Restore(t *testing.T) {
}
assert.Equal(t, expectItems, target.items)
assert.Equal(t, 10, len(extSnapshotter.state))
// Starting a new restore should fail now, because the target already has contents.
err = manager.Restore(types.Snapshot{

View File

@ -22,17 +22,20 @@ type Snapshotter interface {
// to determine which heights to retain until after the snapshot is complete.
SetSnapshotInterval(snapshotInterval uint64)
// Restore restores a state snapshot, taking snapshot chunk readers as input.
// If the ready channel is non-nil, it returns a ready signal (by being closed) once the
// restorer is ready to accept chunks.
// Restore restores a state snapshot, taking the reader of protobuf message stream as input.
Restore(height uint64, format uint32, protoReader protoio.Reader) (SnapshotItem, error)
}
// ExtensionPayloadReader read extension payloads,
// it returns io.EOF when reached either end of stream or the extension boundaries.
type ExtensionPayloadReader = func() ([]byte, error)
// ExtensionPayloadWriter is a helper to write extension payloads to underlying stream.
type ExtensionPayloadWriter = func([]byte) error
// ExtensionSnapshotter is an extension Snapshotter that is appended to the snapshot stream.
// ExtensionSnapshotter has an unique name and manages it's own internal formats.
type ExtensionSnapshotter interface {
Snapshotter
// SnapshotName returns the name of snapshotter, it should be unique in the manager.
SnapshotName() string
@ -43,4 +46,11 @@ type ExtensionSnapshotter interface {
// SupportedFormats returns a list of formats it can restore from.
SupportedFormats() []uint32
// SnapshotExtension writes extension payloads into the underlying protobuf stream.
SnapshotExtension(height uint64, payloadWriter ExtensionPayloadWriter) error
// RestoreExtension restores an extension state snapshot,
// the payload reader returns `io.EOF` when reached the extension boundaries.
RestoreExtension(height uint64, format uint32, payloadReader ExtensionPayloadReader) error
}

View File

@ -4,12 +4,12 @@ import (
protoio "github.com/gogo/protobuf/io"
)
// WriteExtensionItem writes an item payload for current extension snapshotter.
func WriteExtensionItem(protoWriter protoio.Writer, item []byte) error {
// WriteExtensionPayload writes an extension payload for current extension snapshotter.
func WriteExtensionPayload(protoWriter protoio.Writer, payload []byte) error {
return protoWriter.WriteMsg(&SnapshotItem{
Item: &SnapshotItem_ExtensionPayload{
ExtensionPayload: &SnapshotExtensionPayload{
Payload: item,
Payload: payload,
},
},
})