Use watched addresses from direct indexing params by default while serving statediff APIs #262

Merged
prathamesh0 merged 3 commits from pm-default-watchedadddresses into v1.10.19-statediff-v4 2022-07-25 12:40:10 +00:00
2 changed files with 70 additions and 24 deletions

View File

@ -170,7 +170,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
var db sql.Database var db sql.Database
var err error var err error
quitCh := make(chan bool) quitCh := make(chan bool)
if params.IndexerConfig != nil { indexerConfigAvailable := params.IndexerConfig != nil
if indexerConfigAvailable {
info := nodeinfo.Info{ info := nodeinfo.Info{
GenesisBlock: blockChain.Genesis().Hash().Hex(), GenesisBlock: blockChain.Genesis().Hash().Hex(),
NetworkID: strconv.FormatUint(cfg.NetworkId, 10), NetworkID: strconv.FormatUint(cfg.NetworkId, 10),
@ -201,12 +202,14 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
statediffMetrics: statediffMetrics, statediffMetrics: statediffMetrics,
sqlFileWaitingForWrite: false, sqlFileWaitingForWrite: false,
} }
if indexerConfigAvailable {
if params.IndexerConfig.Type() == shared.POSTGRES { if params.IndexerConfig.Type() == shared.POSTGRES {
knownGaps.checkForGaps = true knownGaps.checkForGaps = true
} else { } else {
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!") log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
knownGaps.checkForGaps = false knownGaps.checkForGaps = false
} }
}
sds := &Service{ sds := &Service{
Mutex: sync.Mutex{}, Mutex: sync.Mutex{},
BlockChain: blockChain, BlockChain: blockChain,
@ -226,10 +229,12 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
stack.RegisterLifecycle(sds) stack.RegisterLifecycle(sds)
stack.RegisterAPIs(sds.APIs()) stack.RegisterAPIs(sds.APIs())
if indexerConfigAvailable {
err = loadWatchedAddresses(indexer) err = loadWatchedAddresses(indexer)
if err != nil { if err != nil {
return err return err
} }
}
return nil return nil
} }
@ -477,6 +482,13 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, er
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber) currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info("sending state diff", "block height", blockNumber) log.Info("sending state diff", "block height", blockNumber)
// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths() params.ComputeWatchedAddressesLeafPaths()
@ -493,6 +505,13 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload
currentBlock := sds.BlockChain.GetBlockByHash(blockHash) currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
log.Info("sending state diff", "block hash", blockHash) log.Info("sending state diff", "block hash", blockHash)
// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths() params.ComputeWatchedAddressesLeafPaths()
@ -777,6 +796,15 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ
// This operation cannot be performed back past the point of db pruning; it requires an archival node // This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data // for historical data
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
log.Info("writing state diff at", "block height", blockNumber)
// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths() params.ComputeWatchedAddressesLeafPaths()
@ -793,6 +821,15 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
// This operation cannot be performed back past the point of db pruning; it requires an archival node // This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data // for historical data
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error { func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error {
log.Info("writing state diff for", "block hash", blockHash)
// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths() params.ComputeWatchedAddressesLeafPaths()
@ -895,10 +932,12 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
} }
// update the db // update the db
if sds.indexer != nil {
err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber) err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
if err != nil { if err != nil {
return err return err
} }
}
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...) writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...)
@ -917,10 +956,12 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
} }
// update the db // update the db
if sds.indexer != nil {
err = sds.indexer.RemoveWatchedAddresses(args) err = sds.indexer.RemoveWatchedAddresses(args)
if err != nil { if err != nil {
return err return err
} }
}
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = addresses writeLoopParams.WatchedAddresses = addresses
@ -933,20 +974,24 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
} }
// update the db // update the db
if sds.indexer != nil {
err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber) err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber)
if err != nil { if err != nil {
return err return err
} }
}
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = argAddresses writeLoopParams.WatchedAddresses = argAddresses
writeLoopParams.ComputeWatchedAddressesLeafPaths() writeLoopParams.ComputeWatchedAddressesLeafPaths()
case types2.Clear: case types2.Clear:
// update the db // update the db
if sds.indexer != nil {
err := sds.indexer.ClearWatchedAddresses() err := sds.indexer.ClearWatchedAddresses()
if err != nil { if err != nil {
return err return err
} }
}
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = []common.Address{} writeLoopParams.WatchedAddresses = []common.Address{}

View File

@ -81,6 +81,7 @@ var (
IncludeBlock: true, IncludeBlock: true,
IncludeReceipts: true, IncludeReceipts: true,
IncludeTD: true, IncludeTD: true,
WatchedAddresses: []common.Address{},
} }
) )