seed => super; port 80 => port 8080; backfill process for the super_node

This commit is contained in:
Ian Norden 2019-10-02 09:10:37 -05:00
parent 83fd76bc8a
commit 40c3aff597
33 changed files with 843 additions and 373 deletions

View File

@ -38,9 +38,9 @@ import (
// streamSubscribeCmd represents the streamSubscribe command // streamSubscribeCmd represents the streamSubscribe command
var streamSubscribeCmd = &cobra.Command{ var streamSubscribeCmd = &cobra.Command{
Use: "streamSubscribe", Use: "streamSubscribe",
Short: "This command is used to subscribe to the seed node stream with the provided filters", Short: "This command is used to subscribe to the super node stream with the provided filters",
Long: `This command is for demo and testing purposes and is used to subscribe to the seed node with the provided subscription configuration parameters. Long: `This command is for demo and testing purposes and is used to subscribe to the super node with the provided subscription configuration parameters.
It does not do anything with the data streamed from the seed node other than unpack it and print it out for demonstration purposes.`, It does not do anything with the data streamed from the super node other than unpack it and print it out for demonstration purposes.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
streamSubscribe() streamSubscribe()
}, },
@ -56,12 +56,12 @@ func streamSubscribe() {
// Create a new rpc client and a subscription streamer with that client // Create a new rpc client and a subscription streamer with that client
rpcClient := getRpcClient() rpcClient := getRpcClient()
str := streamer.NewSeedNodeStreamer(rpcClient) str := streamer.NewSuperNodeStreamer(rpcClient)
// Buffered channel for reading subscription payloads // Buffered channel for reading subscription payloads
payloadChan := make(chan streamer.SeedNodePayload, 20000) payloadChan := make(chan streamer.SuperNodePayload, 20000)
// Subscribe to the seed node service with the given config/filter parameters // Subscribe to the super node service with the given config/filter parameters
sub, err := str.Stream(payloadChan, subscriptionConfig) sub, err := str.Stream(payloadChan, subscriptionConfig)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -217,7 +217,7 @@ func configureSubscription() {
func getRpcClient() core.RpcClient { func getRpcClient() core.RpcClient {
vulcPath := viper.GetString("subscription.path") vulcPath := viper.GetString("subscription.path")
if vulcPath == "" { if vulcPath == "" {
vulcPath = "ws://127.0.0.1:80" // default to and try the default ws url if no path is provided vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided
} }
rawRpcClient, err := rpc.Dial(vulcPath) rawRpcClient, err := rpc.Dial(vulcPath)
if err != nil { if err != nil {

View File

@ -16,11 +16,13 @@
package cmd package cmd
import ( import (
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"os" "os"
"path/filepath" "path/filepath"
syn "sync" syn "sync"
"time"
"github.com/vulcanize/vulcanizedb/pkg/seed_node" "github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/spf13/viper" "github.com/spf13/viper"
@ -51,44 +53,34 @@ it maintains a local index of the IPLD objects' CIDs in Postgres.`,
}, },
} }
var ipfsPath string
func init() { func init() {
rootCmd.AddCommand(syncAndPublishCmd) rootCmd.AddCommand(syncAndPublishCmd)
} }
func syncAndPublish() { func syncAndPublish() {
blockChain, rpcClient := getBlockChainAndClient() superNode, err := newSuperNode()
if err != nil {
db := utils.LoadPostgres(databaseConfig, blockChain.Node()) log.Fatal(err)
quitChan := make(chan bool) }
wg := &syn.WaitGroup{}
ipfsPath := viper.GetString("client.ipfsPath") err = superNode.SyncAndPublish(wg, nil, nil)
if ipfsPath == "" { if err != nil {
home, err := os.UserHomeDir() log.Fatal(err)
}
if viper.GetBool("backfill.on") && viper.GetString("backfill.ipcPath") != "" {
backfiller := newBackFiller(superNode.GetPublisher())
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
ipfsPath = filepath.Join(home, ".ipfs") backfiller.FillGaps(wg, nil)
}
workers := viper.GetInt("client.workers")
if workers < 1 {
workers = 1
}
processor, err := seed_node.NewSeedNode(ipfsPath, &db, rpcClient, quitChan, workers, blockChain.Node())
if err != nil {
log.Fatal(err)
}
wg := &syn.WaitGroup{}
err = processor.SyncAndPublish(wg, nil, nil)
if err != nil {
log.Fatal(err)
} }
wg.Wait() // If an error was thrown, wg.Add was never called and this will fall through wg.Wait() // If an error was thrown, wg.Add was never called and this will fall through
} }
func getBlockChainAndClient() (*geth.BlockChain, core.RpcClient) { func getBlockChainAndClient(path string) (*geth.BlockChain, core.RpcClient) {
rawRpcClient, err := rpc.Dial(ipc) rawRpcClient, err := rpc.Dial(path)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -100,3 +92,35 @@ func getBlockChainAndClient() (*geth.BlockChain, core.RpcClient) {
blockChain := geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter) blockChain := geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter)
return blockChain, rpcClient return blockChain, rpcClient
} }
func newSuperNode() (super_node.NodeInterface, error) {
blockChain, rpcClient := getBlockChainAndClient(ipc)
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
quitChan := make(chan bool)
ipfsPath = viper.GetString("client.ipfsPath")
if ipfsPath == "" {
home, err := os.UserHomeDir()
if err != nil {
log.Fatal(err)
}
ipfsPath = filepath.Join(home, ".ipfs")
}
workers := viper.GetInt("client.workers")
if workers < 1 {
workers = 1
}
return super_node.NewSuperNode(ipfsPath, &db, rpcClient, quitChan, workers, blockChain.Node())
}
func newBackFiller(ipfsPublisher ipfs.IPLDPublisher) super_node.BackFillInterface {
blockChain, archivalRpcClient := getBlockChainAndClient(viper.GetString("backfill.ipcPath"))
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
freq := viper.GetInt("backfill.frequency")
var frequency time.Duration
if freq <= 0 {
frequency = time.Minute * 5
} else {
frequency = time.Duration(freq)
}
return super_node.NewBackFillService(ipfsPublisher, &db, archivalRpcClient, time.Minute*frequency)
}

View File

