diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index f1e4bf95..4784b92c 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -22,7 +22,6 @@ import ( syn "sync" "time" - "github.com/ethereum/go-ethereum/statediff" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -186,12 +185,11 @@ func composeAndExecute() { log.Debug("fetching storage diffs from geth pub sub") rpcClient, _ := getClients() stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) - payloadChan := make(chan statediff.Payload) - storageFetcher := fetcher.NewGethRPCStorageFetcher(&stateDiffStreamer, payloadChan) + storageFetcher := fetcher.NewGethRPCStorageFetcher(stateDiffStreamer) sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) - go watchEthStorage(&sw, &wg) + go watchEthStorage(sw, &wg) default: log.Debug("fetching storage diffs from csv") tailer := fs.FileTailer{Path: storageDiffsPath} @@ -199,7 +197,7 @@ func composeAndExecute() { sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) - go watchEthStorage(&sw, &wg) + go watchEthStorage(sw, &wg) } } diff --git a/cmd/execute.go b/cmd/execute.go index 097fa9f1..27fdf88f 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -24,7 +24,6 @@ import ( "time" "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/statediff" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -134,12 +133,11 @@ func execute() { log.Debug("fetching storage diffs from geth pub sub") rpcClient, _ := getClients() stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) - payloadChan := make(chan statediff.Payload) - storageFetcher := fetcher.NewGethRPCStorageFetcher(&stateDiffStreamer, payloadChan) + storageFetcher := fetcher.NewGethRPCStorageFetcher(stateDiffStreamer) sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) 
wg.Add(1) - go watchEthStorage(&sw, &wg) + go watchEthStorage(sw, &wg) default: log.Debug("fetching storage diffs from csv") tailer := fs.FileTailer{Path: storageDiffsPath} @@ -147,7 +145,7 @@ func execute() { sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) - go watchEthStorage(&sw, &wg) + go watchEthStorage(sw, &wg) } } @@ -192,13 +190,14 @@ func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) { LogWithCommand.Info("executing storage transformers") on := viper.GetBool("storageBackFill.on") if on { - go backFillStorage(w) + backFillStorage(w) } w.Execute(queueRecheckInterval, on) } func backFillStorage(w watcher.IStorageWatcher) { // configure archival rpc client + // move this all into the storage watcher? archivalRPCPath := viper.GetString("storageBackFill.rpcPath") if archivalRPCPath == "" { LogWithCommand.Fatal(errors.New("storage backfill is turned on but no rpc path is provided")) @@ -207,12 +206,12 @@ func backFillStorage(w watcher.IStorageWatcher) { if dialErr != nil { LogWithCommand.Fatal(dialErr) } - rpcClient := client.NewRpcClient(rawRPCClient, archivalRPCPath) + rpcClient := client.NewRPCClient(rawRPCClient, archivalRPCPath) // find min deployment block - minDeploymentBlock := viper.GetInt("storageBackFill.startingBlock") + minDeploymentBlock := constants.GetMinDeploymentBlock() stateDiffFetcher := fetcher.NewStateDiffFetcher(rpcClient) - backFiller := storage.NewStorageBackFiller(stateDiffFetcher) - w.BackFill(backFiller, minDeploymentBlock) + backFiller := storage.NewStorageBackFiller(stateDiffFetcher, uint64(minDeploymentBlock), storage.DefaultMaxBatchSize) + go w.BackFill(backFiller) } func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) { diff --git a/documentation/custom-transformers.md b/documentation/custom-transformers.md index 007120ed..6e348646 100644 --- a/documentation/custom-transformers.md +++ b/documentation/custom-transformers.md @@ -205,8 +205,6 @@ 
To do so, add the following fields to the config file. [storageBackFill] on = false rpcPath = "" - startingBlock = 0 ``` - `on` is set to `true` to turn the backfill process on -- `rpcPath` is the websocket or ipc path to the modified archival geth node that exposes the `StateDiffAt` rpc endpoint we can use to backfill storage diffs -- `startingBlock` is the block height at which we want to begin the backfill process; the height of the earliest contract deployment +- `rpcPath` is the websocket or ipc path to the modified archival geth node that exposes the `StateDiffAt` rpc endpoint we can use to backfill storage diffs \ No newline at end of file diff --git a/libraries/shared/constants/external.go b/libraries/shared/constants/external.go new file mode 100644 index 00000000..c53c4629 --- /dev/null +++ b/libraries/shared/constants/external.go @@ -0,0 +1,90 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ +package constants + +import ( + "fmt" + "math" + + log "github.com/sirupsen/logrus" + "github.com/spf13/viper" +) + +var initialized = false + +func initConfig() { + if initialized { + return + } + + if err := viper.ReadInConfig(); err == nil { + log.Info("Using config file:", viper.ConfigFileUsed()) + } else { + panic(fmt.Sprintf("Could not find environment file: %v", err)) + } + initialized = true +} + +// GetMinDeploymentBlock gets the minimum deployment block for multiple contracts from config +func GetMinDeploymentBlock() uint64 { + initConfig() + contractNames := getContractNames() + if len(contractNames) < 1 { + log.Fatalf("No contracts supplied") + } + minBlock := uint64(math.MaxUint64) + for _, c := range contractNames { + deployed := getDeploymentBlock(c) + if deployed < minBlock { + minBlock = deployed + } + } + return minBlock +} + +func getContractNames() []string { + initConfig() + transformerNames := viper.GetStringSlice("exporter.transformerNames") + contractNames := make([]string, 0) + for _, transformerName := range transformerNames { + configKey := "exporter." + transformerName + ".contracts" + names := viper.GetStringSlice(configKey) + for _, name := range names { + contractNames = appendNoDuplicates(contractNames, name) + } + } + return contractNames +} + +func appendNoDuplicates(strSlice []string, str string) []string { + for _, strInSlice := range strSlice { + if strInSlice == str { + return strSlice + } + } + return append(strSlice, str) +} + +func getDeploymentBlock(contractName string) uint64 { + configKey := "contract." 
+ contractName + ".deployed" + value := viper.GetInt64(configKey) + if value < 0 { + log.Infof("No deployment block configured for contract \"%v\", defaulting to 0.", contractName) + return 0 + } + return uint64(value) +} diff --git a/libraries/shared/factories/event/repository.go b/libraries/shared/factories/event/repository.go index c8fd397c..a4bb5b3e 100644 --- a/libraries/shared/factories/event/repository.go +++ b/libraries/shared/factories/event/repository.go @@ -19,9 +19,10 @@ package event import ( "database/sql/driver" "fmt" - "github.com/vulcanize/vulcanizedb/utils" "strings" + "github.com/vulcanize/vulcanizedb/utils" + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) diff --git a/libraries/shared/factories/event/repository_test.go b/libraries/shared/factories/event/repository_test.go index 93dfb789..10f3c0e9 100644 --- a/libraries/shared/factories/event/repository_test.go +++ b/libraries/shared/factories/event/repository_test.go @@ -18,6 +18,8 @@ package event_test import ( "fmt" + "math/big" + . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/libraries/shared/factories/event" @@ -26,7 +28,6 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/test_config" - "math/big" ) var _ = Describe("Repository", func() { diff --git a/libraries/shared/fetcher/state_diff_fetcher.go b/libraries/shared/fetcher/state_diff_fetcher.go index fe4c16c6..fa5ebdfc 100644 --- a/libraries/shared/fetcher/state_diff_fetcher.go +++ b/libraries/shared/fetcher/state_diff_fetcher.go @@ -18,6 +18,7 @@ package fetcher import ( "fmt" + "github.com/ethereum/go-ethereum/statediff" "github.com/vulcanize/vulcanizedb/pkg/eth/client" diff --git a/libraries/shared/storage/backfiller.go b/libraries/shared/storage/backfiller.go index ffdb9386..fe21b2e2 100644 --- a/libraries/shared/storage/backfiller.go +++ b/libraries/shared/storage/backfiller.go @@ -30,8 +30,8 @@ import ( ) const ( - DefaultMaxBatchSize uint64 = 5000 - defaultMaxBatchNumber int64 = 100 + DefaultMaxBatchSize uint64 = 1000 + defaultMaxBatchNumber int64 = 10 ) // BackFiller is the backfilling interface @@ -64,6 +64,7 @@ func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDi if endingBlock < bf.startingBlock { return errors.New("backfill: ending block number needs to be greater than starting block number") } + logrus.Infof("going to fill in gap from %d to %d", bf.startingBlock, endingBlock) // break the range up into bins of smaller ranges length := endingBlock - bf.startingBlock + 1 numberOfBins := length / bf.batchSize @@ -88,7 +89,8 @@ func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDi // int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have var activeCount int64 // channel for processing goroutines to signal when they are done - processingDone := make(chan bool) + processingDone := make(chan [2]uint64) + 
forwardDone := make(chan bool) // for each block range bin spin up a goroutine to batch fetch and process state diffs for that range go func() { @@ -97,8 +99,7 @@ func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDi // wait for one to finish before starting the next if atomic.AddInt64(&activeCount, 1) > defaultMaxBatchNumber { // this blocks until a process signals it has finished - // immediately forwards the signal to the normal listener so that it keeps the correct count - processingDone <- <-processingDone + <-forwardDone } go func(blockHeights []uint64) { payloads, fetchErr := bf.fetcher.FetchStateDiffsAt(blockHeights) @@ -131,7 +132,7 @@ func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDi } } // when this goroutine is done, send out a signal - processingDone <- true + processingDone <- [2]uint64{blockHeights[0], blockHeights[len(blockHeights)-1]} }(blockHeights) } }() @@ -143,8 +144,14 @@ func (bf *backFiller) BackFill(endingBlock uint64, backFill chan utils.StorageDi goroutinesFinished := 0 for { select { - case <-processingDone: + case doneWithHeights := <-processingDone: atomic.AddInt64(&activeCount, -1) + select { + // if we are waiting for a process to finish, signal that one has + case forwardDone <- true: + default: + } + logrus.Infof("finished fetching gap sub-bin from %d to %d", doneWithHeights[0], doneWithHeights[1]) goroutinesFinished++ if goroutinesFinished == int(numberOfBins) { done <- true diff --git a/libraries/shared/storage/utils/keys_loader.go b/libraries/shared/storage/utils/keys_loader.go index 0142b564..e1252643 100644 --- a/libraries/shared/storage/utils/keys_loader.go +++ b/libraries/shared/storage/utils/keys_loader.go @@ -17,9 +17,10 @@ package utils import ( + "math/big" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "math/big" ) const ( diff --git a/libraries/shared/test_data/test_helpers.go b/libraries/shared/test_data/test_helpers.go 
index 4295f4c6..b819635e 100644 --- a/libraries/shared/test_data/test_helpers.go +++ b/libraries/shared/test_data/test_helpers.go @@ -1,13 +1,14 @@ package test_data import ( + "math/rand" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" - "math/rand" ) // Create a header sync log to reference in an event, returning inserted header sync log diff --git a/pkg/contract_watcher/header/transformer/transformer.go b/pkg/contract_watcher/header/transformer/transformer.go index b77870c7..df1f4e56 100644 --- a/pkg/contract_watcher/header/transformer/transformer.go +++ b/pkg/contract_watcher/header/transformer/transformer.go @@ -20,6 +20,7 @@ import ( "database/sql" "errors" "fmt" + "math" "strings" "github.com/ethereum/go-ethereum/common" @@ -105,7 +106,7 @@ func (tr *Transformer) Init() error { tr.sortedMethodIds = make(map[string][]string) // Map to sort method column ids by contract, for post fetch method polling tr.eventIds = make([]string, 0) // Holds event column ids across all contract, for batch fetching of headers tr.eventFilters = make([]common.Hash, 0) // Holds topic0 hashes across all contracts, for batch fetching of logs - tr.Start = 100000000000 + tr.Start = math.MaxInt64 // Iterate through all internal contract addresses for contractAddr := range tr.Config.Addresses {