diff --git a/indexer/postgres/indexer.go b/indexer/postgres/indexer.go index a69eefff1c..ab911d8d22 100644 --- a/indexer/postgres/indexer.go +++ b/indexer/postgres/indexer.go @@ -68,14 +68,14 @@ func StartIndexer(ctx context.Context, logger SqlLogger, config Config) (appdata return mm.InitializeSchema(ctx, tx) }, - Commit: func(data appdata.CommitData) error { + Commit: func(data appdata.CommitData) (completionCallback func() error, err error) { err = tx.Commit() if err != nil { - return err + return nil, err } tx, err = db.BeginTx(ctx, nil) - return err + return nil, err }, }, nil } diff --git a/indexer/postgres/tests/init_schema_test.go b/indexer/postgres/tests/init_schema_test.go index 8c4288ba34..ae7da66c52 100644 --- a/indexer/postgres/tests/init_schema_test.go +++ b/indexer/postgres/tests/init_schema_test.go @@ -58,7 +58,11 @@ func testInitSchema(t *testing.T, disableRetainDeletions bool, goldenFileName st })) require.NotNil(t, listener.Commit) - require.NoError(t, listener.Commit(appdata.CommitData{})) + cb, err := listener.Commit(appdata.CommitData{}) + require.NoError(t, err) + if cb != nil { + require.NoError(t, cb()) + } golden.Assert(t, buf.String(), goldenFileName) } diff --git a/schema/appdata/async.go b/schema/appdata/async.go index 4112ae839f..59c671186e 100644 --- a/schema/appdata/async.go +++ b/schema/appdata/async.go @@ -19,50 +19,24 @@ type AsyncListenerOptions struct { DoneWaitGroup *sync.WaitGroup } -// AsyncListenerMux returns a listener that forwards received events to all the provided listeners asynchronously -// with each listener processing in a separate go routine. All callbacks in the returned listener will return nil -// except for Commit which will return an error or nil once all listeners have processed the commit. The context -// is used to signal that the listeners should stop listening and return. bufferSize is the size of the buffer for the -// channels used to send events to the listeners. +// AsyncListenerMux is a convenience function that calls AsyncListener for each listener +// with the provided options and combines them using ListenerMux. func AsyncListenerMux(opts AsyncListenerOptions, listeners ...Listener) Listener { asyncListeners := make([]Listener, len(listeners)) - commitChans := make([]chan error, len(listeners)) for i, l := range listeners { - commitChan := make(chan error) - commitChans[i] = commitChan - asyncListeners[i] = AsyncListener(opts, commitChan, l) + asyncListeners[i] = AsyncListener(opts, l) } - mux := ListenerMux(asyncListeners...) - muxCommit := mux.Commit - mux.Commit = func(data CommitData) error { - if muxCommit != nil { - err := muxCommit(data) - if err != nil { - return err - } - } - - for _, commitChan := range commitChans { - err := <-commitChan - if err != nil { - return err - } - } - return nil - } - - return mux + return ListenerMux(asyncListeners...) } // AsyncListener returns a listener that forwards received events to the provided listener listening in asynchronously // in a separate go routine. The listener that is returned will return nil for all methods including Commit and -// an error or nil will only be returned in commitChan once the sender has sent commit and the receiving listener has -// processed it. Thus commitChan can be used as a synchronization and error checking mechanism. The go routine +// an error or nil will only be returned when the callback returned by Commit is called. +// Thus Commit() can be used as a synchronization and error checking mechanism. The go routine // that is being used for listening will exit when context.Done() returns and no more events will be received by the listener. // bufferSize is the size of the buffer for the channel that is used to send events to the listener. -// Instead of using AsyncListener directly, it is recommended to use AsyncListenerMux which does coordination directly -// via its Commit callback. -func AsyncListener(opts AsyncListenerOptions, commitChan chan<- error, listener Listener) Listener { +func AsyncListener(opts AsyncListenerOptions, listener Listener) Listener { + commitChan := make(chan error) packetChan := make(chan Packet, opts.BufferSize) res := Listener{} ctx := opts.Context @@ -151,11 +125,11 @@ func AsyncListener(opts AsyncListenerOptions, commitChan chan<- error, listener } } - if listener.Commit != nil { - res.Commit = func(data CommitData) error { - packetChan <- data - return nil - } + res.Commit = func(data CommitData) (func() error, error) { + packetChan <- data + return func() error { + return <-commitChan + }, nil } return res diff --git a/schema/appdata/async_test.go b/schema/appdata/async_test.go index c1df2d4ca6..856575f71b 100644 --- a/schema/appdata/async_test.go +++ b/schema/appdata/async_test.go @@ -47,7 +47,12 @@ func TestAsyncListenerMux(t *testing.T) { BufferSize: 16, Context: ctx, DoneWaitGroup: wg, }, listener1, listener2) - callAllCallbacksOnces(t, res) + completeCb := callAllCallbacksOnces(t, res) + if completeCb != nil { + if err := completeCb(); err != nil { + t.Fatal(err) + } + } expectedCalls := []string{ "InitializeModuleData", @@ -72,15 +77,23 @@ func TestAsyncListenerMux(t *testing.T) { listener1 := callCollector(1, func(name string, _ int, _ Packet) { calls1 = append(calls1, name) }) - listener1.Commit = func(data CommitData) error { - return fmt.Errorf("error") + listener1.Commit = func(data CommitData) (completionCallback func() error, err error) { + return nil, fmt.Errorf("error") } listener2 := callCollector(2, func(name string, _ int, _ Packet) { calls2 = append(calls2, name) }) res := AsyncListenerMux(AsyncListenerOptions{}, listener1, listener2) - err := res.Commit(CommitData{}) + cb, err := res.Commit(CommitData{}) + if err != nil { + t.Fatalf("expected first error to be nil, got %v", err) + } + if cb == nil { + t.Fatalf("expected completion callback") + } + + err = cb() if err == nil || err.Error() != "error" { t.Fatalf("expected error, got %v", err) } @@ -89,21 +102,19 @@ func TestAsyncListenerMux(t *testing.T) { func TestAsyncListener(t *testing.T) { t.Run("call cancel", func(t *testing.T) { - commitChan := make(chan error) ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} var calls []string listener := callCollector(1, func(name string, _ int, _ Packet) { calls = append(calls, name) }) - res := AsyncListener(AsyncListenerOptions{BufferSize: 16, Context: ctx, DoneWaitGroup: wg}, - commitChan, listener) + res := AsyncListener(AsyncListenerOptions{BufferSize: 16, Context: ctx, DoneWaitGroup: wg}, listener) - callAllCallbacksOnces(t, res) - - err := <-commitChan - if err != nil { - t.Fatalf("expected nil, got %v", err) + completeCb := callAllCallbacksOnces(t, res) + if completeCb != nil { + if err := completeCb(); err != nil { + t.Fatal(err) + } } checkExpectedCallOrder(t, calls, []string{ @@ -124,7 +135,6 @@ func TestAsyncListener(t *testing.T) { }) t.Run("error", func(t *testing.T) { - commitChan := make(chan error) var calls []string listener := callCollector(1, func(name string, _ int, _ Packet) { calls = append(calls, name) @@ -134,11 +144,14 @@ func TestAsyncListener(t *testing.T) { return fmt.Errorf("error") } - res := AsyncListener(AsyncListenerOptions{BufferSize: 16}, commitChan, listener) + res := AsyncListener(AsyncListenerOptions{BufferSize: 16}, listener) - callAllCallbacksOnces(t, res) + completeCb := callAllCallbacksOnces(t, res) + if completeCb == nil { + t.Fatalf("expected completion callback") + } - err := <-commitChan + err := completeCb() if err == nil || err.Error() != "error" { t.Fatalf("expected error, got %v", err) } diff --git a/schema/appdata/forwarder.go b/schema/appdata/forwarder.go index 2fb4113c85..973b3c3bba 100644 --- a/schema/appdata/forwarder.go +++ b/schema/appdata/forwarder.go @@ -10,6 +10,6 @@ func PacketForwarder(f func(Packet) error) Listener { OnKVPair: func(data KVPairData) error { return f(data) }, OnObjectUpdate: func(data ObjectUpdateData) error { return f(data) }, StartBlock: func(data StartBlockData) error { return f(data) }, - Commit: func(data CommitData) error { return f(data) }, + Commit: func(data CommitData) (func() error, error) { return nil, f(data) }, } } diff --git a/schema/appdata/listener.go b/schema/appdata/listener.go index d4786cb025..0a7efa926a 100644 --- a/schema/appdata/listener.go +++ b/schema/appdata/listener.go @@ -37,5 +37,10 @@ type Listener struct { // indexers should commit their data when this is called and return an error if // they are unable to commit. Data sources MUST call Commit when data is committed, // otherwise it should be assumed that indexers have not persisted their state. - Commit func(CommitData) error + // Commit is designed to support async processing so that implementations may return + // a completion callback to wait for commit to complete. Callers should first check + // if err is nil and then if it is, check if completionCallback is nil and if not + // call it and check for an error. Commit should be designed to be non-blocking if + // possible, but calling completionCallback should be blocking. + Commit func(CommitData) (completionCallback func() error, err error) } diff --git a/schema/appdata/mux.go b/schema/appdata/mux.go index 8e6b886577..c35472dc01 100644 --- a/schema/appdata/mux.go +++ b/schema/appdata/mux.go @@ -107,20 +107,33 @@ func ListenerMux(listeners ...Listener) Listener { } } - commitCbs := make([]func(CommitData) error, 0, len(listeners)) + commitCbs := make([]func(CommitData) (func() error, error), 0, len(listeners)) for _, l := range listeners { if l.Commit != nil { commitCbs = append(commitCbs, l.Commit) } } - if len(commitCbs) > 0 { - mux.Commit = func(data CommitData) error { + n := len(commitCbs) + if n > 0 { + mux.Commit = func(data CommitData) (func() error, error) { + waitCbs := make([]func() error, 0, n) for _, cb := range commitCbs { - if err := cb(data); err != nil { - return err + wait, err := cb(data) + if err != nil { + return nil, err + } + if wait != nil { + waitCbs = append(waitCbs, wait) } } - return nil + return func() error { + for _, cb := range waitCbs { + if err := cb(); err != nil { + return err + } + } + return nil + }, nil } } diff --git a/schema/appdata/mux_test.go b/schema/appdata/mux_test.go index 70787fada8..4f3897238c 100644 --- a/schema/appdata/mux_test.go +++ b/schema/appdata/mux_test.go @@ -40,7 +40,12 @@ func TestListenerMux(t *testing.T) { res := ListenerMux(callCollector(1, onCall), callCollector(2, onCall)) - callAllCallbacksOnces(t, res) + completeCb := callAllCallbacksOnces(t, res) + if completeCb != nil { + if err := completeCb(); err != nil { + t.Fatal(err) + } + } checkExpectedCallOrder(t, calls, []string{ "InitializeModuleData 1", @@ -61,7 +66,7 @@ func TestListenerMux(t *testing.T) { }) } -func callAllCallbacksOnces(t *testing.T, listener Listener) { +func callAllCallbacksOnces(t *testing.T, listener Listener) (completeCb func() error) { t.Helper() if err := listener.InitializeModuleData(ModuleInitializationData{}); err != nil { t.Error(err) @@ -81,9 +86,12 @@ func callAllCallbacksOnces(t *testing.T, listener Listener) { if err := listener.OnObjectUpdate(ObjectUpdateData{}); err != nil { t.Error(err) } - if err := listener.Commit(CommitData{}); err != nil { + var err error + completeCb, err = listener.Commit(CommitData{}) + if err != nil { t.Error(err) } + return completeCb } func callCollector(i int, onCall func(string, int, Packet)) Listener { @@ -112,9 +120,9 @@ func callCollector(i int, onCall func(string, int, Packet)) Listener { onCall("OnObjectUpdate", i, nil) return nil }, - Commit: func(CommitData) error { + Commit: func(data CommitData) (completionCallback func() error, err error) { onCall("Commit", i, nil) - return nil + return nil, nil }, } } diff --git a/schema/appdata/packet.go b/schema/appdata/packet.go index e5fe6be966..e4f6d94e5d 100644 --- a/schema/appdata/packet.go +++ b/schema/appdata/packet.go @@ -57,5 +57,12 @@ func (c CommitData) apply(l *Listener) error { if l.Commit == nil { return nil } - return l.Commit(c) + cb, err := l.Commit(c) + if err != nil { + return err + } + if cb != nil { + return cb() + } + return nil } diff --git a/schema/testing/appdatasim/app_data_test.go b/schema/testing/appdatasim/app_data_test.go index 29701c4411..eba8a8984f 100644 --- a/schema/testing/appdatasim/app_data_test.go +++ b/schema/testing/appdatasim/app_data_test.go @@ -84,9 +84,9 @@ func writerListener(w io.Writer) appdata.Listener { OnTx: nil, OnEvent: nil, OnKVPair: nil, - Commit: func(data appdata.CommitData) error { - _, err := fmt.Fprintf(w, "Commit: %v\n", data) - return err + Commit: func(data appdata.CommitData) (completionCallback func() error, err error) { + _, err = fmt.Fprintf(w, "Commit: %v\n", data) + return nil, err }, InitializeModuleData: func(data appdata.ModuleInitializationData) error { bz, err := json.Marshal(data)