chore: audit store/streaming/file/service.go (#14234)
This commit is contained in:
parent
611265af76
commit
ae2c762bd3
@ -295,10 +295,10 @@ func DefaultConfig() *Config {
|
||||
Streamers: StreamersConfig{
|
||||
File: FileStreamerConfig{
|
||||
Keys: []string{"*"},
|
||||
WriteDir: "data/file_streamer",
|
||||
WriteDir: "",
|
||||
OutputMetadata: true,
|
||||
StopNodeOnError: true,
|
||||
// NOTICE: the default config don't protect the streamer data integrity
|
||||
// NOTICE: The default config doesn't protect the streamer data integrity
|
||||
// in face of system crash.
|
||||
Fsync: false,
|
||||
},
|
||||
|
||||
@ -212,11 +212,14 @@ streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]
|
||||
keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
|
||||
write_dir = "{{ .Streamers.File.WriteDir }}"
|
||||
prefix = "{{ .Streamers.File.Prefix }}"
|
||||
|
||||
# output-metadata specifies if output the metadata file which includes the abci request/responses
|
||||
# during processing the block.
|
||||
output-metadata = "{{ .Streamers.File.OutputMetadata }}"
|
||||
|
||||
# stop-node-on-error specifies if propagate the file streamer errors to consensus state machine.
|
||||
stop-node-on-error = "{{ .Streamers.File.StopNodeOnError }}"
|
||||
|
||||
# fsync specifies if call fsync after writing the files.
|
||||
fsync = "{{ .Streamers.File.Fsync }}"
|
||||
|
||||
@ -229,7 +232,6 @@ fsync = "{{ .Streamers.File.Fsync }}"
|
||||
# Setting max_txs to negative 1 (-1) will disable transactions from being inserted into the mempool.
|
||||
# Setting max_txs to a positive number (> 0) will limit the number of transactions in the mempool, by the specified amount.
|
||||
max-txs = "{{ .Mempool.MaxTxs }}"
|
||||
|
||||
`
|
||||
|
||||
var configTemplate *template.Template
|
||||
|
||||
@ -10,10 +10,10 @@ import (
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"cosmossdk.io/errors"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
|
||||
"cosmossdk.io/errors"
|
||||
"github.com/cosmos/cosmos-sdk/baseapp"
|
||||
"github.com/cosmos/cosmos-sdk/store/types"
|
||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
||||
@ -21,7 +21,8 @@ import (
|
||||
|
||||
var _ baseapp.StreamingService = &StreamingService{}
|
||||
|
||||
// StreamingService is a concrete implementation of StreamingService that writes state changes out to files
|
||||
// StreamingService is a concrete implementation of StreamingService that writes
|
||||
// state changes out to files.
|
||||
type StreamingService struct {
|
||||
storeListeners []*types.MemoryListener // a series of KVStore listeners for each KVStore
|
||||
filePrefix string // optional prefix for each of the generated files
|
||||
@ -31,37 +32,50 @@ type StreamingService struct {
|
||||
|
||||
currentBlockNumber int64
|
||||
blockMetadata types.BlockMetadata
|
||||
// if write the metadata file, otherwise only data file is outputted.
|
||||
|
||||
// outputMetadata, if true, writes additional metadata to file per block
|
||||
outputMetadata bool
|
||||
// if true, when commit failed it will panic and stop the consensus state machine to ensure the
|
||||
// eventual consistency of the output, otherwise the error is ignored and have the risk of lossing data.
|
||||
|
||||
// stopNodeOnErr, if true, will panic and stop the node during ABCI Commit
|
||||
// to ensure eventual consistency of the output, otherwise, any errors are
|
||||
// logged and ignored which could yield data loss in streamed output.
|
||||
stopNodeOnErr bool
|
||||
// if true, the file.Sync() is called to make sure the data is persisted onto disk, otherwise it risks lossing data when system crash.
|
||||
|
||||
// fsync, if true, will execute file Sync to make sure the data is persisted
|
||||
// onto disk, otherwise there is a risk of data loss during any crash.
|
||||
fsync bool
|
||||
}
|
||||
|
||||
// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys
|
||||
func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c types.Codec, logger log.Logger, outputMetadata bool, stopNodeOnErr bool, fsync bool) (*StreamingService, error) {
|
||||
func NewStreamingService(
|
||||
writeDir, filePrefix string,
|
||||
storeKeys []types.StoreKey,
|
||||
cdc types.Codec,
|
||||
logger log.Logger,
|
||||
outputMetadata, stopNodeOnErr, fsync bool,
|
||||
) (*StreamingService, error) {
|
||||
// sort storeKeys for deterministic output
|
||||
sort.SliceStable(storeKeys, func(i, j int) bool {
|
||||
return storeKeys[i].Name() < storeKeys[j].Name()
|
||||
})
|
||||
|
||||
// NOTE: We use the same listener for each store.
|
||||
listeners := make([]*types.MemoryListener, len(storeKeys))
|
||||
// in this case, we are using the same listener for each Store
|
||||
for i, key := range storeKeys {
|
||||
listeners[i] = types.NewMemoryListener(key)
|
||||
}
|
||||
// check that the writeDir exists and is writable so that we can catch the error here at initialization if it is not
|
||||
// we don't open a dstFile until we receive our first ABCI message
|
||||
|
||||
// Check that the writeDir exists and is writable so that we can catch the
|
||||
// error here at initialization. If it is not we don't open a dstFile until we
|
||||
// receive our first ABCI message.
|
||||
if err := isDirWriteable(writeDir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &StreamingService{
|
||||
storeListeners: listeners,
|
||||
filePrefix: filePrefix,
|
||||
writeDir: writeDir,
|
||||
codec: c,
|
||||
codec: cdc,
|
||||
logger: logger,
|
||||
outputMetadata: outputMetadata,
|
||||
stopNodeOnErr: stopNodeOnErr,
|
||||
@ -69,65 +83,74 @@ func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Listeners satisfies the baseapp.StreamingService interface
|
||||
// It returns the StreamingService's underlying WriteListeners
|
||||
// Use for registering the underlying WriteListeners with the BaseApp
|
||||
// Listeners satisfies the StreamingService interface. It returns the
|
||||
// StreamingService's underlying WriteListeners. Use for registering the
|
||||
// underlying WriteListeners with the BaseApp.
|
||||
func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener {
|
||||
listeners := make(map[types.StoreKey][]types.WriteListener, len(fss.storeListeners))
|
||||
for _, listener := range fss.storeListeners {
|
||||
listeners[listener.StoreKey()] = []types.WriteListener{listener}
|
||||
}
|
||||
|
||||
return listeners
|
||||
}
|
||||
|
||||
// ListenBeginBlock satisfies the baseapp.ABCIListener interface
|
||||
// It writes the received BeginBlock request and response and the resulting state changes
|
||||
// out to a file as described in the above the naming schema
|
||||
func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) (rerr error) {
|
||||
// ListenBeginBlock satisfies the ABCIListener interface. It sets the received
|
||||
// BeginBlock request, response and the current block number. Note, these are
|
||||
// not written to file until ListenCommit is executed and outputMetadata is set,
|
||||
// after which it will be reset again on the next block.
|
||||
func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
|
||||
fss.blockMetadata.RequestBeginBlock = &req
|
||||
fss.blockMetadata.ResponseBeginBlock = &res
|
||||
fss.currentBlockNumber = req.Header.Height
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListenDeliverTx satisfies the baseapp.ABCIListener interface
|
||||
// It writes the received DeliverTx request and response and the resulting state changes
|
||||
// out to a file as described in the above the naming schema
|
||||
func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) (rerr error) {
|
||||
// ListenDeliverTx satisfies the ABCIListener interface. It appends the received
|
||||
// DeliverTx request and response to a list of DeliverTxs objects. Note, these
|
||||
// are not written to file until ListenCommit is executed and outputMetadata is
|
||||
// set, after which it will be reset again on the next block.
|
||||
func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error {
|
||||
fss.blockMetadata.DeliverTxs = append(fss.blockMetadata.DeliverTxs, &types.BlockMetadata_DeliverTx{
|
||||
Request: &req,
|
||||
Response: &res,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListenEndBlock satisfies the baseapp.ABCIListener interface
|
||||
// It writes the received EndBlock request and response and the resulting state changes
|
||||
// out to a file as described in the above the naming schema
|
||||
func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) (rerr error) {
|
||||
// ListenEndBlock satisfies the ABCIListener interface. It sets the received
|
||||
// EndBlock request, response and the current block number. Note, these are
|
||||
// not written to file until ListenCommit is executed and outputMetadata is set,
|
||||
// after which it will be reset again on the next block.
|
||||
func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error {
|
||||
fss.blockMetadata.RequestEndBlock = &req
|
||||
fss.blockMetadata.ResponseEndBlock = &res
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListenEndBlock satisfies the baseapp.ABCIListener interface
|
||||
// ListenCommit satisfies the ABCIListener interface. It is executed during the
|
||||
// ABCI Commit request and is responsible for writing all staged data to files.
|
||||
// It will only return a non-nil error when stopNodeOnErr is set.
|
||||
func (fss *StreamingService) ListenCommit(ctx context.Context, res abci.ResponseCommit) error {
|
||||
err := fss.doListenCommit(ctx, res)
|
||||
if err != nil {
|
||||
fss.logger.Error("Commit listening hook failed", "height", fss.currentBlockNumber, "err", err)
|
||||
if err := fss.doListenCommit(ctx, res); err != nil {
|
||||
fss.logger.Error("Listen commit failed", "height", fss.currentBlockNumber, "err", err)
|
||||
if fss.stopNodeOnErr {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.ResponseCommit) (err error) {
|
||||
fss.blockMetadata.ResponseCommit = &res
|
||||
|
||||
// write to target files, the file size is written at the beginning, which can be used to detect completeness.
|
||||
// Write to target files, the file size is written at the beginning, which can
|
||||
// be used to detect completeness.
|
||||
metaFileName := fmt.Sprintf("block-%d-meta", fss.currentBlockNumber)
|
||||
dataFileName := fmt.Sprintf("block-%d-data", fss.currentBlockNumber)
|
||||
|
||||
if fss.filePrefix != "" {
|
||||
metaFileName = fmt.Sprintf("%s-%s", fss.filePrefix, metaFileName)
|
||||
dataFileName = fmt.Sprintf("%s-%s", fss.filePrefix, dataFileName)
|
||||
@ -138,6 +161,7 @@ func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.Respon
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := writeLengthPrefixedFile(path.Join(fss.writeDir, metaFileName), bz, fss.fsync); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -147,42 +171,44 @@ func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.Respon
|
||||
if err := fss.writeBlockData(&buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return writeLengthPrefixedFile(path.Join(fss.writeDir, dataFileName), buf.Bytes(), fss.fsync)
|
||||
}
|
||||
|
||||
func (fss *StreamingService) writeBlockData(writer io.Writer) error {
|
||||
for _, listener := range fss.storeListeners {
|
||||
cache := listener.PopStateCache()
|
||||
|
||||
for i := range cache {
|
||||
bz, err := fss.codec.MarshalLengthPrefixed(&cache[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = writer.Write(bz); err != nil {
|
||||
|
||||
if _, err := writer.Write(bz); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stream satisfies the baseapp.StreamingService interface
|
||||
func (fss *StreamingService) Stream(wg *sync.WaitGroup) error {
|
||||
return nil
|
||||
}
|
||||
// Stream satisfies the StreamingService interface. It performs a no-op.
|
||||
func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { return nil }
|
||||
|
||||
// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface
|
||||
func (fss *StreamingService) Close() error {
|
||||
return nil
|
||||
}
|
||||
// Close satisfies the StreamingService interface. It performs a no-op.
|
||||
func (fss *StreamingService) Close() error { return nil }
|
||||
|
||||
// isDirWriteable checks if dir is writable by writing and removing a file
|
||||
// to dir. It returns nil if dir is writable.
|
||||
// to dir. It returns nil if dir is writable. We have to do this as there is no
|
||||
// platform-independent way of determining if a directory is writeable.
|
||||
func isDirWriteable(dir string) error {
|
||||
f := path.Join(dir, ".touch")
|
||||
if err := os.WriteFile(f, []byte(""), 0o600); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return os.Remove(f)
|
||||
}
|
||||
|
||||
@ -192,25 +218,30 @@ func writeLengthPrefixedFile(path string, data []byte, fsync bool) (err error) {
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "open file failed: %s", path)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// avoid overriding the real error with file close error
|
||||
if err1 := f.Close(); err1 != nil && err == nil {
|
||||
err = errors.Wrapf(err, "close file failed: %s", path)
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = f.Write(sdk.Uint64ToBigEndian(uint64(len(data))))
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "write length prefix failed: %s", path)
|
||||
}
|
||||
|
||||
_, err = f.Write(data)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "write block data failed: %s", path)
|
||||
}
|
||||
|
||||
if fsync {
|
||||
err = f.Sync()
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "fsync failed: %s", path)
|
||||
}
|
||||
}
|
||||
return
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@ -112,20 +112,23 @@ func TestFileStreamingService(t *testing.T) {
|
||||
if os.Getenv("CI") != "" {
|
||||
t.Skip("Skipping TestFileStreamingService in CI environment")
|
||||
}
|
||||
err := os.Mkdir(testDir, 0o700)
|
||||
require.Nil(t, err)
|
||||
|
||||
require.Nil(t, os.Mkdir(testDir, 0o700))
|
||||
defer os.RemoveAll(testDir)
|
||||
|
||||
testKeys := []types.StoreKey{mockStoreKey1, mockStoreKey2}
|
||||
testStreamingService, err = NewStreamingService(testDir, testPrefix, testKeys, testMarshaller, log.NewNopLogger(), true, false, false)
|
||||
testStreamingService, err := NewStreamingService(testDir, testPrefix, testKeys, testMarshaller, log.NewNopLogger(), true, false, false)
|
||||
require.Nil(t, err)
|
||||
require.IsType(t, &StreamingService{}, testStreamingService)
|
||||
require.Equal(t, testPrefix, testStreamingService.filePrefix)
|
||||
require.Equal(t, testDir, testStreamingService.writeDir)
|
||||
require.Equal(t, testMarshaller, testStreamingService.codec)
|
||||
|
||||
testListener1 = testStreamingService.storeListeners[0]
|
||||
testListener2 = testStreamingService.storeListeners[1]
|
||||
|
||||
wg := new(sync.WaitGroup)
|
||||
|
||||
testStreamingService.Stream(wg)
|
||||
testListenBlock(t)
|
||||
testStreamingService.Close()
|
||||
@ -133,7 +136,10 @@ func TestFileStreamingService(t *testing.T) {
|
||||
}
|
||||
|
||||
func testListenBlock(t *testing.T) {
|
||||
var expectKVPairsStore1, expectKVPairsStore2 [][]byte
|
||||
var (
|
||||
expectKVPairsStore1 [][]byte
|
||||
expectKVPairsStore2 [][]byte
|
||||
)
|
||||
|
||||
// write state changes
|
||||
testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false)
|
||||
@ -148,6 +154,7 @@ func testListenBlock(t *testing.T) {
|
||||
Delete: false,
|
||||
})
|
||||
require.Nil(t, err)
|
||||
|
||||
expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{
|
||||
StoreKey: mockStoreKey2.Name(),
|
||||
Key: mockKey2,
|
||||
@ -155,6 +162,7 @@ func testListenBlock(t *testing.T) {
|
||||
Delete: false,
|
||||
})
|
||||
require.Nil(t, err)
|
||||
|
||||
expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{
|
||||
StoreKey: mockStoreKey1.Name(),
|
||||
Key: mockKey3,
|
||||
@ -162,6 +170,7 @@ func testListenBlock(t *testing.T) {
|
||||
Delete: false,
|
||||
})
|
||||
require.Nil(t, err)
|
||||
|
||||
expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair1, expectedKVPair3)
|
||||
expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair2)
|
||||
|
||||
@ -182,6 +191,7 @@ func testListenBlock(t *testing.T) {
|
||||
Delete: false,
|
||||
})
|
||||
require.Nil(t, err)
|
||||
|
||||
expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{
|
||||
StoreKey: mockStoreKey2.Name(),
|
||||
Key: mockKey2,
|
||||
@ -189,6 +199,7 @@ func testListenBlock(t *testing.T) {
|
||||
Delete: false,
|
||||
})
|
||||
require.Nil(t, err)
|
||||
|
||||
expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{
|
||||
StoreKey: mockStoreKey2.Name(),
|
||||
Key: mockKey3,
|
||||
@ -196,6 +207,7 @@ func testListenBlock(t *testing.T) {
|
||||
Delete: false,
|
||||
})
|
||||
require.Nil(t, err)
|
||||
|
||||
expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair1)
|
||||
expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair2, expectedKVPair3)
|
||||
|
||||
@ -216,6 +228,7 @@ func testListenBlock(t *testing.T) {
|
||||
Delete: false,
|
||||
})
|
||||
require.Nil(t, err)
|
||||
|
||||
expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{
|
||||
StoreKey: mockStoreKey1.Name(),
|
||||
Key: mockKey2,
|
||||
@ -223,6 +236,7 @@ func testListenBlock(t *testing.T) {
|
||||
Delete: false,
|
||||
})
|
||||
require.Nil(t, err)
|
||||
|
||||
expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{
|
||||
StoreKey: mockStoreKey2.Name(),
|
||||
Key: mockKey3,
|
||||
@ -230,6 +244,7 @@ func testListenBlock(t *testing.T) {
|
||||
Delete: false,
|
||||
})
|
||||
require.Nil(t, err)
|
||||
|
||||
expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair2)
|
||||
expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair1, expectedKVPair3)
|
||||
|
||||
@ -250,6 +265,7 @@ func testListenBlock(t *testing.T) {
|
||||
Delete: false,
|
||||
})
|
||||
require.Nil(t, err)
|
||||
|
||||
expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{
|
||||
StoreKey: mockStoreKey1.Name(),
|
||||
Key: mockKey2,
|
||||
@ -257,6 +273,7 @@ func testListenBlock(t *testing.T) {
|
||||
Delete: false,
|
||||
})
|
||||
require.Nil(t, err)
|
||||
|
||||
expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{
|
||||
StoreKey: mockStoreKey2.Name(),
|
||||
Key: mockKey3,
|
||||
@ -311,37 +328,46 @@ func readInFile(name string) ([]byte, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
size := sdk.BigEndianToUint64(bz[:8])
|
||||
if len(bz) != int(size)+8 {
|
||||
return nil, errors.New("incomplete file ")
|
||||
}
|
||||
|
||||
return bz[8:], nil
|
||||
}
|
||||
|
||||
// segmentBytes returns all of the protobuf messages contained in the byte array as an array of byte arrays
|
||||
// The messages have their length prefix removed
|
||||
// segmentBytes returns all of the protobuf messages contained in the byte array
|
||||
// as an array of byte arrays. The messages have their length prefix removed.
|
||||
func segmentBytes(bz []byte) ([][]byte, error) {
|
||||
var err error
|
||||
|
||||
segments := make([][]byte, 0)
|
||||
for len(bz) > 0 {
|
||||
var segment []byte
|
||||
|
||||
segment, bz, err = getHeadSegment(bz)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
segments = append(segments, segment)
|
||||
}
|
||||
|
||||
return segments, nil
|
||||
}
|
||||
|
||||
// getHeadSegment returns the bytes for the leading protobuf object in the byte array (removing the length prefix) and returns the remainder of the byte array
|
||||
// getHeadSegment returns the bytes for the leading protobuf object in the byte
|
||||
// array (removing the length prefix) and returns the remainder of the byte array.
|
||||
func getHeadSegment(bz []byte) ([]byte, []byte, error) {
|
||||
size, prefixSize := binary.Uvarint(bz)
|
||||
if prefixSize < 0 {
|
||||
return nil, nil, fmt.Errorf("invalid number of bytes read from length-prefixed encoding: %d", prefixSize)
|
||||
}
|
||||
|
||||
if size > uint64(len(bz)-prefixSize) {
|
||||
return nil, nil, fmt.Errorf("not enough bytes to read; want: %v, got: %v", size, len(bz)-prefixSize)
|
||||
}
|
||||
|
||||
return bz[prefixSize:(uint64(prefixSize) + size)], bz[uint64(prefixSize)+size:], nil
|
||||
}
|
||||
|
||||
@ -4,7 +4,7 @@ import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// WriteListener interface for streaming data out from a listenkv.Store
|
||||
// WriteListener interface for streaming data out from a KVStore
|
||||
type WriteListener interface {
|
||||
// if value is nil then it was deleted
|
||||
// storeKey indicates the source KVStore, to facilitate using the same WriteListener across separate KVStores
|
||||
@ -12,14 +12,16 @@ type WriteListener interface {
|
||||
OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error
|
||||
}
|
||||
|
||||
// StoreKVPairWriteListener is used to configure listening to a KVStore by writing out length-prefixed
|
||||
// protobuf encoded StoreKVPairs to an underlying io.Writer
|
||||
// StoreKVPairWriteListener is used to configure listening to a KVStore by
|
||||
// writing out length-prefixed Protobuf encoded StoreKVPairs to an underlying
|
||||
// io.Writer object.
|
||||
type StoreKVPairWriteListener struct {
|
||||
writer io.Writer
|
||||
marshaller Codec
|
||||
}
|
||||
|
||||
// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a provdied io.Writer and Marshaler interface
|
||||
// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a
|
||||
// provided io.Writer and codec.BinaryCodec.
|
||||
func NewStoreKVPairWriteListener(w io.Writer, m Codec) *StoreKVPairWriteListener {
|
||||
return &StoreKVPairWriteListener{
|
||||
writer: w,
|
||||
@ -27,20 +29,25 @@ func NewStoreKVPairWriteListener(w io.Writer, m Codec) *StoreKVPairWriteListener
|
||||
}
|
||||
}
|
||||
|
||||
// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs
|
||||
// OnWrite satisfies the WriteListener interface by writing length-prefixed
|
||||
// Protobuf encoded StoreKVPairs.
|
||||
func (wl *StoreKVPairWriteListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error {
|
||||
kvPair := new(StoreKVPair)
|
||||
kvPair.StoreKey = storeKey.Name()
|
||||
kvPair.Delete = delete
|
||||
kvPair.Key = key
|
||||
kvPair.Value = value
|
||||
kvPair := &StoreKVPair{
|
||||
StoreKey: storeKey.Name(),
|
||||
Key: key,
|
||||
Value: value,
|
||||
Delete: delete,
|
||||
}
|
||||
|
||||
by, err := wl.marshaller.MarshalLengthPrefixed(kvPair)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := wl.writer.Write(by); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -55,7 +62,7 @@ func NewMemoryListener(key StoreKey) *MemoryListener {
|
||||
return &MemoryListener{key: key}
|
||||
}
|
||||
|
||||
// OnWrite implements WriteListener interface
|
||||
// OnWrite implements WriteListener interface.
|
||||
func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error {
|
||||
fl.stateCache = append(fl.stateCache, StoreKVPair{
|
||||
StoreKey: storeKey.Name(),
|
||||
@ -63,17 +70,19 @@ func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, d
|
||||
Key: key,
|
||||
Value: value,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PopStateCache returns the current state caches and set to nil
|
||||
// PopStateCache returns the current state caches and set to nil.
|
||||
func (fl *MemoryListener) PopStateCache() []StoreKVPair {
|
||||
res := fl.stateCache
|
||||
fl.stateCache = nil
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
// StoreKey returns the storeKey it listens to
|
||||
// StoreKey returns the storeKey it listens to.
|
||||
func (fl *MemoryListener) StoreKey() StoreKey {
|
||||
return fl.key
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user