From a577811e0a920030d30e468e559093fee664f1a8 Mon Sep 17 00:00:00 2001 From: Elizabeth Engelman Date: Wed, 10 Jul 2019 15:20:14 -0500 Subject: [PATCH] Use geth state diff source in composeAndExecute --- cmd/composeAndExecute.go | 33 ++++++++++++++----- cmd/root.go | 16 ++++++--- .../shared/streamer/statediff_streamer.go | 2 ++ 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index 633993fa..ff7581b4 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -17,6 +17,10 @@ package cmd import ( + "github.com/ethereum/go-ethereum/statediff" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/fs" "os" "plugin" syn "sync" @@ -25,9 +29,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" - "github.com/vulcanize/vulcanizedb/pkg/fs" p2 "github.com/vulcanize/vulcanizedb/pkg/plugin" "github.com/vulcanize/vulcanizedb/pkg/plugin/helpers" "github.com/vulcanize/vulcanizedb/utils" @@ -179,12 +181,27 @@ func composeAndExecute() { } if len(ethStorageInitializers) > 0 { - tailer := fs.FileTailer{Path: storageDiffsPath} - storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) - sw := watcher.NewStorageWatcher(storageFetcher, &db) - sw.AddTransformers(ethStorageInitializers) - wg.Add(1) - go watchEthStorage(&sw, &wg) + switch stateDiffSource { + case "geth": + log.Debug("fetching storage diffs from geth pub sub") + rpcClient, _ := getClients() + stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) + payloadChan := make(chan statediff.Payload) + storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan) + sw := watcher.NewStorageWatcher(&storageFetcher, &db) + sw.SetStorageDiffSource("geth") + sw.AddTransformers(ethStorageInitializers) + wg.Add(1) + go watchEthStorage(&sw, &wg) + default: + log.Debug("fetching storage diffs from csv") + tailer := fs.FileTailer{Path: storageDiffsPath} + storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) + sw := watcher.NewStorageWatcher(storageFetcher, &db) + sw.AddTransformers(ethStorageInitializers) + wg.Add(1) + go watchEthStorage(&sw, &wg) + } } if len(ethContractInitializers) > 0 { diff --git a/cmd/root.go b/cmd/root.go index c9250c3f..1ed338e1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -48,6 +48,7 @@ var ( recheckHeadersArg bool SubCommand string LogWithCommand log.Entry + stateDiffSource string ) const ( @@ -120,6 +121,7 @@ func init() { rootCmd.PersistentFlags().String("filesystem-storageDiffsPath", "", "location of storage diffs csv file") rootCmd.PersistentFlags().String("exporter-name", "exporter", "name of exporter plugin") rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "Log level (trace, debug, info, warn, error, fatal, panic") + rootCmd.PersistentFlags().StringVar(&stateDiffSource, "state-diff-source", "csv", "where to get the state diffs: csv or geth") viper.BindPFlag("database.name", rootCmd.PersistentFlags().Lookup("database-name")) viper.BindPFlag("database.port", rootCmd.PersistentFlags().Lookup("database-port")) @@ -152,6 +154,14 @@ func initConfig() { } func getBlockChain() *geth.BlockChain { + rpcClient, ethClient := getClients() + vdbEthClient := client.NewEthClient(ethClient) + vdbNode := node.MakeNode(rpcClient) + transactionConverter := vRpc.NewRpcTransactionConverter(ethClient) + return geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter) +} + +func getClients() (client.RpcClient, *ethclient.Client) { rawRpcClient, err := rpc.Dial(ipc) if err != nil { @@ -159,8 +169,6 @@ func getBlockChain() *geth.BlockChain { } rpcClient := client.NewRpcClient(rawRpcClient, ipc) ethClient := ethclient.NewClient(rawRpcClient) - vdbEthClient := client.NewEthClient(ethClient) - vdbNode := node.MakeNode(rpcClient) - transactionConverter := vRpc.NewRpcTransactionConverter(ethClient) - return geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter) + + return rpcClient, ethClient } diff --git a/libraries/shared/streamer/statediff_streamer.go b/libraries/shared/streamer/statediff_streamer.go index 49ea45f8..8cda750f 100644 --- a/libraries/shared/streamer/statediff_streamer.go +++ b/libraries/shared/streamer/statediff_streamer.go @@ -17,6 +17,7 @@ package streamer import ( "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" ) @@ -29,6 +30,7 @@ type StateDiffStreamer struct { } func (streamer *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { + logrus.Info("streaming diffs from geth") return streamer.client.Subscribe("statediff", payloadChan, "stream") }