diff --git a/pkg/snapshot/file/publisher.go b/pkg/snapshot/file/publisher.go index 63b9c54..943d18f 100644 --- a/pkg/snapshot/file/publisher.go +++ b/pkg/snapshot/file/publisher.go @@ -21,38 +21,41 @@ import ( "math/big" "os" "path/filepath" + "strconv" "sync/atomic" "time" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" + "github.com/lib/pq" + "github.com/ipfs/go-cid" - blockstore "github.com/ipfs/go-ipfs-blockstore" - dshelp "github.com/ipfs/go-ipfs-ds-help" "github.com/multiformats/go-multihash" "github.com/sirupsen/logrus" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/statediff/indexer/ipld" + "github.com/ethereum/go-ethereum/statediff/indexer/models" + nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" + "github.com/ethereum/go-ethereum/statediff/indexer/shared/schema" + "github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom" snapt "github.com/cerc-io/ipld-eth-state-snapshot/pkg/types" - "github.com/ethereum/go-ethereum/statediff/indexer/ipld" - nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" - "github.com/ethereum/go-ethereum/statediff/indexer/shared" ) var _ snapt.Publisher = (*publisher)(nil) var ( // tables written once per block - perBlockTables = []*snapt.Table{ - &snapt.TableIPLDBlock, - &snapt.TableNodeInfo, - &snapt.TableHeader, + perBlockTables = []*schema.Table{ + &schema.TableIPLDBlock, + &schema.TableNodeInfo, + &schema.TableHeader, } // tables written during state iteration - perNodeTables = []*snapt.Table{ - &snapt.TableIPLDBlock, - &snapt.TableStateNode, - &snapt.TableStorageNode, + perNodeTables = []*schema.Table{ + &schema.TableIPLDBlock, + &schema.TableStateNode, + &schema.TableStorageNode, } ) @@ -101,12 +104,12 @@ func newFileWriter(path string) (ret fileWriter, err error) { return } -func (tx fileWriters) write(tbl *snapt.Table, args ...interface{}) error { +func (tx fileWriters) write(tbl *schema.Table, args ...interface{}) error { row := tbl.ToCsvRow(args...) return tx[tbl.Name].Write(row) } -func makeFileWriters(dir string, tables []*snapt.Table) (fileWriters, error) { +func makeFileWriters(dir string, tables []*schema.Table) (fileWriters, error) { if err := os.MkdirAll(dir, 0755); err != nil { return nil, err } @@ -160,21 +163,24 @@ func (p *publisher) BeginTx() (snapt.Tx, error) { } // PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx -// returns the CID and blockstore prefixed multihash key -func (tx fileWriters) publishRaw(codec uint64, raw []byte, height *big.Int) (cid, prefixedKey string, err error) { +// returns the CID +func (tx fileWriters) publishRaw(codec uint64, raw []byte, height *big.Int) (cid string, err error) { c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256) if err != nil { return } cid = c.String() - prefixedKey, err = tx.publishIPLD(c, raw, height) - return + return tx.publishIPLD(c, raw, height) } func (tx fileWriters) publishIPLD(c cid.Cid, raw []byte, height *big.Int) (string, error) { - dbKey := dshelp.MultihashToDsKey(c.Hash()) - prefixedKey := blockstore.BlockPrefix.String() + dbKey.String() - return prefixedKey, tx.write(&snapt.TableIPLDBlock, height.String(), prefixedKey, raw) + return c.String(), tx.write(&schema.TableIPLDBlock, height.String(), c.String(), raw) +} + +// PublishIPLD writes an IPLD to the ipld.blocks blockstore +func (p *publisher) PublishIPLD(c cid.Cid, raw []byte, height *big.Int, snapTx snapt.Tx) (string, error) { + tx := snapTx.(fileTx) + return tx.publishIPLD(c, raw, height) } // PublishHeader writes the header to the ipfs backing pg datastore and adds secondary @@ -188,38 +194,47 @@ func (p *publisher) PublishHeader(header *types.Header) error { return err } - mhKey := shared.MultihashKeyFromCID(headerNode.Cid()) - err = p.writers.write(&snapt.TableNodeInfo, p.nodeInfo.GenesisBlock, p.nodeInfo.NetworkID, p.nodeInfo.ID, + err = p.writers.write(&schema.TableNodeInfo, p.nodeInfo.GenesisBlock, p.nodeInfo.NetworkID, p.nodeInfo.ID, p.nodeInfo.ClientName, p.nodeInfo.ChainID) if err != nil { return err } - err = p.writers.write(&snapt.TableHeader, header.Number.String(), header.Hash().Hex(), header.ParentHash.Hex(), - headerNode.Cid().String(), 0, p.nodeInfo.ID, 0, header.Root.Hex(), header.TxHash.Hex(), - header.ReceiptHash.Hex(), header.UncleHash.Hex(), header.Bloom.Bytes(), header.Time, mhKey, - 0, header.Coinbase.String()) + err = p.writers.write(&schema.TableHeader, + header.Number.String(), + header.Hash().Hex(), + header.ParentHash.Hex(), + headerNode.Cid().String(), + 0, + pq.StringArray([]string{p.nodeInfo.ID}), + 0, + header.Root.Hex(), + header.TxHash.Hex(), + header.ReceiptHash.Hex(), + header.UncleHash.Hex(), + header.Bloom.Bytes(), + strconv.FormatUint(header.Time, 10), + header.Coinbase.String()) if err != nil { return err } return p.writers.Commit() } -// PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes -// in the state_cids table -func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height *big.Int, snapTx snapt.Tx) error { - var stateKey string - if !snapt.IsNullHash(node.Key) { - stateKey = node.Key.Hex() - } - +// PublishStateLeafNode writes the state node eth.state_cids +func (p *publisher) PublishStateLeafNode(stateNode *models.StateNodeModel, snapTx snapt.Tx) error { tx := snapTx.(fileTx) - stateCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStateTrie, node.Value, height) - if err != nil { - return err - } - err = tx.write(&snapt.TableStateNode, height.String(), headerID, stateKey, stateCIDStr, node.Path, - node.NodeType, false, mhKey) + err := tx.write(&schema.TableStateNode, + stateNode.BlockNumber, + stateNode.HeaderID, + stateNode.StateKey, + stateNode.CID, + true, + stateNode.Balance, + strconv.FormatUint(stateNode.Nonce, 10), + stateNode.CodeHash, + stateNode.StorageRoot, + false) if err != nil { return err } @@ -232,22 +247,19 @@ func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height * return err } -// PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary -// indexes in the storage_cids table -func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height *big.Int, statePath []byte, snapTx snapt.Tx) error { - var storageKey string - if !snapt.IsNullHash(node.Key) { - storageKey = node.Key.Hex() - } - +// PublishStorageLeafNode writes the storage node to eth.storage_cids +func (p *publisher) PublishStorageLeafNode(storageNode *models.StorageNodeModel, snapTx snapt.Tx) error { tx := snapTx.(fileTx) - storageCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStorageTrie, node.Value, height) - if err != nil { - return err - } - err = tx.write(&snapt.TableStorageNode, height.String(), headerID, statePath, storageKey, storageCIDStr, node.Path, - node.NodeType, false, mhKey) + err := tx.write(&schema.TableStorageNode, + storageNode.BlockNumber, + storageNode.HeaderID, + storageNode.StateKey, + storageNode.StorageKey, + storageNode.CID, + true, + storageNode.Value, + false) if err != nil { return err } @@ -262,15 +274,11 @@ func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height // PublishCode writes code to the ipfs backing pg datastore func (p *publisher) PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error { - // no codec for code, doesn't matter though since blockstore key is multihash-derived - mhKey, err := shared.MultihashKeyFromKeccak256(codeHash) - if err != nil { - return fmt.Errorf("error deriving multihash key from codehash: %v", err) - } + c := ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes()) tx := snapTx.(fileTx) - if err = tx.write(&snapt.TableIPLDBlock, height.String(), mhKey, codeBytes); err != nil { - return fmt.Errorf("error publishing code IPLD: %v", err) + if _, err := tx.publishIPLD(c, codeBytes, height); err != nil { + return err } // increment code node counter. atomic.AddUint64(&p.codeNodeCounter, 1) diff --git a/pkg/snapshot/pg/publisher.go b/pkg/snapshot/pg/publisher.go index df6a42e..d3ce792 100644 --- a/pkg/snapshot/pg/publisher.go +++ b/pkg/snapshot/pg/publisher.go @@ -18,13 +18,12 @@ package pg import ( "context" "math/big" + "strconv" "sync/atomic" "time" - "github.com/ethereum/go-ethereum/statediff/indexer/models" - "github.com/lib/pq" - "github.com/ipfs/go-cid" + "github.com/lib/pq" "github.com/multiformats/go-multihash" "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" @@ -34,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" + "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/shared/schema" "github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom" @@ -90,15 +90,14 @@ func (p *publisher) BeginTx() (snapt.Tx, error) { } // PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx -// returns the CID and blockstore prefixed multihash key -func (tx pubTx) publishRaw(codec uint64, raw []byte, height *big.Int) (cid, prefixedKey string, err error) { +// returns the CID +func (tx pubTx) publishRaw(codec uint64, raw []byte, height *big.Int) (cid string, err error) { c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256) if err != nil { return } cid = c.String() - prefixedKey, err = tx.publishIPLD(c, raw, height) - return + return tx.publishIPLD(c, raw, height) } func (tx pubTx) publishIPLD(c cid.Cid, raw []byte, height *big.Int) (string, error) { @@ -135,10 +134,21 @@ func (p *publisher) PublishHeader(header *types.Header) (err error) { return err } - _, err = tx.Exec(schema.TableHeader.ToInsertStatement(false), header.Number.Uint64(), header.Hash().Hex(), - header.ParentHash.Hex(), headerNode.Cid().String(), "0", pq.StringArray([]string{p.db.NodeID()}), "0", - header.Root.Hex(), header.TxHash.Hex(), header.ReceiptHash.Hex(), header.UncleHash.Hex(), - header.Bloom.Bytes(), header.Time, header.Coinbase.String()) + _, err = tx.Exec(schema.TableHeader.ToInsertStatement(false), + header.Number.Uint64(), + header.Hash().Hex(), + header.ParentHash.Hex(), + headerNode.Cid().String(), + "0", + pq.StringArray([]string{p.db.NodeID()}), + "0", + header.Root.Hex(), + header.TxHash.Hex(), + header.ReceiptHash.Hex(), + header.UncleHash.Hex(), + header.Bloom.Bytes(), + strconv.FormatUint(header.Time, 10), + header.Coinbase.String()) return err } @@ -155,7 +165,7 @@ func (p *publisher) PublishStateLeafNode(stateNode *models.StateNodeModel, snapT stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, - stateNode.Removed) + false) if err != nil { return err } @@ -179,7 +189,7 @@ func (p *publisher) PublishStorageLeafNode(storageNode *models.StorageNodeModel, storageNode.CID, true, storageNode.Value, - storageNode.Removed) + false) if err != nil { return err }