From 3a666df2948587fb9d969da0567465d6e762242a Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Tue, 8 Oct 2019 14:51:38 -0500 Subject: [PATCH] backfiller refactoring; explicity errs; golint --- cmd/syncAndPublish.go | 46 +++++---- cmd/syncPublishScreenAndServe.go | 50 +++------- environments/seedNodeSubscription.toml | 35 ------- pkg/ipfs/converter.go | 36 +++---- pkg/ipfs/fetcher.go | 66 ++++++------- pkg/ipfs/fetcher_test.go | 2 +- pkg/ipfs/helpers.go | 27 ++++-- pkg/ipfs/mocks/publisher.go | 1 + pkg/ipfs/publisher.go | 56 +++++------ pkg/ipfs/resolver.go | 6 +- pkg/super_node/backfiller.go | 13 ++- pkg/super_node/filterer.go | 42 ++++----- pkg/super_node/repository.go | 54 +++++------ pkg/super_node/retriever.go | 53 ++++++----- pkg/super_node/service.go | 126 ++++++++++++------------- pkg/super_node/test_helpers.go | 4 + 16 files changed, 283 insertions(+), 334 deletions(-) delete mode 100644 environments/seedNodeSubscription.toml diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go index c68eecad..24a4987b 100644 --- a/cmd/syncAndPublish.go +++ b/cmd/syncAndPublish.go @@ -16,27 +16,23 @@ package cmd import ( - "github.com/vulcanize/vulcanizedb/pkg/ipfs" "os" "path/filepath" syn "sync" "time" - "github.com/vulcanize/vulcanizedb/pkg/super_node" - - "github.com/spf13/viper" - - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/geth" "github.com/vulcanize/vulcanizedb/pkg/geth/client" vRpc "github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc" "github.com/vulcanize/vulcanizedb/pkg/geth/node" + "github.com/vulcanize/vulcanizedb/pkg/super_node" "github.com/vulcanize/vulcanizedb/utils" ) @@ -60,19 +56,19 @@ func init() { } func syncAndPublish() { - superNode, err := newSuperNode() - if err != nil { - log.Fatal(err) + superNode, newNodeErr := newSuperNode() + if newNodeErr != nil { + log.Fatal(newNodeErr) } wg := &syn.WaitGroup{} - err = superNode.SyncAndPublish(wg, nil, nil) - if err != nil { - log.Fatal(err) + syncAndPubErr := superNode.SyncAndPublish(wg, nil, nil) + if syncAndPubErr != nil { + log.Fatal(syncAndPubErr) } if viper.GetBool("backfill.on") && viper.GetString("backfill.ipcPath") != "" { - backfiller := newBackFiller(superNode.GetPublisher()) - if err != nil { - log.Fatal(err) + backfiller, newBackFillerErr := newBackFiller() + if newBackFillerErr != nil { + log.Fatal(newBackFillerErr) } backfiller.FillGaps(wg, nil) } @@ -80,9 +76,9 @@ func syncAndPublish() { } func getBlockChainAndClient(path string) (*geth.BlockChain, core.RpcClient) { - rawRpcClient, err := rpc.Dial(path) - if err != nil { - log.Fatal(err) + rawRpcClient, dialErr := rpc.Dial(path) + if dialErr != nil { + log.Fatal(dialErr) } rpcClient := client.NewRpcClient(rawRpcClient, ipc) ethClient := ethclient.NewClient(rawRpcClient) @@ -99,9 +95,9 @@ func newSuperNode() (super_node.NodeInterface, error) { quitChan := make(chan bool) ipfsPath = viper.GetString("client.ipfsPath") if ipfsPath == "" { - home, err := os.UserHomeDir() - if err != nil { - log.Fatal(err) + home, homeDirErr := os.UserHomeDir() + if homeDirErr != nil { + log.Fatal(homeDirErr) } ipfsPath = filepath.Join(home, ".ipfs") } @@ -112,7 +108,7 @@ func newSuperNode() (super_node.NodeInterface, error) { return super_node.NewSuperNode(ipfsPath, &db, rpcClient, quitChan, workers, blockChain.Node()) } -func newBackFiller(ipfsPublisher ipfs.IPLDPublisher) super_node.BackFillInterface { +func newBackFiller() (super_node.BackFillInterface, error) { blockChain, archivalRpcClient := getBlockChainAndClient(viper.GetString("backfill.ipcPath")) db := utils.LoadPostgres(databaseConfig, blockChain.Node()) freq := viper.GetInt("backfill.frequency") @@ -122,5 +118,5 @@ func newBackFiller(ipfsPublisher ipfs.IPLDPublisher) super_node.BackFillInterfac } else { frequency = time.Duration(freq) } - return super_node.NewBackFillService(ipfsPublisher, &db, archivalRpcClient, time.Minute*frequency) + return super_node.NewBackFillService(ipfsPath, &db, archivalRpcClient, time.Minute*frequency) } diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go index 00e4f557..659ac095 100644 --- a/cmd/syncPublishScreenAndServe.go +++ b/cmd/syncPublishScreenAndServe.go @@ -16,11 +16,8 @@ package cmd import ( - "os" - "path/filepath" syn "sync" - "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -47,51 +44,30 @@ func init() { } func syncPublishScreenAndServe() { - superNode, err := newSuperNode() - if err != nil { - log.Fatal(err) + superNode, newNodeErr := newSuperNode() + if newNodeErr != nil { + log.Fatal(newNodeErr) } wg := &syn.WaitGroup{} forwardPayloadChan := make(chan ipfs.IPLDPayload, 20000) forwardQuitChan := make(chan bool, 1) - err = superNode.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan) - if err != nil { - log.Fatal(err) + syncAndPubErr := superNode.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan) + if syncAndPubErr != nil { + log.Fatal(syncAndPubErr) } - superNode.ScreenAndServe(forwardPayloadChan, forwardQuitChan) + superNode.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan) if viper.GetBool("backfill.on") && viper.GetString("backfill.ipcPath") != "" { - backfiller := newBackFiller(superNode.GetPublisher()) - if err != nil { - log.Fatal(err) + backfiller, newBackFillerErr := newBackFiller() + if newBackFillerErr != nil { + log.Fatal(newBackFillerErr) } backfiller.FillGaps(wg, nil) } - var ipcPath string - ipcPath = viper.GetString("server.ipcPath") - if ipcPath == "" { - home, err := os.UserHomeDir() - if err != nil { - log.Fatal(err) - } - ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") - } - _, _, err = rpc.StartIPCEndpoint(ipcPath, superNode.APIs()) - if err != nil { - log.Fatal(err) - } - - var wsEndpoint string - wsEndpoint = viper.GetString("server.wsEndpoint") - if wsEndpoint == "" { - wsEndpoint = "127.0.0.1:8080" - } - var exposeAll = true - var wsOrigins []string = nil - _, _, err = rpc.StartWSEndpoint(wsEndpoint, superNode.APIs(), []string{"vulcanizedb"}, wsOrigins, exposeAll) - if err != nil { - log.Fatal(err) + serverErr := startServers(superNode) + if serverErr != nil { + log.Fatal(serverErr) } wg.Wait() } diff --git a/environments/seedNodeSubscription.toml b/environments/seedNodeSubscription.toml deleted file mode 100644 index d5142282..00000000 --- a/environments/seedNodeSubscription.toml +++ /dev/null @@ -1,35 +0,0 @@ -[subscription] - path = "ws://seed0.20c.com:8080" - backfill = true - backfillOnly = false - startingBlock = 0 - endingBlock = 0 - [subscription.headerFilter] - off = false - finalOnly = true - [subscription.trxFilter] - off = false - src = [ - "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", - ] - dst = [ - "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", - ] - [subscription.receiptFilter] - off = false - contracts = [] - topic0s = [ - "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", - "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377" - ] - [subscription.stateFilter] - off = false - addresses = [ - "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe" - ] - intermediateNodes = false - [subscription.storageFilter] - off = true - addresses = [] - storageKeys = [] - intermediateNodes = false \ No newline at end of file diff --git a/pkg/ipfs/converter.go b/pkg/ipfs/converter.go index 454c5fd9..93afb667 100644 --- a/pkg/ipfs/converter.go +++ b/pkg/ipfs/converter.go @@ -45,14 +45,14 @@ func NewPayloadConverter(chainConfig *params.ChainConfig) *Converter { func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { // Unpack block rlp to access fields block := new(types.Block) - err := rlp.DecodeBytes(payload.BlockRlp, block) - if err != nil { - return nil, err + decodeErr := rlp.DecodeBytes(payload.BlockRlp, block) + if decodeErr != nil { + return nil, decodeErr } header := block.Header() - headerRlp, err := rlp.EncodeToBytes(header) - if err != nil { - return nil, err + headerRlp, encodeErr := rlp.EncodeToBytes(header) + if encodeErr != nil { + return nil, encodeErr } trxLen := len(block.Transactions()) convertedPayload := &IPLDPayload{ @@ -70,9 +70,9 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { transactions := block.Transactions() for _, trx := range transactions { // Extract to and from data from the the transactions for indexing - from, err := types.Sender(signer, trx) - if err != nil { - return nil, err + from, senderErr := types.Sender(signer, trx) + if senderErr != nil { + return nil, senderErr } txMeta := &TrxMetaData{ Dst: handleNullAddr(trx.To()), @@ -84,14 +84,14 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { // Decode receipts for this block receipts := make(types.Receipts, 0) - err = rlp.DecodeBytes(payload.ReceiptsRlp, &receipts) - if err != nil { - return nil, err + decodeErr = rlp.DecodeBytes(payload.ReceiptsRlp, &receipts) + if decodeErr != nil { + return nil, decodeErr } // Derive any missing fields - err = receipts.DeriveFields(pc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()) - if err != nil { - return nil, err + deriveErr := receipts.DeriveFields(pc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()) + if deriveErr != nil { + return nil, deriveErr } for i, receipt := range receipts { // If the transaction for this receipt has a "to" address, the above DeriveFields() fails to assign it to the receipt's ContractAddress @@ -118,9 +118,9 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { // Unpack state diff rlp to access fields stateDiff := new(statediff.StateDiff) - err = rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) - if err != nil { - return nil, err + decodeErr = rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) + if decodeErr != nil { + return nil, decodeErr } for _, createdAccount := range stateDiff.CreatedAccounts { hashKey := common.BytesToHash(createdAccount.Key) diff --git a/pkg/ipfs/fetcher.go b/pkg/ipfs/fetcher.go index bfaaa1ac..1b7e893f 100644 --- a/pkg/ipfs/fetcher.go +++ b/pkg/ipfs/fetcher.go @@ -28,7 +28,7 @@ import ( // IPLDFetcher is an interface for fetching IPLDs type IPLDFetcher interface { - FetchCIDs(cids CIDWrapper) (*IPLDWrapper, error) + FetchIPLDs(cids CIDWrapper) (*IPLDWrapper, error) } // EthIPLDFetcher is used to fetch ETH IPLD objects from IPFS @@ -47,8 +47,8 @@ func NewIPLDFetcher(ipfsPath string) (*EthIPLDFetcher, error) { }, nil } -// FetchCIDs is the exported method for fetching and returning all the cids passed in a CIDWrapper -func (f *EthIPLDFetcher) FetchCIDs(cids CIDWrapper) (*IPLDWrapper, error) { +// FetchIPLDs is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper +func (f *EthIPLDFetcher) FetchIPLDs(cids CIDWrapper) (*IPLDWrapper, error) { log.Debug("fetching iplds") blocks := &IPLDWrapper{ @@ -61,29 +61,29 @@ func (f *EthIPLDFetcher) FetchCIDs(cids CIDWrapper) (*IPLDWrapper, error) { StorageNodes: make(map[common.Hash]map[common.Hash]blocks.Block), } - err := f.fetchHeaders(cids, blocks) - if err != nil { - return nil, err + headersErr := f.fetchHeaders(cids, blocks) + if headersErr != nil { + return nil, headersErr } - err = f.fetchUncles(cids, blocks) - if err != nil { - return nil, err + unclesErr := f.fetchUncles(cids, blocks) + if unclesErr != nil { + return nil, unclesErr } - err = f.fetchTrxs(cids, blocks) - if err != nil { - return nil, err + trxsErr := f.fetchTrxs(cids, blocks) + if trxsErr != nil { + return nil, trxsErr } - err = f.fetchRcts(cids, blocks) - if err != nil { - return nil, err + rctsErr := f.fetchRcts(cids, blocks) + if rctsErr != nil { + return nil, rctsErr } - err = f.fetchStorage(cids, blocks) - if err != nil { - return nil, err + storageErr := f.fetchStorage(cids, blocks) + if storageErr != nil { + return nil, storageErr } - err = f.fetchState(cids, blocks) - if err != nil { - return nil, err + stateErr := f.fetchState(cids, blocks) + if stateErr != nil { + return nil, stateErr } return blocks, nil @@ -174,13 +174,13 @@ func (f *EthIPLDFetcher) fetchState(cids CIDWrapper, blocks *IPLDWrapper) error if stateNode.CID == "" || stateNode.Key == "" { continue } - dc, err := cid.Decode(stateNode.CID) - if err != nil { - return err + dc, decodeErr := cid.Decode(stateNode.CID) + if decodeErr != nil { + return decodeErr } - block, err := f.fetch(dc) - if err != nil { - return err + block, fetchErr := f.fetch(dc) + if fetchErr != nil { + return fetchErr } blocks.StateNodes[common.HexToHash(stateNode.Key)] = block } @@ -196,13 +196,13 @@ func (f *EthIPLDFetcher) fetchStorage(cids CIDWrapper, blks *IPLDWrapper) error if storageNode.CID == "" || storageNode.Key == "" || storageNode.StateKey == "" { continue } - dc, err := cid.Decode(storageNode.CID) - if err != nil { - return err + dc, decodeErr := cid.Decode(storageNode.CID) + if decodeErr != nil { + return decodeErr } - blk, err := f.fetch(dc) - if err != nil { - return err + blk, fetchErr := f.fetch(dc) + if fetchErr != nil { + return fetchErr } if blks.StorageNodes[common.HexToHash(storageNode.StateKey)] == nil { blks.StorageNodes[common.HexToHash(storageNode.StateKey)] = make(map[common.Hash]blocks.Block) diff --git a/pkg/ipfs/fetcher_test.go b/pkg/ipfs/fetcher_test.go index 1aab413b..8205f86b 100644 --- a/pkg/ipfs/fetcher_test.go +++ b/pkg/ipfs/fetcher_test.go @@ -83,7 +83,7 @@ var _ = Describe("Fetcher", func() { It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() { fetcher := new(ipfs.EthIPLDFetcher) fetcher.BlockService = mockBlockService - iplds, err := fetcher.FetchCIDs(mockCIDWrapper) + iplds, err := fetcher.FetchIPLDs(mockCIDWrapper) Expect(err).ToNot(HaveOccurred()) Expect(iplds.BlockNumber).To(Equal(mockCIDWrapper.BlockNumber)) Expect(len(iplds.Headers)).To(Equal(1)) diff --git a/pkg/ipfs/helpers.go b/pkg/ipfs/helpers.go index 9096762a..cb004918 100644 --- a/pkg/ipfs/helpers.go +++ b/pkg/ipfs/helpers.go @@ -23,23 +23,38 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/plugin/loader" "github.com/ipfs/go-ipfs/repo/fsrepo" ) +// InitIPFSPlugins is used to initialized IPFS plugins before creating a new IPFS node +// This should only be called once +func InitIPFSPlugins() error { + l, err := loader.NewPluginLoader("") + if err != nil { + return err + } + err = l.Initialize() + if err != nil { + return err + } + return l.Inject() +} + // InitIPFSBlockService is used to configure and return a BlockService using an ipfs repo path (e.g. ~/.ipfs) func InitIPFSBlockService(ipfsPath string) (blockservice.BlockService, error) { - r, err := fsrepo.Open(ipfsPath) - if err != nil { - return nil, err + r, openErr := fsrepo.Open(ipfsPath) + if openErr != nil { + return nil, openErr } ctx := context.Background() cfg := &core.BuildCfg{ Online: false, Repo: r, } - ipfsNode, err := core.NewNode(ctx, cfg) - if err != nil { - return nil, err + ipfsNode, newNodeErr := core.NewNode(ctx, cfg) + if newNodeErr != nil { + return nil, newNodeErr } return ipfsNode.Blocks, nil } diff --git a/pkg/ipfs/mocks/publisher.go b/pkg/ipfs/mocks/publisher.go index ce31968a..c8f64449 100644 --- a/pkg/ipfs/mocks/publisher.go +++ b/pkg/ipfs/mocks/publisher.go @@ -18,6 +18,7 @@ package mocks import ( "fmt" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) diff --git a/pkg/ipfs/publisher.go b/pkg/ipfs/publisher.go index b5db48a5..b63e8621 100644 --- a/pkg/ipfs/publisher.go +++ b/pkg/ipfs/publisher.go @@ -22,8 +22,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" - "github.com/ipfs/go-ipfs/plugin/loader" - "github.com/vulcanize/eth-block-extractor/pkg/ipfs" "github.com/vulcanize/eth-block-extractor/pkg/ipfs/eth_block_header" "github.com/vulcanize/eth-block-extractor/pkg/ipfs/eth_block_receipts" @@ -49,18 +47,6 @@ type Publisher struct { // NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface func NewIPLDPublisher(ipfsPath string) (*Publisher, error) { - l, err := loader.NewPluginLoader("") - if err != nil { - return nil, err - } - err = l.Initialize() - if err != nil { - return nil, err - } - err = l.Inject() - if err != nil { - return nil, err - } node, err := ipfs.InitIPFSNode(ipfsPath) if err != nil { return nil, err @@ -77,47 +63,47 @@ func NewIPLDPublisher(ipfsPath string) (*Publisher, error) { // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload func (pub *Publisher) Publish(payload *IPLDPayload) (*CIDPayload, error) { // Process and publish headers - headerCid, err := pub.publishHeaders(payload.HeaderRLP) - if err != nil { - return nil, err + headerCid, headersErr := pub.publishHeaders(payload.HeaderRLP) + if headersErr != nil { + return nil, headersErr } // Process and publish uncles uncleCids := make(map[common.Hash]string) for _, uncle := range payload.BlockBody.Uncles { - uncleRlp, err := rlp.EncodeToBytes(uncle) - if err != nil { - return nil, err + uncleRlp, encodeErr := rlp.EncodeToBytes(uncle) + if encodeErr != nil { + return nil, encodeErr } - cid, err := pub.publishHeaders(uncleRlp) - if err != nil { - return nil, err + cid, unclesErr := pub.publishHeaders(uncleRlp) + if unclesErr != nil { + return nil, unclesErr } uncleCids[uncle.Hash()] = cid } // Process and publish transactions - transactionCids, err := pub.publishTransactions(payload.BlockBody, payload.TrxMetaData) - if err != nil { - return nil, err + transactionCids, trxsErr := pub.publishTransactions(payload.BlockBody, payload.TrxMetaData) + if trxsErr != nil { + return nil, trxsErr } // Process and publish receipts - receiptsCids, err := pub.publishReceipts(payload.Receipts, payload.ReceiptMetaData) - if err != nil { - return nil, err + receiptsCids, rctsErr := pub.publishReceipts(payload.Receipts, payload.ReceiptMetaData) + if rctsErr != nil { + return nil, rctsErr } // Process and publish state leafs - stateNodeCids, err := pub.publishStateNodes(payload.StateNodes) - if err != nil { - return nil, err + stateNodeCids, stateErr := pub.publishStateNodes(payload.StateNodes) + if stateErr != nil { + return nil, stateErr } // Process and publish storage leafs - storageNodeCids, err := pub.publishStorageNodes(payload.StorageNodes) - if err != nil { - return nil, err + storageNodeCids, storageErr := pub.publishStorageNodes(payload.StorageNodes) + if storageErr != nil { + return nil, storageErr } // Package CIDs and their metadata into a single struct diff --git a/pkg/ipfs/resolver.go b/pkg/ipfs/resolver.go index 0178edc7..6ee25af6 100644 --- a/pkg/ipfs/resolver.go +++ b/pkg/ipfs/resolver.go @@ -24,7 +24,7 @@ import ( // IPLDResolver is the interface to resolving IPLDs type IPLDResolver interface { - ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SuperNodePayload, error) + ResolveIPLDs(ipfsBlocks IPLDWrapper) streamer.SuperNodePayload } // EthIPLDResolver is the underlying struct to support the IPLDResolver interface @@ -36,7 +36,7 @@ func NewIPLDResolver() *EthIPLDResolver { } // ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper -func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SuperNodePayload, error) { +func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) streamer.SuperNodePayload { response := &streamer.SuperNodePayload{ BlockNumber: ipfsBlocks.BlockNumber, StateNodesRlp: make(map[common.Hash][]byte), @@ -48,7 +48,7 @@ func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.Super eir.resolveReceipts(ipfsBlocks.Receipts, response) eir.resolveState(ipfsBlocks.StateNodes, response) eir.resolveStorage(ipfsBlocks.StorageNodes, response) - return *response, nil + return *response } func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *streamer.SuperNodePayload) { diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 324a2cc7..ab60a9a2 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -52,15 +52,19 @@ type BackFillService struct { } // NewBackFillService returns a new BackFillInterface -func NewBackFillService(ipfsPublisher ipfs.IPLDPublisher, db *postgres.DB, archivalNodeRpcClient core.RpcClient, freq time.Duration) BackFillInterface { +func NewBackFillService(ipfsPath string, db *postgres.DB, archivalNodeRPCClient core.RpcClient, freq time.Duration) (BackFillInterface, error) { + publisher, err := ipfs.NewIPLDPublisher(ipfsPath) + if err != nil { + return nil, err + } return &BackFillService{ Repository: NewCIDRepository(db), Converter: ipfs.NewPayloadConverter(params.MainnetChainConfig), - Publisher: ipfsPublisher, + Publisher: publisher, Retriever: NewCIDRetriever(db), - StateDiffFetcher: fetcher.NewStateDiffFetcher(archivalNodeRpcClient), + StateDiffFetcher: fetcher.NewStateDiffFetcher(archivalNodeRPCClient), GapCheckFrequency: freq, - } + }, nil } // FillGaps periodically checks for and fills in gaps in the super node db @@ -103,6 +107,7 @@ func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) { } } }() + log.Info("fillGaps goroutine successfully spun up") } func (bfs *BackFillService) fillGaps(gap [2]int64) { diff --git a/pkg/super_node/filterer.go b/pkg/super_node/filterer.go index e63c1880..7d9c105d 100644 --- a/pkg/super_node/filterer.go +++ b/pkg/super_node/filterer.go @@ -44,25 +44,25 @@ func NewResponseFilterer() *Filterer { // FilterResponse is used to filter through eth data to extract and package requested data into a Payload func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SuperNodePayload, error) { response := new(streamer.SuperNodePayload) - err := s.filterHeaders(streamFilters, response, payload) - if err != nil { - return streamer.SuperNodePayload{}, err + headersErr := s.filterHeaders(streamFilters, response, payload) + if headersErr != nil { + return streamer.SuperNodePayload{}, headersErr } - txHashes, err := s.filterTransactions(streamFilters, response, payload) - if err != nil { - return streamer.SuperNodePayload{}, err + txHashes, trxsErr := s.filterTransactions(streamFilters, response, payload) + if trxsErr != nil { + return streamer.SuperNodePayload{}, trxsErr } - err = s.filerReceipts(streamFilters, response, payload, txHashes) - if err != nil { - return streamer.SuperNodePayload{}, err + rctsErr := s.filerReceipts(streamFilters, response, payload, txHashes) + if rctsErr != nil { + return streamer.SuperNodePayload{}, rctsErr } - err = s.filterState(streamFilters, response, payload) - if err != nil { - return streamer.SuperNodePayload{}, err + stateErr := s.filterState(streamFilters, response, payload) + if stateErr != nil { + return streamer.SuperNodePayload{}, stateErr } - err = s.filterStorage(streamFilters, response, payload) - if err != nil { - return streamer.SuperNodePayload{}, err + storageErr := s.filterStorage(streamFilters, response, payload) + if storageErr != nil { + return streamer.SuperNodePayload{}, storageErr } response.BlockNumber = payload.BlockNumber return *response, nil @@ -170,12 +170,12 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContrac if wantedContract == actualContract { if len(wantedTopics) == 0 { return true - } else { // Or if we have contracts and topics to filter on we only keep receipts that satisfy both conditions - for _, wantedTopic := range wantedTopics { - for _, actualTopic := range actualTopics { - if wantedTopic == actualTopic { - return true - } + } + // Or if we have contracts and topics to filter on we only keep receipts that satisfy both conditions + for _, wantedTopic := range wantedTopics { + for _, actualTopic := range actualTopics { + if wantedTopic == actualTopic { + return true } } } diff --git a/pkg/super_node/repository.go b/pkg/super_node/repository.go index ea4fecf6..48366362 100644 --- a/pkg/super_node/repository.go +++ b/pkg/super_node/repository.go @@ -44,43 +44,43 @@ func NewCIDRepository(db *postgres.DB) *Repository { // Index indexes a cidPayload in Postgres func (repo *Repository) Index(cidPayload *ipfs.CIDPayload) error { - tx, err := repo.db.Beginx() - if err != nil { - return err + tx, beginErr := repo.db.Beginx() + if beginErr != nil { + return beginErr } - headerID, err := repo.indexHeaderCID(tx, cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash.Hex()) - if err != nil { + headerID, headerErr := repo.indexHeaderCID(tx, cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash.Hex()) + if headerErr != nil { rollbackErr := tx.Rollback() if rollbackErr != nil { log.Error(rollbackErr) } - return err + return headerErr } for uncleHash, cid := range cidPayload.UncleCIDs { - err = repo.indexUncleCID(tx, cid, cidPayload.BlockNumber, uncleHash.Hex()) - if err != nil { + uncleErr := repo.indexUncleCID(tx, cid, cidPayload.BlockNumber, uncleHash.Hex()) + if uncleErr != nil { rollbackErr := tx.Rollback() if rollbackErr != nil { log.Error(rollbackErr) } - return err + return uncleErr } } - err = repo.indexTransactionAndReceiptCIDs(tx, cidPayload, headerID) - if err != nil { + trxAndRctErr := repo.indexTransactionAndReceiptCIDs(tx, cidPayload, headerID) + if trxAndRctErr != nil { rollbackErr := tx.Rollback() if rollbackErr != nil { log.Error(rollbackErr) } - return err + return trxAndRctErr } - err = repo.indexStateAndStorageCIDs(tx, cidPayload, headerID) - if err != nil { + stateAndStorageErr := repo.indexStateAndStorageCIDs(tx, cidPayload, headerID) + if stateAndStorageErr != nil { rollbackErr := tx.Rollback() if rollbackErr != nil { log.Error(rollbackErr) } - return err + return stateAndStorageErr } return tx.Commit() } @@ -104,18 +104,18 @@ func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash string func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *ipfs.CIDPayload, headerID int64) error { for hash, trxCidMeta := range payload.TransactionCIDs { var txID int64 - err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5) + queryErr := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src) = ($3, $4, $5) RETURNING id`, headerID, hash.Hex(), trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src).Scan(&txID) - if err != nil { - return err + if queryErr != nil { + return queryErr } receiptCidMeta, ok := payload.ReceiptCIDs[hash] if ok { - err = repo.indexReceiptCID(tx, receiptCidMeta, txID) - if err != nil { - return err + rctErr := repo.indexReceiptCID(tx, receiptCidMeta, txID) + if rctErr != nil { + return rctErr } } } @@ -131,17 +131,17 @@ func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ipfs.ReceiptMetaDa func (repo *Repository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *ipfs.CIDPayload, headerID int64) error { for accountKey, stateCID := range payload.StateNodeCIDs { var stateID int64 - err := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, state_key, cid, leaf) VALUES ($1, $2, $3, $4) + queryErr := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, state_key, cid, leaf) VALUES ($1, $2, $3, $4) ON CONFLICT (header_id, state_key) DO UPDATE SET (cid, leaf) = ($3, $4) RETURNING id`, headerID, accountKey.Hex(), stateCID.CID, stateCID.Leaf).Scan(&stateID) - if err != nil { - return err + if queryErr != nil { + return queryErr } for _, storageCID := range payload.StorageNodeCIDs[accountKey] { - err = repo.indexStorageCID(tx, storageCID, stateID) - if err != nil { - return err + storageErr := repo.indexStorageCID(tx, storageCID, stateID) + if storageErr != nil { + return storageErr } } } diff --git a/pkg/super_node/retriever.go b/pkg/super_node/retriever.go index 36dfef47..d17dd835 100644 --- a/pkg/super_node/retriever.go +++ b/pkg/super_node/retriever.go @@ -48,13 +48,14 @@ func NewCIDRetriever(db *postgres.DB) *EthCIDRetriever { } } +// RetrieveFirstBlockNumber is used to retrieve the first block number in the db func (ecr *EthCIDRetriever) RetrieveFirstBlockNumber() (int64, error) { var blockNumber int64 err := ecr.db.Get(&blockNumber, "SELECT block_number FROM header_cids ORDER BY block_number ASC LIMIT 1") return blockNumber, err } -// GetLastBlockNumber is used to retrieve the latest block number in the cache +// RetrieveLastBlockNumber is used to retrieve the latest block number in the db func (ecr *EthCIDRetriever) RetrieveLastBlockNumber() (int64, error) { var blockNumber int64 err := ecr.db.Get(&blockNumber, "SELECT block_number FROM header_cids ORDER BY block_number DESC LIMIT 1 ") @@ -64,10 +65,9 @@ func (ecr *EthCIDRetriever) RetrieveLastBlockNumber() (int64, error) { // RetrieveCIDs is used to retrieve all of the CIDs which conform to the passed StreamFilters func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) { log.Debug("retrieving cids") - var err error - tx, err := ecr.db.Beginx() - if err != nil { - return nil, err + tx, beginErr := ecr.db.Beginx() + if beginErr != nil { + return nil, beginErr } // THIS IS SUPER EXPENSIVE HAVING TO CYCLE THROUGH EACH BLOCK, NEED BETTER WAY TO FETCH CIDS // WHILE STILL MAINTAINING RELATION INFO ABOUT WHAT BLOCK THE CIDS BELONG TO @@ -76,24 +76,26 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, bloc // Retrieve cached header CIDs if !streamFilters.HeaderFilter.Off { - cw.Headers, err = ecr.retrieveHeaderCIDs(tx, streamFilters, blockNumber) - if err != nil { + var headersErr error + cw.Headers, headersErr = ecr.retrieveHeaderCIDs(tx, streamFilters, blockNumber) + if headersErr != nil { rollbackErr := tx.Rollback() if rollbackErr != nil { log.Error(rollbackErr) } log.Error("header cid retrieval error") - return nil, err + return nil, headersErr } if !streamFilters.HeaderFilter.FinalOnly { - cw.Uncles, err = ecr.retrieveUncleCIDs(tx, streamFilters, blockNumber) - if err != nil { + var unclesErr error + cw.Uncles, unclesErr = ecr.retrieveUncleCIDs(tx, streamFilters, blockNumber) + if unclesErr != nil { rollbackErr := tx.Rollback() if rollbackErr != nil { log.Error(rollbackErr) } log.Error("uncle cid retrieval error") - return nil, err + return nil, unclesErr } } } @@ -101,53 +103,57 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, bloc // Retrieve cached trx CIDs var trxIds []int64 if !streamFilters.TrxFilter.Off { - cw.Transactions, trxIds, err = ecr.retrieveTrxCIDs(tx, streamFilters, blockNumber) - if err != nil { + var trxsErr error + cw.Transactions, trxIds, trxsErr = ecr.retrieveTrxCIDs(tx, streamFilters, blockNumber) + if trxsErr != nil { rollbackErr := tx.Rollback() if rollbackErr != nil { log.Error(rollbackErr) } log.Error("transaction cid retrieval error") - return nil, err + return nil, trxsErr } } // Retrieve cached receipt CIDs if !streamFilters.ReceiptFilter.Off { - cw.Receipts, err = ecr.retrieveRctCIDs(tx, streamFilters, blockNumber, trxIds) - if err != nil { + var rctsErr error + cw.Receipts, rctsErr = ecr.retrieveRctCIDs(tx, streamFilters, blockNumber, trxIds) + if rctsErr != nil { rollbackErr := tx.Rollback() if rollbackErr != nil { log.Error(rollbackErr) } log.Error("receipt cid retrieval error") - return nil, err + return nil, rctsErr } } // Retrieve cached state CIDs if !streamFilters.StateFilter.Off { - cw.StateNodes, err = ecr.retrieveStateCIDs(tx, streamFilters, blockNumber) - if err != nil { + var stateErr error + cw.StateNodes, stateErr = ecr.retrieveStateCIDs(tx, streamFilters, blockNumber) + if stateErr != nil { rollbackErr := tx.Rollback() if rollbackErr != nil { log.Error(rollbackErr) } log.Error("state cid retrieval error") - return nil, err + return nil, stateErr } } // Retrieve cached storage CIDs if !streamFilters.StorageFilter.Off { - cw.StorageNodes, err = ecr.retrieveStorageCIDs(tx, streamFilters, blockNumber) - if err != nil { + var storageErr error + cw.StorageNodes, storageErr = ecr.retrieveStorageCIDs(tx, streamFilters, blockNumber) + if storageErr != nil { rollbackErr := tx.Rollback() if rollbackErr != nil { log.Error(rollbackErr) } log.Error("storage cid retrieval error") - return nil, err + return nil, storageErr } } @@ -310,6 +316,7 @@ type gap struct { Stop int64 `db:"stop"` } +// RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db func (ecr *EthCIDRetriever) RetrieveGapsInData() ([][2]int64, error) { pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM header_cids LEFT JOIN header_cids r on header_cids.block_number = r.block_number - 1 diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index 28feb405..8aada6ee 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -49,16 +49,13 @@ type NodeInterface interface { // Main event loop for syncAndPublish processes SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- ipfs.IPLDPayload, forwardQuitchan chan<- bool) error // Main event loop for handling client pub-sub - ScreenAndServe(screenAndServePayload <-chan ipfs.IPLDPayload, screenAndServeQuit <-chan bool) + ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan ipfs.IPLDPayload, screenAndServeQuit <-chan bool) // Method to subscribe to receive state diff processing output Subscribe(id rpc.ID, sub chan<- streamer.SuperNodePayload, quitChan chan<- bool, streamFilters config.Subscription) // Method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) // Method to access the Geth node info for this service Node() core.Node - // Method used to retrieve the underlying IPFS publisher for this service, so that is can be used for backfilling - // This is needed because it's not possible to initialize two ipfs nodes at the same path - GetPublisher() ipfs.IPLDPublisher } // Service is the underlying struct for the super node @@ -92,18 +89,22 @@ type Service struct { // Number of workers WorkerPoolSize int // Info for the Geth node that this super node is working with - gethNode core.Node + GethNode core.Node } // NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct func NewSuperNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc chan bool, workers int, node core.Node) (NodeInterface, error) { - publisher, err := ipfs.NewIPLDPublisher(ipfsPath) - if err != nil { - return nil, err + ipfsInitErr := ipfs.InitIPFSPlugins() + if ipfsInitErr != nil { + return nil, ipfsInitErr } - ipldFetcher, err := ipfs.NewIPLDFetcher(ipfsPath) - if err != nil { - return nil, err + publisher, newPublisherErr := ipfs.NewIPLDPublisher(ipfsPath) + if newPublisherErr != nil { + return nil, newPublisherErr + } + ipldFetcher, newFetcherErr := ipfs.NewIPLDFetcher(ipfsPath) + if newFetcherErr != nil { + return nil, newFetcherErr } return &Service{ Streamer: streamer.NewStateDiffStreamer(rpcClient), @@ -119,7 +120,7 @@ func NewSuperNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), SubscriptionTypes: make(map[common.Hash]config.Subscription), WorkerPoolSize: workers, - gethNode: node, + GethNode: node, }, nil } @@ -144,9 +145,9 @@ func (sap *Service) APIs() []rpc.API { // This continues on no matter if or how many subscribers there are, it then forwards the data to the ScreenAndServe() loop // which filters and sends relevant data to client subscriptions, if there are any func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- ipfs.IPLDPayload, screenAndServeQuit chan<- bool) error { - sub, err := sap.Streamer.Stream(sap.PayloadChan) - if err != nil { - return err + sub, streamErr := sap.Streamer.Stream(sap.PayloadChan) + if streamErr != nil { + return streamErr } wg.Add(1) @@ -158,14 +159,13 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha for i := 0; i < sap.WorkerPoolSize; i++ { sap.publishAndIndex(i, publishAndIndexPayload, publishAndIndexQuit) } - go func() { for { select { case payload := <-sap.PayloadChan: - ipldPayload, err := sap.Converter.Convert(payload) - if err != nil { - log.Error(err) + ipldPayload, convertErr := sap.Converter.Convert(payload) + if convertErr != nil { + log.Error(convertErr) continue } // If we have a ScreenAndServe process running, forward the payload to it @@ -178,8 +178,8 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha case publishAndIndexPayload <- *ipldPayload: default: } - case err = <-sub.Err(): - log.Error(err) + case subErr := <-sub.Err(): + log.Error(subErr) case <-sap.QuitChan: // If we have a ScreenAndServe process running, forward the quit signal to it select { @@ -199,7 +199,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha } } }() - + log.Info("syncAndPublish goroutine successfully spun up") return nil } @@ -208,14 +208,14 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan ipfs.I for { select { case payload := <-publishAndIndexPayload: - cidPayload, err := sap.Publisher.Publish(&payload) - if err != nil { - log.Errorf("worker %d error: %v", id, err) + cidPayload, publishErr := sap.Publisher.Publish(&payload) + if publishErr != nil { + log.Errorf("worker %d error: %v", id, publishErr) continue } - err = sap.Repository.Index(cidPayload) - if err != nil { - log.Errorf("worker %d error: %v", id, err) + indexErr := sap.Repository.Index(cidPayload) + if indexErr != nil { + log.Errorf("worker %d error: %v", id, indexErr) } case <-publishAndIndexQuit: log.Infof("quiting publishAndIndex worker %d", id) @@ -223,25 +223,29 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan ipfs.I } } }() + log.Info("publishAndIndex goroutine successfully spun up") } // ScreenAndServe is the loop used to screen data streamed from the state diffing eth node // and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration -func (sap *Service) ScreenAndServe(screenAndServePayload <-chan ipfs.IPLDPayload, screenAndServeQuit <-chan bool) { +func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan ipfs.IPLDPayload, screenAndServeQuit <-chan bool) { + wg.Add(1) go func() { for { select { case payload := <-screenAndServePayload: - err := sap.sendResponse(payload) - if err != nil { - log.Error(err) + sendErr := sap.sendResponse(payload) + if sendErr != nil { + log.Error(sendErr) } case <-screenAndServeQuit: log.Info("quiting ScreenAndServe process") + wg.Done() return } } }() + log.Info("screenAndServe goroutine successfully spun up") } func (sap *Service) sendResponse(payload ipfs.IPLDPayload) error { @@ -253,9 +257,9 @@ func (sap *Service) sendResponse(payload ipfs.IPLDPayload) error { log.Errorf("subscription configuration for subscription type %s not available", ty.Hex()) continue } - response, err := sap.Filterer.FilterResponse(subConfig, payload) - if err != nil { - log.Error(err) + response, filterErr := sap.Filterer.FilterResponse(subConfig, payload) + if filterErr != nil { + log.Error(filterErr) continue } for id, sub := range subs { @@ -276,9 +280,9 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- streamer.SuperNodePayload, q log.Info("Subscribing to the super node service") // Subscription type is defined as the hash of its content // Group subscriptions by type and screen payloads once for subs of the same type - by, err := rlp.EncodeToBytes(streamFilters) - if err != nil { - log.Error(err) + by, encodeErr := rlp.EncodeToBytes(streamFilters) + if encodeErr != nil { + log.Error(encodeErr) } subscriptionHash := crypto.Keccak256(by) subscriptionType := common.BytesToHash(subscriptionHash) @@ -307,20 +311,21 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio // Retrieve cached CIDs relevant to this subscriber var endingBlock int64 var startingBlock int64 - var err error - startingBlock, err = sap.Retriever.RetrieveFirstBlockNumber() - if err != nil { + var retrieveFirstBlockErr error + var retrieveLastBlockErr error + startingBlock, retrieveFirstBlockErr = sap.Retriever.RetrieveFirstBlockNumber() + if retrieveFirstBlockErr != nil { sub.PayloadChan <- streamer.SuperNodePayload{ - ErrMsg: "unable to set block range start; error: " + err.Error(), + ErrMsg: "unable to set block range start; error: " + retrieveFirstBlockErr.Error(), } } if startingBlock < con.StartingBlock.Int64() { startingBlock = con.StartingBlock.Int64() } - endingBlock, err = sap.Retriever.RetrieveLastBlockNumber() - if err != nil { + endingBlock, retrieveLastBlockErr = sap.Retriever.RetrieveLastBlockNumber() + if retrieveLastBlockErr != nil { sub.PayloadChan <- streamer.SuperNodePayload{ - ErrMsg: "unable to set block range end; error: " + err.Error(), + ErrMsg: "unable to set block range end; error: " + retrieveLastBlockErr.Error(), } } if endingBlock > con.EndingBlock.Int64() && con.EndingBlock.Int64() > 0 && con.EndingBlock.Int64() > startingBlock { @@ -333,32 +338,25 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio // TODO: separate backfill into a different rpc subscription method altogether? go func() { for i := startingBlock; i <= endingBlock; i++ { - cidWrapper, err := sap.Retriever.RetrieveCIDs(con, i) - if err != nil { + cidWrapper, retrieveCIDsErr := sap.Retriever.RetrieveCIDs(con, i) + if retrieveCIDsErr != nil { sub.PayloadChan <- streamer.SuperNodePayload{ - ErrMsg: "CID retrieval error: " + err.Error(), + ErrMsg: "CID retrieval error: " + retrieveCIDsErr.Error(), } continue } if ipfs.EmptyCIDWrapper(*cidWrapper) { continue } - blocksWrapper, err := sap.IPLDFetcher.FetchCIDs(*cidWrapper) - if err != nil { - log.Error(err) + blocksWrapper, fetchIPLDsErr := sap.IPLDFetcher.FetchIPLDs(*cidWrapper) + if fetchIPLDsErr != nil { + log.Error(fetchIPLDsErr) sub.PayloadChan <- streamer.SuperNodePayload{ - ErrMsg: "IPLD fetching error: " + err.Error(), - } - continue - } - backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper) - if err != nil { - log.Error(err) - sub.PayloadChan <- streamer.SuperNodePayload{ - ErrMsg: "IPLD resolving error: " + err.Error(), + ErrMsg: "IPLD fetching error: " + fetchIPLDsErr.Error(), } continue } + backFillIplds := sap.Resolver.ResolveIPLDs(*blocksWrapper) select { case sub.PayloadChan <- backFillIplds: log.Infof("sending super node back-fill payload to subscription %s", id) @@ -393,7 +391,7 @@ func (sap *Service) Start(*p2p.Server) error { if err := sap.SyncAndPublish(wg, payloadChan, quitChan); err != nil { return err } - sap.ScreenAndServe(payloadChan, quitChan) + sap.ScreenAndServe(wg, payloadChan, quitChan) return nil } @@ -406,7 +404,7 @@ func (sap *Service) Stop() error { // Node returns the Geth node info for this service func (sap *Service) Node() core.Node { - return sap.gethNode + return sap.GethNode } // close is used to close all listening subscriptions @@ -426,7 +424,3 @@ func (sap *Service) close() { } sap.Unlock() } - -func (sap *Service) GetPublisher() ipfs.IPLDPublisher { - return sap.Publisher -} diff --git a/pkg/super_node/test_helpers.go b/pkg/super_node/test_helpers.go index db5edd5d..81afe96f 100644 --- a/pkg/super_node/test_helpers.go +++ b/pkg/super_node/test_helpers.go @@ -26,6 +26,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) +// SetupDB is use to setup a db for super node tests func SetupDB() (*postgres.DB, error) { return postgres.NewDB(config.Database{ Hostname: "localhost", @@ -34,6 +35,7 @@ func SetupDB() (*postgres.DB, error) { }, core.Node{}) } +// TearDownDB is used to tear down the super node dbs after tests func TearDownDB(db *postgres.DB) { tx, err := db.Beginx() Expect(err).NotTo(HaveOccurred()) @@ -55,6 +57,7 @@ func TearDownDB(db *postgres.DB) { Expect(err).NotTo(HaveOccurred()) } +// ListContainsString used to check if a list of strings contains a particular string func ListContainsString(sss []string, s string) bool { for _, str := range sss { if s == str { @@ -64,6 +67,7 @@ func ListContainsString(sss []string, s string) bool { return false } +// ListContainsBytes used to check if a list of byte arrays contains a particular byte array func ListContainsBytes(bbb [][]byte, b []byte) bool { for _, by := range bbb { if bytes.Equal(by, b) {