From 209f7d37c25a989b383953394c8bd9bc75b0844c Mon Sep 17 00:00:00 2001 From: prathamesh0 Date: Mon, 25 Jul 2022 10:45:19 +0530 Subject: [PATCH 1/3] Use watched addresses from direct indexing params in statediff APIs by default --- statediff/service.go | 32 ++++++++++++++++++++++++++++++++ statediff/service_test.go | 7 ++++--- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 517ab3466..39e47c779 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -477,6 +477,13 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, er currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber) log.Info("sending state diff", "block height", blockNumber) + // use watched addresses from statediffing write loop if not provided + if params.WatchedAddresses == nil { + writeLoopParams.RLock() + params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) + copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses) + writeLoopParams.RUnlock() + } // compute leaf paths of watched addresses in the params params.ComputeWatchedAddressesLeafPaths() @@ -493,6 +500,13 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload currentBlock := sds.BlockChain.GetBlockByHash(blockHash) log.Info("sending state diff", "block hash", blockHash) + // use watched addresses from statediffing write loop if not provided + if params.WatchedAddresses == nil { + writeLoopParams.RLock() + params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) + copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses) + writeLoopParams.RUnlock() + } // compute leaf paths of watched addresses in the params params.ComputeWatchedAddressesLeafPaths() @@ -777,6 +791,15 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ // This operation cannot be performed back past the point of db pruning; it requires an archival node // for historical data func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { + log.Info("writing state diff at", "block height", blockNumber) + + // use watched addresses from statediffing write loop if not provided + if params.WatchedAddresses == nil { + writeLoopParams.RLock() + params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) + copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses) + writeLoopParams.RUnlock() + } // compute leaf paths of watched addresses in the params params.ComputeWatchedAddressesLeafPaths() @@ -793,6 +816,15 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { // This operation cannot be performed back past the point of db pruning; it requires an archival node // for historical data func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error { + log.Info("writing state diff for", "block hash", blockHash) + + // use watched addresses from statediffing write loop if not provided + if params.WatchedAddresses == nil { + writeLoopParams.RLock() + params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) + copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses) + writeLoopParams.RUnlock() + } // compute leaf paths of watched addresses in the params params.ComputeWatchedAddressesLeafPaths() diff --git a/statediff/service_test.go b/statediff/service_test.go index 3f80f1d2e..ecf8bdadb 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -78,9 +78,10 @@ var ( event3 = core.ChainEvent{Block: testBlock3} defaultParams = statediff.Params{ - IncludeBlock: true, - IncludeReceipts: true, - IncludeTD: true, + IncludeBlock: true, + IncludeReceipts: true, + IncludeTD: true, + WatchedAddresses: []common.Address{}, } ) -- 2.45.2 From 155e6d2010d733370b2f9458036143dbafd155fa Mon Sep 17 00:00:00 2001 From: prathamesh0 Date: Mon, 25 Jul 2022 11:07:40 +0530 Subject: [PATCH 2/3] Avoid using indexer object when direct indexing is off --- statediff/service.go | 55 +++++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 39e47c779..bc9336f6e 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -170,7 +170,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params var db sql.Database var err error quitCh := make(chan bool) - if params.IndexerConfig != nil { + indexerConfigAvailable := params.IndexerConfig != nil + if indexerConfigAvailable { info := nodeinfo.Info{ GenesisBlock: blockChain.Genesis().Hash().Hex(), NetworkID: strconv.FormatUint(cfg.NetworkId, 10), @@ -201,11 +202,13 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params statediffMetrics: statediffMetrics, sqlFileWaitingForWrite: false, } - if params.IndexerConfig.Type() == shared.POSTGRES { - knownGaps.checkForGaps = true - } else { - log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!") - knownGaps.checkForGaps = false + if indexerConfigAvailable { + if params.IndexerConfig.Type() == shared.POSTGRES { + knownGaps.checkForGaps = true + } else { + log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!") + knownGaps.checkForGaps = false + } } sds := &Service{ Mutex: sync.Mutex{}, @@ -226,9 +229,11 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) - err = loadWatchedAddresses(indexer) - if err != nil { - return err + if indexerConfigAvailable { + err = loadWatchedAddresses(indexer) + if err != nil { + return err + } } return nil @@ -927,9 +932,11 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W } // update the db - err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber) - if err != nil { - return err + if sds.indexer != nil { + err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber) + if err != nil { + return err + } } // update in-memory params @@ -949,9 +956,11 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W } // update the db - err = sds.indexer.RemoveWatchedAddresses(args) - if err != nil { - return err + if sds.indexer != nil { + err = sds.indexer.RemoveWatchedAddresses(args) + if err != nil { + return err + } } // update in-memory params @@ -965,9 +974,11 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W } // update the db - err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber) - if err != nil { - return err + if sds.indexer != nil { + err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber) + if err != nil { + return err + } } // update in-memory params @@ -975,9 +986,11 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W writeLoopParams.ComputeWatchedAddressesLeafPaths() case types2.Clear: // update the db - err := sds.indexer.ClearWatchedAddresses() - if err != nil { - return err + if sds.indexer != nil { + err := sds.indexer.ClearWatchedAddresses() + if err != nil { + return err + } } // update in-memory params -- 2.45.2 From 9ce278e4c112f7af4d160b4c819ec145a19f3293 Mon Sep 17 00:00:00 2001 From: prathamesh0 Date: Mon, 25 Jul 2022 17:49:54 +0530 Subject: [PATCH 3/3] Add nil check before accessing watched addresses from direct indexing params --- statediff/service.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index bc9336f6e..30d227f17 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -483,7 +483,7 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, er log.Info("sending state diff", "block height", blockNumber) // use watched addresses from statediffing write loop if not provided - if params.WatchedAddresses == nil { + if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil { writeLoopParams.RLock() params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses) @@ -506,7 +506,7 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload log.Info("sending state diff", "block hash", blockHash) // use watched addresses from statediffing write loop if not provided - if params.WatchedAddresses == nil { + if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil { writeLoopParams.RLock() params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses) @@ -799,7 +799,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { log.Info("writing state diff at", "block height", blockNumber) // use watched addresses from statediffing write loop if not provided - if params.WatchedAddresses == nil { + if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil { writeLoopParams.RLock() params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses) @@ -824,7 +824,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro log.Info("writing state diff for", "block hash", blockHash) // use watched addresses from statediffing write loop if not provided - if params.WatchedAddresses == nil { + if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil { writeLoopParams.RLock() params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses) -- 2.45.2