forked from cerc-io/ipld-eth-server
avoid decode error on storage branch nodes when element won't decode into byte array
This commit is contained in:
parent
bb14c01529
commit
3320df264f
@ -138,12 +138,12 @@ func parseTrieNodeExtension(i []interface{}, codec uint64) ([]interface{}, error
|
|||||||
func parseTrieNodeBranch(i []interface{}, codec uint64) ([]interface{}, error) {
|
func parseTrieNodeBranch(i []interface{}, codec uint64) ([]interface{}, error) {
|
||||||
var out []interface{}
|
var out []interface{}
|
||||||
|
|
||||||
for _, vi := range i {
|
for i, vi := range i {
|
||||||
v, ok := vi.([]byte)
|
v, ok := vi.([]byte)
|
||||||
// Sometimes this throws "panic: interface conversion: interface {} is []interface {}, not []uint8"
|
// Sometimes this throws "panic: interface conversion: interface {} is []interface {}, not []uint8"
|
||||||
// Figure out why, and if it is okay to continue
|
// Figure out why, and if it is okay to continue
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
return nil, fmt.Errorf("unable to decode branch node entry into []byte at position: %d value: %+v", i, vi)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch len(v) {
|
switch len(v) {
|
||||||
|
@ -132,7 +132,7 @@ func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
|
|||||||
log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err)
|
log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if startingBlock != 0 && bfs.Chain == shared.Bitcoin || startingBlock != 1 && bfs.Chain == shared.Ethereum {
|
if startingBlock != 0 {
|
||||||
log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String())
|
log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String())
|
||||||
if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil {
|
if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
@ -199,6 +199,7 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error {
|
|||||||
cidPayload, err := bfs.Publisher.Publish(ipldPayload)
|
cidPayload, err := bfs.Publisher.Publish(ipldPayload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error())
|
log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error())
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
if err := bfs.Indexer.Index(cidPayload); err != nil {
|
if err := bfs.Indexer.Index(cidPayload); err != nil {
|
||||||
log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error())
|
log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error())
|
||||||
|
@ -45,7 +45,7 @@ var _ = Describe("BackFiller", func() {
|
|||||||
ReturnErr: nil,
|
ReturnErr: nil,
|
||||||
}
|
}
|
||||||
mockRetriever := &mocks2.CIDRetriever{
|
mockRetriever := &mocks2.CIDRetriever{
|
||||||
FirstBlockNumberToReturn: 1,
|
FirstBlockNumberToReturn: 0,
|
||||||
GapsToRetrieve: []shared.Gap{
|
GapsToRetrieve: []shared.Gap{
|
||||||
{
|
{
|
||||||
Start: 100, Stop: 101,
|
Start: 100, Stop: 101,
|
||||||
@ -102,7 +102,7 @@ var _ = Describe("BackFiller", func() {
|
|||||||
ReturnErr: nil,
|
ReturnErr: nil,
|
||||||
}
|
}
|
||||||
mockRetriever := &mocks2.CIDRetriever{
|
mockRetriever := &mocks2.CIDRetriever{
|
||||||
FirstBlockNumberToReturn: 1,
|
FirstBlockNumberToReturn: 0,
|
||||||
GapsToRetrieve: []shared.Gap{
|
GapsToRetrieve: []shared.Gap{
|
||||||
{
|
{
|
||||||
Start: 100, Stop: 100,
|
Start: 100, Stop: 100,
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/ethereum/go-ethereum/statediff"
|
"github.com/ethereum/go-ethereum/statediff"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
|
"github.com/multiformats/go-multihash"
|
||||||
|
|
||||||
common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common"
|
common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
|
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
|
||||||
@ -49,7 +50,7 @@ func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer {
|
|||||||
func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
|
func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
|
||||||
ipldPayload, ok := payload.(ConvertedPayload)
|
ipldPayload, ok := payload.(ConvertedPayload)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("eth publisher expected payload type %T got %T", ConvertedPayload{}, payload)
|
return nil, fmt.Errorf("eth IPLDPublisherAndIndexer expected payload type %T got %T", ConvertedPayload{}, payload)
|
||||||
}
|
}
|
||||||
// Generate the iplds
|
// Generate the iplds
|
||||||
headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(ipldPayload.Block, ipldPayload.Receipts)
|
headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(ipldPayload.Block, ipldPayload.Receipts)
|
||||||
@ -158,31 +159,28 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
|
|||||||
func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error {
|
func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error {
|
||||||
// Publish and index state and storage
|
// Publish and index state and storage
|
||||||
for _, stateNode := range ipldPayload.StateNodes {
|
for _, stateNode := range ipldPayload.StateNodes {
|
||||||
stateIPLD, err := ipld.FromStateTrieRLP(stateNode.Value)
|
stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := shared.PublishIPLD(tx, stateIPLD); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
stateModel := StateNodeModel{
|
stateModel := StateNodeModel{
|
||||||
Path: stateNode.Path,
|
Path: stateNode.Path,
|
||||||
StateKey: stateNode.LeafKey.String(),
|
StateKey: stateNode.LeafKey.String(),
|
||||||
CID: stateIPLD.Cid().String(),
|
CID: stateCIDStr,
|
||||||
NodeType: ResolveFromNodeType(stateNode.Type),
|
NodeType: ResolveFromNodeType(stateNode.Type),
|
||||||
}
|
}
|
||||||
stateID, err := pub.indexer.indexStateCID(tx, stateModel, headerID)
|
stateID, err := pub.indexer.indexStateCID(tx, stateModel, headerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// If we have a leaf, decode and index the account data and publish and index any associated storage diffs
|
// If we have a leaf, decode and index the account data and any associated storage diffs
|
||||||
if stateNode.Type == statediff.Leaf {
|
if stateNode.Type == statediff.Leaf {
|
||||||
var i []interface{}
|
var i []interface{}
|
||||||
if err := rlp.DecodeBytes(stateNode.Value, &i); err != nil {
|
if err := rlp.DecodeBytes(stateNode.Value, &i); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(i) != 2 {
|
if len(i) != 2 {
|
||||||
return fmt.Errorf("IPLDPublisherAndIndexer expected state leaf node rlp to decode into two elements")
|
return fmt.Errorf("eth IPLDPublisherAndIndexer expected state leaf node rlp to decode into two elements")
|
||||||
}
|
}
|
||||||
var account state.Account
|
var account state.Account
|
||||||
if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil {
|
if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil {
|
||||||
@ -199,17 +197,14 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx,
|
|||||||
}
|
}
|
||||||
statePathHash := crypto.Keccak256Hash(stateNode.Path)
|
statePathHash := crypto.Keccak256Hash(stateNode.Path)
|
||||||
for _, storageNode := range ipldPayload.StorageNodes[statePathHash] {
|
for _, storageNode := range ipldPayload.StorageNodes[statePathHash] {
|
||||||
storageIPLD, err := ipld.FromStorageTrieRLP(storageNode.Value)
|
storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := shared.PublishIPLD(tx, storageIPLD); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
storageModel := StorageNodeModel{
|
storageModel := StorageNodeModel{
|
||||||
Path: storageNode.Path,
|
Path: storageNode.Path,
|
||||||
StorageKey: storageNode.LeafKey.Hex(),
|
StorageKey: storageNode.LeafKey.Hex(),
|
||||||
CID: storageIPLD.Cid().String(),
|
CID: storageCIDStr,
|
||||||
NodeType: ResolveFromNodeType(storageNode.Type),
|
NodeType: ResolveFromNodeType(storageNode.Type),
|
||||||
}
|
}
|
||||||
if err := pub.indexer.indexStorageCID(tx, storageModel, stateID); err != nil {
|
if err := pub.indexer.indexStorageCID(tx, storageModel, stateID); err != nil {
|
||||||
|
@ -19,12 +19,11 @@ package eth
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/state"
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff"
|
"github.com/ethereum/go-ethereum/statediff"
|
||||||
|
|
||||||
common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common"
|
common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common"
|
||||||
|
@ -19,6 +19,8 @@ package shared
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
@ -113,3 +115,15 @@ func MultihashKeyFromCIDString(c string) (string, error) {
|
|||||||
dbKey := dshelp.CidToDsKey(dc)
|
dbKey := dshelp.CidToDsKey(dc)
|
||||||
return blockstore.BlockPrefix.String() + dbKey.String(), nil
|
return blockstore.BlockPrefix.String() + dbKey.String(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
|
||||||
|
func PublishRaw(tx *sqlx.Tx, codec, mh uint64, raw []byte) (string, error) {
|
||||||
|
c, err := ipld.RawdataToCid(codec, raw, mh)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
dbKey := dshelp.CidToDsKey(c)
|
||||||
|
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
|
||||||
|
_, err = tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw)
|
||||||
|
return c.String(), err
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user