From 330a083749455a08b13576a89095ecacfdea7179 Mon Sep 17 00:00:00 2001
From: Ian Norden
Date: Tue, 25 Feb 2020 16:38:27 -0600
Subject: [PATCH] watcher configuration; super node versioning

---
 cmd/superNode.go                      |   7 +-
 pkg/eth/node/node.go                  |   4 +
 pkg/super_node/api.go                 |  39 +++++++++-
 pkg/super_node/backfiller.go          |  23 +++---
 pkg/super_node/{shared => }/config.go |  32 ++++----
 pkg/super_node/service.go             |  77 ++++++++++++-------
 pkg/super_node/version.go             |  63 +++++++++++++++
 pkg/wasm/instantiator.go              |  15 +++-
 pkg/watcher/config.go                 | 106 +++++++++++++++++++++++++-
 pkg/watcher/constructors.go           |  16 +++-
 pkg/watcher/eth/repository.go         |   4 +-
 pkg/watcher/service.go                |  14 ++--
 pkg/watcher/shared/source_type.go     |  58 ++++++++++++++
 13 files changed, 381 insertions(+), 77 deletions(-)
 rename pkg/super_node/{shared => }/config.go (92%)
 create mode 100644 pkg/super_node/version.go
 create mode 100644 pkg/watcher/shared/source_type.go

diff --git a/cmd/superNode.go b/cmd/superNode.go
index 617ae819..d66986a7 100644
--- a/cmd/superNode.go
+++ b/cmd/superNode.go
@@ -18,12 +18,11 @@ package cmd
 import (
 	"sync"

-	"github.com/vulcanize/vulcanizedb/pkg/ipfs"
-
 	"github.com/ethereum/go-ethereum/rpc"
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"

+	"github.com/vulcanize/vulcanizedb/pkg/ipfs"
 	"github.com/vulcanize/vulcanizedb/pkg/super_node"
 	"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
 )
@@ -55,7 +54,7 @@ func init() {
 }

 func superNode() {
-	superNodeConfigs, err := shared.NewSuperNodeConfigs()
+	superNodeConfigs, err := super_node.NewSuperNodeConfigs()
 	if err != nil {
 		logWithCommand.Fatal(err)
 	}
@@ -92,7 +91,7 @@ func superNode() {
 	wg.Wait()
 }

-func startServers(superNode super_node.SuperNode, settings *shared.SuperNodeConfig) error {
+func startServers(superNode super_node.SuperNode, settings *super_node.Config) error {
 	_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs())
 	if err != nil {
 		return err
diff --git a/pkg/eth/node/node.go b/pkg/eth/node/node.go
index 3ca2b0e1..e918d2a5 100644
--- a/pkg/eth/node/node.go
+++ b/pkg/eth/node/node.go
@@ -83,6 +83,10 @@ func makePropertiesReader(client core.RPCClient) IPropertiesReader {
 }

 func getNodeType(client core.RPCClient) core.NodeType {
+	// TODO: fix this
+	// These heuristics for figuring out the node type are not useful:
+	// for example, we often port-forward remote nodes to localhost,
+	// and geth does not have to expose the admin api...
 	if strings.Contains(client.IpcPath(), "infura") {
 		return core.INFURA
 	}
diff --git a/pkg/super_node/api.go b/pkg/super_node/api.go
index 85839f86..8d33eadd 100644
--- a/pkg/super_node/api.go
+++ b/pkg/super_node/api.go
@@ -19,8 +19,8 @@ package super_node
 import (
 	"context"

+	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/rlp"
-
 	"github.com/ethereum/go-ethereum/rpc"

 	log "github.com/sirupsen/logrus"

@@ -105,6 +105,41 @@ func (api *PublicSuperNodeAPI) Stream(ctx context.Context, rlpParams []byte) (*r
 }

 // Node is a public rpc method to allow transformers to fetch the node info for the super node
-func (api *PublicSuperNodeAPI) Node() core.Node {
+// NOTE: this is the node info for the node that the super node is syncing from, not the node info for the super node itself
+func (api *PublicSuperNodeAPI) Node() *core.Node {
 	return api.sn.Node()
 }
+
+// Chain returns the chain type that this super node instance supports
+func (api *PublicSuperNodeAPI) Chain() shared.ChainType {
+	return api.sn.Chain()
+}
+
+// InfoAPI holds super node metadata
+type InfoAPI struct{}
+
+// NewInfoAPI creates a new InfoAPI
+func NewInfoAPI() *InfoAPI {
+	return &InfoAPI{}
+}
+
+// Modules returns modules supported by this api
+func (iapi *InfoAPI) Modules() map[string]string {
+	return map[string]string{
+		"vdb": "Stream",
+	}
+}
+
+// NodeInfo gathers and returns a collection of metadata for the super node
+func (iapi *InfoAPI) NodeInfo() *p2p.NodeInfo {
+	return &p2p.NodeInfo{
+		// TODO: formalize this
+		ID:   "vulcanizeDB",
+		Name: "superNode",
+	}
+}
+
+// Version returns the version of the super node
+func (iapi *InfoAPI) Version() string {
+	return VersionWithMeta
+}
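Note on the new InfoAPI: service.go (further down in this patch) registers it under the "rpc", "net" and "admin" namespaces. As a rough illustration only — the endpoint URL is an assumption, and go-ethereum's usual namespace_method RPC naming convention is assumed — a client could query these endpoints like this:

    package main

    import (
        "fmt"
        "log"

        "github.com/ethereum/go-ethereum/p2p"
        "github.com/ethereum/go-ethereum/rpc"
    )

    func main() {
        // assumed super node websocket endpoint; adjust to your deployment
        cli, err := rpc.Dial("ws://127.0.0.1:8080")
        if err != nil {
            log.Fatal(err)
        }
        defer cli.Close()

        // InfoAPI.Version, exposed (among others) through the "net" namespace
        var version string
        if err := cli.Call(&version, "net_version"); err != nil {
            log.Fatal(err)
        }

        // InfoAPI.NodeInfo, exposed through the "admin" namespace
        var info p2p.NodeInfo
        if err := cli.Call(&info, "admin_nodeInfo"); err != nil {
            log.Fatal(err)
        }
        fmt.Println(version, info.ID, info.Name)
    }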
diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go
index 1144b899..2a857c60 100644
--- a/pkg/super_node/backfiller.go
+++ b/pkg/super_node/backfiller.go
@@ -17,7 +17,7 @@ package super_node
 import (
-	"errors"
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -59,10 +59,12 @@ type BackFillService struct {
 	BatchSize uint64
 	// Channel for receiving quit signal
 	QuitChan chan bool
+	// Chain type
+	chain shared.ChainType
 }

 // NewBackFillService returns a new BackFillInterface
-func NewBackFillService(settings *shared.SuperNodeConfig, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) {
+func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) {
 	publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath)
 	if err != nil {
 		return nil, err
@@ -97,6 +99,7 @@ func NewBackFillService(settings *shared.SuperNodeConfig, screenAndServeChan cha
 		BatchSize:          batchSize,
 		ScreenAndServeChan: screenAndServeChan,
 		QuitChan:           settings.Quit,
+		chain:              settings.Chain,
 	}, nil
 }

@@ -109,18 +112,18 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
 		for {
 			select {
 			case <-bfs.QuitChan:
-				log.Info("quiting FillGaps process")
+				log.Infof("quitting %s FillGaps process", bfs.chain.String())
 				wg.Done()
 				return
 			case <-ticker.C:
-				log.Info("searching for gaps in the super node database")
+				log.Infof("searching for gaps in the %s super node database", bfs.chain.String())
 				startingBlock, err := bfs.Retriever.RetrieveFirstBlockNumber()
 				if err != nil {
 					log.Error(err)
 					continue
 				}
 				if startingBlock != 0 {
-					log.Info("found gap at the beginning of the sync")
+					log.Infof("found gap at the beginning of the %s sync", bfs.chain.String())
 					bfs.fillGaps(0, uint64(startingBlock-1))
 				}
 				gaps, err := bfs.Retriever.RetrieveGapsInData()
@@ -136,11 +139,11 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
 			}
 		}
 	}()
-	log.Info("fillGaps goroutine successfully spun up")
+	log.Infof("%s fillGaps goroutine successfully spun up", bfs.chain.String())
 }

 func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) error {
-	log.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock)
+	log.Infof("going to fill in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock)
 	errChan := make(chan error)
 	done := make(chan bool)
 	err := bfs.backFill(startingBlock, endingBlock, errChan, done)
@@ -152,7 +155,7 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) error {
 		case err := <-errChan:
 			log.Error(err)
 		case <-done:
-			log.Infof("finished filling in gap from %d to %d", startingBlock, endingBlock)
+			log.Infof("finished filling in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock)
 			return nil
 		}
 	}
@@ -162,7 +165,7 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) error {
 // It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently
 func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan chan error, done chan bool) error {
 	if endingBlock < startingBlock {
-		return errors.New("backfill: ending block number needs to be greater than starting block number")
+		return fmt.Errorf("%s backfill: ending block number needs to be greater than starting block number", bfs.chain.String())
 	}
 	//
 	// break the range up into bins of smaller ranges
@@ -230,7 +233,7 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan
 				case forwardDone <- true:
 				default:
 				}
-				log.Infof("finished filling in gap sub-bin from %d to %d", doneWithHeights[0], doneWithHeights[1])
+				log.Infof("finished filling in %s gap sub-bin from %d to %d", bfs.chain.String(), doneWithHeights[0], doneWithHeights[1])
 				goroutinesFinished++
 				if goroutinesFinished >= len(blockRangeBins) {
 					done <- true
diff --git a/pkg/super_node/shared/config.go b/pkg/super_node/config.go
similarity index 92%
rename from pkg/super_node/shared/config.go
rename to pkg/super_node/config.go
index ad489d2a..119c4827 100644
--- a/pkg/super_node/shared/config.go
+++ b/pkg/super_node/config.go
@@ -14,7 +14,7 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
-package shared
+package super_node

 import (
 	"fmt"
@@ -31,13 +31,14 @@ import (
 	"github.com/vulcanize/vulcanizedb/pkg/eth/core"
 	"github.com/vulcanize/vulcanizedb/pkg/eth/node"
 	"github.com/vulcanize/vulcanizedb/pkg/postgres"
+	"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
 	"github.com/vulcanize/vulcanizedb/utils"
 )

-// SuperNodeConfig struct
-type SuperNodeConfig struct {
+// Config struct
+type Config struct {
 	// Ubiquitous fields
-	Chain    ChainType
+	Chain    shared.ChainType
 	IPFSPath string
 	DB       *postgres.DB
 	DBConfig config.Database
@@ -61,9 +62,9 @@
 // NewSuperNodeConfigs is used to initialize multiple SuperNode configs from a single config .toml file
 // Separate chain supernode instances need to be ran in the same process in order to avoid lock contention on the ipfs repository
-func NewSuperNodeConfigs() ([]*SuperNodeConfig, error) {
+func NewSuperNodeConfigs() ([]*Config, error) {
 	chains := viper.GetStringSlice("superNode.chains")
-	configs := make([]*SuperNodeConfig, len(chains))
+	configs := make([]*Config, len(chains))
 	var err error
 	ipfsPath := viper.GetString("superNode.ipfsPath")
 	if ipfsPath == "" {
@@ -74,8 +75,8 @@ func NewSuperNodeConfigs() ([]*SuperNodeConfig, error) {
 		ipfsPath = filepath.Join(home, ".ipfs")
 	}
 	for i, chain := range chains {
-		sn := new(SuperNodeConfig)
-		sn.Chain, err = NewChainType(chain)
+		sn := new(Config)
+		sn.Chain, err = shared.NewChainType(chain)
 		if err != nil {
 			return nil, err
 		}
@@ -96,9 +97,12 @@ func NewSuperNodeConfigs() ([]*SuperNodeConfig, error) {
 		}
 		sn.Workers = workers
 		switch sn.Chain {
-		case Ethereum:
+		case shared.Ethereum:
 			sn.NodeInfo, sn.WSClient, err = getEthNodeAndClient(viper.GetString("superNode.ethereum.sync.wsPath"))
+			if err != nil {
+				return nil, err
+			}
-		case Bitcoin:
+		case shared.Bitcoin:
 			sn.NodeInfo = core.Node{
 				ID:         viper.GetString("superNode.bitcoin.node.nodeID"),
 				ClientName: viper.GetString("superNode.bitcoin.node.clientName"),
@@ -146,21 +150,21 @@ func NewSuperNodeConfigs() ([]*SuperNodeConfig, error) {
 		}
 		configs[i] = sn
 	}
-	return configs, err
+	return configs, nil
 }

 // BackFillFields is used to fill in the BackFill fields of the config
-func (sn *SuperNodeConfig) BackFillFields(chain string) error {
+func (sn *Config) BackFillFields(chain string) error {
 	sn.BackFill = true
 	var httpClient interface{}
 	var err error
 	switch sn.Chain {
-	case Ethereum:
+	case shared.Ethereum:
 		_, httpClient, err = getEthNodeAndClient(viper.GetString("superNode.ethereum.backFill.httpPath"))
 		if err != nil {
 			return err
 		}
-	case Bitcoin:
+	case shared.Bitcoin:
 		httpClient = &rpcclient.ConnConfig{
 			Host:         viper.GetString("superNode.bitcoin.backFill.httpPath"),
 			HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode
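For reference, a hedged sketch of how the relocated super_node.NewSuperNodeConfigs might be driven programmatically rather than from a .toml file. Only viper keys visible in this hunk are used; all values are placeholders, the remaining keys (server, database, workers, backfill settings) are omitted, and the call will attempt to reach the configured websocket endpoint and Postgres database, so this is illustrative rather than directly runnable against an empty environment:

    package main

    import (
        "log"

        "github.com/spf13/viper"

        "github.com/vulcanize/vulcanizedb/pkg/super_node"
    )

    func main() {
        viper.Set("superNode.chains", []string{"ethereum"})
        viper.Set("superNode.ipfsPath", "/root/.ipfs")                    // placeholder ipfs repo path
        viper.Set("superNode.ethereum.sync.wsPath", "ws://127.0.0.1:8546") // placeholder geth ws endpoint
        // ...server, database, workers and backfill keys elided...

        configs, err := super_node.NewSuperNodeConfigs()
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("loaded %d super node config(s); first chain: %s", len(configs), configs[0].Chain.String())
    }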
diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go
index cae7e0ac..9ad7e401 100644
--- a/pkg/super_node/service.go
+++ b/pkg/super_node/service.go
@@ -52,7 +52,7 @@ type SuperNode interface {
 	// Method to unsubscribe from the service
 	Unsubscribe(id rpc.ID)
 	// Method to access the node info for the service
-	Node() core.Node
+	Node() *core.Node
 	// Method to access chain type
 	Chain() shared.ChainType
 }
@@ -84,7 +84,7 @@ type Service struct {
 	// A mapping of subscription params hash to the corresponding subscription params
 	SubscriptionTypes map[common.Hash]shared.SubscriptionSettings
 	// Info for the Geth node that this super node is working with
-	NodeInfo core.Node
+	NodeInfo *core.Node
 	// Number of publishAndIndex workers
 	WorkerPoolSize int
 	// chain type for this service
@@ -96,7 +96,7 @@ type Service struct {
 }

 // NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct
-func NewSuperNode(settings *shared.SuperNodeConfig) (SuperNode, error) {
+func NewSuperNode(settings *Config) (SuperNode, error) {
 	sn := new(Service)
 	var err error
 	// If we are syncing, initialize the needed interfaces
@@ -137,7 +137,7 @@ func NewSuperNode(settings *shared.SuperNodeConfig) (SuperNode, error) {
 	sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
 	sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings)
 	sn.WorkerPoolSize = settings.Workers
-	sn.NodeInfo = settings.NodeInfo
+	sn.NodeInfo = &settings.NodeInfo
 	sn.ipfsPath = settings.IPFSPath
 	sn.chain = settings.Chain
 	sn.db = settings.DB
@@ -151,6 +151,7 @@ func (sap *Service) Protocols() []p2p.Protocol {

 // APIs returns the RPC descriptors the super node service offers
 func (sap *Service) APIs() []rpc.API {
+	infoAPI := NewInfoAPI()
 	apis := []rpc.API{
 		{
 			Namespace: APIName,
@@ -158,6 +159,24 @@ func (sap *Service) APIs() []rpc.API {
 			Service:   NewPublicSuperNodeAPI(sap),
 			Public:    true,
 		},
+		{
+			Namespace: "rpc",
+			Version:   APIVersion,
+			Service:   infoAPI,
+			Public:    true,
+		},
+		{
+			Namespace: "net",
+			Version:   APIVersion,
+			Service:   infoAPI,
+			Public:    true,
+		},
+		{
+			Namespace: "admin",
+			Version:   APIVersion,
+			Service:   infoAPI,
+			Public:    true,
+		},
 	}
 	chainAPI, err := NewPublicAPI(sap.chain, sap.db, sap.ipfsPath)
 	if err != nil {
@@ -204,13 +223,13 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<-
 			case err := <-sub.Err():
 				log.Error(err)
 			case <-sap.QuitChan:
-				log.Info("quiting SyncAndPublish process")
+				log.Infof("quitting %s SyncAndPublish process", sap.chain.String())
 				wg.Done()
 				return
 			}
 		}
 	}()
-	log.Info("ProcessData goroutine successfully spun up")
+	log.Infof("%s ProcessData goroutine successfully spun up", sap.chain.String())
 	return nil
 }

@@ -232,7 +251,7 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared
 			}
 		}
 	}()
-	log.Debugf("publishAndIndex goroutine successfully spun up")
+	log.Debugf("%s publishAndIndex goroutine successfully spun up", sap.chain.String())
 }

 // FilterAndServe listens for incoming converter data off the screenAndServePayload from the SyncAndConvert process
@@ -247,24 +266,24 @@ func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-c
 			case payload := <-screenAndServePayload:
 				sap.filterAndServe(payload)
 			case <-sap.QuitChan:
-				log.Info("quiting ScreenAndServe process")
+				log.Infof("quitting %s ScreenAndServe process", sap.chain.String())
 				wg.Done()
 				return
 			}
 		}
 	}()
-	log.Info("FilterAndServe goroutine successfully spun up")
+	log.Infof("%s FilterAndServe goroutine successfully spun up", sap.chain.String())
 }

 // filterAndServe filters the payload according to each subscription type and sends to the subscriptions
 func (sap *Service) filterAndServe(payload shared.ConvertedData) {
-	log.Debugf("Sending payload to subscriptions")
+	log.Debugf("Sending %s payload to subscriptions", sap.chain.String())
 	sap.Lock()
 	for ty, subs := range sap.Subscriptions {
 		// Retrieve the subscription parameters for this subscription type
 		subConfig, ok := sap.SubscriptionTypes[ty]
 		if !ok {
-			log.Errorf("subscription configuration for subscription type %s not available", ty.Hex())
+			log.Errorf("%s subscription configuration for subscription type %s not available", sap.chain.String(), ty.Hex())
 			sap.closeType(ty)
 			continue
 		}
@@ -288,9 +307,9 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) {
 		for id, sub := range subs {
 			select {
 			case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}:
-				log.Debugf("sending super node payload to subscription %s", id)
+				log.Debugf("sending super node %s payload to subscription %s", sap.chain.String(), id)
 			default:
-				log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
+				log.Infof("unable to send %s payload to subscription %s; channel has no receiver", sap.chain.String(), id)
 			}
 		}
 	}
@@ -300,7 +319,7 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) {
 // Subscribe is used by the API to remotely subscribe to the service loop
 // The params must be rlp serializable and satisfy the SubscriptionSettings() interface
 func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) {
-	log.Infof("New subscription %s", id)
+	log.Infof("New %s subscription %s", sap.chain.String(), id)
 	subscription := Subscription{
 		ID:          id,
 		PayloadChan: sub,
@@ -342,7 +361,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha

 // sendHistoricalData sends historical data to the requesting subscription
 func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params shared.SubscriptionSettings) error {
-	log.Info("Sending historical data to subscription", id)
+	log.Infof("Sending %s historical data to subscription %s", sap.chain.String(), id)
 	// Retrieve cached CIDs relevant to this subscriber
 	var endingBlock int64
 	var startingBlock int64
@@ -361,13 +380,13 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
 	if endingBlock > params.EndingBlock().Int64() && params.EndingBlock().Int64() > 0 && params.EndingBlock().Int64() > startingBlock {
 		endingBlock = params.EndingBlock().Int64()
 	}
-	log.Debug("historical data starting block:", params.StartingBlock())
-	log.Debug("historical data ending block:", endingBlock)
+	log.Debugf("%s historical data starting block: %d", sap.chain.String(), params.StartingBlock().Int64())
+	log.Debugf("%s historical data ending block: %d", sap.chain.String(), endingBlock)
 	go func() {
 		for i := startingBlock; i <= endingBlock; i++ {
 			cidWrappers, empty, err := sap.Retriever.Retrieve(params, i)
 			if err != nil {
-				sendNonBlockingErr(sub, fmt.Errorf("CID Retrieval error at block %d\r%s", i, err.Error()))
+				sendNonBlockingErr(sub, fmt.Errorf("%s CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error()))
 				continue
 			}
 			if empty {
@@ -376,7 +395,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
 			for _, cids := range cidWrappers {
 				response, err := sap.IPLDFetcher.Fetch(cids)
 				if err != nil {
-					sendNonBlockingErr(sub, fmt.Errorf("IPLD Fetching error at block %d\r%s", i, err.Error()))
+					sendNonBlockingErr(sub, fmt.Errorf("%s IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error()))
 					continue
 				}
 				responseRLP, err := rlp.EncodeToBytes(response)
@@ -386,18 +405,18 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
 				}
 				select {
 				case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}:
-					log.Debugf("sending super node historical data payload to subscription %s", id)
+					log.Debugf("sending super node historical data payload to %s subscription %s", sap.chain.String(), id)
 				default:
-					log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id)
+					log.Infof("unable to send back-fill payload to %s subscription %s; channel has no receiver", sap.chain.String(), id)
 				}
 			}
 		}
 		// when we are done backfilling send an empty payload signifying so in the msg
 		select {
 		case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}:
-			log.Debugf("sending backfill completion notice to subscription %s", id)
+			log.Debugf("sending backfill completion notice to %s subscription %s", sap.chain.String(), id)
 		default:
-			log.Infof("unable to send backfill completion notice to subscription %s", id)
+			log.Infof("unable to send backfill completion notice to %s subscription %s", sap.chain.String(), id)
 		}
 	}()
 	return nil
@@ -405,7 +424,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share

 // Unsubscribe is used by the API to remotely unsubscribe to the StateDiffingService loop
 func (sap *Service) Unsubscribe(id rpc.ID) {
-	log.Infof("Unsubscribing %s from the super node service", id)
+	log.Infof("Unsubscribing %s from the %s super node service", id, sap.chain.String())
 	sap.Lock()
 	for ty := range sap.Subscriptions {
 		delete(sap.Subscriptions[ty], id)
@@ -421,7 +440,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) {
 // Start is used to begin the service
 // This is mostly just to satisfy the node.Service interface
 func (sap *Service) Start(*p2p.Server) error {
-	log.Info("Starting super node service")
+	log.Infof("Starting %s super node service", sap.chain.String())
 	wg := new(sync.WaitGroup)
 	payloadChan := make(chan shared.ConvertedData, PayloadChanBufferSize)
 	if err := sap.ProcessData(wg, payloadChan); err != nil {
@@ -434,7 +453,7 @@ func (sap *Service) Start(*p2p.Server) error {
 // Stop is used to close down the service
 // This is mostly just to satisfy the node.Service interface
 func (sap *Service) Stop() error {
-	log.Info("Stopping super node service")
+	log.Infof("Stopping %s super node service", sap.chain.String())
 	sap.Lock()
 	close(sap.QuitChan)
 	sap.close()
@@ -443,7 +462,7 @@ func (sap *Service) Stop() error {
 }

 // Node returns the node info for this service
-func (sap *Service) Node() core.Node {
+func (sap *Service) Node() *core.Node {
 	return sap.NodeInfo
 }

@@ -455,7 +474,7 @@ func (sap *Service) Chain() shared.ChainType {
 // close is used to close all listening subscriptions
 // close needs to be called with subscription access locked
 func (sap *Service) close() {
-	log.Info("Closing all subscriptions")
+	log.Infof("Closing all %s subscriptions", sap.chain.String())
 	for subType, subs := range sap.Subscriptions {
 		for _, sub := range subs {
 			sendNonBlockingQuit(sub)
@@ -468,7 +487,7 @@ func (sap *Service) close() {
 // closeType is used to close all subscriptions of given type
 // closeType needs to be called with subscription access locked
 func (sap *Service) closeType(subType common.Hash) {
-	log.Infof("Closing all subscriptions of type %s", subType.String())
+	log.Infof("Closing all %s subscriptions of type %s", sap.chain.String(), subType.String())
 	subs := sap.Subscriptions[subType]
 	for _, sub := range subs {
 		sendNonBlockingQuit(sub)
diff --git a/pkg/super_node/version.go b/pkg/super_node/version.go
new file mode 100644
index 00000000..a85bb41f
--- /dev/null
+++ b/pkg/super_node/version.go
@@ -0,0 +1,63 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package super_node
+
+import "fmt"
+
+const (
+	VersionMajor = 0       // Major version component of the current release
+	VersionMinor = 1       // Minor version component of the current release
+	VersionPatch = 0       // Patch version component of the current release
+	VersionMeta  = "alpha" // Version metadata to append to the version string
+)
+
+// Version holds the textual version string.
+var Version = func() string {
+	return fmt.Sprintf("%d.%d.%d", VersionMajor, VersionMinor, VersionPatch)
+}()
+
+// VersionWithMeta holds the textual version string including the metadata.
+var VersionWithMeta = func() string {
+	v := Version
+	if VersionMeta != "" {
+		v += "-" + VersionMeta
+	}
+	return v
+}()
+
+// ArchiveVersion holds the textual version string
+func ArchiveVersion(gitCommit string) string {
+	vsn := Version
+	if VersionMeta != "stable" {
+		vsn += "-" + VersionMeta
+	}
+	if len(gitCommit) >= 8 {
+		vsn += "-" + gitCommit[:8]
+	}
+	return vsn
+}
+
+func VersionWithCommit(gitCommit, gitDate string) string {
+	vsn := VersionWithMeta
+	if len(gitCommit) >= 8 {
+		vsn += "-" + gitCommit[:8]
+	}
+	if (VersionMeta != "stable") && (gitDate != "") {
+		vsn += "-" + gitDate
+	}
+	return vsn
+}
diff --git a/pkg/wasm/instantiator.go b/pkg/wasm/instantiator.go
index 89542880..f45ca07c 100644
--- a/pkg/wasm/instantiator.go
+++ b/pkg/wasm/instantiator.go
@@ -16,16 +16,23 @@

 package wasm

-import "github.com/vulcanize/vulcanizedb/pkg/postgres"
+import (
+	"github.com/vulcanize/vulcanizedb/pkg/postgres"
+)

 // Instantiator is used to instantiate WASM functions in Postgres
 type Instantiator struct {
 	db        *postgres.DB
-	instances [][2]string // list of WASM file paths and namespaces
+	instances []WasmFunction // WASM file paths and namespaces
+}
+
+type WasmFunction struct {
+	BinaryPath string
+	Namespace  string
 }

 // NewWASMInstantiator returns a pointer to a new Instantiator
-func NewWASMInstantiator(db *postgres.DB, instances [][2]string) *Instantiator {
+func NewWASMInstantiator(db *postgres.DB, instances []WasmFunction) *Instantiator {
 	return &Instantiator{
 		db:        db,
 		instances: instances,
@@ -39,7 +46,7 @@ func (i *Instantiator) Instantiate() error {
 		return err
 	}
 	for _, pn := range i.instances {
-		_, err := tx.Exec(`SELECT wasm_new_instance('$1', '$2')`, pn[0], pn[1])
+		_, err := tx.Exec(`SELECT wasm_new_instance('$1', '$2')`, pn.BinaryPath, pn.Namespace)
 		if err != nil {
 			return err
 		}
diff --git a/pkg/watcher/config.go b/pkg/watcher/config.go
index 7190ac9a..dc8de350 100644
--- a/pkg/watcher/config.go
+++ b/pkg/watcher/config.go
@@ -17,10 +17,24 @@
 package watcher

 import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/vulcanize/vulcanizedb/pkg/wasm"
+
+	"github.com/ethereum/go-ethereum/rpc"
+	"github.com/spf13/viper"
+
+	"github.com/vulcanize/vulcanizedb/pkg/config"
+	"github.com/vulcanize/vulcanizedb/pkg/eth/client"
 	"github.com/vulcanize/vulcanizedb/pkg/eth/core"
 	"github.com/vulcanize/vulcanizedb/pkg/postgres"
+	"github.com/vulcanize/vulcanizedb/pkg/super_node/btc"
+	"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
 	"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
+	shared2 "github.com/vulcanize/vulcanizedb/pkg/watcher/shared"
+	"github.com/vulcanize/vulcanizedb/utils"
 )

 // Config holds all of the parameters necessary for defining and running an instance of a watcher
@@ -32,9 +46,93 @@ type Config struct {
 	// DB itself
 	DB *postgres.DB
 	// Subscription client
-	Client core.RPCClient
+	Client interface{}
 	// WASM instantiation paths and namespaces
-	WASMInstances [][2]string
-	// Path and names for trigger functions (sql files) that (can) use the instantiated wasm namespaces
-	TriggerFunctions [][2]string
+	WASMFunctions []wasm.WasmFunction
+	// File paths for trigger functions (sql files) that (can) use the instantiated wasm namespaces
+	TriggerFunctions []string
+	// Chain type used to specify what type of raw data we will be processing
+	Chain shared.ChainType
+	// Source type used to specify which streamer to use based on what API we will be interfacing with
+	Source shared2.SourceType
+	// Info for the node
+	NodeInfo core.Node
+}
+
+func NewWatcherConfig() (*Config, error) {
+	c := new(Config)
+	var err error
+	chain := viper.GetString("watcher.chain")
+	c.Chain, err = shared.NewChainType(chain)
+	if err != nil {
+		return nil, err
+	}
+	switch c.Chain {
+	case shared.Ethereum:
+		c.SubscriptionConfig, err = eth.NewEthSubscriptionConfig()
+		if err != nil {
+			return nil, err
+		}
+	case shared.Bitcoin:
+		c.SubscriptionConfig, err = btc.NewEthSubscriptionConfig()
+		if err != nil {
+			return nil, err
+		}
+	case shared.Omni:
+		return nil, errors.New("omni chain type currently not supported")
+	default:
+		return nil, fmt.Errorf("unexpected chain type %s", c.Chain.String())
+	}
+	sourcePath := viper.GetString("watcher.dataSource")
+	if sourcePath == "" {
+		sourcePath = "ws://127.0.0.1:8080" // if no path is provided, default to the local ws endpoint
+	}
+	sourceType := viper.GetString("watcher.dataPath")
+	c.Source, err = shared2.NewSourceType(sourceType)
+	if err != nil {
+		return nil, err
+	}
+	switch c.Source {
+	case shared2.Ethereum:
+		return nil, errors.New("ethereum data source currently not supported")
+	case shared2.Bitcoin:
+		return nil, errors.New("bitcoin data source currently not supported")
+	case shared2.VulcanizeDB:
+		rawRPCClient, err := rpc.Dial(sourcePath)
+		if err != nil {
+			return nil, err
+		}
+		cli := client.NewRPCClient(rawRPCClient, sourcePath)
+		var nodeInfo core.Node
+		if err := cli.CallContext(context.Background(), &nodeInfo, "vdb_node"); err != nil {
+			return nil, err
+		}
+		c.NodeInfo = nodeInfo
+		c.Client = cli
+	default:
+		return nil, fmt.Errorf("unexpected data source type %s", c.Source.String())
+	}
+	wasmBinaries := viper.GetStringSlice("watcher.wasmBinaries")
+	wasmNamespaces := viper.GetStringSlice("watcher.wasmNamespaces")
+	if len(wasmBinaries) != len(wasmNamespaces) {
+		return nil, fmt.Errorf("watcher config needs a namespace for every wasm binary\r\nhave %d binaries and %d namespaces", len(wasmBinaries), len(wasmNamespaces))
+	}
+	c.WASMFunctions = make([]wasm.WasmFunction, len(wasmBinaries))
+	for i, bin := range wasmBinaries {
+		c.WASMFunctions[i] = wasm.WasmFunction{
+			BinaryPath: bin,
+			Namespace:  wasmNamespaces[i],
+		}
+	}
+	c.TriggerFunctions = viper.GetStringSlice("watcher.triggerFunctions")
+	c.DBConfig = config.Database{
+		Name:     viper.GetString("watcher.database.name"),
+		Hostname: viper.GetString("watcher.database.hostname"),
+		Port:     viper.GetInt("watcher.database.port"),
+		User:     viper.GetString("watcher.database.user"),
+		Password: viper.GetString("watcher.database.password"),
+	}
+	db := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
+	c.DB = &db
+	return c, nil
+}
diff --git a/pkg/watcher/constructors.go b/pkg/watcher/constructors.go
index d4e40b8e..f8112c49 100644
--- a/pkg/watcher/constructors.go
+++ b/pkg/watcher/constructors.go
@@ -28,12 +28,22 @@ import (
 )

 // NewSuperNodeStreamer returns a new shared.SuperNodeStreamer
-func NewSuperNodeStreamer(client core.RPCClient) shared.SuperNodeStreamer {
-	return streamer.NewSuperNodeStreamer(client)
+func NewSuperNodeStreamer(source shared.SourceType, client interface{}) (shared.SuperNodeStreamer, error) {
+	switch source {
+	case shared.VulcanizeDB:
+		cli, ok := client.(core.RPCClient)
+		if !ok {
+			var expectedClientType core.RPCClient
+			return nil, fmt.Errorf("vulcanizedb NewSuperNodeStreamer constructor expects client type %T got %T", expectedClientType, client)
+		}
+		return streamer.NewSuperNodeStreamer(cli), nil
+	default:
+		return nil, fmt.Errorf("NewSuperNodeStreamer constructor unexpected source type %s", source.String())
+	}
 }

 // NewRepository constructs and returns a new Repository that satisfies the shared.Repository interface for the specified chain
-func NewRepository(chain shared2.ChainType, db *postgres.DB, triggerFuncs [][2]string) (shared.Repository, error) {
+func NewRepository(chain shared2.ChainType, db *postgres.DB, triggerFuncs []string) (shared.Repository, error) {
 	switch chain {
 	case shared2.Ethereum:
 		return eth.NewRepository(db, triggerFuncs), nil
diff --git a/pkg/watcher/eth/repository.go b/pkg/watcher/eth/repository.go
index da2e692b..48a55d46 100644
--- a/pkg/watcher/eth/repository.go
+++ b/pkg/watcher/eth/repository.go
@@ -32,12 +32,12 @@ var (
 // Repository is the underlying struct for satisfying the shared.Repository interface for eth
 type Repository struct {
 	db               *postgres.DB
-	triggerFunctions [][2]string
+	triggerFunctions []string
 	deleteCalls      int64
 }

 // NewRepository returns a new eth.Repository that satisfies the shared.Repository interface
-func NewRepository(db *postgres.DB, triggerFunctions [][2]string) shared.Repository {
+func NewRepository(db *postgres.DB, triggerFunctions []string) shared.Repository {
 	return &Repository{
 		db:               db,
 		triggerFunctions: triggerFunctions,
diff --git a/pkg/watcher/service.go b/pkg/watcher/service.go
index b8f50832..a67d9615 100644
--- a/pkg/watcher/service.go
+++ b/pkg/watcher/service.go
@@ -40,7 +40,7 @@ type Watcher interface {
 // Service is the underlying struct for the SuperNodeWatcher
 type Service struct {
 	// Config
-	WatcherConfig Config
+	WatcherConfig *Config
 	// Interface for streaming data from super node
 	SuperNodeStreamer shared.SuperNodeStreamer
 	// Interface for db operations
@@ -60,16 +60,20 @@ type Service struct {
 }

 // NewWatcher returns a new Service which satisfies the Watcher interface
-func NewWatcher(c Config, quitChan chan bool) (Watcher, error) {
+func NewWatcher(c *Config, quitChan chan bool) (Watcher, error) {
 	repo, err := NewRepository(c.SubscriptionConfig.ChainType(), c.DB, c.TriggerFunctions)
 	if err != nil {
 		return nil, err
 	}
+	streamer, err := NewSuperNodeStreamer(c.Source, c.Client)
+	if err != nil {
+		return nil, err
+	}
 	return &Service{
 		WatcherConfig:     c,
-		SuperNodeStreamer: NewSuperNodeStreamer(c.Client),
+		SuperNodeStreamer: streamer,
 		Repository:        repo,
-		WASMIniter:        wasm.NewWASMInstantiator(c.DB, c.WASMInstances),
+		WASMIniter:        wasm.NewWASMInstantiator(c.DB, c.WASMFunctions),
 		PayloadChan:       make(chan super_node.SubscriptionPayload, super_node.PayloadChanBufferSize),
 		QuitChan:          quitChan,
 	}, nil
@@ -85,7 +89,7 @@ func (s *Service) Init() error {
 	return s.Repository.LoadTriggers()
 }

-// Watch is the top level loop for watching super node
+// Watch is the top level loop for watching
 func (s *Service) Watch(wg *sync.WaitGroup) error {
 	rlpConfig, err := rlp.EncodeToBytes(s.WatcherConfig.SubscriptionConfig)
 	if err != nil {
diff --git a/pkg/watcher/shared/source_type.go b/pkg/watcher/shared/source_type.go
new file mode 100644
index 00000000..7a1ce730
--- /dev/null
+++ b/pkg/watcher/shared/source_type.go
@@ -0,0 +1,58 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package shared
+
+import (
+	"errors"
+	"strings"
+)
+
+// SourceType enum for specifying source type for raw chain data
+type SourceType int
+
+const (
+	Unknown SourceType = iota
+	VulcanizeDB
+	Ethereum
+	Bitcoin
+)
+
+func (c SourceType) String() string {
+	switch c {
+	case Ethereum:
+		return "Ethereum"
+	case Bitcoin:
+		return "Bitcoin"
+	case VulcanizeDB:
+		return "VulcanizeDB"
+	default:
+		return ""
+	}
+}
+
+func NewSourceType(name string) (SourceType, error) {
+	switch strings.ToLower(name) {
+	case "ethereum", "eth":
+		return Ethereum, nil
+	case "bitcoin", "btc", "xbt":
+		return Bitcoin, nil
+	case "vulcanizedb", "vdb":
+		return VulcanizeDB, nil
+	default:
+		return Unknown, errors.New("invalid name for data source")
+	}
+}
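Finally, a small usage sketch for the new watcher/shared.SourceType enum alongside the existing super_node/shared.NewChainType helper referenced elsewhere in this patch; the accepted chain name string is an assumption based on the "superNode.chains" values used in the config:

    package main

    import (
        "fmt"
        "log"

        snshared "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
        "github.com/vulcanize/vulcanizedb/pkg/watcher/shared"
    )

    func main() {
        // "vulcanizedb" and "vdb" both resolve to the VulcanizeDB source type
        source, err := shared.NewSourceType("vdb")
        if err != nil {
            log.Fatal(err)
        }
        // assumed to accept the same names used for superNode.chains, e.g. "ethereum"
        chain, err := snshared.NewChainType("ethereum")
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("source=%s chain=%s\n", source.String(), chain.String())
    }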