Merge changes to remove watched addresses fill service #156

Merged
ashwinphatak merged 2 commits from ng-v4-remove-fill into sharding 2022-05-23 15:01:23 +00:00
16 changed files with 3 additions and 1145 deletions

View File

@ -67,8 +67,6 @@ jobs:
ETH_FORWARD_ETH_CALLS: false
ETH_PROXY_ON_ERROR: false
ETH_HTTP_PATH: "go-ethereum:8545"
WATCHED_ADDRESS_GAP_FILLER_ENABLED: false
WATCHED_ADDRESS_GAP_FILLER_INTERVAL: 2
strategy:
matrix:
go-version: [1.16.x]
@ -136,8 +134,6 @@ jobs:
ETH_FORWARD_ETH_CALLS: true
ETH_PROXY_ON_ERROR: false
ETH_HTTP_PATH: "go-ethereum:8545"
WATCHED_ADDRESS_GAP_FILLER_ENABLED: false
WATCHED_ADDRESS_GAP_FILLER_INTERVAL: 2
strategy:
matrix:
go-version: [1.16.x]
@ -194,74 +190,3 @@ jobs:
while [ "$(curl -s -o /dev/null -w ''%{http_code}'' localhost:8081)" != "200" ]; do echo "waiting for ipld-eth-server..." && sleep 5; done && \
while [ "$(curl -s -o /dev/null -w ''%{http_code}'' localhost:8545)" != "200" ]; do echo "waiting for geth-statediff..." && sleep 5; done && \
make integrationtest
integrationtest_watchedaddress_gapfillingservice:
name: Run integration tests for watched addresses with gap filling service enabled
env:
STACK_ORCHESTRATOR_REF: 35c677433aee5fafdf74eb3c251a453691b818d0
GO_ETHEREUM_REF: ef8c9f2580dd577a7c5eca538fb0ed64d53dc4a4
GOPATH: /tmp/go
DB_WRITE: true
ETH_FORWARD_ETH_CALLS: false
ETH_PROXY_ON_ERROR: false
ETH_HTTP_PATH: "go-ethereum:8545"
WATCHED_ADDRESS_GAP_FILLER_ENABLED: true
WATCHED_ADDRESS_GAP_FILLER_INTERVAL: 2
strategy:
matrix:
go-version: [1.16.x]
os: [ubuntu-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Create GOPATH
run: mkdir -p /tmp/go
- name: Install Go
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}
- uses: actions/checkout@v2
with:
path: "./ipld-eth-server"
- uses: actions/checkout@v2
with:
ref: ${{ env.STACK_ORCHESTRATOR_REF }}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
- uses: actions/checkout@v2
with:
ref: ${{ env.GO_ETHEREUM_REF }}
repository: vulcanize/go-ethereum
path: "./go-ethereum/"
- name: Create config file
run: |
echo vulcanize_go_ethereum=$GITHUB_WORKSPACE/go-ethereum/ > ./config.sh
echo vulcanize_ipld_eth_server=$GITHUB_WORKSPACE/ipld-eth-server/ >> ./config.sh
echo vulcanize_test_contract=$GITHUB_WORKSPACE/ipld-eth-server/test/contract >> ./config.sh
echo db_write=$DB_WRITE >> ./config.sh
echo eth_forward_eth_calls=$ETH_FORWARD_ETH_CALLS >> ./config.sh
echo eth_proxy_on_error=$ETH_PROXY_ON_ERROR >> ./config.sh
echo eth_http_path=$ETH_HTTP_PATH >> ./config.sh
echo watched_addres_gap_filler_enabled=$WATCHED_ADDRESS_GAP_FILLER_ENABLED >> ./config.sh
echo watched_addres_gap_filler_interval=$WATCHED_ADDRESS_GAP_FILLER_INTERVAL >> ./config.sh
cat ./config.sh
- name: Build geth
run: |
cd $GITHUB_WORKSPACE/stack-orchestrator/helper-scripts
./compile-geth.sh \
-p "$GITHUB_WORKSPACE/config.sh" \
-e docker
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-db.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-go-ethereum.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-server.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-contract.yml" \
--env-file "$GITHUB_WORKSPACE/config.sh" \
up -d --build
- name: Test
run: |
cd $GITHUB_WORKSPACE/ipld-eth-server
while [ "$(curl -s -o /dev/null -w ''%{http_code}'' localhost:8081)" != "200" ]; do echo "waiting for ipld-eth-server..." && sleep 5; done && \
while [ "$(curl -s -o /dev/null -w ''%{http_code}'' localhost:8545)" != "200" ]; do echo "waiting for geth-statediff..." && sleep 5; done && \
make integrationtest

View File

