diff --git a/.github/workflows/publish.yaml b/.github/workflows/on-pr-publish.yaml similarity index 73% rename from .github/workflows/publish.yaml rename to .github/workflows/on-pr-publish.yaml index 610f8afbf..51fec9b5a 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/on-pr-publish.yaml @@ -1,13 +1,35 @@ -name: Publish geth to release +name: Test, Build, and/or Publish on: release: types: [published] + pull_request: + jobs: + pre_job: + # continue-on-error: true # Uncomment once integration is finished + runs-on: ubuntu-latest + # Map a step output to a job output + outputs: + should_skip: ${{ steps.skip_check.outputs.should_skip }} + steps: + - id: skip_check + uses: fkirc/skip-duplicate-actions@v4 + with: + # All of these options are optional, so you can remove them if you are happy with the defaults + concurrent_skipping: "never" + skip_after_successful_duplicate: "true" + do_not_skip: '["workflow_dispatch", "schedule"]' run-tests: uses: ./.github/workflows/tests.yml + if: ${{ needs.pre_job.outputs.should_skip != 'true' }} + needs: pre_job build: name: Run docker build and publish needs: run-tests + if: | + always() && + (needs.run-tests.result == 'success' || needs.run-tests.result == 'skipped') && + github.event_name == 'release' runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 @@ -25,6 +47,10 @@ jobs: push_to_registries: name: Publish assets to Release runs-on: ubuntu-latest + if: | + always() && + (needs.build.result == 'success') && + github.event_name == 'release' needs: build steps: - name: Get the version diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml deleted file mode 100644 index 2b05dcda5..000000000 --- a/.github/workflows/on-pr.yml +++ /dev/null @@ -1,7 +0,0 @@ -name: Build and test - -on: [pull_request] - -jobs: - run-tests: - uses: ./.github/workflows/tests.yml diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 41f63f235..967850b19 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -80,9 +80,6 @@ jobs: --env-file $GITHUB_WORKSPACE/config.sh \ up -d --build - - name: Give the migration a few seconds - run: sleep 30; - - name: Run unit tests run: make statedifftest diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index e08e271f3..000000000 --- a/.travis.yml +++ /dev/null @@ -1,247 +0,0 @@ -language: go -go_import_path: github.com/ethereum/go-ethereum -sudo: false -jobs: - allow_failures: - - stage: build - os: osx - go: 1.17.x - env: - - azure-osx - - azure-ios - - cocoapods-ios - - include: - # This builder only tests code linters on latest version of Go - - stage: lint - os: linux - dist: bionic - go: 1.18.x - env: - - lint - git: - submodules: false # avoid cloning ethereum/tests - script: - - go run build/ci.go lint - - # These builders create the Docker sub-images for multi-arch push and each - # will attempt to push the multi-arch image if they are the last builder - - stage: build - if: type = push - os: linux - arch: amd64 - dist: bionic - go: 1.18.x - env: - - docker - services: - - docker - git: - submodules: false # avoid cloning ethereum/tests - before_install: - - export DOCKER_CLI_EXPERIMENTAL=enabled - script: - - go run build/ci.go docker -image -manifest amd64,arm64 -upload ethereum/client-go - - - stage: build - if: type = push - os: linux - arch: arm64 - dist: bionic - go: 1.18.x - env: - - docker - services: - - docker - git: - submodules: false # avoid cloning ethereum/tests - before_install: - - export DOCKER_CLI_EXPERIMENTAL=enabled - script: - - go run build/ci.go docker -image -manifest amd64,arm64 -upload ethereum/client-go - - # This builder does the Ubuntu PPA upload - - stage: build - if: type = push - os: linux - dist: bionic - go: 1.18.x - env: - - ubuntu-ppa - - GO111MODULE=on - git: - submodules: false # avoid cloning ethereum/tests - addons: - apt: - packages: - - devscripts - - debhelper - - dput - - fakeroot - - python-bzrlib - - python-paramiko - script: - - echo '|1|7SiYPr9xl3uctzovOTj4gMwAC1M=|t6ReES75Bo/PxlOPJ6/GsGbTrM0= ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA0aKz5UTUndYgIGG7dQBV+HaeuEZJ2xPHo2DS2iSKvUL4xNMSAY4UguNW+pX56nAQmZKIZZ8MaEvSj6zMEDiq6HFfn5JcTlM80UwlnyKe8B8p7Nk06PPQLrnmQt5fh0HmEcZx+JU9TZsfCHPnX7MNz4ELfZE6cFsclClrKim3BHUIGq//t93DllB+h4O9LHjEUsQ1Sr63irDLSutkLJD6RXchjROXkNirlcNVHH/jwLWR5RcYilNX7S5bIkK8NlWPjsn/8Ua5O7I9/YoE97PpO6i73DTGLh5H9JN/SITwCKBkgSDWUt61uPK3Y11Gty7o2lWsBjhBUm2Y38CBsoGmBw==' >> ~/.ssh/known_hosts - - go run build/ci.go debsrc -upload ethereum/ethereum -sftp-user geth-ci -signer "Go Ethereum Linux Builder " - - # This builder does the Linux Azure uploads - - stage: build - if: type = push - os: linux - dist: bionic - sudo: required - go: 1.18.x - env: - - azure-linux - - GO111MODULE=on - git: - submodules: false # avoid cloning ethereum/tests - addons: - apt: - packages: - - gcc-multilib - script: - # Build for the primary platforms that Trusty can manage - - go run build/ci.go install -dlgo - - go run build/ci.go archive -type tar -signer LINUX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds - - go run build/ci.go install -dlgo -arch 386 - - go run build/ci.go archive -arch 386 -type tar -signer LINUX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds - - # Switch over GCC to cross compilation (breaks 386, hence why do it here only) - - sudo -E apt-get -yq --no-install-suggests --no-install-recommends --force-yes install gcc-arm-linux-gnueabi libc6-dev-armel-cross gcc-arm-linux-gnueabihf libc6-dev-armhf-cross gcc-aarch64-linux-gnu libc6-dev-arm64-cross - - sudo ln -s /usr/include/asm-generic /usr/include/asm - - - GOARM=5 go run build/ci.go install -dlgo -arch arm -cc arm-linux-gnueabi-gcc - - GOARM=5 go run build/ci.go archive -arch arm -type tar -signer LINUX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds - - GOARM=6 go run build/ci.go install -dlgo -arch arm -cc arm-linux-gnueabi-gcc - - GOARM=6 go run build/ci.go archive -arch arm -type tar -signer LINUX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds - - GOARM=7 go run build/ci.go install -dlgo -arch arm -cc arm-linux-gnueabihf-gcc - - GOARM=7 go run build/ci.go archive -arch arm -type tar -signer LINUX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds - - go run build/ci.go install -dlgo -arch arm64 -cc aarch64-linux-gnu-gcc - - go run build/ci.go archive -arch arm64 -type tar -signer LINUX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds - - # This builder does the Android Maven and Azure uploads - - stage: build - if: type = push - os: linux - dist: bionic - addons: - apt: - packages: - - openjdk-8-jdk - env: - - azure-android - - maven-android - - GO111MODULE=on - git: - submodules: false # avoid cloning ethereum/tests - before_install: - # Install Android and it's dependencies manually, Travis is stale - - export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 - - curl https://dl.google.com/android/repository/commandlinetools-linux-6858069_latest.zip -o android.zip - - unzip -q android.zip -d $HOME/sdk && rm android.zip - - mv $HOME/sdk/cmdline-tools $HOME/sdk/latest && mkdir $HOME/sdk/cmdline-tools && mv $HOME/sdk/latest $HOME/sdk/cmdline-tools - - export PATH=$PATH:$HOME/sdk/cmdline-tools/latest/bin - - export ANDROID_HOME=$HOME/sdk - - - yes | sdkmanager --licenses >/dev/null - - sdkmanager "platform-tools" "platforms;android-15" "platforms;android-19" "platforms;android-24" "ndk-bundle" - - # Install Go to allow building with - - curl https://dl.google.com/go/go1.18.linux-amd64.tar.gz | tar -xz - - export PATH=`pwd`/go/bin:$PATH - - export GOROOT=`pwd`/go - - export GOPATH=$HOME/go - script: - # Build the Android archive and upload it to Maven Central and Azure - - mkdir -p $GOPATH/src/github.com/ethereum - - ln -s `pwd` $GOPATH/src/github.com/ethereum/go-ethereum - - go run build/ci.go aar -signer ANDROID_SIGNING_KEY -signify SIGNIFY_KEY -deploy https://oss.sonatype.org -upload gethstore/builds - - # This builder does the OSX Azure, iOS CocoaPods and iOS Azure uploads - - stage: build - if: type = push - os: osx - go: 1.18.x - env: - - azure-osx - - azure-ios - - cocoapods-ios - - GO111MODULE=on - git: - submodules: false # avoid cloning ethereum/tests - script: - - go run build/ci.go install -dlgo - - go run build/ci.go archive -type tar -signer OSX_SIGNING_KEY -signify SIGNIFY_KEY -upload gethstore/builds - - # Build the iOS framework and upload it to CocoaPods and Azure - - gem uninstall cocoapods -a -x - - gem install cocoapods - - - mv ~/.cocoapods/repos/master ~/.cocoapods/repos/master.bak - - sed -i '.bak' 's/repo.join/!repo.join/g' $(dirname `gem which cocoapods`)/cocoapods/sources_manager.rb - - if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then git clone --depth=1 https://github.com/CocoaPods/Specs.git ~/.cocoapods/repos/master && pod setup --verbose; fi - - - xctool -version - - xcrun simctl list - - # Workaround for https://github.com/golang/go/issues/23749 - - export CGO_CFLAGS_ALLOW='-fmodules|-fblocks|-fobjc-arc' - - go run build/ci.go xcode -signer IOS_SIGNING_KEY -signify SIGNIFY_KEY -deploy trunk -upload gethstore/builds - - # These builders run the tests - - stage: build - os: linux - arch: amd64 - dist: bionic - go: 1.18.x - env: - - GO111MODULE=on - script: - - go run build/ci.go test -coverage $TEST_PACKAGES - - - stage: build - if: type = pull_request - os: linux - arch: arm64 - dist: bionic - go: 1.18.x - env: - - GO111MODULE=on - script: - - go run build/ci.go test -coverage $TEST_PACKAGES - - - stage: build - os: linux - dist: bionic - go: 1.17.x - env: - - GO111MODULE=on - script: - - go run build/ci.go test -coverage $TEST_PACKAGES - - # This builder does the Azure archive purges to avoid accumulating junk - - stage: build - if: type = cron - os: linux - dist: bionic - go: 1.18.x - env: - - azure-purge - - GO111MODULE=on - git: - submodules: false # avoid cloning ethereum/tests - script: - - go run build/ci.go purge -store gethstore/builds -days 14 - - # This builder executes race tests - - stage: build - if: type = cron - os: linux - dist: bionic - go: 1.18.x - env: - - GO111MODULE=on - script: - - go run build/ci.go test -race -coverage $TEST_PACKAGES - diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index fd0e02274..992c5190f 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -250,6 +250,34 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.StateDiffWatchedAddressesFilePath, }, }, + { + Name: "STATE DIFF", + Flags: []cli.Flag{ + utils.StateDiffFlag, + utils.StateDiffDBTypeFlag, + utils.StateDiffDBDriverTypeFlag, + utils.StateDiffDBDumpDst, + utils.StateDiffDBNameFlag, + utils.StateDiffDBPasswordFlag, + utils.StateDiffDBUserFlag, + utils.StateDiffDBHostFlag, + utils.StateDiffDBPortFlag, + utils.StateDiffDBMaxConnLifetime, + utils.StateDiffDBMaxConnIdleTime, + utils.StateDiffDBMaxConns, + utils.StateDiffDBMinConns, + utils.StateDiffDBMaxIdleConns, + utils.StateDiffDBConnTimeout, + utils.StateDiffDBNodeIDFlag, + utils.StateDiffDBClientNameFlag, + utils.StateDiffWritingFlag, + utils.StateDiffWorkersFlag, + utils.StateDiffFilePath, + utils.StateDiffKnownGapsFilePath, + utils.StateDiffWaitForSync, + utils.StateDiffWatchedAddressesFilePath, + }, + }, { Name: "MISC", Flags: []cli.Flag{ diff --git a/go.sum b/go.sum index 3b29fcb7c..2b6ecde27 100644 --- a/go.sum +++ b/go.sum @@ -662,6 +662,7 @@ golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKG golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57 h1:LQmS1nU0twXLA96Kt7U9qtHJEbBk3z6Q0V4UXjZkpr4= diff --git a/statediff/builder_test.go b/statediff/builder_test.go index 3e8e6bdd9..3e386575a 100644 --- a/statediff/builder_test.go +++ b/statediff/builder_test.go @@ -1861,6 +1861,294 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) { }, } + for _, test := range tests { + diff, err := builder.BuildStateDiffObject(test.startingArguments, params) + if err != nil { + t.Error(err) + } + receivedStateDiffRlp, err := rlp.EncodeToBytes(&diff) + if err != nil { + t.Error(err) + } + + expectedStateDiffRlp, err := rlp.EncodeToBytes(&test.expected) + if err != nil { + t.Error(err) + } + + sort.Slice(receivedStateDiffRlp, func(i, j int) bool { return receivedStateDiffRlp[i] < receivedStateDiffRlp[j] }) + sort.Slice(expectedStateDiffRlp, func(i, j int) bool { return expectedStateDiffRlp[i] < expectedStateDiffRlp[j] }) + if !bytes.Equal(receivedStateDiffRlp, expectedStateDiffRlp) { + t.Logf("Test failed: %s", test.name) + t.Errorf("actual state diff: %+v\r\n\r\n\r\nexpected state diff: %+v", diff, test.expected) + } + } +} + +func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) { + blocks, chain := test_helpers.MakeChain(6, test_helpers.Genesis, test_helpers.TestChainGen) + contractLeafKey = test_helpers.AddressToLeafKey(test_helpers.ContractAddr) + defer chain.Stop() + block3 = blocks[2] + block4 = blocks[3] + block5 = blocks[4] + block6 = blocks[5] + params := statediff.Params{ + WatchedAddresses: []common.Address{test_helpers.Account1Addr, test_helpers.Account2Addr}, + } + params.ComputeWatchedAddressesLeafKeys() + builder = statediff.NewBuilder(chain.StateCache()) + + var tests = []struct { + name string + startingArguments statediff.Args + expected *types2.StateObject + }{ + { + "testBlock4", + statediff.Args{ + OldStateRoot: block3.Root(), + NewStateRoot: block4.Root(), + BlockNumber: block4.Number(), + BlockHash: block4.Hash(), + }, + &types2.StateObject{ + BlockNumber: block4.Number(), + BlockHash: block4.Hash(), + Nodes: []types2.StateNode{ + { + Path: []byte{'\x0c'}, + NodeType: types2.Leaf, + LeafKey: test_helpers.Account2LeafKey, + NodeValue: account2AtBlock4LeafNode, + StorageNodes: emptyStorage, + }, + }, + }, + }, + { + "testBlock5", + statediff.Args{ + OldStateRoot: block4.Root(), + NewStateRoot: block5.Root(), + BlockNumber: block5.Number(), + BlockHash: block5.Hash(), + }, + &types2.StateObject{ + BlockNumber: block5.Number(), + BlockHash: block5.Hash(), + Nodes: []types2.StateNode{ + { + Path: []byte{'\x0e'}, + NodeType: types2.Leaf, + LeafKey: test_helpers.Account1LeafKey, + NodeValue: account1AtBlock5LeafNode, + StorageNodes: emptyStorage, + }, + }, + }, + }, + { + "testBlock6", + statediff.Args{ + OldStateRoot: block5.Root(), + NewStateRoot: block6.Root(), + BlockNumber: block6.Number(), + BlockHash: block6.Hash(), + }, + &types2.StateObject{ + BlockNumber: block6.Number(), + BlockHash: block6.Hash(), + Nodes: []types2.StateNode{ + { + Path: []byte{'\x0c'}, + NodeType: types2.Leaf, + LeafKey: test_helpers.Account2LeafKey, + NodeValue: account2AtBlock6LeafNode, + StorageNodes: emptyStorage, + }, + { + Path: []byte{'\x0e'}, + NodeType: types2.Leaf, + LeafKey: test_helpers.Account1LeafKey, + NodeValue: account1AtBlock6LeafNode, + StorageNodes: emptyStorage, + }, + }, + }, + }, + } + + for _, test := range tests { + diff, err := builder.BuildStateDiffObject(test.startingArguments, params) + if err != nil { + t.Error(err) + } + receivedStateDiffRlp, err := rlp.EncodeToBytes(&diff) + if err != nil { + t.Error(err) + } + + expectedStateDiffRlp, err := rlp.EncodeToBytes(test.expected) + if err != nil { + t.Error(err) + } + + sort.Slice(receivedStateDiffRlp, func(i, j int) bool { return receivedStateDiffRlp[i] < receivedStateDiffRlp[j] }) + sort.Slice(expectedStateDiffRlp, func(i, j int) bool { return expectedStateDiffRlp[i] < expectedStateDiffRlp[j] }) + if !bytes.Equal(receivedStateDiffRlp, expectedStateDiffRlp) { + t.Logf("Test failed: %s", test.name) + t.Errorf("actual state diff: %+v\r\n\r\n\r\nexpected state diff: %+v", diff, test.expected) + } + } +} + +func TestBuilderWithRemovedWatchedAccount(t *testing.T) { + blocks, chain := test_helpers.MakeChain(6, test_helpers.Genesis, test_helpers.TestChainGen) + contractLeafKey = test_helpers.AddressToLeafKey(test_helpers.ContractAddr) + defer chain.Stop() + block3 = blocks[2] + block4 = blocks[3] + block5 = blocks[4] + block6 = blocks[5] + params := statediff.Params{ + WatchedAddresses: []common.Address{test_helpers.Account1Addr, test_helpers.ContractAddr}, + } + params.ComputeWatchedAddressesLeafKeys() + builder = statediff.NewBuilder(chain.StateCache()) + + var tests = []struct { + name string + startingArguments statediff.Args + expected *types2.StateObject + }{ + { + "testBlock4", + statediff.Args{ + OldStateRoot: block3.Root(), + NewStateRoot: block4.Root(), + BlockNumber: block4.Number(), + BlockHash: block4.Hash(), + }, + &types2.StateObject{ + BlockNumber: block4.Number(), + BlockHash: block4.Hash(), + Nodes: []types2.StateNode{ + { + Path: []byte{'\x06'}, + NodeType: types2.Leaf, + LeafKey: contractLeafKey, + NodeValue: contractAccountAtBlock4LeafNode, + StorageNodes: []types2.StorageNode{ + { + Path: []byte{'\x04'}, + NodeType: types2.Leaf, + LeafKey: slot2StorageKey.Bytes(), + NodeValue: slot2StorageLeafNode, + }, + { + Path: []byte{'\x0b'}, + NodeType: types2.Removed, + LeafKey: slot1StorageKey.Bytes(), + NodeValue: []byte{}, + }, + { + Path: []byte{'\x0c'}, + NodeType: types2.Removed, + LeafKey: slot3StorageKey.Bytes(), + NodeValue: []byte{}, + }, + }, + }, + }, + }, + }, + { + "testBlock5", + statediff.Args{ + OldStateRoot: block4.Root(), + NewStateRoot: block5.Root(), + BlockNumber: block5.Number(), + BlockHash: block5.Hash(), + }, + &types2.StateObject{ + BlockNumber: block5.Number(), + BlockHash: block5.Hash(), + Nodes: []types2.StateNode{ + { + Path: []byte{'\x06'}, + NodeType: types2.Leaf, + LeafKey: contractLeafKey, + NodeValue: contractAccountAtBlock5LeafNode, + StorageNodes: []types2.StorageNode{ + { + Path: []byte{'\x0c'}, + NodeType: types2.Leaf, + LeafKey: slot3StorageKey.Bytes(), + NodeValue: slot3StorageLeafNode, + }, + { + Path: []byte{'\x04'}, + NodeType: types2.Removed, + LeafKey: slot2StorageKey.Bytes(), + NodeValue: []byte{}, + }, + }, + }, + { + Path: []byte{'\x0e'}, + NodeType: types2.Leaf, + LeafKey: test_helpers.Account1LeafKey, + NodeValue: account1AtBlock5LeafNode, + StorageNodes: emptyStorage, + }, + }, + }, + }, + { + "testBlock6", + statediff.Args{ + OldStateRoot: block5.Root(), + NewStateRoot: block6.Root(), + BlockNumber: block6.Number(), + BlockHash: block6.Hash(), + }, + &types2.StateObject{ + BlockNumber: block6.Number(), + BlockHash: block6.Hash(), + Nodes: []types2.StateNode{ + { + Path: []byte{'\x06'}, + NodeType: types2.Removed, + LeafKey: contractLeafKey, + NodeValue: []byte{}, + StorageNodes: []types2.StorageNode{ + { + Path: []byte{'\x02'}, + NodeType: types2.Removed, + LeafKey: slot0StorageKey.Bytes(), + NodeValue: []byte{}, + }, + { + Path: []byte{'\x0c'}, + NodeType: types2.Removed, + LeafKey: slot3StorageKey.Bytes(), + NodeValue: []byte{}, + }, + }, + }, + { + Path: []byte{'\x0e'}, + NodeType: types2.Leaf, + LeafKey: test_helpers.Account1LeafKey, + NodeValue: account1AtBlock6LeafNode, + StorageNodes: emptyStorage, + }, + }, + }, + }, + } + for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { diff --git a/statediff/config.go b/statediff/config.go index b4905ab5a..c19473e27 100644 --- a/statediff/config.go +++ b/statediff/config.go @@ -66,6 +66,10 @@ func (p *Params) ComputeWatchedAddressesLeafKeys() { } } +func (p *Params) WatchedAddressesLeafKeys() map[common.Hash]struct{} { + return p.watchedAddressesLeafKeys +} + // ParamsWithMutex allows to lock the parameters while they are being updated | read from type ParamsWithMutex struct { Params diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index 0b7a5ffb0..f11e2e559 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -534,3 +534,21 @@ func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { return nil } + +// Written but not tested. Unsure if there is a real use case for this anywhere. +func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, index interfaces.StateDiffIndexer) error { + log.Info("Dumping known gaps") + if startingBlockNumber.Cmp(endingBlockNumber) != -1 { + return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber) + } + knownGap := models.KnownGapsModel{ + StartingBlockNumber: startingBlockNumber.String(), + EndingBlockNumber: endingBlockNumber.String(), + CheckedOut: checkedOut, + ProcessingKey: processingKey, + } + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", knownGap); err != nil { + return err + } + return nil +} diff --git a/statediff/indexer/database/file/config.go b/statediff/indexer/database/file/config.go index a075e896b..57c20fe8b 100644 --- a/statediff/indexer/database/file/config.go +++ b/statediff/indexer/database/file/config.go @@ -41,7 +41,7 @@ var TestConfig = Config{ GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", NetworkID: "1", ChainID: 1, - ID: "mockNodeID", + ID: "1", ClientName: "go-ethereum", }, } diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 62da6d8c3..e39370cf1 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -220,7 +220,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node CID: headerNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: header.ParentHash.String(), - BlockNumber: header.Number.String(), + BlockNumber: sdi.blockNumber, BlockHash: headerID, TotalDifficulty: td.String(), Reward: reward.String(), diff --git a/statediff/indexer/database/file/indexer_test.go b/statediff/indexer/database/file/indexer_test.go index d6d3c4fee..e5a5383f7 100644 --- a/statediff/indexer/database/file/indexer_test.go +++ b/statediff/indexer/database/file/indexer_test.go @@ -1094,3 +1094,341 @@ func TestFileWatchAddressMethods(t *testing.T) { } }) } + +func TestFileWatchAddressMethods(t *testing.T) { + setupIndexer(t) + defer tearDown(t) + + type res struct { + Address string `db:"address"` + CreatedAt uint64 `db:"created_at"` + WatchedAt uint64 `db:"watched_at"` + LastFilledAt uint64 `db:"last_filled_at"` + } + pgStr := "SELECT * FROM eth_meta.watched_addresses" + + t.Run("Load watched addresses (empty table)", func(t *testing.T) { + expectedData := []common.Address{} + + rows, err := ind.LoadWatchedAddresses() + require.NoError(t, err) + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Insert watched addresses", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1))) + require.NoError(t, err) + resetAndDumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Insert watched addresses (some already watched)", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2))) + require.NoError(t, err) + resetAndDumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Remove watched addresses", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.RemoveWatchedAddresses(args) + require.NoError(t, err) + resetAndDumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{} + + err = ind.RemoveWatchedAddresses(args) + require.NoError(t, err) + resetAndDumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Set watched addresses", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2))) + require.NoError(t, err) + resetAndDumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Set watched addresses (some already watched)", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract4Address, + CreatedAt: contract4CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract4Address, + CreatedAt: contract4CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3))) + require.NoError(t, err) + resetAndDumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Load watched addresses", func(t *testing.T) { + expectedData := []common.Address{ + common.HexToAddress(contract4Address), + common.HexToAddress(contract2Address), + common.HexToAddress(contract3Address), + } + + rows, err := ind.LoadWatchedAddresses() + require.NoError(t, err) + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Clear watched addresses", func(t *testing.T) { + expectedData := []res{} + + err = ind.ClearWatchedAddresses() + require.NoError(t, err) + resetAndDumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Clear watched addresses (empty table)", func(t *testing.T) { + expectedData := []res{} + + err = ind.ClearWatchedAddresses() + require.NoError(t, err) + resetAndDumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) +} diff --git a/statediff/indexer/database/file/writer.go b/statediff/indexer/database/file/writer.go index 3c11b5eea..2e84250e5 100644 --- a/statediff/indexer/database/file/writer.go +++ b/statediff/indexer/database/file/writer.go @@ -197,8 +197,10 @@ func (sqw *SQLWriter) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw [] } func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { + nodeId := fmt.Sprintf("{%s}", header.NodeID) + // {'1'} stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, - header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot, + header.TotalDifficulty, nodeId, header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase) sqw.stmts <- []byte(stmt) indexerMetrics.blocks.Inc(1) diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index d39f2a0e5..bbde207f1 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -55,6 +55,7 @@ type StateDiffIndexer struct { ctx context.Context chainConfig *params.ChainConfig dbWriter *Writer + blockNumber string } // NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer @@ -89,6 +90,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { start, t := time.Now(), time.Now() + sdi.blockNumber = block.Number().String() blockHash := block.Hash() blockHashStr := blockHash.String() height := block.NumberU64() @@ -201,7 +203,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles - err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes) + err = sdi.processUncles(blockTx, headerID, block.NumberU64(), uncleNodes) if err != nil { return nil, err } @@ -250,7 +252,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he CID: headerNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: header.ParentHash.String(), - BlockNumber: header.Number.String(), + BlockNumber: sdi.blockNumber, BlockHash: headerID, TotalDifficulty: td.String(), Reward: reward.String(), @@ -265,7 +267,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he } // processUncles publishes and indexes uncle IPLDs in Postgres -func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error { +func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { // publish and index uncles for _, uncleNode := range uncleNodes { tx.cacheIPLD(uncleNode) @@ -274,10 +276,10 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu if sdi.chainConfig.Clique != nil { uncleReward = big.NewInt(0) } else { - uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64()) + uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) } uncle := models.UncleModel{ - BlockNumber: blockNumber.String(), + BlockNumber: sdi.blockNumber, HeaderID: headerID, CID: uncleNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), @@ -333,7 +335,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs return fmt.Errorf("error deriving tx sender: %v", err) } txModel := models.TxModel{ - BlockNumber: args.blockNumber.String(), + BlockNumber: sdi.blockNumber, HeaderID: args.headerID, Dst: shared.HandleZeroAddrPointer(trx.To()), Src: shared.HandleZeroAddr(from), @@ -356,7 +358,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs storageKeys[k] = storageKey.Hex() } accessListElementModel := models.AccessListElementModel{ - BlockNumber: args.blockNumber.String(), + BlockNumber: sdi.blockNumber, TxID: txID, Index: int64(j), Address: accessListElement.Address.Hex(), @@ -380,7 +382,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } rctModel := &models.ReceiptModel{ - BlockNumber: args.blockNumber.String(), + BlockNumber: sdi.blockNumber, TxID: txID, Contract: contract, ContractHash: contractHash, @@ -411,7 +413,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } logDataSet[idx] = &models.LogsModel{ - BlockNumber: args.blockNumber.String(), + BlockNumber: sdi.blockNumber, ReceiptID: txID, Address: l.Address.String(), Index: int64(l.Index), @@ -683,3 +685,77 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { return nil } + +// This is a simple wrapper function which will run QueryRow on the DB +func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) { + var ret string + err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&ret) + if err != nil { + log.Error(fmt.Sprint("Can't properly query the DB for query: ", queryString)) + return "", err + } + return ret, nil +} + +// This function is a simple wrapper which will call QueryDb but the return value will be +// a big int instead of a string +func (sdi *StateDiffIndexer) QueryDbToBigInt(queryString string) (*big.Int, error) { + ret := new(big.Int) + res, err := sdi.QueryDb(queryString) + if err != nil { + return ret, err + } + ret, ok := ret.SetString(res, 10) + if !ok { + log.Error(fmt.Sprint("Can't turn the res ", res, "into a bigInt")) + return ret, fmt.Errorf("Can't turn %s into a bigInt", res) + } + return ret, nil +} + +// Users provide the latestBlockInDb and the latestBlockOnChain +// as well as the expected difference. This function does some simple math. +// The expected difference for the time being is going to be 1, but as we run +// More geth nodes, the expected difference might fluctuate. +func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDifference *big.Int) bool { + latestBlock := big.NewInt(0) + if latestBlock.Sub(latestBlockOnChain, expectedDifference).Cmp(latestBlockInDb) != 0 { + log.Warn("We found a gap", "latestBlockInDb", latestBlockInDb, "latestBlockOnChain", latestBlockOnChain, "expectedDifference", expectedDifference) + return true + } + return false + +} + +// This function will check for Gaps and update the DB if gaps are found. +// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling +// It might be a useful parameter to update depending on the geth node. +// TODO: +// REmove the return value +// Write to file if err in writing to DB +func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, fileIndexer interfaces.StateDiffIndexer) error { + dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids" + latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString) + if err != nil { + return err + } + + gapExists := isGap(latestBlockInDb, latestBlockOnChain, expectedDifference) + if gapExists { + startBlock := big.NewInt(0) + endBlock := big.NewInt(0) + startBlock.Add(latestBlockInDb, expectedDifference) + endBlock.Sub(latestBlockOnChain, expectedDifference) + + log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock) + err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey, fileIndexer) + if err != nil { + log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err) + // Write to file SQL file instead!!! + // If write to SQL file fails, write to disk. Handle this within the write to SQL file function! + return err + } + } + + return nil +} diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index 73f17c3f0..f0c93e2f8 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -934,3 +934,334 @@ func TestPGXWatchAddressMethods(t *testing.T) { } }) } + +func TestPGXWatchAddressMethods(t *testing.T) { + setupPGXIndexer(t) + defer tearDown(t) + defer checkTxClosure(t, 1, 0, 1) + + type res struct { + Address string `db:"address"` + CreatedAt uint64 `db:"created_at"` + WatchedAt uint64 `db:"watched_at"` + LastFilledAt uint64 `db:"last_filled_at"` + } + pgStr := "SELECT * FROM eth_meta.watched_addresses" + + t.Run("Load watched addresses (empty table)", func(t *testing.T) { + expectedData := []common.Address{} + + rows, err := ind.LoadWatchedAddresses() + require.NoError(t, err) + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Insert watched addresses", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1))) + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Insert watched addresses (some already watched)", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2))) + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Remove watched addresses", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.RemoveWatchedAddresses(args) + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{} + + err = ind.RemoveWatchedAddresses(args) + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Set watched addresses", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2))) + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Set watched addresses (some already watched)", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract4Address, + CreatedAt: contract4CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract4Address, + CreatedAt: contract4CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3))) + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Load watched addresses", func(t *testing.T) { + expectedData := []common.Address{ + common.HexToAddress(contract4Address), + common.HexToAddress(contract2Address), + common.HexToAddress(contract3Address), + } + + rows, err := ind.LoadWatchedAddresses() + require.NoError(t, err) + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Clear watched addresses", func(t *testing.T) { + expectedData := []res{} + + err = ind.ClearWatchedAddresses() + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Clear watched addresses (empty table)", func(t *testing.T) { + expectedData := []res{} + + err = ind.ClearWatchedAddresses() + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) +} diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 35a9cbc82..f84eefaa6 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -40,7 +40,7 @@ type DB struct { func (db *DB) InsertHeaderStm() string { return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) - ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)` + ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, ARRAY(SELECT DISTINCT unnest(array_cat(eth.header_cids.node_id, $6))), $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)` } // InsertUncleStm satisfies the sql.Statements interface @@ -107,3 +107,10 @@ func (db *DB) InsertKnownGapsStm() string { ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4) WHERE eth_meta.known_gaps.ending_block_number <= $2` } + +// InsertKnownGapsStm satisfies the sql.Statements interface +func (db *DB) InsertKnownGapsStm() string { + return `INSERT INTO eth_meta.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ($1, $2, $3, $4) + ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4) + WHERE eth_meta.known_gaps.ending_block_number <= $2` +} diff --git a/statediff/indexer/database/sql/sqlx_indexer_test.go b/statediff/indexer/database/sql/sqlx_indexer_test.go index 0db98797c..f235fa844 100644 --- a/statediff/indexer/database/sql/sqlx_indexer_test.go +++ b/statediff/indexer/database/sql/sqlx_indexer_test.go @@ -927,3 +927,334 @@ func TestSQLXWatchAddressMethods(t *testing.T) { } }) } + +func TestSQLXWatchAddressMethods(t *testing.T) { + setupSQLXIndexer(t) + defer tearDown(t) + defer checkTxClosure(t, 0, 0, 0) + + type res struct { + Address string `db:"address"` + CreatedAt uint64 `db:"created_at"` + WatchedAt uint64 `db:"watched_at"` + LastFilledAt uint64 `db:"last_filled_at"` + } + pgStr := "SELECT * FROM eth_meta.watched_addresses" + + t.Run("Load watched addresses (empty table)", func(t *testing.T) { + expectedData := []common.Address{} + + rows, err := ind.LoadWatchedAddresses() + require.NoError(t, err) + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Insert watched addresses", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1))) + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Insert watched addresses (some already watched)", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2))) + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Remove watched addresses", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.RemoveWatchedAddresses(args) + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{} + + err = ind.RemoveWatchedAddresses(args) + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Set watched addresses", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2))) + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Set watched addresses (some already watched)", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract4Address, + CreatedAt: contract4CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract4Address, + CreatedAt: contract4CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + } + + err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3))) + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Load watched addresses", func(t *testing.T) { + expectedData := []common.Address{ + common.HexToAddress(contract4Address), + common.HexToAddress(contract2Address), + common.HexToAddress(contract3Address), + } + + rows, err := ind.LoadWatchedAddresses() + require.NoError(t, err) + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Clear watched addresses", func(t *testing.T) { + expectedData := []res{} + + err = ind.ClearWatchedAddresses() + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Clear watched addresses (empty table)", func(t *testing.T) { + expectedData := []res{} + + err = ind.ClearWatchedAddresses() + require.NoError(t, err) + + rows := []res{} + err = db.Select(context.Background(), &rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) +} diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index c6c378556..bcec19ad5 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -21,6 +21,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/statediff/indexer/models" + "github.com/lib/pq" ) var ( @@ -47,11 +48,12 @@ func (w *Writer) Close() error { /* INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) -ON CONFLICT (block_hash, block_number) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16) +ON CONFLICT (block_hash, block_number) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($1, $3, $4, $5, ARRAY(SELECT DISTINCT unnest(array_cat(eth.header_cids.node_id, $6))), $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16) */ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { + nodeIds := []string{w.db.NodeID()} _, err := tx.Exec(w.db.Context(), w.db.InsertHeaderStm(), - header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, w.db.NodeID(), + header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, pq.Array(nodeIds), header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase) if err != nil { diff --git a/statediff/indexer/models/models.go b/statediff/indexer/models/models.go index cbf17d977..b4386fdf8 100644 --- a/statediff/indexer/models/models.go +++ b/statediff/indexer/models/models.go @@ -165,3 +165,11 @@ type KnownGapsModel struct { CheckedOut bool `db:"checked_out"` ProcessingKey int64 `db:"processing_key"` } + +// KnownGaps is the data structure for eth_meta.known_gaps +type KnownGapsModel struct { + StartingBlockNumber string `db:"starting_block_number"` + EndingBlockNumber string `db:"ending_block_number"` + CheckedOut bool `db:"checked_out"` + ProcessingKey int64 `db:"processing_key"` +}