diff --git a/cmd/root.go b/cmd/root.go index af6c23e2..fdcd87d2 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -33,7 +33,7 @@ var ( ) var rootCmd = &cobra.Command{ - Use: "ipfs-blockchain-watcher", + Use: "ipld-eth-server", PersistentPreRun: initFuncs, } diff --git a/cmd/serve.go b/cmd/serve.go new file mode 100644 index 00000000..e0ab26e9 --- /dev/null +++ b/cmd/serve.go @@ -0,0 +1,122 @@ +// Copyright © 2020 Vulcanize, Inc +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "os" + "os/signal" + s "sync" + + "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" + + "github.com/vulcanize/ipld-eth-server/pkg/serve" + v "github.com/vulcanize/ipld-eth-server/version" +) + +// watchCmd represents the watch command +var watchCmd = &cobra.Command{ + Use: "watch", + Short: "serve chain data from PG-IPFS", + Long: `This command configures a VulcanizeDB ipld-eth-server. + +`, + Run: func(cmd *cobra.Command, args []string) { + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) + watch() + }, +} + +func watch() { + logWithCommand.Infof("running ipld-eth-server version: %s", v.VersionWithMeta) + + var forwardPayloadChan chan eth.ConvertedPayload + wg := new(s.WaitGroup) + logWithCommand.Debug("loading watcher configuration variables") + watcherConfig, err := serve.NewConfig() + if err != nil { + logWithCommand.Fatal(err) + } + logWithCommand.Infof("watcher config: %+v", watcherConfig) + logWithCommand.Debug("initializing new watcher service") + s, err := serve.NewServer(watcherConfig) + if err != nil { + logWithCommand.Fatal(err) + } + + logWithCommand.Info("starting up watcher servers") + forwardPayloadChan = make(chan eth.ConvertedPayload, serve.PayloadChanBufferSize) + s.Serve(wg, forwardPayloadChan) + if err := startServers(s, watcherConfig); err != nil { + logWithCommand.Fatal(err) + } + + + shutdown := make(chan os.Signal) + signal.Notify(shutdown, os.Interrupt) + <-shutdown + s.Stop() + wg.Wait() +} + +func startServers(watcher serve.Server, settings *serve.Config) error { + logWithCommand.Debug("starting up IPC server") + _, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, watcher.APIs()) + if err != nil { + return err + } + logWithCommand.Debug("starting up WS server") + _, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, watcher.APIs(), []string{"vdb"}, nil, true) + if err != nil { + return err + } + logWithCommand.Debug("starting up HTTP server") + _, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, watcher.APIs(), []string{"eth"}, nil, nil, rpc.HTTPTimeouts{}) + return err +} + +func init() { + rootCmd.AddCommand(watchCmd) + + // flags for all config variables + watchCmd.PersistentFlags().String("watcher-ws-path", "", "vdb server ws path") + watchCmd.PersistentFlags().String("watcher-http-path", "", "vdb server http path") + watchCmd.PersistentFlags().String("watcher-ipc-path", "", "vdb server ipc path") + + watchCmd.PersistentFlags().String("eth-ws-path", "", "ws url for ethereum node") + watchCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node") + watchCmd.PersistentFlags().String("eth-node-id", "", "eth node id") + watchCmd.PersistentFlags().String("eth-client-name", "", "eth client name") + watchCmd.PersistentFlags().String("eth-genesis-block", "", "eth genesis block hash") + watchCmd.PersistentFlags().String("eth-network-id", "", "eth network id") + + // and their bindings + viper.BindPFlag("watcher.wsPath", watchCmd.PersistentFlags().Lookup("watcher-ws-path")) + viper.BindPFlag("watcher.httpPath", watchCmd.PersistentFlags().Lookup("watcher-http-path")) + viper.BindPFlag("watcher.ipcPath", watchCmd.PersistentFlags().Lookup("watcher-ipc-path")) + + viper.BindPFlag("ethereum.wsPath", watchCmd.PersistentFlags().Lookup("eth-ws-path")) + viper.BindPFlag("ethereum.httpPath", watchCmd.PersistentFlags().Lookup("eth-http-path")) + viper.BindPFlag("ethereum.nodeID", watchCmd.PersistentFlags().Lookup("eth-node-id")) + viper.BindPFlag("ethereum.clientName", watchCmd.PersistentFlags().Lookup("eth-client-name")) + viper.BindPFlag("ethereum.genesisBlock", watchCmd.PersistentFlags().Lookup("eth-genesis-block")) + viper.BindPFlag("ethereum.networkID", watchCmd.PersistentFlags().Lookup("eth-network-id")) +} diff --git a/cmd/streamEthSubscribe.go b/cmd/streamEthSubscribe.go index fb7f6f43..4fc692a3 100644 --- a/cmd/streamEthSubscribe.go +++ b/cmd/streamEthSubscribe.go @@ -28,9 +28,9 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/client" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" - w "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch" + "github.com/vulcanize/ipld-eth-server/pkg/client" + "github.com/vulcanize/ipld-eth-server/pkg/eth" + w "github.com/vulcanize/ipld-eth-server/pkg/serve" ) // streamEthSubscriptionCmd represents the streamEthSubscription command diff --git a/cmd/version.go b/cmd/version.go index dd38a372..5c01ae83 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -19,20 +19,20 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - v "github.com/vulcanize/ipfs-blockchain-watcher/version" + v "github.com/vulcanize/ipld-eth-server/version" ) // versionCmd represents the version command var versionCmd = &cobra.Command{ Use: "version", - Short: "Prints the version of ipfs-blockchain-watcher", - Long: `Use this command to fetch the version of ipfs-blockchain-watcher + Short: "Prints the version of ipld-eth-server", + Long: `Use this command to fetch the version of ipld-eth-server -Usage: ./ipfs-blockchain-watcher version`, +Usage: ./ipld-eth-server version`, Run: func(cmd *cobra.Command, args []string) { subCommand = cmd.CalledAs() logWithCommand = *log.WithField("SubCommand", subCommand) - logWithCommand.Infof("ipfs-blockchain-watcher version: %s", v.VersionWithMeta) + logWithCommand.Infof("ipld-eth-server version: %s", v.VersionWithMeta) }, } diff --git a/cmd/watch.go b/cmd/watch.go deleted file mode 100644 index de6ea570..00000000 --- a/cmd/watch.go +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright © 2020 Vulcanize, Inc -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package cmd - -import ( - "os" - "os/signal" - s "sync" - - "github.com/ethereum/go-ethereum/rpc" - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "github.com/spf13/viper" - - h "github.com/vulcanize/ipfs-blockchain-watcher/pkg/historical" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" - w "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch" - v "github.com/vulcanize/ipfs-blockchain-watcher/version" -) - -// watchCmd represents the watch command -var watchCmd = &cobra.Command{ - Use: "watch", - Short: "sync chain data into PG-IPFS", - Long: `This command configures a VulcanizeDB ipfs-blockchain-watcher. - -The Sync process streams all chain data from the appropriate chain, processes this data into IPLD objects -and publishes them to IPFS. It then indexes the CIDs against useful data fields/metadata in Postgres. - -The Serve process creates and exposes a rpc subscription server over ws and ipc. Transformers can subscribe to -these endpoints to stream - -The BackFill process spins up a background process which periodically probes the Postgres database to identify -and fill in gaps in the data -`, - Run: func(cmd *cobra.Command, args []string) { - subCommand = cmd.CalledAs() - logWithCommand = *log.WithField("SubCommand", subCommand) - watch() - }, -} - -func watch() { - logWithCommand.Infof("running ipfs-blockchain-watcher version: %s", v.VersionWithMeta) - - var forwardPayloadChan chan shared.ConvertedData - wg := new(s.WaitGroup) - logWithCommand.Debug("loading watcher configuration variables") - watcherConfig, err := w.NewConfig() - if err != nil { - logWithCommand.Fatal(err) - } - logWithCommand.Infof("watcher config: %+v", watcherConfig) - logWithCommand.Debug("initializing new watcher service") - watcher, err := w.NewWatcher(watcherConfig) - if err != nil { - logWithCommand.Fatal(err) - } - - if watcherConfig.Serve { - logWithCommand.Info("starting up watcher servers") - forwardPayloadChan = make(chan shared.ConvertedData, w.PayloadChanBufferSize) - watcher.Serve(wg, forwardPayloadChan) - if err := startServers(watcher, watcherConfig); err != nil { - logWithCommand.Fatal(err) - } - } - - if watcherConfig.Sync { - logWithCommand.Info("starting up watcher sync process") - if err := watcher.Sync(wg, forwardPayloadChan); err != nil { - logWithCommand.Fatal(err) - } - } - - var backFiller h.BackFillInterface - if watcherConfig.Historical { - historicalConfig, err := h.NewConfig() - if err != nil { - logWithCommand.Fatal(err) - } - logWithCommand.Debug("initializing new historical backfill service") - backFiller, err = h.NewBackFillService(historicalConfig, forwardPayloadChan) - if err != nil { - logWithCommand.Fatal(err) - } - logWithCommand.Info("starting up watcher backfill process") - backFiller.BackFill(wg) - } - - shutdown := make(chan os.Signal) - signal.Notify(shutdown, os.Interrupt) - <-shutdown - if watcherConfig.Historical { - backFiller.Stop() - } - watcher.Stop() - wg.Wait() -} - -func startServers(watcher w.Watcher, settings *w.Config) error { - logWithCommand.Debug("starting up IPC server") - _, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, watcher.APIs()) - if err != nil { - return err - } - logWithCommand.Debug("starting up WS server") - _, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, watcher.APIs(), []string{"vdb"}, nil, true) - if err != nil { - return err - } - logWithCommand.Debug("starting up HTTP server") - _, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, watcher.APIs(), []string{settings.Chain.API()}, nil, nil, rpc.HTTPTimeouts{}) - return err -} - -func init() { - rootCmd.AddCommand(watchCmd) - - // flags for all config variables - watchCmd.PersistentFlags().String("watcher-chain", "", "which chain to support, options are currently Ethereum or Bitcoin.") - watchCmd.PersistentFlags().Bool("watcher-server", false, "turn vdb server on or off") - watchCmd.PersistentFlags().String("watcher-ws-path", "", "vdb server ws path") - watchCmd.PersistentFlags().String("watcher-http-path", "", "vdb server http path") - watchCmd.PersistentFlags().String("watcher-ipc-path", "", "vdb server ipc path") - watchCmd.PersistentFlags().Bool("watcher-sync", false, "turn vdb sync on or off") - watchCmd.PersistentFlags().Int("watcher-workers", 0, "how many worker goroutines to publish and index data") - watchCmd.PersistentFlags().Bool("watcher-back-fill", false, "turn vdb backfill on or off") - watchCmd.PersistentFlags().Int("watcher-frequency", 0, "how often (in seconds) the backfill process checks for gaps") - watchCmd.PersistentFlags().Int("watcher-batch-size", 0, "data fetching batch size") - watchCmd.PersistentFlags().Int("watcher-batch-number", 0, "how many goroutines to fetch data concurrently") - watchCmd.PersistentFlags().Int("watcher-validation-level", 0, "backfill will resync any data below this level") - watchCmd.PersistentFlags().Int("watcher-timeout", 0, "timeout used for backfill http requests") - - watchCmd.PersistentFlags().String("btc-ws-path", "", "ws url for bitcoin node") - watchCmd.PersistentFlags().String("btc-http-path", "", "http url for bitcoin node") - watchCmd.PersistentFlags().String("btc-password", "", "password for btc node") - watchCmd.PersistentFlags().String("btc-username", "", "username for btc node") - watchCmd.PersistentFlags().String("btc-node-id", "", "btc node id") - watchCmd.PersistentFlags().String("btc-client-name", "", "btc client name") - watchCmd.PersistentFlags().String("btc-genesis-block", "", "btc genesis block hash") - watchCmd.PersistentFlags().String("btc-network-id", "", "btc network id") - - watchCmd.PersistentFlags().String("eth-ws-path", "", "ws url for ethereum node") - watchCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node") - watchCmd.PersistentFlags().String("eth-node-id", "", "eth node id") - watchCmd.PersistentFlags().String("eth-client-name", "", "eth client name") - watchCmd.PersistentFlags().String("eth-genesis-block", "", "eth genesis block hash") - watchCmd.PersistentFlags().String("eth-network-id", "", "eth network id") - - // and their bindings - viper.BindPFlag("watcher.chain", watchCmd.PersistentFlags().Lookup("watcher-chain")) - viper.BindPFlag("watcher.server", watchCmd.PersistentFlags().Lookup("watcher-server")) - viper.BindPFlag("watcher.wsPath", watchCmd.PersistentFlags().Lookup("watcher-ws-path")) - viper.BindPFlag("watcher.httpPath", watchCmd.PersistentFlags().Lookup("watcher-http-path")) - viper.BindPFlag("watcher.ipcPath", watchCmd.PersistentFlags().Lookup("watcher-ipc-path")) - viper.BindPFlag("watcher.sync", watchCmd.PersistentFlags().Lookup("watcher-sync")) - viper.BindPFlag("watcher.workers", watchCmd.PersistentFlags().Lookup("watcher-workers")) - viper.BindPFlag("watcher.backFill", watchCmd.PersistentFlags().Lookup("watcher-back-fill")) - viper.BindPFlag("watcher.frequency", watchCmd.PersistentFlags().Lookup("watcher-frequency")) - viper.BindPFlag("watcher.batchSize", watchCmd.PersistentFlags().Lookup("watcher-batch-size")) - viper.BindPFlag("watcher.batchNumber", watchCmd.PersistentFlags().Lookup("watcher-batch-number")) - viper.BindPFlag("watcher.validationLevel", watchCmd.PersistentFlags().Lookup("watcher-validation-level")) - viper.BindPFlag("watcher.timeout", watchCmd.PersistentFlags().Lookup("watcher-timeout")) - - viper.BindPFlag("bitcoin.wsPath", watchCmd.PersistentFlags().Lookup("btc-ws-path")) - viper.BindPFlag("bitcoin.httpPath", watchCmd.PersistentFlags().Lookup("btc-http-path")) - viper.BindPFlag("bitcoin.pass", watchCmd.PersistentFlags().Lookup("btc-password")) - viper.BindPFlag("bitcoin.user", watchCmd.PersistentFlags().Lookup("btc-username")) - viper.BindPFlag("bitcoin.nodeID", watchCmd.PersistentFlags().Lookup("btc-node-id")) - viper.BindPFlag("bitcoin.clientName", watchCmd.PersistentFlags().Lookup("btc-client-name")) - viper.BindPFlag("bitcoin.genesisBlock", watchCmd.PersistentFlags().Lookup("btc-genesis-block")) - viper.BindPFlag("bitcoin.networkID", watchCmd.PersistentFlags().Lookup("btc-network-id")) - - viper.BindPFlag("ethereum.wsPath", watchCmd.PersistentFlags().Lookup("eth-ws-path")) - viper.BindPFlag("ethereum.httpPath", watchCmd.PersistentFlags().Lookup("eth-http-path")) - viper.BindPFlag("ethereum.nodeID", watchCmd.PersistentFlags().Lookup("eth-node-id")) - viper.BindPFlag("ethereum.clientName", watchCmd.PersistentFlags().Lookup("eth-client-name")) - viper.BindPFlag("ethereum.genesisBlock", watchCmd.PersistentFlags().Lookup("eth-genesis-block")) - viper.BindPFlag("ethereum.networkID", watchCmd.PersistentFlags().Lookup("eth-network-id")) -} diff --git a/main.go b/main.go index af533f77..6bc9b782 100644 --- a/main.go +++ b/main.go @@ -18,7 +18,7 @@ package main import ( "github.com/sirupsen/logrus" - "github.com/vulcanize/ipfs-blockchain-watcher/cmd" + "github.com/vulcanize/ipld-eth-server/cmd" ) func main() { diff --git a/pkg/client/client.go b/pkg/client/client.go index 590e0166..3ffb296f 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -22,7 +22,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" - "github.com/vulcanize/ipld-eth-server/pkg/watch" + "github.com/vulcanize/ipld-eth-server/pkg/serve" ) // Client is used to subscribe to the ipld-eth-server ipld data stream @@ -38,6 +38,6 @@ func NewClient(c *rpc.Client) *Client { } // Stream is the main loop for subscribing to iplds from an ipld-eth-server server -func (c *Client) Stream(payloadChan chan watch.SubscriptionPayload, rlpParams []byte) (*rpc.ClientSubscription, error) { +func (c *Client) Stream(payloadChan chan serve.SubscriptionPayload, rlpParams []byte) (*rpc.ClientSubscription, error) { return c.c.Subscribe(context.Background(), "vdb", payloadChan, "stream", rlpParams) } diff --git a/pkg/watch/api.go b/pkg/serve/api.go similarity index 86% rename from pkg/watch/api.go rename to pkg/serve/api.go index fd95b79d..0e2aaa4d 100644 --- a/pkg/watch/api.go +++ b/pkg/serve/api.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package watch +package serve import ( "context" @@ -36,20 +36,20 @@ const APIName = "vdb" // APIVersion is the version of the state diffing service API const APIVersion = "0.0.1" -// PublicWatcherAPI is the public api for the watcher -type PublicWatcherAPI struct { - w Watcher +// PublicServerAPI is the public api for the watcher +type PublicServerAPI struct { + w Server } -// NewPublicWatcherAPI creates a new PublicWatcherAPI with the provided underlying Watcher process -func NewPublicWatcherAPI(w Watcher) *PublicWatcherAPI { - return &PublicWatcherAPI{ +// NewPublicServerAPI creates a new PublicServerAPI with the provided underlying Server process +func NewPublicServerAPI(w Server) *PublicServerAPI { + return &PublicServerAPI{ w: w, } } // Stream is the public method to setup a subscription that fires off IPLD payloads as they are processed -func (api *PublicWatcherAPI) Stream(ctx context.Context, params eth.SubscriptionSettings) (*rpc.Subscription, error) { +func (api *PublicServerAPI) Stream(ctx context.Context, params eth.SubscriptionSettings) (*rpc.Subscription, error) { // ensure that the RPC connection supports subscriptions notifier, supported := rpc.NotifierFromContext(ctx) if !supported { @@ -89,12 +89,12 @@ func (api *PublicWatcherAPI) Stream(ctx context.Context, params eth.Subscription // Node is a public rpc method to allow transformers to fetch the node info for the watcher // NOTE: this is the node info for the node that the watcher is syncing from, not the node info for the watcher itself -func (api *PublicWatcherAPI) Node() *node.Info { +func (api *PublicServerAPI) Node() *node.Info { return api.w.Node() } // Chain returns the chain type that this watcher instance supports -func (api *PublicWatcherAPI) Chain() shared.ChainType { +func (api *PublicServerAPI) Chain() shared.ChainType { return api.w.Chain() } diff --git a/pkg/watch/config.go b/pkg/serve/config.go similarity index 99% rename from pkg/watch/config.go rename to pkg/serve/config.go index 45880186..5a525c30 100644 --- a/pkg/watch/config.go +++ b/pkg/serve/config.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package watch +package serve import ( "os" diff --git a/pkg/watch/helpers.go b/pkg/serve/helpers.go similarity index 98% rename from pkg/watch/helpers.go rename to pkg/serve/helpers.go index 1e13c18a..a4571197 100644 --- a/pkg/watch/helpers.go +++ b/pkg/serve/helpers.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package watch +package serve import log "github.com/sirupsen/logrus" diff --git a/pkg/watch/service.go b/pkg/serve/service.go similarity index 97% rename from pkg/watch/service.go rename to pkg/serve/service.go index b3ce55bf..adaf027e 100644 --- a/pkg/watch/service.go +++ b/pkg/serve/service.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package watch +package serve import ( "fmt" @@ -40,10 +40,10 @@ const ( PayloadChanBufferSize = 2000 ) -// Watcher is the top level interface for streaming, converting to IPLDs, publishing, +// Server is the top level interface for streaming, converting to IPLDs, publishing, // and indexing all chain data; screening this data; and serving it up to subscribed clients // This service is compatible with the Ethereum service interface (node.Service) -type Watcher interface { +type Server interface { // APIs(), Protocols(), Start() and Stop() ethnode.Service // Pub-Sub handling event loop @@ -82,8 +82,8 @@ type Service struct { serveWg *sync.WaitGroup } -// NewWatcher creates a new Watcher using an underlying Service struct -func NewWatcher(settings *Config) (Watcher, error) { +// NewServer creates a new Server using an underlying Service struct +func NewServer(settings *Config) (Server, error) { sn := new(Service) sn.Retriever = eth.NewCIDRetriever(settings.DB) sn.IPLDFetcher = eth.NewIPLDFetcher(settings.DB) @@ -108,7 +108,7 @@ func (sap *Service) APIs() []rpc.API { { Namespace: APIName, Version: APIVersion, - Service: NewPublicWatcherAPI(sap), + Service: NewPublicServerAPI(sap), Public: true, }, { diff --git a/pkg/watch/subscription.go b/pkg/serve/subscription.go similarity index 99% rename from pkg/watch/subscription.go rename to pkg/serve/subscription.go index 1b3474c7..41383590 100644 --- a/pkg/watch/subscription.go +++ b/pkg/serve/subscription.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package watch +package serve import ( "errors"