feat(schema/appdata)!: efficiency & data model improvements aligned with server/v2 (#21305)

This commit is contained in:
Aaron Craelius 2024-08-20 10:49:06 -04:00 committed by GitHub
parent 27d3d4892b
commit da27d8b9a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 244 additions and 39 deletions

View File

@ -10,7 +10,7 @@ type Changeset struct {
}
// StateChanges represents a set of changes to the state of an actor in storage.
type StateChanges struct {
type StateChanges = struct {
Actor []byte // actor represents the space in storage where state is stored, previously this was called a "storekey"
StateChanges KVPairs // StateChanges is a list of key-value pairs representing the changes to the state.
}

View File

@ -132,5 +132,10 @@ func AsyncListener(opts AsyncListenerOptions, listener Listener) Listener {
}, nil
}
res.onBatch = func(batch PacketBatch) error {
packetChan <- batch
return nil
}
return res
}

41
schema/appdata/batch.go Normal file
View File

@ -0,0 +1,41 @@
package appdata
// BatchablePacket is the interface that packet types which can be batched implement.
// All types that implement Packet except CommitData also implement BatchablePacket.
// CommitData should not be batched because it forces synchronization of asynchronous listeners.
type BatchablePacket interface {
Packet
isBatchablePacket()
}
// PacketBatch is a batch of packets that can be sent to a listener.
// If listener processing is asynchronous, the batch of packets will be sent
// all at once in a single operation which can be more efficient than sending
// each packet individually.
type PacketBatch []BatchablePacket
func (p PacketBatch) apply(l *Listener) error {
if l.onBatch != nil {
return l.onBatch(p)
}
for _, packet := range p {
if err := packet.apply(l); err != nil {
return err
}
}
return nil
}
func (ModuleInitializationData) isBatchablePacket() {}
func (StartBlockData) isBatchablePacket() {}
func (TxData) isBatchablePacket() {}
func (EventData) isBatchablePacket() {}
func (KVPairData) isBatchablePacket() {}
func (ObjectUpdateData) isBatchablePacket() {}

View File

@ -0,0 +1,85 @@
package appdata
import (
"context"
"reflect"
"testing"
)
func TestBatch(t *testing.T) {
l, got := batchListener()
if err := l.SendPacket(testBatch); err != nil {
t.Error(err)
}
if !reflect.DeepEqual(*got, testBatch) {
t.Errorf("got %v, expected %v", *got, testBatch)
}
}
var testBatch = PacketBatch{
ModuleInitializationData{},
StartBlockData{},
TxData{},
EventData{},
KVPairData{},
ObjectUpdateData{},
}
func batchListener() (Listener, *PacketBatch) {
var got = new(PacketBatch)
l := Listener{
InitializeModuleData: func(m ModuleInitializationData) error {
*got = append(*got, m)
return nil
},
StartBlock: func(b StartBlockData) error {
*got = append(*got, b)
return nil
},
OnTx: func(t TxData) error {
*got = append(*got, t)
return nil
},
OnEvent: func(e EventData) error {
*got = append(*got, e)
return nil
},
OnKVPair: func(k KVPairData) error {
*got = append(*got, k)
return nil
},
OnObjectUpdate: func(o ObjectUpdateData) error {
*got = append(*got, o)
return nil
},
}
return l, got
}
func TestBatchAsync(t *testing.T) {
l, got := batchListener()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l = AsyncListenerMux(AsyncListenerOptions{Context: ctx}, l)
if err := l.SendPacket(testBatch); err != nil {
t.Error(err)
}
// commit to synchronize
cb, err := l.Commit(CommitData{})
if err != nil {
t.Error(err)
}
if err := cb(); err != nil {
t.Error(err)
}
if !reflect.DeepEqual(*got, testBatch) {
t.Errorf("got %v, expected %v", *got, testBatch)
}
}

View File

