diff --git a/integration_test/seed_node.go b/integration_test/seed_node.go new file mode 100644 index 00000000..76ab1b72 --- /dev/null +++ b/integration_test/seed_node.go @@ -0,0 +1 @@ +package integration diff --git a/libraries/shared/streamer/seed_node_streamer.go b/libraries/shared/streamer/seed_node_streamer.go index d491db64..8f33d007 100644 --- a/libraries/shared/streamer/seed_node_streamer.go +++ b/libraries/shared/streamer/seed_node_streamer.go @@ -28,7 +28,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" ) -// ISeedNodeStreamer is the interface for streaming data from a vulcanizeDB seed node +// ISeedNodeStreamer is the interface for streaming SeedNodePayloads from a vulcanizeDB seed node type ISeedNodeStreamer interface { Stream(payloadChan chan SeedNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) } diff --git a/pkg/ipfs/converter.go b/pkg/ipfs/converter.go index 0296be86..454c5fd9 100644 --- a/pkg/ipfs/converter.go +++ b/pkg/ipfs/converter.go @@ -67,7 +67,8 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { StorageNodes: make(map[common.Hash][]StorageNode), } signer := types.MakeSigner(pc.chainConfig, block.Number()) - for _, trx := range block.Transactions() { + transactions := block.Transactions() + for _, trx := range transactions { // Extract to and from data from the the transactions for indexing from, err := types.Sender(signer, trx) if err != nil { @@ -87,7 +88,18 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { if err != nil { return nil, err } - for _, receipt := range receipts { + // Derive any missing fields + err = receipts.DeriveFields(pc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()) + if err != nil { + return nil, err + } + for i, receipt := range receipts { + // If the transaction for this receipt has a "to" address, the above DeriveFields() fails to assign it to the receipt's ContractAddress + // If it doesn't have a "to" address, it correctly derives it and assigns it to to the receipt's ContractAddress + // Weird, right? + if transactions[i].To() != nil { + receipt.ContractAddress = *transactions[i].To() + } // Extract topic0 data from the receipt's logs for indexing rctMeta := &ReceiptMetaData{ Topic0s: make([]string, 0, len(receipt.Logs)), @@ -116,7 +128,6 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { Value: createdAccount.Value, Leaf: createdAccount.Leaf, } - convertedPayload.StorageNodes[hashKey] = make([]StorageNode, 0) for _, storageDiff := range createdAccount.Storage { convertedPayload.StorageNodes[hashKey] = append(convertedPayload.StorageNodes[hashKey], StorageNode{ Key: common.BytesToHash(storageDiff.Key), @@ -131,7 +142,6 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { Value: deletedAccount.Value, Leaf: deletedAccount.Leaf, } - convertedPayload.StorageNodes[hashKey] = make([]StorageNode, 0) for _, storageDiff := range deletedAccount.Storage { convertedPayload.StorageNodes[hashKey] = append(convertedPayload.StorageNodes[hashKey], StorageNode{ Key: common.BytesToHash(storageDiff.Key), @@ -146,7 +156,6 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { Value: updatedAccount.Value, Leaf: updatedAccount.Leaf, } - convertedPayload.StorageNodes[hashKey] = make([]StorageNode, 0) for _, storageDiff := range updatedAccount.Storage { convertedPayload.StorageNodes[hashKey] = append(convertedPayload.StorageNodes[hashKey], StorageNode{ Key: common.BytesToHash(storageDiff.Key), diff --git a/pkg/ipfs/converter_test.go b/pkg/ipfs/converter_test.go new file mode 100644 index 00000000..016244ec --- /dev/null +++ b/pkg/ipfs/converter_test.go @@ -0,0 +1,54 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ipfs_test + +import ( + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/ipfs" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" +) + +var _ = Describe("Converter", func() { + Describe("Convert", func() { + It("Converts mock statediff.Payloads into the expected IPLDPayloads", func() { + converter := ipfs.NewPayloadConverter(params.MainnetChainConfig) + converterPayload, err := converter.Convert(mocks.MockStateDiffPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(converterPayload.BlockNumber).To(Equal(mocks.BlockNumber)) + Expect(converterPayload.BlockHash).To(Equal(mocks.MockBlock.Hash())) + Expect(converterPayload.StateNodes).To(Equal(mocks.MockStateNodes)) + Expect(converterPayload.StorageNodes).To(Equal(mocks.MockStorageNodes)) + gotBody, err := rlp.EncodeToBytes(converterPayload.BlockBody) + Expect(err).ToNot(HaveOccurred()) + expectedBody, err := rlp.EncodeToBytes(mocks.MockBlock.Body()) + Expect(err).ToNot(HaveOccurred()) + Expect(gotBody).To(Equal(expectedBody)) + Expect(converterPayload.HeaderRLP).To(Equal(mocks.MockHeaderRlp)) + Expect(converterPayload.TrxMetaData).To(Equal(mocks.MockTrxMeta)) + Expect(converterPayload.ReceiptMetaData).To(Equal(mocks.MockRctMeta)) + }) + It(" Throws an error if the wrong chain config is used", func() { + converter := ipfs.NewPayloadConverter(params.TestnetChainConfig) + _, err := converter.Convert(mocks.MockStateDiffPayload) + Expect(err).To(HaveOccurred()) + }) + }) +}) diff --git a/pkg/ipfs/ipfs_suite_test.go b/pkg/ipfs/ipfs_suite_test.go new file mode 100644 index 00000000..f62834c1 --- /dev/null +++ b/pkg/ipfs/ipfs_suite_test.go @@ -0,0 +1,35 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ipfs_test + +import ( + "io/ioutil" + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" +) + +func TestIPFS(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "IPFS Suite Test") +} + +var _ = BeforeSuite(func() { + logrus.SetOutput(ioutil.Discard) +}) diff --git a/pkg/ipfs/mocks/test_data.go b/pkg/ipfs/mocks/test_data.go index 564098e7..15192e72 100644 --- a/pkg/ipfs/mocks/test_data.go +++ b/pkg/ipfs/mocks/test_data.go @@ -17,35 +17,84 @@ package mocks import ( - "errors" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" "math/big" - "math/rand" - - "github.com/vulcanize/vulcanizedb/pkg/ipfs" + rand2 "math/rand" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" + log "github.com/sirupsen/logrus" + + "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) // Test variables var ( - BlockNumber = big.NewInt(rand.Int63()) - BlockHash = "0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73" - CodeHash = common.Hex2Bytes("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470") - NewNonceValue = rand.Uint64() - NewBalanceValue = rand.Int63() - ContractRoot = common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") - StoragePath = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470").Bytes() - StorageKey = common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes() - StorageValue = common.Hex2Bytes("0x03") - storage = []statediff.StorageDiff{{ + // block data + BlockNumber = big.NewInt(rand2.Int63()) + MockHeader = types.Header{ + Time: 0, + Number: BlockNumber, + Root: common.HexToHash("0x0"), + TxHash: common.HexToHash("0x0"), + ReceiptHash: common.HexToHash("0x0"), + } + MockTransactions, MockReceipts, senderAddr = createTransactionsAndReceipts() + ReceiptsRlp, _ = rlp.EncodeToBytes(MockReceipts) + MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts) + MockBlockRlp, _ = rlp.EncodeToBytes(MockBlock) + MockHeaderRlp, err = rlp.EncodeToBytes(MockBlock.Header()) + MockTrxMeta = []*ipfs.TrxMetaData{ + { + CID: "", // This is empty until we go to publish to ipfs + Src: senderAddr.Hex(), + Dst: "0x0000000000000000000000000000000000000000", + }, + { + CID: "", + Src: senderAddr.Hex(), + Dst: "0x0000000000000000000000000000000000000001", + }, + } + MockRctMeta = []*ipfs.ReceiptMetaData{ + { + CID: "", + Topic0s: []string{ + "0x0000000000000000000000000000000000000000000000000000000000000004", + }, + ContractAddress: "0x0000000000000000000000000000000000000000", + }, + { + CID: "", + Topic0s: []string{ + "0x0000000000000000000000000000000000000000000000000000000000000005", + }, + ContractAddress: "0x0000000000000000000000000000000000000001", + }, + } + + // statediff data + CodeHash = common.Hex2Bytes("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470") + NonceValue = rand2.Uint64() + anotherNonceValue = rand2.Uint64() + BalanceValue = rand2.Int63() + anotherBalanceValue = rand2.Int63() + ContractRoot = common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") + StoragePath = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470").Bytes() + StorageKey = common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes() + StorageValue = common.Hex2Bytes("0x03") + storage = []statediff.StorageDiff{{ Key: StorageKey, Value: StorageValue, Path: StoragePath, Proof: [][]byte{}, + Leaf: true, }} emptyStorage = make([]statediff.StorageDiff, 0) address = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") @@ -53,22 +102,31 @@ var ( anotherAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593") AnotherContractLeafKey = ipfs.AddressToKey(anotherAddress) testAccount = state.Account{ - Nonce: NewNonceValue, - Balance: big.NewInt(NewBalanceValue), + Nonce: NonceValue, + Balance: big.NewInt(BalanceValue), Root: ContractRoot, CodeHash: CodeHash, } - valueBytes, _ = rlp.EncodeToBytes(testAccount) - CreatedAccountDiffs = []statediff.AccountDiff{ + anotherTestAccount = state.Account{ + Nonce: anotherNonceValue, + Balance: big.NewInt(anotherBalanceValue), + Root: common.HexToHash("0x"), + CodeHash: nil, + } + valueBytes, _ = rlp.EncodeToBytes(testAccount) + anotherValueBytes, _ = rlp.EncodeToBytes(anotherTestAccount) + CreatedAccountDiffs = []statediff.AccountDiff{ { Key: ContractLeafKey.Bytes(), Value: valueBytes, Storage: storage, + Leaf: true, }, { Key: AnotherContractLeafKey.Bytes(), - Value: valueBytes, + Value: anotherValueBytes, Storage: emptyStorage, + Leaf: true, }, } @@ -76,61 +134,67 @@ var ( Key: ContractLeafKey.Bytes(), Value: valueBytes, Storage: storage, + Leaf: true, }} DeletedAccountDiffs = []statediff.AccountDiff{{ Key: ContractLeafKey.Bytes(), Value: valueBytes, Storage: storage, + Leaf: true, }} MockStateDiff = statediff.StateDiff{ BlockNumber: BlockNumber, - BlockHash: common.HexToHash(BlockHash), + BlockHash: MockBlock.Hash(), CreatedAccounts: CreatedAccountDiffs, - DeletedAccounts: DeletedAccountDiffs, - UpdatedAccounts: UpdatedAccountDiffs, } MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff) - - mockTransaction1 = types.NewTransaction(0, common.HexToAddress("0x0"), big.NewInt(1000), 50, big.NewInt(100), nil) - mockTransaction2 = types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(2000), 100, big.NewInt(200), nil) - MockTransactions = types.Transactions{mockTransaction1, mockTransaction2} - - mockReceipt1 = types.NewReceipt(common.HexToHash("0x0").Bytes(), false, 50) - mockReceipt2 = types.NewReceipt(common.HexToHash("0x1").Bytes(), false, 100) - MockReceipts = types.Receipts{mockReceipt1, mockReceipt2} - - MockHeader = types.Header{ - Time: 0, - Number: big.NewInt(1), - Root: common.HexToHash("0x0"), - TxHash: common.HexToHash("0x0"), - ReceiptHash: common.HexToHash("0x0"), + MockStateNodes = map[common.Hash]ipfs.StateNode{ + ContractLeafKey: { + Value: valueBytes, + Leaf: true, + }, + AnotherContractLeafKey: { + Value: anotherValueBytes, + Leaf: true, + }, + } + MockStorageNodes = map[common.Hash][]ipfs.StorageNode{ + ContractLeafKey: { + { + Key: common.BytesToHash(StorageKey), + Value: StorageValue, + Leaf: true, + }, + }, } - MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts) - MockBlockRlp, _ = rlp.EncodeToBytes(MockBlock) - MockStatediffPayload = statediff.Payload{ + // aggregate payloads + MockStateDiffPayload = statediff.Payload{ BlockRlp: MockBlockRlp, StateDiffRlp: MockStateDiffBytes, - Err: nil, + ReceiptsRlp: ReceiptsRlp, } - EmptyStatediffPayload = statediff.Payload{ + EmptyStateDiffPayload = statediff.Payload{ BlockRlp: []byte{}, StateDiffRlp: []byte{}, - Err: nil, + ReceiptsRlp: []byte{}, } - ErrStatediffPayload = statediff.Payload{ - BlockRlp: []byte{}, - StateDiffRlp: []byte{}, - Err: errors.New("mock error"), + MockIPLDPayload = ipfs.IPLDPayload{ + BlockNumber: big.NewInt(1), + BlockHash: MockBlock.Hash(), + Receipts: MockReceipts, + HeaderRLP: MockHeaderRlp, + BlockBody: MockBlock.Body(), + TrxMetaData: MockTrxMeta, + ReceiptMetaData: MockRctMeta, + StorageNodes: MockStorageNodes, + StateNodes: MockStateNodes, } - MockIPLDPayload = ipfs.IPLDPayload{} - MockCIDPayload = ipfs.CIDPayload{ BlockNumber: "1", BlockHash: common.HexToHash("0x0"), @@ -185,3 +249,46 @@ var ( }, } ) + +// createTransactionsAndReceipts is a helper function to generate signed mock transactions and mock receipts with mock logs +func createTransactionsAndReceipts() (types.Transactions, types.Receipts, common.Address) { + // make transactions + trx1 := types.NewTransaction(0, common.HexToAddress("0x0"), big.NewInt(1000), 50, big.NewInt(100), nil) + trx2 := types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(2000), 100, big.NewInt(200), nil) + transactionSigner := types.MakeSigner(params.MainnetChainConfig, BlockNumber) + mockCurve := elliptic.P256() + mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rand.Reader) + if err != nil { + log.Fatal(err) + } + signedTrx1, err := types.SignTx(trx1, transactionSigner, mockPrvKey) + if err != nil { + log.Fatal(err) + } + signedTrx2, err := types.SignTx(trx2, transactionSigner, mockPrvKey) + if err != nil { + log.Fatal(err) + } + senderAddr, err := types.Sender(transactionSigner, signedTrx1) // same for both trx + if err != nil { + log.Fatal(err) + } + // make receipts + mockTopic1 := common.HexToHash("0x04") + mockReceipt1 := types.NewReceipt(common.HexToHash("0x0").Bytes(), false, 50) + mockReceipt1.ContractAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") + mockLog1 := &types.Log{ + Topics: []common.Hash{mockTopic1}, + } + mockReceipt1.Logs = []*types.Log{mockLog1} + mockReceipt1.TxHash = trx1.Hash() + mockTopic2 := common.HexToHash("0x05") + mockReceipt2 := types.NewReceipt(common.HexToHash("0x1").Bytes(), false, 100) + mockReceipt2.ContractAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593") + mockLog2 := &types.Log{ + Topics: []common.Hash{mockTopic2}, + } + mockReceipt2.Logs = []*types.Log{mockLog2} + mockReceipt2.TxHash = trx2.Hash() + return types.Transactions{signedTrx1, signedTrx2}, types.Receipts{mockReceipt1, mockReceipt2}, senderAddr +} diff --git a/pkg/seed_node/api.go b/pkg/seed_node/api.go index 7364ad73..88692e7b 100644 --- a/pkg/seed_node/api.go +++ b/pkg/seed_node/api.go @@ -19,11 +19,10 @@ package seed_node import ( "context" - "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" - "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" "github.com/vulcanize/vulcanizedb/pkg/config" ) diff --git a/pkg/seed_node/filterer.go b/pkg/seed_node/filterer.go index a2ac5bcf..ac641b88 100644 --- a/pkg/seed_node/filterer.go +++ b/pkg/seed_node/filterer.go @@ -19,13 +19,12 @@ package seed_node import ( "bytes" - "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/config" ) diff --git a/pkg/seed_node/repository.go b/pkg/seed_node/repository.go index 31fbc038..b5ab720c 100644 --- a/pkg/seed_node/repository.go +++ b/pkg/seed_node/repository.go @@ -19,8 +19,8 @@ package seed_node import ( "github.com/jmoiron/sqlx" "github.com/lib/pq" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) diff --git a/pkg/seed_node/retreiver.go b/pkg/seed_node/retreiver.go index 1bbd512c..e44860f5 100644 --- a/pkg/seed_node/retreiver.go +++ b/pkg/seed_node/retreiver.go @@ -19,12 +19,11 @@ package seed_node import ( "math/big" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" - "github.com/jmoiron/sqlx" "github.com/lib/pq" log "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) diff --git a/pkg/seed_node/seed_node_suite_test.go b/pkg/seed_node/seed_node_suite_test.go index e0122031..81b3f38b 100644 --- a/pkg/seed_node/seed_node_suite_test.go +++ b/pkg/seed_node/seed_node_suite_test.go @@ -21,7 +21,6 @@ import ( "testing" "github.com/sirupsen/logrus" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) diff --git a/pkg/seed_node/service.go b/pkg/seed_node/service.go index b8609611..aa6abd4e 100644 --- a/pkg/seed_node/service.go +++ b/pkg/seed_node/service.go @@ -20,10 +20,6 @@ import ( "sync" "github.com/ethereum/go-ethereum/params" - - "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/node" @@ -33,6 +29,8 @@ import ( "github.com/ethereum/go-ethereum/statediff" log "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" diff --git a/pkg/seed_node/service_test.go b/pkg/seed_node/service_test.go index 9b113b69..eef81a08 100644 --- a/pkg/seed_node/service_test.go +++ b/pkg/seed_node/service_test.go @@ -20,13 +20,12 @@ import ( "sync" "time" - "github.com/vulcanize/vulcanizedb/pkg/seed_node" - "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/seed_node" mocks2 "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" mocks3 "github.com/vulcanize/vulcanizedb/pkg/seed_node/mocks" @@ -35,7 +34,7 @@ import ( var _ = Describe("Service", func() { Describe("Loop", func() { - It("Streams StatediffPayloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() { + It("Streams statediff.Payloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() { wg := new(sync.WaitGroup) payloadChan := make(chan statediff.Payload, 1) quitChan := make(chan bool, 1)