diff --git a/Makefile b/Makefile index 9848db9b..7b1a680c 100644 --- a/Makefile +++ b/Makefile @@ -76,7 +76,7 @@ integrationtest: | $(GINKGO) $(LINT) build: go fmt ./... - go build + GO111MODULE=on go build # Parameter checks ## Check that DB variables are provided diff --git a/README.md b/README.md index 2748c8e6..4b8c3067 100644 --- a/README.md +++ b/README.md @@ -13,14 +13,27 @@ 1. [License](#license) ## Background -ipfs-blockchain-watcher is a collection of interfaces that are used to extract, process, and store in Postgres-IPFS -all chain data. The raw data indexed by ipfs-blockchain-watcher serves as the basis for more specific watchers and applications. +ipfs-blockchain-watcher is a collection of interfaces that are used to extract, process, store, and index +all blockchain data in Postgres-IPFS. The raw data indexed by ipfs-blockchain-watcher serves as the basis for more specific watchers and applications. Currently the service supports complete processing of all Bitcoin and Ethereum data. ## Architecture More details on the design of ipfs-blockchain-watcher can be found in [here](./documentation/architecture.md) +## Dependencies +Minimal build dependencies +* Go (1.13) +* Git +* GCC compiler +* This repository + +Potential external dependencies +* Goose +* Postgres +* Statediffing go-ethereum +* Bitcoin node + ## Install 1. [Goose](#goose) 1. [Postgres](#postgres) @@ -75,7 +88,7 @@ Skip this step if you already have access to a node that displays the statediffi Begin by downloading geth and switching to the statediffing branch: -`go get github.com/ethereum/go-ethereum` +`GO111MODULE=off go get -d github.com/ethereum/go-ethereum` `cd $GOPATH/src/github.com/ethereum/go-ethereum` @@ -122,7 +135,7 @@ Finally, setup the watcher process itself. Start by downloading ipfs-blockchain-watcher and moving into the repo: -`go get github.com/vulcanize/ipfs-blockchain-watcher` +`GO111MODULE=off go get -d github.com/vulcanize/ipfs-blockchain-watcher` `cd $GOPATH/src/github.com/vulcanize/ipfs-blockchain-watcher` @@ -200,7 +213,7 @@ For Ethereum: A number of different APIs for remote access to ipfs-blockchain-watcher data can be exposed, these are discussed in more detail [here](./documentation/apis.md) ### Testing -`make test` will run the unit tests +`make test` will run the unit tests `make test` setups a clean `vulcanize_testing` db ## Contributing diff --git a/cmd/resync.go b/cmd/resync.go index 3f0df616..9ffde23e 100644 --- a/cmd/resync.go +++ b/cmd/resync.go @@ -39,7 +39,7 @@ var resyncCmd = &cobra.Command{ } func rsyncCmdCommand() { - logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta) + logWithCommand.Infof("running ipfs-blockchain-watcher version: %s", v.VersionWithMeta) logWithCommand.Debug("loading resync configuration variables") rConfig, err := resync.NewConfig() if err != nil { diff --git a/cmd/watch.go b/cmd/watch.go index b6e29e03..de4acc92 100644 --- a/cmd/watch.go +++ b/cmd/watch.go @@ -55,7 +55,7 @@ and fill in gaps in the data } func watch() { - logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta) + logWithCommand.Infof("running ipfs-blockchain-watcher version: %s", v.VersionWithMeta) var forwardPayloadChan chan shared.ConvertedData wg := new(s.WaitGroup) diff --git a/db/migrations/00008_create_eth_state_cids_table.sql b/db/migrations/00008_create_eth_state_cids_table.sql index 4bfa8228..e0bf6e57 100644 --- a/db/migrations/00008_create_eth_state_cids_table.sql +++ b/db/migrations/00008_create_eth_state_cids_table.sql @@ -8,7 +8,7 @@ CREATE TABLE eth.state_cids ( state_path BYTEA, node_type INTEGER, diff BOOLEAN NOT NULL DEFAULT FALSE, - UNIQUE (header_id, state_path, diff) + UNIQUE (header_id, state_path) ); -- +goose Down diff --git a/db/migrations/00009_create_eth_storage_cids_table.sql b/db/migrations/00009_create_eth_storage_cids_table.sql index f19bc62e..944d39ed 100644 --- a/db/migrations/00009_create_eth_storage_cids_table.sql +++ b/db/migrations/00009_create_eth_storage_cids_table.sql @@ -8,7 +8,7 @@ CREATE TABLE eth.storage_cids ( storage_path BYTEA, node_type INTEGER NOT NULL, diff BOOLEAN NOT NULL DEFAULT FALSE, - UNIQUE (state_id, storage_path, diff) + UNIQUE (state_id, storage_path) ); -- +goose Down diff --git a/db/schema.sql b/db/schema.sql index a3b55a44..caa1bb67 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -774,11 +774,11 @@ ALTER TABLE ONLY eth.state_accounts -- --- Name: state_cids state_cids_header_id_state_path_diff_key; Type: CONSTRAINT; Schema: eth; Owner: - +-- Name: state_cids state_cids_header_id_state_path_key; Type: CONSTRAINT; Schema: eth; Owner: - -- ALTER TABLE ONLY eth.state_cids - ADD CONSTRAINT state_cids_header_id_state_path_diff_key UNIQUE (header_id, state_path, diff); + ADD CONSTRAINT state_cids_header_id_state_path_key UNIQUE (header_id, state_path); -- @@ -798,11 +798,11 @@ ALTER TABLE ONLY eth.storage_cids -- --- Name: storage_cids storage_cids_state_id_storage_path_diff_key; Type: CONSTRAINT; Schema: eth; Owner: - +-- Name: storage_cids storage_cids_state_id_storage_path_key; Type: CONSTRAINT; Schema: eth; Owner: - -- ALTER TABLE ONLY eth.storage_cids - ADD CONSTRAINT storage_cids_state_id_storage_path_diff_key UNIQUE (state_id, storage_path, diff); + ADD CONSTRAINT storage_cids_state_id_storage_path_key UNIQUE (state_id, storage_path); -- diff --git a/documentation/apis.md b/documentation/apis.md index 81184b89..6b03ac82 100644 --- a/documentation/apis.md +++ b/documentation/apis.md @@ -42,18 +42,16 @@ An example of how to subscribe to a real-time Ethereum data feed from ipfs-block "github.com/vulcanize/ipfs-blockchain-watcher/pkg/client" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/streamer" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch" ) config, _ := eth.NewEthSubscriptionConfig() rlpConfig, _ := rlp.EncodeToBytes(config) vulcPath := viper.GetString("watcher.ethSubscription.path") - rawRPCClient, _ := rpc.Dial(vulcPath) - rpcClient := client.NewRPCClient(rawRPCClient, vulcPath) - stream := streamer.NewSuperNodeStreamer(rpcClient) - payloadChan := make(chan watcher.SubscriptionPayload, 20000) - subscription, _ := stream.Stream(payloadChan, rlpConfig) + rpcClient, _ := rpc.Dial(vulcPath) + subClient := client.NewClient(rpcClient) + payloadChan := make(chan watch.SubscriptionPayload, 20000) + subscription, _ := subClient.Stream(payloadChan, rlpConfig) for { select { case payload := <- payloadChan: @@ -162,20 +160,18 @@ An example of how to subscribe to a real-time Bitcoin data feed from ipfs-blockc "github.com/ethereum/go-ethereum/rpc" "github.com/spf13/viper" - "github.com/vulcanize/ipfs-blockchain-watcher/libraries/shared/streamer" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/client" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/super_node" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/super_node/btc" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/client" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch" ) config, _ := btc.NewBtcSubscriptionConfig() rlpConfig, _ := rlp.EncodeToBytes(config) vulcPath := viper.GetString("watcher.btcSubscription.path") - rawRPCClient, _ := rpc.Dial(vulcPath) - rpcClient := client.NewRPCClient(rawRPCClient, vulcPath) - stream := streamer.NewSuperNodeStreamer(rpcClient) - payloadChan := make(chan super_node.SubscriptionPayload, 20000) - subscription, _ := stream.Stream(payloadChan, rlpConfig) + rpcClient, _ := rpc.Dial(vulcPath) + subClient := client.NewClient(rpcClient) + payloadChan := make(chan watch.SubscriptionPayload, 20000) + subscription, _ := subClient.Stream(payloadChan, rlpConfig) for { select { case payload := <- payloadChan: @@ -210,7 +206,7 @@ The .toml file being used to fill the Bitcoin subscription config would look som These configuration parameters are broken down as follows: -`btcSubscription.wsPath` is used to define the SuperNode ws url OR ipc endpoint to subscribe to +`btcSubscription.wsPath` is used to define the ipfs-blockchain-watcher ws url OR ipc endpoint to subscribe to `btcSubscription.historicalData` specifies whether or not ipfs-blockchain-watcher should look up historical data in its cache and send that to the subscriber, if this is set to `false` then ipfs-blockchain-watcher only streams newly synced/incoming data diff --git a/documentation/architecture.md b/documentation/architecture.md index f312bb0d..8b617c10 100644 --- a/documentation/architecture.md +++ b/documentation/architecture.md @@ -102,7 +102,7 @@ For Ethereum: ## Database -Currently, ipfs-blockchain-watcher persists all data to a single Postgres database. The migrations for this DB can be found [here](../../db/migrations). +Currently, ipfs-blockchain-watcher persists all data to a single Postgres database. The migrations for this DB can be found [here](../db/migrations). Chain-specific data is populated under a chain-specific schema (e.g. `eth` and `btc`) while shared data- such as the IPFS blocks table- is populated under the `public` schema. Subsequent watchers which act on the raw chain data should build and populate their own schemas or separate databases entirely. diff --git a/main.go b/main.go index 160912b8..af533f77 100644 --- a/main.go +++ b/main.go @@ -1,9 +1,24 @@ +// Copyright © 2020 Vulcanize, Inc +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package main import ( - "github.com/vulcanize/ipfs-blockchain-watcher/cmd" - "github.com/sirupsen/logrus" + + "github.com/vulcanize/ipfs-blockchain-watcher/cmd" ) func main() { diff --git a/pkg/eth/indexer.go b/pkg/eth/indexer.go index 4285c9fa..ff57d2d4 100644 --- a/pkg/eth/indexer.go +++ b/pkg/eth/indexer.go @@ -150,7 +150,7 @@ func (in *CIDIndexer) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload, stateKey = stateCID.StateKey } err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (header_id, state_path, diff) DO UPDATE SET (state_leaf_key, cid, node_type, mh_key) = ($2, $3, $5, $7) + ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) RETURNING id`, headerID, stateKey, stateCID.CID, stateCID.Path, stateCID.NodeType, true, stateCID.MhKey).Scan(&stateID) if err != nil { @@ -181,7 +181,7 @@ func (in *CIDIndexer) indexStateCID(tx *sqlx.Tx, stateNode StateNodeModel, heade stateKey = stateNode.StateKey } err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (header_id, state_path, diff) DO UPDATE SET (state_leaf_key, cid, node_type, mh_key) = ($2, $3, $5, $7) + ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) RETURNING id`, headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID) return stateID, err @@ -200,7 +200,7 @@ func (in *CIDIndexer) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeModel, storageKey = storageCID.StorageKey } _, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (state_id, storage_path, diff) DO UPDATE SET (storage_leaf_key, cid, node_type, mh_key) = ($2, $3, $5, $7)`, + ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`, stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) return err } diff --git a/pkg/historical/config.go b/pkg/historical/config.go index f102da2b..bdb88481 100644 --- a/pkg/historical/config.go +++ b/pkg/historical/config.go @@ -64,8 +64,8 @@ func NewConfig() (*Config, error) { c := new(Config) var err error - viper.BindEnv("superNode.chain", SUPERNODE_CHAIN) - chain := viper.GetString("superNode.chain") + viper.BindEnv("watcher.chain", SUPERNODE_CHAIN) + chain := viper.GetString("watcher.chain") c.Chain, err = shared.NewChainType(chain) if err != nil { return nil, err @@ -96,13 +96,13 @@ func (c *Config) init() error { viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH) viper.BindEnv("bitcoin.httpPath", shared.BTC_HTTP_PATH) - viper.BindEnv("superNode.frequency", SUPERNODE_FREQUENCY) - viper.BindEnv("superNode.batchSize", SUPERNODE_BATCH_SIZE) - viper.BindEnv("superNode.batchNumber", SUPERNODE_BATCH_NUMBER) - viper.BindEnv("superNode.validationLevel", SUPERNODE_VALIDATION_LEVEL) - viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT) + viper.BindEnv("watcher.frequency", SUPERNODE_FREQUENCY) + viper.BindEnv("watcher.batchSize", SUPERNODE_BATCH_SIZE) + viper.BindEnv("watcher.batchNumber", SUPERNODE_BATCH_NUMBER) + viper.BindEnv("watcher.validationLevel", SUPERNODE_VALIDATION_LEVEL) + viper.BindEnv("watcher.timeout", shared.HTTP_TIMEOUT) - timeout := viper.GetInt("superNode.timeout") + timeout := viper.GetInt("watcher.timeout") if timeout < 15 { timeout = 15 } @@ -120,7 +120,7 @@ func (c *Config) init() error { c.NodeInfo, c.HTTPClient = shared.GetBtcNodeAndClient(btcHTTP) } - freq := viper.GetInt("superNode.frequency") + freq := viper.GetInt("watcher.frequency") var frequency time.Duration if freq <= 0 { frequency = time.Second * 30 @@ -128,9 +128,9 @@ func (c *Config) init() error { frequency = time.Second * time.Duration(freq) } c.Frequency = frequency - c.BatchSize = uint64(viper.GetInt64("superNode.batchSize")) - c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber")) - c.ValidationLevel = viper.GetInt("superNode.validationLevel") + c.BatchSize = uint64(viper.GetInt64("watcher.batchSize")) + c.BatchNumber = uint64(viper.GetInt64("watcher.batchNumber")) + c.ValidationLevel = viper.GetInt("watcher.validationLevel") dbConn := overrideDBConnConfig(c.DBConfig) db := utils.LoadPostgres(dbConn, c.NodeInfo) diff --git a/pkg/historical/service.go b/pkg/historical/service.go index 81494d5d..f942c1dc 100644 --- a/pkg/historical/service.go +++ b/pkg/historical/service.go @@ -117,7 +117,7 @@ func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) { for { select { case <-bfs.QuitChan: - log.Infof("quiting %s FillGapsInSuperNode process", bfs.chain.String()) + log.Infof("quiting %s BackFill process", bfs.chain.String()) return case <-ticker.C: gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel) diff --git a/pkg/validate/service.go b/pkg/validate/service.go new file mode 100644 index 00000000..77fb2737 --- /dev/null +++ b/pkg/validate/service.go @@ -0,0 +1 @@ +package validate diff --git a/pkg/watch/api.go b/pkg/watch/api.go index 44d23ef4..6154cfe3 100644 --- a/pkg/watch/api.go +++ b/pkg/watch/api.go @@ -66,7 +66,7 @@ func (api *PublicWatcherAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc } params = &btcParams default: - panic("SuperNode is not configured for a specific chain type") + panic("ipfs-blockchain-watcher is not configured for a specific chain type") } // ensure that the RPC connection supports subscriptions notifier, supported := rpc.NotifierFromContext(ctx) @@ -136,7 +136,7 @@ func (iapi *InfoAPI) NodeInfo() *p2p.NodeInfo { return &p2p.NodeInfo{ // TODO: formalize this ID: "vulcanizeDB", - Name: "superNode", + Name: "ipfs-blockchain-watcher", } } diff --git a/pkg/watch/config.go b/pkg/watch/config.go index aeb7df85..267c3887 100644 --- a/pkg/watch/config.go +++ b/pkg/watch/config.go @@ -78,19 +78,19 @@ func NewConfig() (*Config, error) { c := new(Config) var err error - viper.BindEnv("superNode.chain", SUPERNODE_CHAIN) - viper.BindEnv("superNode.sync", SUPERNODE_SYNC) - viper.BindEnv("superNode.workers", SUPERNODE_WORKERS) + viper.BindEnv("watcher.chain", SUPERNODE_CHAIN) + viper.BindEnv("watcher.sync", SUPERNODE_SYNC) + viper.BindEnv("watcher.workers", SUPERNODE_WORKERS) viper.BindEnv("ethereum.wsPath", shared.ETH_WS_PATH) viper.BindEnv("bitcoin.wsPath", shared.BTC_WS_PATH) - viper.BindEnv("superNode.server", SUPERNODE_SERVER) - viper.BindEnv("superNode.wsPath", SUPERNODE_WS_PATH) - viper.BindEnv("superNode.ipcPath", SUPERNODE_IPC_PATH) - viper.BindEnv("superNode.httpPath", SUPERNODE_HTTP_PATH) - viper.BindEnv("superNode.backFill", SUPERNODE_BACKFILL) + viper.BindEnv("watcher.server", SUPERNODE_SERVER) + viper.BindEnv("watcher.wsPath", SUPERNODE_WS_PATH) + viper.BindEnv("watcher.ipcPath", SUPERNODE_IPC_PATH) + viper.BindEnv("watcher.httpPath", SUPERNODE_HTTP_PATH) + viper.BindEnv("watcher.backFill", SUPERNODE_BACKFILL) - c.Historical = viper.GetBool("superNode.backFill") - chain := viper.GetString("superNode.chain") + c.Historical = viper.GetBool("watcher.backFill") + chain := viper.GetString("watcher.chain") c.Chain, err = shared.NewChainType(chain) if err != nil { return nil, err @@ -109,9 +109,9 @@ func NewConfig() (*Config, error) { c.DBConfig.Init() - c.Sync = viper.GetBool("superNode.sync") + c.Sync = viper.GetBool("watcher.sync") if c.Sync { - workers := viper.GetInt("superNode.workers") + workers := viper.GetInt("watcher.workers") if workers < 1 { workers = 1 } @@ -132,14 +132,14 @@ func NewConfig() (*Config, error) { c.SyncDBConn = &syncDB } - c.Serve = viper.GetBool("superNode.server") + c.Serve = viper.GetBool("watcher.server") if c.Serve { - wsPath := viper.GetString("superNode.wsPath") + wsPath := viper.GetString("watcher.wsPath") if wsPath == "" { wsPath = "127.0.0.1:8080" } c.WSEndpoint = wsPath - ipcPath := viper.GetString("superNode.ipcPath") + ipcPath := viper.GetString("watcher.ipcPath") if ipcPath == "" { home, err := os.UserHomeDir() if err != nil { @@ -148,7 +148,7 @@ func NewConfig() (*Config, error) { ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") } c.IPCEndpoint = ipcPath - httpPath := viper.GetString("superNode.httpPath") + httpPath := viper.GetString("watcher.httpPath") if httpPath == "" { httpPath = "127.0.0.1:8081" }