From 5830df44a44351cc1d4b8c88773cb29c181119bb Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 31 Aug 2020 10:47:06 -0500 Subject: [PATCH] decouple from sync --- pkg/builders/builders.go | 6 +- pkg/client/client.go | 8 +- pkg/eth/api.go | 7 +- pkg/eth/api_test.go | 13 +- pkg/eth/backend.go | 4 +- pkg/eth/cid_retriever.go | 117 ++++++------- pkg/eth/cid_retriever_test.go | 249 +++++++++++++--------------- pkg/eth/eth_suite_test.go | 2 +- pkg/eth/filterer.go | 64 ++++--- pkg/eth/filterer_test.go | 50 +++--- pkg/eth/ipld_fetcher.go | 54 +++--- pkg/eth/ipld_fetcher_test.go | 15 +- pkg/eth/mocks/converter.go | 4 +- pkg/eth/mocks/indexer.go | 4 +- pkg/eth/mocks/publisher.go | 4 +- pkg/eth/subscription_config.go | 27 --- pkg/eth/test_helpers.go | 5 +- pkg/shared/env.go | 55 +----- pkg/shared/mocks/payload_fetcher.go | 2 +- pkg/shared/mocks/retriever.go | 2 +- pkg/shared/mocks/streamer.go | 2 +- pkg/shared/test_helpers.go | 9 +- pkg/watch/api.go | 32 +--- pkg/watch/config.go | 151 +++++------------ pkg/watch/service.go | 249 ++++++++-------------------- 25 files changed, 419 insertions(+), 716 deletions(-) diff --git a/pkg/builders/builders.go b/pkg/builders/builders.go index 226dc2ef..f40c526e 100644 --- a/pkg/builders/builders.go +++ b/pkg/builders/builders.go @@ -24,10 +24,10 @@ import ( "github.com/btcsuite/btcd/rpcclient" "github.com/ethereum/go-ethereum/rpc" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + "github.com/vulcanize/ipld-eth-server/pkg/btc" + "github.com/vulcanize/ipld-eth-server/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) // NewResponseFilterer constructs a ResponseFilterer for the provided chain type diff --git a/pkg/client/client.go b/pkg/client/client.go index 5f365525..590e0166 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -// Client is used by watchers to stream chain IPLD data from a vulcanizedb ipfs-blockchain-watcher +// Client is used by watchers to stream chain IPLD data from a vulcanizedb ipld-eth-server package client import ( @@ -22,10 +22,10 @@ import ( "github.com/ethereum/go-ethereum/rpc" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch" + "github.com/vulcanize/ipld-eth-server/pkg/watch" ) -// Client is used to subscribe to the ipfs-blockchain-watcher ipld data stream +// Client is used to subscribe to the ipld-eth-server ipld data stream type Client struct { c *rpc.Client } @@ -37,7 +37,7 @@ func NewClient(c *rpc.Client) *Client { } } -// Stream is the main loop for subscribing to iplds from an ipfs-blockchain-watcher server +// Stream is the main loop for subscribing to iplds from an ipld-eth-server server func (c *Client) Stream(payloadChan chan watch.SubscriptionPayload, rlpParams []byte) (*rpc.ClientSubscription, error) { return c.c.Subscribe(context.Background(), "vdb", payloadChan, "stream", rlpParams) } diff --git a/pkg/eth/api.go b/pkg/eth/api.go index 6c9c40bd..491a13a3 100644 --- a/pkg/eth/api.go +++ b/pkg/eth/api.go @@ -20,7 +20,8 @@ import ( "context" "math/big" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/shared" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" @@ -127,7 +128,7 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) } start := startingBlock.Int64() end := endingBlock.Int64() - allRctCIDs := make([]ReceiptModel, 0) + allRctCIDs := make([]eth.ReceiptModel, 0) for i := start; i <= end; i++ { rctCIDs, err := pea.B.Retriever.RetrieveRctCIDs(tx, filter, i, nil, nil) if err != nil { @@ -181,7 +182,7 @@ func (pea *PublicEthAPI) GetBlockByHash(ctx context.Context, hash common.Hash, f } // GetTransactionByHash returns the transaction for the given hash -// eth ipfs-blockchain-watcher cannot currently handle pending/tx_pool txs +// eth ipld-eth-server cannot currently handle pending/tx_pool txs func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) { // Try to return an already finalized transaction tx, blockHash, blockNumber, index, err := pea.B.GetTransaction(ctx, hash) diff --git a/pkg/eth/api_test.go b/pkg/eth/api_test.go index f41b3b44..e48fff48 100644 --- a/pkg/eth/api_test.go +++ b/pkg/eth/api_test.go @@ -21,19 +21,20 @@ import ( "strconv" "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" + eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + + "github.com/vulcanize/ipld-eth-server/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) var ( @@ -85,7 +86,7 @@ var _ = Describe("API", func() { db *postgres.DB retriever *eth.CIDRetriever fetcher *eth.IPLDFetcher - indexAndPublisher *eth.IPLDPublisher + indexAndPublisher *eth2.IPLDPublisher backend *eth.Backend api *eth.PublicEthAPI ) @@ -95,7 +96,7 @@ var _ = Describe("API", func() { Expect(err).ToNot(HaveOccurred()) retriever = eth.NewCIDRetriever(db) fetcher = eth.NewIPLDFetcher(db) - indexAndPublisher = eth.NewIPLDPublisher(db) + indexAndPublisher = eth2.NewIPLDPublisher(db) backend = ð.Backend{ Retriever: retriever, Fetcher: fetcher, diff --git a/pkg/eth/backend.go b/pkg/eth/backend.go index 082f8dbe..cf972e15 100644 --- a/pkg/eth/backend.go +++ b/pkg/eth/backend.go @@ -32,7 +32,7 @@ import ( "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) var ( @@ -120,7 +120,7 @@ func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log } // BlockByNumber returns the requested canonical block. -// Since the ipfs-blockchain-watcher database can contain forked blocks, it is recommended to fetch BlockByHash as +// Since the ipld-eth-server database can contain forked blocks, it is recommended to fetch BlockByHash as // fetching by number can return non-deterministic results (returns the first block found at that height) func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber) (*types.Block, error) { var err error diff --git a/pkg/eth/cid_retriever.go b/pkg/eth/cid_retriever.go index 14698b70..e62ecffe 100644 --- a/pkg/eth/cid_retriever.go +++ b/pkg/eth/cid_retriever.go @@ -27,11 +27,20 @@ import ( "github.com/lib/pq" log "github.com/sirupsen/logrus" + eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" - "github.com/vulcanize/ipfs-blockchain-watcher/utils" + + "github.com/vulcanize/ipld-eth-server/pkg/shared" + "github.com/vulcanize/ipld-eth-server/utils" ) +// Retriever interface for substituting mocks in tests +type Retriever interface { + RetrieveFirstBlockNumber() (int64, error) + RetrieveLastBlockNumber() (int64, error) + Retrieve(filter SubscriptionSettings, blockNumber int64) ([]eth2.CIDWrapper, bool, error) +} + // CIDRetriever satisfies the CIDRetriever interface for ethereum type CIDRetriever struct { db *postgres.DB @@ -59,11 +68,7 @@ func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) { } // Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters -func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) { - streamFilter, ok := filter.(*SubscriptionSettings) - if !ok { - return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &SubscriptionSettings{}, filter) - } +func (ecr *CIDRetriever) Retrieve(filter SubscriptionSettings, blockNumber int64) ([]eth2.CIDWrapper, bool, error) { log.Debug("retrieving cids") // Begin new db tx @@ -88,15 +93,15 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe log.Error("header cid retrieval error") return nil, true, err } - cws := make([]shared.CIDsForFetching, len(headers)) + cws := make([]eth2.CIDWrapper, len(headers)) empty := true for i, header := range headers { - cw := new(CIDWrapper) + cw := new(eth2.CIDWrapper) cw.BlockNumber = big.NewInt(blockNumber) - if !streamFilter.HeaderFilter.Off { + if !filter.HeaderFilter.Off { cw.Header = header empty = false - if streamFilter.HeaderFilter.Uncles { + if filter.HeaderFilter.Uncles { // Retrieve uncle cids for this header id uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, header.ID) if err != nil { @@ -107,8 +112,8 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe } } // Retrieve cached trx CIDs - if !streamFilter.TxFilter.Off { - cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID) + if !filter.TxFilter.Off { + cw.Transactions, err = ecr.RetrieveTxCIDs(tx, filter.TxFilter, header.ID) if err != nil { log.Error("transaction cid retrieval error") return nil, true, err @@ -122,8 +127,8 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe trxIds[j] = tx.ID } // Retrieve cached receipt CIDs - if !streamFilter.ReceiptFilter.Off { - cw.Receipts, err = ecr.RetrieveRctCIDsByHeaderID(tx, streamFilter.ReceiptFilter, header.ID, trxIds) + if !filter.ReceiptFilter.Off { + cw.Receipts, err = ecr.RetrieveRctCIDsByHeaderID(tx, filter.ReceiptFilter, header.ID, trxIds) if err != nil { log.Error("receipt cid retrieval error") return nil, true, err @@ -133,8 +138,8 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe } } // Retrieve cached state CIDs - if !streamFilter.StateFilter.Off { - cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, streamFilter.StateFilter, header.ID) + if !filter.StateFilter.Off { + cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, filter.StateFilter, header.ID) if err != nil { log.Error("state cid retrieval error") return nil, true, err @@ -144,8 +149,8 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe } } // Retrieve cached storage CIDs - if !streamFilter.StorageFilter.Off { - cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, streamFilter.StorageFilter, header.ID) + if !filter.StorageFilter.Off { + cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, filter.StorageFilter, header.ID) if err != nil { log.Error("storage cid retrieval error") return nil, true, err @@ -154,25 +159,25 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe empty = false } } - cws[i] = cw + cws[i] = *cw } return cws, empty, err } // RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight -func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) { +func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]eth2.HeaderModel, error) { log.Debug("retrieving header cids for block ", blockNumber) - headers := make([]HeaderModel, 0) + headers := make([]eth2.HeaderModel, 0) pgStr := `SELECT * FROM eth.header_cids WHERE block_number = $1` return headers, tx.Select(&headers, pgStr, blockNumber) } // RetrieveUncleCIDsByHeaderID retrieves and returns all of the uncle cids for the provided header -func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]UncleModel, error) { +func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]eth2.UncleModel, error) { log.Debug("retrieving uncle cids for block id ", headerID) - headers := make([]UncleModel, 0) + headers := make([]eth2.UncleModel, 0) pgStr := `SELECT * FROM eth.uncle_cids WHERE header_id = $1` return headers, tx.Select(&headers, pgStr, headerID) @@ -180,10 +185,10 @@ func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64 // RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters // also returns the ids for the returned transaction cids -func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) { +func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]eth2.TxModel, error) { log.Debug("retrieving transaction cids for header id ", headerID) args := make([]interface{}, 0, 3) - results := make([]TxModel, 0) + results := make([]eth2.TxModel, 0) id := 1 pgStr := fmt.Sprintf(`SELECT transaction_cids.id, transaction_cids.header_id, transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.mh_key, @@ -208,7 +213,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID // RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided // filter parameters and correspond to the provided tx ids -func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]ReceiptModel, error) { +func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]eth2.ReceiptModel, error) { log.Debug("retrieving receipt cids for header id ", headerID) args := make([]interface{}, 0, 4) pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, @@ -282,13 +287,13 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip } } pgStr += ` ORDER BY transaction_cids.index` - receiptCids := make([]ReceiptModel, 0) + receiptCids := make([]eth2.ReceiptModel, 0) return receiptCids, tx.Select(&receiptCids, pgStr, args...) } // RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided // filter parameters and correspond to the provided tx ids -func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) { +func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]eth2.ReceiptModel, error) { log.Debug("retrieving receipt cids for block ", blockNumber) args := make([]interface{}, 0, 5) pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, @@ -370,7 +375,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, b } } pgStr += ` ORDER BY transaction_cids.index` - receiptCids := make([]ReceiptModel, 0) + receiptCids := make([]eth2.ReceiptModel, 0) return receiptCids, tx.Select(&receiptCids, pgStr, args...) } @@ -384,7 +389,7 @@ func hasTopics(topics [][]string) bool { } // RetrieveStateCIDs retrieves and returns all of the state node cids at the provided header ID that conform to the provided filter parameters -func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, headerID int64) ([]StateNodeModel, error) { +func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, headerID int64) ([]eth2.StateNodeModel, error) { log.Debug("retrieving state cids for header id ", headerID) args := make([]interface{}, 0, 2) pgStr := `SELECT state_cids.id, state_cids.header_id, @@ -404,12 +409,12 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, if !stateFilter.IntermediateNodes { pgStr += ` AND state_cids.node_type = 2` } - stateNodeCIDs := make([]StateNodeModel, 0) + stateNodeCIDs := make([]eth2.StateNodeModel, 0) return stateNodeCIDs, tx.Select(&stateNodeCIDs, pgStr, args...) } // RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided header id that conform to the provided filter parameters -func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]StorageNodeWithStateKeyModel, error) { +func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]eth2.StorageNodeWithStateKeyModel, error) { log.Debug("retrieving storage cids for header id ", headerID) args := make([]interface{}, 0, 3) pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key, storage_cids.node_type, @@ -437,23 +442,23 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageF if !storageFilter.IntermediateNodes { pgStr += ` AND storage_cids.node_type = 2` } - storageNodeCIDs := make([]StorageNodeWithStateKeyModel, 0) + storageNodeCIDs := make([]eth2.StorageNodeWithStateKeyModel, 0) return storageNodeCIDs, tx.Select(&storageNodeCIDs, pgStr, args...) } // RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db // it finds the union of heights where no data exists and where the times_validated is lower than the validation level -func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) { +func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]eth2.DBGap, error) { log.Info("searching for gaps in the eth ipfs watcher database") startingBlock, err := ecr.RetrieveFirstBlockNumber() if err != nil { return nil, fmt.Errorf("eth CIDRetriever RetrieveFirstBlockNumber error: %v", err) } - var initialGap []shared.Gap + var initialGap []eth2.DBGap if startingBlock != 0 { stop := uint64(startingBlock - 1) log.Infof("found gap at the beginning of the eth sync from 0 to %d", stop) - initialGap = []shared.Gap{{ + initialGap = []eth2.DBGap{{ Start: 0, Stop: stop, }} @@ -471,9 +476,9 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, if err := ecr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows { return nil, err } - emptyGaps := make([]shared.Gap, len(results)) + emptyGaps := make([]eth2.DBGap, len(results)) for i, res := range results { - emptyGaps[i] = shared.Gap{ + emptyGaps[i] = eth2.DBGap{ Start: res.Start, Stop: res.Stop, } @@ -492,13 +497,13 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, } // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash -func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []UncleModel, []TxModel, []ReceiptModel, error) { +func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (eth2.HeaderModel, []eth2.UncleModel, []eth2.TxModel, []eth2.ReceiptModel, error) { log.Debug("retrieving block cids for block hash ", blockHash.String()) // Begin new db tx tx, err := ecr.db.Beginx() if err != nil { - return HeaderModel{}, nil, nil, nil, err + return eth2.HeaderModel{}, nil, nil, nil, err } defer func() { if p := recover(); p != nil { @@ -514,17 +519,17 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel headerCID, err := ecr.RetrieveHeaderCIDByHash(tx, blockHash) if err != nil { log.Error("header cid retrieval error") - return HeaderModel{}, nil, nil, nil, err + return eth2.HeaderModel{}, nil, nil, nil, err } uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.ID) if err != nil { log.Error("uncle cid retrieval error") - return HeaderModel{}, nil, nil, nil, err + return eth2.HeaderModel{}, nil, nil, nil, err } txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID) if err != nil { log.Error("tx cid retrieval error") - return HeaderModel{}, nil, nil, nil, err + return eth2.HeaderModel{}, nil, nil, nil, err } txIDs := make([]int64, len(txCIDs)) for i, txCID := range txCIDs { @@ -538,13 +543,13 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel } // RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number -func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []UncleModel, []TxModel, []ReceiptModel, error) { +func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (eth2.HeaderModel, []eth2.UncleModel, []eth2.TxModel, []eth2.ReceiptModel, error) { log.Debug("retrieving block cids for block number ", blockNumber) // Begin new db tx tx, err := ecr.db.Beginx() if err != nil { - return HeaderModel{}, nil, nil, nil, err + return eth2.HeaderModel{}, nil, nil, nil, err } defer func() { if p := recover(); p != nil { @@ -560,20 +565,20 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, headerCID, err := ecr.RetrieveHeaderCIDs(tx, blockNumber) if err != nil { log.Error("header cid retrieval error") - return HeaderModel{}, nil, nil, nil, err + return eth2.HeaderModel{}, nil, nil, nil, err } if len(headerCID) < 1 { - return HeaderModel{}, nil, nil, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber) + return eth2.HeaderModel{}, nil, nil, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber) } uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID[0].ID) if err != nil { log.Error("uncle cid retrieval error") - return HeaderModel{}, nil, nil, nil, err + return eth2.HeaderModel{}, nil, nil, nil, err } txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID) if err != nil { log.Error("tx cid retrieval error") - return HeaderModel{}, nil, nil, nil, err + return eth2.HeaderModel{}, nil, nil, nil, err } txIDs := make([]int64, len(txCIDs)) for i, txCID := range txCIDs { @@ -587,26 +592,26 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, } // RetrieveHeaderCIDByHash returns the header for the given block hash -func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) { +func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (eth2.HeaderModel, error) { log.Debug("retrieving header cids for block hash ", blockHash.String()) pgStr := `SELECT * FROM eth.header_cids WHERE block_hash = $1` - var headerCID HeaderModel + var headerCID eth2.HeaderModel return headerCID, tx.Get(&headerCID, pgStr, blockHash.String()) } // RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id -func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) { +func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]eth2.TxModel, error) { log.Debug("retrieving tx cids for block id ", headerID) pgStr := `SELECT * FROM eth.transaction_cids WHERE header_id = $1 ORDER BY index` - var txCIDs []TxModel + var txCIDs []eth2.TxModel return txCIDs, tx.Select(&txCIDs, pgStr, headerID) } // RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs -func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]ReceiptModel, error) { +func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]eth2.ReceiptModel, error) { log.Debugf("retrieving receipt cids for tx ids %v", txIDs) pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, @@ -615,6 +620,6 @@ func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) WHERE tx_id = ANY($1::INTEGER[]) AND receipt_cids.tx_id = transaction_cids.id ORDER BY transaction_cids.index` - var rctCIDs []ReceiptModel + var rctCIDs []eth2.ReceiptModel return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs)) } diff --git a/pkg/eth/cid_retriever_test.go b/pkg/eth/cid_retriever_test.go index 20e2abc0..24ef1e7f 100644 --- a/pkg/eth/cid_retriever_test.go +++ b/pkg/eth/cid_retriever_test.go @@ -19,22 +19,21 @@ package eth_test import ( "math/big" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/common" - + "github.com/ethereum/go-ethereum/core/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + + "github.com/vulcanize/ipld-eth-server/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) var ( - openFilter = ð.SubscriptionSettings{ + openFilter = eth.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{}, @@ -43,7 +42,7 @@ var ( StateFilter: eth.StateFilter{}, StorageFilter: eth.StorageFilter{}, } - rctAddressFilter = ð.SubscriptionSettings{ + rctAddressFilter = eth.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{ @@ -62,7 +61,7 @@ var ( Off: true, }, } - rctTopicsFilter = ð.SubscriptionSettings{ + rctTopicsFilter = eth.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{ @@ -81,7 +80,7 @@ var ( Off: true, }, } - rctTopicsAndAddressFilter = ð.SubscriptionSettings{ + rctTopicsAndAddressFilter = eth.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{ @@ -104,7 +103,7 @@ var ( Off: true, }, } - rctTopicsAndAddressFilterFail = ð.SubscriptionSettings{ + rctTopicsAndAddressFilterFail = eth.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{ @@ -127,7 +126,7 @@ var ( Off: true, }, } - rctAddressesAndTopicFilter = ð.SubscriptionSettings{ + rctAddressesAndTopicFilter = eth.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{ @@ -147,7 +146,7 @@ var ( Off: true, }, } - rctsForAllCollectedTrxs = ð.SubscriptionSettings{ + rctsForAllCollectedTrxs = eth.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{ @@ -166,7 +165,7 @@ var ( Off: true, }, } - rctsForSelectCollectedTrxs = ð.SubscriptionSettings{ + rctsForSelectCollectedTrxs = eth.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{ @@ -187,7 +186,7 @@ var ( Off: true, }, } - stateFilter = ð.SubscriptionSettings{ + stateFilter = eth.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{ @@ -212,14 +211,14 @@ var _ = Describe("Retriever", func() { var ( db *postgres.DB repo *eth2.IPLDPublisher - retriever *eth2.CIDRetriever + retriever *eth.CIDRetriever ) BeforeEach(func() { var err error db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) repo = eth2.NewIPLDPublisher(db) - retriever = eth2.NewCIDRetriever(db) + retriever = eth.NewCIDRetriever(db) }) AfterEach(func() { eth.TearDownDB(db) @@ -235,23 +234,21 @@ var _ = Describe("Retriever", func() { Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) Expect(len(cids)).To(Equal(1)) - cidWrapper, ok := cids[0].(*eth.CIDWrapper) - Expect(ok).To(BeTrue()) - Expect(cidWrapper.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) + Expect(cids[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) expectedHeaderCID := mocks.MockCIDWrapper.Header - expectedHeaderCID.ID = cidWrapper.Header.ID - expectedHeaderCID.NodeID = cidWrapper.Header.NodeID - Expect(cidWrapper.Header).To(Equal(expectedHeaderCID)) - Expect(len(cidWrapper.Transactions)).To(Equal(3)) - Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[0].CID)).To(BeTrue()) - Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[1].CID)).To(BeTrue()) - Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[2].CID)).To(BeTrue()) - Expect(len(cidWrapper.Receipts)).To(Equal(3)) - Expect(eth.ReceiptModelsContainsCID(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[0].CID)).To(BeTrue()) - Expect(eth.ReceiptModelsContainsCID(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[1].CID)).To(BeTrue()) - Expect(eth.ReceiptModelsContainsCID(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[2].CID)).To(BeTrue()) - Expect(len(cidWrapper.StateNodes)).To(Equal(2)) - for _, stateNode := range cidWrapper.StateNodes { + expectedHeaderCID.ID = cids[0].Header.ID + expectedHeaderCID.NodeID = cids[0].Header.NodeID + Expect(cids[0].Header).To(Equal(expectedHeaderCID)) + Expect(len(cids[0].Transactions)).To(Equal(3)) + Expect(eth.TxModelsContainsCID(cids[0].Transactions, mocks.MockCIDWrapper.Transactions[0].CID)).To(BeTrue()) + Expect(eth.TxModelsContainsCID(cids[0].Transactions, mocks.MockCIDWrapper.Transactions[1].CID)).To(BeTrue()) + Expect(eth.TxModelsContainsCID(cids[0].Transactions, mocks.MockCIDWrapper.Transactions[2].CID)).To(BeTrue()) + Expect(len(cids[0].Receipts)).To(Equal(3)) + Expect(eth.ReceiptModelsContainsCID(cids[0].Receipts, mocks.MockCIDWrapper.Receipts[0].CID)).To(BeTrue()) + Expect(eth.ReceiptModelsContainsCID(cids[0].Receipts, mocks.MockCIDWrapper.Receipts[1].CID)).To(BeTrue()) + Expect(eth.ReceiptModelsContainsCID(cids[0].Receipts, mocks.MockCIDWrapper.Receipts[2].CID)).To(BeTrue()) + Expect(len(cids[0].StateNodes)).To(Equal(2)) + for _, stateNode := range cids[0].StateNodes { if stateNode.CID == mocks.State1CID.String() { Expect(stateNode.StateKey).To(Equal(common.BytesToHash(mocks.ContractLeafKey).Hex())) Expect(stateNode.NodeType).To(Equal(2)) @@ -263,11 +260,11 @@ var _ = Describe("Retriever", func() { Expect(stateNode.Path).To(Equal([]byte{'\x0c'})) } } - Expect(len(cidWrapper.StorageNodes)).To(Equal(1)) + Expect(len(cids[0].StorageNodes)).To(Equal(1)) expectedStorageNodeCIDs := mocks.MockCIDWrapper.StorageNodes - expectedStorageNodeCIDs[0].ID = cidWrapper.StorageNodes[0].ID - expectedStorageNodeCIDs[0].StateID = cidWrapper.StorageNodes[0].StateID - Expect(cidWrapper.StorageNodes).To(Equal(expectedStorageNodeCIDs)) + expectedStorageNodeCIDs[0].ID = cids[0].StorageNodes[0].ID + expectedStorageNodeCIDs[0].StateID = cids[0].StorageNodes[0].StateID + Expect(cids[0].StorageNodes).To(Equal(expectedStorageNodeCIDs)) }) It("Applies filters from the provided config.Subscription", func() { @@ -275,125 +272,111 @@ var _ = Describe("Retriever", func() { Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) Expect(len(cids1)).To(Equal(1)) - cidWrapper1, ok := cids1[0].(*eth.CIDWrapper) - Expect(ok).To(BeTrue()) - Expect(cidWrapper1.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(cidWrapper1.Header).To(Equal(eth.HeaderModel{})) - Expect(len(cidWrapper1.Transactions)).To(Equal(0)) - Expect(len(cidWrapper1.StateNodes)).To(Equal(0)) - Expect(len(cidWrapper1.StorageNodes)).To(Equal(0)) - Expect(len(cidWrapper1.Receipts)).To(Equal(1)) + Expect(cids1[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) + Expect(cids1[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(len(cids1[0].Transactions)).To(Equal(0)) + Expect(len(cids1[0].StateNodes)).To(Equal(0)) + Expect(len(cids1[0].StorageNodes)).To(Equal(0)) + Expect(len(cids1[0].Receipts)).To(Equal(1)) expectedReceiptCID := mocks.MockCIDWrapper.Receipts[0] - expectedReceiptCID.ID = cidWrapper1.Receipts[0].ID - expectedReceiptCID.TxID = cidWrapper1.Receipts[0].TxID - Expect(cidWrapper1.Receipts[0]).To(Equal(expectedReceiptCID)) + expectedReceiptCID.ID = cids1[0].Receipts[0].ID + expectedReceiptCID.TxID = cids1[0].Receipts[0].TxID + Expect(cids1[0].Receipts[0]).To(Equal(expectedReceiptCID)) cids2, empty, err := retriever.Retrieve(rctTopicsFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) Expect(len(cids2)).To(Equal(1)) - cidWrapper2, ok := cids2[0].(*eth.CIDWrapper) - Expect(ok).To(BeTrue()) - Expect(cidWrapper2.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(cidWrapper2.Header).To(Equal(eth.HeaderModel{})) - Expect(len(cidWrapper2.Transactions)).To(Equal(0)) - Expect(len(cidWrapper2.StateNodes)).To(Equal(0)) - Expect(len(cidWrapper2.StorageNodes)).To(Equal(0)) - Expect(len(cidWrapper2.Receipts)).To(Equal(1)) + Expect(cids2[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) + Expect(cids2[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(len(cids2[0].Transactions)).To(Equal(0)) + Expect(len(cids2[0].StateNodes)).To(Equal(0)) + Expect(len(cids2[0].StorageNodes)).To(Equal(0)) + Expect(len(cids2[0].Receipts)).To(Equal(1)) expectedReceiptCID = mocks.MockCIDWrapper.Receipts[0] - expectedReceiptCID.ID = cidWrapper2.Receipts[0].ID - expectedReceiptCID.TxID = cidWrapper2.Receipts[0].TxID - Expect(cidWrapper2.Receipts[0]).To(Equal(expectedReceiptCID)) + expectedReceiptCID.ID = cids2[0].Receipts[0].ID + expectedReceiptCID.TxID = cids2[0].Receipts[0].TxID + Expect(cids2[0].Receipts[0]).To(Equal(expectedReceiptCID)) cids3, empty, err := retriever.Retrieve(rctTopicsAndAddressFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) Expect(len(cids3)).To(Equal(1)) - cidWrapper3, ok := cids3[0].(*eth.CIDWrapper) - Expect(ok).To(BeTrue()) - Expect(cidWrapper3.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(cidWrapper3.Header).To(Equal(eth.HeaderModel{})) - Expect(len(cidWrapper3.Transactions)).To(Equal(0)) - Expect(len(cidWrapper3.StateNodes)).To(Equal(0)) - Expect(len(cidWrapper3.StorageNodes)).To(Equal(0)) - Expect(len(cidWrapper3.Receipts)).To(Equal(1)) + Expect(cids3[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) + Expect(cids3[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(len(cids3[0].Transactions)).To(Equal(0)) + Expect(len(cids3[0].StateNodes)).To(Equal(0)) + Expect(len(cids3[0].StorageNodes)).To(Equal(0)) + Expect(len(cids3[0].Receipts)).To(Equal(1)) expectedReceiptCID = mocks.MockCIDWrapper.Receipts[0] - expectedReceiptCID.ID = cidWrapper3.Receipts[0].ID - expectedReceiptCID.TxID = cidWrapper3.Receipts[0].TxID - Expect(cidWrapper3.Receipts[0]).To(Equal(expectedReceiptCID)) + expectedReceiptCID.ID = cids3[0].Receipts[0].ID + expectedReceiptCID.TxID = cids3[0].Receipts[0].TxID + Expect(cids3[0].Receipts[0]).To(Equal(expectedReceiptCID)) cids4, empty, err := retriever.Retrieve(rctAddressesAndTopicFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) Expect(len(cids4)).To(Equal(1)) - cidWrapper4, ok := cids4[0].(*eth.CIDWrapper) - Expect(ok).To(BeTrue()) - Expect(cidWrapper4.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(cidWrapper4.Header).To(Equal(eth.HeaderModel{})) - Expect(len(cidWrapper4.Transactions)).To(Equal(0)) - Expect(len(cidWrapper4.StateNodes)).To(Equal(0)) - Expect(len(cidWrapper4.StorageNodes)).To(Equal(0)) - Expect(len(cidWrapper4.Receipts)).To(Equal(1)) + Expect(cids4[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) + Expect(cids4[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(len(cids4[0].Transactions)).To(Equal(0)) + Expect(len(cids4[0].StateNodes)).To(Equal(0)) + Expect(len(cids4[0].StorageNodes)).To(Equal(0)) + Expect(len(cids4[0].Receipts)).To(Equal(1)) expectedReceiptCID = mocks.MockCIDWrapper.Receipts[1] - expectedReceiptCID.ID = cidWrapper4.Receipts[0].ID - expectedReceiptCID.TxID = cidWrapper4.Receipts[0].TxID - Expect(cidWrapper4.Receipts[0]).To(Equal(expectedReceiptCID)) + expectedReceiptCID.ID = cids4[0].Receipts[0].ID + expectedReceiptCID.TxID = cids4[0].Receipts[0].TxID + Expect(cids4[0].Receipts[0]).To(Equal(expectedReceiptCID)) cids5, empty, err := retriever.Retrieve(rctsForAllCollectedTrxs, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) Expect(len(cids5)).To(Equal(1)) - cidWrapper5, ok := cids5[0].(*eth.CIDWrapper) - Expect(ok).To(BeTrue()) - Expect(cidWrapper5.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(cidWrapper5.Header).To(Equal(eth.HeaderModel{})) - Expect(len(cidWrapper5.Transactions)).To(Equal(3)) - Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx1CID.String())).To(BeTrue()) - Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx2CID.String())).To(BeTrue()) - Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx3CID.String())).To(BeTrue()) - Expect(len(cidWrapper5.StateNodes)).To(Equal(0)) - Expect(len(cidWrapper5.StorageNodes)).To(Equal(0)) - Expect(len(cidWrapper5.Receipts)).To(Equal(3)) - Expect(eth.ReceiptModelsContainsCID(cidWrapper5.Receipts, mocks.Rct1CID.String())).To(BeTrue()) - Expect(eth.ReceiptModelsContainsCID(cidWrapper5.Receipts, mocks.Rct2CID.String())).To(BeTrue()) - Expect(eth.ReceiptModelsContainsCID(cidWrapper5.Receipts, mocks.Rct3CID.String())).To(BeTrue()) + Expect(cids5[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) + Expect(cids5[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(len(cids5[0].Transactions)).To(Equal(3)) + Expect(eth.TxModelsContainsCID(cids5[0].Transactions, mocks.Trx1CID.String())).To(BeTrue()) + Expect(eth.TxModelsContainsCID(cids5[0].Transactions, mocks.Trx2CID.String())).To(BeTrue()) + Expect(eth.TxModelsContainsCID(cids5[0].Transactions, mocks.Trx3CID.String())).To(BeTrue()) + Expect(len(cids5[0].StateNodes)).To(Equal(0)) + Expect(len(cids5[0].StorageNodes)).To(Equal(0)) + Expect(len(cids5[0].Receipts)).To(Equal(3)) + Expect(eth.ReceiptModelsContainsCID(cids5[0].Receipts, mocks.Rct1CID.String())).To(BeTrue()) + Expect(eth.ReceiptModelsContainsCID(cids5[0].Receipts, mocks.Rct2CID.String())).To(BeTrue()) + Expect(eth.ReceiptModelsContainsCID(cids5[0].Receipts, mocks.Rct3CID.String())).To(BeTrue()) cids6, empty, err := retriever.Retrieve(rctsForSelectCollectedTrxs, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) Expect(len(cids6)).To(Equal(1)) - cidWrapper6, ok := cids6[0].(*eth.CIDWrapper) - Expect(ok).To(BeTrue()) - Expect(cidWrapper6.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(cidWrapper6.Header).To(Equal(eth.HeaderModel{})) - Expect(len(cidWrapper6.Transactions)).To(Equal(1)) + Expect(cids6[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) + Expect(cids6[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(len(cids6[0].Transactions)).To(Equal(1)) expectedTxCID := mocks.MockCIDWrapper.Transactions[1] - expectedTxCID.ID = cidWrapper6.Transactions[0].ID - expectedTxCID.HeaderID = cidWrapper6.Transactions[0].HeaderID - Expect(cidWrapper6.Transactions[0]).To(Equal(expectedTxCID)) - Expect(len(cidWrapper6.StateNodes)).To(Equal(0)) - Expect(len(cidWrapper6.StorageNodes)).To(Equal(0)) - Expect(len(cidWrapper6.Receipts)).To(Equal(1)) + expectedTxCID.ID = cids6[0].Transactions[0].ID + expectedTxCID.HeaderID = cids6[0].Transactions[0].HeaderID + Expect(cids6[0].Transactions[0]).To(Equal(expectedTxCID)) + Expect(len(cids6[0].StateNodes)).To(Equal(0)) + Expect(len(cids6[0].StorageNodes)).To(Equal(0)) + Expect(len(cids6[0].Receipts)).To(Equal(1)) expectedReceiptCID = mocks.MockCIDWrapper.Receipts[1] - expectedReceiptCID.ID = cidWrapper6.Receipts[0].ID - expectedReceiptCID.TxID = cidWrapper6.Receipts[0].TxID - Expect(cidWrapper6.Receipts[0]).To(Equal(expectedReceiptCID)) + expectedReceiptCID.ID = cids6[0].Receipts[0].ID + expectedReceiptCID.TxID = cids6[0].Receipts[0].TxID + Expect(cids6[0].Receipts[0]).To(Equal(expectedReceiptCID)) cids7, empty, err := retriever.Retrieve(stateFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) Expect(len(cids7)).To(Equal(1)) - cidWrapper7, ok := cids7[0].(*eth.CIDWrapper) - Expect(ok).To(BeTrue()) - Expect(cidWrapper7.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(cidWrapper7.Header).To(Equal(eth.HeaderModel{})) - Expect(len(cidWrapper7.Transactions)).To(Equal(0)) - Expect(len(cidWrapper7.Receipts)).To(Equal(0)) - Expect(len(cidWrapper7.StorageNodes)).To(Equal(0)) - Expect(len(cidWrapper7.StateNodes)).To(Equal(1)) - Expect(cidWrapper7.StateNodes[0]).To(Equal(eth.StateNodeModel{ - ID: cidWrapper7.StateNodes[0].ID, - HeaderID: cidWrapper7.StateNodes[0].HeaderID, + Expect(cids7[0].BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) + Expect(cids7[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(len(cids7[0].Transactions)).To(Equal(0)) + Expect(len(cids7[0].Receipts)).To(Equal(0)) + Expect(len(cids7[0].StorageNodes)).To(Equal(0)) + Expect(len(cids7[0].StateNodes)).To(Equal(1)) + Expect(cids7[0].StateNodes[0]).To(Equal(eth2.StateNodeModel{ + ID: cids7[0].StateNodes[0].ID, + HeaderID: cids7[0].StateNodes[0].HeaderID, NodeType: 2, StateKey: common.BytesToHash(mocks.AccountLeafKey).Hex(), CID: mocks.State2CID.String(), @@ -602,11 +585,11 @@ var _ = Describe("Retriever", func() { gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(5)) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 0, Stop: 0})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 2, Stop: 4})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 0, Stop: 0})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 2, Stop: 4})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 6, Stop: 99})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 107, Stop: 999})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 1001, Stop: 1010100})).To(BeTrue()) }) It("Finds validation level gaps", func() { @@ -669,21 +652,21 @@ var _ = Describe("Retriever", func() { err = repo.Publish(payload14) Expect(err).ToNot(HaveOccurred()) - cleaner := eth.NewCleaner(db) + cleaner := eth2.NewDBCleaner(db) err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}, {106, 108}}) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(8)) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 0, Stop: 0})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 2, Stop: 4})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 106, Stop: 108})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 110, Stop: 999})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 0, Stop: 0})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 2, Stop: 4})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 6, Stop: 99})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 101, Stop: 102})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 104, Stop: 104})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 106, Stop: 108})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 110, Stop: 999})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 1001, Stop: 1010100})).To(BeTrue()) }) }) }) diff --git a/pkg/eth/eth_suite_test.go b/pkg/eth/eth_suite_test.go index 47adce6b..3be0ebfc 100644 --- a/pkg/eth/eth_suite_test.go +++ b/pkg/eth/eth_suite_test.go @@ -27,7 +27,7 @@ import ( func TestETHWatcher(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "ETH IPFS Watcher Suite Test") + RunSpecs(t, "eth ipld server eth suite test") } var _ = BeforeSuite(func() { diff --git a/pkg/eth/filterer.go b/pkg/eth/filterer.go index 08acc3da..8f41833c 100644 --- a/pkg/eth/filterer.go +++ b/pkg/eth/filterer.go @@ -18,7 +18,6 @@ package eth import ( "bytes" - "fmt" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -27,11 +26,16 @@ import ( "github.com/ethereum/go-ethereum/statediff" "github.com/multiformats/go-multihash" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) +// Filterer interface for substituing mocks in tests +type Filterer interface { + Filter(filter SubscriptionSettings, payload eth.ConvertedPayload) (*eth.IPLDs, error) +} + // ResponseFilterer satisfies the ResponseFilterer interface for ethereum type ResponseFilterer struct{} @@ -41,42 +45,34 @@ func NewResponseFilterer() *ResponseFilterer { } // Filter is used to filter through eth data to extract and package requested data into a Payload -func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.ConvertedData) (shared.IPLDs, error) { - ethFilters, ok := filter.(*SubscriptionSettings) - if !ok { - return IPLDs{}, fmt.Errorf("eth filterer expected filter type %T got %T", &SubscriptionSettings{}, filter) - } - ethPayload, ok := payload.(ConvertedPayload) - if !ok { - return IPLDs{}, fmt.Errorf("eth filterer expected payload type %T got %T", ConvertedPayload{}, payload) - } - if checkRange(ethFilters.Start.Int64(), ethFilters.End.Int64(), ethPayload.Block.Number().Int64()) { - response := new(IPLDs) - response.TotalDifficulty = ethPayload.TotalDifficulty - if err := s.filterHeaders(ethFilters.HeaderFilter, response, ethPayload); err != nil { - return IPLDs{}, err +func (s *ResponseFilterer) Filter(filter SubscriptionSettings, payload eth.ConvertedPayload) (*eth.IPLDs, error) { + if checkRange(filter.Start.Int64(), filter.End.Int64(), payload.Block.Number().Int64()) { + response := new(eth.IPLDs) + response.TotalDifficulty = payload.TotalDifficulty + if err := s.filterHeaders(filter.HeaderFilter, response, payload); err != nil { + return nil, err } - txHashes, err := s.filterTransactions(ethFilters.TxFilter, response, ethPayload) + txHashes, err := s.filterTransactions(filter.TxFilter, response, payload) if err != nil { - return IPLDs{}, err + return nil, err } var filterTxs []common.Hash - if ethFilters.ReceiptFilter.MatchTxs { + if filter.ReceiptFilter.MatchTxs { filterTxs = txHashes } - if err := s.filerReceipts(ethFilters.ReceiptFilter, response, ethPayload, filterTxs); err != nil { - return IPLDs{}, err + if err := s.filerReceipts(filter.ReceiptFilter, response, payload, filterTxs); err != nil { + return nil, err } - if err := s.filterStateAndStorage(ethFilters.StateFilter, ethFilters.StorageFilter, response, ethPayload); err != nil { - return IPLDs{}, err + if err := s.filterStateAndStorage(filter.StateFilter, filter.StorageFilter, response, payload); err != nil { + return nil, err } - response.BlockNumber = ethPayload.Block.Number() - return *response, nil + response.BlockNumber = payload.Block.Number() + return response, nil } - return IPLDs{}, nil + return nil, nil } -func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload ConvertedPayload) error { +func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *eth.IPLDs, payload eth.ConvertedPayload) error { if !headerFilter.Off { headerRLP, err := rlp.EncodeToBytes(payload.Block.Header()) if err != nil { @@ -118,7 +114,7 @@ func checkRange(start, end, actual int64) bool { return false } -func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload ConvertedPayload) ([]common.Hash, error) { +func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *eth.IPLDs, payload eth.ConvertedPayload) ([]common.Hash, error) { var trxHashes []common.Hash if !trxFilter.Off { trxLen := len(payload.Block.Body().Transactions) @@ -166,7 +162,7 @@ func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst s return false } -func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload ConvertedPayload, trxHashes []common.Hash) error { +func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *eth.IPLDs, payload eth.ConvertedPayload, trxHashes []common.Hash) error { if !receiptFilter.Off { response.Receipts = make([]ipfs.BlockModel, 0, len(payload.Receipts)) for i, receipt := range payload.Receipts { @@ -256,9 +252,9 @@ func slicesShareString(slice1, slice2 []string) int { } // filterStateAndStorage filters state and storage nodes into the response according to the provided filters -func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storageFilter StorageFilter, response *IPLDs, payload ConvertedPayload) error { - response.StateNodes = make([]StateNode, 0, len(payload.StateNodes)) - response.StorageNodes = make([]StorageNode, 0) +func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storageFilter StorageFilter, response *eth.IPLDs, payload eth.ConvertedPayload) error { + response.StateNodes = make([]eth.StateNode, 0, len(payload.StateNodes)) + response.StorageNodes = make([]eth.StorageNode, 0) stateAddressFilters := make([]common.Hash, len(stateFilter.Addresses)) for i, addr := range stateFilter.Addresses { stateAddressFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()) @@ -278,7 +274,7 @@ func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storag if err != nil { return err } - response.StateNodes = append(response.StateNodes, StateNode{ + response.StateNodes = append(response.StateNodes, eth.StateNode{ StateLeafKey: stateNode.LeafKey, Path: stateNode.Path, IPLD: ipfs.BlockModel{ @@ -296,7 +292,7 @@ func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storag if err != nil { return err } - response.StorageNodes = append(response.StorageNodes, StorageNode{ + response.StorageNodes = append(response.StorageNodes, eth.StorageNode{ StateLeafKey: stateNode.LeafKey, StorageLeafKey: storageNode.LeafKey, IPLD: ipfs.BlockModel{ diff --git a/pkg/eth/filterer_test.go b/pkg/eth/filterer_test.go index 490f1386..41cbcf93 100644 --- a/pkg/eth/filterer_test.go +++ b/pkg/eth/filterer_test.go @@ -23,10 +23,11 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + + "github.com/vulcanize/ipld-eth-server/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) var ( @@ -40,10 +41,9 @@ var _ = Describe("Filterer", func() { }) It("Transcribes all the data from the IPLDPayload into the StreamPayload if given an open filter", func() { - payload, err := filterer.Filter(openFilter, mocks.MockConvertedPayload) + iplds, err := filterer.Filter(openFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - iplds, ok := payload.(eth.IPLDs) - Expect(ok).To(BeTrue()) + Expect(iplds).ToNot(BeNil()) Expect(iplds.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header)) var expectedEmptyUncles []ipfs.BlockModel @@ -76,10 +76,9 @@ var _ = Describe("Filterer", func() { }) It("Applies filters from the provided config.Subscription", func() { - payload1, err := filterer.Filter(rctAddressFilter, mocks.MockConvertedPayload) + iplds1, err := filterer.Filter(rctAddressFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - iplds1, ok := payload1.(eth.IPLDs) - Expect(ok).To(BeTrue()) + Expect(iplds1).ToNot(BeNil()) Expect(iplds1.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds1.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds1.Uncles)).To(Equal(0)) @@ -92,10 +91,9 @@ var _ = Describe("Filterer", func() { CID: mocks.Rct1IPLD.Cid().String(), })) - payload2, err := filterer.Filter(rctTopicsFilter, mocks.MockConvertedPayload) + iplds2, err := filterer.Filter(rctTopicsFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - iplds2, ok := payload2.(eth.IPLDs) - Expect(ok).To(BeTrue()) + Expect(iplds2).ToNot(BeNil()) Expect(iplds2.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds2.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds2.Uncles)).To(Equal(0)) @@ -108,10 +106,9 @@ var _ = Describe("Filterer", func() { CID: mocks.Rct1IPLD.Cid().String(), })) - payload3, err := filterer.Filter(rctTopicsAndAddressFilter, mocks.MockConvertedPayload) + iplds3, err := filterer.Filter(rctTopicsAndAddressFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - iplds3, ok := payload3.(eth.IPLDs) - Expect(ok).To(BeTrue()) + Expect(iplds3).ToNot(BeNil()) Expect(iplds3.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds3.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds3.Uncles)).To(Equal(0)) @@ -124,10 +121,9 @@ var _ = Describe("Filterer", func() { CID: mocks.Rct1IPLD.Cid().String(), })) - payload4, err := filterer.Filter(rctAddressesAndTopicFilter, mocks.MockConvertedPayload) + iplds4, err := filterer.Filter(rctAddressesAndTopicFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - iplds4, ok := payload4.(eth.IPLDs) - Expect(ok).To(BeTrue()) + Expect(iplds4).ToNot(BeNil()) Expect(iplds4.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds4.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds4.Uncles)).To(Equal(0)) @@ -140,10 +136,9 @@ var _ = Describe("Filterer", func() { CID: mocks.Rct2IPLD.Cid().String(), })) - payload5, err := filterer.Filter(rctsForAllCollectedTrxs, mocks.MockConvertedPayload) + iplds5, err := filterer.Filter(rctsForAllCollectedTrxs, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - iplds5, ok := payload5.(eth.IPLDs) - Expect(ok).To(BeTrue()) + Expect(iplds5).ToNot(BeNil()) Expect(iplds5.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds5.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds5.Uncles)).To(Equal(0)) @@ -158,10 +153,9 @@ var _ = Describe("Filterer", func() { Expect(shared.IPLDsContainBytes(iplds5.Receipts, mocks.MockReceipts.GetRlp(1))).To(BeTrue()) Expect(shared.IPLDsContainBytes(iplds5.Receipts, mocks.MockReceipts.GetRlp(2))).To(BeTrue()) - payload6, err := filterer.Filter(rctsForSelectCollectedTrxs, mocks.MockConvertedPayload) + iplds6, err := filterer.Filter(rctsForSelectCollectedTrxs, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - iplds6, ok := payload6.(eth.IPLDs) - Expect(ok).To(BeTrue()) + Expect(iplds6).ToNot(BeNil()) Expect(iplds6.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds6.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds6.Uncles)).To(Equal(0)) @@ -175,10 +169,9 @@ var _ = Describe("Filterer", func() { CID: mocks.Rct2IPLD.Cid().String(), })) - payload7, err := filterer.Filter(stateFilter, mocks.MockConvertedPayload) + iplds7, err := filterer.Filter(stateFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - iplds7, ok := payload7.(eth.IPLDs) - Expect(ok).To(BeTrue()) + Expect(iplds7).ToNot(BeNil()) Expect(iplds7.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds7.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds7.Uncles)).To(Equal(0)) @@ -192,10 +185,9 @@ var _ = Describe("Filterer", func() { CID: mocks.State2IPLD.Cid().String(), })) - payload8, err := filterer.Filter(rctTopicsAndAddressFilterFail, mocks.MockConvertedPayload) + iplds8, err := filterer.Filter(rctTopicsAndAddressFilterFail, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - iplds8, ok := payload8.(eth.IPLDs) - Expect(ok).To(BeTrue()) + Expect(iplds8).ToNot(BeNil()) Expect(iplds8.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds8.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds8.Uncles)).To(Equal(0)) diff --git a/pkg/eth/ipld_fetcher.go b/pkg/eth/ipld_fetcher.go index acea6197..0ed3df6c 100644 --- a/pkg/eth/ipld_fetcher.go +++ b/pkg/eth/ipld_fetcher.go @@ -25,11 +25,18 @@ import ( "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) +// Fetcher interface for substituting mocks in tests +type Fetcher interface { + Fetch(cids eth.CIDWrapper) (*eth.IPLDs, error) +} + // IPLDFetcher satisfies the IPLDFetcher interface for ethereum // It interfaces directly with PG-IPFS type IPLDFetcher struct { @@ -44,18 +51,15 @@ func NewIPLDFetcher(db *postgres.DB) *IPLDFetcher { } // Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper -func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { - cidWrapper, ok := cids.(*CIDWrapper) - if !ok { - return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) - } +func (f *IPLDFetcher) Fetch(cids eth.CIDWrapper) (*eth.IPLDs, error) { log.Debug("fetching iplds") - iplds := IPLDs{} - iplds.TotalDifficulty, ok = new(big.Int).SetString(cidWrapper.Header.TotalDifficulty, 10) + iplds := new(eth.IPLDs) + var ok bool + iplds.TotalDifficulty, ok = new(big.Int).SetString(cids.Header.TotalDifficulty, 10) if !ok { return nil, errors.New("eth fetcher: unable to set total difficulty") } - iplds.BlockNumber = cidWrapper.BlockNumber + iplds.BlockNumber = cids.BlockNumber tx, err := f.db.Beginx() if err != nil { @@ -72,27 +76,27 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { } }() - iplds.Header, err = f.FetchHeader(tx, cidWrapper.Header) + iplds.Header, err = f.FetchHeader(tx, cids.Header) if err != nil { return nil, fmt.Errorf("eth pg fetcher: header fetching error: %s", err.Error()) } - iplds.Uncles, err = f.FetchUncles(tx, cidWrapper.Uncles) + iplds.Uncles, err = f.FetchUncles(tx, cids.Uncles) if err != nil { return nil, fmt.Errorf("eth pg fetcher: uncle fetching error: %s", err.Error()) } - iplds.Transactions, err = f.FetchTrxs(tx, cidWrapper.Transactions) + iplds.Transactions, err = f.FetchTrxs(tx, cids.Transactions) if err != nil { return nil, fmt.Errorf("eth pg fetcher: transaction fetching error: %s", err.Error()) } - iplds.Receipts, err = f.FetchRcts(tx, cidWrapper.Receipts) + iplds.Receipts, err = f.FetchRcts(tx, cids.Receipts) if err != nil { return nil, fmt.Errorf("eth pg fetcher: receipt fetching error: %s", err.Error()) } - iplds.StateNodes, err = f.FetchState(tx, cidWrapper.StateNodes) + iplds.StateNodes, err = f.FetchState(tx, cids.StateNodes) if err != nil { return nil, fmt.Errorf("eth pg fetcher: state fetching error: %s", err.Error()) } - iplds.StorageNodes, err = f.FetchStorage(tx, cidWrapper.StorageNodes) + iplds.StorageNodes, err = f.FetchStorage(tx, cids.StorageNodes) if err != nil { return nil, fmt.Errorf("eth pg fetcher: storage fetching error: %s", err.Error()) } @@ -100,7 +104,7 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { } // FetchHeaders fetches headers -func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c eth.HeaderModel) (ipfs.BlockModel, error) { log.Debug("fetching header ipld") headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { @@ -113,7 +117,7 @@ func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, } // FetchUncles fetches uncles -func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []eth.UncleModel) ([]ipfs.BlockModel, error) { log.Debug("fetching uncle iplds") uncleIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { @@ -130,7 +134,7 @@ func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.BlockM } // FetchTrxs fetches transactions -func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []eth.TxModel) ([]ipfs.BlockModel, error) { log.Debug("fetching transaction iplds") trxIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { @@ -147,7 +151,7 @@ func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, } // FetchRcts fetches receipts -func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []eth.ReceiptModel) ([]ipfs.BlockModel, error) { log.Debug("fetching receipt iplds") rctIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { @@ -164,9 +168,9 @@ func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.BlockM } // FetchState fetches state nodes -func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNode, error) { +func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []eth.StateNodeModel) ([]eth.StateNode, error) { log.Debug("fetching state iplds") - stateNodes := make([]StateNode, 0, len(cids)) + stateNodes := make([]eth.StateNode, 0, len(cids)) for _, stateNode := range cids { if stateNode.CID == "" { continue @@ -175,7 +179,7 @@ func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNod if err != nil { return nil, err } - stateNodes = append(stateNodes, StateNode{ + stateNodes = append(stateNodes, eth.StateNode{ IPLD: ipfs.BlockModel{ Data: stateBytes, CID: stateNode.CID, @@ -189,9 +193,9 @@ func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNod } // FetchStorage fetches storage nodes -func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) { +func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []eth.StorageNodeWithStateKeyModel) ([]eth.StorageNode, error) { log.Debug("fetching storage iplds") - storageNodes := make([]StorageNode, 0, len(cids)) + storageNodes := make([]eth.StorageNode, 0, len(cids)) for _, storageNode := range cids { if storageNode.CID == "" || storageNode.StateKey == "" { continue @@ -200,7 +204,7 @@ func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKeyMo if err != nil { return nil, err } - storageNodes = append(storageNodes, StorageNode{ + storageNodes = append(storageNodes, eth.StorageNode{ IPLD: ipfs.BlockModel{ Data: storageBytes, CID: storageNode.CID, diff --git a/pkg/eth/ipld_fetcher_test.go b/pkg/eth/ipld_fetcher_test.go index c1165d8c..a370c774 100644 --- a/pkg/eth/ipld_fetcher_test.go +++ b/pkg/eth/ipld_fetcher_test.go @@ -20,15 +20,17 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" + eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + + "github.com/vulcanize/ipld-eth-server/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) var ( db *postgres.DB - pubAndIndexer *eth.IPLDPublisher + pubAndIndexer *eth2.IPLDPublisher fetcher *eth.IPLDFetcher ) @@ -38,7 +40,7 @@ var _ = Describe("IPLDFetcher", func() { var err error db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) - pubAndIndexer = eth.NewIPLDPublisher(db) + pubAndIndexer = eth2.NewIPLDPublisher(db) err = pubAndIndexer.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) fetcher = eth.NewIPLDFetcher(db) @@ -48,10 +50,9 @@ var _ = Describe("IPLDFetcher", func() { }) It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() { - i, err := fetcher.Fetch(mocks.MockCIDWrapper) + iplds, err := fetcher.Fetch(*mocks.MockCIDWrapper) Expect(err).ToNot(HaveOccurred()) - iplds, ok := i.(eth.IPLDs) - Expect(ok).To(BeTrue()) + Expect(iplds).ToNot(BeNil()) Expect(iplds.TotalDifficulty).To(Equal(mocks.MockConvertedPayload.TotalDifficulty)) Expect(iplds.BlockNumber).To(Equal(mocks.MockConvertedPayload.Block.Number())) Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header)) diff --git a/pkg/eth/mocks/converter.go b/pkg/eth/mocks/converter.go index 728d1871..50158a4e 100644 --- a/pkg/eth/mocks/converter.go +++ b/pkg/eth/mocks/converter.go @@ -21,8 +21,8 @@ import ( "github.com/ethereum/go-ethereum/statediff" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + "github.com/vulcanize/ipld-eth-server/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) // PayloadConverter is the underlying struct for the Converter interface diff --git a/pkg/eth/mocks/indexer.go b/pkg/eth/mocks/indexer.go index c01d4dd2..cee84767 100644 --- a/pkg/eth/mocks/indexer.go +++ b/pkg/eth/mocks/indexer.go @@ -19,9 +19,9 @@ package mocks import ( "fmt" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + "github.com/vulcanize/ipld-eth-server/pkg/shared" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/eth" ) // CIDIndexer is the underlying struct for the Indexer interface diff --git a/pkg/eth/mocks/publisher.go b/pkg/eth/mocks/publisher.go index c3e9a26a..5758b277 100644 --- a/pkg/eth/mocks/publisher.go +++ b/pkg/eth/mocks/publisher.go @@ -19,9 +19,9 @@ package mocks import ( "fmt" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + "github.com/vulcanize/ipld-eth-server/pkg/shared" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/eth" ) // IPLDPublisher is the underlying struct for the Publisher interface diff --git a/pkg/eth/subscription_config.go b/pkg/eth/subscription_config.go index b56585ee..d74ad3fd 100644 --- a/pkg/eth/subscription_config.go +++ b/pkg/eth/subscription_config.go @@ -20,8 +20,6 @@ import ( "math/big" "github.com/spf13/viper" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) // SubscriptionSettings config is used by a subscriber to specify what eth data to stream from the watcher @@ -125,28 +123,3 @@ func NewEthSubscriptionConfig() (*SubscriptionSettings, error) { } return sc, nil } - -// StartingBlock satisfies the SubscriptionSettings() interface -func (sc *SubscriptionSettings) StartingBlock() *big.Int { - return sc.Start -} - -// EndingBlock satisfies the SubscriptionSettings() interface -func (sc *SubscriptionSettings) EndingBlock() *big.Int { - return sc.End -} - -// HistoricalData satisfies the SubscriptionSettings() interface -func (sc *SubscriptionSettings) HistoricalData() bool { - return sc.BackFill -} - -// HistoricalDataOnly satisfies the SubscriptionSettings() interface -func (sc *SubscriptionSettings) HistoricalDataOnly() bool { - return sc.BackFillOnly -} - -// ChainType satisfies the SubscriptionSettings() interface -func (sc *SubscriptionSettings) ChainType() shared.ChainType { - return shared.Ethereum -} diff --git a/pkg/eth/test_helpers.go b/pkg/eth/test_helpers.go index 5d241203..48fd71a2 100644 --- a/pkg/eth/test_helpers.go +++ b/pkg/eth/test_helpers.go @@ -19,6 +19,7 @@ package eth import ( . "github.com/onsi/gomega" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" ) @@ -45,7 +46,7 @@ func TearDownDB(db *postgres.DB) { } // TxModelsContainsCID used to check if a list of TxModels contains a specific cid string -func TxModelsContainsCID(txs []TxModel, cid string) bool { +func TxModelsContainsCID(txs []eth.TxModel, cid string) bool { for _, tx := range txs { if tx.CID == cid { return true @@ -55,7 +56,7 @@ func TxModelsContainsCID(txs []TxModel, cid string) bool { } // ListContainsBytes used to check if a list of byte arrays contains a particular byte array -func ReceiptModelsContainsCID(rcts []ReceiptModel, cid string) bool { +func ReceiptModelsContainsCID(rcts []eth.ReceiptModel, cid string) bool { for _, rct := range rcts { if rct.CID == cid { return true diff --git a/pkg/shared/env.go b/pkg/shared/env.go index b11c726e..c14774a4 100644 --- a/pkg/shared/env.go +++ b/pkg/shared/env.go @@ -17,79 +17,32 @@ package shared import ( - "github.com/ethereum/go-ethereum/rpc" - - "github.com/btcsuite/btcd/rpcclient" "github.com/spf13/viper" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/node" ) // Env variables const ( - HTTP_TIMEOUT = "HTTP_TIMEOUT" - - ETH_WS_PATH = "ETH_WS_PATH" - ETH_HTTP_PATH = "ETH_HTTP_PATH" ETH_NODE_ID = "ETH_NODE_ID" ETH_CLIENT_NAME = "ETH_CLIENT_NAME" ETH_GENESIS_BLOCK = "ETH_GENESIS_BLOCK" ETH_NETWORK_ID = "ETH_NETWORK_ID" ETH_CHAIN_ID = "ETH_CHAIN_ID" - - BTC_WS_PATH = "BTC_WS_PATH" - BTC_HTTP_PATH = "BTC_HTTP_PATH" - BTC_NODE_PASSWORD = "BTC_NODE_PASSWORD" - BTC_NODE_USER = "BTC_NODE_USER" - BTC_NODE_ID = "BTC_NODE_ID" - BTC_CLIENT_NAME = "BTC_CLIENT_NAME" - BTC_GENESIS_BLOCK = "BTC_GENESIS_BLOCK" - BTC_NETWORK_ID = "BTC_NETWORK_ID" - BTC_CHAIN_ID = "BTC_CHAIN_ID" ) -// GetEthNodeAndClient returns eth node info and client from path url -func GetEthNodeAndClient(path string) (node.Node, *rpc.Client, error) { +// GetNodeInfo returns the ethereum node info from env variables +func GetNodeInfo() node.Info { viper.BindEnv("ethereum.nodeID", ETH_NODE_ID) viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME) viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK) viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID) viper.BindEnv("ethereum.chainID", ETH_CHAIN_ID) - rpcClient, err := rpc.Dial(path) - if err != nil { - return node.Node{}, nil, err - } - return node.Node{ + return node.Info{ ID: viper.GetString("ethereum.nodeID"), ClientName: viper.GetString("ethereum.clientName"), GenesisBlock: viper.GetString("ethereum.genesisBlock"), NetworkID: viper.GetString("ethereum.networkID"), ChainID: viper.GetUint64("ethereum.chainID"), - }, rpcClient, nil -} - -// GetBtcNodeAndClient returns btc node info from path url -func GetBtcNodeAndClient(path string) (node.Node, *rpcclient.ConnConfig) { - viper.BindEnv("bitcoin.nodeID", BTC_NODE_ID) - viper.BindEnv("bitcoin.clientName", BTC_CLIENT_NAME) - viper.BindEnv("bitcoin.genesisBlock", BTC_GENESIS_BLOCK) - viper.BindEnv("bitcoin.networkID", BTC_NETWORK_ID) - viper.BindEnv("bitcoin.pass", BTC_NODE_PASSWORD) - viper.BindEnv("bitcoin.user", BTC_NODE_USER) - viper.BindEnv("bitcoin.chainID", BTC_CHAIN_ID) - - // For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node - return node.Node{ - ID: viper.GetString("bitcoin.nodeID"), - ClientName: viper.GetString("bitcoin.clientName"), - GenesisBlock: viper.GetString("bitcoin.genesisBlock"), - NetworkID: viper.GetString("bitcoin.networkID"), - ChainID: viper.GetUint64("bitcoin.chainID"), - }, &rpcclient.ConnConfig{ - Host: path, - HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode - DisableTLS: true, // Bitcoin core does not provide TLS by default - Pass: viper.GetString("bitcoin.pass"), - User: viper.GetString("bitcoin.user"), - } + } } diff --git a/pkg/shared/mocks/payload_fetcher.go b/pkg/shared/mocks/payload_fetcher.go index 1b3e2788..218cd923 100644 --- a/pkg/shared/mocks/payload_fetcher.go +++ b/pkg/shared/mocks/payload_fetcher.go @@ -20,7 +20,7 @@ import ( "errors" "sync/atomic" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) // PayloadFetcher mock for tests diff --git a/pkg/shared/mocks/retriever.go b/pkg/shared/mocks/retriever.go index b878cea5..ba3843f9 100644 --- a/pkg/shared/mocks/retriever.go +++ b/pkg/shared/mocks/retriever.go @@ -18,7 +18,7 @@ package mocks import ( "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) // CIDRetriever is a mock CID retriever for use in tests diff --git a/pkg/shared/mocks/streamer.go b/pkg/shared/mocks/streamer.go index 1fdb49be..daf683eb 100644 --- a/pkg/shared/mocks/streamer.go +++ b/pkg/shared/mocks/streamer.go @@ -18,7 +18,7 @@ package mocks import ( "github.com/ethereum/go-ethereum/rpc" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) // PayloadStreamer mock struct diff --git a/pkg/shared/test_helpers.go b/pkg/shared/test_helpers.go index 09291d2f..38eead8e 100644 --- a/pkg/shared/test_helpers.go +++ b/pkg/shared/test_helpers.go @@ -19,10 +19,11 @@ package shared import ( "bytes" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" + "github.com/ipfs/go-cid" "github.com/multiformats/go-multihash" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/config" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/node" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" @@ -30,11 +31,11 @@ import ( // SetupDB is use to setup a db for watcher tests func SetupDB() (*postgres.DB, error) { - return postgres.NewDB(config.Database{ + return postgres.NewDB(postgres.Config{ Hostname: "localhost", Name: "vulcanize_testing", Port: 5432, - }, node.Node{}) + }, node.Info{}) } // ListContainsString used to check if a list of strings contains a particular string @@ -58,7 +59,7 @@ func IPLDsContainBytes(iplds []ipfs.BlockModel, b []byte) bool { } // ListContainsGap used to check if a list of Gaps contains a particular Gap -func ListContainsGap(gapList []Gap, gap Gap) bool { +func ListContainsGap(gapList []eth.DBGap, gap eth.DBGap) bool { for _, listGap := range gapList { if listGap == gap { return true diff --git a/pkg/watch/api.go b/pkg/watch/api.go index 6154cfe3..fd95b79d 100644 --- a/pkg/watch/api.go +++ b/pkg/watch/api.go @@ -20,15 +20,14 @@ import ( "context" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/node" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" - v "github.com/vulcanize/ipfs-blockchain-watcher/version" + + "github.com/vulcanize/ipld-eth-server/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/shared" + v "github.com/vulcanize/ipld-eth-server/version" ) // APIName is the namespace used for the state diffing service API @@ -50,24 +49,7 @@ func NewPublicWatcherAPI(w Watcher) *PublicWatcherAPI { } // Stream is the public method to setup a subscription that fires off IPLD payloads as they are processed -func (api *PublicWatcherAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc.Subscription, error) { - var params shared.SubscriptionSettings - switch api.w.Chain() { - case shared.Ethereum: - var ethParams eth.SubscriptionSettings - if err := rlp.DecodeBytes(rlpParams, ðParams); err != nil { - return nil, err - } - params = ðParams - case shared.Bitcoin: - var btcParams btc.SubscriptionSettings - if err := rlp.DecodeBytes(rlpParams, &btcParams); err != nil { - return nil, err - } - params = &btcParams - default: - panic("ipfs-blockchain-watcher is not configured for a specific chain type") - } +func (api *PublicWatcherAPI) Stream(ctx context.Context, params eth.SubscriptionSettings) (*rpc.Subscription, error) { // ensure that the RPC connection supports subscriptions notifier, supported := rpc.NotifierFromContext(ctx) if !supported { @@ -107,7 +89,7 @@ func (api *PublicWatcherAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc // Node is a public rpc method to allow transformers to fetch the node info for the watcher // NOTE: this is the node info for the node that the watcher is syncing from, not the node info for the watcher itself -func (api *PublicWatcherAPI) Node() *node.Node { +func (api *PublicWatcherAPI) Node() *node.Info { return api.w.Node() } @@ -136,7 +118,7 @@ func (iapi *InfoAPI) NodeInfo() *p2p.NodeInfo { return &p2p.NodeInfo{ // TODO: formalize this ID: "vulcanizeDB", - Name: "ipfs-blockchain-watcher", + Name: "ipld-eth-server", } } diff --git a/pkg/watch/config.go b/pkg/watch/config.go index 2f8bac20..45880186 100644 --- a/pkg/watch/config.go +++ b/pkg/watch/config.go @@ -17,33 +17,22 @@ package watch import ( - "fmt" "os" "path/filepath" "github.com/spf13/viper" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/config" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/node" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" - "github.com/vulcanize/ipfs-blockchain-watcher/utils" + + "github.com/vulcanize/ipld-eth-server/utils" ) // Env variables const ( - SUPERNODE_CHAIN = "SUPERNODE_CHAIN" - SUPERNODE_SYNC = "SUPERNODE_SYNC" - SUPERNODE_WORKERS = "SUPERNODE_WORKERS" - SUPERNODE_SERVER = "SUPERNODE_SERVER" - SUPERNODE_WS_PATH = "SUPERNODE_WS_PATH" - SUPERNODE_IPC_PATH = "SUPERNODE_IPC_PATH" - SUPERNODE_HTTP_PATH = "SUPERNODE_HTTP_PATH" - SUPERNODE_BACKFILL = "SUPERNODE_BACKFILL" - - SYNC_MAX_IDLE_CONNECTIONS = "SYNC_MAX_IDLE_CONNECTIONS" - SYNC_MAX_OPEN_CONNECTIONS = "SYNC_MAX_OPEN_CONNECTIONS" - SYNC_MAX_CONN_LIFETIME = "SYNC_MAX_CONN_LIFETIME" + SERVER_WS_PATH = "SERVER_WS_PATH" + SERVER_IPC_PATH = "SERVER_IPC_PATH" + SERVER_HTTP_PATH = "SERVER_HTTP_PATH" SERVER_MAX_IDLE_CONNECTIONS = "SERVER_MAX_IDLE_CONNECTIONS" SERVER_MAX_OPEN_CONNECTIONS = "SERVER_MAX_OPEN_CONNECTIONS" @@ -52,97 +41,47 @@ const ( // Config struct type Config struct { - Chain shared.ChainType - DBConfig config.Database - // Server fields - Serve bool - ServeDBConn *postgres.DB + DB *postgres.DB + DBConfig postgres.Config WSEndpoint string HTTPEndpoint string IPCEndpoint string - // Sync params - Sync bool - SyncDBConn *postgres.DB - Workers int - WSClient interface{} - NodeInfo node.Node - // Historical switch - Historical bool + NodeInfo node.Info } // NewConfig is used to initialize a watcher config from a .toml file // Separate chain watcher instances need to be ran with separate ipfs path in order to avoid lock contention on the ipfs repository lockfile func NewConfig() (*Config, error) { c := new(Config) - var err error - viper.BindEnv("watcher.chain", SUPERNODE_CHAIN) - viper.BindEnv("watcher.sync", SUPERNODE_SYNC) - viper.BindEnv("watcher.workers", SUPERNODE_WORKERS) - viper.BindEnv("ethereum.wsPath", shared.ETH_WS_PATH) - viper.BindEnv("bitcoin.wsPath", shared.BTC_WS_PATH) - viper.BindEnv("watcher.server", SUPERNODE_SERVER) - viper.BindEnv("watcher.wsPath", SUPERNODE_WS_PATH) - viper.BindEnv("watcher.ipcPath", SUPERNODE_IPC_PATH) - viper.BindEnv("watcher.httpPath", SUPERNODE_HTTP_PATH) - viper.BindEnv("watcher.backFill", SUPERNODE_BACKFILL) - - c.Historical = viper.GetBool("watcher.backFill") - chain := viper.GetString("watcher.chain") - c.Chain, err = shared.NewChainType(chain) - if err != nil { - return nil, err - } + viper.BindEnv("server.wsPath", SERVER_WS_PATH) + viper.BindEnv("server.ipcPath", SERVER_IPC_PATH) + viper.BindEnv("server.httpPath", SERVER_HTTP_PATH) c.DBConfig.Init() - c.Sync = viper.GetBool("watcher.sync") - if c.Sync { - workers := viper.GetInt("watcher.workers") - if workers < 1 { - workers = 1 - } - c.Workers = workers - switch c.Chain { - case shared.Ethereum: - ethWS := viper.GetString("ethereum.wsPath") - c.NodeInfo, c.WSClient, err = shared.GetEthNodeAndClient(fmt.Sprintf("ws://%s", ethWS)) - if err != nil { - return nil, err - } - case shared.Bitcoin: - btcWS := viper.GetString("bitcoin.wsPath") - c.NodeInfo, c.WSClient = shared.GetBtcNodeAndClient(btcWS) - } - syncDBConn := overrideDBConnConfig(c.DBConfig, Sync) - syncDB := utils.LoadPostgres(syncDBConn, c.NodeInfo) - c.SyncDBConn = &syncDB - } - c.Serve = viper.GetBool("watcher.server") - if c.Serve { - wsPath := viper.GetString("watcher.wsPath") - if wsPath == "" { - wsPath = "127.0.0.1:8080" - } - c.WSEndpoint = wsPath - ipcPath := viper.GetString("watcher.ipcPath") - if ipcPath == "" { - home, err := os.UserHomeDir() - if err != nil { - return nil, err - } - ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") - } - c.IPCEndpoint = ipcPath - httpPath := viper.GetString("watcher.httpPath") - if httpPath == "" { - httpPath = "127.0.0.1:8081" - } - c.HTTPEndpoint = httpPath - serveDBConn := overrideDBConnConfig(c.DBConfig, Serve) - serveDB := utils.LoadPostgres(serveDBConn, c.NodeInfo) - c.ServeDBConn = &serveDB + wsPath := viper.GetString("watcher.wsPath") + if wsPath == "" { + wsPath = "127.0.0.1:8080" } + c.WSEndpoint = wsPath + ipcPath := viper.GetString("watcher.ipcPath") + if ipcPath == "" { + home, err := os.UserHomeDir() + if err != nil { + return nil, err + } + ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") + } + c.IPCEndpoint = ipcPath + httpPath := viper.GetString("watcher.httpPath") + if httpPath == "" { + httpPath = "127.0.0.1:8081" + } + c.HTTPEndpoint = httpPath + overrideDBConnConfig(&c.DBConfig) + serveDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + c.DB = &serveDB return c, nil } @@ -154,23 +93,11 @@ var ( Serve mode = "serve" ) -func overrideDBConnConfig(con config.Database, m mode) config.Database { - switch m { - case Sync: - viper.BindEnv("database.sync.maxIdle", SYNC_MAX_IDLE_CONNECTIONS) - viper.BindEnv("database.sync.maxOpen", SYNC_MAX_OPEN_CONNECTIONS) - viper.BindEnv("database.sync.maxLifetime", SYNC_MAX_CONN_LIFETIME) - con.MaxIdle = viper.GetInt("database.sync.maxIdle") - con.MaxOpen = viper.GetInt("database.sync.maxOpen") - con.MaxLifetime = viper.GetInt("database.sync.maxLifetime") - case Serve: - viper.BindEnv("database.server.maxIdle", SERVER_MAX_IDLE_CONNECTIONS) - viper.BindEnv("database.server.maxOpen", SERVER_MAX_OPEN_CONNECTIONS) - viper.BindEnv("database.server.maxLifetime", SERVER_MAX_CONN_LIFETIME) - con.MaxIdle = viper.GetInt("database.server.maxIdle") - con.MaxOpen = viper.GetInt("database.server.maxOpen") - con.MaxLifetime = viper.GetInt("database.server.maxLifetime") - default: - } - return con +func overrideDBConnConfig(con *postgres.Config) { + viper.BindEnv("database.server.maxIdle", SERVER_MAX_IDLE_CONNECTIONS) + viper.BindEnv("database.server.maxOpen", SERVER_MAX_OPEN_CONNECTIONS) + viper.BindEnv("database.server.maxLifetime", SERVER_MAX_CONN_LIFETIME) + con.MaxIdle = viper.GetInt("database.server.maxIdle") + con.MaxOpen = viper.GetInt("database.server.maxOpen") + con.MaxLifetime = viper.GetInt("database.server.maxLifetime") } diff --git a/pkg/watch/service.go b/pkg/watch/service.go index 74cb7422..b3ce55bf 100644 --- a/pkg/watch/service.go +++ b/pkg/watch/service.go @@ -28,10 +28,12 @@ import ( "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/builders" + eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/node" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + + "github.com/vulcanize/ipld-eth-server/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/shared" ) const ( @@ -44,16 +46,14 @@ const ( type Watcher interface { // APIs(), Protocols(), Start() and Stop() ethnode.Service - // Data processing event loop - Sync(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error // Pub-Sub handling event loop - Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) + Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth2.ConvertedPayload) // Method to subscribe to the service - Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) + Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings) // Method to unsubscribe from the service Unsubscribe(id rpc.ID) // Method to access the node info for the service - Node() *node.Node + Node() *node.Info // Method to access chain type Chain() shared.ChainType } @@ -62,32 +62,20 @@ type Watcher interface { type Service struct { // Used to sync access to the Subscriptions sync.Mutex - // Interface for streaming payloads over an rpc subscription - Streamer shared.PayloadStreamer - // Interface for converting raw payloads into IPLD object payloads - Converter shared.PayloadConverter - // Interface for publishing and indexing the PG-IPLD payloads - Publisher shared.IPLDPublisher // Interface for filtering and serving data according to subscribed clients according to their specification - Filterer shared.ResponseFilterer + Filterer eth.Filterer // Interface for fetching IPLD objects from IPFS - IPLDFetcher shared.IPLDFetcher + IPLDFetcher eth.Fetcher // Interface for searching and retrieving CIDs from Postgres index - Retriever shared.CIDRetriever - // Chan the processor uses to subscribe to payloads from the Streamer - PayloadChan chan shared.RawChainData + Retriever eth.Retriever // Used to signal shutdown of the service QuitChan chan bool // A mapping of rpc.IDs to their subscription channels, mapped to their subscription type (hash of the StreamFilters) Subscriptions map[common.Hash]map[rpc.ID]Subscription // A mapping of subscription params hash to the corresponding subscription params - SubscriptionTypes map[common.Hash]shared.SubscriptionSettings + SubscriptionTypes map[common.Hash]eth.SubscriptionSettings // Info for the Geth node that this watcher is working with - NodeInfo *node.Node - // Number of publish workers - WorkerPoolSize int - // chain type for this service - chain shared.ChainType + NodeInfo *node.Info // Underlying db db *postgres.DB // wg for syncing serve processes @@ -97,44 +85,14 @@ type Service struct { // NewWatcher creates a new Watcher using an underlying Service struct func NewWatcher(settings *Config) (Watcher, error) { sn := new(Service) - var err error - // If we are syncing, initialize the needed interfaces - if settings.Sync { - sn.Streamer, sn.PayloadChan, err = builders.NewPayloadStreamer(settings.Chain, settings.WSClient) - if err != nil { - return nil, err - } - sn.Converter, err = builders.NewPayloadConverter(settings.Chain, settings.NodeInfo.ChainID) - if err != nil { - return nil, err - } - sn.Publisher, err = builders.NewIPLDPublisher(settings.Chain, settings.SyncDBConn) - if err != nil { - return nil, err - } - sn.Filterer, err = builders.NewResponseFilterer(settings.Chain) - if err != nil { - return nil, err - } - } - // If we are serving, initialize the needed interfaces - if settings.Serve { - sn.Retriever, err = builders.NewCIDRetriever(settings.Chain, settings.ServeDBConn) - if err != nil { - return nil, err - } - sn.IPLDFetcher, err = builders.NewIPLDFetcher(settings.Chain, settings.ServeDBConn) - if err != nil { - return nil, err - } - sn.db = settings.ServeDBConn - } + sn.Retriever = eth.NewCIDRetriever(settings.DB) + sn.IPLDFetcher = eth.NewIPLDFetcher(settings.DB) + sn.Filterer = eth.NewResponseFilterer() + sn.db = settings.DB sn.QuitChan = make(chan bool) sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription) - sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings) - sn.WorkerPoolSize = settings.Workers + sn.SubscriptionTypes = make(map[common.Hash]eth.SubscriptionSettings) sn.NodeInfo = &settings.NodeInfo - sn.chain = settings.Chain return sn, nil } @@ -172,91 +130,24 @@ func (sap *Service) APIs() []rpc.API { Public: true, }, } - chainAPI, err := builders.NewPublicAPI(sap.chain, sap.db) + backend, err := eth.NewEthBackend(sap.db) if err != nil { log.Error(err) - return apis - } - return append(apis, chainAPI) -} - -// Sync streams incoming raw chain data and converts it for further processing -// It forwards the converted data to the publish process(es) it spins up -// If forwards the converted data to a ScreenAndServe process if it there is one listening on the passed screenAndServePayload channel -// This continues on no matter if or how many subscribers there are -func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error { - sub, err := sap.Streamer.Stream(sap.PayloadChan) - if err != nil { - return err - } - // spin up publish worker goroutines - publishPayload := make(chan shared.ConvertedData, PayloadChanBufferSize) - for i := 1; i <= sap.WorkerPoolSize; i++ { - go sap.publish(wg, i, publishPayload) - log.Debugf("%s publish worker %d successfully spun up", sap.chain.String(), i) - } - go func() { - wg.Add(1) - defer wg.Done() - for { - select { - case payload := <-sap.PayloadChan: - ipldPayload, err := sap.Converter.Convert(payload) - if err != nil { - log.Errorf("watcher conversion error for chain %s: %v", sap.chain.String(), err) - continue - } - log.Infof("%s data streamed at head height %d", sap.chain.String(), ipldPayload.Height()) - // If we have a ScreenAndServe process running, forward the iplds to it - select { - case screenAndServePayload <- ipldPayload: - default: - } - // Forward the payload to the publish workers - // this channel acts as a ring buffer - select { - case publishPayload <- ipldPayload: - default: - <-publishPayload - publishPayload <- ipldPayload - } - case err := <-sub.Err(): - log.Errorf("watcher subscription error for chain %s: %v", sap.chain.String(), err) - case <-sap.QuitChan: - log.Infof("quiting %s Sync process", sap.chain.String()) - return - } - } - }() - log.Infof("%s Sync goroutine successfully spun up", sap.chain.String()) - return nil -} - -// publish is spun up by SyncAndConvert and receives converted chain data from that process -// it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres -func (sap *Service) publish(wg *sync.WaitGroup, id int, publishPayload <-chan shared.ConvertedData) { - wg.Add(1) - defer wg.Done() - for { - select { - case payload := <-publishPayload: - log.Debugf("%s watcher sync worker %d publishing and indexing data streamed at head height %d", sap.chain.String(), id, payload.Height()) - if err := sap.Publisher.Publish(payload); err != nil { - log.Errorf("%s watcher publish worker %d publishing error: %v", sap.chain.String(), id, err) - continue - } - case <-sap.QuitChan: - log.Infof("%s watcher publish worker %d shutting down", sap.chain.String(), id) - return - } + return nil } + return append(apis, rpc.API{ + Namespace: eth.APIName, + Version: eth.APIVersion, + Service: eth.NewPublicEthAPI(backend), + Public: true, + }) } // Serve listens for incoming converter data off the screenAndServePayload from the Sync process // It filters and sends this data to any subscribers to the service // This process can also be stood up alone, without an screenAndServePayload attached to a Sync process // and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only -func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) { +func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth2.ConvertedPayload) { sap.serveWg = wg go func() { wg.Add(1) @@ -266,17 +157,17 @@ func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan share case payload := <-screenAndServePayload: sap.filterAndServe(payload) case <-sap.QuitChan: - log.Infof("quiting %s Serve process", sap.chain.String()) + log.Info("quiting eth ipld server process") return } } }() - log.Infof("%s Serve goroutine successfully spun up", sap.chain.String()) + log.Info("eth ipld server process successfully spun up") } // filterAndServe filters the payload according to each subscription type and sends to the subscriptions -func (sap *Service) filterAndServe(payload shared.ConvertedData) { - log.Debugf("sending %s payload to subscriptions", sap.chain.String()) +func (sap *Service) filterAndServe(payload eth2.ConvertedPayload) { + log.Debug("sending eth ipld payload to subscriptions") sap.Lock() sap.serveWg.Add(1) defer sap.Unlock() @@ -285,11 +176,11 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) { // Retrieve the subscription parameters for this subscription type subConfig, ok := sap.SubscriptionTypes[ty] if !ok { - log.Errorf("watcher %s subscription configuration for subscription type %s not available", sap.chain.String(), ty.Hex()) + log.Errorf("eth ipld server subscription configuration for subscription type %s not available", ty.Hex()) sap.closeType(ty) continue } - if subConfig.EndingBlock().Int64() > 0 && subConfig.EndingBlock().Int64() < payload.Height() { + if subConfig.End.Int64() > 0 && subConfig.End.Int64() < payload.Block.Number().Int64() { // We are not out of range for this subscription type // close it, and continue to the next sap.closeType(ty) @@ -297,21 +188,21 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) { } response, err := sap.Filterer.Filter(subConfig, payload) if err != nil { - log.Errorf("watcher filtering error for chain %s: %v", sap.chain.String(), err) + log.Errorf("eth ipld server filtering error: %v", err) sap.closeType(ty) continue } responseRLP, err := rlp.EncodeToBytes(response) if err != nil { - log.Errorf("watcher rlp encoding error for chain %s: %v", sap.chain.String(), err) + log.Errorf("eth ipld server rlp encoding error: %v", err) continue } for id, sub := range subs { select { - case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}: - log.Debugf("sending watcher %s payload to subscription %s", sap.chain.String(), id) + case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.BlockNumber.Int64()}: + log.Debugf("sending eth ipld server payload to subscription %s", id) default: - log.Infof("unable to send %s payload to subscription %s; channel has no receiver", sap.chain.String(), id) + log.Infof("unable to send eth ipld payload to subscription %s; channel has no receiver", id) } } } @@ -319,20 +210,15 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) { // Subscribe is used by the API to remotely subscribe to the service loop // The params must be rlp serializable and satisfy the SubscriptionSettings() interface -func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) { +func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings) { sap.serveWg.Add(1) defer sap.serveWg.Done() - log.Infof("New %s subscription %s", sap.chain.String(), id) + log.Infof("new eth ipld subscription %s", id) subscription := Subscription{ ID: id, PayloadChan: sub, QuitChan: quitChan, } - if params.ChainType() != sap.chain { - sendNonBlockingErr(subscription, fmt.Errorf("subscription %s is for chain %s, service supports chain %s", id, params.ChainType().String(), sap.chain.String())) - sendNonBlockingQuit(subscription) - return - } // Subscription type is defined as the hash of the rlp-serialized subscription settings by, err := rlp.EncodeToBytes(params) if err != nil { @@ -341,7 +227,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha return } subscriptionType := crypto.Keccak256Hash(by) - if !params.HistoricalDataOnly() { + if !params.BackFillOnly { // Add subscriber sap.Lock() if sap.Subscriptions[subscriptionType] == nil { @@ -353,9 +239,9 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha } // If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data // Otherwise we only filter new data as it is streamed in from the state diffing geth node - if params.HistoricalData() || params.HistoricalDataOnly() { + if params.BackFill || params.BackFillOnly { if err := sap.sendHistoricalData(subscription, id, params); err != nil { - sendNonBlockingErr(subscription, fmt.Errorf("%s watcher subscriber backfill error: %v", sap.chain.String(), err)) + sendNonBlockingErr(subscription, fmt.Errorf("eth ipld server subscription backfill error: %v", err)) sendNonBlockingQuit(subscription) return } @@ -363,8 +249,8 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha } // sendHistoricalData sends historical data to the requesting subscription -func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params shared.SubscriptionSettings) error { - log.Infof("Sending %s historical data to subscription %s", sap.chain.String(), id) +func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params eth.SubscriptionSettings) error { + log.Infof("sending eth ipld historical data to subscription %s", id) // Retrieve cached CIDs relevant to this subscriber var endingBlock int64 var startingBlock int64 @@ -373,31 +259,31 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share if err != nil { return err } - if startingBlock < params.StartingBlock().Int64() { - startingBlock = params.StartingBlock().Int64() + if startingBlock < params.Start.Int64() { + startingBlock = params.Start.Int64() } endingBlock, err = sap.Retriever.RetrieveLastBlockNumber() if err != nil { return err } - if endingBlock > params.EndingBlock().Int64() && params.EndingBlock().Int64() > 0 && params.EndingBlock().Int64() > startingBlock { - endingBlock = params.EndingBlock().Int64() + if endingBlock > params.End.Int64() && params.End.Int64() > 0 && params.End.Int64() > startingBlock { + endingBlock = params.End.Int64() } - log.Debugf("%s historical data starting block: %d", sap.chain.String(), params.StartingBlock().Int64()) - log.Debugf("%s historical data ending block: %d", sap.chain.String(), endingBlock) + log.Debugf("eth ipld historical data starting block: %d", params.Start.Int64()) + log.Debugf("eth ipld historical data ending block: %d", endingBlock) go func() { sap.serveWg.Add(1) defer sap.serveWg.Done() for i := startingBlock; i <= endingBlock; i++ { select { case <-sap.QuitChan: - log.Infof("%s watcher historical data feed to subscription %s closed", sap.chain.String(), id) + log.Infof("%s watcher historical data feed to subscription %s closed", id) return default: } cidWrappers, empty, err := sap.Retriever.Retrieve(params, i) if err != nil { - sendNonBlockingErr(sub, fmt.Errorf(" %s watcher CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error())) + sendNonBlockingErr(sub, fmt.Errorf("eth ipld server cid retrieval error at block %d\r%s", i, err.Error())) continue } if empty { @@ -406,7 +292,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share for _, cids := range cidWrappers { response, err := sap.IPLDFetcher.Fetch(cids) if err != nil { - sendNonBlockingErr(sub, fmt.Errorf("%s watcher IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error())) + sendNonBlockingErr(sub, fmt.Errorf("eth ipld server ipld fetching error at block %d\r%s", i, err.Error())) continue } responseRLP, err := rlp.EncodeToBytes(response) @@ -415,19 +301,19 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share continue } select { - case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}: - log.Debugf("sending watcher historical data payload to %s subscription %s", sap.chain.String(), id) + case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.BlockNumber.Int64()}: + log.Debugf("eth ipld server sending historical data payload to subscription %s", id) default: - log.Infof("unable to send backFill payload to %s subscription %s; channel has no receiver", sap.chain.String(), id) + log.Infof("eth ipld server unable to send backFill payload to subscription %s; channel has no receiver", id) } } } // when we are done backfilling send an empty payload signifying so in the msg select { case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}: - log.Debugf("sending backFill completion notice to %s subscription %s", sap.chain.String(), id) + log.Debugf("eth ipld server sending backFill completion notice to subscription %s", id) default: - log.Infof("unable to send backFill completion notice to %s subscription %s", sap.chain.String(), id) + log.Infof("eth ipld server unable to send backFill completion notice to %s subscription %s", id) } }() return nil @@ -435,7 +321,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share // Unsubscribe is used by the API to remotely unsubscribe to the StateDiffingService loop func (sap *Service) Unsubscribe(id rpc.ID) { - log.Infof("Unsubscribing %s from the %s watcher service", id, sap.chain.String()) + log.Infof("unsubscribing %s from the eth ipld server", id) sap.Lock() for ty := range sap.Subscriptions { delete(sap.Subscriptions[ty], id) @@ -451,12 +337,9 @@ func (sap *Service) Unsubscribe(id rpc.ID) { // Start is used to begin the service // This is mostly just to satisfy the node.Service interface func (sap *Service) Start(*p2p.Server) error { - log.Infof("Starting %s watcher service", sap.chain.String()) + log.Info("starting eth ipld server") wg := new(sync.WaitGroup) - payloadChan := make(chan shared.ConvertedData, PayloadChanBufferSize) - if err := sap.Sync(wg, payloadChan); err != nil { - return err - } + payloadChan := make(chan eth2.ConvertedPayload, PayloadChanBufferSize) sap.Serve(wg, payloadChan) return nil } @@ -464,7 +347,7 @@ func (sap *Service) Start(*p2p.Server) error { // Stop is used to close down the service // This is mostly just to satisfy the node.Service interface func (sap *Service) Stop() error { - log.Infof("Stopping %s watcher service", sap.chain.String()) + log.Infof("stopping eth ipld server") sap.Lock() close(sap.QuitChan) sap.close() @@ -473,19 +356,19 @@ func (sap *Service) Stop() error { } // Node returns the node info for this service -func (sap *Service) Node() *node.Node { +func (sap *Service) Node() *node.Info { return sap.NodeInfo } // Chain returns the chain type for this service func (sap *Service) Chain() shared.ChainType { - return sap.chain + return shared.Ethereum } // close is used to close all listening subscriptions // close needs to be called with subscription access locked func (sap *Service) close() { - log.Infof("Closing all %s subscriptions", sap.chain.String()) + log.Infof("closing all eth ipld server subscriptions") for subType, subs := range sap.Subscriptions { for _, sub := range subs { sendNonBlockingQuit(sub) @@ -498,7 +381,7 @@ func (sap *Service) close() { // closeType is used to close all subscriptions of given type // closeType needs to be called with subscription access locked func (sap *Service) closeType(subType common.Hash) { - log.Infof("Closing all %s subscriptions of type %s", sap.chain.String(), subType.String()) + log.Infof("closing all eth ipld server subscriptions of type %s", subType.String()) subs := sap.Subscriptions[subType] for _, sub := range subs { sendNonBlockingQuit(sub)