@ -33,7 +33,6 @@ import (
"github.com/vulcanize/gap-filler/pkg/mux"
"github.com/vulcanize/ipld-eth-server/v4/pkg/eth"
fill "github.com/vulcanize/ipld-eth-server/v4/pkg/fill"
"github.com/vulcanize/ipld-eth-server/v4/pkg/graphql"
srpc "github.com/vulcanize/ipld-eth-server/v4/pkg/rpc"
s "github.com/vulcanize/ipld-eth-server/v4/pkg/serve"
@ -101,25 +100,12 @@ func serve() {
logWithCommand.Info("state validator disabled")
}
var watchedAddressFillService *fill.Service
if serverConfig.WatchedAddressGapFillerEnabled {
watchedAddressFillService = fill.New(serverConfig)
wg.Add(1)
go watchedAddressFillService.Start(wg)
logWithCommand.Info("watched address gap filler enabled")
} else {
logWithCommand.Info("watched address gap filler disabled")
}
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt)
<-shutdown
if graphQL != nil {
graphQL.Stop()
}
if watchedAddressFillService != nil {
watchedAddressFillService.Stop()
}
server.Stop()
wg.Wait()
}
@ -374,10 +360,6 @@ func init() {
serveCmd.PersistentFlags().Bool("validator-enabled", false, "turn on the state validator")
serveCmd.PersistentFlags().Uint("validator-every-nth-block", 1500, "only validate every Nth block")
// watched address gap filler flags
serveCmd.PersistentFlags().Bool("watched-address-gap-filler-enabled", false, "turn on the watched address gap filler")
serveCmd.PersistentFlags().Int("watched-address-gap-filler-interval", 60, "watched address gap fill interval in secs")
// and their bindings
// eth graphql server
viper.BindPFlag("eth.server.graphql", serveCmd.PersistentFlags().Lookup("eth-server-graphql"))
@ -426,8 +408,4 @@ func init() {
// state validator flags
viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled"))
viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block"))
// watched address gap filler flags
viper.BindPFlag("watch.fill.enabled", serveCmd.PersistentFlags().Lookup("watched-address-gap-filler-enabled"))
viper.BindPFlag("watch.fill.interval", serveCmd.PersistentFlags().Lookup("watched-address-gap-filler-interval"))
}

View File

@ -37,8 +37,6 @@ services:
ETH_FORWARD_ETH_CALLS: $ETH_FORWARD_ETH_CALLS
ETH_PROXY_ON_ERROR: $ETH_PROXY_ON_ERROR
ETH_HTTP_PATH: $ETH_HTTP_PATH
WATCHED_ADDRESS_GAP_FILLER_ENABLED: $WATCHED_ADDRESS_GAP_FILLER_ENABLED
WATCHED_ADDRESS_GAP_FILLER_INTERVAL: $WATCHED_ADDRESS_GAP_FILLER_INTERVAL
volumes:
- type: bind
source: ./chain.json

View File

@ -28,8 +28,3 @@
clientName = "Geth" # $ETH_CLIENT_NAME
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK
networkID = "1" # $ETH_NETWORK_ID
[watch]
[watch.fill]
enabled = false
interval = 30

View File

@ -1,29 +0,0 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fill_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestFill(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "ipld eth server fill test suite")
}

View File

