diff --git a/cmd/streamSubscribe.go b/cmd/streamEthSubscribe.go similarity index 97% rename from cmd/streamSubscribe.go rename to cmd/streamEthSubscribe.go index 42d0c3f7..75eab893 100644 --- a/cmd/streamSubscribe.go +++ b/cmd/streamEthSubscribe.go @@ -55,7 +55,10 @@ func init() { func streamEthSubscription() { // Prep the subscription config/filters to be sent to the server - ethSubConfig := config.NewEthSubscriptionConfig() + ethSubConfig, err := config.NewEthSubscriptionConfig() + if err != nil { + log.Fatal(err) + } // Create a new rpc client and a subscription streamer with that client rpcClient := getRPCClient() @@ -170,7 +173,7 @@ func streamEthSubscription() { } func getRPCClient() core.RPCClient { - vulcPath := viper.GetString("superNode.ethSubscription.wsPath") + vulcPath := viper.GetString("superNode.ethSubscription.path") if vulcPath == "" { vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided } diff --git a/cmd/superNode.go b/cmd/superNode.go index 58eac22c..de999300 100644 --- a/cmd/superNode.go +++ b/cmd/superNode.go @@ -30,11 +30,17 @@ import ( var superNodeCmd = &cobra.Command{ Use: "superNode", Short: "VulcanizeDB SuperNode", - Long: `This command works alongside a modified geth node which streams -all block and state (diff) data over a websocket subscription. This process -then converts the eth data to IPLD objects and publishes them to IPFS. Additionally, -it maintains a local index of the IPLD objects' CIDs in Postgres. It then opens up a server which -relays relevant data to requesting clients.`, + Long: `This command configures a VulcanizeDB SuperNode. + +The Sync process streams all chain data from the appropriate chain, processes this data into IPLD objects +and publishes them to IPFS. It then indexes the CIDs against useful data fields/metadata in Postgres. + +The Serve process creates and exposes a rpc subscription server over ws and ipc. Transformers can subscribe to +these endpoints to stream + +The BackFill process spins up a background process which periodically probes the Postgres database to identify +and fill in gaps in the data +`, Run: func(cmd *cobra.Command, args []string) { subCommand = cmd.CalledAs() logWithCommand = *log.WithField("SubCommand", subCommand) @@ -68,7 +74,7 @@ func superNode() { } } if superNodeConfig.BackFill { - backFiller, err := super_node.NewBackFillService(superNodeConfig.BackFillSettings) + backFiller, err := super_node.NewBackFillService(superNodeConfig) if err != nil { logWithCommand.Fatal(err) } @@ -97,5 +103,6 @@ func startServers(superNode super_node.SuperNode, settings *config.SuperNode) er if err != nil { return err } - return nil + _, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, superNode.APIs(), []string{"eth"}, nil, nil, rpc.HTTPTimeouts{}) + return err } diff --git a/db/migrations/00034_create_receipt_cids_table.sql b/db/migrations/00034_create_receipt_cids_table.sql index f53ab730..4015e3b0 100644 --- a/db/migrations/00034_create_receipt_cids_table.sql +++ b/db/migrations/00034_create_receipt_cids_table.sql @@ -4,7 +4,10 @@ CREATE TABLE public.receipt_cids ( tx_id INTEGER NOT NULL REFERENCES transaction_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, cid TEXT NOT NULL, contract VARCHAR(66), - topic0s VARCHAR(66)[] + topic0s VARCHAR(66)[], + topic1s VARCHAR(66)[], + topic2s VARCHAR(66)[], + topic3s VARCHAR(66)[] ); -- +goose Down diff --git a/db/schema.sql b/db/schema.sql index 8371d69e..a23e5831 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -593,7 +593,10 @@ CREATE TABLE public.receipt_cids ( tx_id integer NOT NULL, cid text NOT NULL, contract character varying(66), - topic0s character varying(66)[] + topic0s character varying(66)[], + topic1s character varying(66)[], + topic2s character varying(66)[], + topic3s character varying(66)[] ); diff --git a/documentation/super_node/setup.md b/documentation/super_node/setup.md new file mode 100644 index 00000000..d3f20a28 --- /dev/null +++ b/documentation/super_node/setup.md @@ -0,0 +1,216 @@ +## Super Node Setup + +Vulcanizedb can act as an index for chain data stored on IPFS through the use of the `superNode` command. + +### Manual Setup + +These commands work in conjunction with a [state-diffing full Geth node](https://github.com/vulcanize/go-ethereum/tree/statediffing) +and IPFS. + +#### IPFS +To start, download and install [IPFS](https://github.com/vulcanize/go-ipfs) + +`go get github.com/ipfs/go-ipfs` + +`cd $GOPATH/src/github.com/ipfs/go-ipfs` + +`make install` + +If we want to use Postgres as our backing datastore, we need to use the vulcanize fork of go-ipfs. + +Start by adding the fork and switching over to it: + +`git remote add vulcanize https://github.com/vulcanize/go-ipfs.git` + +`git fetch vulcanize` + +`git checkout -b postgres_update vulcanize/postgres_update` + +Now install this fork of ipfs, first be sure to remove any previous installation. + +`make install` + +Check that is installed properly by running + +`ipfs` + +You should see the CLI info/help output. + +And now we initialize with the `postgresds` profile. +If ipfs was previously initialized we will need to remove the old profile first. +We also need to provide env variables for the postgres connection: + +We can either set these manually, e.g. +```bash +export IPFS_PGHOST= +export IPFS_PGUSER= +export IPFS_PGDATABASE= +export IPFS_PGPORT= +export IPFS_PGPASSWORD= +``` + +And then run the ipfs command + +`ipfs init --profile=postgresds` + +Or we can use the pre-made script at `GOPATH/src/github.com/ipfs/go-ipfs/misc/utility/ipfs_postgres.sh` +which has usage: + +`./ipfs_postgres.sh "` + +and will ask us to enter the password, avoiding storing it to an ENV variable. + +Once we have initialized ipfs, that is all we need to do with it- we do not need to run a daemon during the subsequent processes (in fact, we can't). + +#### Geth +For Geth, we currently *require* a special fork, and we can set this up as follows: + +Begin by downloading geth and switching to the vulcanize/rpc_statediffing branch + +`go get github.com/ethereum/go-ethereum` + +`cd $GOPATH/src/github.com/ethereum/go-ethereum` + +`git remote add vulcanize https://github.com/vulcanize/go-ethereum.git` + +`git fetch vulcanize` + +`git checkout -b statediffing vulcanize/statediff_at_anyblock-1.9.9` + +Now, install this fork of geth (make sure any old versions have been uninstalled/binaries removed first) + +`make geth` + +And run the output binary with statediffing turned on: + +`cd $GOPATH/src/github.com/ethereum/go-ethereum/build/bin` + +`./geth --statediff --statediff.streamblock --ws --syncmode=full` + +Note: other CLI options- statediff specific ones included- can be explored with `./geth help` + +The output from geth should mention that it is `Starting statediff service` and block synchronization should begin shortly thereafter. +Note that until it receives a subscriber, the statediffing process does essentially nothing. Once a subscription is received, this +will be indicated in the output. + +Also in the output will be the websocket url and ipc paths that we will use to subscribe to the statediffing process. +The default ws url is "ws://127.0.0.1:8546" and the default ipcPath- on Darwin systems only- is "Users/user/Library/Ethereum/geth.ipc" + +#### Vulcanizedb + +The `superNode` command is used to initialize and run an instance of the VulcanizeDB SuperNode + +Usage: + +`./vulcanizedb superNode --config=` + +```toml +[superNode] + chain = "ethereum" + ipfsPath = "/root/.ipfs" + + [superNode.database] + name = "vulcanize_public" + hostname = "localhost" + port = 5432 + user = "ec2-user" + + [superNode.sync] + on = true + wsPath = "ws://127.0.0.1:8546" + workers = 1 + + [superNode.server] + on = true + ipcPath = "/root/.vulcanize/vulcanize.ipc" + wsPath = "127.0.0.1:8080" + + [superNode.backFill] + on = false + httpPath = "" + frequency = 5 + batchSize = 50 +``` + +### Dockerfile Setup + +The below provides step-by-step directions for how to setup the super node using the provided Dockerfile on an AWS Linux AMI instance. +Note that the instance will need sufficient memory and storage for this to work. + +1. Install basic dependencies +``` +sudo yum update +sudo yum install -y curl gpg gcc gcc-c++ make git +``` + +2. Install Go 1.12 +``` +wget https://dl.google.com/go/go1.12.6.linux-amd64.tar.gz +tar -xzf go1.12.6.linux-amd64.tar.gz +sudo mv go /usr/local +``` + +3. Edit .bash_profile to export GOPATH +``` +export GOROOT=/usr/local/go +export GOPATH=$HOME/go +export PATH=$GOPATH/bin:$GOROOT/bin:$PATH +``` + +4. Install and setup Postgres +``` +sudo yum install postgresql postgresql96-server +sudo service postgresql96 initdb +sudo service postgresql96 start +sudo -u postgres createuser -s ec2-user +sudo -u postgres createdb ec2-user +sudo su postgres +psql +ALTER USER "ec2-user" WITH SUPERUSER; +/q +exit +``` + +4b. Edit hba_file to trust connections +``` +psql +SHOW hba_file; +/q +sudo vim {PATH_TO_FILE} +``` + +4c. Stop and restart Postgres server to affect changes +``` +sudo service postgresql96 stop +sudo service postgresql96 start +``` + +5. Install and start Docker (exit and re-enter ec2 instance afterwards to affect changes) +``` +sudo yum install -y docker +sudo service docker start +sudo usermod -aG docker ec2-user +``` + +6. Fetch the repository and switch to this working branch +``` +go get github.com/vulcanize/vulcanizedb +cd $GOPATH/src/github.com/vulcanize/vulcanizedb +git checkout ipfs_concurrency +``` + +7. Create the db +``` +createdb vulcanize_public +``` + +8. Build and run the Docker image +``` +cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node +docker build . +docker run --network host -e VDB_PG_CONNECT=postgres://localhost:5432/vulcanize_public?sslmode=disable {IMAGE_ID} +``` \ No newline at end of file diff --git a/documentation/super_node/subscription.md b/documentation/super_node/subscription.md new file mode 100644 index 00000000..09bd1ae0 --- /dev/null +++ b/documentation/super_node/subscription.md @@ -0,0 +1,96 @@ +## SuperNode Subscription + +A transformer can subscribe to the SueprNode service over its ipc or ws endpoints, when subscribing the transformer +specifies the chain and a set of parameters which define which subsets of that chain's data the server should feed to them. + +### Ethereum data +The `streamEthSubscribe` command serves as a simple demonstration/example of subscribing to the super-node Ethereum feed, it subscribes with a set of parameters +defined in the loaded config file, and prints the streamed data to stdout. To build transformers that subscribe to and use super-node Ethereum data, +the shared/libraries/streamer can be used. + +Usage: + +`./vulcanizedb streamEthSubscribe --config=` + +The config for `streamEthSubscribe` has a set of parameters to fill the [EthSubscription config structure](../../pkg/super_node/config/eth_subscription.go) + +```toml +[superNode] + [superNode.ethSubscription] + historicalData = true + historicalDataOnly = false + startingBlock = 0 + endingBlock = 0 + wsPath = "ws://127.0.0.1:8080" + [superNode.ethSubscription.headerFilter] + off = false + uncles = false + [superNode.ethSubscription.txFilter] + off = false + src = [ + "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", + ] + dst = [ + "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", + ] + [superNode.ethSubscription.receiptFilter] + off = false + contracts = [] + topics = [ + [ + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377" + ] + ] + [superNode.ethSubscription.stateFilter] + off = false + addresses = [ + "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe" + ] + intermediateNodes = false + [superNode.ethSubscription.storageFilter] + off = true + addresses = [] + storageKeys = [] + intermediateNodes = false +``` + +`ethSubscription.path` is used to define the SuperNode ws url OR ipc endpoint we subscribe to + +`ethSubscription.historicalData` specifies whether or not the super-node should look up historical data in its cache and +send that to the subscriber, if this is set to `false` then the super-node only streams newly synced/incoming data + +`ethSubscription.historicalDataOnly` will tell the super-node to only send historical data with the specified range and +not stream forward syncing data + +`ethSubscription.startingBlock` is the starting block number for the range we want to receive data in + +`ethSubscription.endingBlock` is the ending block number for the range we want to receive data in; +setting to 0 means there is no end/we will continue streaming indefinitely. + +`ethSubscription.headerFilter` has two sub-options: `off` and `uncles`. Setting `off` to true tells the super-node to +not send any headers to the subscriber; setting `uncles` to true tells the super-node to send uncles in addition to normal headers. + +`ethSubscription.txFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the super-node to +not send any transactions to the subscriber; `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for, +if they have any addresses then the super-node will only send transactions that were sent or received by the addresses contained +in `src` and `dst`, respectively. + +`ethSubscription.receiptFilter` has four sub-options: `off`, `topics`, `contracts` and `matchTxs`. Setting `off` to true tells the super-node to +not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for, +if it has any topics then the super-node will only send receipts that contain logs which have that topic0. Similarly, `contracts` is +a string array which can be filled with contract addresses we want to filter for, if it contains any contract addresses the super-node will +only send receipts that correspond to one of those contracts. `matchTrxs` is a bool which when set to true any receipts that correspond to filtered for +transactions will be sent by the super-node, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters. + +`ethSubscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the super-node to +not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for, +if it has any addresses then the super-node will only send state leafs (accounts) corresponding to those account addresses. By default the super-node +only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. + +`ethSubscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. Setting `off` to true tells the super-node to +not send any storage data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter storage for, +if it has any addresses then the super-node will only send storage nodes from the storage tries at those state addresses. `storageKeys` is another string +array that can be filled with storage keys we want to filter storage data for. It is important to note that the storageKeys are the actual keccak256 hashes, whereas +the addresses in the `addresses` fields are the ETH addresses and not their keccak256 hashes that serve as the actual state keys. By default the super-node +only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. \ No newline at end of file diff --git a/documentation/super_node/super-node.md b/documentation/super_node/super-node.md deleted file mode 100644 index 1fad5903..00000000 --- a/documentation/super_node/super-node.md +++ /dev/null @@ -1,348 +0,0 @@ -# Super Node - -Vulcanizedb can act as an index for Ethereum data stored on IPFS through the use of the `syncAndPublish` and -`syncPublishScreenAndServe` commands. - -## Manual Setup - -These commands work in conjunction with a [state-diffing full Geth node](https://github.com/vulcanize/go-ethereum/tree/statediffing) -and IPFS. - -### IPFS -To start, download and install [IPFS](https://github.com/vulcanize/go-ipfs) - -`go get github.com/ipfs/go-ipfs` - -`cd $GOPATH/src/github.com/ipfs/go-ipfs` - -`make install` - -If we want to use Postgres as our backing datastore, we need to use the vulcanize fork of go-ipfs. - -Start by adding the fork and switching over to it: - -`git remote add vulcanize https://github.com/vulcanize/go-ipfs.git` - -`git fetch vulcanize` - -`git checkout -b postgres_update vulcanize/postgres_update` - -Now install this fork of ipfs, first be sure to remove any previous installation. - -`make install` - -Check that is installed properly by running - -`ipfs` - -You should see the CLI info/help output. - -And now we initialize with the `postgresds` profile. -If ipfs was previously initialized we will need to remove the old profile first. -We also need to provide env variables for the postgres connection: - -We can either set these manually, e.g. -```bash -export IPFS_PGHOST= -export IPFS_PGUSER= -export IPFS_PGDATABASE= -export IPFS_PGPORT= -export IPFS_PGPASSWORD= -``` - -And then run the ipfs command - -`ipfs init --profile=postgresds` - -Or we can use the pre-made script at `GOPATH/src/github.com/ipfs/go-ipfs/misc/utility/ipfs_postgres.sh` -which has usage: - -`./ipfs_postgres.sh "` - -and will ask us to enter the password, avoiding storing it to an ENV variable. - -Once we have initialized ipfs, that is all we need to do with it- we do not need to run a daemon during the subsequent processes (in fact, we can't). - -### Geth -For Geth, we currently *require* a special fork, and we can set this up as follows: - -Begin by downloading geth and switching to the vulcanize/rpc_statediffing branch - -`go get github.com/ethereum/go-ethereum` - -`cd $GOPATH/src/github.com/ethereum/go-ethereum` - -`git remote add vulcanize https://github.com/vulcanize/go-ethereum.git` - -`git fetch vulcanize` - -`git checkout -b statediffing vulcanize/statediffing` - -Now, install this fork of geth (make sure any old versions have been uninstalled/binaries removed first) - -`make geth` - -And run the output binary with statediffing turned on: - -`cd $GOPATH/src/github.com/ethereum/go-ethereum/build/bin` - -`./geth --statediff --statediff.streamblock --ws --syncmode=full` - -Note: other CLI options- statediff specific ones included- can be explored with `./geth help` - -The output from geth should mention that it is `Starting statediff service` and block synchronization should begin shortly thereafter. -Note that until it receives a subscriber, the statediffing process does essentially nothing. Once a subscription is received, this -will be indicated in the output. - -Also in the output will be the websocket url and ipc paths that we will use to subscribe to the statediffing process. -The default ws url is "ws://127.0.0.1:8546" and the default ipcPath- on Darwin systems only- is "Users/user/Library/Ethereum/geth.ipc" - -### Vulcanizedb - -There are two commands to choose from: - -#### syncAndPublish - -`syncAndPublih` performs the functions of the super node- syncing data from Geth, converting them to IPLDs, -publishing those IPLDs to IPFS, and creating a local Postgres index to relate their CIDS to useful metadata. - -Usage: - -`./vulcanizedb syncAndPublish --config=` - -The config file for the `syncAndPublish` command looks very similar to the basic config file -```toml -[database] - name = "vulcanize_demo" - hostname = "localhost" - port = 5432 - -[client] - ipcPath = "ws://127.0.0.1:8546" - ipfsPath = "/Users/user/.ipfs" -``` - -With an additional field, `client.ipcPath`, that is either the ws url or the ipc path that Geth has exposed (the url and path output -when the geth sync was started), and `client.ipfsPath` which is the path the ipfs datastore directory. - -#### syncPublishScreenAndServe - -`syncPublishScreenAndServe` does everything that `syncAndPublish` does, plus it opens up an RPC server which exposes -an endpoint to allow transformers to subscribe to subsets of the sync-and-published data that are relevant to their transformations - -Usage: - -`./vulcanizedb syncPublishScreenAndServe --config=` - -The config file for the `syncPublishScreenAndServe` command has two additional fields and looks like: - -```toml -[database] - name = "vulcanize_demo" - hostname = "localhost" - port = 5432 - -[client] - ipcPath = "ws://127.0.0.1:8546" - ipfsPath = "/Users/user/.ipfs" - -[server] - ipcPath = "/Users/user/.vulcanize/vulcanize.ipc" - wsEndpoint = "127.0.0.1:80" - -[superNodeBackFill] - on = false - ipcPath = "" - frequency = 5 -``` - -The additional `server.ipcPath` and `server.wsEndpoint` fields are used to set what ipc endpoint and ws url -the `syncPublishScreenAndServe` rpc server will expose itself to subscribing transformers over, respectively. -Any valid and available path and endpoint is acceptable, but keep in mind that this path and endpoint need to -be known by transformers for them to subscribe to the super node. - -Because the super node syncs data from a geth full node as it progresses through its block synchronization, there is potential -for the super node to miss data both at the beginning of the sync due to lag between initialization of the two processes and -anywhere throughout the sync if the processes are interrupted. The `superNodeBackFill` config mapping is used to optionally configure -the super node with an archival geth client that exposes a `statediff.StateDiffAt` rpc endpoint, to enable it to fill in these data gaps. -`superNodeBackFill.on` turns the backfill process on, the `superNodeBackFill.ipcPath` is the rpc path for the archival geth node, and `superNodeBackFill.frequency` -sets at what frequency (in minutes) the backfill process checks for and fills in gaps. - - -## Dockerfile Setup - -The below provides step-by-step directions for how to setup the super node using the provided Dockerfile on an AWS Linux AMI instance. -Note that the instance will need sufficient memory and storage for this to work. - -1. Install basic dependencies -``` -sudo yum update -sudo yum install -y curl gpg gcc gcc-c++ make git -``` - -2. Install Go 1.12 -``` -wget https://dl.google.com/go/go1.12.6.linux-amd64.tar.gz -tar -xzf go1.12.6.linux-amd64.tar.gz -sudo mv go /usr/local -``` - -3. Edit .bash_profile to export GOPATH -``` -export GOROOT=/usr/local/go -export GOPATH=$HOME/go -export PATH=$GOPATH/bin:$GOROOT/bin:$PATH -``` - -4. Install and setup Postgres -``` -sudo yum install postgresql postgresql96-server -sudo service postgresql96 initdb -sudo service postgresql96 start -sudo -u postgres createuser -s ec2-user -sudo -u postgres createdb ec2-user -sudo su postgres -psql -ALTER USER "ec2-user" WITH SUPERUSER; -/q -exit -``` - -4b. Edit hba_file to trust connections -``` -psql -SHOW hba_file; -/q -sudo vim {PATH_TO_FILE} -``` - -4c. Stop and restart Postgres server to affect changes -``` -sudo service postgresql96 stop -sudo service postgresql96 start -``` - -5. Install and start Docker (exit and re-enter ec2 instance afterwards to affect changes) -``` -sudo yum install -y docker -sudo service docker start -sudo usermod -aG docker ec2-user -``` - -6. Fetch the repository and switch to this working branch -``` -go get github.com/vulcanize/vulcanizedb -cd $GOPATH/src/github.com/vulcanize/vulcanizedb -git checkout ipfs_concurrency -``` - -7. Create the db -``` -createdb vulcanize_public -``` - -8. Build and run the Docker image -``` -cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node -docker build . -docker run --network host -e VDB_PG_CONNECT=postgres://localhost:5432/vulcanize_public?sslmode=disable {IMAGE_ID} -``` - - -## Subscribing - -A transformer can subscribe to the `syncPublishScreenAndServe` service over its ipc or ws endpoints, when subscribing the transformer -specifies which subsets of the synced data it is interested in and the server will forward only these data. - -The `streamSubscribe` command serves as a simple demonstration/example of subscribing to the super-node feed, it subscribes with a set of parameters -defined in the loaded config file, and prints the streamed data to stdout. To build transformers that subscribe to and use super-node data, -the shared/libraries/streamer can be used. - -Usage: - -`./vulcanizedb streamSubscribe --config=` - -The config for `streamSubscribe` has the `subscribe` set of parameters, for example: - -```toml -[subscription] - path = "ws://127.0.0.1:8080" - backfill = true - backfillOnly = false - startingBlock = 0 - endingBlock = 0 - [subscription.headerFilter] - off = false - uncles = false - [subscription.trxFilter] - off = false - src = [ - "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", - ] - dst = [ - "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", - ] - [subscription.receiptFilter] - off = false - topic0s = [ - "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", - "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377" - ] - [subscription.stateFilter] - off = false - addresses = [ - "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe" - ] - intermediateNodes = false - [subscription.storageFilter] - off = true - addresses = [ - "", - "" - ] - storageKeys = [ - "", - "" - ] - intermediateNodes = false -``` - -`subscription.path` is used to define the ws url OR ipc endpoint we will subscribe to the super-node over -(the `server.ipcPath` or `server.wsEndpoint` that the super-node has defined in their config file). - -`subscription.backfill` specifies whether or not the super-node should look up historical data in its cache and -send that to the subscriber, if this is set to `false` then the super-node only forwards newly synced/incoming data. - -`subscription.backfillOnly` will tell the super-node to only send historical data and not stream incoming data going forward. - -`subscription.startingBlock` is the starting block number for the range we want to receive data in. - -`subscription.endingBlock` is the ending block number for the range we want to receive data in; -setting to 0 means there is no end/we will continue indefinitely. - -`subscription.headerFilter` has two sub-options: `off` and `uncles`. Setting `off` to true tells the super-node to -not send any headers to the subscriber; setting `uncles` to true tells the super-node to send uncles in addition to normal headers. - -`subscription.trxFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the super-node to -not send any transactions to the subscriber; `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for, -if they have any addresses then the super-node will only send transactions that were sent or received by the addresses contained -in `src` and `dst`, respectively. - -`subscription.receiptFilter` has four sub-options: `off`, `topics`, `contracts` and `matchTxs`. Setting `off` to true tells the super-node to -not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for, -if it has any topics then the super-node will only send receipts that contain logs which have that topic0. Similarly, `contracts` is -a string array which can be filled with contract addresses we want to filter for, if it contains any contract addresses the super-node will -only send receipts that correspond to one of those contracts. `matchTrxs` is a bool which when set to true any receipts that correspond to filtered for -transactions will be sent by the super-node, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters. - -`subscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the super-node to -not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for, -if it has any addresses then the super-node will only send state leafs (accounts) corresponding to those account addresses. By default the super-node -only sends along state leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. - -`subscription.storageFilter` has four sub-options: `off`, `addresses`, `storageKeys`, and `intermediateNodes`. Setting `off` to true tells the super-node to -not send any storage data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter storage for, -if it has any addresses then the super-node will only send storage nodes from the storage tries at those state addresses. `storageKeys` is another string -array that can be filled with storage keys we want to filter storage data for. It is important to note that the storageKeys are the actual keccak256 hashes, whereas -the addresses in the `addresses` fields are the ETH addresses and not their keccak256 hashes that serve as the actual state keys. By default the super-node -only sends along storage leafs, if we want to receive branch and extension nodes as well `intermediateNodes` can be set to `true`. diff --git a/environments/superNode.toml b/environments/superNode.toml index 72481603..5447c365 100644 --- a/environments/superNode.toml +++ b/environments/superNode.toml @@ -21,4 +21,5 @@ [superNode.backFill] on = false httpPath = "" - frequency = 5 \ No newline at end of file + frequency = 5 + batchSize = 50 \ No newline at end of file diff --git a/environments/superNodeSubscription.toml b/environments/superNodeSubscription.toml index 7a384ef9..b3cea0c0 100644 --- a/environments/superNodeSubscription.toml +++ b/environments/superNodeSubscription.toml @@ -8,7 +8,7 @@ [superNode.ethSubscription.headerFilter] off = false uncles = false - [superNode.ethSubscription.trxFilter] + [superNode.ethSubscription.txFilter] off = false src = [ "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", @@ -19,9 +19,11 @@ [superNode.ethSubscription.receiptFilter] off = false contracts = [] - topic0s = [ - "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", - "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377" + topics = [ + [ + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377" + ] ] [superNode.ethSubscription.stateFilter] off = false diff --git a/pkg/ipfs/helpers.go b/pkg/ipfs/builders.go similarity index 100% rename from pkg/ipfs/helpers.go rename to pkg/ipfs/builders.go diff --git a/pkg/ipfs/models.go b/pkg/ipfs/models.go index 62093ae0..eb0312be 100644 --- a/pkg/ipfs/models.go +++ b/pkg/ipfs/models.go @@ -16,7 +16,7 @@ package ipfs -type IPLDModel struct { - Key string `db:"key"` +type BlockModel struct { + CID string `db:"key"` Data []byte `db:"data"` } diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index d736c601..fa31b74e 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -62,7 +62,7 @@ type BackFillService struct { } // NewBackFillService returns a new BackFillInterface -func NewBackFillService(settings *config.BackFill) (BackFillInterface, error) { +func NewBackFillService(settings *config.SuperNode) (BackFillInterface, error) { publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath) if err != nil { return nil, err @@ -83,6 +83,10 @@ func NewBackFillService(settings *config.BackFill) (BackFillInterface, error) { if err != nil { return nil, err } + batchSize := settings.BatchSize + if batchSize == 0 { + batchSize = DefaultMaxBatchSize + } return &BackFillService{ Indexer: indexer, Converter: converter, @@ -90,7 +94,7 @@ func NewBackFillService(settings *config.BackFill) (BackFillInterface, error) { Retriever: retriever, Fetcher: fetcher, GapCheckFrequency: settings.Frequency, - BatchSize: settings.BatchSize, + BatchSize: batchSize, }, nil } diff --git a/pkg/super_node/config/config.go b/pkg/super_node/config/config.go index 608fa827..04fc8195 100644 --- a/pkg/super_node/config/config.go +++ b/pkg/super_node/config/config.go @@ -42,31 +42,35 @@ type SuperNode struct { Chain ChainType IPFSPath string DB *postgres.DB + DBConfig config.Database Quit chan bool // Server fields - Serve bool - WSEndpoint string - IPCEndpoint string + Serve bool + WSEndpoint string + HTTPEndpoint string + IPCEndpoint string // Sync params Sync bool Workers int WSClient core.RPCClient NodeInfo core.Node // Backfiller params - BackFill bool - BackFillSettings *BackFill + BackFill bool + HTTPClient core.RPCClient + Frequency time.Duration + BatchSize uint64 } -// NewSuperNodeConfig is used to initialize a SuperNode config +// NewSuperNodeConfig is used to initialize a SuperNode config from a config .toml file func NewSuperNodeConfig() (*SuperNode, error) { - dbConfig := config.Database{ + sn := new(SuperNode) + sn.DBConfig = config.Database{ Name: viper.GetString("superNode.database.name"), Hostname: viper.GetString("superNode.database.hostname"), Port: viper.GetInt("superNode.database.port"), User: viper.GetString("superNode.database.user"), Password: viper.GetString("superNode.database.password"), } - sn := new(SuperNode) var err error sn.Chain, err = NewChainType(viper.GetString("superNode.chain")) if err != nil { @@ -94,7 +98,7 @@ func NewSuperNodeConfig() (*SuperNode, error) { if sn.Serve { wsPath := viper.GetString("superNode.server.wsPath") if wsPath == "" { - wsPath = "127.0.0.1:8080" + wsPath = "ws://127.0.0.1:8546" } sn.WSEndpoint = wsPath ipcPath := viper.GetString("superNode.server.ipcPath") @@ -106,48 +110,31 @@ func NewSuperNodeConfig() (*SuperNode, error) { ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") } sn.IPCEndpoint = ipcPath + httpPath := viper.GetString("superNode.server.httpPath") + if httpPath == "" { + httpPath = "http://127.0.0.1:8547" + } + sn.HTTPEndpoint = httpPath } - db := utils.LoadPostgres(dbConfig, sn.NodeInfo) + db := utils.LoadPostgres(sn.DBConfig, sn.NodeInfo) sn.DB = &db sn.Quit = make(chan bool) if viper.GetBool("superNode.backFill.on") { - sn.BackFill = true - sn.BackFillSettings, err = NewBackFillerConfig(dbConfig) + if err := sn.BackFillFields(); err != nil { + return nil, err + } } return sn, err } -// BackFill config struct -type BackFill struct { - Chain ChainType - IPFSPath string - DB *postgres.DB - HTTPClient core.RPCClient - Frequency time.Duration - BatchSize uint64 -} - -// newBackFillerConfig is used to initialize a backfiller config -func NewBackFillerConfig(dbConfig config.Database) (*BackFill, error) { - bf := new(BackFill) - var err error - bf.Chain, err = NewChainType(viper.GetString("superNode.chain")) +// BackFillFields is used to fill in the BackFill fields of the config +func (sn *SuperNode) BackFillFields() error { + sn.BackFill = true + _, httpClient, err := getNodeAndClient(sn.Chain, viper.GetString("superNode.backFill.httpPath")) if err != nil { - return nil, err + return err } - ipfsPath := viper.GetString("superNode.ipfsPath") - if ipfsPath == "" { - home, homeDirErr := os.UserHomeDir() - if homeDirErr != nil { - return nil, err - } - ipfsPath = filepath.Join(home, ".ipfs") - } - bf.IPFSPath = ipfsPath - node, httpClient, err := getNodeAndClient(bf.Chain, viper.GetString("superNode.backFill.httpPath")) - db := utils.LoadPostgres(dbConfig, node) - bf.DB = &db - bf.HTTPClient = httpClient + sn.HTTPClient = httpClient freq := viper.GetInt("superNode.backFill.frequency") var frequency time.Duration if freq <= 0 { @@ -155,8 +142,9 @@ func NewBackFillerConfig(dbConfig config.Database) (*BackFill, error) { } else { frequency = time.Duration(freq) } - bf.Frequency = frequency - return bf, nil + sn.Frequency = frequency + sn.BatchSize = uint64(viper.GetInt64("superNode.backFill.batchSize")) + return nil } func getNodeAndClient(chain ChainType, path string) (core.Node, core.RPCClient, error) { diff --git a/pkg/super_node/config/eth_subscription.go b/pkg/super_node/config/eth_subscription.go index 161efa89..3108990a 100644 --- a/pkg/super_node/config/eth_subscription.go +++ b/pkg/super_node/config/eth_subscription.go @@ -17,6 +17,7 @@ package config import ( + "errors" "math/big" "github.com/spf13/viper" @@ -53,7 +54,7 @@ type ReceiptFilter struct { Off bool MatchTxs bool // turn on to retrieve receipts that pair with retrieved transactions Contracts []string - Topic0s []string + Topics [][]string } // StateFilter contains filter settings for state @@ -72,7 +73,7 @@ type StorageFilter struct { } // Init is used to initialize a EthSubscription struct with env variables -func NewEthSubscriptionConfig() *EthSubscription { +func NewEthSubscriptionConfig() (*EthSubscription, error) { sc := new(EthSubscription) // Below default to false, which means we do not backfill by default sc.BackFill = viper.GetBool("superNode.ethSubscription.historicalData") @@ -89,16 +90,22 @@ func NewEthSubscriptionConfig() *EthSubscription { // Below defaults to false and two slices of length 0 // Which means we get all transactions by default sc.TxFilter = TxFilter{ - Off: viper.GetBool("superNode.ethSubscription.trxFilter.off"), - Src: viper.GetStringSlice("superNode.ethSubscription.trxFilter.src"), - Dst: viper.GetStringSlice("superNode.ethSubscription.trxFilter.dst"), + Off: viper.GetBool("superNode.ethSubscription.txFilter.off"), + Src: viper.GetStringSlice("superNode.ethSubscription.txFilter.src"), + Dst: viper.GetStringSlice("superNode.ethSubscription.txFilter.dst"), } // Below defaults to false and one slice of length 0 // Which means we get all receipts by default + t := viper.Get("superNode.ethSubscription.receiptFilter.topics") + topics, ok := t.([][]string) + if !ok { + return nil, errors.New("superNode.ethSubscription.receiptFilter.topics needs to be a slice of string slices") + } sc.ReceiptFilter = ReceiptFilter{ Off: viper.GetBool("superNode.ethSubscription.receiptFilter.off"), + MatchTxs: viper.GetBool("superNode.ethSubscription.receiptFilter.matchTxs"), Contracts: viper.GetStringSlice("superNode.ethSubscription.receiptFilter.contracts"), - Topic0s: viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic0s"), + Topics: topics, } // Below defaults to two false, and a slice of length 0 // Which means we get all state leafs by default, but no intermediate nodes @@ -115,7 +122,7 @@ func NewEthSubscriptionConfig() *EthSubscription { Addresses: viper.GetStringSlice("superNode.ethSubscription.storageFilter.addresses"), StorageKeys: viper.GetStringSlice("superNode.ethSubscription.storageFilter.storageKeys"), } - return sc + return sc, nil } // StartingBlock satisfies the SubscriptionSettings() interface diff --git a/pkg/super_node/eth/api.go b/pkg/super_node/eth/api.go index 668e96e6..fb441ff1 100644 --- a/pkg/super_node/eth/api.go +++ b/pkg/super_node/eth/api.go @@ -18,11 +18,17 @@ package eth import ( "context" + "math/big" - "github.com/ethereum/go-ethereum/core/types" - + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" + "github.com/ipfs/go-block-format" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/config" ) // APIName is the namespace for the super node's eth api @@ -42,35 +48,159 @@ func NewPublicEthAPI(b *Backend) *PublicEthAPI { } } -/* -to start, need -eth_blockNumber -eth_getLogs -eth_getHeaderByNumber -*/ - // BlockNumber returns the block number of the chain head. func (pea *PublicEthAPI) BlockNumber() hexutil.Uint64 { number, _ := pea.b.retriever.RetrieveLastBlockNumber() return hexutil.Uint64(number) } +// GetLogs returns logs matching the given argument that are stored within the state. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs +func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) ([]*types.Log, error) { + // Convert FilterQuery into ReceiptFilter + addrStrs := make([]string, len(crit.Addresses)) + for i, addr := range crit.Addresses { + addrStrs[i] = addr.String() + } + topicStrSets := make([][]string, 4) + for i, topicSet := range crit.Topics { + if i > 3 { + break + } + for _, topic := range topicSet { + topicStrSets[i] = append(topicStrSets[i], topic.String()) + } + } + filter := config.ReceiptFilter{ + Contracts: addrStrs, + Topics: topicStrSets, + } + tx, err := pea.b.db.Beginx() + if err != nil { + return nil, err + } + // If we have a blockhash to filter on, fire off single retrieval query + if crit.BlockHash != nil { + rctCIDs, err := pea.b.retriever.RetrieveRctCIDs(tx, filter, 0, crit.BlockHash, nil) + if err != nil { + return nil, err + } + if err := tx.Commit(); err != nil { + return nil, err + } + rctIPLDs, err := pea.b.fetcher.FetchRcts(rctCIDs) + if err != nil { + return nil, err + } + return extractLogsOfInterest(rctIPLDs, filter.Topics) + } + // Otherwise, create block range from criteria + // nil values are filled in; to request a single block have both ToBlock and FromBlock equal that number + startingBlock := crit.FromBlock + endingBlock := crit.ToBlock + if startingBlock == nil { + startingBlockInt, err := pea.b.retriever.RetrieveFirstBlockNumber() + if err != nil { + return nil, err + } + startingBlock = big.NewInt(startingBlockInt) + } + if endingBlock == nil { + endingBlockInt, err := pea.b.retriever.RetrieveLastBlockNumber() + if err != nil { + return nil, err + } + endingBlock = big.NewInt(endingBlockInt) + } + start := startingBlock.Int64() + end := endingBlock.Int64() + allRctCIDs := make([]ReceiptModel, 0) + for i := start; i <= end; i++ { + rctCIDs, err := pea.b.retriever.RetrieveRctCIDs(tx, filter, i, nil, nil) + if err != nil { + return nil, err + } + allRctCIDs = append(allRctCIDs, rctCIDs...) + } + if err := tx.Commit(); err != nil { + return nil, err + } + rctIPLDs, err := pea.b.fetcher.FetchRcts(allRctCIDs) + if err != nil { + return nil, err + } + return extractLogsOfInterest(rctIPLDs, filter.Topics) +} + +func extractLogsOfInterest(rctIPLDs []blocks.Block, wantedTopics [][]string) ([]*types.Log, error) { + var logs []*types.Log + for _, rctIPLD := range rctIPLDs { + rctRLP := rctIPLD.RawData() + var rct types.ReceiptForStorage + if err := rlp.DecodeBytes(rctRLP, &rct); err != nil { + return nil, err + } + for _, log := range rct.Logs { + if wanted := wantedLog(wantedTopics, log.Topics); wanted == true { + logs = append(logs, log) + } + } + } + return logs, nil +} + +// returns true if the log matches on the filter +func wantedLog(wantedTopics [][]string, actualTopics []common.Hash) bool { + // actualTopics will always have length <= 4 + // wantedTopics will always have length == 4 + matches := 0 + for i, actualTopic := range actualTopics { + // If we have topics in this filter slot, count as a match if the actualTopic matches one of the ones in this filter slot + if len(wantedTopics[i]) > 0 { + matches += sliceContainsHash(wantedTopics[i], actualTopic) + } else { + // Filter slot is empty, not matching any topics at this slot => counts as a match + matches++ + } + } + if matches == len(actualTopics) { + return true + } + return false +} + +// returns 1 if the slice contains the hash, 0 if it does not +func sliceContainsHash(slice []string, hash common.Hash) int { + for _, str := range slice { + if str == hash.String() { + return 1 + } + } + return 0 +} + // GetHeaderByNumber returns the requested canonical block header. -// * When blockNr is -1 the chain head is returned. +// When blockNr is -1 the chain head is returned. +// We cannot support pending block calls since we do not have an active miner func (pea *PublicEthAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) { header, err := pea.b.HeaderByNumber(ctx, number) if header != nil && err == nil { - return pea.rpcMarshalHeader(header), err + return pea.rpcMarshalHeader(header) } return nil, err } // rpcMarshalHeader uses the generalized output filler, then adds the total difficulty field, which requires -// a `PublicBlockchainAPI`. -func (pea *PublicEthAPI) rpcMarshalHeader(header *types.Header) map[string]interface{} { +// a `PublicEthAPI`. +func (pea *PublicEthAPI) rpcMarshalHeader(header *types.Header) (map[string]interface{}, error) { fields := RPCMarshalHeader(header) - fields["totalDifficulty"] = (*hexutil.Big)(pea.b.GetTd(header.Hash())) - return fields + td, err := pea.b.GetTd(header.Hash()) + if err != nil { + return nil, err + } + fields["totalDifficulty"] = (*hexutil.Big)(td) + return fields, nil } // RPCMarshalHeader converts the given header to the RPC output . diff --git a/pkg/super_node/eth/backend.go b/pkg/super_node/eth/backend.go index f1d6f5cd..84e8cf00 100644 --- a/pkg/super_node/eth/backend.go +++ b/pkg/super_node/eth/backend.go @@ -29,6 +29,7 @@ import ( "github.com/hashicorp/golang-lru" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/super_node/config" ) var ( @@ -38,6 +39,7 @@ var ( type Backend struct { retriever *CIDRetriever fetcher *IPLDFetcher + resolver *IPLDResolver db *postgres.DB headerCache *lru.Cache // Cache for the most recent block headers @@ -54,6 +56,7 @@ func NewEthBackend(db *postgres.DB, ipfsPath string) (*Backend, error) { return &Backend{ retriever: r, fetcher: f, + resolver: NewIPLDResolver(), db: r.Database(), }, nil } @@ -101,6 +104,49 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe return header, nil } -func (b *Backend) GetTd(blockHash common.Hash) *big.Int { - panic("implement me") +// GetTd retrieves and returns the total difficulty at the given block hash +func (b *Backend) GetTd(blockHash common.Hash) (*big.Int, error) { + pgStr := `SELECT header_cids.td FROM header_cids + WHERE header_cids.block_hash = $1` + var tdStr string + err := b.db.Select(&tdStr, pgStr, blockHash.String()) + if err != nil { + return nil, err + } + td, ok := new(big.Int).SetString(tdStr, 10) + if !ok { + return nil, errors.New("total difficulty retrieved from Postgres cannot be converted to an integer") + } + return td, nil +} + +func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { + tx, err := b.db.Beginx() + if err != nil { + return nil, err + } + receiptCIDs, err := b.retriever.RetrieveRctCIDs(tx, config.ReceiptFilter{}, 0, &hash, nil) + if err != nil { + return nil, err + } + if err := tx.Commit(); err != nil { + return nil, err + } + if len(receiptCIDs) == 0 { + return nil, nil + } + receiptIPLDs, err := b.fetcher.FetchRcts(receiptCIDs) + if err != nil { + return nil, err + } + receiptBytes := b.resolver.ResolveReceipts(receiptIPLDs) + logs := make([][]*types.Log, len(receiptBytes)) + for i, rctRLP := range receiptBytes { + var rct types.ReceiptForStorage + if err := rlp.DecodeBytes(rctRLP, &rct); err != nil { + return nil, err + } + logs[i] = rct.Logs + } + return logs, nil } diff --git a/pkg/super_node/eth/converter.go b/pkg/super_node/eth/converter.go index bc1c3141..c99a6136 100644 --- a/pkg/super_node/eth/converter.go +++ b/pkg/super_node/eth/converter.go @@ -75,8 +75,9 @@ func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) { return nil, err } txMeta := TxModel{ - Dst: handleNullAddr(trx.To()), - Src: handleNullAddr(&from), + Dst: handleNullAddr(trx.To()), + Src: handleNullAddr(&from), + TxHash: trx.Hash().String(), } // txMeta will have same index as its corresponding trx in the convertedPayload.BlockBody convertedPayload.TrxMetaData = append(convertedPayload.TrxMetaData, txMeta) @@ -98,16 +99,21 @@ func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) { if transactions[i].To() != nil { receipt.ContractAddress = *transactions[i].To() } - // Extract topic0 data from the receipt's logs for indexing - rctMeta := ReceiptModel{ - Topic0s: make([]string, 0, len(receipt.Logs)), - Contract: receipt.ContractAddress.Hex(), - } + // Extract topic and contract data from the receipt for indexing + topicSets := make([][]string, 4) for _, log := range receipt.Logs { - if len(log.Topics) < 1 { - continue + for i := range topicSets { + if i < len(log.Topics) { + topicSets[i] = append(topicSets[i], log.Topics[i].Hex()) + } } - rctMeta.Topic0s = append(rctMeta.Topic0s, log.Topics[0].Hex()) + } + rctMeta := ReceiptModel{ + Topic0s: topicSets[0], + Topic1s: topicSets[1], + Topic2s: topicSets[2], + Topic3s: topicSets[3], + Contract: receipt.ContractAddress.Hex(), } // receipt and rctMeta will have same indexes convertedPayload.Receipts = append(convertedPayload.Receipts, receipt) diff --git a/pkg/super_node/eth/filterer.go b/pkg/super_node/eth/filterer.go index 4148006d..2b71d433 100644 --- a/pkg/super_node/eth/filterer.go +++ b/pkg/super_node/eth/filterer.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" "github.com/vulcanize/vulcanizedb/pkg/super_node/config" @@ -54,7 +55,11 @@ func (s *ResponseFilterer) Filter(filter, payload interface{}) (interface{}, err if err != nil { return StreamPayload{}, err } - if err := s.filerReceipts(ethFilters.ReceiptFilter, response, ethPayload, txHashes); err != nil { + var filterTxs []common.Hash + if ethFilters.ReceiptFilter.MatchTxs { + filterTxs = txHashes + } + if err := s.filerReceipts(ethFilters.ReceiptFilter, response, ethPayload, filterTxs); err != nil { return StreamPayload{}, err } if err := s.filterState(ethFilters.StateFilter, response, ethPayload); err != nil { @@ -99,8 +104,7 @@ func (s *ResponseFilterer) filterTransactions(trxFilter config.TxFilter, respons for i, trx := range payload.Block.Body().Transactions { if checkTransactions(trxFilter.Src, trxFilter.Dst, payload.TrxMetaData[i].Src, payload.TrxMetaData[i].Dst) { trxBuffer := new(bytes.Buffer) - err := trx.EncodeRLP(trxBuffer) - if err != nil { + if err := trx.EncodeRLP(trxBuffer); err != nil { return nil, err } trxHashes = append(trxHashes, trx.Hash()) @@ -132,11 +136,12 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin func (s *ResponseFilterer) filerReceipts(receiptFilter config.ReceiptFilter, response *StreamPayload, payload *IPLDPayload, trxHashes []common.Hash) error { if !receiptFilter.Off { for i, receipt := range payload.Receipts { - if checkReceipts(receipt, receiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, receiptFilter.Contracts, payload.ReceiptMetaData[i].Contract, trxHashes, receiptFilter.MatchTxs) { + // topics is always length 4 + topics := [][]string{payload.ReceiptMetaData[i].Topic0s, payload.ReceiptMetaData[i].Topic1s, payload.ReceiptMetaData[i].Topic2s, payload.ReceiptMetaData[i].Topic3s} + if checkReceipts(receipt, receiptFilter.Topics, topics, receiptFilter.Contracts, payload.ReceiptMetaData[i].Contract, trxHashes) { receiptForStorage := (*types.ReceiptForStorage)(receipt) receiptBuffer := new(bytes.Buffer) - err := receiptForStorage.EncodeRLP(receiptBuffer) - if err != nil { + if err := receiptForStorage.EncodeRLP(receiptBuffer); err != nil { return err } response.ReceiptsRlp = append(response.ReceiptsRlp, receiptBuffer.Bytes()) @@ -146,57 +151,78 @@ func (s *ResponseFilterer) filerReceipts(receiptFilter config.ReceiptFilter, res return nil } -func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContracts []string, actualContract string, wantedTrxHashes []common.Hash, matchTxs bool) bool { +func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics [][]string, wantedContracts []string, actualContract string, wantedTrxHashes []common.Hash) bool { // If we aren't filtering for any topics, contracts, or corresponding trxs then all receipts are a go - if len(wantedTopics) == 0 && len(wantedContracts) == 0 && (len(wantedTrxHashes) == 0 || !matchTxs) { + if len(wantedTopics) == 0 && len(wantedContracts) == 0 && len(wantedTrxHashes) == 0 { return true } - // No matter what filters we have, we keep receipts for specific trxs we are interested in - if matchTxs { - for _, wantedTrxHash := range wantedTrxHashes { - if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) { + // Keep receipts that are from watched txs + for _, wantedTrxHash := range wantedTrxHashes { + if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) { + return true + } + } + // If there are no wanted contract addresses, we keep all receipts that match the topic filter + if len(wantedContracts) == 0 { + if match := filterMatch(wantedTopics, actualTopics); match == true { + return true + } + } + // If there are wanted contract addresses to filter on + for _, wantedAddr := range wantedContracts { + // and this is an address of interest + if wantedAddr == actualContract { + // we keep the receipt if it matches on the topic filter + if match := filterMatch(wantedTopics, actualTopics); match == true { return true } } } + return false +} - if len(wantedContracts) == 0 { - // We keep all receipts that have logs we are interested in - for _, wantedTopic := range wantedTopics { - for _, actualTopic := range actualTopics { - if wantedTopic == actualTopic { - return true - } +func filterMatch(wantedTopics, actualTopics [][]string) bool { + // actualTopics should always be length 4, members could be nil slices though + lenWantedTopics := len(wantedTopics) + matches := 0 + for i, actualTopicSet := range actualTopics { + if i < lenWantedTopics { + // If we have topics in this filter slot, count as a match if one of the topics matches + if len(wantedTopics[i]) > 0 { + matches += slicesShareString(actualTopicSet, wantedTopics[i]) + } else { + // Filter slot is empty, not matching any topics at this slot => counts as a match + matches++ } + } else { + // Filter slot doesn't exist, not matching any topics at this slot => count as a match + matches++ } - } else { // We keep all receipts that belong to one of the specified contracts if we aren't filtering on topics - for _, wantedContract := range wantedContracts { - if wantedContract == actualContract { - if len(wantedTopics) == 0 { - return true - } - // Or if we have contracts and topics to filter on we only keep receipts that satisfy both conditions - for _, wantedTopic := range wantedTopics { - for _, actualTopic := range actualTopics { - if wantedTopic == actualTopic { - return true - } - } - } + } + if matches == 4 { + return true + } + return false +} + +// returns 1 if the two slices have a string in common, 0 if they do not +func slicesShareString(slice1, slice2 []string) int { + for _, str1 := range slice1 { + for _, str2 := range slice2 { + if str1 == str2 { + return 1 } } } - - return false + return 0 } func (s *ResponseFilterer) filterState(stateFilter config.StateFilter, response *StreamPayload, payload *IPLDPayload) error { if !stateFilter.Off { response.StateNodesRlp = make(map[common.Hash][]byte) - keyFilters := make([]common.Hash, 0, len(stateFilter.Addresses)) - for _, addr := range stateFilter.Addresses { - keyFilter := AddressToKey(common.HexToAddress(addr)) - keyFilters = append(keyFilters, keyFilter) + keyFilters := make([]common.Hash, len(stateFilter.Addresses)) + for i, addr := range stateFilter.Addresses { + keyFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()) } for _, stateNode := range payload.StateNodes { if checkNodeKeys(keyFilters, stateNode.Key) { @@ -225,15 +251,13 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool { func (s *ResponseFilterer) filterStorage(storageFilter config.StorageFilter, response *StreamPayload, payload *IPLDPayload) error { if !storageFilter.Off { response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte) - stateKeyFilters := make([]common.Hash, 0, len(storageFilter.Addresses)) - for _, addr := range storageFilter.Addresses { - keyFilter := AddressToKey(common.HexToAddress(addr)) - stateKeyFilters = append(stateKeyFilters, keyFilter) + stateKeyFilters := make([]common.Hash, len(storageFilter.Addresses)) + for i, addr := range storageFilter.Addresses { + stateKeyFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()) } - storageKeyFilters := make([]common.Hash, 0, len(storageFilter.StorageKeys)) - for _, store := range storageFilter.StorageKeys { - keyFilter := HexToKey(store) - storageKeyFilters = append(storageKeyFilters, keyFilter) + storageKeyFilters := make([]common.Hash, len(storageFilter.StorageKeys)) + for i, store := range storageFilter.StorageKeys { + storageKeyFilters[i] = common.HexToHash(store) } for stateKey, storageNodes := range payload.StorageNodes { if checkNodeKeys(stateKeyFilters, stateKey) { diff --git a/pkg/super_node/eth/filterer_test.go b/pkg/super_node/eth/filterer_test.go index ea285f0e..73868627 100644 --- a/pkg/super_node/eth/filterer_test.go +++ b/pkg/super_node/eth/filterer_test.go @@ -19,14 +19,13 @@ package eth_test import ( "bytes" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - "github.com/ethereum/go-ethereum/core/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) var ( @@ -159,6 +158,18 @@ var _ = Describe("Filterer", func() { Expect(len(superNodePayload7.ReceiptsRlp)).To(Equal(0)) Expect(len(superNodePayload7.StateNodesRlp)).To(Equal(1)) Expect(superNodePayload7.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes)) + + payload8, err := filterer.Filter(rctTopicsAndContractFilterFail, mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + superNodePayload8, ok := payload8.(eth.StreamPayload) + Expect(ok).To(BeTrue()) + Expect(superNodePayload8.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) + Expect(len(superNodePayload8.HeadersRlp)).To(Equal(0)) + Expect(len(superNodePayload8.UnclesRlp)).To(Equal(0)) + Expect(len(superNodePayload8.TransactionsRlp)).To(Equal(0)) + Expect(len(superNodePayload8.StorageNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload8.StateNodesRlp)).To(Equal(0)) + Expect(len(superNodePayload8.ReceiptsRlp)).To(Equal(0)) }) }) }) diff --git a/pkg/super_node/eth/helpers.go b/pkg/super_node/eth/helpers.go deleted file mode 100644 index fec021b7..00000000 --- a/pkg/super_node/eth/helpers.go +++ /dev/null @@ -1,33 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth - -import ( - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" -) - -// AddressToKey hashes an address -func AddressToKey(address common.Address) common.Hash { - return crypto.Keccak256Hash(address[:]) -} - -// HexToKey hashes a hex (0x leading or not) string -func HexToKey(hex string) common.Hash { - addr := common.FromHex(hex) - return crypto.Keccak256Hash(addr[:]) -} diff --git a/pkg/super_node/eth/indexer.go b/pkg/super_node/eth/indexer.go index e94061b8..0e1a7b98 100644 --- a/pkg/super_node/eth/indexer.go +++ b/pkg/super_node/eth/indexer.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/ethereum/go-ethereum/common" - "github.com/jmoiron/sqlx" "github.com/lib/pq" log "github.com/sirupsen/logrus" @@ -118,8 +117,8 @@ func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPa } func (in *CIDIndexer) indexReceiptCID(tx *sqlx.Tx, cidMeta ReceiptModel, txID int64) error { - _, err := tx.Exec(`INSERT INTO public.receipt_cids (tx_id, cid, contract, topic0s) VALUES ($1, $2, $3, $4)`, - txID, cidMeta.CID, cidMeta.Contract, pq.Array(cidMeta.Topic0s)) + _, err := tx.Exec(`INSERT INTO public.receipt_cids (tx_id, cid, contract, topic0s, topic1s, topic2s, topic3s) VALUES ($1, $2, $3, $4, $5, $6, $7)`, + txID, cidMeta.CID, cidMeta.Contract, pq.Array(cidMeta.Topic0s), pq.Array(cidMeta.Topic1s), pq.Array(cidMeta.Topic2s), pq.Array(cidMeta.Topic3s)) return err } diff --git a/pkg/super_node/eth/mocks/test_data.go b/pkg/super_node/eth/mocks/test_data.go index 71d6feed..24076f73 100644 --- a/pkg/super_node/eth/mocks/test_data.go +++ b/pkg/super_node/eth/mocks/test_data.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" @@ -52,32 +53,82 @@ var ( MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts) MockBlockRlp, _ = rlp.EncodeToBytes(MockBlock) MockHeaderRlp, _ = rlp.EncodeToBytes(MockBlock.Header()) + Address = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") + AnotherAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593") + mockTopic11 = common.HexToHash("0x04") + mockTopic12 = common.HexToHash("0x06") + mockTopic21 = common.HexToHash("0x05") + mockTopic22 = common.HexToHash("0x07") MockTrxMeta = []eth.TxModel{ { - CID: "", // This is empty until we go to publish to ipfs - Src: senderAddr.Hex(), - Dst: "0x0000000000000000000000000000000000000000", + CID: "", // This is empty until we go to publish to ipfs + Src: senderAddr.Hex(), + Dst: Address.String(), + TxHash: MockTransactions[0].Hash().String(), }, { - CID: "", - Src: senderAddr.Hex(), - Dst: "0x0000000000000000000000000000000000000001", + CID: "", + Src: senderAddr.Hex(), + Dst: AnotherAddress.String(), + TxHash: MockTransactions[1].Hash().String(), + }, + } + MockTrxMetaPostPublsh = []eth.TxModel{ + { + CID: "mockTrxCID1", // This is empty until we go to publish to ipfs + Src: senderAddr.Hex(), + Dst: Address.String(), + TxHash: MockTransactions[0].Hash().String(), + }, + { + CID: "mockTrxCID2", + Src: senderAddr.Hex(), + Dst: AnotherAddress.String(), + TxHash: MockTransactions[1].Hash().String(), }, } MockRctMeta = []eth.ReceiptModel{ { CID: "", Topic0s: []string{ - "0x0000000000000000000000000000000000000000000000000000000000000004", + mockTopic11.String(), }, - Contract: "0x0000000000000000000000000000000000000000", + Topic1s: []string{ + mockTopic12.String(), + }, + Contract: Address.String(), }, { CID: "", Topic0s: []string{ - "0x0000000000000000000000000000000000000000000000000000000000000005", + mockTopic21.String(), }, - Contract: "0x0000000000000000000000000000000000000001", + Topic1s: []string{ + mockTopic22.String(), + }, + Contract: AnotherAddress.String(), + }, + } + MockRctMetaPostPublish = []eth.ReceiptModel{ + { + CID: "mockRctCID1", + Topic0s: []string{ + mockTopic11.String(), + }, + Topic1s: []string{ + mockTopic12.String(), + }, + Contract: Address.String(), + }, + { + CID: "mockRctCID2", + Topic0s: []string{ + mockTopic21.String(), + }, + Topic1s: []string{ + mockTopic22.String(), + }, + Contract: AnotherAddress.String(), }, } @@ -99,10 +150,8 @@ var ( Leaf: true, }} emptyStorage = make([]statediff.StorageDiff, 0) - Address = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") - ContractLeafKey = eth.AddressToKey(Address) - AnotherAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593") - AnotherContractLeafKey = eth.AddressToKey(AnotherAddress) + ContractLeafKey = crypto.Keccak256Hash(Address.Bytes()) + AnotherContractLeafKey = crypto.Keccak256Hash(AnotherAddress.Bytes()) testAccount = state.Account{ Nonce: NonceValue, Balance: big.NewInt(BalanceValue), @@ -150,6 +199,18 @@ var ( Leaf: true, }, } + MockStateMetaPostPublish = []eth.StateNodeModel{ + { + CID: "mockStateCID1", + Leaf: true, + StateKey: ContractLeafKey.String(), + }, + { + CID: "mockStateCID2", + Leaf: true, + StateKey: AnotherContractLeafKey.String(), + }, + } MockStorageNodes = map[common.Hash][]eth.TrieNode{ ContractLeafKey: { { @@ -173,36 +234,10 @@ var ( Block: MockBlock, Receipts: MockReceipts, HeaderRLP: MockHeaderRlp, - TrxMetaData: []eth.TxModel{ - { - CID: "", - Src: senderAddr.Hex(), - Dst: "0x0000000000000000000000000000000000000000", - }, - { - CID: "", - Src: senderAddr.Hex(), - Dst: "0x0000000000000000000000000000000000000001", - }, - }, - ReceiptMetaData: []eth.ReceiptModel{ - { - CID: "", - Topic0s: []string{ - "0x0000000000000000000000000000000000000000000000000000000000000004", - }, - Contract: "0x0000000000000000000000000000000000000000", - }, - { - CID: "", - Topic0s: []string{ - "0x0000000000000000000000000000000000000000000000000000000000000005", - }, - Contract: "0x0000000000000000000000000000000000000001", - }, - }, - StorageNodes: MockStorageNodes, - StateNodes: MockStateNodes, + TrxMetaData: MockTrxMeta, + ReceiptMetaData: MockRctMeta, + StorageNodes: MockStorageNodes, + StateNodes: MockStateNodes, } MockCIDPayload = ð.CIDPayload{ @@ -214,45 +249,13 @@ var ( ParentHash: MockBlock.ParentHash().String(), TotalDifficulty: "1337", }, - UncleCIDs: []eth2.HeaderModel{}, - TransactionCIDs: []eth.TxModel{ - { - TxHash: MockTransactions[0].Hash().String(), - CID: "mockTrxCID1", - Dst: "0x0000000000000000000000000000000000000000", - Src: senderAddr.Hex(), - }, - { - TxHash: MockTransactions[1].Hash().String(), - CID: "mockTrxCID2", - Dst: "0x0000000000000000000000000000000000000001", - Src: senderAddr.Hex(), - }, - }, + UncleCIDs: []eth2.HeaderModel{}, + TransactionCIDs: MockTrxMetaPostPublsh, ReceiptCIDs: map[common.Hash]eth.ReceiptModel{ - MockTransactions[0].Hash(): { - CID: "mockRctCID1", - Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000004"}, - Contract: "0x0000000000000000000000000000000000000000", - }, - MockTransactions[1].Hash(): { - CID: "mockRctCID2", - Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000005"}, - Contract: "0x0000000000000000000000000000000000000001", - }, - }, - StateNodeCIDs: []eth.StateNodeModel{ - { - CID: "mockStateCID1", - Leaf: true, - StateKey: ContractLeafKey.String(), - }, - { - CID: "mockStateCID2", - Leaf: true, - StateKey: AnotherContractLeafKey.String(), - }, + MockTransactions[0].Hash(): MockRctMetaPostPublish[0], + MockTransactions[1].Hash(): MockRctMetaPostPublish[1], }, + StateNodeCIDs: MockStateMetaPostPublish, StorageNodeCIDs: map[common.Hash][]eth.StorageNodeModel{ ContractLeafKey: { { @@ -276,46 +279,10 @@ var ( TotalDifficulty: "1337", }, }, - Transactions: []eth2.TxModel{ - { - CID: "mockTrxCID1", - }, - { - TxHash: MockTransactions[1].Hash().String(), - CID: "mockTrxCID2", - Dst: "0x0000000000000000000000000000000000000001", - Src: senderAddr.String(), - }, - }, - Receipts: []eth2.ReceiptModel{ - { - CID: "mockRctCID1", - Contract: "0x0000000000000000000000000000000000000000", - Topic0s: []string{ - "0x0000000000000000000000000000000000000000000000000000000000000004", - }, - }, - { - CID: "mockRctCID2", - Contract: "0x0000000000000000000000000000000000000001", - Topic0s: []string{ - "0x0000000000000000000000000000000000000000000000000000000000000005", - }, - }, - }, - Uncles: []eth2.HeaderModel{}, - StateNodes: []eth.StateNodeModel{ - { - CID: "mockStateCID1", - Leaf: true, - StateKey: ContractLeafKey.Hex(), - }, - { - CID: "mockStateCID2", - Leaf: true, - StateKey: AnotherContractLeafKey.Hex(), - }, - }, + Transactions: MockTrxMetaPostPublsh, + Receipts: MockRctMetaPostPublish, + Uncles: []eth2.HeaderModel{}, + StateNodes: MockStateMetaPostPublish, StorageNodes: []eth.StorageNodeWithStateKeyModel{ { CID: "mockStorageCID", @@ -371,8 +338,8 @@ var ( // createTransactionsAndReceipts is a helper function to generate signed mock transactions and mock receipts with mock logs func createTransactionsAndReceipts() (types.Transactions, types.Receipts, common.Address) { // make transactions - trx1 := types.NewTransaction(0, common.HexToAddress("0x0"), big.NewInt(1000), 50, big.NewInt(100), nil) - trx2 := types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(2000), 100, big.NewInt(200), nil) + trx1 := types.NewTransaction(0, Address, big.NewInt(1000), 50, big.NewInt(100), nil) + trx2 := types.NewTransaction(1, AnotherAddress, big.NewInt(2000), 100, big.NewInt(200), nil) transactionSigner := types.MakeSigner(params.MainnetChainConfig, BlockNumber) mockCurve := elliptic.P256() mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rand.Reader) @@ -392,19 +359,15 @@ func createTransactionsAndReceipts() (types.Transactions, types.Receipts, common log.Fatal(err) } // make receipts - mockTopic1 := common.HexToHash("0x04") mockReceipt1 := types.NewReceipt(common.HexToHash("0x0").Bytes(), false, 50) - mockReceipt1.ContractAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") mockLog1 := &types.Log{ - Topics: []common.Hash{mockTopic1}, + Topics: []common.Hash{mockTopic11, mockTopic12}, } mockReceipt1.Logs = []*types.Log{mockLog1} mockReceipt1.TxHash = signedTrx1.Hash() - mockTopic2 := common.HexToHash("0x05") mockReceipt2 := types.NewReceipt(common.HexToHash("0x1").Bytes(), false, 100) - mockReceipt2.ContractAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593") mockLog2 := &types.Log{ - Topics: []common.Hash{mockTopic2}, + Topics: []common.Hash{mockTopic21, mockTopic22}, } mockReceipt2.Logs = []*types.Log{mockLog2} mockReceipt2.TxHash = signedTrx2.Hash() diff --git a/pkg/super_node/eth/models.go b/pkg/super_node/eth/models.go index 0530bb86..f1bc6c1a 100644 --- a/pkg/super_node/eth/models.go +++ b/pkg/super_node/eth/models.go @@ -43,6 +43,9 @@ type ReceiptModel struct { CID string `db:"cid"` Contract string `db:"contract"` Topic0s pq.StringArray `db:"topic0s"` + Topic1s pq.StringArray `db:"topic1s"` + Topic2s pq.StringArray `db:"topic2s"` + Topic3s pq.StringArray `db:"topic3s"` } type StateNodeModel struct { diff --git a/pkg/super_node/eth/publisher.go b/pkg/super_node/eth/publisher.go index 74a580e6..c67c6931 100644 --- a/pkg/super_node/eth/publisher.go +++ b/pkg/super_node/eth/publisher.go @@ -148,14 +148,14 @@ func (pub *IPLDPublisher) publishTransactions(blockBody *types.Body, trxMeta []T if err != nil { return nil, err } - if len(transactionCids) != len(blockBody.Transactions) { + if len(transactionCids) != len(trxMeta) { return nil, errors.New("expected one CID for each transaction") } mappedTrxCids := make([]TxModel, len(transactionCids)) - for i, trx := range blockBody.Transactions { + for i, cid := range transactionCids { mappedTrxCids[i] = TxModel{ - CID: transactionCids[i], - TxHash: trx.Hash().Hex(), + CID: cid, + TxHash: trxMeta[i].TxHash, Src: trxMeta[i].Src, Dst: trxMeta[i].Dst, } @@ -178,6 +178,9 @@ func (pub *IPLDPublisher) publishReceipts(receipts types.Receipts, receiptMeta [ CID: receiptsCids[i], Contract: receiptMeta[i].Contract, Topic0s: receiptMeta[i].Topic0s, + Topic1s: receiptMeta[i].Topic1s, + Topic2s: receiptMeta[i].Topic2s, + Topic3s: receiptMeta[i].Topic3s, } } return mappedRctCids, nil diff --git a/pkg/super_node/eth/retriever.go b/pkg/super_node/eth/retriever.go index 3433210f..49e12d7c 100644 --- a/pkg/super_node/eth/retriever.go +++ b/pkg/super_node/eth/retriever.go @@ -20,14 +20,15 @@ import ( "fmt" "math/big" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/jmoiron/sqlx" "github.com/lib/pq" log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/super_node/config" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // CIDRetriever satisfies the CIDRetriever interface for ethereum @@ -108,7 +109,7 @@ func (ecr *CIDRetriever) Retrieve(filter interface{}, blockNumber int64) (interf } // Retrieve cached receipt CIDs if !streamFilter.ReceiptFilter.Off { - cw.Receipts, err = ecr.RetrieveRctCIDs(tx, streamFilter.ReceiptFilter, blockNumber, trxIds) + cw.Receipts, err = ecr.RetrieveRctCIDs(tx, streamFilter.ReceiptFilter, blockNumber, nil, trxIds) if err != nil { if err := tx.Rollback(); err != nil { log.Error(err) @@ -189,68 +190,94 @@ func (ecr *CIDRetriever) RetrieveTrxCIDs(tx *sqlx.Tx, txFilter config.TxFilter, pgStr += ` AND transaction_cids.src = ANY($3::VARCHAR(66)[])` args = append(args, pq.Array(txFilter.Src)) } - err := tx.Select(&results, pgStr, args...) - if err != nil { - return nil, err - } - return results, nil + return results, tx.Select(&results, pgStr, args...) } // RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight that conform to the provided // filter parameters and correspond to the provided tx ids -func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter config.ReceiptFilter, blockNumber int64, trxIds []int64) ([]ReceiptModel, error) { +func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter config.ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) { log.Debug("retrieving receipt cids for block ", blockNumber) + id := 1 args := make([]interface{}, 0, 4) pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, - receipt_cids.contract, receipt_cids.topic0s + receipt_cids.contract, receipt_cids.topic0s, receipt_cids.topic1s, + receipt_cids.topic2s, receipt_cids.topic3s FROM receipt_cids, transaction_cids, header_cids WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id - AND header_cids.block_number = $1` - args = append(args, blockNumber) - if len(rctFilter.Topic0s) > 0 { - pgStr += ` AND ((receipt_cids.topic0s && $2::VARCHAR(66)[]` - args = append(args, pq.Array(rctFilter.Topic0s)) - if len(rctFilter.Contracts) > 0 { - pgStr += ` AND receipt_cids.contract = ANY($3::VARCHAR(66)[]))` - args = append(args, pq.Array(rctFilter.Contracts)) - if rctFilter.MatchTxs && len(trxIds) > 0 { - pgStr += ` OR receipt_cids.tx_id = ANY($4::INTEGER[]))` - args = append(args, pq.Array(trxIds)) - } else { - pgStr += `)` - } - } else { - pgStr += `)` - if rctFilter.MatchTxs && len(trxIds) > 0 { - pgStr += ` OR receipt_cids.tx_id = ANY($3::INTEGER[]))` - args = append(args, pq.Array(trxIds)) - } else { - pgStr += `)` + AND transaction_cids.header_id = header_cids.id` + if blockNumber > 0 { + pgStr += fmt.Sprintf(` AND header_cids.block_number = $%d`, id) + args = append(args, blockNumber) + id++ + } + if blockHash != nil { + pgStr += fmt.Sprintf(` AND header_cids.block_hash = $%d`, id) + args = append(args, blockHash.String()) + id++ + } + if len(rctFilter.Contracts) > 0 { + // Filter on contract addresses if there are any + pgStr += fmt.Sprintf(` AND ((receipt_cids.contract = ANY($%d::VARCHAR(66)[])`, id) + args = append(args, pq.Array(rctFilter.Contracts)) + id++ + // Filter on topics if there are any + if len(rctFilter.Topics) > 0 { + pgStr += " AND (" + first := true + for i, topicSet := range rctFilter.Topics { + if i < 4 && len(topicSet) > 0 { + if first { + pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) + first = false + } else { + pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) + } + args = append(args, pq.Array(topicSet)) + id++ + } } + pgStr += ")" } - } else { - if len(rctFilter.Contracts) > 0 { - pgStr += ` AND (receipt_cids.contract = ANY($2::VARCHAR(66)[])` - args = append(args, pq.Array(rctFilter.Contracts)) - if rctFilter.MatchTxs && len(trxIds) > 0 { - pgStr += ` OR receipt_cids.tx_id = ANY($3::INTEGER[]))` - args = append(args, pq.Array(trxIds)) - } else { - pgStr += `)` + pgStr += ")" + // Filter on txIDs if there are any and we are matching txs + if rctFilter.MatchTxs && len(trxIds) > 0 { + pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id) + args = append(args, pq.Array(trxIds)) + } + pgStr += ")" + } else { // If there are no contract addresses to filter on + // Filter on topics if there are any + if len(rctFilter.Topics) > 0 { + pgStr += " AND ((" + first := true + for i, topicSet := range rctFilter.Topics { + if i < 4 && len(topicSet) > 0 { + if first { + pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) + first = false + } else { + pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) + } + args = append(args, pq.Array(topicSet)) + id++ + } } + pgStr += ")" + // Filter on txIDs if there are any and we are matching txs + if rctFilter.MatchTxs && len(trxIds) > 0 { + pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id) + args = append(args, pq.Array(trxIds)) + } + pgStr += ")" } else if rctFilter.MatchTxs && len(trxIds) > 0 { - pgStr += ` AND receipt_cids.tx_id = ANY($2::INTEGER[])` + // If there are no contract addresses or topics to filter on, + // Filter on txIDs if there are any and we are matching txs + pgStr += fmt.Sprintf(` AND receipt_cids.tx_id = ANY($%d::INTEGER[])`, id) args = append(args, pq.Array(trxIds)) } } receiptCids := make([]ReceiptModel, 0) - err := tx.Select(&receiptCids, pgStr, args...) - if err != nil { - println(pgStr) - println("FUCK YOU\r\n\r\n\r\n") - } - return receiptCids, err + return receiptCids, tx.Select(&receiptCids, pgStr, args...) } // RetrieveStateCIDs retrieves and returns all of the state node cids at the provided blockheight that conform to the provided filter parameters @@ -264,9 +291,9 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter config.State args = append(args, blockNumber) addrLen := len(stateFilter.Addresses) if addrLen > 0 { - keys := make([]string, 0, addrLen) - for _, addr := range stateFilter.Addresses { - keys = append(keys, HexToKey(addr).Hex()) + keys := make([]string, addrLen) + for i, addr := range stateFilter.Addresses { + keys[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()).String() } pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])` args = append(args, pq.Array(keys)) @@ -275,8 +302,7 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter config.State pgStr += ` AND state_cids.leaf = TRUE` } stateNodeCIDs := make([]StateNodeModel, 0) - err := tx.Select(&stateNodeCIDs, pgStr, args...) - return stateNodeCIDs, err + return stateNodeCIDs, tx.Select(&stateNodeCIDs, pgStr, args...) } // RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided blockheight that conform to the provided filter parameters @@ -291,9 +317,9 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter config.S args = append(args, blockNumber) addrLen := len(storageFilter.Addresses) if addrLen > 0 { - keys := make([]string, 0, addrLen) - for _, addr := range storageFilter.Addresses { - keys = append(keys, HexToKey(addr).Hex()) + keys := make([]string, addrLen) + for i, addr := range storageFilter.Addresses { + keys[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()).String() } pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])` args = append(args, pq.Array(keys)) @@ -309,8 +335,7 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter config.S pgStr += ` AND storage_cids.leaf = TRUE` } storageNodeCIDs := make([]StorageNodeWithStateKeyModel, 0) - err := tx.Select(&storageNodeCIDs, pgStr, args...) - return storageNodeCIDs, err + return storageNodeCIDs, tx.Select(&storageNodeCIDs, pgStr, args...) } // RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db diff --git a/pkg/super_node/eth/retriever_test.go b/pkg/super_node/eth/retriever_test.go index 8912360b..31360ed6 100644 --- a/pkg/super_node/eth/retriever_test.go +++ b/pkg/super_node/eth/retriever_test.go @@ -19,8 +19,6 @@ package eth_test import ( "math/big" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -29,6 +27,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" eth2 "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) var ( @@ -51,7 +50,7 @@ var ( Off: true, }, ReceiptFilter: config.ReceiptFilter{ - Contracts: []string{"0x0000000000000000000000000000000000000001"}, + Contracts: []string{mocks.AnotherAddress.String()}, }, StateFilter: config.StateFilter{ Off: true, @@ -70,7 +69,7 @@ var ( Off: true, }, ReceiptFilter: config.ReceiptFilter{ - Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000004"}, + Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000004"}}, }, StateFilter: config.StateFilter{ Off: true, @@ -89,8 +88,34 @@ var ( Off: true, }, ReceiptFilter: config.ReceiptFilter{ - Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000004", "0x0000000000000000000000000000000000000000000000000000000000000005"}, - Contracts: []string{"0x0000000000000000000000000000000000000000"}, + Topics: [][]string{ + {"0x0000000000000000000000000000000000000000000000000000000000000004"}, + {"0x0000000000000000000000000000000000000000000000000000000000000006"}, + }, + Contracts: []string{mocks.Address.String()}, + }, + StateFilter: config.StateFilter{ + Off: true, + }, + StorageFilter: config.StorageFilter{ + Off: true, + }, + } + rctTopicsAndContractFilterFail = &config.EthSubscription{ + Start: big.NewInt(0), + End: big.NewInt(1), + HeaderFilter: config.HeaderFilter{ + Off: true, + }, + TxFilter: config.TxFilter{ + Off: true, + }, + ReceiptFilter: config.ReceiptFilter{ + Topics: [][]string{ + {"0x0000000000000000000000000000000000000000000000000000000000000004"}, + {"0x0000000000000000000000000000000000000000000000000000000000000007"}, // This topic won't match on the mocks.Address.String() contract receipt + }, + Contracts: []string{mocks.Address.String()}, }, StateFilter: config.StateFilter{ Off: true, @@ -109,8 +134,8 @@ var ( Off: true, }, ReceiptFilter: config.ReceiptFilter{ - Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000005"}, - Contracts: []string{"0x0000000000000000000000000000000000000000", "0x0000000000000000000000000000000000000001"}, + Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000005"}}, + Contracts: []string{mocks.Address.String(), mocks.AnotherAddress.String()}, }, StateFilter: config.StateFilter{ Off: true, @@ -128,8 +153,8 @@ var ( TxFilter: config.TxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter ReceiptFilter: config.ReceiptFilter{ MatchTxs: true, - Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000006"}, // Topic isn't one of the topics we have - Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have + Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have + Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have }, StateFilter: config.StateFilter{ Off: true, @@ -145,12 +170,12 @@ var ( Off: true, }, TxFilter: config.TxFilter{ - Dst: []string{"0x0000000000000000000000000000000000000001"}, // We only filter for one of the trxs so we will only get the one corresponding receipt + Dst: []string{mocks.AnotherAddress.String()}, // We only filter for one of the trxs so we will only get the one corresponding receipt }, ReceiptFilter: config.ReceiptFilter{ MatchTxs: true, - Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000006"}, // Topic isn't one of the topics we have - Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have + Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have + Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have }, StateFilter: config.StateFilter{ Off: true, @@ -356,6 +381,10 @@ var _ = Describe("Retriever", func() { StateKey: mocks.ContractLeafKey.Hex(), CID: "mockStateCID1", })) + + _, empty, err = retriever.Retrieve(rctTopicsAndContractFilterFail, 1) + Expect(err).ToNot(HaveOccurred()) + Expect(empty).To(BeTrue()) }) }) diff --git a/pkg/super_node/eth/test_helpers.go b/pkg/super_node/eth/test_helpers.go index 00d2469d..d56772bf 100644 --- a/pkg/super_node/eth/test_helpers.go +++ b/pkg/super_node/eth/test_helpers.go @@ -74,13 +74,3 @@ func ReceiptModelsContainsCID(rcts []ReceiptModel, cid string) bool { } return false } - -// ListContainsRange used to check if a list of [2]uint64 contains a particula [2]uint64 -func ListContainsRange(rangeList [][2]uint64, rng [2]uint64) bool { - for _, rangeInList := range rangeList { - if rangeInList == rng { - return true - } - } - return false -}