Merge pull request #197 from vulcanize/storage_branch_decode_error

Storage branch decode error
This commit is contained in:
Ian Norden 2020-05-12 10:03:42 -05:00 committed by GitHub
commit 3ea98656e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 30 additions and 21 deletions

View File

@ -138,12 +138,12 @@ func parseTrieNodeExtension(i []interface{}, codec uint64) ([]interface{}, error
func parseTrieNodeBranch(i []interface{}, codec uint64) ([]interface{}, error) { func parseTrieNodeBranch(i []interface{}, codec uint64) ([]interface{}, error) {
var out []interface{} var out []interface{}
for _, vi := range i { for i, vi := range i {
v, ok := vi.([]byte) v, ok := vi.([]byte)
// Sometimes this throws "panic: interface conversion: interface {} is []interface {}, not []uint8" // Sometimes this throws "panic: interface conversion: interface {} is []interface {}, not []uint8"
// Figure out why, and if it is okay to continue // Figure out why, and if it is okay to continue
if !ok { if !ok {
continue return nil, fmt.Errorf("unable to decode branch node entry into []byte at position: %d value: %+v", i, vi)
} }
switch len(v) { switch len(v) {

View File

@ -132,7 +132,7 @@ func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err) log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err)
continue continue
} }
if startingBlock != 0 && bfs.Chain == shared.Bitcoin || startingBlock != 1 && bfs.Chain == shared.Ethereum { if startingBlock != 0 {
log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String()) log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String())
if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil { if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil {
log.Error(err) log.Error(err)
@ -199,6 +199,7 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error {
cidPayload, err := bfs.Publisher.Publish(ipldPayload) cidPayload, err := bfs.Publisher.Publish(ipldPayload)
if err != nil { if err != nil {
log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error()) log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error())
continue
} }
if err := bfs.Indexer.Index(cidPayload); err != nil { if err := bfs.Indexer.Index(cidPayload); err != nil {
log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error()) log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error())

View File

@ -45,7 +45,7 @@ var _ = Describe("BackFiller", func() {
ReturnErr: nil, ReturnErr: nil,
} }
mockRetriever := &mocks2.CIDRetriever{ mockRetriever := &mocks2.CIDRetriever{
FirstBlockNumberToReturn: 1, FirstBlockNumberToReturn: 0,
GapsToRetrieve: []shared.Gap{ GapsToRetrieve: []shared.Gap{
{ {
Start: 100, Stop: 101, Start: 100, Stop: 101,
@ -102,7 +102,7 @@ var _ = Describe("BackFiller", func() {
ReturnErr: nil, ReturnErr: nil,
} }
mockRetriever := &mocks2.CIDRetriever{ mockRetriever := &mocks2.CIDRetriever{
FirstBlockNumberToReturn: 1, FirstBlockNumberToReturn: 0,
GapsToRetrieve: []shared.Gap{ GapsToRetrieve: []shared.Gap{
{ {
Start: 100, Stop: 100, Start: 100, Stop: 100,

View File

@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common" common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
@ -49,7 +50,7 @@ func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer {
func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
ipldPayload, ok := payload.(ConvertedPayload) ipldPayload, ok := payload.(ConvertedPayload)
if !ok { if !ok {
return nil, fmt.Errorf("eth publisher expected payload type %T got %T", ConvertedPayload{}, payload) return nil, fmt.Errorf("eth IPLDPublisherAndIndexer expected payload type %T got %T", ConvertedPayload{}, payload)
} }
// Generate the iplds // Generate the iplds
headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(ipldPayload.Block, ipldPayload.Receipts) headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(ipldPayload.Block, ipldPayload.Receipts)
@ -158,31 +159,28 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error { func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error {
// Publish and index state and storage // Publish and index state and storage
for _, stateNode := range ipldPayload.StateNodes { for _, stateNode := range ipldPayload.StateNodes {
stateIPLD, err := ipld.FromStateTrieRLP(stateNode.Value) stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.Value)
if err != nil { if err != nil {
return err return err
} }
if err := shared.PublishIPLD(tx, stateIPLD); err != nil {
return err
}
stateModel := StateNodeModel{ stateModel := StateNodeModel{
Path: stateNode.Path, Path: stateNode.Path,
StateKey: stateNode.LeafKey.String(), StateKey: stateNode.LeafKey.String(),
CID: stateIPLD.Cid().String(), CID: stateCIDStr,
NodeType: ResolveFromNodeType(stateNode.Type), NodeType: ResolveFromNodeType(stateNode.Type),
} }
stateID, err := pub.indexer.indexStateCID(tx, stateModel, headerID) stateID, err := pub.indexer.indexStateCID(tx, stateModel, headerID)
if err != nil { if err != nil {
return err return err
} }
// If we have a leaf, decode and index the account data and publish and index any associated storage diffs // If we have a leaf, decode and index the account data and any associated storage diffs
if stateNode.Type == statediff.Leaf { if stateNode.Type == statediff.Leaf {
var i []interface{} var i []interface{}
if err := rlp.DecodeBytes(stateNode.Value, &i); err != nil { if err := rlp.DecodeBytes(stateNode.Value, &i); err != nil {
return err return err
} }
if len(i) != 2 { if len(i) != 2 {
return fmt.Errorf("IPLDPublisherAndIndexer expected state leaf node rlp to decode into two elements") return fmt.Errorf("eth IPLDPublisherAndIndexer expected state leaf node rlp to decode into two elements")
} }
var account state.Account var account state.Account
if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil { if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil {
@ -199,17 +197,14 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx,
} }
statePathHash := crypto.Keccak256Hash(stateNode.Path) statePathHash := crypto.Keccak256Hash(stateNode.Path)
for _, storageNode := range ipldPayload.StorageNodes[statePathHash] { for _, storageNode := range ipldPayload.StorageNodes[statePathHash] {
storageIPLD, err := ipld.FromStorageTrieRLP(storageNode.Value) storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.Value)
if err != nil { if err != nil {
return err return err
} }
if err := shared.PublishIPLD(tx, storageIPLD); err != nil {
return err
}
storageModel := StorageNodeModel{ storageModel := StorageNodeModel{
Path: storageNode.Path, Path: storageNode.Path,
StorageKey: storageNode.LeafKey.Hex(), StorageKey: storageNode.LeafKey.Hex(),
CID: storageIPLD.Cid().String(), CID: storageCIDStr,
NodeType: ResolveFromNodeType(storageNode.Type), NodeType: ResolveFromNodeType(storageNode.Type),
} }
if err := pub.indexer.indexStorageCID(tx, storageModel, stateID); err != nil { if err := pub.indexer.indexStorageCID(tx, storageModel, stateID); err != nil {

View File

@ -19,12 +19,11 @@ package eth
import ( import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common" common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common"

View File

@ -19,6 +19,8 @@ package shared
import ( import (
"bytes" "bytes"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -113,3 +115,15 @@ func MultihashKeyFromCIDString(c string) (string, error) {
dbKey := dshelp.CidToDsKey(dc) dbKey := dshelp.CidToDsKey(dc)
return blockstore.BlockPrefix.String() + dbKey.String(), nil return blockstore.BlockPrefix.String() + dbKey.String(), nil
} }
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
func PublishRaw(tx *sqlx.Tx, codec, mh uint64, raw []byte) (string, error) {
c, err := ipld.RawdataToCid(codec, raw, mh)
if err != nil {
return "", err
}
dbKey := dshelp.CidToDsKey(c)
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
_, err = tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw)
return c.String(), err
}