@ -1,206 +0,0 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fill
import (
"math"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-server/v4/pkg/serve"
)
// WatchedAddress type is used to process currently watched addresses
type WatchedAddress struct {
Address string `db:"address"`
CreatedAt uint64 `db:"created_at"`
WatchedAt uint64 `db:"watched_at"`
LastFilledAt uint64 `db:"last_filled_at"`
StartBlock uint64
EndBlock uint64
}
// Service is the underlying struct for the watched address gap filling service
type Service struct {
db *sqlx.DB
client *rpc.Client
interval int
quitChan chan bool
}
// NewServer creates a new Service
func New(config *serve.Config) *Service {
return &Service{
db: config.DB,
client: config.Client,
interval: config.WatchedAddressGapFillInterval,
quitChan: make(chan bool),
}
}
// Start is used to begin the service
func (s *Service) Start(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-s.quitChan:
log.Info("quiting eth ipld server process")
return
default:
s.fill()
}
}
}
// Stop is used to gracefully stop the service
func (s *Service) Stop() {
log.Info("stopping watched address gap filler")
close(s.quitChan)
}
// fill performs the filling of indexing gap for watched addresses
func (s *Service) fill() {
// Wait for specified interval duration
time.Sleep(time.Duration(s.interval) * time.Second)
// Get watched addresses from the db
rows := s.fetchWatchedAddresses()
// Get the block number to start fill at
// Get the block number to end fill at
fillWatchedAddresses, minStartBlock, maxEndBlock := s.GetFillAddresses(rows)
if len(fillWatchedAddresses) > 0 {
log.Infof("running watched address gap filler for block range: (%d, %d)", minStartBlock, maxEndBlock)
}
// Fill the missing diffs
for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ {
params := statediff.Params{
IntermediateStateNodes: true,
IntermediateStorageNodes: true,
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
IncludeCode: true,
}
fillAddresses := []interface{}{}
for _, fillWatchedAddress := range fillWatchedAddresses {
if blockNumber >= fillWatchedAddress.StartBlock && blockNumber <= fillWatchedAddress.EndBlock {
params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address))
fillAddresses = append(fillAddresses, fillWatchedAddress.Address)
}
}
if len(fillAddresses) > 0 {
s.writeStateDiffAt(blockNumber, params)
s.UpdateLastFilledAt(blockNumber, fillAddresses)
}
}
}
// GetFillAddresses finds the encompassing range to perform fill for the given watched addresses
// it also sets the address specific fill range
func (s *Service) GetFillAddresses(rows []WatchedAddress) ([]WatchedAddress, uint64, uint64) {
fillWatchedAddresses := []WatchedAddress{}
minStartBlock := uint64(math.MaxUint64)
maxEndBlock := uint64(0)
for _, row := range rows {
// Check for a gap between created_at and watched_at
// CreatedAt and WatchedAt being equal is considered a gap of one block
if row.CreatedAt > row.WatchedAt {
continue
}
startBlock := uint64(0)
endBlock := uint64(0)
// Check if some of the gap was filled earlier
if row.LastFilledAt > 0 {
if row.LastFilledAt < row.WatchedAt {
startBlock = row.LastFilledAt + 1
}
} else {
startBlock = row.CreatedAt
}
// Add the address for filling
if startBlock > 0 {
row.StartBlock = startBlock
if startBlock < minStartBlock {
minStartBlock = startBlock
}
endBlock = row.WatchedAt
row.EndBlock = endBlock
if endBlock > maxEndBlock {
maxEndBlock = endBlock
}
fillWatchedAddresses = append(fillWatchedAddresses, row)
}
}
return fillWatchedAddresses, minStartBlock, maxEndBlock
}
// UpdateLastFilledAt updates the fill status for the provided addresses in the db
func (s *Service) UpdateLastFilledAt(blockNumber uint64, fillAddresses []interface{}) {
// Prepare the query
query := "UPDATE eth_meta.watched_addresses SET last_filled_at=? WHERE address IN (?" + strings.Repeat(",?", len(fillAddresses)-1) + ")"
query = s.db.Rebind(query)
args := []interface{}{blockNumber}
args = append(args, fillAddresses...)
// Execute the update query
_, err := s.db.Exec(query, args...)
if err != nil {
log.Fatalf(err.Error())
}
}
// fetchWatchedAddresses fetches watched addresses from the db
func (s *Service) fetchWatchedAddresses() []WatchedAddress {
rows := []WatchedAddress{}
pgStr := "SELECT * FROM eth_meta.watched_addresses"
err := s.db.Select(&rows, pgStr)
if err != nil {
log.Fatalf("Error fetching watched addreesses: %s", err.Error())
}
return rows
}
// writeStateDiffAt makes a RPC call to writeout statediffs at a blocknumber with the given params
func (s *Service) writeStateDiffAt(blockNumber uint64, params statediff.Params) {
err := s.client.Call(nil, "statediff_writeStateDiffAt", blockNumber, params)
if err != nil {
log.Fatalf("Error making a RPC call to write statediff at block number %d: %s", blockNumber, err.Error())
}
}

View File

