forward cache misses (err and/or empty results) to remote node

This commit is contained in:
Ian Norden 2020-10-26 08:58:37 -05:00
parent 0a8b54d366
commit e1cab4fadc
3 changed files with 112 additions and 14 deletions

View File

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -44,13 +45,20 @@ const APIName = "eth"
const APIVersion = "0.0.1" const APIVersion = "0.0.1"
type PublicEthAPI struct { type PublicEthAPI struct {
// Local db backend
B *Backend B *Backend
// Remote node for forwarding cache misses
rpc *rpc.Client
ethClient *ethclient.Client
} }
// NewPublicEthAPI creates a new PublicEthAPI with the provided underlying Backend // NewPublicEthAPI creates a new PublicEthAPI with the provided underlying Backend
func NewPublicEthAPI(b *Backend) *PublicEthAPI { func NewPublicEthAPI(b *Backend, client *rpc.Client) *PublicEthAPI {
return &PublicEthAPI{ return &PublicEthAPI{
B: b, B: b,
rpc: client,
ethClient: ethclient.NewClient(client),
} }
} }
@ -64,6 +72,19 @@ func (pea *PublicEthAPI) BlockNumber() hexutil.Uint64 {
// //
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) ([]*types.Log, error) { func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) ([]*types.Log, error) {
logs, err := pea.getLogs(ctx, crit)
if err != nil && pea.rpc != nil {
if arg, err := toFilterArg(crit); err == nil {
var result []*types.Log
if err := pea.rpc.CallContext(ctx, &result, "eth_getLogs", arg); err == nil {
return result, nil
}
}
}
return logs, err
}
func (pea *PublicEthAPI) getLogs(ctx context.Context, crit ethereum.FilterQuery) ([]*types.Log, error) {
// Convert FilterQuery into ReceiptFilter // Convert FilterQuery into ReceiptFilter
addrStrs := make([]string, len(crit.Addresses)) addrStrs := make([]string, len(crit.Addresses))
for i, addr := range crit.Addresses { for i, addr := range crit.Addresses {
@ -162,6 +183,11 @@ func (pea *PublicEthAPI) GetHeaderByNumber(ctx context.Context, number rpc.Block
if header != nil && err == nil { if header != nil && err == nil {
return pea.rpcMarshalHeader(header) return pea.rpcMarshalHeader(header)
} }
if pea.ethClient != nil {
if header, err := pea.ethClient.HeaderByNumber(ctx, big.NewInt(number.Int64())); header != nil && err == nil {
return pea.rpcMarshalHeader(header)
}
}
return nil, err return nil, err
} }
@ -175,6 +201,11 @@ func (pea *PublicEthAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockN
if block != nil && err == nil { if block != nil && err == nil {
return pea.rpcMarshalBlock(block, true, fullTx) return pea.rpcMarshalBlock(block, true, fullTx)
} }
if pea.ethClient != nil {
if block, err := pea.ethClient.BlockByNumber(ctx, big.NewInt(number.Int64())); block != nil && err == nil {
return pea.rpcMarshalBlock(block, true, fullTx)
}
}
return nil, err return nil, err
} }
@ -182,25 +213,38 @@ func (pea *PublicEthAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockN
// detail, otherwise only the transaction hash is returned. // detail, otherwise only the transaction hash is returned.
func (pea *PublicEthAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) { func (pea *PublicEthAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) {
block, err := pea.B.BlockByHash(ctx, hash) block, err := pea.B.BlockByHash(ctx, hash)
if block != nil { if block != nil && err == nil {
return pea.rpcMarshalBlock(block, true, fullTx) return pea.rpcMarshalBlock(block, true, fullTx)
} }
if pea.ethClient != nil {
if block, err := pea.ethClient.BlockByHash(ctx, hash); block != nil && err == nil {
return pea.rpcMarshalBlock(block, true, fullTx)
}
}
return nil, err return nil, err
} }
// GetTransactionByHash returns the transaction for the given hash // GetTransactionByHash returns the transaction for the given hash
// eth ipld-eth-server cannot currently handle pending/tx_pool txs // eth ipld-eth-server cannot currently handle pending/tx_pool txs
func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) { func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) {
// Try to return an already finalized transaction
tx, blockHash, blockNumber, index, err := pea.B.GetTransaction(ctx, hash) tx, blockHash, blockNumber, index, err := pea.B.GetTransaction(ctx, hash)
if err != nil { if tx != nil && err == nil {
return nil, err
}
if tx != nil {
return NewRPCTransaction(tx, blockHash, blockNumber, index), nil return NewRPCTransaction(tx, blockHash, blockNumber, index), nil
} }
// Transaction unknown, return as such if pea.rpc != nil {
return nil, nil if tx, err := pea.remoteGetTransactionByHash(ctx, hash); tx != nil && err == nil {
return tx, nil
}
}
return nil, err
}
func (pea *PublicEthAPI) remoteGetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) {
var tx *RPCTransaction
if err := pea.rpc.CallContext(ctx, &tx, "eth_getTransactionByHash", hash); err != nil {
return nil, err
}
return tx, nil
} }
// Call executes the given transaction on the state for the given block number. // Call executes the given transaction on the state for the given block number.
@ -215,12 +259,25 @@ func (pea *PublicEthAPI) Call(ctx context.Context, args CallArgs, blockNrOrHash
accounts = *overrides accounts = *overrides
} }
result, _, failed, err := DoCall(ctx, pea.B, args, blockNrOrHash, accounts, 5*time.Second, pea.B.Config.RPCGasCap) result, _, failed, err := DoCall(ctx, pea.B, args, blockNrOrHash, accounts, 5*time.Second, pea.B.Config.RPCGasCap)
if (failed || err != nil) && pea.rpc != nil {
if res, err := pea.remoteCall(ctx, args, blockNrOrHash, overrides); res != nil && err == nil {
return res, nil
}
}
if failed && err == nil { if failed && err == nil {
return nil, errors.New("eth_call failed without error") return nil, errors.New("eth_call failed without error")
} }
return (hexutil.Bytes)(result), err return (hexutil.Bytes)(result), err
} }
func (pea *PublicEthAPI) remoteCall(ctx context.Context, msg CallArgs, blockNrOrHash rpc.BlockNumberOrHash, overrides *map[common.Address]account) (hexutil.Bytes, error) {
var hex hexutil.Bytes
if err := pea.rpc.CallContext(ctx, &hex, "eth_call", msg, blockNrOrHash, overrides); err != nil {
return nil, err
}
return hex, nil
}
// CallArgs represents the arguments for a call. // CallArgs represents the arguments for a call.
type CallArgs struct { type CallArgs struct {
From *common.Address `json:"from"` From *common.Address `json:"from"`
@ -351,3 +408,31 @@ func DoCall(ctx context.Context, b *Backend, args CallArgs, blockNrOrHash rpc.Bl
} }
return res, gas, failed, err return res, gas, failed, err
} }
func toFilterArg(q ethereum.FilterQuery) (interface{}, error) {
arg := map[string]interface{}{
"address": q.Addresses,
"topics": q.Topics,
}
if q.BlockHash != nil {
arg["blockHash"] = *q.BlockHash
if q.FromBlock != nil || q.ToBlock != nil {
return nil, fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock")
}
} else {
if q.FromBlock == nil {
arg["fromBlock"] = "0x0"
} else {
arg["fromBlock"] = toBlockNumArg(q.FromBlock)
}
arg["toBlock"] = toBlockNumArg(q.ToBlock)
}
return arg, nil
}
func toBlockNumArg(number *big.Int) string {
if number == nil {
return "latest"
}
return hexutil.EncodeBig(number)
}

