Use groupcache pool for state db access #91

Merged
ashwinphatak merged 12 commits from ashwinp-group-cache into master 2021-09-21 12:10:56 +00:00
14 changed files with 1209 additions and 102 deletions

View File

@ -1,4 +1,4 @@
FROM golang:1.13-alpine as builder
FROM golang:1.14-alpine as builder
RUN apk --update --no-cache add make git g++ linux-headers
# DEBUG

37
cmd/common.go Normal file
View File

@ -0,0 +1,37 @@
// Copyright © 2021 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
func addDatabaseFlags(command *cobra.Command) {
// database flags
command.PersistentFlags().String("database-name", "vulcanize_public", "database name")
command.PersistentFlags().Int("database-port", 5432, "database port")
command.PersistentFlags().String("database-hostname", "localhost", "database hostname")
command.PersistentFlags().String("database-user", "", "database user")
command.PersistentFlags().String("database-password", "", "database password")
// database flag bindings
viper.BindPFlag("database.name", command.PersistentFlags().Lookup("database-name"))
viper.BindPFlag("database.port", command.PersistentFlags().Lookup("database-port"))
viper.BindPFlag("database.hostname", command.PersistentFlags().Lookup("database-hostname"))
viper.BindPFlag("database.user", command.PersistentFlags().Lookup("database-user"))
viper.BindPFlag("database.password", command.PersistentFlags().Lookup("database-password"))
}

View File

