From ebaf4b6447e89e8685d24b91752e4e9dae0e3dd6 Mon Sep 17 00:00:00 2001 From: Ashwin Phatak Date: Fri, 13 Aug 2021 18:24:19 +0530 Subject: [PATCH] Use groupcache pool for state db access --- cmd/serve.go | 32 ++++++++++++++++++++++++++++++++ go.mod | 1 + pkg/eth/backend.go | 40 ++++++++++++++++++++++++---------------- pkg/serve/config.go | 19 +++++++++++++++++++ pkg/serve/service.go | 16 +++++----------- pkg/shared/types.go | 32 ++++++++++++++++++++++++++++++++ 6 files changed, 113 insertions(+), 27 deletions(-) create mode 100644 pkg/shared/types.go diff --git a/cmd/serve.go b/cmd/serve.go index e45e5ddf..a5b9bdb3 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -25,6 +25,7 @@ import ( "sync" "github.com/ethereum/go-ethereum/rpc" + "github.com/mailgun/groupcache/v2" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -86,6 +87,11 @@ func serve() { logWithCommand.Fatal(err) } + err = startGroupCacheService(serverConfig) + if err != nil { + logWithCommand.Fatal(err) + } + shutdown := make(chan os.Signal) signal.Notify(shutdown, os.Interrupt) <-shutdown @@ -200,6 +206,32 @@ func startIpldGraphQL(settings *s.Config) error { return nil } +func startGroupCacheService(settings *s.Config) error { + gcc := settings.GroupCache + + pool := groupcache.NewHTTPPoolOpts(gcc.Pool.HttpEndpoint, &groupcache.HTTPPoolOptions{}) + pool.Set(gcc.Pool.PeerHttpEndpoints...) + + httpURL, err := url.Parse(gcc.Pool.HttpEndpoint) + if err != nil { + return err + } + + server := http.Server{ + Addr: httpURL.Host, + Handler: pool, + } + + // Start a HTTP server to listen for peer requests from the groupcache + go func() { + if err := server.ListenAndServe(); err != nil { + logWithCommand.Fatal(err) + } + }() + + return nil +} + func parseRpcAddresses(value string) ([]*rpc.Client, error) { rpcAddresses := strings.Split(value, ",") rpcClients := make([]*rpc.Client, 0, len(rpcAddresses)) diff --git a/go.mod b/go.mod index a7d1b5a8..e35005d6 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/jmoiron/sqlx v1.2.0 github.com/lib/pq v1.10.2 github.com/machinebox/graphql v0.2.2 + github.com/mailgun/groupcache/v2 v2.2.1 github.com/matryer/is v1.4.0 // indirect github.com/multiformats/go-multihash v0.0.14 github.com/onsi/ginkgo v1.16.2 diff --git a/pkg/eth/backend.go b/pkg/eth/backend.go index 16e3a6bc..1af668ed 100644 --- a/pkg/eth/backend.go +++ b/pkg/eth/backend.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -41,9 +42,10 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" - "github.com/ethereum/go-ethereum/statediff/indexer/shared" + ethServerShared "github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/trie" - pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres" + ipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) var ( @@ -100,17 +102,23 @@ type Backend struct { } type Config struct { - ChainConfig *params.ChainConfig - VmConfig vm.Config - DefaultSender *common.Address - RPCGasCap *big.Int - CacheConfig pgipfsethdb.CacheConfig + ChainConfig *params.ChainConfig + VmConfig vm.Config + DefaultSender *common.Address + RPCGasCap *big.Int + GroupCacheConfig *shared.GroupCacheConfig } func NewEthBackend(db *postgres.DB, c *Config) (*Backend, error) { - r := NewCIDRetriever(db) + gcc := c.GroupCacheConfig + + r := NewCIDRetriever(db) + ethDB := ipfsethdb.NewDatabase(db.DB, ipfsethdb.CacheConfig{ + Name: "statedb", + Size: gcc.StateDB.CacheSizeInMB, + ExpiryDuration: time.Minute * time.Duration(gcc.StateDB.CacheExpiryInMins), + }) - ethDB := pgipfsethdb.NewDatabase(db.DB, c.CacheConfig) return &Backend{ DB: db, Retriever: r, @@ -292,10 +300,10 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber } defer func() { if p := recover(); p != nil { - shared.Rollback(tx) + ethServerShared.Rollback(tx) panic(p) } else if err != nil { - shared.Rollback(tx) + ethServerShared.Rollback(tx) } else { err = tx.Commit() } @@ -383,10 +391,10 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo } defer func() { if p := recover(); p != nil { - shared.Rollback(tx) + ethServerShared.Rollback(tx) panic(p) } else if err != nil { - shared.Rollback(tx) + ethServerShared.Rollback(tx) } else { err = tx.Commit() } @@ -704,10 +712,10 @@ func (b *Backend) GetCodeByHash(ctx context.Context, address common.Address, has } defer func() { if p := recover(); p != nil { - shared.Rollback(tx) + ethServerShared.Rollback(tx) panic(p) } else if err != nil { - shared.Rollback(tx) + ethServerShared.Rollback(tx) } else { err = tx.Commit() } @@ -717,7 +725,7 @@ func (b *Backend) GetCodeByHash(ctx context.Context, address common.Address, has return nil, err } var mhKey string - mhKey, err = shared.MultihashKeyFromKeccak256(common.BytesToHash(codeHash)) + mhKey, err = ethServerShared.MultihashKeyFromKeccak256(common.BytesToHash(codeHash)) if err != nil { return nil, err } diff --git a/pkg/serve/config.go b/pkg/serve/config.go index 390ced1b..5f8502c9 100644 --- a/pkg/serve/config.go +++ b/pkg/serve/config.go @@ -31,6 +31,7 @@ import ( "github.com/vulcanize/ipld-eth-server/pkg/prom" "github.com/vulcanize/ipld-eth-server/pkg/eth" + ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared" ) // Env variables @@ -79,6 +80,9 @@ type Config struct { EthHttpEndpoint string Client *rpc.Client SupportStateDiff bool + + // Cache configuration. + GroupCache *ethServerShared.GroupCacheConfig } // NewConfig is used to initialize a watcher config from a .toml file @@ -205,6 +209,9 @@ func NewConfig() (*Config, error) { } else { c.ChainConfig, err = eth.ChainConfig(nodeInfo.ChainID) } + + c.loadGroupCacheConfig() + return c, err } @@ -236,3 +243,15 @@ func (d *Config) dbInit() { d.DBConfig.MaxOpen = viper.GetInt("database.maxOpen") d.DBConfig.MaxLifetime = viper.GetInt("database.maxLifetime") } + +func (c *Config) loadGroupCacheConfig() { + gcc := ethServerShared.GroupCacheConfig{} + + gcc.Pool.HttpEndpoint = viper.GetString("groupcache.pool.httpEndpoint") + gcc.Pool.PeerHttpEndpoints = viper.GetStringSlice("groupcache.pool.peerHttpEndpoints") + + gcc.StateDB.CacheSizeInMB = viper.GetInt("groupcache.statedb.cacheSizeInMB") + gcc.StateDB.CacheExpiryInMins = viper.GetInt("groupcache.statedb.cacheExpiryInMins") + + c.GroupCache = &gcc +} diff --git a/pkg/serve/service.go b/pkg/serve/service.go index ed403ae2..45e1c7fa 100644 --- a/pkg/serve/service.go +++ b/pkg/serve/service.go @@ -20,7 +20,6 @@ import ( "fmt" "strconv" "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/vm" @@ -31,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" log "github.com/sirupsen/logrus" - pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres" "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/net" ) @@ -100,15 +98,11 @@ func NewServer(settings *Config) (Server, error) { sap.supportsStateDiffing = settings.SupportStateDiff var err error sap.backend, err = eth.NewEthBackend(sap.db, ð.Config{ - ChainConfig: settings.ChainConfig, - VmConfig: vm.Config{}, - DefaultSender: settings.DefaultSender, - RPCGasCap: settings.RPCGasCap, - CacheConfig: pgipfsethdb.CacheConfig{ - Name: "ipld-eth-server", - Size: 3000000, // 3MB - ExpiryDuration: time.Hour, - }, + ChainConfig: settings.ChainConfig, + VmConfig: vm.Config{}, + DefaultSender: settings.DefaultSender, + RPCGasCap: settings.RPCGasCap, + GroupCacheConfig: settings.GroupCache, }) return sap, err } diff --git a/pkg/shared/types.go b/pkg/shared/types.go new file mode 100644 index 00000000..dc1cc29b --- /dev/null +++ b/pkg/shared/types.go @@ -0,0 +1,32 @@ +// VulcanizeDB +// Copyright © 2021 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package shared + +type PoolConfig struct { + HttpEndpoint string + PeerHttpEndpoints []string +} + +type GroupConfig struct { + CacheSizeInMB int + CacheExpiryInMins int +} + +type GroupCacheConfig struct { + Pool PoolConfig + StateDB GroupConfig +}