Add ERC20 token watcher example

- starting with the totalSupply function
- sets contract config on transformer by passing it into the transformer
initializer
- handles block records with the same number for different nodes for both creating token_supply records, and finding missing blocks
This commit is contained in:
Elizabeth Engelman 2018-05-05 15:25:54 -05:00
parent 7533e6d476
commit 7ee253b2a3
21 changed files with 1357 additions and 1 deletions

2
Gopkg.lock generated
View File

@ -363,6 +363,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "8e609f758d10041b1746db11078eca9cad29a7dbf9939d14e49d70ee172dfd2e"
inputs-digest = "a69d27ddc33f08d7b39242508f04e5c339b52cc65e70596100d17a870b2183a6"
solver-name = "gps-cdcl"
solver-version = 1

70
cmd/erc20.go Normal file
View File

@ -0,0 +1,70 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/examples/erc20_watcher/every_block"
"github.com/vulcanize/vulcanizedb/libraries/shared"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"log"
"time"
)
// erc20Cmd represents the erc20 command
var erc20Cmd = &cobra.Command{
Use: "erc20",
Short: "Fetches and persists token supply",
Long: `Fetches the totalSupply for the configured token from each block and persists it in Vulcanize DB.
vulcanizedb erc20 --config environments/public
Expects an ethereum node to be running and requires a .toml config file:
[database]
name = "vulcanize_public"
hostname = "localhost"
port = 5432
[client]
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
`,
Run: func(cmd *cobra.Command, args []string) {
watchERC20s()
},
}
func watchERC20s() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
blockchain := geth.NewBlockchain(ipc)
db, err := postgres.NewDB(databaseConfig, blockchain.Node())
if err != nil {
log.Fatal("Failed to initialize database.")
}
watcher := shared.Watcher{
DB: *db,
Blockchain: blockchain,
}
watcher.AddTransformers(every_block.TransformerInitializers())
for range ticker.C {
watcher.Execute()
}
}
func init() {
rootCmd.AddCommand(erc20Cmd)
}

View File

@ -0,0 +1 @@
DROP TABLE token_supply;

View File

@ -0,0 +1,9 @@
CREATE TABLE token_supply (
id SERIAL,
block_id INTEGER NOT NULL,
supply DECIMAL NOT NULL,
token_address CHARACTER VARYING(66) NOT NULL,
CONSTRAINT blocks_fk FOREIGN KEY (block_id)
REFERENCES blocks (id)
ON DELETE CASCADE
)

View File

@ -247,6 +247,38 @@ CREATE TABLE public.schema_migrations (
);
--
-- Name: token_supply; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public.token_supply (
id integer NOT NULL,
block_id integer NOT NULL,
supply numeric NOT NULL,
token_address character varying(66) NOT NULL
);
--
-- Name: token_supply_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE public.token_supply_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: token_supply_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE public.token_supply_id_seq OWNED BY public.token_supply.id;
--
-- Name: transactions; Type: TABLE; Schema: public; Owner: -
--
@ -374,6 +406,13 @@ ALTER TABLE ONLY public.logs ALTER COLUMN id SET DEFAULT nextval('public.logs_id
ALTER TABLE ONLY public.receipts ALTER COLUMN id SET DEFAULT nextval('public.receipts_id_seq'::regclass);
--
-- Name: token_supply id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.token_supply ALTER COLUMN id SET DEFAULT nextval('public.token_supply_id_seq'::regclass);
--
-- Name: transactions id; Type: DEFAULT; Schema: public; Owner: -
--
@ -527,6 +566,14 @@ ALTER TABLE ONLY public.receipts
ADD CONSTRAINT blocks_fk FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE;
--
-- Name: token_supply blocks_fk; Type: FK CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.token_supply
ADD CONSTRAINT blocks_fk FOREIGN KEY (block_id) REFERENCES public.blocks(id) ON DELETE CASCADE;
--
-- Name: blocks node_fk; Type: FK CONSTRAINT; Schema: public; Owner: -
--

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,19 @@
# ERC20 Transformers
## Description
The Transformers in this directory are associated with contract functions and events that conform to the [ERC20 Token interface](https://theethereum.wiki/w/index.php/ERC20_Token_Standard#The_ERC20_Token_Standard_Interface).
See `libraries/shared/TransformerREADME.md` for further information regarding the Transformer interface.
## Configuration
In addition to environment configuration mentioned in the main VulcanizeDB README, the ERC20 transformers also need to be configured with contract information for the desired token(s) to be watched. This configuration file is located at `./vulcanizedb/examples/erc20_watcher/config.go`.
## ERC20 Functions
The `everyblock` directory contains transformers that fetch data from the contract itself, via one of the standard functions.
Currently, the `totalSupply` function transformer has been implemented. This transformer will fetch the total supply for the given contract address and persist `total_supply` records in the database.
## Running the transformers
1. If running against a local node, make sure that the node has been started.
1. In a separate terminal run the following command:
`./vulcanizedb erc20 --config <config.toml>`

View File

@ -0,0 +1,33 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package erc20_watcher
import "github.com/vulcanize/vulcanizedb/examples/constants"
type ContractConfig struct {
Address string
Abi string
FirstBlock int64
LastBlock int64
Name string
}
var DaiConfig = ContractConfig{
Address: constants.DaiContractAddress,
Abi: constants.DaiAbiString,
FirstBlock: int64(4752008),
LastBlock: -1,
Name: "Dai",
}

View File

@ -0,0 +1,33 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package every_block_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"io/ioutil"
"log"
)
func TestEveryBlock(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "EveryBlock Suite")
}
var _ = BeforeSuite(func() {
log.SetOutput(ioutil.Discard)
})