@ -1,411 +0,0 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fill_test
import (
"context"
"math/big"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"github.com/jmoiron/sqlx"
"github.com/vulcanize/ipld-eth-server/v4/pkg/eth/test_helpers"
fill "github.com/vulcanize/ipld-eth-server/v4/pkg/fill"
"github.com/vulcanize/ipld-eth-server/v4/pkg/serve"
"github.com/vulcanize/ipld-eth-server/v4/pkg/shared"
)
var _ = Describe("Service", func() {
var (
db *sqlx.DB
watchedAddressGapFiller *fill.Service
statediffIndexer interfaces.StateDiffIndexer
err error
contract1Address = "0x5d663F5269090bD2A7DC2390c911dF6083D7b28F"
contract2Address = "0x6Eb7e5C66DB8af2E96159AC440cbc8CDB7fbD26B"
contract3Address = "0xcfeB164C328CA13EFd3C77E1980d94975aDfedfc"
chainConfig = params.TestChainConfig
ctx = context.Background()
)
It("test init", func() {
// db initialization
db = shared.SetupDB()
// indexer initialization
// statediffIndexer, err = indexer.NewStateDiffIndexer(nil, db)
statediffIndexer = shared.SetupTestStateDiffIndexer(ctx, chainConfig, test_helpers.Genesis.Hash())
Expect(err).ToNot(HaveOccurred())
// fill service intialization
watchedAddressGapFiller = fill.New(&serve.Config{
DB: db,
})
})
defer It("test teardown", func() {
shared.TearDownDB(db)
})
Describe("GetFillAddresses", func() {
Context("overlapping fill ranges", func() {
It("gives the range to run fill for each address", func() {
// input data
rows := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
},
{
Address: contract2Address,
CreatedAt: 40,
WatchedAt: 70,
LastFilledAt: 0,
},
{
Address: contract3Address,
CreatedAt: 20,
WatchedAt: 30,
LastFilledAt: 0,
},
}
// expected output data
expectedOutputAddresses := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
StartBlock: 10,
EndBlock: 50,
},
{
Address: contract2Address,
CreatedAt: 40,
WatchedAt: 70,
LastFilledAt: 0,
StartBlock: 40,
EndBlock: 70,
},
{
Address: contract3Address,
CreatedAt: 20,
WatchedAt: 30,
LastFilledAt: 0,
StartBlock: 20,
EndBlock: 30,
},
}
expectedOutputStartBlock := uint64(10)
expectedOutputEndBlock := uint64(70)
fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows)
Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses))
Expect(minStartBlock).To(Equal(expectedOutputStartBlock))
Expect(maxEndBlock).To(Equal(expectedOutputEndBlock))
})
})
Context("non-overlapping fill ranges", func() {
It("gives the range to run fill for each address", func() {
// input data
rows := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
},
{
Address: contract2Address,
CreatedAt: 70,
WatchedAt: 90,
LastFilledAt: 0,
},
}
// expected output data
expectedOutputAddresses := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
StartBlock: 10,
EndBlock: 50,
},
{
Address: contract2Address,
CreatedAt: 70,
WatchedAt: 90,
LastFilledAt: 0,
StartBlock: 70,
EndBlock: 90,
},
}
expectedOutputStartBlock := uint64(10)
expectedOutputEndBlock := uint64(90)
fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows)
Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses))
Expect(minStartBlock).To(Equal(expectedOutputStartBlock))
Expect(maxEndBlock).To(Equal(expectedOutputEndBlock))
})
})
Context("a contract watched before it was created", func() {
It("gives no range for an address when it is watched before it's created", func() {
// input data
rows := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
},
{
Address: contract2Address,
CreatedAt: 90,
WatchedAt: 70,
LastFilledAt: 0,
},
}
// expected output data
expectedOutputAddresses := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
StartBlock: 10,
EndBlock: 50,
},
}
expectedOutputStartBlock := uint64(10)
expectedOutputEndBlock := uint64(50)
fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows)
Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses))
Expect(minStartBlock).To(Equal(expectedOutputStartBlock))
Expect(maxEndBlock).To(Equal(expectedOutputEndBlock))
})
})
Context("a contract having some of the gap filled earlier", func() {
It("gives the remaining range for an address to run fill for", func() {
// input data
rows := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
},
{
Address: contract2Address,
CreatedAt: 40,
WatchedAt: 70,
LastFilledAt: 50,
},
}
// expected output data
expectedOutputAddresses := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
StartBlock: 10,
EndBlock: 50,
},
{
Address: contract2Address,
CreatedAt: 40,
WatchedAt: 70,
LastFilledAt: 50,
StartBlock: 51,
EndBlock: 70,
},
}
expectedOutputStartBlock := uint64(10)
expectedOutputEndBlock := uint64(70)
fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows)
Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses))
Expect(minStartBlock).To(Equal(expectedOutputStartBlock))
Expect(maxEndBlock).To(Equal(expectedOutputEndBlock))
})
It("gives no range for an address when the gap is already filled", func() {
// input data
rows := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
},
{
Address: contract2Address,
CreatedAt: 40,
WatchedAt: 70,
LastFilledAt: 70,
},
}
// expected output data
expectedOutputAddresses := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: 50,
LastFilledAt: 0,
StartBlock: 10,
EndBlock: 50,
},
}
expectedOutputStartBlock := uint64(10)
expectedOutputEndBlock := uint64(50)
fillWatchedAddresses, minStartBlock, maxEndBlock := watchedAddressGapFiller.GetFillAddresses(rows)
Expect(fillWatchedAddresses).To(Equal(expectedOutputAddresses))
Expect(minStartBlock).To(Equal(expectedOutputStartBlock))
Expect(maxEndBlock).To(Equal(expectedOutputEndBlock))
})
})
})
Describe("UpdateLastFilledAt", func() {
pgStr := "SELECT * FROM eth_meta.watched_addresses"
BeforeEach(func() {
shared.TearDownDB(db)
})
It("updates last filled at for a single address", func() {
// fill db with watched addresses
watchedAddresses := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: 10,
},
}
watchedAt := uint64(50)
err = statediffIndexer.InsertWatchedAddresses(watchedAddresses, big.NewInt(int64(watchedAt)))
Expect(err).ToNot(HaveOccurred())
// update last filled at block in the db
fillAddresses := []interface{}{
contract1Address,
}
fillAt := uint64(12)
watchedAddressGapFiller.UpdateLastFilledAt(fillAt, fillAddresses)
// expected data
expectedData := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: watchedAt,
LastFilledAt: fillAt,
},
}
rows := []fill.WatchedAddress{}
err = db.Select(&rows, pgStr)
Expect(err).ToNot(HaveOccurred())
Expect(rows).To(Equal(expectedData))
})
It("updates last filled at for multiple address", func() {
// fill db with watched addresses
watchedAddresses := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: 10,
},
{
Address: contract2Address,
CreatedAt: 20,
},
{
Address: contract3Address,
CreatedAt: 30,
},
}
watchedAt := uint64(50)
err = statediffIndexer.InsertWatchedAddresses(watchedAddresses, big.NewInt(int64(watchedAt)))
Expect(err).ToNot(HaveOccurred())
// update last filled at block in the db
fillAddresses := []interface{}{
contract1Address,
contract2Address,
contract3Address,
}
fillAt := uint64(50)
watchedAddressGapFiller.UpdateLastFilledAt(fillAt, fillAddresses)
// expected data
expectedData := []fill.WatchedAddress{
{
Address: contract1Address,
CreatedAt: 10,
WatchedAt: watchedAt,
LastFilledAt: fillAt,
},
{
Address: contract2Address,
CreatedAt: 20,
WatchedAt: watchedAt,
LastFilledAt: fillAt,
},
{
Address: contract3Address,
CreatedAt: 30,
WatchedAt: watchedAt,
LastFilledAt: fillAt,
},
}
rows := []fill.WatchedAddress{}
err = db.Select(&rows, pgStr)
Expect(err).ToNot(HaveOccurred())
Expect(rows).To(Equal(expectedData))
})
})
})

