commit
3bd1e518c6
@ -71,6 +71,7 @@ func init() {
|
||||
resyncCmd.PersistentFlags().Int("resync-batch-number", 0, "how many goroutines to fetch data concurrently")
|
||||
resyncCmd.PersistentFlags().Bool("resync-clear-old-cache", false, "if true, clear out old data of the provided type within the resync range before resyncing")
|
||||
resyncCmd.PersistentFlags().Bool("resync-reset-validation", false, "if true, reset times_validated to 0")
|
||||
resyncCmd.PersistentFlags().Int("resync-timeout", 15, "timeout used for resync http requests")
|
||||
|
||||
resyncCmd.PersistentFlags().String("btc-http-path", "", "http url for bitcoin node")
|
||||
resyncCmd.PersistentFlags().String("btc-password", "", "password for btc node")
|
||||
@ -81,6 +82,10 @@ func init() {
|
||||
resyncCmd.PersistentFlags().String("btc-network-id", "", "btc network id")
|
||||
|
||||
resyncCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node")
|
||||
resyncCmd.PersistentFlags().String("eth-node-id", "", "eth node id")
|
||||
resyncCmd.PersistentFlags().String("eth-client-name", "", "eth client name")
|
||||
resyncCmd.PersistentFlags().String("eth-genesis-block", "", "eth genesis block hash")
|
||||
resyncCmd.PersistentFlags().String("eth-network-id", "", "eth network id")
|
||||
|
||||
// and their bindings
|
||||
viper.BindPFlag("ipfs.path", resyncCmd.PersistentFlags().Lookup("ipfs-path"))
|
||||
@ -93,6 +98,7 @@ func init() {
|
||||
viper.BindPFlag("resync.batchNumber", resyncCmd.PersistentFlags().Lookup("resync-batch-number"))
|
||||
viper.BindPFlag("resync.clearOldCache", resyncCmd.PersistentFlags().Lookup("resync-clear-old-cache"))
|
||||
viper.BindPFlag("resync.resetValidation", resyncCmd.PersistentFlags().Lookup("resync-reset-validation"))
|
||||
viper.BindPFlag("resync.timeout", resyncCmd.PersistentFlags().Lookup("resync-timeout"))
|
||||
|
||||
viper.BindPFlag("bitcoin.httpPath", resyncCmd.PersistentFlags().Lookup("btc-http-path"))
|
||||
viper.BindPFlag("bitcoin.pass", resyncCmd.PersistentFlags().Lookup("btc-password"))
|
||||
@ -103,4 +109,8 @@ func init() {
|
||||
viper.BindPFlag("bitcoin.networkID", resyncCmd.PersistentFlags().Lookup("btc-network-id"))
|
||||
|
||||
viper.BindPFlag("ethereum.httpPath", resyncCmd.PersistentFlags().Lookup("eth-http-path"))
|
||||
viper.BindPFlag("ethereum.nodeID", resyncCmd.PersistentFlags().Lookup("eth-node-id"))
|
||||
viper.BindPFlag("ethereum.clientName", resyncCmd.PersistentFlags().Lookup("eth-client-name"))
|
||||
viper.BindPFlag("ethereum.genesisBlock", resyncCmd.PersistentFlags().Lookup("eth-genesis-block"))
|
||||
viper.BindPFlag("ethereum.networkID", resyncCmd.PersistentFlags().Lookup("eth-network-id"))
|
||||
}
|
||||
|
@ -168,7 +168,7 @@ func streamEthSubscription() {
|
||||
}
|
||||
|
||||
func getRPCClient() core.RPCClient {
|
||||
vulcPath := viper.GetString("superNode.ethSubscription.path")
|
||||
vulcPath := viper.GetString("superNode.ethSubscription.wsPath")
|
||||
if vulcPath == "" {
|
||||
vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided
|
||||
}
|
||||
|
@ -121,6 +121,7 @@ func init() {
|
||||
superNodeCmd.PersistentFlags().Int("supernode-batch-size", 0, "data fetching batch size")
|
||||
superNodeCmd.PersistentFlags().Int("supernode-batch-number", 0, "how many goroutines to fetch data concurrently")
|
||||
superNodeCmd.PersistentFlags().Int("supernode-validation-level", 0, "backfill will resync any data below this level")
|
||||
superNodeCmd.PersistentFlags().Int("supernode-timeout", 0, "timeout used for backfill http requests")
|
||||
|
||||
superNodeCmd.PersistentFlags().String("btc-ws-path", "", "ws url for bitcoin node")
|
||||
superNodeCmd.PersistentFlags().String("btc-http-path", "", "http url for bitcoin node")
|
||||
@ -133,6 +134,10 @@ func init() {
|
||||
|
||||
superNodeCmd.PersistentFlags().String("eth-ws-path", "", "ws url for ethereum node")
|
||||
superNodeCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node")
|
||||
superNodeCmd.PersistentFlags().String("eth-node-id", "", "eth node id")
|
||||
superNodeCmd.PersistentFlags().String("eth-client-name", "", "eth client name")
|
||||
superNodeCmd.PersistentFlags().String("eth-genesis-block", "", "eth genesis block hash")
|
||||
superNodeCmd.PersistentFlags().String("eth-network-id", "", "eth network id")
|
||||
|
||||
// and their bindings
|
||||
viper.BindPFlag("ipfs.path", superNodeCmd.PersistentFlags().Lookup("ipfs-path"))
|
||||
@ -149,6 +154,7 @@ func init() {
|
||||
viper.BindPFlag("superNode.batchSize", superNodeCmd.PersistentFlags().Lookup("supernode-batch-size"))
|
||||
viper.BindPFlag("superNode.batchNumber", superNodeCmd.PersistentFlags().Lookup("supernode-batch-number"))
|
||||
viper.BindPFlag("superNode.validationLevel", superNodeCmd.PersistentFlags().Lookup("supernode-validation-level"))
|
||||
viper.BindPFlag("superNode.timeout", superNodeCmd.PersistentFlags().Lookup("supernode-timeout"))
|
||||
|
||||
viper.BindPFlag("bitcoin.wsPath", superNodeCmd.PersistentFlags().Lookup("btc-ws-path"))
|
||||
viper.BindPFlag("bitcoin.httpPath", superNodeCmd.PersistentFlags().Lookup("btc-http-path"))
|
||||
@ -161,4 +167,8 @@ func init() {
|
||||
|
||||
viper.BindPFlag("ethereum.wsPath", superNodeCmd.PersistentFlags().Lookup("eth-ws-path"))
|
||||
viper.BindPFlag("ethereum.httpPath", superNodeCmd.PersistentFlags().Lookup("eth-http-path"))
|
||||
viper.BindPFlag("ethereum.nodeID", superNodeCmd.PersistentFlags().Lookup("eth-node-id"))
|
||||
viper.BindPFlag("ethereum.clientName", superNodeCmd.PersistentFlags().Lookup("eth-client-name"))
|
||||
viper.BindPFlag("ethereum.genesisBlock", superNodeCmd.PersistentFlags().Lookup("eth-genesis-block"))
|
||||
viper.BindPFlag("ethereum.networkID", superNodeCmd.PersistentFlags().Lookup("eth-network-id"))
|
||||
}
|
||||
|
262
documentation/super_node/apis.md
Normal file
262
documentation/super_node/apis.md
Normal file
@ -0,0 +1,262 @@
|
||||
## VulcanizeDB Super Node APIs
|
||||
The super node exposes a number of different APIs for remote access to the underlying DB.
|
||||
|
||||
|
||||
### Table of Contents
|
||||
1. [Postgraphile](#postgraphile)
|
||||
1. [RPC Subscription Interface](#rpc-subscription-interface)
|
||||
1. [Native API Recapitulation](#native-api-recapitulation)
|
||||
|
||||
|
||||
### Postgraphile
|
||||
The super node stores all processed data in Postgres using PG-IPFS, this includes all of the IPLD objects.
|
||||
[Postgraphile](https://www.graphile.org/postgraphile/) can be used to expose GraphQL endpoints for the Postgres tables.
|
||||
|
||||
e.g.
|
||||
|
||||
`postgraphile --plugins @graphile/pg-pubsub --subscriptions --simple-subscriptions -c postgres://localhost:5432/vulcanize_public?sslmode=disable -s public,btc,eth -a -j`
|
||||
|
||||
|
||||
This will stand up a Postgraphile server on the public, eth, and btc schemas- exposing GraphQL endpoints for all of the tables contained under those schemas.
|
||||
All of their data can then be queried with standard [GraphQL](https://graphql.org) queries.
|
||||
|
||||
|
||||
### RPC Subscription Interface
|
||||
A direct, real-time subscription to the data being processed by the super node can be established over WS or IPC through the [Stream](../../pkg/super_node/api.go#L53) RPC method.
|
||||
This method is not chain-specific and each chain-type supports it, it is accessed under the "vdb" namespace rather than a chain-specific namespace. An interface for
|
||||
subscribing to this endpoint is provided [here](../../libraries/shared/streamer/super_node_streamer.go).
|
||||
|
||||
When subscribing to this endpoint, the subscriber provides a set of RLP-encoded subscription parameters. These parameters will be chain-specific, and are used
|
||||
by the super node to filter and return a requested subset of chain data to the subscriber. (e.g. [BTC](../../pkg/super_node/btc/subscription_config.go), [ETH](../../pkg/super_node/eth/subscription_config.go)).
|
||||
|
||||
#### Ethereum RPC Subscription
|
||||
An example of how to subscribe to a real-time Ethereum data feed from the super node using the `Stream` RPC method is provided below
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/spf13/viper"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
|
||||
)
|
||||
|
||||
config, _ := eth.NewEthSubscriptionConfig()
|
||||
rlpConfig, _ := rlp.EncodeToBytes(config)
|
||||
vulcPath := viper.GetString("superNode.ethSubscription.path")
|
||||
rawRPCClient, _ := rpc.Dial(vulcPath)
|
||||
rpcClient := client.NewRPCClient(rawRPCClient, vulcPath)
|
||||
stream := streamer.NewSuperNodeStreamer(rpcClient)
|
||||
payloadChan := make(chan super_node.SubscriptionPayload, 20000)
|
||||
subscription, _ := stream.Stream(payloadChan, rlpConfig)
|
||||
for {
|
||||
select {
|
||||
case payload := <- payloadChan:
|
||||
// do something with the subscription payload
|
||||
case err := <- subscription.Err():
|
||||
// do something with the subscription error
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The .toml file being used to fill the Ethereum subscription config would look something like this:
|
||||
|
||||
```toml
|
||||
[superNode]
|
||||
[superNode.ethSubscription]
|
||||
historicalData = false
|
||||
historicalDataOnly = false
|
||||
startingBlock = 0
|
||||
endingBlock = 0
|
||||
wsPath = "ws://127.0.0.1:8080"
|
||||
[superNode.ethSubscription.headerFilter]
|
||||
off = false
|
||||
uncles = false
|
||||
[superNode.ethSubscription.txFilter]
|
||||
off = false
|
||||
src = []
|
||||
dst = []
|
||||
[superNode.ethSubscription.receiptFilter]
|
||||
off = false
|
||||
contracts = []
|
||||
topic0s = []
|
||||
topic1s = []
|
||||
topic2s = []
|
||||
topic3s = []
|
||||
[superNode.ethSubscription.stateFilter]
|
||||
off = false
|
||||
addresses = []
|
||||
intermediateNodes = false
|
||||
[superNode.ethSubscription.storageFilter]
|
||||
off = true
|
||||
addresses = []
|
||||
storageKeys = []
|
||||
intermediateNodes = false
|
||||
```
|
||||
|
||||
These configuration parameters are broken down as follows:
|
||||
|
||||
`ethSubscription.wsPath` is used to define the SuperNode ws url OR ipc endpoint we subscribe to
|
||||
|
||||
`ethSubscription.historicalData` specifies whether or not the super node should look up historical data in its cache and
|
||||
send that to the subscriber, if this is set to `false` then the super node only streams newly synced/incoming data
|
||||
|
||||
`ethSubscription.historicalDataOnly` will tell the super node to only send historical data with the specified range and
|
||||
not stream forward syncing data
|
||||
|
||||
`ethSubscription.startingBlock` is the starting block number for the range we want to receive data in
|
||||
|
||||
`ethSubscription.endingBlock` is the ending block number for the range we want to receive data in;
|
||||
setting to 0 means there is no end/we will continue streaming indefinitely.
|
||||
|
||||
`ethSubscription.headerFilter` has two sub-options: `off` and `uncles`.
|
||||
|
||||
- Setting `off` to true tells the super node to not send any headers to the subscriber
|
||||
- setting `uncles` to true tells the super node to send uncles in addition to normal headers.
|
||||
|
||||
`ethSubscription.txFilter` has three sub-options: `off`, `src`, and `dst`.
|
||||
|
||||
- Setting `off` to true tells the super node to not send any transactions to the subscriber
|
||||
- `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for,
|
||||
if they have any addresses then the super node will only send transactions that were sent or received by the addresses contained
|
||||
in `src` and `dst`, respectively.
|
||||
|
||||
`ethSubscription.receiptFilter` has four sub-options: `off`, `topics`, `contracts` and `matchTxs`.
|
||||
|
||||
- Setting `off` to true tells the super node to not send any receipts to the subscriber
|
||||
- `topic0s` is a string array which can be filled with event topics we want to filter for,
|
||||
if it has any topics then the super node will only send receipts that contain logs which have that topic0.
|
||||
- `contracts` is a string array which can be filled with contract addresses we want to filter for, if it contains any contract addresses the super node will
|
||||
only send receipts that correspond to one of those contracts.
|
||||
- `matchTrxs` is a bool which when set to true any receipts that correspond to filtered for transactions will be sent by the super node, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters.
|
||||
|
||||
`ethSubscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`.
|
||||
|
||||
- Setting `off` to true tells the super node to not send any state data to the subscriber
|
||||
- `addresses` is a string array which can be filled with ETH addresses we want to filter state for,
|
||||
if it has any addresses then the super node will only send state leafs (accounts) corresponding to those account addresses.
|
||||
- By default the super node only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`.
|
||||
|
||||
`ethSubscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`.
|
||||
|
||||
- Setting `off` to true tells the super node to not send any storage data to the subscriber
|
||||
- `addresses` is a string array which can be filled with ETH addresses we want to filter storage for,
|
||||
if it has any addresses then the super node will only send storage nodes from the storage tries at those state addresses.
|
||||
- `storageKeys` is another string array that can be filled with storage keys we want to filter storage data for. It is important to note that the storage keys need to be the actual keccak256 hashes, whereas
|
||||
the addresses in the `addresses` fields are pre-hashed ETH addresses.
|
||||
- By default the super node only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`.
|
||||
|
||||
### Bitcoin RPC Subscription:
|
||||
An example of how to subscribe to a real-time Bitcoin data feed from the super node using the `Stream` RPC method is provided below
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/spf13/viper"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node/btc"
|
||||
)
|
||||
|
||||
config, _ := btc.NewBtcSubscriptionConfig()
|
||||
rlpConfig, _ := rlp.EncodeToBytes(config)
|
||||
vulcPath := viper.GetString("superNode.btcSubscription.path")
|
||||
rawRPCClient, _ := rpc.Dial(vulcPath)
|
||||
rpcClient := client.NewRPCClient(rawRPCClient, vulcPath)
|
||||
stream := streamer.NewSuperNodeStreamer(rpcClient)
|
||||
payloadChan := make(chan super_node.SubscriptionPayload, 20000)
|
||||
subscription, _ := stream.Stream(payloadChan, rlpConfig)
|
||||
for {
|
||||
select {
|
||||
case payload := <- payloadChan:
|
||||
// do something with the subscription payload
|
||||
case err := <- subscription.Err():
|
||||
// do something with the subscription error
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The .toml file being used to fill the Bitcoin subscription config would look something like this:
|
||||
|
||||
```toml
|
||||
[superNode]
|
||||
[superNode.btcSubscription]
|
||||
historicalData = false
|
||||
historicalDataOnly = false
|
||||
startingBlock = 0
|
||||
endingBlock = 0
|
||||
wsPath = "ws://127.0.0.1:8080"
|
||||
[superNode.btcSubscription.headerFilter]
|
||||
off = false
|
||||
[superNode.btcSubscription.txFilter]
|
||||
off = false
|
||||
segwit = false
|
||||
witnessHashes = []
|
||||
indexes = []
|
||||
pkScriptClass = []
|
||||
multiSig = false
|
||||
addresses = []
|
||||
```
|
||||
|
||||
These configuration parameters are broken down as follows:
|
||||
|
||||
`btcSubscription.wsPath` is used to define the SuperNode ws url OR ipc endpoint we subscribe to
|
||||
|
||||
`btcSubscription.historicalData` specifies whether or not the super node should look up historical data in its cache and
|
||||
send that to the subscriber, if this is set to `false` then the super node only streams newly synced/incoming data
|
||||
|
||||
`btcSubscription.historicalDataOnly` will tell the super node to only send historical data with the specified range and
|
||||
not stream forward syncing data
|
||||
|
||||
`btcSubscription.startingBlock` is the starting block number for the range we want to receive data in
|
||||
|
||||
`btcSubscription.endingBlock` is the ending block number for the range we want to receive data in;
|
||||
setting to 0 means there is no end/we will continue streaming indefinitely.
|
||||
|
||||
`btcSubscription.headerFilter` has one sub-option: `off`.
|
||||
|
||||
- Setting `off` to true tells the super node to
|
||||
not send any headers to the subscriber.
|
||||
- Additional header-filtering options will be added in the future.
|
||||
|
||||
`btcSubscription.txFilter` has seven sub-options: `off`, `segwit`, `witnessHashes`, `indexes`, `pkScriptClass`, `multiSig`, and `addresses`.
|
||||
|
||||
- Setting `off` to true tells the super node to not send any transactions to the subscriber.
|
||||
- Setting `segwit` to true tells the super node to only send segwit transactions.
|
||||
- `witnessHashes` is a string array that can be filled with witness hash string; if it contains any hashes the super node will only send transactions that contain one of those hashes.
|
||||
- `indexes` is an int64 array that can be filled with tx index numbers; if it contains any integers the super node will only send transactions at those indexes (e.g. `[0]` will send only coinbase transactions)
|
||||
- `pkScriptClass` is an uint8 array that can be filled with pk script class numbers; if it contains any integers the super node will only send transactions that have at least one tx output with one of the specified pkscript classes;
|
||||
possible class types are 0 through 8 as defined [here](https://github.com/btcsuite/btcd/blob/master/txscript/standard.go#L52).
|
||||
- Setting `multisig` to true tells the super node to send only multi-sig transactions- to send only transaction that have at least one tx output that requires more than one signature to spend.
|
||||
- `addresses` is a string array that can be filled with btc address strings; if it contains any addresses the super node will only send transactions that have at least one tx output with at least one of the provided addresses.
|
||||
|
||||
|
||||
### Native API Recapitulation:
|
||||
In addition to providing novel Postgraphile and RPC-Subscription endpoints, we are working towards complete recapitulation of the
|
||||
standard chain APIs. This will allow direct compatibility with software that already makes use of the standard interfaces.
|
||||
|
||||
#### Ethereum JSON-RPC API
|
||||
The super node currently faithfully recapitulates portions of the Ethereum JSON-RPC api standard.
|
||||
|
||||
The currently supported endpoints include:
|
||||
`eth_blockNumber`
|
||||
`eth_getLogs`
|
||||
`eth_getHeaderByNumber`
|
||||
`eth_getBlockByNumber`
|
||||
`eth_getBlockByHash`
|
||||
`eth_getTransactionByHash`
|
||||
|
||||
Additional endpoints will be added in the near future, with the immediate goal of recapitulating the largest set of "eth_" endpoints which can be provided as a service.
|
||||
|
||||
#### Bitcoin JSON-RPC API:
|
||||
In the near future, the standard Bitcoin JSON-RPC interfaces will be implemented.
|
@ -1,16 +1,138 @@
|
||||
These are the components of a VulcanizeDB Watcher:
|
||||
* Data Fetcher/Streamer sources:
|
||||
* go-ethereum
|
||||
* bitcoind
|
||||
* btcd
|
||||
* IPFS
|
||||
* Transformers contain:
|
||||
* converter
|
||||
* publisher
|
||||
* indexer
|
||||
* Endpoints contain:
|
||||
* api
|
||||
* backend
|
||||
* filterer
|
||||
* retriever
|
||||
* ipld_server
|
||||
# VulcanizeDB Super Node Architecture
|
||||
The VulcanizeDB super node is a collection of interfaces that are used to extract, process, and store in Postgres-IPFS
|
||||
all chain data. The raw data indexed by the super node serves as the basis for more specific watchers and applications.
|
||||
|
||||
Currently the service supports complete processing of all Bitcoin and Ethereum data.
|
||||
|
||||
## Table of Contents
|
||||
1. [Processes](#processes)
|
||||
1. [Command](#command)
|
||||
1. [Configuration](#config)
|
||||
1. [Database](#database)
|
||||
1. [APIs](#apis)
|
||||
1. [Resync](#resync)
|
||||
1. [IPFS Considerations](#ipfs-considerations)
|
||||
|
||||
## Processes
|
||||
The [super node service](../../pkg/super_node/service.go#L61) is a watcher comprised of the following interfaces:
|
||||
|
||||
* [Payload Fetcher](../../pkg/super_node/shared/interfaces.go#L29): Fetches raw chain data from a half-duplex endpoint (HTTP/IPC), used for historical data fetching. ([BTC](../../pkg/super_node/btc/payload_fetcher.go), [ETH](../../pkg/super_node/eth/payload_fetcher.go)).
|
||||
* [Payload Streamer](../../pkg/super_node/shared/interfaces.go#L24): Streams raw chain data from a full-duplex endpoint (WebSocket/IPC), used for syncing data at the head of the chain in real-time. ([BTC](../../pkg/super_node/btc/http_streamer.go), [ETH](../../pkg/super_node/eth/streamer.go)).
|
||||
* [Payload Converter](../../pkg/super_node/shared/interfaces.go#L34): Converters raw chain data to an intermediary form prepared for IPFS publishing. ([BTC](../../pkg/super_node/btc/converter.go), [ETH](../../pkg/super_node/eth/converter.go)).
|
||||
* [IPLD Publisher](../../pkg/super_node/shared/interfaces.go#L39): Publishes the converted data to IPFS, returning their CIDs and associated metadata for indexing. ([BTC](../../pkg/super_node/btc/publisher.go), [ETH](../../pkg/super_node/eth/publisher.go)).
|
||||
* [CID Indexer](../../pkg/super_node/shared/interfaces.go#L44): Indexes CIDs in Postgres with their associated metadata. This metadata is chain specific and selected based on utility. ([BTC](../../pkg/super_node/btc/indexer.go), [ETH](../../pkg/super_node/eth/indexer.go)).
|
||||
* [CID Retriever](../../pkg/super_node/shared/interfaces.go#L54): Retrieves CIDs from Postgres by searching against their associated metadata, is used to lookup data to serve API requests/subscriptions. ([BTC](../../pkg/super_node/btc/retriever.go), [ETH](../../pkg/super_node/eth/retriever.go)).
|
||||
* [IPLD Fetcher](../../pkg/super_node/shared/interfaces.go#L62): Fetches the IPLDs needed to service API requests/subscriptions from IPFS using retrieved CIDS; can route through a IPFS block-exchange to search for objects that are not directly available. ([BTC](../../pkg/super_node/btc/ipld_fetcher.go), [ETH](../../pkg/super_node/eth/ipld_fetcher.go))
|
||||
* [Response Filterer](../../pkg/super_node/shared/interfaces.go#L49): Filters converted data payloads served to API subscriptions; filters according to the subscriber provided parameters. ([BTC](../../pkg/super_node/btc/filterer.go), [ETH](../../pkg/super_node/eth/filterer.go)).
|
||||
* [API](https://github.com/ethereum/go-ethereum/blob/master/rpc/types.go#L31): Expose RPC methods for clients to interface with the data. Chain-specific APIs should aim to recapitulate as much of the native API as possible. ([VDB](../../pkg/super_node/api.go), [ETH](../../pkg/super_node/eth/api.go)).
|
||||
|
||||
|
||||
Appropriating the service for a new chain is done by creating underlying types to satisfy these interfaces for
|
||||
the specifics of that chain.
|
||||
|
||||
The service uses these interfaces to operate in any combination of three modes: sync, serve, and backfill.
|
||||
* Sync: Streams raw chain data at the head, converts and publishes it to IPFS, and indexes the resulting set of CIDs in Postgres with useful metadata.
|
||||
* BackFill: Automatically searches for and detects gaps in the DB; fetches, converts, publishes, and indexes the data to fill these gaps.
|
||||
* Serve: Opens up IPC, HTTP, and WebSocket servers on top of the superNode DB and any concurrent sync and/or backfill processes.
|
||||
|
||||
|
||||
These three modes are all operated through a single vulcanizeDB command: `superNode`
|
||||
|
||||
## Command
|
||||
|
||||
Usage: `./vulcanizedb superNode --config={config.toml}`
|
||||
|
||||
Configuration can also be done through CLI options and/or environmental variables.
|
||||
CLI options can be found using `./vulcanizedb superNode --help`.
|
||||
|
||||
## Config
|
||||
|
||||
Below is the set of universal config parameters for the superNode command, in .toml form, with the respective environmental variables commented to the side.
|
||||
This set of parameters needs to be set no matter the chain type.
|
||||
|
||||
```toml
|
||||
[database]
|
||||
name = "vulcanize_public" # $DATABASE_NAME
|
||||
hostname = "localhost" # $DATABASE_HOSTNAME
|
||||
port = 5432 # $DATABASE_PORT
|
||||
user = "vdbm" # $DATABASE_USER
|
||||
password = "" # $DATABASE_PASSWORD
|
||||
|
||||
[ipfs]
|
||||
path = "~/.ipfs" # $IPFS_PATH
|
||||
|
||||
[superNode]
|
||||
chain = "bitcoin" # $SUPERNODE_CHAIN
|
||||
server = true # $SUPERNODE_SERVER
|
||||
ipcPath = "~/.vulcanize/vulcanize.ipc" # $SUPERNODE_IPC_PATH
|
||||
wsPath = "127.0.0.1:8082" # $SUPERNODE_WS_PATH
|
||||
httpPath = "127.0.0.1:8083" # $SUPERNODE_HTTP_PATH
|
||||
sync = true # $SUPERNODE_SYNC
|
||||
workers = 1 # $SUPERNODE_WORKERS
|
||||
backFill = true # $SUPERNODE_BACKFILL
|
||||
frequency = 45 # $SUPERNODE_FREQUENCY
|
||||
batchSize = 1 # $SUPERNODE_BATCH_SIZE
|
||||
batchNumber = 50 # $SUPERNODE_BATCH_NUMBER
|
||||
timeout = 300 # $HTTP_TIMEOUT
|
||||
validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL
|
||||
```
|
||||
|
||||
Additional parameters need to be set depending on the specific chain.
|
||||
|
||||
For Bitcoin:
|
||||
|
||||
```toml
|
||||
[bitcoin]
|
||||
wsPath = "127.0.0.1:8332" # $BTC_WS_PATH
|
||||
httpPath = "127.0.0.1:8332" # $BTC_HTTP_PATH
|
||||
pass = "password" # $BTC_NODE_PASSWORD
|
||||
user = "username" # $BTC_NODE_USER
|
||||
nodeID = "ocd0" # $BTC_NODE_ID
|
||||
clientName = "Omnicore" # $BTC_CLIENT_NAME
|
||||
genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" # $BTC_GENESIS_BLOCK
|
||||
networkID = "0xD9B4BEF9" # $BTC_NETWORK_ID
|
||||
```
|
||||
|
||||
For Ethereum:
|
||||
|
||||
```toml
|
||||
[ethereum]
|
||||
wsPath = "127.0.0.1:8546" # $ETH_WS_PATH
|
||||
httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH
|
||||
nodeID = "arch1" # $ETH_NODE_ID
|
||||
clientName = "Geth" # $ETH_CLIENT_NAME
|
||||
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK
|
||||
networkID = "1" # $ETH_NETWORK_ID
|
||||
```
|
||||
|
||||
## Database
|
||||
|
||||
Currently, the super node persists all data to a single Postgres database. The migrations for this DB can be found [here](../../db/migrations).
|
||||
Chain-specific data is populated under a chain-specific schema (e.g. `eth` and `btc`) while shared data- such as the IPFS blocks table- is populated under the `public` schema.
|
||||
Subsequent watchers which act on the raw chain data should build and populate their own schemas or separate databases entirely.
|
||||
|
||||
In the future, we will be moving to a foreign table based architecture wherein a single db is used for shared data while each watcher uses
|
||||
its own database and accesses and acts on the shared data through foreign tables. Isolating watchers to their own databases will prevent complications and
|
||||
conflicts between watcher db migrations.
|
||||
|
||||
|
||||
## APIs
|
||||
|
||||
The super node provides mutliple types of APIs by which to interface with its data.
|
||||
More detailed information on the APIs can be found [here](apis.md).
|
||||
|
||||
## Resync
|
||||
|
||||
A separate command `resync` is available for directing the resyncing of data within specified ranges.
|
||||
This is useful if we want to re-validate a range of data using a new source or clean out bad/deprecated data.
|
||||
More detailed information on this command can be found [here](resync.md).
|
||||
|
||||
## IPFS Considerations
|
||||
|
||||
Currently, the IPLD Publisher and Fetcher use internalized IPFS processes which interface directly with a local IPFS repository.
|
||||
This circumvents the need to run a full IPFS daemon with a [go-ipld-eth](https://github.com/ipfs/go-ipld-eth) plugin, but can lead to issues
|
||||
with lock-contention on the IPFS repo if another IPFS process is configured and running at the same $IPFS_PATH. This also necessitates the need for
|
||||
a locally configured IPFS repository.
|
||||
|
||||
Once go-ipld-eth has been updated to work with a modern version of PG-IPFS, an additional option will be provided to direct
|
||||
all publishing and fetching of IPLD objects through a remote IPFS daemon.
|
73
documentation/super_node/resync.md
Normal file
73
documentation/super_node/resync.md
Normal file
@ -0,0 +1,73 @@
|
||||
## VulcanizeDB Super Node Resync
|
||||
The `resync` command is made available for directing the resyncing of super node data within specified ranges.
|
||||
It also contains a utility for cleaning out old data, and resetting the validation level of data.
|
||||
|
||||
### Rational
|
||||
|
||||
Manual resyncing of data is useful when we want to re-validate data within specific ranges using a new source.
|
||||
|
||||
Cleaning out data is useful when we need to remove bad/deprecated data or prepare for breaking changes to the db schemas.
|
||||
|
||||
Resetting the validation level of data is useful for designating ranges of data for resyncing by an ongoing super node
|
||||
backfill process.
|
||||
|
||||
### Command
|
||||
|
||||
Usage: `./vulcanizedb resync --config={config.toml}`
|
||||
|
||||
Configuration can also be done through CLI options and/or environmental variables.
|
||||
CLI options can be found using `./vulcanizedb resync --help`.
|
||||
|
||||
### Config
|
||||
|
||||
Below is the set of universal config parameters for the resync command, in .toml form, with the respective environmental variables commented to the side.
|
||||
This set of parameters needs to be set no matter the chain type.
|
||||
|
||||
```toml
|
||||
[database]
|
||||
name = "vulcanize_public" # $DATABASE_NAME
|
||||
hostname = "localhost" # $DATABASE_HOSTNAME
|
||||
port = 5432 # $DATABASE_PORT
|
||||
user = "vdbm" # $DATABASE_USER
|
||||
password = "" # $DATABASE_PASSWORD
|
||||
|
||||
[ipfs]
|
||||
path = "~/.ipfs" # $IPFS_PATH
|
||||
|
||||
[resync]
|
||||
chain = "ethereum" # $RESYNC_CHAIN
|
||||
type = "state" # $RESYNC_TYPE
|
||||
start = 0 # $RESYNC_START
|
||||
stop = 1000 # $RESYNC_STOP
|
||||
batchSize = 10 # $RESYNC_BATCH_SIZE
|
||||
batchNumber = 100 # $RESYNC_BATCH_NUMBER
|
||||
timeout = 300 # $HTTP_TIMEOUT
|
||||
clearOldCache = true # $RESYNC_CLEAR_OLD_CACHE
|
||||
resetValidation = true # $RESYNC_RESET_VALIDATION
|
||||
```
|
||||
|
||||
Additional parameters need to be set depending on the specific chain.
|
||||
|
||||
For Bitcoin:
|
||||
|
||||
```toml
|
||||
[bitcoin]
|
||||
httpPath = "127.0.0.1:8332" # $BTC_HTTP_PATH
|
||||
pass = "password" # $BTC_NODE_PASSWORD
|
||||
user = "username" # $BTC_NODE_USER
|
||||
nodeID = "ocd0" # $BTC_NODE_ID
|
||||
clientName = "Omnicore" # $BTC_CLIENT_NAME
|
||||
genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" # $BTC_GENESIS_BLOCK
|
||||
networkID = "0xD9B4BEF9" # $BTC_NETWORK_ID
|
||||
```
|
||||
|
||||
For Ethereum:
|
||||
|
||||
```toml
|
||||
[ethereum]
|
||||
httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH
|
||||
nodeID = "arch1" # $ETH_NODE_ID
|
||||
clientName = "Geth" # $ETH_CLIENT_NAME
|
||||
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK
|
||||
networkID = "1" # $ETH_NETWORK_ID
|
||||
```
|
@ -1,14 +1,35 @@
|
||||
## Super Node Setup
|
||||
# VulcanizeDB Super Node Setup
|
||||
Step-by-step instructions for manually setting up and running a VulcanizeDB super node.
|
||||
|
||||
Vulcanizedb can act as an index for chain data stored on IPFS through the use of the `superNode` command.
|
||||
Steps:
|
||||
1. [Postgres](#postgres)
|
||||
1. [Goose](#goose)
|
||||
1. [IPFS](#ipfs)
|
||||
1. [Blockchain](#blockchain)
|
||||
1. [VulcanizeDB](#vulcanizedb)
|
||||
|
||||
### Manual Setup
|
||||
### Postgres
|
||||
A postgresDB is needed to storing all of the data in the vulcanizedb system.
|
||||
Postgres is used as the backing datastore for IPFS, and is used to index the CIDs for all of the chain data stored on IPFS.
|
||||
Follow the guides [here](https://wiki.postgresql.org/wiki/Detailed_installation_guides) for setting up Postgres.
|
||||
|
||||
These commands work in conjunction with a [state-diffing full Geth node](https://github.com/vulcanize/go-ethereum/tree/statediffing)
|
||||
and IPFS.
|
||||
Once the Postgres server is running, we will need to make a database for vulcanizedb, e.g. `vulcanize_public`.
|
||||
|
||||
#### IPFS
|
||||
To start, download and install [IPFS](https://github.com/vulcanize/go-ipfs)
|
||||
`createdb vulcanize_public`
|
||||
|
||||
For running the automated tests, also create a database named `vulcanize_testing`.
|
||||
|
||||
`createdb vulcanize_testing`
|
||||
|
||||
### Goose
|
||||
We use [goose](https://github.com/pressly/goose) as our migration management tool. While it is not necessary to use `goose` for manual setup, it
|
||||
is required for running the automated tests.
|
||||
|
||||
|
||||
### IPFS
|
||||
We use IPFS to store IPLD objects for each type of data we extract from on chain.
|
||||
|
||||
To start, download and install [IPFS](https://github.com/vulcanize/go-ipfs):
|
||||
|
||||
`go get github.com/ipfs/go-ipfs`
|
||||
|
||||
@ -26,11 +47,11 @@ Start by adding the fork and switching over to it:
|
||||
|
||||
`git checkout -b postgres_update vulcanize/postgres_update`
|
||||
|
||||
Now install this fork of ipfs, first be sure to remove any previous installation.
|
||||
Now install this fork of ipfs, first be sure to remove any previous installation:
|
||||
|
||||
`make install`
|
||||
|
||||
Check that is installed properly by running
|
||||
Check that is installed properly by running:
|
||||
|
||||
`ipfs`
|
||||
|
||||
@ -49,7 +70,7 @@ export IPFS_PGPORT=
|
||||
export IPFS_PGPASSWORD=
|
||||
```
|
||||
|
||||
And then run the ipfs command
|
||||
And then run the ipfs command:
|
||||
|
||||
`ipfs init --profile=postgresds`
|
||||
|
||||
@ -62,10 +83,14 @@ and will ask us to enter the password, avoiding storing it to an ENV variable.
|
||||
|
||||
Once we have initialized ipfs, that is all we need to do with it- we do not need to run a daemon during the subsequent processes (in fact, we can't).
|
||||
|
||||
#### Geth
|
||||
For Geth, we currently *require* a special fork, and we can set this up as follows:
|
||||
### Blockchain
|
||||
This section describes how to setup an Ethereum or Bitcoin node to serve as a data source for the super node
|
||||
|
||||
Begin by downloading geth and switching to the vulcanize/rpc_statediffing branch
|
||||
#### Ethereum
|
||||
For Ethereum, we currently *require* [a special fork of go-ethereum](https://github.com/vulcanize/go-ethereum/tree/statediff_at_anyblock-1.9.11). This can be setup as follows.
|
||||
Skip this steps if you already have access to a node that displays the statediffing endpoints.
|
||||
|
||||
Begin by downloading geth and switching to the vulcanize/rpc_statediffing branch:
|
||||
|
||||
`go get github.com/ethereum/go-ethereum`
|
||||
|
||||
@ -75,9 +100,9 @@ Begin by downloading geth and switching to the vulcanize/rpc_statediffing branch
|
||||
|
||||
`git fetch vulcanize`
|
||||
|
||||
`git checkout -b statediffing vulcanize/statediff_at_anyblock-1.9.9`
|
||||
`git checkout -b statediffing vulcanize/statediff_at_anyblock-1.9.11`
|
||||
|
||||
Now, install this fork of geth (make sure any old versions have been uninstalled/binaries removed first)
|
||||
Now, install this fork of geth (make sure any old versions have been uninstalled/binaries removed first):
|
||||
|
||||
`make geth`
|
||||
|
||||
@ -87,163 +112,49 @@ And run the output binary with statediffing turned on:
|
||||
|
||||
`./geth --statediff --statediff.streamblock --ws --syncmode=full`
|
||||
|
||||
Note: if you wish to access historical data (perform `backFill`) then the node will need to operate as an archival node (`--gcmode=archive`)
|
||||
|
||||
Note: other CLI options- statediff specific ones included- can be explored with `./geth help`
|
||||
|
||||
The output from geth should mention that it is `Starting statediff service` and block synchronization should begin shortly thereafter.
|
||||
Note that until it receives a subscriber, the statediffing process does essentially nothing. Once a subscription is received, this
|
||||
will be indicated in the output.
|
||||
Note that until it receives a subscriber, the statediffing process does nothing but wait for one. Once a subscription is received, this
|
||||
will be indicated in the output and node will begin processing and sending statediffs.
|
||||
|
||||
Also in the output will be the websocket url and ipc paths that we will use to subscribe to the statediffing process.
|
||||
The default ws url is "ws://127.0.0.1:8546" and the default ipcPath- on Darwin systems only- is "Users/user/Library/Ethereum/geth.ipc"
|
||||
Also in the output will be the endpoints that we will use to interface with the node.
|
||||
The default ws url is "127.0.0.1:8546" and the default http url is "127.0.0.1:8545".
|
||||
These values will be used as the `ethereum.wsPath` and `ethereum.httpPath` in the super node config, respectively.
|
||||
|
||||
#### Vulcanizedb
|
||||
#### Bitcoin
|
||||
For Bitcoin, the super node is able to operate entirely through the universally exposed JSON-RPC interfaces.
|
||||
This means we can use any of the standard full nodes (e.g. bitcoind, btcd) as our data source.
|
||||
|
||||
The `superNode` command is used to initialize and run an instance of the VulcanizeDB SuperNode
|
||||
Point at a remote node or set one up locally using the instructions for [bitcoind](https://github.com/bitcoin/bitcoin) and [btcd](https://github.com/btcsuite/btcd).
|
||||
|
||||
Usage:
|
||||
The default http url is "127.0.0.1:8332". We will use the http endpoint as both the `bitcoin.wsPath` and `bitcoin.httpPath`
|
||||
(bitcoind does not support websocket endpoints, we are currently using a "subscription" wrapper around the http endpoints)
|
||||
|
||||
### Vulcanizedb
|
||||
Finally, we can begin the vulcanizeDB process itself.
|
||||
|
||||
Start by downloading vulcanizedb and moving into the repo:
|
||||
|
||||
`go get github.com/vulcanize/vulcanizedb`
|
||||
|
||||
`cd $GOPATH/src/github.com/vulcanize/vulcanizedb`
|
||||
|
||||
Run the db migrations against the Postgres database we created for vulcanizeDB:
|
||||
|
||||
`goose -dir=./db/migrations postgres postgres://localhost:5432/vulcanize_public?sslmode=disable up`
|
||||
|
||||
At this point, if we want to run the automated tests:
|
||||
|
||||
`make test`
|
||||
`make integration_test`
|
||||
|
||||
Then, build the vulcanizedb binary:
|
||||
|
||||
`go build`
|
||||
|
||||
And run the super node command with a provided [config](architecture.md/#):
|
||||
|
||||
`./vulcanizedb superNode --config=<config_file.toml`
|
||||
|
||||
|
||||
The config file contains the parameters needed to initialize a super node with the appropriate chain(s), settings, and services
|
||||
|
||||
The below example spins up a super node for btc and eth
|
||||
```toml
|
||||
[superNode]
|
||||
chains = ["ethereum", "bitcoin"]
|
||||
ipfsPath = "/Users/iannorden/.ipfs"
|
||||
|
||||
[superNode.ethereum.database]
|
||||
name = "vulcanize_demo"
|
||||
hostname = "localhost"
|
||||
port = 5432
|
||||
user = "postgres"
|
||||
|
||||
[superNode.ethereum.sync]
|
||||
on = true
|
||||
wsPath = "ws://127.0.0.1:8546"
|
||||
workers = 1
|
||||
|
||||
[superNode.ethereum.server]
|
||||
on = true
|
||||
ipcPath = "/Users/iannorden/.vulcanize/eth/vulcanize.ipc"
|
||||
wsPath = "127.0.0.1:8080"
|
||||
httpPath = "127.0.0.1:8081"
|
||||
|
||||
[superNode.ethereum.backFill]
|
||||
on = true
|
||||
httpPath = "http://127.0.0.1:8545"
|
||||
frequency = 15
|
||||
batchSize = 50
|
||||
|
||||
[superNode.bitcoin.database]
|
||||
name = "vulcanize_demo"
|
||||
hostname = "localhost"
|
||||
port = 5432
|
||||
user = "postgres"
|
||||
|
||||
[superNode.bitcoin.sync]
|
||||
on = true
|
||||
wsPath = "127.0.0.1:8332"
|
||||
workers = 1
|
||||
pass = "GhhOhxL6GxteDhgzrTqj"
|
||||
user = "ocdrpc"
|
||||
|
||||
[superNode.bitcoin.server]
|
||||
on = true
|
||||
ipcPath = "/Users/iannorden/.vulcanize/btc/vulcanize.ipc"
|
||||
wsPath = "127.0.0.1:8082"
|
||||
httpPath = "127.0.0.1:8083"
|
||||
|
||||
[superNode.bitcoin.backFill]
|
||||
on = true
|
||||
httpPath = "127.0.0.1:8332"
|
||||
frequency = 15
|
||||
batchSize = 50
|
||||
pass = "GhhOhxL6GxteDhgzrTqj"
|
||||
user = "ocdrpc"
|
||||
|
||||
[superNode.bitcoin.node]
|
||||
nodeID = "ocd0"
|
||||
clientName = "Omnicore"
|
||||
genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
|
||||
networkID = "0xD9B4BEF9"
|
||||
```
|
||||
|
||||
### Dockerfile Setup
|
||||
|
||||
The below provides step-by-step directions for how to setup the super node using the provided Dockerfile on an AWS Linux AMI instance.
|
||||
Note that the instance will need sufficient memory and storage for this to work.
|
||||
|
||||
1. Install basic dependencies
|
||||
```
|
||||
sudo yum update
|
||||
sudo yum install -y curl gpg gcc gcc-c++ make git
|
||||
```
|
||||
|
||||
2. Install Go 1.12
|
||||
```
|
||||
wget https://dl.google.com/go/go1.12.6.linux-amd64.tar.gz
|
||||
tar -xzf go1.12.6.linux-amd64.tar.gz
|
||||
sudo mv go /usr/local
|
||||
```
|
||||
|
||||
3. Edit .bash_profile to export GOPATH
|
||||
```
|
||||
export GOROOT=/usr/local/go
|
||||
export GOPATH=$HOME/go
|
||||
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
|
||||
```
|
||||
|
||||
4. Install and setup Postgres
|
||||
```
|
||||
sudo yum install postgresql postgresql96-server
|
||||
sudo service postgresql96 initdb
|
||||
sudo service postgresql96 start
|
||||
sudo -u postgres createuser -s ec2-user
|
||||
sudo -u postgres createdb ec2-user
|
||||
sudo su postgres
|
||||
psql
|
||||
ALTER USER "ec2-user" WITH SUPERUSER;
|
||||
\q
|
||||
exit
|
||||
```
|
||||
|
||||
4b. Edit hba_file to trust local connections
|
||||
```
|
||||
psql
|
||||
SHOW hba_file;
|
||||
/q
|
||||
sudo vim {PATH_TO_FILE}
|
||||
```
|
||||
|
||||
4c. Stop and restart Postgres server to affect changes
|
||||
```
|
||||
sudo service postgresql96 stop
|
||||
sudo service postgresql96 start
|
||||
```
|
||||
|
||||
5. Install and start Docker (exit and re-enter ec2 instance afterwards to affect changes)
|
||||
```
|
||||
sudo yum install -y docker
|
||||
sudo service docker start
|
||||
sudo usermod -aG docker ec2-user
|
||||
```
|
||||
|
||||
6. Fetch the repository
|
||||
```
|
||||
go get github.com/vulcanize/vulcanizedb
|
||||
cd $GOPATH/src/github.com/vulcanize/vulcanizedb
|
||||
```
|
||||
|
||||
7. Create the db
|
||||
```
|
||||
createdb vulcanize_public
|
||||
```
|
||||
|
||||
8. Build and run the Docker image
|
||||
```
|
||||
cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node
|
||||
docker build --build-arg CONFIG_FILE=environments/superNode.toml --build-arg EXPOSE_PORT_1=8080 --build-arg EXPOSE_PORT_2=8081 EXPOSE_PORT_3=8082 --build-arg EXPOSE_PORT_4=8083 .
|
||||
docker run --network host -e IPFS_INIT=true -e VDB_PG_NAME=vulcanize_public -e VDB_PG_HOSTNAME=localhost -e VDB_PG_PORT=5432 -e VDB_PG_USER=postgres -e VDB_PG_PASSWORD=password {IMAGE_ID}
|
||||
```
|
@ -1,88 +0,0 @@
|
||||
## SuperNode Subscription
|
||||
|
||||
A transformer can subscribe to the SueprNode service over its ipc or ws endpoints, when subscribing the transformer
|
||||
specifies the chain and a set of parameters which define which subsets of that chain's data the server should feed to them.
|
||||
|
||||
### Ethereum data
|
||||
The `streamEthSubscribe` command serves as a simple demonstration/example of subscribing to the super-node Ethereum feed, it subscribes with a set of parameters
|
||||
defined in the loaded config file, and prints the streamed data to stdout. To build transformers that subscribe to and use super-node Ethereum data,
|
||||
the shared/libraries/streamer can be used.
|
||||
|
||||
Usage:
|
||||
|
||||
`./vulcanizedb streamEthSubscribe --config=<config_file.toml>`
|
||||
|
||||
The config for `streamEthSubscribe` has a set of parameters to fill the [EthSubscription config structure](../../pkg/super_node/config/eth_subscription.go)
|
||||
|
||||
```toml
|
||||
[superNode]
|
||||
[superNode.ethSubscription]
|
||||
historicalData = false
|
||||
historicalDataOnly = false
|
||||
startingBlock = 0
|
||||
endingBlock = 0
|
||||
wsPath = "ws://127.0.0.1:8080"
|
||||
[superNode.ethSubscription.headerFilter]
|
||||
off = false
|
||||
uncles = false
|
||||
[superNode.ethSubscription.txFilter]
|
||||
off = false
|
||||
src = []
|
||||
dst = []
|
||||
[superNode.ethSubscription.receiptFilter]
|
||||
off = false
|
||||
contracts = []
|
||||
topic0s = []
|
||||
topic1s = []
|
||||
topic2s = []
|
||||
topic3s = []
|
||||
[superNode.ethSubscription.stateFilter]
|
||||
off = false
|
||||
addresses = []
|
||||
intermediateNodes = false
|
||||
[superNode.ethSubscription.storageFilter]
|
||||
off = true
|
||||
addresses = []
|
||||
storageKeys = []
|
||||
intermediateNodes = false
|
||||
```
|
||||
|
||||
`ethSubscription.path` is used to define the SuperNode ws url OR ipc endpoint we subscribe to
|
||||
|
||||
`ethSubscription.historicalData` specifies whether or not the super-node should look up historical data in its cache and
|
||||
send that to the subscriber, if this is set to `false` then the super-node only streams newly synced/incoming data
|
||||
|
||||
`ethSubscription.historicalDataOnly` will tell the super-node to only send historical data with the specified range and
|
||||
not stream forward syncing data
|
||||
|
||||
`ethSubscription.startingBlock` is the starting block number for the range we want to receive data in
|
||||
|
||||
`ethSubscription.endingBlock` is the ending block number for the range we want to receive data in;
|
||||
setting to 0 means there is no end/we will continue streaming indefinitely.
|
||||
|
||||
`ethSubscription.headerFilter` has two sub-options: `off` and `uncles`. Setting `off` to true tells the super-node to
|
||||
not send any headers to the subscriber; setting `uncles` to true tells the super-node to send uncles in addition to normal headers.
|
||||
|
||||
`ethSubscription.txFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the super-node to
|
||||
not send any transactions to the subscriber; `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for,
|
||||
if they have any addresses then the super-node will only send transactions that were sent or received by the addresses contained
|
||||
in `src` and `dst`, respectively.
|
||||
|
||||
`ethSubscription.receiptFilter` has four sub-options: `off`, `topics`, `contracts` and `matchTxs`. Setting `off` to true tells the super-node to
|
||||
not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for,
|
||||
if it has any topics then the super-node will only send receipts that contain logs which have that topic0. Similarly, `contracts` is
|
||||
a string array which can be filled with contract addresses we want to filter for, if it contains any contract addresses the super-node will
|
||||
only send receipts that correspond to one of those contracts. `matchTrxs` is a bool which when set to true any receipts that correspond to filtered for
|
||||
transactions will be sent by the super-node, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters.
|
||||
|
||||
`ethSubscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the super-node to
|
||||
not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for,
|
||||
if it has any addresses then the super-node will only send state leafs (accounts) corresponding to those account addresses. By default the super-node
|
||||
only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`.
|
||||
|
||||
`ethSubscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. Setting `off` to true tells the super-node to
|
||||
not send any storage data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter storage for,
|
||||
if it has any addresses then the super-node will only send storage nodes from the storage tries at those state addresses. `storageKeys` is another string
|
||||
array that can be filled with storage keys we want to filter storage data for. It is important to note that the storageKeys are the actual keccak256 hashes, whereas
|
||||
the addresses in the `addresses` fields are the ETH addresses and not their keccak256 hashes that serve as the actual state keys. By default the super-node
|
||||
only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`.
|
16
documentation/super_node/watcher.md
Normal file
16
documentation/super_node/watcher.md
Normal file
@ -0,0 +1,16 @@
|
||||
These are the components of a VulcanizeDB Watcher:
|
||||
* Data Fetcher/Streamer sources:
|
||||
* go-ethereum
|
||||
* bitcoind
|
||||
* btcd
|
||||
* IPFS
|
||||
* Transformers contain:
|
||||
* converter
|
||||
* publisher
|
||||
* indexer
|
||||
* Endpoints contain:
|
||||
* api
|
||||
* backend
|
||||
* filterer
|
||||
* retriever
|
||||
* ipld_server
|
@ -1,6 +1,6 @@
|
||||
[database]
|
||||
name = "vulcanize_public" # $DATABASE_NAME
|
||||
hostname = "localhost" # &DATABASE_HOSTNAME
|
||||
hostname = "localhost" # $DATABASE_HOSTNAME
|
||||
port = 5432 # $DATABASE_PORT
|
||||
user = "vdbm" # $DATABASE_USER
|
||||
password = "" # $DATABASE_PASSWORD
|
||||
|
@ -1,6 +1,6 @@
|
||||
[database]
|
||||
name = "vulcanize_public" # $DATABASE_NAME
|
||||
hostname = "localhost" # &DATABASE_HOSTNAME
|
||||
hostname = "localhost" # $DATABASE_HOSTNAME
|
||||
port = 5432 # $DATABASE_PORT
|
||||
user = "vdbm" # $DATABASE_USER
|
||||
password = "" # $DATABASE_PASSWORD
|
||||
@ -13,8 +13,9 @@
|
||||
type = "state" # $RESYNC_TYPE
|
||||
start = 0 # $RESYNC_START
|
||||
stop = 0 # $RESYNC_STOP
|
||||
batchSize = 10 # $RESYNC_BATCH_SIZE
|
||||
batchNumber = 100 # $RESYNC_BATCH_NUMBER
|
||||
batchSize = 5 # $RESYNC_BATCH_SIZE
|
||||
batchNumber = 50 # $RESYNC_BATCH_NUMBER
|
||||
timeout = 300 # $HTTP_TIMEOUT
|
||||
clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE
|
||||
resetValidation = true # $RESYNC_RESET_VALIDATION
|
||||
|
||||
@ -30,8 +31,13 @@
|
||||
frequency = 15 # $SUPERNODE_FREQUENCY
|
||||
batchSize = 5 # $SUPERNODE_BATCH_SIZE
|
||||
batchNumber = 50 # $SUPERNODE_BATCH_NUMBER
|
||||
timeout = 300 # $HTTP_TIMEOUT
|
||||
validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL
|
||||
|
||||
[ethereum]
|
||||
wsPath = "127.0.0.1:8546" # $ETH_WS_PATH
|
||||
httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH
|
||||
nodeID = "arch1" # $ETH_NODE_ID
|
||||
clientName = "Geth" # $ETH_CLIENT_NAME
|
||||
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK
|
||||
networkID = "1" # $ETH_NETWORK_ID
|
||||
|
@ -17,9 +17,12 @@
|
||||
package mocks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
|
||||
)
|
||||
@ -62,3 +65,24 @@ func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BatchCallContext mockClient method to simulate batch call to geth
|
||||
func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error {
|
||||
if mc.MappedStateDiffAt == nil {
|
||||
return errors.New("mockclient needs to be initialized with statediff payloads and errors")
|
||||
}
|
||||
for _, batchElem := range batch {
|
||||
if len(batchElem.Args) != 1 {
|
||||
return errors.New("expected batch elem to contain single argument")
|
||||
}
|
||||
blockHeight, ok := batchElem.Args[0].(uint64)
|
||||
if !ok {
|
||||
return errors.New("expected batch elem argument to be a uint64")
|
||||
}
|
||||
err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
44
libraries/shared/mocks/stream_client.go
Normal file
44
libraries/shared/mocks/stream_client.go
Normal file
@ -0,0 +1,44 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2019 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
)
|
||||
|
||||
type StreamClient struct {
|
||||
passedContext context.Context
|
||||
passedResult interface{}
|
||||
passedNamespace string
|
||||
passedPayloadChan interface{}
|
||||
passedSubscribeArgs []interface{}
|
||||
}
|
||||
|
||||
func (client *StreamClient) Subscribe(ctx context.Context, namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
|
||||
client.passedNamespace = namespace
|
||||
client.passedPayloadChan = payloadChan
|
||||
client.passedContext = ctx
|
||||
|
||||
for _, arg := range args {
|
||||
client.passedSubscribeArgs = append(client.passedSubscribeArgs, arg)
|
||||
}
|
||||
|
||||
subscription := rpc.ClientSubscription{}
|
||||
return &subscription, nil
|
||||
}
|
@ -16,7 +16,11 @@
|
||||
|
||||
package utils
|
||||
|
||||
import "errors"
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
|
||||
)
|
||||
|
||||
func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint64, error) {
|
||||
if endingBlock < startingBlock {
|
||||
@ -43,3 +47,26 @@ func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint6
|
||||
}
|
||||
return blockRangeBins, nil
|
||||
}
|
||||
|
||||
func MissingHeightsToGaps(heights []uint64) []shared.Gap {
|
||||
validationGaps := make([]shared.Gap, 0)
|
||||
start := heights[0]
|
||||
lastHeight := start
|
||||
for i, height := range heights[1:] {
|
||||
if height != lastHeight+1 {
|
||||
validationGaps = append(validationGaps, shared.Gap{
|
||||
Start: start,
|
||||
Stop: lastHeight,
|
||||
})
|
||||
start = height
|
||||
}
|
||||
if i+2 == len(heights) {
|
||||
validationGaps = append(validationGaps, shared.Gap{
|
||||
Start: start,
|
||||
Stop: height,
|
||||
})
|
||||
}
|
||||
lastHeight = height
|
||||
}
|
||||
return validationGaps
|
||||
}
|
||||
|
26
pkg/ipfs/interfaces.go
Normal file
26
pkg/ipfs/interfaces.go
Normal file
@ -0,0 +1,26 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2019 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package ipfs
|
||||
|
||||
import (
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
)
|
||||
|
||||
// DagPutter is a general interface for a dag putter
|
||||
type DagPutter interface {
|
||||
DagPut(n ipld.Node) (string, error)
|
||||
}
|
@ -62,7 +62,7 @@ type BackFillService struct {
|
||||
// Channel for receiving quit signal
|
||||
QuitChan chan bool
|
||||
// Chain type
|
||||
chain shared.ChainType
|
||||
Chain shared.ChainType
|
||||
// Headers with times_validated lower than this will be resynced
|
||||
validationLevel int
|
||||
}
|
||||
@ -85,7 +85,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fetcher, err := NewPaylaodFetcher(settings.Chain, settings.HTTPClient)
|
||||
fetcher, err := NewPaylaodFetcher(settings.Chain, settings.HTTPClient, settings.Timeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -108,7 +108,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
|
||||
BatchNumber: int64(batchNumber),
|
||||
ScreenAndServeChan: screenAndServeChan,
|
||||
QuitChan: settings.Quit,
|
||||
chain: settings.Chain,
|
||||
Chain: settings.Chain,
|
||||
validationLevel: settings.ValidationLevel,
|
||||
}, nil
|
||||
}
|
||||
@ -122,25 +122,25 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
|
||||
for {
|
||||
select {
|
||||
case <-bfs.QuitChan:
|
||||
log.Infof("quiting %s FillGapsInSuperNode process", bfs.chain.String())
|
||||
log.Infof("quiting %s FillGapsInSuperNode process", bfs.Chain.String())
|
||||
wg.Done()
|
||||
return
|
||||
case <-ticker.C:
|
||||
log.Infof("searching for gaps in the %s super node database", bfs.chain.String())
|
||||
log.Infof("searching for gaps in the %s super node database", bfs.Chain.String())
|
||||
startingBlock, err := bfs.Retriever.RetrieveFirstBlockNumber()
|
||||
if err != nil {
|
||||
log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.chain.String(), err)
|
||||
log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err)
|
||||
continue
|
||||
}
|
||||
if startingBlock != 0 {
|
||||
log.Infof("found gap at the beginning of the %s sync", bfs.chain.String())
|
||||
if startingBlock != 0 && bfs.Chain == shared.Bitcoin || startingBlock != 1 && bfs.Chain == shared.Ethereum {
|
||||
log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String())
|
||||
if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel)
|
||||
if err != nil {
|
||||
log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.chain.String(), err)
|
||||
log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.Chain.String(), err)
|
||||
continue
|
||||
}
|
||||
for _, gap := range gaps {
|
||||
@ -151,15 +151,15 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
|
||||
}
|
||||
}
|
||||
}()
|
||||
log.Infof("%s fillGaps goroutine successfully spun up", bfs.chain.String())
|
||||
log.Infof("%s fillGaps goroutine successfully spun up", bfs.Chain.String())
|
||||
}
|
||||
|
||||
// backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
|
||||
// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently
|
||||
func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error {
|
||||
log.Infof("filling in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock)
|
||||
log.Infof("filling in %s gap from %d to %d", bfs.Chain.String(), startingBlock, endingBlock)
|
||||
if endingBlock < startingBlock {
|
||||
return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.chain.String())
|
||||
return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.Chain.String())
|
||||
}
|
||||
// break the range up into bins of smaller ranges
|
||||
blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bfs.BatchSize)
|
||||
@ -184,12 +184,12 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error {
|
||||
go func(blockHeights []uint64) {
|
||||
payloads, err := bfs.Fetcher.FetchAt(blockHeights)
|
||||
if err != nil {
|
||||
log.Errorf("%s super node historical data fetcher error: %s", bfs.chain.String(), err.Error())
|
||||
log.Errorf("%s super node historical data fetcher error: %s", bfs.Chain.String(), err.Error())
|
||||
}
|
||||
for _, payload := range payloads {
|
||||
ipldPayload, err := bfs.Converter.Convert(payload)
|
||||
if err != nil {
|
||||
log.Errorf("%s super node historical data converter error: %s", bfs.chain.String(), err.Error())
|
||||
log.Errorf("%s super node historical data converter error: %s", bfs.Chain.String(), err.Error())
|
||||
}
|
||||
// If there is a ScreenAndServe process listening, forward payload to it
|
||||
select {
|
||||
@ -198,14 +198,14 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error {
|
||||
}
|
||||
cidPayload, err := bfs.Publisher.Publish(ipldPayload)
|
||||
if err != nil {
|
||||
log.Errorf("%s super node historical data publisher error: %s", bfs.chain.String(), err.Error())
|
||||
log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error())
|
||||
}
|
||||
if err := bfs.Indexer.Index(cidPayload); err != nil {
|
||||
log.Errorf("%s super node historical data indexer error: %s", bfs.chain.String(), err.Error())
|
||||
log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error())
|
||||
}
|
||||
}
|
||||
// when this goroutine is done, send out a signal
|
||||
log.Infof("finished filling in %s gap from %d to %d", bfs.chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1])
|
||||
log.Infof("finished filling in %s gap from %d to %d", bfs.Chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1])
|
||||
processingDone <- true
|
||||
}(blockHeights)
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ var _ = Describe("BackFiller", func() {
|
||||
ReturnErr: nil,
|
||||
}
|
||||
mockRetriever := &mocks2.CIDRetriever{
|
||||
FirstBlockNumberToReturn: 0,
|
||||
FirstBlockNumberToReturn: 1,
|
||||
GapsToRetrieve: []shared.Gap{
|
||||
{
|
||||
Start: 100, Stop: 101,
|
||||
@ -69,6 +69,7 @@ var _ = Describe("BackFiller", func() {
|
||||
BatchSize: super_node.DefaultMaxBatchSize,
|
||||
BatchNumber: super_node.DefaultMaxBatchNumber,
|
||||
QuitChan: quitChan,
|
||||
Chain: shared.Ethereum,
|
||||
}
|
||||
wg := &sync.WaitGroup{}
|
||||
backfiller.FillGapsInSuperNode(wg)
|
||||
@ -101,7 +102,7 @@ var _ = Describe("BackFiller", func() {
|
||||
ReturnErr: nil,
|
||||
}
|
||||
mockRetriever := &mocks2.CIDRetriever{
|
||||
FirstBlockNumberToReturn: 0,
|
||||
FirstBlockNumberToReturn: 1,
|
||||
GapsToRetrieve: []shared.Gap{
|
||||
{
|
||||
Start: 100, Stop: 100,
|
||||
@ -124,6 +125,7 @@ var _ = Describe("BackFiller", func() {
|
||||
BatchSize: super_node.DefaultMaxBatchSize,
|
||||
BatchNumber: super_node.DefaultMaxBatchNumber,
|
||||
QuitChan: quitChan,
|
||||
Chain: shared.Ethereum,
|
||||
}
|
||||
wg := &sync.WaitGroup{}
|
||||
backfiller.FillGapsInSuperNode(wg)
|
||||
@ -173,6 +175,7 @@ var _ = Describe("BackFiller", func() {
|
||||
BatchSize: super_node.DefaultMaxBatchSize,
|
||||
BatchNumber: super_node.DefaultMaxBatchNumber,
|
||||
QuitChan: quitChan,
|
||||
Chain: shared.Ethereum,
|
||||
}
|
||||
wg := &sync.WaitGroup{}
|
||||
backfiller.FillGapsInSuperNode(wg)
|
||||
|
@ -50,6 +50,9 @@ func (c *Cleaner) ResetValidation(rngs [][2]uint64) error {
|
||||
SET times_validated = 0
|
||||
WHERE block_number BETWEEN $1 AND $2`
|
||||
if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil {
|
||||
if err := tx.Rollback(); err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,8 @@
|
||||
package btc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/btcsuite/btcd/rpcclient"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/btcsuite/btcutil"
|
||||
@ -48,11 +50,11 @@ func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChain
|
||||
for i, height := range blockHeights {
|
||||
hash, err := fetcher.client.GetBlockHash(int64(height))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("bitcoin PayloadFetcher GetBlockHash err at blockheight %d: %s", height, err.Error())
|
||||
}
|
||||
block, err := fetcher.client.GetBlock(hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("bitcoin PayloadFetcher GetBlock err at blockheight %d: %s", height, err.Error())
|
||||
}
|
||||
blockPayloads[i] = BlockPayload{
|
||||
BlockHeight: int64(height),
|
||||
|
@ -28,9 +28,9 @@ import (
|
||||
|
||||
// IPLDPublisher satisfies the IPLDPublisher for ethereum
|
||||
type IPLDPublisher struct {
|
||||
HeaderPutter shared.DagPutter
|
||||
TransactionPutter shared.DagPutter
|
||||
TransactionTriePutter shared.DagPutter
|
||||
HeaderPutter ipfs.DagPutter
|
||||
TransactionPutter ipfs.DagPutter
|
||||
TransactionTriePutter ipfs.DagPutter
|
||||
}
|
||||
|
||||
// NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface
|
||||
|
@ -20,6 +20,8 @@ import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
|
||||
|
||||
"github.com/lib/pq"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
@ -194,22 +196,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
|
||||
if len(heights) == 0 {
|
||||
return emptyGaps, nil
|
||||
}
|
||||
validationGaps := make([]shared.Gap, 0)
|
||||
start := heights[0]
|
||||
lastHeight := start
|
||||
for _, height := range heights[1:] {
|
||||
if height == lastHeight+1 {
|
||||
lastHeight = height
|
||||
continue
|
||||
}
|
||||
validationGaps = append(validationGaps, shared.Gap{
|
||||
Start: start,
|
||||
Stop: lastHeight,
|
||||
})
|
||||
start = height
|
||||
lastHeight = start
|
||||
}
|
||||
return append(emptyGaps, validationGaps...), nil
|
||||
return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil
|
||||
}
|
||||
|
||||
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
|
||||
|
@ -52,7 +52,7 @@ type TxFilter struct {
|
||||
}
|
||||
|
||||
// Init is used to initialize a EthSubscription struct with env variables
|
||||
func NewEthSubscriptionConfig() (*SubscriptionSettings, error) {
|
||||
func NewBtcSubscriptionConfig() (*SubscriptionSettings, error) {
|
||||
sc := new(SubscriptionSettings)
|
||||
// Below default to false, which means we do not backfill by default
|
||||
sc.BackFill = viper.GetBool("superNode.btcSubscription.historicalData")
|
||||
|
@ -72,6 +72,7 @@ type Config struct {
|
||||
BatchSize uint64
|
||||
BatchNumber uint64
|
||||
ValidationLevel int
|
||||
Timeout time.Duration // HTTP connection timeout in seconds
|
||||
}
|
||||
|
||||
// NewSuperNodeConfig is used to initialize a SuperNode config from a .toml file
|
||||
@ -170,6 +171,13 @@ func (c *Config) BackFillFields() error {
|
||||
viper.BindEnv("superNode.batchSize", SUPERNODE_BATCH_SIZE)
|
||||
viper.BindEnv("superNode.batchNumber", SUPERNODE_BATCH_NUMBER)
|
||||
viper.BindEnv("superNode.validationLevel", SUPERNODE_VALIDATION_LEVEL)
|
||||
viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT)
|
||||
|
||||
timeout := viper.GetInt("superNode.timeout")
|
||||
if timeout < 15 {
|
||||
timeout = 15
|
||||
}
|
||||
c.Timeout = time.Second * time.Duration(timeout)
|
||||
|
||||
switch c.Chain {
|
||||
case shared.Ethereum:
|
||||
|
@ -18,14 +18,13 @@ package super_node
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
|
||||
"github.com/btcsuite/btcd/rpcclient"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node/btc"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
|
||||
@ -72,10 +71,9 @@ func NewCIDRetriever(chain shared.ChainType, db *postgres.DB) (shared.CIDRetriev
|
||||
func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error) {
|
||||
switch chain {
|
||||
case shared.Ethereum:
|
||||
ethClient, ok := clientOrConfig.(core.RPCClient)
|
||||
ethClient, ok := clientOrConfig.(*rpc.Client)
|
||||
if !ok {
|
||||
var expectedClientType core.RPCClient
|
||||
return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", expectedClientType, clientOrConfig)
|
||||
return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", &rpc.Client{}, clientOrConfig)
|
||||
}
|
||||
streamChan := make(chan shared.RawChainData, eth.PayloadChanBufferSize)
|
||||
return eth.NewPayloadStreamer(ethClient), streamChan, nil
|
||||
@ -92,15 +90,14 @@ func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (sha
|
||||
}
|
||||
|
||||
// NewPaylaodFetcher constructs a PayloadFetcher for the provided chain type
|
||||
func NewPaylaodFetcher(chain shared.ChainType, client interface{}) (shared.PayloadFetcher, error) {
|
||||
func NewPaylaodFetcher(chain shared.ChainType, client interface{}, timeout time.Duration) (shared.PayloadFetcher, error) {
|
||||
switch chain {
|
||||
case shared.Ethereum:
|
||||
batchClient, ok := client.(eth.BatchClient)
|
||||
batchClient, ok := client.(*rpc.Client)
|
||||
if !ok {
|
||||
var expectedClient eth.BatchClient
|
||||
return nil, fmt.Errorf("ethereum payload fetcher constructor expected client type %T got %T", expectedClient, client)
|
||||
return nil, fmt.Errorf("ethereum payload fetcher constructor expected client type %T got %T", &rpc.Client{}, client)
|
||||
}
|
||||
return eth.NewPayloadFetcher(batchClient), nil
|
||||
return eth.NewPayloadFetcher(batchClient, timeout), nil
|
||||
case shared.Bitcoin:
|
||||
connConfig, ok := client.(*rpcclient.ConnConfig)
|
||||
if !ok {
|
||||
|
@ -50,6 +50,9 @@ func (c *Cleaner) ResetValidation(rngs [][2]uint64) error {
|
||||
SET times_validated = 0
|
||||
WHERE block_number BETWEEN $1 AND $2`
|
||||
if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil {
|
||||
if err := tx.Rollback(); err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -17,55 +17,60 @@
|
||||
package eth
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
|
||||
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/ethereum/go-ethereum/statediff"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
|
||||
)
|
||||
|
||||
// BatchClient is an interface to a batch-fetching geth rpc client; created to allow mock insertion
|
||||
type BatchClient interface {
|
||||
BatchCall(batch []client.BatchElem) error
|
||||
BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error
|
||||
}
|
||||
|
||||
// PayloadFetcher satisfies the PayloadFetcher interface for ethereum
|
||||
type PayloadFetcher struct {
|
||||
// PayloadFetcher is thread-safe as long as the underlying client is thread-safe, since it has/modifies no other state
|
||||
// http.Client is thread-safe
|
||||
client BatchClient
|
||||
client BatchClient
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
const method = "statediff_stateDiffAt"
|
||||
|
||||
// NewStateDiffFetcher returns a PayloadFetcher
|
||||
func NewPayloadFetcher(bc BatchClient) *PayloadFetcher {
|
||||
func NewPayloadFetcher(bc BatchClient, timeout time.Duration) *PayloadFetcher {
|
||||
return &PayloadFetcher{
|
||||
client: bc,
|
||||
client: bc,
|
||||
timeout: timeout,
|
||||
}
|
||||
}
|
||||
|
||||
// FetchAt fetches the statediff payloads at the given block heights
|
||||
// Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error)
|
||||
func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) {
|
||||
batch := make([]client.BatchElem, 0)
|
||||
batch := make([]rpc.BatchElem, 0)
|
||||
for _, height := range blockHeights {
|
||||
batch = append(batch, client.BatchElem{
|
||||
batch = append(batch, rpc.BatchElem{
|
||||
Method: method,
|
||||
Args: []interface{}{height},
|
||||
Result: new(statediff.Payload),
|
||||
})
|
||||
}
|
||||
batchErr := fetcher.client.BatchCall(batch)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), fetcher.timeout)
|
||||
defer cancel()
|
||||
batchErr := fetcher.client.BatchCallContext(ctx, batch)
|
||||
if batchErr != nil {
|
||||
return nil, fmt.Errorf("PayloadFetcher err: %s", batchErr.Error())
|
||||
return nil, fmt.Errorf("ethereum PayloadFetcher batch err for block range %d-%d: %s", blockHeights[0], blockHeights[len(blockHeights)-1], batchErr.Error())
|
||||
}
|
||||
results := make([]shared.RawChainData, 0, len(blockHeights))
|
||||
for _, batchElem := range batch {
|
||||
if batchElem.Error != nil {
|
||||
return nil, fmt.Errorf("PayloadFetcher err: %s", batchElem.Error.Error())
|
||||
return nil, fmt.Errorf("ethereum PayloadFetcher err at blockheight %d: %s", batchElem.Args[0].(uint64), batchElem.Error.Error())
|
||||
}
|
||||
payload, ok := batchElem.Result.(*statediff.Payload)
|
||||
if ok {
|
||||
|
@ -17,6 +17,8 @@
|
||||
package eth_test
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
@ -38,7 +40,7 @@ var _ = Describe("StateDiffFetcher", func() {
|
||||
Expect(setDiffAtErr1).ToNot(HaveOccurred())
|
||||
setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2)
|
||||
Expect(setDiffAtErr2).ToNot(HaveOccurred())
|
||||
stateDiffFetcher = eth.NewPayloadFetcher(mc)
|
||||
stateDiffFetcher = eth.NewPayloadFetcher(mc, time.Second*60)
|
||||
})
|
||||
It("Batch calls statediff_stateDiffAt", func() {
|
||||
blockHeights := []uint64{
|
||||
|
@ -36,13 +36,13 @@ import (
|
||||
|
||||
// IPLDPublisher satisfies the IPLDPublisher for ethereum
|
||||
type IPLDPublisher struct {
|
||||
HeaderPutter shared.DagPutter
|
||||
TransactionPutter shared.DagPutter
|
||||
TransactionTriePutter shared.DagPutter
|
||||
ReceiptPutter shared.DagPutter
|
||||
ReceiptTriePutter shared.DagPutter
|
||||
StatePutter shared.DagPutter
|
||||
StoragePutter shared.DagPutter
|
||||
HeaderPutter ipfs.DagPutter
|
||||
TransactionPutter ipfs.DagPutter
|
||||
TransactionTriePutter ipfs.DagPutter
|
||||
ReceiptPutter ipfs.DagPutter
|
||||
ReceiptTriePutter ipfs.DagPutter
|
||||
StatePutter ipfs.DagPutter
|
||||
StoragePutter ipfs.DagPutter
|
||||
}
|
||||
|
||||
// NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface
|
||||
|
@ -20,6 +20,8 @@ import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/jmoiron/sqlx"
|
||||
@ -479,22 +481,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
|
||||
if len(heights) == 0 {
|
||||
return emptyGaps, nil
|
||||
}
|
||||
validationGaps := make([]shared.Gap, 0)
|
||||
start := heights[0]
|
||||
lastHeight := start
|
||||
for _, height := range heights[1:] {
|
||||
if height == lastHeight+1 {
|
||||
lastHeight = height
|
||||
continue
|
||||
}
|
||||
validationGaps = append(validationGaps, shared.Gap{
|
||||
Start: start,
|
||||
Stop: lastHeight,
|
||||
})
|
||||
start = height
|
||||
lastHeight = start
|
||||
}
|
||||
return append(emptyGaps, validationGaps...), nil
|
||||
return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil
|
||||
}
|
||||
|
||||
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
|
||||
|
@ -544,8 +544,16 @@ var _ = Describe("Retriever", func() {
|
||||
payload4.HeaderCID.BlockNumber = "101"
|
||||
payload5 := payload4
|
||||
payload5.HeaderCID.BlockNumber = "102"
|
||||
payload6 := payload5
|
||||
payload6.HeaderCID.BlockNumber = "1000"
|
||||
payload6 := payload4
|
||||
payload6.HeaderCID.BlockNumber = "103"
|
||||
payload7 := payload4
|
||||
payload7.HeaderCID.BlockNumber = "104"
|
||||
payload8 := payload4
|
||||
payload8.HeaderCID.BlockNumber = "105"
|
||||
payload9 := payload4
|
||||
payload9.HeaderCID.BlockNumber = "106"
|
||||
payload10 := payload5
|
||||
payload10.HeaderCID.BlockNumber = "1000"
|
||||
err := repo.Index(&payload1)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload2)
|
||||
@ -558,11 +566,76 @@ var _ = Describe("Retriever", func() {
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload6)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload7)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload8)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload9)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload10)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
gaps, err := retriever.RetrieveGapsInData(1)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(len(gaps)).To(Equal(3))
|
||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
|
||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 103, Stop: 999})).To(BeTrue())
|
||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue())
|
||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue())
|
||||
})
|
||||
|
||||
It("Finds validation level gaps", func() {
|
||||
payload1 := *mocks.MockCIDPayload
|
||||
payload1.HeaderCID.BlockNumber = "1010101"
|
||||
payload2 := payload1
|
||||
payload2.HeaderCID.BlockNumber = "5"
|
||||
payload3 := payload2
|
||||
payload3.HeaderCID.BlockNumber = "100"
|
||||
payload4 := payload3
|
||||
payload4.HeaderCID.BlockNumber = "101"
|
||||
payload5 := payload4
|
||||
payload5.HeaderCID.BlockNumber = "102"
|
||||
payload6 := payload4
|
||||
payload6.HeaderCID.BlockNumber = "103"
|
||||
payload7 := payload4
|
||||
payload7.HeaderCID.BlockNumber = "104"
|
||||
payload8 := payload4
|
||||
payload8.HeaderCID.BlockNumber = "105"
|
||||
payload9 := payload4
|
||||
payload9.HeaderCID.BlockNumber = "106"
|
||||
payload10 := payload5
|
||||
payload10.HeaderCID.BlockNumber = "1000"
|
||||
err := repo.Index(&payload1)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload2)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload3)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload4)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload6)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload7)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload8)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload9)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = repo.Index(&payload10)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
cleaner := eth.NewCleaner(db)
|
||||
err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
gaps, err := retriever.RetrieveGapsInData(1)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(len(gaps)).To(Equal(5))
|
||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
|
||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue())
|
||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue())
|
||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue())
|
||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue())
|
||||
})
|
||||
})
|
||||
|
@ -17,10 +17,12 @@
|
||||
package eth
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/ethereum/go-ethereum/statediff"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
|
||||
)
|
||||
|
||||
@ -28,13 +30,18 @@ const (
|
||||
PayloadChanBufferSize = 20000 // the max eth sub buffer size
|
||||
)
|
||||
|
||||
// StreamClient is an interface for subscribing and streaming from geth
|
||||
type StreamClient interface {
|
||||
Subscribe(ctx context.Context, namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
|
||||
}
|
||||
|
||||
// PayloadStreamer satisfies the PayloadStreamer interface for ethereum
|
||||
type PayloadStreamer struct {
|
||||
Client core.RPCClient
|
||||
Client StreamClient
|
||||
}
|
||||
|
||||
// NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for ethereum
|
||||
func NewPayloadStreamer(client core.RPCClient) *PayloadStreamer {
|
||||
func NewPayloadStreamer(client StreamClient) *PayloadStreamer {
|
||||
return &PayloadStreamer{
|
||||
Client: client,
|
||||
}
|
||||
@ -53,5 +60,5 @@ func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.
|
||||
}
|
||||
}
|
||||
}()
|
||||
return ps.Client.Subscribe("statediff", stateDiffChan, "stream")
|
||||
return ps.Client.Subscribe(context.Background(), "statediff", stateDiffChan, "stream")
|
||||
}
|
||||
|
@ -18,14 +18,14 @@ import (
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/eth/fakes"
|
||||
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
|
||||
)
|
||||
|
||||
var _ = Describe("StateDiff Streamer", func() {
|
||||
It("subscribes to the geth statediff service", func() {
|
||||
client := &fakes.MockRPCClient{}
|
||||
client := &mocks.StreamClient{}
|
||||
streamer := eth.NewPayloadStreamer(client)
|
||||
payloadChan := make(chan shared.RawChainData)
|
||||
_, err := streamer.Stream(payloadChan)
|
||||
|
@ -18,6 +18,7 @@ package resync
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/viper"
|
||||
|
||||
@ -52,10 +53,11 @@ type Config struct {
|
||||
DBConfig config.Database
|
||||
IPFSPath string
|
||||
|
||||
HTTPClient interface{} // Note this client is expected to support the retrieval of the specified data type(s)
|
||||
NodeInfo core.Node // Info for the associated node
|
||||
Ranges [][2]uint64 // The block height ranges to resync
|
||||
BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing)
|
||||
HTTPClient interface{} // Note this client is expected to support the retrieval of the specified data type(s)
|
||||
NodeInfo core.Node // Info for the associated node
|
||||
Ranges [][2]uint64 // The block height ranges to resync
|
||||
BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing)
|
||||
Timeout time.Duration // HTTP connection timeout in seconds
|
||||
BatchNumber uint64
|
||||
|
||||
Quit chan bool // Channel for shutting down
|
||||
@ -76,6 +78,13 @@ func NewReSyncConfig() (*Config, error) {
|
||||
viper.BindEnv("resync.batchSize", RESYNC_BATCH_SIZE)
|
||||
viper.BindEnv("resync.batchNumber", RESYNC_BATCH_NUMBER)
|
||||
viper.BindEnv("resync.resetValidation", RESYNC_RESET_VALIDATION)
|
||||
viper.BindEnv("resync.timeout", shared.HTTP_TIMEOUT)
|
||||
|
||||
timeout := viper.GetInt("resync.timeout")
|
||||
if timeout < 15 {
|
||||
timeout = 15
|
||||
}
|
||||
c.Timeout = time.Second * time.Duration(timeout)
|
||||
|
||||
start := uint64(viper.GetInt64("resync.start"))
|
||||
stop := uint64(viper.GetInt64("resync.stop"))
|
||||
|
@ -80,7 +80,7 @@ func NewResyncService(settings *Config) (Resync, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fetcher, err := super_node.NewPaylaodFetcher(settings.Chain, settings.HTTPClient)
|
||||
fetcher, err := super_node.NewPaylaodFetcher(settings.Chain, settings.HTTPClient, settings.Timeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -24,17 +24,20 @@ import (
|
||||
|
||||
"github.com/btcsuite/btcd/rpcclient"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/eth/node"
|
||||
)
|
||||
|
||||
// Env variables
|
||||
const (
|
||||
IPFS_PATH = "IPFS_PATH"
|
||||
IPFS_PATH = "IPFS_PATH"
|
||||
HTTP_TIMEOUT = "HTTP_TIMEOUT"
|
||||
|
||||
ETH_WS_PATH = "ETH_WS_PATH"
|
||||
ETH_HTTP_PATH = "ETH_HTTP_PATH"
|
||||
ETH_WS_PATH = "ETH_WS_PATH"
|
||||
ETH_HTTP_PATH = "ETH_HTTP_PATH"
|
||||
ETH_NODE_ID = "ETH_NODE_ID"
|
||||
ETH_CLIENT_NAME = "ETH_CLIENT_NAME"
|
||||
ETH_GENESIS_BLOCK = "ETH_GENESIS_BLOCK"
|
||||
ETH_NETWORK_ID = "ETH_NETWORK_ID"
|
||||
|
||||
BTC_WS_PATH = "BTC_WS_PATH"
|
||||
BTC_HTTP_PATH = "BTC_HTTP_PATH"
|
||||
@ -47,14 +50,22 @@ const (
|
||||
)
|
||||
|
||||
// GetEthNodeAndClient returns eth node info and client from path url
|
||||
func GetEthNodeAndClient(path string) (core.Node, core.RPCClient, error) {
|
||||
rawRPCClient, err := rpc.Dial(path)
|
||||
func GetEthNodeAndClient(path string) (core.Node, *rpc.Client, error) {
|
||||
viper.BindEnv("ethereum.nodeID", ETH_NODE_ID)
|
||||
viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME)
|
||||
viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK)
|
||||
viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID)
|
||||
|
||||
rpcClient, err := rpc.Dial(path)
|
||||
if err != nil {
|
||||
return core.Node{}, nil, err
|
||||
}
|
||||
rpcClient := client.NewRPCClient(rawRPCClient, path)
|
||||
vdbNode := node.MakeNode(rpcClient)
|
||||
return vdbNode, rpcClient, nil
|
||||
return core.Node{
|
||||
ID: viper.GetString("ethereum.nodeID"),
|
||||
ClientName: viper.GetString("ethereum.clientName"),
|
||||
GenesisBlock: viper.GetString("ethereum.genesisBlock"),
|
||||
NetworkID: viper.GetString("ethereum.networkID"),
|
||||
}, rpcClient, nil
|
||||
}
|
||||
|
||||
// GetIPFSPath returns the ipfs path from the config or env variable
|
||||
|
@ -18,8 +18,6 @@ package shared
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
|
||||
node "github.com/ipfs/go-ipld-format"
|
||||
)
|
||||
|
||||
// PayloadStreamer streams chain-specific payloads to the provided channel
|
||||
@ -71,11 +69,6 @@ type ClientSubscription interface {
|
||||
Unsubscribe()
|
||||
}
|
||||
|
||||
// DagPutter is a general interface for a dag putter
|
||||
type DagPutter interface {
|
||||
DagPut(n node.Node) (string, error)
|
||||
}
|
||||
|
||||
// Cleaner is for cleaning out data from the cache within the given ranges
|
||||
type Cleaner interface {
|
||||
Clean(rngs [][2]uint64, t DataType) error
|
||||
|
@ -74,7 +74,7 @@ func NewWatcherConfig() (*Config, error) {
|
||||
return nil, err
|
||||
}
|
||||
case shared.Bitcoin:
|
||||
c.SubscriptionConfig, err = btc.NewEthSubscriptionConfig()
|
||||
c.SubscriptionConfig, err = btc.NewBtcSubscriptionConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -19,9 +19,9 @@ package version
|
||||
import "fmt"
|
||||
|
||||
const (
|
||||
VersionMajor = 0 // Major version component of the current release
|
||||
VersionMinor = 1 // Minor version component of the current release
|
||||
VersionPatch = 0 // Patch version component of the current release
|
||||
VersionMajor = 0 // Major version component of the current release
|
||||
VersionMinor = 1 // Minor version component of the current release
|
||||
VersionPatch = 0 // Patch version component of the current release
|
||||
VersionMeta = "alpha" // Version metadata to append to the version string
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user