feat(server/v2/stf): delayed marshalling of typed event (#22684)
This commit is contained in:
parent
330954e884
commit
7fa2356c07
@ -62,9 +62,9 @@ type consensus[T transaction.Tx] struct {
|
||||
streamingManager streaming.Manager
|
||||
mempool mempool.Mempool[T]
|
||||
|
||||
cfg Config
|
||||
chainID string
|
||||
indexedEvents map[string]struct{}
|
||||
cfg Config
|
||||
chainID string
|
||||
indexedABCIEvents map[string]struct{}
|
||||
|
||||
initialHeight uint64
|
||||
// this is only available after this node has committed a block (in FinalizeBlock),
|
||||
@ -105,9 +105,16 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
|
||||
return nil, err
|
||||
}
|
||||
|
||||
events, err := intoABCIEvents(resp.Events, c.indexedEvents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
events := make([]abci.Event, 0)
|
||||
if !c.cfg.AppTomlConfig.DisableABCIEvents {
|
||||
events, err = intoABCIEvents(
|
||||
resp.Events,
|
||||
c.indexedABCIEvents,
|
||||
c.cfg.AppTomlConfig.DisableIndexABCIEvents,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
cometResp := &abciproto.CheckTxResponse{
|
||||
@ -116,6 +123,7 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
|
||||
GasUsed: uint64ToInt64(resp.GasUsed),
|
||||
Events: events,
|
||||
}
|
||||
|
||||
if resp.Error != nil {
|
||||
space, code, log := errorsmod.ABCIInfo(resp.Error, c.cfg.AppTomlConfig.Trace)
|
||||
cometResp.Code = code
|
||||
@ -557,7 +565,13 @@ func (c *consensus[T]) FinalizeBlock(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace)
|
||||
return finalizeBlockResponse(
|
||||
resp,
|
||||
cp,
|
||||
appHash,
|
||||
c.indexedABCIEvents,
|
||||
c.cfg.AppTomlConfig,
|
||||
)
|
||||
}
|
||||
|
||||
func (c *consensus[T]) internalFinalizeBlock(
|
||||
|
||||
@ -16,7 +16,6 @@ type Config struct {
|
||||
func DefaultAppTomlConfig() *AppTomlConfig {
|
||||
return &AppTomlConfig{
|
||||
MinRetainBlocks: 0,
|
||||
IndexEvents: make([]string, 0),
|
||||
HaltHeight: 0,
|
||||
HaltTime: 0,
|
||||
Address: "tcp://127.0.0.1:26658",
|
||||
@ -28,22 +27,27 @@ func DefaultAppTomlConfig() *AppTomlConfig {
|
||||
Target: make(map[string]indexer.Config),
|
||||
ChannelBufferSize: 1024,
|
||||
},
|
||||
IndexABCIEvents: make([]string, 0),
|
||||
DisableIndexABCIEvents: false,
|
||||
DisableABCIEvents: false,
|
||||
}
|
||||
}
|
||||
|
||||
type AppTomlConfig struct {
|
||||
MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
|
||||
IndexEvents []string `mapstructure:"index-events" toml:"index-events" comment:"index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
|
||||
HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
|
||||
HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
|
||||
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
|
||||
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
|
||||
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
|
||||
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`
|
||||
MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
|
||||
HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
|
||||
HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
|
||||
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
|
||||
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
|
||||
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
|
||||
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`
|
||||
|
||||
// Sub configs
|
||||
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
|
||||
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
|
||||
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
|
||||
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
|
||||
IndexABCIEvents []string `mapstructure:"index-abci-events" toml:"index-abci-events" comment:"index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
|
||||
DisableIndexABCIEvents bool `mapstructure:"disable-index-abci-events" toml:"disable-index-abci-events" comment:"disable-index-abci-events disables the ABCI event indexing done by CometBFT. Useful when relying on the SDK indexer for event indexing, but still want events to be included in FinalizeBlockResponse."`
|
||||
DisableABCIEvents bool `mapstructure:"disable-abci-events" toml:"disable-abci-events" comment:"disable-abci-events disables all ABCI events. Useful when relying on the SDK indexer for event indexing."`
|
||||
}
|
||||
|
||||
// CfgOption is a function that allows to overwrite the default server configuration.
|
||||
|
||||
@ -61,7 +61,11 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a
|
||||
return nil, errorsmod.Wrap(err, "failed to simulate tx")
|
||||
}
|
||||
|
||||
bz, err := intoABCISimulationResponse(txResult, c.indexedEvents)
|
||||
bz, err := intoABCISimulationResponse(
|
||||
txResult,
|
||||
c.indexedABCIEvents,
|
||||
c.cfg.AppTomlConfig.DisableIndexABCIEvents,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errorsmod.Wrap(err, "failed to marshal txResult")
|
||||
}
|
||||
|
||||
@ -122,9 +122,9 @@ func New[T transaction.Tx](
|
||||
}
|
||||
}
|
||||
|
||||
indexEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexEvents))
|
||||
for _, e := range srv.config.AppTomlConfig.IndexEvents {
|
||||
indexEvents[e] = struct{}{}
|
||||
indexedABCIEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexABCIEvents))
|
||||
for _, e := range srv.config.AppTomlConfig.IndexABCIEvents {
|
||||
indexedABCIEvents[e] = struct{}{}
|
||||
}
|
||||
|
||||
sc := store.GetStateCommitment().(snapshots.CommitSnapshotter)
|
||||
@ -183,7 +183,7 @@ func New[T transaction.Tx](
|
||||
checkTxHandler: srv.serverOptions.CheckTxHandler,
|
||||
extendVote: srv.serverOptions.ExtendVoteHandler,
|
||||
chainID: chainID,
|
||||
indexedEvents: indexEvents,
|
||||
indexedABCIEvents: indexedABCIEvents,
|
||||
initialHeight: 0,
|
||||
queryHandlersMap: queryHandlers,
|
||||
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
|
||||
|
||||
@ -70,16 +70,23 @@ func finalizeBlockResponse(
|
||||
cp *cmtproto.ConsensusParams,
|
||||
appHash []byte,
|
||||
indexSet map[string]struct{},
|
||||
debug bool,
|
||||
cfg *AppTomlConfig,
|
||||
) (*abci.FinalizeBlockResponse, error) {
|
||||
allEvents := append(in.BeginBlockEvents, in.EndBlockEvents...)
|
||||
events := make([]abci.Event, 0)
|
||||
|
||||
events, err := intoABCIEvents(allEvents, indexSet)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if !cfg.DisableABCIEvents {
|
||||
var err error
|
||||
events, err = intoABCIEvents(
|
||||
append(in.BeginBlockEvents, in.EndBlockEvents...),
|
||||
indexSet,
|
||||
cfg.DisableIndexABCIEvents,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
txResults, err := intoABCITxResults(in.TxResults, indexSet, debug)
|
||||
txResults, err := intoABCITxResults(in.TxResults, indexSet, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -91,6 +98,7 @@ func finalizeBlockResponse(
|
||||
AppHash: appHash,
|
||||
ConsensusParamUpdates: cp,
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@ -108,12 +116,21 @@ func intoABCIValidatorUpdates(updates []appmodulev2.ValidatorUpdate) []abci.Vali
|
||||
return valsetUpdates
|
||||
}
|
||||
|
||||
func intoABCITxResults(results []server.TxResult, indexSet map[string]struct{}, debug bool) ([]*abci.ExecTxResult, error) {
|
||||
func intoABCITxResults(
|
||||
results []server.TxResult,
|
||||
indexSet map[string]struct{},
|
||||
cfg *AppTomlConfig,
|
||||
) ([]*abci.ExecTxResult, error) {
|
||||
res := make([]*abci.ExecTxResult, len(results))
|
||||
for i := range results {
|
||||
events, err := intoABCIEvents(results[i].Events, indexSet)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var err error
|
||||
events := make([]abci.Event, 0)
|
||||
|
||||
if !cfg.DisableABCIEvents {
|
||||
events, err = intoABCIEvents(results[i].Events, indexSet, cfg.DisableIndexABCIEvents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
res[i] = responseExecTxResultWithEvents(
|
||||
@ -121,59 +138,43 @@ func intoABCITxResults(results []server.TxResult, indexSet map[string]struct{},
|
||||
results[i].GasWanted,
|
||||
results[i].GasUsed,
|
||||
events,
|
||||
debug,
|
||||
cfg.Trace,
|
||||
)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func intoABCIEvents(events []event.Event, indexSet map[string]struct{}) ([]abci.Event, error) {
|
||||
func intoABCIEvents(events []event.Event, indexSet map[string]struct{}, indexNone bool) ([]abci.Event, error) {
|
||||
indexAll := len(indexSet) == 0
|
||||
abciEvents := make([]abci.Event, len(events))
|
||||
for i, e := range events {
|
||||
attributes, err := e.Attributes()
|
||||
attrs, err := e.Attributes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
abciEvents[i] = abci.Event{
|
||||
Type: e.Type,
|
||||
Attributes: make([]abci.EventAttribute, len(attributes)),
|
||||
Attributes: make([]abci.EventAttribute, len(attrs)),
|
||||
}
|
||||
|
||||
for j, attr := range attributes {
|
||||
for j, attr := range attrs {
|
||||
_, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)]
|
||||
abciEvents[i].Attributes[j] = abci.EventAttribute{
|
||||
Key: attr.Key,
|
||||
Value: attr.Value,
|
||||
Index: index || indexAll,
|
||||
Index: !indexNone && (index || indexAll),
|
||||
}
|
||||
}
|
||||
}
|
||||
return abciEvents, nil
|
||||
}
|
||||
|
||||
func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}) ([]byte, error) {
|
||||
indexAll := len(indexSet) == 0
|
||||
abciEvents := make([]abci.Event, len(txRes.Events))
|
||||
for i, e := range txRes.Events {
|
||||
attributes, err := e.Attributes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
abciEvents[i] = abci.Event{
|
||||
Type: e.Type,
|
||||
Attributes: make([]abci.EventAttribute, len(attributes)),
|
||||
}
|
||||
|
||||
for j, attr := range attributes {
|
||||
_, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)]
|
||||
abciEvents[i].Attributes[j] = abci.EventAttribute{
|
||||
Key: attr.Key,
|
||||
Value: attr.Value,
|
||||
Index: index || indexAll,
|
||||
}
|
||||
}
|
||||
func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}, indexNone bool) ([]byte, error) {
|
||||
abciEvents, err := intoABCIEvents(txRes.Events, indexSet, indexNone)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
msgResponses := make([]*gogoany.Any, len(txRes.Resp))
|
||||
|
||||
@ -39,18 +39,44 @@ type eventManager struct {
|
||||
// Emit emits an typed event that is defined in the protobuf file.
|
||||
// In the future these events will be added to consensus.
|
||||
func (em *eventManager) Emit(tev transaction.Msg) error {
|
||||
res, err := TypedEventToEvent(tev)
|
||||
if err != nil {
|
||||
return err
|
||||
ev := event.Event{
|
||||
Type: gogoproto.MessageName(tev),
|
||||
Attributes: func() ([]event.Attribute, error) {
|
||||
outerEvent, err := TypedEventToEvent(tev)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return outerEvent.Attributes()
|
||||
},
|
||||
Data: func() (json.RawMessage, error) {
|
||||
buf := new(bytes.Buffer)
|
||||
jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil}
|
||||
if err := jm.Marshal(buf, tev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf.Bytes(), nil
|
||||
},
|
||||
}
|
||||
|
||||
em.executionContext.events = append(em.executionContext.events, res)
|
||||
em.executionContext.events = append(em.executionContext.events, ev)
|
||||
return nil
|
||||
}
|
||||
|
||||
// EmitKV emits a key value pair event.
|
||||
func (em *eventManager) EmitKV(eventType string, attrs ...event.Attribute) error {
|
||||
em.executionContext.events = append(em.executionContext.events, event.NewEvent(eventType, attrs...))
|
||||
ev := event.Event{
|
||||
Type: eventType,
|
||||
Attributes: func() ([]event.Attribute, error) {
|
||||
return attrs, nil
|
||||
},
|
||||
Data: func() (json.RawMessage, error) {
|
||||
return json.Marshal(attrs)
|
||||
},
|
||||
}
|
||||
|
||||
em.executionContext.events = append(em.executionContext.events, ev)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -1,8 +1,6 @@
|
||||
[comet]
|
||||
# min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned.
|
||||
min-retain-blocks = 0
|
||||
# index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed.
|
||||
index-events = []
|
||||
# halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing.
|
||||
halt-height = 0
|
||||
# halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing.
|
||||
@ -15,6 +13,12 @@ transport = 'socket'
|
||||
trace = false
|
||||
# standalone starts the application without the CometBFT node. The node should be started separately.
|
||||
standalone = false
|
||||
# index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed.
|
||||
index-abci-events = []
|
||||
# disable-index-abci-events disables the ABCI event indexing done by CometBFT. Useful when relying on the SDK indexer for event indexing, but still want events to be included in FinalizeBlockResponse.
|
||||
disable-index-abci-events = false
|
||||
# disable-abci-events disables all ABCI events. Useful when relying on the SDK indexer for event indexing.
|
||||
disable-abci-events = false
|
||||
|
||||
# mempool defines the configuration for the SDK built-in app-side mempool implementations.
|
||||
[comet.mempool]
|
||||
|
||||
@ -41,7 +41,7 @@ type v2KeyChangesMap map[string][]string
|
||||
var v2KeyChanges = v2KeyChangesMap{
|
||||
"minimum-gas-prices": []string{"server.minimum-gas-prices"},
|
||||
"min-retain-blocks": []string{"comet.min-retain-blocks"},
|
||||
"index-events": []string{"comet.index-events"},
|
||||
"index-events": []string{"comet.index-abci-events"},
|
||||
"halt-height": []string{"comet.halt-height"},
|
||||
"halt-time": []string{"comet.halt-time"},
|
||||
"app-db-backend": []string{"store.app-db-backend"},
|
||||
|
||||
Loading…
Reference in New Issue
Block a user