@ -23,14 +23,16 @@ import (
"os/signal"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/rpc"
"github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/gap-filler/pkg/mux"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/graphql"
srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc"
s "github.com/vulcanize/ipld-eth-server/pkg/serve"
@ -86,6 +88,18 @@ func serve() {
logWithCommand.Fatal(err)
}
err = startGroupCacheService(serverConfig)
if err != nil {
logWithCommand.Fatal(err)
}
if serverConfig.StateValidationEnabled {
go startStateTrieValidator(serverConfig, server)
logWithCommand.Info("state validator enabled")
} else {
logWithCommand.Info("state validator disabled")
}
shutdown := make(chan os.Signal)
signal.Notify(shutdown, os.Interrupt)
<-shutdown
@ -200,6 +214,83 @@ func startIpldGraphQL(settings *s.Config) error {
return nil
}
func startGroupCacheService(settings *s.Config) error {
gcc := settings.GroupCache
if gcc.Pool.Enabled {
logWithCommand.Info("starting up groupcache pool HTTTP server")
pool := groupcache.NewHTTPPoolOpts(gcc.Pool.HttpEndpoint, &groupcache.HTTPPoolOptions{})
pool.Set(gcc.Pool.PeerHttpEndpoints...)
httpURL, err := url.Parse(gcc.Pool.HttpEndpoint)
if err != nil {
return err
}
server := http.Server{
Addr: httpURL.Host,
Handler: pool,
}
// Start a HTTP server to listen for peer requests from the groupcache
go server.ListenAndServe()
logWithCommand.Infof("groupcache pool endpoint opened for url %s", httpURL)
} else {
logWithCommand.Info("Groupcache pool is disabled")
}
return nil
}
func startStateTrieValidator(config *s.Config, server s.Server) {
validateEveryNthBlock := config.StateValidationEveryNthBlock
var lastBlockNumber uint64
backend := server.Backend()
for {
time.Sleep(5 * time.Second)
block, err := backend.CurrentBlock()
if err != nil {
log.Errorln("Error fetching current block for state trie validator")
continue
}
stateRoot := block.Root()
blockNumber := block.NumberU64()
blockHash := block.Hash()
if validateEveryNthBlock <= 0 || // Used for static replicas where block number doesn't progress.
(blockNumber > lastBlockNumber) && (blockNumber%validateEveryNthBlock == 0) {
// The validate trie call will take a long time on mainnet, e.g. a few hours.
if err = backend.ValidateTrie(stateRoot); err != nil {
log.Fatalf("Error validating trie for block number %d hash %s state root %s",
blockNumber,
blockHash,
stateRoot,
)
}
log.Infof("Successfully validated trie for block number %d hash %s state root %s",
blockNumber,
blockHash,
stateRoot,
)
if validateEveryNthBlock <= 0 {
// Static replica, sleep a long-ish time (1/2 of cache expiry time) since we only need to keep the cache warm.
time.Sleep((time.Minute * time.Duration(config.GroupCache.StateDB.CacheExpiryInMins)) / 2)
}
lastBlockNumber = blockNumber
}
}
}
func parseRpcAddresses(value string) ([]*rpc.Client, error) {
rpcAddresses := strings.Split(value, ",")
rpcClients := make([]*rpc.Client, 0, len(rpcAddresses))
@ -224,12 +315,7 @@ func parseRpcAddresses(value string) ([]*rpc.Client, error) {
func init() {
rootCmd.AddCommand(serveCmd)
// database credentials
serveCmd.PersistentFlags().String("database-name", "vulcanize_public", "database name")
serveCmd.PersistentFlags().Int("database-port", 5432, "database port")
serveCmd.PersistentFlags().String("database-hostname", "localhost", "database hostname")
serveCmd.PersistentFlags().String("database-user", "", "database user")
serveCmd.PersistentFlags().String("database-password", "", "database password")
addDatabaseFlags(serveCmd)
// flags for all config variables
// eth graphql and json-rpc parameters
@ -260,14 +346,19 @@ func init() {
serveCmd.PersistentFlags().String("eth-chain-config", "", "json chain config file location")
serveCmd.PersistentFlags().Bool("eth-supports-state-diff", false, "whether or not the proxy ethereum client supports statediffing endpoints")
// and their bindings
// database
viper.BindPFlag("database.name", serveCmd.PersistentFlags().Lookup("database-name"))
viper.BindPFlag("database.port", serveCmd.PersistentFlags().Lookup("database-port"))
viper.BindPFlag("database.hostname", serveCmd.PersistentFlags().Lookup("database-hostname"))
viper.BindPFlag("database.user", serveCmd.PersistentFlags().Lookup("database-user"))
viper.BindPFlag("database.password", serveCmd.PersistentFlags().Lookup("database-password"))
// groupcache flags
serveCmd.PersistentFlags().Bool("gcache-pool-enabled", false, "turn on the groupcache pool")
serveCmd.PersistentFlags().String("gcache-pool-http-path", "", "http url for groupcache node")
serveCmd.PersistentFlags().StringArray("gcache-pool-http-peers", []string{}, "http urls for groupcache peers")
serveCmd.PersistentFlags().Int("gcache-statedb-cache-size", 16, "state DB cache size in MB")
serveCmd.PersistentFlags().Int("gcache-statedb-cache-expiry", 60, "state DB cache expiry time in mins")
serveCmd.PersistentFlags().Int("gcache-statedb-log-stats-interval", 60, "state DB cache stats log interval in secs")
// state validator flags
serveCmd.PersistentFlags().Bool("validator-enabled", false, "turn on the state validator")
serveCmd.PersistentFlags().Uint("validator-every-nth-block", 1500, "only validate every Nth block")
// and their bindings
// eth graphql server
viper.BindPFlag("eth.server.graphql", serveCmd.PersistentFlags().Lookup("eth-server-graphql"))
viper.BindPFlag("eth.server.graphqlPath", serveCmd.PersistentFlags().Lookup("eth-server-graphql-path"))
@ -301,4 +392,16 @@ func init() {
viper.BindPFlag("ethereum.rpcGasCap", serveCmd.PersistentFlags().Lookup("eth-rpc-gas-cap"))
viper.BindPFlag("ethereum.chainConfig", serveCmd.PersistentFlags().Lookup("eth-chain-config"))
viper.BindPFlag("ethereum.supportsStateDiff", serveCmd.PersistentFlags().Lookup("eth-supports-state-diff"))
// groupcache flags
viper.BindPFlag("groupcache.pool.enabled", serveCmd.PersistentFlags().Lookup("gcache-pool-enabled"))
viper.BindPFlag("groupcache.pool.httpEndpoint", serveCmd.PersistentFlags().Lookup("gcache-pool-http-path"))
viper.BindPFlag("groupcache.pool.peerHttpEndpoints", serveCmd.PersistentFlags().Lookup("gcache-pool-http-peers"))
viper.BindPFlag("groupcache.statedb.cacheSizeInMB", serveCmd.PersistentFlags().Lookup("gcache-statedb-cache-size"))
viper.BindPFlag("groupcache.statedb.cacheExpiryInMins", serveCmd.PersistentFlags().Lookup("gcache-statedb-cache-expiry"))
viper.BindPFlag("groupcache.statedb.logStatsIntervalInSecs", serveCmd.PersistentFlags().Lookup("gcache-statedb-log-stats-interval"))
// state validator flags
viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled"))
viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block"))
}

87
cmd/validate.go Normal file
View File

@ -0,0 +1,87 @@
// Copyright © 2021 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"time"
"github.com/ethereum/go-ethereum/common"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
validator "github.com/vulcanize/eth-ipfs-state-validator/pkg"
ipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
s "github.com/vulcanize/ipld-eth-server/pkg/serve"
)
const GroupName = "statedb-validate"
const CacheExpiryInMins = 8 * 60 // 8 hours
const CacheSizeInMB = 16 // 16 MB
var validateCmd = &cobra.Command{
Use: "validate",
Short: "valdiate state",
Long: `This command validates the trie for the given state root`,
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *log.WithField("SubCommand", subCommand)
validate()
},
}
func validate() {
config, err := s.NewConfig()
if err != nil {
logWithCommand.Fatal(err)
}
stateRootStr := viper.GetString("stateRoot")
if stateRootStr == "" {
logWithCommand.Fatal("must provide a state root for state validation")
}
stateRoot := common.HexToHash(stateRootStr)
cacheSize := viper.GetInt("cacheSize")
ethDB := ipfsethdb.NewDatabase(config.DB.DB, ipfsethdb.CacheConfig{
Name: GroupName,
Size: cacheSize * 1024 * 1024,
ExpiryDuration: time.Minute * time.Duration(CacheExpiryInMins),
})
validator := validator.NewValidator(nil, ethDB)
if err = validator.ValidateTrie(stateRoot); err != nil {
log.Fatalln("Error validating state root")
}
stats := ethDB.GetCacheStats()
log.Debugf("groupcache stats %+v", stats)
log.Infoln("Successfully validated state root")
}
func init() {
rootCmd.AddCommand(validateCmd)
addDatabaseFlags(validateCmd)
validateCmd.PersistentFlags().String("state-root", "", "root of the state trie we wish to validate")
viper.BindPFlag("stateRoot", validateCmd.PersistentFlags().Lookup("state-root"))
validateCmd.PersistentFlags().Int("cache-size", CacheSizeInMB, "cache size in MB")
viper.BindPFlag("cacheSize", validateCmd.PersistentFlags().Lookup("cache-size"))
}

8
go.mod
View File

@ -13,20 +13,20 @@ require (
github.com/jmoiron/sqlx v1.2.0
github.com/lib/pq v1.10.2
github.com/machinebox/graphql v0.2.2
github.com/mailgun/groupcache/v2 v2.2.1
github.com/matryer/is v1.4.0 // indirect
github.com/multiformats/go-multihash v0.0.14
github.com/onsi/ginkgo v1.16.2
github.com/onsi/gomega v1.10.1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_golang v1.7.1
github.com/shirou/gopsutil v3.21.5+incompatible // indirect
github.com/sirupsen/logrus v1.7.0
github.com/spf13/cobra v1.1.1
github.com/spf13/viper v1.7.0
github.com/tklauser/go-sysconf v0.3.6 // indirect
github.com/vulcanize/eth-ipfs-state-validator v0.0.1
github.com/vulcanize/gap-filler v0.3.1
github.com/vulcanize/ipfs-ethdb v0.0.4-0.20210824131459-7bb49801fc12
github.com/vulcanize/ipfs-ethdb v0.0.4
)
replace github.com/ethereum/go-ethereum v1.10.8 => github.com/vulcanize/go-ethereum v1.10.8-statediff-0.0.26
replace github.com/vulcanize/ipfs-ethdb v0.0.2-alpha => github.com/vulcanize/pg-ipfs-ethdb v0.0.2-alpha

792
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -20,7 +20,6 @@ import (
"context"
"math/big"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -37,9 +36,10 @@ import (
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared"
)
var (
@ -213,12 +213,15 @@ var _ = Describe("API", func() {
indexAndPublisher := indexer.NewStateDiffIndexer(chainConfig, db)
backend, err := eth.NewEthBackend(db, &eth.Config{
ChainConfig: chainConfig,
VmConfig: vm.Config{},
VMConfig: vm.Config{},
RPCGasCap: big.NewInt(10000000000), // Max gas capacity for a rpc call.
CacheConfig: pgipfsethdb.CacheConfig{
Name: "api_test",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
GroupCacheConfig: &ethServerShared.GroupCacheConfig{
StateDB: ethServerShared.GroupConfig{
Name: "api_test",
CacheSizeInMB: 8,
CacheExpiryInMins: 60,
LogStatsIntervalInSecs: 0,
},
},
})
Expect(err).ToNot(HaveOccurred())

View File

@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -41,9 +42,13 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
ethServerShared "github.com/ethereum/go-ethereum/statediff/indexer/shared"
"github.com/ethereum/go-ethereum/trie"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
log "github.com/sirupsen/logrus"
validator "github.com/vulcanize/eth-ipfs-state-validator/pkg"
ipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
)
var (
@ -83,6 +88,10 @@ const (
RetrieveCodeByMhKey = `SELECT data FROM public.blocks WHERE key = $1`
)
const (
StateDBGroupCacheName = "statedb"
)
type Backend struct {
// underlying postgres db
DB *postgres.DB
@ -100,17 +109,30 @@ type Backend struct {
}
type Config struct {
ChainConfig *params.ChainConfig
VmConfig vm.Config
DefaultSender *common.Address
RPCGasCap *big.Int
CacheConfig pgipfsethdb.CacheConfig
ChainConfig *params.ChainConfig
VMConfig vm.Config
DefaultSender *common.Address
RPCGasCap *big.Int
GroupCacheConfig *shared.GroupCacheConfig
}
func NewEthBackend(db *postgres.DB, c *Config) (*Backend, error) {
r := NewCIDRetriever(db)
gcc := c.GroupCacheConfig
groupName := gcc.StateDB.Name
if groupName == "" {
groupName = StateDBGroupCacheName
}
r := NewCIDRetriever(db)
ethDB := ipfsethdb.NewDatabase(db.DB, ipfsethdb.CacheConfig{
Name: groupName,
Size: gcc.StateDB.CacheSizeInMB * 1024 * 1024,
ExpiryDuration: time.Minute * time.Duration(gcc.StateDB.CacheExpiryInMins),
})
logStateDBStatsOnTimer(ethDB, gcc)
ethDB := pgipfsethdb.NewDatabase(db.DB, c.CacheConfig)
return &Backend{
DB: db,
Retriever: r,
@ -292,10 +314,10 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
ethServerShared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
ethServerShared.Rollback(tx)
} else {
err = tx.Commit()
}
@ -383,10 +405,10 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
ethServerShared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
ethServerShared.Rollback(tx)
} else {
err = tx.Commit()
}
@ -592,7 +614,7 @@ func (b *Backend) GetEVM(ctx context.Context, msg core.Message, state *state.Sta
state.SetBalance(msg.From(), math.MaxBig256)
vmctx := core.NewEVMBlockContext(header, b, nil)
txContext := core.NewEVMTxContext(msg)
return vm.NewEVM(vmctx, txContext, state, b.Config.ChainConfig, b.Config.VmConfig), nil
return vm.NewEVM(vmctx, txContext, state, b.Config.ChainConfig, b.Config.VMConfig), nil
}
// GetAccountByNumberOrHash returns the account object for the provided address at the block corresponding to the provided number or hash
@ -704,10 +726,10 @@ func (b *Backend) GetCodeByHash(ctx context.Context, address common.Address, has
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
ethServerShared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
ethServerShared.Rollback(tx)
} else {
err = tx.Commit()
}
@ -717,7 +739,7 @@ func (b *Backend) GetCodeByHash(ctx context.Context, address common.Address, has
return nil, err
}
var mhKey string
mhKey, err = shared.MultihashKeyFromKeccak256(common.BytesToHash(codeHash))
mhKey, err = ethServerShared.MultihashKeyFromKeccak256(common.BytesToHash(codeHash))
if err != nil {
return nil, err
}
@ -794,6 +816,10 @@ func (b *Backend) GetHeader(hash common.Hash, height uint64) *types.Header {
return header
}
func (b *Backend) ValidateTrie(stateRoot common.Hash) error {
return validator.NewValidator(nil, b.EthDB).ValidateTrie(stateRoot)
}
// RPCGasCap returns the configured gas cap for the rpc server
func (b *Backend) RPCGasCap() *big.Int {
return b.Config.RPCGasCap
@ -826,3 +852,18 @@ func (b *Backend) BloomStatus() (uint64, uint64) {
func (b *Backend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
panic("implement me")
}
func logStateDBStatsOnTimer(ethDB *ipfsethdb.Database, gcc *shared.GroupCacheConfig) {
// No stats logging if interval isn't a positive integer.
if gcc.StateDB.LogStatsIntervalInSecs <= 0 {
return
}
ticker := time.NewTicker(time.Duration(gcc.StateDB.LogStatsIntervalInSecs) * time.Second)
go func() {
for range ticker.C {
log.Infof("%s groupcache stats: %+v", StateDBGroupCacheName, ethDB.GetCacheStats())
}
}()
}

View File

@ -21,7 +21,6 @@ import (
"context"
"io/ioutil"
"math/big"
"time"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
@ -37,10 +36,10 @@ import (
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared"
)
var (
@ -80,12 +79,15 @@ var _ = Describe("eth state reading tests", func() {
transformer := indexer.NewStateDiffIndexer(chainConfig, db)
backend, err = eth.NewEthBackend(db, &eth.Config{
ChainConfig: chainConfig,
VmConfig: vm.Config{},
VMConfig: vm.Config{},
RPCGasCap: big.NewInt(10000000000), // Max gas capacity for a rpc call.
CacheConfig: pgipfsethdb.CacheConfig{
Name: "eth_state",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
GroupCacheConfig: &ethServerShared.GroupCacheConfig{
StateDB: ethServerShared.GroupConfig{
Name: "eth_state_test",
CacheSizeInMB: 8,
CacheExpiryInMins: 60,
LogStatsIntervalInSecs: 0,
},
},
})
Expect(err).ToNot(HaveOccurred())

View File

@ -20,7 +20,6 @@ import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -37,11 +36,11 @@ import (
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
"github.com/vulcanize/ipld-eth-server/pkg/graphql"
ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared"
)
// SetupDB is use to setup a db for watcher tests
@ -85,12 +84,15 @@ var _ = Describe("GraphQL", func() {
transformer := indexer.NewStateDiffIndexer(chainConfig, db)
backend, err = eth.NewEthBackend(db, &eth.Config{
ChainConfig: chainConfig,
VmConfig: vm.Config{},
VMConfig: vm.Config{},
RPCGasCap: big.NewInt(10000000000),
CacheConfig: pgipfsethdb.CacheConfig{
Name: "graphql_test",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
GroupCacheConfig: &ethServerShared.GroupCacheConfig{
StateDB: ethServerShared.GroupConfig{
Name: "graphql_test",
CacheSizeInMB: 8,
CacheExpiryInMins: 60,
LogStatsIntervalInSecs: 0,
},
},
})
Expect(err).ToNot(HaveOccurred())

View File

@ -28,9 +28,10 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/spf13/viper"
"github.com/vulcanize/ipld-eth-server/pkg/prom"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/prom"
ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared"
)
// Env variables
@ -47,6 +48,9 @@ const (
ethRPCGasCap = "ETH_RPC_GAS_CAP"
ethChainConfig = "ETH_CHAIN_CONFIG"
ethSupportsStatediff = "ETH_SUPPORTS_STATEDIFF"
ValidatorEnabled = "VALIDATOR_ENABLED"
ValidatorEveryNthBlock = "VALIDATOR_EVERY_NTH_BLOCK"
)
// Config struct
@ -79,6 +83,12 @@ type Config struct {
EthHttpEndpoint string
Client *rpc.Client
SupportStateDiff bool
// Cache configuration.
GroupCache *ethServerShared.GroupCacheConfig
StateValidationEnabled bool
StateValidationEveryNthBlock uint64
}
// NewConfig is used to initialize a watcher config from a .toml file
@ -205,6 +215,11 @@ func NewConfig() (*Config, error) {
} else {
c.ChainConfig, err = eth.ChainConfig(nodeInfo.ChainID)
}
c.loadGroupCacheConfig()
c.loadValidatorConfig()
return c, err
}
@ -217,7 +232,7 @@ func overrideDBConnConfig(con *postgres.ConnectionConfig) {
con.MaxLifetime = viper.GetInt("database.server.maxLifetime")
}
func (d *Config) dbInit() {
func (c *Config) dbInit() {
viper.BindEnv("database.name", databaseName)
viper.BindEnv("database.hostname", databaseHostname)
viper.BindEnv("database.port", databasePort)
@ -227,12 +242,43 @@ func (d *Config) dbInit() {
viper.BindEnv("database.maxOpen", databaseMaxOpenConnections)
viper.BindEnv("database.maxLifetime", databaseMaxOpenConnLifetime)
d.DBParams.Name = viper.GetString("database.name")
d.DBParams.Hostname = viper.GetString("database.hostname")
d.DBParams.Port = viper.GetInt("database.port")
d.DBParams.User = viper.GetString("database.user")
d.DBParams.Password = viper.GetString("database.password")
d.DBConfig.MaxIdle = viper.GetInt("database.maxIdle")
d.DBConfig.MaxOpen = viper.GetInt("database.maxOpen")
d.DBConfig.MaxLifetime = viper.GetInt("database.maxLifetime")
c.DBParams.Name = viper.GetString("database.name")
c.DBParams.Hostname = viper.GetString("database.hostname")
c.DBParams.Port = viper.GetInt("database.port")
c.DBParams.User = viper.GetString("database.user")
c.DBParams.Password = viper.GetString("database.password")
c.DBConfig.MaxIdle = viper.GetInt("database.maxIdle")
c.DBConfig.MaxOpen = viper.GetInt("database.maxOpen")
c.DBConfig.MaxLifetime = viper.GetInt("database.maxLifetime")
}
func (c *Config) loadGroupCacheConfig() {
viper.BindEnv("groupcache.pool.enabled", ethServerShared.GcachePoolEnabled)
viper.BindEnv("groupcache.pool.httpEndpoint", ethServerShared.GcachePoolHttpPath)
viper.BindEnv("groupcache.pool.peerHttpEndpoints", ethServerShared.GcachePoolHttpPeers)
viper.BindEnv("groupcache.statedb.cacheSizeInMB", ethServerShared.GcacheStatedbCacheSize)
viper.BindEnv("groupcache.statedb.cacheExpiryInMins", ethServerShared.GcacheStatedbCacheExpiry)
viper.BindEnv("groupcache.statedb.logStatsIntervalInSecs", ethServerShared.GcacheStatedbLogStatsInterval)
gcc := ethServerShared.GroupCacheConfig{}
gcc.Pool.Enabled = viper.GetBool("groupcache.pool.enabled")
if gcc.Pool.Enabled {
gcc.Pool.HttpEndpoint = viper.GetString("groupcache.pool.httpEndpoint")
gcc.Pool.PeerHttpEndpoints = viper.GetStringSlice("groupcache.pool.peerHttpEndpoints")
}
// Irrespective of whether the pool is enabled, we always use the hot/local cache.
gcc.StateDB.CacheSizeInMB = viper.GetInt("groupcache.statedb.cacheSizeInMB")
gcc.StateDB.CacheExpiryInMins = viper.GetInt("groupcache.statedb.cacheExpiryInMins")
gcc.StateDB.LogStatsIntervalInSecs = viper.GetInt("groupcache.statedb.logStatsIntervalInSecs")
c.GroupCache = &gcc
}
func (c *Config) loadValidatorConfig() {
viper.BindEnv("validator.enabled", ValidatorEnabled)
viper.BindEnv("validator.everyNthBlock", ValidatorEveryNthBlock)
c.StateValidationEnabled = viper.GetBool("validator.enabled")
c.StateValidationEveryNthBlock = viper.GetUint64("validator.everyNthBlock")
}

View File

@ -20,7 +20,6 @@ import (
"fmt"
"strconv"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/vm"
@ -31,7 +30,7 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
log "github.com/sirupsen/logrus"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/net"
)
@ -100,15 +99,11 @@ func NewServer(settings *Config) (Server, error) {
sap.supportsStateDiffing = settings.SupportStateDiff
var err error
sap.backend, err = eth.NewEthBackend(sap.db, &eth.Config{
ChainConfig: settings.ChainConfig,
VmConfig: vm.Config{},
DefaultSender: settings.DefaultSender,
RPCGasCap: settings.RPCGasCap,
CacheConfig: pgipfsethdb.CacheConfig{
Name: "ipld-eth-server",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
},
ChainConfig: settings.ChainConfig,
VMConfig: vm.Config{},
DefaultSender: settings.DefaultSender,
RPCGasCap: settings.RPCGasCap,
GroupCacheConfig: settings.GroupCache,
})
return sap, err
}

View File

@ -19,4 +19,11 @@ package shared
const (
DefaultMaxBatchSize uint64 = 100
DefaultMaxBatchNumber int64 = 50
GcachePoolEnabled = "GCACHE_POOL_ENABLED"
GcachePoolHttpPath = "GCACHE_POOL_HTTP_PATH"
GcachePoolHttpPeers = "GCACHE_POOL_HTTP_PEERS"
GcacheStatedbCacheSize = "GCACHE_STATEDB_CACHE_SIZE"
GcacheStatedbCacheExpiry = "GCACHE_STATEDB_CACHE_EXPIRY"
GcacheStatedbLogStatsInterval = "GCACHE_STATEDB_LOG_STATS_INTERVAL"
)

38
pkg/shared/types.go Normal file
View File

@ -0,0 +1,38 @@
// VulcanizeDB
// Copyright © 2021 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package shared
type PoolConfig struct {
Enabled bool
HttpEndpoint string
PeerHttpEndpoints []string
}
type GroupConfig struct {
CacheSizeInMB int
CacheExpiryInMins int
LogStatsIntervalInSecs int
// Used in tests to override the cache name, to work around
// the "duplicate registration of group" error from groupcache
Name string
}
type GroupCacheConfig struct {
Pool PoolConfig
StateDB GroupConfig
}