refactor super node commands into one

This commit is contained in:
Ian Norden 2020-01-20 17:44:32 -06:00
parent 7843312815
commit 308ccb5d8c
6 changed files with 111 additions and 396 deletions

View File

@ -34,14 +34,12 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/eth/client" "github.com/vulcanize/vulcanizedb/pkg/eth/client"
vRpc "github.com/vulcanize/vulcanizedb/pkg/eth/converters/rpc" vRpc "github.com/vulcanize/vulcanizedb/pkg/eth/converters/rpc"
"github.com/vulcanize/vulcanizedb/pkg/eth/node" "github.com/vulcanize/vulcanizedb/pkg/eth/node"
config2 "github.com/vulcanize/vulcanizedb/pkg/super_node/config"
) )
var ( var (
cfgFile string cfgFile string
databaseConfig config.Database databaseConfig config.Database
genConfig config.Plugin genConfig config.Plugin
subscriptionConfig *config2.EthSubscription
ipc string ipc string
levelDbPath string levelDbPath string
queueRecheckInterval time.Duration queueRecheckInterval time.Duration

View File

@ -1,132 +0,0 @@
// Copyright © 2019 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"os"
"path/filepath"
syn "sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/vulcanize/vulcanizedb/utils"
)
// screenAndServeCmd represents the screenAndServe command
var screenAndServeCmd = &cobra.Command{
Use: "screenAndServe",
Short: "Serve super-node data requests to requesting clients",
Long: ` It then opens up WS and IPC servers on top of the super-node ETH-IPLD index which
relays relevant data to requesting clients. In this mode, the super-node can only relay data which it has
already indexed it does not stream out live data.`,
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *log.WithField("SubCommand", subCommand)
screenAndServe()
},
}
func init() {
rootCmd.AddCommand(screenAndServeCmd)
}
func screenAndServe() {
superNode, err := newSuperNodeWithoutPairedGethNode()
if err != nil {
logWithCommand.Fatal(err)
}
wg := &syn.WaitGroup{}
quitChan := make(chan bool, 1)
emptyPayloadChan := make(chan interface{})
superNode.ScreenAndServe(wg, emptyPayloadChan, quitChan)
if err := startServers(superNode); err != nil {
logWithCommand.Fatal(err)
}
wg.Wait()
}
func startServers(superNode super_node.NodeInterface) error {
var ipcPath string
ipcPath = viper.GetString("server.ipcPath")
if ipcPath == "" {
home, err := os.UserHomeDir()
if err != nil {
return err
}
ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc")
}
_, _, err := rpc.StartIPCEndpoint(ipcPath, superNode.APIs())
if err != nil {
return err
}
var wsEndpoint string
wsEndpoint = viper.GetString("server.wsEndpoint")
if wsEndpoint == "" {
wsEndpoint = "127.0.0.1:8080"
}
var exposeAll = true
var wsOrigins []string
_, _, err = rpc.StartWSEndpoint(wsEndpoint, superNode.APIs(), []string{"vdb"}, wsOrigins, exposeAll)
if err != nil {
return err
}
return nil
}
func newSuperNodeWithoutPairedGethNode() (super_node.NodeInterface, error) {
ipfsPath = viper.GetString("client.ipfsPath")
if ipfsPath == "" {
home, err := os.UserHomeDir()
if err != nil {
return nil, err
}
ipfsPath = filepath.Join(home, ".ipfs")
}
if err := ipfs.InitIPFSPlugins(); err != nil {
return nil, err
}
ipldFetcher, err := super_node.NewIPLDFetcher(config.Ethereum, ipfsPath)
if err != nil {
return nil, err
}
db := utils.LoadPostgres(databaseConfig, core.Node{})
retriever, err := super_node.NewCIDRetriever(config.Ethereum, &db)
if err != nil {
return nil, err
}
resolver, err := super_node.NewIPLDResolver(config.Ethereum)
if err != nil {
return nil, err
}
return &super_node.Service{
IPLDFetcher: ipldFetcher,
Retriever: retriever,
Resolver: resolver,
Subscriptions: make(map[common.Hash]map[rpc.ID]super_node.Subscription),
SubscriptionTypes: make(map[common.Hash]super_node.SubscriptionSettings),
NodeInfo: core.Node{},
}, nil
}