View File

@ -0,0 +1,70 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package every_block
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/core"
"log"
"math/big"
)
type ERC20FetcherInterface interface {
FetchSupplyOf(contractAbi string, contractAddress string, blockNumber int64) (big.Int, error)
GetBlockchain() core.Blockchain
}
func NewFetcher(blockchain core.Blockchain) Fetcher {
return Fetcher{
Blockchain: blockchain,
}
}
type Fetcher struct {
Blockchain core.Blockchain
ContractAbi string
ContractAddress string
}
type fetcherError struct {
err string
fetchMethod string
}
func (fe *fetcherError) Error() string {
return fmt.Sprintf("Error fetching %s: %s", fe.fetchMethod, fe.err)
}
func newFetcherError(err error, fetchMethod string) *fetcherError {
e := fetcherError{err.Error(), fetchMethod}
log.Println(e.Error())
return &e
}
func (f Fetcher) FetchSupplyOf(contractAbi string, contractAddress string, blockNumber int64) (big.Int, error) {
method := "totalSupply"
var result = new(big.Int)
err := f.Blockchain.FetchContractData(contractAbi, contractAddress, method, nil, &result, blockNumber)
if err != nil {
return *result, newFetcherError(err, method)
}
return *result, nil
}
func (f Fetcher) GetBlockchain() core.Blockchain {
return f.Blockchain
}

View File

@ -0,0 +1,75 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package every_block_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/examples/constants"
"github.com/vulcanize/vulcanizedb/examples/erc20_watcher/every_block"
"github.com/vulcanize/vulcanizedb/examples/mocks"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"math/big"
)
var _ = Describe("ERC20 Fetcher", func() {
blockNumber := int64(5502914)
infuraIPC := "https://mainnet.infura.io/J5Vd2fRtGsw0zZ0Ov3BL"
realBlockchain := geth.NewBlockchain(infuraIPC)
realFetcher := every_block.NewFetcher(realBlockchain)
fakeBlockchain := &mocks.Blockchain{}
testFetcher := every_block.NewFetcher(fakeBlockchain)
testAbi := "testAbi"
testContractAddress := "testContractAddress"
errorBlockchain := &mocks.FailureBlockchain{}
errorFetcher := every_block.NewFetcher(errorBlockchain)
Describe("FetchSupplyOf", func() {
It("fetches data from the blockchain with the correct arguments", func() {
_, err := testFetcher.FetchSupplyOf(testAbi, testContractAddress, blockNumber)
Expect(err).NotTo(HaveOccurred())
Expect(fakeBlockchain.FetchedAbi).To(Equal(testAbi))
Expect(fakeBlockchain.FetchedContractAddress).To(Equal(testContractAddress))
Expect(fakeBlockchain.FetchedMethod).To(Equal("totalSupply"))
Expect(fakeBlockchain.FetchedMethodArg).To(BeNil())
expectedResult := big.Int{}
expected := &expectedResult
Expect(fakeBlockchain.FetchedResult).To(Equal(&expected))
Expect(fakeBlockchain.FetchedBlockNumber).To(Equal(blockNumber))
})
It("fetches a token's total supply at the given block height", func() {
result, err := realFetcher.FetchSupplyOf(constants.DaiAbiString, constants.DaiContractAddress, blockNumber)
Expect(err).NotTo(HaveOccurred())
expectedResult := big.Int{}
expectedResult.SetString("27647235749155415536952630", 10)
Expect(result).To(Equal(expectedResult))
})
It("returns an error if the call to the blockchain fails", func() {
result, err := errorFetcher.FetchSupplyOf("", "", 0)
Expect(result.String()).To(Equal("0"))
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("totalSupply"))
Expect(err.Error()).To(ContainSubstring(mocks.TestError.Error()))
})
})
})

