[WIP] Add a service to fill indexing gap for watched addresses #135

Closed
prathamesh0 wants to merge 16 commits from pm-watched-addresses into master
7 changed files with 57 additions and 11 deletions
Showing only changes of commit c5e87d7f91 - Show all commits

View File

@ -1,3 +1,19 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fill_test
import (

View File

@ -1,3 +1,19 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fill
import (
@ -44,6 +60,7 @@ func New(config *serve.Config) *Service {
// Start is used to begin the service
func (s *Service) Start() {
for {
// Wait for specified interval duration
time.Sleep(time.Duration(s.interval) * time.Second)
// Get watched addresses from the db
@ -98,8 +115,8 @@ func (s *Service) GetFillAddresses(rows []WatchedAddress) ([]WatchedAddress, uin
continue
}
var startBlock uint64 = 0
var endBlock uint64 = 0
startBlock := uint64(0)
endBlock := uint64(0)
// Check if some of the gap was filled earlier
if row.LastFilledAt > 0 {
@ -133,7 +150,7 @@ func (s *Service) GetFillAddresses(rows []WatchedAddress) ([]WatchedAddress, uin
// UpdateLastFilledAt updates the fill status for the provided addresses in the db
func (s *Service) UpdateLastFilledAt(blockNumber uint64, fillAddresses []interface{}) {
// Prepare the query
query := "UPDATE eth.watched_addresses SET last_filled_at=? WHERE address IN (?" + strings.Repeat(",?", len(fillAddresses)-1) + ")"
query := "UPDATE eth_meta.watched_addresses SET last_filled_at=? WHERE address IN (?" + strings.Repeat(",?", len(fillAddresses)-1) + ")"
query = s.db.Rebind(query)
args := []interface{}{blockNumber}
@ -149,7 +166,7 @@ func (s *Service) UpdateLastFilledAt(blockNumber uint64, fillAddresses []interfa
// fetchWatchedAddresses fetches watched addresses from the db
func (s *Service) fetchWatchedAddresses() []WatchedAddress {
rows := []WatchedAddress{}
pgStr := "SELECT * FROM eth.watched_addresses"
pgStr := "SELECT * FROM eth_meta.watched_addresses"
err := s.db.Select(&rows, pgStr)
if err != nil {

View File

@ -1,3 +1,19 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fill_test
import (
@ -286,7 +302,7 @@ var _ = Describe("Service", func() {
})
Describe("UpdateLastFilledAt", func() {
pgStr := "SELECT * FROM eth.watched_addresses"
pgStr := "SELECT * FROM eth_meta.watched_addresses"
BeforeEach(func() {
shared.TearDownDB(db)

View File

@ -18,7 +18,6 @@ package serve
import (
"context"
"encoding/json"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
@ -95,8 +94,7 @@ func (api *PublicServerAPI) Chain() shared.ChainType {
// WatchAddress makes a geth WatchAddress API call with the given operation and args
func (api *PublicServerAPI) WatchAddress(operation statediff.OperationType, args []sdtypes.WatchAddressArg) error {
var data json.RawMessage
err := api.rpc.Call(&data, "statediff_watchAddress", operation, args)
err := api.rpc.Call(nil, "statediff_watchAddress", operation, args)
if err != nil {
return err
}

View File

@ -70,7 +70,7 @@ func TearDownDB(db *postgres.DB) {
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.log_cids`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.watched_addresses`)
_, err = tx.Exec(`DELETE FROM eth_meta.watched_addresses`)
Expect(err).NotTo(HaveOccurred())
err = tx.Commit()

View File

@ -13,7 +13,7 @@ fastify.get('/v1/healthz', async (req, reply) => {
fastify.get('/v1/deployContract', async (req, reply) => {
const GLDToken = await hre.ethers.getContractFactory("GLDToken");
let token = await GLDToken.deploy();
const token = await GLDToken.deploy();
await token.deployed();
return {

View File

@ -1,4 +1,3 @@
// import { artifacts } from 'hardhat';
const { artifacts } = require("hardhat")
const { utils, BigNumber } = require("ethers")