Add mode to write to CSV files in statediff file writer #249

Merged
nikugogoi merged 8 commits from ng-file-csv into v1.10.19-statediff-v4 on 2022-06-29 11:47:57 +00:00
10 changed files with 535 additions and 32 deletions
Showing only changes of commit 201770b74d

View File

@@ -212,7 +212,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
switch dbType {
case shared.FILE:
indexerConfig = file.Config{
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
OutputDir: ctx.GlobalString(utils.StateDiffFilePath.Name),
WatchedAddressesFilePath: ctx.GlobalString(utils.StateDiffWatchedAddressesFilePath.Name),
}
case shared.POSTGRES:

View File

@@ -21,9 +21,9 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)
// Config holds params for writing sql statements out to a file
// Config holds params for writing CSV files out to a directory
type Config struct {
FilePath string
OutputDir string
WatchedAddressesFilePath string
NodeInfo node.Info
}
@@ -35,7 +35,7 @@ func (c Config) Type() shared.DBType {
// TestConfig config for unit tests
var TestConfig = Config{
FilePath: "./statediffing_test_file.sql",
OutputDir: "./statediffing_test",
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
NodeInfo: node.Info{
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",

View File

@@ -0,0 +1,237 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package file
import (
"encoding/csv"
"fmt"
"os"
"path/filepath"
"strconv"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/types"
)
var (
Tables = []*types.Table{
&types.TableIPLDBlock,
&types.TableNodeInfo,
&types.TableHeader,
&types.TableStateNode,
&types.TableStorageNode,
&types.TableUncle,
&types.TableTransaction,
&types.TableAccessListElement,
&types.TableReceipt,
&types.TableLog,
&types.TableStateAccount,
}
)
type CSVWriter struct {
dir string // dir containing output files
writers fileWriters
}
type fileWriter struct {
*csv.Writer
}
// fileWriters wraps the file writers for each output table
type fileWriters map[string]fileWriter
func newFileWriter(path string) (ret fileWriter, err error) {
file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return
}
ret = fileWriter{csv.NewWriter(file)}
return
}
func (tx fileWriters) write(tbl *types.Table, args ...interface{}) error {
row := tbl.ToCsvRow(args...)
return tx[tbl.Name].Write(row)
}
func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) {
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
writers := fileWriters{}
for _, tbl := range tables {
w, err := newFileWriter(TableFile(dir, tbl.Name))
if err != nil {
return nil, err
}
writers[tbl.Name] = w
}
return writers, nil
}
func (tx fileWriters) flush() error {
for _, w := range tx {
w.Flush()
if err := w.Error(); err != nil {
return err
}
}
return nil
}
func NewCSVWriter(path string) (*CSVWriter, error) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, fmt.Errorf("unable to make MkdirAll for path: %s err: %s", path, err)
}
writers, err := makeFileWriters(path, Tables)
if err != nil {
return nil, err
}
csvWriter := &CSVWriter{
writers: writers,
dir: path,
}
return csvWriter, nil
}
// Flush sends a flush signal to the looping process
func (csw *CSVWriter) Flush() {
csw.writers.flush()
}
func TableFile(dir, name string) string { return filepath.Join(dir, name+".csv") }
// Close satisfies io.Closer
func (csw *CSVWriter) Close() error {
return csw.writers.flush()
}
func (csw *CSVWriter) upsertNode(node nodeinfo.Info) {
csw.writers.write(&types.TableNodeInfo, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)
csw.writers.flush()
}
func (csw *CSVWriter) upsertIPLD(ipld models.IPLDModel) {
csw.writers.write(&types.TableIPLDBlock, ipld.BlockNumber, ipld.Key, ipld.Data)
csw.writers.flush()
}
func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) {
csw.upsertIPLD(models.IPLDModel{
BlockNumber: blockNumber,
Key: key,
Data: value,
})
}
func (csw *CSVWriter) upsertIPLDNode(blockNumber string, i node.Node) {
csw.upsertIPLD(models.IPLDModel{
BlockNumber: blockNumber,
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
Data: i.RawData(),
})
}
func (csw *CSVWriter) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error) {
c, err := ipld.RawdataToCid(codec, raw, mh)
if err != nil {
return "", "", err
}
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
csw.upsertIPLD(models.IPLDModel{
BlockNumber: blockNumber,
Key: prefixedKey,
Data: raw,
})
return c.String(), prefixedKey, err
}
func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
csw.writers.write(&types.TableHeader, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.MhKey, 1, header.Coinbase)
csw.writers.flush()
indexerMetrics.blocks.Inc(1)
}
func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) {
csw.writers.write(&types.TableUncle, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
uncle.Reward, uncle.MhKey)
csw.writers.flush()
}
func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) {
csw.writers.write(&types.TableTransaction, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
csw.writers.flush()
indexerMetrics.transactions.Inc(1)
}
func (csw *CSVWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) {
csw.writers.write(&types.TableAccessListElement, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
csw.writers.flush()
indexerMetrics.accessListEntries.Inc(1)
}
func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
csw.writers.write(&types.TableReceipt, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
rct.PostState, rct.PostStatus, rct.LogRoot)
csw.writers.flush()
indexerMetrics.receipts.Inc(1)
}
func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
for _, l := range logs {
csw.writers.write(&types.TableLog, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3, l.Data)
indexerMetrics.logs.Inc(1)
}
csw.writers.flush()
}
func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
var stateKey string
if stateNode.StateKey != nullHash.String() {
stateKey = stateNode.StateKey
}
csw.writers.write(&types.TableStateNode, stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path,
stateNode.NodeType, true, stateNode.MhKey)
csw.writers.flush()
}
func (csw *CSVWriter) upsertStateAccount(stateAccount models.StateAccountModel) {
csw.writers.write(&types.TableStateAccount, stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
strconv.FormatUint(stateAccount.Nonce, 10), stateAccount.CodeHash, stateAccount.StorageRoot)
csw.writers.flush()
}
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
var storageKey string
if storageCID.StorageKey != nullHash.String() {
storageKey = storageCID.StorageKey
}
csw.writers.write(&types.TableStorageNode, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
csw.writers.flush()
}
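For orientation, a minimal usage sketch of the exported surface added in this file. The import path is assumed from the repository layout, the directory name is arbitrary, and the unexported upsert* methods are only driven by the indexer inside the same package.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
)

func main() {
	// NewCSVWriter creates the directory and opens one append-mode CSV file
	// per table listed in file.Tables.
	w, err := file.NewCSVWriter("./statediff_output")
	if err != nil {
		panic(err)
	}
	// Close flushes every per-table csv.Writer.
	defer w.Close()

	// Each table lands in <dir>/<table name>.csv, for example
	// ./statediff_output/eth.header_cids.csv
	for _, tbl := range file.Tables {
		fmt.Println(file.TableFile("./statediff_output", tbl.Name))
	}
}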

View File

@@ -47,7 +47,7 @@ import (
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
)
const defaultFilePath = "./statediff.sql"
const defaultOutputDir = "./statediff_output"
const defaultWatchedAddressesFilePath = "./statediff-watched-addresses.sql"
const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ('%s', '%d', '%d') ON CONFLICT (address) DO NOTHING;"
@@ -60,7 +60,7 @@ var (
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
type StateDiffIndexer struct {
fileWriter *SQLWriter
fileWriter *CSVWriter
chainConfig *params.ChainConfig
nodeID string
wg *sync.WaitGroup
@@ -71,18 +71,12 @@ type StateDiffIndexer struct {
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, config Config) (*StateDiffIndexer, error) {
filePath := config.FilePath
if filePath == "" {
filePath = defaultFilePath
outputDir := config.OutputDir
if outputDir == "" {
outputDir = defaultOutputDir
}
if _, err := os.Stat(filePath); !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("cannot create file, file (%s) already exists", filePath)
}
file, err := os.Create(filePath)
if err != nil {
return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err)
}
log.Info("Writing statediff SQL statements to file", "file", filePath)
log.Info("Writing statediff CSV files to directory", "file", outputDir)
watchedAddressesFilePath := config.WatchedAddressesFilePath
if watchedAddressesFilePath == "" {
@@ -90,9 +84,12 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, config Config) (*StateDiffIndexer, error) {
}
log.Info("Writing watched addresses SQL statements to file", "file", watchedAddressesFilePath)
w := NewSQLWriter(file)
w, err := NewCSVWriter(outputDir)
if err != nil {
return nil, err
}
wg := new(sync.WaitGroup)
w.Loop()
w.upsertNode(config.NodeInfo)
return &StateDiffIndexer{
fileWriter: w,
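A hedged sketch of constructing the indexer against the new CSV writer, mirroring the test setup later in this PR; the mainnet chain config is only a stand-in and the import path of the file package is assumed.

package main

import (
	"context"

	"github.com/ethereum/go-ethereum/params"
	"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
)

func main() {
	// NewStateDiffIndexer now resolves OutputDir (defaulting to
	// ./statediff_output), builds a CSVWriter for it, and writes the node
	// info row up front instead of streaming SQL statements to one file.
	ind, err := file.NewStateDiffIndexer(context.Background(), params.MainnetChainConfig, file.TestConfig)
	if err != nil {
		panic(err)
	}
	_ = ind
}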

View File

@@ -19,6 +19,7 @@ package file_test
import (
"context"
"errors"
"fmt"
"os"
"testing"
@@ -44,8 +45,8 @@ var (
func setupLegacy(t *testing.T) {
mockLegacyBlock = legacyData.MockBlock
legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.TestConfig.FilePath)
if _, err := os.Stat(file.TestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.TestConfig.OutputDir)
require.NoError(t, err)
}
ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.TestConfig)
@@ -81,11 +82,13 @@ func setupLegacy(t *testing.T) {
}
func dumpFileData(t *testing.T) {
sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath)
require.NoError(t, err)
pgCopyStatement := `COPY %s FROM '%s' CSV`
_, err = sqlxdb.Exec(string(sqlFileBytes))
require.NoError(t, err)
for _, tbl := range file.Tables {
stm := fmt.Sprintf(pgCopyStatement, tbl.Name, file.TableFile(file.TestConfig.OutputDir, tbl.Name))
_, err = sqlxdb.Exec(stm)
require.NoError(t, err)
}
}
func resetAndDumpWatchedAddressesFileData(t *testing.T) {
@@ -111,7 +114,7 @@ func resetDB(t *testing.T) {
func tearDown(t *testing.T) {
file.TearDownDB(t, sqlxdb)
err := os.Remove(file.TestConfig.FilePath)
err := os.RemoveAll(file.TestConfig.OutputDir)
require.NoError(t, err)
if err := os.Remove(file.TestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {

View File

@@ -183,8 +183,8 @@ func init() {
}
func setupIndexer(t *testing.T) {
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.TestConfig.FilePath)
if _, err := os.Stat(file.TestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.TestConfig.OutputDir)
require.NoError(t, err)
}

View File

@@ -81,8 +81,8 @@ func testPushBlockAndState(t *testing.T, block *types.Block, receipts types.Rece
}
func setup(t *testing.T, testBlock *types.Block, testReceipts types.Receipts) {
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.TestConfig.FilePath)
if _, err := os.Stat(file.TestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.TestConfig.OutputDir)
require.NoError(t, err)
}
ind, err := file.NewStateDiffIndexer(context.Background(), chainConf, file.TestConfig)
@@ -118,7 +118,7 @@ func setup(t *testing.T, testBlock *types.Block, testReceipts types.Receipts) {
}
func dumpData(t *testing.T) {
sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath)
sqlFileBytes, err := os.ReadFile(file.TestConfig.OutputDir)
require.NoError(t, err)
_, err = sqlxdb.Exec(string(sqlFileBytes))
@@ -127,7 +127,7 @@ func dumpData(t *testing.T) {
func tearDown(t *testing.T) {
file.TearDownDB(t, sqlxdb)
err := os.Remove(file.TestConfig.FilePath)
err := os.Remove(file.TestConfig.OutputDir)
require.NoError(t, err)
err = sqlxdb.Close()
require.NoError(t, err)

statediff/types/schema.go (new file, 174 lines)
View File

@@ -0,0 +1,174 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package types
var TableIPLDBlock = Table{
`public.blocks`,
[]column{
{name: "block_number", typ: bigint},
{name: "key", typ: text},
{name: "data", typ: bytea},
},
}
var TableNodeInfo = Table{
Name: `public.nodes`,
Columns: []column{
{name: "genesis_block", typ: varchar},
{name: "network_id", typ: varchar},
{name: "node_id", typ: varchar},
{name: "client_name", typ: varchar},
{name: "chain_id", typ: integer},
},
}
var TableHeader = Table{
"eth.header_cids",
[]column{
{name: "block_number", typ: bigint},
{name: "block_hash", typ: varchar},
{name: "parent_hash", typ: varchar},
{name: "cid", typ: text},
{name: "td", typ: numeric},
{name: "node_id", typ: varchar},
{name: "reward", typ: numeric},
{name: "state_root", typ: varchar},
{name: "tx_root", typ: varchar},
{name: "receipt_root", typ: varchar},
{name: "uncle_root", typ: varchar},
{name: "bloom", typ: bytea},
{name: "timestamp", typ: numeric},
{name: "mh_key", typ: text},
{name: "times_validated", typ: integer},
{name: "coinbase", typ: varchar},
},
}
var TableStateNode = Table{
"eth.state_cids",
[]column{
{name: "block_number", typ: bigint},
{name: "header_id", typ: varchar},
{name: "state_leaf_key", typ: varchar},
{name: "cid", typ: text},
{name: "state_path", typ: bytea},
{name: "node_type", typ: integer},
{name: "diff", typ: boolean},
{name: "mh_key", typ: text},
},
}
var TableStorageNode = Table{
"eth.storage_cids",
[]column{
{name: "block_number", typ: bigint},
{name: "header_id", typ: varchar},
{name: "state_path", typ: bytea},
{name: "storage_leaf_key", typ: varchar},
{name: "cid", typ: text},
{name: "storage_path", typ: bytea},
{name: "node_type", typ: integer},
{name: "diff", typ: boolean},
{name: "mh_key", typ: text},
},
}
var TableUncle = Table{
"eth.uncle_cids",
[]column{
{name: "block_number", typ: bigint},
{name: "block_hash", typ: varchar},
{name: "header_id", typ: varchar},
{name: "parent_hash", typ: varchar},
{name: "cid", typ: text},
{name: "reward", typ: numeric},
{name: "mh_key", typ: text},
},
}
var TableTransaction = Table{
"eth.transaction_cids",
[]column{
{name: "block_number", typ: bigint},
{name: "header_id", typ: varchar},
{name: "tx_hash", typ: varchar},
{name: "cid", typ: text},
{name: "dst", typ: varchar},
{name: "src", typ: varchar},
{name: "index", typ: integer},
{name: "mh_key", typ: text},
{name: "tx_data", typ: bytea},
{name: "tx_type", typ: integer},
{name: "value", typ: numeric},
},
}
var TableAccessListElement = Table{
"eth.access_list_elements",
[]column{
{name: "block_number", typ: bigint},
{name: "tx_id", typ: varchar},
{name: "index", typ: integer},
{name: "address", typ: varchar},
{name: "storage_keys", typ: varchar, isArray: true},
},
}
var TableReceipt = Table{
"eth.receipt_cids",
[]column{
{name: "block_number", typ: bigint},
{name: "tx_id", typ: varchar},
{name: "leaf_cid", typ: text},
{name: "contract", typ: varchar},
{name: "contract_hash", typ: varchar},
{name: "leaf_mh_key", typ: text},
{name: "post_state", typ: varchar},
{name: "post_status", typ: integer},
{name: "log_root", typ: varchar},
},
}
var TableLog = Table{
"eth.log_cids",
[]column{
{name: "block_number", typ: bigint},
{name: "leaf_cid", typ: text},
{name: "leaf_mh_key", typ: text},
{name: "rct_id", typ: varchar},
{name: "address", typ: varchar},
{name: "index", typ: integer},
{name: "topic0", typ: varchar},
{name: "topic1", typ: varchar},
{name: "topic2", typ: varchar},
{name: "topic3", typ: varchar},
{name: "log_data", typ: bytea},
},
}
var TableStateAccount = Table{
"eth.state_account",
[]column{
{name: "block_number", typ: bigint},
{name: "header_id", typ: varchar},
{name: "state_path", typ: bytea},
{name: "balance", typ: numeric},
{name: "nonce", typ: bigint},
{name: "code_hash", typ: bytea},
{name: "storage_root", typ: varchar},
},
}

statediff/types/table.go (new file, 92 lines)
View File

@@ -0,0 +1,92 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package types
import (
"fmt"
"strings"
"github.com/thoas/go-funk"
)
type colType int
const (
integer colType = iota
boolean
bigint
numeric
bytea
varchar
text
)
type column struct {
name string
typ colType
isArray bool
}
type Table struct {
Name string
Columns []column
}
func (tbl *Table) ToCsvRow(args ...interface{}) []string {
var row []string
for i, col := range tbl.Columns {
value := col.typ.formatter()(args[i])
if col.isArray {
valueList := funk.Map(args[i], col.typ.formatter()).([]string)
value = fmt.Sprintf("{%s}", strings.Join(valueList, ","))
}
row = append(row, value)
}
return row
}
type colfmt = func(interface{}) string
func sprintf(f string) colfmt {
return func(x interface{}) string { return fmt.Sprintf(f, x) }
}
func (typ colType) formatter() colfmt {
switch typ {
case integer:
return sprintf("%d")
case boolean:
return func(x interface{}) string {
if x.(bool) {
return "t"
}
return "f"
}
case bigint:
return sprintf("%s")
case numeric:
return sprintf("%s")
case bytea:
return sprintf(`\x%x`)
case varchar:
return sprintf("%s")
case text:
return sprintf("%s")
}
panic("unreachable")
}
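To close the loop on the formatters, a small sketch of ToCsvRow applied to TableIPLDBlock; the argument values are hypothetical and the main wrapper is only for illustration.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/statediff/types"
)

func main() {
	// bigint, numeric, varchar and text columns pass through %s, bytea is
	// hex-escaped as \x..., booleans become "t"/"f", and array columns are
	// joined with commas and wrapped in braces.
	row := types.TableIPLDBlock.ToCsvRow("1", "blocks/key", []byte{0xde, 0xad})
	fmt.Println(row) // prints: [1 blocks/key \xdead]
}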