Merge pull request #134 from 8thlight/batch-historical-headers

VDB-303 Batch headers by 100
This commit is contained in:
Takayuki Goto 2019-01-21 15:29:08 -06:00 committed by GitHub
commit 005576572f
9 changed files with 241 additions and 12 deletions

View File

@ -10,6 +10,7 @@ type BlockChain interface {
ContractDataFetcher
GetBlockByNumber(blockNumber int64) (Block, error)
GetHeaderByNumber(blockNumber int64) (Header, error)
GetHeaderByNumbers(blockNumbers []int64) ([]Header, error)
GetLogs(contract Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]Log, error)
GetEthLogsWithCustomQuery(query ethereum.FilterQuery) ([]types.Log, error)
LastBlock() *big.Int

View File

@ -1,9 +1,13 @@
package core
import "context"
import (
"context"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
)
type RpcClient interface {
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
BatchCall(batch []client.BatchElem) error
IpcPath() string
SupportedModules() (map[string]string, error)
}

View File

@ -75,6 +75,15 @@ func (chain *MockBlockChain) GetHeaderByNumber(blockNumber int64) (core.Header,
return core.Header{BlockNumber: blockNumber}, nil
}
func (chain *MockBlockChain) GetHeaderByNumbers(blockNumbers []int64) ([]core.Header, error) {
var headers []core.Header
for _, blockNumber := range blockNumbers {
var header = core.Header{BlockNumber: int64(blockNumber)}
headers = append(headers, header)
}
return headers, nil
}
func (chain *MockBlockChain) GetLogs(contract core.Contract, startingBlockNumber, endingBlockNumber *big.Int) ([]core.Log, error) {
return []core.Log{}, nil
}

View File

