Concurrent statediff iteration #12

Merged
roysc merged 14 commits from use-concurrent-iterator into main 2023-09-22 08:44:38 +00:00
Showing only changes of commit 2b680598eb - Show all commits

View File

@ -57,7 +57,7 @@ type StateDiffIndexer struct {
chainConfig *params.ChainConfig chainConfig *params.ChainConfig
nodeID string nodeID string
wg *sync.WaitGroup wg *sync.WaitGroup
removedCacheFlag *uint32 removedCacheFlag uint32
} }
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
@ -130,7 +130,6 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts) // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback // Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
sdi.removedCacheFlag = new(uint32)
start, t := time.Now(), time.Now() start, t := time.Now(), time.Now()
blockHash := block.Hash() blockHash := block.Hash()
blockHashStr := blockHash.String() blockHashStr := blockHash.String()
@ -388,8 +387,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// publish the state node // publish the state node
var stateModel models.StateNodeModel var stateModel models.StateNodeModel
if stateNode.Removed { if stateNode.Removed {
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 { if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(sdi.removedCacheFlag, 1) atomic.StoreUint32(&sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{}) sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{})
} }
stateModel = models.StateNodeModel{ stateModel = models.StateNodeModel{
@ -419,8 +418,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// if there are any storage nodes associated with this node, publish and index them // if there are any storage nodes associated with this node, publish and index them
for _, storageNode := range stateNode.StorageDiff { for _, storageNode := range stateNode.StorageDiff {
if storageNode.Removed { if storageNode.Removed {
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 { if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(sdi.removedCacheFlag, 1) atomic.StoreUint32(&sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{}) sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{})
} }
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{