From e1cab4fadc060f4f63c3f571042fa9e935860a0c Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 26 Oct 2020 08:58:37 -0500 Subject: [PATCH 1/2] forward cache misses (err and/or empty results) to remote node --- pkg/eth/api.go | 105 ++++++++++++++++++++++++++++++++++++++----- pkg/serve/config.go | 17 +++++-- pkg/serve/service.go | 4 +- 3 files changed, 112 insertions(+), 14 deletions(-) diff --git a/pkg/eth/api.go b/pkg/eth/api.go index cc09bde2..be8f689b 100644 --- a/pkg/eth/api.go +++ b/pkg/eth/api.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" "github.com/sirupsen/logrus" @@ -44,13 +45,20 @@ const APIName = "eth" const APIVersion = "0.0.1" type PublicEthAPI struct { + // Local db backend B *Backend + + // Remote node for forwarding cache misses + rpc *rpc.Client + ethClient *ethclient.Client } // NewPublicEthAPI creates a new PublicEthAPI with the provided underlying Backend -func NewPublicEthAPI(b *Backend) *PublicEthAPI { +func NewPublicEthAPI(b *Backend, client *rpc.Client) *PublicEthAPI { return &PublicEthAPI{ - B: b, + B: b, + rpc: client, + ethClient: ethclient.NewClient(client), } } @@ -64,6 +72,19 @@ func (pea *PublicEthAPI) BlockNumber() hexutil.Uint64 { // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) ([]*types.Log, error) { + logs, err := pea.getLogs(ctx, crit) + if err != nil && pea.rpc != nil { + if arg, err := toFilterArg(crit); err == nil { + var result []*types.Log + if err := pea.rpc.CallContext(ctx, &result, "eth_getLogs", arg); err == nil { + return result, nil + } + } + } + return logs, err +} + +func (pea *PublicEthAPI) getLogs(ctx context.Context, crit ethereum.FilterQuery) ([]*types.Log, error) { // Convert FilterQuery into ReceiptFilter addrStrs := make([]string, len(crit.Addresses)) for i, addr := range crit.Addresses { @@ -162,6 +183,11 @@ func (pea *PublicEthAPI) GetHeaderByNumber(ctx context.Context, number rpc.Block if header != nil && err == nil { return pea.rpcMarshalHeader(header) } + if pea.ethClient != nil { + if header, err := pea.ethClient.HeaderByNumber(ctx, big.NewInt(number.Int64())); header != nil && err == nil { + return pea.rpcMarshalHeader(header) + } + } return nil, err } @@ -175,6 +201,11 @@ func (pea *PublicEthAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockN if block != nil && err == nil { return pea.rpcMarshalBlock(block, true, fullTx) } + if pea.ethClient != nil { + if block, err := pea.ethClient.BlockByNumber(ctx, big.NewInt(number.Int64())); block != nil && err == nil { + return pea.rpcMarshalBlock(block, true, fullTx) + } + } return nil, err } @@ -182,25 +213,38 @@ func (pea *PublicEthAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockN // detail, otherwise only the transaction hash is returned. func (pea *PublicEthAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) { block, err := pea.B.BlockByHash(ctx, hash) - if block != nil { + if block != nil && err == nil { return pea.rpcMarshalBlock(block, true, fullTx) } + if pea.ethClient != nil { + if block, err := pea.ethClient.BlockByHash(ctx, hash); block != nil && err == nil { + return pea.rpcMarshalBlock(block, true, fullTx) + } + } return nil, err } // GetTransactionByHash returns the transaction for the given hash // eth ipld-eth-server cannot currently handle pending/tx_pool txs func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) { - // Try to return an already finalized transaction tx, blockHash, blockNumber, index, err := pea.B.GetTransaction(ctx, hash) - if err != nil { - return nil, err - } - if tx != nil { + if tx != nil && err == nil { return NewRPCTransaction(tx, blockHash, blockNumber, index), nil } - // Transaction unknown, return as such - return nil, nil + if pea.rpc != nil { + if tx, err := pea.remoteGetTransactionByHash(ctx, hash); tx != nil && err == nil { + return tx, nil + } + } + return nil, err +} + +func (pea *PublicEthAPI) remoteGetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) { + var tx *RPCTransaction + if err := pea.rpc.CallContext(ctx, &tx, "eth_getTransactionByHash", hash); err != nil { + return nil, err + } + return tx, nil } // Call executes the given transaction on the state for the given block number. @@ -215,12 +259,25 @@ func (pea *PublicEthAPI) Call(ctx context.Context, args CallArgs, blockNrOrHash accounts = *overrides } result, _, failed, err := DoCall(ctx, pea.B, args, blockNrOrHash, accounts, 5*time.Second, pea.B.Config.RPCGasCap) + if (failed || err != nil) && pea.rpc != nil { + if res, err := pea.remoteCall(ctx, args, blockNrOrHash, overrides); res != nil && err == nil { + return res, nil + } + } if failed && err == nil { return nil, errors.New("eth_call failed without error") } return (hexutil.Bytes)(result), err } +func (pea *PublicEthAPI) remoteCall(ctx context.Context, msg CallArgs, blockNrOrHash rpc.BlockNumberOrHash, overrides *map[common.Address]account) (hexutil.Bytes, error) { + var hex hexutil.Bytes + if err := pea.rpc.CallContext(ctx, &hex, "eth_call", msg, blockNrOrHash, overrides); err != nil { + return nil, err + } + return hex, nil +} + // CallArgs represents the arguments for a call. type CallArgs struct { From *common.Address `json:"from"` @@ -351,3 +408,31 @@ func DoCall(ctx context.Context, b *Backend, args CallArgs, blockNrOrHash rpc.Bl } return res, gas, failed, err } + +func toFilterArg(q ethereum.FilterQuery) (interface{}, error) { + arg := map[string]interface{}{ + "address": q.Addresses, + "topics": q.Topics, + } + if q.BlockHash != nil { + arg["blockHash"] = *q.BlockHash + if q.FromBlock != nil || q.ToBlock != nil { + return nil, fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock") + } + } else { + if q.FromBlock == nil { + arg["fromBlock"] = "0x0" + } else { + arg["fromBlock"] = toBlockNumArg(q.FromBlock) + } + arg["toBlock"] = toBlockNumArg(q.ToBlock) + } + return arg, nil +} + +func toBlockNumArg(number *big.Int) string { + if number == nil { + return "latest" + } + return hexutil.EncodeBig(number) +} diff --git a/pkg/serve/config.go b/pkg/serve/config.go index 95f7d97e..6a9d327a 100644 --- a/pkg/serve/config.go +++ b/pkg/serve/config.go @@ -17,15 +17,18 @@ package serve import ( + "fmt" "math/big" "os" "path/filepath" + "github.com/ethereum/go-ethereum/rpc" + "github.com/vulcanize/ipld-eth-indexer/pkg/shared" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/params" "github.com/spf13/viper" - "github.com/vulcanize/ipld-eth-indexer/pkg/node" "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" "github.com/vulcanize/ipld-eth-indexer/utils" "github.com/vulcanize/ipld-eth-server/pkg/prom" @@ -60,6 +63,7 @@ type Config struct { ChainConfig *params.ChainConfig DefaultSender *common.Address RPCGasCap *big.Int + Client *rpc.Client } // NewConfig is used to initialize a watcher config from a .toml file @@ -67,6 +71,7 @@ type Config struct { func NewConfig() (*Config, error) { c := new(Config) + viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH) viper.BindEnv("server.wsPath", SERVER_WS_PATH) viper.BindEnv("server.ipcPath", SERVER_IPC_PATH) viper.BindEnv("server.httpPath", SERVER_HTTP_PATH) @@ -76,6 +81,13 @@ func NewConfig() (*Config, error) { c.DBConfig.Init() + ethHTTP := viper.GetString("ethereum.httpPath") + nodeInfo, cli, err := shared.GetEthNodeAndClient(fmt.Sprintf("http://%s", ethHTTP)) + if err != nil { + return nil, err + } + c.Client = cli + wsPath := viper.GetString("server.wsPath") if wsPath == "" { wsPath = "127.0.0.1:8080" @@ -96,7 +108,7 @@ func NewConfig() (*Config, error) { } c.HTTPEndpoint = httpPath overrideDBConnConfig(&c.DBConfig) - serveDB := utils.LoadPostgres(c.DBConfig, node.Info{}) + serveDB := utils.LoadPostgres(c.DBConfig, nodeInfo) prom.RegisterDBCollector(c.DBConfig.Name, serveDB.DB) c.DB = &serveDB @@ -112,7 +124,6 @@ func NewConfig() (*Config, error) { } } chainID := viper.GetUint64("ethereum.chainID") - var err error c.ChainConfig, err = eth.ChainConfig(chainID) return c, err } diff --git a/pkg/serve/service.go b/pkg/serve/service.go index 5f72819f..6c005631 100644 --- a/pkg/serve/service.go +++ b/pkg/serve/service.go @@ -76,6 +76,8 @@ type Service struct { serveWg *sync.WaitGroup // config for backend config *eth.Config + // rpc client for forwarding cache misses + client *rpc.Client } // NewServer creates a new Server using an underlying Service struct @@ -139,7 +141,7 @@ func (sap *Service) APIs() []rpc.API { return append(apis, rpc.API{ Namespace: eth.APIName, Version: eth.APIVersion, - Service: eth.NewPublicEthAPI(backend), + Service: eth.NewPublicEthAPI(backend, sap.client), Public: true, }) } From 2d0367fe6cd13e0f445106316585a21880338052 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Tue, 27 Oct 2020 12:42:22 -0500 Subject: [PATCH 2/2] update env --- cmd/serve.go | 10 ++++++++++ environments/example.toml | 7 ++++++- pkg/serve/config.go | 11 +++-------- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index 502f2918..8ccc63c5 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -103,6 +103,11 @@ func init() { serveCmd.PersistentFlags().String("server-http-path", "", "vdb server http path") serveCmd.PersistentFlags().String("server-ipc-path", "", "vdb server ipc path") + serveCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node") + serveCmd.PersistentFlags().String("eth-node-id", "", "eth node id") + serveCmd.PersistentFlags().String("eth-client-name", "Geth", "eth client name") + serveCmd.PersistentFlags().String("eth-genesis-block", "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", "eth genesis block hash") + serveCmd.PersistentFlags().String("eth-network-id", "1", "eth network id") serveCmd.PersistentFlags().String("eth-chain-id", "1", "eth chain id") serveCmd.PersistentFlags().String("eth-default-sender", "", "default sender address") serveCmd.PersistentFlags().String("eth-rpc-gas-cap", "", "rpc gas cap (for eth_Call execution)") @@ -112,6 +117,11 @@ func init() { viper.BindPFlag("server.httpPath", serveCmd.PersistentFlags().Lookup("server-http-path")) viper.BindPFlag("server.ipcPath", serveCmd.PersistentFlags().Lookup("server-ipc-path")) + viper.BindPFlag("ethereum.httpPath", serveCmd.PersistentFlags().Lookup("eth-http-path")) + viper.BindPFlag("ethereum.nodeID", serveCmd.PersistentFlags().Lookup("eth-node-id")) + viper.BindPFlag("ethereum.clientName", serveCmd.PersistentFlags().Lookup("eth-client-name")) + viper.BindPFlag("ethereum.genesisBlock", serveCmd.PersistentFlags().Lookup("eth-genesis-block")) + viper.BindPFlag("ethereum.networkID", serveCmd.PersistentFlags().Lookup("eth-network-id")) viper.BindPFlag("ethereum.chainID", serveCmd.PersistentFlags().Lookup("eth-chain-id")) viper.BindPFlag("ethereum.defaultSender", serveCmd.PersistentFlags().Lookup("eth-default-sender")) viper.BindPFlag("ethereum.rpcGasCap", serveCmd.PersistentFlags().Lookup("eth-rpc-gas-cap")) diff --git a/environments/example.toml b/environments/example.toml index 1cffa983..187bf1b6 100644 --- a/environments/example.toml +++ b/environments/example.toml @@ -16,4 +16,9 @@ [ethereum] chainID = "1" # $ETH_CHAIN_ID defaultSender = "" # $ETH_DEFAULT_SENDER_ADDR - rpcGasCap = "1000000000000" # $ETH_RPC_GAS_CAP \ No newline at end of file + rpcGasCap = "1000000000000" # $ETH_RPC_GAS_CAP + httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH + nodeID = "arch1" # $ETH_NODE_ID + clientName = "Geth" # $ETH_CLIENT_NAME + genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK + networkID = "1" # $ETH_NETWORK_ID \ No newline at end of file diff --git a/pkg/serve/config.go b/pkg/serve/config.go index 6a9d327a..a501f9e7 100644 --- a/pkg/serve/config.go +++ b/pkg/serve/config.go @@ -46,11 +46,8 @@ const ( SERVER_MAX_OPEN_CONNECTIONS = "SERVER_MAX_OPEN_CONNECTIONS" SERVER_MAX_CONN_LIFETIME = "SERVER_MAX_CONN_LIFETIME" - ETH_CHAIN_ID = "ETH_CHAIN_ID" - ETH_DEFAULT_SENDER_ADDR = "ETH_DEFAULT_SENDER_ADDR" - - ETH_RPC_GAS_CAP = "ETH_RPC_GAS_CAP" + ETH_RPC_GAS_CAP = "ETH_RPC_GAS_CAP" ) // Config struct @@ -71,11 +68,10 @@ type Config struct { func NewConfig() (*Config, error) { c := new(Config) - viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH) viper.BindEnv("server.wsPath", SERVER_WS_PATH) viper.BindEnv("server.ipcPath", SERVER_IPC_PATH) viper.BindEnv("server.httpPath", SERVER_HTTP_PATH) - viper.BindEnv("ethereum.chainID", ETH_CHAIN_ID) + viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH) viper.BindEnv("ethereum.defaultSender", ETH_DEFAULT_SENDER_ADDR) viper.BindEnv("ethereum.rpcGasCap", ETH_RPC_GAS_CAP) @@ -123,8 +119,7 @@ func NewConfig() (*Config, error) { c.RPCGasCap = rpcGasCap } } - chainID := viper.GetUint64("ethereum.chainID") - c.ChainConfig, err = eth.ChainConfig(chainID) + c.ChainConfig, err = eth.ChainConfig(nodeInfo.ChainID) return c, err }