Backport fixes for issue #367 to v4. #394
| @ -504,6 +504,12 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // HasBlock checks whether the indicated block already exists in the output.
 | ||||||
|  | // In the "dump" case, this is presumed to be false.
 | ||||||
|  | func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { | ||||||
|  | 	return false, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // Close satisfies io.Closer
 | // Close satisfies io.Closer
 | ||||||
| func (sdi *StateDiffIndexer) Close() error { | func (sdi *StateDiffIndexer) Close() error { | ||||||
| 	return sdi.dump.Close() | 	return sdi.dump.Close() | ||||||
|  | |||||||
| @ -542,6 +542,12 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // HasBlock checks whether the indicated block already exists in the output.
 | ||||||
|  | // In the "file" case this is presumed to be false.
 | ||||||
|  | func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { | ||||||
|  | 	return false, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // Close satisfies io.Closer
 | // Close satisfies io.Closer
 | ||||||
| func (sdi *StateDiffIndexer) Close() error { | func (sdi *StateDiffIndexer) Close() error { | ||||||
| 	return sdi.fileWriter.Close() | 	return sdi.fileWriter.Close() | ||||||
|  | |||||||
| @ -227,6 +227,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip | |||||||
| 	return blockTx, err | 	return blockTx, err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // HasBlock checks whether the indicated block already exists in the database.
 | ||||||
|  | func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { | ||||||
|  | 	return sdi.dbWriter.hasHeader(hash, number) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // processHeader publishes and indexes a header IPLD in Postgres
 | // processHeader publishes and indexes a header IPLD in Postgres
 | ||||||
| // it returns the headerID
 | // it returns the headerID
 | ||||||
| func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) { | func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) { | ||||||
|  | |||||||
| @ -45,6 +45,7 @@ type Driver interface { | |||||||
| 
 | 
 | ||||||
| // Statements interface to accommodate different SQL query syntax
 | // Statements interface to accommodate different SQL query syntax
 | ||||||
| type Statements interface { | type Statements interface { | ||||||
|  | 	ExistsHeaderStm() string | ||||||
| 	InsertHeaderStm() string | 	InsertHeaderStm() string | ||||||
| 	InsertUncleStm() string | 	InsertUncleStm() string | ||||||
| 	InsertTxStm() string | 	InsertTxStm() string | ||||||
|  | |||||||
| @ -84,13 +84,13 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { | |||||||
| 	for _, item := range tx.cache { | 	for _, item := range tx.cache { | ||||||
| 		switch item := item.(type) { | 		switch item := item.(type) { | ||||||
| 		case *copyFrom: | 		case *copyFrom: | ||||||
| 			_, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows) | 			_, err = base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				log.Error("COPY error", "table", item.tableName, "err", err) | 				log.Error("COPY error", "table", item.tableName, "err", err) | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
| 		case cachedStmt: | 		case cachedStmt: | ||||||
| 			_, err := base.Exec(ctx, item.sql, item.args...) | 			_, err = base.Exec(ctx, item.sql, item.args...) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
|  | |||||||
| @ -16,7 +16,9 @@ | |||||||
| 
 | 
 | ||||||
| package postgres | package postgres | ||||||
| 
 | 
 | ||||||
| import "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" | import ( | ||||||
|  | 	"github.com/ethereum/go-ethereum/statediff/indexer/database/sql" | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| var _ sql.Database = &DB{} | var _ sql.Database = &DB{} | ||||||
| 
 | 
 | ||||||
| @ -36,6 +38,11 @@ type DB struct { | |||||||
| 	sql.Driver | 	sql.Driver | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // ExistsHeaderStm satisfies the sql.Statements interface
 | ||||||
|  | func (db *DB) ExistsHeaderStm() string { | ||||||
|  | 	return "SELECT EXISTS(SELECT 1 from eth.header_cids WHERE block_number = $1 AND block_hash = $2 LIMIT 1)" | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // InsertHeaderStm satisfies the sql.Statements interface
 | // InsertHeaderStm satisfies the sql.Statements interface
 | ||||||
| // Stm == Statement
 | // Stm == Statement
 | ||||||
| func (db *DB) InsertHeaderStm() string { | func (db *DB) InsertHeaderStm() string { | ||||||
|  | |||||||
| @ -50,6 +50,11 @@ func (w *Writer) Close() error { | |||||||
| 	return w.db.Close() | 	return w.db.Close() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) { | ||||||
|  | 	err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), blockNumber, blockHash.String()).Scan(&exists) | ||||||
|  | 	return exists, err | ||||||
|  | } | ||||||
|  | 
 | ||||||
| /* | /* | ||||||
| INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) | INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) | ||||||
| VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) | VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) | ||||||
|  | |||||||
| @ -29,6 +29,7 @@ import ( | |||||||
| 
 | 
 | ||||||
| // StateDiffIndexer interface required to index statediff data
 | // StateDiffIndexer interface required to index statediff data
 | ||||||
| type StateDiffIndexer interface { | type StateDiffIndexer interface { | ||||||
|  | 	HasBlock(hash common.Hash, number uint64) (bool, error) | ||||||
| 	PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) | 	PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) | ||||||
| 	PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error | 	PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error | ||||||
| 	PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error | 	PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error | ||||||
|  | |||||||
| @ -39,18 +39,19 @@ func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) { | |||||||
| 	return start, logger | 	return start, logger | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func countStateDiffEnd(start time.Time, logger log.Logger, err error) time.Duration { | func countStateDiffEnd(start time.Time, logger log.Logger, err *error) time.Duration { | ||||||
| 	duration := time.Since(start) | 	duration := time.Since(start) | ||||||
| 	defaultStatediffMetrics.underway.Dec(1) | 	defaultStatediffMetrics.underway.Dec(1) | ||||||
| 	if nil == err { | 	failed := nil != err && nil != *err | ||||||
| 		defaultStatediffMetrics.succeeded.Inc(1) | 	if failed { | ||||||
| 	} else { |  | ||||||
| 		defaultStatediffMetrics.failed.Inc(1) | 		defaultStatediffMetrics.failed.Inc(1) | ||||||
|  | 	} else { | ||||||
|  | 		defaultStatediffMetrics.succeeded.Inc(1) | ||||||
| 	} | 	} | ||||||
| 	defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds()) | 	defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds()) | ||||||
| 
 | 
 | ||||||
| 	logger.Debug(fmt.Sprintf("writeStateDiff END (duration=%dms, err=%t) [underway=%d, succeeded=%d, failed=%d, total_time=%dms]", | 	logger.Debug(fmt.Sprintf("writeStateDiff END (duration=%dms, err=%t) [underway=%d, succeeded=%d, failed=%d, total_time=%dms]", | ||||||
| 		duration.Milliseconds(), nil != err, | 		duration.Milliseconds(), failed, | ||||||
| 		defaultStatediffMetrics.underway.Count(), | 		defaultStatediffMetrics.underway.Count(), | ||||||
| 		defaultStatediffMetrics.succeeded.Count(), | 		defaultStatediffMetrics.succeeded.Count(), | ||||||
| 		defaultStatediffMetrics.failed.Count(), | 		defaultStatediffMetrics.failed.Count(), | ||||||
|  | |||||||
| @ -151,8 +151,10 @@ type Service struct { | |||||||
| 	// Job ID ticker
 | 	// Job ID ticker
 | ||||||
| 	lastJobID uint64 | 	lastJobID uint64 | ||||||
| 	// In flight jobs (for WriteStateDiffAt)
 | 	// In flight jobs (for WriteStateDiffAt)
 | ||||||
| 	currentJobs      map[uint64]JobID | 	currentJobs        map[uint64]JobID | ||||||
| 	currentJobsMutex sync.Mutex | 	currentJobsMutex   sync.Mutex | ||||||
|  | 	currentBlocks      map[string]bool | ||||||
|  | 	currentBlocksMutex sync.Mutex | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // IDs used for tracking in-progress jobs (0 for invalid)
 | // IDs used for tracking in-progress jobs (0 for invalid)
 | ||||||
| @ -214,21 +216,24 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	sds := &Service{ | 	sds := &Service{ | ||||||
| 		Mutex:             sync.Mutex{}, | 		Mutex:              sync.Mutex{}, | ||||||
| 		BlockChain:        blockChain, | 		BlockChain:         blockChain, | ||||||
| 		Builder:           NewBuilder(blockChain.StateCache()), | 		Builder:            NewBuilder(blockChain.StateCache()), | ||||||
| 		QuitChan:          quitCh, | 		QuitChan:           quitCh, | ||||||
| 		Subscriptions:     make(map[common.Hash]map[rpc.ID]Subscription), | 		Subscriptions:      make(map[common.Hash]map[rpc.ID]Subscription), | ||||||
| 		SubscriptionTypes: make(map[common.Hash]Params), | 		SubscriptionTypes:  make(map[common.Hash]Params), | ||||||
| 		BlockCache:        NewBlockCache(workers), | 		BlockCache:         NewBlockCache(workers), | ||||||
| 		BackendAPI:        backend, | 		BackendAPI:         backend, | ||||||
| 		WaitForSync:       params.WaitForSync, | 		WaitForSync:        params.WaitForSync, | ||||||
| 		indexer:           indexer, | 		indexer:            indexer, | ||||||
| 		enableWriteLoop:   params.EnableWriteLoop, | 		enableWriteLoop:    params.EnableWriteLoop, | ||||||
| 		numWorkers:        workers, | 		numWorkers:         workers, | ||||||
| 		maxRetry:          defaultRetryLimit, | 		maxRetry:           defaultRetryLimit, | ||||||
| 		jobStatusSubs:     map[rpc.ID]statusSubscription{}, | 		jobStatusSubs:      map[rpc.ID]statusSubscription{}, | ||||||
| 		currentJobs:       map[uint64]JobID{}, | 		currentJobs:        map[uint64]JobID{}, | ||||||
|  | 		currentJobsMutex:   sync.Mutex{}, | ||||||
|  | 		currentBlocks:      map[string]bool{}, | ||||||
|  | 		currentBlocksMutex: sync.Mutex{}, | ||||||
| 	} | 	} | ||||||
| 	stack.RegisterLifecycle(sds) | 	stack.RegisterLifecycle(sds) | ||||||
| 	stack.RegisterAPIs(sds.APIs()) | 	stack.RegisterAPIs(sds.APIs()) | ||||||
| @ -251,21 +256,24 @@ func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, index | |||||||
| 
 | 
 | ||||||
| 	quitCh := make(chan bool) | 	quitCh := make(chan bool) | ||||||
| 	sds := &Service{ | 	sds := &Service{ | ||||||
| 		Mutex:             sync.Mutex{}, | 		Mutex:              sync.Mutex{}, | ||||||
| 		BlockChain:        blockChain, | 		BlockChain:         blockChain, | ||||||
| 		Builder:           NewBuilder(blockChain.StateCache()), | 		Builder:            NewBuilder(blockChain.StateCache()), | ||||||
| 		QuitChan:          quitCh, | 		QuitChan:           quitCh, | ||||||
| 		Subscriptions:     make(map[common.Hash]map[rpc.ID]Subscription), | 		Subscriptions:      make(map[common.Hash]map[rpc.ID]Subscription), | ||||||
| 		SubscriptionTypes: make(map[common.Hash]Params), | 		SubscriptionTypes:  make(map[common.Hash]Params), | ||||||
| 		BlockCache:        NewBlockCache(workers), | 		BlockCache:         NewBlockCache(workers), | ||||||
| 		BackendAPI:        backend, | 		BackendAPI:         backend, | ||||||
| 		WaitForSync:       cfg.WaitForSync, | 		WaitForSync:        cfg.WaitForSync, | ||||||
| 		indexer:           indexer, | 		indexer:            indexer, | ||||||
| 		enableWriteLoop:   cfg.EnableWriteLoop, | 		enableWriteLoop:    cfg.EnableWriteLoop, | ||||||
| 		numWorkers:        workers, | 		numWorkers:         workers, | ||||||
| 		maxRetry:          defaultRetryLimit, | 		maxRetry:           defaultRetryLimit, | ||||||
| 		jobStatusSubs:     map[rpc.ID]statusSubscription{}, | 		jobStatusSubs:      map[rpc.ID]statusSubscription{}, | ||||||
| 		currentJobs:       map[uint64]JobID{}, | 		currentJobs:        map[uint64]JobID{}, | ||||||
|  | 		currentJobsMutex:   sync.Mutex{}, | ||||||
|  | 		currentBlocks:      map[string]bool{}, | ||||||
|  | 		currentBlocksMutex: sync.Mutex{}, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if indexer != nil { | 	if indexer != nil { | ||||||
| @ -877,14 +885,44 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro | |||||||
| 	return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params) | 	return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // Claim exclusive access for state diffing the specified block.
 | ||||||
|  | // Returns true and a function to release access if successful, else false, nil.
 | ||||||
|  | func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) { | ||||||
|  | 	sds.currentBlocksMutex.Lock() | ||||||
|  | 	defer sds.currentBlocksMutex.Unlock() | ||||||
|  | 
 | ||||||
|  | 	key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64()) | ||||||
|  | 	if sds.currentBlocks[key] { | ||||||
|  | 		return false, nil | ||||||
|  | 	} | ||||||
|  | 	sds.currentBlocks[key] = true | ||||||
|  | 	return true, func() { | ||||||
|  | 		sds.currentBlocksMutex.Lock() | ||||||
|  | 		defer sds.currentBlocksMutex.Unlock() | ||||||
|  | 		delete(sds.currentBlocks, key) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // Writes a state diff from the current block, parent state root, and provided params
 | // Writes a state diff from the current block, parent state root, and provided params
 | ||||||
| func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { | func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { | ||||||
|  | 	if granted, relinquish := sds.claimExclusiveAccess(block); granted { | ||||||
|  | 		defer relinquish() | ||||||
|  | 	} else { | ||||||
|  | 		log.Info("Not writing, statediff in progress.", "number", block.NumberU64(), "hash", block.Hash().Hex()) | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if done, _ := sds.indexer.HasBlock(block.Hash(), block.NumberU64()); done { | ||||||
|  | 		log.Info("Not writing, statediff already done.", "number", block.NumberU64(), "hash", block.Hash().Hex()) | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	var totalDifficulty *big.Int | 	var totalDifficulty *big.Int | ||||||
| 	var receipts types.Receipts | 	var receipts types.Receipts | ||||||
| 	var err error | 	var err error | ||||||
| 	var tx interfaces.Batch | 	var tx interfaces.Batch | ||||||
| 	start, logger := countStateDiffBegin(block) | 	start, logger := countStateDiffBegin(block) | ||||||
| 	defer countStateDiffEnd(start, logger, err) | 	defer countStateDiffEnd(start, logger, &err) | ||||||
| 	if params.IncludeTD { | 	if params.IncludeTD { | ||||||
| 		totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64()) | 		totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64()) | ||||||
| 	} | 	} | ||||||
| @ -916,7 +954,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p | |||||||
| 		BlockNumber:  block.Number(), | 		BlockNumber:  block.Number(), | ||||||
| 	}, params, output, codeOutput) | 	}, params, output, codeOutput) | ||||||
| 	// TODO this anti-pattern needs to be sorted out eventually
 | 	// TODO this anti-pattern needs to be sorted out eventually
 | ||||||
| 	if err := tx.Submit(err); err != nil { | 	if err = tx.Submit(err); err != nil { | ||||||
| 		return fmt.Errorf("batch transaction submission failed: %w", err) | 		return fmt.Errorf("batch transaction submission failed: %w", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -34,6 +34,10 @@ type StateDiffIndexer struct{} | |||||||
| 
 | 
 | ||||||
| type batch struct{} | type batch struct{} | ||||||
| 
 | 
 | ||||||
|  | func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { | ||||||
|  | 	return false, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { | func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { | ||||||
| 	return &batch{}, nil | 	return &batch{}, nil | ||||||
| } | } | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user