View File

@ -0,0 +1,77 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package every_block_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/examples/constants"
"github.com/vulcanize/vulcanizedb/examples/erc20_watcher"
"github.com/vulcanize/vulcanizedb/examples/erc20_watcher/every_block"
"github.com/vulcanize/vulcanizedb/examples/mocks"
"github.com/vulcanize/vulcanizedb/examples/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"math/big"
"strconv"
)
func setLastBlockOnChain(blockchain *mocks.Blockchain, blockNumber int64) {
blockNumberString := strconv.FormatInt(blockNumber, 10)
lastBlockOnChain := big.Int{}
lastBlockOnChain.SetString(blockNumberString, 10)
blockchain.SetLastBlock(&lastBlockOnChain)
}
var _ = Describe("Everyblock transformers", func() {
var db *postgres.DB
var blockchain mocks.Blockchain
var blockNumber int64
var blockId int64
var err error
BeforeEach(func() {
blockNumber = erc20_watcher.DaiConfig.FirstBlock
lastBlockNumber := blockNumber + 1
db = test_helpers.CreateNewDatabase()
setLastBlockOnChain(&blockchain, lastBlockNumber)
blockRepository := repositories.NewBlockRepository(db)
blockId, err = blockRepository.CreateOrUpdateBlock(core.Block{Number: blockNumber})
Expect(err).NotTo(HaveOccurred())
_, err = blockRepository.CreateOrUpdateBlock(core.Block{Number: lastBlockNumber})
Expect(err).NotTo(HaveOccurred())
})
It("creates a token_supply record for each block in the given range", func() {
initializer := every_block.TokenSupplyTransformerInitializer{erc20_watcher.DaiConfig}
transformer := initializer.NewTokenSupplyTransformer(db, &blockchain)
transformer.Execute()
var tokenSupplyCount int
err := db.QueryRow(`SELECT COUNT(*) FROM token_supply where block_id = $1`, blockId).Scan(&tokenSupplyCount)
Expect(err).ToNot(HaveOccurred())
Expect(tokenSupplyCount).To(Equal(1))
var tokenSupply test_helpers.TokenSupplyDBRow
err = db.Get(&tokenSupply, `SELECT * from token_supply where block_id = $1`, blockId)
Expect(err).ToNot(HaveOccurred())
Expect(tokenSupply.BlockID).To(Equal(blockId))
Expect(tokenSupply.TokenAddress).To(Equal(constants.DaiContractAddress))
Expect(tokenSupply.Supply).To(Equal(int64(0)))
})
})

View File

@ -0,0 +1,21 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package every_block
type TokenSupply struct {
Value string
TokenAddress string
BlockNumber int64
}

View File

@ -0,0 +1,91 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package every_block
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"log"
)
type ERC20RepositoryInterface interface {
Create(supply TokenSupply) error
MissingBlocks(startingBlock int64, highestBlock int64) ([]int64, error)
}
type TokenSupplyRepository struct {
*postgres.DB
}
type repositoryError struct {
err string
msg string
blockNumber int64
}
func (re *repositoryError) Error() string {
return fmt.Sprintf(re.msg, re.blockNumber, re.err)
}
func newRepositoryError(err error, msg string, blockNumber int64) error {
e := repositoryError{err.Error(), msg, blockNumber}
log.Println(e.Error())
return &e
}
const (
GetBlockError = "Error fetching block number %d: %s"
InsertTokenSupplyError = "Error inserting token_supply for block number %d: %s"
MissingBlockError = "Error finding missing token_supply records starting at block %d: %s"
)
func (tsp *TokenSupplyRepository) Create(supply TokenSupply) error {
var blockId int
err := tsp.DB.Get(&blockId, `SELECT id FROM blocks WHERE number = $1 AND eth_node_id = $2`, supply.BlockNumber, tsp.NodeID)
if err != nil {
return newRepositoryError(err, GetBlockError, supply.BlockNumber)
}
_, err = tsp.DB.Exec(
`INSERT INTO token_supply (supply, token_address, block_id)
VALUES($1, $2, $3)`,
supply.Value, supply.TokenAddress, blockId)
if err != nil {
return newRepositoryError(err, InsertTokenSupplyError, supply.BlockNumber)
}
return nil
}
func (tsp *TokenSupplyRepository) MissingBlocks(startingBlock int64, highestBlock int64) ([]int64, error) {
blockNumbers := make([]int64, 0)
err := tsp.DB.Select(
&blockNumbers,
`SELECT number FROM BLOCKS
LEFT JOIN token_supply ON blocks.id = block_id
WHERE block_id ISNULL
AND eth_node_id = $1
AND number >= $2
AND number <= $3
LIMIT 20`,
tsp.NodeID,
startingBlock,
highestBlock,
)
if err != nil {
return []int64{}, newRepositoryError(err, MissingBlockError, startingBlock)
}
return blockNumbers, err
}

