Updates to use v4 schema

This commit is contained in:
Prathamesh Musale 2022-04-19 15:49:49 +05:30
parent 560199f930
commit 26ae3ab008
8 changed files with 54 additions and 51 deletions

View File

@ -159,20 +159,20 @@ func (p *publisher) BeginTx() (snapt.Tx, error) {
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
// returns the CID and blockstore prefixed multihash key
func (tx fileWriters) publishRaw(codec uint64, raw []byte) (cid, prefixedKey string, err error) {
func (tx fileWriters) publishRaw(codec uint64, raw []byte, height uint64) (cid, prefixedKey string, err error) {
c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256)
if err != nil {
return
}
cid = c.String()
prefixedKey, err = tx.publishIPLD(c, raw)
prefixedKey, err = tx.publishIPLD(c, raw, height)
return
}
func (tx fileWriters) publishIPLD(c cid.Cid, raw []byte) (string, error) {
func (tx fileWriters) publishIPLD(c cid.Cid, raw []byte, height uint64) (string, error) {
dbKey := dshelp.MultihashToDsKey(c.Hash())
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
return prefixedKey, tx.write(&snapt.TableIPLDBlock, prefixedKey, raw)
return prefixedKey, tx.write(&snapt.TableIPLDBlock, height, prefixedKey, raw)
}
// PublishHeader writes the header to the ipfs backing pg datastore and adds secondary
@ -182,7 +182,7 @@ func (p *publisher) PublishHeader(header *types.Header) error {
if err != nil {
return err
}
if _, err = p.writers.publishIPLD(headerNode.Cid(), headerNode.RawData()); err != nil {
if _, err = p.writers.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number.Uint64()); err != nil {
return err
}
@ -204,19 +204,19 @@ func (p *publisher) PublishHeader(header *types.Header) error {
// PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes
// in the state_cids table
func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, snapTx snapt.Tx) error {
func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height uint64, snapTx snapt.Tx) error {
var stateKey string
if !snapt.IsNullHash(node.Key) {
stateKey = node.Key.Hex()
}
tx := snapTx.(fileTx)
stateCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStateTrie, node.Value)
stateCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStateTrie, node.Value, height)
if err != nil {
return err
}
err = tx.write(&snapt.TableStateNode, headerID, stateKey, stateCIDStr, node.Path,
err = tx.write(&snapt.TableStateNode, height, headerID, stateKey, stateCIDStr, node.Path,
node.NodeType, false, mhKey)
if err != nil {
return err
@ -230,19 +230,19 @@ func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, snapTx s
// PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary
// indexes in the storage_cids table
func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, statePath []byte, snapTx snapt.Tx) error {
func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height uint64, statePath []byte, snapTx snapt.Tx) error {
var storageKey string
if !snapt.IsNullHash(node.Key) {
storageKey = node.Key.Hex()
}
tx := snapTx.(fileTx)
storageCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStorageTrie, node.Value)
storageCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStorageTrie, node.Value, height)
if err != nil {
return err
}
err = tx.write(&snapt.TableStorageNode, headerID, statePath, storageKey, storageCIDStr, node.Path,
err = tx.write(&snapt.TableStorageNode, height, headerID, statePath, storageKey, storageCIDStr, node.Path,
node.NodeType, false, mhKey)
if err != nil {
return err
@ -255,7 +255,7 @@ func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, stateP
}
// PublishCode writes code to the ipfs backing pg datastore
func (p *publisher) PublishCode(codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
func (p *publisher) PublishCode(height uint64, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
// no codec for code, doesn't matter though since blockstore key is multihash-derived
mhKey, err := shared.MultihashKeyFromKeccak256(codeHash)
if err != nil {
@ -263,7 +263,7 @@ func (p *publisher) PublishCode(codeHash common.Hash, codeBytes []byte, snapTx s
}
tx := snapTx.(fileTx)
if err = tx.write(&snapt.TableIPLDBlock, mhKey, codeBytes); err != nil {
if err = tx.write(&snapt.TableIPLDBlock, height, mhKey, codeBytes); err != nil {
return fmt.Errorf("error publishing code IPLD: %v", err)
}
// increment code node counter.

View File

@ -38,7 +38,7 @@ func writeFiles(t *testing.T, dir string) *publisher {
test.NoError(t, err)
headerID := fixt.Block1_Header.Hash().String()
test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, tx))
test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number.Uint64(), tx))
test.NoError(t, tx.Commit())
return pub
@ -127,6 +127,7 @@ func TestPgCopy(t *testing.T) {
test.NoError(t, err)
headerNode, err := ipld.NewEthHeader(&fixt.Block1_Header)
test.NoError(t, err)
test.ExpectEqual(t, headerNode.Cid().String(), header.CID)
test.ExpectEqual(t, fixt.Block1_Header.Hash().String(), header.BlockHash)
}

View File

@ -87,20 +87,20 @@ func (p *publisher) BeginTx() (snapt.Tx, error) {
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
// returns the CID and blockstore prefixed multihash key
func (tx pubTx) publishRaw(codec uint64, raw []byte) (cid, prefixedKey string, err error) {
func (tx pubTx) publishRaw(codec uint64, raw []byte, height uint64) (cid, prefixedKey string, err error) {
c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256)
if err != nil {
return
}
cid = c.String()
prefixedKey, err = tx.publishIPLD(c, raw)
prefixedKey, err = tx.publishIPLD(c, raw, height)
return
}
func (tx pubTx) publishIPLD(c cid.Cid, raw []byte) (string, error) {
func (tx pubTx) publishIPLD(c cid.Cid, raw []byte, height uint64) (string, error) {
dbKey := dshelp.MultihashToDsKey(c.Hash())
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
_, err := tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), prefixedKey, raw)
_, err := tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), height, prefixedKey, raw)
return prefixedKey, err
}
@ -118,7 +118,7 @@ func (p *publisher) PublishHeader(header *types.Header) (err error) {
tx := pubTx{snapTx, nil}
defer func() { err = snapt.CommitOrRollback(tx, err) }()
if _, err = tx.publishIPLD(headerNode.Cid(), headerNode.RawData()); err != nil {
if _, err = tx.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number.Uint64()); err != nil {
return err
}
@ -131,20 +131,20 @@ func (p *publisher) PublishHeader(header *types.Header) (err error) {
}
// PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes in the state_cids table
func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, snapTx snapt.Tx) error {
func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height uint64, snapTx snapt.Tx) error {
var stateKey string
if !snapt.IsNullHash(node.Key) {
stateKey = node.Key.Hex()
}
tx := snapTx.(pubTx)
stateCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStateTrie, node.Value)
stateCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStateTrie, node.Value, height)
if err != nil {
return err
}
_, err = tx.Exec(snapt.TableStateNode.ToInsertStatement(),
headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey)
height, headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey)
if err != nil {
return err
}
@ -157,20 +157,20 @@ func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, snapTx s
}
// PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary indexes in the storage_cids table
func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, statePath []byte, snapTx snapt.Tx) error {
func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height uint64, statePath []byte, snapTx snapt.Tx) error {
var storageKey string
if !snapt.IsNullHash(node.Key) {
storageKey = node.Key.Hex()
}
tx := snapTx.(pubTx)
storageCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStorageTrie, node.Value)
storageCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStorageTrie, node.Value, height)
if err != nil {
return err
}
_, err = tx.Exec(snapt.TableStorageNode.ToInsertStatement(),
headerID, statePath, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey)
height, headerID, statePath, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey)
if err != nil {
return err
}
@ -183,7 +183,7 @@ func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, stateP
}
// PublishCode writes code to the ipfs backing pg datastore
func (p *publisher) PublishCode(codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
func (p *publisher) PublishCode(height uint64, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
// no codec for code, doesn't matter though since blockstore key is multihash-derived
mhKey, err := shared.MultihashKeyFromKeccak256(codeHash)
if err != nil {
@ -191,7 +191,7 @@ func (p *publisher) PublishCode(codeHash common.Hash, codeBytes []byte, snapTx s
}
tx := snapTx.(pubTx)
if _, err = tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), mhKey, codeBytes); err != nil {
if _, err = tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), height, mhKey, codeBytes); err != nil {
return fmt.Errorf("error publishing code IPLD: %v", err)
}

