forked from cerc-io/ipld-eth-server
Add subCommand name to log fields
This commit is contained in:
parent
d7a82e353b
commit
92d153b010
@ -39,6 +39,7 @@ var coldImportCmd = &cobra.Command{
|
|||||||
|
|
||||||
Geth must be synced over all of the desired blocks and must not be running in order to execute this command.`,
|
Geth must be synced over all of the desired blocks and must not be running in order to execute this command.`,
|
||||||
Run: func(cmd *cobra.Command, args []string) {
|
Run: func(cmd *cobra.Command, args []string) {
|
||||||
|
subCommand = cmd.CalledAs()
|
||||||
coldImport()
|
coldImport()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -55,7 +56,7 @@ func coldImport() {
|
|||||||
ethDBConfig := ethereum.CreateDatabaseConfig(ethereum.Level, levelDbPath)
|
ethDBConfig := ethereum.CreateDatabaseConfig(ethereum.Level, levelDbPath)
|
||||||
ethDB, err := ethereum.CreateDatabase(ethDBConfig)
|
ethDB, err := ethereum.CreateDatabase(ethDBConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Error connecting to ethereum db: ", err)
|
log.WithField("subCommand", subCommand).Fatal("Error connecting to ethereum db: ", err)
|
||||||
}
|
}
|
||||||
mostRecentBlockNumberInDb := ethDB.GetHeadBlockNumber()
|
mostRecentBlockNumberInDb := ethDB.GetHeadBlockNumber()
|
||||||
if syncAll {
|
if syncAll {
|
||||||
@ -63,10 +64,10 @@ func coldImport() {
|
|||||||
endingBlockNumber = mostRecentBlockNumberInDb
|
endingBlockNumber = mostRecentBlockNumberInDb
|
||||||
}
|
}
|
||||||
if endingBlockNumber < startingBlockNumber {
|
if endingBlockNumber < startingBlockNumber {
|
||||||
log.Fatal("Ending block number must be greater than starting block number for cold import.")
|
log.WithField("subCommand", subCommand).Fatal("Ending block number must be greater than starting block number for cold import.")
|
||||||
}
|
}
|
||||||
if endingBlockNumber > mostRecentBlockNumberInDb {
|
if endingBlockNumber > mostRecentBlockNumberInDb {
|
||||||
log.Fatal("Ending block number is greater than most recent block in db: ", mostRecentBlockNumberInDb)
|
log.WithField("subCommand", subCommand).Fatal("Ending block number is greater than most recent block in db: ", mostRecentBlockNumberInDb)
|
||||||
}
|
}
|
||||||
|
|
||||||
// init pg db
|
// init pg db
|
||||||
@ -76,7 +77,7 @@ func coldImport() {
|
|||||||
nodeBuilder := cold_import.NewColdImportNodeBuilder(reader, parser)
|
nodeBuilder := cold_import.NewColdImportNodeBuilder(reader, parser)
|
||||||
coldNode, err := nodeBuilder.GetNode(genesisBlock, levelDbPath)
|
coldNode, err := nodeBuilder.GetNode(genesisBlock, levelDbPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Error getting node: ", err)
|
log.WithField("subCommand", subCommand).Fatal("Error getting node: ", err)
|
||||||
}
|
}
|
||||||
pgDB := utils.LoadPostgres(databaseConfig, coldNode)
|
pgDB := utils.LoadPostgres(databaseConfig, coldNode)
|
||||||
|
|
||||||
@ -90,6 +91,6 @@ func coldImport() {
|
|||||||
coldImporter := cold_import.NewColdImporter(ethDB, blockRepository, receiptRepository, blockConverter)
|
coldImporter := cold_import.NewColdImporter(ethDB, blockRepository, receiptRepository, blockConverter)
|
||||||
err = coldImporter.Execute(startingBlockNumber, endingBlockNumber, coldNode.ID)
|
err = coldImporter.Execute(startingBlockNumber, endingBlockNumber, coldNode.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Error executing cold import: ", err)
|
log.WithField("subCommand", subCommand).Fatal("Error executing cold import: ", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -101,6 +101,7 @@ single config file or in separate command instances using different config files
|
|||||||
Specify config location when executing the command:
|
Specify config location when executing the command:
|
||||||
./vulcanizedb compose --config=./environments/config_name.toml`,
|
./vulcanizedb compose --config=./environments/config_name.toml`,
|
||||||
Run: func(cmd *cobra.Command, args []string) {
|
Run: func(cmd *cobra.Command, args []string) {
|
||||||
|
subCommand = cmd.CalledAs()
|
||||||
compose()
|
compose()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -110,23 +111,23 @@ func compose() {
|
|||||||
prepConfig()
|
prepConfig()
|
||||||
|
|
||||||
// Generate code to build the plugin according to the config file
|
// Generate code to build the plugin according to the config file
|
||||||
log.Info("generating plugin")
|
log.WithField("subCommand", subCommand).Info("generating plugin")
|
||||||
generator, err := p2.NewGenerator(genConfig, databaseConfig)
|
generator, err := p2.NewGenerator(genConfig, databaseConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.WithField("subCommand", subCommand).Fatal(err)
|
||||||
}
|
}
|
||||||
err = generator.GenerateExporterPlugin()
|
err = generator.GenerateExporterPlugin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("generating plugin failed")
|
log.WithField("subCommand", subCommand).Debug("generating plugin failed")
|
||||||
log.Fatal(err)
|
log.WithField("subCommand", subCommand).Fatal(err)
|
||||||
}
|
}
|
||||||
// TODO: Embed versioning info in the .so files so we know which version of vulcanizedb to run them with
|
// TODO: Embed versioning info in the .so files so we know which version of vulcanizedb to run them with
|
||||||
_, pluginPath, err := genConfig.GetPluginPaths()
|
_, pluginPath, err := genConfig.GetPluginPaths()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.WithField("subCommand", subCommand).Fatal(err)
|
||||||
}
|
}
|
||||||
fmt.Printf("Composed plugin %s", pluginPath)
|
fmt.Printf("Composed plugin %s", pluginPath)
|
||||||
log.Info("plugin .so file output to", pluginPath)
|
log.WithField("subCommand", subCommand).Info("plugin .so file output to", pluginPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -134,38 +135,38 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func prepConfig() {
|
func prepConfig() {
|
||||||
log.Info("configuring plugin")
|
log.WithField("subCommand", subCommand).Info("configuring plugin")
|
||||||
names := viper.GetStringSlice("exporter.transformerNames")
|
names := viper.GetStringSlice("exporter.transformerNames")
|
||||||
transformers := make(map[string]config.Transformer)
|
transformers := make(map[string]config.Transformer)
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
transformer := viper.GetStringMapString("exporter." + name)
|
transformer := viper.GetStringMapString("exporter." + name)
|
||||||
p, pOK := transformer["path"]
|
p, pOK := transformer["path"]
|
||||||
if !pOK || p == "" {
|
if !pOK || p == "" {
|
||||||
log.Fatal(name, "transformer config is missing `path` value")
|
log.WithField("subCommand", subCommand).Fatal(name, "transformer config is missing `path` value")
|
||||||
}
|
}
|
||||||
r, rOK := transformer["repository"]
|
r, rOK := transformer["repository"]
|
||||||
if !rOK || r == "" {
|
if !rOK || r == "" {
|
||||||
log.Fatal(name, "transformer config is missing `repository` value")
|
log.WithField("subCommand", subCommand).Fatal(name, "transformer config is missing `repository` value")
|
||||||
}
|
}
|
||||||
m, mOK := transformer["migrations"]
|
m, mOK := transformer["migrations"]
|
||||||
if !mOK || m == "" {
|
if !mOK || m == "" {
|
||||||
log.Fatal(name, "transformer config is missing `migrations` value")
|
log.WithField("subCommand", subCommand).Fatal(name, "transformer config is missing `migrations` value")
|
||||||
}
|
}
|
||||||
mr, mrOK := transformer["rank"]
|
mr, mrOK := transformer["rank"]
|
||||||
if !mrOK || mr == "" {
|
if !mrOK || mr == "" {
|
||||||
log.Fatal(name, "transformer config is missing `rank` value")
|
log.WithField("subCommand", subCommand).Fatal(name, "transformer config is missing `rank` value")
|
||||||
}
|
}
|
||||||
rank, err := strconv.ParseUint(mr, 10, 64)
|
rank, err := strconv.ParseUint(mr, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(name, "migration `rank` can't be converted to an unsigned integer")
|
log.WithField("subCommand", subCommand).Fatal(name, "migration `rank` can't be converted to an unsigned integer")
|
||||||
}
|
}
|
||||||
t, tOK := transformer["type"]
|
t, tOK := transformer["type"]
|
||||||
if !tOK {
|
if !tOK {
|
||||||
log.Fatal(name, "transformer config is missing `type` value")
|
log.WithField("subCommand", subCommand).Fatal(name, "transformer config is missing `type` value")
|
||||||
}
|
}
|
||||||
transformerType := config.GetTransformerType(t)
|
transformerType := config.GetTransformerType(t)
|
||||||
if transformerType == config.UnknownTransformerType {
|
if transformerType == config.UnknownTransformerType {
|
||||||
log.Fatal(errors.New(`unknown transformer type in exporter config accepted types are "eth_event", "eth_storage"`))
|
log.WithField("subCommand", subCommand).Fatal(errors.New(`unknown transformer type in exporter config accepted types are "eth_event", "eth_storage"`))
|
||||||
}
|
}
|
||||||
|
|
||||||
transformers[name] = config.Transformer{
|
transformers[name] = config.Transformer{
|
||||||
|
@ -105,6 +105,7 @@ single config file or in separate command instances using different config files
|
|||||||
Specify config location when executing the command:
|
Specify config location when executing the command:
|
||||||
./vulcanizedb composeAndExecute --config=./environments/config_name.toml`,
|
./vulcanizedb composeAndExecute --config=./environments/config_name.toml`,
|
||||||
Run: func(cmd *cobra.Command, args []string) {
|
Run: func(cmd *cobra.Command, args []string) {
|
||||||
|
subCommand = cmd.CalledAs()
|
||||||
composeAndExecute()
|
composeAndExecute()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -114,44 +115,44 @@ func composeAndExecute() {
|
|||||||
prepConfig()
|
prepConfig()
|
||||||
|
|
||||||
// Generate code to build the plugin according to the config file
|
// Generate code to build the plugin according to the config file
|
||||||
log.Info("generating plugin")
|
log.WithField("subCommand", subCommand).Info("generating plugin")
|
||||||
generator, err := p2.NewGenerator(genConfig, databaseConfig)
|
generator, err := p2.NewGenerator(genConfig, databaseConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.WithField("subCommand", subCommand).Fatal(err)
|
||||||
}
|
}
|
||||||
err = generator.GenerateExporterPlugin()
|
err = generator.GenerateExporterPlugin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("generating plugin failed")
|
log.WithField("subCommand", subCommand).Debug("generating plugin failed")
|
||||||
log.Fatal(err)
|
log.WithField("subCommand", subCommand).Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the plugin path and load the plugin
|
// Get the plugin path and load the plugin
|
||||||
_, pluginPath, err := genConfig.GetPluginPaths()
|
_, pluginPath, err := genConfig.GetPluginPaths()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.WithField("subCommand", subCommand).Fatal(err)
|
||||||
}
|
}
|
||||||
if !genConfig.Save {
|
if !genConfig.Save {
|
||||||
defer helpers.ClearFiles(pluginPath)
|
defer helpers.ClearFiles(pluginPath)
|
||||||
}
|
}
|
||||||
log.Info("linking plugin", pluginPath)
|
log.WithField("subCommand", subCommand).Info("linking plugin", pluginPath)
|
||||||
plug, err := plugin.Open(pluginPath)
|
plug, err := plugin.Open(pluginPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("linking plugin failed")
|
log.WithField("subCommand", subCommand).Debug("linking plugin failed")
|
||||||
log.Fatal(err)
|
log.WithField("subCommand", subCommand).Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load the `Exporter` symbol from the plugin
|
// Load the `Exporter` symbol from the plugin
|
||||||
log.Info("loading transformers from plugin")
|
log.WithField("subCommand", subCommand).Info("loading transformers from plugin")
|
||||||
symExporter, err := plug.Lookup("Exporter")
|
symExporter, err := plug.Lookup("Exporter")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("loading Exporter symbol failed")
|
log.WithField("subCommand", subCommand).Debug("loading Exporter symbol failed")
|
||||||
log.Fatal(err)
|
log.WithField("subCommand", subCommand).Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Assert that the symbol is of type Exporter
|
// Assert that the symbol is of type Exporter
|
||||||
exporter, ok := symExporter.(Exporter)
|
exporter, ok := symExporter.(Exporter)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Debug("plugged-in symbol not of type Exporter")
|
log.WithField("subCommand", subCommand).Debug("plugged-in symbol not of type Exporter")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -159,7 +160,7 @@ func composeAndExecute() {
|
|||||||
ethEventInitializers, ethStorageInitializers, ethContractInitializers := exporter.Export()
|
ethEventInitializers, ethStorageInitializers, ethContractInitializers := exporter.Export()
|
||||||
|
|
||||||
// Setup bc and db objects
|
// Setup bc and db objects
|
||||||
blockChain := getBlockChain()
|
blockChain := getBlockChain(subCommand)
|
||||||
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
||||||
|
|
||||||
// Execute over transformer sets returned by the exporter
|
// Execute over transformer sets returned by the exporter
|
||||||
|
@ -79,6 +79,7 @@ Requires a .toml config file:
|
|||||||
piping = true
|
piping = true
|
||||||
`,
|
`,
|
||||||
Run: func(cmd *cobra.Command, args []string) {
|
Run: func(cmd *cobra.Command, args []string) {
|
||||||
|
subCommand = cmd.CalledAs()
|
||||||
contractWatcher()
|
contractWatcher()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -91,7 +92,7 @@ func contractWatcher() {
|
|||||||
ticker := time.NewTicker(5 * time.Second)
|
ticker := time.NewTicker(5 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
blockChain := getBlockChain()
|
blockChain := getBlockChain(subCommand)
|
||||||
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
||||||
|
|
||||||
var t st.ContractTransformer
|
var t st.ContractTransformer
|
||||||
@ -103,18 +104,18 @@ func contractWatcher() {
|
|||||||
case "full":
|
case "full":
|
||||||
t = ft.NewTransformer(con, blockChain, &db)
|
t = ft.NewTransformer(con, blockChain, &db)
|
||||||
default:
|
default:
|
||||||
log.Fatal("Invalid mode")
|
log.WithField("subCommand", subCommand).Fatal("Invalid mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
err := t.Init()
|
err := t.Init()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(fmt.Sprintf("Failed to initialized transformer\r\nerr: %v\r\n", err))
|
log.WithField("subCommand", subCommand).Fatal(fmt.Sprintf("Failed to initialized transformer\r\nerr: %v\r\n", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
err = t.Execute()
|
err = t.Execute()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Execution error for transformer:", t.GetConfig().Name, err)
|
log.WithField("subCommand", subCommand).Error("Execution error for transformer:", t.GetConfig().Name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -63,6 +63,7 @@ must have been composed by the same version of vulcanizedb or else it will not b
|
|||||||
Specify config location when executing the command:
|
Specify config location when executing the command:
|
||||||
./vulcanizedb execute --config=./environments/config_name.toml`,
|
./vulcanizedb execute --config=./environments/config_name.toml`,
|
||||||
Run: func(cmd *cobra.Command, args []string) {
|
Run: func(cmd *cobra.Command, args []string) {
|
||||||
|
subCommand = cmd.CalledAs()
|
||||||
execute()
|
execute()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -74,36 +75,36 @@ func execute() {
|
|||||||
// Get the plugin path and load the plugin
|
// Get the plugin path and load the plugin
|
||||||
_, pluginPath, err := genConfig.GetPluginPaths()
|
_, pluginPath, err := genConfig.GetPluginPaths()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.WithField("subCommand", subCommand).Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("Executing plugin %s", pluginPath)
|
fmt.Printf("Executing plugin %s", pluginPath)
|
||||||
log.Info("linking plugin", pluginPath)
|
log.WithField("subCommand", subCommand).Info("linking plugin", pluginPath)
|
||||||
plug, err := plugin.Open(pluginPath)
|
plug, err := plugin.Open(pluginPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("linking plugin failed")
|
log.WithField("subCommand", subCommand).Warn("linking plugin failed")
|
||||||
log.Fatal(err)
|
log.WithField("subCommand", subCommand).Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load the `Exporter` symbol from the plugin
|
// Load the `Exporter` symbol from the plugin
|
||||||
log.Info("loading transformers from plugin")
|
log.WithField("subCommand", subCommand).Info("loading transformers from plugin")
|
||||||
symExporter, err := plug.Lookup("Exporter")
|
symExporter, err := plug.Lookup("Exporter")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("loading Exporter symbol failed")
|
log.WithField("subCommand", subCommand).Warn("loading Exporter symbol failed")
|
||||||
log.Fatal(err)
|
log.WithField("subCommand", subCommand).Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Assert that the symbol is of type Exporter
|
// Assert that the symbol is of type Exporter
|
||||||
exporter, ok := symExporter.(Exporter)
|
exporter, ok := symExporter.(Exporter)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Fatal("plugged-in symbol not of type Exporter")
|
log.WithField("subCommand", subCommand).Fatal("plugged-in symbol not of type Exporter")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use the Exporters export method to load the EventTransformerInitializer, StorageTransformerInitializer, and ContractTransformerInitializer sets
|
// Use the Exporters export method to load the EventTransformerInitializer, StorageTransformerInitializer, and ContractTransformerInitializer sets
|
||||||
ethEventInitializers, ethStorageInitializers, ethContractInitializers := exporter.Export()
|
ethEventInitializers, ethStorageInitializers, ethContractInitializers := exporter.Export()
|
||||||
|
|
||||||
// Setup bc and db objects
|
// Setup bc and db objects
|
||||||
blockChain := getBlockChain()
|
blockChain := getBlockChain(subCommand)
|
||||||
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
||||||
|
|
||||||
// Execute over transformer sets returned by the exporter
|
// Execute over transformer sets returned by the exporter
|
||||||
@ -147,7 +148,7 @@ type Exporter interface {
|
|||||||
func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
|
func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
// Execute over the EventTransformerInitializer set using the watcher
|
// Execute over the EventTransformerInitializer set using the watcher
|
||||||
log.Info("executing event transformers")
|
log.WithField("subCommand", subCommand).Info("executing event transformers")
|
||||||
var recheck constants.TransformerExecution
|
var recheck constants.TransformerExecution
|
||||||
if recheckHeadersArg {
|
if recheckHeadersArg {
|
||||||
recheck = constants.HeaderRecheck
|
recheck = constants.HeaderRecheck
|
||||||
@ -164,7 +165,7 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
|
|||||||
func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
|
func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
// Execute over the StorageTransformerInitializer set using the storage watcher
|
// Execute over the StorageTransformerInitializer set using the storage watcher
|
||||||
log.Info("executing storage transformers")
|
log.WithField("subCommand", subCommand).Info("executing storage transformers")
|
||||||
ticker := time.NewTicker(pollingInterval)
|
ticker := time.NewTicker(pollingInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
@ -177,7 +178,7 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
|
|||||||
func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) {
|
func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
// Execute over the ContractTransformerInitializer set using the contract watcher
|
// Execute over the ContractTransformerInitializer set using the contract watcher
|
||||||
log.Info("executing contract_watcher transformers")
|
log.WithField("subCommand", subCommand).Info("executing contract_watcher transformers")
|
||||||
ticker := time.NewTicker(pollingInterval)
|
ticker := time.NewTicker(pollingInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
|
@ -49,20 +49,20 @@ Expects ethereum node to be running and requires a .toml config:
|
|||||||
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
|
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
|
||||||
`,
|
`,
|
||||||
Run: func(cmd *cobra.Command, args []string) {
|
Run: func(cmd *cobra.Command, args []string) {
|
||||||
|
subCommand = cmd.CalledAs()
|
||||||
fullSync()
|
fullSync()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
rootCmd.AddCommand(fullSyncCmd)
|
rootCmd.AddCommand(fullSyncCmd)
|
||||||
|
|
||||||
fullSyncCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block number to start syncing from")
|
fullSyncCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "Block number to start syncing from")
|
||||||
}
|
}
|
||||||
|
|
||||||
func backFillAllBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, missingBlocksPopulated chan int, startingBlockNumber int64) {
|
func backFillAllBlocks(blockchain core.BlockChain, blockRepository datastore.BlockRepository, missingBlocksPopulated chan int, startingBlockNumber int64) {
|
||||||
populated, err := history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber)
|
populated, err := history.PopulateMissingBlocks(blockchain, blockRepository, startingBlockNumber)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("backfillAllBlocks: error in populateMissingBlocks: ", err)
|
log.WithField("subCommand", subCommand).Error("backfillAllBlocks: error in populateMissingBlocks: ", err)
|
||||||
}
|
}
|
||||||
missingBlocksPopulated <- populated
|
missingBlocksPopulated <- populated
|
||||||
}
|
}
|
||||||
@ -71,16 +71,16 @@ func fullSync() {
|
|||||||
ticker := time.NewTicker(pollingInterval)
|
ticker := time.NewTicker(pollingInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
blockChain := getBlockChain()
|
blockChain := getBlockChain(subCommand)
|
||||||
lastBlock, err := blockChain.LastBlock()
|
lastBlock, err := blockChain.LastBlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("fullSync: Error getting last block: ", err)
|
log.WithField("subCommand", subCommand).Error("fullSync: Error getting last block: ", err)
|
||||||
}
|
}
|
||||||
if lastBlock.Int64() == 0 {
|
if lastBlock.Int64() == 0 {
|
||||||
log.Fatal("geth initial: state sync not finished")
|
log.WithField("subCommand", subCommand).Fatal("geth initial: state sync not finished")
|
||||||
}
|
}
|
||||||
if startingBlockNumber > lastBlock.Int64() {
|
if startingBlockNumber > lastBlock.Int64() {
|
||||||
log.Fatal("fullSync: starting block number > current block number")
|
log.WithField("subCommand", subCommand).Fatal("fullSync: starting block number > current block number")
|
||||||
}
|
}
|
||||||
|
|
||||||
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
||||||
@ -92,11 +92,11 @@ func fullSync() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
window, err := validator.ValidateBlocks()
|
window, err := validator.ValidateBlocks(subCommand)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("fullSync: error in validateBlocks: ", err)
|
log.WithField("subCommand", subCommand).Error("fullSync: error in validateBlocks: ", err)
|
||||||
}
|
}
|
||||||
log.Debug(window.GetString())
|
log.WithField("subCommand", subCommand).Debug(window.GetString())
|
||||||
case <-missingBlocksPopulated:
|
case <-missingBlocksPopulated:
|
||||||
go backFillAllBlocks(blockChain, blockRepository, missingBlocksPopulated, startingBlockNumber)
|
go backFillAllBlocks(blockChain, blockRepository, missingBlocksPopulated, startingBlockNumber)
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,8 @@ import (
|
|||||||
"github.com/vulcanize/vulcanizedb/utils"
|
"github.com/vulcanize/vulcanizedb/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var subCommand string
|
||||||
|
|
||||||
// headerSyncCmd represents the headerSync command
|
// headerSyncCmd represents the headerSync command
|
||||||
var headerSyncCmd = &cobra.Command{
|
var headerSyncCmd = &cobra.Command{
|
||||||
Use: "headerSync",
|
Use: "headerSync",
|
||||||
@ -50,6 +52,7 @@ Expects ethereum node to be running and requires a .toml config:
|
|||||||
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
|
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
|
||||||
`,
|
`,
|
||||||
Run: func(cmd *cobra.Command, args []string) {
|
Run: func(cmd *cobra.Command, args []string) {
|
||||||
|
subCommand = cmd.CalledAs()
|
||||||
headerSync()
|
headerSync()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -60,11 +63,11 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func backFillAllHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, missingBlocksPopulated chan int, startingBlockNumber int64) {
|
func backFillAllHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, missingBlocksPopulated chan int, startingBlockNumber int64) {
|
||||||
populated, err := history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber)
|
populated, err := history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber, subCommand)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO Lots of possible errors in the call stack above. If errors occur, we still put
|
// TODO Lots of possible errors in the call stack above. If errors occur, we still put
|
||||||
// 0 in the channel, triggering another round
|
// 0 in the channel, triggering another round
|
||||||
log.Error("backfillAllHeaders: Error populating headers: ", err)
|
log.WithField("subCommand", subCommand).Error("backfillAllHeaders: Error populating headers: ", err)
|
||||||
}
|
}
|
||||||
missingBlocksPopulated <- populated
|
missingBlocksPopulated <- populated
|
||||||
}
|
}
|
||||||
@ -72,7 +75,7 @@ func backFillAllHeaders(blockchain core.BlockChain, headerRepository datastore.H
|
|||||||
func headerSync() {
|
func headerSync() {
|
||||||
ticker := time.NewTicker(pollingInterval)
|
ticker := time.NewTicker(pollingInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
blockChain := getBlockChain()
|
blockChain := getBlockChain(subCommand)
|
||||||
validateArgs(blockChain)
|
validateArgs(blockChain)
|
||||||
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
||||||
|
|
||||||
@ -86,9 +89,9 @@ func headerSync() {
|
|||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
window, err := validator.ValidateHeaders()
|
window, err := validator.ValidateHeaders()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("headerSync: ValidateHeaders failed: ", err)
|
log.WithField("subCommand", subCommand).Error("headerSync: ValidateHeaders failed: ", err)
|
||||||
}
|
}
|
||||||
log.Debug(window.GetString())
|
log.WithField("subCommand", subCommand).Debug(window.GetString())
|
||||||
case n := <-missingBlocksPopulated:
|
case n := <-missingBlocksPopulated:
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
time.Sleep(3 * time.Second)
|
time.Sleep(3 * time.Second)
|
||||||
@ -101,12 +104,12 @@ func headerSync() {
|
|||||||
func validateArgs(blockChain *geth.BlockChain) {
|
func validateArgs(blockChain *geth.BlockChain) {
|
||||||
lastBlock, err := blockChain.LastBlock()
|
lastBlock, err := blockChain.LastBlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("validateArgs: Error getting last block: ", err)
|
log.WithField("subCommand", subCommand).Error("validateArgs: Error getting last block: ", err)
|
||||||
}
|
}
|
||||||
if lastBlock.Int64() == 0 {
|
if lastBlock.Int64() == 0 {
|
||||||
log.Fatal("geth initial: state sync not finished")
|
log.WithField("subCommand", subCommand).Fatal("geth initial: state sync not finished")
|
||||||
}
|
}
|
||||||
if startingBlockNumber > lastBlock.Int64() {
|
if startingBlockNumber > lastBlock.Int64() {
|
||||||
log.Fatal("starting block number > current block number")
|
log.WithField("subCommand", subCommand).Fatal("starting block number > current block number")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -150,11 +150,11 @@ func initConfig() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getBlockChain() *geth.BlockChain {
|
func getBlockChain(subCommand string) *geth.BlockChain {
|
||||||
rawRpcClient, err := rpc.Dial(ipc)
|
rawRpcClient, err := rpc.Dial(ipc)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Could not dial client: ", err)
|
log.WithField("subCommand", subCommand).Fatal(err)
|
||||||
}
|
}
|
||||||
rpcClient := client.NewRpcClient(rawRpcClient, ipc)
|
rpcClient := client.NewRpcClient(rawRpcClient, ipc)
|
||||||
ethClient := ethclient.NewClient(rawRpcClient)
|
ethClient := ethclient.NewClient(rawRpcClient)
|
||||||
|
@ -17,7 +17,7 @@
|
|||||||
package history
|
package history
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore"
|
||||||
)
|
)
|
||||||
@ -36,29 +36,29 @@ func NewBlockValidator(blockchain core.BlockChain, blockRepository datastore.Blo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bv BlockValidator) ValidateBlocks() (ValidationWindow, error) {
|
func (bv BlockValidator) ValidateBlocks(subCommand string) (ValidationWindow, error) {
|
||||||
window, err := MakeValidationWindow(bv.blockchain, bv.windowSize)
|
window, err := MakeValidationWindow(bv.blockchain, bv.windowSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Error("ValidateBlocks: error creating validation window: ", err)
|
log.WithField("subCommand", subCommand).Error("ValidateBlocks: error creating validation window: ", err)
|
||||||
return ValidationWindow{}, err
|
return ValidationWindow{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
|
blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
|
||||||
_, err = RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers)
|
_, err = RetrieveAndUpdateBlocks(bv.blockchain, bv.blockRepository, blockNumbers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Error("ValidateBlocks: error getting and updating blocks: ", err)
|
log.WithField("subCommand", subCommand).Error("ValidateBlocks: error getting and updating blocks: ", err)
|
||||||
return ValidationWindow{}, err
|
return ValidationWindow{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
lastBlock, err := bv.blockchain.LastBlock()
|
lastBlock, err := bv.blockchain.LastBlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Error("ValidateBlocks: error getting last block: ", err)
|
log.WithField("subCommand", subCommand).Error("ValidateBlocks: error getting last block: ", err)
|
||||||
return ValidationWindow{}, err
|
return ValidationWindow{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = bv.blockRepository.SetBlocksStatus(lastBlock.Int64())
|
err = bv.blockRepository.SetBlocksStatus(lastBlock.Int64())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Error("ValidateBlocks: error setting block status: ", err)
|
log.WithField("subCommand", subCommand).Error("ValidateBlocks: error setting block status: ", err)
|
||||||
return ValidationWindow{}, err
|
return ValidationWindow{}, err
|
||||||
}
|
}
|
||||||
return window, nil
|
return window, nil
|
||||||
|
@ -34,7 +34,7 @@ var _ = Describe("Blocks validator", func() {
|
|||||||
blocksRepository := fakes.NewMockBlockRepository()
|
blocksRepository := fakes.NewMockBlockRepository()
|
||||||
validator := history.NewBlockValidator(blockChain, blocksRepository, 2)
|
validator := history.NewBlockValidator(blockChain, blocksRepository, 2)
|
||||||
|
|
||||||
window, err := validator.ValidateBlocks()
|
window, err := validator.ValidateBlocks("subCommandForLogs")
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
Expect(window).To(Equal(history.ValidationWindow{LowerBound: 5, UpperBound: 7}))
|
Expect(window).To(Equal(history.ValidationWindow{LowerBound: 5, UpperBound: 7}))
|
||||||
|
@ -24,25 +24,25 @@ import (
|
|||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
|
||||||
)
|
)
|
||||||
|
|
||||||
func PopulateMissingHeaders(blockChain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) (int, error) {
|
func PopulateMissingHeaders(blockChain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64, subCommand string) (int, error) {
|
||||||
lastBlock, err := blockChain.LastBlock()
|
lastBlock, err := blockChain.LastBlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("PopulateMissingHeaders: Error getting last block: ", err)
|
log.WithField("subCommand", subCommand).Error("PopulateMissingHeaders: Error getting last block: ", err)
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
blockNumbers, err := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock.Int64(), blockChain.Node().ID)
|
blockNumbers, err := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock.Int64(), blockChain.Node().ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("PopulateMissingHeaders: Error getting missing block numbers: ", err)
|
log.WithField("subCommand", subCommand).Error("PopulateMissingHeaders: Error getting missing block numbers: ", err)
|
||||||
return 0, err
|
return 0, err
|
||||||
} else if len(blockNumbers) == 0 {
|
} else if len(blockNumbers) == 0 {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug(getBlockRangeString(blockNumbers))
|
log.WithField("subCommand", subCommand).Debug(getBlockRangeString(blockNumbers))
|
||||||
_, err = RetrieveAndUpdateHeaders(blockChain, headerRepository, blockNumbers)
|
_, err = RetrieveAndUpdateHeaders(blockChain, headerRepository, blockNumbers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("PopulateMissingHeaders: Error getting/updating headers:", err)
|
log.WithField("subCommand", subCommand).Error("PopulateMissingHeaders: Error getting/updating headers:", err)
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
return len(blockNumbers), nil
|
return len(blockNumbers), nil
|
||||||
|
@ -39,7 +39,7 @@ var _ = Describe("Populating headers", func() {
|
|||||||
blockChain.SetLastBlock(big.NewInt(2))
|
blockChain.SetLastBlock(big.NewInt(2))
|
||||||
headerRepository.SetMissingBlockNumbers([]int64{2})
|
headerRepository.SetMissingBlockNumbers([]int64{2})
|
||||||
|
|
||||||
headersAdded, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1)
|
headersAdded, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1, "subCommandForLogs")
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(headersAdded).To(Equal(1))
|
Expect(headersAdded).To(Equal(1))
|
||||||
@ -50,7 +50,7 @@ var _ = Describe("Populating headers", func() {
|
|||||||
blockChain.SetLastBlock(big.NewInt(2))
|
blockChain.SetLastBlock(big.NewInt(2))
|
||||||
headerRepository.SetMissingBlockNumbers([]int64{2})
|
headerRepository.SetMissingBlockNumbers([]int64{2})
|
||||||
|
|
||||||
_, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1)
|
_, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1, "subCommandForLogs")
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(1, []int64{2})
|
headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(1, []int64{2})
|
||||||
@ -59,7 +59,7 @@ var _ = Describe("Populating headers", func() {
|
|||||||
It("returns early if the db is already synced up to the head of the chain", func() {
|
It("returns early if the db is already synced up to the head of the chain", func() {
|
||||||
blockChain := fakes.NewMockBlockChain()
|
blockChain := fakes.NewMockBlockChain()
|
||||||
blockChain.SetLastBlock(big.NewInt(2))
|
blockChain.SetLastBlock(big.NewInt(2))
|
||||||
headersAdded, err := history.PopulateMissingHeaders(blockChain, headerRepository, 2)
|
headersAdded, err := history.PopulateMissingHeaders(blockChain, headerRepository, 2, "subCommandForLogs")
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(headersAdded).To(Equal(0))
|
Expect(headersAdded).To(Equal(0))
|
||||||
|
Loading…
Reference in New Issue
Block a user