diff --git a/README.md b/README.md index aada1b37..b2546848 100644 --- a/README.md +++ b/README.md @@ -329,7 +329,7 @@ The config provides information for composing a set of transformers: rank = "0" [exporter.transformer2] path = "path/to/transformer2" - type = "eth_event" + type = "eth_generic" repository = "github.com/account/repo" migrations = "db/migrations" rank = "0" @@ -359,6 +359,9 @@ The config provides information for composing a set of transformers: that fetches state and storage diffs from an ETH node (instead of, for example, from IPFS) - `eth_event` indicates the transformer works with the [event watcher](https://github.com/vulcanize/maker-vulcanizedb/blob/staging/libraries/shared/watcher/event_watcher.go) that fetches event logs from an ETH node + - `eth_generic` indicates the transformer works with the [generic watcher](https://github.com/vulcanize/maker-vulcanizedb/blob/omni_update/libraries/shared/watcher/generic_watcher.go) + that is made to work with [omni pkg](https://github.com/vulcanize/maker-vulcanizedb/tree/staging/pkg/omni) + based transformers which work with either a light or full sync vDB to watch events and poll public methods - `migrations` is the relative path from `repository` to the db migrations directory for the transformer - `rank` determines the order that migrations are ran, with lower ranked migrations running first - this is to help isolate any potential conflicts between transformer migrations @@ -390,14 +393,15 @@ type exporter string var Exporter exporter -func (e exporter) Export() []interface1.EventTransformerInitializer, []interface1.StorageTransformerInitializer { - return []interface1.EventTransformerInitializer{ - transformer1.EventTransformerInitializer, - transformer2.EventTransformerInitializer, - transformer3.EventTransformerInitializer, - }, []interface1.StorageTransformerInitializer{ - transformer4.StorageTransformerInitializer, - } +func (e exporter) Export() []interface1.EventTransformerInitializer, []interface1.StorageTransformerInitializer, []interface1.GenericTransformerInitializer { + return []interface1.TransformerInitializer{ + transformer1.TransformerInitializer, + transformer3.TransformerInitializer, + }, []interface1.StorageTransformerInitializer{ + transformer4.StorageTransformerInitializer, + }, []interface1.GenericTransformerInitializer{ + transformer2.TransformerInitializer, + } } ``` @@ -405,10 +409,12 @@ func (e exporter) Export() []interface1.EventTransformerInitializer, []interface To plug in an external transformer we need to: * Create a [package](https://github.com/vulcanize/ens_transformers/blob/working/transformers/registry/new_owner/initializer/initializer.go) -that exports a variable `EventTransformerInitializer` or `StorageTransformerInitializer` that are of type [EventTransformerInitializer](https://github.com/vulcanize/maker-vulcanizedb/blob/staging/libraries/shared/transformer/event_transformer.go#L33) -or [StorageTransformerInitializer](https://github.com/vulcanize/maker-vulcanizedb/blob/staging/libraries/shared/transformer/storage_transformer.go#L31), respectively -* Design the transformers to work in the context of their [event](https://github.com/vulcanize/maker-vulcanizedb/blob/staging/libraries/shared/watcher/event_watcher.go#L83) -or [storage](https://github.com/vulcanize/maker-vulcanizedb/blob/staging/libraries/shared/watcher/storage_watcher.go#L58) watcher execution modes +that exports a variable `TransformerInitializer`, `StorageTransformerInitializer`, or `GenericTransformerInitializer` that are of type [TransformerInitializer](https://github.com/vulcanize/maker-vulcanizedb/blob/compose_and_execute/libraries/shared/transformer/event_transformer.go#L33) +or [StorageTransformerInitializer](https://github.com/vulcanize/maker-vulcanizedb/blob/compose_and_execute/libraries/shared/transformer/storage_transformer.go#L31), +or [GenericTransformerInitializer](https://github.com/vulcanize/maker-vulcanizedb/blob/omni_update/libraries/shared/transformer/generic_transformer.go#L31), respectively +* Design the transformers to work in the context of their [event](https://github.com/vulcanize/maker-vulcanizedb/blob/compose_and_execute/libraries/shared/watcher/event_watcher.go#L83), +[storage](https://github.com/vulcanize/maker-vulcanizedb/blob/compose_and_execute/libraries/shared/watcher/storage_watcher.go#L53), +or [generic](https://github.com/vulcanize/maker-vulcanizedb/blob/omni_update/libraries/shared/watcher/generic_watcher.go#L68) watcher execution modes * Create db migrations to run against vulcanizeDB so that we can store the transformer output * Do not `goose fix` the transformer migrations * Specify migration locations for each transformer in the config with the `exporter.transformer.migrations` fields diff --git a/cmd/compose.go b/cmd/compose.go index 72f6a299..93b51af9 100644 --- a/cmd/compose.go +++ b/cmd/compose.go @@ -62,7 +62,7 @@ var composeCmd = &cobra.Command{ rank = "0" [exporter.transformer2] path = "path/to/transformer2" - type = "eth_event" + type = "eth_generic" repository = "github.com/account/repo" migrations = "db/migrations" rank = "0" @@ -91,7 +91,8 @@ from it and loaded into and executed over by the appropriate watcher. The type of watcher that the transformer works with is specified using the type variable for each transformer in the config. Currently there are watchers of event data from an eth node (eth_event) and storage data from an eth node -(eth_storage). +(eth_storage), and a more generic interface for accepting omni pkg based transformers +which can perform both event watching and public method polling. Transformers of different types can be ran together in the same command using a single config file or in separate command instances using different config files diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index aef951a3..b74cf280 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -62,7 +62,7 @@ var composeAndExecuteCmd = &cobra.Command{ rank = "0" [exporter.transformer2] path = "path/to/transformer2" - type = "eth_event" + type = "eth_generic" repository = "github.com/account/repo" migrations = "db/migrations" rank = "2" @@ -91,7 +91,8 @@ from it and loaded into and executed over by the appropriate watcher. The type of watcher that the transformer works with is specified using the type variable for each transformer in the config. Currently there are watchers of event data from an eth node (eth_event) and storage data from an eth node -(eth_storage). +(eth_storage), and a more generic interface for accepting omni pkg based transformers +which can perform both event watching and public method polling. Transformers of different types can be ran together in the same command using a single config file or in separate command instances using different config files @@ -150,7 +151,7 @@ func composeAndExecute() { } // Use the Exporters export method to load the EventTransformerInitializer and StorageTransformerInitializer sets - ethEventInitializers, ethStorageInitializers := exporter.Export() + ethEventInitializers, ethStorageInitializers, genericInitializers := exporter.Export() // Setup bc and db objects blockChain := getBlockChain() @@ -173,6 +174,13 @@ func composeAndExecute() { wg.Add(1) go watchEthStorage(&sw, &wg) } + + if len(genericInitializers) > 0 { + gw := watcher.NewGenericWatcher(&db, blockChain) + gw.AddTransformers(genericInitializers) + wg.Add(1) + go genericWatching(&gw, &wg) + } wg.Wait() } diff --git a/cmd/execute.go b/cmd/execute.go index da4bea4e..b76bbb59 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -100,7 +100,7 @@ func execute() { } // Use the Exporters export method to load the EventTransformerInitializer and StorageTransformerInitializer sets - ethEventInitializers, ethStorageInitializers := exporter.Export() + ethEventInitializers, ethStorageInitializers, genericInitializers := exporter.Export() // Setup bc and db objects blockChain := getBlockChain() @@ -123,6 +123,13 @@ func execute() { wg.Add(1) go watchEthStorage(&sw, &wg) } + + if len(genericInitializers) > 0 { + gw := watcher.NewGenericWatcher(&db, blockChain) + gw.AddTransformers(genericInitializers) + wg.Add(1) + go genericWatching(&gw, &wg) + } wg.Wait() } @@ -132,7 +139,7 @@ func init() { } type Exporter interface { - Export() ([]transformer.EventTransformerInitializer, []transformer.StorageTransformerInitializer) + Export() ([]transformer.EventTransformerInitializer, []transformer.StorageTransformerInitializer, []transformer.GenericTransformerInitializer) } func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) { @@ -157,7 +164,7 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) { func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) { defer wg.Done() - // Execute over the StorageTransformerInitializer set using the watcher + // Execute over the StorageTransformerInitializer set using the storage watcher log.Info("executing storage transformers") ticker := time.NewTicker(pollingInterval) defer ticker.Stop() @@ -168,3 +175,17 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) { } } } + +func genericWatching(w *watcher.GenericWatcher, wg *syn.WaitGroup) { + defer wg.Done() + // Execute over the GenericTransformerInitializer set using the generic watcher + log.Info("executing generic transformers") + ticker := time.NewTicker(pollingInterval) + defer ticker.Stop() + for range ticker.C { + err := w.Execute(nil) + if err != nil { + // TODO Handle watcher errors in execute + } + } +} diff --git a/libraries/shared/watcher/generic_watcher.go b/libraries/shared/watcher/generic_watcher.go index 2407f175..5a49d2fd 100644 --- a/libraries/shared/watcher/generic_watcher.go +++ b/libraries/shared/watcher/generic_watcher.go @@ -36,8 +36,8 @@ type GenericWatcher struct { BlockChain core.BlockChain } -func NewGenericWatcher(db *postgres.DB, bc core.BlockChain) *GenericWatcher { - return &GenericWatcher{ +func NewGenericWatcher(db *postgres.DB, bc core.BlockChain) GenericWatcher { + return GenericWatcher{ DB: db, BlockChain: bc, } diff --git a/pkg/config/plugin.go b/pkg/config/plugin.go index dbe6a2f3..f521cb43 100644 --- a/pkg/config/plugin.go +++ b/pkg/config/plugin.go @@ -111,6 +111,7 @@ const ( UnknownTransformerType TransformerType = iota EthEvent EthStorage + EthGeneric ) func (pt TransformerType) String() string { @@ -118,9 +119,10 @@ func (pt TransformerType) String() string { "Unknown", "eth_event", "eth_storage", + "eth_generic", } - if pt > EthStorage || pt < EthEvent { + if pt > EthGeneric || pt < EthEvent { return "Unknown" } @@ -131,6 +133,7 @@ func GetTransformerType(str string) TransformerType { types := [...]TransformerType{ EthEvent, EthStorage, + EthGeneric, } for _, ty := range types { diff --git a/pkg/plugin/writer/writer.go b/pkg/plugin/writer/writer.go index 6c0b4ffd..33e8a84c 100644 --- a/pkg/plugin/writer/writer.go +++ b/pkg/plugin/writer/writer.go @@ -74,13 +74,17 @@ func (w *writer) WritePlugin() error { f.Func().Params(Id("e").Id("exporter")).Id("Export").Params().Parens(List( Index().Qual("github.com/vulcanize/vulcanizedb/libraries/shared/transformer", "EventTransformerInitializer"), Index().Qual("github.com/vulcanize/vulcanizedb/libraries/shared/transformer", "StorageTransformerInitializer"), + Index().Qual("github.com/vulcanize/vulcanizedb/libraries/shared/transformer", "GenericTransformerInitializer"), )).Block(Return( Index().Qual( "github.com/vulcanize/vulcanizedb/libraries/shared/transformer", "EventTransformerInitializer").Values(code[config.EthEvent]...), Index().Qual( "github.com/vulcanize/vulcanizedb/libraries/shared/transformer", - "StorageTransformerInitializer").Values(code[config.EthStorage]...))) // Exports the collected event and storage transformer initializers + "StorageTransformerInitializer").Values(code[config.EthStorage]...), + Index().Qual( + "github.com/vulcanize/vulcanizedb/libraries/shared/transformer", + "GenericTransformerInitializer").Values(code[config.EthGeneric]...))) // Exports the collected event and storage transformer initializers // Write code to destination file err = f.Save(goFile) @@ -100,6 +104,8 @@ func (w *writer) collectTransformers() (map[config.TransformerType][]Code, error code[config.EthEvent] = append(code[config.EthEvent], Qual(path, "EventTransformerInitializer")) case config.EthStorage: code[config.EthStorage] = append(code[config.EthStorage], Qual(path, "StorageTransformerInitializer")) + case config.EthGeneric: + code[config.EthGeneric] = append(code[config.EthGeneric], Qual(path, "GenericTransformerInitializer")) default: return nil, errors.New(fmt.Sprintf("invalid transformer type %s", transformer.Type)) }