View File

@ -0,0 +1,207 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package every_block_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/examples/erc20_watcher/every_block"
"github.com/vulcanize/vulcanizedb/examples/test_helpers"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/test_config"
"math/rand"
)
var _ = Describe("ERC20 Token Repository", func() {
var db *postgres.DB
var blockId int64
var blockNumber int64
var repository every_block.TokenSupplyRepository
var blockRepository repositories.BlockRepository
testAddress := "abc"
BeforeEach(func() {
db = test_helpers.CreateNewDatabase()
repository = every_block.TokenSupplyRepository{DB: db}
_, err := db.Query(`DELETE FROM token_supply`)
Expect(err).NotTo(HaveOccurred())
blockRepository = *repositories.NewBlockRepository(db)
blockNumber = rand.Int63()
blockId = test_helpers.CreateBlock(blockNumber, blockRepository)
})
Describe("Create", func() {
It("creates a token supply record", func() {
supply := supplyModel(blockNumber, testAddress, "100")
err := repository.Create(supply)
Expect(err).NotTo(HaveOccurred())
dbResult := test_helpers.TokenSupplyDBRow{}
expectedTokenSupply := test_helpers.TokenSupplyDBRow{
Supply: int64(100),
BlockID: blockId,
TokenAddress: testAddress,
}
var count int
err = repository.DB.QueryRowx(`SELECT count(*) FROM token_supply`).Scan(&count)
Expect(err).NotTo(HaveOccurred())
Expect(count).To(Equal(1))
err = repository.DB.QueryRowx(`SELECT * FROM token_supply`).StructScan(&dbResult)
Expect(err).NotTo(HaveOccurred())
Expect(dbResult.Supply).To(Equal(expectedTokenSupply.Supply))
Expect(dbResult.BlockID).To(Equal(expectedTokenSupply.BlockID))
Expect(dbResult.TokenAddress).To(Equal(expectedTokenSupply.TokenAddress))
})
It("returns an error if fetching the block's id from the database fails", func() {
errorSupply := supplyModel(-1, "", "")
err := repository.Create(errorSupply)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql"))
Expect(err.Error()).To(ContainSubstring("block number -1"))
})
It("returns an error if inserting the token_supply fails", func() {
errorSupply := supplyModel(blockNumber, "", "")
err := repository.Create(errorSupply)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("pq"))
Expect(err.Error()).To(ContainSubstring("token_supply for block number"))
})
})
Describe("When there are multiple nodes", func() {
var node2DB *postgres.DB
var node2BlockRepo *repositories.BlockRepository
var node2BlockId int64
var node2TokenSupplyRepo every_block.TokenSupplyRepository
var tokenSupply every_block.TokenSupply
BeforeEach(func() {
node2DB = createDbForAnotherNode()
//create another block with the same number on node2
node2BlockRepo = repositories.NewBlockRepository(node2DB)
node2BlockId = test_helpers.CreateBlock(blockNumber, *node2BlockRepo)
tokenSupply = supplyModel(blockNumber, "abc", "100")
node2TokenSupplyRepo = every_block.TokenSupplyRepository{DB: node2DB}
})
It("only creates token_supply records for the current node (node2)", func() {
err := node2TokenSupplyRepo.Create(tokenSupply)
Expect(err).NotTo(HaveOccurred())
var tokenSupplies []test_helpers.TokenSupplyDBRow
err = node2TokenSupplyRepo.DB.Select(&tokenSupplies, `SELECT * FROM token_supply`)
Expect(err).NotTo(HaveOccurred())
Expect(len(tokenSupplies)).To(Equal(1))
Expect(tokenSupplies[0].BlockID).To(Equal(node2BlockId))
})
It("only includes missing block numbers for the current node", func() {
//create token_supply on original node
err := repository.Create(tokenSupply)
Expect(err).NotTo(HaveOccurred())
originalNodeMissingBlocks, err := repository.MissingBlocks(blockNumber, blockNumber)
Expect(err).NotTo(HaveOccurred())
Expect(len(originalNodeMissingBlocks)).To(Equal(0))
node2MissingBlocks, err := node2TokenSupplyRepo.MissingBlocks(blockNumber, blockNumber)
Expect(err).NotTo(HaveOccurred())
Expect(len(node2MissingBlocks)).To(Equal(1))
})
})
Describe("MissingBlocks", func() {
It("returns the block numbers for which an associated TokenSupply record hasn't been created", func() {
createTokenSupplyFor(repository, blockNumber)
newBlockNumber := blockNumber + 1
test_helpers.CreateBlock(newBlockNumber, blockRepository)
blocks, err := repository.MissingBlocks(blockNumber, newBlockNumber)
Expect(blocks).To(ConsistOf(newBlockNumber))
Expect(err).NotTo(HaveOccurred())
})
It("only returns blocks within the given range", func() {
newBlockNumber := blockNumber + 1
test_helpers.CreateBlock(newBlockNumber, blockRepository)
blocks, err := repository.MissingBlocks(blockNumber, blockNumber)
Expect(blocks).NotTo(ConsistOf(newBlockNumber))
Expect(err).NotTo(HaveOccurred())
})
It("does not return numbers that already have an associated TokenSupply record", func() {
createTokenSupplyFor(repository, blockNumber)
blocks, err := repository.MissingBlocks(blockNumber, blockNumber)
Expect(blocks).To(BeEmpty())
Expect(err).NotTo(HaveOccurred())
})
})
It("deletes the token supply record when the associated block is deleted", func() {
err := repository.Create(every_block.TokenSupply{BlockNumber: blockNumber, Value: "0"})
Expect(err).NotTo(HaveOccurred())
var count int
err = repository.DB.QueryRowx(`SELECT count(*) FROM token_supply`).Scan(&count)
Expect(err).NotTo(HaveOccurred())
Expect(count).To(Equal(1))
_, err = db.Query(`DELETE FROM blocks`)
Expect(err).NotTo(HaveOccurred())
err = repository.DB.QueryRowx(`SELECT count(*) FROM token_supply`).Scan(&count)
Expect(err).NotTo(HaveOccurred())
Expect(count).To(Equal(0))
})
})
func supplyModel(blockNumber int64, tokenAddress string, supplyValue string) every_block.TokenSupply {
return every_block.TokenSupply{
Value: supplyValue,
TokenAddress: tokenAddress,
BlockNumber: int64(blockNumber),
}
}
func createTokenSupplyFor(repository every_block.TokenSupplyRepository, blockNumber int64) {
err := repository.Create(every_block.TokenSupply{BlockNumber: blockNumber, Value: "0"})
Expect(err).NotTo(HaveOccurred())
}
func createDbForAnotherNode() *postgres.DB {
anotherNode := core.Node{
GenesisBlock: "GENESIS",
NetworkID: 1,
ID: "testNodeId",
ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9",
}
return test_config.NewTestDBWithoutDeletingRecords(anotherNode)
}