View File

@ -55,9 +55,6 @@ const (
VALIDATOR_ENABLED = "VALIDATOR_ENABLED"
VALIDATOR_EVERY_NTH_BLOCK = "VALIDATOR_EVERY_NTH_BLOCK"
WATCHED_ADDRESS_GAP_FILLER_ENABLED = "WATCHED_ADDRESS_GAP_FILLER_ENABLED"
WATCHED_ADDRESS_GAP_FILLER_INTERVAL = "WATCHED_ADDRESS_GAP_FILLER_INTERVAL"
)
// Config struct
@ -98,9 +95,6 @@ type Config struct {
StateValidationEnabled bool
StateValidationEveryNthBlock uint64
WatchedAddressGapFillerEnabled bool
WatchedAddressGapFillInterval int
}
// NewConfig is used to initialize a watcher config from a .toml file
@ -237,8 +231,6 @@ func NewConfig() (*Config, error) {
c.loadValidatorConfig()
c.loadWatchedAddressGapFillerConfig()
return c, err
}
@ -301,11 +293,3 @@ func (c *Config) loadValidatorConfig() {
c.StateValidationEnabled = viper.GetBool("validator.enabled")
c.StateValidationEveryNthBlock = viper.GetUint64("validator.everyNthBlock")
}
func (c *Config) loadWatchedAddressGapFillerConfig() {
viper.BindEnv("watch.fill.enabled", WATCHED_ADDRESS_GAP_FILLER_ENABLED)
viper.BindEnv("watch.fill.interval", WATCHED_ADDRESS_GAP_FILLER_INTERVAL)
c.WatchedAddressGapFillerEnabled = viper.GetBool("watch.fill.enabled")
c.WatchedAddressGapFillInterval = viper.GetInt("watch.fill.interval")
}

View File

@ -4,8 +4,6 @@ set -o xtrace
export ETH_FORWARD_ETH_CALLS=false
export DB_WRITE=true
export ETH_PROXY_ON_ERROR=false
export WATCHED_ADDRESS_GAP_FILLER_ENABLED=false
export WATCHED_ADDRESS_GAP_FILLER_INTERVAL=5
export PGPASSWORD=password
export DATABASE_USER=vdbm

View File

@ -4,8 +4,6 @@ set -o xtrace
export ETH_FORWARD_ETH_CALLS=true
export DB_WRITE=false
export ETH_PROXY_ON_ERROR=false
export WATCHED_ADDRESS_GAP_FILLER_ENABLED=false
export WATCHED_ADDRESS_GAP_FILLER_INTERVAL=5
export PGPASSWORD=password
export DATABASE_USER=vdbm

View File

@ -1,19 +0,0 @@
set -e
set -o xtrace
export ETH_FORWARD_ETH_CALLS=false
export DB_WRITE=true
export ETH_PROXY_ON_ERROR=false
export WATCHED_ADDRESS_GAP_FILLER_ENABLED=true
export WATCHED_ADDRESS_GAP_FILLER_INTERVAL=5
export PGPASSWORD=password
export DATABASE_USER=vdbm
export DATABASE_PORT=8077
export DATABASE_PASSWORD=password
export DATABASE_HOSTNAME=127.0.0.1
# Wait for containers to be up and execute the integration test.
while [ "$(curl -s -o /dev/null -w ''%{http_code}'' localhost:8081)" != "200" ]; do echo "waiting for ipld-eth-server..." && sleep 5; done && \
while [ "$(curl -s -o /dev/null -w ''%{http_code}'' localhost:8545)" != "200" ]; do echo "waiting for geth-statediff..." && sleep 5; done && \
make integrationtest

View File

