diff --git a/pkg/ipfs/ipld/trie_node.go b/pkg/ipfs/ipld/trie_node.go index 0a35ad77..788f76db 100644 --- a/pkg/ipfs/ipld/trie_node.go +++ b/pkg/ipfs/ipld/trie_node.go @@ -138,12 +138,12 @@ func parseTrieNodeExtension(i []interface{}, codec uint64) ([]interface{}, error func parseTrieNodeBranch(i []interface{}, codec uint64) ([]interface{}, error) { var out []interface{} - for _, vi := range i { + for i, vi := range i { v, ok := vi.([]byte) // Sometimes this throws "panic: interface conversion: interface {} is []interface {}, not []uint8" // Figure out why, and if it is okay to continue if !ok { - continue + return nil, fmt.Errorf("unable to decode branch node entry into []byte at position: %d value: %+v", i, vi) } switch len(v) { diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 712a0425..6355065e 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -132,7 +132,7 @@ func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) { log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err) continue } - if startingBlock != 0 && bfs.Chain == shared.Bitcoin || startingBlock != 1 && bfs.Chain == shared.Ethereum { + if startingBlock != 0 { log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String()) if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil { log.Error(err) @@ -199,6 +199,7 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error { cidPayload, err := bfs.Publisher.Publish(ipldPayload) if err != nil { log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error()) + continue } if err := bfs.Indexer.Index(cidPayload); err != nil { log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error()) diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index 6ecd773a..1df2db68 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -45,7 +45,7 @@ var _ = Describe("BackFiller", func() { ReturnErr: nil, } mockRetriever := &mocks2.CIDRetriever{ - FirstBlockNumberToReturn: 1, + FirstBlockNumberToReturn: 0, GapsToRetrieve: []shared.Gap{ { Start: 100, Stop: 101, @@ -102,7 +102,7 @@ var _ = Describe("BackFiller", func() { ReturnErr: nil, } mockRetriever := &mocks2.CIDRetriever{ - FirstBlockNumberToReturn: 1, + FirstBlockNumberToReturn: 0, GapsToRetrieve: []shared.Gap{ { Start: 100, Stop: 100, diff --git a/pkg/super_node/eth/publishAndIndexer.go b/pkg/super_node/eth/publishAndIndexer.go index f12109f8..3a1232cd 100644 --- a/pkg/super_node/eth/publishAndIndexer.go +++ b/pkg/super_node/eth/publishAndIndexer.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" "github.com/jmoiron/sqlx" + "github.com/multiformats/go-multihash" common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" @@ -49,7 +50,7 @@ func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer { func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { ipldPayload, ok := payload.(ConvertedPayload) if !ok { - return nil, fmt.Errorf("eth publisher expected payload type %T got %T", ConvertedPayload{}, payload) + return nil, fmt.Errorf("eth IPLDPublisherAndIndexer expected payload type %T got %T", ConvertedPayload{}, payload) } // Generate the iplds headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(ipldPayload.Block, ipldPayload.Receipts) @@ -158,31 +159,28 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error { // Publish and index state and storage for _, stateNode := range ipldPayload.StateNodes { - stateIPLD, err := ipld.FromStateTrieRLP(stateNode.Value) + stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.Value) if err != nil { return err } - if err := shared.PublishIPLD(tx, stateIPLD); err != nil { - return err - } stateModel := StateNodeModel{ Path: stateNode.Path, StateKey: stateNode.LeafKey.String(), - CID: stateIPLD.Cid().String(), + CID: stateCIDStr, NodeType: ResolveFromNodeType(stateNode.Type), } stateID, err := pub.indexer.indexStateCID(tx, stateModel, headerID) if err != nil { return err } - // If we have a leaf, decode and index the account data and publish and index any associated storage diffs + // If we have a leaf, decode and index the account data and any associated storage diffs if stateNode.Type == statediff.Leaf { var i []interface{} if err := rlp.DecodeBytes(stateNode.Value, &i); err != nil { return err } if len(i) != 2 { - return fmt.Errorf("IPLDPublisherAndIndexer expected state leaf node rlp to decode into two elements") + return fmt.Errorf("eth IPLDPublisherAndIndexer expected state leaf node rlp to decode into two elements") } var account state.Account if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil { @@ -199,17 +197,14 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, } statePathHash := crypto.Keccak256Hash(stateNode.Path) for _, storageNode := range ipldPayload.StorageNodes[statePathHash] { - storageIPLD, err := ipld.FromStorageTrieRLP(storageNode.Value) + storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.Value) if err != nil { return err } - if err := shared.PublishIPLD(tx, storageIPLD); err != nil { - return err - } storageModel := StorageNodeModel{ Path: storageNode.Path, StorageKey: storageNode.LeafKey.Hex(), - CID: storageIPLD.Cid().String(), + CID: storageCIDStr, NodeType: ResolveFromNodeType(storageNode.Type), } if err := pub.indexer.indexStorageCID(tx, storageModel, stateID); err != nil { diff --git a/pkg/super_node/eth/publisher.go b/pkg/super_node/eth/publisher.go index 7b8a0cd4..08b967ea 100644 --- a/pkg/super_node/eth/publisher.go +++ b/pkg/super_node/eth/publisher.go @@ -19,12 +19,11 @@ package eth import ( "fmt" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/statediff" common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common" diff --git a/pkg/super_node/shared/functions.go b/pkg/super_node/shared/functions.go index ca85202d..d36231e0 100644 --- a/pkg/super_node/shared/functions.go +++ b/pkg/super_node/shared/functions.go @@ -19,6 +19,8 @@ package shared import ( "bytes" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" + "github.com/ipfs/go-cid" "github.com/ethereum/go-ethereum/common" @@ -113,3 +115,15 @@ func MultihashKeyFromCIDString(c string) (string, error) { dbKey := dshelp.CidToDsKey(dc) return blockstore.BlockPrefix.String() + dbKey.String(), nil } + +// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx +func PublishRaw(tx *sqlx.Tx, codec, mh uint64, raw []byte) (string, error) { + c, err := ipld.RawdataToCid(codec, raw, mh) + if err != nil { + return "", err + } + dbKey := dshelp.CidToDsKey(c) + prefixedKey := blockstore.BlockPrefix.String() + dbKey.String() + _, err = tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw) + return c.String(), err +}