diff --git a/schema/appdata/data.go b/schema/appdata/data.go new file mode 100644 index 0000000000..7e02fbc5db --- /dev/null +++ b/schema/appdata/data.go @@ -0,0 +1,97 @@ +package appdata + +import ( + "encoding/json" + + "cosmossdk.io/schema" +) + +// ModuleInitializationData represents data related to module initialization, in particular +// the module's schema. +type ModuleInitializationData struct { + // ModuleName is the name of the module. + ModuleName string + + // Schema is the schema of the module. + Schema schema.ModuleSchema +} + +// StartBlockData represents the data that is passed to a listener when a block is started. +type StartBlockData struct { + // Height is the height of the block. + Height uint64 + + // HeaderBytes is the raw byte representation of the block header. It may be nil if the source does not provide it. + HeaderBytes ToBytes + + // HeaderJSON is the JSON representation of the block header. It should generally be a JSON object. + // It may be nil if the source does not provide it. + HeaderJSON ToJSON +} + +// TxData represents the raw transaction data that is passed to a listener. +type TxData struct { + // TxIndex is the index of the transaction in the block. + TxIndex int32 + + // Bytes is the raw byte representation of the transaction. + Bytes ToBytes + + // JSON is the JSON representation of the transaction. It should generally be a JSON object. + JSON ToJSON +} + +// EventData represents event data that is passed to a listener. +type EventData struct { + // TxIndex is the index of the transaction in the block to which this event is associated. + // It should be set to a negative number if the event is not associated with a transaction. + // Canonically -1 should be used to represent begin block processing and -2 should be used to + // represent end block processing. + TxIndex int32 + + // MsgIndex is the index of the message in the transaction to which this event is associated. 
+ // If TxIndex is negative, this index could correspond to the index of the message in + // begin or end block processing if such indexes exist, or it can be set to zero. + MsgIndex uint32 + + // EventIndex is the index of the event in the message to which this event is associated. + EventIndex uint32 + + // Type is the type of the event. + Type string + + // Data is the JSON representation of the event data. It should generally be a JSON object. + Data ToJSON +} + +// ToBytes is a function that lazily returns the raw byte representation of data. +type ToBytes = func() ([]byte, error) + +// ToJSON is a function that lazily returns the JSON representation of data. +type ToJSON = func() (json.RawMessage, error) + +// KVPairData represents a batch of key-value pair data that is passed to a listener. +type KVPairData struct { + Updates []ModuleKVPairUpdate +} + +// ModuleKVPairUpdate represents a key-value pair update for a specific module. +type ModuleKVPairUpdate struct { + // ModuleName is the name of the module that the key-value pair belongs to. + ModuleName string + + // Update is the key-value pair update. + Update schema.KVPairUpdate +} + +// ObjectUpdateData represents object update data that is passed to a listener. +type ObjectUpdateData struct { + // ModuleName is the name of the module that the update corresponds to. + ModuleName string + + // Updates are the object updates. + Updates []schema.ObjectUpdate +} + +// CommitData represents commit data. It is empty for now, but fields could be added later. +type CommitData struct{} diff --git a/schema/appdata/listener.go b/schema/appdata/listener.go index e0868ae0c4..d4786cb025 100644 --- a/schema/appdata/listener.go +++ b/schema/appdata/listener.go @@ -1,30 +1,22 @@ package appdata -import ( - "encoding/json" - - "cosmossdk.io/schema" -) - // Listener is an interface that defines methods for listening to both raw and logical blockchain data. 
// It is valid for any of the methods to be nil, in which case the listener will not be called for that event. // Listeners should understand the guarantees that are provided by the source they are listening to and // understand which methods will or will not be called. For instance, most blockchains will not do logical -// decoding of data out of the box, so the InitializeModuleSchema and OnObjectUpdate methods will not be called. +// decoding of data out of the box, so the InitializeModuleData and OnObjectUpdate methods will not be called. // These methods will only be called when listening logical decoding is setup. type Listener struct { - // Initialize is called when the listener is initialized before any other methods are called. - // The lastBlockPersisted return value should be the last block height the listener persisted if it is - // persisting block data, 0 if it is not interested in persisting block data, or -1 if it is - // persisting block data but has not persisted any data yet. This check allows the indexer - // framework to ensure that the listener has not missed blocks. - Initialize func(InitializationData) (lastBlockPersisted int64, err error) + // InitializeModuleData should be called whenever the blockchain process starts OR whenever + // logical decoding of a module is initiated. An indexer listening to this event + // should ensure that they have performed whatever initialization steps (such as database + // migrations) required to receive OnObjectUpdate events for the given module. If the + // indexer's schema is incompatible with the module's on-chain schema, the listener should return + // an error. Module names must conform to the NameFormat regular expression. + InitializeModuleData func(ModuleInitializationData) error // StartBlock is called at the beginning of processing a block. - StartBlock func(uint64) error - - // OnBlockHeader is called when a block header is received. 
- OnBlockHeader func(BlockHeaderData) error + StartBlock func(StartBlockData) error // OnTx is called when a transaction is received. OnTx func(TxData) error @@ -34,89 +26,16 @@ type Listener struct { // OnKVPair is called when a key-value has been written to the store for a given module. // Module names must conform to the NameFormat regular expression. - OnKVPair func(moduleName string, key, value []byte, delete bool) error - - // Commit is called when state is committed, usually at the end of a block. Any - // indexers should commit their data when this is called and return an error if - // they are unable to commit. - Commit func() error - - // InitializeModuleSchema should be called whenever the blockchain process starts OR whenever - // logical decoding of a module is initiated. An indexer listening to this event - // should ensure that they have performed whatever initialization steps (such as database - // migrations) required to receive OnObjectUpdate events for the given module. If the - // indexer's schema is incompatible with the module's on-chain schema, the listener should return - // an error. Module names must conform to the NameFormat regular expression. - InitializeModuleSchema func(moduleName string, moduleSchema schema.ModuleSchema) error + OnKVPair func(updates KVPairData) error // OnObjectUpdate is called whenever an object is updated in a module's state. This is only called // when logical data is available. It should be assumed that the same data in raw form // is also passed to OnKVPair. Module names must conform to the NameFormat regular expression. - OnObjectUpdate func(moduleName string, update schema.ObjectUpdate) error + OnObjectUpdate func(ObjectUpdateData) error + + // Commit is called when state is committed, usually at the end of a block. Any + // indexers should commit their data when this is called and return an error if + // they are unable to commit. 
Data sources MUST call Commit when data is committed, + // otherwise it should be assumed that indexers have not persisted their state. + Commit func(CommitData) error } - -// InitializationData represents initialization data that is passed to a listener. -type InitializationData struct { - // HasEventAlignedWrites indicates that the blockchain data source will emit KV-pair events - // in an order aligned with transaction, message and event callbacks. If this is true - // then indexers can assume that KV-pair data is associated with these specific transactions, messages - // and events. This may be useful for indexers which store a log of all operations (such as immutable - // or version controlled databases) so that the history log can include fine grain correlation between - // state updates and transactions, messages and events. If this value is false, then indexers should - // assume that KV-pair data occurs out of order with respect to transaction, message and event callbacks - - // the only safe assumption being that KV-pair data is associated with the block in which it was emitted. - HasEventAlignedWrites bool -} - -// BlockHeaderData represents the raw block header data that is passed to a listener. -type BlockHeaderData struct { - // Height is the height of the block. - Height uint64 - - // Bytes is the raw byte representation of the block header. - Bytes ToBytes - - // JSON is the JSON representation of the block header. It should generally be a JSON object. - JSON ToJSON -} - -// TxData represents the raw transaction data that is passed to a listener. -type TxData struct { - // TxIndex is the index of the transaction in the block. - TxIndex int32 - - // Bytes is the raw byte representation of the transaction. - Bytes ToBytes - - // JSON is the JSON representation of the transaction. It should generally be a JSON object. - JSON ToJSON -} - -// EventData represents event data that is passed to a listener. 
-type EventData struct { - // TxIndex is the index of the transaction in the block to which this event is associated. - // It should be set to a negative number if the event is not associated with a transaction. - // Canonically -1 should be used to represent begin block processing and -2 should be used to - // represent end block processing. - TxIndex int32 - - // MsgIndex is the index of the message in the transaction to which this event is associated. - // If TxIndex is negative, this index could correspond to the index of the message in - // begin or end block processing if such indexes exist, or it can be set to zero. - MsgIndex uint32 - - // EventIndex is the index of the event in the message to which this event is associated. - EventIndex uint32 - - // Type is the type of the event. - Type string - - // Data is the JSON representation of the event data. It should generally be a JSON object. - Data ToJSON -} - -// ToBytes is a function that lazily returns the raw byte representation of data. -type ToBytes = func() ([]byte, error) - -// ToJSON is a function that lazily returns the JSON representation of data. -type ToJSON = func() (json.RawMessage, error) diff --git a/schema/appdata/packet.go b/schema/appdata/packet.go new file mode 100644 index 0000000000..e5fe6be966 --- /dev/null +++ b/schema/appdata/packet.go @@ -0,0 +1,61 @@ +package appdata + +// Packet is the interface that all listener data structures implement so that this data can be "packetized" +// and processed in a stream, possibly asynchronously. +type Packet interface { + apply(*Listener) error +} + +// SendPacket sends a packet to a listener, invoking the appropriate callback for this packet if one is registered. 
+func (l Listener) SendPacket(p Packet) error { + return p.apply(&l) +} + +func (m ModuleInitializationData) apply(l *Listener) error { + if l.InitializeModuleData == nil { + return nil + } + return l.InitializeModuleData(m) +} + +func (b StartBlockData) apply(l *Listener) error { + if l.StartBlock == nil { + return nil + } + return l.StartBlock(b) +} + +func (t TxData) apply(l *Listener) error { + if l.OnTx == nil { + return nil + } + return l.OnTx(t) +} + +func (e EventData) apply(l *Listener) error { + if l.OnEvent == nil { + return nil + } + return l.OnEvent(e) +} + +func (k KVPairData) apply(l *Listener) error { + if l.OnKVPair == nil { + return nil + } + return l.OnKVPair(k) +} + +func (o ObjectUpdateData) apply(l *Listener) error { + if l.OnObjectUpdate == nil { + return nil + } + return l.OnObjectUpdate(o) +} + +func (c CommitData) apply(l *Listener) error { + if l.Commit == nil { + return nil + } + return l.Commit(c) +} diff --git a/schema/decoder.go b/schema/decoder.go index 1d228d1c06..86aedec9f9 100644 --- a/schema/decoder.go +++ b/schema/decoder.go @@ -18,8 +18,23 @@ type ModuleCodec struct { KVDecoder KVDecoder } -// KVDecoder is a function that decodes a key-value pair into an ObjectUpdate. -// If the KV-pair doesn't represent an object update, the function should return false -// as the second return value. Error should only be non-nil when the decoder expected -// to parse a valid update and was unable to. -type KVDecoder = func(key, value []byte) (ObjectUpdate, bool, error) +// KVDecoder is a function that decodes a key-value pair into one or more ObjectUpdate values. +// If the KV-pair doesn't represent object updates, the function should return nil as the first +// return value and no error. The error result should only be non-nil when the decoder expected +// to parse a valid update and was unable to. 
In the case of an error, the decoder may return +// a non-nil value for the first return value, which can indicate which parts of the update +// were decodable to aid debugging. +type KVDecoder = func(KVPairUpdate) ([]ObjectUpdate, error) + +// KVPairUpdate represents a key-value pair set or delete. +type KVPairUpdate struct { + // Key is the key of the key-value pair. + Key []byte + + // Value is the value of the key-value pair. It should be ignored when Delete is true. + Value []byte + + // Delete is a flag that indicates that the key-value pair was deleted. If it is false, + // then it is assumed that this has been a set operation. + Delete bool +}