Remove the option for publishing through the IPFS node interface

Ian Norden 2020-08-10 13:04:07 -05:00
parent 39354b2114
commit a532e17e21
55 changed files with 206 additions and 2311 deletions

View File

@ -161,10 +161,6 @@ This set of parameters needs to be set no matter the chain type.
user = "vdbm" # $DATABASE_USER
password = "" # $DATABASE_PASSWORD
[ipfs]
path = "~/.ipfs" # $IPFS_PATH
mode = "postgres" # $IPFS_MODE
[watcher]
chain = "bitcoin" # $SUPERNODE_CHAIN
server = true # $SUPERNODE_SERVER
@ -207,6 +203,7 @@ For Ethereum:
clientName = "Geth" # $ETH_CLIENT_NAME
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK
networkID = "1" # $ETH_NETWORK_ID
chainID = "1" # $ETH_CHAIN_ID
```
### Exposing the data

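As an aside, here is a minimal sketch of how a trimmed-down config like the one documented above might be read with viper (the library already used by the commands in this diff). The key names follow the visible TOML sections; the section that holds the Ethereum keys (chainID, networkID, ...) is not shown in the hunk, so `ethereum` is an assumption here, as is the config file name.

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	// Hypothetical file name; the repo's actual config path may differ.
	viper.SetConfigFile("watcher.toml")

	// Bind the documented environment variables to their TOML keys.
	viper.BindEnv("watcher.chain", "SUPERNODE_CHAIN")
	viper.BindEnv("ethereum.chainID", "ETH_CHAIN_ID") // section name assumed

	if err := viper.ReadInConfig(); err != nil {
		panic(err)
	}

	// Note: there are no [ipfs] path/mode keys anymore; everything is
	// published to and fetched from Postgres directly.
	fmt.Println("chain:", viper.GetString("watcher.chain"))
	fmt.Println("chain id:", viper.GetString("ethereum.chainID"))
}
```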
View File

@ -19,9 +19,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/resync"
v "github.com/vulcanize/ipfs-blockchain-watcher/version"
)
@ -46,11 +43,6 @@ func rsyncCmdCommand() {
logWithCommand.Fatal(err)
}
logWithCommand.Infof("resync config: %+v", rConfig)
if rConfig.IPFSMode == shared.LocalInterface {
if err := ipfs.InitIPFSPlugins(); err != nil {
logWithCommand.Fatal(err)
}
}
logWithCommand.Debug("initializing new resync service")
rService, err := resync.NewResyncService(rConfig)
if err != nil {

View File

@ -26,7 +26,6 @@ import (
"github.com/spf13/viper"
h "github.com/vulcanize/ipfs-blockchain-watcher/pkg/historical"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
w "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
v "github.com/vulcanize/ipfs-blockchain-watcher/version"
@ -65,11 +64,6 @@ func watch() {
logWithCommand.Fatal(err)
}
logWithCommand.Infof("watcher config: %+v", watcherConfig)
if watcherConfig.IPFSMode == shared.LocalInterface {
if err := ipfs.InitIPFSPlugins(); err != nil {
logWithCommand.Fatal(err)
}
}
logWithCommand.Debug("initializing new watcher service")
watcher, err := w.NewWatcher(watcherConfig)
if err != nil {

View File

@ -13,7 +13,6 @@ set +x
#test $DATABASE_USER
#test $DATABASE_PASSWORD
#test $IPFS_INIT
#test $IPFS_PATH
VDB_COMMAND=${VDB_COMMAND:-watch}
set +e

View File

@ -12,7 +12,6 @@ test $DATABASE_PORT
test $DATABASE_USER
test $DATABASE_PASSWORD
test $IPFS_INIT
test $IPFS_PATH
test $VDB_COMMAND
set +e

View File

@ -52,10 +52,6 @@ This set of parameters needs to be set no matter the chain type.
user = "vdbm" # $DATABASE_USER
password = "" # $DATABASE_PASSWORD
[ipfs]
path = "~/.ipfs" # $IPFS_PATH
mode = "direct" # $IPFS_MODE
[watcher]
chain = "bitcoin" # $SUPERNODE_CHAIN
server = true # $SUPERNODE_SERVER

View File

@ -30,9 +30,6 @@ This set of parameters needs to be set no matter the chain type.
port = 5432 # $DATABASE_PORT
user = "vdbm" # $DATABASE_USER
password = "" # $DATABASE_PASSWORD
[ipfs]
path = "~/.ipfs" # $IPFS_PATH
[resync]
chain = "ethereum" # $RESYNC_CHAIN

View File

@ -43,3 +43,4 @@
clientName = "Geth" # $ETH_CLIENT_NAME
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK
networkID = "1" # $ETH_NETWORK_ID
chainID = "1" # $ETH_CHAIN_ID

View File

@ -27,21 +27,21 @@ import (
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
// IPLDPGFetcher satisfies the IPLDFetcher interface for ethereum
// IPLDFetcher satisfies the IPLDFetcher interface for ethereum
// it interfaces directly with PG-IPFS instead of going through a node-interface or remote node
type IPLDPGFetcher struct {
type IPLDFetcher struct {
db *postgres.DB
}
// NewIPLDPGFetcher creates a pointer to a new IPLDPGFetcher
func NewIPLDPGFetcher(db *postgres.DB) *IPLDPGFetcher {
return &IPLDPGFetcher{
// NewIPLDFetcher creates a pointer to a new IPLDFetcher
func NewIPLDFetcher(db *postgres.DB) *IPLDFetcher {
return &IPLDFetcher{
db: db,
}
}
// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper
func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
cidWrapper, ok := cids.(*CIDWrapper)
if !ok {
return nil, fmt.Errorf("btc fetcher: expected cids type %T got %T", &CIDWrapper{}, cids)
@ -77,7 +77,7 @@ func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error)
}
// FetchHeaders fetches headers
func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) {
func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) {
log.Debug("fetching header ipld")
headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
@ -90,7 +90,7 @@ func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel
}
// FetchTrxs fetches transactions
func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) {
func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching transaction iplds")
trxIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {

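For context, the caller side of the renamed fetcher is now just a constructor plus `Fetch`. A minimal sketch, assuming it sits in the same `btc` package as the code above so the existing imports (`postgres`, `shared`) apply:

```go
// fetchIPLDs is a sketch of using the renamed IPLDFetcher: all reads go
// straight to the public.blocks table of PG-IPFS via the passed DB handle.
func fetchIPLDs(db *postgres.DB, cids shared.CIDsForFetching) (shared.IPLDs, error) {
	fetcher := NewIPLDFetcher(db)
	return fetcher.Fetch(cids)
}
```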
View File

@ -1,107 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc
import (
"fmt"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
// IPLDPGFetcher satisfies the IPLDFetcher interface for ethereum
// it interfaces directly with PG-IPFS instead of going through a node-interface or remote node
type IPLDPGFetcher struct {
db *postgres.DB
}
// NewIPLDPGFetcher creates a pointer to a new IPLDPGFetcher
func NewIPLDPGFetcher(db *postgres.DB) *IPLDPGFetcher {
return &IPLDPGFetcher{
db: db,
}
}
// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper
func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
cidWrapper, ok := cids.(*CIDWrapper)
if !ok {
return nil, fmt.Errorf("btc fetcher: expected cids type %T got %T", &CIDWrapper{}, cids)
}
log.Debug("fetching iplds")
iplds := IPLDs{}
iplds.BlockNumber = cidWrapper.BlockNumber
tx, err := f.db.Beginx()
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
iplds.Header, err = f.FetchHeader(tx, cidWrapper.Header)
if err != nil {
return nil, fmt.Errorf("btc pg fetcher: header fetching error: %s", err.Error())
}
iplds.Transactions, err = f.FetchTrxs(tx, cidWrapper.Transactions)
if err != nil {
return nil, fmt.Errorf("btc pg fetcher: transaction fetching error: %s", err.Error())
}
return iplds, err
}
// FetchHeaders fetches headers
func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) {
log.Debug("fetching header ipld")
headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
return ipfs.BlockModel{}, err
}
return ipfs.BlockModel{
Data: headerBytes,
CID: c.CID,
}, nil
}
// FetchTrxs fetches transactions
func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching transaction iplds")
trxIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
trxBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
return nil, err
}
trxIPLDs[i] = ipfs.BlockModel{
Data: trxBytes,
CID: c.CID,
}
}
return trxIPLDs, nil
}

View File

@ -1,126 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc
import (
"fmt"
"strconv"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
// IPLDPublisherAndIndexer satisfies the IPLDPublisher interface for bitcoin
// It interfaces directly with the public.blocks table of PG-IPFS rather than going through an ipfs intermediary
// It publishes and indexes IPLDs together in a single sqlx.Tx
type IPLDPublisherAndIndexer struct {
indexer *CIDIndexer
}
// NewIPLDPublisherAndIndexer creates a pointer to a new IPLDPublisherAndIndexer which satisfies the IPLDPublisher interface
func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer {
return &IPLDPublisherAndIndexer{
indexer: NewCIDIndexer(db),
}
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
ipldPayload, ok := payload.(ConvertedPayload)
if !ok {
return nil, fmt.Errorf("btc publisher expected payload type %T got %T", ConvertedPayload{}, payload)
}
// Generate the iplds
headerNode, txNodes, txTrieNodes, err := ipld.FromHeaderAndTxs(ipldPayload.Header, ipldPayload.Txs)
if err != nil {
return nil, err
}
// Begin new db tx
tx, err := pub.indexer.db.Beginx()
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// Publish trie nodes
for _, node := range txTrieNodes {
if err := shared.PublishIPLD(tx, node); err != nil {
return nil, err
}
}
// Publish and index header
if err := shared.PublishIPLD(tx, headerNode); err != nil {
return nil, err
}
header := HeaderModel{
CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: ipldPayload.Header.PrevBlock.String(),
BlockNumber: strconv.Itoa(int(ipldPayload.BlockPayload.BlockHeight)),
BlockHash: ipldPayload.Header.BlockHash().String(),
Timestamp: ipldPayload.Header.Timestamp.UnixNano(),
Bits: ipldPayload.Header.Bits,
}
headerID, err := pub.indexer.indexHeaderCID(tx, header)
if err != nil {
return nil, err
}
// Publish and index txs
for i, txNode := range txNodes {
if err := shared.PublishIPLD(tx, txNode); err != nil {
return nil, err
}
txModel := ipldPayload.TxMetaData[i]
txModel.CID = txNode.Cid().String()
txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid())
txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID)
if err != nil {
return nil, err
}
for _, input := range txModel.TxInputs {
if err := pub.indexer.indexTxInput(tx, input, txID); err != nil {
return nil, err
}
}
for _, output := range txModel.TxOutputs {
if err := pub.indexer.indexTxOutput(tx, output, txID); err != nil {
return nil, err
}
}
}
// This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer
return nil, err
}
// Index satisfies the shared.CIDIndexer interface
func (pub *IPLDPublisherAndIndexer) Index(cids shared.CIDsForIndexing) error {
return nil
}

View File

@ -1,121 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc_test
import (
"bytes"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-ds-help"
"github.com/multiformats/go-multihash"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc/mocks"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
var _ = Describe("PublishAndIndexer", func() {
var (
db *postgres.DB
err error
repo *btc.IPLDPublisherAndIndexer
ipfsPgGet = `SELECT data FROM public.blocks
WHERE key = $1`
)
BeforeEach(func() {
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
repo = btc.NewIPLDPublisherAndIndexer(db)
})
AfterEach(func() {
btc.TearDownDB(db)
})
Describe("Publish", func() {
It("Published and indexes header and transaction IPLDs in a single tx", func() {
emptyReturn, err := repo.Publish(mocks.MockConvertedPayload)
Expect(emptyReturn).To(BeNil())
Expect(err).ToNot(HaveOccurred())
pgStr := `SELECT * FROM btc.header_cids
WHERE block_number = $1`
// check header was properly indexed
buf := bytes.NewBuffer(make([]byte, 0, 80))
err = mocks.MockBlock.Header.Serialize(buf)
Expect(err).ToNot(HaveOccurred())
headerBytes := buf.Bytes()
c, _ := ipld.RawdataToCid(ipld.MBitcoinHeader, headerBytes, multihash.DBL_SHA2_256)
header := new(btc.HeaderModel)
err = db.Get(header, pgStr, mocks.MockHeaderMetaData.BlockNumber)
Expect(err).ToNot(HaveOccurred())
Expect(header.CID).To(Equal(c.String()))
Expect(header.BlockNumber).To(Equal(mocks.MockHeaderMetaData.BlockNumber))
Expect(header.Bits).To(Equal(mocks.MockHeaderMetaData.Bits))
Expect(header.Timestamp).To(Equal(mocks.MockHeaderMetaData.Timestamp))
Expect(header.BlockHash).To(Equal(mocks.MockHeaderMetaData.BlockHash))
Expect(header.ParentHash).To(Equal(mocks.MockHeaderMetaData.ParentHash))
dc, err := cid.Decode(header.CID)
Expect(err).ToNot(HaveOccurred())
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
var data []byte
err = db.Get(&data, ipfsPgGet, prefixedKey)
Expect(err).ToNot(HaveOccurred())
Expect(data).To(Equal(headerBytes))
// check that txs were properly indexed
trxs := make([]btc.TxModel, 0)
pgStr = `SELECT transaction_cids.id, transaction_cids.header_id, transaction_cids.index,
transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.segwit, transaction_cids.witness_hash
FROM btc.transaction_cids INNER JOIN btc.header_cids ON (transaction_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1`
err = db.Select(&trxs, pgStr, mocks.MockHeaderMetaData.BlockNumber)
Expect(err).ToNot(HaveOccurred())
Expect(len(trxs)).To(Equal(3))
txData := make([][]byte, len(mocks.MockTransactions))
txCIDs := make([]string, len(mocks.MockTransactions))
for i, m := range mocks.MockTransactions {
buf := bytes.NewBuffer(make([]byte, 0))
err = m.MsgTx().Serialize(buf)
Expect(err).ToNot(HaveOccurred())
tx := buf.Bytes()
txData[i] = tx
c, _ := ipld.RawdataToCid(ipld.MBitcoinTx, tx, multihash.DBL_SHA2_256)
txCIDs[i] = c.String()
}
for _, tx := range trxs {
Expect(tx.SegWit).To(Equal(false))
Expect(tx.HeaderID).To(Equal(header.ID))
Expect(tx.WitnessHash).To(Equal(""))
Expect(tx.CID).To(Equal(txCIDs[tx.Index]))
Expect(tx.TxHash).To(Equal(mocks.MockBlock.Transactions[tx.Index].TxHash().String()))
dc, err := cid.Decode(tx.CID)
Expect(err).ToNot(HaveOccurred())
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
var data []byte
err = db.Get(&data, ipfsPgGet, prefixedKey)
Expect(err).ToNot(HaveOccurred())
Expect(data).To(Equal(txData[tx.Index]))
}
})
})
})

View File

@ -25,36 +25,36 @@ import (
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
// IPLDPublisherAndIndexer satisfies the IPLDPublisher interface for bitcoin
// IPLDPublisher satisfies the IPLDPublisher interface for bitcoin
// It interfaces directly with the public.blocks table of PG-IPFS rather than going through an ipfs intermediary
// It publishes and indexes IPLDs together in a single sqlx.Tx
type IPLDPublisherAndIndexer struct {
type IPLDPublisher struct {
indexer *CIDIndexer
}
// NewIPLDPublisherAndIndexer creates a pointer to a new IPLDPublisherAndIndexer which satisfies the IPLDPublisher interface
func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer {
return &IPLDPublisherAndIndexer{
// NewIPLDPublisher creates a pointer to a new btc IPLDPublisher which satisfies the IPLDPublisher interface
func NewIPLDPublisher(db *postgres.DB) *IPLDPublisher {
return &IPLDPublisher{
indexer: NewCIDIndexer(db),
}
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) error {
ipldPayload, ok := payload.(ConvertedPayload)
if !ok {
return nil, fmt.Errorf("btc publisher expected payload type %T got %T", ConvertedPayload{}, payload)
return fmt.Errorf("btc publisher expected payload type %T got %T", ConvertedPayload{}, payload)
}
// Generate the iplds
headerNode, txNodes, txTrieNodes, err := ipld.FromHeaderAndTxs(ipldPayload.Header, ipldPayload.Txs)
if err != nil {
return nil, err
return err
}
// Begin new db tx
tx, err := pub.indexer.db.Beginx()
if err != nil {
return nil, err
return err
}
defer func() {
if p := recover(); p != nil {
@ -70,13 +70,13 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
// Publish trie nodes
for _, node := range txTrieNodes {
if err := shared.PublishIPLD(tx, node); err != nil {
return nil, err
return err
}
}
// Publish and index header
if err := shared.PublishIPLD(tx, headerNode); err != nil {
return nil, err
return err
}
header := HeaderModel{
CID: headerNode.Cid().String(),
@ -89,38 +89,32 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
}
headerID, err := pub.indexer.indexHeaderCID(tx, header)
if err != nil {
return nil, err
return err
}
// Publish and index txs
for i, txNode := range txNodes {
if err := shared.PublishIPLD(tx, txNode); err != nil {
return nil, err
return err
}
txModel := ipldPayload.TxMetaData[i]
txModel.CID = txNode.Cid().String()
txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid())
txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID)
if err != nil {
return nil, err
return err
}
for _, input := range txModel.TxInputs {
if err := pub.indexer.indexTxInput(tx, input, txID); err != nil {
return nil, err
return err
}
}
for _, output := range txModel.TxOutputs {
if err := pub.indexer.indexTxOutput(tx, output, txID); err != nil {
return nil, err
return err
}
}
}
// This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer
return nil, err
}
// Index satisfies the shared.CIDIndexer interface
func (pub *IPLDPublisherAndIndexer) Index(cids shared.CIDsForIndexing) error {
return nil
return err
}

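The net effect for callers of the bitcoin publisher: `Publish` now returns only an error, and there is no separate CID payload to forward to an indexer. A minimal sketch, assuming it sits alongside the `btc` package code above:

```go
// publishBlock is a sketch of the new error-only Publish contract: the IPLDs
// are written into Postgres and indexed in a single transaction internally.
func publishBlock(db *postgres.DB, payload shared.ConvertedData) error {
	pub := NewIPLDPublisher(db)
	return pub.Publish(payload)
}
```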
View File

@ -37,14 +37,14 @@ var _ = Describe("PublishAndIndexer", func() {
var (
db *postgres.DB
err error
repo *btc.IPLDPublisherAndIndexer
repo *btc.IPLDPublisher
ipfsPgGet = `SELECT data FROM public.blocks
WHERE key = $1`
)
BeforeEach(func() {
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
repo = btc.NewIPLDPublisherAndIndexer(db)
repo = btc.NewIPLDPublisher(db)
})
AfterEach(func() {
btc.TearDownDB(db)
@ -52,8 +52,7 @@ var _ = Describe("PublishAndIndexer", func() {
Describe("Publish", func() {
It("Published and indexes header and transaction IPLDs in a single tx", func() {
emptyReturn, err := repo.Publish(mocks.MockConvertedPayload)
Expect(emptyReturn).To(BeNil())
err = repo.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
pgStr := `SELECT * FROM btc.header_cids
WHERE block_number = $1`

View File

@ -42,32 +42,6 @@ func NewResponseFilterer(chain shared.ChainType) (shared.ResponseFilterer, error
}
}
// NewCIDIndexer constructs a CIDIndexer for the provided chain type
func NewCIDIndexer(chain shared.ChainType, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.CIDIndexer, error) {
switch chain {
case shared.Ethereum:
switch ipfsMode {
case shared.LocalInterface, shared.RemoteClient:
return eth.NewCIDIndexer(db), nil
case shared.DirectPostgres:
return eth.NewIPLDPublisherAndIndexer(db), nil
default:
return nil, fmt.Errorf("ethereum CIDIndexer unexpected ipfs mode %s", ipfsMode.String())
}
case shared.Bitcoin:
switch ipfsMode {
case shared.LocalInterface, shared.RemoteClient:
return btc.NewCIDIndexer(db), nil
case shared.DirectPostgres:
return eth.NewIPLDPublisherAndIndexer(db), nil
default:
return nil, fmt.Errorf("bitcoin CIDIndexer unexpected ipfs mode %s", ipfsMode.String())
}
default:
return nil, fmt.Errorf("invalid chain %s for indexer constructor", chain.String())
}
}
// NewCIDRetriever constructs a CIDRetriever for the provided chain type
func NewCIDRetriever(chain shared.ChainType, db *postgres.DB) (shared.CIDRetriever, error) {
switch chain {
@ -139,52 +113,24 @@ func NewPayloadConverter(chainType shared.ChainType, chainID uint64) (shared.Pay
}
// NewIPLDFetcher constructs an IPLDFetcher for the provided chain type
func NewIPLDFetcher(chain shared.ChainType, ipfsPath string, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.IPLDFetcher, error) {
func NewIPLDFetcher(chain shared.ChainType, db *postgres.DB) (shared.IPLDFetcher, error) {
switch chain {
case shared.Ethereum:
switch ipfsMode {
case shared.LocalInterface, shared.RemoteClient:
return eth.NewIPLDFetcher(ipfsPath)
case shared.DirectPostgres:
return eth.NewIPLDPGFetcher(db), nil
default:
return nil, fmt.Errorf("ethereum IPLDFetcher unexpected ipfs mode %s", ipfsMode.String())
}
return eth.NewIPLDFetcher(db), nil
case shared.Bitcoin:
switch ipfsMode {
case shared.LocalInterface, shared.RemoteClient:
return btc.NewIPLDFetcher(ipfsPath)
case shared.DirectPostgres:
return btc.NewIPLDPGFetcher(db), nil
default:
return nil, fmt.Errorf("bitcoin IPLDFetcher unexpected ipfs mode %s", ipfsMode.String())
}
return btc.NewIPLDFetcher(db), nil
default:
return nil, fmt.Errorf("invalid chain %s for IPLD fetcher constructor", chain.String())
}
}
// NewIPLDPublisher constructs an IPLDPublisher for the provided chain type
func NewIPLDPublisher(chain shared.ChainType, ipfsPath string, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.IPLDPublisher, error) {
func NewIPLDPublisher(chain shared.ChainType, db *postgres.DB) (shared.IPLDPublisher, error) {
switch chain {
case shared.Ethereum:
switch ipfsMode {
case shared.LocalInterface, shared.RemoteClient:
return eth.NewIPLDPublisher(ipfsPath)
case shared.DirectPostgres:
return eth.NewIPLDPublisherAndIndexer(db), nil
default:
return nil, fmt.Errorf("ethereum IPLDPublisher unexpected ipfs mode %s", ipfsMode.String())
}
return eth.NewIPLDPublisher(db), nil
case shared.Bitcoin:
switch ipfsMode {
case shared.LocalInterface, shared.RemoteClient:
return btc.NewIPLDPublisher(ipfsPath)
case shared.DirectPostgres:
return btc.NewIPLDPublisherAndIndexer(db), nil
default:
return nil, fmt.Errorf("bitcoin IPLDPublisher unexpected ipfs mode %s", ipfsMode.String())
}
return btc.NewIPLDPublisher(db), nil
default:
return nil, fmt.Errorf("invalid chain %s for publisher constructor", chain.String())
}

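With the IPFS mode gone, the factory functions collapse to a plain chain switch over the Postgres-backed implementations. A minimal usage sketch, assuming it lives in the same package as the constructors above (so `shared` and `postgres` are already imported):

```go
// buildEthPipeline is a sketch of wiring a fetcher/publisher pair after this
// change: only the chain type and a Postgres handle are needed, with no
// ipfsPath or ipfsMode arguments.
func buildEthPipeline(db *postgres.DB) (shared.IPLDFetcher, shared.IPLDPublisher, error) {
	fetcher, err := NewIPLDFetcher(shared.Ethereum, db)
	if err != nil {
		return nil, nil, err
	}
	publisher, err := NewIPLDPublisher(shared.Ethereum, db)
	if err != nil {
		return nil, nil, err
	}
	return fetcher, publisher, nil
}
```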
View File

@ -84,8 +84,8 @@ var _ = Describe("API", func() {
var (
db *postgres.DB
retriever *eth.CIDRetriever
fetcher *eth.IPLDPGFetcher
indexAndPublisher *eth.IPLDPublisherAndIndexer
fetcher *eth.IPLDFetcher
indexAndPublisher *eth.IPLDPublisher
backend *eth.Backend
api *eth.PublicEthAPI
)
@ -94,15 +94,15 @@ var _ = Describe("API", func() {
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
retriever = eth.NewCIDRetriever(db)
fetcher = eth.NewIPLDPGFetcher(db)
indexAndPublisher = eth.NewIPLDPublisherAndIndexer(db)
fetcher = eth.NewIPLDFetcher(db)
indexAndPublisher = eth.NewIPLDPublisher(db)
backend = &eth.Backend{
Retriever: retriever,
Fetcher: fetcher,
DB: db,
}
api = eth.NewPublicEthAPI(backend)
_, err = indexAndPublisher.Publish(mocks.MockConvertedPayload)
err = indexAndPublisher.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
uncles := mocks.MockBlock.Uncles()
uncleHashes := make([]common.Hash, len(uncles))

View File

@ -39,7 +39,7 @@ var (
type Backend struct {
Retriever *CIDRetriever
Fetcher *IPLDPGFetcher
Fetcher *IPLDFetcher
DB *postgres.DB
}

View File

@ -211,14 +211,14 @@ var (
var _ = Describe("Retriever", func() {
var (
db *postgres.DB
repo *eth2.IPLDPublisherAndIndexer
repo *eth2.IPLDPublisher
retriever *eth2.CIDRetriever
)
BeforeEach(func() {
var err error
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
repo = eth2.NewIPLDPublisherAndIndexer(db)
repo = eth2.NewIPLDPublisher(db)
retriever = eth2.NewCIDRetriever(db)
})
AfterEach(func() {
@ -227,7 +227,7 @@ var _ = Describe("Retriever", func() {
Describe("Retrieve", func() {
BeforeEach(func() {
_, err := repo.Publish(mocks.MockConvertedPayload)
err := repo.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
})
It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() {
@ -413,7 +413,7 @@ var _ = Describe("Retriever", func() {
Expect(err).To(HaveOccurred())
})
It("Gets the number of the first block that has data in the database", func() {
_, err := repo.Publish(mocks.MockConvertedPayload)
err := repo.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveFirstBlockNumber()
Expect(err).ToNot(HaveOccurred())
@ -423,7 +423,7 @@ var _ = Describe("Retriever", func() {
It("Gets the number of the first block that has data in the database", func() {
payload := mocks.MockConvertedPayload
payload.Block = newMockBlock(1010101)
_, err := repo.Publish(payload)
err := repo.Publish(payload)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveFirstBlockNumber()
Expect(err).ToNot(HaveOccurred())
@ -435,9 +435,9 @@ var _ = Describe("Retriever", func() {
payload1.Block = newMockBlock(1010101)
payload2 := payload1
payload2.Block = newMockBlock(5)
_, err := repo.Publish(payload1)
err := repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload2)
err = repo.Publish(payload2)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveFirstBlockNumber()
Expect(err).ToNot(HaveOccurred())
@ -451,7 +451,7 @@ var _ = Describe("Retriever", func() {
Expect(err).To(HaveOccurred())
})
It("Gets the number of the latest block that has data in the database", func() {
_, err := repo.Publish(mocks.MockConvertedPayload)
err := repo.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveLastBlockNumber()
Expect(err).ToNot(HaveOccurred())
@ -461,7 +461,7 @@ var _ = Describe("Retriever", func() {
It("Gets the number of the latest block that has data in the database", func() {
payload := mocks.MockConvertedPayload
payload.Block = newMockBlock(1010101)
_, err := repo.Publish(payload)
err := repo.Publish(payload)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveLastBlockNumber()
Expect(err).ToNot(HaveOccurred())
@ -473,9 +473,9 @@ var _ = Describe("Retriever", func() {
payload1.Block = newMockBlock(1010101)
payload2 := payload1
payload2.Block = newMockBlock(5)
_, err := repo.Publish(payload1)
err := repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload2)
err = repo.Publish(payload2)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveLastBlockNumber()
Expect(err).ToNot(HaveOccurred())
@ -492,13 +492,13 @@ var _ = Describe("Retriever", func() {
payload2.Block = newMockBlock(2)
payload3 := payload2
payload3.Block = newMockBlock(3)
_, err := repo.Publish(payload0)
err := repo.Publish(payload0)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload1)
err = repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload2)
err = repo.Publish(payload2)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload3)
err = repo.Publish(payload3)
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred())
@ -508,7 +508,7 @@ var _ = Describe("Retriever", func() {
It("Returns the gap from 0 to the earliest block", func() {
payload := mocks.MockConvertedPayload
payload.Block = newMockBlock(5)
_, err := repo.Publish(payload)
err := repo.Publish(payload)
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred())
@ -523,11 +523,11 @@ var _ = Describe("Retriever", func() {
payload1 := mocks.MockConvertedPayload
payload3 := payload1
payload3.Block = newMockBlock(3)
_, err := repo.Publish(payload0)
err := repo.Publish(payload0)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload1)
err = repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload3)
err = repo.Publish(payload3)
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred())
@ -541,9 +541,9 @@ var _ = Describe("Retriever", func() {
payload1.Block = newMockBlock(1010101)
payload2 := payload1
payload2.Block = newMockBlock(0)
_, err := repo.Publish(payload1)
err := repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload2)
err = repo.Publish(payload2)
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred())
@ -576,27 +576,27 @@ var _ = Describe("Retriever", func() {
payload11 := mocks.MockConvertedPayload
payload11.Block = newMockBlock(1000)
_, err := repo.Publish(payload1)
err := repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload2)
err = repo.Publish(payload2)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload3)
err = repo.Publish(payload3)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload4)
err = repo.Publish(payload4)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload5)
err = repo.Publish(payload5)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload6)
err = repo.Publish(payload6)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload7)
err = repo.Publish(payload7)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload8)
err = repo.Publish(payload8)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload9)
err = repo.Publish(payload9)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload10)
err = repo.Publish(payload10)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload11)
err = repo.Publish(payload11)
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1)
@ -640,33 +640,33 @@ var _ = Describe("Retriever", func() {
payload14 := mocks.MockConvertedPayload
payload14.Block = newMockBlock(1000)
_, err := repo.Publish(payload1)
err := repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload2)
err = repo.Publish(payload2)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload3)
err = repo.Publish(payload3)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload4)
err = repo.Publish(payload4)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload5)
err = repo.Publish(payload5)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload6)
err = repo.Publish(payload6)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload7)
err = repo.Publish(payload7)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload8)
err = repo.Publish(payload8)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload9)
err = repo.Publish(payload9)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload10)
err = repo.Publish(payload10)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload11)
err = repo.Publish(payload11)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload12)
err = repo.Publish(payload12)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload13)
err = repo.Publish(payload13)
Expect(err).ToNot(HaveOccurred())
_, err = repo.Publish(payload14)
err = repo.Publish(payload14)
Expect(err).ToNot(HaveOccurred())
cleaner := eth.NewCleaner(db)

View File

@ -18,6 +18,7 @@ package eth
import (
"fmt"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/statediff"
)
@ -66,4 +67,4 @@ func ChainConfig(chainID uint64) (*params.ChainConfig, error) {
default:
return nil, fmt.Errorf("chain config for chainid %d not available", chainID)
}
}
}

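The newly threaded-through chainID is what `ChainConfig` switches on. A one-line sketch, assuming it sits in the same `eth` package as the function above:

```go
// mainnetConfig is a sketch: resolve the go-ethereum chain config from the
// new chainID setting (1 is mainnet, matching the documented default).
func mainnetConfig() (*params.ChainConfig, error) {
	return ChainConfig(1)
}
```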
View File

@ -30,21 +30,21 @@ import (
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
// IPLDPGFetcher satisfies the IPLDFetcher interface for ethereum
// IPLDFetcher satisfies the IPLDFetcher interface for ethereum
// It interfaces directly with PG-IPFS
type IPLDPGFetcher struct {
type IPLDFetcher struct {
db *postgres.DB
}
// NewIPLDPGFetcher creates a pointer to a new IPLDPGFetcher
func NewIPLDPGFetcher(db *postgres.DB) *IPLDPGFetcher {
return &IPLDPGFetcher{
// NewIPLDFetcher creates a pointer to a new IPLDFetcher
func NewIPLDFetcher(db *postgres.DB) *IPLDFetcher {
return &IPLDFetcher{
db: db,
}
}
// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper
func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
cidWrapper, ok := cids.(*CIDWrapper)
if !ok {
return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids)
@ -100,7 +100,7 @@ func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error)
}
// FetchHeaders fetches headers
func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) {
func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) {
log.Debug("fetching header ipld")
headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
@ -113,7 +113,7 @@ func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel
}
// FetchUncles fetches uncles
func (f *IPLDPGFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.BlockModel, error) {
func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching uncle iplds")
uncleIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
@ -130,7 +130,7 @@ func (f *IPLDPGFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.Bloc
}
// FetchTrxs fetches transactions
func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) {
func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching transaction iplds")
trxIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
@ -147,7 +147,7 @@ func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockMode
}
// FetchRcts fetches receipts
func (f *IPLDPGFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.BlockModel, error) {
func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching receipt iplds")
rctIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
@ -164,7 +164,7 @@ func (f *IPLDPGFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.Bloc
}
// FetchState fetches state nodes
func (f *IPLDPGFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNode, error) {
func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNode, error) {
log.Debug("fetching state iplds")
stateNodes := make([]StateNode, 0, len(cids))
for _, stateNode := range cids {
@ -189,7 +189,7 @@ func (f *IPLDPGFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateN
}
// FetchStorage fetches storage nodes
func (f *IPLDPGFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) {
func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) {
log.Debug("fetching storage iplds")
storageNodes := make([]StorageNode, 0, len(cids))
for _, storageNode := range cids {

View File

@ -28,20 +28,20 @@ import (
var (
db *postgres.DB
pubAndIndexer *eth.IPLDPublisherAndIndexer
fetcher *eth.IPLDPGFetcher
pubAndIndexer *eth.IPLDPublisher
fetcher *eth.IPLDFetcher
)
var _ = Describe("IPLDPGFetcher", func() {
var _ = Describe("IPLDFetcher", func() {
Describe("Fetch", func() {
BeforeEach(func() {
var err error
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
pubAndIndexer = eth.NewIPLDPublisherAndIndexer(db)
_, err = pubAndIndexer.Publish(mocks.MockConvertedPayload)
pubAndIndexer = eth.NewIPLDPublisher(db)
err = pubAndIndexer.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
fetcher = eth.NewIPLDPGFetcher(db)
fetcher = eth.NewIPLDFetcher(db)
})
AfterEach(func() {
eth.TearDownDB(db)

View File

@ -1,215 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"errors"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
// IPLDPGFetcher satisfies the IPLDFetcher interface for ethereum
// It interfaces directly with PG-IPFS
type IPLDPGFetcher struct {
db *postgres.DB
}
// NewIPLDPGFetcher creates a pointer to a new IPLDPGFetcher
func NewIPLDPGFetcher(db *postgres.DB) *IPLDPGFetcher {
return &IPLDPGFetcher{
db: db,
}
}
// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper
func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
cidWrapper, ok := cids.(*CIDWrapper)
if !ok {
return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids)
}
log.Debug("fetching iplds")
iplds := IPLDs{}
iplds.TotalDifficulty, ok = new(big.Int).SetString(cidWrapper.Header.TotalDifficulty, 10)
if !ok {
return nil, errors.New("eth fetcher: unable to set total difficulty")
}
iplds.BlockNumber = cidWrapper.BlockNumber
tx, err := f.db.Beginx()
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
iplds.Header, err = f.FetchHeader(tx, cidWrapper.Header)
if err != nil {
return nil, fmt.Errorf("eth pg fetcher: header fetching error: %s", err.Error())
}
iplds.Uncles, err = f.FetchUncles(tx, cidWrapper.Uncles)
if err != nil {
return nil, fmt.Errorf("eth pg fetcher: uncle fetching error: %s", err.Error())
}
iplds.Transactions, err = f.FetchTrxs(tx, cidWrapper.Transactions)
if err != nil {
return nil, fmt.Errorf("eth pg fetcher: transaction fetching error: %s", err.Error())
}
iplds.Receipts, err = f.FetchRcts(tx, cidWrapper.Receipts)
if err != nil {
return nil, fmt.Errorf("eth pg fetcher: receipt fetching error: %s", err.Error())
}
iplds.StateNodes, err = f.FetchState(tx, cidWrapper.StateNodes)
if err != nil {
return nil, fmt.Errorf("eth pg fetcher: state fetching error: %s", err.Error())
}
iplds.StorageNodes, err = f.FetchStorage(tx, cidWrapper.StorageNodes)
if err != nil {
return nil, fmt.Errorf("eth pg fetcher: storage fetching error: %s", err.Error())
}
return iplds, err
}
// FetchHeaders fetches headers
func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) {
log.Debug("fetching header ipld")
headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
return ipfs.BlockModel{}, err
}
return ipfs.BlockModel{
Data: headerBytes,
CID: c.CID,
}, nil
}
// FetchUncles fetches uncles
func (f *IPLDPGFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching uncle iplds")
uncleIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
uncleBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
return nil, err
}
uncleIPLDs[i] = ipfs.BlockModel{
Data: uncleBytes,
CID: c.CID,
}
}
return uncleIPLDs, nil
}
// FetchTrxs fetches transactions
func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching transaction iplds")
trxIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
txBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
return nil, err
}
trxIPLDs[i] = ipfs.BlockModel{
Data: txBytes,
CID: c.CID,
}
}
return trxIPLDs, nil
}
// FetchRcts fetches receipts
func (f *IPLDPGFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching receipt iplds")
rctIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
rctBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
return nil, err
}
rctIPLDs[i] = ipfs.BlockModel{
Data: rctBytes,
CID: c.CID,
}
}
return rctIPLDs, nil
}
// FetchState fetches state nodes
func (f *IPLDPGFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNode, error) {
log.Debug("fetching state iplds")
stateNodes := make([]StateNode, 0, len(cids))
for _, stateNode := range cids {
if stateNode.CID == "" {
continue
}
stateBytes, err := shared.FetchIPLDByMhKey(tx, stateNode.MhKey)
if err != nil {
return nil, err
}
stateNodes = append(stateNodes, StateNode{
IPLD: ipfs.BlockModel{
Data: stateBytes,
CID: stateNode.CID,
},
StateLeafKey: common.HexToHash(stateNode.StateKey),
Type: ResolveToNodeType(stateNode.NodeType),
Path: stateNode.Path,
})
}
return stateNodes, nil
}
// FetchStorage fetches storage nodes
func (f *IPLDPGFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) {
log.Debug("fetching storage iplds")
storageNodes := make([]StorageNode, 0, len(cids))
for _, storageNode := range cids {
if storageNode.CID == "" || storageNode.StateKey == "" {
continue
}
storageBytes, err := shared.FetchIPLDByMhKey(tx, storageNode.MhKey)
if err != nil {
return nil, err
}
storageNodes = append(storageNodes, StorageNode{
IPLD: ipfs.BlockModel{
Data: storageBytes,
CID: storageNode.CID,
},
StateLeafKey: common.HexToHash(storageNode.StateKey),
StorageLeafKey: common.HexToHash(storageNode.StorageKey),
Type: ResolveToNodeType(storageNode.NodeType),
Path: storageNode.Path,
})
}
return storageNodes, nil
}

View File

@ -1,65 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package eth_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
var (
db *postgres.DB
pubAndIndexer *eth.IPLDPublisherAndIndexer
fetcher *eth.IPLDPGFetcher
)
var _ = Describe("IPLDPGFetcher", func() {
Describe("Fetch", func() {
BeforeEach(func() {
var err error
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
pubAndIndexer = eth.NewIPLDPublisherAndIndexer(db)
_, err = pubAndIndexer.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
fetcher = eth.NewIPLDPGFetcher(db)
})
AfterEach(func() {
eth.TearDownDB(db)
})
It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() {
i, err := fetcher.Fetch(mocks.MockCIDWrapper)
Expect(err).ToNot(HaveOccurred())
iplds, ok := i.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds.TotalDifficulty).To(Equal(mocks.MockConvertedPayload.TotalDifficulty))
Expect(iplds.BlockNumber).To(Equal(mocks.MockConvertedPayload.Block.Number()))
Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header))
Expect(len(iplds.Uncles)).To(Equal(0))
Expect(iplds.Transactions).To(Equal(mocks.MockIPLDs.Transactions))
Expect(iplds.Receipts).To(Equal(mocks.MockIPLDs.Receipts))
Expect(iplds.StateNodes).To(Equal(mocks.MockIPLDs.StateNodes))
Expect(iplds.StorageNodes).To(Equal(mocks.MockIPLDs.StorageNodes))
})
})
})

View File

@ -32,13 +32,13 @@ type IPLDPublisher struct {
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) error {
ipldPayload, ok := payload.(eth.ConvertedPayload)
if !ok {
return nil, fmt.Errorf("publish expected payload type %T got %T", &eth.ConvertedPayload{}, payload)
return fmt.Errorf("publish expected payload type %T got %T", &eth.ConvertedPayload{}, payload)
}
pub.PassedIPLDPayload = ipldPayload
return pub.ReturnCIDPayload, pub.ReturnErr
return pub.ReturnErr
}
// IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing
@ -50,16 +50,12 @@ type IterativeIPLDPublisher struct {
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IterativeIPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
func (pub *IterativeIPLDPublisher) Publish(payload shared.ConvertedData) error {
ipldPayload, ok := payload.(eth.ConvertedPayload)
if !ok {
return nil, fmt.Errorf("publish expected payload type %T got %T", &eth.ConvertedPayload{}, payload)
return fmt.Errorf("publish expected payload type %T got %T", &eth.ConvertedPayload{}, payload)
}
pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, ipldPayload)
if len(pub.ReturnCIDPayload) < pub.iteration+1 {
return nil, fmt.Errorf("IterativeIPLDPublisher does not have a payload to return at iteration %d", pub.iteration)
}
returnPayload := pub.ReturnCIDPayload[pub.iteration]
pub.iteration++
return returnPayload, pub.ReturnErr
return pub.ReturnErr
}

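On the test side, the mock publisher now mirrors the same error-only contract, so a consumer test only checks the captured payload and the stubbed error. A rough sketch, under the assumption that this mock lives in the eth mocks package used elsewhere in this diff:

```go
// Sketch: exercising the mock publisher's new signature in a Ginkgo test body.
pub := &mocks.IPLDPublisher{ReturnErr: nil}
err := pub.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
Expect(pub.PassedIPLDPayload).To(Equal(mocks.MockConvertedPayload))
```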
View File

@ -63,6 +63,7 @@ var (
AnotherAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593")
ContractAddress = crypto.CreateAddress(SenderAddr, MockTransactions[2].Nonce())
ContractHash = crypto.Keccak256Hash(ContractAddress.Bytes()).String()
MockContractByteCode = []byte{0, 1, 2, 3, 4, 5}
mockTopic11 = common.HexToHash("0x04")
mockTopic12 = common.HexToHash("0x06")
mockTopic21 = common.HexToHash("0x05")
@ -99,28 +100,34 @@ var (
StorageMhKey = shared.MultihashKeyFromCID(StorageCID)
MockTrxMeta = []eth.TxModel{
{
CID: "", // This is empty until we go to publish to ipfs
MhKey: "",
Src: SenderAddr.Hex(),
Dst: Address.String(),
Index: 0,
TxHash: MockTransactions[0].Hash().String(),
CID: "", // This is empty until we go to publish to ipfs
MhKey: "",
Src: SenderAddr.Hex(),
Dst: Address.String(),
Index: 0,
TxHash: MockTransactions[0].Hash().String(),
Data: []byte{},
Deployment: false,
},
{
CID: "",
MhKey: "",
Src: SenderAddr.Hex(),
Dst: AnotherAddress.String(),
Index: 1,
TxHash: MockTransactions[1].Hash().String(),
CID: "",
MhKey: "",
Src: SenderAddr.Hex(),
Dst: AnotherAddress.String(),
Index: 1,
TxHash: MockTransactions[1].Hash().String(),
Data: []byte{},
Deployment: false,
},
{
CID: "",
MhKey: "",
Src: SenderAddr.Hex(),
Dst: "",
Index: 2,
TxHash: MockTransactions[2].Hash().String(),
CID: "",
MhKey: "",
Src: SenderAddr.Hex(),
Dst: "",
Index: 2,
TxHash: MockTransactions[2].Hash().String(),
Data: MockContractByteCode,
Deployment: true,
},
}
MockTrxMetaPostPublsh = []eth.TxModel{
@ -532,7 +539,7 @@ func createTransactionsAndReceipts() (types.Transactions, types.Receipts, common
// make transactions
trx1 := types.NewTransaction(0, Address, big.NewInt(1000), 50, big.NewInt(100), []byte{})
trx2 := types.NewTransaction(1, AnotherAddress, big.NewInt(2000), 100, big.NewInt(200), []byte{})
trx3 := types.NewContractCreation(2, big.NewInt(1500), 75, big.NewInt(150), []byte{0, 1, 2, 3, 4, 5})
trx3 := types.NewContractCreation(2, big.NewInt(1500), 75, big.NewInt(150), MockContractByteCode)
transactionSigner := types.MakeSigner(params.MainnetChainConfig, new(big.Int).Set(BlockNumber))
mockCurve := elliptic.P256()
mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rand.Reader)

View File

@ -51,16 +51,16 @@ type UncleModel struct {
// TxModel is the db model for eth.transaction_cids
type TxModel struct {
ID int64 `db:"id"`
HeaderID int64 `db:"header_id"`
Index int64 `db:"index"`
TxHash string `db:"tx_hash"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
Dst string `db:"dst"`
Src string `db:"src"`
Data []byte `db:"data"`
Deployment bool `db:"deployment"`
ID int64 `db:"id"`
HeaderID int64 `db:"header_id"`
Index int64 `db:"index"`
TxHash string `db:"tx_hash"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
Dst string `db:"dst"`
Src string `db:"src"`
Data []byte `db:"data"`
Deployment bool `db:"deployment"`
}
// ReceiptModel is the db model for eth.receipt_cids

View File

@ -1,228 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
// IPLDPublisherAndIndexer satisfies the IPLDPublisher interface for ethereum
// It interfaces directly with the public.blocks table of PG-IPFS rather than going through an ipfs intermediary
// It publishes and indexes IPLDs together in a single sqlx.Tx
type IPLDPublisherAndIndexer struct {
indexer *CIDIndexer
}
// NewIPLDPublisherAndIndexer creates a pointer to a new IPLDPublisherAndIndexer which satisfies the IPLDPublisher interface
func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer {
return &IPLDPublisherAndIndexer{
indexer: NewCIDIndexer(db),
}
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
ipldPayload, ok := payload.(ConvertedPayload)
if !ok {
return nil, fmt.Errorf("eth IPLDPublisherAndIndexer expected payload type %T got %T", ConvertedPayload{}, payload)
}
// Generate the iplds
headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(ipldPayload.Block, ipldPayload.Receipts)
if err != nil {
return nil, err
}
// Begin new db tx
tx, err := pub.indexer.db.Beginx()
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// Publish trie nodes
for _, node := range txTrieNodes {
if err := shared.PublishIPLD(tx, node); err != nil {
return nil, err
}
}
for _, node := range rctTrieNodes {
if err := shared.PublishIPLD(tx, node); err != nil {
return nil, err
}
}
// Publish and index header
if err := shared.PublishIPLD(tx, headerNode); err != nil {
return nil, err
}
reward := CalcEthBlockReward(ipldPayload.Block.Header(), ipldPayload.Block.Uncles(), ipldPayload.Block.Transactions(), ipldPayload.Receipts)
header := HeaderModel{
CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: ipldPayload.Block.ParentHash().String(),
BlockNumber: ipldPayload.Block.Number().String(),
BlockHash: ipldPayload.Block.Hash().String(),
TotalDifficulty: ipldPayload.TotalDifficulty.String(),
Reward: reward.String(),
Bloom: ipldPayload.Block.Bloom().Bytes(),
StateRoot: ipldPayload.Block.Root().String(),
RctRoot: ipldPayload.Block.ReceiptHash().String(),
TxRoot: ipldPayload.Block.TxHash().String(),
UncleRoot: ipldPayload.Block.UncleHash().String(),
Timestamp: ipldPayload.Block.Time(),
}
headerID, err := pub.indexer.indexHeaderCID(tx, header)
if err != nil {
return nil, err
}
// Publish and index uncles
for _, uncleNode := range uncleNodes {
if err := shared.PublishIPLD(tx, uncleNode); err != nil {
return nil, err
}
uncleReward := CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncleNode.Number.Int64())
uncle := UncleModel{
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
}
if err := pub.indexer.indexUncleCID(tx, uncle, headerID); err != nil {
return nil, err
}
}
// Publish and index txs and receipts
for i, txNode := range txNodes {
if err := shared.PublishIPLD(tx, txNode); err != nil {
return nil, err
}
rctNode := rctNodes[i]
if err := shared.PublishIPLD(tx, rctNode); err != nil {
return nil, err
}
txModel := ipldPayload.TxMetaData[i]
txModel.CID = txNode.Cid().String()
txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid())
txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID)
if err != nil {
return nil, err
}
rctModel := ipldPayload.ReceiptMetaData[i]
rctModel.CID = rctNode.Cid().String()
rctModel.MhKey = shared.MultihashKeyFromCID(rctNode.Cid())
if err := pub.indexer.indexReceiptCID(tx, rctModel, txID); err != nil {
return nil, err
}
}
// Publish and index state and storage
err = pub.publishAndIndexStateAndStorage(tx, ipldPayload, headerID)
// This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer
return nil, err // return err variable explicitly so that we return the err = tx.Commit() assignment in the defer
}
func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error {
// Publish and index state and storage
for _, stateNode := range ipldPayload.StateNodes {
stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.Value)
if err != nil {
return err
}
mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr)
stateModel := StateNodeModel{
Path: stateNode.Path,
StateKey: stateNode.LeafKey.String(),
CID: stateCIDStr,
MhKey: mhKey,
NodeType: ResolveFromNodeType(stateNode.Type),
}
stateID, err := pub.indexer.indexStateCID(tx, stateModel, headerID)
if err != nil {
return err
}
// If we have a leaf, decode and index the account data and any associated storage diffs
if stateNode.Type == statediff.Leaf {
var i []interface{}
if err := rlp.DecodeBytes(stateNode.Value, &i); err != nil {
return err
}
if len(i) != 2 {
return fmt.Errorf("eth IPLDPublisherAndIndexer expected state leaf node rlp to decode into two elements")
}
var account state.Account
if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil {
return err
}
accountModel := StateAccountModel{
Balance: account.Balance.String(),
Nonce: account.Nonce,
CodeHash: account.CodeHash,
StorageRoot: account.Root.String(),
}
if err := pub.indexer.indexStateAccount(tx, accountModel, stateID); err != nil {
return err
}
for _, storageNode := range ipldPayload.StorageNodes[common.Bytes2Hex(stateNode.Path)] {
storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.Value)
if err != nil {
return err
}
mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr)
storageModel := StorageNodeModel{
Path: storageNode.Path,
StorageKey: storageNode.LeafKey.Hex(),
CID: storageCIDStr,
MhKey: mhKey,
NodeType: ResolveFromNodeType(storageNode.Type),
}
if err := pub.indexer.indexStorageCID(tx, storageModel, stateID); err != nil {
return err
}
}
}
}
return nil
}
// Index satisfies the shared.CIDIndexer interface
func (pub *IPLDPublisherAndIndexer) Index(cids shared.CIDsForIndexing) error {
return nil
}
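
For reference, here is a minimal standalone sketch of the commit/rollback pattern used in the Publish method above, assuming only the sqlx package; the withTx helper name and the example package are illustrative and not part of this repository.

package example

import "github.com/jmoiron/sqlx"

// withTx wraps fn in a transaction using the same defer-based pattern as
// Publish above: rollback on panic or error, otherwise commit and surface
// the commit error through the named return value.
func withTx(db *sqlx.DB, fn func(tx *sqlx.Tx) error) (err error) {
	tx, err := db.Beginx()
	if err != nil {
		return err
	}
	defer func() {
		if p := recover(); p != nil {
			tx.Rollback() // discard writes, then re-throw the panic
			panic(p)
		} else if err != nil {
			tx.Rollback() // any error from fn rolls the transaction back
		} else {
			err = tx.Commit() // commit and report its error, if any
		}
	}()
	err = fn(tx)
	return err
}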


@ -1,238 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package eth_test
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-ds-help"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
var _ = Describe("PublishAndIndexer", func() {
var (
db *postgres.DB
err error
repo *eth.IPLDPublisherAndIndexer
ipfsPgGet = `SELECT data FROM public.blocks
WHERE key = $1`
)
BeforeEach(func() {
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
repo = eth.NewIPLDPublisherAndIndexer(db)
})
AfterEach(func() {
eth.TearDownDB(db)
})
Describe("Publish", func() {
It("Published and indexes header IPLDs in a single tx", func() {
emptyReturn, err := repo.Publish(mocks.MockConvertedPayload)
Expect(emptyReturn).To(BeNil())
Expect(err).ToNot(HaveOccurred())
pgStr := `SELECT cid, td, reward, id
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
CID string
TD string
Reward string
ID int
}
header := new(res)
err = db.QueryRowx(pgStr, 1).StructScan(header)
Expect(err).ToNot(HaveOccurred())
Expect(header.CID).To(Equal(mocks.HeaderCID.String()))
Expect(header.TD).To(Equal(mocks.MockBlock.Difficulty().String()))
Expect(header.Reward).To(Equal("5000000000000000000"))
dc, err := cid.Decode(header.CID)
Expect(err).ToNot(HaveOccurred())
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
var data []byte
err = db.Get(&data, ipfsPgGet, prefixedKey)
Expect(err).ToNot(HaveOccurred())
Expect(data).To(Equal(mocks.MockHeaderRlp))
})
It("Publishes and indexes transaction IPLDs in a single tx", func() {
emptyReturn, err := repo.Publish(mocks.MockConvertedPayload)
Expect(emptyReturn).To(BeNil())
Expect(err).ToNot(HaveOccurred())
// check that txs were properly indexed
trxs := make([]string, 0)
pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1`
err = db.Select(&trxs, pgStr, 1)
Expect(err).ToNot(HaveOccurred())
Expect(len(trxs)).To(Equal(3))
Expect(shared.ListContainsString(trxs, mocks.Trx1CID.String())).To(BeTrue())
Expect(shared.ListContainsString(trxs, mocks.Trx2CID.String())).To(BeTrue())
Expect(shared.ListContainsString(trxs, mocks.Trx3CID.String())).To(BeTrue())
// and published
for _, c := range trxs {
dc, err := cid.Decode(c)
Expect(err).ToNot(HaveOccurred())
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
var data []byte
err = db.Get(&data, ipfsPgGet, prefixedKey)
Expect(err).ToNot(HaveOccurred())
switch c {
case mocks.Trx1CID.String():
Expect(data).To(Equal(mocks.MockTransactions.GetRlp(0)))
case mocks.Trx2CID.String():
Expect(data).To(Equal(mocks.MockTransactions.GetRlp(1)))
case mocks.Trx3CID.String():
Expect(data).To(Equal(mocks.MockTransactions.GetRlp(2)))
}
}
})
It("Publishes and indexes receipt IPLDs in a single tx", func() {
emptyReturn, err := repo.Publish(mocks.MockConvertedPayload)
Expect(emptyReturn).To(BeNil())
Expect(err).ToNot(HaveOccurred())
// check receipts were properly indexed
rcts := make([]string, 0)
pgStr := `SELECT receipt_cids.cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id
AND header_cids.block_number = $1`
err = db.Select(&rcts, pgStr, 1)
Expect(err).ToNot(HaveOccurred())
Expect(len(rcts)).To(Equal(3))
Expect(shared.ListContainsString(rcts, mocks.Rct1CID.String())).To(BeTrue())
Expect(shared.ListContainsString(rcts, mocks.Rct2CID.String())).To(BeTrue())
Expect(shared.ListContainsString(rcts, mocks.Rct3CID.String())).To(BeTrue())
// and published
for _, c := range rcts {
dc, err := cid.Decode(c)
Expect(err).ToNot(HaveOccurred())
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
var data []byte
err = db.Get(&data, ipfsPgGet, prefixedKey)
Expect(err).ToNot(HaveOccurred())
switch c {
case mocks.Rct1CID.String():
Expect(data).To(Equal(mocks.MockReceipts.GetRlp(0)))
case mocks.Rct2CID.String():
Expect(data).To(Equal(mocks.MockReceipts.GetRlp(1)))
case mocks.Rct3CID.String():
Expect(data).To(Equal(mocks.MockReceipts.GetRlp(2)))
}
}
})
It("Publishes and indexes state IPLDs in a single tx", func() {
emptyReturn, err := repo.Publish(mocks.MockConvertedPayload)
Expect(emptyReturn).To(BeNil())
Expect(err).ToNot(HaveOccurred())
// check that state nodes were properly indexed and published
stateNodes := make([]eth.StateNodeModel, 0)
pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1`
err = db.Select(&stateNodes, pgStr, 1)
Expect(err).ToNot(HaveOccurred())
Expect(len(stateNodes)).To(Equal(2))
for _, stateNode := range stateNodes {
var data []byte
dc, err := cid.Decode(stateNode.CID)
Expect(err).ToNot(HaveOccurred())
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
err = db.Get(&data, ipfsPgGet, prefixedKey)
Expect(err).ToNot(HaveOccurred())
pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1`
var account eth.StateAccountModel
err = db.Get(&account, pgStr, stateNode.ID)
Expect(err).ToNot(HaveOccurred())
if stateNode.CID == mocks.State1CID.String() {
Expect(stateNode.NodeType).To(Equal(2))
Expect(stateNode.StateKey).To(Equal(common.BytesToHash(mocks.ContractLeafKey).Hex()))
Expect(stateNode.Path).To(Equal([]byte{'\x06'}))
Expect(data).To(Equal(mocks.ContractLeafNode))
Expect(account).To(Equal(eth.StateAccountModel{
ID: account.ID,
StateID: stateNode.ID,
Balance: "0",
CodeHash: mocks.ContractCodeHash.Bytes(),
StorageRoot: mocks.ContractRoot,
Nonce: 1,
}))
}
if stateNode.CID == mocks.State2CID.String() {
Expect(stateNode.NodeType).To(Equal(2))
Expect(stateNode.StateKey).To(Equal(common.BytesToHash(mocks.AccountLeafKey).Hex()))
Expect(stateNode.Path).To(Equal([]byte{'\x0c'}))
Expect(data).To(Equal(mocks.AccountLeafNode))
Expect(account).To(Equal(eth.StateAccountModel{
ID: account.ID,
StateID: stateNode.ID,
Balance: "1000",
CodeHash: mocks.AccountCodeHash.Bytes(),
StorageRoot: mocks.AccountRoot,
Nonce: 0,
}))
}
}
pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1`
})
It("Publishes and indexes storage IPLDs in a single tx", func() {
emptyReturn, err := repo.Publish(mocks.MockConvertedPayload)
Expect(emptyReturn).To(BeNil())
Expect(err).ToNot(HaveOccurred())
// check that storage nodes were properly indexed
storageNodes := make([]eth.StorageNodeWithStateKeyModel, 0)
pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
FROM eth.storage_cids, eth.state_cids, eth.header_cids
WHERE storage_cids.state_id = state_cids.id
AND state_cids.header_id = header_cids.id
AND header_cids.block_number = $1`
err = db.Select(&storageNodes, pgStr, 1)
Expect(err).ToNot(HaveOccurred())
Expect(len(storageNodes)).To(Equal(1))
Expect(storageNodes[0]).To(Equal(eth.StorageNodeWithStateKeyModel{
CID: mocks.StorageCID.String(),
NodeType: 2,
StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(),
StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(),
Path: []byte{},
}))
var data []byte
dc, err := cid.Decode(storageNodes[0].CID)
Expect(err).ToNot(HaveOccurred())
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
err = db.Get(&data, ipfsPgGet, prefixedKey)
Expect(err).ToNot(HaveOccurred())
Expect(data).To(Equal(mocks.StorageLeafNode))
})
})
})
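
The deleted tests above verify publication by translating each CID into the key under which PG-IPFS stores data in public.blocks. Below is a small sketch of that derivation using the same go-cid, go-ipfs-blockstore, and go-ipfs-ds-help packages the tests imported; the helper name and package are illustrative.

package example

import (
	"github.com/ipfs/go-cid"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	dshelp "github.com/ipfs/go-ipfs-ds-help"
)

// blocksKeyForCID reproduces the key derivation used in the tests: decode the
// CID, convert its multihash to a datastore key, and prepend the blockstore
// prefix used for rows in public.blocks.
func blocksKeyForCID(cidStr string) (string, error) {
	dc, err := cid.Decode(cidStr)
	if err != nil {
		return "", err
	}
	mhKey := dshelp.MultihashToDsKey(dc.Hash())
	return blockstore.BlockPrefix.String() + mhKey.String(), nil
}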


@ -31,36 +31,36 @@ import (
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
// IPLDPublisherAndIndexer satisfies the IPLDPublisher interface for ethereum
// IPLDPublisher satisfies the IPLDPublisher interface for ethereum
// It interfaces directly with the public.blocks table of PG-IPFS rather than going through an ipfs intermediary
// It publishes and indexes IPLDs together in a single sqlx.Tx
type IPLDPublisherAndIndexer struct {
type IPLDPublisher struct {
indexer *CIDIndexer
}
// NewIPLDPublisherAndIndexer creates a pointer to a new IPLDPublisherAndIndexer which satisfies the IPLDPublisher interface
func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer {
return &IPLDPublisherAndIndexer{
// NewIPLDPublisher creates a pointer to a new IPLDPublisher which satisfies the IPLDPublisher interface
func NewIPLDPublisher(db *postgres.DB) *IPLDPublisher {
return &IPLDPublisher{
indexer: NewCIDIndexer(db),
}
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) error {
ipldPayload, ok := payload.(ConvertedPayload)
if !ok {
return nil, fmt.Errorf("eth IPLDPublisherAndIndexer expected payload type %T got %T", ConvertedPayload{}, payload)
return fmt.Errorf("eth IPLDPublisher expected payload type %T got %T", ConvertedPayload{}, payload)
}
// Generate the iplds
headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(ipldPayload.Block, ipldPayload.Receipts)
if err != nil {
return nil, err
return err
}
// Begin new db tx
tx, err := pub.indexer.db.Beginx()
if err != nil {
return nil, err
return err
}
defer func() {
if p := recover(); p != nil {
@ -76,18 +76,18 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
// Publish trie nodes
for _, node := range txTrieNodes {
if err := shared.PublishIPLD(tx, node); err != nil {
return nil, err
return err
}
}
for _, node := range rctTrieNodes {
if err := shared.PublishIPLD(tx, node); err != nil {
return nil, err
return err
}
}
// Publish and index header
if err := shared.PublishIPLD(tx, headerNode); err != nil {
return nil, err
return err
}
reward := CalcEthBlockReward(ipldPayload.Block.Header(), ipldPayload.Block.Uncles(), ipldPayload.Block.Transactions(), ipldPayload.Receipts)
header := HeaderModel{
@ -107,13 +107,13 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
}
headerID, err := pub.indexer.indexHeaderCID(tx, header)
if err != nil {
return nil, err
return err
}
// Publish and index uncles
for _, uncleNode := range uncleNodes {
if err := shared.PublishIPLD(tx, uncleNode); err != nil {
return nil, err
return err
}
uncleReward := CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncleNode.Number.Int64())
uncle := UncleModel{
@ -124,47 +124,47 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
Reward: uncleReward.String(),
}
if err := pub.indexer.indexUncleCID(tx, uncle, headerID); err != nil {
return nil, err
return err
}
}
// Publish and index txs and receipts
for i, txNode := range txNodes {
if err := shared.PublishIPLD(tx, txNode); err != nil {
return nil, err
return err
}
rctNode := rctNodes[i]
if err := shared.PublishIPLD(tx, rctNode); err != nil {
return nil, err
return err
}
txModel := ipldPayload.TxMetaData[i]
txModel.CID = txNode.Cid().String()
txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid())
txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID)
if err != nil {
return nil, err
return err
}
// If tx is a contract deployment, publish the data (code)
if txModel.Deployment {
if _, err = shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, txModel.Data); err != nil {
return nil, err
return err
}
}
rctModel := ipldPayload.ReceiptMetaData[i]
rctModel.CID = rctNode.Cid().String()
rctModel.MhKey = shared.MultihashKeyFromCID(rctNode.Cid())
if err := pub.indexer.indexReceiptCID(tx, rctModel, txID); err != nil {
return nil, err
return err
}
}
// Publish and index state and storage
err = pub.publishAndIndexStateAndStorage(tx, ipldPayload, headerID)
// This IPLDPublisher does both publishing and indexing, so we do not need to pass anything forward to the indexer
return nil, err // return err variable explicitly so that we return the err = tx.Commit() assignment in the defer
return err // return err variable explicitly so that we return the err = tx.Commit() assignment in the defer
}
func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error {
func (pub *IPLDPublisher) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error {
// Publish and index state and storage
for _, stateNode := range ipldPayload.StateNodes {
stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.Value)
@ -190,7 +190,7 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx,
return err
}
if len(i) != 2 {
return fmt.Errorf("eth IPLDPublisherAndIndexer expected state leaf node rlp to decode into two elements")
return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp to decode into two elements")
}
var account state.Account
if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil {
@ -226,8 +226,3 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx,
}
return nil
}
// Index satisfies the shared.CIDIndexer interface
func (pub *IPLDPublisherAndIndexer) Index(cids shared.CIDsForIndexing) error {
return nil
}
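
With this rename, Publish performs both publication and indexing and returns only an error. A hedged sketch of how a caller might use the renamed publisher follows; the publishBlock wrapper and the logging are illustrative, while eth.NewIPLDPublisher, postgres.DB, and shared.ConvertedData come from the hunks in this commit.

package example

import (
	log "github.com/sirupsen/logrus"

	"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
	"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
	"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)

// publishBlock publishes and indexes one converted payload; there is no
// longer a separate CID payload to hand off to an indexer.
func publishBlock(db *postgres.DB, payload shared.ConvertedData) error {
	publisher := eth.NewIPLDPublisher(db)
	if err := publisher.Publish(payload); err != nil {
		log.Errorf("eth publish error: %v", err)
		return err
	}
	return nil
}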


@ -34,14 +34,14 @@ var _ = Describe("PublishAndIndexer", func() {
var (
db *postgres.DB
err error
repo *eth.IPLDPublisherAndIndexer
repo *eth.IPLDPublisher
ipfsPgGet = `SELECT data FROM public.blocks
WHERE key = $1`
)
BeforeEach(func() {
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
repo = eth.NewIPLDPublisherAndIndexer(db)
repo = eth.NewIPLDPublisher(db)
})
AfterEach(func() {
eth.TearDownDB(db)
@ -49,8 +49,7 @@ var _ = Describe("PublishAndIndexer", func() {
Describe("Publish", func() {
It("Published and indexes header IPLDs in a single tx", func() {
emptyReturn, err := repo.Publish(mocks.MockConvertedPayload)
Expect(emptyReturn).To(BeNil())
err = repo.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
pgStr := `SELECT cid, td, reward, id
FROM eth.header_cids
@ -79,8 +78,7 @@ var _ = Describe("PublishAndIndexer", func() {
})
It("Publishes and indexes transaction IPLDs in a single tx", func() {
emptyReturn, err := repo.Publish(mocks.MockConvertedPayload)
Expect(emptyReturn).To(BeNil())
err = repo.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
// check that txs were properly indexed
trxs := make([]string, 0)
@ -113,8 +111,7 @@ var _ = Describe("PublishAndIndexer", func() {
})
It("Publishes and indexes receipt IPLDs in a single tx", func() {
emptyReturn, err := repo.Publish(mocks.MockConvertedPayload)
Expect(emptyReturn).To(BeNil())
err = repo.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
// check receipts were properly indexed
rcts := make([]string, 0)
@ -149,8 +146,7 @@ var _ = Describe("PublishAndIndexer", func() {
})
It("Publishes and indexes state IPLDs in a single tx", func() {
emptyReturn, err := repo.Publish(mocks.MockConvertedPayload)
Expect(emptyReturn).To(BeNil())
err = repo.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
// check that state nodes were properly indexed and published
stateNodes := make([]eth.StateNodeModel, 0)
@ -205,8 +201,7 @@ var _ = Describe("PublishAndIndexer", func() {
})
It("Publishes and indexes storage IPLDs in a single tx", func() {
emptyReturn, err := repo.Publish(mocks.MockConvertedPayload)
Expect(emptyReturn).To(BeNil())
err = repo.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
// check that storage nodes were properly indexed
storageNodes := make([]eth.StorageNodeWithStateKeyModel, 0)


@ -45,8 +45,6 @@ const (
// Config struct
type Config struct {
Chain shared.ChainType
IPFSPath string
IPFSMode shared.IPFSMode
DBConfig config.Database
DB *postgres.DB
@ -71,19 +69,7 @@ func NewConfig() (*Config, error) {
return nil, err
}
c.IPFSMode, err = shared.GetIPFSMode()
if err != nil {
return nil, err
}
if c.IPFSMode == shared.LocalInterface || c.IPFSMode == shared.RemoteClient {
c.IPFSPath, err = shared.GetIPFSPath()
if err != nil {
return nil, err
}
}
c.DBConfig.Init()
if err := c.init(); err != nil {
return nil, err
}


@ -40,8 +40,6 @@ type BackFillService struct {
Converter shared.PayloadConverter
// Interface for publishing the IPLD payloads to IPFS
Publisher shared.IPLDPublisher
// Interface for indexing the CIDs of the published IPLDs in Postgres
Indexer shared.CIDIndexer
// Interface for searching and retrieving CIDs from Postgres index
Retriever shared.CIDRetriever
// Interface for fetching payloads over at historical blocks; over http
@ -64,11 +62,7 @@ type BackFillService struct {
// NewBackFillService returns a new BackFillInterface
func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) {
publisher, err := builders.NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode)
if err != nil {
return nil, err
}
indexer, err := builders.NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode)
publisher, err := builders.NewIPLDPublisher(settings.Chain, settings.DB)
if err != nil {
return nil, err
}
@ -93,7 +87,6 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
batchNumber = shared.DefaultMaxBatchNumber
}
return &BackFillService{
Indexer: indexer,
Converter: converter,
Publisher: publisher,
Retriever: retriever,
@ -183,14 +176,10 @@ func (bfs *BackFillService) backFill(wg *sync.WaitGroup, id int, heightChan chan
default:
log.Debugf("%s backFill worker %d unable to forward converted payload to server; no channel ready to receive", bfs.chain.String(), id)
}
cidPayload, err := bfs.Publisher.Publish(ipldPayload)
if err != nil {
if err := bfs.Publisher.Publish(ipldPayload); err != nil {
log.Errorf("%s backFill worker %d publisher error: %s", bfs.chain.String(), id, err.Error())
continue
}
if err := bfs.Indexer.Index(cidPayload); err != nil {
log.Errorf("%s backFill worker %d indexer error: %s", bfs.chain.String(), id, err.Error())
}
}
log.Infof("%s backFill worker %d finished section from %d to %d", bfs.chain.String(), id, heights[0], heights[len(heights)-1])
case <-bfs.QuitChan:


@ -33,9 +33,6 @@ import (
var _ = Describe("BackFiller", func() {
Describe("FillGaps", func() {
It("Periodically checks for and fills in gaps in the watcher's data", func() {
mockCidRepo := &mocks.CIDIndexer{
ReturnErr: nil,
}
mockPublisher := &mocks.IterativeIPLDPublisher{
ReturnCIDPayload: []*eth.CIDPayload{mocks.MockCIDPayload, mocks.MockCIDPayload},
ReturnErr: nil,
@ -60,7 +57,6 @@ var _ = Describe("BackFiller", func() {
}
quitChan := make(chan bool, 1)
backfiller := &historical.BackFillService{
Indexer: mockCidRepo,
Publisher: mockPublisher,
Converter: mockConverter,
Fetcher: mockFetcher,
@ -74,9 +70,6 @@ var _ = Describe("BackFiller", func() {
backfiller.BackFill(wg)
time.Sleep(time.Second * 3)
quitChan <- true
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload))
Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2))
Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockConvertedPayload))
Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockConvertedPayload))
@ -89,9 +82,6 @@ var _ = Describe("BackFiller", func() {
})
It("Works for single block `ranges`", func() {
mockCidRepo := &mocks.CIDIndexer{
ReturnErr: nil,
}
mockPublisher := &mocks.IterativeIPLDPublisher{
ReturnCIDPayload: []*eth.CIDPayload{mocks.MockCIDPayload},
ReturnErr: nil,
@ -115,7 +105,6 @@ var _ = Describe("BackFiller", func() {
}
quitChan := make(chan bool, 1)
backfiller := &historical.BackFillService{
Indexer: mockCidRepo,
Publisher: mockPublisher,
Converter: mockConverter,
Fetcher: mockFetcher,
@ -129,8 +118,6 @@ var _ = Describe("BackFiller", func() {
backfiller.BackFill(wg)
time.Sleep(time.Second * 3)
quitChan <- true
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1))
Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(1))
Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockConvertedPayload))
Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(1))
@ -141,9 +128,6 @@ var _ = Describe("BackFiller", func() {
})
It("Finds beginning gap", func() {
mockCidRepo := &mocks.CIDIndexer{
ReturnErr: nil,
}
mockPublisher := &mocks.IterativeIPLDPublisher{
ReturnCIDPayload: []*eth.CIDPayload{mocks.MockCIDPayload, mocks.MockCIDPayload},
ReturnErr: nil,
@ -169,7 +153,6 @@ var _ = Describe("BackFiller", func() {
}
quitChan := make(chan bool, 1)
backfiller := &historical.BackFillService{
Indexer: mockCidRepo,
Publisher: mockPublisher,
Converter: mockConverter,
Fetcher: mockFetcher,
@ -183,9 +166,6 @@ var _ = Describe("BackFiller", func() {
backfiller.BackFill(wg)
time.Sleep(time.Second * 3)
quitChan <- true
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload))
Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2))
Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockConvertedPayload))
Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockConvertedPayload))


@ -1,90 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs
import (
"context"
"github.com/sirupsen/logrus"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/plugin/loader"
"github.com/ipfs/go-ipfs/repo/fsrepo"
ipld "github.com/ipfs/go-ipld-format"
)
// InitIPFSPlugins is used to initialize IPFS plugins before creating a new IPFS node
// This should only be called once
func InitIPFSPlugins() error {
logrus.Debug("initializing IPFS plugins")
l, err := loader.NewPluginLoader("")
if err != nil {
return err
}
err = l.Initialize()
if err != nil {
return err
}
return l.Inject()
}
// InitIPFSBlockService is used to configure and return a BlockService using an ipfs repo path (e.g. ~/.ipfs)
func InitIPFSBlockService(ipfsPath string) (blockservice.BlockService, error) {
logrus.Debug("initializing IPFS block service interface")
r, openErr := fsrepo.Open(ipfsPath)
if openErr != nil {
return nil, openErr
}
ctx := context.Background()
cfg := &core.BuildCfg{
Online: false,
Repo: r,
}
ipfsNode, newNodeErr := core.NewNode(ctx, cfg)
if newNodeErr != nil {
return nil, newNodeErr
}
return ipfsNode.Blocks, nil
}
type IPFS struct {
n *core.IpfsNode
ctx context.Context
}
func (ipfs IPFS) Add(node ipld.Node) error {
return ipfs.n.DAG.Add(ipfs.n.Context(), node)
}
func InitIPFSNode(repoPath string) (*IPFS, error) {
logrus.Debug("initializing IPFS node interface")
r, err := fsrepo.Open(repoPath)
if err != nil {
return nil, err
}
ctx := context.Background()
cfg := &core.BuildCfg{
Online: false,
Repo: r,
}
ipfsNode, err := core.NewNode(ctx, cfg)
if err != nil {
return nil, err
}
return &IPFS{n: ipfsNode, ctx: ctx}, nil
}
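
The deleted node interface above was the alternative to writing blocks straight into Postgres. Below is a hedged sketch of the direct path that remains after this commit, built on the shared.PublishIPLD helper seen in the publisher code; the publishDirect wrapper is illustrative and assumes the caller manages the sqlx.Tx.

package example

import (
	node "github.com/ipfs/go-ipld-format"
	"github.com/jmoiron/sqlx"

	"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)

// publishDirect writes an IPLD node into the public.blocks table within the
// caller's transaction and returns its CID string for indexing.
func publishDirect(tx *sqlx.Tx, n node.Node) (string, error) {
	if err := shared.PublishIPLD(tx, n); err != nil {
		return "", err
	}
	return n.Cid().String(), nil
}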


@ -1,50 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dag_putters
import (
"fmt"
"strings"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
)
var (
duplicateKeyErrorString = "pq: duplicate key value violates unique constraint"
)
type BtcHeaderDagPutter struct {
adder *ipfs.IPFS
}
func NewBtcHeaderDagPutter(adder *ipfs.IPFS) *BtcHeaderDagPutter {
return &BtcHeaderDagPutter{adder: adder}
}
func (bhdp *BtcHeaderDagPutter) DagPut(n node.Node) (string, error) {
header, ok := n.(*ipld.BtcHeader)
if !ok {
return "", fmt.Errorf("BtcHeaderDagPutter expected input type %T got %T", &ipld.BtcHeader{}, n)
}
if err := bhdp.adder.Add(header); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) {
return "", err
}
return header.Cid().String(), nil
}


@ -1,46 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dag_putters
import (
"fmt"
"strings"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
)
type BtcTxDagPutter struct {
adder *ipfs.IPFS
}
func NewBtcTxDagPutter(adder *ipfs.IPFS) *BtcTxDagPutter {
return &BtcTxDagPutter{adder: adder}
}
func (etdp *BtcTxDagPutter) DagPut(n node.Node) (string, error) {
transaction, ok := n.(*ipld.BtcTx)
if !ok {
return "", fmt.Errorf("BtcTxDagPutter expected input type %T got %T", &ipld.BtcTx{}, n)
}
if err := etdp.adder.Add(transaction); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) {
return "", err
}
return transaction.Cid().String(), nil
}


@ -1,46 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dag_putters
import (
"fmt"
"strings"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
)
type BtcTxTrieDagPutter struct {
adder *ipfs.IPFS
}
func NewBtcTxTrieDagPutter(adder *ipfs.IPFS) *BtcTxTrieDagPutter {
return &BtcTxTrieDagPutter{adder: adder}
}
func (etdp *BtcTxTrieDagPutter) DagPut(n node.Node) (string, error) {
txTrieNode, ok := n.(*ipld.BtcTxTrie)
if !ok {
return "", fmt.Errorf("BtcTxTrieDagPutter expected input type %T got %T", &ipld.BtcTxTrie{}, n)
}
if err := etdp.adder.Add(txTrieNode); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) {
return "", err
}
return txTrieNode.Cid().String(), nil
}


@ -1,46 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dag_putters
import (
"fmt"
"strings"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
)
type EthHeaderDagPutter struct {
adder *ipfs.IPFS
}
func NewEthBlockHeaderDagPutter(adder *ipfs.IPFS) *EthHeaderDagPutter {
return &EthHeaderDagPutter{adder: adder}
}
func (bhdp *EthHeaderDagPutter) DagPut(n node.Node) (string, error) {
header, ok := n.(*ipld.EthHeader)
if !ok {
return "", fmt.Errorf("EthHeaderDagPutter expected input type %T got %T", &ipld.EthHeader{}, n)
}
if err := bhdp.adder.Add(header); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) {
return "", err
}
return header.Cid().String(), nil
}


@ -1,46 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dag_putters
import (
"fmt"
"strings"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
)
type EthReceiptDagPutter struct {
adder *ipfs.IPFS
}
func NewEthReceiptDagPutter(adder *ipfs.IPFS) *EthReceiptDagPutter {
return &EthReceiptDagPutter{adder: adder}
}
func (erdp *EthReceiptDagPutter) DagPut(n node.Node) (string, error) {
receipt, ok := n.(*ipld.EthReceipt)
if !ok {
return "", fmt.Errorf("EthReceiptDagPutter expected input type %T got type %T", &ipld.EthReceipt{}, n)
}
if err := erdp.adder.Add(receipt); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) {
return "", err
}
return receipt.Cid().String(), nil
}


@ -1,46 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dag_putters
import (
"fmt"
"strings"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
)
type EthRctTrieDagPutter struct {
adder *ipfs.IPFS
}
func NewEthRctTrieDagPutter(adder *ipfs.IPFS) *EthRctTrieDagPutter {
return &EthRctTrieDagPutter{adder: adder}
}
func (etdp *EthRctTrieDagPutter) DagPut(n node.Node) (string, error) {
rctTrieNode, ok := n.(*ipld.EthRctTrie)
if !ok {
return "", fmt.Errorf("EthRctTrieDagPutter expected input type %T got %T", &ipld.EthRctTrie{}, n)
}
if err := etdp.adder.Add(rctTrieNode); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) {
return "", err
}
return rctTrieNode.Cid().String(), nil
}


@ -1,46 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dag_putters
import (
"fmt"
"strings"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
)
type EthStateDagPutter struct {
adder *ipfs.IPFS
}
func NewEthStateDagPutter(adder *ipfs.IPFS) *EthStateDagPutter {
return &EthStateDagPutter{adder: adder}
}
func (erdp *EthStateDagPutter) DagPut(n node.Node) (string, error) {
stateNode, ok := n.(*ipld.EthStateTrie)
if !ok {
return "", fmt.Errorf("EthStateDagPutter expected input type %T got %T", &ipld.EthStateTrie{}, n)
}
if err := erdp.adder.Add(stateNode); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) {
return "", err
}
return stateNode.Cid().String(), nil
}


@ -1,46 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dag_putters
import (
"fmt"
"strings"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
)
type EthStorageDagPutter struct {
adder *ipfs.IPFS
}
func NewEthStorageDagPutter(adder *ipfs.IPFS) *EthStorageDagPutter {
return &EthStorageDagPutter{adder: adder}
}
func (erdp *EthStorageDagPutter) DagPut(n node.Node) (string, error) {
storageNode, ok := n.(*ipld.EthStorageTrie)
if !ok {
return "", fmt.Errorf("EthStorageDagPutter expected input type %T got %T", &ipld.EthStorageTrie{}, n)
}
if err := erdp.adder.Add(storageNode); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) {
return "", err
}
return storageNode.Cid().String(), nil
}


@ -1,46 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dag_putters
import (
"fmt"
"strings"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
)
type EthTxsDagPutter struct {
adder *ipfs.IPFS
}
func NewEthTxsDagPutter(adder *ipfs.IPFS) *EthTxsDagPutter {
return &EthTxsDagPutter{adder: adder}
}
func (etdp *EthTxsDagPutter) DagPut(n node.Node) (string, error) {
transaction, ok := n.(*ipld.EthTx)
if !ok {
return "", fmt.Errorf("EthTxsDagPutter expected input type %T got %T", &ipld.EthTx{}, n)
}
if err := etdp.adder.Add(transaction); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) {
return "", err
}
return transaction.Cid().String(), nil
}


@ -1,46 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dag_putters
import (
"fmt"
"strings"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
)
type EthTxTrieDagPutter struct {
adder *ipfs.IPFS
}
func NewEthTxTrieDagPutter(adder *ipfs.IPFS) *EthTxTrieDagPutter {
return &EthTxTrieDagPutter{adder: adder}
}
func (etdp *EthTxTrieDagPutter) DagPut(n node.Node) (string, error) {
txTrieNode, ok := n.(*ipld.EthTxTrie)
if !ok {
return "", fmt.Errorf("EthTxTrieDagPutter expected input type %T got %T", &ipld.EthTxTrie{}, n)
}
if err := etdp.adder.Add(txTrieNode); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) {
return "", err
}
return txTrieNode.Cid().String(), nil
}


@ -1,26 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs
import (
ipld "github.com/ipfs/go-ipld-format"
)
// DagPutter is a general interface for a dag putter
type DagPutter interface {
DagPut(n ipld.Node) (string, error)
}


@ -1,86 +0,0 @@
package mocks
import (
"context"
"errors"
"github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-exchange-interface"
)
// MockIPFSBlockService is a mock for testing the ipfs fetcher
type MockIPFSBlockService struct {
Blocks map[cid.Cid]blocks.Block
}
// GetBlock is used to retrieve a block from the mock BlockService
func (bs *MockIPFSBlockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
if bs.Blocks == nil {
return nil, errors.New("BlockService has not been initialized")
}
blk, ok := bs.Blocks[c]
if ok {
return blk, nil
}
return nil, nil
}
// GetBlocks is used to retrieve a set of blocks from the mock BlockService
func (bs *MockIPFSBlockService) GetBlocks(ctx context.Context, cs []cid.Cid) <-chan blocks.Block {
if bs.Blocks == nil {
panic("BlockService has not been initialized")
}
blkChan := make(chan blocks.Block)
go func() {
for _, c := range cs {
blk, ok := bs.Blocks[c]
if ok {
blkChan <- blk
}
}
close(blkChan)
}()
return blkChan
}
// AddBlock adds a block to the mock BlockService
func (bs *MockIPFSBlockService) AddBlock(blk blocks.Block) error {
if bs.Blocks == nil {
bs.Blocks = make(map[cid.Cid]blocks.Block)
}
bs.Blocks[blk.Cid()] = blk
return nil
}
// AddBlocks adds a set of blocks to the mock BlockService
func (bs *MockIPFSBlockService) AddBlocks(blks []blocks.Block) error {
if bs.Blocks == nil {
bs.Blocks = make(map[cid.Cid]blocks.Block)
}
for _, block := range blks {
bs.Blocks[block.Cid()] = block
}
return nil
}
// Close is here to satisfy the interface
func (*MockIPFSBlockService) Close() error {
panic("implement me")
}
// Blockstore is here to satisfy the interface
func (*MockIPFSBlockService) Blockstore() blockstore.Blockstore {
panic("implement me")
}
// DeleteBlock is here to satisfy the interface
func (*MockIPFSBlockService) DeleteBlock(c cid.Cid) error {
panic("implement me")
}
// Exchange is here to satisfy the interface
func (*MockIPFSBlockService) Exchange() exchange.Interface {
panic("implement me")
}


@ -1,53 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"errors"
node "github.com/ipfs/go-ipld-format"
"github.com/ethereum/go-ethereum/common"
)
// DagPutter is a mock for testing the ipfs publisher
type DagPutter struct {
PassedNode node.Node
ErrToReturn error
}
// DagPut returns the pre-loaded CIDs or error
func (dp *DagPutter) DagPut(n node.Node) (string, error) {
dp.PassedNode = n
return n.Cid().String(), dp.ErrToReturn
}
// MappedDagPutter is a mock for testing the ipfs publisher
type MappedDagPutter struct {
CIDsToReturn map[common.Hash]string
PassedNode node.Node
ErrToReturn error
}
// DagPut returns the pre-loaded CIDs or error
func (mdp *MappedDagPutter) DagPut(n node.Node) (string, error) {
if mdp.CIDsToReturn == nil {
return "", errors.New("mapped dag putter needs to be initialized with a map of cids to return")
}
hash := common.BytesToHash(n.RawData())
return mdp.CIDsToReturn[hash], nil
}


@ -51,8 +51,6 @@ type Config struct {
// DB info
DB *postgres.DB
DBConfig config.Database
IPFSPath string
IPFSMode shared.IPFSMode
HTTPClient interface{} // Note this client is expected to support the retrieval of the specified data type(s)
NodeInfo node.Node // Info for the associated node
@ -91,16 +89,6 @@ func NewConfig() (*Config, error) {
c.ClearOldCache = viper.GetBool("resync.clearOldCache")
c.ResetValidation = viper.GetBool("resync.resetValidation")
c.IPFSMode, err = shared.GetIPFSMode()
if err != nil {
return nil, err
}
if c.IPFSMode == shared.LocalInterface || c.IPFSMode == shared.RemoteClient {
c.IPFSPath, err = shared.GetIPFSPath()
if err != nil {
return nil, err
}
}
resyncType := viper.GetString("resync.type")
c.ResyncType, err = shared.GenerateDataTypeFromString(resyncType)
if err != nil {


@ -35,8 +35,6 @@ type Service struct {
Converter shared.PayloadConverter
// Interface for publishing the IPLD payloads to IPFS
Publisher shared.IPLDPublisher
// Interface for indexing the CIDs of the published IPLDs in Postgres
Indexer shared.CIDIndexer
// Interface for searching and retrieving CIDs from Postgres index
Retriever shared.CIDRetriever
// Interface for fetching payloads over at historical blocks; over http
@ -63,11 +61,7 @@ type Service struct {
// NewResyncService creates and returns a resync service from the provided settings
func NewResyncService(settings *Config) (Resync, error) {
publisher, err := builders.NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode)
if err != nil {
return nil, err
}
indexer, err := builders.NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode)
publisher, err := builders.NewIPLDPublisher(settings.Chain, settings.DB)
if err != nil {
return nil, err
}
@ -96,7 +90,6 @@ func NewResyncService(settings *Config) (Resync, error) {
batchNumber = shared.DefaultMaxBatchNumber
}
return &Service{
Indexer: indexer,
Converter: converter,
Publisher: publisher,
Retriever: retriever,
@ -168,13 +161,9 @@ func (rs *Service) resync(id int, heightChan chan []uint64) {
if err != nil {
logrus.Errorf("%s resync worker %d converter error: %s", rs.chain.String(), id, err.Error())
}
cidPayload, err := rs.Publisher.Publish(ipldPayload)
if err != nil {
if err := rs.Publisher.Publish(ipldPayload); err != nil {
logrus.Errorf("%s resync worker %d publisher error: %s", rs.chain.String(), id, err.Error())
}
if err := rs.Indexer.Index(cidPayload); err != nil {
logrus.Errorf("%s resync worker %d indexer error: %s", rs.chain.String(), id, err.Error())
}
}
logrus.Infof("%s resync worker %d finished section from %d to %d", rs.chain.String(), id, heights[0], heights[len(heights)-1])
case <-rs.quitChan:


@ -17,9 +17,6 @@
package shared
import (
"os"
"path/filepath"
"github.com/ethereum/go-ethereum/rpc"
"github.com/btcsuite/btcd/rpcclient"
@ -29,8 +26,6 @@ import (
// Env variables
const (
IPFS_PATH = "IPFS_PATH"
IPFS_MODE = "IPFS_MODE"
HTTP_TIMEOUT = "HTTP_TIMEOUT"
ETH_WS_PATH = "ETH_WS_PATH"
@ -73,30 +68,6 @@ func GetEthNodeAndClient(path string) (node.Node, *rpc.Client, error) {
}, rpcClient, nil
}
// GetIPFSPath returns the ipfs path from the config or env variable
func GetIPFSPath() (string, error) {
viper.BindEnv("ipfs.path", IPFS_PATH)
ipfsPath := viper.GetString("ipfs.path")
if ipfsPath == "" {
home, err := os.UserHomeDir()
if err != nil {
return "", err
}
ipfsPath = filepath.Join(home, ".ipfs")
}
return ipfsPath, nil
}
// GetIPFSMode returns the ipfs mode of operation from the config or env variable
func GetIPFSMode() (IPFSMode, error) {
viper.BindEnv("ipfs.mode", IPFS_MODE)
ipfsMode := viper.GetString("ipfs.mode")
if ipfsMode == "" {
return DirectPostgres, nil
}
return NewIPFSMode(ipfsMode)
}
// GetBtcNodeAndClient returns btc node info from path url
func GetBtcNodeAndClient(path string) (node.Node, *rpcclient.ConnConfig) {
viper.BindEnv("bitcoin.nodeID", BTC_NODE_ID)


@ -37,12 +37,7 @@ type PayloadConverter interface {
// IPLDPublisher publishes IPLD payloads and returns a CID payload for indexing
type IPLDPublisher interface {
Publish(payload ConvertedData) (CIDsForIndexing, error)
}
// CIDIndexer indexes a CID payload in Postgres
type CIDIndexer interface {
Index(cids CIDsForIndexing) error
Publish(payload ConvertedData) error
}
// ResponseFilterer applies a filter to an IPLD payload to return a subscription response packet
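After this change the shared.IPLDPublisher interface has a single method. A minimal sketch of a type satisfying it, useful in tests, is shown below; the no-op implementation is illustrative only.

package example

import "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"

// noopPublisher is a stand-in implementation; real publishers write the
// IPLDs and index their CIDs inside a single Postgres transaction.
type noopPublisher struct{}

func (noopPublisher) Publish(payload shared.ConvertedData) error {
	return nil
}

// compile-time assertion that the single-method interface is satisfied
var _ shared.IPLDPublisher = noopPublisher{}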


@ -1,58 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package shared
import (
"errors"
"strings"
)
// IPFSMode enum for specifying how we want to interface and publish objects to IPFS
type IPFSMode int
const (
Unknown IPFSMode = iota
LocalInterface
RemoteClient
DirectPostgres
)
func (c IPFSMode) String() string {
switch c {
case LocalInterface:
return "Local"
case RemoteClient:
return "Remote"
case DirectPostgres:
return "Postgres"
default:
return ""
}
}
func NewIPFSMode(name string) (IPFSMode, error) {
switch strings.ToLower(name) {
case "local", "interface":
return LocalInterface, nil
case "remote", "client":
return RemoteClient, errors.New("remote IPFS client mode is not currently supported")
case "postgres", "direct":
return DirectPostgres, nil
default:
return Unknown, errors.New("unrecognized name for ipfs mode")
}
}


@ -53,8 +53,6 @@ const (
// Config struct
type Config struct {
Chain shared.ChainType
IPFSPath string
IPFSMode shared.IPFSMode
DBConfig config.Database
// Server fields
Serve bool
@ -96,19 +94,7 @@ func NewConfig() (*Config, error) {
return nil, err
}
c.IPFSMode, err = shared.GetIPFSMode()
if err != nil {
return nil, err
}
if c.IPFSMode == shared.LocalInterface || c.IPFSMode == shared.RemoteClient {
c.IPFSPath, err = shared.GetIPFSPath()
if err != nil {
return nil, err
}
}
c.DBConfig.Init()
c.Sync = viper.GetBool("watcher.sync")
if c.Sync {
workers := viper.GetInt("watcher.workers")


@ -66,10 +66,8 @@ type Service struct {
Streamer shared.PayloadStreamer
// Interface for converting raw payloads into IPLD object payloads
Converter shared.PayloadConverter
// Interface for publishing the IPLD payloads to IPFS
// Interface for publishing and indexing the PG-IPLD payloads
Publisher shared.IPLDPublisher
// Interface for indexing the CIDs of the published IPLDs in Postgres
Indexer shared.CIDIndexer
// Interface for filtering and serving data according to subscribed clients according to their specification
Filterer shared.ResponseFilterer
// Interface for fetching IPLD objects from IPFS
@ -86,12 +84,10 @@ type Service struct {
SubscriptionTypes map[common.Hash]shared.SubscriptionSettings
// Info for the Geth node that this watcher is working with
NodeInfo *node.Node
// Number of publishAndIndex workers
// Number of publish workers
WorkerPoolSize int
// chain type for this service
chain shared.ChainType
// Path to ipfs data dir
ipfsPath string
// Underlying db
db *postgres.DB
// wg for syncing serve processes
@ -112,11 +108,7 @@ func NewWatcher(settings *Config) (Watcher, error) {
if err != nil {
return nil, err
}
sn.Publisher, err = builders.NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.SyncDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
sn.Indexer, err = builders.NewCIDIndexer(settings.Chain, settings.SyncDBConn, settings.IPFSMode)
sn.Publisher, err = builders.NewIPLDPublisher(settings.Chain, settings.SyncDBConn)
if err != nil {
return nil, err
}
@ -131,7 +123,7 @@ func NewWatcher(settings *Config) (Watcher, error) {
if err != nil {
return nil, err
}
sn.IPLDFetcher, err = builders.NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.ServeDBConn, settings.IPFSMode)
sn.IPLDFetcher, err = builders.NewIPLDFetcher(settings.Chain, settings.ServeDBConn)
if err != nil {
return nil, err
}
@ -142,7 +134,6 @@ func NewWatcher(settings *Config) (Watcher, error) {
sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings)
sn.WorkerPoolSize = settings.Workers
sn.NodeInfo = &settings.NodeInfo
sn.ipfsPath = settings.IPFSPath
sn.chain = settings.Chain
return sn, nil
}
@ -190,7 +181,7 @@ func (sap *Service) APIs() []rpc.API {
}
// Sync streams incoming raw chain data and converts it for further processing
// It forwards the converted data to the publishAndIndex process(es) it spins up
// It forwards the converted data to the publish process(es) it spins up
// It forwards the converted data to a ScreenAndServe process if there is one listening on the passed screenAndServePayload channel
// This continues on no matter if or how many subscribers there are
func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error {
@ -198,11 +189,11 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
if err != nil {
return err
}
// spin up publishAndIndex worker goroutines
publishAndIndexPayload := make(chan shared.ConvertedData, PayloadChanBufferSize)
// spin up publish worker goroutines
publishPayload := make(chan shared.ConvertedData, PayloadChanBufferSize)
for i := 1; i <= sap.WorkerPoolSize; i++ {
go sap.publishAndIndex(wg, i, publishAndIndexPayload)
log.Debugf("%s publishAndIndex worker %d successfully spun up", sap.chain.String(), i)
go sap.publish(wg, i, publishPayload)
log.Debugf("%s publish worker %d successfully spun up", sap.chain.String(), i)
}
go func() {
wg.Add(1)
@ -221,13 +212,13 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
case screenAndServePayload <- ipldPayload:
default:
}
// Forward the payload to the publishAndIndex workers
// Forward the payload to the publish workers
// this channel acts as a ring buffer
select {
case publishAndIndexPayload <- ipldPayload:
case publishPayload <- ipldPayload:
default:
<-publishAndIndexPayload
publishAndIndexPayload <- ipldPayload
<-publishPayload
publishPayload <- ipldPayload
}
case err := <-sub.Err():
log.Errorf("watcher subscription error for chain %s: %v", sap.chain.String(), err)
@ -241,26 +232,21 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
return nil
}
// publishAndIndex is spun up by SyncAndConvert and receives converted chain data from that process
// publish is spun up by SyncAndConvert and receives converted chain data from that process
// it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres
func (sap *Service) publishAndIndex(wg *sync.WaitGroup, id int, publishAndIndexPayload <-chan shared.ConvertedData) {
func (sap *Service) publish(wg *sync.WaitGroup, id int, publishPayload <-chan shared.ConvertedData) {
wg.Add(1)
defer wg.Done()
for {
select {
case payload := <-publishAndIndexPayload:
log.Debugf("%s watcher publishAndIndex worker %d publishing data streamed at head height %d", sap.chain.String(), id, payload.Height())
cidPayload, err := sap.Publisher.Publish(payload)
if err != nil {
log.Errorf("%s watcher publishAndIndex worker %d publishing error: %v", sap.chain.String(), id, err)
case payload := <-publishPayload:
log.Debugf("%s watcher sync worker %d publishing and indexing data streamed at head height %d", sap.chain.String(), id, payload.Height())
if err := sap.Publisher.Publish(payload); err != nil {
log.Errorf("%s watcher publish worker %d publishing error: %v", sap.chain.String(), id, err)
continue
}
log.Debugf("%s watcher publishAndIndex worker %d indexing data streamed at head height %d", sap.chain.String(), id, payload.Height())
if err := sap.Indexer.Index(cidPayload); err != nil {
log.Errorf("%s watcher publishAndIndex worker %d indexing error: %v", sap.chain.String(), id, err)
}
case <-sap.QuitChan:
log.Infof("%s watcher publishAndIndex worker %d shutting down", sap.chain.String(), id)
log.Infof("%s watcher publish worker %d shutting down", sap.chain.String(), id)
return
}
}
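
The "ring buffer" send in the sync loop above drops the oldest queued payload when every publish worker is busy. A standalone sketch of that select pattern follows, with an interface{} payload type standing in for shared.ConvertedData.

package example

// ringBufferSend mirrors the select in the sync loop: enqueue if a slot is
// free, otherwise drop the oldest buffered payload and enqueue the newest.
func ringBufferSend(payloads chan interface{}, v interface{}) {
	select {
	case payloads <- v:
	default:
		<-payloads
		payloads <- v
	}
}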


@ -36,9 +36,6 @@ var _ = Describe("Service", func() {
wg := new(sync.WaitGroup)
payloadChan := make(chan shared.RawChainData, 1)
quitChan := make(chan bool, 1)
mockCidIndexer := &mocks.CIDIndexer{
ReturnErr: nil,
}
mockPublisher := &mocks.IPLDPublisher{
ReturnCIDPayload: mocks.MockCIDPayload,
ReturnErr: nil,
@ -55,7 +52,6 @@ var _ = Describe("Service", func() {
ReturnErr: nil,
}
processor := &watch.Service{
Indexer: mockCidIndexer,
Publisher: mockPublisher,
Streamer: mockStreamer,
Converter: mockConverter,
@ -69,8 +65,6 @@ var _ = Describe("Service", func() {
close(quitChan)
wg.Wait()
Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload))
Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1))
Expect(mockCidIndexer.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockConvertedPayload))
Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan))
})