Merge pull request #7 from vulcanize/doc

Misc fixes
This commit is contained in:
Ian Norden 2020-08-07 23:29:10 -05:00 committed by GitHub
commit fb340aca18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 92 additions and 67 deletions

View File

@ -76,7 +76,7 @@ integrationtest: | $(GINKGO) $(LINT)
build:
go fmt ./...
go build
GO111MODULE=on go build
# Parameter checks
## Check that DB variables are provided

View File

@ -13,14 +13,27 @@
1. [License](#license)
## Background
ipfs-blockchain-watcher is a collection of interfaces that are used to extract, process, and store in Postgres-IPFS
all chain data. The raw data indexed by ipfs-blockchain-watcher serves as the basis for more specific watchers and applications.
ipfs-blockchain-watcher is a collection of interfaces that are used to extract, process, store, and index
all blockchain data in Postgres-IPFS. The raw data indexed by ipfs-blockchain-watcher serves as the basis for more specific watchers and applications.
Currently the service supports complete processing of all Bitcoin and Ethereum data.
## Architecture
More details on the design of ipfs-blockchain-watcher can be found in [here](./documentation/architecture.md)
## Dependencies
Minimal build dependencies
* Go (1.13)
* Git
* GCC compiler
* This repository
Potential external dependencies
* Goose
* Postgres
* Statediffing go-ethereum
* Bitcoin node
## Install
1. [Goose](#goose)
1. [Postgres](#postgres)
@ -75,7 +88,7 @@ Skip this step if you already have access to a node that displays the statediffi
Begin by downloading geth and switching to the statediffing branch:
`go get github.com/ethereum/go-ethereum`
`GO111MODULE=off go get -d github.com/ethereum/go-ethereum`
`cd $GOPATH/src/github.com/ethereum/go-ethereum`
@ -122,7 +135,7 @@ Finally, setup the watcher process itself.
Start by downloading ipfs-blockchain-watcher and moving into the repo:
`go get github.com/vulcanize/ipfs-blockchain-watcher`
`GO111MODULE=off go get -d github.com/vulcanize/ipfs-blockchain-watcher`
`cd $GOPATH/src/github.com/vulcanize/ipfs-blockchain-watcher`
@ -200,7 +213,7 @@ For Ethereum:
A number of different APIs for remote access to ipfs-blockchain-watcher data can be exposed, these are discussed in more detail [here](./documentation/apis.md)
### Testing
`make test` will run the unit tests
`make test` will run the unit tests
`make test` setups a clean `vulcanize_testing` db
## Contributing

View File

@ -39,7 +39,7 @@ var resyncCmd = &cobra.Command{
}
func rsyncCmdCommand() {
logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta)
logWithCommand.Infof("running ipfs-blockchain-watcher version: %s", v.VersionWithMeta)
logWithCommand.Debug("loading resync configuration variables")
rConfig, err := resync.NewConfig()
if err != nil {

View File

@ -55,7 +55,7 @@ and fill in gaps in the data
}
func watch() {
logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta)
logWithCommand.Infof("running ipfs-blockchain-watcher version: %s", v.VersionWithMeta)
var forwardPayloadChan chan shared.ConvertedData
wg := new(s.WaitGroup)

View File

@ -8,7 +8,7 @@ CREATE TABLE eth.state_cids (
state_path BYTEA,
node_type INTEGER,
diff BOOLEAN NOT NULL DEFAULT FALSE,
UNIQUE (header_id, state_path, diff)
UNIQUE (header_id, state_path)
);
-- +goose Down

View File

@ -8,7 +8,7 @@ CREATE TABLE eth.storage_cids (
storage_path BYTEA,
node_type INTEGER NOT NULL,
diff BOOLEAN NOT NULL DEFAULT FALSE,
UNIQUE (state_id, storage_path, diff)
UNIQUE (state_id, storage_path)
);
-- +goose Down

View File

@ -774,11 +774,11 @@ ALTER TABLE ONLY eth.state_accounts
--
-- Name: state_cids state_cids_header_id_state_path_diff_key; Type: CONSTRAINT; Schema: eth; Owner: -
-- Name: state_cids state_cids_header_id_state_path_key; Type: CONSTRAINT; Schema: eth; Owner: -
--
ALTER TABLE ONLY eth.state_cids
ADD CONSTRAINT state_cids_header_id_state_path_diff_key UNIQUE (header_id, state_path, diff);
ADD CONSTRAINT state_cids_header_id_state_path_key UNIQUE (header_id, state_path);
--
@ -798,11 +798,11 @@ ALTER TABLE ONLY eth.storage_cids
--
-- Name: storage_cids storage_cids_state_id_storage_path_diff_key; Type: CONSTRAINT; Schema: eth; Owner: -
-- Name: storage_cids storage_cids_state_id_storage_path_key; Type: CONSTRAINT; Schema: eth; Owner: -
--
ALTER TABLE ONLY eth.storage_cids
ADD CONSTRAINT storage_cids_state_id_storage_path_diff_key UNIQUE (state_id, storage_path, diff);
ADD CONSTRAINT storage_cids_state_id_storage_path_key UNIQUE (state_id, storage_path);
--

View File

@ -42,18 +42,16 @@ An example of how to subscribe to a real-time Ethereum data feed from ipfs-block
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/client"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/streamer"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
)
config, _ := eth.NewEthSubscriptionConfig()
rlpConfig, _ := rlp.EncodeToBytes(config)
vulcPath := viper.GetString("watcher.ethSubscription.path")
rawRPCClient, _ := rpc.Dial(vulcPath)
rpcClient := client.NewRPCClient(rawRPCClient, vulcPath)
stream := streamer.NewSuperNodeStreamer(rpcClient)
payloadChan := make(chan watcher.SubscriptionPayload, 20000)
subscription, _ := stream.Stream(payloadChan, rlpConfig)
rpcClient, _ := rpc.Dial(vulcPath)
subClient := client.NewClient(rpcClient)
payloadChan := make(chan watch.SubscriptionPayload, 20000)
subscription, _ := subClient.Stream(payloadChan, rlpConfig)
for {
select {
case payload := <- payloadChan:
@ -162,20 +160,18 @@ An example of how to subscribe to a real-time Bitcoin data feed from ipfs-blockc
"github.com/ethereum/go-ethereum/rpc"
"github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/libraries/shared/streamer"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/client"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/super_node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/super_node/btc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/client"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
)
config, _ := btc.NewBtcSubscriptionConfig()
rlpConfig, _ := rlp.EncodeToBytes(config)
vulcPath := viper.GetString("watcher.btcSubscription.path")
rawRPCClient, _ := rpc.Dial(vulcPath)
rpcClient := client.NewRPCClient(rawRPCClient, vulcPath)
stream := streamer.NewSuperNodeStreamer(rpcClient)
payloadChan := make(chan super_node.SubscriptionPayload, 20000)
subscription, _ := stream.Stream(payloadChan, rlpConfig)
rpcClient, _ := rpc.Dial(vulcPath)
subClient := client.NewClient(rpcClient)
payloadChan := make(chan watch.SubscriptionPayload, 20000)
subscription, _ := subClient.Stream(payloadChan, rlpConfig)
for {
select {
case payload := <- payloadChan:
@ -210,7 +206,7 @@ The .toml file being used to fill the Bitcoin subscription config would look som
These configuration parameters are broken down as follows:
`btcSubscription.wsPath` is used to define the SuperNode ws url OR ipc endpoint to subscribe to
`btcSubscription.wsPath` is used to define the ipfs-blockchain-watcher ws url OR ipc endpoint to subscribe to
`btcSubscription.historicalData` specifies whether or not ipfs-blockchain-watcher should look up historical data in its cache and
send that to the subscriber, if this is set to `false` then ipfs-blockchain-watcher only streams newly synced/incoming data

View File

@ -102,7 +102,7 @@ For Ethereum:
## Database
Currently, ipfs-blockchain-watcher persists all data to a single Postgres database. The migrations for this DB can be found [here](../../db/migrations).
Currently, ipfs-blockchain-watcher persists all data to a single Postgres database. The migrations for this DB can be found [here](../db/migrations).
Chain-specific data is populated under a chain-specific schema (e.g. `eth` and `btc`) while shared data- such as the IPFS blocks table- is populated under the `public` schema.
Subsequent watchers which act on the raw chain data should build and populate their own schemas or separate databases entirely.

19
main.go
View File

@ -1,9 +1,24 @@
// Copyright © 2020 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
import (
"github.com/vulcanize/ipfs-blockchain-watcher/cmd"
"github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/cmd"
)
func main() {

View File

@ -150,7 +150,7 @@ func (in *CIDIndexer) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload,
stateKey = stateCID.StateKey
}
err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path, diff) DO UPDATE SET (state_leaf_key, cid, node_type, mh_key) = ($2, $3, $5, $7)
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
RETURNING id`,
headerID, stateKey, stateCID.CID, stateCID.Path, stateCID.NodeType, true, stateCID.MhKey).Scan(&stateID)
if err != nil {
@ -181,7 +181,7 @@ func (in *CIDIndexer) indexStateCID(tx *sqlx.Tx, stateNode StateNodeModel, heade
stateKey = stateNode.StateKey
}
err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path, diff) DO UPDATE SET (state_leaf_key, cid, node_type, mh_key) = ($2, $3, $5, $7)
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
RETURNING id`,
headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID)
return stateID, err
@ -200,7 +200,7 @@ func (in *CIDIndexer) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeModel,
storageKey = storageCID.StorageKey
}
_, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (state_id, storage_path, diff) DO UPDATE SET (storage_leaf_key, cid, node_type, mh_key) = ($2, $3, $5, $7)`,
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`,
stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
return err
}

View File

@ -64,8 +64,8 @@ func NewConfig() (*Config, error) {
c := new(Config)
var err error
viper.BindEnv("superNode.chain", SUPERNODE_CHAIN)
chain := viper.GetString("superNode.chain")
viper.BindEnv("watcher.chain", SUPERNODE_CHAIN)
chain := viper.GetString("watcher.chain")
c.Chain, err = shared.NewChainType(chain)
if err != nil {
return nil, err
@ -96,13 +96,13 @@ func (c *Config) init() error {
viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH)
viper.BindEnv("bitcoin.httpPath", shared.BTC_HTTP_PATH)
viper.BindEnv("superNode.frequency", SUPERNODE_FREQUENCY)
viper.BindEnv("superNode.batchSize", SUPERNODE_BATCH_SIZE)
viper.BindEnv("superNode.batchNumber", SUPERNODE_BATCH_NUMBER)
viper.BindEnv("superNode.validationLevel", SUPERNODE_VALIDATION_LEVEL)
viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT)
viper.BindEnv("watcher.frequency", SUPERNODE_FREQUENCY)
viper.BindEnv("watcher.batchSize", SUPERNODE_BATCH_SIZE)
viper.BindEnv("watcher.batchNumber", SUPERNODE_BATCH_NUMBER)
viper.BindEnv("watcher.validationLevel", SUPERNODE_VALIDATION_LEVEL)
viper.BindEnv("watcher.timeout", shared.HTTP_TIMEOUT)
timeout := viper.GetInt("superNode.timeout")
timeout := viper.GetInt("watcher.timeout")
if timeout < 15 {
timeout = 15
}
@ -120,7 +120,7 @@ func (c *Config) init() error {
c.NodeInfo, c.HTTPClient = shared.GetBtcNodeAndClient(btcHTTP)
}
freq := viper.GetInt("superNode.frequency")
freq := viper.GetInt("watcher.frequency")
var frequency time.Duration
if freq <= 0 {
frequency = time.Second * 30
@ -128,9 +128,9 @@ func (c *Config) init() error {
frequency = time.Second * time.Duration(freq)
}
c.Frequency = frequency
c.BatchSize = uint64(viper.GetInt64("superNode.batchSize"))
c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber"))
c.ValidationLevel = viper.GetInt("superNode.validationLevel")
c.BatchSize = uint64(viper.GetInt64("watcher.batchSize"))
c.BatchNumber = uint64(viper.GetInt64("watcher.batchNumber"))
c.ValidationLevel = viper.GetInt("watcher.validationLevel")
dbConn := overrideDBConnConfig(c.DBConfig)
db := utils.LoadPostgres(dbConn, c.NodeInfo)

View File

@ -117,7 +117,7 @@ func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
for {
select {
case <-bfs.QuitChan:
log.Infof("quiting %s FillGapsInSuperNode process", bfs.chain.String())
log.Infof("quiting %s BackFill process", bfs.chain.String())
return
case <-ticker.C:
gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel)

1
pkg/validate/service.go Normal file
View File

@ -0,0 +1 @@
package validate

View File

@ -66,7 +66,7 @@ func (api *PublicWatcherAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc
}
params = &btcParams
default:
panic("SuperNode is not configured for a specific chain type")
panic("ipfs-blockchain-watcher is not configured for a specific chain type")
}
// ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
@ -136,7 +136,7 @@ func (iapi *InfoAPI) NodeInfo() *p2p.NodeInfo {
return &p2p.NodeInfo{
// TODO: formalize this
ID: "vulcanizeDB",
Name: "superNode",
Name: "ipfs-blockchain-watcher",
}
}

View File

@ -78,19 +78,19 @@ func NewConfig() (*Config, error) {
c := new(Config)
var err error
viper.BindEnv("superNode.chain", SUPERNODE_CHAIN)
viper.BindEnv("superNode.sync", SUPERNODE_SYNC)
viper.BindEnv("superNode.workers", SUPERNODE_WORKERS)
viper.BindEnv("watcher.chain", SUPERNODE_CHAIN)
viper.BindEnv("watcher.sync", SUPERNODE_SYNC)
viper.BindEnv("watcher.workers", SUPERNODE_WORKERS)
viper.BindEnv("ethereum.wsPath", shared.ETH_WS_PATH)
viper.BindEnv("bitcoin.wsPath", shared.BTC_WS_PATH)
viper.BindEnv("superNode.server", SUPERNODE_SERVER)
viper.BindEnv("superNode.wsPath", SUPERNODE_WS_PATH)
viper.BindEnv("superNode.ipcPath", SUPERNODE_IPC_PATH)
viper.BindEnv("superNode.httpPath", SUPERNODE_HTTP_PATH)
viper.BindEnv("superNode.backFill", SUPERNODE_BACKFILL)
viper.BindEnv("watcher.server", SUPERNODE_SERVER)
viper.BindEnv("watcher.wsPath", SUPERNODE_WS_PATH)
viper.BindEnv("watcher.ipcPath", SUPERNODE_IPC_PATH)
viper.BindEnv("watcher.httpPath", SUPERNODE_HTTP_PATH)
viper.BindEnv("watcher.backFill", SUPERNODE_BACKFILL)
c.Historical = viper.GetBool("superNode.backFill")
chain := viper.GetString("superNode.chain")
c.Historical = viper.GetBool("watcher.backFill")
chain := viper.GetString("watcher.chain")
c.Chain, err = shared.NewChainType(chain)
if err != nil {
return nil, err
@ -109,9 +109,9 @@ func NewConfig() (*Config, error) {
c.DBConfig.Init()
c.Sync = viper.GetBool("superNode.sync")
c.Sync = viper.GetBool("watcher.sync")
if c.Sync {
workers := viper.GetInt("superNode.workers")
workers := viper.GetInt("watcher.workers")
if workers < 1 {
workers = 1
}
@ -132,14 +132,14 @@ func NewConfig() (*Config, error) {
c.SyncDBConn = &syncDB
}
c.Serve = viper.GetBool("superNode.server")
c.Serve = viper.GetBool("watcher.server")
if c.Serve {
wsPath := viper.GetString("superNode.wsPath")
wsPath := viper.GetString("watcher.wsPath")
if wsPath == "" {
wsPath = "127.0.0.1:8080"
}
c.WSEndpoint = wsPath
ipcPath := viper.GetString("superNode.ipcPath")
ipcPath := viper.GetString("watcher.ipcPath")
if ipcPath == "" {
home, err := os.UserHomeDir()
if err != nil {
@ -148,7 +148,7 @@ func NewConfig() (*Config, error) {
ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc")
}
c.IPCEndpoint = ipcPath
httpPath := viper.GetString("superNode.httpPath")
httpPath := viper.GetString("watcher.httpPath")
if httpPath == "" {
httpPath = "127.0.0.1:8081"
}