decouple from sync

This commit is contained in:
Ian Norden 2020-08-31 10:47:06 -05:00
parent 558599dd32
commit 5830df44a4
25 changed files with 419 additions and 716 deletions

View File

@ -24,10 +24,10 @@ import (
"github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/rpcclient"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/btc"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
// NewResponseFilterer constructs a ResponseFilterer for the provided chain type // NewResponseFilterer constructs a ResponseFilterer for the provided chain type

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
// Client is used by watchers to stream chain IPLD data from a vulcanizedb ipfs-blockchain-watcher // Client is used by watchers to stream chain IPLD data from a vulcanizedb ipld-eth-server
package client package client
import ( import (
@ -22,10 +22,10 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch" "github.com/vulcanize/ipld-eth-server/pkg/watch"
) )
// Client is used to subscribe to the ipfs-blockchain-watcher ipld data stream // Client is used to subscribe to the ipld-eth-server ipld data stream
type Client struct { type Client struct {
c *rpc.Client c *rpc.Client
} }
@ -37,7 +37,7 @@ func NewClient(c *rpc.Client) *Client {
} }
} }
// Stream is the main loop for subscribing to iplds from an ipfs-blockchain-watcher server // Stream is the main loop for subscribing to iplds from an ipld-eth-server server
func (c *Client) Stream(payloadChan chan watch.SubscriptionPayload, rlpParams []byte) (*rpc.ClientSubscription, error) { func (c *Client) Stream(payloadChan chan watch.SubscriptionPayload, rlpParams []byte) (*rpc.ClientSubscription, error) {
return c.c.Subscribe(context.Background(), "vdb", payloadChan, "stream", rlpParams) return c.c.Subscribe(context.Background(), "vdb", payloadChan, "stream", rlpParams)
} }

View File

@ -20,7 +20,8 @@ import (
"context" "context"
"math/big" "math/big"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -127,7 +128,7 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery)
} }
start := startingBlock.Int64() start := startingBlock.Int64()
end := endingBlock.Int64() end := endingBlock.Int64()
allRctCIDs := make([]ReceiptModel, 0) allRctCIDs := make([]eth.ReceiptModel, 0)
for i := start; i <= end; i++ { for i := start; i <= end; i++ {
rctCIDs, err := pea.B.Retriever.RetrieveRctCIDs(tx, filter, i, nil, nil) rctCIDs, err := pea.B.Retriever.RetrieveRctCIDs(tx, filter, i, nil, nil)
if err != nil { if err != nil {
@ -181,7 +182,7 @@ func (pea *PublicEthAPI) GetBlockByHash(ctx context.Context, hash common.Hash, f
} }
// GetTransactionByHash returns the transaction for the given hash // GetTransactionByHash returns the transaction for the given hash
// eth ipfs-blockchain-watcher cannot currently handle pending/tx_pool txs // eth ipld-eth-server cannot currently handle pending/tx_pool txs
func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) { func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) {
// Try to return an already finalized transaction // Try to return an already finalized transaction
tx, blockHash, blockNumber, index, err := pea.B.GetTransaction(ctx, hash) tx, blockHash, blockNumber, index, err := pea.B.GetTransaction(ctx, hash)

View File

@ -21,19 +21,20 @@ import (
"strconv" "strconv"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
var ( var (
@ -85,7 +86,7 @@ var _ = Describe("API", func() {
db *postgres.DB db *postgres.DB
retriever *eth.CIDRetriever retriever *eth.CIDRetriever
fetcher *eth.IPLDFetcher fetcher *eth.IPLDFetcher
indexAndPublisher *eth.IPLDPublisher indexAndPublisher *eth2.IPLDPublisher
backend *eth.Backend backend *eth.Backend
api *eth.PublicEthAPI api *eth.PublicEthAPI
) )
@ -95,7 +96,7 @@ var _ = Describe("API", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
retriever = eth.NewCIDRetriever(db) retriever = eth.NewCIDRetriever(db)
fetcher = eth.NewIPLDFetcher(db) fetcher = eth.NewIPLDFetcher(db)
indexAndPublisher = eth.NewIPLDPublisher(db) indexAndPublisher = eth2.NewIPLDPublisher(db)
backend = &eth.Backend{ backend = &eth.Backend{
Retriever: retriever, Retriever: retriever,
Fetcher: fetcher, Fetcher: fetcher,

View File

@ -32,7 +32,7 @@ import (
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
var ( var (
@ -120,7 +120,7 @@ func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log
} }
// BlockByNumber returns the requested canonical block. // BlockByNumber returns the requested canonical block.
// Since the ipfs-blockchain-watcher database can contain forked blocks, it is recommended to fetch BlockByHash as // Since the ipld-eth-server database can contain forked blocks, it is recommended to fetch BlockByHash as
// fetching by number can return non-deterministic results (returns the first block found at that height) // fetching by number can return non-deterministic results (returns the first block found at that height)
func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber) (*types.Block, error) { func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber) (*types.Block, error) {
var err error var err error

View File

@ -27,11 +27,20 @@ import (
"github.com/lib/pq" "github.com/lib/pq"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipfs-blockchain-watcher/utils" "github.com/vulcanize/ipld-eth-server/pkg/shared"
"github.com/vulcanize/ipld-eth-server/utils"
) )
// Retriever interface for substituting mocks in tests
type Retriever interface {
RetrieveFirstBlockNumber() (int64, error)
RetrieveLastBlockNumber() (int64, error)
Retrieve(filter SubscriptionSettings, blockNumber int64) ([]eth2.CIDWrapper, bool, error)
}
// CIDRetriever satisfies the CIDRetriever interface for ethereum // CIDRetriever satisfies the CIDRetriever interface for ethereum
type CIDRetriever struct { type CIDRetriever struct {
db *postgres.DB db *postgres.DB
@ -59,11 +68,7 @@ func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
} }
// Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters // Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters
func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) { func (ecr *CIDRetriever) Retrieve(filter SubscriptionSettings, blockNumber int64) ([]eth2.CIDWrapper, bool, error) {
streamFilter, ok := filter.(*SubscriptionSettings)
if !ok {
return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &SubscriptionSettings{}, filter)
}
log.Debug("retrieving cids") log.Debug("retrieving cids")
// Begin new db tx // Begin new db tx
@ -88,15 +93,15 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
return nil, true, err return nil, true, err
} }
cws := make([]shared.CIDsForFetching, len(headers)) cws := make([]eth2.CIDWrapper, len(headers))
empty := true empty := true
for i, header := range headers { for i, header := range headers {
cw := new(CIDWrapper) cw := new(eth2.CIDWrapper)
cw.BlockNumber = big.NewInt(blockNumber) cw.BlockNumber = big.NewInt(blockNumber)
if !streamFilter.HeaderFilter.Off { if !filter.HeaderFilter.Off {
cw.Header = header cw.Header = header
empty = false empty = false
if streamFilter.HeaderFilter.Uncles { if filter.HeaderFilter.Uncles {
// Retrieve uncle cids for this header id // Retrieve uncle cids for this header id
uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, header.ID) uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, header.ID)
if err != nil { if err != nil {
@ -107,8 +112,8 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
} }
} }
// Retrieve cached trx CIDs // Retrieve cached trx CIDs
if !streamFilter.TxFilter.Off { if !filter.TxFilter.Off {
cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID) cw.Transactions, err = ecr.RetrieveTxCIDs(tx, filter.TxFilter, header.ID)
if err != nil { if err != nil {
log.Error("transaction cid retrieval error") log.Error("transaction cid retrieval error")
return nil, true, err return nil, true, err
@ -122,8 +127,8 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
trxIds[j] = tx.ID trxIds[j] = tx.ID
} }
// Retrieve cached receipt CIDs // Retrieve cached receipt CIDs
if !streamFilter.ReceiptFilter.Off { if !filter.ReceiptFilter.Off {
cw.Receipts, err = ecr.RetrieveRctCIDsByHeaderID(tx, streamFilter.ReceiptFilter, header.ID, trxIds) cw.Receipts, err = ecr.RetrieveRctCIDsByHeaderID(tx, filter.ReceiptFilter, header.ID, trxIds)
if err != nil { if err != nil {
log.Error("receipt cid retrieval error") log.Error("receipt cid retrieval error")
return nil, true, err return nil, true, err
@ -133,8 +138,8 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
} }
} }
// Retrieve cached state CIDs // Retrieve cached state CIDs
if !streamFilter.StateFilter.Off { if !filter.StateFilter.Off {
cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, streamFilter.StateFilter, header.ID) cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, filter.StateFilter, header.ID)
if err != nil { if err != nil {
log.Error("state cid retrieval error") log.Error("state cid retrieval error")
return nil, true, err return nil, true, err
@ -144,8 +149,8 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
} }
} }
// Retrieve cached storage CIDs // Retrieve cached storage CIDs
if !streamFilter.StorageFilter.Off { if !filter.StorageFilter.Off {
cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, streamFilter.StorageFilter, header.ID) cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, filter.StorageFilter, header.ID)
if err != nil { if err != nil {
log.Error("storage cid retrieval error") log.Error("storage cid retrieval error")
return nil, true, err return nil, true, err
@ -154,25 +159,25 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
empty = false empty = false
} }
} }
cws[i] = cw cws[i] = *cw
} }
return cws, empty, err return cws, empty, err
} }
// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight // RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) { func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]eth2.HeaderModel, error) {
log.Debug("retrieving header cids for block ", blockNumber) log.Debug("retrieving header cids for block ", blockNumber)
headers := make([]HeaderModel, 0) headers := make([]eth2.HeaderModel, 0)
pgStr := `SELECT * FROM eth.header_cids pgStr := `SELECT * FROM eth.header_cids
WHERE block_number = $1` WHERE block_number = $1`
return headers, tx.Select(&headers, pgStr, blockNumber) return headers, tx.Select(&headers, pgStr, blockNumber)
} }
// RetrieveUncleCIDsByHeaderID retrieves and returns all of the uncle cids for the provided header // RetrieveUncleCIDsByHeaderID retrieves and returns all of the uncle cids for the provided header
func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]UncleModel, error) { func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]eth2.UncleModel, error) {
log.Debug("retrieving uncle cids for block id ", headerID) log.Debug("retrieving uncle cids for block id ", headerID)
headers := make([]UncleModel, 0) headers := make([]eth2.UncleModel, 0)
pgStr := `SELECT * FROM eth.uncle_cids pgStr := `SELECT * FROM eth.uncle_cids
WHERE header_id = $1` WHERE header_id = $1`
return headers, tx.Select(&headers, pgStr, headerID) return headers, tx.Select(&headers, pgStr, headerID)
@ -180,10 +185,10 @@ func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64
// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters // RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
// also returns the ids for the returned transaction cids // also returns the ids for the returned transaction cids
func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) { func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]eth2.TxModel, error) {
log.Debug("retrieving transaction cids for header id ", headerID) log.Debug("retrieving transaction cids for header id ", headerID)
args := make([]interface{}, 0, 3) args := make([]interface{}, 0, 3)
results := make([]TxModel, 0) results := make([]eth2.TxModel, 0)
id := 1 id := 1
pgStr := fmt.Sprintf(`SELECT transaction_cids.id, transaction_cids.header_id, pgStr := fmt.Sprintf(`SELECT transaction_cids.id, transaction_cids.header_id,
transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.mh_key, transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.mh_key,
@ -208,7 +213,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID
// RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided // RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided
// filter parameters and correspond to the provided tx ids // filter parameters and correspond to the provided tx ids
func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]ReceiptModel, error) { func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]eth2.ReceiptModel, error) {
log.Debug("retrieving receipt cids for header id ", headerID) log.Debug("retrieving receipt cids for header id ", headerID)
args := make([]interface{}, 0, 4) args := make([]interface{}, 0, 4)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
@ -282,13 +287,13 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip
} }
} }
pgStr += ` ORDER BY transaction_cids.index` pgStr += ` ORDER BY transaction_cids.index`
receiptCids := make([]ReceiptModel, 0) receiptCids := make([]eth2.ReceiptModel, 0)
return receiptCids, tx.Select(&receiptCids, pgStr, args...) return receiptCids, tx.Select(&receiptCids, pgStr, args...)
} }
// RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided // RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided
// filter parameters and correspond to the provided tx ids // filter parameters and correspond to the provided tx ids
func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) { func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]eth2.ReceiptModel, error) {
log.Debug("retrieving receipt cids for block ", blockNumber) log.Debug("retrieving receipt cids for block ", blockNumber)
args := make([]interface{}, 0, 5) args := make([]interface{}, 0, 5)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
@ -370,7 +375,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, b
} }
} }
pgStr += ` ORDER BY transaction_cids.index` pgStr += ` ORDER BY transaction_cids.index`
receiptCids := make([]ReceiptModel, 0) receiptCids := make([]eth2.ReceiptModel, 0)
return receiptCids, tx.Select(&receiptCids, pgStr, args...) return receiptCids, tx.Select(&receiptCids, pgStr, args...)
} }
@ -384,7 +389,7 @@ func hasTopics(topics [][]string) bool {
} }
// RetrieveStateCIDs retrieves and returns all of the state node cids at the provided header ID that conform to the provided filter parameters // RetrieveStateCIDs retrieves and returns all of the state node cids at the provided header ID that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, headerID int64) ([]StateNodeModel, error) { func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, headerID int64) ([]eth2.StateNodeModel, error) {
log.Debug("retrieving state cids for header id ", headerID) log.Debug("retrieving state cids for header id ", headerID)
args := make([]interface{}, 0, 2) args := make([]interface{}, 0, 2)
pgStr := `SELECT state_cids.id, state_cids.header_id, pgStr := `SELECT state_cids.id, state_cids.header_id,
@ -404,12 +409,12 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter,
if !stateFilter.IntermediateNodes { if !stateFilter.IntermediateNodes {
pgStr += ` AND state_cids.node_type = 2` pgStr += ` AND state_cids.node_type = 2`
} }
stateNodeCIDs := make([]StateNodeModel, 0) stateNodeCIDs := make([]eth2.StateNodeModel, 0)
return stateNodeCIDs, tx.Select(&stateNodeCIDs, pgStr, args...) return stateNodeCIDs, tx.Select(&stateNodeCIDs, pgStr, args...)
} }
// RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided header id that conform to the provided filter parameters // RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided header id that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]StorageNodeWithStateKeyModel, error) { func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]eth2.StorageNodeWithStateKeyModel, error) {
log.Debug("retrieving storage cids for header id ", headerID) log.Debug("retrieving storage cids for header id ", headerID)
args := make([]interface{}, 0, 3) args := make([]interface{}, 0, 3)
pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key, storage_cids.node_type, pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key, storage_cids.node_type,
@ -437,23 +442,23 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageF
if !storageFilter.IntermediateNodes { if !storageFilter.IntermediateNodes {
pgStr += ` AND storage_cids.node_type = 2` pgStr += ` AND storage_cids.node_type = 2`
} }
storageNodeCIDs := make([]StorageNodeWithStateKeyModel, 0) storageNodeCIDs := make([]eth2.StorageNodeWithStateKeyModel, 0)
return storageNodeCIDs, tx.Select(&storageNodeCIDs, pgStr, args...) return storageNodeCIDs, tx.Select(&storageNodeCIDs, pgStr, args...)
} }
// RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db // RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db
// it finds the union of heights where no data exists and where the times_validated is lower than the validation level // it finds the union of heights where no data exists and where the times_validated is lower than the validation level
func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) { func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]eth2.DBGap, error) {
log.Info("searching for gaps in the eth ipfs watcher database") log.Info("searching for gaps in the eth ipfs watcher database")
startingBlock, err := ecr.RetrieveFirstBlockNumber() startingBlock, err := ecr.RetrieveFirstBlockNumber()
if err != nil { if err != nil {
return nil, fmt.Errorf("eth CIDRetriever RetrieveFirstBlockNumber error: %v", err) return nil, fmt.Errorf("eth CIDRetriever RetrieveFirstBlockNumber error: %v", err)
} }
var initialGap []shared.Gap var initialGap []eth2.DBGap
if startingBlock != 0 { if startingBlock != 0 {
stop := uint64(startingBlock - 1) stop := uint64(startingBlock - 1)
log.Infof("found gap at the beginning of the eth sync from 0 to %d", stop) log.Infof("found gap at the beginning of the eth sync from 0 to %d", stop)
initialGap = []shared.Gap{{ initialGap = []eth2.DBGap{{
Start: 0, Start: 0,
Stop: stop, Stop: stop,
}} }}
@ -471,9 +476,9 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
if err := ecr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows { if err := ecr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows {
return nil, err return nil, err
} }
emptyGaps := make([]shared.Gap, len(results)) emptyGaps := make([]eth2.DBGap, len(results))
for i, res := range results { for i, res := range results {
emptyGaps[i] = shared.Gap{ emptyGaps[i] = eth2.DBGap{
Start: res.Start, Start: res.Start,
Stop: res.Stop, Stop: res.Stop,
} }
@ -492,13 +497,13 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
} }
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []UncleModel, []TxModel, []ReceiptModel, error) { func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (eth2.HeaderModel, []eth2.UncleModel, []eth2.TxModel, []eth2.ReceiptModel, error) {
log.Debug("retrieving block cids for block hash ", blockHash.String()) log.Debug("retrieving block cids for block hash ", blockHash.String())
// Begin new db tx // Begin new db tx
tx, err := ecr.db.Beginx() tx, err := ecr.db.Beginx()
if err != nil { if err != nil {
return HeaderModel{}, nil, nil, nil, err return eth2.HeaderModel{}, nil, nil, nil, err
} }
defer func() { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
@ -514,17 +519,17 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel
headerCID, err := ecr.RetrieveHeaderCIDByHash(tx, blockHash) headerCID, err := ecr.RetrieveHeaderCIDByHash(tx, blockHash)
if err != nil { if err != nil {
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
return HeaderModel{}, nil, nil, nil, err return eth2.HeaderModel{}, nil, nil, nil, err
} }
uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.ID) uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.ID)
if err != nil { if err != nil {
log.Error("uncle cid retrieval error") log.Error("uncle cid retrieval error")
return HeaderModel{}, nil, nil, nil, err return eth2.HeaderModel{}, nil, nil, nil, err
} }
txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID) txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID)
if err != nil { if err != nil {
log.Error("tx cid retrieval error") log.Error("tx cid retrieval error")
return HeaderModel{}, nil, nil, nil, err return eth2.HeaderModel{}, nil, nil, nil, err
} }
txIDs := make([]int64, len(txCIDs)) txIDs := make([]int64, len(txCIDs))
for i, txCID := range txCIDs { for i, txCID := range txCIDs {
@ -538,13 +543,13 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel
} }
// RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number // RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number
func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []UncleModel, []TxModel, []ReceiptModel, error) { func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (eth2.HeaderModel, []eth2.UncleModel, []eth2.TxModel, []eth2.ReceiptModel, error) {
log.Debug("retrieving block cids for block number ", blockNumber) log.Debug("retrieving block cids for block number ", blockNumber)
// Begin new db tx // Begin new db tx
tx, err := ecr.db.Beginx() tx, err := ecr.db.Beginx()
if err != nil { if err != nil {
return HeaderModel{}, nil, nil, nil, err return eth2.HeaderModel{}, nil, nil, nil, err
} }
defer func() { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
@ -560,20 +565,20 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel,
headerCID, err := ecr.RetrieveHeaderCIDs(tx, blockNumber) headerCID, err := ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil { if err != nil {
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
return HeaderModel{}, nil, nil, nil, err return eth2.HeaderModel{}, nil, nil, nil, err
} }
if len(headerCID) < 1 { if len(headerCID) < 1 {
return HeaderModel{}, nil, nil, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber) return eth2.HeaderModel{}, nil, nil, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber)
} }
uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID[0].ID) uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID[0].ID)
if err != nil { if err != nil {
log.Error("uncle cid retrieval error") log.Error("uncle cid retrieval error")
return HeaderModel{}, nil, nil, nil, err return eth2.HeaderModel{}, nil, nil, nil, err
} }
txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID) txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID)
if err != nil { if err != nil {
log.Error("tx cid retrieval error") log.Error("tx cid retrieval error")
return HeaderModel{}, nil, nil, nil, err return eth2.HeaderModel{}, nil, nil, nil, err
} }
txIDs := make([]int64, len(txCIDs)) txIDs := make([]int64, len(txCIDs))
for i, txCID := range txCIDs { for i, txCID := range txCIDs {
@ -587,26 +592,26 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel,
} }
// RetrieveHeaderCIDByHash returns the header for the given block hash // RetrieveHeaderCIDByHash returns the header for the given block hash
func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) { func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (eth2.HeaderModel, error) {
log.Debug("retrieving header cids for block hash ", blockHash.String()) log.Debug("retrieving header cids for block hash ", blockHash.String())
pgStr := `SELECT * FROM eth.header_cids pgStr := `SELECT * FROM eth.header_cids
WHERE block_hash = $1` WHERE block_hash = $1`
var headerCID HeaderModel var headerCID eth2.HeaderModel
return headerCID, tx.Get(&headerCID, pgStr, blockHash.String()) return headerCID, tx.Get(&headerCID, pgStr, blockHash.String())
} }
// RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id // RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id
func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) { func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]eth2.TxModel, error) {
log.Debug("retrieving tx cids for block id ", headerID) log.Debug("retrieving tx cids for block id ", headerID)
pgStr := `SELECT * FROM eth.transaction_cids pgStr := `SELECT * FROM eth.transaction_cids
WHERE header_id = $1 WHERE header_id = $1
ORDER BY index` ORDER BY index`
var txCIDs []TxModel var txCIDs []eth2.TxModel
return txCIDs, tx.Select(&txCIDs, pgStr, headerID) return txCIDs, tx.Select(&txCIDs, pgStr, headerID)
} }
// RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs // RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs
func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]ReceiptModel, error) { func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]eth2.ReceiptModel, error) {
log.Debugf("retrieving receipt cids for tx ids %v", txIDs) log.Debugf("retrieving receipt cids for tx ids %v", txIDs)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s,
@ -615,6 +620,6 @@ func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64)
WHERE tx_id = ANY($1::INTEGER[]) WHERE tx_id = ANY($1::INTEGER[])
AND receipt_cids.tx_id = transaction_cids.id AND receipt_cids.tx_id = transaction_cids.id
ORDER BY transaction_cids.index` ORDER BY transaction_cids.index`
var rctCIDs []ReceiptModel var rctCIDs []eth2.ReceiptModel
return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs)) return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs))
} }

