diff --git a/cmd/root.go b/cmd/root.go index 39a374a1..8b4698e8 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -34,14 +34,12 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/eth/client" vRpc "github.com/vulcanize/vulcanizedb/pkg/eth/converters/rpc" "github.com/vulcanize/vulcanizedb/pkg/eth/node" - config2 "github.com/vulcanize/vulcanizedb/pkg/super_node/config" ) var ( cfgFile string databaseConfig config.Database genConfig config.Plugin - subscriptionConfig *config2.EthSubscription ipc string levelDbPath string queueRecheckInterval time.Duration diff --git a/cmd/screenAndServe.go b/cmd/screenAndServe.go deleted file mode 100644 index 9a28d383..00000000 --- a/cmd/screenAndServe.go +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright © 2019 Vulcanize, Inc -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - "os" - "path/filepath" - syn "sync" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/rpc" - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "github.com/spf13/viper" - - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" - "github.com/vulcanize/vulcanizedb/pkg/super_node" - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" - "github.com/vulcanize/vulcanizedb/utils" -) - -// screenAndServeCmd represents the screenAndServe command -var screenAndServeCmd = &cobra.Command{ - Use: "screenAndServe", - Short: "Serve super-node data requests to requesting clients", - Long: ` It then opens up WS and IPC servers on top of the super-node ETH-IPLD index which -relays relevant data to requesting clients. In this mode, the super-node can only relay data which it has -already indexed it does not stream out live data.`, - Run: func(cmd *cobra.Command, args []string) { - subCommand = cmd.CalledAs() - logWithCommand = *log.WithField("SubCommand", subCommand) - screenAndServe() - }, -} - -func init() { - rootCmd.AddCommand(screenAndServeCmd) -} - -func screenAndServe() { - superNode, err := newSuperNodeWithoutPairedGethNode() - if err != nil { - logWithCommand.Fatal(err) - } - wg := &syn.WaitGroup{} - quitChan := make(chan bool, 1) - emptyPayloadChan := make(chan interface{}) - superNode.ScreenAndServe(wg, emptyPayloadChan, quitChan) - - if err := startServers(superNode); err != nil { - logWithCommand.Fatal(err) - } - wg.Wait() -} - -func startServers(superNode super_node.NodeInterface) error { - var ipcPath string - ipcPath = viper.GetString("server.ipcPath") - if ipcPath == "" { - home, err := os.UserHomeDir() - if err != nil { - return err - } - ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") - } - _, _, err := rpc.StartIPCEndpoint(ipcPath, superNode.APIs()) - if err != nil { - return err - } - - var wsEndpoint string - wsEndpoint = viper.GetString("server.wsEndpoint") - if wsEndpoint == "" { - wsEndpoint = "127.0.0.1:8080" - } - var exposeAll = true - var wsOrigins []string - _, _, err = rpc.StartWSEndpoint(wsEndpoint, superNode.APIs(), []string{"vdb"}, wsOrigins, exposeAll) - if err != nil { - return err - } - return nil -} - -func newSuperNodeWithoutPairedGethNode() (super_node.NodeInterface, error) { - ipfsPath = viper.GetString("client.ipfsPath") - if ipfsPath == "" { - home, err := os.UserHomeDir() - if err != nil { - return nil, err - } - ipfsPath = filepath.Join(home, ".ipfs") - } - if err := ipfs.InitIPFSPlugins(); err != nil { - return nil, err - } - ipldFetcher, err := super_node.NewIPLDFetcher(config.Ethereum, ipfsPath) - if err != nil { - return nil, err - } - db := utils.LoadPostgres(databaseConfig, core.Node{}) - retriever, err := super_node.NewCIDRetriever(config.Ethereum, &db) - if err != nil { - return nil, err - } - resolver, err := super_node.NewIPLDResolver(config.Ethereum) - if err != nil { - return nil, err - } - return &super_node.Service{ - IPLDFetcher: ipldFetcher, - Retriever: retriever, - Resolver: resolver, - Subscriptions: make(map[common.Hash]map[rpc.ID]super_node.Subscription), - SubscriptionTypes: make(map[common.Hash]super_node.SubscriptionSettings), - NodeInfo: core.Node{}, - }, nil -} diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go index 9ae75a61..42d0c3f7 100644 --- a/cmd/streamSubscribe.go +++ b/cmd/streamSubscribe.go @@ -18,7 +18,6 @@ package cmd import ( "bytes" "fmt" - "math/big" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -37,26 +36,26 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" ) -// streamSubscribeCmd represents the streamSubscribe command -var streamSubscribeCmd = &cobra.Command{ - Use: "streamSubscribe", - Short: "This command is used to subscribe to the super node stream with the provided filters", +// streamEthSubscriptionCmd represents the streamEthSubscription command +var streamEthSubscriptionCmd = &cobra.Command{ + Use: "streamEthSubscription", + Short: "This command is used to subscribe to the super node eth stream with the provided filters", Long: `This command is for demo and testing purposes and is used to subscribe to the super node with the provided subscription configuration parameters. It does not do anything with the data streamed from the super node other than unpack it and print it out for demonstration purposes.`, Run: func(cmd *cobra.Command, args []string) { subCommand = cmd.CalledAs() logWithCommand = *log.WithField("SubCommand", subCommand) - streamSubscribe() + streamEthSubscription() }, } func init() { - rootCmd.AddCommand(streamSubscribeCmd) + rootCmd.AddCommand(streamEthSubscriptionCmd) } -func streamSubscribe() { +func streamEthSubscription() { // Prep the subscription config/filters to be sent to the server - configureSubscription() + ethSubConfig := config.NewEthSubscriptionConfig() // Create a new rpc client and a subscription streamer with that client rpcClient := getRPCClient() @@ -66,7 +65,7 @@ func streamSubscribe() { payloadChan := make(chan super_node.Payload, 20000) // Subscribe to the super node service with the given config/filter parameters - sub, err := str.StreamETH(payloadChan, subscriptionConfig) + sub, err := str.Stream(payloadChan, ethSubConfig) if err != nil { logWithCommand.Fatal(err) } @@ -170,61 +169,8 @@ func streamSubscribe() { } } -func configureSubscription() { - logWithCommand.Info("loading subscription config") - subscriptionConfig = &config.EthSubscription{ - // Below default to false, which means we do not backfill by default - BackFill: viper.GetBool("subscription.backfill"), - BackFillOnly: viper.GetBool("subscription.backfillOnly"), - - // Below default to 0 - // 0 start means we start at the beginning and 0 end means we continue indefinitely - Start: big.NewInt(viper.GetInt64("subscription.startingBlock")), - End: big.NewInt(viper.GetInt64("subscription.endingBlock")), - - // Below default to false, which means we get all headers by default - HeaderFilter: config.HeaderFilter{ - Off: viper.GetBool("subscription.headerFilter.off"), - Uncles: viper.GetBool("subscription.headerFilter.uncles"), - }, - - // Below defaults to false and two slices of length 0 - // Which means we get all transactions by default - TrxFilter: config.TrxFilter{ - Off: viper.GetBool("subscription.trxFilter.off"), - Src: viper.GetStringSlice("subscription.trxFilter.src"), - Dst: viper.GetStringSlice("subscription.trxFilter.dst"), - }, - - // Below defaults to false and one slice of length 0 - // Which means we get all receipts by default - ReceiptFilter: config.ReceiptFilter{ - Off: viper.GetBool("subscription.receiptFilter.off"), - Contracts: viper.GetStringSlice("subscription.receiptFilter.contracts"), - Topic0s: viper.GetStringSlice("subscription.receiptFilter.topic0s"), - }, - - // Below defaults to two false, and a slice of length 0 - // Which means we get all state leafs by default, but no intermediate nodes - StateFilter: config.StateFilter{ - Off: viper.GetBool("subscription.stateFilter.off"), - IntermediateNodes: viper.GetBool("subscription.stateFilter.intermediateNodes"), - Addresses: viper.GetStringSlice("subscription.stateFilter.addresses"), - }, - - // Below defaults to two false, and two slices of length 0 - // Which means we get all storage leafs by default, but no intermediate nodes - StorageFilter: config.StorageFilter{ - Off: viper.GetBool("subscription.storageFilter.off"), - IntermediateNodes: viper.GetBool("subscription.storageFilter.intermediateNodes"), - Addresses: viper.GetStringSlice("subscription.storageFilter.addresses"), - StorageKeys: viper.GetStringSlice("subscription.storageFilter.storageKeys"), - }, - } -} - func getRPCClient() core.RPCClient { - vulcPath := viper.GetString("subscription.path") + vulcPath := viper.GetString("superNode.ethSubscription.wsPath") if vulcPath == "" { vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided } diff --git a/cmd/superNode.go b/cmd/superNode.go new file mode 100644 index 00000000..58eac22c --- /dev/null +++ b/cmd/superNode.go @@ -0,0 +1,101 @@ +// Copyright © 2020 Vulcanize, Inc +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "sync" + + "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "github.com/vulcanize/vulcanizedb/pkg/super_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node/config" +) + +// superNodeCmd represents the superNode command +var superNodeCmd = &cobra.Command{ + Use: "superNode", + Short: "VulcanizeDB SuperNode", + Long: `This command works alongside a modified geth node which streams +all block and state (diff) data over a websocket subscription. This process +then converts the eth data to IPLD objects and publishes them to IPFS. Additionally, +it maintains a local index of the IPLD objects' CIDs in Postgres. It then opens up a server which +relays relevant data to requesting clients.`, + Run: func(cmd *cobra.Command, args []string) { + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) + superNode() + }, +} + +func init() { + rootCmd.AddCommand(superNodeCmd) +} + +func superNode() { + superNode, superNodeConfig, err := newSuperNode() + if err != nil { + logWithCommand.Fatal(err) + } + wg := &sync.WaitGroup{} + var forwardQuitChan chan bool + var forwardPayloadChan chan interface{} + if superNodeConfig.Serve { + forwardQuitChan = make(chan bool) + forwardPayloadChan = make(chan interface{}, super_node.PayloadChanBufferSize) + superNode.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan) + if err := startServers(superNode, superNodeConfig); err != nil { + logWithCommand.Fatal(err) + } + } + if superNodeConfig.Sync { + if err := superNode.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan); err != nil { + logWithCommand.Fatal(err) + } + } + if superNodeConfig.BackFill { + backFiller, err := super_node.NewBackFillService(superNodeConfig.BackFillSettings) + if err != nil { + logWithCommand.Fatal(err) + } + backFiller.FillGaps(wg, nil) + } +} + +func newSuperNode() (super_node.SuperNode, *config.SuperNode, error) { + superNodeConfig, err := config.NewSuperNodeConfig() + if err != nil { + return nil, nil, err + } + sn, err := super_node.NewSuperNode(superNodeConfig) + if err != nil { + return nil, nil, err + } + return sn, superNodeConfig, nil +} + +func startServers(superNode super_node.SuperNode, settings *config.SuperNode) error { + _, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs()) + if err != nil { + return err + } + _, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, superNode.APIs(), []string{"vdb"}, nil, true) + if err != nil { + return err + } + return nil +} diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go deleted file mode 100644 index 389d8152..00000000 --- a/cmd/syncAndPublish.go +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright © 2019 Vulcanize, Inc -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - "os" - "path/filepath" - syn "sync" - "time" - - "github.com/ethereum/go-ethereum/ethclient" - "github.com/ethereum/go-ethereum/rpc" - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "github.com/spf13/viper" - - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/eth" - "github.com/vulcanize/vulcanizedb/pkg/eth/client" - vRpc "github.com/vulcanize/vulcanizedb/pkg/eth/converters/rpc" - "github.com/vulcanize/vulcanizedb/pkg/eth/node" - "github.com/vulcanize/vulcanizedb/pkg/super_node" - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" - "github.com/vulcanize/vulcanizedb/utils" -) - -// syncAndPublishCmd represents the syncAndPublish command -var syncAndPublishCmd = &cobra.Command{ - Use: "syncAndPublish", - Short: "Syncs all Ethereum data into IPFS, indexing the CIDs", - Long: `This command works alongside a modified geth node which streams -all block and state (diff) data over a websocket subscription. This process -then converts the eth data to IPLD objects and publishes them to IPFS. Additionally, -it maintains a local index of the IPLD objects' CIDs in Postgres.`, - Run: func(cmd *cobra.Command, args []string) { - subCommand = cmd.CalledAs() - logWithCommand = *log.WithField("SubCommand", subCommand) - syncAndPublish() - }, -} - -var ipfsPath string - -func init() { - rootCmd.AddCommand(syncAndPublishCmd) -} - -func syncAndPublish() { - superNode, newNodeErr := newSuperNode() - if newNodeErr != nil { - logWithCommand.Fatal(newNodeErr) - } - wg := &syn.WaitGroup{} - syncAndPubErr := superNode.SyncAndPublish(wg, nil, nil) - if syncAndPubErr != nil { - logWithCommand.Fatal(syncAndPubErr) - } - if viper.GetBool("superNodeBackFill.on") && viper.GetString("superNodeBackFill.rpcPath") != "" { - backfiller, newBackFillerErr := newBackFiller() - if newBackFillerErr != nil { - logWithCommand.Fatal(newBackFillerErr) - } - backfiller.FillGaps(wg, nil) - } - wg.Wait() // If an error was thrown, wg.Add was never called and this will fall through -} - -func getBlockChainAndClient(path string) (*eth.BlockChain, core.RPCClient) { - rawRPCClient, dialErr := rpc.Dial(path) - if dialErr != nil { - logWithCommand.Fatal(dialErr) - } - rpcClient := client.NewRPCClient(rawRPCClient, ipc) - ethClient := ethclient.NewClient(rawRPCClient) - vdbEthClient := client.NewEthClient(ethClient) - vdbNode := node.MakeNode(rpcClient) - transactionConverter := vRpc.NewRPCTransactionConverter(ethClient) - blockChain := eth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter) - return blockChain, rpcClient -} - -func newSuperNode() (super_node.NodeInterface, error) { - blockChain, rpcClient := getBlockChainAndClient(ipc) - db := utils.LoadPostgres(databaseConfig, blockChain.Node()) - quitChan := make(chan bool) - ipfsPath = viper.GetString("client.ipfsPath") - if ipfsPath == "" { - home, homeDirErr := os.UserHomeDir() - if homeDirErr != nil { - logWithCommand.Fatal(homeDirErr) - } - ipfsPath = filepath.Join(home, ".ipfs") - } - workers := viper.GetInt("client.workers") - if workers < 1 { - workers = 1 - } - return super_node.NewSuperNode(config.Ethereum, ipfsPath, &db, rpcClient, quitChan, workers, blockChain.Node()) -} - -func newBackFiller() (super_node.BackFillInterface, error) { - blockChain, archivalRPCClient := getBlockChainAndClient(viper.GetString("superNodeBackFill.rpcPath")) - db := utils.LoadPostgres(databaseConfig, blockChain.Node()) - freq := viper.GetInt("superNodeBackFill.frequency") - var frequency time.Duration - if freq <= 0 { - frequency = time.Minute * 5 - } else { - frequency = time.Duration(freq) - } - return super_node.NewBackFillService(config.Ethereum, ipfsPath, &db, archivalRPCClient, time.Minute*frequency, super_node.DefaultMaxBatchSize) -} diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go deleted file mode 100644 index e181e84b..00000000 --- a/cmd/syncPublishScreenAndServe.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright © 2019 Vulcanize, Inc -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - syn "sync" - - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -// syncPublishScreenAndServeCmd represents the syncPublishScreenAndServe command -var syncPublishScreenAndServeCmd = &cobra.Command{ - Use: "syncPublishScreenAndServe", - Short: "Syncs all Ethereum data into IPFS, indexing the CIDs, and uses this to serve data requests to requesting clients", - Long: `This command works alongside a modified geth node which streams -all block and state (diff) data over a websocket subscription. This process -then converts the eth data to IPLD objects and publishes them to IPFS. Additionally, -it maintains a local index of the IPLD objects' CIDs in Postgres. It then opens up a server which -relays relevant data to requesting clients.`, - Run: func(cmd *cobra.Command, args []string) { - subCommand = cmd.CalledAs() - logWithCommand = *log.WithField("SubCommand", subCommand) - syncPublishScreenAndServe() - }, -} - -func init() { - rootCmd.AddCommand(syncPublishScreenAndServeCmd) -} - -func syncPublishScreenAndServe() { - superNode, newNodeErr := newSuperNode() - if newNodeErr != nil { - logWithCommand.Fatal(newNodeErr) - } - - wg := &syn.WaitGroup{} - forwardPayloadChan := make(chan interface{}, 20000) - forwardQuitChan := make(chan bool, 1) - syncAndPubErr := superNode.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan) - if syncAndPubErr != nil { - logWithCommand.Fatal(syncAndPubErr) - } - superNode.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan) - if viper.GetBool("superNodeBackFill.on") && viper.GetString("superNodeBackFill.rpcPath") != "" { - backfiller, newBackFillerErr := newBackFiller() - if newBackFillerErr != nil { - logWithCommand.Fatal(newBackFillerErr) - } - backfiller.FillGaps(wg, nil) - } - - serverErr := startServers(superNode) - if serverErr != nil { - logWithCommand.Fatal(serverErr) - } - wg.Wait() -}