Continuous sync (#23)

This commit is contained in:
Elizabeth 2018-09-19 10:14:49 -05:00 committed by GitHub
parent 60901d9095
commit cfc8773c5d
7 changed files with 175 additions and 109 deletions

View File

@ -17,16 +17,10 @@ package cmd
import (
"log"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/libraries/shared"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
vRpc "github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc"
"github.com/vulcanize/vulcanizedb/pkg/geth/node"
"github.com/vulcanize/vulcanizedb/pkg/transformers"
)
@ -38,7 +32,7 @@ var backfillMakerLogsCmd = &cobra.Command{
This currently includes logs related to Multi-collateral Dai (frob), Auctions (flip-kick),
and Price Feeds (ETH/USD, MKR/USD, and REP/USD - LogValue).
vulcanize backfillMakerLogs --config environments/local.toml
vulcanizedb backfillMakerLogs --config environments/local.toml
This command expects a light sync to have been run, and the presence of header records in the Vulcanize database.`,
Run: func(cmd *cobra.Command, args []string) {
@ -46,22 +40,8 @@ This command expects a light sync to have been run, and the presence of header r
},
}
func blockChain() *geth.BlockChain {
rawRpcClient, err := rpc.Dial(ipc)
if err != nil {
log.Fatal(err)
}
rpcClient := client.NewRpcClient(rawRpcClient, ipc)
ethClient := ethclient.NewClient(rawRpcClient)
vdbEthClient := client.NewEthClient(ethClient)
vdbNode := node.MakeNode(rpcClient)
transactionConverter := vRpc.NewRpcTransactionConverter(ethClient)
return geth.NewBlockChain(vdbEthClient, vdbNode, transactionConverter)
}
func backfillMakerLogs() {
blockChain := blockChain()
blockChain := getBlockChain()
db, err := postgres.NewDB(databaseConfig, blockChain.Node())
if err != nil {
log.Fatal("Failed to initialize database.")

110
cmd/continuousLogSync.go Normal file
View File

@ -0,0 +1,110 @@
// Copyright © 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"log"
"time"
"github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/libraries/shared"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers"
shared2 "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
)
// continuousLogSyncCmd represents the continuousLogSync command
var continuousLogSyncCmd = &cobra.Command{
Use: "continuousLogSync",
Short: "Continuously sync logs at the head of the chain",
Long: `Continously syncs logs based on the configured transformers.
vulcanizedb continousLogSync --config environments/local.toml
This command expects a light sync to have been run, and the presence of header records in the Vulcanize database.`,
Run: func(cmd *cobra.Command, args []string) {
syncMakerLogs()
},
}
var transformerNames []string
func syncMakerLogs() {
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
blockChain := getBlockChain()
db, err := postgres.NewDB(databaseConfig, blockChain.Node())
if err != nil {
log.Fatal("Failed to initialize database.")
}
watcher := shared.Watcher{
DB: *db,
Blockchain: blockChain,
}
transformerInititalizers := getTransformerInititalizers(transformerNames)
watcher.AddTransformers(transformerInititalizers)
for range ticker.C {
watcher.Execute()
}
}
func getTransformerInititalizers(transformerNames []string) []shared2.TransformerInitializer {
transformerInitializerMap := buildTransformerInitializerMap()
var transformerInitializers []shared2.TransformerInitializer
if transformerNames[0] == "all" {
for _, v := range transformerInitializerMap {
transformerInitializers = append(transformerInitializers, v)
}
} else {
for _, transformerName := range transformerNames {
initializer := transformerInitializerMap[transformerName]
transformerInitializers = append(transformerInitializers, initializer)
}
}
return transformerInitializers
}
func buildTransformerInitializerMap() map[string]shared2.TransformerInitializer {
transformerInitializerMap := make(map[string]shared2.TransformerInitializer)
transformerInitializerMap["bite"] = transformers.BiteTransformerInitializer
transformerInitializerMap["deal"] = transformers.DealTransformerInitializer
transformerInitializerMap["dent"] = transformers.DentTransformerInitializer
transformerInitializerMap["dripDrip"] = transformers.DripDripTransformerInitializer
transformerInitializerMap["dripFileIlk"] = transformers.DripFileIlkTransformerInitializer
transformerInitializerMap["dripFileRepo"] = transformers.DripFileRepoTransformerInitializer
transformerInitializerMap["flipKick"] = transformers.FlipKickTransformerInitializer
transformerInitializerMap["frob"] = transformers.FrobTransformerInitializer
transformerInitializerMap["pitFileDebtCeiling"] = transformers.PitFileDebtCeilingTransformerInitializer
transformerInitializerMap["pitFileIlk"] = transformers.PitFileIlkTransformerInitializer
transformerInitializerMap["pitFileStabilityFee"] = transformers.PitFileStabilityFeeTransformerInitializer
transformerInitializerMap["priceFeed"] = transformers.PriceFeedTransformerInitializer
transformerInitializerMap["tend"] = transformers.TendTransformerInitializer
transformerInitializerMap["vatInit"] = transformers.VatInitTransformerInitializer
return transformerInitializerMap
}
func init() {
rootCmd.AddCommand(continuousLogSyncCmd)
continuousLogSyncCmd.Flags().StringSliceVar(&transformerNames, "transformers", []string{"all"}, "transformer names to be run during this command")
}

View File

@ -18,17 +18,11 @@ import (
"log"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/examples/erc20_watcher/every_block"
"github.com/vulcanize/vulcanizedb/libraries/shared"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
vRpc "github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc"
"github.com/vulcanize/vulcanizedb/pkg/geth/node"
)
// erc20Cmd represents the erc20 command
@ -56,16 +50,7 @@ Expects an ethereum node to be running and requires a .toml config file:
func watchERC20s() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
rawRpcClient, err := rpc.Dial(ipc)
if err != nil {
log.Fatal(err)
}
rpcClient := client.NewRpcClient(rawRpcClient, ipc)
ethClient := ethclient.NewClient(rawRpcClient)
client := client.NewEthClient(ethClient)
node := node.MakeNode(rpcClient)
transactionConverter := vRpc.NewRpcTransactionConverter(ethClient)
blockChain := geth.NewBlockChain(client, node, transactionConverter)
blockChain := getBlockChain()
db, err := postgres.NewDB(databaseConfig, blockChain.Node())
if err != nil {
log.Fatal("Failed to initialize database.")

View File

@ -19,17 +19,12 @@ import (
"os"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
vRpc "github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc"
"github.com/vulcanize/vulcanizedb/pkg/geth/node"
"github.com/vulcanize/vulcanizedb/pkg/history"
"github.com/vulcanize/vulcanizedb/utils"
)
@ -103,17 +98,3 @@ func validateArgs(blockChain *geth.BlockChain) {
log.Fatal("starting block number > current block number")
}
}
func getBlockChain() *geth.BlockChain {
rawRpcClient, err := rpc.Dial(ipc)
if err != nil {
log.Fatal(err)
}
rpcClient := client.NewRpcClient(rawRpcClient, ipc)
ethClient := ethclient.NewClient(rawRpcClient)
client := client.NewEthClient(ethClient)
node := node.MakeNode(rpcClient)
transactionConverter := vRpc.NewRpcTransactionConverter(client)
blockChain := geth.NewBlockChain(client, node, transactionConverter)
return blockChain
}

View File

@ -16,12 +16,21 @@ package cmd
import (
"fmt"
"log"
"os"
"time"
"github.com/ethereum/go-ethereum/rpc"
"github.com/mitchellh/go-homedir"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
vRpc "github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc"
"github.com/vulcanize/vulcanizedb/pkg/geth/node"
)
var (
@ -34,6 +43,11 @@ var (
endingBlockNumber int64
)
const (
pollingInterval = 7 * time.Second
validationWindow = 15
)
var rootCmd = &cobra.Command{
Use: "vulcanizedb",
PersistentPreRun: database,
@ -102,3 +116,17 @@ func initConfig() {
fmt.Printf("Using config file: %s\n\n", viper.ConfigFileUsed())
}
}
func getBlockChain() *geth.BlockChain {
rawRpcClient, err := rpc.Dial(ipc)
if err != nil {
log.Fatal(err)
}
rpcClient := client.NewRpcClient(rawRpcClient, ipc)
ethClient := ethclient.NewClient(rawRpcClient)
vdbEthClient := client.NewEthClient(ethClient)
vdbNode := node.MakeNode(rpcClient)
transactionConverter := vRpc.NewRpcTransactionConverter(ethClient)
return geth.NewBlockChain(vdbEthClient, vdbNode, transactionConverter)
}

View File

@ -19,17 +19,11 @@ import (
"os"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
vRpc "github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc"
"github.com/vulcanize/vulcanizedb/pkg/geth/node"
"github.com/vulcanize/vulcanizedb/pkg/history"
"github.com/vulcanize/vulcanizedb/utils"
)
@ -58,11 +52,6 @@ Expects ethereum node to be running and requires a .toml config:
},
}
const (
pollingInterval = 7 * time.Second
validationWindow = 15
)
func init() {
rootCmd.AddCommand(syncCmd)
@ -76,17 +65,8 @@ func backFillAllBlocks(blockchain core.BlockChain, blockRepository datastore.Blo
func sync() {
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
rawRpcClient, err := rpc.Dial(ipc)
if err != nil {
log.Fatal(err)
}
rpcClient := client.NewRpcClient(rawRpcClient, ipc)
ethClient := ethclient.NewClient(rawRpcClient)
client := client.NewEthClient(ethClient)
node := node.MakeNode(rpcClient)
transactionConverter := vRpc.NewRpcTransactionConverter(ethClient)
blockChain := geth.NewBlockChain(client, node, transactionConverter)
blockChain := getBlockChain()
lastBlock := blockChain.LastBlock().Int64()
if lastBlock == 0 {
log.Fatal("geth initial: state sync not finished")

View File

@ -34,38 +34,40 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_init"
)
func TransformerInitializers() []shared.TransformerInitializer {
biteTransformerInitializer := bite.BiteTransformerInitializer{Config: bite.BiteConfig}
dealTransformerInitializer := deal.DealTransformerInitializer{Config: deal.Config}
dentTransformerInitializer := dent.DentTransformerInitializer{Config: dent.DentConfig}
dripDripTransformerInitializer := drip_drip.DripDripTransformerInitializer{Config: drip_drip.DripDripConfig}
flipKickTransformerInitializer := flip_kick.FlipKickTransformerInitializer{Config: flip_kick.FlipKickConfig}
frobTransformerInitializer := frob.FrobTransformerInitializer{Config: frob.FrobConfig}
dripFileConfig := drip_file.DripFileConfig
dripFileIlkTransformerInitializer := ilk2.DripFileIlkTransformerInitializer{Config: dripFileConfig}
dripFileRepoTransformerInitializer := repo.DripFileRepoTransformerInitializer{Config: dripFileConfig}
pitFileConfig := pit_file.PitFileConfig
pitFileDebtCeilingTransformerInitializer := debt_ceiling.PitFileDebtCeilingTransformerInitializer{Config: pitFileConfig}
pitFileIlkTransformerInitializer := ilk.PitFileIlkTransformerInitializer{Config: pitFileConfig}
pitFileStabilityFeeTransformerInitializer := stability_fee.PitFileStabilityFeeTransformerInitializer{Config: pitFileConfig}
priceFeedTransformerInitializer := price_feeds.PriceFeedTransformerInitializer{Config: price_feeds.PriceFeedConfig}
tendTransformerInitializer := tend.TendTransformerInitializer{Config: tend.TendConfig}
vatInitTransformerInitializer := vat_init.VatInitTransformerInitializer{Config: vat_init.VatInitConfig}
var (
BiteTransformerInitializer = bite.BiteTransformerInitializer{Config: bite.BiteConfig}.NewBiteTransformer
DealTransformerInitializer = deal.DealTransformerInitializer{Config: deal.Config}.NewDealTransformer
DentTransformerInitializer = dent.DentTransformerInitializer{Config: dent.DentConfig}.NewDentTransformer
DripDripTransformerInitializer = drip_drip.DripDripTransformerInitializer{Config: drip_drip.DripDripConfig}.NewDripDripTransformer
dripFileConfig = drip_file.DripFileConfig
DripFileIlkTransformerInitializer = ilk2.DripFileIlkTransformerInitializer{Config: dripFileConfig}.NewDripFileIlkTransformer
DripFileRepoTransformerInitializer = repo.DripFileRepoTransformerInitializer{Config: dripFileConfig}.NewDripFileRepoTransformer
FlipKickTransformerInitializer = flip_kick.FlipKickTransformerInitializer{Config: flip_kick.FlipKickConfig}.NewFlipKickTransformer
FrobTransformerInitializer = frob.FrobTransformerInitializer{Config: frob.FrobConfig}.NewFrobTransformer
pitFileConfig = pit_file.PitFileConfig
PitFileDebtCeilingTransformerInitializer = debt_ceiling.PitFileDebtCeilingTransformerInitializer{Config: pitFileConfig}.NewPitFileDebtCeilingTransformer
PitFileIlkTransformerInitializer = ilk.PitFileIlkTransformerInitializer{Config: pitFileConfig}.NewPitFileIlkTransformer
PitFileStabilityFeeTransformerInitializer = stability_fee.PitFileStabilityFeeTransformerInitializer{Config: pitFileConfig}.NewPitFileStabilityFeeTransformer
PriceFeedTransformerInitializer = price_feeds.PriceFeedTransformerInitializer{Config: price_feeds.PriceFeedConfig}.NewPriceFeedTransformer
TendTransformerInitializer = tend.TendTransformerInitializer{Config: tend.TendConfig}.NewTendTransformer
VatInitTransformerInitializer = vat_init.VatInitTransformerInitializer{Config: vat_init.VatInitConfig}.NewVatInitTransformer
)
func TransformerInitializers() []shared.TransformerInitializer {
return []shared.TransformerInitializer{
biteTransformerInitializer.NewBiteTransformer,
dealTransformerInitializer.NewDealTransformer,
dentTransformerInitializer.NewDentTransformer,
dripFileIlkTransformerInitializer.NewDripFileIlkTransformer,
dripFileRepoTransformerInitializer.NewDripFileRepoTransformer,
dripDripTransformerInitializer.NewDripDripTransformer,
flipKickTransformerInitializer.NewFlipKickTransformer,
frobTransformerInitializer.NewFrobTransformer,
pitFileDebtCeilingTransformerInitializer.NewPitFileDebtCeilingTransformer,
pitFileIlkTransformerInitializer.NewPitFileIlkTransformer,
pitFileStabilityFeeTransformerInitializer.NewPitFileStabilityFeeTransformer,
priceFeedTransformerInitializer.NewPriceFeedTransformer,
tendTransformerInitializer.NewTendTransformer,
vatInitTransformerInitializer.NewVatInitTransformer,
BiteTransformerInitializer,
DealTransformerInitializer,
DentTransformerInitializer,
DripDripTransformerInitializer,
DripFileIlkTransformerInitializer,
DripFileRepoTransformerInitializer,
FlipKickTransformerInitializer,
FrobTransformerInitializer,
PitFileDebtCeilingTransformerInitializer,
PitFileIlkTransformerInitializer,
PitFileStabilityFeeTransformerInitializer,
PriceFeedTransformerInitializer,
TendTransformerInitializer,
VatInitTransformerInitializer,
}
}