diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index 633993fa..eea5e686 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -17,6 +17,10 @@ package cmd import ( + "github.com/ethereum/go-ethereum/statediff" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/fs" "os" "plugin" syn "sync" @@ -25,9 +29,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" - "github.com/vulcanize/vulcanizedb/pkg/fs" p2 "github.com/vulcanize/vulcanizedb/pkg/plugin" "github.com/vulcanize/vulcanizedb/pkg/plugin/helpers" "github.com/vulcanize/vulcanizedb/utils" @@ -179,12 +181,26 @@ func composeAndExecute() { } if len(ethStorageInitializers) > 0 { - tailer := fs.FileTailer{Path: storageDiffsPath} - storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) - sw := watcher.NewStorageWatcher(storageFetcher, &db) - sw.AddTransformers(ethStorageInitializers) - wg.Add(1) - go watchEthStorage(&sw, &wg) + switch storageDiffsSource { + case "geth": + log.Debug("fetching storage diffs from geth pub sub") + rpcClient, _ := getClients() + stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) + payloadChan := make(chan statediff.Payload) + storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan) + sw := watcher.NewStorageWatcher(storageFetcher, &db) + sw.AddTransformers(ethStorageInitializers) + wg.Add(1) + go watchEthStorage(&sw, &wg) + default: + log.Debug("fetching storage diffs from csv") + tailer := fs.FileTailer{Path: storageDiffsPath} + storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) + sw := watcher.NewStorageWatcher(storageFetcher, &db) + sw.AddTransformers(ethStorageInitializers) + wg.Add(1) + go watchEthStorage(&sw, &wg) + } } if len(ethContractInitializers) > 0 { diff --git a/cmd/execute.go b/cmd/execute.go index b416cbb5..d04bc205 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -18,6 +18,10 @@ package cmd import ( "fmt" + "github.com/ethereum/go-ethereum/statediff" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/fs" "plugin" syn "sync" "time" @@ -26,11 +30,9 @@ import ( "github.com/spf13/cobra" "github.com/vulcanize/vulcanizedb/libraries/shared/constants" - "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" storageUtils "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" - "github.com/vulcanize/vulcanizedb/pkg/fs" "github.com/vulcanize/vulcanizedb/utils" ) @@ -123,12 +125,26 @@ func execute() { } if len(ethStorageInitializers) > 0 { - tailer := fs.FileTailer{Path: storageDiffsPath} - storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) - sw := watcher.NewStorageWatcher(storageFetcher, &db) - sw.AddTransformers(ethStorageInitializers) - wg.Add(1) - go watchEthStorage(&sw, &wg) + switch storageDiffsSource { + case "geth": + log.Debug("fetching storage diffs from geth pub sub") + rpcClient, _ := getClients() + stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) + payloadChan := make(chan statediff.Payload) + storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan) + sw := watcher.NewStorageWatcher(storageFetcher, &db) + sw.AddTransformers(ethStorageInitializers) + wg.Add(1) + go watchEthStorage(&sw, &wg) + default: + log.Debug("fetching storage diffs from csv") + tailer := fs.FileTailer{Path: storageDiffsPath} + storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) + sw := watcher.NewStorageWatcher(storageFetcher, &db) + sw.AddTransformers(ethStorageInitializers) + wg.Add(1) + go watchEthStorage(&sw, &wg) + } } if len(ethContractInitializers) > 0 { @@ -166,7 +182,7 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) { } } -func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) { +func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) { defer wg.Done() // Execute over the StorageTransformerInitializer set using the storage watcher LogWithCommand.Info("executing storage transformers") @@ -174,8 +190,8 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) { defer ticker.Stop() for range ticker.C { errs := make(chan error) - rows := make(chan storageUtils.StorageDiffRow) - w.Execute(rows, errs, queueRecheckInterval) + diffs := make(chan storageUtils.StorageDiff) + w.Execute(diffs, errs, queueRecheckInterval) } } diff --git a/cmd/root.go b/cmd/root.go index c9250c3f..f8174f74 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -48,6 +48,7 @@ var ( recheckHeadersArg bool SubCommand string LogWithCommand log.Entry + storageDiffsSource string ) const ( @@ -80,6 +81,7 @@ func setViperConfigs() { ipc = viper.GetString("client.ipcpath") levelDbPath = viper.GetString("client.leveldbpath") storageDiffsPath = viper.GetString("filesystem.storageDiffsPath") + storageDiffsSource = viper.GetString("storageDiffs.source") databaseConfig = config.Database{ Name: viper.GetString("database.name"), Hostname: viper.GetString("database.hostname"), @@ -118,6 +120,7 @@ func init() { rootCmd.PersistentFlags().String("client-ipcPath", "", "location of geth.ipc file") rootCmd.PersistentFlags().String("client-levelDbPath", "", "location of levelDb chaindata") rootCmd.PersistentFlags().String("filesystem-storageDiffsPath", "", "location of storage diffs csv file") + rootCmd.PersistentFlags().String("storageDiffs-source", "csv", "where to get the state diffs: csv or geth") rootCmd.PersistentFlags().String("exporter-name", "exporter", "name of exporter plugin") rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "Log level (trace, debug, info, warn, error, fatal, panic") @@ -129,6 +132,7 @@ func init() { viper.BindPFlag("client.ipcPath", rootCmd.PersistentFlags().Lookup("client-ipcPath")) viper.BindPFlag("client.levelDbPath", rootCmd.PersistentFlags().Lookup("client-levelDbPath")) viper.BindPFlag("filesystem.storageDiffsPath", rootCmd.PersistentFlags().Lookup("filesystem-storageDiffsPath")) + viper.BindPFlag("storageDiffs.source", rootCmd.PersistentFlags().Lookup("storageDiffs-source")) viper.BindPFlag("exporter.fileName", rootCmd.PersistentFlags().Lookup("exporter-name")) viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level")) } @@ -152,6 +156,14 @@ func initConfig() { } func getBlockChain() *geth.BlockChain { + rpcClient, ethClient := getClients() + vdbEthClient := client.NewEthClient(ethClient) + vdbNode := node.MakeNode(rpcClient) + transactionConverter := vRpc.NewRpcTransactionConverter(ethClient) + return geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter) +} + +func getClients() (client.RpcClient, *ethclient.Client) { rawRpcClient, err := rpc.Dial(ipc) if err != nil { @@ -159,8 +171,6 @@ func getBlockChain() *geth.BlockChain { } rpcClient := client.NewRpcClient(rawRpcClient, ipc) ethClient := ethclient.NewClient(rawRpcClient) - vdbEthClient := client.NewEthClient(ethClient) - vdbNode := node.MakeNode(rpcClient) - transactionConverter := vRpc.NewRpcTransactionConverter(ethClient) - return geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter) + + return rpcClient, ethClient } diff --git a/go.mod b/go.mod index c7cd332b..d52b3e5a 100644 --- a/go.mod +++ b/go.mod @@ -12,9 +12,11 @@ require ( github.com/ethereum/go-ethereum v1.9.5 github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect + github.com/go-sql-driver/mysql v1.4.1 // indirect + github.com/golang/protobuf v1.3.2 // indirect github.com/gorilla/websocket v1.4.1 // indirect - github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6 // indirect - github.com/hashicorp/golang-lru v0.5.1 + github.com/hashicorp/golang-lru v0.5.3 + github.com/howeyc/fsnotify v0.9.0 // indirect github.com/hpcloud/tail v1.0.0 github.com/huin/goupnp v1.0.0 // indirect github.com/jackpal/go-nat-pmp v1.0.1 // indirect @@ -42,8 +44,14 @@ require ( github.com/syndtr/goleveldb v1.0.0 // indirect github.com/tyler-smith/go-bip39 v1.0.2 // indirect github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 // indirect + golang.org/x/crypto v0.0.0-20190926114937-fa1a29108794 // indirect golang.org/x/net v0.0.0-20190603091049-60506f45cf65 golang.org/x/sync v0.0.0-20190423024810-112230192c58 gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190709231704-1e4459ed25ff // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 + gopkg.in/urfave/cli.v1 v1.0.0-00010101000000-000000000000 // indirect ) + +replace github.com/ethereum/go-ethereum => github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101 + +replace gopkg.in/urfave/cli.v1 => gopkg.in/urfave/cli.v1 v1.20.0 diff --git a/go.sum b/go.sum index ffc6e746..7a96f988 100644 --- a/go.sum +++ b/go.sum @@ -91,6 +91,8 @@ github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6/go.mod h1 github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk= +github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/howeyc/fsnotify v0.9.0 h1:0gtV5JmOKH4A8SsFxG2BczSeXWWPvcMT0euZt5gDAxY= @@ -241,6 +243,10 @@ github.com/tyler-smith/go-bip39 v1.0.0/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2 github.com/tyler-smith/go-bip39 v1.0.2 h1:+t3w+KwLXO6154GNJY+qUtIxLTmFjfUmpguQT1OlOT8= github.com/tyler-smith/go-bip39 v1.0.2/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101 h1:fsHhBzscAwi4u7/F033SFJwTIz+46D8uDWMu2/ZdvzA= +github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101 h1:fsHhBzscAwi4u7/F033SFJwTIz+46D8uDWMu2/ZdvzA= +github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101/go.mod h1:9i0pGnKDUFFr8yC/n8xyrNBVfhYlpwE8J3Ge6ThKvug= +github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101/go.mod h1:9i0pGnKDUFFr8yC/n8xyrNBVfhYlpwE8J3Ge6ThKvug= github.com/vulcanize/vulcanizedb v0.0.5/go.mod h1:utXkheCL9VjTfmuivuvRiAAyHh54GSN9XRQNEbFCA8k= github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 h1:1cngl9mPEoITZG8s8cVcUy5CeIBYhEESkOB7m6Gmkrk= github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208/go.mod h1:IotVbo4F+mw0EzQ08zFqg7pK3FebNXpaMsRy2RT+Ees= @@ -252,6 +258,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90Pveol golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190926114937-fa1a29108794 h1:4Yo9XtTfxfBCecLiBW8TYsFIdN7TkDhjGLWetFo4JSo= +golang.org/x/crypto v0.0.0-20190926114937-fa1a29108794/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a h1:gOpx8G595UYyvj8UK4+OFyY4rx037g3fmfhe5SasG3U= @@ -281,6 +289,8 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c h1:+EXw7AwNOKzPFXMZ1yNjO40aW golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190922100055-0a153f010e69 h1:rOhMmluY6kLMhdnrivzec6lLgaVbMHMn2ISQXJeJ5EM= +golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= @@ -300,6 +310,7 @@ gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190709231704-1e4459ed25ff h1:uuol9OUzSv gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190709231704-1e4459ed25ff/go.mod h1:uAJfkITjFhyEEuUfm7bsmCZRbW5WRq8s9EY8HZ6hCns= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/urfave/cli.v1 v1.20.0 h1:NdAVW6RYxDif9DhDHaAortIu956m2c0v+09AZBPTbE0= gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/libraries/shared/factories/storage/transformer.go b/libraries/shared/factories/storage/transformer.go index d7cf6275..b38e5760 100644 --- a/libraries/shared/factories/storage/transformer.go +++ b/libraries/shared/factories/storage/transformer.go @@ -18,7 +18,6 @@ package storage import ( "github.com/ethereum/go-ethereum/common" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" @@ -26,9 +25,9 @@ import ( ) type Transformer struct { - Address common.Address - Mappings storage.Mappings - Repository Repository + HashedAddress common.Hash + Mappings storage.Mappings + Repository Repository } func (transformer Transformer) NewTransformer(db *postgres.DB) transformer.StorageTransformer { @@ -37,18 +36,18 @@ func (transformer Transformer) NewTransformer(db *postgres.DB) transformer.Stora return transformer } -func (transformer Transformer) ContractAddress() common.Address { - return transformer.Address +func (transformer Transformer) KeccakContractAddress() common.Hash { + return transformer.HashedAddress } -func (transformer Transformer) Execute(row utils.StorageDiffRow) error { - metadata, lookupErr := transformer.Mappings.Lookup(row.StorageKey) +func (transformer Transformer) Execute(diff utils.StorageDiff) error { + metadata, lookupErr := transformer.Mappings.Lookup(diff.StorageKey) if lookupErr != nil { return lookupErr } - value, decodeErr := utils.Decode(row, metadata) + value, decodeErr := utils.Decode(diff, metadata) if decodeErr != nil { return decodeErr } - return transformer.Repository.Create(row.BlockHeight, row.BlockHash.Hex(), metadata, value) + return transformer.Repository.Create(diff.BlockHeight, diff.BlockHash.Hex(), metadata, value) } diff --git a/libraries/shared/factories/storage/transformer_test.go b/libraries/shared/factories/storage/transformer_test.go index 59982a20..8fe87e52 100644 --- a/libraries/shared/factories/storage/transformer_test.go +++ b/libraries/shared/factories/storage/transformer_test.go @@ -20,7 +20,6 @@ import ( "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/libraries/shared/factories/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" @@ -38,21 +37,21 @@ var _ = Describe("Storage transformer", func() { mappings = &mocks.MockMappings{} repository = &mocks.MockStorageRepository{} t = storage.Transformer{ - Address: common.Address{}, - Mappings: mappings, - Repository: repository, + HashedAddress: common.Hash{}, + Mappings: mappings, + Repository: repository, } }) It("returns the contract address being watched", func() { - fakeAddress := common.HexToAddress("0x12345") - t.Address = fakeAddress + fakeAddress := utils.HexToKeccak256Hash("0x12345") + t.HashedAddress = fakeAddress - Expect(t.ContractAddress()).To(Equal(fakeAddress)) + Expect(t.KeccakContractAddress()).To(Equal(fakeAddress)) }) It("looks up metadata for storage key", func() { - t.Execute(utils.StorageDiffRow{}) + t.Execute(utils.StorageDiff{}) Expect(mappings.LookupCalled).To(BeTrue()) }) @@ -60,7 +59,7 @@ var _ = Describe("Storage transformer", func() { It("returns error if lookup fails", func() { mappings.LookupErr = fakes.FakeError - err := t.Execute(utils.StorageDiffRow{}) + err := t.Execute(utils.StorageDiff{}) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) @@ -72,12 +71,12 @@ var _ = Describe("Storage transformer", func() { rawValue := common.HexToAddress("0x12345") fakeBlockNumber := 123 fakeBlockHash := "0x67890" - fakeRow := utils.StorageDiffRow{ - Contract: common.Address{}, - BlockHash: common.HexToHash(fakeBlockHash), - BlockHeight: fakeBlockNumber, - StorageKey: common.Hash{}, - StorageValue: rawValue.Hash(), + fakeRow := utils.StorageDiff{ + HashedAddress: common.Hash{}, + BlockHash: common.HexToHash(fakeBlockHash), + BlockHeight: fakeBlockNumber, + StorageKey: common.Hash{}, + StorageValue: rawValue.Hash(), } err := t.Execute(fakeRow) @@ -95,7 +94,7 @@ var _ = Describe("Storage transformer", func() { mappings.Metadata = fakeMetadata repository.CreateErr = fakes.FakeError - err := t.Execute(utils.StorageDiffRow{StorageValue: rawValue.Hash()}) + err := t.Execute(utils.StorageDiff{StorageValue: rawValue.Hash()}) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) @@ -120,12 +119,12 @@ var _ = Describe("Storage transformer", func() { It("passes the decoded data items to the repository", func() { mappings.Metadata = fakeMetadata - fakeRow := utils.StorageDiffRow{ - Contract: common.Address{}, - BlockHash: common.HexToHash(fakeBlockHash), - BlockHeight: fakeBlockNumber, - StorageKey: common.Hash{}, - StorageValue: rawValue.Hash(), + fakeRow := utils.StorageDiff{ + HashedAddress: common.Hash{}, + BlockHash: common.HexToHash(fakeBlockHash), + BlockHeight: fakeBlockNumber, + StorageKey: common.Hash{}, + StorageValue: rawValue.Hash(), } err := t.Execute(fakeRow) @@ -144,7 +143,7 @@ var _ = Describe("Storage transformer", func() { mappings.Metadata = fakeMetadata repository.CreateErr = fakes.FakeError - err := t.Execute(utils.StorageDiffRow{StorageValue: rawValue.Hash()}) + err := t.Execute(utils.StorageDiff{StorageValue: rawValue.Hash()}) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) diff --git a/libraries/shared/fetcher/storage_fetcher.go b/libraries/shared/fetcher/csv_tail_storage_fetcher.go similarity index 80% rename from libraries/shared/fetcher/storage_fetcher.go rename to libraries/shared/fetcher/csv_tail_storage_fetcher.go index 48960dcf..01dc3caa 100644 --- a/libraries/shared/fetcher/storage_fetcher.go +++ b/libraries/shared/fetcher/csv_tail_storage_fetcher.go @@ -17,18 +17,12 @@ package fetcher import ( - "strings" - - log "github.com/sirupsen/logrus" - + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/pkg/fs" + "strings" ) -type IStorageFetcher interface { - FetchStorageDiffs(chan<- utils.StorageDiffRow, chan<- error) -} - type CsvTailStorageFetcher struct { tailer fs.Tailer } @@ -37,18 +31,18 @@ func NewCsvTailStorageFetcher(tailer fs.Tailer) CsvTailStorageFetcher { return CsvTailStorageFetcher{tailer: tailer} } -func (storageFetcher CsvTailStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) { +func (storageFetcher CsvTailStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { t, tailErr := storageFetcher.tailer.Tail() if tailErr != nil { errs <- tailErr } - log.Debug("fetching storage diffs...") + logrus.Debug("fetching storage diffs...") for line := range t.Lines { - row, parseErr := utils.FromStrings(strings.Split(line.Text, ",")) + diff, parseErr := utils.FromParityCsvRow(strings.Split(line.Text, ",")) if parseErr != nil { errs <- parseErr } else { - out <- row + out <- diff } } } diff --git a/libraries/shared/fetcher/storage_fetcher_test.go b/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go similarity index 85% rename from libraries/shared/fetcher/storage_fetcher_test.go rename to libraries/shared/fetcher/csv_tail_storage_fetcher_test.go index 20e9a6b5..eaca0eb3 100644 --- a/libraries/shared/fetcher/storage_fetcher_test.go +++ b/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go @@ -18,30 +18,28 @@ package fetcher_test import ( "fmt" - "strings" - "time" - "github.com/ethereum/go-ethereum/common" "github.com/hpcloud/tail" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/pkg/fakes" + "strings" + "time" ) var _ = Describe("Csv Tail Storage Fetcher", func() { var ( errorsChannel chan error mockTailer *fakes.MockTailer - rowsChannel chan utils.StorageDiffRow + diffsChannel chan utils.StorageDiff storageFetcher fetcher.CsvTailStorageFetcher ) BeforeEach(func() { errorsChannel = make(chan error) - rowsChannel = make(chan utils.StorageDiffRow) + diffsChannel = make(chan utils.StorageDiff) mockTailer = fakes.NewMockTailer() storageFetcher = fetcher.NewCsvTailStorageFetcher(mockTailer) }) @@ -49,7 +47,7 @@ var _ = Describe("Csv Tail Storage Fetcher", func() { It("adds error to errors channel if tailing file fails", func(done Done) { mockTailer.TailErr = fakes.FakeError - go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) + go storageFetcher.FetchStorageDiffs(diffsChannel, errorsChannel) Expect(<-errorsChannel).To(MatchError(fakes.FakeError)) close(done) @@ -58,24 +56,24 @@ var _ = Describe("Csv Tail Storage Fetcher", func() { It("adds parsed csv row to rows channel for storage diff", func(done Done) { line := getFakeLine() - go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) + go storageFetcher.FetchStorageDiffs(diffsChannel, errorsChannel) mockTailer.Lines <- line - expectedRow, err := utils.FromStrings(strings.Split(line.Text, ",")) + expectedRow, err := utils.FromParityCsvRow(strings.Split(line.Text, ",")) Expect(err).NotTo(HaveOccurred()) - Expect(<-rowsChannel).To(Equal(expectedRow)) + Expect(<-diffsChannel).To(Equal(expectedRow)) close(done) }) It("adds error to errors channel if parsing csv fails", func(done Done) { line := &tail.Line{Text: "invalid"} - go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel) + go storageFetcher.FetchStorageDiffs(diffsChannel, errorsChannel) mockTailer.Lines <- line Expect(<-errorsChannel).To(HaveOccurred()) select { - case <-rowsChannel: + case <-diffsChannel: Fail("value passed to rows channel on error") default: Succeed() diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go new file mode 100644 index 00000000..43438ee3 --- /dev/null +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go @@ -0,0 +1,81 @@ +// Copyright 2019 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fetcher + +import ( + "fmt" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff" + "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" +) + +type GethRpcStorageFetcher struct { + statediffPayloadChan chan statediff.Payload + streamer streamer.Streamer +} + +func NewGethRpcStorageFetcher(streamer streamer.Streamer, statediffPayloadChan chan statediff.Payload) GethRpcStorageFetcher { + return GethRpcStorageFetcher{ + statediffPayloadChan: statediffPayloadChan, + streamer: streamer, + } +} + +func (fetcher GethRpcStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { + ethStatediffPayloadChan := fetcher.statediffPayloadChan + clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan) + if clientSubErr != nil { + errs <- clientSubErr + panic(fmt.Sprintf("Error creating a geth client subscription: %v", clientSubErr)) + } + logrus.Info("Successfully created a geth client subscription: ", clientSubscription) + + for { + diff := <-ethStatediffPayloadChan + logrus.Trace("received a statediff") + stateDiff := new(statediff.StateDiff) + decodeErr := rlp.DecodeBytes(diff.StateDiffRlp, stateDiff) + if decodeErr != nil { + logrus.Warn("Error decoding state diff into RLP: ", decodeErr) + errs <- decodeErr + } + + accounts := getAccountsFromDiff(*stateDiff) + logrus.Trace(fmt.Sprintf("iterating through %d accounts on stateDiff for block %d", len(accounts), stateDiff.BlockNumber)) + for _, account := range accounts { + logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage))) + for _, storage := range account.Storage { + diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) + logrus.Trace("adding storage diff to out channel", + "keccak of address: ", diff.HashedAddress.Hex(), + "block height: ", diff.BlockHeight, + "storage key: ", diff.StorageKey.Hex(), + "storage value: ", diff.StorageValue.Hex()) + if formatErr != nil { + errs <- formatErr + } + + out <- diff + } + } + } +} + +func getAccountsFromDiff(stateDiff statediff.StateDiff) []statediff.AccountDiff { + accounts := append(stateDiff.CreatedAccounts, stateDiff.UpdatedAccounts...) + return append(accounts, stateDiff.DeletedAccounts...) +} diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go new file mode 100644 index 00000000..b1b1aa10 --- /dev/null +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go @@ -0,0 +1,172 @@ +// Copyright 2019 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fetcher_test + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" + "github.com/vulcanize/vulcanizedb/pkg/fakes" +) + +type MockStoragediffStreamer struct { + subscribeError error + PassedPayloadChan chan statediff.Payload + streamPayloads []statediff.Payload +} + +func (streamer *MockStoragediffStreamer) Stream(statediffPayloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { + clientSubscription := rpc.ClientSubscription{} + streamer.PassedPayloadChan = statediffPayloadChan + + go func() { + for _, payload := range streamer.streamPayloads { + streamer.PassedPayloadChan <- payload + } + }() + + return &clientSubscription, streamer.subscribeError +} + +func (streamer *MockStoragediffStreamer) SetSubscribeError(err error) { + streamer.subscribeError = err +} + +func (streamer *MockStoragediffStreamer) SetPayloads(payloads []statediff.Payload) { + streamer.streamPayloads = payloads +} + +var _ = Describe("Geth RPC Storage Fetcher", func() { + var streamer MockStoragediffStreamer + var statediffPayloadChan chan statediff.Payload + var statediffFetcher fetcher.GethRpcStorageFetcher + var storagediffChan chan utils.StorageDiff + var errorChan chan error + + BeforeEach(func() { + streamer = MockStoragediffStreamer{} + statediffPayloadChan = make(chan statediff.Payload, 1) + statediffFetcher = fetcher.NewGethRpcStorageFetcher(&streamer, statediffPayloadChan) + storagediffChan = make(chan utils.StorageDiff) + errorChan = make(chan error) + }) + + It("adds errors to error channel if the RPC subscription fails and panics", func(done Done) { + streamer.SetSubscribeError(fakes.FakeError) + + go func() { + failedSub := func() { + statediffFetcher.FetchStorageDiffs(storagediffChan, errorChan) + } + Expect(failedSub).To(Panic()) + }() + + Expect(<-errorChan).To(MatchError(fakes.FakeError)) + close(done) + }) + + It("streams StatediffPayloads from a Geth RPC subscription", func(done Done) { + streamer.SetPayloads([]statediff.Payload{test_data.MockStatediffPayload}) + + go statediffFetcher.FetchStorageDiffs(storagediffChan, errorChan) + + streamedPayload := <-statediffPayloadChan + Expect(streamedPayload).To(Equal(test_data.MockStatediffPayload)) + Expect(streamer.PassedPayloadChan).To(Equal(statediffPayloadChan)) + close(done) + }) + + It("adds errors to error channel if decoding the state diff RLP fails", func(done Done) { + badStatediffPayload := statediff.Payload{} + streamer.SetPayloads([]statediff.Payload{badStatediffPayload}) + + go statediffFetcher.FetchStorageDiffs(storagediffChan, errorChan) + + Expect(<-errorChan).To(MatchError("EOF")) + + close(done) + }) + + It("adds parsed statediff payloads to the rows channel", func(done Done) { + streamer.SetPayloads([]statediff.Payload{test_data.MockStatediffPayload}) + + go statediffFetcher.FetchStorageDiffs(storagediffChan, errorChan) + + height := test_data.BlockNumber + intHeight := int(height.Int64()) + createdExpectedStorageDiff := utils.StorageDiff{ + HashedAddress: common.BytesToHash(test_data.ContractLeafKey[:]), + BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), + BlockHeight: intHeight, + StorageKey: common.BytesToHash(test_data.StorageKey), + StorageValue: common.BytesToHash(test_data.SmallStorageValue), + } + updatedExpectedStorageDiff := utils.StorageDiff{ + HashedAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:]), + BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), + BlockHeight: intHeight, + StorageKey: common.BytesToHash(test_data.StorageKey), + StorageValue: common.BytesToHash(test_data.LargeStorageValue), + } + deletedExpectedStorageDiff := utils.StorageDiff{ + HashedAddress: common.BytesToHash(test_data.AnotherContractLeafKey[:]), + BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), + BlockHeight: intHeight, + StorageKey: common.BytesToHash(test_data.StorageKey), + StorageValue: common.BytesToHash(test_data.SmallStorageValue), + } + + createdStateDiff := <-storagediffChan + updatedStateDiff := <-storagediffChan + deletedStateDiff := <-storagediffChan + + Expect(createdStateDiff).To(Equal(createdExpectedStorageDiff)) + Expect(updatedStateDiff).To(Equal(updatedExpectedStorageDiff)) + Expect(deletedStateDiff).To(Equal(deletedExpectedStorageDiff)) + + close(done) + }) + + It("adds errors to error channel if formatting the diff as a StateDiff object fails", func(done Done) { + accountDiffs := test_data.CreatedAccountDiffs + accountDiffs[0].Storage = []statediff.StorageDiff{test_data.StorageWithBadValue} + + stateDiff := statediff.StateDiff{ + BlockNumber: test_data.BlockNumber, + BlockHash: common.HexToHash(test_data.BlockHash), + CreatedAccounts: accountDiffs, + } + + stateDiffRlp, err := rlp.EncodeToBytes(stateDiff) + Expect(err).NotTo(HaveOccurred()) + + badStatediffPayload := statediff.Payload{ + StateDiffRlp: stateDiffRlp, + } + streamer.SetPayloads([]statediff.Payload{badStatediffPayload}) + + go statediffFetcher.FetchStorageDiffs(storagediffChan, errorChan) + + Expect(<-errorChan).To(MatchError("rlp: input contains more than one value")) + + close(done) + }) +}) diff --git a/libraries/shared/fetcher/storage_fetcher_interface.go b/libraries/shared/fetcher/storage_fetcher_interface.go new file mode 100644 index 00000000..8999589c --- /dev/null +++ b/libraries/shared/fetcher/storage_fetcher_interface.go @@ -0,0 +1,21 @@ +// Copyright 2018 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fetcher + +import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + +type IStorageFetcher interface { + FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) +} diff --git a/libraries/shared/mocks/storage_fetcher.go b/libraries/shared/mocks/storage_fetcher.go index 5009f116..16c1ad93 100644 --- a/libraries/shared/mocks/storage_fetcher.go +++ b/libraries/shared/mocks/storage_fetcher.go @@ -19,21 +19,21 @@ package mocks import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" type MockStorageFetcher struct { - RowsToReturn []utils.StorageDiffRow - ErrsToReturn []error + DiffsToReturn []utils.StorageDiff + ErrsToReturn []error } func NewMockStorageFetcher() *MockStorageFetcher { return &MockStorageFetcher{} } -func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) { +func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { defer close(out) defer close(errs) for _, err := range fetcher.ErrsToReturn { errs <- err } - for _, row := range fetcher.RowsToReturn { - out <- row + for _, diff := range fetcher.DiffsToReturn { + out <- diff } } diff --git a/libraries/shared/mocks/storage_queue.go b/libraries/shared/mocks/storage_queue.go index e667ec57..192c730e 100644 --- a/libraries/shared/mocks/storage_queue.go +++ b/libraries/shared/mocks/storage_queue.go @@ -23,16 +23,16 @@ import ( type MockStorageQueue struct { AddCalled bool AddError error - AddPassedRow utils.StorageDiffRow + AddPassedDiff utils.StorageDiff DeleteErr error DeletePassedId int GetAllErr error - RowsToReturn []utils.StorageDiffRow + DiffsToReturn []utils.StorageDiff } -func (queue *MockStorageQueue) Add(row utils.StorageDiffRow) error { +func (queue *MockStorageQueue) Add(diff utils.StorageDiff) error { queue.AddCalled = true - queue.AddPassedRow = row + queue.AddPassedDiff = diff return queue.AddError } @@ -41,6 +41,6 @@ func (queue *MockStorageQueue) Delete(id int) error { return queue.DeleteErr } -func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiffRow, error) { - return queue.RowsToReturn, queue.GetAllErr +func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiff, error) { + return queue.DiffsToReturn, queue.GetAllErr } diff --git a/libraries/shared/mocks/storage_transformer.go b/libraries/shared/mocks/storage_transformer.go index a2140de0..b1c3cba2 100644 --- a/libraries/shared/mocks/storage_transformer.go +++ b/libraries/shared/mocks/storage_transformer.go @@ -18,25 +18,24 @@ package mocks import ( "github.com/ethereum/go-ethereum/common" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) type MockStorageTransformer struct { - Address common.Address - ExecuteErr error - PassedRow utils.StorageDiffRow + KeccakOfAddress common.Hash + ExecuteErr error + PassedDiff utils.StorageDiff } -func (transformer *MockStorageTransformer) Execute(row utils.StorageDiffRow) error { - transformer.PassedRow = row +func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error { + transformer.PassedDiff = diff return transformer.ExecuteErr } -func (transformer *MockStorageTransformer) ContractAddress() common.Address { - return transformer.Address +func (transformer *MockStorageTransformer) KeccakContractAddress() common.Hash { + return transformer.KeccakOfAddress } func (transformer *MockStorageTransformer) FakeTransformerInitializer(db *postgres.DB) transformer.StorageTransformer { diff --git a/libraries/shared/storage/mappings.go b/libraries/shared/storage/mappings.go index 18878683..f8b089ae 100644 --- a/libraries/shared/storage/mappings.go +++ b/libraries/shared/storage/mappings.go @@ -46,16 +46,29 @@ const ( IndexEleven = "000000000000000000000000000000000000000000000000000000000000000b" ) +func AddHashedKeys(currentMappings map[common.Hash]utils.StorageValueMetadata) map[common.Hash]utils.StorageValueMetadata { + copyOfCurrentMappings := make(map[common.Hash]utils.StorageValueMetadata) + for k, v := range currentMappings { + copyOfCurrentMappings[k] = v + } + for k, v := range copyOfCurrentMappings { + currentMappings[hashKey(k)] = v + } + return currentMappings +} + +func hashKey(key common.Hash) common.Hash { + return crypto.Keccak256Hash(key.Bytes()) +} + func GetMapping(indexOnContract, key string) common.Hash { keyBytes := common.FromHex(key + indexOnContract) - encoded := crypto.Keccak256(keyBytes) - return common.BytesToHash(encoded) + return crypto.Keccak256Hash(keyBytes) } func GetNestedMapping(indexOnContract, primaryKey, secondaryKey string) common.Hash { primaryMappingIndex := crypto.Keccak256(common.FromHex(primaryKey + indexOnContract)) - secondaryMappingIndex := crypto.Keccak256(common.FromHex(secondaryKey), primaryMappingIndex) - return common.BytesToHash(secondaryMappingIndex) + return crypto.Keccak256Hash(common.FromHex(secondaryKey), primaryMappingIndex) } func GetIncrementedKey(original common.Hash, incrementBy int64) common.Hash { diff --git a/libraries/shared/storage/mappings_test.go b/libraries/shared/storage/mappings_test.go index 7114a39b..bc077f1f 100644 --- a/libraries/shared/storage/mappings_test.go +++ b/libraries/shared/storage/mappings_test.go @@ -5,9 +5,31 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/libraries/shared/storage" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" ) var _ = Describe("Mappings", func() { + Describe("AddHashedKeys", func() { + It("returns a copy of the map with an additional slot for the hashed version of every key", func() { + fakeMap := map[common.Hash]utils.StorageValueMetadata{} + fakeStorageKey := common.HexToHash("72c72de6b203d67cb6cd54fc93300109fcc6fd6eac88e390271a3d548794d800") + var fakeMappingKey utils.Key = "fakeKey" + fakeMetadata := utils.StorageValueMetadata{ + Name: "fakeName", + Keys: map[utils.Key]string{fakeMappingKey: "fakeValue"}, + Type: utils.Uint48, + } + fakeMap[fakeStorageKey] = fakeMetadata + + result := storage.AddHashedKeys(fakeMap) + + Expect(len(result)).To(Equal(2)) + expectedHashedStorageKey := common.HexToHash("2165edb4e1c37b99b60fa510d84f939dd35d5cd1d1c8f299d6456ea09df65a76") + Expect(fakeMap[fakeStorageKey]).To(Equal(fakeMetadata)) + Expect(fakeMap[expectedHashedStorageKey]).To(Equal(fakeMetadata)) + }) + }) + Describe("GetMapping", func() { It("returns the storage key for a mapping when passed the mapping's index on the contract and the desired value's key", func() { // ex. solidity: diff --git a/libraries/shared/storage/storage_queue.go b/libraries/shared/storage/storage_queue.go index 69531a90..c36b9c2d 100644 --- a/libraries/shared/storage/storage_queue.go +++ b/libraries/shared/storage/storage_queue.go @@ -22,9 +22,9 @@ import ( ) type IStorageQueue interface { - Add(row utils.StorageDiffRow) error + Add(diff utils.StorageDiff) error Delete(id int) error - GetAll() ([]utils.StorageDiffRow, error) + GetAll() ([]utils.StorageDiff, error) } type StorageQueue struct { @@ -35,11 +35,11 @@ func NewStorageQueue(db *postgres.DB) StorageQueue { return StorageQueue{db: db} } -func (queue StorageQueue) Add(row utils.StorageDiffRow) error { +func (queue StorageQueue) Add(diff utils.StorageDiff) error { _, err := queue.db.Exec(`INSERT INTO public.queued_storage (contract, block_hash, block_height, storage_key, storage_value) VALUES - ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING`, row.Contract.Bytes(), row.BlockHash.Bytes(), - row.BlockHeight, row.StorageKey.Bytes(), row.StorageValue.Bytes()) + ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING`, diff.HashedAddress.Bytes(), diff.BlockHash.Bytes(), + diff.BlockHeight, diff.StorageKey.Bytes(), diff.StorageValue.Bytes()) return err } @@ -48,8 +48,8 @@ func (queue StorageQueue) Delete(id int) error { return err } -func (queue StorageQueue) GetAll() ([]utils.StorageDiffRow, error) { - var result []utils.StorageDiffRow +func (queue StorageQueue) GetAll() ([]utils.StorageDiff, error) { + var result []utils.StorageDiff err := queue.db.Select(&result, `SELECT * FROM public.queued_storage`) return result, err } diff --git a/libraries/shared/storage/storage_queue_test.go b/libraries/shared/storage/storage_queue_test.go index 5d892ea8..43faca29 100644 --- a/libraries/shared/storage/storage_queue_test.go +++ b/libraries/shared/storage/storage_queue_test.go @@ -20,7 +20,6 @@ import ( "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" @@ -30,35 +29,36 @@ import ( var _ = Describe("Storage queue", func() { var ( db *postgres.DB - row utils.StorageDiffRow + diff utils.StorageDiff queue storage.IStorageQueue ) BeforeEach(func() { - row = utils.StorageDiffRow{ - Contract: common.HexToAddress("0x123456"), - BlockHash: common.HexToHash("0x678901"), - BlockHeight: 987, - StorageKey: common.HexToHash("0x654321"), - StorageValue: common.HexToHash("0x198765"), + fakeAddr := "0x123456" + diff = utils.StorageDiff{ + HashedAddress: utils.HexToKeccak256Hash(fakeAddr), + BlockHash: common.HexToHash("0x678901"), + BlockHeight: 987, + StorageKey: common.HexToHash("0x654321"), + StorageValue: common.HexToHash("0x198765"), } db = test_config.NewTestDB(test_config.NewTestNode()) test_config.CleanTestDB(db) queue = storage.NewStorageQueue(db) - addErr := queue.Add(row) + addErr := queue.Add(diff) Expect(addErr).NotTo(HaveOccurred()) }) Describe("Add", func() { - It("adds a storage row to the db", func() { - var result utils.StorageDiffRow + It("adds a storage diff to the db", func() { + var result utils.StorageDiff getErr := db.Get(&result, `SELECT contract, block_hash, block_height, storage_key, storage_value FROM public.queued_storage`) Expect(getErr).NotTo(HaveOccurred()) - Expect(result).To(Equal(row)) + Expect(result).To(Equal(diff)) }) - It("does not duplicate storage rows", func() { - addErr := queue.Add(row) + It("does not duplicate storage diffs", func() { + addErr := queue.Add(diff) Expect(addErr).NotTo(HaveOccurred()) var count int getErr := db.Get(&count, `SELECT count(*) FROM public.queued_storage`) @@ -67,12 +67,12 @@ var _ = Describe("Storage queue", func() { }) }) - It("deletes storage row from db", func() { - rows, getErr := queue.GetAll() + It("deletes storage diff from db", func() { + diffs, getErr := queue.GetAll() Expect(getErr).NotTo(HaveOccurred()) - Expect(len(rows)).To(Equal(1)) + Expect(len(diffs)).To(Equal(1)) - err := queue.Delete(rows[0].Id) + err := queue.Delete(diffs[0].Id) Expect(err).NotTo(HaveOccurred()) remainingRows, secondGetErr := queue.GetAll() @@ -80,33 +80,34 @@ var _ = Describe("Storage queue", func() { Expect(len(remainingRows)).To(BeZero()) }) - It("gets all storage rows from db", func() { - rowTwo := utils.StorageDiffRow{ - Contract: common.HexToAddress("0x123456"), - BlockHash: common.HexToHash("0x678902"), - BlockHeight: 988, - StorageKey: common.HexToHash("0x654322"), - StorageValue: common.HexToHash("0x198766"), + It("gets all storage diffs from db", func() { + fakeAddr := "0x234567" + diffTwo := utils.StorageDiff{ + HashedAddress: utils.HexToKeccak256Hash(fakeAddr), + BlockHash: common.HexToHash("0x678902"), + BlockHeight: 988, + StorageKey: common.HexToHash("0x654322"), + StorageValue: common.HexToHash("0x198766"), } - addErr := queue.Add(rowTwo) + addErr := queue.Add(diffTwo) Expect(addErr).NotTo(HaveOccurred()) - rows, err := queue.GetAll() + diffs, err := queue.GetAll() Expect(err).NotTo(HaveOccurred()) - Expect(len(rows)).To(Equal(2)) - Expect(rows[0]).NotTo(Equal(rows[1])) - Expect(rows[0].Id).NotTo(BeZero()) - Expect(rows[0].Contract).To(Or(Equal(row.Contract), Equal(rowTwo.Contract))) - Expect(rows[0].BlockHash).To(Or(Equal(row.BlockHash), Equal(rowTwo.BlockHash))) - Expect(rows[0].BlockHeight).To(Or(Equal(row.BlockHeight), Equal(rowTwo.BlockHeight))) - Expect(rows[0].StorageKey).To(Or(Equal(row.StorageKey), Equal(rowTwo.StorageKey))) - Expect(rows[0].StorageValue).To(Or(Equal(row.StorageValue), Equal(rowTwo.StorageValue))) - Expect(rows[1].Id).NotTo(BeZero()) - Expect(rows[1].Contract).To(Or(Equal(row.Contract), Equal(rowTwo.Contract))) - Expect(rows[1].BlockHash).To(Or(Equal(row.BlockHash), Equal(rowTwo.BlockHash))) - Expect(rows[1].BlockHeight).To(Or(Equal(row.BlockHeight), Equal(rowTwo.BlockHeight))) - Expect(rows[1].StorageKey).To(Or(Equal(row.StorageKey), Equal(rowTwo.StorageKey))) - Expect(rows[1].StorageValue).To(Or(Equal(row.StorageValue), Equal(rowTwo.StorageValue))) + Expect(len(diffs)).To(Equal(2)) + Expect(diffs[0]).NotTo(Equal(diffs[1])) + Expect(diffs[0].Id).NotTo(BeZero()) + Expect(diffs[0].HashedAddress).To(Or(Equal(diff.HashedAddress), Equal(diffTwo.HashedAddress))) + Expect(diffs[0].BlockHash).To(Or(Equal(diff.BlockHash), Equal(diffTwo.BlockHash))) + Expect(diffs[0].BlockHeight).To(Or(Equal(diff.BlockHeight), Equal(diffTwo.BlockHeight))) + Expect(diffs[0].StorageKey).To(Or(Equal(diff.StorageKey), Equal(diffTwo.StorageKey))) + Expect(diffs[0].StorageValue).To(Or(Equal(diff.StorageValue), Equal(diffTwo.StorageValue))) + Expect(diffs[1].Id).NotTo(BeZero()) + Expect(diffs[1].HashedAddress).To(Or(Equal(diff.HashedAddress), Equal(diffTwo.HashedAddress))) + Expect(diffs[1].BlockHash).To(Or(Equal(diff.BlockHash), Equal(diffTwo.BlockHash))) + Expect(diffs[1].BlockHeight).To(Or(Equal(diff.BlockHeight), Equal(diffTwo.BlockHeight))) + Expect(diffs[1].StorageKey).To(Or(Equal(diff.StorageKey), Equal(diffTwo.StorageKey))) + Expect(diffs[1].StorageValue).To(Or(Equal(diff.StorageValue), Equal(diffTwo.StorageValue))) }) }) diff --git a/libraries/shared/storage/utils/decoder.go b/libraries/shared/storage/utils/decoder.go index 56342cc1..94ea8b16 100644 --- a/libraries/shared/storage/utils/decoder.go +++ b/libraries/shared/storage/utils/decoder.go @@ -27,20 +27,20 @@ const ( bitsPerByte = 8 ) -func Decode(row StorageDiffRow, metadata StorageValueMetadata) (interface{}, error) { +func Decode(diff StorageDiff, metadata StorageValueMetadata) (interface{}, error) { switch metadata.Type { case Uint256: - return decodeInteger(row.StorageValue.Bytes()), nil + return decodeInteger(diff.StorageValue.Bytes()), nil case Uint48: - return decodeInteger(row.StorageValue.Bytes()), nil + return decodeInteger(diff.StorageValue.Bytes()), nil case Uint128: - return decodeInteger(row.StorageValue.Bytes()), nil + return decodeInteger(diff.StorageValue.Bytes()), nil case Address: - return decodeAddress(row.StorageValue.Bytes()), nil + return decodeAddress(diff.StorageValue.Bytes()), nil case Bytes32: - return row.StorageValue.Hex(), nil + return diff.StorageValue.Hex(), nil case PackedSlot: - return decodePackedSlot(row.StorageValue.Bytes(), metadata.PackedTypes), nil + return decodePackedSlot(diff.StorageValue.Bytes(), metadata.PackedTypes), nil default: panic(fmt.Sprintf("can't decode unknown type: %d", metadata.Type)) } diff --git a/libraries/shared/storage/utils/decoder_test.go b/libraries/shared/storage/utils/decoder_test.go index bbcb6a84..6650965c 100644 --- a/libraries/shared/storage/utils/decoder_test.go +++ b/libraries/shared/storage/utils/decoder_test.go @@ -29,10 +29,10 @@ import ( var _ = Describe("Storage decoder", func() { It("decodes uint256", func() { fakeInt := common.HexToHash("0000000000000000000000000000000000000000000000000000000000000539") - row := utils.StorageDiffRow{StorageValue: fakeInt} + diff := utils.StorageDiff{StorageValue: fakeInt} metadata := utils.StorageValueMetadata{Type: utils.Uint256} - result, err := utils.Decode(row, metadata) + result, err := utils.Decode(diff, metadata) Expect(err).NotTo(HaveOccurred()) Expect(result).To(Equal(big.NewInt(0).SetBytes(fakeInt.Bytes()).String())) @@ -40,10 +40,10 @@ var _ = Describe("Storage decoder", func() { It("decodes uint128", func() { fakeInt := common.HexToHash("0000000000000000000000000000000000000000000000000000000000011123") - row := utils.StorageDiffRow{StorageValue: fakeInt} + diff := utils.StorageDiff{StorageValue: fakeInt} metadata := utils.StorageValueMetadata{Type: utils.Uint128} - result, err := utils.Decode(row, metadata) + result, err := utils.Decode(diff, metadata) Expect(err).NotTo(HaveOccurred()) Expect(result).To(Equal(big.NewInt(0).SetBytes(fakeInt.Bytes()).String())) @@ -51,10 +51,10 @@ var _ = Describe("Storage decoder", func() { It("decodes uint48", func() { fakeInt := common.HexToHash("0000000000000000000000000000000000000000000000000000000000000123") - row := utils.StorageDiffRow{StorageValue: fakeInt} + diff := utils.StorageDiff{StorageValue: fakeInt} metadata := utils.StorageValueMetadata{Type: utils.Uint48} - result, err := utils.Decode(row, metadata) + result, err := utils.Decode(diff, metadata) Expect(err).NotTo(HaveOccurred()) Expect(result).To(Equal(big.NewInt(0).SetBytes(fakeInt.Bytes()).String())) @@ -62,10 +62,10 @@ var _ = Describe("Storage decoder", func() { It("decodes address", func() { fakeAddress := common.HexToAddress("0x12345") - row := utils.StorageDiffRow{StorageValue: fakeAddress.Hash()} + diff := utils.StorageDiff{StorageValue: fakeAddress.Hash()} metadata := utils.StorageValueMetadata{Type: utils.Address} - result, err := utils.Decode(row, metadata) + result, err := utils.Decode(diff, metadata) Expect(err).NotTo(HaveOccurred()) Expect(result).To(Equal(fakeAddress.Hex())) @@ -75,7 +75,7 @@ var _ = Describe("Storage decoder", func() { It("decodes uint48 items", func() { //this is a real storage data example packedStorage := common.HexToHash("000000000000000000000000000000000000000000000002a300000000002a30") - row := utils.StorageDiffRow{StorageValue: packedStorage} + diff := utils.StorageDiff{StorageValue: packedStorage} packedTypes := map[int]utils.ValueType{} packedTypes[0] = utils.Uint48 packedTypes[1] = utils.Uint48 @@ -85,7 +85,7 @@ var _ = Describe("Storage decoder", func() { PackedTypes: packedTypes, } - result, err := utils.Decode(row, metadata) + result, err := utils.Decode(diff, metadata) decodedValues := result.(map[int]string) Expect(err).NotTo(HaveOccurred()) @@ -99,7 +99,7 @@ var _ = Describe("Storage decoder", func() { packedStorageHex := "0000000A5D1AFFFFFFFFFFFE00000009F3C600000002A300000000002A30" packedStorage := common.HexToHash(packedStorageHex) - row := utils.StorageDiffRow{StorageValue: packedStorage} + diff := utils.StorageDiff{StorageValue: packedStorage} packedTypes := map[int]utils.ValueType{} packedTypes[0] = utils.Uint48 packedTypes[1] = utils.Uint48 @@ -112,7 +112,7 @@ var _ = Describe("Storage decoder", func() { PackedTypes: packedTypes, } - result, err := utils.Decode(row, metadata) + result, err := utils.Decode(diff, metadata) decodedValues := result.(map[int]string) Expect(err).NotTo(HaveOccurred()) @@ -129,7 +129,7 @@ var _ = Describe("Storage decoder", func() { packedStorageHex := "000000038D7EA4C67FF8E502B6730000" + "0000000000000000AB54A98CEB1F0AD2" packedStorage := common.HexToHash(packedStorageHex) - row := utils.StorageDiffRow{StorageValue: packedStorage} + diff := utils.StorageDiff{StorageValue: packedStorage} packedTypes := map[int]utils.ValueType{} packedTypes[0] = utils.Uint128 packedTypes[1] = utils.Uint128 @@ -139,7 +139,7 @@ var _ = Describe("Storage decoder", func() { PackedTypes: packedTypes, } - result, err := utils.Decode(row, metadata) + result, err := utils.Decode(diff, metadata) decodedValues := result.(map[int]string) Expect(err).NotTo(HaveOccurred()) @@ -151,7 +151,7 @@ var _ = Describe("Storage decoder", func() { //TODO: replace with real data when available addressHex := "0000000000000000000000000000000000012345" packedStorage := common.HexToHash("00000002a300" + "000000002a30" + addressHex) - row := utils.StorageDiffRow{StorageValue: packedStorage} + row := utils.StorageDiff{StorageValue: packedStorage} packedTypes := map[int]utils.ValueType{} packedTypes[0] = utils.Address packedTypes[1] = utils.Uint48 diff --git a/libraries/shared/storage/utils/diff.go b/libraries/shared/storage/utils/diff.go new file mode 100644 index 00000000..a58dfab6 --- /dev/null +++ b/libraries/shared/storage/utils/diff.go @@ -0,0 +1,73 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package utils + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff" + "strconv" +) + +const ExpectedRowLength = 5 + +type StorageDiff struct { + Id int + HashedAddress common.Hash `db:"contract"` + BlockHash common.Hash `db:"block_hash"` + BlockHeight int `db:"block_height"` + StorageKey common.Hash `db:"storage_key"` + StorageValue common.Hash `db:"storage_value"` +} + +func FromParityCsvRow(csvRow []string) (StorageDiff, error) { + if len(csvRow) != ExpectedRowLength { + return StorageDiff{}, ErrRowMalformed{Length: len(csvRow)} + } + height, err := strconv.Atoi(csvRow[2]) + if err != nil { + return StorageDiff{}, err + } + return StorageDiff{ + HashedAddress: HexToKeccak256Hash(csvRow[0]), + BlockHash: common.HexToHash(csvRow[1]), + BlockHeight: height, + StorageKey: common.HexToHash(csvRow[3]), + StorageValue: common.HexToHash(csvRow[4]), + }, nil +} + +func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.StateDiff, storage statediff.StorageDiff) (StorageDiff, error) { + var decodedValue []byte + err := rlp.DecodeBytes(storage.Value, &decodedValue) + if err != nil { + return StorageDiff{}, err + } + + return StorageDiff{ + HashedAddress: common.BytesToHash(account.Key), + BlockHash: stateDiff.BlockHash, + BlockHeight: int(stateDiff.BlockNumber.Int64()), + StorageKey: common.BytesToHash(storage.Key), + StorageValue: common.BytesToHash(decodedValue), + }, nil +} + +func HexToKeccak256Hash(hex string) common.Hash { + return crypto.Keccak256Hash(common.FromHex(hex)) +} diff --git a/libraries/shared/storage/utils/diff_test.go b/libraries/shared/storage/utils/diff_test.go new file mode 100644 index 00000000..2de496ac --- /dev/null +++ b/libraries/shared/storage/utils/diff_test.go @@ -0,0 +1,121 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package utils_test + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "math/big" + "math/rand" +) + +var _ = Describe("Storage row parsing", func() { + Describe("FromParityCsvRow", func() { + It("converts an array of strings to a row struct", func() { + contract := "0x123" + blockHash := "0x456" + blockHeight := "789" + storageKey := "0x987" + storageValue := "0x654" + data := []string{contract, blockHash, blockHeight, storageKey, storageValue} + + result, err := utils.FromParityCsvRow(data) + + Expect(err).NotTo(HaveOccurred()) + expectedKeccakOfContractAddress := utils.HexToKeccak256Hash(contract) + Expect(result.HashedAddress).To(Equal(expectedKeccakOfContractAddress)) + Expect(result.BlockHash).To(Equal(common.HexToHash(blockHash))) + Expect(result.BlockHeight).To(Equal(789)) + Expect(result.StorageKey).To(Equal(common.HexToHash(storageKey))) + Expect(result.StorageValue).To(Equal(common.HexToHash(storageValue))) + }) + + It("returns an error if row is missing data", func() { + _, err := utils.FromParityCsvRow([]string{"0x123"}) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(utils.ErrRowMalformed{Length: 1})) + }) + + It("returns error if block height malformed", func() { + _, err := utils.FromParityCsvRow([]string{"", "", "", "", ""}) + + Expect(err).To(HaveOccurred()) + }) + }) + + Describe("FromGethStateDiff", func() { + var ( + accountDiff = statediff.AccountDiff{Key: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}} + stateDiff = &statediff.StateDiff{ + BlockNumber: big.NewInt(rand.Int63()), + BlockHash: fakes.FakeHash, + } + ) + + It("adds relevant fields to diff", func() { + storageValueBytes := []byte{3} + storageValueRlp, encodeErr := rlp.EncodeToBytes(storageValueBytes) + Expect(encodeErr).NotTo(HaveOccurred()) + + storageDiff := statediff.StorageDiff{ + Key: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1}, + Value: storageValueRlp, + } + + result, err := utils.FromGethStateDiff(accountDiff, stateDiff, storageDiff) + Expect(err).NotTo(HaveOccurred()) + + expectedAddress := common.BytesToHash(accountDiff.Key) + Expect(result.HashedAddress).To(Equal(expectedAddress)) + Expect(result.BlockHash).To(Equal(fakes.FakeHash)) + expectedBlockHeight := int(stateDiff.BlockNumber.Int64()) + Expect(result.BlockHeight).To(Equal(expectedBlockHeight)) + expectedStorageKey := common.BytesToHash(storageDiff.Key) + Expect(result.StorageKey).To(Equal(expectedStorageKey)) + expectedStorageValue := common.BytesToHash(storageValueBytes) + Expect(result.StorageValue).To(Equal(expectedStorageValue)) + }) + + It("handles decoding large storage values from their RLP", func() { + storageValueBytes := []byte{1, 2, 3, 4, 5, 0, 9, 8, 7, 6} + storageValueRlp, encodeErr := rlp.EncodeToBytes(storageValueBytes) + Expect(encodeErr).NotTo(HaveOccurred()) + + storageDiff := statediff.StorageDiff{ + Key: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1}, + Value: storageValueRlp, + } + + result, err := utils.FromGethStateDiff(accountDiff, stateDiff, storageDiff) + Expect(err).NotTo(HaveOccurred()) + Expect(result.StorageValue).To(Equal(common.BytesToHash(storageValueBytes))) + }) + + It("returns an err if decoding the storage value Rlp fails", func() { + _, err := utils.FromGethStateDiff(accountDiff, stateDiff, test_data.StorageWithBadValue) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("rlp: input contains more than one value")) + }) + }) +}) diff --git a/libraries/shared/storage/utils/row.go b/libraries/shared/storage/utils/row.go deleted file mode 100644 index 70606fa5..00000000 --- a/libraries/shared/storage/utils/row.go +++ /dev/null @@ -1,51 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package utils - -import ( - "strconv" - - "github.com/ethereum/go-ethereum/common" -) - -const ExpectedRowLength = 5 - -type StorageDiffRow struct { - Id int - Contract common.Address - BlockHash common.Hash `db:"block_hash"` - BlockHeight int `db:"block_height"` - StorageKey common.Hash `db:"storage_key"` - StorageValue common.Hash `db:"storage_value"` -} - -func FromStrings(csvRow []string) (StorageDiffRow, error) { - if len(csvRow) != ExpectedRowLength { - return StorageDiffRow{}, ErrRowMalformed{Length: len(csvRow)} - } - height, err := strconv.Atoi(csvRow[2]) - if err != nil { - return StorageDiffRow{}, err - } - return StorageDiffRow{ - Contract: common.HexToAddress(csvRow[0]), - BlockHash: common.HexToHash(csvRow[1]), - BlockHeight: height, - StorageKey: common.HexToHash(csvRow[3]), - StorageValue: common.HexToHash(csvRow[4]), - }, nil -} diff --git a/libraries/shared/storage/utils/row_test.go b/libraries/shared/storage/utils/row_test.go deleted file mode 100644 index ffd49fbf..00000000 --- a/libraries/shared/storage/utils/row_test.go +++ /dev/null @@ -1,58 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package utils_test - -import ( - "github.com/ethereum/go-ethereum/common" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" -) - -var _ = Describe("Storage row parsing", func() { - It("converts an array of strings to a row struct", func() { - contract := "0x123" - blockHash := "0x456" - blockHeight := "789" - storageKey := "0x987" - storageValue := "0x654" - data := []string{contract, blockHash, blockHeight, storageKey, storageValue} - - result, err := utils.FromStrings(data) - - Expect(err).NotTo(HaveOccurred()) - Expect(result.Contract).To(Equal(common.HexToAddress(contract))) - Expect(result.BlockHash).To(Equal(common.HexToHash(blockHash))) - Expect(result.BlockHeight).To(Equal(789)) - Expect(result.StorageKey).To(Equal(common.HexToHash(storageKey))) - Expect(result.StorageValue).To(Equal(common.HexToHash(storageValue))) - }) - - It("returns an error if row is missing data", func() { - _, err := utils.FromStrings([]string{"0x123"}) - - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(utils.ErrRowMalformed{Length: 1})) - }) - - It("returns error if block height malformed", func() { - _, err := utils.FromStrings([]string{"", "", "", "", ""}) - - Expect(err).To(HaveOccurred()) - }) -}) diff --git a/libraries/shared/streamer/statediff_streamer.go b/libraries/shared/streamer/statediff_streamer.go new file mode 100644 index 00000000..8cda750f --- /dev/null +++ b/libraries/shared/streamer/statediff_streamer.go @@ -0,0 +1,41 @@ +// Copyright 2019 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package streamer + +import ( + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" + "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/pkg/core" +) + +type Streamer interface { + Stream(chan statediff.Payload) (*rpc.ClientSubscription, error) +} + +type StateDiffStreamer struct { + client core.RpcClient +} + +func (streamer *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { + logrus.Info("streaming diffs from geth") + return streamer.client.Subscribe("statediff", payloadChan, "stream") +} + +func NewStateDiffStreamer(client core.RpcClient) StateDiffStreamer { + return StateDiffStreamer{ + client: client, + } +} diff --git a/libraries/shared/streamer/statediff_streamer_test.go b/libraries/shared/streamer/statediff_streamer_test.go new file mode 100644 index 00000000..59590c84 --- /dev/null +++ b/libraries/shared/streamer/statediff_streamer_test.go @@ -0,0 +1,35 @@ +// Copyright 2019 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package streamer_test + +import ( + "github.com/ethereum/go-ethereum/statediff" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/fakes" +) + +var _ = Describe("StateDiff Streamer", func() { + It("subscribes to the geth statediff service", func() { + client := &fakes.MockRpcClient{} + streamer := streamer.NewStateDiffStreamer(client) + payloadChan := make(chan statediff.Payload) + _, err := streamer.Stream(payloadChan) + Expect(err).NotTo(HaveOccurred()) + + client.AssertSubscribeCalledWith("statediff", payloadChan, []interface{}{"stream"}) + }) +}) diff --git a/libraries/shared/streamer/streamer_suite_test.go b/libraries/shared/streamer/streamer_suite_test.go new file mode 100644 index 00000000..fec1bc6c --- /dev/null +++ b/libraries/shared/streamer/streamer_suite_test.go @@ -0,0 +1,19 @@ +package streamer_test + +import ( + "io/ioutil" + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" +) + +func TestStreamer(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Streamer Suite") +} + +var _ = BeforeSuite(func() { + logrus.SetOutput(ioutil.Discard) +}) diff --git a/libraries/shared/test_data/statediff.go b/libraries/shared/test_data/statediff.go new file mode 100644 index 00000000..3e0a2219 --- /dev/null +++ b/libraries/shared/test_data/statediff.go @@ -0,0 +1,137 @@ +// Copyright 2018 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test_data + +import ( + "errors" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff" + "math/big" + "math/rand" +) + +var ( + BlockNumber = big.NewInt(rand.Int63()) + BlockHash = "0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73" + CodeHash = common.Hex2Bytes("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470") + NewNonceValue = rand.Uint64() + NewBalanceValue = rand.Int63() + ContractRoot = common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") + StoragePath = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470").Bytes() + StorageKey = common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes() + SmallStorageValue = common.Hex2Bytes("03") + SmallStorageValueRlp, _ = rlp.EncodeToBytes(SmallStorageValue) + storageWithSmallValue = []statediff.StorageDiff{{ + Key: StorageKey, + Value: SmallStorageValueRlp, + Path: StoragePath, + Proof: [][]byte{}, + }} + LargeStorageValue = common.Hex2Bytes("00191b53778c567b14b50ba0000") + LargeStorageValueRlp, rlpErr = rlp.EncodeToBytes(LargeStorageValue) + storageWithLargeValue = []statediff.StorageDiff{{ + Key: StorageKey, + Value: LargeStorageValueRlp, + Path: StoragePath, + Proof: [][]byte{}, + }} + EmptyStorage = make([]statediff.StorageDiff, 0) + StorageWithBadValue = statediff.StorageDiff{ + Key: StorageKey, + Value: []byte{0, 1, 2}, + // this storage value will fail to be decoded as an RLP with the following error message: + // "input contains more than one value" + } + contractAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") + ContractLeafKey = crypto.Keccak256Hash(contractAddress[:]) + anotherContractAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593") + AnotherContractLeafKey = crypto.Keccak256Hash(anotherContractAddress[:]) + + testAccount = state.Account{ + Nonce: NewNonceValue, + Balance: big.NewInt(NewBalanceValue), + Root: ContractRoot, + CodeHash: CodeHash, + } + valueBytes, _ = rlp.EncodeToBytes(testAccount) + CreatedAccountDiffs = []statediff.AccountDiff{ + { + Key: ContractLeafKey.Bytes(), + Value: valueBytes, + Storage: storageWithSmallValue, + }, + } + + UpdatedAccountDiffs = []statediff.AccountDiff{{ + Key: AnotherContractLeafKey.Bytes(), + Value: valueBytes, + Storage: storageWithLargeValue, + }} + + DeletedAccountDiffs = []statediff.AccountDiff{{ + Key: AnotherContractLeafKey.Bytes(), + Value: valueBytes, + Storage: storageWithSmallValue, + }} + + MockStateDiff = statediff.StateDiff{ + BlockNumber: BlockNumber, + BlockHash: common.HexToHash(BlockHash), + CreatedAccounts: CreatedAccountDiffs, + DeletedAccounts: DeletedAccountDiffs, + UpdatedAccounts: UpdatedAccountDiffs, + } + MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff) + + mockTransaction1 = types.NewTransaction(0, common.HexToAddress("0x0"), big.NewInt(1000), 50, big.NewInt(100), nil) + mockTransaction2 = types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(2000), 100, big.NewInt(200), nil) + MockTransactions = types.Transactions{mockTransaction1, mockTransaction2} + + mockReceipt1 = types.NewReceipt(common.HexToHash("0x0").Bytes(), false, 50) + mockReceipt2 = types.NewReceipt(common.HexToHash("0x1").Bytes(), false, 100) + MockReceipts = types.Receipts{mockReceipt1, mockReceipt2} + + MockHeader = types.Header{ + Time: 0, + Number: BlockNumber, + Root: common.HexToHash("0x0"), + TxHash: common.HexToHash("0x0"), + ReceiptHash: common.HexToHash("0x0"), + } + MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts) + MockBlockRlp, _ = rlp.EncodeToBytes(MockBlock) + + MockStatediffPayload = statediff.Payload{ + BlockRlp: MockBlockRlp, + StateDiffRlp: MockStateDiffBytes, + Err: nil, + } + + EmptyStatediffPayload = statediff.Payload{ + BlockRlp: []byte{}, + StateDiffRlp: []byte{}, + Err: nil, + } + + ErrStatediffPayload = statediff.Payload{ + BlockRlp: []byte{}, + StateDiffRlp: []byte{}, + Err: errors.New("mock error"), + } +) diff --git a/libraries/shared/transformer/storage_transformer.go b/libraries/shared/transformer/storage_transformer.go index 3db5c0c1..698ef841 100644 --- a/libraries/shared/transformer/storage_transformer.go +++ b/libraries/shared/transformer/storage_transformer.go @@ -18,14 +18,13 @@ package transformer import ( "github.com/ethereum/go-ethereum/common" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) type StorageTransformer interface { - Execute(row utils.StorageDiffRow) error - ContractAddress() common.Address + Execute(diff utils.StorageDiff) error + KeccakContractAddress() common.Hash } type StorageTransformerInitializer func(db *postgres.DB) StorageTransformer diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index 7b2c5362..7768beb4 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -18,69 +18,76 @@ package watcher import ( "fmt" - "reflect" - "time" - "github.com/ethereum/go-ethereum/common" "github.com/sirupsen/logrus" - "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "time" ) +type IStorageWatcher interface { + AddTransformers(initializers []transformer.StorageTransformerInitializer) + Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration) +} + type StorageWatcher struct { - db *postgres.DB - StorageFetcher fetcher.IStorageFetcher - Queue storage.IStorageQueue - Transformers map[common.Address]transformer.StorageTransformer + db *postgres.DB + StorageFetcher fetcher.IStorageFetcher + Queue storage.IStorageQueue + KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer } func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher { - transformers := make(map[common.Address]transformer.StorageTransformer) queue := storage.NewStorageQueue(db) + transformers := make(map[common.Hash]transformer.StorageTransformer) return StorageWatcher{ - db: db, - StorageFetcher: fetcher, - Queue: queue, - Transformers: transformers, + db: db, + StorageFetcher: fetcher, + Queue: queue, + KeccakAddressTransformers: transformers, } } func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { for _, initializer := range initializers { storageTransformer := initializer(storageWatcher.db) - storageWatcher.Transformers[storageTransformer.ContractAddress()] = storageTransformer + storageWatcher.KeccakAddressTransformers[storageTransformer.KeccakContractAddress()] = storageTransformer } } -func (storageWatcher StorageWatcher) Execute(rows chan utils.StorageDiffRow, errs chan error, queueRecheckInterval time.Duration) { +func (storageWatcher StorageWatcher) Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration) { ticker := time.NewTicker(queueRecheckInterval) - go storageWatcher.StorageFetcher.FetchStorageDiffs(rows, errs) + go storageWatcher.StorageFetcher.FetchStorageDiffs(diffsChan, errsChan) for { select { - case fetchErr := <-errs: + case fetchErr := <-errsChan: logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr)) - case row := <-rows: - storageWatcher.processRow(row) + case diff := <-diffsChan: + storageWatcher.processRow(diff) case <-ticker.C: storageWatcher.processQueue() } } } -func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) { - storageTransformer, ok := storageWatcher.Transformers[row.Contract] +func (storageWatcher StorageWatcher) getTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) { + storageTransformer, ok := storageWatcher.KeccakAddressTransformers[diff.HashedAddress] + return storageTransformer, ok +} + +func (storageWatcher StorageWatcher) processRow(diff utils.StorageDiff) { + storageTransformer, ok := storageWatcher.getTransformer(diff) if !ok { - // ignore rows from unwatched contracts + logrus.Debug("ignoring a diff from an unwatched contract") return } - executeErr := storageTransformer.Execute(row) + executeErr := storageTransformer.Execute(diff) if executeErr != nil { logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr)) - queueErr := storageWatcher.Queue.Add(row) + queueErr := storageWatcher.Queue.Add(diff) if queueErr != nil { logrus.Warn(fmt.Sprintf("error queueing storage diff: %s", queueErr)) } @@ -88,20 +95,20 @@ func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) { } func (storageWatcher StorageWatcher) processQueue() { - rows, fetchErr := storageWatcher.Queue.GetAll() + diffs, fetchErr := storageWatcher.Queue.GetAll() if fetchErr != nil { logrus.Warn(fmt.Sprintf("error getting queued storage: %s", fetchErr)) } - for _, row := range rows { - storageTransformer, ok := storageWatcher.Transformers[row.Contract] + for _, diff := range diffs { + storageTransformer, ok := storageWatcher.getTransformer(diff) if !ok { - // delete row from queue if address no longer watched - storageWatcher.deleteRow(row.Id) + // delete diff from queue if address no longer watched + storageWatcher.deleteRow(diff.Id) continue } - executeErr := storageTransformer.Execute(row) + executeErr := storageTransformer.Execute(diff) if executeErr == nil { - storageWatcher.deleteRow(row.Id) + storageWatcher.deleteRow(diff.Id) } } } @@ -109,10 +116,6 @@ func (storageWatcher StorageWatcher) processQueue() { func (storageWatcher StorageWatcher) deleteRow(id int) { deleteErr := storageWatcher.Queue.Delete(id) if deleteErr != nil { - logrus.Warn(fmt.Sprintf("error deleting persisted row from queue: %s", deleteErr)) + logrus.Warn(fmt.Sprintf("error deleting persisted diff from queue: %s", deleteErr)) } } - -func isKeyNotFound(executeErr error) bool { - return reflect.TypeOf(executeErr) == reflect.TypeOf(utils.ErrStorageKeyNotFound{}) -} diff --git a/libraries/shared/watcher/storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go index 6c12358d..f3fe0afa 100644 --- a/libraries/shared/watcher/storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -17,59 +17,60 @@ package watcher_test import ( - "io/ioutil" - "os" - "time" - "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" - "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/test_config" + "io/ioutil" + "os" + "time" ) var _ = Describe("Storage Watcher", func() { - It("adds transformers", func() { - fakeAddress := common.HexToAddress("0x12345") - fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress} - w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) + Describe("AddTransformer", func() { + It("adds transformers", func() { + fakeHashedAddress := utils.HexToKeccak256Hash("0x12345") + fakeTransformer := &mocks.MockStorageTransformer{KeccakOfAddress: fakeHashedAddress} + w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) - w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) + w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer)) + Expect(w.KeccakAddressTransformers[fakeHashedAddress]).To(Equal(fakeTransformer)) + }) }) - Describe("executing watcher", func() { + Describe("Execute", func() { var ( errs chan error mockFetcher *mocks.MockStorageFetcher mockQueue *mocks.MockStorageQueue mockTransformer *mocks.MockStorageTransformer - row utils.StorageDiffRow - rows chan utils.StorageDiffRow + csvDiff utils.StorageDiff + diffs chan utils.StorageDiff storageWatcher watcher.StorageWatcher + hashedAddress common.Hash ) BeforeEach(func() { errs = make(chan error) - rows = make(chan utils.StorageDiffRow) - address := common.HexToAddress("0x0123456789abcdef") + diffs = make(chan utils.StorageDiff) + hashedAddress = utils.HexToKeccak256Hash("0x0123456789abcdef") mockFetcher = mocks.NewMockStorageFetcher() mockQueue = &mocks.MockStorageQueue{} - mockTransformer = &mocks.MockStorageTransformer{Address: address} - row = utils.StorageDiffRow{ - Id: 1337, - Contract: address, - BlockHash: common.HexToHash("0xfedcba9876543210"), - BlockHeight: 0, - StorageKey: common.HexToHash("0xabcdef1234567890"), - StorageValue: common.HexToHash("0x9876543210abcdef"), + mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} + csvDiff = utils.StorageDiff{ + Id: 1337, + HashedAddress: hashedAddress, + BlockHash: common.HexToHash("0xfedcba9876543210"), + BlockHeight: 0, + StorageKey: common.HexToHash("0xabcdef1234567890"), + StorageValue: common.HexToHash("0x9876543210abcdef"), } }) @@ -83,7 +84,7 @@ var _ = Describe("Storage Watcher", func() { defer os.Remove(tempFile.Name()) logrus.SetOutput(tempFile) - go storageWatcher.Execute(rows, errs, time.Hour) + go storageWatcher.Execute(diffs, errs, time.Hour) Eventually(func() (string, error) { logContent, err := ioutil.ReadFile(tempFile.Name()) @@ -92,39 +93,39 @@ var _ = Describe("Storage Watcher", func() { close(done) }) - Describe("transforming new storage diffs", func() { + Describe("transforming new storage diffs from csv", func() { BeforeEach(func() { - mockFetcher.RowsToReturn = []utils.StorageDiffRow{row} + mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff} storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) }) - It("executes transformer for recognized storage row", func(done Done) { - go storageWatcher.Execute(rows, errs, time.Hour) + It("executes transformer for recognized storage diff", func(done Done) { + go storageWatcher.Execute(diffs, errs, time.Hour) - Eventually(func() utils.StorageDiffRow { - return mockTransformer.PassedRow - }).Should(Equal(row)) + Eventually(func() utils.StorageDiff { + return mockTransformer.PassedDiff + }).Should(Equal(csvDiff)) close(done) }) - It("queues row for later processing if transformer execution fails", func(done Done) { + It("queues diff for later processing if transformer execution fails", func(done Done) { mockTransformer.ExecuteErr = fakes.FakeError - go storageWatcher.Execute(rows, errs, time.Hour) + go storageWatcher.Execute(diffs, errs, time.Hour) Expect(<-errs).To(BeNil()) Eventually(func() bool { return mockQueue.AddCalled }).Should(BeTrue()) - Eventually(func() utils.StorageDiffRow { - return mockQueue.AddPassedRow - }).Should(Equal(row)) + Eventually(func() utils.StorageDiff { + return mockQueue.AddPassedDiff + }).Should(Equal(csvDiff)) close(done) }) - It("logs error if queueing row fails", func(done Done) { + It("logs error if queueing diff fails", func(done Done) { mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} mockQueue.AddError = fakes.FakeError tempFile, fileErr := ioutil.TempFile("", "log") @@ -132,7 +133,7 @@ var _ = Describe("Storage Watcher", func() { defer os.Remove(tempFile.Name()) logrus.SetOutput(tempFile) - go storageWatcher.Execute(rows, errs, time.Hour) + go storageWatcher.Execute(diffs, errs, time.Hour) Eventually(func() bool { return mockQueue.AddCalled @@ -147,20 +148,38 @@ var _ = Describe("Storage Watcher", func() { Describe("transforming queued storage diffs", func() { BeforeEach(func() { - mockQueue.RowsToReturn = []utils.StorageDiffRow{row} + mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff} storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) }) - It("logs error if getting queued storage fails", func(done Done) { - mockQueue.GetAllErr = fakes.FakeError + It("executes transformer for storage diff", func(done Done) { + go storageWatcher.Execute(diffs, errs, time.Nanosecond) + + Eventually(func() utils.StorageDiff { + return mockTransformer.PassedDiff + }).Should(Equal(csvDiff)) + close(done) + }) + + It("deletes diff from queue if transformer execution successful", func(done Done) { + go storageWatcher.Execute(diffs, errs, time.Nanosecond) + + Eventually(func() int { + return mockQueue.DeletePassedId + }).Should(Equal(csvDiff.Id)) + close(done) + }) + + It("logs error if deleting persisted diff fails", func(done Done) { + mockQueue.DeleteErr = fakes.FakeError tempFile, fileErr := ioutil.TempFile("", "log") Expect(fileErr).NotTo(HaveOccurred()) defer os.Remove(tempFile.Name()) logrus.SetOutput(tempFile) - go storageWatcher.Execute(rows, errs, time.Nanosecond) + go storageWatcher.Execute(diffs, errs, time.Nanosecond) Eventually(func() (string, error) { logContent, err := ioutil.ReadFile(tempFile.Name()) @@ -169,68 +188,34 @@ var _ = Describe("Storage Watcher", func() { close(done) }) - It("executes transformer for storage row", func(done Done) { - go storageWatcher.Execute(rows, errs, time.Nanosecond) + It("deletes obsolete diff from queue if contract not recognized", func(done Done) { + obsoleteDiff := utils.StorageDiff{ + Id: csvDiff.Id + 1, + HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"), + } + mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} - Eventually(func() utils.StorageDiffRow { - return mockTransformer.PassedRow - }).Should(Equal(row)) - close(done) - }) - - It("deletes row from queue if transformer execution successful", func(done Done) { - go storageWatcher.Execute(rows, errs, time.Nanosecond) + go storageWatcher.Execute(diffs, errs, time.Nanosecond) Eventually(func() int { return mockQueue.DeletePassedId - }).Should(Equal(row.Id)) + }).Should(Equal(obsoleteDiff.Id)) close(done) }) - It("logs error if deleting persisted row fails", func(done Done) { + It("logs error if deleting obsolete diff fails", func(done Done) { + obsoleteDiff := utils.StorageDiff{ + Id: csvDiff.Id + 1, + HashedAddress: utils.HexToKeccak256Hash("0xfedcba9876543210"), + } + mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} mockQueue.DeleteErr = fakes.FakeError tempFile, fileErr := ioutil.TempFile("", "log") Expect(fileErr).NotTo(HaveOccurred()) defer os.Remove(tempFile.Name()) logrus.SetOutput(tempFile) - go storageWatcher.Execute(rows, errs, time.Nanosecond) - - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) - - It("deletes obsolete row from queue if contract not recognized", func(done Done) { - obsoleteRow := utils.StorageDiffRow{ - Id: row.Id + 1, - Contract: common.HexToAddress("0xfedcba9876543210"), - } - mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} - - go storageWatcher.Execute(rows, errs, time.Nanosecond) - - Eventually(func() int { - return mockQueue.DeletePassedId - }).Should(Equal(obsoleteRow.Id)) - close(done) - }) - - It("logs error if deleting obsolete row fails", func(done Done) { - obsoleteRow := utils.StorageDiffRow{ - Id: row.Id + 1, - Contract: common.HexToAddress("0xfedcba9876543210"), - } - mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow} - mockQueue.DeleteErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(rows, errs, time.Nanosecond) + go storageWatcher.Execute(diffs, errs, time.Nanosecond) Eventually(func() (string, error) { logContent, err := ioutil.ReadFile(tempFile.Name()) @@ -239,6 +224,5 @@ var _ = Describe("Storage Watcher", func() { close(done) }) }) - }) }) diff --git a/pkg/contract_watcher/shared/types/event.go b/pkg/contract_watcher/shared/types/event.go index b70c2a0c..dfca596d 100644 --- a/pkg/contract_watcher/shared/types/event.go +++ b/pkg/contract_watcher/shared/types/event.go @@ -92,5 +92,5 @@ func (e Event) Sig() common.Hash { types[i] = input.Type.String() } - return common.BytesToHash(crypto.Keccak256([]byte(fmt.Sprintf("%v(%v)", e.Name, strings.Join(types, ","))))) + return crypto.Keccak256Hash([]byte(fmt.Sprintf("%v(%v)", e.Name, strings.Join(types, ",")))) } diff --git a/pkg/contract_watcher/shared/types/method.go b/pkg/contract_watcher/shared/types/method.go index 795b5b7f..7b962b17 100644 --- a/pkg/contract_watcher/shared/types/method.go +++ b/pkg/contract_watcher/shared/types/method.go @@ -107,5 +107,5 @@ func (m Method) Sig() common.Hash { i++ } - return common.BytesToHash(crypto.Keccak256([]byte(fmt.Sprintf("%v(%v)", m.Name, strings.Join(types, ","))))) + return crypto.Keccak256Hash([]byte(fmt.Sprintf("%v(%v)", m.Name, strings.Join(types, ",")))) } diff --git a/pkg/core/rpc_client.go b/pkg/core/rpc_client.go index 1f52b27a..a89ca230 100644 --- a/pkg/core/rpc_client.go +++ b/pkg/core/rpc_client.go @@ -18,6 +18,7 @@ package core import ( "context" + "github.com/ethereum/go-ethereum/rpc" "github.com/vulcanize/vulcanizedb/pkg/geth/client" ) @@ -27,4 +28,5 @@ type RpcClient interface { BatchCall(batch []client.BatchElem) error IpcPath() string SupportedModules() (map[string]string, error) + Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) } diff --git a/pkg/fakes/mock_rpc_client.go b/pkg/fakes/mock_rpc_client.go index fcf224cc..9acfdbe8 100644 --- a/pkg/fakes/mock_rpc_client.go +++ b/pkg/fakes/mock_rpc_client.go @@ -18,10 +18,13 @@ package fakes import ( "context" + "errors" + "github.com/ethereum/go-ethereum/statediff" "math/big" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/core" @@ -29,18 +32,44 @@ import ( ) type MockRpcClient struct { - callContextErr error - ipcPath string - nodeType core.NodeType - passedContext context.Context - passedMethod string - passedResult interface{} - passedBatch []client.BatchElem - lengthOfBatch int - returnPOAHeader core.POAHeader - returnPOAHeaders []core.POAHeader - returnPOWHeaders []*types.Header - supportedModules map[string]string + callContextErr error + ipcPath string + nodeType core.NodeType + passedContext context.Context + passedMethod string + passedResult interface{} + passedBatch []client.BatchElem + passedNamespace string + passedPayloadChan chan statediff.Payload + passedSubscribeArgs []interface{} + lengthOfBatch int + returnPOAHeader core.POAHeader + returnPOAHeaders []core.POAHeader + returnPOWHeaders []*types.Header + supportedModules map[string]string +} + +func (client *MockRpcClient) Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) { + client.passedNamespace = namespace + + passedPayloadChan, ok := payloadChan.(chan statediff.Payload) + if !ok { + return nil, errors.New("passed in channel is not of the correct type") + } + client.passedPayloadChan = passedPayloadChan + + for _, arg := range args { + client.passedSubscribeArgs = append(client.passedSubscribeArgs, arg) + } + + subscription := rpc.ClientSubscription{} + return &subscription, nil +} + +func (client *MockRpcClient) AssertSubscribeCalledWith(namespace string, payloadChan chan statediff.Payload, args []interface{}) { + Expect(client.passedNamespace).To(Equal(namespace)) + Expect(client.passedPayloadChan).To(Equal(payloadChan)) + Expect(client.passedSubscribeArgs).To(Equal(args)) } func NewMockRpcClient() *MockRpcClient { diff --git a/pkg/geth/client/rpc_client.go b/pkg/geth/client/rpc_client.go index b80a746d..d6f6a738 100644 --- a/pkg/geth/client/rpc_client.go +++ b/pkg/geth/client/rpc_client.go @@ -18,6 +18,9 @@ package client import ( "context" + "errors" + "reflect" + "github.com/ethereum/go-ethereum/rpc" ) @@ -73,3 +76,16 @@ func (client RpcClient) BatchCall(batch []BatchElem) error { } return client.client.BatchCall(rpcBatch) } + +// Subscribe subscribes to an rpc "namespace_subscribe" subscription with the given channel +// The first argument needs to be the method we wish to invoke +func (client RpcClient) Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) { + chanVal := reflect.ValueOf(payloadChan) + if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { + return nil, errors.New("second argument to Subscribe must be a writable channel") + } + if chanVal.IsNil() { + return nil, errors.New("channel given to Subscribe must not be nil") + } + return client.client.Subscribe(context.Background(), namespace, payloadChan, args...) +}