@ -20,15 +20,12 @@ import (
"path/filepath" "path/filepath"
syn "sync" syn "sync"
"github.com/vulcanize/vulcanizedb/pkg/seed_node"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/utils"
) )
// syncPublishScreenAndServeCmd represents the syncPublishScreenAndServe command // syncPublishScreenAndServeCmd represents the syncPublishScreenAndServe command
@ -50,24 +47,7 @@ func init() {
} }
func syncPublishScreenAndServe() { func syncPublishScreenAndServe() {
blockChain, rpcClient := getBlockChainAndClient() superNode, err := newSuperNode()
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
quitChan := make(chan bool, 1)
ipfsPath := viper.GetString("client.ipfsPath")
if ipfsPath == "" {
home, err := os.UserHomeDir()
if err != nil {
log.Fatal(err)
}
ipfsPath = filepath.Join(home, ".ipfs")
}
workers := viper.GetInt("client.workers")
if workers < 1 {
workers = 1
}
processor, err := seed_node.NewSeedNode(ipfsPath, &db, rpcClient, quitChan, workers, blockChain.Node())
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -75,11 +55,18 @@ func syncPublishScreenAndServe() {
wg := &syn.WaitGroup{} wg := &syn.WaitGroup{}
forwardPayloadChan := make(chan ipfs.IPLDPayload, 20000) forwardPayloadChan := make(chan ipfs.IPLDPayload, 20000)
forwardQuitChan := make(chan bool, 1) forwardQuitChan := make(chan bool, 1)
err = processor.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan) err = superNode.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
processor.ScreenAndServe(forwardPayloadChan, forwardQuitChan) superNode.ScreenAndServe(forwardPayloadChan, forwardQuitChan)
if viper.GetBool("backfill.on") && viper.GetString("backfill.ipcPath") != "" {
backfiller := newBackFiller(superNode.GetPublisher())
if err != nil {
log.Fatal(err)
}
backfiller.FillGaps(wg, nil)
}
var ipcPath string var ipcPath string
ipcPath = viper.GetString("server.ipcPath") ipcPath = viper.GetString("server.ipcPath")
@ -90,7 +77,7 @@ func syncPublishScreenAndServe() {
} }
ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc")
} }
_, _, err = rpc.StartIPCEndpoint(ipcPath, processor.APIs()) _, _, err = rpc.StartIPCEndpoint(ipcPath, superNode.APIs())
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -98,11 +85,11 @@ func syncPublishScreenAndServe() {
var wsEndpoint string var wsEndpoint string
wsEndpoint = viper.GetString("server.wsEndpoint") wsEndpoint = viper.GetString("server.wsEndpoint")
if wsEndpoint == "" { if wsEndpoint == "" {
wsEndpoint = "127.0.0.1:80" wsEndpoint = "127.0.0.1:8080"
} }
var exposeAll = true var exposeAll = true
var wsOrigins []string = nil var wsOrigins []string = nil
_, _, err = rpc.StartWSEndpoint(wsEndpoint, processor.APIs(), []string{"vulcanizedb"}, wsOrigins, exposeAll) _, _, err = rpc.StartWSEndpoint(wsEndpoint, superNode.APIs(), []string{"vulcanizedb"}, wsOrigins, exposeAll)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -61,7 +61,7 @@ USER $USER
# chown first so dir is writable # chown first so dir is writable
# note: using $USER is merged, but not in the stable release yet # note: using $USER is merged, but not in the stable release yet
COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/$config_file config.toml COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/$config_file config.toml
COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/dockerfiles/seed_node/startup_script.sh . COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node/startup_script.sh .
# keep binaries immutable # keep binaries immutable
COPY --from=builder /go/src/github.com/vulcanize/vulcanizedb/vulcanizedb vulcanizedb COPY --from=builder /go/src/github.com/vulcanize/vulcanizedb/vulcanizedb vulcanizedb

View File

@ -1,5 +1,5 @@
#!/bin/sh #!/bin/sh
# Runs the db migrations and starts the seed node services # Runs the db migrations and starts the super node services
# Exit if the variable tests fail # Exit if the variable tests fail
set -e set -e

View File

@ -103,7 +103,7 @@ There are two commands to choose from:
#### syncAndPublish #### syncAndPublish
`syncAndPublih` performs the functions of the seed node- syncing data from Geth, converting them to IPLDs, `syncAndPublih` performs the functions of the super node- syncing data from Geth, converting them to IPLDs,
publishing those IPLDs to IPFS, and creating a local Postgres index to relate their CIDS to useful metadata. publishing those IPLDs to IPFS, and creating a local Postgres index to relate their CIDS to useful metadata.
Usage: Usage:
@ -149,16 +149,29 @@ The config file for the `syncPublishScreenAndServe` command has two additional f
[server] [server]
ipcPath = "/Users/user/.vulcanize/vulcanize.ipc" ipcPath = "/Users/user/.vulcanize/vulcanize.ipc"
wsEndpoint = "127.0.0.1:80" wsEndpoint = "127.0.0.1:80"
[backfill]
on = false
ipcPath = ""
frequency = 5
``` ```
The additional `server.ipcPath` and `server.wsEndpoint` fields are used to set what ipc endpoint and ws url The additional `server.ipcPath` and `server.wsEndpoint` fields are used to set what ipc endpoint and ws url
the `syncPublishScreenAndServe` rpc server will expose itself to subscribing transformers over, respectively. the `syncPublishScreenAndServe` rpc server will expose itself to subscribing transformers over, respectively.
Any valid and available path and endpoint is acceptable, but keep in mind that this path and endpoint need to be known by transformers for them to subscribe to the seed node. Any valid and available path and endpoint is acceptable, but keep in mind that this path and endpoint need to
be known by transformers for them to subscribe to the super node.
Because the super node syncs data from a geth full node as it progresses through its block synchronization, there is potential
for the super node to miss data both at the beginning of the sync due to lag between initialization of the two processes and
anywhere throughout the sync if the processes are interrupted. The `backfill` config mapping is used to optionally configure
the super node with an archival geth client that exposes a `statediff.StateDiffAt` rpc endpoint, to enable it to fill in these data gaps.
`backfill.on` turns the backfill process on, the `backfill.ipcPath` is the rpc path for the archival geth node, and `backfill.frequency`
sets at what frequency (in minutes) the backfill process checks for and fills in gaps.
## Dockerfile Setup ## Dockerfile Setup
The below provides step-by-step directions for how to setup the seed node using the provided Dockerfile on an AWS Linux AMI instance. The below provides step-by-step directions for how to setup the super node using the provided Dockerfile on an AWS Linux AMI instance.
Note that the instance will need sufficient memory and storage for this to work. Note that the instance will need sufficient memory and storage for this to work.
1. Install basic dependencies 1. Install basic dependencies
@ -230,7 +243,7 @@ createdb vulcanize_public
8. Build and run the Docker image 8. Build and run the Docker image
``` ```
cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/seed_node cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node
docker build . docker build .
docker run --network host -e VDB_PG_CONNECT=postgres://localhost:5432/vulcanize_public?sslmode=disable {IMAGE_ID} docker run --network host -e VDB_PG_CONNECT=postgres://localhost:5432/vulcanize_public?sslmode=disable {IMAGE_ID}
``` ```
@ -241,8 +254,8 @@ docker run --network host -e VDB_PG_CONNECT=postgres://localhost:5432/vulcanize_
A transformer can subscribe to the `syncPublishScreenAndServe` service over its ipc or ws endpoints, when subscribing the transformer A transformer can subscribe to the `syncPublishScreenAndServe` service over its ipc or ws endpoints, when subscribing the transformer
specifies which subsets of the synced data it is interested in and the server will forward only these data. specifies which subsets of the synced data it is interested in and the server will forward only these data.
The `streamSubscribe` command serves as a simple demonstration/example of subscribing to the seed-node feed, it subscribes with a set of parameters The `streamSubscribe` command serves as a simple demonstration/example of subscribing to the super-node feed, it subscribes with a set of parameters
defined in the loaded config file, and prints the streamed data to stdout. To build transformers that subscribe to and use seed-node data, defined in the loaded config file, and prints the streamed data to stdout. To build transformers that subscribe to and use super-node data,
the shared/libraries/streamer can be used. the shared/libraries/streamer can be used.
Usage: Usage:
@ -294,39 +307,39 @@ The config for `streamSubscribe` has the `subscribe` set of parameters, for exam
intermediateNodes = false intermediateNodes = false
``` ```
`subscription.path` is used to define the ws url OR ipc endpoint we will subscribe to the seed-node over `subscription.path` is used to define the ws url OR ipc endpoint we will subscribe to the super-node over
(the `server.ipcPath` or `server.wsEndpoint` that the seed-node has defined in their config file). (the `server.ipcPath` or `server.wsEndpoint` that the super-node has defined in their config file).
`subscription.backfill` specifies whether or not the seed-node should look up historical data in its cache and `subscription.backfill` specifies whether or not the super-node should look up historical data in its cache and
send that to the subscriber, if this is set to `false` then the seed-node only forwards newly synced/incoming data. send that to the subscriber, if this is set to `false` then the super-node only forwards newly synced/incoming data.
`subscription.backfillOnly` will tell the seed-node to only send historical data and not stream incoming data going forward. `subscription.backfillOnly` will tell the super-node to only send historical data and not stream incoming data going forward.
`subscription.startingBlock` is the starting block number for the range we want to receive data in. `subscription.startingBlock` is the starting block number for the range we want to receive data in.
`subscription.endingBlock` is the ending block number for the range we want to receive data in; `subscription.endingBlock` is the ending block number for the range we want to receive data in;
setting to 0 means there is no end/we will continue indefinitely. setting to 0 means there is no end/we will continue indefinitely.
`subscription.headerFilter` has two sub-options: `off` and `finalOnly`. Setting `off` to true tells the seed-node to `subscription.headerFilter` has two sub-options: `off` and `finalOnly`. Setting `off` to true tells the super-node to
not send any headers to the subscriber; setting `finalOnly` to true tells the seed-node to send only canonical headers. not send any headers to the subscriber; setting `finalOnly` to true tells the super-node to send only canonical headers.
`subscription.trxFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the seed-node to `subscription.trxFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the super-node to
not send any transactions to the subscriber; `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for, not send any transactions to the subscriber; `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for,
if they have any addresses then the seed-node will only send transactions that were sent or received by the addresses contained if they have any addresses then the super-node will only send transactions that were sent or received by the addresses contained
in `src` and `dst`, respectively. in `src` and `dst`, respectively.
`subscription.receiptFilter` has two sub-options: `off` and `topics`. Setting `off` to true tells the seed-node to `subscription.receiptFilter` has two sub-options: `off` and `topics`. Setting `off` to true tells the super-node to
not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for, not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for,
if it has any topics then the seed-node will only send receipts that contain logs which have that topic0. if it has any topics then the super-node will only send receipts that contain logs which have that topic0.
`subscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the seed-node to `subscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the super-node to
not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for, not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for,
if it has any addresses then the seed-node will only send state leafs (accounts) corresponding to those account addresses. By default the seed-node if it has any addresses then the super-node will only send state leafs (accounts) corresponding to those account addresses. By default the super-node
only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`.
`subscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. Setting `off` to true tells the seed-node to `subscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. Setting `off` to true tells the super-node to
not send any storage data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter storage for, not send any storage data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter storage for,
if it has any addresses then the seed-node will only send storage nodes from the storage tries at those state addresses. `storageKeys` is another string if it has any addresses then the super-node will only send storage nodes from the storage tries at those state addresses. `storageKeys` is another string
array that can be filled with storage keys we want to filter storage data for. It is important to note that the storageKeys are the actual keccak256 hashes, whereas array that can be filled with storage keys we want to filter storage data for. It is important to note that the storageKeys are the actual keccak256 hashes, whereas
the addresses in the `addresses` fields are the ETH addresses and not their keccak256 hashes that serve as the actual state keys. By default the seed-node the addresses in the `addresses` fields are the ETH addresses and not their keccak256 hashes that serve as the actual state keys. By default the super-node
only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`.

View File

@ -1,5 +1,5 @@
[subscription] [subscription]
path = "ws://127.0.0.1:80" path = "ws://seed0.20c.com:8080"
backfill = true backfill = true
backfillOnly = false backfillOnly = false
startingBlock = 0 startingBlock = 0

View File

@ -11,3 +11,8 @@
[server] [server]
ipcPath = "/root/.vulcanize/vulcanize.ipc" ipcPath = "/root/.vulcanize/vulcanize.ipc"
wsEndpoint = "127.0.0.1:8080" wsEndpoint = "127.0.0.1:8080"
[backfill]
on = false
ipcPath = ""
frequency = 5

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
// Streamer is used by watchers to stream eth data from a vulcanizedb seed node // Streamer is used by watchers to stream eth data from a vulcanizedb super node
package streamer package streamer
import ( import (
@ -28,30 +28,30 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
) )
// ISeedNodeStreamer is the interface for streaming SeedNodePayloads from a vulcanizeDB seed node // ISuperNodeStreamer is the interface for streaming SuperNodePayloads from a vulcanizeDB super node
type ISeedNodeStreamer interface { type ISuperNodeStreamer interface {
Stream(payloadChan chan SeedNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) Stream(payloadChan chan SuperNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error)
} }
// SeedNodeStreamer is the underlying struct for the ISeedNodeStreamer interface // SuperNodeStreamer is the underlying struct for the ISuperNodeStreamer interface
type SeedNodeStreamer struct { type SuperNodeStreamer struct {
Client core.RpcClient Client core.RpcClient
} }
// NewSeedNodeStreamer creates a pointer to a new SeedNodeStreamer which satisfies the ISeedNodeStreamer interface // NewSuperNodeStreamer creates a pointer to a new SuperNodeStreamer which satisfies the ISuperNodeStreamer interface
func NewSeedNodeStreamer(client core.RpcClient) *SeedNodeStreamer { func NewSuperNodeStreamer(client core.RpcClient) *SuperNodeStreamer {
return &SeedNodeStreamer{ return &SuperNodeStreamer{
Client: client, Client: client,
} }
} }
// Stream is the main loop for subscribing to data from a vulcanizedb seed node // Stream is the main loop for subscribing to data from a vulcanizedb super node
func (sds *SeedNodeStreamer) Stream(payloadChan chan SeedNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) { func (sds *SuperNodeStreamer) Stream(payloadChan chan SuperNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) {
return sds.Client.Subscribe("vulcanizedb", payloadChan, "stream", streamFilters) return sds.Client.Subscribe("vdb", payloadChan, "stream", streamFilters)
} }
// Payload holds the data returned from the seed node to the requesting client // Payload holds the data returned from the super node to the requesting client
type SeedNodePayload struct { type SuperNodePayload struct {
BlockNumber *big.Int `json:"blockNumber"` BlockNumber *big.Int `json:"blockNumber"`
HeadersRlp [][]byte `json:"headersRlp"` HeadersRlp [][]byte `json:"headersRlp"`
UnclesRlp [][]byte `json:"unclesRlp"` UnclesRlp [][]byte `json:"unclesRlp"`
@ -65,20 +65,20 @@ type SeedNodePayload struct {
err error err error
} }
func (sd *SeedNodePayload) ensureEncoded() { func (sd *SuperNodePayload) ensureEncoded() {
if sd.encoded == nil && sd.err == nil { if sd.encoded == nil && sd.err == nil {
sd.encoded, sd.err = json.Marshal(sd) sd.encoded, sd.err = json.Marshal(sd)
} }
} }
// Length to implement Encoder interface for StateDiff // Length to implement Encoder interface for StateDiff
func (sd *SeedNodePayload) Length() int { func (sd *SuperNodePayload) Length() int {
sd.ensureEncoded() sd.ensureEncoded()
return len(sd.encoded) return len(sd.encoded)
} }
// Encode to implement Encoder interface for StateDiff // Encode to implement Encoder interface for StateDiff
func (sd *SeedNodePayload) Encode() ([]byte, error) { func (sd *SuperNodePayload) Encode() ([]byte, error) {
sd.ensureEncoded() sd.ensureEncoded()
return sd.encoded, sd.err return sd.encoded, sd.err
} }

View File

@ -22,10 +22,10 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
type SeedNodeTransformer interface { type SuperNodeTransformer interface {
Init() error Init() error
Execute() error Execute() error
GetConfig() config.Subscription GetConfig() config.Subscription
} }
type SeedNodeTransformerInitializer func(db *postgres.DB, subCon config.Subscription, client core.RpcClient) SeedNodeTransformer type SuperNodeTransformerInitializer func(db *postgres.DB, subCon config.Subscription, client core.RpcClient) SuperNodeTransformer

View File

@ -18,7 +18,7 @@ package config
import "math/big" import "math/big"
// Subscription config is used by a subscribing transformer to specifiy which data to receive from the seed node // Subscription config is used by a subscribing transformer to specifiy which data to receive from the super node
type Subscription struct { type Subscription struct {
BackFill bool BackFill bool
BackFillOnly bool BackFillOnly bool

View File

@ -17,6 +17,8 @@
package mocks package mocks
import ( import (
"fmt"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
@ -34,3 +36,22 @@ func (pc *PayloadConverter) Convert(payload statediff.Payload) (*ipfs.IPLDPayloa
pc.PassedStatediffPayload = payload pc.PassedStatediffPayload = payload
return pc.ReturnIPLDPayload, pc.ReturnErr return pc.ReturnIPLDPayload, pc.ReturnErr
} }
// IterativePayloadConverter is the underlying struct for the Converter interface
type IterativePayloadConverter struct {
PassedStatediffPayload []statediff.Payload
ReturnIPLDPayload []*ipfs.IPLDPayload
ReturnErr error
iteration int
}
// Convert method is used to convert a geth statediff.Payload to a IPLDPayload
func (pc *IterativePayloadConverter) Convert(payload statediff.Payload) (*ipfs.IPLDPayload, error) {
pc.PassedStatediffPayload = append(pc.PassedStatediffPayload, payload)
if len(pc.PassedStatediffPayload) < pc.iteration+1 {
return nil, fmt.Errorf("IterativePayloadConverter does not have a payload to return at iteration %d", pc.iteration)
}
returnPayload := pc.ReturnIPLDPayload[pc.iteration]
pc.iteration++
return returnPayload, pc.ReturnErr
}

View File

@ -17,6 +17,7 @@
package mocks package mocks
import ( import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
) )
@ -32,3 +33,22 @@ func (pub *IPLDPublisher) Publish(payload *ipfs.IPLDPayload) (*ipfs.CIDPayload,
pub.PassedIPLDPayload = payload pub.PassedIPLDPayload = payload
return pub.ReturnCIDPayload, pub.ReturnErr return pub.ReturnCIDPayload, pub.ReturnErr
} }
// IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing
type IterativeIPLDPublisher struct {
PassedIPLDPayload []*ipfs.IPLDPayload
ReturnCIDPayload []*ipfs.CIDPayload
ReturnErr error
iteration int
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IterativeIPLDPublisher) Publish(payload *ipfs.IPLDPayload) (*ipfs.CIDPayload, error) {
pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, payload)
if len(pub.ReturnCIDPayload) < pub.iteration+1 {
return nil, fmt.Errorf("IterativeIPLDPublisher does not have a payload to return at iteration %d", pub.iteration)
}
returnPayload := pub.ReturnCIDPayload[pub.iteration]
pub.iteration++
return returnPayload, pub.ReturnErr
}

View File

@ -309,7 +309,7 @@ var (
}, },
} }
MockSeeNodePayload = streamer.SeedNodePayload{ MockSeeNodePayload = streamer.SuperNodePayload{
BlockNumber: big.NewInt(1), BlockNumber: big.NewInt(1),
HeadersRlp: [][]byte{MockHeaderRlp}, HeadersRlp: [][]byte{MockHeaderRlp},
TransactionsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)}, TransactionsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)},