@ -106,31 +106,3 @@
```bash
./scripts/run_integration_test_forward_eth_calls.sh
```
- Update stack-orchestrator `config.sh` file:
```bash
#!/bin/bash
# Path to go-ethereum repo.
vulcanize_go_ethereum=~/go-ethereum/
# Path to ipld-eth-server repo.
vulcanize_ipld_eth_server=~/ipld-eth-server/
# Path to test contract.
vulcanize_test_contract=~/ipld-eth-server/test/contract
db_write=true
eth_forward_eth_calls=false
eth_proxy_on_error=false
eth_http_path="go-ethereum:8545"
watched_addres_gap_filler_enabled=true
watched_addres_gap_filler_interval=5
```
- Stop the stack-orchestrator and start again using the same command
- Run integration tests for watched addresses with gap filling service enabled:
```bash
./scripts/run_integration_test_watched_address_gap_filler.sh
```

View File

@ -20,8 +20,6 @@ import (
var _ = Describe("Integration test", func() {
directProxyEthCalls, err := strconv.ParseBool(os.Getenv("ETH_FORWARD_ETH_CALLS"))
Expect(err).To(BeNil())
watchedAddressServiceEnabled, err := strconv.ParseBool(os.Getenv("WATCHED_ADDRESS_GAP_FILLER_ENABLED"))
Expect(err).To(BeNil())
gethHttpPath := "http://127.0.0.1:8545"
gethClient, err := ethclient.Dial(gethHttpPath)
Expect(err).ToNot(HaveOccurred())
@ -41,7 +39,7 @@ var _ = Describe("Integration test", func() {
sleepInterval := 2 * time.Second
BeforeEach(func() {
if !directProxyEthCalls || watchedAddressServiceEnabled {
if !directProxyEthCalls {
Skip("skipping direct-proxy-forwarding integration tests")
}
})

View File

@ -31,8 +31,6 @@ var (
var _ = Describe("Integration test", func() {
directProxyEthCalls, err := strconv.ParseBool(os.Getenv("ETH_FORWARD_ETH_CALLS"))
Expect(err).To(BeNil())
watchedAddressServiceEnabled, err := strconv.ParseBool(os.Getenv("WATCHED_ADDRESS_GAP_FILLER_ENABLED"))
Expect(err).To(BeNil())
gethHttpPath := "http://127.0.0.1:8545"
gethClient, err := ethclient.Dial(gethHttpPath)
Expect(err).ToNot(HaveOccurred())
@ -52,7 +50,7 @@ var _ = Describe("Integration test", func() {
sleepInterval := 2 * time.Second
BeforeEach(func() {
if directProxyEthCalls || watchedAddressServiceEnabled {
if directProxyEthCalls {
Skip("skipping no-direct-proxy-forwarding integration tests")
}
})

View File

@ -27,9 +27,6 @@ var _ = Describe("WatchAddress integration test", func() {
dbWrite, err := strconv.ParseBool(os.Getenv("DB_WRITE"))
Expect(err).To(BeNil())
watchedAddressServiceEnabled, err := strconv.ParseBool(os.Getenv("WATCHED_ADDRESS_GAP_FILLER_ENABLED"))
Expect(err).To(BeNil())
gethHttpPath := "http://127.0.0.1:8545"
gethRPCClient, err := rpc.Dial(gethHttpPath)
Expect(err).ToNot(HaveOccurred())
@ -71,7 +68,7 @@ var _ = Describe("WatchAddress integration test", func() {
)
BeforeEach(func() {
if !dbWrite || watchedAddressServiceEnabled {
if !dbWrite {
Skip("skipping WatchAddress API integration tests")
}
})

View File

@ -1,318 +0,0 @@
package integration_test
import (
"context"
"math/big"
"os"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
integration "github.com/vulcanize/ipld-eth-server/v4/test"
)
var _ = Describe("Watched address gap filling service integration test", func() {
dbWrite, err := strconv.ParseBool(os.Getenv("DB_WRITE"))
Expect(err).To(BeNil())
watchedAddressServiceEnabled, err := strconv.ParseBool(os.Getenv("WATCHED_ADDRESS_GAP_FILLER_ENABLED"))
Expect(err).To(BeNil())
serviceInterval, err := strconv.ParseInt(os.Getenv("WATCHED_ADDRESS_GAP_FILLER_INTERVAL"), 10, 0)
Expect(err).To(BeNil())
gethHttpPath := "http://127.0.0.1:8545"
gethRPCClient, err := rpc.Dial(gethHttpPath)
Expect(err).ToNot(HaveOccurred())
ipldEthHttpPath := "http://127.0.0.1:8081"
ipldClient, err := ethclient.Dial(ipldEthHttpPath)
Expect(err).ToNot(HaveOccurred())
ipldRPCClient, err := rpc.Dial(ipldEthHttpPath)
Expect(err).ToNot(HaveOccurred())
var (
ctx = context.Background()
contractErr error
txErr error
GLD1 *integration.ContractDeployed
SLV1 *integration.ContractDeployed
SLV2 *integration.ContractDeployed
countAIndex string
countBIndex string
oldCountA1 = big.NewInt(0)
oldCountA2 = big.NewInt(0)
oldCountB2 = big.NewInt(0)
updatedCountA1 = big.NewInt(1)
updatedCountA2 = big.NewInt(1)
updatedCountB2 = big.NewInt(1)
SLV2CountBIncrementedAt *integration.CountIncremented
contract1Salt = "contract1SLV"
)
BeforeEach(func() {
if !dbWrite || !watchedAddressServiceEnabled {
Skip("skipping watched address gap filling service integration tests")
}
})
It("test init", func() {
// Clear out watched addresses
err := integration.ClearWatchedAddresses(gethRPCClient)
Expect(err).ToNot(HaveOccurred())
// Deploy a GLD contract
GLD1, contractErr = integration.DeployContract()
Expect(contractErr).ToNot(HaveOccurred())
// Watch GLD1 contract
operation := types.Add
args := []types.WatchAddressArg{
{
Address: GLD1.Address,
CreatedAt: uint64(GLD1.BlockNumber),
},
}
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// Deploy two SLV contracts and update storage slots
SLV1, contractErr = integration.Create2Contract("SLVToken", contract1Salt)
Expect(contractErr).ToNot(HaveOccurred())
_, txErr = integration.IncrementCount(SLV1.Address, "A")
Expect(txErr).ToNot(HaveOccurred())
SLV2, contractErr = integration.DeploySLVContract()
Expect(contractErr).ToNot(HaveOccurred())
_, txErr = integration.IncrementCount(SLV2.Address, "A")
Expect(txErr).ToNot(HaveOccurred())
SLV2CountBIncrementedAt, txErr = integration.IncrementCount(SLV2.Address, "B")
Expect(txErr).ToNot(HaveOccurred())
// Get storage slot keys
storageSlotAKey, err := integration.GetStorageSlotKey("SLVToken", "countA")
Expect(err).ToNot(HaveOccurred())
countAIndex = storageSlotAKey.Key
storageSlotBKey, err := integration.GetStorageSlotKey("SLVToken", "countB")
Expect(err).ToNot(HaveOccurred())
countBIndex = storageSlotBKey.Key
})
defer It("test cleanup", func() {
// Destroy create2 contract
_, err := integration.DestroySLVContract(SLV1.Address)
Expect(err).ToNot(HaveOccurred())
// Clear out watched addresses
err = integration.ClearWatchedAddresses(gethRPCClient)
Expect(err).ToNot(HaveOccurred())
})
Context("previously unwatched contract watched", func() {
It("indexes state only for watched contract", func() {
// WatchedAddresses = [GLD1]
// SLV1, countA
countA1Storage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV1.Address), common.HexToHash(countAIndex), nil)
Expect(err).ToNot(HaveOccurred())
countA1 := new(big.Int).SetBytes(countA1Storage)
Expect(countA1.String()).To(Equal(oldCountA1.String()))
// SLV2, countA
countA2Storage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV2.Address), common.HexToHash(countAIndex), nil)
Expect(err).ToNot(HaveOccurred())
countA2 := new(big.Int).SetBytes(countA2Storage)
Expect(countA2.String()).To(Equal(oldCountA2.String()))
// SLV2, countB
countB2Storage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV2.Address), common.HexToHash(countBIndex), nil)
Expect(err).ToNot(HaveOccurred())
countB2 := new(big.Int).SetBytes(countB2Storage)
Expect(countB2.String()).To(Equal(oldCountB2.String()))
})
It("indexes past state on watching a contract", func() {
// Watch SLV1 contract
args := []types.WatchAddressArg{
{
Address: SLV1.Address,
CreatedAt: uint64(SLV1.BlockNumber),
},
}
ipldErr := ipldRPCClient.Call(nil, ipldMethod, types.Add, args)
Expect(ipldErr).ToNot(HaveOccurred())
// Sleep for service interval + few extra seconds
time.Sleep(time.Duration(serviceInterval+2) * time.Second)
// WatchedAddresses = [GLD1, SLV1]
// SLV1, countA
countA1Storage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV1.Address), common.HexToHash(countAIndex), nil)
Expect(err).ToNot(HaveOccurred())
countA1 := new(big.Int).SetBytes(countA1Storage)
Expect(countA1.String()).To(Equal(updatedCountA1.String()))
// SLV2, countA
countA2Storage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV2.Address), common.HexToHash(countAIndex), nil)
Expect(err).ToNot(HaveOccurred())
countA2 := new(big.Int).SetBytes(countA2Storage)
Expect(countA2.String()).To(Equal(oldCountA2.String()))
// SLV2, countB
countB2Storage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV2.Address), common.HexToHash(countBIndex), nil)
Expect(err).ToNot(HaveOccurred())
countB2 := new(big.Int).SetBytes(countB2Storage)
Expect(countB2.String()).To(Equal(oldCountB2.String()))
})
})
Context("previously unwatched contract watched (different 'created_at')", func() {
It("indexes past state from 'created_at' onwards on watching a contract", func() {
// Watch SLV2 (created_at -> countB incremented) contract
args := []types.WatchAddressArg{
{
Address: SLV2.Address,
CreatedAt: uint64(SLV2CountBIncrementedAt.BlockNumber),
},
}
ipldErr := ipldRPCClient.Call(nil, ipldMethod, types.Add, args)
Expect(ipldErr).ToNot(HaveOccurred())
// Sleep for service interval + few extra seconds
time.Sleep(time.Duration(serviceInterval+2) * time.Second)
// WatchedAddresses = [GLD1, SLV1, SLV2]
// SLV2, countA
countA2Storage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV2.Address), common.HexToHash(countAIndex), nil)
Expect(err).ToNot(HaveOccurred())
countA2 := new(big.Int).SetBytes(countA2Storage)
Expect(countA2.String()).To(Equal(oldCountA2.String()))
// SLV2, countB
countB2Storage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV2.Address), common.HexToHash(countBIndex), nil)
Expect(err).ToNot(HaveOccurred())
countB2 := new(big.Int).SetBytes(countB2Storage)
Expect(countB2.String()).To(Equal(updatedCountB2.String()))
})
It("indexes missing past state on watching a contract from an earlier 'created_at'", func() {
// Remove SLV2 (created_at -> countB incremented) watched contract
args := []types.WatchAddressArg{
{
Address: SLV2.Address,
CreatedAt: uint64(SLV2CountBIncrementedAt.BlockNumber),
},
}
ipldErr := ipldRPCClient.Call(nil, ipldMethod, types.Remove, args)
Expect(ipldErr).ToNot(HaveOccurred())
// Watch SLV2 (created_at -> deployment) contract
args = []types.WatchAddressArg{
{
Address: SLV2.Address,
CreatedAt: uint64(SLV2.BlockNumber),
},
}
ipldErr = ipldRPCClient.Call(nil, ipldMethod, types.Add, args)
Expect(ipldErr).ToNot(HaveOccurred())
// Sleep for service interval + few extra seconds
time.Sleep(time.Duration(serviceInterval+2) * time.Second)
// WatchedAddresses = [GLD1, SLV1, SLV2]
// SLV2, countA
countA2Storage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV2.Address), common.HexToHash(countAIndex), nil)
Expect(err).ToNot(HaveOccurred())
countA2 := new(big.Int).SetBytes(countA2Storage)
Expect(countA2.String()).To(Equal(updatedCountA2.String()))
// SLV2, countB
countB2Storage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV2.Address), common.HexToHash(countBIndex), nil)
Expect(err).ToNot(HaveOccurred())
countB2 := new(big.Int).SetBytes(countB2Storage)
Expect(countB2.String()).To(Equal(updatedCountB2.String()))
})
})
Context("destroyed contract redeployed and watched", func() {
It("returns zero value for destroyed contract", func() {
_, err := integration.DestroySLVContract(SLV1.Address)
Expect(err).ToNot(HaveOccurred())
updatedCountA1 = big.NewInt(0)
operation := types.Remove
args := []types.WatchAddressArg{
{
Address: SLV1.Address,
CreatedAt: uint64(SLV1.BlockNumber),
},
}
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// WatchedAddresses = [GLD1, SLV2]
// SLV1, countA
countA1Storage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV1.Address), common.HexToHash(countAIndex), nil)
Expect(err).ToNot(HaveOccurred())
countA1 := new(big.Int).SetBytes(countA1Storage)
Expect(countA1.String()).To(Equal(updatedCountA1.String()))
oldCountA1.Set(updatedCountA1)
})
It("indexes state for redeployed contract", func() {
// Redeploy contract
SLV1, contractErr = integration.Create2Contract("SLVToken", contract1Salt)
Expect(contractErr).ToNot(HaveOccurred())
// Add to watched address
operation := types.Add
args := []types.WatchAddressArg{
{
Address: SLV1.Address,
CreatedAt: uint64(SLV1.BlockNumber),
},
}
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// Sleep for service interval + few extra seconds
time.Sleep(time.Duration(serviceInterval+2) * time.Second)
// WatchedAddresses = [GLD1, SLV2, SLV1]
// SLV1, countA
countA1Storage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV1.Address), common.HexToHash(countAIndex), nil)
Expect(err).ToNot(HaveOccurred())
countA1 := new(big.Int).SetBytes(countA1Storage)
Expect(countA1.String()).To(Equal(updatedCountA1.String()))
oldCountA1.Set(updatedCountA1)
// Update storage slots
_, txErr = integration.IncrementCount(SLV1.Address, "A")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
updatedCountA1.Add(updatedCountA1, big.NewInt(1))
countA1AfterIncrementStorage, err := ipldClient.StorageAt(ctx, common.HexToAddress(SLV1.Address), common.HexToHash(countAIndex), nil)
Expect(err).ToNot(HaveOccurred())
countA1AfterIncrement := new(big.Int).SetBytes(countA1AfterIncrementStorage)
Expect(countA1AfterIncrement.String()).To(Equal(updatedCountA1.String()))
oldCountA1.Set(updatedCountA1)
})
})
})