<!-- < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < ☺ v ✰ Thanks for creating a PR! ✰ v Before smashing the submit button please review the checkboxes. v If a checkbox is n/a - please still include it but + a little note why ☺ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > --> ## Description <!-- Add a description of the changes that this PR introduces and the files that are the most critical to review. --> Hello 👋 this PR introduces the second stage of changes to support [ADR-038](https://github.com/cosmos/cosmos-sdk/pull/8012) state listening. This is rebased on top of the [first segment](https://github.com/cosmos/cosmos-sdk/pull/8551), which introduces the low level changes to the MultiStore and KVStore interfaces and implementations, the new WriteListener types, and the new listen.KVStore type. In this segment we introduce the StreamingService interface, an implementation that writes out to files, and it's integration and configuration at the BaseApp level. The idea was to have the first segment reviewed independently first but if people think it is easier/more time efficient to review both at the same time then we could start here. Thanks! This work is towards satisfying [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/master/docs/architecture/adr-038-state-listening.md) --- Before we can merge this PR, please make sure that all the following items have been checked off. If any of the checklist items are not applicable, please leave them but write a little note why. - [x] Targeted PR against correct branch (see [CONTRIBUTING.md](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting)) - [x] Linked to Github issue with discussion and accepted design OR link to spec that describes this work. - [x] Code follows the [module structure standards](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules/structure.md). - [x] Wrote unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing) - [x] Updated relevant documentation (`docs/`) or specification (`x/<module>/spec/`) - [x] Added relevant `godoc` [comments](https://blog.golang.org/godoc-documenting-go-code). - [x] Added a relevant changelog entry to the `Unreleased` section in `CHANGELOG.md` - [x] Re-reviewed `Files changed` in the Github PR explorer - [x] Review `Codecov Report` in the comment section below once CI passes
243 lines
6.6 KiB
Go
243 lines
6.6 KiB
Go
package baseapp
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
|
|
dbm "github.com/tendermint/tm-db"
|
|
|
|
"github.com/cosmos/cosmos-sdk/codec/types"
|
|
"github.com/cosmos/cosmos-sdk/snapshots"
|
|
"github.com/cosmos/cosmos-sdk/store"
|
|
sdk "github.com/cosmos/cosmos-sdk/types"
|
|
"github.com/cosmos/cosmos-sdk/types/tx"
|
|
)
|
|
|
|
// File for storing in-package BaseApp optional functions,
|
|
// for options that need access to non-exported fields of the BaseApp
|
|
|
|
// SetPruning sets a pruning option on the multistore associated with the app
|
|
func SetPruning(opts sdk.PruningOptions) func(*BaseApp) {
|
|
return func(bap *BaseApp) { bap.cms.SetPruning(opts) }
|
|
}
|
|
|
|
// SetMinGasPrices returns an option that sets the minimum gas prices on the app.
|
|
func SetMinGasPrices(gasPricesStr string) func(*BaseApp) {
|
|
gasPrices, err := sdk.ParseDecCoins(gasPricesStr)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("invalid minimum gas prices: %v", err))
|
|
}
|
|
|
|
return func(bap *BaseApp) { bap.setMinGasPrices(gasPrices) }
|
|
}
|
|
|
|
// SetHaltHeight returns a BaseApp option function that sets the halt block height.
|
|
func SetHaltHeight(blockHeight uint64) func(*BaseApp) {
|
|
return func(bap *BaseApp) { bap.setHaltHeight(blockHeight) }
|
|
}
|
|
|
|
// SetHaltTime returns a BaseApp option function that sets the halt block time.
|
|
func SetHaltTime(haltTime uint64) func(*BaseApp) {
|
|
return func(bap *BaseApp) { bap.setHaltTime(haltTime) }
|
|
}
|
|
|
|
// SetMinRetainBlocks returns a BaseApp option function that sets the minimum
|
|
// block retention height value when determining which heights to prune during
|
|
// ABCI Commit.
|
|
func SetMinRetainBlocks(minRetainBlocks uint64) func(*BaseApp) {
|
|
return func(bapp *BaseApp) { bapp.setMinRetainBlocks(minRetainBlocks) }
|
|
}
|
|
|
|
// SetTrace will turn on or off trace flag
|
|
func SetTrace(trace bool) func(*BaseApp) {
|
|
return func(app *BaseApp) { app.setTrace(trace) }
|
|
}
|
|
|
|
// SetIndexEvents provides a BaseApp option function that sets the events to index.
|
|
func SetIndexEvents(ie []string) func(*BaseApp) {
|
|
return func(app *BaseApp) { app.setIndexEvents(ie) }
|
|
}
|
|
|
|
// SetInterBlockCache provides a BaseApp option function that sets the
|
|
// inter-block cache.
|
|
func SetInterBlockCache(cache sdk.MultiStorePersistentCache) func(*BaseApp) {
|
|
return func(app *BaseApp) { app.setInterBlockCache(cache) }
|
|
}
|
|
|
|
// SetSnapshotInterval sets the snapshot interval.
|
|
func SetSnapshotInterval(interval uint64) func(*BaseApp) {
|
|
return func(app *BaseApp) { app.SetSnapshotInterval(interval) }
|
|
}
|
|
|
|
// SetSnapshotKeepRecent sets the recent snapshots to keep.
|
|
func SetSnapshotKeepRecent(keepRecent uint32) func(*BaseApp) {
|
|
return func(app *BaseApp) { app.SetSnapshotKeepRecent(keepRecent) }
|
|
}
|
|
|
|
// SetSnapshotStore sets the snapshot store.
|
|
func SetSnapshotStore(snapshotStore *snapshots.Store) func(*BaseApp) {
|
|
return func(app *BaseApp) { app.SetSnapshotStore(snapshotStore) }
|
|
}
|
|
|
|
func (app *BaseApp) SetName(name string) {
|
|
if app.sealed {
|
|
panic("SetName() on sealed BaseApp")
|
|
}
|
|
|
|
app.name = name
|
|
}
|
|
|
|
// SetParamStore sets a parameter store on the BaseApp.
|
|
func (app *BaseApp) SetParamStore(ps ParamStore) {
|
|
if app.sealed {
|
|
panic("SetParamStore() on sealed BaseApp")
|
|
}
|
|
|
|
app.paramStore = ps
|
|
}
|
|
|
|
// SetVersion sets the application's version string.
|
|
func (app *BaseApp) SetVersion(v string) {
|
|
if app.sealed {
|
|
panic("SetVersion() on sealed BaseApp")
|
|
}
|
|
app.version = v
|
|
}
|
|
|
|
// SetProtocolVersion sets the application's protocol version
|
|
func (app *BaseApp) SetProtocolVersion(v uint64) {
|
|
app.appVersion = v
|
|
}
|
|
|
|
func (app *BaseApp) SetDB(db dbm.DB) {
|
|
if app.sealed {
|
|
panic("SetDB() on sealed BaseApp")
|
|
}
|
|
|
|
app.db = db
|
|
}
|
|
|
|
func (app *BaseApp) SetCMS(cms store.CommitMultiStore) {
|
|
if app.sealed {
|
|
panic("SetEndBlocker() on sealed BaseApp")
|
|
}
|
|
|
|
app.cms = cms
|
|
}
|
|
|
|
func (app *BaseApp) SetInitChainer(initChainer sdk.InitChainer) {
|
|
if app.sealed {
|
|
panic("SetInitChainer() on sealed BaseApp")
|
|
}
|
|
|
|
app.initChainer = initChainer
|
|
}
|
|
|
|
func (app *BaseApp) SetBeginBlocker(beginBlocker sdk.BeginBlocker) {
|
|
if app.sealed {
|
|
panic("SetBeginBlocker() on sealed BaseApp")
|
|
}
|
|
|
|
app.beginBlocker = beginBlocker
|
|
}
|
|
|
|
func (app *BaseApp) SetEndBlocker(endBlocker sdk.EndBlocker) {
|
|
if app.sealed {
|
|
panic("SetEndBlocker() on sealed BaseApp")
|
|
}
|
|
|
|
app.endBlocker = endBlocker
|
|
}
|
|
|
|
func (app *BaseApp) SetTxHandler(txHandler tx.Handler) {
|
|
if app.sealed {
|
|
panic("SetTxHandler() on sealed BaseApp")
|
|
}
|
|
|
|
app.txHandler = txHandler
|
|
}
|
|
|
|
func (app *BaseApp) SetAddrPeerFilter(pf sdk.PeerFilter) {
|
|
if app.sealed {
|
|
panic("SetAddrPeerFilter() on sealed BaseApp")
|
|
}
|
|
|
|
app.addrPeerFilter = pf
|
|
}
|
|
|
|
func (app *BaseApp) SetIDPeerFilter(pf sdk.PeerFilter) {
|
|
if app.sealed {
|
|
panic("SetIDPeerFilter() on sealed BaseApp")
|
|
}
|
|
|
|
app.idPeerFilter = pf
|
|
}
|
|
|
|
func (app *BaseApp) SetFauxMerkleMode() {
|
|
if app.sealed {
|
|
panic("SetFauxMerkleMode() on sealed BaseApp")
|
|
}
|
|
|
|
app.fauxMerkleMode = true
|
|
}
|
|
|
|
// SetCommitMultiStoreTracer sets the store tracer on the BaseApp's underlying
|
|
// CommitMultiStore.
|
|
func (app *BaseApp) SetCommitMultiStoreTracer(w io.Writer) {
|
|
app.cms.SetTracer(w)
|
|
}
|
|
|
|
// SetStoreLoader allows us to customize the rootMultiStore initialization.
|
|
func (app *BaseApp) SetStoreLoader(loader StoreLoader) {
|
|
if app.sealed {
|
|
panic("SetStoreLoader() on sealed BaseApp")
|
|
}
|
|
|
|
app.storeLoader = loader
|
|
}
|
|
|
|
// SetSnapshotStore sets the snapshot store.
|
|
func (app *BaseApp) SetSnapshotStore(snapshotStore *snapshots.Store) {
|
|
if app.sealed {
|
|
panic("SetSnapshotStore() on sealed BaseApp")
|
|
}
|
|
if snapshotStore == nil {
|
|
app.snapshotManager = nil
|
|
return
|
|
}
|
|
app.snapshotManager = snapshots.NewManager(snapshotStore, app.cms)
|
|
}
|
|
|
|
// SetSnapshotInterval sets the snapshot interval.
|
|
func (app *BaseApp) SetSnapshotInterval(snapshotInterval uint64) {
|
|
if app.sealed {
|
|
panic("SetSnapshotInterval() on sealed BaseApp")
|
|
}
|
|
app.snapshotInterval = snapshotInterval
|
|
}
|
|
|
|
// SetSnapshotKeepRecent sets the number of recent snapshots to keep.
|
|
func (app *BaseApp) SetSnapshotKeepRecent(snapshotKeepRecent uint32) {
|
|
if app.sealed {
|
|
panic("SetSnapshotKeepRecent() on sealed BaseApp")
|
|
}
|
|
app.snapshotKeepRecent = snapshotKeepRecent
|
|
}
|
|
|
|
// SetInterfaceRegistry sets the InterfaceRegistry.
|
|
func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) {
|
|
app.interfaceRegistry = registry
|
|
app.grpcQueryRouter.SetInterfaceRegistry(registry)
|
|
}
|
|
|
|
// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
|
|
func (app *BaseApp) SetStreamingService(s StreamingService) {
|
|
// add the listeners for each StoreKey
|
|
for key, lis := range s.Listeners() {
|
|
app.cms.AddListeners(key, lis)
|
|
}
|
|
// register the StreamingService within the BaseApp
|
|
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
|
|
app.abciListeners = append(app.abciListeners, s)
|
|
}
|