Use nodeID array instead of a single value #239

Closed
abdulrabbani00 wants to merge 52 commits from feature/node_id_array into v1.10.19-statediff-v4
19 changed files with 1476 additions and 273 deletions

View File

@ -1,13 +1,35 @@
name: Publish geth to release
name: Test, Build, and/or Publish
on:
release:
types: [published]
pull_request:
jobs:
pre_job:
# continue-on-error: true # Uncomment once integration is finished
runs-on: ubuntu-latest
# Map a step output to a job output
outputs:
should_skip: ${{ steps.skip_check.outputs.should_skip }}
steps:
- id: skip_check
uses: fkirc/skip-duplicate-actions@v4
with:
# All of these options are optional, so you can remove them if you are happy with the defaults
concurrent_skipping: "never"
skip_after_successful_duplicate: "true"
do_not_skip: '["workflow_dispatch", "schedule"]'
run-tests:
uses: ./.github/workflows/tests.yml
if: ${{ needs.pre_job.outputs.should_skip != 'true' }}
needs: pre_job
build:
name: Run docker build and publish
needs: run-tests
if: |
always() &&
(needs.run-tests.result == 'success' || needs.run-tests.result == 'skipped') &&
github.event_name == 'release'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
@ -25,6 +47,10 @@ jobs:
push_to_registries:
name: Publish assets to Release
runs-on: ubuntu-latest
if: |
always() &&
(needs.build.result == 'success') &&
github.event_name == 'release'
needs: build
steps:
- name: Get the version

View File

@ -1,7 +0,0 @@
name: Build and test
on: [pull_request]
jobs:
run-tests:
uses: ./.github/workflows/tests.yml

View File

@ -80,9 +80,6 @@ jobs:
--env-file $GITHUB_WORKSPACE/config.sh \
up -d --build
- name: Give the migration a few seconds
run: sleep 30;
- name: Run unit tests
run: make statedifftest

View File

