feat(schema/appdata)!: make commit async (#21306)

This commit is contained in:
Aaron Craelius 2024-08-20 09:06:25 -04:00 committed by GitHub
parent aeb0f275f9
commit 27d3d4892b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 100 additions and 76 deletions

View File

@ -68,14 +68,14 @@ func StartIndexer(ctx context.Context, logger SqlLogger, config Config) (appdata
return mm.InitializeSchema(ctx, tx)
},
Commit: func(data appdata.CommitData) error {
Commit: func(data appdata.CommitData) (completionCallback func() error, err error) {
err = tx.Commit()
if err != nil {
return err
return nil, err
}
tx, err = db.BeginTx(ctx, nil)
return err
return nil, err
},
}, nil
}

View File

@ -58,7 +58,11 @@ func testInitSchema(t *testing.T, disableRetainDeletions bool, goldenFileName st
}))
require.NotNil(t, listener.Commit)
require.NoError(t, listener.Commit(appdata.CommitData{}))
cb, err := listener.Commit(appdata.CommitData{})
require.NoError(t, err)
if cb != nil {
require.NoError(t, cb())
}
golden.Assert(t, buf.String(), goldenFileName)
}

View File

@ -19,50 +19,24 @@ type AsyncListenerOptions struct {
DoneWaitGroup *sync.WaitGroup
}
// AsyncListenerMux returns a listener that forwards received events to all the provided listeners asynchronously
// with each listener processing in a separate go routine. All callbacks in the returned listener will return nil
// except for Commit which will return an error or nil once all listeners have processed the commit. The context
// is used to signal that the listeners should stop listening and return. bufferSize is the size of the buffer for the
// channels used to send events to the listeners.
// AsyncListenerMux is a convenience function that calls AsyncListener for each listener
// with the provided options and combines them using ListenerMux.
func AsyncListenerMux(opts AsyncListenerOptions, listeners ...Listener) Listener {
asyncListeners := make([]Listener, len(listeners))
commitChans := make([]chan error, len(listeners))
for i, l := range listeners {
commitChan := make(chan error)
commitChans[i] = commitChan
asyncListeners[i] = AsyncListener(opts, commitChan, l)
asyncListeners[i] = AsyncListener(opts, l)
}
mux := ListenerMux(asyncListeners...)
muxCommit := mux.Commit
mux.Commit = func(data CommitData) error {
if muxCommit != nil {
err := muxCommit(data)
if err != nil {
return err
}
}
for _, commitChan := range commitChans {
err := <-commitChan
if err != nil {
return err
}
}
return nil
}
return mux
return ListenerMux(asyncListeners...)
}
// AsyncListener returns a listener that forwards received events to the provided listener listening in asynchronously
// in a separate go routine. The listener that is returned will return nil for all methods including Commit and
// an error or nil will only be returned in commitChan once the sender has sent commit and the receiving listener has
// processed it. Thus commitChan can be used as a synchronization and error checking mechanism. The go routine
// an error or nil will only be returned when the callback returned by Commit is called.
// Thus Commit() can be used as a synchronization and error checking mechanism. The go routine
// that is being used for listening will exit when context.Done() returns and no more events will be received by the listener.
// bufferSize is the size of the buffer for the channel that is used to send events to the listener.
// Instead of using AsyncListener directly, it is recommended to use AsyncListenerMux which does coordination directly
// via its Commit callback.
func AsyncListener(opts AsyncListenerOptions, commitChan chan<- error, listener Listener) Listener {
func AsyncListener(opts AsyncListenerOptions, listener Listener) Listener {
commitChan := make(chan error)
packetChan := make(chan Packet, opts.BufferSize)
res := Listener{}
ctx := opts.Context
@ -151,11 +125,11 @@ func AsyncListener(opts AsyncListenerOptions, commitChan chan<- error, listener
}
}
if listener.Commit != nil {
res.Commit = func(data CommitData) error {
packetChan <- data
return nil
}
res.Commit = func(data CommitData) (func() error, error) {
packetChan <- data
return func() error {
return <-commitChan
}, nil
}
return res

View File

@ -47,7 +47,12 @@ func TestAsyncListenerMux(t *testing.T) {
BufferSize: 16, Context: ctx, DoneWaitGroup: wg,
}, listener1, listener2)
callAllCallbacksOnces(t, res)
completeCb := callAllCallbacksOnces(t, res)
if completeCb != nil {
if err := completeCb(); err != nil {
t.Fatal(err)
}
}
expectedCalls := []string{
"InitializeModuleData",
@ -72,15 +77,23 @@ func TestAsyncListenerMux(t *testing.T) {
listener1 := callCollector(1, func(name string, _ int, _ Packet) {
calls1 = append(calls1, name)
})
listener1.Commit = func(data CommitData) error {
return fmt.Errorf("error")
listener1.Commit = func(data CommitData) (completionCallback func() error, err error) {
return nil, fmt.Errorf("error")
}
listener2 := callCollector(2, func(name string, _ int, _ Packet) {
calls2 = append(calls2, name)
})
res := AsyncListenerMux(AsyncListenerOptions{}, listener1, listener2)
err := res.Commit(CommitData{})
cb, err := res.Commit(CommitData{})
if err != nil {
t.Fatalf("expected first error to be nil, got %v", err)
}
if cb == nil {
t.Fatalf("expected completion callback")
}
err = cb()
if err == nil || err.Error() != "error" {
t.Fatalf("expected error, got %v", err)
}
@ -89,21 +102,19 @@ func TestAsyncListenerMux(t *testing.T) {
func TestAsyncListener(t *testing.T) {
t.Run("call cancel", func(t *testing.T) {
commitChan := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
var calls []string
listener := callCollector(1, func(name string, _ int, _ Packet) {
calls = append(calls, name)
})
res := AsyncListener(AsyncListenerOptions{BufferSize: 16, Context: ctx, DoneWaitGroup: wg},
commitChan, listener)
res := AsyncListener(AsyncListenerOptions{BufferSize: 16, Context: ctx, DoneWaitGroup: wg}, listener)
callAllCallbacksOnces(t, res)
err := <-commitChan
if err != nil {
t.Fatalf("expected nil, got %v", err)
completeCb := callAllCallbacksOnces(t, res)
if completeCb != nil {
if err := completeCb(); err != nil {
t.Fatal(err)
}
}
checkExpectedCallOrder(t, calls, []string{
@ -124,7 +135,6 @@ func TestAsyncListener(t *testing.T) {
})
t.Run("error", func(t *testing.T) {
commitChan := make(chan error)
var calls []string
listener := callCollector(1, func(name string, _ int, _ Packet) {
calls = append(calls, name)
@ -134,11 +144,14 @@ func TestAsyncListener(t *testing.T) {
return fmt.Errorf("error")
}
res := AsyncListener(AsyncListenerOptions{BufferSize: 16}, commitChan, listener)
res := AsyncListener(AsyncListenerOptions{BufferSize: 16}, listener)
callAllCallbacksOnces(t, res)
completeCb := callAllCallbacksOnces(t, res)
if completeCb == nil {
t.Fatalf("expected completion callback")
}
err := <-commitChan
err := completeCb()
if err == nil || err.Error() != "error" {
t.Fatalf("expected error, got %v", err)
}

View File

@ -10,6 +10,6 @@ func PacketForwarder(f func(Packet) error) Listener {
OnKVPair: func(data KVPairData) error { return f(data) },
OnObjectUpdate: func(data ObjectUpdateData) error { return f(data) },
StartBlock: func(data StartBlockData) error { return f(data) },
Commit: func(data CommitData) error { return f(data) },
Commit: func(data CommitData) (func() error, error) { return nil, f(data) },
}
}

View File

@ -37,5 +37,10 @@ type Listener struct {
// indexers should commit their data when this is called and return an error if
// they are unable to commit. Data sources MUST call Commit when data is committed,
// otherwise it should be assumed that indexers have not persisted their state.
Commit func(CommitData) error
// Commit is designed to support async processing so that implementations may return
// a completion callback to wait for commit to complete. Callers should first check
// if err is nil and then if it is, check if completionCallback is nil and if not
// call it and check for an error. Commit should be designed to be non-blocking if
// possible, but calling completionCallback should be blocking.
Commit func(CommitData) (completionCallback func() error, err error)
}

View File

@ -107,20 +107,33 @@ func ListenerMux(listeners ...Listener) Listener {
}
}
commitCbs := make([]func(CommitData) error, 0, len(listeners))
commitCbs := make([]func(CommitData) (func() error, error), 0, len(listeners))
for _, l := range listeners {
if l.Commit != nil {
commitCbs = append(commitCbs, l.Commit)
}
}
if len(commitCbs) > 0 {
mux.Commit = func(data CommitData) error {
n := len(commitCbs)
if n > 0 {
mux.Commit = func(data CommitData) (func() error, error) {
waitCbs := make([]func() error, 0, n)
for _, cb := range commitCbs {
if err := cb(data); err != nil {
return err
wait, err := cb(data)
if err != nil {
return nil, err
}
if wait != nil {
waitCbs = append(waitCbs, wait)
}
}
return nil
return func() error {
for _, cb := range waitCbs {
if err := cb(); err != nil {
return err
}
}
return nil
}, nil
}
}

View File

@ -40,7 +40,12 @@ func TestListenerMux(t *testing.T) {
res := ListenerMux(callCollector(1, onCall), callCollector(2, onCall))
callAllCallbacksOnces(t, res)
completeCb := callAllCallbacksOnces(t, res)
if completeCb != nil {
if err := completeCb(); err != nil {
t.Fatal(err)
}
}
checkExpectedCallOrder(t, calls, []string{
"InitializeModuleData 1",
@ -61,7 +66,7 @@ func TestListenerMux(t *testing.T) {
})
}
func callAllCallbacksOnces(t *testing.T, listener Listener) {
func callAllCallbacksOnces(t *testing.T, listener Listener) (completeCb func() error) {
t.Helper()
if err := listener.InitializeModuleData(ModuleInitializationData{}); err != nil {
t.Error(err)
@ -81,9 +86,12 @@ func callAllCallbacksOnces(t *testing.T, listener Listener) {
if err := listener.OnObjectUpdate(ObjectUpdateData{}); err != nil {
t.Error(err)
}
if err := listener.Commit(CommitData{}); err != nil {
var err error
completeCb, err = listener.Commit(CommitData{})
if err != nil {
t.Error(err)
}
return completeCb
}
func callCollector(i int, onCall func(string, int, Packet)) Listener {
@ -112,9 +120,9 @@ func callCollector(i int, onCall func(string, int, Packet)) Listener {
onCall("OnObjectUpdate", i, nil)
return nil
},
Commit: func(CommitData) error {
Commit: func(data CommitData) (completionCallback func() error, err error) {
onCall("Commit", i, nil)
return nil
return nil, nil
},
}
}

View File

@ -57,5 +57,12 @@ func (c CommitData) apply(l *Listener) error {
if l.Commit == nil {
return nil
}
return l.Commit(c)
cb, err := l.Commit(c)
if err != nil {
return err
}
if cb != nil {
return cb()
}
return nil
}

View File

@ -84,9 +84,9 @@ func writerListener(w io.Writer) appdata.Listener {
OnTx: nil,
OnEvent: nil,
OnKVPair: nil,
Commit: func(data appdata.CommitData) error {
_, err := fmt.Fprintf(w, "Commit: %v\n", data)
return err
Commit: func(data appdata.CommitData) (completionCallback func() error, err error) {
_, err = fmt.Fprintf(w, "Commit: %v\n", data)
return nil, err
},
InitializeModuleData: func(data appdata.ModuleInitializationData) error {
bz, err := json.Marshal(data)