View File

@ -0,0 +1,116 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package every_block
import (
"fmt"
"github.com/vulcanize/vulcanizedb/examples/erc20_watcher"
"github.com/vulcanize/vulcanizedb/libraries/shared"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"log"
"math/big"
)
type Transformer struct {
Fetcher ERC20FetcherInterface
Repository ERC20RepositoryInterface
Config erc20_watcher.ContractConfig
}
func (t *Transformer) SetConfiguration(config erc20_watcher.ContractConfig) {
t.Config = config
}
type TokenSupplyTransformerInitializer struct {
Config erc20_watcher.ContractConfig
}
func (i TokenSupplyTransformerInitializer) NewTokenSupplyTransformer(db *postgres.DB, blockchain core.Blockchain) shared.Transformer {
fetcher := NewFetcher(blockchain)
repository := TokenSupplyRepository{DB: db}
transformer := Transformer{
Fetcher: &fetcher,
Repository: &repository,
Config: i.Config,
}
return transformer
}
const (
FetchingBlocksError = "Error fetching missing blocks starting at block number %d: %s"
FetchingSupplyError = "Error fetching supply for block %d: %s"
CreateSupplyError = "Error inserting token_supply for block %d: %s"
)
type transformerError struct {
err string
blockNumber int64
msg string
}
func (te *transformerError) Error() string {
return fmt.Sprintf(te.msg, te.blockNumber, te.err)
}
func newTransformerError(err error, blockNumber int64, msg string) error {
e := transformerError{err.Error(), blockNumber, msg}
log.Println(e.Error())
return &e
}
func (t Transformer) Execute() error {
var upperBoundBlock int64
blockchain := t.Fetcher.GetBlockchain()
lastBlock := blockchain.LastBlock().Int64()
if t.Config.LastBlock == -1 {
upperBoundBlock = lastBlock
} else {
upperBoundBlock = t.Config.LastBlock
}
blocks, err := t.Repository.MissingBlocks(t.Config.FirstBlock, upperBoundBlock)
if err != nil {
return newTransformerError(err, t.Config.FirstBlock, FetchingBlocksError)
}
log.Printf("Fetching totalSupply for %d blocks", len(blocks))
for _, blockNumber := range blocks {
totalSupply, err := t.Fetcher.FetchSupplyOf(t.Config.Abi, t.Config.Address, blockNumber)
if err != nil {
return newTransformerError(err, blockNumber, FetchingSupplyError)
}
model := createTokenSupplyModel(totalSupply, t.Config.Address, blockNumber)
err = t.Repository.Create(model)
if err != nil {
return newTransformerError(err, blockNumber, CreateSupplyError)
}
}
return nil
}
func createTokenSupplyModel(totalSupply big.Int, address string, blockNumber int64) TokenSupply {
return TokenSupply{
Value: totalSupply.String(),
TokenAddress: address,
BlockNumber: blockNumber,
}
}