View File

@ -18,7 +18,6 @@ package cmd
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
@ -37,26 +36,26 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
) )
// streamSubscribeCmd represents the streamSubscribe command // streamEthSubscriptionCmd represents the streamEthSubscription command
var streamSubscribeCmd = &cobra.Command{ var streamEthSubscriptionCmd = &cobra.Command{
Use: "streamSubscribe", Use: "streamEthSubscription",
Short: "This command is used to subscribe to the super node stream with the provided filters", Short: "This command is used to subscribe to the super node eth stream with the provided filters",
Long: `This command is for demo and testing purposes and is used to subscribe to the super node with the provided subscription configuration parameters. Long: `This command is for demo and testing purposes and is used to subscribe to the super node with the provided subscription configuration parameters.
It does not do anything with the data streamed from the super node other than unpack it and print it out for demonstration purposes.`, It does not do anything with the data streamed from the super node other than unpack it and print it out for demonstration purposes.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs() subCommand = cmd.CalledAs()
logWithCommand = *log.WithField("SubCommand", subCommand) logWithCommand = *log.WithField("SubCommand", subCommand)
streamSubscribe() streamEthSubscription()
}, },
} }
func init() { func init() {
rootCmd.AddCommand(streamSubscribeCmd) rootCmd.AddCommand(streamEthSubscriptionCmd)
} }
func streamSubscribe() { func streamEthSubscription() {
// Prep the subscription config/filters to be sent to the server // Prep the subscription config/filters to be sent to the server
configureSubscription() ethSubConfig := config.NewEthSubscriptionConfig()
// Create a new rpc client and a subscription streamer with that client // Create a new rpc client and a subscription streamer with that client
rpcClient := getRPCClient() rpcClient := getRPCClient()
@ -66,7 +65,7 @@ func streamSubscribe() {
payloadChan := make(chan super_node.Payload, 20000) payloadChan := make(chan super_node.Payload, 20000)
// Subscribe to the super node service with the given config/filter parameters // Subscribe to the super node service with the given config/filter parameters
sub, err := str.StreamETH(payloadChan, subscriptionConfig) sub, err := str.Stream(payloadChan, ethSubConfig)
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
@ -170,61 +169,8 @@ func streamSubscribe() {
} }
} }
func configureSubscription() {
logWithCommand.Info("loading subscription config")
subscriptionConfig = &config.EthSubscription{
// Below default to false, which means we do not backfill by default
BackFill: viper.GetBool("subscription.backfill"),
BackFillOnly: viper.GetBool("subscription.backfillOnly"),
// Below default to 0
// 0 start means we start at the beginning and 0 end means we continue indefinitely
Start: big.NewInt(viper.GetInt64("subscription.startingBlock")),
End: big.NewInt(viper.GetInt64("subscription.endingBlock")),
// Below default to false, which means we get all headers by default
HeaderFilter: config.HeaderFilter{
Off: viper.GetBool("subscription.headerFilter.off"),
Uncles: viper.GetBool("subscription.headerFilter.uncles"),
},
// Below defaults to false and two slices of length 0
// Which means we get all transactions by default
TrxFilter: config.TrxFilter{
Off: viper.GetBool("subscription.trxFilter.off"),
Src: viper.GetStringSlice("subscription.trxFilter.src"),
Dst: viper.GetStringSlice("subscription.trxFilter.dst"),
},
// Below defaults to false and one slice of length 0
// Which means we get all receipts by default
ReceiptFilter: config.ReceiptFilter{
Off: viper.GetBool("subscription.receiptFilter.off"),
Contracts: viper.GetStringSlice("subscription.receiptFilter.contracts"),
Topic0s: viper.GetStringSlice("subscription.receiptFilter.topic0s"),
},
// Below defaults to two false, and a slice of length 0
// Which means we get all state leafs by default, but no intermediate nodes
StateFilter: config.StateFilter{
Off: viper.GetBool("subscription.stateFilter.off"),
IntermediateNodes: viper.GetBool("subscription.stateFilter.intermediateNodes"),
Addresses: viper.GetStringSlice("subscription.stateFilter.addresses"),
},
// Below defaults to two false, and two slices of length 0
// Which means we get all storage leafs by default, but no intermediate nodes
StorageFilter: config.StorageFilter{
Off: viper.GetBool("subscription.storageFilter.off"),
IntermediateNodes: viper.GetBool("subscription.storageFilter.intermediateNodes"),
Addresses: viper.GetStringSlice("subscription.storageFilter.addresses"),
StorageKeys: viper.GetStringSlice("subscription.storageFilter.storageKeys"),
},
}
}
func getRPCClient() core.RPCClient { func getRPCClient() core.RPCClient {
vulcPath := viper.GetString("subscription.path") vulcPath := viper.GetString("superNode.ethSubscription.wsPath")
if vulcPath == "" { if vulcPath == "" {
vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided
} }

101
cmd/superNode.go Normal file
View File

@ -0,0 +1,101 @@
// Copyright © 2020 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"sync"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
)
// superNodeCmd represents the superNode command
var superNodeCmd = &cobra.Command{
Use: "superNode",
Short: "VulcanizeDB SuperNode",
Long: `This command works alongside a modified geth node which streams
all block and state (diff) data over a websocket subscription. This process
then converts the eth data to IPLD objects and publishes them to IPFS. Additionally,
it maintains a local index of the IPLD objects' CIDs in Postgres. It then opens up a server which
relays relevant data to requesting clients.`,
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *log.WithField("SubCommand", subCommand)
superNode()
},
}
func init() {
rootCmd.AddCommand(superNodeCmd)
}
func superNode() {
superNode, superNodeConfig, err := newSuperNode()
if err != nil {
logWithCommand.Fatal(err)
}
wg := &sync.WaitGroup{}
var forwardQuitChan chan bool
var forwardPayloadChan chan interface{}
if superNodeConfig.Serve {
forwardQuitChan = make(chan bool)
forwardPayloadChan = make(chan interface{}, super_node.PayloadChanBufferSize)
superNode.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan)
if err := startServers(superNode, superNodeConfig); err != nil {
logWithCommand.Fatal(err)
}
}
if superNodeConfig.Sync {
if err := superNode.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan); err != nil {
logWithCommand.Fatal(err)
}
}
if superNodeConfig.BackFill {
backFiller, err := super_node.NewBackFillService(superNodeConfig.BackFillSettings)
if err != nil {
logWithCommand.Fatal(err)
}
backFiller.FillGaps(wg, nil)
}
}
func newSuperNode() (super_node.SuperNode, *config.SuperNode, error) {
superNodeConfig, err := config.NewSuperNodeConfig()
if err != nil {
return nil, nil, err
}
sn, err := super_node.NewSuperNode(superNodeConfig)
if err != nil {
return nil, nil, err
}
return sn, superNodeConfig, nil
}
func startServers(superNode super_node.SuperNode, settings *config.SuperNode) error {
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs())
if err != nil {
return err
}
_, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, superNode.APIs(), []string{"vdb"}, nil, true)
if err != nil {
return err
}
return nil
}

View File

@ -1,125 +0,0 @@
// Copyright © 2019 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"os"
"path/filepath"
syn "sync"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/eth"
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
vRpc "github.com/vulcanize/vulcanizedb/pkg/eth/converters/rpc"
"github.com/vulcanize/vulcanizedb/pkg/eth/node"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/vulcanize/vulcanizedb/utils"
)
// syncAndPublishCmd represents the syncAndPublish command
var syncAndPublishCmd = &cobra.Command{
Use: "syncAndPublish",
Short: "Syncs all Ethereum data into IPFS, indexing the CIDs",
Long: `This command works alongside a modified geth node which streams
all block and state (diff) data over a websocket subscription. This process
then converts the eth data to IPLD objects and publishes them to IPFS. Additionally,
it maintains a local index of the IPLD objects' CIDs in Postgres.`,
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *log.WithField("SubCommand", subCommand)
syncAndPublish()
},
}
var ipfsPath string
func init() {
rootCmd.AddCommand(syncAndPublishCmd)
}
func syncAndPublish() {
superNode, newNodeErr := newSuperNode()
if newNodeErr != nil {
logWithCommand.Fatal(newNodeErr)
}
wg := &syn.WaitGroup{}
syncAndPubErr := superNode.SyncAndPublish(wg, nil, nil)
if syncAndPubErr != nil {
logWithCommand.Fatal(syncAndPubErr)
}
if viper.GetBool("superNodeBackFill.on") && viper.GetString("superNodeBackFill.rpcPath") != "" {
backfiller, newBackFillerErr := newBackFiller()
if newBackFillerErr != nil {
logWithCommand.Fatal(newBackFillerErr)
}
backfiller.FillGaps(wg, nil)
}
wg.Wait() // If an error was thrown, wg.Add was never called and this will fall through
}
func getBlockChainAndClient(path string) (*eth.BlockChain, core.RPCClient) {
rawRPCClient, dialErr := rpc.Dial(path)
if dialErr != nil {
logWithCommand.Fatal(dialErr)
}
rpcClient := client.NewRPCClient(rawRPCClient, ipc)
ethClient := ethclient.NewClient(rawRPCClient)
vdbEthClient := client.NewEthClient(ethClient)
vdbNode := node.MakeNode(rpcClient)
transactionConverter := vRpc.NewRPCTransactionConverter(ethClient)
blockChain := eth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter)
return blockChain, rpcClient
}
func newSuperNode() (super_node.NodeInterface, error) {
blockChain, rpcClient := getBlockChainAndClient(ipc)
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
quitChan := make(chan bool)
ipfsPath = viper.GetString("client.ipfsPath")
if ipfsPath == "" {
home, homeDirErr := os.UserHomeDir()
if homeDirErr != nil {
logWithCommand.Fatal(homeDirErr)
}
ipfsPath = filepath.Join(home, ".ipfs")
}
workers := viper.GetInt("client.workers")
if workers < 1 {
workers = 1
}
return super_node.NewSuperNode(config.Ethereum, ipfsPath, &db, rpcClient, quitChan, workers, blockChain.Node())
}
func newBackFiller() (super_node.BackFillInterface, error) {
blockChain, archivalRPCClient := getBlockChainAndClient(viper.GetString("superNodeBackFill.rpcPath"))
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
freq := viper.GetInt("superNodeBackFill.frequency")
var frequency time.Duration
if freq <= 0 {
frequency = time.Minute * 5
} else {
frequency = time.Duration(freq)
}
return super_node.NewBackFillService(config.Ethereum, ipfsPath, &db, archivalRPCClient, time.Minute*frequency, super_node.DefaultMaxBatchSize)
}

