Use geth state diff source in composeAndExecute

This commit is contained in:
Elizabeth Engelman 2019-07-10 15:20:14 -05:00
parent 8c4a4d6587
commit a577811e0a
3 changed files with 39 additions and 12 deletions

View File

@ -17,6 +17,10 @@
package cmd package cmd
import ( import (
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/pkg/fs"
"os" "os"
"plugin" "plugin"
syn "sync" syn "sync"
@ -25,9 +29,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fs"
p2 "github.com/vulcanize/vulcanizedb/pkg/plugin" p2 "github.com/vulcanize/vulcanizedb/pkg/plugin"
"github.com/vulcanize/vulcanizedb/pkg/plugin/helpers" "github.com/vulcanize/vulcanizedb/pkg/plugin/helpers"
"github.com/vulcanize/vulcanizedb/utils" "github.com/vulcanize/vulcanizedb/utils"
@ -179,6 +181,20 @@ func composeAndExecute() {
} }
if len(ethStorageInitializers) > 0 { if len(ethStorageInitializers) > 0 {
switch stateDiffSource {
case "geth":
log.Debug("fetching storage diffs from geth pub sub")
rpcClient, _ := getClients()
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
payloadChan := make(chan statediff.Payload)
storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan)
sw := watcher.NewStorageWatcher(&storageFetcher, &db)
sw.SetStorageDiffSource("geth")
sw.AddTransformers(ethStorageInitializers)
wg.Add(1)
go watchEthStorage(&sw, &wg)
default:
log.Debug("fetching storage diffs from csv")
tailer := fs.FileTailer{Path: storageDiffsPath} tailer := fs.FileTailer{Path: storageDiffsPath}
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
sw := watcher.NewStorageWatcher(storageFetcher, &db) sw := watcher.NewStorageWatcher(storageFetcher, &db)
@ -186,6 +202,7 @@ func composeAndExecute() {
wg.Add(1) wg.Add(1)
go watchEthStorage(&sw, &wg) go watchEthStorage(&sw, &wg)
} }
}
if len(ethContractInitializers) > 0 { if len(ethContractInitializers) > 0 {
gw := watcher.NewContractWatcher(&db, blockChain) gw := watcher.NewContractWatcher(&db, blockChain)

View File

@ -48,6 +48,7 @@ var (
recheckHeadersArg bool recheckHeadersArg bool
SubCommand string SubCommand string
LogWithCommand log.Entry LogWithCommand log.Entry
stateDiffSource string
) )
const ( const (
@ -120,6 +121,7 @@ func init() {
rootCmd.PersistentFlags().String("filesystem-storageDiffsPath", "", "location of storage diffs csv file") rootCmd.PersistentFlags().String("filesystem-storageDiffsPath", "", "location of storage diffs csv file")
rootCmd.PersistentFlags().String("exporter-name", "exporter", "name of exporter plugin") rootCmd.PersistentFlags().String("exporter-name", "exporter", "name of exporter plugin")
rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "Log level (trace, debug, info, warn, error, fatal, panic") rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "Log level (trace, debug, info, warn, error, fatal, panic")
rootCmd.PersistentFlags().StringVar(&stateDiffSource, "state-diff-source", "csv", "where to get the state diffs: csv or geth")
viper.BindPFlag("database.name", rootCmd.PersistentFlags().Lookup("database-name")) viper.BindPFlag("database.name", rootCmd.PersistentFlags().Lookup("database-name"))
viper.BindPFlag("database.port", rootCmd.PersistentFlags().Lookup("database-port")) viper.BindPFlag("database.port", rootCmd.PersistentFlags().Lookup("database-port"))
@ -152,6 +154,14 @@ func initConfig() {
} }
func getBlockChain() *geth.BlockChain { func getBlockChain() *geth.BlockChain {
rpcClient, ethClient := getClients()
vdbEthClient := client.NewEthClient(ethClient)
vdbNode := node.MakeNode(rpcClient)
transactionConverter := vRpc.NewRpcTransactionConverter(ethClient)
return geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter)
}
func getClients() (client.RpcClient, *ethclient.Client) {
rawRpcClient, err := rpc.Dial(ipc) rawRpcClient, err := rpc.Dial(ipc)
if err != nil { if err != nil {
@ -159,8 +169,6 @@ func getBlockChain() *geth.BlockChain {
} }
rpcClient := client.NewRpcClient(rawRpcClient, ipc) rpcClient := client.NewRpcClient(rawRpcClient, ipc)
ethClient := ethclient.NewClient(rawRpcClient) ethClient := ethclient.NewClient(rawRpcClient)
vdbEthClient := client.NewEthClient(ethClient)
vdbNode := node.MakeNode(rpcClient) return rpcClient, ethClient
transactionConverter := vRpc.NewRpcTransactionConverter(ethClient)
return geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter)
} }

View File

@ -17,6 +17,7 @@ package streamer
import ( import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
) )
@ -29,6 +30,7 @@ type StateDiffStreamer struct {
} }
func (streamer *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { func (streamer *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
logrus.Info("streaming diffs from geth")
return streamer.client.Subscribe("statediff", payloadChan, "stream") return streamer.client.Subscribe("statediff", payloadChan, "stream")
} }