@ -1,247 +0,0 @@
language: go
go_import_path: github.com/ethereum/go-ethereum
sudo: false
jobs:
allow_failures:
- stage: build
os: osx
go: 1.17.x
env:
- azure-osx
- azure-ios
- cocoapods-ios
include:
# This builder only tests code linters on latest version of Go
- stage: lint
os: linux
dist: bionic
go: 1.18.x
env:
- lint
git:
submodules: false # avoid cloning ethereum/tests
script:
- go run build/ci.go lint
# These builders create the Docker sub-images for multi-arch push and each
# will attempt to push the multi-arch image if they are the last builder
- stage: build
if: type = push
os: linux
arch: amd64
dist: bionic
go: 1.18.x
env:
- docker
services:
- docker
git:
submodules: false # avoid cloning ethereum/tests
before_install:
- export DOCKER_CLI_EXPERIMENTAL=enabled
script:
- go run build/ci.go docker -image -manifest amd64,arm64 -upload ethereum/client-go
- stage: build
if: type = push
os: linux
arch: arm64
dist: bionic
go: 1.18.x
env:
- docker
services:
- docker
git:
submodules: false # avoid cloning ethereum/tests
before_install:
- export DOCKER_CLI_EXPERIMENTAL=enabled
script:
- go run build/ci.go docker -image -manifest amd64,arm64 -upload ethereum/client-go
# This builder does the Ubuntu PPA upload
- stage: build
if: type = push
os: linux
dist: bionic
go: 1.18.x
env:
- ubuntu-ppa
- GO111MODULE=on
git:
submodules: false # avoid cloning ethereum/tests
addons:
apt:
packages:
- devscripts
- debhelper
- dput
- fakeroot
- python-bzrlib
- python-paramiko
script:
- echo '|1|7SiYPr9xl3uctzovOTj4gMwAC1M=|t6ReES75Bo/PxlOPJ6/GsGbTrM0= ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA0aKz5UTUndYgIGG7dQBV+HaeuEZJ2xPHo2DS2iSKvUL4xNMSAY4UguNW+pX56nAQmZKIZZ8MaEvSj6zMEDiq6HFfn5JcTlM80UwlnyKe8B8p7Nk06PPQLrnmQt5fh0HmEcZx+JU9TZsfCHPnX7MNz4ELfZE6cFsclClrKim3BHUIGq//t93DllB+h4O9LHjEUsQ1Sr63irDLSutkLJD6RXchjROXkNirlcNVHH/jwLWR5RcYilNX7S5bIkK8NlWPjsn/8Ua5O7I9/YoE97PpO6i73DTGLh5H9JN/SITwCKBkgSDWUt61uPK3Y11Gty7o2lWsBjhBUm2Y38CBsoGmBw==' >> ~/.ssh/known_hosts
- go run build/ci.go debsrc -upload ethereum/ethereum -sftp-user geth-ci -signer "Go Ethereum Linux Builder <geth-ci@ethereum.org>"
# This builder does the Linux Azure uploads
- stage: build
if: type = push
os: linux
dist: bionic
sudo: required
go: 1.18.x
env:
- azure-linux
- GO111MODULE=on
git:
submodules: false # avoid cloning ethereum/tests
addons:
apt:
packages:
- gcc-multilib
script:
# Build for the primary platforms that Trusty can manage
- go run build/ci.go install -dlgo
- go run build/ci.go archive -type tar -signer LINUX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds
- go run build/ci.go install -dlgo -arch 386
- go run build/ci.go archive -arch 386 -type tar -signer LINUX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds
# Switch over GCC to cross compilation (breaks 386, hence why do it here only)
- sudo -E apt-get -yq --no-install-suggests --no-install-recommends --force-yes install gcc-arm-linux-gnueabi libc6-dev-armel-cross gcc-arm-linux-gnueabihf libc6-dev-armhf-cross gcc-aarch64-linux-gnu libc6-dev-arm64-cross
- sudo ln -s /usr/include/asm-generic /usr/include/asm
- GOARM=5 go run build/ci.go install -dlgo -arch arm -cc arm-linux-gnueabi-gcc
- GOARM=5 go run build/ci.go archive -arch arm -type tar -signer LINUX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds
- GOARM=6 go run build/ci.go install -dlgo -arch arm -cc arm-linux-gnueabi-gcc
- GOARM=6 go run build/ci.go archive -arch arm -type tar -signer LINUX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds
- GOARM=7 go run build/ci.go install -dlgo -arch arm -cc arm-linux-gnueabihf-gcc
- GOARM=7 go run build/ci.go archive -arch arm -type tar -signer LINUX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds
- go run build/ci.go install -dlgo -arch arm64 -cc aarch64-linux-gnu-gcc
- go run build/ci.go archive -arch arm64 -type tar -signer LINUX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds
# This builder does the Android Maven and Azure uploads
- stage: build
if: type = push
os: linux
dist: bionic
addons:
apt:
packages:
- openjdk-8-jdk
env:
- azure-android
- maven-android
- GO111MODULE=on
git:
submodules: false # avoid cloning ethereum/tests
before_install:
# Install Android and it's dependencies manually, Travis is stale
- export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
- curl https://dl.google.com/android/repository/commandlinetools-linux-6858069_latest.zip -o android.zip
- unzip -q android.zip -d $HOME/sdk && rm android.zip
- mv $HOME/sdk/cmdline-tools $HOME/sdk/latest && mkdir $HOME/sdk/cmdline-tools && mv $HOME/sdk/latest $HOME/sdk/cmdline-tools
- export PATH=$PATH:$HOME/sdk/cmdline-tools/latest/bin
- export ANDROID_HOME=$HOME/sdk
- yes | sdkmanager --licenses >/dev/null
- sdkmanager "platform-tools" "platforms;android-15" "platforms;android-19" "platforms;android-24" "ndk-bundle"
# Install Go to allow building with
- curl https://dl.google.com/go/go1.18.linux-amd64.tar.gz | tar -xz
- export PATH=`pwd`/go/bin:$PATH
- export GOROOT=`pwd`/go
- export GOPATH=$HOME/go
script:
# Build the Android archive and upload it to Maven Central and Azure
- mkdir -p $GOPATH/src/github.com/ethereum
- ln -s `pwd` $GOPATH/src/github.com/ethereum/go-ethereum
- go run build/ci.go aar -signer ANDROID_SIGNING_KEY -signify SIGNIFY_KEY -deploy https://oss.sonatype.org -upload gethstore/builds
# This builder does the OSX Azure, iOS CocoaPods and iOS Azure uploads
- stage: build
if: type = push
os: osx
go: 1.18.x
env:
- azure-osx
- azure-ios
- cocoapods-ios
- GO111MODULE=on
git:
submodules: false # avoid cloning ethereum/tests
script:
- go run build/ci.go install -dlgo
- go run build/ci.go archive -type tar -signer OSX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds
# Build the iOS framework and upload it to CocoaPods and Azure
- gem uninstall cocoapods -a -x
- gem install cocoapods
- mv ~/.cocoapods/repos/master ~/.cocoapods/repos/master.bak
- sed -i '.bak' 's/repo.join/!repo.join/g' $(dirname `gem which cocoapods`)/cocoapods/sources_manager.rb
- if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then git clone --depth=1 https://github.com/CocoaPods/Specs.git ~/.cocoapods/repos/master && pod setup --verbose; fi
- xctool -version
- xcrun simctl list
# Workaround for https://github.com/golang/go/issues/23749
- export CGO_CFLAGS_ALLOW='-fmodules|-fblocks|-fobjc-arc'
- go run build/ci.go xcode -signer IOS_SIGNING_KEY -signify SIGNIFY_KEY -deploy trunk -upload gethstore/builds
# These builders run the tests
- stage: build
os: linux
arch: amd64
dist: bionic
go: 1.18.x
env:
- GO111MODULE=on
script:
- go run build/ci.go test -coverage $TEST_PACKAGES
- stage: build
if: type = pull_request
os: linux
arch: arm64
dist: bionic
go: 1.18.x
env:
- GO111MODULE=on
script:
- go run build/ci.go test -coverage $TEST_PACKAGES
- stage: build
os: linux
dist: bionic
go: 1.17.x
env:
- GO111MODULE=on
script:
- go run build/ci.go test -coverage $TEST_PACKAGES
# This builder does the Azure archive purges to avoid accumulating junk
- stage: build
if: type = cron
os: linux
dist: bionic
go: 1.18.x
env:
- azure-purge
- GO111MODULE=on
git:
submodules: false # avoid cloning ethereum/tests
script:
- go run build/ci.go purge -store gethstore/builds -days 14
# This builder executes race tests
- stage: build
if: type = cron
os: linux
dist: bionic
go: 1.18.x
env:
- GO111MODULE=on
script:
- go run build/ci.go test -race -coverage $TEST_PACKAGES

