From 7279d48dc6513407c4c3df43fc5a36f3a8af0920 Mon Sep 17 00:00:00 2001 From: i-norden Date: Tue, 21 Feb 2023 19:43:43 -0600 Subject: [PATCH] remove vestigial packages --- cmd/subscribe.go | 172 ---------------- pkg/client/client.go | 44 ----- pkg/eth/filterer.go | 366 ----------------------------------- pkg/eth/filterer_test.go | 208 -------------------- pkg/eth/ipld_fetcher.go | 248 ------------------------ pkg/eth/ipld_fetcher_test.go | 75 ------- pkg/serve/helpers.go | 37 ---- pkg/serve/subscription.go | 60 ------ 8 files changed, 1210 deletions(-) delete mode 100644 cmd/subscribe.go delete mode 100644 pkg/client/client.go delete mode 100644 pkg/eth/filterer.go delete mode 100644 pkg/eth/filterer_test.go delete mode 100644 pkg/eth/ipld_fetcher.go delete mode 100644 pkg/eth/ipld_fetcher_test.go delete mode 100644 pkg/serve/helpers.go delete mode 100644 pkg/serve/subscription.go diff --git a/cmd/subscribe.go b/cmd/subscribe.go deleted file mode 100644 index 155aebbc..00000000 --- a/cmd/subscribe.go +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright © 2019 Vulcanize, Inc -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - "bytes" - "fmt" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/log" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/rpc" - "github.com/spf13/cobra" - "github.com/spf13/viper" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/client" - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth" - w "github.com/cerc-io/ipld-eth-server/v4/pkg/serve" -) - -// subscribeCmd represents the subscribe command -var subscribeCmd = &cobra.Command{ - Use: "subscribe", - Short: "This command is used to subscribe to the eth ipfs watcher data stream with the provided filters", - Long: `This command is for demo and testing purposes and is used to subscribe to the watcher with the provided subscription configuration parameters. -It does not do anything with the data streamed from the watcher other than unpack it and print it out for demonstration purposes.`, - Run: func(cmd *cobra.Command, args []string) { - subCommand = cmd.CalledAs() - logWithCommand = *log.WithField("SubCommand", subCommand) - subscribe() - }, -} - -func init() { - rootCmd.AddCommand(subscribeCmd) -} - -func subscribe() { - // Prep the subscription config/filters to be sent to the server - ethSubConfig, err := eth.NewEthSubscriptionConfig() - if err != nil { - log.Fatal(err) - } - - // Create a new rpc client and a subscription streamer with that client - rpcClient, err := getRPCClient() - if err != nil { - logWithCommand.Fatal(err) - } - subClient := client.NewClient(rpcClient) - - // Buffered channel for reading subscription payloads - payloadChan := make(chan w.SubscriptionPayload, 20000) - - // Subscribe to the watcher service with the given config/filter parameters - sub, err := subClient.Stream(payloadChan, *ethSubConfig) - if err != nil { - logWithCommand.Fatal(err) - } - logWithCommand.Info("awaiting payloads") - // Receive response payloads and print out the results - for { - select { - case payload := <-payloadChan: - if payload.Err != "" { - logWithCommand.Error(payload.Err) - continue - } - var ethData eth.IPLDs - if err := rlp.DecodeBytes(payload.Data, ðData); err != nil { - logWithCommand.Error(err) - continue - } - var header types.Header - err = rlp.Decode(bytes.NewBuffer(ethData.Header.Data), &header) - if err != nil { - logWithCommand.Error(err) - continue - } - fmt.Printf("Header number %d, hash %s\n", header.Number.Int64(), header.Hash().Hex()) - fmt.Printf("header: %v\n", header) - for _, trxRlp := range ethData.Transactions { - var trx types.Transaction - buff := bytes.NewBuffer(trxRlp.Data) - stream := rlp.NewStream(buff, 0) - err := trx.DecodeRLP(stream) - if err != nil { - logWithCommand.Error(err) - continue - } - fmt.Printf("Transaction with hash %s\n", trx.Hash().Hex()) - fmt.Printf("trx: %v\n", trx) - } - for _, rctRlp := range ethData.Receipts { - var rct types.Receipt - buff := bytes.NewBuffer(rctRlp.Data) - stream := rlp.NewStream(buff, 0) - err = rct.DecodeRLP(stream) - if err != nil { - logWithCommand.Error(err) - continue - } - fmt.Printf("Receipt with block hash %s, trx hash %s\n", rct.BlockHash.Hex(), rct.TxHash.Hex()) - fmt.Printf("rct: %v\n", rct) - for _, l := range rct.Logs { - if len(l.Topics) < 1 { - logWithCommand.Error(fmt.Sprintf("log only has %d topics", len(l.Topics))) - continue - } - fmt.Printf("Log for block hash %s, trx hash %s, address %s, and with topic0 %s\n", - l.BlockHash.Hex(), l.TxHash.Hex(), l.Address.Hex(), l.Topics[0].Hex()) - fmt.Printf("log: %v\n", l) - } - } - // This assumes leafs only - for _, stateNode := range ethData.StateNodes { - var acct types.StateAccount - err = rlp.DecodeBytes(stateNode.IPLD.Data, &acct) - if err != nil { - logWithCommand.Error(err) - continue - } - fmt.Printf("Account for key %s, and root %s, with balance %s\n", - stateNode.StateLeafKey.Hex(), acct.Root.Hex(), acct.Balance.String()) - fmt.Printf("state account: %+v\n", acct) - } - for _, storageNode := range ethData.StorageNodes { - fmt.Printf("Storage for state key %s ", storageNode.StateLeafKey.Hex()) - fmt.Printf("with storage key %s\n", storageNode.StorageLeafKey.Hex()) - var i []interface{} - err := rlp.DecodeBytes(storageNode.IPLD.Data, &i) - if err != nil { - logWithCommand.Error(err) - continue - } - // if a value node - if len(i) == 1 { - valueBytes, ok := i[0].([]byte) - if !ok { - continue - } - fmt.Printf("Storage leaf key: %s, and value hash: %s\n", - storageNode.StorageLeafKey.Hex(), common.BytesToHash(valueBytes).Hex()) - } - } - case err = <-sub.Err(): - logWithCommand.Fatal(err) - } - } -} - -func getRPCClient() (*rpc.Client, error) { - vulcPath := viper.GetString("watcher.ethSubscription.wsPath") - if vulcPath == "" { - vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided - } - return rpc.Dial(vulcPath) -} diff --git a/pkg/client/client.go b/pkg/client/client.go deleted file mode 100644 index 524f0c36..00000000 --- a/pkg/client/client.go +++ /dev/null @@ -1,44 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -// Client is used by watchers to stream chain IPLD data from a vulcanizedb ipld-eth-server -package client - -import ( - "context" - - "github.com/ethereum/go-ethereum/rpc" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth" - "github.com/cerc-io/ipld-eth-server/v4/pkg/serve" -) - -// Client is used to subscribe to the ipld-eth-server ipld data stream -type Client struct { - c *rpc.Client -} - -// NewClient creates a new Client -func NewClient(c *rpc.Client) *Client { - return &Client{ - c: c, - } -} - -// Stream is the main loop for subscribing to iplds from an ipld-eth-server server -func (c *Client) Stream(payloadChan chan serve.SubscriptionPayload, params eth.SubscriptionSettings) (*rpc.ClientSubscription, error) { - return c.c.Subscribe(context.Background(), "vdb", payloadChan, "stream", params) -} diff --git a/pkg/eth/filterer.go b/pkg/eth/filterer.go deleted file mode 100644 index d4cc936a..00000000 --- a/pkg/eth/filterer.go +++ /dev/null @@ -1,366 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth - -import ( - "bytes" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/statediff/indexer/ipld" - "github.com/ethereum/go-ethereum/statediff/indexer/models" - sdtypes "github.com/ethereum/go-ethereum/statediff/types" - "github.com/ipfs/go-cid" - "github.com/multiformats/go-multihash" -) - -// Filterer interface for substituing mocks in tests -type Filterer interface { - Filter(filter SubscriptionSettings, payload ConvertedPayload) (*IPLDs, error) -} - -// ResponseFilterer satisfies the ResponseFilterer interface for ethereum -type ResponseFilterer struct{} - -// NewResponseFilterer creates a new Filterer satisfying the ResponseFilterer interface -func NewResponseFilterer() *ResponseFilterer { - return &ResponseFilterer{} -} - -// Filter is used to filter through eth data to extract and package requested data into a Payload -func (s *ResponseFilterer) Filter(filter SubscriptionSettings, payload ConvertedPayload) (*IPLDs, error) { - if checkRange(filter.Start.Int64(), filter.End.Int64(), payload.Block.Number().Int64()) { - response := new(IPLDs) - response.TotalDifficulty = payload.TotalDifficulty - if err := s.filterHeaders(filter.HeaderFilter, response, payload); err != nil { - return nil, err - } - txHashes, err := s.filterTransactions(filter.TxFilter, response, payload) - if err != nil { - return nil, err - } - var filterTxs []common.Hash - if filter.ReceiptFilter.MatchTxs { - filterTxs = txHashes - } - if err := s.filerReceipts(filter.ReceiptFilter, response, payload, filterTxs); err != nil { - return nil, err - } - if err := s.filterStateAndStorage(filter.StateFilter, filter.StorageFilter, response, payload); err != nil { - return nil, err - } - response.BlockNumber = payload.Block.Number() - return response, nil - } - return nil, nil -} - -func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload ConvertedPayload) error { - if !headerFilter.Off { - headerRLP, err := rlp.EncodeToBytes(payload.Block.Header()) - if err != nil { - return err - } - cid, err := ipld.RawdataToCid(ipld.MEthHeader, headerRLP, multihash.KECCAK_256) - if err != nil { - return err - } - response.Header = models.IPLDModel{ - BlockNumber: payload.Block.Number().String(), - Data: headerRLP, - Key: cid.String(), - } - if headerFilter.Uncles { - response.Uncles = make([]models.IPLDModel, len(payload.Block.Body().Uncles)) - for i, uncle := range payload.Block.Body().Uncles { - uncleRlp, err := rlp.EncodeToBytes(uncle) - if err != nil { - return err - } - cid, err := ipld.RawdataToCid(ipld.MEthHeader, uncleRlp, multihash.KECCAK_256) - if err != nil { - return err - } - response.Uncles[i] = models.IPLDModel{ - BlockNumber: uncle.Number.String(), - Data: uncleRlp, - Key: cid.String(), - } - } - } - } - return nil -} - -func checkRange(start, end, actual int64) bool { - if (end <= 0 || end >= actual) && start <= actual { - return true - } - return false -} - -func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload ConvertedPayload) ([]common.Hash, error) { - var trxHashes []common.Hash - if !trxFilter.Off { - trxLen := len(payload.Block.Body().Transactions) - trxHashes = make([]common.Hash, 0, trxLen) - response.Transactions = make([]models.IPLDModel, 0, trxLen) - for i, trx := range payload.Block.Body().Transactions { - // TODO: check if want corresponding receipt and if we do we must include this transaction - if checkTransactionAddrs(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) { - trxBuffer := new(bytes.Buffer) - if err := trx.EncodeRLP(trxBuffer); err != nil { - return nil, err - } - data := trxBuffer.Bytes() - cid, err := ipld.RawdataToCid(ipld.MEthTx, data, multihash.KECCAK_256) - if err != nil { - return nil, err - } - response.Transactions = append(response.Transactions, models.IPLDModel{ - Data: data, - Key: cid.String(), - }) - trxHashes = append(trxHashes, trx.Hash()) - } - } - } - return trxHashes, nil -} - -// checkTransactionAddrs returns true if either the transaction src and dst are one of the wanted src and dst addresses -func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst string) bool { - // If we aren't filtering for any addresses, every transaction is a go - if len(wantedDst) == 0 && len(wantedSrc) == 0 { - return true - } - for _, src := range wantedSrc { - if src == actualSrc { - return true - } - } - for _, dst := range wantedDst { - if dst == actualDst { - return true - } - } - return false -} - -func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload ConvertedPayload, trxHashes []common.Hash) error { - if !receiptFilter.Off { - response.Receipts = make([]models.IPLDModel, 0, len(payload.Receipts)) - rctLeafCID, rctIPLDData, err := GetRctLeafNodeData(payload.Receipts) - if err != nil { - return err - } - - for idx, receipt := range payload.Receipts { - // topics is always length 4 - topics := make([][]string, 4) - contracts := make([]string, len(receipt.Logs)) - for _, l := range receipt.Logs { - contracts = append(contracts, l.Address.String()) - for idx, t := range l.Topics { - topics[idx] = append(topics[idx], t.String()) - } - } - - // TODO: Verify this filter logic. - if checkReceipts(receipt, receiptFilter.Topics, topics, receiptFilter.LogAddresses, contracts, trxHashes) { - response.Receipts = append(response.Receipts, models.IPLDModel{ - BlockNumber: payload.Block.Number().String(), - Data: rctIPLDData[idx], - Key: rctLeafCID[idx].String(), - }) - } - } - } - return nil -} - -func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics [][]string, wantedAddresses []string, actualAddresses []string, wantedTrxHashes []common.Hash) bool { - // If we aren't filtering for any topics, contracts, or corresponding trxs then all receipts are a go - if len(wantedTopics) == 0 && len(wantedAddresses) == 0 && len(wantedTrxHashes) == 0 { - return true - } - // Keep receipts that are from watched txs - for _, wantedTrxHash := range wantedTrxHashes { - if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) { - return true - } - } - // If there are no wanted contract addresses, we keep all receipts that match the topic filter - if len(wantedAddresses) == 0 { - if match := filterMatch(wantedTopics, actualTopics); match { - return true - } - } - // If there are wanted contract addresses to filter on - for _, wantedAddr := range wantedAddresses { - // and this is an address of interest - for _, actualAddr := range actualAddresses { - if wantedAddr == actualAddr { - // we keep the receipt if it matches on the topic filter - if match := filterMatch(wantedTopics, actualTopics); match { - return true - } - } - } - } - return false -} - -// filterMatch returns true if the actualTopics conform to the wantedTopics filter -func filterMatch(wantedTopics, actualTopics [][]string) bool { - // actualTopics should always be length 4, but the members can be nil slices - matches := 0 - for i, actualTopicSet := range actualTopics { - if i < len(wantedTopics) && len(wantedTopics[i]) > 0 { - // If we have topics in this filter slot, count as a match if one of the topics matches - matches += slicesShareString(actualTopicSet, wantedTopics[i]) - } else { - // Filter slot is either empty or doesn't exist => not matching any topics at this slot => counts as a match - matches++ - } - } - return matches == 4 -} - -// returns 1 if the two slices have a string in common, 0 if they do not -func slicesShareString(slice1, slice2 []string) int { - for _, str1 := range slice1 { - for _, str2 := range slice2 { - if str1 == str2 { - return 1 - } - } - } - return 0 -} - -// filterStateAndStorage filters state and storage nodes into the response according to the provided filters -func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storageFilter StorageFilter, response *IPLDs, payload ConvertedPayload) error { - response.StateNodes = make([]StateNode, 0, len(payload.StateNodes)) - response.StorageNodes = make([]StorageNode, 0) - stateAddressFilters := make([]common.Hash, len(stateFilter.Addresses)) - for i, addr := range stateFilter.Addresses { - stateAddressFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()) - } - storageAddressFilters := make([]common.Hash, len(storageFilter.Addresses)) - for i, addr := range storageFilter.Addresses { - storageAddressFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()) - } - storageKeyFilters := make([]common.Hash, len(storageFilter.StorageKeys)) - for i, store := range storageFilter.StorageKeys { - storageKeyFilters[i] = common.HexToHash(store) - } - for _, stateNode := range payload.StateNodes { - if !stateFilter.Off && checkNodeKeys(stateAddressFilters, stateNode.LeafKey) { - if stateNode.NodeType == sdtypes.Leaf || stateFilter.IntermediateNodes { - cid, err := ipld.RawdataToCid(ipld.MEthStateTrie, stateNode.NodeValue, multihash.KECCAK_256) - if err != nil { - return err - } - response.StateNodes = append(response.StateNodes, StateNode{ - StateLeafKey: common.BytesToHash(stateNode.LeafKey), - Path: stateNode.Path, - IPLD: models.IPLDModel{ - BlockNumber: payload.Block.Number().String(), - Data: stateNode.NodeValue, - Key: cid.String(), - }, - Type: stateNode.NodeType, - }) - } - } - if !storageFilter.Off && checkNodeKeys(storageAddressFilters, stateNode.LeafKey) { - for _, storageNode := range payload.StorageNodes[common.Bytes2Hex(stateNode.Path)] { - if checkNodeKeys(storageKeyFilters, storageNode.LeafKey) { - cid, err := ipld.RawdataToCid(ipld.MEthStorageTrie, storageNode.NodeValue, multihash.KECCAK_256) - if err != nil { - return err - } - response.StorageNodes = append(response.StorageNodes, StorageNode{ - StateLeafKey: common.BytesToHash(stateNode.LeafKey), - StorageLeafKey: common.BytesToHash(storageNode.LeafKey), - IPLD: models.IPLDModel{ - BlockNumber: payload.Block.Number().String(), - Data: storageNode.NodeValue, - Key: cid.String(), - }, - Type: storageNode.NodeType, - Path: storageNode.Path, - }) - } - } - } - } - return nil -} - -func checkNodeKeys(wantedKeys []common.Hash, actualKey []byte) bool { - // If we aren't filtering for any specific keys, all nodes are a go - if len(wantedKeys) == 0 { - return true - } - for _, key := range wantedKeys { - if bytes.Equal(key.Bytes(), actualKey) { - return true - } - } - return false -} - -// GetRctLeafNodeData converts the receipts to receipt trie and returns the receipt leaf node IPLD data and -// corresponding CIDs -func GetRctLeafNodeData(rcts types.Receipts) ([]cid.Cid, [][]byte, error) { - receiptTrie := ipld.NewRctTrie() - for idx, rct := range rcts { - ethRct, err := ipld.NewReceipt(rct) - if err != nil { - return nil, nil, err - } - if err = receiptTrie.Add(idx, ethRct.RawData()); err != nil { - return nil, nil, err - } - } - - rctLeafNodes, keys, err := receiptTrie.GetLeafNodes() - if err != nil { - return nil, nil, err - } - - ethRctleafNodeCids := make([]cid.Cid, len(rctLeafNodes)) - ethRctleafNodeData := make([][]byte, len(rctLeafNodes)) - for i, rln := range rctLeafNodes { - var idx uint - - r := bytes.NewReader(keys[i].TrieKey) - err = rlp.Decode(r, &idx) - if err != nil { - return nil, nil, err - } - - ethRctleafNodeCids[idx] = rln.Cid() - ethRctleafNodeData[idx] = rln.RawData() - } - - return ethRctleafNodeCids, ethRctleafNodeData, nil -} diff --git a/pkg/eth/filterer_test.go b/pkg/eth/filterer_test.go deleted file mode 100644 index 530ec01a..00000000 --- a/pkg/eth/filterer_test.go +++ /dev/null @@ -1,208 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth_test - -import ( - "bytes" - - "github.com/ethereum/go-ethereum/statediff/indexer/models" - sdtypes "github.com/ethereum/go-ethereum/statediff/types" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth" - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth/test_helpers" - "github.com/cerc-io/ipld-eth-server/v4/pkg/shared" -) - -var ( - filterer *eth.ResponseFilterer -) - -var _ = Describe("Filterer", func() { - Describe("FilterResponse", func() { - BeforeEach(func() { - filterer = eth.NewResponseFilterer() - }) - - It("Transcribes all the data from the IPLDPayload into the StreamPayload if given an open filter", func() { - iplds, err := filterer.Filter(openFilter, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds).ToNot(BeNil()) - Expect(iplds.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds.Header).To(Equal(test_helpers.MockIPLDs.Header)) - var expectedEmptyUncles []models.IPLDModel - Expect(iplds.Uncles).To(Equal(expectedEmptyUncles)) - Expect(len(iplds.Transactions)).To(Equal(4)) - Expect(shared.IPLDsContainBytes(iplds.Transactions, test_helpers.Tx1)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds.Transactions, test_helpers.Tx2)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds.Transactions, test_helpers.Tx3)).To(BeTrue()) - Expect(len(iplds.Receipts)).To(Equal(4)) - Expect(shared.IPLDsContainBytes(iplds.Receipts, test_helpers.Rct1IPLD)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds.Receipts, test_helpers.Rct2IPLD)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds.Receipts, test_helpers.Rct3IPLD)).To(BeTrue()) - Expect(len(iplds.StateNodes)).To(Equal(2)) - for _, stateNode := range iplds.StateNodes { - Expect(stateNode.Type).To(Equal(sdtypes.Leaf)) - if bytes.Equal(stateNode.StateLeafKey.Bytes(), test_helpers.AccountLeafKey) { - Expect(stateNode.IPLD).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.State2IPLD.RawData(), - Key: test_helpers.State2IPLD.Cid().String(), - })) - } - if bytes.Equal(stateNode.StateLeafKey.Bytes(), test_helpers.ContractLeafKey) { - Expect(stateNode.IPLD).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.State1IPLD.RawData(), - Key: test_helpers.State1IPLD.Cid().String(), - })) - } - } - Expect(iplds.StorageNodes).To(Equal(test_helpers.MockIPLDs.StorageNodes)) - }) - - It("Applies filters from the provided config.Subscription", func() { - iplds1, err := filterer.Filter(rctAddressFilter, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds1).ToNot(BeNil()) - Expect(iplds1.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds1.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds1.Uncles)).To(Equal(0)) - Expect(len(iplds1.Transactions)).To(Equal(0)) - Expect(len(iplds1.StorageNodes)).To(Equal(0)) - Expect(len(iplds1.StateNodes)).To(Equal(0)) - Expect(len(iplds1.Receipts)).To(Equal(1)) - Expect(iplds1.Receipts[0]).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.Rct1IPLD, - Key: test_helpers.Rct1CID.String(), - })) - - iplds2, err := filterer.Filter(rctTopicsFilter, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds2).ToNot(BeNil()) - Expect(iplds2.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds2.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds2.Uncles)).To(Equal(0)) - Expect(len(iplds2.Transactions)).To(Equal(0)) - Expect(len(iplds2.StorageNodes)).To(Equal(0)) - Expect(len(iplds2.StateNodes)).To(Equal(0)) - Expect(len(iplds2.Receipts)).To(Equal(1)) - Expect(iplds2.Receipts[0]).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.Rct1IPLD, - Key: test_helpers.Rct1CID.String(), - })) - - iplds3, err := filterer.Filter(rctTopicsAndAddressFilter, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds3).ToNot(BeNil()) - Expect(iplds3.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds3.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds3.Uncles)).To(Equal(0)) - Expect(len(iplds3.Transactions)).To(Equal(0)) - Expect(len(iplds3.StorageNodes)).To(Equal(0)) - Expect(len(iplds3.StateNodes)).To(Equal(0)) - Expect(len(iplds3.Receipts)).To(Equal(1)) - Expect(iplds3.Receipts[0]).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.Rct1IPLD, - Key: test_helpers.Rct1CID.String(), - })) - - iplds4, err := filterer.Filter(rctAddressesAndTopicFilter, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds4).ToNot(BeNil()) - Expect(iplds4.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds4.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds4.Uncles)).To(Equal(0)) - Expect(len(iplds4.Transactions)).To(Equal(0)) - Expect(len(iplds4.StorageNodes)).To(Equal(0)) - Expect(len(iplds4.StateNodes)).To(Equal(0)) - Expect(len(iplds4.Receipts)).To(Equal(1)) - Expect(iplds4.Receipts[0]).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.Rct2IPLD, - Key: test_helpers.Rct2CID.String(), - })) - - iplds5, err := filterer.Filter(rctsForAllCollectedTrxs, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds5).ToNot(BeNil()) - Expect(iplds5.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds5.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds5.Uncles)).To(Equal(0)) - Expect(len(iplds5.Transactions)).To(Equal(4)) - Expect(shared.IPLDsContainBytes(iplds5.Transactions, test_helpers.Tx1)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds5.Transactions, test_helpers.Tx2)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds5.Transactions, test_helpers.Tx3)).To(BeTrue()) - Expect(len(iplds5.StorageNodes)).To(Equal(0)) - Expect(len(iplds5.StateNodes)).To(Equal(0)) - Expect(len(iplds5.Receipts)).To(Equal(4)) - Expect(shared.IPLDsContainBytes(iplds5.Receipts, test_helpers.Rct1IPLD)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds5.Receipts, test_helpers.Rct2IPLD)).To(BeTrue()) - Expect(shared.IPLDsContainBytes(iplds5.Receipts, test_helpers.Rct3IPLD)).To(BeTrue()) - - iplds6, err := filterer.Filter(rctsForSelectCollectedTrxs, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds6).ToNot(BeNil()) - Expect(iplds6.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds6.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds6.Uncles)).To(Equal(0)) - Expect(len(iplds6.Transactions)).To(Equal(1)) - Expect(shared.IPLDsContainBytes(iplds5.Transactions, test_helpers.Tx2)).To(BeTrue()) - Expect(len(iplds6.StorageNodes)).To(Equal(0)) - Expect(len(iplds6.StateNodes)).To(Equal(0)) - Expect(len(iplds6.Receipts)).To(Equal(1)) - Expect(iplds4.Receipts[0]).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.Rct2IPLD, - Key: test_helpers.Rct2CID.String(), - })) - - iplds7, err := filterer.Filter(stateFilter, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds7).ToNot(BeNil()) - Expect(iplds7.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds7.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds7.Uncles)).To(Equal(0)) - Expect(len(iplds7.Transactions)).To(Equal(0)) - Expect(len(iplds7.StorageNodes)).To(Equal(0)) - Expect(len(iplds7.Receipts)).To(Equal(0)) - Expect(len(iplds7.StateNodes)).To(Equal(1)) - Expect(iplds7.StateNodes[0].StateLeafKey.Bytes()).To(Equal(test_helpers.AccountLeafKey)) - Expect(iplds7.StateNodes[0].IPLD).To(Equal(models.IPLDModel{ - BlockNumber: test_helpers.BlockNumber.String(), - Data: test_helpers.State2IPLD.RawData(), - Key: test_helpers.State2IPLD.Cid().String(), - })) - - iplds8, err := filterer.Filter(rctTopicsAndAddressFilterFail, test_helpers.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds8).ToNot(BeNil()) - Expect(iplds8.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64())) - Expect(iplds8.Header).To(Equal(models.IPLDModel{})) - Expect(len(iplds8.Uncles)).To(Equal(0)) - Expect(len(iplds8.Transactions)).To(Equal(0)) - Expect(len(iplds8.StorageNodes)).To(Equal(0)) - Expect(len(iplds8.StateNodes)).To(Equal(0)) - Expect(len(iplds8.Receipts)).To(Equal(0)) - }) - }) -}) diff --git a/pkg/eth/ipld_fetcher.go b/pkg/eth/ipld_fetcher.go deleted file mode 100644 index f734f012..00000000 --- a/pkg/eth/ipld_fetcher.go +++ /dev/null @@ -1,248 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth - -import ( - "errors" - "fmt" - "math/big" - "strconv" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/log" - "github.com/cerc-io/ipld-eth-server/v4/pkg/shared" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/statediff/indexer/models" - "github.com/jmoiron/sqlx" -) - -// Fetcher interface for substituting mocks in tests -type Fetcher interface { - Fetch(cids CIDWrapper) (*IPLDs, error) -} - -// IPLDFetcher satisfies the IPLDFetcher interface for ethereum -// It interfaces directly with PG-IPFS -type IPLDFetcher struct { - db *sqlx.DB -} - -// NewIPLDFetcher creates a pointer to a new IPLDFetcher -func NewIPLDFetcher(db *sqlx.DB) *IPLDFetcher { - return &IPLDFetcher{ - db: db, - } -} - -// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper -func (f *IPLDFetcher) Fetch(cids CIDWrapper) (*IPLDs, error) { - log.Debug("fetching iplds") - iplds := new(IPLDs) - var ok bool - iplds.TotalDifficulty, ok = new(big.Int).SetString(cids.Header.TotalDifficulty, 10) - if !ok { - return nil, errors.New("eth fetcher: unable to set total difficulty") - } - iplds.BlockNumber = cids.BlockNumber - - tx, err := f.db.Beginx() - if err != nil { - return nil, err - } - defer func() { - if p := recover(); p != nil { - shared.Rollback(tx) - panic(p) - } else if err != nil { - shared.Rollback(tx) - } else { - err = tx.Commit() - } - }() - - iplds.Header, err = f.FetchHeader(tx, cids.Header) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: header fetching error: %s", err.Error()) - } - iplds.Uncles, err = f.FetchUncles(tx, cids.Uncles) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: uncle fetching error: %s", err.Error()) - } - iplds.Transactions, err = f.FetchTrxs(tx, cids.Transactions) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: transaction fetching error: %s", err.Error()) - } - iplds.Receipts, err = f.FetchRcts(tx, cids.Receipts) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: receipt fetching error: %s", err.Error()) - } - iplds.StateNodes, err = f.FetchState(tx, cids.StateNodes) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: state fetching error: %s", err.Error()) - } - iplds.StorageNodes, err = f.FetchStorage(tx, cids.StorageNodes) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: storage fetching error: %s", err.Error()) - } - return iplds, err -} - -// FetchHeader fetches header -func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c models.HeaderModel) (models.IPLDModel, error) { - log.Debug("fetching header ipld") - blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) - if err != nil { - return models.IPLDModel{}, err - } - - headerBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber) - if err != nil { - return models.IPLDModel{}, err - } - return models.IPLDModel{ - BlockNumber: c.BlockNumber, - Data: headerBytes, - Key: c.CID, - }, nil -} - -// FetchUncles fetches uncles -func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []models.UncleModel) ([]models.IPLDModel, error) { - log.Debug("fetching uncle iplds") - uncleIPLDs := make([]models.IPLDModel, len(cids)) - for i, c := range cids { - blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) - if err != nil { - return nil, err - } - uncleBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber) - if err != nil { - return nil, err - } - uncleIPLDs[i] = models.IPLDModel{ - BlockNumber: c.BlockNumber, - Data: uncleBytes, - Key: c.CID, - } - } - return uncleIPLDs, nil -} - -// FetchTrxs fetches transactions -func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []models.TxModel) ([]models.IPLDModel, error) { - log.Debug("fetching transaction iplds") - trxIPLDs := make([]models.IPLDModel, len(cids)) - for i, c := range cids { - blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) - if err != nil { - return nil, err - } - txBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber) - if err != nil { - return nil, err - } - trxIPLDs[i] = models.IPLDModel{ - BlockNumber: c.BlockNumber, - Data: txBytes, - Key: c.CID, - } - } - return trxIPLDs, nil -} - -// FetchRcts fetches receipts -func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []models.ReceiptModel) ([]models.IPLDModel, error) { - log.Debug("fetching receipt iplds") - rctIPLDs := make([]models.IPLDModel, len(cids)) - for i, c := range cids { - blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64) - if err != nil { - return nil, err - } - rctBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.LeafMhKey, blockNumber) - if err != nil { - return nil, err - } - //nodeVal, err := DecodeLeafNode(rctBytes) - rctIPLDs[i] = models.IPLDModel{ - BlockNumber: c.BlockNumber, - Data: rctBytes, - Key: c.LeafCID, - } - } - return rctIPLDs, nil -} - -// FetchState fetches state nodes -func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []models.StateNodeModel) ([]StateNode, error) { - log.Debug("fetching state iplds") - stateNodes := make([]StateNode, 0, len(cids)) - for _, stateNode := range cids { - if stateNode.CID == "" { - continue - } - blockNumber, err := strconv.ParseUint(stateNode.BlockNumber, 10, 64) - if err != nil { - return nil, err - } - stateBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, stateNode.MhKey, blockNumber) - if err != nil { - return nil, err - } - stateNodes = append(stateNodes, StateNode{ - IPLD: models.IPLDModel{ - BlockNumber: stateNode.BlockNumber, - Data: stateBytes, - Key: stateNode.CID, - }, - StateLeafKey: common.HexToHash(stateNode.StateKey), - Type: ResolveToNodeType(stateNode.NodeType), - Path: stateNode.Path, - }) - } - return stateNodes, nil -} - -// FetchStorage fetches storage nodes -func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []models.StorageNodeWithStateKeyModel) ([]StorageNode, error) { - log.Debug("fetching storage iplds") - storageNodes := make([]StorageNode, 0, len(cids)) - for _, storageNode := range cids { - if storageNode.CID == "" || storageNode.StateKey == "" { - continue - } - blockNumber, err := strconv.ParseUint(storageNode.BlockNumber, 10, 64) - if err != nil { - return nil, err - } - storageBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, storageNode.MhKey, blockNumber) - if err != nil { - return nil, err - } - storageNodes = append(storageNodes, StorageNode{ - IPLD: models.IPLDModel{ - BlockNumber: storageNode.BlockNumber, - Data: storageBytes, - Key: storageNode.CID, - }, - StateLeafKey: common.HexToHash(storageNode.StateKey), - StorageLeafKey: common.HexToHash(storageNode.StorageKey), - Type: ResolveToNodeType(storageNode.NodeType), - Path: storageNode.Path, - }) - } - return storageNodes, nil -} diff --git a/pkg/eth/ipld_fetcher_test.go b/pkg/eth/ipld_fetcher_test.go deleted file mode 100644 index 38d7d84c..00000000 --- a/pkg/eth/ipld_fetcher_test.go +++ /dev/null @@ -1,75 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth_test - -import ( - "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" - "github.com/jmoiron/sqlx" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth" - "github.com/cerc-io/ipld-eth-server/v4/pkg/eth/test_helpers" - "github.com/cerc-io/ipld-eth-server/v4/pkg/shared" -) - -var _ = Describe("IPLDFetcher", func() { - var ( - db *sqlx.DB - pubAndIndexer interfaces.StateDiffIndexer - fetcher *eth.IPLDFetcher - ) - Describe("Fetch", func() { - BeforeEach(func() { - var ( - err error - tx interfaces.Batch - ) - db = shared.SetupDB() - pubAndIndexer = shared.SetupTestStateDiffIndexer(ctx, params.TestChainConfig, test_helpers.Genesis.Hash()) - - tx, err = pubAndIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) - for _, node := range test_helpers.MockStateNodes { - err = pubAndIndexer.PushStateNode(tx, node, test_helpers.MockBlock.Hash().String()) - Expect(err).ToNot(HaveOccurred()) - } - - err = tx.Submit(err) - Expect(err).ToNot(HaveOccurred()) - fetcher = eth.NewIPLDFetcher(db) - - }) - AfterEach(func() { - shared.TearDownDB(db) - }) - - It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() { - iplds, err := fetcher.Fetch(*test_helpers.MockCIDWrapper) - Expect(err).ToNot(HaveOccurred()) - Expect(iplds).ToNot(BeNil()) - Expect(iplds.TotalDifficulty).To(Equal(test_helpers.MockConvertedPayload.TotalDifficulty)) - Expect(iplds.BlockNumber).To(Equal(test_helpers.MockConvertedPayload.Block.Number())) - Expect(iplds.Header).To(Equal(test_helpers.MockIPLDs.Header)) - Expect(len(iplds.Uncles)).To(Equal(0)) - Expect(iplds.Transactions).To(Equal(test_helpers.MockIPLDs.Transactions)) - Expect(iplds.Receipts).To(Equal(test_helpers.MockIPLDs.Receipts)) - Expect(iplds.StateNodes).To(Equal(test_helpers.MockIPLDs.StateNodes)) - Expect(iplds.StorageNodes).To(Equal(test_helpers.MockIPLDs.StorageNodes)) - }) - }) -}) diff --git a/pkg/serve/helpers.go b/pkg/serve/helpers.go deleted file mode 100644 index 358568a3..00000000 --- a/pkg/serve/helpers.go +++ /dev/null @@ -1,37 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package serve - -import "github.com/cerc-io/ipld-eth-server/v4/pkg/log" - -func sendNonBlockingErr(sub Subscription, err error) { - log.Error(err) - select { - case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: err.Error(), Flag: EmptyFlag}: - default: - log.Infof("unable to send error to subscription %s", sub.ID) - } -} - -func sendNonBlockingQuit(sub Subscription) { - select { - case sub.QuitChan <- true: - log.Infof("closing subscription %s", sub.ID) - default: - log.Infof("unable to close subscription %s; channel has no receiver", sub.ID) - } -} diff --git a/pkg/serve/subscription.go b/pkg/serve/subscription.go deleted file mode 100644 index 41383590..00000000 --- a/pkg/serve/subscription.go +++ /dev/null @@ -1,60 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package serve - -import ( - "errors" - - "github.com/ethereum/go-ethereum/rpc" -) - -type Flag int32 - -const ( - EmptyFlag Flag = iota - BackFillCompleteFlag -) - -// Subscription holds the information for an individual client subscription to the watcher -type Subscription struct { - ID rpc.ID - PayloadChan chan<- SubscriptionPayload - QuitChan chan<- bool -} - -// SubscriptionPayload is the struct for a watcher data subscription payload -// It carries data of a type specific to the chain being supported/queried and an error message -type SubscriptionPayload struct { - Data []byte `json:"data"` // e.g. for Ethereum rlp serialized eth.StreamPayload - Height int64 `json:"height"` - Err string `json:"err"` // field for error - Flag Flag `json:"flag"` // field for message -} - -func (sp SubscriptionPayload) Error() error { - if sp.Err == "" { - return nil - } - return errors.New(sp.Err) -} - -func (sp SubscriptionPayload) BackFillComplete() bool { - if sp.Flag == BackFillCompleteFlag { - return true - } - return false -}