Add mode to write to CSV files in statediff file writer #249
25
.github/workflows/tests.yml
vendored
25
.github/workflows/tests.yml
vendored
@ -53,32 +53,9 @@ jobs:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
ref: ${{ env.stack-orchestrator-ref }}
|
||||
path: "./stack-orchestrator/"
|
||||
repository: vulcanize/stack-orchestrator
|
||||
fetch-depth: 0
|
||||
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
ref: ${{ env.ipld-eth-db-ref }}
|
||||
repository: vulcanize/ipld-eth-db
|
||||
path: "./ipld-eth-db/"
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Create config file
|
||||
run: |
|
||||
echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > $GITHUB_WORKSPACE/config.sh
|
||||
echo db_write=true >> $GITHUB_WORKSPACE/config.sh
|
||||
cat $GITHUB_WORKSPACE/config.sh
|
||||
|
||||
- name: Run docker compose
|
||||
run: |
|
||||
docker-compose \
|
||||
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-db-sharding.yml" \
|
||||
--env-file $GITHUB_WORKSPACE/config.sh \
|
||||
up -d --build
|
||||
docker-compose up -d
|
||||
|
||||
- name: Give the migration a few seconds
|
||||
run: sleep 30;
|
||||
|
@ -211,7 +211,15 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
||||
}
|
||||
switch dbType {
|
||||
case shared.FILE:
|
||||
fileModeStr := ctx.GlobalString(utils.StateDiffFileMode.Name)
|
||||
fileMode, err := file.ResolveFileMode(fileModeStr)
|
||||
if err != nil {
|
||||
utils.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
indexerConfig = file.Config{
|
||||
Mode: fileMode,
|
||||
OutputDir: ctx.GlobalString(utils.StateDiffFileCsvDir.Name),
|
||||
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
|
||||
WatchedAddressesFilePath: ctx.GlobalString(utils.StateDiffWatchedAddressesFilePath.Name),
|
||||
}
|
||||
|
@ -171,6 +171,8 @@ var (
|
||||
utils.StateDiffDBClientNameFlag,
|
||||
utils.StateDiffWritingFlag,
|
||||
utils.StateDiffWorkersFlag,
|
||||
utils.StateDiffFileMode,
|
||||
utils.StateDiffFileCsvDir,
|
||||
utils.StateDiffFilePath,
|
||||
utils.StateDiffKnownGapsFilePath,
|
||||
utils.StateDiffWaitForSync,
|
||||
|
@ -244,6 +244,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{
|
||||
utils.StateDiffDBClientNameFlag,
|
||||
utils.StateDiffWritingFlag,
|
||||
utils.StateDiffWorkersFlag,
|
||||
utils.StateDiffFileMode,
|
||||
utils.StateDiffFileCsvDir,
|
||||
utils.StateDiffFilePath,
|
||||
utils.StateDiffKnownGapsFilePath,
|
||||
utils.StateDiffWaitForSync,
|
||||
|
@ -902,9 +902,18 @@ var (
|
||||
Name: "statediff.db.nodeid",
|
||||
Usage: "Node ID to use when writing state diffs to database",
|
||||
}
|
||||
StateDiffFileMode = cli.StringFlag{
|
||||
Name: "statediff.file.mode",
|
||||
Usage: "Statediff file writing mode (current options: csv, sql)",
|
||||
Value: "csv",
|
||||
}
|
||||
StateDiffFileCsvDir = cli.StringFlag{
|
||||
Name: "statediff.file.csvdir",
|
||||
Usage: "Full path of output directory to write statediff data out to when operating in csv file mode",
|
||||
}
|
||||
StateDiffFilePath = cli.StringFlag{
|
||||
Name: "statediff.file.path",
|
||||
Usage: "Full path (including filename) to write statediff data out to when operating in file mode",
|
||||
Usage: "Full path (including filename) to write statediff data out to when operating in sql file mode",
|
||||
}
|
||||
StateDiffKnownGapsFilePath = cli.StringFlag{
|
||||
Name: "statediff.knowngapsfile.path",
|
||||
|
@ -1,14 +1,27 @@
|
||||
version: "3.2"
|
||||
|
||||
services:
|
||||
ipld-eth-db:
|
||||
migrations:
|
||||
restart: on-failure
|
||||
depends_on:
|
||||
- access-node
|
||||
image: vulcanize/ipld-eth-db:v4.1.1-alpha
|
||||
- ipld-eth-db
|
||||
image: vulcanize/ipld-eth-db:v4.1.4-alpha
|
||||
environment:
|
||||
DATABASE_USER: "vdbm"
|
||||
DATABASE_NAME: "vulcanize_testing_v4"
|
||||
DATABASE_NAME: "vulcanize_testing"
|
||||
DATABASE_PASSWORD: "password"
|
||||
DATABASE_HOSTNAME: "access-node"
|
||||
DATABASE_HOSTNAME: "ipld-eth-db"
|
||||
DATABASE_PORT: 5432
|
||||
|
||||
ipld-eth-db:
|
||||
image: timescale/timescaledb:latest-pg14
|
||||
restart: always
|
||||
command: ["postgres", "-c", "log_statement=all"]
|
||||
environment:
|
||||
POSTGRES_USER: "vdbm"
|
||||
POSTGRES_DB: "vulcanize_testing"
|
||||
POSTGRES_PASSWORD: "password"
|
||||
ports:
|
||||
- "127.0.0.1:8077:5432"
|
||||
volumes:
|
||||
- ./statediff/indexer/database/file:/file_indexer
|
||||
|
@ -17,12 +17,38 @@
|
||||
package file
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
)
|
||||
|
||||
// Config holds params for writing sql statements out to a file
|
||||
// FileMode to explicitly type the mode of file writer we are using
|
||||
type FileMode string
|
||||
|
||||
const (
|
||||
CSV FileMode = "CSV"
|
||||
SQL FileMode = "SQL"
|
||||
Unknown FileMode = "Unknown"
|
||||
)
|
||||
|
||||
// ResolveFileMode resolves a FileMode from a provided string
|
||||
func ResolveFileMode(str string) (FileMode, error) {
|
||||
switch strings.ToLower(str) {
|
||||
case "csv":
|
||||
return CSV, nil
|
||||
case "sql":
|
||||
return SQL, nil
|
||||
default:
|
||||
return Unknown, fmt.Errorf("unrecognized file type string: %s", str)
|
||||
}
|
||||
}
|
||||
|
||||
// Config holds params for writing out CSV or SQL files
|
||||
type Config struct {
|
||||
Mode FileMode
|
||||
OutputDir string
|
||||
FilePath string
|
||||
WatchedAddressesFilePath string
|
||||
NodeInfo node.Info
|
||||
@ -33,15 +59,26 @@ func (c Config) Type() shared.DBType {
|
||||
return shared.FILE
|
||||
}
|
||||
|
||||
// TestConfig config for unit tests
|
||||
var TestConfig = Config{
|
||||
FilePath: "./statediffing_test_file.sql",
|
||||
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
|
||||
NodeInfo: node.Info{
|
||||
var nodeInfo = node.Info{
|
||||
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
|
||||
NetworkID: "1",
|
||||
ChainID: 1,
|
||||
ID: "mockNodeID",
|
||||
ClientName: "go-ethereum",
|
||||
},
|
||||
}
|
||||
|
||||
// CSVTestConfig config for unit tests
|
||||
var CSVTestConfig = Config{
|
||||
Mode: CSV,
|
||||
OutputDir: "./statediffing_test",
|
||||
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.csv",
|
||||
NodeInfo: nodeInfo,
|
||||
}
|
||||
|
||||
// SQLTestConfig config for unit tests
|
||||
var SQLTestConfig = Config{
|
||||
Mode: SQL,
|
||||
FilePath: "./statediffing_test_file.sql",
|
||||
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
|
||||
NodeInfo: nodeInfo,
|
||||
}
|
||||
|
135
statediff/indexer/database/file/csv_indexer_legacy_test.go
Normal file
135
statediff/indexer/database/file/csv_indexer_legacy_test.go
Normal file
@ -0,0 +1,135 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2022 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package file_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/multiformats/go-multihash"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/file/types"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
)
|
||||
|
||||
const dbDirectory = "/file_indexer"
|
||||
const pgCopyStatement = `COPY %s FROM '%s' CSV`
|
||||
|
||||
func setupCSVLegacy(t *testing.T) {
|
||||
mockLegacyBlock = legacyData.MockBlock
|
||||
legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
|
||||
file.CSVTestConfig.OutputDir = "./statediffing_legacy_test"
|
||||
|
||||
if _, err := os.Stat(file.CSVTestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.RemoveAll(file.CSVTestConfig.OutputDir)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.CSVTestConfig)
|
||||
require.NoError(t, err)
|
||||
var tx interfaces.Batch
|
||||
tx, err = ind.PushBlock(
|
||||
mockLegacyBlock,
|
||||
legacyData.MockReceipts,
|
||||
legacyData.MockBlock.Difficulty())
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
if err := tx.Submit(err); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := ind.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
for _, node := range legacyData.StateDiffs {
|
||||
err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.Equal(t, legacyData.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)
|
||||
|
||||
connStr := postgres.DefaultConfig.DbConnectionString()
|
||||
sqlxdb, err = sqlx.Connect("postgres", connStr)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
|
||||
}
|
||||
}
|
||||
|
||||
func dumpCSVFileData(t *testing.T) {
|
||||
outputDir := filepath.Join(dbDirectory, file.CSVTestConfig.OutputDir)
|
||||
|
||||
for _, tbl := range file.Tables {
|
||||
var stmt string
|
||||
varcharColumns := tbl.VarcharColumns()
|
||||
if len(varcharColumns) > 0 {
|
||||
stmt = fmt.Sprintf(
|
||||
pgCopyStatement+" FORCE NOT NULL %s",
|
||||
tbl.Name,
|
||||
file.TableFilePath(outputDir, tbl.Name),
|
||||
strings.Join(varcharColumns, ", "),
|
||||
)
|
||||
} else {
|
||||
stmt = fmt.Sprintf(pgCopyStatement, tbl.Name, file.TableFilePath(outputDir, tbl.Name))
|
||||
}
|
||||
|
||||
_, err = sqlxdb.Exec(stmt)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func dumpWatchedAddressesCSVFileData(t *testing.T) {
|
||||
outputFilePath := filepath.Join(dbDirectory, file.CSVTestConfig.WatchedAddressesFilePath)
|
||||
stmt := fmt.Sprintf(pgCopyStatement, types.TableWatchedAddresses.Name, outputFilePath)
|
||||
|
||||
_, err = sqlxdb.Exec(stmt)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func tearDownCSV(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
|
||||
err := os.RemoveAll(file.CSVTestConfig.OutputDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
if err := os.Remove(file.CSVTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
err = sqlxdb.Close()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestCSVFileIndexerLegacy(t *testing.T) {
|
||||
t.Run("Publish and index header IPLDs", func(t *testing.T) {
|
||||
setupCSVLegacy(t)
|
||||
dumpCSVFileData(t)
|
||||
defer tearDownCSV(t)
|
||||
testLegacyPublishAndIndexHeaderIPLDs(t)
|
||||
})
|
||||
}
|
200
statediff/indexer/database/file/csv_indexer_test.go
Normal file
200
statediff/indexer/database/file/csv_indexer_test.go
Normal file
@ -0,0 +1,200 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2022 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package file_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
|
||||
)
|
||||
|
||||
func setupCSVIndexer(t *testing.T) {
|
||||
file.CSVTestConfig.OutputDir = "./statediffing_test"
|
||||
|
||||
if _, err := os.Stat(file.CSVTestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.RemoveAll(file.CSVTestConfig.OutputDir)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if _, err := os.Stat(file.CSVTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.CSVTestConfig.WatchedAddressesFilePath)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.CSVTestConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
connStr := postgres.DefaultConfig.DbConnectionString()
|
||||
sqlxdb, err = sqlx.Connect("postgres", connStr)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
|
||||
}
|
||||
}
|
||||
|
||||
func setupCSV(t *testing.T) {
|
||||
setupCSVIndexer(t)
|
||||
var tx interfaces.Batch
|
||||
tx, err = ind.PushBlock(
|
||||
mockBlock,
|
||||
mocks.MockReceipts,
|
||||
mocks.MockBlock.Difficulty())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := tx.Submit(err); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := ind.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
for _, node := range mocks.StateDiffs {
|
||||
err = ind.PushStateNode(tx, node, mockBlock.Hash().String())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.Equal(t, mocks.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)
|
||||
}
|
||||
|
||||
func TestCSVFileIndexer(t *testing.T) {
|
||||
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
|
||||
setupCSV(t)
|
||||
dumpCSVFileData(t)
|
||||
defer tearDownCSV(t)
|
||||
|
||||
testPublishAndIndexHeaderIPLDs(t)
|
||||
})
|
||||
|
||||
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
|
||||
setupCSV(t)
|
||||
dumpCSVFileData(t)
|
||||
defer tearDownCSV(t)
|
||||
|
||||
testPublishAndIndexTransactionIPLDs(t)
|
||||
})
|
||||
|
||||
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
|
||||
setupCSV(t)
|
||||
dumpCSVFileData(t)
|
||||
defer tearDownCSV(t)
|
||||
|
||||
testPublishAndIndexLogIPLDs(t)
|
||||
})
|
||||
|
||||
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
|
||||
setupCSV(t)
|
||||
dumpCSVFileData(t)
|
||||
defer tearDownCSV(t)
|
||||
|
||||
testPublishAndIndexReceiptIPLDs(t)
|
||||
})
|
||||
|
||||
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
|
||||
setupCSV(t)
|
||||
dumpCSVFileData(t)
|
||||
defer tearDownCSV(t)
|
||||
|
||||
testPublishAndIndexStateIPLDs(t)
|
||||
})
|
||||
|
||||
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
|
||||
setupCSV(t)
|
||||
dumpCSVFileData(t)
|
||||
defer tearDownCSV(t)
|
||||
|
||||
testPublishAndIndexStorageIPLDs(t)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCSVFileWatchAddressMethods(t *testing.T) {
|
||||
setupCSVIndexer(t)
|
||||
defer tearDownCSV(t)
|
||||
|
||||
t.Run("Load watched addresses (empty table)", func(t *testing.T) {
|
||||
testLoadEmptyWatchedAddresses(t)
|
||||
})
|
||||
|
||||
t.Run("Insert watched addresses", func(t *testing.T) {
|
||||
testInsertWatchedAddresses(t, func(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
dumpWatchedAddressesCSVFileData(t)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Insert watched addresses (some already watched)", func(t *testing.T) {
|
||||
testInsertAlreadyWatchedAddresses(t, func(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
dumpWatchedAddressesCSVFileData(t)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Remove watched addresses", func(t *testing.T) {
|
||||
testRemoveWatchedAddresses(t, func(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
dumpWatchedAddressesCSVFileData(t)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) {
|
||||
testRemoveNonWatchedAddresses(t, func(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
dumpWatchedAddressesCSVFileData(t)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Set watched addresses", func(t *testing.T) {
|
||||
testSetWatchedAddresses(t, func(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
dumpWatchedAddressesCSVFileData(t)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Set watched addresses (some already watched)", func(t *testing.T) {
|
||||
testSetAlreadyWatchedAddresses(t, func(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
dumpWatchedAddressesCSVFileData(t)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Load watched addresses", func(t *testing.T) {
|
||||
testLoadWatchedAddresses(t)
|
||||
})
|
||||
|
||||
t.Run("Clear watched addresses", func(t *testing.T) {
|
||||
testClearWatchedAddresses(t, func(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
dumpWatchedAddressesCSVFileData(t)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Clear watched addresses (empty table)", func(t *testing.T) {
|
||||
testClearEmptyWatchedAddresses(t, func(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
dumpWatchedAddressesCSVFileData(t)
|
||||
})
|
||||
})
|
||||
}
|
455
statediff/indexer/database/file/csv_writer.go
Normal file
455
statediff/indexer/database/file/csv_writer.go
Normal file
@ -0,0 +1,455 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2022 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package file
|
||||
|
||||
import (
|
||||
"encoding/csv"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||
node "github.com/ipfs/go-ipld-format"
|
||||
"github.com/thoas/go-funk"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/file/types"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
var (
|
||||
Tables = []*types.Table{
|
||||
&types.TableIPLDBlock,
|
||||
&types.TableNodeInfo,
|
||||
&types.TableHeader,
|
||||
&types.TableStateNode,
|
||||
&types.TableStorageNode,
|
||||
&types.TableUncle,
|
||||
&types.TableTransaction,
|
||||
&types.TableAccessListElement,
|
||||
&types.TableReceipt,
|
||||
&types.TableLog,
|
||||
&types.TableStateAccount,
|
||||
}
|
||||
)
|
||||
|
||||
type tableRow struct {
|
||||
table types.Table
|
||||
values []interface{}
|
||||
}
|
||||
|
||||
type CSVWriter struct {
|
||||
// dir containing output files
|
||||
dir string
|
||||
|
||||
writers fileWriters
|
||||
watchedAddressesWriter fileWriter
|
||||
|
||||
rows chan tableRow
|
||||
flushChan chan struct{}
|
||||
flushFinished chan struct{}
|
||||
quitChan chan struct{}
|
||||
doneChan chan struct{}
|
||||
}
|
||||
|
||||
type fileWriter struct {
|
||||
*csv.Writer
|
||||
file *os.File
|
||||
}
|
||||
|
||||
// fileWriters wraps the file writers for each output table
|
||||
type fileWriters map[string]fileWriter
|
||||
|
||||
func newFileWriter(path string) (ret fileWriter, err error) {
|
||||
file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
ret = fileWriter{
|
||||
Writer: csv.NewWriter(file),
|
||||
file: file,
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) {
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
writers := fileWriters{}
|
||||
for _, tbl := range tables {
|
||||
w, err := newFileWriter(TableFilePath(dir, tbl.Name))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
writers[tbl.Name] = w
|
||||
}
|
||||
return writers, nil
|
||||
}
|
||||
|
||||
func (tx fileWriters) write(tbl *types.Table, args ...interface{}) error {
|
||||
row := tbl.ToCsvRow(args...)
|
||||
return tx[tbl.Name].Write(row)
|
||||
}
|
||||
|
||||
func (tx fileWriters) close() error {
|
||||
for _, w := range tx {
|
||||
err := w.file.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tx fileWriters) flush() error {
|
||||
for _, w := range tx {
|
||||
w.Flush()
|
||||
if err := w.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, error) {
|
||||
if err := os.MkdirAll(path, 0777); err != nil {
|
||||
return nil, fmt.Errorf("unable to make MkdirAll for path: %s err: %s", path, err)
|
||||
}
|
||||
|
||||
writers, err := makeFileWriters(path, Tables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
watchedAddressesWriter, err := newFileWriter(watchedAddressesFilePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
csvWriter := &CSVWriter{
|
||||
writers: writers,
|
||||
watchedAddressesWriter: watchedAddressesWriter,
|
||||
dir: path,
|
||||
rows: make(chan tableRow),
|
||||
flushChan: make(chan struct{}),
|
||||
flushFinished: make(chan struct{}),
|
||||
quitChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}),
|
||||
}
|
||||
return csvWriter, nil
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) Loop() {
|
||||
go func() {
|
||||
defer close(csw.doneChan)
|
||||
for {
|
||||
select {
|
||||
case row := <-csw.rows:
|
||||
err := csw.writers.write(&row.table, row.values...)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("error writing csv buffer: %v", err))
|
||||
}
|
||||
case <-csw.quitChan:
|
||||
if err := csw.writers.flush(); err != nil {
|
||||
panic(fmt.Sprintf("error writing csv buffer to file: %v", err))
|
||||
}
|
||||
return
|
||||
case <-csw.flushChan:
|
||||
if err := csw.writers.flush(); err != nil {
|
||||
panic(fmt.Sprintf("error writing csv buffer to file: %v", err))
|
||||
}
|
||||
csw.flushFinished <- struct{}{}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Flush sends a flush signal to the looping process
|
||||
func (csw *CSVWriter) Flush() {
|
||||
csw.flushChan <- struct{}{}
|
||||
<-csw.flushFinished
|
||||
}
|
||||
|
||||
func TableFilePath(dir, name string) string { return filepath.Join(dir, name+".csv") }
|
||||
|
||||
// Close satisfies io.Closer
|
||||
func (csw *CSVWriter) Close() error {
|
||||
close(csw.quitChan)
|
||||
<-csw.doneChan
|
||||
close(csw.rows)
|
||||
close(csw.flushChan)
|
||||
close(csw.flushFinished)
|
||||
return csw.writers.close()
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertNode(node nodeinfo.Info) {
|
||||
var values []interface{}
|
||||
values = append(values, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)
|
||||
csw.rows <- tableRow{types.TableNodeInfo, values}
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertIPLD(ipld models.IPLDModel) {
|
||||
var values []interface{}
|
||||
values = append(values, ipld.BlockNumber, ipld.Key, ipld.Data)
|
||||
csw.rows <- tableRow{types.TableIPLDBlock, values}
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) {
|
||||
csw.upsertIPLD(models.IPLDModel{
|
||||
BlockNumber: blockNumber,
|
||||
Key: key,
|
||||
Data: value,
|
||||
})
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertIPLDNode(blockNumber string, i node.Node) {
|
||||
csw.upsertIPLD(models.IPLDModel{
|
||||
BlockNumber: blockNumber,
|
||||
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
|
||||
Data: i.RawData(),
|
||||
})
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error) {
|
||||
c, err := ipld.RawdataToCid(codec, raw, mh)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
|
||||
csw.upsertIPLD(models.IPLDModel{
|
||||
BlockNumber: blockNumber,
|
||||
Key: prefixedKey,
|
||||
Data: raw,
|
||||
})
|
||||
return c.String(), prefixedKey, err
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
|
||||
var values []interface{}
|
||||
values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
|
||||
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
|
||||
header.RctRoot, header.UncleRoot, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.MhKey, 1, header.Coinbase)
|
||||
csw.rows <- tableRow{types.TableHeader, values}
|
||||
indexerMetrics.blocks.Inc(1)
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) {
|
||||
var values []interface{}
|
||||
values = append(values, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
|
||||
uncle.Reward, uncle.MhKey)
|
||||
csw.rows <- tableRow{types.TableUncle, values}
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) {
|
||||
var values []interface{}
|
||||
values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
|
||||
transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
|
||||
csw.rows <- tableRow{types.TableTransaction, values}
|
||||
indexerMetrics.transactions.Inc(1)
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) {
|
||||
var values []interface{}
|
||||
values = append(values, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
|
||||
csw.rows <- tableRow{types.TableAccessListElement, values}
|
||||
indexerMetrics.accessListEntries.Inc(1)
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
|
||||
var values []interface{}
|
||||
values = append(values, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
|
||||
rct.PostState, rct.PostStatus, rct.LogRoot)
|
||||
csw.rows <- tableRow{types.TableReceipt, values}
|
||||
indexerMetrics.receipts.Inc(1)
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
|
||||
for _, l := range logs {
|
||||
var values []interface{}
|
||||
values = append(values, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
|
||||
l.Topic1, l.Topic2, l.Topic3, l.Data)
|
||||
csw.rows <- tableRow{types.TableLog, values}
|
||||
indexerMetrics.logs.Inc(1)
|
||||
}
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
|
||||
var stateKey string
|
||||
if stateNode.StateKey != nullHash.String() {
|
||||
stateKey = stateNode.StateKey
|
||||
}
|
||||
|
||||
var values []interface{}
|
||||
values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path,
|
||||
stateNode.NodeType, true, stateNode.MhKey)
|
||||
csw.rows <- tableRow{types.TableStateNode, values}
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertStateAccount(stateAccount models.StateAccountModel) {
|
||||
var values []interface{}
|
||||
values = append(values, stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
|
||||
strconv.FormatUint(stateAccount.Nonce, 10), stateAccount.CodeHash, stateAccount.StorageRoot)
|
||||
csw.rows <- tableRow{types.TableStateAccount, values}
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
||||
var storageKey string
|
||||
if storageCID.StorageKey != nullHash.String() {
|
||||
storageKey = storageCID.StorageKey
|
||||
}
|
||||
|
||||
var values []interface{}
|
||||
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
|
||||
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
|
||||
csw.rows <- tableRow{types.TableStorageNode, values}
|
||||
}
|
||||
|
||||
// LoadWatchedAddresses loads watched addresses from a file
|
||||
func (csw *CSVWriter) loadWatchedAddresses() ([]common.Address, error) {
|
||||
watchedAddressesFilePath := csw.watchedAddressesWriter.file.Name()
|
||||
// load csv rows from watched addresses file
|
||||
rows, err := loadWatchedAddressesRows(watchedAddressesFilePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// extract addresses from the csv rows
|
||||
watchedAddresses := funk.Map(rows, func(row []string) common.Address {
|
||||
// first column is for address in eth_meta.watched_addresses
|
||||
addressString := row[0]
|
||||
|
||||
return common.HexToAddress(addressString)
|
||||
}).([]common.Address)
|
||||
|
||||
return watchedAddresses, nil
|
||||
}
|
||||
|
||||
// InsertWatchedAddresses inserts the given addresses in a file
|
||||
func (csw *CSVWriter) insertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
|
||||
// load csv rows from watched addresses file
|
||||
watchedAddresses, err := csw.loadWatchedAddresses()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// append rows for new addresses to existing csv file
|
||||
for _, arg := range args {
|
||||
// ignore if already watched
|
||||
if funk.Contains(watchedAddresses, common.HexToAddress(arg.Address)) {
|
||||
continue
|
||||
}
|
||||
|
||||
var values []interface{}
|
||||
values = append(values, arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0")
|
||||
row := types.TableWatchedAddresses.ToCsvRow(values...)
|
||||
|
||||
// writing directly instead of using rows channel as it needs to be flushed immediately
|
||||
err = csw.watchedAddressesWriter.Write(row)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// watched addresses need to be flushed immediately to the file to keep them in sync with in-memory watched addresses
|
||||
csw.watchedAddressesWriter.Flush()
|
||||
err = csw.watchedAddressesWriter.Error()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveWatchedAddresses removes the given watched addresses from a file
|
||||
func (csw *CSVWriter) removeWatchedAddresses(args []sdtypes.WatchAddressArg) error {
|
||||
// load csv rows from watched addresses file
|
||||
watchedAddressesFilePath := csw.watchedAddressesWriter.file.Name()
|
||||
rows, err := loadWatchedAddressesRows(watchedAddressesFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get rid of rows having addresses to be removed
|
||||
filteredRows := funk.Filter(rows, func(row []string) bool {
|
||||
return !funk.Contains(args, func(arg sdtypes.WatchAddressArg) bool {
|
||||
// Compare first column in table for address
|
||||
return arg.Address == row[0]
|
||||
})
|
||||
}).([][]string)
|
||||
|
||||
return dumpWatchedAddressesRows(csw.watchedAddressesWriter, filteredRows)
|
||||
}
|
||||
|
||||
// SetWatchedAddresses clears and inserts the given addresses in a file
|
||||
func (csw *CSVWriter) setWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
|
||||
var rows [][]string
|
||||
for _, arg := range args {
|
||||
row := types.TableWatchedAddresses.ToCsvRow(arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0")
|
||||
rows = append(rows, row)
|
||||
}
|
||||
|
||||
return dumpWatchedAddressesRows(csw.watchedAddressesWriter, rows)
|
||||
}
|
||||
|
||||
// loadCSVWatchedAddresses loads csv rows from the given file
|
||||
func loadWatchedAddressesRows(filePath string) ([][]string, error) {
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return [][]string{}, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("error opening watched addresses file: %v", err)
|
||||
}
|
||||
|
||||
defer file.Close()
|
||||
reader := csv.NewReader(file)
|
||||
|
||||
return reader.ReadAll()
|
||||
}
|
||||
|
||||
// dumpWatchedAddressesRows dumps csv rows to the given file
|
||||
func dumpWatchedAddressesRows(watchedAddressesWriter fileWriter, filteredRows [][]string) error {
|
||||
file := watchedAddressesWriter.file
|
||||
file.Close()
|
||||
|
||||
file, err := os.Create(file.Name())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating watched addresses file: %v", err)
|
||||
}
|
||||
|
||||
watchedAddressesWriter.Writer = csv.NewWriter(file)
|
||||
watchedAddressesWriter.file = file
|
||||
|
||||
for _, row := range filteredRows {
|
||||
watchedAddressesWriter.Write(row)
|
||||
}
|
||||
|
||||
watchedAddressesWriter.Flush()
|
||||
|
||||
return nil
|
||||
}
|
@ -17,7 +17,6 @@
|
||||
package file
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -30,8 +29,6 @@ import (
|
||||
"github.com/ipfs/go-cid"
|
||||
node "github.com/ipfs/go-ipld-format"
|
||||
"github.com/multiformats/go-multihash"
|
||||
pg_query "github.com/pganalyze/pg_query_go/v2"
|
||||
"github.com/thoas/go-funk"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
@ -47,8 +44,10 @@ import (
|
||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
const defaultFilePath = "./statediff.sql"
|
||||
const defaultWatchedAddressesFilePath = "./statediff-watched-addresses.sql"
|
||||
const defaultCSVOutputDir = "./statediff_output"
|
||||
const defaultSQLFilePath = "./statediff.sql"
|
||||
const defaultWatchedAddressesCSVFilePath = "./statediff-watched-addresses.csv"
|
||||
const defaultWatchedAddressesSQLFilePath = "./statediff-watched-addresses.sql"
|
||||
|
||||
const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ('%s', '%d', '%d') ON CONFLICT (address) DO NOTHING;"
|
||||
|
||||
@ -60,20 +59,45 @@ var (
|
||||
|
||||
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
|
||||
type StateDiffIndexer struct {
|
||||
fileWriter *SQLWriter
|
||||
fileWriter FileWriter
|
||||
chainConfig *params.ChainConfig
|
||||
nodeID string
|
||||
wg *sync.WaitGroup
|
||||
removedCacheFlag *uint32
|
||||
|
||||
watchedAddressesFilePath string
|
||||
}
|
||||
|
||||
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
|
||||
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, config Config) (*StateDiffIndexer, error) {
|
||||
var err error
|
||||
var writer FileWriter
|
||||
|
||||
watchedAddressesFilePath := config.WatchedAddressesFilePath
|
||||
|
||||
switch config.Mode {
|
||||
case CSV:
|
||||
outputDir := config.OutputDir
|
||||
if outputDir == "" {
|
||||
outputDir = defaultCSVOutputDir
|
||||
}
|
||||
|
||||
if _, err := os.Stat(outputDir); !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, fmt.Errorf("cannot create output directory, directory (%s) already exists", outputDir)
|
||||
}
|
||||
log.Info("Writing statediff CSV files to directory", "file", outputDir)
|
||||
|
||||
if watchedAddressesFilePath == "" {
|
||||
watchedAddressesFilePath = defaultWatchedAddressesCSVFilePath
|
||||
}
|
||||
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
|
||||
|
||||
writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case SQL:
|
||||
filePath := config.FilePath
|
||||
if filePath == "" {
|
||||
filePath = defaultFilePath
|
||||
filePath = defaultSQLFilePath
|
||||
}
|
||||
if _, err := os.Stat(filePath); !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, fmt.Errorf("cannot create file, file (%s) already exists", filePath)
|
||||
@ -84,22 +108,25 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
|
||||
}
|
||||
log.Info("Writing statediff SQL statements to file", "file", filePath)
|
||||
|
||||
watchedAddressesFilePath := config.WatchedAddressesFilePath
|
||||
if watchedAddressesFilePath == "" {
|
||||
watchedAddressesFilePath = defaultWatchedAddressesFilePath
|
||||
watchedAddressesFilePath = defaultWatchedAddressesSQLFilePath
|
||||
}
|
||||
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
|
||||
|
||||
writer = NewSQLWriter(file, watchedAddressesFilePath)
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode)
|
||||
}
|
||||
log.Info("Writing watched addresses SQL statements to file", "file", watchedAddressesFilePath)
|
||||
|
||||
w := NewSQLWriter(file)
|
||||
wg := new(sync.WaitGroup)
|
||||
w.Loop()
|
||||
w.upsertNode(config.NodeInfo)
|
||||
writer.Loop()
|
||||
writer.upsertNode(config.NodeInfo)
|
||||
|
||||
return &StateDiffIndexer{
|
||||
fileWriter: w,
|
||||
fileWriter: writer,
|
||||
chainConfig: chainConfig,
|
||||
nodeID: config.NodeInfo.ID,
|
||||
wg: wg,
|
||||
watchedAddressesFilePath: watchedAddressesFilePath,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -524,162 +551,25 @@ func (sdi *StateDiffIndexer) Close() error {
|
||||
|
||||
// LoadWatchedAddresses loads watched addresses from a file
|
||||
func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
|
||||
// load sql statements from watched addresses file
|
||||
stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// extract addresses from the sql statements
|
||||
watchedAddresses := []common.Address{}
|
||||
for _, stmt := range stmts {
|
||||
addressString, err := parseWatchedAddressStatement(stmt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString))
|
||||
}
|
||||
|
||||
return watchedAddresses, nil
|
||||
return sdi.fileWriter.loadWatchedAddresses()
|
||||
}
|
||||
|
||||
// InsertWatchedAddresses inserts the given addresses in a file
|
||||
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
|
||||
// load sql statements from watched addresses file
|
||||
stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get already watched addresses
|
||||
var watchedAddresses []string
|
||||
for _, stmt := range stmts {
|
||||
addressString, err := parseWatchedAddressStatement(stmt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
watchedAddresses = append(watchedAddresses, addressString)
|
||||
}
|
||||
|
||||
// append statements for new addresses to existing statements
|
||||
for _, arg := range args {
|
||||
// ignore if already watched
|
||||
if funk.Contains(watchedAddresses, arg.Address) {
|
||||
continue
|
||||
}
|
||||
|
||||
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
|
||||
stmts = append(stmts, stmt)
|
||||
}
|
||||
|
||||
return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, stmts)
|
||||
return sdi.fileWriter.insertWatchedAddresses(args, currentBlockNumber)
|
||||
}
|
||||
|
||||
// RemoveWatchedAddresses removes the given watched addresses from a file
|
||||
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error {
|
||||
// load sql statements from watched addresses file
|
||||
stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get rid of statements having addresses to be removed
|
||||
var filteredStmts []string
|
||||
for _, stmt := range stmts {
|
||||
addressString, err := parseWatchedAddressStatement(stmt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
toRemove := funk.Contains(args, func(arg sdtypes.WatchAddressArg) bool {
|
||||
return arg.Address == addressString
|
||||
})
|
||||
|
||||
if !toRemove {
|
||||
filteredStmts = append(filteredStmts, stmt)
|
||||
}
|
||||
}
|
||||
|
||||
return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, filteredStmts)
|
||||
return sdi.fileWriter.removeWatchedAddresses(args)
|
||||
}
|
||||
|
||||
// SetWatchedAddresses clears and inserts the given addresses in a file
|
||||
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
|
||||
var stmts []string
|
||||
for _, arg := range args {
|
||||
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
|
||||
stmts = append(stmts, stmt)
|
||||
}
|
||||
|
||||
return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, stmts)
|
||||
return sdi.fileWriter.setWatchedAddresses(args, currentBlockNumber)
|
||||
}
|
||||
|
||||
// ClearWatchedAddresses clears all the watched addresses from a file
|
||||
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
|
||||
return sdi.SetWatchedAddresses([]sdtypes.WatchAddressArg{}, big.NewInt(0))
|
||||
}
|
||||
|
||||
// loadWatchedAddressesStatements loads sql statements from the given file in a string slice
|
||||
func loadWatchedAddressesStatements(filePath string) ([]string, error) {
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("error opening watched addresses file: %v", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
stmts := []string{}
|
||||
scanner := bufio.NewScanner(file)
|
||||
for scanner.Scan() {
|
||||
stmts = append(stmts, scanner.Text())
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, fmt.Errorf("error loading watched addresses: %v", err)
|
||||
}
|
||||
|
||||
return stmts, nil
|
||||
}
|
||||
|
||||
// dumpWatchedAddressesStatements dumps sql statements to the given file
|
||||
func dumpWatchedAddressesStatements(filePath string, stmts []string) error {
|
||||
file, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating watched addresses file: %v", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
for _, stmt := range stmts {
|
||||
_, err := file.Write([]byte(stmt + "\n"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error inserting watched_addresses entry: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseWatchedAddressStatement parses given sql insert statement to extract the address argument
|
||||
func parseWatchedAddressStatement(stmt string) (string, error) {
|
||||
parseResult, err := pg_query.Parse(stmt)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error parsing sql stmt: %v", err)
|
||||
}
|
||||
|
||||
// extract address argument from parse output for a SQL statement of form
|
||||
// "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at)
|
||||
// VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;"
|
||||
addressString := parseResult.Stmts[0].Stmt.GetInsertStmt().
|
||||
SelectStmt.GetSelectStmt().
|
||||
ValuesLists[0].GetList().
|
||||
Items[0].GetAConst().
|
||||
GetVal().
|
||||
GetString_().
|
||||
Str
|
||||
|
||||
return addressString, nil
|
||||
}
|
||||
|
1063
statediff/indexer/database/file/indexer_shared_test.go
Normal file
1063
statediff/indexer/database/file/indexer_shared_test.go
Normal file
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
60
statediff/indexer/database/file/interfaces.go
Normal file
60
statediff/indexer/database/file/interfaces.go
Normal file
@ -0,0 +1,60 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2022 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package file
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
|
||||
node "github.com/ipfs/go-ipld-format"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
"github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
// Writer interface required by the file indexer
|
||||
type FileWriter interface {
|
||||
// Methods used to control the writer
|
||||
Loop()
|
||||
Close() error
|
||||
Flush()
|
||||
|
||||
// Methods to upsert ethereum data model objects
|
||||
upsertNode(node nodeinfo.Info)
|
||||
upsertHeaderCID(header models.HeaderModel)
|
||||
upsertUncleCID(uncle models.UncleModel)
|
||||
upsertTransactionCID(transaction models.TxModel)
|
||||
upsertAccessListElement(accessListElement models.AccessListElementModel)
|
||||
upsertReceiptCID(rct *models.ReceiptModel)
|
||||
upsertLogCID(logs []*models.LogsModel)
|
||||
upsertStateCID(stateNode models.StateNodeModel)
|
||||
upsertStateAccount(stateAccount models.StateAccountModel)
|
||||
upsertStorageCID(storageCID models.StorageNodeModel)
|
||||
upsertIPLD(ipld models.IPLDModel)
|
||||
|
||||
// Methods to upsert IPLD in different ways
|
||||
upsertIPLDDirect(blockNumber, key string, value []byte)
|
||||
upsertIPLDNode(blockNumber string, i node.Node)
|
||||
upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error)
|
||||
|
||||
// Methods to read and write watched addresses
|
||||
loadWatchedAddresses() ([]common.Address, error)
|
||||
insertWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error
|
||||
removeWatchedAddresses(args []types.WatchAddressArg) error
|
||||
setWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error
|
||||
}
|
@ -81,11 +81,11 @@ func testPushBlockAndState(t *testing.T, block *types.Block, receipts types.Rece
|
||||
}
|
||||
|
||||
func setup(t *testing.T, testBlock *types.Block, testReceipts types.Receipts) {
|
||||
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.TestConfig.FilePath)
|
||||
if _, err := os.Stat(file.CSVTestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.CSVTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
ind, err := file.NewStateDiffIndexer(context.Background(), chainConf, file.TestConfig)
|
||||
ind, err := file.NewStateDiffIndexer(context.Background(), chainConf, file.CSVTestConfig)
|
||||
require.NoError(t, err)
|
||||
var tx interfaces.Batch
|
||||
tx, err = ind.PushBlock(
|
||||
@ -118,7 +118,7 @@ func setup(t *testing.T, testBlock *types.Block, testReceipts types.Receipts) {
|
||||
}
|
||||
|
||||
func dumpData(t *testing.T) {
|
||||
sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath)
|
||||
sqlFileBytes, err := os.ReadFile(file.CSVTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sqlxdb.Exec(string(sqlFileBytes))
|
||||
@ -127,7 +127,7 @@ func dumpData(t *testing.T) {
|
||||
|
||||
func tearDown(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
err := os.Remove(file.TestConfig.FilePath)
|
||||
err := os.Remove(file.CSVTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
err = sqlxdb.Close()
|
||||
require.NoError(t, err)
|
||||
|
@ -22,33 +22,25 @@ import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/multiformats/go-multihash"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
|
||||
)
|
||||
|
||||
var (
|
||||
legacyData = mocks.NewLegacyData()
|
||||
mockLegacyBlock *types.Block
|
||||
legacyHeaderCID cid.Cid
|
||||
)
|
||||
|
||||
func setupLegacy(t *testing.T) {
|
||||
mockLegacyBlock = legacyData.MockBlock
|
||||
legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
|
||||
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.TestConfig.FilePath)
|
||||
|
||||
if _, err := os.Stat(file.SQLTestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.SQLTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.TestConfig)
|
||||
ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.SQLTestConfig)
|
||||
require.NoError(t, err)
|
||||
var tx interfaces.Batch
|
||||
tx, err = ind.PushBlock(
|
||||
@ -65,6 +57,7 @@ func setupLegacy(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
for _, node := range legacyData.StateDiffs {
|
||||
err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String())
|
||||
require.NoError(t, err)
|
||||
@ -73,7 +66,6 @@ func setupLegacy(t *testing.T) {
|
||||
require.Equal(t, legacyData.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)
|
||||
|
||||
connStr := postgres.DefaultConfig.DbConnectionString()
|
||||
|
||||
sqlxdb, err = sqlx.Connect("postgres", connStr)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
|
||||
@ -81,7 +73,7 @@ func setupLegacy(t *testing.T) {
|
||||
}
|
||||
|
||||
func dumpFileData(t *testing.T) {
|
||||
sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath)
|
||||
sqlFileBytes, err := os.ReadFile(file.SQLTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sqlxdb.Exec(string(sqlFileBytes))
|
||||
@ -91,30 +83,20 @@ func dumpFileData(t *testing.T) {
|
||||
func resetAndDumpWatchedAddressesFileData(t *testing.T) {
|
||||
resetDB(t)
|
||||
|
||||
sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath)
|
||||
sqlFileBytes, err := os.ReadFile(file.SQLTestConfig.WatchedAddressesFilePath)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sqlxdb.Exec(string(sqlFileBytes))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func resetDB(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
|
||||
connStr := postgres.DefaultConfig.DbConnectionString()
|
||||
sqlxdb, err = sqlx.Connect("postgres", connStr)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
|
||||
}
|
||||
}
|
||||
|
||||
func tearDown(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
|
||||
err := os.Remove(file.TestConfig.FilePath)
|
||||
err := os.Remove(file.SQLTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
|
||||
if err := os.Remove(file.TestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
if err := os.Remove(file.SQLTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@ -122,36 +104,11 @@ func tearDown(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func expectTrue(t *testing.T, value bool) {
|
||||
if !value {
|
||||
t.Fatalf("Assertion failed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileIndexerLegacy(t *testing.T) {
|
||||
func TestSQLFileIndexerLegacy(t *testing.T) {
|
||||
t.Run("Publish and index header IPLDs", func(t *testing.T) {
|
||||
setupLegacy(t)
|
||||
dumpFileData(t)
|
||||
defer tearDown(t)
|
||||
pgStr := `SELECT cid, td, reward, block_hash, coinbase
|
||||
FROM eth.header_cids
|
||||
WHERE block_number = $1`
|
||||
// check header was properly indexed
|
||||
type res struct {
|
||||
CID string
|
||||
TD string
|
||||
Reward string
|
||||
BlockHash string `db:"block_hash"`
|
||||
Coinbase string `db:"coinbase"`
|
||||
}
|
||||
header := new(res)
|
||||
err = sqlxdb.QueryRowx(pgStr, legacyData.BlockNumber.Uint64()).StructScan(header)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, legacyHeaderCID.String(), header.CID)
|
||||
require.Equal(t, legacyData.MockBlock.Difficulty().String(), header.TD)
|
||||
require.Equal(t, "5000000000000011250", header.Reward)
|
||||
require.Equal(t, legacyData.MockBlock.Coinbase().String(), header.Coinbase)
|
||||
require.Nil(t, legacyData.MockHeader.BaseFee)
|
||||
testLegacyPublishAndIndexHeaderIPLDs(t)
|
||||
})
|
||||
}
|
174
statediff/indexer/database/file/sql_indexer_test.go
Normal file
174
statediff/indexer/database/file/sql_indexer_test.go
Normal file
@ -0,0 +1,174 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2019 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package file_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
|
||||
)
|
||||
|
||||
func setupIndexer(t *testing.T) {
|
||||
if _, err := os.Stat(file.SQLTestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.SQLTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if _, err := os.Stat(file.SQLTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.SQLTestConfig.WatchedAddressesFilePath)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.SQLTestConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
connStr := postgres.DefaultConfig.DbConnectionString()
|
||||
sqlxdb, err = sqlx.Connect("postgres", connStr)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
|
||||
}
|
||||
}
|
||||
|
||||
func setup(t *testing.T) {
|
||||
setupIndexer(t)
|
||||
var tx interfaces.Batch
|
||||
tx, err = ind.PushBlock(
|
||||
mockBlock,
|
||||
mocks.MockReceipts,
|
||||
mocks.MockBlock.Difficulty())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := tx.Submit(err); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := ind.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
for _, node := range mocks.StateDiffs {
|
||||
err = ind.PushStateNode(tx, node, mockBlock.Hash().String())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.Equal(t, mocks.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)
|
||||
}
|
||||
|
||||
func TestSQLFileIndexer(t *testing.T) {
|
||||
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
|
||||
setup(t)
|
||||
dumpFileData(t)
|
||||
defer tearDown(t)
|
||||
|
||||
testPublishAndIndexHeaderIPLDs(t)
|
||||
})
|
||||
|
||||
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
|
||||
setup(t)
|
||||
dumpFileData(t)
|
||||
defer tearDown(t)
|
||||
|
||||
testPublishAndIndexTransactionIPLDs(t)
|
||||
})
|
||||
|
||||
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
|
||||
setup(t)
|
||||
dumpFileData(t)
|
||||
defer tearDown(t)
|
||||
|
||||
testPublishAndIndexLogIPLDs(t)
|
||||
})
|
||||
|
||||
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
|
||||
setup(t)
|
||||
dumpFileData(t)
|
||||
defer tearDown(t)
|
||||
|
||||
testPublishAndIndexReceiptIPLDs(t)
|
||||
})
|
||||
|
||||
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
|
||||
setup(t)
|
||||
dumpFileData(t)
|
||||
defer tearDown(t)
|
||||
|
||||
testPublishAndIndexStateIPLDs(t)
|
||||
})
|
||||
|
||||
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
|
||||
setup(t)
|
||||
dumpFileData(t)
|
||||
defer tearDown(t)
|
||||
|
||||
testPublishAndIndexStorageIPLDs(t)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSQLFileWatchAddressMethods(t *testing.T) {
|
||||
setupIndexer(t)
|
||||
defer tearDown(t)
|
||||
|
||||
t.Run("Load watched addresses (empty table)", func(t *testing.T) {
|
||||
testLoadEmptyWatchedAddresses(t)
|
||||
})
|
||||
|
||||
t.Run("Insert watched addresses", func(t *testing.T) {
|
||||
testInsertWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
|
||||
})
|
||||
|
||||
t.Run("Insert watched addresses (some already watched)", func(t *testing.T) {
|
||||
testInsertAlreadyWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
|
||||
})
|
||||
|
||||
t.Run("Remove watched addresses", func(t *testing.T) {
|
||||
testRemoveWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
|
||||
})
|
||||
|
||||
t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) {
|
||||
testRemoveNonWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
|
||||
})
|
||||
|
||||
t.Run("Set watched addresses", func(t *testing.T) {
|
||||
testSetWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
|
||||
})
|
||||
|
||||
t.Run("Set watched addresses (some already watched)", func(t *testing.T) {
|
||||
testSetAlreadyWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
|
||||
})
|
||||
|
||||
t.Run("Load watched addresses", func(t *testing.T) {
|
||||
testLoadWatchedAddresses(t)
|
||||
})
|
||||
|
||||
t.Run("Clear watched addresses", func(t *testing.T) {
|
||||
testClearWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
|
||||
})
|
||||
|
||||
t.Run("Clear watched addresses (empty table)", func(t *testing.T) {
|
||||
testClearEmptyWatchedAddresses(t, resetAndDumpWatchedAddressesFileData)
|
||||
})
|
||||
}
|
@ -17,17 +17,24 @@
|
||||
package file
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/big"
|
||||
"os"
|
||||
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||
node "github.com/ipfs/go-ipld-format"
|
||||
pg_query "github.com/pganalyze/pg_query_go/v2"
|
||||
"github.com/thoas/go-funk"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
"github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -47,10 +54,12 @@ type SQLWriter struct {
|
||||
flushFinished chan struct{}
|
||||
quitChan chan struct{}
|
||||
doneChan chan struct{}
|
||||
|
||||
watchedAddressesFilePath string
|
||||
}
|
||||
|
||||
// NewSQLWriter creates a new pointer to a Writer
|
||||
func NewSQLWriter(wc io.WriteCloser) *SQLWriter {
|
||||
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string) *SQLWriter {
|
||||
return &SQLWriter{
|
||||
wc: wc,
|
||||
stmts: make(chan []byte),
|
||||
@ -59,6 +68,7 @@ func NewSQLWriter(wc io.WriteCloser) *SQLWriter {
|
||||
flushFinished: make(chan struct{}),
|
||||
quitChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}),
|
||||
watchedAddressesFilePath: watchedAddressesFilePath,
|
||||
}
|
||||
}
|
||||
|
||||
@ -106,6 +116,9 @@ func (sqw *SQLWriter) Loop() {
|
||||
func (sqw *SQLWriter) Close() error {
|
||||
close(sqw.quitChan)
|
||||
<-sqw.doneChan
|
||||
close(sqw.stmts)
|
||||
close(sqw.flushChan)
|
||||
close(sqw.flushFinished)
|
||||
return sqw.wc.Close()
|
||||
}
|
||||
|
||||
@ -257,3 +270,160 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
|
||||
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))
|
||||
}
|
||||
|
||||
// LoadWatchedAddresses loads watched addresses from a file
|
||||
func (sqw *SQLWriter) loadWatchedAddresses() ([]common.Address, error) {
|
||||
// load sql statements from watched addresses file
|
||||
stmts, err := loadWatchedAddressesStatements(sqw.watchedAddressesFilePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// extract addresses from the sql statements
|
||||
watchedAddresses := []common.Address{}
|
||||
for _, stmt := range stmts {
|
||||
addressString, err := parseWatchedAddressStatement(stmt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString))
|
||||
}
|
||||
|
||||
return watchedAddresses, nil
|
||||
}
|
||||
|
||||
// InsertWatchedAddresses inserts the given addresses in a file
|
||||
func (sqw *SQLWriter) insertWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
|
||||
// load sql statements from watched addresses file
|
||||
stmts, err := loadWatchedAddressesStatements(sqw.watchedAddressesFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get already watched addresses
|
||||
var watchedAddresses []string
|
||||
for _, stmt := range stmts {
|
||||
addressString, err := parseWatchedAddressStatement(stmt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
watchedAddresses = append(watchedAddresses, addressString)
|
||||
}
|
||||
|
||||
// append statements for new addresses to existing statements
|
||||
for _, arg := range args {
|
||||
// ignore if already watched
|
||||
if funk.Contains(watchedAddresses, arg.Address) {
|
||||
continue
|
||||
}
|
||||
|
||||
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
|
||||
stmts = append(stmts, stmt)
|
||||
}
|
||||
|
||||
return dumpWatchedAddressesStatements(sqw.watchedAddressesFilePath, stmts)
|
||||
}
|
||||
|
||||
// RemoveWatchedAddresses removes the given watched addresses from a file
|
||||
func (sqw *SQLWriter) removeWatchedAddresses(args []types.WatchAddressArg) error {
|
||||
// load sql statements from watched addresses file
|
||||
stmts, err := loadWatchedAddressesStatements(sqw.watchedAddressesFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get rid of statements having addresses to be removed
|
||||
var filteredStmts []string
|
||||
for _, stmt := range stmts {
|
||||
addressString, err := parseWatchedAddressStatement(stmt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
toRemove := funk.Contains(args, func(arg types.WatchAddressArg) bool {
|
||||
return arg.Address == addressString
|
||||
})
|
||||
|
||||
if !toRemove {
|
||||
filteredStmts = append(filteredStmts, stmt)
|
||||
}
|
||||
}
|
||||
|
||||
return dumpWatchedAddressesStatements(sqw.watchedAddressesFilePath, filteredStmts)
|
||||
}
|
||||
|
||||
// SetWatchedAddresses clears and inserts the given addresses in a file
|
||||
func (sqw *SQLWriter) setWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
|
||||
var stmts []string
|
||||
for _, arg := range args {
|
||||
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
|
||||
stmts = append(stmts, stmt)
|
||||
}
|
||||
|
||||
return dumpWatchedAddressesStatements(sqw.watchedAddressesFilePath, stmts)
|
||||
}
|
||||
|
||||
// loadWatchedAddressesStatements loads sql statements from the given file in a string slice
|
||||
func loadWatchedAddressesStatements(filePath string) ([]string, error) {
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("error opening watched addresses file: %v", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
stmts := []string{}
|
||||
scanner := bufio.NewScanner(file)
|
||||
for scanner.Scan() {
|
||||
stmts = append(stmts, scanner.Text())
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, fmt.Errorf("error loading watched addresses: %v", err)
|
||||
}
|
||||
|
||||
return stmts, nil
|
||||
}
|
||||
|
||||
// dumpWatchedAddressesStatements dumps sql statements to the given file
|
||||
func dumpWatchedAddressesStatements(filePath string, stmts []string) error {
|
||||
file, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating watched addresses file: %v", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
for _, stmt := range stmts {
|
||||
_, err := file.Write([]byte(stmt + "\n"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error inserting watched_addresses entry: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseWatchedAddressStatement parses given sql insert statement to extract the address argument
|
||||
func parseWatchedAddressStatement(stmt string) (string, error) {
|
||||
parseResult, err := pg_query.Parse(stmt)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error parsing sql stmt: %v", err)
|
||||
}
|
||||
|
||||
// extract address argument from parse output for a SQL statement of form
|
||||
// "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at)
|
||||
// VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;"
|
||||
addressString := parseResult.Stmts[0].Stmt.GetInsertStmt().
|
||||
SelectStmt.GetSelectStmt().
|
||||
ValuesLists[0].GetList().
|
||||
Items[0].GetAConst().
|
||||
GetVal().
|
||||
GetString_().
|
||||
Str
|
||||
|
||||
return addressString, nil
|
||||
}
|
184
statediff/indexer/database/file/types/schema.go
Normal file
184
statediff/indexer/database/file/types/schema.go
Normal file
@ -0,0 +1,184 @@
|
||||
// Copyright 2022 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package types
|
||||
|
||||
var TableIPLDBlock = Table{
|
||||
`public.blocks`,
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "key", dbType: text},
|
||||
{name: "data", dbType: bytea},
|
||||
},
|
||||
}
|
||||
|
||||
var TableNodeInfo = Table{
|
||||
Name: `public.nodes`,
|
||||
Columns: []column{
|
||||
{name: "genesis_block", dbType: varchar},
|
||||
{name: "network_id", dbType: varchar},
|
||||
{name: "node_id", dbType: varchar},
|
||||
{name: "client_name", dbType: varchar},
|
||||
{name: "chain_id", dbType: integer},
|
||||
},
|
||||
}
|
||||
|
||||
var TableHeader = Table{
|
||||
"eth.header_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "block_hash", dbType: varchar},
|
||||
{name: "parent_hash", dbType: varchar},
|
||||
{name: "cid", dbType: text},
|
||||
{name: "td", dbType: numeric},
|
||||
{name: "node_id", dbType: varchar},
|
||||
{name: "reward", dbType: numeric},
|
||||
{name: "state_root", dbType: varchar},
|
||||
{name: "tx_root", dbType: varchar},
|
||||
{name: "receipt_root", dbType: varchar},
|
||||
{name: "uncle_root", dbType: varchar},
|
||||
{name: "bloom", dbType: bytea},
|
||||
{name: "timestamp", dbType: numeric},
|
||||
{name: "mh_key", dbType: text},
|
||||
{name: "times_validated", dbType: integer},
|
||||
{name: "coinbase", dbType: varchar},
|
||||
},
|
||||
}
|
||||
|
||||
var TableStateNode = Table{
|
||||
"eth.state_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "header_id", dbType: varchar},
|
||||
{name: "state_leaf_key", dbType: varchar},
|
||||
{name: "cid", dbType: text},
|
||||
{name: "state_path", dbType: bytea},
|
||||
{name: "node_type", dbType: integer},
|
||||
{name: "diff", dbType: boolean},
|
||||
{name: "mh_key", dbType: text},
|
||||
},
|
||||
}
|
||||
|
||||
var TableStorageNode = Table{
|
||||
"eth.storage_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "header_id", dbType: varchar},
|
||||
{name: "state_path", dbType: bytea},
|
||||
{name: "storage_leaf_key", dbType: varchar},
|
||||
{name: "cid", dbType: text},
|
||||
{name: "storage_path", dbType: bytea},
|
||||
{name: "node_type", dbType: integer},
|
||||
{name: "diff", dbType: boolean},
|
||||
{name: "mh_key", dbType: text},
|
||||
},
|
||||
}
|
||||
|
||||
var TableUncle = Table{
|
||||
"eth.uncle_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "block_hash", dbType: varchar},
|
||||
{name: "header_id", dbType: varchar},
|
||||
{name: "parent_hash", dbType: varchar},
|
||||
{name: "cid", dbType: text},
|
||||
{name: "reward", dbType: numeric},
|
||||
{name: "mh_key", dbType: text},
|
||||
},
|
||||
}
|
||||
|
||||
var TableTransaction = Table{
|
||||
"eth.transaction_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "header_id", dbType: varchar},
|
||||
{name: "tx_hash", dbType: varchar},
|
||||
{name: "cid", dbType: text},
|
||||
{name: "dst", dbType: varchar},
|
||||
{name: "src", dbType: varchar},
|
||||
{name: "index", dbType: integer},
|
||||
{name: "mh_key", dbType: text},
|
||||
{name: "tx_data", dbType: bytea},
|
||||
{name: "tx_type", dbType: integer},
|
||||
{name: "value", dbType: numeric},
|
||||
},
|
||||
}
|
||||
|
||||
var TableAccessListElement = Table{
|
||||
"eth.access_list_elements",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "tx_id", dbType: varchar},
|
||||
{name: "index", dbType: integer},
|
||||
{name: "address", dbType: varchar},
|
||||
{name: "storage_keys", dbType: varchar, isArray: true},
|
||||
},
|
||||
}
|
||||
|
||||
var TableReceipt = Table{
|
||||
"eth.receipt_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "tx_id", dbType: varchar},
|
||||
{name: "leaf_cid", dbType: text},
|
||||
{name: "contract", dbType: varchar},
|
||||
{name: "contract_hash", dbType: varchar},
|
||||
{name: "leaf_mh_key", dbType: text},
|
||||
{name: "post_state", dbType: varchar},
|
||||
{name: "post_status", dbType: integer},
|
||||
{name: "log_root", dbType: varchar},
|
||||
},
|
||||
}
|
||||
|
||||
var TableLog = Table{
|
||||
"eth.log_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "leaf_cid", dbType: text},
|
||||
{name: "leaf_mh_key", dbType: text},
|
||||
{name: "rct_id", dbType: varchar},
|
||||
{name: "address", dbType: varchar},
|
||||
{name: "index", dbType: integer},
|
||||
{name: "topic0", dbType: varchar},
|
||||
{name: "topic1", dbType: varchar},
|
||||
{name: "topic2", dbType: varchar},
|
||||
{name: "topic3", dbType: varchar},
|
||||
{name: "log_data", dbType: bytea},
|
||||
},
|
||||
}
|
||||
|
||||
var TableStateAccount = Table{
|
||||
"eth.state_accounts",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "header_id", dbType: varchar},
|
||||
{name: "state_path", dbType: bytea},
|
||||
{name: "balance", dbType: numeric},
|
||||
{name: "nonce", dbType: bigint},
|
||||
{name: "code_hash", dbType: bytea},
|
||||
{name: "storage_root", dbType: varchar},
|
||||
},
|
||||
}
|
||||
|
||||
var TableWatchedAddresses = Table{
|
||||
"eth_meta.watched_addresses",
|
||||
[]column{
|
||||
{name: "address", dbType: varchar},
|
||||
{name: "created_at", dbType: bigint},
|
||||
{name: "watched_at", dbType: bigint},
|
||||
{name: "last_filled_at", dbType: bigint},
|
||||
},
|
||||
}
|
104
statediff/indexer/database/file/types/table.go
Normal file
104
statediff/indexer/database/file/types/table.go
Normal file
@ -0,0 +1,104 @@
|
||||
// Copyright 2022 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package types
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/thoas/go-funk"
|
||||
)
|
||||
|
||||
type colType int
|
||||
|
||||
const (
|
||||
integer colType = iota
|
||||
boolean
|
||||
bigint
|
||||
numeric
|
||||
bytea
|
||||
varchar
|
||||
text
|
||||
)
|
||||
|
||||
type column struct {
|
||||
name string
|
||||
dbType colType
|
||||
isArray bool
|
||||
}
|
||||
type Table struct {
|
||||
Name string
|
||||
Columns []column
|
||||
}
|
||||
|
||||
func (tbl *Table) ToCsvRow(args ...interface{}) []string {
|
||||
var row []string
|
||||
for i, col := range tbl.Columns {
|
||||
value := col.dbType.formatter()(args[i])
|
||||
|
||||
if col.isArray {
|
||||
valueList := funk.Map(args[i], col.dbType.formatter()).([]string)
|
||||
value = fmt.Sprintf("{%s}", strings.Join(valueList, ","))
|
||||
}
|
||||
|
||||
row = append(row, value)
|
||||
}
|
||||
return row
|
||||
}
|
||||
|
||||
func (tbl *Table) VarcharColumns() []string {
|
||||
columns := funk.Filter(tbl.Columns, func(col column) bool {
|
||||
return col.dbType == varchar
|
||||
}).([]column)
|
||||
|
||||
columnNames := funk.Map(columns, func(col column) string {
|
||||
return col.name
|
||||
}).([]string)
|
||||
|
||||
return columnNames
|
||||
}
|
||||
|
||||
type colfmt = func(interface{}) string
|
||||
|
||||
func sprintf(f string) colfmt {
|
||||
return func(x interface{}) string { return fmt.Sprintf(f, x) }
|
||||
}
|
||||
|
||||
func (typ colType) formatter() colfmt {
|
||||
switch typ {
|
||||
case integer:
|
||||
return sprintf("%d")
|
||||
case boolean:
|
||||
return func(x interface{}) string {
|
||||
if x.(bool) {
|
||||
return "t"
|
||||
}
|
||||
return "f"
|
||||
}
|
||||
case bigint:
|
||||
return sprintf("%s")
|
||||
case numeric:
|
||||
return sprintf("%s")
|
||||
case bytea:
|
||||
return sprintf(`\x%x`)
|
||||
case varchar:
|
||||
return sprintf("%s")
|
||||
case text:
|
||||
return sprintf("%s")
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
@ -520,7 +520,8 @@ func TestPGXIndexer(t *testing.T) {
|
||||
WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id)
|
||||
AND state_cids.header_id = header_cids.block_hash
|
||||
AND header_cids.block_number = $1
|
||||
AND storage_cids.node_type != 3`
|
||||
AND storage_cids.node_type != 3
|
||||
ORDER BY storage_path`
|
||||
err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -513,7 +513,8 @@ func TestSQLXIndexer(t *testing.T) {
|
||||
WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id)
|
||||
AND state_cids.header_id = header_cids.block_hash
|
||||
AND header_cids.block_number = $1
|
||||
AND storage_cids.node_type != 3`
|
||||
AND storage_cids.node_type != 3
|
||||
ORDER BY storage_path`
|
||||
err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
Loading…
Reference in New Issue
Block a user