View File

@ -24,7 +24,7 @@ import (
// IPLDResolver is the interface to resolving IPLDs // IPLDResolver is the interface to resolving IPLDs
type IPLDResolver interface { type IPLDResolver interface {
ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SeedNodePayload, error) ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SuperNodePayload, error)
} }
// EthIPLDResolver is the underlying struct to support the IPLDResolver interface // EthIPLDResolver is the underlying struct to support the IPLDResolver interface
@ -36,8 +36,8 @@ func NewIPLDResolver() *EthIPLDResolver {
} }
// ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper // ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper
func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SeedNodePayload, error) { func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SuperNodePayload, error) {
response := &streamer.SeedNodePayload{ response := &streamer.SuperNodePayload{
BlockNumber: ipfsBlocks.BlockNumber, BlockNumber: ipfsBlocks.BlockNumber,
StateNodesRlp: make(map[common.Hash][]byte), StateNodesRlp: make(map[common.Hash][]byte),
StorageNodesRlp: make(map[common.Hash]map[common.Hash][]byte), StorageNodesRlp: make(map[common.Hash]map[common.Hash][]byte),
@ -51,42 +51,42 @@ func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) (streamer.SeedN
return *response, nil return *response, nil
} }
func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *streamer.SeedNodePayload) { func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *streamer.SuperNodePayload) {
for _, block := range blocks { for _, block := range blocks {
raw := block.RawData() raw := block.RawData()
response.HeadersRlp = append(response.HeadersRlp, raw) response.HeadersRlp = append(response.HeadersRlp, raw)
} }
} }
func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *streamer.SeedNodePayload) { func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *streamer.SuperNodePayload) {
for _, block := range blocks { for _, block := range blocks {
raw := block.RawData() raw := block.RawData()
response.UnclesRlp = append(response.UnclesRlp, raw) response.UnclesRlp = append(response.UnclesRlp, raw)
} }
} }
func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *streamer.SeedNodePayload) { func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *streamer.SuperNodePayload) {
for _, block := range blocks { for _, block := range blocks {
raw := block.RawData() raw := block.RawData()
response.TransactionsRlp = append(response.TransactionsRlp, raw) response.TransactionsRlp = append(response.TransactionsRlp, raw)
} }
} }
func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *streamer.SeedNodePayload) { func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *streamer.SuperNodePayload) {
for _, block := range blocks { for _, block := range blocks {
raw := block.RawData() raw := block.RawData()
response.ReceiptsRlp = append(response.ReceiptsRlp, raw) response.ReceiptsRlp = append(response.ReceiptsRlp, raw)
} }
} }
func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *streamer.SeedNodePayload) { func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *streamer.SuperNodePayload) {
for key, block := range blocks { for key, block := range blocks {
raw := block.RawData() raw := block.RawData()
response.StateNodesRlp[key] = raw response.StateNodesRlp[key] = raw
} }
} }
func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *streamer.SeedNodePayload) { func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *streamer.SuperNodePayload) {
for stateKey, storageBlocks := range blocks { for stateKey, storageBlocks := range blocks {
response.StorageNodesRlp[stateKey] = make(map[common.Hash][]byte) response.StorageNodesRlp[stateKey] = make(map[common.Hash][]byte)
for storageKey, storageVal := range storageBlocks { for storageKey, storageVal := range storageBlocks {

View File

@ -22,7 +22,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
"github.com/vulcanize/vulcanizedb/pkg/seed_node" "github.com/vulcanize/vulcanizedb/pkg/super_node"
) )
var ( var (
@ -35,19 +35,19 @@ var _ = Describe("Resolver", func() {
resolver = ipfs.NewIPLDResolver() resolver = ipfs.NewIPLDResolver()
}) })
It("Resolves IPLD data to their correct geth data types and packages them to send to requesting transformers", func() { It("Resolves IPLD data to their correct geth data types and packages them to send to requesting transformers", func() {
seedNodePayload, err := resolver.ResolveIPLDs(mocks.MockIPLDWrapper) superNodePayload, err := resolver.ResolveIPLDs(mocks.MockIPLDWrapper)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(seedNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(seedNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp)) Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp))
Expect(seedNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp)) Expect(superNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp))
Expect(len(seedNodePayload.TransactionsRlp)).To(Equal(2)) Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2))
Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
Expect(len(seedNodePayload.ReceiptsRlp)).To(Equal(2)) Expect(len(superNodePayload.ReceiptsRlp)).To(Equal(2))
Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(0))).To(BeTrue()) Expect(super_node.ListContainsBytes(superNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(0))).To(BeTrue())
Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(1))).To(BeTrue()) Expect(super_node.ListContainsBytes(superNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(1))).To(BeTrue())
Expect(len(seedNodePayload.StateNodesRlp)).To(Equal(2)) Expect(len(superNodePayload.StateNodesRlp)).To(Equal(2))
Expect(seedNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp)) Expect(superNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp))
}) })
}) })
}) })

View File