@ -41,8 +41,14 @@ type TxData struct {
JSON ToJSON
}
// EventData represents event data that is passed to a listener.
// EventData represents event data that is passed to a listener when events are received.
type EventData struct {
// Events are the events that are received.
Events []Event
}
// Event represents the data for a single event.
type Event struct {
// TxIndex is the index of the transaction in the block to which this event is associated.
// It should be set to a negative number if the event is not associated with a transaction.
// Canonically -1 should be used to represent begin block processing and -2 should be used to
@ -52,16 +58,23 @@ type EventData struct {
// MsgIndex is the index of the message in the transaction to which this event is associated.
// If TxIndex is negative, this index could correspond to the index of the message in
// begin or end block processing if such indexes exist, or it can be set to zero.
MsgIndex uint32
MsgIndex int32
// EventIndex is the index of the event in the message to which this event is associated.
EventIndex uint32
EventIndex int32
// Type is the type of the event.
Type string
// Data is the JSON representation of the event data. It should generally be a JSON object.
// Data lazily returns the JSON representation of the event.
Data ToJSON
// Attributes lazily returns the key-value attribute representation of the event.
Attributes ToEventAttributes
}
type EventAttribute = struct {
Key, Value string
}
// ToBytes is a function that lazily returns the raw byte representation of data.
@ -70,18 +83,21 @@ type ToBytes = func() ([]byte, error)
// ToJSON is a function that lazily returns the JSON representation of data.
type ToJSON = func() (json.RawMessage, error)
// ToEventAttributes is a function that lazily returns the key-value attribute representation of an event.
type ToEventAttributes = func() ([]EventAttribute, error)
// KVPairData represents a batch of key-value pair data that is passed to a listener.
type KVPairData struct {
Updates []ModuleKVPairUpdate
Updates []ActorKVPairUpdate
}
// ModuleKVPairUpdate represents a key-value pair update for a specific module.
type ModuleKVPairUpdate struct {
// ModuleName is the name of the module that the key-value pair belongs to.
ModuleName string
// ActorKVPairUpdate represents a key-value pair update for a specific module or account.
type ActorKVPairUpdate = struct {
// Actor is the byte representation of the module or account that is updating the key-value pair.
Actor []byte
// Update is the key-value pair update.
Update schema.KVPairUpdate
// StateChanges are key-value pair updates.
StateChanges []schema.KVPairUpdate
}
// ObjectUpdateData represents object update data that is passed to a listener.

View File

@ -42,5 +42,14 @@ type Listener struct {
// if err is nil and then if it is, check if completionCallback is nil and if not
// call it and check for an error. Commit should be designed to be non-blocking if
// possible, but calling completionCallback should be blocking.
// When listener processing is pushed into background go routines using AsyncListener
// or AsyncListenerMux, the Commit completion callback will synchronize the processing of
// all listeners. Producers that do not want to block on Commit in a given block
// can delay calling the completion callback until the start of the next block to
// give listeners time to complete their processing.
Commit func(CommitData) (completionCallback func() error, err error)
// onBatch can be used internally to efficiently forward packet batches to
// async listeners.
onBatch func(PacketBatch) error
}

View File

@ -137,5 +137,15 @@ func ListenerMux(listeners ...Listener) Listener {
}
}
mux.onBatch = func(batch PacketBatch) error {
for _, listener := range listeners {
err := batch.apply(&listener)
if err != nil {
return err
}
}
return nil
}
return mux
}

View File

@ -2,6 +2,8 @@ package appdata
// Packet is the interface that all listener data structures implement so that this data can be "packetized"
// and processed in a stream, possibly asynchronously.
// Valid implementations are ModuleInitializationData, StartBlockData, TxData, EventData, KVPairData, ObjectUpdateData,
// and CommitData.
type Packet interface {
apply(*Listener) error
}

View File

