update csv file publisher
This commit is contained in:
parent
f1a980f37c
commit
ead007f159
@ -21,38 +21,41 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/lib/pq"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
|
||||||
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
|
||||||
"github.com/multiformats/go-multihash"
|
"github.com/multiformats/go-multihash"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||||
|
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/shared/schema"
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom"
|
"github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom"
|
||||||
snapt "github.com/cerc-io/ipld-eth-state-snapshot/pkg/types"
|
snapt "github.com/cerc-io/ipld-eth-state-snapshot/pkg/types"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
|
||||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ snapt.Publisher = (*publisher)(nil)
|
var _ snapt.Publisher = (*publisher)(nil)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// tables written once per block
|
// tables written once per block
|
||||||
perBlockTables = []*snapt.Table{
|
perBlockTables = []*schema.Table{
|
||||||
&snapt.TableIPLDBlock,
|
&schema.TableIPLDBlock,
|
||||||
&snapt.TableNodeInfo,
|
&schema.TableNodeInfo,
|
||||||
&snapt.TableHeader,
|
&schema.TableHeader,
|
||||||
}
|
}
|
||||||
// tables written during state iteration
|
// tables written during state iteration
|
||||||
perNodeTables = []*snapt.Table{
|
perNodeTables = []*schema.Table{
|
||||||
&snapt.TableIPLDBlock,
|
&schema.TableIPLDBlock,
|
||||||
&snapt.TableStateNode,
|
&schema.TableStateNode,
|
||||||
&snapt.TableStorageNode,
|
&schema.TableStorageNode,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -101,12 +104,12 @@ func newFileWriter(path string) (ret fileWriter, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx fileWriters) write(tbl *snapt.Table, args ...interface{}) error {
|
func (tx fileWriters) write(tbl *schema.Table, args ...interface{}) error {
|
||||||
row := tbl.ToCsvRow(args...)
|
row := tbl.ToCsvRow(args...)
|
||||||
return tx[tbl.Name].Write(row)
|
return tx[tbl.Name].Write(row)
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeFileWriters(dir string, tables []*snapt.Table) (fileWriters, error) {
|
func makeFileWriters(dir string, tables []*schema.Table) (fileWriters, error) {
|
||||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -160,21 +163,24 @@ func (p *publisher) BeginTx() (snapt.Tx, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
|
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
|
||||||
// returns the CID and blockstore prefixed multihash key
|
// returns the CID
|
||||||
func (tx fileWriters) publishRaw(codec uint64, raw []byte, height *big.Int) (cid, prefixedKey string, err error) {
|
func (tx fileWriters) publishRaw(codec uint64, raw []byte, height *big.Int) (cid string, err error) {
|
||||||
c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256)
|
c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cid = c.String()
|
cid = c.String()
|
||||||
prefixedKey, err = tx.publishIPLD(c, raw, height)
|
return tx.publishIPLD(c, raw, height)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx fileWriters) publishIPLD(c cid.Cid, raw []byte, height *big.Int) (string, error) {
|
func (tx fileWriters) publishIPLD(c cid.Cid, raw []byte, height *big.Int) (string, error) {
|
||||||
dbKey := dshelp.MultihashToDsKey(c.Hash())
|
return c.String(), tx.write(&schema.TableIPLDBlock, height.String(), c.String(), raw)
|
||||||
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
|
}
|
||||||
return prefixedKey, tx.write(&snapt.TableIPLDBlock, height.String(), prefixedKey, raw)
|
|
||||||
|
// PublishIPLD writes an IPLD to the ipld.blocks blockstore
|
||||||
|
func (p *publisher) PublishIPLD(c cid.Cid, raw []byte, height *big.Int, snapTx snapt.Tx) (string, error) {
|
||||||
|
tx := snapTx.(fileTx)
|
||||||
|
return tx.publishIPLD(c, raw, height)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PublishHeader writes the header to the ipfs backing pg datastore and adds secondary
|
// PublishHeader writes the header to the ipfs backing pg datastore and adds secondary
|
||||||
@ -188,38 +194,47 @@ func (p *publisher) PublishHeader(header *types.Header) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
mhKey := shared.MultihashKeyFromCID(headerNode.Cid())
|
err = p.writers.write(&schema.TableNodeInfo, p.nodeInfo.GenesisBlock, p.nodeInfo.NetworkID, p.nodeInfo.ID,
|
||||||
err = p.writers.write(&snapt.TableNodeInfo, p.nodeInfo.GenesisBlock, p.nodeInfo.NetworkID, p.nodeInfo.ID,
|
|
||||||
p.nodeInfo.ClientName, p.nodeInfo.ChainID)
|
p.nodeInfo.ClientName, p.nodeInfo.ChainID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = p.writers.write(&snapt.TableHeader, header.Number.String(), header.Hash().Hex(), header.ParentHash.Hex(),
|
err = p.writers.write(&schema.TableHeader,
|
||||||
headerNode.Cid().String(), 0, p.nodeInfo.ID, 0, header.Root.Hex(), header.TxHash.Hex(),
|
header.Number.String(),
|
||||||
header.ReceiptHash.Hex(), header.UncleHash.Hex(), header.Bloom.Bytes(), header.Time, mhKey,
|
header.Hash().Hex(),
|
||||||
0, header.Coinbase.String())
|
header.ParentHash.Hex(),
|
||||||
|
headerNode.Cid().String(),
|
||||||
|
0,
|
||||||
|
pq.StringArray([]string{p.nodeInfo.ID}),
|
||||||
|
0,
|
||||||
|
header.Root.Hex(),
|
||||||
|
header.TxHash.Hex(),
|
||||||
|
header.ReceiptHash.Hex(),
|
||||||
|
header.UncleHash.Hex(),
|
||||||
|
header.Bloom.Bytes(),
|
||||||
|
strconv.FormatUint(header.Time, 10),
|
||||||
|
header.Coinbase.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return p.writers.Commit()
|
return p.writers.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
// PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes
|
// PublishStateLeafNode writes the state node eth.state_cids
|
||||||
// in the state_cids table
|
func (p *publisher) PublishStateLeafNode(stateNode *models.StateNodeModel, snapTx snapt.Tx) error {
|
||||||
func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height *big.Int, snapTx snapt.Tx) error {
|
|
||||||
var stateKey string
|
|
||||||
if !snapt.IsNullHash(node.Key) {
|
|
||||||
stateKey = node.Key.Hex()
|
|
||||||
}
|
|
||||||
|
|
||||||
tx := snapTx.(fileTx)
|
tx := snapTx.(fileTx)
|
||||||
stateCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStateTrie, node.Value, height)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = tx.write(&snapt.TableStateNode, height.String(), headerID, stateKey, stateCIDStr, node.Path,
|
err := tx.write(&schema.TableStateNode,
|
||||||
node.NodeType, false, mhKey)
|
stateNode.BlockNumber,
|
||||||
|
stateNode.HeaderID,
|
||||||
|
stateNode.StateKey,
|
||||||
|
stateNode.CID,
|
||||||
|
true,
|
||||||
|
stateNode.Balance,
|
||||||
|
strconv.FormatUint(stateNode.Nonce, 10),
|
||||||
|
stateNode.CodeHash,
|
||||||
|
stateNode.StorageRoot,
|
||||||
|
false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -232,22 +247,19 @@ func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height *
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary
|
// PublishStorageLeafNode writes the storage node to eth.storage_cids
|
||||||
// indexes in the storage_cids table
|
func (p *publisher) PublishStorageLeafNode(storageNode *models.StorageNodeModel, snapTx snapt.Tx) error {
|
||||||
func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height *big.Int, statePath []byte, snapTx snapt.Tx) error {
|
|
||||||
var storageKey string
|
|
||||||
if !snapt.IsNullHash(node.Key) {
|
|
||||||
storageKey = node.Key.Hex()
|
|
||||||
}
|
|
||||||
|
|
||||||
tx := snapTx.(fileTx)
|
tx := snapTx.(fileTx)
|
||||||
storageCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStorageTrie, node.Value, height)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = tx.write(&snapt.TableStorageNode, height.String(), headerID, statePath, storageKey, storageCIDStr, node.Path,
|
err := tx.write(&schema.TableStorageNode,
|
||||||
node.NodeType, false, mhKey)
|
storageNode.BlockNumber,
|
||||||
|
storageNode.HeaderID,
|
||||||
|
storageNode.StateKey,
|
||||||
|
storageNode.StorageKey,
|
||||||
|
storageNode.CID,
|
||||||
|
true,
|
||||||
|
storageNode.Value,
|
||||||
|
false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -262,15 +274,11 @@ func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height
|
|||||||
|
|
||||||
// PublishCode writes code to the ipfs backing pg datastore
|
// PublishCode writes code to the ipfs backing pg datastore
|
||||||
func (p *publisher) PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
|
func (p *publisher) PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
|
||||||
// no codec for code, doesn't matter though since blockstore key is multihash-derived
|
c := ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes())
|
||||||
mhKey, err := shared.MultihashKeyFromKeccak256(codeHash)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
tx := snapTx.(fileTx)
|
tx := snapTx.(fileTx)
|
||||||
if err = tx.write(&snapt.TableIPLDBlock, height.String(), mhKey, codeBytes); err != nil {
|
if _, err := tx.publishIPLD(c, codeBytes, height); err != nil {
|
||||||
return fmt.Errorf("error publishing code IPLD: %v", err)
|
return err
|
||||||
}
|
}
|
||||||
// increment code node counter.
|
// increment code node counter.
|
||||||
atomic.AddUint64(&p.codeNodeCounter, 1)
|
atomic.AddUint64(&p.codeNodeCounter, 1)
|
||||||
|
@ -18,13 +18,12 @@ package pg
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
"strconv"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
|
||||||
"github.com/lib/pq"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
"github.com/lib/pq"
|
||||||
"github.com/multiformats/go-multihash"
|
"github.com/multiformats/go-multihash"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@ -34,6 +33,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared/schema"
|
"github.com/ethereum/go-ethereum/statediff/indexer/shared/schema"
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom"
|
"github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom"
|
||||||
@ -90,15 +90,14 @@ func (p *publisher) BeginTx() (snapt.Tx, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
|
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
|
||||||
// returns the CID and blockstore prefixed multihash key
|
// returns the CID
|
||||||
func (tx pubTx) publishRaw(codec uint64, raw []byte, height *big.Int) (cid, prefixedKey string, err error) {
|
func (tx pubTx) publishRaw(codec uint64, raw []byte, height *big.Int) (cid string, err error) {
|
||||||
c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256)
|
c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cid = c.String()
|
cid = c.String()
|
||||||
prefixedKey, err = tx.publishIPLD(c, raw, height)
|
return tx.publishIPLD(c, raw, height)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx pubTx) publishIPLD(c cid.Cid, raw []byte, height *big.Int) (string, error) {
|
func (tx pubTx) publishIPLD(c cid.Cid, raw []byte, height *big.Int) (string, error) {
|
||||||
@ -135,10 +134,21 @@ func (p *publisher) PublishHeader(header *types.Header) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(schema.TableHeader.ToInsertStatement(false), header.Number.Uint64(), header.Hash().Hex(),
|
_, err = tx.Exec(schema.TableHeader.ToInsertStatement(false),
|
||||||
header.ParentHash.Hex(), headerNode.Cid().String(), "0", pq.StringArray([]string{p.db.NodeID()}), "0",
|
header.Number.Uint64(),
|
||||||
header.Root.Hex(), header.TxHash.Hex(), header.ReceiptHash.Hex(), header.UncleHash.Hex(),
|
header.Hash().Hex(),
|
||||||
header.Bloom.Bytes(), header.Time, header.Coinbase.String())
|
header.ParentHash.Hex(),
|
||||||
|
headerNode.Cid().String(),
|
||||||
|
"0",
|
||||||
|
pq.StringArray([]string{p.db.NodeID()}),
|
||||||
|
"0",
|
||||||
|
header.Root.Hex(),
|
||||||
|
header.TxHash.Hex(),
|
||||||
|
header.ReceiptHash.Hex(),
|
||||||
|
header.UncleHash.Hex(),
|
||||||
|
header.Bloom.Bytes(),
|
||||||
|
strconv.FormatUint(header.Time, 10),
|
||||||
|
header.Coinbase.String())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,7 +165,7 @@ func (p *publisher) PublishStateLeafNode(stateNode *models.StateNodeModel, snapT
|
|||||||
stateNode.Nonce,
|
stateNode.Nonce,
|
||||||
stateNode.CodeHash,
|
stateNode.CodeHash,
|
||||||
stateNode.StorageRoot,
|
stateNode.StorageRoot,
|
||||||
stateNode.Removed)
|
false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -179,7 +189,7 @@ func (p *publisher) PublishStorageLeafNode(storageNode *models.StorageNodeModel,
|
|||||||
storageNode.CID,
|
storageNode.CID,
|
||||||
true,
|
true,
|
||||||
storageNode.Value,
|
storageNode.Value,
|
||||||
storageNode.Removed)
|
false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user