Use watched addresses from direct indexing params by default while serving statediff APIs (#262)

* Use watched addresses from direct indexing params in statediff APIs by default

* Avoid using indexer object when direct indexing is off

* Add nil check before accessing watched addresses from direct indexing params
This commit is contained in:
prathamesh0 2022-07-25 18:10:09 +05:30 committed by GitHub
parent 7b4ef34de2
commit 5083ee5837
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 24 deletions

View File

@ -170,7 +170,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
var db sql.Database var db sql.Database
var err error var err error
quitCh := make(chan bool) quitCh := make(chan bool)
if params.IndexerConfig != nil { indexerConfigAvailable := params.IndexerConfig != nil
if indexerConfigAvailable {
info := nodeinfo.Info{ info := nodeinfo.Info{
GenesisBlock: blockChain.Genesis().Hash().Hex(), GenesisBlock: blockChain.Genesis().Hash().Hex(),
NetworkID: strconv.FormatUint(cfg.NetworkId, 10), NetworkID: strconv.FormatUint(cfg.NetworkId, 10),
@ -201,12 +202,14 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
statediffMetrics: statediffMetrics, statediffMetrics: statediffMetrics,
sqlFileWaitingForWrite: false, sqlFileWaitingForWrite: false,
} }
if indexerConfigAvailable {
if params.IndexerConfig.Type() == shared.POSTGRES { if params.IndexerConfig.Type() == shared.POSTGRES {
knownGaps.checkForGaps = true knownGaps.checkForGaps = true
} else { } else {
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!") log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
knownGaps.checkForGaps = false knownGaps.checkForGaps = false
} }
}
sds := &Service{ sds := &Service{
Mutex: sync.Mutex{}, Mutex: sync.Mutex{},
BlockChain: blockChain, BlockChain: blockChain,
@ -226,10 +229,12 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
stack.RegisterLifecycle(sds) stack.RegisterLifecycle(sds)
stack.RegisterAPIs(sds.APIs()) stack.RegisterAPIs(sds.APIs())
if indexerConfigAvailable {
err = loadWatchedAddresses(indexer) err = loadWatchedAddresses(indexer)
if err != nil { if err != nil {
return err return err
} }
}
return nil return nil
} }
@ -477,6 +482,13 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, er
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber) currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info("sending state diff", "block height", blockNumber) log.Info("sending state diff", "block height", blockNumber)
// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths() params.ComputeWatchedAddressesLeafPaths()
@ -493,6 +505,13 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload
currentBlock := sds.BlockChain.GetBlockByHash(blockHash) currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
log.Info("sending state diff", "block hash", blockHash) log.Info("sending state diff", "block hash", blockHash)
// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths() params.ComputeWatchedAddressesLeafPaths()
@ -777,6 +796,15 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ
// This operation cannot be performed back past the point of db pruning; it requires an archival node // This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data // for historical data
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
log.Info("writing state diff at", "block height", blockNumber)
// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths() params.ComputeWatchedAddressesLeafPaths()
@ -793,6 +821,15 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
// This operation cannot be performed back past the point of db pruning; it requires an archival node // This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data // for historical data
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error { func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error {
log.Info("writing state diff for", "block hash", blockHash)
// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths() params.ComputeWatchedAddressesLeafPaths()
@ -895,10 +932,12 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
} }
// update the db // update the db
if sds.indexer != nil {
err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber) err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
if err != nil { if err != nil {
return err return err
} }
}
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...) writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...)
@ -917,10 +956,12 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
} }
// update the db // update the db
if sds.indexer != nil {
err = sds.indexer.RemoveWatchedAddresses(args) err = sds.indexer.RemoveWatchedAddresses(args)
if err != nil { if err != nil {
return err return err
} }
}
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = addresses writeLoopParams.WatchedAddresses = addresses
@ -933,20 +974,24 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
} }
// update the db // update the db
if sds.indexer != nil {
err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber) err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber)
if err != nil { if err != nil {
return err return err
} }
}
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = argAddresses writeLoopParams.WatchedAddresses = argAddresses
writeLoopParams.ComputeWatchedAddressesLeafPaths() writeLoopParams.ComputeWatchedAddressesLeafPaths()
case types2.Clear: case types2.Clear:
// update the db // update the db
if sds.indexer != nil {
err := sds.indexer.ClearWatchedAddresses() err := sds.indexer.ClearWatchedAddresses()
if err != nil { if err != nil {
return err return err
} }
}
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = []common.Address{} writeLoopParams.WatchedAddresses = []common.Address{}

View File

@ -81,6 +81,7 @@ var (
IncludeBlock: true, IncludeBlock: true,
IncludeReceipts: true, IncludeReceipts: true,
IncludeTD: true, IncludeTD: true,
WatchedAddresses: []common.Address{},
} }
) )