updates for light sync transactions

This commit is contained in:
Rob Mulholand 2019-03-26 23:05:30 -05:00
parent 79e011aad2
commit 54d46638a8
39 changed files with 769 additions and 256 deletions

View File

@ -5,7 +5,7 @@ CREATE TABLE full_sync_transactions (
gaslimit NUMERIC, gaslimit NUMERIC,
gasprice NUMERIC, gasprice NUMERIC,
hash VARCHAR(66), hash VARCHAR(66),
input_data VARCHAR, input_data BYTEA,
nonce NUMERIC, nonce NUMERIC,
raw BYTEA, raw BYTEA,
tx_from VARCHAR(66), tx_from VARCHAR(66),

View File

@ -1,12 +1,17 @@
-- +goose Up -- +goose Up
CREATE TABLE light_sync_transactions ( CREATE TABLE light_sync_transactions (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
header_id INTEGER NOT NULL REFERENCES headers(id) ON DELETE CASCADE, header_id INTEGER NOT NULL REFERENCES headers(id) ON DELETE CASCADE,
hash TEXT, hash TEXT,
raw JSONB, gaslimit NUMERIC,
tx_index INTEGER, gasprice NUMERIC,
tx_from TEXT, input_data BYTEA,
tx_to TEXT, nonce NUMERIC,
raw BYTEA,
tx_from TEXT,
tx_index INTEGER,
tx_to TEXT,
"value" NUMERIC,
UNIQUE (header_id, hash) UNIQUE (header_id, hash)
); );

View File

@ -161,7 +161,7 @@ CREATE TABLE public.full_sync_transactions (
gaslimit numeric, gaslimit numeric,
gasprice numeric, gasprice numeric,
hash character varying(66), hash character varying(66),
input_data character varying, input_data bytea,
nonce numeric, nonce numeric,
raw bytea, raw bytea,
tx_from character varying(66), tx_from character varying(66),
@ -266,10 +266,15 @@ CREATE TABLE public.light_sync_transactions (
id integer NOT NULL, id integer NOT NULL,
header_id integer NOT NULL, header_id integer NOT NULL,
hash text, hash text,
raw jsonb, gaslimit numeric,
tx_index integer, gasprice numeric,
input_data bytea,
nonce numeric,
raw bytea,
tx_from text, tx_from text,
tx_to text tx_index integer,
tx_to text,
value numeric
); );

View File

@ -0,0 +1,55 @@
package transactions
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
)
type ITransactionsSyncer interface {
SyncTransactions(headerID int64, logs []types.Log) error
}
type TransactionsSyncer struct {
BlockChain core.BlockChain
Repository datastore.HeaderRepository
}
func NewTransactionsSyncer(db *postgres.DB, blockChain core.BlockChain) TransactionsSyncer {
repository := repositories.NewHeaderRepository(db)
return TransactionsSyncer{
BlockChain: blockChain,
Repository: repository,
}
}
func (syncer TransactionsSyncer) SyncTransactions(headerID int64, logs []types.Log) error {
transactionHashes := getUniqueTransactionHashes(logs)
transactions, transactionErr := syncer.BlockChain.GetTransactions(transactionHashes)
if transactionErr != nil {
return transactionErr
}
for _, transaction := range transactions {
writeErr := syncer.Repository.CreateTransaction(headerID, transaction)
if writeErr != nil {
return writeErr
}
}
return nil
}
func getUniqueTransactionHashes(logs []types.Log) []common.Hash {
seen := make(map[common.Hash]struct{}, len(logs))
var result []common.Hash
for _, log := range logs {
if _, ok := seen[log.TxHash]; ok {
continue
}
seen[log.TxHash] = struct{}{}
result = append(result, log.TxHash)
}
return result
}

View File

