Use watched addresses from direct indexing params by default while serving statediff APIs #262
@ -170,7 +170,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
var db sql.Database
|
var db sql.Database
|
||||||
var err error
|
var err error
|
||||||
quitCh := make(chan bool)
|
quitCh := make(chan bool)
|
||||||
if params.IndexerConfig != nil {
|
indexerConfigAvailable := params.IndexerConfig != nil
|
||||||
|
if indexerConfigAvailable {
|
||||||
info := nodeinfo.Info{
|
info := nodeinfo.Info{
|
||||||
GenesisBlock: blockChain.Genesis().Hash().Hex(),
|
GenesisBlock: blockChain.Genesis().Hash().Hex(),
|
||||||
NetworkID: strconv.FormatUint(cfg.NetworkId, 10),
|
NetworkID: strconv.FormatUint(cfg.NetworkId, 10),
|
||||||
@ -201,12 +202,14 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
statediffMetrics: statediffMetrics,
|
statediffMetrics: statediffMetrics,
|
||||||
sqlFileWaitingForWrite: false,
|
sqlFileWaitingForWrite: false,
|
||||||
}
|
}
|
||||||
|
if indexerConfigAvailable {
|
||||||
if params.IndexerConfig.Type() == shared.POSTGRES {
|
if params.IndexerConfig.Type() == shared.POSTGRES {
|
||||||
knownGaps.checkForGaps = true
|
knownGaps.checkForGaps = true
|
||||||
} else {
|
} else {
|
||||||
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
|
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
|
||||||
knownGaps.checkForGaps = false
|
knownGaps.checkForGaps = false
|
||||||
}
|
}
|
||||||
|
}
|
||||||
sds := &Service{
|
sds := &Service{
|
||||||
Mutex: sync.Mutex{},
|
Mutex: sync.Mutex{},
|
||||||
BlockChain: blockChain,
|
BlockChain: blockChain,
|
||||||
@ -226,10 +229,12 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
stack.RegisterLifecycle(sds)
|
stack.RegisterLifecycle(sds)
|
||||||
stack.RegisterAPIs(sds.APIs())
|
stack.RegisterAPIs(sds.APIs())
|
||||||
|
|
||||||
|
if indexerConfigAvailable {
|
||||||
err = loadWatchedAddresses(indexer)
|
err = loadWatchedAddresses(indexer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -477,6 +482,13 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, er
|
|||||||
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
|
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
|
||||||
log.Info("sending state diff", "block height", blockNumber)
|
log.Info("sending state diff", "block height", blockNumber)
|
||||||
|
|
||||||
|
// use watched addresses from statediffing write loop if not provided
|
||||||
|
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
|
||||||
|
writeLoopParams.RLock()
|
||||||
|
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
|
||||||
|
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
|
||||||
|
writeLoopParams.RUnlock()
|
||||||
|
}
|
||||||
// compute leaf paths of watched addresses in the params
|
// compute leaf paths of watched addresses in the params
|
||||||
params.ComputeWatchedAddressesLeafPaths()
|
params.ComputeWatchedAddressesLeafPaths()
|
||||||
|
|
||||||
@ -493,6 +505,13 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload
|
|||||||
currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
|
currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
|
||||||
log.Info("sending state diff", "block hash", blockHash)
|
log.Info("sending state diff", "block hash", blockHash)
|
||||||
|
|
||||||
|
// use watched addresses from statediffing write loop if not provided
|
||||||
|
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
|
||||||
|
writeLoopParams.RLock()
|
||||||
|
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
|
||||||
|
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
|
||||||
|
writeLoopParams.RUnlock()
|
||||||
|
}
|
||||||
// compute leaf paths of watched addresses in the params
|
// compute leaf paths of watched addresses in the params
|
||||||
params.ComputeWatchedAddressesLeafPaths()
|
params.ComputeWatchedAddressesLeafPaths()
|
||||||
|
|
||||||
@ -777,6 +796,15 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ
|
|||||||
// This operation cannot be performed back past the point of db pruning; it requires an archival node
|
// This operation cannot be performed back past the point of db pruning; it requires an archival node
|
||||||
// for historical data
|
// for historical data
|
||||||
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
|
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
|
||||||
|
log.Info("writing state diff at", "block height", blockNumber)
|
||||||
|
|
||||||
|
// use watched addresses from statediffing write loop if not provided
|
||||||
|
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
|
||||||
|
writeLoopParams.RLock()
|
||||||
|
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
|
||||||
|
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
|
||||||
|
writeLoopParams.RUnlock()
|
||||||
|
}
|
||||||
// compute leaf paths of watched addresses in the params
|
// compute leaf paths of watched addresses in the params
|
||||||
params.ComputeWatchedAddressesLeafPaths()
|
params.ComputeWatchedAddressesLeafPaths()
|
||||||
|
|
||||||
@ -793,6 +821,15 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
|
|||||||
// This operation cannot be performed back past the point of db pruning; it requires an archival node
|
// This operation cannot be performed back past the point of db pruning; it requires an archival node
|
||||||
// for historical data
|
// for historical data
|
||||||
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error {
|
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error {
|
||||||
|
log.Info("writing state diff for", "block hash", blockHash)
|
||||||
|
|
||||||
|
// use watched addresses from statediffing write loop if not provided
|
||||||
|
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
|
||||||
|
writeLoopParams.RLock()
|
||||||
|
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
|
||||||
|
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
|
||||||
|
writeLoopParams.RUnlock()
|
||||||
|
}
|
||||||
// compute leaf paths of watched addresses in the params
|
// compute leaf paths of watched addresses in the params
|
||||||
params.ComputeWatchedAddressesLeafPaths()
|
params.ComputeWatchedAddressesLeafPaths()
|
||||||
|
|
||||||
@ -895,10 +932,12 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
|
|||||||
}
|
}
|
||||||
|
|
||||||
// update the db
|
// update the db
|
||||||
|
if sds.indexer != nil {
|
||||||
err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
|
err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// update in-memory params
|
// update in-memory params
|
||||||
writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...)
|
writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...)
|
||||||
@ -917,10 +956,12 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
|
|||||||
}
|
}
|
||||||
|
|
||||||
// update the db
|
// update the db
|
||||||
|
if sds.indexer != nil {
|
||||||
err = sds.indexer.RemoveWatchedAddresses(args)
|
err = sds.indexer.RemoveWatchedAddresses(args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// update in-memory params
|
// update in-memory params
|
||||||
writeLoopParams.WatchedAddresses = addresses
|
writeLoopParams.WatchedAddresses = addresses
|
||||||
@ -933,20 +974,24 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
|
|||||||
}
|
}
|
||||||
|
|
||||||
// update the db
|
// update the db
|
||||||
|
if sds.indexer != nil {
|
||||||
err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber)
|
err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// update in-memory params
|
// update in-memory params
|
||||||
writeLoopParams.WatchedAddresses = argAddresses
|
writeLoopParams.WatchedAddresses = argAddresses
|
||||||
writeLoopParams.ComputeWatchedAddressesLeafPaths()
|
writeLoopParams.ComputeWatchedAddressesLeafPaths()
|
||||||
case types2.Clear:
|
case types2.Clear:
|
||||||
// update the db
|
// update the db
|
||||||
|
if sds.indexer != nil {
|
||||||
err := sds.indexer.ClearWatchedAddresses()
|
err := sds.indexer.ClearWatchedAddresses()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// update in-memory params
|
// update in-memory params
|
||||||
writeLoopParams.WatchedAddresses = []common.Address{}
|
writeLoopParams.WatchedAddresses = []common.Address{}
|
||||||
|
@ -81,6 +81,7 @@ var (
|
|||||||
IncludeBlock: true,
|
IncludeBlock: true,
|
||||||
IncludeReceipts: true,
|
IncludeReceipts: true,
|
||||||
IncludeTD: true,
|
IncludeTD: true,
|
||||||
|
WatchedAddresses: []common.Address{},
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user