From e1cab4fadc060f4f63c3f571042fa9e935860a0c Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 26 Oct 2020 08:58:37 -0500 Subject: [PATCH] forward cache misses (err and/or empty results) to remote node --- pkg/eth/api.go | 105 ++++++++++++++++++++++++++++++++++++++----- pkg/serve/config.go | 17 +++++-- pkg/serve/service.go | 4 +- 3 files changed, 112 insertions(+), 14 deletions(-) diff --git a/pkg/eth/api.go b/pkg/eth/api.go index cc09bde2..be8f689b 100644 --- a/pkg/eth/api.go +++ b/pkg/eth/api.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" "github.com/sirupsen/logrus" @@ -44,13 +45,20 @@ const APIName = "eth" const APIVersion = "0.0.1" type PublicEthAPI struct { + // Local db backend B *Backend + + // Remote node for forwarding cache misses + rpc *rpc.Client + ethClient *ethclient.Client } // NewPublicEthAPI creates a new PublicEthAPI with the provided underlying Backend -func NewPublicEthAPI(b *Backend) *PublicEthAPI { +func NewPublicEthAPI(b *Backend, client *rpc.Client) *PublicEthAPI { return &PublicEthAPI{ - B: b, + B: b, + rpc: client, + ethClient: ethclient.NewClient(client), } } @@ -64,6 +72,19 @@ func (pea *PublicEthAPI) BlockNumber() hexutil.Uint64 { // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) ([]*types.Log, error) { + logs, err := pea.getLogs(ctx, crit) + if err != nil && pea.rpc != nil { + if arg, err := toFilterArg(crit); err == nil { + var result []*types.Log + if err := pea.rpc.CallContext(ctx, &result, "eth_getLogs", arg); err == nil { + return result, nil + } + } + } + return logs, err +} + +func (pea *PublicEthAPI) getLogs(ctx context.Context, crit ethereum.FilterQuery) ([]*types.Log, error) { // Convert FilterQuery into ReceiptFilter addrStrs := make([]string, len(crit.Addresses)) for i, addr := range crit.Addresses { @@ -162,6 +183,11 @@ func (pea *PublicEthAPI) GetHeaderByNumber(ctx context.Context, number rpc.Block if header != nil && err == nil { return pea.rpcMarshalHeader(header) } + if pea.ethClient != nil { + if header, err := pea.ethClient.HeaderByNumber(ctx, big.NewInt(number.Int64())); header != nil && err == nil { + return pea.rpcMarshalHeader(header) + } + } return nil, err } @@ -175,6 +201,11 @@ func (pea *PublicEthAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockN if block != nil && err == nil { return pea.rpcMarshalBlock(block, true, fullTx) } + if pea.ethClient != nil { + if block, err := pea.ethClient.BlockByNumber(ctx, big.NewInt(number.Int64())); block != nil && err == nil { + return pea.rpcMarshalBlock(block, true, fullTx) + } + } return nil, err } @@ -182,25 +213,38 @@ func (pea *PublicEthAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockN // detail, otherwise only the transaction hash is returned. func (pea *PublicEthAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) { block, err := pea.B.BlockByHash(ctx, hash) - if block != nil { + if block != nil && err == nil { return pea.rpcMarshalBlock(block, true, fullTx) } + if pea.ethClient != nil { + if block, err := pea.ethClient.BlockByHash(ctx, hash); block != nil && err == nil { + return pea.rpcMarshalBlock(block, true, fullTx) + } + } return nil, err } // GetTransactionByHash returns the transaction for the given hash // eth ipld-eth-server cannot currently handle pending/tx_pool txs func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) { - // Try to return an already finalized transaction tx, blockHash, blockNumber, index, err := pea.B.GetTransaction(ctx, hash) - if err != nil { - return nil, err - } - if tx != nil { + if tx != nil && err == nil { return NewRPCTransaction(tx, blockHash, blockNumber, index), nil } - // Transaction unknown, return as such - return nil, nil + if pea.rpc != nil { + if tx, err := pea.remoteGetTransactionByHash(ctx, hash); tx != nil && err == nil { + return tx, nil + } + } + return nil, err +} + +func (pea *PublicEthAPI) remoteGetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) { + var tx *RPCTransaction + if err := pea.rpc.CallContext(ctx, &tx, "eth_getTransactionByHash", hash); err != nil { + return nil, err + } + return tx, nil } // Call executes the given transaction on the state for the given block number. @@ -215,12 +259,25 @@ func (pea *PublicEthAPI) Call(ctx context.Context, args CallArgs, blockNrOrHash accounts = *overrides } result, _, failed, err := DoCall(ctx, pea.B, args, blockNrOrHash, accounts, 5*time.Second, pea.B.Config.RPCGasCap) + if (failed || err != nil) && pea.rpc != nil { + if res, err := pea.remoteCall(ctx, args, blockNrOrHash, overrides); res != nil && err == nil { + return res, nil + } + } if failed && err == nil { return nil, errors.New("eth_call failed without error") } return (hexutil.Bytes)(result), err } +func (pea *PublicEthAPI) remoteCall(ctx context.Context, msg CallArgs, blockNrOrHash rpc.BlockNumberOrHash, overrides *map[common.Address]account) (hexutil.Bytes, error) { + var hex hexutil.Bytes + if err := pea.rpc.CallContext(ctx, &hex, "eth_call", msg, blockNrOrHash, overrides); err != nil { + return nil, err + } + return hex, nil +} + // CallArgs represents the arguments for a call. type CallArgs struct { From *common.Address `json:"from"` @@ -351,3 +408,31 @@ func DoCall(ctx context.Context, b *Backend, args CallArgs, blockNrOrHash rpc.Bl } return res, gas, failed, err } + +func toFilterArg(q ethereum.FilterQuery) (interface{}, error) { + arg := map[string]interface{}{ + "address": q.Addresses, + "topics": q.Topics, + } + if q.BlockHash != nil { + arg["blockHash"] = *q.BlockHash + if q.FromBlock != nil || q.ToBlock != nil { + return nil, fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock") + } + } else { + if q.FromBlock == nil { + arg["fromBlock"] = "0x0" + } else { + arg["fromBlock"] = toBlockNumArg(q.FromBlock) + } + arg["toBlock"] = toBlockNumArg(q.ToBlock) + } + return arg, nil +} + +func toBlockNumArg(number *big.Int) string { + if number == nil { + return "latest" + } + return hexutil.EncodeBig(number) +} diff --git a/pkg/serve/config.go b/pkg/serve/config.go index 95f7d97e..6a9d327a 100644 --- a/pkg/serve/config.go +++ b/pkg/serve/config.go @@ -17,15 +17,18 @@ package serve import ( + "fmt" "math/big" "os" "path/filepath" + "github.com/ethereum/go-ethereum/rpc" + "github.com/vulcanize/ipld-eth-indexer/pkg/shared" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/params" "github.com/spf13/viper" - "github.com/vulcanize/ipld-eth-indexer/pkg/node" "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" "github.com/vulcanize/ipld-eth-indexer/utils" "github.com/vulcanize/ipld-eth-server/pkg/prom" @@ -60,6 +63,7 @@ type Config struct { ChainConfig *params.ChainConfig DefaultSender *common.Address RPCGasCap *big.Int + Client *rpc.Client } // NewConfig is used to initialize a watcher config from a .toml file @@ -67,6 +71,7 @@ type Config struct { func NewConfig() (*Config, error) { c := new(Config) + viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH) viper.BindEnv("server.wsPath", SERVER_WS_PATH) viper.BindEnv("server.ipcPath", SERVER_IPC_PATH) viper.BindEnv("server.httpPath", SERVER_HTTP_PATH) @@ -76,6 +81,13 @@ func NewConfig() (*Config, error) { c.DBConfig.Init() + ethHTTP := viper.GetString("ethereum.httpPath") + nodeInfo, cli, err := shared.GetEthNodeAndClient(fmt.Sprintf("http://%s", ethHTTP)) + if err != nil { + return nil, err + } + c.Client = cli + wsPath := viper.GetString("server.wsPath") if wsPath == "" { wsPath = "127.0.0.1:8080" @@ -96,7 +108,7 @@ func NewConfig() (*Config, error) { } c.HTTPEndpoint = httpPath overrideDBConnConfig(&c.DBConfig) - serveDB := utils.LoadPostgres(c.DBConfig, node.Info{}) + serveDB := utils.LoadPostgres(c.DBConfig, nodeInfo) prom.RegisterDBCollector(c.DBConfig.Name, serveDB.DB) c.DB = &serveDB @@ -112,7 +124,6 @@ func NewConfig() (*Config, error) { } } chainID := viper.GetUint64("ethereum.chainID") - var err error c.ChainConfig, err = eth.ChainConfig(chainID) return c, err } diff --git a/pkg/serve/service.go b/pkg/serve/service.go index 5f72819f..6c005631 100644 --- a/pkg/serve/service.go +++ b/pkg/serve/service.go @@ -76,6 +76,8 @@ type Service struct { serveWg *sync.WaitGroup // config for backend config *eth.Config + // rpc client for forwarding cache misses + client *rpc.Client } // NewServer creates a new Server using an underlying Service struct @@ -139,7 +141,7 @@ func (sap *Service) APIs() []rpc.API { return append(apis, rpc.API{ Namespace: eth.APIName, Version: eth.APIVersion, - Service: eth.NewPublicEthAPI(backend), + Service: eth.NewPublicEthAPI(backend, sap.client), Public: true, }) }