@ -0,0 +1,80 @@
package transactions_test
import (
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/transactions"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config"
)
var _ = Describe("Transaction syncer", func() {
It("fetches transactions for logs", func() {
db := test_config.NewTestDB(test_config.NewTestNode())
blockChain := fakes.NewMockBlockChain()
syncer := transactions.NewTransactionsSyncer(db, blockChain)
err := syncer.SyncTransactions(0, []types.Log{})
Expect(err).NotTo(HaveOccurred())
Expect(blockChain.GetTransactionsCalled).To(BeTrue())
})
It("only fetches transactions with unique hashes", func() {
db := test_config.NewTestDB(test_config.NewTestNode())
blockChain := fakes.NewMockBlockChain()
syncer := transactions.NewTransactionsSyncer(db, blockChain)
err := syncer.SyncTransactions(0, []types.Log{{
TxHash: fakes.FakeHash,
}, {
TxHash: fakes.FakeHash,
}})
Expect(err).NotTo(HaveOccurred())
Expect(len(blockChain.GetTransactionsPassedHashes)).To(Equal(1))
})
It("returns error if fetching transactions fails", func() {
db := test_config.NewTestDB(test_config.NewTestNode())
blockChain := fakes.NewMockBlockChain()
blockChain.GetTransactionsError = fakes.FakeError
syncer := transactions.NewTransactionsSyncer(db, blockChain)
err := syncer.SyncTransactions(0, []types.Log{})
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
It("passes transactions to repository for persistence", func() {
db := test_config.NewTestDB(test_config.NewTestNode())
blockChain := fakes.NewMockBlockChain()
blockChain.Transactions = []core.TransactionModel{{}}
syncer := transactions.NewTransactionsSyncer(db, blockChain)
mockHeaderRepository := fakes.NewMockHeaderRepository()
syncer.Repository = mockHeaderRepository
err := syncer.SyncTransactions(0, []types.Log{})
Expect(err).NotTo(HaveOccurred())
Expect(mockHeaderRepository.CreateTransactionCalled).To(BeTrue())
})
It("returns error if persisting transactions fails", func() {
db := test_config.NewTestDB(test_config.NewTestNode())
blockChain := fakes.NewMockBlockChain()
blockChain.Transactions = []core.TransactionModel{{}}
syncer := transactions.NewTransactionsSyncer(db, blockChain)
mockHeaderRepository := fakes.NewMockHeaderRepository()
mockHeaderRepository.CreateTransactionError = fakes.FakeError
syncer.Repository = mockHeaderRepository
err := syncer.SyncTransactions(0, []types.Log{})
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
})

View File

@ -0,0 +1,13 @@
package transactions_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestTransactions(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Shared Transactions Suite")
}

View File

@ -18,14 +18,16 @@ package watcher
import ( import (
"fmt" "fmt"
"github.com/vulcanize/vulcanizedb/libraries/shared/transactions"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
log "github.com/sirupsen/logrus" "github.com/ethereum/go-ethereum/core/types"
"github.com/sirupsen/logrus"
chunk "github.com/vulcanize/vulcanizedb/libraries/shared/chunker" "github.com/vulcanize/vulcanizedb/libraries/shared/chunker"
"github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/libraries/shared/constants"
fetch "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
repo "github.com/vulcanize/vulcanizedb/libraries/shared/repository" "github.com/vulcanize/vulcanizedb/libraries/shared/repository"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
@ -33,21 +35,26 @@ import (
type EventWatcher struct { type EventWatcher struct {
Transformers []transformer.EventTransformer Transformers []transformer.EventTransformer
BlockChain core.BlockChain
DB *postgres.DB DB *postgres.DB
Fetcher fetch.LogFetcher Fetcher fetcher.LogFetcher
Chunker chunk.Chunker Chunker chunker.Chunker
Addresses []common.Address Addresses []common.Address
Topics []common.Hash Topics []common.Hash
StartingBlock *int64 StartingBlock *int64
Syncer transactions.ITransactionsSyncer
} }
func NewEventWatcher(db *postgres.DB, bc core.BlockChain) EventWatcher { func NewEventWatcher(db *postgres.DB, bc core.BlockChain) EventWatcher {
chunker := chunk.NewLogChunker() logChunker := chunker.NewLogChunker()
fetcher := fetch.NewFetcher(bc) logFetcher := fetcher.NewFetcher(bc)
transactionSyncer := transactions.NewTransactionsSyncer(db, bc)
return EventWatcher{ return EventWatcher{
DB: db, BlockChain: bc,
Fetcher: fetcher, DB: db,
Chunker: chunker, Fetcher: logFetcher,
Chunker: logChunker,
Syncer: transactionSyncer,
} }
} }
@ -85,15 +92,15 @@ func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecuti
return fmt.Errorf("No transformers added to watcher") return fmt.Errorf("No transformers added to watcher")
} }
checkedColumnNames, err := repo.GetCheckedColumnNames(watcher.DB) checkedColumnNames, err := repository.GetCheckedColumnNames(watcher.DB)
if err != nil { if err != nil {
return err return err
} }
notCheckedSQL := repo.CreateNotCheckedSQL(checkedColumnNames, recheckHeaders) notCheckedSQL := repository.CreateNotCheckedSQL(checkedColumnNames, recheckHeaders)
missingHeaders, err := repo.MissingHeaders(*watcher.StartingBlock, -1, watcher.DB, notCheckedSQL) missingHeaders, err := repository.MissingHeaders(*watcher.StartingBlock, -1, watcher.DB, notCheckedSQL)
if err != nil { if err != nil {
log.Error("Fetching of missing headers failed in watcher!") logrus.Error("Fetching of missing headers failed in watcher!")
return err return err
} }
@ -101,28 +108,41 @@ func (watcher *EventWatcher) Execute(recheckHeaders constants.TransformerExecuti
// TODO Extend FetchLogs for doing several blocks at a time // TODO Extend FetchLogs for doing several blocks at a time
logs, err := watcher.Fetcher.FetchLogs(watcher.Addresses, watcher.Topics, header) logs, err := watcher.Fetcher.FetchLogs(watcher.Addresses, watcher.Topics, header)
if err != nil { if err != nil {
// TODO Handle fetch error in watcher logrus.Errorf("Error while fetching logs for header %v in watcher", header.Id)
log.Errorf("Error while fetching logs for header %v in watcher", header.Id)
return err return err
} }
chunkedLogs := watcher.Chunker.ChunkLogs(logs) transactionsSyncErr := watcher.Syncer.SyncTransactions(header.Id, logs)
if transactionsSyncErr != nil {
logrus.Errorf("error syncing transactions: %s", transactionsSyncErr.Error())
return transactionsSyncErr
}
// Can't quit early and mark as checked if there are no logs. If we are running continuousLogSync, transformErr := watcher.transformLogs(logs, header)
// not all logs we're interested in might have been fetched. if transformErr != nil {
for _, t := range watcher.Transformers { return transformErr
transformerName := t.GetConfig().TransformerName
logChunk := chunkedLogs[transformerName]
err = t.Execute(logChunk, header, constants.HeaderMissing)
if err != nil {
log.Errorf("%v transformer failed to execute in watcher: %v", transformerName, err)
return err
}
} }
} }
return err return err
} }
func (watcher *EventWatcher) transformLogs(logs []types.Log, header core.Header) error {
chunkedLogs := watcher.Chunker.ChunkLogs(logs)
// Can't quit early and mark as checked if there are no logs. If we are running continuousLogSync,
// not all logs we're interested in might have been fetched.
for _, t := range watcher.Transformers {
transformerName := t.GetConfig().TransformerName
logChunk := chunkedLogs[transformerName]
err := t.Execute(logChunk, header, constants.HeaderMissing)
if err != nil {
logrus.Errorf("%v transformer failed to execute in watcher: %v", transformerName, err)
return err
}
}
return nil
}
func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool { func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool {
return transformerBlock < watcherBlock return transformerBlock < watcherBlock
} }

View File

@ -121,6 +121,33 @@ var _ = Describe("Watcher", func() {
w = watcher.NewEventWatcher(db, &mockBlockChain) w = watcher.NewEventWatcher(db, &mockBlockChain)
}) })
It("syncs transactions for fetched logs", func() {
fakeTransformer := &mocks.MockTransformer{}
w.AddTransformers([]transformer.EventTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
repository.SetMissingHeaders([]core.Header{fakes.FakeHeader})
mockTransactionSyncer := &fakes.MockTransactionSyncer{}
w.Syncer = mockTransactionSyncer
err := w.Execute(constants.HeaderMissing)
Expect(err).NotTo(HaveOccurred())
Expect(mockTransactionSyncer.SyncTransactionsCalled).To(BeTrue())
})
It("returns error if syncing transactions fails", func() {
fakeTransformer := &mocks.MockTransformer{}
w.AddTransformers([]transformer.EventTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
repository.SetMissingHeaders([]core.Header{fakes.FakeHeader})
mockTransactionSyncer := &fakes.MockTransactionSyncer{}
mockTransactionSyncer.SyncTransactionsError = fakes.FakeError
w.Syncer = mockTransactionSyncer
err := w.Execute(constants.HeaderMissing)
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
It("executes each transformer", func() { It("executes each transformer", func() {
fakeTransformer := &mocks.MockTransformer{} fakeTransformer := &mocks.MockTransformer{}
w.AddTransformers([]transformer.EventTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) w.AddTransformers([]transformer.EventTransformerInitializer{fakeTransformer.FakeTransformerInitializer})

View File

@ -40,7 +40,7 @@ var _ = Describe("Block Retriever", func() {
var block1 = core.Block{ var block1 = core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert",
Number: 1, Number: 1,
Transactions: []core.Transaction{}, Transactions: []core.TransactionModel{},
} }
BeforeEach(func() { BeforeEach(func() {
@ -63,7 +63,7 @@ var _ = Describe("Block Retriever", func() {
block2 := core.Block{ block2 := core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert",
Number: 2, Number: 2,
Transactions: []core.Transaction{{ Transactions: []core.TransactionModel{{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae",
GasPrice: 0, GasPrice: 0,
GasLimit: 0, GasLimit: 0,
@ -83,7 +83,7 @@ var _ = Describe("Block Retriever", func() {
block3 := core.Block{ block3 := core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad456yui", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad456yui",
Number: 3, Number: 3,
Transactions: []core.Transaction{{ Transactions: []core.TransactionModel{{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad234hfs", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad234hfs",
GasPrice: 0, GasPrice: 0,
GasLimit: 0, GasLimit: 0,
@ -127,7 +127,7 @@ var _ = Describe("Block Retriever", func() {
block2 := core.Block{ block2 := core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert",
Number: 2, Number: 2,
Transactions: []core.Transaction{{ Transactions: []core.TransactionModel{{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad6546ae",
GasPrice: 0, GasPrice: 0,
GasLimit: 0, GasLimit: 0,
@ -158,7 +158,7 @@ var _ = Describe("Block Retriever", func() {
block3 := core.Block{ block3 := core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad456yui", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad456yui",
Number: 3, Number: 3,
Transactions: []core.Transaction{{ Transactions: []core.TransactionModel{{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad234hfs", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad234hfs",
GasPrice: 0, GasPrice: 0,
GasLimit: 0, GasLimit: 0,
@ -202,13 +202,13 @@ var _ = Describe("Block Retriever", func() {
block2 := core.Block{ block2 := core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert",
Number: 2, Number: 2,
Transactions: []core.Transaction{}, Transactions: []core.TransactionModel{},
} }
block3 := core.Block{ block3 := core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad456yui", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad456yui",
Number: 3, Number: 3,
Transactions: []core.Transaction{}, Transactions: []core.TransactionModel{},
} }
_, insertErrOne := blockRepository.CreateOrUpdateBlock(block1) _, insertErrOne := blockRepository.CreateOrUpdateBlock(block1)
@ -228,13 +228,13 @@ var _ = Describe("Block Retriever", func() {
block2 := core.Block{ block2 := core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert",
Number: 2, Number: 2,
Transactions: []core.Transaction{}, Transactions: []core.TransactionModel{},
} }
block3 := core.Block{ block3 := core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad456yui", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad456yui",
Number: 3, Number: 3,
Transactions: []core.Transaction{}, Transactions: []core.TransactionModel{},
} }
_, insertErrOne := blockRepository.CreateOrUpdateBlock(block1) _, insertErrOne := blockRepository.CreateOrUpdateBlock(block1)

View File

@ -33,7 +33,7 @@ import (
var TransferBlock1 = core.Block{ var TransferBlock1 = core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ert",
Number: 6194633, Number: 6194633,
Transactions: []core.Transaction{{ Transactions: []core.TransactionModel{{
GasLimit: 0, GasLimit: 0,
GasPrice: 0, GasPrice: 0,
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654aaa", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654aaa",
@ -63,7 +63,7 @@ var TransferBlock1 = core.Block{
var TransferBlock2 = core.Block{ var TransferBlock2 = core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ooo", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ooo",
Number: 6194634, Number: 6194634,
Transactions: []core.Transaction{{ Transactions: []core.TransactionModel{{
GasLimit: 0, GasLimit: 0,
GasPrice: 0, GasPrice: 0,
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654eee", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654eee",
@ -93,7 +93,7 @@ var TransferBlock2 = core.Block{
var NewOwnerBlock1 = core.Block{ var NewOwnerBlock1 = core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ppp", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ppp",
Number: 6194635, Number: 6194635,
Transactions: []core.Transaction{{ Transactions: []core.TransactionModel{{
GasLimit: 0, GasLimit: 0,
GasPrice: 0, GasPrice: 0,
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654bbb", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654bbb",
@ -123,7 +123,7 @@ var NewOwnerBlock1 = core.Block{
var NewOwnerBlock2 = core.Block{ var NewOwnerBlock2 = core.Block{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ggg", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad123ggg",
Number: 6194636, Number: 6194636,
Transactions: []core.Transaction{{ Transactions: []core.TransactionModel{{
GasLimit: 0, GasLimit: 0,
GasPrice: 0, GasPrice: 0,
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654lll", Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad654lll",

View File

@ -30,7 +30,7 @@ type Block struct {
ParentHash string `db:"parenthash"` ParentHash string `db:"parenthash"`
Size string `db:"size"` Size string `db:"size"`
Time int64 `db:"time"` Time int64 `db:"time"`
Transactions []Transaction Transactions []TransactionModel
UncleHash string `db:"uncle_hash"` UncleHash string `db:"uncle_hash"`
UnclesReward float64 `db:"uncles_reward"` UnclesReward float64 `db:"uncles_reward"`
} }

View File

@ -17,6 +17,7 @@
package core package core
import ( import (
"github.com/ethereum/go-ethereum/common"
"math/big" "math/big"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
@ -26,10 +27,11 @@ import (
type BlockChain interface { type BlockChain interface {
ContractDataFetcher ContractDataFetcher
GetBlockByNumber(blockNumber int64) (Block, error) GetBlockByNumber(blockNumber int64) (Block, error)
GetEthLogsWithCustomQuery(query ethereum.FilterQuery) ([]types.Log, error)
GetHeaderByNumber(blockNumber int64) (Header, error) GetHeaderByNumber(blockNumber int64) (Header, error)
GetHeaderByNumbers(blockNumbers []int64) ([]Header, error) GetHeaderByNumbers(blockNumbers []int64) ([]Header, error)
GetLogs(contract Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]Log, error) GetLogs(contract Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]Log, error)
GetEthLogsWithCustomQuery(query ethereum.FilterQuery) ([]types.Log, error) GetTransactions(transactionHashes []common.Hash) ([]TransactionModel, error)
LastBlock() (*big.Int, error) LastBlock() (*big.Int, error)
Node() Node Node() Node
} }

View File

@ -19,5 +19,5 @@ package core
type Contract struct { type Contract struct {
Abi string Abi string
Hash string Hash string
Transactions []Transaction Transactions []TransactionModel
} }

View File

@ -16,8 +16,8 @@
package core package core
type Transaction struct { type TransactionModel struct {
Data string `db:"input_data"` Data []byte `db:"input_data"`
From string `db:"tx_from"` From string `db:"tx_from"`
GasLimit uint64 GasLimit uint64
GasPrice int64 GasPrice int64
@ -29,3 +29,18 @@ type Transaction struct {
TxIndex int64 `db:"tx_index"` TxIndex int64 `db:"tx_index"`
Value string Value string
} }
type RpcTransaction struct {
Nonce string `json:"nonce"`
GasPrice string `json:"gasPrice"`
GasLimit string `json:"gas"`
Recipient string `json:"to"`
Amount string `json:"value"`
Payload []byte `json:"input"`
V string `json:"v"`
R string `json:"r"`
S string `json:"s"`
Hash string
From string
TransactionIndex string `json:"transactionIndex"`
}

19
pkg/datastore/errors.go Normal file
View File

@ -0,0 +1,19 @@
package datastore
import "fmt"
var ErrBlockDoesNotExist = func(blockNumber int64) error {
return fmt.Errorf("Block number %d does not exist", blockNumber)
}
var ErrContractDoesNotExist = func(contractHash string) error {
return fmt.Errorf("Contract %v does not exist", contractHash)
}
var ErrFilterDoesNotExist = func(name string) error {
return fmt.Errorf("filter %s does not exist", name)
}
var ErrReceiptDoesNotExist = func(txHash string) error {
return fmt.Errorf("Receipt for tx: %v does not exist", txHash)
}

View File

@ -88,7 +88,7 @@ var _ = Describe("Postgres DB", func() {
badBlock := core.Block{ badBlock := core.Block{
Number: 123, Number: 123,
Nonce: badNonce, Nonce: badNonce,
Transactions: []core.Transaction{}, Transactions: []core.TransactionModel{},
} }
node := core.Node{GenesisBlock: "GENESIS", NetworkID: 1, ID: "x123", ClientName: "geth"} node := core.Node{GenesisBlock: "GENESIS", NetworkID: 1, ID: "x123", ClientName: "geth"}
db := test_config.NewTestDB(node) db := test_config.NewTestDB(node)
@ -146,10 +146,10 @@ var _ = Describe("Postgres DB", func() {
It("does not commit block or transactions if transaction is invalid", func() { It("does not commit block or transactions if transaction is invalid", func() {
//badHash violates db To field length //badHash violates db To field length
badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100)) badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100))
badTransaction := core.Transaction{To: badHash} badTransaction := core.TransactionModel{To: badHash}
block := core.Block{ block := core.Block{
Number: 123, Number: 123,
Transactions: []core.Transaction{badTransaction}, Transactions: []core.TransactionModel{badTransaction},
} }
node := core.Node{GenesisBlock: "GENESIS", NetworkID: 1, ID: "x123", ClientName: "geth"} node := core.Node{GenesisBlock: "GENESIS", NetworkID: 1, ID: "x123", ClientName: "geth"}
db, _ := postgres.NewDB(test_config.DBConfig, node) db, _ := postgres.NewDB(test_config.DBConfig, node)

View File

@ -160,7 +160,7 @@ func (blockRepository BlockRepository) insertBlock(block core.Block) (int64, err
return blockId, nil return blockId, nil
} }
func (blockRepository BlockRepository) createTransactions(tx *sqlx.Tx, blockId int64, transactions []core.Transaction) error { func (blockRepository BlockRepository) createTransactions(tx *sqlx.Tx, blockId int64, transactions []core.TransactionModel) error {
for _, transaction := range transactions { for _, transaction := range transactions {
err := blockRepository.createTransaction(tx, blockId, transaction) err := blockRepository.createTransaction(tx, blockId, transaction)
if err != nil { if err != nil {
@ -180,7 +180,7 @@ func nullStringToZero(s string) string {
return s return s
} }
func (blockRepository BlockRepository) createTransaction(tx *sqlx.Tx, blockId int64, transaction core.Transaction) error { func (blockRepository BlockRepository) createTransaction(tx *sqlx.Tx, blockId int64, transaction core.TransactionModel) error {
_, err := tx.Exec( _, err := tx.Exec(
`INSERT INTO full_sync_transactions `INSERT INTO full_sync_transactions
(block_id, gaslimit, gasprice, hash, input_data, nonce, raw, tx_from, tx_index, tx_to, "value") (block_id, gaslimit, gasprice, hash, input_data, nonce, raw, tx_from, tx_index, tx_to, "value")
@ -205,11 +205,11 @@ func (blockRepository BlockRepository) createTransaction(tx *sqlx.Tx, blockId in
return nil return nil
} }
func hasLogs(transaction core.Transaction) bool { func hasLogs(transaction core.TransactionModel) bool {
return len(transaction.Receipt.Logs) > 0 return len(transaction.Receipt.Logs) > 0
} }
func hasReceipt(transaction core.Transaction) bool { func hasReceipt(transaction core.TransactionModel) bool {
return transaction.Receipt.TxHash != "" return transaction.Receipt.TxHash != ""
} }
@ -302,10 +302,10 @@ func (blockRepository BlockRepository) loadBlock(blockRows *sqlx.Row) (core.Bloc
return block.Block, nil return block.Block, nil
} }
func (blockRepository BlockRepository) LoadTransactions(transactionRows *sqlx.Rows) []core.Transaction { func (blockRepository BlockRepository) LoadTransactions(transactionRows *sqlx.Rows) []core.TransactionModel {
var transactions []core.Transaction var transactions []core.TransactionModel
for transactionRows.Next() { for transactionRows.Next() {
var transaction core.Transaction var transaction core.TransactionModel
err := transactionRows.StructScan(&transaction) err := transactionRows.StructScan(&transaction)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)

View File

@ -133,7 +133,7 @@ var _ = Describe("Saving blocks", func() {
It("saves one transaction associated to the block", func() { It("saves one transaction associated to the block", func() {
block := core.Block{ block := core.Block{
Number: 123, Number: 123,
Transactions: []core.Transaction{fakes.FakeTransaction}, Transactions: []core.TransactionModel{fakes.FakeTransaction},
} }
_, insertErr := blockRepository.CreateOrUpdateBlock(block) _, insertErr := blockRepository.CreateOrUpdateBlock(block)
@ -147,7 +147,7 @@ var _ = Describe("Saving blocks", func() {
It("saves two transactions associated to the block", func() { It("saves two transactions associated to the block", func() {
block := core.Block{ block := core.Block{
Number: 123, Number: 123,
Transactions: []core.Transaction{fakes.FakeTransaction, fakes.FakeTransaction}, Transactions: []core.TransactionModel{fakes.FakeTransaction, fakes.FakeTransaction},
} }
_, insertErr := blockRepository.CreateOrUpdateBlock(block) _, insertErr := blockRepository.CreateOrUpdateBlock(block)
@ -163,7 +163,7 @@ var _ = Describe("Saving blocks", func() {
blockOne := core.Block{ blockOne := core.Block{
Number: 123, Number: 123,
Hash: "xabc", Hash: "xabc",
Transactions: []core.Transaction{ Transactions: []core.TransactionModel{
fakes.GetFakeTransaction("x123", core.Receipt{}), fakes.GetFakeTransaction("x123", core.Receipt{}),
fakes.GetFakeTransaction("x345", core.Receipt{}), fakes.GetFakeTransaction("x345", core.Receipt{}),
}, },
@ -171,7 +171,7 @@ var _ = Describe("Saving blocks", func() {
blockTwo := core.Block{ blockTwo := core.Block{
Number: 123, Number: 123,
Hash: "xdef", Hash: "xdef",
Transactions: []core.Transaction{ Transactions: []core.TransactionModel{
fakes.GetFakeTransaction("x678", core.Receipt{}), fakes.GetFakeTransaction("x678", core.Receipt{}),
fakes.GetFakeTransaction("x9ab", core.Receipt{}), fakes.GetFakeTransaction("x9ab", core.Receipt{}),
}, },
@ -192,14 +192,14 @@ var _ = Describe("Saving blocks", func() {
but block number + node id is`, func() { but block number + node id is`, func() {
blockOne := core.Block{ blockOne := core.Block{
Number: 123, Number: 123,
Transactions: []core.Transaction{ Transactions: []core.TransactionModel{
fakes.GetFakeTransaction("x123", core.Receipt{}), fakes.GetFakeTransaction("x123", core.Receipt{}),
fakes.GetFakeTransaction("x345", core.Receipt{}), fakes.GetFakeTransaction("x345", core.Receipt{}),
}, },
} }
blockTwo := core.Block{ blockTwo := core.Block{
Number: 123, Number: 123,
Transactions: []core.Transaction{ Transactions: []core.TransactionModel{
fakes.GetFakeTransaction("x678", core.Receipt{}), fakes.GetFakeTransaction("x678", core.Receipt{}),
fakes.GetFakeTransaction("x9ab", core.Receipt{}), fakes.GetFakeTransaction("x9ab", core.Receipt{}),
}, },
@ -256,8 +256,8 @@ var _ = Describe("Saving blocks", func() {
var raw bytes.Buffer var raw bytes.Buffer
rlpErr := gethTransaction.EncodeRLP(&raw) rlpErr := gethTransaction.EncodeRLP(&raw)
Expect(rlpErr).NotTo(HaveOccurred()) Expect(rlpErr).NotTo(HaveOccurred())
transaction := core.Transaction{ transaction := core.TransactionModel{
Data: inputData, Data: common.Hex2Bytes(inputData),
From: from, From: from,
GasLimit: gasLimit, GasLimit: gasLimit,
GasPrice: gasPrice, GasPrice: gasPrice,
@ -271,7 +271,7 @@ var _ = Describe("Saving blocks", func() {
} }
block := core.Block{ block := core.Block{
Number: 123, Number: 123,
Transactions: []core.Transaction{transaction}, Transactions: []core.TransactionModel{transaction},
} }
_, insertErr := blockRepository.CreateOrUpdateBlock(block) _, insertErr := blockRepository.CreateOrUpdateBlock(block)

View File

@ -73,24 +73,26 @@ var _ = Describe("Creating contracts", func() {
blockRepository = repositories.NewBlockRepository(db) blockRepository = repositories.NewBlockRepository(db)
block := core.Block{ block := core.Block{
Number: 123, Number: 123,
Transactions: []core.Transaction{ Transactions: []core.TransactionModel{
{Hash: "TRANSACTION1", To: "x123", Value: "0"}, {Hash: "TRANSACTION1", To: "x123", Value: "0"},
{Hash: "TRANSACTION2", To: "x345", Value: "0"}, {Hash: "TRANSACTION2", To: "x345", Value: "0"},
{Hash: "TRANSACTION3", To: "x123", Value: "0"}, {Hash: "TRANSACTION3", To: "x123", Value: "0"},
}, },
} }
blockRepository.CreateOrUpdateBlock(block) _, insertBlockErr := blockRepository.CreateOrUpdateBlock(block)
Expect(insertBlockErr).NotTo(HaveOccurred())
contractRepository.CreateContract(core.Contract{Hash: "x123"}) insertContractErr := contractRepository.CreateContract(core.Contract{Hash: "x123"})
Expect(insertContractErr).NotTo(HaveOccurred())
contract, err := contractRepository.GetContract("x123") contract, err := contractRepository.GetContract("x123")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
sort.Slice(contract.Transactions, func(i, j int) bool { sort.Slice(contract.Transactions, func(i, j int) bool {
return contract.Transactions[i].Hash < contract.Transactions[j].Hash return contract.Transactions[i].Hash < contract.Transactions[j].Hash
}) })
Expect(contract.Transactions).To( Expect(contract.Transactions).To(
Equal([]core.Transaction{ Equal([]core.TransactionModel{
{Hash: "TRANSACTION1", To: "x123", Value: "0"}, {Data: []byte{}, Hash: "TRANSACTION1", To: "x123", Value: "0"},
{Hash: "TRANSACTION3", To: "x123", Value: "0"}, {Data: []byte{}, Hash: "TRANSACTION3", To: "x123", Value: "0"},
})) }))
}) })

View File

@ -49,11 +49,13 @@ func (repository HeaderRepository) CreateOrUpdateHeader(header core.Header) (int
return 0, ErrValidHeaderExists return 0, ErrValidHeaderExists
} }
func (repository HeaderRepository) CreateTransaction(headerID int64, transaction core.Transaction) error { func (repository HeaderRepository) CreateTransaction(headerID int64, transaction core.TransactionModel) error {
_, err := repository.database.Exec(`INSERT INTO public.light_sync_transactions _, err := repository.database.Exec(`INSERT INTO public.light_sync_transactions
(header_id, hash, tx_to, tx_from, tx_index) VALUES ($1, $2, $3, $4, $5) (header_id, hash, gaslimit, gasprice, input_data, nonce, raw, tx_from, tx_index, tx_to, "value")
ON CONFLICT DO NOTHING`, VALUES ($1, $2, $3::NUMERIC, $4::NUMERIC, $5, $6::NUMERIC, $7, $8, $9::NUMERIC, $10, $11::NUMERIC)
headerID, transaction.Hash, transaction.To, transaction.From, transaction.TxIndex) ON CONFLICT DO NOTHING`, headerID, transaction.Hash, transaction.GasLimit, transaction.GasPrice,
transaction.Data, transaction.Nonce, transaction.Raw, transaction.From, transaction.TxIndex, transaction.To,
transaction.Value)
return err return err
} }

View File

@ -188,18 +188,26 @@ var _ = Describe("Block header repository", func() {
toAddress := common.HexToAddress("0x5678") toAddress := common.HexToAddress("0x5678")
txHash := common.HexToHash("0x9876") txHash := common.HexToHash("0x9876")
txIndex := big.NewInt(123) txIndex := big.NewInt(123)
transaction := core.Transaction{ transaction := core.TransactionModel{
From: fromAddress.Hex(), Data: []byte{},
Hash: txHash.Hex(), From: fromAddress.Hex(),
To: toAddress.Hex(), GasLimit: 0,
TxIndex: txIndex.Int64(), GasPrice: 0,
Hash: txHash.Hex(),
Nonce: 0,
Raw: []byte{},
To: toAddress.Hex(),
TxIndex: txIndex.Int64(),
Value: "0",
} }
insertErr := repo.CreateTransaction(headerID, transaction) insertErr := repo.CreateTransaction(headerID, transaction)
Expect(insertErr).NotTo(HaveOccurred()) Expect(insertErr).NotTo(HaveOccurred())
var dbTransaction core.Transaction var dbTransaction core.TransactionModel
err = db.Get(&dbTransaction, `SELECT hash, tx_from, tx_to, tx_index FROM public.light_sync_transactions WHERE header_id = $1`, headerID) err = db.Get(&dbTransaction,
`SELECT hash, gaslimit, gasprice, input_data, nonce, raw, tx_from, tx_index, tx_to, "value"
FROM public.light_sync_transactions WHERE header_id = $1`, headerID)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(dbTransaction).To(Equal(transaction)) Expect(dbTransaction).To(Equal(transaction))
}) })
@ -211,11 +219,18 @@ var _ = Describe("Block header repository", func() {
toAddress := common.HexToAddress("0x5678") toAddress := common.HexToAddress("0x5678")
txHash := common.HexToHash("0x9876") txHash := common.HexToHash("0x9876")
txIndex := big.NewInt(123) txIndex := big.NewInt(123)
transaction := core.Transaction{ transaction := core.TransactionModel{
From: fromAddress.Hex(), Data: []byte{},
Hash: txHash.Hex(), From: fromAddress.Hex(),
To: toAddress.Hex(), GasLimit: 0,
TxIndex: txIndex.Int64(), GasPrice: 0,
Hash: txHash.Hex(),
Nonce: 0,
Raw: []byte{},
Receipt: core.Receipt{},
To: toAddress.Hex(),
TxIndex: txIndex.Int64(),
Value: "0",
} }
insertErr := repo.CreateTransaction(headerID, transaction) insertErr := repo.CreateTransaction(headerID, transaction)
@ -224,8 +239,10 @@ var _ = Describe("Block header repository", func() {
insertTwoErr := repo.CreateTransaction(headerID, transaction) insertTwoErr := repo.CreateTransaction(headerID, transaction)
Expect(insertTwoErr).NotTo(HaveOccurred()) Expect(insertTwoErr).NotTo(HaveOccurred())
var dbTransactions []core.Transaction var dbTransactions []core.TransactionModel
err = db.Select(&dbTransactions, `SELECT hash, tx_from, tx_to, tx_index FROM public.light_sync_transactions WHERE header_id = $1`, headerID) err = db.Select(&dbTransactions,
`SELECT hash, gaslimit, gasprice, input_data, nonce, raw, tx_from, tx_index, tx_to, "value"
FROM public.light_sync_transactions WHERE header_id = $1`, headerID)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(dbTransactions)).To(Equal(1)) Expect(len(dbTransactions)).To(Equal(1))
}) })

View File

@ -204,7 +204,7 @@ var _ = Describe("Logs Repository", func() {
} }
transaction := fakes.GetFakeTransaction(receipt.TxHash, receipt) transaction := fakes.GetFakeTransaction(receipt.TxHash, receipt)
block := core.Block{Transactions: []core.Transaction{transaction}} block := core.Block{Transactions: []core.TransactionModel{transaction}}
_, err := blockRepository.CreateOrUpdateBlock(block) _, err := blockRepository.CreateOrUpdateBlock(block)
Expect(err).To(Not(HaveOccurred())) Expect(err).To(Not(HaveOccurred()))
retrievedLogs, err := logsRepository.GetLogs("0x99041f808d598b782d5a3e498681c2452a31da08", 4745407) retrievedLogs, err := logsRepository.GetLogs("0x99041f808d598b782d5a3e498681c2452a31da08", 4745407)

View File

@ -27,7 +27,7 @@ import (
"github.com/vulcanize/vulcanizedb/test_config" "github.com/vulcanize/vulcanizedb/test_config"
) )
var _ = Describe("Receipts Repository", func() { var _ = Describe("Receipt Repository", func() {
var blockRepository datastore.BlockRepository var blockRepository datastore.BlockRepository
var logRepository datastore.LogRepository var logRepository datastore.LogRepository
var receiptRepository datastore.ReceiptRepository var receiptRepository datastore.ReceiptRepository
@ -119,7 +119,7 @@ var _ = Describe("Receipts Repository", func() {
} }
transaction := fakes.GetFakeTransaction(expected.TxHash, expected) transaction := fakes.GetFakeTransaction(expected.TxHash, expected)
block := core.Block{Transactions: []core.Transaction{transaction}} block := core.Block{Transactions: []core.TransactionModel{transaction}}
_, err := blockRepository.CreateOrUpdateBlock(block) _, err := blockRepository.CreateOrUpdateBlock(block)
@ -148,7 +148,7 @@ var _ = Describe("Receipts Repository", func() {
transaction := fakes.GetFakeTransaction(receipt.TxHash, receipt) transaction := fakes.GetFakeTransaction(receipt.TxHash, receipt)
block := core.Block{ block := core.Block{
Transactions: []core.Transaction{transaction}, Transactions: []core.TransactionModel{transaction},
} }
_, err := blockRepository.CreateOrUpdateBlock(block) _, err := blockRepository.CreateOrUpdateBlock(block)

View File

@ -17,16 +17,10 @@
package datastore package datastore
import ( import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/filters" "github.com/vulcanize/vulcanizedb/pkg/filters"
) )
var ErrBlockDoesNotExist = func(blockNumber int64) error {
return fmt.Errorf("Block number %d does not exist", blockNumber)
}
type BlockRepository interface { type BlockRepository interface {
CreateOrUpdateBlock(block core.Block) (int64, error) CreateOrUpdateBlock(block core.Block) (int64, error)
GetBlock(blockNumber int64) (core.Block, error) GetBlock(blockNumber int64) (core.Block, error)
@ -34,20 +28,12 @@ type BlockRepository interface {
SetBlocksStatus(chainHead int64) error SetBlocksStatus(chainHead int64) error
} }
var ErrContractDoesNotExist = func(contractHash string) error {
return fmt.Errorf("Contract %v does not exist", contractHash)
}
type ContractRepository interface { type ContractRepository interface {
CreateContract(contract core.Contract) error CreateContract(contract core.Contract) error
GetContract(contractHash string) (core.Contract, error) GetContract(contractHash string) (core.Contract, error)
ContractExists(contractHash string) (bool, error) ContractExists(contractHash string) (bool, error)
} }
var ErrFilterDoesNotExist = func(name string) error {
return fmt.Errorf("filter %s does not exist", name)
}
type FilterRepository interface { type FilterRepository interface {
CreateFilter(filter filters.LogFilter) error CreateFilter(filter filters.LogFilter) error
GetFilter(name string) (filters.LogFilter, error) GetFilter(name string) (filters.LogFilter, error)
@ -55,7 +41,7 @@ type FilterRepository interface {
type HeaderRepository interface { type HeaderRepository interface {
CreateOrUpdateHeader(header core.Header) (int64, error) CreateOrUpdateHeader(header core.Header) (int64, error)
CreateTransaction(headerID int64, transaction core.Transaction) error CreateTransaction(headerID int64, transaction core.TransactionModel) error
GetHeader(blockNumber int64) (core.Header, error) GetHeader(blockNumber int64) (core.Header, error)
MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error)
} }
@ -65,10 +51,6 @@ type LogRepository interface {
GetLogs(address string, blockNumber int64) ([]core.Log, error) GetLogs(address string, blockNumber int64) ([]core.Log, error)
} }
var ErrReceiptDoesNotExist = func(txHash string) error {
return fmt.Errorf("Receipt for tx: %v does not exist", txHash)
}
type ReceiptRepository interface { type ReceiptRepository interface {
CreateReceiptsAndLogs(blockId int64, receipts []core.Receipt) error CreateReceiptsAndLogs(blockId int64, receipts []core.Receipt) error
CreateReceipt(blockId int64, receipt core.Receipt) (int64, error) CreateReceipt(blockId int64, receipt core.Receipt) (int64, error)

View File

@ -29,6 +29,7 @@ import (
) )
var ( var (
FakeAddress = common.HexToAddress("0x1234567890abcdef")
FakeError = errors.New("failed") FakeError = errors.New("failed")
FakeHash = common.BytesToHash([]byte{1, 2, 3, 4, 5}) FakeHash = common.BytesToHash([]byte{1, 2, 3, 4, 5})
fakeTimestamp = int64(111111111) fakeTimestamp = int64(111111111)
@ -53,8 +54,8 @@ func GetFakeHeader(blockNumber int64) core.Header {
var fakeTransaction types.Transaction var fakeTransaction types.Transaction
var rawTransaction bytes.Buffer var rawTransaction bytes.Buffer
var _ = fakeTransaction.EncodeRLP(&rawTransaction) var _ = fakeTransaction.EncodeRLP(&rawTransaction)
var FakeTransaction = core.Transaction{ var FakeTransaction = core.TransactionModel{
Data: "", Data: []byte{},
From: "", From: "",
GasLimit: 0, GasLimit: 0,
GasPrice: 0, GasPrice: 0,
@ -67,15 +68,15 @@ var FakeTransaction = core.Transaction{
Value: "0", Value: "0",
} }
func GetFakeTransaction(hash string, receipt core.Receipt) core.Transaction { func GetFakeTransaction(hash string, receipt core.Receipt) core.TransactionModel {
gethTransaction := types.Transaction{} gethTransaction := types.Transaction{}
var raw bytes.Buffer var raw bytes.Buffer
err := gethTransaction.EncodeRLP(&raw) err := gethTransaction.EncodeRLP(&raw)
if err != nil { if err != nil {
panic("failed to marshal transaction creating test fake") panic("failed to marshal transaction creating test fake")
} }
return core.Transaction{ return core.TransactionModel{
Data: "", Data: []byte{},
From: "", From: "",
GasLimit: 0, GasLimit: 0,
GasPrice: 0, GasPrice: 0,

View File

@ -17,6 +17,7 @@
package fakes package fakes
import ( import (
"github.com/ethereum/go-ethereum/common"
"math/big" "math/big"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
@ -35,11 +36,15 @@ type MockBlockChain struct {
fetchContractDataPassedResult interface{} fetchContractDataPassedResult interface{}
fetchContractDataPassedBlockNumber int64 fetchContractDataPassedBlockNumber int64
getBlockByNumberErr error getBlockByNumberErr error
GetTransactionsCalled bool
GetTransactionsError error
GetTransactionsPassedHashes []common.Hash
logQuery ethereum.FilterQuery logQuery ethereum.FilterQuery
logQueryErr error logQueryErr error
logQueryReturnLogs []types.Log logQueryReturnLogs []types.Log
lastBlock *big.Int lastBlock *big.Int
node core.Node node core.Node
Transactions []core.TransactionModel
} }
func NewMockBlockChain() *MockBlockChain { func NewMockBlockChain() *MockBlockChain {
@ -104,6 +109,12 @@ func (chain *MockBlockChain) GetLogs(contract core.Contract, startingBlockNumber
return []core.Log{}, nil return []core.Log{}, nil
} }
func (chain *MockBlockChain) GetTransactions(transactionHashes []common.Hash) ([]core.TransactionModel, error) {
chain.GetTransactionsCalled = true
chain.GetTransactionsPassedHashes = transactionHashes
return chain.Transactions, chain.GetTransactionsError
}
func (chain *MockBlockChain) CallContract(contractHash string, input []byte, blockNumber *big.Int) ([]byte, error) { func (chain *MockBlockChain) CallContract(contractHash string, input []byte, blockNumber *big.Int) ([]byte, error) {
return []byte{}, nil return []byte{}, nil
} }

View File

@ -27,6 +27,8 @@ type MockHeaderRepository struct {
createOrUpdateHeaderErr error createOrUpdateHeaderErr error
createOrUpdateHeaderPassedBlockNumbers []int64 createOrUpdateHeaderPassedBlockNumbers []int64
createOrUpdateHeaderReturnID int64 createOrUpdateHeaderReturnID int64
CreateTransactionCalled bool
CreateTransactionError error
getHeaderError error getHeaderError error
getHeaderReturnBlockHash string getHeaderReturnBlockHash string
missingBlockNumbers []int64 missingBlockNumbers []int64
@ -56,8 +58,9 @@ func (repository *MockHeaderRepository) CreateOrUpdateHeader(header core.Header)
return repository.createOrUpdateHeaderReturnID, repository.createOrUpdateHeaderErr return repository.createOrUpdateHeaderReturnID, repository.createOrUpdateHeaderErr
} }
func (repository *MockHeaderRepository) CreateTransaction(headerID int64, transaction core.Transaction) error { func (repository *MockHeaderRepository) CreateTransaction(headerID int64, transaction core.TransactionModel) error {
panic("implement me") repository.CreateTransactionCalled = true
return repository.CreateTransactionError
} }
func (repository *MockHeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { func (repository *MockHeaderRepository) GetHeader(blockNumber int64) (core.Header, error) {

View File

@ -18,39 +18,30 @@ package fakes
import ( import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
) )
type MockTransactionConverter struct { type MockTransactionConverter struct {
convertTransactionsToCoreCalled bool ConvertHeaderTransactionIndexToIntCalled bool
convertTransactionsToCorePassedBlock *types.Block ConvertBlockTransactionsToCoreCalled bool
convertTransactionsToCoreReturnTransactions []core.Transaction ConvertBlockTransactionsToCorePassedBlock *types.Block
convertTransactionsToCoreReturnError error
} }
func NewMockTransactionConverter() *MockTransactionConverter { func NewMockTransactionConverter() *MockTransactionConverter {
return &MockTransactionConverter{ return &MockTransactionConverter{
convertTransactionsToCoreCalled: false, ConvertHeaderTransactionIndexToIntCalled: false,
convertTransactionsToCorePassedBlock: nil, ConvertBlockTransactionsToCoreCalled: false,
convertTransactionsToCoreReturnTransactions: nil, ConvertBlockTransactionsToCorePassedBlock: nil,
convertTransactionsToCoreReturnError: nil,
} }
} }
func (mtc *MockTransactionConverter) SetConvertTransactionsToCoreReturnVals(transactions []core.Transaction, err error) { func (converter *MockTransactionConverter) ConvertBlockTransactionsToCore(gethBlock *types.Block) ([]core.TransactionModel, error) {
mtc.convertTransactionsToCoreReturnTransactions = transactions converter.ConvertBlockTransactionsToCoreCalled = true
mtc.convertTransactionsToCoreReturnError = err converter.ConvertBlockTransactionsToCorePassedBlock = gethBlock
return []core.TransactionModel{}, nil
} }
func (mtc *MockTransactionConverter) ConvertTransactionsToCore(gethBlock *types.Block) ([]core.Transaction, error) { func (converter *MockTransactionConverter) ConvertRpcTransactionsToModels(transactions []core.RpcTransaction) ([]core.TransactionModel, error) {
mtc.convertTransactionsToCoreCalled = true converter.ConvertHeaderTransactionIndexToIntCalled = true
mtc.convertTransactionsToCorePassedBlock = gethBlock return nil, nil
return mtc.convertTransactionsToCoreReturnTransactions, mtc.convertTransactionsToCoreReturnError
}
func (mtc *MockTransactionConverter) AssertConvertTransactionsToCoreCalledWith(gethBlock *types.Block) {
Expect(mtc.convertTransactionsToCoreCalled).To(BeTrue())
Expect(mtc.convertTransactionsToCorePassedBlock).To(Equal(gethBlock))
} }

View File

@ -0,0 +1,13 @@
package fakes
import "github.com/ethereum/go-ethereum/core/types"
type MockTransactionSyncer struct {
SyncTransactionsCalled bool
SyncTransactionsError error
}
func (syncer *MockTransactionSyncer) SyncTransactions(headerID int64, logs []types.Log) error {
syncer.SyncTransactionsCalled = true
return syncer.SyncTransactionsError
}

View File

@ -18,10 +18,11 @@ package geth
import ( import (
"errors" "errors"
"fmt"
"github.com/ethereum/go-ethereum"
"math/big" "math/big"
"strconv" "strconv"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -37,20 +38,22 @@ var ErrEmptyHeader = errors.New("empty header returned over RPC")
const MAX_BATCH_SIZE = 100 const MAX_BATCH_SIZE = 100
type BlockChain struct { type BlockChain struct {
blockConverter vulcCommon.BlockConverter blockConverter vulcCommon.BlockConverter
ethClient core.EthClient ethClient core.EthClient
headerConverter vulcCommon.HeaderConverter headerConverter vulcCommon.HeaderConverter
node core.Node node core.Node
rpcClient core.RpcClient rpcClient core.RpcClient
transactionConverter vulcCommon.TransactionConverter
} }
func NewBlockChain(ethClient core.EthClient, rpcClient core.RpcClient, node core.Node, converter vulcCommon.TransactionConverter) *BlockChain { func NewBlockChain(ethClient core.EthClient, rpcClient core.RpcClient, node core.Node, converter vulcCommon.TransactionConverter) *BlockChain {
return &BlockChain{ return &BlockChain{
blockConverter: vulcCommon.NewBlockConverter(converter), blockConverter: vulcCommon.NewBlockConverter(converter),
ethClient: ethClient, ethClient: ethClient,
headerConverter: vulcCommon.HeaderConverter{}, headerConverter: vulcCommon.HeaderConverter{},
node: node, node: node,
rpcClient: rpcClient, rpcClient: rpcClient,
transactionConverter: converter,
} }
} }
@ -62,6 +65,14 @@ func (blockChain *BlockChain) GetBlockByNumber(blockNumber int64) (block core.Bl
return blockChain.blockConverter.ToCoreBlock(gethBlock) return blockChain.blockConverter.ToCoreBlock(gethBlock)
} }
func (blockChain *BlockChain) GetEthLogsWithCustomQuery(query ethereum.FilterQuery) ([]types.Log, error) {
gethLogs, err := blockChain.ethClient.FilterLogs(context.Background(), query)
if err != nil {
return []types.Log{}, err
}
return gethLogs, nil
}
func (blockChain *BlockChain) GetHeaderByNumber(blockNumber int64) (header core.Header, err error) { func (blockChain *BlockChain) GetHeaderByNumber(blockNumber int64) (header core.Header, err error) {
if blockChain.node.NetworkID == core.KOVAN_NETWORK_ID { if blockChain.node.NetworkID == core.KOVAN_NETWORK_ID {
return blockChain.getPOAHeader(blockNumber) return blockChain.getPOAHeader(blockNumber)
@ -76,49 +87,55 @@ func (blockChain *BlockChain) GetHeaderByNumbers(blockNumbers []int64) (header [
return blockChain.getPOWHeaders(blockNumbers) return blockChain.getPOWHeaders(blockNumbers)
} }
func (blockChain *BlockChain) getPOWHeader(blockNumber int64) (header core.Header, err error) { func (blockChain *BlockChain) GetLogs(contract core.Contract, startingBlockNumber, endingBlockNumber *big.Int) ([]core.Log, error) {
gethHeader, err := blockChain.ethClient.HeaderByNumber(context.Background(), big.NewInt(blockNumber)) if endingBlockNumber == nil {
if err != nil { endingBlockNumber = startingBlockNumber
return header, err
} }
return blockChain.headerConverter.Convert(gethHeader, gethHeader.Hash().String()), nil contractAddress := common.HexToAddress(contract.Hash)
fc := ethereum.FilterQuery{
FromBlock: startingBlockNumber,
ToBlock: endingBlockNumber,
Addresses: []common.Address{contractAddress},
Topics: nil,
}
gethLogs, err := blockChain.GetEthLogsWithCustomQuery(fc)
if err != nil {
return []core.Log{}, err
}
logs := vulcCommon.ToCoreLogs(gethLogs)
return logs, nil
} }
func (blockChain *BlockChain) getPOWHeaders(blockNumbers []int64) (headers []core.Header, err error) { func (blockChain *BlockChain) GetTransactions(transactionHashes []common.Hash) ([]core.TransactionModel, error) {
numTransactions := len(transactionHashes)
var batch []client.BatchElem var batch []client.BatchElem
var POWHeaders [MAX_BATCH_SIZE]types.Header transactions := make([]core.RpcTransaction, numTransactions)
includeTransactions := false
for index, blockNumber := range blockNumbers {
if index >= MAX_BATCH_SIZE {
break
}
blockNumberArg := hexutil.EncodeBig(big.NewInt(blockNumber))
for index, transactionHash := range transactionHashes {
batchElem := client.BatchElem{ batchElem := client.BatchElem{
Method: "eth_getBlockByNumber", Method: "eth_getTransactionByHash",
Result: &POWHeaders[index], Result: &transactions[index],
Args: []interface{}{blockNumberArg, includeTransactions}, Args: []interface{}{transactionHash},
} }
batch = append(batch, batchElem) batch = append(batch, batchElem)
} }
err = blockChain.rpcClient.BatchCall(batch) rpcErr := blockChain.rpcClient.BatchCall(batch)
if err != nil { if rpcErr != nil {
return headers, err fmt.Println("rpc err")
return []core.TransactionModel{}, rpcErr
} }
for _, POWHeader := range POWHeaders { return blockChain.transactionConverter.ConvertRpcTransactionsToModels(transactions)
if POWHeader.Number != nil { }
header := blockChain.headerConverter.Convert(&POWHeader, POWHeader.Hash().String())
headers = append(headers, header)
}
}
return headers, err func (blockChain *BlockChain) LastBlock() (*big.Int, error) {
block, err := blockChain.ethClient.HeaderByNumber(context.Background(), nil)
return block.Number, err
}
func (blockChain *BlockChain) Node() core.Node {
return blockChain.node
} }
func (blockChain *BlockChain) getPOAHeader(blockNumber int64) (header core.Header, err error) { func (blockChain *BlockChain) getPOAHeader(blockNumber int64) (header core.Header, err error) {
@ -204,38 +221,47 @@ func (blockChain *BlockChain) getPOAHeaders(blockNumbers []int64) (headers []cor
return headers, err return headers, err
} }
func (blockChain *BlockChain) GetLogs(contract core.Contract, startingBlockNumber, endingBlockNumber *big.Int) ([]core.Log, error) { func (blockChain *BlockChain) getPOWHeader(blockNumber int64) (header core.Header, err error) {
if endingBlockNumber == nil { gethHeader, err := blockChain.ethClient.HeaderByNumber(context.Background(), big.NewInt(blockNumber))
endingBlockNumber = startingBlockNumber
}
contractAddress := common.HexToAddress(contract.Hash)
fc := ethereum.FilterQuery{
FromBlock: startingBlockNumber,
ToBlock: endingBlockNumber,
Addresses: []common.Address{contractAddress},
Topics: nil,
}
gethLogs, err := blockChain.GetEthLogsWithCustomQuery(fc)
if err != nil { if err != nil {
return []core.Log{}, err return header, err
} }
logs := vulcCommon.ToCoreLogs(gethLogs) return blockChain.headerConverter.Convert(gethHeader, gethHeader.Hash().String()), nil
return logs, nil
} }
func (blockChain *BlockChain) GetEthLogsWithCustomQuery(query ethereum.FilterQuery) ([]types.Log, error) { func (blockChain *BlockChain) getPOWHeaders(blockNumbers []int64) (headers []core.Header, err error) {
gethLogs, err := blockChain.ethClient.FilterLogs(context.Background(), query) var batch []client.BatchElem
var POWHeaders [MAX_BATCH_SIZE]types.Header
includeTransactions := false
for index, blockNumber := range blockNumbers {
if index >= MAX_BATCH_SIZE {
break
}
blockNumberArg := hexutil.EncodeBig(big.NewInt(blockNumber))
batchElem := client.BatchElem{
Method: "eth_getBlockByNumber",
Result: &POWHeaders[index],
Args: []interface{}{blockNumberArg, includeTransactions},
}
batch = append(batch, batchElem)
}
err = blockChain.rpcClient.BatchCall(batch)
if err != nil { if err != nil {
return []types.Log{}, err return headers, err
} }
return gethLogs, nil
}
func (blockChain *BlockChain) LastBlock() (*big.Int, error) { for _, POWHeader := range POWHeaders {
block, err := blockChain.ethClient.HeaderByNumber(context.Background(), nil) if POWHeader.Number != nil {
return block.Number, err header := blockChain.headerConverter.Convert(&POWHeader, POWHeader.Hash().String())
} headers = append(headers, header)
}
}
func (blockChain *BlockChain) Node() core.Node { return headers, err
return blockChain.node
} }

View File

@ -30,20 +30,23 @@ import (
vulcCore "github.com/vulcanize/vulcanizedb/pkg/core" vulcCore "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/geth" "github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/geth/converters/cold_db"
) )
var _ = Describe("Geth blockchain", func() { var _ = Describe("Geth blockchain", func() {
var mockClient *fakes.MockEthClient var (
var mockRpcClient *fakes.MockRpcClient mockClient *fakes.MockEthClient
var node vulcCore.Node blockChain *geth.BlockChain
var blockChain *geth.BlockChain mockRpcClient *fakes.MockRpcClient
mockTransactionConverter *fakes.MockTransactionConverter
node vulcCore.Node
)
BeforeEach(func() { BeforeEach(func() {
mockClient = fakes.NewMockEthClient() mockClient = fakes.NewMockEthClient()
mockRpcClient = fakes.NewMockRpcClient() mockRpcClient = fakes.NewMockRpcClient()
mockTransactionConverter = fakes.NewMockTransactionConverter()
node = vulcCore.Node{} node = vulcCore.Node{}
blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, cold_db.NewColdDbTransactionConverter()) blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, mockTransactionConverter)
}) })
Describe("getting a block", func() { Describe("getting a block", func() {
@ -89,8 +92,6 @@ var _ = Describe("Geth blockchain", func() {
}) })
It("fetches headers with multiple blocks", func() { It("fetches headers with multiple blocks", func() {
blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, cold_db.NewColdDbTransactionConverter())
_, err := blockChain.GetHeaderByNumbers([]int64{100, 99}) _, err := blockChain.GetHeaderByNumbers([]int64{100, 99})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -103,7 +104,7 @@ var _ = Describe("Geth blockchain", func() {
node.NetworkID = vulcCore.KOVAN_NETWORK_ID node.NetworkID = vulcCore.KOVAN_NETWORK_ID
blockNumber := hexutil.Big(*big.NewInt(100)) blockNumber := hexutil.Big(*big.NewInt(100))
mockRpcClient.SetReturnPOAHeader(vulcCore.POAHeader{Number: &blockNumber}) mockRpcClient.SetReturnPOAHeader(vulcCore.POAHeader{Number: &blockNumber})
blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, cold_db.NewColdDbTransactionConverter()) blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, fakes.NewMockTransactionConverter())
_, err := blockChain.GetHeaderByNumber(100) _, err := blockChain.GetHeaderByNumber(100)
@ -114,7 +115,7 @@ var _ = Describe("Geth blockchain", func() {
It("returns err if rpcClient returns err", func() { It("returns err if rpcClient returns err", func() {
node.NetworkID = vulcCore.KOVAN_NETWORK_ID node.NetworkID = vulcCore.KOVAN_NETWORK_ID
mockRpcClient.SetCallContextErr(fakes.FakeError) mockRpcClient.SetCallContextErr(fakes.FakeError)
blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, cold_db.NewColdDbTransactionConverter()) blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, fakes.NewMockTransactionConverter())
_, err := blockChain.GetHeaderByNumber(100) _, err := blockChain.GetHeaderByNumber(100)
@ -124,7 +125,7 @@ var _ = Describe("Geth blockchain", func() {
It("returns error if returned header is empty", func() { It("returns error if returned header is empty", func() {
node.NetworkID = vulcCore.KOVAN_NETWORK_ID node.NetworkID = vulcCore.KOVAN_NETWORK_ID
blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, cold_db.NewColdDbTransactionConverter()) blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, fakes.NewMockTransactionConverter())
_, err := blockChain.GetHeaderByNumber(100) _, err := blockChain.GetHeaderByNumber(100)
@ -136,7 +137,6 @@ var _ = Describe("Geth blockchain", func() {
node.NetworkID = vulcCore.KOVAN_NETWORK_ID node.NetworkID = vulcCore.KOVAN_NETWORK_ID
blockNumber := hexutil.Big(*big.NewInt(100)) blockNumber := hexutil.Big(*big.NewInt(100))
mockRpcClient.SetReturnPOAHeaders([]vulcCore.POAHeader{{Number: &blockNumber}}) mockRpcClient.SetReturnPOAHeaders([]vulcCore.POAHeader{{Number: &blockNumber}})
blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, cold_db.NewColdDbTransactionConverter())
_, err := blockChain.GetHeaderByNumbers([]int64{100, 99}) _, err := blockChain.GetHeaderByNumbers([]int64{100, 99})
@ -216,6 +216,22 @@ var _ = Describe("Geth blockchain", func() {
}) })
}) })
Describe("getting transactions", func() {
It("fetches transaction for each hash", func() {
_, err := blockChain.GetTransactions([]common.Hash{{}, {}})
Expect(err).NotTo(HaveOccurred())
mockRpcClient.AssertBatchCalledWith("eth_getTransactionByHash", 2)
})
It("converts transaction indexes from hex to int", func() {
_, err := blockChain.GetTransactions([]common.Hash{{}, {}})
Expect(err).NotTo(HaveOccurred())
Expect(mockTransactionConverter.ConvertHeaderTransactionIndexToIntCalled).To(BeTrue())
})
})
Describe("getting the most recent block number", func() { Describe("getting the most recent block number", func() {
It("fetches latest header from ethClient", func() { It("fetches latest header from ethClient", func() {
blockNumber := int64(100) blockNumber := int64(100)

View File

@ -81,7 +81,8 @@ var _ = Describe("Geth cold importer", func() {
mockEthereumDatabase.AssertGetBlockHashCalledWith(blockNumber) mockEthereumDatabase.AssertGetBlockHashCalledWith(blockNumber)
mockEthereumDatabase.AssertGetBlockCalledWith(fakeHash, blockNumber) mockEthereumDatabase.AssertGetBlockCalledWith(fakeHash, blockNumber)
mockTransactionConverter.AssertConvertTransactionsToCoreCalledWith(fakeGethBlock) Expect(mockTransactionConverter.ConvertBlockTransactionsToCoreCalled).To(BeTrue())
Expect(mockTransactionConverter.ConvertBlockTransactionsToCorePassedBlock).To(Equal(fakeGethBlock))
convertedBlock, err := blockConverter.ToCoreBlock(fakeGethBlock) convertedBlock, err := blockConverter.ToCoreBlock(fakeGethBlock)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
mockBlockRepository.AssertCreateOrUpdateBlockCalledWith(convertedBlock) mockBlockRepository.AssertCreateOrUpdateBlockCalledWith(convertedBlock)

View File

@ -18,7 +18,6 @@ package cold_db
import ( import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -31,9 +30,9 @@ func NewColdDbTransactionConverter() *ColdDbTransactionConverter {
return &ColdDbTransactionConverter{} return &ColdDbTransactionConverter{}
} }
func (cdtc *ColdDbTransactionConverter) ConvertTransactionsToCore(gethBlock *types.Block) ([]core.Transaction, error) { func (cdtc *ColdDbTransactionConverter) ConvertBlockTransactionsToCore(gethBlock *types.Block) ([]core.TransactionModel, error) {
var g errgroup.Group var g errgroup.Group
coreTransactions := make([]core.Transaction, len(gethBlock.Transactions())) coreTransactions := make([]core.TransactionModel, len(gethBlock.Transactions()))
for gethTransactionIndex, gethTransaction := range gethBlock.Transactions() { for gethTransactionIndex, gethTransaction := range gethBlock.Transactions() {
transaction := gethTransaction transaction := gethTransaction
@ -55,6 +54,10 @@ func (cdtc *ColdDbTransactionConverter) ConvertTransactionsToCore(gethBlock *typ
return coreTransactions, nil return coreTransactions, nil
} }
func (cdtc *ColdDbTransactionConverter) ConvertRpcTransactionsToModels(transactions []core.RpcTransaction) ([]core.TransactionModel, error) {
panic("converting transaction indexes to integer not supported for cold import")
}
func getSigner(tx *types.Transaction) types.Signer { func getSigner(tx *types.Transaction) types.Signer {
v, _, _ := tx.RawSignatureValues() v, _, _ := tx.RawSignatureValues()
if v.Sign() != 0 && tx.Protected() { if v.Sign() != 0 && tx.Protected() {
@ -63,9 +66,8 @@ func getSigner(tx *types.Transaction) types.Signer {
return types.HomesteadSigner{} return types.HomesteadSigner{}
} }
func transToCoreTrans(transaction *types.Transaction, from *common.Address) core.Transaction { func transToCoreTrans(transaction *types.Transaction, from *common.Address) core.TransactionModel {
data := hexutil.Encode(transaction.Data()) return core.TransactionModel{
return core.Transaction{
Hash: transaction.Hash().Hex(), Hash: transaction.Hash().Hex(),
Nonce: transaction.Nonce(), Nonce: transaction.Nonce(),
To: strings.ToLower(addressToHex(transaction.To())), To: strings.ToLower(addressToHex(transaction.To())),
@ -73,7 +75,7 @@ func transToCoreTrans(transaction *types.Transaction, from *common.Address) core
GasLimit: transaction.Gas(), GasLimit: transaction.Gas(),
GasPrice: transaction.GasPrice().Int64(), GasPrice: transaction.GasPrice().Int64(),
Value: transaction.Value().String(), Value: transaction.Value().String(),
Data: data, Data: transaction.Data(),
} }
} }

View File

@ -32,7 +32,7 @@ func NewBlockConverter(transactionConverter TransactionConverter) BlockConverter
} }
func (bc BlockConverter) ToCoreBlock(gethBlock *types.Block) (core.Block, error) { func (bc BlockConverter) ToCoreBlock(gethBlock *types.Block) (core.Block, error) {
transactions, err := bc.transactionConverter.ConvertTransactionsToCore(gethBlock) transactions, err := bc.transactionConverter.ConvertBlockTransactionsToCore(gethBlock)
if err != nil { if err != nil {
return core.Block{}, err return core.Block{}, err
} }

View File

@ -260,7 +260,8 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(coreBlock.Transactions)).To(Equal(1)) Expect(len(coreBlock.Transactions)).To(Equal(1))
coreTransaction := coreBlock.Transactions[0] coreTransaction := coreBlock.Transactions[0]
Expect(coreTransaction.Data).To(Equal("0xf7d8c8830000000000000000000000000000000000000000000000000000000000037788000000000000000000000000000000000000000000000000000000000003bd14")) expectedData := common.FromHex("0xf7d8c8830000000000000000000000000000000000000000000000000000000000037788000000000000000000000000000000000000000000000000000000000003bd14")
Expect(coreTransaction.Data).To(Equal(expectedData))
Expect(coreTransaction.To).To(Equal(gethTransaction.To().Hex())) Expect(coreTransaction.To).To(Equal(gethTransaction.To().Hex()))
Expect(coreTransaction.From).To(Equal("0x0000000000000000000000000000000000000123")) Expect(coreTransaction.From).To(Equal("0x0000000000000000000000000000000000000123"))
Expect(coreTransaction.GasLimit).To(Equal(gethTransaction.Gas())) Expect(coreTransaction.GasLimit).To(Equal(gethTransaction.Gas()))

View File

@ -22,5 +22,6 @@ import (
) )
type TransactionConverter interface { type TransactionConverter interface {
ConvertTransactionsToCore(gethBlock *types.Block) ([]core.Transaction, error) ConvertBlockTransactionsToCore(gethBlock *types.Block) ([]core.TransactionModel, error)
ConvertRpcTransactionsToModels(transactions []core.RpcTransaction) ([]core.TransactionModel, error)
} }

View File

@ -0,0 +1,13 @@
package rpc_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestRpc(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Rpc Suite")
}

View File

@ -19,11 +19,13 @@ package rpc
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
"log" "log"
"math/big"
"strings" "strings"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -35,29 +37,75 @@ type RpcTransactionConverter struct {
client core.EthClient client core.EthClient
} }
// raw transaction data, required for generating RLP
type transactionData struct {
AccountNonce uint64
Price *big.Int
GasLimit uint64
Recipient *common.Address `rlp:"nil"` // nil means contract creation
Amount *big.Int
Payload []byte
V *big.Int
R *big.Int
S *big.Int
}
func NewRpcTransactionConverter(client core.EthClient) *RpcTransactionConverter { func NewRpcTransactionConverter(client core.EthClient) *RpcTransactionConverter {
return &RpcTransactionConverter{client: client} return &RpcTransactionConverter{client: client}
} }
func (rtc *RpcTransactionConverter) ConvertTransactionsToCore(gethBlock *types.Block) ([]core.Transaction, error) { func (converter *RpcTransactionConverter) ConvertRpcTransactionsToModels(transactions []core.RpcTransaction) ([]core.TransactionModel, error) {
var results []core.TransactionModel
for _, transaction := range transactions {
txData, convertErr := getTransactionData(transaction)
if convertErr != nil {
return nil, convertErr
}
txRLP, rlpErr := getTransactionRLP(txData)
if rlpErr != nil {
return nil, rlpErr
}
txIndex, txIndexErr := hexToBigInt(transaction.TransactionIndex)
if txIndexErr != nil {
return nil, txIndexErr
}
transactionModel := core.TransactionModel{
Data: transaction.Payload,
From: transaction.From,
GasLimit: txData.GasLimit,
GasPrice: txData.Price.Int64(),
Hash: transaction.Hash,
Nonce: txData.AccountNonce,
Raw: txRLP,
// NOTE: Light Sync transactions don't include receipt; would require separate RPC call
To: transaction.Recipient,
TxIndex: txIndex.Int64(),
Value: txData.Amount.String(),
}
results = append(results, transactionModel)
}
return results, nil
}
func (converter *RpcTransactionConverter) ConvertBlockTransactionsToCore(gethBlock *types.Block) ([]core.TransactionModel, error) {
var g errgroup.Group var g errgroup.Group
coreTransactions := make([]core.Transaction, len(gethBlock.Transactions())) coreTransactions := make([]core.TransactionModel, len(gethBlock.Transactions()))
for gethTransactionIndex, gethTransaction := range gethBlock.Transactions() { for gethTransactionIndex, gethTransaction := range gethBlock.Transactions() {
//https://golang.org/doc/faq#closures_and_goroutines //https://golang.org/doc/faq#closures_and_goroutines
transaction := gethTransaction transaction := gethTransaction
transactionIndex := uint(gethTransactionIndex) transactionIndex := uint(gethTransactionIndex)
g.Go(func() error { g.Go(func() error {
from, err := rtc.client.TransactionSender(context.Background(), transaction, gethBlock.Hash(), transactionIndex) from, err := converter.client.TransactionSender(context.Background(), transaction, gethBlock.Hash(), transactionIndex)
if err != nil { if err != nil {
log.Println("transaction sender: ", err) log.Println("transaction sender: ", err)
return err return err
} }
coreTransaction, convertErr := transToCoreTrans(transaction, &from, int64(gethTransactionIndex)) coreTransaction, convertErr := convertGethTransactionToModel(transaction, &from, int64(gethTransactionIndex))
if convertErr != nil { if convertErr != nil {
return convertErr return convertErr
} }
coreTransaction, err = rtc.appendReceiptToTransaction(coreTransaction) coreTransaction, err = converter.appendReceiptToTransaction(coreTransaction)
if err != nil { if err != nil {
log.Println("receipt: ", err) log.Println("receipt: ", err)
return err return err
@ -73,7 +121,7 @@ func (rtc *RpcTransactionConverter) ConvertTransactionsToCore(gethBlock *types.B
return coreTransactions, nil return coreTransactions, nil
} }
func (rtc *RpcTransactionConverter) appendReceiptToTransaction(transaction core.Transaction) (core.Transaction, error) { func (rtc *RpcTransactionConverter) appendReceiptToTransaction(transaction core.TransactionModel) (core.TransactionModel, error) {
gethReceipt, err := rtc.client.TransactionReceipt(context.Background(), common.HexToHash(transaction.Hash)) gethReceipt, err := rtc.client.TransactionReceipt(context.Background(), common.HexToHash(transaction.Hash))
if err != nil { if err != nil {
return transaction, err return transaction, err
@ -83,15 +131,14 @@ func (rtc *RpcTransactionConverter) appendReceiptToTransaction(transaction core.
return transaction, nil return transaction, nil
} }
func transToCoreTrans(transaction *types.Transaction, from *common.Address, transactionIndex int64) (core.Transaction, error) { func convertGethTransactionToModel(transaction *types.Transaction, from *common.Address, transactionIndex int64) (core.TransactionModel, error) {
data := hexutil.Encode(transaction.Data()) raw := bytes.Buffer{}
var raw bytes.Buffer
encodeErr := transaction.EncodeRLP(&raw) encodeErr := transaction.EncodeRLP(&raw)
if encodeErr != nil { if encodeErr != nil {
return core.Transaction{}, encodeErr return core.TransactionModel{}, encodeErr
} }
return core.Transaction{ return core.TransactionModel{
Data: data, Data: transaction.Data(),
From: strings.ToLower(addressToHex(from)), From: strings.ToLower(addressToHex(from)),
GasLimit: transaction.Gas(), GasLimit: transaction.Gas(),
GasPrice: transaction.GasPrice().Int64(), GasPrice: transaction.GasPrice().Int64(),
@ -104,9 +151,70 @@ func transToCoreTrans(transaction *types.Transaction, from *common.Address, tran
}, nil }, nil
} }
func getTransactionData(transaction core.RpcTransaction) (transactionData, error) {
nonce, nonceErr := hexToBigInt(transaction.Nonce)
if nonceErr != nil {
return transactionData{}, nonceErr
}
gasPrice, gasPriceErr := hexToBigInt(transaction.GasPrice)
if gasPriceErr != nil {
return transactionData{}, gasPriceErr
}
gasLimit, gasLimitErr := hexToBigInt(transaction.GasLimit)
if gasLimitErr != nil {
return transactionData{}, gasLimitErr
}
recipient := common.HexToAddress(transaction.Recipient)
amount, amountErr := hexToBigInt(transaction.Amount)
if amountErr != nil {
return transactionData{}, amountErr
}
v, vErr := hexToBigInt(transaction.V)
if vErr != nil {
return transactionData{}, vErr
}
r, rErr := hexToBigInt(transaction.R)
if rErr != nil {
return transactionData{}, rErr
}
s, sErr := hexToBigInt(transaction.S)
if sErr != nil {
return transactionData{}, sErr
}
return transactionData{
AccountNonce: nonce.Uint64(),
Price: gasPrice,
GasLimit: gasLimit.Uint64(),
Recipient: &recipient,
Amount: amount,
Payload: transaction.Payload,
V: v,
R: r,
S: s,
}, nil
}
func getTransactionRLP(txData transactionData) ([]byte, error) {
transactionRlp := bytes.Buffer{}
encodeErr := rlp.Encode(&transactionRlp, txData)
if encodeErr != nil {
return nil, encodeErr
}
return transactionRlp.Bytes(), nil
}
func addressToHex(to *common.Address) string { func addressToHex(to *common.Address) string {
if to == nil { if to == nil {
return "" return ""
} }
return to.Hex() return to.Hex()
} }
func hexToBigInt(hex string) (*big.Int, error) {
result := big.NewInt(0)
_, scanErr := fmt.Sscan(hex, result)
if scanErr != nil {
return nil, scanErr
}
return result, nil
}

View File

@ -0,0 +1,82 @@
package rpc_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc"
)
var _ = Describe("RPC transaction converter", func() {
It("converts hex fields to integers", func() {
converter := rpc.RpcTransactionConverter{}
rpcTransaction := getFakeRpcTransaction("0x1")
transactionModels, err := converter.ConvertRpcTransactionsToModels([]core.RpcTransaction{rpcTransaction})
Expect(err).NotTo(HaveOccurred())
Expect(len(transactionModels)).To(Equal(1))
Expect(transactionModels[0].GasLimit).To(Equal(uint64(1)))
Expect(transactionModels[0].GasPrice).To(Equal(int64(1)))
Expect(transactionModels[0].Nonce).To(Equal(uint64(1)))
Expect(transactionModels[0].TxIndex).To(Equal(int64(1)))
Expect(transactionModels[0].Value).To(Equal("1"))
})
It("returns error if invalid hex cannot be converted", func() {
converter := rpc.RpcTransactionConverter{}
invalidTransaction := getFakeRpcTransaction("invalid")
_, err := converter.ConvertRpcTransactionsToModels([]core.RpcTransaction{invalidTransaction})
Expect(err).To(HaveOccurred())
})
It("copies RPC transaction hash, from, and to values to model", func() {
converter := rpc.RpcTransactionConverter{}
rpcTransaction := getFakeRpcTransaction("0x1")
transactionModels, err := converter.ConvertRpcTransactionsToModels([]core.RpcTransaction{rpcTransaction})
Expect(err).NotTo(HaveOccurred())
Expect(len(transactionModels)).To(Equal(1))
Expect(transactionModels[0].Hash).To(Equal(rpcTransaction.Hash))
Expect(transactionModels[0].From).To(Equal(rpcTransaction.From))
Expect(transactionModels[0].To).To(Equal(rpcTransaction.Recipient))
})
XIt("derives transaction RLP", func() {
// actual transaction: https://kovan.etherscan.io/tx/0x73aefdf70fc5650e0dd82affbb59d107f12dfabc50a78625b434ea68b7a69ee6
// actual RLP hex: 0x2926af093b6b72e3f10089bde6da0f99b0d4e13354f6f37c8334efc9d7e99a47
})
It("does not include transaction receipt", func() {
converter := rpc.RpcTransactionConverter{}
rpcTransaction := getFakeRpcTransaction("0x1")
transactionModels, err := converter.ConvertRpcTransactionsToModels([]core.RpcTransaction{rpcTransaction})
Expect(err).NotTo(HaveOccurred())
Expect(len(transactionModels)).To(Equal(1))
Expect(transactionModels[0].Receipt).To(Equal(core.Receipt{}))
})
})
func getFakeRpcTransaction(hex string) core.RpcTransaction {
return core.RpcTransaction{
Hash: "0x2",
Amount: hex,
GasLimit: hex,
GasPrice: hex,
Nonce: hex,
From: fakes.FakeAddress.Hex(),
Recipient: fakes.FakeAddress.Hex(),
V: "0x2",
R: "0x2",
S: "0x2",
Payload: nil,
TransactionIndex: hex,
}
}