cosmos-sdk/store/streaming/file/service.go
Ian Norden 1326fa2a7d
feat: ADR-038 Part 2: StreamingService interface, file writing implementation, and configuration (#8664)
<!-- < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < ☺
v                               ✰  Thanks for creating a PR! ✰
v    Before smashing the submit button please review the checkboxes.
v    If a checkbox is n/a - please still include it but + a little note why
☺ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >  -->

## Description

<!-- Add a description of the changes that this PR introduces and the files that
are the most critical to review.
-->

Hello 👋 this PR introduces the second stage of changes to support [ADR-038](https://github.com/cosmos/cosmos-sdk/pull/8012) state listening. This is rebased on top of the [first segment](https://github.com/cosmos/cosmos-sdk/pull/8551), which introduces the low level changes to the MultiStore and KVStore interfaces and implementations, the new WriteListener types, and the new listen.KVStore type.

In this segment we introduce the StreamingService interface, an implementation that writes out to files, and it's integration and configuration at the BaseApp level.

The idea was to have the first segment reviewed independently first but if people think it is easier/more time efficient to review both at the same time then we could start here.

Thanks!



This work is towards satisfying [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/master/docs/architecture/adr-038-state-listening.md)

---

Before we can merge this PR, please make sure that all the following items have been
checked off. If any of the checklist items are not applicable, please leave them but
write a little note why.

- [x] Targeted PR against correct branch (see [CONTRIBUTING.md](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting))
- [x] Linked to Github issue with discussion and accepted design OR link to spec that describes this work.
- [x] Code follows the [module structure standards](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules/structure.md).
- [x] Wrote unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing)
- [x] Updated relevant documentation (`docs/`) or specification (`x/<module>/spec/`)
- [x] Added relevant `godoc` [comments](https://blog.golang.org/godoc-documenting-go-code).
- [x] Added a relevant changelog entry to the `Unreleased` section in `CHANGELOG.md`
- [x] Re-reviewed `Files changed` in the Github PR explorer
- [x] Review `Codecov Report` in the comment section below once CI passes
2021-10-24 21:37:37 +00:00

280 lines
9.4 KiB
Go

package file
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"sync"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)
var _ baseapp.StreamingService = &StreamingService{}
// StreamingService is a concrete implementation of StreamingService that writes state changes out to files
type StreamingService struct {
listeners map[types.StoreKey][]types.WriteListener // the listeners that will be initialized with BaseApp
srcChan <-chan []byte // the channel that all the WriteListeners write their data out to
filePrefix string // optional prefix for each of the generated files
writeDir string // directory to write files into
codec codec.BinaryCodec // marshaller used for re-marshalling the ABCI messages to write them out to the destination files
stateCache [][]byte // cache the protobuf binary encoded StoreKVPairs in the order they are received
stateCacheLock *sync.Mutex // mutex for the state cache
currentBlockNumber int64 // the current block number
currentTxIndex int64 // the index of the current tx
quitChan chan struct{} // channel to synchronize closure
}
// IntermediateWriter is used so that we do not need to update the underlying io.Writer
// inside the StoreKVPairWriteListener everytime we begin writing to a new file
type IntermediateWriter struct {
outChan chan<- []byte
}
// NewIntermediateWriter create an instance of an intermediateWriter that sends to the provided channel
func NewIntermediateWriter(outChan chan<- []byte) *IntermediateWriter {
return &IntermediateWriter{
outChan: outChan,
}
}
// Write satisfies io.Writer
func (iw *IntermediateWriter) Write(b []byte) (int, error) {
iw.outChan <- b
return len(b), nil
}
// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys
func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec) (*StreamingService, error) {
listenChan := make(chan []byte)
iw := NewIntermediateWriter(listenChan)
listener := types.NewStoreKVPairWriteListener(iw, c)
listeners := make(map[types.StoreKey][]types.WriteListener, len(storeKeys))
// in this case, we are using the same listener for each Store
for _, key := range storeKeys {
listeners[key] = append(listeners[key], listener)
}
// check that the writeDir exists and is writeable so that we can catch the error here at initialization if it is not
// we don't open a dstFile until we receive our first ABCI message
if err := isDirWriteable(writeDir); err != nil {
return nil, err
}
return &StreamingService{
listeners: listeners,
srcChan: listenChan,
filePrefix: filePrefix,
writeDir: writeDir,
codec: c,
stateCache: make([][]byte, 0),
stateCacheLock: new(sync.Mutex),
}, nil
}
// Listeners satisfies the baseapp.StreamingService interface
// It returns the StreamingService's underlying WriteListeners
// Use for registering the underlying WriteListeners with the BaseApp
func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener {
return fss.listeners
}
// ListenBeginBlock satisfies the baseapp.ABCIListener interface
// It writes the received BeginBlock request and response and the resulting state changes
// out to a file as described in the above the naming schema
func (fss *StreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
// generate the new file
dstFile, err := fss.openBeginBlockFile(req)
if err != nil {
return err
}
// write req to file
lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req)
if err != nil {
return err
}
if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil {
return err
}
// write all state changes cached for this stage to file
fss.stateCacheLock.Lock()
for _, stateChange := range fss.stateCache {
if _, err = dstFile.Write(stateChange); err != nil {
fss.stateCache = nil
fss.stateCacheLock.Unlock()
return err
}
}
// reset cache
fss.stateCache = nil
fss.stateCacheLock.Unlock()
// write res to file
lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res)
if err != nil {
return err
}
if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil {
return err
}
// close file
return dstFile.Close()
}
func (fss *StreamingService) openBeginBlockFile(req abci.RequestBeginBlock) (*os.File, error) {
fss.currentBlockNumber = req.GetHeader().Height
fss.currentTxIndex = 0
fileName := fmt.Sprintf("block-%d-begin", fss.currentBlockNumber)
if fss.filePrefix != "" {
fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName)
}
return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0600)
}
// ListenDeliverTx satisfies the baseapp.ABCIListener interface
// It writes the received DeliverTx request and response and the resulting state changes
// out to a file as described in the above the naming schema
func (fss *StreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error {
// generate the new file
dstFile, err := fss.openDeliverTxFile()
if err != nil {
return err
}
// write req to file
lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req)
if err != nil {
return err
}
if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil {
return err
}
// write all state changes cached for this stage to file
fss.stateCacheLock.Lock()
for _, stateChange := range fss.stateCache {
if _, err = dstFile.Write(stateChange); err != nil {
fss.stateCache = nil
fss.stateCacheLock.Unlock()
return err
}
}
// reset cache
fss.stateCache = nil
fss.stateCacheLock.Unlock()
// write res to file
lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res)
if err != nil {
return err
}
if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil {
return err
}
// close file
return dstFile.Close()
}
func (fss *StreamingService) openDeliverTxFile() (*os.File, error) {
fileName := fmt.Sprintf("block-%d-tx-%d", fss.currentBlockNumber, fss.currentTxIndex)
if fss.filePrefix != "" {
fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName)
}
fss.currentTxIndex++
return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0600)
}
// ListenEndBlock satisfies the baseapp.ABCIListener interface
// It writes the received EndBlock request and response and the resulting state changes
// out to a file as described in the above the naming schema
func (fss *StreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error {
// generate the new file
dstFile, err := fss.openEndBlockFile()
if err != nil {
return err
}
// write req to file
lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req)
if err != nil {
return err
}
if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil {
return err
}
// write all state changes cached for this stage to file
fss.stateCacheLock.Lock()
for _, stateChange := range fss.stateCache {
if _, err = dstFile.Write(stateChange); err != nil {
fss.stateCache = nil
fss.stateCacheLock.Unlock()
return err
}
}
// reset cache
fss.stateCache = nil
fss.stateCacheLock.Unlock()
// write res to file
lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res)
if err != nil {
return err
}
if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil {
return err
}
// close file
return dstFile.Close()
}
func (fss *StreamingService) openEndBlockFile() (*os.File, error) {
fileName := fmt.Sprintf("block-%d-end", fss.currentBlockNumber)
if fss.filePrefix != "" {
fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName)
}
return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0600)
}
// Stream satisfies the baseapp.StreamingService interface
// It spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs
// and caches them in the order they were received
// returns an error if it is called twice
func (fss *StreamingService) Stream(wg *sync.WaitGroup) error {
if fss.quitChan != nil {
return errors.New("`Stream` has already been called. The stream needs to be closed before it can be started again")
}
fss.quitChan = make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-fss.quitChan:
fss.quitChan = nil
return
case by := <-fss.srcChan:
fss.stateCacheLock.Lock()
fss.stateCache = append(fss.stateCache, by)
fss.stateCacheLock.Unlock()
}
}
}()
return nil
}
// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface
func (fss *StreamingService) Close() error {
close(fss.quitChan)
return nil
}
// isDirWriteable checks if dir is writable by writing and removing a file
// to dir. It returns nil if dir is writable.
func isDirWriteable(dir string) error {
f := path.Join(dir, ".touch")
if err := ioutil.WriteFile(f, []byte(""), 0600); err != nil {
return err
}
return os.Remove(f)
}