View File

@ -0,0 +1,184 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package every_block_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/examples/constants"
"github.com/vulcanize/vulcanizedb/examples/erc20_watcher"
"github.com/vulcanize/vulcanizedb/examples/erc20_watcher/every_block"
"github.com/vulcanize/vulcanizedb/examples/mocks"
"math/big"
"math/rand"
"strconv"
)
var testContractConfig = erc20_watcher.ContractConfig{
Address: constants.DaiContractAddress,
Abi: constants.DaiAbiString,
FirstBlock: int64(4752008),
LastBlock: int64(5750050),
Name: "Dai",
}
var config = testContractConfig
var _ = Describe("Everyblock transformer", func() {
var fetcher mocks.Fetcher
var repository mocks.TotalSupplyRepository
var transformer every_block.Transformer
var blockchain mocks.Blockchain
var initialSupply = "27647235749155415536952630"
var initialSupplyPlusOne = "27647235749155415536952631"
var initialSupplyPlusTwo = "27647235749155415536952632"
var initialSupplyPlusThree = "27647235749155415536952633"
var defaultLastBlock = big.Int{}
BeforeEach(func() {
blockchain = mocks.Blockchain{}
blockchain.SetLastBlock(&defaultLastBlock)
fetcher = mocks.Fetcher{Blockchain: &blockchain}
fetcher.SetSupply(initialSupply)
repository = mocks.TotalSupplyRepository{}
repository.SetMissingBlocks([]int64{config.FirstBlock})
//setting the mock repository to return the first block as the missing blocks
transformer = every_block.Transformer{
Fetcher: &fetcher,
Repository: &repository,
}
transformer.SetConfiguration(config)
})
It("fetches and persists the total supply of a token for a single block", func() {
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
Expect(len(fetcher.FetchedBlocks)).To(Equal(1))
Expect(fetcher.FetchedBlocks).To(ConsistOf(config.FirstBlock))
Expect(fetcher.Abi).To(Equal(config.Abi))
Expect(fetcher.ContractAddress).To(Equal(config.Address))
Expect(repository.StartingBlock).To(Equal(config.FirstBlock))
Expect(repository.EndingBlock).To(Equal(config.LastBlock))
Expect(len(repository.TotalSuppliesCreated)).To(Equal(1))
expectedSupply := big.Int{}
expectedSupply.SetString(initialSupply, 10)
expectedSupply.Add(&expectedSupply, big.NewInt(1))
Expect(repository.TotalSuppliesCreated[0].Value).To(Equal(expectedSupply.String()))
})
It("retrieves the total supply for every missing block", func() {
missingBlocks := []int64{
config.FirstBlock,
config.FirstBlock + 1,
config.FirstBlock + 2,
}
repository.SetMissingBlocks(missingBlocks)
transformer.Execute()
Expect(len(fetcher.FetchedBlocks)).To(Equal(3))
Expect(fetcher.FetchedBlocks).To(ConsistOf(config.FirstBlock, config.FirstBlock+1, config.FirstBlock+2))
Expect(fetcher.Abi).To(Equal(config.Abi))
Expect(fetcher.ContractAddress).To(Equal(config.Address))
Expect(len(repository.TotalSuppliesCreated)).To(Equal(3))
Expect(repository.TotalSuppliesCreated[0].Value).To(Equal(initialSupplyPlusOne))
Expect(repository.TotalSuppliesCreated[1].Value).To(Equal(initialSupplyPlusTwo))
Expect(repository.TotalSuppliesCreated[2].Value).To(Equal(initialSupplyPlusThree))
})
It("uses the set contract configuration", func() {
repository.SetMissingBlocks([]int64{testContractConfig.FirstBlock})
transformer.SetConfiguration(testContractConfig)
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
Expect(fetcher.FetchedBlocks).To(ConsistOf(testContractConfig.FirstBlock))
Expect(fetcher.Abi).To(Equal(testContractConfig.Abi))
Expect(fetcher.ContractAddress).To(Equal(testContractConfig.Address))
Expect(repository.StartingBlock).To(Equal(testContractConfig.FirstBlock))
Expect(repository.EndingBlock).To(Equal(testContractConfig.LastBlock))
Expect(len(repository.TotalSuppliesCreated)).To(Equal(1))
expectedTokenSupply := every_block.TokenSupply{
Value: initialSupplyPlusOne,
TokenAddress: testContractConfig.Address,
BlockNumber: testContractConfig.FirstBlock,
}
Expect(repository.TotalSuppliesCreated[0]).To(Equal(expectedTokenSupply))
})
It("uses the most recent block if the Config.LastBlock is -1", func() {
testContractConfig.LastBlock = -1
transformer.SetConfiguration(testContractConfig)
randomBlockNumber := rand.Int63()
numberToString := strconv.FormatInt(randomBlockNumber, 10)
mostRecentBlock := big.Int{}
mostRecentBlock.SetString(numberToString, 10)
blockchain.SetLastBlock(&mostRecentBlock)
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
Expect(repository.EndingBlock).To(Equal(randomBlockNumber))
})
It("returns an error if the call to get missing blocks fails", func() {
failureRepository := mocks.FailureRepository{}
failureRepository.SetMissingBlocksFail(true)
transformer = every_block.Transformer{
Fetcher: &fetcher,
Repository: &failureRepository,
}
err := transformer.Execute()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("TestError"))
Expect(err.Error()).To(ContainSubstring("fetching missing blocks"))
})
It("returns an error if the call to the blockchain fails", func() {
failureBlockchain := mocks.FailureBlockchain{}
failureBlockchain.SetLastBlock(&defaultLastBlock)
fetcher := every_block.NewFetcher(failureBlockchain)
transformer = every_block.Transformer{
Fetcher: &fetcher,
Repository: &repository,
}
err := transformer.Execute()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("TestError"))
Expect(err.Error()).To(ContainSubstring("supply"))
})
It("returns an error if the call to save the token_supply fails", func() {
failureRepository := mocks.FailureRepository{}
failureRepository.SetMissingBlocks([]int64{config.FirstBlock})
failureRepository.SetCreateFail(true)
transformer = every_block.Transformer{
Fetcher: &fetcher,
Repository: &failureRepository,
}
err := transformer.Execute()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("TestError"))
Expect(err.Error()).To(ContainSubstring("supply"))
})
})

