Add helper scripts for processing and importing statediffed data (#105)
* Add helper scripts for processing and importing statediffed data * Add instructions to use the helper scripts * Update scripts to use arrays * Update README with bad row output example * Remove delimiter option from xargs command
This commit is contained in:
parent
67d8bced4f
commit
2db235f244
2
Makefile
2
Makefile
@ -4,7 +4,7 @@ docker-build:
|
||||
docker build -t cerc-io/eth-statediff-service .
|
||||
|
||||
.PHONY: test
|
||||
test: | $(GOOSE)
|
||||
test:
|
||||
go test -p 1 ./pkg/... -v
|
||||
|
||||
build:
|
||||
|
98
README.md
98
README.md
@ -216,6 +216,8 @@ An example config file:
|
||||
|
||||
* When `eth-statediff-service` is run in file mode (`database.type`) the output is in form of a SQL file or multiple CSV files.
|
||||
|
||||
### SQL
|
||||
|
||||
* Assuming the output files are located in host's `./output_dir` directory.
|
||||
|
||||
* Create a directory to store post-processed output:
|
||||
@ -224,7 +226,11 @@ An example config file:
|
||||
mkdir -p output_dir/processed_output
|
||||
```
|
||||
|
||||
### SQL
|
||||
* (Optional) Get row counts in the output:
|
||||
|
||||
```bash
|
||||
wc -l output_dir/statediff.sql > output_stats.txt
|
||||
```
|
||||
|
||||
* De-duplicate data:
|
||||
|
||||
@ -242,86 +248,48 @@ An example config file:
|
||||
|
||||
### CSV
|
||||
|
||||
* De-duplicate data and copy to post-processed output directory:
|
||||
* Create an env file with the required variables. Refer [.sample.env](./scripts/.sample.env).
|
||||
|
||||
* (Optional) Get row counts in the output:
|
||||
|
||||
```bash
|
||||
# public.blocks
|
||||
sort -u output_dir/public.blocks.csv -o output_dir/processed_output/deduped-public.blocks.csv
|
||||
|
||||
# eth.header_cids
|
||||
sort -u output_dir/eth.header_cids.csv -o output_dir/processed_output/deduped-eth.header_cids.csv
|
||||
|
||||
# eth.uncle_cids
|
||||
sort -u output_dir/eth.uncle_cids.csv -o output_dir/processed_output/deduped-eth.uncle_cids.csv
|
||||
|
||||
# eth.transaction_cids
|
||||
sort -u output_dir/eth.transaction_cids.csv -o output_dir/processed_output/deduped-eth.transaction_cids.csv
|
||||
|
||||
# eth.access_list_elements
|
||||
sort -u output_dir/eth.access_list_elements.csv -o output_dir/processed_output/deduped-eth.access_list_elements.csv
|
||||
|
||||
# eth.receipt_cids
|
||||
sort -u output_dir/eth.receipt_cids.csv -o output_dir/processed_output/deduped-eth.receipt_cids.csv
|
||||
|
||||
# eth.log_cids
|
||||
sort -u output_dir/eth.log_cids.csv -o output_dir/processed_output/deduped-eth.log_cids.csv
|
||||
|
||||
# eth.state_cids
|
||||
sort -u output_dir/eth.state_cids.csv -o output_dir/processed_output/deduped-eth.state_cids.csv
|
||||
|
||||
# eth.storage_cids
|
||||
sort -u output_dir/eth.storage_cids.csv -o output_dir/processed_output/deduped-eth.storage_cids.csv
|
||||
|
||||
# eth.state_accounts
|
||||
sort -u output_dir/eth.state_accounts.csv -o output_dir/processed_output/deduped-eth.state_accounts.csv
|
||||
|
||||
# public.nodes
|
||||
cp output_dir/public.nodes.csv output_dir/processed_output/public.nodes.csv
|
||||
./scripts/count-lines.sh <ENV_FILE_PATH>
|
||||
```
|
||||
|
||||
* Copy over the post-processed output files to the DB server (say in `/output_dir`).
|
||||
|
||||
* Start `psql` to run the import commands:
|
||||
* De-duplicate data:
|
||||
|
||||
```bash
|
||||
psql -U <DATABASE_USER> -h <DATABASE_HOSTNAME> -p <DATABASE_PORT> <DATABASE_NAME>
|
||||
./scripts/dedup.sh <ENV_FILE_PATH>
|
||||
```
|
||||
|
||||
* Run the following to import data:
|
||||
* Perform column checks:
|
||||
|
||||
```bash
|
||||
# public.nodes
|
||||
COPY public.nodes FROM '/output_dir/processed_output/public.nodes.csv' CSV;
|
||||
./scripts/check-columns.sh <ENV_FILE_PATH>
|
||||
```
|
||||
|
||||
# public.nodes
|
||||
COPY public.blocks FROM '/output_dir/processed_output/deduped-public.blocks.csv' CSV;
|
||||
Check the output logs for any rows detected with unexpected number of columns.
|
||||
|
||||
# eth.header_cids
|
||||
COPY eth.header_cids FROM '/output_dir/processed_output/deduped-eth.header_cids.csv' CSV;
|
||||
Example:
|
||||
|
||||
# eth.uncle_cids
|
||||
COPY eth.uncle_cids FROM '/output_dir/processed_output/deduped-eth.uncle_cids.csv' CSV;
|
||||
```bash
|
||||
# log
|
||||
eth.header_cids
|
||||
Start: Wednesday 21 September 2022 06:00:38 PM IST
|
||||
Time taken: 00:00:05
|
||||
End: Wednesday 21 September 2022 06:00:43 PM IST
|
||||
Total bad rows: 1 ./check-columns/eth.header_cids.txt
|
||||
|
||||
# eth.transaction_cids
|
||||
COPY eth.transaction_cids FROM '/output_dir/processed_output/deduped-eth.transaction_cids.csv' CSV FORCE NOT NULL dst;
|
||||
# bad row output
|
||||
# line number, num. of columns, data
|
||||
23 17 22,xxxxxx,0x07f5ea5c94aa8dea60b28f6b6315d92f2b6d78ca4b74ea409adeb191b5a114f2,0x5918487321aa57dd0c50977856c6231e7c4ee79e95b694c7c8830227d77a1ecc,bagiacgzaa726uxeuvkg6uyfsr5vwgfozf4vw26gkjn2ouqe232yzdnnbctza,45,geth,0,0xad8fa8df61b98dbda7acd6ca76d5ce4cbba663d5f608cc940957adcdb94cee8d,0xc621412320a20b4aaff5363bdf063b9d13e394ef82e55689ab703aae5db08e26,0x71ec1c7d81269ce115be81c81f13e1cc2601c292a7f20440a77257ecfdc69940,0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347,\x2000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000000,1658408419,/blocks/DMQAP5PKLSKKVDPKMCZI623DCXMS6K3NPDFEW5HKICNN5MMRWWQRJ4Q,1,0x0000000000000000000000000000000000000000
|
||||
```
|
||||
|
||||
# eth.access_list_elements
|
||||
COPY eth.access_list_elements FROM '/output_dir/processed_output/deduped-eth.access_list_elements.csv' CSV;
|
||||
* Import data using `timescaledb-parallel-copy`:
|
||||
(requires [`timescaledb-parallel-copy`](https://github.com/timescale/timescaledb-parallel-copy) installation; readily comes with TimescaleDB docker image)
|
||||
|
||||
# eth.receipt_cids
|
||||
COPY eth.receipt_cids FROM '/output_dir/processed_output/deduped-eth.receipt_cids.csv' CSV FORCE NOT NULL post_state, contract, contract_hash;
|
||||
|
||||
# eth.log_cids
|
||||
COPY eth.log_cids FROM '/output_dir/processed_output/deduped-eth.log_cids.csv' CSV FORCE NOT NULL topic0, topic1, topic2, topic3;
|
||||
|
||||
# eth.state_cids
|
||||
COPY eth.state_cids FROM '/output_dir/processed_output/deduped-eth.state_cids.csv' CSV FORCE NOT NULL state_leaf_key;
|
||||
|
||||
# eth.storage_cids
|
||||
COPY eth.storage_cids FROM '/output_dir/processed_output/deduped-eth.storage_cids.csv' CSV FORCE NOT NULL storage_leaf_key;
|
||||
|
||||
# eth.state_accounts
|
||||
COPY eth.state_accounts FROM '/output_dir/processed_output/deduped-eth.state_accounts.csv' CSV;
|
||||
```bash
|
||||
./scripts/timescaledb-import.sh <ENV_FILE_PATH>
|
||||
```
|
||||
|
||||
* NOTE: `COPY` command on CSVs inserts empty strings as `NULL` in the DB. Passing `FORCE_NOT_NULL <COLUMN_NAME>` forces it to insert empty strings instead. This is required to maintain compatibility of the imported statediff data with the data generated in `postgres` mode. Reference: https://www.postgresql.org/docs/14/sql-copy.html
|
||||
|
28
scripts/.env.example
Normal file
28
scripts/.env.example
Normal file
@ -0,0 +1,28 @@
|
||||
# Used by the script to count rows (count-lines.sh)
|
||||
COUNT_LINES_LOG=./count-lines.log
|
||||
COUNT_LINES_INPUT_DIR=~/eth-statediff-service/output_dir
|
||||
COUNT_LINES_OUTPUT_FILE=./output-stats.txt
|
||||
|
||||
# Used by the script to dedup output files (dedup.sh)
|
||||
DEDUP_LOG=./dedup.log
|
||||
DEDUP_INPUT_DIR=~/eth-statediff-service/output_dir
|
||||
DEDUP_OUTPUT_DIR=~/eth-statediff-service/dedup_dir
|
||||
DEDUP_SORT_DIR=./.sort
|
||||
|
||||
# Used by the script to perform column checks (check-columns.sh)
|
||||
CHECK_COLUMNS_LOG=./check-columns.log
|
||||
CHECK_COLUMNS_INPUT_DIR=~/eth-statediff-service/output_dir
|
||||
CHECK_COLUMNS_INPUT_DEDUP_DIR=~/eth-statediff-service/dedup_dir
|
||||
CHECK_COLUMNS_OUTPUT_DIR=./check-columns
|
||||
|
||||
# Used by the script to import data (timescaledb-import.sh)
|
||||
IMPORT_LOG=./tsdb-import.log
|
||||
IMPORT_INPUT_DIR=~/eth-statediff-service/output_dir
|
||||
IMPORT_INPUT_DEDUP_DIR=~/eth-statediff-service/dedup_dir
|
||||
TIMESCALEDB_WORKERS=8
|
||||
|
||||
DATABASE_USER=vdbm
|
||||
DATABASE_HOSTNAME=localhost
|
||||
DATABASE_PORT=8077
|
||||
DATABASE_NAME=vulcanize_testing
|
||||
DATABASE_PASSWORD=password
|
58
scripts/check-columns.sh
Executable file
58
scripts/check-columns.sh
Executable file
@ -0,0 +1,58 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Requires:
|
||||
# CHECK_COLUMNS_LOG
|
||||
# CHECK_COLUMNS_INPUT_DIR
|
||||
# CHECK_COLUMNS_INPUT_DEDUP_DIR
|
||||
# CHECK_COLUMNS_OUTPUT_DIR
|
||||
|
||||
# env file arg
|
||||
ENV=$1
|
||||
echo "Using env file: ${ENV}"
|
||||
|
||||
# read env file
|
||||
export $(grep -v '^#' ${ENV} | xargs)
|
||||
|
||||
# redirect stdout/stderr to a file
|
||||
exec >"${CHECK_COLUMNS_LOG}" 2>&1
|
||||
|
||||
# create output dir if not exists
|
||||
mkdir -p "${CHECK_COLUMNS_OUTPUT_DIR}"
|
||||
|
||||
start_timestamp=$(date +%s)
|
||||
|
||||
declare -A expected_columns
|
||||
expected_columns=(
|
||||
["public.nodes"]="5"
|
||||
["public.blocks"]="3"
|
||||
# ["eth.access_list_elements"]="?" # skipping as values include ','
|
||||
["eth.log_cids"]="12"
|
||||
["eth.state_accounts"]="7"
|
||||
["eth.storage_cids"]="9"
|
||||
["eth.uncle_cids"]="7"
|
||||
["eth.header_cids"]="16"
|
||||
["eth.receipt_cids"]="10"
|
||||
["eth.state_cids"]="8"
|
||||
["eth.transaction_cids"]="11"
|
||||
)
|
||||
|
||||
for table_name in "${!expected_columns[@]}";
|
||||
do
|
||||
if [ "${table_name}" = "public.blocks" ];
|
||||
then
|
||||
command="$(dirname "$0")/find-bad-rows.sh -i ${CHECK_COLUMNS_INPUT_DEDUP_DIR}/deduped-${table_name}.csv -c ${expected_columns[${table_name}]} -d true -o ${CHECK_COLUMNS_OUTPUT_DIR}/${table_name}.txt"
|
||||
else
|
||||
command="$(dirname "$0")/find-bad-rows.sh -i ${CHECK_COLUMNS_INPUT_DIR}/${table_name}.csv -c ${expected_columns[${table_name}]} -d true -o ${CHECK_COLUMNS_OUTPUT_DIR}/${table_name}.txt"
|
||||
fi
|
||||
|
||||
echo "${table_name}"
|
||||
echo Start: "$(date)"
|
||||
eval "${command}"
|
||||
echo End: "$(date)"
|
||||
echo Total bad rows: $(wc -l ${CHECK_COLUMNS_OUTPUT_DIR}/${table_name}.txt)
|
||||
echo
|
||||
done
|
||||
|
||||
difference=$(($(date +%s)-start_timestamp))
|
||||
echo Time taken: $((difference/86400)):$(date -d@${difference} -u +%H:%M:%S)
|
||||
echo
|
46
scripts/count-lines.sh
Executable file
46
scripts/count-lines.sh
Executable file
@ -0,0 +1,46 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Requires:
|
||||
# COUNT_LINES_LOG
|
||||
# COUNT_LINES_INPUT_DIR
|
||||
# COUNT_LINES_OUTPUT_FILE
|
||||
|
||||
# env file arg
|
||||
ENV=$1
|
||||
echo "Using env file: ${ENV}"
|
||||
|
||||
# read env file
|
||||
export $(grep -v '^#' ${ENV} | xargs)
|
||||
|
||||
# redirect stdout/stderr to a file
|
||||
exec >"${COUNT_LINES_LOG}" 2>&1
|
||||
|
||||
start_timestamp=$(date +%s)
|
||||
|
||||
table_names=(
|
||||
"public.nodes"
|
||||
"public.blocks"
|
||||
"eth.access_list_elements"
|
||||
"eth.log_cids"
|
||||
"eth.state_accounts"
|
||||
"eth.storage_cids"
|
||||
"eth.uncle_cids"
|
||||
"eth.header_cids"
|
||||
"eth.receipt_cids"
|
||||
"eth.state_cids"
|
||||
"eth.transaction_cids"
|
||||
)
|
||||
|
||||
echo "Row counts:" > "${COUNT_LINES_OUTPUT_FILE}"
|
||||
|
||||
for table_name in "${table_names[@]}";
|
||||
do
|
||||
echo "${table_name}";
|
||||
echo Start: "$(date)"
|
||||
wc -l "${COUNT_LINES_INPUT_DIR}"/"${table_name}.csv" >> "${COUNT_LINES_OUTPUT_FILE}"
|
||||
echo End: "$(date)"
|
||||
echo
|
||||
done
|
||||
|
||||
difference=$(($(date +%s)-start_timestamp))
|
||||
echo Time taken: $((difference/86400)):$(date -d@${difference} -u +%H:%M:%S)
|
35
scripts/dedup.sh
Executable file
35
scripts/dedup.sh
Executable file
@ -0,0 +1,35 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Requires:
|
||||
# DEDUP_LOG
|
||||
# DEDUP_INPUT_DIR
|
||||
# DEDUP_OUTPUT_DIR
|
||||
# DEDUP_SORT_DIR
|
||||
|
||||
# env file arg
|
||||
ENV=$1
|
||||
echo "Using env file: ${ENV}"
|
||||
|
||||
# read env file
|
||||
export $(grep -v '^#' ${ENV} | xargs)
|
||||
|
||||
# redirect stdout/stderr to a file
|
||||
exec >"${DEDUP_LOG}" 2>&1
|
||||
|
||||
# create output dir if not exists
|
||||
mkdir -p "${DEDUP_OUTPUT_DIR}"
|
||||
|
||||
start_timestamp=$(date +%s)
|
||||
|
||||
echo "public.blocks"
|
||||
echo Start: "$(date)"
|
||||
sort -T "${DEDUP_SORT_DIR}" -u "${DEDUP_INPUT_DIR}"/public.blocks.csv -o "${DEDUP_OUTPUT_DIR}"/deduped-public.blocks.csv
|
||||
echo End: "$(date)"
|
||||
echo Total deduped rows: $(wc -l ${DEDUP_OUTPUT_DIR}/deduped-public.blocks.csv)
|
||||
echo
|
||||
|
||||
difference=$(($(date +%s)-start_timestamp))
|
||||
echo Time taken: $((difference/86400)):$(date -d@${difference} -u +%H:%M:%S)
|
||||
|
||||
# NOTE: This script currently only dedups public.blocks output file.
|
||||
# If the output contains blocks that were statediffed more than once, output files for other tables will have to be deduped as well.
|
43
scripts/find-bad-rows.sh
Executable file
43
scripts/find-bad-rows.sh
Executable file
@ -0,0 +1,43 @@
|
||||
#!/bin/bash
|
||||
|
||||
# flags
|
||||
# -i <input-file>: Input data file path
|
||||
# -c <expected-columns>: Expected number of columns in each row of the input file
|
||||
# -o [output-file]: Output destination file path (default: STDOUT)
|
||||
# -d [include-data]: Whether to include the data row in output (true | false) (default: false)
|
||||
|
||||
# eg: ./scripts/find-bad-rows.sh -i eth.state_cids.csv -c 8 -o res.txt -d true
|
||||
# output: 1 9 1500000,xxxxxxxx,0x83952d392f9b0059eea94b10d1a095eefb1943ea91595a16c6698757127d4e1c,,
|
||||
# baglacgzasvqcntdahkxhufdnkm7a22s2eetj6mx6nzkarwxtkvy4x3bubdgq,\x0f,0,f,/blocks/,
|
||||
# DMQJKYBGZRQDVLT2CRWVGPQNNJNCCJU7GL7G4VAI3LZVK4OL5Q2ARTI
|
||||
|
||||
while getopts i:c:o:d: OPTION
|
||||
do
|
||||
case "${OPTION}" in
|
||||
i) inputFile=${OPTARG};;
|
||||
c) expectedColumns=${OPTARG};;
|
||||
o) outputFile=${OPTARG};;
|
||||
d) data=${OPTARG};;
|
||||
esac
|
||||
done
|
||||
|
||||
timestamp=$(date +%s)
|
||||
|
||||
# if data requested, dump row number, number of columns and the row
|
||||
if [ "${data}" = true ] ; then
|
||||
if [ -z "${outputFile}" ]; then
|
||||
awk -F"," "NF!=${expectedColumns} {print NR, NF, \$0}" < ${inputFile}
|
||||
else
|
||||
awk -F"," "NF!=${expectedColumns} {print NR, NF, \$0}" < ${inputFile} > ${outputFile}
|
||||
fi
|
||||
# else, dump only row number, number of columns
|
||||
else
|
||||
if [ -z "${outputFile}" ]; then
|
||||
awk -F"," "NF!=${expectedColumns} {print NR, NF}" < ${inputFile}
|
||||
else
|
||||
awk -F"," "NF!=${expectedColumns} {print NR, NF}" < ${inputFile} > ${outputFile}
|
||||
fi
|
||||
fi
|
||||
|
||||
difference=$(($(date +%s)-timestamp))
|
||||
echo Time taken: $(date -d@${difference} -u +%H:%M:%S)
|
75
scripts/timescaledb-import.sh
Executable file
75
scripts/timescaledb-import.sh
Executable file
@ -0,0 +1,75 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Requires:
|
||||
# IMPORT_LOG
|
||||
# IMPORT_INPUT_DIR
|
||||
# IMPORT_INPUT_DEDUP_DIR
|
||||
# TIMESCALEDB_WORKERS
|
||||
# DATABASE_USER
|
||||
# DATABASE_HOSTNAME
|
||||
# DATABASE_PORT
|
||||
# DATABASE_NAME
|
||||
# DATABASE_PASSWORD
|
||||
|
||||
DEFAULT_TIMESCALEDB_WORKERS=8
|
||||
|
||||
# env file arg
|
||||
ENV=$1
|
||||
echo "Using env file: ${ENV}"
|
||||
|
||||
# read env file
|
||||
export $(grep -v '^#' ${ENV} | xargs)
|
||||
|
||||
if [ "$TIMESCALEDB_WORKERS" = "" ]; then
|
||||
TIMESCALEDB_WORKERS=$DEFAULT_TIMESCALEDB_WORKERS
|
||||
fi
|
||||
|
||||
# redirect stdout/stderr to a file
|
||||
exec >"${IMPORT_LOG}" 2>&1
|
||||
|
||||
start_timestamp=$(date +%s)
|
||||
|
||||
declare -a tables
|
||||
# schema-table-copyOptions
|
||||
tables=(
|
||||
"public-nodes"
|
||||
"public-blocks"
|
||||
"eth-access_list_elements"
|
||||
"eth-log_cids-FORCE NOT NULL topic0, topic1, topic2, topic3 CSV"
|
||||
"eth-state_accounts"
|
||||
"eth-storage_cids-FORCE NOT NULL storage_leaf_key CSV"
|
||||
"eth-uncle_cids"
|
||||
"eth-header_cids"
|
||||
"eth-receipt_cids-FORCE NOT NULL post_state, contract, contract_hash CSV"
|
||||
"eth-state_cids-FORCE NOT NULL state_leaf_key CSV"
|
||||
"eth-transaction_cids-FORCE NOT NULL dst CSV"
|
||||
)
|
||||
|
||||
for elem in "${tables[@]}";
|
||||
do
|
||||
IFS='-' read -a arr <<< "${elem}"
|
||||
|
||||
if [ "${arr[0]}.${arr[1]}" = "public.blocks" ];
|
||||
then
|
||||
copy_command="timescaledb-parallel-copy --connection \"host=${DATABASE_HOSTNAME} port=${DATABASE_PORT} user=${DATABASE_USER} password=${DATABASE_PASSWORD} sslmode=disable\" --db-name ${DATABASE_NAME} --schema ${arr[0]} --table ${arr[1]} --file ${IMPORT_INPUT_DEDUP_DIR}/deduped-${arr[0]}.${arr[1]}.csv --workers ${TIMESCALEDB_WORKERS} --reporting-period 300s"
|
||||
else
|
||||
copy_command="timescaledb-parallel-copy --connection \"host=${DATABASE_HOSTNAME} port=${DATABASE_PORT} user=${DATABASE_USER} password=${DATABASE_PASSWORD} sslmode=disable\" --db-name ${DATABASE_NAME} --schema ${arr[0]} --table ${arr[1]} --file ${IMPORT_INPUT_DIR}/${arr[0]}.${arr[1]}.csv --workers ${TIMESCALEDB_WORKERS} --reporting-period 300s"
|
||||
fi
|
||||
|
||||
if [ "${arr[2]}" != "" ];
|
||||
then
|
||||
copy_with_options="${copy_command} --copy-options \"${arr[2]}\""
|
||||
else
|
||||
copy_with_options=${copy_command}
|
||||
fi
|
||||
|
||||
echo "${arr[0]}.${arr[1]}"
|
||||
echo Start: "$(date)"
|
||||
eval "${copy_with_options}"
|
||||
echo End: "$(date)"
|
||||
echo
|
||||
done
|
||||
|
||||
difference=$(($(date +%s)-start_timestamp))
|
||||
echo Time taken: $((difference/86400)):$(date -d@${difference} -u +%H:%M:%S)
|
||||
echo
|
Loading…
Reference in New Issue
Block a user