Merge pull request #154 from vulcanize/schema_updates
misc fixes/adjustments
This commit is contained in:
commit
898e64bd3e
@ -104,6 +104,13 @@ e.g.
|
||||
./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db.type=postgres --statediff.db.driver=sqlx --statediff.db.host=localhost --statediff.db.port=5432 --statediff.db.name=vulcanize_test --statediff.db.user=postgres --statediff.db.nodeid=nodeid --statediff.db.clientname=clientname
|
||||
`
|
||||
|
||||
When operating in `--statediff.db.type=file` mode, the service will write SQL statements out to the file designated by
|
||||
`--statediff.file.path`. Please note that it writes out SQL statements with all `ON CONFLICT` constraint checks dropped.
|
||||
This is done so that we can scale out the production of the SQL statements horizontally, merge the separate SQL files produced,
|
||||
de-duplicate using unix tools (`sort statediff.sql | uniq` or `sort -u statediff.sql`), bulk load using psql
|
||||
(`psql db_name --set ON_ERROR_STOP=on -f statediff.sql`), and then add our primary and foreign key constraints and indexes
|
||||
back afterwards.
|
||||
|
||||
### RPC endpoints
|
||||
The state diffing service exposes both a WS subscription endpoint, and a number of HTTP unary endpoints.
|
||||
|
||||
|
@ -167,7 +167,7 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (types2.State
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Writes a statediff object to output callback
|
||||
// WriteStateDiffObject writes a statediff object to output callback
|
||||
func (sdb *builder) WriteStateDiffObject(args types2.StateRoots, params Params, output types2.StateNodeSink, codeOutput types2.CodeSink) error {
|
||||
if !params.IntermediateStateNodes || len(params.WatchedAddresses) > 0 {
|
||||
// if we are watching only specific accounts then we are only diffing leaf nodes
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/dump"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
|
||||
@ -34,6 +35,7 @@ import (
|
||||
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (interfaces.StateDiffIndexer, error) {
|
||||
switch config.Type() {
|
||||
case shared.FILE:
|
||||
log.Info("Starting statediff service in SQL file writing mode")
|
||||
fc, ok := config.(file.Config)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
|
||||
@ -41,6 +43,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
|
||||
fc.NodeInfo = nodeInfo
|
||||
return file.NewStateDiffIndexer(ctx, chainConfig, fc)
|
||||
case shared.POSTGRES:
|
||||
log.Info("Starting statediff service in Postgres writing mode")
|
||||
pgc, ok := config.(postgres.Config)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
|
||||
@ -63,6 +66,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
|
||||
}
|
||||
return sql.NewStateDiffIndexer(ctx, chainConfig, postgres.NewPostgresDB(driver))
|
||||
case shared.DUMP:
|
||||
log.Info("Starting statediff service in data dump mode")
|
||||
dumpc, ok := config.(dump.Config)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("dump config is not the correct type: got %T, expected %T", config, dump.Config{})
|
||||
|
@ -72,9 +72,12 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err)
|
||||
}
|
||||
log.Info("Writing statediff SQL statements to file", "file", filePath)
|
||||
w := NewSQLWriter(file)
|
||||
wg := new(sync.WaitGroup)
|
||||
w.Loop()
|
||||
w.upsertNode(config.NodeInfo)
|
||||
w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{})
|
||||
return &StateDiffIndexer{
|
||||
writer: w,
|
||||
chainConfig: chainConfig,
|
||||
@ -130,11 +133,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
if err := sdi.writer.flush(); err != nil {
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
|
||||
log.Debug(traceMsg)
|
||||
return err
|
||||
}
|
||||
sdi.writer.Flush()
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tPostgresCommit.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
node "github.com/ipfs/go-ipld-format"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
@ -33,7 +32,8 @@ import (
|
||||
|
||||
var (
|
||||
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
|
||||
collatedStmtSize = 65336 // min(linuxPipeSize, macOSPipeSize)
|
||||
pipeSize = 65336 // min(linuxPipeSize, macOSPipeSize)
|
||||
collatedStmtSize = pipeSize * 16
|
||||
)
|
||||
|
||||
// SQLWriter writes sql statements to a file
|
||||
@ -43,6 +43,8 @@ type SQLWriter struct {
|
||||
collatedStmt []byte
|
||||
collationIndex int
|
||||
|
||||
flushChan chan struct{}
|
||||
flushFinished chan struct{}
|
||||
quitChan chan struct{}
|
||||
doneChan chan struct{}
|
||||
}
|
||||
@ -53,6 +55,8 @@ func NewSQLWriter(file *os.File) *SQLWriter {
|
||||
file: file,
|
||||
stmts: make(chan []byte),
|
||||
collatedStmt: make([]byte, collatedStmtSize),
|
||||
flushChan: make(chan struct{}),
|
||||
flushFinished: make(chan struct{}),
|
||||
quitChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}),
|
||||
}
|
||||
@ -72,16 +76,21 @@ func (sqw *SQLWriter) Loop() {
|
||||
l = len(stmt)
|
||||
if l+sqw.collationIndex+1 > collatedStmtSize {
|
||||
if err := sqw.flush(); err != nil {
|
||||
log.Error("error writing cached sql stmts to file", "err", err)
|
||||
panic(fmt.Sprintf("error writing sql stmts buffer to file: %v", err))
|
||||
}
|
||||
}
|
||||
copy(sqw.collatedStmt[sqw.collationIndex:sqw.collationIndex+l-1], stmt)
|
||||
copy(sqw.collatedStmt[sqw.collationIndex:sqw.collationIndex+l], stmt)
|
||||
sqw.collationIndex += l
|
||||
case <-sqw.quitChan:
|
||||
if err := sqw.flush(); err != nil {
|
||||
log.Error("error writing cached sql stmts to file", "err", err)
|
||||
panic(fmt.Sprintf("error writing sql stmts buffer to file: %v", err))
|
||||
}
|
||||
return
|
||||
case <-sqw.flushChan:
|
||||
if err := sqw.flush(); err != nil {
|
||||
panic(fmt.Sprintf("error writing sql stmts buffer to file: %v", err))
|
||||
}
|
||||
sqw.flushFinished <- struct{}{}
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -94,8 +103,14 @@ func (sqw *SQLWriter) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Flush sends a flush signal to the looping process
|
||||
func (sqw *SQLWriter) Flush() {
|
||||
sqw.flushChan <- struct{}{}
|
||||
<-sqw.flushFinished
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) flush() error {
|
||||
if _, err := sqw.file.Write(sqw.collatedStmt[0 : sqw.collationIndex-1]); err != nil {
|
||||
if _, err := sqw.file.Write(sqw.collatedStmt[0:sqw.collationIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
sqw.collationIndex = 0
|
||||
@ -103,46 +118,43 @@ func (sqw *SQLWriter) flush() error {
|
||||
}
|
||||
|
||||
const (
|
||||
nodeInsert = `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES (%s, %s, %s, %s, %d)
|
||||
ON CONFLICT (node_id) DO NOTHING;\n`
|
||||
nodeInsert = "INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES " +
|
||||
"('%s', '%s', '%s', '%s', %d);\n"
|
||||
|
||||
ipldInsert = `INSERT INTO public.blocks (key, data) VALUES (%s, %x) ON CONFLICT (key) DO NOTHING;\n`
|
||||
ipldInsert = "INSERT INTO public.blocks (key, data) VALUES ('%s', '%x');\n"
|
||||
|
||||
headerInsert = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, %d)
|
||||
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, %d);\n`
|
||||
headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " +
|
||||
"state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
|
||||
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%x', %d, '%s', %d, %d);\n"
|
||||
|
||||
headerInsertWithoutBaseFee = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, NULL)
|
||||
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, NULL);\n`
|
||||
headerInsertWithoutBaseFee = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, " +
|
||||
"reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
|
||||
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%x', %d, '%s', %d, NULL);\n"
|
||||
|
||||
uncleInsert = `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES (%s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (block_hash) DO NOTHING;\n`
|
||||
uncleInsert = "INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " +
|
||||
"('%s', '%s', '%s', '%s', '%s', '%s');\n"
|
||||
|
||||
txInsert = `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES (%s, %s, %s, %s, %s, %d, %s, %s, %d)
|
||||
ON CONFLICT (tx_hash) DO NOTHING;\n`
|
||||
txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) " +
|
||||
"VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '%x', %d);\n"
|
||||
|
||||
alInsert = `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES (%s, %d, %s, %s)
|
||||
ON CONFLICT (tx_id, index) DO NOTHING;\n`
|
||||
alInsert = "INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ('%s', %d, '%s', '%s');\n"
|
||||
|
||||
rctInsert = `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES (%s, %s, %s, %s, %s, %s, %d, %s)
|
||||
ON CONFLICT (tx_id) DO NOTHING;\n`
|
||||
rctInsert = "INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " +
|
||||
"post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n"
|
||||
|
||||
logInsert = `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES (%s, %s, %s, %s, %d, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (rct_id, index) DO NOTHING;\n`
|
||||
logInsert = "INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " +
|
||||
"topic3, log_data) VALUES ('%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '%x');\n"
|
||||
|
||||
stateInsert = `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES (%s, %s, %s, %s, %d, %t, %s)
|
||||
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = (%s, %s, %d, %t, %s);\n`
|
||||
stateInsert = "INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " +
|
||||
"VALUES ('%s', '%s', '%s', '%x', %d, %t, '%s');\n"
|
||||
|
||||
accountInsert = `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES (%s, %s, %s, %d, %s, %s)
|
||||
ON CONFLICT (header_id, state_path) DO NOTHING;\n`
|
||||
accountInsert = "INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) " +
|
||||
"VALUES ('%s', '%x', '%s', %d, '%x', '%s');\n"
|
||||
|
||||
storageInsert = `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES (%s, %s, %s, %s, %s, %d, %t, %s)
|
||||
ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (%s, %s, %d, %t, %s);\n`
|
||||
storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " +
|
||||
"node_type, diff, mh_key) VALUES ('%s', '%x', '%s', '%s', '%x', %d, %t, '%s');\n"
|
||||
)
|
||||
|
||||
// ON CONFLICT (node_id) DO UPDATE SET genesis_block = %s, network_id = %s, client_name = %s, chain_id = %s;\n`
|
||||
|
||||
func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(nodeInsert, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID))
|
||||
}
|
||||
@ -183,15 +195,11 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
|
||||
if header.BaseFee == nil {
|
||||
stmt = fmt.Sprintf(headerInsertWithoutBaseFee, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
|
||||
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
|
||||
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1,
|
||||
header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot,
|
||||
header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey)
|
||||
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1)
|
||||
} else {
|
||||
stmt = fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
|
||||
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
|
||||
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee,
|
||||
header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot,
|
||||
header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, header.BaseFee)
|
||||
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee)
|
||||
}
|
||||
sqw.stmts <- []byte(stmt)
|
||||
indexerMetrics.blocks.Inc(1)
|
||||
@ -228,8 +236,8 @@ func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) {
|
||||
if stateNode.StateKey != nullHash.String() {
|
||||
stateKey = stateNode.StateKey
|
||||
}
|
||||
sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType,
|
||||
true, stateNode.MhKey, stateKey, stateNode.CID, stateNode.NodeType, true, stateNode.MhKey))
|
||||
sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path,
|
||||
stateNode.NodeType, true, stateNode.MhKey))
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertStateAccount(stateAccount models.StateAccountModel) {
|
||||
@ -243,6 +251,5 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
||||
storageKey = storageCID.StorageKey
|
||||
}
|
||||
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
|
||||
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey, storageKey, storageCID.CID, storageCID.NodeType,
|
||||
true, storageCID.MhKey))
|
||||
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ const (
|
||||
txCIDsPgStr string = `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7), unnest($8), unnest($9))
|
||||
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data, tx_type) = (excluded.cid, excluded.dst, excluded.src, excluded.index, excluded.mh_key, excluded.tx_data, excluded.tx_type)
|
||||
RETURNING id`
|
||||
accessListPgStr string = `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES (unnest($1), unnest($2), unnest($3), unnest($4))
|
||||
accessListPgStr string = `INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES (unnest($1), unnest($2), unnest($3), unnest($4))
|
||||
ON CONFLICT (tx_id, index) DO UPDATE SET (address, storage_keys) = (excluded.address, excluded.storage_keys)`
|
||||
rctCIDsPgStr string = `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7), unnest($8))
|
||||
ON CONFLICT (tx_id) DO UPDATE SET (leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) = (excluded.leaf_cid, excluded.contract, excluded.contract_hash, excluded.leaf_mh_key, excluded.post_state, excluded.post_status, excluded.log_root)
|
||||
@ -138,7 +138,7 @@ func (pbw *PostgresBatchWriter) upsertTransactionCID(tx *sqlx.Tx, transaction mo
|
||||
}
|
||||
|
||||
func (pbw *PostgresBatchWriter) upsertAccessListElement(tx *sqlx.Tx, accessListElement models.AccessListElementModel, txID int64) error {
|
||||
_, err := tx.Exec(`INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
|
||||
_, err := tx.Exec(`INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (tx_id, index) DO UPDATE SET (address, storage_keys) = ($3, $4)`,
|
||||
txID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
|
||||
if err != nil {
|
||||
|
@ -264,7 +264,7 @@ func TestPGXIndexer(t *testing.T) {
|
||||
t.Fatalf("expected AccessListTxType (1), got %d", txType)
|
||||
}
|
||||
accessListElementModels := make([]models.AccessListElementModel, 0)
|
||||
pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_element.index ASC`
|
||||
pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC`
|
||||
err = db.Select(context.Background(), &accessListElementModels, pgStr, c)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -49,9 +49,9 @@ func ResolveDriverType(str string) (DriverType, error) {
|
||||
var DefaultConfig = Config{
|
||||
Hostname: "localhost",
|
||||
Port: 5432,
|
||||
DatabaseName: "vulcanize_testing",
|
||||
DatabaseName: "vulcanize_test",
|
||||
Username: "postgres",
|
||||
Password: "password",
|
||||
Password: "",
|
||||
}
|
||||
|
||||
// Config holds params for a Postgres db
|
||||
|
@ -56,7 +56,7 @@ func (db *DB) InsertTxStm() string {
|
||||
|
||||
// InsertAccessListElementStm satisfies the sql.Statements interface
|
||||
func (db *DB) InsertAccessListElementStm() string {
|
||||
return `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
|
||||
return `INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (tx_id, index) DO NOTHING`
|
||||
}
|
||||
|
||||
|
@ -286,7 +286,7 @@ func TestSQLXIndexer(t *testing.T) {
|
||||
t.Fatalf("expected AccessListTxType (1), got %d", txType)
|
||||
}
|
||||
accessListElementModels := make([]models.AccessListElementModel, 0)
|
||||
pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_element.index ASC`
|
||||
pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC`
|
||||
err = db.Select(context.Background(), &accessListElementModels, pgStr, c)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -53,7 +53,7 @@ func TearDownDB(t *testing.T, db Database) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = tx.Exec(ctx, `DELETE FROM eth.access_list_element`)
|
||||
_, err = tx.Exec(ctx, `DELETE FROM eth.access_list_elements`)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -83,7 +83,7 @@ func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error
|
||||
}
|
||||
|
||||
/*
|
||||
INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
|
||||
INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (tx_id, index) DO NOTHING
|
||||
*/
|
||||
func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error {
|
||||
|
@ -239,7 +239,6 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
||||
statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||
chainEventFwd <- chainEvent
|
||||
case err := <-errCh:
|
||||
println("here")
|
||||
log.Error("Error from chain event subscription", "error", err)
|
||||
close(sds.QuitChan)
|
||||
log.Info("Quitting the statediffing writing loop")
|
||||
|
Loading…
Reference in New Issue
Block a user