@ -1,153 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package seed_node_test
import (
"bytes"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
"github.com/vulcanize/vulcanizedb/pkg/seed_node"
)
var (
filterer seed_node.ResponseFilterer
expectedRctForStorageRLP1 []byte
expectedRctForStorageRLP2 []byte
)
var _ = Describe("Filterer", func() {
Describe("FilterResponse", func() {
BeforeEach(func() {
filterer = seed_node.NewResponseFilterer()
expectedRctForStorageRLP1 = getReceiptForStorageRLP(mocks.MockReceipts, 0)
expectedRctForStorageRLP2 = getReceiptForStorageRLP(mocks.MockReceipts, 1)
})
It("Transcribes all the data from the IPLDPayload into the SeedNodePayload if given an open filter", func() {
seedNodePayload, err := filterer.FilterResponse(openFilter, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(seedNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(seedNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp))
Expect(seedNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp))
Expect(len(seedNodePayload.TransactionsRlp)).To(Equal(2))
Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
Expect(len(seedNodePayload.ReceiptsRlp)).To(Equal(2))
Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue())
Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue())
Expect(len(seedNodePayload.StateNodesRlp)).To(Equal(2))
Expect(seedNodePayload.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes))
Expect(seedNodePayload.StateNodesRlp[mocks.AnotherContractLeafKey]).To(Equal(mocks.AnotherValueBytes))
Expect(seedNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp))
})
It("Applies filters from the provided config.Subscription", func() {
seedNodePayload1, err := filterer.FilterResponse(rctContractFilter, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(seedNodePayload1.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(seedNodePayload1.HeadersRlp)).To(Equal(0))
Expect(len(seedNodePayload1.UnclesRlp)).To(Equal(0))
Expect(len(seedNodePayload1.TransactionsRlp)).To(Equal(0))
Expect(len(seedNodePayload1.StorageNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload1.StateNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload1.ReceiptsRlp)).To(Equal(1))
Expect(seedNodePayload1.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2))
seedNodePayload2, err := filterer.FilterResponse(rctTopicsFilter, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(seedNodePayload2.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(seedNodePayload2.HeadersRlp)).To(Equal(0))
Expect(len(seedNodePayload2.UnclesRlp)).To(Equal(0))
Expect(len(seedNodePayload2.TransactionsRlp)).To(Equal(0))
Expect(len(seedNodePayload2.StorageNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload2.StateNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload2.ReceiptsRlp)).To(Equal(1))
Expect(seedNodePayload2.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1))
seedNodePayload3, err := filterer.FilterResponse(rctTopicsAndContractFilter, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(seedNodePayload3.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(seedNodePayload3.HeadersRlp)).To(Equal(0))
Expect(len(seedNodePayload3.UnclesRlp)).To(Equal(0))
Expect(len(seedNodePayload3.TransactionsRlp)).To(Equal(0))
Expect(len(seedNodePayload3.StorageNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload3.StateNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload3.ReceiptsRlp)).To(Equal(1))
Expect(seedNodePayload3.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1))
seedNodePayload4, err := filterer.FilterResponse(rctContractsAndTopicFilter, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(seedNodePayload4.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(seedNodePayload4.HeadersRlp)).To(Equal(0))
Expect(len(seedNodePayload4.UnclesRlp)).To(Equal(0))
Expect(len(seedNodePayload4.TransactionsRlp)).To(Equal(0))
Expect(len(seedNodePayload4.StorageNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload4.StateNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload4.ReceiptsRlp)).To(Equal(1))
Expect(seedNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2))
seedNodePayload5, err := filterer.FilterResponse(rctsForAllCollectedTrxs, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(seedNodePayload5.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(seedNodePayload5.HeadersRlp)).To(Equal(0))
Expect(len(seedNodePayload5.UnclesRlp)).To(Equal(0))
Expect(len(seedNodePayload5.TransactionsRlp)).To(Equal(2))
Expect(seed_node.ListContainsBytes(seedNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
Expect(seed_node.ListContainsBytes(seedNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
Expect(len(seedNodePayload5.StorageNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload5.StateNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload5.ReceiptsRlp)).To(Equal(2))
Expect(seed_node.ListContainsBytes(seedNodePayload5.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue())
Expect(seed_node.ListContainsBytes(seedNodePayload5.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue())
seedNodePayload6, err := filterer.FilterResponse(rctsForSelectCollectedTrxs, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(seedNodePayload6.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(seedNodePayload6.HeadersRlp)).To(Equal(0))
Expect(len(seedNodePayload6.UnclesRlp)).To(Equal(0))
Expect(len(seedNodePayload6.TransactionsRlp)).To(Equal(1))
Expect(seed_node.ListContainsBytes(seedNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
Expect(len(seedNodePayload6.StorageNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload6.StateNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload6.ReceiptsRlp)).To(Equal(1))
Expect(seedNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2))
seedNodePayload7, err := filterer.FilterResponse(stateFilter, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(seedNodePayload7.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(seedNodePayload7.HeadersRlp)).To(Equal(0))
Expect(len(seedNodePayload7.UnclesRlp)).To(Equal(0))
Expect(len(seedNodePayload7.TransactionsRlp)).To(Equal(0))
Expect(len(seedNodePayload7.StorageNodesRlp)).To(Equal(0))
Expect(len(seedNodePayload7.ReceiptsRlp)).To(Equal(0))
Expect(len(seedNodePayload7.StateNodesRlp)).To(Equal(1))
Expect(seedNodePayload7.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes))
})
})
})
func getReceiptForStorageRLP(receipts types.Receipts, i int) []byte {
receiptForStorage := (*types.ReceiptForStorage)(receipts[i])
receiptBuffer := new(bytes.Buffer)
err := receiptForStorage.EncodeRLP(receiptBuffer)
Expect(err).ToNot(HaveOccurred())
return receiptBuffer.Bytes()
}

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package seed_node package super_node
import ( import (
"context" "context"
@ -28,25 +28,25 @@ import (
) )
// APIName is the namespace used for the state diffing service API // APIName is the namespace used for the state diffing service API
const APIName = "vulcanizedb" const APIName = "vdb"
// APIVersion is the version of the state diffing service API // APIVersion is the version of the state diffing service API
const APIVersion = "0.0.1" const APIVersion = "0.0.1"
// PublicSeedNodeAPI is the public api for the seed node // PublicSuperNodeAPI is the public api for the super node
type PublicSeedNodeAPI struct { type PublicSuperNodeAPI struct {
sni NodeInterface sni NodeInterface
} }
// NewPublicSeedNodeAPI creates a new PublicSeedNodeAPI with the provided underlying SyncPublishScreenAndServe process // NewPublicSuperNodeAPI creates a new PublicSuperNodeAPI with the provided underlying SyncPublishScreenAndServe process
func NewPublicSeedNodeAPI(seedNodeInterface NodeInterface) *PublicSeedNodeAPI { func NewPublicSuperNodeAPI(superNodeInterface NodeInterface) *PublicSuperNodeAPI {
return &PublicSeedNodeAPI{ return &PublicSuperNodeAPI{
sni: seedNodeInterface, sni: superNodeInterface,
} }
} }
// Stream is the public method to setup a subscription that fires off SyncPublishScreenAndServe payloads as they are created // Stream is the public method to setup a subscription that fires off SyncPublishScreenAndServe payloads as they are created
func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.Subscription) (*rpc.Subscription, error) { func (api *PublicSuperNodeAPI) Stream(ctx context.Context, streamFilters config.Subscription) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions // ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx) notifier, supported := rpc.NotifierFromContext(ctx)
if !supported { if !supported {
@ -58,7 +58,7 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S
go func() { go func() {
// subscribe to events from the SyncPublishScreenAndServe service // subscribe to events from the SyncPublishScreenAndServe service
payloadChannel := make(chan streamer.SeedNodePayload, payloadChanBufferSize) payloadChannel := make(chan streamer.SuperNodePayload, payloadChanBufferSize)
quitChan := make(chan bool, 1) quitChan := make(chan bool, 1)
go api.sni.Subscribe(rpcSub.ID, payloadChannel, quitChan, streamFilters) go api.sni.Subscribe(rpcSub.ID, payloadChannel, quitChan, streamFilters)
@ -84,7 +84,7 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S
return rpcSub, nil return rpcSub, nil
} }
// Node is a public rpc method to allow transformers to fetch the Geth node info for the seed node // Node is a public rpc method to allow transformers to fetch the Geth node info for the super node
func (api *PublicSeedNodeAPI) Node() core.Node { func (api *PublicSuperNodeAPI) Node() core.Node {
return api.sni.Node() return api.sni.Node()
} }

View File

@ -0,0 +1,135 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package super_node
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/params"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
)
// BackFillInterface for filling in gaps in the super node
type BackFillInterface interface {
// Method for the super node to periodically check for and fill in gaps in its data using an archival node
FillGaps(wg *sync.WaitGroup, quitChan <-chan bool)
}
// BackFillService for filling in gaps in the super node
type BackFillService struct {
// Interface for converting statediff payloads into ETH-IPLD object payloads
Converter ipfs.PayloadConverter
// Interface for publishing the ETH-IPLD payloads to IPFS
Publisher ipfs.IPLDPublisher
// Interface for indexing the CIDs of the published ETH-IPLDs in Postgres
Repository CIDRepository
// Interface for searching and retrieving CIDs from Postgres index
Retriever CIDRetriever
// State-diff fetcher; needs to be configured with an archival core.RpcClient
StateDiffFetcher fetcher.IStateDiffFetcher
// Check frequency
GapCheckFrequency time.Duration
}
// NewBackFillService returns a new BackFillInterface
func NewBackFillService(ipfsPublisher ipfs.IPLDPublisher, db *postgres.DB, archivalNodeRpcClient core.RpcClient, freq time.Duration) BackFillInterface {
return &BackFillService{
Repository: NewCIDRepository(db),
Converter: ipfs.NewPayloadConverter(params.MainnetChainConfig),
Publisher: ipfsPublisher,
Retriever: NewCIDRetriever(db),
StateDiffFetcher: fetcher.NewStateDiffFetcher(archivalNodeRpcClient),
GapCheckFrequency: freq,
}
}
// FillGaps periodically checks for and fills in gaps in the super node db
// this requires a core.RpcClient that is pointed at an archival node with the StateDiffAt method exposed
func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) {
ticker := time.NewTicker(bfs.GapCheckFrequency)
wg.Add(1)
go func() {
for {
select {
case <-quitChan:
log.Info("quiting FillGaps process")
wg.Done()
return
case <-ticker.C:
log.Info("searching for gaps in the super node database")
startingBlock, firstBlockErr := bfs.Retriever.RetrieveFirstBlockNumber()
if firstBlockErr != nil {
log.Error(firstBlockErr)
continue
}
if startingBlock != 1 {
startingGap := [2]int64{
1,
startingBlock - 1,
}
log.Info("found gap at the beginning of the sync")
bfs.fillGaps(startingGap)
}
gaps, gapErr := bfs.Retriever.RetrieveGapsInData()
if gapErr != nil {
log.Error(gapErr)
continue
}
for _, gap := range gaps {
bfs.fillGaps(gap)
}
}
}
}()
}
func (bfs *BackFillService) fillGaps(gap [2]int64) {
log.Infof("filling in gap from block %d to block %d", gap[0], gap[1])
blockHeights := make([]uint64, 0, gap[1]-gap[0]+1)
for i := gap[0]; i <= gap[1]; i++ {
blockHeights = append(blockHeights, uint64(i))
}
payloads, fetchErr := bfs.StateDiffFetcher.FetchStateDiffsAt(blockHeights)
if fetchErr != nil {
log.Error(fetchErr)
return
}
for _, payload := range payloads {
ipldPayload, convertErr := bfs.Converter.Convert(*payload)
if convertErr != nil {
log.Error(convertErr)
continue
}
cidPayload, publishErr := bfs.Publisher.Publish(ipldPayload)
if publishErr != nil {
log.Error(publishErr)
continue
}
indexErr := bfs.Repository.Index(cidPayload)
if indexErr != nil {
log.Error(indexErr)
}
}
}

View File

@ -0,0 +1,187 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package super_node_test
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
mocks2 "github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
mocks3 "github.com/vulcanize/vulcanizedb/pkg/super_node/mocks"
)
var _ = Describe("BackFiller", func() {
Describe("FillGaps", func() {
It("Periodically checks for and fills in gaps in the super node's data", func() {
mockCidRepo := &mocks3.CIDRepository{
ReturnErr: nil,
}
mockPublisher := &mocks.IterativeIPLDPublisher{
ReturnCIDPayload: []*ipfs.CIDPayload{mocks.MockCIDPayload, mocks.MockCIDPayload},
ReturnErr: nil,
}
mockConverter := &mocks.IterativePayloadConverter{
ReturnIPLDPayload: []*ipfs.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload},
ReturnErr: nil,
}
mockRetriever := &mocks3.MockCIDRetriever{
FirstBlockNumberToReturn: 1,
GapsToRetrieve: [][2]int64{
{
100, 101,
},
},
}
mockFetcher := &mocks2.MockStateDiffFetcher{
StateDiffsToReturn: map[uint64]*statediff.Payload{
100: &mocks.MockStateDiffPayload,
101: &mocks.MockStateDiffPayload,
},
}
backfiller := &super_node.BackFillService{
Repository: mockCidRepo,
Publisher: mockPublisher,
Converter: mockConverter,
StateDiffFetcher: mockFetcher,
Retriever: mockRetriever,
GapCheckFrequency: time.Second * 10,
}
wg := &sync.WaitGroup{}
quitChan := make(chan bool, 1)
backfiller.FillGaps(wg, quitChan)
time.Sleep(time.Second * 15)
quitChan <- true
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload))
Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2))
Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload))
Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockIPLDPayload))
Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(2))
Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload))
Expect(mockConverter.PassedStatediffPayload[1]).To(Equal(mocks.MockStateDiffPayload))
Expect(mockRetriever.CalledTimes).To(Equal(1))
Expect(len(mockFetcher.CalledAtBlockHeights)).To(Equal(1))
Expect(mockFetcher.CalledAtBlockHeights[0]).To(Equal([]uint64{100, 101}))
})
It("Works for single block `ranges`", func() {
mockCidRepo := &mocks3.CIDRepository{
ReturnErr: nil,
}
mockPublisher := &mocks.IterativeIPLDPublisher{
ReturnCIDPayload: []*ipfs.CIDPayload{mocks.MockCIDPayload},
ReturnErr: nil,
}
mockConverter := &mocks.IterativePayloadConverter{
ReturnIPLDPayload: []*ipfs.IPLDPayload{mocks.MockIPLDPayload},
ReturnErr: nil,
}
mockRetriever := &mocks3.MockCIDRetriever{
FirstBlockNumberToReturn: 1,
GapsToRetrieve: [][2]int64{
{
100, 100,
},
},
}
mockFetcher := &mocks2.MockStateDiffFetcher{
StateDiffsToReturn: map[uint64]*statediff.Payload{
100: &mocks.MockStateDiffPayload,
},
}
backfiller := &super_node.BackFillService{
Repository: mockCidRepo,
Publisher: mockPublisher,
Converter: mockConverter,
StateDiffFetcher: mockFetcher,
Retriever: mockRetriever,
GapCheckFrequency: time.Second * 10,
}
wg := &sync.WaitGroup{}
quitChan := make(chan bool, 1)
backfiller.FillGaps(wg, quitChan)
time.Sleep(time.Second * 15)
quitChan <- true
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1))
Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(1))
Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload))
Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(1))
Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload))
Expect(mockRetriever.CalledTimes).To(Equal(1))
Expect(len(mockFetcher.CalledAtBlockHeights)).To(Equal(1))
Expect(mockFetcher.CalledAtBlockHeights[0]).To(Equal([]uint64{100}))
})
It("Finds beginning gap", func() {
mockCidRepo := &mocks3.CIDRepository{
ReturnErr: nil,
}
mockPublisher := &mocks.IterativeIPLDPublisher{
ReturnCIDPayload: []*ipfs.CIDPayload{mocks.MockCIDPayload, mocks.MockCIDPayload},
ReturnErr: nil,
}
mockConverter := &mocks.IterativePayloadConverter{
ReturnIPLDPayload: []*ipfs.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload},
ReturnErr: nil,
}
mockRetriever := &mocks3.MockCIDRetriever{
FirstBlockNumberToReturn: 3,
GapsToRetrieve: [][2]int64{},
}
mockFetcher := &mocks2.MockStateDiffFetcher{
StateDiffsToReturn: map[uint64]*statediff.Payload{
1: &mocks.MockStateDiffPayload,
2: &mocks.MockStateDiffPayload,
},
}
backfiller := &super_node.BackFillService{
Repository: mockCidRepo,
Publisher: mockPublisher,
Converter: mockConverter,
StateDiffFetcher: mockFetcher,
Retriever: mockRetriever,
GapCheckFrequency: time.Second * 10,
}
wg := &sync.WaitGroup{}
quitChan := make(chan bool, 1)
backfiller.FillGaps(wg, quitChan)
time.Sleep(time.Second * 15)
quitChan <- true
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload))
Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2))
Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload))
Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockIPLDPayload))
Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(2))
Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload))
Expect(mockConverter.PassedStatediffPayload[1]).To(Equal(mocks.MockStateDiffPayload))
Expect(mockRetriever.CalledTimes).To(Equal(1))
Expect(len(mockFetcher.CalledAtBlockHeights)).To(Equal(1))
Expect(mockFetcher.CalledAtBlockHeights[0]).To(Equal([]uint64{1, 2}))
})
})
})

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package seed_node package super_node
import ( import (
"bytes" "bytes"
@ -30,7 +30,7 @@ import (
// ResponseFilterer is the inteface used to screen eth data and package appropriate data into a response payload // ResponseFilterer is the inteface used to screen eth data and package appropriate data into a response payload
type ResponseFilterer interface { type ResponseFilterer interface {
FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SeedNodePayload, error) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SuperNodePayload, error)
} }
// Filterer is the underlying struct for the ResponseFilterer interface // Filterer is the underlying struct for the ResponseFilterer interface
@ -42,33 +42,33 @@ func NewResponseFilterer() *Filterer {
} }
// FilterResponse is used to filter through eth data to extract and package requested data into a Payload // FilterResponse is used to filter through eth data to extract and package requested data into a Payload
func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SeedNodePayload, error) { func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SuperNodePayload, error) {
response := new(streamer.SeedNodePayload) response := new(streamer.SuperNodePayload)
err := s.filterHeaders(streamFilters, response, payload) err := s.filterHeaders(streamFilters, response, payload)
if err != nil { if err != nil {
return streamer.SeedNodePayload{}, err return streamer.SuperNodePayload{}, err
} }
txHashes, err := s.filterTransactions(streamFilters, response, payload) txHashes, err := s.filterTransactions(streamFilters, response, payload)
if err != nil { if err != nil {
return streamer.SeedNodePayload{}, err return streamer.SuperNodePayload{}, err
} }
err = s.filerReceipts(streamFilters, response, payload, txHashes) err = s.filerReceipts(streamFilters, response, payload, txHashes)
if err != nil { if err != nil {
return streamer.SeedNodePayload{}, err return streamer.SuperNodePayload{}, err
} }
err = s.filterState(streamFilters, response, payload) err = s.filterState(streamFilters, response, payload)
if err != nil { if err != nil {
return streamer.SeedNodePayload{}, err return streamer.SuperNodePayload{}, err
} }
err = s.filterStorage(streamFilters, response, payload) err = s.filterStorage(streamFilters, response, payload)
if err != nil { if err != nil {
return streamer.SeedNodePayload{}, err return streamer.SuperNodePayload{}, err
} }
response.BlockNumber = payload.BlockNumber response.BlockNumber = payload.BlockNumber
return *response, nil return *response, nil
} }
func (s *Filterer) filterHeaders(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error { func (s *Filterer) filterHeaders(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP) response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP)
if !streamFilters.HeaderFilter.FinalOnly { if !streamFilters.HeaderFilter.FinalOnly {
@ -91,7 +91,7 @@ func checkRange(start, end, actual int64) bool {
return false return false
} }
func (s *Filterer) filterTransactions(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) ([]common.Hash, error) { func (s *Filterer) filterTransactions(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) ([]common.Hash, error) {
trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions)) trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions))
if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
for i, trx := range payload.BlockBody.Transactions { for i, trx := range payload.BlockBody.Transactions {
@ -127,7 +127,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin
return false return false
} }
func (s *Filterer) filerReceipts(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload, trxHashes []common.Hash) error { func (s *Filterer) filerReceipts(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload, trxHashes []common.Hash) error {
if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
for i, receipt := range payload.Receipts { for i, receipt := range payload.Receipts {
if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, streamFilters.ReceiptFilter.Contracts, payload.ReceiptMetaData[i].ContractAddress, trxHashes) { if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, streamFilters.ReceiptFilter.Contracts, payload.ReceiptMetaData[i].ContractAddress, trxHashes) {
@ -186,7 +186,7 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContrac
return false return false
} }
func (s *Filterer) filterState(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error { func (s *Filterer) filterState(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
response.StateNodesRlp = make(map[common.Hash][]byte) response.StateNodesRlp = make(map[common.Hash][]byte)
keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses)) keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses))
@ -218,7 +218,7 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool {
return false return false
} }
func (s *Filterer) filterStorage(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error { func (s *Filterer) filterStorage(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte) response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte)
stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses)) stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses))

