Merge pull request #98 from deep-stack/pm-import-instructions

Add instructions to import data output in file mode
Ashwin Phatak 2022-07-19 15:10:25 +05:30 committed by GitHub
commit 681e656034
5 changed files with 286 additions and 144 deletions

README.md

@@ -2,13 +2,13 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/vulcanize/eth-statediff-service)](https://goreportcard.com/report/github.com/vulcanize/eth-statediff-service)
> standalone statediffing service ontop of LevelDB
> standalone statediffing service on top of LevelDB
Purpose:
Stand up a statediffing service directly on top of a go-ethereum LevelDB instance.
This service can serve historical state data over the same rpc interface as
[statediffing geth](https://github.com/vulcanize/go-ethereum/releases/tag/v1.9.11-statediff-0.0.5) without needing to run a full node
[statediffing geth](https://github.com/vulcanize/go-ethereum) without needing to run a full node.
## Setup
@@ -18,6 +18,112 @@ Build the binary:
make build
```
## Configuration
An example config file:
```toml
[leveldb]
# LevelDB access mode <local | remote>
mode = "local" # LVLDB_MODE
# in local mode
# LevelDB paths
path = "/Users/user/Library/Ethereum/geth/chaindata" # LVLDB_PATH
ancient = "/Users/user/Library/Ethereum/geth/chaindata/ancient" # LVLDB_ANCIENT
# in remote mode
# URL for leveldb-ethdb-rpc endpoint
url = "http://127.0.0.1:8082/" # LVLDB_URL
[server]
ipcPath = ".ipc" # SERVICE_IPC_PATH
httpPath = "127.0.0.1:8545" # SERVICE_HTTP_PATH
[statediff]
prerun = true # STATEDIFF_PRERUN
serviceWorkers = 1 # STATEDIFF_SERVICE_WORKERS
workerQueueSize = 1024 # STATEDIFF_WORKER_QUEUE_SIZE
trieWorkers = 4 # STATEDIFF_TRIE_WORKERS
[prerun]
only = false # PRERUN_ONLY
# to perform prerun in a specific range (optional)
start = 0 # PRERUN_RANGE_START
stop = 100 # PRERUN_RANGE_STOP
# to perform prerun over multiple ranges (optional)
ranges = [
[101, 1000]
]
# statediffing params for prerun
[prerun.params]
intermediateStateNodes = true # PRERUN_INTERMEDIATE_STATE_NODES
intermediateStorageNodes = true # PRERUN_INTERMEDIATE_STORAGE_NODES
includeBlock = true # PRERUN_INCLUDE_BLOCK
includeReceipts = true # PRERUN_INCLUDE_RECEIPTS
includeTD = true # PRERUN_INCLUDE_TD
includeCode = true # PRERUN_INCLUDE_CODE
watchedAddresses = []
[log]
file = "" # LOG_FILE_PATH
level = "info" # LOG_LEVEL
[database]
# output type <postgres | file | dump>
type = "postgres"
# with postgres type
# db credentials
name = "vulcanize_test" # DATABASE_NAME
hostname = "localhost" # DATABASE_HOSTNAME
port = 5432 # DATABASE_PORT
user = "vulcanize" # DATABASE_USER
password = "..." # DATABASE_PASSWORD
driver = "sqlx" # DATABASE_DRIVER_TYPE <sqlx | pgx>
# with file type
# file mode <sql | csv>
fileMode = "csv" # DATABASE_FILE_MODE
# with SQL file mode
filePath = "" # DATABASE_FILE_PATH
# with CSV file mode
fileCsvDir = "output_dir" # DATABASE_FILE_CSV_DIR
# with dump type
# <stdout | stderr | discard>
dumpDestination = "" # DATABASE_DUMP_DST
[cache]
database = 1024 # DB_CACHE_SIZE_MB
trie = 1024 # TRIE_CACHE_SIZE_MB
[prom]
# prometheus metrics
metrics = true # PROM_METRICS
http = true # PROM_HTTP
httpAddr = "localhost" # PROM_HTTP_ADDR
httpPort = "8889" # PROM_HTTP_PORT
dbStats = true # PROM_DB_STATS
[ethereum]
# node info
nodeID = "" # ETH_NODE_ID
clientName = "eth-statediff-service" # ETH_CLIENT_NAME
networkID = 1 # ETH_NETWORK_ID
chainID = 1 # ETH_CHAIN_ID
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # ETH_GENESIS_BLOCK
# path to custom chain config file (optional)
# keep chainID same as that in chain config file
chainConfig = "./chain.json" # ETH_CHAIN_CONFIG
```
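Each TOML key lists its environment variable counterpart in the trailing comment. A minimal sketch of overriding a few of these at runtime (assuming the service binds the env vars named above, as the comments suggest):
```bash
# Override LevelDB and database settings without editing the TOML file.
export LVLDB_MODE=local
export LVLDB_PATH=/Users/user/Library/Ethereum/geth/chaindata
export LVLDB_ANCIENT=/Users/user/Library/Ethereum/geth/chaindata/ancient
export DATABASE_NAME=vulcanize_test

./eth-statediff-service serve --config environments/config.toml
```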
### Local Setup
* Create a chain config file `chain.json` according to the chain config in the genesis JSON file used by the local geth instance.
@@ -42,144 +148,180 @@ make build
}
```
* Change the following in the [config file](./environments/config.toml):
```toml
[leveldb]
mode = "local"
# Path to geth LevelDB data
path = "/path-to-local-geth-data/chaindata"
ancient = "/path-to-local-geth-data/chaindata/ancient"
[ethereum]
chainConfig = "./chain.json" # Path to custom chain config file
chainID = 41337 # Same chain ID as in chain.json
[database]
# Update database config
name = "vulcanize_testing"
hostname = "localhost"
port = 5432
user = "postgres"
password = "postgres"
type = "postgres"
```
* To write statediff for a range of blocks, make changes in the [config file](./environments/config.toml):
```toml
[prerun]
only = false
ranges = [
[8, 15] # Block number range for which to write statediff.
]
```
* To use a remote LevelDB RPC endpoint, change the following in the [config file](./environments/config.toml):
```toml
[leveldb]
mode = "remote"
url = "http://127.0.0.1:8082/" # Remote LevelDB RPC url
```
Provide the path to the above file in the config.
## Usage
* Create or update the config file (refer to the example config above).
### `serve`
To serve state diffs over RPC:
* To serve the statediff RPC API:
`eth-statediff-service serve --config=<config path>`
```bash
./eth-statediff-service serve --config=<config path>
```
Example:
Example:
```bash
./eth-statediff-service serve --config environments/config.toml
```
```bash
./eth-statediff-service serve --config environments/config.toml
```
Available RPC methods are:
* `statediff_stateTrieAt()`
* `statediff_streamCodeAndCodeHash()`
* `statediff_stateDiffAt()`
* `statediff_writeStateDiffAt()`
* `statediff_writeStateDiffsInRange()`
* Available RPC methods:
* `statediff_stateTrieAt()`
* `statediff_streamCodeAndCodeHash()`
* `statediff_stateDiffAt()`
* `statediff_writeStateDiffAt()`
* `statediff_writeStateDiffsInRange()`
e.g. `curl -X POST -H 'Content-Type: application/json' --data '{"jsonrpc":"2.0","method":"statediff_writeStateDiffsInRange","params":['"$BEGIN"', '"$END"', {"intermediateStateNodes":true,"intermediateStorageNodes":true,"includeBlock":true,"includeReceipts":true,"includeTD":true,"includeCode":true}],"id":1}' "$HOST":"$PORT"`
Example:
The process can be configured locally with sets of ranges to process as a "prerun" to processing directed by the server endpoints.
This is done by turning "prerun" on in the config (`statediff.prerun = true`) and defining ranged and params in the
`prerun` section of the config as shown below.
```bash
curl -X POST -H 'Content-Type: application/json' --data '{"jsonrpc":"2.0","method":"statediff_writeStateDiffsInRange","params":['"$BEGIN"', '"$END"', {"intermediateStateNodes":true,"intermediateStorageNodes":true,"includeBlock":true,"includeReceipts":true,"includeTD":true,"includeCode":true}],"id":1}' "$HOST":"$PORT"
```
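A single-block write can be requested the same way via `statediff_writeStateDiffAt` (a sketch, assuming the method takes a block number followed by the same params object as `statediff_writeStateDiffsInRange`):
```bash
curl -X POST -H 'Content-Type: application/json' --data '{"jsonrpc":"2.0","method":"statediff_writeStateDiffAt","params":['"$BLOCK_NUMBER"', {"intermediateStateNodes":true,"intermediateStorageNodes":true,"includeBlock":true,"includeReceipts":true,"includeTD":true,"includeCode":true}],"id":1}' "$HOST":"$PORT"
```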
## Configuration
* Prerun:
* The process can be configured locally with sets of ranges to process as a "prerun" to processing directed by the server endpoints.
* This is done by turning "prerun" on in the config (`statediff.prerun = true`) and defining ranges and params in the
`prerun` section of the config.
* Set the range using `prerun.start` and `prerun.stop`. Use `prerun.ranges` if prerun on more than one range is required.
An example config file:
* NOTE: Currently, `params.includeTD` must be set to / passed as `true`.
```toml
[leveldb]
mode = "local"
# path and ancient LevelDB paths required in local mode
path = "/Users/user/Library/Ethereum/geth/chaindata"
ancient = "/Users/user/Library/Ethereum/geth/chaindata/ancient"
# url for leveldb-ethdb-rpc endpoint required in remote mode
url = "http://127.0.0.1:8082/"
## Monitoring
[server]
ipcPath = ".ipc"
httpPath = "127.0.0.1:8545"
* Enable metrics using config parameters `prom.metrics` and `prom.http`.
* `eth-statediff-service` exposes the following Prometheus metrics at the `/metrics` endpoint:
* `ranges_queued`: Number of range requests currently queued.
* `loaded_height`: The last block that was loaded for processing.
* `processed_height`: The last block that was processed.
* `stats.t_block_load`: Block loading time.
* `stats.t_block_processing`: Block (header, uncles, txs, rcts, tx trie, rct trie) processing time.
* `stats.t_state_processing`: State (state trie, storage tries, and code) processing time.
* `stats.t_postgres_tx_commit`: Postgres tx commit time.
* `http.count`: HTTP request count.
* `http.duration`: HTTP request duration.
* `ipc.count`: Unix socket connection count.
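These can be inspected from the command line once the service is running (a sketch, assuming the `prom.httpAddr` and `prom.httpPort` values from the example config above; the exposed names may carry a namespace prefix):
```bash
# Scrape the metrics endpoint and filter for the statediffing gauges.
curl -s http://localhost:8889/metrics | grep -E 'ranges_queued|loaded_height|processed_height'
```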
[statediff]
prerun = true
serviceWorkers = 1
workerQueueSize = 1024
trieWorkers = 4
## Tests
[prerun]
only = false
ranges = [
[0, 1000]
]
[prerun.params]
intermediateStateNodes = true
intermediateStorageNodes = true
includeBlock = true
includeReceipts = true
includeTD = true
includeCode = true
watchedAddresses = []
* Run unit tests:
[log]
file = ""
level = "info"
```bash
make test
```
[eth]
chainID = 1
## Import output data in file mode into a database
[database]
name = "vulcanize_test"
hostname = "localhost"
port = 5432
user = "vulcanize"
password = "..."
type = "postgres"
driver = "sqlx"
dumpDestination = ""
fileMode = "csv"
fileCsvDir = ""
filePath = ""
* When `eth-statediff-service` is run in file mode (`database.type = "file"`), the output is in the form of a SQL file or multiple CSV files.
[cache]
database = 1024
trie = 1024
* Assuming the output files are located in the host's `./output_dir` directory.
[prom]
dbStats = false
metrics = true
http = true
httpAddr = "localhost"
httpPort = "8889"
* Create a directory to store post-processed output:
[ethereum]
nodeID = ""
clientName = "eth-statediff-service"
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3"
networkID = 1
chainID = 1
```
```bash
mkdir -p output_dir/processed_output
```
### SQL
* De-duplicate data:
```bash
sort -u output_dir/statediff.sql -o output_dir/processed_output/deduped-statediff.sql
```
* Copy over the post-processed output files to the DB server (say, to `/output_dir`).
* Run the following to import data:
```bash
psql -U <DATABASE_USER> -h <DATABASE_HOSTNAME> -p <DATABASE_PORT> <DATABASE_NAME> --set ON_ERROR_STOP=on -f /output_dir/processed_output/deduped-statediff.sql
```
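* Optionally, run a quick sanity check on the imported data (a sketch; `eth.header_cids` is one of the tables populated by the import):
```bash
psql -U <DATABASE_USER> -h <DATABASE_HOSTNAME> -p <DATABASE_PORT> <DATABASE_NAME> -c "SELECT COUNT(*) FROM eth.header_cids;"
```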
### CSV
* De-duplicate data and copy it to the post-processed output directory:
```bash
# public.blocks
sort -u output_dir/public.blocks.csv -o output_dir/processed_output/deduped-public.blocks.csv
# eth.header_cids
sort -u output_dir/eth.header_cids.csv -o output_dir/processed_output/deduped-eth.header_cids.csv
# eth.uncle_cids
sort -u output_dir/eth.uncle_cids.csv -o output_dir/processed_output/deduped-eth.uncle_cids.csv
# eth.transaction_cids
sort -u output_dir/eth.transaction_cids.csv -o output_dir/processed_output/deduped-eth.transaction_cids.csv
# eth.access_list_elements
sort -u output_dir/eth.access_list_elements.csv -o output_dir/processed_output/deduped-eth.access_list_elements.csv
# eth.receipt_cids
sort -u output_dir/eth.receipt_cids.csv -o output_dir/processed_output/deduped-eth.receipt_cids.csv
# eth.log_cids
sort -u output_dir/eth.log_cids.csv -o output_dir/processed_output/deduped-eth.log_cids.csv
# eth.state_cids
sort -u output_dir/eth.state_cids.csv -o output_dir/processed_output/deduped-eth.state_cids.csv
# eth.storage_cids
sort -u output_dir/eth.storage_cids.csv -o output_dir/processed_output/deduped-eth.storage_cids.csv
# eth.state_accounts
sort -u output_dir/eth.state_accounts.csv -o output_dir/processed_output/deduped-eth.state_accounts.csv
# public.nodes
cp output_dir/public.nodes.csv output_dir/processed_output/public.nodes.csv
```
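The same de-duplication steps can be scripted in a loop (a sketch, assuming the file layout and table list above):
```bash
# Deduplicate every table except public.nodes, which is copied as-is.
tables="public.blocks eth.header_cids eth.uncle_cids eth.transaction_cids \
eth.access_list_elements eth.receipt_cids eth.log_cids eth.state_cids \
eth.storage_cids eth.state_accounts"
for t in $tables; do
  sort -u "output_dir/$t.csv" -o "output_dir/processed_output/deduped-$t.csv"
done
cp output_dir/public.nodes.csv output_dir/processed_output/public.nodes.csv
```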
* Copy over the post-processed output files to the DB server (say, to `/output_dir`).
* Start `psql` to run the import commands:
```bash
psql -U <DATABASE_USER> -h <DATABASE_HOSTNAME> -p <DATABASE_PORT> <DATABASE_NAME>
```
* Run the following to import data:
```bash
# public.nodes
COPY public.nodes FROM '/output_dir/processed_output/public.nodes.csv' CSV;
# public.blocks
COPY public.blocks FROM '/output_dir/processed_output/deduped-public.blocks.csv' CSV;
# eth.header_cids
COPY eth.header_cids FROM '/output_dir/processed_output/deduped-eth.header_cids.csv' CSV;
# eth.uncle_cids
COPY eth.uncle_cids FROM '/output_dir/processed_output/deduped-eth.uncle_cids.csv' CSV;
# eth.transaction_cids
COPY eth.transaction_cids FROM '/output_dir/processed_output/deduped-eth.transaction_cids.csv' CSV FORCE NOT NULL dst;
# eth.access_list_elements
COPY eth.access_list_elements FROM '/output_dir/processed_output/deduped-eth.access_list_elements.csv' CSV;
# eth.receipt_cids
COPY eth.receipt_cids FROM '/output_dir/processed_output/deduped-eth.receipt_cids.csv' CSV FORCE NOT NULL post_state, contract, contract_hash;
# eth.log_cids
COPY eth.log_cids FROM '/output_dir/processed_output/deduped-eth.log_cids.csv' CSV FORCE NOT NULL topic0, topic1, topic2, topic3;
# eth.state_cids
COPY eth.state_cids FROM '/output_dir/processed_output/deduped-eth.state_cids.csv' CSV FORCE NOT NULL state_leaf_key;
# eth.storage_cids
COPY eth.storage_cids FROM '/output_dir/processed_output/deduped-eth.storage_cids.csv' CSV FORCE NOT NULL storage_leaf_key;
# eth.state_accounts
COPY eth.state_accounts FROM '/output_dir/processed_output/deduped-eth.state_accounts.csv' CSV;
```
* NOTE: By default, the `COPY` command on CSVs inserts empty strings as `NULL` in the DB. Passing `FORCE NOT NULL <COLUMN_NAME>` forces it to insert empty strings instead. This is required to maintain compatibility of the imported statediff data with the data generated in `postgres` mode. Reference: https://www.postgresql.org/docs/14/sql-copy.html
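The interactive session can also be avoided by issuing each `COPY` through `psql -c` (a sketch using the same placeholder credentials; keep the `FORCE NOT NULL` clauses shown above for the remaining tables):
```bash
PSQL="psql -U <DATABASE_USER> -h <DATABASE_HOSTNAME> -p <DATABASE_PORT> <DATABASE_NAME>"
$PSQL -c "COPY public.nodes FROM '/output_dir/processed_output/public.nodes.csv' CSV;"
$PSQL -c "COPY public.blocks FROM '/output_dir/processed_output/deduped-public.blocks.csv' CSV;"
# ...repeat for the remaining eth.* tables, keeping the FORCE NOT NULL clauses above.
```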


@@ -6,12 +6,13 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/statediff"
gethsd "github.com/ethereum/go-ethereum/statediff"
ind "github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
"github.com/ethereum/go-ethereum/trie"
"github.com/spf13/viper"
sd "github.com/vulcanize/eth-statediff-service/pkg"
"github.com/vulcanize/eth-statediff-service/pkg/prom"
)
type blockRange [2]uint64
@@ -78,11 +79,16 @@ func createStateDiffService() (sd.StateDiffService, error) {
if err != nil {
logWithCommand.Fatal(err)
}
logWithCommand.Info("Creating statediff indexer")
_, indexer, err := ind.NewStateDiffIndexer(context.Background(), chainConf, nodeInfo, conf)
db, indexer, err := ind.NewStateDiffIndexer(context.Background(), chainConf, nodeInfo, conf)
if err != nil {
logWithCommand.Fatal(err)
}
if conf.Type() == shared.POSTGRES && viper.GetBool("prom.dbStats") {
prom.RegisterDBCollector(viper.GetString("database.name"), db)
}
logWithCommand.Info("Creating statediff service")
sdConf := sd.Config{
ServiceWorkers: viper.GetUint("statediff.serviceWorkers"),
@@ -97,7 +103,7 @@ func setupPreRunRanges() []sd.RangeRequest {
if !viper.GetBool("statediff.prerun") {
return nil
}
preRunParams := gethsd.Params{
preRunParams := statediff.Params{
IntermediateStateNodes: viper.GetBool("prerun.params.intermediateStateNodes"),
IntermediateStorageNodes: viper.GetBool("prerun.params.intermediateStorageNodes"),
IncludeBlock: viper.GetBool("prerun.params.includeBlock"),


@@ -2,7 +2,6 @@
mode = "local"
path = "/app/geth-rw/chaindata"
ancient = "/app/geth-rw/chaindata/ancient"
url = "http://127.0.0.1:8082/"
[server]
ipcPath = ""
@@ -31,33 +30,29 @@
level = "info"
[database]
type = "postgres"
name = ""
hostname = ""
port = 5432
user = ""
password = ""
type = "postgres"
driver = "sqlx"
dumpDestination = ""
fileMode = "csv"
fileCsvDir = ""
filePath = ""
[cache]
database = 1024
trie = 4096
[prom]
dbStats = false
metrics = true
http = true
httpAddr = "0.0.0.0"
httpPort = 9100
dbStats = false
[ethereum]
chainConfig = ""
nodeID = ""
clientName = "eth-statediff-service"
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3"
networkID = 1
chainID = 1
chainConfig = ""


@@ -17,9 +17,9 @@
package prom
import (
"database/sql"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
)
const (
@@ -29,7 +29,7 @@ const (
// DBStatsGetter is an interface that gets sql.DBStats.
type DBStatsGetter interface {
Stats() sql.DBStats
Stats() sql.Stats
}
// DBStatsCollector implements the prometheus.Collector interface.
@@ -122,41 +122,41 @@ func (c DBStatsCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
c.maxOpenDesc,
prometheus.GaugeValue,
float64(stats.MaxOpenConnections),
float64(stats.MaxOpen()),
)
ch <- prometheus.MustNewConstMetric(
c.openDesc,
prometheus.GaugeValue,
float64(stats.OpenConnections),
float64(stats.Open()),
)
ch <- prometheus.MustNewConstMetric(
c.inUseDesc,
prometheus.GaugeValue,
float64(stats.InUse),
float64(stats.InUse()),
)
ch <- prometheus.MustNewConstMetric(
c.idleDesc,
prometheus.GaugeValue,
float64(stats.Idle),
float64(stats.Idle()),
)
ch <- prometheus.MustNewConstMetric(
c.waitedForDesc,
prometheus.CounterValue,
float64(stats.WaitCount),
float64(stats.WaitCount()),
)
ch <- prometheus.MustNewConstMetric(
c.blockedSecondsDesc,
prometheus.CounterValue,
stats.WaitDuration.Seconds(),
stats.WaitDuration().Seconds(),
)
ch <- prometheus.MustNewConstMetric(
c.closedMaxIdleDesc,
prometheus.CounterValue,
float64(stats.MaxIdleClosed),
float64(stats.MaxIdleClosed()),
)
ch <- prometheus.MustNewConstMetric(
c.closedMaxLifetimeDesc,
prometheus.CounterValue,
float64(stats.MaxLifetimeClosed),
float64(stats.MaxLifetimeClosed()),
)
}


@@ -19,7 +19,6 @@ package prom
import (
"time"
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
@@ -123,7 +122,7 @@ func Init() {
}
// RegisterDBCollector creates a metric collector for the given connection
func RegisterDBCollector(name string, db *sqlx.DB) {
func RegisterDBCollector(name string, db DBStatsGetter) {
if metrics {
prometheus.Register(NewDBStatsCollector(name, db))
}