From 2db0ce971daa67993fd35c3c954e5f76ec0b12e3 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Tue, 28 May 2019 14:13:06 -0500 Subject: [PATCH] vulcanizedb seed node streamer and syncPublishScreenAndServe command --- cmd/syncAndPublish.go | 4 -- cmd/syncPublishScreenAndServe.go | 73 +++++++++++++++++++++++++++ libraries/shared/streamer/streamer.go | 47 +++++++++++++++++ 3 files changed, 120 insertions(+), 4 deletions(-) create mode 100644 cmd/syncPublishScreenAndServe.go create mode 100644 libraries/shared/streamer/streamer.go diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go index b3c41b1e..b1dd454a 100644 --- a/cmd/syncAndPublish.go +++ b/cmd/syncAndPublish.go @@ -46,10 +46,6 @@ it maintains a local index of the IPLD objects' CIDs in Postgres.`, }, } -var ( - ipfsPath string -) - func init() { rootCmd.AddCommand(syncAndPublishCmd) syncAndPublishCmd.Flags().StringVarP(&ipfsPath, "ipfs-path", "i", "~/.ipfs", "Path for configuring IPFS node") diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go new file mode 100644 index 00000000..0b0d4b8a --- /dev/null +++ b/cmd/syncPublishScreenAndServe.go @@ -0,0 +1,73 @@ +// Copyright © 2019 Vulcanize, Inc +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + syn "sync" + + "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "github.com/vulcanize/vulcanizedb/pkg/ipfs" + "github.com/vulcanize/vulcanizedb/utils" +) + +// syncPublishScreenAndServeCmd represents the syncPublishScreenAndServe command +var syncPublishScreenAndServeCmd = &cobra.Command{ + Use: "syncPublishScreenAndServe", + Short: "A brief description of your command", + Long: `A longer description that spans multiple lines and likely contains examples +and usage of using your command. For example: + +Cobra is a CLI library for Go that empowers applications. +This application is a tool to generate the needed files +to quickly create a Cobra application.`, + Run: func(cmd *cobra.Command, args []string) { + syncPublishScreenAndServe() + }, +} + +func init() { + rootCmd.AddCommand(syncPublishScreenAndServeCmd) + syncPublishScreenAndServeCmd.Flags().StringVarP(&ipfsPath, "ipfs-path", "i", "~/.ipfs", "Path for configuring IPFS node") + syncPublishScreenAndServeCmd.Flags().StringVarP(&vulcPath, "ipc-path", "p", "~/.vulcanize/vulcanize.ipc", "IPC path for the Vulcanize seed node server") +} + +func syncPublishScreenAndServe() { + blockChain, ethClient, rpcClient := getBlockChainAndClients() + + db := utils.LoadPostgres(databaseConfig, blockChain.Node()) + quitChan := make(chan bool) + processor, err := ipfs.NewIPFSProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan) + if err != nil { + log.Fatal(err) + } + + wg := &syn.WaitGroup{} + forwardPayloadChan := make(chan ipfs.IPLDPayload) + forwardQuitChan := make(chan bool) + err = processor.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan) + if err != nil { + log.Fatal(err) + } + processor.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan) + _, _, err = rpc.StartIPCEndpoint(vulcPath, processor.APIs()) + if err != nil { + log.Fatal(err) + } + wg.Wait() +} diff --git a/libraries/shared/streamer/streamer.go b/libraries/shared/streamer/streamer.go new file mode 100644 index 00000000..16a1d964 --- /dev/null +++ b/libraries/shared/streamer/streamer.go @@ -0,0 +1,47 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Streamer is used by watchers to stream eth data from a vulcanizedb seed node +package streamer + +import ( + "github.com/ethereum/go-ethereum/rpc" + + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" +) + +// IStreamer is the interface for streaming data from a vulcanizeDB seed node +type IStreamer interface { + Stream(payloadChan chan ipfs.ResponsePayload, streamFilters ipfs.StreamFilters) (*rpc.ClientSubscription, error) +} + +// Streamer is the underlying struct for the IStreamer interface +type Streamer struct { + Client core.RpcClient +} + +// NewSeedStreamer creates a pointer to a new Streamer which satisfies the IStreamer interface +func NewSeedStreamer(client core.RpcClient) *Streamer { + return &Streamer{ + Client: client, + } +} + +// Stream is the main loop for subscribing to data from a vulcanizedb seed node +func (sds *Streamer) Stream(payloadChan chan ipfs.ResponsePayload, streamFilters ipfs.StreamFilters) (*rpc.ClientSubscription, error) { + return sds.Client.Subscribe("vulcanizedb", payloadChan, "subscribe") +}