refactor types & use plugin builder

- removes StateTrieAt, we can no longer return a full trie without inner nodes
This commit is contained in:
Roy Crihfield 2023-08-25 15:57:33 +08:00
parent f6fa9ad72b
commit 472b2a7f61
14 changed files with 415 additions and 3869 deletions

View File

@ -320,7 +320,7 @@ func getConfig(nodeInfo node.Info) (interfaces.Config, error) {
case dump.STDOUT: case dump.STDOUT:
indexerConfig = dump.Config{Dump: os.Stderr} indexerConfig = dump.Config{Dump: os.Stderr}
case dump.DISCARD: case dump.DISCARD:
indexerConfig = dump.Config{Dump: dump.NewDiscardWriterCloser()} indexerConfig = dump.Config{Dump: dump.Discard}
default: default:
return nil, fmt.Errorf("unrecognized dump destination: %s", dumpDst) return nil, fmt.Errorf("unrecognized dump destination: %s", dumpDst)
} }

View File

@ -50,14 +50,11 @@ func init() {
func serve() { func serve() {
logWithCommand.Info("Running eth-statediff-service serve command") logWithCommand.Info("Running eth-statediff-service serve command")
statediffService, err := createStateDiffService() service := createStateDiffService()
if err != nil {
logWithCommand.Fatal(err)
}
// short circuit if we only want to perform prerun // short circuit if we only want to perform prerun
if viper.GetBool("prerun.only") { if viper.GetBool("prerun.only") {
if err := statediffService.Run(nil); err != nil { if err := service.Run(nil); err != nil {
logWithCommand.Fatal("unable to perform prerun: %v", err) logWithCommand.Fatal("unable to perform prerun: %v", err)
} }
return return
@ -65,12 +62,12 @@ func serve() {
// start service and servers // start service and servers
logWithCommand.Info("Starting statediff service") logWithCommand.Info("Starting statediff service")
wg := new(sync.WaitGroup) var wg sync.WaitGroup
if err := statediffService.Loop(wg); err != nil { if err := service.Loop(&wg); err != nil {
logWithCommand.Fatalf("unable to start statediff service: %v", err) logWithCommand.Fatalf("unable to start statediff service: %v", err)
} }
logWithCommand.Info("Starting RPC servers") logWithCommand.Info("Starting RPC servers")
if err := startServers(statediffService); err != nil { if err := startServers(service); err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
logWithCommand.Info("RPC servers successfully spun up; awaiting requests") logWithCommand.Info("RPC servers successfully spun up; awaiting requests")
@ -80,11 +77,11 @@ func serve() {
signal.Notify(shutdown, os.Interrupt) signal.Notify(shutdown, os.Interrupt)
<-shutdown <-shutdown
logWithCommand.Info("Received interrupt signal, shutting down") logWithCommand.Info("Received interrupt signal, shutting down")
statediffService.Stop() service.Stop()
wg.Wait() wg.Wait()
} }
func startServers(serv sd.StateDiffService) error { func startServers(serv *sd.Service) error {
ipcPath := viper.GetString("server.ipcPath") ipcPath := viper.GetString("server.ipcPath")
httpPath := viper.GetString("server.httpPath") httpPath := viper.GetString("server.httpPath")
if ipcPath == "" && httpPath == "" { if ipcPath == "" && httpPath == "" {

View File

@ -2,10 +2,13 @@ package cmd
import ( import (
"context" "context"
"encoding/json"
"fmt"
"os"
"github.com/cerc-io/plugeth-statediff"
gethsd "github.com/cerc-io/plugeth-statediff" gethsd "github.com/cerc-io/plugeth-statediff"
ind "github.com/cerc-io/plugeth-statediff/indexer" ind "github.com/cerc-io/plugeth-statediff/indexer"
"github.com/cerc-io/plugeth-statediff/utils/log"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
@ -16,7 +19,7 @@ import (
type blockRange [2]uint64 type blockRange [2]uint64
func createStateDiffService() (sd.StateDiffService, error) { func createStateDiffService() *sd.Service {
// load some necessary params // load some necessary params
logWithCommand.Info("Loading statediff service parameters") logWithCommand.Info("Loading statediff service parameters")
mode := viper.GetString("leveldb.mode") mode := viper.GetString("leveldb.mode")
@ -41,13 +44,7 @@ func createStateDiffService() (sd.StateDiffService, error) {
var chainConf *params.ChainConfig var chainConf *params.ChainConfig
var err error var err error
chainConfigPath := viper.GetString("ethereum.chainConfig") chainConfigPath := viper.GetString("ethereum.chainConfig")
chainConf, err = LoadConfig(chainConfigPath)
if chainConfigPath != "" {
chainConf, err = statediff.LoadConfig(chainConfigPath)
} else {
chainConf, err = statediff.ChainConfig(nodeInfo.ChainID)
}
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
@ -84,7 +81,7 @@ func createStateDiffService() (sd.StateDiffService, error) {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
logWithCommand.Info("Creating statediff service") logWithCommand.Info("Creating statediff service")
sdConf := sd.Config{ sdConf := sd.ServiceConfig{
ServiceWorkers: viper.GetUint("statediff.serviceWorkers"), ServiceWorkers: viper.GetUint("statediff.serviceWorkers"),
TrieWorkers: viper.GetUint("statediff.trieWorkers"), TrieWorkers: viper.GetUint("statediff.trieWorkers"),
WorkerQueueSize: viper.GetUint("statediff.workerQueueSize"), WorkerQueueSize: viper.GetUint("statediff.workerQueueSize"),
@ -98,8 +95,6 @@ func setupPreRunRanges() []sd.RangeRequest {
return nil return nil
} }
preRunParams := gethsd.Params{ preRunParams := gethsd.Params{
IntermediateStateNodes: viper.GetBool("prerun.params.intermediateStateNodes"),
IntermediateStorageNodes: viper.GetBool("prerun.params.intermediateStorageNodes"),
IncludeBlock: viper.GetBool("prerun.params.includeBlock"), IncludeBlock: viper.GetBool("prerun.params.includeBlock"),
IncludeReceipts: viper.GetBool("prerun.params.includeReceipts"), IncludeReceipts: viper.GetBool("prerun.params.includeReceipts"),
IncludeTD: viper.GetBool("prerun.params.includeTD"), IncludeTD: viper.GetBool("prerun.params.includeTD"),
@ -134,3 +129,25 @@ func setupPreRunRanges() []sd.RangeRequest {
return blockRanges return blockRanges
} }
// LoadConfig opens the JSON file at chainConfigPath and decodes its contents
// into a params.ChainConfig. If the file cannot be opened or decoded, the
// failure is logged and the underlying error is returned with a nil config.
func LoadConfig(chainConfigPath string) (*params.ChainConfig, error) {
	f, openErr := os.Open(chainConfigPath)
	if openErr != nil {
		log.Error(fmt.Sprintf("Failed to read chain config file: %v", openErr))
		return nil, openErr
	}
	defer f.Close()

	var conf params.ChainConfig
	decoder := json.NewDecoder(f)
	if decodeErr := decoder.Decode(&conf); decodeErr != nil {
		log.Error(fmt.Sprintf("invalid chain config file: %v", decodeErr))
		return nil, decodeErr
	}

	log.Info(fmt.Sprintf("Using chain config from %s file. Content %+v", chainConfigPath, &conf))
	return &conf, nil
}

141
go.mod
View File

@ -3,17 +3,144 @@ module github.com/vulcanize/eth-statediff-service
go 1.19 go 1.19
require ( require (
github.com/cerc-io/plugeth-statediff v0.0.0-00010101000000-000000000000
github.com/ethereum/go-ethereum v1.12.0
github.com/jmoiron/sqlx v1.2.0 github.com/jmoiron/sqlx v1.2.0
github.com/prometheus/client_golang v1.4.0 github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.7.0 github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.3.0 github.com/spf13/cobra v1.3.0
github.com/spf13/viper v1.10.1 github.com/spf13/viper v1.10.1
github.com/vulcanize/go-eth-state-node-iterator v1.0.3
github.com/vulcanize/leveldb-ethdb-rpc v0.1.2 github.com/vulcanize/leveldb-ethdb-rpc v0.1.2
) )
replace ( require (
github.com/cerc-io/plugeth-statediff => git.vdb.to/cerc-io/plugeth-statediff 840d4a4e5d42110e694941afa734dac8fea126fe github.com/DataDog/zstd v1.5.5 // indirect
github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
// github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46 github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/cerc-io/eth-iterator-utils v1.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.10.0 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v0.0.0-20230720154706-692f3b61a3c4 // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set/v2 v2.3.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/deepmap/oapi-codegen v1.8.2 // indirect
github.com/edsrzf/mmap-go v1.0.0 // indirect
github.com/fjl/memsize v0.0.1 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect
github.com/georgysavva/scany v0.2.9 // indirect
github.com/getsentry/sentry-go v0.22.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/graph-gophers/graphql-go v1.3.0 // indirect
github.com/hashicorp/go-bexpr v0.1.12 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.3 // indirect
github.com/huin/goupnp v1.2.0 // indirect
github.com/inconshreveable/log15 v2.16.0+incompatible // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/influxdata/influxdb-client-go/v2 v2.4.0 // indirect
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c // indirect
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.10.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.1.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.8.1 // indirect
github.com/jackc/pgx/v4 v4.13.0 // indirect
github.com/jackc/puddle v1.1.3 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/pointerstructure v1.2.1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/openrelayxyz/plugeth-utils v1.2.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 // indirect
github.com/pganalyze/pg_query_go/v4 v4.2.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/rs/cors v1.9.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.6.0 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/status-im/keycard-go v0.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.2 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/thoas/go-funk v0.9.3 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
github.com/urfave/cli/v2 v2.25.7 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/term v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)
replace (
github.com/cerc-io/eth-iterator-utils => git.vdb.to/cerc-io/eth-iterator-utils v0.0.0-20230803115933-6bb6d4e27fd2
// github.com/cerc-io/plugeth-statediff => git.vdb.to/cerc-io/plugeth-statediff v0.1.0
github.com/cerc-io/plugeth-statediff => ../plugeth-statediff
github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1
github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46
) )

497
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -30,11 +30,11 @@ const APIVersion = "0.0.1"
// PublicStateDiffAPI provides an RPC interface // PublicStateDiffAPI provides an RPC interface
// that can be used to fetch historical diffs from LevelDB directly // that can be used to fetch historical diffs from LevelDB directly
type PublicStateDiffAPI struct { type PublicStateDiffAPI struct {
sds StateDiffService sds *Service
} }
// NewPublicStateDiffAPI creates an rpc interface for the underlying statediff service // NewPublicStateDiffAPI creates an rpc interface for the underlying statediff service
func NewPublicStateDiffAPI(sds StateDiffService) *PublicStateDiffAPI { func NewPublicStateDiffAPI(sds *Service) *PublicStateDiffAPI {
return &PublicStateDiffAPI{ return &PublicStateDiffAPI{
sds: sds, sds: sds,
} }
@ -45,11 +45,6 @@ func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint
return api.sds.StateDiffAt(blockNumber, params) return api.sds.StateDiffAt(blockNumber, params)
} }
// StateTrieAt returns a state trie payload at the specific blockheight
func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params sd.Params) (*sd.Payload, error) {
return api.sds.StateTrieAt(blockNumber, params)
}
// WriteStateDiffAt writes a state diff object directly to DB at the specific blockheight // WriteStateDiffAt writes a state diff object directly to DB at the specific blockheight
func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params sd.Params) error { func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params sd.Params) error {
return api.sds.WriteStateDiffAt(blockNumber, params) return api.sds.WriteStateDiffAt(blockNumber, params)

View File

@ -1,886 +0,0 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Contains a batch of utility type declarations used by the tests. As the node
// operates on unique types, a lot of them are needed to check various features.
package statediff
import (
"bytes"
"fmt"
"math/bits"
"sync"
sd "github.com/cerc-io/plugeth-statediff"
sdtrie "github.com/cerc-io/plugeth-statediff/trie_helpers"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/sirupsen/logrus"
iter "github.com/vulcanize/go-eth-state-node-iterator"
)
// Sentinel values precomputed once at package initialisation.
var (
// nullHashBytes is 32 zero bytes; the trie walkers in this file skip any
// iterator position whose hash equals it.
nullHashBytes = common.Hex2Bytes("0000000000000000000000000000000000000000000000000000000000000000")
// emptyNode is the RLP encoding of an empty byte slice. The encoding error is
// discarded (presumably because the input is a fixed constant).
emptyNode, _ = rlp.EncodeToBytes(&[]byte{})
// emptyContractRoot is the Keccak-256 hash of emptyNode.
emptyContractRoot = crypto.Keccak256Hash(emptyNode)
// nullCodeHash is the Keccak-256 hash of empty input; buildStateTrie compares
// an account's CodeHash against it to decide whether to collect code and storage.
nullCodeHash = crypto.Keccak256Hash([]byte{}).Bytes()
)
// Builder exposes the methods for building state objects: a diff between two
// state roots, or the full state trie of a single block.
type Builder interface {
// BuildStateDiffObject collects the diff described by args and params into a StateObject.
BuildStateDiffObject(args sd.Args, params sd.Params) (sdtypes.StateObject, error)
// BuildStateTrieObject collects the state trie nodes of the given block into a StateObject.
BuildStateTrieObject(current *types.Block) (sdtypes.StateObject, error)
// WriteStateDiffObject streams the diff between the two state roots to the
// output (state nodes) and codeOutput (contract code) callbacks.
WriteStateDiffObject(args sdtypes.StateRoots, params sd.Params, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink) error
}
// builder is the Builder implementation returned by NewBuilder.
type builder struct {
// stateCache is the database that tries, trie nodes and contract code are read from.
stateCache state.Database
// numWorkers is the number of subtrie iterators (and worker goroutines) used
// when diffing; NewBuilder only accepts powers of two.
numWorkers uint
}
// iterPair couples the iterators over the old and the new trie that are
// handed together to trie.NewDifferenceIterator.
type iterPair struct {
older, newer trie.NodeIterator
}
// resolveNode loads the node at the iterator's current position from trieDB
// and RLP-decodes it. It returns a StateNode holding a private copy of the
// iterator path, the node type reported by sdtrie.CheckKeyType and the raw
// node bytes, together with the decoded node elements. Any lookup, decoding
// or classification failure is returned with a zero StateNode.
func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (sdtypes.StateNode, []interface{}, error) {
	// Copy the path so the result does not alias the iterator's slice.
	itPath := it.Path()
	pathCopy := make([]byte, len(itPath))
	copy(pathCopy, itPath)

	raw, err := trieDB.Node(it.Hash())
	if err != nil {
		return sdtypes.StateNode{}, nil, err
	}

	var elems []interface{}
	if err = rlp.DecodeBytes(raw, &elems); err != nil {
		return sdtypes.StateNode{}, nil, err
	}

	kind, err := sdtrie.CheckKeyType(elems)
	if err != nil {
		return sdtypes.StateNode{}, nil, err
	}

	resolved := sdtypes.StateNode{
		NodeType:  kind,
		Path:      pathCopy,
		NodeValue: raw,
	}
	return resolved, elems, nil
}
// stateNodeAppender returns a StateNodeSink that appends every node it
// receives to the slice pointed to by nodes. The sink never fails.
func stateNodeAppender(nodes *[]sdtypes.StateNode) sdtypes.StateNodeSink {
	var sink sdtypes.StateNodeSink = func(received sdtypes.StateNode) error {
		grown := append(*nodes, received)
		*nodes = grown
		return nil
	}
	return sink
}
// storageNodeAppender returns a StorageNodeSink that appends every node it
// receives to the slice pointed to by nodes. The sink never fails.
func storageNodeAppender(nodes *[]sdtypes.StorageNode) sdtypes.StorageNodeSink {
	var sink sdtypes.StorageNodeSink = func(received sdtypes.StorageNode) error {
		grown := append(*nodes, received)
		*nodes = grown
		return nil
	}
	return sink
}
// codeMappingAppender returns a CodeSink that appends every code/code-hash
// pair it receives to the slice pointed to by data. The sink never fails.
func codeMappingAppender(data *[]sdtypes.CodeAndCodeHash) sdtypes.CodeSink {
	var sink sdtypes.CodeSink = func(received sdtypes.CodeAndCodeHash) error {
		grown := append(*data, received)
		*data = grown
		return nil
	}
	return sink
}
// NewBuilder creates a statediff Builder reading from stateCache.
// A workers value of 0 is treated as 1; any other value must be a power of
// two, otherwise an error is returned.
func NewBuilder(stateCache state.Database, workers uint) (Builder, error) {
	numWorkers := workers
	if numWorkers == 0 {
		numWorkers = 1
	}
	// a power of two has exactly one bit set
	if setBits := bits.OnesCount(numWorkers); setBits != 1 {
		return nil, fmt.Errorf("workers must be a power of 2")
	}
	b := &builder{
		numWorkers: numWorkers,
		stateCache: stateCache, // state cache is safe for concurrent reads
	}
	return b, nil
}
// BuildStateTrieObject opens the state trie at the root of the given block,
// walks it from the start and returns the collected state nodes and contract
// code as a StateObject tagged with the block's number and hash.
func (sdb *builder) BuildStateTrieObject(current *types.Block) (sdtypes.StateObject, error) {
	var empty sdtypes.StateObject

	stateTrie, err := sdb.stateCache.OpenTrie(current.Root())
	if err != nil {
		return empty, fmt.Errorf("error creating trie for block %d: %v", current.Number(), err)
	}

	nodes, codes, err := sdb.buildStateTrie(stateTrie.NodeIterator([]byte{}))
	if err != nil {
		return empty, fmt.Errorf("error collecting state nodes for block %d: %v", current.Number(), err)
	}

	result := sdtypes.StateObject{
		BlockNumber:       current.Number(),
		BlockHash:         current.Hash(),
		Nodes:             nodes,
		CodeAndCodeHashes: codes,
	}
	return result, nil
}
// buildStateTrie walks the whole trie behind it and returns every resolved
// state node plus the code of every account whose code hash differs from
// nullCodeHash. Value nodes and positions with an all-zero hash are skipped.
// Leaf nodes get their LeafKey filled in; leaves of accounts with code also
// get their storage nodes attached. Iteration errors are returned last.
func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]sdtypes.StateNode, []sdtypes.CodeAndCodeHash, error) {
	collectedNodes := make([]sdtypes.StateNode, 0)
	collectedCode := make([]sdtypes.CodeAndCodeHash, 0)

	for it.Next(true) {
		// skip value nodes
		if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) {
			continue
		}
		node, elems, err := resolveNode(it, sdb.stateCache.TrieDB())
		if err != nil {
			return nil, nil, err
		}

		switch node.NodeType {
		case sdtypes.Extension, sdtypes.Branch:
			// intermediate nodes are recorded as-is below
		case sdtypes.Leaf:
			var account types.StateAccount
			if err := rlp.DecodeBytes(elems[1].([]byte), &account); err != nil {
				return nil, nil, fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", node.Path, err)
			}
			// full key path = node path + the partial path stored in the leaf;
			// the leaf key is its compact encoding without the first byte
			fullPath := append(node.Path, trie.CompactToHex(elems[0].([]byte))...)
			node.LeafKey = trie.HexToCompact(fullPath)[1:]

			if bytes.Equal(account.CodeHash, nullCodeHash) {
				// no code: nothing further to collect for this account
				break
			}

			var storage []sdtypes.StorageNode
			if err := sdb.buildStorageNodesEventual(account.Root, true, storageNodeAppender(&storage)); err != nil {
				return nil, nil, fmt.Errorf("failed building eventual storage diffs for account %+v\r\nerror: %v", account, err)
			}
			node.StorageNodes = storage

			// emit codehash => code mappings for code
			codeHash := common.BytesToHash(account.CodeHash)
			code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash)
			if err != nil {
				return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err)
			}
			collectedCode = append(collectedCode, sdtypes.CodeAndCodeHash{
				Hash: codeHash,
				Code: code,
			})
		default:
			return nil, nil, fmt.Errorf("unexpected node type %s", node.NodeType)
		}

		collectedNodes = append(collectedNodes, node)
	}
	return collectedNodes, collectedCode, it.Error()
}
// BuildStateDiffObject runs WriteStateDiffObject for the old and new state
// roots in args, gathering everything it emits in memory, and returns the
// result as a StateObject tagged with the block hash and number from args.
func (sdb *builder) BuildStateDiffObject(args sd.Args, params sd.Params) (sdtypes.StateObject, error) {
	var (
		nodes []sdtypes.StateNode
		codes []sdtypes.CodeAndCodeHash
	)
	roots := sdtypes.StateRoots{
		OldStateRoot: args.OldStateRoot,
		NewStateRoot: args.NewStateRoot,
	}
	if err := sdb.WriteStateDiffObject(roots, params, stateNodeAppender(&nodes), codeMappingAppender(&codes)); err != nil {
		return sdtypes.StateObject{}, err
	}

	result := sdtypes.StateObject{
		BlockHash:         args.BlockHash,
		BlockNumber:       args.BlockNumber,
		Nodes:             nodes,
		CodeAndCodeHashes: codes,
	}
	return result, nil
}
// WriteStateDiffObject computes the diff between args.OldStateRoot and
// args.NewStateRoot and streams it to the callbacks: state nodes go to output,
// contract code to codeOutput. The work is split over sdb.numWorkers
// goroutines, one per subtrie; both callbacks are only ever invoked from the
// calling goroutine.
//
// It returns an error if either trie cannot be opened, if a callback returns
// an error (processing stops immediately), or if any worker fails. In the
// last case the callbacks may already have received part of the diff; the
// first worker error is returned once all workers have finished.
func (sdb *builder) WriteStateDiffObject(args sdtypes.StateRoots, params sd.Params, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink) error {
	if len(params.WatchedAddresses) > 0 {
		// if we are watching only specific accounts then we are only diffing leaf nodes
		log.Info("Ignoring intermediate state nodes because WatchedAddresses was passed")
		params.IntermediateStateNodes = false
	}

	// Load tries for old and new states
	oldTrie, err := sdb.stateCache.OpenTrie(args.OldStateRoot)
	if err != nil {
		return fmt.Errorf("error creating trie for oldStateRoot: %v", err)
	}
	newTrie, err := sdb.stateCache.OpenTrie(args.NewStateRoot)
	if err != nil {
		return fmt.Errorf("error creating trie for newStateRoot: %v", err)
	}

	// Split old and new tries into corresponding subtrie iterators
	oldIters1 := iter.SubtrieIterators(oldTrie, sdb.numWorkers)
	oldIters2 := iter.SubtrieIterators(oldTrie, sdb.numWorkers)
	newIters1 := iter.SubtrieIterators(newTrie, sdb.numWorkers)
	newIters2 := iter.SubtrieIterators(newTrie, sdb.numWorkers)

	// Create iterators ahead of time to avoid race condition in state.Trie access
	// We do two state iterations per subtrie: one for new/updated nodes,
	// one for deleted/updated nodes; prepare 2 iterator instances for each task
	iterPairs := make([][]iterPair, 0, sdb.numWorkers)
	for i := uint(0); i < sdb.numWorkers; i++ {
		iterPairs = append(iterPairs, []iterPair{
			{older: oldIters1[i], newer: newIters1[i]},
			{older: oldIters2[i], newer: newIters2[i]},
		})
	}

	// Dispatch workers to process trie data; sync and collect results here via channels
	nodeChan := make(chan sdtypes.StateNode)
	codeChan := make(chan sdtypes.CodeAndCodeHash)
	// buffered so that every worker can report a failure without blocking
	errChan := make(chan error, sdb.numWorkers)
	// done is closed when this function returns; it releases workers that
	// would otherwise block forever sending to a consumer that has gone away
	done := make(chan struct{})
	defer close(done)
	errAborted := fmt.Errorf("statediff output aborted")

	go func() {
		nodeSender := func(node sdtypes.StateNode) error {
			select {
			case nodeChan <- node:
				return nil
			case <-done:
				return errAborted
			}
		}
		codeSender := func(code sdtypes.CodeAndCodeHash) error {
			select {
			case codeChan <- code:
				return nil
			case <-done:
				return errAborted
			}
		}
		var wg sync.WaitGroup
		for w := uint(0); w < sdb.numWorkers; w++ {
			wg.Add(1)
			go func(worker uint) {
				defer wg.Done()
				err := sdb.buildStateDiff(iterPairs[worker], params, nodeSender, codeSender)
				if err == nil {
					return
				}
				select {
				case <-done:
					// the consumer already returned its own error; nothing to report
					return
				default:
				}
				logrus.Errorf("buildStateDiff error for worker %d, params %+v: %v", worker, params, err)
				errChan <- err
			}(w)
		}
		wg.Wait()
		close(nodeChan)
		close(codeChan)
	}()

	for nodeChan != nil || codeChan != nil {
		select {
		case node, more := <-nodeChan:
			if more {
				if err := output(node); err != nil {
					return err
				}
			} else {
				nodeChan = nil
			}
		case codeAndCodeHash, more := <-codeChan:
			if more {
				if err := codeOutput(codeAndCodeHash); err != nil {
					return err
				}
			} else {
				codeChan = nil
			}
		}
	}

	// Both channels are closed only after every worker has finished, so any
	// worker error is already queued in errChan at this point.
	select {
	case workerErr := <-errChan:
		return workerErr
	default:
		return nil
	}
}
// buildStateDiff produces the diff for one subtrie. args[0] is the iterator
// pair used for the created/updated pass and args[1] the pair used for the
// deleted/updated pass. State nodes are sent to output and contract code to
// codeOutput. Each stage's error is wrapped with a message naming the stage.
func (sdb *builder) buildStateDiff(args []iterPair, params sd.Params, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink) error {
	// Pass 1: accounts (keyed by leaf key) and node paths that differ at B.
	// With IntermediateStateNodes set, changed intermediate nodes are also
	// emitted to output during this pass.
	var (
		accountsAtB AccountMap
		pathsAtB    map[string]bool
		err         error
	)
	if params.IntermediateStateNodes {
		accountsAtB, pathsAtB, err = sdb.createdAndUpdatedStateWithIntermediateNodes(args[0], output)
	} else {
		accountsAtB, pathsAtB, err = sdb.createdAndUpdatedState(args[0], params.WatchedAddressesLeafKeys())
	}
	if err != nil {
		return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err)
	}

	// Pass 2: accounts (keyed by leaf key) that differ at A.
	accountsAtA, err := sdb.deletedOrUpdatedState(
		args[1], accountsAtB, pathsAtB,
		params.WatchedAddressesLeafKeys(), params.IntermediateStorageNodes, output)
	if err != nil {
		return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err)
	}

	// Sort the leaf keys of both mappings and intersect them: keys present on
	// both sides belong to accounts that exist at A and B but differ.
	// findIntersection also mutates both key slices, removing the shared keys.
	keysAtB := sortKeys(accountsAtB)
	keysAtA := sortKeys(accountsAtA)
	updatedKeys := findIntersection(keysAtB, keysAtA)

	// build the diff nodes for the updated accounts
	if err = sdb.buildAccountUpdates(accountsAtB, accountsAtA, updatedKeys, params.IntermediateStorageNodes, output); err != nil {
		return fmt.Errorf("error building diff for updated accounts: %v", err)
	}

	// build the diff nodes for created accounts
	if err = sdb.buildAccountCreations(accountsAtB, params.IntermediateStorageNodes, output, codeOutput); err != nil {
		return fmt.Errorf("error building diff for created accounts: %v", err)
	}
	return nil
}
// createdAndUpdatedState walks the nodes that the newer trie has and the older
// one lacks, and returns
//   - a mapping from hex leaf key to every watched account found in a leaf node, and
//   - the set of hex paths of all visited nodes, leaf and intermediate alike,
//
// followed by any error the difference iterator ended with. Value nodes and
// positions with an all-zero hash are skipped.
func (sdb *builder) createdAndUpdatedState(iters iterPair, watchedAddressesLeafKeys map[common.Hash]struct{}) (AccountMap, map[string]bool, error) {
	changedPaths := make(map[string]bool)
	changedAccounts := make(AccountMap)

	// the second return value of NewDifferenceIterator is not used here
	diffIt, _ := trie.NewDifferenceIterator(iters.older, iters.newer)
	for diffIt.Next(true) {
		if diffIt.Leaf() || bytes.Equal(nullHashBytes, diffIt.Hash().Bytes()) {
			continue
		}
		node, elems, err := sdtrie.ResolveNode(diffIt, sdb.stateCache.TrieDB())
		if err != nil {
			return nil, nil, err
		}

		if node.NodeType == sdtypes.Leaf {
			// created vs updated is important for leaf nodes since we need to diff their storage
			// so we need to map all changed accounts at B to their leafkey, since account can change paths but not leafkey
			var account types.StateAccount
			if err := rlp.DecodeBytes(elems[1].([]byte), &account); err != nil {
				return nil, nil, fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", node.Path, err)
			}
			fullPath := append(node.Path, trie.CompactToHex(elems[0].([]byte))...)
			leafKey := trie.HexToCompact(fullPath)[1:]
			if isWatchedAddress(watchedAddressesLeafKeys, leafKey) {
				changedAccounts[common.Bytes2Hex(leafKey)] = accountWrapper{
					NodeType:  node.NodeType,
					Path:      node.Path,
					NodeValue: node.NodeValue,
					LeafKey:   leafKey,
					Account:   &account,
				}
			}
		}

		// add both intermediate and leaf node paths to the set of changed paths
		changedPaths[common.Bytes2Hex(node.Path)] = true
	}
	return changedAccounts, changedPaths, diffIt.Error()
}
// createdAndUpdatedStateWithIntermediateNodes walks the nodes that the newer
// trie has and the older one lacks. Every extension or branch node found is
// sent to output immediately. It returns
//   - a mapping from hex leaf key to every account found in a leaf node, and
//   - the set of hex paths of all visited nodes, leaf and intermediate alike,
//
// followed by any error the difference iterator ended with. Value nodes and
// positions with an all-zero hash are skipped; an unknown node type or a
// failing output callback aborts the walk with an error.
func (sdb *builder) createdAndUpdatedStateWithIntermediateNodes(iters iterPair, output sdtypes.StateNodeSink) (AccountMap, map[string]bool, error) {
	changedPaths := make(map[string]bool)
	changedAccounts := make(AccountMap)

	// the second return value of NewDifferenceIterator is not used here
	diffIt, _ := trie.NewDifferenceIterator(iters.older, iters.newer)
	for diffIt.Next(true) {
		if diffIt.Leaf() || bytes.Equal(nullHashBytes, diffIt.Hash().Bytes()) {
			continue
		}
		node, elems, err := resolveNode(diffIt, sdb.stateCache.TrieDB())
		if err != nil {
			return nil, nil, err
		}

		switch node.NodeType {
		case sdtypes.Extension, sdtypes.Branch:
			// create a diff for any intermediate node that has changed at b
			// created vs updated makes no difference for intermediate nodes since we do not need to diff storage
			intermediate := sdtypes.StateNode{
				NodeType:  node.NodeType,
				Path:      node.Path,
				NodeValue: node.NodeValue,
			}
			if err := output(intermediate); err != nil {
				return nil, nil, err
			}
		case sdtypes.Leaf:
			// created vs updated is important for leaf nodes since we need to diff their storage
			// so we need to map all changed accounts at B to their leafkey, since account can change paths but not leafkey
			var account types.StateAccount
			if err := rlp.DecodeBytes(elems[1].([]byte), &account); err != nil {
				return nil, nil, fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", node.Path, err)
			}
			fullPath := append(node.Path, trie.CompactToHex(elems[0].([]byte))...)
			leafKey := trie.HexToCompact(fullPath)[1:]
			changedAccounts[common.Bytes2Hex(leafKey)] = accountWrapper{
				NodeType:  node.NodeType,
				Path:      node.Path,
				NodeValue: node.NodeValue,
				LeafKey:   leafKey,
				Account:   &account,
			}
		default:
			return nil, nil, fmt.Errorf("unexpected node type %s", node.NodeType)
		}

		// add both intermediate and leaf node paths to the set of changed paths
		changedPaths[common.Bytes2Hex(node.Path)] = true
	}
	return changedAccounts, changedPaths, diffIt.Error()
}
// deletedOrUpdatedState walks the state trie nodes that exist in a different state at A
// (iters.older) than at B (iters.newer) and emits a "removed" diff to output for every
// node whose path is absent from diffPathsAtB.
//
// For a watched leaf whose path is gone:
//   - if its leaf key is also absent from diffAccountsAtB the account was deleted; the
//     removed diff carries the leaf key plus "removed" storage diffs for the account's storage
//   - otherwise the account only moved; the removed diff carries no leaf key and no storage
//
// Leaves that are not watched (see isWatchedAddress) are ignored entirely.
//
// It returns a mapping of hex-encoded leaf key to each watched account that exists in a
// different state at A than B, and the first error hit while resolving a node, decoding an
// account, building storage diffs, writing to output, or reported by the iterator.
func (sdb *builder) deletedOrUpdatedState(iters iterPair, diffAccountsAtB AccountMap, diffPathsAtB map[string]bool, watchedAddressesLeafKeys map[common.Hash]struct{}, intermediateStorageNodes bool, output sdtypes.StateNodeSink) (AccountMap, error) {
	diffAccountAtA := make(AccountMap)
	// the second return value of NewDifferenceIterator is not needed here
	it, _ := trie.NewDifferenceIterator(iters.newer, iters.older)
	for it.Next(true) {
		// skip value nodes
		if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) {
			continue
		}
		node, nodeElements, err := sdtrie.ResolveNode(it, sdb.stateCache.TrieDB())
		if err != nil {
			return nil, err
		}
		switch node.NodeType {
		case sdtypes.Leaf:
			// map all different accounts at A to their leafkey
			var account types.StateAccount
			if err := rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil {
				return nil, fmt.Errorf("error decoding account for leaf node at path %x\nerror: %w", node.Path, err)
			}
			partialPath := trie.CompactToHex(nodeElements[0].([]byte))
			valueNodePath := append(node.Path, partialPath...)
			encodedPath := trie.HexToCompact(valueNodePath)
			leafKey := encodedPath[1:]
			if !isWatchedAddress(watchedAddressesLeafKeys, leafKey) {
				continue
			}
			leafKeyHex := common.Bytes2Hex(leafKey)
			diffAccountAtA[leafKeyHex] = accountWrapper{
				NodeType:  node.NodeType,
				Path:      node.Path,
				NodeValue: node.NodeValue,
				LeafKey:   leafKey,
				Account:   &account,
			}
			// if this node's path showed up in diffPathsAtB the path is still occupied at B,
			// so there is nothing to mark as removed
			if _, ok := diffPathsAtB[common.Bytes2Hex(node.Path)]; ok {
				continue
			}
			// the node at this path was deleted (or moved) in B:
			// start from an empty "removed" diff with empty leaf key, which is what a move emits
			diff := sdtypes.StateNode{
				NodeType:  sdtypes.Removed,
				Path:      node.Path,
				NodeValue: []byte{},
			}
			// if this node's leaf key also did not show up in diffAccountsAtB
			// that means the account was deleted
			// in that case, include the leaf key and empty "removed" diff storage nodes
			// for all the storage slots
			if _, ok := diffAccountsAtB[leafKeyHex]; !ok {
				diff.LeafKey = leafKey
				var storageDiffs []sdtypes.StorageNode
				err := sdb.buildRemovedAccountStorageNodes(account.Root, intermediateStorageNodes, storageNodeAppender(&storageDiffs))
				if err != nil {
					return nil, fmt.Errorf("failed building storage diffs for removed node %x\r\nerror: %v", node.Path, err)
				}
				diff.StorageNodes = storageDiffs
			}
			if err := output(diff); err != nil {
				return nil, err
			}
		case sdtypes.Extension, sdtypes.Branch:
			// if this node's path did not show up in diffPathsAtB
			// that means the node at this path was deleted (or moved) in B
			// emit an empty "removed" diff to signify as such
			if _, ok := diffPathsAtB[common.Bytes2Hex(node.Path)]; !ok {
				if err := output(sdtypes.StateNode{
					Path:      node.Path,
					NodeValue: []byte{},
					NodeType:  sdtypes.Removed,
				}); err != nil {
					return nil, err
				}
			}
			// nothing else to do for these node types
		default:
			return nil, fmt.Errorf("unexpected node type %s", node.NodeType)
		}
	}
	return diffAccountAtA, it.Error()
}
// buildAccountUpdates emits one state node to output for every leaf key in updatedKeys,
// i.e. for the accounts present in both the A => B map (creations) and the B => A map (deletions).
// When both sides carry a decoded account, the incremental storage diff between the old and
// new storage roots is attached to the emitted node.
// This must run before account creations and deletions are built, because every processed
// key is deleted from both maps.
func (sdb *builder) buildAccountUpdates(creations, deletions AccountMap, updatedKeys []string, intermediateStorageNodes bool, output sdtypes.StateNodeSink) error {
	for _, leafKeyHex := range updatedKeys {
		newer, older := creations[leafKeyHex], deletions[leafKeyHex]

		var storage []sdtypes.StorageNode
		if older.Account != nil && newer.Account != nil {
			// diff the storage tries of the two versions of the account
			diffErr := sdb.buildStorageNodesIncremental(
				older.Account.Root,
				newer.Account.Root,
				intermediateStorageNodes,
				storageNodeAppender(&storage),
			)
			if diffErr != nil {
				return fmt.Errorf("failed building incremental storage diffs for account with leafkey %s\r\nerror: %v", leafKeyHex, diffErr)
			}
		}

		// the emitted node always describes the account as it is at B
		updated := sdtypes.StateNode{
			NodeType:     newer.NodeType,
			Path:         newer.Path,
			NodeValue:    newer.NodeValue,
			LeafKey:      newer.LeafKey,
			StorageNodes: storage,
		}
		if outErr := output(updated); outErr != nil {
			return outErr
		}

		// what remains in the maps afterwards are pure creations / pure deletions
		delete(creations, leafKeyHex)
		delete(deletions, leafKeyHex)
	}
	return nil
}
// buildAccountCreations emits a state node to output for every entry of accounts
// (the accounts that exist at B but not at A).
// For accounts whose code hash differs from nullCodeHash it additionally attaches the
// account's complete storage to the node and sends the codehash => code mapping to codeOutput
// before the node itself is emitted.
func (sdb *builder) buildAccountCreations(accounts AccountMap, intermediateStorageNodes bool, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink) error {
	for _, acct := range accounts {
		created := sdtypes.StateNode{
			NodeType:  acct.NodeType,
			Path:      acct.Path,
			LeafKey:   acct.LeafKey,
			NodeValue: acct.NodeValue,
		}

		if isContract := !bytes.Equal(acct.Account.CodeHash, nullCodeHash); isContract {
			// there is no previous state for a created contract, so every storage node is a diff
			var storage []sdtypes.StorageNode
			if err := sdb.buildStorageNodesEventual(acct.Account.Root, intermediateStorageNodes, storageNodeAppender(&storage)); err != nil {
				return fmt.Errorf("failed building eventual storage diffs for node %x\r\nerror: %v", acct.Path, err)
			}
			created.StorageNodes = storage

			// look up the contract code and publish it keyed by its hash
			hash := common.BytesToHash(acct.Account.CodeHash)
			code, err := sdb.stateCache.ContractCode(common.Hash{}, hash)
			if err != nil {
				return fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", hash.String(), err)
			}
			mapping := sdtypes.CodeAndCodeHash{
				Hash: hash,
				Code: code,
			}
			if err := codeOutput(mapping); err != nil {
				return err
			}
		}

		if err := output(created); err != nil {
			return err
		}
	}
	return nil
}
// buildStorageNodesEventual emits the storage diff nodes for a created account.
// With no previous state to compare against, every node of the storage trie rooted at sr
// is handed to buildStorageNodesFromTrie. A root equal to emptyContractRoot emits nothing.
func (sdb *builder) buildStorageNodesEventual(sr common.Hash, intermediateNodes bool, output sdtypes.StorageNodeSink) error {
	if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
		// empty storage: nothing to report
		return nil
	}
	log.Debug("Storage Root For Eventual Diff", "root", sr.Hex())

	storageTrie, openErr := sdb.stateCache.OpenTrie(sr)
	if openErr != nil {
		log.Info("error in build storage diff eventual", "error", openErr)
		return openErr
	}

	// iterate the whole trie, starting from an empty start key
	nodes := storageTrie.NodeIterator(make([]byte, 0))
	return sdb.buildStorageNodesFromTrie(nodes, intermediateNodes, output)
}
// buildStorageNodesFromTrie emits a storage diff node to output for the nodes visited by it.
// Leaf nodes are always emitted, together with their leaf key.
// Extension and branch nodes are emitted only when intermediateNodes is true.
// Any other node type aborts the walk with an error.
func (sdb *builder) buildStorageNodesFromTrie(it trie.NodeIterator, intermediateNodes bool, output sdtypes.StorageNodeSink) error {
	for it.Next(true) {
		// skip value nodes and nodes whose hash equals nullHashBytes
		if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) {
			continue
		}
		resolved, elements, err := resolveNode(it, sdb.stateCache.TrieDB())
		if err != nil {
			return err
		}

		storageNode := sdtypes.StorageNode{
			NodeType:  resolved.NodeType,
			Path:      resolved.Path,
			NodeValue: resolved.NodeValue,
		}
		switch resolved.NodeType {
		case sdtypes.Leaf:
			// leaf key = compact encoding of (node path + partial path) minus its first byte
			fullPath := append(resolved.Path, trie.CompactToHex(elements[0].([]byte))...)
			storageNode.LeafKey = trie.HexToCompact(fullPath)[1:]
		case sdtypes.Extension, sdtypes.Branch:
			if !intermediateNodes {
				// intermediate nodes were not requested
				continue
			}
		default:
			return fmt.Errorf("unexpected node type %s", resolved.NodeType)
		}

		if err := output(storageNode); err != nil {
			return err
		}
	}
	return it.Error()
}
// buildRemovedAccountStorageNodes emits "removed" diffs for the storage nodes of a destroyed
// account by walking the storage trie rooted at sr.
// A root equal to emptyContractRoot emits nothing.
func (sdb *builder) buildRemovedAccountStorageNodes(sr common.Hash, intermediateNodes bool, output sdtypes.StorageNodeSink) error {
	if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
		// the account had no storage
		return nil
	}
	log.Debug("Storage Root For Removed Diffs", "root", sr.Hex())

	storageTrie, openErr := sdb.stateCache.OpenTrie(sr)
	if openErr != nil {
		log.Info("error in build removed account storage diffs", "error", openErr)
		return openErr
	}

	// iterate the whole trie, starting from an empty start key
	nodes := storageTrie.NodeIterator(make([]byte, 0))
	return sdb.buildRemovedStorageNodesFromTrie(nodes, intermediateNodes, output)
}
// buildRemovedStorageNodesFromTrie emits a "removed" storage diff (empty node value) to output
// for the nodes visited by the provided node iterator.
// Leaf nodes are always emitted, together with their leaf key.
// Extension and branch nodes are emitted only when intermediateNodes is true.
func (sdb *builder) buildRemovedStorageNodesFromTrie(it trie.NodeIterator, intermediateNodes bool, output sdtypes.StorageNodeSink) error {
	for it.Next(true) {
		// value nodes are skipped, as are nodes whose hash equals nullHashBytes
		if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) {
			continue
		}
		resolved, elements, err := resolveNode(it, sdb.stateCache.TrieDB())
		if err != nil {
			return err
		}

		removed := sdtypes.StorageNode{
			NodeType:  sdtypes.Removed,
			Path:      resolved.Path,
			NodeValue: []byte{},
		}
		switch resolved.NodeType {
		case sdtypes.Leaf:
			// leaf key = compact encoding of (node path + partial path) minus its first byte
			fullPath := append(resolved.Path, trie.CompactToHex(elements[0].([]byte))...)
			removed.LeafKey = trie.HexToCompact(fullPath)[1:]
		case sdtypes.Extension, sdtypes.Branch:
			if !intermediateNodes {
				// intermediate nodes were not requested
				continue
			}
		default:
			return fmt.Errorf("unexpected node type %s", resolved.NodeType)
		}

		if err := output(removed); err != nil {
			return err
		}
	}
	return it.Error()
}
// buildStorageNodesIncremental emits the storage diff nodes for everything that exists in a
// different state under newSR (B) than under oldSR (A).
// It first reports what was created or updated at B, then what was removed relative to A.
// Identical roots emit nothing.
func (sdb *builder) buildStorageNodesIncremental(oldSR common.Hash, newSR common.Hash, intermediateNodes bool, output sdtypes.StorageNodeSink) error {
	if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) {
		// same root, same storage
		return nil
	}
	log.Debug("Storage Roots for Incremental Diff", "old", oldSR.Hex(), "new", newSR.Hex())

	trieA, err := sdb.stateCache.OpenTrie(oldSR)
	if err != nil {
		return err
	}
	trieB, err := sdb.stateCache.OpenTrie(newSR)
	if err != nil {
		return err
	}

	// first pass: nodes created or updated at B
	createdA := trieA.NodeIterator([]byte{})
	createdB := trieB.NodeIterator([]byte{})
	slotsAtB, pathsAtB, err := sdb.createdAndUpdatedStorage(createdA, createdB, intermediateNodes, output)
	if err != nil {
		return err
	}

	// second pass needs fresh iterators over both tries
	deletedA := trieA.NodeIterator([]byte{})
	deletedB := trieB.NodeIterator([]byte{})
	return sdb.deletedOrUpdatedStorage(deletedA, deletedB, slotsAtB, pathsAtB, intermediateNodes, output)
}
// createdAndUpdatedStorage walks the difference iterator built from (a, b) and emits a storage
// diff node to output for each visited leaf and, when intermediateNodes is true, for each
// visited extension or branch node.
// It returns the set of hex-encoded leaf keys of the visited leaves and the set of hex-encoded
// paths of every visited node (recorded whether or not the node was emitted).
func (sdb *builder) createdAndUpdatedStorage(a, b trie.NodeIterator, intermediateNodes bool, output sdtypes.StorageNodeSink) (map[string]bool, map[string]bool, error) {
	pathsAtB := make(map[string]bool)
	slotsAtB := make(map[string]bool)
	// the second return value of NewDifferenceIterator is not needed here
	it, _ := trie.NewDifferenceIterator(a, b)
	for it.Next(true) {
		// skip value nodes and nodes whose hash equals nullHashBytes
		if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) {
			continue
		}
		resolved, elements, err := resolveNode(it, sdb.stateCache.TrieDB())
		if err != nil {
			return nil, nil, err
		}

		storageNode := sdtypes.StorageNode{
			NodeType:  resolved.NodeType,
			Path:      resolved.Path,
			NodeValue: resolved.NodeValue,
		}
		emit := true
		switch resolved.NodeType {
		case sdtypes.Leaf:
			// leaf key = compact encoding of (node path + partial path) minus its first byte
			fullPath := append(resolved.Path, trie.CompactToHex(elements[0].([]byte))...)
			storageNode.LeafKey = trie.HexToCompact(fullPath)[1:]
			slotsAtB[common.Bytes2Hex(storageNode.LeafKey)] = true
		case sdtypes.Extension, sdtypes.Branch:
			// intermediate nodes are only emitted on request, but their path is still recorded below
			emit = intermediateNodes
		default:
			return nil, nil, fmt.Errorf("unexpected node type %s", resolved.NodeType)
		}

		if emit {
			if err := output(storageNode); err != nil {
				return nil, nil, err
			}
		}
		pathsAtB[common.Bytes2Hex(resolved.Path)] = true
	}
	return slotsAtB, pathsAtB, it.Error()
}
// deletedOrUpdatedStorage walks the difference iterator built from (b, a) and emits a "removed"
// storage diff to output for every visited node whose path is absent from diffPathsAtB.
// A removed leaf carries its leaf key only when that key is also absent from diffSlotsAtB
// (the slot was deleted); if the key is present the slot merely moved and the leaf key is
// left empty. Removed extension and branch nodes are emitted only when intermediateNodes is true.
func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffSlotsAtB, diffPathsAtB map[string]bool, intermediateNodes bool, output sdtypes.StorageNodeSink) error {
	// the second return value of NewDifferenceIterator is not needed here
	it, _ := trie.NewDifferenceIterator(b, a)
	for it.Next(true) {
		// value nodes are skipped, as are nodes whose hash equals nullHashBytes
		if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) {
			continue
		}
		resolved, elements, err := sdtrie.ResolveNode(it, sdb.stateCache.TrieDB())
		if err != nil {
			return err
		}

		// every diff emitted by this function is an empty "removed" node at the resolved path
		removed := sdtypes.StorageNode{
			NodeType:  sdtypes.Removed,
			Path:      resolved.Path,
			NodeValue: []byte{},
		}
		switch resolved.NodeType {
		case sdtypes.Leaf:
			// leaf key = compact encoding of (node path + partial path) minus its first byte
			fullPath := append(resolved.Path, trie.CompactToHex(elements[0].([]byte))...)
			slotKey := trie.HexToCompact(fullPath)[1:]
			// a path that showed up in diffPathsAtB is still occupied at B: nothing was removed
			if _, stillThere := diffPathsAtB[common.Bytes2Hex(resolved.Path)]; stillThere {
				continue
			}
			// the leaf key is attached only for a real deletion;
			// a slot that moved is reported with an empty leaf key
			if _, moved := diffSlotsAtB[common.Bytes2Hex(slotKey)]; !moved {
				removed.LeafKey = slotKey
			}
		case sdtypes.Extension, sdtypes.Branch:
			// report the node only if its path is gone at B and intermediate nodes were requested
			if _, stillThere := diffPathsAtB[common.Bytes2Hex(resolved.Path)]; stillThere || !intermediateNodes {
				continue
			}
		default:
			return fmt.Errorf("unexpected node type %s", resolved.NodeType)
		}

		if err := output(removed); err != nil {
			return err
		}
	}
	return it.Error()
}
// isWatchedAddress reports whether the state leaf key belongs to one of the addresses the
// builder is configured to watch. An empty (or nil) watch set means everything is watched.
func isWatchedAddress(watchedAddressesLeafKeys map[common.Hash]struct{}, stateLeafKey []byte) bool {
	if len(watchedAddressesLeafKeys) > 0 {
		// a watch list is configured: the key must be a member of it
		_, watched := watchedAddressesLeafKeys[common.BytesToHash(stateLeafKey)]
		return watched
	}
	return true
}
// isWatchedStorageKey reports whether the storage leaf key matches one of the storage slots
// the builder is configured to watch. An empty (or nil) list means every slot is watched.
func isWatchedStorageKey(watchedKeys []common.Hash, storageLeafKey []byte) bool {
	// with no specific storage keys configured, everything counts as watched
	watched := len(watchedKeys) == 0
	for i := 0; !watched && i < len(watchedKeys); i++ {
		watched = bytes.Equal(watchedKeys[i].Bytes(), storageLeafKey)
	}
	return watched
}

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,7 @@
package statediff package statediff
// Config holds config params for the statediffing service // ServiceConfig holds config params for the statediffing service
type Config struct { type ServiceConfig struct {
ServiceWorkers uint ServiceWorkers uint
TrieWorkers uint TrieWorkers uint
WorkerQueueSize uint WorkerQueueSize uint

View File

@ -22,9 +22,11 @@ package statediff
import ( import (
"sort" "sort"
"strings" "strings"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
) )
func sortKeys(data AccountMap) []string { func sortKeys(data sdtypes.AccountMap) []string {
keys := make([]string, 0, len(data)) keys := make([]string, 0, len(data))
for key := range data { for key := range data {
keys = append(keys, key) keys = append(keys, key)

View File

@ -54,22 +54,29 @@ type LvLDBReaderConfig struct {
DBCacheSize int DBCacheSize int
} }
// NewLvlDBReader creates a new Read using LevelDB // NewLvlDBReader creates a new Reader using LevelDB
func NewLvlDBReader(conf LvLDBReaderConfig) (*LvlDBReader, error) { func NewLvlDBReader(conf LvLDBReaderConfig) (*LvlDBReader, error) {
var edb ethdb.Database var edb ethdb.Database
var err error var err error
if conf.Mode == "local" { switch conf.Mode {
edb, err = rawdb.NewLevelDBDatabaseWithFreezer(conf.Path, conf.DBCacheSize, 256, conf.AncientPath, "eth-statediff-service", true) case "local":
} edb, err = rawdb.NewLevelDBDatabase(conf.Path, conf.DBCacheSize, 256, "eth-statediff-service", true)
if conf.Mode == "remote" {
edb, err = client.NewDatabaseClient(conf.Url)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
edb, err = rawdb.NewDatabaseWithFreezer(edb, conf.AncientPath, "eth-statediff-service", true)
if err != nil {
return nil, err
}
case "remote":
edb, err = client.NewDatabaseClient(conf.Url)
if err != nil {
return nil, err
}
}
return &LvlDBReader{ return &LvlDBReader{
ethDB: edb, ethDB: edb,
stateDB: state.NewDatabaseWithConfig(edb, conf.TrieConfig), stateDB: state.NewDatabaseWithConfig(edb, conf.TrieConfig),

View File

@ -31,7 +31,7 @@ import (
func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) (*rpc.Server, error) { func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) (*rpc.Server, error) {
srv := rpc.NewServer() srv := rpc.NewServer()
err := node.RegisterApis(apis, modules, srv, false) err := node.RegisterApis(apis, modules, srv)
if err != nil { if err != nil {
utils.Fatalf("Could not register HTTP API: %w", err) utils.Fatalf("Could not register HTTP API: %w", err)
} }

View File

@ -22,12 +22,12 @@ import (
"sync" "sync"
"time" "time"
sd "github.com/cerc-io/plugeth-statediff" "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/interfaces"
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
@ -38,35 +38,10 @@ import (
const defaultQueueSize = 1024 const defaultQueueSize = 1024
// StateDiffService is the state-diffing service interface
type StateDiffService interface {
// Lifecycle Start() and Stop()
node.Lifecycle
// APIs and Protocols() interface for node service registration
APIs() []rpc.API
Protocols() []p2p.Protocol
// Loop is the main event loop for processing state diffs
Loop(wg *sync.WaitGroup) error
// Run is a one-off command to run on a predefined set of ranges
Run(ranges []RangeRequest) error
// StateDiffAt method to get state diff object at specific block
StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
// StateDiffFor method to get state diff object at specific block
StateDiffFor(blockHash common.Hash, params sd.Params) (*sd.Payload, error)
// StateTrieAt method to get state trie object at specific block
StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
// WriteStateDiffAt method to write state diff object directly to DB
WriteStateDiffAt(blockNumber uint64, params sd.Params) error
// WriteStateDiffFor method to get state trie object at specific block
WriteStateDiffFor(blockHash common.Hash, params sd.Params) error
// WriteStateDiffsInRange method to wrtie state diff objects within the range directly to the DB
WriteStateDiffsInRange(start, stop uint64, params sd.Params) error
}
// Service is the underlying struct for the state diffing service // Service is the underlying struct for the state diffing service
type Service struct { type Service struct {
// Used to build the state diff objects // Used to build the state diff objects
Builder Builder Builder statediff.Builder
// Used to read data from LevelDB // Used to read data from LevelDB
lvlDBReader Reader lvlDBReader Reader
// Used to signal shutdown of the service // Used to signal shutdown of the service
@ -82,22 +57,22 @@ type Service struct {
} }
// NewStateDiffService creates a new Service // NewStateDiffService creates a new Service
func NewStateDiffService(lvlDBReader Reader, indexer interfaces.StateDiffIndexer, conf Config) (*Service, error) { func NewStateDiffService(lvlDBReader Reader, indexer interfaces.StateDiffIndexer, conf ServiceConfig) *Service {
b, err := NewBuilder(lvlDBReader.StateDB(), conf.TrieWorkers) builder := statediff.NewBuilderWithOptions(
if err != nil { adapt.GethStateView(lvlDBReader.StateDB()),
return nil, err statediff.BuilderOptions{SubtrieWorkers: conf.TrieWorkers},
} )
if conf.WorkerQueueSize == 0 { if conf.WorkerQueueSize == 0 {
conf.WorkerQueueSize = defaultQueueSize conf.WorkerQueueSize = defaultQueueSize
} }
return &Service{ return &Service{
lvlDBReader: lvlDBReader, lvlDBReader: lvlDBReader,
Builder: b, Builder: builder,
indexer: indexer, indexer: indexer,
workers: conf.ServiceWorkers, workers: conf.ServiceWorkers,
queue: make(chan RangeRequest, conf.WorkerQueueSize), queue: make(chan RangeRequest, conf.WorkerQueueSize),
preruns: conf.PreRuns, preruns: conf.PreRuns,
}, nil }
} }
// Protocols exports the services p2p protocols, this service has none // Protocols exports the services p2p protocols, this service has none
@ -188,7 +163,7 @@ func (sds *Service) Loop(wg *sync.WaitGroup) error {
// StateDiffAt returns a state diff object payload at the specific blockheight // StateDiffAt returns a state diff object payload at the specific blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data // This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) { func (sds *Service) StateDiffAt(blockNumber uint64, params statediff.Params) (*statediff.Payload, error) {
currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber) currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber)
if err != nil { if err != nil {
return nil, err return nil, err
@ -196,7 +171,7 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Paylo
logrus.Infof("sending state diff at block %d", blockNumber) logrus.Infof("sending state diff at block %d", blockNumber)
// compute leaf keys of watched addresses in the params // compute leaf keys of watched addresses in the params
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
if blockNumber == 0 { if blockNumber == 0 {
return sds.processStateDiff(currentBlock, common.Hash{}, params) return sds.processStateDiff(currentBlock, common.Hash{}, params)
@ -210,7 +185,7 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Paylo
// StateDiffFor returns a state diff object payload for the specific blockhash // StateDiffFor returns a state diff object payload for the specific blockhash
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data // This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateDiffFor(blockHash common.Hash, params sd.Params) (*sd.Payload, error) { func (sds *Service) StateDiffFor(blockHash common.Hash, params statediff.Params) (*statediff.Payload, error) {
currentBlock, err := sds.lvlDBReader.GetBlockByHash(blockHash) currentBlock, err := sds.lvlDBReader.GetBlockByHash(blockHash)
if err != nil { if err != nil {
return nil, err return nil, err
@ -218,7 +193,7 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params sd.Params) (*sd.P
logrus.Infof("sending state diff at block %s", blockHash.Hex()) logrus.Infof("sending state diff at block %s", blockHash.Hex())
// compute leaf keys of watched addresses in the params // compute leaf keys of watched addresses in the params
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
if currentBlock.NumberU64() == 0 { if currentBlock.NumberU64() == 0 {
return sds.processStateDiff(currentBlock, common.Hash{}, params) return sds.processStateDiff(currentBlock, common.Hash{}, params)
@ -231,8 +206,8 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params sd.Params) (*sd.P
} }
// processStateDiff method builds the state diff payload from the current block, parent state root, and provided params // processStateDiff method builds the state diff payload from the current block, parent state root, and provided params
func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot common.Hash, params sd.Params) (*sd.Payload, error) { func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot common.Hash, params statediff.Params) (*statediff.Payload, error) {
stateDiff, err := sds.Builder.BuildStateDiffObject(sd.Args{ stateDiff, err := sds.Builder.BuildStateDiffObject(statediff.Args{
BlockHash: currentBlock.Hash(), BlockHash: currentBlock.Hash(),
BlockNumber: currentBlock.Number(), BlockNumber: currentBlock.Number(),
OldStateRoot: parentRoot, OldStateRoot: parentRoot,
@ -249,8 +224,8 @@ func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot commo
return sds.newPayload(stateDiffRlp, currentBlock, params) return sds.newPayload(stateDiffRlp, currentBlock, params)
} }
func (sds *Service) newPayload(stateObject []byte, block *types.Block, params sd.Params) (*sd.Payload, error) { func (sds *Service) newPayload(stateObject []byte, block *types.Block, params statediff.Params) (*statediff.Payload, error) {
payload := &sd.Payload{ payload := &statediff.Payload{
StateObjectRlp: stateObject, StateObjectRlp: stateObject,
} }
if params.IncludeBlock { if params.IncludeBlock {
@ -281,34 +256,6 @@ func (sds *Service) newPayload(stateObject []byte, block *types.Block, params sd
return payload, nil return payload, nil
} }
// StateTrieAt returns a state trie object payload at the specified blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) {
currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber)
if err != nil {
return nil, err
}
logrus.Infof("sending state trie at block %d", blockNumber)
// compute leaf keys of watched addresses in the params
params.ComputeWatchedAddressesLeafKeys()
return sds.processStateTrie(currentBlock, params)
}
func (sds *Service) processStateTrie(block *types.Block, params sd.Params) (*sd.Payload, error) {
stateNodes, err := sds.Builder.BuildStateTrieObject(block)
if err != nil {
return nil, err
}
stateTrieRlp, err := rlp.EncodeToBytes(&stateNodes)
if err != nil {
return nil, err
}
logrus.Infof("state trie object at block %d is %d bytes in length", block.Number().Uint64(), len(stateTrieRlp))
return sds.newPayload(stateTrieRlp, block, params)
}
// Start is used to begin the service // Start is used to begin the service
func (sds *Service) Start() error { func (sds *Service) Start() error {
logrus.Info("starting statediff service") logrus.Info("starting statediff service")
@ -325,7 +272,7 @@ func (sds *Service) Stop() error {
// WriteStateDiffAt writes a state diff at the specific blockheight directly to the database // WriteStateDiffAt writes a state diff at the specific blockheight directly to the database
// This operation cannot be performed back past the point of db pruning; it requires an archival node // This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data // for historical data
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params sd.Params) error { func (sds *Service) WriteStateDiffAt(blockNumber uint64, params statediff.Params) error {
logrus.Infof("Writing state diff at block %d", blockNumber) logrus.Infof("Writing state diff at block %d", blockNumber)
t := time.Now() t := time.Now()
currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber) currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber)
@ -334,7 +281,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params sd.Params) error
} }
// compute leaf keys of watched addresses in the params // compute leaf keys of watched addresses in the params
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
parentRoot := common.Hash{} parentRoot := common.Hash{}
if blockNumber != 0 { if blockNumber != 0 {
@ -350,7 +297,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params sd.Params) error
// WriteStateDiffFor writes a state diff for the specific blockHash directly to the database // WriteStateDiffFor writes a state diff for the specific blockHash directly to the database
// This operation cannot be performed back past the point of db pruning; it requires an archival node // This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data // for historical data
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params sd.Params) error { func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params statediff.Params) error {
logrus.Infof("Writing state diff for block %s", blockHash.Hex()) logrus.Infof("Writing state diff for block %s", blockHash.Hex())
t := time.Now() t := time.Now()
currentBlock, err := sds.lvlDBReader.GetBlockByHash(blockHash) currentBlock, err := sds.lvlDBReader.GetBlockByHash(blockHash)
@ -359,7 +306,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params sd.Params) e
} }
// compute leaf keys of watched addresses in the params // compute leaf keys of watched addresses in the params
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
parentRoot := common.Hash{} parentRoot := common.Hash{}
if currentBlock.NumberU64() != 0 { if currentBlock.NumberU64() != 0 {
@ -373,7 +320,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params sd.Params) e
} }
// Writes a state diff from the current block, parent state root, and provided params // Writes a state diff from the current block, parent state root, and provided params
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params sd.Params, t time.Time) error { func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params statediff.Params, t time.Time) error {
var totalDifficulty *big.Int var totalDifficulty *big.Int
var receipts types.Receipts var receipts types.Receipts
var err error var err error
@ -398,28 +345,30 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
return err return err
} }
// defer handling of commit/rollback for any return case // defer handling of commit/rollback for any return case
output := func(node sdtypes.StateNode) error { output := func(node sdtypes.StateLeafNode) error {
return sds.indexer.PushStateNode(tx, node, block.Hash().String()) return sds.indexer.PushStateNode(tx, node, block.Hash().String())
} }
codeOutput := func(c sdtypes.CodeAndCodeHash) error { codeOutput := func(c sdtypes.IPLD) error {
return sds.indexer.PushCodeAndCodeHash(tx, c) return sds.indexer.PushIPLD(tx, c)
} }
prom.SetTimeMetric(prom.T_BLOCK_PROCESSING, time.Now().Sub(t)) prom.SetTimeMetric(prom.T_BLOCK_PROCESSING, time.Now().Sub(t))
t = time.Now() t = time.Now()
err = sds.Builder.WriteStateDiffObject(sdtypes.StateRoots{ err = sds.Builder.WriteStateDiff(statediff.Args{
NewStateRoot: block.Root(), NewStateRoot: block.Root(),
OldStateRoot: parentRoot, OldStateRoot: parentRoot,
BlockNumber: block.Number(),
BlockHash: block.Hash(),
}, params, output, codeOutput) }, params, output, codeOutput)
prom.SetTimeMetric(prom.T_STATE_PROCESSING, time.Now().Sub(t)) prom.SetTimeMetric(prom.T_STATE_PROCESSING, time.Now().Sub(t))
t = time.Now() t = time.Now()
err = tx.Submit(err) err = tx.Submit()
prom.SetLastProcessedHeight(height) prom.SetLastProcessedHeight(height)
prom.SetTimeMetric(prom.T_POSTGRES_TX_COMMIT, time.Now().Sub(t)) prom.SetTimeMetric(prom.T_POSTGRES_TX_COMMIT, time.Now().Sub(t))
return err return err
} }
// WriteStateDiffsInRange adds a RangeRequest to the work queue // WriteStateDiffsInRange adds a RangeRequest to the work queue
func (sds *Service) WriteStateDiffsInRange(start, stop uint64, params sd.Params) error { func (sds *Service) WriteStateDiffsInRange(start, stop uint64, params statediff.Params) error {
if stop < start { if stop < start {
return fmt.Errorf("invalid block range (%d, %d): stop height must be greater or equal to start height", start, stop) return fmt.Errorf("invalid block range (%d, %d): stop height must be greater or equal to start height", start, stop)
} }

View File

@ -21,22 +21,8 @@ package statediff
import ( import (
sd "github.com/cerc-io/plugeth-statediff" sd "github.com/cerc-io/plugeth-statediff"
sdTypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/core/types"
) )
// AccountMap is a mapping of hex encoded path => account wrapper
type AccountMap map[string]accountWrapper
// accountWrapper is used to temporary associate the unpacked node with its raw values
type accountWrapper struct {
Account *types.StateAccount
NodeType sdTypes.NodeType
Path []byte
NodeValue []byte
LeafKey []byte
}
// RangeRequest holds range quest work params // RangeRequest holds range quest work params
type RangeRequest struct { type RangeRequest struct {
Start, Stop uint64 Start, Stop uint64