View File

@ -19,22 +19,21 @@ package eth_test
import ( import (
"math/big" "math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
var ( var (
openFilter = &eth.SubscriptionSettings{ openFilter = eth.SubscriptionSettings{
Start: big.NewInt(0), Start: big.NewInt(0),
End: big.NewInt(1), End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{}, HeaderFilter: eth.HeaderFilter{},
@ -43,7 +42,7 @@ var (
StateFilter: eth.StateFilter{}, StateFilter: eth.StateFilter{},
StorageFilter: eth.StorageFilter{}, StorageFilter: eth.StorageFilter{},
} }
rctAddressFilter = &eth.SubscriptionSettings{ rctAddressFilter = eth.SubscriptionSettings{
Start: big.NewInt(0), Start: big.NewInt(0),
End: big.NewInt(1), End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{ HeaderFilter: eth.HeaderFilter{
@ -62,7 +61,7 @@ var (
Off: true, Off: true,
}, },
} }
rctTopicsFilter = &eth.SubscriptionSettings{ rctTopicsFilter = eth.SubscriptionSettings{
Start: big.NewInt(0), Start: big.NewInt(0),
End: big.NewInt(1), End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{ HeaderFilter: eth.HeaderFilter{
@ -81,7 +80,7 @@ var (
Off: true, Off: true,
}, },
} }
rctTopicsAndAddressFilter = &eth.SubscriptionSettings{ rctTopicsAndAddressFilter = eth.SubscriptionSettings{
Start: big.NewInt(0), Start: big.NewInt(0),
End: big.NewInt(1), End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{ HeaderFilter: eth.HeaderFilter{
@ -104,7 +103,7 @@ var (
Off: true, Off: true,
}, },
} }
rctTopicsAndAddressFilterFail = &eth.SubscriptionSettings{ rctTopicsAndAddressFilterFail = eth.SubscriptionSettings{
Start: big.NewInt(0), Start: big.NewInt(0),
End: big.NewInt(1), End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{ HeaderFilter: eth.HeaderFilter{
@ -127,7 +126,7 @@ var (
Off: true, Off: true,
}, },
} }
rctAddressesAndTopicFilter = &eth.SubscriptionSettings{ rctAddressesAndTopicFilter = eth.SubscriptionSettings{
Start: big.NewInt(0), Start: big.NewInt(0),
End: big.NewInt(1), End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{ HeaderFilter: eth.HeaderFilter{
@ -147,7 +146,7 @@ var (
Off: true, Off: true,
}, },
} }
rctsForAllCollectedTrxs = &eth.SubscriptionSettings{ rctsForAllCollectedTrxs = eth.SubscriptionSettings{
Start: big.NewInt(0), Start: big.NewInt(0),
End: big.NewInt(1), End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{ HeaderFilter: eth.HeaderFilter{
@ -166,7 +165,7 @@ var (
Off: true, Off: true,
}, },
} }
rctsForSelectCollectedTrxs = &eth.SubscriptionSettings{ rctsForSelectCollectedTrxs = eth.SubscriptionSettings{
Start: big.NewInt(0), Start: big.NewInt(0),
End: big.NewInt(1), End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{ HeaderFilter: eth.HeaderFilter{
@ -187,7 +186,7 @@ var (
Off: true, Off: true,
}, },
} }
stateFilter = &eth.SubscriptionSettings{ stateFilter = eth.SubscriptionSettings{
Start: big.NewInt(0), Start: big.NewInt(0),
End: big.NewInt(1), End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{ HeaderFilter: eth.HeaderFilter{
@ -212,14 +211,14 @@ var _ = Describe("Retriever", func() {
var ( var (
db *postgres.DB db *postgres.DB
repo *eth2.IPLDPublisher repo *eth2.IPLDPublisher
retriever *eth2.CIDRetriever retriever *eth.CIDRetriever
) )
BeforeEach(func() { BeforeEach(func() {
var err error var err error
db, err = shared.SetupDB() db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
repo = eth2.NewIPLDPublisher(db) repo = eth2.NewIPLDPublisher(db)
retriever = eth2.NewCIDRetriever(db) retriever = eth.NewCIDRetriever(db)
}) })
AfterEach(func() { AfterEach(func() {
eth.TearDownDB(db) eth.TearDownDB(db)
@ -235,23 +234,21 @@ var _ = Describe("Retriever", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids)).To(Equal(1)) Expect(len(cids)).To(Equal(1))
cidWrapper, ok := cids[0].(*eth.CIDWrapper) Expect(cids[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(ok).To(BeTrue())
Expect(cidWrapper.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
expectedHeaderCID := mocks.MockCIDWrapper.Header expectedHeaderCID := mocks.MockCIDWrapper.Header
expectedHeaderCID.ID = cidWrapper.Header.ID expectedHeaderCID.ID = cids[0].Header.ID
expectedHeaderCID.NodeID = cidWrapper.Header.NodeID expectedHeaderCID.NodeID = cids[0].Header.NodeID
Expect(cidWrapper.Header).To(Equal(expectedHeaderCID)) Expect(cids[0].Header).To(Equal(expectedHeaderCID))
Expect(len(cidWrapper.Transactions)).To(Equal(3)) Expect(len(cids[0].Transactions)).To(Equal(3))
Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[0].CID)).To(BeTrue()) Expect(eth.TxModelsContainsCID(cids[0].Transactions, mocks.MockCIDWrapper.Transactions[0].CID)).To(BeTrue())
Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[1].CID)).To(BeTrue()) Expect(eth.TxModelsContainsCID(cids[0].Transactions, mocks.MockCIDWrapper.Transactions[1].CID)).To(BeTrue())
Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[2].CID)).To(BeTrue()) Expect(eth.TxModelsContainsCID(cids[0].Transactions, mocks.MockCIDWrapper.Transactions[2].CID)).To(BeTrue())
Expect(len(cidWrapper.Receipts)).To(Equal(3)) Expect(len(cids[0].Receipts)).To(Equal(3))
Expect(eth.ReceiptModelsContainsCID(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[0].CID)).To(BeTrue()) Expect(eth.ReceiptModelsContainsCID(cids[0].Receipts, mocks.MockCIDWrapper.Receipts[0].CID)).To(BeTrue())
Expect(eth.ReceiptModelsContainsCID(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[1].CID)).To(BeTrue()) Expect(eth.ReceiptModelsContainsCID(cids[0].Receipts, mocks.MockCIDWrapper.Receipts[1].CID)).To(BeTrue())
Expect(eth.ReceiptModelsContainsCID(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[2].CID)).To(BeTrue()) Expect(eth.ReceiptModelsContainsCID(cids[0].Receipts, mocks.MockCIDWrapper.Receipts[2].CID)).To(BeTrue())
Expect(len(cidWrapper.StateNodes)).To(Equal(2)) Expect(len(cids[0].StateNodes)).To(Equal(2))
for _, stateNode := range cidWrapper.StateNodes { for _, stateNode := range cids[0].StateNodes {
if stateNode.CID == mocks.State1CID.String() { if stateNode.CID == mocks.State1CID.String() {
Expect(stateNode.StateKey).To(Equal(common.BytesToHash(mocks.ContractLeafKey).Hex())) Expect(stateNode.StateKey).To(Equal(common.BytesToHash(mocks.ContractLeafKey).Hex()))
Expect(stateNode.NodeType).To(Equal(2)) Expect(stateNode.NodeType).To(Equal(2))
@ -263,11 +260,11 @@ var _ = Describe("Retriever", func() {
Expect(stateNode.Path).To(Equal([]byte{'\x0c'})) Expect(stateNode.Path).To(Equal([]byte{'\x0c'}))
} }
} }
Expect(len(cidWrapper.StorageNodes)).To(Equal(1)) Expect(len(cids[0].StorageNodes)).To(Equal(1))
expectedStorageNodeCIDs := mocks.MockCIDWrapper.StorageNodes expectedStorageNodeCIDs := mocks.MockCIDWrapper.StorageNodes
expectedStorageNodeCIDs[0].ID = cidWrapper.StorageNodes[0].ID expectedStorageNodeCIDs[0].ID = cids[0].StorageNodes[0].ID
expectedStorageNodeCIDs[0].StateID = cidWrapper.StorageNodes[0].StateID expectedStorageNodeCIDs[0].StateID = cids[0].StorageNodes[0].StateID
Expect(cidWrapper.StorageNodes).To(Equal(expectedStorageNodeCIDs)) Expect(cids[0].StorageNodes).To(Equal(expectedStorageNodeCIDs))
}) })
It("Applies filters from the provided config.Subscription", func() { It("Applies filters from the provided config.Subscription", func() {
@ -275,125 +272,111 @@ var _ = Describe("Retriever", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids1)).To(Equal(1)) Expect(len(cids1)).To(Equal(1))
cidWrapper1, ok := cids1[0].(*eth.CIDWrapper) Expect(cids1[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(ok).To(BeTrue()) Expect(cids1[0].Header).To(Equal(eth2.HeaderModel{}))
Expect(cidWrapper1.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cids1[0].Transactions)).To(Equal(0))
Expect(cidWrapper1.Header).To(Equal(eth.HeaderModel{})) Expect(len(cids1[0].StateNodes)).To(Equal(0))
Expect(len(cidWrapper1.Transactions)).To(Equal(0)) Expect(len(cids1[0].StorageNodes)).To(Equal(0))
Expect(len(cidWrapper1.StateNodes)).To(Equal(0)) Expect(len(cids1[0].Receipts)).To(Equal(1))
Expect(len(cidWrapper1.StorageNodes)).To(Equal(0))
Expect(len(cidWrapper1.Receipts)).To(Equal(1))
expectedReceiptCID := mocks.MockCIDWrapper.Receipts[0] expectedReceiptCID := mocks.MockCIDWrapper.Receipts[0]
expectedReceiptCID.ID = cidWrapper1.Receipts[0].ID expectedReceiptCID.ID = cids1[0].Receipts[0].ID
expectedReceiptCID.TxID = cidWrapper1.Receipts[0].TxID expectedReceiptCID.TxID = cids1[0].Receipts[0].TxID
Expect(cidWrapper1.Receipts[0]).To(Equal(expectedReceiptCID)) Expect(cids1[0].Receipts[0]).To(Equal(expectedReceiptCID))
cids2, empty, err := retriever.Retrieve(rctTopicsFilter, 1) cids2, empty, err := retriever.Retrieve(rctTopicsFilter, 1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids2)).To(Equal(1)) Expect(len(cids2)).To(Equal(1))
cidWrapper2, ok := cids2[0].(*eth.CIDWrapper) Expect(cids2[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(ok).To(BeTrue()) Expect(cids2[0].Header).To(Equal(eth2.HeaderModel{}))
Expect(cidWrapper2.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cids2[0].Transactions)).To(Equal(0))
Expect(cidWrapper2.Header).To(Equal(eth.HeaderModel{})) Expect(len(cids2[0].StateNodes)).To(Equal(0))
Expect(len(cidWrapper2.Transactions)).To(Equal(0)) Expect(len(cids2[0].StorageNodes)).To(Equal(0))
Expect(len(cidWrapper2.StateNodes)).To(Equal(0)) Expect(len(cids2[0].Receipts)).To(Equal(1))
Expect(len(cidWrapper2.StorageNodes)).To(Equal(0))
Expect(len(cidWrapper2.Receipts)).To(Equal(1))
expectedReceiptCID = mocks.MockCIDWrapper.Receipts[0] expectedReceiptCID = mocks.MockCIDWrapper.Receipts[0]
expectedReceiptCID.ID = cidWrapper2.Receipts[0].ID expectedReceiptCID.ID = cids2[0].Receipts[0].ID
expectedReceiptCID.TxID = cidWrapper2.Receipts[0].TxID expectedReceiptCID.TxID = cids2[0].Receipts[0].TxID
Expect(cidWrapper2.Receipts[0]).To(Equal(expectedReceiptCID)) Expect(cids2[0].Receipts[0]).To(Equal(expectedReceiptCID))
cids3, empty, err := retriever.Retrieve(rctTopicsAndAddressFilter, 1) cids3, empty, err := retriever.Retrieve(rctTopicsAndAddressFilter, 1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids3)).To(Equal(1)) Expect(len(cids3)).To(Equal(1))
cidWrapper3, ok := cids3[0].(*eth.CIDWrapper) Expect(cids3[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(ok).To(BeTrue()) Expect(cids3[0].Header).To(Equal(eth2.HeaderModel{}))
Expect(cidWrapper3.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cids3[0].Transactions)).To(Equal(0))
Expect(cidWrapper3.Header).To(Equal(eth.HeaderModel{})) Expect(len(cids3[0].StateNodes)).To(Equal(0))
Expect(len(cidWrapper3.Transactions)).To(Equal(0)) Expect(len(cids3[0].StorageNodes)).To(Equal(0))
Expect(len(cidWrapper3.StateNodes)).To(Equal(0)) Expect(len(cids3[0].Receipts)).To(Equal(1))
Expect(len(cidWrapper3.StorageNodes)).To(Equal(0))
Expect(len(cidWrapper3.Receipts)).To(Equal(1))
expectedReceiptCID = mocks.MockCIDWrapper.Receipts[0] expectedReceiptCID = mocks.MockCIDWrapper.Receipts[0]
expectedReceiptCID.ID = cidWrapper3.Receipts[0].ID expectedReceiptCID.ID = cids3[0].Receipts[0].ID
expectedReceiptCID.TxID = cidWrapper3.Receipts[0].TxID expectedReceiptCID.TxID = cids3[0].Receipts[0].TxID
Expect(cidWrapper3.Receipts[0]).To(Equal(expectedReceiptCID)) Expect(cids3[0].Receipts[0]).To(Equal(expectedReceiptCID))
cids4, empty, err := retriever.Retrieve(rctAddressesAndTopicFilter, 1) cids4, empty, err := retriever.Retrieve(rctAddressesAndTopicFilter, 1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids4)).To(Equal(1)) Expect(len(cids4)).To(Equal(1))
cidWrapper4, ok := cids4[0].(*eth.CIDWrapper) Expect(cids4[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(ok).To(BeTrue()) Expect(cids4[0].Header).To(Equal(eth2.HeaderModel{}))
Expect(cidWrapper4.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cids4[0].Transactions)).To(Equal(0))
Expect(cidWrapper4.Header).To(Equal(eth.HeaderModel{})) Expect(len(cids4[0].StateNodes)).To(Equal(0))
Expect(len(cidWrapper4.Transactions)).To(Equal(0)) Expect(len(cids4[0].StorageNodes)).To(Equal(0))
Expect(len(cidWrapper4.StateNodes)).To(Equal(0)) Expect(len(cids4[0].Receipts)).To(Equal(1))
Expect(len(cidWrapper4.StorageNodes)).To(Equal(0))
Expect(len(cidWrapper4.Receipts)).To(Equal(1))
expectedReceiptCID = mocks.MockCIDWrapper.Receipts[1] expectedReceiptCID = mocks.MockCIDWrapper.Receipts[1]
expectedReceiptCID.ID = cidWrapper4.Receipts[0].ID expectedReceiptCID.ID = cids4[0].Receipts[0].ID
expectedReceiptCID.TxID = cidWrapper4.Receipts[0].TxID expectedReceiptCID.TxID = cids4[0].Receipts[0].TxID
Expect(cidWrapper4.Receipts[0]).To(Equal(expectedReceiptCID)) Expect(cids4[0].Receipts[0]).To(Equal(expectedReceiptCID))
cids5, empty, err := retriever.Retrieve(rctsForAllCollectedTrxs, 1) cids5, empty, err := retriever.Retrieve(rctsForAllCollectedTrxs, 1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids5)).To(Equal(1)) Expect(len(cids5)).To(Equal(1))
cidWrapper5, ok := cids5[0].(*eth.CIDWrapper) Expect(cids5[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(ok).To(BeTrue()) Expect(cids5[0].Header).To(Equal(eth2.HeaderModel{}))
Expect(cidWrapper5.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cids5[0].Transactions)).To(Equal(3))
Expect(cidWrapper5.Header).To(Equal(eth.HeaderModel{})) Expect(eth.TxModelsContainsCID(cids5[0].Transactions, mocks.Trx1CID.String())).To(BeTrue())
Expect(len(cidWrapper5.Transactions)).To(Equal(3)) Expect(eth.TxModelsContainsCID(cids5[0].Transactions, mocks.Trx2CID.String())).To(BeTrue())
Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx1CID.String())).To(BeTrue()) Expect(eth.TxModelsContainsCID(cids5[0].Transactions, mocks.Trx3CID.String())).To(BeTrue())
Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx2CID.String())).To(BeTrue()) Expect(len(cids5[0].StateNodes)).To(Equal(0))
Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx3CID.String())).To(BeTrue()) Expect(len(cids5[0].StorageNodes)).To(Equal(0))
Expect(len(cidWrapper5.StateNodes)).To(Equal(0)) Expect(len(cids5[0].Receipts)).To(Equal(3))
Expect(len(cidWrapper5.StorageNodes)).To(Equal(0)) Expect(eth.ReceiptModelsContainsCID(cids5[0].Receipts, mocks.Rct1CID.String())).To(BeTrue())
Expect(len(cidWrapper5.Receipts)).To(Equal(3)) Expect(eth.ReceiptModelsContainsCID(cids5[0].Receipts, mocks.Rct2CID.String())).To(BeTrue())
Expect(eth.ReceiptModelsContainsCID(cidWrapper5.Receipts, mocks.Rct1CID.String())).To(BeTrue()) Expect(eth.ReceiptModelsContainsCID(cids5[0].Receipts, mocks.Rct3CID.String())).To(BeTrue())
Expect(eth.ReceiptModelsContainsCID(cidWrapper5.Receipts, mocks.Rct2CID.String())).To(BeTrue())
Expect(eth.ReceiptModelsContainsCID(cidWrapper5.Receipts, mocks.Rct3CID.String())).To(BeTrue())
cids6, empty, err := retriever.Retrieve(rctsForSelectCollectedTrxs, 1) cids6, empty, err := retriever.Retrieve(rctsForSelectCollectedTrxs, 1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids6)).To(Equal(1)) Expect(len(cids6)).To(Equal(1))
cidWrapper6, ok := cids6[0].(*eth.CIDWrapper) Expect(cids6[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(ok).To(BeTrue()) Expect(cids6[0].Header).To(Equal(eth2.HeaderModel{}))
Expect(cidWrapper6.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cids6[0].Transactions)).To(Equal(1))
Expect(cidWrapper6.Header).To(Equal(eth.HeaderModel{}))
Expect(len(cidWrapper6.Transactions)).To(Equal(1))
expectedTxCID := mocks.MockCIDWrapper.Transactions[1] expectedTxCID := mocks.MockCIDWrapper.Transactions[1]
expectedTxCID.ID = cidWrapper6.Transactions[0].ID expectedTxCID.ID = cids6[0].Transactions[0].ID
expectedTxCID.HeaderID = cidWrapper6.Transactions[0].HeaderID expectedTxCID.HeaderID = cids6[0].Transactions[0].HeaderID
Expect(cidWrapper6.Transactions[0]).To(Equal(expectedTxCID)) Expect(cids6[0].Transactions[0]).To(Equal(expectedTxCID))
Expect(len(cidWrapper6.StateNodes)).To(Equal(0)) Expect(len(cids6[0].StateNodes)).To(Equal(0))
Expect(len(cidWrapper6.StorageNodes)).To(Equal(0)) Expect(len(cids6[0].StorageNodes)).To(Equal(0))
Expect(len(cidWrapper6.Receipts)).To(Equal(1)) Expect(len(cids6[0].Receipts)).To(Equal(1))
expectedReceiptCID = mocks.MockCIDWrapper.Receipts[1] expectedReceiptCID = mocks.MockCIDWrapper.Receipts[1]
expectedReceiptCID.ID = cidWrapper6.Receipts[0].ID expectedReceiptCID.ID = cids6[0].Receipts[0].ID
expectedReceiptCID.TxID = cidWrapper6.Receipts[0].TxID expectedReceiptCID.TxID = cids6[0].Receipts[0].TxID
Expect(cidWrapper6.Receipts[0]).To(Equal(expectedReceiptCID)) Expect(cids6[0].Receipts[0]).To(Equal(expectedReceiptCID))
cids7, empty, err := retriever.Retrieve(stateFilter, 1) cids7, empty, err := retriever.Retrieve(stateFilter, 1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids7)).To(Equal(1)) Expect(len(cids7)).To(Equal(1))
cidWrapper7, ok := cids7[0].(*eth.CIDWrapper) Expect(cids7[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(ok).To(BeTrue()) Expect(cids7[0].Header).To(Equal(eth2.HeaderModel{}))
Expect(cidWrapper7.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cids7[0].Transactions)).To(Equal(0))
Expect(cidWrapper7.Header).To(Equal(eth.HeaderModel{})) Expect(len(cids7[0].Receipts)).To(Equal(0))
Expect(len(cidWrapper7.Transactions)).To(Equal(0)) Expect(len(cids7[0].StorageNodes)).To(Equal(0))
Expect(len(cidWrapper7.Receipts)).To(Equal(0)) Expect(len(cids7[0].StateNodes)).To(Equal(1))
Expect(len(cidWrapper7.StorageNodes)).To(Equal(0)) Expect(cids7[0].StateNodes[0]).To(Equal(eth2.StateNodeModel{
Expect(len(cidWrapper7.StateNodes)).To(Equal(1)) ID: cids7[0].StateNodes[0].ID,
Expect(cidWrapper7.StateNodes[0]).To(Equal(eth.StateNodeModel{ HeaderID: cids7[0].StateNodes[0].HeaderID,
ID: cidWrapper7.StateNodes[0].ID,
HeaderID: cidWrapper7.StateNodes[0].HeaderID,
NodeType: 2, NodeType: 2,
StateKey: common.BytesToHash(mocks.AccountLeafKey).Hex(), StateKey: common.BytesToHash(mocks.AccountLeafKey).Hex(),
CID: mocks.State2CID.String(), CID: mocks.State2CID.String(),
@ -602,11 +585,11 @@ var _ = Describe("Retriever", func() {
gaps, err := retriever.RetrieveGapsInData(1) gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(5)) Expect(len(gaps)).To(Equal(5))
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 0, Stop: 0})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 0, Stop: 0})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 2, Stop: 4})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 2, Stop: 4})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 6, Stop: 99})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 107, Stop: 999})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 1001, Stop: 1010100})).To(BeTrue())
}) })
It("Finds validation level gaps", func() { It("Finds validation level gaps", func() {
@ -669,21 +652,21 @@ var _ = Describe("Retriever", func() {
err = repo.Publish(payload14) err = repo.Publish(payload14)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
cleaner := eth.NewCleaner(db) cleaner := eth2.NewDBCleaner(db)
err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}, {106, 108}}) err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}, {106, 108}})
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1) gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(8)) Expect(len(gaps)).To(Equal(8))
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 0, Stop: 0})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 0, Stop: 0})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 2, Stop: 4})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 2, Stop: 4})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 6, Stop: 99})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 101, Stop: 102})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 104, Stop: 104})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 106, Stop: 108})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 106, Stop: 108})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 110, Stop: 999})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 110, Stop: 999})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 1001, Stop: 1010100})).To(BeTrue())
}) })
}) })
}) })

