misc adjustments

This commit is contained in:
Ian Norden 2020-08-06 15:00:41 -05:00
parent 74752d15aa
commit 7b0b613b92
11 changed files with 79 additions and 54 deletions

View File

@ -13,14 +13,27 @@
1. [License](#license) 1. [License](#license)
## Background ## Background
ipfs-blockchain-watcher is a collection of interfaces that are used to extract, process, and store in Postgres-IPFS ipfs-blockchain-watcher is a collection of interfaces that are used to extract, process, store, and index
all chain data. The raw data indexed by ipfs-blockchain-watcher serves as the basis for more specific watchers and applications. all blockchain data in Postgres-IPFS. The raw data indexed by ipfs-blockchain-watcher serves as the basis for more specific watchers and applications.
Currently the service supports complete processing of all Bitcoin and Ethereum data. Currently the service supports complete processing of all Bitcoin and Ethereum data.
## Architecture ## Architecture
More details on the design of ipfs-blockchain-watcher can be found in [here](./documentation/architecture.md) More details on the design of ipfs-blockchain-watcher can be found in [here](./documentation/architecture.md)
## Dependencies
Minimal build dependencies
* Go (1.13)
* Git
* GCC compiler
* This repository
Potential external dependencies
* Goose
* Postgres
* Statediffing go-ethereum
* Bitcoin node
## Install ## Install
1. [Goose](#goose) 1. [Goose](#goose)
1. [Postgres](#postgres) 1. [Postgres](#postgres)

View File

@ -39,7 +39,7 @@ var resyncCmd = &cobra.Command{
} }
func rsyncCmdCommand() { func rsyncCmdCommand() {
logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta) logWithCommand.Infof("running ipfs-blockchain-watcher version: %s", v.VersionWithMeta)
logWithCommand.Debug("loading resync configuration variables") logWithCommand.Debug("loading resync configuration variables")
rConfig, err := resync.NewConfig() rConfig, err := resync.NewConfig()
if err != nil { if err != nil {

View File

@ -55,7 +55,7 @@ and fill in gaps in the data
} }
func watch() { func watch() {
logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta) logWithCommand.Infof("running ipfs-blockchain-watcher version: %s", v.VersionWithMeta)
var forwardPayloadChan chan shared.ConvertedData var forwardPayloadChan chan shared.ConvertedData
wg := new(s.WaitGroup) wg := new(s.WaitGroup)

View File

@ -42,18 +42,16 @@ An example of how to subscribe to a real-time Ethereum data feed from ipfs-block
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/client" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/client"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/streamer"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
) )
config, _ := eth.NewEthSubscriptionConfig() config, _ := eth.NewEthSubscriptionConfig()
rlpConfig, _ := rlp.EncodeToBytes(config) rlpConfig, _ := rlp.EncodeToBytes(config)
vulcPath := viper.GetString("watcher.ethSubscription.path") vulcPath := viper.GetString("watcher.ethSubscription.path")
rawRPCClient, _ := rpc.Dial(vulcPath) rpcClient, _ := rpc.Dial(vulcPath)
rpcClient := client.NewRPCClient(rawRPCClient, vulcPath) subClient := client.NewClient(rpcClient)
stream := streamer.NewSuperNodeStreamer(rpcClient) payloadChan := make(chan watch.SubscriptionPayload, 20000)
payloadChan := make(chan watcher.SubscriptionPayload, 20000) subscription, _ := subClient.Stream(payloadChan, rlpConfig)
subscription, _ := stream.Stream(payloadChan, rlpConfig)
for { for {
select { select {
case payload := <- payloadChan: case payload := <- payloadChan:
@ -162,20 +160,18 @@ An example of how to subscribe to a real-time Bitcoin data feed from ipfs-blockc
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/libraries/shared/streamer" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/client" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/client"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/super_node" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/super_node/btc"
) )
config, _ := btc.NewBtcSubscriptionConfig() config, _ := btc.NewBtcSubscriptionConfig()
rlpConfig, _ := rlp.EncodeToBytes(config) rlpConfig, _ := rlp.EncodeToBytes(config)
vulcPath := viper.GetString("watcher.btcSubscription.path") vulcPath := viper.GetString("watcher.btcSubscription.path")
rawRPCClient, _ := rpc.Dial(vulcPath) rpcClient, _ := rpc.Dial(vulcPath)
rpcClient := client.NewRPCClient(rawRPCClient, vulcPath) subClient := client.NewClient(rpcClient)
stream := streamer.NewSuperNodeStreamer(rpcClient) payloadChan := make(chan watch.SubscriptionPayload, 20000)
payloadChan := make(chan super_node.SubscriptionPayload, 20000) subscription, _ := subClient.Stream(payloadChan, rlpConfig)
subscription, _ := stream.Stream(payloadChan, rlpConfig)
for { for {
select { select {
case payload := <- payloadChan: case payload := <- payloadChan:
@ -210,7 +206,7 @@ The .toml file being used to fill the Bitcoin subscription config would look som
These configuration parameters are broken down as follows: These configuration parameters are broken down as follows:
`btcSubscription.wsPath` is used to define the SuperNode ws url OR ipc endpoint to subscribe to `btcSubscription.wsPath` is used to define the ipfs-blockchain-watcher ws url OR ipc endpoint to subscribe to
`btcSubscription.historicalData` specifies whether or not ipfs-blockchain-watcher should look up historical data in its cache and `btcSubscription.historicalData` specifies whether or not ipfs-blockchain-watcher should look up historical data in its cache and
send that to the subscriber, if this is set to `false` then ipfs-blockchain-watcher only streams newly synced/incoming data send that to the subscriber, if this is set to `false` then ipfs-blockchain-watcher only streams newly synced/incoming data

View File

@ -102,7 +102,7 @@ For Ethereum:
## Database ## Database
Currently, ipfs-blockchain-watcher persists all data to a single Postgres database. The migrations for this DB can be found [here](../../db/migrations). Currently, ipfs-blockchain-watcher persists all data to a single Postgres database. The migrations for this DB can be found [here](../db/migrations).
Chain-specific data is populated under a chain-specific schema (e.g. `eth` and `btc`) while shared data- such as the IPFS blocks table- is populated under the `public` schema. Chain-specific data is populated under a chain-specific schema (e.g. `eth` and `btc`) while shared data- such as the IPFS blocks table- is populated under the `public` schema.
Subsequent watchers which act on the raw chain data should build and populate their own schemas or separate databases entirely. Subsequent watchers which act on the raw chain data should build and populate their own schemas or separate databases entirely.

19
main.go
View File

@ -1,9 +1,24 @@
// Copyright © 2020 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main package main
import ( import (
"github.com/vulcanize/ipfs-blockchain-watcher/cmd"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/cmd"
) )
func main() { func main() {

View File

@ -64,8 +64,8 @@ func NewConfig() (*Config, error) {
c := new(Config) c := new(Config)
var err error var err error
viper.BindEnv("superNode.chain", SUPERNODE_CHAIN) viper.BindEnv("watcher.chain", SUPERNODE_CHAIN)
chain := viper.GetString("superNode.chain") chain := viper.GetString("watcher.chain")
c.Chain, err = shared.NewChainType(chain) c.Chain, err = shared.NewChainType(chain)
if err != nil { if err != nil {
return nil, err return nil, err
@ -96,13 +96,13 @@ func (c *Config) init() error {
viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH) viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH)
viper.BindEnv("bitcoin.httpPath", shared.BTC_HTTP_PATH) viper.BindEnv("bitcoin.httpPath", shared.BTC_HTTP_PATH)
viper.BindEnv("superNode.frequency", SUPERNODE_FREQUENCY) viper.BindEnv("watcher.frequency", SUPERNODE_FREQUENCY)
viper.BindEnv("superNode.batchSize", SUPERNODE_BATCH_SIZE) viper.BindEnv("watcher.batchSize", SUPERNODE_BATCH_SIZE)
viper.BindEnv("superNode.batchNumber", SUPERNODE_BATCH_NUMBER) viper.BindEnv("watcher.batchNumber", SUPERNODE_BATCH_NUMBER)
viper.BindEnv("superNode.validationLevel", SUPERNODE_VALIDATION_LEVEL) viper.BindEnv("watcher.validationLevel", SUPERNODE_VALIDATION_LEVEL)
viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT) viper.BindEnv("watcher.timeout", shared.HTTP_TIMEOUT)
timeout := viper.GetInt("superNode.timeout") timeout := viper.GetInt("watcher.timeout")
if timeout < 15 { if timeout < 15 {
timeout = 15 timeout = 15
} }
@ -120,7 +120,7 @@ func (c *Config) init() error {
c.NodeInfo, c.HTTPClient = shared.GetBtcNodeAndClient(btcHTTP) c.NodeInfo, c.HTTPClient = shared.GetBtcNodeAndClient(btcHTTP)
} }
freq := viper.GetInt("superNode.frequency") freq := viper.GetInt("watcher.frequency")
var frequency time.Duration var frequency time.Duration
if freq <= 0 { if freq <= 0 {
frequency = time.Second * 30 frequency = time.Second * 30
@ -128,9 +128,9 @@ func (c *Config) init() error {
frequency = time.Second * time.Duration(freq) frequency = time.Second * time.Duration(freq)
} }
c.Frequency = frequency c.Frequency = frequency
c.BatchSize = uint64(viper.GetInt64("superNode.batchSize")) c.BatchSize = uint64(viper.GetInt64("watcher.batchSize"))
c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber")) c.BatchNumber = uint64(viper.GetInt64("watcher.batchNumber"))
c.ValidationLevel = viper.GetInt("superNode.validationLevel") c.ValidationLevel = viper.GetInt("watcher.validationLevel")
dbConn := overrideDBConnConfig(c.DBConfig) dbConn := overrideDBConnConfig(c.DBConfig)
db := utils.LoadPostgres(dbConn, c.NodeInfo) db := utils.LoadPostgres(dbConn, c.NodeInfo)

View File

@ -117,7 +117,7 @@ func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
for { for {
select { select {
case <-bfs.QuitChan: case <-bfs.QuitChan:
log.Infof("quiting %s FillGapsInSuperNode process", bfs.chain.String()) log.Infof("quiting %s BackFill process", bfs.chain.String())
return return
case <-ticker.C: case <-ticker.C:
gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel) gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel)

1
pkg/validate/service.go Normal file
View File

@ -0,0 +1 @@
package validate

View File

@ -66,7 +66,7 @@ func (api *PublicWatcherAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc
} }
params = &btcParams params = &btcParams
default: default:
panic("SuperNode is not configured for a specific chain type") panic("ipfs-blockchain-watcher is not configured for a specific chain type")
} }
// ensure that the RPC connection supports subscriptions // ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx) notifier, supported := rpc.NotifierFromContext(ctx)
@ -136,7 +136,7 @@ func (iapi *InfoAPI) NodeInfo() *p2p.NodeInfo {
return &p2p.NodeInfo{ return &p2p.NodeInfo{
// TODO: formalize this // TODO: formalize this
ID: "vulcanizeDB", ID: "vulcanizeDB",
Name: "superNode", Name: "ipfs-blockchain-watcher",
} }
} }

View File

@ -78,19 +78,19 @@ func NewConfig() (*Config, error) {
c := new(Config) c := new(Config)
var err error var err error
viper.BindEnv("superNode.chain", SUPERNODE_CHAIN) viper.BindEnv("watcher.chain", SUPERNODE_CHAIN)
viper.BindEnv("superNode.sync", SUPERNODE_SYNC) viper.BindEnv("watcher.sync", SUPERNODE_SYNC)
viper.BindEnv("superNode.workers", SUPERNODE_WORKERS) viper.BindEnv("watcher.workers", SUPERNODE_WORKERS)
viper.BindEnv("ethereum.wsPath", shared.ETH_WS_PATH) viper.BindEnv("ethereum.wsPath", shared.ETH_WS_PATH)
viper.BindEnv("bitcoin.wsPath", shared.BTC_WS_PATH) viper.BindEnv("bitcoin.wsPath", shared.BTC_WS_PATH)
viper.BindEnv("superNode.server", SUPERNODE_SERVER) viper.BindEnv("watcher.server", SUPERNODE_SERVER)
viper.BindEnv("superNode.wsPath", SUPERNODE_WS_PATH) viper.BindEnv("watcher.wsPath", SUPERNODE_WS_PATH)
viper.BindEnv("superNode.ipcPath", SUPERNODE_IPC_PATH) viper.BindEnv("watcher.ipcPath", SUPERNODE_IPC_PATH)
viper.BindEnv("superNode.httpPath", SUPERNODE_HTTP_PATH) viper.BindEnv("watcher.httpPath", SUPERNODE_HTTP_PATH)
viper.BindEnv("superNode.backFill", SUPERNODE_BACKFILL) viper.BindEnv("watcher.backFill", SUPERNODE_BACKFILL)
c.Historical = viper.GetBool("superNode.backFill") c.Historical = viper.GetBool("watcher.backFill")
chain := viper.GetString("superNode.chain") chain := viper.GetString("watcher.chain")
c.Chain, err = shared.NewChainType(chain) c.Chain, err = shared.NewChainType(chain)
if err != nil { if err != nil {
return nil, err return nil, err
@ -109,9 +109,9 @@ func NewConfig() (*Config, error) {
c.DBConfig.Init() c.DBConfig.Init()
c.Sync = viper.GetBool("superNode.sync") c.Sync = viper.GetBool("watcher.sync")
if c.Sync { if c.Sync {
workers := viper.GetInt("superNode.workers") workers := viper.GetInt("watcher.workers")
if workers < 1 { if workers < 1 {
workers = 1 workers = 1
} }
@ -132,14 +132,14 @@ func NewConfig() (*Config, error) {
c.SyncDBConn = &syncDB c.SyncDBConn = &syncDB
} }
c.Serve = viper.GetBool("superNode.server") c.Serve = viper.GetBool("watcher.server")
if c.Serve { if c.Serve {
wsPath := viper.GetString("superNode.wsPath") wsPath := viper.GetString("watcher.wsPath")
if wsPath == "" { if wsPath == "" {
wsPath = "127.0.0.1:8080" wsPath = "127.0.0.1:8080"
} }
c.WSEndpoint = wsPath c.WSEndpoint = wsPath
ipcPath := viper.GetString("superNode.ipcPath") ipcPath := viper.GetString("watcher.ipcPath")
if ipcPath == "" { if ipcPath == "" {
home, err := os.UserHomeDir() home, err := os.UserHomeDir()
if err != nil { if err != nil {
@ -148,7 +148,7 @@ func NewConfig() (*Config, error) {
ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc")
} }
c.IPCEndpoint = ipcPath c.IPCEndpoint = ipcPath
httpPath := viper.GetString("superNode.httpPath") httpPath := viper.GetString("watcher.httpPath")
if httpPath == "" { if httpPath == "" {
httpPath = "127.0.0.1:8081" httpPath = "127.0.0.1:8081"
} }