From 3764662fe392aa2453978781a80bb2230a6cffb9 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Fri, 17 Apr 2020 16:59:25 -0500 Subject: [PATCH 1/7] updates to documentation for super node --- cmd/streamEthSubscribe.go | 2 +- documentation/super_node/apis.md | 262 ++++++++++++++++++++++ documentation/super_node/architecture.md | 150 +++++++++++-- documentation/super_node/resync.md | 68 ++++++ documentation/super_node/setup.md | 243 +++++++------------- documentation/super_node/subscription.md | 88 -------- documentation/super_node/watcher.md | 16 ++ environments/superNodeBTC.toml | 2 +- environments/superNodeETH.toml | 2 +- pkg/ipfs/interfaces.go | 26 +++ pkg/super_node/btc/publisher.go | 6 +- pkg/super_node/btc/subscription_config.go | 2 +- pkg/super_node/eth/publisher.go | 14 +- pkg/super_node/shared/intefaces.go | 7 - pkg/watcher/config.go | 2 +- version/version.go | 6 +- 16 files changed, 601 insertions(+), 295 deletions(-) create mode 100644 documentation/super_node/apis.md create mode 100644 documentation/super_node/resync.md delete mode 100644 documentation/super_node/subscription.md create mode 100644 documentation/super_node/watcher.md create mode 100644 pkg/ipfs/interfaces.go diff --git a/cmd/streamEthSubscribe.go b/cmd/streamEthSubscribe.go index e46aee52..17f85767 100644 --- a/cmd/streamEthSubscribe.go +++ b/cmd/streamEthSubscribe.go @@ -168,7 +168,7 @@ func streamEthSubscription() { } func getRPCClient() core.RPCClient { - vulcPath := viper.GetString("superNode.ethSubscription.path") + vulcPath := viper.GetString("superNode.ethSubscription.wsPath") if vulcPath == "" { vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided } diff --git a/documentation/super_node/apis.md b/documentation/super_node/apis.md new file mode 100644 index 00000000..9286b29c --- /dev/null +++ b/documentation/super_node/apis.md @@ -0,0 +1,262 @@ +## VulcanizeDB Super Node APIs +The super node exposes a number of different APIs for remote 
access to the underlying DB.
+
+
+### Table of Contents
+1. [Postgraphile](#postgraphile)
+1. [RPC Subscription Interface](#rpc-subscription-interface)
+1. [Native API Recapitulation](#native-api-recapitulation)
+
+
+### Postgraphile
+The super node processes all data into a Postgres DB- using PG-IPFS this includes all of the IPLD objects.
+[Postgraphile](https://www.graphile.org/postgraphile/) can be used to expose GraphQL endpoints for the Postgres tables.
+
+e.g.
+
+`postgraphile --plugins @graphile/pg-pubsub --subscriptions --simple-subscriptions -c postgres://localhost:5432/vulcanize_public?sslmode=disable -s public,btc,eth -a -j`
+
+
+This will stand up a Postgraphile server on the public, eth, and btc schemas- exposing GraphQL endpoints for all of the tables contained under those schemas.
+All of their data can then be queried with standard [GraphQL](https://graphql.org) queries.
+
+
+### RPC Subscription Interface
+A direct, real-time subscription to the data being processed by the super node can be established over WS or IPC through the [Stream](../../pkg/super_node/api.go#L53) RPC method.
+This method is not chain-specific and each chain-type supports it; it is accessed under the "vdb" namespace rather than a chain-specific namespace. An interface for
+subscribing to this endpoint is provided [here](../../libraries/shared/streamer/super_node_streamer.go).
+
+When subscribing to this endpoint, the subscriber provides a set of RLP-encoded subscription parameters. These parameters will be chain-specific, and are used
+by the super node to filter and return a requested subset of chain data to the subscriber. (e.g. [BTC](../../pkg/super_node/btc/subscription_config.go), [ETH](../../pkg/super_node/eth/subscription_config.go)). 
+ +#### Ethereum RPC Subscription +An example of how to subscribe to a real-time Ethereum data feed from the super node using the `Stream` RPC method is provided below + +```go + package main + + import ( + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/spf13/viper" + + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/eth/client" + "github.com/vulcanize/vulcanizedb/pkg/super_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" + ) + + config, _ := eth.NewEthSubscriptionConfig() + rlpConfig, _ := rlp.EncodeToBytes(config) + vulcPath := viper.GetString("superNode.ethSubscription.path") + rawRPCClient, _ := rpc.Dial(vulcPath) + rpcClient := client.NewRPCClient(rawRPCClient, vulcPath) + stream := streamer.NewSuperNodeStreamer(rpcClient) + payloadChan := make(chan super_node.SubscriptionPayload, 20000) + subscription, _ := stream.Stream(payloadChan, rlpConfig) + for { + select { + case payload := <- payloadChan: + // do something with the subscription payload + case err := <- subscription.Err(): + // do something with the subscription error + } + } +``` + +The .toml file being used to fill the Ethereum subscription config would look something like this: + +```toml +[superNode] + [superNode.ethSubscription] + historicalData = false + historicalDataOnly = false + startingBlock = 0 + endingBlock = 0 + wsPath = "ws://127.0.0.1:8080" + [superNode.ethSubscription.headerFilter] + off = false + uncles = false + [superNode.ethSubscription.txFilter] + off = false + src = [] + dst = [] + [superNode.ethSubscription.receiptFilter] + off = false + contracts = [] + topic0s = [] + topic1s = [] + topic2s = [] + topic3s = [] + [superNode.ethSubscription.stateFilter] + off = false + addresses = [] + intermediateNodes = false + [superNode.ethSubscription.storageFilter] + off = true + addresses = [] + storageKeys = [] + intermediateNodes = false +``` + +These configuration parameters 
are broken down as follows: + +`ethSubscription.wsPath` is used to define the SuperNode ws url OR ipc endpoint we subscribe to + +`ethSubscription.historicalData` specifies whether or not the super node should look up historical data in its cache and +send that to the subscriber, if this is set to `false` then the super node only streams newly synced/incoming data + +`ethSubscription.historicalDataOnly` will tell the super node to only send historical data with the specified range and +not stream forward syncing data + +`ethSubscription.startingBlock` is the starting block number for the range we want to receive data in + +`ethSubscription.endingBlock` is the ending block number for the range we want to receive data in; +setting to 0 means there is no end/we will continue streaming indefinitely. + +`ethSubscription.headerFilter` has two sub-options: `off` and `uncles`. + +- Setting `off` to true tells the super node to not send any headers to the subscriber +- setting `uncles` to true tells the super node to send uncles in addition to normal headers. + +`ethSubscription.txFilter` has three sub-options: `off`, `src`, and `dst`. + +- Setting `off` to true tells the super node to not send any transactions to the subscriber +- `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for, +if they have any addresses then the super node will only send transactions that were sent or received by the addresses contained +in `src` and `dst`, respectively. + +`ethSubscription.receiptFilter` has four sub-options: `off`, `topics`, `contracts` and `matchTxs`. + +- Setting `off` to true tells the super node to not send any receipts to the subscriber +- `topic0s` is a string array which can be filled with event topics we want to filter for, +if it has any topics then the super node will only send receipts that contain logs which have that topic0. 
+- `contracts` is a string array which can be filled with contract addresses we want to filter for, if it contains any contract addresses the super node will +only send receipts that correspond to one of those contracts. +- `matchTrxs` is a bool which when set to true any receipts that correspond to filtered for transactions will be sent by the super node, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters. + +`ethSubscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. + +- Setting `off` to true tells the super node to not send any state data to the subscriber +- `addresses` is a string array which can be filled with ETH addresses we want to filter state for, +if it has any addresses then the super node will only send state leafs (accounts) corresponding to those account addresses. +- By default the super node only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. + +`ethSubscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. + +- Setting `off` to true tells the super node to not send any storage data to the subscriber +- `addresses` is a string array which can be filled with ETH addresses we want to filter storage for, +if it has any addresses then the super node will only send storage nodes from the storage tries at those state addresses. +- `storageKeys` is another string array that can be filled with storage keys we want to filter storage data for. It is important to note that the storage keys need to be the actual keccak256 hashes, whereas +the addresses in the `addresses` fields are pre-hashed ETH addresses. +- By default the super node only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. 
+ +### Bitcoin RPC Subscription: +An example of how to subscribe to a real-time Bitcoin data feed from the super node using the `Stream` RPC method is provided below + +```go + package main + + import ( + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/spf13/viper" + + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/eth/client" + "github.com/vulcanize/vulcanizedb/pkg/super_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node/btc" + ) + + config, _ := btc.NewBtcSubscriptionConfig() + rlpConfig, _ := rlp.EncodeToBytes(config) + vulcPath := viper.GetString("superNode.btcSubscription.path") + rawRPCClient, _ := rpc.Dial(vulcPath) + rpcClient := client.NewRPCClient(rawRPCClient, vulcPath) + stream := streamer.NewSuperNodeStreamer(rpcClient) + payloadChan := make(chan super_node.SubscriptionPayload, 20000) + subscription, _ := stream.Stream(payloadChan, rlpConfig) + for { + select { + case payload := <- payloadChan: + // do something with the subscription payload + case err := <- subscription.Err(): + // do something with the subscription error + } + } +``` + +The .toml file being used to fill the Bitcoin subscription config would look something like this: + +```toml +[superNode] + [superNode.btcSubscription] + historicalData = false + historicalDataOnly = false + startingBlock = 0 + endingBlock = 0 + wsPath = "ws://127.0.0.1:8080" + [superNode.btcSubscription.headerFilter] + off = false + [superNode.btcSubscription.txFilter] + off = false + segwit = false + witnessHashes = [] + indexes = [] + pkScriptClass = [] + multiSig = false + addresses = [] +``` + +These configuration parameters are broken down as follows: + +`btcSubscription.wsPath` is used to define the SuperNode ws url OR ipc endpoint we subscribe to + +`btcSubscription.historicalData` specifies whether or not the super node should look up historical data in its cache and +send that to the subscriber, if this is 
set to `false` then the super node only streams newly synced/incoming data + +`btcSubscription.historicalDataOnly` will tell the super node to only send historical data with the specified range and +not stream forward syncing data + +`btcSubscription.startingBlock` is the starting block number for the range we want to receive data in + +`btcSubscription.endingBlock` is the ending block number for the range we want to receive data in; +setting to 0 means there is no end/we will continue streaming indefinitely. + +`btcSubscription.headerFilter` has one sub-option: `off`. + +- Setting `off` to true tells the super node to +not send any headers to the subscriber. +- Additional header-filtering options will be added in the future. + +`btcSubscription.txFilter` has seven sub-options: `off`, `segwit`, `witnessHashes`, `indexes`, `pkScriptClass`, `multiSig`, and `addresses`. + +- Setting `off` to true tells the super node to not send any transactions to the subscriber. +- Setting `segwit` to true tells the super node to only send segwit transactions. +- `witnessHashes` is a string array that can be filled with witness hash string; if it contains any hashes the super node will only send transactions that contain one of those hashes. +- `indexes` is an int64 array that can be filled with tx index numbers; if it contains any integers the super node will only send transactions at those indexes (e.g. `[0]` will send only coinbase transactions) +- `pkScriptClass` is an uint8 array that can be filled with pk script class numbers; if it contains any integers the super node will only send transactions that have at least one tx output with one of the specified pkscript classes; +possible class types are 0 through 8 as defined [here](https://github.com/btcsuite/btcd/blob/master/txscript/standard.go#L52). 
+- Setting `multisig` to true tells the super node to send only multi-sig transactions- to send only transaction that have at least one tx output that requires more than one signature to spend. +- `addresses` is a string array that can be filled with btc address strings; if it contains any addresses the super node will only send transactions that have at least one tx output with at least one of the provided addresses. + + +### Native API Recapitulation: +In addition to providing novel Postgraphile and RPC-Subscription endpoints, we are working towards complete recapitulation of the +standard chain APIs. This will allow direct compatibility with software that already makes use of the standard interfaces. + +#### Ethereum JSON-RPC API +The super node currently faithfully recapitulates portions of the Ethereum JSON-RPC api standard. + +The currently supported endpoints include: +`eth_blockNumber` +`eth_getLogs` +`eth_getHeaderByNumber` +`eth_getBlockByNumber` +`eth_getBlockByHash` +`eth_getTransactionByHash` + +Additional endpoints will be added in the near future, with the immediate goal of recapitulating the entire "eth_" set of endpoints. + +#### Bitcoin JSON-RPC API: +In the near future, the standard Bitcoin JSON-RPC interfaces will be implemented. diff --git a/documentation/super_node/architecture.md b/documentation/super_node/architecture.md index 24660221..6cbbdbb0 100644 --- a/documentation/super_node/architecture.md +++ b/documentation/super_node/architecture.md @@ -1,16 +1,134 @@ -These are the components of a VulcanizeDB Watcher: -* Data Fetcher/Streamer sources: - * go-ethereum - * bitcoind - * btcd - * IPFS -* Transformers contain: - * converter - * publisher - * indexer -* Endpoints contain: - * api - * backend - * filterer - * retriever - * ipld_server +# VulcanizeDB Super Node Architecture +The VulcanizeDB super node is a collection of interfaces that are used to extract, process, and store in Postgres-IPFS +all chain data. 
The raw data indexed by the super node serves as the basis for more specific watchers and applications.
+
+Currently the service supports complete processing of all Bitcoin and Ethereum data.
+
+## Table of Contents
+1. [Processes](#processes)
+1. [Command](#command)
+1. [Configuration](#config)
+1. [Database](#database)
+1. [APIs](#apis)
+1. [Resync](#resync)
+1. [IPFS Considerations](#ipfs-considerations)
+
+## Processes
+The [super node service](../../pkg/super_node/service.go#L61) is comprised of the following interfaces:
+
+* [Payload Fetcher](../../pkg/super_node/shared/interfaces.go#L29): Fetches raw chain data from a half-duplex endpoint (HTTP/IPC), used for historical data fetching. ([BTC](../../pkg/super_node/btc/payload_fetcher.go), [ETH](../../pkg/super_node/eth/payload_fetcher.go)).
+* [Payload Streamer](../../pkg/super_node/shared/interfaces.go#L24): Streams raw chain data from a full-duplex endpoint (WebSocket/IPC), used for syncing data at the head of the chain in real-time. ([BTC](../../pkg/super_node/btc/http_streamer.go), [ETH](../../pkg/super_node/eth/streamer.go)).
+* [Payload Converter](../../pkg/super_node/shared/interfaces.go#L34): Converts raw chain data to an intermediary form prepared for IPFS publishing. ([BTC](../../pkg/super_node/btc/converter.go), [ETH](../../pkg/super_node/eth/converter.go)).
+* [IPLD Publisher](../../pkg/super_node/shared/interfaces.go#L39): Publishes the converted data to IPFS, returning their CIDs and associated metadata for indexing. ([BTC](../../pkg/super_node/btc/publisher.go), [ETH](../../pkg/super_node/eth/publisher.go)).
+* [CID Indexer](../../pkg/super_node/shared/interfaces.go#L44): Indexes CIDs in Postgres with their associated metadata. This metadata is chain specific and selected based on utility. ([BTC](../../pkg/super_node/btc/indexer.go), [ETH](../../pkg/super_node/eth/indexer.go)). 
+* [CID Retriever](../../pkg/super_node/shared/interfaces.go#L54): Retrieves CIDs from Postgres by searching against their associated metadata, is used to lookup data to serve API requests/subscriptions. ([BTC](../../pkg/super_node/btc/retriever.go), [ETH](../../pkg/super_node/eth/retriever.go)). +* [IPLD Fetcher](../../pkg/super_node/shared/interfaces.go#L62): Fetches the IPLDs needed to service API requests/subscriptions from IPFS using retrieved CIDS; can route through a IPFS block-exchange to search for objects that are not directly available. ([BTC](../../pkg/super_node/btc/ipld_fetcher.go), [ETH](../../pkg/super_node/eth/ipld_fetcher.go)) +* [Response Filterer](../../pkg/super_node/shared/interfaces.go#L49): Filters converted data payloads served to API subscriptions; filters according to the subscriber provided parameters. ([BTC](../../pkg/super_node/btc/filterer.go), [ETH](../../pkg/super_node/eth/filterer.go)). +* [DB Cleaner](../../pkg/super_node/shared/interfaces.go#L73): Used to clean out cached IPFS objects, CIDs, and associated metadata. Useful for removing bad data or to introduce incompatible changes to the db schema/tables. ([BTC](../../pkg/super_node/btc/cleaner.go), [ETH](../../pkg/super_node/eth/cleaner.go)). +* [API](https://github.com/ethereum/go-ethereum/blob/master/rpc/types.go#L31): Expose RPC methods for clients to interface with the data. Chain-specific APIs should aim to recapitulate as much of the native API as possible. ([VDB](../../pkg/super_node/api.go), [ETH](../../pkg/super_node/eth/api.go)). + + +Appropriating the service for a new chain is done by creating underlying types to satisfy these interfaces for +the specifics of that chain. + +The service uses these interfaces to operate in any combination of three modes: sync, serve, and backfill. +* Sync: Streams raw chain data at the head, converts and publishes it to IPFS, and indexes the resulting set of CIDs in Postgres with useful metadata. 
+* BackFill: Automatically searches for and detects gaps in the DB; fetches, converts, publishes, and indexes the data to fill these gaps. +* Serve: Opens up IPC, HTTP, and WebSocket servers on top of the superNode DB and any concurrent sync and/or backfill processes. + + +These three modes are all operated through a single vulcanizeDB command: `superNode` + +## Command + +Usage: `./vulcanizedb superNode --config={config.toml}` + +Configuration can also be done through CLI options and/or environmental variables. +CLI options can be found using `./vulcanizedb superNode --help`. + +## Config + +Below is the set of universal config parameters for the superNode command, in .toml form, with the respective environmental variables commented to the side. +This set of parameters needs to be set no matter the chain type. + +```toml +[database] + name = "vulcanize_public" # $DATABASE_NAME + hostname = "localhost" # $DATABASE_HOSTNAME + port = 5432 # $DATABASE_PORT + user = "vdbm" # $DATABASE_USER + password = "" # $DATABASE_PASSWORD + +[ipfs] + path = "~/.ipfs" # $IPFS_PATH + +[superNode] + chain = "bitcoin" # $SUPERNODE_CHAIN + server = true # $SUPERNODE_SERVER + ipcPath = "~/.vulcanize/vulcanize.ipc" # $SUPERNODE_IPC_PATH + wsPath = "127.0.0.1:8082" # $SUPERNODE_WS_PATH + httpPath = "127.0.0.1:8083" # $SUPERNODE_HTTP_PATH + sync = true # $SUPERNODE_SYNC + workers = 1 # $SUPERNODE_WORKERS + backFill = true # $SUPERNODE_BACKFILL + frequency = 45 # $SUPERNODE_FREQUENCY + batchSize = 1 # $SUPERNODE_BATCH_SIZE + batchNumber = 50 # $SUPERNODE_BATCH_NUMBER + validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL +``` + +Additional parameters need to be set depending on the specific chain. 
+ +For Bitcoin: + +```toml +[bitcoin] + wsPath = "127.0.0.1:8332" # $BTC_WS_PATH + httpPath = "127.0.0.1:8332" # $BTC_HTTP_PATH + pass = "password" # $BTC_NODE_PASSWORD + user = "username" # $BTC_NODE_USER + nodeID = "ocd0" # $BTC_NODE_ID + clientName = "Omnicore" # $BTC_CLIENT_NAME + genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" # $BTC_GENESIS_BLOCK + networkID = "0xD9B4BEF9" # $BTC_NETWORK_ID +``` + +For Ethereum: + +```toml +[ethereum] + wsPath = "127.0.0.1:8546" # $ETH_WS_PATH + httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH +``` + +## Database + +Currently, the super node persists all data to a single Postgres database. The migrations for this DB can be found [here](../../db/migrations). +Chain-specific data is populated under a chain-specific schema (e.g. `eth` and `btc`) while shared data- such as the IPFS blocks table- is populated under the `public` schema. +Subsequent watchers which act on the raw chain data should build and populate their own schemas or separate databases entirely. + +In the future, we will be moving to a foreign table based architecture wherein a single db is used for shared data while each watcher uses +its own database and accesses and acts on the shared data through foreign tables. Isolating watchers to their own databases will prevent complications and +conflicts between watcher db migrations. + + +## APIs + +The super node provides mutliple types of APIs by which to interface with its data. +More detailed information on the APIs can be found [here](apis.md). + +## Resync + +A separate command `resync` is available for directing the resyncing of data within specified ranges. +This is useful if we want to re-validate a range of data using a new source or clean out bad/deprecated data. +More detailed information on this command can be found [here](resync.md). 
+
+## IPFS Considerations
+
+Currently, the IPLD Publisher and Fetcher use internalized IPFS processes which interface directly with a local IPFS repository.
+This circumvents the need to run a full IPFS daemon with a [go-ipld-eth](https://github.com/ipfs/go-ipld-eth) plugin, but can lead to issues
+with lock-contention on the IPFS repo if another IPFS process is configured and running at the same $IPFS_PATH. This also necessitates the need for
+a locally configured IPFS repository.
+
+Once go-ipld-eth has been updated to work with a modern version of PG-IPFS, an additional option will be provided to direct
+all publishing and fetching of IPLD objects through a remote IPFS daemon.
\ No newline at end of file
diff --git a/documentation/super_node/resync.md b/documentation/super_node/resync.md
new file mode 100644
index 00000000..a207a873
--- /dev/null
+++ b/documentation/super_node/resync.md
@@ -0,0 +1,68 @@
+## VulcanizeDB Super Node Resync
+The `resync` command is made available for directing the resyncing of super node data within specified ranges.
+It also contains a utility for cleaning out old data, and resetting the validation level of data.
+
+### Rationale
+
+Manual resyncing of data is useful when we want to re-validate data within specific ranges using a new source.
+
+Cleaning out data is useful when we need to remove bad/deprecated data or prepare for breaking changes to the db schemas.
+
+Resetting the validation level of data is useful for designating ranges of data for resyncing by an ongoing super node
+backfill process.
+
+### Command
+
+Usage: `./vulcanizedb resync --config={config.toml}`
+
+Configuration can also be done through CLI options and/or environmental variables.
+CLI options can be found using `./vulcanizedb resync --help`.
+
+### Config
+
+Below is the set of universal config parameters for the resync command, in .toml form, with the respective environmental variables commented to the side. 
+This set of parameters needs to be set no matter the chain type. + +```toml +[database] + name = "vulcanize_public" # $DATABASE_NAME + hostname = "localhost" # $DATABASE_HOSTNAME + port = 5432 # $DATABASE_PORT + user = "vdbm" # $DATABASE_USER + password = "" # $DATABASE_PASSWORD + +[ipfs] + path = "~/.ipfs" # $IPFS_PATH + +[resync] + chain = "ethereum" # $RESYNC_CHAIN + type = "state" # $RESYNC_TYPE + start = 0 # $RESYNC_START + stop = 1000 # $RESYNC_STOP + batchSize = 10 # $RESYNC_BATCH_SIZE + batchNumber = 100 # $RESYNC_BATCH_NUMBER + clearOldCache = true # $RESYNC_CLEAR_OLD_CACHE + resetValidation = true # $RESYNC_RESET_VALIDATION +``` + +Additional parameters need to be set depending on the specific chain. + +For Bitcoin: + +```toml +[bitcoin] + httpPath = "127.0.0.1:8332" # $BTC_HTTP_PATH + pass = "password" # $BTC_NODE_PASSWORD + user = "username" # $BTC_NODE_USER + nodeID = "ocd0" # $BTC_NODE_ID + clientName = "Omnicore" # $BTC_CLIENT_NAME + genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" # $BTC_GENESIS_BLOCK + networkID = "0xD9B4BEF9" # $BTC_NETWORK_ID +``` + +For Ethereum: + +```toml +[ethereum] + httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH +``` diff --git a/documentation/super_node/setup.md b/documentation/super_node/setup.md index 685b66c2..d31dc482 100644 --- a/documentation/super_node/setup.md +++ b/documentation/super_node/setup.md @@ -1,14 +1,35 @@ -## Super Node Setup +# VulcanizeDB Super Node Setup +Step-by-step instructions for manually setting up and running a VulcanizeDB super node. -Vulcanizedb can act as an index for chain data stored on IPFS through the use of the `superNode` command. +Steps: +1. [Postgres](#postgres) +1. [Goose](#goose) +1. [IPFS](#ipfs) +1. [Blockchain](#blockchain) +1. [VulcanizeDB](#vulcanizedb) -### Manual Setup +### Postgres +A postgresDB is needed to storing all of the data in the vulcanizedb system. 
+Postgres is used as the backing datastore for IPFS, and is used to index the CIDs for all of the chain data stored on IPFS. +Follow the guides [here](https://wiki.postgresql.org/wiki/Detailed_installation_guides) for setting up Postgres. -These commands work in conjunction with a [state-diffing full Geth node](https://github.com/vulcanize/go-ethereum/tree/statediffing) -and IPFS. +Once the Postgres server is running, we will need to make a database for vulcanizedb, e.g. `vulcanize_public`. -#### IPFS -To start, download and install [IPFS](https://github.com/vulcanize/go-ipfs) +`createdb vulcanize_public` + +For running the automated tests, also create a database named `vulcanize_testing`. + +`createdb vulcanize_testing` + +### Goose +We use [goose](https://github.com/pressly/goose) as our migration management tool. While it is not necessary to use `goose` for manual setup, it +is required for running the automated tests. + + +### IPFS +We use IPFS to store IPLD objects for each type of data we extract from on chain. + +To start, download and install [IPFS](https://github.com/vulcanize/go-ipfs): `go get github.com/ipfs/go-ipfs` @@ -26,11 +47,11 @@ Start by adding the fork and switching over to it: `git checkout -b postgres_update vulcanize/postgres_update` -Now install this fork of ipfs, first be sure to remove any previous installation. +Now install this fork of ipfs, first be sure to remove any previous installation: `make install` -Check that is installed properly by running +Check that is installed properly by running: `ipfs` @@ -49,7 +70,7 @@ export IPFS_PGPORT= export IPFS_PGPASSWORD= ``` -And then run the ipfs command +And then run the ipfs command: `ipfs init --profile=postgresds` @@ -62,10 +83,14 @@ and will ask us to enter the password, avoiding storing it to an ENV variable. Once we have initialized ipfs, that is all we need to do with it- we do not need to run a daemon during the subsequent processes (in fact, we can't). 
-#### Geth -For Geth, we currently *require* a special fork, and we can set this up as follows: +### Blockchain +This section describes how to setup an Ethereum or Bitcoin node to serve as a data source for the super node -Begin by downloading geth and switching to the vulcanize/rpc_statediffing branch +#### Ethereum +For Ethereum, we currently *require* [a special fork of go-ethereum](https://github.com/vulcanize/go-ethereum/tree/statediff_at_anyblock-1.9.11). This can be setup as follows. +Skip this steps if you already have access to a node that displays + +Begin by downloading geth and switching to the vulcanize/rpc_statediffing branch: `go get github.com/ethereum/go-ethereum` @@ -75,9 +100,9 @@ Begin by downloading geth and switching to the vulcanize/rpc_statediffing branch `git fetch vulcanize` -`git checkout -b statediffing vulcanize/statediff_at_anyblock-1.9.9` +`git checkout -b statediffing vulcanize/statediff_at_anyblock-1.9.11` -Now, install this fork of geth (make sure any old versions have been uninstalled/binaries removed first) +Now, install this fork of geth (make sure any old versions have been uninstalled/binaries removed first): `make geth` @@ -87,163 +112,49 @@ And run the output binary with statediffing turned on: `./geth --statediff --statediff.streamblock --ws --syncmode=full` +Note: if you wish to access historical data (perform `backFill`) then the node will need to operate as an archival node (`--gcmode=archive`) + Note: other CLI options- statediff specific ones included- can be explored with `./geth help` The output from geth should mention that it is `Starting statediff service` and block synchronization should begin shortly thereafter. -Note that until it receives a subscriber, the statediffing process does essentially nothing. Once a subscription is received, this -will be indicated in the output. +Note that until it receives a subscriber, the statediffing process does nothing but wait for one. 
Once a subscription is received, this +will be indicated in the output and node will begin processing and sending statediffs. -Also in the output will be the websocket url and ipc paths that we will use to subscribe to the statediffing process. -The default ws url is "ws://127.0.0.1:8546" and the default ipcPath- on Darwin systems only- is "Users/user/Library/Ethereum/geth.ipc" +Also in the output will be the endpoints that we will use to interface with the node. +The default ws url is "127.0.0.1:8546" and the default http url is "127.0.0.1:8545". +These values will be used as the `ethereum.wsPath` and `ethereum.httpPath` in the super node config, respectively. -#### Vulcanizedb +#### Bitcoin +For Bitcoin, the super node is able to operate entirely through the universally exposed JSON-RPC interfaces. +This means we can use any of the standard full nodes (e.g. bitcoind, btcd) as our data source. -The `superNode` command is used to initialize and run an instance of the VulcanizeDB SuperNode +Point at a remote node or set one up locally using the instructions for [bitcoind](https://github.com/bitcoin/bitcoin) and [btcd](https://github.com/btcsuite/btcd). -Usage: +The default http url is "127.0.0.1:8332". We will use the http endpoint as both the `bitcoin.wsPath` and `bitcoin.httpPath` +(bitcoind does not support websocket endpoints, we are currently using a "subscription" wrapper around the http endpoints) + +### Vulcanizedb +Finally, we can begin the vulcanizeDB process itself. 
+ +Start by downloading vulcanizedb and moving into the repo: + +`go get github.com/vulcanize/vulcanizedb` + +`cd $GOPATH/src/github.com/vulcanize/vulcanizedb` + +Run the db migrations against the Postgres database we created for vulcanizeDB: + +`goose -dir=./db/migrations postgres postgres://localhost:5432/vulcanize_public?sslmode=disable up` + +At this point, if we want to run the automated tests: + +`make test` +`make integration_test` + +Then, build the vulcanizedb binary: + +`go build` + +And run the super node command with a provided [config](architecture.md/#): `./vulcanizedb superNode --config=` - -The config for `streamEthSubscribe` has a set of parameters to fill the [EthSubscription config structure](../../pkg/super_node/config/eth_subscription.go) - -```toml -[superNode] - [superNode.ethSubscription] - historicalData = false - historicalDataOnly = false - startingBlock = 0 - endingBlock = 0 - wsPath = "ws://127.0.0.1:8080" - [superNode.ethSubscription.headerFilter] - off = false - uncles = false - [superNode.ethSubscription.txFilter] - off = false - src = [] - dst = [] - [superNode.ethSubscription.receiptFilter] - off = false - contracts = [] - topic0s = [] - topic1s = [] - topic2s = [] - topic3s = [] - [superNode.ethSubscription.stateFilter] - off = false - addresses = [] - intermediateNodes = false - [superNode.ethSubscription.storageFilter] - off = true - addresses = [] - storageKeys = [] - intermediateNodes = false -``` - -`ethSubscription.path` is used to define the SuperNode ws url OR ipc endpoint we subscribe to - -`ethSubscription.historicalData` specifies whether or not the super-node should look up historical data in its cache and -send that to the subscriber, if this is set to `false` then the super-node only streams newly synced/incoming data - -`ethSubscription.historicalDataOnly` will tell the super-node to only send historical data with the specified range and -not stream forward syncing data - -`ethSubscription.startingBlock` is the 
starting block number for the range we want to receive data in - -`ethSubscription.endingBlock` is the ending block number for the range we want to receive data in; -setting to 0 means there is no end/we will continue streaming indefinitely. - -`ethSubscription.headerFilter` has two sub-options: `off` and `uncles`. Setting `off` to true tells the super-node to -not send any headers to the subscriber; setting `uncles` to true tells the super-node to send uncles in addition to normal headers. - -`ethSubscription.txFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the super-node to -not send any transactions to the subscriber; `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for, -if they have any addresses then the super-node will only send transactions that were sent or received by the addresses contained -in `src` and `dst`, respectively. - -`ethSubscription.receiptFilter` has four sub-options: `off`, `topics`, `contracts` and `matchTxs`. Setting `off` to true tells the super-node to -not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for, -if it has any topics then the super-node will only send receipts that contain logs which have that topic0. Similarly, `contracts` is -a string array which can be filled with contract addresses we want to filter for, if it contains any contract addresses the super-node will -only send receipts that correspond to one of those contracts. `matchTrxs` is a bool which when set to true any receipts that correspond to filtered for -transactions will be sent by the super-node, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters. - -`ethSubscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. 
Setting `off` to true tells the super-node to -not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for, -if it has any addresses then the super-node will only send state leafs (accounts) corresponding to those account addresses. By default the super-node -only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. - -`ethSubscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. Setting `off` to true tells the super-node to -not send any storage data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter storage for, -if it has any addresses then the super-node will only send storage nodes from the storage tries at those state addresses. `storageKeys` is another string -array that can be filled with storage keys we want to filter storage data for. It is important to note that the storageKeys are the actual keccak256 hashes, whereas -the addresses in the `addresses` fields are the ETH addresses and not their keccak256 hashes that serve as the actual state keys. By default the super-node -only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. 
\ No newline at end of file diff --git a/documentation/super_node/watcher.md b/documentation/super_node/watcher.md new file mode 100644 index 00000000..c7748f6d --- /dev/null +++ b/documentation/super_node/watcher.md @@ -0,0 +1,16 @@ +These are the components of a VulcanizeDB Watcher: +* Data Fetcher/Streamer sources: + * go-ethereum + * bitcoind + * btcd + * IPFS +* Transformers contain: + * converter + * publisher + * indexer +* Endpoints contain: + * api + * backend + * filterer + * retriever + * ipld_server \ No newline at end of file diff --git a/environments/superNodeBTC.toml b/environments/superNodeBTC.toml index 2f114137..0b74cb2b 100644 --- a/environments/superNodeBTC.toml +++ b/environments/superNodeBTC.toml @@ -1,6 +1,6 @@ [database] name = "vulcanize_public" # $DATABASE_NAME - hostname = "localhost" # &DATABASE_HOSTNAME + hostname = "localhost" # $DATABASE_HOSTNAME port = 5432 # $DATABASE_PORT user = "vdbm" # $DATABASE_USER password = "" # $DATABASE_PASSWORD diff --git a/environments/superNodeETH.toml b/environments/superNodeETH.toml index 4f1f3730..5258db42 100644 --- a/environments/superNodeETH.toml +++ b/environments/superNodeETH.toml @@ -1,6 +1,6 @@ [database] name = "vulcanize_public" # $DATABASE_NAME - hostname = "localhost" # &DATABASE_HOSTNAME + hostname = "localhost" # $DATABASE_HOSTNAME port = 5432 # $DATABASE_PORT user = "vdbm" # $DATABASE_USER password = "" # $DATABASE_PASSWORD diff --git a/pkg/ipfs/interfaces.go b/pkg/ipfs/interfaces.go new file mode 100644 index 00000000..cb0f25d9 --- /dev/null +++ b/pkg/ipfs/interfaces.go @@ -0,0 +1,26 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+ +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ipfs + +import ( + ipld "github.com/ipfs/go-ipld-format" +) + +// DagPutter is a general interface for a dag putter +type DagPutter interface { + DagPut(n ipld.Node) (string, error) +} diff --git a/pkg/super_node/btc/publisher.go b/pkg/super_node/btc/publisher.go index e8ffc051..8ab6bcde 100644 --- a/pkg/super_node/btc/publisher.go +++ b/pkg/super_node/btc/publisher.go @@ -28,9 +28,9 @@ import ( // IPLDPublisher satisfies the IPLDPublisher for ethereum type IPLDPublisher struct { - HeaderPutter shared.DagPutter - TransactionPutter shared.DagPutter - TransactionTriePutter shared.DagPutter + HeaderPutter ipfs.DagPutter + TransactionPutter ipfs.DagPutter + TransactionTriePutter ipfs.DagPutter } // NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface diff --git a/pkg/super_node/btc/subscription_config.go b/pkg/super_node/btc/subscription_config.go index 985ea715..8105eb87 100644 --- a/pkg/super_node/btc/subscription_config.go +++ b/pkg/super_node/btc/subscription_config.go @@ -52,7 +52,7 @@ type TxFilter struct { } // Init is used to initialize a EthSubscription struct with env variables -func NewEthSubscriptionConfig() (*SubscriptionSettings, error) { +func NewBtcSubscriptionConfig() (*SubscriptionSettings, error) { sc := new(SubscriptionSettings) // Below default to false, which means we do not backfill by default sc.BackFill = viper.GetBool("superNode.btcSubscription.historicalData") diff --git a/pkg/super_node/eth/publisher.go b/pkg/super_node/eth/publisher.go index 375b449c..07a323ed 100644 --- a/pkg/super_node/eth/publisher.go +++ 
b/pkg/super_node/eth/publisher.go @@ -36,13 +36,13 @@ import ( // IPLDPublisher satisfies the IPLDPublisher for ethereum type IPLDPublisher struct { - HeaderPutter shared.DagPutter - TransactionPutter shared.DagPutter - TransactionTriePutter shared.DagPutter - ReceiptPutter shared.DagPutter - ReceiptTriePutter shared.DagPutter - StatePutter shared.DagPutter - StoragePutter shared.DagPutter + HeaderPutter ipfs.DagPutter + TransactionPutter ipfs.DagPutter + TransactionTriePutter ipfs.DagPutter + ReceiptPutter ipfs.DagPutter + ReceiptTriePutter ipfs.DagPutter + StatePutter ipfs.DagPutter + StoragePutter ipfs.DagPutter } // NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface diff --git a/pkg/super_node/shared/intefaces.go b/pkg/super_node/shared/intefaces.go index 9cf8abff..3c2e0110 100644 --- a/pkg/super_node/shared/intefaces.go +++ b/pkg/super_node/shared/intefaces.go @@ -18,8 +18,6 @@ package shared import ( "math/big" - - node "github.com/ipfs/go-ipld-format" ) // PayloadStreamer streams chain-specific payloads to the provided channel @@ -71,11 +69,6 @@ type ClientSubscription interface { Unsubscribe() } -// DagPutter is a general interface for a dag putter -type DagPutter interface { - DagPut(n node.Node) (string, error) -} - // Cleaner is for cleaning out data from the cache within the given ranges type Cleaner interface { Clean(rngs [][2]uint64, t DataType) error diff --git a/pkg/watcher/config.go b/pkg/watcher/config.go index dc8de350..8c292ae0 100644 --- a/pkg/watcher/config.go +++ b/pkg/watcher/config.go @@ -74,7 +74,7 @@ func NewWatcherConfig() (*Config, error) { return nil, err } case shared.Bitcoin: - c.SubscriptionConfig, err = btc.NewEthSubscriptionConfig() + c.SubscriptionConfig, err = btc.NewBtcSubscriptionConfig() if err != nil { return nil, err } diff --git a/version/version.go b/version/version.go index 88631b1a..480d6a9b 100644 --- a/version/version.go +++ b/version/version.go @@ -19,9 +19,9 @@ package 
version import "fmt" const ( - VersionMajor = 0 // Major version component of the current release - VersionMinor = 1 // Minor version component of the current release - VersionPatch = 0 // Patch version component of the current release + VersionMajor = 0 // Major version component of the current release + VersionMinor = 1 // Minor version component of the current release + VersionPatch = 0 // Patch version component of the current release VersionMeta = "alpha" // Version metadata to append to the version string ) From 7bb1f444b5d29cbca766a1f37fe2e2f77834c821 Mon Sep 17 00:00:00 2001 From: "A. F. Dudley" Date: Fri, 17 Apr 2020 16:34:17 -0700 Subject: [PATCH 2/7] Update apis.md readability fix. --- documentation/super_node/apis.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation/super_node/apis.md b/documentation/super_node/apis.md index 9286b29c..3d5e7ce7 100644 --- a/documentation/super_node/apis.md +++ b/documentation/super_node/apis.md @@ -9,7 +9,7 @@ The super node exposes a number of different APIs for remote access to the under ### Postgraphile -The super node process all data into a Postgres DB- using PG-IPFS this includes all of the IPLD objects. +The super node stores all processed data a Postgres DB using PG-IPFS, this includes all of the IPLD objects. [Postgraphile](https://www.graphile.org/postgraphile/) can be used to expose GraphQL endpoints for the Postgres tables. e.g. From 02872ce7e1d34f38a59becfe80ce2985e63d2cd2 Mon Sep 17 00:00:00 2001 From: "A. F. Dudley" Date: Fri, 17 Apr 2020 16:53:22 -0700 Subject: [PATCH 3/7] Update apis.md Typo fix and clarification. 
--- documentation/super_node/apis.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/documentation/super_node/apis.md b/documentation/super_node/apis.md index 3d5e7ce7..c6a00701 100644 --- a/documentation/super_node/apis.md +++ b/documentation/super_node/apis.md @@ -9,7 +9,7 @@ The super node exposes a number of different APIs for remote access to the under ### Postgraphile -The super node stores all processed data a Postgres DB using PG-IPFS, this includes all of the IPLD objects. +The super node stores all processed data in Postgres using PG-IPFS, this includes all of the IPLD objects. [Postgraphile](https://www.graphile.org/postgraphile/) can be used to expose GraphQL endpoints for the Postgres tables. e.g. @@ -256,7 +256,7 @@ The currently supported endpoints include: `eth_getBlockByHash` `eth_getTransactionByHash` -Additional endpoints will be added in the near future, with the immediate goal of recapitulating the entire "eth_" set of endpoints. +Additional endpoints will be added in the near future, with the immediate goal of recapitulating the largest set of "eth_" endpoints which can be provided as a service. #### Bitcoin JSON-RPC API: In the near future, the standard Bitcoin JSON-RPC interfaces will be implemented. 
From 35e625695ec7169513f917c9953d5e7f68d95420 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Fri, 17 Apr 2020 16:59:25 -0500 Subject: [PATCH 4/7] updates to documentation for super node --- cmd/streamEthSubscribe.go | 2 +- documentation/super_node/apis.md | 262 ++++++++++++++++++++++ documentation/super_node/architecture.md | 150 +++++++++++-- documentation/super_node/resync.md | 68 ++++++ documentation/super_node/setup.md | 243 +++++++------------- documentation/super_node/subscription.md | 88 -------- documentation/super_node/watcher.md | 16 ++ environments/superNodeBTC.toml | 2 +- environments/superNodeETH.toml | 2 +- pkg/ipfs/interfaces.go | 26 +++ pkg/super_node/btc/publisher.go | 6 +- pkg/super_node/btc/subscription_config.go | 2 +- pkg/super_node/eth/publisher.go | 14 +- pkg/super_node/shared/intefaces.go | 7 - pkg/watcher/config.go | 2 +- version/version.go | 6 +- 16 files changed, 601 insertions(+), 295 deletions(-) create mode 100644 documentation/super_node/apis.md create mode 100644 documentation/super_node/resync.md delete mode 100644 documentation/super_node/subscription.md create mode 100644 documentation/super_node/watcher.md create mode 100644 pkg/ipfs/interfaces.go diff --git a/cmd/streamEthSubscribe.go b/cmd/streamEthSubscribe.go index e46aee52..17f85767 100644 --- a/cmd/streamEthSubscribe.go +++ b/cmd/streamEthSubscribe.go @@ -168,7 +168,7 @@ func streamEthSubscription() { } func getRPCClient() core.RPCClient { - vulcPath := viper.GetString("superNode.ethSubscription.path") + vulcPath := viper.GetString("superNode.ethSubscription.wsPath") if vulcPath == "" { vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided } diff --git a/documentation/super_node/apis.md b/documentation/super_node/apis.md new file mode 100644 index 00000000..9286b29c --- /dev/null +++ b/documentation/super_node/apis.md @@ -0,0 +1,262 @@ +## VulcanizeDB Super Node APIs +The super node exposes a number of different APIs for remote 
access to the underlying DB. + + +### Table of Contents +1. [Postgraphile](#postgraphile) +1. [RPC Subscription Interface](#rpc-subscription-interface) +1. [Native API Recapitulation](#native-api-recapitulation) + + +### Postgraphile +The super node process all data into a Postgres DB- using PG-IPFS this includes all of the IPLD objects. +[Postgraphile](https://www.graphile.org/postgraphile/) can be used to expose GraphQL endpoints for the Postgres tables. + +e.g. + +`postgraphile --plugins @graphile/pg-pubsub --subscriptions --simple-subscriptions -c postgres://localhost:5432/vulcanize_public?sslmode=disable -s public,btc,eth -a -j` + + +This will stand up a Postgraphile server on the public, eth, and btc schemas- exposing GraphQL endpoints for all of the tables contained under those schemas. +All of their data can then be queried with standard [GraphQL](https://graphql.org) queries. + + +### RPC Subscription Interface +A direct, real-time subscription to the data being processed by the super node can be established over WS or IPC through the [Stream](../../pkg/super_node/api.go#L53) RPC method. +This method is not chain-specific and each chain-type supports it, it is accessed under the "vdb" namespace rather than a chain-specific namespace. An interface for +subscribing to this endpoint is provided [here](../../libraries/shared/streamer/super_node_streamer.go). + +When subscribing to this endpoint, the subscriber provides a set of RLP-encoded subscription parameters. These parameters will be chain-specific, and are used +by the super node to filter and return a requested subset of chain data to the subscriber. (e.g. [BTC](../../pkg/super_node/btc/subscription_config.go), [ETH](../../pkg/super_node/eth/subscription_config.go)). 
+ +#### Ethereum RPC Subscription +An example of how to subscribe to a real-time Ethereum data feed from the super node using the `Stream` RPC method is provided below + +```go + package main + + import ( + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/spf13/viper" + + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/eth/client" + "github.com/vulcanize/vulcanizedb/pkg/super_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" + ) + + config, _ := eth.NewEthSubscriptionConfig() + rlpConfig, _ := rlp.EncodeToBytes(config) + vulcPath := viper.GetString("superNode.ethSubscription.path") + rawRPCClient, _ := rpc.Dial(vulcPath) + rpcClient := client.NewRPCClient(rawRPCClient, vulcPath) + stream := streamer.NewSuperNodeStreamer(rpcClient) + payloadChan := make(chan super_node.SubscriptionPayload, 20000) + subscription, _ := stream.Stream(payloadChan, rlpConfig) + for { + select { + case payload := <- payloadChan: + // do something with the subscription payload + case err := <- subscription.Err(): + // do something with the subscription error + } + } +``` + +The .toml file being used to fill the Ethereum subscription config would look something like this: + +```toml +[superNode] + [superNode.ethSubscription] + historicalData = false + historicalDataOnly = false + startingBlock = 0 + endingBlock = 0 + wsPath = "ws://127.0.0.1:8080" + [superNode.ethSubscription.headerFilter] + off = false + uncles = false + [superNode.ethSubscription.txFilter] + off = false + src = [] + dst = [] + [superNode.ethSubscription.receiptFilter] + off = false + contracts = [] + topic0s = [] + topic1s = [] + topic2s = [] + topic3s = [] + [superNode.ethSubscription.stateFilter] + off = false + addresses = [] + intermediateNodes = false + [superNode.ethSubscription.storageFilter] + off = true + addresses = [] + storageKeys = [] + intermediateNodes = false +``` + +These configuration parameters 
are broken down as follows: + +`ethSubscription.wsPath` is used to define the SuperNode ws url OR ipc endpoint we subscribe to + +`ethSubscription.historicalData` specifies whether or not the super node should look up historical data in its cache and +send that to the subscriber, if this is set to `false` then the super node only streams newly synced/incoming data + +`ethSubscription.historicalDataOnly` will tell the super node to only send historical data with the specified range and +not stream forward syncing data + +`ethSubscription.startingBlock` is the starting block number for the range we want to receive data in + +`ethSubscription.endingBlock` is the ending block number for the range we want to receive data in; +setting to 0 means there is no end/we will continue streaming indefinitely. + +`ethSubscription.headerFilter` has two sub-options: `off` and `uncles`. + +- Setting `off` to true tells the super node to not send any headers to the subscriber +- setting `uncles` to true tells the super node to send uncles in addition to normal headers. + +`ethSubscription.txFilter` has three sub-options: `off`, `src`, and `dst`. + +- Setting `off` to true tells the super node to not send any transactions to the subscriber +- `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for, +if they have any addresses then the super node will only send transactions that were sent or received by the addresses contained +in `src` and `dst`, respectively. + +`ethSubscription.receiptFilter` has four sub-options: `off`, `topics`, `contracts` and `matchTxs`. + +- Setting `off` to true tells the super node to not send any receipts to the subscriber +- `topic0s` is a string array which can be filled with event topics we want to filter for, +if it has any topics then the super node will only send receipts that contain logs which have that topic0. 
+- `contracts` is a string array which can be filled with contract addresses we want to filter for, if it contains any contract addresses the super node will
+only send receipts that correspond to one of those contracts.
+- `matchTxs` is a bool; when set to true, any receipts that correspond to filtered-for transactions will be sent by the super node, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters.
+
+`ethSubscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`.
+
+- Setting `off` to true tells the super node to not send any state data to the subscriber
+- `addresses` is a string array which can be filled with ETH addresses we want to filter state for,
+if it has any addresses then the super node will only send state leafs (accounts) corresponding to those account addresses.
+- By default the super node only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`.
+
+`ethSubscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`.
+
+- Setting `off` to true tells the super node to not send any storage data to the subscriber
+- `addresses` is a string array which can be filled with ETH addresses we want to filter storage for,
+if it has any addresses then the super node will only send storage nodes from the storage tries at those state addresses.
+- `storageKeys` is another string array that can be filled with storage keys we want to filter storage data for. It is important to note that the storage keys need to be the actual keccak256 hashes, whereas
+the addresses in the `addresses` fields are pre-hashed ETH addresses.
+- By default the super node only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`.
+ +### Bitcoin RPC Subscription: +An example of how to subscribe to a real-time Bitcoin data feed from the super node using the `Stream` RPC method is provided below + +```go + package main + + import ( + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/spf13/viper" + + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/eth/client" + "github.com/vulcanize/vulcanizedb/pkg/super_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node/btc" + ) + + config, _ := btc.NewBtcSubscriptionConfig() + rlpConfig, _ := rlp.EncodeToBytes(config) + vulcPath := viper.GetString("superNode.btcSubscription.path") + rawRPCClient, _ := rpc.Dial(vulcPath) + rpcClient := client.NewRPCClient(rawRPCClient, vulcPath) + stream := streamer.NewSuperNodeStreamer(rpcClient) + payloadChan := make(chan super_node.SubscriptionPayload, 20000) + subscription, _ := stream.Stream(payloadChan, rlpConfig) + for { + select { + case payload := <- payloadChan: + // do something with the subscription payload + case err := <- subscription.Err(): + // do something with the subscription error + } + } +``` + +The .toml file being used to fill the Bitcoin subscription config would look something like this: + +```toml +[superNode] + [superNode.btcSubscription] + historicalData = false + historicalDataOnly = false + startingBlock = 0 + endingBlock = 0 + wsPath = "ws://127.0.0.1:8080" + [superNode.btcSubscription.headerFilter] + off = false + [superNode.btcSubscription.txFilter] + off = false + segwit = false + witnessHashes = [] + indexes = [] + pkScriptClass = [] + multiSig = false + addresses = [] +``` + +These configuration parameters are broken down as follows: + +`btcSubscription.wsPath` is used to define the SuperNode ws url OR ipc endpoint we subscribe to + +`btcSubscription.historicalData` specifies whether or not the super node should look up historical data in its cache and +send that to the subscriber, if this is 
set to `false` then the super node only streams newly synced/incoming data + +`btcSubscription.historicalDataOnly` will tell the super node to only send historical data with the specified range and +not stream forward syncing data + +`btcSubscription.startingBlock` is the starting block number for the range we want to receive data in + +`btcSubscription.endingBlock` is the ending block number for the range we want to receive data in; +setting to 0 means there is no end/we will continue streaming indefinitely. + +`btcSubscription.headerFilter` has one sub-option: `off`. + +- Setting `off` to true tells the super node to +not send any headers to the subscriber. +- Additional header-filtering options will be added in the future. + +`btcSubscription.txFilter` has seven sub-options: `off`, `segwit`, `witnessHashes`, `indexes`, `pkScriptClass`, `multiSig`, and `addresses`. + +- Setting `off` to true tells the super node to not send any transactions to the subscriber. +- Setting `segwit` to true tells the super node to only send segwit transactions. +- `witnessHashes` is a string array that can be filled with witness hash string; if it contains any hashes the super node will only send transactions that contain one of those hashes. +- `indexes` is an int64 array that can be filled with tx index numbers; if it contains any integers the super node will only send transactions at those indexes (e.g. `[0]` will send only coinbase transactions) +- `pkScriptClass` is an uint8 array that can be filled with pk script class numbers; if it contains any integers the super node will only send transactions that have at least one tx output with one of the specified pkscript classes; +possible class types are 0 through 8 as defined [here](https://github.com/btcsuite/btcd/blob/master/txscript/standard.go#L52). 
+- Setting `multisig` to true tells the super node to send only multi-sig transactions- to send only transaction that have at least one tx output that requires more than one signature to spend. +- `addresses` is a string array that can be filled with btc address strings; if it contains any addresses the super node will only send transactions that have at least one tx output with at least one of the provided addresses. + + +### Native API Recapitulation: +In addition to providing novel Postgraphile and RPC-Subscription endpoints, we are working towards complete recapitulation of the +standard chain APIs. This will allow direct compatibility with software that already makes use of the standard interfaces. + +#### Ethereum JSON-RPC API +The super node currently faithfully recapitulates portions of the Ethereum JSON-RPC api standard. + +The currently supported endpoints include: +`eth_blockNumber` +`eth_getLogs` +`eth_getHeaderByNumber` +`eth_getBlockByNumber` +`eth_getBlockByHash` +`eth_getTransactionByHash` + +Additional endpoints will be added in the near future, with the immediate goal of recapitulating the entire "eth_" set of endpoints. + +#### Bitcoin JSON-RPC API: +In the near future, the standard Bitcoin JSON-RPC interfaces will be implemented. diff --git a/documentation/super_node/architecture.md b/documentation/super_node/architecture.md index 24660221..6cbbdbb0 100644 --- a/documentation/super_node/architecture.md +++ b/documentation/super_node/architecture.md @@ -1,16 +1,134 @@ -These are the components of a VulcanizeDB Watcher: -* Data Fetcher/Streamer sources: - * go-ethereum - * bitcoind - * btcd - * IPFS -* Transformers contain: - * converter - * publisher - * indexer -* Endpoints contain: - * api - * backend - * filterer - * retriever - * ipld_server +# VulcanizeDB Super Node Architecture +The VulcanizeDB super node is a collection of interfaces that are used to extract, process, and store in Postgres-IPFS +all chain data. 
The raw data indexed by the super node serves as the basis for more specific watchers and applications.
+
+Currently the service supports complete processing of all Bitcoin and Ethereum data.
+
+## Table of Contents
+1. [Processes](#processes)
+1. [Command](#command)
+1. [Configuration](#config)
+1. [Database](#database)
+1. [APIs](#apis)
+1. [Resync](#resync)
+1. [IPFS Considerations](#ipfs-considerations)
+
+## Processes
+The [super node service](../../pkg/super_node/service.go#L61) is comprised of the following interfaces:
+
+* [Payload Fetcher](../../pkg/super_node/shared/interfaces.go#L29): Fetches raw chain data from a half-duplex endpoint (HTTP/IPC), used for historical data fetching. ([BTC](../../pkg/super_node/btc/payload_fetcher.go), [ETH](../../pkg/super_node/eth/payload_fetcher.go)).
+* [Payload Streamer](../../pkg/super_node/shared/interfaces.go#L24): Streams raw chain data from a full-duplex endpoint (WebSocket/IPC), used for syncing data at the head of the chain in real-time. ([BTC](../../pkg/super_node/btc/http_streamer.go), [ETH](../../pkg/super_node/eth/streamer.go)).
+* [Payload Converter](../../pkg/super_node/shared/interfaces.go#L34): Converts raw chain data to an intermediary form prepared for IPFS publishing. ([BTC](../../pkg/super_node/btc/converter.go), [ETH](../../pkg/super_node/eth/converter.go)).
+* [IPLD Publisher](../../pkg/super_node/shared/interfaces.go#L39): Publishes the converted data to IPFS, returning their CIDs and associated metadata for indexing. ([BTC](../../pkg/super_node/btc/publisher.go), [ETH](../../pkg/super_node/eth/publisher.go)).
+* [CID Indexer](../../pkg/super_node/shared/interfaces.go#L44): Indexes CIDs in Postgres with their associated metadata. This metadata is chain specific and selected based on utility. ([BTC](../../pkg/super_node/btc/indexer.go), [ETH](../../pkg/super_node/eth/indexer.go)).
+* [CID Retriever](../../pkg/super_node/shared/interfaces.go#L54): Retrieves CIDs from Postgres by searching against their associated metadata, is used to lookup data to serve API requests/subscriptions. ([BTC](../../pkg/super_node/btc/retriever.go), [ETH](../../pkg/super_node/eth/retriever.go)). +* [IPLD Fetcher](../../pkg/super_node/shared/interfaces.go#L62): Fetches the IPLDs needed to service API requests/subscriptions from IPFS using retrieved CIDS; can route through a IPFS block-exchange to search for objects that are not directly available. ([BTC](../../pkg/super_node/btc/ipld_fetcher.go), [ETH](../../pkg/super_node/eth/ipld_fetcher.go)) +* [Response Filterer](../../pkg/super_node/shared/interfaces.go#L49): Filters converted data payloads served to API subscriptions; filters according to the subscriber provided parameters. ([BTC](../../pkg/super_node/btc/filterer.go), [ETH](../../pkg/super_node/eth/filterer.go)). +* [DB Cleaner](../../pkg/super_node/shared/interfaces.go#L73): Used to clean out cached IPFS objects, CIDs, and associated metadata. Useful for removing bad data or to introduce incompatible changes to the db schema/tables. ([BTC](../../pkg/super_node/btc/cleaner.go), [ETH](../../pkg/super_node/eth/cleaner.go)). +* [API](https://github.com/ethereum/go-ethereum/blob/master/rpc/types.go#L31): Expose RPC methods for clients to interface with the data. Chain-specific APIs should aim to recapitulate as much of the native API as possible. ([VDB](../../pkg/super_node/api.go), [ETH](../../pkg/super_node/eth/api.go)). + + +Appropriating the service for a new chain is done by creating underlying types to satisfy these interfaces for +the specifics of that chain. + +The service uses these interfaces to operate in any combination of three modes: sync, serve, and backfill. +* Sync: Streams raw chain data at the head, converts and publishes it to IPFS, and indexes the resulting set of CIDs in Postgres with useful metadata. 
+* BackFill: Automatically searches for and detects gaps in the DB; fetches, converts, publishes, and indexes the data to fill these gaps. +* Serve: Opens up IPC, HTTP, and WebSocket servers on top of the superNode DB and any concurrent sync and/or backfill processes. + + +These three modes are all operated through a single vulcanizeDB command: `superNode` + +## Command + +Usage: `./vulcanizedb superNode --config={config.toml}` + +Configuration can also be done through CLI options and/or environmental variables. +CLI options can be found using `./vulcanizedb superNode --help`. + +## Config + +Below is the set of universal config parameters for the superNode command, in .toml form, with the respective environmental variables commented to the side. +This set of parameters needs to be set no matter the chain type. + +```toml +[database] + name = "vulcanize_public" # $DATABASE_NAME + hostname = "localhost" # $DATABASE_HOSTNAME + port = 5432 # $DATABASE_PORT + user = "vdbm" # $DATABASE_USER + password = "" # $DATABASE_PASSWORD + +[ipfs] + path = "~/.ipfs" # $IPFS_PATH + +[superNode] + chain = "bitcoin" # $SUPERNODE_CHAIN + server = true # $SUPERNODE_SERVER + ipcPath = "~/.vulcanize/vulcanize.ipc" # $SUPERNODE_IPC_PATH + wsPath = "127.0.0.1:8082" # $SUPERNODE_WS_PATH + httpPath = "127.0.0.1:8083" # $SUPERNODE_HTTP_PATH + sync = true # $SUPERNODE_SYNC + workers = 1 # $SUPERNODE_WORKERS + backFill = true # $SUPERNODE_BACKFILL + frequency = 45 # $SUPERNODE_FREQUENCY + batchSize = 1 # $SUPERNODE_BATCH_SIZE + batchNumber = 50 # $SUPERNODE_BATCH_NUMBER + validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL +``` + +Additional parameters need to be set depending on the specific chain. 
+ +For Bitcoin: + +```toml +[bitcoin] + wsPath = "127.0.0.1:8332" # $BTC_WS_PATH + httpPath = "127.0.0.1:8332" # $BTC_HTTP_PATH + pass = "password" # $BTC_NODE_PASSWORD + user = "username" # $BTC_NODE_USER + nodeID = "ocd0" # $BTC_NODE_ID + clientName = "Omnicore" # $BTC_CLIENT_NAME + genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" # $BTC_GENESIS_BLOCK + networkID = "0xD9B4BEF9" # $BTC_NETWORK_ID +``` + +For Ethereum: + +```toml +[ethereum] + wsPath = "127.0.0.1:8546" # $ETH_WS_PATH + httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH +``` + +## Database + +Currently, the super node persists all data to a single Postgres database. The migrations for this DB can be found [here](../../db/migrations). +Chain-specific data is populated under a chain-specific schema (e.g. `eth` and `btc`) while shared data- such as the IPFS blocks table- is populated under the `public` schema. +Subsequent watchers which act on the raw chain data should build and populate their own schemas or separate databases entirely. + +In the future, we will be moving to a foreign table based architecture wherein a single db is used for shared data while each watcher uses +its own database and accesses and acts on the shared data through foreign tables. Isolating watchers to their own databases will prevent complications and +conflicts between watcher db migrations. + + +## APIs + +The super node provides multiple types of APIs by which to interface with its data. +More detailed information on the APIs can be found [here](apis.md). + +## Resync + +A separate command `resync` is available for directing the resyncing of data within specified ranges. +This is useful if we want to re-validate a range of data using a new source or clean out bad/deprecated data. +More detailed information on this command can be found [here](resync.md). 
+ +## IPFS Considerations + +Currently, the IPLD Publisher and Fetcher use internalized IPFS processes which interface directly with a local IPFS repository. +This circumvents the need to run a full IPFS daemon with a [go-ipld-eth](https://github.com/ipfs/go-ipld-eth) plugin, but can lead to issues +with lock-contention on the IPFS repo if another IPFS process is configured and running at the same $IPFS_PATH. This also necessitates the need for +a locally configured IPFS repository. + +Once go-ipld-eth has been updated to work with a modern version of PG-IPFS, an additional option will be provided to direct +all publishing and fetching of IPLD objects through a remote IPFS daemon. \ No newline at end of file diff --git a/documentation/super_node/resync.md b/documentation/super_node/resync.md new file mode 100644 index 00000000..a207a873 --- /dev/null +++ b/documentation/super_node/resync.md @@ -0,0 +1,68 @@ +## VulcanizeDB Super Node Resync +The `resync` command is made available for directing the resyncing of super node data within specified ranges. +It also contains a utility for cleaning out old data, and resetting the validation level of data. + +### Rationale + +Manual resyncing of data is useful when we want to re-validate data within specific ranges using a new source. + +Cleaning out data is useful when we need to remove bad/deprecated data or prepare for breaking changes to the db schemas. + +Resetting the validation level of data is useful for designating ranges of data for resyncing by an ongoing super node +backfill process. + +### Command + +Usage: `./vulcanizedb resync --config={config.toml}` + +Configuration can also be done through CLI options and/or environmental variables. +CLI options can be found using `./vulcanizedb resync --help`. + +### Config + +Below is the set of universal config parameters for the resync command, in .toml form, with the respective environmental variables commented to the side. 
+This set of parameters needs to be set no matter the chain type. + +```toml +[database] + name = "vulcanize_public" # $DATABASE_NAME + hostname = "localhost" # $DATABASE_HOSTNAME + port = 5432 # $DATABASE_PORT + user = "vdbm" # $DATABASE_USER + password = "" # $DATABASE_PASSWORD + +[ipfs] + path = "~/.ipfs" # $IPFS_PATH + +[resync] + chain = "ethereum" # $RESYNC_CHAIN + type = "state" # $RESYNC_TYPE + start = 0 # $RESYNC_START + stop = 1000 # $RESYNC_STOP + batchSize = 10 # $RESYNC_BATCH_SIZE + batchNumber = 100 # $RESYNC_BATCH_NUMBER + clearOldCache = true # $RESYNC_CLEAR_OLD_CACHE + resetValidation = true # $RESYNC_RESET_VALIDATION +``` + +Additional parameters need to be set depending on the specific chain. + +For Bitcoin: + +```toml +[bitcoin] + httpPath = "127.0.0.1:8332" # $BTC_HTTP_PATH + pass = "password" # $BTC_NODE_PASSWORD + user = "username" # $BTC_NODE_USER + nodeID = "ocd0" # $BTC_NODE_ID + clientName = "Omnicore" # $BTC_CLIENT_NAME + genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" # $BTC_GENESIS_BLOCK + networkID = "0xD9B4BEF9" # $BTC_NETWORK_ID +``` + +For Ethereum: + +```toml +[ethereum] + httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH +``` diff --git a/documentation/super_node/setup.md b/documentation/super_node/setup.md index 685b66c2..b03eefaa 100644 --- a/documentation/super_node/setup.md +++ b/documentation/super_node/setup.md @@ -1,14 +1,35 @@ -## Super Node Setup +# VulcanizeDB Super Node Setup +Step-by-step instructions for manually setting up and running a VulcanizeDB super node. -Vulcanizedb can act as an index for chain data stored on IPFS through the use of the `superNode` command. +Steps: +1. [Postgres](#postgres) +1. [Goose](#goose) +1. [IPFS](#ipfs) +1. [Blockchain](#blockchain) +1. [VulcanizeDB](#vulcanizedb) -### Manual Setup +### Postgres +A postgresDB is needed to storing all of the data in the vulcanizedb system. 
+Postgres is used as the backing datastore for IPFS, and is used to index the CIDs for all of the chain data stored on IPFS. +Follow the guides [here](https://wiki.postgresql.org/wiki/Detailed_installation_guides) for setting up Postgres. -These commands work in conjunction with a [state-diffing full Geth node](https://github.com/vulcanize/go-ethereum/tree/statediffing) -and IPFS. +Once the Postgres server is running, we will need to make a database for vulcanizedb, e.g. `vulcanize_public`. -#### IPFS -To start, download and install [IPFS](https://github.com/vulcanize/go-ipfs) +`createdb vulcanize_public` + +For running the automated tests, also create a database named `vulcanize_testing`. + +`createdb vulcanize_testing` + +### Goose +We use [goose](https://github.com/pressly/goose) as our migration management tool. While it is not necessary to use `goose` for manual setup, it +is required for running the automated tests. + + +### IPFS +We use IPFS to store IPLD objects for each type of data we extract from on chain. + +To start, download and install [IPFS](https://github.com/vulcanize/go-ipfs): `go get github.com/ipfs/go-ipfs` @@ -26,11 +47,11 @@ Start by adding the fork and switching over to it: `git checkout -b postgres_update vulcanize/postgres_update` -Now install this fork of ipfs, first be sure to remove any previous installation. +Now install this fork of ipfs, first be sure to remove any previous installation: `make install` -Check that is installed properly by running +Check that is installed properly by running: `ipfs` @@ -49,7 +70,7 @@ export IPFS_PGPORT= export IPFS_PGPASSWORD= ``` -And then run the ipfs command +And then run the ipfs command: `ipfs init --profile=postgresds` @@ -62,10 +83,14 @@ and will ask us to enter the password, avoiding storing it to an ENV variable. Once we have initialized ipfs, that is all we need to do with it- we do not need to run a daemon during the subsequent processes (in fact, we can't). 
-#### Geth -For Geth, we currently *require* a special fork, and we can set this up as follows: +### Blockchain +This section describes how to setup an Ethereum or Bitcoin node to serve as a data source for the super node -Begin by downloading geth and switching to the vulcanize/rpc_statediffing branch +#### Ethereum +For Ethereum, we currently *require* [a special fork of go-ethereum](https://github.com/vulcanize/go-ethereum/tree/statediff_at_anyblock-1.9.11). This can be setup as follows. +Skip these steps if you already have access to a node that displays the statediffing endpoints. + +Begin by downloading geth and switching to the vulcanize/rpc_statediffing branch: `go get github.com/ethereum/go-ethereum` @@ -75,9 +100,9 @@ Begin by downloading geth and switching to the vulcanize/rpc_statediffing branch `git fetch vulcanize` -`git checkout -b statediffing vulcanize/statediff_at_anyblock-1.9.9` +`git checkout -b statediffing vulcanize/statediff_at_anyblock-1.9.11` -Now, install this fork of geth (make sure any old versions have been uninstalled/binaries removed first) +Now, install this fork of geth (make sure any old versions have been uninstalled/binaries removed first): `make geth` @@ -87,163 +112,49 @@ And run the output binary with statediffing turned on: `./geth --statediff --statediff.streamblock --ws --syncmode=full` +Note: if you wish to access historical data (perform `backFill`) then the node will need to operate as an archival node (`--gcmode=archive`) + Note: other CLI options- statediff specific ones included- can be explored with `./geth help` The output from geth should mention that it is `Starting statediff service` and block synchronization should begin shortly thereafter. -Note that until it receives a subscriber, the statediffing process does essentially nothing. Once a subscription is received, this -will be indicated in the output. +Note that until it receives a subscriber, the statediffing process does nothing but wait for one. 
Once a subscription is received, this +will be indicated in the output and the node will begin processing and sending statediffs. -Also in the output will be the websocket url and ipc paths that we will use to subscribe to the statediffing process. -The default ws url is "ws://127.0.0.1:8546" and the default ipcPath- on Darwin systems only- is "Users/user/Library/Ethereum/geth.ipc" +Also in the output will be the endpoints that we will use to interface with the node. +The default ws url is "127.0.0.1:8546" and the default http url is "127.0.0.1:8545". +These values will be used as the `ethereum.wsPath` and `ethereum.httpPath` in the super node config, respectively. -#### Vulcanizedb +#### Bitcoin +For Bitcoin, the super node is able to operate entirely through the universally exposed JSON-RPC interfaces. +This means we can use any of the standard full nodes (e.g. bitcoind, btcd) as our data source. -The `superNode` command is used to initialize and run an instance of the VulcanizeDB SuperNode +Point at a remote node or set one up locally using the instructions for [bitcoind](https://github.com/bitcoin/bitcoin) and [btcd](https://github.com/btcsuite/btcd). -Usage: +The default http url is "127.0.0.1:8332". We will use the http endpoint as both the `bitcoin.wsPath` and `bitcoin.httpPath` +(bitcoind does not support websocket endpoints, we are currently using a "subscription" wrapper around the http endpoints) + +### Vulcanizedb +Finally, we can begin the vulcanizeDB process itself. 
+ +Start by downloading vulcanizedb and moving into the repo: + +`go get github.com/vulcanize/vulcanizedb` + +`cd $GOPATH/src/github.com/vulcanize/vulcanizedb` + +Run the db migrations against the Postgres database we created for vulcanizeDB: + +`goose -dir=./db/migrations postgres postgres://localhost:5432/vulcanize_public?sslmode=disable up` + +At this point, if we want to run the automated tests: + +`make test` +`make integration_test` + +Then, build the vulcanizedb binary: + +`go build` + +And run the super node command with a provided [config](architecture.md/#): `./vulcanizedb superNode --config=` - -The config for `streamEthSubscribe` has a set of parameters to fill the [EthSubscription config structure](../../pkg/super_node/config/eth_subscription.go) - -```toml -[superNode] - [superNode.ethSubscription] - historicalData = false - historicalDataOnly = false - startingBlock = 0 - endingBlock = 0 - wsPath = "ws://127.0.0.1:8080" - [superNode.ethSubscription.headerFilter] - off = false - uncles = false - [superNode.ethSubscription.txFilter] - off = false - src = [] - dst = [] - [superNode.ethSubscription.receiptFilter] - off = false - contracts = [] - topic0s = [] - topic1s = [] - topic2s = [] - topic3s = [] - [superNode.ethSubscription.stateFilter] - off = false - addresses = [] - intermediateNodes = false - [superNode.ethSubscription.storageFilter] - off = true - addresses = [] - storageKeys = [] - intermediateNodes = false -``` - -`ethSubscription.path` is used to define the SuperNode ws url OR ipc endpoint we subscribe to - -`ethSubscription.historicalData` specifies whether or not the super-node should look up historical data in its cache and -send that to the subscriber, if this is set to `false` then the super-node only streams newly synced/incoming data - -`ethSubscription.historicalDataOnly` will tell the super-node to only send historical data with the specified range and -not stream forward syncing data - -`ethSubscription.startingBlock` is the 
starting block number for the range we want to receive data in - -`ethSubscription.endingBlock` is the ending block number for the range we want to receive data in; -setting to 0 means there is no end/we will continue streaming indefinitely. - -`ethSubscription.headerFilter` has two sub-options: `off` and `uncles`. Setting `off` to true tells the super-node to -not send any headers to the subscriber; setting `uncles` to true tells the super-node to send uncles in addition to normal headers. - -`ethSubscription.txFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the super-node to -not send any transactions to the subscriber; `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for, -if they have any addresses then the super-node will only send transactions that were sent or received by the addresses contained -in `src` and `dst`, respectively. - -`ethSubscription.receiptFilter` has four sub-options: `off`, `topics`, `contracts` and `matchTxs`. Setting `off` to true tells the super-node to -not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for, -if it has any topics then the super-node will only send receipts that contain logs which have that topic0. Similarly, `contracts` is -a string array which can be filled with contract addresses we want to filter for, if it contains any contract addresses the super-node will -only send receipts that correspond to one of those contracts. `matchTrxs` is a bool which when set to true any receipts that correspond to filtered for -transactions will be sent by the super-node, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters. - -`ethSubscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. 
Setting `off` to true tells the super-node to -not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for, -if it has any addresses then the super-node will only send state leafs (accounts) corresponding to those account addresses. By default the super-node -only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. - -`ethSubscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. Setting `off` to true tells the super-node to -not send any storage data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter storage for, -if it has any addresses then the super-node will only send storage nodes from the storage tries at those state addresses. `storageKeys` is another string -array that can be filled with storage keys we want to filter storage data for. It is important to note that the storageKeys are the actual keccak256 hashes, whereas -the addresses in the `addresses` fields are the ETH addresses and not their keccak256 hashes that serve as the actual state keys. By default the super-node -only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. 
\ No newline at end of file diff --git a/documentation/super_node/watcher.md b/documentation/super_node/watcher.md new file mode 100644 index 00000000..c7748f6d --- /dev/null +++ b/documentation/super_node/watcher.md @@ -0,0 +1,16 @@ +These are the components of a VulcanizeDB Watcher: +* Data Fetcher/Streamer sources: + * go-ethereum + * bitcoind + * btcd + * IPFS +* Transformers contain: + * converter + * publisher + * indexer +* Endpoints contain: + * api + * backend + * filterer + * retriever + * ipld_server \ No newline at end of file diff --git a/environments/superNodeBTC.toml b/environments/superNodeBTC.toml index 2f114137..0b74cb2b 100644 --- a/environments/superNodeBTC.toml +++ b/environments/superNodeBTC.toml @@ -1,6 +1,6 @@ [database] name = "vulcanize_public" # $DATABASE_NAME - hostname = "localhost" # &DATABASE_HOSTNAME + hostname = "localhost" # $DATABASE_HOSTNAME port = 5432 # $DATABASE_PORT user = "vdbm" # $DATABASE_USER password = "" # $DATABASE_PASSWORD diff --git a/environments/superNodeETH.toml b/environments/superNodeETH.toml index 4f1f3730..5258db42 100644 --- a/environments/superNodeETH.toml +++ b/environments/superNodeETH.toml @@ -1,6 +1,6 @@ [database] name = "vulcanize_public" # $DATABASE_NAME - hostname = "localhost" # &DATABASE_HOSTNAME + hostname = "localhost" # $DATABASE_HOSTNAME port = 5432 # $DATABASE_PORT user = "vdbm" # $DATABASE_USER password = "" # $DATABASE_PASSWORD diff --git a/pkg/ipfs/interfaces.go b/pkg/ipfs/interfaces.go new file mode 100644 index 00000000..cb0f25d9 --- /dev/null +++ b/pkg/ipfs/interfaces.go @@ -0,0 +1,26 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+ +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ipfs + +import ( + ipld "github.com/ipfs/go-ipld-format" +) + +// DagPutter is a general interface for a dag putter +type DagPutter interface { + DagPut(n ipld.Node) (string, error) +} diff --git a/pkg/super_node/btc/publisher.go b/pkg/super_node/btc/publisher.go index e8ffc051..8ab6bcde 100644 --- a/pkg/super_node/btc/publisher.go +++ b/pkg/super_node/btc/publisher.go @@ -28,9 +28,9 @@ import ( // IPLDPublisher satisfies the IPLDPublisher for ethereum type IPLDPublisher struct { - HeaderPutter shared.DagPutter - TransactionPutter shared.DagPutter - TransactionTriePutter shared.DagPutter + HeaderPutter ipfs.DagPutter + TransactionPutter ipfs.DagPutter + TransactionTriePutter ipfs.DagPutter } // NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface diff --git a/pkg/super_node/btc/subscription_config.go b/pkg/super_node/btc/subscription_config.go index 985ea715..8105eb87 100644 --- a/pkg/super_node/btc/subscription_config.go +++ b/pkg/super_node/btc/subscription_config.go @@ -52,7 +52,7 @@ type TxFilter struct { } // Init is used to initialize a EthSubscription struct with env variables -func NewEthSubscriptionConfig() (*SubscriptionSettings, error) { +func NewBtcSubscriptionConfig() (*SubscriptionSettings, error) { sc := new(SubscriptionSettings) // Below default to false, which means we do not backfill by default sc.BackFill = viper.GetBool("superNode.btcSubscription.historicalData") diff --git a/pkg/super_node/eth/publisher.go b/pkg/super_node/eth/publisher.go index 375b449c..07a323ed 100644 --- a/pkg/super_node/eth/publisher.go +++ 
b/pkg/super_node/eth/publisher.go @@ -36,13 +36,13 @@ import ( // IPLDPublisher satisfies the IPLDPublisher for ethereum type IPLDPublisher struct { - HeaderPutter shared.DagPutter - TransactionPutter shared.DagPutter - TransactionTriePutter shared.DagPutter - ReceiptPutter shared.DagPutter - ReceiptTriePutter shared.DagPutter - StatePutter shared.DagPutter - StoragePutter shared.DagPutter + HeaderPutter ipfs.DagPutter + TransactionPutter ipfs.DagPutter + TransactionTriePutter ipfs.DagPutter + ReceiptPutter ipfs.DagPutter + ReceiptTriePutter ipfs.DagPutter + StatePutter ipfs.DagPutter + StoragePutter ipfs.DagPutter } // NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface diff --git a/pkg/super_node/shared/intefaces.go b/pkg/super_node/shared/intefaces.go index 9cf8abff..3c2e0110 100644 --- a/pkg/super_node/shared/intefaces.go +++ b/pkg/super_node/shared/intefaces.go @@ -18,8 +18,6 @@ package shared import ( "math/big" - - node "github.com/ipfs/go-ipld-format" ) // PayloadStreamer streams chain-specific payloads to the provided channel @@ -71,11 +69,6 @@ type ClientSubscription interface { Unsubscribe() } -// DagPutter is a general interface for a dag putter -type DagPutter interface { - DagPut(n node.Node) (string, error) -} - // Cleaner is for cleaning out data from the cache within the given ranges type Cleaner interface { Clean(rngs [][2]uint64, t DataType) error diff --git a/pkg/watcher/config.go b/pkg/watcher/config.go index dc8de350..8c292ae0 100644 --- a/pkg/watcher/config.go +++ b/pkg/watcher/config.go @@ -74,7 +74,7 @@ func NewWatcherConfig() (*Config, error) { return nil, err } case shared.Bitcoin: - c.SubscriptionConfig, err = btc.NewEthSubscriptionConfig() + c.SubscriptionConfig, err = btc.NewBtcSubscriptionConfig() if err != nil { return nil, err } diff --git a/version/version.go b/version/version.go index 88631b1a..480d6a9b 100644 --- a/version/version.go +++ b/version/version.go @@ -19,9 +19,9 @@ package 
version import "fmt" const ( - VersionMajor = 0 // Major version component of the current release - VersionMinor = 1 // Minor version component of the current release - VersionPatch = 0 // Patch version component of the current release + VersionMajor = 0 // Major version component of the current release + VersionMinor = 1 // Minor version component of the current release + VersionPatch = 0 // Patch version component of the current release VersionMeta = "alpha" // Version metadata to append to the version string ) From b3be8aaa055d2df9cff73e1fe442358736b6a45c Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 20 Apr 2020 11:03:06 -0500 Subject: [PATCH 5/7] edits --- documentation/super_node/architecture.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/documentation/super_node/architecture.md b/documentation/super_node/architecture.md index 6cbbdbb0..a3e6fc94 100644 --- a/documentation/super_node/architecture.md +++ b/documentation/super_node/architecture.md @@ -14,7 +14,7 @@ Currently the service supports complete processing of all Bitcoin and Ethereum d 1. [IPFS Considerations](#ipfs-considerations) ## Processes -The [super node service](../../pkg/super_node/service.go#L61) is comprised of the following interfaces: +The [super node service](../../pkg/super_node/service.go#L61) is a watcher comprised of the following interfaces: * [Payload Fetcher](../../pkg/super_node/shared/interfaces.go#L29): Fetches raw chain data from a half-duplex endpoint (HTTP/IPC), used for historical data fetching. ([BTC](../../pkg/super_node/btc/payload_fetcher.go), [ETH](../../pkg/super_node/eth/payload_fetcher.go)). * [Payload Streamer](../../pkg/super_node/shared/interfaces.go#L24): Streams raw chain data from a full-duplex endpoint (WebSocket/IPC), used for syncing data at the head of the chain in real-time. ([BTC](../../pkg/super_node/btc/http_streamer.go), [ETH](../../pkg/super_node/eth/streamer.go)). 
@@ -24,7 +24,6 @@ The [super node service](../../pkg/super_node/service.go#L61) is comprised of th * [CID Retriever](../../pkg/super_node/shared/interfaces.go#L54): Retrieves CIDs from Postgres by searching against their associated metadata, is used to lookup data to serve API requests/subscriptions. ([BTC](../../pkg/super_node/btc/retriever.go), [ETH](../../pkg/super_node/eth/retriever.go)). * [IPLD Fetcher](../../pkg/super_node/shared/interfaces.go#L62): Fetches the IPLDs needed to service API requests/subscriptions from IPFS using retrieved CIDS; can route through a IPFS block-exchange to search for objects that are not directly available. ([BTC](../../pkg/super_node/btc/ipld_fetcher.go), [ETH](../../pkg/super_node/eth/ipld_fetcher.go)) * [Response Filterer](../../pkg/super_node/shared/interfaces.go#L49): Filters converted data payloads served to API subscriptions; filters according to the subscriber provided parameters. ([BTC](../../pkg/super_node/btc/filterer.go), [ETH](../../pkg/super_node/eth/filterer.go)). -* [DB Cleaner](../../pkg/super_node/shared/interfaces.go#L73): Used to clean out cached IPFS objects, CIDs, and associated metadata. Useful for removing bad data or to introduce incompatible changes to the db schema/tables. ([BTC](../../pkg/super_node/btc/cleaner.go), [ETH](../../pkg/super_node/eth/cleaner.go)). * [API](https://github.com/ethereum/go-ethereum/blob/master/rpc/types.go#L31): Expose RPC methods for clients to interface with the data. Chain-specific APIs should aim to recapitulate as much of the native API as possible. ([VDB](../../pkg/super_node/api.go), [ETH](../../pkg/super_node/eth/api.go)). 
From eceaa0aecbfce1918a30adbc00fd6f7d5fa9969c Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Sun, 19 Apr 2020 18:26:23 -0500 Subject: [PATCH 6/7] use configurable timeout for geth batch http requests; additional error log info in payload fetchers --- cmd/resync.go | 10 +++++++ cmd/superNode.go | 10 +++++++ documentation/super_node/architecture.md | 5 ++++ documentation/super_node/resync.md | 5 ++++ environments/superNodeBTC.toml | 2 +- environments/superNodeETH.toml | 8 +++++- libraries/shared/mocks/batch_client.go | 22 +++++++++++++++ pkg/super_node/backfiller.go | 2 +- pkg/super_node/btc/payload_fetcher.go | 6 +++-- pkg/super_node/config.go | 8 ++++++ pkg/super_node/constructors.go | 5 ++-- pkg/super_node/eth/payload_fetcher.go | 23 +++++++++------- pkg/super_node/eth/payload_fetcher_test.go | 4 ++- pkg/super_node/resync/config.go | 17 +++++++++--- pkg/super_node/resync/service.go | 2 +- pkg/super_node/shared/env.go | 31 +++++++++++++++------- 16 files changed, 128 insertions(+), 32 deletions(-) diff --git a/cmd/resync.go b/cmd/resync.go index 723e88fa..60826326 100644 --- a/cmd/resync.go +++ b/cmd/resync.go @@ -71,6 +71,7 @@ func init() { resyncCmd.PersistentFlags().Int("resync-batch-number", 0, "how many goroutines to fetch data concurrently") resyncCmd.PersistentFlags().Bool("resync-clear-old-cache", false, "if true, clear out old data of the provided type within the resync range before resyncing") resyncCmd.PersistentFlags().Bool("resync-reset-validation", false, "if true, reset times_validated to 0") + resyncCmd.PersistentFlags().Int("resync-timeout", 15, "timeout used for resync http requests") resyncCmd.PersistentFlags().String("btc-http-path", "", "http url for bitcoin node") resyncCmd.PersistentFlags().String("btc-password", "", "password for btc node") @@ -81,6 +82,10 @@ func init() { resyncCmd.PersistentFlags().String("btc-network-id", "", "btc network id") resyncCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node") + 
resyncCmd.PersistentFlags().String("eth-node-id", "", "eth node id") + resyncCmd.PersistentFlags().String("eth-client-name", "", "eth client name") + resyncCmd.PersistentFlags().String("eth-genesis-block", "", "eth genesis block hash") + resyncCmd.PersistentFlags().String("eth-network-id", "", "eth network id") // and their bindings viper.BindPFlag("ipfs.path", resyncCmd.PersistentFlags().Lookup("ipfs-path")) @@ -93,6 +98,7 @@ func init() { viper.BindPFlag("resync.batchNumber", resyncCmd.PersistentFlags().Lookup("resync-batch-number")) viper.BindPFlag("resync.clearOldCache", resyncCmd.PersistentFlags().Lookup("resync-clear-old-cache")) viper.BindPFlag("resync.resetValidation", resyncCmd.PersistentFlags().Lookup("resync-reset-validation")) + viper.BindPFlag("resync.timeout", resyncCmd.PersistentFlags().Lookup("resync-timeout")) viper.BindPFlag("bitcoin.httpPath", resyncCmd.PersistentFlags().Lookup("btc-http-path")) viper.BindPFlag("bitcoin.pass", resyncCmd.PersistentFlags().Lookup("btc-password")) @@ -103,4 +109,8 @@ func init() { viper.BindPFlag("bitcoin.networkID", resyncCmd.PersistentFlags().Lookup("btc-network-id")) viper.BindPFlag("ethereum.httpPath", resyncCmd.PersistentFlags().Lookup("eth-http-path")) + viper.BindPFlag("ethereum.nodeID", resyncCmd.PersistentFlags().Lookup("eth-node-id")) + viper.BindPFlag("ethereum.clientName", resyncCmd.PersistentFlags().Lookup("eth-client-name")) + viper.BindPFlag("ethereum.genesisBlock", resyncCmd.PersistentFlags().Lookup("eth-genesis-block")) + viper.BindPFlag("ethereum.networkID", resyncCmd.PersistentFlags().Lookup("eth-network-id")) } diff --git a/cmd/superNode.go b/cmd/superNode.go index 6e13bac0..3ab4e416 100644 --- a/cmd/superNode.go +++ b/cmd/superNode.go @@ -121,6 +121,7 @@ func init() { superNodeCmd.PersistentFlags().Int("supernode-batch-size", 0, "data fetching batch size") superNodeCmd.PersistentFlags().Int("supernode-batch-number", 0, "how many goroutines to fetch data concurrently") 
superNodeCmd.PersistentFlags().Int("supernode-validation-level", 0, "backfill will resync any data below this level") + superNodeCmd.PersistentFlags().Int("supernode-timeout", 0, "timeout used for backfill http requests") superNodeCmd.PersistentFlags().String("btc-ws-path", "", "ws url for bitcoin node") superNodeCmd.PersistentFlags().String("btc-http-path", "", "http url for bitcoin node") @@ -133,6 +134,10 @@ func init() { superNodeCmd.PersistentFlags().String("eth-ws-path", "", "ws url for ethereum node") superNodeCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node") + superNodeCmd.PersistentFlags().String("eth-node-id", "", "eth node id") + superNodeCmd.PersistentFlags().String("eth-client-name", "", "eth client name") + superNodeCmd.PersistentFlags().String("eth-genesis-block", "", "eth genesis block hash") + superNodeCmd.PersistentFlags().String("eth-network-id", "", "eth network id") // and their bindings viper.BindPFlag("ipfs.path", superNodeCmd.PersistentFlags().Lookup("ipfs-path")) @@ -149,6 +154,7 @@ func init() { viper.BindPFlag("superNode.batchSize", superNodeCmd.PersistentFlags().Lookup("supernode-batch-size")) viper.BindPFlag("superNode.batchNumber", superNodeCmd.PersistentFlags().Lookup("supernode-batch-number")) viper.BindPFlag("superNode.validationLevel", superNodeCmd.PersistentFlags().Lookup("supernode-validation-level")) + viper.BindPFlag("superNode.timeout", superNodeCmd.PersistentFlags().Lookup("supernode-timeout")) viper.BindPFlag("bitcoin.wsPath", superNodeCmd.PersistentFlags().Lookup("btc-ws-path")) viper.BindPFlag("bitcoin.httpPath", superNodeCmd.PersistentFlags().Lookup("btc-http-path")) @@ -161,4 +167,8 @@ func init() { viper.BindPFlag("ethereum.wsPath", superNodeCmd.PersistentFlags().Lookup("eth-ws-path")) viper.BindPFlag("ethereum.httpPath", superNodeCmd.PersistentFlags().Lookup("eth-http-path")) + viper.BindPFlag("ethereum.nodeID", superNodeCmd.PersistentFlags().Lookup("eth-node-id")) + 
viper.BindPFlag("ethereum.clientName", superNodeCmd.PersistentFlags().Lookup("eth-client-name")) + viper.BindPFlag("ethereum.genesisBlock", superNodeCmd.PersistentFlags().Lookup("eth-genesis-block")) + viper.BindPFlag("ethereum.networkID", superNodeCmd.PersistentFlags().Lookup("eth-network-id")) } diff --git a/documentation/super_node/architecture.md b/documentation/super_node/architecture.md index a3e6fc94..3bd32a33 100644 --- a/documentation/super_node/architecture.md +++ b/documentation/super_node/architecture.md @@ -73,6 +73,7 @@ This set of parameters needs to be set no matter the chain type. frequency = 45 # $SUPERNODE_FREQUENCY batchSize = 1 # $SUPERNODE_BATCH_SIZE batchNumber = 50 # $SUPERNODE_BATCH_NUMBER + timeout = 300 # $HTTP_TIMEOUT validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL ``` @@ -98,6 +99,10 @@ For Ethereum: [ethereum] wsPath = "127.0.0.1:8546" # $ETH_WS_PATH httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH + nodeID = "arch1" # $ETH_NODE_ID + clientName = "Geth" # $ETH_CLIENT_NAME + genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK + networkID = "1" # $ETH_NETWORK_ID ``` ## Database diff --git a/documentation/super_node/resync.md b/documentation/super_node/resync.md index a207a873..d3936fd9 100644 --- a/documentation/super_node/resync.md +++ b/documentation/super_node/resync.md @@ -41,6 +41,7 @@ This set of parameters needs to be set no matter the chain type. 
stop = 1000 # $RESYNC_STOP batchSize = 10 # $RESYNC_BATCH_SIZE batchNumber = 100 # $RESYNC_BATCH_NUMBER + timeout = 300 # $HTTP_TIMEOUT clearOldCache = true # $RESYNC_CLEAR_OLD_CACHE resetValidation = true # $RESYNC_RESET_VALIDATION ``` @@ -65,4 +66,8 @@ For Ethereum: ```toml [ethereum] httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH + nodeID = "arch1" # $ETH_NODE_ID + clientName = "Geth" # $ETH_CLIENT_NAME + genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK + networkID = "1" # $ETH_NETWORK_ID ``` diff --git a/environments/superNodeBTC.toml b/environments/superNodeBTC.toml index 0b74cb2b..f84ff97c 100644 --- a/environments/superNodeBTC.toml +++ b/environments/superNodeBTC.toml @@ -40,4 +40,4 @@ nodeID = "ocd0" # $BTC_NODE_ID clientName = "Omnicore" # $BTC_CLIENT_NAME genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" # $BTC_GENESIS_BLOCK - networkID = "0xD9B4BEF9" # $BTC_NETWORK_ID \ No newline at end of file + networkID = "0xD9B4BEF9" # $BTC_NETWORK_ID diff --git a/environments/superNodeETH.toml b/environments/superNodeETH.toml index 5258db42..406dfa54 100644 --- a/environments/superNodeETH.toml +++ b/environments/superNodeETH.toml @@ -15,6 +15,7 @@ stop = 0 # $RESYNC_STOP batchSize = 10 # $RESYNC_BATCH_SIZE batchNumber = 100 # $RESYNC_BATCH_NUMBER + timeout = 300 # $HTTP_TIMEOUT clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE resetValidation = true # $RESYNC_RESET_VALIDATION @@ -30,8 +31,13 @@ frequency = 15 # $SUPERNODE_FREQUENCY batchSize = 5 # $SUPERNODE_BATCH_SIZE batchNumber = 50 # $SUPERNODE_BATCH_NUMBER + timeout = 300 # $HTTP_TIMEOUT validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL [ethereum] wsPath = "127.0.0.1:8546" # $ETH_WS_PATH - httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH \ No newline at end of file + httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH + nodeID = "arch1" # $ETH_NODE_ID + clientName = "Geth" # $ETH_CLIENT_NAME + genesisBlock = 
"0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK + networkID = "1" # $ETH_NETWORK_ID diff --git a/libraries/shared/mocks/batch_client.go b/libraries/shared/mocks/batch_client.go index d2156f8d..e6a1b6b6 100644 --- a/libraries/shared/mocks/batch_client.go +++ b/libraries/shared/mocks/batch_client.go @@ -17,6 +17,7 @@ package mocks import ( + "context" "encoding/json" "errors" @@ -62,3 +63,24 @@ func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error { } return nil } + +// BatchCallContext mockClient method to simulate batch call to geth +func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []client.BatchElem) error { + if mc.MappedStateDiffAt == nil { + return errors.New("mockclient needs to be initialized with statediff payloads and errors") + } + for _, batchElem := range batch { + if len(batchElem.Args) != 1 { + return errors.New("expected batch elem to contain single argument") + } + blockHeight, ok := batchElem.Args[0].(uint64) + if !ok { + return errors.New("expected batch elem argument to be a uint64") + } + err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 29059253..61d9efba 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -85,7 +85,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert if err != nil { return nil, err } - fetcher, err := NewPaylaodFetcher(settings.Chain, settings.HTTPClient) + fetcher, err := NewPaylaodFetcher(settings.Chain, settings.HTTPClient, settings.Timeout) if err != nil { return nil, err } diff --git a/pkg/super_node/btc/payload_fetcher.go b/pkg/super_node/btc/payload_fetcher.go index 3582bd90..80d63a93 100644 --- a/pkg/super_node/btc/payload_fetcher.go +++ b/pkg/super_node/btc/payload_fetcher.go @@ -17,6 +17,8 @@ package btc import ( + "fmt" + 
"github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" @@ -48,11 +50,11 @@ func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChain for i, height := range blockHeights { hash, err := fetcher.client.GetBlockHash(int64(height)) if err != nil { - return nil, err + return nil, fmt.Errorf("bitcoin PayloadFetcher GetBlockHash err at blockheight %d: %s", height, err.Error()) } block, err := fetcher.client.GetBlock(hash) if err != nil { - return nil, err + return nil, fmt.Errorf("bitcoin PayloadFetcher GetBlock err at blockheight %d: %s", height, err.Error()) } blockPayloads[i] = BlockPayload{ BlockHeight: int64(height), diff --git a/pkg/super_node/config.go b/pkg/super_node/config.go index 5614c1e2..f0bda9ad 100644 --- a/pkg/super_node/config.go +++ b/pkg/super_node/config.go @@ -72,6 +72,7 @@ type Config struct { BatchSize uint64 BatchNumber uint64 ValidationLevel int + Timeout time.Duration // HTTP connection timeout in seconds } // NewSuperNodeConfig is used to initialize a SuperNode config from a .toml file @@ -90,6 +91,13 @@ func NewSuperNodeConfig() (*Config, error) { viper.BindEnv("superNode.ipcPath", SUPERNODE_IPC_PATH) viper.BindEnv("superNode.httpPath", SUPERNODE_HTTP_PATH) viper.BindEnv("superNode.backFill", SUPERNODE_BACKFILL) + viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT) + + timeout := viper.GetInt("superNode.timeout") + if timeout < 15 { + timeout = 15 + } + c.Timeout = time.Second * time.Duration(timeout) chain := viper.GetString("superNode.chain") c.Chain, err = shared.NewChainType(chain) diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index 34ddb93d..3c4a2532 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -18,6 +18,7 @@ package super_node import ( "fmt" + "time" "github.com/btcsuite/btcd/chaincfg" @@ -92,7 +93,7 @@ func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (sha } // 
NewPaylaodFetcher constructs a PayloadFetcher for the provided chain type -func NewPaylaodFetcher(chain shared.ChainType, client interface{}) (shared.PayloadFetcher, error) { +func NewPaylaodFetcher(chain shared.ChainType, client interface{}, timeout time.Duration) (shared.PayloadFetcher, error) { switch chain { case shared.Ethereum: batchClient, ok := client.(eth.BatchClient) @@ -100,7 +101,7 @@ func NewPaylaodFetcher(chain shared.ChainType, client interface{}) (shared.Paylo var expectedClient eth.BatchClient return nil, fmt.Errorf("ethereum payload fetcher constructor expected client type %T got %T", expectedClient, client) } - return eth.NewPayloadFetcher(batchClient), nil + return eth.NewPayloadFetcher(batchClient, timeout), nil case shared.Bitcoin: connConfig, ok := client.(*rpcclient.ConnConfig) if !ok { diff --git a/pkg/super_node/eth/payload_fetcher.go b/pkg/super_node/eth/payload_fetcher.go index ac12aa41..fece7c10 100644 --- a/pkg/super_node/eth/payload_fetcher.go +++ b/pkg/super_node/eth/payload_fetcher.go @@ -17,33 +17,36 @@ package eth import ( + "context" "fmt" - - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "time" "github.com/ethereum/go-ethereum/statediff" "github.com/vulcanize/vulcanizedb/pkg/eth/client" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // BatchClient is an interface to a batch-fetching geth rpc client; created to allow mock insertion type BatchClient interface { - BatchCall(batch []client.BatchElem) error + BatchCallContext(ctx context.Context, batch []client.BatchElem) error } // PayloadFetcher satisfies the PayloadFetcher interface for ethereum type PayloadFetcher struct { // PayloadFetcher is thread-safe as long as the underlying client is thread-safe, since it has/modifies no other state // http.Client is thread-safe - client BatchClient + client BatchClient + timeout time.Duration } const method = "statediff_stateDiffAt" // NewStateDiffFetcher returns a PayloadFetcher -func NewPayloadFetcher(bc 
BatchClient) *PayloadFetcher { +func NewPayloadFetcher(bc BatchClient, timeout time.Duration) *PayloadFetcher { return &PayloadFetcher{ - client: bc, + client: bc, + timeout: timeout, } } @@ -58,14 +61,16 @@ func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChain Result: new(statediff.Payload), }) } - batchErr := fetcher.client.BatchCall(batch) + ctx, cancel := context.WithTimeout(context.Background(), fetcher.timeout) + defer cancel() + batchErr := fetcher.client.BatchCallContext(ctx, batch) if batchErr != nil { - return nil, fmt.Errorf("PayloadFetcher err: %s", batchErr.Error()) + return nil, fmt.Errorf("ethereum PayloadFetcher batch err for block range %d-%d: %s", blockHeights[0], blockHeights[len(blockHeights)-1], batchErr.Error()) } results := make([]shared.RawChainData, 0, len(blockHeights)) for _, batchElem := range batch { if batchElem.Error != nil { - return nil, fmt.Errorf("PayloadFetcher err: %s", batchElem.Error.Error()) + return nil, fmt.Errorf("ethereum PayloadFetcher err at blockheight %d: %s", batchElem.Args[0].(uint64), batchElem.Error.Error()) } payload, ok := batchElem.Result.(*statediff.Payload) if ok { diff --git a/pkg/super_node/eth/payload_fetcher_test.go b/pkg/super_node/eth/payload_fetcher_test.go index 0451aae3..fb7b49b2 100644 --- a/pkg/super_node/eth/payload_fetcher_test.go +++ b/pkg/super_node/eth/payload_fetcher_test.go @@ -17,6 +17,8 @@ package eth_test import ( + "time" + "github.com/ethereum/go-ethereum/statediff" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" @@ -38,7 +40,7 @@ var _ = Describe("StateDiffFetcher", func() { Expect(setDiffAtErr1).ToNot(HaveOccurred()) setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2) Expect(setDiffAtErr2).ToNot(HaveOccurred()) - stateDiffFetcher = eth.NewPayloadFetcher(mc) + stateDiffFetcher = eth.NewPayloadFetcher(mc, time.Second*60) }) It("Batch calls statediff_stateDiffAt", func() { blockHeights := []uint64{ diff --git a/pkg/super_node/resync/config.go b/pkg/super_node/resync/config.go index 64c5a266..bd35c00c 100644 --- a/pkg/super_node/resync/config.go +++ b/pkg/super_node/resync/config.go @@ -18,6 +18,7 @@ package resync import ( "fmt" + "time" "github.com/spf13/viper" @@ -52,10 +53,11 @@ type Config struct { DBConfig config.Database IPFSPath string - HTTPClient interface{} // Note this client is expected to support the retrieval of the specified data type(s) - NodeInfo core.Node // Info for the associated node - Ranges [][2]uint64 // The block height ranges to resync - BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing) + HTTPClient interface{} // Note this client is expected to support the retrieval of the specified data type(s) + NodeInfo core.Node // Info for the associated node + Ranges [][2]uint64 // The block height ranges to resync + BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing) + Timeout time.Duration // HTTP connection timeout in seconds BatchNumber uint64 Quit chan bool // Channel for shutting down @@ -76,6 +78,13 @@ func NewReSyncConfig() (*Config, error) { viper.BindEnv("resync.batchSize", RESYNC_BATCH_SIZE) viper.BindEnv("resync.batchNumber", RESYNC_BATCH_NUMBER) viper.BindEnv("resync.resetValidation", RESYNC_RESET_VALIDATION) + viper.BindEnv("resync.timeout", shared.HTTP_TIMEOUT) + + timeout := viper.GetInt("resync.timeout") + if timeout < 15 { + timeout = 15 + } + c.Timeout = time.Second * 
time.Duration(timeout) start := uint64(viper.GetInt64("resync.start")) stop := uint64(viper.GetInt64("resync.stop")) diff --git a/pkg/super_node/resync/service.go b/pkg/super_node/resync/service.go index 0b9fd1ec..1e291ff1 100644 --- a/pkg/super_node/resync/service.go +++ b/pkg/super_node/resync/service.go @@ -80,7 +80,7 @@ func NewResyncService(settings *Config) (Resync, error) { if err != nil { return nil, err } - fetcher, err := super_node.NewPaylaodFetcher(settings.Chain, settings.HTTPClient) + fetcher, err := super_node.NewPaylaodFetcher(settings.Chain, settings.HTTPClient, settings.Timeout) if err != nil { return nil, err } diff --git a/pkg/super_node/shared/env.go b/pkg/super_node/shared/env.go index fbd66dd4..4a895f29 100644 --- a/pkg/super_node/shared/env.go +++ b/pkg/super_node/shared/env.go @@ -24,17 +24,20 @@ import ( "github.com/btcsuite/btcd/rpcclient" "github.com/spf13/viper" - "github.com/vulcanize/vulcanizedb/pkg/eth/client" "github.com/vulcanize/vulcanizedb/pkg/eth/core" - "github.com/vulcanize/vulcanizedb/pkg/eth/node" ) // Env variables const ( - IPFS_PATH = "IPFS_PATH" + IPFS_PATH = "IPFS_PATH" + HTTP_TIMEOUT = "HTTP_TIMEOUT" - ETH_WS_PATH = "ETH_WS_PATH" - ETH_HTTP_PATH = "ETH_HTTP_PATH" + ETH_WS_PATH = "ETH_WS_PATH" + ETH_HTTP_PATH = "ETH_HTTP_PATH" + ETH_NODE_ID = "ETH_NODE_ID" + ETH_CLIENT_NAME = "ETH_CLIENT_NAME" + ETH_GENESIS_BLOCK = "ETH_GENESIS_BLOCK" + ETH_NETWORK_ID = "ETH_NETWORK_ID" BTC_WS_PATH = "BTC_WS_PATH" BTC_HTTP_PATH = "BTC_HTTP_PATH" @@ -47,14 +50,22 @@ const ( ) // GetEthNodeAndClient returns eth node info and client from path url -func GetEthNodeAndClient(path string) (core.Node, core.RPCClient, error) { - rawRPCClient, err := rpc.Dial(path) +func GetEthNodeAndClient(path string) (core.Node, *rpc.Client, error) { + viper.BindEnv("ethereum.nodeID", ETH_NODE_ID) + viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME) + viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK) + viper.BindEnv("ethereum.networkID", 
ETH_NETWORK_ID) + + rpcClient, err := rpc.Dial(path) if err != nil { return core.Node{}, nil, err } - rpcClient := client.NewRPCClient(rawRPCClient, path) - vdbNode := node.MakeNode(rpcClient) - return vdbNode, rpcClient, nil + return core.Node{ + ID: viper.GetString("ethereum.nodeID"), + ClientName: viper.GetString("ethereum.clientName"), + GenesisBlock: viper.GetString("ethereum.genesisBlock"), + NetworkID: viper.GetString("ethereum.networkID"), + }, rpcClient, nil } // GetIPFSPath returns the ipfs path from the config or env variable From 43c254b5f6667b37a775e2538e2486fa45329d05 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 20 Apr 2020 08:14:02 -0500 Subject: [PATCH 7/7] fixes after tests --- environments/superNodeETH.toml | 4 +- libraries/shared/mocks/batch_client.go | 4 +- libraries/shared/mocks/stream_client.go | 44 ++++++++++++++ libraries/shared/storage/utils/bins.go | 29 ++++++++- pkg/super_node/backfiller.go | 32 +++++----- pkg/super_node/backfiller_test.go | 7 ++- pkg/super_node/btc/cleaner.go | 3 + pkg/super_node/btc/retriever.go | 19 +----- pkg/super_node/config.go | 14 ++--- pkg/super_node/constructors.go | 12 ++-- pkg/super_node/eth/cleaner.go | 3 + pkg/super_node/eth/payload_fetcher.go | 8 +-- pkg/super_node/eth/retriever.go | 19 +----- pkg/super_node/eth/retriever_test.go | 79 ++++++++++++++++++++++++- pkg/super_node/eth/streamer.go | 15 +++-- pkg/super_node/eth/streamer_test.go | 4 +- 16 files changed, 214 insertions(+), 82 deletions(-) create mode 100644 libraries/shared/mocks/stream_client.go diff --git a/environments/superNodeETH.toml b/environments/superNodeETH.toml index 406dfa54..837c6afa 100644 --- a/environments/superNodeETH.toml +++ b/environments/superNodeETH.toml @@ -13,8 +13,8 @@ type = "state" # $RESYNC_TYPE start = 0 # $RESYNC_START stop = 0 # $RESYNC_STOP - batchSize = 10 # $RESYNC_BATCH_SIZE - batchNumber = 100 # $RESYNC_BATCH_NUMBER + batchSize = 5 # $RESYNC_BATCH_SIZE + batchNumber = 50 # $RESYNC_BATCH_NUMBER timeout = 300 
# $HTTP_TIMEOUT clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE resetValidation = true # $RESYNC_RESET_VALIDATION diff --git a/libraries/shared/mocks/batch_client.go b/libraries/shared/mocks/batch_client.go index e6a1b6b6..e81555af 100644 --- a/libraries/shared/mocks/batch_client.go +++ b/libraries/shared/mocks/batch_client.go @@ -21,6 +21,8 @@ import ( "encoding/json" "errors" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" "github.com/vulcanize/vulcanizedb/pkg/eth/client" ) @@ -65,7 +67,7 @@ func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error { } // BatchCallContext mockClient method to simulate batch call to geth -func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []client.BatchElem) error { +func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error { if mc.MappedStateDiffAt == nil { return errors.New("mockclient needs to be initialized with statediff payloads and errors") } diff --git a/libraries/shared/mocks/stream_client.go b/libraries/shared/mocks/stream_client.go new file mode 100644 index 00000000..6ae821b4 --- /dev/null +++ b/libraries/shared/mocks/stream_client.go @@ -0,0 +1,44 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +package mocks + +import ( + "context" + + "github.com/ethereum/go-ethereum/rpc" +) + +type StreamClient struct { + passedContext context.Context + passedResult interface{} + passedNamespace string + passedPayloadChan interface{} + passedSubscribeArgs []interface{} +} + +func (client *StreamClient) Subscribe(ctx context.Context, namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) { + client.passedNamespace = namespace + client.passedPayloadChan = payloadChan + client.passedContext = ctx + + for _, arg := range args { + client.passedSubscribeArgs = append(client.passedSubscribeArgs, arg) + } + + subscription := rpc.ClientSubscription{} + return &subscription, nil +} diff --git a/libraries/shared/storage/utils/bins.go b/libraries/shared/storage/utils/bins.go index 18f28c32..42ea0a84 100644 --- a/libraries/shared/storage/utils/bins.go +++ b/libraries/shared/storage/utils/bins.go @@ -16,7 +16,11 @@ package utils -import "errors" +import ( + "errors" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint64, error) { if endingBlock < startingBlock { @@ -43,3 +47,26 @@ func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint6 } return blockRangeBins, nil } + +func MissingHeightsToGaps(heights []uint64) []shared.Gap { + validationGaps := make([]shared.Gap, 0) + start := heights[0] + lastHeight := start + for i, height := range heights[1:] { + if height != lastHeight+1 { + validationGaps = append(validationGaps, shared.Gap{ + Start: start, + Stop: lastHeight, + }) + start = height + } + if i+2 == len(heights) { + validationGaps = append(validationGaps, shared.Gap{ + Start: start, + Stop: height, + }) + } + lastHeight = height + } + return validationGaps +} diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 61d9efba..ff7514bb 100644 --- a/pkg/super_node/backfiller.go +++ 
b/pkg/super_node/backfiller.go @@ -62,7 +62,7 @@ type BackFillService struct { // Channel for receiving quit signal QuitChan chan bool // Chain type - chain shared.ChainType + Chain shared.ChainType // Headers with times_validated lower than this will be resynced validationLevel int } @@ -108,7 +108,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert BatchNumber: int64(batchNumber), ScreenAndServeChan: screenAndServeChan, QuitChan: settings.Quit, - chain: settings.Chain, + Chain: settings.Chain, validationLevel: settings.ValidationLevel, }, nil } @@ -122,25 +122,25 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { for { select { case <-bfs.QuitChan: - log.Infof("quiting %s FillGapsInSuperNode process", bfs.chain.String()) + log.Infof("quiting %s FillGapsInSuperNode process", bfs.Chain.String()) wg.Done() return case <-ticker.C: - log.Infof("searching for gaps in the %s super node database", bfs.chain.String()) + log.Infof("searching for gaps in the %s super node database", bfs.Chain.String()) startingBlock, err := bfs.Retriever.RetrieveFirstBlockNumber() if err != nil { - log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.chain.String(), err) + log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err) continue } - if startingBlock != 0 { - log.Infof("found gap at the beginning of the %s sync", bfs.chain.String()) + if startingBlock != 0 && bfs.Chain == shared.Bitcoin || startingBlock != 1 && bfs.Chain == shared.Ethereum { + log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String()) if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil { log.Error(err) } } gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel) if err != nil { - log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.chain.String(), err) + log.Errorf("super node db backfill RetrieveGapsInData 
error for chain %s: %v", bfs.Chain.String(), err) continue } for _, gap := range gaps { @@ -151,15 +151,15 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { } } }() - log.Infof("%s fillGaps goroutine successfully spun up", bfs.chain.String()) + log.Infof("%s fillGaps goroutine successfully spun up", bfs.Chain.String()) } // backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks // It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error { - log.Infof("filling in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock) + log.Infof("filling in %s gap from %d to %d", bfs.Chain.String(), startingBlock, endingBlock) if endingBlock < startingBlock { - return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.chain.String()) + return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.Chain.String()) } // break the range up into bins of smaller ranges blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bfs.BatchSize) @@ -184,12 +184,12 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error { go func(blockHeights []uint64) { payloads, err := bfs.Fetcher.FetchAt(blockHeights) if err != nil { - log.Errorf("%s super node historical data fetcher error: %s", bfs.chain.String(), err.Error()) + log.Errorf("%s super node historical data fetcher error: %s", bfs.Chain.String(), err.Error()) } for _, payload := range payloads { ipldPayload, err := bfs.Converter.Convert(payload) if err != nil { - log.Errorf("%s super node historical data converter error: %s", bfs.chain.String(), err.Error()) + log.Errorf("%s super node historical data converter error: %s", bfs.Chain.String(), err.Error()) } // If there is a ScreenAndServe process 
listening, forward payload to it select { @@ -198,14 +198,14 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error { } cidPayload, err := bfs.Publisher.Publish(ipldPayload) if err != nil { - log.Errorf("%s super node historical data publisher error: %s", bfs.chain.String(), err.Error()) + log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error()) } if err := bfs.Indexer.Index(cidPayload); err != nil { - log.Errorf("%s super node historical data indexer error: %s", bfs.chain.String(), err.Error()) + log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error()) } } // when this goroutine is done, send out a signal - log.Infof("finished filling in %s gap from %d to %d", bfs.chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1]) + log.Infof("finished filling in %s gap from %d to %d", bfs.Chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1]) processingDone <- true }(blockHeights) } diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index 236357e0..56adea4e 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -45,7 +45,7 @@ var _ = Describe("BackFiller", func() { ReturnErr: nil, } mockRetriever := &mocks2.CIDRetriever{ - FirstBlockNumberToReturn: 0, + FirstBlockNumberToReturn: 1, GapsToRetrieve: []shared.Gap{ { Start: 100, Stop: 101, @@ -69,6 +69,7 @@ var _ = Describe("BackFiller", func() { BatchSize: super_node.DefaultMaxBatchSize, BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, + Chain: shared.Ethereum, } wg := &sync.WaitGroup{} backfiller.FillGapsInSuperNode(wg) @@ -101,7 +102,7 @@ var _ = Describe("BackFiller", func() { ReturnErr: nil, } mockRetriever := &mocks2.CIDRetriever{ - FirstBlockNumberToReturn: 0, + FirstBlockNumberToReturn: 1, GapsToRetrieve: []shared.Gap{ { Start: 100, Stop: 100, @@ -124,6 +125,7 @@ var _ = Describe("BackFiller", func() { 
BatchSize: super_node.DefaultMaxBatchSize, BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, + Chain: shared.Ethereum, } wg := &sync.WaitGroup{} backfiller.FillGapsInSuperNode(wg) @@ -173,6 +175,7 @@ var _ = Describe("BackFiller", func() { BatchSize: super_node.DefaultMaxBatchSize, BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, + Chain: shared.Ethereum, } wg := &sync.WaitGroup{} backfiller.FillGapsInSuperNode(wg) diff --git a/pkg/super_node/btc/cleaner.go b/pkg/super_node/btc/cleaner.go index 95a00b27..a057dddf 100644 --- a/pkg/super_node/btc/cleaner.go +++ b/pkg/super_node/btc/cleaner.go @@ -50,6 +50,9 @@ func (c *Cleaner) ResetValidation(rngs [][2]uint64) error { SET times_validated = 0 WHERE block_number BETWEEN $1 AND $2` if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } return err } } diff --git a/pkg/super_node/btc/retriever.go b/pkg/super_node/btc/retriever.go index 4a08cbfd..350917bb 100644 --- a/pkg/super_node/btc/retriever.go +++ b/pkg/super_node/btc/retriever.go @@ -20,6 +20,8 @@ import ( "fmt" "math/big" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/lib/pq" "github.com/ethereum/go-ethereum/common" @@ -194,22 +196,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, if len(heights) == 0 { return emptyGaps, nil } - validationGaps := make([]shared.Gap, 0) - start := heights[0] - lastHeight := start - for _, height := range heights[1:] { - if height == lastHeight+1 { - lastHeight = height - continue - } - validationGaps = append(validationGaps, shared.Gap{ - Start: start, - Stop: lastHeight, - }) - start = height - lastHeight = start - } - return append(emptyGaps, validationGaps...), nil + return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil } // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash diff --git 
a/pkg/super_node/config.go b/pkg/super_node/config.go index f0bda9ad..be3d09c3 100644 --- a/pkg/super_node/config.go +++ b/pkg/super_node/config.go @@ -91,13 +91,6 @@ func NewSuperNodeConfig() (*Config, error) { viper.BindEnv("superNode.ipcPath", SUPERNODE_IPC_PATH) viper.BindEnv("superNode.httpPath", SUPERNODE_HTTP_PATH) viper.BindEnv("superNode.backFill", SUPERNODE_BACKFILL) - viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT) - - timeout := viper.GetInt("superNode.timeout") - if timeout < 15 { - timeout = 15 - } - c.Timeout = time.Second * time.Duration(timeout) chain := viper.GetString("superNode.chain") c.Chain, err = shared.NewChainType(chain) @@ -178,6 +171,13 @@ func (c *Config) BackFillFields() error { viper.BindEnv("superNode.batchSize", SUPERNODE_BATCH_SIZE) viper.BindEnv("superNode.batchNumber", SUPERNODE_BATCH_NUMBER) viper.BindEnv("superNode.validationLevel", SUPERNODE_VALIDATION_LEVEL) + viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT) + + timeout := viper.GetInt("superNode.timeout") + if timeout < 15 { + timeout = 15 + } + c.Timeout = time.Second * time.Duration(timeout) switch c.Chain { case shared.Ethereum: diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index 3c4a2532..fa62fb7d 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -21,12 +21,10 @@ import ( "time" "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/rpcclient" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" - "github.com/vulcanize/vulcanizedb/pkg/eth/core" "github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/super_node/btc" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" @@ -73,10 +71,9 @@ func NewCIDRetriever(chain shared.ChainType, db *postgres.DB) (shared.CIDRetriev func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error) { switch chain { case 
shared.Ethereum: - ethClient, ok := clientOrConfig.(core.RPCClient) + ethClient, ok := clientOrConfig.(*rpc.Client) if !ok { - var expectedClientType core.RPCClient - return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", expectedClientType, clientOrConfig) + return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", &rpc.Client{}, clientOrConfig) } streamChan := make(chan shared.RawChainData, eth.PayloadChanBufferSize) return eth.NewPayloadStreamer(ethClient), streamChan, nil @@ -96,10 +93,9 @@ func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (sha func NewPaylaodFetcher(chain shared.ChainType, client interface{}, timeout time.Duration) (shared.PayloadFetcher, error) { switch chain { case shared.Ethereum: - batchClient, ok := client.(eth.BatchClient) + batchClient, ok := client.(*rpc.Client) if !ok { - var expectedClient eth.BatchClient - return nil, fmt.Errorf("ethereum payload fetcher constructor expected client type %T got %T", expectedClient, client) + return nil, fmt.Errorf("ethereum payload fetcher constructor expected client type %T got %T", &rpc.Client{}, client) } return eth.NewPayloadFetcher(batchClient, timeout), nil case shared.Bitcoin: diff --git a/pkg/super_node/eth/cleaner.go b/pkg/super_node/eth/cleaner.go index c07dd063..37ebcba6 100644 --- a/pkg/super_node/eth/cleaner.go +++ b/pkg/super_node/eth/cleaner.go @@ -50,6 +50,9 @@ func (c *Cleaner) ResetValidation(rngs [][2]uint64) error { SET times_validated = 0 WHERE block_number BETWEEN $1 AND $2` if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } return err } } diff --git a/pkg/super_node/eth/payload_fetcher.go b/pkg/super_node/eth/payload_fetcher.go index fece7c10..b1f33649 100644 --- a/pkg/super_node/eth/payload_fetcher.go +++ b/pkg/super_node/eth/payload_fetcher.go @@ -21,15 +21,15 @@ import ( "fmt" "time" + 
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" - "github.com/vulcanize/vulcanizedb/pkg/eth/client" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // BatchClient is an interface to a batch-fetching geth rpc client; created to allow mock insertion type BatchClient interface { - BatchCallContext(ctx context.Context, batch []client.BatchElem) error + BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error } // PayloadFetcher satisfies the PayloadFetcher interface for ethereum @@ -53,9 +53,9 @@ func NewPayloadFetcher(bc BatchClient, timeout time.Duration) *PayloadFetcher { // FetchAt fetches the statediff payloads at the given block heights // Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { - batch := make([]client.BatchElem, 0) + batch := make([]rpc.BatchElem, 0) for _, height := range blockHeights { - batch = append(batch, client.BatchElem{ + batch = append(batch, rpc.BatchElem{ Method: method, Args: []interface{}{height}, Result: new(statediff.Payload), diff --git a/pkg/super_node/eth/retriever.go b/pkg/super_node/eth/retriever.go index 5baa2c72..539052db 100644 --- a/pkg/super_node/eth/retriever.go +++ b/pkg/super_node/eth/retriever.go @@ -20,6 +20,8 @@ import ( "fmt" "math/big" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/jmoiron/sqlx" @@ -479,22 +481,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, if len(heights) == 0 { return emptyGaps, nil } - validationGaps := make([]shared.Gap, 0) - start := heights[0] - lastHeight := start - for _, height := range heights[1:] { - if height == lastHeight+1 { - lastHeight = height - continue - } - validationGaps = append(validationGaps, shared.Gap{ - Start: start, - Stop: lastHeight, - }) - start 
= height - lastHeight = start - } - return append(emptyGaps, validationGaps...), nil + return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil } // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash diff --git a/pkg/super_node/eth/retriever_test.go b/pkg/super_node/eth/retriever_test.go index 71f4b1d8..444a5d24 100644 --- a/pkg/super_node/eth/retriever_test.go +++ b/pkg/super_node/eth/retriever_test.go @@ -544,8 +544,16 @@ var _ = Describe("Retriever", func() { payload4.HeaderCID.BlockNumber = "101" payload5 := payload4 payload5.HeaderCID.BlockNumber = "102" - payload6 := payload5 - payload6.HeaderCID.BlockNumber = "1000" + payload6 := payload4 + payload6.HeaderCID.BlockNumber = "103" + payload7 := payload4 + payload7.HeaderCID.BlockNumber = "104" + payload8 := payload4 + payload8.HeaderCID.BlockNumber = "105" + payload9 := payload4 + payload9.HeaderCID.BlockNumber = "106" + payload10 := payload5 + payload10.HeaderCID.BlockNumber = "1000" err := repo.Index(&payload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload2) @@ -558,11 +566,76 @@ var _ = Describe("Retriever", func() { Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload6) Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload7) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload8) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload9) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload10) + Expect(err).ToNot(HaveOccurred()) + gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(3)) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 103, Stop: 999})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) + }) + + 
It("Finds validation level gaps", func() { + payload1 := *mocks.MockCIDPayload + payload1.HeaderCID.BlockNumber = "1010101" + payload2 := payload1 + payload2.HeaderCID.BlockNumber = "5" + payload3 := payload2 + payload3.HeaderCID.BlockNumber = "100" + payload4 := payload3 + payload4.HeaderCID.BlockNumber = "101" + payload5 := payload4 + payload5.HeaderCID.BlockNumber = "102" + payload6 := payload4 + payload6.HeaderCID.BlockNumber = "103" + payload7 := payload4 + payload7.HeaderCID.BlockNumber = "104" + payload8 := payload4 + payload8.HeaderCID.BlockNumber = "105" + payload9 := payload4 + payload9.HeaderCID.BlockNumber = "106" + payload10 := payload5 + payload10.HeaderCID.BlockNumber = "1000" + err := repo.Index(&payload1) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload2) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload3) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload4) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload5) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload6) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload7) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload8) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload9) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload10) + Expect(err).ToNot(HaveOccurred()) + + cleaner := eth.NewCleaner(db) + err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}}) + Expect(err).ToNot(HaveOccurred()) + + gaps, err := retriever.RetrieveGapsInData(1) + Expect(err).ToNot(HaveOccurred()) + Expect(len(gaps)).To(Equal(5)) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue()) 
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) }) }) diff --git a/pkg/super_node/eth/streamer.go b/pkg/super_node/eth/streamer.go index 123bc911..c16ad6fe 100644 --- a/pkg/super_node/eth/streamer.go +++ b/pkg/super_node/eth/streamer.go @@ -17,10 +17,12 @@ package eth import ( + "context" + + "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" "github.com/sirupsen/logrus" - "github.com/vulcanize/vulcanizedb/pkg/eth/core" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) @@ -28,13 +30,18 @@ const ( PayloadChanBufferSize = 20000 // the max eth sub buffer size ) +// StreamClient is an interface for subscribing and streaming from geth +type StreamClient interface { + Subscribe(ctx context.Context, namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) +} + // PayloadStreamer satisfies the PayloadStreamer interface for ethereum type PayloadStreamer struct { - Client core.RPCClient + Client StreamClient } // NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for ethereum -func NewPayloadStreamer(client core.RPCClient) *PayloadStreamer { +func NewPayloadStreamer(client StreamClient) *PayloadStreamer { return &PayloadStreamer{ Client: client, } @@ -53,5 +60,5 @@ func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared. } } }() - return ps.Client.Subscribe("statediff", stateDiffChan, "stream") + return ps.Client.Subscribe(context.Background(), "statediff", stateDiffChan, "stream") } diff --git a/pkg/super_node/eth/streamer_test.go b/pkg/super_node/eth/streamer_test.go index 0e47041c..d6c014f6 100644 --- a/pkg/super_node/eth/streamer_test.go +++ b/pkg/super_node/eth/streamer_test.go @@ -18,14 +18,14 @@ import ( . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/eth/fakes" + "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) var _ = Describe("StateDiff Streamer", func() { It("subscribes to the geth statediff service", func() { - client := &fakes.MockRPCClient{} + client := &mocks.StreamClient{} streamer := eth.NewPayloadStreamer(client) payloadChan := make(chan shared.RawChainData) _, err := streamer.Stream(payloadChan)