diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go
index b373c9c3..710d66ce 100644
--- a/cmd/streamSubscribe.go
+++ b/cmd/streamSubscribe.go
@@ -38,9 +38,9 @@ import (
// streamSubscribeCmd represents the streamSubscribe command
var streamSubscribeCmd = &cobra.Command{
Use: "streamSubscribe",
- Short: "This command is used to subscribe to the seed node stream with the provided filters",
- Long: `This command is for demo and testing purposes and is used to subscribe to the seed node with the provided subscription configuration parameters.
-It does not do anything with the data streamed from the seed node other than unpack it and print it out for demonstration purposes.`,
+ Short: "This command is used to subscribe to the super node stream with the provided filters",
+ Long: `This command is for demo and testing purposes and is used to subscribe to the super node with the provided subscription configuration parameters.
+It does not do anything with the data streamed from the super node other than unpack it and print it out for demonstration purposes.`,
Run: func(cmd *cobra.Command, args []string) {
streamSubscribe()
},
@@ -56,12 +56,12 @@ func streamSubscribe() {
// Create a new rpc client and a subscription streamer with that client
rpcClient := getRpcClient()
- str := streamer.NewSeedNodeStreamer(rpcClient)
+ str := streamer.NewSuperNodeStreamer(rpcClient)
// Buffered channel for reading subscription payloads
- payloadChan := make(chan streamer.SeedNodePayload, 20000)
+ payloadChan := make(chan streamer.SuperNodePayload, 20000)
- // Subscribe to the seed node service with the given config/filter parameters
+ // Subscribe to the super node service with the given config/filter parameters
sub, err := str.Stream(payloadChan, subscriptionConfig)
if err != nil {
log.Fatal(err)
@@ -217,7 +217,7 @@ func configureSubscription() {
func getRpcClient() core.RpcClient {
vulcPath := viper.GetString("subscription.path")
if vulcPath == "" {
- vulcPath = "ws://127.0.0.1:80" // default to and try the default ws url if no path is provided
+ vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided
}
rawRpcClient, err := rpc.Dial(vulcPath)
if err != nil {
diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go
index c6a36b25..c68eecad 100644
--- a/cmd/syncAndPublish.go
+++ b/cmd/syncAndPublish.go
@@ -16,11 +16,13 @@
package cmd
import (
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"os"
"path/filepath"
syn "sync"
+ "time"
- "github.com/vulcanize/vulcanizedb/pkg/seed_node"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/spf13/viper"
@@ -51,44 +53,34 @@ it maintains a local index of the IPLD objects' CIDs in Postgres.`,
},
}
+var ipfsPath string
+
func init() {
rootCmd.AddCommand(syncAndPublishCmd)
}
func syncAndPublish() {
- blockChain, rpcClient := getBlockChainAndClient()
-
- db := utils.LoadPostgres(databaseConfig, blockChain.Node())
- quitChan := make(chan bool)
-
- ipfsPath := viper.GetString("client.ipfsPath")
- if ipfsPath == "" {
- home, err := os.UserHomeDir()
+ superNode, err := newSuperNode()
+ if err != nil {
+ log.Fatal(err)
+ }
+ wg := &syn.WaitGroup{}
+ err = superNode.SyncAndPublish(wg, nil, nil)
+ if err != nil {
+ log.Fatal(err)
+ }
+ if viper.GetBool("backfill.on") && viper.GetString("backfill.ipcPath") != "" {
+ backfiller := newBackFiller(superNode.GetPublisher())
if err != nil {
log.Fatal(err)
}
- ipfsPath = filepath.Join(home, ".ipfs")
- }
- workers := viper.GetInt("client.workers")
- if workers < 1 {
- workers = 1
- }
- processor, err := seed_node.NewSeedNode(ipfsPath, &db, rpcClient, quitChan, workers, blockChain.Node())
- if err != nil {
- log.Fatal(err)
- }
-
- wg := &syn.WaitGroup{}
- err = processor.SyncAndPublish(wg, nil, nil)
- if err != nil {
- log.Fatal(err)
+ backfiller.FillGaps(wg, nil)
}
wg.Wait() // If an error was thrown, wg.Add was never called and this will fall through
}
-func getBlockChainAndClient() (*geth.BlockChain, core.RpcClient) {
- rawRpcClient, err := rpc.Dial(ipc)
-
+func getBlockChainAndClient(path string) (*geth.BlockChain, core.RpcClient) {
+ rawRpcClient, err := rpc.Dial(path)
if err != nil {
log.Fatal(err)
}
@@ -100,3 +92,35 @@ func getBlockChainAndClient() (*geth.BlockChain, core.RpcClient) {
blockChain := geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter)
return blockChain, rpcClient
}
+
+func newSuperNode() (super_node.NodeInterface, error) {
+ blockChain, rpcClient := getBlockChainAndClient(ipc)
+ db := utils.LoadPostgres(databaseConfig, blockChain.Node())
+ quitChan := make(chan bool)
+ ipfsPath = viper.GetString("client.ipfsPath")
+ if ipfsPath == "" {
+ home, err := os.UserHomeDir()
+ if err != nil {
+ log.Fatal(err)
+ }
+ ipfsPath = filepath.Join(home, ".ipfs")
+ }
+ workers := viper.GetInt("client.workers")
+ if workers < 1 {
+ workers = 1
+ }
+ return super_node.NewSuperNode(ipfsPath, &db, rpcClient, quitChan, workers, blockChain.Node())
+}
+
+func newBackFiller(ipfsPublisher ipfs.IPLDPublisher) super_node.BackFillInterface {
+ blockChain, archivalRpcClient := getBlockChainAndClient(viper.GetString("backfill.ipcPath"))
+ db := utils.LoadPostgres(databaseConfig, blockChain.Node())
+ freq := viper.GetInt("backfill.frequency")
+ var frequency time.Duration
+ if freq <= 0 {
+ frequency = time.Minute * 5
+ } else {
+ frequency = time.Duration(freq)
+ }
+ return super_node.NewBackFillService(ipfsPublisher, &db, archivalRpcClient, time.Minute*frequency)
+}
diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go
index b0ef1ad7..00e4f557 100644
--- a/cmd/syncPublishScreenAndServe.go
+++ b/cmd/syncPublishScreenAndServe.go
@@ -20,15 +20,12 @@ import (
"path/filepath"
syn "sync"
- "github.com/vulcanize/vulcanizedb/pkg/seed_node"
-
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
- "github.com/vulcanize/vulcanizedb/utils"
)
// syncPublishScreenAndServeCmd represents the syncPublishScreenAndServe command
@@ -50,24 +47,7 @@ func init() {
}
func syncPublishScreenAndServe() {
- blockChain, rpcClient := getBlockChainAndClient()
-
- db := utils.LoadPostgres(databaseConfig, blockChain.Node())
- quitChan := make(chan bool, 1)
-
- ipfsPath := viper.GetString("client.ipfsPath")
- if ipfsPath == "" {
- home, err := os.UserHomeDir()
- if err != nil {
- log.Fatal(err)
- }
- ipfsPath = filepath.Join(home, ".ipfs")
- }
- workers := viper.GetInt("client.workers")
- if workers < 1 {
- workers = 1
- }
- processor, err := seed_node.NewSeedNode(ipfsPath, &db, rpcClient, quitChan, workers, blockChain.Node())
+ superNode, err := newSuperNode()
if err != nil {
log.Fatal(err)
}
@@ -75,11 +55,18 @@ func syncPublishScreenAndServe() {
wg := &syn.WaitGroup{}
forwardPayloadChan := make(chan ipfs.IPLDPayload, 20000)
forwardQuitChan := make(chan bool, 1)
- err = processor.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan)
+ err = superNode.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan)
if err != nil {
log.Fatal(err)
}
- processor.ScreenAndServe(forwardPayloadChan, forwardQuitChan)
+ superNode.ScreenAndServe(forwardPayloadChan, forwardQuitChan)
+ if viper.GetBool("backfill.on") && viper.GetString("backfill.ipcPath") != "" {
+ backfiller := newBackFiller(superNode.GetPublisher())
+ if err != nil {
+ log.Fatal(err)
+ }
+ backfiller.FillGaps(wg, nil)
+ }
var ipcPath string
ipcPath = viper.GetString("server.ipcPath")
@@ -90,7 +77,7 @@ func syncPublishScreenAndServe() {
}
ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc")
}
- _, _, err = rpc.StartIPCEndpoint(ipcPath, processor.APIs())
+ _, _, err = rpc.StartIPCEndpoint(ipcPath, superNode.APIs())
if err != nil {
log.Fatal(err)
}
@@ -98,11 +85,11 @@ func syncPublishScreenAndServe() {
var wsEndpoint string
wsEndpoint = viper.GetString("server.wsEndpoint")
if wsEndpoint == "" {
- wsEndpoint = "127.0.0.1:80"
+ wsEndpoint = "127.0.0.1:8080"
}
var exposeAll = true
var wsOrigins []string = nil
- _, _, err = rpc.StartWSEndpoint(wsEndpoint, processor.APIs(), []string{"vulcanizedb"}, wsOrigins, exposeAll)
+ _, _, err = rpc.StartWSEndpoint(wsEndpoint, superNode.APIs(), []string{"vulcanizedb"}, wsOrigins, exposeAll)
if err != nil {
log.Fatal(err)
}
diff --git a/dockerfiles/seed_node/Dockerfile b/dockerfiles/seed_node/Dockerfile
index 1772ff4e..82f6cb41 100644
--- a/dockerfiles/seed_node/Dockerfile
+++ b/dockerfiles/seed_node/Dockerfile
@@ -61,7 +61,7 @@ USER $USER
# chown first so dir is writable
# note: using $USER is merged, but not in the stable release yet
COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/$config_file config.toml
-COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/dockerfiles/seed_node/startup_script.sh .
+COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node/startup_script.sh .
# keep binaries immutable
COPY --from=builder /go/src/github.com/vulcanize/vulcanizedb/vulcanizedb vulcanizedb
diff --git a/dockerfiles/seed_node/startup_script.sh b/dockerfiles/seed_node/startup_script.sh
index b1bb75b4..203035c0 100755
--- a/dockerfiles/seed_node/startup_script.sh
+++ b/dockerfiles/seed_node/startup_script.sh
@@ -1,5 +1,5 @@
#!/bin/sh
-# Runs the db migrations and starts the seed node services
+# Runs the db migrations and starts the super node services
# Exit if the variable tests fail
set -e
diff --git a/documentation/seed-node.md b/documentation/super-node.md
similarity index 80%
rename from documentation/seed-node.md
rename to documentation/super-node.md
index 5b4227b1..927a83db 100644
--- a/documentation/seed-node.md
+++ b/documentation/super-node.md
@@ -103,7 +103,7 @@ There are two commands to choose from:
#### syncAndPublish
-`syncAndPublih` performs the functions of the seed node- syncing data from Geth, converting them to IPLDs,
+`syncAndPublih` performs the functions of the super node- syncing data from Geth, converting them to IPLDs,
publishing those IPLDs to IPFS, and creating a local Postgres index to relate their CIDS to useful metadata.
Usage:
@@ -149,16 +149,29 @@ The config file for the `syncPublishScreenAndServe` command has two additional f
[server]
ipcPath = "/Users/user/.vulcanize/vulcanize.ipc"
wsEndpoint = "127.0.0.1:80"
+
+[backfill]
+ on = false
+ ipcPath = ""
+ frequency = 5
```
The additional `server.ipcPath` and `server.wsEndpoint` fields are used to set what ipc endpoint and ws url
the `syncPublishScreenAndServe` rpc server will expose itself to subscribing transformers over, respectively.
-Any valid and available path and endpoint is acceptable, but keep in mind that this path and endpoint need to be known by transformers for them to subscribe to the seed node.
+Any valid and available path and endpoint is acceptable, but keep in mind that this path and endpoint need to
+be known by transformers for them to subscribe to the super node.
+
+Because the super node syncs data from a geth full node as it progresses through its block synchronization, there is potential
+for the super node to miss data both at the beginning of the sync due to lag between initialization of the two processes and
+anywhere throughout the sync if the processes are interrupted. The `backfill` config mapping is used to optionally configure
+the super node with an archival geth client that exposes a `statediff.StateDiffAt` rpc endpoint, to enable it to fill in these data gaps.
+`backfill.on` turns the backfill process on, the `backfill.ipcPath` is the rpc path for the archival geth node, and `backfill.frequency`
+sets at what frequency (in minutes) the backfill process checks for and fills in gaps.
## Dockerfile Setup
-The below provides step-by-step directions for how to setup the seed node using the provided Dockerfile on an AWS Linux AMI instance.
+The below provides step-by-step directions for how to setup the super node using the provided Dockerfile on an AWS Linux AMI instance.
Note that the instance will need sufficient memory and storage for this to work.
1. Install basic dependencies
@@ -230,7 +243,7 @@ createdb vulcanize_public
8. Build and run the Docker image
```
-cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/seed_node
+cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node
docker build .
docker run --network host -e VDB_PG_CONNECT=postgres://localhost:5432/vulcanize_public?sslmode=disable {IMAGE_ID}
```
@@ -241,8 +254,8 @@ docker run --network host -e VDB_PG_CONNECT=postgres://localhost:5432/vulcanize_
A transformer can subscribe to the `syncPublishScreenAndServe` service over its ipc or ws endpoints, when subscribing the transformer
specifies which subsets of the synced data it is interested in and the server will forward only these data.
-The `streamSubscribe` command serves as a simple demonstration/example of subscribing to the seed-node feed, it subscribes with a set of parameters
-defined in the loaded config file, and prints the streamed data to stdout. To build transformers that subscribe to and use seed-node data,
+The `streamSubscribe` command serves as a simple demonstration/example of subscribing to the super-node feed, it subscribes with a set of parameters
+defined in the loaded config file, and prints the streamed data to stdout. To build transformers that subscribe to and use super-node data,
the shared/libraries/streamer can be used.
Usage:
@@ -294,39 +307,39 @@ The config for `streamSubscribe` has the `subscribe` set of parameters, for exam
intermediateNodes = false
```
-`subscription.path` is used to define the ws url OR ipc endpoint we will subscribe to the seed-node over
-(the `server.ipcPath` or `server.wsEndpoint` that the seed-node has defined in their config file).
+`subscription.path` is used to define the ws url OR ipc endpoint we will subscribe to the super-node over
+(the `server.ipcPath` or `server.wsEndpoint` that the super-node has defined in their config file).
-`subscription.backfill` specifies whether or not the seed-node should look up historical data in its cache and
-send that to the subscriber, if this is set to `false` then the seed-node only forwards newly synced/incoming data.
+`subscription.backfill` specifies whether or not the super-node should look up historical data in its cache and
+send that to the subscriber, if this is set to `false` then the super-node only forwards newly synced/incoming data.
-`subscription.backfillOnly` will tell the seed-node to only send historical data and not stream incoming data going forward.
+`subscription.backfillOnly` will tell the super-node to only send historical data and not stream incoming data going forward.
`subscription.startingBlock` is the starting block number for the range we want to receive data in.
`subscription.endingBlock` is the ending block number for the range we want to receive data in;
setting to 0 means there is no end/we will continue indefinitely.
-`subscription.headerFilter` has two sub-options: `off` and `finalOnly`. Setting `off` to true tells the seed-node to
-not send any headers to the subscriber; setting `finalOnly` to true tells the seed-node to send only canonical headers.
+`subscription.headerFilter` has two sub-options: `off` and `finalOnly`. Setting `off` to true tells the super-node to
+not send any headers to the subscriber; setting `finalOnly` to true tells the super-node to send only canonical headers.
-`subscription.trxFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the seed-node to
+`subscription.trxFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the super-node to
not send any transactions to the subscriber; `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for,
-if they have any addresses then the seed-node will only send transactions that were sent or received by the addresses contained
+if they have any addresses then the super-node will only send transactions that were sent or received by the addresses contained
in `src` and `dst`, respectively.
-`subscription.receiptFilter` has two sub-options: `off` and `topics`. Setting `off` to true tells the seed-node to
+`subscription.receiptFilter` has two sub-options: `off` and `topics`. Setting `off` to true tells the super-node to
not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for,
-if it has any topics then the seed-node will only send receipts that contain logs which have that topic0.
+if it has any topics then the super-node will only send receipts that contain logs which have that topic0.
-`subscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the seed-node to
+`subscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the super-node to
not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for,
-if it has any addresses then the seed-node will only send state leafs (accounts) corresponding to those account addresses. By default the seed-node
+if it has any addresses then the super-node will only send state leafs (accounts) corresponding to those account addresses. By default the super-node
only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`.
-`subscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. Setting `off` to true tells the seed-node to
+`subscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. Setting `off` to true tells the super-node to
not send any storage data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter storage for,
-if it has any addresses then the seed-node will only send storage nodes from the storage tries at those state addresses. `storageKeys` is another string
+if it has any addresses then the super-node will only send storage nodes from the storage tries at those state addresses. `storageKeys` is another string
array that can be filled with storage keys we want to filter storage data for. It is important to note that the storageKeys are the actual keccak256 hashes, whereas
-the addresses in the `addresses` fields are the ETH addresses and not their keccak256 hashes that serve as the actual state keys. By default the seed-node
+the addresses in the `addresses` fields are the ETH addresses and not their keccak256 hashes that serve as the actual state keys. By default the super-node
only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`.
diff --git a/environments/seedNodeSubscription.toml b/environments/seedNodeSubscription.toml
index 3f899939..d5142282 100644
--- a/environments/seedNodeSubscription.toml
+++ b/environments/seedNodeSubscription.toml
@@ -1,5 +1,5 @@
[subscription]
- path = "ws://127.0.0.1:80"
+ path = "ws://seed0.20c.com:8080"
backfill = true
backfillOnly = false
startingBlock = 0
diff --git a/environments/syncPublishScreenAndServe.toml b/environments/syncPublishScreenAndServe.toml
index 4e8cafa9..fb939d37 100644
--- a/environments/syncPublishScreenAndServe.toml
+++ b/environments/syncPublishScreenAndServe.toml
@@ -11,3 +11,8 @@
[server]
ipcPath = "/root/.vulcanize/vulcanize.ipc"
wsEndpoint = "127.0.0.1:8080"
+
+[backfill]
+ on = false
+ ipcPath = ""
+ frequency = 5
diff --git a/libraries/shared/streamer/seed_node_streamer.go b/libraries/shared/streamer/seed_node_streamer.go
index 8f33d007..2a61675b 100644
--- a/libraries/shared/streamer/seed_node_streamer.go
+++ b/libraries/shared/streamer/seed_node_streamer.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-// Streamer is used by watchers to stream eth data from a vulcanizedb seed node
+// Streamer is used by watchers to stream eth data from a vulcanizedb super node
package streamer
import (
@@ -28,30 +28,30 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core"
)
-// ISeedNodeStreamer is the interface for streaming SeedNodePayloads from a vulcanizeDB seed node
-type ISeedNodeStreamer interface {
- Stream(payloadChan chan SeedNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error)
+// ISuperNodeStreamer is the interface for streaming SuperNodePayloads from a vulcanizeDB super node
+type ISuperNodeStreamer interface {
+ Stream(payloadChan chan SuperNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error)
}
-// SeedNodeStreamer is the underlying struct for the ISeedNodeStreamer interface
-type SeedNodeStreamer struct {
+// SuperNodeStreamer is the underlying struct for the ISuperNodeStreamer interface
+type SuperNodeStreamer struct {
Client core.RpcClient
}
-// NewSeedNodeStreamer creates a pointer to a new SeedNodeStreamer which satisfies the ISeedNodeStreamer interface
-func NewSeedNodeStreamer(client core.RpcClient) *SeedNodeStreamer {
- return &SeedNodeStreamer{
+// NewSuperNodeStreamer creates a pointer to a new SuperNodeStreamer which satisfies the ISuperNodeStreamer interface
+func NewSuperNodeStreamer(client core.RpcClient) *SuperNodeStreamer {
+ return &SuperNodeStreamer{
Client: client,
}
}
-// Stream is the main loop for subscribing to data from a vulcanizedb seed node
-func (sds *SeedNodeStreamer) Stream(payloadChan chan SeedNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) {
- return sds.Client.Subscribe("vulcanizedb", payloadChan, "stream", streamFilters)
+// Stream is the main loop for subscribing to data from a vulcanizedb super node
+func (sds *SuperNodeStreamer) Stream(payloadChan chan SuperNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) {
+ return sds.Client.Subscribe("vdb", payloadChan, "stream", streamFilters)
}
-// Payload holds the data returned from the seed node to the requesting client
-type SeedNodePayload struct {
+// Payload holds the data returned from the super node to the requesting client
+type SuperNodePayload struct {
BlockNumber *big.Int `json:"blockNumber"`
HeadersRlp [][]byte `json:"headersRlp"`
UnclesRlp [][]byte `json:"unclesRlp"`
@@ -65,20 +65,20 @@ type SeedNodePayload struct {
err error
}
-func (sd *SeedNodePayload) ensureEncoded() {
+func (sd *SuperNodePayload) ensureEncoded() {
if sd.encoded == nil && sd.err == nil {
sd.encoded, sd.err = json.Marshal(sd)
}
}
// Length to implement Encoder interface for StateDiff
-func (sd *SeedNodePayload) Length() int {
+func (sd *SuperNodePayload) Length() int {
sd.ensureEncoded()
return len(sd.encoded)
}
// Encode to implement Encoder interface for StateDiff
-func (sd *SeedNodePayload) Encode() ([]byte, error) {
+func (sd *SuperNodePayload) Encode() ([]byte, error) {
sd.ensureEncoded()
return sd.encoded, sd.err
}
diff --git a/libraries/shared/transformer/seed_node_transformer.go b/libraries/shared/transformer/seed_node_transformer.go
index bc9beb01..0564b60d 100644
--- a/libraries/shared/transformer/seed_node_transformer.go
+++ b/libraries/shared/transformer/seed_node_transformer.go
@@ -22,10 +22,10 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
-type SeedNodeTransformer interface {
+type SuperNodeTransformer interface {
Init() error
Execute() error
GetConfig() config.Subscription
}
-type SeedNodeTransformerInitializer func(db *postgres.DB, subCon config.Subscription, client core.RpcClient) SeedNodeTransformer
+type SuperNodeTransformerInitializer func(db *postgres.DB, subCon config.Subscription, client core.RpcClient) SuperNodeTransformer
diff --git a/pkg/config/subscription.go b/pkg/config/subscription.go
index 6fcfad85..e6a160fb 100644
--- a/pkg/config/subscription.go
+++ b/pkg/config/subscription.go
@@ -18,7 +18,7 @@ package config
import "math/big"
-// Subscription config is used by a subscribing transformer to specifiy which data to receive from the seed node
+// Subscription config is used by a subscribing transformer to specifiy which data to receive from the super node
type Subscription struct {
BackFill bool
BackFillOnly bool
diff --git a/pkg/ipfs/mocks/converter.go b/pkg/ipfs/mocks/converter.go
index 06462807..0aaa9b53 100644
--- a/pkg/ipfs/mocks/converter.go
+++ b/pkg/ipfs/mocks/converter.go
@@ -17,6 +17,8 @@
package mocks
import (
+ "fmt"
+
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
@@ -34,3 +36,22 @@ func (pc *PayloadConverter) Convert(payload statediff.Payload) (*ipfs.IPLDPayloa
pc.PassedStatediffPayload = payload
return pc.ReturnIPLDPayload, pc.ReturnErr
}
+
+// IterativePayloadConverter is the underlying struct for the Converter interface
+type IterativePayloadConverter struct {
+ PassedStatediffPayload []statediff.Payload
+ ReturnIPLDPayload []*ipfs.IPLDPayload
+ ReturnErr error
+ iteration int
+}
+
+// Convert method is used to convert a geth statediff.Payload to a IPLDPayload
+func (pc *IterativePayloadConverter) Convert(payload statediff.Payload) (*ipfs.IPLDPayload, error) {
+ pc.PassedStatediffPayload = append(pc.PassedStatediffPayload, payload)
+ if len(pc.PassedStatediffPayload) < pc.iteration+1 {
+ return nil, fmt.Errorf("IterativePayloadConverter does not have a payload to return at iteration %d", pc.iteration)
+ }
+ returnPayload := pc.ReturnIPLDPayload[pc.iteration]
+ pc.iteration++
+ return returnPayload, pc.ReturnErr
+}
diff --git a/pkg/ipfs/mocks/publisher.go b/pkg/ipfs/mocks/publisher.go
index 3ee14b9b..ce31968a 100644
--- a/pkg/ipfs/mocks/publisher.go
+++ b/pkg/ipfs/mocks/publisher.go
@@ -17,6 +17,7 @@
package mocks
import (
+ "fmt"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
)
@@ -32,3 +33,22 @@ func (pub *IPLDPublisher) Publish(payload *ipfs.IPLDPayload) (*ipfs.CIDPayload,
pub.PassedIPLDPayload = payload
return pub.ReturnCIDPayload, pub.ReturnErr
}
+
+// IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing
+type IterativeIPLDPublisher struct {
+ PassedIPLDPayload []*ipfs.IPLDPayload
+ ReturnCIDPayload []*ipfs.CIDPayload
+ ReturnErr error
+ iteration int
+}
+
+// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
+func (pub *IterativeIPLDPublisher) Publish(payload *ipfs.IPLDPayload) (*ipfs.CIDPayload, error) {
+ pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, payload)
+ if len(pub.ReturnCIDPayload) < pub.iteration+1 {
+ return nil, fmt.Errorf("IterativeIPLDPublisher does not have a payload to return at iteration %d", pub.iteration)
+ }
+ returnPayload := pub.ReturnCIDPayload[pub.iteration]
+ pub.iteration++
+ return returnPayload, pub.ReturnErr
+}
diff --git a/pkg/ipfs/mocks/test_data.go b/pkg/ipfs/mocks/test_data.go
index fe2936d0..2f6c25ff 100644
--- a/pkg/ipfs/mocks/test_data.go
+++ b/pkg/ipfs/mocks/test_data.go
@@ -309,7 +309,7 @@ var (
},
}
- MockSeeNodePayload = streamer.SeedNodePayload{
+ MockSeeNodePayload = streamer.SuperNodePayload{
BlockNumber: big.NewInt(1),
HeadersRlp: [][]byte{MockHeaderRlp},
TransactionsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)},
diff --git a/pkg/ipfs/resolver.go b/pkg/ipfs/resolver.go
index b5d9d37d..0178edc7 100644
--- a/pkg/ipfs/resolver.go
+++ b/pkg/ipfs/resolver.go
@@ -24,7 +24,7 @@ import (
// IPLDResolver is the interface to resolving IPLDs
type IPLDResolver interface {
- ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SeedNodePayload, error)
+ ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SuperNodePayload, error)
}
// EthIPLDResolver is the underlying struct to support the IPLDResolver interface
@@ -36,8 +36,8 @@ func NewIPLDResolver() *EthIPLDResolver {
}
// ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper
-func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SeedNodePayload, error) {
- response := &streamer.SeedNodePayload{
+func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SuperNodePayload, error) {
+ response := &streamer.SuperNodePayload{
BlockNumber: ipfsBlocks.BlockNumber,
StateNodesRlp: make(map[common.Hash][]byte),
StorageNodesRlp: make(map[common.Hash]map[common.Hash][]byte),
@@ -51,42 +51,42 @@ func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SeedN
return *response, nil
}
-func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *streamer.SeedNodePayload) {
+func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *streamer.SuperNodePayload) {
for _, block := range blocks {
raw := block.RawData()
response.HeadersRlp = append(response.HeadersRlp, raw)
}
}
-func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *streamer.SeedNodePayload) {
+func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *streamer.SuperNodePayload) {
for _, block := range blocks {
raw := block.RawData()
response.UnclesRlp = append(response.UnclesRlp, raw)
}
}
-func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *streamer.SeedNodePayload) {
+func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *streamer.SuperNodePayload) {
for _, block := range blocks {
raw := block.RawData()
response.TransactionsRlp = append(response.TransactionsRlp, raw)
}
}
-func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *streamer.SeedNodePayload) {
+func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *streamer.SuperNodePayload) {
for _, block := range blocks {
raw := block.RawData()
response.ReceiptsRlp = append(response.ReceiptsRlp, raw)
}
}
-func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *streamer.SeedNodePayload) {
+func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *streamer.SuperNodePayload) {
for key, block := range blocks {
raw := block.RawData()
response.StateNodesRlp[key] = raw
}
}
-func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *streamer.SeedNodePayload) {
+func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *streamer.SuperNodePayload) {
for stateKey, storageBlocks := range blocks {
response.StorageNodesRlp[stateKey] = make(map[common.Hash][]byte)
for storageKey, storageVal := range storageBlocks {
diff --git a/pkg/ipfs/resolver_test.go b/pkg/ipfs/resolver_test.go
index 3f8a4163..1c07f388 100644
--- a/pkg/ipfs/resolver_test.go
+++ b/pkg/ipfs/resolver_test.go
@@ -22,7 +22,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
- "github.com/vulcanize/vulcanizedb/pkg/seed_node"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node"
)
var (
@@ -35,19 +35,19 @@ var _ = Describe("Resolver", func() {
resolver = ipfs.NewIPLDResolver()
})
It("Resolves IPLD data to their correct geth data types and packages them to send to requesting transformers", func() {
- seedNodePayload, err := resolver.ResolveIPLDs(mocks.MockIPLDWrapper)
+ superNodePayload, err := resolver.ResolveIPLDs(mocks.MockIPLDWrapper)
Expect(err).ToNot(HaveOccurred())
- Expect(seedNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
- Expect(seedNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp))
- Expect(seedNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp))
- Expect(len(seedNodePayload.TransactionsRlp)).To(Equal(2))
- Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
- Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
- Expect(len(seedNodePayload.ReceiptsRlp)).To(Equal(2))
- Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(0))).To(BeTrue())
- Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(1))).To(BeTrue())
- Expect(len(seedNodePayload.StateNodesRlp)).To(Equal(2))
- Expect(seedNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp))
+ Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
+ Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp))
+ Expect(superNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp))
+ Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2))
+ Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
+ Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
+ Expect(len(superNodePayload.ReceiptsRlp)).To(Equal(2))
+ Expect(super_node.ListContainsBytes(superNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(0))).To(BeTrue())
+ Expect(super_node.ListContainsBytes(superNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(1))).To(BeTrue())
+ Expect(len(superNodePayload.StateNodesRlp)).To(Equal(2))
+ Expect(superNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp))
})
})
})
diff --git a/pkg/seed_node/filterer_test.go b/pkg/seed_node/filterer_test.go
deleted file mode 100644
index cb99ebeb..00000000
--- a/pkg/seed_node/filterer_test.go
+++ /dev/null
@@ -1,153 +0,0 @@
-// VulcanizeDB
-// Copyright © 2019 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package seed_node_test
-
-import (
- "bytes"
-
- "github.com/ethereum/go-ethereum/core/types"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
-
- "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
- "github.com/vulcanize/vulcanizedb/pkg/seed_node"
-)
-
-var (
- filterer seed_node.ResponseFilterer
- expectedRctForStorageRLP1 []byte
- expectedRctForStorageRLP2 []byte
-)
-
-var _ = Describe("Filterer", func() {
- Describe("FilterResponse", func() {
- BeforeEach(func() {
- filterer = seed_node.NewResponseFilterer()
- expectedRctForStorageRLP1 = getReceiptForStorageRLP(mocks.MockReceipts, 0)
- expectedRctForStorageRLP2 = getReceiptForStorageRLP(mocks.MockReceipts, 1)
- })
-
- It("Transcribes all the data from the IPLDPayload into the SeedNodePayload if given an open filter", func() {
- seedNodePayload, err := filterer.FilterResponse(openFilter, *mocks.MockIPLDPayload)
- Expect(err).ToNot(HaveOccurred())
- Expect(seedNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
- Expect(seedNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp))
- Expect(seedNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp))
- Expect(len(seedNodePayload.TransactionsRlp)).To(Equal(2))
- Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
- Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
- Expect(len(seedNodePayload.ReceiptsRlp)).To(Equal(2))
- Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue())
- Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue())
- Expect(len(seedNodePayload.StateNodesRlp)).To(Equal(2))
- Expect(seedNodePayload.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes))
- Expect(seedNodePayload.StateNodesRlp[mocks.AnotherContractLeafKey]).To(Equal(mocks.AnotherValueBytes))
- Expect(seedNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp))
- })
-
- It("Applies filters from the provided config.Subscription", func() {
- seedNodePayload1, err := filterer.FilterResponse(rctContractFilter, *mocks.MockIPLDPayload)
- Expect(err).ToNot(HaveOccurred())
- Expect(seedNodePayload1.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
- Expect(len(seedNodePayload1.HeadersRlp)).To(Equal(0))
- Expect(len(seedNodePayload1.UnclesRlp)).To(Equal(0))
- Expect(len(seedNodePayload1.TransactionsRlp)).To(Equal(0))
- Expect(len(seedNodePayload1.StorageNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload1.StateNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload1.ReceiptsRlp)).To(Equal(1))
- Expect(seedNodePayload1.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2))
-
- seedNodePayload2, err := filterer.FilterResponse(rctTopicsFilter, *mocks.MockIPLDPayload)
- Expect(err).ToNot(HaveOccurred())
- Expect(seedNodePayload2.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
- Expect(len(seedNodePayload2.HeadersRlp)).To(Equal(0))
- Expect(len(seedNodePayload2.UnclesRlp)).To(Equal(0))
- Expect(len(seedNodePayload2.TransactionsRlp)).To(Equal(0))
- Expect(len(seedNodePayload2.StorageNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload2.StateNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload2.ReceiptsRlp)).To(Equal(1))
- Expect(seedNodePayload2.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1))
-
- seedNodePayload3, err := filterer.FilterResponse(rctTopicsAndContractFilter, *mocks.MockIPLDPayload)
- Expect(err).ToNot(HaveOccurred())
- Expect(seedNodePayload3.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
- Expect(len(seedNodePayload3.HeadersRlp)).To(Equal(0))
- Expect(len(seedNodePayload3.UnclesRlp)).To(Equal(0))
- Expect(len(seedNodePayload3.TransactionsRlp)).To(Equal(0))
- Expect(len(seedNodePayload3.StorageNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload3.StateNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload3.ReceiptsRlp)).To(Equal(1))
- Expect(seedNodePayload3.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1))
-
- seedNodePayload4, err := filterer.FilterResponse(rctContractsAndTopicFilter, *mocks.MockIPLDPayload)
- Expect(err).ToNot(HaveOccurred())
- Expect(seedNodePayload4.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
- Expect(len(seedNodePayload4.HeadersRlp)).To(Equal(0))
- Expect(len(seedNodePayload4.UnclesRlp)).To(Equal(0))
- Expect(len(seedNodePayload4.TransactionsRlp)).To(Equal(0))
- Expect(len(seedNodePayload4.StorageNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload4.StateNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload4.ReceiptsRlp)).To(Equal(1))
- Expect(seedNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2))
-
- seedNodePayload5, err := filterer.FilterResponse(rctsForAllCollectedTrxs, *mocks.MockIPLDPayload)
- Expect(err).ToNot(HaveOccurred())
- Expect(seedNodePayload5.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
- Expect(len(seedNodePayload5.HeadersRlp)).To(Equal(0))
- Expect(len(seedNodePayload5.UnclesRlp)).To(Equal(0))
- Expect(len(seedNodePayload5.TransactionsRlp)).To(Equal(2))
- Expect(seed_node.ListContainsBytes(seedNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
- Expect(seed_node.ListContainsBytes(seedNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
- Expect(len(seedNodePayload5.StorageNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload5.StateNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload5.ReceiptsRlp)).To(Equal(2))
- Expect(seed_node.ListContainsBytes(seedNodePayload5.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue())
- Expect(seed_node.ListContainsBytes(seedNodePayload5.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue())
-
- seedNodePayload6, err := filterer.FilterResponse(rctsForSelectCollectedTrxs, *mocks.MockIPLDPayload)
- Expect(err).ToNot(HaveOccurred())
- Expect(seedNodePayload6.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
- Expect(len(seedNodePayload6.HeadersRlp)).To(Equal(0))
- Expect(len(seedNodePayload6.UnclesRlp)).To(Equal(0))
- Expect(len(seedNodePayload6.TransactionsRlp)).To(Equal(1))
- Expect(seed_node.ListContainsBytes(seedNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
- Expect(len(seedNodePayload6.StorageNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload6.StateNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload6.ReceiptsRlp)).To(Equal(1))
- Expect(seedNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2))
-
- seedNodePayload7, err := filterer.FilterResponse(stateFilter, *mocks.MockIPLDPayload)
- Expect(err).ToNot(HaveOccurred())
- Expect(seedNodePayload7.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
- Expect(len(seedNodePayload7.HeadersRlp)).To(Equal(0))
- Expect(len(seedNodePayload7.UnclesRlp)).To(Equal(0))
- Expect(len(seedNodePayload7.TransactionsRlp)).To(Equal(0))
- Expect(len(seedNodePayload7.StorageNodesRlp)).To(Equal(0))
- Expect(len(seedNodePayload7.ReceiptsRlp)).To(Equal(0))
- Expect(len(seedNodePayload7.StateNodesRlp)).To(Equal(1))
- Expect(seedNodePayload7.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes))
- })
- })
-})
-
-func getReceiptForStorageRLP(receipts types.Receipts, i int) []byte {
- receiptForStorage := (*types.ReceiptForStorage)(receipts[i])
- receiptBuffer := new(bytes.Buffer)
- err := receiptForStorage.EncodeRLP(receiptBuffer)
- Expect(err).ToNot(HaveOccurred())
- return receiptBuffer.Bytes()
-}
diff --git a/pkg/seed_node/api.go b/pkg/super_node/api.go
similarity index 78%
rename from pkg/seed_node/api.go
rename to pkg/super_node/api.go
index 623bcb9a..c3ba5a3f 100644
--- a/pkg/seed_node/api.go
+++ b/pkg/super_node/api.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package seed_node
+package super_node
import (
"context"
@@ -28,25 +28,25 @@ import (
)
// APIName is the namespace used for the state diffing service API
-const APIName = "vulcanizedb"
+const APIName = "vdb"
// APIVersion is the version of the state diffing service API
const APIVersion = "0.0.1"
-// PublicSeedNodeAPI is the public api for the seed node
-type PublicSeedNodeAPI struct {
+// PublicSuperNodeAPI is the public api for the super node
+type PublicSuperNodeAPI struct {
sni NodeInterface
}
-// NewPublicSeedNodeAPI creates a new PublicSeedNodeAPI with the provided underlying SyncPublishScreenAndServe process
-func NewPublicSeedNodeAPI(seedNodeInterface NodeInterface) *PublicSeedNodeAPI {
- return &PublicSeedNodeAPI{
- sni: seedNodeInterface,
+// NewPublicSuperNodeAPI creates a new PublicSuperNodeAPI with the provided underlying SyncPublishScreenAndServe process
+func NewPublicSuperNodeAPI(superNodeInterface NodeInterface) *PublicSuperNodeAPI {
+ return &PublicSuperNodeAPI{
+ sni: superNodeInterface,
}
}
// Stream is the public method to setup a subscription that fires off SyncPublishScreenAndServe payloads as they are created
-func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.Subscription) (*rpc.Subscription, error) {
+func (api *PublicSuperNodeAPI) Stream(ctx context.Context, streamFilters config.Subscription) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
@@ -58,7 +58,7 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S
go func() {
// subscribe to events from the SyncPublishScreenAndServe service
- payloadChannel := make(chan streamer.SeedNodePayload, payloadChanBufferSize)
+ payloadChannel := make(chan streamer.SuperNodePayload, payloadChanBufferSize)
quitChan := make(chan bool, 1)
go api.sni.Subscribe(rpcSub.ID, payloadChannel, quitChan, streamFilters)
@@ -84,7 +84,7 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S
return rpcSub, nil
}
-// Node is a public rpc method to allow transformers to fetch the Geth node info for the seed node
-func (api *PublicSeedNodeAPI) Node() core.Node {
+// Node is a public rpc method to allow transformers to fetch the Geth node info for the super node
+func (api *PublicSuperNodeAPI) Node() core.Node {
return api.sni.Node()
}
diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go
new file mode 100644
index 00000000..324a2cc7
--- /dev/null
+++ b/pkg/super_node/backfiller.go
@@ -0,0 +1,135 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package super_node
+
+import (
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/params"
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
+ "github.com/vulcanize/vulcanizedb/pkg/core"
+ "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs"
+)
+
+// BackFillInterface for filling in gaps in the super node
+type BackFillInterface interface {
+ // Method for the super node to periodically check for and fill in gaps in its data using an archival node
+ FillGaps(wg *sync.WaitGroup, quitChan <-chan bool)
+}
+
+// BackFillService for filling in gaps in the super node
+type BackFillService struct {
+ // Interface for converting statediff payloads into ETH-IPLD object payloads
+ Converter ipfs.PayloadConverter
+ // Interface for publishing the ETH-IPLD payloads to IPFS
+ Publisher ipfs.IPLDPublisher
+ // Interface for indexing the CIDs of the published ETH-IPLDs in Postgres
+ Repository CIDRepository
+ // Interface for searching and retrieving CIDs from Postgres index
+ Retriever CIDRetriever
+ // State-diff fetcher; needs to be configured with an archival core.RpcClient
+ StateDiffFetcher fetcher.IStateDiffFetcher
+ // Check frequency
+ GapCheckFrequency time.Duration
+}
+
+// NewBackFillService returns a new BackFillInterface
+func NewBackFillService(ipfsPublisher ipfs.IPLDPublisher, db *postgres.DB, archivalNodeRpcClient core.RpcClient, freq time.Duration) BackFillInterface {
+ return &BackFillService{
+ Repository: NewCIDRepository(db),
+ Converter: ipfs.NewPayloadConverter(params.MainnetChainConfig),
+ Publisher: ipfsPublisher,
+ Retriever: NewCIDRetriever(db),
+ StateDiffFetcher: fetcher.NewStateDiffFetcher(archivalNodeRpcClient),
+ GapCheckFrequency: freq,
+ }
+}
+
+// FillGaps periodically checks for and fills in gaps in the super node db
+// this requires a core.RpcClient that is pointed at an archival node with the StateDiffAt method exposed
+func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) {
+ ticker := time.NewTicker(bfs.GapCheckFrequency)
+ wg.Add(1)
+
+ go func() {
+ for {
+ select {
+ case <-quitChan:
+ log.Info("quiting FillGaps process")
+ wg.Done()
+ return
+ case <-ticker.C:
+ log.Info("searching for gaps in the super node database")
+ startingBlock, firstBlockErr := bfs.Retriever.RetrieveFirstBlockNumber()
+ if firstBlockErr != nil {
+ log.Error(firstBlockErr)
+ continue
+ }
+ if startingBlock != 1 {
+ startingGap := [2]int64{
+ 1,
+ startingBlock - 1,
+ }
+ log.Info("found gap at the beginning of the sync")
+ bfs.fillGaps(startingGap)
+ }
+
+ gaps, gapErr := bfs.Retriever.RetrieveGapsInData()
+ if gapErr != nil {
+ log.Error(gapErr)
+ continue
+ }
+ for _, gap := range gaps {
+ bfs.fillGaps(gap)
+ }
+ }
+ }
+ }()
+}
+
+func (bfs *BackFillService) fillGaps(gap [2]int64) {
+ log.Infof("filling in gap from block %d to block %d", gap[0], gap[1])
+ blockHeights := make([]uint64, 0, gap[1]-gap[0]+1)
+ for i := gap[0]; i <= gap[1]; i++ {
+ blockHeights = append(blockHeights, uint64(i))
+ }
+ payloads, fetchErr := bfs.StateDiffFetcher.FetchStateDiffsAt(blockHeights)
+ if fetchErr != nil {
+ log.Error(fetchErr)
+ return
+ }
+ for _, payload := range payloads {
+ ipldPayload, convertErr := bfs.Converter.Convert(*payload)
+ if convertErr != nil {
+ log.Error(convertErr)
+ continue
+ }
+ cidPayload, publishErr := bfs.Publisher.Publish(ipldPayload)
+ if publishErr != nil {
+ log.Error(publishErr)
+ continue
+ }
+ indexErr := bfs.Repository.Index(cidPayload)
+ if indexErr != nil {
+ log.Error(indexErr)
+ }
+ }
+}
diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go
new file mode 100644
index 00000000..20fb3e74
--- /dev/null
+++ b/pkg/super_node/backfiller_test.go
@@ -0,0 +1,187 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package super_node_test
+
+import (
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/statediff"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ mocks2 "github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node"
+ mocks3 "github.com/vulcanize/vulcanizedb/pkg/super_node/mocks"
+)
+
+var _ = Describe("BackFiller", func() {
+ Describe("FillGaps", func() {
+ It("Periodically checks for and fills in gaps in the super node's data", func() {
+ mockCidRepo := &mocks3.CIDRepository{
+ ReturnErr: nil,
+ }
+ mockPublisher := &mocks.IterativeIPLDPublisher{
+ ReturnCIDPayload: []*ipfs.CIDPayload{mocks.MockCIDPayload, mocks.MockCIDPayload},
+ ReturnErr: nil,
+ }
+ mockConverter := &mocks.IterativePayloadConverter{
+ ReturnIPLDPayload: []*ipfs.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload},
+ ReturnErr: nil,
+ }
+ mockRetriever := &mocks3.MockCIDRetriever{
+ FirstBlockNumberToReturn: 1,
+ GapsToRetrieve: [][2]int64{
+ {
+ 100, 101,
+ },
+ },
+ }
+ mockFetcher := &mocks2.MockStateDiffFetcher{
+ StateDiffsToReturn: map[uint64]*statediff.Payload{
+ 100: &mocks.MockStateDiffPayload,
+ 101: &mocks.MockStateDiffPayload,
+ },
+ }
+ backfiller := &super_node.BackFillService{
+ Repository: mockCidRepo,
+ Publisher: mockPublisher,
+ Converter: mockConverter,
+ StateDiffFetcher: mockFetcher,
+ Retriever: mockRetriever,
+ GapCheckFrequency: time.Second * 10,
+ }
+ wg := &sync.WaitGroup{}
+ quitChan := make(chan bool, 1)
+ backfiller.FillGaps(wg, quitChan)
+ time.Sleep(time.Second * 15)
+ quitChan <- true
+ Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
+ Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
+ Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload))
+ Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2))
+ Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload))
+ Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockIPLDPayload))
+ Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(2))
+ Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload))
+ Expect(mockConverter.PassedStatediffPayload[1]).To(Equal(mocks.MockStateDiffPayload))
+ Expect(mockRetriever.CalledTimes).To(Equal(1))
+ Expect(len(mockFetcher.CalledAtBlockHeights)).To(Equal(1))
+ Expect(mockFetcher.CalledAtBlockHeights[0]).To(Equal([]uint64{100, 101}))
+ })
+
+ It("Works for single block `ranges`", func() {
+ mockCidRepo := &mocks3.CIDRepository{
+ ReturnErr: nil,
+ }
+ mockPublisher := &mocks.IterativeIPLDPublisher{
+ ReturnCIDPayload: []*ipfs.CIDPayload{mocks.MockCIDPayload},
+ ReturnErr: nil,
+ }
+ mockConverter := &mocks.IterativePayloadConverter{
+ ReturnIPLDPayload: []*ipfs.IPLDPayload{mocks.MockIPLDPayload},
+ ReturnErr: nil,
+ }
+ mockRetriever := &mocks3.MockCIDRetriever{
+ FirstBlockNumberToReturn: 1,
+ GapsToRetrieve: [][2]int64{
+ {
+ 100, 100,
+ },
+ },
+ }
+ mockFetcher := &mocks2.MockStateDiffFetcher{
+ StateDiffsToReturn: map[uint64]*statediff.Payload{
+ 100: &mocks.MockStateDiffPayload,
+ },
+ }
+ backfiller := &super_node.BackFillService{
+ Repository: mockCidRepo,
+ Publisher: mockPublisher,
+ Converter: mockConverter,
+ StateDiffFetcher: mockFetcher,
+ Retriever: mockRetriever,
+ GapCheckFrequency: time.Second * 10,
+ }
+ wg := &sync.WaitGroup{}
+ quitChan := make(chan bool, 1)
+ backfiller.FillGaps(wg, quitChan)
+ time.Sleep(time.Second * 15)
+ quitChan <- true
+ Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1))
+ Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
+ Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(1))
+ Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload))
+ Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(1))
+ Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload))
+ Expect(mockRetriever.CalledTimes).To(Equal(1))
+ Expect(len(mockFetcher.CalledAtBlockHeights)).To(Equal(1))
+ Expect(mockFetcher.CalledAtBlockHeights[0]).To(Equal([]uint64{100}))
+ })
+
+ It("Finds beginning gap", func() {
+ mockCidRepo := &mocks3.CIDRepository{
+ ReturnErr: nil,
+ }
+ mockPublisher := &mocks.IterativeIPLDPublisher{
+ ReturnCIDPayload: []*ipfs.CIDPayload{mocks.MockCIDPayload, mocks.MockCIDPayload},
+ ReturnErr: nil,
+ }
+ mockConverter := &mocks.IterativePayloadConverter{
+ ReturnIPLDPayload: []*ipfs.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload},
+ ReturnErr: nil,
+ }
+ mockRetriever := &mocks3.MockCIDRetriever{
+ FirstBlockNumberToReturn: 3,
+ GapsToRetrieve: [][2]int64{},
+ }
+ mockFetcher := &mocks2.MockStateDiffFetcher{
+ StateDiffsToReturn: map[uint64]*statediff.Payload{
+ 1: &mocks.MockStateDiffPayload,
+ 2: &mocks.MockStateDiffPayload,
+ },
+ }
+ backfiller := &super_node.BackFillService{
+ Repository: mockCidRepo,
+ Publisher: mockPublisher,
+ Converter: mockConverter,
+ StateDiffFetcher: mockFetcher,
+ Retriever: mockRetriever,
+ GapCheckFrequency: time.Second * 10,
+ }
+ wg := &sync.WaitGroup{}
+ quitChan := make(chan bool, 1)
+ backfiller.FillGaps(wg, quitChan)
+ time.Sleep(time.Second * 15)
+ quitChan <- true
+ Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
+ Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
+ Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload))
+ Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2))
+ Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload))
+ Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockIPLDPayload))
+ Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(2))
+ Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload))
+ Expect(mockConverter.PassedStatediffPayload[1]).To(Equal(mocks.MockStateDiffPayload))
+ Expect(mockRetriever.CalledTimes).To(Equal(1))
+ Expect(len(mockFetcher.CalledAtBlockHeights)).To(Equal(1))
+ Expect(mockFetcher.CalledAtBlockHeights[0]).To(Equal([]uint64{1, 2}))
+ })
+ })
+})
diff --git a/pkg/seed_node/filterer.go b/pkg/super_node/filterer.go
similarity index 91%
rename from pkg/seed_node/filterer.go
rename to pkg/super_node/filterer.go
index 0f93e97c..e63c1880 100644
--- a/pkg/seed_node/filterer.go
+++ b/pkg/super_node/filterer.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package seed_node
+package super_node
import (
"bytes"
@@ -30,7 +30,7 @@ import (
// ResponseFilterer is the inteface used to screen eth data and package appropriate data into a response payload
type ResponseFilterer interface {
- FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SeedNodePayload, error)
+ FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SuperNodePayload, error)
}
// Filterer is the underlying struct for the ResponseFilterer interface
@@ -42,33 +42,33 @@ func NewResponseFilterer() *Filterer {
}
// FilterResponse is used to filter through eth data to extract and package requested data into a Payload
-func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SeedNodePayload, error) {
- response := new(streamer.SeedNodePayload)
+func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SuperNodePayload, error) {
+ response := new(streamer.SuperNodePayload)
err := s.filterHeaders(streamFilters, response, payload)
if err != nil {
- return streamer.SeedNodePayload{}, err
+ return streamer.SuperNodePayload{}, err
}
txHashes, err := s.filterTransactions(streamFilters, response, payload)
if err != nil {
- return streamer.SeedNodePayload{}, err
+ return streamer.SuperNodePayload{}, err
}
err = s.filerReceipts(streamFilters, response, payload, txHashes)
if err != nil {
- return streamer.SeedNodePayload{}, err
+ return streamer.SuperNodePayload{}, err
}
err = s.filterState(streamFilters, response, payload)
if err != nil {
- return streamer.SeedNodePayload{}, err
+ return streamer.SuperNodePayload{}, err
}
err = s.filterStorage(streamFilters, response, payload)
if err != nil {
- return streamer.SeedNodePayload{}, err
+ return streamer.SuperNodePayload{}, err
}
response.BlockNumber = payload.BlockNumber
return *response, nil
}
-func (s *Filterer) filterHeaders(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error {
+func (s *Filterer) filterHeaders(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP)
if !streamFilters.HeaderFilter.FinalOnly {
@@ -91,7 +91,7 @@ func checkRange(start, end, actual int64) bool {
return false
}
-func (s *Filterer) filterTransactions(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) ([]common.Hash, error) {
+func (s *Filterer) filterTransactions(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) ([]common.Hash, error) {
trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions))
if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
for i, trx := range payload.BlockBody.Transactions {
@@ -127,7 +127,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin
return false
}
-func (s *Filterer) filerReceipts(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload, trxHashes []common.Hash) error {
+func (s *Filterer) filerReceipts(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload, trxHashes []common.Hash) error {
if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
for i, receipt := range payload.Receipts {
if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, streamFilters.ReceiptFilter.Contracts, payload.ReceiptMetaData[i].ContractAddress, trxHashes) {
@@ -186,7 +186,7 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContrac
return false
}
-func (s *Filterer) filterState(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error {
+func (s *Filterer) filterState(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
response.StateNodesRlp = make(map[common.Hash][]byte)
keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses))
@@ -218,7 +218,7 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool {
return false
}
-func (s *Filterer) filterStorage(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error {
+func (s *Filterer) filterStorage(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte)
stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses))
diff --git a/pkg/super_node/filterer_test.go b/pkg/super_node/filterer_test.go
new file mode 100644
index 00000000..a1c0fabd
--- /dev/null
+++ b/pkg/super_node/filterer_test.go
@@ -0,0 +1,153 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package super_node_test
+
+import (
+ "bytes"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node"
+)
+
+var (
+ filterer super_node.ResponseFilterer
+ expectedRctForStorageRLP1 []byte
+ expectedRctForStorageRLP2 []byte
+)
+
+var _ = Describe("Filterer", func() {
+ Describe("FilterResponse", func() {
+ BeforeEach(func() {
+ filterer = super_node.NewResponseFilterer()
+ expectedRctForStorageRLP1 = getReceiptForStorageRLP(mocks.MockReceipts, 0)
+ expectedRctForStorageRLP2 = getReceiptForStorageRLP(mocks.MockReceipts, 1)
+ })
+
+ It("Transcribes all the data from the IPLDPayload into the SuperNodePayload if given an open filter", func() {
+ superNodePayload, err := filterer.FilterResponse(openFilter, *mocks.MockIPLDPayload)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
+ Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp))
+ Expect(superNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp))
+ Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2))
+ Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
+ Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
+ Expect(len(superNodePayload.ReceiptsRlp)).To(Equal(2))
+ Expect(super_node.ListContainsBytes(superNodePayload.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue())
+ Expect(super_node.ListContainsBytes(superNodePayload.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue())
+ Expect(len(superNodePayload.StateNodesRlp)).To(Equal(2))
+ Expect(superNodePayload.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes))
+ Expect(superNodePayload.StateNodesRlp[mocks.AnotherContractLeafKey]).To(Equal(mocks.AnotherValueBytes))
+ Expect(superNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp))
+ })
+
+ It("Applies filters from the provided config.Subscription", func() {
+ superNodePayload1, err := filterer.FilterResponse(rctContractFilter, *mocks.MockIPLDPayload)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(superNodePayload1.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
+ Expect(len(superNodePayload1.HeadersRlp)).To(Equal(0))
+ Expect(len(superNodePayload1.UnclesRlp)).To(Equal(0))
+ Expect(len(superNodePayload1.TransactionsRlp)).To(Equal(0))
+ Expect(len(superNodePayload1.StorageNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload1.StateNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload1.ReceiptsRlp)).To(Equal(1))
+ Expect(superNodePayload1.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2))
+
+ superNodePayload2, err := filterer.FilterResponse(rctTopicsFilter, *mocks.MockIPLDPayload)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(superNodePayload2.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
+ Expect(len(superNodePayload2.HeadersRlp)).To(Equal(0))
+ Expect(len(superNodePayload2.UnclesRlp)).To(Equal(0))
+ Expect(len(superNodePayload2.TransactionsRlp)).To(Equal(0))
+ Expect(len(superNodePayload2.StorageNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload2.StateNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload2.ReceiptsRlp)).To(Equal(1))
+ Expect(superNodePayload2.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1))
+
+ superNodePayload3, err := filterer.FilterResponse(rctTopicsAndContractFilter, *mocks.MockIPLDPayload)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(superNodePayload3.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
+ Expect(len(superNodePayload3.HeadersRlp)).To(Equal(0))
+ Expect(len(superNodePayload3.UnclesRlp)).To(Equal(0))
+ Expect(len(superNodePayload3.TransactionsRlp)).To(Equal(0))
+ Expect(len(superNodePayload3.StorageNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload3.StateNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload3.ReceiptsRlp)).To(Equal(1))
+ Expect(superNodePayload3.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1))
+
+ superNodePayload4, err := filterer.FilterResponse(rctContractsAndTopicFilter, *mocks.MockIPLDPayload)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(superNodePayload4.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
+ Expect(len(superNodePayload4.HeadersRlp)).To(Equal(0))
+ Expect(len(superNodePayload4.UnclesRlp)).To(Equal(0))
+ Expect(len(superNodePayload4.TransactionsRlp)).To(Equal(0))
+ Expect(len(superNodePayload4.StorageNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload4.StateNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload4.ReceiptsRlp)).To(Equal(1))
+ Expect(superNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2))
+
+ superNodePayload5, err := filterer.FilterResponse(rctsForAllCollectedTrxs, *mocks.MockIPLDPayload)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(superNodePayload5.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
+ Expect(len(superNodePayload5.HeadersRlp)).To(Equal(0))
+ Expect(len(superNodePayload5.UnclesRlp)).To(Equal(0))
+ Expect(len(superNodePayload5.TransactionsRlp)).To(Equal(2))
+ Expect(super_node.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
+ Expect(super_node.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
+ Expect(len(superNodePayload5.StorageNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload5.StateNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload5.ReceiptsRlp)).To(Equal(2))
+ Expect(super_node.ListContainsBytes(superNodePayload5.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue())
+ Expect(super_node.ListContainsBytes(superNodePayload5.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue())
+
+ superNodePayload6, err := filterer.FilterResponse(rctsForSelectCollectedTrxs, *mocks.MockIPLDPayload)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(superNodePayload6.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
+ Expect(len(superNodePayload6.HeadersRlp)).To(Equal(0))
+ Expect(len(superNodePayload6.UnclesRlp)).To(Equal(0))
+ Expect(len(superNodePayload6.TransactionsRlp)).To(Equal(1))
+ Expect(super_node.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
+ Expect(len(superNodePayload6.StorageNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload6.StateNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload6.ReceiptsRlp)).To(Equal(1))
+ Expect(superNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2))
+
+ superNodePayload7, err := filterer.FilterResponse(stateFilter, *mocks.MockIPLDPayload)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(superNodePayload7.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
+ Expect(len(superNodePayload7.HeadersRlp)).To(Equal(0))
+ Expect(len(superNodePayload7.UnclesRlp)).To(Equal(0))
+ Expect(len(superNodePayload7.TransactionsRlp)).To(Equal(0))
+ Expect(len(superNodePayload7.StorageNodesRlp)).To(Equal(0))
+ Expect(len(superNodePayload7.ReceiptsRlp)).To(Equal(0))
+ Expect(len(superNodePayload7.StateNodesRlp)).To(Equal(1))
+ Expect(superNodePayload7.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes))
+ })
+ })
+})
+
+func getReceiptForStorageRLP(receipts types.Receipts, i int) []byte {
+ receiptForStorage := (*types.ReceiptForStorage)(receipts[i])
+ receiptBuffer := new(bytes.Buffer)
+ err := receiptForStorage.EncodeRLP(receiptBuffer)
+ Expect(err).ToNot(HaveOccurred())
+ return receiptBuffer.Bytes()
+}
diff --git a/pkg/seed_node/mocks/repository.go b/pkg/super_node/mocks/repository.go
similarity index 90%
rename from pkg/seed_node/mocks/repository.go
rename to pkg/super_node/mocks/repository.go
index dc06ccc9..4c37f468 100644
--- a/pkg/seed_node/mocks/repository.go
+++ b/pkg/super_node/mocks/repository.go
@@ -20,12 +20,12 @@ import "github.com/vulcanize/vulcanizedb/pkg/ipfs"
// CIDRepository is the underlying struct for the Repository interface
type CIDRepository struct {
- PassedCIDPayload *ipfs.CIDPayload
+ PassedCIDPayload []*ipfs.CIDPayload
ReturnErr error
}
// Index indexes a cidPayload in Postgres
func (repo *CIDRepository) Index(cidPayload *ipfs.CIDPayload) error {
- repo.PassedCIDPayload = cidPayload
+ repo.PassedCIDPayload = append(repo.PassedCIDPayload, cidPayload)
return repo.ReturnErr
}
diff --git a/pkg/super_node/mocks/retriever.go b/pkg/super_node/mocks/retriever.go
new file mode 100644
index 00000000..2fafe933
--- /dev/null
+++ b/pkg/super_node/mocks/retriever.go
@@ -0,0 +1,44 @@
+package mocks
+
+import (
+ "github.com/vulcanize/vulcanizedb/pkg/config"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs"
+)
+
+// MockCIDRetriever is a mock CID retriever for use in tests
+type MockCIDRetriever struct {
+ GapsToRetrieve [][2]int64
+ GapsToRetrieveErr error
+ CalledTimes int
+ FirstBlockNumberToReturn int64
+ RetrieveFirstBlockNumberErr error
+}
+
+// RetrieveCIDs mock method
+func (*MockCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) {
+ panic("implement me")
+}
+
+// RetrieveLastBlockNumber mock method
+func (*MockCIDRetriever) RetrieveLastBlockNumber() (int64, error) {
+ panic("implement me")
+}
+
+// RetrieveFirstBlockNumber mock method
+func (mcr *MockCIDRetriever) RetrieveFirstBlockNumber() (int64, error) {
+ return mcr.FirstBlockNumberToReturn, mcr.RetrieveFirstBlockNumberErr
+}
+
+// RetrieveGapsInData mock method
+func (mcr *MockCIDRetriever) RetrieveGapsInData() ([][2]int64, error) {
+ mcr.CalledTimes++
+ return mcr.GapsToRetrieve, mcr.GapsToRetrieveErr
+}
+
+// SetGapsToRetrieve mock method
+func (mcr *MockCIDRetriever) SetGapsToRetrieve(gaps [][2]int64) {
+ if mcr.GapsToRetrieve == nil {
+ mcr.GapsToRetrieve = make([][2]int64, 0)
+ }
+ mcr.GapsToRetrieve = append(mcr.GapsToRetrieve, gaps...)
+}
diff --git a/pkg/seed_node/repository.go b/pkg/super_node/repository.go
similarity index 99%
rename from pkg/seed_node/repository.go
rename to pkg/super_node/repository.go
index f6efacee..ea4fecf6 100644
--- a/pkg/seed_node/repository.go
+++ b/pkg/super_node/repository.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package seed_node
+package super_node
import (
"github.com/jmoiron/sqlx"
diff --git a/pkg/seed_node/repository_test.go b/pkg/super_node/repository_test.go
similarity index 88%
rename from pkg/seed_node/repository_test.go
rename to pkg/super_node/repository_test.go
index 9eb6bdbb..816797d6 100644
--- a/pkg/seed_node/repository_test.go
+++ b/pkg/super_node/repository_test.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package seed_node_test
+package super_node_test
import (
. "github.com/onsi/ginkgo"
@@ -23,23 +23,23 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
- "github.com/vulcanize/vulcanizedb/pkg/seed_node"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node"
)
var (
db *postgres.DB
err error
- repo seed_node.CIDRepository
+ repo super_node.CIDRepository
)
var _ = Describe("Repository", func() {
BeforeEach(func() {
- db, err = seed_node.SetupDB()
+ db, err = super_node.SetupDB()
Expect(err).ToNot(HaveOccurred())
- repo = seed_node.NewCIDRepository(db)
+ repo = super_node.NewCIDRepository(db)
})
AfterEach(func() {
- seed_node.TearDownDB(db)
+ super_node.TearDownDB(db)
})
Describe("Index", func() {
@@ -61,8 +61,8 @@ var _ = Describe("Repository", func() {
err = db.Select(&trxs, pgStr, 1)
Expect(err).ToNot(HaveOccurred())
Expect(len(trxs)).To(Equal(2))
- Expect(seed_node.ListContainsString(trxs, "mockTrxCID1")).To(BeTrue())
- Expect(seed_node.ListContainsString(trxs, "mockTrxCID2")).To(BeTrue())
+ Expect(super_node.ListContainsString(trxs, "mockTrxCID1")).To(BeTrue())
+ Expect(super_node.ListContainsString(trxs, "mockTrxCID2")).To(BeTrue())
// check receipts were properly indexed
rcts := make([]string, 0)
pgStr = `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids
@@ -72,8 +72,8 @@ var _ = Describe("Repository", func() {
err = db.Select(&rcts, pgStr, 1)
Expect(err).ToNot(HaveOccurred())
Expect(len(rcts)).To(Equal(2))
- Expect(seed_node.ListContainsString(rcts, "mockRctCID1")).To(BeTrue())
- Expect(seed_node.ListContainsString(rcts, "mockRctCID2")).To(BeTrue())
+ Expect(super_node.ListContainsString(rcts, "mockRctCID1")).To(BeTrue())
+ Expect(super_node.ListContainsString(rcts, "mockRctCID2")).To(BeTrue())
// check that state nodes were properly indexed
stateNodes := make([]ipfs.StateNodeCID, 0)
pgStr = `SELECT state_cids.cid, state_cids.state_key, state_cids.leaf FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id)
diff --git a/pkg/seed_node/retriever.go b/pkg/super_node/retriever.go
similarity index 92%
rename from pkg/seed_node/retriever.go
rename to pkg/super_node/retriever.go
index 9799ef29..36dfef47 100644
--- a/pkg/seed_node/retriever.go
+++ b/pkg/super_node/retriever.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package seed_node
+package super_node
import (
"math/big"
@@ -33,6 +33,7 @@ type CIDRetriever interface {
RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error)
RetrieveLastBlockNumber() (int64, error)
RetrieveFirstBlockNumber() (int64, error)
+ RetrieveGapsInData() ([][2]int64, error)
}
// EthCIDRetriever is the underlying struct supporting the CIDRetriever interface
@@ -303,3 +304,26 @@ func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters confi
err := tx.Select(&storageNodeCIDs, pgStr, args...)
return storageNodeCIDs, err
}
+
+type gap struct {
+ Start int64 `db:"start"`
+ Stop int64 `db:"stop"`
+}
+
+func (ecr *EthCIDRetriever) RetrieveGapsInData() ([][2]int64, error) {
+ pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM header_cids
+ LEFT JOIN header_cids r on header_cids.block_number = r.block_number - 1
+ LEFT JOIN header_cids fr on header_cids.block_number < fr.block_number
+ WHERE r.block_number is NULL and fr.block_number IS NOT NULL
+ GROUP BY header_cids.block_number, r.block_number`
+ gaps := make([]gap, 0)
+ err := ecr.db.Select(&gaps, pgStr)
+ if err != nil {
+ return nil, err
+ }
+ gapRanges := make([][2]int64, 0)
+ for _, gap := range gaps {
+ gapRanges = append(gapRanges, [2]int64{gap.Start, gap.Stop})
+ }
+ return gapRanges, nil
+}
diff --git a/pkg/seed_node/retriever_test.go b/pkg/super_node/retriever_test.go
similarity index 90%
rename from pkg/seed_node/retriever_test.go
rename to pkg/super_node/retriever_test.go
index 5789fc5c..de38ed65 100644
--- a/pkg/seed_node/retriever_test.go
+++ b/pkg/super_node/retriever_test.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package seed_node_test
+package super_node_test
import (
"math/big"
@@ -25,11 +25,11 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
- "github.com/vulcanize/vulcanizedb/pkg/seed_node"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node"
)
var (
- retriever seed_node.CIDRetriever
+ retriever super_node.CIDRetriever
openFilter = config.Subscription{
StartingBlock: big.NewInt(0),
EndingBlock: big.NewInt(1),
@@ -178,15 +178,15 @@ var (
var _ = Describe("Retriever", func() {
BeforeEach(func() {
- db, err = seed_node.SetupDB()
+ db, err = super_node.SetupDB()
Expect(err).ToNot(HaveOccurred())
- repo = seed_node.NewCIDRepository(db)
+ repo = super_node.NewCIDRepository(db)
err = repo.Index(mocks.MockCIDPayload)
Expect(err).ToNot(HaveOccurred())
- retriever = seed_node.NewCIDRetriever(db)
+ retriever = super_node.NewCIDRetriever(db)
})
AfterEach(func() {
- seed_node.TearDownDB(db)
+ super_node.TearDownDB(db)
})
Describe("RetrieveCIDs", func() {
@@ -197,11 +197,11 @@ var _ = Describe("Retriever", func() {
Expect(len(cidWrapper.Headers)).To(Equal(1))
Expect(cidWrapper.Headers).To(Equal(mocks.MockCIDWrapper.Headers))
Expect(len(cidWrapper.Transactions)).To(Equal(2))
- Expect(seed_node.ListContainsString(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[0])).To(BeTrue())
- Expect(seed_node.ListContainsString(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[1])).To(BeTrue())
+ Expect(super_node.ListContainsString(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[0])).To(BeTrue())
+ Expect(super_node.ListContainsString(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[1])).To(BeTrue())
Expect(len(cidWrapper.Receipts)).To(Equal(2))
- Expect(seed_node.ListContainsString(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[0])).To(BeTrue())
- Expect(seed_node.ListContainsString(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[1])).To(BeTrue())
+ Expect(super_node.ListContainsString(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[0])).To(BeTrue())
+ Expect(super_node.ListContainsString(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[1])).To(BeTrue())
Expect(len(cidWrapper.StateNodes)).To(Equal(2))
for _, stateNode := range cidWrapper.StateNodes {
if stateNode.CID == "mockStateCID1" {
@@ -265,13 +265,13 @@ var _ = Describe("Retriever", func() {
Expect(cidWrapper5.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(len(cidWrapper5.Headers)).To(Equal(0))
Expect(len(cidWrapper5.Transactions)).To(Equal(2))
- Expect(seed_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID1")).To(BeTrue())
- Expect(seed_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID2")).To(BeTrue())
+ Expect(super_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID1")).To(BeTrue())
+ Expect(super_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID2")).To(BeTrue())
Expect(len(cidWrapper5.StateNodes)).To(Equal(0))
Expect(len(cidWrapper5.StorageNodes)).To(Equal(0))
Expect(len(cidWrapper5.Receipts)).To(Equal(2))
- Expect(seed_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID1")).To(BeTrue())
- Expect(seed_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID2")).To(BeTrue())
+ Expect(super_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID1")).To(BeTrue())
+ Expect(super_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID2")).To(BeTrue())
cidWrapper6, err := retriever.RetrieveCIDs(rctsForSelectCollectedTrxs, 1)
Expect(err).ToNot(HaveOccurred())
diff --git a/pkg/seed_node/seed_node_suite_test.go b/pkg/super_node/seed_node_suite_test.go
similarity index 94%
rename from pkg/seed_node/seed_node_suite_test.go
rename to pkg/super_node/seed_node_suite_test.go
index 533d3664..83b84e4b 100644
--- a/pkg/seed_node/seed_node_suite_test.go
+++ b/pkg/super_node/seed_node_suite_test.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package seed_node_test
+package super_node_test
import (
"io/ioutil"
@@ -25,7 +25,7 @@ import (
"github.com/sirupsen/logrus"
)
-func TestSeedNode(t *testing.T) {
+func TestSuperNode(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Seed Node Suite Test")
}
diff --git a/pkg/seed_node/service.go b/pkg/super_node/service.go
similarity index 87%
rename from pkg/seed_node/service.go
rename to pkg/super_node/service.go
index 4588f5be..28feb405 100644
--- a/pkg/seed_node/service.go
+++ b/pkg/super_node/service.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package seed_node
+package super_node
import (
"sync"
@@ -36,7 +36,9 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
)
-const payloadChanBufferSize = 20000 // the max eth sub buffer size
+const (
+ payloadChanBufferSize = 20000 // the max eth sub buffer size
+)
// NodeInterface is the top level interface for streaming, converting to IPLDs, publishing,
// and indexing all Ethereum data; screening this data; and serving it up to subscribed clients
@@ -49,14 +51,17 @@ type NodeInterface interface {
// Main event loop for handling client pub-sub
ScreenAndServe(screenAndServePayload <-chan ipfs.IPLDPayload, screenAndServeQuit <-chan bool)
// Method to subscribe to receive state diff processing output
- Subscribe(id rpc.ID, sub chan<- streamer.SeedNodePayload, quitChan chan<- bool, streamFilters config.Subscription)
+ Subscribe(id rpc.ID, sub chan<- streamer.SuperNodePayload, quitChan chan<- bool, streamFilters config.Subscription)
// Method to unsubscribe from state diff processing
Unsubscribe(id rpc.ID)
// Method to access the Geth node info for this service
Node() core.Node
+ // Method used to retrieve the underlying IPFS publisher for this service, so that is can be used for backfilling
+ // This is needed because it's not possible to initialize two ipfs nodes at the same path
+ GetPublisher() ipfs.IPLDPublisher
}
-// Service is the underlying struct for the SyncAndPublish interface
+// Service is the underlying struct for the super node
type Service struct {
// Used to sync access to the Subscriptions
sync.Mutex
@@ -71,7 +76,7 @@ type Service struct {
// Interface for filtering and serving data according to subscribed clients according to their specification
Filterer ResponseFilterer
// Interface for fetching ETH-IPLD objects from IPFS
- Fetcher ipfs.IPLDFetcher
+ IPLDFetcher ipfs.IPLDFetcher
// Interface for searching and retrieving CIDs from Postgres index
Retriever CIDRetriever
// Interface for resolving ipfs blocks to their data types
@@ -86,17 +91,17 @@ type Service struct {
SubscriptionTypes map[common.Hash]config.Subscription
// Number of workers
WorkerPoolSize int
- // Info for the Geth node that this seed node is working with
+ // Info for the Geth node that this super node is working with
gethNode core.Node
}
-// NewSeedNode creates a new seed_node.Interface using an underlying seed_node.Service struct
-func NewSeedNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc chan bool, workers int, node core.Node) (NodeInterface, error) {
+// NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct
+func NewSuperNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc chan bool, workers int, node core.Node) (NodeInterface, error) {
publisher, err := ipfs.NewIPLDPublisher(ipfsPath)
if err != nil {
return nil, err
}
- fetcher, err := ipfs.NewIPLDFetcher(ipfsPath)
+ ipldFetcher, err := ipfs.NewIPLDFetcher(ipfsPath)
if err != nil {
return nil, err
}
@@ -106,7 +111,7 @@ func NewSeedNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc
Converter: ipfs.NewPayloadConverter(params.MainnetChainConfig),
Publisher: publisher,
Filterer: NewResponseFilterer(),
- Fetcher: fetcher,
+ IPLDFetcher: ipldFetcher,
Retriever: NewCIDRetriever(db),
Resolver: ipfs.NewIPLDResolver(),
PayloadChan: make(chan statediff.Payload, payloadChanBufferSize),
@@ -123,13 +128,13 @@ func (sap *Service) Protocols() []p2p.Protocol {
return []p2p.Protocol{}
}
-// APIs returns the RPC descriptors the StateDiffingService offers
+// APIs returns the RPC descriptors the super node service offers
func (sap *Service) APIs() []rpc.API {
return []rpc.API{
{
Namespace: APIName,
Version: APIVersion,
- Service: NewPublicSeedNodeAPI(sap),
+ Service: NewPublicSuperNodeAPI(sap),
Public: true,
},
}
@@ -256,7 +261,7 @@ func (sap *Service) sendResponse(payload ipfs.IPLDPayload) error {
for id, sub := range subs {
select {
case sub.PayloadChan <- response:
- log.Infof("sending seed node payload to subscription %s", id)
+ log.Infof("sending super node payload to subscription %s", id)
default:
log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
}
@@ -267,8 +272,8 @@ func (sap *Service) sendResponse(payload ipfs.IPLDPayload) error {
}
// Subscribe is used by the API to subscribe to the service loop
-func (sap *Service) Subscribe(id rpc.ID, sub chan<- streamer.SeedNodePayload, quitChan chan<- bool, streamFilters config.Subscription) {
- log.Info("Subscribing to the seed node service")
+func (sap *Service) Subscribe(id rpc.ID, sub chan<- streamer.SuperNodePayload, quitChan chan<- bool, streamFilters config.Subscription) {
+ log.Info("Subscribing to the super node service")
// Subscription type is defined as the hash of its content
// Group subscriptions by type and screen payloads once for subs of the same type
by, err := rlp.EncodeToBytes(streamFilters)
@@ -305,7 +310,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
var err error
startingBlock, err = sap.Retriever.RetrieveFirstBlockNumber()
if err != nil {
- sub.PayloadChan <- streamer.SeedNodePayload{
+ sub.PayloadChan <- streamer.SuperNodePayload{
ErrMsg: "unable to set block range start; error: " + err.Error(),
}
}
@@ -314,7 +319,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
}
endingBlock, err = sap.Retriever.RetrieveLastBlockNumber()
if err != nil {
- sub.PayloadChan <- streamer.SeedNodePayload{
+ sub.PayloadChan <- streamer.SuperNodePayload{
ErrMsg: "unable to set block range end; error: " + err.Error(),
}
}
@@ -327,10 +332,10 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
// the blocknumbers in the payloads they receive to keep things in order
// TODO: separate backfill into a different rpc subscription method altogether?
go func() {
- for i := con.StartingBlock.Int64(); i <= endingBlock; i++ {
+ for i := startingBlock; i <= endingBlock; i++ {
cidWrapper, err := sap.Retriever.RetrieveCIDs(con, i)
if err != nil {
- sub.PayloadChan <- streamer.SeedNodePayload{
+ sub.PayloadChan <- streamer.SuperNodePayload{
ErrMsg: "CID retrieval error: " + err.Error(),
}
continue
@@ -338,10 +343,10 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
if ipfs.EmptyCIDWrapper(*cidWrapper) {
continue
}
- blocksWrapper, err := sap.Fetcher.FetchCIDs(*cidWrapper)
+ blocksWrapper, err := sap.IPLDFetcher.FetchCIDs(*cidWrapper)
if err != nil {
log.Error(err)
- sub.PayloadChan <- streamer.SeedNodePayload{
+ sub.PayloadChan <- streamer.SuperNodePayload{
ErrMsg: "IPLD fetching error: " + err.Error(),
}
continue
@@ -349,14 +354,14 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper)
if err != nil {
log.Error(err)
- sub.PayloadChan <- streamer.SeedNodePayload{
+ sub.PayloadChan <- streamer.SuperNodePayload{
ErrMsg: "IPLD resolving error: " + err.Error(),
}
continue
}
select {
case sub.PayloadChan <- backFillIplds:
- log.Infof("sending seed node back-fill payload to subscription %s", id)
+ log.Infof("sending super node back-fill payload to subscription %s", id)
default:
log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id)
}
@@ -366,7 +371,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
// Unsubscribe is used to unsubscribe to the StateDiffingService loop
func (sap *Service) Unsubscribe(id rpc.ID) {
- log.Info("Unsubscribing from the seed node service")
+ log.Info("Unsubscribing from the super node service")
sap.Lock()
for ty := range sap.Subscriptions {
delete(sap.Subscriptions[ty], id)
@@ -381,7 +386,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) {
// Start is used to begin the service
func (sap *Service) Start(*p2p.Server) error {
- log.Info("Starting seed node service")
+ log.Info("Starting super node service")
wg := new(sync.WaitGroup)
payloadChan := make(chan ipfs.IPLDPayload, payloadChanBufferSize)
quitChan := make(chan bool, 1)
@@ -394,7 +399,7 @@ func (sap *Service) Start(*p2p.Server) error {
// Stop is used to close down the service
func (sap *Service) Stop() error {
- log.Info("Stopping seed node service")
+ log.Info("Stopping super node service")
close(sap.QuitChan)
return nil
}
@@ -421,3 +426,7 @@ func (sap *Service) close() {
}
sap.Unlock()
}
+
+func (sap *Service) GetPublisher() ipfs.IPLDPublisher {
+ return sap.Publisher
+}
diff --git a/pkg/seed_node/service_test.go b/pkg/super_node/service_test.go
similarity index 88%
rename from pkg/seed_node/service_test.go
rename to pkg/super_node/service_test.go
index f3a86345..e2cb4059 100644
--- a/pkg/seed_node/service_test.go
+++ b/pkg/super_node/service_test.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package seed_node_test
+package super_node_test
import (
"sync"
@@ -27,8 +27,8 @@ import (
mocks2 "github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
- "github.com/vulcanize/vulcanizedb/pkg/seed_node"
- mocks3 "github.com/vulcanize/vulcanizedb/pkg/seed_node/mocks"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node"
+ mocks3 "github.com/vulcanize/vulcanizedb/pkg/super_node/mocks"
)
var _ = Describe("Service", func() {
@@ -55,7 +55,7 @@ var _ = Describe("Service", func() {
ReturnIPLDPayload: mocks.MockIPLDPayload,
ReturnErr: nil,
}
- processor := &seed_node.Service{
+ processor := &super_node.Service{
Repository: mockCidRepo,
Publisher: mockPublisher,
Streamer: mockStreamer,
@@ -70,7 +70,8 @@ var _ = Describe("Service", func() {
quitChan <- true
wg.Wait()
Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload))
- Expect(mockCidRepo.PassedCIDPayload).To(Equal(mocks.MockCIDPayload))
+ Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1))
+ Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockIPLDPayload))
Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan))
})
diff --git a/pkg/seed_node/subscription.go b/pkg/super_node/subscription.go
similarity index 90%
rename from pkg/seed_node/subscription.go
rename to pkg/super_node/subscription.go
index 441d8d70..d7d91787 100644
--- a/pkg/seed_node/subscription.go
+++ b/pkg/super_node/subscription.go
@@ -14,14 +14,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package seed_node
+package super_node
import (
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
)
-// Subscription holds the information for an individual client subscription to the seed node
+// Subscription holds the information for an individual client subscription to the super node
type Subscription struct {
- PayloadChan chan<- streamer.SeedNodePayload
+ PayloadChan chan<- streamer.SuperNodePayload
QuitChan chan<- bool
}
diff --git a/pkg/seed_node/test_helpers.go b/pkg/super_node/test_helpers.go
similarity index 99%
rename from pkg/seed_node/test_helpers.go
rename to pkg/super_node/test_helpers.go
index 8c03a1eb..db5edd5d 100644
--- a/pkg/seed_node/test_helpers.go
+++ b/pkg/super_node/test_helpers.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package seed_node
+package super_node
import (
"bytes"