View File

@ -17,15 +17,18 @@
package serve package serve
import ( import (
"fmt"
"math/big" "math/big"
"os" "os"
"path/filepath" "path/filepath"
"github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/ipld-eth-indexer/pkg/shared"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/vulcanize/ipld-eth-indexer/pkg/node"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres" "github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
"github.com/vulcanize/ipld-eth-indexer/utils" "github.com/vulcanize/ipld-eth-indexer/utils"
"github.com/vulcanize/ipld-eth-server/pkg/prom" "github.com/vulcanize/ipld-eth-server/pkg/prom"
@ -60,6 +63,7 @@ type Config struct {
ChainConfig *params.ChainConfig ChainConfig *params.ChainConfig
DefaultSender *common.Address DefaultSender *common.Address
RPCGasCap *big.Int RPCGasCap *big.Int
Client *rpc.Client
} }
// NewConfig is used to initialize a watcher config from a .toml file // NewConfig is used to initialize a watcher config from a .toml file
@ -67,6 +71,7 @@ type Config struct {
func NewConfig() (*Config, error) { func NewConfig() (*Config, error) {
c := new(Config) c := new(Config)
viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH)
viper.BindEnv("server.wsPath", SERVER_WS_PATH) viper.BindEnv("server.wsPath", SERVER_WS_PATH)
viper.BindEnv("server.ipcPath", SERVER_IPC_PATH) viper.BindEnv("server.ipcPath", SERVER_IPC_PATH)
viper.BindEnv("server.httpPath", SERVER_HTTP_PATH) viper.BindEnv("server.httpPath", SERVER_HTTP_PATH)
@ -76,6 +81,13 @@ func NewConfig() (*Config, error) {
c.DBConfig.Init() c.DBConfig.Init()
ethHTTP := viper.GetString("ethereum.httpPath")
nodeInfo, cli, err := shared.GetEthNodeAndClient(fmt.Sprintf("http://%s", ethHTTP))
if err != nil {
return nil, err
}
c.Client = cli
wsPath := viper.GetString("server.wsPath") wsPath := viper.GetString("server.wsPath")
if wsPath == "" { if wsPath == "" {
wsPath = "127.0.0.1:8080" wsPath = "127.0.0.1:8080"
@ -96,7 +108,7 @@ func NewConfig() (*Config, error) {
} }
c.HTTPEndpoint = httpPath c.HTTPEndpoint = httpPath
overrideDBConnConfig(&c.DBConfig) overrideDBConnConfig(&c.DBConfig)
serveDB := utils.LoadPostgres(c.DBConfig, node.Info{}) serveDB := utils.LoadPostgres(c.DBConfig, nodeInfo)
prom.RegisterDBCollector(c.DBConfig.Name, serveDB.DB) prom.RegisterDBCollector(c.DBConfig.Name, serveDB.DB)
c.DB = &serveDB c.DB = &serveDB
@ -112,7 +124,6 @@ func NewConfig() (*Config, error) {
} }
} }
chainID := viper.GetUint64("ethereum.chainID") chainID := viper.GetUint64("ethereum.chainID")
var err error
c.ChainConfig, err = eth.ChainConfig(chainID) c.ChainConfig, err = eth.ChainConfig(chainID)
return c, err return c, err
} }

View File

@ -76,6 +76,8 @@ type Service struct {
serveWg *sync.WaitGroup serveWg *sync.WaitGroup
// config for backend // config for backend
config *eth.Config config *eth.Config
// rpc client for forwarding cache misses
client *rpc.Client
} }
// NewServer creates a new Server using an underlying Service struct // NewServer creates a new Server using an underlying Service struct
@ -139,7 +141,7 @@ func (sap *Service) APIs() []rpc.API {
return append(apis, rpc.API{ return append(apis, rpc.API{
Namespace: eth.APIName, Namespace: eth.APIName,
Version: eth.APIVersion, Version: eth.APIVersion,
Service: eth.NewPublicEthAPI(backend), Service: eth.NewPublicEthAPI(backend, sap.client),
Public: true, Public: true,
}) })
} }