@ -297,12 +297,14 @@ func (t testStore) GetUInt64(key []byte) uint64 {
func (t testStore) Set(key, value []byte) {
if t.listener.OnKVPair != nil {
err := t.listener.OnKVPair(appdata.KVPairData{Updates: []appdata.ModuleKVPairUpdate{
err := t.listener.OnKVPair(appdata.KVPairData{Updates: []appdata.ActorKVPairUpdate{
{
ModuleName: t.modName,
Update: schema.KVPairUpdate{
Key: key,
Value: value,
Actor: []byte(t.modName),
StateChanges: []schema.KVPairUpdate{
{
Key: key,
Value: value,
},
},
},
}})

View File

@ -23,6 +23,7 @@ func Middleware(target appdata.Listener, resolver DecoderResolver, opts Middlewa
onKVPair := target.OnKVPair
moduleCodecs := map[string]*schema.ModuleCodec{}
moduleNames := map[string]string{}
target.OnKVPair = func(data appdata.KVPairData) error {
// first forward kv pair updates
@ -34,17 +35,28 @@ func Middleware(target appdata.Listener, resolver DecoderResolver, opts Middlewa
}
for _, kvUpdate := range data.Updates {
// look for an existing codec
pcdc, ok := moduleCodecs[kvUpdate.ModuleName]
moduleName, ok := moduleNames[string(kvUpdate.Actor)]
if !ok {
if opts.ModuleFilter != nil && !opts.ModuleFilter(kvUpdate.ModuleName) {
var err error
moduleName, err = resolver.DecodeModuleName(kvUpdate.Actor)
if err != nil {
return err
}
moduleNames[string(kvUpdate.Actor)] = moduleName
}
// look for an existing codec
pcdc, ok := moduleCodecs[moduleName]
if !ok {
if opts.ModuleFilter != nil && !opts.ModuleFilter(moduleName) {
// we don't care about this module so store nil and continue
moduleCodecs[kvUpdate.ModuleName] = nil
moduleCodecs[moduleName] = nil
continue
}
// look for a new codec
cdc, found, err := resolver.LookupDecoder(kvUpdate.ModuleName)
cdc, found, err := resolver.LookupDecoder(moduleName)
if err != nil {
return err
}
@ -52,16 +64,16 @@ func Middleware(target appdata.Listener, resolver DecoderResolver, opts Middlewa
if !found {
// store nil to indicate we've seen this module and don't have a codec
// and keep processing the kv updates
moduleCodecs[kvUpdate.ModuleName] = nil
moduleCodecs[moduleName] = nil
continue
}
pcdc = &cdc
moduleCodecs[kvUpdate.ModuleName] = pcdc
moduleCodecs[moduleName] = pcdc
if initializeModuleData != nil {
err = initializeModuleData(appdata.ModuleInitializationData{
ModuleName: kvUpdate.ModuleName,
ModuleName: moduleName,
Schema: cdc.Schema,
})
if err != nil {
@ -80,22 +92,24 @@ func Middleware(target appdata.Listener, resolver DecoderResolver, opts Middlewa
continue
}
updates, err := pcdc.KVDecoder(kvUpdate.Update)
if err != nil {
return err
}
for _, u := range kvUpdate.StateChanges {
updates, err := pcdc.KVDecoder(u)
if err != nil {
return err
}
if len(updates) == 0 {
// no updates
continue
}
if len(updates) == 0 {
// no updates
continue
}
err = target.OnObjectUpdate(appdata.ObjectUpdateData{
ModuleName: kvUpdate.ModuleName,
Updates: updates,
})
if err != nil {
return err
err = target.OnObjectUpdate(appdata.ObjectUpdateData{
ModuleName: moduleName,
Updates: updates,
})
if err != nil {
return err
}
}
}

View File

@ -1,6 +1,7 @@
package decoding
import (
"fmt"
"sort"
"cosmossdk.io/schema"
@ -8,6 +9,12 @@ import (
// DecoderResolver is an interface that allows indexers to discover and use module decoders.
type DecoderResolver interface {
// DecodeModuleName decodes a module name from a byte slice passed as the actor in a KVPairUpdate.
DecodeModuleName([]byte) (string, error)
// EncodeModuleName encodes a module name into a byte slice that can be used as the actor in a KVPairUpdate.
EncodeModuleName(string) ([]byte, error)
// IterateAll iterates over all available module decoders.
IterateAll(func(moduleName string, cdc schema.ModuleCodec) error) error
@ -27,6 +34,20 @@ type moduleSetDecoderResolver struct {
moduleSet map[string]interface{}
}
func (a moduleSetDecoderResolver) DecodeModuleName(bytes []byte) (string, error) {
if _, ok := a.moduleSet[string(bytes)]; ok {
return string(bytes), nil
}
return "", fmt.Errorf("module %s not found", bytes)
}
func (a moduleSetDecoderResolver) EncodeModuleName(s string) ([]byte, error) {
if _, ok := a.moduleSet[s]; ok {
return []byte(s), nil
}
return nil, fmt.Errorf("module %s not found", s)
}
func (a moduleSetDecoderResolver) IterateAll(f func(string, schema.ModuleCodec) error) error {
keys := make([]string, 0, len(a.moduleSet))
for k := range a.moduleSet {