[WIP] Add a service to fill indexing gap for watched addresses #135

Closed
prathamesh0 wants to merge 16 commits from pm-watched-addresses into master
12 changed files with 635 additions and 199 deletions
Showing only changes of commit 594ce41e6c - Show all commits

View File

@ -16,9 +16,7 @@
package cmd package cmd
import ( import (
"encoding/json"
"errors" "errors"
"math"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@ -27,9 +25,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
"github.com/mailgun/groupcache/v2" "github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -37,6 +33,7 @@ import (
"github.com/vulcanize/gap-filler/pkg/mux" "github.com/vulcanize/gap-filler/pkg/mux"
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
fill "github.com/vulcanize/ipld-eth-server/pkg/fill"
"github.com/vulcanize/ipld-eth-server/pkg/graphql" "github.com/vulcanize/ipld-eth-server/pkg/graphql"
srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc" srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc"
s "github.com/vulcanize/ipld-eth-server/pkg/serve" s "github.com/vulcanize/ipld-eth-server/pkg/serve"
@ -105,7 +102,8 @@ func serve() {
} }
if serverConfig.WatchedAddressGapFillerEnabled { if serverConfig.WatchedAddressGapFillerEnabled {
go startWatchedAddressGapFiller(serverConfig) service := fill.New(serverConfig)
go service.Start()
logWithCommand.Info("watched address gap filler enabled") logWithCommand.Info("watched address gap filler enabled")
} else { } else {
logWithCommand.Info("watched address gap filler disabled") logWithCommand.Info("watched address gap filler disabled")
@ -302,130 +300,6 @@ func startStateTrieValidator(config *s.Config, server s.Server) {
} }
} }
type WatchedAddress struct {
Address string `db:"address"`
CreatedAt uint64 `db:"created_at"`
WatchedAt uint64 `db:"watched_at"`
LastFilledAt uint64 `db:"last_filled_at"`
startBlock uint64
endBlock uint64
}
func startWatchedAddressGapFiller(config *s.Config) {
fillInterval := config.WatchedAddressGapFillInterval
for {
time.Sleep(time.Duration(fillInterval) * time.Second)
// Get watched addresses from the db
// Get the block number to start fill at
// Get the block number to end fill at
fillWatchedAddresses, minStartBlock, maxEndBlock := getFillAddresses(config)
if len(fillWatchedAddresses) > 0 {
log.Infof("running watched address gap filler for block range: (%d, %d)", minStartBlock, maxEndBlock)
}
// Fill the missing diffs
for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ {
params := statediff.Params{
IntermediateStateNodes: true,
IntermediateStorageNodes: true,
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
IncludeCode: true,
}
fillAddresses := []interface{}{}
for _, fillWatchedAddress := range fillWatchedAddresses {
if blockNumber >= fillWatchedAddress.startBlock && blockNumber <= fillWatchedAddress.endBlock {
params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address))
fillAddresses = append(fillAddresses, fillWatchedAddress.Address)
}
}
fillWatchedAddressGap(config, blockNumber, params, fillAddresses)
}
}
}
// getFillAddresses gets the addresses and finds the encompassing range to perform the fill
// it also sets the address specific fill range
func getFillAddresses(config *s.Config) ([]WatchedAddress, uint64, uint64) {
rows := []WatchedAddress{}
pgStr := "SELECT * FROM eth.watched_addresses"
err := config.DB.Select(&rows, pgStr)
if err != nil {
log.Fatalf("Error fetching watched addreesses: %s", err.Error())
}
fillWatchedAddresses := []WatchedAddress{}
minStartBlock := uint64(math.MaxUint64)
maxEndBlock := uint64(0)
for _, row := range rows {
// Check for a gap between created_at and watched_at
// CreatedAt and WatchedAt being equal is considered a gap
if row.CreatedAt > row.WatchedAt {
continue
}
var startBlock uint64 = 0
var endBlock uint64 = 0
// Check if some of the gap was filled earlier
if row.LastFilledAt > 0 {
if row.LastFilledAt < row.WatchedAt {
startBlock = row.LastFilledAt + 1
}
} else {
startBlock = row.CreatedAt
}
// Add the address for filling
if startBlock > 0 {
row.startBlock = startBlock
if startBlock < minStartBlock {
minStartBlock = startBlock
}
endBlock = row.WatchedAt
row.endBlock = endBlock
if endBlock > maxEndBlock {
maxEndBlock = endBlock
}
fillWatchedAddresses = append(fillWatchedAddresses, row)
}
}
return fillWatchedAddresses, minStartBlock, maxEndBlock
}
func fillWatchedAddressGap(config *s.Config, blockNumber uint64, params statediff.Params, fillAddresses []interface{}) {
// Make a RPC call to write the statediffs
var data json.RawMessage
err := config.Client.Call(&data, "statediff_writeStateDiffAt", blockNumber, params)
if err != nil {
log.Fatalf("Error making a RPC call to write statediff at block number %d: %s", blockNumber, err.Error())
}
// Update the db
query := "UPDATE eth.watched_addresses SET last_filled_at=? WHERE address IN (?" + strings.Repeat(",?", len(fillAddresses)-1) + ")"
query = config.DB.Rebind(query)
args := []interface{}{blockNumber}
args = append(args, fillAddresses...)
_, err = config.DB.Exec(query, args...)
if err != nil {
log.Fatalf(err.Error())
}
}
func parseRpcAddresses(value string) ([]*rpc.Client, error) { func parseRpcAddresses(value string) ([]*rpc.Client, error) {
rpcAddresses := strings.Split(value, ",") rpcAddresses := strings.Split(value, ",")
rpcClients := make([]*rpc.Client, 0, len(rpcAddresses)) rpcClients := make([]*rpc.Client, 0, len(rpcAddresses))

View File

@ -19,7 +19,6 @@ package eth_test
import ( import (
"context" "context"
"math/big" "math/big"
"os"
"strconv" "strconv"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -32,7 +31,6 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer" "github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/postgres"
sdtypes "github.com/ethereum/go-ethereum/statediff/types" sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -40,6 +38,7 @@ import (
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared" ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
@ -183,19 +182,6 @@ var (
} }
) )
// SetupDB is use to setup a db for watcher tests
func SetupDB() (*postgres.DB, error) {
port, _ := strconv.Atoi(os.Getenv("DATABASE_PORT"))
uri := postgres.DbConnectionString(postgres.ConnectionParams{
User: os.Getenv("DATABASE_USER"),
Password: os.Getenv("DATABASE_PASSWORD"),
Hostname: os.Getenv("DATABASE_HOSTNAME"),
Name: os.Getenv("DATABASE_NAME"),
Port: port,
})
return postgres.NewDB(uri, postgres.ConnectionConfig{}, node.Info{})
}
var _ = Describe("API", func() { var _ = Describe("API", func() {
var ( var (
db *postgres.DB db *postgres.DB
@ -210,7 +196,7 @@ var _ = Describe("API", func() {
tx *indexer.BlockTx tx *indexer.BlockTx
) )
db, err = SetupDB() db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
indexAndPublisher, err := indexer.NewStateDiffIndexer(chainConfig, db) indexAndPublisher, err := indexer.NewStateDiffIndexer(chainConfig, db)
@ -269,7 +255,7 @@ var _ = Describe("API", func() {
}) })
// Single test db tear down at end of all tests // Single test db tear down at end of all tests
defer It("test teardown", func() { eth.TearDownDB(db) }) defer It("test teardown", func() { shared.TearDownDB(db) })
/* /*
Headers and blocks Headers and blocks

View File

@ -32,6 +32,7 @@ import (
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
var ( var (
@ -217,7 +218,7 @@ var _ = Describe("Retriever", func() {
) )
BeforeEach(func() { BeforeEach(func() {
var err error var err error
db, err = SetupDB() db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
diffIndexer, err = indexer.NewStateDiffIndexer(params.TestChainConfig, db) diffIndexer, err = indexer.NewStateDiffIndexer(params.TestChainConfig, db)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -225,7 +226,7 @@ var _ = Describe("Retriever", func() {
retriever = eth.NewCIDRetriever(db) retriever = eth.NewCIDRetriever(db)
}) })
AfterEach(func() { AfterEach(func() {
eth.TearDownDB(db) shared.TearDownDB(db)
}) })
Describe("Retrieve", func() { Describe("Retrieve", func() {

View File

@ -39,6 +39,7 @@ import (
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared" ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
@ -74,7 +75,7 @@ var _ = Describe("eth state reading tests", func() {
It("test init", func() { It("test init", func() {
// db and type initializations // db and type initializations
var err error var err error
db, err = SetupDB() db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
transformer, err := indexer.NewStateDiffIndexer(chainConfig, db) transformer, err := indexer.NewStateDiffIndexer(chainConfig, db)
@ -183,7 +184,7 @@ var _ = Describe("eth state reading tests", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
defer It("test teardown", func() { defer It("test teardown", func() {
eth.TearDownDB(db) shared.TearDownDB(db)
chain.Stop() chain.Stop()
}) })

View File

@ -25,6 +25,7 @@ import (
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
var _ = Describe("IPLDFetcher", func() { var _ = Describe("IPLDFetcher", func() {
@ -39,7 +40,7 @@ var _ = Describe("IPLDFetcher", func() {
err error err error
tx *indexer.BlockTx tx *indexer.BlockTx
) )
db, err = SetupDB() db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
pubAndIndexer, err = indexer.NewStateDiffIndexer(params.TestChainConfig, db) pubAndIndexer, err = indexer.NewStateDiffIndexer(params.TestChainConfig, db)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -56,7 +57,7 @@ var _ = Describe("IPLDFetcher", func() {
}) })
AfterEach(func() { AfterEach(func() {
eth.TearDownDB(db) shared.TearDownDB(db)
}) })
It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() { It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() {

View File

@ -16,35 +16,7 @@
package eth package eth
import ( import "github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
. "github.com/onsi/gomega"
)
// TearDownDB is used to tear down the watcher dbs after tests
func TearDownDB(db *postgres.DB) {
tx, err := db.Beginx()
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.header_cids`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.transaction_cids`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.receipt_cids`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.state_cids`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.storage_cids`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM blocks`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.log_cids`)
Expect(err).NotTo(HaveOccurred())
err = tx.Commit()
Expect(err).NotTo(HaveOccurred())
}
// TxModelsContainsCID used to check if a list of TxModels contains a specific cid string // TxModelsContainsCID used to check if a list of TxModels contains a specific cid string
func TxModelsContainsCID(txs []models.TxModel, cid string) bool { func TxModelsContainsCID(txs []models.TxModel, cid string) bool {

View File

@ -0,0 +1,13 @@
package fill_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestFill(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "ipld eth server fill test suite")
}

168
pkg/fill/service.go Normal file
View File

@ -0,0 +1,168 @@
package fill
import (
"math"
"strings"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-server/pkg/serve"
)
// WatchedAddress type is used to process currently watched addresses
type WatchedAddress struct {
Address string `db:"address"`
CreatedAt uint64 `db:"created_at"`
WatchedAt uint64 `db:"watched_at"`
LastFilledAt uint64 `db:"last_filled_at"`
StartBlock uint64
EndBlock uint64
}
// Service is the underlying struct for the watched address gap filling service
type Service struct {
db *postgres.DB
client *rpc.Client
interval int
}
// NewServer creates a new Service
func New(config *serve.Config) *Service {
return &Service{
db: config.DB,
client: config.Client,
interval: config.WatchedAddressGapFillInterval,
}
}
// Start is used to begin the service
func (s *Service) Start() {
for {
time.Sleep(time.Duration(s.interval) * time.Second)
// Get watched addresses from the db
rows := s.fetchWatchedAddresses()
// Get the block number to start fill at
// Get the block number to end fill at
fillWatchedAddresses, minStartBlock, maxEndBlock := s.GetFillAddresses(rows)
if len(fillWatchedAddresses) > 0 {
log.Infof("running watched address gap filler for block range: (%d, %d)", minStartBlock, maxEndBlock)
}
// Fill the missing diffs
for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ {
params := statediff.Params{
IntermediateStateNodes: true,
IntermediateStorageNodes: true,
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
IncludeCode: true,
}
fillAddresses := []interface{}{}
for _, fillWatchedAddress := range fillWatchedAddresses {
if blockNumber >= fillWatchedAddress.StartBlock && blockNumber <= fillWatchedAddress.EndBlock {
params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address))
fillAddresses = append(fillAddresses, fillWatchedAddress.Address)
}
}
if len(fillAddresses) > 0 {
s.writeStateDiffAt(blockNumber, params)
s.UpdateLastFilledAt(blockNumber, fillAddresses)
}
}
}
}
// GetFillAddresses finds the encompassing range to perform fill for the given watched addresses
// it also sets the address specific fill range
func (s *Service) GetFillAddresses(rows []WatchedAddress) ([]WatchedAddress, uint64, uint64) {
fillWatchedAddresses := []WatchedAddress{}
minStartBlock := uint64(math.MaxUint64)
maxEndBlock := uint64(0)
for _, row := range rows {
// Check for a gap between created_at and watched_at
// CreatedAt and WatchedAt being equal is considered a gap of one block
if row.CreatedAt > row.WatchedAt {
continue
}
var startBlock uint64 = 0
var endBlock uint64 = 0
// Check if some of the gap was filled earlier
if row.LastFilledAt > 0 {
if row.LastFilledAt < row.WatchedAt {
startBlock = row.LastFilledAt + 1
}
} else {
startBlock = row.CreatedAt
}
// Add the address for filling
if startBlock > 0 {
row.StartBlock = startBlock
if startBlock < minStartBlock {
minStartBlock = startBlock
}
endBlock = row.WatchedAt
row.EndBlock = endBlock
if endBlock > maxEndBlock {
maxEndBlock = endBlock
}
fillWatchedAddresses = append(fillWatchedAddresses, row)
}
}
return fillWatchedAddresses, minStartBlock, maxEndBlock
}
// UpdateLastFilledAt updates the fill status for the provided addresses in the db
func (s *Service) UpdateLastFilledAt(blockNumber uint64, fillAddresses []interface{}) {
// Prepare the query
query := "UPDATE eth.watched_addresses SET last_filled_at=? WHERE address IN (?" + strings.Repeat(",?", len(fillAddresses)-1) + ")"
query = s.db.Rebind(query)
args := []interface{}{blockNumber}
args = append(args, fillAddresses...)
// Execute the update query
_, err := s.db.Exec(query, args...)
if err != nil {
log.Fatalf(err.Error())
}
}
// fetchWatchedAddresses fetches watched addresses from the db
func (s *Service) fetchWatchedAddresses() []WatchedAddress {
rows := []WatchedAddress{}
pgStr := "SELECT * FROM eth.watched_addresses"
err := s.db.Select(&rows, pgStr)
if err != nil {
log.Fatalf("Error fetching watched addreesses: %s", err.Error())
}
return rows
}
// writeStateDiffAt makes a RPC call to writeout statediffs at a blocknumber with the given params
func (s *Service) writeStateDiffAt(blockNumber uint64, params statediff.Params) {
err := s.client.Call(nil, "statediff_writeStateDiffAt", blockNumber, params)
if err != nil {
log.Fatalf("Error making a RPC call to write statediff at block number %d: %s", blockNumber, err.Error())
}
}

389
pkg/fill/service_test.go Normal file
View File

@ -0,0 +1,389 @@
package fill_test
import (
"math/big"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/ethereum/go-ethereum/statediff/indexer"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
fill "github.com/vulcanize/ipld-eth-server/pkg/fill"
"github.com/vulcanize/ipld-eth-server/pkg/serve"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
)
var _ = Describe("Service", func() {
var (
db *postgres.DB
watchedAddressGapFiller *fill.Service
statediffIndexer *indexer.StateDiffIndexer
err error
contract1Address = "0x5d663F5269090bD2A7DC2390c911dF6083D7b28F"
contract2Address = "0x6Eb7e5C66DB8af2E96159AC440cbc8CDB7fbD26B"
contract3Address = "0xcfeB164C328CA13EFd3C77E1980d94975aDfedfc"
)
It("test init", func() {
// db initialization
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
// indexer initialization
statediffIndexer, err = indexer.NewStateDiffIndexer(nil, db)
Expect(err).ToNot(HaveOccurred())
// fill service intialization
watchedAddressGapFiller = fill.New(&serve.Config{
DB: db,
})
})
defer It("test teardown", func() {
shared.TearDownDB(db)
})
Describe("GetFillAddresses", func() {
Context("overlapping fill ranges", func() {
It("gives the range to run fill for each address", func() {
// input data
rows := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
},
{
Address: contract2Address,
CreatedAt: 40,
WatchedAt: 70,
LastFilledAt: 0,
},
{
Address: contract3Address,
CreatedAt: 20,
WatchedAt: 30,
LastFilledAt: 0,
},
}
// expected output data
expectedOutputAddresses := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
StartBlock: 10,
EndBlock: 50,
},
{
Address: contract2Address,
CreatedAt: 40,
WatchedAt: 70,
LastFilledAt: 0,
StartBlock: 40,
EndBlock: 70,
},
{
Address: contract3Address,
CreatedAt: 20,
WatchedAt: 30,
LastFilledAt: 0,
StartBlock: 20,
EndBlock: 30,
},
}
expectedOutputStartBlock := uint64(10)
expectedOutputEndBlock := uint64(70)
fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows)
Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses))
Expect(minStartBlock).To(Equal(expectedOutputStartBlock))
Expect(maxEndBlock).To(Equal(expectedOutputEndBlock))
})
})
Context("non-overlapping fill ranges", func() {
It("gives the range to run fill for each address", func() {
// input data
rows := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
},
{
Address: contract2Address,
CreatedAt: 70,
WatchedAt: 90,
LastFilledAt: 0,
},
}
// expected output data
expectedOutputAddresses := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
StartBlock: 10,
EndBlock: 50,
},
{
Address: contract2Address,
CreatedAt: 70,
WatchedAt: 90,
LastFilledAt: 0,
StartBlock: 70,
EndBlock: 90,
},
}
expectedOutputStartBlock := uint64(10)
expectedOutputEndBlock := uint64(90)
fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows)
Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses))
Expect(minStartBlock).To(Equal(expectedOutputStartBlock))
Expect(maxEndBlock).To(Equal(expectedOutputEndBlock))
})
})
Context("a contract watched before it was created", func() {
It("gives no range for an address when it is watched before it's created", func() {
// input data
rows := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
},
{
Address: contract2Address,
CreatedAt: 90,
WatchedAt: 70,
LastFilledAt: 0,
},
}
// expected output data
expectedOutputAddresses := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
StartBlock: 10,
EndBlock: 50,
},
}
expectedOutputStartBlock := uint64(10)
expectedOutputEndBlock := uint64(50)
fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows)
Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses))
Expect(minStartBlock).To(Equal(expectedOutputStartBlock))
Expect(maxEndBlock).To(Equal(expectedOutputEndBlock))
})
})
Context("a contract having some of the gap filled earlier", func() {
It("gives the remaining range for an address to run fill for", func() {
// input data
rows := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
},
{
Address: contract2Address,
CreatedAt: 40,
WatchedAt: 70,
LastFilledAt: 50,
},
}
// expected output data
expectedOutputAddresses := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
StartBlock: 10,
EndBlock: 50,
},
{
Address: contract2Address,
CreatedAt: 40,
WatchedAt: 70,
LastFilledAt: 50,
StartBlock: 51,
EndBlock: 70,
},
}
expectedOutputStartBlock := uint64(10)
expectedOutputEndBlock := uint64(70)
fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows)
Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses))
Expect(minStartBlock).To(Equal(expectedOutputStartBlock))
Expect(maxEndBlock).To(Equal(expectedOutputEndBlock))
})
It("gives no range for an address when the gap is already filled", func() {
// input data
rows := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
},
{
Address: contract2Address,
CreatedAt: 40,
WatchedAt: 70,
LastFilledAt: 70,
},
}
// expected output data
expectedOutputAddresses := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
StartBlock: 10,
EndBlock: 50,
},
}
expectedOutputStartBlock := uint64(10)
expectedOutputEndBlock := uint64(50)
fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows)
Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses))
Expect(minStartBlock).To(Equal(expectedOutputStartBlock))
Expect(maxEndBlock).To(Equal(expectedOutputEndBlock))
})
})
})
Describe("UpdateLastFilledAt", func() {
pgStr := "SELECT * FROM eth.watched_addresses"
BeforeEach(func() {
shared.TearDownDB(db)
})
It("updates last filled at for a single address", func() {
// fill db with watched addresses
watchedAddresses := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: 10,
},
}
watchedAt := uint64(50)
err = statediffIndexer.InsertWatchedAddresses(watchedAddresses, big.NewInt(int64(watchedAt)))
Expect(err).ToNot(HaveOccurred())
// update last filled at block in the db
fillAddresses := []interface{}{
contract1Address,
}
fillAt := uint64(12)
watchedAddressGapFiller.UpdateLastFilledAt(fillAt, fillAddresses)
// expected data
expectedData := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: watchedAt,
LastFilledAt: fillAt,
},
}
rows := []fill.WatchedAddress{}
err = db.Select(&rows, pgStr)
Expect(err).ToNot(HaveOccurred())
Expect(rows).To(Equal(expectedData))
})
It("updates last filled at for multiple address", func() {
// fill db with watched addresses
watchedAddresses := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: 10,
},
{
Address: contract2Address,
CreatedAt: 20,
},
{
Address: contract3Address,
CreatedAt: 30,
},
}
watchedAt := uint64(50)
err = statediffIndexer.InsertWatchedAddresses(watchedAddresses, big.NewInt(int64(watchedAt)))
Expect(err).ToNot(HaveOccurred())
// update last filled at block in the db
fillAddresses := []interface{}{
contract1Address,
contract2Address,
contract3Address,
}
fillAt := uint64(50)
watchedAddressGapFiller.UpdateLastFilledAt(fillAt, fillAddresses)
// expected data
expectedData := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: watchedAt,
LastFilledAt: fillAt,
},
{
Address: contract2Address,
CreatedAt: 20,
WatchedAt: watchedAt,
LastFilledAt: fillAt,
},
{
Address: contract3Address,
CreatedAt: 30,
WatchedAt: watchedAt,
LastFilledAt: fillAt,
},
}
rows := []fill.WatchedAddress{}
err = db.Select(&rows, pgStr)
Expect(err).ToNot(HaveOccurred())
Expect(rows).To(Equal(expectedData))
})
})
})

View File

@ -20,8 +20,6 @@ import (
"context" "context"
"fmt" "fmt"
"math/big" "math/big"
"os"
"strconv"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
@ -33,7 +31,6 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/ethereum/go-ethereum/statediff/indexer" "github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/postgres"
sdtypes "github.com/ethereum/go-ethereum/statediff/types" sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -42,22 +39,10 @@ import (
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
"github.com/vulcanize/ipld-eth-server/pkg/graphql" "github.com/vulcanize/ipld-eth-server/pkg/graphql"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared" ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
// SetupDB is use to setup a db for watcher tests
func SetupDB() (*postgres.DB, error) {
port, _ := strconv.Atoi(os.Getenv("DATABASE_PORT"))
uri := postgres.DbConnectionString(postgres.ConnectionParams{
User: os.Getenv("DATABASE_USER"),
Password: os.Getenv("DATABASE_PASSWORD"),
Hostname: os.Getenv("DATABASE_HOSTNAME"),
Name: os.Getenv("DATABASE_NAME"),
Port: port,
})
return postgres.NewDB(uri, postgres.ConnectionConfig{}, node.Info{})
}
var _ = Describe("GraphQL", func() { var _ = Describe("GraphQL", func() {
const ( const (
gqlEndPoint = "127.0.0.1:8083" gqlEndPoint = "127.0.0.1:8083"
@ -82,7 +67,7 @@ var _ = Describe("GraphQL", func() {
It("test init", func() { It("test init", func() {
var err error var err error
db, err = SetupDB() db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
transformer, err := indexer.NewStateDiffIndexer(chainConfig, db) transformer, err := indexer.NewStateDiffIndexer(chainConfig, db)
@ -186,7 +171,7 @@ var _ = Describe("GraphQL", func() {
defer It("test teardown", func() { defer It("test teardown", func() {
err := graphQLServer.Stop() err := graphQLServer.Stop()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
eth.TearDownDB(db) shared.TearDownDB(db)
chain.Stop() chain.Stop()
}) })

View File

@ -18,8 +18,14 @@ package shared
import ( import (
"bytes" "bytes"
"os"
"strconv"
. "github.com/onsi/gomega"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
) )
// IPLDsContainBytes used to check if a list of strings contains a particular string // IPLDsContainBytes used to check if a list of strings contains a particular string
@ -31,3 +37,42 @@ func IPLDsContainBytes(iplds []ipfs.BlockModel, b []byte) bool {
} }
return false return false
} }
// SetupDB is use to setup a db for watcher tests
func SetupDB() (*postgres.DB, error) {
port, _ := strconv.Atoi(os.Getenv("DATABASE_PORT"))
uri := postgres.DbConnectionString(postgres.ConnectionParams{
User: os.Getenv("DATABASE_USER"),
Password: os.Getenv("DATABASE_PASSWORD"),
Hostname: os.Getenv("DATABASE_HOSTNAME"),
Name: os.Getenv("DATABASE_NAME"),
Port: port,
})
return postgres.NewDB(uri, postgres.ConnectionConfig{}, node.Info{})
}
// TearDownDB is used to tear down the watcher dbs after tests
func TearDownDB(db *postgres.DB) {
tx, err := db.Beginx()
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.header_cids`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.transaction_cids`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.receipt_cids`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.state_cids`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.storage_cids`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM blocks`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.log_cids`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.watched_addresses`)
Expect(err).NotTo(HaveOccurred())
err = tx.Commit()
Expect(err).NotTo(HaveOccurred())
}

View File

@ -14,6 +14,7 @@ import (
sdtypes "github.com/ethereum/go-ethereum/statediff/types" sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
integration "github.com/vulcanize/ipld-eth-server/test" integration "github.com/vulcanize/ipld-eth-server/test"
) )