From b1bb646ad5199760d16a433a6bccdc0bb199809c Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Thu, 6 Jun 2019 12:55:59 -0500 Subject: [PATCH] goimports + streamSubscribe command for raw access to the seed node data --- cmd/root.go | 1 + cmd/streamSubscribe.go | 214 ++++++++++++++++++ cmd/syncAndPublish.go | 2 +- cmd/syncPublishScreenAndServe.go | 15 +- cmd/test2.go | 144 ------------ libraries/shared/streamer/streamer.go | 5 +- pkg/config/subscription.go | 43 ++++ pkg/config/subscription_test.go | 1 + pkg/ipfs/api.go | 4 +- pkg/ipfs/converter_test.go | 12 +- .../{test_helpers => helpers}/mocks/api.go | 0 .../mocks/converter.go | 0 .../mocks/publisher.go | 0 pkg/ipfs/helpers/mocks/repository.go | 31 +++ .../mocks/screener.go | 0 .../mocks/streamer.go | 0 .../{test_helpers => helpers}/test_data.go | 2 +- pkg/ipfs/publisher_test.go | 12 +- pkg/ipfs/repository.go | 5 + pkg/ipfs/repository_test.go | 8 +- pkg/ipfs/retreiver.go | 15 +- pkg/ipfs/screener.go | 16 +- pkg/ipfs/service.go | 6 +- pkg/ipfs/service_test.go | 16 +- pkg/ipfs/streamer_test.go | 8 +- pkg/ipfs/types.go | 36 +-- 26 files changed, 362 insertions(+), 234 deletions(-) create mode 100644 cmd/streamSubscribe.go delete mode 100644 cmd/test2.go create mode 100644 pkg/config/subscription.go create mode 100644 pkg/config/subscription_test.go rename pkg/ipfs/{test_helpers => helpers}/mocks/api.go (100%) rename pkg/ipfs/{test_helpers => helpers}/mocks/converter.go (100%) rename pkg/ipfs/{test_helpers => helpers}/mocks/publisher.go (100%) create mode 100644 pkg/ipfs/helpers/mocks/repository.go rename pkg/ipfs/{test_helpers => helpers}/mocks/screener.go (100%) rename pkg/ipfs/{test_helpers => helpers}/mocks/streamer.go (100%) rename pkg/ipfs/{test_helpers => helpers}/test_data.go (99%) diff --git a/cmd/root.go b/cmd/root.go index 6b45ea52..1b647625 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -40,6 +40,7 @@ var ( cfgFile string databaseConfig config.Database genConfig config.Plugin + subConfig config.Subscription ipc string levelDbPath string queueRecheckInterval time.Duration diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go new file mode 100644 index 00000000..fe895d49 --- /dev/null +++ b/cmd/streamSubscribe.go @@ -0,0 +1,214 @@ +// Copyright © 2019 Vulcanize, Inc +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "bytes" + + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/geth/client" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" +) + +// streamSubscribeCmd represents the streamSubscribe command +var streamSubscribeCmd = &cobra.Command{ + Use: "streamSubscribe", + Short: "This command is used to subscribe to the seed node stream with the provided filters", + Long: ``, + Run: func(cmd *cobra.Command, args []string) { + streamSubscribe() + }, +} + +func init() { + rootCmd.AddCommand(streamSubscribeCmd) +} + +func streamSubscribe() { + // Prep the subscription config/filters to be sent to the server + subscriptionConfig() + + // Create a new rpc client and a subscription streamer with that client + rpcClient := getRpcClient() + str := streamer.NewSeedStreamer(rpcClient) + + // Buffered channel for reading subscription payloads + payloadChan := make(chan ipfs.ResponsePayload, 8000) + + // Subscribe to the seed node service with the given config/filter parameters + sub, err := str.Stream(payloadChan, subConfig) + if err != nil { + println(err.Error()) + log.Fatal(err) + } + + // Receive response payloads and print out the results + for { + select { + case payload := <-payloadChan: + if payload.Err != nil { + log.Error(payload.Err) + } + for _, headerRlp := range payload.HeadersRlp { + var header types.Header + err = rlp.Decode(bytes.NewBuffer(headerRlp), &header) + if err != nil { + println(err.Error()) + log.Error(err) + } + println("Header") + println(header.Hash().Hex()) + println(header.Number.Int64()) + } + for _, trxRlp := range payload.TransactionsRlp { + var trx types.Transaction + buff := bytes.NewBuffer(trxRlp) + stream := rlp.NewStream(buff, 0) + err := trx.DecodeRLP(stream) + if err != nil { + println(err.Error()) + log.Error(err) + } + println("Trx") + println(trx.Hash().Hex()) + } + for _, rctRlp := range payload.ReceiptsRlp { + var rct types.Receipt + buff := bytes.NewBuffer(rctRlp) + stream := rlp.NewStream(buff, 0) + err = rct.DecodeRLP(stream) + if err != nil { + println(err.Error()) + log.Error(err) + } + println("Rct") + for _, l := range rct.Logs { + println("log") + println(l.BlockHash.Hex()) + println(l.TxHash.Hex()) + println(l.Address.Hex()) + } + } + for key, stateRlp := range payload.StateNodesRlp { + var acct state.Account + err = rlp.Decode(bytes.NewBuffer(stateRlp), &acct) + if err != nil { + println(err.Error()) + log.Error(err) + } + println("State") + print("key: ") + println(key.Hex()) + print("root: ") + println(acct.Root.Hex()) + print("balance: ") + println(acct.Balance.Int64()) + } + for stateKey, mappedRlp := range payload.StorageNodesRlp { + println("Storage") + print("state key: ") + println(stateKey.Hex()) + for storageKey, storageRlp := range mappedRlp { + println("Storage") + print("key: ") + println(storageKey.Hex()) + var i []interface{} + err := rlp.DecodeBytes(storageRlp, i) + if err != nil { + println(err.Error()) + log.Error(err) + } + print("bytes: ") + println(storageRlp) + } + } + case err = <-sub.Err(): + println(err.Error()) + log.Fatal(err) + } + } +} + +func subscriptionConfig() { + log.Info("loading subscription config") + vulcPath = viper.GetString("subscription.path") + subConfig = config.Subscription{ + // Below default to false, which means we do not backfill by default + BackFill: viper.GetBool("subscription.backfill"), + BackFillOnly: viper.GetBool("subscription.backfillOnly"), + + // Below default to 0 + // 0 start means we start at the beginning and 0 end means we continue indefinitely + StartingBlock: viper.GetInt64("subscription.startingBlock"), + EndingBlock: viper.GetInt64("subscription.endingBlock"), + + // Below default to false, which means we get all headers by default + HeaderFilter: config.HeaderFilter{ + Off: viper.GetBool("subscription.headerFilter.off"), + FinalOnly: viper.GetBool("subscription.headerFilter.finalOnly"), + }, + + // Below defaults to false and two slices of length 0 + // Which means we get all transactions by default + TrxFilter: config.TrxFilter{ + Off: viper.GetBool("subscription.trxFilter.off"), + Src: viper.GetStringSlice("subscription.trxFilter.src"), + Dst: viper.GetStringSlice("subscription.trxFilter.dst"), + }, + + // Below defaults to false and one slice of length 0 + // Which means we get all receipts by default + ReceiptFilter: config.ReceiptFilter{ + Off: viper.GetBool("subscription.receiptFilter.off"), + Topic0s: viper.GetStringSlice("subscription.receiptFilter.topic0s"), + }, + + // Below defaults to two false, and a slice of length 0 + // Which means we get all state leafs by default, but no intermediate nodes + StateFilter: config.StateFilter{ + Off: viper.GetBool("subscription.stateFilter.off"), + IntermediateNodes: viper.GetBool("subscription.stateFilter.intermediateNodes"), + Addresses: viper.GetStringSlice("subscription.stateFilter.addresses"), + }, + + // Below defaults to two false, and two slices of length 0 + // Which means we get all storage leafs by default, but no intermediate nodes + StorageFilter: config.StorageFilter{ + Off: viper.GetBool("subscription.storageFilter.off"), + IntermediateNodes: viper.GetBool("subscription.storageFilter.intermediateNodes"), + Addresses: viper.GetStringSlice("subscription.storageFilter.addresses"), + StorageKeys: viper.GetStringSlice("subscription.storageFilter.storageKeys"), + }, + } +} + +func getRpcClient() core.RpcClient { + rawRpcClient, err := rpc.Dial(vulcPath) + if err != nil { + log.Fatal(err) + } + return client.NewRpcClient(rawRpcClient, vulcPath) +} diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go index b1dd454a..3b1d1f15 100644 --- a/cmd/syncAndPublish.go +++ b/cmd/syncAndPublish.go @@ -39,7 +39,7 @@ var syncAndPublishCmd = &cobra.Command{ Short: "Syncs all Ethereum data into IPFS, indexing the CIDs", Long: `This command works alongside a modified geth node which streams all block and state (diff) data over a websocket subscription. This process -then converts the eth objects to IPLDs and publishes them IPFS. Additionally, +then converts the eth data to IPLD objects and publishes them to IPFS. Additionally, it maintains a local index of the IPLD objects' CIDs in Postgres.`, Run: func(cmd *cobra.Command, args []string) { syncAndPublish() diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go index 0b0d4b8a..14c53bdf 100644 --- a/cmd/syncPublishScreenAndServe.go +++ b/cmd/syncPublishScreenAndServe.go @@ -29,13 +29,12 @@ import ( // syncPublishScreenAndServeCmd represents the syncPublishScreenAndServe command var syncPublishScreenAndServeCmd = &cobra.Command{ Use: "syncPublishScreenAndServe", - Short: "A brief description of your command", - Long: `A longer description that spans multiple lines and likely contains examples -and usage of using your command. For example: - -Cobra is a CLI library for Go that empowers applications. -This application is a tool to generate the needed files -to quickly create a Cobra application.`, + Short: "Syncs all Ethereum data into IPFS, indexing the CIDs, and uses this to serve data requests to requesting clients", + Long: `This command works alongside a modified geth node which streams +all block and state (diff) data over a websocket subscription. This process +then converts the eth data to IPLD objects and publishes them to IPFS. Additionally, +it maintains a local index of the IPLD objects' CIDs in Postgres. It then opens up a server which +relays relevant data to requesting clients.`, Run: func(cmd *cobra.Command, args []string) { syncPublishScreenAndServe() }, @@ -44,7 +43,7 @@ to quickly create a Cobra application.`, func init() { rootCmd.AddCommand(syncPublishScreenAndServeCmd) syncPublishScreenAndServeCmd.Flags().StringVarP(&ipfsPath, "ipfs-path", "i", "~/.ipfs", "Path for configuring IPFS node") - syncPublishScreenAndServeCmd.Flags().StringVarP(&vulcPath, "ipc-path", "p", "~/.vulcanize/vulcanize.ipc", "IPC path for the Vulcanize seed node server") + syncPublishScreenAndServeCmd.Flags().StringVarP(&vulcPath, "sub-path", "p", "~/.vulcanize/vulcanize.ipc", "IPC path for the Vulcanize seed node server") } func syncPublishScreenAndServe() { diff --git a/cmd/test2.go b/cmd/test2.go deleted file mode 100644 index 18e98869..00000000 --- a/cmd/test2.go +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright © 2019 Vulcanize, Inc -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - "bytes" - - "github.com/ethereum/go-ethereum/core/state" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/rpc" - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/geth/client" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" -) - -// test2Cmd represents the test2 command -var test2Cmd = &cobra.Command{ - Use: "test2", - Short: "A brief description of your command", - Long: `A longer description that spans multiple lines and likely contains examples -and usage of using your command. For example: - -Cobra is a CLI library for Go that empowers applications. -This application is a tool to generate the needed files -to quickly create a Cobra application.`, - Run: func(cmd *cobra.Command, args []string) { - test2() - }, -} - -func test2() { - rpcClient := getRpcClient() - str := streamer.NewSeedStreamer(rpcClient) - payloadChan := make(chan ipfs.ResponsePayload, 8000) - streamFilters := ipfs.StreamFilters{} - streamFilters.HeaderFilter.FinalOnly = true - streamFilters.ReceiptFilter.Topic0s = []string{ - "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", - "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377", - } - streamFilters.StateFilter.Addresses = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} - streamFilters.StorageFilter.Off = true - //streamFilters.TrxFilter.Src = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} - //streamFilters.TrxFilter.Dst = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} - sub, err := str.Stream(payloadChan, streamFilters) - if err != nil { - println(err.Error()) - log.Fatal(err) - } - for { - select { - case payload := <-payloadChan: - if payload.Err != nil { - log.Error(payload.Err) - } - for _, headerRlp := range payload.HeadersRlp { - var header types.Header - err = rlp.Decode(bytes.NewBuffer(headerRlp), &header) - if err != nil { - println(err.Error()) - log.Error(err) - } - println("header") - println(header.Hash().Hex()) - println(header.Number.Int64()) - } - for _, trxRlp := range payload.TransactionsRlp { - var trx types.Transaction - buff := bytes.NewBuffer(trxRlp) - stream := rlp.NewStream(buff, 0) - err := trx.DecodeRLP(stream) - if err != nil { - println(err.Error()) - log.Error(err) - } - println("trx") - println(trx.Hash().Hex()) - } - for _, rctRlp := range payload.ReceiptsRlp { - var rct types.Receipt - buff := bytes.NewBuffer(rctRlp) - stream := rlp.NewStream(buff, 0) - err = rct.DecodeRLP(stream) - if err != nil { - println(err.Error()) - log.Error(err) - } - println("rct") - for _, l := range rct.Logs { - println("log") - println(l.BlockHash.Hex()) - println(l.TxHash.Hex()) - println(l.Address.Hex()) - } - } - for _, stateRlp := range payload.StateNodesRlp { - var acct state.Account - err = rlp.Decode(bytes.NewBuffer(stateRlp), &acct) - if err != nil { - println(err.Error()) - log.Error(err) - } - println("state") - println(acct.Root.Hex()) - println(acct.Balance.Int64()) - } - case err = <-sub.Err(): - println(err.Error()) - log.Fatal(err) - } - } -} - -func init() { - rootCmd.AddCommand(test2Cmd) - test2Cmd.Flags().StringVarP(&vulcPath, "ipc-path", "p", "~/.vulcanize/vulcanize.ipc", "IPC path for the Vulcanize seed node server") -} - -func getRpcClient() core.RpcClient { - println(vulcPath) - rawRpcClient, err := rpc.Dial(vulcPath) - if err != nil { - log.Fatal(err) - } - return client.NewRpcClient(rawRpcClient, vulcPath) -} diff --git a/libraries/shared/streamer/streamer.go b/libraries/shared/streamer/streamer.go index f296e469..5d74a449 100644 --- a/libraries/shared/streamer/streamer.go +++ b/libraries/shared/streamer/streamer.go @@ -19,6 +19,7 @@ package streamer import ( "github.com/ethereum/go-ethereum/rpc" + "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/ipfs" @@ -26,7 +27,7 @@ import ( // IStreamer is the interface for streaming data from a vulcanizeDB seed node type IStreamer interface { - Stream(payloadChan chan ipfs.ResponsePayload, streamFilters ipfs.StreamFilters) (*rpc.ClientSubscription, error) + Stream(payloadChan chan ipfs.ResponsePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) } // Streamer is the underlying struct for the IStreamer interface @@ -42,6 +43,6 @@ func NewSeedStreamer(client core.RpcClient) *Streamer { } // Stream is the main loop for subscribing to data from a vulcanizedb seed node -func (sds *Streamer) Stream(payloadChan chan ipfs.ResponsePayload, streamFilters ipfs.StreamFilters) (*rpc.ClientSubscription, error) { +func (sds *Streamer) Stream(payloadChan chan ipfs.ResponsePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) { return sds.Client.Subscribe("vulcanizedb", payloadChan, "stream", streamFilters) } diff --git a/pkg/config/subscription.go b/pkg/config/subscription.go new file mode 100644 index 00000000..50949c9b --- /dev/null +++ b/pkg/config/subscription.go @@ -0,0 +1,43 @@ +package config + +// Subscription config is used by a subscribing transformer to specifiy which data to receive from the seed node +type Subscription struct { + BackFill bool + BackFillOnly bool + StartingBlock int64 + EndingBlock int64 // set to 0 or a negative value to have no ending block + HeaderFilter HeaderFilter + TrxFilter TrxFilter + ReceiptFilter ReceiptFilter + StateFilter StateFilter + StorageFilter StorageFilter +} + +type HeaderFilter struct { + Off bool + FinalOnly bool +} + +type TrxFilter struct { + Off bool + Src []string + Dst []string +} + +type ReceiptFilter struct { + Off bool + Topic0s []string +} + +type StateFilter struct { + Off bool + Addresses []string // is converted to state key by taking its keccak256 hash + IntermediateNodes bool +} + +type StorageFilter struct { + Off bool + Addresses []string + StorageKeys []string + IntermediateNodes bool +} \ No newline at end of file diff --git a/pkg/config/subscription_test.go b/pkg/config/subscription_test.go new file mode 100644 index 00000000..d912156b --- /dev/null +++ b/pkg/config/subscription_test.go @@ -0,0 +1 @@ +package config diff --git a/pkg/ipfs/api.go b/pkg/ipfs/api.go index cd42252c..150ece3f 100644 --- a/pkg/ipfs/api.go +++ b/pkg/ipfs/api.go @@ -19,6 +19,8 @@ package ipfs import ( "context" + "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" ) @@ -42,7 +44,7 @@ func NewPublicSeedNodeAPI(snp SyncPublishScreenAndServe) *PublicSeedNodeAPI { } // Stream is the public method to setup a subscription that fires off SyncPublishScreenAndServe payloads as they are created -func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters StreamFilters) (*rpc.Subscription, error) { +func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.Subscription) (*rpc.Subscription, error) { // ensure that the RPC connection supports subscriptions notifier, supported := rpc.NotifierFromContext(ctx) if !supported { diff --git a/pkg/ipfs/converter_test.go b/pkg/ipfs/converter_test.go index 859d215d..88b8c403 100644 --- a/pkg/ipfs/converter_test.go +++ b/pkg/ipfs/converter_test.go @@ -20,19 +20,19 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers" - "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers/mocks" ) var _ = Describe("Converter", func() { Describe("Convert", func() { It("Converts StatediffPayloads into IPLDPayloads", func() { mockConverter := mocks.PayloadConverter{} - mockConverter.ReturnIPLDPayload = &test_helpers.MockIPLDPayload - ipldPayload, err := mockConverter.Convert(test_helpers.MockStatediffPayload) + mockConverter.ReturnIPLDPayload = &helpers.MockIPLDPayload + ipldPayload, err := mockConverter.Convert(helpers.MockStatediffPayload) Expect(err).ToNot(HaveOccurred()) - Expect(ipldPayload).To(Equal(&test_helpers.MockIPLDPayload)) - Expect(mockConverter.PassedStatediffPayload).To(Equal(test_helpers.MockStatediffPayload)) + Expect(ipldPayload).To(Equal(&helpers.MockIPLDPayload)) + Expect(mockConverter.PassedStatediffPayload).To(Equal(helpers.MockStatediffPayload)) }) }) }) diff --git a/pkg/ipfs/test_helpers/mocks/api.go b/pkg/ipfs/helpers/mocks/api.go similarity index 100% rename from pkg/ipfs/test_helpers/mocks/api.go rename to pkg/ipfs/helpers/mocks/api.go diff --git a/pkg/ipfs/test_helpers/mocks/converter.go b/pkg/ipfs/helpers/mocks/converter.go similarity index 100% rename from pkg/ipfs/test_helpers/mocks/converter.go rename to pkg/ipfs/helpers/mocks/converter.go diff --git a/pkg/ipfs/test_helpers/mocks/publisher.go b/pkg/ipfs/helpers/mocks/publisher.go similarity index 100% rename from pkg/ipfs/test_helpers/mocks/publisher.go rename to pkg/ipfs/helpers/mocks/publisher.go diff --git a/pkg/ipfs/helpers/mocks/repository.go b/pkg/ipfs/helpers/mocks/repository.go new file mode 100644 index 00000000..dc06ccc9 --- /dev/null +++ b/pkg/ipfs/helpers/mocks/repository.go @@ -0,0 +1,31 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package mocks + +import "github.com/vulcanize/vulcanizedb/pkg/ipfs" + +// CIDRepository is the underlying struct for the Repository interface +type CIDRepository struct { + PassedCIDPayload *ipfs.CIDPayload + ReturnErr error +} + +// Index indexes a cidPayload in Postgres +func (repo *CIDRepository) Index(cidPayload *ipfs.CIDPayload) error { + repo.PassedCIDPayload = cidPayload + return repo.ReturnErr +} diff --git a/pkg/ipfs/test_helpers/mocks/screener.go b/pkg/ipfs/helpers/mocks/screener.go similarity index 100% rename from pkg/ipfs/test_helpers/mocks/screener.go rename to pkg/ipfs/helpers/mocks/screener.go diff --git a/pkg/ipfs/test_helpers/mocks/streamer.go b/pkg/ipfs/helpers/mocks/streamer.go similarity index 100% rename from pkg/ipfs/test_helpers/mocks/streamer.go rename to pkg/ipfs/helpers/mocks/streamer.go diff --git a/pkg/ipfs/test_helpers/test_data.go b/pkg/ipfs/helpers/test_data.go similarity index 99% rename from pkg/ipfs/test_helpers/test_data.go rename to pkg/ipfs/helpers/test_data.go index 2401a970..33daefd0 100644 --- a/pkg/ipfs/test_helpers/test_data.go +++ b/pkg/ipfs/helpers/test_data.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package test_helpers +package helpers import ( "errors" diff --git a/pkg/ipfs/publisher_test.go b/pkg/ipfs/publisher_test.go index 744f886e..2e14a814 100644 --- a/pkg/ipfs/publisher_test.go +++ b/pkg/ipfs/publisher_test.go @@ -20,19 +20,19 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers" - "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers/mocks" ) var _ = Describe("Publisher", func() { Describe("Publish", func() { It("Publishes IPLDPayload to IPFS", func() { mockPublisher := mocks.IPLDPublisher{} - mockPublisher.ReturnCIDPayload = &test_helpers.MockCIDPayload - cidPayload, err := mockPublisher.Publish(&test_helpers.MockIPLDPayload) + mockPublisher.ReturnCIDPayload = &helpers.MockCIDPayload + cidPayload, err := mockPublisher.Publish(&helpers.MockIPLDPayload) Expect(err).ToNot(HaveOccurred()) - Expect(cidPayload).To(Equal(&test_helpers.MockCIDPayload)) - Expect(mockPublisher.PassedIPLDPayload).To(Equal(&test_helpers.MockIPLDPayload)) + Expect(cidPayload).To(Equal(&helpers.MockCIDPayload)) + Expect(mockPublisher.PassedIPLDPayload).To(Equal(&helpers.MockIPLDPayload)) }) }) }) diff --git a/pkg/ipfs/repository.go b/pkg/ipfs/repository.go index 474e2921..a433eb71 100644 --- a/pkg/ipfs/repository.go +++ b/pkg/ipfs/repository.go @@ -17,6 +17,7 @@ package ipfs import ( + "github.com/i-norden/go-ethereum/core" "github.com/jmoiron/sqlx" "github.com/lib/pq" @@ -144,3 +145,7 @@ func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeCID, stateID, storageCID.Key, storageCID.CID, storageCID.Leaf) return err } + +type RepositoryError struct { + core.Message +} diff --git a/pkg/ipfs/repository_test.go b/pkg/ipfs/repository_test.go index bc5829a1..0f127224 100644 --- a/pkg/ipfs/repository_test.go +++ b/pkg/ipfs/repository_test.go @@ -20,17 +20,17 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers" - "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers/mocks" ) var _ = Describe("Repository", func() { Describe("Index", func() { It("Indexes CIDs against their metadata", func() { mockRepo := mocks.CIDRepository{} - err := mockRepo.Index(&test_helpers.MockCIDPayload) + err := mockRepo.Index(&helpers.MockCIDPayload) Expect(err).ToNot(HaveOccurred()) - Expect(mockRepo.PassedCIDPayload).To(Equal(&test_helpers.MockCIDPayload)) + Expect(mockRepo.PassedCIDPayload).To(Equal(&helpers.MockCIDPayload)) }) }) }) diff --git a/pkg/ipfs/retreiver.go b/pkg/ipfs/retreiver.go index e01cd221..cb113b0f 100644 --- a/pkg/ipfs/retreiver.go +++ b/pkg/ipfs/retreiver.go @@ -19,12 +19,13 @@ package ipfs import ( "github.com/jmoiron/sqlx" "github.com/lib/pq" + "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) // CIDRetriever is the interface for retrieving CIDs from the Postgres cache type CIDRetriever interface { - RetrieveCIDs(streamFilters StreamFilters) ([]CidWrapper, error) + RetrieveCIDs(streamFilters config.Subscription) ([]CidWrapper, error) } // EthCIDRetriever is the underlying struct supporting the CIDRetriever interface @@ -47,7 +48,7 @@ func (ecr *EthCIDRetriever) GetLastBlockNumber() (int64, error) { } // RetrieveCIDs is used to retrieve all of the CIDs which conform to the passed StreamFilters -func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]CidWrapper, error) { +func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]CidWrapper, error) { var endingBlock int64 var err error if streamFilters.EndingBlock <= 0 || streamFilters.EndingBlock <= streamFilters.StartingBlock { @@ -112,7 +113,7 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]CidWrap return cids, err } -func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) error { +func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config.Subscription, cids *CidWrapper, blockNumber int64) error { var pgStr string if streamFilters.HeaderFilter.FinalOnly { pgStr = `SELECT cid FROM header_cids @@ -125,7 +126,7 @@ func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters Stream return tx.Select(cids.Headers, pgStr, blockNumber) } -func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) ([]int64, error) { +func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters config.Subscription, cids *CidWrapper, blockNumber int64) ([]int64, error) { args := make([]interface{}, 0, 3) type result struct { ID int64 `db:"id"` @@ -155,7 +156,7 @@ func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters StreamFil return ids, nil } -func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64, trxIds []int64) error { +func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Subscription, cids *CidWrapper, blockNumber int64, trxIds []int64) error { args := make([]interface{}, 0, 2) pgStr := `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids WHERE receipt_cids.tx_id = transaction_cids.id @@ -175,7 +176,7 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters StreamFil return tx.Select(cids.Receipts, pgStr, args...) } -func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) error { +func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.Subscription, cids *CidWrapper, blockNumber int64) error { args := make([]interface{}, 0, 2) pgStr := `SELECT state_cids.cid, state_cids.state_key FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id) WHERE header_cids.block_number = $1` @@ -192,7 +193,7 @@ func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters StreamF return tx.Select(cids.StateNodes, pgStr, args...) } -func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) error { +func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters config.Subscription, cids *CidWrapper, blockNumber int64) error { args := make([]interface{}, 0, 3) pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key FROM storage_cids, state_cids, header_cids WHERE storage_cids.state_id = state_cids.id diff --git a/pkg/ipfs/screener.go b/pkg/ipfs/screener.go index 1dfe5ab1..5468870c 100644 --- a/pkg/ipfs/screener.go +++ b/pkg/ipfs/screener.go @@ -19,6 +19,8 @@ package ipfs import ( "bytes" + "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/common" @@ -27,7 +29,7 @@ import ( // ResponseScreener is the inteface used to screen eth data and package appropriate data into a response payload type ResponseScreener interface { - ScreenResponse(streamFilters *StreamFilters, payload IPLDPayload) (*ResponsePayload, error) + ScreenResponse(streamFilters *config.Subscription, payload IPLDPayload) (*ResponsePayload, error) } // Screener is the underlying struct for the ReponseScreener interface @@ -39,7 +41,7 @@ func NewResponseScreener() *Screener { } // ScreenResponse is used to filter through eth data to extract and package requested data into a ResponsePayload -func (s *Screener) ScreenResponse(streamFilters *StreamFilters, payload IPLDPayload) (*ResponsePayload, error) { +func (s *Screener) ScreenResponse(streamFilters *config.Subscription, payload IPLDPayload) (*ResponsePayload, error) { response := new(ResponsePayload) err := s.filterHeaders(streamFilters, response, payload) if err != nil { @@ -64,7 +66,7 @@ func (s *Screener) ScreenResponse(streamFilters *StreamFilters, payload IPLDPayl return response, nil } -func (s *Screener) filterHeaders(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) error { +func (s *Screener) filterHeaders(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) error { if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) { response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP) if !streamFilters.HeaderFilter.FinalOnly { @@ -87,7 +89,7 @@ func checkRange(start, end, actual int64) bool { return false } -func (s *Screener) filterTransactions(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) ([]common.Hash, error) { +func (s *Screener) filterTransactions(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) ([]common.Hash, error) { trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions)) if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) { for i, trx := range payload.BlockBody.Transactions { @@ -123,7 +125,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin return false } -func (s *Screener) filerReceipts(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error { +func (s *Screener) filerReceipts(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error { if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) { for i, receipt := range payload.Receipts { if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, trxHashes) { @@ -159,7 +161,7 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics []string, want return false } -func (s *Screener) filterState(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) error { +func (s *Screener) filterState(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) error { response.StateNodesRlp = make(map[common.Hash][]byte) if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) { keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses)) @@ -191,7 +193,7 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool { return false } -func (s *Screener) filterStorage(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) error { +func (s *Screener) filterStorage(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) error { if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) { stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses)) for _, addr := range streamFilters.StorageFilter.Addresses { diff --git a/pkg/ipfs/service.go b/pkg/ipfs/service.go index aa325562..062a2175 100644 --- a/pkg/ipfs/service.go +++ b/pkg/ipfs/service.go @@ -20,6 +20,8 @@ import ( "fmt" "sync" + "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" @@ -42,7 +44,7 @@ type SyncPublishScreenAndServe interface { // Main event loop for handling client pub-sub ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool) // Method to subscribe to receive state diff processing output - Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *StreamFilters) + Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *config.Subscription) // Method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) error } @@ -202,7 +204,7 @@ func (sap *Service) processResponse(payload IPLDPayload) error { } // Subscribe is used by the API to subscribe to the service loop -func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *StreamFilters) { +func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *config.Subscription) { log.Info("Subscribing to the statediff service") sap.Lock() sap.Subscriptions[id] = Subscription{ diff --git a/pkg/ipfs/service_test.go b/pkg/ipfs/service_test.go index 40dbce90..d5f95071 100644 --- a/pkg/ipfs/service_test.go +++ b/pkg/ipfs/service_test.go @@ -26,8 +26,8 @@ import ( . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/ipfs" - "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers" - "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers/mocks" ) var _ = Describe("Service", func() { @@ -41,18 +41,18 @@ var _ = Describe("Service", func() { ReturnErr: nil, } mockPublisher := &mocks.IPLDPublisher{ - ReturnCIDPayload: &test_helpers.MockCIDPayload, + ReturnCIDPayload: &helpers.MockCIDPayload, ReturnErr: nil, } mockStreamer := &mocks.StateDiffStreamer{ ReturnSub: &rpc.ClientSubscription{}, StreamPayloads: []statediff.Payload{ - test_helpers.MockStatediffPayload, + helpers.MockStatediffPayload, }, ReturnErr: nil, } mockConverter := &mocks.PayloadConverter{ - ReturnIPLDPayload: &test_helpers.MockIPLDPayload, + ReturnIPLDPayload: &helpers.MockIPLDPayload, ReturnErr: nil, } processor := &ipfs.Service{ @@ -68,9 +68,9 @@ var _ = Describe("Service", func() { time.Sleep(2 * time.Second) quitChan <- true wg.Wait() - Expect(mockConverter.PassedStatediffPayload).To(Equal(test_helpers.MockStatediffPayload)) - Expect(mockCidRepo.PassedCIDPayload).To(Equal(&test_helpers.MockCIDPayload)) - Expect(mockPublisher.PassedIPLDPayload).To(Equal(&test_helpers.MockIPLDPayload)) + Expect(mockConverter.PassedStatediffPayload).To(Equal(helpers.MockStatediffPayload)) + Expect(mockCidRepo.PassedCIDPayload).To(Equal(&helpers.MockCIDPayload)) + Expect(mockPublisher.PassedIPLDPayload).To(Equal(&helpers.MockIPLDPayload)) Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan)) }) }) diff --git a/pkg/ipfs/streamer_test.go b/pkg/ipfs/streamer_test.go index 1378fa3e..acbc506e 100644 --- a/pkg/ipfs/streamer_test.go +++ b/pkg/ipfs/streamer_test.go @@ -22,8 +22,8 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers" - "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers/mocks" ) var _ = Describe("Streamer", func() { @@ -32,7 +32,7 @@ var _ = Describe("Streamer", func() { mockStreamer := mocks.StateDiffStreamer{} mockStreamer.ReturnSub = &rpc.ClientSubscription{} mockStreamer.StreamPayloads = []statediff.Payload{ - test_helpers.MockStatediffPayload, + helpers.MockStatediffPayload, } payloadChan := make(chan statediff.Payload, 1) sub, err := mockStreamer.Stream(payloadChan) @@ -40,7 +40,7 @@ var _ = Describe("Streamer", func() { Expect(sub).To(Equal(&rpc.ClientSubscription{})) Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan)) streamedPayload := <-payloadChan - Expect(streamedPayload).To(Equal(test_helpers.MockStatediffPayload)) + Expect(streamedPayload).To(Equal(helpers.MockStatediffPayload)) }) }) }) diff --git a/pkg/ipfs/types.go b/pkg/ipfs/types.go index 904c5efc..bdf23113 100644 --- a/pkg/ipfs/types.go +++ b/pkg/ipfs/types.go @@ -20,6 +20,8 @@ import ( "encoding/json" "math/big" + "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/ipfs/go-block-format" "github.com/ethereum/go-ethereum/common" @@ -30,7 +32,7 @@ import ( type Subscription struct { PayloadChan chan<- ResponsePayload QuitChan chan<- bool - StreamFilters *StreamFilters + StreamFilters *config.Subscription } // ResponsePayload holds the data returned from the seed node to the requesting client @@ -149,35 +151,3 @@ type TrxMetaData struct { Src string Dst string } - -// StreamFilters are defined by the client to specifiy which data to receive from the seed node -type StreamFilters struct { - BackFill bool - BackFillOnly bool - StartingBlock int64 - EndingBlock int64 // set to 0 or a negative value to have no ending block - HeaderFilter struct { - Off bool - FinalOnly bool - } - TrxFilter struct { - Off bool - Src []string - Dst []string - } - ReceiptFilter struct { - Off bool - Topic0s []string - } - StateFilter struct { - Off bool - Addresses []string // is converted to state key by taking its keccak256 hash - IntermediateNodes bool - } - StorageFilter struct { - Off bool - Addresses []string - StorageKeys []string - IntermediateNodes bool - } -}