additional logging to resync service; rename state/storage key to leaf key and allow insert of null values for intermediate nodes
This commit is contained in:
parent
e570a2c9d4
commit
834e2bcff4
@ -66,7 +66,7 @@ func init() {
|
||||
resyncCmd.PersistentFlags().Int("resync-stop", 0, "block height to stop resync")
|
||||
resyncCmd.PersistentFlags().Int("resync-batch-size", 0, "data fetching batch size")
|
||||
resyncCmd.PersistentFlags().Int("resync-batch-number", 0, "how many goroutines to fetch data concurrently")
|
||||
resyncCmd.PersistentFlags().Bool("resync-clear-old", false, "if true, clear out old data of the provided type within the resync range before resyncing")
|
||||
resyncCmd.PersistentFlags().Bool("resync-clear-old-cache", false, "if true, clear out old data of the provided type within the resync range before resyncing")
|
||||
|
||||
resyncCmd.PersistentFlags().String("btc-http-path", "", "http url for bitcoin node")
|
||||
resyncCmd.PersistentFlags().String("btc-password", "", "password for btc node")
|
||||
@ -87,7 +87,7 @@ func init() {
|
||||
viper.BindPFlag("resync.stop", resyncCmd.PersistentFlags().Lookup("resync-stop"))
|
||||
viper.BindPFlag("resync.batchSize", resyncCmd.PersistentFlags().Lookup("resync-batch-size"))
|
||||
viper.BindPFlag("resync.batchNumber", resyncCmd.PersistentFlags().Lookup("resync-batch-number"))
|
||||
viper.BindPFlag("resync.clearOldCache", resyncCmd.PersistentFlags().Lookup("resync-clear-old"))
|
||||
viper.BindPFlag("resync.clearOldCache", resyncCmd.PersistentFlags().Lookup("resync-clear-old-cache"))
|
||||
|
||||
viper.BindPFlag("bitcoin.httpPath", resyncCmd.PersistentFlags().Lookup("btc-http-path"))
|
||||
viper.BindPFlag("bitcoin.pass", resyncCmd.PersistentFlags().Lookup("btc-password"))
|
||||
|
13
db/migrations/00031_rename_to_leaf_key.sql
Normal file
13
db/migrations/00031_rename_to_leaf_key.sql
Normal file
@ -0,0 +1,13 @@
|
||||
-- +goose Up
|
||||
ALTER TABLE eth.state_cids
|
||||
RENAME COLUMN state_key TO state_leaf_key;
|
||||
|
||||
ALTER TABLE eth.storage_cids
|
||||
RENAME COLUMN storage_key TO storage_leaf_key;
|
||||
|
||||
-- +goose Down
|
||||
ALTER TABLE eth.storage_cids
|
||||
RENAME COLUMN storage_leaf_key TO storage_key;
|
||||
|
||||
ALTER TABLE eth.state_cids
|
||||
RENAME COLUMN state_leaf_key TO state_key;
|
@ -420,7 +420,7 @@ ALTER SEQUENCE eth.state_accounts_id_seq OWNED BY eth.state_accounts.id;
|
||||
CREATE TABLE eth.state_cids (
|
||||
id integer NOT NULL,
|
||||
header_id integer NOT NULL,
|
||||
state_key character varying(66),
|
||||
state_leaf_key character varying(66),
|
||||
cid text NOT NULL,
|
||||
state_path bytea,
|
||||
node_type integer
|
||||
@ -454,7 +454,7 @@ ALTER SEQUENCE eth.state_cids_id_seq OWNED BY eth.state_cids.id;
|
||||
CREATE TABLE eth.storage_cids (
|
||||
id integer NOT NULL,
|
||||
state_id integer NOT NULL,
|
||||
storage_key character varying(66),
|
||||
storage_leaf_key character varying(66),
|
||||
cid text NOT NULL,
|
||||
storage_path bytea,
|
||||
node_type integer
|
||||
|
@ -30,6 +30,10 @@ import (
|
||||
"github.com/vulcanize/vulcanizedb/pkg/postgres"
|
||||
)
|
||||
|
||||
var (
|
||||
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
|
||||
)
|
||||
|
||||
// Indexer satisfies the Indexer interface for ethereum
|
||||
type CIDIndexer struct {
|
||||
db *postgres.DB
|
||||
@ -133,10 +137,14 @@ func (in *CIDIndexer) indexReceiptCID(tx *sqlx.Tx, cidMeta ReceiptModel, txID in
|
||||
func (in *CIDIndexer) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error {
|
||||
for _, stateCID := range payload.StateNodeCIDs {
|
||||
var stateID int64
|
||||
err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_key, cid, state_path, node_type) VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_key, cid, node_type) = ($2, $3, $5)
|
||||
var stateKey string
|
||||
if stateCID.StateKey != nullHash.String() {
|
||||
stateKey = stateCID.StateKey
|
||||
}
|
||||
err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type) VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type) = ($2, $3, $5)
|
||||
RETURNING id`,
|
||||
headerID, stateCID.StateKey, stateCID.CID, stateCID.Path, stateCID.NodeType).Scan(&stateID)
|
||||
headerID, stateKey, stateCID.CID, stateCID.Path, stateCID.NodeType).Scan(&stateID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -166,8 +174,12 @@ func (in *CIDIndexer) indexStateAccount(tx *sqlx.Tx, stateAccount StateAccountMo
|
||||
}
|
||||
|
||||
func (in *CIDIndexer) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeModel, stateID int64) error {
|
||||
_, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_key, cid, storage_path, node_type) VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_key, cid, node_type) = ($2, $3, $5)`,
|
||||
stateID, storageCID.StorageKey, storageCID.CID, storageCID.Path, storageCID.NodeType)
|
||||
var storageKey string
|
||||
if storageCID.StorageKey != nullHash.String() {
|
||||
storageKey = storageCID.StorageKey
|
||||
}
|
||||
_, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type) VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type) = ($2, $3, $5)`,
|
||||
stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType)
|
||||
return err
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ var _ = Describe("Indexer", func() {
|
||||
Expect(shared.ListContainsString(rcts, mocks.Rct2CID.String())).To(BeTrue())
|
||||
// check that state nodes were properly indexed
|
||||
stateNodes := make([]eth.StateNodeModel, 0)
|
||||
pgStr = `SELECT state_cids.cid, state_cids.state_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
|
||||
pgStr = `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
|
||||
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
|
||||
WHERE header_cids.block_number = $1`
|
||||
err = db.Select(&stateNodes, pgStr, 1)
|
||||
@ -104,7 +104,7 @@ var _ = Describe("Indexer", func() {
|
||||
}
|
||||
// check that storage nodes were properly indexed
|
||||
storageNodes := make([]eth.StorageNodeWithStateKeyModel, 0)
|
||||
pgStr = `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key, storage_cids.node_type, storage_cids.storage_path
|
||||
pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
|
||||
FROM eth.storage_cids, eth.state_cids, eth.header_cids
|
||||
WHERE storage_cids.state_id = state_cids.id
|
||||
AND state_cids.header_id = header_cids.id
|
||||
|
@ -74,7 +74,7 @@ type StateNodeModel struct {
|
||||
ID int64 `db:"id"`
|
||||
HeaderID int64 `db:"header_id"`
|
||||
Path []byte `db:"state_path"`
|
||||
StateKey string `db:"state_key"`
|
||||
StateKey string `db:"state_leaf_key"`
|
||||
NodeType int `db:"node_type"`
|
||||
CID string `db:"cid"`
|
||||
}
|
||||
@ -84,7 +84,7 @@ type StorageNodeModel struct {
|
||||
ID int64 `db:"id"`
|
||||
StateID int64 `db:"state_id"`
|
||||
Path []byte `db:"storage_path"`
|
||||
StorageKey string `db:"storage_key"`
|
||||
StorageKey string `db:"storage_leaf_key"`
|
||||
NodeType int `db:"node_type"`
|
||||
CID string `db:"cid"`
|
||||
}
|
||||
@ -94,8 +94,8 @@ type StorageNodeWithStateKeyModel struct {
|
||||
ID int64 `db:"id"`
|
||||
StateID int64 `db:"state_id"`
|
||||
Path []byte `db:"storage_path"`
|
||||
StateKey string `db:"state_key"`
|
||||
StorageKey string `db:"storage_key"`
|
||||
StateKey string `db:"state_leaf_key"`
|
||||
StorageKey string `db:"storage_leaf_key"`
|
||||
NodeType int `db:"node_type"`
|
||||
CID string `db:"cid"`
|
||||
}
|
||||
|
@ -391,7 +391,7 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter,
|
||||
log.Debug("retrieving state cids for header id ", headerID)
|
||||
args := make([]interface{}, 0, 2)
|
||||
pgStr := `SELECT state_cids.id, state_cids.header_id,
|
||||
state_cids.state_key, state_cids.node_type, state_cids.cid, state_cids.state_path
|
||||
state_cids.state_leaf_key, state_cids.node_type, state_cids.cid, state_cids.state_path
|
||||
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
|
||||
WHERE header_cids.id = $1`
|
||||
args = append(args, headerID)
|
||||
@ -401,7 +401,7 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter,
|
||||
for i, addr := range stateFilter.Addresses {
|
||||
keys[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()).String()
|
||||
}
|
||||
pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])`
|
||||
pgStr += ` AND state_cids.state_leaf_key = ANY($2::VARCHAR(66)[])`
|
||||
args = append(args, pq.Array(keys))
|
||||
}
|
||||
if !stateFilter.IntermediateNodes {
|
||||
@ -415,8 +415,8 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter,
|
||||
func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]StorageNodeWithStateKeyModel, error) {
|
||||
log.Debug("retrieving storage cids for header id ", headerID)
|
||||
args := make([]interface{}, 0, 3)
|
||||
pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_key,
|
||||
storage_cids.node_type, storage_cids.cid, storage_cids.storage_path, state_cids.state_key
|
||||
pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key,
|
||||
storage_cids.node_type, storage_cids.cid, storage_cids.storage_path, state_cids.state_leaf_key
|
||||
FROM eth.storage_cids, eth.state_cids, eth.header_cids
|
||||
WHERE storage_cids.state_id = state_cids.id
|
||||
AND state_cids.header_id = header_cids.id
|
||||
@ -429,12 +429,12 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageF
|
||||
for i, addr := range storageFilter.Addresses {
|
||||
keys[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()).String()
|
||||
}
|
||||
pgStr += fmt.Sprintf(` AND state_cids.state_key = ANY($%d::VARCHAR(66)[])`, id)
|
||||
pgStr += fmt.Sprintf(` AND state_cids.state_leaf_key = ANY($%d::VARCHAR(66)[])`, id)
|
||||
args = append(args, pq.Array(keys))
|
||||
id++
|
||||
}
|
||||
if len(storageFilter.StorageKeys) > 0 {
|
||||
pgStr += fmt.Sprintf(` AND storage_cids.storage_key = ANY($%d::VARCHAR(66)[])`, id)
|
||||
pgStr += fmt.Sprintf(` AND storage_cids.storage_leaf_key = ANY($%d::VARCHAR(66)[])`, id)
|
||||
args = append(args, pq.Array(storageFilter.StorageKeys))
|
||||
}
|
||||
if !storageFilter.IntermediateNodes {
|
||||
|
@ -113,6 +113,7 @@ func NewResyncService(settings *Config) (Resync, error) {
|
||||
|
||||
func (rs *Service) Resync() error {
|
||||
if rs.clearOldCache {
|
||||
logrus.Infof("cleaning out old data from Postgres")
|
||||
if err := rs.Cleaner.Clean(rs.ranges, rs.data); err != nil {
|
||||
return fmt.Errorf("%s %s data resync cleaning error: %v", rs.chain.String(), rs.data.String(), err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user