diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go index 1ea3a718..ff49c393 100644 --- a/cmd/syncAndPublish.go +++ b/cmd/syncAndPublish.go @@ -56,7 +56,7 @@ func init() { } func syncAndPublish() { - blockChain, ethClient, rpcClient := getBlockChainAndClients() + blockChain, rpcClient := getBlockChainAndClient() db := utils.LoadPostgres(databaseConfig, blockChain.Node()) quitChan := make(chan bool) @@ -69,7 +69,7 @@ func syncAndPublish() { } ipfsPath = filepath.Join(home, ".ipfs") } - processor, err := seed_node.NewProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan) + processor, err := seed_node.NewSeedNode(ipfsPath, &db, rpcClient, quitChan, 1) if err != nil { log.Fatal(err) } @@ -82,7 +82,7 @@ func syncAndPublish() { wg.Wait() // If an error was thrown, wg.Add was never called and this will fall through } -func getBlockChainAndClients() (*geth.BlockChain, core.EthClient, core.RpcClient) { +func getBlockChainAndClient() (*geth.BlockChain, core.RpcClient) { rawRpcClient, err := rpc.Dial(ipc) if err != nil { @@ -94,5 +94,5 @@ func getBlockChainAndClients() (*geth.BlockChain, core.EthClient, core.RpcClient vdbNode := node.MakeNode(rpcClient) transactionConverter := vRpc.NewRpcTransactionConverter(ethClient) blockChain := geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter) - return blockChain, ethClient, rpcClient + return blockChain, rpcClient } diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go index 63182caa..d8dea81f 100644 --- a/cmd/syncPublishScreenAndServe.go +++ b/cmd/syncPublishScreenAndServe.go @@ -50,7 +50,7 @@ func init() { } func syncPublishScreenAndServe() { - blockChain, ethClient, rpcClient := getBlockChainAndClients() + blockChain, rpcClient := getBlockChainAndClient() db := utils.LoadPostgres(databaseConfig, blockChain.Node()) quitChan := make(chan bool, 1) @@ -63,7 +63,7 @@ func syncPublishScreenAndServe() { } ipfsPath = filepath.Join(home, ".ipfs") } - processor, err := seed_node.NewProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan) + processor, err := seed_node.NewSeedNode(ipfsPath, &db, rpcClient, quitChan, 1) if err != nil { log.Fatal(err) } diff --git a/dockerfiles/seed_node/Dockerfile b/dockerfiles/seed_node/Dockerfile index 34e60ea5..6c177983 100644 --- a/dockerfiles/seed_node/Dockerfile +++ b/dockerfiles/seed_node/Dockerfile @@ -1,14 +1,15 @@ FROM golang:alpine as builder -RUN apk --update --no-cache add dep make git g++ linux-headers +ENV GO111MODULE=on + +RUN apk --update --no-cache add make git g++ linux-headers # DEBUG RUN apk add busybox-extras -# Get and build vulcanizedb syncAndPublish fork +# Get and build vulcanizedb ipfs_concurreny fork RUN go get -u -d github.com/vulcanize/vulcanizedb WORKDIR /go/src/github.com/vulcanize/vulcanizedb RUN git checkout ipfs_concurrency -RUN dep ensure RUN GCO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o vulcanizedb . # Get and build vulcanize's go-ipfs fork @@ -17,7 +18,6 @@ WORKDIR /go/src/github.com/ipfs/go-ipfs RUN git remote add vulcanize https://github.com/vulcanize/go-ipfs.git RUN git fetch vulcanize RUN git checkout -b pg_ipfs vulcanize/postgres_update -RUN dep ensure RUN GCO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipfs ./cmd/ipfs # Get and build vulcanize's geth fork diff --git a/pkg/ipfs/fetcher.go b/pkg/ipfs/fetcher.go index 5a65e893..bfaaa1ac 100644 --- a/pkg/ipfs/fetcher.go +++ b/pkg/ipfs/fetcher.go @@ -63,32 +63,26 @@ func (f *EthIPLDFetcher) FetchCIDs(cids CIDWrapper) (*IPLDWrapper, error) { err := f.fetchHeaders(cids, blocks) if err != nil { - println(1) return nil, err } err = f.fetchUncles(cids, blocks) if err != nil { - println(2) return nil, err } err = f.fetchTrxs(cids, blocks) if err != nil { - println(3) return nil, err } err = f.fetchRcts(cids, blocks) if err != nil { - println(4) return nil, err } err = f.fetchStorage(cids, blocks) if err != nil { - println(5) return nil, err } err = f.fetchState(cids, blocks) if err != nil { - println(6) return nil, err } diff --git a/pkg/ipfs/fetcher_test.go b/pkg/ipfs/fetcher_test.go index e582630d..1aab413b 100644 --- a/pkg/ipfs/fetcher_test.go +++ b/pkg/ipfs/fetcher_test.go @@ -29,7 +29,6 @@ import ( ) var ( - // these need to be actual typed objects so that the cid.Decode works mockHeaderData = []byte{0, 1, 2, 3, 4} mockUncleData = []byte{1, 2, 3, 4, 5} mockTrxData = []byte{2, 3, 4, 5, 6} diff --git a/pkg/ipfs/mocks/dag_putters.go b/pkg/ipfs/mocks/dag_putters.go new file mode 100644 index 00000000..1e5b26ae --- /dev/null +++ b/pkg/ipfs/mocks/dag_putters.go @@ -0,0 +1,47 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package mocks + +import "errors" + +// DagPutter is a mock for testing the ipfs publisher +type DagPutter struct { + CIDsToReturn []string + ErrToReturn error +} + +// DagPut returns the pre-loaded CIDs or error +func (dp *DagPutter) DagPut(raw interface{}) ([]string, error) { + return dp.CIDsToReturn, dp.ErrToReturn +} + +// IncrementingDagPutter is a mock for testing the ipfs publisher +type IncrementingDagPutter struct { + CIDsToReturn []string + iterator int + ErrToReturn error +} + +// DagPut returns the pre-loaded CIDs or error +func (dp *IncrementingDagPutter) DagPut(raw interface{}) ([]string, error) { + if len(dp.CIDsToReturn) >= dp.iterator+1 { + cid := dp.CIDsToReturn[dp.iterator] + dp.iterator++ + return []string{cid}, dp.ErrToReturn + } + return nil, errors.New("dag putter iterator is out of range") +} diff --git a/pkg/ipfs/mocks/test_data.go b/pkg/ipfs/mocks/test_data.go index 15192e72..bbc11c85 100644 --- a/pkg/ipfs/mocks/test_data.go +++ b/pkg/ipfs/mocks/test_data.go @@ -183,67 +183,92 @@ var ( ReceiptsRlp: []byte{}, } - MockIPLDPayload = ipfs.IPLDPayload{ - BlockNumber: big.NewInt(1), - BlockHash: MockBlock.Hash(), - Receipts: MockReceipts, - HeaderRLP: MockHeaderRlp, - BlockBody: MockBlock.Body(), - TrxMetaData: MockTrxMeta, - ReceiptMetaData: MockRctMeta, - StorageNodes: MockStorageNodes, - StateNodes: MockStateNodes, + MockIPLDPayload = &ipfs.IPLDPayload{ + BlockNumber: big.NewInt(1), + BlockHash: MockBlock.Hash(), + Receipts: MockReceipts, + HeaderRLP: MockHeaderRlp, + BlockBody: MockBlock.Body(), + TrxMetaData: []*ipfs.TrxMetaData{ + { + CID: "", + Src: senderAddr.Hex(), + Dst: "0x0000000000000000000000000000000000000000", + }, + { + CID: "", + Src: senderAddr.Hex(), + Dst: "0x0000000000000000000000000000000000000001", + }, + }, + ReceiptMetaData: []*ipfs.ReceiptMetaData{ + { + CID: "", + Topic0s: []string{ + "0x0000000000000000000000000000000000000000000000000000000000000004", + }, + ContractAddress: "0x0000000000000000000000000000000000000000", + }, + { + CID: "", + Topic0s: []string{ + "0x0000000000000000000000000000000000000000000000000000000000000005", + }, + ContractAddress: "0x0000000000000000000000000000000000000001", + }, + }, + StorageNodes: MockStorageNodes, + StateNodes: MockStateNodes, } - MockCIDPayload = ipfs.CIDPayload{ + MockCIDPayload = &ipfs.CIDPayload{ BlockNumber: "1", - BlockHash: common.HexToHash("0x0"), + BlockHash: MockBlock.Hash(), HeaderCID: "mockHeaderCID", + UncleCIDS: make(map[common.Hash]string), TransactionCIDs: map[common.Hash]*ipfs.TrxMetaData{ - common.HexToHash("0x0"): { + MockTransactions[0].Hash(): { CID: "mockTrxCID1", - Dst: "mockTo1", - Src: "mockFrom1", + Dst: "0x0000000000000000000000000000000000000000", + Src: senderAddr.Hex(), }, - common.HexToHash("0x1"): { + MockTransactions[1].Hash(): { CID: "mockTrxCID2", - Dst: "mockTo2", - Src: "mockFrom2", + Dst: "0x0000000000000000000000000000000000000001", + Src: senderAddr.Hex(), }, }, ReceiptCIDs: map[common.Hash]*ipfs.ReceiptMetaData{ - common.HexToHash("0x0"): { - CID: "mockReceiptCID1", - Topic0s: []string{"mockTopic1"}, + MockTransactions[0].Hash(): { + CID: "mockRctCID1", + Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000004"}, + ContractAddress: "0x0000000000000000000000000000000000000000", }, - common.HexToHash("0x1"): { - CID: "mockReceiptCID2", - Topic0s: []string{"mockTopic1", "mockTopic2"}, + MockTransactions[1].Hash(): { + CID: "mockRctCID2", + Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000005"}, + ContractAddress: "0x0000000000000000000000000000000000000001", }, }, StateNodeCIDs: map[common.Hash]ipfs.StateNodeCID{ - common.HexToHash("0x0"): { + ContractLeafKey: { CID: "mockStateCID1", Leaf: true, + Key: "", }, - common.HexToHash("0x1"): { + AnotherContractLeafKey: { CID: "mockStateCID2", Leaf: true, + Key: "", }, }, StorageNodeCIDs: map[common.Hash][]ipfs.StorageNodeCID{ - common.HexToHash("0x0"): { + ContractLeafKey: { { - CID: "mockStorageCID1", - Key: "0x0", - Leaf: true, - }, - }, - common.HexToHash("0x1"): { - { - CID: "mockStorageCID2", - Key: "0x1", - Leaf: true, + CID: "mockStorageCID", + Key: "0x0000000000000000000000000000000000000000000000000000000000000001", + Leaf: true, + StateKey: "", }, }, }, @@ -281,7 +306,7 @@ func createTransactionsAndReceipts() (types.Transactions, types.Receipts, common Topics: []common.Hash{mockTopic1}, } mockReceipt1.Logs = []*types.Log{mockLog1} - mockReceipt1.TxHash = trx1.Hash() + mockReceipt1.TxHash = signedTrx1.Hash() mockTopic2 := common.HexToHash("0x05") mockReceipt2 := types.NewReceipt(common.HexToHash("0x1").Bytes(), false, 100) mockReceipt2.ContractAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593") @@ -289,6 +314,6 @@ func createTransactionsAndReceipts() (types.Transactions, types.Receipts, common Topics: []common.Hash{mockTopic2}, } mockReceipt2.Logs = []*types.Log{mockLog2} - mockReceipt2.TxHash = trx2.Hash() + mockReceipt2.TxHash = signedTrx2.Hash() return types.Transactions{signedTrx1, signedTrx2}, types.Receipts{mockReceipt1, mockReceipt2}, senderAddr } diff --git a/pkg/ipfs/publisher.go b/pkg/ipfs/publisher.go index 2e98327e..2746c2ea 100644 --- a/pkg/ipfs/publisher.go +++ b/pkg/ipfs/publisher.go @@ -40,11 +40,11 @@ type IPLDPublisher interface { // Publisher is the underlying struct for the IPLDPublisher interface type Publisher struct { - HeaderPutter *eth_block_header.BlockHeaderDagPutter - TransactionPutter *eth_block_transactions.BlockTransactionsDagPutter - ReceiptPutter *eth_block_receipts.EthBlockReceiptDagPutter - StatePutter *eth_state_trie.StateTrieDagPutter - StoragePutter *eth_storage_trie.StorageTrieDagPutter + HeaderPutter ipfs.DagPutter + TransactionPutter ipfs.DagPutter + ReceiptPutter ipfs.DagPutter + StatePutter ipfs.DagPutter + StoragePutter ipfs.DagPutter } // NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface diff --git a/pkg/ipfs/publisher_test.go b/pkg/ipfs/publisher_test.go new file mode 100644 index 00000000..a9f03183 --- /dev/null +++ b/pkg/ipfs/publisher_test.go @@ -0,0 +1,62 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ipfs_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/ipfs" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" +) + +var ( + mockHeaderDagPutter *mocks.DagPutter + mockTrxDagPutter *mocks.DagPutter + mockRctDagPutter *mocks.DagPutter + mockStateDagPutter *mocks.IncrementingDagPutter + mockStorageDagPutter *mocks.IncrementingDagPutter +) + +var _ = Describe("Publisher", func() { + BeforeEach(func() { + mockHeaderDagPutter = new(mocks.DagPutter) + mockTrxDagPutter = new(mocks.DagPutter) + mockRctDagPutter = new(mocks.DagPutter) + mockStateDagPutter = new(mocks.IncrementingDagPutter) + mockStorageDagPutter = new(mocks.IncrementingDagPutter) + }) + Describe("Publish", func() { + It("Publishes the passed IPLDPayload objects to IPFS and returns a CIDPayload for indexing", func() { + mockHeaderDagPutter.CIDsToReturn = []string{"mockHeaderCID"} + mockTrxDagPutter.CIDsToReturn = []string{"mockTrxCID1", "mockTrxCID2"} + mockRctDagPutter.CIDsToReturn = []string{"mockRctCID1", "mockRctCID2"} + mockStateDagPutter.CIDsToReturn = []string{"mockStateCID1", "mockStateCID2"} + mockStorageDagPutter.CIDsToReturn = []string{"mockStorageCID"} + publisher := ipfs.Publisher{ + HeaderPutter: mockHeaderDagPutter, + TransactionPutter: mockTrxDagPutter, + ReceiptPutter: mockRctDagPutter, + StatePutter: mockStateDagPutter, + StoragePutter: mockStorageDagPutter, + } + cidPayload, err := publisher.Publish(mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(*cidPayload).To(Equal(*mocks.MockCIDPayload)) + }) + }) +}) diff --git a/pkg/seed_node/api.go b/pkg/seed_node/api.go index 88692e7b..0b17dfd9 100644 --- a/pkg/seed_node/api.go +++ b/pkg/seed_node/api.go @@ -34,13 +34,13 @@ const APIVersion = "0.0.1" // PublicSeedNodeAPI is the public api for the seed node type PublicSeedNodeAPI struct { - snp Processor + sni NodeInterface } // NewPublicSeedNodeAPI creates a new PublicSeedNodeAPI with the provided underlying SyncPublishScreenAndServe process -func NewPublicSeedNodeAPI(seedNodeProcessor Processor) *PublicSeedNodeAPI { +func NewPublicSeedNodeAPI(seedNodeInterface NodeInterface) *PublicSeedNodeAPI { return &PublicSeedNodeAPI{ - snp: seedNodeProcessor, + sni: seedNodeInterface, } } @@ -59,7 +59,7 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S // subscribe to events from the SyncPublishScreenAndServe service payloadChannel := make(chan streamer.SeedNodePayload, payloadChanBufferSize) quitChan := make(chan bool, 1) - go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, streamFilters) + go api.sni.Subscribe(rpcSub.ID, payloadChannel, quitChan, streamFilters) // loop and await state diff payloads and relay them to the subscriber with then notifier for { @@ -67,11 +67,11 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S case packet := <-payloadChannel: if notifyErr := notifier.Notify(rpcSub.ID, packet); notifyErr != nil { log.Error("Failed to send state diff packet", "err", notifyErr) - api.snp.Unsubscribe(rpcSub.ID) + api.sni.Unsubscribe(rpcSub.ID) return } case <-rpcSub.Err(): - api.snp.Unsubscribe(rpcSub.ID) + api.sni.Unsubscribe(rpcSub.ID) return case <-quitChan: // don't need to unsubscribe, SyncPublishScreenAndServe service does so before sending the quit signal diff --git a/pkg/seed_node/filterer.go b/pkg/seed_node/filterer.go index ac641b88..ce8e006d 100644 --- a/pkg/seed_node/filterer.go +++ b/pkg/seed_node/filterer.go @@ -24,8 +24,8 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) // ResponseFilterer is the inteface used to screen eth data and package appropriate data into a response payload diff --git a/pkg/seed_node/repository.go b/pkg/seed_node/repository.go index b5ab720c..1e522d4d 100644 --- a/pkg/seed_node/repository.go +++ b/pkg/seed_node/repository.go @@ -20,8 +20,8 @@ import ( "github.com/jmoiron/sqlx" "github.com/lib/pq" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) // CIDRepository is an interface for indexing ipfs.CIDPayloads diff --git a/pkg/seed_node/retreiver.go b/pkg/seed_node/retreiver.go index e44860f5..0fcc4c3d 100644 --- a/pkg/seed_node/retreiver.go +++ b/pkg/seed_node/retreiver.go @@ -23,9 +23,9 @@ import ( "github.com/lib/pq" log "github.com/sirupsen/logrus" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) // CIDRetriever is the interface for retrieving CIDs from the Postgres cache diff --git a/pkg/seed_node/seed_node_suite_test.go b/pkg/seed_node/seed_node_suite_test.go index 81b3f38b..533d3664 100644 --- a/pkg/seed_node/seed_node_suite_test.go +++ b/pkg/seed_node/seed_node_suite_test.go @@ -20,9 +20,9 @@ import ( "io/ioutil" "testing" - "github.com/sirupsen/logrus" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" ) func TestSeedNode(t *testing.T) { diff --git a/pkg/seed_node/service.go b/pkg/seed_node/service.go index aa6abd4e..b7faec3f 100644 --- a/pkg/seed_node/service.go +++ b/pkg/seed_node/service.go @@ -19,30 +19,29 @@ package seed_node import ( "sync" - "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) const payloadChanBufferSize = 20000 // the max eth sub buffer size -const workerPoolSize = 1 -// Processor is the top level interface for streaming, converting to IPLDs, publishing, +// NodeInterface is the top level interface for streaming, converting to IPLDs, publishing, // and indexing all Ethereum data; screening this data; and serving it up to subscribed clients // This service is compatible with the Ethereum service interface (node.Service) -type Processor interface { +type NodeInterface interface { // APIs(), Protocols(), Start() and Stop() node.Service // Main event loop for syncAndPublish processes @@ -83,10 +82,12 @@ type Service struct { Subscriptions map[common.Hash]map[rpc.ID]Subscription // A mapping of subscription hash type to the corresponding StreamFilters SubscriptionTypes map[common.Hash]config.Subscription + // Number of workers + WorkerPoolSize int } -// NewProcessor creates a new Processor interface using an underlying Service struct -func NewProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (Processor, error) { +// NewSeedNode creates a new seed_node.Interface using an underlying seed_node.Service struct +func NewSeedNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc chan bool, workers int) (NodeInterface, error) { publisher, err := ipfs.NewIPLDPublisher(ipfsPath) if err != nil { return nil, err @@ -108,6 +109,7 @@ func NewProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rp QuitChan: qc, Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), SubscriptionTypes: make(map[common.Hash]config.Subscription), + WorkerPoolSize: workers, }, nil } @@ -140,10 +142,10 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha // Channels for forwarding data to the publishAndIndex workers publishAndIndexPayload := make(chan ipfs.IPLDPayload, payloadChanBufferSize) - publishAndIndexQuit := make(chan bool, workerPoolSize) + publishAndIndexQuit := make(chan bool, sap.WorkerPoolSize) // publishAndIndex worker pool to handle publishing and indexing concurrently, while // limiting the number of Postgres connections we can possibly open so as to prevent error - for i := 0; i < workerPoolSize; i++ { + for i := 0; i < sap.WorkerPoolSize; i++ { sap.publishAndIndex(i, publishAndIndexPayload, publishAndIndexQuit) } @@ -175,7 +177,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha default: } // Also forward a quit signal for each of the workers - for i := 0; i < workerPoolSize; i++ { + for i := 0; i < sap.WorkerPoolSize; i++ { select { case publishAndIndexQuit <- true: default: diff --git a/pkg/seed_node/service_test.go b/pkg/seed_node/service_test.go index eef81a08..2008df6c 100644 --- a/pkg/seed_node/service_test.go +++ b/pkg/seed_node/service_test.go @@ -25,15 +25,15 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/seed_node" mocks2 "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" + "github.com/vulcanize/vulcanizedb/pkg/seed_node" mocks3 "github.com/vulcanize/vulcanizedb/pkg/seed_node/mocks" ) var _ = Describe("Service", func() { - Describe("Loop", func() { + Describe("SyncAndPublish", func() { It("Streams statediff.Payloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() { wg := new(sync.WaitGroup) payloadChan := make(chan statediff.Payload, 1) @@ -42,36 +42,37 @@ var _ = Describe("Service", func() { ReturnErr: nil, } mockPublisher := &mocks.IPLDPublisher{ - ReturnCIDPayload: &mocks.MockCIDPayload, + ReturnCIDPayload: mocks.MockCIDPayload, ReturnErr: nil, } mockStreamer := &mocks2.StateDiffStreamer{ ReturnSub: &rpc.ClientSubscription{}, StreamPayloads: []statediff.Payload{ - mocks.MockStatediffPayload, + mocks.MockStateDiffPayload, }, ReturnErr: nil, } mockConverter := &mocks.PayloadConverter{ - ReturnIPLDPayload: &mocks.MockIPLDPayload, + ReturnIPLDPayload: mocks.MockIPLDPayload, ReturnErr: nil, } processor := &seed_node.Service{ - Repository: mockCidRepo, - Publisher: mockPublisher, - Streamer: mockStreamer, - Converter: mockConverter, - PayloadChan: payloadChan, - QuitChan: quitChan, + Repository: mockCidRepo, + Publisher: mockPublisher, + Streamer: mockStreamer, + Converter: mockConverter, + PayloadChan: payloadChan, + QuitChan: quitChan, + WorkerPoolSize: 1, } err := processor.SyncAndPublish(wg, nil, nil) Expect(err).ToNot(HaveOccurred()) time.Sleep(2 * time.Second) quitChan <- true wg.Wait() - Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStatediffPayload)) - Expect(mockCidRepo.PassedCIDPayload).To(Equal(&mocks.MockCIDPayload)) - Expect(mockPublisher.PassedIPLDPayload).To(Equal(&mocks.MockIPLDPayload)) + Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload)) + Expect(mockCidRepo.PassedCIDPayload).To(Equal(mocks.MockCIDPayload)) + Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockIPLDPayload)) Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan)) }) })