Index block number using string in file mode
This commit is contained in:
parent
44d48b8644
commit
fc9e82f717
@ -18,6 +18,7 @@ package publisher
|
||||
import (
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync/atomic"
|
||||
@ -159,7 +160,7 @@ func (p *publisher) BeginTx() (snapt.Tx, error) {
|
||||
|
||||
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
|
||||
// returns the CID and blockstore prefixed multihash key
|
||||
func (tx fileWriters) publishRaw(codec uint64, raw []byte, height uint64) (cid, prefixedKey string, err error) {
|
||||
func (tx fileWriters) publishRaw(codec uint64, raw []byte, height *big.Int) (cid, prefixedKey string, err error) {
|
||||
c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256)
|
||||
if err != nil {
|
||||
return
|
||||
@ -169,10 +170,10 @@ func (tx fileWriters) publishRaw(codec uint64, raw []byte, height uint64) (cid,
|
||||
return
|
||||
}
|
||||
|
||||
func (tx fileWriters) publishIPLD(c cid.Cid, raw []byte, height uint64) (string, error) {
|
||||
func (tx fileWriters) publishIPLD(c cid.Cid, raw []byte, height *big.Int) (string, error) {
|
||||
dbKey := dshelp.MultihashToDsKey(c.Hash())
|
||||
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
|
||||
return prefixedKey, tx.write(&snapt.TableIPLDBlock, height, prefixedKey, raw)
|
||||
return prefixedKey, tx.write(&snapt.TableIPLDBlock, height.String(), prefixedKey, raw)
|
||||
}
|
||||
|
||||
// PublishHeader writes the header to the ipfs backing pg datastore and adds secondary
|
||||
@ -182,7 +183,7 @@ func (p *publisher) PublishHeader(header *types.Header) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = p.writers.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number.Uint64()); err != nil {
|
||||
if _, err = p.writers.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -204,7 +205,7 @@ func (p *publisher) PublishHeader(header *types.Header) error {
|
||||
|
||||
// PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes
|
||||
// in the state_cids table
|
||||
func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height uint64, snapTx snapt.Tx) error {
|
||||
func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height *big.Int, snapTx snapt.Tx) error {
|
||||
var stateKey string
|
||||
if !snapt.IsNullHash(node.Key) {
|
||||
stateKey = node.Key.Hex()
|
||||
@ -216,7 +217,7 @@ func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height u
|
||||
return err
|
||||
}
|
||||
|
||||
err = tx.write(&snapt.TableStateNode, height, headerID, stateKey, stateCIDStr, node.Path,
|
||||
err = tx.write(&snapt.TableStateNode, height.String(), headerID, stateKey, stateCIDStr, node.Path,
|
||||
node.NodeType, false, mhKey)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -230,7 +231,7 @@ func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height u
|
||||
|
||||
// PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary
|
||||
// indexes in the storage_cids table
|
||||
func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height uint64, statePath []byte, snapTx snapt.Tx) error {
|
||||
func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height *big.Int, statePath []byte, snapTx snapt.Tx) error {
|
||||
var storageKey string
|
||||
if !snapt.IsNullHash(node.Key) {
|
||||
storageKey = node.Key.Hex()
|
||||
@ -242,7 +243,7 @@ func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height
|
||||
return err
|
||||
}
|
||||
|
||||
err = tx.write(&snapt.TableStorageNode, height, headerID, statePath, storageKey, storageCIDStr, node.Path,
|
||||
err = tx.write(&snapt.TableStorageNode, height.String(), headerID, statePath, storageKey, storageCIDStr, node.Path,
|
||||
node.NodeType, false, mhKey)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -255,7 +256,7 @@ func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height
|
||||
}
|
||||
|
||||
// PublishCode writes code to the ipfs backing pg datastore
|
||||
func (p *publisher) PublishCode(height uint64, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
|
||||
func (p *publisher) PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
|
||||
// no codec for code, doesn't matter though since blockstore key is multihash-derived
|
||||
mhKey, err := shared.MultihashKeyFromKeccak256(codeHash)
|
||||
if err != nil {
|
||||
@ -263,7 +264,7 @@ func (p *publisher) PublishCode(height uint64, codeHash common.Hash, codeBytes [
|
||||
}
|
||||
|
||||
tx := snapTx.(fileTx)
|
||||
if err = tx.write(&snapt.TableIPLDBlock, height, mhKey, codeBytes); err != nil {
|
||||
if err = tx.write(&snapt.TableIPLDBlock, height.String(), mhKey, codeBytes); err != nil {
|
||||
return fmt.Errorf("error publishing code IPLD: %v", err)
|
||||
}
|
||||
// increment code node counter.
|
||||
|
@ -38,7 +38,7 @@ func writeFiles(t *testing.T, dir string) *publisher {
|
||||
test.NoError(t, err)
|
||||
|
||||
headerID := fixt.Block1_Header.Hash().String()
|
||||
test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number.Uint64(), tx))
|
||||
test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number, tx))
|
||||
|
||||
test.NoError(t, tx.Commit())
|
||||
return pub
|
||||
|
@ -18,6 +18,7 @@ package pg
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -87,7 +88,7 @@ func (p *publisher) BeginTx() (snapt.Tx, error) {
|
||||
|
||||
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
|
||||
// returns the CID and blockstore prefixed multihash key
|
||||
func (tx pubTx) publishRaw(codec uint64, raw []byte, height uint64) (cid, prefixedKey string, err error) {
|
||||
func (tx pubTx) publishRaw(codec uint64, raw []byte, height *big.Int) (cid, prefixedKey string, err error) {
|
||||
c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256)
|
||||
if err != nil {
|
||||
return
|
||||
@ -97,10 +98,10 @@ func (tx pubTx) publishRaw(codec uint64, raw []byte, height uint64) (cid, prefix
|
||||
return
|
||||
}
|
||||
|
||||
func (tx pubTx) publishIPLD(c cid.Cid, raw []byte, height uint64) (string, error) {
|
||||
func (tx pubTx) publishIPLD(c cid.Cid, raw []byte, height *big.Int) (string, error) {
|
||||
dbKey := dshelp.MultihashToDsKey(c.Hash())
|
||||
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
|
||||
_, err := tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), height, prefixedKey, raw)
|
||||
_, err := tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), height.Uint64(), prefixedKey, raw)
|
||||
return prefixedKey, err
|
||||
}
|
||||
|
||||
@ -118,7 +119,7 @@ func (p *publisher) PublishHeader(header *types.Header) (err error) {
|
||||
tx := pubTx{snapTx, nil}
|
||||
defer func() { err = snapt.CommitOrRollback(tx, err) }()
|
||||
|
||||
if _, err = tx.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number.Uint64()); err != nil {
|
||||
if _, err = tx.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -131,7 +132,7 @@ func (p *publisher) PublishHeader(header *types.Header) (err error) {
|
||||
}
|
||||
|
||||
// PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes in the state_cids table
|
||||
func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height uint64, snapTx snapt.Tx) error {
|
||||
func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height *big.Int, snapTx snapt.Tx) error {
|
||||
var stateKey string
|
||||
if !snapt.IsNullHash(node.Key) {
|
||||
stateKey = node.Key.Hex()
|
||||
@ -144,7 +145,7 @@ func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height u
|
||||
}
|
||||
|
||||
_, err = tx.Exec(snapt.TableStateNode.ToInsertStatement(),
|
||||
height, headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey)
|
||||
height.Uint64(), headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -157,7 +158,7 @@ func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height u
|
||||
}
|
||||
|
||||
// PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary indexes in the storage_cids table
|
||||
func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height uint64, statePath []byte, snapTx snapt.Tx) error {
|
||||
func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height *big.Int, statePath []byte, snapTx snapt.Tx) error {
|
||||
var storageKey string
|
||||
if !snapt.IsNullHash(node.Key) {
|
||||
storageKey = node.Key.Hex()
|
||||
@ -170,7 +171,7 @@ func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height
|
||||
}
|
||||
|
||||
_, err = tx.Exec(snapt.TableStorageNode.ToInsertStatement(),
|
||||
height, headerID, statePath, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey)
|
||||
height.Uint64(), headerID, statePath, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -183,7 +184,7 @@ func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height
|
||||
}
|
||||
|
||||
// PublishCode writes code to the ipfs backing pg datastore
|
||||
func (p *publisher) PublishCode(height uint64, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
|
||||
func (p *publisher) PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
|
||||
// no codec for code, doesn't matter though since blockstore key is multihash-derived
|
||||
mhKey, err := shared.MultihashKeyFromKeccak256(codeHash)
|
||||
if err != nil {
|
||||
@ -191,7 +192,7 @@ func (p *publisher) PublishCode(height uint64, codeHash common.Hash, codeBytes [
|
||||
}
|
||||
|
||||
tx := snapTx.(pubTx)
|
||||
if _, err = tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), height, mhKey, codeBytes); err != nil {
|
||||
if _, err = tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), height.Uint64(), mhKey, codeBytes); err != nil {
|
||||
return fmt.Errorf("error publishing code IPLD: %v", err)
|
||||
}
|
||||
|
||||
|
@ -36,7 +36,7 @@ func writeData(t *testing.T) *publisher {
|
||||
test.NoError(t, err)
|
||||
|
||||
headerID := fixt.Block1_Header.Hash().String()
|
||||
test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number.Uint64(), tx))
|
||||
test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number, tx))
|
||||
|
||||
test.NoError(t, tx.Commit())
|
||||
return pub
|
||||
@ -73,6 +73,7 @@ func TestBasic(t *testing.T) {
|
||||
test.NoError(t, err)
|
||||
|
||||
headerNode, err := ipld.NewEthHeader(&fixt.Block1_Header)
|
||||
test.NoError(t, err)
|
||||
test.ExpectEqual(t, headerNode.Cid().String(), header.CID)
|
||||
test.ExpectEqual(t, fixt.Block1_Header.Hash().String(), header.BlockHash)
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
@ -140,9 +141,9 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
|
||||
}()
|
||||
|
||||
if len(iters) > 0 {
|
||||
return s.createSnapshotAsync(iters, headerID, params.Height)
|
||||
return s.createSnapshotAsync(iters, headerID, new(big.Int).SetUint64(params.Height))
|
||||
} else {
|
||||
return s.createSnapshot(iters[0], headerID, params.Height)
|
||||
return s.createSnapshot(iters[0], headerID, new(big.Int).SetUint64(params.Height))
|
||||
}
|
||||
}
|
||||
|
||||
@ -195,7 +196,7 @@ func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, erro
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height uint64) error {
|
||||
func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height *big.Int) error {
|
||||
tx, err := s.ipfsPublisher.BeginTx()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -265,7 +266,7 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height u
|
||||
}
|
||||
|
||||
// Full-trie concurrent snapshot
|
||||
func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string, height uint64) error {
|
||||
func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string, height *big.Int) error {
|
||||
errors := make(chan error)
|
||||
var wg sync.WaitGroup
|
||||
for _, it := range iters {
|
||||
@ -293,7 +294,7 @@ func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Service) storageSnapshot(sr common.Hash, headerID string, height uint64, statePath []byte, tx Tx) (Tx, error) {
|
||||
func (s *Service) storageSnapshot(sr common.Hash, headerID string, height *big.Int, statePath []byte, tx Tx) (Tx, error) {
|
||||
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
|
||||
return tx, nil
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package snapshot
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"math/big"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
@ -78,7 +79,7 @@ func TestCreateSnapshot(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func failingPublishStateNode(_ *snapt.Node, _ string, _ uint64, _ snapt.Tx) error {
|
||||
func failingPublishStateNode(_ *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error {
|
||||
return errors.New("failingPublishStateNode")
|
||||
}
|
||||
|
||||
|
@ -1,15 +1,17 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
)
|
||||
|
||||
type Publisher interface {
|
||||
PublishHeader(header *types.Header) error
|
||||
PublishStateNode(node *Node, headerID string, height uint64, tx Tx) error
|
||||
PublishStorageNode(node *Node, headerID string, height uint64, statePath []byte, tx Tx) error
|
||||
PublishCode(height uint64, codeHash common.Hash, codeBytes []byte, tx Tx) error
|
||||
PublishStateNode(node *Node, headerID string, height *big.Int, tx Tx) error
|
||||
PublishStorageNode(node *Node, headerID string, height *big.Int, statePath []byte, tx Tx) error
|
||||
PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, tx Tx) error
|
||||
BeginTx() (Tx, error)
|
||||
PrepareTxForBatch(tx Tx, batchSize uint) (Tx, error)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user