View File

@ -0,0 +1,153 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package super_node_test
import (
"bytes"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
)
var (
filterer super_node.ResponseFilterer
expectedRctForStorageRLP1 []byte
expectedRctForStorageRLP2 []byte
)
var _ = Describe("Filterer", func() {
Describe("FilterResponse", func() {
BeforeEach(func() {
filterer = super_node.NewResponseFilterer()
expectedRctForStorageRLP1 = getReceiptForStorageRLP(mocks.MockReceipts, 0)
expectedRctForStorageRLP2 = getReceiptForStorageRLP(mocks.MockReceipts, 1)
})
It("Transcribes all the data from the IPLDPayload into the SuperNodePayload if given an open filter", func() {
superNodePayload, err := filterer.FilterResponse(openFilter, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp))
Expect(superNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp))
Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2))
Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
Expect(len(superNodePayload.ReceiptsRlp)).To(Equal(2))
Expect(super_node.ListContainsBytes(superNodePayload.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue())
Expect(super_node.ListContainsBytes(superNodePayload.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue())
Expect(len(superNodePayload.StateNodesRlp)).To(Equal(2))
Expect(superNodePayload.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes))
Expect(superNodePayload.StateNodesRlp[mocks.AnotherContractLeafKey]).To(Equal(mocks.AnotherValueBytes))
Expect(superNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp))
})
It("Applies filters from the provided config.Subscription", func() {
superNodePayload1, err := filterer.FilterResponse(rctContractFilter, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(superNodePayload1.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload1.HeadersRlp)).To(Equal(0))
Expect(len(superNodePayload1.UnclesRlp)).To(Equal(0))
Expect(len(superNodePayload1.TransactionsRlp)).To(Equal(0))
Expect(len(superNodePayload1.StorageNodesRlp)).To(Equal(0))
Expect(len(superNodePayload1.StateNodesRlp)).To(Equal(0))
Expect(len(superNodePayload1.ReceiptsRlp)).To(Equal(1))
Expect(superNodePayload1.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2))
superNodePayload2, err := filterer.FilterResponse(rctTopicsFilter, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(superNodePayload2.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload2.HeadersRlp)).To(Equal(0))
Expect(len(superNodePayload2.UnclesRlp)).To(Equal(0))
Expect(len(superNodePayload2.TransactionsRlp)).To(Equal(0))
Expect(len(superNodePayload2.StorageNodesRlp)).To(Equal(0))
Expect(len(superNodePayload2.StateNodesRlp)).To(Equal(0))
Expect(len(superNodePayload2.ReceiptsRlp)).To(Equal(1))
Expect(superNodePayload2.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1))
superNodePayload3, err := filterer.FilterResponse(rctTopicsAndContractFilter, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(superNodePayload3.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload3.HeadersRlp)).To(Equal(0))
Expect(len(superNodePayload3.UnclesRlp)).To(Equal(0))
Expect(len(superNodePayload3.TransactionsRlp)).To(Equal(0))
Expect(len(superNodePayload3.StorageNodesRlp)).To(Equal(0))
Expect(len(superNodePayload3.StateNodesRlp)).To(Equal(0))
Expect(len(superNodePayload3.ReceiptsRlp)).To(Equal(1))
Expect(superNodePayload3.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1))
superNodePayload4, err := filterer.FilterResponse(rctContractsAndTopicFilter, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(superNodePayload4.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload4.HeadersRlp)).To(Equal(0))
Expect(len(superNodePayload4.UnclesRlp)).To(Equal(0))
Expect(len(superNodePayload4.TransactionsRlp)).To(Equal(0))
Expect(len(superNodePayload4.StorageNodesRlp)).To(Equal(0))
Expect(len(superNodePayload4.StateNodesRlp)).To(Equal(0))
Expect(len(superNodePayload4.ReceiptsRlp)).To(Equal(1))
Expect(superNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2))
superNodePayload5, err := filterer.FilterResponse(rctsForAllCollectedTrxs, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(superNodePayload5.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload5.HeadersRlp)).To(Equal(0))
Expect(len(superNodePayload5.UnclesRlp)).To(Equal(0))
Expect(len(superNodePayload5.TransactionsRlp)).To(Equal(2))
Expect(super_node.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
Expect(super_node.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
Expect(len(superNodePayload5.StorageNodesRlp)).To(Equal(0))
Expect(len(superNodePayload5.StateNodesRlp)).To(Equal(0))
Expect(len(superNodePayload5.ReceiptsRlp)).To(Equal(2))
Expect(super_node.ListContainsBytes(superNodePayload5.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue())
Expect(super_node.ListContainsBytes(superNodePayload5.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue())
superNodePayload6, err := filterer.FilterResponse(rctsForSelectCollectedTrxs, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(superNodePayload6.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload6.HeadersRlp)).To(Equal(0))
Expect(len(superNodePayload6.UnclesRlp)).To(Equal(0))
Expect(len(superNodePayload6.TransactionsRlp)).To(Equal(1))
Expect(super_node.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
Expect(len(superNodePayload6.StorageNodesRlp)).To(Equal(0))
Expect(len(superNodePayload6.StateNodesRlp)).To(Equal(0))
Expect(len(superNodePayload6.ReceiptsRlp)).To(Equal(1))
Expect(superNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2))
superNodePayload7, err := filterer.FilterResponse(stateFilter, *mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(superNodePayload7.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload7.HeadersRlp)).To(Equal(0))
Expect(len(superNodePayload7.UnclesRlp)).To(Equal(0))
Expect(len(superNodePayload7.TransactionsRlp)).To(Equal(0))
Expect(len(superNodePayload7.StorageNodesRlp)).To(Equal(0))
Expect(len(superNodePayload7.ReceiptsRlp)).To(Equal(0))
Expect(len(superNodePayload7.StateNodesRlp)).To(Equal(1))
Expect(superNodePayload7.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes))
})
})
})
func getReceiptForStorageRLP(receipts types.Receipts, i int) []byte {
receiptForStorage := (*types.ReceiptForStorage)(receipts[i])
receiptBuffer := new(bytes.Buffer)
err := receiptForStorage.EncodeRLP(receiptBuffer)
Expect(err).ToNot(HaveOccurred())
return receiptBuffer.Bytes()
}