View File

@ -0,0 +1,28 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package every_block
import (
"github.com/vulcanize/vulcanizedb/libraries/shared"
"github.com/vulcanize/vulcanizedb/examples/erc20_watcher"
)
func TransformerInitializers() []shared.TransformerInitializer {
config := erc20_watcher.DaiConfig
initializer := TokenSupplyTransformerInitializer{config}
return []shared.TransformerInitializer{
initializer.NewTokenSupplyTransformer,
}
}

175
examples/mocks/mocks.go Normal file
View File

@ -0,0 +1,175 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mocks
import (
"errors"
"github.com/vulcanize/vulcanizedb/examples/erc20_watcher/every_block"
"github.com/vulcanize/vulcanizedb/pkg/core"
"math/big"
)
var TestError = errors.New("TestError")
type Fetcher struct {
ContractAddress string
Abi string
FetchedBlocks []int64
Blockchain core.Blockchain
supply big.Int
}
func (f *Fetcher) SetSupply(supply string) {
f.supply.SetString(supply, 10)
}
func (f Fetcher) GetBlockchain() core.Blockchain {
return f.Blockchain
}
func (f *Fetcher) FetchSupplyOf(contractAbi string, contractAddress string, blockNumber int64) (big.Int, error) {
f.Abi = contractAbi
f.ContractAddress = contractAddress
f.FetchedBlocks = append(f.FetchedBlocks, blockNumber)
accumulator := big.NewInt(1)
f.supply.Add(&f.supply, accumulator)
return f.supply, nil
}
type TotalSupplyRepository struct {
TotalSuppliesCreated []every_block.TokenSupply
MissingBlockNumbers []int64
StartingBlock int64
EndingBlock int64
}
func (fr *TotalSupplyRepository) Create(supply every_block.TokenSupply) error {
fr.TotalSuppliesCreated = append(fr.TotalSuppliesCreated, supply)
return nil
}
func (fr *TotalSupplyRepository) MissingBlocks(startingBlock int64, highestBlock int64) ([]int64, error) {
fr.StartingBlock = startingBlock
fr.EndingBlock = highestBlock
return fr.MissingBlockNumbers, nil
}
func (fr *TotalSupplyRepository) SetMissingBlocks(missingBlocks []int64) {
fr.MissingBlockNumbers = missingBlocks
}
type FailureRepository struct {
createFail bool
missingBlocksFail bool
missingBlocksNumbers []int64
}
func (fr *FailureRepository) Create(supply every_block.TokenSupply) error {
if fr.createFail {
return TestError
} else {
return nil
}
}
func (fr *FailureRepository) MissingBlocks(startingBlock int64, highestBlock int64) ([]int64, error) {
if fr.missingBlocksFail {
return []int64{}, TestError
} else {
return fr.missingBlocksNumbers, nil
}
}
func (fr *FailureRepository) SetCreateFail(fail bool) {
fr.createFail = fail
}
func (fr *FailureRepository) SetMissingBlocksFail(fail bool) {
fr.missingBlocksFail = fail
}
func (fr *FailureRepository) SetMissingBlocks(missingBlocks []int64) {
fr.missingBlocksNumbers = missingBlocks
}
type Blockchain struct {
FetchedAbi string
FetchedContractAddress string
FetchedMethod string
FetchedMethodArg interface{}
FetchedResult interface{}
FetchedBlockNumber int64
lastBlock *big.Int
}
func (fb *Blockchain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error {
fb.FetchedAbi = abiJSON
fb.FetchedContractAddress = address
fb.FetchedMethod = method
fb.FetchedMethodArg = methodArg
fb.FetchedResult = result
fb.FetchedBlockNumber = blockNumber
return nil
}
func (fb *Blockchain) GetBlockByNumber(blockNumber int64) (core.Block, error) {
panic("implement me")
}
func (fb *Blockchain) GetLogs(contract core.Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]core.Log, error) {
panic("implement me")
}
func (fb *Blockchain) LastBlock() *big.Int {
return fb.lastBlock
}
func (fb *Blockchain) Node() core.Node {
panic("implement me")
}
func (fb *Blockchain) SetLastBlock(lastBlock *big.Int) {
fb.lastBlock = lastBlock
}
type FailureBlockchain struct {
lastBlock *big.Int
}
func (FailureBlockchain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error {
return errors.New("TestError")
}
func (FailureBlockchain) GetBlockByNumber(blockNumber int64) (core.Block, error) {
panic("implement me")
}
func (FailureBlockchain) GetLogs(contract core.Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]core.Log, error) {
panic("implement me")
}
func (fb FailureBlockchain) LastBlock() *big.Int {
return fb.lastBlock
}
func (FailureBlockchain) Node() core.Node {
panic("implement me")
}
func (fb *FailureBlockchain) SetLastBlock(lastBlock *big.Int) {
fb.lastBlock = lastBlock
}

