refactor & use plugin builder

- removes StateTrieAt, we can no longer return a full trie without inner nodes
This commit is contained in:
Roy Crihfield 2023-08-25 15:57:33 +08:00
parent 31588ddb91
commit eb56eee54e
14 changed files with 407 additions and 3329 deletions

View File

@ -53,8 +53,6 @@ const (
PRERUN_PARALLEL = "PRERUN_PARALLEL"
PRERUN_RANGE_START = "PRERUN_RANGE_START"
PRERUN_RANGE_STOP = "PRERUN_RANGE_STOP"
PRERUN_INTERMEDIATE_STATE_NODES = "PRERUN_INTERMEDIATE_STATE_NODES"
PRERUN_INTERMEDIATE_STORAGE_NODES = "PRERUN_INTERMEDIATE_STORAGE_NODES"
PRERUN_INCLUDE_BLOCK = "PRERUN_INCLUDE_BLOCK"
PRERUN_INCLUDE_RECEIPTS = "PRERUN_INCLUDE_RECEIPTS"
PRERUN_INCLUDE_TD = "PRERUN_INCLUDE_TD"
@ -141,8 +139,6 @@ func init() {
viper.BindEnv("prerun.parallel", PRERUN_PARALLEL)
viper.BindEnv("prerun.start", PRERUN_RANGE_START)
viper.BindEnv("prerun.stop", PRERUN_RANGE_STOP)
viper.BindEnv("prerun.params.intermediateStateNodes", PRERUN_INTERMEDIATE_STATE_NODES)
viper.BindEnv("prerun.params.intermediateStorageNodes", PRERUN_INTERMEDIATE_STORAGE_NODES)
viper.BindEnv("prerun.params.includeBlock", PRERUN_INCLUDE_BLOCK)
viper.BindEnv("prerun.params.includeReceipts", PRERUN_INCLUDE_RECEIPTS)
viper.BindEnv("prerun.params.includeTD", PRERUN_INCLUDE_TD)

View File

@ -49,7 +49,6 @@ var rootCmd = &cobra.Command{
}
func Execute() {
log.Info("----- Starting vDB -----")
if err := rootCmd.Execute(); err != nil {
log.Fatal(err)
}
@ -163,8 +162,6 @@ func init() {
rootCmd.PersistentFlags().Bool("prerun-only", false, "only process pre-configured ranges; exit afterwards")
rootCmd.PersistentFlags().Int("prerun-start", 0, "start height for a prerun range")
rootCmd.PersistentFlags().Int("prerun-stop", 0, "stop height for a prerun range")
rootCmd.PersistentFlags().Bool("prerun-intermediate-state-nodes", true, "include intermediate state nodes in state diff")
rootCmd.PersistentFlags().Bool("prerun-intermediate-storage-nodes", true, "include intermediate storage nodes in state diff")
rootCmd.PersistentFlags().Bool("prerun-include-block", true, "include block data in the statediff payload")
rootCmd.PersistentFlags().Bool("prerun-include-receipts", true, "include receipts in the statediff payload")
rootCmd.PersistentFlags().Bool("prerun-include-td", true, "include td in the statediff payload")
@ -224,8 +221,6 @@ func init() {
viper.BindPFlag("prerun.parallel", rootCmd.PersistentFlags().Lookup("prerun-parallel"))
viper.BindPFlag("prerun.start", rootCmd.PersistentFlags().Lookup("prerun-start"))
viper.BindPFlag("prerun.stop", rootCmd.PersistentFlags().Lookup("prerun-stop"))
viper.BindPFlag("prerun.params.intermediateStateNodes", rootCmd.PersistentFlags().Lookup("prerun-intermediate-state-nodes"))
viper.BindPFlag("prerun.params.intermediateStorageNodes", rootCmd.PersistentFlags().Lookup("prerun-intermediate-storage-nodes"))
viper.BindPFlag("prerun.params.includeBlock", rootCmd.PersistentFlags().Lookup("prerun-include-block"))
viper.BindPFlag("prerun.params.includeReceipts", rootCmd.PersistentFlags().Lookup("prerun-include-receipts"))
viper.BindPFlag("prerun.params.includeTD", rootCmd.PersistentFlags().Lookup("prerun-include-td"))
@ -344,7 +339,7 @@ func getConfig(nodeInfo node.Info) (interfaces.Config, error) {
case dump.STDOUT:
indexerConfig = dump.Config{Dump: os.Stderr}
case dump.DISCARD:
indexerConfig = dump.Config{Dump: dump.NewDiscardWriterCloser()}
indexerConfig = dump.Config{Dump: dump.Discard}
default:
return nil, fmt.Errorf("unrecognized dump destination: %s", dumpDst)
}
@ -361,8 +356,6 @@ func getConfig(nodeInfo node.Info) (interfaces.Config, error) {
DatabaseName: viper.GetString("database.name"),
Username: viper.GetString("database.user"),
Password: viper.GetString("database.password"),
ID: nodeInfo.ID,
ClientName: nodeInfo.ClientName,
Driver: driverType,
}
if viper.IsSet("database.maxIdle") {

View File

@ -75,7 +75,7 @@ func serve() {
}
logWithCommand.Infof("Latest block found in the levelDB\r\nheight: %s, hash: %s", header.Number.String(), header.Hash().Hex())
statediffService, err := createStateDiffService(reader, chainConf, nodeInfo)
service, err := createStateDiffService(reader, chainConf, nodeInfo)
if err != nil {
logWithCommand.Fatal(err)
}
@ -93,20 +93,20 @@ func serve() {
// short circuit if we only want to perform prerun
if viper.GetBool("prerun.only") {
parallel := viper.GetBool("prerun.parallel")
if err := statediffService.Run(nil, parallel); err != nil {
logWithCommand.Fatal("Unable to perform prerun: %v", err)
if err := service.Run(nil, parallel); err != nil {
logWithCommand.Fatalf("Unable to perform prerun: %v", err)
}
return
}
// start service and servers
logWithCommand.Info("Starting statediff service")
wg := new(sync.WaitGroup)
if err := statediffService.Loop(wg); err != nil {
var wg sync.WaitGroup
if err := service.Loop(&wg); err != nil {
logWithCommand.Fatalf("unable to start statediff service: %v", err)
}
logWithCommand.Info("Starting RPC servers")
if err := startServers(statediffService); err != nil {
if err := startServers(service); err != nil {
logWithCommand.Fatal(err)
}
logWithCommand.Info("RPC servers successfully spun up; awaiting requests")
@ -116,11 +116,11 @@ func serve() {
signal.Notify(shutdown, os.Interrupt)
<-shutdown
logWithCommand.Info("Received interrupt signal, shutting down")
statediffService.Stop()
service.Stop()
wg.Wait()
}
func startServers(serv sd.StateDiffService) error {
func startServers(serv *sd.Service) error {
ipcPath := viper.GetString("server.ipcPath")
httpPath := viper.GetString("server.httpPath")
if ipcPath == "" && httpPath == "" {

View File

@ -2,22 +2,27 @@ package cmd
import (
"context"
"encoding/json"
"fmt"
"os"
"github.com/cerc-io/plugeth-statediff"
gethsd "github.com/cerc-io/plugeth-statediff"
ind "github.com/cerc-io/plugeth-statediff/indexer"
"github.com/cerc-io/plugeth-statediff/indexer"
"github.com/cerc-io/plugeth-statediff/indexer/node"
"github.com/cerc-io/plugeth-statediff/indexer/shared"
"github.com/cerc-io/plugeth-statediff/utils/log"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/spf13/viper"
sd "github.com/cerc-io/eth-statediff-service/pkg"
pkg "github.com/cerc-io/eth-statediff-service/pkg"
"github.com/cerc-io/eth-statediff-service/pkg/prom"
)
type blockRange [2]uint64
func createStateDiffService(lvlDBReader sd.Reader, chainConf *params.ChainConfig, nodeInfo node.Info) (sd.StateDiffService, error) {
func createStateDiffService(lvlDBReader pkg.Reader, chainConf *params.ChainConfig, nodeInfo node.Info) (*pkg.Service, error) {
// create statediff service
logWithCommand.Info("Setting up database")
conf, err := getConfig(nodeInfo)
@ -26,7 +31,7 @@ func createStateDiffService(lvlDBReader sd.Reader, chainConf *params.ChainConfig
}
logWithCommand.Info("Creating statediff indexer")
db, indexer, err := ind.NewStateDiffIndexer(context.Background(), chainConf, nodeInfo, conf)
db, indexer, err := indexer.NewStateDiffIndexer(context.Background(), chainConf, nodeInfo, conf)
if err != nil {
logWithCommand.Fatal(err)
}
@ -35,22 +40,20 @@ func createStateDiffService(lvlDBReader sd.Reader, chainConf *params.ChainConfig
}
logWithCommand.Info("Creating statediff service")
sdConf := sd.Config{
sdConf := pkg.ServiceConfig{
ServiceWorkers: viper.GetUint("statediff.serviceWorkers"),
TrieWorkers: viper.GetUint("statediff.trieWorkers"),
WorkerQueueSize: viper.GetUint("statediff.workerQueueSize"),
PreRuns: setupPreRunRanges(),
}
return sd.NewStateDiffService(lvlDBReader, indexer, sdConf)
return pkg.NewStateDiffService(lvlDBReader, indexer, sdConf), nil
}
func setupPreRunRanges() []sd.RangeRequest {
func setupPreRunRanges() []pkg.RangeRequest {
if !viper.GetBool("statediff.prerun") {
return nil
}
preRunParams := statediff.Params{
IntermediateStateNodes: viper.GetBool("prerun.params.intermediateStateNodes"),
IntermediateStorageNodes: viper.GetBool("prerun.params.intermediateStorageNodes"),
IncludeBlock: viper.GetBool("prerun.params.includeBlock"),
IncludeReceipts: viper.GetBool("prerun.params.includeReceipts"),
IncludeTD: viper.GetBool("prerun.params.includeTD"),
@ -65,9 +68,9 @@ func setupPreRunRanges() []sd.RangeRequest {
preRunParams.WatchedAddresses = addrs
var rawRanges []blockRange
viper.UnmarshalKey("prerun.ranges", &rawRanges)
blockRanges := make([]sd.RangeRequest, len(rawRanges))
blockRanges := make([]pkg.RangeRequest, len(rawRanges))
for i, rawRange := range rawRanges {
blockRanges[i] = sd.RangeRequest{
blockRanges[i] = pkg.RangeRequest{
Start: rawRange[0],
Stop: rawRange[1],
Params: preRunParams,
@ -76,7 +79,7 @@ func setupPreRunRanges() []sd.RangeRequest {
if viper.IsSet("prerun.start") && viper.IsSet("prerun.stop") {
hardStart := viper.GetInt("prerun.start")
hardStop := viper.GetInt("prerun.stop")
blockRanges = append(blockRanges, sd.RangeRequest{
blockRanges = append(blockRanges, pkg.RangeRequest{
Start: uint64(hardStart),
Stop: uint64(hardStop),
Params: preRunParams,
@ -86,7 +89,29 @@ func setupPreRunRanges() []sd.RangeRequest {
return blockRanges
}
func instantiateLevelDBReader() (sd.Reader, *params.ChainConfig, node.Info) {
// LoadConfig loads chain config from json file
func LoadConfig(chainConfigPath string) (*params.ChainConfig, error) {
file, err := os.Open(chainConfigPath)
if err != nil {
log.Error(fmt.Sprintf("Failed to read chain config file: %v", err))
return nil, err
}
defer file.Close()
chainConfig := new(params.ChainConfig)
if err := json.NewDecoder(file).Decode(chainConfig); err != nil {
log.Error(fmt.Sprintf("invalid chain config file: %v", err))
return nil, err
}
log.Info(fmt.Sprintf("Using chain config from %s file. Content %+v", chainConfigPath, chainConfig))
return chainConfig, nil
}
func instantiateLevelDBReader() (pkg.Reader, *params.ChainConfig, node.Info) {
// load some necessary params
logWithCommand.Info("Loading statediff service parameters")
mode := viper.GetString("leveldb.mode")
@ -108,23 +133,15 @@ func instantiateLevelDBReader() (sd.Reader, *params.ChainConfig, node.Info) {
nodeInfo := getEthNodeInfo()
var chainConf *params.ChainConfig
var err error
chainConfigPath := viper.GetString("ethereum.chainConfig")
if chainConfigPath != "" {
chainConf, err = statediff.LoadConfig(chainConfigPath)
} else {
chainConf, err = statediff.ChainConfig(nodeInfo.ChainID)
}
chainConf, err := LoadConfig(chainConfigPath)
if err != nil {
logWithCommand.Fatalf("Unable to instantiate chain config: %s", err.Error())
logWithCommand.Fatalf("Unable to instantiate chain config: %s", err)
}
// create LevelDB reader
logWithCommand.Info("Creating LevelDB reader")
readerConf := sd.LvLDBReaderConfig{
readerConf := pkg.LvLDBReaderConfig{
TrieConfig: &trie.Config{
Cache: viper.GetInt("cache.trie"),
Journal: "",
@ -137,9 +154,9 @@ func instantiateLevelDBReader() (sd.Reader, *params.ChainConfig, node.Info) {
Url: url,
DBCacheSize: viper.GetInt("cache.database"),
}
reader, err := sd.NewLvlDBReader(readerConf)
reader, err := pkg.NewLvlDBReader(readerConf)
if err != nil {
logWithCommand.Fatalf("Unable to instantiate levelDB reader: %s", err.Error())
logWithCommand.Fatalf("Unable to instantiate levelDB reader: %s", err)
}
return reader, chainConf, nodeInfo
}

139
go.mod
View File

@ -4,16 +4,143 @@ go 1.19
require (
github.com/cerc-io/leveldb-ethdb-rpc v1.1.13
github.com/ethereum/go-ethereum v1.11.5
github.com/cerc-io/plugeth-statediff v0.0.0-00010101000000-000000000000
github.com/ethereum/go-ethereum v1.12.0
github.com/jmoiron/sqlx v1.3.5 // indirect
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.3.0
github.com/spf13/viper v1.10.1
)
replace (
github.com/cerc-io/plugeth-statediff => git.vdb.to/cerc-io/plugeth-statediff 840d4a4e5d42110e694941afa734dac8fea126fe
github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1
// github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46
require (
github.com/DataDog/zstd v1.5.5 // indirect
github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/cerc-io/eth-iterator-utils v1.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.10.0 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v0.0.0-20230720154706-692f3b61a3c4 // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set/v2 v2.3.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/deepmap/oapi-codegen v1.8.2 // indirect
github.com/edsrzf/mmap-go v1.0.0 // indirect
github.com/fjl/memsize v0.0.1 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect
github.com/georgysavva/scany v0.2.9 // indirect
github.com/getsentry/sentry-go v0.22.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/graph-gophers/graphql-go v1.3.0 // indirect
github.com/hashicorp/go-bexpr v0.1.12 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.3 // indirect
github.com/huin/goupnp v1.2.0 // indirect
github.com/inconshreveable/log15 v2.16.0+incompatible // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/influxdata/influxdb-client-go/v2 v2.4.0 // indirect
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c // indirect
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.10.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.1.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.8.1 // indirect
github.com/jackc/pgx/v4 v4.13.0 // indirect
github.com/jackc/puddle v1.1.3 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/pointerstructure v1.2.1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/openrelayxyz/plugeth-utils v1.2.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 // indirect
github.com/pganalyze/pg_query_go/v4 v4.2.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/rs/cors v1.9.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.6.0 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/status-im/keycard-go v0.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.2 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/thoas/go-funk v0.9.3 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
github.com/urfave/cli/v2 v2.25.7 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/term v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)
replace (
github.com/cerc-io/eth-iterator-utils => git.vdb.to/cerc-io/eth-iterator-utils v0.0.0-20230803115933-6bb6d4e27fd2
// github.com/cerc-io/plugeth-statediff => git.vdb.to/cerc-io/plugeth-statediff v0.1.0
github.com/cerc-io/plugeth-statediff => ../plugeth-statediff
github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1
github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46
)

547
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -30,11 +30,11 @@ const APIVersion = "0.0.1"
// PublicStateDiffAPI provides an RPC interface
// that can be used to fetch historical diffs from LevelDB directly
type PublicStateDiffAPI struct {
sds StateDiffService
sds *Service
}
// NewPublicStateDiffAPI creates an rpc interface for the underlying statediff service
func NewPublicStateDiffAPI(sds StateDiffService) *PublicStateDiffAPI {
func NewPublicStateDiffAPI(sds *Service) *PublicStateDiffAPI {
return &PublicStateDiffAPI{
sds: sds,
}
@ -45,11 +45,6 @@ func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint
return api.sds.StateDiffAt(blockNumber, params)
}
// StateTrieAt returns a state trie payload at the specific blockheight
func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params sd.Params) (*sd.Payload, error) {
return api.sds.StateTrieAt(blockNumber, params)
}
// WriteStateDiffAt writes a state diff object directly to DB at the specific blockheight
func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params sd.Params) error {
return api.sds.WriteStateDiffAt(blockNumber, params)

View File

@ -1,159 +0,0 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Contains a batch of utility type declarations used by the tests. As the node
// operates on unique types, a lot of them are needed to check various features.
package statediff
import (
"fmt"
"math/bits"
"sync"
sd "github.com/cerc-io/plugeth-statediff"
sdtrie "github.com/cerc-io/plugeth-statediff/trie_helpers"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/sirupsen/logrus"
)
type builder struct {
sd.StateDiffBuilder
numWorkers uint
}
// NewBuilder is used to create a statediff builder
func NewBuilder(stateCache state.Database, workers uint) (sd.Builder, error) {
if workers == 0 {
workers = 1
}
if bits.OnesCount(workers) != 1 {
return nil, fmt.Errorf("workers must be a power of 2")
}
return &builder{
StateDiffBuilder: sd.StateDiffBuilder{
StateCache: stateCache,
},
numWorkers: workers,
}, nil
}
// BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
func (sdb *builder) BuildStateDiffObject(args sd.Args, params sd.Params) (sdtypes.StateObject, error) {
var stateNodes []sdtypes.StateNode
var codeAndCodeHashes []sdtypes.CodeAndCodeHash
err := sdb.WriteStateDiffObject(
args,
params, sd.StateNodeAppender(&stateNodes), sd.CodeMappingAppender(&codeAndCodeHashes))
if err != nil {
return sdtypes.StateObject{}, err
}
return sdtypes.StateObject{
BlockHash: args.BlockHash,
BlockNumber: args.BlockNumber,
Nodes: stateNodes,
CodeAndCodeHashes: codeAndCodeHashes,
}, nil
}
// WriteStateDiffObject writes a statediff object to output callback
func (sdb *builder) WriteStateDiffObject(args sd.Args, params sd.Params, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink) error {
// Load tries for old and new states
oldTrie, err := sdb.StateCache.OpenTrie(args.OldStateRoot)
if err != nil {
return fmt.Errorf("error creating trie for oldStateRoot: %v", err)
}
newTrie, err := sdb.StateCache.OpenTrie(args.NewStateRoot)
if err != nil {
return fmt.Errorf("error creating trie for newStateRoot: %v", err)
}
// Split old and new tries into corresponding subtrie iterators
oldIters1 := iter.SubtrieIterators(oldTrie, sdb.numWorkers)
oldIters2 := iter.SubtrieIterators(oldTrie, sdb.numWorkers)
newIters1 := iter.SubtrieIterators(newTrie, sdb.numWorkers)
newIters2 := iter.SubtrieIterators(newTrie, sdb.numWorkers)
// Create iterators ahead of time to avoid race condition in state.Trie access
// We do two state iterations per subtrie: one for new/updated nodes,
// one for deleted/updated nodes; prepare 2 iterator instances for each task
var iterPairs [][]sd.IterPair
for i := uint(0); i < sdb.numWorkers; i++ {
iterPairs = append(iterPairs, []sd.IterPair{
{Older: oldIters1[i], Newer: newIters1[i]},
{Older: oldIters2[i], Newer: newIters2[i]},
})
}
// Dispatch workers to process trie data; sync and collect results here via channels
nodeChan := make(chan sdtypes.StateNode)
codeChan := make(chan sdtypes.CodeAndCodeHash)
go func() {
nodeSender := func(node sdtypes.StateNode) error { nodeChan <- node; return nil }
codeSender := func(code sdtypes.CodeAndCodeHash) error { codeChan <- code; return nil }
var wg sync.WaitGroup
for w := uint(0); w < sdb.numWorkers; w++ {
wg.Add(1)
go func(worker uint) {
defer wg.Done()
var err error
logger := log.New("hash", args.BlockHash.Hex(), "number", args.BlockNumber)
if !params.IntermediateStateNodes {
err = sdb.BuildStateDiffWithoutIntermediateStateNodes(iterPairs[worker], params, nodeSender, codeSender, logger)
} else {
err = sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs[worker], params, nodeSender, codeSender, logger)
}
if err != nil {
logrus.Errorf("buildStateDiff error for worker %d, params %+v", worker, params)
}
}(w)
}
wg.Wait()
close(nodeChan)
close(codeChan)
}()
for nodeChan != nil || codeChan != nil {
select {
case node, more := <-nodeChan:
if more {
if err := output(node); err != nil {
return err
}
} else {
nodeChan = nil
}
case codeAndCodeHash, more := <-codeChan:
if more {
if err := codeOutput(codeAndCodeHash); err != nil {
return err
}
} else {
codeChan = nil
}
}
}
return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,7 @@
package statediff
// Config holds config params for the statediffing service
type Config struct {
// ServiceConfig holds config params for the statediffing service
type ServiceConfig struct {
ServiceWorkers uint
TrieWorkers uint
WorkerQueueSize uint

View File

@ -22,9 +22,11 @@ package statediff
import (
"sort"
"strings"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
)
func sortKeys(data AccountMap) []string {
func sortKeys(data sdtypes.AccountMap) []string {
keys := make([]string, 0, len(data))
for key := range data {
keys = append(keys, key)

View File

@ -56,23 +56,29 @@ type LvLDBReaderConfig struct {
DBCacheSize int
}
// NewLvlDBReader creates a new Read using LevelDB
// NewLvlDBReader creates a new Reader using LevelDB
func NewLvlDBReader(conf LvLDBReaderConfig) (*LvlDBReader, error) {
var edb ethdb.Database
var err error
if conf.Mode == "local" {
kvdb, _ := rawdb.NewLevelDBDatabase(conf.Path, conf.DBCacheSize, 256, "eth-statediff-service", true)
edb, err = rawdb.NewDatabaseWithFreezer(kvdb, conf.AncientPath, "eth-statediff-service", true)
}
if conf.Mode == "remote" {
edb, err = client.NewDatabaseClient(conf.Url)
}
switch conf.Mode {
case "local":
edb, err = rawdb.NewLevelDBDatabase(conf.Path, conf.DBCacheSize, 256, "eth-statediff-service", true)
if err != nil {
return nil, err
}
edb, err = rawdb.NewDatabaseWithFreezer(edb, conf.AncientPath, "eth-statediff-service", true)
if err != nil {
return nil, err
}
case "remote":
edb, err = client.NewDatabaseClient(conf.Url)
if err != nil {
return nil, err
}
}
return &LvlDBReader{
ethDB: edb,
stateDB: state.NewDatabaseWithConfig(edb, conf.TrieConfig),

View File

@ -22,12 +22,12 @@ import (
"sync"
"time"
sd "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
@ -38,35 +38,10 @@ import (
const defaultQueueSize = 1024
// StateDiffService is the state-diffing service interface
type StateDiffService interface {
// Lifecycle Start() and Stop()
node.Lifecycle
// APIs and Protocols() interface for node service registration
APIs() []rpc.API
Protocols() []p2p.Protocol
// Loop is the main event loop for processing state diffs
Loop(wg *sync.WaitGroup) error
// Run is a one-off command to run on a predefined set of ranges
Run(ranges []RangeRequest, parallel bool) error
// StateDiffAt method to get state diff object at specific block
StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
// StateDiffFor method to get state diff object at specific block
StateDiffFor(blockHash common.Hash, params sd.Params) (*sd.Payload, error)
// StateTrieAt method to get state trie object at specific block
StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
// WriteStateDiffAt method to write state diff object directly to DB
WriteStateDiffAt(blockNumber uint64, params sd.Params) error
// WriteStateDiffFor method to get state trie object at specific block
WriteStateDiffFor(blockHash common.Hash, params sd.Params) error
// WriteStateDiffsInRange method to wrtie state diff objects within the range directly to the DB
WriteStateDiffsInRange(start, stop uint64, params sd.Params) error
}
// Service is the underlying struct for the state diffing service
type Service struct {
// Used to build the state diff objects
Builder sd.Builder
Builder statediff.Builder
// Used to read data from LevelDB
lvlDBReader Reader
// Used to signal shutdown of the service
@ -82,22 +57,20 @@ type Service struct {
}
// NewStateDiffService creates a new Service
func NewStateDiffService(lvlDBReader Reader, indexer interfaces.StateDiffIndexer, conf Config) (*Service, error) {
b, err := NewBuilder(lvlDBReader.StateDB(), conf.TrieWorkers)
if err != nil {
return nil, err
}
func NewStateDiffService(lvlDBReader Reader, indexer interfaces.StateDiffIndexer, conf ServiceConfig) *Service {
builder := statediff.NewBuilder(adapt.GethStateView(lvlDBReader.StateDB()))
builder.SetSubtrieWorkers(conf.TrieWorkers)
if conf.WorkerQueueSize == 0 {
conf.WorkerQueueSize = defaultQueueSize
}
return &Service{
lvlDBReader: lvlDBReader,
Builder: b,
Builder: builder,
indexer: indexer,
workers: conf.ServiceWorkers,
queue: make(chan RangeRequest, conf.WorkerQueueSize),
preruns: conf.PreRuns,
}, nil
}
}
// Protocols exports the services p2p protocols, this service has none
@ -117,7 +90,7 @@ func (sds *Service) APIs() []rpc.API {
}
}
func segmentRange(workers, start, stop uint64, params sd.Params) []RangeRequest {
func segmentRange(workers, start, stop uint64, params statediff.Params) []RangeRequest {
segmentSize := ((stop - start) + 1) / workers
remainder := ((stop - start) + 1) % workers
numOfSegments := workers
@ -251,7 +224,7 @@ func (sds *Service) Loop(wg *sync.WaitGroup) error {
// StateDiffAt returns a state diff object payload at the specific blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) {
func (sds *Service) StateDiffAt(blockNumber uint64, params statediff.Params) (*statediff.Payload, error) {
currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber)
if err != nil {
return nil, err
@ -273,7 +246,7 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Paylo
// StateDiffFor returns a state diff object payload for the specific blockhash
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateDiffFor(blockHash common.Hash, params sd.Params) (*sd.Payload, error) {
func (sds *Service) StateDiffFor(blockHash common.Hash, params statediff.Params) (*statediff.Payload, error) {
currentBlock, err := sds.lvlDBReader.GetBlockByHash(blockHash)
if err != nil {
return nil, err
@ -294,8 +267,8 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params sd.Params) (*sd.P
}
// processStateDiff method builds the state diff payload from the current block, parent state root, and provided params
func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot common.Hash, params sd.Params) (*sd.Payload, error) {
stateDiff, err := sds.Builder.BuildStateDiffObject(sd.Args{
func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot common.Hash, params statediff.Params) (*statediff.Payload, error) {
stateDiff, err := sds.Builder.BuildStateDiffObject(statediff.Args{
BlockHash: currentBlock.Hash(),
BlockNumber: currentBlock.Number(),
OldStateRoot: parentRoot,
@ -312,8 +285,8 @@ func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot commo
return sds.newPayload(stateDiffRlp, currentBlock, params)
}
func (sds *Service) newPayload(stateObject []byte, block *types.Block, params sd.Params) (*sd.Payload, error) {
payload := &sd.Payload{
func (sds *Service) newPayload(stateObject []byte, block *types.Block, params statediff.Params) (*statediff.Payload, error) {
payload := &statediff.Payload{
StateObjectRlp: stateObject,
}
if params.IncludeBlock {
@ -344,34 +317,6 @@ func (sds *Service) newPayload(stateObject []byte, block *types.Block, params sd
return payload, nil
}
// StateTrieAt returns a state trie object payload at the specified blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) {
currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber)
if err != nil {
return nil, err
}
logrus.Infof("sending state trie at block %d", blockNumber)
// compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths()
return sds.processStateTrie(currentBlock, params)
}
func (sds *Service) processStateTrie(block *types.Block, params sd.Params) (*sd.Payload, error) {
stateNodes, err := sds.Builder.BuildStateTrieObject(block)
if err != nil {
return nil, err
}
stateTrieRlp, err := rlp.EncodeToBytes(&stateNodes)
if err != nil {
return nil, err
}
logrus.Infof("state trie object at block %d is %d bytes in length", block.Number().Uint64(), len(stateTrieRlp))
return sds.newPayload(stateTrieRlp, block, params)
}
// Start is used to begin the service
func (sds *Service) Start() error {
logrus.Info("starting statediff service")
@ -388,7 +333,7 @@ func (sds *Service) Stop() error {
// WriteStateDiffAt writes a state diff at the specific blockheight directly to the database
// This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params sd.Params) error {
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params statediff.Params) error {
logrus.Infof("Writing state diff at block %d", blockNumber)
t := time.Now()
currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber)
@ -413,7 +358,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params sd.Params) error
// WriteStateDiffFor writes a state diff for the specific blockHash directly to the database
// This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params sd.Params) error {
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params statediff.Params) error {
logrus.Infof("Writing state diff for block %s", blockHash.Hex())
t := time.Now()
currentBlock, err := sds.lvlDBReader.GetBlockByHash(blockHash)
@ -436,7 +381,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params sd.Params) e
}
// Writes a state diff from the current block, parent state root, and provided params
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params sd.Params, t time.Time) error {
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params statediff.Params, t time.Time) error {
var totalDifficulty *big.Int
var receipts types.Receipts
var err error
@ -461,28 +406,30 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
return err
}
// defer handling of commit/rollback for any return case
output := func(node sdtypes.StateNode) error {
output := func(node sdtypes.StateLeafNode) error {
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
}
codeOutput := func(c sdtypes.CodeAndCodeHash) error {
return sds.indexer.PushCodeAndCodeHash(tx, c)
codeOutput := func(c sdtypes.IPLD) error {
return sds.indexer.PushIPLD(tx, c)
}
prom.SetTimeMetric(prom.T_BLOCK_PROCESSING, time.Now().Sub(t))
t = time.Now()
err = sds.Builder.WriteStateDiffObject(sd.Args{
err = sds.Builder.WriteStateDiff(statediff.Args{
NewStateRoot: block.Root(),
OldStateRoot: parentRoot,
BlockNumber: block.Number(),
BlockHash: block.Hash(),
}, params, output, codeOutput)
prom.SetTimeMetric(prom.T_STATE_PROCESSING, time.Now().Sub(t))
t = time.Now()
err = tx.Submit(err)
err = tx.Submit()
prom.SetLastProcessedHeight(height)
prom.SetTimeMetric(prom.T_POSTGRES_TX_COMMIT, time.Now().Sub(t))
return err
}
// WriteStateDiffsInRange adds a RangeRequest to the work queue
func (sds *Service) WriteStateDiffsInRange(start, stop uint64, params sd.Params) error {
func (sds *Service) WriteStateDiffsInRange(start, stop uint64, params statediff.Params) error {
if stop < start {
return fmt.Errorf("invalid block range (%d, %d): stop height must be greater or equal to start height", start, stop)
}

View File

@ -21,22 +21,8 @@ package statediff
import (
sd "github.com/cerc-io/plugeth-statediff"
sdTypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/core/types"
)
// AccountMap is a mapping of hex encoded path => account wrapper
type AccountMap map[string]accountWrapper
// accountWrapper is used to temporary associate the unpacked node with its raw values
type accountWrapper struct {
Account *types.StateAccount
NodeType sdTypes.NodeType
Path []byte
NodeValue []byte
LeafKey []byte
}
// RangeRequest holds range quest work params
type RangeRequest struct {
Start, Stop uint64