View File

@ -36,7 +36,7 @@ func writeData(t *testing.T) *publisher {
test.NoError(t, err)
headerID := fixt.Block1_Header.Hash().String()
test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, tx))
test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number.Uint64(), tx))
test.NoError(t, tx.Commit())
return pub

View File

@ -140,11 +140,10 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
}()
if len(iters) > 0 {
return s.createSnapshotAsync(iters, headerID)
return s.createSnapshotAsync(iters, headerID, params.Height)
} else {
return s.createSnapshot(iters[0], headerID)
return s.createSnapshot(iters[0], headerID, params.Height)
}
return nil
}
// Create snapshot up to head (ignores height param)
@ -196,7 +195,7 @@ func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, erro
}, nil
}
func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error {
func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height uint64) error {
tx, err := s.ipfsPublisher.BeginTx()
if err != nil {
return err
@ -231,7 +230,7 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error {
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
res.node.Key = common.BytesToHash(leafKey)
err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx)
err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx)
if err != nil {
return err
}
@ -245,17 +244,17 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error {
return errors.New("missing code")
}
if err = s.ipfsPublisher.PublishCode(codeHash, codeBytes, tx); err != nil {
if err = s.ipfsPublisher.PublishCode(height, codeHash, codeBytes, tx); err != nil {
return err
}
}
if tx, err = s.storageSnapshot(account.Root, headerID, res.node.Path, tx); err != nil {
if tx, err = s.storageSnapshot(account.Root, headerID, height, res.node.Path, tx); err != nil {
return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
}
case Extension, Branch:
res.node.Key = common.BytesToHash([]byte{})
if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx); err != nil {
if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil {
return err
}
default:
@ -266,14 +265,14 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error {
}
// Full-trie concurrent snapshot
func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string) error {
func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string, height uint64) error {
errors := make(chan error)
var wg sync.WaitGroup
for _, it := range iters {
wg.Add(1)
go func(it trie.NodeIterator) {
defer wg.Done()
if err := s.createSnapshot(it, headerID); err != nil {
if err := s.createSnapshot(it, headerID, height); err != nil {
errors <- err
}
}(it)
@ -294,7 +293,7 @@ func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string
return err
}
func (s *Service) storageSnapshot(sr common.Hash, headerID string, statePath []byte, tx Tx) (Tx, error) {
func (s *Service) storageSnapshot(sr common.Hash, headerID string, height uint64, statePath []byte, tx Tx) (Tx, error) {
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return tx, nil
}
@ -338,7 +337,7 @@ func (s *Service) storageSnapshot(sr common.Hash, headerID string, statePath []b
default:
return nil, errors.New("unexpected node type")
}
if err = s.ipfsPublisher.PublishStorageNode(&res.node, headerID, statePath, tx); err != nil {
if err = s.ipfsPublisher.PublishStorageNode(&res.node, headerID, height, statePath, tx); err != nil {
return nil, err
}
}

View File

@ -43,7 +43,7 @@ func TestCreateSnapshot(t *testing.T) {
Times(workers)
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).
AnyTimes()
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Times(len(fixt.Block1_StateNodePaths))
// TODO: fixtures for storage node
@ -78,7 +78,7 @@ func TestCreateSnapshot(t *testing.T) {
}
}
func failingPublishStateNode(_ *snapt.Node, _ string, _ snapt.Tx) error {
func failingPublishStateNode(_ *snapt.Node, _ string, _ uint64, _ snapt.Tx) error {
return errors.New("failingPublishStateNode")
}
@ -88,7 +88,7 @@ func TestRecovery(t *testing.T) {
pub.EXPECT().PublishHeader(gomock.Any()).AnyTimes()
pub.EXPECT().BeginTx().Return(tx, nil).AnyTimes()
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Times(workers).
DoAndReturn(failingPublishStateNode)
tx.EXPECT().Commit().AnyTimes()
@ -116,7 +116,7 @@ func TestRecovery(t *testing.T) {
t.Fatal("cannot stat recovery file:", err)
}
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
err = service.CreateSnapshot(params)
if err != nil {
t.Fatal(err)

View File

@ -7,9 +7,9 @@ import (
type Publisher interface {
PublishHeader(header *types.Header) error
PublishStateNode(node *Node, headerID string, tx Tx) error
PublishStorageNode(node *Node, headerID string, statePath []byte, tx Tx) error
PublishCode(codeHash common.Hash, codeBytes []byte, tx Tx) error
PublishStateNode(node *Node, headerID string, height uint64, tx Tx) error
PublishStorageNode(node *Node, headerID string, height uint64, statePath []byte, tx Tx) error
PublishCode(height uint64, codeHash common.Hash, codeBytes []byte, tx Tx) error
BeginTx() (Tx, error)
PrepareTxForBatch(tx Tx, batchSize uint) (Tx, error)
}

View File

@ -3,10 +3,11 @@ package types
var TableIPLDBlock = Table{
`public.blocks`,
[]column{
{"block_number", bigint},
{"key", text},
{"data", bytea},
},
`ON CONFLICT (key) DO NOTHING`,
`ON CONFLICT (key, block_number) DO NOTHING`,
}
var TableNodeInfo = Table{
@ -40,12 +41,13 @@ var TableHeader = Table{
{"times_validated", integer},
{"coinbase", varchar},
},
"ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = (EXCLUDED.parent_hash, EXCLUDED.cid, EXCLUDED.td, EXCLUDED.node_id, EXCLUDED.reward, EXCLUDED.state_root, EXCLUDED.tx_root, EXCLUDED.receipt_root, EXCLUDED.uncle_root, EXCLUDED.bloom, EXCLUDED.timestamp, EXCLUDED.mh_key, eth.header_cids.times_validated + 1, EXCLUDED.coinbase)",
"ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = (EXCLUDED.parent_hash, EXCLUDED.cid, EXCLUDED.td, EXCLUDED.node_id, EXCLUDED.reward, EXCLUDED.state_root, EXCLUDED.tx_root, EXCLUDED.receipt_root, EXCLUDED.uncle_root, EXCLUDED.bloom, EXCLUDED.timestamp, EXCLUDED.mh_key, eth.header_cids.times_validated + 1, EXCLUDED.coinbase)",
}
var TableStateNode = Table{
"eth.state_cids",
[]column{
{"block_number", bigint},
{"header_id", varchar},
{"state_leaf_key", varchar},
{"cid", text},
@ -54,12 +56,13 @@ var TableStateNode = Table{
{"diff", boolean},
{"mh_key", text},
},
`ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = (EXCLUDED.state_leaf_key, EXCLUDED.cid, EXCLUDED.node_type, EXCLUDED.diff, EXCLUDED.mh_key)`,
`ON CONFLICT (header_id, state_path, block_number) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = (EXCLUDED.state_leaf_key, EXCLUDED.cid, EXCLUDED.node_type, EXCLUDED.diff, EXCLUDED.mh_key)`,
}
var TableStorageNode = Table{
"eth.storage_cids",
[]column{
{"block_number", bigint},
{"header_id", varchar},
{"state_path", bytea},
{"storage_leaf_key", varchar},
@ -69,5 +72,5 @@ var TableStorageNode = Table{
{"diff", boolean},
{"mh_key", text},
},
"ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (EXCLUDED.storage_leaf_key, EXCLUDED.cid, EXCLUDED.node_type, EXCLUDED.diff, EXCLUDED.mh_key)",
"ON CONFLICT (header_id, state_path, storage_path, block_number) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (EXCLUDED.storage_leaf_key, EXCLUDED.cid, EXCLUDED.node_type, EXCLUDED.diff, EXCLUDED.mh_key)",
}