direct fetching of iplds from pg-ipfs for btc and eth

This commit is contained in:
Ian Norden 2020-04-30 16:26:32 -05:00
parent 059664ffc9
commit ff5472ccf5
13 changed files with 431 additions and 21 deletions

View File

@ -40,6 +40,7 @@ type IPLDFetcher struct {
}
// NewIPLDFetcher creates a pointer to a new IPLDFetcher
// It interfaces with PG-IPFS through an internalized IPFS node interface
func NewIPLDFetcher(ipfsPath string) (*IPLDFetcher, error) {
blockService, err := ipfs.InitIPFSBlockService(ipfsPath)
if err != nil {

View File

@ -0,0 +1,98 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc
import (
"fmt"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// IPLDPGFetcher satisfies the IPLDFetcher interface for ethereum
// it interfaces directly with PG-IPFS instead of going through a node-interface or remote node
type IPLDPGFetcher struct {
db *postgres.DB
}
// NewIPLDPGFetcher creates a pointer to a new IPLDPGFetcher
func NewIPLDPGFetcher(db *postgres.DB) *IPLDPGFetcher {
return &IPLDPGFetcher{
db: db,
}
}
// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper
func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
cidWrapper, ok := cids.(*CIDWrapper)
if !ok {
return nil, fmt.Errorf("btc fetcher: expected cids type %T got %T", &CIDWrapper{}, cids)
}
log.Debug("fetching iplds")
iplds := IPLDs{}
iplds.BlockNumber = cidWrapper.BlockNumber
var err error
tx, err := f.db.Beginx()
if err != nil {
return nil, err
}
iplds.Header, err = f.FetchHeader(tx, cidWrapper.Header)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("btc pg fetcher: header fetching error: %s", err.Error())
}
iplds.Transactions, err = f.FetchTrxs(tx, cidWrapper.Transactions)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("btc pg fetcher: transaction fetching error: %s", err.Error())
}
return iplds, tx.Commit()
}
// FetchHeaders fetches headers
func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) {
log.Debug("fetching header ipld")
headerBytes, err := shared.FetchIPLD(tx, c.CID)
if err != nil {
return ipfs.BlockModel{}, err
}
return ipfs.BlockModel{
Data: headerBytes,
CID: c.CID,
}, nil
}
// FetchTrxs fetches transactions
func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching transaction iplds")
trxIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
trxBytes, err := shared.FetchIPLD(tx, c.CID)
if err != nil {
return nil, err
}
trxIPLDs[i] = ipfs.BlockModel{
Data: trxBytes,
CID: c.CID,
}
}
return trxIPLDs, nil
}

View File

