Use groupcache pool for state db access

This commit is contained in:
Ashwin Phatak 2021-08-13 18:24:19 +05:30 committed by Arijit Das
parent 838ed033f8
commit ebaf4b6447
6 changed files with 113 additions and 27 deletions

View File

@ -25,6 +25,7 @@ import (
"sync"
"github.com/ethereum/go-ethereum/rpc"
"github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@ -86,6 +87,11 @@ func serve() {
logWithCommand.Fatal(err)
}
err = startGroupCacheService(serverConfig)
if err != nil {
logWithCommand.Fatal(err)
}
shutdown := make(chan os.Signal)
signal.Notify(shutdown, os.Interrupt)
<-shutdown
@ -200,6 +206,32 @@ func startIpldGraphQL(settings *s.Config) error {
return nil
}
func startGroupCacheService(settings *s.Config) error {
gcc := settings.GroupCache
pool := groupcache.NewHTTPPoolOpts(gcc.Pool.HttpEndpoint, &groupcache.HTTPPoolOptions{})
pool.Set(gcc.Pool.PeerHttpEndpoints...)
httpURL, err := url.Parse(gcc.Pool.HttpEndpoint)
if err != nil {
return err
}
server := http.Server{
Addr: httpURL.Host,
Handler: pool,
}
// Start a HTTP server to listen for peer requests from the groupcache
go func() {
if err := server.ListenAndServe(); err != nil {
logWithCommand.Fatal(err)
}
}()
return nil
}
func parseRpcAddresses(value string) ([]*rpc.Client, error) {
rpcAddresses := strings.Split(value, ",")
rpcClients := make([]*rpc.Client, 0, len(rpcAddresses))

1
go.mod
View File

@ -13,6 +13,7 @@ require (
github.com/jmoiron/sqlx v1.2.0
github.com/lib/pq v1.10.2
github.com/machinebox/graphql v0.2.2
github.com/mailgun/groupcache/v2 v2.2.1
github.com/matryer/is v1.4.0 // indirect
github.com/multiformats/go-multihash v0.0.14
github.com/onsi/ginkgo v1.16.2

View File

@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -41,9 +42,10 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
ethServerShared "github.com/ethereum/go-ethereum/statediff/indexer/shared"
"github.com/ethereum/go-ethereum/trie"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
ipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
)
var (
@ -100,17 +102,23 @@ type Backend struct {
}
type Config struct {
ChainConfig *params.ChainConfig
VmConfig vm.Config
DefaultSender *common.Address
RPCGasCap *big.Int
CacheConfig pgipfsethdb.CacheConfig
ChainConfig *params.ChainConfig
VmConfig vm.Config
DefaultSender *common.Address
RPCGasCap *big.Int
GroupCacheConfig *shared.GroupCacheConfig
}
func NewEthBackend(db *postgres.DB, c *Config) (*Backend, error) {
r := NewCIDRetriever(db)
gcc := c.GroupCacheConfig
r := NewCIDRetriever(db)
ethDB := ipfsethdb.NewDatabase(db.DB, ipfsethdb.CacheConfig{
Name: "statedb",
Size: gcc.StateDB.CacheSizeInMB,
ExpiryDuration: time.Minute * time.Duration(gcc.StateDB.CacheExpiryInMins),
})
ethDB := pgipfsethdb.NewDatabase(db.DB, c.CacheConfig)
return &Backend{
DB: db,
Retriever: r,
@ -292,10 +300,10 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
ethServerShared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
ethServerShared.Rollback(tx)
} else {
err = tx.Commit()
}
@ -383,10 +391,10 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
ethServerShared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
ethServerShared.Rollback(tx)
} else {
err = tx.Commit()
}
@ -704,10 +712,10 @@ func (b *Backend) GetCodeByHash(ctx context.Context, address common.Address, has
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
ethServerShared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
ethServerShared.Rollback(tx)
} else {
err = tx.Commit()
}
@ -717,7 +725,7 @@ func (b *Backend) GetCodeByHash(ctx context.Context, address common.Address, has
return nil, err
}
var mhKey string
mhKey, err = shared.MultihashKeyFromKeccak256(common.BytesToHash(codeHash))
mhKey, err = ethServerShared.MultihashKeyFromKeccak256(common.BytesToHash(codeHash))
if err != nil {
return nil, err
}

View File

@ -31,6 +31,7 @@ import (
"github.com/vulcanize/ipld-eth-server/pkg/prom"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
ethServerShared "github.com/vulcanize/ipld-eth-server/pkg/shared"
)
// Env variables
@ -79,6 +80,9 @@ type Config struct {
EthHttpEndpoint string
Client *rpc.Client
SupportStateDiff bool
// Cache configuration.
GroupCache *ethServerShared.GroupCacheConfig
}
// NewConfig is used to initialize a watcher config from a .toml file
@ -205,6 +209,9 @@ func NewConfig() (*Config, error) {
} else {
c.ChainConfig, err = eth.ChainConfig(nodeInfo.ChainID)
}
c.loadGroupCacheConfig()
return c, err
}
@ -236,3 +243,15 @@ func (d *Config) dbInit() {
d.DBConfig.MaxOpen = viper.GetInt("database.maxOpen")
d.DBConfig.MaxLifetime = viper.GetInt("database.maxLifetime")
}
func (c *Config) loadGroupCacheConfig() {
gcc := ethServerShared.GroupCacheConfig{}
gcc.Pool.HttpEndpoint = viper.GetString("groupcache.pool.httpEndpoint")
gcc.Pool.PeerHttpEndpoints = viper.GetStringSlice("groupcache.pool.peerHttpEndpoints")
gcc.StateDB.CacheSizeInMB = viper.GetInt("groupcache.statedb.cacheSizeInMB")
gcc.StateDB.CacheExpiryInMins = viper.GetInt("groupcache.statedb.cacheExpiryInMins")
c.GroupCache = &gcc
}

View File

@ -20,7 +20,6 @@ import (
"fmt"
"strconv"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/vm"
@ -31,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
log "github.com/sirupsen/logrus"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/net"
)
@ -100,15 +98,11 @@ func NewServer(settings *Config) (Server, error) {
sap.supportsStateDiffing = settings.SupportStateDiff
var err error
sap.backend, err = eth.NewEthBackend(sap.db, &eth.Config{
ChainConfig: settings.ChainConfig,
VmConfig: vm.Config{},
DefaultSender: settings.DefaultSender,
RPCGasCap: settings.RPCGasCap,
CacheConfig: pgipfsethdb.CacheConfig{
Name: "ipld-eth-server",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
},
ChainConfig: settings.ChainConfig,
VmConfig: vm.Config{},
DefaultSender: settings.DefaultSender,
RPCGasCap: settings.RPCGasCap,
GroupCacheConfig: settings.GroupCache,
})
return sap, err
}

32
pkg/shared/types.go Normal file
View File

@ -0,0 +1,32 @@
// VulcanizeDB
// Copyright © 2021 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package shared
type PoolConfig struct {
HttpEndpoint string
PeerHttpEndpoints []string
}
type GroupConfig struct {
CacheSizeInMB int
CacheExpiryInMins int
}
type GroupCacheConfig struct {
Pool PoolConfig
StateDB GroupConfig
}