View File

@ -250,6 +250,34 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffWatchedAddressesFilePath,
},
},
{
Name: "STATE DIFF",
Flags: []cli.Flag{
utils.StateDiffFlag,
utils.StateDiffDBTypeFlag,
utils.StateDiffDBDriverTypeFlag,
utils.StateDiffDBDumpDst,
utils.StateDiffDBNameFlag,
utils.StateDiffDBPasswordFlag,
utils.StateDiffDBUserFlag,
utils.StateDiffDBHostFlag,
utils.StateDiffDBPortFlag,
utils.StateDiffDBMaxConnLifetime,
utils.StateDiffDBMaxConnIdleTime,
utils.StateDiffDBMaxConns,
utils.StateDiffDBMinConns,
utils.StateDiffDBMaxIdleConns,
utils.StateDiffDBConnTimeout,
utils.StateDiffDBNodeIDFlag,
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,
utils.StateDiffWatchedAddressesFilePath,
},
},
{
Name: "MISC",
Flags: []cli.Flag{

1
go.sum
View File

@ -662,6 +662,7 @@ golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKG
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57 h1:LQmS1nU0twXLA96Kt7U9qtHJEbBk3z6Q0V4UXjZkpr4=

View File

@ -1861,6 +1861,294 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
},
}
for _, test := range tests {
diff, err := builder.BuildStateDiffObject(test.startingArguments, params)
if err != nil {
t.Error(err)
}
receivedStateDiffRlp, err := rlp.EncodeToBytes(&diff)
if err != nil {
t.Error(err)
}
expectedStateDiffRlp, err := rlp.EncodeToBytes(&test.expected)
if err != nil {
t.Error(err)
}
sort.Slice(receivedStateDiffRlp, func(i, j int) bool { return receivedStateDiffRlp[i] < receivedStateDiffRlp[j] })
sort.Slice(expectedStateDiffRlp, func(i, j int) bool { return expectedStateDiffRlp[i] < expectedStateDiffRlp[j] })
if !bytes.Equal(receivedStateDiffRlp, expectedStateDiffRlp) {
t.Logf("Test failed: %s", test.name)
t.Errorf("actual state diff: %+v\r\n\r\n\r\nexpected state diff: %+v", diff, test.expected)
}
}
}
func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
blocks, chain := test_helpers.MakeChain(6, test_helpers.Genesis, test_helpers.TestChainGen)
contractLeafKey = test_helpers.AddressToLeafKey(test_helpers.ContractAddr)
defer chain.Stop()
block3 = blocks[2]
block4 = blocks[3]
block5 = blocks[4]
block6 = blocks[5]
params := statediff.Params{
WatchedAddresses: []common.Address{test_helpers.Account1Addr, test_helpers.Account2Addr},
}
params.ComputeWatchedAddressesLeafKeys()
builder = statediff.NewBuilder(chain.StateCache())
var tests = []struct {
name string
startingArguments statediff.Args
expected *types2.StateObject
}{
{
"testBlock4",
statediff.Args{
OldStateRoot: block3.Root(),
NewStateRoot: block4.Root(),
BlockNumber: block4.Number(),
BlockHash: block4.Hash(),
},
&types2.StateObject{
BlockNumber: block4.Number(),
BlockHash: block4.Hash(),
Nodes: []types2.StateNode{
{
Path: []byte{'\x0c'},
NodeType: types2.Leaf,
LeafKey: test_helpers.Account2LeafKey,
NodeValue: account2AtBlock4LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
{
"testBlock5",
statediff.Args{
OldStateRoot: block4.Root(),
NewStateRoot: block5.Root(),
BlockNumber: block5.Number(),
BlockHash: block5.Hash(),
},
&types2.StateObject{
BlockNumber: block5.Number(),
BlockHash: block5.Hash(),
Nodes: []types2.StateNode{
{
Path: []byte{'\x0e'},
NodeType: types2.Leaf,
LeafKey: test_helpers.Account1LeafKey,
NodeValue: account1AtBlock5LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
{
"testBlock6",
statediff.Args{
OldStateRoot: block5.Root(),
NewStateRoot: block6.Root(),
BlockNumber: block6.Number(),
BlockHash: block6.Hash(),
},
&types2.StateObject{
BlockNumber: block6.Number(),
BlockHash: block6.Hash(),
Nodes: []types2.StateNode{
{
Path: []byte{'\x0c'},
NodeType: types2.Leaf,
LeafKey: test_helpers.Account2LeafKey,
NodeValue: account2AtBlock6LeafNode,
StorageNodes: emptyStorage,
},
{
Path: []byte{'\x0e'},
NodeType: types2.Leaf,
LeafKey: test_helpers.Account1LeafKey,
NodeValue: account1AtBlock6LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
}
for _, test := range tests {
diff, err := builder.BuildStateDiffObject(test.startingArguments, params)
if err != nil {
t.Error(err)
}
receivedStateDiffRlp, err := rlp.EncodeToBytes(&diff)
if err != nil {
t.Error(err)
}
expectedStateDiffRlp, err := rlp.EncodeToBytes(test.expected)
if err != nil {
t.Error(err)
}
sort.Slice(receivedStateDiffRlp, func(i, j int) bool { return receivedStateDiffRlp[i] < receivedStateDiffRlp[j] })
sort.Slice(expectedStateDiffRlp, func(i, j int) bool { return expectedStateDiffRlp[i] < expectedStateDiffRlp[j] })
if !bytes.Equal(receivedStateDiffRlp, expectedStateDiffRlp) {
t.Logf("Test failed: %s", test.name)
t.Errorf("actual state diff: %+v\r\n\r\n\r\nexpected state diff: %+v", diff, test.expected)
}
}
}
func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
blocks, chain := test_helpers.MakeChain(6, test_helpers.Genesis, test_helpers.TestChainGen)
contractLeafKey = test_helpers.AddressToLeafKey(test_helpers.ContractAddr)
defer chain.Stop()
block3 = blocks[2]
block4 = blocks[3]
block5 = blocks[4]
block6 = blocks[5]
params := statediff.Params{
WatchedAddresses: []common.Address{test_helpers.Account1Addr, test_helpers.ContractAddr},
}
params.ComputeWatchedAddressesLeafKeys()
builder = statediff.NewBuilder(chain.StateCache())
var tests = []struct {
name string
startingArguments statediff.Args
expected *types2.StateObject
}{
{
"testBlock4",
statediff.Args{
OldStateRoot: block3.Root(),
NewStateRoot: block4.Root(),
BlockNumber: block4.Number(),
BlockHash: block4.Hash(),
},
&types2.StateObject{
BlockNumber: block4.Number(),
BlockHash: block4.Hash(),
Nodes: []types2.StateNode{
{
Path: []byte{'\x06'},
NodeType: types2.Leaf,
LeafKey: contractLeafKey,
NodeValue: contractAccountAtBlock4LeafNode,
StorageNodes: []types2.StorageNode{
{
Path: []byte{'\x04'},
NodeType: types2.Leaf,
LeafKey: slot2StorageKey.Bytes(),
NodeValue: slot2StorageLeafNode,
},
{
Path: []byte{'\x0b'},
NodeType: types2.Removed,
LeafKey: slot1StorageKey.Bytes(),
NodeValue: []byte{},
},
{
Path: []byte{'\x0c'},
NodeType: types2.Removed,
LeafKey: slot3StorageKey.Bytes(),
NodeValue: []byte{},
},
},
},
},
},
},
{
"testBlock5",
statediff.Args{
OldStateRoot: block4.Root(),
NewStateRoot: block5.Root(),
BlockNumber: block5.Number(),
BlockHash: block5.Hash(),
},
&types2.StateObject{
BlockNumber: block5.Number(),
BlockHash: block5.Hash(),
Nodes: []types2.StateNode{
{
Path: []byte{'\x06'},
NodeType: types2.Leaf,
LeafKey: contractLeafKey,
NodeValue: contractAccountAtBlock5LeafNode,
StorageNodes: []types2.StorageNode{
{
Path: []byte{'\x0c'},
NodeType: types2.Leaf,
LeafKey: slot3StorageKey.Bytes(),
NodeValue: slot3StorageLeafNode,
},
{
Path: []byte{'\x04'},
NodeType: types2.Removed,
LeafKey: slot2StorageKey.Bytes(),
NodeValue: []byte{},
},
},
},
{
Path: []byte{'\x0e'},
NodeType: types2.Leaf,
LeafKey: test_helpers.Account1LeafKey,
NodeValue: account1AtBlock5LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
{
"testBlock6",
statediff.Args{
OldStateRoot: block5.Root(),
NewStateRoot: block6.Root(),
BlockNumber: block6.Number(),
BlockHash: block6.Hash(),
},
&types2.StateObject{
BlockNumber: block6.Number(),
BlockHash: block6.Hash(),
Nodes: []types2.StateNode{
{
Path: []byte{'\x06'},
NodeType: types2.Removed,
LeafKey: contractLeafKey,
NodeValue: []byte{},
StorageNodes: []types2.StorageNode{
{
Path: []byte{'\x02'},
NodeType: types2.Removed,
LeafKey: slot0StorageKey.Bytes(),
NodeValue: []byte{},
},
{
Path: []byte{'\x0c'},
NodeType: types2.Removed,
LeafKey: slot3StorageKey.Bytes(),
NodeValue: []byte{},
},
},
},
{
Path: []byte{'\x0e'},
NodeType: types2.Leaf,
LeafKey: test_helpers.Account1LeafKey,
NodeValue: account1AtBlock6LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
}
for _, test := range tests {
diff, err := builder.BuildStateDiffObject(test.startingArguments, params)
if err != nil {

View File

@ -66,6 +66,10 @@ func (p *Params) ComputeWatchedAddressesLeafKeys() {
}
}
func (p *Params) WatchedAddressesLeafKeys() map[common.Hash]struct{} {
return p.watchedAddressesLeafKeys
}
// ParamsWithMutex allows to lock the parameters while they are being updated | read from
type ParamsWithMutex struct {
Params

View File

@ -534,3 +534,21 @@ func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg,
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
return nil
}
// Written but not tested. Unsure if there is a real use case for this anywhere.
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, index interfaces.StateDiffIndexer) error {
log.Info("Dumping known gaps")
if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
}
knownGap := models.KnownGapsModel{
StartingBlockNumber: startingBlockNumber.String(),
EndingBlockNumber: endingBlockNumber.String(),
CheckedOut: checkedOut,
ProcessingKey: processingKey,
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", knownGap); err != nil {
return err
}
return nil
}

View File

@ -41,7 +41,7 @@ var TestConfig = Config{
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
NetworkID: "1",
ChainID: 1,
ID: "mockNodeID",
ID: "1",
ClientName: "go-ethereum",
},
}

View File

@ -220,7 +220,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: header.ParentHash.String(),
BlockNumber: header.Number.String(),
BlockNumber: sdi.blockNumber,
BlockHash: headerID,
TotalDifficulty: td.String(),
Reward: reward.String(),

View File

@ -1094,3 +1094,341 @@ func TestFileWatchAddressMethods(t *testing.T) {
}
})
}
func TestFileWatchAddressMethods(t *testing.T) {
setupIndexer(t)
defer tearDown(t)
type res struct {
Address string `db:"address"`
CreatedAt uint64 `db:"created_at"`
WatchedAt uint64 `db:"watched_at"`
LastFilledAt uint64 `db:"last_filled_at"`
}
pgStr := "SELECT * FROM eth_meta.watched_addresses"
t.Run("Load watched addresses (empty table)", func(t *testing.T) {
expectedData := []common.Address{}
rows, err := ind.LoadWatchedAddresses()
require.NoError(t, err)
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Insert watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1)))
require.NoError(t, err)
resetAndDumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Insert watched addresses (some already watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
require.NoError(t, err)
resetAndDumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Remove watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
err = ind.RemoveWatchedAddresses(args)
require.NoError(t, err)
resetAndDumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{}
err = ind.RemoveWatchedAddresses(args)
require.NoError(t, err)
resetAndDumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Set watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
require.NoError(t, err)
resetAndDumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Set watched addresses (some already watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
expectedData := []res{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
}
err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3)))
require.NoError(t, err)
resetAndDumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Load watched addresses", func(t *testing.T) {
expectedData := []common.Address{
common.HexToAddress(contract4Address),
common.HexToAddress(contract2Address),
common.HexToAddress(contract3Address),
}
rows, err := ind.LoadWatchedAddresses()
require.NoError(t, err)
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Clear watched addresses", func(t *testing.T) {
expectedData := []res{}
err = ind.ClearWatchedAddresses()
require.NoError(t, err)
resetAndDumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Clear watched addresses (empty table)", func(t *testing.T) {
expectedData := []res{}
err = ind.ClearWatchedAddresses()
require.NoError(t, err)
resetAndDumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
}

View File

@ -197,8 +197,10 @@ func (sqw *SQLWriter) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []
}
func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
nodeId := fmt.Sprintf("{%s}", header.NodeID)
// {'1'}
stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.TotalDifficulty, nodeId, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase)
sqw.stmts <- []byte(stmt)
indexerMetrics.blocks.Inc(1)

View File

@ -55,6 +55,7 @@ type StateDiffIndexer struct {
ctx context.Context
chainConfig *params.ChainConfig
dbWriter *Writer
blockNumber string
}
// NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer
@ -89,6 +90,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now()
sdi.blockNumber = block.Number().String()
blockHash := block.Hash()
blockHashStr := blockHash.String()
height := block.NumberU64()
@ -201,7 +203,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes)
err = sdi.processUncles(blockTx, headerID, block.NumberU64(), uncleNodes)
if err != nil {
return nil, err
}
@ -250,7 +252,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: header.ParentHash.String(),
BlockNumber: header.Number.String(),
BlockNumber: sdi.blockNumber,
BlockHash: headerID,
TotalDifficulty: td.String(),
Reward: reward.String(),
@ -265,7 +267,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
}
// processUncles publishes and indexes uncle IPLDs in Postgres
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error {
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
// publish and index uncles
for _, uncleNode := range uncleNodes {
tx.cacheIPLD(uncleNode)
@ -274,10 +276,10 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
if sdi.chainConfig.Clique != nil {
uncleReward = big.NewInt(0)
} else {
uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
}
uncle := models.UncleModel{
BlockNumber: blockNumber.String(),
BlockNumber: sdi.blockNumber,
HeaderID: headerID,
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
@ -333,7 +335,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return fmt.Errorf("error deriving tx sender: %v", err)
}
txModel := models.TxModel{
BlockNumber: args.blockNumber.String(),
BlockNumber: sdi.blockNumber,
HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from),
@ -356,7 +358,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
storageKeys[k] = storageKey.Hex()
}
accessListElementModel := models.AccessListElementModel{
BlockNumber: args.blockNumber.String(),
BlockNumber: sdi.blockNumber,
TxID: txID,
Index: int64(j),
Address: accessListElement.Address.Hex(),
@ -380,7 +382,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
}
rctModel := &models.ReceiptModel{
BlockNumber: args.blockNumber.String(),
BlockNumber: sdi.blockNumber,
TxID: txID,
Contract: contract,
ContractHash: contractHash,
@ -411,7 +413,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
}
logDataSet[idx] = &models.LogsModel{
BlockNumber: args.blockNumber.String(),
BlockNumber: sdi.blockNumber,
ReceiptID: txID,
Address: l.Address.String(),
Index: int64(l.Index),
@ -683,3 +685,77 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
return nil
}
// This is a simple wrapper function which will run QueryRow on the DB
func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) {
var ret string
err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&ret)
if err != nil {
log.Error(fmt.Sprint("Can't properly query the DB for query: ", queryString))
return "", err
}
return ret, nil
}
// This function is a simple wrapper which will call QueryDb but the return value will be
// a big int instead of a string
func (sdi *StateDiffIndexer) QueryDbToBigInt(queryString string) (*big.Int, error) {
ret := new(big.Int)
res, err := sdi.QueryDb(queryString)
if err != nil {
return ret, err
}
ret, ok := ret.SetString(res, 10)
if !ok {
log.Error(fmt.Sprint("Can't turn the res ", res, "into a bigInt"))
return ret, fmt.Errorf("Can't turn %s into a bigInt", res)
}
return ret, nil
}
// Users provide the latestBlockInDb and the latestBlockOnChain
// as well as the expected difference. This function does some simple math.
// The expected difference for the time being is going to be 1, but as we run
// More geth nodes, the expected difference might fluctuate.
func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDifference *big.Int) bool {
latestBlock := big.NewInt(0)
if latestBlock.Sub(latestBlockOnChain, expectedDifference).Cmp(latestBlockInDb) != 0 {
log.Warn("We found a gap", "latestBlockInDb", latestBlockInDb, "latestBlockOnChain", latestBlockOnChain, "expectedDifference", expectedDifference)
return true
}
return false
}
// This function will check for Gaps and update the DB if gaps are found.
// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling
// It might be a useful parameter to update depending on the geth node.
// TODO:
// REmove the return value
// Write to file if err in writing to DB
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, fileIndexer interfaces.StateDiffIndexer) error {
dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids"
latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString)
if err != nil {
return err
}
gapExists := isGap(latestBlockInDb, latestBlockOnChain, expectedDifference)
if gapExists {
startBlock := big.NewInt(0)
endBlock := big.NewInt(0)
startBlock.Add(latestBlockInDb, expectedDifference)
endBlock.Sub(latestBlockOnChain, expectedDifference)
log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock)
err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey, fileIndexer)
if err != nil {
log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err)
// Write to file SQL file instead!!!
// If write to SQL file fails, write to disk. Handle this within the write to SQL file function!
return err
}
}
return nil
}

View File

@ -934,3 +934,334 @@ func TestPGXWatchAddressMethods(t *testing.T) {
}
})
}
func TestPGXWatchAddressMethods(t *testing.T) {
setupPGXIndexer(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
type res struct {
Address string `db:"address"`
CreatedAt uint64 `db:"created_at"`
WatchedAt uint64 `db:"watched_at"`
LastFilledAt uint64 `db:"last_filled_at"`
}
pgStr := "SELECT * FROM eth_meta.watched_addresses"
t.Run("Load watched addresses (empty table)", func(t *testing.T) {
expectedData := []common.Address{}
rows, err := ind.LoadWatchedAddresses()
require.NoError(t, err)
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Insert watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1)))
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Insert watched addresses (some already watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Remove watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
err = ind.RemoveWatchedAddresses(args)
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{}
err = ind.RemoveWatchedAddresses(args)
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Set watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Set watched addresses (some already watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
expectedData := []res{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
}
err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3)))
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Load watched addresses", func(t *testing.T) {
expectedData := []common.Address{
common.HexToAddress(contract4Address),
common.HexToAddress(contract2Address),
common.HexToAddress(contract3Address),
}
rows, err := ind.LoadWatchedAddresses()
require.NoError(t, err)
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Clear watched addresses", func(t *testing.T) {
expectedData := []res{}
err = ind.ClearWatchedAddresses()
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Clear watched addresses (empty table)", func(t *testing.T) {
expectedData := []res{}
err = ind.ClearWatchedAddresses()
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
}

