forked from cerc-io/ipld-eth-server
changes to plugin and commands to accomodate changes
This commit is contained in:
parent
05c3b9bb48
commit
e4e092f542
32
README.md
32
README.md
@ -329,7 +329,7 @@ The config provides information for composing a set of transformers:
|
||||
rank = "0"
|
||||
[exporter.transformer2]
|
||||
path = "path/to/transformer2"
|
||||
type = "eth_event"
|
||||
type = "eth_generic"
|
||||
repository = "github.com/account/repo"
|
||||
migrations = "db/migrations"
|
||||
rank = "0"
|
||||
@ -359,6 +359,9 @@ The config provides information for composing a set of transformers:
|
||||
that fetches state and storage diffs from an ETH node (instead of, for example, from IPFS)
|
||||
- `eth_event` indicates the transformer works with the [event watcher](https://github.com/vulcanize/maker-vulcanizedb/blob/staging/libraries/shared/watcher/event_watcher.go)
|
||||
that fetches event logs from an ETH node
|
||||
- `eth_generic` indicates the transformer works with the [generic watcher](https://github.com/vulcanize/maker-vulcanizedb/blob/omni_update/libraries/shared/watcher/generic_watcher.go)
|
||||
that is made to work with [omni pkg](https://github.com/vulcanize/maker-vulcanizedb/tree/staging/pkg/omni)
|
||||
based transformers which work with either a light or full sync vDB to watch events and poll public methods
|
||||
- `migrations` is the relative path from `repository` to the db migrations directory for the transformer
|
||||
- `rank` determines the order that migrations are ran, with lower ranked migrations running first
|
||||
- this is to help isolate any potential conflicts between transformer migrations
|
||||
@ -390,14 +393,15 @@ type exporter string
|
||||
|
||||
var Exporter exporter
|
||||
|
||||
func (e exporter) Export() []interface1.EventTransformerInitializer, []interface1.StorageTransformerInitializer {
|
||||
return []interface1.EventTransformerInitializer{
|
||||
transformer1.EventTransformerInitializer,
|
||||
transformer2.EventTransformerInitializer,
|
||||
transformer3.EventTransformerInitializer,
|
||||
}, []interface1.StorageTransformerInitializer{
|
||||
transformer4.StorageTransformerInitializer,
|
||||
}
|
||||
func (e exporter) Export() []interface1.EventTransformerInitializer, []interface1.StorageTransformerInitializer, []interface1.GenericTransformerInitializer {
|
||||
return []interface1.TransformerInitializer{
|
||||
transformer1.TransformerInitializer,
|
||||
transformer3.TransformerInitializer,
|
||||
}, []interface1.StorageTransformerInitializer{
|
||||
transformer4.StorageTransformerInitializer,
|
||||
}, []interface1.GenericTransformerInitializer{
|
||||
transformer2.TransformerInitializer,
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
@ -405,10 +409,12 @@ func (e exporter) Export() []interface1.EventTransformerInitializer, []interface
|
||||
To plug in an external transformer we need to:
|
||||
|
||||
* Create a [package](https://github.com/vulcanize/ens_transformers/blob/working/transformers/registry/new_owner/initializer/initializer.go)
|
||||
that exports a variable `EventTransformerInitializer` or `StorageTransformerInitializer` that are of type [EventTransformerInitializer](https://github.com/vulcanize/maker-vulcanizedb/blob/staging/libraries/shared/transformer/event_transformer.go#L33)
|
||||
or [StorageTransformerInitializer](https://github.com/vulcanize/maker-vulcanizedb/blob/staging/libraries/shared/transformer/storage_transformer.go#L31), respectively
|
||||
* Design the transformers to work in the context of their [event](https://github.com/vulcanize/maker-vulcanizedb/blob/staging/libraries/shared/watcher/event_watcher.go#L83)
|
||||
or [storage](https://github.com/vulcanize/maker-vulcanizedb/blob/staging/libraries/shared/watcher/storage_watcher.go#L58) watcher execution modes
|
||||
that exports a variable `TransformerInitializer`, `StorageTransformerInitializer`, or `GenericTransformerInitializer` that are of type [TransformerInitializer](https://github.com/vulcanize/maker-vulcanizedb/blob/compose_and_execute/libraries/shared/transformer/event_transformer.go#L33)
|
||||
or [StorageTransformerInitializer](https://github.com/vulcanize/maker-vulcanizedb/blob/compose_and_execute/libraries/shared/transformer/storage_transformer.go#L31),
|
||||
or [GenericTransformerInitializer](https://github.com/vulcanize/maker-vulcanizedb/blob/omni_update/libraries/shared/transformer/generic_transformer.go#L31), respectively
|
||||
* Design the transformers to work in the context of their [event](https://github.com/vulcanize/maker-vulcanizedb/blob/compose_and_execute/libraries/shared/watcher/event_watcher.go#L83),
|
||||
[storage](https://github.com/vulcanize/maker-vulcanizedb/blob/compose_and_execute/libraries/shared/watcher/storage_watcher.go#L53),
|
||||
or [generic](https://github.com/vulcanize/maker-vulcanizedb/blob/omni_update/libraries/shared/watcher/generic_watcher.go#L68) watcher execution modes
|
||||
* Create db migrations to run against vulcanizeDB so that we can store the transformer output
|
||||
* Do not `goose fix` the transformer migrations
|
||||
* Specify migration locations for each transformer in the config with the `exporter.transformer.migrations` fields
|
||||
|
@ -62,7 +62,7 @@ var composeCmd = &cobra.Command{
|
||||
rank = "0"
|
||||
[exporter.transformer2]
|
||||
path = "path/to/transformer2"
|
||||
type = "eth_event"
|
||||
type = "eth_generic"
|
||||
repository = "github.com/account/repo"
|
||||
migrations = "db/migrations"
|
||||
rank = "0"
|
||||
@ -91,7 +91,8 @@ from it and loaded into and executed over by the appropriate watcher.
|
||||
The type of watcher that the transformer works with is specified using the
|
||||
type variable for each transformer in the config. Currently there are watchers
|
||||
of event data from an eth node (eth_event) and storage data from an eth node
|
||||
(eth_storage).
|
||||
(eth_storage), and a more generic interface for accepting omni pkg based transformers
|
||||
which can perform both event watching and public method polling.
|
||||
|
||||
Transformers of different types can be ran together in the same command using a
|
||||
single config file or in separate command instances using different config files
|
||||
|
@ -62,7 +62,7 @@ var composeAndExecuteCmd = &cobra.Command{
|
||||
rank = "0"
|
||||
[exporter.transformer2]
|
||||
path = "path/to/transformer2"
|
||||
type = "eth_event"
|
||||
type = "eth_generic"
|
||||
repository = "github.com/account/repo"
|
||||
migrations = "db/migrations"
|
||||
rank = "2"
|
||||
@ -91,7 +91,8 @@ from it and loaded into and executed over by the appropriate watcher.
|
||||
The type of watcher that the transformer works with is specified using the
|
||||
type variable for each transformer in the config. Currently there are watchers
|
||||
of event data from an eth node (eth_event) and storage data from an eth node
|
||||
(eth_storage).
|
||||
(eth_storage), and a more generic interface for accepting omni pkg based transformers
|
||||
which can perform both event watching and public method polling.
|
||||
|
||||
Transformers of different types can be ran together in the same command using a
|
||||
single config file or in separate command instances using different config files
|
||||
@ -150,7 +151,7 @@ func composeAndExecute() {
|
||||
}
|
||||
|
||||
// Use the Exporters export method to load the EventTransformerInitializer and StorageTransformerInitializer sets
|
||||
ethEventInitializers, ethStorageInitializers := exporter.Export()
|
||||
ethEventInitializers, ethStorageInitializers, genericInitializers := exporter.Export()
|
||||
|
||||
// Setup bc and db objects
|
||||
blockChain := getBlockChain()
|
||||
@ -173,6 +174,13 @@ func composeAndExecute() {
|
||||
wg.Add(1)
|
||||
go watchEthStorage(&sw, &wg)
|
||||
}
|
||||
|
||||
if len(genericInitializers) > 0 {
|
||||
gw := watcher.NewGenericWatcher(&db, blockChain)
|
||||
gw.AddTransformers(genericInitializers)
|
||||
wg.Add(1)
|
||||
go genericWatching(&gw, &wg)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
|
@ -100,7 +100,7 @@ func execute() {
|
||||
}
|
||||
|
||||
// Use the Exporters export method to load the EventTransformerInitializer and StorageTransformerInitializer sets
|
||||
ethEventInitializers, ethStorageInitializers := exporter.Export()
|
||||
ethEventInitializers, ethStorageInitializers, genericInitializers := exporter.Export()
|
||||
|
||||
// Setup bc and db objects
|
||||
blockChain := getBlockChain()
|
||||
@ -123,6 +123,13 @@ func execute() {
|
||||
wg.Add(1)
|
||||
go watchEthStorage(&sw, &wg)
|
||||
}
|
||||
|
||||
if len(genericInitializers) > 0 {
|
||||
gw := watcher.NewGenericWatcher(&db, blockChain)
|
||||
gw.AddTransformers(genericInitializers)
|
||||
wg.Add(1)
|
||||
go genericWatching(&gw, &wg)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
@ -132,7 +139,7 @@ func init() {
|
||||
}
|
||||
|
||||
type Exporter interface {
|
||||
Export() ([]transformer.EventTransformerInitializer, []transformer.StorageTransformerInitializer)
|
||||
Export() ([]transformer.EventTransformerInitializer, []transformer.StorageTransformerInitializer, []transformer.GenericTransformerInitializer)
|
||||
}
|
||||
|
||||
func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
|
||||
@ -157,7 +164,7 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
|
||||
|
||||
func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
|
||||
defer wg.Done()
|
||||
// Execute over the StorageTransformerInitializer set using the watcher
|
||||
// Execute over the StorageTransformerInitializer set using the storage watcher
|
||||
log.Info("executing storage transformers")
|
||||
ticker := time.NewTicker(pollingInterval)
|
||||
defer ticker.Stop()
|
||||
@ -168,3 +175,17 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func genericWatching(w *watcher.GenericWatcher, wg *syn.WaitGroup) {
|
||||
defer wg.Done()
|
||||
// Execute over the GenericTransformerInitializer set using the generic watcher
|
||||
log.Info("executing generic transformers")
|
||||
ticker := time.NewTicker(pollingInterval)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
err := w.Execute(nil)
|
||||
if err != nil {
|
||||
// TODO Handle watcher errors in execute
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -36,8 +36,8 @@ type GenericWatcher struct {
|
||||
BlockChain core.BlockChain
|
||||
}
|
||||
|
||||
func NewGenericWatcher(db *postgres.DB, bc core.BlockChain) *GenericWatcher {
|
||||
return &GenericWatcher{
|
||||
func NewGenericWatcher(db *postgres.DB, bc core.BlockChain) GenericWatcher {
|
||||
return GenericWatcher{
|
||||
DB: db,
|
||||
BlockChain: bc,
|
||||
}
|
||||
|
@ -111,6 +111,7 @@ const (
|
||||
UnknownTransformerType TransformerType = iota
|
||||
EthEvent
|
||||
EthStorage
|
||||
EthGeneric
|
||||
)
|
||||
|
||||
func (pt TransformerType) String() string {
|
||||
@ -118,9 +119,10 @@ func (pt TransformerType) String() string {
|
||||
"Unknown",
|
||||
"eth_event",
|
||||
"eth_storage",
|
||||
"eth_generic",
|
||||
}
|
||||
|
||||
if pt > EthStorage || pt < EthEvent {
|
||||
if pt > EthGeneric || pt < EthEvent {
|
||||
return "Unknown"
|
||||
}
|
||||
|
||||
@ -131,6 +133,7 @@ func GetTransformerType(str string) TransformerType {
|
||||
types := [...]TransformerType{
|
||||
EthEvent,
|
||||
EthStorage,
|
||||
EthGeneric,
|
||||
}
|
||||
|
||||
for _, ty := range types {
|
||||
|
@ -74,13 +74,17 @@ func (w *writer) WritePlugin() error {
|
||||
f.Func().Params(Id("e").Id("exporter")).Id("Export").Params().Parens(List(
|
||||
Index().Qual("github.com/vulcanize/vulcanizedb/libraries/shared/transformer", "EventTransformerInitializer"),
|
||||
Index().Qual("github.com/vulcanize/vulcanizedb/libraries/shared/transformer", "StorageTransformerInitializer"),
|
||||
Index().Qual("github.com/vulcanize/vulcanizedb/libraries/shared/transformer", "GenericTransformerInitializer"),
|
||||
)).Block(Return(
|
||||
Index().Qual(
|
||||
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer",
|
||||
"EventTransformerInitializer").Values(code[config.EthEvent]...),
|
||||
Index().Qual(
|
||||
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer",
|
||||
"StorageTransformerInitializer").Values(code[config.EthStorage]...))) // Exports the collected event and storage transformer initializers
|
||||
"StorageTransformerInitializer").Values(code[config.EthStorage]...),
|
||||
Index().Qual(
|
||||
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer",
|
||||
"GenericTransformerInitializer").Values(code[config.EthGeneric]...))) // Exports the collected event and storage transformer initializers
|
||||
|
||||
// Write code to destination file
|
||||
err = f.Save(goFile)
|
||||
@ -100,6 +104,8 @@ func (w *writer) collectTransformers() (map[config.TransformerType][]Code, error
|
||||
code[config.EthEvent] = append(code[config.EthEvent], Qual(path, "EventTransformerInitializer"))
|
||||
case config.EthStorage:
|
||||
code[config.EthStorage] = append(code[config.EthStorage], Qual(path, "StorageTransformerInitializer"))
|
||||
case config.EthGeneric:
|
||||
code[config.EthGeneric] = append(code[config.EthGeneric], Qual(path, "GenericTransformerInitializer"))
|
||||
default:
|
||||
return nil, errors.New(fmt.Sprintf("invalid transformer type %s", transformer.Type))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user