Remove injection of SubCommand for logs

This commit is contained in:
Andrew J Yao 2019-07-18 18:58:01 -07:00
parent a188e1dd79
commit ee77fc6521
13 changed files with 102 additions and 93 deletions

View File

@ -39,7 +39,8 @@ var coldImportCmd = &cobra.Command{
Geth must be synced over all of the desired blocks and must not be running in order to execute this command.`, Geth must be synced over all of the desired blocks and must not be running in order to execute this command.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs() SubCommand = cmd.CalledAs()
LogWithCommand = *log.WithField("SubCommand", SubCommand)
coldImport() coldImport()
}, },
} }
@ -56,7 +57,7 @@ func coldImport() {
ethDBConfig := ethereum.CreateDatabaseConfig(ethereum.Level, levelDbPath) ethDBConfig := ethereum.CreateDatabaseConfig(ethereum.Level, levelDbPath)
ethDB, err := ethereum.CreateDatabase(ethDBConfig) ethDB, err := ethereum.CreateDatabase(ethDBConfig)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Fatal("Error connecting to ethereum db: ", err) LogWithCommand.Fatal("Error connecting to ethereum db: ", err)
} }
mostRecentBlockNumberInDb := ethDB.GetHeadBlockNumber() mostRecentBlockNumberInDb := ethDB.GetHeadBlockNumber()
if syncAll { if syncAll {
@ -64,10 +65,10 @@ func coldImport() {
endingBlockNumber = mostRecentBlockNumberInDb endingBlockNumber = mostRecentBlockNumberInDb
} }
if endingBlockNumber < startingBlockNumber { if endingBlockNumber < startingBlockNumber {
log.WithField("subCommand", subCommand).Fatal("Ending block number must be greater than starting block number for cold import.") LogWithCommand.Fatal("Ending block number must be greater than starting block number for cold import.")
} }
if endingBlockNumber > mostRecentBlockNumberInDb { if endingBlockNumber > mostRecentBlockNumberInDb {
log.WithField("subCommand", subCommand).Fatal("Ending block number is greater than most recent block in db: ", mostRecentBlockNumberInDb) LogWithCommand.Fatal("Ending block number is greater than most recent block in db: ", mostRecentBlockNumberInDb)
} }
// init pg db // init pg db
@ -77,7 +78,7 @@ func coldImport() {
nodeBuilder := cold_import.NewColdImportNodeBuilder(reader, parser) nodeBuilder := cold_import.NewColdImportNodeBuilder(reader, parser)
coldNode, err := nodeBuilder.GetNode(genesisBlock, levelDbPath) coldNode, err := nodeBuilder.GetNode(genesisBlock, levelDbPath)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Fatal("Error getting node: ", err) LogWithCommand.Fatal("Error getting node: ", err)
} }
pgDB := utils.LoadPostgres(databaseConfig, coldNode) pgDB := utils.LoadPostgres(databaseConfig, coldNode)
@ -91,6 +92,6 @@ func coldImport() {
coldImporter := cold_import.NewColdImporter(ethDB, blockRepository, receiptRepository, blockConverter) coldImporter := cold_import.NewColdImporter(ethDB, blockRepository, receiptRepository, blockConverter)
err = coldImporter.Execute(startingBlockNumber, endingBlockNumber, coldNode.ID) err = coldImporter.Execute(startingBlockNumber, endingBlockNumber, coldNode.ID)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Fatal("Error executing cold import: ", err) LogWithCommand.Fatal("Error executing cold import: ", err)
} }
} }

View File

@ -101,7 +101,8 @@ single config file or in separate command instances using different config files
Specify config location when executing the command: Specify config location when executing the command:
./vulcanizedb compose --config=./environments/config_name.toml`, ./vulcanizedb compose --config=./environments/config_name.toml`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs() SubCommand = cmd.CalledAs()
LogWithCommand = *log.WithField("SubCommand", SubCommand)
compose() compose()
}, },
} }
@ -111,23 +112,25 @@ func compose() {
prepConfig() prepConfig()
// Generate code to build the plugin according to the config file // Generate code to build the plugin according to the config file
log.WithField("subCommand", subCommand).Info("generating plugin") LogWithCommand.Info("generating plugin")
generator, err := p2.NewGenerator(genConfig, databaseConfig) generator, err := p2.NewGenerator(genConfig, databaseConfig)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Fatal(err) LogWithCommand.Debug("initializing plugin generator failed")
LogWithCommand.Fatal(err)
} }
err = generator.GenerateExporterPlugin() err = generator.GenerateExporterPlugin()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Debug("generating plugin failed") LogWithCommand.Debug("generating plugin failed")
log.WithField("subCommand", subCommand).Fatal(err) LogWithCommand.Fatal(err)
} }
// TODO: Embed versioning info in the .so files so we know which version of vulcanizedb to run them with // TODO: Embed versioning info in the .so files so we know which version of vulcanizedb to run them with
_, pluginPath, err := genConfig.GetPluginPaths() _, pluginPath, err := genConfig.GetPluginPaths()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Fatal(err) LogWithCommand.Debug("getting plugin path failed")
LogWithCommand.Fatal(err)
} }
fmt.Printf("Composed plugin %s", pluginPath) fmt.Printf("Composed plugin %s", pluginPath)
log.WithField("subCommand", subCommand).Info("plugin .so file output to", pluginPath) LogWithCommand.Info("plugin .so file output to ", pluginPath)
} }
func init() { func init() {
@ -135,38 +138,38 @@ func init() {
} }
func prepConfig() { func prepConfig() {
log.WithField("subCommand", subCommand).Info("configuring plugin") LogWithCommand.Info("configuring plugin")
names := viper.GetStringSlice("exporter.transformerNames") names := viper.GetStringSlice("exporter.transformerNames")
transformers := make(map[string]config.Transformer) transformers := make(map[string]config.Transformer)
for _, name := range names { for _, name := range names {
transformer := viper.GetStringMapString("exporter." + name) transformer := viper.GetStringMapString("exporter." + name)
p, pOK := transformer["path"] p, pOK := transformer["path"]
if !pOK || p == "" { if !pOK || p == "" {
log.WithField("subCommand", subCommand).Fatal(name, "transformer config is missing `path` value") LogWithCommand.Fatal(name, " transformer config is missing `path` value")
} }
r, rOK := transformer["repository"] r, rOK := transformer["repository"]
if !rOK || r == "" { if !rOK || r == "" {
log.WithField("subCommand", subCommand).Fatal(name, "transformer config is missing `repository` value") LogWithCommand.Fatal(name, " transformer config is missing `repository` value")
} }
m, mOK := transformer["migrations"] m, mOK := transformer["migrations"]
if !mOK || m == "" { if !mOK || m == "" {
log.WithField("subCommand", subCommand).Fatal(name, "transformer config is missing `migrations` value") LogWithCommand.Fatal(name, " transformer config is missing `migrations` value")
} }
mr, mrOK := transformer["rank"] mr, mrOK := transformer["rank"]
if !mrOK || mr == "" { if !mrOK || mr == "" {
log.WithField("subCommand", subCommand).Fatal(name, "transformer config is missing `rank` value") LogWithCommand.Fatal(name, " transformer config is missing `rank` value")
} }
rank, err := strconv.ParseUint(mr, 10, 64) rank, err := strconv.ParseUint(mr, 10, 64)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Fatal(name, "migration `rank` can't be converted to an unsigned integer") LogWithCommand.Fatal(name, " migration `rank` can't be converted to an unsigned integer")
} }
t, tOK := transformer["type"] t, tOK := transformer["type"]
if !tOK { if !tOK {
log.WithField("subCommand", subCommand).Fatal(name, "transformer config is missing `type` value") LogWithCommand.Fatal(name, " transformer config is missing `type` value")
} }
transformerType := config.GetTransformerType(t) transformerType := config.GetTransformerType(t)
if transformerType == config.UnknownTransformerType { if transformerType == config.UnknownTransformerType {
log.WithField("subCommand", subCommand).Fatal(errors.New(`unknown transformer type in exporter config accepted types are "eth_event", "eth_storage"`)) LogWithCommand.Fatal(errors.New(`unknown transformer type in exporter config accepted types are "eth_event", "eth_storage"`))
} }
transformers[name] = config.Transformer{ transformers[name] = config.Transformer{

View File

@ -105,7 +105,8 @@ single config file or in separate command instances using different config files
Specify config location when executing the command: Specify config location when executing the command:
./vulcanizedb composeAndExecute --config=./environments/config_name.toml`, ./vulcanizedb composeAndExecute --config=./environments/config_name.toml`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs() SubCommand = cmd.CalledAs()
LogWithCommand = *log.WithField("SubCommand", SubCommand)
composeAndExecute() composeAndExecute()
}, },
} }
@ -115,44 +116,44 @@ func composeAndExecute() {
prepConfig() prepConfig()
// Generate code to build the plugin according to the config file // Generate code to build the plugin according to the config file
log.WithField("subCommand", subCommand).Info("generating plugin") LogWithCommand.Info("generating plugin")
generator, err := p2.NewGenerator(genConfig, databaseConfig) generator, err := p2.NewGenerator(genConfig, databaseConfig)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Fatal(err) LogWithCommand.Fatal(err)
} }
err = generator.GenerateExporterPlugin() err = generator.GenerateExporterPlugin()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Debug("generating plugin failed") LogWithCommand.Debug("generating plugin failed")
log.WithField("subCommand", subCommand).Fatal(err) LogWithCommand.Fatal(err)
} }
// Get the plugin path and load the plugin // Get the plugin path and load the plugin
_, pluginPath, err := genConfig.GetPluginPaths() _, pluginPath, err := genConfig.GetPluginPaths()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Fatal(err) LogWithCommand.Fatal(err)
} }
if !genConfig.Save { if !genConfig.Save {
defer helpers.ClearFiles(pluginPath) defer helpers.ClearFiles(pluginPath)
} }
log.WithField("subCommand", subCommand).Info("linking plugin", pluginPath) LogWithCommand.Info("linking plugin ", pluginPath)
plug, err := plugin.Open(pluginPath) plug, err := plugin.Open(pluginPath)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Debug("linking plugin failed") LogWithCommand.Debug("linking plugin failed")
log.WithField("subCommand", subCommand).Fatal(err) LogWithCommand.Fatal(err)
} }
// Load the `Exporter` symbol from the plugin // Load the `Exporter` symbol from the plugin
log.WithField("subCommand", subCommand).Info("loading transformers from plugin") LogWithCommand.Info("loading transformers from plugin")
symExporter, err := plug.Lookup("Exporter") symExporter, err := plug.Lookup("Exporter")
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Debug("loading Exporter symbol failed") LogWithCommand.Debug("loading Exporter symbol failed")
log.WithField("subCommand", subCommand).Fatal(err) LogWithCommand.Fatal(err)
} }
// Assert that the symbol is of type Exporter // Assert that the symbol is of type Exporter
exporter, ok := symExporter.(Exporter) exporter, ok := symExporter.(Exporter)
if !ok { if !ok {
log.WithField("subCommand", subCommand).Debug("plugged-in symbol not of type Exporter") LogWithCommand.Debug("plugged-in symbol not of type Exporter")
os.Exit(1) os.Exit(1)
} }
@ -160,7 +161,7 @@ func composeAndExecute() {
ethEventInitializers, ethStorageInitializers, ethContractInitializers := exporter.Export() ethEventInitializers, ethStorageInitializers, ethContractInitializers := exporter.Export()
// Setup bc and db objects // Setup bc and db objects
blockChain := getBlockChain(subCommand) blockChain := getBlockChain()
db := utils.LoadPostgres(databaseConfig, blockChain.Node()) db := utils.LoadPostgres(databaseConfig, blockChain.Node())
// Execute over transformer sets returned by the exporter // Execute over transformer sets returned by the exporter

View File

@ -79,7 +79,8 @@ Requires a .toml config file:
piping = true piping = true
`, `,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs() SubCommand = cmd.CalledAs()
LogWithCommand = *log.WithField("SubCommand", SubCommand)
contractWatcher() contractWatcher()
}, },
} }
@ -92,7 +93,7 @@ func contractWatcher() {
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop() defer ticker.Stop()
blockChain := getBlockChain(subCommand) blockChain := getBlockChain()
db := utils.LoadPostgres(databaseConfig, blockChain.Node()) db := utils.LoadPostgres(databaseConfig, blockChain.Node())
var t st.ContractTransformer var t st.ContractTransformer
@ -104,18 +105,18 @@ func contractWatcher() {
case "full": case "full":
t = ft.NewTransformer(con, blockChain, &db) t = ft.NewTransformer(con, blockChain, &db)
default: default:
log.WithField("subCommand", subCommand).Fatal("Invalid mode") LogWithCommand.Fatal("Invalid mode")
} }
err := t.Init() err := t.Init()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Fatal(fmt.Sprintf("Failed to initialized transformer\r\nerr: %v\r\n", err)) LogWithCommand.Fatal(fmt.Sprintf("Failed to initialize transformer, err: %v ", err))
} }
for range ticker.C { for range ticker.C {
err = t.Execute() err = t.Execute()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("Execution error for transformer:", t.GetConfig().Name, err) LogWithCommand.Error("Execution error for transformer: ", t.GetConfig().Name, err)
} }
} }
} }

View File

@ -63,7 +63,8 @@ must have been composed by the same version of vulcanizedb or else it will not b
Specify config location when executing the command: Specify config location when executing the command:
./vulcanizedb execute --config=./environments/config_name.toml`, ./vulcanizedb execute --config=./environments/config_name.toml`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs() SubCommand = cmd.CalledAs()
LogWithCommand = *log.WithField("SubCommand", SubCommand)
execute() execute()
}, },
} }
@ -75,36 +76,36 @@ func execute() {
// Get the plugin path and load the plugin // Get the plugin path and load the plugin
_, pluginPath, err := genConfig.GetPluginPaths() _, pluginPath, err := genConfig.GetPluginPaths()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Fatal(err) LogWithCommand.Fatal(err)
} }
fmt.Printf("Executing plugin %s", pluginPath) fmt.Printf("Executing plugin %s", pluginPath)
log.WithField("subCommand", subCommand).Info("linking plugin", pluginPath) LogWithCommand.Info("linking plugin ", pluginPath)
plug, err := plugin.Open(pluginPath) plug, err := plugin.Open(pluginPath)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Warn("linking plugin failed") LogWithCommand.Warn("linking plugin failed")
log.WithField("subCommand", subCommand).Fatal(err) LogWithCommand.Fatal(err)
} }
// Load the `Exporter` symbol from the plugin // Load the `Exporter` symbol from the plugin
log.WithField("subCommand", subCommand).Info("loading transformers from plugin") LogWithCommand.Info("loading transformers from plugin")
symExporter, err := plug.Lookup("Exporter") symExporter, err := plug.Lookup("Exporter")
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Warn("loading Exporter symbol failed") LogWithCommand.Warn("loading Exporter symbol failed")
log.WithField("subCommand", subCommand).Fatal(err) LogWithCommand.Fatal(err)
} }
// Assert that the symbol is of type Exporter // Assert that the symbol is of type Exporter
exporter, ok := symExporter.(Exporter) exporter, ok := symExporter.(Exporter)
if !ok { if !ok {
log.WithField("subCommand", subCommand).Fatal("plugged-in symbol not of type Exporter") LogWithCommand.Fatal("plugged-in symbol not of type Exporter")
} }
// Use the Exporters export method to load the EventTransformerInitializer, StorageTransformerInitializer, and ContractTransformerInitializer sets // Use the Exporters export method to load the EventTransformerInitializer, StorageTransformerInitializer, and ContractTransformerInitializer sets
ethEventInitializers, ethStorageInitializers, ethContractInitializers := exporter.Export() ethEventInitializers, ethStorageInitializers, ethContractInitializers := exporter.Export()
// Setup bc and db objects // Setup bc and db objects
blockChain := getBlockChain(subCommand) blockChain := getBlockChain()
db := utils.LoadPostgres(databaseConfig, blockChain.Node()) db := utils.LoadPostgres(databaseConfig, blockChain.Node())
// Execute over transformer sets returned by the exporter // Execute over transformer sets returned by the exporter
@ -148,7 +149,7 @@ type Exporter interface {
func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) { func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
defer wg.Done() defer wg.Done()
// Execute over the EventTransformerInitializer set using the watcher // Execute over the EventTransformerInitializer set using the watcher
log.WithField("subCommand", subCommand).Info("executing event transformers") LogWithCommand.Info("executing event transformers")
var recheck constants.TransformerExecution var recheck constants.TransformerExecution
if recheckHeadersArg { if recheckHeadersArg {
recheck = constants.HeaderRecheck recheck = constants.HeaderRecheck
@ -165,7 +166,7 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) { func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
defer wg.Done() defer wg.Done()
// Execute over the StorageTransformerInitializer set using the storage watcher // Execute over the StorageTransformerInitializer set using the storage watcher
log.WithField("subCommand", subCommand).Info("executing storage transformers") LogWithCommand.Info("executing storage transformers")
ticker := time.NewTicker(pollingInterval) ticker := time.NewTicker(pollingInterval)
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for range ticker.C {
@ -178,7 +179,7 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) { func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) {
defer wg.Done() defer wg.Done()
// Execute over the ContractTransformerInitializer set using the contract watcher // Execute over the ContractTransformerInitializer set using the contract watcher
log.WithField("subCommand", subCommand).Info("executing contract_watcher transformers") LogWithCommand.Info("executing contract_watcher transformers")
ticker := time.NewTicker(pollingInterval) ticker := time.NewTicker(pollingInterval)
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for range ticker.C {

View File

@ -49,7 +49,8 @@ Expects ethereum node to be running and requires a .toml config:
ipcPath = "/Users/user/Library/Ethereum/geth.ipc" ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
`, `,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs() SubCommand = cmd.CalledAs()
LogWithCommand = *log.WithField("SubCommand", SubCommand)
fullSync() fullSync()
}, },
} }
@ -62,7 +63,7 @@ func init() {
func backFillAllBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, missingBlocksPopulated chan int, startingBlockNumber int64) { func backFillAllBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, missingBlocksPopulated chan int, startingBlockNumber int64) {
populated, err := history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber) populated, err := history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("backfillAllBlocks: error in populateMissingBlocks: ", err) LogWithCommand.Error("backfillAllBlocks: error in populateMissingBlocks: ", err)
} }
missingBlocksPopulated <- populated missingBlocksPopulated <- populated
} }
@ -71,16 +72,16 @@ func fullSync() {
ticker := time.NewTicker(pollingInterval) ticker := time.NewTicker(pollingInterval)
defer ticker.Stop() defer ticker.Stop()
blockChain := getBlockChain(subCommand) blockChain := getBlockChain()
lastBlock, err := blockChain.LastBlock() lastBlock, err := blockChain.LastBlock()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("fullSync: Error getting last block: ", err) LogWithCommand.Error("fullSync: Error getting last block: ", err)
} }
if lastBlock.Int64() == 0 { if lastBlock.Int64() == 0 {
log.WithField("subCommand", subCommand).Fatal("geth initial: state sync not finished") LogWithCommand.Fatal("geth initial: state sync not finished")
} }
if startingBlockNumber > lastBlock.Int64() { if startingBlockNumber > lastBlock.Int64() {
log.WithField("subCommand", subCommand).Fatal("fullSync: starting block number > current block number") LogWithCommand.Fatal("fullSync: starting block number > current block number")
} }
db := utils.LoadPostgres(databaseConfig, blockChain.Node()) db := utils.LoadPostgres(databaseConfig, blockChain.Node())
@ -92,11 +93,11 @@ func fullSync() {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
window, err := validator.ValidateBlocks(subCommand) window, err := validator.ValidateBlocks()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("fullSync: error in validateBlocks: ", err) LogWithCommand.Error("fullSync: error in validateBlocks: ", err)
} }
log.WithField("subCommand", subCommand).Debug(window.GetString()) LogWithCommand.Debug(window.GetString())
case <-missingBlocksPopulated: case <-missingBlocksPopulated:
go backFillAllBlocks(blockChain, blockRepository, missingBlocksPopulated, startingBlockNumber) go backFillAllBlocks(blockChain, blockRepository, missingBlocksPopulated, startingBlockNumber)
} }

View File

@ -30,8 +30,6 @@ import (
"github.com/vulcanize/vulcanizedb/utils" "github.com/vulcanize/vulcanizedb/utils"
) )
var subCommand string
// headerSyncCmd represents the headerSync command // headerSyncCmd represents the headerSync command
var headerSyncCmd = &cobra.Command{ var headerSyncCmd = &cobra.Command{
Use: "headerSync", Use: "headerSync",
@ -52,7 +50,8 @@ Expects ethereum node to be running and requires a .toml config:
ipcPath = "/Users/user/Library/Ethereum/geth.ipc" ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
`, `,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs() SubCommand = cmd.CalledAs()
LogWithCommand = *log.WithField("SubCommand", SubCommand)
headerSync() headerSync()
}, },
} }
@ -63,11 +62,11 @@ func init() {
} }
func backFillAllHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, missingBlocksPopulated chan int, startingBlockNumber int64) { func backFillAllHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, missingBlocksPopulated chan int, startingBlockNumber int64) {
populated, err := history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber, subCommand) populated, err := history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber)
if err != nil { if err != nil {
// TODO Lots of possible errors in the call stack above. If errors occur, we still put // TODO Lots of possible errors in the call stack above. If errors occur, we still put
// 0 in the channel, triggering another round // 0 in the channel, triggering another round
log.WithField("subCommand", subCommand).Error("backfillAllHeaders: Error populating headers: ", err) LogWithCommand.Error("backfillAllHeaders: Error populating headers: ", err)
} }
missingBlocksPopulated <- populated missingBlocksPopulated <- populated
} }
@ -75,7 +74,7 @@ func backFillAllHeaders(blockchain core.BlockChain, headerRepository datastore.H
func headerSync() { func headerSync() {
ticker := time.NewTicker(pollingInterval) ticker := time.NewTicker(pollingInterval)
defer ticker.Stop() defer ticker.Stop()
blockChain := getBlockChain(subCommand) blockChain := getBlockChain()
validateArgs(blockChain) validateArgs(blockChain)
db := utils.LoadPostgres(databaseConfig, blockChain.Node()) db := utils.LoadPostgres(databaseConfig, blockChain.Node())
@ -89,9 +88,9 @@ func headerSync() {
case <-ticker.C: case <-ticker.C:
window, err := validator.ValidateHeaders() window, err := validator.ValidateHeaders()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("headerSync: ValidateHeaders failed: ", err) LogWithCommand.Error("headerSync: ValidateHeaders failed: ", err)
} }
log.WithField("subCommand", subCommand).Debug(window.GetString()) LogWithCommand.Debug(window.GetString())
case n := <-missingBlocksPopulated: case n := <-missingBlocksPopulated:
if n == 0 { if n == 0 {
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
@ -104,12 +103,12 @@ func headerSync() {
func validateArgs(blockChain *geth.BlockChain) { func validateArgs(blockChain *geth.BlockChain) {
lastBlock, err := blockChain.LastBlock() lastBlock, err := blockChain.LastBlock()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("validateArgs: Error getting last block: ", err) LogWithCommand.Error("validateArgs: Error getting last block: ", err)
} }
if lastBlock.Int64() == 0 { if lastBlock.Int64() == 0 {
log.WithField("subCommand", subCommand).Fatal("geth initial: state sync not finished") LogWithCommand.Fatal("geth initial: state sync not finished")
} }
if startingBlockNumber > lastBlock.Int64() { if startingBlockNumber > lastBlock.Int64() {
log.WithField("subCommand", subCommand).Fatal("starting block number > current block number") LogWithCommand.Fatal("starting block number > current block number")
} }
} }

View File

@ -46,6 +46,8 @@ var (
syncAll bool syncAll bool
endingBlockNumber int64 endingBlockNumber int64
recheckHeadersArg bool recheckHeadersArg bool
SubCommand string
LogWithCommand log.Entry
) )
const ( const (
@ -66,7 +68,7 @@ func Execute() {
} }
func initFuncs(cmd *cobra.Command, args []string) { func initFuncs(cmd *cobra.Command, args []string) {
database() setViperConfigs()
logLvlErr := logLevel() logLvlErr := logLevel()
if logLvlErr != nil { if logLvlErr != nil {
log.Fatal("Could not set log level: ", logLvlErr) log.Fatal("Could not set log level: ", logLvlErr)
@ -74,7 +76,7 @@ func initFuncs(cmd *cobra.Command, args []string) {
} }
func database() { func setViperConfigs() {
ipc = viper.GetString("client.ipcpath") ipc = viper.GetString("client.ipcpath")
levelDbPath = viper.GetString("client.leveldbpath") levelDbPath = viper.GetString("client.leveldbpath")
storageDiffsPath = viper.GetString("filesystem.storageDiffsPath") storageDiffsPath = viper.GetString("filesystem.storageDiffsPath")
@ -149,11 +151,11 @@ func initConfig() {
} }
} }
func getBlockChain(subCommand string) *geth.BlockChain { func getBlockChain() *geth.BlockChain {
rawRpcClient, err := rpc.Dial(ipc) rawRpcClient, err := rpc.Dial(ipc)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Fatal(err) LogWithCommand.Fatal(err)
} }
rpcClient := client.NewRpcClient(rawRpcClient, ipc) rpcClient := client.NewRpcClient(rawRpcClient, ipc)
ethClient := ethclient.NewClient(rawRpcClient) ethClient := ethclient.NewClient(rawRpcClient)

View File

@ -4,4 +4,4 @@
port = 5432 port = 5432
[client] [client]
ipcPath = "https://kovan.infura.io/v3/be5f83b248b74b34890cbf76e0945e29" ipcPath = ""

View File

@ -17,7 +17,7 @@
package history package history
import ( import (
log "github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore"
) )
@ -36,29 +36,29 @@ func NewBlockValidator(blockchain core.BlockChain, blockRepository datastore.Blo
} }
} }
func (bv BlockValidator) ValidateBlocks(subCommand string) (ValidationWindow, error) { func (bv BlockValidator) ValidateBlocks() (ValidationWindow, error) {
window, err := MakeValidationWindow(bv.blockchain, bv.windowSize) window, err := MakeValidationWindow(bv.blockchain, bv.windowSize)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("ValidateBlocks: error creating validation window: ", err) logrus.Error("ValidateBlocks: error creating validation window: ", err)
return ValidationWindow{}, err return ValidationWindow{}, err
} }
blockNumbers := MakeRange(window.LowerBound, window.UpperBound) blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
_, err = RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers) _, err = RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("ValidateBlocks: error getting and updating blocks: ", err) logrus.Error("ValidateBlocks: error getting and updating blocks: ", err)
return ValidationWindow{}, err return ValidationWindow{}, err
} }
lastBlock, err := bv.blockchain.LastBlock() lastBlock, err := bv.blockchain.LastBlock()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("ValidateBlocks: error getting last block: ", err) logrus.Error("ValidateBlocks: error getting last block: ", err)
return ValidationWindow{}, err return ValidationWindow{}, err
} }
err = bv.blockRepository.SetBlocksStatus(lastBlock.Int64()) err = bv.blockRepository.SetBlocksStatus(lastBlock.Int64())
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("ValidateBlocks: error setting block status: ", err) logrus.Error("ValidateBlocks: error setting block status: ", err)
return ValidationWindow{}, err return ValidationWindow{}, err
} }
return window, nil return window, nil

View File

@ -34,7 +34,7 @@ var _ = Describe("Blocks validator", func() {
blocksRepository := fakes.NewMockBlockRepository() blocksRepository := fakes.NewMockBlockRepository()
validator := history.NewBlockValidator(blockChain, blocksRepository, 2) validator := history.NewBlockValidator(blockChain, blocksRepository, 2)
window, err := validator.ValidateBlocks("subCommandForLogs") window, err := validator.ValidateBlocks()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(window).To(Equal(history.ValidationWindow{LowerBound: 5, UpperBound: 7})) Expect(window).To(Equal(history.ValidationWindow{LowerBound: 5, UpperBound: 7}))

View File

@ -24,25 +24,25 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
) )
func PopulateMissingHeaders(blockChain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64, subCommand string) (int, error) { func PopulateMissingHeaders(blockChain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) (int, error) {
lastBlock, err := blockChain.LastBlock() lastBlock, err := blockChain.LastBlock()
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("PopulateMissingHeaders: Error getting last block: ", err) log.Error("PopulateMissingHeaders: Error getting last block: ", err)
return 0, err return 0, err
} }
blockNumbers, err := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock.Int64(), blockChain.Node().ID) blockNumbers, err := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock.Int64(), blockChain.Node().ID)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("PopulateMissingHeaders: Error getting missing block numbers: ", err) log.Error("PopulateMissingHeaders: Error getting missing block numbers: ", err)
return 0, err return 0, err
} else if len(blockNumbers) == 0 { } else if len(blockNumbers) == 0 {
return 0, nil return 0, nil
} }
log.WithField("subCommand", subCommand).Debug(getBlockRangeString(blockNumbers)) log.Debug(getBlockRangeString(blockNumbers))
_, err = RetrieveAndUpdateHeaders(blockChain, headerRepository, blockNumbers) _, err = RetrieveAndUpdateHeaders(blockChain, headerRepository, blockNumbers)
if err != nil { if err != nil {
log.WithField("subCommand", subCommand).Error("PopulateMissingHeaders: Error getting/updating headers:", err) log.Error("PopulateMissingHeaders: Error getting/updating headers: ", err)
return 0, err return 0, err
} }
return len(blockNumbers), nil return len(blockNumbers), nil

View File

@ -39,7 +39,7 @@ var _ = Describe("Populating headers", func() {
blockChain.SetLastBlock(big.NewInt(2)) blockChain.SetLastBlock(big.NewInt(2))
headerRepository.SetMissingBlockNumbers([]int64{2}) headerRepository.SetMissingBlockNumbers([]int64{2})
headersAdded, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1, "subCommandForLogs") headersAdded, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(headersAdded).To(Equal(1)) Expect(headersAdded).To(Equal(1))
@ -50,7 +50,7 @@ var _ = Describe("Populating headers", func() {
blockChain.SetLastBlock(big.NewInt(2)) blockChain.SetLastBlock(big.NewInt(2))
headerRepository.SetMissingBlockNumbers([]int64{2}) headerRepository.SetMissingBlockNumbers([]int64{2})
_, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1, "subCommandForLogs") _, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(1, []int64{2}) headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(1, []int64{2})
@ -59,7 +59,7 @@ var _ = Describe("Populating headers", func() {
It("returns early if the db is already synced up to the head of the chain", func() { It("returns early if the db is already synced up to the head of the chain", func() {
blockChain := fakes.NewMockBlockChain() blockChain := fakes.NewMockBlockChain()
blockChain.SetLastBlock(big.NewInt(2)) blockChain.SetLastBlock(big.NewInt(2))
headersAdded, err := history.PopulateMissingHeaders(blockChain, headerRepository, 2, "subCommandForLogs") headersAdded, err := history.PopulateMissingHeaders(blockChain, headerRepository, 2)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(headersAdded).To(Equal(0)) Expect(headersAdded).To(Equal(0))