diff --git a/cmd/serve.go b/cmd/serve.go index 5843d229..913f7703 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -16,9 +16,7 @@ package cmd import ( - "encoding/json" "errors" - "math" "net/http" "net/url" "os" @@ -27,9 +25,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/statediff" "github.com/mailgun/groupcache/v2" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -37,6 +33,7 @@ import ( "github.com/vulcanize/gap-filler/pkg/mux" "github.com/vulcanize/ipld-eth-server/pkg/eth" + fill "github.com/vulcanize/ipld-eth-server/pkg/fill" "github.com/vulcanize/ipld-eth-server/pkg/graphql" srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc" s "github.com/vulcanize/ipld-eth-server/pkg/serve" @@ -105,7 +102,8 @@ func serve() { } if serverConfig.WatchedAddressGapFillerEnabled { - go startWatchedAddressGapFiller(serverConfig) + service := fill.New(serverConfig) + go service.Start() logWithCommand.Info("watched address gap filler enabled") } else { logWithCommand.Info("watched address gap filler disabled") @@ -302,130 +300,6 @@ func startStateTrieValidator(config *s.Config, server s.Server) { } } -type WatchedAddress struct { - Address string `db:"address"` - CreatedAt uint64 `db:"created_at"` - WatchedAt uint64 `db:"watched_at"` - LastFilledAt uint64 `db:"last_filled_at"` - - startBlock uint64 - endBlock uint64 -} - -func startWatchedAddressGapFiller(config *s.Config) { - fillInterval := config.WatchedAddressGapFillInterval - - for { - time.Sleep(time.Duration(fillInterval) * time.Second) - - // Get watched addresses from the db - // Get the block number to start fill at - // Get the block number to end fill at - fillWatchedAddresses, minStartBlock, maxEndBlock := getFillAddresses(config) - - if len(fillWatchedAddresses) > 0 { - log.Infof("running watched address gap filler for block range: (%d, %d)", minStartBlock, maxEndBlock) - } - - // Fill the missing diffs - for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ { - params := statediff.Params{ - IntermediateStateNodes: true, - IntermediateStorageNodes: true, - IncludeBlock: true, - IncludeReceipts: true, - IncludeTD: true, - IncludeCode: true, - } - - fillAddresses := []interface{}{} - for _, fillWatchedAddress := range fillWatchedAddresses { - if blockNumber >= fillWatchedAddress.startBlock && blockNumber <= fillWatchedAddress.endBlock { - params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address)) - fillAddresses = append(fillAddresses, fillWatchedAddress.Address) - } - } - - fillWatchedAddressGap(config, blockNumber, params, fillAddresses) - } - } -} - -// getFillAddresses gets the addresses and finds the encompassing range to perform the fill -// it also sets the address specific fill range -func getFillAddresses(config *s.Config) ([]WatchedAddress, uint64, uint64) { - rows := []WatchedAddress{} - pgStr := "SELECT * FROM eth.watched_addresses" - - err := config.DB.Select(&rows, pgStr) - if err != nil { - log.Fatalf("Error fetching watched addreesses: %s", err.Error()) - } - - fillWatchedAddresses := []WatchedAddress{} - minStartBlock := uint64(math.MaxUint64) - maxEndBlock := uint64(0) - - for _, row := range rows { - // Check for a gap between created_at and watched_at - // CreatedAt and WatchedAt being equal is considered a gap - if row.CreatedAt > row.WatchedAt { - continue - } - - var startBlock uint64 = 0 - var endBlock uint64 = 0 - - // Check if some of the gap was filled earlier - if row.LastFilledAt > 0 { - if row.LastFilledAt < row.WatchedAt { - startBlock = row.LastFilledAt + 1 - } - } else { - startBlock = row.CreatedAt - } - - // Add the address for filling - if startBlock > 0 { - row.startBlock = startBlock - if startBlock < minStartBlock { - minStartBlock = startBlock - } - - endBlock = row.WatchedAt - row.endBlock = endBlock - if endBlock > maxEndBlock { - maxEndBlock = endBlock - } - - fillWatchedAddresses = append(fillWatchedAddresses, row) - } - } - - return fillWatchedAddresses, minStartBlock, maxEndBlock -} - -func fillWatchedAddressGap(config *s.Config, blockNumber uint64, params statediff.Params, fillAddresses []interface{}) { - // Make a RPC call to write the statediffs - var data json.RawMessage - err := config.Client.Call(&data, "statediff_writeStateDiffAt", blockNumber, params) - if err != nil { - log.Fatalf("Error making a RPC call to write statediff at block number %d: %s", blockNumber, err.Error()) - } - - // Update the db - query := "UPDATE eth.watched_addresses SET last_filled_at=? WHERE address IN (?" + strings.Repeat(",?", len(fillAddresses)-1) + ")" - query = config.DB.Rebind(query) - - args := []interface{}{blockNumber} - args = append(args, fillAddresses...) - - _, err = config.DB.Exec(query, args...) - if err != nil { - log.Fatalf(err.Error()) - } -} - func parseRpcAddresses(value string) ([]*rpc.Client, error) { rpcAddresses := strings.Split(value, ",") rpcClients := make([]*rpc.Client, 0, len(rpcAddresses)) diff --git a/pkg/eth/api_test.go b/pkg/eth/api_test.go index 203a7572..486e1602 100644 --- a/pkg/eth/api_test.go +++ b/pkg/eth/api_test.go @@ -19,7 +19,6 @@ package eth_test import ( "context" "math/big" - "os" "strconv" "github.com/ethereum/go-ethereum/common" @@ -32,7 +31,6 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff/indexer" - "github.com/ethereum/go-ethereum/statediff/indexer/node" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" sdtypes "github.com/ethereum/go-ethereum/statediff/types" . "github.com/onsi/ginkgo" @@ -40,6 +38,7 @@ import ( "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared" ) @@ -183,19 +182,6 @@ var ( } ) -// SetupDB is use to setup a db for watcher tests -func SetupDB() (*postgres.DB, error) { - port, _ := strconv.Atoi(os.Getenv("DATABASE_PORT")) - uri := postgres.DbConnectionString(postgres.ConnectionParams{ - User: os.Getenv("DATABASE_USER"), - Password: os.Getenv("DATABASE_PASSWORD"), - Hostname: os.Getenv("DATABASE_HOSTNAME"), - Name: os.Getenv("DATABASE_NAME"), - Port: port, - }) - return postgres.NewDB(uri, postgres.ConnectionConfig{}, node.Info{}) -} - var _ = Describe("API", func() { var ( db *postgres.DB @@ -210,7 +196,7 @@ var _ = Describe("API", func() { tx *indexer.BlockTx ) - db, err = SetupDB() + db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) indexAndPublisher, err := indexer.NewStateDiffIndexer(chainConfig, db) @@ -269,7 +255,7 @@ var _ = Describe("API", func() { }) // Single test db tear down at end of all tests - defer It("test teardown", func() { eth.TearDownDB(db) }) + defer It("test teardown", func() { shared.TearDownDB(db) }) /* Headers and blocks diff --git a/pkg/eth/cid_retriever_test.go b/pkg/eth/cid_retriever_test.go index f6b74b99..4b00e0d5 100644 --- a/pkg/eth/cid_retriever_test.go +++ b/pkg/eth/cid_retriever_test.go @@ -32,6 +32,7 @@ import ( "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) var ( @@ -217,7 +218,7 @@ var _ = Describe("Retriever", func() { ) BeforeEach(func() { var err error - db, err = SetupDB() + db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) diffIndexer, err = indexer.NewStateDiffIndexer(params.TestChainConfig, db) Expect(err).ToNot(HaveOccurred()) @@ -225,7 +226,7 @@ var _ = Describe("Retriever", func() { retriever = eth.NewCIDRetriever(db) }) AfterEach(func() { - eth.TearDownDB(db) + shared.TearDownDB(db) }) Describe("Retrieve", func() { diff --git a/pkg/eth/eth_state_test.go b/pkg/eth/eth_state_test.go index 581c0628..70e051bf 100644 --- a/pkg/eth/eth_state_test.go +++ b/pkg/eth/eth_state_test.go @@ -39,6 +39,7 @@ import ( "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared" ) @@ -74,7 +75,7 @@ var _ = Describe("eth state reading tests", func() { It("test init", func() { // db and type initializations var err error - db, err = SetupDB() + db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) transformer, err := indexer.NewStateDiffIndexer(chainConfig, db) @@ -183,7 +184,7 @@ var _ = Describe("eth state reading tests", func() { Expect(err).ToNot(HaveOccurred()) }) defer It("test teardown", func() { - eth.TearDownDB(db) + shared.TearDownDB(db) chain.Stop() }) diff --git a/pkg/eth/ipld_fetcher_test.go b/pkg/eth/ipld_fetcher_test.go index c6699a09..659c3b13 100644 --- a/pkg/eth/ipld_fetcher_test.go +++ b/pkg/eth/ipld_fetcher_test.go @@ -25,6 +25,7 @@ import ( "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) var _ = Describe("IPLDFetcher", func() { @@ -39,7 +40,7 @@ var _ = Describe("IPLDFetcher", func() { err error tx *indexer.BlockTx ) - db, err = SetupDB() + db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) pubAndIndexer, err = indexer.NewStateDiffIndexer(params.TestChainConfig, db) Expect(err).ToNot(HaveOccurred()) @@ -56,7 +57,7 @@ var _ = Describe("IPLDFetcher", func() { }) AfterEach(func() { - eth.TearDownDB(db) + shared.TearDownDB(db) }) It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() { diff --git a/pkg/eth/test_helpers.go b/pkg/eth/test_helpers.go index 8c4557db..4d39cf2e 100644 --- a/pkg/eth/test_helpers.go +++ b/pkg/eth/test_helpers.go @@ -16,35 +16,7 @@ package eth -import ( - "github.com/ethereum/go-ethereum/statediff/indexer/models" - "github.com/ethereum/go-ethereum/statediff/indexer/postgres" - . "github.com/onsi/gomega" -) - -// TearDownDB is used to tear down the watcher dbs after tests -func TearDownDB(db *postgres.DB) { - tx, err := db.Beginx() - Expect(err).NotTo(HaveOccurred()) - - _, err = tx.Exec(`DELETE FROM eth.header_cids`) - Expect(err).NotTo(HaveOccurred()) - _, err = tx.Exec(`DELETE FROM eth.transaction_cids`) - Expect(err).NotTo(HaveOccurred()) - _, err = tx.Exec(`DELETE FROM eth.receipt_cids`) - Expect(err).NotTo(HaveOccurred()) - _, err = tx.Exec(`DELETE FROM eth.state_cids`) - Expect(err).NotTo(HaveOccurred()) - _, err = tx.Exec(`DELETE FROM eth.storage_cids`) - Expect(err).NotTo(HaveOccurred()) - _, err = tx.Exec(`DELETE FROM blocks`) - Expect(err).NotTo(HaveOccurred()) - _, err = tx.Exec(`DELETE FROM eth.log_cids`) - Expect(err).NotTo(HaveOccurred()) - - err = tx.Commit() - Expect(err).NotTo(HaveOccurred()) -} +import "github.com/ethereum/go-ethereum/statediff/indexer/models" // TxModelsContainsCID used to check if a list of TxModels contains a specific cid string func TxModelsContainsCID(txs []models.TxModel, cid string) bool { diff --git a/pkg/fill/fill_suite_test.go b/pkg/fill/fill_suite_test.go new file mode 100644 index 00000000..a5c3a4c3 --- /dev/null +++ b/pkg/fill/fill_suite_test.go @@ -0,0 +1,13 @@ +package fill_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestFill(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "ipld eth server fill test suite") +} diff --git a/pkg/fill/service.go b/pkg/fill/service.go new file mode 100644 index 00000000..26ab2ae6 --- /dev/null +++ b/pkg/fill/service.go @@ -0,0 +1,168 @@ +package fill + +import ( + "math" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + log "github.com/sirupsen/logrus" + + "github.com/vulcanize/ipld-eth-server/pkg/serve" +) + +// WatchedAddress type is used to process currently watched addresses +type WatchedAddress struct { + Address string `db:"address"` + CreatedAt uint64 `db:"created_at"` + WatchedAt uint64 `db:"watched_at"` + LastFilledAt uint64 `db:"last_filled_at"` + + StartBlock uint64 + EndBlock uint64 +} + +// Service is the underlying struct for the watched address gap filling service +type Service struct { + db *postgres.DB + client *rpc.Client + interval int +} + +// NewServer creates a new Service +func New(config *serve.Config) *Service { + return &Service{ + db: config.DB, + client: config.Client, + interval: config.WatchedAddressGapFillInterval, + } +} + +// Start is used to begin the service +func (s *Service) Start() { + for { + time.Sleep(time.Duration(s.interval) * time.Second) + + // Get watched addresses from the db + rows := s.fetchWatchedAddresses() + + // Get the block number to start fill at + // Get the block number to end fill at + fillWatchedAddresses, minStartBlock, maxEndBlock := s.GetFillAddresses(rows) + + if len(fillWatchedAddresses) > 0 { + log.Infof("running watched address gap filler for block range: (%d, %d)", minStartBlock, maxEndBlock) + } + + // Fill the missing diffs + for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ { + params := statediff.Params{ + IntermediateStateNodes: true, + IntermediateStorageNodes: true, + IncludeBlock: true, + IncludeReceipts: true, + IncludeTD: true, + IncludeCode: true, + } + + fillAddresses := []interface{}{} + for _, fillWatchedAddress := range fillWatchedAddresses { + if blockNumber >= fillWatchedAddress.StartBlock && blockNumber <= fillWatchedAddress.EndBlock { + params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address)) + fillAddresses = append(fillAddresses, fillWatchedAddress.Address) + } + } + + if len(fillAddresses) > 0 { + s.writeStateDiffAt(blockNumber, params) + s.UpdateLastFilledAt(blockNumber, fillAddresses) + } + } + } +} + +// GetFillAddresses finds the encompassing range to perform fill for the given watched addresses +// it also sets the address specific fill range +func (s *Service) GetFillAddresses(rows []WatchedAddress) ([]WatchedAddress, uint64, uint64) { + fillWatchedAddresses := []WatchedAddress{} + minStartBlock := uint64(math.MaxUint64) + maxEndBlock := uint64(0) + + for _, row := range rows { + // Check for a gap between created_at and watched_at + // CreatedAt and WatchedAt being equal is considered a gap of one block + if row.CreatedAt > row.WatchedAt { + continue + } + + var startBlock uint64 = 0 + var endBlock uint64 = 0 + + // Check if some of the gap was filled earlier + if row.LastFilledAt > 0 { + if row.LastFilledAt < row.WatchedAt { + startBlock = row.LastFilledAt + 1 + } + } else { + startBlock = row.CreatedAt + } + + // Add the address for filling + if startBlock > 0 { + row.StartBlock = startBlock + if startBlock < minStartBlock { + minStartBlock = startBlock + } + + endBlock = row.WatchedAt + row.EndBlock = endBlock + if endBlock > maxEndBlock { + maxEndBlock = endBlock + } + + fillWatchedAddresses = append(fillWatchedAddresses, row) + } + } + + return fillWatchedAddresses, minStartBlock, maxEndBlock +} + +// UpdateLastFilledAt updates the fill status for the provided addresses in the db +func (s *Service) UpdateLastFilledAt(blockNumber uint64, fillAddresses []interface{}) { + // Prepare the query + query := "UPDATE eth.watched_addresses SET last_filled_at=? WHERE address IN (?" + strings.Repeat(",?", len(fillAddresses)-1) + ")" + query = s.db.Rebind(query) + + args := []interface{}{blockNumber} + args = append(args, fillAddresses...) + + // Execute the update query + _, err := s.db.Exec(query, args...) + if err != nil { + log.Fatalf(err.Error()) + } +} + +// fetchWatchedAddresses fetches watched addresses from the db +func (s *Service) fetchWatchedAddresses() []WatchedAddress { + rows := []WatchedAddress{} + pgStr := "SELECT * FROM eth.watched_addresses" + + err := s.db.Select(&rows, pgStr) + if err != nil { + log.Fatalf("Error fetching watched addreesses: %s", err.Error()) + } + + return rows +} + +// writeStateDiffAt makes a RPC call to writeout statediffs at a blocknumber with the given params +func (s *Service) writeStateDiffAt(blockNumber uint64, params statediff.Params) { + err := s.client.Call(nil, "statediff_writeStateDiffAt", blockNumber, params) + if err != nil { + log.Fatalf("Error making a RPC call to write statediff at block number %d: %s", blockNumber, err.Error()) + } +} diff --git a/pkg/fill/service_test.go b/pkg/fill/service_test.go new file mode 100644 index 00000000..bfd4070a --- /dev/null +++ b/pkg/fill/service_test.go @@ -0,0 +1,389 @@ +package fill_test + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/ethereum/go-ethereum/statediff/indexer" + sdtypes "github.com/ethereum/go-ethereum/statediff/types" + + fill "github.com/vulcanize/ipld-eth-server/pkg/fill" + "github.com/vulcanize/ipld-eth-server/pkg/serve" + "github.com/vulcanize/ipld-eth-server/pkg/shared" +) + +var _ = Describe("Service", func() { + + var ( + db *postgres.DB + watchedAddressGapFiller *fill.Service + statediffIndexer *indexer.StateDiffIndexer + err error + + contract1Address = "0x5d663F5269090bD2A7DC2390c911dF6083D7b28F" + contract2Address = "0x6Eb7e5C66DB8af2E96159AC440cbc8CDB7fbD26B" + contract3Address = "0xcfeB164C328CA13EFd3C77E1980d94975aDfedfc" + ) + + It("test init", func() { + // db initialization + db, err = shared.SetupDB() + Expect(err).ToNot(HaveOccurred()) + + // indexer initialization + statediffIndexer, err = indexer.NewStateDiffIndexer(nil, db) + Expect(err).ToNot(HaveOccurred()) + + // fill service intialization + watchedAddressGapFiller = fill.New(&serve.Config{ + DB: db, + }) + }) + + defer It("test teardown", func() { + shared.TearDownDB(db) + }) + + Describe("GetFillAddresses", func() { + Context("overlapping fill ranges", func() { + It("gives the range to run fill for each address", func() { + // input data + rows := []fill.WatchedAddress{ + { + Address: contract1Address, + CreatedAt: 10, + WatchedAt: 50, + LastFilledAt: 0, + }, + { + Address: contract2Address, + CreatedAt: 40, + WatchedAt: 70, + LastFilledAt: 0, + }, + { + Address: contract3Address, + CreatedAt: 20, + WatchedAt: 30, + LastFilledAt: 0, + }, + } + + // expected output data + expectedOutputAddresses := []fill.WatchedAddress{ + { + Address: contract1Address, + CreatedAt: 10, + WatchedAt: 50, + LastFilledAt: 0, + StartBlock: 10, + EndBlock: 50, + }, + { + Address: contract2Address, + CreatedAt: 40, + WatchedAt: 70, + LastFilledAt: 0, + StartBlock: 40, + EndBlock: 70, + }, + { + Address: contract3Address, + CreatedAt: 20, + WatchedAt: 30, + LastFilledAt: 0, + StartBlock: 20, + EndBlock: 30, + }, + } + expectedOutputStartBlock := uint64(10) + expectedOutputEndBlock := uint64(70) + + fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows) + + Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses)) + Expect(minStartBlock).To(Equal(expectedOutputStartBlock)) + Expect(maxEndBlock).To(Equal(expectedOutputEndBlock)) + }) + }) + + Context("non-overlapping fill ranges", func() { + It("gives the range to run fill for each address", func() { + // input data + rows := []fill.WatchedAddress{ + { + Address: contract1Address, + CreatedAt: 10, + WatchedAt: 50, + LastFilledAt: 0, + }, + { + Address: contract2Address, + CreatedAt: 70, + WatchedAt: 90, + LastFilledAt: 0, + }, + } + + // expected output data + expectedOutputAddresses := []fill.WatchedAddress{ + { + Address: contract1Address, + CreatedAt: 10, + WatchedAt: 50, + LastFilledAt: 0, + StartBlock: 10, + EndBlock: 50, + }, + { + Address: contract2Address, + CreatedAt: 70, + WatchedAt: 90, + LastFilledAt: 0, + StartBlock: 70, + EndBlock: 90, + }, + } + expectedOutputStartBlock := uint64(10) + expectedOutputEndBlock := uint64(90) + + fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows) + + Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses)) + Expect(minStartBlock).To(Equal(expectedOutputStartBlock)) + Expect(maxEndBlock).To(Equal(expectedOutputEndBlock)) + }) + }) + + Context("a contract watched before it was created", func() { + It("gives no range for an address when it is watched before it's created", func() { + // input data + rows := []fill.WatchedAddress{ + { + Address: contract1Address, + CreatedAt: 10, + WatchedAt: 50, + LastFilledAt: 0, + }, + { + Address: contract2Address, + CreatedAt: 90, + WatchedAt: 70, + LastFilledAt: 0, + }, + } + + // expected output data + expectedOutputAddresses := []fill.WatchedAddress{ + { + Address: contract1Address, + CreatedAt: 10, + WatchedAt: 50, + LastFilledAt: 0, + StartBlock: 10, + EndBlock: 50, + }, + } + expectedOutputStartBlock := uint64(10) + expectedOutputEndBlock := uint64(50) + + fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows) + + Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses)) + Expect(minStartBlock).To(Equal(expectedOutputStartBlock)) + Expect(maxEndBlock).To(Equal(expectedOutputEndBlock)) + }) + }) + + Context("a contract having some of the gap filled earlier", func() { + It("gives the remaining range for an address to run fill for", func() { + // input data + rows := []fill.WatchedAddress{ + { + Address: contract1Address, + CreatedAt: 10, + WatchedAt: 50, + LastFilledAt: 0, + }, + { + Address: contract2Address, + CreatedAt: 40, + WatchedAt: 70, + LastFilledAt: 50, + }, + } + + // expected output data + expectedOutputAddresses := []fill.WatchedAddress{ + { + Address: contract1Address, + CreatedAt: 10, + WatchedAt: 50, + LastFilledAt: 0, + StartBlock: 10, + EndBlock: 50, + }, + { + Address: contract2Address, + CreatedAt: 40, + WatchedAt: 70, + LastFilledAt: 50, + StartBlock: 51, + EndBlock: 70, + }, + } + expectedOutputStartBlock := uint64(10) + expectedOutputEndBlock := uint64(70) + + fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows) + + Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses)) + Expect(minStartBlock).To(Equal(expectedOutputStartBlock)) + Expect(maxEndBlock).To(Equal(expectedOutputEndBlock)) + }) + + It("gives no range for an address when the gap is already filled", func() { + // input data + rows := []fill.WatchedAddress{ + { + Address: contract1Address, + CreatedAt: 10, + WatchedAt: 50, + LastFilledAt: 0, + }, + { + Address: contract2Address, + CreatedAt: 40, + WatchedAt: 70, + LastFilledAt: 70, + }, + } + + // expected output data + expectedOutputAddresses := []fill.WatchedAddress{ + { + Address: contract1Address, + CreatedAt: 10, + WatchedAt: 50, + LastFilledAt: 0, + StartBlock: 10, + EndBlock: 50, + }, + } + expectedOutputStartBlock := uint64(10) + expectedOutputEndBlock := uint64(50) + + fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows) + + Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses)) + Expect(minStartBlock).To(Equal(expectedOutputStartBlock)) + Expect(maxEndBlock).To(Equal(expectedOutputEndBlock)) + }) + }) + }) + + Describe("UpdateLastFilledAt", func() { + pgStr := "SELECT * FROM eth.watched_addresses" + + BeforeEach(func() { + shared.TearDownDB(db) + }) + + It("updates last filled at for a single address", func() { + // fill db with watched addresses + watchedAddresses := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: 10, + }, + } + watchedAt := uint64(50) + err = statediffIndexer.InsertWatchedAddresses(watchedAddresses, big.NewInt(int64(watchedAt))) + Expect(err).ToNot(HaveOccurred()) + + // update last filled at block in the db + fillAddresses := []interface{}{ + contract1Address, + } + fillAt := uint64(12) + watchedAddressGapFiller.UpdateLastFilledAt(fillAt, fillAddresses) + + // expected data + expectedData := []fill.WatchedAddress{ + { + Address: contract1Address, + CreatedAt: 10, + WatchedAt: watchedAt, + LastFilledAt: fillAt, + }, + } + + rows := []fill.WatchedAddress{} + err = db.Select(&rows, pgStr) + Expect(err).ToNot(HaveOccurred()) + + Expect(rows).To(Equal(expectedData)) + }) + + It("updates last filled at for multiple address", func() { + // fill db with watched addresses + watchedAddresses := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: 10, + }, + { + Address: contract2Address, + CreatedAt: 20, + }, + { + Address: contract3Address, + CreatedAt: 30, + }, + } + watchedAt := uint64(50) + err = statediffIndexer.InsertWatchedAddresses(watchedAddresses, big.NewInt(int64(watchedAt))) + Expect(err).ToNot(HaveOccurred()) + + // update last filled at block in the db + fillAddresses := []interface{}{ + contract1Address, + contract2Address, + contract3Address, + } + fillAt := uint64(50) + watchedAddressGapFiller.UpdateLastFilledAt(fillAt, fillAddresses) + + // expected data + expectedData := []fill.WatchedAddress{ + { + Address: contract1Address, + CreatedAt: 10, + WatchedAt: watchedAt, + LastFilledAt: fillAt, + }, + { + Address: contract2Address, + CreatedAt: 20, + WatchedAt: watchedAt, + LastFilledAt: fillAt, + }, + { + Address: contract3Address, + CreatedAt: 30, + WatchedAt: watchedAt, + LastFilledAt: fillAt, + }, + } + + rows := []fill.WatchedAddress{} + err = db.Select(&rows, pgStr) + Expect(err).ToNot(HaveOccurred()) + + Expect(rows).To(Equal(expectedData)) + }) + }) +}) diff --git a/pkg/graphql/graphql_test.go b/pkg/graphql/graphql_test.go index ceb87c9d..210b6366 100644 --- a/pkg/graphql/graphql_test.go +++ b/pkg/graphql/graphql_test.go @@ -20,8 +20,6 @@ import ( "context" "fmt" "math/big" - "os" - "strconv" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -33,7 +31,6 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff/indexer" - "github.com/ethereum/go-ethereum/statediff/indexer/node" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" sdtypes "github.com/ethereum/go-ethereum/statediff/types" . "github.com/onsi/ginkgo" @@ -42,22 +39,10 @@ import ( "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/graphql" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared" ) -// SetupDB is use to setup a db for watcher tests -func SetupDB() (*postgres.DB, error) { - port, _ := strconv.Atoi(os.Getenv("DATABASE_PORT")) - uri := postgres.DbConnectionString(postgres.ConnectionParams{ - User: os.Getenv("DATABASE_USER"), - Password: os.Getenv("DATABASE_PASSWORD"), - Hostname: os.Getenv("DATABASE_HOSTNAME"), - Name: os.Getenv("DATABASE_NAME"), - Port: port, - }) - return postgres.NewDB(uri, postgres.ConnectionConfig{}, node.Info{}) -} - var _ = Describe("GraphQL", func() { const ( gqlEndPoint = "127.0.0.1:8083" @@ -82,7 +67,7 @@ var _ = Describe("GraphQL", func() { It("test init", func() { var err error - db, err = SetupDB() + db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) transformer, err := indexer.NewStateDiffIndexer(chainConfig, db) @@ -186,7 +171,7 @@ var _ = Describe("GraphQL", func() { defer It("test teardown", func() { err := graphQLServer.Stop() Expect(err).ToNot(HaveOccurred()) - eth.TearDownDB(db) + shared.TearDownDB(db) chain.Stop() }) diff --git a/pkg/shared/test_helpers.go b/pkg/shared/test_helpers.go index 35b7adfa..e2ee9d58 100644 --- a/pkg/shared/test_helpers.go +++ b/pkg/shared/test_helpers.go @@ -18,8 +18,14 @@ package shared import ( "bytes" + "os" + "strconv" + + . "github.com/onsi/gomega" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs" + "github.com/ethereum/go-ethereum/statediff/indexer/node" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" ) // IPLDsContainBytes used to check if a list of strings contains a particular string @@ -31,3 +37,42 @@ func IPLDsContainBytes(iplds []ipfs.BlockModel, b []byte) bool { } return false } + +// SetupDB is use to setup a db for watcher tests +func SetupDB() (*postgres.DB, error) { + port, _ := strconv.Atoi(os.Getenv("DATABASE_PORT")) + uri := postgres.DbConnectionString(postgres.ConnectionParams{ + User: os.Getenv("DATABASE_USER"), + Password: os.Getenv("DATABASE_PASSWORD"), + Hostname: os.Getenv("DATABASE_HOSTNAME"), + Name: os.Getenv("DATABASE_NAME"), + Port: port, + }) + return postgres.NewDB(uri, postgres.ConnectionConfig{}, node.Info{}) +} + +// TearDownDB is used to tear down the watcher dbs after tests +func TearDownDB(db *postgres.DB) { + tx, err := db.Beginx() + Expect(err).NotTo(HaveOccurred()) + + _, err = tx.Exec(`DELETE FROM eth.header_cids`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM eth.transaction_cids`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM eth.receipt_cids`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM eth.state_cids`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM eth.storage_cids`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM blocks`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM eth.log_cids`) + Expect(err).NotTo(HaveOccurred()) + _, err = tx.Exec(`DELETE FROM eth.watched_addresses`) + Expect(err).NotTo(HaveOccurred()) + + err = tx.Commit() + Expect(err).NotTo(HaveOccurred()) +} diff --git a/test/watched_address_gap_filling_service_integration_test.go b/test/watched_address_gap_filling_service_integration_test.go index a74da5ba..34617b81 100644 --- a/test/watched_address_gap_filling_service_integration_test.go +++ b/test/watched_address_gap_filling_service_integration_test.go @@ -14,6 +14,7 @@ import ( sdtypes "github.com/ethereum/go-ethereum/statediff/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + integration "github.com/vulcanize/ipld-eth-server/test" )