Add mode to write to CSV files in statediff file writer (#249)

* Change file writing mode to CSV files

* Implement writer interface for file indexer

* Implement option for CSV or SQL in file mode

* Close files in CSV writer

* Add tests for CSV file mode

* Implement CSV file output for watched addresses

* Separate test configs for CSV and SQL

* Refactor common code for file indexer tests
nikugogoi 2022-06-29 17:17:57 +05:30 committed by GitHub
parent bf4bba2888
commit 558a187391
22 changed files with 2724 additions and 1378 deletions
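
For orientation, a minimal sketch of wiring up the new CSV mode through the constructor added in this change (the paths and chain config below are illustrative, not taken from this diff):

import (
    "context"

    "github.com/ethereum/go-ethereum/params"
    "github.com/ethereum/go-ethereum/statediff/indexer/database/file"
)

func newCSVIndexer() (*file.StateDiffIndexer, error) {
    cfg := file.Config{
        Mode:                     file.CSV,
        OutputDir:                "./statediff_output",                // one <table>.csv per output table
        WatchedAddressesFilePath: "./statediff-watched-addresses.csv", // flushed on every watched-address write
    }
    // Any *params.ChainConfig works here; MainnetChainConfig is only an example.
    return file.NewStateDiffIndexer(context.Background(), params.MainnetChainConfig, cfg)
}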

View File

@@ -53,32 +53,9 @@ jobs:
- name: Checkout code
uses: actions/checkout@v2
- uses: actions/checkout@v3
with:
ref: ${{ env.stack-orchestrator-ref }}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
fetch-depth: 0
- uses: actions/checkout@v3
with:
ref: ${{ env.ipld-eth-db-ref }}
repository: vulcanize/ipld-eth-db
path: "./ipld-eth-db/"
fetch-depth: 0
- name: Create config file
run: |
echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > $GITHUB_WORKSPACE/config.sh
echo db_write=true >> $GITHUB_WORKSPACE/config.sh
cat $GITHUB_WORKSPACE/config.sh
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-db-sharding.yml" \
--env-file $GITHUB_WORKSPACE/config.sh \
up -d --build
docker-compose up -d
- name: Give the migration a few seconds
run: sleep 30;

View File

@@ -211,7 +211,15 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
}
switch dbType {
case shared.FILE:
fileModeStr := ctx.GlobalString(utils.StateDiffFileMode.Name)
fileMode, err := file.ResolveFileMode(fileModeStr)
if err != nil {
utils.Fatalf("%v", err)
}
indexerConfig = file.Config{
Mode: fileMode,
OutputDir: ctx.GlobalString(utils.StateDiffFileCsvDir.Name),
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
WatchedAddressesFilePath: ctx.GlobalString(utils.StateDiffWatchedAddressesFilePath.Name),
}

View File

@@ -171,6 +171,8 @@ var (
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFileMode,
utils.StateDiffFileCsvDir,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,

View File

@@ -244,6 +244,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFileMode,
utils.StateDiffFileCsvDir,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,

View File

@@ -902,9 +902,18 @@ var (
Name: "statediff.db.nodeid",
Usage: "Node ID to use when writing state diffs to database",
}
StateDiffFileMode = cli.StringFlag{
Name: "statediff.file.mode",
Usage: "Statediff file writing mode (current options: csv, sql)",
Value: "csv",
}
StateDiffFileCsvDir = cli.StringFlag{
Name: "statediff.file.csvdir",
Usage: "Full path of output directory to write statediff data out to when operating in csv file mode",
}
StateDiffFilePath = cli.StringFlag{
Name: "statediff.file.path",
Usage: "Full path (including filename) to write statediff data out to when operating in file mode",
Usage: "Full path (including filename) to write statediff data out to when operating in sql file mode",
}
StateDiffKnownGapsFilePath = cli.StringFlag{
Name: "statediff.knowngapsfile.path",

View File

@@ -1,14 +1,27 @@
version: "3.2"
services:
ipld-eth-db:
migrations:
restart: on-failure
depends_on:
- access-node
image: vulcanize/ipld-eth-db:v4.1.1-alpha
- ipld-eth-db
image: vulcanize/ipld-eth-db:v4.1.4-alpha
environment:
DATABASE_USER: "vdbm"
DATABASE_NAME: "vulcanize_testing_v4"
DATABASE_NAME: "vulcanize_testing"
DATABASE_PASSWORD: "password"
DATABASE_HOSTNAME: "access-node"
DATABASE_HOSTNAME: "ipld-eth-db"
DATABASE_PORT: 5432
ipld-eth-db:
image: timescale/timescaledb:latest-pg14
restart: always
command: ["postgres", "-c", "log_statement=all"]
environment:
POSTGRES_USER: "vdbm"
POSTGRES_DB: "vulcanize_testing"
POSTGRES_PASSWORD: "password"
ports:
- "127.0.0.1:8077:5432"
volumes:
- ./statediff/indexer/database/file:/file_indexer

View File

@@ -17,12 +17,38 @@
package file
import (
"fmt"
"strings"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)
// Config holds params for writing sql statements out to a file
// FileMode to explicitly type the mode of file writer we are using
type FileMode string
const (
CSV FileMode = "CSV"
SQL FileMode = "SQL"
Unknown FileMode = "Unknown"
)
// ResolveFileMode resolves a FileMode from a provided string
func ResolveFileMode(str string) (FileMode, error) {
switch strings.ToLower(str) {
case "csv":
return CSV, nil
case "sql":
return SQL, nil
default:
return Unknown, fmt.Errorf("unrecognized file type string: %s", str)
}
}
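
As a quick usage sketch (not part of this diff), the resolver is case-insensitive and rejects anything other than csv/sql:

mode, err := ResolveFileMode("CSV") // returns CSV, nil; "sql"/"SQL" resolve to SQL
if err != nil {
    // err reads: unrecognized file type string: <input>
}
_ = mode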
// Config holds params for writing out CSV or SQL files
type Config struct {
Mode FileMode
OutputDir string
FilePath string
WatchedAddressesFilePath string
NodeInfo node.Info
@@ -33,15 +59,26 @@ func (c Config) Type() shared.DBType {
return shared.FILE
}
// TestConfig config for unit tests
var TestConfig = Config{
var nodeInfo = node.Info{
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
NetworkID: "1",
ChainID: 1,
ID: "mockNodeID",
ClientName: "go-ethereum",
}
// CSVTestConfig config for unit tests
var CSVTestConfig = Config{
Mode: CSV,
OutputDir: "./statediffing_test",
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.csv",
NodeInfo: nodeInfo,
}
// SQLTestConfig config for unit tests
var SQLTestConfig = Config{
Mode: SQL,
FilePath: "./statediffing_test_file.sql",
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
NodeInfo: node.Info{
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
NetworkID: "1",
ChainID: 1,
ID: "mockNodeID",
ClientName: "go-ethereum",
},
NodeInfo: nodeInfo,
}

View File

@@ -0,0 +1,135 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package file_test
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file/types"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
)
const dbDirectory = "/file_indexer"
const pgCopyStatement = `COPY %s FROM '%s' CSV`
func setupCSVLegacy(t *testing.T) {
mockLegacyBlock = legacyData.MockBlock
legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
file.CSVTestConfig.OutputDir = "./statediffing_legacy_test"
if _, err := os.Stat(file.CSVTestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
err := os.RemoveAll(file.CSVTestConfig.OutputDir)
require.NoError(t, err)
}
ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.CSVTestConfig)
require.NoError(t, err)
var tx interfaces.Batch
tx, err = ind.PushBlock(
mockLegacyBlock,
legacyData.MockReceipts,
legacyData.MockBlock.Difficulty())
require.NoError(t, err)
defer func() {
if err := tx.Submit(err); err != nil {
t.Fatal(err)
}
if err := ind.Close(); err != nil {
t.Fatal(err)
}
}()
for _, node := range legacyData.StateDiffs {
err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String())
require.NoError(t, err)
}
require.Equal(t, legacyData.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)
connStr := postgres.DefaultConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil {
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
}
}
func dumpCSVFileData(t *testing.T) {
outputDir := filepath.Join(dbDirectory, file.CSVTestConfig.OutputDir)
for _, tbl := range file.Tables {
var stmt string
varcharColumns := tbl.VarcharColumns()
if len(varcharColumns) > 0 {
stmt = fmt.Sprintf(
pgCopyStatement+" FORCE NOT NULL %s",
tbl.Name,
file.TableFilePath(outputDir, tbl.Name),
strings.Join(varcharColumns, ", "),
)
} else {
stmt = fmt.Sprintf(pgCopyStatement, tbl.Name, file.TableFilePath(outputDir, tbl.Name))
}
_, err = sqlxdb.Exec(stmt)
require.NoError(t, err)
}
}
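
With the eth.header_cids column definitions added later in this diff, the FORCE NOT NULL branch above produces a statement of roughly this shape (the exact column list comes from VarcharColumns(), so treat it as a sketch):

COPY eth.header_cids FROM '/file_indexer/statediffing_test/eth.header_cids.csv' CSV FORCE NOT NULL block_hash, parent_hash, node_id, state_root, tx_root, receipt_root, uncle_root, coinbase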
func dumpWatchedAddressesCSVFileData(t *testing.T) {
outputFilePath := filepath.Join(dbDirectory, file.CSVTestConfig.WatchedAddressesFilePath)
stmt := fmt.Sprintf(pgCopyStatement, types.TableWatchedAddresses.Name, outputFilePath)
_, err = sqlxdb.Exec(stmt)
require.NoError(t, err)
}
func tearDownCSV(t *testing.T) {
file.TearDownDB(t, sqlxdb)
err := os.RemoveAll(file.CSVTestConfig.OutputDir)
require.NoError(t, err)
if err := os.Remove(file.CSVTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
require.NoError(t, err)
}
err = sqlxdb.Close()
require.NoError(t, err)
}
func TestCSVFileIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupCSVLegacy(t)
dumpCSVFileData(t)
defer tearDownCSV(t)
testLegacyPublishAndIndexHeaderIPLDs(t)
})
}

View File

@@ -0,0 +1,200 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package file_test
import (
"context"
"errors"
"os"
"testing"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
)
func setupCSVIndexer(t *testing.T) {
file.CSVTestConfig.OutputDir = "./statediffing_test"
if _, err := os.Stat(file.CSVTestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
err := os.RemoveAll(file.CSVTestConfig.OutputDir)
require.NoError(t, err)
}
if _, err := os.Stat(file.CSVTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.CSVTestConfig.WatchedAddressesFilePath)
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.CSVTestConfig)
require.NoError(t, err)
connStr := postgres.DefaultConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil {
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
}
}
func setupCSV(t *testing.T) {
setupCSVIndexer(t)
var tx interfaces.Batch
tx, err = ind.PushBlock(
mockBlock,
mocks.MockReceipts,
mocks.MockBlock.Difficulty())
if err != nil {
t.Fatal(err)
}
defer func() {
if err := tx.Submit(err); err != nil {
t.Fatal(err)
}
if err := ind.Close(); err != nil {
t.Fatal(err)
}
}()
for _, node := range mocks.StateDiffs {
err = ind.PushStateNode(tx, node, mockBlock.Hash().String())
require.NoError(t, err)
}
require.Equal(t, mocks.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)
}
func TestCSVFileIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupCSV(t)
dumpCSVFileData(t)
defer tearDownCSV(t)
testPublishAndIndexHeaderIPLDs(t)
})
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
setupCSV(t)
dumpCSVFileData(t)
defer tearDownCSV(t)
testPublishAndIndexTransactionIPLDs(t)
})
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
setupCSV(t)
dumpCSVFileData(t)
defer tearDownCSV(t)
testPublishAndIndexLogIPLDs(t)
})
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
setupCSV(t)
dumpCSVFileData(t)
defer tearDownCSV(t)
testPublishAndIndexReceiptIPLDs(t)
})
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setupCSV(t)
dumpCSVFileData(t)
defer tearDownCSV(t)
testPublishAndIndexStateIPLDs(t)
})
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
setupCSV(t)
dumpCSVFileData(t)
defer tearDownCSV(t)
testPublishAndIndexStorageIPLDs(t)
})
}
func TestCSVFileWatchAddressMethods(t *testing.T) {
setupCSVIndexer(t)
defer tearDownCSV(t)
t.Run("Load watched addresses (empty table)", func(t *testing.T) {
testLoadEmptyWatchedAddresses(t)
})
t.Run("Insert watched addresses", func(t *testing.T) {
testInsertWatchedAddresses(t, func(t *testing.T) {
file.TearDownDB(t, sqlxdb)
dumpWatchedAddressesCSVFileData(t)
})
})
t.Run("Insert watched addresses (some already watched)", func(t *testing.T) {
testInsertAlreadyWatchedAddresses(t, func(t *testing.T) {
file.TearDownDB(t, sqlxdb)
dumpWatchedAddressesCSVFileData(t)
})
})
t.Run("Remove watched addresses", func(t *testing.T) {
testRemoveWatchedAddresses(t, func(t *testing.T) {
file.TearDownDB(t, sqlxdb)
dumpWatchedAddressesCSVFileData(t)
})
})
t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) {
testRemoveNonWatchedAddresses(t, func(t *testing.T) {
file.TearDownDB(t, sqlxdb)
dumpWatchedAddressesCSVFileData(t)
})
})
t.Run("Set watched addresses", func(t *testing.T) {
testSetWatchedAddresses(t, func(t *testing.T) {
file.TearDownDB(t, sqlxdb)
dumpWatchedAddressesCSVFileData(t)
})
})
t.Run("Set watched addresses (some already watched)", func(t *testing.T) {
testSetAlreadyWatchedAddresses(t, func(t *testing.T) {
file.TearDownDB(t, sqlxdb)
dumpWatchedAddressesCSVFileData(t)
})
})
t.Run("Load watched addresses", func(t *testing.T) {
testLoadWatchedAddresses(t)
})
t.Run("Clear watched addresses", func(t *testing.T) {
testClearWatchedAddresses(t, func(t *testing.T) {
file.TearDownDB(t, sqlxdb)
dumpWatchedAddressesCSVFileData(t)
})
})
t.Run("Clear watched addresses (empty table)", func(t *testing.T) {
testClearEmptyWatchedAddresses(t, func(t *testing.T) {
file.TearDownDB(t, sqlxdb)
dumpWatchedAddressesCSVFileData(t)
})
})
}

View File

@@ -0,0 +1,455 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package file
import (
"encoding/csv"
"errors"
"fmt"
"math/big"
"os"
"path/filepath"
"strconv"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file/types"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
)
var (
Tables = []*types.Table{
&types.TableIPLDBlock,
&types.TableNodeInfo,
&types.TableHeader,
&types.TableStateNode,
&types.TableStorageNode,
&types.TableUncle,
&types.TableTransaction,
&types.TableAccessListElement,
&types.TableReceipt,
&types.TableLog,
&types.TableStateAccount,
}
)
type tableRow struct {
table types.Table
values []interface{}
}
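// CSVWriter writes statediff data out to a set of CSV files, one per output table, fed by a background write loop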
type CSVWriter struct {
// dir containing output files
dir string
writers fileWriters
watchedAddressesWriter fileWriter
rows chan tableRow
flushChan chan struct{}
flushFinished chan struct{}
quitChan chan struct{}
doneChan chan struct{}
}
type fileWriter struct {
*csv.Writer
file *os.File
}
// fileWriters wraps the file writers for each output table
type fileWriters map[string]fileWriter
func newFileWriter(path string) (ret fileWriter, err error) {
file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return
}
ret = fileWriter{
Writer: csv.NewWriter(file),
file: file,
}
return
}
func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) {
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
writers := fileWriters{}
for _, tbl := range tables {
w, err := newFileWriter(TableFilePath(dir, tbl.Name))
if err != nil {
return nil, err
}
writers[tbl.Name] = w
}
return writers, nil
}
func (tx fileWriters) write(tbl *types.Table, args ...interface{}) error {
row := tbl.ToCsvRow(args...)
return tx[tbl.Name].Write(row)
}
func (tx fileWriters) close() error {
for _, w := range tx {
err := w.file.Close()
if err != nil {
return err
}
}
return nil
}
func (tx fileWriters) flush() error {
for _, w := range tx {
w.Flush()
if err := w.Error(); err != nil {
return err
}
}
return nil
}
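// NewCSVWriter creates a CSVWriter that maintains one csv file per table under path, plus a separate watched addresses file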
func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, error) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, fmt.Errorf("unable to make MkdirAll for path: %s err: %s", path, err)
}
writers, err := makeFileWriters(path, Tables)
if err != nil {
return nil, err
}
watchedAddressesWriter, err := newFileWriter(watchedAddressesFilePath)
if err != nil {
return nil, err
}
csvWriter := &CSVWriter{
writers: writers,
watchedAddressesWriter: watchedAddressesWriter,
dir: path,
rows: make(chan tableRow),
flushChan: make(chan struct{}),
flushFinished: make(chan struct{}),
quitChan: make(chan struct{}),
doneChan: make(chan struct{}),
}
return csvWriter, nil
}
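// Loop starts the background goroutine that drains the rows channel and services the flush and quit channels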
func (csw *CSVWriter) Loop() {
go func() {
defer close(csw.doneChan)
for {
select {
case row := <-csw.rows:
err := csw.writers.write(&row.table, row.values...)
if err != nil {
panic(fmt.Sprintf("error writing csv buffer: %v", err))
}
case <-csw.quitChan:
if err := csw.writers.flush(); err != nil {
panic(fmt.Sprintf("error writing csv buffer to file: %v", err))
}
return
case <-csw.flushChan:
if err := csw.writers.flush(); err != nil {
panic(fmt.Sprintf("error writing csv buffer to file: %v", err))
}
csw.flushFinished <- struct{}{}
}
}
}()
}
// Flush sends a flush signal to the looping process
func (csw *CSVWriter) Flush() {
csw.flushChan <- struct{}{}
<-csw.flushFinished
}
func TableFilePath(dir, name string) string { return filepath.Join(dir, name+".csv") }
// Close satisfies io.Closer
func (csw *CSVWriter) Close() error {
close(csw.quitChan)
<-csw.doneChan
close(csw.rows)
close(csw.flushChan)
close(csw.flushFinished)
return csw.writers.close()
}
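
A minimal lifecycle sketch for the writer (illustrative; in practice the indexer drives these calls): Loop must run before rows are queued, Flush blocks until queued rows reach the files, and Close stops the loop and closes every file.

w, err := NewCSVWriter("./statediff_output", "./statediff-watched-addresses.csv")
if err != nil {
    // handle error
}
w.Loop()           // start the goroutine draining w.rows
w.upsertNode(info) // queue a row; info is a hypothetical nodeinfo.Info value
w.Flush()          // block until buffered csv data is written out
if err := w.Close(); err != nil {
    // handle error
}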
func (csw *CSVWriter) upsertNode(node nodeinfo.Info) {
var values []interface{}
values = append(values, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)
csw.rows <- tableRow{types.TableNodeInfo, values}
}
func (csw *CSVWriter) upsertIPLD(ipld models.IPLDModel) {
var values []interface{}
values = append(values, ipld.BlockNumber, ipld.Key, ipld.Data)
csw.rows <- tableRow{types.TableIPLDBlock, values}
}
func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) {
csw.upsertIPLD(models.IPLDModel{
BlockNumber: blockNumber,
Key: key,
Data: value,
})
}
func (csw *CSVWriter) upsertIPLDNode(blockNumber string, i node.Node) {
csw.upsertIPLD(models.IPLDModel{
BlockNumber: blockNumber,
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
Data: i.RawData(),
})
}
func (csw *CSVWriter) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error) {
c, err := ipld.RawdataToCid(codec, raw, mh)
if err != nil {
return "", "", err
}
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
csw.upsertIPLD(models.IPLDModel{
BlockNumber: blockNumber,
Key: prefixedKey,
Data: raw,
})
return c.String(), prefixedKey, err
}
func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
var values []interface{}
values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.MhKey, 1, header.Coinbase)
csw.rows <- tableRow{types.TableHeader, values}
indexerMetrics.blocks.Inc(1)
}
func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) {
var values []interface{}
values = append(values, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
uncle.Reward, uncle.MhKey)
csw.rows <- tableRow{types.TableUncle, values}
}
func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) {
var values []interface{}
values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
csw.rows <- tableRow{types.TableTransaction, values}
indexerMetrics.transactions.Inc(1)
}
func (csw *CSVWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) {
var values []interface{}
values = append(values, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
csw.rows <- tableRow{types.TableAccessListElement, values}
indexerMetrics.accessListEntries.Inc(1)
}
func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
var values []interface{}
values = append(values, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
rct.PostState, rct.PostStatus, rct.LogRoot)
csw.rows <- tableRow{types.TableReceipt, values}
indexerMetrics.receipts.Inc(1)
}
func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
for _, l := range logs {
var values []interface{}
values = append(values, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3, l.Data)
csw.rows <- tableRow{types.TableLog, values}
indexerMetrics.logs.Inc(1)
}
}
func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
var stateKey string
if stateNode.StateKey != nullHash.String() {
stateKey = stateNode.StateKey
}
var values []interface{}
values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path,
stateNode.NodeType, true, stateNode.MhKey)
csw.rows <- tableRow{types.TableStateNode, values}
}
func (csw *CSVWriter) upsertStateAccount(stateAccount models.StateAccountModel) {
var values []interface{}
values = append(values, stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
strconv.FormatUint(stateAccount.Nonce, 10), stateAccount.CodeHash, stateAccount.StorageRoot)
csw.rows <- tableRow{types.TableStateAccount, values}
}
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
var storageKey string
if storageCID.StorageKey != nullHash.String() {
storageKey = storageCID.StorageKey
}
var values []interface{}
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
csw.rows <- tableRow{types.TableStorageNode, values}
}
// loadWatchedAddresses loads watched addresses from the watched addresses csv file
func (csw *CSVWriter) loadWatchedAddresses() ([]common.Address, error) {
watchedAddressesFilePath := csw.watchedAddressesWriter.file.Name()
// load csv rows from watched addresses file
rows, err := loadWatchedAddressesRows(watchedAddressesFilePath)
if err != nil {
return nil, err
}
// extract addresses from the csv rows
watchedAddresses := funk.Map(rows, func(row []string) common.Address {
// first column is for address in eth_meta.watched_addresses
addressString := row[0]
return common.HexToAddress(addressString)
}).([]common.Address)
return watchedAddresses, nil
}
// insertWatchedAddresses inserts the given addresses into the watched addresses csv file
func (csw *CSVWriter) insertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
// load csv rows from watched addresses file
watchedAddresses, err := csw.loadWatchedAddresses()
if err != nil {
return err
}
// append rows for new addresses to existing csv file
for _, arg := range args {
// ignore if already watched
if funk.Contains(watchedAddresses, common.HexToAddress(arg.Address)) {
continue
}
var values []interface{}
values = append(values, arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0")
row := types.TableWatchedAddresses.ToCsvRow(values...)
// writing directly instead of using rows channel as it needs to be flushed immediately
err = csw.watchedAddressesWriter.Write(row)
if err != nil {
return err
}
}
// watched addresses need to be flushed immediately to the file to keep them in sync with in-memory watched addresses
csw.watchedAddressesWriter.Flush()
err = csw.watchedAddressesWriter.Error()
if err != nil {
return err
}
return nil
}
// removeWatchedAddresses removes the given watched addresses from the watched addresses csv file
func (csw *CSVWriter) removeWatchedAddresses(args []sdtypes.WatchAddressArg) error {
// load csv rows from watched addresses file
watchedAddressesFilePath := csw.watchedAddressesWriter.file.Name()
rows, err := loadWatchedAddressesRows(watchedAddressesFilePath)
if err != nil {
return err
}
// get rid of rows having addresses to be removed
filteredRows := funk.Filter(rows, func(row []string) bool {
return !funk.Contains(args, func(arg sdtypes.WatchAddressArg) bool {
// Compare first column in table for address
return arg.Address == row[0]
})
}).([][]string)
return dumpWatchedAddressesRows(csw.watchedAddressesWriter, filteredRows)
}
// setWatchedAddresses clears the watched addresses csv file and inserts the given addresses
func (csw *CSVWriter) setWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
var rows [][]string
for _, arg := range args {
row := types.TableWatchedAddresses.ToCsvRow(arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0")
rows = append(rows, row)
}
return dumpWatchedAddressesRows(csw.watchedAddressesWriter, rows)
}
// loadWatchedAddressesRows loads csv rows from the given file
func loadWatchedAddressesRows(filePath string) ([][]string, error) {
file, err := os.Open(filePath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return [][]string{}, nil
}
return nil, fmt.Errorf("error opening watched addresses file: %v", err)
}
defer file.Close()
reader := csv.NewReader(file)
return reader.ReadAll()
}
// dumpWatchedAddressesRows dumps csv rows to the given file
func dumpWatchedAddressesRows(watchedAddressesWriter fileWriter, filteredRows [][]string) error {
file := watchedAddressesWriter.file
file.Close()
file, err := os.Create(file.Name())
if err != nil {
return fmt.Errorf("error creating watched addresses file: %v", err)
}
watchedAddressesWriter.Writer = csv.NewWriter(file)
watchedAddressesWriter.file = file
for _, row := range filteredRows {
watchedAddressesWriter.Write(row)
}
watchedAddressesWriter.Flush()
return nil
}

View File

@@ -17,7 +17,6 @@
package file
import (
"bufio"
"context"
"errors"
"fmt"
@@ -30,8 +29,6 @@ import (
"github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash"
pg_query "github.com/pganalyze/pg_query_go/v2"
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -47,8 +44,10 @@ import (
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
)
const defaultFilePath = "./statediff.sql"
const defaultWatchedAddressesFilePath = "./statediff-watched-addresses.sql"
const defaultCSVOutputDir = "./statediff_output"
const defaultSQLFilePath = "./statediff.sql"
const defaultWatchedAddressesCSVFilePath = "./statediff-watched-addresses.csv"
const defaultWatchedAddressesSQLFilePath = "./statediff-watched-addresses.sql"
const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ('%s', '%d', '%d') ON CONFLICT (address) DO NOTHING;"
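
Formatted, the template yields statements of the form (values illustrative, matching the parser comment further down):

INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;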
@@ -60,46 +59,74 @@ var (
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects, writing them out to flat files
type StateDiffIndexer struct {
fileWriter *SQLWriter
fileWriter FileWriter
chainConfig *params.ChainConfig
nodeID string
wg *sync.WaitGroup
removedCacheFlag *uint32
watchedAddressesFilePath string
}
// NewStateDiffIndexer creates a file-writing implementation of interfaces.StateDiffIndexer
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, config Config) (*StateDiffIndexer, error) {
filePath := config.FilePath
if filePath == "" {
filePath = defaultFilePath
}
if _, err := os.Stat(filePath); !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("cannot create file, file (%s) already exists", filePath)
}
file, err := os.Create(filePath)
if err != nil {
return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err)
}
log.Info("Writing statediff SQL statements to file", "file", filePath)
var err error
var writer FileWriter
watchedAddressesFilePath := config.WatchedAddressesFilePath
if watchedAddressesFilePath == "" {
watchedAddressesFilePath = defaultWatchedAddressesFilePath
}
log.Info("Writing watched addresses SQL statements to file", "file", watchedAddressesFilePath)
w := NewSQLWriter(file)
switch config.Mode {
case CSV:
outputDir := config.OutputDir
if outputDir == "" {
outputDir = defaultCSVOutputDir
}
if _, err := os.Stat(outputDir); !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("cannot create output directory, directory (%s) already exists", outputDir)
}
log.Info("Writing statediff CSV files to directory", "file", outputDir)
if watchedAddressesFilePath == "" {
watchedAddressesFilePath = defaultWatchedAddressesCSVFilePath
}
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath)
if err != nil {
return nil, err
}
case SQL:
filePath := config.FilePath
if filePath == "" {
filePath = defaultSQLFilePath
}
if _, err := os.Stat(filePath); !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("cannot create file, file (%s) already exists", filePath)
}
file, err := os.Create(filePath)
if err != nil {
return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err)
}
log.Info("Writing statediff SQL statements to file", "file", filePath)
if watchedAddressesFilePath == "" {
watchedAddressesFilePath = defaultWatchedAddressesSQLFilePath
}
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
writer = NewSQLWriter(file, watchedAddressesFilePath)
default:
return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode)
}
wg := new(sync.WaitGroup)
w.Loop()
w.upsertNode(config.NodeInfo)
writer.Loop()
writer.upsertNode(config.NodeInfo)
return &StateDiffIndexer{
fileWriter: w,
chainConfig: chainConfig,
nodeID: config.NodeInfo.ID,
wg: wg,
watchedAddressesFilePath: watchedAddressesFilePath,
fileWriter: writer,
chainConfig: chainConfig,
nodeID: config.NodeInfo.ID,
wg: wg,
}, nil
}
@@ -524,162 +551,25 @@ func (sdi *StateDiffIndexer) Close() error {
// LoadWatchedAddresses loads watched addresses from a file
func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath)
if err != nil {
return nil, err
}
// extract addresses from the sql statements
watchedAddresses := []common.Address{}
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStatement(stmt)
if err != nil {
return nil, err
}
watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString))
}
return watchedAddresses, nil
return sdi.fileWriter.loadWatchedAddresses()
}
// InsertWatchedAddresses inserts the given addresses in a file
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath)
if err != nil {
return err
}
// get already watched addresses
var watchedAddresses []string
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStatement(stmt)
if err != nil {
return err
}
watchedAddresses = append(watchedAddresses, addressString)
}
// append statements for new addresses to existing statements
for _, arg := range args {
// ignore if already watched
if funk.Contains(watchedAddresses, arg.Address) {
continue
}
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}
return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, stmts)
return sdi.fileWriter.insertWatchedAddresses(args, currentBlockNumber)
}
// RemoveWatchedAddresses removes the given watched addresses from a file
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error {
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath)
if err != nil {
return err
}
// get rid of statements having addresses to be removed
var filteredStmts []string
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStatement(stmt)
if err != nil {
return err
}
toRemove := funk.Contains(args, func(arg sdtypes.WatchAddressArg) bool {
return arg.Address == addressString
})
if !toRemove {
filteredStmts = append(filteredStmts, stmt)
}
}
return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, filteredStmts)
return sdi.fileWriter.removeWatchedAddresses(args)
}
// SetWatchedAddresses clears and inserts the given addresses in a file
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
var stmts []string
for _, arg := range args {
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}
return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, stmts)
return sdi.fileWriter.setWatchedAddresses(args, currentBlockNumber)
}
// ClearWatchedAddresses clears all the watched addresses from a file
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
return sdi.SetWatchedAddresses([]sdtypes.WatchAddressArg{}, big.NewInt(0))
}
// loadWatchedAddressesStatements loads sql statements from the given file in a string slice
func loadWatchedAddressesStatements(filePath string) ([]string, error) {
file, err := os.Open(filePath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return []string{}, nil
}
return nil, fmt.Errorf("error opening watched addresses file: %v", err)
}
defer file.Close()
stmts := []string{}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
stmts = append(stmts, scanner.Text())
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("error loading watched addresses: %v", err)
}
return stmts, nil
}
// dumpWatchedAddressesStatements dumps sql statements to the given file
func dumpWatchedAddressesStatements(filePath string, stmts []string) error {
file, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("error creating watched addresses file: %v", err)
}
defer file.Close()
for _, stmt := range stmts {
_, err := file.Write([]byte(stmt + "\n"))
if err != nil {
return fmt.Errorf("error inserting watched_addresses entry: %v", err)
}
}
return nil
}
// parseWatchedAddressStatement parses given sql insert statement to extract the address argument
func parseWatchedAddressStatement(stmt string) (string, error) {
parseResult, err := pg_query.Parse(stmt)
if err != nil {
return "", fmt.Errorf("error parsing sql stmt: %v", err)
}
// extract address argument from parse output for a SQL statement of form
// "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at)
// VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;"
addressString := parseResult.Stmts[0].Stmt.GetInsertStmt().
SelectStmt.GetSelectStmt().
ValuesLists[0].GetList().
Items[0].GetAConst().
GetVal().
GetString_().
Str
return addressString, nil
}

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -0,0 +1,60 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package file
import (
"math/big"
node "github.com/ipfs/go-ipld-format"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/types"
)
// FileWriter is the interface required by the file indexer for writing out statediff data
type FileWriter interface {
// Methods used to control the writer
Loop()
Close() error
Flush()
// Methods to upsert ethereum data model objects
upsertNode(node nodeinfo.Info)
upsertHeaderCID(header models.HeaderModel)
upsertUncleCID(uncle models.UncleModel)
upsertTransactionCID(transaction models.TxModel)
upsertAccessListElement(accessListElement models.AccessListElementModel)
upsertReceiptCID(rct *models.ReceiptModel)
upsertLogCID(logs []*models.LogsModel)
upsertStateCID(stateNode models.StateNodeModel)
upsertStateAccount(stateAccount models.StateAccountModel)
upsertStorageCID(storageCID models.StorageNodeModel)
upsertIPLD(ipld models.IPLDModel)
// Methods to upsert IPLD in different ways
upsertIPLDDirect(blockNumber, key string, value []byte)
upsertIPLDNode(blockNumber string, i node.Node)
upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error)
// Methods to read and write watched addresses
loadWatchedAddresses() ([]common.Address, error)
insertWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error
removeWatchedAddresses(args []types.WatchAddressArg) error
setWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error
}
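
Both writers in this change are expected to satisfy this interface; a compile-time assertion (not part of the diff) would make the contract explicit:

var (
    _ FileWriter = (*CSVWriter)(nil)
    _ FileWriter = (*SQLWriter)(nil)
)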

View File

@@ -81,11 +81,11 @@ func testPushBlockAndState(t *testing.T, block *types.Block, receipts types.Rece
}
func setup(t *testing.T, testBlock *types.Block, testReceipts types.Receipts) {
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.TestConfig.FilePath)
if _, err := os.Stat(file.CSVTestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.CSVTestConfig.FilePath)
require.NoError(t, err)
}
ind, err := file.NewStateDiffIndexer(context.Background(), chainConf, file.TestConfig)
ind, err := file.NewStateDiffIndexer(context.Background(), chainConf, file.CSVTestConfig)
require.NoError(t, err)
var tx interfaces.Batch
tx, err = ind.PushBlock(
@@ -118,7 +118,7 @@ func setup(t *testing.T, testBlock *types.Block, testReceipts types.Receipts) {
}
func dumpData(t *testing.T) {
sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath)
sqlFileBytes, err := os.ReadFile(file.CSVTestConfig.FilePath)
require.NoError(t, err)
_, err = sqlxdb.Exec(string(sqlFileBytes))
@@ -127,7 +127,7 @@ func dumpData(t *testing.T) {
func tearDown(t *testing.T) {
file.TearDownDB(t, sqlxdb)
err := os.Remove(file.TestConfig.FilePath)
err := os.Remove(file.CSVTestConfig.FilePath)
require.NoError(t, err)
err = sqlxdb.Close()
require.NoError(t, err)

View File

@@ -22,33 +22,25 @@ import (
"os"
"testing"
"github.com/ipfs/go-cid"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
)
var (
legacyData = mocks.NewLegacyData()
mockLegacyBlock *types.Block
legacyHeaderCID cid.Cid
)
func setupLegacy(t *testing.T) {
mockLegacyBlock = legacyData.MockBlock
legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.TestConfig.FilePath)
if _, err := os.Stat(file.SQLTestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.SQLTestConfig.FilePath)
require.NoError(t, err)
}
ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.TestConfig)
ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.SQLTestConfig)
require.NoError(t, err)
var tx interfaces.Batch
tx, err = ind.PushBlock(
@@ -65,6 +57,7 @@ func setupLegacy(t *testing.T) {
t.Fatal(err)
}
}()
for _, node := range legacyData.StateDiffs {
err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String())
require.NoError(t, err)
@@ -73,7 +66,6 @@ func setupLegacy(t *testing.T) {
require.Equal(t, legacyData.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)
connStr := postgres.DefaultConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil {
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
@@ -81,7 +73,7 @@
}
func dumpFileData(t *testing.T) {
sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath)
sqlFileBytes, err := os.ReadFile(file.SQLTestConfig.FilePath)
require.NoError(t, err)
_, err = sqlxdb.Exec(string(sqlFileBytes))
@@ -91,30 +83,20 @@
func resetAndDumpWatchedAddressesFileData(t *testing.T) {
resetDB(t)
sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath)
sqlFileBytes, err := os.ReadFile(file.SQLTestConfig.WatchedAddressesFilePath)
require.NoError(t, err)
_, err = sqlxdb.Exec(string(sqlFileBytes))
require.NoError(t, err)
}
func resetDB(t *testing.T) {
file.TearDownDB(t, sqlxdb)
connStr := postgres.DefaultConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil {
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
}
}
func tearDown(t *testing.T) {
file.TearDownDB(t, sqlxdb)
err := os.Remove(file.TestConfig.FilePath)
err := os.Remove(file.SQLTestConfig.FilePath)
require.NoError(t, err)
if err := os.Remove(file.TestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
if err := os.Remove(file.SQLTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
require.NoError(t, err)
}
@@ -122,36 +104,11 @@ func tearDown(t *testing.T) {
require.NoError(t, err)
}
func expectTrue(t *testing.T, value bool) {
if !value {
t.Fatalf("Assertion failed")
}
}
func TestFileIndexerLegacy(t *testing.T) {
func TestSQLFileIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacy(t)
dumpFileData(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
CID string
TD string
Reward string
BlockHash string `db:"block_hash"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = sqlxdb.QueryRowx(pgStr, legacyData.BlockNumber.Uint64()).StructScan(header)
require.NoError(t, err)
require.Equal(t, legacyHeaderCID.String(), header.CID)
require.Equal(t, legacyData.MockBlock.Difficulty().String(), header.TD)
require.Equal(t, "5000000000000011250", header.Reward)
require.Equal(t, legacyData.MockBlock.Coinbase().String(), header.Coinbase)
require.Nil(t, legacyData.MockHeader.BaseFee)
testLegacyPublishAndIndexHeaderIPLDs(t)
})
}

View File

@@ -0,0 +1,174 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package file_test
import (
"context"
"errors"
"os"
"testing"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
)
func setupIndexer(t *testing.T) {
if _, err := os.Stat(file.SQLTestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.SQLTestConfig.FilePath)
require.NoError(t, err)
}
if _, err := os.Stat(file.SQLTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.SQLTestConfig.WatchedAddressesFilePath)
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.SQLTestConfig)
require.NoError(t, err)
connStr := postgres.DefaultConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil {
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
}
}
func setup(t *testing.T) {
setupIndexer(t)
var tx interfaces.Batch
tx, err = ind.PushBlock(
mockBlock,
mocks.MockReceipts,
mocks.MockBlock.Difficulty())
if err != nil {
t.Fatal(err)
}
defer func() {
if err := tx.Submit(err); err != nil {
t.Fatal(err)
}
if err := ind.Close(); err != nil {
t.Fatal(err)
}
}()
for _, node := range mocks.StateDiffs {
err = ind.PushStateNode(tx, node, mockBlock.Hash().String())
require.NoError(t, err)
}
require.Equal(t, mocks.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)
}
func TestSQLFileIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setup(t)
dumpFileData(t)
defer tearDown(t)
testPublishAndIndexHeaderIPLDs(t)
})
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
setup(t)
dumpFileData(t)
defer tearDown(t)
testPublishAndIndexTransactionIPLDs(t)
})
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
setup(t)
dumpFileData(t)
defer tearDown(t)
testPublishAndIndexLogIPLDs(t)
})
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
setup(t)
dumpFileData(t)
defer tearDown(t)
testPublishAndIndexReceiptIPLDs(t)
})
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setup(t)
dumpFileData(t)
defer tearDown(t)
testPublishAndIndexStateIPLDs(t)
})
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
setup(t)
dumpFileData(t)
defer tearDown(t)
testPublishAndIndexStorageIPLDs(t)
})
}
func TestSQLFileWatchAddressMethods(t *testing.T) {
setupIndexer(t)
defer tearDown(t)
t.Run("Load watched addresses (empty table)", func(t *testing.T) {
testLoadEmptyWatchedAddresses(t)
})
t.Run("Insert watched addresses", func(t *testing.T) {
testInsertWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
})
t.Run("Insert watched addresses (some already watched)", func(t *testing.T) {
testInsertAlreadyWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
})
t.Run("Remove watched addresses", func(t *testing.T) {
testRemoveWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
})
t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) {
testRemoveNonWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
})
t.Run("Set watched addresses", func(t *testing.T) {
testSetWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
})
t.Run("Set watched addresses (some already watched)", func(t *testing.T) {
testSetAlreadyWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
})
t.Run("Load watched addresses", func(t *testing.T) {
testLoadWatchedAddresses(t)
})
t.Run("Clear watched addresses", func(t *testing.T) {
testClearWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
})
t.Run("Clear watched addresses (empty table)", func(t *testing.T) {
testClearEmptyWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
})
}

View File

@@ -17,17 +17,24 @@
package file
import (
"bufio"
"errors"
"fmt"
"io"
"math/big"
"os"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
pg_query "github.com/pganalyze/pg_query_go/v2"
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/types"
)
var (
@@ -47,18 +54,21 @@ type SQLWriter struct {
flushFinished chan struct{}
quitChan chan struct{}
doneChan chan struct{}
watchedAddressesFilePath string
}
// NewSQLWriter creates a new pointer to a Writer
func NewSQLWriter(wc io.WriteCloser) *SQLWriter {
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string) *SQLWriter {
return &SQLWriter{
wc: wc,
stmts: make(chan []byte),
collatedStmt: make([]byte, writeBufferSize),
flushChan: make(chan struct{}),
flushFinished: make(chan struct{}),
quitChan: make(chan struct{}),
doneChan: make(chan struct{}),
wc: wc,
stmts: make(chan []byte),
collatedStmt: make([]byte, writeBufferSize),
flushChan: make(chan struct{}),
flushFinished: make(chan struct{}),
quitChan: make(chan struct{}),
doneChan: make(chan struct{}),
watchedAddressesFilePath: watchedAddressesFilePath,
}
}
@@ -106,6 +116,9 @@ func (sqw *SQLWriter) Loop() {
func (sqw *SQLWriter) Close() error {
close(sqw.quitChan)
<-sqw.doneChan
close(sqw.stmts)
close(sqw.flushChan)
close(sqw.flushFinished)
return sqw.wc.Close()
}
@@ -257,3 +270,160 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))
}
// loadWatchedAddresses loads watched addresses from the watched addresses sql file
func (sqw *SQLWriter) loadWatchedAddresses() ([]common.Address, error) {
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStatements(sqw.watchedAddressesFilePath)
if err != nil {
return nil, err
}
// extract addresses from the sql statements
watchedAddresses := []common.Address{}
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStatement(stmt)
if err != nil {
return nil, err
}
watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString))
}
return watchedAddresses, nil
}
// insertWatchedAddresses inserts the given addresses into the watched addresses sql file
func (sqw *SQLWriter) insertWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStatements(sqw.watchedAddressesFilePath)
if err != nil {
return err
}
// get already watched addresses
var watchedAddresses []string
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStatement(stmt)
if err != nil {
return err
}
watchedAddresses = append(watchedAddresses, addressString)
}
// append statements for new addresses to existing statements
for _, arg := range args {
// ignore if already watched
if funk.Contains(watchedAddresses, arg.Address) {
continue
}
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}
return dumpWatchedAddressesStatements(sqw.watchedAddressesFilePath, stmts)
}
// removeWatchedAddresses removes the given watched addresses from the watched addresses sql file
func (sqw *SQLWriter) removeWatchedAddresses(args []types.WatchAddressArg) error {
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStatements(sqw.watchedAddressesFilePath)
if err != nil {
return err
}
// get rid of statements having addresses to be removed
var filteredStmts []string
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStatement(stmt)
if err != nil {
return err
}
toRemove := funk.Contains(args, func(arg types.WatchAddressArg) bool {
return arg.Address == addressString
})
if !toRemove {
filteredStmts = append(filteredStmts, stmt)
}
}
return dumpWatchedAddressesStatements(sqw.watchedAddressesFilePath, filteredStmts)
}
// setWatchedAddresses clears the watched addresses sql file and inserts the given addresses
func (sqw *SQLWriter) setWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
var stmts []string
for _, arg := range args {
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}
return dumpWatchedAddressesStatements(sqw.watchedAddressesFilePath, stmts)
}
// loadWatchedAddressesStatements loads sql statements from the given file into a string slice
func loadWatchedAddressesStatements(filePath string) ([]string, error) {
file, err := os.Open(filePath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return []string{}, nil
}
return nil, fmt.Errorf("error opening watched addresses file: %v", err)
}
defer file.Close()
stmts := []string{}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
stmts = append(stmts, scanner.Text())
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("error loading watched addresses: %v", err)
}
return stmts, nil
}
// dumpWatchedAddressesStatements dumps sql statements to the given file
func dumpWatchedAddressesStatements(filePath string, stmts []string) error {
file, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("error creating watched addresses file: %v", err)
}
defer file.Close()
for _, stmt := range stmts {
_, err := file.Write([]byte(stmt + "\n"))
if err != nil {
return fmt.Errorf("error inserting watched_addresses entry: %v", err)
}
}
return nil
}
// parseWatchedAddressStatement parses the given sql insert statement to extract the address argument
func parseWatchedAddressStatement(stmt string) (string, error) {
parseResult, err := pg_query.Parse(stmt)
if err != nil {
return "", fmt.Errorf("error parsing sql stmt: %v", err)
}
// extract address argument from parse output for a SQL statement of form
// "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at)
// VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;"
addressString := parseResult.Stmts[0].Stmt.GetInsertStmt().
SelectStmt.GetSelectStmt().
ValuesLists[0].GetList().
Items[0].GetAConst().
GetVal().
GetString_().
Str
return addressString, nil
}
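
For example, feeding the statement form from the comment above back through the parser recovers the address argument:

addr, err := parseWatchedAddressStatement(
    "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;",
)
// addr == "0xabc", err == nil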

View File

@@ -0,0 +1,184 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package types

var TableIPLDBlock = Table{
`public.blocks`,
[]column{
{name: "block_number", dbType: bigint},
{name: "key", dbType: text},
{name: "data", dbType: bytea},
},
}

var TableNodeInfo = Table{
Name: `public.nodes`,
Columns: []column{
{name: "genesis_block", dbType: varchar},
{name: "network_id", dbType: varchar},
{name: "node_id", dbType: varchar},
{name: "client_name", dbType: varchar},
{name: "chain_id", dbType: integer},
},
}

var TableHeader = Table{
"eth.header_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "block_hash", dbType: varchar},
{name: "parent_hash", dbType: varchar},
{name: "cid", dbType: text},
{name: "td", dbType: numeric},
{name: "node_id", dbType: varchar},
{name: "reward", dbType: numeric},
{name: "state_root", dbType: varchar},
{name: "tx_root", dbType: varchar},
{name: "receipt_root", dbType: varchar},
{name: "uncle_root", dbType: varchar},
{name: "bloom", dbType: bytea},
{name: "timestamp", dbType: numeric},
{name: "mh_key", dbType: text},
{name: "times_validated", dbType: integer},
{name: "coinbase", dbType: varchar},
},
}

var TableStateNode = Table{
"eth.state_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "header_id", dbType: varchar},
{name: "state_leaf_key", dbType: varchar},
{name: "cid", dbType: text},
{name: "state_path", dbType: bytea},
{name: "node_type", dbType: integer},
{name: "diff", dbType: boolean},
{name: "mh_key", dbType: text},
},
}

var TableStorageNode = Table{
"eth.storage_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "header_id", dbType: varchar},
{name: "state_path", dbType: bytea},
{name: "storage_leaf_key", dbType: varchar},
{name: "cid", dbType: text},
{name: "storage_path", dbType: bytea},
{name: "node_type", dbType: integer},
{name: "diff", dbType: boolean},
{name: "mh_key", dbType: text},
},
}

var TableUncle = Table{
"eth.uncle_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "block_hash", dbType: varchar},
{name: "header_id", dbType: varchar},
{name: "parent_hash", dbType: varchar},
{name: "cid", dbType: text},
{name: "reward", dbType: numeric},
{name: "mh_key", dbType: text},
},
}

var TableTransaction = Table{
"eth.transaction_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "header_id", dbType: varchar},
{name: "tx_hash", dbType: varchar},
{name: "cid", dbType: text},
{name: "dst", dbType: varchar},
{name: "src", dbType: varchar},
{name: "index", dbType: integer},
{name: "mh_key", dbType: text},
{name: "tx_data", dbType: bytea},
{name: "tx_type", dbType: integer},
{name: "value", dbType: numeric},
},
}

var TableAccessListElement = Table{
"eth.access_list_elements",
[]column{
{name: "block_number", dbType: bigint},
{name: "tx_id", dbType: varchar},
{name: "index", dbType: integer},
{name: "address", dbType: varchar},
{name: "storage_keys", dbType: varchar, isArray: true},
},
}

var TableReceipt = Table{
"eth.receipt_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "tx_id", dbType: varchar},
{name: "leaf_cid", dbType: text},
{name: "contract", dbType: varchar},
{name: "contract_hash", dbType: varchar},
{name: "leaf_mh_key", dbType: text},
{name: "post_state", dbType: varchar},
{name: "post_status", dbType: integer},
{name: "log_root", dbType: varchar},
},
}

var TableLog = Table{
"eth.log_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "leaf_cid", dbType: text},
{name: "leaf_mh_key", dbType: text},
{name: "rct_id", dbType: varchar},
{name: "address", dbType: varchar},
{name: "index", dbType: integer},
{name: "topic0", dbType: varchar},
{name: "topic1", dbType: varchar},
{name: "topic2", dbType: varchar},
{name: "topic3", dbType: varchar},
{name: "log_data", dbType: bytea},
},
}

var TableStateAccount = Table{
"eth.state_accounts",
[]column{
{name: "block_number", dbType: bigint},
{name: "header_id", dbType: varchar},
{name: "state_path", dbType: bytea},
{name: "balance", dbType: numeric},
{name: "nonce", dbType: bigint},
{name: "code_hash", dbType: bytea},
{name: "storage_root", dbType: varchar},
},
}

var TableWatchedAddresses = Table{
"eth_meta.watched_addresses",
[]column{
{name: "address", dbType: varchar},
{name: "created_at", dbType: bigint},
{name: "watched_at", dbType: bigint},
{name: "last_filled_at", dbType: bigint},
},
}
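
These Table values appear to mirror the corresponding Postgres tables in the ipld-eth-db schema, and the column order above fixes the field order of each CSV row emitted via ToCsvRow (defined in the next file). Any loader must therefore name the columns in the same order; a hypothetical COPY of the watched-addresses output (the file path is illustrative) could look like:

COPY eth_meta.watched_addresses (address, created_at, watched_at, last_filled_at)
FROM '/output_dir/eth_meta.watched_addresses.csv' CSV;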

View File

@ -0,0 +1,104 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package types

import (
"fmt"
"strings"
"github.com/thoas/go-funk"
)

type colType int

const (
integer colType = iota
boolean
bigint
numeric
bytea
varchar
text
)

type column struct {
name string
dbType colType
isArray bool
}

type Table struct {
Name string
Columns []column
}

func (tbl *Table) ToCsvRow(args ...interface{}) []string {
var row []string
for i, col := range tbl.Columns {
value := col.dbType.formatter()(args[i])
if col.isArray {
valueList := funk.Map(args[i], col.dbType.formatter()).([]string)
value = fmt.Sprintf("{%s}", strings.Join(valueList, ","))
}
row = append(row, value)
}
return row
}

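// For illustration (editorial note, not part of the original file): with
// TableWatchedAddresses defined earlier in this package,
//
//	TableWatchedAddresses.ToCsvRow("0xabc", "123", "130", "0")
//
// returns []string{"0xabc", "123", "130", "0"}. Because bigint and numeric
// columns are formatted with %s (see formatter below), callers are expected
// to pass those values already rendered as strings (or as fmt.Stringer values).
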
func (tbl *Table) VarcharColumns() []string {
columns := funk.Filter(tbl.Columns, func(col column) bool {
return col.dbType == varchar
}).([]column)
columnNames := funk.Map(columns, func(col column) string {
return col.name
}).([]string)
return columnNames
}

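// For example (editorial note): TableNodeInfo.VarcharColumns() returns
// []string{"genesis_block", "network_id", "node_id", "client_name"};
// the integer chain_id column is filtered out.
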
type colfmt = func(interface{}) string

func sprintf(f string) colfmt {
return func(x interface{}) string { return fmt.Sprintf(f, x) }
}

func (typ colType) formatter() colfmt {
switch typ {
case integer:
return sprintf("%d")
case boolean:
return func(x interface{}) string {
if x.(bool) {
return "t"
}
return "f"
}
case bigint:
return sprintf("%s")
case numeric:
return sprintf("%s")
case bytea:
return sprintf(`\x%x`)
case varchar:
return sprintf("%s")
case text:
return sprintf("%s")
}
panic("unreachable")
}
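
// Illustrative formatter outputs (editorial note, not part of the original file):
//
//	integer: fmt.Sprintf("%d", 3) == "3"
//	boolean: "t" for true, "f" for false (the Postgres text convention)
//	bytea:   fmt.Sprintf(`\x%x`, []byte{0xab}) == `\xab`
//	bigint, numeric, varchar, text: fmt.Sprintf("%s", x), so values are
//	expected to arrive as strings or fmt.Stringer implementations.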

View File

@ -520,7 +520,8 @@ func TestPGXIndexer(t *testing.T) {
WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id)
AND state_cids.header_id = header_cids.block_hash
AND header_cids.block_number = $1
AND storage_cids.node_type != 3`
AND storage_cids.node_type != 3
ORDER BY storage_path`
err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)

View File

@ -513,7 +513,8 @@ func TestSQLXIndexer(t *testing.T) {
WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id)
AND state_cids.header_id = header_cids.block_hash
AND header_cids.block_number = $1
AND storage_cids.node_type != 3`
AND storage_cids.node_type != 3
ORDER BY storage_path`
err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)