diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go index 87b3fe78..4bece647 100644 --- a/cmd/streamSubscribe.go +++ b/cmd/streamSubscribe.go @@ -163,7 +163,6 @@ func streamSubscribe() { func subscriptionConfig() { log.Info("loading subscription config") - vulcPath = viper.GetString("subscription.path") subConfig = config.Subscription{ // Below default to false, which means we do not backfill by default BackFill: viper.GetBool("subscription.backfill"), @@ -215,6 +214,10 @@ func subscriptionConfig() { } func getRpcClient() core.RpcClient { + vulcPath := viper.GetString("subscription.path") + if vulcPath == "" { + vulcPath = "ws://127.0.0.1:2019" // default to and try the default ws url if no path is provided + } rawRpcClient, err := rpc.Dial(vulcPath) if err != nil { log.Fatal(err) diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go index 3b1d1f15..0ff96bc5 100644 --- a/cmd/syncAndPublish.go +++ b/cmd/syncAndPublish.go @@ -16,8 +16,12 @@ package cmd import ( + "os" + "path/filepath" syn "sync" + "github.com/spf13/viper" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -48,7 +52,6 @@ it maintains a local index of the IPLD objects' CIDs in Postgres.`, func init() { rootCmd.AddCommand(syncAndPublishCmd) - syncAndPublishCmd.Flags().StringVarP(&ipfsPath, "ipfs-path", "i", "~/.ipfs", "Path for configuring IPFS node") } func syncAndPublish() { @@ -56,6 +59,15 @@ func syncAndPublish() { db := utils.LoadPostgres(databaseConfig, blockChain.Node()) quitChan := make(chan bool) + + ipfsPath := viper.GetString("client.ipfsPath") + if ipfsPath == "" { + home, err := os.UserHomeDir() + if err != nil { + log.Fatal(err) + } + ipfsPath = filepath.Join(home, ".ipfs") + } processor, err := ipfs.NewIPFSProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan) if err != nil { log.Fatal(err) diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go index 91648745..6da80263 100644 --- a/cmd/syncPublishScreenAndServe.go +++ b/cmd/syncPublishScreenAndServe.go @@ -45,8 +45,6 @@ relays relevant data to requesting clients.`, func init() { rootCmd.AddCommand(syncPublishScreenAndServeCmd) - syncPublishScreenAndServeCmd.Flags().StringVarP(&ipfsPath, "ipfs-path", "i", "~/.ipfs", "Path for configuring IPFS node") - syncPublishScreenAndServeCmd.Flags().StringVarP(&vulcPath, "sub-path", "p", "~/.vulcanize/vulcanize.ipc", "IPC path for the Vulcanize seed node server") } func syncPublishScreenAndServe() { @@ -55,6 +53,15 @@ func syncPublishScreenAndServe() { db := utils.LoadPostgres(databaseConfig, blockChain.Node()) quitChan := make(chan bool, 1) + + ipfsPath := viper.GetString("client.ipfsPath") + if ipfsPath == "" { + home, err := os.UserHomeDir() + if err != nil { + log.Fatal(err) + } + ipfsPath = filepath.Join(home, ".ipfs") + } processor, err := ipfs.NewIPFSProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan) if err != nil { log.Fatal(err) @@ -78,7 +85,7 @@ func syncPublishScreenAndServe() { } ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") } - _, _, err = rpc.StartIPCEndpoint(vulcPath, processor.APIs()) + _, _, err = rpc.StartIPCEndpoint(ipcPath, processor.APIs()) if err != nil { log.Fatal(err) } diff --git a/documentation/seed-node.md b/documentation/seed-node.md new file mode 100644 index 00000000..84aab527 --- /dev/null +++ b/documentation/seed-node.md @@ -0,0 +1,303 @@ +# Seed node commands +Another way that Vulcanizedb can serve as a caching layer for Ethereum is through the use of the `syncAndPublish` and +`syncPublishScreenAndServe` commands. + +## Setup + +These commands work in conjunction with a [state-diffing full Geth node](https://github.com/vulcanize/go-ethereum/tree/rpc_statediffing) +and IPFS. + +### IPFS +To start, download and install [IPFS](https://github.com/vulcanize/go-ipfs) + +`go get github.com/ipfs/go-ipfs` + +`cd $GOPATH/src/github.com/ipfs/go-ipfs` + +`make install` + +If we want to use Postgres as our backing datastore, the setup is currently considerably more complicated because the Postgres support +exists on a fork. + +Begin by downloading and installing the normal IPFS as shown above. +Once that is done we need to initialize and then startup an IPFS daemon, +due to the employment of `gx` we need to first have a daemon running in order to publish the hashes that are needed to update it to work with Postgres. + +`ipfs init` + +`ipfs daemon` + +Now we can go about updating our ipfs dependencies. Start by switching to the Postgres supporting fork: + +`git remote add vulcanize https://github.com/vulcanize/go-ipfs.git` + +`git fetch vulcanize` + +`git checkout -b postgres vulcanize/postgres` + +Switch it's gx dep to use the fork of go-ipfs-config which supports Postgres. +This go-ipfs-config fork is approved but awaiting merger. It will be gx-ed when it is merged but not before, +so for now we need to do it ourselves locally: + +`go get github.com/ipfs/go-ipfs-config` + +`cd $GOPATH/src/github.com/ipfs/go-ipfs-config` + +`git remote add vulcanize https://github.com/vulcanize/go-ipfs-config.git` + +`git fetch vulcanize` + +`gx release patch` + +This outputs a hash, let's call it "go-ipfs-config-hash", this hash now needs to be gx imported into go-ipfs: + +`cd $GOPATH/src/github.com/ipfs/go-ipfs` + +`gx update go-ipfs-config-hash` + +This should update the go-ipfs-config dependency, it should also notify that the iptb-plugins has a different, +conflicting, go-ipfs-config dependency- so we need to patch a fix for that too: + +`go get github.com/ipfs/iptb-plugins` + +`cd $GOPATH/src/github.com/ipfs/iptb-plugins` + +`gx update go-ipfs-config-hash` + +`gx release patch` + +This outputs a hash, let's call it "iptb-plugins-hash", this hash now needs to be gx imported into go-ipfs too: + +`cd $GOPATH/src/github.com/ipfs/go-ipfs` + +`gx update iptb-plugins-hash` + +And now we should have resolved all of the `gx` dependency issues. +We can close the ipfs daemon at this point. + +Before installing this updated version of ipfs, we first need to edit the `GOPATH/src/github.com/ipfs/go-ipfs/plugin/loader/preload_list` so that +the postgresds plugin is not commented out on the bottom line. + +After that we need to delete the old, non-postgres, profile we initialized for the ipfs daemon. + +`rm ~/.ipfs/config` + +And get rid of the old executable + +`rm $GOPATH/bin/ipfs` + +And now we should be ready to install PG-IPFS. + +`make install` + +And this time we initialize with the `postgresds` profile. +We also need to provide env variables for the postgres connection: + +We can either set these manually, e.g. +```bash +export IPFS_PGHOST= +export IPFS_PGUSER= +export IPFS_PGDATABASE= +export IPFS_PGPORT= +export IPFS_PGPASSWORD= +``` + +And then run the ipfs command + +`ipfs init --profile=postgresds` + +Or we can use the pre-made script at `GOPATH/src/github.com/ipfs/go-ipfs/misc/utility/ipfs_postgres.sh` +which has usage: + +`./ipfs_postgres.sh ` + +and will ask us to enter the password, avoiding storing it to an ENV variable. + +Once we have initialized IPFS, that is all we need to do with it- we do not need to run a daemon during the subsequent processes. + +### Geth +For Geth, we currently *require* a special fork but the setup is considerably more straight forward than the forked ipfs setup: + +Begin by downloading geth and switching to the vulcanize/rpc_statediffing branch + +`go get github.com/ethereum/go-ethereum` + +`cd $GOPATH/src/github.com/ethereum/go-ethereum` + +`git remote add vulcanize https://github.com/vulcanize/go-ethereum.git` + +`git fetch vulcanize` + +`git checkout -b rpc_statediffing vulcanize/rpc_statediffing` + +Now, install this fork of geth (make sure any old versions have been uninstalled/binaries removed first) + +`make geth` + +And run the output binary with statediffing turned on: + +`cd $GOPATH/src/github.com/ethereum/go-ethereum/build/bin` + +`./geth --statediff --statediff.streamblock --ws --syncmode=full` + +Note: other CLI options- statediff specific ones included- can be explored with `./geth help` + +The output from geth should mention that it is `Starting statediff service` and block synchronization should begin shortly thereafter. +Note that until it receives a subscriber, the statediffing process does essentially nothing. Once a subscription is received, this +will be indicated in the output. + +Also in the output will be the websocket url and ipc paths that we will use to subscribe to the statediffing process. +The default ws url is "ws://127.0.0.1:8546" and the default ipcPath- on Darwin systems only- is "Users/user/Library/Ethereum/geth.ipc" + +### Vulcanizedb + +There are two commands to choose from: + +#### syncAndPublish + +`syncAndPublih` performs the functions of the seed node- syncing data from Geth, converting them to IPLDs, +publishing those IPLDs to IPFS, and creating a local Postgres index to relate their CIDS to useful metadata. + +Usage: + +`./vulcanizedb syncAndPublish --config=` + +The config file for the `syncAndPublish` command looks very similar to the basic config file +```toml +[database] + name = "vulcanize_demo" + hostname = "localhost" + port = 5432 + +[client] + ipcPath = "ws://127.0.0.1:8546" + ipfsPath = "/Users/user/.ipfs" +``` + +With an additional field, `client.ipcPath`, that is either the ws url or the ipc path that Geth has exposed (the url and path output +when the geth sync was started), and `client.ipfsPath` which is the path the ipfs datastore directory. + +#### syncPublishScreenAndServe + +`syncPublishScreenAndServe` does everythin th at `syncAndPublish` does, plut it opens up an RPC server which exposes +an endpoint to allow transformers to subscribe to subsets of the sync-and-published data that are relevant to thier transformations + +Usage: + +`./vulcanizedb syncPublishScreenAndServe --config=` + +The config file for the `syncPublishScreenAndServe` command has two additional fields and looks like: + +```toml +[database] + name = "vulcanize_demo" + hostname = "localhost" + port = 5432 + +[client] + ipcPath = "ws://127.0.0.1:8546" + ipfsPath = "/Users/user/.ipfs" + +[server] + ipcPath = "/Users/user/.vulcanize/vulcanize.ipc" + wsEndpoint = "127.0.0.1:2019" +``` + +The additional `server.ipcPath` and `server.wsEndpoint` fields are used to set what ipc endpoint and ws url +the `syncPublishScreenAndServe` rpc server will expose itself to subscribing transformers over, respectively. + +#### Subscribing + +A transformer can subscribe to the `syncPublishScreenAndServe` service over its ipc or ws endpoints, when subscribing the transformer +specifies which subsets of the synced data it is interested in and the server will forward only these data. + +The `streamSubscribe` command serves as a simple demonstration/example of subscribing to the seed-node feed, it subscribes with a set of parameters +defined in the loaded config file, and prints the streamed data to stdout. To build transformers that subscribe to and use seed-node data, +the shared/libraries/streamer can be used. + +Usage: + +`./vulcanizedb streamSubscribe --config=` + +The config for `streamSubscribe` has the `subscribe` set of parameters, for example: + +```toml +[subscription] + path = "ws://127.0.0.1:2019" + backfill = true + backfillOnly = false + startingBlock = 0 + endingBlock = 0 + [subscription.headerFilter] + off = false + finalOnly = true + [subscription.trxFilter] + off = false + src = [ + "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", + ] + dst = [ + "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", + ] + [subscription.receiptFilter] + off = false + topic0s = [ + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377" + ] + [subscription.stateFilter] + off = false + addresses = [ + "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe" + ] + intermediateNodes = false + [subscription.storageFilter] + off = true + addresses = [ + "", + "" + ] + storageKeys = [ + "", + "" + ] + intermediateNodes = false +``` + +`subscription.path` is used to define the ws url OR ipc endpoint we will subscribe to the seed-node over +(the `server.ipcPath` or `server.wsEndpoint` that the seed-node has defined in their config file). + +`subscription.backfill` specifies whether or not the seed-node should look up historical data in its cache and +send that to the subscriber, if this is set to `false` then the seed-node only forwards newly synced/incoming data. + +`subscription.backfillOnly` will tell the seed-node to only send historical data and not stream incoming data going forward. + +`subscription.startingBlock` is the starting block number for the range we want to receive data in. + +`subscription.endingBlock` is the ending block number for the range we want to receive data in; +setting to 0 means there is no end/we will continue indefinitely. + +`subscription.headerFilter` has two sub-options: `off` and `finalOnly`. Setting `off` to true tells the seed-node to +not send any headers to the subscriber; setting `finalOnly` to true tells the seed-node to send only canonical headers. + +`subscription.trxFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the seed-node to +not send any transactions to the subscriber; `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for, +if they have any addresses then the seed-node will only send transactions that were sent or received by the addresses contained +in `src` and `dst`, respectively. + +`subscription.receiptFilter` has two sub-options: `off` and `topics`. Setting `off` to true tells the seed-node to +not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for, +if it has any topics then the seed-node will only send receipts that contain logs which have that topic0. + +`subscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the seed-node to +not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for, +if it has any addresses then the seed-node will only send state leafs (accounts) corresponding to those account addresses. By default the seed-node +only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. + +`subscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. Setting `off` to true tells the seed-node to +not send any storage data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter storage for, +if it has any addresses then the seed-node will only send storage nodes from the storage tries at those state addresses. `storageKeys` is another string +array that can be filled with storage keys we want to filter storage data for. It is important to note that the storageKeys are the actual keccak256 hashes, whereas +the addresses in the `addresses` fields are the ETH addresses and not their keccak256 hashes that serve as the actual state keys. By default the seed-node +only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. + diff --git a/environments/syncPublishScreenAndServe.toml b/environments/syncPublishScreenAndServe.toml index d3e2159d..5d234e40 100644 --- a/environments/syncPublishScreenAndServe.toml +++ b/environments/syncPublishScreenAndServe.toml @@ -5,6 +5,7 @@ [client] ipcPath = "ws://127.0.0.1:8546" + ipfsPath = "/Users/iannorden/.ipfs" [server] ipcPath = "/Users/iannorden/.vulcanize/vulcanize.ipc" diff --git a/pkg/ipfs/publisher.go b/pkg/ipfs/publisher.go index 8cef5a18..7ec63828 100644 --- a/pkg/ipfs/publisher.go +++ b/pkg/ipfs/publisher.go @@ -50,7 +50,7 @@ type Publisher struct { // NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface func NewIPLDPublisher(ipfsPath string) (*Publisher, error) { - l, err := loader.NewPluginLoader("~/.ipfs/plugins") + l, err := loader.NewPluginLoader("") if err != nil { return nil, err }