@ -18,6 +18,7 @@ package btc_test
import (
"bytes"
"fmt"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore"
@ -111,6 +112,7 @@ var _ = Describe("PublishAndIndexer", func() {
Expect(err).ToNot(HaveOccurred())
mhKey := dshelp.CidToDsKey(dc)
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
fmt.Printf("mhKey: %s\r\n", prefixedKey)
var data []byte
err = db.Get(&data, ipfsPgGet, prefixedKey)
Expect(err).ToNot(HaveOccurred())

View File

@ -60,8 +60,7 @@ func NewCIDIndexer(chain shared.ChainType, db *postgres.DB, ipfsMode shared.IPFS
case shared.LocalInterface, shared.RemoteClient:
return btc.NewCIDIndexer(db), nil
case shared.DirectPostgres:
// TODO
return nil, nil
return eth.NewIPLDPublisherAndIndexer(db), nil
default:
return nil, fmt.Errorf("bitcoin CIDIndexer unexpected ipfs mode %s", ipfsMode.String())
}
@ -137,12 +136,26 @@ func NewPayloadConverter(chain shared.ChainType) (shared.PayloadConverter, error
}
// NewIPLDFetcher constructs an IPLDFetcher for the provided chain type
func NewIPLDFetcher(chain shared.ChainType, ipfsPath string) (shared.IPLDFetcher, error) {
func NewIPLDFetcher(chain shared.ChainType, ipfsPath string, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.IPLDFetcher, error) {
switch chain {
case shared.Ethereum:
return eth.NewIPLDFetcher(ipfsPath)
switch ipfsMode {
case shared.LocalInterface, shared.RemoteClient:
return eth.NewIPLDFetcher(ipfsPath)
case shared.DirectPostgres:
return eth.NewIPLDPGFetcher(db), nil
default:
return nil, fmt.Errorf("ethereum IPLDFetcher unexpected ipfs mode %s", ipfsMode.String())
}
case shared.Bitcoin:
return btc.NewIPLDFetcher(ipfsPath)
switch ipfsMode {
case shared.LocalInterface, shared.RemoteClient:
return btc.NewIPLDFetcher(ipfsPath)
case shared.DirectPostgres:
return btc.NewIPLDPGFetcher(db), nil
default:
return nil, fmt.Errorf("bitcoin IPLDFetcher unexpected ipfs mode %s", ipfsMode.String())
}
default:
return nil, fmt.Errorf("invalid chain %s for IPLD fetcher constructor", chain.String())
}
@ -165,8 +178,7 @@ func NewIPLDPublisher(chain shared.ChainType, ipfsPath string, db *postgres.DB,
case shared.LocalInterface, shared.RemoteClient:
return btc.NewIPLDPublisher(ipfsPath)
case shared.DirectPostgres:
// TODO
return nil, nil
return btc.NewIPLDPublisherAndIndexer(db), nil
default:
return nil, fmt.Errorf("bitcoin IPLDPublisher unexpected ipfs mode %s", ipfsMode.String())
}

View File

@ -167,7 +167,6 @@ func (f *IPLDFetcher) FetchTrxs(cids []TxModel) ([]ipfs.BlockModel, error) {
// FetchRcts fetches receipts
// It uses the f.fetchBatch method
// batch fetch preserves order?
func (f *IPLDFetcher) FetchRcts(cids []ReceiptModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching receipt iplds")
rctCids := make([]cid.Cid, len(cids))
@ -198,9 +197,9 @@ func (f *IPLDFetcher) FetchRcts(cids []ReceiptModel) ([]ipfs.BlockModel, error)
// needs to maintain the data's relation to state keys
func (f *IPLDFetcher) FetchState(cids []StateNodeModel) ([]StateNode, error) {
log.Debug("fetching state iplds")
stateNodes := make([]StateNode, len(cids))
for i, stateNode := range cids {
if stateNode.CID == "" || stateNode.StateKey == "" {
stateNodes := make([]StateNode, 0, len(cids))
for _, stateNode := range cids {
if stateNode.CID == "" {
continue
}
dc, err := cid.Decode(stateNode.CID)
@ -211,7 +210,7 @@ func (f *IPLDFetcher) FetchState(cids []StateNodeModel) ([]StateNode, error) {
if err != nil {
return nil, err
}
stateNodes[i] = StateNode{
stateNodes = append(stateNodes, StateNode{
IPLD: ipfs.BlockModel{
Data: state.RawData(),
CID: state.Cid().String(),
@ -219,7 +218,7 @@ func (f *IPLDFetcher) FetchState(cids []StateNodeModel) ([]StateNode, error) {
StateLeafKey: common.HexToHash(stateNode.StateKey),
Type: ResolveToNodeType(stateNode.NodeType),
Path: stateNode.Path,
}
})
}
return stateNodes, nil
}
@ -229,9 +228,9 @@ func (f *IPLDFetcher) FetchState(cids []StateNodeModel) ([]StateNode, error) {
// needs to maintain the data's relation to state and storage keys
func (f *IPLDFetcher) FetchStorage(cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) {
log.Debug("fetching storage iplds")
storageNodes := make([]StorageNode, len(cids))
for i, storageNode := range cids {
if storageNode.CID == "" || storageNode.StorageKey == "" || storageNode.StateKey == "" {
storageNodes := make([]StorageNode, 0, len(cids))
for _, storageNode := range cids {
if storageNode.CID == "" || storageNode.StateKey == "" {
continue
}
dc, err := cid.Decode(storageNode.CID)
@ -242,7 +241,7 @@ func (f *IPLDFetcher) FetchStorage(cids []StorageNodeWithStateKeyModel) ([]Stora
if err != nil {
return nil, err
}
storageNodes[i] = StorageNode{
storageNodes = append(storageNodes, StorageNode{
IPLD: ipfs.BlockModel{
Data: storage.RawData(),
CID: storage.Cid().String(),
@ -251,7 +250,7 @@ func (f *IPLDFetcher) FetchStorage(cids []StorageNodeWithStateKeyModel) ([]Stora
StorageLeafKey: common.HexToHash(storageNode.StorageKey),
Type: ResolveToNodeType(storageNode.NodeType),
Path: storageNode.Path,
}
})
}
return storageNodes, nil
}

View File

@ -89,8 +89,8 @@ var (
}
)
var _ = Describe("Fetcher", func() {
Describe("FetchCIDs", func() {
var _ = Describe("IPLDFetcher", func() {
Describe("Fetch", func() {
BeforeEach(func() {
mockBlockService = new(mocks.MockIPFSBlockService)
err := mockBlockService.AddBlocks(mockBlocks)

View File

@ -0,0 +1,210 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"errors"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// IPLDPGFetcher satisfies the IPLDFetcher interface for ethereum
// It interfaces directly with PG-IPFS
type IPLDPGFetcher struct {
db *postgres.DB
}
// NewIPLDPGFetcher creates a pointer to a new IPLDPGFetcher
func NewIPLDPGFetcher(db *postgres.DB) *IPLDPGFetcher {
return &IPLDPGFetcher{
db: db,
}
}
// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper
func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
cidWrapper, ok := cids.(*CIDWrapper)
if !ok {
return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids)
}
log.Debug("fetching iplds")
var err error
iplds := IPLDs{}
iplds.TotalDifficulty, ok = new(big.Int).SetString(cidWrapper.Header.TotalDifficulty, 10)
if !ok {
return nil, errors.New("eth fetcher: unable to set total difficulty")
}
iplds.BlockNumber = cidWrapper.BlockNumber
tx, err := f.db.Beginx()
if err != nil {
return nil, err
}
iplds.Header, err = f.FetchHeader(tx, cidWrapper.Header)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("eth pg fetcher: header fetching error: %s", err.Error())
}
iplds.Uncles, err = f.FetchUncles(tx, cidWrapper.Uncles)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("eth pg fetcher: uncle fetching error: %s", err.Error())
}
iplds.Transactions, err = f.FetchTrxs(tx, cidWrapper.Transactions)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("eth pg fetcher: transaction fetching error: %s", err.Error())
}
iplds.Receipts, err = f.FetchRcts(tx, cidWrapper.Receipts)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("eth pg fetcher: receipt fetching error: %s", err.Error())
}
iplds.StateNodes, err = f.FetchState(tx, cidWrapper.StateNodes)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("eth pg fetcher: state fetching error: %s", err.Error())
}
iplds.StorageNodes, err = f.FetchStorage(tx, cidWrapper.StorageNodes)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("eth pg fetcher: storage fetching error: %s", err.Error())
}
return iplds, tx.Commit()
}
// FetchHeaders fetches headers
func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) {
log.Debug("fetching header ipld")
headerBytes, err := shared.FetchIPLD(tx, c.CID)
if err != nil {
return ipfs.BlockModel{}, err
}
return ipfs.BlockModel{
Data: headerBytes,
CID: c.CID,
}, nil
}
// FetchUncles fetches uncles
func (f *IPLDPGFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching uncle iplds")
uncleIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
uncleBytes, err := shared.FetchIPLD(tx, c.CID)
if err != nil {
return nil, err
}
uncleIPLDs[i] = ipfs.BlockModel{
Data: uncleBytes,
CID: c.CID,
}
}
return uncleIPLDs, nil
}
// FetchTrxs fetches transactions
func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching transaction iplds")
trxIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
txBytes, err := shared.FetchIPLD(tx, c.CID)
if err != nil {
return nil, err
}
trxIPLDs[i] = ipfs.BlockModel{
Data: txBytes,
CID: c.CID,
}
}
return trxIPLDs, nil
}
// FetchRcts fetches receipts
func (f *IPLDPGFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching receipt iplds")
rctIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
rctBytes, err := shared.FetchIPLD(tx, c.CID)
if err != nil {
return nil, err
}
rctIPLDs[i] = ipfs.BlockModel{
Data: rctBytes,
CID: c.CID,
}
}
return rctIPLDs, nil
}
// FetchState fetches state nodes
func (f *IPLDPGFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNode, error) {
log.Debug("fetching state iplds")
stateNodes := make([]StateNode, 0, len(cids))
for _, stateNode := range cids {
if stateNode.CID == "" {
continue
}
stateBytes, err := shared.FetchIPLD(tx, stateNode.CID)
if err != nil {
return nil, err
}
stateNodes = append(stateNodes, StateNode{
IPLD: ipfs.BlockModel{
Data: stateBytes,
CID: stateNode.CID,
},
StateLeafKey: common.HexToHash(stateNode.StateKey),
Type: ResolveToNodeType(stateNode.NodeType),
Path: stateNode.Path,
})
}
return stateNodes, nil
}
// FetchStorage fetches storage nodes
func (f *IPLDPGFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) {
log.Debug("fetching storage iplds")
storageNodes := make([]StorageNode, 0, len(cids))
for _, storageNode := range cids {
if storageNode.CID == "" || storageNode.StateKey == "" {
continue
}
storageBytes, err := shared.FetchIPLD(tx, storageNode.CID)
if err != nil {
return nil, err
}
storageNodes = append(storageNodes, StorageNode{
IPLD: ipfs.BlockModel{
Data: storageBytes,
CID: storageNode.CID,
},
StateLeafKey: common.HexToHash(storageNode.StateKey),
StorageLeafKey: common.HexToHash(storageNode.StorageKey),
Type: ResolveToNodeType(storageNode.NodeType),
Path: storageNode.Path,
})
}
return storageNodes, nil
}

View File

@ -0,0 +1,65 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package eth_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
var (
db *postgres.DB
pubAndIndexer *eth.IPLDPublisherAndIndexer
fetcher *eth.IPLDPGFetcher
)
var _ = Describe("IPLDPGFetcher", func() {
Describe("Fetch", func() {
BeforeEach(func() {
var err error
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
pubAndIndexer = eth.NewIPLDPublisherAndIndexer(db)
_, err = pubAndIndexer.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
fetcher = eth.NewIPLDPGFetcher(db)
})
AfterEach(func() {
eth.TearDownDB(db)
})
It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() {
i, err := fetcher.Fetch(mocks.MockCIDWrapper)
Expect(err).ToNot(HaveOccurred())
iplds, ok := i.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds.TotalDifficulty).To(Equal(mocks.MockConvertedPayload.TotalDifficulty))
Expect(iplds.BlockNumber).To(Equal(mocks.MockConvertedPayload.Block.Number()))
Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header))
Expect(len(iplds.Uncles)).To(Equal(0))
Expect(iplds.Transactions).To(Equal(mocks.MockIPLDs.Transactions))
Expect(iplds.Receipts).To(Equal(mocks.MockIPLDs.Receipts))
Expect(iplds.StateNodes).To(Equal(mocks.MockIPLDs.StateNodes))
Expect(iplds.StorageNodes).To(Equal(mocks.MockIPLDs.StorageNodes))
})
})
})

View File

@ -128,7 +128,7 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
if err != nil {
return nil, err
}
sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath)
sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode)
if err != nil {
return nil, err
}

View File

@ -19,6 +19,8 @@ package shared
import (
"bytes"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-ds-help"
@ -90,3 +92,24 @@ func PublishIPLD(tx *sqlx.Tx, i node.Node) error {
_, err := tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw)
return err
}
// FetchIPLD is used to retrieve an ipld from Postgres blockstore with the provided tx
func FetchIPLD(tx *sqlx.Tx, cid string) ([]byte, error) {
mhKey, err := MultihashKeyFromCIDString(cid)
if err != nil {
return nil, err
}
pgStr := `SELECT data FROM public.blocks WHERE key = $1`
var block []byte
return block, tx.Get(&block, pgStr, mhKey)
}
// MultihashKeyFromCIDString converts a cid string into a blockstore-prefixed multihash db key string
func MultihashKeyFromCIDString(c string) (string, error) {
dc, err := cid.Decode(c)
if err != nil {
return "", err
}
dbKey := dshelp.CidToDsKey(dc)
return blockstore.BlockPrefix.String() + dbKey.String(), nil
}