feat(schema/appdata)!: packet support and small refactorings (#20860)

This commit is contained in:
Aaron Craelius 2024-07-04 15:43:49 +02:00 committed by GitHub
parent 7948a57355
commit 54586b2f4d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 195 additions and 103 deletions

97
schema/appdata/data.go Normal file
View File

@ -0,0 +1,97 @@
package appdata
import (
"encoding/json"
"cosmossdk.io/schema"
)
// ModuleInitializationData represents data for related to module initialization, in particular
// the module's schema.
type ModuleInitializationData struct {
// ModuleName is the name of the module.
ModuleName string
// Schema is the schema of the module.
Schema schema.ModuleSchema
}
// StartBlockData represents the data that is passed to a listener when a block is started.
type StartBlockData struct {
// Height is the height of the block.
Height uint64
// Bytes is the raw byte representation of the block header. It may be nil if the source does not provide it.
HeaderBytes ToBytes
// JSON is the JSON representation of the block header. It should generally be a JSON object.
// It may be nil if the source does not provide it.
HeaderJSON ToJSON
}
// TxData represents the raw transaction data that is passed to a listener.
type TxData struct {
// TxIndex is the index of the transaction in the block.
TxIndex int32
// Bytes is the raw byte representation of the transaction.
Bytes ToBytes
// JSON is the JSON representation of the transaction. It should generally be a JSON object.
JSON ToJSON
}
// EventData represents event data that is passed to a listener.
type EventData struct {
// TxIndex is the index of the transaction in the block to which this event is associated.
// It should be set to a negative number if the event is not associated with a transaction.
// Canonically -1 should be used to represent begin block processing and -2 should be used to
// represent end block processing.
TxIndex int32
// MsgIndex is the index of the message in the transaction to which this event is associated.
// If TxIndex is negative, this index could correspond to the index of the message in
// begin or end block processing if such indexes exist, or it can be set to zero.
MsgIndex uint32
// EventIndex is the index of the event in the message to which this event is associated.
EventIndex uint32
// Type is the type of the event.
Type string
// Data is the JSON representation of the event data. It should generally be a JSON object.
Data ToJSON
}
// ToBytes is a function that lazily returns the raw byte representation of data.
type ToBytes = func() ([]byte, error)
// ToJSON is a function that lazily returns the JSON representation of data.
type ToJSON = func() (json.RawMessage, error)
// KVPairData represents a batch of key-value pair data that is passed to a listener.
type KVPairData struct {
Updates []ModuleKVPairUpdate
}
// ModuleKVPairUpdate represents a key-value pair update for a specific module.
type ModuleKVPairUpdate struct {
// ModuleName is the name of the module that the key-value pair belongs to.
ModuleName string
// Update is the key-value pair update.
Update schema.KVPairUpdate
}
// ObjectUpdateData represents object update data that is passed to a listener.
type ObjectUpdateData struct {
// ModuleName is the name of the module that the update corresponds to.
ModuleName string
// Updates are the object updates.
Updates []schema.ObjectUpdate
}
// CommitData represents commit data. It is empty for now, but fields could be added later.
type CommitData struct{}

View File

@ -1,30 +1,22 @@
package appdata
import (
"encoding/json"
"cosmossdk.io/schema"
)
// Listener is an interface that defines methods for listening to both raw and logical blockchain data.
// It is valid for any of the methods to be nil, in which case the listener will not be called for that event.
// Listeners should understand the guarantees that are provided by the source they are listening to and
// understand which methods will or will not be called. For instance, most blockchains will not do logical
// decoding of data out of the box, so the InitializeModuleSchema and OnObjectUpdate methods will not be called.
// decoding of data out of the box, so the InitializeModuleData and OnObjectUpdate methods will not be called.
// These methods will only be called when listening logical decoding is setup.
type Listener struct {
// Initialize is called when the listener is initialized before any other methods are called.
// The lastBlockPersisted return value should be the last block height the listener persisted if it is
// persisting block data, 0 if it is not interested in persisting block data, or -1 if it is
// persisting block data but has not persisted any data yet. This check allows the indexer
// framework to ensure that the listener has not missed blocks.
Initialize func(InitializationData) (lastBlockPersisted int64, err error)
// InitializeModuleData should be called whenever the blockchain process starts OR whenever
// logical decoding of a module is initiated. An indexer listening to this event
// should ensure that they have performed whatever initialization steps (such as database
// migrations) required to receive OnObjectUpdate events for the given module. If the
// indexer's schema is incompatible with the module's on-chain schema, the listener should return
// an error. Module names must conform to the NameFormat regular expression.
InitializeModuleData func(ModuleInitializationData) error
// StartBlock is called at the beginning of processing a block.
StartBlock func(uint64) error
// OnBlockHeader is called when a block header is received.
OnBlockHeader func(BlockHeaderData) error
StartBlock func(StartBlockData) error
// OnTx is called when a transaction is received.
OnTx func(TxData) error
@ -34,89 +26,16 @@ type Listener struct {
// OnKVPair is called when a key-value has been written to the store for a given module.
// Module names must conform to the NameFormat regular expression.
OnKVPair func(moduleName string, key, value []byte, delete bool) error
// Commit is called when state is committed, usually at the end of a block. Any
// indexers should commit their data when this is called and return an error if
// they are unable to commit.
Commit func() error
// InitializeModuleSchema should be called whenever the blockchain process starts OR whenever
// logical decoding of a module is initiated. An indexer listening to this event
// should ensure that they have performed whatever initialization steps (such as database
// migrations) required to receive OnObjectUpdate events for the given module. If the
// indexer's schema is incompatible with the module's on-chain schema, the listener should return
// an error. Module names must conform to the NameFormat regular expression.
InitializeModuleSchema func(moduleName string, moduleSchema schema.ModuleSchema) error
OnKVPair func(updates KVPairData) error
// OnObjectUpdate is called whenever an object is updated in a module's state. This is only called
// when logical data is available. It should be assumed that the same data in raw form
// is also passed to OnKVPair. Module names must conform to the NameFormat regular expression.
OnObjectUpdate func(moduleName string, update schema.ObjectUpdate) error
OnObjectUpdate func(ObjectUpdateData) error
// Commit is called when state is committed, usually at the end of a block. Any
// indexers should commit their data when this is called and return an error if
// they are unable to commit. Data sources MUST call Commit when data is committed,
// otherwise it should be assumed that indexers have not persisted their state.
Commit func(CommitData) error
}
// InitializationData represents initialization data that is passed to a listener.
type InitializationData struct {
// HasEventAlignedWrites indicates that the blockchain data source will emit KV-pair events
// in an order aligned with transaction, message and event callbacks. If this is true
// then indexers can assume that KV-pair data is associated with these specific transactions, messages
// and events. This may be useful for indexers which store a log of all operations (such as immutable
// or version controlled databases) so that the history log can include fine grain correlation between
// state updates and transactions, messages and events. If this value is false, then indexers should
// assume that KV-pair data occurs out of order with respect to transaction, message and event callbacks -
// the only safe assumption being that KV-pair data is associated with the block in which it was emitted.
HasEventAlignedWrites bool
}
// BlockHeaderData represents the raw block header data that is passed to a listener.
type BlockHeaderData struct {
// Height is the height of the block.
Height uint64
// Bytes is the raw byte representation of the block header.
Bytes ToBytes
// JSON is the JSON representation of the block header. It should generally be a JSON object.
JSON ToJSON
}
// TxData represents the raw transaction data that is passed to a listener.
type TxData struct {
// TxIndex is the index of the transaction in the block.
TxIndex int32
// Bytes is the raw byte representation of the transaction.
Bytes ToBytes
// JSON is the JSON representation of the transaction. It should generally be a JSON object.
JSON ToJSON
}
// EventData represents event data that is passed to a listener.
type EventData struct {
// TxIndex is the index of the transaction in the block to which this event is associated.
// It should be set to a negative number if the event is not associated with a transaction.
// Canonically -1 should be used to represent begin block processing and -2 should be used to
// represent end block processing.
TxIndex int32
// MsgIndex is the index of the message in the transaction to which this event is associated.
// If TxIndex is negative, this index could correspond to the index of the message in
// begin or end block processing if such indexes exist, or it can be set to zero.
MsgIndex uint32
// EventIndex is the index of the event in the message to which this event is associated.
EventIndex uint32
// Type is the type of the event.
Type string
// Data is the JSON representation of the event data. It should generally be a JSON object.
Data ToJSON
}
// ToBytes is a function that lazily returns the raw byte representation of data.
type ToBytes = func() ([]byte, error)
// ToJSON is a function that lazily returns the JSON representation of data.
type ToJSON = func() (json.RawMessage, error)

61
schema/appdata/packet.go Normal file
View File

@ -0,0 +1,61 @@
package appdata
// Packet is the interface that all listener data structures implement so that this data can be "packetized"
// and processed in a stream, possibly asynchronously.
type Packet interface {
apply(*Listener) error
}
// SendPacket sends a packet to a listener invoking the appropriate callback for this packet if one is registered.
func (l Listener) SendPacket(p Packet) error {
return p.apply(&l)
}
func (m ModuleInitializationData) apply(l *Listener) error {
if l.InitializeModuleData == nil {
return nil
}
return l.InitializeModuleData(m)
}
func (b StartBlockData) apply(l *Listener) error {
if l.StartBlock == nil {
return nil
}
return l.StartBlock(b)
}
func (t TxData) apply(l *Listener) error {
if l.OnTx == nil {
return nil
}
return l.OnTx(t)
}
func (e EventData) apply(l *Listener) error {
if l.OnEvent == nil {
return nil
}
return l.OnEvent(e)
}
func (k KVPairData) apply(l *Listener) error {
if l.OnKVPair == nil {
return nil
}
return l.OnKVPair(k)
}
func (o ObjectUpdateData) apply(l *Listener) error {
if l.OnObjectUpdate == nil {
return nil
}
return l.OnObjectUpdate(o)
}
func (c CommitData) apply(l *Listener) error {
if l.Commit == nil {
return nil
}
return l.Commit(c)
}

View File

@ -18,8 +18,23 @@ type ModuleCodec struct {
KVDecoder KVDecoder
}
// KVDecoder is a function that decodes a key-value pair into an ObjectUpdate.
// If the KV-pair doesn't represent an object update, the function should return false
// as the second return value. Error should only be non-nil when the decoder expected
// to parse a valid update and was unable to.
type KVDecoder = func(key, value []byte) (ObjectUpdate, bool, error)
// KVDecoder is a function that decodes a key-value pair into one or more ObjectUpdate's.
// If the KV-pair doesn't represent object updates, the function should return nil as the first
// and no error. The error result should only be non-nil when the decoder expected
// to parse a valid update and was unable to. In the case of an error, the decoder may return
// a non-nil value for the first return value, which can indicate which parts of the update
// were decodable to aid debugging.
type KVDecoder = func(KVPairUpdate) ([]ObjectUpdate, error)
// KVPairUpdate represents a key-value pair set or delete.
type KVPairUpdate struct {
// Key is the key of the key-value pair.
Key []byte
// Value is the value of the key-value pair. It should be ignored when Delete is true.
Value []byte
// Delete is a flag that indicates that the key-value pair was deleted. If it is false,
// then it is assumed that this has been a set operation.
Delete bool
}