Port: Check if the statediff is already in progress or completed.
This commit is contained in:
parent
b6fc15dd81
commit
8fa061ded5
@ -413,6 +413,12 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasBlock checks whether the indicated block already exists in the output.
|
||||||
|
// In the "dump" case, this is presumed to be false.
|
||||||
|
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close satisfies io.Closer
|
// Close satisfies io.Closer
|
||||||
func (sdi *StateDiffIndexer) Close() error {
|
func (sdi *StateDiffIndexer) Close() error {
|
||||||
return sdi.dump.Close()
|
return sdi.dump.Close()
|
||||||
|
@ -459,6 +459,12 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasBlock checks whether the indicated block already exists in the output.
|
||||||
|
// In the "file" case this is presumed to be false.
|
||||||
|
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close satisfies io.Closer
|
// Close satisfies io.Closer
|
||||||
func (sdi *StateDiffIndexer) Close() error {
|
func (sdi *StateDiffIndexer) Close() error {
|
||||||
return sdi.fileWriter.Close()
|
return sdi.fileWriter.Close()
|
||||||
|
@ -25,7 +25,7 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
core "github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/metrics"
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
@ -248,7 +248,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
|
|||||||
}
|
}
|
||||||
|
|
||||||
// processUncles publishes and indexes uncle IPLDs in Postgres
|
// processUncles publishes and indexes uncle IPLDs in Postgres
|
||||||
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, unclesHash core.Hash, uncles []*types.Header) error {
|
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, unclesHash common.Hash, uncles []*types.Header) error {
|
||||||
// publish and index uncles
|
// publish and index uncles
|
||||||
uncleEncoding, err := rlp.EncodeToBytes(uncles)
|
uncleEncoding, err := rlp.EncodeToBytes(uncles)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -349,7 +349,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
|
|||||||
if len(receipt.PostState) == 0 {
|
if len(receipt.PostState) == 0 {
|
||||||
rctModel.PostStatus = receipt.Status
|
rctModel.PostStatus = receipt.Status
|
||||||
} else {
|
} else {
|
||||||
rctModel.PostState = core.BytesToHash(receipt.PostState).String()
|
rctModel.PostState = common.BytesToHash(receipt.PostState).String()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel); err != nil {
|
if err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel); err != nil {
|
||||||
@ -400,7 +400,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
|||||||
stateModel = models.StateNodeModel{
|
stateModel = models.StateNodeModel{
|
||||||
BlockNumber: tx.BlockNumber,
|
BlockNumber: tx.BlockNumber,
|
||||||
HeaderID: headerID,
|
HeaderID: headerID,
|
||||||
StateKey: core.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||||
CID: shared.RemovedNodeStateCID,
|
CID: shared.RemovedNodeStateCID,
|
||||||
Removed: true,
|
Removed: true,
|
||||||
}
|
}
|
||||||
@ -408,12 +408,12 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
|||||||
stateModel = models.StateNodeModel{
|
stateModel = models.StateNodeModel{
|
||||||
BlockNumber: tx.BlockNumber,
|
BlockNumber: tx.BlockNumber,
|
||||||
HeaderID: headerID,
|
HeaderID: headerID,
|
||||||
StateKey: core.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||||
CID: stateNode.AccountWrapper.CID,
|
CID: stateNode.AccountWrapper.CID,
|
||||||
Removed: false,
|
Removed: false,
|
||||||
Balance: stateNode.AccountWrapper.Account.Balance.String(),
|
Balance: stateNode.AccountWrapper.Account.Balance.String(),
|
||||||
Nonce: stateNode.AccountWrapper.Account.Nonce,
|
Nonce: stateNode.AccountWrapper.Account.Nonce,
|
||||||
CodeHash: core.BytesToHash(stateNode.AccountWrapper.Account.CodeHash).String(),
|
CodeHash: common.BytesToHash(stateNode.AccountWrapper.Account.CodeHash).String(),
|
||||||
StorageRoot: stateNode.AccountWrapper.Account.Root.String(),
|
StorageRoot: stateNode.AccountWrapper.Account.Root.String(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -430,8 +430,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
|||||||
storageModel := models.StorageNodeModel{
|
storageModel := models.StorageNodeModel{
|
||||||
BlockNumber: tx.BlockNumber,
|
BlockNumber: tx.BlockNumber,
|
||||||
HeaderID: headerID,
|
HeaderID: headerID,
|
||||||
StateKey: core.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||||
StorageKey: core.BytesToHash(storageNode.LeafKey).String(),
|
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||||
CID: shared.RemovedNodeStorageCID,
|
CID: shared.RemovedNodeStorageCID,
|
||||||
Removed: true,
|
Removed: true,
|
||||||
Value: []byte{},
|
Value: []byte{},
|
||||||
@ -444,8 +444,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
|||||||
storageModel := models.StorageNodeModel{
|
storageModel := models.StorageNodeModel{
|
||||||
BlockNumber: tx.BlockNumber,
|
BlockNumber: tx.BlockNumber,
|
||||||
HeaderID: headerID,
|
HeaderID: headerID,
|
||||||
StateKey: core.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||||
StorageKey: core.BytesToHash(storageNode.LeafKey).String(),
|
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||||
CID: storageNode.CID,
|
CID: storageNode.CID,
|
||||||
Removed: false,
|
Removed: false,
|
||||||
Value: storageNode.Value,
|
Value: storageNode.Value,
|
||||||
@ -468,6 +468,11 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasBlock checks whether the indicated block already exists in the database.
|
||||||
|
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
||||||
|
return sdi.dbWriter.hasHeader(hash, number)
|
||||||
|
}
|
||||||
|
|
||||||
// Close satisfies io.Closer
|
// Close satisfies io.Closer
|
||||||
func (sdi *StateDiffIndexer) Close() error {
|
func (sdi *StateDiffIndexer) Close() error {
|
||||||
return sdi.dbWriter.Close()
|
return sdi.dbWriter.Close()
|
||||||
@ -476,7 +481,7 @@ func (sdi *StateDiffIndexer) Close() error {
|
|||||||
// Update the known gaps table with the gap information.
|
// Update the known gaps table with the gap information.
|
||||||
|
|
||||||
// LoadWatchedAddresses reads watched addresses from the database
|
// LoadWatchedAddresses reads watched addresses from the database
|
||||||
func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]core.Address, error) {
|
func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
|
||||||
addressStrings := make([]string, 0)
|
addressStrings := make([]string, 0)
|
||||||
pgStr := "SELECT address FROM eth_meta.watched_addresses"
|
pgStr := "SELECT address FROM eth_meta.watched_addresses"
|
||||||
err := sdi.dbWriter.db.Select(sdi.ctx, &addressStrings, pgStr)
|
err := sdi.dbWriter.db.Select(sdi.ctx, &addressStrings, pgStr)
|
||||||
@ -484,9 +489,9 @@ func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]core.Address, error) {
|
|||||||
return nil, fmt.Errorf("error loading watched addresses: %v", err)
|
return nil, fmt.Errorf("error loading watched addresses: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
watchedAddresses := []core.Address{}
|
watchedAddresses := []common.Address{}
|
||||||
for _, addressString := range addressStrings {
|
for _, addressString := range addressStrings {
|
||||||
watchedAddresses = append(watchedAddresses, core.HexToAddress(addressString))
|
watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString))
|
||||||
}
|
}
|
||||||
|
|
||||||
return watchedAddresses, nil
|
return watchedAddresses, nil
|
||||||
|
@ -45,6 +45,7 @@ type Driver interface {
|
|||||||
|
|
||||||
// Statements interface to accommodate different SQL query syntax
|
// Statements interface to accommodate different SQL query syntax
|
||||||
type Statements interface {
|
type Statements interface {
|
||||||
|
ExistsHeaderStm() string
|
||||||
InsertHeaderStm() string
|
InsertHeaderStm() string
|
||||||
InsertUncleStm() string
|
InsertUncleStm() string
|
||||||
InsertTxStm() string
|
InsertTxStm() string
|
||||||
|
@ -17,6 +17,8 @@
|
|||||||
package postgres
|
package postgres
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
|
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
|
||||||
"github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
|
"github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
|
||||||
)
|
)
|
||||||
@ -39,6 +41,10 @@ type DB struct {
|
|||||||
sql.Driver
|
sql.Driver
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) ExistsHeaderStm() string {
|
||||||
|
return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1 AND block_hash = $2 LIMIT 1)", schema.TableHeader.Name)
|
||||||
|
}
|
||||||
|
|
||||||
// InsertHeaderStm satisfies the sql.Statements interface
|
// InsertHeaderStm satisfies the sql.Statements interface
|
||||||
// Stm == Statement
|
// Stm == Statement
|
||||||
func (db *DB) InsertHeaderStm() string {
|
func (db *DB) InsertHeaderStm() string {
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/jackc/pgtype"
|
"github.com/jackc/pgtype"
|
||||||
shopspring "github.com/jackc/pgtype/ext/shopspring-numeric"
|
shopspring "github.com/jackc/pgtype/ext/shopspring-numeric"
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
@ -46,6 +47,11 @@ func (w *Writer) Close() error {
|
|||||||
return w.db.Close()
|
return w.db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) {
|
||||||
|
err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), blockNumber, blockHash.String()).Scan(&exists)
|
||||||
|
return exists, err
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase)
|
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
|
|
||||||
// StateDiffIndexer interface required to index statediff data
|
// StateDiffIndexer interface required to index statediff data
|
||||||
type StateDiffIndexer interface {
|
type StateDiffIndexer interface {
|
||||||
|
HasBlock(hash common.Hash, number uint64) (bool, error)
|
||||||
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
|
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
|
||||||
PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
|
PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
|
||||||
PushIPLD(tx Batch, ipld sdtypes.IPLD) error
|
PushIPLD(tx Batch, ipld sdtypes.IPLD) error
|
||||||
|
@ -23,9 +23,8 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) {
|
func countStateDiffBegin(block *types.Block, logger log.Logger) time.Time {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
logger := log.New("hash", block.Hash().String(), "number", block.NumberU64())
|
|
||||||
|
|
||||||
defaultStatediffMetrics.underway.Inc(1)
|
defaultStatediffMetrics.underway.Inc(1)
|
||||||
logger.Debug("writeStateDiff BEGIN",
|
logger.Debug("writeStateDiff BEGIN",
|
||||||
@ -35,7 +34,7 @@ func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) {
|
|||||||
"total_time", defaultStatediffMetrics.totalProcessingTime.Value(),
|
"total_time", defaultStatediffMetrics.totalProcessingTime.Value(),
|
||||||
)
|
)
|
||||||
|
|
||||||
return start, logger
|
return start
|
||||||
}
|
}
|
||||||
|
|
||||||
func countStateDiffEnd(start time.Time, logger log.Logger, err *error) time.Duration {
|
func countStateDiffEnd(start time.Time, logger log.Logger, err *error) time.Duration {
|
||||||
|
43
service.go
43
service.go
@ -107,6 +107,9 @@ type Service struct {
|
|||||||
// Map of block number to in-flight jobs (for WriteStateDiffAt)
|
// Map of block number to in-flight jobs (for WriteStateDiffAt)
|
||||||
currentJobs map[uint64]JobID
|
currentJobs map[uint64]JobID
|
||||||
currentJobsMutex sync.Mutex
|
currentJobsMutex sync.Mutex
|
||||||
|
// All in-progress statediff jobs
|
||||||
|
currentBlocks map[string]bool
|
||||||
|
currentBlocksMutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// ID for identifying client subscriptions
|
// ID for identifying client subscriptions
|
||||||
@ -169,6 +172,7 @@ func NewService(cfg Config, blockChain BlockChain, backend plugeth.Backend, inde
|
|||||||
maxRetry: defaultRetryLimit,
|
maxRetry: defaultRetryLimit,
|
||||||
jobStatusSubs: map[SubID]jobStatusSubscription{},
|
jobStatusSubs: map[SubID]jobStatusSubscription{},
|
||||||
currentJobs: map[uint64]JobID{},
|
currentJobs: map[uint64]JobID{},
|
||||||
|
currentBlocks: map[string]bool{},
|
||||||
writeLoopParams: ParamsWithMutex{Params: defaultWriteLoopParams},
|
writeLoopParams: ParamsWithMutex{Params: defaultWriteLoopParams},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -674,14 +678,45 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
|
|||||||
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, sds.maybeReplaceWatchedAddresses(params))
|
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, sds.maybeReplaceWatchedAddresses(params))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Claim exclusive access for state diffing the specified block.
|
||||||
|
// Returns true and a function to release access if successful, else false, nil.
|
||||||
|
func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
|
||||||
|
sds.currentBlocksMutex.Lock()
|
||||||
|
defer sds.currentBlocksMutex.Unlock()
|
||||||
|
|
||||||
|
key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64())
|
||||||
|
if sds.currentBlocks[key] {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
sds.currentBlocks[key] = true
|
||||||
|
return true, func() {
|
||||||
|
sds.currentBlocksMutex.Lock()
|
||||||
|
defer sds.currentBlocksMutex.Unlock()
|
||||||
|
delete(sds.currentBlocks, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Writes a state diff from the current block, parent state root, and provided params
|
// Writes a state diff from the current block, parent state root, and provided params
|
||||||
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
|
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
|
||||||
|
log := log.New("hash", block.Hash(), "number", block.Number())
|
||||||
|
if granted, relinquish := sds.claimExclusiveAccess(block); granted {
|
||||||
|
defer relinquish()
|
||||||
|
} else {
|
||||||
|
log.Info("Not writing, statediff in progress.")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if done, _ := sds.indexer.HasBlock(block.Hash(), block.NumberU64()); done {
|
||||||
|
log.Info("Not writing, statediff already done.")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
var totalDifficulty *big.Int
|
var totalDifficulty *big.Int
|
||||||
var receipts types.Receipts
|
var receipts types.Receipts
|
||||||
var err error
|
var err error
|
||||||
var tx interfaces.Batch
|
var tx interfaces.Batch
|
||||||
start, logger := countStateDiffBegin(block)
|
|
||||||
defer countStateDiffEnd(start, logger, &err)
|
start := countStateDiffBegin(block, log)
|
||||||
|
defer countStateDiffEnd(start, log, &err)
|
||||||
if sds.indexer == nil {
|
if sds.indexer == nil {
|
||||||
return fmt.Errorf("indexer is not set; cannot write indexed diffs")
|
return fmt.Errorf("indexer is not set; cannot write indexed diffs")
|
||||||
}
|
}
|
||||||
@ -698,12 +733,12 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
|||||||
}
|
}
|
||||||
|
|
||||||
output := func(node types2.StateLeafNode) error {
|
output := func(node types2.StateLeafNode) error {
|
||||||
defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), logger,
|
defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), log,
|
||||||
metrics.IndexerMetrics.OutputTimer)
|
metrics.IndexerMetrics.OutputTimer)
|
||||||
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
|
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
|
||||||
}
|
}
|
||||||
ipldOutput := func(c types2.IPLD) error {
|
ipldOutput := func(c types2.IPLD) error {
|
||||||
defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), logger,
|
defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), log,
|
||||||
metrics.IndexerMetrics.IPLDOutputTimer)
|
metrics.IndexerMetrics.IPLDOutputTimer)
|
||||||
return sds.indexer.PushIPLD(tx, c)
|
return sds.indexer.PushIPLD(tx, c)
|
||||||
}
|
}
|
||||||
|
@ -35,6 +35,10 @@ type StateDiffIndexer struct{}
|
|||||||
|
|
||||||
type batch struct{}
|
type batch struct{}
|
||||||
|
|
||||||
|
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
|
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
|
||||||
return &batch{}, nil
|
return &batch{}, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user