go-ethereum/statediff/service.go
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package statediff

import (
	"bytes"
	"fmt"
	"math/big"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/state"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/eth"
	"github.com/ethereum/go-ethereum/eth/ethconfig"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/internal/ethapi"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/node"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/ethereum/go-ethereum/rpc"
	ind "github.com/ethereum/go-ethereum/statediff/indexer"
	"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
	"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
	nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
	"github.com/ethereum/go-ethereum/statediff/indexer/shared"
	types2 "github.com/ethereum/go-ethereum/statediff/types"
	"github.com/ethereum/go-ethereum/trie"
	"github.com/thoas/go-funk"
)

const (
	chainEventChanSize  = 20000
	genesisBlockNumber  = 0
	defaultRetryLimit   = 3                   // default retry limit once deadlock is detected.
	deadlockDetected    = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html
	typeAssertionFailed = "type assertion failed"
	unexpectedOperation = "unexpected operation"
)

var writeLoopParams = ParamsWithMutex{
	Params: Params{
		IntermediateStateNodes:   true,
		IntermediateStorageNodes: true,
		IncludeBlock:             true,
		IncludeReceipts:          true,
		IncludeTD:                true,
		IncludeCode:              true,
	},
}

var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry)

type blockChain interface {
	SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
	CurrentBlock() *types.Block
	GetBlockByHash(hash common.Hash) *types.Block
	GetBlockByNumber(number uint64) *types.Block
	GetReceiptsByHash(hash common.Hash) types.Receipts
	GetTd(hash common.Hash, number uint64) *big.Int
	UnlockTrie(root common.Hash)
	StateCache() state.Database
}

// IService is the state-diffing service interface
type IService interface {
	// Lifecycle Start() and Stop() methods
	node.Lifecycle
	// APIs method for getting API(s) for this service
	APIs() []rpc.API
	// Loop is the main event loop for processing state diffs
	Loop(chainEventCh chan core.ChainEvent)
	// Subscribe method to subscribe to receive state diff processing output
	Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params)
	// Unsubscribe method to unsubscribe from state diff processing
	Unsubscribe(id rpc.ID) error
	// StateDiffAt method to get state diff object at a specific block number
	StateDiffAt(blockNumber uint64, params Params) (*Payload, error)
	// StateDiffFor method to get state diff object for a specific block hash
	StateDiffFor(blockHash common.Hash, params Params) (*Payload, error)
	// StateTrieAt method to get state trie object at a specific block
	StateTrieAt(blockNumber uint64, params Params) (*Payload, error)
	// StreamCodeAndCodeHash method to stream out all code and codehash pairs
	StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool)
	// WriteStateDiffAt method to write state diff object directly to DB
	WriteStateDiffAt(blockNumber uint64, params Params) error
	// WriteStateDiffFor method to write state diff object directly to DB
	WriteStateDiffFor(blockHash common.Hash, params Params) error
	// WriteLoop event loop for progressively processing and writing diffs directly to DB
	WriteLoop(chainEventCh chan core.ChainEvent)
	// WatchAddress method to change the addresses being watched in write loop params
	WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error
}
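
// For illustration, a minimal client-side sketch of consuming this service
// over RPC. rpc.Dial and Client.Subscribe are standard go-ethereum client
// calls; the IPC path and the "stream" subscription method name are
// assumptions for the sketch, and a context import is implied:
//
//	cli, err := rpc.Dial("/path/to/geth.ipc") // hypothetical endpoint
//	if err != nil {
//		log.Crit("dial failed", "err", err)
//	}
//	payloadCh := make(chan Payload)
//	sub, err := cli.Subscribe(context.Background(), APIName, payloadCh, "stream", Params{IncludeBlock: true})
//	if err != nil {
//		log.Crit("subscribe failed", "err", err)
//	}
//	defer sub.Unsubscribe()
//	for payload := range payloadCh {
//		// decode payload.StateObjectRlp with rlp.DecodeBytes
//	}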

// Service is the underlying struct for the state diffing service
type Service struct {
	// Used to sync access to the Subscriptions
	sync.Mutex
	// Used to build the state diff objects
	Builder Builder
	// Used to subscribe to chain events (blocks)
	BlockChain blockChain
	// Used to signal shutdown of the service
	QuitChan chan bool
	// A mapping of rpc.IDs to their subscription channels, mapped to their subscription type (hash of the Params rlp)
	Subscriptions map[common.Hash]map[rpc.ID]Subscription
	// A mapping of subscription params rlp hash to the corresponding subscription params
	SubscriptionTypes map[common.Hash]Params
	// Cache the last block so that we can avoid having to look up the next block's parent
	BlockCache BlockCache
	// The public backend API which provides useful information about the current state
	BackendAPI ethapi.Backend
	// Should the statediff service wait for geth to sync to head?
	WaitForSync bool
	// Used to signal if we should check for KnownGaps
	KnownGaps KnownGapsState
	// Whether or not we have any subscribers; only if we do, do we process state diffs
	subscribers int32
	// Interface for publishing statediffs as PG-IPLD objects
	indexer interfaces.StateDiffIndexer
	// Whether to enable writing state diffs directly to track blockchain head.
	enableWriteLoop bool
	// Size of the worker pool
	numWorkers uint
	// Number of retries for transactions aborted due to deadlock.
	maxRetry uint
}

// BlockCache caches the last block for safe access from different service loops
type BlockCache struct {
	sync.Mutex
	blocks  map[common.Hash]*types.Block
	maxSize uint
}

// NewBlockCache returns a BlockCache that holds at most max blocks
func NewBlockCache(max uint) BlockCache {
	return BlockCache{
		blocks:  make(map[common.Hash]*types.Block),
		maxSize: max,
	}
}

// New creates a new statediff.Service
// func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error {
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error {
	blockChain := ethServ.BlockChain()
	var indexer interfaces.StateDiffIndexer
	var db sql.Database
	var err error
	quitCh := make(chan bool)
	indexerConfigAvailable := params.IndexerConfig != nil
	if indexerConfigAvailable {
		info := nodeinfo.Info{
			GenesisBlock: blockChain.Genesis().Hash().Hex(),
			NetworkID:    strconv.FormatUint(cfg.NetworkId, 10),
			ChainID:      blockChain.Config().ChainID.Uint64(),
			ID:           params.ID,
			ClientName:   params.ClientName,
		}
		var err error
		db, indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig)
		if err != nil {
			return err
		}
		indexer.ReportDBMetrics(10*time.Second, quitCh)
	}
	workers := params.NumWorkers
	if workers == 0 {
		workers = 1
	}
	// If we ever have multiple processingKeys we can update them here
	// along with the expectedDifference
	knownGaps := &KnownGapsState{
		processingKey:          0,
		expectedDifference:     big.NewInt(1),
		errorState:             false,
		writeFilePath:          params.KnownGapsFilePath,
		db:                     db,
		statediffMetrics:       statediffMetrics,
		sqlFileWaitingForWrite: false,
	}
	if indexerConfigAvailable {
		if params.IndexerConfig.Type() == shared.POSTGRES {
			knownGaps.checkForGaps = true
		} else {
			log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
			knownGaps.checkForGaps = false
		}
	}
	sds := &Service{
		Mutex:             sync.Mutex{},
		BlockChain:        blockChain,
		Builder:           NewBuilder(blockChain.StateCache()),
		QuitChan:          quitCh,
		Subscriptions:     make(map[common.Hash]map[rpc.ID]Subscription),
		SubscriptionTypes: make(map[common.Hash]Params),
		BlockCache:        NewBlockCache(workers),
		BackendAPI:        backend,
		WaitForSync:       params.WaitForSync,
		KnownGaps:         *knownGaps,
		indexer:           indexer,
		enableWriteLoop:   params.EnableWriteLoop,
		numWorkers:        workers,
		maxRetry:          defaultRetryLimit,
	}
	stack.RegisterLifecycle(sds)
	stack.RegisterAPIs(sds.APIs())
	if indexerConfigAvailable {
		err = loadWatchedAddresses(indexer)
		if err != nil {
			return err
		}
	}
	return nil
}
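
// A hedged sketch of wiring the service into a geth node at startup; the
// surrounding stack/eth/backend setup is assumed, and the Config literal
// only uses fields that New actually reads:
//
//	cfg := Config{
//		IndexerConfig:   nil, // set to a concrete indexer config to enable direct indexing
//		NumWorkers:      4,
//		WaitForSync:     true,
//		EnableWriteLoop: false,
//	}
//	if err := New(stack, ethServ, &ethConfig, cfg, backend); err != nil {
//		log.Crit("failed to register statediff service", "err", err)
//	}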

// Protocols exports the service's p2p protocols; this service has none
func (sds *Service) Protocols() []p2p.Protocol {
	return []p2p.Protocol{}
}

// APIs returns the RPC descriptors the statediff.Service offers
func (sds *Service) APIs() []rpc.API {
	return []rpc.API{
		{
			Namespace: APIName,
			Version:   APIVersion,
			Service:   NewPublicStateDiffAPI(sds),
			Public:    true,
		},
	}
}

// Return the parent block of currentBlock, using the cached block if available;
// and cache the passed block
func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc blockChain) *types.Block {
	lbc.Lock()
	parentHash := currentBlock.ParentHash()
	var parentBlock *types.Block
	if block, ok := lbc.blocks[parentHash]; ok {
		parentBlock = block
		if len(lbc.blocks) > int(lbc.maxSize) {
			delete(lbc.blocks, parentHash)
		}
	} else {
		parentBlock = bc.GetBlockByHash(parentHash)
	}
	lbc.blocks[currentBlock.Hash()] = currentBlock
	lbc.Unlock()
	return parentBlock
}

type workerParams struct {
	chainEventCh <-chan core.ChainEvent
	wg           *sync.WaitGroup
	id           uint
}

// WriteLoop is the event loop for progressively processing and writing diffs directly to the DB
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
	chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
	defer chainEventSub.Unsubscribe()
	errCh := chainEventSub.Err()
	var wg sync.WaitGroup
	// Process metrics for chain events, then forward to workers
	chainEventFwd := make(chan core.ChainEvent, chainEventChanSize)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case chainEvent := <-chainEventCh:
				statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64()))
				statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
				chainEventFwd <- chainEvent
			case err := <-errCh:
				log.Error("Error from chain event subscription", "error", err)
				close(sds.QuitChan)
				log.Info("Quitting the statediffing writing loop")
				if err := sds.indexer.Close(); err != nil {
					log.Error("Error closing indexer", "err", err)
				}
				return
			case <-sds.QuitChan:
				log.Info("Quitting the statediffing writing loop")
				if err := sds.indexer.Close(); err != nil {
					log.Error("Error closing indexer", "err", err)
				}
				return
			}
		}
	}()
	wg.Add(int(sds.numWorkers))
	for worker := uint(0); worker < sds.numWorkers; worker++ {
		params := workerParams{chainEventCh: chainEventFwd, wg: &wg, id: worker}
		go sds.writeLoopWorker(params)
	}
	wg.Wait()
}

func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
	// For genesis block we need to return the entire state trie hence we diff it with an empty trie.
	log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId)
	writeLoopParams.RLock()
	err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams.Params)
	writeLoopParams.RUnlock()
	if err != nil {
		log.Error("statediff.Service.WriteLoop: processing error", "block height",
			genesisBlockNumber, "error", err.Error(), "worker", workerId)
		return
	}
	statediffMetrics.lastStatediffHeight.Update(genesisBlockNumber)
}

func (sds *Service) writeLoopWorker(params workerParams) {
	defer params.wg.Done()
	for {
		select {
		// Receive chain events forwarded by WriteLoop
		case chainEvent := <-params.chainEventCh:
			log.Debug("WriteLoop(): chain event received", "event", chainEvent)
			currentBlock := chainEvent.Block
			parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain)
			if parentBlock == nil {
				log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
				continue
			}
			// chainEvent streams blocks from block 1, but we also need to include data from the genesis block.
			if parentBlock.Number().Uint64() == genesisBlockNumber {
				sds.writeGenesisStateDiff(parentBlock, params.id)
			}
			// If for any reason we need to check for gaps,
			// check and update the gaps table.
			if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState {
				log.Info("Checking for Gaps at", "current block", currentBlock.Number())
				go sds.KnownGaps.findAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey)
				sds.KnownGaps.checkForGaps = false
			}
			log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
			writeLoopParams.RLock()
			err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params)
			writeLoopParams.RUnlock()
			if err != nil {
				log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
				sds.KnownGaps.errorState = true
				log.Warn("Adding the following block to knownErrorBlocks to be inserted into the knownGaps table", "blockNumber", currentBlock.Number())
				sds.KnownGaps.knownErrorBlocks = append(sds.KnownGaps.knownErrorBlocks, currentBlock.Number())
				continue
			}
			sds.KnownGaps.errorState = false
			if sds.KnownGaps.knownErrorBlocks != nil {
				// We must pass in parameters by VALUE, not reference.
				// If we pass them in by reference, the references can change before the computation is complete!
				staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks))
				copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks)
				sds.KnownGaps.knownErrorBlocks = nil
				go sds.KnownGaps.captureErrorBlocks(staticKnownErrorBlocks)
			}
			if sds.KnownGaps.sqlFileWaitingForWrite {
				log.Info("There are entries in the SQL file for knownGaps that should be written")
				err := sds.KnownGaps.writeSqlFileStmtToDb()
				if err != nil {
					log.Error("Unable to write KnownGap sql file to DB")
				}
			}
			// TODO: how to handle with concurrent workers
			statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
		case <-sds.QuitChan:
			log.Info("Quitting the statediff writing process", "worker", params.id)
			return
		}
	}
}

// Loop is the main processing method
func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
	log.Info("Starting statediff listening loop")
	chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
	defer chainEventSub.Unsubscribe()
	errCh := chainEventSub.Err()
	for {
		select {
		// Receive events from the chain event channel
		case chainEvent := <-chainEventCh:
			statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh)))
			log.Debug("Loop(): chain event received", "event", chainEvent)
			// if we don't have any subscribers, do not process a statediff
			if atomic.LoadInt32(&sds.subscribers) == 0 {
				log.Debug("Currently no subscribers to the statediffing service; processing is halted")
				continue
			}
			currentBlock := chainEvent.Block
			parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain)
			if parentBlock == nil {
				log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
				continue
			}
			// chainEvent streams blocks from block 1, but we also need to include data from the genesis block.
			if parentBlock.Number().Uint64() == genesisBlockNumber {
				// For genesis block we need to return the entire state trie hence we diff it with an empty trie.
				sds.streamStateDiff(parentBlock, common.Hash{})
			}
			sds.streamStateDiff(currentBlock, parentBlock.Root())
		case err := <-errCh:
			log.Error("Error from chain event subscription", "error", err)
			close(sds.QuitChan)
			log.Info("Quitting the statediffing listening loop")
			sds.close()
			return
		case <-sds.QuitChan:
			log.Info("Quitting the statediffing listening loop")
			sds.close()
			return
		}
	}
}

// streamStateDiff method builds the state diff payload for each subscription according to their subscription type and sends them the result
func (sds *Service) streamStateDiff(currentBlock *types.Block, parentRoot common.Hash) {
	sds.Lock()
	for ty, subs := range sds.Subscriptions {
		params, ok := sds.SubscriptionTypes[ty]
		if !ok {
			log.Error("no parameter set associated with this subscription", "subscription type", ty.Hex())
			sds.closeType(ty)
			continue
		}
		// create payload for this subscription type
		payload, err := sds.processStateDiff(currentBlock, parentRoot, params)
		if err != nil {
			log.Error("statediff processing error", "block height", currentBlock.Number().Uint64(), "parameters", params, "error", err.Error())
			continue
		}
		for id, sub := range subs {
			select {
			case sub.PayloadChan <- *payload:
				log.Debug("sending statediff payload at head", "height", currentBlock.Number(), "subscription id", id)
			default:
				log.Info("unable to send statediff payload; channel has no receiver", "subscription id", id)
			}
		}
	}
	sds.Unlock()
}

// StateDiffAt returns a state diff object payload at the specific blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, error) {
	currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
	log.Info("sending state diff", "block height", blockNumber)
	// use watched addresses from statediffing write loop if not provided
	if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
		writeLoopParams.RLock()
		params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
		copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
		writeLoopParams.RUnlock()
	}
	// compute leaf paths of watched addresses in the params
	params.ComputeWatchedAddressesLeafPaths()
	if blockNumber == 0 {
		return sds.processStateDiff(currentBlock, common.Hash{}, params)
	}
	parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
	return sds.processStateDiff(currentBlock, parentBlock.Root(), params)
}

// StateDiffFor returns a state diff object payload for the specific blockhash
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload, error) {
	currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
	log.Info("sending state diff", "block hash", blockHash)
	// use watched addresses from statediffing write loop if not provided
	if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
		writeLoopParams.RLock()
		params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
		copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
		writeLoopParams.RUnlock()
	}
	// compute leaf paths of watched addresses in the params
	params.ComputeWatchedAddressesLeafPaths()
	if currentBlock.NumberU64() == 0 {
		return sds.processStateDiff(currentBlock, common.Hash{}, params)
	}
	parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
	return sds.processStateDiff(currentBlock, parentBlock.Root(), params)
}

// processStateDiff method builds the state diff payload from the current block, parent state root, and provided params
func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot common.Hash, params Params) (*Payload, error) {
	stateDiff, err := sds.Builder.BuildStateDiffObject(Args{
		NewStateRoot: currentBlock.Root(),
		OldStateRoot: parentRoot,
		BlockHash:    currentBlock.Hash(),
		BlockNumber:  currentBlock.Number(),
	}, params)
	// allow dereferencing of parent, keep current locked as it should be the next parent
	sds.BlockChain.UnlockTrie(parentRoot)
	if err != nil {
		return nil, err
	}
	stateDiffRlp, err := rlp.EncodeToBytes(&stateDiff)
	if err != nil {
		return nil, err
	}
	log.Info("state diff size", "at block height", currentBlock.Number().Uint64(), "rlp byte size", len(stateDiffRlp))
	return sds.newPayload(stateDiffRlp, currentBlock, params)
}

func (sds *Service) newPayload(stateObject []byte, block *types.Block, params Params) (*Payload, error) {
	payload := &Payload{
		StateObjectRlp: stateObject,
	}
	if params.IncludeBlock {
		blockBuff := new(bytes.Buffer)
		if err := block.EncodeRLP(blockBuff); err != nil {
			return nil, err
		}
		payload.BlockRlp = blockBuff.Bytes()
	}
	if params.IncludeTD {
		payload.TotalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())
	}
	if params.IncludeReceipts {
		receiptBuff := new(bytes.Buffer)
		receipts := sds.BlockChain.GetReceiptsByHash(block.Hash())
		if err := rlp.Encode(receiptBuff, receipts); err != nil {
			return nil, err
		}
		payload.ReceiptsRlp = receiptBuff.Bytes()
	}
	return payload, nil
}
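
// On the consumer side, the RLP fields of a Payload decode with the same
// types used to encode them above; a hedged sketch, assuming a received
// payload built with IncludeBlock and IncludeReceipts set:
//
//	var block types.Block
//	if err := rlp.DecodeBytes(payload.BlockRlp, &block); err != nil {
//		log.Error("failed to decode block", "err", err)
//	}
//	var receipts types.Receipts
//	if err := rlp.DecodeBytes(payload.ReceiptsRlp, &receipts); err != nil {
//		log.Error("failed to decode receipts", "err", err)
//	}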

// StateTrieAt returns a state trie object payload at the specified blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateTrieAt(blockNumber uint64, params Params) (*Payload, error) {
	currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
	log.Info("sending state trie", "block height", blockNumber)
	// compute leaf paths of watched addresses in the params
	params.ComputeWatchedAddressesLeafPaths()
	return sds.processStateTrie(currentBlock, params)
}

func (sds *Service) processStateTrie(block *types.Block, params Params) (*Payload, error) {
	stateNodes, err := sds.Builder.BuildStateTrieObject(block)
	if err != nil {
		return nil, err
	}
	stateTrieRlp, err := rlp.EncodeToBytes(&stateNodes)
	if err != nil {
		return nil, err
	}
	log.Info("state trie size", "at block height", block.Number().Uint64(), "rlp byte size", len(stateTrieRlp))
	return sds.newPayload(stateTrieRlp, block, params)
}

// Subscribe is used by the API to subscribe to the service loop
func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params) {
	log.Info("Subscribing to the statediff service")
	if atomic.CompareAndSwapInt32(&sds.subscribers, 0, 1) {
		log.Info("State diffing subscription received; beginning statediff processing")
	}
	// compute leaf paths of watched addresses in the params
	params.ComputeWatchedAddressesLeafPaths()
	// Subscription type is defined as the hash of the rlp-serialized subscription params
	by, err := rlp.EncodeToBytes(&params)
	if err != nil {
		log.Error("State diffing params need to be rlp-serializable")
		return
	}
	subscriptionType := crypto.Keccak256Hash(by)
	// Add subscriber
	sds.Lock()
	if sds.Subscriptions[subscriptionType] == nil {
		sds.Subscriptions[subscriptionType] = make(map[rpc.ID]Subscription)
	}
	sds.Subscriptions[subscriptionType][id] = Subscription{
		PayloadChan: sub,
		QuitChan:    quitChan,
	}
	sds.SubscriptionTypes[subscriptionType] = params
	sds.Unlock()
}
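
// The subscription-type grouping above can be reproduced for debugging: two
// subscriptions share one payload stream exactly when their params hash to
// the same value, using the same calls Subscribe makes:
//
//	by, _ := rlp.EncodeToBytes(&params)
//	subscriptionType := crypto.Keccak256Hash(by)
//	// subscriptions with equal subscriptionType values receive identical payloads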

// Unsubscribe is used to unsubscribe from the service loop
func (sds *Service) Unsubscribe(id rpc.ID) error {
	log.Info("Unsubscribing from the statediff service", "subscription id", id)
	sds.Lock()
	for ty := range sds.Subscriptions {
		delete(sds.Subscriptions[ty], id)
		if len(sds.Subscriptions[ty]) == 0 {
			// If we removed the last subscription of this type, remove the subscription type outright
			delete(sds.Subscriptions, ty)
			delete(sds.SubscriptionTypes, ty)
		}
	}
	if len(sds.Subscriptions) == 0 {
		if atomic.CompareAndSwapInt32(&sds.subscribers, 1, 0) {
			log.Info("No more subscriptions; halting statediff processing")
		}
	}
	sds.Unlock()
	return nil
}

// GetSyncStatus checks the status of geth syncing.
// It returns false if geth has finished syncing, and true if geth is still syncing.
func (sds *Service) GetSyncStatus(pubEthAPI *ethapi.EthereumAPI) (bool, error) {
	syncStatus, err := pubEthAPI.Syncing()
	if err != nil {
		return true, err
	}
	if syncStatus != false {
		return true, err
	}
	return false, err
}

// WaitingForSync calls GetSyncStatus to check if we have caught up to head.
// It keeps checking until we have caught up to head; it only completes once
// we do, otherwise it loops forever.
func (sds *Service) WaitingForSync() error {
	log.Info("We are going to wait for geth to sync to head!")
	// Has the geth node synced to head?
	Synced := false
	pubEthAPI := ethapi.NewEthereumAPI(sds.BackendAPI)
	for !Synced {
		syncStatus, err := sds.GetSyncStatus(pubEthAPI)
		if err != nil {
			return err
		}
		if !syncStatus {
			log.Info("Geth has caught up to the head of the chain")
			Synced = true
		} else {
			time.Sleep(1 * time.Second)
		}
	}
	return nil
}

// Start is used to begin the service
func (sds *Service) Start() error {
	log.Info("Starting statediff service")
	if sds.WaitForSync {
		log.Info("Statediff service will wait until geth has caught up to the head of the chain.")
		err := sds.WaitingForSync()
		if err != nil {
			return err
		}
		log.Info("Continuing with statediff start process")
	}
	chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
	go sds.Loop(chainEventCh)
	if sds.enableWriteLoop {
		log.Info("Starting statediff DB write loop", "params", writeLoopParams.Params)
		chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
		go sds.WriteLoop(chainEventCh)
	}
	return nil
}

// Stop is used to close down the service
func (sds *Service) Stop() error {
	log.Info("Stopping statediff service")
	close(sds.QuitChan)
	return nil
}

// close is used to close all listening subscriptions
func (sds *Service) close() {
	sds.Lock()
	for ty, subs := range sds.Subscriptions {
		for id, sub := range subs {
			select {
			case sub.QuitChan <- true:
				log.Info("closing subscription", "id", id)
			default:
				log.Info("unable to close subscription; channel has no receiver", "subscription id", id)
			}
			delete(sds.Subscriptions[ty], id)
		}
		delete(sds.Subscriptions, ty)
		delete(sds.SubscriptionTypes, ty)
	}
	sds.Unlock()
}

// closeType is used to close all subscriptions of a given type;
// closeType needs to be called with subscription access locked
func (sds *Service) closeType(subType common.Hash) {
	subs := sds.Subscriptions[subType]
	for id, sub := range subs {
		sendNonBlockingQuit(id, sub)
	}
	delete(sds.Subscriptions, subType)
	delete(sds.SubscriptionTypes, subType)
}

func sendNonBlockingQuit(id rpc.ID, sub Subscription) {
	select {
	case sub.QuitChan <- true:
		log.Info("closing subscription", "id", id)
	default:
		log.Info("unable to close subscription; channel has no receiver", "subscription id", id)
	}
}

// StreamCodeAndCodeHash subscription method for extracting all the codehash=>code mappings that exist in the trie at the provided height
func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool) {
	current := sds.BlockChain.GetBlockByNumber(blockNumber)
	log.Info("sending code and codehash", "block height", blockNumber)
	currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root())
	if err != nil {
		log.Error("error creating trie for block", "block height", current.Number(), "err", err)
		close(quitChan)
		return
	}
	it := currentTrie.NodeIterator([]byte{})
	leafIt := trie.NewIterator(it)
	go func() {
		defer close(quitChan)
		for leafIt.Next() {
			select {
			case <-sds.QuitChan:
				return
			default:
			}
			account := new(types.StateAccount)
			if err := rlp.DecodeBytes(leafIt.Value, account); err != nil {
				log.Error("error decoding state account", "err", err)
				return
			}
			codeHash := common.BytesToHash(account.CodeHash)
			code, err := sds.BlockChain.StateCache().ContractCode(common.Hash{}, codeHash)
			if err != nil {
				log.Error("error collecting contract code", "err", err)
				return
			}
			outChan <- types2.CodeAndCodeHash{
				Hash: codeHash,
				Code: code,
			}
		}
	}()
}
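
// A hedged consumer sketch for the streaming method above, calling it
// directly on a Service instance; the height is illustrative, and quitChan
// is closed by the service when iteration finishes or fails:
//
//	outCh := make(chan types2.CodeAndCodeHash)
//	quitCh := make(chan bool)
//	go sds.StreamCodeAndCodeHash(blockNumber, outCh, quitCh)
//	for {
//		select {
//		case pair := <-outCh:
//			// inspect or persist pair.Hash and pair.Code
//		case <-quitCh:
//			return
//		}
//	}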

// WriteStateDiffAt writes a state diff at the specific blockheight directly to the database
// This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
	log.Info("writing state diff at", "block height", blockNumber)
	// use watched addresses from statediffing write loop if not provided
	if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
		writeLoopParams.RLock()
		params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
		copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
		writeLoopParams.RUnlock()
	}
	// compute leaf paths of watched addresses in the params
	params.ComputeWatchedAddressesLeafPaths()
	currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
	parentRoot := common.Hash{}
	if blockNumber != 0 {
		parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
		parentRoot = parentBlock.Root()
	}
	return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
}

// WriteStateDiffFor writes a state diff for the specific blockhash directly to the database
// This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error {
	log.Info("writing state diff for", "block hash", blockHash)
	// use watched addresses from statediffing write loop if not provided
	if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
		writeLoopParams.RLock()
		params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
		copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
		writeLoopParams.RUnlock()
	}
	// compute leaf paths of watched addresses in the params
	params.ComputeWatchedAddressesLeafPaths()
	currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
	parentRoot := common.Hash{}
	if currentBlock.NumberU64() != 0 {
		parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
		parentRoot = parentBlock.Root()
	}
	return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
}
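
// A hedged example of a direct backfill write at a given height; the height
// and params are illustrative, and the pruning caveat above applies:
//
//	params := Params{IncludeBlock: true, IncludeReceipts: true}
//	if err := sds.WriteStateDiffAt(1000000, params); err != nil {
//		log.Error("backfill write failed", "err", err)
//	}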

// writeStateDiff writes a state diff from the current block, parent state root, and provided params
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
	// log.Info("Writing state diff", "block height", block.Number().Uint64())
	var totalDifficulty *big.Int
	var receipts types.Receipts
	var err error
	var tx interfaces.Batch
	if params.IncludeTD {
		totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())
	}
	if params.IncludeReceipts {
		receipts = sds.BlockChain.GetReceiptsByHash(block.Hash())
	}
	tx, err = sds.indexer.PushBlock(block, receipts, totalDifficulty)
	if err != nil {
		return err
	}
	// defer handling of commit/rollback for any return case
	defer func() {
		if err := tx.Submit(err); err != nil {
			log.Error("batch transaction submission failed", "err", err)
		}
	}()
	output := func(node types2.StateNode) error {
		return sds.indexer.PushStateNode(tx, node, block.Hash().String())
	}
	codeOutput := func(c types2.CodeAndCodeHash) error {
		return sds.indexer.PushCodeAndCodeHash(tx, c)
	}
	err = sds.Builder.WriteStateDiffObject(types2.StateRoots{
		NewStateRoot: block.Root(),
		OldStateRoot: parentRoot,
	}, params, output, codeOutput)
	// allow dereferencing of parent, keep current locked as it should be the next parent
	sds.BlockChain.UnlockTrie(parentRoot)
	if err != nil {
		return err
	}
	return nil
}

// writeStateDiffWithRetry wraps writeStateDiff, retrying when a deadlock is detected.
func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot common.Hash, params Params) error {
	var err error
	for i := uint(0); i < sds.maxRetry; i++ {
		err = sds.writeStateDiff(block, parentRoot, params)
		if err != nil && strings.Contains(err.Error(), deadlockDetected) {
			// Retry only when the deadlock is detected.
			if i != sds.maxRetry {
				log.Info("deadlock detected while writing statediff", "err", err, "retry number", i)
			}
			continue
		}
		break
	}
	return err
}

// WatchAddress performs one of the following operations on the watched addresses in writeLoopParams and the db:
// add | remove | set | clear
func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error {
	// lock writeLoopParams for a write
	writeLoopParams.Lock()
	defer writeLoopParams.Unlock()
	// get the current block number
	currentBlockNumber := sds.BlockChain.CurrentBlock().Number()
	switch operation {
	case types2.Add:
		// filter out args having an already watched address with a warning
		filteredArgs, ok := funk.Filter(args, func(arg types2.WatchAddressArg) bool {
			if funk.Contains(writeLoopParams.WatchedAddresses, common.HexToAddress(arg.Address)) {
				log.Warn("Address already being watched", "address", arg.Address)
				return false
			}
			return true
		}).([]types2.WatchAddressArg)
		if !ok {
			return fmt.Errorf("add: filtered args %s", typeAssertionFailed)
		}
		// get addresses from the filtered args
		filteredAddresses, err := MapWatchAddressArgsToAddresses(filteredArgs)
		if err != nil {
			return fmt.Errorf("add: filtered addresses %s", err.Error())
		}
		// update the db
		if sds.indexer != nil {
			err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
			if err != nil {
				return err
			}
		}
		// update in-memory params
		writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...)
		writeLoopParams.ComputeWatchedAddressesLeafPaths()
	case types2.Remove:
		// get addresses from args
		argAddresses, err := MapWatchAddressArgsToAddresses(args)
		if err != nil {
			return fmt.Errorf("remove: mapped addresses %s", err.Error())
		}
		// remove the provided addresses from currently watched addresses
		addresses, ok := funk.Subtract(writeLoopParams.WatchedAddresses, argAddresses).([]common.Address)
		if !ok {
			return fmt.Errorf("remove: filtered addresses %s", typeAssertionFailed)
		}
		// update the db
		if sds.indexer != nil {
			err = sds.indexer.RemoveWatchedAddresses(args)
			if err != nil {
				return err
			}
		}
		// update in-memory params
		writeLoopParams.WatchedAddresses = addresses
		writeLoopParams.ComputeWatchedAddressesLeafPaths()
	case types2.Set:
		// get addresses from args
		argAddresses, err := MapWatchAddressArgsToAddresses(args)
		if err != nil {
			return fmt.Errorf("set: mapped addresses %s", err.Error())
		}
		// update the db
		if sds.indexer != nil {
			err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber)
			if err != nil {
				return err
			}
		}
		// update in-memory params
		writeLoopParams.WatchedAddresses = argAddresses
		writeLoopParams.ComputeWatchedAddressesLeafPaths()
	case types2.Clear:
		// update the db
		if sds.indexer != nil {
			err := sds.indexer.ClearWatchedAddresses()
			if err != nil {
				return err
			}
		}
		// update in-memory params
		writeLoopParams.WatchedAddresses = []common.Address{}
		writeLoopParams.ComputeWatchedAddressesLeafPaths()
	default:
		return fmt.Errorf("%s %s", unexpectedOperation, operation)
	}
	return nil
}

// loadWatchedAddresses loads watched addresses to in-memory write loop params
func loadWatchedAddresses(indexer interfaces.StateDiffIndexer) error {
	watchedAddresses, err := indexer.LoadWatchedAddresses()
	if err != nil {
		return err
	}
	writeLoopParams.Lock()
	defer writeLoopParams.Unlock()
	writeLoopParams.WatchedAddresses = watchedAddresses
	writeLoopParams.ComputeWatchedAddressesLeafPaths()
	return nil
}

// MapWatchAddressArgsToAddresses maps []WatchAddressArg to corresponding []common.Address
func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Address, error) {
	addresses, ok := funk.Map(args, func(arg types2.WatchAddressArg) common.Address {
		return common.HexToAddress(arg.Address)
	}).([]common.Address)
	if !ok {
		return nil, fmt.Errorf(typeAssertionFailed)
	}
	return addresses, nil
}
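
// A hedged usage sketch for WatchAddress, assuming a *Service instance sds;
// Address is the only WatchAddressArg field this file relies on:
//
//	args := []types2.WatchAddressArg{{Address: "0x0000000000000000000000000000000000000001"}}
//	if err := sds.WatchAddress(types2.Add, args); err != nil {
//		log.Error("failed to add watched address", "err", err)
//	}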