View File

@ -27,7 +27,7 @@ import (
func TestETHWatcher(t *testing.T) { func TestETHWatcher(t *testing.T) {
RegisterFailHandler(Fail) RegisterFailHandler(Fail)
RunSpecs(t, "ETH IPFS Watcher Suite Test") RunSpecs(t, "eth ipld server eth suite test")
} }
var _ = BeforeSuite(func() { var _ = BeforeSuite(func() {

View File

@ -18,7 +18,6 @@ package eth
import ( import (
"bytes" "bytes"
"fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -27,11 +26,16 @@ import (
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
) )
// Filterer interface for substituing mocks in tests
type Filterer interface {
Filter(filter SubscriptionSettings, payload eth.ConvertedPayload) (*eth.IPLDs, error)
}
// ResponseFilterer satisfies the ResponseFilterer interface for ethereum // ResponseFilterer satisfies the ResponseFilterer interface for ethereum
type ResponseFilterer struct{} type ResponseFilterer struct{}
@ -41,42 +45,34 @@ func NewResponseFilterer() *ResponseFilterer {
} }
// Filter is used to filter through eth data to extract and package requested data into a Payload // Filter is used to filter through eth data to extract and package requested data into a Payload
func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.ConvertedData) (shared.IPLDs, error) { func (s *ResponseFilterer) Filter(filter SubscriptionSettings, payload eth.ConvertedPayload) (*eth.IPLDs, error) {
ethFilters, ok := filter.(*SubscriptionSettings) if checkRange(filter.Start.Int64(), filter.End.Int64(), payload.Block.Number().Int64()) {
if !ok { response := new(eth.IPLDs)
return IPLDs{}, fmt.Errorf("eth filterer expected filter type %T got %T", &SubscriptionSettings{}, filter) response.TotalDifficulty = payload.TotalDifficulty
if err := s.filterHeaders(filter.HeaderFilter, response, payload); err != nil {
return nil, err
} }
ethPayload, ok := payload.(ConvertedPayload) txHashes, err := s.filterTransactions(filter.TxFilter, response, payload)
if !ok {
return IPLDs{}, fmt.Errorf("eth filterer expected payload type %T got %T", ConvertedPayload{}, payload)
}
if checkRange(ethFilters.Start.Int64(), ethFilters.End.Int64(), ethPayload.Block.Number().Int64()) {
response := new(IPLDs)
response.TotalDifficulty = ethPayload.TotalDifficulty
if err := s.filterHeaders(ethFilters.HeaderFilter, response, ethPayload); err != nil {
return IPLDs{}, err
}
txHashes, err := s.filterTransactions(ethFilters.TxFilter, response, ethPayload)
if err != nil { if err != nil {
return IPLDs{}, err return nil, err
} }
var filterTxs []common.Hash var filterTxs []common.Hash
if ethFilters.ReceiptFilter.MatchTxs { if filter.ReceiptFilter.MatchTxs {
filterTxs = txHashes filterTxs = txHashes
} }
if err := s.filerReceipts(ethFilters.ReceiptFilter, response, ethPayload, filterTxs); err != nil { if err := s.filerReceipts(filter.ReceiptFilter, response, payload, filterTxs); err != nil {
return IPLDs{}, err return nil, err
} }
if err := s.filterStateAndStorage(ethFilters.StateFilter, ethFilters.StorageFilter, response, ethPayload); err != nil { if err := s.filterStateAndStorage(filter.StateFilter, filter.StorageFilter, response, payload); err != nil {
return IPLDs{}, err return nil, err
} }
response.BlockNumber = ethPayload.Block.Number() response.BlockNumber = payload.Block.Number()
return *response, nil return response, nil
} }
return IPLDs{}, nil return nil, nil
} }
func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload ConvertedPayload) error { func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *eth.IPLDs, payload eth.ConvertedPayload) error {
if !headerFilter.Off { if !headerFilter.Off {
headerRLP, err := rlp.EncodeToBytes(payload.Block.Header()) headerRLP, err := rlp.EncodeToBytes(payload.Block.Header())
if err != nil { if err != nil {
@ -118,7 +114,7 @@ func checkRange(start, end, actual int64) bool {
return false return false
} }
func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload ConvertedPayload) ([]common.Hash, error) { func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *eth.IPLDs, payload eth.ConvertedPayload) ([]common.Hash, error) {
var trxHashes []common.Hash var trxHashes []common.Hash
if !trxFilter.Off { if !trxFilter.Off {
trxLen := len(payload.Block.Body().Transactions) trxLen := len(payload.Block.Body().Transactions)
@ -166,7 +162,7 @@ func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst s
return false return false
} }
func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload ConvertedPayload, trxHashes []common.Hash) error { func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *eth.IPLDs, payload eth.ConvertedPayload, trxHashes []common.Hash) error {
if !receiptFilter.Off { if !receiptFilter.Off {
response.Receipts = make([]ipfs.BlockModel, 0, len(payload.Receipts)) response.Receipts = make([]ipfs.BlockModel, 0, len(payload.Receipts))
for i, receipt := range payload.Receipts { for i, receipt := range payload.Receipts {
@ -256,9 +252,9 @@ func slicesShareString(slice1, slice2 []string) int {
} }
// filterStateAndStorage filters state and storage nodes into the response according to the provided filters // filterStateAndStorage filters state and storage nodes into the response according to the provided filters
func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storageFilter StorageFilter, response *IPLDs, payload ConvertedPayload) error { func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storageFilter StorageFilter, response *eth.IPLDs, payload eth.ConvertedPayload) error {
response.StateNodes = make([]StateNode, 0, len(payload.StateNodes)) response.StateNodes = make([]eth.StateNode, 0, len(payload.StateNodes))
response.StorageNodes = make([]StorageNode, 0) response.StorageNodes = make([]eth.StorageNode, 0)
stateAddressFilters := make([]common.Hash, len(stateFilter.Addresses)) stateAddressFilters := make([]common.Hash, len(stateFilter.Addresses))
for i, addr := range stateFilter.Addresses { for i, addr := range stateFilter.Addresses {
stateAddressFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()) stateAddressFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes())
@ -278,7 +274,7 @@ func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storag
if err != nil { if err != nil {
return err return err
} }
response.StateNodes = append(response.StateNodes, StateNode{ response.StateNodes = append(response.StateNodes, eth.StateNode{
StateLeafKey: stateNode.LeafKey, StateLeafKey: stateNode.LeafKey,
Path: stateNode.Path, Path: stateNode.Path,
IPLD: ipfs.BlockModel{ IPLD: ipfs.BlockModel{
@ -296,7 +292,7 @@ func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storag
if err != nil { if err != nil {
return err return err
} }
response.StorageNodes = append(response.StorageNodes, StorageNode{ response.StorageNodes = append(response.StorageNodes, eth.StorageNode{
StateLeafKey: stateNode.LeafKey, StateLeafKey: stateNode.LeafKey,
StorageLeafKey: storageNode.LeafKey, StorageLeafKey: storageNode.LeafKey,
IPLD: ipfs.BlockModel{ IPLD: ipfs.BlockModel{

View File

@ -23,10 +23,11 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
var ( var (
@ -40,10 +41,9 @@ var _ = Describe("Filterer", func() {
}) })
It("Transcribes all the data from the IPLDPayload into the StreamPayload if given an open filter", func() { It("Transcribes all the data from the IPLDPayload into the StreamPayload if given an open filter", func() {
payload, err := filterer.Filter(openFilter, mocks.MockConvertedPayload) iplds, err := filterer.Filter(openFilter, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
iplds, ok := payload.(eth.IPLDs) Expect(iplds).ToNot(BeNil())
Expect(ok).To(BeTrue())
Expect(iplds.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header)) Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header))
var expectedEmptyUncles []ipfs.BlockModel var expectedEmptyUncles []ipfs.BlockModel
@ -76,10 +76,9 @@ var _ = Describe("Filterer", func() {
}) })
It("Applies filters from the provided config.Subscription", func() { It("Applies filters from the provided config.Subscription", func() {
payload1, err := filterer.Filter(rctAddressFilter, mocks.MockConvertedPayload) iplds1, err := filterer.Filter(rctAddressFilter, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
iplds1, ok := payload1.(eth.IPLDs) Expect(iplds1).ToNot(BeNil())
Expect(ok).To(BeTrue())
Expect(iplds1.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds1.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(iplds1.Header).To(Equal(ipfs.BlockModel{})) Expect(iplds1.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds1.Uncles)).To(Equal(0)) Expect(len(iplds1.Uncles)).To(Equal(0))
@ -92,10 +91,9 @@ var _ = Describe("Filterer", func() {
CID: mocks.Rct1IPLD.Cid().String(), CID: mocks.Rct1IPLD.Cid().String(),
})) }))
payload2, err := filterer.Filter(rctTopicsFilter, mocks.MockConvertedPayload) iplds2, err := filterer.Filter(rctTopicsFilter, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
iplds2, ok := payload2.(eth.IPLDs) Expect(iplds2).ToNot(BeNil())
Expect(ok).To(BeTrue())
Expect(iplds2.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds2.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(iplds2.Header).To(Equal(ipfs.BlockModel{})) Expect(iplds2.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds2.Uncles)).To(Equal(0)) Expect(len(iplds2.Uncles)).To(Equal(0))
@ -108,10 +106,9 @@ var _ = Describe("Filterer", func() {
CID: mocks.Rct1IPLD.Cid().String(), CID: mocks.Rct1IPLD.Cid().String(),
})) }))
payload3, err := filterer.Filter(rctTopicsAndAddressFilter, mocks.MockConvertedPayload) iplds3, err := filterer.Filter(rctTopicsAndAddressFilter, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
iplds3, ok := payload3.(eth.IPLDs) Expect(iplds3).ToNot(BeNil())
Expect(ok).To(BeTrue())
Expect(iplds3.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds3.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(iplds3.Header).To(Equal(ipfs.BlockModel{})) Expect(iplds3.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds3.Uncles)).To(Equal(0)) Expect(len(iplds3.Uncles)).To(Equal(0))
@ -124,10 +121,9 @@ var _ = Describe("Filterer", func() {
CID: mocks.Rct1IPLD.Cid().String(), CID: mocks.Rct1IPLD.Cid().String(),
})) }))
payload4, err := filterer.Filter(rctAddressesAndTopicFilter, mocks.MockConvertedPayload) iplds4, err := filterer.Filter(rctAddressesAndTopicFilter, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
iplds4, ok := payload4.(eth.IPLDs) Expect(iplds4).ToNot(BeNil())
Expect(ok).To(BeTrue())
Expect(iplds4.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds4.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(iplds4.Header).To(Equal(ipfs.BlockModel{})) Expect(iplds4.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds4.Uncles)).To(Equal(0)) Expect(len(iplds4.Uncles)).To(Equal(0))
@ -140,10 +136,9 @@ var _ = Describe("Filterer", func() {
CID: mocks.Rct2IPLD.Cid().String(), CID: mocks.Rct2IPLD.Cid().String(),
})) }))
payload5, err := filterer.Filter(rctsForAllCollectedTrxs, mocks.MockConvertedPayload) iplds5, err := filterer.Filter(rctsForAllCollectedTrxs, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
iplds5, ok := payload5.(eth.IPLDs) Expect(iplds5).ToNot(BeNil())
Expect(ok).To(BeTrue())
Expect(iplds5.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds5.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(iplds5.Header).To(Equal(ipfs.BlockModel{})) Expect(iplds5.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds5.Uncles)).To(Equal(0)) Expect(len(iplds5.Uncles)).To(Equal(0))
@ -158,10 +153,9 @@ var _ = Describe("Filterer", func() {
Expect(shared.IPLDsContainBytes(iplds5.Receipts, mocks.MockReceipts.GetRlp(1))).To(BeTrue()) Expect(shared.IPLDsContainBytes(iplds5.Receipts, mocks.MockReceipts.GetRlp(1))).To(BeTrue())
Expect(shared.IPLDsContainBytes(iplds5.Receipts, mocks.MockReceipts.GetRlp(2))).To(BeTrue()) Expect(shared.IPLDsContainBytes(iplds5.Receipts, mocks.MockReceipts.GetRlp(2))).To(BeTrue())
payload6, err := filterer.Filter(rctsForSelectCollectedTrxs, mocks.MockConvertedPayload) iplds6, err := filterer.Filter(rctsForSelectCollectedTrxs, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
iplds6, ok := payload6.(eth.IPLDs) Expect(iplds6).ToNot(BeNil())
Expect(ok).To(BeTrue())
Expect(iplds6.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds6.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(iplds6.Header).To(Equal(ipfs.BlockModel{})) Expect(iplds6.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds6.Uncles)).To(Equal(0)) Expect(len(iplds6.Uncles)).To(Equal(0))
@ -175,10 +169,9 @@ var _ = Describe("Filterer", func() {
CID: mocks.Rct2IPLD.Cid().String(), CID: mocks.Rct2IPLD.Cid().String(),
})) }))
payload7, err := filterer.Filter(stateFilter, mocks.MockConvertedPayload) iplds7, err := filterer.Filter(stateFilter, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
iplds7, ok := payload7.(eth.IPLDs) Expect(iplds7).ToNot(BeNil())
Expect(ok).To(BeTrue())
Expect(iplds7.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds7.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(iplds7.Header).To(Equal(ipfs.BlockModel{})) Expect(iplds7.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds7.Uncles)).To(Equal(0)) Expect(len(iplds7.Uncles)).To(Equal(0))
@ -192,10 +185,9 @@ var _ = Describe("Filterer", func() {
CID: mocks.State2IPLD.Cid().String(), CID: mocks.State2IPLD.Cid().String(),
})) }))
payload8, err := filterer.Filter(rctTopicsAndAddressFilterFail, mocks.MockConvertedPayload) iplds8, err := filterer.Filter(rctTopicsAndAddressFilterFail, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
iplds8, ok := payload8.(eth.IPLDs) Expect(iplds8).ToNot(BeNil())
Expect(ok).To(BeTrue())
Expect(iplds8.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds8.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(iplds8.Header).To(Equal(ipfs.BlockModel{})) Expect(iplds8.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds8.Uncles)).To(Equal(0)) Expect(len(iplds8.Uncles)).To(Equal(0))

View File

@ -25,11 +25,18 @@ import (
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
// Fetcher interface for substituting mocks in tests
type Fetcher interface {
Fetch(cids eth.CIDWrapper) (*eth.IPLDs, error)
}
// IPLDFetcher satisfies the IPLDFetcher interface for ethereum // IPLDFetcher satisfies the IPLDFetcher interface for ethereum
// It interfaces directly with PG-IPFS // It interfaces directly with PG-IPFS
type IPLDFetcher struct { type IPLDFetcher struct {
@ -44,18 +51,15 @@ func NewIPLDFetcher(db *postgres.DB) *IPLDFetcher {
} }
// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper // Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper
func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { func (f *IPLDFetcher) Fetch(cids eth.CIDWrapper) (*eth.IPLDs, error) {
cidWrapper, ok := cids.(*CIDWrapper)
if !ok {
return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids)
}
log.Debug("fetching iplds") log.Debug("fetching iplds")
iplds := IPLDs{} iplds := new(eth.IPLDs)
iplds.TotalDifficulty, ok = new(big.Int).SetString(cidWrapper.Header.TotalDifficulty, 10) var ok bool
iplds.TotalDifficulty, ok = new(big.Int).SetString(cids.Header.TotalDifficulty, 10)
if !ok { if !ok {
return nil, errors.New("eth fetcher: unable to set total difficulty") return nil, errors.New("eth fetcher: unable to set total difficulty")
} }
iplds.BlockNumber = cidWrapper.BlockNumber iplds.BlockNumber = cids.BlockNumber
tx, err := f.db.Beginx() tx, err := f.db.Beginx()
if err != nil { if err != nil {
@ -72,27 +76,27 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
} }
}() }()
iplds.Header, err = f.FetchHeader(tx, cidWrapper.Header) iplds.Header, err = f.FetchHeader(tx, cids.Header)
if err != nil { if err != nil {
return nil, fmt.Errorf("eth pg fetcher: header fetching error: %s", err.Error()) return nil, fmt.Errorf("eth pg fetcher: header fetching error: %s", err.Error())
} }
iplds.Uncles, err = f.FetchUncles(tx, cidWrapper.Uncles) iplds.Uncles, err = f.FetchUncles(tx, cids.Uncles)
if err != nil { if err != nil {
return nil, fmt.Errorf("eth pg fetcher: uncle fetching error: %s", err.Error()) return nil, fmt.Errorf("eth pg fetcher: uncle fetching error: %s", err.Error())
} }
iplds.Transactions, err = f.FetchTrxs(tx, cidWrapper.Transactions) iplds.Transactions, err = f.FetchTrxs(tx, cids.Transactions)
if err != nil { if err != nil {
return nil, fmt.Errorf("eth pg fetcher: transaction fetching error: %s", err.Error()) return nil, fmt.Errorf("eth pg fetcher: transaction fetching error: %s", err.Error())
} }
iplds.Receipts, err = f.FetchRcts(tx, cidWrapper.Receipts) iplds.Receipts, err = f.FetchRcts(tx, cids.Receipts)
if err != nil { if err != nil {
return nil, fmt.Errorf("eth pg fetcher: receipt fetching error: %s", err.Error()) return nil, fmt.Errorf("eth pg fetcher: receipt fetching error: %s", err.Error())
} }
iplds.StateNodes, err = f.FetchState(tx, cidWrapper.StateNodes) iplds.StateNodes, err = f.FetchState(tx, cids.StateNodes)
if err != nil { if err != nil {
return nil, fmt.Errorf("eth pg fetcher: state fetching error: %s", err.Error()) return nil, fmt.Errorf("eth pg fetcher: state fetching error: %s", err.Error())
} }
iplds.StorageNodes, err = f.FetchStorage(tx, cidWrapper.StorageNodes) iplds.StorageNodes, err = f.FetchStorage(tx, cids.StorageNodes)
if err != nil { if err != nil {
return nil, fmt.Errorf("eth pg fetcher: storage fetching error: %s", err.Error()) return nil, fmt.Errorf("eth pg fetcher: storage fetching error: %s", err.Error())
} }
@ -100,7 +104,7 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
} }
// FetchHeaders fetches headers // FetchHeaders fetches headers
func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) { func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c eth.HeaderModel) (ipfs.BlockModel, error) {
log.Debug("fetching header ipld") log.Debug("fetching header ipld")
headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil { if err != nil {
@ -113,7 +117,7 @@ func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel,
} }
// FetchUncles fetches uncles // FetchUncles fetches uncles
func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.BlockModel, error) { func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []eth.UncleModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching uncle iplds") log.Debug("fetching uncle iplds")
uncleIPLDs := make([]ipfs.BlockModel, len(cids)) uncleIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids { for i, c := range cids {
@ -130,7 +134,7 @@ func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.BlockM
} }
// FetchTrxs fetches transactions // FetchTrxs fetches transactions
func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) { func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []eth.TxModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching transaction iplds") log.Debug("fetching transaction iplds")
trxIPLDs := make([]ipfs.BlockModel, len(cids)) trxIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids { for i, c := range cids {
@ -147,7 +151,7 @@ func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel,
} }
// FetchRcts fetches receipts // FetchRcts fetches receipts
func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.BlockModel, error) { func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []eth.ReceiptModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching receipt iplds") log.Debug("fetching receipt iplds")
rctIPLDs := make([]ipfs.BlockModel, len(cids)) rctIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids { for i, c := range cids {
@ -164,9 +168,9 @@ func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.BlockM
} }
// FetchState fetches state nodes // FetchState fetches state nodes
func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNode, error) { func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []eth.StateNodeModel) ([]eth.StateNode, error) {
log.Debug("fetching state iplds") log.Debug("fetching state iplds")
stateNodes := make([]StateNode, 0, len(cids)) stateNodes := make([]eth.StateNode, 0, len(cids))
for _, stateNode := range cids { for _, stateNode := range cids {
if stateNode.CID == "" { if stateNode.CID == "" {
continue continue
@ -175,7 +179,7 @@ func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNod
if err != nil { if err != nil {
return nil, err return nil, err
} }
stateNodes = append(stateNodes, StateNode{ stateNodes = append(stateNodes, eth.StateNode{
IPLD: ipfs.BlockModel{ IPLD: ipfs.BlockModel{
Data: stateBytes, Data: stateBytes,
CID: stateNode.CID, CID: stateNode.CID,
@ -189,9 +193,9 @@ func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNod
} }
// FetchStorage fetches storage nodes // FetchStorage fetches storage nodes
func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) { func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []eth.StorageNodeWithStateKeyModel) ([]eth.StorageNode, error) {
log.Debug("fetching storage iplds") log.Debug("fetching storage iplds")
storageNodes := make([]StorageNode, 0, len(cids)) storageNodes := make([]eth.StorageNode, 0, len(cids))
for _, storageNode := range cids { for _, storageNode := range cids {
if storageNode.CID == "" || storageNode.StateKey == "" { if storageNode.CID == "" || storageNode.StateKey == "" {
continue continue
@ -200,7 +204,7 @@ func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKeyMo
if err != nil { if err != nil {
return nil, err return nil, err
} }
storageNodes = append(storageNodes, StorageNode{ storageNodes = append(storageNodes, eth.StorageNode{
IPLD: ipfs.BlockModel{ IPLD: ipfs.BlockModel{
Data: storageBytes, Data: storageBytes,
CID: storageNode.CID, CID: storageNode.CID,

View File

@ -20,15 +20,17 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
var ( var (
db *postgres.DB db *postgres.DB
pubAndIndexer *eth.IPLDPublisher pubAndIndexer *eth2.IPLDPublisher
fetcher *eth.IPLDFetcher fetcher *eth.IPLDFetcher
) )
@ -38,7 +40,7 @@ var _ = Describe("IPLDFetcher", func() {
var err error var err error
db, err = shared.SetupDB() db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
pubAndIndexer = eth.NewIPLDPublisher(db) pubAndIndexer = eth2.NewIPLDPublisher(db)
err = pubAndIndexer.Publish(mocks.MockConvertedPayload) err = pubAndIndexer.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
fetcher = eth.NewIPLDFetcher(db) fetcher = eth.NewIPLDFetcher(db)
@ -48,10 +50,9 @@ var _ = Describe("IPLDFetcher", func() {
}) })
It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() { It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() {
i, err := fetcher.Fetch(mocks.MockCIDWrapper) iplds, err := fetcher.Fetch(*mocks.MockCIDWrapper)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
iplds, ok := i.(eth.IPLDs) Expect(iplds).ToNot(BeNil())
Expect(ok).To(BeTrue())
Expect(iplds.TotalDifficulty).To(Equal(mocks.MockConvertedPayload.TotalDifficulty)) Expect(iplds.TotalDifficulty).To(Equal(mocks.MockConvertedPayload.TotalDifficulty))
Expect(iplds.BlockNumber).To(Equal(mocks.MockConvertedPayload.Block.Number())) Expect(iplds.BlockNumber).To(Equal(mocks.MockConvertedPayload.Block.Number()))
Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header)) Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header))

View File

@ -21,8 +21,8 @@ import (
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
// PayloadConverter is the underlying struct for the Converter interface // PayloadConverter is the underlying struct for the Converter interface

View File

@ -19,9 +19,9 @@ package mocks
import ( import (
"fmt" "fmt"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/shared"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
) )
// CIDIndexer is the underlying struct for the Indexer interface // CIDIndexer is the underlying struct for the Indexer interface

View File

@ -19,9 +19,9 @@ package mocks
import ( import (
"fmt" "fmt"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/shared"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
) )
// IPLDPublisher is the underlying struct for the Publisher interface // IPLDPublisher is the underlying struct for the Publisher interface

View File

@ -20,8 +20,6 @@ import (
"math/big" "math/big"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
) )
// SubscriptionSettings config is used by a subscriber to specify what eth data to stream from the watcher // SubscriptionSettings config is used by a subscriber to specify what eth data to stream from the watcher
@ -125,28 +123,3 @@ func NewEthSubscriptionConfig() (*SubscriptionSettings, error) {
} }
return sc, nil return sc, nil
} }
// StartingBlock satisfies the SubscriptionSettings() interface
func (sc *SubscriptionSettings) StartingBlock() *big.Int {
return sc.Start
}
// EndingBlock satisfies the SubscriptionSettings() interface
func (sc *SubscriptionSettings) EndingBlock() *big.Int {
return sc.End
}
// HistoricalData satisfies the SubscriptionSettings() interface
func (sc *SubscriptionSettings) HistoricalData() bool {
return sc.BackFill
}
// HistoricalDataOnly satisfies the SubscriptionSettings() interface
func (sc *SubscriptionSettings) HistoricalDataOnly() bool {
return sc.BackFillOnly
}
// ChainType satisfies the SubscriptionSettings() interface
func (sc *SubscriptionSettings) ChainType() shared.ChainType {
return shared.Ethereum
}

View File

@ -19,6 +19,7 @@ package eth
import ( import (
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
) )
@ -45,7 +46,7 @@ func TearDownDB(db *postgres.DB) {
} }
// TxModelsContainsCID used to check if a list of TxModels contains a specific cid string // TxModelsContainsCID used to check if a list of TxModels contains a specific cid string
func TxModelsContainsCID(txs []TxModel, cid string) bool { func TxModelsContainsCID(txs []eth.TxModel, cid string) bool {
for _, tx := range txs { for _, tx := range txs {
if tx.CID == cid { if tx.CID == cid {
return true return true
@ -55,7 +56,7 @@ func TxModelsContainsCID(txs []TxModel, cid string) bool {
} }
// ListContainsBytes used to check if a list of byte arrays contains a particular byte array // ListContainsBytes used to check if a list of byte arrays contains a particular byte array
func ReceiptModelsContainsCID(rcts []ReceiptModel, cid string) bool { func ReceiptModelsContainsCID(rcts []eth.ReceiptModel, cid string) bool {
for _, rct := range rcts { for _, rct := range rcts {
if rct.CID == cid { if rct.CID == cid {
return true return true

View File

@ -17,79 +17,32 @@
package shared package shared
import ( import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/btcsuite/btcd/rpcclient"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
) )
// Env variables // Env variables
const ( const (
HTTP_TIMEOUT = "HTTP_TIMEOUT"
ETH_WS_PATH = "ETH_WS_PATH"
ETH_HTTP_PATH = "ETH_HTTP_PATH"
ETH_NODE_ID = "ETH_NODE_ID" ETH_NODE_ID = "ETH_NODE_ID"
ETH_CLIENT_NAME = "ETH_CLIENT_NAME" ETH_CLIENT_NAME = "ETH_CLIENT_NAME"
ETH_GENESIS_BLOCK = "ETH_GENESIS_BLOCK" ETH_GENESIS_BLOCK = "ETH_GENESIS_BLOCK"
ETH_NETWORK_ID = "ETH_NETWORK_ID" ETH_NETWORK_ID = "ETH_NETWORK_ID"
ETH_CHAIN_ID = "ETH_CHAIN_ID" ETH_CHAIN_ID = "ETH_CHAIN_ID"
BTC_WS_PATH = "BTC_WS_PATH"
BTC_HTTP_PATH = "BTC_HTTP_PATH"
BTC_NODE_PASSWORD = "BTC_NODE_PASSWORD"
BTC_NODE_USER = "BTC_NODE_USER"
BTC_NODE_ID = "BTC_NODE_ID"
BTC_CLIENT_NAME = "BTC_CLIENT_NAME"
BTC_GENESIS_BLOCK = "BTC_GENESIS_BLOCK"
BTC_NETWORK_ID = "BTC_NETWORK_ID"
BTC_CHAIN_ID = "BTC_CHAIN_ID"
) )
// GetEthNodeAndClient returns eth node info and client from path url // GetNodeInfo returns the ethereum node info from env variables
func GetEthNodeAndClient(path string) (node.Node, *rpc.Client, error) { func GetNodeInfo() node.Info {
viper.BindEnv("ethereum.nodeID", ETH_NODE_ID) viper.BindEnv("ethereum.nodeID", ETH_NODE_ID)
viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME) viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME)
viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK) viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK)
viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID) viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID)
viper.BindEnv("ethereum.chainID", ETH_CHAIN_ID) viper.BindEnv("ethereum.chainID", ETH_CHAIN_ID)
rpcClient, err := rpc.Dial(path) return node.Info{
if err != nil {
return node.Node{}, nil, err
}
return node.Node{
ID: viper.GetString("ethereum.nodeID"), ID: viper.GetString("ethereum.nodeID"),
ClientName: viper.GetString("ethereum.clientName"), ClientName: viper.GetString("ethereum.clientName"),
GenesisBlock: viper.GetString("ethereum.genesisBlock"), GenesisBlock: viper.GetString("ethereum.genesisBlock"),
NetworkID: viper.GetString("ethereum.networkID"), NetworkID: viper.GetString("ethereum.networkID"),
ChainID: viper.GetUint64("ethereum.chainID"), ChainID: viper.GetUint64("ethereum.chainID"),
}, rpcClient, nil
}
// GetBtcNodeAndClient returns btc node info from path url
func GetBtcNodeAndClient(path string) (node.Node, *rpcclient.ConnConfig) {
viper.BindEnv("bitcoin.nodeID", BTC_NODE_ID)
viper.BindEnv("bitcoin.clientName", BTC_CLIENT_NAME)
viper.BindEnv("bitcoin.genesisBlock", BTC_GENESIS_BLOCK)
viper.BindEnv("bitcoin.networkID", BTC_NETWORK_ID)
viper.BindEnv("bitcoin.pass", BTC_NODE_PASSWORD)
viper.BindEnv("bitcoin.user", BTC_NODE_USER)
viper.BindEnv("bitcoin.chainID", BTC_CHAIN_ID)
// For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node
return node.Node{
ID: viper.GetString("bitcoin.nodeID"),
ClientName: viper.GetString("bitcoin.clientName"),
GenesisBlock: viper.GetString("bitcoin.genesisBlock"),
NetworkID: viper.GetString("bitcoin.networkID"),
ChainID: viper.GetUint64("bitcoin.chainID"),
}, &rpcclient.ConnConfig{
Host: path,
HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode
DisableTLS: true, // Bitcoin core does not provide TLS by default
Pass: viper.GetString("bitcoin.pass"),
User: viper.GetString("bitcoin.user"),
} }
} }

View File

@ -20,7 +20,7 @@ import (
"errors" "errors"
"sync/atomic" "sync/atomic"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
// PayloadFetcher mock for tests // PayloadFetcher mock for tests

View File

@ -18,7 +18,7 @@ package mocks
import ( import (
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
// CIDRetriever is a mock CID retriever for use in tests // CIDRetriever is a mock CID retriever for use in tests

View File

@ -18,7 +18,7 @@ package mocks
import ( import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
// PayloadStreamer mock struct // PayloadStreamer mock struct

View File

@ -19,10 +19,11 @@ package shared
import ( import (
"bytes" "bytes"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
@ -30,11 +31,11 @@ import (
// SetupDB is use to setup a db for watcher tests // SetupDB is use to setup a db for watcher tests
func SetupDB() (*postgres.DB, error) { func SetupDB() (*postgres.DB, error) {
return postgres.NewDB(config.Database{ return postgres.NewDB(postgres.Config{
Hostname: "localhost", Hostname: "localhost",
Name: "vulcanize_testing", Name: "vulcanize_testing",
Port: 5432, Port: 5432,
}, node.Node{}) }, node.Info{})
} }
// ListContainsString used to check if a list of strings contains a particular string // ListContainsString used to check if a list of strings contains a particular string
@ -58,7 +59,7 @@ func IPLDsContainBytes(iplds []ipfs.BlockModel, b []byte) bool {
} }
// ListContainsGap used to check if a list of Gaps contains a particular Gap // ListContainsGap used to check if a list of Gaps contains a particular Gap
func ListContainsGap(gapList []Gap, gap Gap) bool { func ListContainsGap(gapList []eth.DBGap, gap eth.DBGap) bool {
for _, listGap := range gapList { for _, listGap := range gapList {
if listGap == gap { if listGap == gap {
return true return true

View File

@ -20,15 +20,14 @@ import (
"context" "context"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
v "github.com/vulcanize/ipfs-blockchain-watcher/version" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
v "github.com/vulcanize/ipld-eth-server/version"
) )
// APIName is the namespace used for the state diffing service API // APIName is the namespace used for the state diffing service API
@ -50,24 +49,7 @@ func NewPublicWatcherAPI(w Watcher) *PublicWatcherAPI {
} }
// Stream is the public method to setup a subscription that fires off IPLD payloads as they are processed // Stream is the public method to setup a subscription that fires off IPLD payloads as they are processed
func (api *PublicWatcherAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc.Subscription, error) { func (api *PublicWatcherAPI) Stream(ctx context.Context, params eth.SubscriptionSettings) (*rpc.Subscription, error) {
var params shared.SubscriptionSettings
switch api.w.Chain() {
case shared.Ethereum:
var ethParams eth.SubscriptionSettings
if err := rlp.DecodeBytes(rlpParams, &ethParams); err != nil {
return nil, err
}
params = &ethParams
case shared.Bitcoin:
var btcParams btc.SubscriptionSettings
if err := rlp.DecodeBytes(rlpParams, &btcParams); err != nil {
return nil, err
}
params = &btcParams
default:
panic("ipfs-blockchain-watcher is not configured for a specific chain type")
}
// ensure that the RPC connection supports subscriptions // ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx) notifier, supported := rpc.NotifierFromContext(ctx)
if !supported { if !supported {
@ -107,7 +89,7 @@ func (api *PublicWatcherAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc
// Node is a public rpc method to allow transformers to fetch the node info for the watcher // Node is a public rpc method to allow transformers to fetch the node info for the watcher
// NOTE: this is the node info for the node that the watcher is syncing from, not the node info for the watcher itself // NOTE: this is the node info for the node that the watcher is syncing from, not the node info for the watcher itself
func (api *PublicWatcherAPI) Node() *node.Node { func (api *PublicWatcherAPI) Node() *node.Info {
return api.w.Node() return api.w.Node()
} }
@ -136,7 +118,7 @@ func (iapi *InfoAPI) NodeInfo() *p2p.NodeInfo {
return &p2p.NodeInfo{ return &p2p.NodeInfo{
// TODO: formalize this // TODO: formalize this
ID: "vulcanizeDB", ID: "vulcanizeDB",
Name: "ipfs-blockchain-watcher", Name: "ipld-eth-server",
} }
} }

View File

@ -17,33 +17,22 @@
package watch package watch
import ( import (
"fmt"
"os" "os"
"path/filepath" "path/filepath"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipfs-blockchain-watcher/utils" "github.com/vulcanize/ipld-eth-server/utils"
) )
// Env variables // Env variables
const ( const (
SUPERNODE_CHAIN = "SUPERNODE_CHAIN" SERVER_WS_PATH = "SERVER_WS_PATH"
SUPERNODE_SYNC = "SUPERNODE_SYNC" SERVER_IPC_PATH = "SERVER_IPC_PATH"
SUPERNODE_WORKERS = "SUPERNODE_WORKERS" SERVER_HTTP_PATH = "SERVER_HTTP_PATH"
SUPERNODE_SERVER = "SUPERNODE_SERVER"
SUPERNODE_WS_PATH = "SUPERNODE_WS_PATH"
SUPERNODE_IPC_PATH = "SUPERNODE_IPC_PATH"
SUPERNODE_HTTP_PATH = "SUPERNODE_HTTP_PATH"
SUPERNODE_BACKFILL = "SUPERNODE_BACKFILL"
SYNC_MAX_IDLE_CONNECTIONS = "SYNC_MAX_IDLE_CONNECTIONS"
SYNC_MAX_OPEN_CONNECTIONS = "SYNC_MAX_OPEN_CONNECTIONS"
SYNC_MAX_CONN_LIFETIME = "SYNC_MAX_CONN_LIFETIME"
SERVER_MAX_IDLE_CONNECTIONS = "SERVER_MAX_IDLE_CONNECTIONS" SERVER_MAX_IDLE_CONNECTIONS = "SERVER_MAX_IDLE_CONNECTIONS"
SERVER_MAX_OPEN_CONNECTIONS = "SERVER_MAX_OPEN_CONNECTIONS" SERVER_MAX_OPEN_CONNECTIONS = "SERVER_MAX_OPEN_CONNECTIONS"
@ -52,74 +41,25 @@ const (
// Config struct // Config struct
type Config struct { type Config struct {
Chain shared.ChainType DB *postgres.DB
DBConfig config.Database DBConfig postgres.Config
// Server fields
Serve bool
ServeDBConn *postgres.DB
WSEndpoint string WSEndpoint string
HTTPEndpoint string HTTPEndpoint string
IPCEndpoint string IPCEndpoint string
// Sync params NodeInfo node.Info
Sync bool
SyncDBConn *postgres.DB
Workers int
WSClient interface{}
NodeInfo node.Node
// Historical switch
Historical bool
} }
// NewConfig is used to initialize a watcher config from a .toml file // NewConfig is used to initialize a watcher config from a .toml file
// Separate chain watcher instances need to be ran with separate ipfs path in order to avoid lock contention on the ipfs repository lockfile // Separate chain watcher instances need to be ran with separate ipfs path in order to avoid lock contention on the ipfs repository lockfile
func NewConfig() (*Config, error) { func NewConfig() (*Config, error) {
c := new(Config) c := new(Config)
var err error
viper.BindEnv("watcher.chain", SUPERNODE_CHAIN) viper.BindEnv("server.wsPath", SERVER_WS_PATH)
viper.BindEnv("watcher.sync", SUPERNODE_SYNC) viper.BindEnv("server.ipcPath", SERVER_IPC_PATH)
viper.BindEnv("watcher.workers", SUPERNODE_WORKERS) viper.BindEnv("server.httpPath", SERVER_HTTP_PATH)
viper.BindEnv("ethereum.wsPath", shared.ETH_WS_PATH)
viper.BindEnv("bitcoin.wsPath", shared.BTC_WS_PATH)
viper.BindEnv("watcher.server", SUPERNODE_SERVER)
viper.BindEnv("watcher.wsPath", SUPERNODE_WS_PATH)
viper.BindEnv("watcher.ipcPath", SUPERNODE_IPC_PATH)
viper.BindEnv("watcher.httpPath", SUPERNODE_HTTP_PATH)
viper.BindEnv("watcher.backFill", SUPERNODE_BACKFILL)
c.Historical = viper.GetBool("watcher.backFill")
chain := viper.GetString("watcher.chain")
c.Chain, err = shared.NewChainType(chain)
if err != nil {
return nil, err
}
c.DBConfig.Init() c.DBConfig.Init()
c.Sync = viper.GetBool("watcher.sync")
if c.Sync {
workers := viper.GetInt("watcher.workers")
if workers < 1 {
workers = 1
}
c.Workers = workers
switch c.Chain {
case shared.Ethereum:
ethWS := viper.GetString("ethereum.wsPath")
c.NodeInfo, c.WSClient, err = shared.GetEthNodeAndClient(fmt.Sprintf("ws://%s", ethWS))
if err != nil {
return nil, err
}
case shared.Bitcoin:
btcWS := viper.GetString("bitcoin.wsPath")
c.NodeInfo, c.WSClient = shared.GetBtcNodeAndClient(btcWS)
}
syncDBConn := overrideDBConnConfig(c.DBConfig, Sync)
syncDB := utils.LoadPostgres(syncDBConn, c.NodeInfo)
c.SyncDBConn = &syncDB
}
c.Serve = viper.GetBool("watcher.server")
if c.Serve {
wsPath := viper.GetString("watcher.wsPath") wsPath := viper.GetString("watcher.wsPath")
if wsPath == "" { if wsPath == "" {
wsPath = "127.0.0.1:8080" wsPath = "127.0.0.1:8080"
@ -139,10 +79,9 @@ func NewConfig() (*Config, error) {
httpPath = "127.0.0.1:8081" httpPath = "127.0.0.1:8081"
} }
c.HTTPEndpoint = httpPath c.HTTPEndpoint = httpPath
serveDBConn := overrideDBConnConfig(c.DBConfig, Serve) overrideDBConnConfig(&c.DBConfig)
serveDB := utils.LoadPostgres(serveDBConn, c.NodeInfo) serveDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
c.ServeDBConn = &serveDB c.DB = &serveDB
}
return c, nil return c, nil
} }
@ -154,23 +93,11 @@ var (
Serve mode = "serve" Serve mode = "serve"
) )
func overrideDBConnConfig(con config.Database, m mode) config.Database { func overrideDBConnConfig(con *postgres.Config) {
switch m {
case Sync:
viper.BindEnv("database.sync.maxIdle", SYNC_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.sync.maxOpen", SYNC_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.sync.maxLifetime", SYNC_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.sync.maxIdle")
con.MaxOpen = viper.GetInt("database.sync.maxOpen")
con.MaxLifetime = viper.GetInt("database.sync.maxLifetime")
case Serve:
viper.BindEnv("database.server.maxIdle", SERVER_MAX_IDLE_CONNECTIONS) viper.BindEnv("database.server.maxIdle", SERVER_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.server.maxOpen", SERVER_MAX_OPEN_CONNECTIONS) viper.BindEnv("database.server.maxOpen", SERVER_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.server.maxLifetime", SERVER_MAX_CONN_LIFETIME) viper.BindEnv("database.server.maxLifetime", SERVER_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.server.maxIdle") con.MaxIdle = viper.GetInt("database.server.maxIdle")
con.MaxOpen = viper.GetInt("database.server.maxOpen") con.MaxOpen = viper.GetInt("database.server.maxOpen")
con.MaxLifetime = viper.GetInt("database.server.maxLifetime") con.MaxLifetime = viper.GetInt("database.server.maxLifetime")
default:
}
return con
} }

View File

@ -28,10 +28,12 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/builders" eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
const ( const (
@ -44,16 +46,14 @@ const (
type Watcher interface { type Watcher interface {
// APIs(), Protocols(), Start() and Stop() // APIs(), Protocols(), Start() and Stop()
ethnode.Service ethnode.Service
// Data processing event loop
Sync(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error
// Pub-Sub handling event loop // Pub-Sub handling event loop
Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth2.ConvertedPayload)
// Method to subscribe to the service // Method to subscribe to the service
Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings)
// Method to unsubscribe from the service // Method to unsubscribe from the service
Unsubscribe(id rpc.ID) Unsubscribe(id rpc.ID)
// Method to access the node info for the service // Method to access the node info for the service
Node() *node.Node Node() *node.Info
// Method to access chain type // Method to access chain type
Chain() shared.ChainType Chain() shared.ChainType
} }
@ -62,32 +62,20 @@ type Watcher interface {
type Service struct { type Service struct {
// Used to sync access to the Subscriptions // Used to sync access to the Subscriptions
sync.Mutex sync.Mutex
// Interface for streaming payloads over an rpc subscription
Streamer shared.PayloadStreamer
// Interface for converting raw payloads into IPLD object payloads
Converter shared.PayloadConverter
// Interface for publishing and indexing the PG-IPLD payloads
Publisher shared.IPLDPublisher
// Interface for filtering and serving data according to subscribed clients according to their specification // Interface for filtering and serving data according to subscribed clients according to their specification
Filterer shared.ResponseFilterer Filterer eth.Filterer
// Interface for fetching IPLD objects from IPFS // Interface for fetching IPLD objects from IPFS
IPLDFetcher shared.IPLDFetcher IPLDFetcher eth.Fetcher
// Interface for searching and retrieving CIDs from Postgres index // Interface for searching and retrieving CIDs from Postgres index
Retriever shared.CIDRetriever Retriever eth.Retriever
// Chan the processor uses to subscribe to payloads from the Streamer
PayloadChan chan shared.RawChainData
// Used to signal shutdown of the service // Used to signal shutdown of the service
QuitChan chan bool QuitChan chan bool
// A mapping of rpc.IDs to their subscription channels, mapped to their subscription type (hash of the StreamFilters) // A mapping of rpc.IDs to their subscription channels, mapped to their subscription type (hash of the StreamFilters)
Subscriptions map[common.Hash]map[rpc.ID]Subscription Subscriptions map[common.Hash]map[rpc.ID]Subscription
// A mapping of subscription params hash to the corresponding subscription params // A mapping of subscription params hash to the corresponding subscription params
SubscriptionTypes map[common.Hash]shared.SubscriptionSettings SubscriptionTypes map[common.Hash]eth.SubscriptionSettings
// Info for the Geth node that this watcher is working with // Info for the Geth node that this watcher is working with
NodeInfo *node.Node NodeInfo *node.Info
// Number of publish workers
WorkerPoolSize int
// chain type for this service
chain shared.ChainType
// Underlying db // Underlying db
db *postgres.DB db *postgres.DB
// wg for syncing serve processes // wg for syncing serve processes
@ -97,44 +85,14 @@ type Service struct {
// NewWatcher creates a new Watcher using an underlying Service struct // NewWatcher creates a new Watcher using an underlying Service struct
func NewWatcher(settings *Config) (Watcher, error) { func NewWatcher(settings *Config) (Watcher, error) {
sn := new(Service) sn := new(Service)
var err error sn.Retriever = eth.NewCIDRetriever(settings.DB)
// If we are syncing, initialize the needed interfaces sn.IPLDFetcher = eth.NewIPLDFetcher(settings.DB)
if settings.Sync { sn.Filterer = eth.NewResponseFilterer()
sn.Streamer, sn.PayloadChan, err = builders.NewPayloadStreamer(settings.Chain, settings.WSClient) sn.db = settings.DB
if err != nil {
return nil, err
}
sn.Converter, err = builders.NewPayloadConverter(settings.Chain, settings.NodeInfo.ChainID)
if err != nil {
return nil, err
}
sn.Publisher, err = builders.NewIPLDPublisher(settings.Chain, settings.SyncDBConn)
if err != nil {
return nil, err
}
sn.Filterer, err = builders.NewResponseFilterer(settings.Chain)
if err != nil {
return nil, err
}
}
// If we are serving, initialize the needed interfaces
if settings.Serve {
sn.Retriever, err = builders.NewCIDRetriever(settings.Chain, settings.ServeDBConn)
if err != nil {
return nil, err
}
sn.IPLDFetcher, err = builders.NewIPLDFetcher(settings.Chain, settings.ServeDBConn)
if err != nil {
return nil, err
}
sn.db = settings.ServeDBConn
}
sn.QuitChan = make(chan bool) sn.QuitChan = make(chan bool)
sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription) sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings) sn.SubscriptionTypes = make(map[common.Hash]eth.SubscriptionSettings)
sn.WorkerPoolSize = settings.Workers
sn.NodeInfo = &settings.NodeInfo sn.NodeInfo = &settings.NodeInfo
sn.chain = settings.Chain
return sn, nil return sn, nil
} }
@ -172,91 +130,24 @@ func (sap *Service) APIs() []rpc.API {
Public: true, Public: true,
}, },
} }
chainAPI, err := builders.NewPublicAPI(sap.chain, sap.db) backend, err := eth.NewEthBackend(sap.db)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return apis
}
return append(apis, chainAPI)
}
// Sync streams incoming raw chain data and converts it for further processing
// It forwards the converted data to the publish process(es) it spins up
// If forwards the converted data to a ScreenAndServe process if it there is one listening on the passed screenAndServePayload channel
// This continues on no matter if or how many subscribers there are
func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error {
sub, err := sap.Streamer.Stream(sap.PayloadChan)
if err != nil {
return err
}
// spin up publish worker goroutines
publishPayload := make(chan shared.ConvertedData, PayloadChanBufferSize)
for i := 1; i <= sap.WorkerPoolSize; i++ {
go sap.publish(wg, i, publishPayload)
log.Debugf("%s publish worker %d successfully spun up", sap.chain.String(), i)
}
go func() {
wg.Add(1)
defer wg.Done()
for {
select {
case payload := <-sap.PayloadChan:
ipldPayload, err := sap.Converter.Convert(payload)
if err != nil {
log.Errorf("watcher conversion error for chain %s: %v", sap.chain.String(), err)
continue
}
log.Infof("%s data streamed at head height %d", sap.chain.String(), ipldPayload.Height())
// If we have a ScreenAndServe process running, forward the iplds to it
select {
case screenAndServePayload <- ipldPayload:
default:
}
// Forward the payload to the publish workers
// this channel acts as a ring buffer
select {
case publishPayload <- ipldPayload:
default:
<-publishPayload
publishPayload <- ipldPayload
}
case err := <-sub.Err():
log.Errorf("watcher subscription error for chain %s: %v", sap.chain.String(), err)
case <-sap.QuitChan:
log.Infof("quiting %s Sync process", sap.chain.String())
return
}
}
}()
log.Infof("%s Sync goroutine successfully spun up", sap.chain.String())
return nil return nil
}
// publish is spun up by SyncAndConvert and receives converted chain data from that process
// it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres
func (sap *Service) publish(wg *sync.WaitGroup, id int, publishPayload <-chan shared.ConvertedData) {
wg.Add(1)
defer wg.Done()
for {
select {
case payload := <-publishPayload:
log.Debugf("%s watcher sync worker %d publishing and indexing data streamed at head height %d", sap.chain.String(), id, payload.Height())
if err := sap.Publisher.Publish(payload); err != nil {
log.Errorf("%s watcher publish worker %d publishing error: %v", sap.chain.String(), id, err)
continue
}
case <-sap.QuitChan:
log.Infof("%s watcher publish worker %d shutting down", sap.chain.String(), id)
return
}
} }
return append(apis, rpc.API{
Namespace: eth.APIName,
Version: eth.APIVersion,
Service: eth.NewPublicEthAPI(backend),
Public: true,
})
} }
// Serve listens for incoming converter data off the screenAndServePayload from the Sync process // Serve listens for incoming converter data off the screenAndServePayload from the Sync process
// It filters and sends this data to any subscribers to the service // It filters and sends this data to any subscribers to the service
// This process can also be stood up alone, without an screenAndServePayload attached to a Sync process // This process can also be stood up alone, without an screenAndServePayload attached to a Sync process
// and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only // and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only
func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) { func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth2.ConvertedPayload) {
sap.serveWg = wg sap.serveWg = wg
go func() { go func() {
wg.Add(1) wg.Add(1)
@ -266,17 +157,17 @@ func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan share
case payload := <-screenAndServePayload: case payload := <-screenAndServePayload:
sap.filterAndServe(payload) sap.filterAndServe(payload)
case <-sap.QuitChan: case <-sap.QuitChan:
log.Infof("quiting %s Serve process", sap.chain.String()) log.Info("quiting eth ipld server process")
return return
} }
} }
}() }()
log.Infof("%s Serve goroutine successfully spun up", sap.chain.String()) log.Info("eth ipld server process successfully spun up")
} }
// filterAndServe filters the payload according to each subscription type and sends to the subscriptions // filterAndServe filters the payload according to each subscription type and sends to the subscriptions
func (sap *Service) filterAndServe(payload shared.ConvertedData) { func (sap *Service) filterAndServe(payload eth2.ConvertedPayload) {
log.Debugf("sending %s payload to subscriptions", sap.chain.String()) log.Debug("sending eth ipld payload to subscriptions")
sap.Lock() sap.Lock()
sap.serveWg.Add(1) sap.serveWg.Add(1)
defer sap.Unlock() defer sap.Unlock()
@ -285,11 +176,11 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) {
// Retrieve the subscription parameters for this subscription type // Retrieve the subscription parameters for this subscription type
subConfig, ok := sap.SubscriptionTypes[ty] subConfig, ok := sap.SubscriptionTypes[ty]
if !ok { if !ok {
log.Errorf("watcher %s subscription configuration for subscription type %s not available", sap.chain.String(), ty.Hex()) log.Errorf("eth ipld server subscription configuration for subscription type %s not available", ty.Hex())
sap.closeType(ty) sap.closeType(ty)
continue continue
} }
if subConfig.EndingBlock().Int64() > 0 && subConfig.EndingBlock().Int64() < payload.Height() { if subConfig.End.Int64() > 0 && subConfig.End.Int64() < payload.Block.Number().Int64() {
// We are not out of range for this subscription type // We are not out of range for this subscription type
// close it, and continue to the next // close it, and continue to the next
sap.closeType(ty) sap.closeType(ty)
@ -297,21 +188,21 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) {
} }
response, err := sap.Filterer.Filter(subConfig, payload) response, err := sap.Filterer.Filter(subConfig, payload)
if err != nil { if err != nil {
log.Errorf("watcher filtering error for chain %s: %v", sap.chain.String(), err) log.Errorf("eth ipld server filtering error: %v", err)
sap.closeType(ty) sap.closeType(ty)
continue continue
} }
responseRLP, err := rlp.EncodeToBytes(response) responseRLP, err := rlp.EncodeToBytes(response)
if err != nil { if err != nil {
log.Errorf("watcher rlp encoding error for chain %s: %v", sap.chain.String(), err) log.Errorf("eth ipld server rlp encoding error: %v", err)
continue continue
} }
for id, sub := range subs { for id, sub := range subs {
select { select {
case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}: case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.BlockNumber.Int64()}:
log.Debugf("sending watcher %s payload to subscription %s", sap.chain.String(), id) log.Debugf("sending eth ipld server payload to subscription %s", id)
default: default:
log.Infof("unable to send %s payload to subscription %s; channel has no receiver", sap.chain.String(), id) log.Infof("unable to send eth ipld payload to subscription %s; channel has no receiver", id)
} }
} }
} }
@ -319,20 +210,15 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) {
// Subscribe is used by the API to remotely subscribe to the service loop // Subscribe is used by the API to remotely subscribe to the service loop
// The params must be rlp serializable and satisfy the SubscriptionSettings() interface // The params must be rlp serializable and satisfy the SubscriptionSettings() interface
func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) { func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings) {
sap.serveWg.Add(1) sap.serveWg.Add(1)
defer sap.serveWg.Done() defer sap.serveWg.Done()
log.Infof("New %s subscription %s", sap.chain.String(), id) log.Infof("new eth ipld subscription %s", id)
subscription := Subscription{ subscription := Subscription{
ID: id, ID: id,
PayloadChan: sub, PayloadChan: sub,
QuitChan: quitChan, QuitChan: quitChan,
} }
if params.ChainType() != sap.chain {
sendNonBlockingErr(subscription, fmt.Errorf("subscription %s is for chain %s, service supports chain %s", id, params.ChainType().String(), sap.chain.String()))
sendNonBlockingQuit(subscription)
return
}
// Subscription type is defined as the hash of the rlp-serialized subscription settings // Subscription type is defined as the hash of the rlp-serialized subscription settings
by, err := rlp.EncodeToBytes(params) by, err := rlp.EncodeToBytes(params)
if err != nil { if err != nil {
@ -341,7 +227,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
return return
} }
subscriptionType := crypto.Keccak256Hash(by) subscriptionType := crypto.Keccak256Hash(by)
if !params.HistoricalDataOnly() { if !params.BackFillOnly {
// Add subscriber // Add subscriber
sap.Lock() sap.Lock()
if sap.Subscriptions[subscriptionType] == nil { if sap.Subscriptions[subscriptionType] == nil {
@ -353,9 +239,9 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
} }
// If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data // If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data
// Otherwise we only filter new data as it is streamed in from the state diffing geth node // Otherwise we only filter new data as it is streamed in from the state diffing geth node
if params.HistoricalData() || params.HistoricalDataOnly() { if params.BackFill || params.BackFillOnly {
if err := sap.sendHistoricalData(subscription, id, params); err != nil { if err := sap.sendHistoricalData(subscription, id, params); err != nil {
sendNonBlockingErr(subscription, fmt.Errorf("%s watcher subscriber backfill error: %v", sap.chain.String(), err)) sendNonBlockingErr(subscription, fmt.Errorf("eth ipld server subscription backfill error: %v", err))
sendNonBlockingQuit(subscription) sendNonBlockingQuit(subscription)
return return
} }
@ -363,8 +249,8 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
} }
// sendHistoricalData sends historical data to the requesting subscription // sendHistoricalData sends historical data to the requesting subscription
func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params shared.SubscriptionSettings) error { func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params eth.SubscriptionSettings) error {
log.Infof("Sending %s historical data to subscription %s", sap.chain.String(), id) log.Infof("sending eth ipld historical data to subscription %s", id)
// Retrieve cached CIDs relevant to this subscriber // Retrieve cached CIDs relevant to this subscriber
var endingBlock int64 var endingBlock int64
var startingBlock int64 var startingBlock int64
@ -373,31 +259,31 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
if err != nil { if err != nil {
return err return err
} }
if startingBlock < params.StartingBlock().Int64() { if startingBlock < params.Start.Int64() {
startingBlock = params.StartingBlock().Int64() startingBlock = params.Start.Int64()
} }
endingBlock, err = sap.Retriever.RetrieveLastBlockNumber() endingBlock, err = sap.Retriever.RetrieveLastBlockNumber()
if err != nil { if err != nil {
return err return err
} }
if endingBlock > params.EndingBlock().Int64() && params.EndingBlock().Int64() > 0 && params.EndingBlock().Int64() > startingBlock { if endingBlock > params.End.Int64() && params.End.Int64() > 0 && params.End.Int64() > startingBlock {
endingBlock = params.EndingBlock().Int64() endingBlock = params.End.Int64()
} }
log.Debugf("%s historical data starting block: %d", sap.chain.String(), params.StartingBlock().Int64()) log.Debugf("eth ipld historical data starting block: %d", params.Start.Int64())
log.Debugf("%s historical data ending block: %d", sap.chain.String(), endingBlock) log.Debugf("eth ipld historical data ending block: %d", endingBlock)
go func() { go func() {
sap.serveWg.Add(1) sap.serveWg.Add(1)
defer sap.serveWg.Done() defer sap.serveWg.Done()
for i := startingBlock; i <= endingBlock; i++ { for i := startingBlock; i <= endingBlock; i++ {
select { select {
case <-sap.QuitChan: case <-sap.QuitChan:
log.Infof("%s watcher historical data feed to subscription %s closed", sap.chain.String(), id) log.Infof("%s watcher historical data feed to subscription %s closed", id)
return return
default: default:
} }
cidWrappers, empty, err := sap.Retriever.Retrieve(params, i) cidWrappers, empty, err := sap.Retriever.Retrieve(params, i)
if err != nil { if err != nil {
sendNonBlockingErr(sub, fmt.Errorf(" %s watcher CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error())) sendNonBlockingErr(sub, fmt.Errorf("eth ipld server cid retrieval error at block %d\r%s", i, err.Error()))
continue continue
} }
if empty { if empty {
@ -406,7 +292,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
for _, cids := range cidWrappers { for _, cids := range cidWrappers {
response, err := sap.IPLDFetcher.Fetch(cids) response, err := sap.IPLDFetcher.Fetch(cids)
if err != nil { if err != nil {
sendNonBlockingErr(sub, fmt.Errorf("%s watcher IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error())) sendNonBlockingErr(sub, fmt.Errorf("eth ipld server ipld fetching error at block %d\r%s", i, err.Error()))
continue continue
} }
responseRLP, err := rlp.EncodeToBytes(response) responseRLP, err := rlp.EncodeToBytes(response)
@ -415,19 +301,19 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
continue continue
} }
select { select {
case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}: case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.BlockNumber.Int64()}:
log.Debugf("sending watcher historical data payload to %s subscription %s", sap.chain.String(), id) log.Debugf("eth ipld server sending historical data payload to subscription %s", id)
default: default:
log.Infof("unable to send backFill payload to %s subscription %s; channel has no receiver", sap.chain.String(), id) log.Infof("eth ipld server unable to send backFill payload to subscription %s; channel has no receiver", id)
} }
} }
} }
// when we are done backfilling send an empty payload signifying so in the msg // when we are done backfilling send an empty payload signifying so in the msg
select { select {
case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}: case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}:
log.Debugf("sending backFill completion notice to %s subscription %s", sap.chain.String(), id) log.Debugf("eth ipld server sending backFill completion notice to subscription %s", id)
default: default:
log.Infof("unable to send backFill completion notice to %s subscription %s", sap.chain.String(), id) log.Infof("eth ipld server unable to send backFill completion notice to %s subscription %s", id)
} }
}() }()
return nil return nil
@ -435,7 +321,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
// Unsubscribe is used by the API to remotely unsubscribe to the StateDiffingService loop // Unsubscribe is used by the API to remotely unsubscribe to the StateDiffingService loop
func (sap *Service) Unsubscribe(id rpc.ID) { func (sap *Service) Unsubscribe(id rpc.ID) {
log.Infof("Unsubscribing %s from the %s watcher service", id, sap.chain.String()) log.Infof("unsubscribing %s from the eth ipld server", id)
sap.Lock() sap.Lock()
for ty := range sap.Subscriptions { for ty := range sap.Subscriptions {
delete(sap.Subscriptions[ty], id) delete(sap.Subscriptions[ty], id)
@ -451,12 +337,9 @@ func (sap *Service) Unsubscribe(id rpc.ID) {
// Start is used to begin the service // Start is used to begin the service
// This is mostly just to satisfy the node.Service interface // This is mostly just to satisfy the node.Service interface
func (sap *Service) Start(*p2p.Server) error { func (sap *Service) Start(*p2p.Server) error {
log.Infof("Starting %s watcher service", sap.chain.String()) log.Info("starting eth ipld server")
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
payloadChan := make(chan shared.ConvertedData, PayloadChanBufferSize) payloadChan := make(chan eth2.ConvertedPayload, PayloadChanBufferSize)
if err := sap.Sync(wg, payloadChan); err != nil {
return err
}
sap.Serve(wg, payloadChan) sap.Serve(wg, payloadChan)
return nil return nil
} }
@ -464,7 +347,7 @@ func (sap *Service) Start(*p2p.Server) error {
// Stop is used to close down the service // Stop is used to close down the service
// This is mostly just to satisfy the node.Service interface // This is mostly just to satisfy the node.Service interface
func (sap *Service) Stop() error { func (sap *Service) Stop() error {
log.Infof("Stopping %s watcher service", sap.chain.String()) log.Infof("stopping eth ipld server")
sap.Lock() sap.Lock()
close(sap.QuitChan) close(sap.QuitChan)
sap.close() sap.close()
@ -473,19 +356,19 @@ func (sap *Service) Stop() error {
} }
// Node returns the node info for this service // Node returns the node info for this service
func (sap *Service) Node() *node.Node { func (sap *Service) Node() *node.Info {
return sap.NodeInfo return sap.NodeInfo
} }
// Chain returns the chain type for this service // Chain returns the chain type for this service
func (sap *Service) Chain() shared.ChainType { func (sap *Service) Chain() shared.ChainType {
return sap.chain return shared.Ethereum
} }
// close is used to close all listening subscriptions // close is used to close all listening subscriptions
// close needs to be called with subscription access locked // close needs to be called with subscription access locked
func (sap *Service) close() { func (sap *Service) close() {
log.Infof("Closing all %s subscriptions", sap.chain.String()) log.Infof("closing all eth ipld server subscriptions")
for subType, subs := range sap.Subscriptions { for subType, subs := range sap.Subscriptions {
for _, sub := range subs { for _, sub := range subs {
sendNonBlockingQuit(sub) sendNonBlockingQuit(sub)
@ -498,7 +381,7 @@ func (sap *Service) close() {
// closeType is used to close all subscriptions of given type // closeType is used to close all subscriptions of given type
// closeType needs to be called with subscription access locked // closeType needs to be called with subscription access locked
func (sap *Service) closeType(subType common.Hash) { func (sap *Service) closeType(subType common.Hash) {
log.Infof("Closing all %s subscriptions of type %s", sap.chain.String(), subType.String()) log.Infof("closing all eth ipld server subscriptions of type %s", subType.String())
subs := sap.Subscriptions[subType] subs := sap.Subscriptions[subType]
for _, sub := range subs { for _, sub := range subs {
sendNonBlockingQuit(sub) sendNonBlockingQuit(sub)