diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index e5b01ff2..ac9fc6a4 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -10,6 +10,7 @@ type BlockChain interface { ContractDataFetcher GetBlockByNumber(blockNumber int64) (Block, error) GetHeaderByNumber(blockNumber int64) (Header, error) + GetHeaderByNumbers(blockNumbers []int64) ([]Header, error) GetLogs(contract Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]Log, error) GetEthLogsWithCustomQuery(query ethereum.FilterQuery) ([]types.Log, error) LastBlock() *big.Int diff --git a/pkg/core/rpc_client.go b/pkg/core/rpc_client.go index d8b99354..51309d93 100644 --- a/pkg/core/rpc_client.go +++ b/pkg/core/rpc_client.go @@ -1,9 +1,13 @@ package core -import "context" +import ( + "context" + "github.com/vulcanize/vulcanizedb/pkg/geth/client" +) type RpcClient interface { CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error + BatchCall(batch []client.BatchElem) error IpcPath() string SupportedModules() (map[string]string, error) } diff --git a/pkg/fakes/mock_blockchain.go b/pkg/fakes/mock_blockchain.go index 2ce7e3f0..2b7159ee 100644 --- a/pkg/fakes/mock_blockchain.go +++ b/pkg/fakes/mock_blockchain.go @@ -75,6 +75,15 @@ func (chain *MockBlockChain) GetHeaderByNumber(blockNumber int64) (core.Header, return core.Header{BlockNumber: blockNumber}, nil } +func (chain *MockBlockChain) GetHeaderByNumbers(blockNumbers []int64) ([]core.Header, error) { + var headers []core.Header + for _, blockNumber := range blockNumbers { + var header = core.Header{BlockNumber: int64(blockNumber)} + headers = append(headers, header) + } + return headers, nil +} + func (chain *MockBlockChain) GetLogs(contract core.Contract, startingBlockNumber, endingBlockNumber *big.Int) ([]core.Log, error) { return []core.Log{}, nil } diff --git a/pkg/fakes/mock_eth_client.go b/pkg/fakes/mock_eth_client.go index c1e544d3..fd040b58 100644 --- a/pkg/fakes/mock_eth_client.go +++ b/pkg/fakes/mock_eth_client.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/geth/client" ) type MockEthClient struct { @@ -24,12 +25,16 @@ type MockEthClient struct { headerByNumberPassedContext context.Context headerByNumberPassedNumber *big.Int headerByNumberReturnHeader *types.Header + headerByNumbersReturnHeader []*types.Header + headerByNumbersPassedNumber []*big.Int filterLogsErr error filterLogsPassedContext context.Context filterLogsPassedQuery ethereum.FilterQuery filterLogsReturnLogs []types.Log transactionReceipts map[string]*types.Receipt err error + passedBatch []client.BatchElem + passedMethod string transactionSenderErr error transactionReceiptErr error } @@ -55,6 +60,8 @@ func NewMockEthClient() *MockEthClient { filterLogsReturnLogs: nil, transactionReceipts: make(map[string]*types.Receipt), err: nil, + passedBatch: nil, + passedMethod: "123", } } @@ -82,6 +89,10 @@ func (client *MockEthClient) SetHeaderByNumberReturnHeader(header *types.Header) client.headerByNumberReturnHeader = header } +func (client *MockEthClient) SetHeaderByNumbersReturnHeader(headers []*types.Header) { + client.headerByNumbersReturnHeader = headers +} + func (client *MockEthClient) SetFilterLogsErr(err error) { client.filterLogsErr = err } @@ -111,6 +122,13 @@ func (client *MockEthClient) CallContract(ctx context.Context, msg ethereum.Call return client.callContractReturnBytes, client.callContractErr } +func (client *MockEthClient) BatchCall(batch []client.BatchElem) error { + client.passedBatch = batch + client.passedMethod = batch[0].Method + + return nil +} + func (client *MockEthClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { client.blockByNumberPassedContext = ctx client.blockByNumberPassedNumber = number @@ -123,6 +141,11 @@ func (client *MockEthClient) HeaderByNumber(ctx context.Context, number *big.Int return client.headerByNumberReturnHeader, client.headerByNumberErr } +func (client *MockEthClient) HeaderByNumbers(numbers []*big.Int) ([]*types.Header, error) { + client.headerByNumbersPassedNumber = numbers + return client.headerByNumbersReturnHeader, client.headerByNumberErr +} + func (client *MockEthClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { client.filterLogsPassedContext = ctx client.filterLogsPassedQuery = q @@ -156,7 +179,15 @@ func (client *MockEthClient) AssertHeaderByNumberCalledWith(ctx context.Context, Expect(client.headerByNumberPassedNumber).To(Equal(number)) } +func (client *MockEthClient) AssertHeaderByNumbersCalledWith(number []*big.Int) { + Expect(client.headerByNumbersPassedNumber).To(Equal(number)) +} + func (client *MockEthClient) AssertFilterLogsCalledWith(ctx context.Context, q ethereum.FilterQuery) { Expect(client.filterLogsPassedContext).To(Equal(ctx)) Expect(client.filterLogsPassedQuery).To(Equal(q)) } + +func (client *MockEthClient) AssertBatchCalledWith(method string) { + Expect(client.passedMethod).To(Equal(method)) +} diff --git a/pkg/fakes/mock_rpc_client.go b/pkg/fakes/mock_rpc_client.go index 5f120986..8f25ef87 100644 --- a/pkg/fakes/mock_rpc_client.go +++ b/pkg/fakes/mock_rpc_client.go @@ -9,6 +9,7 @@ import ( . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/geth/client" ) type MockRpcClient struct { @@ -18,7 +19,11 @@ type MockRpcClient struct { passedContext context.Context passedMethod string passedResult interface{} + passedBatch []client.BatchElem + lengthOfBatch int returnPOAHeader core.POAHeader + returnPOAHeaders []core.POAHeader + returnPOWHeaders []*types.Header supportedModules map[string]string } @@ -30,6 +35,27 @@ func (client *MockRpcClient) SetIpcPath(ipcPath string) { client.ipcPath = ipcPath } +func (client *MockRpcClient) BatchCall(batch []client.BatchElem) error { + client.passedBatch = batch + client.passedMethod = batch[0].Method + client.lengthOfBatch = len(batch) + + for _, batchElem := range batch { + client.passedContext = context.Background() + client.passedResult = &batchElem.Result + client.passedMethod = batchElem.Method + if p, ok := batchElem.Result.(*types.Header); ok { + *p = types.Header{Number: big.NewInt(100)} + } + if p, ok := batchElem.Result.(*core.POAHeader); ok { + + *p = client.returnPOAHeader + } + } + + return nil +} + func (client *MockRpcClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { client.passedContext = ctx client.passedResult = result @@ -42,14 +68,16 @@ func (client *MockRpcClient) CallContext(ctx context.Context, result interface{} } case "eth_getBlockByNumber": if p, ok := result.(*types.Header); ok { - *p = types.Header{Number: big.NewInt(123)} + *p = types.Header{Number: big.NewInt(100)} } if p, ok := result.(*core.POAHeader); ok { + *p = client.returnPOAHeader } if client.callContextErr != nil { return client.callContextErr } + case "parity_versionInfo": if p, ok := result.(*core.ParityNodeInfo); ok { *p = core.ParityNodeInfo{ @@ -94,8 +122,24 @@ func (client *MockRpcClient) SetReturnPOAHeader(header core.POAHeader) { client.returnPOAHeader = header } +func (client *MockRpcClient) SetReturnPOWHeaders(headers []*types.Header) { + client.returnPOWHeaders = headers +} + +func (client *MockRpcClient) SetReturnPOAHeaders(headers []core.POAHeader) { + client.returnPOAHeaders = headers +} + func (client *MockRpcClient) AssertCallContextCalledWith(ctx context.Context, result interface{}, method string) { Expect(client.passedContext).To(Equal(ctx)) Expect(client.passedResult).To(BeAssignableToTypeOf(result)) Expect(client.passedMethod).To(Equal(method)) } + +func (client *MockRpcClient) AssertBatchCalledWith(method string, lengthOfBatch int) { + Expect(client.lengthOfBatch).To(Equal(lengthOfBatch)) + for _, batch := range client.passedBatch { + Expect(batch.Method).To(Equal(method)) + } + Expect(client.passedMethod).To(Equal(method)) +} diff --git a/pkg/geth/blockchain.go b/pkg/geth/blockchain.go index d749e701..7b9c49dd 100644 --- a/pkg/geth/blockchain.go +++ b/pkg/geth/blockchain.go @@ -6,16 +6,19 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - "golang.org/x/net/context" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/geth/client" vulcCommon "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common" + "golang.org/x/net/context" + "strconv" ) var ErrEmptyHeader = errors.New("empty header returned over RPC") +const MAX_BATCH_SIZE = 100 + type BlockChain struct { blockConverter vulcCommon.BlockConverter ethClient core.EthClient @@ -49,6 +52,13 @@ func (blockChain *BlockChain) GetHeaderByNumber(blockNumber int64) (header core. return blockChain.getPOWHeader(blockNumber) } +func (blockChain *BlockChain) GetHeaderByNumbers(blockNumbers []int64) (header []core.Header, err error) { + if blockChain.node.NetworkID == core.KOVAN_NETWORK_ID { + return blockChain.getPOAHeaders(blockNumbers) + } + return blockChain.getPOWHeaders(blockNumbers) +} + func (blockChain *BlockChain) getPOWHeader(blockNumber int64) (header core.Header, err error) { gethHeader, err := blockChain.ethClient.HeaderByNumber(context.Background(), big.NewInt(blockNumber)) if err != nil { @@ -57,6 +67,44 @@ func (blockChain *BlockChain) getPOWHeader(blockNumber int64) (header core.Heade return blockChain.headerConverter.Convert(gethHeader, gethHeader.Hash().String()) } +func (blockChain *BlockChain) getPOWHeaders(blockNumbers []int64) (headers []core.Header, err error) { + var batch []client.BatchElem + var POWHeaders [MAX_BATCH_SIZE]types.Header + includeTransactions := false + + for index, blockNumber := range blockNumbers { + + if index >= MAX_BATCH_SIZE { + break + } + + blockNumberArg := hexutil.EncodeBig(big.NewInt(blockNumber)) + + batchElem := client.BatchElem{ + Method: "eth_getBlockByNumber", + Result: &POWHeaders[index], + Args: []interface{}{blockNumberArg, includeTransactions}, + } + + batch = append(batch, batchElem) + } + + err = blockChain.rpcClient.BatchCall(batch) + if err != nil { + return headers, err + } + + for _, POWHeader := range POWHeaders { + if POWHeader.Number != nil { + header, _ := blockChain.headerConverter.Convert(&POWHeader, POWHeader.Hash().String()) + + headers = append(headers, header) + } + } + + return headers, err +} + func (blockChain *BlockChain) getPOAHeader(blockNumber int64) (header core.Header, err error) { var POAHeader core.POAHeader blockNumberArg := hexutil.EncodeBig(big.NewInt(blockNumber)) @@ -85,6 +133,61 @@ func (blockChain *BlockChain) getPOAHeader(blockNumber int64) (header core.Heade }, POAHeader.Hash.String()) } +func (blockChain *BlockChain) getPOAHeaders(blockNumbers []int64) (headers []core.Header, err error) { + + var batch []client.BatchElem + var POAHeaders [MAX_BATCH_SIZE]core.POAHeader + includeTransactions := false + + for index, blockNumber := range blockNumbers { + + if index >= MAX_BATCH_SIZE { + break + } + + blockNumberArg := hexutil.EncodeBig(big.NewInt(blockNumber)) + + batchElem := client.BatchElem{ + Method: "eth_getBlockByNumber", + Result: &POAHeaders[index], + Args: []interface{}{blockNumberArg, includeTransactions}, + } + + batch = append(batch, batchElem) + } + + err = blockChain.rpcClient.BatchCall(batch) + if err != nil { + return headers, err + } + + for _, POAHeader := range POAHeaders { + var header core.Header + //Header.Number of the newest block will return nil. + if _, err := strconv.ParseUint(POAHeader.Number.ToInt().String(), 16, 64); err == nil { + header, _ = blockChain.headerConverter.Convert(&types.Header{ + ParentHash: POAHeader.ParentHash, + UncleHash: POAHeader.UncleHash, + Coinbase: POAHeader.Coinbase, + Root: POAHeader.Root, + TxHash: POAHeader.TxHash, + ReceiptHash: POAHeader.ReceiptHash, + Bloom: POAHeader.Bloom, + Difficulty: POAHeader.Difficulty.ToInt(), + Number: POAHeader.Number.ToInt(), + GasLimit: uint64(POAHeader.GasLimit), + GasUsed: uint64(POAHeader.GasUsed), + Time: POAHeader.Time.ToInt(), + Extra: POAHeader.Extra, + }, POAHeader.Hash.String()) + + headers = append(headers, header) + } + } + + return headers, err +} + func (blockChain *BlockChain) GetLogs(contract core.Contract, startingBlockNumber, endingBlockNumber *big.Int) ([]core.Log, error) { if endingBlockNumber == nil { endingBlockNumber = startingBlockNumber diff --git a/pkg/geth/blockchain_test.go b/pkg/geth/blockchain_test.go index db750296..5f81ef78 100644 --- a/pkg/geth/blockchain_test.go +++ b/pkg/geth/blockchain_test.go @@ -5,12 +5,12 @@ import ( "math/big" "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" vulcCore "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/geth" @@ -71,12 +71,21 @@ var _ = Describe("Geth blockchain", func() { Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) }) + + It("fetches headers with multiple blocks", func() { + blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, cold_db.NewColdDbTransactionConverter()) + + _, err := blockChain.GetHeaderByNumbers([]int64{100, 99}) + + Expect(err).NotTo(HaveOccurred()) + mockRpcClient.AssertBatchCalledWith("eth_getBlockByNumber", 2) + }) }) Describe("POA/Kovan", func() { It("fetches header from rpcClient", func() { node.NetworkID = vulcCore.KOVAN_NETWORK_ID - blockNumber := hexutil.Big(*big.NewInt(123)) + blockNumber := hexutil.Big(*big.NewInt(100)) mockRpcClient.SetReturnPOAHeader(vulcCore.POAHeader{Number: &blockNumber}) blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, cold_db.NewColdDbTransactionConverter()) @@ -106,6 +115,18 @@ var _ = Describe("Geth blockchain", func() { Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(geth.ErrEmptyHeader)) }) + + It("returns multiple headers with multiple blocknumbers", func() { + node.NetworkID = vulcCore.KOVAN_NETWORK_ID + blockNumber := hexutil.Big(*big.NewInt(100)) + mockRpcClient.SetReturnPOAHeaders([]vulcCore.POAHeader{{Number: &blockNumber}}) + blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, cold_db.NewColdDbTransactionConverter()) + + _, err := blockChain.GetHeaderByNumbers([]int64{100, 99}) + + Expect(err).NotTo(HaveOccurred()) + mockRpcClient.AssertBatchCalledWith("eth_getBlockByNumber", 2) + }) }) }) diff --git a/pkg/geth/client/rpc_client.go b/pkg/geth/client/rpc_client.go index e8453e87..e2adf7c5 100644 --- a/pkg/geth/client/rpc_client.go +++ b/pkg/geth/client/rpc_client.go @@ -10,6 +10,13 @@ type RpcClient struct { ipcPath string } +type BatchElem struct { + Method string + Args []interface{} + Result interface{} + Error error +} + func NewRpcClient(client *rpc.Client, ipcPath string) RpcClient { return RpcClient{ client: client, @@ -35,3 +42,14 @@ func (client RpcClient) IpcPath() string { func (client RpcClient) SupportedModules() (map[string]string, error) { return client.client.SupportedModules() } + +func (client RpcClient) BatchCall(batch []BatchElem) error { + var rpcBatch []rpc.BatchElem + for index, batchElem := range batch { + rpcBatch[index].Result = batchElem.Result + rpcBatch[index].Method = batchElem.Method + rpcBatch[index].Args = batchElem.Args + rpcBatch[index].Error = batchElem.Error + } + return client.client.BatchCall(rpcBatch) +} diff --git a/pkg/history/populate_headers.go b/pkg/history/populate_headers.go index 77043921..1339e9e0 100644 --- a/pkg/history/populate_headers.go +++ b/pkg/history/populate_headers.go @@ -34,11 +34,8 @@ func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datasto } func RetrieveAndUpdateHeaders(chain core.BlockChain, headerRepository datastore.HeaderRepository, blockNumbers []int64) (int, error) { - for _, blockNumber := range blockNumbers { - header, err := chain.GetHeaderByNumber(blockNumber) - if err != nil { - return 0, err - } + headers, err := chain.GetHeaderByNumbers(blockNumbers) + for _, header := range headers { _, err = headerRepository.CreateOrUpdateHeader(header) if err != nil { if err == repositories.ErrValidHeaderExists { @@ -47,5 +44,6 @@ func RetrieveAndUpdateHeaders(chain core.BlockChain, headerRepository datastore. return 0, err } } + return len(blockNumbers), nil }