major refactor pt 3

This commit is contained in:
Ian Norden 2020-06-29 19:16:52 -05:00
parent 449d23757e
commit e2bcc06f8a
49 changed files with 557 additions and 646 deletions

View File

@ -192,7 +192,7 @@ This set of parameters needs to be set no matter the chain type.
path = "~/.ipfs" # $IPFS_PATH
mode = "postgres" # $IPFS_MODE
[superNode]
[watcher]
chain = "bitcoin" # $SUPERNODE_CHAIN
server = true # $SUPERNODE_SERVER
ipcPath = "~/.vulcanize/vulcanize.ipc" # $SUPERNODE_IPC_PATH

View File

@ -30,7 +30,7 @@ import (
var resyncCmd = &cobra.Command{
Use: "resync",
Short: "Resync historical data",
Long: `Use this command to fill in sections of missing data in the super node`,
Long: `Use this command to fill in sections of missing data in the ipfs-blockchain-watcher database`,
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *log.WithField("SubCommand", subCommand)
@ -40,8 +40,8 @@ var resyncCmd = &cobra.Command{
func rsyncCmdCommand() {
logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta)
logWithCommand.Debug("loading super node configuration variables")
rConfig, err := resync.NewReSyncConfig()
logWithCommand.Debug("loading resync configuration variables")
rConfig, err := resync.NewConfig()
if err != nil {
logWithCommand.Fatal(err)
}

View File

@ -29,18 +29,16 @@ import (
"github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/client"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/streamer"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
w "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
)
// streamEthSubscriptionCmd represents the streamEthSubscription command
var streamEthSubscriptionCmd = &cobra.Command{
Use: "streamEthSubscription",
Short: "This command is used to subscribe to the super node eth stream with the provided filters",
Long: `This command is for demo and testing purposes and is used to subscribe to the super node with the provided subscription configuration parameters.
It does not do anything with the data streamed from the super node other than unpack it and print it out for demonstration purposes.`,
Short: "This command is used to subscribe to the eth ipfs watcher data stream with the provided filters",
Long: `This command is for demo and testing purposes and is used to subscribe to the watcher with the provided subscription configuration parameters.
It does not do anything with the data streamed from the watcher other than unpack it and print it out for demonstration purposes.`,
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *log.WithField("SubCommand", subCommand)
@ -60,18 +58,21 @@ func streamEthSubscription() {
}
// Create a new rpc client and a subscription streamer with that client
rpcClient := getRPCClient()
str := streamer.NewSuperNodeStreamer(rpcClient)
rpcClient, err := getRPCClient()
if err != nil {
logWithCommand.Fatal(err)
}
subClient := client.NewClient(rpcClient)
// Buffered channel for reading subscription payloads
payloadChan := make(chan watcher.SubscriptionPayload, 20000)
payloadChan := make(chan w.SubscriptionPayload, 20000)
// Subscribe to the super node service with the given config/filter parameters
// Subscribe to the watcher service with the given config/filter parameters
rlpParams, err := rlp.EncodeToBytes(ethSubConfig)
if err != nil {
logWithCommand.Fatal(err)
}
sub, err := str.Stream(payloadChan, rlpParams)
sub, err := subClient.Stream(payloadChan, rlpParams)
if err != nil {
logWithCommand.Fatal(err)
}
@ -167,14 +168,10 @@ func streamEthSubscription() {
}
}
func getRPCClient() core.RPCClient {
vulcPath := viper.GetString("superNode.ethSubscription.wsPath")
func getRPCClient() (*rpc.Client, error) {
vulcPath := viper.GetString("watcher.ethSubscription.wsPath")
if vulcPath == "" {
vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided
}
rawRPCClient, err := rpc.Dial(vulcPath)
if err != nil {
logWithCommand.Fatal(err)
}
return client.NewRPCClient(rawRPCClient, vulcPath)
return rpc.Dial(vulcPath)
}

View File

@ -18,22 +18,23 @@ package cmd
import (
"os"
"os/signal"
"sync"
s "sync"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
h "github.com/vulcanize/ipfs-blockchain-watcher/pkg/historical"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
w "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
v "github.com/vulcanize/ipfs-blockchain-watcher/version"
)
// superNodeCmd represents the superNode command
var superNodeCmd = &cobra.Command{
Use: "superNode",
// watchCmd represents the watch command
var watchCmd = &cobra.Command{
Use: "watch",
Short: "sync chain data into PG-IPFS",
Long: `This command configures a VulcanizeDB ipfs-blockchain-watcher.
@ -49,146 +50,155 @@ and fill in gaps in the data
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *log.WithField("SubCommand", subCommand)
superNode()
watch()
},
}
func superNode() {
func watch() {
logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta)
logWithCommand.Debug("loading super node configuration variables")
superNodeConfig, err := watcher.NewSuperNodeConfig()
var forwardPayloadChan chan shared.ConvertedData
wg := new(s.WaitGroup)
logWithCommand.Debug("loading watcher configuration variables")
watcherConfig, err := w.NewConfig()
if err != nil {
logWithCommand.Fatal(err)
}
logWithCommand.Infof("super node config: %+v", superNodeConfig)
if superNodeConfig.IPFSMode == shared.LocalInterface {
logWithCommand.Infof("watcher config: %+v", watcherConfig)
if watcherConfig.IPFSMode == shared.LocalInterface {
if err := ipfs.InitIPFSPlugins(); err != nil {
logWithCommand.Fatal(err)
}
}
wg := &sync.WaitGroup{}
logWithCommand.Debug("initializing new super node service")
superNode, err := watcher.NewSuperNode(superNodeConfig)
logWithCommand.Debug("initializing new watcher service")
watcher, err := w.NewWatcher(watcherConfig)
if err != nil {
logWithCommand.Fatal(err)
}
var forwardPayloadChan chan shared.ConvertedData
if superNodeConfig.Serve {
logWithCommand.Info("starting up super node servers")
forwardPayloadChan = make(chan shared.ConvertedData, watcher.PayloadChanBufferSize)
superNode.Serve(wg, forwardPayloadChan)
if err := startServers(superNode, superNodeConfig); err != nil {
if watcherConfig.Serve {
logWithCommand.Info("starting up watcher servers")
forwardPayloadChan = make(chan shared.ConvertedData, w.PayloadChanBufferSize)
watcher.Serve(wg, forwardPayloadChan)
if err := startServers(watcher, watcherConfig); err != nil {
logWithCommand.Fatal(err)
}
}
if superNodeConfig.Sync {
logWithCommand.Info("starting up super node sync process")
if err := superNode.Sync(wg, forwardPayloadChan); err != nil {
if watcherConfig.Sync {
logWithCommand.Info("starting up watcher sync process")
if err := watcher.Sync(wg, forwardPayloadChan); err != nil {
logWithCommand.Fatal(err)
}
}
var backFiller watcher.BackFillInterface
if superNodeConfig.BackFill {
logWithCommand.Debug("initializing new super node backfill service")
backFiller, err = watcher.NewBackFillService(superNodeConfig, forwardPayloadChan)
var backFiller h.BackFillInterface
if watcherConfig.Historical {
historicalConfig, err := h.NewConfig()
if err != nil {
logWithCommand.Fatal(err)
}
logWithCommand.Info("starting up super node backfill process")
logWithCommand.Debug("initializing new historical backfill service")
backFiller, err = h.NewBackFillService(historicalConfig, forwardPayloadChan)
if err != nil {
logWithCommand.Fatal(err)
}
logWithCommand.Info("starting up watcher backfill process")
backFiller.BackFill(wg)
}
shutdown := make(chan os.Signal)
signal.Notify(shutdown, os.Interrupt)
<-shutdown
if superNodeConfig.BackFill {
if watcherConfig.Historical {
backFiller.Stop()
}
superNode.Stop()
watcher.Stop()
wg.Wait()
}
func startServers(superNode watcher.SuperNode, settings *watcher.Config) error {
func startServers(watcher w.Watcher, settings *w.Config) error {
logWithCommand.Debug("starting up IPC server")
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs())
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, watcher.APIs())
if err != nil {
return err
}
logWithCommand.Debug("starting up WS server")
_, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, superNode.APIs(), []string{"vdb"}, nil, true)
_, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, watcher.APIs(), []string{"vdb"}, nil, true)
if err != nil {
return err
}
logWithCommand.Debug("starting up HTTP server")
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, superNode.APIs(), []string{settings.Chain.API()}, nil, nil, rpc.HTTPTimeouts{})
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, watcher.APIs(), []string{settings.Chain.API()}, nil, nil, rpc.HTTPTimeouts{})
return err
}
func init() {
rootCmd.AddCommand(superNodeCmd)
rootCmd.AddCommand(watchCmd)
// flags for all config variables
superNodeCmd.PersistentFlags().String("ipfs-path", "", "ipfs repository path")
watchCmd.PersistentFlags().String("ipfs-path", "", "ipfs repository path")
superNodeCmd.PersistentFlags().String("supernode-chain", "", "which chain to support, options are currently Ethereum or Bitcoin.")
superNodeCmd.PersistentFlags().Bool("supernode-server", false, "turn vdb server on or off")
superNodeCmd.PersistentFlags().String("supernode-ws-path", "", "vdb server ws path")
superNodeCmd.PersistentFlags().String("supernode-http-path", "", "vdb server http path")
superNodeCmd.PersistentFlags().String("supernode-ipc-path", "", "vdb server ipc path")
superNodeCmd.PersistentFlags().Bool("supernode-sync", false, "turn vdb sync on or off")
superNodeCmd.PersistentFlags().Int("supernode-workers", 0, "how many worker goroutines to publish and index data")
superNodeCmd.PersistentFlags().Bool("supernode-back-fill", false, "turn vdb backfill on or off")
superNodeCmd.PersistentFlags().Int("supernode-frequency", 0, "how often (in seconds) the backfill process checks for gaps")
superNodeCmd.PersistentFlags().Int("supernode-batch-size", 0, "data fetching batch size")
superNodeCmd.PersistentFlags().Int("supernode-batch-number", 0, "how many goroutines to fetch data concurrently")
superNodeCmd.PersistentFlags().Int("supernode-validation-level", 0, "backfill will resync any data below this level")
superNodeCmd.PersistentFlags().Int("supernode-timeout", 0, "timeout used for backfill http requests")
watchCmd.PersistentFlags().String("watcher-chain", "", "which chain to support, options are currently Ethereum or Bitcoin.")
watchCmd.PersistentFlags().Bool("watcher-server", false, "turn vdb server on or off")
watchCmd.PersistentFlags().String("watcher-ws-path", "", "vdb server ws path")
watchCmd.PersistentFlags().String("watcher-http-path", "", "vdb server http path")
watchCmd.PersistentFlags().String("watcher-ipc-path", "", "vdb server ipc path")
watchCmd.PersistentFlags().Bool("watcher-sync", false, "turn vdb sync on or off")
watchCmd.PersistentFlags().Int("watcher-workers", 0, "how many worker goroutines to publish and index data")
watchCmd.PersistentFlags().Bool("watcher-back-fill", false, "turn vdb backfill on or off")
watchCmd.PersistentFlags().Int("watcher-frequency", 0, "how often (in seconds) the backfill process checks for gaps")
watchCmd.PersistentFlags().Int("watcher-batch-size", 0, "data fetching batch size")
watchCmd.PersistentFlags().Int("watcher-batch-number", 0, "how many goroutines to fetch data concurrently")
watchCmd.PersistentFlags().Int("watcher-validation-level", 0, "backfill will resync any data below this level")
watchCmd.PersistentFlags().Int("watcher-timeout", 0, "timeout used for backfill http requests")
superNodeCmd.PersistentFlags().String("btc-ws-path", "", "ws url for bitcoin node")
superNodeCmd.PersistentFlags().String("btc-http-path", "", "http url for bitcoin node")
superNodeCmd.PersistentFlags().String("btc-password", "", "password for btc node")
superNodeCmd.PersistentFlags().String("btc-username", "", "username for btc node")
superNodeCmd.PersistentFlags().String("btc-node-id", "", "btc node id")
superNodeCmd.PersistentFlags().String("btc-client-name", "", "btc client name")
superNodeCmd.PersistentFlags().String("btc-genesis-block", "", "btc genesis block hash")
superNodeCmd.PersistentFlags().String("btc-network-id", "", "btc network id")
watchCmd.PersistentFlags().String("btc-ws-path", "", "ws url for bitcoin node")
watchCmd.PersistentFlags().String("btc-http-path", "", "http url for bitcoin node")
watchCmd.PersistentFlags().String("btc-password", "", "password for btc node")
watchCmd.PersistentFlags().String("btc-username", "", "username for btc node")
watchCmd.PersistentFlags().String("btc-node-id", "", "btc node id")
watchCmd.PersistentFlags().String("btc-client-name", "", "btc client name")
watchCmd.PersistentFlags().String("btc-genesis-block", "", "btc genesis block hash")
watchCmd.PersistentFlags().String("btc-network-id", "", "btc network id")
superNodeCmd.PersistentFlags().String("eth-ws-path", "", "ws url for ethereum node")
superNodeCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node")
superNodeCmd.PersistentFlags().String("eth-node-id", "", "eth node id")
superNodeCmd.PersistentFlags().String("eth-client-name", "", "eth client name")
superNodeCmd.PersistentFlags().String("eth-genesis-block", "", "eth genesis block hash")
superNodeCmd.PersistentFlags().String("eth-network-id", "", "eth network id")
watchCmd.PersistentFlags().String("eth-ws-path", "", "ws url for ethereum node")
watchCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node")
watchCmd.PersistentFlags().String("eth-node-id", "", "eth node id")
watchCmd.PersistentFlags().String("eth-client-name", "", "eth client name")
watchCmd.PersistentFlags().String("eth-genesis-block", "", "eth genesis block hash")
watchCmd.PersistentFlags().String("eth-network-id", "", "eth network id")
// and their bindings
viper.BindPFlag("ipfs.path", superNodeCmd.PersistentFlags().Lookup("ipfs-path"))
viper.BindPFlag("ipfs.path", watchCmd.PersistentFlags().Lookup("ipfs-path"))
viper.BindPFlag("superNode.chain", superNodeCmd.PersistentFlags().Lookup("supernode-chain"))
viper.BindPFlag("superNode.server", superNodeCmd.PersistentFlags().Lookup("supernode-server"))
viper.BindPFlag("superNode.wsPath", superNodeCmd.PersistentFlags().Lookup("supernode-ws-path"))
viper.BindPFlag("superNode.httpPath", superNodeCmd.PersistentFlags().Lookup("supernode-http-path"))
viper.BindPFlag("superNode.ipcPath", superNodeCmd.PersistentFlags().Lookup("supernode-ipc-path"))
viper.BindPFlag("superNode.sync", superNodeCmd.PersistentFlags().Lookup("supernode-sync"))
viper.BindPFlag("superNode.workers", superNodeCmd.PersistentFlags().Lookup("supernode-workers"))
viper.BindPFlag("superNode.backFill", superNodeCmd.PersistentFlags().Lookup("supernode-back-fill"))
viper.BindPFlag("superNode.frequency", superNodeCmd.PersistentFlags().Lookup("supernode-frequency"))
viper.BindPFlag("superNode.batchSize", superNodeCmd.PersistentFlags().Lookup("supernode-batch-size"))
viper.BindPFlag("superNode.batchNumber", superNodeCmd.PersistentFlags().Lookup("supernode-batch-number"))
viper.BindPFlag("superNode.validationLevel", superNodeCmd.PersistentFlags().Lookup("supernode-validation-level"))
viper.BindPFlag("superNode.timeout", superNodeCmd.PersistentFlags().Lookup("supernode-timeout"))
viper.BindPFlag("watcher.chain", watchCmd.PersistentFlags().Lookup("watcher-chain"))
viper.BindPFlag("watcher.server", watchCmd.PersistentFlags().Lookup("watcher-server"))
viper.BindPFlag("watcher.wsPath", watchCmd.PersistentFlags().Lookup("watcher-ws-path"))
viper.BindPFlag("watcher.httpPath", watchCmd.PersistentFlags().Lookup("watcher-http-path"))
viper.BindPFlag("watcher.ipcPath", watchCmd.PersistentFlags().Lookup("watcher-ipc-path"))
viper.BindPFlag("watcher.sync", watchCmd.PersistentFlags().Lookup("watcher-sync"))
viper.BindPFlag("watcher.workers", watchCmd.PersistentFlags().Lookup("watcher-workers"))
viper.BindPFlag("watcher.backFill", watchCmd.PersistentFlags().Lookup("watcher-back-fill"))
viper.BindPFlag("watcher.frequency", watchCmd.PersistentFlags().Lookup("watcher-frequency"))
viper.BindPFlag("watcher.batchSize", watchCmd.PersistentFlags().Lookup("watcher-batch-size"))
viper.BindPFlag("watcher.batchNumber", watchCmd.PersistentFlags().Lookup("watcher-batch-number"))
viper.BindPFlag("watcher.validationLevel", watchCmd.PersistentFlags().Lookup("watcher-validation-level"))
viper.BindPFlag("watcher.timeout", watchCmd.PersistentFlags().Lookup("watcher-timeout"))
viper.BindPFlag("bitcoin.wsPath", superNodeCmd.PersistentFlags().Lookup("btc-ws-path"))
viper.BindPFlag("bitcoin.httpPath", superNodeCmd.PersistentFlags().Lookup("btc-http-path"))
viper.BindPFlag("bitcoin.pass", superNodeCmd.PersistentFlags().Lookup("btc-password"))
viper.BindPFlag("bitcoin.user", superNodeCmd.PersistentFlags().Lookup("btc-username"))
viper.BindPFlag("bitcoin.nodeID", superNodeCmd.PersistentFlags().Lookup("btc-node-id"))
viper.BindPFlag("bitcoin.clientName", superNodeCmd.PersistentFlags().Lookup("btc-client-name"))
viper.BindPFlag("bitcoin.genesisBlock", superNodeCmd.PersistentFlags().Lookup("btc-genesis-block"))
viper.BindPFlag("bitcoin.networkID", superNodeCmd.PersistentFlags().Lookup("btc-network-id"))
viper.BindPFlag("bitcoin.wsPath", watchCmd.PersistentFlags().Lookup("btc-ws-path"))
viper.BindPFlag("bitcoin.httpPath", watchCmd.PersistentFlags().Lookup("btc-http-path"))
viper.BindPFlag("bitcoin.pass", watchCmd.PersistentFlags().Lookup("btc-password"))
viper.BindPFlag("bitcoin.user", watchCmd.PersistentFlags().Lookup("btc-username"))
viper.BindPFlag("bitcoin.nodeID", watchCmd.PersistentFlags().Lookup("btc-node-id"))
viper.BindPFlag("bitcoin.clientName", watchCmd.PersistentFlags().Lookup("btc-client-name"))
viper.BindPFlag("bitcoin.genesisBlock", watchCmd.PersistentFlags().Lookup("btc-genesis-block"))
viper.BindPFlag("bitcoin.networkID", watchCmd.PersistentFlags().Lookup("btc-network-id"))
viper.BindPFlag("ethereum.wsPath", superNodeCmd.PersistentFlags().Lookup("eth-ws-path"))
viper.BindPFlag("ethereum.httpPath", superNodeCmd.PersistentFlags().Lookup("eth-http-path"))
viper.BindPFlag("ethereum.nodeID", superNodeCmd.PersistentFlags().Lookup("eth-node-id"))
viper.BindPFlag("ethereum.clientName", superNodeCmd.PersistentFlags().Lookup("eth-client-name"))
viper.BindPFlag("ethereum.genesisBlock", superNodeCmd.PersistentFlags().Lookup("eth-genesis-block"))
viper.BindPFlag("ethereum.networkID", superNodeCmd.PersistentFlags().Lookup("eth-network-id"))
viper.BindPFlag("ethereum.wsPath", watchCmd.PersistentFlags().Lookup("eth-ws-path"))
viper.BindPFlag("ethereum.httpPath", watchCmd.PersistentFlags().Lookup("eth-http-path"))
viper.BindPFlag("ethereum.nodeID", watchCmd.PersistentFlags().Lookup("eth-node-id"))
viper.BindPFlag("ethereum.clientName", watchCmd.PersistentFlags().Lookup("eth-client-name"))
viper.BindPFlag("ethereum.genesisBlock", watchCmd.PersistentFlags().Lookup("eth-genesis-block"))
viper.BindPFlag("ethereum.networkID", watchCmd.PersistentFlags().Lookup("eth-network-id"))
}

View File

@ -1,5 +1,5 @@
#!/bin/sh
# Runs the db migrations and starts the super node services
# Runs the db migrations and starts the watcher services
# Exit if the variable tests fail
set -e

View File

@ -26,9 +26,9 @@ services:
dockerfile: ./dockerfiles/super_node/Dockerfile
args:
USER: "vdbm"
CONFIG_FILE: ./environments/superNodeBTC.toml
CONFIG_FILE: ./environments/watcherBTC.toml
environment:
VDB_COMMAND: "superNode"
VDB_COMMAND: "watcher"
DATABASE_NAME: "vulcanize_public"
DATABASE_HOSTNAME: "db"
DATABASE_PORT: 5432
@ -49,9 +49,9 @@ services:
dockerfile: ./dockerfiles/super_node/Dockerfile
args:
USER: "vdbm"
CONFIG_FILE: ./environments/superNodeETH.toml
CONFIG_FILE: ./environments/watcherETH.toml
environment:
VDB_COMMAND: "superNode"
VDB_COMMAND: "watcher"
DATABASE_NAME: "vulcanize_public"
DATABASE_HOSTNAME: "db"
DATABASE_PORT: 5432

View File

@ -1,5 +1,5 @@
#!/bin/sh
# Runs the db migrations and starts the super node services
# Runs the db migrations and starts the watcher services
# Exit if the variable tests fail
set -e
@ -14,7 +14,7 @@ set +x
#test $DATABASE_PASSWORD
#test $IPFS_INIT
#test $IPFS_PATH
VDB_COMMAND=${VDB_COMMAND:-superNode}
VDB_COMMAND=${VDB_COMMAND:-watch}
set +e
# Construct the connection string for postgres

View File

@ -1,5 +1,5 @@
#!/bin/sh
# Runs the db migrations and starts the super node services
# Runs the db migrations and starts the watcher services
# Exit if the variable tests fail
set -e

View File

@ -24,7 +24,7 @@ All of their data can then be queried with standard [GraphQL](https://graphql.or
### RPC Subscription Interface
A direct, real-time subscription to the data being processed by ipfs-blockchain-watcher can be established over WS or IPC through the [Stream](../pkg/watch/api.go#L53) RPC method.
This method is not chain-specific and each chain-type supports it, it is accessed under the "vdb" namespace rather than a chain-specific namespace. An interface for
subscribing to this endpoint is provided [here](../pkg/streamer/super_node_streamer.go).
subscribing to this endpoint is provided [here](../pkg/client/client.go).
When subscribing to this endpoint, the subscriber provides a set of RLP-encoded subscription parameters. These parameters will be chain-specific, and are used
by ipfs-blockchain-watcher to filter and return a requested subset of chain data to the subscriber. (e.g. [BTC](../pkg/btc/subscription_config.go), [ETH](../../pkg/eth/subscription_config.go)).
@ -48,7 +48,7 @@ An example of how to subscribe to a real-time Ethereum data feed from ipfs-block
config, _ := eth.NewEthSubscriptionConfig()
rlpConfig, _ := rlp.EncodeToBytes(config)
vulcPath := viper.GetString("superNode.ethSubscription.path")
vulcPath := viper.GetString("watcher.ethSubscription.path")
rawRPCClient, _ := rpc.Dial(vulcPath)
rpcClient := client.NewRPCClient(rawRPCClient, vulcPath)
stream := streamer.NewSuperNodeStreamer(rpcClient)
@ -67,32 +67,32 @@ An example of how to subscribe to a real-time Ethereum data feed from ipfs-block
The .toml file being used to fill the Ethereum subscription config would look something like this:
```toml
[superNode]
[superNode.ethSubscription]
[watcher]
[watcher.ethSubscription]
historicalData = false
historicalDataOnly = false
startingBlock = 0
endingBlock = 0
wsPath = "ws://127.0.0.1:8080"
[superNode.ethSubscription.headerFilter]
[watcher.ethSubscription.headerFilter]
off = false
uncles = false
[superNode.ethSubscription.txFilter]
[watcher.ethSubscription.txFilter]
off = false
src = []
dst = []
[superNode.ethSubscription.receiptFilter]
[watcher.ethSubscription.receiptFilter]
off = false
contracts = []
topic0s = []
topic1s = []
topic2s = []
topic3s = []
[superNode.ethSubscription.stateFilter]
[watcher.ethSubscription.stateFilter]
off = false
addresses = []
intermediateNodes = false
[superNode.ethSubscription.storageFilter]
[watcher.ethSubscription.storageFilter]
off = true
addresses = []
storageKeys = []
@ -131,9 +131,9 @@ in `src` and `dst`, respectively.
- Setting `off` to true tells ipfs-blockchain-watcher to not send any receipts to the subscriber
- `topic0s` is a string array which can be filled with event topics we want to filter for,
if it has any topics then ipfs-blockchain-watcher will only send receipts that contain logs which have that topic0.
- `contracts` is a string array which can be filled with contract addresses we want to filter for, if it contains any contract addresses the super node will
- `contracts` is a string array which can be filled with contract addresses we want to filter for, if it contains any contract addresses the watcher will
only send receipts that correspond to one of those contracts.
- `matchTrxs` is a bool which when set to true any receipts that correspond to filtered for transactions will be sent by the super node, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters.
- `matchTrxs` is a bool which when set to true any receipts that correspond to filtered for transactions will be sent by the watcher, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters.
`ethSubscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`.
@ -170,7 +170,7 @@ An example of how to subscribe to a real-time Bitcoin data feed from ipfs-blockc
config, _ := btc.NewBtcSubscriptionConfig()
rlpConfig, _ := rlp.EncodeToBytes(config)
vulcPath := viper.GetString("superNode.btcSubscription.path")
vulcPath := viper.GetString("watcher.btcSubscription.path")
rawRPCClient, _ := rpc.Dial(vulcPath)
rpcClient := client.NewRPCClient(rawRPCClient, vulcPath)
stream := streamer.NewSuperNodeStreamer(rpcClient)
@ -189,16 +189,16 @@ An example of how to subscribe to a real-time Bitcoin data feed from ipfs-blockc
The .toml file being used to fill the Bitcoin subscription config would look something like this:
```toml
[superNode]
[superNode.btcSubscription]
[watcher]
[watcher.btcSubscription]
historicalData = false
historicalDataOnly = false
startingBlock = 0
endingBlock = 0
wsPath = "ws://127.0.0.1:8080"
[superNode.btcSubscription.headerFilter]
[watcher.btcSubscription.headerFilter]
off = false
[superNode.btcSubscription.txFilter]
[watcher.btcSubscription.txFilter]
off = false
segwit = false
witnessHashes = []

View File

@ -8,17 +8,17 @@
1. [IPFS Considerations](#ipfs-considerations)
## Processes
ipfs-blockchain-watcher is a [service](../pkg/super_node/service.go#L61) comprised of the following interfaces:
ipfs-blockchain-watcher is a [service](../pkg/watch/service.go#L61) comprised of the following interfaces:
* [Payload Fetcher](../pkg/super_node/shared/interfaces.go#L29): Fetches raw chain data from a half-duplex endpoint (HTTP/IPC), used for historical data fetching. ([BTC](../../pkg/super_node/btc/payload_fetcher.go), [ETH](../../pkg/super_node/eth/payload_fetcher.go)).
* [Payload Streamer](../pkg/super_node/shared/interfaces.go#L24): Streams raw chain data from a full-duplex endpoint (WebSocket/IPC), used for syncing data at the head of the chain in real-time. ([BTC](../../pkg/super_node/btc/http_streamer.go), [ETH](../../pkg/super_node/eth/streamer.go)).
* [Payload Converter](../pkg/super_node/shared/interfaces.go#L34): Converters raw chain data to an intermediary form prepared for IPFS publishing. ([BTC](../../pkg/super_node/btc/converter.go), [ETH](../../pkg/super_node/eth/converter.go)).
* [IPLD Publisher](../pkg/super_node/shared/interfaces.go#L39): Publishes the converted data to IPFS, returning their CIDs and associated metadata for indexing. ([BTC](../../pkg/super_node/btc/publisher.go), [ETH](../../pkg/super_node/eth/publisher.go)).
* [CID Indexer](../pkg/super_node/shared/interfaces.go#L44): Indexes CIDs in Postgres with their associated metadata. This metadata is chain specific and selected based on utility. ([BTC](../../pkg/super_node/btc/indexer.go), [ETH](../../pkg/super_node/eth/indexer.go)).
* [CID Retriever](../pkg/super_node/shared/interfaces.go#L54): Retrieves CIDs from Postgres by searching against their associated metadata, is used to lookup data to serve API requests/subscriptions. ([BTC](../../pkg/super_node/btc/retriever.go), [ETH](../../pkg/super_node/eth/retriever.go)).
* [IPLD Fetcher](../pkg/super_node/shared/interfaces.go#L62): Fetches the IPLDs needed to service API requests/subscriptions from IPFS using retrieved CIDS; can route through a IPFS block-exchange to search for objects that are not directly available. ([BTC](../../pkg/super_node/btc/ipld_fetcher.go), [ETH](../../pkg/super_node/eth/ipld_fetcher.go))
* [Response Filterer](../pkg/super_node/shared/interfaces.go#L49): Filters converted data payloads served to API subscriptions; filters according to the subscriber provided parameters. ([BTC](../../pkg/super_node/btc/filterer.go), [ETH](../../pkg/super_node/eth/filterer.go)).
* [API](https://github.com/ethereum/go-ethereum/blob/master/rpc/types.go#L31): Expose RPC methods for clients to interface with the data. Chain-specific APIs should aim to recapitulate as much of the native API as possible. ([VDB](../../pkg/super_node/api.go), [ETH](../../pkg/super_node/eth/api.go)).
* [Payload Fetcher](../pkg/shared/interfaces.go#L29): Fetches raw chain data from a half-duplex endpoint (HTTP/IPC), used for historical data fetching. ([BTC](../../pkg/btc/payload_fetcher.go), [ETH](../../pkg/eth/payload_fetcher.go)).
* [Payload Streamer](../pkg/shared/interfaces.go#L24): Streams raw chain data from a full-duplex endpoint (WebSocket/IPC), used for syncing data at the head of the chain in real-time. ([BTC](../../pkg/btc/http_streamer.go), [ETH](../../pkg/eth/streamer.go)).
* [Payload Converter](../pkg/shared/interfaces.go#L34): Converters raw chain data to an intermediary form prepared for IPFS publishing. ([BTC](../../pkg/btc/converter.go), [ETH](../../pkg/eth/converter.go)).
* [IPLD Publisher](../pkg/shared/interfaces.go#L39): Publishes the converted data to IPFS, returning their CIDs and associated metadata for indexing. ([BTC](../../pkg/btc/publisher.go), [ETH](../../pkg/eth/publisher.go)).
* [CID Indexer](../pkg/shared/interfaces.go#L44): Indexes CIDs in Postgres with their associated metadata. This metadata is chain specific and selected based on utility. ([BTC](../../pkg/btc/indexer.go), [ETH](../../pkg/eth/indexer.go)).
* [CID Retriever](../pkg/shared/interfaces.go#L54): Retrieves CIDs from Postgres by searching against their associated metadata, is used to lookup data to serve API requests/subscriptions. ([BTC](../../pkg/btc/retriever.go), [ETH](../../pkg/eth/retriever.go)).
* [IPLD Fetcher](../pkg/shared/interfaces.go#L62): Fetches the IPLDs needed to service API requests/subscriptions from IPFS using retrieved CIDS; can route through a IPFS block-exchange to search for objects that are not directly available. ([BTC](../../pkg/btc/ipld_fetcher.go), [ETH](../../pkg/eth/ipld_fetcher.go))
* [Response Filterer](../pkg/shared/interfaces.go#L49): Filters converted data payloads served to API subscriptions; filters according to the subscriber provided parameters. ([BTC](../../pkg/btc/filterer.go), [ETH](../../pkg/eth/filterer.go)).
* [API](https://github.com/ethereum/go-ethereum/blob/master/rpc/types.go#L31): Expose RPC methods for clients to interface with the data. Chain-specific APIs should aim to recapitulate as much of the native API as possible. ([VDB](../../pkg/api.go), [ETH](../../pkg/eth/api.go)).
Appropriating the service for a new chain is done by creating underlying types to satisfy these interfaces for
@ -56,7 +56,7 @@ This set of parameters needs to be set no matter the chain type.
path = "~/.ipfs" # $IPFS_PATH
mode = "direct" # $IPFS_MODE
[superNode]
[watcher]
chain = "bitcoin" # $SUPERNODE_CHAIN
server = true # $SUPERNODE_SERVER
ipcPath = "~/.vulcanize/vulcanize.ipc" # $SUPERNODE_IPC_PATH

View File

@ -20,7 +20,7 @@
clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE
resetValidation = true # $RESYNC_RESET_VALIDATION
[superNode]
[watcher]
chain = "bitcoin" # $SUPERNODE_CHAIN
server = true # $SUPERNODE_SERVER
ipcPath = "~/.vulcanize/vulcanize.ipc" # $SUPERNODE_IPC_PATH

View File

@ -21,7 +21,7 @@
clearOldCache = true # $RESYNC_CLEAR_OLD_CACHE
resetValidation = true # $RESYNC_RESET_VALIDATION
[superNode]
[watcher]
chain = "ethereum" # $SUPERNODE_CHAIN
server = true # $SUPERNODE_SERVER
ipcPath = "~/.vulcanize/vulcanize.ipc" # $SUPERNODE_IPC_PATH

View File

@ -1,29 +1,29 @@
[superNode]
[superNode.ethSubscription]
[watcher]
[watcher.ethSubscription]
historicalData = false
historicalDataOnly = false
startingBlock = 0
endingBlock = 0
wsPath = "ws://127.0.0.1:8080"
[superNode.ethSubscription.headerFilter]
[watcher.ethSubscription.headerFilter]
off = false
uncles = false
[superNode.ethSubscription.txFilter]
[watcher.ethSubscription.txFilter]
off = false
src = []
dst = []
[superNode.ethSubscription.receiptFilter]
[watcher.ethSubscription.receiptFilter]
off = false
contracts = []
topic0s = []
topic1s = []
topic2s = []
topic3s = []
[superNode.ethSubscription.stateFilter]
[watcher.ethSubscription.stateFilter]
off = false
addresses = []
intermediateNodes = false
[superNode.ethSubscription.storageFilter]
[watcher.ethSubscription.storageFilter]
off = true
addresses = []
storageKeys = []

View File

@ -25,9 +25,9 @@ import (
"github.com/sirupsen/logrus"
)
func TestBTCSuperNode(t *testing.T) {
func TestBTCWatcher(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Super Node BTC Suite Test")
RunSpecs(t, "BTC IPFS Watcher Suite Test")
}
var _ = BeforeSuite(func() {

View File

@ -169,7 +169,7 @@ func (bcr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID
// RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db
func (bcr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) {
log.Info("searching for gaps in the btc super node database")
log.Info("searching for gaps in the btc ipfs watcher database")
startingBlock, err := bcr.RetrieveFirstBlockNumber()
if err != nil {
return nil, fmt.Errorf("btc CIDRetriever RetrieveFirstBlockNumber error: %v", err)

View File

@ -25,7 +25,7 @@ import (
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
// SubscriptionSettings config is used by a subscriber to specify what bitcoin data to stream from the super node
// SubscriptionSettings config is used by a subscriber to specify what bitcoin data to stream from the watcher
type SubscriptionSettings struct {
BackFill bool
BackFillOnly bool
@ -55,36 +55,36 @@ type TxFilter struct {
func NewBtcSubscriptionConfig() (*SubscriptionSettings, error) {
sc := new(SubscriptionSettings)
// Below default to false, which means we do not backfill by default
sc.BackFill = viper.GetBool("superNode.btcSubscription.historicalData")
sc.BackFillOnly = viper.GetBool("superNode.btcSubscription.historicalDataOnly")
sc.BackFill = viper.GetBool("watcher.btcSubscription.historicalData")
sc.BackFillOnly = viper.GetBool("watcher.btcSubscription.historicalDataOnly")
// Below default to 0
// 0 start means we start at the beginning and 0 end means we continue indefinitely
sc.Start = big.NewInt(viper.GetInt64("superNode.btcSubscription.startingBlock"))
sc.End = big.NewInt(viper.GetInt64("superNode.btcSubscription.endingBlock"))
sc.Start = big.NewInt(viper.GetInt64("watcher.btcSubscription.startingBlock"))
sc.End = big.NewInt(viper.GetInt64("watcher.btcSubscription.endingBlock"))
// Below default to false, which means we get all headers by default
sc.HeaderFilter = HeaderFilter{
Off: viper.GetBool("superNode.btcSubscription.headerFilter.off"),
Off: viper.GetBool("watcher.btcSubscription.headerFilter.off"),
}
// Below defaults to false and two slices of length 0
// Which means we get all transactions by default
pksc := viper.Get("superNode.btcSubscription.txFilter.pkScriptClass")
pksc := viper.Get("watcher.btcSubscription.txFilter.pkScriptClass")
pkScriptClasses, ok := pksc.([]uint8)
if !ok {
return nil, errors.New("superNode.btcSubscription.txFilter.pkScriptClass needs to be an array of uint8s")
return nil, errors.New("watcher.btcSubscription.txFilter.pkScriptClass needs to be an array of uint8s")
}
is := viper.Get("superNode.btcSubscription.txFilter.indexes")
is := viper.Get("watcher.btcSubscription.txFilter.indexes")
indexes, ok := is.([]int64)
if !ok {
return nil, errors.New("superNode.btcSubscription.txFilter.indexes needs to be an array of int64s")
return nil, errors.New("watcher.btcSubscription.txFilter.indexes needs to be an array of int64s")
}
sc.TxFilter = TxFilter{
Off: viper.GetBool("superNode.btcSubscription.txFilter.off"),
Segwit: viper.GetBool("superNode.btcSubscription.txFilter.segwit"),
WitnessHashes: viper.GetStringSlice("superNode.btcSubscription.txFilter.witnessHashes"),
Off: viper.GetBool("watcher.btcSubscription.txFilter.off"),
Segwit: viper.GetBool("watcher.btcSubscription.txFilter.segwit"),
WitnessHashes: viper.GetStringSlice("watcher.btcSubscription.txFilter.witnessHashes"),
PkScriptClasses: pkScriptClasses,
Indexes: indexes,
MultiSig: viper.GetBool("superNode.btcSubscription.txFilter.multiSig"),
Addresses: viper.GetStringSlice("superNode.btcSubscription.txFilter.addresses"),
MultiSig: viper.GetBool("watcher.btcSubscription.txFilter.multiSig"),
Addresses: viper.GetStringSlice("watcher.btcSubscription.txFilter.addresses"),
}
return sc, nil
}

View File

@ -22,7 +22,7 @@ import (
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
)
// TearDownDB is used to tear down the super node dbs after tests
// TearDownDB is used to tear down the watcher dbs after tests
func TearDownDB(db *postgres.DB) {
tx, err := db.Beginx()
Expect(err).NotTo(HaveOccurred())

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher
package builders
import (
"fmt"

View File

@ -14,29 +14,30 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Streamer is used by watchers to stream eth data from a vulcanizedb super node
package streamer
// Client is used by watchers to stream chain IPLD data from a vulcanizedb ipfs-blockchain-watcher
package client
import (
"context"
"github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
)
// SuperNodeStreamer is the underlying struct for the shared.SuperNodeStreamer interface
type SuperNodeStreamer struct {
Client core.RPCClient
// Client is used to subscribe to the ipfs-blockchain-watcher ipld data stream
type Client struct {
c *rpc.Client
}
// NewSuperNodeStreamer creates a pointer to a new SuperNodeStreamer which satisfies the ISuperNodeStreamer interface
func NewSuperNodeStreamer(client core.RPCClient) *SuperNodeStreamer {
return &SuperNodeStreamer{
Client: client,
// NewClient creates a new Client
func NewClient(c *rpc.Client) *Client {
return &Client{
c: c,
}
}
// Stream is the main loop for subscribing to data from a vulcanizedb super node
func (sds *SuperNodeStreamer) Stream(payloadChan chan watcher.SubscriptionPayload, rlpParams []byte) (*rpc.ClientSubscription, error) {
return sds.Client.Subscribe("vdb", payloadChan, "stream", rlpParams)
// Stream is the main loop for subscribing to iplds from an ipfs-blockchain-watcher server
func (c *Client) Stream(payloadChan chan watch.SubscriptionPayload, rlpParams []byte) (*rpc.ClientSubscription, error) {
return c.c.Subscribe(context.Background(), "vdb", payloadChan, "stream", rlpParams)
}

View File

@ -1,93 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package client
import (
"context"
"errors"
"reflect"
"github.com/ethereum/go-ethereum/rpc"
)
// RPCClient is a wrapper around the geth RPC client
type RPCClient struct {
client *rpc.Client
ipcPath string
}
// BatchElem is a struct to hold the elements of a BatchCall
type BatchElem struct {
Method string
Args []interface{}
Result interface{}
Error error
}
// NewRPCClient creates a new RpcClient
func NewRPCClient(client *rpc.Client, ipcPath string) RPCClient {
return RPCClient{
client: client,
ipcPath: ipcPath,
}
}
// CallContext makes an rpc method call with the provided context and arguments
func (client RPCClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
//If an empty interface (or other nil object) is passed to CallContext, when the JSONRPC message is created the params will
//be interpreted as [null]. This seems to work fine for most of the ethereum clients (which presumably ignore a null parameter.
//Ganache however does not ignore it, and throws an 'Incorrect number of arguments' error.
if args == nil {
return client.client.CallContext(ctx, result, method)
}
return client.client.CallContext(ctx, result, method, args...)
}
func (client RPCClient) IpcPath() string {
return client.ipcPath
}
func (client RPCClient) SupportedModules() (map[string]string, error) {
return client.client.SupportedModules()
}
func (client RPCClient) BatchCall(batch []BatchElem) error {
var rpcBatch []rpc.BatchElem
for _, batchElem := range batch {
var newBatchElem = rpc.BatchElem{
Result: batchElem.Result,
Method: batchElem.Method,
Args: batchElem.Args,
Error: batchElem.Error,
}
rpcBatch = append(rpcBatch, newBatchElem)
}
return client.client.BatchCall(rpcBatch)
}
// Subscribe subscribes to an rpc "namespace_subscribe" subscription with the given channel
// The first argument needs to be the method we wish to invoke
func (client RPCClient) Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
chanVal := reflect.ValueOf(payloadChan)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
return nil, errors.New("second argument to Subscribe must be a writable channel")
}
if chanVal.IsNil() {
return nil, errors.New("channel given to Subscribe must not be nil")
}
return client.client.Subscribe(context.Background(), namespace, payloadChan, args...)
}

View File

@ -1,36 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"context"
"math/big"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
type EthClient interface {
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error)
FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
TransactionSender(ctx context.Context, tx *types.Transaction, block common.Hash, index uint) (common.Address, error)
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
}

View File

@ -29,10 +29,10 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)
// APIName is the namespace for the super node's eth api
// APIName is the namespace for the watcher's eth api
const APIName = "eth"
// APIVersion is the version of the super node's eth api
// APIVersion is the version of the watcher's eth api
const APIVersion = "0.0.1"
type PublicEthAPI struct {
@ -181,7 +181,7 @@ func (pea *PublicEthAPI) GetBlockByHash(ctx context.Context, hash common.Hash, f
}
// GetTransactionByHash returns the transaction for the given hash
// SuperNode cannot currently handle pending/tx_pool txs
// eth ipfs-blockchain-watcher cannot currently handle pending/tx_pool txs
func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) {
// Try to return an already finalized transaction
tx, blockHash, blockNumber, index, err := pea.B.GetTransaction(ctx, hash)

View File

@ -158,7 +158,7 @@ func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log
}
// BlockByNumber returns the requested canonical block.
// Since the SuperNode can contain forked blocks, it is recommended to fetch BlockByHash as
// Since the ipfs-blockchain-watcher database can contain forked blocks, it is recommended to fetch BlockByHash as
// fetching by number can return non-deterministic results (returns the first block found at that height)
func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber) (*types.Block, error) {
var err error

View File

@ -443,7 +443,7 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageF
// RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db
// it finds the union of heights where no data exists and where the times_validated is lower than the validation level
func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) {
log.Info("searching for gaps in the eth super node database")
log.Info("searching for gaps in the eth ipfs watcher database")
startingBlock, err := ecr.RetrieveFirstBlockNumber()
if err != nil {
return nil, fmt.Errorf("eth CIDRetriever RetrieveFirstBlockNumber error: %v", err)

View File

@ -25,9 +25,9 @@ import (
"github.com/sirupsen/logrus"
)
func TestETHSuperNode(t *testing.T) {
func TestETHWatcher(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Super Node ETH Suite Test")
RunSpecs(t, "ETH IPFS WatcherSuite Test")
}
var _ = BeforeSuite(func() {

View File

@ -22,9 +22,7 @@ import (
"errors"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/client"
)
// BackFillerClient is a mock client for use in backfiller tests
@ -46,7 +44,7 @@ func (mc *BackFillerClient) SetReturnDiffAt(height uint64, diffPayload statediff
}
// BatchCall mockClient method to simulate batch call to geth
func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error {
func (mc *BackFillerClient) BatchCall(batch []rpc.BatchElem) error {
if mc.MappedStateDiffAt == nil {
return errors.New("mockclient needs to be initialized with statediff payloads and errors")
}

View File

@ -24,7 +24,7 @@ import (
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
// SubscriptionSettings config is used by a subscriber to specify what eth data to stream from the super node
// SubscriptionSettings config is used by a subscriber to specify what eth data to stream from the watcher
type SubscriptionSettings struct {
BackFill bool
BackFillOnly bool
@ -78,50 +78,50 @@ type StorageFilter struct {
func NewEthSubscriptionConfig() (*SubscriptionSettings, error) {
sc := new(SubscriptionSettings)
// Below default to false, which means we do not backfill by default
sc.BackFill = viper.GetBool("superNode.ethSubscription.historicalData")
sc.BackFillOnly = viper.GetBool("superNode.ethSubscription.historicalDataOnly")
sc.BackFill = viper.GetBool("watcher.ethSubscription.historicalData")
sc.BackFillOnly = viper.GetBool("watcher.ethSubscription.historicalDataOnly")
// Below default to 0
// 0 start means we start at the beginning and 0 end means we continue indefinitely
sc.Start = big.NewInt(viper.GetInt64("superNode.ethSubscription.startingBlock"))
sc.End = big.NewInt(viper.GetInt64("superNode.ethSubscription.endingBlock"))
sc.Start = big.NewInt(viper.GetInt64("watcher.ethSubscription.startingBlock"))
sc.End = big.NewInt(viper.GetInt64("watcher.ethSubscription.endingBlock"))
// Below default to false, which means we get all headers and no uncles by default
sc.HeaderFilter = HeaderFilter{
Off: viper.GetBool("superNode.ethSubscription.headerFilter.off"),
Uncles: viper.GetBool("superNode.ethSubscription.headerFilter.uncles"),
Off: viper.GetBool("watcher.ethSubscription.headerFilter.off"),
Uncles: viper.GetBool("watcher.ethSubscription.headerFilter.uncles"),
}
// Below defaults to false and two slices of length 0
// Which means we get all transactions by default
sc.TxFilter = TxFilter{
Off: viper.GetBool("superNode.ethSubscription.txFilter.off"),
Src: viper.GetStringSlice("superNode.ethSubscription.txFilter.src"),
Dst: viper.GetStringSlice("superNode.ethSubscription.txFilter.dst"),
Off: viper.GetBool("watcher.ethSubscription.txFilter.off"),
Src: viper.GetStringSlice("watcher.ethSubscription.txFilter.src"),
Dst: viper.GetStringSlice("watcher.ethSubscription.txFilter.dst"),
}
// By default all of the topic slices will be empty => match on any/all topics
topics := make([][]string, 4)
topics[0] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic0s")
topics[1] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic1s")
topics[2] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic2s")
topics[3] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic3s")
topics[0] = viper.GetStringSlice("watcher.ethSubscription.receiptFilter.topic0s")
topics[1] = viper.GetStringSlice("watcher.ethSubscription.receiptFilter.topic1s")
topics[2] = viper.GetStringSlice("watcher.ethSubscription.receiptFilter.topic2s")
topics[3] = viper.GetStringSlice("watcher.ethSubscription.receiptFilter.topic3s")
sc.ReceiptFilter = ReceiptFilter{
Off: viper.GetBool("superNode.ethSubscription.receiptFilter.off"),
MatchTxs: viper.GetBool("superNode.ethSubscription.receiptFilter.matchTxs"),
LogAddresses: viper.GetStringSlice("superNode.ethSubscription.receiptFilter.contracts"),
Off: viper.GetBool("watcher.ethSubscription.receiptFilter.off"),
MatchTxs: viper.GetBool("watcher.ethSubscription.receiptFilter.matchTxs"),
LogAddresses: viper.GetStringSlice("watcher.ethSubscription.receiptFilter.contracts"),
Topics: topics,
}
// Below defaults to two false, and a slice of length 0
// Which means we get all state leafs by default, but no intermediate nodes
sc.StateFilter = StateFilter{
Off: viper.GetBool("superNode.ethSubscription.stateFilter.off"),
IntermediateNodes: viper.GetBool("superNode.ethSubscription.stateFilter.intermediateNodes"),
Addresses: viper.GetStringSlice("superNode.ethSubscription.stateFilter.addresses"),
Off: viper.GetBool("watcher.ethSubscription.stateFilter.off"),
IntermediateNodes: viper.GetBool("watcher.ethSubscription.stateFilter.intermediateNodes"),
Addresses: viper.GetStringSlice("watcher.ethSubscription.stateFilter.addresses"),
}
// Below defaults to two false, and two slices of length 0
// Which means we get all storage leafs by default, but no intermediate nodes
sc.StorageFilter = StorageFilter{
Off: viper.GetBool("superNode.ethSubscription.storageFilter.off"),
IntermediateNodes: viper.GetBool("superNode.ethSubscription.storageFilter.intermediateNodes"),
Addresses: viper.GetStringSlice("superNode.ethSubscription.storageFilter.addresses"),
StorageKeys: viper.GetStringSlice("superNode.ethSubscription.storageFilter.storageKeys"),
Off: viper.GetBool("watcher.ethSubscription.storageFilter.off"),
IntermediateNodes: viper.GetBool("watcher.ethSubscription.storageFilter.intermediateNodes"),
Addresses: viper.GetStringSlice("watcher.ethSubscription.storageFilter.addresses"),
StorageKeys: viper.GetStringSlice("watcher.ethSubscription.storageFilter.storageKeys"),
}
return sc, nil
}

View File

@ -22,7 +22,7 @@ import (
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
)
// TearDownDB is used to tear down the super node dbs after tests
// TearDownDB is used to tear down the watcher dbs after tests
func TearDownDB(db *postgres.DB) {
tx, err := db.Beginx()
Expect(err).NotTo(HaveOccurred())

149
pkg/historical/config.go Normal file
View File

@ -0,0 +1,149 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package historical
import (
"fmt"
"time"
"github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipfs-blockchain-watcher/utils"
)
// Env variables
const (
SUPERNODE_CHAIN = "SUPERNODE_CHAIN"
SUPERNODE_FREQUENCY = "SUPERNODE_FREQUENCY"
SUPERNODE_BATCH_SIZE = "SUPERNODE_BATCH_SIZE"
SUPERNODE_BATCH_NUMBER = "SUPERNODE_BATCH_NUMBER"
SUPERNODE_VALIDATION_LEVEL = "SUPERNODE_VALIDATION_LEVEL"
BACKFILL_MAX_IDLE_CONNECTIONS = "BACKFILL_MAX_IDLE_CONNECTIONS"
BACKFILL_MAX_OPEN_CONNECTIONS = "BACKFILL_MAX_OPEN_CONNECTIONS"
BACKFILL_MAX_CONN_LIFETIME = "BACKFILL_MAX_CONN_LIFETIME"
)
// Config struct
type Config struct {
Chain shared.ChainType
IPFSPath string
IPFSMode shared.IPFSMode
DBConfig config.Database
DB *postgres.DB
HTTPClient interface{}
Frequency time.Duration
BatchSize uint64
BatchNumber uint64
ValidationLevel int
Timeout time.Duration // HTTP connection timeout in seconds
NodeInfo node.Node
}
// NewConfig is used to initialize a historical config from a .toml file
func NewConfig() (*Config, error) {
c := new(Config)
var err error
viper.BindEnv("superNode.chain", SUPERNODE_CHAIN)
chain := viper.GetString("superNode.chain")
c.Chain, err = shared.NewChainType(chain)
if err != nil {
return nil, err
}
c.IPFSMode, err = shared.GetIPFSMode()
if err != nil {
return nil, err
}
if c.IPFSMode == shared.LocalInterface || c.IPFSMode == shared.RemoteClient {
c.IPFSPath, err = shared.GetIPFSPath()
if err != nil {
return nil, err
}
}
c.DBConfig.Init()
if err := c.init(); err != nil {
return nil, err
}
return c, nil
}
func (c *Config) init() error {
var err error
viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH)
viper.BindEnv("bitcoin.httpPath", shared.BTC_HTTP_PATH)
viper.BindEnv("superNode.frequency", SUPERNODE_FREQUENCY)
viper.BindEnv("superNode.batchSize", SUPERNODE_BATCH_SIZE)
viper.BindEnv("superNode.batchNumber", SUPERNODE_BATCH_NUMBER)
viper.BindEnv("superNode.validationLevel", SUPERNODE_VALIDATION_LEVEL)
viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT)
timeout := viper.GetInt("superNode.timeout")
if timeout < 15 {
timeout = 15
}
c.Timeout = time.Second * time.Duration(timeout)
switch c.Chain {
case shared.Ethereum:
ethHTTP := viper.GetString("ethereum.httpPath")
c.NodeInfo, c.HTTPClient, err = shared.GetEthNodeAndClient(fmt.Sprintf("http://%s", ethHTTP))
if err != nil {
return err
}
case shared.Bitcoin:
btcHTTP := viper.GetString("bitcoin.httpPath")
c.NodeInfo, c.HTTPClient = shared.GetBtcNodeAndClient(btcHTTP)
}
freq := viper.GetInt("superNode.frequency")
var frequency time.Duration
if freq <= 0 {
frequency = time.Second * 30
} else {
frequency = time.Second * time.Duration(freq)
}
c.Frequency = frequency
c.BatchSize = uint64(viper.GetInt64("superNode.batchSize"))
c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber"))
c.ValidationLevel = viper.GetInt("superNode.validationLevel")
dbConn := overrideDBConnConfig(c.DBConfig)
db := utils.LoadPostgres(dbConn, c.NodeInfo)
c.DB = &db
return nil
}
func overrideDBConnConfig(con config.Database) config.Database {
viper.BindEnv("database.backFill.maxIdle", BACKFILL_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.backFill.maxOpen", BACKFILL_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.backFill.maxLifetime", BACKFILL_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.backFill.maxIdle")
con.MaxOpen = viper.GetInt("database.backFill.maxOpen")
con.MaxLifetime = viper.GetInt("database.backFill.maxLifetime")
return con
}

View File

@ -14,20 +14,22 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package core
package historical_test
import (
"context"
"io/ioutil"
"testing"
"github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/client"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
)
type RPCClient interface {
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
BatchCall(batch []client.BatchElem) error
IpcPath() string
SupportedModules() (map[string]string, error)
Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
func TestIPFSWatcher(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "IPFS Watcher Historical Suite Test")
}
var _ = BeforeSuite(func() {
logrus.SetOutput(ioutil.Discard)
})

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher
package historical
import (
"sync"
@ -22,23 +22,19 @@ import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/builders"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipfs-blockchain-watcher/utils"
)
const (
DefaultMaxBatchSize uint64 = 100
DefaultMaxBatchNumber int64 = 50
)
// BackFillInterface for filling in gaps in the super node
// BackFillInterface for filling in gaps in the ipfs-blockchain-watcher db
type BackFillInterface interface {
// Method for the super node to periodically check for and fill in gaps in its data using an archival node
// Method for the watcher to periodically check for and fill in gaps in its data using an archival node
BackFill(wg *sync.WaitGroup)
Stop() error
}
// BackFillService for filling in gaps in the super node
// BackFillService for filling in gaps in the watcher
type BackFillService struct {
// Interface for converting payloads into IPLD object payloads
Converter shared.PayloadConverter
@ -68,33 +64,33 @@ type BackFillService struct {
// NewBackFillService returns a new BackFillInterface
func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) {
publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.BackFillDBConn, settings.IPFSMode)
publisher, err := builders.NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode)
if err != nil {
return nil, err
}
indexer, err := NewCIDIndexer(settings.Chain, settings.BackFillDBConn, settings.IPFSMode)
indexer, err := builders.NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode)
if err != nil {
return nil, err
}
converter, err := NewPayloadConverter(settings.Chain)
converter, err := builders.NewPayloadConverter(settings.Chain)
if err != nil {
return nil, err
}
retriever, err := NewCIDRetriever(settings.Chain, settings.BackFillDBConn)
retriever, err := builders.NewCIDRetriever(settings.Chain, settings.DB)
if err != nil {
return nil, err
}
fetcher, err := NewPaylaodFetcher(settings.Chain, settings.HTTPClient, settings.Timeout)
fetcher, err := builders.NewPaylaodFetcher(settings.Chain, settings.HTTPClient, settings.Timeout)
if err != nil {
return nil, err
}
batchSize := settings.BatchSize
if batchSize == 0 {
batchSize = DefaultMaxBatchSize
batchSize = shared.DefaultMaxBatchSize
}
batchNumber := int64(settings.BatchNumber)
if batchNumber == 0 {
batchNumber = DefaultMaxBatchNumber
batchNumber = shared.DefaultMaxBatchNumber
}
return &BackFillService{
Indexer: indexer,
@ -112,7 +108,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
}, nil
}
// BackFill periodically checks for and fills in gaps in the super node db
// BackFill periodically checks for and fills in gaps in the watcher db
func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
ticker := time.NewTicker(bfs.GapCheckFrequency)
go func() {
@ -126,7 +122,7 @@ func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
case <-ticker.C:
gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel)
if err != nil {
log.Errorf("%s super node db backFill RetrieveGapsInData error: %v", bfs.chain.String(), err)
log.Errorf("%s watcher db backFill RetrieveGapsInData error: %v", bfs.chain.String(), err)
continue
}
// spin up worker goroutines for this search pass
@ -140,7 +136,7 @@ func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
log.Infof("backFilling %s data from %d to %d", bfs.chain.String(), gap.Start, gap.Stop)
blockRangeBins, err := utils.GetBlockHeightBins(gap.Start, gap.Stop, bfs.BatchSize)
if err != nil {
log.Errorf("%s super node db backFill GetBlockHeightBins error: %v", bfs.chain.String(), err)
log.Errorf("%s watcher db backFill GetBlockHeightBins error: %v", bfs.chain.String(), err)
continue
}
for _, heights := range blockRangeBins {

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher_test
package historical_test
import (
"sync"
@ -25,14 +25,14 @@ import (
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/historical"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
mocks2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared/mocks"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
)
var _ = Describe("BackFiller", func() {
Describe("FillGaps", func() {
It("Periodically checks for and fills in gaps in the super node's data", func() {
It("Periodically checks for and fills in gaps in the watcher's data", func() {
mockCidRepo := &mocks.CIDIndexer{
ReturnErr: nil,
}
@ -59,15 +59,15 @@ var _ = Describe("BackFiller", func() {
},
}
quitChan := make(chan bool, 1)
backfiller := &watcher.BackFillService{
backfiller := &historical.BackFillService{
Indexer: mockCidRepo,
Publisher: mockPublisher,
Converter: mockConverter,
Fetcher: mockFetcher,
Retriever: mockRetriever,
GapCheckFrequency: time.Second * 2,
BatchSize: watcher.DefaultMaxBatchSize,
BatchNumber: watcher.DefaultMaxBatchNumber,
BatchSize: shared.DefaultMaxBatchSize,
BatchNumber: shared.DefaultMaxBatchNumber,
QuitChan: quitChan,
}
wg := &sync.WaitGroup{}
@ -114,15 +114,15 @@ var _ = Describe("BackFiller", func() {
},
}
quitChan := make(chan bool, 1)
backfiller := &watcher.BackFillService{
backfiller := &historical.BackFillService{
Indexer: mockCidRepo,
Publisher: mockPublisher,
Converter: mockConverter,
Fetcher: mockFetcher,
Retriever: mockRetriever,
GapCheckFrequency: time.Second * 2,
BatchSize: watcher.DefaultMaxBatchSize,
BatchNumber: watcher.DefaultMaxBatchNumber,
BatchSize: shared.DefaultMaxBatchSize,
BatchNumber: shared.DefaultMaxBatchNumber,
QuitChan: quitChan,
}
wg := &sync.WaitGroup{}
@ -168,15 +168,15 @@ var _ = Describe("BackFiller", func() {
},
}
quitChan := make(chan bool, 1)
backfiller := &watcher.BackFillService{
backfiller := &historical.BackFillService{
Indexer: mockCidRepo,
Publisher: mockPublisher,
Converter: mockConverter,
Fetcher: mockFetcher,
Retriever: mockRetriever,
GapCheckFrequency: time.Second * 2,
BatchSize: watcher.DefaultMaxBatchSize,
BatchNumber: watcher.DefaultMaxBatchNumber,
BatchSize: shared.DefaultMaxBatchSize,
BatchNumber: shared.DefaultMaxBatchNumber,
QuitChan: quitChan,
}
wg := &sync.WaitGroup{}

View File

@ -14,24 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"fmt"
)
type NodeType int
const (
GETH NodeType = iota
PARITY
INFURA
GANACHE
)
const (
KOVAN_NETWORK_ID = 42
)
package node
type Node struct {
GenesisBlock string
@ -39,19 +22,3 @@ type Node struct {
ID string
ClientName string
}
type ParityNodeInfo struct {
Track string
ParityVersion `json:"version"`
Hash string
}
func (pn ParityNodeInfo) String() string {
return fmt.Sprintf("Parity/v%d.%d.%d/", pn.Major, pn.Minor, pn.Patch)
}
type ParityVersion struct {
Major int
Minor int
Patch int
}

View File

@ -22,16 +22,16 @@ import (
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" //postgres driver
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
)
type DB struct {
*sqlx.DB
Node core.Node
Node node.Node
NodeID int64
}
func NewDB(databaseConfig config.Database, node core.Node) (*DB, error) {
func NewDB(databaseConfig config.Database, node node.Node) (*DB, error) {
connectString := config.DbConnectionString(databaseConfig)
db, connectErr := sqlx.Connect("postgres", connectString)
if connectErr != nil {
@ -55,7 +55,7 @@ func NewDB(databaseConfig config.Database, node core.Node) (*DB, error) {
return &pg, nil
}
func (db *DB) CreateNode(node *core.Node) error {
func (db *DB) CreateNode(node *node.Node) error {
var nodeID int64
err := db.QueryRow(
`INSERT INTO nodes (genesis_block, network_id, node_id, client_name)

View File

@ -28,7 +28,7 @@ import (
. "github.com/onsi/gomega"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/test_config"
)
@ -84,7 +84,7 @@ var _ = Describe("Postgres DB", func() {
It("throws error when can't connect to the database", func() {
invalidDatabase := config.Database{}
node := core.Node{GenesisBlock: "GENESIS", NetworkID: "1", ID: "x123", ClientName: "geth"}
node := node.Node{GenesisBlock: "GENESIS", NetworkID: "1", ID: "x123", ClientName: "geth"}
_, err := postgres.NewDB(invalidDatabase, node)
@ -94,7 +94,7 @@ var _ = Describe("Postgres DB", func() {
It("throws error when can't create node", func() {
badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100))
node := core.Node{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"}
node := node.Node{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"}
_, err := postgres.NewDB(test_config.DBConfig, node)

View File

@ -23,7 +23,7 @@ import (
"github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipfs-blockchain-watcher/utils"
@ -55,15 +55,15 @@ type Config struct {
IPFSMode shared.IPFSMode
HTTPClient interface{} // Note this client is expected to support the retrieval of the specified data type(s)
NodeInfo core.Node // Info for the associated node
NodeInfo node.Node // Info for the associated node
Ranges [][2]uint64 // The block height ranges to resync
BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing)
Timeout time.Duration // HTTP connection timeout in seconds
BatchNumber uint64
}
// NewReSyncConfig fills and returns a resync config from toml parameters
func NewReSyncConfig() (*Config, error) {
// NewConfig fills and returns a resync config from toml parameters
func NewConfig() (*Config, error) {
c := new(Config)
var err error

View File

@ -21,8 +21,8 @@ import (
"github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/builders"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
"github.com/vulcanize/ipfs-blockchain-watcher/utils"
)
@ -63,37 +63,37 @@ type Service struct {
// NewResyncService creates and returns a resync service from the provided settings
func NewResyncService(settings *Config) (Resync, error) {
publisher, err := watcher.NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode)
publisher, err := builders.NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode)
if err != nil {
return nil, err
}
indexer, err := watcher.NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode)
indexer, err := builders.NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode)
if err != nil {
return nil, err
}
converter, err := watcher.NewPayloadConverter(settings.Chain)
converter, err := builders.NewPayloadConverter(settings.Chain)
if err != nil {
return nil, err
}
retriever, err := watcher.NewCIDRetriever(settings.Chain, settings.DB)
retriever, err := builders.NewCIDRetriever(settings.Chain, settings.DB)
if err != nil {
return nil, err
}
fetcher, err := watcher.NewPaylaodFetcher(settings.Chain, settings.HTTPClient, settings.Timeout)
fetcher, err := builders.NewPaylaodFetcher(settings.Chain, settings.HTTPClient, settings.Timeout)
if err != nil {
return nil, err
}
cleaner, err := watcher.NewCleaner(settings.Chain, settings.DB)
cleaner, err := builders.NewCleaner(settings.Chain, settings.DB)
if err != nil {
return nil, err
}
batchSize := settings.BatchSize
if batchSize == 0 {
batchSize = watcher.DefaultMaxBatchSize
batchSize = shared.DefaultMaxBatchSize
}
batchNumber := int64(settings.BatchNumber)
if batchNumber == 0 {
batchNumber = watcher.DefaultMaxBatchNumber
batchNumber = shared.DefaultMaxBatchNumber
}
return &Service{
Indexer: indexer,

View File

@ -14,8 +14,9 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package config
package shared
type Client struct {
IPCPath string
}
const (
DefaultMaxBatchSize uint64 = 100
DefaultMaxBatchNumber int64 = 50
)

View File

@ -24,7 +24,7 @@ import (
"github.com/btcsuite/btcd/rpcclient"
"github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
)
// Env variables
@ -51,7 +51,7 @@ const (
)
// GetEthNodeAndClient returns eth node info and client from path url
func GetEthNodeAndClient(path string) (core.Node, *rpc.Client, error) {
func GetEthNodeAndClient(path string) (node.Node, *rpc.Client, error) {
viper.BindEnv("ethereum.nodeID", ETH_NODE_ID)
viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME)
viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK)
@ -59,9 +59,9 @@ func GetEthNodeAndClient(path string) (core.Node, *rpc.Client, error) {
rpcClient, err := rpc.Dial(path)
if err != nil {
return core.Node{}, nil, err
return node.Node{}, nil, err
}
return core.Node{
return node.Node{
ID: viper.GetString("ethereum.nodeID"),
ClientName: viper.GetString("ethereum.clientName"),
GenesisBlock: viper.GetString("ethereum.genesisBlock"),
@ -94,7 +94,7 @@ func GetIPFSMode() (IPFSMode, error) {
}
// GetBtcNodeAndClient returns btc node info from path url
func GetBtcNodeAndClient(path string) (core.Node, *rpcclient.ConnConfig) {
func GetBtcNodeAndClient(path string) (node.Node, *rpcclient.ConnConfig) {
viper.BindEnv("bitcoin.nodeID", BTC_NODE_ID)
viper.BindEnv("bitcoin.clientName", BTC_CLIENT_NAME)
viper.BindEnv("bitcoin.genesisBlock", BTC_GENESIS_BLOCK)
@ -103,7 +103,7 @@ func GetBtcNodeAndClient(path string) (core.Node, *rpcclient.ConnConfig) {
viper.BindEnv("bitcoin.user", BTC_NODE_USER)
// For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node
return core.Node{
return node.Node{
ID: viper.GetString("bitcoin.nodeID"),
ClientName: viper.GetString("bitcoin.clientName"),
GenesisBlock: viper.GetString("bitcoin.genesisBlock"),

View File

@ -19,11 +19,8 @@ package shared
import (
"bytes"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
@ -31,6 +28,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
)
// ListContainsString used to check if a list of strings contains a particular string

View File

@ -18,15 +18,15 @@ package shared
import (
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
)
// SetupDB is use to setup a db for super node tests
// SetupDB is use to setup a db for watcher tests
func SetupDB() (*postgres.DB, error) {
return postgres.NewDB(config.Database{
Hostname: "localhost",
Name: "vulcanize_testing",
Port: 5432,
}, core.Node{})
}, node.Node{})
}

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher
package watch
import (
"context"
@ -25,8 +25,8 @@ import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
v "github.com/vulcanize/ipfs-blockchain-watcher/version"
)
@ -37,22 +37,22 @@ const APIName = "vdb"
// APIVersion is the version of the state diffing service API
const APIVersion = "0.0.1"
// PublicSuperNodeAPI is the public api for the super node
type PublicSuperNodeAPI struct {
sn SuperNode
// PublicWatcherAPI is the public api for the watcher
type PublicWatcherAPI struct {
w Watcher
}
// NewPublicSuperNodeAPI creates a new PublicSuperNodeAPI with the provided underlying SyncPublishScreenAndServe process
func NewPublicSuperNodeAPI(superNodeInterface SuperNode) *PublicSuperNodeAPI {
return &PublicSuperNodeAPI{
sn: superNodeInterface,
// NewPublicWatcherAPI creates a new PublicWatcherAPI with the provided underlying Watcher process
func NewPublicWatcherAPI(w Watcher) *PublicWatcherAPI {
return &PublicWatcherAPI{
w: w,
}
}
// Stream is the public method to setup a subscription that fires off super node payloads as they are processed
func (api *PublicSuperNodeAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc.Subscription, error) {
// Stream is the public method to setup a subscription that fires off IPLD payloads as they are processed
func (api *PublicWatcherAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc.Subscription, error) {
var params shared.SubscriptionSettings
switch api.sn.Chain() {
switch api.w.Chain() {
case shared.Ethereum:
var ethParams eth.SubscriptionSettings
if err := rlp.DecodeBytes(rlpParams, &ethParams); err != nil {
@ -81,22 +81,22 @@ func (api *PublicSuperNodeAPI) Stream(ctx context.Context, rlpParams []byte) (*r
// subscribe to events from the SyncPublishScreenAndServe service
payloadChannel := make(chan SubscriptionPayload, PayloadChanBufferSize)
quitChan := make(chan bool, 1)
go api.sn.Subscribe(rpcSub.ID, payloadChannel, quitChan, params)
go api.w.Subscribe(rpcSub.ID, payloadChannel, quitChan, params)
// loop and await payloads and relay them to the subscriber using notifier
for {
select {
case packet := <-payloadChannel:
if err := notifier.Notify(rpcSub.ID, packet); err != nil {
log.Error("Failed to send super node packet", "err", err)
api.sn.Unsubscribe(rpcSub.ID)
log.Error("Failed to send watcher data packet", "err", err)
api.w.Unsubscribe(rpcSub.ID)
return
}
case <-rpcSub.Err():
api.sn.Unsubscribe(rpcSub.ID)
api.w.Unsubscribe(rpcSub.ID)
return
case <-quitChan:
// don't need to unsubscribe to super node, the service does so before sending the quit signal this way
// don't need to unsubscribe from the watcher, the service does so before sending the quit signal this way
return
}
}
@ -105,21 +105,21 @@ func (api *PublicSuperNodeAPI) Stream(ctx context.Context, rlpParams []byte) (*r
return rpcSub, nil
}
// Node is a public rpc method to allow transformers to fetch the node info for the super node
// NOTE: this is the node info for the node that the super node is syncing from, not the node info for the super node itself
func (api *PublicSuperNodeAPI) Node() *core.Node {
return api.sn.Node()
// Node is a public rpc method to allow transformers to fetch the node info for the watcher
// NOTE: this is the node info for the node that the watcher is syncing from, not the node info for the watcher itself
func (api *PublicWatcherAPI) Node() *node.Node {
return api.w.Node()
}
// Chain returns the chain type that this super node instance supports
func (api *PublicSuperNodeAPI) Chain() shared.ChainType {
return api.sn.Chain()
// Chain returns the chain type that this watcher instance supports
func (api *PublicWatcherAPI) Chain() shared.ChainType {
return api.w.Chain()
}
// Struct for holding super node meta data
// Struct for holding watcher meta data
type InfoAPI struct{}
// NewPublicSuperNodeAPI creates a new PublicSuperNodeAPI with the provided underlying SyncPublishScreenAndServe process
// NewInfoAPI creates a new InfoAPI
func NewInfoAPI() *InfoAPI {
return &InfoAPI{}
}
@ -131,7 +131,7 @@ func (iapi *InfoAPI) Modules() map[string]string {
}
}
// NodeInfo gathers and returns a collection of metadata for the super node
// NodeInfo gathers and returns a collection of metadata for the watcher
func (iapi *InfoAPI) NodeInfo() *p2p.NodeInfo {
return &p2p.NodeInfo{
// TODO: formalize this
@ -140,7 +140,7 @@ func (iapi *InfoAPI) NodeInfo() *p2p.NodeInfo {
}
}
// Version returns the version of the super node
// Version returns the version of the watcher
func (iapi *InfoAPI) Version() string {
return v.VersionWithMeta
}

View File

@ -14,18 +14,17 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher
package watch
import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/vulcanize/ipfs-blockchain-watcher/utils"
@ -33,27 +32,19 @@ import (
// Env variables
const (
SUPERNODE_CHAIN = "SUPERNODE_CHAIN"
SUPERNODE_SYNC = "SUPERNODE_SYNC"
SUPERNODE_WORKERS = "SUPERNODE_WORKERS"
SUPERNODE_SERVER = "SUPERNODE_SERVER"
SUPERNODE_WS_PATH = "SUPERNODE_WS_PATH"
SUPERNODE_IPC_PATH = "SUPERNODE_IPC_PATH"
SUPERNODE_HTTP_PATH = "SUPERNODE_HTTP_PATH"
SUPERNODE_BACKFILL = "SUPERNODE_BACKFILL"
SUPERNODE_FREQUENCY = "SUPERNODE_FREQUENCY"
SUPERNODE_BATCH_SIZE = "SUPERNODE_BATCH_SIZE"
SUPERNODE_BATCH_NUMBER = "SUPERNODE_BATCH_NUMBER"
SUPERNODE_VALIDATION_LEVEL = "SUPERNODE_VALIDATION_LEVEL"
SUPERNODE_CHAIN = "SUPERNODE_CHAIN"
SUPERNODE_SYNC = "SUPERNODE_SYNC"
SUPERNODE_WORKERS = "SUPERNODE_WORKERS"
SUPERNODE_SERVER = "SUPERNODE_SERVER"
SUPERNODE_WS_PATH = "SUPERNODE_WS_PATH"
SUPERNODE_IPC_PATH = "SUPERNODE_IPC_PATH"
SUPERNODE_HTTP_PATH = "SUPERNODE_HTTP_PATH"
SUPERNODE_BACKFILL = "SUPERNODE_BACKFILL"
SYNC_MAX_IDLE_CONNECTIONS = "SYNC_MAX_IDLE_CONNECTIONS"
SYNC_MAX_OPEN_CONNECTIONS = "SYNC_MAX_OPEN_CONNECTIONS"
SYNC_MAX_CONN_LIFETIME = "SYNC_MAX_CONN_LIFETIME"
BACKFILL_MAX_IDLE_CONNECTIONS = "BACKFILL_MAX_IDLE_CONNECTIONS"
BACKFILL_MAX_OPEN_CONNECTIONS = "BACKFILL_MAX_OPEN_CONNECTIONS"
BACKFILL_MAX_CONN_LIFETIME = "BACKFILL_MAX_CONN_LIFETIME"
SERVER_MAX_IDLE_CONNECTIONS = "SERVER_MAX_IDLE_CONNECTIONS"
SERVER_MAX_OPEN_CONNECTIONS = "SERVER_MAX_OPEN_CONNECTIONS"
SERVER_MAX_CONN_LIFETIME = "SERVER_MAX_CONN_LIFETIME"
@ -61,7 +52,6 @@ const (
// Config struct
type Config struct {
// Ubiquitous fields
Chain shared.ChainType
IPFSPath string
IPFSMode shared.IPFSMode
@ -77,21 +67,14 @@ type Config struct {
SyncDBConn *postgres.DB
Workers int
WSClient interface{}
NodeInfo core.Node
// Backfiller params
BackFill bool
BackFillDBConn *postgres.DB
HTTPClient interface{}
Frequency time.Duration
BatchSize uint64
BatchNumber uint64
ValidationLevel int
Timeout time.Duration // HTTP connection timeout in seconds
NodeInfo node.Node
// Historical switch
Historical bool
}
// NewSuperNodeConfig is used to initialize a SuperNode config from a .toml file
// Separate chain supernode instances need to be ran with separate ipfs path in order to avoid lock contention on the ipfs repository lockfile
func NewSuperNodeConfig() (*Config, error) {
// NewConfig is used to initialize a watcher config from a .toml file
// Separate chain watcher instances need to be ran with separate ipfs path in order to avoid lock contention on the ipfs repository lockfile
func NewConfig() (*Config, error) {
c := new(Config)
var err error
@ -106,6 +89,7 @@ func NewSuperNodeConfig() (*Config, error) {
viper.BindEnv("superNode.httpPath", SUPERNODE_HTTP_PATH)
viper.BindEnv("superNode.backFill", SUPERNODE_BACKFILL)
c.Historical = viper.GetBool("superNode.backFill")
chain := viper.GetString("superNode.chain")
c.Chain, err = shared.NewChainType(chain)
if err != nil {
@ -174,70 +158,14 @@ func NewSuperNodeConfig() (*Config, error) {
c.ServeDBConn = &serveDB
}
c.BackFill = viper.GetBool("superNode.backFill")
if c.BackFill {
if err := c.BackFillFields(); err != nil {
return nil, err
}
}
return c, nil
}
// BackFillFields is used to fill in the BackFill fields of the config
func (c *Config) BackFillFields() error {
var err error
viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH)
viper.BindEnv("bitcoin.httpPath", shared.BTC_HTTP_PATH)
viper.BindEnv("superNode.frequency", SUPERNODE_FREQUENCY)
viper.BindEnv("superNode.batchSize", SUPERNODE_BATCH_SIZE)
viper.BindEnv("superNode.batchNumber", SUPERNODE_BATCH_NUMBER)
viper.BindEnv("superNode.validationLevel", SUPERNODE_VALIDATION_LEVEL)
viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT)
timeout := viper.GetInt("superNode.timeout")
if timeout < 15 {
timeout = 15
}
c.Timeout = time.Second * time.Duration(timeout)
switch c.Chain {
case shared.Ethereum:
ethHTTP := viper.GetString("ethereum.httpPath")
c.NodeInfo, c.HTTPClient, err = shared.GetEthNodeAndClient(fmt.Sprintf("http://%s", ethHTTP))
if err != nil {
return err
}
case shared.Bitcoin:
btcHTTP := viper.GetString("bitcoin.httpPath")
c.NodeInfo, c.HTTPClient = shared.GetBtcNodeAndClient(btcHTTP)
}
freq := viper.GetInt("superNode.frequency")
var frequency time.Duration
if freq <= 0 {
frequency = time.Second * 30
} else {
frequency = time.Second * time.Duration(freq)
}
c.Frequency = frequency
c.BatchSize = uint64(viper.GetInt64("superNode.batchSize"))
c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber"))
c.ValidationLevel = viper.GetInt("superNode.validationLevel")
backFillDBConn := overrideDBConnConfig(c.DBConfig, BackFill)
backFillDB := utils.LoadPostgres(backFillDBConn, c.NodeInfo)
c.BackFillDBConn = &backFillDB
return nil
}
type mode string
var (
Sync mode = "sync"
BackFill mode = "backFill"
Serve mode = "serve"
Sync mode = "sync"
Serve mode = "serve"
)
func overrideDBConnConfig(con config.Database, m mode) config.Database {
@ -249,13 +177,6 @@ func overrideDBConnConfig(con config.Database, m mode) config.Database {
con.MaxIdle = viper.GetInt("database.sync.maxIdle")
con.MaxOpen = viper.GetInt("database.sync.maxOpen")
con.MaxLifetime = viper.GetInt("database.sync.maxLifetime")
case BackFill:
viper.BindEnv("database.backFill.maxIdle", BACKFILL_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.backFill.maxOpen", BACKFILL_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.backFill.maxLifetime", BACKFILL_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.backFill.maxIdle")
con.MaxOpen = viper.GetInt("database.backFill.maxOpen")
con.MaxLifetime = viper.GetInt("database.backFill.maxLifetime")
case Serve:
viper.BindEnv("database.server.maxIdle", SERVER_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.server.maxOpen", SERVER_MAX_OPEN_CONNECTIONS)

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher
package watch
import log "github.com/sirupsen/logrus"

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher
package watch
import (
"fmt"
@ -22,13 +22,14 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/node"
ethnode "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/builders"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
@ -37,12 +38,12 @@ const (
PayloadChanBufferSize = 2000
)
// SuperNode is the top level interface for streaming, converting to IPLDs, publishing,
// Watcher is the top level interface for streaming, converting to IPLDs, publishing,
// and indexing all chain data; screening this data; and serving it up to subscribed clients
// This service is compatible with the Ethereum service interface (node.Service)
type SuperNode interface {
type Watcher interface {
// APIs(), Protocols(), Start() and Stop()
node.Service
ethnode.Service
// Data processing event loop
Sync(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error
// Pub-Sub handling event loop
@ -52,12 +53,12 @@ type SuperNode interface {
// Method to unsubscribe from the service
Unsubscribe(id rpc.ID)
// Method to access the node info for the service
Node() *core.Node
Node() *node.Node
// Method to access chain type
Chain() shared.ChainType
}
// Service is the underlying struct for the super node
// Service is the underlying struct for the watcher
type Service struct {
// Used to sync access to the Subscriptions
sync.Mutex
@ -83,8 +84,8 @@ type Service struct {
Subscriptions map[common.Hash]map[rpc.ID]Subscription
// A mapping of subscription params hash to the corresponding subscription params
SubscriptionTypes map[common.Hash]shared.SubscriptionSettings
// Info for the Geth node that this super node is working with
NodeInfo *core.Node
// Info for the Geth node that this watcher is working with
NodeInfo *node.Node
// Number of publishAndIndex workers
WorkerPoolSize int
// chain type for this service
@ -97,40 +98,40 @@ type Service struct {
serveWg *sync.WaitGroup
}
// NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct
func NewSuperNode(settings *Config) (SuperNode, error) {
// NewWatcher creates a new Watcher using an underlying Service struct
func NewWatcher(settings *Config) (Watcher, error) {
sn := new(Service)
var err error
// If we are syncing, initialize the needed interfaces
if settings.Sync {
sn.Streamer, sn.PayloadChan, err = NewPayloadStreamer(settings.Chain, settings.WSClient)
sn.Streamer, sn.PayloadChan, err = builders.NewPayloadStreamer(settings.Chain, settings.WSClient)
if err != nil {
return nil, err
}
sn.Converter, err = NewPayloadConverter(settings.Chain)
sn.Converter, err = builders.NewPayloadConverter(settings.Chain)
if err != nil {
return nil, err
}
sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.SyncDBConn, settings.IPFSMode)
sn.Publisher, err = builders.NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.SyncDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.SyncDBConn, settings.IPFSMode)
sn.Indexer, err = builders.NewCIDIndexer(settings.Chain, settings.SyncDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
sn.Filterer, err = NewResponseFilterer(settings.Chain)
sn.Filterer, err = builders.NewResponseFilterer(settings.Chain)
if err != nil {
return nil, err
}
}
// If we are serving, initialize the needed interfaces
if settings.Serve {
sn.Retriever, err = NewCIDRetriever(settings.Chain, settings.ServeDBConn)
sn.Retriever, err = builders.NewCIDRetriever(settings.Chain, settings.ServeDBConn)
if err != nil {
return nil, err
}
sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.ServeDBConn, settings.IPFSMode)
sn.IPLDFetcher, err = builders.NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.ServeDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
@ -151,14 +152,14 @@ func (sap *Service) Protocols() []p2p.Protocol {
return []p2p.Protocol{}
}
// APIs returns the RPC descriptors the super node service offers
// APIs returns the RPC descriptors the watcher service offers
func (sap *Service) APIs() []rpc.API {
ifnoAPI := NewInfoAPI()
apis := []rpc.API{
{
Namespace: APIName,
Version: APIVersion,
Service: NewPublicSuperNodeAPI(sap),
Service: NewPublicWatcherAPI(sap),
Public: true,
},
{
@ -180,7 +181,7 @@ func (sap *Service) APIs() []rpc.API {
Public: true,
},
}
chainAPI, err := NewPublicAPI(sap.chain, sap.db, sap.ipfsPath)
chainAPI, err := builders.NewPublicAPI(sap.chain, sap.db, sap.ipfsPath)
if err != nil {
log.Error(err)
return apis
@ -211,7 +212,7 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
case payload := <-sap.PayloadChan:
ipldPayload, err := sap.Converter.Convert(payload)
if err != nil {
log.Errorf("super node conversion error for chain %s: %v", sap.chain.String(), err)
log.Errorf("watcher conversion error for chain %s: %v", sap.chain.String(), err)
continue
}
log.Infof("%s data streamed at head height %d", sap.chain.String(), ipldPayload.Height())
@ -229,7 +230,7 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
publishAndIndexPayload <- ipldPayload
}
case err := <-sub.Err():
log.Errorf("super node subscription error for chain %s: %v", sap.chain.String(), err)
log.Errorf("watcher subscription error for chain %s: %v", sap.chain.String(), err)
case <-sap.QuitChan:
log.Infof("quiting %s Sync process", sap.chain.String())
return
@ -248,18 +249,18 @@ func (sap *Service) publishAndIndex(wg *sync.WaitGroup, id int, publishAndIndexP
for {
select {
case payload := <-publishAndIndexPayload:
log.Debugf("%s super node publishAndIndex worker %d publishing data streamed at head height %d", sap.chain.String(), id, payload.Height())
log.Debugf("%s watcher publishAndIndex worker %d publishing data streamed at head height %d", sap.chain.String(), id, payload.Height())
cidPayload, err := sap.Publisher.Publish(payload)
if err != nil {
log.Errorf("%s super node publishAndIndex worker %d publishing error: %v", sap.chain.String(), id, err)
log.Errorf("%s watcher publishAndIndex worker %d publishing error: %v", sap.chain.String(), id, err)
continue
}
log.Debugf("%s super node publishAndIndex worker %d indexing data streamed at head height %d", sap.chain.String(), id, payload.Height())
log.Debugf("%s watcher publishAndIndex worker %d indexing data streamed at head height %d", sap.chain.String(), id, payload.Height())
if err := sap.Indexer.Index(cidPayload); err != nil {
log.Errorf("%s super node publishAndIndex worker %d indexing error: %v", sap.chain.String(), id, err)
log.Errorf("%s watcher publishAndIndex worker %d indexing error: %v", sap.chain.String(), id, err)
}
case <-sap.QuitChan:
log.Infof("%s super node publishAndIndex worker %d shutting down", sap.chain.String(), id)
log.Infof("%s watcher publishAndIndex worker %d shutting down", sap.chain.String(), id)
return
}
}
@ -298,7 +299,7 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) {
// Retrieve the subscription parameters for this subscription type
subConfig, ok := sap.SubscriptionTypes[ty]
if !ok {
log.Errorf("super node %s subscription configuration for subscription type %s not available", sap.chain.String(), ty.Hex())
log.Errorf("watcher %s subscription configuration for subscription type %s not available", sap.chain.String(), ty.Hex())
sap.closeType(ty)
continue
}
@ -310,19 +311,19 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) {
}
response, err := sap.Filterer.Filter(subConfig, payload)
if err != nil {
log.Errorf("super node filtering error for chain %s: %v", sap.chain.String(), err)
log.Errorf("watcher filtering error for chain %s: %v", sap.chain.String(), err)
sap.closeType(ty)
continue
}
responseRLP, err := rlp.EncodeToBytes(response)
if err != nil {
log.Errorf("super node rlp encoding error for chain %s: %v", sap.chain.String(), err)
log.Errorf("watcher rlp encoding error for chain %s: %v", sap.chain.String(), err)
continue
}
for id, sub := range subs {
select {
case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}:
log.Debugf("sending super node %s payload to subscription %s", sap.chain.String(), id)
log.Debugf("sending watcher %s payload to subscription %s", sap.chain.String(), id)
default:
log.Infof("unable to send %s payload to subscription %s; channel has no receiver", sap.chain.String(), id)
}
@ -368,7 +369,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
// Otherwise we only filter new data as it is streamed in from the state diffing geth node
if params.HistoricalData() || params.HistoricalDataOnly() {
if err := sap.sendHistoricalData(subscription, id, params); err != nil {
sendNonBlockingErr(subscription, fmt.Errorf("%s super node subscriber backfill error: %v", sap.chain.String(), err))
sendNonBlockingErr(subscription, fmt.Errorf("%s watcher subscriber backfill error: %v", sap.chain.String(), err))
sendNonBlockingQuit(subscription)
return
}
@ -404,13 +405,13 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
for i := startingBlock; i <= endingBlock; i++ {
select {
case <-sap.QuitChan:
log.Infof("%s super node historical data feed to subscription %s closed", sap.chain.String(), id)
log.Infof("%s watcher historical data feed to subscription %s closed", sap.chain.String(), id)
return
default:
}
cidWrappers, empty, err := sap.Retriever.Retrieve(params, i)
if err != nil {
sendNonBlockingErr(sub, fmt.Errorf(" %s super node CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error()))
sendNonBlockingErr(sub, fmt.Errorf(" %s watcher CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error()))
continue
}
if empty {
@ -419,7 +420,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
for _, cids := range cidWrappers {
response, err := sap.IPLDFetcher.Fetch(cids)
if err != nil {
sendNonBlockingErr(sub, fmt.Errorf("%s super node IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error()))
sendNonBlockingErr(sub, fmt.Errorf("%s watcher IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error()))
continue
}
responseRLP, err := rlp.EncodeToBytes(response)
@ -429,7 +430,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
}
select {
case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}:
log.Debugf("sending super node historical data payload to %s subscription %s", sap.chain.String(), id)
log.Debugf("sending watcher historical data payload to %s subscription %s", sap.chain.String(), id)
default:
log.Infof("unable to send backFill payload to %s subscription %s; channel has no receiver", sap.chain.String(), id)
}
@ -448,7 +449,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
// Unsubscribe is used by the API to remotely unsubscribe to the StateDiffingService loop
func (sap *Service) Unsubscribe(id rpc.ID) {
log.Infof("Unsubscribing %s from the %s super node service", id, sap.chain.String())
log.Infof("Unsubscribing %s from the %s watcher service", id, sap.chain.String())
sap.Lock()
for ty := range sap.Subscriptions {
delete(sap.Subscriptions[ty], id)
@ -464,7 +465,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) {
// Start is used to begin the service
// This is mostly just to satisfy the node.Service interface
func (sap *Service) Start(*p2p.Server) error {
log.Infof("Starting %s super node service", sap.chain.String())
log.Infof("Starting %s watcher service", sap.chain.String())
wg := new(sync.WaitGroup)
payloadChan := make(chan shared.ConvertedData, PayloadChanBufferSize)
if err := sap.Sync(wg, payloadChan); err != nil {
@ -477,7 +478,7 @@ func (sap *Service) Start(*p2p.Server) error {
// Stop is used to close down the service
// This is mostly just to satisfy the node.Service interface
func (sap *Service) Stop() error {
log.Infof("Stopping %s super node service", sap.chain.String())
log.Infof("Stopping %s watcher service", sap.chain.String())
sap.Lock()
close(sap.QuitChan)
sap.close()
@ -486,7 +487,7 @@ func (sap *Service) Stop() error {
}
// Node returns the node info for this service
func (sap *Service) Node() *core.Node {
func (sap *Service) Node() *node.Node {
return sap.NodeInfo
}

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher_test
package watch_test
import (
"sync"
@ -54,7 +54,7 @@ var _ = Describe("Service", func() {
ReturnIPLDPayload: mocks.MockConvertedPayload,
ReturnErr: nil,
}
processor := &watcher.Service{
processor := &watch.Service{
Indexer: mockCidIndexer,
Publisher: mockPublisher,
Streamer: mockStreamer,

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher
package watch
import (
"errors"
@ -29,14 +29,14 @@ const (
BackFillCompleteFlag
)
// Subscription holds the information for an individual client subscription to the super node
// Subscription holds the information for an individual client subscription to the watcher
type Subscription struct {
ID rpc.ID
PayloadChan chan<- SubscriptionPayload
QuitChan chan<- bool
}
// SubscriptionPayload is the struct for a super node stream payload
// SubscriptionPayload is the struct for a watcher data subscription payload
// It carries data of a type specific to the chain being supported/queried and an error message
type SubscriptionPayload struct {
Data []byte `json:"data"` // e.g. for Ethereum rlp serialized eth.StreamPayload

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package watcher_test
package watch_test
import (
"io/ioutil"

View File

@ -22,12 +22,12 @@ import (
"github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
func LoadPostgres(database config.Database, node core.Node) postgres.DB {
func LoadPostgres(database config.Database, node node.Node) postgres.DB {
db, err := postgres.NewDB(database, node)
if err != nil {
logrus.Fatal("Error loading postgres: ", err)
@ -45,8 +45,7 @@ func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint6
}
length := endingBlock - startingBlock + 1
numberOfBins := length / batchSize
remainder := length % batchSize
if remainder != 0 {
if length%batchSize != 0 {
numberOfBins++
}
blockRangeBins := make([][]uint64, numberOfBins)