From 40c3aff597c8df8aff7f31067e628053ea1fbc13 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Wed, 2 Oct 2019 09:10:37 -0500 Subject: [PATCH] seed => super; port 80 => port 8080; backfill process for the super_node --- cmd/streamSubscribe.go | 14 +- cmd/syncAndPublish.go | 78 +++++--- cmd/syncPublishScreenAndServe.go | 39 ++-- dockerfiles/seed_node/Dockerfile | 2 +- dockerfiles/seed_node/startup_script.sh | 2 +- documentation/{seed-node.md => super-node.md} | 57 +++--- environments/seedNodeSubscription.toml | 2 +- environments/syncPublishScreenAndServe.toml | 5 + .../shared/streamer/seed_node_streamer.go | 34 ++-- .../transformer/seed_node_transformer.go | 4 +- pkg/config/subscription.go | 2 +- pkg/ipfs/mocks/converter.go | 21 ++ pkg/ipfs/mocks/publisher.go | 20 ++ pkg/ipfs/mocks/test_data.go | 2 +- pkg/ipfs/resolver.go | 18 +- pkg/ipfs/resolver_test.go | 26 +-- pkg/seed_node/filterer_test.go | 153 -------------- pkg/{seed_node => super_node}/api.go | 24 +-- pkg/super_node/backfiller.go | 135 +++++++++++++ pkg/super_node/backfiller_test.go | 187 ++++++++++++++++++ pkg/{seed_node => super_node}/filterer.go | 28 +-- pkg/super_node/filterer_test.go | 153 ++++++++++++++ .../mocks/repository.go | 4 +- pkg/super_node/mocks/retriever.go | 44 +++++ pkg/{seed_node => super_node}/repository.go | 2 +- .../repository_test.go | 20 +- pkg/{seed_node => super_node}/retriever.go | 26 ++- .../retriever_test.go | 30 +-- .../seed_node_suite_test.go | 4 +- pkg/{seed_node => super_node}/service.go | 61 +++--- pkg/{seed_node => super_node}/service_test.go | 11 +- pkg/{seed_node => super_node}/subscription.go | 6 +- pkg/{seed_node => super_node}/test_helpers.go | 2 +- 33 files changed, 843 insertions(+), 373 deletions(-) rename documentation/{seed-node.md => super-node.md} (80%) delete mode 100644 pkg/seed_node/filterer_test.go rename pkg/{seed_node => super_node}/api.go (78%) create mode 100644 pkg/super_node/backfiller.go create mode 100644 pkg/super_node/backfiller_test.go rename 
pkg/{seed_node => super_node}/filterer.go (91%) create mode 100644 pkg/super_node/filterer_test.go rename pkg/{seed_node => super_node}/mocks/repository.go (90%) create mode 100644 pkg/super_node/mocks/retriever.go rename pkg/{seed_node => super_node}/repository.go (99%) rename pkg/{seed_node => super_node}/repository_test.go (88%) rename pkg/{seed_node => super_node}/retriever.go (92%) rename pkg/{seed_node => super_node}/retriever_test.go (90%) rename pkg/{seed_node => super_node}/seed_node_suite_test.go (94%) rename pkg/{seed_node => super_node}/service.go (87%) rename pkg/{seed_node => super_node}/service_test.go (88%) rename pkg/{seed_node => super_node}/subscription.go (90%) rename pkg/{seed_node => super_node}/test_helpers.go (99%) diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go index b373c9c3..710d66ce 100644 --- a/cmd/streamSubscribe.go +++ b/cmd/streamSubscribe.go @@ -38,9 +38,9 @@ import ( // streamSubscribeCmd represents the streamSubscribe command var streamSubscribeCmd = &cobra.Command{ Use: "streamSubscribe", - Short: "This command is used to subscribe to the seed node stream with the provided filters", - Long: `This command is for demo and testing purposes and is used to subscribe to the seed node with the provided subscription configuration parameters. -It does not do anything with the data streamed from the seed node other than unpack it and print it out for demonstration purposes.`, + Short: "This command is used to subscribe to the super node stream with the provided filters", + Long: `This command is for demo and testing purposes and is used to subscribe to the super node with the provided subscription configuration parameters. 
+It does not do anything with the data streamed from the super node other than unpack it and print it out for demonstration purposes.`, Run: func(cmd *cobra.Command, args []string) { streamSubscribe() }, @@ -56,12 +56,12 @@ func streamSubscribe() { // Create a new rpc client and a subscription streamer with that client rpcClient := getRpcClient() - str := streamer.NewSeedNodeStreamer(rpcClient) + str := streamer.NewSuperNodeStreamer(rpcClient) // Buffered channel for reading subscription payloads - payloadChan := make(chan streamer.SeedNodePayload, 20000) + payloadChan := make(chan streamer.SuperNodePayload, 20000) - // Subscribe to the seed node service with the given config/filter parameters + // Subscribe to the super node service with the given config/filter parameters sub, err := str.Stream(payloadChan, subscriptionConfig) if err != nil { log.Fatal(err) @@ -217,7 +217,7 @@ func configureSubscription() { func getRpcClient() core.RpcClient { vulcPath := viper.GetString("subscription.path") if vulcPath == "" { - vulcPath = "ws://127.0.0.1:80" // default to and try the default ws url if no path is provided + vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided } rawRpcClient, err := rpc.Dial(vulcPath) if err != nil { diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go index c6a36b25..c68eecad 100644 --- a/cmd/syncAndPublish.go +++ b/cmd/syncAndPublish.go @@ -16,11 +16,13 @@ package cmd import ( + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "os" "path/filepath" syn "sync" + "time" - "github.com/vulcanize/vulcanizedb/pkg/seed_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node" "github.com/spf13/viper" @@ -51,44 +53,34 @@ it maintains a local index of the IPLD objects' CIDs in Postgres.`, }, } +var ipfsPath string + func init() { rootCmd.AddCommand(syncAndPublishCmd) } func syncAndPublish() { - blockChain, rpcClient := getBlockChainAndClient() - - db := utils.LoadPostgres(databaseConfig, blockChain.Node()) - 
quitChan := make(chan bool) - - ipfsPath := viper.GetString("client.ipfsPath") - if ipfsPath == "" { - home, err := os.UserHomeDir() + superNode, err := newSuperNode() + if err != nil { + log.Fatal(err) + } + wg := &syn.WaitGroup{} + err = superNode.SyncAndPublish(wg, nil, nil) + if err != nil { + log.Fatal(err) + } + if viper.GetBool("backfill.on") && viper.GetString("backfill.ipcPath") != "" { + backfiller := newBackFiller(superNode.GetPublisher()) if err != nil { log.Fatal(err) } - ipfsPath = filepath.Join(home, ".ipfs") - } - workers := viper.GetInt("client.workers") - if workers < 1 { - workers = 1 - } - processor, err := seed_node.NewSeedNode(ipfsPath, &db, rpcClient, quitChan, workers, blockChain.Node()) - if err != nil { - log.Fatal(err) - } - - wg := &syn.WaitGroup{} - err = processor.SyncAndPublish(wg, nil, nil) - if err != nil { - log.Fatal(err) + backfiller.FillGaps(wg, nil) } wg.Wait() // If an error was thrown, wg.Add was never called and this will fall through } -func getBlockChainAndClient() (*geth.BlockChain, core.RpcClient) { - rawRpcClient, err := rpc.Dial(ipc) - +func getBlockChainAndClient(path string) (*geth.BlockChain, core.RpcClient) { + rawRpcClient, err := rpc.Dial(path) if err != nil { log.Fatal(err) } @@ -100,3 +92,35 @@ func getBlockChainAndClient() (*geth.BlockChain, core.RpcClient) { blockChain := geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter) return blockChain, rpcClient } + +func newSuperNode() (super_node.NodeInterface, error) { + blockChain, rpcClient := getBlockChainAndClient(ipc) + db := utils.LoadPostgres(databaseConfig, blockChain.Node()) + quitChan := make(chan bool) + ipfsPath = viper.GetString("client.ipfsPath") + if ipfsPath == "" { + home, err := os.UserHomeDir() + if err != nil { + log.Fatal(err) + } + ipfsPath = filepath.Join(home, ".ipfs") + } + workers := viper.GetInt("client.workers") + if workers < 1 { + workers = 1 + } + return super_node.NewSuperNode(ipfsPath, &db, rpcClient, quitChan, 
workers, blockChain.Node()) +} + +func newBackFiller(ipfsPublisher ipfs.IPLDPublisher) super_node.BackFillInterface { + blockChain, archivalRpcClient := getBlockChainAndClient(viper.GetString("backfill.ipcPath")) + db := utils.LoadPostgres(databaseConfig, blockChain.Node()) + freq := viper.GetInt("backfill.frequency") + var frequency time.Duration + if freq <= 0 { + frequency = time.Minute * 5 + } else { + frequency = time.Duration(freq) + } + return super_node.NewBackFillService(ipfsPublisher, &db, archivalRpcClient, time.Minute*frequency) +} diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go index b0ef1ad7..00e4f557 100644 --- a/cmd/syncPublishScreenAndServe.go +++ b/cmd/syncPublishScreenAndServe.go @@ -20,15 +20,12 @@ import ( "path/filepath" syn "sync" - "github.com/vulcanize/vulcanizedb/pkg/seed_node" - "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/vulcanize/vulcanizedb/pkg/ipfs" - "github.com/vulcanize/vulcanizedb/utils" ) // syncPublishScreenAndServeCmd represents the syncPublishScreenAndServe command @@ -50,24 +47,7 @@ func init() { } func syncPublishScreenAndServe() { - blockChain, rpcClient := getBlockChainAndClient() - - db := utils.LoadPostgres(databaseConfig, blockChain.Node()) - quitChan := make(chan bool, 1) - - ipfsPath := viper.GetString("client.ipfsPath") - if ipfsPath == "" { - home, err := os.UserHomeDir() - if err != nil { - log.Fatal(err) - } - ipfsPath = filepath.Join(home, ".ipfs") - } - workers := viper.GetInt("client.workers") - if workers < 1 { - workers = 1 - } - processor, err := seed_node.NewSeedNode(ipfsPath, &db, rpcClient, quitChan, workers, blockChain.Node()) + superNode, err := newSuperNode() if err != nil { log.Fatal(err) } @@ -75,11 +55,18 @@ func syncPublishScreenAndServe() { wg := &syn.WaitGroup{} forwardPayloadChan := make(chan ipfs.IPLDPayload, 20000) forwardQuitChan := make(chan bool, 1) - err = 
processor.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan) + err = superNode.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan) if err != nil { log.Fatal(err) } - processor.ScreenAndServe(forwardPayloadChan, forwardQuitChan) + superNode.ScreenAndServe(forwardPayloadChan, forwardQuitChan) + if viper.GetBool("backfill.on") && viper.GetString("backfill.ipcPath") != "" { + backfiller := newBackFiller(superNode.GetPublisher()) + if err != nil { + log.Fatal(err) + } + backfiller.FillGaps(wg, nil) + } var ipcPath string ipcPath = viper.GetString("server.ipcPath") @@ -90,7 +77,7 @@ func syncPublishScreenAndServe() { } ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") } - _, _, err = rpc.StartIPCEndpoint(ipcPath, processor.APIs()) + _, _, err = rpc.StartIPCEndpoint(ipcPath, superNode.APIs()) if err != nil { log.Fatal(err) } @@ -98,11 +85,11 @@ func syncPublishScreenAndServe() { var wsEndpoint string wsEndpoint = viper.GetString("server.wsEndpoint") if wsEndpoint == "" { - wsEndpoint = "127.0.0.1:80" + wsEndpoint = "127.0.0.1:8080" } var exposeAll = true var wsOrigins []string = nil - _, _, err = rpc.StartWSEndpoint(wsEndpoint, processor.APIs(), []string{"vulcanizedb"}, wsOrigins, exposeAll) + _, _, err = rpc.StartWSEndpoint(wsEndpoint, superNode.APIs(), []string{"vulcanizedb"}, wsOrigins, exposeAll) if err != nil { log.Fatal(err) } diff --git a/dockerfiles/seed_node/Dockerfile b/dockerfiles/seed_node/Dockerfile index 1772ff4e..82f6cb41 100644 --- a/dockerfiles/seed_node/Dockerfile +++ b/dockerfiles/seed_node/Dockerfile @@ -61,7 +61,7 @@ USER $USER # chown first so dir is writable # note: using $USER is merged, but not in the stable release yet COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/$config_file config.toml -COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/dockerfiles/seed_node/startup_script.sh . 
+COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node/startup_script.sh . # keep binaries immutable COPY --from=builder /go/src/github.com/vulcanize/vulcanizedb/vulcanizedb vulcanizedb diff --git a/dockerfiles/seed_node/startup_script.sh b/dockerfiles/seed_node/startup_script.sh index b1bb75b4..203035c0 100755 --- a/dockerfiles/seed_node/startup_script.sh +++ b/dockerfiles/seed_node/startup_script.sh @@ -1,5 +1,5 @@ #!/bin/sh -# Runs the db migrations and starts the seed node services +# Runs the db migrations and starts the super node services # Exit if the variable tests fail set -e diff --git a/documentation/seed-node.md b/documentation/super-node.md similarity index 80% rename from documentation/seed-node.md rename to documentation/super-node.md index 5b4227b1..927a83db 100644 --- a/documentation/seed-node.md +++ b/documentation/super-node.md @@ -103,7 +103,7 @@ There are two commands to choose from: #### syncAndPublish -`syncAndPublih` performs the functions of the seed node- syncing data from Geth, converting them to IPLDs, +`syncAndPublish` performs the functions of the super node- syncing data from Geth, converting them to IPLDs, publishing those IPLDs to IPFS, and creating a local Postgres index to relate their CIDS to useful metadata. Usage: @@ -149,16 +149,29 @@ The config file for the `syncPublishScreenAndServe` command has two additional f [server] ipcPath = "/Users/user/.vulcanize/vulcanize.ipc" wsEndpoint = "127.0.0.1:80" + +[backfill] + on = false + ipcPath = "" + frequency = 5 ``` The additional `server.ipcPath` and `server.wsEndpoint` fields are used to set what ipc endpoint and ws url the `syncPublishScreenAndServe` rpc server will expose itself to subscribing transformers over, respectively. -Any valid and available path and endpoint is acceptable, but keep in mind that this path and endpoint need to be known by transformers for them to subscribe to the seed node.
+Any valid and available path and endpoint is acceptable, but keep in mind that this path and endpoint need to +be known by transformers for them to subscribe to the super node. + +Because the super node syncs data from a geth full node as it progresses through its block synchronization, there is potential +for the super node to miss data both at the beginning of the sync due to lag between initialization of the two processes and +anywhere throughout the sync if the processes are interrupted. The `backfill` config mapping is used to optionally configure +the super node with an archival geth client that exposes a `statediff.StateDiffAt` rpc endpoint, to enable it to fill in these data gaps. +`backfill.on` turns the backfill process on, the `backfill.ipcPath` is the rpc path for the archival geth node, and `backfill.frequency` +sets at what frequency (in minutes) the backfill process checks for and fills in gaps. ## Dockerfile Setup -The below provides step-by-step directions for how to setup the seed node using the provided Dockerfile on an AWS Linux AMI instance. +The below provides step-by-step directions for how to setup the super node using the provided Dockerfile on an AWS Linux AMI instance. Note that the instance will need sufficient memory and storage for this to work. 1. Install basic dependencies @@ -230,7 +243,7 @@ createdb vulcanize_public 8. Build and run the Docker image ``` -cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/seed_node +cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node docker build . 
docker run --network host -e VDB_PG_CONNECT=postgres://localhost:5432/vulcanize_public?sslmode=disable {IMAGE_ID} ``` @@ -241,8 +254,8 @@ docker run --network host -e VDB_PG_CONNECT=postgres://localhost:5432/vulcanize_ A transformer can subscribe to the `syncPublishScreenAndServe` service over its ipc or ws endpoints, when subscribing the transformer specifies which subsets of the synced data it is interested in and the server will forward only these data. -The `streamSubscribe` command serves as a simple demonstration/example of subscribing to the seed-node feed, it subscribes with a set of parameters -defined in the loaded config file, and prints the streamed data to stdout. To build transformers that subscribe to and use seed-node data, +The `streamSubscribe` command serves as a simple demonstration/example of subscribing to the super-node feed, it subscribes with a set of parameters +defined in the loaded config file, and prints the streamed data to stdout. To build transformers that subscribe to and use super-node data, the shared/libraries/streamer can be used. Usage: @@ -294,39 +307,39 @@ The config for `streamSubscribe` has the `subscribe` set of parameters, for exam intermediateNodes = false ``` -`subscription.path` is used to define the ws url OR ipc endpoint we will subscribe to the seed-node over -(the `server.ipcPath` or `server.wsEndpoint` that the seed-node has defined in their config file). +`subscription.path` is used to define the ws url OR ipc endpoint we will subscribe to the super-node over +(the `server.ipcPath` or `server.wsEndpoint` that the super-node has defined in their config file). -`subscription.backfill` specifies whether or not the seed-node should look up historical data in its cache and -send that to the subscriber, if this is set to `false` then the seed-node only forwards newly synced/incoming data. 
+`subscription.backfill` specifies whether or not the super-node should look up historical data in its cache and +send that to the subscriber, if this is set to `false` then the super-node only forwards newly synced/incoming data. -`subscription.backfillOnly` will tell the seed-node to only send historical data and not stream incoming data going forward. +`subscription.backfillOnly` will tell the super-node to only send historical data and not stream incoming data going forward. `subscription.startingBlock` is the starting block number for the range we want to receive data in. `subscription.endingBlock` is the ending block number for the range we want to receive data in; setting to 0 means there is no end/we will continue indefinitely. -`subscription.headerFilter` has two sub-options: `off` and `finalOnly`. Setting `off` to true tells the seed-node to -not send any headers to the subscriber; setting `finalOnly` to true tells the seed-node to send only canonical headers. +`subscription.headerFilter` has two sub-options: `off` and `finalOnly`. Setting `off` to true tells the super-node to +not send any headers to the subscriber; setting `finalOnly` to true tells the super-node to send only canonical headers. -`subscription.trxFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the seed-node to +`subscription.trxFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the super-node to not send any transactions to the subscriber; `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for, -if they have any addresses then the seed-node will only send transactions that were sent or received by the addresses contained +if they have any addresses then the super-node will only send transactions that were sent or received by the addresses contained in `src` and `dst`, respectively. -`subscription.receiptFilter` has two sub-options: `off` and `topics`. 
Setting `off` to true tells the seed-node to +`subscription.receiptFilter` has two sub-options: `off` and `topics`. Setting `off` to true tells the super-node to not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for, -if it has any topics then the seed-node will only send receipts that contain logs which have that topic0. +if it has any topics then the super-node will only send receipts that contain logs which have that topic0. -`subscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the seed-node to +`subscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the super-node to not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for, -if it has any addresses then the seed-node will only send state leafs (accounts) corresponding to those account addresses. By default the seed-node +if it has any addresses then the super-node will only send state leafs (accounts) corresponding to those account addresses. By default the super-node only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. -`subscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. Setting `off` to true tells the seed-node to +`subscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. Setting `off` to true tells the super-node to not send any storage data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter storage for, -if it has any addresses then the seed-node will only send storage nodes from the storage tries at those state addresses. 
`storageKeys` is another string +if it has any addresses then the super-node will only send storage nodes from the storage tries at those state addresses. `storageKeys` is another string array that can be filled with storage keys we want to filter storage data for. It is important to note that the storageKeys are the actual keccak256 hashes, whereas -the addresses in the `addresses` fields are the ETH addresses and not their keccak256 hashes that serve as the actual state keys. By default the seed-node +the addresses in the `addresses` fields are the ETH addresses and not their keccak256 hashes that serve as the actual state keys. By default the super-node only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. diff --git a/environments/seedNodeSubscription.toml b/environments/seedNodeSubscription.toml index 3f899939..d5142282 100644 --- a/environments/seedNodeSubscription.toml +++ b/environments/seedNodeSubscription.toml @@ -1,5 +1,5 @@ [subscription] - path = "ws://127.0.0.1:80" + path = "ws://seed0.20c.com:8080" backfill = true backfillOnly = false startingBlock = 0 diff --git a/environments/syncPublishScreenAndServe.toml b/environments/syncPublishScreenAndServe.toml index 4e8cafa9..fb939d37 100644 --- a/environments/syncPublishScreenAndServe.toml +++ b/environments/syncPublishScreenAndServe.toml @@ -11,3 +11,8 @@ [server] ipcPath = "/root/.vulcanize/vulcanize.ipc" wsEndpoint = "127.0.0.1:8080" + +[backfill] + on = false + ipcPath = "" + frequency = 5 diff --git a/libraries/shared/streamer/seed_node_streamer.go b/libraries/shared/streamer/seed_node_streamer.go index 8f33d007..2a61675b 100644 --- a/libraries/shared/streamer/seed_node_streamer.go +++ b/libraries/shared/streamer/seed_node_streamer.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
-// Streamer is used by watchers to stream eth data from a vulcanizedb seed node +// Streamer is used by watchers to stream eth data from a vulcanizedb super node package streamer import ( @@ -28,30 +28,30 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" ) -// ISeedNodeStreamer is the interface for streaming SeedNodePayloads from a vulcanizeDB seed node -type ISeedNodeStreamer interface { - Stream(payloadChan chan SeedNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) +// ISuperNodeStreamer is the interface for streaming SuperNodePayloads from a vulcanizeDB super node +type ISuperNodeStreamer interface { + Stream(payloadChan chan SuperNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) } -// SeedNodeStreamer is the underlying struct for the ISeedNodeStreamer interface -type SeedNodeStreamer struct { +// SuperNodeStreamer is the underlying struct for the ISuperNodeStreamer interface +type SuperNodeStreamer struct { Client core.RpcClient } -// NewSeedNodeStreamer creates a pointer to a new SeedNodeStreamer which satisfies the ISeedNodeStreamer interface -func NewSeedNodeStreamer(client core.RpcClient) *SeedNodeStreamer { - return &SeedNodeStreamer{ +// NewSuperNodeStreamer creates a pointer to a new SuperNodeStreamer which satisfies the ISuperNodeStreamer interface +func NewSuperNodeStreamer(client core.RpcClient) *SuperNodeStreamer { + return &SuperNodeStreamer{ Client: client, } } -// Stream is the main loop for subscribing to data from a vulcanizedb seed node -func (sds *SeedNodeStreamer) Stream(payloadChan chan SeedNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) { - return sds.Client.Subscribe("vulcanizedb", payloadChan, "stream", streamFilters) +// Stream is the main loop for subscribing to data from a vulcanizedb super node +func (sds *SuperNodeStreamer) Stream(payloadChan chan SuperNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) { 
+ return sds.Client.Subscribe("vdb", payloadChan, "stream", streamFilters) } -// Payload holds the data returned from the seed node to the requesting client -type SeedNodePayload struct { +// Payload holds the data returned from the super node to the requesting client +type SuperNodePayload struct { BlockNumber *big.Int `json:"blockNumber"` HeadersRlp [][]byte `json:"headersRlp"` UnclesRlp [][]byte `json:"unclesRlp"` @@ -65,20 +65,20 @@ type SeedNodePayload struct { err error } -func (sd *SeedNodePayload) ensureEncoded() { +func (sd *SuperNodePayload) ensureEncoded() { if sd.encoded == nil && sd.err == nil { sd.encoded, sd.err = json.Marshal(sd) } } // Length to implement Encoder interface for StateDiff -func (sd *SeedNodePayload) Length() int { +func (sd *SuperNodePayload) Length() int { sd.ensureEncoded() return len(sd.encoded) } // Encode to implement Encoder interface for StateDiff -func (sd *SeedNodePayload) Encode() ([]byte, error) { +func (sd *SuperNodePayload) Encode() ([]byte, error) { sd.ensureEncoded() return sd.encoded, sd.err } diff --git a/libraries/shared/transformer/seed_node_transformer.go b/libraries/shared/transformer/seed_node_transformer.go index bc9beb01..0564b60d 100644 --- a/libraries/shared/transformer/seed_node_transformer.go +++ b/libraries/shared/transformer/seed_node_transformer.go @@ -22,10 +22,10 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) -type SeedNodeTransformer interface { +type SuperNodeTransformer interface { Init() error Execute() error GetConfig() config.Subscription } -type SeedNodeTransformerInitializer func(db *postgres.DB, subCon config.Subscription, client core.RpcClient) SeedNodeTransformer +type SuperNodeTransformerInitializer func(db *postgres.DB, subCon config.Subscription, client core.RpcClient) SuperNodeTransformer diff --git a/pkg/config/subscription.go b/pkg/config/subscription.go index 6fcfad85..e6a160fb 100644 --- a/pkg/config/subscription.go +++ b/pkg/config/subscription.go @@ -18,7 
+18,7 @@ package config import "math/big" -// Subscription config is used by a subscribing transformer to specifiy which data to receive from the seed node +// Subscription config is used by a subscribing transformer to specify which data to receive from the super node type Subscription struct { BackFill bool BackFillOnly bool diff --git a/pkg/ipfs/mocks/converter.go b/pkg/ipfs/mocks/converter.go index 06462807..0aaa9b53 100644 --- a/pkg/ipfs/mocks/converter.go +++ b/pkg/ipfs/mocks/converter.go @@ -17,6 +17,8 @@ package mocks import ( + "fmt" + "github.com/ethereum/go-ethereum/statediff" "github.com/vulcanize/vulcanizedb/pkg/ipfs" @@ -34,3 +36,22 @@ func (pc *PayloadConverter) Convert(payload statediff.Payload) (*ipfs.IPLDPayloa pc.PassedStatediffPayload = payload return pc.ReturnIPLDPayload, pc.ReturnErr } + +// IterativePayloadConverter is the underlying struct for the Converter interface +type IterativePayloadConverter struct { + PassedStatediffPayload []statediff.Payload + ReturnIPLDPayload []*ipfs.IPLDPayload + ReturnErr error + iteration int +} + +// Convert method is used to convert a geth statediff.Payload to a IPLDPayload +func (pc *IterativePayloadConverter) Convert(payload statediff.Payload) (*ipfs.IPLDPayload, error) { + pc.PassedStatediffPayload = append(pc.PassedStatediffPayload, payload) + if len(pc.PassedStatediffPayload) < pc.iteration+1 { + return nil, fmt.Errorf("IterativePayloadConverter does not have a payload to return at iteration %d", pc.iteration) + } + returnPayload := pc.ReturnIPLDPayload[pc.iteration] + pc.iteration++ + return returnPayload, pc.ReturnErr +} diff --git a/pkg/ipfs/mocks/publisher.go b/pkg/ipfs/mocks/publisher.go index 3ee14b9b..ce31968a 100644 --- a/pkg/ipfs/mocks/publisher.go +++ b/pkg/ipfs/mocks/publisher.go @@ -17,6 +17,7 @@ package mocks import ( + "fmt" "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) @@ -32,3 +33,22 @@ func (pub *IPLDPublisher) Publish(payload *ipfs.IPLDPayload) (*ipfs.CIDPayload, pub.PassedIPLDPayload 
= payload return pub.ReturnCIDPayload, pub.ReturnErr } + +// IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing +type IterativeIPLDPublisher struct { + PassedIPLDPayload []*ipfs.IPLDPayload + ReturnCIDPayload []*ipfs.CIDPayload + ReturnErr error + iteration int +} + +// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload +func (pub *IterativeIPLDPublisher) Publish(payload *ipfs.IPLDPayload) (*ipfs.CIDPayload, error) { + pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, payload) + if len(pub.ReturnCIDPayload) < pub.iteration+1 { + return nil, fmt.Errorf("IterativeIPLDPublisher does not have a payload to return at iteration %d", pub.iteration) + } + returnPayload := pub.ReturnCIDPayload[pub.iteration] + pub.iteration++ + return returnPayload, pub.ReturnErr +} diff --git a/pkg/ipfs/mocks/test_data.go b/pkg/ipfs/mocks/test_data.go index fe2936d0..2f6c25ff 100644 --- a/pkg/ipfs/mocks/test_data.go +++ b/pkg/ipfs/mocks/test_data.go @@ -309,7 +309,7 @@ var ( }, } - MockSeeNodePayload = streamer.SeedNodePayload{ + MockSeeNodePayload = streamer.SuperNodePayload{ BlockNumber: big.NewInt(1), HeadersRlp: [][]byte{MockHeaderRlp}, TransactionsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)}, diff --git a/pkg/ipfs/resolver.go b/pkg/ipfs/resolver.go index b5d9d37d..0178edc7 100644 --- a/pkg/ipfs/resolver.go +++ b/pkg/ipfs/resolver.go @@ -24,7 +24,7 @@ import ( // IPLDResolver is the interface to resolving IPLDs type IPLDResolver interface { - ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SeedNodePayload, error) + ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SuperNodePayload, error) } // EthIPLDResolver is the underlying struct to support the IPLDResolver interface @@ -36,8 +36,8 @@ func NewIPLDResolver() *EthIPLDResolver { } // ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper -func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks 
IPLDWrapper) (streamer.SeedNodePayload, error) { - response := &streamer.SeedNodePayload{ +func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SuperNodePayload, error) { + response := &streamer.SuperNodePayload{ BlockNumber: ipfsBlocks.BlockNumber, StateNodesRlp: make(map[common.Hash][]byte), StorageNodesRlp: make(map[common.Hash]map[common.Hash][]byte), @@ -51,42 +51,42 @@ func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SeedN return *response, nil } -func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *streamer.SeedNodePayload) { +func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *streamer.SuperNodePayload) { for _, block := range blocks { raw := block.RawData() response.HeadersRlp = append(response.HeadersRlp, raw) } } -func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *streamer.SeedNodePayload) { +func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *streamer.SuperNodePayload) { for _, block := range blocks { raw := block.RawData() response.UnclesRlp = append(response.UnclesRlp, raw) } } -func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *streamer.SeedNodePayload) { +func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *streamer.SuperNodePayload) { for _, block := range blocks { raw := block.RawData() response.TransactionsRlp = append(response.TransactionsRlp, raw) } } -func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *streamer.SeedNodePayload) { +func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *streamer.SuperNodePayload) { for _, block := range blocks { raw := block.RawData() response.ReceiptsRlp = append(response.ReceiptsRlp, raw) } } -func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *streamer.SeedNodePayload) { +func (eir *EthIPLDResolver) resolveState(blocks 
map[common.Hash]blocks.Block, response *streamer.SuperNodePayload) { for key, block := range blocks { raw := block.RawData() response.StateNodesRlp[key] = raw } } -func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *streamer.SeedNodePayload) { +func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *streamer.SuperNodePayload) { for stateKey, storageBlocks := range blocks { response.StorageNodesRlp[stateKey] = make(map[common.Hash][]byte) for storageKey, storageVal := range storageBlocks { diff --git a/pkg/ipfs/resolver_test.go b/pkg/ipfs/resolver_test.go index 3f8a4163..1c07f388 100644 --- a/pkg/ipfs/resolver_test.go +++ b/pkg/ipfs/resolver_test.go @@ -22,7 +22,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" - "github.com/vulcanize/vulcanizedb/pkg/seed_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node" ) var ( @@ -35,19 +35,19 @@ var _ = Describe("Resolver", func() { resolver = ipfs.NewIPLDResolver() }) It("Resolves IPLD data to their correct geth data types and packages them to send to requesting transformers", func() { - seedNodePayload, err := resolver.ResolveIPLDs(mocks.MockIPLDWrapper) + superNodePayload, err := resolver.ResolveIPLDs(mocks.MockIPLDWrapper) Expect(err).ToNot(HaveOccurred()) - Expect(seedNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) - Expect(seedNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp)) - Expect(seedNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp)) - Expect(len(seedNodePayload.TransactionsRlp)).To(Equal(2)) - Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) - Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) - 
Expect(len(seedNodePayload.ReceiptsRlp)).To(Equal(2)) - Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(0))).To(BeTrue()) - Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(1))).To(BeTrue()) - Expect(len(seedNodePayload.StateNodesRlp)).To(Equal(2)) - Expect(seedNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp)) + Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp)) + Expect(superNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp)) + Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2)) + Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) + Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) + Expect(len(superNodePayload.ReceiptsRlp)).To(Equal(2)) + Expect(super_node.ListContainsBytes(superNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(0))).To(BeTrue()) + Expect(super_node.ListContainsBytes(superNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(1))).To(BeTrue()) + Expect(len(superNodePayload.StateNodesRlp)).To(Equal(2)) + Expect(superNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp)) }) }) }) diff --git a/pkg/seed_node/filterer_test.go b/pkg/seed_node/filterer_test.go deleted file mode 100644 index cb99ebeb..00000000 --- a/pkg/seed_node/filterer_test.go +++ /dev/null @@ -1,153 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. 
- -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package seed_node_test - -import ( - "bytes" - - "github.com/ethereum/go-ethereum/core/types" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" - "github.com/vulcanize/vulcanizedb/pkg/seed_node" -) - -var ( - filterer seed_node.ResponseFilterer - expectedRctForStorageRLP1 []byte - expectedRctForStorageRLP2 []byte -) - -var _ = Describe("Filterer", func() { - Describe("FilterResponse", func() { - BeforeEach(func() { - filterer = seed_node.NewResponseFilterer() - expectedRctForStorageRLP1 = getReceiptForStorageRLP(mocks.MockReceipts, 0) - expectedRctForStorageRLP2 = getReceiptForStorageRLP(mocks.MockReceipts, 1) - }) - - It("Transcribes all the data from the IPLDPayload into the SeedNodePayload if given an open filter", func() { - seedNodePayload, err := filterer.FilterResponse(openFilter, *mocks.MockIPLDPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(seedNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) - Expect(seedNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp)) - Expect(seedNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp)) - Expect(len(seedNodePayload.TransactionsRlp)).To(Equal(2)) - Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) - Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) - Expect(len(seedNodePayload.ReceiptsRlp)).To(Equal(2)) - Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, 
expectedRctForStorageRLP1)).To(BeTrue()) - Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue()) - Expect(len(seedNodePayload.StateNodesRlp)).To(Equal(2)) - Expect(seedNodePayload.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes)) - Expect(seedNodePayload.StateNodesRlp[mocks.AnotherContractLeafKey]).To(Equal(mocks.AnotherValueBytes)) - Expect(seedNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp)) - }) - - It("Applies filters from the provided config.Subscription", func() { - seedNodePayload1, err := filterer.FilterResponse(rctContractFilter, *mocks.MockIPLDPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(seedNodePayload1.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) - Expect(len(seedNodePayload1.HeadersRlp)).To(Equal(0)) - Expect(len(seedNodePayload1.UnclesRlp)).To(Equal(0)) - Expect(len(seedNodePayload1.TransactionsRlp)).To(Equal(0)) - Expect(len(seedNodePayload1.StorageNodesRlp)).To(Equal(0)) - Expect(len(seedNodePayload1.StateNodesRlp)).To(Equal(0)) - Expect(len(seedNodePayload1.ReceiptsRlp)).To(Equal(1)) - Expect(seedNodePayload1.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) - - seedNodePayload2, err := filterer.FilterResponse(rctTopicsFilter, *mocks.MockIPLDPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(seedNodePayload2.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) - Expect(len(seedNodePayload2.HeadersRlp)).To(Equal(0)) - Expect(len(seedNodePayload2.UnclesRlp)).To(Equal(0)) - Expect(len(seedNodePayload2.TransactionsRlp)).To(Equal(0)) - Expect(len(seedNodePayload2.StorageNodesRlp)).To(Equal(0)) - Expect(len(seedNodePayload2.StateNodesRlp)).To(Equal(0)) - Expect(len(seedNodePayload2.ReceiptsRlp)).To(Equal(1)) - Expect(seedNodePayload2.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1)) - - seedNodePayload3, err := filterer.FilterResponse(rctTopicsAndContractFilter, 
*mocks.MockIPLDPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(seedNodePayload3.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) - Expect(len(seedNodePayload3.HeadersRlp)).To(Equal(0)) - Expect(len(seedNodePayload3.UnclesRlp)).To(Equal(0)) - Expect(len(seedNodePayload3.TransactionsRlp)).To(Equal(0)) - Expect(len(seedNodePayload3.StorageNodesRlp)).To(Equal(0)) - Expect(len(seedNodePayload3.StateNodesRlp)).To(Equal(0)) - Expect(len(seedNodePayload3.ReceiptsRlp)).To(Equal(1)) - Expect(seedNodePayload3.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1)) - - seedNodePayload4, err := filterer.FilterResponse(rctContractsAndTopicFilter, *mocks.MockIPLDPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(seedNodePayload4.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) - Expect(len(seedNodePayload4.HeadersRlp)).To(Equal(0)) - Expect(len(seedNodePayload4.UnclesRlp)).To(Equal(0)) - Expect(len(seedNodePayload4.TransactionsRlp)).To(Equal(0)) - Expect(len(seedNodePayload4.StorageNodesRlp)).To(Equal(0)) - Expect(len(seedNodePayload4.StateNodesRlp)).To(Equal(0)) - Expect(len(seedNodePayload4.ReceiptsRlp)).To(Equal(1)) - Expect(seedNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) - - seedNodePayload5, err := filterer.FilterResponse(rctsForAllCollectedTrxs, *mocks.MockIPLDPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(seedNodePayload5.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) - Expect(len(seedNodePayload5.HeadersRlp)).To(Equal(0)) - Expect(len(seedNodePayload5.UnclesRlp)).To(Equal(0)) - Expect(len(seedNodePayload5.TransactionsRlp)).To(Equal(2)) - Expect(seed_node.ListContainsBytes(seedNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) - Expect(seed_node.ListContainsBytes(seedNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) - Expect(len(seedNodePayload5.StorageNodesRlp)).To(Equal(0)) - 
Expect(len(seedNodePayload5.StateNodesRlp)).To(Equal(0)) - Expect(len(seedNodePayload5.ReceiptsRlp)).To(Equal(2)) - Expect(seed_node.ListContainsBytes(seedNodePayload5.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue()) - Expect(seed_node.ListContainsBytes(seedNodePayload5.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue()) - - seedNodePayload6, err := filterer.FilterResponse(rctsForSelectCollectedTrxs, *mocks.MockIPLDPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(seedNodePayload6.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) - Expect(len(seedNodePayload6.HeadersRlp)).To(Equal(0)) - Expect(len(seedNodePayload6.UnclesRlp)).To(Equal(0)) - Expect(len(seedNodePayload6.TransactionsRlp)).To(Equal(1)) - Expect(seed_node.ListContainsBytes(seedNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) - Expect(len(seedNodePayload6.StorageNodesRlp)).To(Equal(0)) - Expect(len(seedNodePayload6.StateNodesRlp)).To(Equal(0)) - Expect(len(seedNodePayload6.ReceiptsRlp)).To(Equal(1)) - Expect(seedNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) - - seedNodePayload7, err := filterer.FilterResponse(stateFilter, *mocks.MockIPLDPayload) - Expect(err).ToNot(HaveOccurred()) - Expect(seedNodePayload7.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) - Expect(len(seedNodePayload7.HeadersRlp)).To(Equal(0)) - Expect(len(seedNodePayload7.UnclesRlp)).To(Equal(0)) - Expect(len(seedNodePayload7.TransactionsRlp)).To(Equal(0)) - Expect(len(seedNodePayload7.StorageNodesRlp)).To(Equal(0)) - Expect(len(seedNodePayload7.ReceiptsRlp)).To(Equal(0)) - Expect(len(seedNodePayload7.StateNodesRlp)).To(Equal(1)) - Expect(seedNodePayload7.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes)) - }) - }) -}) - -func getReceiptForStorageRLP(receipts types.Receipts, i int) []byte { - receiptForStorage := (*types.ReceiptForStorage)(receipts[i]) - receiptBuffer := new(bytes.Buffer) - err := 
receiptForStorage.EncodeRLP(receiptBuffer) - Expect(err).ToNot(HaveOccurred()) - return receiptBuffer.Bytes() -} diff --git a/pkg/seed_node/api.go b/pkg/super_node/api.go similarity index 78% rename from pkg/seed_node/api.go rename to pkg/super_node/api.go index 623bcb9a..c3ba5a3f 100644 --- a/pkg/seed_node/api.go +++ b/pkg/super_node/api.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package seed_node +package super_node import ( "context" @@ -28,25 +28,25 @@ import ( ) // APIName is the namespace used for the state diffing service API -const APIName = "vulcanizedb" +const APIName = "vdb" // APIVersion is the version of the state diffing service API const APIVersion = "0.0.1" -// PublicSeedNodeAPI is the public api for the seed node -type PublicSeedNodeAPI struct { +// PublicSuperNodeAPI is the public api for the super node +type PublicSuperNodeAPI struct { sni NodeInterface } -// NewPublicSeedNodeAPI creates a new PublicSeedNodeAPI with the provided underlying SyncPublishScreenAndServe process -func NewPublicSeedNodeAPI(seedNodeInterface NodeInterface) *PublicSeedNodeAPI { - return &PublicSeedNodeAPI{ - sni: seedNodeInterface, +// NewPublicSuperNodeAPI creates a new PublicSuperNodeAPI with the provided underlying SyncPublishScreenAndServe process +func NewPublicSuperNodeAPI(superNodeInterface NodeInterface) *PublicSuperNodeAPI { + return &PublicSuperNodeAPI{ + sni: superNodeInterface, } } // Stream is the public method to setup a subscription that fires off SyncPublishScreenAndServe payloads as they are created -func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.Subscription) (*rpc.Subscription, error) { +func (api *PublicSuperNodeAPI) Stream(ctx context.Context, streamFilters config.Subscription) (*rpc.Subscription, error) { // ensure that the RPC connection supports subscriptions notifier, supported := rpc.NotifierFromContext(ctx) if 
!supported { @@ -58,7 +58,7 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S go func() { // subscribe to events from the SyncPublishScreenAndServe service - payloadChannel := make(chan streamer.SeedNodePayload, payloadChanBufferSize) + payloadChannel := make(chan streamer.SuperNodePayload, payloadChanBufferSize) quitChan := make(chan bool, 1) go api.sni.Subscribe(rpcSub.ID, payloadChannel, quitChan, streamFilters) @@ -84,7 +84,7 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S return rpcSub, nil } -// Node is a public rpc method to allow transformers to fetch the Geth node info for the seed node -func (api *PublicSeedNodeAPI) Node() core.Node { +// Node is a public rpc method to allow transformers to fetch the Geth node info for the super node +func (api *PublicSuperNodeAPI) Node() core.Node { return api.sni.Node() } diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go new file mode 100644 index 00000000..324a2cc7 --- /dev/null +++ b/pkg/super_node/backfiller.go @@ -0,0 +1,135 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +package super_node + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/params" + log "github.com/sirupsen/logrus" + + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" +) + +// BackFillInterface for filling in gaps in the super node +type BackFillInterface interface { + // Method for the super node to periodically check for and fill in gaps in its data using an archival node + FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) +} + +// BackFillService for filling in gaps in the super node +type BackFillService struct { + // Interface for converting statediff payloads into ETH-IPLD object payloads + Converter ipfs.PayloadConverter + // Interface for publishing the ETH-IPLD payloads to IPFS + Publisher ipfs.IPLDPublisher + // Interface for indexing the CIDs of the published ETH-IPLDs in Postgres + Repository CIDRepository + // Interface for searching and retrieving CIDs from Postgres index + Retriever CIDRetriever + // State-diff fetcher; needs to be configured with an archival core.RpcClient + StateDiffFetcher fetcher.IStateDiffFetcher + // Check frequency + GapCheckFrequency time.Duration +} + +// NewBackFillService returns a new BackFillInterface +func NewBackFillService(ipfsPublisher ipfs.IPLDPublisher, db *postgres.DB, archivalNodeRpcClient core.RpcClient, freq time.Duration) BackFillInterface { + return &BackFillService{ + Repository: NewCIDRepository(db), + Converter: ipfs.NewPayloadConverter(params.MainnetChainConfig), + Publisher: ipfsPublisher, + Retriever: NewCIDRetriever(db), + StateDiffFetcher: fetcher.NewStateDiffFetcher(archivalNodeRpcClient), + GapCheckFrequency: freq, + } +} + +// FillGaps periodically checks for and fills in gaps in the super node db +// this requires a core.RpcClient that is pointed at an archival node with the StateDiffAt method exposed +func (bfs 
*BackFillService) FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) {
+	ticker := time.NewTicker(bfs.GapCheckFrequency)
+	wg.Add(1)
+
+	go func() {
+		for {
+			select {
+			case <-quitChan:
+				log.Info("quitting FillGaps process")
+				wg.Done()
+				return
+			case <-ticker.C:
+				log.Info("searching for gaps in the super node database")
+				startingBlock, firstBlockErr := bfs.Retriever.RetrieveFirstBlockNumber()
+				if firstBlockErr != nil {
+					log.Error(firstBlockErr)
+					continue
+				}
+				if startingBlock != 1 {
+					startingGap := [2]int64{
+						1,
+						startingBlock - 1,
+					}
+					log.Info("found gap at the beginning of the sync")
+					bfs.fillGaps(startingGap)
+				}
+
+				gaps, gapErr := bfs.Retriever.RetrieveGapsInData()
+				if gapErr != nil {
+					log.Error(gapErr)
+					continue
+				}
+				for _, gap := range gaps {
+					bfs.fillGaps(gap)
+				}
+			}
+		}
+	}()
+}
+
+func (bfs *BackFillService) fillGaps(gap [2]int64) {
+	log.Infof("filling in gap from block %d to block %d", gap[0], gap[1])
+	blockHeights := make([]uint64, 0, gap[1]-gap[0]+1)
+	for i := gap[0]; i <= gap[1]; i++ {
+		blockHeights = append(blockHeights, uint64(i))
+	}
+	payloads, fetchErr := bfs.StateDiffFetcher.FetchStateDiffsAt(blockHeights)
+	if fetchErr != nil {
+		log.Error(fetchErr)
+		return
+	}
+	for _, payload := range payloads {
+		ipldPayload, convertErr := bfs.Converter.Convert(*payload)
+		if convertErr != nil {
+			log.Error(convertErr)
+			continue
+		}
+		cidPayload, publishErr := bfs.Publisher.Publish(ipldPayload)
+		if publishErr != nil {
+			log.Error(publishErr)
+			continue
+		}
+		indexErr := bfs.Repository.Index(cidPayload)
+		if indexErr != nil {
+			log.Error(indexErr)
+		}
+	}
+}
diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go
new file mode 100644
index 00000000..20fb3e74
--- /dev/null
+++ b/pkg/super_node/backfiller_test.go
@@ -0,0 +1,187 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public
License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package super_node_test + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/statediff" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + mocks2 "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" + "github.com/vulcanize/vulcanizedb/pkg/super_node" + mocks3 "github.com/vulcanize/vulcanizedb/pkg/super_node/mocks" +) + +var _ = Describe("BackFiller", func() { + Describe("FillGaps", func() { + It("Periodically checks for and fills in gaps in the super node's data", func() { + mockCidRepo := &mocks3.CIDRepository{ + ReturnErr: nil, + } + mockPublisher := &mocks.IterativeIPLDPublisher{ + ReturnCIDPayload: []*ipfs.CIDPayload{mocks.MockCIDPayload, mocks.MockCIDPayload}, + ReturnErr: nil, + } + mockConverter := &mocks.IterativePayloadConverter{ + ReturnIPLDPayload: []*ipfs.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload}, + ReturnErr: nil, + } + mockRetriever := &mocks3.MockCIDRetriever{ + FirstBlockNumberToReturn: 1, + GapsToRetrieve: [][2]int64{ + { + 100, 101, + }, + }, + } + mockFetcher := &mocks2.MockStateDiffFetcher{ + StateDiffsToReturn: map[uint64]*statediff.Payload{ + 100: &mocks.MockStateDiffPayload, + 101: &mocks.MockStateDiffPayload, + }, + } + backfiller := &super_node.BackFillService{ + Repository: mockCidRepo, + Publisher: mockPublisher, + Converter: mockConverter, + StateDiffFetcher: mockFetcher, + Retriever: 
mockRetriever, + GapCheckFrequency: time.Second * 10, + } + wg := &sync.WaitGroup{} + quitChan := make(chan bool, 1) + backfiller.FillGaps(wg, quitChan) + time.Sleep(time.Second * 15) + quitChan <- true + Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2)) + Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) + Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload)) + Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2)) + Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload)) + Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockIPLDPayload)) + Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(2)) + Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload)) + Expect(mockConverter.PassedStatediffPayload[1]).To(Equal(mocks.MockStateDiffPayload)) + Expect(mockRetriever.CalledTimes).To(Equal(1)) + Expect(len(mockFetcher.CalledAtBlockHeights)).To(Equal(1)) + Expect(mockFetcher.CalledAtBlockHeights[0]).To(Equal([]uint64{100, 101})) + }) + + It("Works for single block `ranges`", func() { + mockCidRepo := &mocks3.CIDRepository{ + ReturnErr: nil, + } + mockPublisher := &mocks.IterativeIPLDPublisher{ + ReturnCIDPayload: []*ipfs.CIDPayload{mocks.MockCIDPayload}, + ReturnErr: nil, + } + mockConverter := &mocks.IterativePayloadConverter{ + ReturnIPLDPayload: []*ipfs.IPLDPayload{mocks.MockIPLDPayload}, + ReturnErr: nil, + } + mockRetriever := &mocks3.MockCIDRetriever{ + FirstBlockNumberToReturn: 1, + GapsToRetrieve: [][2]int64{ + { + 100, 100, + }, + }, + } + mockFetcher := &mocks2.MockStateDiffFetcher{ + StateDiffsToReturn: map[uint64]*statediff.Payload{ + 100: &mocks.MockStateDiffPayload, + }, + } + backfiller := &super_node.BackFillService{ + Repository: mockCidRepo, + Publisher: mockPublisher, + Converter: mockConverter, + StateDiffFetcher: mockFetcher, + Retriever: mockRetriever, + GapCheckFrequency: time.Second * 10, + } + wg := &sync.WaitGroup{} + quitChan := 
make(chan bool, 1) + backfiller.FillGaps(wg, quitChan) + time.Sleep(time.Second * 15) + quitChan <- true + Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1)) + Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) + Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(1)) + Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload)) + Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(1)) + Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload)) + Expect(mockRetriever.CalledTimes).To(Equal(1)) + Expect(len(mockFetcher.CalledAtBlockHeights)).To(Equal(1)) + Expect(mockFetcher.CalledAtBlockHeights[0]).To(Equal([]uint64{100})) + }) + + It("Finds beginning gap", func() { + mockCidRepo := &mocks3.CIDRepository{ + ReturnErr: nil, + } + mockPublisher := &mocks.IterativeIPLDPublisher{ + ReturnCIDPayload: []*ipfs.CIDPayload{mocks.MockCIDPayload, mocks.MockCIDPayload}, + ReturnErr: nil, + } + mockConverter := &mocks.IterativePayloadConverter{ + ReturnIPLDPayload: []*ipfs.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload}, + ReturnErr: nil, + } + mockRetriever := &mocks3.MockCIDRetriever{ + FirstBlockNumberToReturn: 3, + GapsToRetrieve: [][2]int64{}, + } + mockFetcher := &mocks2.MockStateDiffFetcher{ + StateDiffsToReturn: map[uint64]*statediff.Payload{ + 1: &mocks.MockStateDiffPayload, + 2: &mocks.MockStateDiffPayload, + }, + } + backfiller := &super_node.BackFillService{ + Repository: mockCidRepo, + Publisher: mockPublisher, + Converter: mockConverter, + StateDiffFetcher: mockFetcher, + Retriever: mockRetriever, + GapCheckFrequency: time.Second * 10, + } + wg := &sync.WaitGroup{} + quitChan := make(chan bool, 1) + backfiller.FillGaps(wg, quitChan) + time.Sleep(time.Second * 15) + quitChan <- true + Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2)) + Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) + 
Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload)) + Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2)) + Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload)) + Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockIPLDPayload)) + Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(2)) + Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload)) + Expect(mockConverter.PassedStatediffPayload[1]).To(Equal(mocks.MockStateDiffPayload)) + Expect(mockRetriever.CalledTimes).To(Equal(1)) + Expect(len(mockFetcher.CalledAtBlockHeights)).To(Equal(1)) + Expect(mockFetcher.CalledAtBlockHeights[0]).To(Equal([]uint64{1, 2})) + }) + }) +}) diff --git a/pkg/seed_node/filterer.go b/pkg/super_node/filterer.go similarity index 91% rename from pkg/seed_node/filterer.go rename to pkg/super_node/filterer.go index 0f93e97c..e63c1880 100644 --- a/pkg/seed_node/filterer.go +++ b/pkg/super_node/filterer.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
-package seed_node +package super_node import ( "bytes" @@ -30,7 +30,7 @@ import ( // ResponseFilterer is the inteface used to screen eth data and package appropriate data into a response payload type ResponseFilterer interface { - FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SeedNodePayload, error) + FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SuperNodePayload, error) } // Filterer is the underlying struct for the ResponseFilterer interface @@ -42,33 +42,33 @@ func NewResponseFilterer() *Filterer { } // FilterResponse is used to filter through eth data to extract and package requested data into a Payload -func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SeedNodePayload, error) { - response := new(streamer.SeedNodePayload) +func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SuperNodePayload, error) { + response := new(streamer.SuperNodePayload) err := s.filterHeaders(streamFilters, response, payload) if err != nil { - return streamer.SeedNodePayload{}, err + return streamer.SuperNodePayload{}, err } txHashes, err := s.filterTransactions(streamFilters, response, payload) if err != nil { - return streamer.SeedNodePayload{}, err + return streamer.SuperNodePayload{}, err } err = s.filerReceipts(streamFilters, response, payload, txHashes) if err != nil { - return streamer.SeedNodePayload{}, err + return streamer.SuperNodePayload{}, err } err = s.filterState(streamFilters, response, payload) if err != nil { - return streamer.SeedNodePayload{}, err + return streamer.SuperNodePayload{}, err } err = s.filterStorage(streamFilters, response, payload) if err != nil { - return streamer.SeedNodePayload{}, err + return streamer.SuperNodePayload{}, err } response.BlockNumber = payload.BlockNumber return *response, nil } -func (s *Filterer) filterHeaders(streamFilters config.Subscription, 
response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error { +func (s *Filterer) filterHeaders(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP) if !streamFilters.HeaderFilter.FinalOnly { @@ -91,7 +91,7 @@ func checkRange(start, end, actual int64) bool { return false } -func (s *Filterer) filterTransactions(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) ([]common.Hash, error) { +func (s *Filterer) filterTransactions(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) ([]common.Hash, error) { trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions)) if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { for i, trx := range payload.BlockBody.Transactions { @@ -127,7 +127,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin return false } -func (s *Filterer) filerReceipts(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload, trxHashes []common.Hash) error { +func (s *Filterer) filerReceipts(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload, trxHashes []common.Hash) error { if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { for i, receipt := range payload.Receipts { if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, streamFilters.ReceiptFilter.Contracts, payload.ReceiptMetaData[i].ContractAddress, trxHashes) { @@ -186,7 
+186,7 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContrac return false } -func (s *Filterer) filterState(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error { +func (s *Filterer) filterState(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { response.StateNodesRlp = make(map[common.Hash][]byte) keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses)) @@ -218,7 +218,7 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool { return false } -func (s *Filterer) filterStorage(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error { +func (s *Filterer) filterStorage(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte) stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses)) diff --git a/pkg/super_node/filterer_test.go b/pkg/super_node/filterer_test.go new file mode 100644 index 00000000..a1c0fabd --- /dev/null +++ b/pkg/super_node/filterer_test.go @@ -0,0 +1,153 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+ +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package super_node_test + +import ( + "bytes" + + "github.com/ethereum/go-ethereum/core/types" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" + "github.com/vulcanize/vulcanizedb/pkg/super_node" +) + +var ( + filterer super_node.ResponseFilterer + expectedRctForStorageRLP1 []byte + expectedRctForStorageRLP2 []byte +) + +var _ = Describe("Filterer", func() { + Describe("FilterResponse", func() { + BeforeEach(func() { + filterer = super_node.NewResponseFilterer() + expectedRctForStorageRLP1 = getReceiptForStorageRLP(mocks.MockReceipts, 0) + expectedRctForStorageRLP2 = getReceiptForStorageRLP(mocks.MockReceipts, 1) + }) + + It("Transcribes all the data from the IPLDPayload into the SuperNodePayload if given an open filter", func() { + superNodePayload, err := filterer.FilterResponse(openFilter, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp)) + Expect(superNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp)) + Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2)) + Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) + Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) + Expect(len(superNodePayload.ReceiptsRlp)).To(Equal(2)) + 
Expect(super_node.ListContainsBytes(superNodePayload.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue()) + Expect(super_node.ListContainsBytes(superNodePayload.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue()) + Expect(len(superNodePayload.StateNodesRlp)).To(Equal(2)) + Expect(superNodePayload.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes)) + Expect(superNodePayload.StateNodesRlp[mocks.AnotherContractLeafKey]).To(Equal(mocks.AnotherValueBytes)) + Expect(superNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp)) + }) + + It("Applies filters from the provided config.Subscription", func() { + superNodePayload1, err := filterer.FilterResponse(rctContractFilter, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(superNodePayload1.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(superNodePayload1.HeadersRlp)).To(Equal(0)) + Expect(len(superNodePayload1.UnclesRlp)).To(Equal(0)) + Expect(len(superNodePayload1.TransactionsRlp)).To(Equal(0)) + Expect(len(superNodePayload1.StorageNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload1.StateNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload1.ReceiptsRlp)).To(Equal(1)) + Expect(superNodePayload1.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) + + superNodePayload2, err := filterer.FilterResponse(rctTopicsFilter, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(superNodePayload2.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(superNodePayload2.HeadersRlp)).To(Equal(0)) + Expect(len(superNodePayload2.UnclesRlp)).To(Equal(0)) + Expect(len(superNodePayload2.TransactionsRlp)).To(Equal(0)) + Expect(len(superNodePayload2.StorageNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload2.StateNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload2.ReceiptsRlp)).To(Equal(1)) + Expect(superNodePayload2.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1)) + + 
superNodePayload3, err := filterer.FilterResponse(rctTopicsAndContractFilter, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(superNodePayload3.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(superNodePayload3.HeadersRlp)).To(Equal(0)) + Expect(len(superNodePayload3.UnclesRlp)).To(Equal(0)) + Expect(len(superNodePayload3.TransactionsRlp)).To(Equal(0)) + Expect(len(superNodePayload3.StorageNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload3.StateNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload3.ReceiptsRlp)).To(Equal(1)) + Expect(superNodePayload3.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1)) + + superNodePayload4, err := filterer.FilterResponse(rctContractsAndTopicFilter, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(superNodePayload4.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(superNodePayload4.HeadersRlp)).To(Equal(0)) + Expect(len(superNodePayload4.UnclesRlp)).To(Equal(0)) + Expect(len(superNodePayload4.TransactionsRlp)).To(Equal(0)) + Expect(len(superNodePayload4.StorageNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload4.StateNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload4.ReceiptsRlp)).To(Equal(1)) + Expect(superNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) + + superNodePayload5, err := filterer.FilterResponse(rctsForAllCollectedTrxs, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(superNodePayload5.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(superNodePayload5.HeadersRlp)).To(Equal(0)) + Expect(len(superNodePayload5.UnclesRlp)).To(Equal(0)) + Expect(len(superNodePayload5.TransactionsRlp)).To(Equal(2)) + Expect(super_node.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) + Expect(super_node.ListContainsBytes(superNodePayload5.TransactionsRlp, 
mocks.MockTransactions.GetRlp(1))).To(BeTrue()) + Expect(len(superNodePayload5.StorageNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload5.StateNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload5.ReceiptsRlp)).To(Equal(2)) + Expect(super_node.ListContainsBytes(superNodePayload5.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue()) + Expect(super_node.ListContainsBytes(superNodePayload5.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue()) + + superNodePayload6, err := filterer.FilterResponse(rctsForSelectCollectedTrxs, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(superNodePayload6.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(superNodePayload6.HeadersRlp)).To(Equal(0)) + Expect(len(superNodePayload6.UnclesRlp)).To(Equal(0)) + Expect(len(superNodePayload6.TransactionsRlp)).To(Equal(1)) + Expect(super_node.ListContainsBytes(superNodePayload6.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) + Expect(len(superNodePayload6.StorageNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload6.StateNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload6.ReceiptsRlp)).To(Equal(1)) + Expect(superNodePayload6.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) + + superNodePayload7, err := filterer.FilterResponse(stateFilter, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(superNodePayload7.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(superNodePayload7.HeadersRlp)).To(Equal(0)) + Expect(len(superNodePayload7.UnclesRlp)).To(Equal(0)) + Expect(len(superNodePayload7.TransactionsRlp)).To(Equal(0)) + Expect(len(superNodePayload7.StorageNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload7.ReceiptsRlp)).To(Equal(0)) + Expect(len(superNodePayload7.StateNodesRlp)).To(Equal(1)) + Expect(superNodePayload7.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes)) + }) + }) +}) + +func getReceiptForStorageRLP(receipts types.Receipts, 
i int) []byte { + receiptForStorage := (*types.ReceiptForStorage)(receipts[i]) + receiptBuffer := new(bytes.Buffer) + err := receiptForStorage.EncodeRLP(receiptBuffer) + Expect(err).ToNot(HaveOccurred()) + return receiptBuffer.Bytes() +} diff --git a/pkg/seed_node/mocks/repository.go b/pkg/super_node/mocks/repository.go similarity index 90% rename from pkg/seed_node/mocks/repository.go rename to pkg/super_node/mocks/repository.go index dc06ccc9..4c37f468 100644 --- a/pkg/seed_node/mocks/repository.go +++ b/pkg/super_node/mocks/repository.go @@ -20,12 +20,12 @@ import "github.com/vulcanize/vulcanizedb/pkg/ipfs" // CIDRepository is the underlying struct for the Repository interface type CIDRepository struct { - PassedCIDPayload *ipfs.CIDPayload + PassedCIDPayload []*ipfs.CIDPayload ReturnErr error } // Index indexes a cidPayload in Postgres func (repo *CIDRepository) Index(cidPayload *ipfs.CIDPayload) error { - repo.PassedCIDPayload = cidPayload + repo.PassedCIDPayload = append(repo.PassedCIDPayload, cidPayload) return repo.ReturnErr } diff --git a/pkg/super_node/mocks/retriever.go b/pkg/super_node/mocks/retriever.go new file mode 100644 index 00000000..2fafe933 --- /dev/null +++ b/pkg/super_node/mocks/retriever.go @@ -0,0 +1,44 @@ +package mocks + +import ( + "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" +) + +// MockCIDRetriever is a mock CID retriever for use in tests +type MockCIDRetriever struct { + GapsToRetrieve [][2]int64 + GapsToRetrieveErr error + CalledTimes int + FirstBlockNumberToReturn int64 + RetrieveFirstBlockNumberErr error +} + +// RetrieveCIDs mock method +func (*MockCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) { + panic("implement me") +} + +// RetrieveLastBlockNumber mock method +func (*MockCIDRetriever) RetrieveLastBlockNumber() (int64, error) { + panic("implement me") +} + +// RetrieveFirstBlockNumber mock method +func (mcr 
*MockCIDRetriever) RetrieveFirstBlockNumber() (int64, error) { + return mcr.FirstBlockNumberToReturn, mcr.RetrieveFirstBlockNumberErr +} + +// RetrieveGapsInData mock method +func (mcr *MockCIDRetriever) RetrieveGapsInData() ([][2]int64, error) { + mcr.CalledTimes++ + return mcr.GapsToRetrieve, mcr.GapsToRetrieveErr +} + +// SetGapsToRetrieve mock method +func (mcr *MockCIDRetriever) SetGapsToRetrieve(gaps [][2]int64) { + if mcr.GapsToRetrieve == nil { + mcr.GapsToRetrieve = make([][2]int64, 0) + } + mcr.GapsToRetrieve = append(mcr.GapsToRetrieve, gaps...) +} diff --git a/pkg/seed_node/repository.go b/pkg/super_node/repository.go similarity index 99% rename from pkg/seed_node/repository.go rename to pkg/super_node/repository.go index f6efacee..ea4fecf6 100644 --- a/pkg/seed_node/repository.go +++ b/pkg/super_node/repository.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package seed_node +package super_node import ( "github.com/jmoiron/sqlx" diff --git a/pkg/seed_node/repository_test.go b/pkg/super_node/repository_test.go similarity index 88% rename from pkg/seed_node/repository_test.go rename to pkg/super_node/repository_test.go index 9eb6bdbb..816797d6 100644 --- a/pkg/seed_node/repository_test.go +++ b/pkg/super_node/repository_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package seed_node_test +package super_node_test import ( . 
"github.com/onsi/ginkgo" @@ -23,23 +23,23 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" - "github.com/vulcanize/vulcanizedb/pkg/seed_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node" ) var ( db *postgres.DB err error - repo seed_node.CIDRepository + repo super_node.CIDRepository ) var _ = Describe("Repository", func() { BeforeEach(func() { - db, err = seed_node.SetupDB() + db, err = super_node.SetupDB() Expect(err).ToNot(HaveOccurred()) - repo = seed_node.NewCIDRepository(db) + repo = super_node.NewCIDRepository(db) }) AfterEach(func() { - seed_node.TearDownDB(db) + super_node.TearDownDB(db) }) Describe("Index", func() { @@ -61,8 +61,8 @@ var _ = Describe("Repository", func() { err = db.Select(&trxs, pgStr, 1) Expect(err).ToNot(HaveOccurred()) Expect(len(trxs)).To(Equal(2)) - Expect(seed_node.ListContainsString(trxs, "mockTrxCID1")).To(BeTrue()) - Expect(seed_node.ListContainsString(trxs, "mockTrxCID2")).To(BeTrue()) + Expect(super_node.ListContainsString(trxs, "mockTrxCID1")).To(BeTrue()) + Expect(super_node.ListContainsString(trxs, "mockTrxCID2")).To(BeTrue()) // check receipts were properly indexed rcts := make([]string, 0) pgStr = `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids @@ -72,8 +72,8 @@ var _ = Describe("Repository", func() { err = db.Select(&rcts, pgStr, 1) Expect(err).ToNot(HaveOccurred()) Expect(len(rcts)).To(Equal(2)) - Expect(seed_node.ListContainsString(rcts, "mockRctCID1")).To(BeTrue()) - Expect(seed_node.ListContainsString(rcts, "mockRctCID2")).To(BeTrue()) + Expect(super_node.ListContainsString(rcts, "mockRctCID1")).To(BeTrue()) + Expect(super_node.ListContainsString(rcts, "mockRctCID2")).To(BeTrue()) // check that state nodes were properly indexed stateNodes := make([]ipfs.StateNodeCID, 0) pgStr = `SELECT state_cids.cid, state_cids.state_key, state_cids.leaf FROM state_cids INNER JOIN 
header_cids ON (state_cids.header_id = header_cids.id) diff --git a/pkg/seed_node/retriever.go b/pkg/super_node/retriever.go similarity index 92% rename from pkg/seed_node/retriever.go rename to pkg/super_node/retriever.go index 9799ef29..36dfef47 100644 --- a/pkg/seed_node/retriever.go +++ b/pkg/super_node/retriever.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package seed_node +package super_node import ( "math/big" @@ -33,6 +33,7 @@ type CIDRetriever interface { RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) RetrieveLastBlockNumber() (int64, error) RetrieveFirstBlockNumber() (int64, error) + RetrieveGapsInData() ([][2]int64, error) } // EthCIDRetriever is the underlying struct supporting the CIDRetriever interface @@ -303,3 +304,26 @@ func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters confi err := tx.Select(&storageNodeCIDs, pgStr, args...) 
return storageNodeCIDs, err } + +type gap struct { + Start int64 `db:"start"` + Stop int64 `db:"stop"` +} + +func (ecr *EthCIDRetriever) RetrieveGapsInData() ([][2]int64, error) { + pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM header_cids + LEFT JOIN header_cids r on header_cids.block_number = r.block_number - 1 + LEFT JOIN header_cids fr on header_cids.block_number < fr.block_number + WHERE r.block_number is NULL and fr.block_number IS NOT NULL + GROUP BY header_cids.block_number, r.block_number` + gaps := make([]gap, 0) + err := ecr.db.Select(&gaps, pgStr) + if err != nil { + return nil, err + } + gapRanges := make([][2]int64, 0) + for _, gap := range gaps { + gapRanges = append(gapRanges, [2]int64{gap.Start, gap.Stop}) + } + return gapRanges, nil +} diff --git a/pkg/seed_node/retriever_test.go b/pkg/super_node/retriever_test.go similarity index 90% rename from pkg/seed_node/retriever_test.go rename to pkg/super_node/retriever_test.go index 5789fc5c..de38ed65 100644 --- a/pkg/seed_node/retriever_test.go +++ b/pkg/super_node/retriever_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
-package seed_node_test +package super_node_test import ( "math/big" @@ -25,11 +25,11 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" - "github.com/vulcanize/vulcanizedb/pkg/seed_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node" ) var ( - retriever seed_node.CIDRetriever + retriever super_node.CIDRetriever openFilter = config.Subscription{ StartingBlock: big.NewInt(0), EndingBlock: big.NewInt(1), @@ -178,15 +178,15 @@ var ( var _ = Describe("Retriever", func() { BeforeEach(func() { - db, err = seed_node.SetupDB() + db, err = super_node.SetupDB() Expect(err).ToNot(HaveOccurred()) - repo = seed_node.NewCIDRepository(db) + repo = super_node.NewCIDRepository(db) err = repo.Index(mocks.MockCIDPayload) Expect(err).ToNot(HaveOccurred()) - retriever = seed_node.NewCIDRetriever(db) + retriever = super_node.NewCIDRetriever(db) }) AfterEach(func() { - seed_node.TearDownDB(db) + super_node.TearDownDB(db) }) Describe("RetrieveCIDs", func() { @@ -197,11 +197,11 @@ var _ = Describe("Retriever", func() { Expect(len(cidWrapper.Headers)).To(Equal(1)) Expect(cidWrapper.Headers).To(Equal(mocks.MockCIDWrapper.Headers)) Expect(len(cidWrapper.Transactions)).To(Equal(2)) - Expect(seed_node.ListContainsString(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[0])).To(BeTrue()) - Expect(seed_node.ListContainsString(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[1])).To(BeTrue()) + Expect(super_node.ListContainsString(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[0])).To(BeTrue()) + Expect(super_node.ListContainsString(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[1])).To(BeTrue()) Expect(len(cidWrapper.Receipts)).To(Equal(2)) - Expect(seed_node.ListContainsString(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[0])).To(BeTrue()) - Expect(seed_node.ListContainsString(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[1])).To(BeTrue()) 
+ Expect(super_node.ListContainsString(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[0])).To(BeTrue()) + Expect(super_node.ListContainsString(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[1])).To(BeTrue()) Expect(len(cidWrapper.StateNodes)).To(Equal(2)) for _, stateNode := range cidWrapper.StateNodes { if stateNode.CID == "mockStateCID1" { @@ -265,13 +265,13 @@ var _ = Describe("Retriever", func() { Expect(cidWrapper5.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cidWrapper5.Headers)).To(Equal(0)) Expect(len(cidWrapper5.Transactions)).To(Equal(2)) - Expect(seed_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID1")).To(BeTrue()) - Expect(seed_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID2")).To(BeTrue()) + Expect(super_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID1")).To(BeTrue()) + Expect(super_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID2")).To(BeTrue()) Expect(len(cidWrapper5.StateNodes)).To(Equal(0)) Expect(len(cidWrapper5.StorageNodes)).To(Equal(0)) Expect(len(cidWrapper5.Receipts)).To(Equal(2)) - Expect(seed_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID1")).To(BeTrue()) - Expect(seed_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID2")).To(BeTrue()) + Expect(super_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID1")).To(BeTrue()) + Expect(super_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID2")).To(BeTrue()) cidWrapper6, err := retriever.RetrieveCIDs(rctsForSelectCollectedTrxs, 1) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/seed_node/seed_node_suite_test.go b/pkg/super_node/seed_node_suite_test.go similarity index 94% rename from pkg/seed_node/seed_node_suite_test.go rename to pkg/super_node/seed_node_suite_test.go index 533d3664..83b84e4b 100644 --- a/pkg/seed_node/seed_node_suite_test.go +++ b/pkg/super_node/seed_node_suite_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License 
// along with this program. If not, see . -package seed_node_test +package super_node_test import ( "io/ioutil" @@ -25,7 +25,7 @@ import ( "github.com/sirupsen/logrus" ) -func TestSeedNode(t *testing.T) { +func TestSuperNode(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Seed Node Suite Test") } diff --git a/pkg/seed_node/service.go b/pkg/super_node/service.go similarity index 87% rename from pkg/seed_node/service.go rename to pkg/super_node/service.go index 4588f5be..28feb405 100644 --- a/pkg/seed_node/service.go +++ b/pkg/super_node/service.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package seed_node +package super_node import ( "sync" @@ -36,7 +36,9 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) -const payloadChanBufferSize = 20000 // the max eth sub buffer size +const ( + payloadChanBufferSize = 20000 // the max eth sub buffer size +) // NodeInterface is the top level interface for streaming, converting to IPLDs, publishing, // and indexing all Ethereum data; screening this data; and serving it up to subscribed clients @@ -49,14 +51,17 @@ type NodeInterface interface { // Main event loop for handling client pub-sub ScreenAndServe(screenAndServePayload <-chan ipfs.IPLDPayload, screenAndServeQuit <-chan bool) // Method to subscribe to receive state diff processing output - Subscribe(id rpc.ID, sub chan<- streamer.SeedNodePayload, quitChan chan<- bool, streamFilters config.Subscription) + Subscribe(id rpc.ID, sub chan<- streamer.SuperNodePayload, quitChan chan<- bool, streamFilters config.Subscription) // Method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) // Method to access the Geth node info for this service Node() core.Node + // Method used to retrieve the underlying IPFS publisher for this service, so that it can be used for backfilling + // This is needed because it's not possible to initialize two ipfs nodes at the same 
path + GetPublisher() ipfs.IPLDPublisher } -// Service is the underlying struct for the SyncAndPublish interface +// Service is the underlying struct for the super node type Service struct { // Used to sync access to the Subscriptions sync.Mutex @@ -71,7 +76,7 @@ type Service struct { // Interface for filtering and serving data according to subscribed clients according to their specification Filterer ResponseFilterer // Interface for fetching ETH-IPLD objects from IPFS - Fetcher ipfs.IPLDFetcher + IPLDFetcher ipfs.IPLDFetcher // Interface for searching and retrieving CIDs from Postgres index Retriever CIDRetriever // Interface for resolving ipfs blocks to their data types @@ -86,17 +91,17 @@ type Service struct { SubscriptionTypes map[common.Hash]config.Subscription // Number of workers WorkerPoolSize int - // Info for the Geth node that this seed node is working with + // Info for the Geth node that this super node is working with gethNode core.Node } -// NewSeedNode creates a new seed_node.Interface using an underlying seed_node.Service struct -func NewSeedNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc chan bool, workers int, node core.Node) (NodeInterface, error) { +// NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct +func NewSuperNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc chan bool, workers int, node core.Node) (NodeInterface, error) { publisher, err := ipfs.NewIPLDPublisher(ipfsPath) if err != nil { return nil, err } - fetcher, err := ipfs.NewIPLDFetcher(ipfsPath) + ipldFetcher, err := ipfs.NewIPLDFetcher(ipfsPath) if err != nil { return nil, err } @@ -106,7 +111,7 @@ func NewSeedNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc Converter: ipfs.NewPayloadConverter(params.MainnetChainConfig), Publisher: publisher, Filterer: NewResponseFilterer(), - Fetcher: fetcher, + IPLDFetcher: ipldFetcher, Retriever: NewCIDRetriever(db), Resolver: 
ipfs.NewIPLDResolver(), PayloadChan: make(chan statediff.Payload, payloadChanBufferSize), @@ -123,13 +128,13 @@ func (sap *Service) Protocols() []p2p.Protocol { return []p2p.Protocol{} } -// APIs returns the RPC descriptors the StateDiffingService offers +// APIs returns the RPC descriptors the super node service offers func (sap *Service) APIs() []rpc.API { return []rpc.API{ { Namespace: APIName, Version: APIVersion, - Service: NewPublicSeedNodeAPI(sap), + Service: NewPublicSuperNodeAPI(sap), Public: true, }, } @@ -256,7 +261,7 @@ func (sap *Service) sendResponse(payload ipfs.IPLDPayload) error { for id, sub := range subs { select { case sub.PayloadChan <- response: - log.Infof("sending seed node payload to subscription %s", id) + log.Infof("sending super node payload to subscription %s", id) default: log.Infof("unable to send payload to subscription %s; channel has no receiver", id) } @@ -267,8 +272,8 @@ func (sap *Service) sendResponse(payload ipfs.IPLDPayload) error { } // Subscribe is used by the API to subscribe to the service loop -func (sap *Service) Subscribe(id rpc.ID, sub chan<- streamer.SeedNodePayload, quitChan chan<- bool, streamFilters config.Subscription) { - log.Info("Subscribing to the seed node service") +func (sap *Service) Subscribe(id rpc.ID, sub chan<- streamer.SuperNodePayload, quitChan chan<- bool, streamFilters config.Subscription) { + log.Info("Subscribing to the super node service") // Subscription type is defined as the hash of its content // Group subscriptions by type and screen payloads once for subs of the same type by, err := rlp.EncodeToBytes(streamFilters) @@ -305,7 +310,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio var err error startingBlock, err = sap.Retriever.RetrieveFirstBlockNumber() if err != nil { - sub.PayloadChan <- streamer.SeedNodePayload{ + sub.PayloadChan <- streamer.SuperNodePayload{ ErrMsg: "unable to set block range start; error: " + err.Error(), } } @@ -314,7 +319,7 @@ 
func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio } endingBlock, err = sap.Retriever.RetrieveLastBlockNumber() if err != nil { - sub.PayloadChan <- streamer.SeedNodePayload{ + sub.PayloadChan <- streamer.SuperNodePayload{ ErrMsg: "unable to set block range end; error: " + err.Error(), } } @@ -327,10 +332,10 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio // the blocknumbers in the payloads they receive to keep things in order // TODO: separate backfill into a different rpc subscription method altogether? go func() { - for i := con.StartingBlock.Int64(); i <= endingBlock; i++ { + for i := startingBlock; i <= endingBlock; i++ { cidWrapper, err := sap.Retriever.RetrieveCIDs(con, i) if err != nil { - sub.PayloadChan <- streamer.SeedNodePayload{ + sub.PayloadChan <- streamer.SuperNodePayload{ ErrMsg: "CID retrieval error: " + err.Error(), } continue @@ -338,10 +343,10 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio if ipfs.EmptyCIDWrapper(*cidWrapper) { continue } - blocksWrapper, err := sap.Fetcher.FetchCIDs(*cidWrapper) + blocksWrapper, err := sap.IPLDFetcher.FetchCIDs(*cidWrapper) if err != nil { log.Error(err) - sub.PayloadChan <- streamer.SeedNodePayload{ + sub.PayloadChan <- streamer.SuperNodePayload{ ErrMsg: "IPLD fetching error: " + err.Error(), } continue @@ -349,14 +354,14 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper) if err != nil { log.Error(err) - sub.PayloadChan <- streamer.SeedNodePayload{ + sub.PayloadChan <- streamer.SuperNodePayload{ ErrMsg: "IPLD resolving error: " + err.Error(), } continue } select { case sub.PayloadChan <- backFillIplds: - log.Infof("sending seed node back-fill payload to subscription %s", id) + log.Infof("sending super node back-fill payload to subscription %s", id) default: log.Infof("unable to send back-fill payload to 
subscription %s; channel has no receiver", id) } @@ -366,7 +371,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio // Unsubscribe is used to unsubscribe to the StateDiffingService loop func (sap *Service) Unsubscribe(id rpc.ID) { - log.Info("Unsubscribing from the seed node service") + log.Info("Unsubscribing from the super node service") sap.Lock() for ty := range sap.Subscriptions { delete(sap.Subscriptions[ty], id) @@ -381,7 +386,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) { // Start is used to begin the service func (sap *Service) Start(*p2p.Server) error { - log.Info("Starting seed node service") + log.Info("Starting super node service") wg := new(sync.WaitGroup) payloadChan := make(chan ipfs.IPLDPayload, payloadChanBufferSize) quitChan := make(chan bool, 1) @@ -394,7 +399,7 @@ func (sap *Service) Start(*p2p.Server) error { // Stop is used to close down the service func (sap *Service) Stop() error { - log.Info("Stopping seed node service") + log.Info("Stopping super node service") close(sap.QuitChan) return nil } @@ -421,3 +426,7 @@ func (sap *Service) close() { } sap.Unlock() } + +func (sap *Service) GetPublisher() ipfs.IPLDPublisher { + return sap.Publisher +} diff --git a/pkg/seed_node/service_test.go b/pkg/super_node/service_test.go similarity index 88% rename from pkg/seed_node/service_test.go rename to pkg/super_node/service_test.go index f3a86345..e2cb4059 100644 --- a/pkg/seed_node/service_test.go +++ b/pkg/super_node/service_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
-package seed_node_test +package super_node_test import ( "sync" @@ -27,8 +27,8 @@ import ( mocks2 "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" - "github.com/vulcanize/vulcanizedb/pkg/seed_node" - mocks3 "github.com/vulcanize/vulcanizedb/pkg/seed_node/mocks" + "github.com/vulcanize/vulcanizedb/pkg/super_node" + mocks3 "github.com/vulcanize/vulcanizedb/pkg/super_node/mocks" ) var _ = Describe("Service", func() { @@ -55,7 +55,7 @@ var _ = Describe("Service", func() { ReturnIPLDPayload: mocks.MockIPLDPayload, ReturnErr: nil, } - processor := &seed_node.Service{ + processor := &super_node.Service{ Repository: mockCidRepo, Publisher: mockPublisher, Streamer: mockStreamer, @@ -70,7 +70,8 @@ var _ = Describe("Service", func() { quitChan <- true wg.Wait() Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload)) - Expect(mockCidRepo.PassedCIDPayload).To(Equal(mocks.MockCIDPayload)) + Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1)) + Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockIPLDPayload)) Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan)) }) diff --git a/pkg/seed_node/subscription.go b/pkg/super_node/subscription.go similarity index 90% rename from pkg/seed_node/subscription.go rename to pkg/super_node/subscription.go index 441d8d70..d7d91787 100644 --- a/pkg/seed_node/subscription.go +++ b/pkg/super_node/subscription.go @@ -14,14 +14,14 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
-package seed_node +package super_node import ( "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" ) -// Subscription holds the information for an individual client subscription to the seed node +// Subscription holds the information for an individual client subscription to the super node type Subscription struct { - PayloadChan chan<- streamer.SeedNodePayload + PayloadChan chan<- streamer.SuperNodePayload QuitChan chan<- bool } diff --git a/pkg/seed_node/test_helpers.go b/pkg/super_node/test_helpers.go similarity index 99% rename from pkg/seed_node/test_helpers.go rename to pkg/super_node/test_helpers.go index 8c03a1eb..db5edd5d 100644 --- a/pkg/seed_node/test_helpers.go +++ b/pkg/super_node/test_helpers.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package seed_node +package super_node import ( "bytes"