use configurable timeout for geth batch http requests; additional error log info in payload fetchers

This commit is contained in:
Ian Norden 2020-04-19 18:26:23 -05:00
parent b3be8aaa05
commit eceaa0aecb
16 changed files with 128 additions and 32 deletions

View File

@ -71,6 +71,7 @@ func init() {
resyncCmd.PersistentFlags().Int("resync-batch-number", 0, "how many goroutines to fetch data concurrently")
resyncCmd.PersistentFlags().Bool("resync-clear-old-cache", false, "if true, clear out old data of the provided type within the resync range before resyncing")
resyncCmd.PersistentFlags().Bool("resync-reset-validation", false, "if true, reset times_validated to 0")
resyncCmd.PersistentFlags().Int("resync-timeout", 15, "timeout used for resync http requests")
resyncCmd.PersistentFlags().String("btc-http-path", "", "http url for bitcoin node")
resyncCmd.PersistentFlags().String("btc-password", "", "password for btc node")
@ -81,6 +82,10 @@ func init() {
resyncCmd.PersistentFlags().String("btc-network-id", "", "btc network id")
resyncCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node")
resyncCmd.PersistentFlags().String("eth-node-id", "", "eth node id")
resyncCmd.PersistentFlags().String("eth-client-name", "", "eth client name")
resyncCmd.PersistentFlags().String("eth-genesis-block", "", "eth genesis block hash")
resyncCmd.PersistentFlags().String("eth-network-id", "", "eth network id")
// and their bindings
viper.BindPFlag("ipfs.path", resyncCmd.PersistentFlags().Lookup("ipfs-path"))
@ -93,6 +98,7 @@ func init() {
viper.BindPFlag("resync.batchNumber", resyncCmd.PersistentFlags().Lookup("resync-batch-number"))
viper.BindPFlag("resync.clearOldCache", resyncCmd.PersistentFlags().Lookup("resync-clear-old-cache"))
viper.BindPFlag("resync.resetValidation", resyncCmd.PersistentFlags().Lookup("resync-reset-validation"))
viper.BindPFlag("resync.timeout", resyncCmd.PersistentFlags().Lookup("resync-timeout"))
viper.BindPFlag("bitcoin.httpPath", resyncCmd.PersistentFlags().Lookup("btc-http-path"))
viper.BindPFlag("bitcoin.pass", resyncCmd.PersistentFlags().Lookup("btc-password"))
@ -103,4 +109,8 @@ func init() {
viper.BindPFlag("bitcoin.networkID", resyncCmd.PersistentFlags().Lookup("btc-network-id"))
viper.BindPFlag("ethereum.httpPath", resyncCmd.PersistentFlags().Lookup("eth-http-path"))
viper.BindPFlag("ethereum.nodeID", resyncCmd.PersistentFlags().Lookup("eth-node-id"))
viper.BindPFlag("ethereum.clientName", resyncCmd.PersistentFlags().Lookup("eth-client-name"))
viper.BindPFlag("ethereum.genesisBlock", resyncCmd.PersistentFlags().Lookup("eth-genesis-block"))
viper.BindPFlag("ethereum.networkID", resyncCmd.PersistentFlags().Lookup("eth-network-id"))
}

View File

@ -121,6 +121,7 @@ func init() {
superNodeCmd.PersistentFlags().Int("supernode-batch-size", 0, "data fetching batch size")
superNodeCmd.PersistentFlags().Int("supernode-batch-number", 0, "how many goroutines to fetch data concurrently")
superNodeCmd.PersistentFlags().Int("supernode-validation-level", 0, "backfill will resync any data below this level")
superNodeCmd.PersistentFlags().Int("supernode-timeout", 0, "timeout used for backfill http requests")
superNodeCmd.PersistentFlags().String("btc-ws-path", "", "ws url for bitcoin node")
superNodeCmd.PersistentFlags().String("btc-http-path", "", "http url for bitcoin node")
@ -133,6 +134,10 @@ func init() {
superNodeCmd.PersistentFlags().String("eth-ws-path", "", "ws url for ethereum node")
superNodeCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node")
superNodeCmd.PersistentFlags().String("eth-node-id", "", "eth node id")
superNodeCmd.PersistentFlags().String("eth-client-name", "", "eth client name")
superNodeCmd.PersistentFlags().String("eth-genesis-block", "", "eth genesis block hash")
superNodeCmd.PersistentFlags().String("eth-network-id", "", "eth network id")
// and their bindings
viper.BindPFlag("ipfs.path", superNodeCmd.PersistentFlags().Lookup("ipfs-path"))
@ -149,6 +154,7 @@ func init() {
viper.BindPFlag("superNode.batchSize", superNodeCmd.PersistentFlags().Lookup("supernode-batch-size"))
viper.BindPFlag("superNode.batchNumber", superNodeCmd.PersistentFlags().Lookup("supernode-batch-number"))
viper.BindPFlag("superNode.validationLevel", superNodeCmd.PersistentFlags().Lookup("supernode-validation-level"))
viper.BindPFlag("superNode.timeout", superNodeCmd.PersistentFlags().Lookup("supernode-timeout"))
viper.BindPFlag("bitcoin.wsPath", superNodeCmd.PersistentFlags().Lookup("btc-ws-path"))
viper.BindPFlag("bitcoin.httpPath", superNodeCmd.PersistentFlags().Lookup("btc-http-path"))
@ -161,4 +167,8 @@ func init() {
viper.BindPFlag("ethereum.wsPath", superNodeCmd.PersistentFlags().Lookup("eth-ws-path"))
viper.BindPFlag("ethereum.httpPath", superNodeCmd.PersistentFlags().Lookup("eth-http-path"))
viper.BindPFlag("ethereum.nodeID", superNodeCmd.PersistentFlags().Lookup("eth-node-id"))
viper.BindPFlag("ethereum.clientName", superNodeCmd.PersistentFlags().Lookup("eth-client-name"))
viper.BindPFlag("ethereum.genesisBlock", superNodeCmd.PersistentFlags().Lookup("eth-genesis-block"))
viper.BindPFlag("ethereum.networkID", superNodeCmd.PersistentFlags().Lookup("eth-network-id"))
}

View File

@ -73,6 +73,7 @@ This set of parameters needs to be set no matter the chain type.
frequency = 45 # $SUPERNODE_FREQUENCY
batchSize = 1 # $SUPERNODE_BATCH_SIZE
batchNumber = 50 # $SUPERNODE_BATCH_NUMBER
timeout = 300 # $HTTP_TIMEOUT
validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL
```
@ -98,6 +99,10 @@ For Ethereum:
[ethereum]
wsPath = "127.0.0.1:8546" # $ETH_WS_PATH
httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH
nodeID = "arch1" # $ETH_NODE_ID
clientName = "Geth" # $ETH_CLIENT_NAME
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK
networkID = "1" # $ETH_NETWORK_ID
```
## Database

View File

@ -41,6 +41,7 @@ This set of parameters needs to be set no matter the chain type.
stop = 1000 # $RESYNC_STOP
batchSize = 10 # $RESYNC_BATCH_SIZE
batchNumber = 100 # $RESYNC_BATCH_NUMBER
timeout = 300 # $HTTP_TIMEOUT
clearOldCache = true # $RESYNC_CLEAR_OLD_CACHE
resetValidation = true # $RESYNC_RESET_VALIDATION
```
@ -65,4 +66,8 @@ For Ethereum:
```toml
[ethereum]
httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH
nodeID = "arch1" # $ETH_NODE_ID
clientName = "Geth" # $ETH_CLIENT_NAME
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK
networkID = "1" # $ETH_NETWORK_ID
```

View File

@ -15,6 +15,7 @@
stop = 0 # $RESYNC_STOP
batchSize = 10 # $RESYNC_BATCH_SIZE
batchNumber = 100 # $RESYNC_BATCH_NUMBER
timeout = 300 # $HTTP_TIMEOUT
clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE
resetValidation = true # $RESYNC_RESET_VALIDATION
@ -30,8 +31,13 @@
frequency = 15 # $SUPERNODE_FREQUENCY
batchSize = 5 # $SUPERNODE_BATCH_SIZE
batchNumber = 50 # $SUPERNODE_BATCH_NUMBER
timeout = 300 # $HTTP_TIMEOUT
validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL
[ethereum]
wsPath = "127.0.0.1:8546" # $ETH_WS_PATH
httpPath = "127.0.0.1:8545" # $ETH_HTTP_PATH
nodeID = "arch1" # $ETH_NODE_ID
clientName = "Geth" # $ETH_CLIENT_NAME
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK
networkID = "1" # $ETH_NETWORK_ID

View File

@ -17,6 +17,7 @@
package mocks
import (
"context"
"encoding/json"
"errors"
@ -62,3 +63,24 @@ func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error {
}
return nil
}
// BatchCallContext mockClient method to simulate batch call to geth
func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []client.BatchElem) error {
if mc.MappedStateDiffAt == nil {
return errors.New("mockclient needs to be initialized with statediff payloads and errors")
}
for _, batchElem := range batch {
if len(batchElem.Args) != 1 {
return errors.New("expected batch elem to contain single argument")
}
blockHeight, ok := batchElem.Args[0].(uint64)
if !ok {
return errors.New("expected batch elem argument to be a uint64")
}
err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
if err != nil {
return err
}
}
return nil
}

View File

@ -85,7 +85,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
if err != nil {
return nil, err
}
fetcher, err := NewPaylaodFetcher(settings.Chain, settings.HTTPClient)
fetcher, err := NewPaylaodFetcher(settings.Chain, settings.HTTPClient, settings.Timeout)
if err != nil {
return nil, err
}

View File

@ -17,6 +17,8 @@
package btc
import (
"fmt"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
@ -48,11 +50,11 @@ func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChain
for i, height := range blockHeights {
hash, err := fetcher.client.GetBlockHash(int64(height))
if err != nil {
return nil, err
return nil, fmt.Errorf("bitcoin PayloadFetcher GetBlockHash err at blockheight %d: %s", height, err.Error())
}
block, err := fetcher.client.GetBlock(hash)
if err != nil {
return nil, err
return nil, fmt.Errorf("bitcoin PayloadFetcher GetBlock err at blockheight %d: %s", height, err.Error())
}
blockPayloads[i] = BlockPayload{
BlockHeight: int64(height),

View File

@ -72,6 +72,7 @@ type Config struct {
BatchSize uint64
BatchNumber uint64
ValidationLevel int
Timeout time.Duration // HTTP connection timeout in seconds
}
// NewSuperNodeConfig is used to initialize a SuperNode config from a .toml file
@ -90,6 +91,13 @@ func NewSuperNodeConfig() (*Config, error) {
viper.BindEnv("superNode.ipcPath", SUPERNODE_IPC_PATH)
viper.BindEnv("superNode.httpPath", SUPERNODE_HTTP_PATH)
viper.BindEnv("superNode.backFill", SUPERNODE_BACKFILL)
viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT)
timeout := viper.GetInt("superNode.timeout")
if timeout < 15 {
timeout = 15
}
c.Timeout = time.Second * time.Duration(timeout)
chain := viper.GetString("superNode.chain")
c.Chain, err = shared.NewChainType(chain)

View File

@ -18,6 +18,7 @@ package super_node
import (
"fmt"
"time"
"github.com/btcsuite/btcd/chaincfg"
@ -92,7 +93,7 @@ func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (sha
}
// NewPaylaodFetcher constructs a PayloadFetcher for the provided chain type
func NewPaylaodFetcher(chain shared.ChainType, client interface{}) (shared.PayloadFetcher, error) {
func NewPaylaodFetcher(chain shared.ChainType, client interface{}, timeout time.Duration) (shared.PayloadFetcher, error) {
switch chain {
case shared.Ethereum:
batchClient, ok := client.(eth.BatchClient)
@ -100,7 +101,7 @@ func NewPaylaodFetcher(chain shared.ChainType, client interface{}) (shared.Paylo
var expectedClient eth.BatchClient
return nil, fmt.Errorf("ethereum payload fetcher constructor expected client type %T got %T", expectedClient, client)
}
return eth.NewPayloadFetcher(batchClient), nil
return eth.NewPayloadFetcher(batchClient, timeout), nil
case shared.Bitcoin:
connConfig, ok := client.(*rpcclient.ConnConfig)
if !ok {

View File

@ -17,33 +17,36 @@
package eth
import (
"context"
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"time"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// BatchClient is an interface to a batch-fetching geth rpc client; created to allow mock insertion
type BatchClient interface {
BatchCall(batch []client.BatchElem) error
BatchCallContext(ctx context.Context, batch []client.BatchElem) error
}
// PayloadFetcher satisfies the PayloadFetcher interface for ethereum
type PayloadFetcher struct {
// PayloadFetcher is thread-safe as long as the underlying client is thread-safe, since it has/modifies no other state
// http.Client is thread-safe
client BatchClient
client BatchClient
timeout time.Duration
}
const method = "statediff_stateDiffAt"
// NewStateDiffFetcher returns a PayloadFetcher
func NewPayloadFetcher(bc BatchClient) *PayloadFetcher {
func NewPayloadFetcher(bc BatchClient, timeout time.Duration) *PayloadFetcher {
return &PayloadFetcher{
client: bc,
client: bc,
timeout: timeout,
}
}
@ -58,14 +61,16 @@ func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChain
Result: new(statediff.Payload),
})
}
batchErr := fetcher.client.BatchCall(batch)
ctx, cancel := context.WithTimeout(context.Background(), fetcher.timeout)
defer cancel()
batchErr := fetcher.client.BatchCallContext(ctx, batch)
if batchErr != nil {
return nil, fmt.Errorf("PayloadFetcher err: %s", batchErr.Error())
return nil, fmt.Errorf("ethereum PayloadFetcher batch err for block range %d-%d: %s", blockHeights[0], blockHeights[len(blockHeights)-1], batchErr.Error())
}
results := make([]shared.RawChainData, 0, len(blockHeights))
for _, batchElem := range batch {
if batchElem.Error != nil {
return nil, fmt.Errorf("PayloadFetcher err: %s", batchElem.Error.Error())
return nil, fmt.Errorf("ethereum PayloadFetcher err at blockheight %d: %s", batchElem.Args[0].(uint64), batchElem.Error.Error())
}
payload, ok := batchElem.Result.(*statediff.Payload)
if ok {

View File

@ -17,6 +17,8 @@
package eth_test
import (
"time"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -38,7 +40,7 @@ var _ = Describe("StateDiffFetcher", func() {
Expect(setDiffAtErr1).ToNot(HaveOccurred())
setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2)
Expect(setDiffAtErr2).ToNot(HaveOccurred())
stateDiffFetcher = eth.NewPayloadFetcher(mc)
stateDiffFetcher = eth.NewPayloadFetcher(mc, time.Second*60)
})
It("Batch calls statediff_stateDiffAt", func() {
blockHeights := []uint64{

View File

@ -18,6 +18,7 @@ package resync
import (
"fmt"
"time"
"github.com/spf13/viper"
@ -52,10 +53,11 @@ type Config struct {
DBConfig config.Database
IPFSPath string
HTTPClient interface{} // Note this client is expected to support the retrieval of the specified data type(s)
NodeInfo core.Node // Info for the associated node
Ranges [][2]uint64 // The block height ranges to resync
BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing)
HTTPClient interface{} // Note this client is expected to support the retrieval of the specified data type(s)
NodeInfo core.Node // Info for the associated node
Ranges [][2]uint64 // The block height ranges to resync
BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing)
Timeout time.Duration // HTTP connection timeout in seconds
BatchNumber uint64
Quit chan bool // Channel for shutting down
@ -76,6 +78,13 @@ func NewReSyncConfig() (*Config, error) {
viper.BindEnv("resync.batchSize", RESYNC_BATCH_SIZE)
viper.BindEnv("resync.batchNumber", RESYNC_BATCH_NUMBER)
viper.BindEnv("resync.resetValidation", RESYNC_RESET_VALIDATION)
viper.BindEnv("resync.timeout", shared.HTTP_TIMEOUT)
timeout := viper.GetInt("resync.timeout")
if timeout < 15 {
timeout = 15
}
c.Timeout = time.Second * time.Duration(timeout)
start := uint64(viper.GetInt64("resync.start"))
stop := uint64(viper.GetInt64("resync.stop"))

View File

@ -80,7 +80,7 @@ func NewResyncService(settings *Config) (Resync, error) {
if err != nil {
return nil, err
}
fetcher, err := super_node.NewPaylaodFetcher(settings.Chain, settings.HTTPClient)
fetcher, err := super_node.NewPaylaodFetcher(settings.Chain, settings.HTTPClient, settings.Timeout)
if err != nil {
return nil, err
}

View File

@ -24,17 +24,20 @@ import (
"github.com/btcsuite/btcd/rpcclient"
"github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
"github.com/vulcanize/vulcanizedb/pkg/eth/node"
)
// Env variables
const (
IPFS_PATH = "IPFS_PATH"
IPFS_PATH = "IPFS_PATH"
HTTP_TIMEOUT = "HTTP_TIMEOUT"
ETH_WS_PATH = "ETH_WS_PATH"
ETH_HTTP_PATH = "ETH_HTTP_PATH"
ETH_WS_PATH = "ETH_WS_PATH"
ETH_HTTP_PATH = "ETH_HTTP_PATH"
ETH_NODE_ID = "ETH_NODE_ID"
ETH_CLIENT_NAME = "ETH_CLIENT_NAME"
ETH_GENESIS_BLOCK = "ETH_GENESIS_BLOCK"
ETH_NETWORK_ID = "ETH_NETWORK_ID"
BTC_WS_PATH = "BTC_WS_PATH"
BTC_HTTP_PATH = "BTC_HTTP_PATH"
@ -47,14 +50,22 @@ const (
)
// GetEthNodeAndClient returns eth node info and client from path url
func GetEthNodeAndClient(path string) (core.Node, core.RPCClient, error) {
rawRPCClient, err := rpc.Dial(path)
func GetEthNodeAndClient(path string) (core.Node, *rpc.Client, error) {
viper.BindEnv("ethereum.nodeID", ETH_NODE_ID)
viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME)
viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK)
viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID)
rpcClient, err := rpc.Dial(path)
if err != nil {
return core.Node{}, nil, err
}
rpcClient := client.NewRPCClient(rawRPCClient, path)
vdbNode := node.MakeNode(rpcClient)
return vdbNode, rpcClient, nil
return core.Node{
ID: viper.GetString("ethereum.nodeID"),
ClientName: viper.GetString("ethereum.clientName"),
GenesisBlock: viper.GetString("ethereum.genesisBlock"),
NetworkID: viper.GetString("ethereum.networkID"),
}, rpcClient, nil
}
// GetIPFSPath returns the ipfs path from the config or env variable