From 0d28234804b223bfed4755ecc077e1777e55e6b4 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 31 Aug 2020 10:59:15 -0500 Subject: [PATCH] continue refactor/purge --- cmd/{streamEthSubscribe.go => subscribe.go} | 16 +- documentation/apis.md | 80 +++++----- documentation/ipfs.md | 53 ------ environments/example.toml | 14 ++ environments/superNodeETH.toml | 49 ------ environments/superNodeSubscription.toml | 30 ---- go.mod | 2 +- go.sum | 1 + pkg/builders/builders.go | 168 -------------------- pkg/eth/cid_retriever.go | 52 ------ pkg/eth/mocks/batch_client.go | 86 ---------- pkg/eth/mocks/converter.go | 66 -------- pkg/eth/mocks/indexer.go | 41 ----- pkg/eth/mocks/publisher.go | 61 ------- pkg/eth/mocks/stream_client.go | 44 ----- pkg/serve/config.go | 10 +- pkg/serve/service.go | 9 -- pkg/shared/mocks/payload_fetcher.go | 50 ------ pkg/shared/mocks/retriever.go | 64 -------- pkg/shared/mocks/streamer.go | 43 ----- test_config/test_config.go | 6 +- utils/utilities_suite_test.go | 36 ----- utils/utils.go | 89 ----------- utils/utils_test.go | 74 --------- 24 files changed, 69 insertions(+), 1075 deletions(-) rename cmd/{streamEthSubscribe.go => subscribe.go} (95%) delete mode 100644 documentation/ipfs.md create mode 100644 environments/example.toml delete mode 100644 environments/superNodeETH.toml delete mode 100644 environments/superNodeSubscription.toml delete mode 100644 pkg/builders/builders.go delete mode 100644 pkg/eth/mocks/batch_client.go delete mode 100644 pkg/eth/mocks/converter.go delete mode 100644 pkg/eth/mocks/indexer.go delete mode 100644 pkg/eth/mocks/publisher.go delete mode 100644 pkg/eth/mocks/stream_client.go delete mode 100644 pkg/shared/mocks/payload_fetcher.go delete mode 100644 pkg/shared/mocks/retriever.go delete mode 100644 pkg/shared/mocks/streamer.go delete mode 100644 utils/utilities_suite_test.go delete mode 100644 utils/utils.go delete mode 100644 utils/utils_test.go diff --git a/cmd/streamEthSubscribe.go b/cmd/subscribe.go similarity index 95% rename from cmd/streamEthSubscribe.go rename to cmd/subscribe.go index 4fc692a3..abe5e93d 100644 --- a/cmd/streamEthSubscribe.go +++ b/cmd/subscribe.go @@ -28,29 +28,31 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/client" "github.com/vulcanize/ipld-eth-server/pkg/eth" w "github.com/vulcanize/ipld-eth-server/pkg/serve" ) -// streamEthSubscriptionCmd represents the streamEthSubscription command -var streamEthSubscriptionCmd = &cobra.Command{ - Use: "streamEthSubscription", +// subscribeCmd represents the subscribe command +var subscribeCmd = &cobra.Command{ + Use: "subscribe", Short: "This command is used to subscribe to the eth ipfs watcher data stream with the provided filters", Long: `This command is for demo and testing purposes and is used to subscribe to the watcher with the provided subscription configuration parameters. It does not do anything with the data streamed from the watcher other than unpack it and print it out for demonstration purposes.`, Run: func(cmd *cobra.Command, args []string) { subCommand = cmd.CalledAs() logWithCommand = *log.WithField("SubCommand", subCommand) - streamEthSubscription() + subscribe() }, } func init() { - rootCmd.AddCommand(streamEthSubscriptionCmd) + rootCmd.AddCommand(subscribeCmd) } -func streamEthSubscription() { +func subscribe() { // Prep the subscription config/filters to be sent to the server ethSubConfig, err := eth.NewEthSubscriptionConfig() if err != nil { @@ -85,7 +87,7 @@ func streamEthSubscription() { logWithCommand.Error(payload.Err) continue } - var ethData eth.IPLDs + var ethData eth2.IPLDs if err := rlp.DecodeBytes(payload.Data, ðData); err != nil { logWithCommand.Error(err) continue diff --git a/documentation/apis.md b/documentation/apis.md index 6b03ac82..09ccdd81 100644 --- a/documentation/apis.md +++ b/documentation/apis.md @@ -1,5 +1,5 @@ -## ipfs-blockchain-watcher APIs -We can expose a number of different APIs for remote access to ipfs-blockchain-watcher data +## ipld-eth-server APIs +We can expose a number of different APIs for remote access to ipld-eth-server data ### Table of Contents @@ -9,7 +9,7 @@ We can expose a number of different APIs for remote access to ipfs-blockchain-wa ### Postgraphile -ipfs-blockchain-watcher stores all processed data in Postgres using PG-IPFS, this includes all of the IPLD objects. +ipld-eth-server stores all processed data in Postgres using PG-IPFS, this includes all of the IPLD objects. [Postgraphile](https://www.graphile.org/postgraphile/) can be used to expose GraphQL endpoints for the Postgres tables. e.g. @@ -22,15 +22,15 @@ All of their data can then be queried with standard [GraphQL](https://graphql.or ### RPC Subscription Interface -A direct, real-time subscription to the data being processed by ipfs-blockchain-watcher can be established over WS or IPC through the [Stream](../pkg/watch/api.go#L53) RPC method. +A direct, real-time subscription to the data being processed by ipld-eth-server can be established over WS or IPC through the [Stream](../pkg/serve/api.go#L53) RPC method. This method is not chain-specific and each chain-type supports it, it is accessed under the "vdb" namespace rather than a chain-specific namespace. An interface for subscribing to this endpoint is provided [here](../pkg/client/client.go). When subscribing to this endpoint, the subscriber provides a set of RLP-encoded subscription parameters. These parameters will be chain-specific, and are used -by ipfs-blockchain-watcher to filter and return a requested subset of chain data to the subscriber. (e.g. [BTC](../pkg/btc/subscription_config.go), [ETH](../../pkg/eth/subscription_config.go)). +by ipld-eth-server to filter and return a requested subset of chain data to the subscriber. (e.g. [BTC](../pkg/btc/subscription_config.go), [ETH](../../pkg/eth/subscription_config.go)). #### Ethereum RPC Subscription -An example of how to subscribe to a real-time Ethereum data feed from ipfs-blockchain-watcher using the `Stream` RPC method is provided below +An example of how to subscribe to a real-time Ethereum data feed from ipld-eth-server using the `Stream` RPC method is provided below ```go package main @@ -40,9 +40,9 @@ An example of how to subscribe to a real-time Ethereum data feed from ipfs-block "github.com/ethereum/go-ethereum/rpc" "github.com/spf13/viper" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/client" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch" + "github.com/vulcanize/ipld-eth-server/pkg/client" + "github.com/vulcanize/ipld-eth-server/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/watch" ) config, _ := eth.NewEthSubscriptionConfig() @@ -101,10 +101,10 @@ These configuration parameters are broken down as follows: `ethSubscription.wsPath` is used to define the watcher ws url OR ipc endpoint to subscribe to -`ethSubscription.historicalData` specifies whether or not ipfs-blockchain-watcher should look up historical data in its cache and +`ethSubscription.historicalData` specifies whether or not ipld-eth-server should look up historical data in its cache and send that to the subscriber, if this is set to `false` then only newly synced/incoming data is streamed -`ethSubscription.historicalDataOnly` will tell ipfs-blockchain-watcher to only send historical data with the specified range and +`ethSubscription.historicalDataOnly` will tell ipld-eth-server to only send historical data with the specified range and not stream forward syncing data `ethSubscription.startingBlock` is the starting block number for the range to receive data in @@ -114,43 +114,43 @@ setting to 0 means the process will continue streaming indefinitely. `ethSubscription.headerFilter` has two sub-options: `off` and `uncles`. -- Setting `off` to true tells ipfs-blockchain-watcher to not send any headers to the subscriber -- setting `uncles` to true tells ipfs-blockchain-watcher to send uncles in addition to normal headers. +- Setting `off` to true tells ipld-eth-server to not send any headers to the subscriber +- setting `uncles` to true tells ipld-eth-server to send uncles in addition to normal headers. `ethSubscription.txFilter` has three sub-options: `off`, `src`, and `dst`. -- Setting `off` to true tells ipfs-blockchain-watcher to not send any transactions to the subscriber +- Setting `off` to true tells ipld-eth-server to not send any transactions to the subscriber - `src` and `dst` are string arrays which can be filled with ETH addresses to filter transactions for, -if they have any addresses then ipfs-blockchain-watcher will only send transactions that were sent or received by the addresses contained +if they have any addresses then ipld-eth-server will only send transactions that were sent or received by the addresses contained in `src` and `dst`, respectively. `ethSubscription.receiptFilter` has four sub-options: `off`, `topics`, `contracts` and `matchTxs`. -- Setting `off` to true tells ipfs-blockchain-watcher to not send any receipts to the subscriber +- Setting `off` to true tells ipld-eth-server to not send any receipts to the subscriber - `topic0s` is a string array which can be filled with event topics to filter for, -if it has any topics then ipfs-blockchain-watcher will only send receipts that contain logs which have that topic0. +if it has any topics then ipld-eth-server will only send receipts that contain logs which have that topic0. - `contracts` is a string array which can be filled with contract addresses to filter for, if it contains any contract addresses the watcher will only send receipts that correspond to one of those contracts. - `matchTrxs` is a bool which when set to true any receipts that correspond to filtered for transactions will be sent by the watcher, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters. `ethSubscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. -- Setting `off` to true tells ipfs-blockchain-watcher to not send any state data to the subscriber +- Setting `off` to true tells ipld-eth-server to not send any state data to the subscriber - `addresses` is a string array which can be filled with ETH addresses to filter state for, -if it has any addresses then ipfs-blockchain-watcher will only send state leafs (accounts) corresponding to those account addresses. -- By default ipfs-blockchain-watcher only sends along state leafs, to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. +if it has any addresses then ipld-eth-server will only send state leafs (accounts) corresponding to those account addresses. +- By default ipld-eth-server only sends along state leafs, to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. `ethSubscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. -- Setting `off` to true tells ipfs-blockchain-watcher to not send any storage data to the subscriber +- Setting `off` to true tells ipld-eth-server to not send any storage data to the subscriber - `addresses` is a string array which can be filled with ETH addresses to filter storage for, -if it has any addresses then ipfs-blockchain-watcher will only send storage nodes from the storage tries at those state addresses. +if it has any addresses then ipld-eth-server will only send storage nodes from the storage tries at those state addresses. - `storageKeys` is another string array that can be filled with storage keys to filter storage data for. It is important to note that the storage keys need to be the actual keccak256 hashes, whereas the addresses in the `addresses` fields are pre-hashed ETH addresses. -- By default ipfs-blockchain-watcher only sends along storage leafs, to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. +- By default ipld-eth-server only sends along storage leafs, to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. ### Bitcoin RPC Subscription: -An example of how to subscribe to a real-time Bitcoin data feed from ipfs-blockchain-watcher using the `Stream` RPC method is provided below +An example of how to subscribe to a real-time Bitcoin data feed from ipld-eth-server using the `Stream` RPC method is provided below ```go package main @@ -160,9 +160,9 @@ An example of how to subscribe to a real-time Bitcoin data feed from ipfs-blockc "github.com/ethereum/go-ethereum/rpc" "github.com/spf13/viper" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/client" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch" + "github.com/vulcanize/ipld-eth-server/pkg/btc" + "github.com/vulcanize/ipld-eth-server/pkg/client" + "github.com/vulcanize/ipld-eth-server/pkg/watch" ) config, _ := btc.NewBtcSubscriptionConfig() @@ -206,12 +206,12 @@ The .toml file being used to fill the Bitcoin subscription config would look som These configuration parameters are broken down as follows: -`btcSubscription.wsPath` is used to define the ipfs-blockchain-watcher ws url OR ipc endpoint to subscribe to +`btcSubscription.wsPath` is used to define the ipld-eth-server ws url OR ipc endpoint to subscribe to -`btcSubscription.historicalData` specifies whether or not ipfs-blockchain-watcher should look up historical data in its cache and -send that to the subscriber, if this is set to `false` then ipfs-blockchain-watcher only streams newly synced/incoming data +`btcSubscription.historicalData` specifies whether or not ipld-eth-server should look up historical data in its cache and +send that to the subscriber, if this is set to `false` then ipld-eth-server only streams newly synced/incoming data -`btcSubscription.historicalDataOnly` will tell ipfs-blockchain-watcher to only send historical data with the specified range and +`btcSubscription.historicalDataOnly` will tell ipld-eth-server to only send historical data with the specified range and not stream forward syncing data `btcSubscription.startingBlock` is the starting block number for the range to receive data in @@ -221,20 +221,20 @@ setting to 0 means the process will continue streaming indefinitely. `btcSubscription.headerFilter` has one sub-option: `off`. -- Setting `off` to true tells ipfs-blockchain-watcher to +- Setting `off` to true tells ipld-eth-server to not send any headers to the subscriber. - Additional header-filtering options will be added in the future. `btcSubscription.txFilter` has seven sub-options: `off`, `segwit`, `witnessHashes`, `indexes`, `pkScriptClass`, `multiSig`, and `addresses`. -- Setting `off` to true tells ipfs-blockchain-watcher to not send any transactions to the subscriber. -- Setting `segwit` to true tells ipfs-blockchain-watcher to only send segwit transactions. -- `witnessHashes` is a string array that can be filled with witness hash string; if it contains any hashes ipfs-blockchain-watcher will only send transactions that contain one of those hashes. -- `indexes` is an int64 array that can be filled with tx index numbers; if it contains any integers ipfs-blockchain-watcher will only send transactions at those indexes (e.g. `[0]` will send only coinbase transactions) -- `pkScriptClass` is an uint8 array that can be filled with pk script class numbers; if it contains any integers ipfs-blockchain-watcher will only send transactions that have at least one tx output with one of the specified pkscript classes; +- Setting `off` to true tells ipld-eth-server to not send any transactions to the subscriber. +- Setting `segwit` to true tells ipld-eth-server to only send segwit transactions. +- `witnessHashes` is a string array that can be filled with witness hash string; if it contains any hashes ipld-eth-server will only send transactions that contain one of those hashes. +- `indexes` is an int64 array that can be filled with tx index numbers; if it contains any integers ipld-eth-server will only send transactions at those indexes (e.g. `[0]` will send only coinbase transactions) +- `pkScriptClass` is an uint8 array that can be filled with pk script class numbers; if it contains any integers ipld-eth-server will only send transactions that have at least one tx output with one of the specified pkscript classes; possible class types are 0 through 8 as defined [here](https://github.com/btcsuite/btcd/blob/master/txscript/standard.go#L52). -- Setting `multisig` to true tells ipfs-blockchain-watcher to send only multi-sig transactions- to send only transaction that have at least one tx output that requires more than one signature to spend. -- `addresses` is a string array that can be filled with btc address strings; if it contains any addresses ipfs-blockchain-watcher will only send transactions that have at least one tx output with at least one of the provided addresses. +- Setting `multisig` to true tells ipld-eth-server to send only multi-sig transactions- to send only transaction that have at least one tx output that requires more than one signature to spend. +- `addresses` is a string array that can be filled with btc address strings; if it contains any addresses ipld-eth-server will only send transactions that have at least one tx output with at least one of the provided addresses. ### Native API Recapitulation: @@ -242,7 +242,7 @@ In addition to providing novel Postgraphile and RPC-Subscription endpoints, we a standard chain APIs. This will allow direct compatibility with software that already makes use of the standard interfaces. #### Ethereum JSON-RPC API -ipfs-blockchain-watcher currently faithfully recapitulates portions of the Ethereum JSON-RPC api standard. +ipld-eth-server currently faithfully recapitulates portions of the Ethereum JSON-RPC api standard. The currently supported endpoints include: `eth_blockNumber` diff --git a/documentation/ipfs.md b/documentation/ipfs.md deleted file mode 100644 index 3997c1d1..00000000 --- a/documentation/ipfs.md +++ /dev/null @@ -1,53 +0,0 @@ -### PG-IPFS configuration - -This doc walks through the steps to install IPFS and configure it to use Postgres as its backing datastore. - -1. Start by downloading and moving into the IPFS repo: - -`go get github.com/ipfs/go-ipfs` - -`cd $GOPATH/src/github.com/ipfs/go-ipfs` - -2. Add the [Postgres-supporting fork](https://github.com/vulcanize/go-ipfs) and switch over to it: - -`git remote add vulcanize https://github.com/vulcanize/go-ipfs.git` - -`git fetch vulcanize` - -`git checkout -b postgres_update tags/v0.4.22-alpha` - -3. Now install this fork of ipfs, first be sure to remove any previous installation: - -`make install` - -4. Check that is installed properly by running: - -`ipfs` - -You should see the CLI info/help output. - -5. Now we initialize with the `postgresds` profile. -If ipfs was previously initialized we will need to remove the old profile first. -We also need to provide env variables for the postgres connection: - -We can either set these manually, e.g. -```bash -export IPFS_PGHOST= -export IPFS_PGUSER= -export IPFS_PGDATABASE= -export IPFS_PGPORT= -export IPFS_PGPASSWORD= -``` - -And then run the ipfs command: - -`ipfs init --profile=postgresds` - -Or we can use the pre-made script at `GOPATH/src/github.com/ipfs/go-ipfs/misc/utility/ipfs_postgres.sh` -which has usage: - -`./ipfs_postgres.sh "` - -and will ask us to enter the password, avoiding storing it to an ENV variable. - -Once we have initialized ipfs, that is all we need to do with it- we do not need to run a daemon during the subsequent processes. \ No newline at end of file diff --git a/environments/example.toml b/environments/example.toml new file mode 100644 index 00000000..67d319b2 --- /dev/null +++ b/environments/example.toml @@ -0,0 +1,14 @@ +[database] + name = "vulcanize_public" # $DATABASE_NAME + hostname = "localhost" # $DATABASE_HOSTNAME + port = 5432 # $DATABASE_PORT + user = "postgres" # $DATABASE_USER + password = "" # $DATABASE_PASSWORD + +[log] + level = "info" # $LOGRUS_LEVEL + +[server] + ipcPath = "~/.vulcanize/vulcanize.ipc" # $SERVER_IPC_PATH + wsPath = "127.0.0.1:8081" # $SERVER_WS_PATH + httpPath = "127.0.0.1:8082" # $SERVER_HTTP_PATH \ No newline at end of file diff --git a/environments/superNodeETH.toml b/environments/superNodeETH.toml deleted file mode 100644 index 78bf9be4..00000000 --- a/environments/superNodeETH.toml +++ /dev/null @@ -1,49 +0,0 @@ -[database] - name = "vulcanize_testing" # $DATABASE_NAME - hostname = "localhost" # $DATABASE_HOSTNAME - port = 5432 # $DATABASE_PORT - user = "postgres" # $DATABASE_USER - password = "" # $DATABASE_PASSWORD - - [database.sync] - maxIdle = 1 - [database.backFill] - maxIdle = 5 - -[log] - level = "debug" # $LOGRUS_LEVEL - -[resync] - chain = "ethereum" # $RESYNC_CHAIN - type = "state" # $RESYNC_TYPE - start = 0 # $RESYNC_START - stop = 0 # $RESYNC_STOP - batchSize = 5 # $RESYNC_BATCH_SIZE - batchNumber = 5 # $RESYNC_BATCH_NUMBER - timeout = 300 # $HTTP_TIMEOUT - clearOldCache = true # $RESYNC_CLEAR_OLD_CACHE - resetValidation = true # $RESYNC_RESET_VALIDATION - -[watcher] - chain = "ethereum" # $SUPERNODE_CHAIN - server = false # $SUPERNODE_SERVER - ipcPath = "~/.vulcanize/vulcanize.ipc" # $SUPERNODE_IPC_PATH - wsPath = "127.0.0.1:8081" # $SUPERNODE_WS_PATH - httpPath = "127.0.0.1:8082" # $SUPERNODE_HTTP_PATH - sync = true # $SUPERNODE_SYNC - workers = 1 # $SUPERNODE_WORKERS - backFill = false # $SUPERNODE_BACKFILL - frequency = 15 # $SUPERNODE_FREQUENCY - batchSize = 5 # $SUPERNODE_BATCH_SIZE - batchNumber = 5 # $SUPERNODE_BATCH_NUMBER - timeout = 300 # $HTTP_TIMEOUT - validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL - -[ethereum] - wsPath = "127.0.0.1:8546" # $ETH_WS_PATH - httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH - nodeID = "arch1" # $ETH_NODE_ID - clientName = "Geth" # $ETH_CLIENT_NAME - genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK - networkID = "1" # $ETH_NETWORK_ID - chainID = "1" # $ETH_CHAIN_ID diff --git a/environments/superNodeSubscription.toml b/environments/superNodeSubscription.toml deleted file mode 100644 index 14a82379..00000000 --- a/environments/superNodeSubscription.toml +++ /dev/null @@ -1,30 +0,0 @@ -[watcher] - [watcher.ethSubscription] - historicalData = false - historicalDataOnly = false - startingBlock = 0 - endingBlock = 0 - wsPath = "ws://127.0.0.1:8080" - [watcher.ethSubscription.headerFilter] - off = false - uncles = false - [watcher.ethSubscription.txFilter] - off = false - src = [] - dst = [] - [watcher.ethSubscription.receiptFilter] - off = false - contracts = [] - topic0s = [] - topic1s = [] - topic2s = [] - topic3s = [] - [watcher.ethSubscription.stateFilter] - off = false - addresses = [] - intermediateNodes = false - [watcher.ethSubscription.storageFilter] - off = true - addresses = [] - storageKeys = [] - intermediateNodes = false \ No newline at end of file diff --git a/go.mod b/go.mod index bf3fde28..19a272db 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/vulcanize/ipfs-blockchain-watcher +module github.com/vulcanize/ipld-eth-server go 1.13 diff --git a/go.sum b/go.sum index 4b72d9a9..9982aa34 100644 --- a/go.sum +++ b/go.sum @@ -936,6 +936,7 @@ github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2 h1:ebv2bWocCmNKGnpHtRjS github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2/go.mod h1:7oC0Ni6dosMv5pxMigm6s0hN8g4haJMBnqmmo0D9YfQ= github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.5 h1:U+BqhjRLR22e9OEm8cgWC3Eq3bh8G6azjNpXeenfCG4= github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.5/go.mod h1:7oC0Ni6dosMv5pxMigm6s0hN8g4haJMBnqmmo0D9YfQ= +github.com/vulcanize/ipfs-blockchain-watcher v0.0.9 h1:pKL378Wtuhi8HPw3ZqV/3UBgJngUw1Ke4w5GKVM52pY= github.com/vulcanize/pg-ipfs-ethdb v0.0.1-alpha h1:Y7j0Hw1jgVVOg+eUGUr7OgH+gOBID0DwbsfZV1KoL7I= github.com/vulcanize/pg-ipfs-ethdb v0.0.1-alpha/go.mod h1:OuqE4r2LGWAtDVx3s1yaAzDcwy+LEAqrWaE1L8UfrGY= github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= diff --git a/pkg/builders/builders.go b/pkg/builders/builders.go deleted file mode 100644 index f40c526e..00000000 --- a/pkg/builders/builders.go +++ /dev/null @@ -1,168 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package builders - -import ( - "fmt" - "time" - - "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/rpcclient" - "github.com/ethereum/go-ethereum/rpc" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipld-eth-server/pkg/btc" - "github.com/vulcanize/ipld-eth-server/pkg/eth" - "github.com/vulcanize/ipld-eth-server/pkg/shared" -) - -// NewResponseFilterer constructs a ResponseFilterer for the provided chain type -func NewResponseFilterer(chain shared.ChainType) (shared.ResponseFilterer, error) { - switch chain { - case shared.Ethereum: - return eth.NewResponseFilterer(), nil - case shared.Bitcoin: - return btc.NewResponseFilterer(), nil - default: - return nil, fmt.Errorf("invalid chain %s for filterer constructor", chain.String()) - } -} - -// NewCIDRetriever constructs a CIDRetriever for the provided chain type -func NewCIDRetriever(chain shared.ChainType, db *postgres.DB) (shared.CIDRetriever, error) { - switch chain { - case shared.Ethereum: - return eth.NewCIDRetriever(db), nil - case shared.Bitcoin: - return btc.NewCIDRetriever(db), nil - default: - return nil, fmt.Errorf("invalid chain %s for retriever constructor", chain.String()) - } -} - -// NewPayloadStreamer constructs a PayloadStreamer for the provided chain type -func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error) { - switch chain { - case shared.Ethereum: - ethClient, ok := clientOrConfig.(*rpc.Client) - if !ok { - return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", &rpc.Client{}, clientOrConfig) - } - streamChan := make(chan shared.RawChainData, eth.PayloadChanBufferSize) - return eth.NewPayloadStreamer(ethClient), streamChan, nil - case shared.Bitcoin: - btcClientConn, ok := clientOrConfig.(*rpcclient.ConnConfig) - if !ok { - return nil, nil, fmt.Errorf("bitcoin payload streamer constructor expected client config type %T got %T", rpcclient.ConnConfig{}, clientOrConfig) - } - streamChan := make(chan shared.RawChainData, btc.PayloadChanBufferSize) - return btc.NewHTTPPayloadStreamer(btcClientConn), streamChan, nil - default: - return nil, nil, fmt.Errorf("invalid chain %s for streamer constructor", chain.String()) - } -} - -// NewPaylaodFetcher constructs a PayloadFetcher for the provided chain type -func NewPaylaodFetcher(chain shared.ChainType, client interface{}, timeout time.Duration) (shared.PayloadFetcher, error) { - switch chain { - case shared.Ethereum: - batchClient, ok := client.(*rpc.Client) - if !ok { - return nil, fmt.Errorf("ethereum payload fetcher constructor expected client type %T got %T", &rpc.Client{}, client) - } - return eth.NewPayloadFetcher(batchClient, timeout), nil - case shared.Bitcoin: - connConfig, ok := client.(*rpcclient.ConnConfig) - if !ok { - return nil, fmt.Errorf("bitcoin payload fetcher constructor expected client type %T got %T", &rpcclient.Client{}, client) - } - return btc.NewPayloadFetcher(connConfig) - default: - return nil, fmt.Errorf("invalid chain %s for payload fetcher constructor", chain.String()) - } -} - -// NewPayloadConverter constructs a PayloadConverter for the provided chain type -func NewPayloadConverter(chainType shared.ChainType, chainID uint64) (shared.PayloadConverter, error) { - switch chainType { - case shared.Ethereum: - chainConfig, err := eth.ChainConfig(chainID) - if err != nil { - return nil, err - } - return eth.NewPayloadConverter(chainConfig), nil - case shared.Bitcoin: - return btc.NewPayloadConverter(&chaincfg.MainNetParams), nil - default: - return nil, fmt.Errorf("invalid chain %s for converter constructor", chainType.String()) - } -} - -// NewIPLDFetcher constructs an IPLDFetcher for the provided chain type -func NewIPLDFetcher(chain shared.ChainType, db *postgres.DB) (shared.IPLDFetcher, error) { - switch chain { - case shared.Ethereum: - return eth.NewIPLDFetcher(db), nil - case shared.Bitcoin: - return btc.NewIPLDFetcher(db), nil - default: - return nil, fmt.Errorf("invalid chain %s for IPLD fetcher constructor", chain.String()) - } -} - -// NewIPLDPublisher constructs an IPLDPublisher for the provided chain type -func NewIPLDPublisher(chain shared.ChainType, db *postgres.DB) (shared.IPLDPublisher, error) { - switch chain { - case shared.Ethereum: - return eth.NewIPLDPublisher(db), nil - case shared.Bitcoin: - return btc.NewIPLDPublisher(db), nil - default: - return nil, fmt.Errorf("invalid chain %s for publisher constructor", chain.String()) - } -} - -// NewPublicAPI constructs a PublicAPI for the provided chain type -func NewPublicAPI(chain shared.ChainType, db *postgres.DB) (rpc.API, error) { - switch chain { - case shared.Ethereum: - backend, err := eth.NewEthBackend(db) - if err != nil { - return rpc.API{}, err - } - return rpc.API{ - Namespace: eth.APIName, - Version: eth.APIVersion, - Service: eth.NewPublicEthAPI(backend), - Public: true, - }, nil - default: - return rpc.API{}, fmt.Errorf("invalid chain %s for public api constructor", chain.String()) - } -} - -// NewCleaner constructs a Cleaner for the provided chain type -func NewCleaner(chain shared.ChainType, db *postgres.DB) (shared.Cleaner, error) { - switch chain { - case shared.Ethereum: - return eth.NewCleaner(db), nil - case shared.Bitcoin: - return btc.NewCleaner(db), nil - default: - return nil, fmt.Errorf("invalid chain %s for cleaner constructor", chain.String()) - } -} diff --git a/pkg/eth/cid_retriever.go b/pkg/eth/cid_retriever.go index e62ecffe..3d83f62e 100644 --- a/pkg/eth/cid_retriever.go +++ b/pkg/eth/cid_retriever.go @@ -17,7 +17,6 @@ package eth import ( - "database/sql" "fmt" "math/big" @@ -31,7 +30,6 @@ import ( "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipld-eth-server/pkg/shared" - "github.com/vulcanize/ipld-eth-server/utils" ) // Retriever interface for substituting mocks in tests @@ -446,56 +444,6 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageF return storageNodeCIDs, tx.Select(&storageNodeCIDs, pgStr, args...) } -// RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db -// it finds the union of heights where no data exists and where the times_validated is lower than the validation level -func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]eth2.DBGap, error) { - log.Info("searching for gaps in the eth ipfs watcher database") - startingBlock, err := ecr.RetrieveFirstBlockNumber() - if err != nil { - return nil, fmt.Errorf("eth CIDRetriever RetrieveFirstBlockNumber error: %v", err) - } - var initialGap []eth2.DBGap - if startingBlock != 0 { - stop := uint64(startingBlock - 1) - log.Infof("found gap at the beginning of the eth sync from 0 to %d", stop) - initialGap = []eth2.DBGap{{ - Start: 0, - Stop: stop, - }} - } - - pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM eth.header_cids - LEFT JOIN eth.header_cids r on eth.header_cids.block_number = r.block_number - 1 - LEFT JOIN eth.header_cids fr on eth.header_cids.block_number < fr.block_number - WHERE r.block_number is NULL and fr.block_number IS NOT NULL - GROUP BY header_cids.block_number, r.block_number` - results := make([]struct { - Start uint64 `db:"start"` - Stop uint64 `db:"stop"` - }, 0) - if err := ecr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows { - return nil, err - } - emptyGaps := make([]eth2.DBGap, len(results)) - for i, res := range results { - emptyGaps[i] = eth2.DBGap{ - Start: res.Start, - Stop: res.Stop, - } - } - - // Find sections of blocks where we are below the validation level - // There will be no overlap between these "gaps" and the ones above - pgStr = `SELECT block_number FROM eth.header_cids - WHERE times_validated < $1 - ORDER BY block_number` - var heights []uint64 - if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows { - return nil, err - } - return append(append(initialGap, emptyGaps...), utils.MissingHeightsToGaps(heights)...), nil -} - // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (eth2.HeaderModel, []eth2.UncleModel, []eth2.TxModel, []eth2.ReceiptModel, error) { log.Debug("retrieving block cids for block hash ", blockHash.String()) diff --git a/pkg/eth/mocks/batch_client.go b/pkg/eth/mocks/batch_client.go deleted file mode 100644 index a4b02729..00000000 --- a/pkg/eth/mocks/batch_client.go +++ /dev/null @@ -1,86 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package mocks - -import ( - "context" - "encoding/json" - "errors" - - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/statediff" -) - -// BackFillerClient is a mock client for use in backfiller tests -type BackFillerClient struct { - MappedStateDiffAt map[uint64][]byte -} - -// SetReturnDiffAt method to set what statediffs the mock client returns -func (mc *BackFillerClient) SetReturnDiffAt(height uint64, diffPayload statediff.Payload) error { - if mc.MappedStateDiffAt == nil { - mc.MappedStateDiffAt = make(map[uint64][]byte) - } - by, err := json.Marshal(diffPayload) - if err != nil { - return err - } - mc.MappedStateDiffAt[height] = by - return nil -} - -// BatchCall mockClient method to simulate batch call to geth -func (mc *BackFillerClient) BatchCall(batch []rpc.BatchElem) error { - if mc.MappedStateDiffAt == nil { - return errors.New("mockclient needs to be initialized with statediff payloads and errors") - } - for _, batchElem := range batch { - if len(batchElem.Args) < 1 { - return errors.New("expected batch elem to contain an argument(s)") - } - blockHeight, ok := batchElem.Args[0].(uint64) - if !ok { - return errors.New("expected first batch elem argument to be a uint64") - } - err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result) - if err != nil { - return err - } - } - return nil -} - -// BatchCallContext mockClient method to simulate batch call to geth -func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error { - if mc.MappedStateDiffAt == nil { - return errors.New("mockclient needs to be initialized with statediff payloads and errors") - } - for _, batchElem := range batch { - if len(batchElem.Args) < 1 { - return errors.New("expected batch elem to contain an argument(s)") - } - blockHeight, ok := batchElem.Args[0].(uint64) - if !ok { - return errors.New("expected batch elem first argument to be a uint64") - } - err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result) - if err != nil { - return err - } - } - return nil -} diff --git a/pkg/eth/mocks/converter.go b/pkg/eth/mocks/converter.go deleted file mode 100644 index 50158a4e..00000000 --- a/pkg/eth/mocks/converter.go +++ /dev/null @@ -1,66 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package mocks - -import ( - "fmt" - - "github.com/ethereum/go-ethereum/statediff" - - "github.com/vulcanize/ipld-eth-server/pkg/eth" - "github.com/vulcanize/ipld-eth-server/pkg/shared" -) - -// PayloadConverter is the underlying struct for the Converter interface -type PayloadConverter struct { - PassedStatediffPayload statediff.Payload - ReturnIPLDPayload eth.ConvertedPayload - ReturnErr error -} - -// Convert method is used to convert a geth statediff.Payload to a IPLDPayload -func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) { - stateDiffPayload, ok := payload.(statediff.Payload) - if !ok { - return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload) - } - pc.PassedStatediffPayload = stateDiffPayload - return pc.ReturnIPLDPayload, pc.ReturnErr -} - -// IterativePayloadConverter is the underlying struct for the Converter interface -type IterativePayloadConverter struct { - PassedStatediffPayload []statediff.Payload - ReturnIPLDPayload []eth.ConvertedPayload - ReturnErr error - iteration int -} - -// Convert method is used to convert a geth statediff.Payload to a IPLDPayload -func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) { - stateDiffPayload, ok := payload.(statediff.Payload) - if !ok { - return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload) - } - pc.PassedStatediffPayload = append(pc.PassedStatediffPayload, stateDiffPayload) - if len(pc.ReturnIPLDPayload) < pc.iteration+1 { - return nil, fmt.Errorf("IterativePayloadConverter does not have a payload to return at iteration %d", pc.iteration) - } - returnPayload := pc.ReturnIPLDPayload[pc.iteration] - pc.iteration++ - return returnPayload, pc.ReturnErr -} diff --git a/pkg/eth/mocks/indexer.go b/pkg/eth/mocks/indexer.go deleted file mode 100644 index cee84767..00000000 --- a/pkg/eth/mocks/indexer.go +++ /dev/null @@ -1,41 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package mocks - -import ( - "fmt" - - "github.com/vulcanize/ipld-eth-server/pkg/shared" - - "github.com/vulcanize/ipld-eth-server/pkg/eth" -) - -// CIDIndexer is the underlying struct for the Indexer interface -type CIDIndexer struct { - PassedCIDPayload []*eth.CIDPayload - ReturnErr error -} - -// Index indexes a cidPayload in Postgres -func (repo *CIDIndexer) Index(cids shared.CIDsForIndexing) error { - cidPayload, ok := cids.(*eth.CIDPayload) - if !ok { - return fmt.Errorf("index expected cids type %T got %T", ð.CIDPayload{}, cids) - } - repo.PassedCIDPayload = append(repo.PassedCIDPayload, cidPayload) - return repo.ReturnErr -} diff --git a/pkg/eth/mocks/publisher.go b/pkg/eth/mocks/publisher.go deleted file mode 100644 index 5758b277..00000000 --- a/pkg/eth/mocks/publisher.go +++ /dev/null @@ -1,61 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package mocks - -import ( - "fmt" - - "github.com/vulcanize/ipld-eth-server/pkg/shared" - - "github.com/vulcanize/ipld-eth-server/pkg/eth" -) - -// IPLDPublisher is the underlying struct for the Publisher interface -type IPLDPublisher struct { - PassedIPLDPayload eth.ConvertedPayload - ReturnCIDPayload *eth.CIDPayload - ReturnErr error -} - -// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) error { - ipldPayload, ok := payload.(eth.ConvertedPayload) - if !ok { - return fmt.Errorf("publish expected payload type %T got %T", ð.ConvertedPayload{}, payload) - } - pub.PassedIPLDPayload = ipldPayload - return pub.ReturnErr -} - -// IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing -type IterativeIPLDPublisher struct { - PassedIPLDPayload []eth.ConvertedPayload - ReturnCIDPayload []*eth.CIDPayload - ReturnErr error - iteration int -} - -// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IterativeIPLDPublisher) Publish(payload shared.ConvertedData) error { - ipldPayload, ok := payload.(eth.ConvertedPayload) - if !ok { - return fmt.Errorf("publish expected payload type %T got %T", ð.ConvertedPayload{}, payload) - } - pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, ipldPayload) - pub.iteration++ - return pub.ReturnErr -} diff --git a/pkg/eth/mocks/stream_client.go b/pkg/eth/mocks/stream_client.go deleted file mode 100644 index 6ae821b4..00000000 --- a/pkg/eth/mocks/stream_client.go +++ /dev/null @@ -1,44 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package mocks - -import ( - "context" - - "github.com/ethereum/go-ethereum/rpc" -) - -type StreamClient struct { - passedContext context.Context - passedResult interface{} - passedNamespace string - passedPayloadChan interface{} - passedSubscribeArgs []interface{} -} - -func (client *StreamClient) Subscribe(ctx context.Context, namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) { - client.passedNamespace = namespace - client.passedPayloadChan = payloadChan - client.passedContext = ctx - - for _, arg := range args { - client.passedSubscribeArgs = append(client.passedSubscribeArgs, arg) - } - - subscription := rpc.ClientSubscription{} - return &subscription, nil -} diff --git a/pkg/serve/config.go b/pkg/serve/config.go index 5a525c30..0a2a0197 100644 --- a/pkg/serve/config.go +++ b/pkg/serve/config.go @@ -46,7 +46,6 @@ type Config struct { WSEndpoint string HTTPEndpoint string IPCEndpoint string - NodeInfo node.Info } // NewConfig is used to initialize a watcher config from a .toml file @@ -80,19 +79,12 @@ func NewConfig() (*Config, error) { } c.HTTPEndpoint = httpPath overrideDBConnConfig(&c.DBConfig) - serveDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + serveDB := utils.LoadPostgres(c.DBConfig, postgres.Info{}) c.DB = &serveDB return c, nil } -type mode string - -var ( - Sync mode = "sync" - Serve mode = "serve" -) - func overrideDBConnConfig(con *postgres.Config) { viper.BindEnv("database.server.maxIdle", SERVER_MAX_IDLE_CONNECTIONS) viper.BindEnv("database.server.maxOpen", SERVER_MAX_OPEN_CONNECTIONS) diff --git a/pkg/serve/service.go b/pkg/serve/service.go index adaf027e..3226d6e2 100644 --- a/pkg/serve/service.go +++ b/pkg/serve/service.go @@ -52,8 +52,6 @@ type Server interface { Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings) // Method to unsubscribe from the service Unsubscribe(id rpc.ID) - // Method to access the node info for the service - Node() *node.Info // Method to access chain type Chain() shared.ChainType } @@ -74,8 +72,6 @@ type Service struct { Subscriptions map[common.Hash]map[rpc.ID]Subscription // A mapping of subscription params hash to the corresponding subscription params SubscriptionTypes map[common.Hash]eth.SubscriptionSettings - // Info for the Geth node that this watcher is working with - NodeInfo *node.Info // Underlying db db *postgres.DB // wg for syncing serve processes @@ -355,11 +351,6 @@ func (sap *Service) Stop() error { return nil } -// Node returns the node info for this service -func (sap *Service) Node() *node.Info { - return sap.NodeInfo -} - // Chain returns the chain type for this service func (sap *Service) Chain() shared.ChainType { return shared.Ethereum diff --git a/pkg/shared/mocks/payload_fetcher.go b/pkg/shared/mocks/payload_fetcher.go deleted file mode 100644 index 218cd923..00000000 --- a/pkg/shared/mocks/payload_fetcher.go +++ /dev/null @@ -1,50 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package mocks - -import ( - "errors" - "sync/atomic" - - "github.com/vulcanize/ipld-eth-server/pkg/shared" -) - -// PayloadFetcher mock for tests -type PayloadFetcher struct { - PayloadsToReturn map[uint64]shared.RawChainData - FetchErrs map[uint64]error - CalledAtBlockHeights [][]uint64 - CalledTimes int64 -} - -// FetchAt mock method -func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { - if fetcher.PayloadsToReturn == nil { - return nil, errors.New("mock StateDiffFetcher needs to be initialized with payloads to return") - } - atomic.AddInt64(&fetcher.CalledTimes, 1) // thread-safe increment - fetcher.CalledAtBlockHeights = append(fetcher.CalledAtBlockHeights, blockHeights) - results := make([]shared.RawChainData, 0, len(blockHeights)) - for _, height := range blockHeights { - results = append(results, fetcher.PayloadsToReturn[height]) - err, ok := fetcher.FetchErrs[height] - if ok && err != nil { - return nil, err - } - } - return results, nil -} diff --git a/pkg/shared/mocks/retriever.go b/pkg/shared/mocks/retriever.go deleted file mode 100644 index ba3843f9..00000000 --- a/pkg/shared/mocks/retriever.go +++ /dev/null @@ -1,64 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package mocks - -import ( - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipld-eth-server/pkg/shared" -) - -// CIDRetriever is a mock CID retriever for use in tests -type CIDRetriever struct { - GapsToRetrieve []shared.Gap - GapsToRetrieveErr error - CalledTimes int - FirstBlockNumberToReturn int64 - RetrieveFirstBlockNumberErr error -} - -// RetrieveCIDs mock method -func (*CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) { - panic("implement me") -} - -// RetrieveLastBlockNumber mock method -func (*CIDRetriever) RetrieveLastBlockNumber() (int64, error) { - panic("implement me") -} - -// RetrieveFirstBlockNumber mock method -func (mcr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) { - return mcr.FirstBlockNumberToReturn, mcr.RetrieveFirstBlockNumberErr -} - -// RetrieveGapsInData mock method -func (mcr *CIDRetriever) RetrieveGapsInData(int) ([]shared.Gap, error) { - mcr.CalledTimes++ - return mcr.GapsToRetrieve, mcr.GapsToRetrieveErr -} - -// SetGapsToRetrieve mock method -func (mcr *CIDRetriever) SetGapsToRetrieve(gaps []shared.Gap) { - if mcr.GapsToRetrieve == nil { - mcr.GapsToRetrieve = make([]shared.Gap, 0) - } - mcr.GapsToRetrieve = append(mcr.GapsToRetrieve, gaps...) -} - -func (mcr *CIDRetriever) Database() *postgres.DB { - panic("implement me") -} diff --git a/pkg/shared/mocks/streamer.go b/pkg/shared/mocks/streamer.go deleted file mode 100644 index daf683eb..00000000 --- a/pkg/shared/mocks/streamer.go +++ /dev/null @@ -1,43 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package mocks - -import ( - "github.com/ethereum/go-ethereum/rpc" - "github.com/vulcanize/ipld-eth-server/pkg/shared" -) - -// PayloadStreamer mock struct -type PayloadStreamer struct { - PassedPayloadChan chan shared.RawChainData - ReturnSub *rpc.ClientSubscription - ReturnErr error - StreamPayloads []shared.RawChainData -} - -// Stream mock method -func (sds *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) { - sds.PassedPayloadChan = payloadChan - - go func() { - for _, payload := range sds.StreamPayloads { - sds.PassedPayloadChan <- payload - } - }() - - return sds.ReturnSub, sds.ReturnErr -} diff --git a/test_config/test_config.go b/test_config/test_config.go index 7236422e..88678dd9 100644 --- a/test_config/test_config.go +++ b/test_config/test_config.go @@ -22,10 +22,10 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/viper" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/config" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" ) -var DBConfig config.Database +var DBConfig postgres.Config func init() { setTestConfig() @@ -53,7 +53,7 @@ func setTestConfig() { port := vip.GetInt("database.port") name := vip.GetString("database.name") - DBConfig = config.Database{ + DBConfig = postgres.Config{ Hostname: hn, Name: name, Port: port, diff --git a/utils/utilities_suite_test.go b/utils/utilities_suite_test.go deleted file mode 100644 index 5095a048..00000000 --- a/utils/utilities_suite_test.go +++ /dev/null @@ -1,36 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package utils_test - -import ( - "io/ioutil" - "testing" - - "github.com/sirupsen/logrus" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -func TestShared(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Shared Utilities Suite") -} - -var _ = BeforeSuite(func() { - logrus.SetOutput(ioutil.Discard) -}) diff --git a/utils/utils.go b/utils/utils.go deleted file mode 100644 index 56771295..00000000 --- a/utils/utils.go +++ /dev/null @@ -1,89 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package utils - -import ( - "errors" - - "github.com/sirupsen/logrus" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/config" - "github.com/vulcanize/ipld-eth-server/pkg/node" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipld-eth-server/pkg/shared" -) - -func LoadPostgres(database config.Database, node node.Node) postgres.DB { - db, err := postgres.NewDB(database, node) - if err != nil { - logrus.Fatal("Error loading postgres: ", err) - } - return *db -} - -// GetBlockHeightBins splits a block range up into bins of block heights of the given batch size -func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint64, error) { - if endingBlock < startingBlock { - return nil, errors.New("backfill: ending block number needs to be greater than starting block number") - } - if batchSize == 0 { - return nil, errors.New("backfill: batchsize needs to be greater than zero") - } - length := endingBlock - startingBlock + 1 - numberOfBins := length / batchSize - if length%batchSize != 0 { - numberOfBins++ - } - blockRangeBins := make([][]uint64, numberOfBins) - for i := range blockRangeBins { - nextBinStart := startingBlock + batchSize - blockRange := make([]uint64, 0, nextBinStart-startingBlock+1) - for j := startingBlock; j < nextBinStart && j <= endingBlock; j++ { - blockRange = append(blockRange, j) - } - startingBlock = nextBinStart - blockRangeBins[i] = blockRange - } - return blockRangeBins, nil -} - -// MissingHeightsToGaps returns a slice of gaps from a slice of missing block heights -func MissingHeightsToGaps(heights []uint64) []shared.Gap { - if len(heights) == 0 { - return nil - } - validationGaps := make([]shared.Gap, 0) - start := heights[0] - lastHeight := start - for i, height := range heights[1:] { - if height != lastHeight+1 { - validationGaps = append(validationGaps, shared.Gap{ - Start: start, - Stop: lastHeight, - }) - start = height - } - if i+2 == len(heights) { - validationGaps = append(validationGaps, shared.Gap{ - Start: start, - Stop: height, - }) - } - lastHeight = height - } - return validationGaps -} diff --git a/utils/utils_test.go b/utils/utils_test.go deleted file mode 100644 index b36ac66c..00000000 --- a/utils/utils_test.go +++ /dev/null @@ -1,74 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package utils_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/vulcanize/ipld-eth-server/utils" -) - -var _ = Describe("GetBlockHeightBins", func() { - It("splits a block range up into bins", func() { - var startingBlock uint64 = 1 - var endingBlock uint64 = 10101 - var batchSize uint64 = 100 - blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize) - Expect(err).ToNot(HaveOccurred()) - Expect(len(blockRangeBins)).To(Equal(102)) - Expect(blockRangeBins[101]).To(Equal([]uint64{10101})) - - startingBlock = 101 - endingBlock = 10100 - batchSize = 100 - lastBin := make([]uint64, 0) - for i := 10001; i <= 10100; i++ { - lastBin = append(lastBin, uint64(i)) - } - blockRangeBins, err = utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize) - Expect(err).ToNot(HaveOccurred()) - Expect(len(blockRangeBins)).To(Equal(100)) - Expect(blockRangeBins[99]).To(Equal(lastBin)) - - startingBlock = 1 - endingBlock = 1 - batchSize = 100 - blockRangeBins, err = utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize) - Expect(err).ToNot(HaveOccurred()) - Expect(len(blockRangeBins)).To(Equal(1)) - Expect(blockRangeBins[0]).To(Equal([]uint64{1})) - }) - - It("throws an error if the starting block is higher than the ending block", func() { - var startingBlock uint64 = 10102 - var endingBlock uint64 = 10101 - var batchSize uint64 = 100 - _, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("ending block number needs to be greater than starting block number")) - }) - - It("throws an error if the batch size is zero", func() { - var startingBlock uint64 = 1 - var endingBlock uint64 = 10101 - var batchSize uint64 = 0 - _, err := utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("batchsize needs to be greater than zero")) - }) -})