@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
)
type MockEthClient struct {
@ -24,12 +25,16 @@ type MockEthClient struct {
headerByNumberPassedContext context.Context
headerByNumberPassedNumber *big.Int
headerByNumberReturnHeader *types.Header
headerByNumbersReturnHeader []*types.Header
headerByNumbersPassedNumber []*big.Int
filterLogsErr error
filterLogsPassedContext context.Context
filterLogsPassedQuery ethereum.FilterQuery
filterLogsReturnLogs []types.Log
transactionReceipts map[string]*types.Receipt
err error
passedBatch []client.BatchElem
passedMethod string
transactionSenderErr error
transactionReceiptErr error
}
@ -55,6 +60,8 @@ func NewMockEthClient() *MockEthClient {
filterLogsReturnLogs: nil,
transactionReceipts: make(map[string]*types.Receipt),
err: nil,
passedBatch: nil,
passedMethod: "123",
}
}
@ -82,6 +89,10 @@ func (client *MockEthClient) SetHeaderByNumberReturnHeader(header *types.Header)
client.headerByNumberReturnHeader = header
}
func (client *MockEthClient) SetHeaderByNumbersReturnHeader(headers []*types.Header) {
client.headerByNumbersReturnHeader = headers
}
func (client *MockEthClient) SetFilterLogsErr(err error) {
client.filterLogsErr = err
}
@ -111,6 +122,13 @@ func (client *MockEthClient) CallContract(ctx context.Context, msg ethereum.Call
return client.callContractReturnBytes, client.callContractErr
}
func (client *MockEthClient) BatchCall(batch []client.BatchElem) error {
client.passedBatch = batch
client.passedMethod = batch[0].Method
return nil
}
func (client *MockEthClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
client.blockByNumberPassedContext = ctx
client.blockByNumberPassedNumber = number
@ -123,6 +141,11 @@ func (client *MockEthClient) HeaderByNumber(ctx context.Context, number *big.Int
return client.headerByNumberReturnHeader, client.headerByNumberErr
}
func (client *MockEthClient) HeaderByNumbers(numbers []*big.Int) ([]*types.Header, error) {
client.headerByNumbersPassedNumber = numbers
return client.headerByNumbersReturnHeader, client.headerByNumberErr
}
func (client *MockEthClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
client.filterLogsPassedContext = ctx
client.filterLogsPassedQuery = q
@ -156,7 +179,15 @@ func (client *MockEthClient) AssertHeaderByNumberCalledWith(ctx context.Context,
Expect(client.headerByNumberPassedNumber).To(Equal(number))
}
func (client *MockEthClient) AssertHeaderByNumbersCalledWith(number []*big.Int) {
Expect(client.headerByNumbersPassedNumber).To(Equal(number))
}
func (client *MockEthClient) AssertFilterLogsCalledWith(ctx context.Context, q ethereum.FilterQuery) {
Expect(client.filterLogsPassedContext).To(Equal(ctx))
Expect(client.filterLogsPassedQuery).To(Equal(q))
}
func (client *MockEthClient) AssertBatchCalledWith(method string) {
Expect(client.passedMethod).To(Equal(method))
}

View File

@ -9,6 +9,7 @@ import (
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
)
type MockRpcClient struct {
@ -18,7 +19,11 @@ type MockRpcClient struct {
passedContext context.Context
passedMethod string
passedResult interface{}
passedBatch []client.BatchElem
lengthOfBatch int
returnPOAHeader core.POAHeader
returnPOAHeaders []core.POAHeader
returnPOWHeaders []*types.Header
supportedModules map[string]string
}
@ -30,6 +35,27 @@ func (client *MockRpcClient) SetIpcPath(ipcPath string) {
client.ipcPath = ipcPath
}
func (client *MockRpcClient) BatchCall(batch []client.BatchElem) error {
client.passedBatch = batch
client.passedMethod = batch[0].Method
client.lengthOfBatch = len(batch)
for _, batchElem := range batch {
client.passedContext = context.Background()
client.passedResult = &batchElem.Result
client.passedMethod = batchElem.Method
if p, ok := batchElem.Result.(*types.Header); ok {
*p = types.Header{Number: big.NewInt(100)}
}
if p, ok := batchElem.Result.(*core.POAHeader); ok {
*p = client.returnPOAHeader
}
}
return nil
}
func (client *MockRpcClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
client.passedContext = ctx
client.passedResult = result
@ -42,14 +68,16 @@ func (client *MockRpcClient) CallContext(ctx context.Context, result interface{}
}
case "eth_getBlockByNumber":
if p, ok := result.(*types.Header); ok {
*p = types.Header{Number: big.NewInt(123)}
*p = types.Header{Number: big.NewInt(100)}
}
if p, ok := result.(*core.POAHeader); ok {
*p = client.returnPOAHeader
}
if client.callContextErr != nil {
return client.callContextErr
}
case "parity_versionInfo":
if p, ok := result.(*core.ParityNodeInfo); ok {
*p = core.ParityNodeInfo{
@ -94,8 +122,24 @@ func (client *MockRpcClient) SetReturnPOAHeader(header core.POAHeader) {
client.returnPOAHeader = header
}
func (client *MockRpcClient) SetReturnPOWHeaders(headers []*types.Header) {
client.returnPOWHeaders = headers
}
func (client *MockRpcClient) SetReturnPOAHeaders(headers []core.POAHeader) {
client.returnPOAHeaders = headers
}
func (client *MockRpcClient) AssertCallContextCalledWith(ctx context.Context, result interface{}, method string) {
Expect(client.passedContext).To(Equal(ctx))
Expect(client.passedResult).To(BeAssignableToTypeOf(result))
Expect(client.passedMethod).To(Equal(method))
}
func (client *MockRpcClient) AssertBatchCalledWith(method string, lengthOfBatch int) {
Expect(client.lengthOfBatch).To(Equal(lengthOfBatch))
for _, batch := range client.passedBatch {
Expect(batch.Method).To(Equal(method))
}
Expect(client.passedMethod).To(Equal(method))
}

View File

@ -6,16 +6,19 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"golang.org/x/net/context"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
vulcCommon "github.com/vulcanize/vulcanizedb/pkg/geth/converters/common"
"golang.org/x/net/context"
"strconv"
)
var ErrEmptyHeader = errors.New("empty header returned over RPC")
const MAX_BATCH_SIZE = 100
type BlockChain struct {
blockConverter vulcCommon.BlockConverter
ethClient core.EthClient
@ -49,6 +52,13 @@ func (blockChain *BlockChain) GetHeaderByNumber(blockNumber int64) (header core.
return blockChain.getPOWHeader(blockNumber)
}
func (blockChain *BlockChain) GetHeaderByNumbers(blockNumbers []int64) (header []core.Header, err error) {
if blockChain.node.NetworkID == core.KOVAN_NETWORK_ID {
return blockChain.getPOAHeaders(blockNumbers)
}
return blockChain.getPOWHeaders(blockNumbers)
}
func (blockChain *BlockChain) getPOWHeader(blockNumber int64) (header core.Header, err error) {
gethHeader, err := blockChain.ethClient.HeaderByNumber(context.Background(), big.NewInt(blockNumber))
if err != nil {
@ -57,6 +67,44 @@ func (blockChain *BlockChain) getPOWHeader(blockNumber int64) (header core.Heade
return blockChain.headerConverter.Convert(gethHeader, gethHeader.Hash().String())
}
func (blockChain *BlockChain) getPOWHeaders(blockNumbers []int64) (headers []core.Header, err error) {
var batch []client.BatchElem
var POWHeaders [MAX_BATCH_SIZE]types.Header
includeTransactions := false
for index, blockNumber := range blockNumbers {
if index >= MAX_BATCH_SIZE {
break
}
blockNumberArg := hexutil.EncodeBig(big.NewInt(blockNumber))
batchElem := client.BatchElem{
Method: "eth_getBlockByNumber",
Result: &POWHeaders[index],
Args: []interface{}{blockNumberArg, includeTransactions},
}
batch = append(batch, batchElem)
}
err = blockChain.rpcClient.BatchCall(batch)
if err != nil {
return headers, err
}
for _, POWHeader := range POWHeaders {
if POWHeader.Number != nil {
header, _ := blockChain.headerConverter.Convert(&POWHeader, POWHeader.Hash().String())
headers = append(headers, header)
}
}
return headers, err
}
func (blockChain *BlockChain) getPOAHeader(blockNumber int64) (header core.Header, err error) {
var POAHeader core.POAHeader
blockNumberArg := hexutil.EncodeBig(big.NewInt(blockNumber))
@ -85,6 +133,61 @@ func (blockChain *BlockChain) getPOAHeader(blockNumber int64) (header core.Heade
}, POAHeader.Hash.String())
}
func (blockChain *BlockChain) getPOAHeaders(blockNumbers []int64) (headers []core.Header, err error) {
var batch []client.BatchElem
var POAHeaders [MAX_BATCH_SIZE]core.POAHeader
includeTransactions := false
for index, blockNumber := range blockNumbers {
if index >= MAX_BATCH_SIZE {
break
}
blockNumberArg := hexutil.EncodeBig(big.NewInt(blockNumber))
batchElem := client.BatchElem{
Method: "eth_getBlockByNumber",
Result: &POAHeaders[index],
Args: []interface{}{blockNumberArg, includeTransactions},
}
batch = append(batch, batchElem)
}
err = blockChain.rpcClient.BatchCall(batch)
if err != nil {
return headers, err
}
for _, POAHeader := range POAHeaders {
var header core.Header
//Header.Number of the newest block will return nil.
if _, err := strconv.ParseUint(POAHeader.Number.ToInt().String(), 16, 64); err == nil {
header, _ = blockChain.headerConverter.Convert(&types.Header{
ParentHash: POAHeader.ParentHash,
UncleHash: POAHeader.UncleHash,
Coinbase: POAHeader.Coinbase,
Root: POAHeader.Root,
TxHash: POAHeader.TxHash,
ReceiptHash: POAHeader.ReceiptHash,
Bloom: POAHeader.Bloom,
Difficulty: POAHeader.Difficulty.ToInt(),
Number: POAHeader.Number.ToInt(),
GasLimit: uint64(POAHeader.GasLimit),
GasUsed: uint64(POAHeader.GasUsed),
Time: POAHeader.Time.ToInt(),
Extra: POAHeader.Extra,
}, POAHeader.Hash.String())
headers = append(headers, header)
}
}
return headers, err
}
func (blockChain *BlockChain) GetLogs(contract core.Contract, startingBlockNumber, endingBlockNumber *big.Int) ([]core.Log, error) {
if endingBlockNumber == nil {
endingBlockNumber = startingBlockNumber

View File

@ -5,12 +5,12 @@ import (
"math/big"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
vulcCore "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/geth"
@ -71,12 +71,21 @@ var _ = Describe("Geth blockchain", func() {
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
It("fetches headers with multiple blocks", func() {
blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, cold_db.NewColdDbTransactionConverter())
_, err := blockChain.GetHeaderByNumbers([]int64{100, 99})
Expect(err).NotTo(HaveOccurred())
mockRpcClient.AssertBatchCalledWith("eth_getBlockByNumber", 2)
})
})
Describe("POA/Kovan", func() {
It("fetches header from rpcClient", func() {
node.NetworkID = vulcCore.KOVAN_NETWORK_ID
blockNumber := hexutil.Big(*big.NewInt(123))
blockNumber := hexutil.Big(*big.NewInt(100))
mockRpcClient.SetReturnPOAHeader(vulcCore.POAHeader{Number: &blockNumber})
blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, cold_db.NewColdDbTransactionConverter())
@ -106,6 +115,18 @@ var _ = Describe("Geth blockchain", func() {
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(geth.ErrEmptyHeader))
})
It("returns multiple headers with multiple blocknumbers", func() {
node.NetworkID = vulcCore.KOVAN_NETWORK_ID
blockNumber := hexutil.Big(*big.NewInt(100))
mockRpcClient.SetReturnPOAHeaders([]vulcCore.POAHeader{{Number: &blockNumber}})
blockChain = geth.NewBlockChain(mockClient, mockRpcClient, node, cold_db.NewColdDbTransactionConverter())
_, err := blockChain.GetHeaderByNumbers([]int64{100, 99})
Expect(err).NotTo(HaveOccurred())
mockRpcClient.AssertBatchCalledWith("eth_getBlockByNumber", 2)
})
})
})

View File

@ -10,6 +10,13 @@ type RpcClient struct {
ipcPath string
}
type BatchElem struct {
Method string
Args []interface{}
Result interface{}
Error error
}
func NewRpcClient(client *rpc.Client, ipcPath string) RpcClient {
return RpcClient{
client: client,
@ -35,3 +42,14 @@ func (client RpcClient) IpcPath() string {
func (client RpcClient) SupportedModules() (map[string]string, error) {
return client.client.SupportedModules()
}
func (client RpcClient) BatchCall(batch []BatchElem) error {
var rpcBatch []rpc.BatchElem
for index, batchElem := range batch {
rpcBatch[index].Result = batchElem.Result
rpcBatch[index].Method = batchElem.Method
rpcBatch[index].Args = batchElem.Args
rpcBatch[index].Error = batchElem.Error
}
return client.client.BatchCall(rpcBatch)
}

View File

@ -34,11 +34,8 @@ func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datasto
}
func RetrieveAndUpdateHeaders(chain core.BlockChain, headerRepository datastore.HeaderRepository, blockNumbers []int64) (int, error) {
for _, blockNumber := range blockNumbers {
header, err := chain.GetHeaderByNumber(blockNumber)
if err != nil {
return 0, err
}
headers, err := chain.GetHeaderByNumbers(blockNumbers)
for _, header := range headers {
_, err = headerRepository.CreateOrUpdateHeader(header)
if err != nil {
if err == repositories.ErrValidHeaderExists {
@ -47,5 +44,6 @@ func RetrieveAndUpdateHeaders(chain core.BlockChain, headerRepository datastore.
return 0, err
}
}
return len(blockNumbers), nil
}