[WIP] Add a service to fill indexing gap for watched addresses #135

Closed
prathamesh0 wants to merge 16 commits from pm-watched-addresses into master
Showing only changes of commit 48b582f0ac - Show all commits

View File

@ -5,6 +5,7 @@ import (
"math/big"
"os"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
@ -17,6 +18,13 @@ import (
integration "github.com/vulcanize/ipld-eth-server/test"
)
var (
gethMethod = "statediff_watchAddress"
ipldMethod = "vdb_watchAddress"
sleepInterval = 2 * time.Second
)
var _ = Describe("WatchAddressIntegration", func() {
dbWrite, err := strconv.ParseBool(os.Getenv("DB_WRITE"))
Expect(err).To(BeNil())
@ -31,17 +39,19 @@ var _ = Describe("WatchAddressIntegration", func() {
ipldRPCClient, err := rpc.Dial(ipldEthHttpPath)
Expect(err).ToNot(HaveOccurred())
ctx := context.Background()
var (
contractErr error
ctx = context.Background()
gethMethod = "statediff_watchAddress"
ipldMethod = "vdb_watchAddress"
contractErr error
contract1 *integration.ContractDeployed
contract2 *integration.ContractDeployed
contract3 *integration.ContractDeployed
countAIndex string
countBIndex string
countAStorageHash common.Hash
countBStorageHash common.Hash
)
BeforeEach(func() {
@ -50,7 +60,7 @@ var _ = Describe("WatchAddressIntegration", func() {
}
})
It("WatchAddress test init", func() {
It("test init", func() {
// Deploy three contracts
contract1, contractErr = integration.DeploySLVContract()
Expect(contractErr).ToNot(HaveOccurred())
@ -60,6 +70,23 @@ var _ = Describe("WatchAddressIntegration", func() {
contract3, contractErr = integration.DeploySLVContract()
Expect(contractErr).ToNot(HaveOccurred())
// Get storage slot keys
storageSlotAKey, err := integration.GetStorageSlotKey("SLVToken", "countA")
Expect(err).ToNot(HaveOccurred())
countAIndex = storageSlotAKey.Key
countAStorageHash = crypto.Keccak256Hash(common.HexToHash(countAIndex).Bytes())
storageSlotBKey, err := integration.GetStorageSlotKey("SLVToken", "countB")
Expect(err).ToNot(HaveOccurred())
countBIndex = storageSlotBKey.Key
countBStorageHash = crypto.Keccak256Hash(common.HexToHash(countBIndex).Bytes())
})
defer It("test cleanup", func() {
// Clear out watched addresses | storage slots
err := clearWatchedAddresses(gethRPCClient)
Expect(err).ToNot(HaveOccurred())
})
Describe("watched addresses", func() {
@ -75,7 +102,11 @@ var _ = Describe("WatchAddressIntegration", func() {
actualBalance3 *big.Int
)
It("gets initial balances", func() {
It("watched addresses test init", func() {
// Clear out watched addresses | storage slots
err := clearWatchedAddresses(gethRPCClient)
Expect(err).ToNot(HaveOccurred())
// Get initial balances for all the contracts
// Contract 1
actualBalance1 = big.NewInt(0)
@ -101,9 +132,11 @@ var _ = Describe("WatchAddressIntegration", func() {
Context("no contracts being watched", func() {
It("indexes state for all the contracts", func() {
// WatchedAddresses = []
// Send eth to all three contract accounts
// Contract 1
_, txErr = integration.SendEth(contract1.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance1.Add(actualBalance1, big.NewInt(10000000000000000))
@ -114,6 +147,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2
_, txErr = integration.SendEth(contract2.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance2.Add(actualBalance2, big.NewInt(10000000000000000))
@ -124,6 +158,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 3
_, txErr = integration.SendEth(contract3.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance3.Add(actualBalance3, big.NewInt(10000000000000000))
@ -146,9 +181,11 @@ var _ = Describe("WatchAddressIntegration", func() {
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// WatchedAddresses = [Contract1]
// Send eth to all three contract accounts
// Contract 1
_, txErr = integration.SendEth(contract1.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance1.Add(actualBalance1, big.NewInt(10000000000000000))
@ -159,6 +196,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2
_, txErr = integration.SendEth(contract2.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance2.Add(actualBalance2, big.NewInt(10000000000000000))
@ -168,6 +206,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 3
_, txErr = integration.SendEth(contract3.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance3.Add(actualBalance3, big.NewInt(10000000000000000))
@ -189,9 +228,11 @@ var _ = Describe("WatchAddressIntegration", func() {
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// WatchedAddresses = [Contract1, Contract2]
// Send eth to all three contract accounts
// Contract 1
_, txErr = integration.SendEth(contract1.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance1.Add(actualBalance1, big.NewInt(10000000000000000))
@ -202,6 +243,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2
_, txErr = integration.SendEth(contract2.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance2.Add(actualBalance2, big.NewInt(10000000000000000))
@ -212,6 +254,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 3
_, txErr = integration.SendEth(contract3.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance3.Add(actualBalance3, big.NewInt(10000000000000000))
@ -233,9 +276,11 @@ var _ = Describe("WatchAddressIntegration", func() {
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// WatchedAddresses = [Contract2]
// Send eth to all three contract accounts
// Contract 1
_, txErr = integration.SendEth(contract1.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance1.Add(actualBalance1, big.NewInt(10000000000000000))
@ -245,6 +290,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2
_, txErr = integration.SendEth(contract2.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance2.Add(actualBalance2, big.NewInt(10000000000000000))
@ -255,6 +301,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 3
_, txErr = integration.SendEth(contract3.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance3.Add(actualBalance3, big.NewInt(10000000000000000))
@ -264,7 +311,7 @@ var _ = Describe("WatchAddressIntegration", func() {
})
})
Context("set the list of watched addresses", func() {
Context("list of watched addresses set", func() {
It("indexes state only for the watched contracts", func() {
operation := statediff.SetAddresses
args := []sdtypes.WatchAddressArg{
@ -280,9 +327,11 @@ var _ = Describe("WatchAddressIntegration", func() {
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// WatchedAddresses = [Contract1, Contract3]
// Send eth to all three contract accounts
// Contract 1
_, txErr = integration.SendEth(contract1.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance1.Add(actualBalance1, big.NewInt(10000000000000000))
@ -293,6 +342,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2
_, txErr = integration.SendEth(contract2.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance2.Add(actualBalance2, big.NewInt(10000000000000000))
@ -302,6 +352,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 3
_, txErr = integration.SendEth(contract3.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance3.Add(actualBalance3, big.NewInt(10000000000000000))
@ -312,16 +363,18 @@ var _ = Describe("WatchAddressIntegration", func() {
})
})
Context("clear the list of watched addresses", func() {
Context("list of watched addresses cleared", func() {
It("indexes state for all the contracts", func() {
operation := statediff.ClearAddresses
args := []sdtypes.WatchAddressArg{}
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// WatchedAddresses = []
// Send eth to all three contract accounts
// Contract 1
_, txErr = integration.SendEth(contract1.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance1.Add(actualBalance1, big.NewInt(10000000000000000))
@ -332,6 +385,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2
_, txErr = integration.SendEth(contract2.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance2.Add(actualBalance2, big.NewInt(10000000000000000))
@ -342,6 +396,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 3
_, txErr = integration.SendEth(contract3.Address, "0.01")
time.Sleep(sleepInterval)
Expect(txErr).ToNot(HaveOccurred())
actualBalance3.Add(actualBalance3, big.NewInt(10000000000000000))
@ -354,20 +409,9 @@ var _ = Describe("WatchAddressIntegration", func() {
})
Describe("watched storage slots", func() {
storageSlotAKey, err := integration.GetStorageSlotKey("SLVToken", "countA")
Expect(err).ToNot(HaveOccurred())
countAIndex := storageSlotAKey.Key
storageSlotBKey, err := integration.GetStorageSlotKey("SLVToken", "countB")
Expect(err).ToNot(HaveOccurred())
countBIndex := storageSlotBKey.Key
var (
incErr error
countAStorageHash = crypto.Keccak256Hash(common.HexToHash(countAIndex).Bytes())
countBStorageHash = crypto.Keccak256Hash(common.HexToHash(countBIndex).Bytes())
prevCountA1 *big.Int
prevCountB1 *big.Int
prevCountA2 *big.Int
@ -379,7 +423,11 @@ var _ = Describe("WatchAddressIntegration", func() {
actualCountB2 *big.Int
)
It("gets initial storage values", func() {
It("watched addresses test init", func() {
// Clear out watched addresses | storage slots
err := clearWatchedAddresses(gethRPCClient)
Expect(err).ToNot(HaveOccurred())
// Get initial storage values for the contracts
// Contract 1, countA
actualCountA1 = big.NewInt(0)
@ -415,10 +463,13 @@ var _ = Describe("WatchAddressIntegration", func() {
})
Context("no addresses or storage slots being watched", func() {
It("indexes state for all the slots", func() {
It("indexes state for all the storage slots", func() {
// WatchedAddresses = []
// WatchedStorageSlots = []
// Increment counts
// Contract 1, countA
incErr = integration.IncrementCountA(contract1.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountA1.Add(actualCountA1, big.NewInt(1))
@ -430,6 +481,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 1, countB
incErr = integration.IncrementCountB(contract1.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountB1.Add(actualCountB1, big.NewInt(1))
@ -441,6 +493,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2, countA
incErr = integration.IncrementCountA(contract2.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountA2.Add(actualCountA2, big.NewInt(1))
@ -452,6 +505,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2, countB
incErr = integration.IncrementCountB(contract2.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountB2.Add(actualCountB2, big.NewInt(1))
@ -464,7 +518,7 @@ var _ = Describe("WatchAddressIntegration", func() {
})
Context("one storage slot being watched", func() {
It("indexes only for the watched storage slot", func() {
It("indexes state only for the watched storage slot", func() {
operation := statediff.AddStorageSlots
args := []sdtypes.WatchAddressArg{
{
@ -475,9 +529,12 @@ var _ = Describe("WatchAddressIntegration", func() {
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// WatchedAddresses = []
// WatchedStorageSlots = [countA]
// Increment counts
// Contract 1, countA
incErr = integration.IncrementCountA(contract1.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountA1.Add(actualCountA1, big.NewInt(1))
@ -489,6 +546,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 1, countB
incErr = integration.IncrementCountB(contract1.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountB1.Add(actualCountB1, big.NewInt(1))
@ -499,6 +557,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2, countA
incErr = integration.IncrementCountA(contract2.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountA2.Add(actualCountA2, big.NewInt(1))
@ -510,6 +569,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2, countB
incErr = integration.IncrementCountB(contract2.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountB2.Add(actualCountB2, big.NewInt(1))
@ -520,7 +580,7 @@ var _ = Describe("WatchAddressIntegration", func() {
})
})
Context("set the list of watched storage slots", func() {
Context("list of watched storage slots set", func() {
It("indexes state only for the watched storage slots", func() {
operation := statediff.SetStorageSlots
args := []sdtypes.WatchAddressArg{
@ -536,9 +596,12 @@ var _ = Describe("WatchAddressIntegration", func() {
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// WatchedAddresses = []
// WatchedStorageSlots = [countA, countB]
// Increment counts
// Contract 1, countA
incErr = integration.IncrementCountA(contract1.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountA1.Add(actualCountA1, big.NewInt(1))
@ -550,6 +613,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 1, countB
incErr = integration.IncrementCountB(contract1.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountB1.Add(actualCountB1, big.NewInt(1))
@ -561,6 +625,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2, countA
incErr = integration.IncrementCountA(contract2.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountA2.Add(actualCountA2, big.NewInt(1))
@ -572,6 +637,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2, countB
incErr = integration.IncrementCountB(contract2.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountB2.Add(actualCountB2, big.NewInt(1))
@ -595,9 +661,12 @@ var _ = Describe("WatchAddressIntegration", func() {
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// WatchedAddresses = []
// WatchedStorageSlots = [countB]
// Increment counts
// Contract 1, countA
incErr = integration.IncrementCountA(contract1.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountA1.Add(actualCountA1, big.NewInt(1))
@ -608,6 +677,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 1, countB
incErr = integration.IncrementCountB(contract1.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountB1.Add(actualCountB1, big.NewInt(1))
@ -619,6 +689,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2, countA
incErr = integration.IncrementCountA(contract2.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountA2.Add(actualCountA2, big.NewInt(1))
@ -629,6 +700,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2, countB
incErr = integration.IncrementCountB(contract2.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountB2.Add(actualCountB2, big.NewInt(1))
@ -652,9 +724,12 @@ var _ = Describe("WatchAddressIntegration", func() {
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// WatchedAddresses = [Contract1]
// WatchedStorageSlots = [countB]
// Increment counts
// Contract 1, countA
incErr = integration.IncrementCountA(contract1.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountA1.Add(actualCountA1, big.NewInt(1))
@ -665,6 +740,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 1, countB
incErr = integration.IncrementCountB(contract1.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountB1.Add(actualCountB1, big.NewInt(1))
@ -676,6 +752,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2, countA
incErr = integration.IncrementCountA(contract2.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountA2.Add(actualCountA2, big.NewInt(1))
@ -686,6 +763,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2, countB
incErr = integration.IncrementCountB(contract2.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountB2.Add(actualCountB2, big.NewInt(1))
@ -696,16 +774,19 @@ var _ = Describe("WatchAddressIntegration", func() {
})
})
Context("clear the list of watched storage slots", func() {
Context("list of watched storage slots cleared", func() {
It("indexes state for all the storage slots of watched contracts", func() {
operation := statediff.ClearStorageSlots
args := []sdtypes.WatchAddressArg{}
ipldErr := ipldRPCClient.Call(nil, ipldMethod, operation, args)
Expect(ipldErr).ToNot(HaveOccurred())
// WatchedAddresses = [Contract1]
// WatchedStorageSlots = []
// Increment counts
// Contract 1, countA
incErr = integration.IncrementCountA(contract1.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountA1.Add(actualCountA1, big.NewInt(1))
@ -717,6 +798,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 1, countB
incErr = integration.IncrementCountB(contract1.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountB1.Add(actualCountB1, big.NewInt(1))
@ -728,6 +810,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2, countA
incErr = integration.IncrementCountA(contract2.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountA2.Add(actualCountA2, big.NewInt(1))
@ -738,6 +821,7 @@ var _ = Describe("WatchAddressIntegration", func() {
// Contract 2, countB
incErr = integration.IncrementCountB(contract2.Address)
time.Sleep(sleepInterval)
Expect(incErr).ToNot(HaveOccurred())
actualCountB2.Add(actualCountB2, big.NewInt(1))
@ -777,3 +861,21 @@ var _ = Describe("WatchAddressIntegration", func() {
})
})
})
func clearWatchedAddresses(gethRPCClient *rpc.Client) error {
args := []sdtypes.WatchAddressArg{}
// Clear watched addresses
gethErr := gethRPCClient.Call(nil, gethMethod, statediff.ClearAddresses, args)
if gethErr != nil {
return gethErr
}
// Clear watched storage slots
gethErr = gethRPCClient.Call(nil, gethMethod, statediff.ClearStorageSlots, args)
if gethErr != nil {
return gethErr
}
return nil
}