View File

@ -20,12 +20,12 @@ import "github.com/vulcanize/vulcanizedb/pkg/ipfs"
// CIDRepository is the underlying struct for the Repository interface // CIDRepository is the underlying struct for the Repository interface
type CIDRepository struct { type CIDRepository struct {
PassedCIDPayload *ipfs.CIDPayload PassedCIDPayload []*ipfs.CIDPayload
ReturnErr error ReturnErr error
} }
// Index indexes a cidPayload in Postgres // Index indexes a cidPayload in Postgres
func (repo *CIDRepository) Index(cidPayload *ipfs.CIDPayload) error { func (repo *CIDRepository) Index(cidPayload *ipfs.CIDPayload) error {
repo.PassedCIDPayload = cidPayload repo.PassedCIDPayload = append(repo.PassedCIDPayload, cidPayload)
return repo.ReturnErr return repo.ReturnErr
} }

View File

@ -0,0 +1,44 @@
package mocks
import (
"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
)
// MockCIDRetriever is a mock CID retriever for use in tests
type MockCIDRetriever struct {
GapsToRetrieve [][2]int64
GapsToRetrieveErr error
CalledTimes int
FirstBlockNumberToReturn int64
RetrieveFirstBlockNumberErr error
}
// RetrieveCIDs mock method
func (*MockCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) {
panic("implement me")
}
// RetrieveLastBlockNumber mock method
func (*MockCIDRetriever) RetrieveLastBlockNumber() (int64, error) {
panic("implement me")
}
// RetrieveFirstBlockNumber mock method
func (mcr *MockCIDRetriever) RetrieveFirstBlockNumber() (int64, error) {
return mcr.FirstBlockNumberToReturn, mcr.RetrieveFirstBlockNumberErr
}
// RetrieveGapsInData mock method
func (mcr *MockCIDRetriever) RetrieveGapsInData() ([][2]int64, error) {
mcr.CalledTimes++
return mcr.GapsToRetrieve, mcr.GapsToRetrieveErr
}
// SetGapsToRetrieve mock method
func (mcr *MockCIDRetriever) SetGapsToRetrieve(gaps [][2]int64) {
if mcr.GapsToRetrieve == nil {
mcr.GapsToRetrieve = make([][2]int64, 0)
}
mcr.GapsToRetrieve = append(mcr.GapsToRetrieve, gaps...)
}

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package seed_node package super_node
import ( import (
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package seed_node_test package super_node_test
import ( import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -23,23 +23,23 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
"github.com/vulcanize/vulcanizedb/pkg/seed_node" "github.com/vulcanize/vulcanizedb/pkg/super_node"
) )
var ( var (
db *postgres.DB db *postgres.DB
err error err error
repo seed_node.CIDRepository repo super_node.CIDRepository
) )
var _ = Describe("Repository", func() { var _ = Describe("Repository", func() {
BeforeEach(func() { BeforeEach(func() {
db, err = seed_node.SetupDB() db, err = super_node.SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
repo = seed_node.NewCIDRepository(db) repo = super_node.NewCIDRepository(db)
}) })
AfterEach(func() { AfterEach(func() {
seed_node.TearDownDB(db) super_node.TearDownDB(db)
}) })
Describe("Index", func() { Describe("Index", func() {
@ -61,8 +61,8 @@ var _ = Describe("Repository", func() {
err = db.Select(&trxs, pgStr, 1) err = db.Select(&trxs, pgStr, 1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(trxs)).To(Equal(2)) Expect(len(trxs)).To(Equal(2))
Expect(seed_node.ListContainsString(trxs, "mockTrxCID1")).To(BeTrue()) Expect(super_node.ListContainsString(trxs, "mockTrxCID1")).To(BeTrue())
Expect(seed_node.ListContainsString(trxs, "mockTrxCID2")).To(BeTrue()) Expect(super_node.ListContainsString(trxs, "mockTrxCID2")).To(BeTrue())
// check receipts were properly indexed // check receipts were properly indexed
rcts := make([]string, 0) rcts := make([]string, 0)
pgStr = `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids pgStr = `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids
@ -72,8 +72,8 @@ var _ = Describe("Repository", func() {
err = db.Select(&rcts, pgStr, 1) err = db.Select(&rcts, pgStr, 1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(rcts)).To(Equal(2)) Expect(len(rcts)).To(Equal(2))
Expect(seed_node.ListContainsString(rcts, "mockRctCID1")).To(BeTrue()) Expect(super_node.ListContainsString(rcts, "mockRctCID1")).To(BeTrue())
Expect(seed_node.ListContainsString(rcts, "mockRctCID2")).To(BeTrue()) Expect(super_node.ListContainsString(rcts, "mockRctCID2")).To(BeTrue())
// check that state nodes were properly indexed // check that state nodes were properly indexed
stateNodes := make([]ipfs.StateNodeCID, 0) stateNodes := make([]ipfs.StateNodeCID, 0)
pgStr = `SELECT state_cids.cid, state_cids.state_key, state_cids.leaf FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id) pgStr = `SELECT state_cids.cid, state_cids.state_key, state_cids.leaf FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id)

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package seed_node package super_node
import ( import (
"math/big" "math/big"
@ -33,6 +33,7 @@ type CIDRetriever interface {
RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error)
RetrieveLastBlockNumber() (int64, error) RetrieveLastBlockNumber() (int64, error)
RetrieveFirstBlockNumber() (int64, error) RetrieveFirstBlockNumber() (int64, error)
RetrieveGapsInData() ([][2]int64, error)
} }
// EthCIDRetriever is the underlying struct supporting the CIDRetriever interface // EthCIDRetriever is the underlying struct supporting the CIDRetriever interface
@ -303,3 +304,26 @@ func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters confi
err := tx.Select(&storageNodeCIDs, pgStr, args...) err := tx.Select(&storageNodeCIDs, pgStr, args...)
return storageNodeCIDs, err return storageNodeCIDs, err
} }
type gap struct {
Start int64 `db:"start"`
Stop int64 `db:"stop"`
}
func (ecr *EthCIDRetriever) RetrieveGapsInData() ([][2]int64, error) {
pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM header_cids
LEFT JOIN header_cids r on header_cids.block_number = r.block_number - 1
LEFT JOIN header_cids fr on header_cids.block_number < fr.block_number
WHERE r.block_number is NULL and fr.block_number IS NOT NULL
GROUP BY header_cids.block_number, r.block_number`
gaps := make([]gap, 0)
err := ecr.db.Select(&gaps, pgStr)
if err != nil {
return nil, err
}
gapRanges := make([][2]int64, 0)
for _, gap := range gaps {
gapRanges = append(gapRanges, [2]int64{gap.Start, gap.Stop})
}
return gapRanges, nil
}

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package seed_node_test package super_node_test
import ( import (
"math/big" "math/big"
@ -25,11 +25,11 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
"github.com/vulcanize/vulcanizedb/pkg/seed_node" "github.com/vulcanize/vulcanizedb/pkg/super_node"
) )
var ( var (
retriever seed_node.CIDRetriever retriever super_node.CIDRetriever
openFilter = config.Subscription{ openFilter = config.Subscription{
StartingBlock: big.NewInt(0), StartingBlock: big.NewInt(0),
EndingBlock: big.NewInt(1), EndingBlock: big.NewInt(1),
@ -178,15 +178,15 @@ var (
var _ = Describe("Retriever", func() { var _ = Describe("Retriever", func() {
BeforeEach(func() { BeforeEach(func() {
db, err = seed_node.SetupDB() db, err = super_node.SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
repo = seed_node.NewCIDRepository(db) repo = super_node.NewCIDRepository(db)
err = repo.Index(mocks.MockCIDPayload) err = repo.Index(mocks.MockCIDPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
retriever = seed_node.NewCIDRetriever(db) retriever = super_node.NewCIDRetriever(db)
}) })
AfterEach(func() { AfterEach(func() {
seed_node.TearDownDB(db) super_node.TearDownDB(db)
}) })
Describe("RetrieveCIDs", func() { Describe("RetrieveCIDs", func() {
@ -197,11 +197,11 @@ var _ = Describe("Retriever", func() {
Expect(len(cidWrapper.Headers)).To(Equal(1)) Expect(len(cidWrapper.Headers)).To(Equal(1))
Expect(cidWrapper.Headers).To(Equal(mocks.MockCIDWrapper.Headers)) Expect(cidWrapper.Headers).To(Equal(mocks.MockCIDWrapper.Headers))
Expect(len(cidWrapper.Transactions)).To(Equal(2)) Expect(len(cidWrapper.Transactions)).To(Equal(2))
Expect(seed_node.ListContainsString(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[0])).To(BeTrue()) Expect(super_node.ListContainsString(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[0])).To(BeTrue())
Expect(seed_node.ListContainsString(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[1])).To(BeTrue()) Expect(super_node.ListContainsString(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[1])).To(BeTrue())
Expect(len(cidWrapper.Receipts)).To(Equal(2)) Expect(len(cidWrapper.Receipts)).To(Equal(2))
Expect(seed_node.ListContainsString(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[0])).To(BeTrue()) Expect(super_node.ListContainsString(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[0])).To(BeTrue())
Expect(seed_node.ListContainsString(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[1])).To(BeTrue()) Expect(super_node.ListContainsString(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[1])).To(BeTrue())
Expect(len(cidWrapper.StateNodes)).To(Equal(2)) Expect(len(cidWrapper.StateNodes)).To(Equal(2))
for _, stateNode := range cidWrapper.StateNodes { for _, stateNode := range cidWrapper.StateNodes {
if stateNode.CID == "mockStateCID1" { if stateNode.CID == "mockStateCID1" {
@ -265,13 +265,13 @@ var _ = Describe("Retriever", func() {
Expect(cidWrapper5.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(cidWrapper5.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(len(cidWrapper5.Headers)).To(Equal(0)) Expect(len(cidWrapper5.Headers)).To(Equal(0))
Expect(len(cidWrapper5.Transactions)).To(Equal(2)) Expect(len(cidWrapper5.Transactions)).To(Equal(2))
Expect(seed_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID1")).To(BeTrue()) Expect(super_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID1")).To(BeTrue())
Expect(seed_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID2")).To(BeTrue()) Expect(super_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID2")).To(BeTrue())
Expect(len(cidWrapper5.StateNodes)).To(Equal(0)) Expect(len(cidWrapper5.StateNodes)).To(Equal(0))
Expect(len(cidWrapper5.StorageNodes)).To(Equal(0)) Expect(len(cidWrapper5.StorageNodes)).To(Equal(0))
Expect(len(cidWrapper5.Receipts)).To(Equal(2)) Expect(len(cidWrapper5.Receipts)).To(Equal(2))
Expect(seed_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID1")).To(BeTrue()) Expect(super_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID1")).To(BeTrue())
Expect(seed_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID2")).To(BeTrue()) Expect(super_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID2")).To(BeTrue())
cidWrapper6, err := retriever.RetrieveCIDs(rctsForSelectCollectedTrxs, 1) cidWrapper6, err := retriever.RetrieveCIDs(rctsForSelectCollectedTrxs, 1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package seed_node_test package super_node_test
import ( import (
"io/ioutil" "io/ioutil"
@ -25,7 +25,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func TestSeedNode(t *testing.T) { func TestSuperNode(t *testing.T) {
RegisterFailHandler(Fail) RegisterFailHandler(Fail)
RunSpecs(t, "Seed Node Suite Test") RunSpecs(t, "Seed Node Suite Test")
} }

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package seed_node package super_node
import ( import (
"sync" "sync"
@ -36,7 +36,9 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
) )
const payloadChanBufferSize = 20000 // the max eth sub buffer size const (
payloadChanBufferSize = 20000 // the max eth sub buffer size
)
// NodeInterface is the top level interface for streaming, converting to IPLDs, publishing, // NodeInterface is the top level interface for streaming, converting to IPLDs, publishing,
// and indexing all Ethereum data; screening this data; and serving it up to subscribed clients // and indexing all Ethereum data; screening this data; and serving it up to subscribed clients
@ -49,14 +51,17 @@ type NodeInterface interface {
// Main event loop for handling client pub-sub // Main event loop for handling client pub-sub
ScreenAndServe(screenAndServePayload <-chan ipfs.IPLDPayload, screenAndServeQuit <-chan bool) ScreenAndServe(screenAndServePayload <-chan ipfs.IPLDPayload, screenAndServeQuit <-chan bool)
// Method to subscribe to receive state diff processing output // Method to subscribe to receive state diff processing output
Subscribe(id rpc.ID, sub chan<- streamer.SeedNodePayload, quitChan chan<- bool, streamFilters config.Subscription) Subscribe(id rpc.ID, sub chan<- streamer.SuperNodePayload, quitChan chan<- bool, streamFilters config.Subscription)
// Method to unsubscribe from state diff processing // Method to unsubscribe from state diff processing
Unsubscribe(id rpc.ID) Unsubscribe(id rpc.ID)
// Method to access the Geth node info for this service // Method to access the Geth node info for this service
Node() core.Node Node() core.Node
// Method used to retrieve the underlying IPFS publisher for this service, so that is can be used for backfilling
// This is needed because it's not possible to initialize two ipfs nodes at the same path
GetPublisher() ipfs.IPLDPublisher
} }
// Service is the underlying struct for the SyncAndPublish interface // Service is the underlying struct for the super node
type Service struct { type Service struct {
// Used to sync access to the Subscriptions // Used to sync access to the Subscriptions
sync.Mutex sync.Mutex
@ -71,7 +76,7 @@ type Service struct {
// Interface for filtering and serving data according to subscribed clients according to their specification // Interface for filtering and serving data according to subscribed clients according to their specification
Filterer ResponseFilterer Filterer ResponseFilterer
// Interface for fetching ETH-IPLD objects from IPFS // Interface for fetching ETH-IPLD objects from IPFS
Fetcher ipfs.IPLDFetcher IPLDFetcher ipfs.IPLDFetcher
// Interface for searching and retrieving CIDs from Postgres index // Interface for searching and retrieving CIDs from Postgres index
Retriever CIDRetriever Retriever CIDRetriever
// Interface for resolving ipfs blocks to their data types // Interface for resolving ipfs blocks to their data types
@ -86,17 +91,17 @@ type Service struct {
SubscriptionTypes map[common.Hash]config.Subscription SubscriptionTypes map[common.Hash]config.Subscription
// Number of workers // Number of workers
WorkerPoolSize int WorkerPoolSize int
// Info for the Geth node that this seed node is working with // Info for the Geth node that this super node is working with
gethNode core.Node gethNode core.Node
} }
// NewSeedNode creates a new seed_node.Interface using an underlying seed_node.Service struct // NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct
func NewSeedNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc chan bool, workers int, node core.Node) (NodeInterface, error) { func NewSuperNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc chan bool, workers int, node core.Node) (NodeInterface, error) {
publisher, err := ipfs.NewIPLDPublisher(ipfsPath) publisher, err := ipfs.NewIPLDPublisher(ipfsPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
fetcher, err := ipfs.NewIPLDFetcher(ipfsPath) ipldFetcher, err := ipfs.NewIPLDFetcher(ipfsPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -106,7 +111,7 @@ func NewSeedNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc
Converter: ipfs.NewPayloadConverter(params.MainnetChainConfig), Converter: ipfs.NewPayloadConverter(params.MainnetChainConfig),
Publisher: publisher, Publisher: publisher,
Filterer: NewResponseFilterer(), Filterer: NewResponseFilterer(),
Fetcher: fetcher, IPLDFetcher: ipldFetcher,
Retriever: NewCIDRetriever(db), Retriever: NewCIDRetriever(db),
Resolver: ipfs.NewIPLDResolver(), Resolver: ipfs.NewIPLDResolver(),
PayloadChan: make(chan statediff.Payload, payloadChanBufferSize), PayloadChan: make(chan statediff.Payload, payloadChanBufferSize),
@ -123,13 +128,13 @@ func (sap *Service) Protocols() []p2p.Protocol {
return []p2p.Protocol{} return []p2p.Protocol{}
} }
// APIs returns the RPC descriptors the StateDiffingService offers // APIs returns the RPC descriptors the super node service offers
func (sap *Service) APIs() []rpc.API { func (sap *Service) APIs() []rpc.API {
return []rpc.API{ return []rpc.API{
{ {
Namespace: APIName, Namespace: APIName,
Version: APIVersion, Version: APIVersion,
Service: NewPublicSeedNodeAPI(sap), Service: NewPublicSuperNodeAPI(sap),
Public: true, Public: true,
}, },
} }
@ -256,7 +261,7 @@ func (sap *Service) sendResponse(payload ipfs.IPLDPayload) error {
for id, sub := range subs { for id, sub := range subs {
select { select {
case sub.PayloadChan <- response: case sub.PayloadChan <- response:
log.Infof("sending seed node payload to subscription %s", id) log.Infof("sending super node payload to subscription %s", id)
default: default:
log.Infof("unable to send payload to subscription %s; channel has no receiver", id) log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
} }
@ -267,8 +272,8 @@ func (sap *Service) sendResponse(payload ipfs.IPLDPayload) error {
} }
// Subscribe is used by the API to subscribe to the service loop // Subscribe is used by the API to subscribe to the service loop
func (sap *Service) Subscribe(id rpc.ID, sub chan<- streamer.SeedNodePayload, quitChan chan<- bool, streamFilters config.Subscription) { func (sap *Service) Subscribe(id rpc.ID, sub chan<- streamer.SuperNodePayload, quitChan chan<- bool, streamFilters config.Subscription) {
log.Info("Subscribing to the seed node service") log.Info("Subscribing to the super node service")
// Subscription type is defined as the hash of its content // Subscription type is defined as the hash of its content
// Group subscriptions by type and screen payloads once for subs of the same type // Group subscriptions by type and screen payloads once for subs of the same type
by, err := rlp.EncodeToBytes(streamFilters) by, err := rlp.EncodeToBytes(streamFilters)
@ -305,7 +310,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
var err error var err error
startingBlock, err = sap.Retriever.RetrieveFirstBlockNumber() startingBlock, err = sap.Retriever.RetrieveFirstBlockNumber()
if err != nil { if err != nil {
sub.PayloadChan <- streamer.SeedNodePayload{ sub.PayloadChan <- streamer.SuperNodePayload{
ErrMsg: "unable to set block range start; error: " + err.Error(), ErrMsg: "unable to set block range start; error: " + err.Error(),
} }
} }
@ -314,7 +319,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
} }
endingBlock, err = sap.Retriever.RetrieveLastBlockNumber() endingBlock, err = sap.Retriever.RetrieveLastBlockNumber()
if err != nil { if err != nil {
sub.PayloadChan <- streamer.SeedNodePayload{ sub.PayloadChan <- streamer.SuperNodePayload{
ErrMsg: "unable to set block range end; error: " + err.Error(), ErrMsg: "unable to set block range end; error: " + err.Error(),
} }
} }
@ -327,10 +332,10 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
// the blocknumbers in the payloads they receive to keep things in order // the blocknumbers in the payloads they receive to keep things in order
// TODO: separate backfill into a different rpc subscription method altogether? // TODO: separate backfill into a different rpc subscription method altogether?
go func() { go func() {
for i := con.StartingBlock.Int64(); i <= endingBlock; i++ { for i := startingBlock; i <= endingBlock; i++ {
cidWrapper, err := sap.Retriever.RetrieveCIDs(con, i) cidWrapper, err := sap.Retriever.RetrieveCIDs(con, i)
if err != nil { if err != nil {
sub.PayloadChan <- streamer.SeedNodePayload{ sub.PayloadChan <- streamer.SuperNodePayload{
ErrMsg: "CID retrieval error: " + err.Error(), ErrMsg: "CID retrieval error: " + err.Error(),
} }
continue continue
@ -338,10 +343,10 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
if ipfs.EmptyCIDWrapper(*cidWrapper) { if ipfs.EmptyCIDWrapper(*cidWrapper) {
continue continue
} }
blocksWrapper, err := sap.Fetcher.FetchCIDs(*cidWrapper) blocksWrapper, err := sap.IPLDFetcher.FetchCIDs(*cidWrapper)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
sub.PayloadChan <- streamer.SeedNodePayload{ sub.PayloadChan <- streamer.SuperNodePayload{
ErrMsg: "IPLD fetching error: " + err.Error(), ErrMsg: "IPLD fetching error: " + err.Error(),
} }
continue continue
@ -349,14 +354,14 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper) backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
sub.PayloadChan <- streamer.SeedNodePayload{ sub.PayloadChan <- streamer.SuperNodePayload{
ErrMsg: "IPLD resolving error: " + err.Error(), ErrMsg: "IPLD resolving error: " + err.Error(),
} }
continue continue
} }
select { select {
case sub.PayloadChan <- backFillIplds: case sub.PayloadChan <- backFillIplds:
log.Infof("sending seed node back-fill payload to subscription %s", id) log.Infof("sending super node back-fill payload to subscription %s", id)
default: default:
log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id) log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id)
} }
@ -366,7 +371,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
// Unsubscribe is used to unsubscribe to the StateDiffingService loop // Unsubscribe is used to unsubscribe to the StateDiffingService loop
func (sap *Service) Unsubscribe(id rpc.ID) { func (sap *Service) Unsubscribe(id rpc.ID) {
log.Info("Unsubscribing from the seed node service") log.Info("Unsubscribing from the super node service")
sap.Lock() sap.Lock()
for ty := range sap.Subscriptions { for ty := range sap.Subscriptions {
delete(sap.Subscriptions[ty], id) delete(sap.Subscriptions[ty], id)
@ -381,7 +386,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) {
// Start is used to begin the service // Start is used to begin the service
func (sap *Service) Start(*p2p.Server) error { func (sap *Service) Start(*p2p.Server) error {
log.Info("Starting seed node service") log.Info("Starting super node service")
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
payloadChan := make(chan ipfs.IPLDPayload, payloadChanBufferSize) payloadChan := make(chan ipfs.IPLDPayload, payloadChanBufferSize)
quitChan := make(chan bool, 1) quitChan := make(chan bool, 1)
@ -394,7 +399,7 @@ func (sap *Service) Start(*p2p.Server) error {
// Stop is used to close down the service // Stop is used to close down the service
func (sap *Service) Stop() error { func (sap *Service) Stop() error {
log.Info("Stopping seed node service") log.Info("Stopping super node service")
close(sap.QuitChan) close(sap.QuitChan)
return nil return nil
} }
@ -421,3 +426,7 @@ func (sap *Service) close() {
} }
sap.Unlock() sap.Unlock()
} }
func (sap *Service) GetPublisher() ipfs.IPLDPublisher {
return sap.Publisher
}

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package seed_node_test package super_node_test
import ( import (
"sync" "sync"
@ -27,8 +27,8 @@ import (
mocks2 "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" mocks2 "github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
"github.com/vulcanize/vulcanizedb/pkg/seed_node" "github.com/vulcanize/vulcanizedb/pkg/super_node"
mocks3 "github.com/vulcanize/vulcanizedb/pkg/seed_node/mocks" mocks3 "github.com/vulcanize/vulcanizedb/pkg/super_node/mocks"
) )
var _ = Describe("Service", func() { var _ = Describe("Service", func() {
@ -55,7 +55,7 @@ var _ = Describe("Service", func() {
ReturnIPLDPayload: mocks.MockIPLDPayload, ReturnIPLDPayload: mocks.MockIPLDPayload,
ReturnErr: nil, ReturnErr: nil,
} }
processor := &seed_node.Service{ processor := &super_node.Service{
Repository: mockCidRepo, Repository: mockCidRepo,
Publisher: mockPublisher, Publisher: mockPublisher,
Streamer: mockStreamer, Streamer: mockStreamer,
@ -70,7 +70,8 @@ var _ = Describe("Service", func() {
quitChan <- true quitChan <- true
wg.Wait() wg.Wait()
Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload)) Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload))
Expect(mockCidRepo.PassedCIDPayload).To(Equal(mocks.MockCIDPayload)) Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1))
Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockIPLDPayload)) Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockIPLDPayload))
Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan)) Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan))
}) })

View File

@ -14,14 +14,14 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package seed_node package super_node
import ( import (
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer" "github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
) )
// Subscription holds the information for an individual client subscription to the seed node // Subscription holds the information for an individual client subscription to the super node
type Subscription struct { type Subscription struct {
PayloadChan chan<- streamer.SeedNodePayload PayloadChan chan<- streamer.SuperNodePayload
QuitChan chan<- bool QuitChan chan<- bool
} }

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package seed_node package super_node
import ( import (
"bytes" "bytes"