View File

@ -0,0 +1,74 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test_helpers
import (
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/test_config"
)
type TokenSupplyDBRow struct {
ID int64
Supply int64
BlockID int64 `db:"block_id"`
TokenAddress string `db:"token_address"`
}
type TransferDBRow struct {
ID int64 `db:"id"`
VulcanizeLogID int64 `db:"vulcanize_log_id"`
}
func CreateLogRecord(db *postgres.DB, logRepository repositories.LogRepository, log core.Log) {
blockRepository := repositories.NewBlockRepository(db)
receiptRepository := repositories.ReceiptRepository{DB: db}
blockNumber := log.BlockNumber
blockId, err := blockRepository.CreateOrUpdateBlock(core.Block{Number: blockNumber})
Expect(err).NotTo(HaveOccurred())
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{})
Expect(err).NotTo(HaveOccurred())
err = logRepository.CreateLogs([]core.Log{log}, receiptId)
Expect(err).NotTo(HaveOccurred())
}
func CreateNewDatabase() *postgres.DB {
var node core.Node
node = core.Node{
GenesisBlock: "GENESIS",
NetworkID: 1,
ID: "b6f90c0fdd8ec9607aed8ee45c69322e47b7063f0bfb7a29c8ecafab24d0a22d24dd2329b5ee6ed4125a03cb14e57fd584e67f9e53e6c631055cbbd82f080845",
ClientName: "Geth/v1.7.2-stable-1db4ecdc/darwin-amd64/go1.9",
}
db := test_config.NewTestDB(node)
_, err := db.Exec(`DELETE FROM logs`)
Expect(err).NotTo(HaveOccurred())
return db
}
func CreateBlock(blockNumber int64, repository repositories.BlockRepository) (blockId int64) {
blockId, err := repository.CreateOrUpdateBlock(core.Block{Number: blockNumber})
Expect(err).NotTo(HaveOccurred())
return blockId
}

View File

@ -70,3 +70,8 @@ func NewTestDB(node core.Node) *postgres.DB {
db.MustExec("DELETE FROM log_filters")
return db
}
func NewTestDBWithoutDeletingRecords(node core.Node) *postgres.DB {
db, _ := postgres.NewDB(DBConfig, node)
return db
}