From 8ccdfd4835e1415a528e843b81974458856d1a10 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Wed, 5 Jun 2019 22:50:12 -0500 Subject: [PATCH] fix streamFilters issue --- cmd/test.go | 77 --------------------------- cmd/test2.go | 73 +++++++++++++++++-------- libraries/shared/streamer/streamer.go | 2 +- pkg/core/rpc_client.go | 2 +- pkg/eth/client/rpc_client.go | 2 +- pkg/fakes/mock_rpc_client.go | 2 +- pkg/ipfs/api.go | 21 +++----- pkg/ipfs/fetcher.go | 20 +++---- pkg/ipfs/publisher.go | 9 +++- pkg/ipfs/resolver.go | 8 ++- pkg/ipfs/retreiver.go | 28 ++++++---- pkg/ipfs/service.go | 7 ++- pkg/ipfs/streamer.go | 2 +- pkg/ipfs/test_helpers/test_data.go | 4 +- pkg/ipfs/test_helpers/test_helpers.go | 17 ------ pkg/ipfs/types.go | 6 ++- 16 files changed, 111 insertions(+), 169 deletions(-) delete mode 100644 cmd/test.go delete mode 100644 pkg/ipfs/test_helpers/test_helpers.go diff --git a/cmd/test.go b/cmd/test.go deleted file mode 100644 index eebf2ddf..00000000 --- a/cmd/test.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright © 2019 Vulcanize, Inc -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - "fmt" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/statediff" - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" -) - -// testCmd represents the test command -var testCmd = &cobra.Command{ - Use: "test", - Short: "A brief description of your command", - Long: `A longer description that spans multiple lines and likely contains examples -and usage of using your command. For example: - -Cobra is a CLI library for Go that empowers applications. -This application is a tool to generate the needed files -to quickly create a Cobra application.`, - Run: func(cmd *cobra.Command, args []string) { - test() - }, -} - -func init() { - rootCmd.AddCommand(testCmd) -} - -func test() { - _, _, rpcClient := getBlockChainAndClients() - streamer := ipfs.NewStateDiffStreamer(rpcClient) - payloadChan := make(chan statediff.Payload, 800) - sub, err := streamer.Stream(payloadChan) - if err != nil { - println(err.Error()) - log.Fatal(err) - } - for { - select { - case payload := <-payloadChan: - fmt.Printf("blockRlp: %v\r\nstateDiffRlp: %v\r\nerror: %v\r\n", payload.BlockRlp, payload.StateDiffRlp, payload.Err) - var block types.Block - err := rlp.DecodeBytes(payload.BlockRlp, &block) - if err != nil { - log.Fatal(err) - } - var stateDiff statediff.StateDiff - err = rlp.DecodeBytes(payload.StateDiffRlp, &stateDiff) - if err != nil { - log.Fatal(err) - } - fmt.Printf("block: %v\r\nstateDiff: %v\r\n", block, stateDiff) - fmt.Printf("block number: %d\r\n", block.Number()) - case err = <-sub.Err(): - println(err.Error()) - log.Fatal(err) - } - } -} diff --git a/cmd/test2.go b/cmd/test2.go index 0064a605..18e98869 100644 --- a/cmd/test2.go +++ b/cmd/test2.go @@ -16,10 +16,11 @@ package cmd import ( - "github.com/ethereum/go-ethereum/core/state" - "github.com/i-norden/go-ethereum/rlp" + "bytes" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -48,15 +49,18 @@ to quickly create a Cobra application.`, func test2() { rpcClient := getRpcClient() str := streamer.NewSeedStreamer(rpcClient) - payloadChan := make(chan ipfs.ResponsePayload, 800) - filter := ipfs.StreamFilters{} - filter.HeaderFilter.FinalOnly = true - filter.TrxFilter.Src = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} - filter.TrxFilter.Dst = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} - filter.ReceiptFilter.Topic0s = []string{} - filter.StateFilter.Addresses = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} - filter.StorageFilter.Off = true - sub, err := str.Stream(payloadChan, filter) + payloadChan := make(chan ipfs.ResponsePayload, 8000) + streamFilters := ipfs.StreamFilters{} + streamFilters.HeaderFilter.FinalOnly = true + streamFilters.ReceiptFilter.Topic0s = []string{ + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377", + } + streamFilters.StateFilter.Addresses = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} + streamFilters.StorageFilter.Off = true + //streamFilters.TrxFilter.Src = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} + //streamFilters.TrxFilter.Dst = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} + sub, err := str.Stream(payloadChan, streamFilters) if err != nil { println(err.Error()) log.Fatal(err) @@ -68,29 +72,52 @@ func test2() { log.Error(payload.Err) } for _, headerRlp := range payload.HeadersRlp { - header := new(types.Header) - err = rlp.DecodeBytes(headerRlp, header) + var header types.Header + err = rlp.Decode(bytes.NewBuffer(headerRlp), &header) + if err != nil { + println(err.Error()) + log.Error(err) + } println("header") - println(header.TxHash.Hex()) + println(header.Hash().Hex()) println(header.Number.Int64()) } for _, trxRlp := range payload.TransactionsRlp { - trx := new(types.Transaction) - err = rlp.DecodeBytes(trxRlp, trx) + var trx types.Transaction + buff := bytes.NewBuffer(trxRlp) + stream := rlp.NewStream(buff, 0) + err := trx.DecodeRLP(stream) + if err != nil { + println(err.Error()) + log.Error(err) + } println("trx") println(trx.Hash().Hex()) - println(trx.Value().Int64()) } for _, rctRlp := range payload.ReceiptsRlp { - rct := new(types.Receipt) - err = rlp.DecodeBytes(rctRlp, rct) + var rct types.Receipt + buff := bytes.NewBuffer(rctRlp) + stream := rlp.NewStream(buff, 0) + err = rct.DecodeRLP(stream) + if err != nil { + println(err.Error()) + log.Error(err) + } println("rct") - println(rct.TxHash.Hex()) - println(rct.BlockNumber.Bytes()) + for _, l := range rct.Logs { + println("log") + println(l.BlockHash.Hex()) + println(l.TxHash.Hex()) + println(l.Address.Hex()) + } } for _, stateRlp := range payload.StateNodesRlp { - acct := new(state.Account) - err = rlp.DecodeBytes(stateRlp, acct) + var acct state.Account + err = rlp.Decode(bytes.NewBuffer(stateRlp), &acct) + if err != nil { + println(err.Error()) + log.Error(err) + } println("state") println(acct.Root.Hex()) println(acct.Balance.Int64()) diff --git a/libraries/shared/streamer/streamer.go b/libraries/shared/streamer/streamer.go index 16a1d964..f296e469 100644 --- a/libraries/shared/streamer/streamer.go +++ b/libraries/shared/streamer/streamer.go @@ -43,5 +43,5 @@ func NewSeedStreamer(client core.RpcClient) *Streamer { // Stream is the main loop for subscribing to data from a vulcanizedb seed node func (sds *Streamer) Stream(payloadChan chan ipfs.ResponsePayload, streamFilters ipfs.StreamFilters) (*rpc.ClientSubscription, error) { - return sds.Client.Subscribe("vulcanizedb", payloadChan, "subscribe") + return sds.Client.Subscribe("vulcanizedb", payloadChan, "stream", streamFilters) } diff --git a/pkg/core/rpc_client.go b/pkg/core/rpc_client.go index b193cd3c..5b69417f 100644 --- a/pkg/core/rpc_client.go +++ b/pkg/core/rpc_client.go @@ -29,5 +29,5 @@ type RPCClient interface { BatchCall(batch []client.BatchElem) error IpcPath() string SupportedModules() (map[string]string, error) - Subscribe(namespace string, payloadChan interface{}, subName string, args ...interface{}) (*rpc.ClientSubscription, error) + Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) } diff --git a/pkg/eth/client/rpc_client.go b/pkg/eth/client/rpc_client.go index 3c8dafa6..f4aa889f 100644 --- a/pkg/eth/client/rpc_client.go +++ b/pkg/eth/client/rpc_client.go @@ -90,4 +90,4 @@ func (client RPCClient) Subscribe(namespace string, payloadChan interface{}, arg return nil, errors.New("channel given to Subscribe must not be nil") } return client.client.Subscribe(context.Background(), namespace, payloadChan, args...) -} \ No newline at end of file +} diff --git a/pkg/fakes/mock_rpc_client.go b/pkg/fakes/mock_rpc_client.go index 9ff03b5a..190309ea 100644 --- a/pkg/fakes/mock_rpc_client.go +++ b/pkg/fakes/mock_rpc_client.go @@ -188,6 +188,6 @@ func (client *MockRPCClient) AssertBatchCalledWith(method string, lengthOfBatch Expect(client.passedMethod).To(Equal(method)) } -func (client *MockRpcClient) Subscribe(namespace string, payloadChan interface{}, subName string, args ...interface{}) (*rpc.ClientSubscription, error) { +func (client *MockRpcClient) Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) { panic("implement me") } diff --git a/pkg/ipfs/api.go b/pkg/ipfs/api.go index e32203b5..cd42252c 100644 --- a/pkg/ipfs/api.go +++ b/pkg/ipfs/api.go @@ -18,6 +18,7 @@ package ipfs import ( "context" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" ) @@ -40,29 +41,19 @@ func NewPublicSeedNodeAPI(snp SyncPublishScreenAndServe) *PublicSeedNodeAPI { } } -// Subscribe is the public method to setup a subscription that fires off state-diff payloads as they are created -func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, payloadChanForTypeDefOnly chan ResponsePayload) (*rpc.Subscription, error) { +// Stream is the public method to setup a subscription that fires off SyncPublishScreenAndServe payloads as they are created +func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters StreamFilters) (*rpc.Subscription, error) { // ensure that the RPC connection supports subscriptions notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, rpc.ErrNotificationsUnsupported } - streamFilters := StreamFilters{} - streamFilters.HeaderFilter.FinalOnly = true - streamFilters.TrxFilter.Src = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} - streamFilters.TrxFilter.Dst = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} - streamFilters.ReceiptFilter.Topic0s = []string{ - "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", - "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377", - } - streamFilters.StateFilter.Addresses = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} - streamFilters.StorageFilter.Off = true // create subscription and start waiting for statediff events rpcSub := notifier.CreateSubscription() go func() { - // subscribe to events from the state diff service + // subscribe to events from the SyncPublishScreenAndServe service payloadChannel := make(chan ResponsePayload) quitChan := make(chan bool) go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, &streamFilters) @@ -81,11 +72,11 @@ func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, payloadChanForTypeD } return case <-quitChan: - // don't need to unsubscribe, statediff service does so before sending the quit signal + // don't need to unsubscribe, SyncPublishScreenAndServe service does so before sending the quit signal return } } }() return rpcSub, nil -} \ No newline at end of file +} diff --git a/pkg/ipfs/fetcher.go b/pkg/ipfs/fetcher.go index 6b6a9411..4319efca 100644 --- a/pkg/ipfs/fetcher.go +++ b/pkg/ipfs/fetcher.go @@ -27,9 +27,9 @@ import ( log "github.com/sirupsen/logrus" ) -// IPLDFethcer is an interface for fetching IPLDs +// IPLDFetcher is an interface for fetching IPLDs type IPLDFetcher interface { - FetchCIDs(cids cidWrapper) (*ipfsBlockWrapper, error) + FetchCIDs(cids CidWrapper) (*IpldWrapper, error) } // EthIPLDFetcher is used to fetch ETH IPLD objects from IPFS @@ -48,9 +48,9 @@ func NewIPLDFetcher(ipfsPath string) (*EthIPLDFetcher, error) { }, nil } -// FetchCIDs is the exported method for fetching and returning all the cids passed in a cidWrapper -func (f *EthIPLDFetcher) FetchCIDs(cids cidWrapper) (*ipfsBlockWrapper, error) { - blocks := &ipfsBlockWrapper{ +// FetchCIDs is the exported method for fetching and returning all the cids passed in a CidWrapper +func (f *EthIPLDFetcher) FetchCIDs(cids CidWrapper) (*IpldWrapper, error) { + blocks := &IpldWrapper{ Headers: make([]blocks.Block, 0), Transactions: make([]blocks.Block, 0), Receipts: make([]blocks.Block, 0), @@ -84,7 +84,7 @@ func (f *EthIPLDFetcher) FetchCIDs(cids cidWrapper) (*ipfsBlockWrapper, error) { // fetchHeaders fetches headers // It uses the f.fetchBatch method -func (f *EthIPLDFetcher) fetchHeaders(cids cidWrapper, blocks *ipfsBlockWrapper) error { +func (f *EthIPLDFetcher) fetchHeaders(cids CidWrapper, blocks *IpldWrapper) error { headerCids := make([]cid.Cid, 0, len(cids.Headers)) for _, c := range cids.Headers { dc, err := cid.Decode(c) @@ -102,7 +102,7 @@ func (f *EthIPLDFetcher) fetchHeaders(cids cidWrapper, blocks *ipfsBlockWrapper) // fetchTrxs fetches transactions // It uses the f.fetchBatch method -func (f *EthIPLDFetcher) fetchTrxs(cids cidWrapper, blocks *ipfsBlockWrapper) error { +func (f *EthIPLDFetcher) fetchTrxs(cids CidWrapper, blocks *IpldWrapper) error { trxCids := make([]cid.Cid, 0, len(cids.Transactions)) for _, c := range cids.Transactions { dc, err := cid.Decode(c) @@ -120,7 +120,7 @@ func (f *EthIPLDFetcher) fetchTrxs(cids cidWrapper, blocks *ipfsBlockWrapper) er // fetchRcts fetches receipts // It uses the f.fetchBatch method -func (f *EthIPLDFetcher) fetchRcts(cids cidWrapper, blocks *ipfsBlockWrapper) error { +func (f *EthIPLDFetcher) fetchRcts(cids CidWrapper, blocks *IpldWrapper) error { rctCids := make([]cid.Cid, 0, len(cids.Receipts)) for _, c := range cids.Receipts { dc, err := cid.Decode(c) @@ -139,7 +139,7 @@ func (f *EthIPLDFetcher) fetchRcts(cids cidWrapper, blocks *ipfsBlockWrapper) er // fetchState fetches state nodes // It uses the single f.fetch method instead of the batch fetch, because it // needs to maintain the data's relation to state keys -func (f *EthIPLDFetcher) fetchState(cids cidWrapper, blocks *ipfsBlockWrapper) error { +func (f *EthIPLDFetcher) fetchState(cids CidWrapper, blocks *IpldWrapper) error { for _, stateNode := range cids.StateNodes { if stateNode.CID == "" || stateNode.Key == "" { continue @@ -160,7 +160,7 @@ func (f *EthIPLDFetcher) fetchState(cids cidWrapper, blocks *ipfsBlockWrapper) e // fetchStorage fetches storage nodes // It uses the single f.fetch method instead of the batch fetch, because it // needs to maintain the data's relation to state and storage keys -func (f *EthIPLDFetcher) fetchStorage(cids cidWrapper, blocks *ipfsBlockWrapper) error { +func (f *EthIPLDFetcher) fetchStorage(cids CidWrapper, blocks *IpldWrapper) error { for _, storageNode := range cids.StorageNodes { if storageNode.CID == "" || storageNode.Key == "" || storageNode.StateKey == "" { continue diff --git a/pkg/ipfs/publisher.go b/pkg/ipfs/publisher.go index f2490040..fbee915a 100644 --- a/pkg/ipfs/publisher.go +++ b/pkg/ipfs/publisher.go @@ -122,7 +122,7 @@ func (pub *Publisher) Publish(payload *IPLDPayload) (*CIDPayload, error) { return nil, err } - // Package CIDs into a single struct + // Package CIDs and their metadata into a single struct return &CIDPayload{ BlockHash: payload.BlockHash, BlockNumber: payload.BlockNumber.String(), @@ -147,6 +147,13 @@ func (pub *Publisher) publishHeaders(headerRLP []byte) (string, error) { } func (pub *Publisher) publishTransactions(blockBody *types.Body, trxMeta []*TrxMetaData) (map[common.Hash]*TrxMetaData, error) { + /* + println("publishing transactions") + for _, trx := range blockBody.Transactions { + println("trx value:") + println(trx.Value().Int64()) + } + */ transactionCids, err := pub.TransactionPutter.DagPut(blockBody) if err != nil { return nil, err diff --git a/pkg/ipfs/resolver.go b/pkg/ipfs/resolver.go index e0352b15..1cf2baf2 100644 --- a/pkg/ipfs/resolver.go +++ b/pkg/ipfs/resolver.go @@ -22,17 +22,21 @@ import ( "github.com/ipfs/go-block-format" ) +// IPLDResolver is the interface to resolving IPLDs type IPLDResolver interface { - ResolveIPLDs(ipfsBlocks ipfsBlockWrapper) (*ResponsePayload, error) + ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePayload, error) } +// EthIPLDResolver is the underlying struct to support the IPLDResolver interface type EthIPLDResolver struct{} +// NewIPLDResolver returns a pointer to an EthIPLDResolver which satisfies the IPLDResolver interface func NewIPLDResolver() *EthIPLDResolver { return &EthIPLDResolver{} } -func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks ipfsBlockWrapper) (*ResponsePayload, error) { +// ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper +func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePayload, error) { response := new(ResponsePayload) eir.resolveHeaders(ipfsBlocks.Headers, response) eir.resolveTransactions(ipfsBlocks.Transactions, response) diff --git a/pkg/ipfs/retreiver.go b/pkg/ipfs/retreiver.go index 02f805b9..e01cd221 100644 --- a/pkg/ipfs/retreiver.go +++ b/pkg/ipfs/retreiver.go @@ -22,26 +22,32 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) +// CIDRetriever is the interface for retrieving CIDs from the Postgres cache type CIDRetriever interface { - RetrieveCIDs(streamFilters StreamFilters) ([]cidWrapper, error) + RetrieveCIDs(streamFilters StreamFilters) ([]CidWrapper, error) } +// EthCIDRetriever is the underlying struct supporting the CIDRetriever interface type EthCIDRetriever struct { db *postgres.DB } +// NewCIDRetriever returns a pointer to a new EthCIDRetriever which supports the CIDRetriever interface func NewCIDRetriever(db *postgres.DB) *EthCIDRetriever { return &EthCIDRetriever{ db: db, } } +// GetLastBlockNumber is used to retrieve the latest block number in the cache func (ecr *EthCIDRetriever) GetLastBlockNumber() (int64, error) { var blockNumber int64 err := ecr.db.Get(&blockNumber, "SELECT block_number FROM header_cids ORDER BY block_number DESC LIMIT 1 ") return blockNumber, err } -func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]cidWrapper, error) { + +// RetrieveCIDs is used to retrieve all of the CIDs which conform to the passed StreamFilters +func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]CidWrapper, error) { var endingBlock int64 var err error if streamFilters.EndingBlock <= 0 || streamFilters.EndingBlock <= streamFilters.StartingBlock { @@ -50,13 +56,13 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]cidWrap return nil, err } } - cids := make([]cidWrapper, 0, endingBlock+1-streamFilters.StartingBlock) + cids := make([]CidWrapper, 0, endingBlock+1-streamFilters.StartingBlock) tx, err := ecr.db.Beginx() if err != nil { return nil, err } for i := streamFilters.StartingBlock; i <= endingBlock; i++ { - cw := &cidWrapper{ + cw := &CidWrapper{ BlockNumber: i, Headers: make([]string, 0), Transactions: make([]string, 0), @@ -106,7 +112,7 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]cidWrap return cids, err } -func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) error { +func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) error { var pgStr string if streamFilters.HeaderFilter.FinalOnly { pgStr = `SELECT cid FROM header_cids @@ -119,10 +125,10 @@ func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters Stream return tx.Select(cids.Headers, pgStr, blockNumber) } -func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) ([]int64, error) { +func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) ([]int64, error) { args := make([]interface{}, 0, 3) type result struct { - Id int64 `db:"id"` + ID int64 `db:"id"` Cid string `db:"cid"` } results := make([]result, 0) @@ -144,12 +150,12 @@ func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters StreamFil ids := make([]int64, 0) for _, res := range results { cids.Transactions = append(cids.Transactions, res.Cid) - ids = append(ids, res.Id) + ids = append(ids, res.ID) } return ids, nil } -func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64, trxIds []int64) error { +func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64, trxIds []int64) error { args := make([]interface{}, 0, 2) pgStr := `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids WHERE receipt_cids.tx_id = transaction_cids.id @@ -169,7 +175,7 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters StreamFil return tx.Select(cids.Receipts, pgStr, args...) } -func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) error { +func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) error { args := make([]interface{}, 0, 2) pgStr := `SELECT state_cids.cid, state_cids.state_key FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id) WHERE header_cids.block_number = $1` @@ -186,7 +192,7 @@ func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters StreamF return tx.Select(cids.StateNodes, pgStr, args...) } -func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) error { +func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) error { args := make([]interface{}, 0, 3) pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key FROM storage_cids, state_cids, header_cids WHERE storage_cids.state_id = state_cids.id diff --git a/pkg/ipfs/service.go b/pkg/ipfs/service.go index 7430a683..aa325562 100644 --- a/pkg/ipfs/service.go +++ b/pkg/ipfs/service.go @@ -30,7 +30,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) -const payloadChanBufferSize = 800 // 1/10th max eth sub buffer size +const payloadChanBufferSize = 8000 // the max eth sub buffer size // SyncPublishScreenAndServe is an interface for streaming, converting to IPLDs, publishing, // indexing all Ethereum data screening this data, and serving it up to subscribed clients @@ -118,7 +118,7 @@ func (sap *Service) APIs() []rpc.API { } // SyncAndPublish is the backend processing loop which streams data from geth, converts it to iplds, publishes them to ipfs, and indexes their cids -// It then forwards the data to the Serve() loop which filters and sends relevent data to client subscriptions +// It then forwards the data to the Serve() loop which filters and sends relevant data to client subscriptions func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error { sub, err := sap.Streamer.Stream(sap.PayloadChan) if err != nil { @@ -139,7 +139,6 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- continue } // If we have a ScreenAndServe process running, forward the payload to it - // If the ScreenAndServe process loop is slower than this one, will it miss some incoming payloads?? select { case forwardPayloadChan <- *ipldPayload: default: @@ -284,7 +283,7 @@ func (sap *Service) serve(id rpc.ID, payload ResponsePayload) { if ok { select { case sub.PayloadChan <- payload: - log.Infof("sending state diff payload to subscription %s", id) + log.Infof("sending seed node payload to subscription %s", id) default: log.Infof("unable to send payload to subscription %s; channel has no receiver", id) } diff --git a/pkg/ipfs/streamer.go b/pkg/ipfs/streamer.go index f3c6c01a..095ec927 100644 --- a/pkg/ipfs/streamer.go +++ b/pkg/ipfs/streamer.go @@ -42,5 +42,5 @@ func NewStateDiffStreamer(client core.RpcClient) *Streamer { // Stream is the main loop for subscribing to data from the Geth state diff process func (sds *Streamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { - return sds.Client.Subscribe("statediff", payloadChan, "subscribe") + return sds.Client.Subscribe("statediff", payloadChan, "stream") } diff --git a/pkg/ipfs/test_helpers/test_data.go b/pkg/ipfs/test_helpers/test_data.go index 8ce24d8e..2401a970 100644 --- a/pkg/ipfs/test_helpers/test_data.go +++ b/pkg/ipfs/test_helpers/test_data.go @@ -171,14 +171,14 @@ var ( common.HexToHash("0x0"): { { CID: "mockStorageCID1", - Key: common.HexToHash("0x0"), + Key: "0x0", Leaf: true, }, }, common.HexToHash("0x1"): { { CID: "mockStorageCID2", - Key: common.HexToHash("0x1"), + Key: "0x1", Leaf: true, }, }, diff --git a/pkg/ipfs/test_helpers/test_helpers.go b/pkg/ipfs/test_helpers/test_helpers.go deleted file mode 100644 index 2c443bf3..00000000 --- a/pkg/ipfs/test_helpers/test_helpers.go +++ /dev/null @@ -1,17 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package test_helpers diff --git a/pkg/ipfs/types.go b/pkg/ipfs/types.go index 36cafb02..904c5efc 100644 --- a/pkg/ipfs/types.go +++ b/pkg/ipfs/types.go @@ -65,7 +65,8 @@ func (sd *ResponsePayload) Encode() ([]byte, error) { return sd.encoded, sd.err } -type cidWrapper struct { +// CidWrapper is used to package CIDs retrieved from the local Postgres cache +type CidWrapper struct { BlockNumber int64 Headers []string Transactions []string @@ -74,7 +75,8 @@ type cidWrapper struct { StorageNodes []StorageNodeCID } -type ipfsBlockWrapper struct { +// IpldWrapper is used to package raw IPLD block data for resolution +type IpldWrapper struct { Headers []blocks.Block Transactions []blocks.Block Receipts []blocks.Block