From 78adba02ea8e848252e82ad64170dd76c04a12eb Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Wed, 1 Jul 2020 13:44:59 -0500 Subject: [PATCH] state snapshot extractor --- cmd/root.go | 122 +++++++++++++++----------- cmd/stateSnapshot.go | 46 +++++++--- pkg/snapshot/config.go | 54 ++++++++++++ pkg/snapshot/node_type.go | 51 +++++++++++ pkg/snapshot/publisher.go | 128 ++++++++++++++++++++++++++++ pkg/snapshot/service.go | 175 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 515 insertions(+), 61 deletions(-) create mode 100644 pkg/snapshot/config.go create mode 100644 pkg/snapshot/node_type.go create mode 100644 pkg/snapshot/publisher.go create mode 100644 pkg/snapshot/service.go diff --git a/cmd/root.go b/cmd/root.go index 882faeb..1cae6ed 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,90 +1,114 @@ -// Copyright © 2020 Vulcanize, Inc -// +// VulcanizeDB +// Copyright © 2019 Vulcanize + // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. -// + // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. -// + // You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . +// along with this program. If not, see . package cmd import ( "fmt" "os" + "strings" - homedir "github.com/mitchellh/go-homedir" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" ) -var cfgFile string +var ( + cfgFile string + subCommand string + logWithCommand log.Entry +) -// rootCmd represents the base command when called without any subcommands var rootCmd = &cobra.Command{ - Use: "eth-pg-ipfs-state-snapshot", - Short: "A brief description of your application", - Long: `A longer description that spans multiple lines and likely contains -examples and usage of using your application. For example: - -Cobra is a CLI library for Go that empowers applications. -This application is a tool to generate the needed files -to quickly create a Cobra application.`, - // Uncomment the following line if your bare application - // has an action associated with it: - // Run: func(cmd *cobra.Command, args []string) { }, + Use: "eth-pg-ipfs-state-snapshot", + PersistentPreRun: initFuncs, } -// Execute adds all child commands to the root command and sets flags appropriately. -// This is called by main.main(). It only needs to happen once to the rootCmd. func Execute() { + log.Info("----- Starting vDB -----") if err := rootCmd.Execute(); err != nil { - fmt.Println(err) - os.Exit(1) + log.Fatal(err) } } +func initFuncs(cmd *cobra.Command, args []string) { + logfile := viper.GetString("logfile") + if logfile != "" { + file, err := os.OpenFile(logfile, + os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err == nil { + log.Infof("Directing output to %s", logfile) + log.SetOutput(file) + } else { + log.SetOutput(os.Stdout) + log.Info("Failed to log to file, using default stdout") + } + } else { + log.SetOutput(os.Stdout) + } + if err := logLevel(); err != nil { + log.Fatal("Could not set log level: ", err) + } +} + +func logLevel() error { + lvl, err := log.ParseLevel(viper.GetString("log.level")) + if err != nil { + return err + } + log.SetLevel(lvl) + if lvl > log.InfoLevel { + log.SetReportCaller(true) + } + log.Info("Log level set to ", lvl.String()) + return nil +} + func init() { cobra.OnInitialize(initConfig) + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + viper.AutomaticEnv() - // Here you will define your flags and configuration settings. - // Cobra supports persistent flags, which, if defined here, - // will be global for your application. - rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.eth-pg-ipfs-state-snapshot.yaml)") + rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file location") + rootCmd.PersistentFlags().String("logfile", "", "file path for logging") + rootCmd.PersistentFlags().String("database-name", "vulcanize_public", "database name") + rootCmd.PersistentFlags().Int("database-port", 5432, "database port") + rootCmd.PersistentFlags().String("database-hostname", "localhost", "database hostname") + rootCmd.PersistentFlags().String("database-user", "", "database user") + rootCmd.PersistentFlags().String("database-password", "", "database password") + rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "Log level (trace, debug, info, warn, error, fatal, panic") - // Cobra also supports local flags, which will only run - // when this action is called directly. - rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") + viper.BindPFlag("logfile", rootCmd.PersistentFlags().Lookup("logfile")) + viper.BindPFlag("database.name", rootCmd.PersistentFlags().Lookup("database-name")) + viper.BindPFlag("database.port", rootCmd.PersistentFlags().Lookup("database-port")) + viper.BindPFlag("database.hostname", rootCmd.PersistentFlags().Lookup("database-hostname")) + viper.BindPFlag("database.user", rootCmd.PersistentFlags().Lookup("database-user")) + viper.BindPFlag("database.password", rootCmd.PersistentFlags().Lookup("database-password")) + viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level")) } -// initConfig reads in config file and ENV variables if set. func initConfig() { if cfgFile != "" { - // Use config file from the flag. viper.SetConfigFile(cfgFile) - } else { - // Find home directory. - home, err := homedir.Dir() - if err != nil { - fmt.Println(err) - os.Exit(1) + if err := viper.ReadInConfig(); err == nil { + log.Printf("Using config file: %s", viper.ConfigFileUsed()) + } else { + log.Fatal(fmt.Sprintf("Couldn't read config file: %s", err.Error())) } - - // Search config in home directory with name ".eth-pg-ipfs-state-snapshot" (without extension). - viper.AddConfigPath(home) - viper.SetConfigName(".eth-pg-ipfs-state-snapshot") - } - - viper.AutomaticEnv() // read in environment variables that match - - // If a config file is found, read it in. - if err := viper.ReadInConfig(); err == nil { - fmt.Println("Using config file:", viper.ConfigFileUsed()) + } else { + log.Warn("No config file passed with --config flag") } } diff --git a/cmd/stateSnapshot.go b/cmd/stateSnapshot.go index b8ff395..1438414 100644 --- a/cmd/stateSnapshot.go +++ b/cmd/stateSnapshot.go @@ -16,15 +16,17 @@ package cmd import ( - "fmt" - + "github.com/ethereum/go-ethereum/common" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/snapshot" ) // stateSnapshotCmd represents the stateSnapshot command var stateSnapshotCmd = &cobra.Command{ Use: "stateSnapshot", - Short: "A brief description of your command", + Short: "Extract the entire Ethereum state from leveldb and publish into PG-IPFS", Long: `A longer description that spans multiple lines and likely contains examples and usage of using your command. For example: @@ -32,20 +34,40 @@ Cobra is a CLI library for Go that empowers applications. This application is a tool to generate the needed files to quickly create a Cobra application.`, Run: func(cmd *cobra.Command, args []string) { - fmt.Println("stateSnapshot called") + subCommand = cmd.CalledAs() + logWithCommand = *logrus.WithField("SubCommand", subCommand) + stateSnapshot() }, } +func stateSnapshot() { + snapConfig := snapshot.Config{} + snapConfig.Init() + snapshotService, err := snapshot.NewSnapshotService(snapConfig) + if err != nil { + logWithCommand.Fatal(err) + } + height := viper.Get("snapshot.blockHeight") + uHeight, ok := height.(uint64) + if !ok { + logWithCommand.Fatal("snapshot.blockHeight needs to be a uint") + } + hashStr := viper.GetString("snapshot.blockHash") + hash := common.HexToHash(hashStr) + if err := snapshotService.CreateSnapshot(uHeight, hash); err != nil { + logWithCommand.Fatal(err) + } + logWithCommand.Infof("state snapshot for height %d and hash %s is complete", uHeight, hashStr) +} + func init() { rootCmd.AddCommand(stateSnapshotCmd) - // Here you will define your flags and configuration settings. + stateSnapshotCmd.PersistentFlags().String("leveldb-path", "", "path to leveldb") + stateSnapshotCmd.PersistentFlags().String("block-height", "", "blockheight to extract state at") + stateSnapshotCmd.PersistentFlags().String("block-hash", "", "blockhash to extract state at") - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // stateSnapshotCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // stateSnapshotCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") + viper.BindPFlag("leveldb.path", stateSnapshotCmd.PersistentFlags().Lookup("leveldb-path")) + viper.BindPFlag("snapshot.blockHeight", stateSnapshotCmd.PersistentFlags().Lookup("block-height")) + viper.BindPFlag("snapshot.blockHash", stateSnapshotCmd.PersistentFlags().Lookup("block-hash")) } diff --git a/pkg/snapshot/config.go b/pkg/snapshot/config.go new file mode 100644 index 0000000..fc93977 --- /dev/null +++ b/pkg/snapshot/config.go @@ -0,0 +1,54 @@ +// Copyright © 2020 Vulcanize, Inc +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package snapshot + +import ( + "github.com/spf13/viper" + + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/config" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/core" +) + +const ( + LVL_DB_PATH = "LVL_DB_PATH" + ETH_NODE_ID = "ETH_NODE_ID" + ETH_CLIENT_NAME = "ETH_CLIENT_NAME" + ETH_GENESIS_BLOCK = "ETH_GENESIS_BLOCK" + ETH_NETWORK_ID = "ETH_NETWORK_ID" +) + +type Config struct { + LevelDBPath string + Node core.Node + DBConfig config.Database +} + +func (c *Config) Init() { + c.DBConfig.Init() + viper.BindEnv("leveldb.path", LVL_DB_PATH) + viper.BindEnv("ethereum.nodeID", ETH_NODE_ID) + viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME) + viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK) + viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID) + + c.Node = core.Node{ + ID: viper.GetString("ethereum.nodeID"), + ClientName: viper.GetString("ethereum.clientName"), + GenesisBlock: viper.GetString("ethereum.genesisBlock"), + NetworkID: viper.GetString("ethereum.networkID"), + } + c.LevelDBPath = viper.GetString("leveldb.path") +} diff --git a/pkg/snapshot/node_type.go b/pkg/snapshot/node_type.go new file mode 100644 index 0000000..a62f3ae --- /dev/null +++ b/pkg/snapshot/node_type.go @@ -0,0 +1,51 @@ +// Copyright © 2020 Vulcanize, Inc +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package snapshot + +import "fmt" + +// NodeType for explicitly setting type of node +type NodeType string + +const ( + Unknown NodeType = "Unknown" + Leaf NodeType = "Leaf" + Extension NodeType = "Extension" + Branch NodeType = "Branch" + Removed NodeType = "Removed" // used to represent pathes which have been emptied +) + +// CheckKeyType checks what type of key we have +func CheckKeyType(elements []interface{}) (NodeType, error) { + if len(elements) > 2 { + return Branch, nil + } + if len(elements) < 2 { + return Unknown, fmt.Errorf("node cannot be less than two elements in length") + } + switch elements[0].([]byte)[0] / 16 { + case '\x00': + return Extension, nil + case '\x01': + return Extension, nil + case '\x02': + return Leaf, nil + case '\x03': + return Leaf, nil + default: + return Unknown, fmt.Errorf("unknown hex prefix") + } +} diff --git a/pkg/snapshot/publisher.go b/pkg/snapshot/publisher.go new file mode 100644 index 0000000..9b4ef8d --- /dev/null +++ b/pkg/snapshot/publisher.go @@ -0,0 +1,128 @@ +// Copyright © 2020 Vulcanize, Inc +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package snapshot + +import ( + "github.com/ethereum/go-ethereum/core/types" + "github.com/multiformats/go-multihash" + + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" +) + +type Publisher struct { + db *postgres.DB +} + +func NewPublisher(db *postgres.DB) *Publisher { + return &Publisher{ + db: db, + } +} + +func (p *Publisher) PublishHeader(header *types.Header) (int64, error) { + headerNode, err := ipld.NewEthHeader(header) + if err != nil { + return 0, err + } + tx, err := p.db.Beginx() + if err != nil { + return 0, err + } + defer func() { + if p := recover(); p != nil { + shared.Rollback(tx) + panic(p) + } else if err != nil { + shared.Rollback(tx) + } else { + err = tx.Commit() + } + }() + if err := shared.PublishIPLD(tx, headerNode); err != nil { + return 0, err + } + var headerID int64 + err = tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, times_validated) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) + ON CONFLICT (block_number, block_hash) DO UPDATE SET block_number = header_cids.block_number + RETURNING id`, + header.Number.Uint64(), header.Hash().Hex(), header.ParentHash.Hex(), headerNode.Cid(), "0", p.db.NodeID, "0", header.Root.Hex(), header.TxHash.Hex(), + header.ReceiptHash.Hex(), header.UncleHash.Hex(), header.Bloom.Bytes(), header.Time, 0).Scan(&headerID) + return headerID, err +} + +func (p *Publisher) PublishStateNode(meta eth.StateNodeModel, nodeVal []byte, headerID int64) (int64, error) { + var stateID int64 + var stateKey string + if meta.StateKey != nullHash.String() { + stateKey = meta.StateKey + } + tx, err := p.db.Beginx() + if err != nil { + return 0, err + } + defer func() { + if p := recover(); p != nil { + shared.Rollback(tx) + panic(p) + } else if err != nil { + shared.Rollback(tx) + } else { + err = tx.Commit() + } + }() + stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, nodeVal) + if err != nil { + return 0, err + } + err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff) VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (header_id, state_path, diff) DO UPDATE SET (state_leaf_key, cid, node_type) = ($2, $3, $5, $6) + RETURNING id`, + headerID, stateKey, stateCIDStr, meta.Path, meta.NodeType, false).Scan(&stateID) + return stateID, err +} + +func (p *Publisher) PublishStorageNode(meta eth.StorageNodeModel, nodeVal []byte, stateID int64) error { + var storageKey string + if meta.StorageKey != nullHash.String() { + storageKey = meta.StorageKey + } + tx, err := p.db.Beginx() + if err != nil { + return err + } + defer func() { + if p := recover(); p != nil { + shared.Rollback(tx) + panic(p) + } else if err != nil { + shared.Rollback(tx) + } else { + err = tx.Commit() + } + }() + storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, nodeVal) + if err != nil { + return err + } + _, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff) VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff) = ($2, $3, $5, $6)`, + stateID, storageKey, storageCIDStr, meta.Path, meta.NodeType, false) + return err +} diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go new file mode 100644 index 0000000..9eb73a3 --- /dev/null +++ b/pkg/snapshot/service.go @@ -0,0 +1,175 @@ +// Copyright © 2020 Vulcanize, Inc +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package snapshot + +import ( + "bytes" + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" + + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" +) + +var ( + nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") + emptyNode, _ = rlp.EncodeToBytes([]byte{}) + emptyContractRoot = crypto.Keccak256Hash(emptyNode) +) + +type Service struct { + ethDB ethdb.Database + stateDB state.Database + ipfsPublisher *Publisher +} + +func NewSnapshotService(con Config) (*Service, error) { + pgdb, err := postgres.NewDB(con.DBConfig, con.Node) + if err != nil { + return nil, err + } + edb, err := rawdb.NewLevelDBDatabase(con.LevelDBPath, 256, 0, "") + if err != nil { + return nil, err + } + return &Service{ + ethDB: edb, + stateDB: state.NewDatabase(edb), + ipfsPublisher: NewPublisher(pgdb), + }, nil +} + +func (s *Service) CreateSnapshot(height uint64, hash common.Hash) error { + // extract header from lvldb and publish to PG-IPFS + // hold onto the headerID so that we can link the state nodes to this header + header := rawdb.ReadHeader(s.ethDB, hash, height) + headerID, err := s.ipfsPublisher.PublishHeader(header) + if err != nil { + return err + } + t, err := s.stateDB.OpenTrie(header.Root) + if err != nil { + return err + } + trieDB := s.stateDB.TrieDB() + return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID) +} + +func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, headerID int64) error { + for it.Next(true) { + if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves + continue + } + if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) { + continue + } + nodePath := make([]byte, len(it.Path())) + copy(nodePath, it.Path()) + node, err := trieDB.Node(it.Hash()) + if err != nil { + return err + } + var nodeElements []interface{} + if err := rlp.DecodeBytes(node, &nodeElements); err != nil { + return err + } + ty, err := CheckKeyType(nodeElements) + if err != nil { + return err + } + switch ty { + case Leaf: + var account state.Account + if err := rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil { + return fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", nodePath, err) + } + partialPath := trie.CompactToHex(nodeElements[0].([]byte)) + valueNodePath := append(nodePath, partialPath...) + encodedPath := trie.HexToCompact(valueNodePath) + leafKey := encodedPath[1:] + // publish state node + stateNode := eth.StateNodeModel{} + if err := s.storageSnapshot(account.Root, stateID); err != nil { + return fmt.Errorf("failed building eventual storage diffs for account %+v\r\nerror: %v", account, err) + } + case Extension, Branch: + // publish state node + stateNode := eth.StateNodeModel{} + default: + return fmt.Errorf("unexpected node type %s", ty) + } + } +} + +// buildStorageNodesEventual builds the storage diff node objects for a created account +// i.e. it returns all the storage nodes at this state, since there is no previous state +func (s *Service) storageSnapshot(sr common.Hash, stateID int64) error { + if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) { + return nil + } + log.Debug("Storage Root For Eventual Diff", "root", sr.Hex()) + sTrie, err := s.stateDB.OpenTrie(sr) + if err != nil { + log.Info("error in build storage diff eventual", "error", err) + return err + } + it := sTrie.NodeIterator(make([]byte, 0)) + for it.Next(true) { + // skip value nodes + if it.Leaf() { + continue + } + if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) { + continue + } + nodePath := make([]byte, len(it.Path())) + copy(nodePath, it.Path()) + node, err := s.stateDB.TrieDB().Node(it.Hash()) + if err != nil { + return err + } + var nodeElements []interface{} + if err := rlp.DecodeBytes(node, &nodeElements); err != nil { + return err + } + ty, err := CheckKeyType(nodeElements) + if err != nil { + return err + } + switch ty { + case Leaf: + partialPath := trie.CompactToHex(nodeElements[0].([]byte)) + valueNodePath := append(nodePath, partialPath...) + encodedPath := trie.HexToCompact(valueNodePath) + leafKey := encodedPath[1:] + storageNode := eth.StorageNodeModel{} + + case Extension, Branch: + storageNode := eth.StorageNodeModel{} + default: + return fmt.Errorf("unexpected node type %s", ty) + } + } + return nil +}