Use groupcache pool for state db access (#91)
* Use groupcache pool for state db access * Group cache config and logging stats on timer * Integrate state validator into server * Use tagged ipfs-ethdb * groupcache config for tests * Work around duplicate registration of groupcache error in tests * Use tagged version of eth-ipfs-state-validator * State validation command. * Validator for static replicas to keep cache warm * Update docker go-version and go.mod. * Address comments and self review. * Fix ipfs-ethdb version. Co-authored-by: Arijit Das <arijitad.in@gmail.com>
This commit is contained in:
parent
838ed033f8
commit
2de9c5bd48
@ -1,4 +1,4 @@
|
||||
FROM golang:1.13-alpine as builder
|
||||
FROM golang:1.14-alpine as builder
|
||||
|
||||
RUN apk --update --no-cache add make git g++ linux-headers
|
||||
# DEBUG
|
||||
|
37
cmd/common.go
Normal file
37
cmd/common.go
Normal file
@ -0,0 +1,37 @@
|
||||
// Copyright © 2021 Vulcanize, Inc
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
func addDatabaseFlags(command *cobra.Command) {
|
||||
// database flags
|
||||
command.PersistentFlags().String("database-name", "vulcanize_public", "database name")
|
||||
command.PersistentFlags().Int("database-port", 5432, "database port")
|
||||
command.PersistentFlags().String("database-hostname", "localhost", "database hostname")
|
||||
command.PersistentFlags().String("database-user", "", "database user")
|
||||
command.PersistentFlags().String("database-password", "", "database password")
|
||||
|
||||
// database flag bindings
|
||||
viper.BindPFlag("database.name", command.PersistentFlags().Lookup("database-name"))
|
||||
viper.BindPFlag("database.port", command.PersistentFlags().Lookup("database-port"))
|
||||
viper.BindPFlag("database.hostname", command.PersistentFlags().Lookup("database-hostname"))
|
||||
viper.BindPFlag("database.user", command.PersistentFlags().Lookup("database-user"))
|
||||
viper.BindPFlag("database.password", command.PersistentFlags().Lookup("database-password"))
|
||||
}
|
133
cmd/serve.go
133
cmd/serve.go
@ -23,14 +23,16 @@ import (
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/mailgun/groupcache/v2"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||
|
||||
"github.com/vulcanize/gap-filler/pkg/mux"
|
||||
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/graphql"
|
||||
srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc"
|
||||
s "github.com/vulcanize/ipld-eth-server/pkg/serve"
|
||||
@ -86,6 +88,18 @@ func serve() {
|
||||
logWithCommand.Fatal(err)
|
||||
}
|
||||
|
||||
err = startGroupCacheService(serverConfig)
|
||||
if err != nil {
|
||||
logWithCommand.Fatal(err)
|
||||
}
|
||||
|
||||
if serverConfig.StateValidationEnabled {
|
||||
go startStateTrieValidator(serverConfig, server)
|
||||
logWithCommand.Info("state validator enabled")
|
||||
} else {
|
||||
logWithCommand.Info("state validator disabled")
|
||||
}
|
||||
|
||||
shutdown := make(chan os.Signal)
|
||||
signal.Notify(shutdown, os.Interrupt)
|
||||
<-shutdown
|
||||
@ -200,6 +214,83 @@ func startIpldGraphQL(settings *s.Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func startGroupCacheService(settings *s.Config) error {
|
||||
gcc := settings.GroupCache
|
||||
|
||||
if gcc.Pool.Enabled {
|
||||
logWithCommand.Info("starting up groupcache pool HTTTP server")
|
||||
|
||||
pool := groupcache.NewHTTPPoolOpts(gcc.Pool.HttpEndpoint, &groupcache.HTTPPoolOptions{})
|
||||
pool.Set(gcc.Pool.PeerHttpEndpoints...)
|
||||
|
||||
httpURL, err := url.Parse(gcc.Pool.HttpEndpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
server := http.Server{
|
||||
Addr: httpURL.Host,
|
||||
Handler: pool,
|
||||
}
|
||||
|
||||
// Start a HTTP server to listen for peer requests from the groupcache
|
||||
go server.ListenAndServe()
|
||||
|
||||
logWithCommand.Infof("groupcache pool endpoint opened for url %s", httpURL)
|
||||
} else {
|
||||
logWithCommand.Info("Groupcache pool is disabled")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func startStateTrieValidator(config *s.Config, server s.Server) {
|
||||
validateEveryNthBlock := config.StateValidationEveryNthBlock
|
||||
|
||||
var lastBlockNumber uint64
|
||||
backend := server.Backend()
|
||||
|
||||
for {
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
block, err := backend.CurrentBlock()
|
||||
if err != nil {
|
||||
log.Errorln("Error fetching current block for state trie validator")
|
||||
continue
|
||||
}
|
||||
|
||||
stateRoot := block.Root()
|
||||
blockNumber := block.NumberU64()
|
||||
blockHash := block.Hash()
|
||||
|
||||
if validateEveryNthBlock <= 0 || // Used for static replicas where block number doesn't progress.
|
||||
(blockNumber > lastBlockNumber) && (blockNumber%validateEveryNthBlock == 0) {
|
||||
|
||||
// The validate trie call will take a long time on mainnet, e.g. a few hours.
|
||||
if err = backend.ValidateTrie(stateRoot); err != nil {
|
||||
log.Fatalf("Error validating trie for block number %d hash %s state root %s",
|
||||
blockNumber,
|
||||
blockHash,
|
||||
stateRoot,
|
||||
)
|
||||
}
|
||||
|
||||
log.Infof("Successfully validated trie for block number %d hash %s state root %s",
|
||||
blockNumber,
|
||||
blockHash,
|
||||
stateRoot,
|
||||
)
|
||||
|
||||
if validateEveryNthBlock <= 0 {
|
||||
// Static replica, sleep a long-ish time (1/2 of cache expiry time) since we only need to keep the cache warm.
|
||||
time.Sleep((time.Minute * time.Duration(config.GroupCache.StateDB.CacheExpiryInMins)) / 2)
|
||||
}
|
||||
|
||||
lastBlockNumber = blockNumber
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func parseRpcAddresses(value string) ([]*rpc.Client, error) {
|
||||
rpcAddresses := strings.Split(value, ",")
|
||||
rpcClients := make([]*rpc.Client, 0, len(rpcAddresses))
|
||||
@ -224,12 +315,7 @@ func parseRpcAddresses(value string) ([]*rpc.Client, error) {
|
||||
func init() {
|
||||
rootCmd.AddCommand(serveCmd)
|
||||
|
||||
// database credentials
|
||||
serveCmd.PersistentFlags().String("database-name", "vulcanize_public", "database name")
|
||||
serveCmd.PersistentFlags().Int("database-port", 5432, "database port")
|
||||
serveCmd.PersistentFlags().String("database-hostname", "localhost", "database hostname")
|
||||
serveCmd.PersistentFlags().String("database-user", "", "database user")
|
||||
serveCmd.PersistentFlags().String("database-password", "", "database password")
|
||||
addDatabaseFlags(serveCmd)
|
||||
|
||||
// flags for all config variables
|
||||
// eth graphql and json-rpc parameters
|
||||
@ -260,14 +346,19 @@ func init() {
|
||||
serveCmd.PersistentFlags().String("eth-chain-config", "", "json chain config file location")
|
||||
serveCmd.PersistentFlags().Bool("eth-supports-state-diff", false, "whether or not the proxy ethereum client supports statediffing endpoints")
|
||||
|
||||
// and their bindings
|
||||
// database
|
||||
viper.BindPFlag("database.name", serveCmd.PersistentFlags().Lookup("database-name"))
|
||||
viper.BindPFlag("database.port", serveCmd.PersistentFlags().Lookup("database-port"))
|
||||
viper.BindPFlag("database.hostname", serveCmd.PersistentFlags().Lookup("database-hostname"))
|
||||
viper.BindPFlag("database.user", serveCmd.PersistentFlags().Lookup("database-user"))
|
||||
viper.BindPFlag("database.password", serveCmd.PersistentFlags().Lookup("database-password"))
|
||||
// groupcache flags
|
||||
serveCmd.PersistentFlags().Bool("gcache-pool-enabled", false, "turn on the groupcache pool")
|
||||
serveCmd.PersistentFlags().String("gcache-pool-http-path", "", "http url for groupcache node")
|
||||
serveCmd.PersistentFlags().StringArray("gcache-pool-http-peers", []string{}, "http urls for groupcache peers")
|
||||
serveCmd.PersistentFlags().Int("gcache-statedb-cache-size", 16, "state DB cache size in MB")
|
||||
serveCmd.PersistentFlags().Int("gcache-statedb-cache-expiry", 60, "state DB cache expiry time in mins")
|
||||
serveCmd.PersistentFlags().Int("gcache-statedb-log-stats-interval", 60, "state DB cache stats log interval in secs")
|
||||
|
||||
// state validator flags
|
||||
serveCmd.PersistentFlags().Bool("validator-enabled", false, "turn on the state validator")
|
||||
serveCmd.PersistentFlags().Uint("validator-every-nth-block", 1500, "only validate every Nth block")
|
||||
|
||||
// and their bindings
|
||||
// eth graphql server
|
||||
viper.BindPFlag("eth.server.graphql", serveCmd.PersistentFlags().Lookup("eth-server-graphql"))
|
||||
viper.BindPFlag("eth.server.graphqlPath", serveCmd.PersistentFlags().Lookup("eth-server-graphql-path"))
|
||||
@ -301,4 +392,16 @@ func init() {
|
||||
viper.BindPFlag("ethereum.rpcGasCap", serveCmd.PersistentFlags().Lookup("eth-rpc-gas-cap"))
|
||||
viper.BindPFlag("ethereum.chainConfig", serveCmd.PersistentFlags().Lookup("eth-chain-config"))
|
||||
viper.BindPFlag("ethereum.supportsStateDiff", serveCmd.PersistentFlags().Lookup("eth-supports-state-diff"))
|
||||
|
||||
// groupcache flags
|
||||
viper.BindPFlag("groupcache.pool.enabled", serveCmd.PersistentFlags().Lookup("gcache-pool-enabled"))
|
||||
viper.BindPFlag("groupcache.pool.httpEndpoint", serveCmd.PersistentFlags().Lookup("gcache-pool-http-path"))
|
||||
viper.BindPFlag("groupcache.pool.peerHttpEndpoints", serveCmd.PersistentFlags().Lookup("gcache-pool-http-peers"))
|
||||
viper.BindPFlag("groupcache.statedb.cacheSizeInMB", serveCmd.PersistentFlags().Lookup("gcache-statedb-cache-size"))
|
||||
viper.BindPFlag("groupcache.statedb.cacheExpiryInMins", serveCmd.PersistentFlags().Lookup("gcache-statedb-cache-expiry"))
|
||||
viper.BindPFlag("groupcache.statedb.logStatsIntervalInSecs", serveCmd.PersistentFlags().Lookup("gcache-statedb-log-stats-interval"))
|
||||
|
||||
// state validator flags
|
||||
viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled"))
|
||||
viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block"))
|
||||
}
|
||||
|
87
cmd/validate.go
Normal file
87
cmd/validate.go
Normal file
@ -0,0 +1,87 @@
|
||||
// Copyright © 2021 Vulcanize, Inc
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
validator "github.com/vulcanize/eth-ipfs-state-validator/pkg"
|
||||
ipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
|
||||
|
||||
s "github.com/vulcanize/ipld-eth-server/pkg/serve"
|
||||
)
|
||||
|
||||
const GroupName = "statedb-validate"
|
||||
const CacheExpiryInMins = 8 * 60 // 8 hours
|
||||
const CacheSizeInMB = 16 // 16 MB
|
||||
|
||||
var validateCmd = &cobra.Command{
|
||||
Use: "validate",
|
||||
Short: "valdiate state",
|
||||
Long: `This command validates the trie for the given state root`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
subCommand = cmd.CalledAs()
|
||||
logWithCommand = *log.WithField("SubCommand", subCommand)
|
||||
validate()
|
||||
},
|
||||
}
|
||||
|
||||
func validate() {
|
||||
config, err := s.NewConfig()
|
||||
if err != nil {
|
||||
logWithCommand.Fatal(err)
|
||||
}
|
||||
|
||||
stateRootStr := viper.GetString("stateRoot")
|
||||
if stateRootStr == "" {
|
||||
logWithCommand.Fatal("must provide a state root for state validation")
|
||||
}
|
||||
|
||||
stateRoot := common.HexToHash(stateRootStr)
|
||||
cacheSize := viper.GetInt("cacheSize")
|
||||
|
||||
ethDB := ipfsethdb.NewDatabase(config.DB.DB, ipfsethdb.CacheConfig{
|
||||
Name: GroupName,
|
||||
Size: cacheSize * 1024 * 1024,
|
||||
ExpiryDuration: time.Minute * time.Duration(CacheExpiryInMins),
|
||||
})
|
||||
|
||||
validator := validator.NewValidator(nil, ethDB)
|
||||
if err = validator.ValidateTrie(stateRoot); err != nil {
|
||||
log.Fatalln("Error validating state root")
|
||||
}
|
||||
|
||||
stats := ethDB.GetCacheStats()
|
||||
log.Debugf("groupcache stats %+v", stats)
|
||||
|
||||
log.Infoln("Successfully validated state root")
|
||||
}
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(validateCmd)
|
||||
|
||||
addDatabaseFlags(validateCmd)
|
||||
|
||||
validateCmd.PersistentFlags().String("state-root", "", "root of the state trie we wish to validate")
|
||||
viper.BindPFlag("stateRoot", validateCmd.PersistentFlags().Lookup("state-root"))
|
||||
|
||||
validateCmd.PersistentFlags().Int("cache-size", CacheSizeInMB, "cache size in MB")
|
||||
viper.BindPFlag("cacheSize", validateCmd.PersistentFlags().Lookup("cache-size"))
|
||||
}
|
8
go.mod
8
go.mod
@ -13,20 +13,20 @@ require (
|
||||
github.com/jmoiron/sqlx v1.2.0
|
||||
github.com/lib/pq v1.10.2
|
||||
github.com/machinebox/graphql v0.2.2
|
||||
github.com/mailgun/groupcache/v2 v2.2.1
|
||||
github.com/matryer/is v1.4.0 // indirect
|
||||
github.com/multiformats/go-multihash v0.0.14
|
||||
github.com/onsi/ginkgo v1.16.2
|
||||
github.com/onsi/gomega v1.10.1
|
||||
github.com/prometheus/client_golang v1.5.1
|
||||
github.com/prometheus/client_golang v1.7.1
|
||||
github.com/shirou/gopsutil v3.21.5+incompatible // indirect
|
||||
github.com/sirupsen/logrus v1.7.0
|
||||
github.com/spf13/cobra v1.1.1
|
||||
github.com/spf13/viper v1.7.0
|
||||
github.com/tklauser/go-sysconf v0.3.6 // indirect
|
||||
github.com/vulcanize/eth-ipfs-state-validator v0.0.1
|
||||
github.com/vulcanize/gap-filler v0.3.1
|
||||
github.com/vulcanize/ipfs-ethdb v0.0.4-0.20210824131459-7bb49801fc12
|
||||
github.com/vulcanize/ipfs-ethdb v0.0.4
|
||||
)
|
||||
|
||||
replace github.com/ethereum/go-ethereum v1.10.8 => github.com/vulcanize/go-ethereum v1.10.8-statediff-0.0.26
|
||||
|
||||
replace github.com/vulcanize/ipfs-ethdb v0.0.2-alpha => github.com/vulcanize/pg-ipfs-ethdb v0.0.2-alpha
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"context"
|
||||
"math/big"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
@ -37,9 +36,10 @@ import (
|
||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
|
||||
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
|
||||
ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -213,12 +213,15 @@ var _ = Describe("API", func() {
|
||||
indexAndPublisher := indexer.NewStateDiffIndexer(chainConfig, db)
|
||||
backend, err := eth.NewEthBackend(db, ð.Config{
|
||||
ChainConfig: chainConfig,
|
||||
VmConfig: vm.Config{},
|
||||
VMConfig: vm.Config{},
|
||||
RPCGasCap: big.NewInt(10000000000), // Max gas capacity for a rpc call.
|
||||
CacheConfig: pgipfsethdb.CacheConfig{
|
||||
Name: "api_test",
|
||||
Size: 3000000, // 3MB
|
||||
ExpiryDuration: time.Hour,
|
||||
GroupCacheConfig: ðServerShared.GroupCacheConfig{
|
||||
StateDB: ethServerShared.GroupConfig{
|
||||
Name: "api_test",
|
||||
CacheSizeInMB: 8,
|
||||
CacheExpiryInMins: 60,
|
||||
LogStatsIntervalInSecs: 0,
|
||||
},
|
||||
},
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
@ -41,9 +42,13 @@ import (
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
ethServerShared "github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
|
||||
log "github.com/sirupsen/logrus"
|
||||
validator "github.com/vulcanize/eth-ipfs-state-validator/pkg"
|
||||
ipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
|
||||
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -83,6 +88,10 @@ const (
|
||||
RetrieveCodeByMhKey = `SELECT data FROM public.blocks WHERE key = $1`
|
||||
)
|
||||
|
||||
const (
|
||||
StateDBGroupCacheName = "statedb"
|
||||
)
|
||||
|
||||
type Backend struct {
|
||||
// underlying postgres db
|
||||
DB *postgres.DB
|
||||
@ -100,17 +109,30 @@ type Backend struct {
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
ChainConfig *params.ChainConfig
|
||||
VmConfig vm.Config
|
||||
DefaultSender *common.Address
|
||||
RPCGasCap *big.Int
|
||||
CacheConfig pgipfsethdb.CacheConfig
|
||||
ChainConfig *params.ChainConfig
|
||||
VMConfig vm.Config
|
||||
DefaultSender *common.Address
|
||||
RPCGasCap *big.Int
|
||||
GroupCacheConfig *shared.GroupCacheConfig
|
||||
}
|
||||
|
||||
func NewEthBackend(db *postgres.DB, c *Config) (*Backend, error) {
|
||||
r := NewCIDRetriever(db)
|
||||
gcc := c.GroupCacheConfig
|
||||
|
||||
groupName := gcc.StateDB.Name
|
||||
if groupName == "" {
|
||||
groupName = StateDBGroupCacheName
|
||||
}
|
||||
|
||||
r := NewCIDRetriever(db)
|
||||
ethDB := ipfsethdb.NewDatabase(db.DB, ipfsethdb.CacheConfig{
|
||||
Name: groupName,
|
||||
Size: gcc.StateDB.CacheSizeInMB * 1024 * 1024,
|
||||
ExpiryDuration: time.Minute * time.Duration(gcc.StateDB.CacheExpiryInMins),
|
||||
})
|
||||
|
||||
logStateDBStatsOnTimer(ethDB, gcc)
|
||||
|
||||
ethDB := pgipfsethdb.NewDatabase(db.DB, c.CacheConfig)
|
||||
return &Backend{
|
||||
DB: db,
|
||||
Retriever: r,
|
||||
@ -292,10 +314,10 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
|
||||
}
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
shared.Rollback(tx)
|
||||
ethServerShared.Rollback(tx)
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
shared.Rollback(tx)
|
||||
ethServerShared.Rollback(tx)
|
||||
} else {
|
||||
err = tx.Commit()
|
||||
}
|
||||
@ -383,10 +405,10 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
|
||||
}
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
shared.Rollback(tx)
|
||||
ethServerShared.Rollback(tx)
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
shared.Rollback(tx)
|
||||
ethServerShared.Rollback(tx)
|
||||
} else {
|
||||
err = tx.Commit()
|
||||
}
|
||||
@ -592,7 +614,7 @@ func (b *Backend) GetEVM(ctx context.Context, msg core.Message, state *state.Sta
|
||||
state.SetBalance(msg.From(), math.MaxBig256)
|
||||
vmctx := core.NewEVMBlockContext(header, b, nil)
|
||||
txContext := core.NewEVMTxContext(msg)
|
||||
return vm.NewEVM(vmctx, txContext, state, b.Config.ChainConfig, b.Config.VmConfig), nil
|
||||
return vm.NewEVM(vmctx, txContext, state, b.Config.ChainConfig, b.Config.VMConfig), nil
|
||||
}
|
||||
|
||||
// GetAccountByNumberOrHash returns the account object for the provided address at the block corresponding to the provided number or hash
|
||||
@ -704,10 +726,10 @@ func (b *Backend) GetCodeByHash(ctx context.Context, address common.Address, has
|
||||
}
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
shared.Rollback(tx)
|
||||
ethServerShared.Rollback(tx)
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
shared.Rollback(tx)
|
||||
ethServerShared.Rollback(tx)
|
||||
} else {
|
||||
err = tx.Commit()
|
||||
}
|
||||
@ -717,7 +739,7 @@ func (b *Backend) GetCodeByHash(ctx context.Context, address common.Address, has
|
||||
return nil, err
|
||||
}
|
||||
var mhKey string
|
||||
mhKey, err = shared.MultihashKeyFromKeccak256(common.BytesToHash(codeHash))
|
||||
mhKey, err = ethServerShared.MultihashKeyFromKeccak256(common.BytesToHash(codeHash))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -794,6 +816,10 @@ func (b *Backend) GetHeader(hash common.Hash, height uint64) *types.Header {
|
||||
return header
|
||||
}
|
||||
|
||||
func (b *Backend) ValidateTrie(stateRoot common.Hash) error {
|
||||
return validator.NewValidator(nil, b.EthDB).ValidateTrie(stateRoot)
|
||||
}
|
||||
|
||||
// RPCGasCap returns the configured gas cap for the rpc server
|
||||
func (b *Backend) RPCGasCap() *big.Int {
|
||||
return b.Config.RPCGasCap
|
||||
@ -826,3 +852,18 @@ func (b *Backend) BloomStatus() (uint64, uint64) {
|
||||
func (b *Backend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func logStateDBStatsOnTimer(ethDB *ipfsethdb.Database, gcc *shared.GroupCacheConfig) {
|
||||
// No stats logging if interval isn't a positive integer.
|
||||
if gcc.StateDB.LogStatsIntervalInSecs <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(time.Duration(gcc.StateDB.LogStatsIntervalInSecs) * time.Second)
|
||||
|
||||
go func() {
|
||||
for range ticker.C {
|
||||
log.Infof("%s groupcache stats: %+v", StateDBGroupCacheName, ethDB.GetCacheStats())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/accounts/abi"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
@ -37,10 +36,10 @@ import (
|
||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
|
||||
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
|
||||
ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -80,12 +79,15 @@ var _ = Describe("eth state reading tests", func() {
|
||||
transformer := indexer.NewStateDiffIndexer(chainConfig, db)
|
||||
backend, err = eth.NewEthBackend(db, ð.Config{
|
||||
ChainConfig: chainConfig,
|
||||
VmConfig: vm.Config{},
|
||||
VMConfig: vm.Config{},
|
||||
RPCGasCap: big.NewInt(10000000000), // Max gas capacity for a rpc call.
|
||||
CacheConfig: pgipfsethdb.CacheConfig{
|
||||
Name: "eth_state",
|
||||
Size: 3000000, // 3MB
|
||||
ExpiryDuration: time.Hour,
|
||||
GroupCacheConfig: ðServerShared.GroupCacheConfig{
|
||||
StateDB: ethServerShared.GroupConfig{
|
||||
Name: "eth_state_test",
|
||||
CacheSizeInMB: 8,
|
||||
CacheExpiryInMins: 60,
|
||||
LogStatsIntervalInSecs: 0,
|
||||
},
|
||||
},
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
@ -37,11 +36,11 @@ import (
|
||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
|
||||
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/graphql"
|
||||
ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||
)
|
||||
|
||||
// SetupDB is use to setup a db for watcher tests
|
||||
@ -85,12 +84,15 @@ var _ = Describe("GraphQL", func() {
|
||||
transformer := indexer.NewStateDiffIndexer(chainConfig, db)
|
||||
backend, err = eth.NewEthBackend(db, ð.Config{
|
||||
ChainConfig: chainConfig,
|
||||
VmConfig: vm.Config{},
|
||||
VMConfig: vm.Config{},
|
||||
RPCGasCap: big.NewInt(10000000000),
|
||||
CacheConfig: pgipfsethdb.CacheConfig{
|
||||
Name: "graphql_test",
|
||||
Size: 3000000, // 3MB
|
||||
ExpiryDuration: time.Hour,
|
||||
GroupCacheConfig: ðServerShared.GroupCacheConfig{
|
||||
StateDB: ethServerShared.GroupConfig{
|
||||
Name: "graphql_test",
|
||||
CacheSizeInMB: 8,
|
||||
CacheExpiryInMins: 60,
|
||||
LogStatsIntervalInSecs: 0,
|
||||
},
|
||||
},
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
@ -28,9 +28,10 @@ import (
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/prom"
|
||||
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/prom"
|
||||
ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||
)
|
||||
|
||||
// Env variables
|
||||
@ -47,6 +48,9 @@ const (
|
||||
ethRPCGasCap = "ETH_RPC_GAS_CAP"
|
||||
ethChainConfig = "ETH_CHAIN_CONFIG"
|
||||
ethSupportsStatediff = "ETH_SUPPORTS_STATEDIFF"
|
||||
|
||||
ValidatorEnabled = "VALIDATOR_ENABLED"
|
||||
ValidatorEveryNthBlock = "VALIDATOR_EVERY_NTH_BLOCK"
|
||||
)
|
||||
|
||||
// Config struct
|
||||
@ -79,6 +83,12 @@ type Config struct {
|
||||
EthHttpEndpoint string
|
||||
Client *rpc.Client
|
||||
SupportStateDiff bool
|
||||
|
||||
// Cache configuration.
|
||||
GroupCache *ethServerShared.GroupCacheConfig
|
||||
|
||||
StateValidationEnabled bool
|
||||
StateValidationEveryNthBlock uint64
|
||||
}
|
||||
|
||||
// NewConfig is used to initialize a watcher config from a .toml file
|
||||
@ -205,6 +215,11 @@ func NewConfig() (*Config, error) {
|
||||
} else {
|
||||
c.ChainConfig, err = eth.ChainConfig(nodeInfo.ChainID)
|
||||
}
|
||||
|
||||
c.loadGroupCacheConfig()
|
||||
|
||||
c.loadValidatorConfig()
|
||||
|
||||
return c, err
|
||||
}
|
||||
|
||||
@ -217,7 +232,7 @@ func overrideDBConnConfig(con *postgres.ConnectionConfig) {
|
||||
con.MaxLifetime = viper.GetInt("database.server.maxLifetime")
|
||||
}
|
||||
|
||||
func (d *Config) dbInit() {
|
||||
func (c *Config) dbInit() {
|
||||
viper.BindEnv("database.name", databaseName)
|
||||
viper.BindEnv("database.hostname", databaseHostname)
|
||||
viper.BindEnv("database.port", databasePort)
|
||||
@ -227,12 +242,43 @@ func (d *Config) dbInit() {
|
||||
viper.BindEnv("database.maxOpen", databaseMaxOpenConnections)
|
||||
viper.BindEnv("database.maxLifetime", databaseMaxOpenConnLifetime)
|
||||
|
||||
d.DBParams.Name = viper.GetString("database.name")
|
||||
d.DBParams.Hostname = viper.GetString("database.hostname")
|
||||
d.DBParams.Port = viper.GetInt("database.port")
|
||||
d.DBParams.User = viper.GetString("database.user")
|
||||
d.DBParams.Password = viper.GetString("database.password")
|
||||
d.DBConfig.MaxIdle = viper.GetInt("database.maxIdle")
|
||||
d.DBConfig.MaxOpen = viper.GetInt("database.maxOpen")
|
||||
d.DBConfig.MaxLifetime = viper.GetInt("database.maxLifetime")
|
||||
c.DBParams.Name = viper.GetString("database.name")
|
||||
c.DBParams.Hostname = viper.GetString("database.hostname")
|
||||
c.DBParams.Port = viper.GetInt("database.port")
|
||||
c.DBParams.User = viper.GetString("database.user")
|
||||
c.DBParams.Password = viper.GetString("database.password")
|
||||
c.DBConfig.MaxIdle = viper.GetInt("database.maxIdle")
|
||||
c.DBConfig.MaxOpen = viper.GetInt("database.maxOpen")
|
||||
c.DBConfig.MaxLifetime = viper.GetInt("database.maxLifetime")
|
||||
}
|
||||
|
||||
func (c *Config) loadGroupCacheConfig() {
|
||||
viper.BindEnv("groupcache.pool.enabled", ethServerShared.GcachePoolEnabled)
|
||||
viper.BindEnv("groupcache.pool.httpEndpoint", ethServerShared.GcachePoolHttpPath)
|
||||
viper.BindEnv("groupcache.pool.peerHttpEndpoints", ethServerShared.GcachePoolHttpPeers)
|
||||
viper.BindEnv("groupcache.statedb.cacheSizeInMB", ethServerShared.GcacheStatedbCacheSize)
|
||||
viper.BindEnv("groupcache.statedb.cacheExpiryInMins", ethServerShared.GcacheStatedbCacheExpiry)
|
||||
viper.BindEnv("groupcache.statedb.logStatsIntervalInSecs", ethServerShared.GcacheStatedbLogStatsInterval)
|
||||
|
||||
gcc := ethServerShared.GroupCacheConfig{}
|
||||
gcc.Pool.Enabled = viper.GetBool("groupcache.pool.enabled")
|
||||
if gcc.Pool.Enabled {
|
||||
gcc.Pool.HttpEndpoint = viper.GetString("groupcache.pool.httpEndpoint")
|
||||
gcc.Pool.PeerHttpEndpoints = viper.GetStringSlice("groupcache.pool.peerHttpEndpoints")
|
||||
}
|
||||
|
||||
// Irrespective of whether the pool is enabled, we always use the hot/local cache.
|
||||
gcc.StateDB.CacheSizeInMB = viper.GetInt("groupcache.statedb.cacheSizeInMB")
|
||||
gcc.StateDB.CacheExpiryInMins = viper.GetInt("groupcache.statedb.cacheExpiryInMins")
|
||||
gcc.StateDB.LogStatsIntervalInSecs = viper.GetInt("groupcache.statedb.logStatsIntervalInSecs")
|
||||
|
||||
c.GroupCache = &gcc
|
||||
}
|
||||
|
||||
func (c *Config) loadValidatorConfig() {
|
||||
viper.BindEnv("validator.enabled", ValidatorEnabled)
|
||||
viper.BindEnv("validator.everyNthBlock", ValidatorEveryNthBlock)
|
||||
|
||||
c.StateValidationEnabled = viper.GetBool("validator.enabled")
|
||||
c.StateValidationEveryNthBlock = viper.GetUint64("validator.everyNthBlock")
|
||||
}
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/vm"
|
||||
@ -31,7 +30,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
||||
log "github.com/sirupsen/logrus"
|
||||
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
|
||||
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/net"
|
||||
)
|
||||
@ -100,15 +99,11 @@ func NewServer(settings *Config) (Server, error) {
|
||||
sap.supportsStateDiffing = settings.SupportStateDiff
|
||||
var err error
|
||||
sap.backend, err = eth.NewEthBackend(sap.db, ð.Config{
|
||||
ChainConfig: settings.ChainConfig,
|
||||
VmConfig: vm.Config{},
|
||||
DefaultSender: settings.DefaultSender,
|
||||
RPCGasCap: settings.RPCGasCap,
|
||||
CacheConfig: pgipfsethdb.CacheConfig{
|
||||
Name: "ipld-eth-server",
|
||||
Size: 3000000, // 3MB
|
||||
ExpiryDuration: time.Hour,
|
||||
},
|
||||
ChainConfig: settings.ChainConfig,
|
||||
VMConfig: vm.Config{},
|
||||
DefaultSender: settings.DefaultSender,
|
||||
RPCGasCap: settings.RPCGasCap,
|
||||
GroupCacheConfig: settings.GroupCache,
|
||||
})
|
||||
return sap, err
|
||||
}
|
||||
|
@ -19,4 +19,11 @@ package shared
|
||||
const (
|
||||
DefaultMaxBatchSize uint64 = 100
|
||||
DefaultMaxBatchNumber int64 = 50
|
||||
|
||||
GcachePoolEnabled = "GCACHE_POOL_ENABLED"
|
||||
GcachePoolHttpPath = "GCACHE_POOL_HTTP_PATH"
|
||||
GcachePoolHttpPeers = "GCACHE_POOL_HTTP_PEERS"
|
||||
GcacheStatedbCacheSize = "GCACHE_STATEDB_CACHE_SIZE"
|
||||
GcacheStatedbCacheExpiry = "GCACHE_STATEDB_CACHE_EXPIRY"
|
||||
GcacheStatedbLogStatsInterval = "GCACHE_STATEDB_LOG_STATS_INTERVAL"
|
||||
)
|
||||
|
38
pkg/shared/types.go
Normal file
38
pkg/shared/types.go
Normal file
@ -0,0 +1,38 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2021 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package shared
|
||||
|
||||
type PoolConfig struct {
|
||||
Enabled bool
|
||||
HttpEndpoint string
|
||||
PeerHttpEndpoints []string
|
||||
}
|
||||
|
||||
type GroupConfig struct {
|
||||
CacheSizeInMB int
|
||||
CacheExpiryInMins int
|
||||
LogStatsIntervalInSecs int
|
||||
|
||||
// Used in tests to override the cache name, to work around
|
||||
// the "duplicate registration of group" error from groupcache
|
||||
Name string
|
||||
}
|
||||
|
||||
type GroupCacheConfig struct {
|
||||
Pool PoolConfig
|
||||
StateDB GroupConfig
|
||||
}
|
Loading…
Reference in New Issue
Block a user