View File

@ -1,73 +0,0 @@
// Copyright © 2019 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
syn "sync"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// syncPublishScreenAndServeCmd represents the syncPublishScreenAndServe command
var syncPublishScreenAndServeCmd = &cobra.Command{
Use: "syncPublishScreenAndServe",
Short: "Syncs all Ethereum data into IPFS, indexing the CIDs, and uses this to serve data requests to requesting clients",
Long: `This command works alongside a modified geth node which streams
all block and state (diff) data over a websocket subscription. This process
then converts the eth data to IPLD objects and publishes them to IPFS. Additionally,
it maintains a local index of the IPLD objects' CIDs in Postgres. It then opens up a server which
relays relevant data to requesting clients.`,
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *log.WithField("SubCommand", subCommand)
syncPublishScreenAndServe()
},
}
func init() {
rootCmd.AddCommand(syncPublishScreenAndServeCmd)
}
func syncPublishScreenAndServe() {
superNode, newNodeErr := newSuperNode()
if newNodeErr != nil {
logWithCommand.Fatal(newNodeErr)
}
wg := &syn.WaitGroup{}
forwardPayloadChan := make(chan interface{}, 20000)
forwardQuitChan := make(chan bool, 1)
syncAndPubErr := superNode.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan)
if syncAndPubErr != nil {
logWithCommand.Fatal(syncAndPubErr)
}
superNode.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan)
if viper.GetBool("superNodeBackFill.on") && viper.GetString("superNodeBackFill.rpcPath") != "" {
backfiller, newBackFillerErr := newBackFiller()
if newBackFillerErr != nil {
logWithCommand.Fatal(newBackFillerErr)
}
backfiller.FillGaps(wg, nil)
}
serverErr := startServers(superNode)
if serverErr != nil {
logWithCommand.Fatal(serverErr)
}
wg.Wait()
}