From c16ac026db567cf55e8d9a84d44f9eb6cfcb874c Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 14 Oct 2019 09:41:29 -0500 Subject: [PATCH] logWithCommand; rebase fixes; config for testing super node subscription --- cmd/coldImport.go | 14 +- cmd/compose.go | 36 +-- cmd/composeAndExecute.go | 30 +-- cmd/contractWatcher.go | 10 +- cmd/execute.go | 30 +-- cmd/fullSync.go | 16 +- cmd/headerSync.go | 16 +- cmd/root.go | 6 +- cmd/screenAndServe.go | 8 +- cmd/streamSubscribe.go | 40 ++-- cmd/syncAndPublish.go | 38 ++-- cmd/syncPublishScreenAndServe.go | 12 +- db/schema.sql | 2 +- environments/superNodeSubscription.toml | 35 +++ environments/syncPublishScreenAndServe.toml | 4 +- go.mod | 206 ++++++++++++++---- .../contract_watcher_full_transformer_test.go | 2 - .../full/retriever/retriever_suite_test.go | 2 - .../transformer/transformer_suite_test.go | 2 - .../repository/repository_suite_test.go | 2 - .../header/retriever/retriever_suite_test.go | 2 - .../transformer/transformer_suite_test.go | 2 - .../repository/repository_suite_test.go | 2 - .../shared/retriever/address_retriever.go | 2 - .../shared/retriever/retriever_suite_test.go | 2 - pkg/eth/node/node.go | 2 - pkg/super_node/backfiller.go | 160 +++++++++++--- pkg/super_node/backfiller_test.go | 34 +-- ...suite_test.go => super_node_suite_test.go} | 2 +- pkg/super_node/test_helpers.go | 2 +- 30 files changed, 487 insertions(+), 234 deletions(-) create mode 100644 environments/superNodeSubscription.toml rename pkg/super_node/{seed_node_suite_test.go => super_node_suite_test.go} (96%) diff --git a/cmd/coldImport.go b/cmd/coldImport.go index 1883d0b6..3e4bc79d 100644 --- a/cmd/coldImport.go +++ b/cmd/coldImport.go @@ -39,8 +39,8 @@ var coldImportCmd = &cobra.Command{ Geth must be synced over all of the desired blocks and must not be running in order to execute this command.`, Run: func(cmd *cobra.Command, args []string) { - SubCommand = cmd.CalledAs() - LogWithCommand = *log.WithField("SubCommand", SubCommand) + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) coldImport() }, } @@ -57,7 +57,7 @@ func coldImport() { ethDBConfig := ethereum.CreateDatabaseConfig(ethereum.Level, levelDbPath) ethDB, err := ethereum.CreateDatabase(ethDBConfig) if err != nil { - LogWithCommand.Fatal("Error connecting to ethereum db: ", err) + logWithCommand.Fatal("Error connecting to ethereum db: ", err) } mostRecentBlockNumberInDb := ethDB.GetHeadBlockNumber() if syncAll { @@ -65,10 +65,10 @@ func coldImport() { endingBlockNumber = mostRecentBlockNumberInDb } if endingBlockNumber < startingBlockNumber { - LogWithCommand.Fatal("Ending block number must be greater than starting block number for cold import.") + logWithCommand.Fatal("Ending block number must be greater than starting block number for cold import.") } if endingBlockNumber > mostRecentBlockNumberInDb { - LogWithCommand.Fatal("Ending block number is greater than most recent block in db: ", mostRecentBlockNumberInDb) + logWithCommand.Fatal("Ending block number is greater than most recent block in db: ", mostRecentBlockNumberInDb) } // init pg db @@ -78,7 +78,7 @@ func coldImport() { nodeBuilder := cold_import.NewColdImportNodeBuilder(reader, parser) coldNode, err := nodeBuilder.GetNode(genesisBlock, levelDbPath) if err != nil { - LogWithCommand.Fatal("Error getting node: ", err) + logWithCommand.Fatal("Error getting node: ", err) } pgDB := utils.LoadPostgres(databaseConfig, coldNode) @@ -92,6 +92,6 @@ func coldImport() { coldImporter := cold_import.NewColdImporter(ethDB, blockRepository, receiptRepository, blockConverter) err = coldImporter.Execute(startingBlockNumber, endingBlockNumber, coldNode.ID) if err != nil { - LogWithCommand.Fatal("Error executing cold import: ", err) + logWithCommand.Fatal("Error executing cold import: ", err) } } diff --git a/cmd/compose.go b/cmd/compose.go index f92db41a..35d8cae1 100644 --- a/cmd/compose.go +++ b/cmd/compose.go @@ -102,8 +102,8 @@ single config file or in separate command instances using different config files Specify config location when executing the command: ./vulcanizedb compose --config=./environments/config_name.toml`, Run: func(cmd *cobra.Command, args []string) { - SubCommand = cmd.CalledAs() - LogWithCommand = *log.WithField("SubCommand", SubCommand) + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) compose() }, } @@ -113,25 +113,25 @@ func compose() { prepConfig() // Generate code to build the plugin according to the config file - LogWithCommand.Info("generating plugin") + logWithCommand.Info("generating plugin") generator, err := p2.NewGenerator(genConfig, databaseConfig) if err != nil { - LogWithCommand.Debug("initializing plugin generator failed") - LogWithCommand.Fatal(err) + logWithCommand.Debug("initializing plugin generator failed") + logWithCommand.Fatal(err) } err = generator.GenerateExporterPlugin() if err != nil { - LogWithCommand.Debug("generating plugin failed") - LogWithCommand.Fatal(err) + logWithCommand.Debug("generating plugin failed") + logWithCommand.Fatal(err) } // TODO: Embed versioning info in the .so files so we know which version of vulcanizedb to run them with _, pluginPath, err := genConfig.GetPluginPaths() if err != nil { - LogWithCommand.Debug("getting plugin path failed") - LogWithCommand.Fatal(err) + logWithCommand.Debug("getting plugin path failed") + logWithCommand.Fatal(err) } fmt.Printf("Composed plugin %s", pluginPath) - LogWithCommand.Info("plugin .so file output to ", pluginPath) + logWithCommand.Info("plugin .so file output to ", pluginPath) } func init() { @@ -139,38 +139,38 @@ func init() { } func prepConfig() { - LogWithCommand.Info("configuring plugin") + logWithCommand.Info("configuring plugin") names := viper.GetStringSlice("exporter.transformerNames") transformers := make(map[string]config.Transformer) for _, name := range names { transformer := viper.GetStringMapString("exporter." + name) p, pOK := transformer["path"] if !pOK || p == "" { - LogWithCommand.Fatal(name, " transformer config is missing `path` value") + logWithCommand.Fatal(name, " transformer config is missing `path` value") } r, rOK := transformer["repository"] if !rOK || r == "" { - LogWithCommand.Fatal(name, " transformer config is missing `repository` value") + logWithCommand.Fatal(name, " transformer config is missing `repository` value") } m, mOK := transformer["migrations"] if !mOK || m == "" { - LogWithCommand.Fatal(name, " transformer config is missing `migrations` value") + logWithCommand.Fatal(name, " transformer config is missing `migrations` value") } mr, mrOK := transformer["rank"] if !mrOK || mr == "" { - LogWithCommand.Fatal(name, " transformer config is missing `rank` value") + logWithCommand.Fatal(name, " transformer config is missing `rank` value") } rank, err := strconv.ParseUint(mr, 10, 64) if err != nil { - LogWithCommand.Fatal(name, " migration `rank` can't be converted to an unsigned integer") + logWithCommand.Fatal(name, " migration `rank` can't be converted to an unsigned integer") } t, tOK := transformer["type"] if !tOK { - LogWithCommand.Fatal(name, " transformer config is missing `type` value") + logWithCommand.Fatal(name, " transformer config is missing `type` value") } transformerType := config.GetTransformerType(t) if transformerType == config.UnknownTransformerType { - LogWithCommand.Fatal(errors.New(`unknown transformer type in exporter config accepted types are "eth_event", "eth_storage"`)) + logWithCommand.Fatal(errors.New(`unknown transformer type in exporter config accepted types are "eth_event", "eth_storage"`)) } transformers[name] = config.Transformer{ diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index 4784b92c..9b8219bd 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -107,8 +107,8 @@ single config file or in separate command instances using different config files Specify config location when executing the command: ./vulcanizedb composeAndExecute --config=./environments/config_name.toml`, Run: func(cmd *cobra.Command, args []string) { - SubCommand = cmd.CalledAs() - LogWithCommand = *log.WithField("SubCommand", SubCommand) + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) composeAndExecute() }, } @@ -118,44 +118,44 @@ func composeAndExecute() { prepConfig() // Generate code to build the plugin according to the config file - LogWithCommand.Info("generating plugin") + logWithCommand.Info("generating plugin") generator, err := p2.NewGenerator(genConfig, databaseConfig) if err != nil { - LogWithCommand.Fatal(err) + logWithCommand.Fatal(err) } err = generator.GenerateExporterPlugin() if err != nil { - LogWithCommand.Debug("generating plugin failed") - LogWithCommand.Fatal(err) + logWithCommand.Debug("generating plugin failed") + logWithCommand.Fatal(err) } // Get the plugin path and load the plugin _, pluginPath, err := genConfig.GetPluginPaths() if err != nil { - LogWithCommand.Fatal(err) + logWithCommand.Fatal(err) } if !genConfig.Save { defer helpers.ClearFiles(pluginPath) } - LogWithCommand.Info("linking plugin ", pluginPath) + logWithCommand.Info("linking plugin ", pluginPath) plug, err := plugin.Open(pluginPath) if err != nil { - LogWithCommand.Debug("linking plugin failed") - LogWithCommand.Fatal(err) + logWithCommand.Debug("linking plugin failed") + logWithCommand.Fatal(err) } // Load the `Exporter` symbol from the plugin - LogWithCommand.Info("loading transformers from plugin") + logWithCommand.Info("loading transformers from plugin") symExporter, err := plug.Lookup("Exporter") if err != nil { - LogWithCommand.Debug("loading Exporter symbol failed") - LogWithCommand.Fatal(err) + logWithCommand.Debug("loading Exporter symbol failed") + logWithCommand.Fatal(err) } // Assert that the symbol is of type Exporter exporter, ok := symExporter.(Exporter) if !ok { - LogWithCommand.Debug("plugged-in symbol not of type Exporter") + logWithCommand.Debug("plugged-in symbol not of type Exporter") os.Exit(1) } @@ -173,7 +173,7 @@ func composeAndExecute() { ew := watcher.NewEventWatcher(&db, blockChain) err := ew.AddTransformers(ethEventInitializers) if err != nil { - LogWithCommand.Fatalf("failed to add event transformer initializers to watcher: %s", err.Error()) + logWithCommand.Fatalf("failed to add event transformer initializers to watcher: %s", err.Error()) } wg.Add(1) go watchEthEvents(&ew, &wg) diff --git a/cmd/contractWatcher.go b/cmd/contractWatcher.go index 11238869..b8ff58ab 100644 --- a/cmd/contractWatcher.go +++ b/cmd/contractWatcher.go @@ -79,8 +79,8 @@ Requires a .toml config file: piping = true `, Run: func(cmd *cobra.Command, args []string) { - SubCommand = cmd.CalledAs() - LogWithCommand = *log.WithField("SubCommand", SubCommand) + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) contractWatcher() }, } @@ -105,18 +105,18 @@ func contractWatcher() { case "full": t = ft.NewTransformer(con, blockChain, &db) default: - LogWithCommand.Fatal("Invalid mode") + logWithCommand.Fatal("Invalid mode") } err := t.Init() if err != nil { - LogWithCommand.Fatal(fmt.Sprintf("Failed to initialize transformer, err: %v ", err)) + logWithCommand.Fatal(fmt.Sprintf("Failed to initialize transformer, err: %v ", err)) } for range ticker.C { err = t.Execute() if err != nil { - LogWithCommand.Error("Execution error for transformer: ", t.GetConfig().Name, err) + logWithCommand.Error("Execution error for transformer: ", t.GetConfig().Name, err) } } } diff --git a/cmd/execute.go b/cmd/execute.go index e23cfcbc..53e87938 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -60,8 +60,8 @@ must have been composed by the same version of vulcanizedb or else it will not b Specify config location when executing the command: ./vulcanizedb execute --config=./environments/config_name.toml`, Run: func(cmd *cobra.Command, args []string) { - SubCommand = cmd.CalledAs() - LogWithCommand = *log.WithField("SubCommand", SubCommand) + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) execute() }, } @@ -73,29 +73,29 @@ func execute() { // Get the plugin path and load the plugin _, pluginPath, err := genConfig.GetPluginPaths() if err != nil { - LogWithCommand.Fatal(err) + logWithCommand.Fatal(err) } fmt.Printf("Executing plugin %s", pluginPath) - LogWithCommand.Info("linking plugin ", pluginPath) + logWithCommand.Info("linking plugin ", pluginPath) plug, err := plugin.Open(pluginPath) if err != nil { - LogWithCommand.Warn("linking plugin failed") - LogWithCommand.Fatal(err) + logWithCommand.Warn("linking plugin failed") + logWithCommand.Fatal(err) } // Load the `Exporter` symbol from the plugin - LogWithCommand.Info("loading transformers from plugin") + logWithCommand.Info("loading transformers from plugin") symExporter, err := plug.Lookup("Exporter") if err != nil { - LogWithCommand.Warn("loading Exporter symbol failed") - LogWithCommand.Fatal(err) + logWithCommand.Warn("loading Exporter symbol failed") + logWithCommand.Fatal(err) } // Assert that the symbol is of type Exporter exporter, ok := symExporter.(Exporter) if !ok { - LogWithCommand.Fatal("plugged-in symbol not of type Exporter") + logWithCommand.Fatal("plugged-in symbol not of type Exporter") } // Use the Exporters export method to load the EventTransformerInitializer, StorageTransformerInitializer, and ContractTransformerInitializer sets @@ -112,7 +112,7 @@ func execute() { ew := watcher.NewEventWatcher(&db, blockChain) err = ew.AddTransformers(ethEventInitializers) if err != nil { - LogWithCommand.Fatalf("failed to add event transformer initializers to watcher: %s", err.Error()) + logWithCommand.Fatalf("failed to add event transformer initializers to watcher: %s", err.Error()) } wg.Add(1) go watchEthEvents(&ew, &wg) @@ -162,7 +162,7 @@ type Exporter interface { func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) { defer wg.Done() // Execute over the EventTransformerInitializer set using the watcher - LogWithCommand.Info("executing event transformers") + logWithCommand.Info("executing event transformers") var recheck constants.TransformerExecution if recheckHeadersArg { recheck = constants.HeaderRecheck @@ -171,14 +171,14 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) { } err := w.Execute(recheck) if err != nil { - LogWithCommand.Fatalf("error executing event watcher: %s", err.Error()) + logWithCommand.Fatalf("error executing event watcher: %s", err.Error()) } } func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) { defer wg.Done() // Execute over the StorageTransformerInitializer set using the storage watcher - LogWithCommand.Info("executing storage transformers") + logWithCommand.Info("executing storage transformers") on := viper.GetBool("storageBackFill.on") if on { backFillStorage(w) @@ -198,7 +198,7 @@ func backFillStorage(w watcher.IStorageWatcher) { func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) { defer wg.Done() // Execute over the ContractTransformerInitializer set using the contract watcher - LogWithCommand.Info("executing contract_watcher transformers") + logWithCommand.Info("executing contract_watcher transformers") ticker := time.NewTicker(pollingInterval) defer ticker.Stop() for range ticker.C { diff --git a/cmd/fullSync.go b/cmd/fullSync.go index 2dba6f5f..783cab3f 100644 --- a/cmd/fullSync.go +++ b/cmd/fullSync.go @@ -49,8 +49,8 @@ Expects ethereum node to be running and requires a .toml config: ipcPath = "/Users/user/Library/Ethereum/geth.ipc" `, Run: func(cmd *cobra.Command, args []string) { - SubCommand = cmd.CalledAs() - LogWithCommand = *log.WithField("SubCommand", SubCommand) + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) fullSync() }, } @@ -63,7 +63,7 @@ func init() { func backFillAllBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, missingBlocksPopulated chan int, startingBlockNumber int64) { populated, err := history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber) if err != nil { - LogWithCommand.Error("backfillAllBlocks: error in populateMissingBlocks: ", err) + logWithCommand.Error("backfillAllBlocks: error in populateMissingBlocks: ", err) } missingBlocksPopulated <- populated } @@ -75,13 +75,13 @@ func fullSync() { blockChain := getBlockChain() lastBlock, err := blockChain.LastBlock() if err != nil { - LogWithCommand.Error("fullSync: Error getting last block: ", err) + logWithCommand.Error("fullSync: Error getting last block: ", err) } if lastBlock.Int64() == 0 { - LogWithCommand.Fatal("geth initial: state sync not finished") + logWithCommand.Fatal("geth initial: state sync not finished") } if startingBlockNumber > lastBlock.Int64() { - LogWithCommand.Fatal("fullSync: starting block number > current block number") + logWithCommand.Fatal("fullSync: starting block number > current block number") } db := utils.LoadPostgres(databaseConfig, blockChain.Node()) @@ -95,9 +95,9 @@ func fullSync() { case <-ticker.C: window, err := validator.ValidateBlocks() if err != nil { - LogWithCommand.Error("fullSync: error in validateBlocks: ", err) + logWithCommand.Error("fullSync: error in validateBlocks: ", err) } - LogWithCommand.Debug(window.GetString()) + logWithCommand.Debug(window.GetString()) case <-missingBlocksPopulated: go backFillAllBlocks(blockChain, blockRepository, missingBlocksPopulated, startingBlockNumber) } diff --git a/cmd/headerSync.go b/cmd/headerSync.go index 87ff22da..d2be71a4 100644 --- a/cmd/headerSync.go +++ b/cmd/headerSync.go @@ -50,8 +50,8 @@ Expects ethereum node to be running and requires a .toml config: ipcPath = "/Users/user/Library/Ethereum/geth.ipc" `, Run: func(cmd *cobra.Command, args []string) { - SubCommand = cmd.CalledAs() - LogWithCommand = *log.WithField("SubCommand", SubCommand) + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) headerSync() }, } @@ -66,7 +66,7 @@ func backFillAllHeaders(blockchain core.BlockChain, headerRepository datastore.H if err != nil { // TODO Lots of possible errors in the call stack above. If errors occur, we still put // 0 in the channel, triggering another round - LogWithCommand.Error("backfillAllHeaders: Error populating headers: ", err) + logWithCommand.Error("backfillAllHeaders: Error populating headers: ", err) } missingBlocksPopulated <- populated } @@ -88,9 +88,9 @@ func headerSync() { case <-ticker.C: window, err := validator.ValidateHeaders() if err != nil { - LogWithCommand.Error("headerSync: ValidateHeaders failed: ", err) + logWithCommand.Error("headerSync: ValidateHeaders failed: ", err) } - LogWithCommand.Debug(window.GetString()) + logWithCommand.Debug(window.GetString()) case n := <-missingBlocksPopulated: if n == 0 { time.Sleep(3 * time.Second) @@ -103,12 +103,12 @@ func headerSync() { func validateArgs(blockChain *eth.BlockChain) { lastBlock, err := blockChain.LastBlock() if err != nil { - LogWithCommand.Error("validateArgs: Error getting last block: ", err) + logWithCommand.Error("validateArgs: Error getting last block: ", err) } if lastBlock.Int64() == 0 { - LogWithCommand.Fatal("geth initial: state sync not finished") + logWithCommand.Fatal("geth initial: state sync not finished") } if startingBlockNumber > lastBlock.Int64() { - LogWithCommand.Fatal("starting block number > current block number") + logWithCommand.Fatal("starting block number > current block number") } } diff --git a/cmd/root.go b/cmd/root.go index 47b15592..07e3ecf9 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -49,8 +49,8 @@ var ( syncAll bool endingBlockNumber int64 recheckHeadersArg bool - SubCommand string - LogWithCommand log.Entry + subCommand string + logWithCommand log.Entry storageDiffsSource string ) @@ -170,7 +170,7 @@ func getClients() (client.RPCClient, *ethclient.Client) { rawRPCClient, err := rpc.Dial(ipc) if err != nil { - LogWithCommand.Fatal(err) + logWithCommand.Fatal(err) } rpcClient := client.NewRPCClient(rawRPCClient, ipc) ethClient := ethclient.NewClient(rawRPCClient) diff --git a/cmd/screenAndServe.go b/cmd/screenAndServe.go index b6b59b47..ddd7e8e8 100644 --- a/cmd/screenAndServe.go +++ b/cmd/screenAndServe.go @@ -41,6 +41,8 @@ var screenAndServeCmd = &cobra.Command{ relays relevant data to requesting clients. In this mode, the super-node can only relay data which it has already indexed it does not stream out live data.`, Run: func(cmd *cobra.Command, args []string) { + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) screenAndServe() }, } @@ -52,7 +54,7 @@ func init() { func screenAndServe() { superNode, newNodeErr := newSuperNodeWithoutPairedGethNode() if newNodeErr != nil { - log.Fatal(newNodeErr) + logWithCommand.Fatal(newNodeErr) } wg := &syn.WaitGroup{} quitChan := make(chan bool, 1) @@ -61,7 +63,7 @@ func screenAndServe() { serverErr := startServers(superNode) if serverErr != nil { - log.Fatal(serverErr) + logWithCommand.Fatal(serverErr) } wg.Wait() } @@ -87,7 +89,7 @@ func startServers(superNode super_node.NodeInterface) error { wsEndpoint = "127.0.0.1:8080" } var exposeAll = true - var wsOrigins []string = nil + var wsOrigins []string _, _, wsErr := rpc.StartWSEndpoint(wsEndpoint, superNode.APIs(), []string{"vdb"}, wsOrigins, exposeAll) if wsErr != nil { return wsErr diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go index 998e8d27..c6454072 100644 --- a/cmd/streamSubscribe.go +++ b/cmd/streamSubscribe.go @@ -32,7 +32,7 @@ import ( "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/geth/client" + "github.com/vulcanize/vulcanizedb/pkg/eth/client" ) // streamSubscribeCmd represents the streamSubscribe command @@ -42,6 +42,8 @@ var streamSubscribeCmd = &cobra.Command{ Long: `This command is for demo and testing purposes and is used to subscribe to the super node with the provided subscription configuration parameters. It does not do anything with the data streamed from the super node other than unpack it and print it out for demonstration purposes.`, Run: func(cmd *cobra.Command, args []string) { + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) streamSubscribe() }, } @@ -55,7 +57,7 @@ func streamSubscribe() { configureSubscription() // Create a new rpc client and a subscription streamer with that client - rpcClient := getRpcClient() + rpcClient := getRPCClient() str := streamer.NewSuperNodeStreamer(rpcClient) // Buffered channel for reading subscription payloads @@ -64,22 +66,22 @@ func streamSubscribe() { // Subscribe to the super node service with the given config/filter parameters sub, err := str.Stream(payloadChan, subscriptionConfig) if err != nil { - log.Fatal(err) + logWithCommand.Fatal(err) } - log.Info("awaiting payloads") + logWithCommand.Info("awaiting payloads") // Receive response payloads and print out the results for { select { case payload := <-payloadChan: if payload.ErrMsg != "" { - log.Error(payload.ErrMsg) + logWithCommand.Error(payload.ErrMsg) continue } for _, headerRlp := range payload.HeadersRlp { var header types.Header err = rlp.Decode(bytes.NewBuffer(headerRlp), &header) if err != nil { - log.Error(err) + logWithCommand.Error(err) continue } fmt.Printf("Header number %d, hash %s\n", header.Number.Int64(), header.Hash().Hex()) @@ -91,7 +93,7 @@ func streamSubscribe() { stream := rlp.NewStream(buff, 0) err := trx.DecodeRLP(stream) if err != nil { - log.Error(err) + logWithCommand.Error(err) continue } fmt.Printf("Transaction with hash %s\n", trx.Hash().Hex()) @@ -103,14 +105,14 @@ func streamSubscribe() { stream := rlp.NewStream(buff, 0) err = rct.DecodeRLP(stream) if err != nil { - log.Error(err) + logWithCommand.Error(err) continue } fmt.Printf("Receipt with block hash %s, trx hash %s\n", rct.BlockHash.Hex(), rct.TxHash.Hex()) fmt.Printf("rct: %v\n", rct) for _, l := range rct.Logs { if len(l.Topics) < 1 { - log.Error(fmt.Sprintf("log only has %d topics", len(l.Topics))) + logWithCommand.Error(fmt.Sprintf("log only has %d topics", len(l.Topics))) continue } fmt.Printf("Log for block hash %s, trx hash %s, address %s, and with topic0 %s\n", @@ -123,7 +125,7 @@ func streamSubscribe() { var acct state.Account err = rlp.Decode(bytes.NewBuffer(stateRlp), &acct) if err != nil { - log.Error(err) + logWithCommand.Error(err) continue } fmt.Printf("Account for key %s, and root %s, with balance %d\n", @@ -135,9 +137,9 @@ func streamSubscribe() { for storageKey, storageRlp := range mappedRlp { fmt.Printf("with storage key %s\n", storageKey.Hex()) var i []interface{} - err := rlp.DecodeBytes(storageRlp, i) + err := rlp.DecodeBytes(storageRlp, &i) if err != nil { - log.Error(err) + logWithCommand.Error(err) continue } // if a leaf node @@ -146,7 +148,7 @@ func streamSubscribe() { if !ok { continue } - valueBytes, ok := i[0].([]byte) + valueBytes, ok := i[1].([]byte) if !ok { continue } @@ -156,13 +158,13 @@ func streamSubscribe() { } } case err = <-sub.Err(): - log.Fatal(err) + logWithCommand.Fatal(err) } } } func configureSubscription() { - log.Info("loading subscription config") + logWithCommand.Info("loading subscription config") subscriptionConfig = config.Subscription{ // Below default to false, which means we do not backfill by default BackFill: viper.GetBool("subscription.backfill"), @@ -214,14 +216,14 @@ func configureSubscription() { } } -func getRpcClient() core.RpcClient { +func getRPCClient() core.RpcClient { vulcPath := viper.GetString("subscription.path") if vulcPath == "" { vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided } - rawRpcClient, err := rpc.Dial(vulcPath) + rawRPCClient, err := rpc.Dial(vulcPath) if err != nil { - log.Fatal(err) + logWithCommand.Fatal(err) } - return client.NewRpcClient(rawRpcClient, vulcPath) + return client.NewRpcClient(rawRPCClient, vulcPath) } diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go index 24a4987b..d2bf6851 100644 --- a/cmd/syncAndPublish.go +++ b/cmd/syncAndPublish.go @@ -28,10 +28,10 @@ import ( "github.com/spf13/viper" "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/geth" - "github.com/vulcanize/vulcanizedb/pkg/geth/client" - vRpc "github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc" - "github.com/vulcanize/vulcanizedb/pkg/geth/node" + "github.com/vulcanize/vulcanizedb/pkg/eth" + "github.com/vulcanize/vulcanizedb/pkg/eth/client" + vRpc "github.com/vulcanize/vulcanizedb/pkg/eth/converters/rpc" + "github.com/vulcanize/vulcanizedb/pkg/eth/node" "github.com/vulcanize/vulcanizedb/pkg/super_node" "github.com/vulcanize/vulcanizedb/utils" ) @@ -45,6 +45,8 @@ all block and state (diff) data over a websocket subscription. This process then converts the eth data to IPLD objects and publishes them to IPFS. Additionally, it maintains a local index of the IPLD objects' CIDs in Postgres.`, Run: func(cmd *cobra.Command, args []string) { + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) syncAndPublish() }, } @@ -58,34 +60,34 @@ func init() { func syncAndPublish() { superNode, newNodeErr := newSuperNode() if newNodeErr != nil { - log.Fatal(newNodeErr) + logWithCommand.Fatal(newNodeErr) } wg := &syn.WaitGroup{} syncAndPubErr := superNode.SyncAndPublish(wg, nil, nil) if syncAndPubErr != nil { - log.Fatal(syncAndPubErr) + logWithCommand.Fatal(syncAndPubErr) } - if viper.GetBool("backfill.on") && viper.GetString("backfill.ipcPath") != "" { + if viper.GetBool("superNodeBackFill.on") && viper.GetString("superNodeBackFill.rpcPath") != "" { backfiller, newBackFillerErr := newBackFiller() if newBackFillerErr != nil { - log.Fatal(newBackFillerErr) + logWithCommand.Fatal(newBackFillerErr) } backfiller.FillGaps(wg, nil) } wg.Wait() // If an error was thrown, wg.Add was never called and this will fall through } -func getBlockChainAndClient(path string) (*geth.BlockChain, core.RpcClient) { - rawRpcClient, dialErr := rpc.Dial(path) +func getBlockChainAndClient(path string) (*eth.BlockChain, core.RpcClient) { + rawRPCClient, dialErr := rpc.Dial(path) if dialErr != nil { - log.Fatal(dialErr) + logWithCommand.Fatal(dialErr) } - rpcClient := client.NewRpcClient(rawRpcClient, ipc) - ethClient := ethclient.NewClient(rawRpcClient) + rpcClient := client.NewRpcClient(rawRPCClient, ipc) + ethClient := ethclient.NewClient(rawRPCClient) vdbEthClient := client.NewEthClient(ethClient) vdbNode := node.MakeNode(rpcClient) transactionConverter := vRpc.NewRpcTransactionConverter(ethClient) - blockChain := geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter) + blockChain := eth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter) return blockChain, rpcClient } @@ -97,7 +99,7 @@ func newSuperNode() (super_node.NodeInterface, error) { if ipfsPath == "" { home, homeDirErr := os.UserHomeDir() if homeDirErr != nil { - log.Fatal(homeDirErr) + logWithCommand.Fatal(homeDirErr) } ipfsPath = filepath.Join(home, ".ipfs") } @@ -109,14 +111,14 @@ func newSuperNode() (super_node.NodeInterface, error) { } func newBackFiller() (super_node.BackFillInterface, error) { - blockChain, archivalRpcClient := getBlockChainAndClient(viper.GetString("backfill.ipcPath")) + blockChain, archivalRPCClient := getBlockChainAndClient(viper.GetString("superNodeBackFill.rpcPath")) db := utils.LoadPostgres(databaseConfig, blockChain.Node()) - freq := viper.GetInt("backfill.frequency") + freq := viper.GetInt("superNodeBackFill.frequency") var frequency time.Duration if freq <= 0 { frequency = time.Minute * 5 } else { frequency = time.Duration(freq) } - return super_node.NewBackFillService(ipfsPath, &db, archivalRpcClient, time.Minute*frequency) + return super_node.NewBackFillService(ipfsPath, &db, archivalRPCClient, time.Minute*frequency) } diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go index 659ac095..1d7f22d1 100644 --- a/cmd/syncPublishScreenAndServe.go +++ b/cmd/syncPublishScreenAndServe.go @@ -35,6 +35,8 @@ then converts the eth data to IPLD objects and publishes them to IPFS. Additiona it maintains a local index of the IPLD objects' CIDs in Postgres. It then opens up a server which relays relevant data to requesting clients.`, Run: func(cmd *cobra.Command, args []string) { + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) syncPublishScreenAndServe() }, } @@ -46,7 +48,7 @@ func init() { func syncPublishScreenAndServe() { superNode, newNodeErr := newSuperNode() if newNodeErr != nil { - log.Fatal(newNodeErr) + logWithCommand.Fatal(newNodeErr) } wg := &syn.WaitGroup{} @@ -54,20 +56,20 @@ func syncPublishScreenAndServe() { forwardQuitChan := make(chan bool, 1) syncAndPubErr := superNode.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan) if syncAndPubErr != nil { - log.Fatal(syncAndPubErr) + logWithCommand.Fatal(syncAndPubErr) } superNode.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan) - if viper.GetBool("backfill.on") && viper.GetString("backfill.ipcPath") != "" { + if viper.GetBool("superNodeBackFill.on") && viper.GetString("superNodeBackFill.rpcPath") != "" { backfiller, newBackFillerErr := newBackFiller() if newBackFillerErr != nil { - log.Fatal(newBackFillerErr) + logWithCommand.Fatal(newBackFillerErr) } backfiller.FillGaps(wg, nil) } serverErr := startServers(superNode) if serverErr != nil { - log.Fatal(serverErr) + logWithCommand.Fatal(serverErr) } wg.Wait() } diff --git a/db/schema.sql b/db/schema.sql index 763b5373..66569a53 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -2,7 +2,7 @@ -- PostgreSQL database dump -- --- Dumped from database version 11.5 +-- Dumped from database version 10.10 -- Dumped by pg_dump version 11.5 SET statement_timeout = 0; diff --git a/environments/superNodeSubscription.toml b/environments/superNodeSubscription.toml new file mode 100644 index 00000000..eb1a3021 --- /dev/null +++ b/environments/superNodeSubscription.toml @@ -0,0 +1,35 @@ +[subscription] + path = "ws://127.0.0.1:8080" + backfill = true + backfillOnly = false + startingBlock = 0 + endingBlock = 0 + [subscription.headerFilter] + off = false + uncles = false + [subscription.trxFilter] + off = false + src = [ + "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", + ] + dst = [ + "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", + ] + [subscription.receiptFilter] + off = false + contracts = [] + topic0s = [ + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377" + ] + [subscription.stateFilter] + off = false + addresses = [ + "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe" + ] + intermediateNodes = false + [subscription.storageFilter] + off = true + addresses = [] + storageKeys = [] + intermediateNodes = false \ No newline at end of file diff --git a/environments/syncPublishScreenAndServe.toml b/environments/syncPublishScreenAndServe.toml index fb939d37..32a06ead 100644 --- a/environments/syncPublishScreenAndServe.toml +++ b/environments/syncPublishScreenAndServe.toml @@ -12,7 +12,7 @@ ipcPath = "/root/.vulcanize/vulcanize.ipc" wsEndpoint = "127.0.0.1:8080" -[backfill] +[superNodeBackFill] on = false - ipcPath = "" + rpcPath = "" frequency = 5 diff --git a/go.mod b/go.mod index 0d648208..02604f3d 100644 --- a/go.mod +++ b/go.mod @@ -3,45 +3,173 @@ module github.com/vulcanize/vulcanizedb go 1.12 require ( - github.com/BurntSushi/toml v0.3.1 // indirect - github.com/allegro/bigcache v1.2.1 // indirect - github.com/apilayer/freegeoip v3.5.0+incompatible // indirect - github.com/aristanetworks/goarista v0.0.0-20190712234253-ed1100a1c015 // indirect - github.com/btcsuite/btcd v0.20.0-beta // indirect - github.com/cespare/cp v1.1.1 // indirect + bazil.org/fuse v0.0.0-20180421153158-65cc252bf669 + github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 + github.com/Stebalien/go-bitfield v0.0.1 + github.com/allegro/bigcache v0.0.0-20190618191010-69ea0af04088 + github.com/aristanetworks/goarista v0.0.0-20190712234253-ed1100a1c015 + github.com/bren2010/proquint v0.0.0-20160323162903-38337c27106d + github.com/btcsuite/btcd v0.0.0-20190629003639-c26ffa870fd8 + github.com/btcsuite/goleveldb v1.0.0 // indirect + github.com/cenkalti/backoff v2.2.1+incompatible // indirect + github.com/cenkalti/backoff/v3 v3.0.0 + github.com/cheekybits/genny v1.0.0 + github.com/coreos/go-semver v0.3.0 + github.com/cskr/pubsub v1.0.2 github.com/dave/jennifer v1.3.0 - github.com/deckarep/golang-set v1.7.1 // indirect - github.com/docker/docker v1.13.1 // indirect - github.com/edsrzf/mmap-go v1.0.0 // indirect - github.com/elastic/gosigar v0.10.4 // indirect - github.com/ethereum/go-ethereum v1.9.5 - github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect - github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect - github.com/go-sql-driver/mysql v1.4.1 // indirect - github.com/golang/protobuf v1.3.2 // indirect - github.com/google/go-cmp v0.3.1 // indirect - github.com/gorilla/websocket v1.4.1 // indirect - github.com/graph-gophers/graphql-go v0.0.0-20191024035216-0a9cfbec35a1 // indirect + github.com/davecgh/go-spew v1.1.1 + github.com/davidlazar/go-crypto v0.0.0-20190522120613-62389b5e4ae0 + github.com/deckarep/golang-set v1.7.1 + github.com/dgraph-io/badger/v2 v2.0.0-rc.2+incompatible + github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 + github.com/dustin/go-humanize v1.0.0 + github.com/edsrzf/mmap-go v1.0.0 + github.com/elastic/gosigar v0.10.4 + github.com/ethereum/go-ethereum v1.9.1 + github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5 + github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 + github.com/fsnotify/fsnotify v1.4.7 + github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff + github.com/go-logfmt/logfmt v0.4.0 // indirect + github.com/go-stack/stack v1.8.0 + github.com/gogo/protobuf v1.2.1 + github.com/golang/protobuf v1.3.2 + github.com/golang/snappy v0.0.1 + github.com/google/uuid v1.1.1 + github.com/gorilla/websocket v1.4.0 + github.com/hashicorp/errwrap v1.0.0 + github.com/hashicorp/go-multierror v1.0.0 github.com/hashicorp/golang-lru v0.5.3 github.com/hashicorp/hcl v1.0.0 github.com/hpcloud/tail v1.0.0 - github.com/huin/goupnp v1.0.0 // indirect - github.com/inconshreveable/mousetrap v1.0.0 // indirect - github.com/influxdata/influxdb v1.7.9 // indirect - github.com/jackpal/go-nat-pmp v1.0.1 // indirect - github.com/jmoiron/sqlx v0.0.0-20181024163419-82935fac6c1a - github.com/karalabe/usb v0.0.0-20190819132248-550797b1cad8 // indirect - github.com/lib/pq v1.0.0 - github.com/mattn/go-colorable v0.1.2 // indirect - github.com/mattn/go-isatty v0.0.9 // indirect - github.com/mattn/go-runewidth v0.0.4 // indirect + github.com/huin/goupnp v1.0.0 + github.com/inconshreveable/mousetrap v1.0.0 + github.com/ipfs/bbloom v0.0.1 + github.com/ipfs/go-bitswap v0.1.6 + github.com/ipfs/go-block-format v0.0.2 + github.com/ipfs/go-blockservice v0.1.2 + github.com/ipfs/go-cid v0.0.3 + github.com/ipfs/go-cidutil v0.0.2 + github.com/ipfs/go-datastore v0.0.5 + github.com/ipfs/go-ds-badger v0.0.5 + github.com/ipfs/go-ds-flatfs v0.0.2 + github.com/ipfs/go-ds-leveldb v0.0.2 + github.com/ipfs/go-ds-measure v0.0.1 + github.com/ipfs/go-fs-lock v0.0.1 + github.com/ipfs/go-ipfs v0.4.22 + github.com/ipfs/go-ipfs-blockstore v0.0.1 + github.com/ipfs/go-ipfs-chunker v0.0.1 + github.com/ipfs/go-ipfs-config v0.0.3 + github.com/ipfs/go-ipfs-delay v0.0.1 + github.com/ipfs/go-ipfs-ds-help v0.0.1 + github.com/ipfs/go-ipfs-exchange-interface v0.0.1 + github.com/ipfs/go-ipfs-exchange-offline v0.0.1 + github.com/ipfs/go-ipfs-files v0.0.3 + github.com/ipfs/go-ipfs-posinfo v0.0.1 + github.com/ipfs/go-ipfs-pq v0.0.1 + github.com/ipfs/go-ipfs-routing v0.1.0 + github.com/ipfs/go-ipfs-util v0.0.1 + github.com/ipfs/go-ipld-cbor v0.0.3 + github.com/ipfs/go-ipld-format v0.0.2 + github.com/ipfs/go-ipld-git v0.0.2 + github.com/ipfs/go-ipns v0.0.1 + github.com/ipfs/go-log v0.0.1 + github.com/ipfs/go-merkledag v0.1.0 + github.com/ipfs/go-metrics-interface v0.0.1 + github.com/ipfs/go-mfs v0.1.1 + github.com/ipfs/go-path v0.0.7 + github.com/ipfs/go-peertaskqueue v0.1.1 + github.com/ipfs/go-todocounter v0.0.1 + github.com/ipfs/go-unixfs v0.1.0 + github.com/ipfs/go-verifcid v0.0.1 + github.com/ipfs/interface-go-ipfs-core v0.1.0 + github.com/jackpal/gateway v1.0.5 + github.com/jackpal/go-nat-pmp v1.0.1 + github.com/jbenet/go-is-domain v1.0.2 + github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2 + github.com/jbenet/goprocess v0.1.3 + github.com/jessevdk/go-flags v1.4.0 // indirect + github.com/jmoiron/sqlx v0.0.0-20190426154859-38398a30ed85 + github.com/karalabe/usb v0.0.0-20190703133951-9be757f914c0 + github.com/kisielk/errcheck v1.2.0 // indirect + github.com/kkdai/bstream v0.0.0-20181106074824-b3251f7901ec // indirect + github.com/konsorten/go-windows-terminal-sequences v1.0.2 + github.com/koron/go-ssdp v0.0.0-20180514024734-4a0ed625a78b + github.com/lib/pq v1.2.0 + github.com/libp2p/go-addr-util v0.0.1 + github.com/libp2p/go-buffer-pool v0.0.2 + github.com/libp2p/go-conn-security-multistream v0.1.0 + github.com/libp2p/go-eventbus v0.0.3 + github.com/libp2p/go-flow-metrics v0.0.1 + github.com/libp2p/go-libp2p v0.1.2 + github.com/libp2p/go-libp2p-autonat v0.1.0 + github.com/libp2p/go-libp2p-autonat-svc v0.1.0 + github.com/libp2p/go-libp2p-circuit v0.1.0 + github.com/libp2p/go-libp2p-connmgr v0.1.0 + github.com/libp2p/go-libp2p-core v0.0.9 + github.com/libp2p/go-libp2p-crypto v0.1.0 + github.com/libp2p/go-libp2p-discovery v0.1.0 + github.com/libp2p/go-libp2p-kad-dht v0.1.1 + github.com/libp2p/go-libp2p-kbucket v0.2.0 + github.com/libp2p/go-libp2p-loggables v0.1.0 + github.com/libp2p/go-libp2p-mplex v0.2.1 + github.com/libp2p/go-libp2p-nat v0.0.4 + github.com/libp2p/go-libp2p-peer v0.2.0 + github.com/libp2p/go-libp2p-peerstore v0.1.3 + github.com/libp2p/go-libp2p-pnet v0.1.0 + github.com/libp2p/go-libp2p-protocol v0.1.0 // indirect + github.com/libp2p/go-libp2p-pubsub v0.1.0 + github.com/libp2p/go-libp2p-pubsub-router v0.1.0 + github.com/libp2p/go-libp2p-quic-transport v0.1.1 + github.com/libp2p/go-libp2p-record v0.1.0 + github.com/libp2p/go-libp2p-routing v0.1.0 + github.com/libp2p/go-libp2p-routing-helpers v0.1.0 + github.com/libp2p/go-libp2p-secio v0.1.1 + github.com/libp2p/go-libp2p-swarm v0.1.1 + github.com/libp2p/go-libp2p-tls v0.1.0 + github.com/libp2p/go-libp2p-transport-upgrader v0.1.1 + github.com/libp2p/go-libp2p-yamux v0.2.1 + github.com/libp2p/go-maddr-filter v0.0.5 + github.com/libp2p/go-mplex v0.1.0 + github.com/libp2p/go-msgio v0.0.4 + github.com/libp2p/go-nat v0.0.3 + github.com/libp2p/go-reuseport v0.0.1 + github.com/libp2p/go-reuseport-transport v0.0.2 + github.com/libp2p/go-stream-muxer v0.1.0 // indirect + github.com/libp2p/go-stream-muxer-multistream v0.2.0 + github.com/libp2p/go-tcp-transport v0.1.0 + github.com/libp2p/go-testutil v0.1.0 // indirect + github.com/libp2p/go-ws-transport v0.1.0 + github.com/libp2p/go-yamux v1.2.3 + github.com/lucas-clemente/quic-go v0.11.2 + github.com/magiconair/properties v1.8.1 + github.com/marten-seemann/qtls v0.2.4 + github.com/mattn/go-colorable v0.1.2 + github.com/mattn/go-isatty v0.0.8 + github.com/mattn/go-runewidth v0.0.4 + github.com/miekg/dns v1.1.15 + github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 + github.com/minio/sha256-simd v0.1.0 github.com/mitchellh/go-homedir v1.1.0 - github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect - github.com/olekukonko/tablewriter v0.0.1 // indirect - github.com/onsi/ginkgo v1.7.0 - github.com/onsi/gomega v1.4.3 - github.com/oschwald/maxminddb-golang v1.5.0 // indirect - github.com/pborman/uuid v1.2.0 // indirect + github.com/mitchellh/mapstructure v1.1.2 + github.com/mmcloughlin/avo v0.0.0-20190731014047-bb615f61ce85 + github.com/mr-tron/base58 v1.1.2 + github.com/multiformats/go-base32 v0.0.3 + github.com/multiformats/go-multiaddr v0.0.4 + github.com/multiformats/go-multiaddr-dns v0.0.3 + github.com/multiformats/go-multiaddr-fmt v0.0.1 + github.com/multiformats/go-multiaddr-net v0.0.1 + github.com/multiformats/go-multibase v0.0.1 + github.com/multiformats/go-multihash v0.0.6 + github.com/multiformats/go-multistream v0.1.0 + github.com/olekukonko/tablewriter v0.0.1 + github.com/onsi/ginkgo v1.8.0 + github.com/onsi/gomega v1.5.0 + github.com/opentracing/opentracing-go v1.1.0 + github.com/pborman/uuid v1.2.0 + github.com/pelletier/go-toml v1.4.0 + github.com/pkg/errors v0.8.1 + github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a github.com/pressly/goose v2.6.0+incompatible github.com/prometheus/tsdb v0.10.0 github.com/rjeczalik/notify v0.9.2 @@ -82,9 +210,13 @@ require ( golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 golang.org/x/sync v0.0.0-20190423024810-112230192c58 - google.golang.org/appengine v1.6.5 // indirect - gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect - gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190709231704-1e4459ed25ff // indirect + golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 + golang.org/x/text v0.3.2 + golang.org/x/tools v0.0.0-20190802003818-e9bb7d36c060 + golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 + google.golang.org/appengine v1.4.0 // indirect + gopkg.in/fsnotify.v1 v1.4.7 + gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 gopkg.in/urfave/cli.v1 v1.20.0 // indirect gopkg.in/yaml.v2 v2.2.2 diff --git a/integration_test/contract_watcher_full_transformer_test.go b/integration_test/contract_watcher_full_transformer_test.go index f6623650..0a606c43 100644 --- a/integration_test/contract_watcher_full_transformer_test.go +++ b/integration_test/contract_watcher_full_transformer_test.go @@ -22,8 +22,6 @@ import ( "strings" "time" - "github.com/vulcanize/vulcanizedb/pkg/config" - "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" diff --git a/pkg/contract_watcher/full/retriever/retriever_suite_test.go b/pkg/contract_watcher/full/retriever/retriever_suite_test.go index a79487c7..2f97ce0a 100644 --- a/pkg/contract_watcher/full/retriever/retriever_suite_test.go +++ b/pkg/contract_watcher/full/retriever/retriever_suite_test.go @@ -20,8 +20,6 @@ import ( "io/ioutil" "testing" - "github.com/sirupsen/logrus" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" diff --git a/pkg/contract_watcher/full/transformer/transformer_suite_test.go b/pkg/contract_watcher/full/transformer/transformer_suite_test.go index ed6bb349..aac31e85 100644 --- a/pkg/contract_watcher/full/transformer/transformer_suite_test.go +++ b/pkg/contract_watcher/full/transformer/transformer_suite_test.go @@ -20,8 +20,6 @@ import ( "io/ioutil" "testing" - "github.com/sirupsen/logrus" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" diff --git a/pkg/contract_watcher/header/repository/repository_suite_test.go b/pkg/contract_watcher/header/repository/repository_suite_test.go index a80fea54..87726ebd 100644 --- a/pkg/contract_watcher/header/repository/repository_suite_test.go +++ b/pkg/contract_watcher/header/repository/repository_suite_test.go @@ -20,8 +20,6 @@ import ( "io/ioutil" "testing" - "github.com/sirupsen/logrus" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" diff --git a/pkg/contract_watcher/header/retriever/retriever_suite_test.go b/pkg/contract_watcher/header/retriever/retriever_suite_test.go index ef2a009a..f6d4967e 100644 --- a/pkg/contract_watcher/header/retriever/retriever_suite_test.go +++ b/pkg/contract_watcher/header/retriever/retriever_suite_test.go @@ -20,8 +20,6 @@ import ( "io/ioutil" "testing" - "github.com/sirupsen/logrus" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" diff --git a/pkg/contract_watcher/header/transformer/transformer_suite_test.go b/pkg/contract_watcher/header/transformer/transformer_suite_test.go index 91ee7975..56bcc01c 100644 --- a/pkg/contract_watcher/header/transformer/transformer_suite_test.go +++ b/pkg/contract_watcher/header/transformer/transformer_suite_test.go @@ -20,8 +20,6 @@ import ( "io/ioutil" "testing" - "github.com/sirupsen/logrus" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" diff --git a/pkg/contract_watcher/shared/repository/repository_suite_test.go b/pkg/contract_watcher/shared/repository/repository_suite_test.go index f10c9448..707e1917 100644 --- a/pkg/contract_watcher/shared/repository/repository_suite_test.go +++ b/pkg/contract_watcher/shared/repository/repository_suite_test.go @@ -20,8 +20,6 @@ import ( "io/ioutil" "testing" - "github.com/sirupsen/logrus" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" diff --git a/pkg/contract_watcher/shared/retriever/address_retriever.go b/pkg/contract_watcher/shared/retriever/address_retriever.go index 074537b3..8afb2b98 100644 --- a/pkg/contract_watcher/shared/retriever/address_retriever.go +++ b/pkg/contract_watcher/shared/retriever/address_retriever.go @@ -20,8 +20,6 @@ import ( "fmt" "strings" - "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" diff --git a/pkg/contract_watcher/shared/retriever/retriever_suite_test.go b/pkg/contract_watcher/shared/retriever/retriever_suite_test.go index 93c4ff6a..6056bbfc 100644 --- a/pkg/contract_watcher/shared/retriever/retriever_suite_test.go +++ b/pkg/contract_watcher/shared/retriever/retriever_suite_test.go @@ -20,8 +20,6 @@ import ( "io/ioutil" "testing" - "github.com/sirupsen/logrus" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" diff --git a/pkg/eth/node/node.go b/pkg/eth/node/node.go index 39b72aaf..a60d271e 100644 --- a/pkg/eth/node/node.go +++ b/pkg/eth/node/node.go @@ -22,8 +22,6 @@ import ( "strconv" "strings" - "strings" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/p2p" log "github.com/sirupsen/logrus" diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index ab60a9a2..6a119efe 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -17,10 +17,14 @@ package super_node import ( + "errors" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff" log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" @@ -29,6 +33,10 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) +const ( + DefaultMaxBatchSize uint64 = 5000 + defaultMaxBatchNumber int64 = 100 +) // BackFillInterface for filling in gaps in the super node type BackFillInterface interface { // Method for the super node to periodically check for and fill in gaps in its data using an archival node @@ -46,9 +54,11 @@ type BackFillService struct { // Interface for searching and retrieving CIDs from Postgres index Retriever CIDRetriever // State-diff fetcher; needs to be configured with an archival core.RpcClient - StateDiffFetcher fetcher.IStateDiffFetcher + Fetcher fetcher.StateDiffFetcher // Check frequency GapCheckFrequency time.Duration + // size of batch fetches + batchSize uint64 } // NewBackFillService returns a new BackFillInterface @@ -62,8 +72,9 @@ func NewBackFillService(ipfsPath string, db *postgres.DB, archivalNodeRPCClient Converter: ipfs.NewPayloadConverter(params.MainnetChainConfig), Publisher: publisher, Retriever: NewCIDRetriever(db), - StateDiffFetcher: fetcher.NewStateDiffFetcher(archivalNodeRPCClient), + Fetcher: fetcher.NewStateDiffFetcher(archivalNodeRPCClient), GapCheckFrequency: freq, + batchSize: DefaultMaxBatchSize, }, nil } @@ -88,12 +99,8 @@ func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) { continue } if startingBlock != 1 { - startingGap := [2]int64{ - 1, - startingBlock - 1, - } log.Info("found gap at the beginning of the sync") - bfs.fillGaps(startingGap) + bfs.fillGaps(1, uint64(startingBlock-1)) } gaps, gapErr := bfs.Retriever.RetrieveGapsInData() @@ -102,7 +109,7 @@ func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) { continue } for _, gap := range gaps { - bfs.fillGaps(gap) + bfs.fillGaps(gap[0], gap[1]) } } } @@ -110,31 +117,122 @@ func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) { log.Info("fillGaps goroutine successfully spun up") } -func (bfs *BackFillService) fillGaps(gap [2]int64) { - log.Infof("filling in gap from block %d to block %d", gap[0], gap[1]) - blockHeights := make([]uint64, 0, gap[1]-gap[0]+1) - for i := gap[0]; i <= gap[1]; i++ { - blockHeights = append(blockHeights, uint64(i)) - } - payloads, fetchErr := bfs.StateDiffFetcher.FetchStateDiffsAt(blockHeights) - if fetchErr != nil { - log.Error(fetchErr) +func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) { + errChan := make(chan error) + done := make(chan bool) + backFillInitErr := bfs.BackFill(startingBlock, endingBlock, errChan, done) + if backFillInitErr != nil { + log.Error(backFillInitErr) return } - for _, payload := range payloads { - ipldPayload, convertErr := bfs.Converter.Convert(*payload) - if convertErr != nil { - log.Error(convertErr) - continue - } - cidPayload, publishErr := bfs.Publisher.Publish(ipldPayload) - if publishErr != nil { - log.Error(publishErr) - continue - } - indexErr := bfs.Repository.Index(cidPayload) - if indexErr != nil { - log.Error(indexErr) + for { + select { + case err := <- errChan: + log.Error(err) + case <- done: + return } } } + + +// BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks +// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently +func (bfs *BackFillService) BackFill(startingBlock, endingBlock uint64, errChan chan error, done chan bool) error { + if endingBlock < startingBlock { + return errors.New("backfill: ending block number needs to be greater than starting block number") + } + // break the range up into bins of smaller ranges + length := endingBlock - startingBlock + 1 + numberOfBins := length / bfs.batchSize + remainder := length % bfs.batchSize + if remainder != 0 { + numberOfBins++ + } + blockRangeBins := make([][]uint64, numberOfBins) + for i := range blockRangeBins { + nextBinStart := startingBlock + uint64(bfs.batchSize) + if nextBinStart > endingBlock { + nextBinStart = endingBlock + 1 + } + blockRange := make([]uint64, 0, nextBinStart-startingBlock+1) + for j := startingBlock; j < nextBinStart; j++ { + blockRange = append(blockRange, j) + } + startingBlock = nextBinStart + blockRangeBins[i] = blockRange + } + + // int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have + var activeCount int64 + // channel for processing goroutines to signal when they are done + processingDone := make(chan bool) + forwardDone := make(chan bool) + + // for each block range bin spin up a goroutine to batch fetch and process state diffs for that range + go func() { + for _, blockHeights := range blockRangeBins { + // if we have reached our limit of active goroutines + // wait for one to finish before starting the next + if atomic.AddInt64(&activeCount, 1) > defaultMaxBatchNumber { + // this blocks until a process signals it has finished + <-forwardDone + } + go func(blockHeights []uint64) { + payloads, fetchErr := bfs.Fetcher.FetchStateDiffsAt(blockHeights) + if fetchErr != nil { + errChan <- fetchErr + } + for _, payload := range payloads { + stateDiff := new(statediff.StateDiff) + stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) + if stateDiffDecodeErr != nil { + errChan <- stateDiffDecodeErr + continue + } + ipldPayload, convertErr := bfs.Converter.Convert(payload) + if convertErr != nil { + log.Error(convertErr) + continue + } + cidPayload, publishErr := bfs.Publisher.Publish(ipldPayload) + if publishErr != nil { + log.Error(publishErr) + continue + } + indexErr := bfs.Repository.Index(cidPayload) + if indexErr != nil { + log.Error(indexErr) + } + } + // when this goroutine is done, send out a signal + processingDone <- true + }(blockHeights) + } + }() + + // goroutine that listens on the processingDone chan + // keeps track of the number of processing goroutines that have finished + // when they have all finished, sends the final signal out + go func() { + goroutinesFinished := 0 + for { + select { + case <-processingDone: + atomic.AddInt64(&activeCount, -1) + select { + // if we are waiting for a process to finish, signal that one has + case forwardDone <- true: + default: + } + goroutinesFinished++ + if goroutinesFinished == int(numberOfBins) { + done <- true + return + } + } + } + }() + + return nil +} \ No newline at end of file diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index 20fb3e74..5916cd27 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -53,10 +53,10 @@ var _ = Describe("BackFiller", func() { }, }, } - mockFetcher := &mocks2.MockStateDiffFetcher{ - StateDiffsToReturn: map[uint64]*statediff.Payload{ - 100: &mocks.MockStateDiffPayload, - 101: &mocks.MockStateDiffPayload, + mockFetcher := &mocks2.StateDiffFetcher{ + PayloadsToReturn: map[uint64]statediff.Payload{ + 100: mocks.MockStateDiffPayload, + 101: mocks.MockStateDiffPayload, }, } backfiller := &super_node.BackFillService{ @@ -65,12 +65,12 @@ var _ = Describe("BackFiller", func() { Converter: mockConverter, StateDiffFetcher: mockFetcher, Retriever: mockRetriever, - GapCheckFrequency: time.Second * 10, + GapCheckFrequency: time.Second * 2, } wg := &sync.WaitGroup{} quitChan := make(chan bool, 1) backfiller.FillGaps(wg, quitChan) - time.Sleep(time.Second * 15) + time.Sleep(time.Second * 3) quitChan <- true Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2)) Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) @@ -106,9 +106,9 @@ var _ = Describe("BackFiller", func() { }, }, } - mockFetcher := &mocks2.MockStateDiffFetcher{ - StateDiffsToReturn: map[uint64]*statediff.Payload{ - 100: &mocks.MockStateDiffPayload, + mockFetcher := &mocks2.StateDiffFetcher{ + PayloadsToReturn: map[uint64]statediff.Payload{ + 100: mocks.MockStateDiffPayload, }, } backfiller := &super_node.BackFillService{ @@ -117,12 +117,12 @@ var _ = Describe("BackFiller", func() { Converter: mockConverter, StateDiffFetcher: mockFetcher, Retriever: mockRetriever, - GapCheckFrequency: time.Second * 10, + GapCheckFrequency: time.Second * 2, } wg := &sync.WaitGroup{} quitChan := make(chan bool, 1) backfiller.FillGaps(wg, quitChan) - time.Sleep(time.Second * 15) + time.Sleep(time.Second * 3) quitChan <- true Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1)) Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) @@ -151,10 +151,10 @@ var _ = Describe("BackFiller", func() { FirstBlockNumberToReturn: 3, GapsToRetrieve: [][2]int64{}, } - mockFetcher := &mocks2.MockStateDiffFetcher{ - StateDiffsToReturn: map[uint64]*statediff.Payload{ - 1: &mocks.MockStateDiffPayload, - 2: &mocks.MockStateDiffPayload, + mockFetcher := &mocks2.StateDiffFetcher{ + PayloadsToReturn: map[uint64]statediff.Payload{ + 1: mocks.MockStateDiffPayload, + 2: mocks.MockStateDiffPayload, }, } backfiller := &super_node.BackFillService{ @@ -163,12 +163,12 @@ var _ = Describe("BackFiller", func() { Converter: mockConverter, StateDiffFetcher: mockFetcher, Retriever: mockRetriever, - GapCheckFrequency: time.Second * 10, + GapCheckFrequency: time.Second * 2, } wg := &sync.WaitGroup{} quitChan := make(chan bool, 1) backfiller.FillGaps(wg, quitChan) - time.Sleep(time.Second * 15) + time.Sleep(time.Second * 3) quitChan <- true Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2)) Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) diff --git a/pkg/super_node/seed_node_suite_test.go b/pkg/super_node/super_node_suite_test.go similarity index 96% rename from pkg/super_node/seed_node_suite_test.go rename to pkg/super_node/super_node_suite_test.go index 83b84e4b..12321813 100644 --- a/pkg/super_node/seed_node_suite_test.go +++ b/pkg/super_node/super_node_suite_test.go @@ -27,7 +27,7 @@ import ( func TestSuperNode(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Seed Node Suite Test") + RunSpecs(t, "Super Node Suite Test") } var _ = BeforeSuite(func() { diff --git a/pkg/super_node/test_helpers.go b/pkg/super_node/test_helpers.go index 81afe96f..86b4f8ca 100644 --- a/pkg/super_node/test_helpers.go +++ b/pkg/super_node/test_helpers.go @@ -30,7 +30,7 @@ import ( func SetupDB() (*postgres.DB, error) { return postgres.NewDB(config.Database{ Hostname: "localhost", - Name: "vulcanize_private", + Name: "vulcanize_testing", Port: 5432, }, core.Node{}) }