diff --git a/pkg/snapshot/file/publisher.go b/pkg/snapshot/file/publisher.go index f20dfb8..4af6fd7 100644 --- a/pkg/snapshot/file/publisher.go +++ b/pkg/snapshot/file/publisher.go @@ -159,20 +159,20 @@ func (p *publisher) BeginTx() (snapt.Tx, error) { // PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx // returns the CID and blockstore prefixed multihash key -func (tx fileWriters) publishRaw(codec uint64, raw []byte) (cid, prefixedKey string, err error) { +func (tx fileWriters) publishRaw(codec uint64, raw []byte, height uint64) (cid, prefixedKey string, err error) { c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256) if err != nil { return } cid = c.String() - prefixedKey, err = tx.publishIPLD(c, raw) + prefixedKey, err = tx.publishIPLD(c, raw, height) return } -func (tx fileWriters) publishIPLD(c cid.Cid, raw []byte) (string, error) { +func (tx fileWriters) publishIPLD(c cid.Cid, raw []byte, height uint64) (string, error) { dbKey := dshelp.MultihashToDsKey(c.Hash()) prefixedKey := blockstore.BlockPrefix.String() + dbKey.String() - return prefixedKey, tx.write(&snapt.TableIPLDBlock, prefixedKey, raw) + return prefixedKey, tx.write(&snapt.TableIPLDBlock, height, prefixedKey, raw) } // PublishHeader writes the header to the ipfs backing pg datastore and adds secondary @@ -182,7 +182,7 @@ func (p *publisher) PublishHeader(header *types.Header) error { if err != nil { return err } - if _, err = p.writers.publishIPLD(headerNode.Cid(), headerNode.RawData()); err != nil { + if _, err = p.writers.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number.Uint64()); err != nil { return err } @@ -204,19 +204,19 @@ func (p *publisher) PublishHeader(header *types.Header) error { // PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes // in the state_cids table -func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, snapTx snapt.Tx) error { +func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height uint64, snapTx snapt.Tx) error { var stateKey string if !snapt.IsNullHash(node.Key) { stateKey = node.Key.Hex() } tx := snapTx.(fileTx) - stateCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStateTrie, node.Value) + stateCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStateTrie, node.Value, height) if err != nil { return err } - err = tx.write(&snapt.TableStateNode, headerID, stateKey, stateCIDStr, node.Path, + err = tx.write(&snapt.TableStateNode, height, headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey) if err != nil { return err @@ -230,19 +230,19 @@ func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, snapTx s // PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary // indexes in the storage_cids table -func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, statePath []byte, snapTx snapt.Tx) error { +func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height uint64, statePath []byte, snapTx snapt.Tx) error { var storageKey string if !snapt.IsNullHash(node.Key) { storageKey = node.Key.Hex() } tx := snapTx.(fileTx) - storageCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStorageTrie, node.Value) + storageCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStorageTrie, node.Value, height) if err != nil { return err } - err = tx.write(&snapt.TableStorageNode, headerID, statePath, storageKey, storageCIDStr, node.Path, + err = tx.write(&snapt.TableStorageNode, height, headerID, statePath, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey) if err != nil { return err @@ -255,7 +255,7 @@ func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, stateP } // PublishCode writes code to the ipfs backing pg datastore -func (p *publisher) PublishCode(codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error { +func (p *publisher) PublishCode(height uint64, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error { // no codec for code, doesn't matter though since blockstore key is multihash-derived mhKey, err := shared.MultihashKeyFromKeccak256(codeHash) if err != nil { @@ -263,7 +263,7 @@ func (p *publisher) PublishCode(codeHash common.Hash, codeBytes []byte, snapTx s } tx := snapTx.(fileTx) - if err = tx.write(&snapt.TableIPLDBlock, mhKey, codeBytes); err != nil { + if err = tx.write(&snapt.TableIPLDBlock, height, mhKey, codeBytes); err != nil { return fmt.Errorf("error publishing code IPLD: %v", err) } // increment code node counter. diff --git a/pkg/snapshot/file/publisher_test.go b/pkg/snapshot/file/publisher_test.go index f2a591c..81b7950 100644 --- a/pkg/snapshot/file/publisher_test.go +++ b/pkg/snapshot/file/publisher_test.go @@ -38,7 +38,7 @@ func writeFiles(t *testing.T, dir string) *publisher { test.NoError(t, err) headerID := fixt.Block1_Header.Hash().String() - test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, tx)) + test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number.Uint64(), tx)) test.NoError(t, tx.Commit()) return pub @@ -127,6 +127,7 @@ func TestPgCopy(t *testing.T) { test.NoError(t, err) headerNode, err := ipld.NewEthHeader(&fixt.Block1_Header) + test.NoError(t, err) test.ExpectEqual(t, headerNode.Cid().String(), header.CID) test.ExpectEqual(t, fixt.Block1_Header.Hash().String(), header.BlockHash) } diff --git a/pkg/snapshot/pg/publisher.go b/pkg/snapshot/pg/publisher.go index 3b7f4a2..a7934d3 100644 --- a/pkg/snapshot/pg/publisher.go +++ b/pkg/snapshot/pg/publisher.go @@ -87,20 +87,20 @@ func (p *publisher) BeginTx() (snapt.Tx, error) { // PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx // returns the CID and blockstore prefixed multihash key -func (tx pubTx) publishRaw(codec uint64, raw []byte) (cid, prefixedKey string, err error) { +func (tx pubTx) publishRaw(codec uint64, raw []byte, height uint64) (cid, prefixedKey string, err error) { c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256) if err != nil { return } cid = c.String() - prefixedKey, err = tx.publishIPLD(c, raw) + prefixedKey, err = tx.publishIPLD(c, raw, height) return } -func (tx pubTx) publishIPLD(c cid.Cid, raw []byte) (string, error) { +func (tx pubTx) publishIPLD(c cid.Cid, raw []byte, height uint64) (string, error) { dbKey := dshelp.MultihashToDsKey(c.Hash()) prefixedKey := blockstore.BlockPrefix.String() + dbKey.String() - _, err := tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), prefixedKey, raw) + _, err := tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), height, prefixedKey, raw) return prefixedKey, err } @@ -118,7 +118,7 @@ func (p *publisher) PublishHeader(header *types.Header) (err error) { tx := pubTx{snapTx, nil} defer func() { err = snapt.CommitOrRollback(tx, err) }() - if _, err = tx.publishIPLD(headerNode.Cid(), headerNode.RawData()); err != nil { + if _, err = tx.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number.Uint64()); err != nil { return err } @@ -131,20 +131,20 @@ func (p *publisher) PublishHeader(header *types.Header) (err error) { } // PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes in the state_cids table -func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, snapTx snapt.Tx) error { +func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height uint64, snapTx snapt.Tx) error { var stateKey string if !snapt.IsNullHash(node.Key) { stateKey = node.Key.Hex() } tx := snapTx.(pubTx) - stateCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStateTrie, node.Value) + stateCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStateTrie, node.Value, height) if err != nil { return err } _, err = tx.Exec(snapt.TableStateNode.ToInsertStatement(), - headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey) + height, headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey) if err != nil { return err } @@ -157,20 +157,20 @@ func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, snapTx s } // PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary indexes in the storage_cids table -func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, statePath []byte, snapTx snapt.Tx) error { +func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height uint64, statePath []byte, snapTx snapt.Tx) error { var storageKey string if !snapt.IsNullHash(node.Key) { storageKey = node.Key.Hex() } tx := snapTx.(pubTx) - storageCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStorageTrie, node.Value) + storageCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStorageTrie, node.Value, height) if err != nil { return err } _, err = tx.Exec(snapt.TableStorageNode.ToInsertStatement(), - headerID, statePath, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey) + height, headerID, statePath, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey) if err != nil { return err } @@ -183,7 +183,7 @@ func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, stateP } // PublishCode writes code to the ipfs backing pg datastore -func (p *publisher) PublishCode(codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error { +func (p *publisher) PublishCode(height uint64, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error { // no codec for code, doesn't matter though since blockstore key is multihash-derived mhKey, err := shared.MultihashKeyFromKeccak256(codeHash) if err != nil { @@ -191,7 +191,7 @@ func (p *publisher) PublishCode(codeHash common.Hash, codeBytes []byte, snapTx s } tx := snapTx.(pubTx) - if _, err = tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), mhKey, codeBytes); err != nil { + if _, err = tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), height, mhKey, codeBytes); err != nil { return fmt.Errorf("error publishing code IPLD: %v", err) } diff --git a/pkg/snapshot/pg/publisher_test.go b/pkg/snapshot/pg/publisher_test.go index e4ba321..1f61579 100644 --- a/pkg/snapshot/pg/publisher_test.go +++ b/pkg/snapshot/pg/publisher_test.go @@ -36,7 +36,7 @@ func writeData(t *testing.T) *publisher { test.NoError(t, err) headerID := fixt.Block1_Header.Hash().String() - test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, tx)) + test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number.Uint64(), tx)) test.NoError(t, tx.Commit()) return pub diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go index bdd7bdc..616e6fc 100644 --- a/pkg/snapshot/service.go +++ b/pkg/snapshot/service.go @@ -140,11 +140,10 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error { }() if len(iters) > 0 { - return s.createSnapshotAsync(iters, headerID) + return s.createSnapshotAsync(iters, headerID, params.Height) } else { - return s.createSnapshot(iters[0], headerID) + return s.createSnapshot(iters[0], headerID, params.Height) } - return nil } // Create snapshot up to head (ignores height param) @@ -196,7 +195,7 @@ func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, erro }, nil } -func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error { +func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height uint64) error { tx, err := s.ipfsPublisher.BeginTx() if err != nil { return err @@ -231,7 +230,7 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error { encodedPath := trie.HexToCompact(valueNodePath) leafKey := encodedPath[1:] res.node.Key = common.BytesToHash(leafKey) - err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx) + err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx) if err != nil { return err } @@ -245,17 +244,17 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error { return errors.New("missing code") } - if err = s.ipfsPublisher.PublishCode(codeHash, codeBytes, tx); err != nil { + if err = s.ipfsPublisher.PublishCode(height, codeHash, codeBytes, tx); err != nil { return err } } - if tx, err = s.storageSnapshot(account.Root, headerID, res.node.Path, tx); err != nil { + if tx, err = s.storageSnapshot(account.Root, headerID, height, res.node.Path, tx); err != nil { return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err) } case Extension, Branch: res.node.Key = common.BytesToHash([]byte{}) - if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx); err != nil { + if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil { return err } default: @@ -266,14 +265,14 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error { } // Full-trie concurrent snapshot -func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string) error { +func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string, height uint64) error { errors := make(chan error) var wg sync.WaitGroup for _, it := range iters { wg.Add(1) go func(it trie.NodeIterator) { defer wg.Done() - if err := s.createSnapshot(it, headerID); err != nil { + if err := s.createSnapshot(it, headerID, height); err != nil { errors <- err } }(it) @@ -294,7 +293,7 @@ func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string return err } -func (s *Service) storageSnapshot(sr common.Hash, headerID string, statePath []byte, tx Tx) (Tx, error) { +func (s *Service) storageSnapshot(sr common.Hash, headerID string, height uint64, statePath []byte, tx Tx) (Tx, error) { if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) { return tx, nil } @@ -338,7 +337,7 @@ func (s *Service) storageSnapshot(sr common.Hash, headerID string, statePath []b default: return nil, errors.New("unexpected node type") } - if err = s.ipfsPublisher.PublishStorageNode(&res.node, headerID, statePath, tx); err != nil { + if err = s.ipfsPublisher.PublishStorageNode(&res.node, headerID, height, statePath, tx); err != nil { return nil, err } } diff --git a/pkg/snapshot/service_test.go b/pkg/snapshot/service_test.go index f1263fe..b7f9765 100644 --- a/pkg/snapshot/service_test.go +++ b/pkg/snapshot/service_test.go @@ -43,7 +43,7 @@ func TestCreateSnapshot(t *testing.T) { Times(workers) pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil). AnyTimes() - pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()). + pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Times(len(fixt.Block1_StateNodePaths)) // TODO: fixtures for storage node @@ -78,7 +78,7 @@ func TestCreateSnapshot(t *testing.T) { } } -func failingPublishStateNode(_ *snapt.Node, _ string, _ snapt.Tx) error { +func failingPublishStateNode(_ *snapt.Node, _ string, _ uint64, _ snapt.Tx) error { return errors.New("failingPublishStateNode") } @@ -88,7 +88,7 @@ func TestRecovery(t *testing.T) { pub.EXPECT().PublishHeader(gomock.Any()).AnyTimes() pub.EXPECT().BeginTx().Return(tx, nil).AnyTimes() pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() - pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()). + pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Times(workers). DoAndReturn(failingPublishStateNode) tx.EXPECT().Commit().AnyTimes() @@ -116,7 +116,7 @@ func TestRecovery(t *testing.T) { t.Fatal("cannot stat recovery file:", err) } - pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() err = service.CreateSnapshot(params) if err != nil { t.Fatal(err) diff --git a/pkg/types/publisher.go b/pkg/types/publisher.go index ccb9276..8f5709a 100644 --- a/pkg/types/publisher.go +++ b/pkg/types/publisher.go @@ -7,9 +7,9 @@ import ( type Publisher interface { PublishHeader(header *types.Header) error - PublishStateNode(node *Node, headerID string, tx Tx) error - PublishStorageNode(node *Node, headerID string, statePath []byte, tx Tx) error - PublishCode(codeHash common.Hash, codeBytes []byte, tx Tx) error + PublishStateNode(node *Node, headerID string, height uint64, tx Tx) error + PublishStorageNode(node *Node, headerID string, height uint64, statePath []byte, tx Tx) error + PublishCode(height uint64, codeHash common.Hash, codeBytes []byte, tx Tx) error BeginTx() (Tx, error) PrepareTxForBatch(tx Tx, batchSize uint) (Tx, error) } diff --git a/pkg/types/schema.go b/pkg/types/schema.go index a21dee8..77b81b3 100644 --- a/pkg/types/schema.go +++ b/pkg/types/schema.go @@ -3,10 +3,11 @@ package types var TableIPLDBlock = Table{ `public.blocks`, []column{ + {"block_number", bigint}, {"key", text}, {"data", bytea}, }, - `ON CONFLICT (key) DO NOTHING`, + `ON CONFLICT (key, block_number) DO NOTHING`, } var TableNodeInfo = Table{ @@ -40,12 +41,13 @@ var TableHeader = Table{ {"times_validated", integer}, {"coinbase", varchar}, }, - "ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = (EXCLUDED.parent_hash, EXCLUDED.cid, EXCLUDED.td, EXCLUDED.node_id, EXCLUDED.reward, EXCLUDED.state_root, EXCLUDED.tx_root, EXCLUDED.receipt_root, EXCLUDED.uncle_root, EXCLUDED.bloom, EXCLUDED.timestamp, EXCLUDED.mh_key, eth.header_cids.times_validated + 1, EXCLUDED.coinbase)", + "ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = (EXCLUDED.parent_hash, EXCLUDED.cid, EXCLUDED.td, EXCLUDED.node_id, EXCLUDED.reward, EXCLUDED.state_root, EXCLUDED.tx_root, EXCLUDED.receipt_root, EXCLUDED.uncle_root, EXCLUDED.bloom, EXCLUDED.timestamp, EXCLUDED.mh_key, eth.header_cids.times_validated + 1, EXCLUDED.coinbase)", } var TableStateNode = Table{ "eth.state_cids", []column{ + {"block_number", bigint}, {"header_id", varchar}, {"state_leaf_key", varchar}, {"cid", text}, @@ -54,12 +56,13 @@ var TableStateNode = Table{ {"diff", boolean}, {"mh_key", text}, }, - `ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = (EXCLUDED.state_leaf_key, EXCLUDED.cid, EXCLUDED.node_type, EXCLUDED.diff, EXCLUDED.mh_key)`, + `ON CONFLICT (header_id, state_path, block_number) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = (EXCLUDED.state_leaf_key, EXCLUDED.cid, EXCLUDED.node_type, EXCLUDED.diff, EXCLUDED.mh_key)`, } var TableStorageNode = Table{ "eth.storage_cids", []column{ + {"block_number", bigint}, {"header_id", varchar}, {"state_path", bytea}, {"storage_leaf_key", varchar}, @@ -69,5 +72,5 @@ var TableStorageNode = Table{ {"diff", boolean}, {"mh_key", text}, }, - "ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (EXCLUDED.storage_leaf_key, EXCLUDED.cid, EXCLUDED.node_type, EXCLUDED.diff, EXCLUDED.mh_key)", + "ON CONFLICT (header_id, state_path, storage_path, block_number) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (EXCLUDED.storage_leaf_key, EXCLUDED.cid, EXCLUDED.node_type, EXCLUDED.diff, EXCLUDED.mh_key)", }