work on porting storage watcher; watcher type is defined for

transformers in the config, storage transformers are exported from
plugin like event transformers
This commit is contained in:
Ian Norden 2019-02-07 15:35:25 -06:00
parent b51dcb55de
commit 03a7379617
5 changed files with 485 additions and 136 deletions

View File

@ -16,10 +16,12 @@
package cmd
import (
"errors"
"fmt"
"log"
"os"
"plugin"
syn "sync"
"time"
"github.com/spf13/cobra"
@ -28,6 +30,7 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/fs"
p2 "github.com/vulcanize/vulcanizedb/pkg/plugin"
"github.com/vulcanize/vulcanizedb/pkg/plugin/helpers"
"github.com/vulcanize/vulcanizedb/utils"
@ -56,9 +59,14 @@ var composeAndExecuteCmd = &cobra.Command{
transformer2 = "github.com/path/to/transformer2"
transformer3 = "github.com/path/to/transformer3"
transformer4 = "github.com/different/path/to/transformer1"
[exporter.types]
transformer1 = "eth_event"
transformer2 = "eth_event"
transformer3 = "eth_event"
transformer4 = "eth_storage"
[exporter.repositories]
transformers = "github.com/path/to"
transformer4 = "github.com/different/path
transformer4 = "github.com/different/path"
[exporter.migrations]
transformers = "db/migrations"
transformer4 = "to/db/migrations"
@ -66,26 +74,31 @@ var composeAndExecuteCmd = &cobra.Command{
Note: If any of the imported transformer need additional
config variables do not forget to include those as well
This information is used to write and build a .so with an arbitrary transformer
This information is used to write and build a go plugin with a transformer
set composed from the transformer imports specified in the config file
This .so is loaded as a plugin and the set of transformer initializers is
loaded into and executed over by a generic watcher`,
This plugin is loaded and the set of transformer initializers is exported
from it and loaded into and executed over by the appropriate watcher.
The type of watcher that the transformer works with is specified using the
exporter.types config variable as shown above. Currently there are watchers
of event data from an eth node (eth_event) and storage data from an eth node
(eth_storage). Soon there will be watchers for ipfs (ipfs_event and ipfs_storage).
Transformers of different types can be ran together in the same command using a
single config file or in separate command instances using different config files
Specify config location when executing the command:
./vulcanizedb composeAndExecute --config=./environments/config_name.toml`,
Run: func(cmd *cobra.Command, args []string) {
composeAndExecute()
},
}
func composeAndExecute() {
// generate code to build the plugin according to the config file
genConfig = config.Plugin{
FilePath: "$GOPATH/src/github.com/vulcanize/vulcanizedb/plugins",
FileName: viper.GetString("exporter.name"),
Save: viper.GetBool("exporter.save"),
Initializers: viper.GetStringMapString("exporter.transformers"),
Dependencies: viper.GetStringMapString("exporter.repositories"),
Migrations: viper.GetStringMapString("exporter.migrations"),
}
// Build plugin generator config
prepConfig()
// Generate code to build the plugin according to the config file
fmt.Println("generating plugin")
generator, err := p2.NewGenerator(genConfig, databaseConfig)
if err != nil {
@ -127,17 +140,44 @@ func composeAndExecute() {
os.Exit(1)
}
// Use the Exporters export method to load the TransformerInitializer set
initializers := exporter.Export()
// Use the Exporters export method to load the TransformerInitializer and StorageTransformerInitializer sets
ethEventInitializers, ethStorageInitializers := exporter.Export()
// Setup bc and db objects
blockChain := getBlockChain()
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
// Create a watcher and load the TransformerInitializer set into it
// Execute over transformer sets returned by the exporter
// Use WaitGroup to wait on both goroutines
var wg syn.WaitGroup
if len(ethEventInitializers) > 0 {
w := watcher.NewWatcher(&db, blockChain)
w.AddTransformers(initializers)
w.AddTransformers(ethEventInitializers)
wg.Add(1)
go watchEthEvents(&w, &wg)
}
if len(ethStorageInitializers) > 0 {
tailer := fs.FileTailer{Path: storageDiffsPath}
w := watcher.NewStorageWatcher(tailer, &db)
w.AddTransformers(ethStorageInitializers)
wg.Add(1)
go watchEthStorage(&w, &wg)
}
wg.Wait()
}
type Exporter interface {
Export() ([]transformer.TransformerInitializer, []transformer.StorageTransformerInitializer)
}
func init() {
rootCmd.AddCommand(composeAndExecuteCmd)
composeAndExecuteCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block number to start transformer execution from")
}
func watchEthEvents(w *watcher.Watcher, wg *syn.WaitGroup) {
defer wg.Done()
// Execute over the TransformerInitializer set using the watcher
fmt.Println("executing transformers")
ticker := time.NewTicker(pollingInterval)
@ -150,11 +190,39 @@ func composeAndExecute() {
}
}
type Exporter interface {
Export() []transformer.TransformerInitializer
func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
defer wg.Done()
// Execute over the TransformerInitializer set using the watcher
fmt.Println("executing transformers")
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for range ticker.C {
err := w.Execute()
if err != nil {
// TODO Handle watcher errors in composeAndExecute
}
}
}
func init() {
rootCmd.AddCommand(composeAndExecuteCmd)
composeAndExecuteCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block number to start transformer execution from")
func prepConfig() {
fmt.Println("configuring plugin")
types := viper.GetStringMapString("exporter.types")
genTypes := map[string]config.PluginType{}
for transformerName, transformerType := range types {
genType := config.GetPluginType(transformerType)
if genType == config.UnknownTransformerType {
log.Fatal(errors.New(`unknown transformer type in exporter config
accepted types are "eth_event", "eth_storage", "ipfs_event" and "ipfs_storage"`))
}
genTypes[transformerName] = genType
}
genConfig = config.Plugin{
FilePath: "$GOPATH/src/github.com/vulcanize/vulcanizedb/plugins",
FileName: viper.GetString("exporter.name"),
Save: viper.GetBool("exporter.save"),
Initializers: viper.GetStringMapString("exporter.transformers"),
Dependencies: viper.GetStringMapString("exporter.repositories"),
Migrations: viper.GetStringMapString("exporter.migrations"),
Types: genTypes,
}
}

View File

@ -12,7 +12,7 @@
name = "maker_vdb_staging"
[exporter]
name = "exporter"
name = "eventTransformerExporter"
save = false
[exporter.transformers]
bite = "github.com/vulcanize/mcd_transformers/transformers/bite/initializer"
@ -43,8 +43,40 @@
vat_toll = "github.com/vulcanize/mcd_transformers/transformers/vat_toll/initializer"
vat_tune = "github.com/vulcanize/mcd_transformers/transformers/vat_tune/initializer"
vow_flog = "github.com/vulcanize/mcd_transformers/transformers/vow_flog/initializer"
[exporter.types]
bite = "eth_event"
cat_chop_lump = "eth_event"
cat_flip = "eth_event"
cat_pit_vow = "eth_event"
deal = "eth_event"
dent = "eth_event"
drip_drip = "eth_event"
drip_file_ilk = "eth_event"
drip_file_repo = "eth_event"
drip_file_vow = "eth_event"
flap_kick = "eth_event"
flip_kick = "eth_event"
flop_kick = "eth_event"
frob = "eth_event"
pit_file_debt_ceiling = "eth_event"
pit_file_ilk = "eth_event"
price_feeds = "eth_event"
tend = "eth_event"
vat_flux = "eth_event"
vat_fold = "eth_event"
vat_grab = "eth_event"
vat_heal = "eth_event"
vat_init = "eth_event"
vat_move = "eth_event"
vat_slip = "eth_event"
vat_toll = "eth_event"
vat_tune = "eth_event"
vow_flog = "eth_event"
[exporter.repositories]
mcd_transformers = "github.com/vulcanize/mcd_transformers"
mcd__event_transformers = "github.com/vulcanize/mcd_transformers"
[filesystem]
storageDiffsPath = "INSERT-PATH-TO-STORAGE-DIFFS"
[contract]
[contract.address]

View File

@ -29,6 +29,7 @@ type Plugin struct {
Initializers map[string]string // Map of import aliases to transformer initializer paths
Dependencies map[string]string // Map of vendor dep names to their repositories
Migrations map[string]string // Map of vendor dep names to relative path from repository to db migrations
Types map[string]PluginType // Map of import aliases to their transformer initializer type (e.g. eth-event vs eth-storage)
FilePath string
FileName string
Save bool
@ -64,3 +65,45 @@ func (c *Plugin) GetMigrationsPaths() ([]string, error) {
return paths, nil
}
type PluginType int
const (
UnknownTransformerType PluginType = iota + 1
EthEvent
EthStorage
IpfsEvent
IpfsStorage
)
func (pt PluginType) String() string {
names := [...]string{
"eth_event",
"eth_storage",
"ipfs_event",
"ipfs_storage",
}
if pt > IpfsStorage || pt < EthEvent {
return "Unknown"
}
return names[pt]
}
func GetPluginType(str string) PluginType {
types := [...]PluginType{
EthEvent,
EthStorage,
IpfsEvent,
IpfsStorage,
}
for _, ty := range types {
if ty.String() == str && ty.String() != "Unknown" {
return ty
}
}
return UnknownTransformerType
}

View File

@ -16,7 +16,7 @@
package plugin_test
/* comment out til mcd_transformers is updated
/*
import (
"plugin"
@ -30,6 +30,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/fs"
p2 "github.com/vulcanize/vulcanizedb/pkg/plugin"
"github.com/vulcanize/vulcanizedb/pkg/plugin/helpers"
"github.com/vulcanize/vulcanizedb/pkg/plugin/test_helpers"
@ -40,11 +41,51 @@ var genConfig = config.Plugin{
"bite": "github.com/vulcanize/mcd_transformers/transformers/bite/initializer",
"deal": "github.com/vulcanize/mcd_transformers/transformers/deal/initializer",
},
Types: map[string]config.PluginType{
"bite": config.EthEvent,
"deal": config.EthEvent,
},
Dependencies: map[string]string{
"mcd_transformers": "github.com/vulcanize/mcd_transformers",
},
//Migrations: map[string]string{"mcd_transformers" : "db/migrations"},
FileName: "externalTestTransformerSet",
FileName: "testEventTransformerSet",
FilePath: "$GOPATH/src/github.com/vulcanize/vulcanizedb/pkg/plugin/test_helpers/test",
Save: false,
}
var genStorageConfig = config.Plugin{
Initializers: map[string]string{
"pit": "github.com/vulcanize/mcd_transformers/transformers/storage_diffs/maker/pit/initializer",
},
Types: map[string]config.PluginType{
"pit": config.EthStorage,
},
Dependencies: map[string]string{
"mcd_transformers": "github.com/vulcanize/mcd_transformers",
},
//Migrations: map[string]string{"mcd_transformers" : "db/migrations"},
FileName: "testStorageTransformerSet",
FilePath: "$GOPATH/src/github.com/vulcanize/vulcanizedb/pkg/plugin/test_helpers/test",
Save: false,
}
var combinedConfig = config.Plugin{
Initializers: map[string]string{
"bite": "github.com/vulcanize/mcd_transformers/transformers/bite/initializer",
"deal": "github.com/vulcanize/mcd_transformers/transformers/deal/initializer",
"pit": "github.com/vulcanize/mcd_transformers/transformers/storage_diffs/maker/pit/initializer",
},
Types: map[string]config.PluginType{
"bite": config.EthEvent,
"deal": config.EthEvent,
"pit": config.EthStorage,
},
Dependencies: map[string]string{
"mcd_transformers": "github.com/vulcanize/mcd_transformers",
},
//Migrations: map[string]string{"mcd_transformers" : "db/migrations"},
FileName: "testStorageTransformerSet",
FilePath: "$GOPATH/src/github.com/vulcanize/vulcanizedb/pkg/plugin/test_helpers/test",
Save: false,
}
@ -56,7 +97,7 @@ var dbConfig = config.Database{
}
type Exporter interface {
Export() []transformer.TransformerInitializer
Export() ([]transformer.TransformerInitializer, []transformer.StorageTransformerInitializer)
}
var _ = Describe("Generator test", func() {
@ -70,6 +111,7 @@ var _ = Describe("Generator test", func() {
viper.SetConfigName("compose")
viper.AddConfigPath("$GOPATH/src/github.com/vulcanize/vulcanizedb/environments/")
Describe("Event Transformers only", func() {
BeforeEach(func() {
goPath, soPath, err = genConfig.GetPluginPaths()
Expect(err).ToNot(HaveOccurred())
@ -78,22 +120,22 @@ var _ = Describe("Generator test", func() {
err = g.GenerateExporterPlugin()
Expect(err).ToNot(HaveOccurred())
})
AfterEach(func() {
err := helpers.ClearFiles(goPath, soPath)
Expect(err).ToNot(HaveOccurred())
})
Describe("GenerateTransformerPlugin", func() {
It("It bundles the specified transformer initializers into a Exporter object and creates .so", func() {
It("It bundles the specified TransformerInitializers into a Exporter object and creates .so", func() {
plug, err := plugin.Open(soPath)
Expect(err).ToNot(HaveOccurred())
symExporter, err := plug.Lookup("Exporter")
Expect(err).ToNot(HaveOccurred())
exporter, ok := symExporter.(Exporter)
Expect(ok).To(Equal(true))
initializers := exporter.Export()
initializers, store := exporter.Export()
Expect(len(initializers)).To(Equal(2))
Expect(len(store)).To(Equal(0))
})
It("Loads our generated Exporter and uses it to import an arbitrary set of TransformerInitializers that we can execute over", func() {
@ -112,7 +154,7 @@ var _ = Describe("Generator test", func() {
Expect(err).ToNot(HaveOccurred())
exporter, ok := symExporter.(Exporter)
Expect(ok).To(Equal(true))
initializers := exporter.Export()
initializers, _ := exporter.Export()
w := watcher.NewWatcher(db, bc)
w.AddTransformers(initializers)
@ -149,5 +191,142 @@ var _ = Describe("Generator test", func() {
Expect(returned.LogIndex).To(Equal(uint(4)))
})
})
})
Describe("Storage Transformers only", func() {
BeforeEach(func() {
goPath, soPath, err = genConfig.GetPluginPaths()
Expect(err).ToNot(HaveOccurred())
g, err = p2.NewGenerator(genStorageConfig, dbConfig)
Expect(err).ToNot(HaveOccurred())
err = g.GenerateExporterPlugin()
Expect(err).ToNot(HaveOccurred())
})
AfterEach(func() {
err := helpers.ClearFiles(goPath, soPath)
Expect(err).ToNot(HaveOccurred())
})
Describe("GenerateTransformerPlugin", func() {
It("It bundles the specified StorageTransformerInitializers into a Exporter object and creates .so", func() {
plug, err := plugin.Open(soPath)
Expect(err).ToNot(HaveOccurred())
symExporter, err := plug.Lookup("Exporter")
Expect(err).ToNot(HaveOccurred())
exporter, ok := symExporter.(Exporter)
Expect(ok).To(Equal(true))
event, initializers := exporter.Export()
Expect(len(initializers)).To(Equal(1))
Expect(len(event)).To(Equal(0))
})
It("Loads our generated Exporter and uses it to import an arbitrary set of StorageTransformerInitializers that we can execute over", func() {
db, bc = test_helpers.SetupDBandBC()
defer test_helpers.TearDown(db)
plug, err := plugin.Open(soPath)
Expect(err).ToNot(HaveOccurred())
symExporter, err := plug.Lookup("Exporter")
Expect(err).ToNot(HaveOccurred())
exporter, ok := symExporter.(Exporter)
Expect(ok).To(Equal(true))
_, initializers := exporter.Export()
tailer := fs.FileTailer{Path: viper.GetString("filesystem.storageDiffsPath")}
w := watcher.NewStorageWatcher(tailer, db)
w.AddTransformers(initializers)
err = w.Execute()
Expect(err).ToNot(HaveOccurred())
})
})
})
Describe("Event and Storage Transformers in same instance", func() {
BeforeEach(func() {
goPath, soPath, err = genConfig.GetPluginPaths()
Expect(err).ToNot(HaveOccurred())
g, err = p2.NewGenerator(combinedConfig, dbConfig)
Expect(err).ToNot(HaveOccurred())
err = g.GenerateExporterPlugin()
Expect(err).ToNot(HaveOccurred())
})
AfterEach(func() {
err := helpers.ClearFiles(goPath, soPath)
Expect(err).ToNot(HaveOccurred())
})
Describe("GenerateTransformerPlugin", func() {
It("It bundles the specified TransformerInitializers and StorageTransformerInitializers into a Exporter object and creates .so", func() {
plug, err := plugin.Open(soPath)
Expect(err).ToNot(HaveOccurred())
symExporter, err := plug.Lookup("Exporter")
Expect(err).ToNot(HaveOccurred())
exporter, ok := symExporter.(Exporter)
Expect(ok).To(Equal(true))
eventInitializers, storageInitializers := exporter.Export()
Expect(len(eventInitializers)).To(Equal(2))
Expect(len(storageInitializers)).To(Equal(1))
})
It("Loads our generated Exporter and uses it to import an arbitrary set of TransformerInitializers and StorageTransformerInitializers that we can execute over", func() {
db, bc = test_helpers.SetupDBandBC()
defer test_helpers.TearDown(db)
hr = repositories.NewHeaderRepository(db)
header1, err := bc.GetHeaderByNumber(9377319)
Expect(err).ToNot(HaveOccurred())
headerID, err = hr.CreateOrUpdateHeader(header1)
Expect(err).ToNot(HaveOccurred())
plug, err := plugin.Open(soPath)
Expect(err).ToNot(HaveOccurred())
symExporter, err := plug.Lookup("Exporter")
Expect(err).ToNot(HaveOccurred())
exporter, ok := symExporter.(Exporter)
Expect(ok).To(Equal(true))
eventInitializers, storageInitializers := exporter.Export()
ew := watcher.NewWatcher(db, bc)
ew.AddTransformers(eventInitializers)
err = ew.Execute()
Expect(err).ToNot(HaveOccurred())
type model struct {
Ilk string
Urn string
Ink string
Art string
IArt string
Tab string
NFlip string
LogIndex uint `db:"log_idx"`
TransactionIndex uint `db:"tx_idx"`
Raw []byte `db:"raw_log"`
Id int64 `db:"id"`
HeaderId int64 `db:"header_id"`
}
returned := model{}
err = db.Get(&returned, `SELECT * FROM maker.bite WHERE header_id = $1`, headerID)
Expect(err).ToNot(HaveOccurred())
Expect(returned.Ilk).To(Equal("ETH"))
Expect(returned.Urn).To(Equal("0x0000d8b4147eDa80Fec7122AE16DA2479Cbd7ffB"))
Expect(returned.Ink).To(Equal("80000000000000000000"))
Expect(returned.Art).To(Equal("11000000000000000000000"))
Expect(returned.IArt).To(Equal("12496609999999999999992"))
Expect(returned.Tab).To(Equal("11000000000000000000000"))
Expect(returned.NFlip).To(Equal("7"))
Expect(returned.TransactionIndex).To(Equal(uint(1)))
Expect(returned.LogIndex).To(Equal(uint(4)))
tailer := fs.FileTailer{Path: viper.GetString("filesystem.storageDiffsPath")}
sw := watcher.NewStorageWatcher(tailer, db)
sw.AddTransformers(storageInitializers)
err = sw.Execute()
Expect(err).ToNot(HaveOccurred())
})
})
})
})
*/

View File

@ -58,21 +58,23 @@ func (w *writer) WritePlugin() error {
f.ImportAlias(imp, alias)
}
// Collect TransformerInitializer names
importedInitializers := make([]Code, 0, len(w.GenConfig.Initializers))
for _, path := range w.GenConfig.Initializers {
importedInitializers = append(importedInitializers, Qual(path, "TransformerInitializer"))
}
// Collect initializer code
ethEventInitializers, ethStorageInitializers, _, _ := w.sortTransformers()
// Create Exporter variable with method to export the set of the imported TransformerInitializers
// Create Exporter variable with method to export the set of the imported storage and event transformer initializers
f.Type().Id("exporter").String()
f.Var().Id("Exporter").Id("exporter")
f.Func().Params(Id("e").Id("exporter")).Id("Export").Params().Index().Qual(
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer",
"TransformerInitializer").Block(
"TransformerInitializer").Index().Qual(
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer",
"StorageTransformerInitializer").Block(
Return(Index().Qual(
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer",
"TransformerInitializer").Values(importedInitializers...))) // Exports the collected TransformerInitializers
"TransformerInitializer").Values(ethEventInitializers...)),
Index().Qual(
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer",
"StorageTransformerInitializer").Values(ethStorageInitializers...)) // Exports the collected initializers
// Write code to destination file
err = f.Save(goFile)
@ -82,6 +84,31 @@ func (w *writer) WritePlugin() error {
return nil
}
func (w *writer) sortTransformers() ([]Code, []Code, []Code, []Code) {
// Collect code for various initializers
importedEthEventInitializers := make([]Code, 0)
importerEthStorageInitializers := make([]Code, 0)
importedIpfsEventInitializers := make([]Code, 0)
importerIpfsStorageInitializers := make([]Code, 0)
for name, path := range w.GenConfig.Initializers {
switch w.GenConfig.Types[name] {
case config.EthEvent:
importedEthEventInitializers = append(importedEthEventInitializers, Qual(path, "TransformerInitializer"))
case config.EthStorage:
importerEthStorageInitializers = append(importerEthStorageInitializers, Qual(path, "StorageTransformerInitializer"))
case config.IpfsEvent:
//importedIpfsEventInitializers = append(importedIpfsEventInitializers, Qual(path, "IpfsEventTransformerInitializer"))
case config.IpfsStorage:
//importerIpfsStorageInitializers = append(importerIpfsStorageInitializers, Qual(path, "IpfsStorageTransformerInitializer"))
}
}
return importedEthEventInitializers,
importerEthStorageInitializers,
importedIpfsEventInitializers,
importerIpfsStorageInitializers
}
func (w *writer) setupFilePath() (string, error) {
goFile, soFile, err := w.GenConfig.GetPluginPaths()
if err != nil {