View File

@ -40,7 +40,7 @@ type DB struct {
func (db *DB) InsertHeaderStm() string {
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, ARRAY(SELECT DISTINCT unnest(array_cat(eth.header_cids.node_id, $6))), $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
}
// InsertUncleStm satisfies the sql.Statements interface
@ -107,3 +107,10 @@ func (db *DB) InsertKnownGapsStm() string {
ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4)
WHERE eth_meta.known_gaps.ending_block_number <= $2`
}
// InsertKnownGapsStm satisfies the sql.Statements interface
func (db *DB) InsertKnownGapsStm() string {
return `INSERT INTO eth_meta.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ($1, $2, $3, $4)
ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4)
WHERE eth_meta.known_gaps.ending_block_number <= $2`
}

View File

@ -927,3 +927,334 @@ func TestSQLXWatchAddressMethods(t *testing.T) {
}
})
}
func TestSQLXWatchAddressMethods(t *testing.T) {
setupSQLXIndexer(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
type res struct {
Address string `db:"address"`
CreatedAt uint64 `db:"created_at"`
WatchedAt uint64 `db:"watched_at"`
LastFilledAt uint64 `db:"last_filled_at"`
}
pgStr := "SELECT * FROM eth_meta.watched_addresses"
t.Run("Load watched addresses (empty table)", func(t *testing.T) {
expectedData := []common.Address{}
rows, err := ind.LoadWatchedAddresses()
require.NoError(t, err)
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Insert watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1)))
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Insert watched addresses (some already watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Remove watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
err = ind.RemoveWatchedAddresses(args)
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{}
err = ind.RemoveWatchedAddresses(args)
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Set watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Set watched addresses (some already watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
expectedData := []res{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
}
err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3)))
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Load watched addresses", func(t *testing.T) {
expectedData := []common.Address{
common.HexToAddress(contract4Address),
common.HexToAddress(contract2Address),
common.HexToAddress(contract3Address),
}
rows, err := ind.LoadWatchedAddresses()
require.NoError(t, err)
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Clear watched addresses", func(t *testing.T) {
expectedData := []res{}
err = ind.ClearWatchedAddresses()
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Clear watched addresses (empty table)", func(t *testing.T) {
expectedData := []res{}
err = ind.ClearWatchedAddresses()
require.NoError(t, err)
rows := []res{}
err = db.Select(context.Background(), &rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
}

View File

@ -21,6 +21,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/lib/pq"
)
var (
@ -47,11 +48,12 @@ func (w *Writer) Close() error {
/*
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (block_hash, block_number) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)
ON CONFLICT (block_hash, block_number) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($1, $3, $4, $5, ARRAY(SELECT DISTINCT unnest(array_cat(eth.header_cids.node_id, $6))), $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)
*/
func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
nodeIds := []string{w.db.NodeID()}
_, err := tx.Exec(w.db.Context(), w.db.InsertHeaderStm(),
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, w.db.NodeID(),
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, pq.Array(nodeIds),
header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom,
header.Timestamp, header.MhKey, 1, header.Coinbase)
if err != nil {

View File

@ -165,3 +165,11 @@ type KnownGapsModel struct {
CheckedOut bool `db:"checked_out"`
ProcessingKey int64 `db:"processing_key"`
}
// KnownGaps is the data structure for eth_meta.known_gaps
type KnownGapsModel struct {
StartingBlockNumber string `db:"starting_block_number"`
EndingBlockNumber string `db:"ending_block_number"`
CheckedOut bool `db:"checked_out"`
ProcessingKey int64 `db:"processing_key"`
}