diff --git a/cmd/root.go b/cmd/root.go index 1b647625..47b15592 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -40,7 +40,7 @@ var ( cfgFile string databaseConfig config.Database genConfig config.Plugin - subConfig config.Subscription + subscriptionConfig config.Subscription ipc string levelDbPath string queueRecheckInterval time.Duration diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go index b7bd6273..b373c9c3 100644 --- a/cmd/streamSubscribe.go +++ b/cmd/streamSubscribe.go @@ -33,14 +33,14 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/geth/client" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) // streamSubscribeCmd represents the streamSubscribe command var streamSubscribeCmd = &cobra.Command{ Use: "streamSubscribe", Short: "This command is used to subscribe to the seed node stream with the provided filters", - Long: ``, + Long: `This command is for demo and testing purposes and is used to subscribe to the seed node with the provided subscription configuration parameters. +It does not do anything with the data streamed from the seed node other than unpack it and print it out for demonstration purposes.`, Run: func(cmd *cobra.Command, args []string) { streamSubscribe() }, @@ -52,17 +52,17 @@ func init() { func streamSubscribe() { // Prep the subscription config/filters to be sent to the server - subscriptionConfig() + configureSubscription() // Create a new rpc client and a subscription streamer with that client rpcClient := getRpcClient() - str := streamer.NewSeedStreamer(rpcClient) + str := streamer.NewSeedNodeStreamer(rpcClient) // Buffered channel for reading subscription payloads - payloadChan := make(chan ipfs.ResponsePayload, 20000) + payloadChan := make(chan streamer.SeedNodePayload, 20000) // Subscribe to the seed node service with the given config/filter parameters - sub, err := str.Stream(payloadChan, subConfig) + sub, err := str.Stream(payloadChan, subscriptionConfig) if err != nil { log.Fatal(err) } @@ -161,9 +161,9 @@ func streamSubscribe() { } } -func subscriptionConfig() { +func configureSubscription() { log.Info("loading subscription config") - subConfig = config.Subscription{ + subscriptionConfig = config.Subscription{ // Below default to false, which means we do not backfill by default BackFill: viper.GetBool("subscription.backfill"), BackFillOnly: viper.GetBool("subscription.backfillOnly"), diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go index 0ff96bc5..1ea3a718 100644 --- a/cmd/syncAndPublish.go +++ b/cmd/syncAndPublish.go @@ -20,6 +20,8 @@ import ( "path/filepath" syn "sync" + "github.com/vulcanize/vulcanizedb/pkg/seed_node" + "github.com/spf13/viper" log "github.com/sirupsen/logrus" @@ -33,7 +35,6 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/geth/client" vRpc "github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc" "github.com/vulcanize/vulcanizedb/pkg/geth/node" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/utils" ) @@ -68,7 +69,7 @@ func syncAndPublish() { } ipfsPath = filepath.Join(home, ".ipfs") } - processor, err := ipfs.NewIPFSProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan) + processor, err := seed_node.NewProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan) if err != nil { log.Fatal(err) } diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go index 1c23f698..63182caa 100644 --- a/cmd/syncPublishScreenAndServe.go +++ b/cmd/syncPublishScreenAndServe.go @@ -20,6 +20,8 @@ import ( "path/filepath" syn "sync" + "github.com/vulcanize/vulcanizedb/pkg/seed_node" + "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -61,7 +63,7 @@ func syncPublishScreenAndServe() { } ipfsPath = filepath.Join(home, ".ipfs") } - processor, err := ipfs.NewIPFSProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan) + processor, err := seed_node.NewProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan) if err != nil { log.Fatal(err) } @@ -94,7 +96,9 @@ func syncPublishScreenAndServe() { if wsEndpoint == "" { wsEndpoint = "127.0.0.1:80" } - _, _, err = rpc.StartWSEndpoint(wsEndpoint, processor.APIs(), []string{"vulcanizedb"}, nil, true) + var exposeAll = true + var wsOrigins []string = nil + _, _, err = rpc.StartWSEndpoint(wsEndpoint, processor.APIs(), []string{"vulcanizedb"}, wsOrigins, exposeAll) if err != nil { log.Fatal(err) } diff --git a/db/migrations/00033_create_ipfs_blocks_table.sql b/db/migrations/00033_create_ipfs_blocks_table.sql index 03a99488..6e3941e0 100644 --- a/db/migrations/00033_create_ipfs_blocks_table.sql +++ b/db/migrations/00033_create_ipfs_blocks_table.sql @@ -1,5 +1,5 @@ -- +goose Up -CREATE TABLE public.blocks ( +CREATE TABLE IF NOT EXISTS public.blocks ( key TEXT UNIQUE NOT NULL, data BYTEA NOT NULL ); diff --git a/documentation/contributing.md b/documentation/contributing.md index 6c49ce53..9b198261 100644 --- a/documentation/contributing.md +++ b/documentation/contributing.md @@ -14,7 +14,7 @@ conform to the [standard-readme specification](https://github.com/RichardLitt/standard-readme). - Once a Pull Request has received two approvals it can be merged in by a core developer. -Pull requests should be opened against the `master` branch. Periodically, updates on `master` will be ported over to `staging` for tagged release. +Pull requests should be opened against the `staging` branch. Periodically, updates on `staging` will be ported over to `master` for tagged release. ## Creating a new migration file 1. `make new_migration NAME=add_columnA_to_table1` diff --git a/documentation/seed-node.md b/documentation/seed-node.md index 37388410..f3b5ed20 100644 --- a/documentation/seed-node.md +++ b/documentation/seed-node.md @@ -131,8 +131,8 @@ when the geth sync was started), and `client.ipfsPath` which is the path the ipf #### syncPublishScreenAndServe -`syncPublishScreenAndServe` does everythin th at `syncAndPublish` does, plut it opens up an RPC server which exposes -an endpoint to allow transformers to subscribe to subsets of the sync-and-published data that are relevant to thier transformations +`syncPublishScreenAndServe` does everything that `syncAndPublish` does, plus it opens up an RPC server which exposes +an endpoint to allow transformers to subscribe to subsets of the sync-and-published data that are relevant to their transformations Usage: @@ -152,11 +152,12 @@ The config file for the `syncPublishScreenAndServe` command has two additional f [server] ipcPath = "/Users/user/.vulcanize/vulcanize.ipc" - wsEndpoint = "127.0.0.1:2019" + wsEndpoint = "127.0.0.1:80" ``` The additional `server.ipcPath` and `server.wsEndpoint` fields are used to set what ipc endpoint and ws url the `syncPublishScreenAndServe` rpc server will expose itself to subscribing transformers over, respectively. +Any valid and available path and endpoint is acceptable, but keep in mind that this path and endpoint need to be known by transformers for them to subscribe to the seed node. #### Subscribing diff --git a/libraries/shared/streamer/seed_node_streamer.go b/libraries/shared/streamer/seed_node_streamer.go new file mode 100644 index 00000000..d491db64 --- /dev/null +++ b/libraries/shared/streamer/seed_node_streamer.go @@ -0,0 +1,84 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Streamer is used by watchers to stream eth data from a vulcanizedb seed node +package streamer + +import ( + "encoding/json" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" + + "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/vulcanize/vulcanizedb/pkg/core" +) + +// ISeedNodeStreamer is the interface for streaming data from a vulcanizeDB seed node +type ISeedNodeStreamer interface { + Stream(payloadChan chan SeedNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) +} + +// SeedNodeStreamer is the underlying struct for the ISeedNodeStreamer interface +type SeedNodeStreamer struct { + Client core.RpcClient +} + +// NewSeedNodeStreamer creates a pointer to a new SeedNodeStreamer which satisfies the ISeedNodeStreamer interface +func NewSeedNodeStreamer(client core.RpcClient) *SeedNodeStreamer { + return &SeedNodeStreamer{ + Client: client, + } +} + +// Stream is the main loop for subscribing to data from a vulcanizedb seed node +func (sds *SeedNodeStreamer) Stream(payloadChan chan SeedNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) { + return sds.Client.Subscribe("vulcanizedb", payloadChan, "stream", streamFilters) +} + +// Payload holds the data returned from the seed node to the requesting client +type SeedNodePayload struct { + BlockNumber *big.Int `json:"blockNumber"` + HeadersRlp [][]byte `json:"headersRlp"` + UnclesRlp [][]byte `json:"unclesRlp"` + TransactionsRlp [][]byte `json:"transactionsRlp"` + ReceiptsRlp [][]byte `json:"receiptsRlp"` + StateNodesRlp map[common.Hash][]byte `json:"stateNodesRlp"` + StorageNodesRlp map[common.Hash]map[common.Hash][]byte `json:"storageNodesRlp"` + ErrMsg string `json:"errMsg"` + + encoded []byte + err error +} + +func (sd *SeedNodePayload) ensureEncoded() { + if sd.encoded == nil && sd.err == nil { + sd.encoded, sd.err = json.Marshal(sd) + } +} + +// Length to implement Encoder interface for StateDiff +func (sd *SeedNodePayload) Length() int { + sd.ensureEncoded() + return len(sd.encoded) +} + +// Encode to implement Encoder interface for StateDiff +func (sd *SeedNodePayload) Encode() ([]byte, error) { + sd.ensureEncoded() + return sd.encoded, sd.err +} diff --git a/libraries/shared/streamer/streamer.go b/libraries/shared/streamer/streamer.go deleted file mode 100644 index 5d74a449..00000000 --- a/libraries/shared/streamer/streamer.go +++ /dev/null @@ -1,48 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -// Streamer is used by watchers to stream eth data from a vulcanizedb seed node -package streamer - -import ( - "github.com/ethereum/go-ethereum/rpc" - "github.com/vulcanize/vulcanizedb/pkg/config" - - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" -) - -// IStreamer is the interface for streaming data from a vulcanizeDB seed node -type IStreamer interface { - Stream(payloadChan chan ipfs.ResponsePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) -} - -// Streamer is the underlying struct for the IStreamer interface -type Streamer struct { - Client core.RpcClient -} - -// NewSeedStreamer creates a pointer to a new Streamer which satisfies the IStreamer interface -func NewSeedStreamer(client core.RpcClient) *Streamer { - return &Streamer{ - Client: client, - } -} - -// Stream is the main loop for subscribing to data from a vulcanizedb seed node -func (sds *Streamer) Stream(payloadChan chan ipfs.ResponsePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) { - return sds.Client.Subscribe("vulcanizedb", payloadChan, "stream", streamFilters) -} diff --git a/pkg/ipfs/converter.go b/pkg/ipfs/converter.go index 0aa52ec9..c7b9aa23 100644 --- a/pkg/ipfs/converter.go +++ b/pkg/ipfs/converter.go @@ -49,6 +49,9 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { // Unpack block rlp to access fields block := new(types.Block) err := rlp.DecodeBytes(payload.BlockRlp, block) + if err != nil { + return nil, err + } header := block.Header() headerRlp, err := rlp.EncodeToBytes(header) if err != nil { @@ -74,7 +77,7 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { } txMeta := &TrxMetaData{ Dst: handleNullAddr(trx.To()), - Src: from.Hex(), + Src: handleNullAddr(&from), } // txMeta will have same index as its corresponding trx in the convertedPayload.BlockBody convertedPayload.TrxMetaData = append(convertedPayload.TrxMetaData, txMeta) diff --git a/pkg/ipfs/converter_test.go b/pkg/ipfs/converter_test.go deleted file mode 100644 index 545dea67..00000000 --- a/pkg/ipfs/converter_test.go +++ /dev/null @@ -1,37 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipfs_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" -) - -var _ = Describe("Converter", func() { - Describe("Convert", func() { - It("Converts StatediffPayloads into IPLDPayloads", func() { - mockConverter := mocks.PayloadConverter{} - mockConverter.ReturnIPLDPayload = &mocks.MockIPLDPayload - ipldPayload, err := mockConverter.Convert(mocks.MockStatediffPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(ipldPayload).To(Equal(&mocks.MockIPLDPayload)) - Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStatediffPayload)) - }) - }) -}) diff --git a/pkg/ipfs/fetcher.go b/pkg/ipfs/fetcher.go index fffd0514..9ac58001 100644 --- a/pkg/ipfs/fetcher.go +++ b/pkg/ipfs/fetcher.go @@ -28,7 +28,7 @@ import ( // IPLDFetcher is an interface for fetching IPLDs type IPLDFetcher interface { - FetchCIDs(cids CidWrapper) (*IpldWrapper, error) + FetchCIDs(cids CIDWrapper) (*IPLDWrapper, error) } // EthIPLDFetcher is used to fetch ETH IPLD objects from IPFS @@ -47,11 +47,11 @@ func NewIPLDFetcher(ipfsPath string) (*EthIPLDFetcher, error) { }, nil } -// FetchCIDs is the exported method for fetching and returning all the cids passed in a CidWrapper -func (f *EthIPLDFetcher) FetchCIDs(cids CidWrapper) (*IpldWrapper, error) { +// FetchCIDs is the exported method for fetching and returning all the cids passed in a CIDWrapper +func (f *EthIPLDFetcher) FetchCIDs(cids CIDWrapper) (*IPLDWrapper, error) { log.Debug("fetching iplds") - blocks := &IpldWrapper{ + blocks := &IPLDWrapper{ BlockNumber: cids.BlockNumber, Headers: make([]blocks.Block, 0), Uncles: make([]blocks.Block, 0), @@ -91,7 +91,7 @@ func (f *EthIPLDFetcher) FetchCIDs(cids CidWrapper) (*IpldWrapper, error) { // fetchHeaders fetches headers // It uses the f.fetchBatch method -func (f *EthIPLDFetcher) fetchHeaders(cids CidWrapper, blocks *IpldWrapper) error { +func (f *EthIPLDFetcher) fetchHeaders(cids CIDWrapper, blocks *IPLDWrapper) error { log.Debug("fetching header iplds") headerCids := make([]cid.Cid, 0, len(cids.Headers)) for _, c := range cids.Headers { @@ -110,7 +110,7 @@ func (f *EthIPLDFetcher) fetchHeaders(cids CidWrapper, blocks *IpldWrapper) erro // fetchUncles fetches uncles // It uses the f.fetchBatch method -func (f *EthIPLDFetcher) fetchUncles(cids CidWrapper, blocks *IpldWrapper) error { +func (f *EthIPLDFetcher) fetchUncles(cids CIDWrapper, blocks *IPLDWrapper) error { log.Debug("fetching uncle iplds") uncleCids := make([]cid.Cid, 0, len(cids.Uncles)) for _, c := range cids.Uncles { @@ -129,7 +129,7 @@ func (f *EthIPLDFetcher) fetchUncles(cids CidWrapper, blocks *IpldWrapper) error // fetchTrxs fetches transactions // It uses the f.fetchBatch method -func (f *EthIPLDFetcher) fetchTrxs(cids CidWrapper, blocks *IpldWrapper) error { +func (f *EthIPLDFetcher) fetchTrxs(cids CIDWrapper, blocks *IPLDWrapper) error { log.Debug("fetching transaction iplds") trxCids := make([]cid.Cid, 0, len(cids.Transactions)) for _, c := range cids.Transactions { @@ -148,7 +148,7 @@ func (f *EthIPLDFetcher) fetchTrxs(cids CidWrapper, blocks *IpldWrapper) error { // fetchRcts fetches receipts // It uses the f.fetchBatch method -func (f *EthIPLDFetcher) fetchRcts(cids CidWrapper, blocks *IpldWrapper) error { +func (f *EthIPLDFetcher) fetchRcts(cids CIDWrapper, blocks *IPLDWrapper) error { log.Debug("fetching receipt iplds") rctCids := make([]cid.Cid, 0, len(cids.Receipts)) for _, c := range cids.Receipts { @@ -168,7 +168,7 @@ func (f *EthIPLDFetcher) fetchRcts(cids CidWrapper, blocks *IpldWrapper) error { // fetchState fetches state nodes // It uses the single f.fetch method instead of the batch fetch, because it // needs to maintain the data's relation to state keys -func (f *EthIPLDFetcher) fetchState(cids CidWrapper, blocks *IpldWrapper) error { +func (f *EthIPLDFetcher) fetchState(cids CIDWrapper, blocks *IPLDWrapper) error { log.Debug("fetching state iplds") for _, stateNode := range cids.StateNodes { if stateNode.CID == "" || stateNode.Key == "" { @@ -190,7 +190,7 @@ func (f *EthIPLDFetcher) fetchState(cids CidWrapper, blocks *IpldWrapper) error // fetchStorage fetches storage nodes // It uses the single f.fetch method instead of the batch fetch, because it // needs to maintain the data's relation to state and storage keys -func (f *EthIPLDFetcher) fetchStorage(cids CidWrapper, blocks *IpldWrapper) error { +func (f *EthIPLDFetcher) fetchStorage(cids CIDWrapper, blocks *IPLDWrapper) error { log.Debug("fetching storage iplds") for _, storageNode := range cids.StorageNodes { if storageNode.CID == "" || storageNode.Key == "" || storageNode.StateKey == "" { diff --git a/pkg/ipfs/helpers.go b/pkg/ipfs/helpers.go index 230b2342..9096762a 100644 --- a/pkg/ipfs/helpers.go +++ b/pkg/ipfs/helpers.go @@ -55,8 +55,9 @@ func HexToKey(hex string) common.Hash { return crypto.Keccak256Hash(addr[:]) } -func emptyCidWrapper(cids CidWrapper) bool { - if len(cids.Transactions) > 0 || len(cids.Headers) > 0 || len(cids.Uncles) > 0 || len(cids.Receipts) > 0 || len(cids.StateNodes) > 0 || len(cids.StorageNodes) > 0 || cids.BlockNumber == nil { +// EmptyCIDWrapper returns whether or not the provided CIDWrapper has any Cids we need to process +func EmptyCIDWrapper(cids CIDWrapper) bool { + if len(cids.Transactions) > 0 || len(cids.Headers) > 0 || len(cids.Uncles) > 0 || len(cids.Receipts) > 0 || len(cids.StateNodes) > 0 || len(cids.StorageNodes) > 0 { return false } return true diff --git a/pkg/ipfs/mocks/api.go b/pkg/ipfs/mocks/api.go deleted file mode 100644 index f726b26e..00000000 --- a/pkg/ipfs/mocks/api.go +++ /dev/null @@ -1 +0,0 @@ -package mocks diff --git a/pkg/ipfs/mocks/screener.go b/pkg/ipfs/mocks/screener.go deleted file mode 100644 index f726b26e..00000000 --- a/pkg/ipfs/mocks/screener.go +++ /dev/null @@ -1 +0,0 @@ -package mocks diff --git a/pkg/ipfs/mocks/streamer.go b/pkg/ipfs/mocks/streamer.go deleted file mode 100644 index cd387ee6..00000000 --- a/pkg/ipfs/mocks/streamer.go +++ /dev/null @@ -1,43 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package mocks - -import ( - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/statediff" -) - -// StateDiffStreamer is the underlying struct for the Streamer interface -type StateDiffStreamer struct { - PassedPayloadChan chan statediff.Payload - ReturnSub *rpc.ClientSubscription - ReturnErr error - StreamPayloads []statediff.Payload -} - -// Stream is the main loop for subscribing to data from the Geth state diff process -func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { - sds.PassedPayloadChan = payloadChan - - go func() { - for _, payload := range sds.StreamPayloads { - sds.PassedPayloadChan <- payload - } - }() - - return sds.ReturnSub, sds.ReturnErr -} diff --git a/pkg/ipfs/publisher.go b/pkg/ipfs/publisher.go index 0bbeaf47..2e98327e 100644 --- a/pkg/ipfs/publisher.go +++ b/pkg/ipfs/publisher.go @@ -21,7 +21,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - rlp2 "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rlp" "github.com/ipfs/go-ipfs/plugin/loader" "github.com/vulcanize/eth-block-extractor/pkg/ipfs" @@ -30,7 +30,7 @@ import ( "github.com/vulcanize/eth-block-extractor/pkg/ipfs/eth_block_transactions" "github.com/vulcanize/eth-block-extractor/pkg/ipfs/eth_state_trie" "github.com/vulcanize/eth-block-extractor/pkg/ipfs/eth_storage_trie" - "github.com/vulcanize/eth-block-extractor/pkg/wrappers/rlp" + rlp2 "github.com/vulcanize/eth-block-extractor/pkg/wrappers/rlp" ) // IPLDPublisher is the interface for publishing an IPLD payload @@ -66,7 +66,7 @@ func NewIPLDPublisher(ipfsPath string) (*Publisher, error) { return nil, err } return &Publisher{ - HeaderPutter: eth_block_header.NewBlockHeaderDagPutter(node, rlp.RlpDecoder{}), + HeaderPutter: eth_block_header.NewBlockHeaderDagPutter(node, rlp2.RlpDecoder{}), TransactionPutter: eth_block_transactions.NewBlockTransactionsDagPutter(node), ReceiptPutter: eth_block_receipts.NewEthBlockReceiptDagPutter(node), StatePutter: eth_state_trie.NewStateTrieDagPutter(node), @@ -85,7 +85,7 @@ func (pub *Publisher) Publish(payload *IPLDPayload) (*CIDPayload, error) { // Process and publish uncles uncleCids := make(map[common.Hash]string) for _, uncle := range payload.BlockBody.Uncles { - uncleRlp, err := rlp2.EncodeToBytes(uncle) + uncleRlp, err := rlp.EncodeToBytes(uncle) if err != nil { return nil, err } diff --git a/pkg/ipfs/publisher_test.go b/pkg/ipfs/publisher_test.go deleted file mode 100644 index e3279d06..00000000 --- a/pkg/ipfs/publisher_test.go +++ /dev/null @@ -1,37 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipfs_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" -) - -var _ = Describe("Publisher", func() { - Describe("Publish", func() { - It("Publishes IPLDPayload to IPFS", func() { - mockPublisher := mocks.IPLDPublisher{} - mockPublisher.ReturnCIDPayload = &mocks.MockCIDPayload - cidPayload, err := mockPublisher.Publish(&mocks.MockIPLDPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(cidPayload).To(Equal(&mocks.MockCIDPayload)) - Expect(mockPublisher.PassedIPLDPayload).To(Equal(&mocks.MockIPLDPayload)) - }) - }) -}) diff --git a/pkg/ipfs/repository_test.go b/pkg/ipfs/repository_test.go deleted file mode 100644 index dcffd0fe..00000000 --- a/pkg/ipfs/repository_test.go +++ /dev/null @@ -1,35 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipfs_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" -) - -var _ = Describe("Repository", func() { - Describe("Index", func() { - It("Indexes CIDs against their metadata", func() { - mockRepo := mocks.CIDRepository{} - err := mockRepo.Index(&mocks.MockCIDPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(mockRepo.PassedCIDPayload).To(Equal(&mocks.MockCIDPayload)) - }) - }) -}) diff --git a/pkg/ipfs/resolver.go b/pkg/ipfs/resolver.go index 63d7a19a..7bc9e3dc 100644 --- a/pkg/ipfs/resolver.go +++ b/pkg/ipfs/resolver.go @@ -19,11 +19,12 @@ package ipfs import ( "github.com/ethereum/go-ethereum/common" "github.com/ipfs/go-block-format" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" ) // IPLDResolver is the interface to resolving IPLDs type IPLDResolver interface { - ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePayload, error) + ResolveIPLDs(ipfsBlocks IPLDWrapper) (*streamer.SeedNodePayload, error) } // EthIPLDResolver is the underlying struct to support the IPLDResolver interface @@ -35,8 +36,8 @@ func NewIPLDResolver() *EthIPLDResolver { } // ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper -func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePayload, error) { - response := new(ResponsePayload) +func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) (*streamer.SeedNodePayload, error) { + response := new(streamer.SeedNodePayload) response.BlockNumber = ipfsBlocks.BlockNumber eir.resolveHeaders(ipfsBlocks.Headers, response) eir.resolveUncles(ipfsBlocks.Uncles, response) @@ -47,35 +48,35 @@ func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePaylo return response, nil } -func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *ResponsePayload) { +func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *streamer.SeedNodePayload) { for _, block := range blocks { raw := block.RawData() response.HeadersRlp = append(response.HeadersRlp, raw) } } -func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *ResponsePayload) { +func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *streamer.SeedNodePayload) { for _, block := range blocks { raw := block.RawData() response.UnclesRlp = append(response.UnclesRlp, raw) } } -func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *ResponsePayload) { +func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *streamer.SeedNodePayload) { for _, block := range blocks { raw := block.RawData() response.TransactionsRlp = append(response.TransactionsRlp, raw) } } -func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *ResponsePayload) { +func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *streamer.SeedNodePayload) { for _, block := range blocks { raw := block.RawData() response.ReceiptsRlp = append(response.ReceiptsRlp, raw) } } -func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *ResponsePayload) { +func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *streamer.SeedNodePayload) { if response.StateNodesRlp == nil { response.StateNodesRlp = make(map[common.Hash][]byte) } @@ -85,7 +86,7 @@ func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, re } } -func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *ResponsePayload) { +func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *streamer.SeedNodePayload) { if response.StateNodesRlp == nil { response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte) } diff --git a/pkg/ipfs/retreiver_test.go b/pkg/ipfs/retreiver_test.go deleted file mode 100644 index a0c59fc2..00000000 --- a/pkg/ipfs/retreiver_test.go +++ /dev/null @@ -1 +0,0 @@ -package ipfs diff --git a/pkg/ipfs/screener_test.go b/pkg/ipfs/screener_test.go deleted file mode 100644 index a0c59fc2..00000000 --- a/pkg/ipfs/screener_test.go +++ /dev/null @@ -1 +0,0 @@ -package ipfs diff --git a/pkg/ipfs/streamer.go b/pkg/ipfs/streamer.go deleted file mode 100644 index 095ec927..00000000 --- a/pkg/ipfs/streamer.go +++ /dev/null @@ -1,46 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipfs - -import ( - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/statediff" - - "github.com/vulcanize/vulcanizedb/pkg/core" -) - -// StateDiffStreamer is the interface for streaming a statediff subscription -type StateDiffStreamer interface { - Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) -} - -// Streamer is the underlying struct for the StateDiffStreamer interface -type Streamer struct { - Client core.RpcClient -} - -// NewStateDiffStreamer creates a pointer to a new Streamer which satisfies the StateDiffStreamer interface -func NewStateDiffStreamer(client core.RpcClient) *Streamer { - return &Streamer{ - Client: client, - } -} - -// Stream is the main loop for subscribing to data from the Geth state diff process -func (sds *Streamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { - return sds.Client.Subscribe("statediff", payloadChan, "stream") -} diff --git a/pkg/ipfs/streamer_test.go b/pkg/ipfs/streamer_test.go deleted file mode 100644 index 70387142..00000000 --- a/pkg/ipfs/streamer_test.go +++ /dev/null @@ -1,45 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipfs_test - -import ( - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/statediff" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" -) - -var _ = Describe("Streamer", func() { - Describe("Stream", func() { - It("Streams StatediffPayloads from a Geth RPC subscription", func() { - mockStreamer := mocks.StateDiffStreamer{} - mockStreamer.ReturnSub = &rpc.ClientSubscription{} - mockStreamer.StreamPayloads = []statediff.Payload{ - mocks.MockStatediffPayload, - } - payloadChan := make(chan statediff.Payload, 1) - sub, err := mockStreamer.Stream(payloadChan) - Expect(err).ToNot(HaveOccurred()) - Expect(sub).To(Equal(&rpc.ClientSubscription{})) - Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan)) - streamedPayload := <-payloadChan - Expect(streamedPayload).To(Equal(mocks.MockStatediffPayload)) - }) - }) -}) diff --git a/pkg/ipfs/types.go b/pkg/ipfs/types.go index b1b43dec..d560fac4 100644 --- a/pkg/ipfs/types.go +++ b/pkg/ipfs/types.go @@ -17,7 +17,6 @@ package ipfs import ( - "encoding/json" "math/big" "github.com/ethereum/go-ethereum/common" @@ -25,47 +24,8 @@ import ( "github.com/ipfs/go-block-format" ) -// Subscription holds the information for an individual client subscription -type Subscription struct { - PayloadChan chan<- ResponsePayload - QuitChan chan<- bool -} - -// ResponsePayload holds the data returned from the seed node to the requesting client -type ResponsePayload struct { - BlockNumber *big.Int `json:"blockNumber"` - HeadersRlp [][]byte `json:"headersRlp"` - UnclesRlp [][]byte `json:"unclesRlp"` - TransactionsRlp [][]byte `json:"transactionsRlp"` - ReceiptsRlp [][]byte `json:"receiptsRlp"` - StateNodesRlp map[common.Hash][]byte `json:"stateNodesRlp"` - StorageNodesRlp map[common.Hash]map[common.Hash][]byte `json:"storageNodesRlp"` - ErrMsg string `json:"errMsg"` - - encoded []byte - err error -} - -func (sd *ResponsePayload) ensureEncoded() { - if sd.encoded == nil && sd.err == nil { - sd.encoded, sd.err = json.Marshal(sd) - } -} - -// Length to implement Encoder interface for StateDiff -func (sd *ResponsePayload) Length() int { - sd.ensureEncoded() - return len(sd.encoded) -} - -// Encode to implement Encoder interface for StateDiff -func (sd *ResponsePayload) Encode() ([]byte, error) { - sd.ensureEncoded() - return sd.encoded, sd.err -} - -// CidWrapper is used to package CIDs retrieved from the local Postgres cache -type CidWrapper struct { +// CIDWrapper is used to package CIDs retrieved from the local Postgres cache +type CIDWrapper struct { BlockNumber *big.Int Headers []string Uncles []string @@ -75,8 +35,8 @@ type CidWrapper struct { StorageNodes []StorageNodeCID } -// IpldWrapper is used to package raw IPLD block data for resolution -type IpldWrapper struct { +// IPLDWrapper is used to package raw IPLD block data for resolution +type IPLDWrapper struct { BlockNumber *big.Int Headers []blocks.Block Uncles []blocks.Block diff --git a/pkg/ipfs/api.go b/pkg/seed_node/api.go similarity index 90% rename from pkg/ipfs/api.go rename to pkg/seed_node/api.go index d33de181..7364ad73 100644 --- a/pkg/ipfs/api.go +++ b/pkg/seed_node/api.go @@ -14,11 +14,13 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package ipfs +package seed_node import ( "context" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" @@ -33,13 +35,13 @@ const APIVersion = "0.0.1" // PublicSeedNodeAPI is the public api for the seed node type PublicSeedNodeAPI struct { - snp SyncPublishScreenAndServe + snp Processor } // NewPublicSeedNodeAPI creates a new PublicSeedNodeAPI with the provided underlying SyncPublishScreenAndServe process -func NewPublicSeedNodeAPI(snp SyncPublishScreenAndServe) *PublicSeedNodeAPI { +func NewPublicSeedNodeAPI(seedNodeProcessor Processor) *PublicSeedNodeAPI { return &PublicSeedNodeAPI{ - snp: snp, + snp: seedNodeProcessor, } } @@ -56,7 +58,7 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S go func() { // subscribe to events from the SyncPublishScreenAndServe service - payloadChannel := make(chan ResponsePayload, payloadChanBufferSize) + payloadChannel := make(chan streamer.SeedNodePayload, payloadChanBufferSize) quitChan := make(chan bool, 1) go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, streamFilters) diff --git a/pkg/ipfs/screener.go b/pkg/seed_node/filterer.go similarity index 80% rename from pkg/ipfs/screener.go rename to pkg/seed_node/filterer.go index f59a55c4..a2ac5bcf 100644 --- a/pkg/ipfs/screener.go +++ b/pkg/seed_node/filterer.go @@ -14,11 +14,14 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package ipfs +package seed_node import ( "bytes" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" @@ -26,22 +29,22 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/config" ) -// ResponseScreener is the inteface used to screen eth data and package appropriate data into a response payload -type ResponseScreener interface { - ScreenResponse(streamFilters config.Subscription, payload IPLDPayload) (*ResponsePayload, error) +// ResponseFilterer is the inteface used to screen eth data and package appropriate data into a response payload +type ResponseFilterer interface { + FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (*streamer.SeedNodePayload, error) } -// Screener is the underlying struct for the ReponseScreener interface -type Screener struct{} +// Filterer is the underlying struct for the ReponseFilterer interface +type Filterer struct{} -// NewResponseScreener creates a new Screener satisfyign the ReponseScreener interface -func NewResponseScreener() *Screener { - return &Screener{} +// NewResponseFilterer creates a new Filterer satisfyign the ReponseFilterer interface +func NewResponseFilterer() *Filterer { + return &Filterer{} } -// ScreenResponse is used to filter through eth data to extract and package requested data into a ResponsePayload -func (s *Screener) ScreenResponse(streamFilters config.Subscription, payload IPLDPayload) (*ResponsePayload, error) { - response := new(ResponsePayload) +// FilterResponse is used to filter through eth data to extract and package requested data into a Payload +func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (*streamer.SeedNodePayload, error) { + response := new(streamer.SeedNodePayload) err := s.filterHeaders(streamFilters, response, payload) if err != nil { return nil, err @@ -66,7 +69,7 @@ func (s *Screener) ScreenResponse(streamFilters config.Subscription, payload IPL return response, nil } -func (s *Screener) filterHeaders(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) error { +func (s *Filterer) filterHeaders(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error { if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP) if !streamFilters.HeaderFilter.FinalOnly { @@ -89,7 +92,7 @@ func checkRange(start, end, actual int64) bool { return false } -func (s *Screener) filterTransactions(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) ([]common.Hash, error) { +func (s *Filterer) filterTransactions(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) ([]common.Hash, error) { trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions)) if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { for i, trx := range payload.BlockBody.Transactions { @@ -125,7 +128,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin return false } -func (s *Screener) filerReceipts(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error { +func (s *Filterer) filerReceipts(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload, trxHashes []common.Hash) error { if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { for i, receipt := range payload.Receipts { if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, streamFilters.ReceiptFilter.Contracts, payload.ReceiptMetaData[i].ContractAddress, trxHashes) { @@ -181,12 +184,12 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContrac return false } -func (s *Screener) filterState(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) error { +func (s *Filterer) filterState(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error { if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { response.StateNodesRlp = make(map[common.Hash][]byte) keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses)) for _, addr := range streamFilters.StateFilter.Addresses { - keyFilter := AddressToKey(common.HexToAddress(addr)) + keyFilter := ipfs.AddressToKey(common.HexToAddress(addr)) keyFilters = append(keyFilters, keyFilter) } for key, stateNode := range payload.StateNodes { @@ -213,17 +216,17 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool { return false } -func (s *Screener) filterStorage(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) error { +func (s *Filterer) filterStorage(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error { if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte) stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses)) for _, addr := range streamFilters.StorageFilter.Addresses { - keyFilter := AddressToKey(common.HexToAddress(addr)) + keyFilter := ipfs.AddressToKey(common.HexToAddress(addr)) stateKeyFilters = append(stateKeyFilters, keyFilter) } storageKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.StorageKeys)) for _, store := range streamFilters.StorageFilter.StorageKeys { - keyFilter := HexToKey(store) + keyFilter := ipfs.HexToKey(store) storageKeyFilters = append(storageKeyFilters, keyFilter) } for stateKey, storageNodes := range payload.StorageNodes { diff --git a/pkg/ipfs/mocks/repository.go b/pkg/seed_node/mocks/repository.go similarity index 100% rename from pkg/ipfs/mocks/repository.go rename to pkg/seed_node/mocks/repository.go diff --git a/pkg/ipfs/repository.go b/pkg/seed_node/repository.go similarity index 90% rename from pkg/ipfs/repository.go rename to pkg/seed_node/repository.go index 5fd9b021..31fbc038 100644 --- a/pkg/ipfs/repository.go +++ b/pkg/seed_node/repository.go @@ -14,18 +14,19 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package ipfs +package seed_node import ( "github.com/jmoiron/sqlx" "github.com/lib/pq" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) -// CIDRepository is an interface for indexing CIDPayloads +// CIDRepository is an interface for indexing ipfs.CIDPayloads type CIDRepository interface { - Index(cidPayload *CIDPayload) error + Index(cidPayload *ipfs.CIDPayload) error } // Repository is the underlying struct for the CIDRepository interface @@ -41,7 +42,7 @@ func NewCIDRepository(db *postgres.DB) *Repository { } // Index indexes a cidPayload in Postgres -func (repo *Repository) Index(cidPayload *CIDPayload) error { +func (repo *Repository) Index(cidPayload *ipfs.CIDPayload) error { tx, err := repo.db.Beginx() if err != nil { return err @@ -87,7 +88,7 @@ func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash string return err } -func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error { +func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *ipfs.CIDPayload, headerID int64) error { for hash, trxCidMeta := range payload.TransactionCIDs { var txID int64 err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5) @@ -108,13 +109,13 @@ func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CID return nil } -func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ReceiptMetaData, txID int64) error { +func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ipfs.ReceiptMetaData, txID int64) error { _, err := tx.Exec(`INSERT INTO public.receipt_cids (tx_id, cid, contract, topic0s) VALUES ($1, $2, $3, $4)`, txID, cidMeta.CID, cidMeta.ContractAddress, pq.Array(cidMeta.Topic0s)) return err } -func (repo *Repository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error { +func (repo *Repository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *ipfs.CIDPayload, headerID int64) error { for accountKey, stateCID := range payload.StateNodeCIDs { var stateID int64 err := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, state_key, cid, leaf) VALUES ($1, $2, $3, $4) @@ -134,7 +135,7 @@ func (repo *Repository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayloa return nil } -func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeCID, stateID int64) error { +func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID ipfs.StorageNodeCID, stateID int64) error { _, err := tx.Exec(`INSERT INTO public.storage_cids (state_id, storage_key, cid, leaf) VALUES ($1, $2, $3, $4) ON CONFLICT (state_id, storage_key) DO UPDATE SET (cid, leaf) = ($3, $4)`, stateID, storageCID.Key, storageCID.CID, storageCID.Leaf) diff --git a/pkg/ipfs/retreiver.go b/pkg/seed_node/retreiver.go similarity index 94% rename from pkg/ipfs/retreiver.go rename to pkg/seed_node/retreiver.go index aa10da00..1bbd512c 100644 --- a/pkg/ipfs/retreiver.go +++ b/pkg/seed_node/retreiver.go @@ -14,11 +14,13 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package ipfs +package seed_node import ( "math/big" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" + "github.com/jmoiron/sqlx" "github.com/lib/pq" log "github.com/sirupsen/logrus" @@ -29,7 +31,7 @@ import ( // CIDRetriever is the interface for retrieving CIDs from the Postgres cache type CIDRetriever interface { - RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*CidWrapper, error) + RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) RetrieveLastBlockNumber() (int64, error) RetrieveFirstBlockNumber() (int64, error) } @@ -60,7 +62,7 @@ func (ecr *EthCIDRetriever) RetrieveLastBlockNumber() (int64, error) { } // RetrieveCIDs is used to retrieve all of the CIDs which conform to the passed StreamFilters -func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*CidWrapper, error) { +func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) { log.Debug("retrieving cids") var err error tx, err := ecr.db.Beginx() @@ -69,7 +71,7 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, bloc } // THIS IS SUPER EXPENSIVE HAVING TO CYCLE THROUGH EACH BLOCK, NEED BETTER WAY TO FETCH CIDS // WHILE STILL MAINTAINING RELATION INFO ABOUT WHAT BLOCK THE CIDS BELONG TO - cw := new(CidWrapper) + cw := new(ipfs.CIDWrapper) cw.BlockNumber = big.NewInt(blockNumber) // Retrieve cached header CIDs @@ -212,7 +214,7 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su return receiptCids, err } -func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]StateNodeCID, error) { +func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]ipfs.StateNodeCID, error) { log.Debug("retrieving state cids for block ", blockNumber) args := make([]interface{}, 0, 2) pgStr := `SELECT state_cids.cid, state_cids.state_key FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id) @@ -222,7 +224,7 @@ func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config. if addrLen > 0 { keys := make([]string, 0, addrLen) for _, addr := range streamFilters.StateFilter.Addresses { - keys = append(keys, HexToKey(addr).Hex()) + keys = append(keys, ipfs.HexToKey(addr).Hex()) } pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])` args = append(args, pq.Array(keys)) @@ -230,12 +232,12 @@ func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config. if !streamFilters.StorageFilter.IntermediateNodes { pgStr += ` AND state_cids.leaf = TRUE` } - stateNodeCIDs := make([]StateNodeCID, 0) + stateNodeCIDs := make([]ipfs.StateNodeCID, 0) err := tx.Select(&stateNodeCIDs, pgStr, args...) return stateNodeCIDs, err } -func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]StorageNodeCID, error) { +func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]ipfs.StorageNodeCID, error) { log.Debug("retrieving storage cids for block ", blockNumber) args := make([]interface{}, 0, 3) pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key FROM storage_cids, state_cids, header_cids @@ -247,7 +249,7 @@ func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters confi if addrLen > 0 { keys := make([]string, 0, addrLen) for _, addr := range streamFilters.StorageFilter.Addresses { - keys = append(keys, HexToKey(addr).Hex()) + keys = append(keys, ipfs.HexToKey(addr).Hex()) } pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])` args = append(args, pq.Array(keys)) @@ -259,7 +261,7 @@ func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters confi if !streamFilters.StorageFilter.IntermediateNodes { pgStr += ` AND storage_cids.leaf = TRUE` } - storageNodeCIDs := make([]StorageNodeCID, 0) + storageNodeCIDs := make([]ipfs.StorageNodeCID, 0) err := tx.Select(&storageNodeCIDs, pgStr, args...) return storageNodeCIDs, err } diff --git a/pkg/ipfs/ipfs_suite_test.go b/pkg/seed_node/seed_node_suite_test.go similarity index 90% rename from pkg/ipfs/ipfs_suite_test.go rename to pkg/seed_node/seed_node_suite_test.go index 98eb91fa..e0122031 100644 --- a/pkg/ipfs/ipfs_suite_test.go +++ b/pkg/seed_node/seed_node_suite_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package ipfs_test +package seed_node_test import ( "io/ioutil" @@ -26,9 +26,9 @@ import ( . "github.com/onsi/gomega" ) -func TestIPFS(t *testing.T) { +func TestSeedNode(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "IPFS Suite Test") + RunSpecs(t, "Seed Node Suite Test") } var _ = BeforeSuite(func() { diff --git a/pkg/ipfs/service.go b/pkg/seed_node/service.go similarity index 84% rename from pkg/ipfs/service.go rename to pkg/seed_node/service.go index 98fe1885..99cd36c6 100644 --- a/pkg/ipfs/service.go +++ b/pkg/seed_node/service.go @@ -14,11 +14,14 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package ipfs +package seed_node import ( "sync" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/node" @@ -36,18 +39,18 @@ import ( const payloadChanBufferSize = 20000 // the max eth sub buffer size const workerPoolSize = 1 -// SyncPublishScreenAndServe is the top level interface for streaming, converting to IPLDs, publishing, +// Processor is the top level interface for streaming, converting to IPLDs, publishing, // and indexing all Ethereum data; screening this data; and serving it up to subscribed clients // This service is compatible with the Ethereum service interface (node.Service) -type SyncPublishScreenAndServe interface { +type Processor interface { // APIs(), Protocols(), Start() and Stop() node.Service // Main event loop for syncAndPublish processes - SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error + SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- ipfs.IPLDPayload, forwardQuitchan chan<- bool) error // Main event loop for handling client pub-sub - ScreenAndServe(screenAndServePayload <-chan IPLDPayload, screenAndServeQuit <-chan bool) + ScreenAndServe(screenAndServePayload <-chan ipfs.IPLDPayload, screenAndServeQuit <-chan bool) // Method to subscribe to receive state diff processing output - Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters config.Subscription) + Subscribe(id rpc.ID, sub chan<- streamer.SeedNodePayload, quitChan chan<- bool, streamFilters config.Subscription) // Method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) } @@ -57,21 +60,21 @@ type Service struct { // Used to sync access to the Subscriptions sync.Mutex // Interface for streaming statediff payloads over a geth rpc subscription - Streamer StateDiffStreamer + Streamer streamer.IStateDiffStreamer // Interface for converting statediff payloads into ETH-IPLD object payloads - Converter PayloadConverter + Converter ipfs.PayloadConverter // Interface for publishing the ETH-IPLD payloads to IPFS - Publisher IPLDPublisher + Publisher ipfs.IPLDPublisher // Interface for indexing the CIDs of the published ETH-IPLDs in Postgres Repository CIDRepository // Interface for filtering and serving data according to subscribed clients according to their specification - Screener ResponseScreener + Filterer ResponseFilterer // Interface for fetching ETH-IPLD objects from IPFS - Fetcher IPLDFetcher + Fetcher ipfs.IPLDFetcher // Interface for searching and retrieving CIDs from Postgres index Retriever CIDRetriever // Interface for resolving ipfs blocks to their data types - Resolver IPLDResolver + Resolver ipfs.IPLDResolver // Chan the processor uses to subscribe to state diff payloads from the Streamer PayloadChan chan statediff.Payload // Used to signal shutdown of the service @@ -82,25 +85,25 @@ type Service struct { SubscriptionTypes map[common.Hash]config.Subscription } -// NewIPFSProcessor creates a new Processor interface using an underlying Processor struct -func NewIPFSProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (SyncPublishScreenAndServe, error) { - publisher, err := NewIPLDPublisher(ipfsPath) +// NewProcessor creates a new Processor interface using an underlying Service struct +func NewProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (Processor, error) { + publisher, err := ipfs.NewIPLDPublisher(ipfsPath) if err != nil { return nil, err } - fetcher, err := NewIPLDFetcher(ipfsPath) + fetcher, err := ipfs.NewIPLDFetcher(ipfsPath) if err != nil { return nil, err } return &Service{ - Streamer: NewStateDiffStreamer(rpcClient), + Streamer: streamer.NewStateDiffStreamer(rpcClient), Repository: NewCIDRepository(db), - Converter: NewPayloadConverter(ethClient), + Converter: ipfs.NewPayloadConverter(ethClient), Publisher: publisher, - Screener: NewResponseScreener(), + Filterer: NewResponseFilterer(), Fetcher: fetcher, Retriever: NewCIDRetriever(db), - Resolver: NewIPLDResolver(), + Resolver: ipfs.NewIPLDResolver(), PayloadChan: make(chan statediff.Payload, payloadChanBufferSize), QuitChan: qc, Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), @@ -128,7 +131,7 @@ func (sap *Service) APIs() []rpc.API { // SyncAndPublish is the backend processing loop which streams data from geth, converts it to iplds, publishes them to ipfs, and indexes their cids // This continues on no matter if or how many subscribers there are, it then forwards the data to the ScreenAndServe() loop // which filters and sends relevant data to client subscriptions, if there are any -func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- IPLDPayload, screenAndServeQuit chan<- bool) error { +func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- ipfs.IPLDPayload, screenAndServeQuit chan<- bool) error { sub, err := sap.Streamer.Stream(sap.PayloadChan) if err != nil { return err @@ -136,7 +139,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha wg.Add(1) // Channels for forwarding data to the publishAndIndex workers - publishAndIndexPayload := make(chan IPLDPayload, payloadChanBufferSize) + publishAndIndexPayload := make(chan ipfs.IPLDPayload, payloadChanBufferSize) publishAndIndexQuit := make(chan bool, workerPoolSize) // publishAndIndex worker pool to handle publishing and indexing concurrently, while // limiting the number of Postgres connections we can possibly open so as to prevent error @@ -192,7 +195,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha return nil } -func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan IPLDPayload, publishAndIndexQuit <-chan bool) { +func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan ipfs.IPLDPayload, publishAndIndexQuit <-chan bool) { go func() { for { select { @@ -216,7 +219,7 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan IPLDPa // ScreenAndServe is the loop used to screen data streamed from the state diffing eth node // and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration -func (sap *Service) ScreenAndServe(screenAndServePayload <-chan IPLDPayload, screenAndServeQuit <-chan bool) { +func (sap *Service) ScreenAndServe(screenAndServePayload <-chan ipfs.IPLDPayload, screenAndServeQuit <-chan bool) { go func() { for { select { @@ -233,7 +236,7 @@ func (sap *Service) ScreenAndServe(screenAndServePayload <-chan IPLDPayload, scr }() } -func (sap *Service) sendResponse(payload IPLDPayload) error { +func (sap *Service) sendResponse(payload ipfs.IPLDPayload) error { sap.Lock() for ty, subs := range sap.Subscriptions { // Retrieve the subscription parameters for this subscription type @@ -242,7 +245,7 @@ func (sap *Service) sendResponse(payload IPLDPayload) error { log.Errorf("subscription configuration for subscription type %s not available", ty.Hex()) continue } - response, err := sap.Screener.ScreenResponse(subConfig, payload) + response, err := sap.Filterer.FilterResponse(subConfig, payload) if err != nil { log.Error(err) continue @@ -261,7 +264,7 @@ func (sap *Service) sendResponse(payload IPLDPayload) error { } // Subscribe is used by the API to subscribe to the service loop -func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters config.Subscription) { +func (sap *Service) Subscribe(id rpc.ID, sub chan<- streamer.SeedNodePayload, quitChan chan<- bool, streamFilters config.Subscription) { log.Info("Subscribing to the seed node service") // Subscription type is defined as the hash of its content // Group subscriptions by type and screen payloads once for subs of the same type @@ -299,7 +302,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio var err error startingBlock, err = sap.Retriever.RetrieveFirstBlockNumber() if err != nil { - sub.PayloadChan <- ResponsePayload{ + sub.PayloadChan <- streamer.SeedNodePayload{ ErrMsg: "unable to set block range; error: " + err.Error(), } } @@ -309,7 +312,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio if con.EndingBlock.Int64() <= 0 || con.EndingBlock.Int64() <= startingBlock { endingBlock, err = sap.Retriever.RetrieveLastBlockNumber() if err != nil { - sub.PayloadChan <- ResponsePayload{ + sub.PayloadChan <- streamer.SeedNodePayload{ ErrMsg: "unable to set block range; error: " + err.Error(), } } @@ -323,18 +326,18 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio for i := con.StartingBlock.Int64(); i <= endingBlock; i++ { cidWrapper, err := sap.Retriever.RetrieveCIDs(con, i) if err != nil { - sub.PayloadChan <- ResponsePayload{ + sub.PayloadChan <- streamer.SeedNodePayload{ ErrMsg: "CID retrieval error: " + err.Error(), } continue } - if emptyCidWrapper(*cidWrapper) { + if ipfs.EmptyCIDWrapper(*cidWrapper) { continue } blocksWrapper, err := sap.Fetcher.FetchCIDs(*cidWrapper) if err != nil { log.Error(err) - sub.PayloadChan <- ResponsePayload{ + sub.PayloadChan <- streamer.SeedNodePayload{ ErrMsg: "IPLD fetching error: " + err.Error(), } continue @@ -342,7 +345,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper) if err != nil { log.Error(err) - sub.PayloadChan <- ResponsePayload{ + sub.PayloadChan <- streamer.SeedNodePayload{ ErrMsg: "IPLD resolving error: " + err.Error(), } continue @@ -376,7 +379,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) { func (sap *Service) Start(*p2p.Server) error { log.Info("Starting seed node service") wg := new(sync.WaitGroup) - payloadChan := make(chan IPLDPayload, payloadChanBufferSize) + payloadChan := make(chan ipfs.IPLDPayload, payloadChanBufferSize) quitChan := make(chan bool, 1) if err := sap.SyncAndPublish(wg, payloadChan, quitChan); err != nil { return err diff --git a/pkg/ipfs/service_test.go b/pkg/seed_node/service_test.go similarity index 87% rename from pkg/ipfs/service_test.go rename to pkg/seed_node/service_test.go index fbe0963f..9b113b69 100644 --- a/pkg/ipfs/service_test.go +++ b/pkg/seed_node/service_test.go @@ -14,19 +14,22 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package ipfs_test +package seed_node_test import ( "sync" "time" + "github.com/vulcanize/vulcanizedb/pkg/seed_node" + "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" + mocks2 "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" + mocks3 "github.com/vulcanize/vulcanizedb/pkg/seed_node/mocks" ) var _ = Describe("Service", func() { @@ -36,14 +39,14 @@ var _ = Describe("Service", func() { wg := new(sync.WaitGroup) payloadChan := make(chan statediff.Payload, 1) quitChan := make(chan bool, 1) - mockCidRepo := &mocks.CIDRepository{ + mockCidRepo := &mocks3.CIDRepository{ ReturnErr: nil, } mockPublisher := &mocks.IPLDPublisher{ ReturnCIDPayload: &mocks.MockCIDPayload, ReturnErr: nil, } - mockStreamer := &mocks.StateDiffStreamer{ + mockStreamer := &mocks2.StateDiffStreamer{ ReturnSub: &rpc.ClientSubscription{}, StreamPayloads: []statediff.Payload{ mocks.MockStatediffPayload, @@ -54,7 +57,7 @@ var _ = Describe("Service", func() { ReturnIPLDPayload: &mocks.MockIPLDPayload, ReturnErr: nil, } - processor := &ipfs.Service{ + processor := &seed_node.Service{ Repository: mockCidRepo, Publisher: mockPublisher, Streamer: mockStreamer, diff --git a/pkg/ipfs/fetcher_test.go b/pkg/seed_node/subscription.go similarity index 71% rename from pkg/ipfs/fetcher_test.go rename to pkg/seed_node/subscription.go index 63fc346e..441d8d70 100644 --- a/pkg/ipfs/fetcher_test.go +++ b/pkg/seed_node/subscription.go @@ -14,4 +14,14 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package ipfs +package seed_node + +import ( + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" +) + +// Subscription holds the information for an individual client subscription to the seed node +type Subscription struct { + PayloadChan chan<- streamer.SeedNodePayload + QuitChan chan<- bool +}