diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index 4bedc980..283d3141 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -191,7 +191,7 @@ func NewIPLDPublisher(chain shared.ChainType, ipfsPath string, db *postgres.DB, func NewPublicAPI(chain shared.ChainType, db *postgres.DB, ipfsPath string) (rpc.API, error) { switch chain { case shared.Ethereum: - backend, err := eth.NewEthBackend(db, ipfsPath) + backend, err := eth.NewEthBackend(db) if err != nil { return rpc.API{}, err } diff --git a/pkg/super_node/eth/api.go b/pkg/super_node/eth/api.go index f882bbb3..8c350959 100644 --- a/pkg/super_node/eth/api.go +++ b/pkg/super_node/eth/api.go @@ -83,10 +83,7 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) if err != nil { return nil, err } - if err := tx.Commit(); err != nil { - return nil, err - } - rctIPLDs, err := pea.b.Fetcher.FetchRcts(rctCIDs) + rctIPLDs, err := pea.b.Fetcher.FetchRcts(tx, rctCIDs) if err != nil { return nil, err } @@ -120,11 +117,11 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) } allRctCIDs = append(allRctCIDs, rctCIDs...) } - if err := tx.Commit(); err != nil { + rctIPLDs, err := pea.b.Fetcher.FetchRcts(tx, allRctCIDs) + if err != nil { return nil, err } - rctIPLDs, err := pea.b.Fetcher.FetchRcts(allRctCIDs) - if err != nil { + if err := tx.Commit(); err != nil { return nil, err } return extractLogsOfInterest(rctIPLDs, filter.Topics) diff --git a/pkg/super_node/eth/api_test.go b/pkg/super_node/eth/api_test.go index 60106aee..74e4ecb9 100644 --- a/pkg/super_node/eth/api_test.go +++ b/pkg/super_node/eth/api_test.go @@ -27,12 +27,9 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rpc" - "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - mocks3 "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" "github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks" @@ -85,44 +82,27 @@ var ( var _ = Describe("API", func() { var ( - db *postgres.DB - retriever *eth.CIDRetriever - fetcher *eth.IPLDFetcher - indexer *eth.CIDIndexer - backend *eth.Backend - api *eth.PublicEthAPI + db *postgres.DB + retriever *eth.CIDRetriever + fetcher *eth.IPLDPGFetcher + indexAndPublisher *eth.IPLDPublisherAndIndexer + backend *eth.Backend + api *eth.PublicEthAPI ) BeforeEach(func() { var err error db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) retriever = eth.NewCIDRetriever(db) - blocksToReturn := map[cid.Cid]blocks.Block{ - mocks.HeaderCID: mocks.HeaderIPLD, - mocks.Trx1CID: mocks.Trx1IPLD, - mocks.Trx2CID: mocks.Trx2IPLD, - mocks.Trx3CID: mocks.Trx3IPLD, - mocks.Rct1CID: mocks.Rct1IPLD, - mocks.Rct2CID: mocks.Rct2IPLD, - mocks.Rct3CID: mocks.Rct3IPLD, - mocks.State1CID: mocks.State1IPLD, - mocks.State2CID: mocks.State2IPLD, - mocks.StorageCID: mocks.StorageIPLD, - } - mockBlockService := &mocks3.MockIPFSBlockService{ - Blocks: blocksToReturn, - } - fetcher = ð.IPLDFetcher{ - BlockService: mockBlockService, - } - indexer = eth.NewCIDIndexer(db) + fetcher = eth.NewIPLDPGFetcher(db) + indexAndPublisher = eth.NewIPLDPublisherAndIndexer(db) backend = ð.Backend{ Retriever: retriever, Fetcher: fetcher, DB: db, } api = eth.NewPublicEthAPI(backend) - err = indexer.Index(mocks.MockCIDPayload) + _, err = indexAndPublisher.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) uncles := mocks.MockBlock.Uncles() uncleHashes := make([]common.Hash, len(uncles)) diff --git a/pkg/super_node/eth/backend.go b/pkg/super_node/eth/backend.go index 1365c47c..130444c7 100644 --- a/pkg/super_node/eth/backend.go +++ b/pkg/super_node/eth/backend.go @@ -39,19 +39,14 @@ var ( type Backend struct { Retriever *CIDRetriever - Fetcher *IPLDFetcher + Fetcher *IPLDPGFetcher DB *postgres.DB } -func NewEthBackend(db *postgres.DB, ipfsPath string) (*Backend, error) { +func NewEthBackend(db *postgres.DB) (*Backend, error) { r := NewCIDRetriever(db) - f, err := NewIPLDFetcher(ipfsPath) - if err != nil { - return nil, err - } return &Backend{ Retriever: r, - Fetcher: f, DB: db, }, nil } @@ -80,18 +75,18 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe } return nil, err } - if err := tx.Commit(); err != nil { - return nil, err - } // If there are none, throw an error if len(headerCids) < 1 { return nil, fmt.Errorf("header at block %d is not available", number) } // Fetch the header IPLDs for those CIDs - headerIPLD, err := b.Fetcher.FetchHeader(headerCids[0]) + headerIPLD, err := b.Fetcher.FetchHeader(tx, headerCids[0]) if err != nil { return nil, err } + if err := tx.Commit(); err != nil { + return nil, err + } // Decode the first header at this block height and return it // We throw an error in FetchHeaders() if the number of headers does not match the number of CIDs and we already // confirmed the number of CIDs is greater than 0 so there is no need to bound check the slice before accessing @@ -131,16 +126,16 @@ func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log } return nil, err } - if err := tx.Commit(); err != nil { - return nil, err - } if len(receiptCIDs) == 0 { return nil, nil } - receiptIPLDs, err := b.Fetcher.FetchRcts(receiptCIDs) + receiptIPLDs, err := b.Fetcher.FetchRcts(tx, receiptCIDs) if err != nil { return nil, err } + if err := tx.Commit(); err != nil { + return nil, err + } logs := make([][]*types.Log, len(receiptIPLDs)) for i, rctIPLD := range receiptIPLDs { var rct types.Receipt @@ -172,9 +167,13 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber if err != nil { return nil, err } + tx, err := b.DB.Beginx() + if err != nil { + return nil, err + } // Fetch and decode the header IPLD - headerIPLD, err := b.Fetcher.FetchHeader(headerCID) + headerIPLD, err := b.Fetcher.FetchHeader(tx, headerCID) if err != nil { return nil, err } @@ -183,7 +182,7 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber return nil, err } // Fetch and decode the uncle IPLDs - uncleIPLDs, err := b.Fetcher.FetchUncles(uncleCIDs) + uncleIPLDs, err := b.Fetcher.FetchUncles(tx, uncleCIDs) if err != nil { return nil, err } @@ -196,7 +195,7 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber uncles = append(uncles, &uncle) } // Fetch and decode the transaction IPLDs - txIPLDs, err := b.Fetcher.FetchTrxs(txCIDs) + txIPLDs, err := b.Fetcher.FetchTrxs(tx, txCIDs) if err != nil { return nil, err } @@ -209,10 +208,13 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber transactions = append(transactions, &tx) } // Fetch and decode the receipt IPLDs - rctIPLDs, err := b.Fetcher.FetchRcts(rctCIDs) + rctIPLDs, err := b.Fetcher.FetchRcts(tx, rctCIDs) if err != nil { return nil, err } + if err := tx.Commit(); err != nil { + return nil, err + } var receipts []*types.Receipt for _, rctIPLD := range rctIPLDs { var receipt types.Receipt @@ -233,8 +235,12 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo if err != nil { return nil, err } + tx, err := b.DB.Beginx() + if err != nil { + return nil, err + } // Fetch and decode the header IPLD - headerIPLD, err := b.Fetcher.FetchHeader(headerCID) + headerIPLD, err := b.Fetcher.FetchHeader(tx, headerCID) if err != nil { return nil, err } @@ -243,7 +249,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo return nil, err } // Fetch and decode the uncle IPLDs - uncleIPLDs, err := b.Fetcher.FetchUncles(uncleCIDs) + uncleIPLDs, err := b.Fetcher.FetchUncles(tx, uncleCIDs) if err != nil { return nil, err } @@ -256,7 +262,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo uncles = append(uncles, &uncle) } // Fetch and decode the transaction IPLDs - txIPLDs, err := b.Fetcher.FetchTrxs(txCIDs) + txIPLDs, err := b.Fetcher.FetchTrxs(tx, txCIDs) if err != nil { return nil, err } @@ -269,10 +275,13 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo transactions = append(transactions, &tx) } // Fetch and decode the receipt IPLDs - rctIPLDs, err := b.Fetcher.FetchRcts(rctCIDs) + rctIPLDs, err := b.Fetcher.FetchRcts(tx, rctCIDs) if err != nil { return nil, err } + if err := tx.Commit(); err != nil { + return nil, err + } var receipts []*types.Receipt for _, rctIPLD := range rctIPLDs { var receipt types.Receipt @@ -301,10 +310,17 @@ func (b *Backend) GetTransaction(ctx context.Context, txHash common.Hash) (*type if err := b.DB.Get(&txCIDWithHeaderInfo, pgStr, txHash.String()); err != nil { return nil, common.Hash{}, 0, 0, err } - txIPLD, err := b.Fetcher.FetchTrxs([]TxModel{{CID: txCIDWithHeaderInfo.CID}}) + tx, err := b.DB.Beginx() if err != nil { return nil, common.Hash{}, 0, 0, err } + txIPLD, err := b.Fetcher.FetchTrxs(tx, []TxModel{{CID: txCIDWithHeaderInfo.CID}}) + if err != nil { + return nil, common.Hash{}, 0, 0, err + } + if err := tx.Commit(); err != nil { + return nil, common.Hash{}, 0, 0, err + } var transaction types.Transaction if err := rlp.DecodeBytes(txIPLD[0].Data, &transaction); err != nil { return nil, common.Hash{}, 0, 0, err